The project started with a familiar mandate: build a new, modern iOS application using SwiftUI. The backend, however, was anything but modern. It was a sprawling collection of services stitched together over a decade, a living museum of architectural fads. A significant portion communicated via RabbitMQ, while a critical, untouchable legacy system relied on ActiveMQ. The immediate pain point manifested within days of the first internal release. A user action in the SwiftUI app would fail, and we were blind. The support ticket would read “The checkout button is broken,” and we’d have to manually inspect logs on a dozen different machines, trying to piece together a story from timestamps and prayers. There was no concept of a holistic request trace. Our new, elegant frontend was tethered to a backend black box.
The initial proposal to standardize on a single message bus and instrument everything with a commercial APM was rejected. The migration cost and risk associated with touching the ActiveMQ system were deemed too high for the current roadmap. We were tasked with achieving end-to-end visibility without a significant backend refactor. The challenge was clear: bridge a state-of-the-art SwiftUI client with a heterogeneous, partially archaic backend, and create a single, correlated view of every user interaction. Our tool of choice for this plumbing nightmare had to be flexible, powerful, and vendor-agnostic. We chose Fluentd.
Our core concept was to treat Fluentd not just as a log forwarder, but as a central telemetry transformation and enrichment engine. All components—the SwiftUI app, the RabbitMQ services, the API gateway, and even the hostile ActiveMQ services—would send their raw logs to a distributed set of Fluentd agents. These agents would parse, clean, and normalize the data before forwarding it to a central Fluentd aggregator. The aggregator’s job was to perform the most critical step: enrich logs with a consistent trace_id
to stitch the disparate streams into a coherent narrative.
First, we defined the canonical data model. Every single log event, regardless of its origin, had to conform to this JSON structure before being indexed in our Elasticsearch cluster. A common mistake is to let each service define its own log schema; this makes correlation nearly impossible.
{
  "timestamp": "2023-10-27T11:30:00.123Z",
  "service.name": "ios-checkout-swiftui",
  "trace_id": "d8a7f0c1-3e4b-4b9a-8c7d-9e6f5a2b1c0d",
  "span_id": "a1b2c3d4e5f60718",
  "log.level": "INFO",
  "message": "User initiated checkout process",
  "user.id": "usr-12345",
  "session.id": "sess-abcde",
  "ios.device.model": "iPhone15,2",
  "ios.os.version": "17.1"
}
The trace_id
is the lynchpin. It is generated once at the beginning of a user interaction in the SwiftUI app and must be propagated through every subsequent network call and message bus hop.
Instrumenting the SwiftUI Client
The first step was to control the source. We built a lightweight, non-blocking logging client directly into the SwiftUI application. Apple's standard os.log facility was insufficient on its own because we needed to batch logs and ship them over HTTP with custom headers in a structured format.
Here’s the core of the TelemetryManager
in Swift. It generates a trace_id
for a user session, batches log events, and periodically flushes them to a dedicated Fluentd HTTP endpoint.
import Foundation
import Combine

// Represents a single structured log event
struct LogEvent: Encodable, Identifiable {
    let id = UUID()
    let timestamp: String
    let serviceName: String = "ios-checkout-swiftui"
    let traceId: String
    let spanId: String = UUID().uuidString.lowercased().replacingOccurrences(of: "-", with: "").prefix(16).description
    let logLevel: String
    let message: String
    // Add other relevant metadata
    let userId: String?
    let sessionId: String

    // Map Swift property names to the canonical field names indexed in Elasticsearch
    enum CodingKeys: String, CodingKey {
        case timestamp
        case serviceName = "service.name"
        case traceId = "trace_id"
        case spanId = "span_id"
        case logLevel = "log.level"
        case message
        case userId = "user.id"
        case sessionId = "session.id"
    }
}

// A simple protocol for log handlers; handlers receive batches of events
protocol LogHandler {
    func log(_ events: [LogEvent])
}

// Manages the telemetry lifecycle, batching, and dispatching
class TelemetryManager {
    static let shared = TelemetryManager()

    private var buffer: [LogEvent] = []
    private let bufferQueue = DispatchQueue(label: "com.company.telemetryManager.bufferQueue")
    private let flushInterval: TimeInterval = 15.0 // Flush every 15 seconds
    private let maxBufferSize: Int = 100 // Or when buffer reaches 100 events
    private var timer: Timer?
    private var currentTraceId: String = UUID().uuidString
    private var currentSessionId: String = UUID().uuidString

    // The Fluentd http input derives the tag from the URL path, so we post to /ios.app.raw
    private lazy var httpLogHandler = HTTPLogHandler(endpoint: URL(string: "http://fluentd.internal.corp:9880/ios.app.raw")!)

    // Shared formatter so timestamps carry millisecond precision, matching the canonical model
    private let timestampFormatter: ISO8601DateFormatter = {
        let formatter = ISO8601DateFormatter()
        formatter.formatOptions = [.withInternetDateTime, .withFractionalSeconds]
        return formatter
    }()

    private init() {
        start()
    }

    func start() {
        // Reset trace and session on start
        newTrace()
        // In a real app, you might tie this to application lifecycle events
        DispatchQueue.main.async {
            self.timer = Timer.scheduledTimer(withTimeInterval: self.flushInterval, repeats: true) { [weak self] _ in
                self?.flush()
            }
        }
    }

    // Call this at the start of a new user journey (e.g., view appearance)
    func newTrace() {
        self.currentTraceId = UUID().uuidString
    }

    func logInfo(message: String, userId: String? = nil) {
        let event = LogEvent(
            timestamp: timestampFormatter.string(from: Date()),
            traceId: self.currentTraceId,
            logLevel: "INFO",
            message: message,
            userId: userId,
            sessionId: self.currentSessionId
        )
        enqueue(event)
    }

    func logError(message: String, error: Error, userId: String? = nil) {
        let fullMessage = "\(message) - Error: \(error.localizedDescription)"
        let event = LogEvent(
            timestamp: timestampFormatter.string(from: Date()),
            traceId: self.currentTraceId,
            logLevel: "ERROR",
            message: fullMessage,
            userId: userId,
            sessionId: self.currentSessionId
        )
        enqueue(event)
    }

    private func enqueue(_ event: LogEvent) {
        bufferQueue.async {
            self.buffer.append(event)
            if self.buffer.count >= self.maxBufferSize {
                self.flush()
            }
        }
    }

    private func flush() {
        bufferQueue.async {
            guard !self.buffer.isEmpty else { return }
            let eventsToFlush = self.buffer
            self.buffer.removeAll()
            // In a production app, add retry logic with exponential backoff
            self.httpLogHandler.log(eventsToFlush)
        }
    }

    // Critical function for propagating the trace context to backend services
    func getTraceHeaders() -> [String: String] {
        return [
            "X-Trace-Id": self.currentTraceId,
            "X-Span-Id": UUID().uuidString.lowercased().replacingOccurrences(of: "-", with: "").prefix(16).description
        ]
    }
}

// Handles sending batched logs over HTTP
class HTTPLogHandler: LogHandler {
    private let endpoint: URL
    private let urlSession: URLSession

    init(endpoint: URL) {
        self.endpoint = endpoint
        let config = URLSessionConfiguration.default
        config.timeoutIntervalForRequest = 10
        config.timeoutIntervalForResource = 30
        self.urlSession = URLSession(configuration: config)
    }

    func log(_ events: [LogEvent]) {
        guard !events.isEmpty else { return }
        var request = URLRequest(url: endpoint)
        request.httpMethod = "POST"
        request.setValue("application/json", forHTTPHeaderField: "Content-Type")
        do {
            let encoder = JSONEncoder()
            request.httpBody = try encoder.encode(events)
        } catch {
            // In a real scenario, this should log locally to a fallback mechanism
            print("Failed to encode log events: \(error)")
            return
        }
        let task = urlSession.dataTask(with: request) { data, response, error in
            if let error = error {
                // Production code needs a robust retry queue here.
                // For now, we just print the failure.
                print("Failed to send logs: \(error.localizedDescription)")
                return
            }
            guard let httpResponse = response as? HTTPURLResponse, (200...299).contains(httpResponse.statusCode) else {
                print("Log submission failed with status code: \((response as? HTTPURLResponse)?.statusCode ?? -1)")
                return
            }
        }
        task.resume()
    }
}
Any API call made from the app now uses TelemetryManager.shared.getTraceHeaders()
to inject the trace context into the HTTP headers. This is the first and most vital link in the chain.
The Fluentd Aggregator: The Central Nervous System
Our central Fluentd instance acts as the brain. It receives data from various sources, applies transformations, and routes to the final destination. The configuration is complex but modular.
Here’s a breakdown of the central fluent.conf:
# fluent.conf on the central aggregator node

# ===============================================================
# SOURCES
# ===============================================================

# 1. Receives structured JSON logs from the SwiftUI client.
#    in_http derives the tag from the request path, so the app POSTs to
#    /ios.app.raw and events arrive tagged ios.app.raw.
<source>
  @type http
  port 9880
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

# 2. Receives structured logs from modern backend services (e.g., RabbitMQ producers/consumers)
#    These services are expected to log to stdout in JSON format already.
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# 3. Receives UNSTRUCTURED text logs from legacy ActiveMQ services.
#    These are forwarded from a local Fluentd agent on the legacy machine,
#    which tags them legacy.activemq.raw (forwarded events keep the sender's tag).
<source>
  @type forward
  port 24225
  bind 0.0.0.0
</source>

# ===============================================================
# PROCESSING & ENRICHMENT PIPELINE
# ===============================================================

# Match raw iOS logs, validate, and tag them for canonical processing.
<match ios.app.raw>
  @type rewrite_tag_filter
  <rule>
    key message
    pattern /.+/
    tag canonical.logs
  </rule>
</match>

# The main processing pipeline for all structured logs
<filter canonical.logs>
  @type record_transformer
  enable_ruby true
  <record>
    # Ensure timestamp is in the correct format
    timestamp ${Time.parse(record["timestamp"]).iso8601(3)}
    # Add cluster metadata
    k8s.cluster "production-us-east-1"
  </record>
</filter>

# The "magic" part: handling the legacy ActiveMQ logs.
# This is where we parse unstructured text and attempt to correlate it.
# Requires the fluent-plugin-grok-parser gem. Ruby named captures cannot
# contain dots, so the level is captured as log_level.
<filter legacy.activemq.raw>
  @type parser
  key_name message
  reserve_data true
  <parse>
    @type grok
    grok_pattern %{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread_name}\] %{LOGLEVEL:log_level} %{GREEDYDATA:class_name} - %{GREEDYDATA:message_body}
  </parse>
</filter>

# After parsing, we have structured fields but still lack the trace_id.
# The API gateway that calls this legacy service is responsible for logging a "correlation map".
# e.g., {"correlation_id": "req-9876", "trace_id": "d8a7f0c1..."}
# Here, we assume the legacy log contains "correlation_id=req-9876" inside its message_body.
# We extract the correlation_id here and use a Lua filter (below) for the lookup logic.
<filter legacy.activemq.raw>
  @type record_transformer
  enable_ruby true
  auto_typecast true
  <record>
    correlation_id ${record["message_body"].to_s[/correlation_id=([a-zA-Z0-9\-]+)/, 1]}
  </record>
</filter>

# This filter uses an external cache (Redis) to find the trace_id from the correlation_id.
# This requires a custom Lua-scriptable filter plugin (not bundled with stock Fluentd).
# A simplified Lua example follows below.
<filter legacy.activemq.raw>
  @type lua
  script_path /etc/fluent/lua/enrich_with_trace_id.lua
  # The Lua script connects to Redis, fetches `trace_id` using `correlation_id`,
  # and adds it to the record. If not found, it flags the log.
</filter>

# After enrichment, re-tag the legacy logs to join the main pipeline.
<match legacy.activemq.raw>
  @type rewrite_tag_filter
  <rule>
    # Only logs that were successfully enriched carry a trace_id
    key trace_id
    pattern /.+/
    tag canonical.logs
  </rule>
</match>

# ===============================================================
# OUTPUT
# ===============================================================
<match canonical.logs>
  @type elasticsearch
  host elasticsearch.internal.corp
  port 9200
  logstash_format true
  logstash_prefix "app-telemetry"
  logstash_dateformat "%Y.%m.%d"
  <buffer>
    @type file
    path /var/log/fluent/buffer/es
    flush_interval 10s
    retry_max_interval 300
    retry_forever true
  </buffer>
</match>
The enrich_with_trace_id.lua
script is the most delicate part of this architecture. It’s the bridge to the past.
-- /etc/fluent/lua/enrich_with_trace_id.lua
local redis = require('redis')

-- In a real environment, use connection pooling and better error handling.
local client = redis.connect('redis.internal.corp', 6379)

-- Filter contract used by our Lua-scriptable filter plugin:
-- return (code, timestamp, record); code 2 means "record modified, keep original timestamp".
function filter(tag, timestamp, record)
    local correlation_id = record['correlation_id']
    if correlation_id == nil then
        record['enrichment_status'] = 'failed_no_correlation_id'
        return 2, timestamp, record
    end

    -- The API gateway must have written this key to Redis with a short TTL
    -- HGET trace_context:req-9876 trace_id
    local trace_id = client:hget('trace_context:' .. correlation_id, 'trace_id')
    if trace_id then
        record['trace_id'] = trace_id
        record['service.name'] = 'legacy-payment-service' -- Hardcode service name
        record['enrichment_status'] = 'success'
    else
        record['enrichment_status'] = 'failed_lookup_miss'
    end

    -- The original correlation_id can be removed if not needed downstream
    record['correlation_id'] = nil
    return 2, timestamp, record
end
Bridging RabbitMQ and ActiveMQ
The RabbitMQ services were simpler. Being more modern, we could enforce a standard:
- Read X-Trace-Id from incoming HTTP requests.
- Log all messages as structured JSON to stdout.
- When publishing a message to RabbitMQ, add the trace_id to the message’s headers.
- The consumer service must read the trace_id from the message headers and continue the chain.
A Python consumer example using pika
:
import pika
import json
import logging

# Configure structured logging
# ... (use a library like python-json-logger)

def callback(ch, method, properties, body):
    # Message headers may be missing entirely on malformed messages
    trace_id = (properties.headers or {}).get('x-trace-id')
    # Associate the trace_id with this thread of execution for all subsequent logs.
    # This can be done using context vars or a similar mechanism.
    logging.info("Processing message", extra={'trace_id': trace_id, 'payload_size': len(body)})
    try:
        # Business logic here
        process_payload(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        logging.error("Failed to process message", extra={'trace_id': trace_id, 'error': str(e)})
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# ... connection logic ...
channel.basic_consume(queue='my_queue', on_message_callback=callback)
channel.start_consuming()
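The producer side of the same convention is just as small. Here is a minimal sketch, also using pika; the exchange and routing key names are illustrative placeholders rather than our real topology:
import json
import uuid

import pika

# Sketch: propagate the incoming X-Trace-Id into the AMQP message headers
# so downstream consumers can continue the chain. Exchange and routing key
# names are placeholders for illustration only.
def publish_with_trace(channel, payload: dict, trace_id: str) -> None:
    properties = pika.BasicProperties(
        content_type="application/json",
        delivery_mode=2,  # persistent
        headers={"x-trace-id": trace_id or str(uuid.uuid4())},
    )
    channel.basic_publish(
        exchange="checkout_exchange",
        routing_key="order.created",
        body=json.dumps(payload).encode("utf-8"),
        properties=properties,
    )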
The ActiveMQ service was the true monster. We couldn’t modify its code. The solution was the correlation trick described in the Fluentd configuration. The API gateway fronting this service was modified to do three things upon receiving a request from the SwiftUI app:
- Generate a unique correlation_id (e.g., a request UUID).
- Call the legacy ActiveMQ service, passing this correlation_id in a way it understood (e.g., as part of the message body).
- Immediately write a record to Redis: HSET trace_context:correlation_id_123 trace_id abc-def-ghi. This record has a short TTL (e.g., 5 minutes).
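On the gateway side, the Redis write is only a few lines. A minimal sketch, assuming the redis-py client and a hypothetical register_correlation helper; the key layout must mirror what the Lua enrichment script reads (trace_context:<correlation_id>):
import uuid

import redis

r = redis.Redis(host="redis.internal.corp", port=6379)

def register_correlation(trace_id: str) -> str:
    # Hypothetical helper: store the correlation map that the Fluentd Lua script will look up.
    correlation_id = f"req-{uuid.uuid4()}"
    key = f"trace_context:{correlation_id}"
    r.hset(key, mapping={"trace_id": trace_id})
    r.expire(key, 300)  # short TTL; the legacy log should arrive well within 5 minutes
    return correlation_id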
When the unstructured log from the ActiveMQ service eventually arrives at our central Fluentd, the Lua script performs the Redis lookup, injecting the trace_id
and completing the chain.
The full data flow can be visualized:
graph TD
  subgraph "iOS Device"
    A[SwiftUI App] -- 1. Generate trace_id --> A
    A -- 2. Batch & Send Logs (JSON over HTTP) --> B[Fluentd Endpoint: 9880]
  end
  subgraph "API Gateway"
    C[Gateway Service]
    A -- 3. API Call w/ X-Trace-Id Header --> C
  end
  subgraph "Modern Services"
    D[RabbitMQ Producer]
    E[RabbitMQ Consumer]
    F[RabbitMQ]
    C -- 4a. Call with X-Trace-Id --> D
    D -- 5a. Publish Msg w/ trace_id in header --> F
    F -- 6a. Deliver Msg --> E
    D -- Logs to stdout --> G[Fluentd Agent]
    E -- Logs to stdout --> G
  end
  subgraph "Legacy System"
    H[ActiveMQ Service]
    I[Redis]
    J[ActiveMQ]
    C -- 4b. Generate correlation_id --> C
    C -- 5b. Write {corr_id: trace_id} --> I
    C -- 6b. Call with correlation_id --> H
    H -- 7b. Logs to file (unstructured) --> K[Fluentd Agent]
  end
  subgraph "Central Observability Pipeline"
    B -- Forward --> L[Central Fluentd Aggregator]
    G -- Forward --> L
    K -- Forward --> L
    L -- 8. Parse, Enrich, Correlate via Redis --> L
    L -- 9. Send Unified JSON --> M[Elasticsearch]
  end
The final result was transformative. We could now take a trace_id
from a failed SwiftUI session and execute a single query in Kibana. The results would show, in perfect chronological order: the user tapping the button in the app, the API gateway receiving the request, the RabbitMQ producer processing it, the consumer picking it up, and—critically—the unstructured, previously-opaque log line from the ActiveMQ service, now fully enriched with the same trace_id
and parsed into a structured format. We had achieved our goal of end-to-end visibility.
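The same lookup can be scripted against the app-telemetry-* indices. A rough sketch with the elasticsearch Python client; the trace_id.keyword field and the @timestamp sort assume Elasticsearch's default dynamic mapping plus the logstash_format output shown earlier:
from elasticsearch import Elasticsearch

es = Elasticsearch("http://elasticsearch.internal.corp:9200")

# Every event for one user interaction, in the order the pipeline received it.
response = es.search(
    index="app-telemetry-*",
    body={
        "query": {"term": {"trace_id.keyword": "d8a7f0c1-3e4b-4b9a-8c7d-9e6f5a2b1c0d"}},
        "sort": [{"@timestamp": {"order": "asc"}}],
        "size": 500,
    },
)
for hit in response["hits"]["hits"]:
    doc = hit["_source"]
    print(doc["timestamp"], doc["service.name"], doc["message"])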
This architecture, however, is a technical bridge, not a final destination. Its primary weakness is the correlation mechanism for the legacy system. It depends on Redis being available and performant, and the logic in the API gateway must be perfectly in sync with the Fluentd enrichment script. A change in the legacy log format would require an immediate and precise update to the Grok patterns in our Fluentd config, making the pipeline brittle. The significant investment in Fluentd configuration and custom scripting bought us invaluable time and operational stability, allowing the business to move forward without being blocked by a massive refactoring project. The long-term goal remains to modernize the legacy services, at which point this complex enrichment logic can be decommissioned in favor of native instrumentation. The pipeline also currently only handles logs; a future iteration would be to adapt it to receive and correlate metrics and traces via the OpenTelemetry protocol, with Fluentd acting as an OTLP receiver.