Implementing a Low-Latency Signal Processing Pipeline Using SwiftUI for Real-Time Data and a Ray Cluster for Distributed SciPy Computation


The initial on-device prototype for real-time accelerometer data analysis was a failure. The requirement was to process a 200 Hz stream, apply a series of filters, run a Fast Fourier Transform (FFT), and extract frequency-domain features, all while maintaining a fluid 60 fps UI on a mid-range iPhone. The SwiftUI interface for visualizing the output was responsive, but as soon as the SciPy-equivalent routines (implemented through a Python bridge or the native Accelerate framework) were engaged, the main thread would stutter, leading to dropped frames and significant battery drain. This approach could not scale and was unacceptable for a production application.

The first attempt at a solution was to offload the computation. A standard RESTful API backend, built with Flask, served as the target. The Swift client would batch sensor data, POST it to an endpoint, and poll for a result. This immediately introduced unacceptable latency, turning a “real-time” experience into a “near-time” one at best. More critically, the single-process Python backend, constrained by the Global Interpreter Lock (GIL), could not run CPU-bound processing for multiple client streams in parallel. The problem was not just about raw compute power, but about stateful, concurrent stream processing. Each client connection requires its own isolated computational state: data buffers, filter states, and so on. This led to the selection of a completely different architectural pattern.

The final architecture combines a native SwiftUI client for data capture and visualization, gRPC for efficient, bidirectional streaming, and a Ray cluster on the backend. Ray’s actor model is the linchpin of this design. It allows us to transparently instantiate a dedicated, stateful Python process (Actor) for each incoming client connection on a distributed cluster. This actor encapsulates the entire SciPy-based processing logic for a single stream, providing both computational and stateful isolation with minimal boilerplate.

The Service Contract: Defining Streams with Protocol Buffers

Before any implementation, the contract between the Swift client and the Python backend must be solidified. gRPC with Protocol Buffers is the logical choice over WebSockets or REST for this task. Its high-performance binary serialization is ideal for streams of numerical data, and its support for bidirectional streaming provides a persistent, low-latency communication channel.
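To make the serialization argument concrete, the sketch below compares the payload of 100 samples encoded as fixed-width 32-bit floats against the same samples as JSON text. It uses only the standard library; the sample values are made up for illustration. The 4-byte little-endian encoding matches what proto3 uses per element of a packed repeated float field, though the few bytes of protobuf field framing are not modelled here.

```python
import json
import struct

# 100 hypothetical accelerometer samples (0.5 s of data at 200 Hz).
readings = [0.001 * i for i in range(100)]

# Fixed-width little-endian float32: 4 bytes per sample, as in a packed
# proto3 `repeated float` field (tag and length prefix excluded).
packed = struct.pack(f"<{len(readings)}f", *readings)

# The same samples as a JSON array of decimal text.
as_json = json.dumps(readings).encode("utf-8")

print(len(packed))                 # 400 bytes: 100 samples * 4 bytes
print(len(as_json) > len(packed))  # True: the text form is larger
```

The binary form also has a fixed size per sample, so chunk sizes are predictable regardless of the values being sent.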

The .proto file defines a single StreamSignal service with one method, Process. This method establishes a full-duplex stream where the client sends SensorRequest messages and the server responds with ProcessedResponse messages.

// signal_service.proto
syntax = "proto3";

package signal_processing;

// The client streams chunks of sensor data.
// A typical chunk might contain 100 samples (0.5s of data at 200Hz).
message SensorRequest {
  string stream_id = 1; // Uniquely identifies this client's stream
  repeated float readings = 2; // Raw accelerometer readings (e.g., x-axis)
  int64 timestamp_ms = 3; // Client-side timestamp for the chunk
}

// The server streams back computed features from the signal.
message ProcessedResponse {
  string stream_id = 1;
  // Example feature: Dominant frequency in a specific band.
  float dominant_frequency = 2;
  // Example feature: Power in the 5-15Hz band.
  float power_band_alpha = 3;
  // The timestamp of the data chunk that generated this result.
  int64 source_timestamp_ms = 4;
}

// The main service definition.
service StreamSignal {
  // A bidirectional streaming RPC.
  // The client continuously sends sensor data, and the server continuously
  // sends back processed results as they become available.
  rpc Process(stream SensorRequest) returns (stream ProcessedResponse);
}

This contract is critical. The stream_id ensures requests can be correlated, and the timestamp round-trip allows for latency measurement. Compiling this .proto file generates the necessary gRPC client stubs in Swift and server skeletons in Python, removing significant manual effort.
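The latency measurement mentioned above can be sketched as follows. Because the server echoes the client's own timestamp back in source_timestamp_ms, both ends of the subtraction come from the same clock and no clock synchronization with the server is needed. The LatencyTracker class and its method names are hypothetical helpers, not part of the generated gRPC code.

```python
import time
from collections import deque
from statistics import median
from typing import Optional


def now_ms() -> int:
    """Wall-clock time in milliseconds, matching the int64 timestamp fields."""
    return int(time.time() * 1000)


class LatencyTracker:
    """Hypothetical helper: keeps the most recent round-trip latencies."""

    def __init__(self, capacity: int = 100) -> None:
        self._samples = deque(maxlen=capacity)

    def record(self, source_timestamp_ms: int,
               received_at_ms: Optional[int] = None) -> int:
        """Store and return the latency for one echoed timestamp."""
        if received_at_ms is None:
            received_at_ms = now_ms()
        latency = received_at_ms - source_timestamp_ms
        self._samples.append(latency)
        return latency

    def median_ms(self) -> float:
        """Median of the stored latencies, or 0.0 if none were recorded."""
        return float(median(self._samples)) if self._samples else 0.0


tracker = LatencyTracker()
print(tracker.record(1_700_000_000_000, 1_700_000_000_037))  # 37
```

A median over recent samples is less sensitive to occasional network spikes than a single reading.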

Backend Implementation: Stateful Actors on Ray

The backend’s core is a Ray actor responsible for handling a single client’s data stream. This actor maintains state, such as a buffer of recent sensor readings, and encapsulates all the SciPy logic. A real-world project requires careful dependency management.

Project Structure and Dependencies (requirements.txt):

ray[serve]>=2.7.0
grpcio==1.59.0
grpcio-tools==1.59.0
numpy==1.26.0
scipy==1.11.3

The SignalProcessorActor is where the actual work happens. It’s a simple Python class decorated with @ray.remote. In a production setup, this actor would be more complex, potentially loading a pre-trained model or configuration files during initialization.

# processing/actor.py
import time
import logging
from collections import deque
from typing import Dict, Any

import numpy as np
import ray
from scipy import signal

# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Constants for signal processing
# In a real application, these would come from a configuration file.
SAMPLE_RATE = 200  # Hz
WINDOW_SIZE = 512  # Number of samples for FFT, determines frequency resolution
WINDOW_OVERLAP = 256 # 50% overlap

@ray.remote
class SignalProcessorActor:
    """
    A stateful Ray actor to process a single, continuous stream of sensor data.
    Each instance of this actor corresponds to one active client connection.
    """
    def __init__(self, stream_id: str):
        self.stream_id = stream_id
        # Use a deque as a sliding window buffer for incoming sensor data.
        # It's efficient for appends and pops from either end.
        self._buffer = deque(maxlen=WINDOW_SIZE)
        self._last_processed_time = time.time()
        
        logger.info(f"[Actor {self.stream_id}] Initialized. Window size: {WINDOW_SIZE}, Sample rate: {SAMPLE_RATE}")

    def process_chunk(self, readings: np.ndarray) -> Dict[str, Any] | None:
        """
        Processes a new chunk of sensor readings.
        This method is the core computational logic. It appends new data,
        and if the buffer is full, it performs signal processing.
        """
        if not isinstance(readings, np.ndarray):
            logger.error(f"[Actor {self.stream_id}] Received non-numpy data.")
            return None
            
        self._buffer.extend(readings)

        # Only process if we have a full window of data.
        if len(self._buffer) < WINDOW_SIZE:
            return None

        # Create a copy for processing to avoid mutation during analysis
        window_data = np.array(self._buffer)

        try:
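            # SciPy's Welch method estimates the power spectral density.
            # Note: with nperseg == WINDOW_SIZE there is a single Hann-windowed
            # segment; a smaller nperseg averages several segments and
            # reduces noise at the cost of frequency resolution.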
            frequencies, psd = signal.welch(window_data, fs=SAMPLE_RATE, nperseg=WINDOW_SIZE)

            # --- Feature Extraction ---
            # This is where domain-specific logic resides.
            
            # 1. Find dominant frequency
            dominant_frequency = frequencies[np.argmax(psd)]

            # 2. Calculate power in a specific band (e.g., 5-15 Hz)
            band_mask = (frequencies >= 5) & (frequencies <= 15)
            # Integrate PSD over the frequency band to get power.
            power_band_alpha = np.trapz(psd[band_mask], frequencies[band_mask])

            # In a real-world scenario, you might extract dozens of features.
            
            # The deque (maxlen=WINDOW_SIZE) drops the oldest samples as new
            # ones arrive, so once it is full the window slides by one chunk
            # per call. WINDOW_OVERLAP is not enforced here; a stricter
            # implementation would count new samples and process only after
            # WINDOW_SIZE - WINDOW_OVERLAP of them have arrived.

            # Time elapsed since the previous window was processed (the
            # interval between results, not the compute time of this call).
            current_time = time.time()
            window_interval = current_time - self._last_processed_time
            self._last_processed_time = current_time

            logger.debug(f"[Actor {self.stream_id}] Processed window. Interval since last: {window_interval:.4f}s")
            
            return {
                "dominant_frequency": float(dominant_frequency),
                "power_band_alpha": float(power_band_alpha),
            }

        except Exception as e:
            logger.error(f"[Actor {self.stream_id}] Error during signal processing: {e}", exc_info=True)
            return None

    def get_stream_id(self) -> str:
        return self.stream_id
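The window constants in the actor imply a fixed frequency resolution and window duration, and the 50% overlap they describe can be enforced with a small counter. The following is a minimal standard-library sketch under those assumptions; HopWindow is a hypothetical helper, not part of the actor above. Windows are emitted on chunk boundaries, so the effective hop is rounded up to a whole number of chunks.

```python
from collections import deque
from typing import List, Optional

SAMPLE_RATE = 200     # Hz, as in the actor
WINDOW_SIZE = 512     # samples per analysis window
WINDOW_OVERLAP = 256  # samples shared by consecutive windows
HOP = WINDOW_SIZE - WINDOW_OVERLAP  # new samples required per window

# Quantities implied by the constants.
resolution_hz = SAMPLE_RATE / WINDOW_SIZE   # 0.390625 Hz per frequency bin
window_seconds = WINDOW_SIZE / SAMPLE_RATE  # 2.56 s of signal per window


class HopWindow:
    """Hypothetical helper: emits a window once at least `hop` new samples arrived."""

    def __init__(self, size: int = WINDOW_SIZE, hop: int = HOP) -> None:
        self._buffer = deque(maxlen=size)
        self._size = size
        self._hop = hop
        self._since_last = 0   # samples received since the last emitted window
        self._emitted = False  # whether any window has been emitted yet

    def push(self, readings: List[float]) -> Optional[List[float]]:
        """Add a chunk; return a copy of the window when one is due, else None."""
        self._buffer.extend(readings)
        self._since_last += len(readings)
        if len(self._buffer) < self._size:
            return None
        if self._emitted and self._since_last < self._hop:
            return None
        self._emitted = True
        self._since_last = 0
        return list(self._buffer)
```

With 64-sample chunks, the first window is returned on the 8th push and then on every 4th push after that, which gives exactly 256 new samples per window.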

The actor is stateful (self._buffer) and its methods can be invoked remotely. The next step is to expose this actor via a gRPC server. The server’s role is minimal: it authenticates connections (if needed), instantiates an actor for each new stream, and then acts as a proxy, forwarding messages between the gRPC stream and the actor.

# server/grpc_server.py
import asyncio
import logging
import uuid
from concurrent import futures

import grpc
import ray
import numpy as np

# Import generated gRPC files and the actor
from generated import signal_service_pb2, signal_service_pb2_grpc
from processing.actor import SignalProcessorActor

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# A simple manager to hold references to actors.
# In a real system, this might be more complex, handling actor lifecycle.
actor_registry = {}

class StreamSignalServicer(signal_service_pb2_grpc.StreamSignalServicer):
    """
    Implements the gRPC service defined in the .proto file.
    This class bridges the gRPC stream with a Ray actor.
    """
    async def Process(self, request_iterator, context):
        stream_id = str(uuid.uuid4())
        peer = context.peer()
        logger.info(f"New client connection from {peer}. Assigning stream_id: {stream_id}")

        # Instantiate a dedicated actor for this stream on the Ray cluster.
        # The `name` makes the actor findable if needed, but here we hold a handle.
        try:
            actor_handle = SignalProcessorActor.options(name=stream_id, lifetime="detached").remote(stream_id)
            actor_registry[stream_id] = actor_handle
            logger.info(f"Successfully created Ray actor for stream_id: {stream_id}")
        except Exception as e:
            logger.error(f"Failed to create Ray actor for {stream_id}: {e}")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details("Could not initialize processing backend.")
            return

        try:
            async for request in request_iterator:
                # Convert readings to a numpy array for efficient processing.
                readings_np = np.array(request.readings, dtype=np.float32)

                # Submit the chunk to the actor. `.remote()` returns an
                # ObjectRef immediately without waiting for the computation.
                result_ref = actor_handle.process_chunk.remote(readings_np)

                # Awaiting the ObjectRef suspends only this coroutine; the
                # event loop keeps serving other streams. Within this stream,
                # however, the next chunk is not read until the result
                # arrives. A high-throughput system would decouple receiving
                # chunks from sending results.
                processed_data = await result_ref

                if processed_data:
                    response = signal_service_pb2.ProcessedResponse(
                        stream_id=stream_id,
                        dominant_frequency=processed_data["dominant_frequency"],
                        power_band_alpha=processed_data["power_band_alpha"],
                        source_timestamp_ms=request.timestamp_ms
                    )
                    yield response
        except grpc.aio.AioRpcError as e:
            logger.warning(f"Client stream closed for {stream_id}. Code: {e.code()}")
        finally:
            logger.info(f"Cleaning up resources for stream_id: {stream_id}")
            # Gracefully terminate the actor when the client disconnects.
            if stream_id in actor_registry:
                ray.kill(actor_registry[stream_id])
                del actor_registry[stream_id]
                logger.info(f"Killed actor for stream_id: {stream_id}")

async def serve():
    """Starts the asynchronous gRPC server."""
    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
    signal_service_pb2_grpc.add_StreamSignalServicer_to_server(StreamSignalServicer(), server)
    
    server_address = '[::]:50051'
    server.add_insecure_port(server_address)
    
    logger.info(f"Starting gRPC server on {server_address}")
    await server.start()
    await server.wait_for_termination()

if __name__ == '__main__':
    # Initialize Ray. In a production cluster, `address="auto"` would connect
    # to an existing cluster. Here, we start a local one for development.
    if not ray.is_initialized():
        ray.init(logging_level=logging.ERROR, ignore_reinit_error=True)
    
    try:
        asyncio.run(serve())
    except KeyboardInterrupt:
        logger.info("Server shutting down.")
    finally:
        # Disconnect from Ray on every exit path, not only on Ctrl+C.
        ray.shutdown()

This server code demonstrates the core pattern: each Process stream (one per client in this design) maps to one Ray actor. When a client disconnects, the finally block ensures the corresponding actor is killed, releasing its resources on the cluster. This is a critical detail for avoiding resource leaks in a long-running service.
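The servicer awaits each result before reading the next chunk. One way to decouple receiving from sending, as its comments suggest, is a bounded queue of in-flight results. The sketch below uses only asyncio; pipeline, fake_requests and fake_process are hypothetical stand-ins for the request iterator and the actor call. It assumes the actor handles calls in submission order, which should be checked against Ray's documented ordering guarantees, and it omits error handling.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, Optional

Chunk = List[float]
Result = Optional[dict]


async def pipeline(
    requests: AsyncIterator[Chunk],
    process: Callable[[Chunk], Awaitable[Result]],
    max_in_flight: int = 8,
) -> AsyncIterator[dict]:
    """Yield results in submission order while several chunks are in flight."""
    pending: asyncio.Queue = asyncio.Queue(maxsize=max_in_flight)
    done = object()  # sentinel marking the end of the request stream

    async def reader() -> None:
        async for chunk in requests:
            # Start processing without waiting for the result. When the
            # queue is full, put() waits, which bounds the work in flight.
            await pending.put(asyncio.ensure_future(process(chunk)))
        await pending.put(done)

    reader_task = asyncio.create_task(reader())
    try:
        while True:
            item = await pending.get()
            if item is done:
                break
            result = await item
            if result is not None:  # None means no full window yet
                yield result
    finally:
        reader_task.cancel()  # no-op if the reader already finished


async def fake_requests() -> AsyncIterator[Chunk]:
    """Stand-in for the gRPC request iterator."""
    for i in range(5):
        yield [float(i)]


async def fake_process(chunk: Chunk) -> Result:
    """Stand-in for the actor call; later chunks finish sooner."""
    await asyncio.sleep(0.01 * (5 - chunk[0]))
    return None if chunk[0] == 2.0 else {"value": chunk[0]}


async def collect() -> List[dict]:
    return [r async for r in pipeline(fake_requests(), fake_process)]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Results are awaited in the order they were queued, so responses stay in sequence even when a later chunk completes first.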

Here is a diagram of the overall architecture:

sequenceDiagram
    participant SwiftUI_Client as SwiftUI Client
    participant GRPC_Server as gRPC Gateway
    participant Ray_Cluster as Ray Cluster
    participant Signal_Actor as SignalProcessorActor

    SwiftUI_Client->>+GRPC_Server: Establishes bidirectional stream (Process RPC)
    GRPC_Server->>+Ray_Cluster: SignalProcessorActor.remote(stream_id)
    Ray_Cluster-->>-GRPC_Server: Returns ActorHandle
    Note over GRPC_Server: Stores handle for the stream
    GRPC_Server-->>-SwiftUI_Client: Stream connection opened

    loop Real-time Data Flow
        SwiftUI_Client-)+GRPC_Server: Stream SensorRequest chunk
        GRPC_Server-)+Signal_Actor: actor.process_chunk.remote(data)
        Note over Signal_Actor: Appends to buffer, runs SciPy (Welch, FFT, feature extraction)
        Signal_Actor--)-GRPC_Server: Returns result ObjectRef
        GRPC_Server-)-SwiftUI_Client: Stream ProcessedResponse
    end

    SwiftUI_Client->>GRPC_Server: Closes connection (e.g., app backgrounded)
    GRPC_Server->>+Ray_Cluster: ray.kill(ActorHandle)
    Note over GRPC_Server: Cleans up registry
    Ray_Cluster-->>-GRPC_Server: Actor terminated

SwiftUI Client: Capturing and Visualizing Data

The iOS client needs to perform three main tasks: capture sensor data (we will simulate this with a timer for simplicity), manage the gRPC bidirectional stream, and update the SwiftUI view reactively.

The gRPC connection management is encapsulated in a dedicated service class.

// SignalService.swift
import Foundation
import GRPC
import NIO
import Combine

// This class manages the gRPC connection and data streaming.
// It uses Combine to publish received data to the UI.
class SignalService: ObservableObject {
    private var client: Signal_processing_StreamSignalAsyncClient?
    private var connection: ClientConnection?
    private var call: GRPCAsyncBidirectionalStreamingCall<Signal_processing_SensorRequest, Signal_processing_ProcessedResponse>?

    // A PassthroughSubject to broadcast processed data to any subscriber (e.g., a ViewModel).
    let processedDataPublisher = PassthroughSubject<Signal_processing_ProcessedResponse, Error>()

    // State properties to reflect connection status in the UI.
    @Published var isConnected: Bool = false
    @Published var errorMessage: String?

    init() {
        setupConnection()
    }

    private func setupConnection() {
        // In a production app, the host and port would be configurable.
        // "localhost" reaches the server only from the iOS Simulator; on a
        // physical device, use the host machine's LAN IP instead.
        let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
        let connection = ClientConnection.insecure(group: group)
            .connect(host: "localhost", port: 50051)
        self.connection = connection
        self.client = Signal_processing_StreamSignalAsyncClient(channel: connection)
    }

    func startStreaming() {
        guard let client = self.client else {
            errorMessage = "gRPC client not initialized."
            return
        }
        
        // This is where the bidirectional stream is created.
        let call = client.makeProcessCall()
        self.call = call

        // Start a new Task to handle receiving messages from the server.
        // This runs concurrently and does not block the main thread.
        Task {
            do {
                for try await response in call.responseStream {
                    // When a response is received, publish it.
                    DispatchQueue.main.async {
                        self.processedDataPublisher.send(response)
                    }
                }
                // The loop finishes when the server closes the stream.
                DispatchQueue.main.async {
                    self.isConnected = false
                }
            } catch {
                // Handle errors, such as the server disconnecting.
                DispatchQueue.main.async {
                    self.errorMessage = "Stream failed: \(error.localizedDescription)"
                    self.isConnected = false
                }
            }
        }
        
        self.isConnected = true
        self.errorMessage = nil
    }

    func sendDataChunk(readings: [Float]) async {
        guard isConnected, let call = self.call else { return }

        let request = Signal_processing_SensorRequest.with {
            $0.streamID = "swiftui-client-01" // A unique ID for the device
            $0.readings = readings
            $0.timestampMs = Int64(Date().timeIntervalSince1970 * 1000)
        }

        do {
            try await call.requestStream.send(request)
        } catch {
            DispatchQueue.main.async {
                self.errorMessage = "Failed to send data: \(error.localizedDescription)"
                self.isConnected = false
            }
        }
    }
    
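    func stopStreaming() {
        // Finish only the request stream. The channel stays open so that
        // startStreaming() can open a new call later; close the connection
        // when the service itself is torn down.
        call?.requestStream.finish()
        call = nil
        isConnected = false
    }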
}

The ViewModel connects the SignalService to the SwiftUI View. It simulates sensor data with a Timer and subscribes to the processedDataPublisher to receive results.

// SignalViewModel.swift
import Foundation
import Combine

class SignalViewModel: ObservableObject {
    @Published var dominantFrequency: Float = 0.0
    @Published var alphaBandPower: Float = 0.0
    @Published var connectionStatus: String = "Disconnected"
    @Published var lastLatency: TimeInterval = 0.0

    private let signalService = SignalService()
    private var cancellables = Set<AnyCancellable>()
    private var dataStreamTimer: Timer?

    init() {
        // Subscribe to connection status changes from the service.
        // assign(to:) with a Published projection does not retain self,
        // unlike assign(to:on:), and needs no explicit cancellable.
        signalService.$isConnected
            .map { $0 ? "Connected" : "Disconnected" }
            .assign(to: &$connectionStatus)

        // Subscribe to processed data from the service.
        signalService.processedDataPublisher
            .receive(on: DispatchQueue.main)
            .sink(receiveCompletion: { [weak self] completion in
                if case .failure(let error) = completion {
                    self?.connectionStatus = "Error: \(error.localizedDescription)"
                }
            }, receiveValue: { [weak self] response in
                self?.dominantFrequency = response.dominantFrequency
                self?.alphaBandPower = response.powerBandAlpha
                
                // Calculate and display round-trip latency
                let now = Int64(Date().timeIntervalSince1970 * 1000)
                self?.lastLatency = TimeInterval(now - response.sourceTimestampMs) / 1000.0
            })
            .store(in: &cancellables)
    }

    func toggleConnection() {
        if signalService.isConnected {
            stop()
        } else {
            start()
        }
    }

    private func start() {
        signalService.startStreaming()
        
        // Simulate a 200Hz sensor by sending 40 samples every 0.2 seconds.
        dataStreamTimer = Timer.scheduledTimer(withTimeInterval: 0.2, repeats: true) { [weak self] _ in
            let mockReadings = (0..<40).map { _ in Float.random(in: -1.0...1.0) }
            Task {
                await self?.signalService.sendDataChunk(readings: mockReadings)
            }
        }
    }

    private func stop() {
        dataStreamTimer?.invalidate()
        dataStreamTimer = nil
        signalService.stopStreaming()
        dominantFrequency = 0.0
        alphaBandPower = 0.0
    }
}
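The ViewModel above feeds the backend uniformly random samples, whose spectrum has no stable peak, so the displayed dominant frequency is not predictable. For checking the backend end to end, a deterministic test signal is easier to reason about. The following Python sketch (for example, for a backend test client) generates a sine wave in the same 40-sample chunks; sine_chunks is a hypothetical helper, and the 10 Hz frequency is just an example.

```python
import math
from typing import Iterator, List

SAMPLE_RATE = 200  # Hz
CHUNK_SIZE = 40    # samples per message, i.e. 0.2 s at 200 Hz


def sine_chunks(freq_hz: float, n_chunks: int,
                sample_rate: int = SAMPLE_RATE,
                chunk_size: int = CHUNK_SIZE) -> Iterator[List[float]]:
    """Yield consecutive chunks of a unit-amplitude sine at `freq_hz`.

    The sample index keeps counting across chunks, so the phase is
    continuous at chunk boundaries.
    """
    for c in range(n_chunks):
        start = c * chunk_size
        yield [math.sin(2 * math.pi * freq_hz * (start + i) / sample_rate)
               for i in range(chunk_size)]


# 13 chunks give 520 samples, enough to fill one 512-sample window.
chunks = list(sine_chunks(freq_hz=10.0, n_chunks=13))
print(len(chunks), len(chunks[0]))  # 13 40
```

Fed with a 10 Hz sine, the backend should report a dominant frequency close to 10 Hz, within about one frequency bin, and that value lies inside the 5-15 Hz band.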

Finally, the SwiftUI View is straightforward. It presents the data from the ViewModel and provides a button to control the connection.

// ContentView.swift
import SwiftUI

struct ContentView: View {
    @StateObject private var viewModel = SignalViewModel()

    var body: some View {
        VStack(spacing: 20) {
            Text("Real-Time Signal Processing")
                .font(.largeTitle)

            Text("Status: \(viewModel.connectionStatus)")
                .foregroundColor(viewModel.connectionStatus == "Connected" ? .green : .red)

            Divider()

            VStack(alignment: .leading, spacing: 15) {
                Text("Dominant Frequency:")
                    .font(.headline)
                Text("\(viewModel.dominantFrequency, specifier: "%.2f") Hz")
                    .font(.system(.title, design: .monospaced))
                    .foregroundColor(.accentColor)

                Text("Alpha Band Power:")
                    .font(.headline)
                Text("\(viewModel.alphaBandPower, specifier: "%.4f")")
                    .font(.system(.title, design: .monospaced))
                    .foregroundColor(.accentColor)
                
                Text("Round-trip Latency:")
                    .font(.headline)
                Text("\(viewModel.lastLatency * 1000, specifier: "%.1f") ms")
                    .font(.system(.callout, design: .monospaced))
            }
            .padding()
            .background(Color(.secondarySystemBackground))
            .cornerRadius(10)

            Spacer()

            Button(action: {
                viewModel.toggleConnection()
            }) {
                Text(viewModel.connectionStatus == "Connected" ? "Disconnect" : "Connect")
                    .frame(maxWidth: .infinity)
                    .padding()
                    .background(viewModel.connectionStatus == "Connected" ? Color.red : Color.blue)
                    .foregroundColor(.white)
                    .cornerRadius(10)
            }
        }
        .padding()
    }
}

This architecture decouples the UI from the heavy computation. The SwiftUI view stays responsive because it only renders values provided by the ViewModel, and all signal processing happens off-device. The backend scales horizontally: as the number of users grows, more nodes can be added to the Ray cluster, and Ray’s scheduler places new actors on the available machines.

The main limitation of this design lies in its fault tolerance. If a Ray worker node fails, any actors running on it are lost, abruptly terminating their corresponding client streams. While Ray actors can be configured to restart automatically, the internal state (the _buffer deque) is lost. For applications requiring higher availability, this state would need to be periodically checkpointed to a reliable external store like Redis or a distributed file system. Furthermore, the gRPC gateway is a single point of entry. In a production environment, this would sit behind a load balancer with multiple gateway instances to prevent it from becoming a bottleneck or a single point of failure. The current model also incurs a “cold start” penalty on each new connection, since a new actor process must be spun up; this could be mitigated by maintaining a warm pool of pre-initialized actors.
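The checkpointing idea can be sketched as follows. InMemoryStore is a hypothetical stand-in for an external key-value store such as Redis, and the key format and the checkpoint/restore function names are assumptions made for illustration. The buffer is serialized as raw float32 values, which matches the precision of the incoming readings.

```python
import struct
from collections import deque
from typing import Dict, Optional

WINDOW_SIZE = 512  # same buffer capacity as the actor


class InMemoryStore:
    """Hypothetical stand-in for an external key-value store."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    def set(self, key: str, value: bytes) -> None:
        self._data[key] = value

    def get(self, key: str) -> Optional[bytes]:
        return self._data.get(key)


def checkpoint(store: InMemoryStore, stream_id: str, buffer: deque) -> None:
    """Serialize the buffer as little-endian float32 and store it."""
    payload = struct.pack(f"<{len(buffer)}f", *buffer)
    store.set(f"buffer:{stream_id}", payload)


def restore(store: InMemoryStore, stream_id: str) -> deque:
    """Rebuild the buffer from the last checkpoint, or return an empty one."""
    buffer: deque = deque(maxlen=WINDOW_SIZE)
    payload = store.get(f"buffer:{stream_id}")
    if payload:
        buffer.extend(struct.unpack(f"<{len(payload) // 4}f", payload))
    return buffer


store = InMemoryStore()
checkpoint(store, "demo-stream", deque([0.5, -1.0, 0.25], maxlen=WINDOW_SIZE))
print(list(restore(store, "demo-stream")))  # [0.5, -1.0, 0.25]
```

A restarted actor could call restore in its initializer. Samples that arrived after the last checkpoint would still be lost, so the checkpoint interval bounds how much data a failure can discard.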

