Building a Unified Telemetry Pipeline Across SwiftUI, RabbitMQ, and ActiveMQ with Fluentd


The project started with a familiar mandate: build a new, modern iOS application using SwiftUI. The backend, however, was anything but modern. It was a sprawling collection of services stitched together over a decade, a living museum of architectural fads. A significant portion communicated via RabbitMQ, while a critical, untouchable legacy system relied on ActiveMQ. The immediate pain point manifested within days of the first internal release. A user action in the SwiftUI app would fail, and we were blind. The support ticket would read “The checkout button is broken,” and we’d have to manually inspect logs on a dozen different machines, trying to piece together a story from timestamps and prayers. There was no concept of a holistic request trace. Our new, elegant frontend was tethered to a backend black box.

The initial proposal to standardize on a single message bus and instrument everything with a commercial APM was rejected. The migration cost and risk associated with touching the ActiveMQ system were deemed too high for the current roadmap. We were tasked with achieving end-to-end visibility without a significant backend refactor. The challenge was clear: bridge a state-of-the-art SwiftUI client with a heterogeneous, partially archaic backend, and create a single, correlated view of every user interaction. Our tool of choice for this plumbing nightmare had to be flexible, powerful, and vendor-agnostic. We chose Fluentd.

Our core concept was to treat Fluentd not just as a log forwarder, but as a central telemetry transformation and enrichment engine. All components—the SwiftUI app, the RabbitMQ services, the API gateway, and even the hostile ActiveMQ services—would send their raw logs to a distributed set of Fluentd agents. These agents would parse, clean, and normalize the data before forwarding it to a central Fluentd aggregator. The aggregator’s job was to perform the most critical step: enrich logs with a consistent trace_id to stitch the disparate streams into a coherent narrative.

First, we defined the canonical data model. Every single log event, regardless of its origin, had to conform to this JSON structure before being indexed in our Elasticsearch cluster. A common mistake is to let each service define its own log schema; this makes correlation nearly impossible.

{
  "timestamp": "2023-10-27T11:30:00.123Z",
  "service.name": "ios-checkout-swiftui",
  "trace_id": "d8a7f0c1-3e4b-4b9a-8c7d-9e6f5a2b1c0d",
  "span_id": "a1b2c3d4e5f6g7h8",
  "log.level": "INFO",
  "message": "User initiated checkout process",
  "user.id": "usr-12345",
  "session.id": "sess-abcde",
  "ios.device.model": "iPhone15,2",
  "ios.os.version": "17.1"
}

The trace_id is the lynchpin. It is generated once at the beginning of a user interaction in the SwiftUI app and must be propagated through every subsequent network call and message bus hop.

Instrumenting the SwiftUI Client

The first step was to control the source. We built a lightweight, non-blocking logging client directly into the SwiftUI application. The standard os.log facility alone was insufficient, as we needed to batch logs and ship them over HTTP with custom headers and a structured payload that matched the canonical model.

Here’s the core of the TelemetryManager in Swift. It generates a trace_id for a user session, batches log events, and periodically flushes them to a dedicated Fluentd HTTP endpoint.

import Foundation
import Combine

// Represents a single structured log event in the canonical schema
struct LogEvent: Codable, Identifiable {
    let id = UUID() // local identity for SwiftUI lists; not sent over the wire
    let timestamp: String
    var serviceName: String = "ios-checkout-swiftui"
    let traceId: String
    var spanId: String = String(UUID().uuidString.lowercased().replacingOccurrences(of: "-", with: "").prefix(16))
    let logLevel: String
    let message: String
    // Add other relevant metadata
    let userId: String?
    let sessionId: String

    // Map Swift property names to the canonical field names expected downstream
    enum CodingKeys: String, CodingKey {
        case timestamp, message
        case serviceName = "service.name"
        case traceId = "trace_id"
        case spanId = "span_id"
        case logLevel = "log.level"
        case userId = "user.id"
        case sessionId = "session.id"
    }
}

// A simple protocol for log handlers
protocol LogHandler {
    func log(_ event: LogEvent)
}

// Manages the telemetry lifecycle, batching, and dispatching
class TelemetryManager {
    static let shared = TelemetryManager()

    private var buffer: [LogEvent] = []
    private let bufferQueue = DispatchQueue(label: "com.company.telemetryManager.bufferQueue")
    private let flushInterval: TimeInterval = 15.0 // Flush every 15 seconds
    private let maxBufferSize: Int = 100 // Or when buffer reaches 100 events
    private var timer: Timer?

    private var currentTraceId: String = UUID().uuidString
    private var currentSessionId: String = UUID().uuidString

    private lazy var httpLogHandler = HTTPLogHandler(endpoint: URL(string: "http://fluentd.internal.corp:9880/telemetry.ios")!)

    private init() {
        start()
    }

    func start() {
        // Reset trace and session on start
        newTrace()
        
        // In a real app, you might tie this to application lifecycle events
        DispatchQueue.main.async {
            self.timer = Timer.scheduledTimer(withTimeInterval: self.flushInterval, repeats: true) { [weak self] _ in
                self?.flush()
            }
        }
    }
    
    // Call this at the start of a new user journey (e.g., view appearance)
    func newTrace() {
        self.currentTraceId = UUID().uuidString
    }

    func logInfo(message: String, userId: String? = nil) {
        let event = LogEvent(
            timestamp: ISO8601DateFormatter().string(from: Date()),
            traceId: self.currentTraceId,
            logLevel: "INFO",
            message: message,
            userId: userId,
            sessionId: self.currentSessionId
        )
        enqueue(event)
    }
    
    func logError(message: String, error: Error, userId: String? = nil) {
        let fullMessage = "\(message) - Error: \(error.localizedDescription)"
        let event = LogEvent(
            timestamp: ISO8601DateFormatter().string(from: Date()),
            traceId: self.currentTraceId,
            logLevel: "ERROR",
            message: fullMessage,
            userId: userId,
            sessionId: self.currentSessionId
        )
        enqueue(event)
    }

    private func enqueue(_ event: LogEvent) {
        bufferQueue.async {
            self.buffer.append(event)
            if self.buffer.count >= self.maxBufferSize {
                self.flush()
            }
        }
    }

    private func flush() {
        bufferQueue.async {
            guard !self.buffer.isEmpty else { return }
            let eventsToFlush = self.buffer
            self.buffer.removeAll()
            
            // In a production app, add retry logic with exponential backoff
            self.httpLogHandler.log(eventsToFlush)
        }
    }
    
    // Critical function for propagating the trace context to backend services
    func getTraceHeaders() -> [String: String] {
        return [
            "X-Trace-Id": self.currentTraceId,
            "X-Span-Id": UUID().uuidString.lowercased().replacingOccurrences(of: "-", with: "").prefix(16).description
        ]
    }
}


// Handles sending batched logs over HTTP
class HTTPLogHandler {
    private let endpoint: URL
    private let urlSession: URLSession

    init(endpoint: URL) {
        self.endpoint = endpoint
        let config = URLSessionConfiguration.default
        config.timeoutIntervalForRequest = 10
        config.timeoutIntervalForResource = 30
        self.urlSession = URLSession(configuration: config)
    }

    func log(_ events: [LogEvent]) {
        guard !events.isEmpty else { return }
        
        var request = URLRequest(url: endpoint)
        request.httpMethod = "POST"
        request.setValue("application/json", forHTTPHeaderField: "Content-Type")

        do {
            let encoder = JSONEncoder()
            request.httpBody = try encoder.encode(events)
        } catch {
            // In a real scenario, this should log locally to a fallback mechanism
            print("Failed to encode log events: \(error)")
            return
        }
        
        let task = urlSession.dataTask(with: request) { data, response, error in
            if let error = error {
                // Production code needs a robust retry queue here.
                // For now, we just print the failure.
                print("Failed to send logs: \(error.localizedDescription)")
                return
            }
            
            guard let httpResponse = response as? HTTPURLResponse, (200...299).contains(httpResponse.statusCode) else {
                print("Log submission failed with status code: \((response as? HTTPURLResponse)?.statusCode ?? -1)")
                return
            }
        }
        task.resume()
    }
}

Any API call made from the app now uses TelemetryManager.shared.getTraceHeaders() to inject the trace context into the HTTP headers. This is the first and most vital link in the chain.

The Fluentd Aggregator: The Central Nervous System

Our central Fluentd instance acts as the brain. It receives data from various sources, applies transformations, and routes to the final destination. The configuration is complex but modular.

Here’s a breakdown of the central fluent.conf:

# fluent.conf on the central aggregator node

# ===============================================================
# SOURCES
# ===============================================================

# 1. Receives structured JSON logs from the SwiftUI client
<source>
  @type http
  port 9880
  bind 0.0.0.0
  # in_http derives the tag from the request path; the client POSTs to
  # /telemetry.ios, so these events arrive tagged "telemetry.ios".

  <parse>
    @type json
  </parse>
</source>

# 2. Receives structured logs from modern backend services (e.g., RabbitMQ producers/consumers)
# These services are expected to log structured JSON to stdout; their local agents
# forward the records already tagged "canonical.logs" so they join the shared pipeline below.
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# 3. Receives UNSTRUCTURED text logs from legacy ActiveMQ services
# These are forwarded from a local Fluentd agent on the legacy machine.
<source>
  @type forward
  port 24225
  bind 0.0.0.0
  tag legacy.activemq.raw
</source>

# ===============================================================
# PROCESSING & ENRICHMENT PIPELINE
# ===============================================================

# Match raw iOS logs (tagged "telemetry.ios" by the HTTP input path) and
# re-tag them for canonical processing.
<match telemetry.ios>
  @type rewrite_tag_filter
  <rule>
    key message
    pattern /.+/
    tag canonical.logs
  </rule>
</match>

# The main processing pipeline for all structured logs
<filter canonical.logs>
  @type record_transformer
  enable_ruby true
  <record>
    # Ensure timestamp is in the correct format
    timestamp ${Time.parse(record["timestamp"]).iso8601(3)}
    # Add cluster metadata
    k8s.cluster "production-us-east-1"
  </record>
</filter>

# The "magic" part: handling the legacy ActiveMQ logs.
# This is where we parse unstructured text and attempt to correlate it.
<filter legacy.activemq.raw>
  @type parser
  key_name message
  reserve_data true
  <parse>
    @type grok
    grok_pattern %{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread_name}\] %{LOGLEVEL:log.level} %{DATA:class_name} - %{GREEDYDATA:message_body}
  </parse>
</filter>

# After parsing, we have structured fields but still lack the trace_id.
# The API gateway that calls this legacy service is responsible for logging a "correlation map".
# e.g., {"correlation_id": "req-9876", "trace_id": "d8a7f0c1..."}
# Here, we assume the legacy log contains "correlation_id=req-9876" inside its message_body.
# The correlation_id is extracted here with a regex; the Redis lookup itself
# happens in the Lua filter below.
<filter legacy.activemq.raw>
  @type record_transformer
  enable_ruby true
  auto_typecast true
  <record>
    correlation_id ${record["message_body"].to_s[/correlation_id=([a-zA-Z0-9\-]+)/, 1]}
  </record>
</filter>

# This filter uses an external cache (Redis) to find the trace_id from the correlation_id.
# This requires a custom Fluentd plugin or a Lua script. A simplified Lua example:
<filter legacy.activemq.raw.**>
  @type lua
  script_path /etc/fluent/lua/enrich_with_trace_id.lua
  # The Lua script connects to Redis, fetches `trace_id` using `correlation_id`,
  # and adds it to the record. If not found, it flags the log.
</filter>

# After enrichment, re-tag the legacy logs to join the main pipeline.
# Only logs that were successfully enriched with a trace_id pass this rule.
<match legacy.activemq.raw>
  @type rewrite_tag_filter
  <rule>
    key trace_id
    pattern /.+/
    tag canonical.logs
  </rule>
</match>

# ===============================================================
# OUTPUT
# ===============================================================

<match canonical.logs>
  @type elasticsearch
  host elasticsearch.internal.corp
  port 9200
  logstash_format true
  logstash_prefix "app-telemetry"
  logstash_dateformat "%Y.%m.%d"
  
  <buffer>
    @type file
    path /var/log/fluent/buffer/es
    flush_interval 10s
    retry_max_interval 300
    retry_forever true
  </buffer>
</match>

The enrich_with_trace_id.lua script is the most delicate part of this architecture. It’s the bridge to the past.

-- /etc/fluent/lua/enrich_with_trace_id.lua
local redis = require('redis')

-- In a real environment, use connection pooling and better error handling.
local client = redis.connect('redis.internal.corp', 6379)

function filter(tag, es, record)
  local correlation_id = record['correlation_id']

  if correlation_id == nil then
    record['enrichment_status'] = 'failed_no_correlation_id'
    return 2, es, record
  end

  -- The API gateway must have written this key to Redis with a short TTL
  -- HGET trace_context:req-9876 trace_id
  local trace_id = client:hget('trace_context:' .. correlation_id, 'trace_id')

  if trace_id then
    record['trace_id'] = trace_id
    record['service.name'] = "legacy-payment-service" -- Hardcode service name
    record['enrichment_status'] = 'success'
  else
    record['enrichment_status'] = 'failed_lookup_miss'
  end
  
  -- The original correlation_id can be removed if not needed downstream
  record['correlation_id'] = nil

  return 2, es, record
end

Bridging RabbitMQ and ActiveMQ

The RabbitMQ services were simpler. Because they were more modern, we could enforce a standard:

  1. Read X-Trace-Id from incoming HTTP requests.
  2. Log all messages as structured JSON to stdout.
  3. When publishing a message to RabbitMQ, add the trace_id to the message’s headers (a producer sketch follows this list).
  4. The consumer service must read the trace_id from the message headers and continue the chain.
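
On the producer side, step 3 amounts to setting message properties when publishing. Here is a minimal sketch with pika; the connection parameters, exchange, and queue name are illustrative rather than our actual topology.

import json
import pika

# Placeholder connection details for illustration
connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq.internal.corp"))
channel = connection.channel()

def publish_event(payload: dict, trace_id: str) -> None:
    # Carry the trace_id in the AMQP message headers so the consumer
    # can pick it up and continue the chain.
    properties = pika.BasicProperties(
        content_type="application/json",
        headers={"x-trace-id": trace_id},
    )
    channel.basic_publish(
        exchange="",
        routing_key="my_queue",
        body=json.dumps(payload).encode("utf-8"),
        properties=properties,
    )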

A Python consumer example using pika:

import pika
import json
import logging

# Configure structured logging
# ... (use a library like python-json-logger)

def callback(ch, method, properties, body):
    # headers may be None if a publisher didn't set any
    trace_id = (properties.headers or {}).get('x-trace-id')

    # Associate the trace_id with this thread of execution for all subsequent logs.
    # This can be done with context vars or a similar mechanism (see the sketch
    # after this example).
    
    logging.info("Processing message", extra={'trace_id': trace_id, 'payload_size': len(body)})
    
    try:
        # Business logic here
        process_payload(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        logging.error("Failed to process message", extra={'trace_id': trace_id, 'error': str(e)})
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# ... connection logic ...
channel.basic_consume(queue='my_queue', on_message_callback=callback)
channel.start_consuming()
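
The “context vars or a similar mechanism” note in the callback above can be made concrete with the standard library: a contextvars variable plus a logging.Filter attaches the trace_id to every log record without passing it to each call. A minimal sketch:

import contextvars
import logging

# Holds the trace_id for the message currently being processed
current_trace_id = contextvars.ContextVar("current_trace_id", default=None)

class TraceIdFilter(logging.Filter):
    """Copies the current trace_id onto every log record."""
    def filter(self, record):
        record.trace_id = current_trace_id.get()
        return True

logging.getLogger().addFilter(TraceIdFilter())

# Inside callback(), set the context once and log normally afterwards:
#   current_trace_id.set((properties.headers or {}).get('x-trace-id'))
#   logging.info("Processing message")  # trace_id is attached by the filter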

The ActiveMQ service was the true monster. We couldn’t modify its code. The solution was the correlation trick described in the Fluentd configuration. The API gateway fronting this service was modified to do three things upon receiving a request from the SwiftUI app (a sketch of the gateway-side bookkeeping follows the list):

  1. Generate a unique correlation_id (e.g., a request UUID).
  2. Call the legacy ActiveMQ service, passing this correlation_id in a way it understood (e.g., as part of the message body).
  3. Immediately write a record to Redis: HSET trace_context:correlation_id_123 trace_id abc-def-ghi. This record has a short TTL (e.g., 5 minutes).
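
A minimal sketch of that gateway-side bookkeeping with redis-py; the key layout mirrors what the Lua enrichment script expects, while the Redis host and the exact correlation_id format are assumptions.

import uuid
import redis

# Placeholder connection details for illustration
r = redis.Redis(host="redis.internal.corp", port=6379)

def register_correlation(trace_id: str) -> str:
    """Create a correlation_id and map it to the trace_id with a short TTL."""
    correlation_id = f"req-{uuid.uuid4().hex[:12]}"  # format is an assumption
    key = f"trace_context:{correlation_id}"
    # Equivalent to: HSET trace_context:<correlation_id> trace_id <trace_id>
    r.hset(key, "trace_id", trace_id)
    r.expire(key, 300)  # 5-minute TTL; enrichment must happen within this window
    return correlation_id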

When the unstructured log from the ActiveMQ service eventually arrives at our central Fluentd, the Lua script performs the Redis lookup, injecting the trace_id and completing the chain.

The full data flow can be visualized:

graph TD
    subgraph "iOS Device"
        A[SwiftUI App] -- 1. Generate trace_id --> A
        A -- 2. Batch & Send Logs (JSON over HTTP) --> B[Fluentd Endpoint: 9880]
    end

    subgraph "API Gateway"
        C[Gateway Service]
        A -- 3. API Call w/ X-Trace-Id Header --> C
    end
    
    subgraph "Modern Services"
        D[RabbitMQ Producer]
        E[RabbitMQ Consumer]
        F[RabbitMQ]
        C -- 4a. Call with X-Trace-Id --> D
        D -- 5a. Publish Msg w/ trace_id in header --> F
        F -- 6a. Deliver Msg --> E
        D -- Logs to stdout --> G[Fluentd Agent]
        E -- Logs to stdout --> G
    end

    subgraph "Legacy System"
        H[ActiveMQ Service]
        I[Redis]
        J[ActiveMQ]
        C -- 4b. Generate correlation_id --> C
        C -- 5b. Write {corr_id: trace_id} --> I
        C -- 6b. Call with correlation_id --> H
        H -- 7b. Logs to file (unstructured) --> K[Fluentd Agent]
    end
    
    subgraph "Central Observability Pipeline"
        B -- Forward --> L[Central Fluentd Aggregator]
        G -- Forward --> L
        K -- Forward --> L
        L -- 8. Parse, Enrich, Correlate via Redis --> L
        L -- 9. Send Unified JSON --> M[Elasticsearch]
    end

The final result was transformative. We could now take a trace_id from a failed SwiftUI session and execute a single query in Kibana. The results would show, in perfect chronological order: the user tapping the button in the app, the API gateway receiving the request, the RabbitMQ producer processing it, the consumer picking it up, and—critically—the unstructured, previously-opaque log line from the ActiveMQ service, now fully enriched with the same trace_id and parsed into a structured format. We had achieved our goal of end-to-end visibility.

This architecture, however, is a technical bridge, not a final destination. Its primary weakness is the correlation mechanism for the legacy system. It depends on Redis being available and performant, and the logic in the API gateway must be perfectly in sync with the Fluentd enrichment script. A change in the legacy log format would require an immediate and precise update to the Grok patterns in our Fluentd config, making the pipeline brittle. The significant investment in Fluentd configuration and custom scripting bought us invaluable time and operational stability, allowing the business to move forward without being blocked by a massive refactoring project. The long-term goal remains to modernize the legacy services, at which point this complex enrichment logic can be decommissioned in favor of native instrumentation. The pipeline also currently only handles logs; a future iteration would be to adapt it to receive and correlate metrics and traces via the OpenTelemetry protocol, with Fluentd acting as an OTLP receiver.

