Utilizing Redis Streams for a Resilient MLOps Data Ingestion Layer from Swift Applications


The core pain point was latency. Our on-device personalization models, running in a native Swift application, were updated via a batch pipeline that ran every 24 hours. A user’s interactions today would only influence their experience tomorrow. In the world of real-time recommendations, this is an eternity. Our initial attempts to solve this with direct, per-interaction REST API calls from the client to our MLOps backend created a new set of problems: excessive network chatter, poor performance on spotty connections, and a tightly coupled architecture that was brittle and difficult to scale. We needed an asynchronous, persistent, and ordered event ingestion pipeline that could bridge our Swift clients directly with the feature engineering services on the backend.

Our first thought was a full-blown message queue like Kafka. The engineering overhead was a non-starter for our team size. We were already using Redis extensively for caching, and the operations team was comfortable with it. When Redis Streams became generally available, we saw a potential fit. It offered persistence, consumer groups for scalable processing, and message acknowledgements—the key primitives of a real message broker, but with the operational simplicity of Redis. The decision was made to build a proof-of-concept around it.

The architecture we settled on was straightforward in concept:

  1. The Swift client serializes user interaction events (e.g., productViewed, articleFavorited).
  2. These events are pushed directly into a Redis Stream using the XADD command.
  3. A Python-based consumer service, part of our MLOps infrastructure, reads from this stream in a consumer group, processes the events to update a real-time feature store, and acknowledges the messages.

sequenceDiagram
    participant SwiftClient as iOS App
    participant Redis
    participant FeatureProcessor as Feature Engineering Service (Python)
    participant FeatureStore as Real-time Feature Store

    SwiftClient->>+Redis: XADD user-interactions * event_type="view" ...
    Note right of SwiftClient: Event published to Stream
    Redis-->>-SwiftClient: Message ID (e.g., 1672531200000-0)

    loop Consumer Polling
        FeatureProcessor->>+Redis: XREADGROUP GROUP feature-engineering-service consumer-1 COUNT 100 STREAMS user-interactions >
        Redis-->>-FeatureProcessor: Batch of messages
    end

    Note over FeatureProcessor, FeatureStore: Process each message, update user vectors
    FeatureProcessor->>+FeatureStore: SET user:123:features "..."
    FeatureStore-->>-FeatureProcessor: OK

    FeatureProcessor->>+Redis: XACK user-interactions feature-engineering-service 1672531200000-0
    Redis-->>-FeatureProcessor: 1 (acknowledged)

The real challenges, as always, emerged during implementation, particularly around guaranteeing data delivery in the face of mobile network instability and potential backend failures.

Building a Resilient Swift Publisher

The first piece of the puzzle was the Swift client. Simply firing an XADD command and hoping for the best is not production-grade. Mobile clients are frequently offline or on unreliable networks. Dropping events during these periods was unacceptable.

Our solution was to build a small persistence layer directly within the Swift app. Events are first written to a local, on-disk queue. A separate background task is responsible for draining this queue and sending the events to Redis. This decouples event generation from network transmission.

We used the excellent RediStack library for Swift, which provides a non-blocking, SwiftNIO-based Redis client.

First, let’s define the event structure and the persistence protocol.

// InteractionEvent.swift

import Foundation

// A simple, serializable struct for our events.
// In a real project, this would likely be generated from a shared schema (e.g., Protobuf).
struct InteractionEvent: Codable, Identifiable {
    let id: UUID
    let eventType: String
    let userId: String
    let itemId: String
    let timestamp: Date
    let metadata: [String: String]?

    init(eventType: String, userId: String, itemId:String, metadata: [String: String]? = nil) {
        self.id = UUID()
        self.eventType = eventType
        self.userId = userId
        self.itemId = itemId
        self.timestamp = Date()
        self.metadata = metadata
    }

    // Convert the event to a flat dictionary suitable for Redis Streams.
    // Stream entries are field/value string pairs, so nested metadata is
    // JSON-encoded into a single field below.
    func toRedisFields() -> [String: String] {
        var fields: [String: String] = [
            // The UUID lets consumers recognize an event that was sent twice.
            "event_id": self.id.uuidString,
            "event_type": self.eventType,
            "user_id": self.userId,
            "item_id": self.itemId,
            "timestamp": String(self.timestamp.timeIntervalSince1970)
        ]
        if let metadata = self.metadata,
           let jsonData = try? JSONEncoder().encode(metadata),
           let jsonString = String(data: jsonData, encoding: .utf8) {
            fields["metadata_json"] = jsonString
        }
        return fields
    }
}

// A protocol for our persistent event store. This allows us to swap
// the implementation (e.g., from simple file storage to CoreData).
protocol EventStore {
    func save(event: InteractionEvent) throws
    func fetch(limit: Int) throws -> [InteractionEvent]
    func delete(events: [InteractionEvent]) throws
}

Next, the implementation of a simple file-based EventStore. For a real application, using Core Data or GRDB would be more robust, but this illustrates the principle.

// FileEventStore.swift

import Foundation

class FileEventStore: EventStore {
    private let directoryURL: URL
    private let fileManager = FileManager.default
    private let encoder = JSONEncoder()
    private let decoder = JSONDecoder()

    init() throws {
        guard let url = fileManager.urls(for: .applicationSupportDirectory, in: .userDomainMask).first else {
            throw EventStoreError.cannotGetDirectory
        }
        self.directoryURL = url.appendingPathComponent("event_queue")
        try createDirectoryIfNeeded()
    }

    private func createDirectoryIfNeeded() throws {
        if !fileManager.fileExists(atPath: directoryURL.path) {
            try fileManager.createDirectory(at: directoryURL, withIntermediateDirectories: true, attributes: nil)
        }
    }

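    func save(event: InteractionEvent) throws {
        let data = try encoder.encode(event)
        let fileURL = directoryURL.appendingPathComponent(event.id.uuidString)
        // Write atomically so a crash mid-write cannot leave a truncated file
        // that would later fail to decode in fetch(limit:).
        try data.write(to: fileURL, options: .atomic)
    }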

    func fetch(limit: Int) throws -> [InteractionEvent] {
        let fileURLs = try fileManager.contentsOfDirectory(at: directoryURL, includingPropertiesForKeys: [.creationDateKey])
        
        let sortedURLs = try fileURLs.sorted {
            let date1 = try $0.resourceValues(forKeys: [.creationDateKey]).creationDate ?? .distantPast
            let date2 = try $1.resourceValues(forKeys: [.creationDateKey]).creationDate ?? .distantPast
            return date1 < date2
        }

        var events: [InteractionEvent] = []
        for url in sortedURLs.prefix(limit) {
            let data = try Data(contentsOf: url)
            let event = try decoder.decode(InteractionEvent.self, from: data)
            events.append(event)
        }
        return events
    }

    func delete(events: [InteractionEvent]) throws {
        for event in events {
            let fileURL = directoryURL.appendingPathComponent(event.id.uuidString)
            if fileManager.fileExists(atPath: fileURL.path) {
                try fileManager.removeItem(at: fileURL)
            }
        }
    }
    
    enum EventStoreError: Error {
        case cannotGetDirectory
    }
}

Finally, the RedisEventPublisher which ties everything together. It uses the EventStore and manages the connection to Redis.

// RedisEventPublisher.swift

import Foundation
import RediStack
import NIO

class RedisEventPublisher {
    private let streamKey = "user-interactions"
    private let eventStore: EventStore
    private var redisClient: RedisClient?
    private let eventLoopGroup: EventLoopGroup
    private var isSyncing = false
    private let syncQueue = DispatchQueue(label: "com.mlops.eventpublisher.sync")

    init(eventStore: EventStore, eventLoopGroup: EventLoopGroup) {
        self.eventStore = eventStore
        self.eventLoopGroup = eventLoopGroup
    }

    func connect(host: String, port: Int) {
        do {
            let connectionConfig = try RedisConnection.Configuration(
                hostname: host,
                port: port,
                password: "your-redis-password" // Use proper secret management
            )
            // RedisConnection.make (RediStack 1.x) returns a future. Blocking on it
            // with wait() is only safe off the event loop, so never call
            // connect(host:port:) from an event loop thread.
            self.redisClient = try RedisConnection.make(
                configuration: connectionConfig,
                boundEventLoop: self.eventLoopGroup.next()
            ).wait()
        } catch {
            // In a real app, log this error extensively.
            print("Failed to connect to Redis: \(error)")
        }
    }

    // This is the main method called by the app to log an event.
    func track(event: InteractionEvent) {
        do {
            try eventStore.save(event: event)
            // Trigger a sync after saving a new event.
            triggerSync()
        } catch {
            print("Failed to save event to local store: \(error)")
        }
    }

    // Kicks off the process of sending stored events to Redis.
    func triggerSync() {
        syncQueue.async {
            guard !self.isSyncing else { return }
            guard let client = self.redisClient else {
                print("Cannot sync, Redis client is not connected.")
                return
            }
            
            self.isSyncing = true
            
            // We use a semaphore to wait for the async operation to complete
            // within our synchronous dispatch queue block. This simplifies the logic.
            let semaphore = DispatchSemaphore(value: 0)

            self.processQueue(client: client) {
                self.isSyncing = false
                semaphore.signal()
            }
            
            semaphore.wait()
        }
    }

    private func processQueue(client: RedisClient, completion: @escaping () -> Void) {
        do {
            let events = try eventStore.fetch(limit: 50) // Process in batches
            if events.isEmpty {
                completion()
                return
            }

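            // Using EventLoopFuture to chain async operations.
            // RediStack 1.x has no typed helper for stream commands, so each event
            // goes out as a raw command: XADD <stream> * field value ...
            let futures = events.map { event -> EventLoopFuture<RESPValue> in
                var args: [RESPValue] = [
                    RESPValue(from: self.streamKey),
                    RESPValue(from: "*") // Let Redis generate the ID
                ]
                for (field, value) in event.toRedisFields() {
                    args.append(RESPValue(from: field))
                    args.append(RESPValue(from: value))
                }
                return client.send(command: "XADD", with: args)
            }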

            // When all futures complete successfully, we delete the events from local store.
            EventLoopFuture.whenAllSucceed(futures, on: client.eventLoop).whenComplete { result in
                switch result {
                case .success:
                    do {
                        try self.eventStore.delete(events: events)
                        print("Successfully synced \(events.count) events.")
                        // If we succeeded, there might be more events, so we recurse.
                        self.processQueue(client: client, completion: completion)
                    } catch {
                        print("Failed to delete synced events: \(error)")
                        completion()
                    }
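                case .failure(let error):
                    // The most likely failure here is a network issue.
                    // We don't delete the events, they will be retried on the next `triggerSync`.
                    // Any XADD that succeeded before the failure is sent again on retry,
                    // so delivery is at-least-once and consumers must tolerate duplicates.
                    print("Failed to send batch to Redis: \(error). Will retry later.")
                    completion()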
                }
            }
        } catch {
            print("Failed to fetch events from store: \(error)")
            completion()
        }
    }
}

This setup gave us a robust publisher. If the Redis connection drops, events simply accumulate on the device. When connectivity is restored, a call to triggerSync (perhaps triggered by a network reachability monitor) sends the backlog.
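
Retrying a whole batch has one consequence worth spelling out: an event whose XADD succeeded just before the failure is sent a second time, so the backend can see duplicates. The following is a minimal sketch of consumer-side deduplication, not the project's actual code. It assumes the publisher also sends the event's UUID as a stream field (hypothetically named `event_id`) and that `r` is a redis-py client; the key prefix and the TTL are illustrative values.

```python
DEDUPE_PREFIX = "seen-event:"      # hypothetical key namespace
DEDUPE_TTL_SECONDS = 24 * 60 * 60  # illustrative retention window


def is_first_delivery(r, event_id, ttl=DEDUPE_TTL_SECONDS):
    """Return True only the first time `event_id` is seen within `ttl`.

    `r` is assumed to be a redis-py client. SET with nx=True writes the key
    only if it does not exist yet, so a retried event finds the marker.
    """
    created = r.set(DEDUPE_PREFIX + event_id, "1", nx=True, ex=ttl)
    return bool(created)


def handle_once(r, fields, handler):
    """Run handler(fields) unless this event was already handled.

    Returns True if the handler ran, False if the event was a duplicate.
    """
    event_id = fields.get("event_id")  # assumed field, see the lead-in
    if event_id is None:
        handler(fields)  # nothing to deduplicate on
        return True
    if not is_first_delivery(r, event_id):
        return False
    try:
        handler(fields)
    except Exception:
        # Remove the marker so a later retry is not mistaken for a duplicate.
        r.delete(DEDUPE_PREFIX + event_id)
        raise
    return True
```

A duplicate should still be acknowledged by the caller; only the processing step is skipped.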

The Python Consumer and the Problem of Failure

On the backend, we needed a consumer that could read from the user-interactions stream, process the data, and acknowledge it. We chose Python with the redis-py library for its strong presence in the ML ecosystem.

First, we needed to create the stream and the consumer group. This is a one-time setup.

# Connect to Redis via redis-cli
# Create a consumer group named 'feature-engineering-service' for the stream 'user-interactions'.
# The '$' means the group will only see new messages arriving after its creation.
# MKSTREAM ensures the stream is created if it doesn't exist.
XGROUP CREATE user-interactions feature-engineering-service $ MKSTREAM
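
The same setup can be done from the consumer at startup, which avoids a manual step on fresh environments. This is a sketch under assumptions: `r` is a redis-py client, and `ensure_group` is a hypothetical helper name. Redis rejects a second XGROUP CREATE for an existing group with a BUSYGROUP error, which the helper treats as "already done".

```python
def ensure_group(r, stream, group, start_id="$"):
    """Create the consumer group if missing; return True if it was created.

    `r` is assumed to be a redis-py client. mkstream=True mirrors the
    MKSTREAM option, creating the stream when it does not exist yet.
    """
    try:
        r.xgroup_create(stream, group, id=start_id, mkstream=True)
        return True
    except Exception as exc:  # redis.exceptions.ResponseError in practice
        if "BUSYGROUP" in str(exc):
            return False  # group already exists, nothing to do
        raise
```

Calling `ensure_group(r, "user-interactions", "feature-engineering-service")` before the read loop makes the consumer safe to start in any order relative to the publisher.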

The initial consumer logic was simple: an infinite loop that reads, processes, and acknowledges.

# consumer_v1.py
import redis
import time
import os

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
STREAM_KEY = "user-interactions"
GROUP_NAME = "feature-engineering-service"
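# Prefer a stable name from the environment: pending messages are tracked per
# consumer name, and a PID-based name changes on every restart.
CONSUMER_NAME = os.getenv("CONSUMER_NAME", f"consumer-{os.getpid()}")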

r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

def process_event(event_data):
    """
    Placeholder for the actual feature engineering logic.
    This might involve updating a feature store like DynamoDB, another Redis instance,
    or calling another service. We simulate a potential failure.
    """
    print(f"Processing event for user: {event_data.get('user_id')}")
    if event_data.get('user_id') == "user_fail_simulation":
        raise ValueError("Simulated processing failure")
    time.sleep(0.1) # Simulate work
    print("Successfully processed event.")

def main():
    print(f"Starting consumer '{CONSUMER_NAME}' for group '{GROUP_NAME}'...")
    while True:
        try:
            # Block for up to 2 seconds waiting for new messages.
            # Using '>' as the ID means we only want messages that have never
            # been delivered to any consumer in this group.
            messages = r.xreadgroup(
                GROUP_NAME,
                CONSUMER_NAME,
                {STREAM_KEY: '>'},
                count=10,
                block=2000
            )

            if not messages:
                continue

            for stream, msg_list in messages:
                for msg_id, data in msg_list:
                    print(f"Received message {msg_id}")
                    try:
                        process_event(data)
                        # Acknowledge the message so it's not delivered again.
                        r.xack(STREAM_KEY, GROUP_NAME, msg_id)
                    except Exception as e:
                        print(f"ERROR: Failed to process message {msg_id}: {e}")
                        # What do we do here? This is the critical problem.

        except Exception as e:
            print(f"An error occurred in the consumer loop: {e}")
            time.sleep(5)

if __name__ == "__main__":
    main()

The problem is highlighted in the inner except block. If process_event fails (e.g., the feature store database is temporarily unavailable), we don’t XACK the message. The message remains in the stream, but Redis will not hand it out again on its own: a read with > only returns entries that have never been delivered to the group. So who retries it, and when?
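
One option for the except branch is to retry in-process a bounded number of times before giving up on the message, which absorbs short downstream blips. The sketch below is illustrative rather than the code we shipped; `process_with_retry`, the attempt count, and the delays are all assumptions.

```python
import time


def process_with_retry(handler, data, attempts=3, base_delay=0.2, sleep=time.sleep):
    """Call handler(data), retrying with exponential backoff.

    Returns True on success. Returns False once `attempts` tries have failed,
    in which case the caller should skip XACK so the entry stays pending.
    """
    for attempt in range(attempts):
        try:
            handler(data)
            return True
        except Exception as exc:
            print(f"attempt {attempt + 1}/{attempts} failed: {exc}")
            if attempt + 1 < attempts:
                # Wait base_delay, then 2x, 4x, ... between tries.
                sleep(base_delay * (2 ** attempt))
    return False
```

In the consumer loop this would wrap the call to process_event, with XACK issued only when the function returns True.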

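Redis handles this with a “Pending Entries List” (PEL). Every message delivered to a consumer is added to the PEL for that consumer group, recorded against that consumer's name, and is only removed upon XACK. If our consumer crashes after receiving a message but before acknowledging it, that message stays in the PEL. When the consumer restarts under the same name, it can ask for its pending messages by calling XREADGROUP with an explicit ID such as 0 instead of >. This only works if the consumer name is stable across restarts; a name derived from the process ID is not.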
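
A restart routine along these lines can replay a consumer's own pending entries before it goes back to reading new ones. This is a sketch under assumptions: `r` is a redis-py client created with decode_responses=True, the consumer name is the same one used before the crash, and `drain_own_pending` is a hypothetical helper.

```python
def drain_own_pending(r, stream, group, consumer, handler, batch=10):
    """Reprocess entries still pending for `consumer`; return how many were acked.

    Reading with an explicit ID instead of '>' returns this consumer's pending
    entries with IDs greater than that ID, so `cursor` walks the PEL once.
    """
    cursor, acked = "0", 0
    while True:
        reply = r.xreadgroup(group, consumer, {stream: cursor}, count=batch)
        entries = reply[0][1] if reply else []
        if not entries:
            return acked
        for msg_id, fields in entries:
            cursor = msg_id  # advance even on failure to avoid looping forever
            try:
                handler(fields)
            except Exception as exc:
                print(f"still failing {msg_id}: {exc}")
                continue  # leave it pending for a later recovery pass
            r.xack(stream, group, msg_id)
            acked += 1
```

Entries that fail again are deliberately left in the PEL, so a separate recovery process can still pick them up.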

The issue is with “stuck” messages. If a consumer is stuck processing a message for a long time or crashes permanently, its pending messages will never be processed. This is where a second “janitor” process becomes essential. This process periodically checks the PEL for messages that have been pending for too long and re-assigns them to other, healthier consumers in the group using the XCLAIM command.

This led to our v2 architecture: a main consumer and a separate janitor/monitor process.

# consumer_v2.py
# (main function remains the same as v1, but the error handling logic is what we need to perfect)

# janitor.py - A separate process to handle stale messages
import redis
import time
import os

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
STREAM_KEY = "user-interactions"
GROUP_NAME = "feature-engineering-service"
DEAD_LETTER_STREAM_KEY = f"{STREAM_KEY}:dlq"

# If a message has been pending for this long, it's considered stale.
STALE_MESSAGE_TIMEOUT_MS = 60 * 1000 # 1 minute

# If a message has been redelivered this many times, move to DLQ.
MAX_RETRIES = 3

r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

def check_for_stale_messages():
    """
    Finds messages that have been pending for too long and claims them
    for reprocessing by this janitor instance.
    """
    try:
        # Get a summary of the pending messages for the group.
        pending_summary = r.xpending(STREAM_KEY, GROUP_NAME)
        if pending_summary['pending'] == 0:
            return

        # Iterate through consumers with pending messages. redis-py returns
        # 'consumers' as a list of {'name': ..., 'pending': ...} dicts.
        for entry in pending_summary['consumers']:
            consumer, count = entry['name'], entry['pending']
            print(f"Consumer '{consumer}' has {count} pending messages.")
            # Get the full details of pending messages for this consumer.
            pending_messages = r.xpending_range(
                STREAM_KEY,
                GROUP_NAME,
                min='-', # Smallest possible ID
                max='+', # Largest possible ID
                count=100,
                consumername=consumer,
            )

            for msg in pending_messages:
                msg_id = msg['message_id']
                idle_time = msg['time_since_delivered']  # milliseconds
                delivery_count = msg['times_delivered']

                if idle_time > STALE_MESSAGE_TIMEOUT_MS:
                    print(f"Message {msg_id} is stale (idle for {idle_time}ms, delivered {delivery_count} times).")
                    
                    if delivery_count > MAX_RETRIES:
                        print(f"Message {msg_id} exceeded max retries. Moving to DLQ.")
                        # Move to Dead Letter Queue (DLQ)
                        # We need to get the original message first.
                        original_msg = r.xrange(STREAM_KEY, min=msg_id, max=msg_id, count=1)
                        if original_msg:
                            _, data = original_msg[0]
                            # Add failure metadata
                            data['dlq_reason'] = 'max_retries_exceeded'
                            data['original_consumer'] = consumer
                            r.xadd(DEAD_LETTER_STREAM_KEY, data)
                        # Acknowledge the original message to remove it from the PEL.
                        r.xack(STREAM_KEY, GROUP_NAME, msg_id)

                    else:
                        print(f"Claiming message {msg_id} for reprocessing...")
                        # XCLAIM transfers ownership to 'janitor-consumer', resets the
                        # idle time, and increments the delivery count.
                        # The `min_idle_time` ensures we don't race with the original consumer.
                        claimed_messages = r.xclaim(
                            STREAM_KEY,
                            GROUP_NAME,
                            "janitor-consumer",
                            STALE_MESSAGE_TIMEOUT_MS,
                            [msg_id]
                        )
                        # Claimed entries stay in the PEL under 'janitor-consumer'.
                        # XREADGROUP with '>' never returns pending entries, so a
                        # robust system must process `claimed_messages` here, or run
                        # a worker that reads as 'janitor-consumer' with an explicit
                        # ID such as '0', and XACK once processing succeeds.
                        
    except Exception as e:
        print(f"Error in janitor process: {e}")

def main():
    print("Starting janitor process...")
    while True:
        check_for_stale_messages()
        time.sleep(10) # Run every 10 seconds

if __name__ == "__main__":
    main()

This janitor/dead-letter-queue (DLQ) pattern is a cornerstone of building resilient message-based systems. It prevents a single “poison pill” message or a transient downstream failure from halting the entire pipeline. The main consumer can focus on the happy path, while the janitor handles the exceptions. This separation of concerns made the system far more stable.
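
The janitor's decision for each pending entry boils down to a small pure function, which is easy to unit test apart from Redis. The sketch below restates that logic; `classify_pending` is a hypothetical name, and the dictionary keys assume the shape redis-py returns from xpending_range().

```python
def classify_pending(entry, stale_ms=60_000, max_retries=3):
    """Return 'skip', 'claim', or 'dead_letter' for one pending entry.

    `entry` is assumed to carry 'time_since_delivered' (milliseconds) and
    'times_delivered', as in redis-py's xpending_range() results.
    """
    if entry["time_since_delivered"] <= stale_ms:
        return "skip"  # the owning consumer may still be working on it
    if entry["times_delivered"] > max_retries:
        return "dead_letter"  # poison pill: park it in the DLQ and ack
    return "claim"  # stale but still worth another attempt
```

Keeping the thresholds as parameters makes it straightforward to tune the stale timeout and retry budget per stream.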

Final Architecture and Lingering Considerations

With the resilient Swift publisher and the robust consumer/janitor backend, we achieved our goal. The end-to-end latency from a user tapping a button in the app to their feature vector being updated was consistently under 200ms. This unlocked a new tier of real-time personalization capabilities. The final data flow was solid.
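
One way to keep an eye on part of that latency is to compare the client's timestamp field with the millisecond prefix of the Redis-generated stream ID. The sketch below is an assumption about how such a check could look, not a description of our monitoring; `ingest_lag_ms` is a hypothetical helper. It covers only the client-to-Redis leg, including any time spent in the on-device queue, and it mixes device and server clocks.

```python
def ingest_lag_ms(stream_id, fields):
    """Milliseconds between the client timestamp and the Redis-assigned ID.

    Auto-generated stream IDs look like '<unix-ms>-<seq>'. The 'timestamp'
    field is the client's Unix time in seconds, as written by toRedisFields().
    Clock skew between device and server shows up directly as error.
    """
    server_ms = int(stream_id.split("-")[0])
    client_ms = float(fields["timestamp"]) * 1000.0
    return server_ms - client_ms
```

Because device clocks drift, values like this are more useful as a distribution over many events than as a per-event measurement.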

graph TD
    A[Swift Client] -- 1. Save Locally --> B(On-Device File Queue)
    B -- 2. Background Sync --> C{Redis}
    subgraph MLOps Backend
        C -- 3. XREADGROUP --> D[Python Consumer Pool]
        D -- 4. Process Event --> E[Real-time Feature Store]
        D -- 5. XACK --> C
        F[Janitor Process] -- 6. XPENDING --> C
        F -- 7. XCLAIM Stale --> D
        F -- 8. Move to DLQ --> G[Dead Letter Stream]
    end
    H[Monitoring] -- Watches --> G
    H -- Watches --> F

However, the solution isn’t without its own set of trade-offs and potential future improvements. The current implementation uses a single Redis instance. While performant, it’s a single point of failure. A production-grade deployment would necessitate moving to Redis Sentinel for high availability or Redis Cluster for sharding and scalability. This introduces significant complexity, especially for the Swift client, which would need to handle failover and cluster topology discovery gracefully.

Furthermore, our on-device queuing is simple. A more advanced implementation might leverage iOS’s BackgroundTasks framework to schedule syncs more intelligently, optimizing for battery life and network conditions. Finally, as more event types are added, schema management becomes critical. A schema registry would be a logical next step to ensure that malformed events from older app versions don’t break the consumer fleet. The current implementation is a powerful and pragmatic first step, but the path of MLOps is one of continuous iteration.
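
Short of a full schema registry, a consumer can at least validate each entry against the fields the Swift client writes and route anything else to the dead-letter stream. The following is a minimal sketch of such a check; `parse_event` and the normalized output shape are assumptions, while the field names come from toRedisFields().

```python
import json

REQUIRED_FIELDS = ("event_type", "user_id", "item_id", "timestamp")


def parse_event(fields):
    """Return a normalized event dict, or None if `fields` is malformed."""
    # Every required field must be present and non-empty.
    if any(not fields.get(name) for name in REQUIRED_FIELDS):
        return None
    try:
        timestamp = float(fields["timestamp"])
        # metadata_json is optional; treat a missing field as empty metadata.
        metadata = json.loads(fields.get("metadata_json", "{}"))
    except (TypeError, ValueError):
        return None
    if not isinstance(metadata, dict):
        return None
    return {
        "event_type": fields["event_type"],
        "user_id": fields["user_id"],
        "item_id": fields["item_id"],
        "timestamp": timestamp,
        "metadata": metadata,
    }
```

Returning None instead of raising lets the read loop decide what to do with a bad entry without tripping its generic error handling.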

