The technical directive was unambiguous: architect a sidecar service to shield a legacy, high-latency downstream system from unpredictable traffic surges. The primary load profile consists of bursts reaching up to 50,000 small, independent requests per second. The core mechanism required is request batching—coalescing individual requests into a single, larger payload for the downstream service. Key constraints included sub-15ms processing overhead at the 99th percentile and guaranteed processing of in-flight requests during a graceful shutdown.
This problem is not one of business logic complexity, but of pure concurrent throughput and latency control. Two primary candidates emerged from our initial technology radar scan: Python with FastAPI, representing rapid development and a mature async ecosystem, and Rust with Actix-web, promising raw performance and memory safety. This document records the architectural analysis and implementation comparison that drove our final decision.
Defining the Batching Problem
The required behavior can be distilled into a simple rule: a batch is dispatched to the downstream service when either a batch size limit (e.g., 100 requests) is reached, or a time window (e.g., 10 milliseconds) expires, whichever occurs first. This dual-trigger mechanism ensures low latency during periods of low traffic while maximizing throughput during bursts.
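Read literally, the rule needs a collector that opens a time window when the first request arrives and closes it on whichever trigger fires first. The following is a minimal sketch of that idea, assuming an `asyncio.Queue` as the holding area; the function name `collect_batch` and its parameters are illustrative and are not taken from the implementations discussed later.

```python
import asyncio
from typing import Any, List


async def collect_batch(queue: asyncio.Queue, max_size: int, max_wait: float) -> List[Any]:
    """Block for the first item, then fill the batch until it is full
    or max_wait seconds have passed since that first item arrived."""
    loop = asyncio.get_running_loop()
    batch = [await queue.get()]          # idle wait: no timer runs on an empty queue
    deadline = loop.time() + max_wait    # the window opens with the first item
    while len(batch) < max_size:         # leaving via this condition = size trigger
        remaining = deadline - loop.time()
        if remaining <= 0:
            break                        # time trigger
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break                        # time trigger
    return batch


async def demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    first = await collect_batch(queue, max_size=3, max_wait=0.01)   # fills up: size trigger
    second = await collect_batch(queue, max_size=3, max_wait=0.01)  # runs dry: time trigger
    return [len(first), len(second)]


sizes = asyncio.run(demo())
print(sizes)
```

With five queued items and a limit of three, the first batch closes on size and the second closes when the 10 ms window expires with only two items collected.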
Architecturally, the sidecar intercepts individual requests, places them into a temporary holding area, and returns a future or promise to the original caller. A separate, long-running background task is responsible for observing this holding area, forming batches based on the defined rules, dispatching them, and finally resolving the promises of the original callers with the appropriate results from the batched response.
A critical component is the mapping of individual requests to their corresponding responses after the batch has been processed. Each incoming request must receive its specific result, not the entire batch response. Graceful shutdown is another non-negotiable requirement; upon receiving a termination signal, the service must stop accepting new requests but ensure all queued items are processed and responses are sent before exiting.
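One way to make the request-to-response mapping robust is to key pending futures by a request identifier rather than relying on positional order. The sketch below assumes the downstream batch response echoes an `id` field for every item; that field, and the name `resolve_batch`, are hypothetical and not something the downstream contract described here guarantees.

```python
import asyncio
from typing import Any, Dict, List


def resolve_batch(pending: Dict[int, asyncio.Future],
                  batch_response: List[Dict[str, Any]]) -> None:
    """Resolve each caller's future with its own result, matched by id."""
    for item in batch_response:
        future = pending.pop(item["id"], None)
        if future is not None and not future.done():
            future.set_result(item)
    # Anything left had no matching entry in the response: fail it explicitly
    # so the caller does not wait forever.
    for request_id, future in pending.items():
        if not future.done():
            future.set_exception(KeyError(f"no result for request {request_id}"))
    pending.clear()


async def demo():
    loop = asyncio.get_running_loop()
    pending = {i: loop.create_future() for i in (1, 2, 3)}
    futures = dict(pending)  # keep references; resolve_batch empties `pending`
    # The response arrives out of order and omits request 3.
    resolve_batch(pending, [{"id": 2, "value": "b"}, {"id": 1, "value": "a"}])
    return (
        futures[1].result()["value"],
        futures[2].result()["value"],
        type(futures[3].exception()).__name__,
    )


outcome = asyncio.run(demo())
print(outcome)
```

Matching by identifier tolerates reordered or partial batch responses, at the cost of requiring the downstream service to echo the identifier.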
The data flow can be visualized as follows:
sequenceDiagram
    participant Client
    participant BatchingSidecar
    participant BatchProcessor
    participant DownstreamService

    loop For each incoming request
        Client->>+BatchingSidecar: POST /process (request_A)
        Note right of BatchingSidecar: Places request_A and a response future into a queue.
        BatchingSidecar-->>-Client: (Awaits future resolution)
    end

    BatchProcessor->>BatchProcessor: Loop continuously
    Note left of BatchProcessor: Collects requests from queue (request_A, request_B, ...)
    alt Batch size reached OR Timeout expired
        BatchProcessor->>+DownstreamService: POST /process_batch (batch [A, B, ...])
        DownstreamService-->>-BatchProcessor: (batch_response [res_A, res_B, ...])
    end
    Note left of BatchProcessor: Maps batch_response back to individual futures.
    BatchProcessor-->>BatchingSidecar: Resolve future for request_A with res_A
    BatchingSidecar-->>Client: 200 OK (res_A)
This sequence highlights the core challenge: managing thousands of concurrent connections and their associated state (the response futures) while a centralized processor operates on a different clock (the batching triggers).
Solution A: Python with FastAPI and Asyncio
Python’s asyncio ecosystem, combined with FastAPI’s high-level abstractions, offers the fastest path from concept to a working implementation. The developer experience is first-class, and the code is often more readable for engineers unfamiliar with low-level systems programming.
Pros:
- Development Velocity: The implementation is straightforward. The language’s dynamic nature and extensive libraries significantly accelerate development.
- Ecosystem Maturity: asyncio is battle-tested. Libraries for configuration, logging, and metrics are abundant and integrate seamlessly.
- Readability: Python code is generally more concise and easier to reason about for a wider range of developers, reducing maintenance overhead.
Cons:
- Global Interpreter Lock (GIL): This is the paramount concern. asyncio provides I/O concurrency on a single event-loop thread, and the GIL prevents other threads from running Python bytecode in parallel, so any CPU-intensive work within the batch processing logic blocks the event loop and delays all concurrent requests. Even seemingly innocuous tasks like complex deserialization or data transformation at scale can introduce latency spikes.
- Performance Ceiling: Python’s dynamic typing and interpreted nature impose a fundamental overhead. While frameworks like FastAPI are fast by Python standards, they cannot compete with compiled languages in raw computation or memory management efficiency. This translates to higher resource consumption (CPU/memory) for the same workload.
- Latency Jitter: Garbage collection pauses and event loop scheduling nuances can introduce non-deterministic latency, making it harder to meet strict p99 latency targets under heavy, sustained load.
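The event-loop blocking described in the first point can be observed directly. The sketch below is illustrative only: `cpu_bound_step` uses `time.sleep` as a stand-in for a long pure-Python computation (an assumption made so the example stays short and deterministic), and a ticker coroutine records how late its 1 ms wake-ups arrive.

```python
import asyncio
import time
from typing import List


async def ticker(gaps: List[float], stop: asyncio.Event) -> None:
    """Wake roughly every millisecond and record the time between wake-ups."""
    last = time.perf_counter()
    while not stop.is_set():
        await asyncio.sleep(0.001)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now


def cpu_bound_step() -> None:
    # Stand-in for heavy deserialization or transformation. Called on the
    # event-loop thread, it blocks the loop for its whole duration.
    time.sleep(0.05)


async def measure() -> float:
    gaps: List[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(gaps, stop))
    await asyncio.sleep(0.01)   # let the ticker run unhindered for a moment
    cpu_bound_step()            # inline call: nothing else can run for 50 ms
    await asyncio.sleep(0.01)   # give the ticker a chance to observe the stall
    stop.set()
    await task
    return max(gaps)


worst_gap = asyncio.run(measure())
print(f"worst gap between ticks: {worst_gap * 1000:.1f} ms")
```

Every coroutine sharing the loop, including every request waiting on its future, experiences the same stall as the ticker.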
Core Implementation: FastAPI
The implementation hinges on a central BatchProcessor service managed by FastAPI’s dependency injection and lifecycle events. Communication between the request handlers and the background processor is handled by an asyncio.Queue.
Configuration and Boilerplate (app/config.py)
In a real-world project, configuration must be externalized. We use a simple Pydantic model for this.
# app/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # BaseSettings reads each field from the environment variable of the same
    # name (e.g. BATCH_MAX_SIZE=200), so the values below are only defaults.
    BATCH_MAX_SIZE: int = 100
    BATCH_MAX_WAIT_SECONDS: float = 0.01  # 10ms
    WORKER_COUNT: int = 1  # We'll discuss this limitation later


settings = Settings()
The Batching Logic (app/batch_processor.py)
This is the heart of the Python solution. It manages a queue of pending jobs and a background task that processes them.
# app/batch_processor.py
import asyncio
import logging
from typing import List, Dict, Any
from collections import namedtuple

from .config import settings

# A simple structure to hold the request data and its corresponding future
PendingJob = namedtuple("PendingJob", ["data", "future"])

logger = logging.getLogger(__name__)


class BatchProcessor:
    def __init__(self):
        self._queue = asyncio.Queue()
        self._processor_task = None
        self._active = False
        # Set once the shutdown sentinel (None) has been taken off the queue.
        self._sentinel_seen = False

    async def start(self):
        if not self._active:
            logger.info("Starting batch processor...")
            self._active = True
            self._sentinel_seen = False
            self._processor_task = asyncio.create_task(self._run())
            logger.info("Batch processor started.")

    async def stop(self):
        if self._active:
            logger.info("Stopping batch processor...")
            # Reject new submissions from this point on.
            self._active = False
            # Signal the queue that no more items will be added.
            # This allows the background task to drain the queue and exit gracefully.
            await self._queue.put(None)
            if self._processor_task:
                await self._processor_task
            logger.info("Batch processor stopped.")

    async def submit_job(self, data: Dict[str, Any]) -> Any:
        if not self._active:
            raise RuntimeError("Processor is not active.")
        future = asyncio.get_running_loop().create_future()
        job = PendingJob(data=data, future=future)
        await self._queue.put(job)
        return await future

    async def _run(self):
        # Run until the sentinel is consumed, not merely until stop() is called;
        # otherwise jobs still queued at shutdown would never be resolved.
        while not self._sentinel_seen:
            batch = []
            try:
                batch = await self._collect_batch()
                if not batch:
                    continue
                logger.info(f"Processing batch of size {len(batch)}.")
                # In a real application, this would be an async HTTP call
                # to the downstream service.
                results = await self._process_batch_downstream(batch)
                # Distribute results back to the original callers
                for job, result in zip(batch, results):
                    if not job.future.done():
                        job.future.set_result(result)
            except asyncio.CancelledError:
                logger.warning("Processor task cancelled.")
                break
            except Exception as exc:
                logger.exception("Error in batch processing loop.")
                # Fail every unresolved future in the batch so callers do not hang.
                for job in batch:
                    if not job.future.done():
                        job.future.set_exception(exc)

    async def _collect_batch(self) -> List[PendingJob]:
        """
        Waits up to BATCH_MAX_WAIT_SECONDS for a first job, then drains whatever
        is already queued, up to BATCH_MAX_SIZE. This is the core batching logic.
        """
        batch = []
        try:
            # Wait for the first item with the timeout.
            # This prevents the loop from spinning when the queue is empty.
            first_job = await asyncio.wait_for(
                self._queue.get(), timeout=settings.BATCH_MAX_WAIT_SECONDS
            )
        except asyncio.TimeoutError:
            # No items arrived within the time window. Return an empty batch.
            return []

        if first_job is None:  # Shutdown signal
            self._sentinel_seen = True
            return []
        batch.append(first_job)

        # Once we have one item, quickly drain any other items in the queue
        # up to the max batch size without further waiting.
        while len(batch) < settings.BATCH_MAX_SIZE:
            try:
                # `get_nowait` is crucial here to avoid blocking.
                job = self._queue.get_nowait()
            except asyncio.QueueEmpty:
                break  # The queue is empty, so we process what we have.
            if job is None:  # Shutdown signal: this is the final batch
                self._sentinel_seen = True
                break
            batch.append(job)
        return batch

    async def _process_batch_downstream(self, batch: List[PendingJob]) -> List[Dict[str, Any]]:
        # Simulate a network call to the downstream service
        await asyncio.sleep(0.05)  # Simulate 50ms downstream latency
        # Simulate processing and returning individual results
        results = []
        for job in batch:
            results.append({"processed": True, "original_id": job.data.get("id")})
        return results
FastAPI Application (app/main.py)
This ties everything together, managing the lifecycle of the BatchProcessor.
# app/main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI, Depends, Request

from .batch_processor import BatchProcessor
from .config import settings

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Singleton instance of our processor
batch_processor = BatchProcessor()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup logic
    await batch_processor.start()
    yield
    # Shutdown logic
    await batch_processor.stop()


app = FastAPI(lifespan=lifespan)


# Dependency to provide the processor to routes
def get_batch_processor():
    return batch_processor


@app.post("/process")
async def process_item(
    request: Request,
    processor: BatchProcessor = Depends(get_batch_processor)
):
    request_data = await request.json()
    # The `submit_job` method hides all the complexity of queueing
    # and waiting for the batch result.
    result = await processor.submit_job(request_data)
    return result
This Python implementation is elegant and relatively simple. However, running it under a process manager such as gunicorn with uvicorn workers reveals the architectural weakness. To utilize multiple cores, you must run multiple worker processes, and each worker process has its own independent BatchProcessor instance. Batching therefore happens per worker, which defeats the purpose of centralized batching across all incoming requests: a request arriving at worker 1 cannot be batched with a request arriving at worker 2. Running a single worker keeps batching global but caps the service at roughly one core. The only way to achieve global batching across workers would be to introduce an external message queue (like Redis or RabbitMQ), which adds significant operational complexity and another point of failure.
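To put rough numbers on the per-worker fragmentation, the sketch below estimates how many requests a single worker sees per batching window. It assumes traffic is spread evenly across workers, which real load balancers only approximate; the function name, the worker count of 8, and the 2,000 requests-per-second quiet period are illustrative figures, not measurements.

```python
def expected_batch_fill(requests_per_second: int, window_ms: int,
                        workers: int, max_batch: int) -> float:
    """Average requests one worker sees per batching window, capped at max_batch.

    Assumes traffic is spread evenly across workers (an idealization).
    """
    per_worker = requests_per_second * window_ms / 1000 / workers
    return min(per_worker, float(max_batch))


# Burst traffic from the brief: 50,000 req/s, 10 ms window, batch limit 100.
burst_single = expected_batch_fill(50_000, 10, workers=1, max_batch=100)
burst_split = expected_batch_fill(50_000, 10, workers=8, max_batch=100)

# A quieter period (illustrative figure).
quiet_single = expected_batch_fill(2_000, 10, workers=1, max_batch=100)
quiet_split = expected_batch_fill(2_000, 10, workers=8, max_batch=100)

print(burst_single, burst_split, quiet_single, quiet_split)
```

Under these assumptions, eight workers fill batches to about 62 requests instead of the full 100 during a burst, and to 2.5 instead of 20 in the quieter case, so the downstream service receives more and smaller calls for the same traffic.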
Solution B: Rust with Actix-web and Tokio
Rust presents a starkly different proposition: it trades upfront complexity and a stricter compiler for runtime performance and safety. The language’s ownership model and lack of a garbage collector provide fine-grained control over memory and execution, making it well suited to latency-sensitive systems software like our sidecar.
Pros:
- Fearless Concurrency: Rust’s ownership model eliminates entire classes of concurrency bugs at compile time. This gives developers confidence when building complex multi-threaded systems.
- Predictable Performance: No GIL, no GC pauses. The compiled code is highly optimized, resulting in extremely low overhead and predictable latency. A single Rust process can effectively utilize all available CPU cores for its async runtime.
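- Memory Safety: The borrow checker guarantees memory safety without the runtime cost of a garbage collector. This is critical for long-running services where memory corruption bugs such as use-after-free are unacceptable. Leaks remain possible in safe Rust, but deterministic destruction makes them far less likely to go unnoticed.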
Cons:
- Steep Learning Curve: Rust’s core concepts (ownership, borrowing, lifetimes) are challenging for developers new to the language.
- Development Time: The strict compiler and more verbose syntax lead to a slower development cycle compared to Python.
- Verbosity: Implementing the same logic often requires more lines of code, especially around error handling and type definitions.
Core Implementation: Actix-web
The Rust implementation leverages the tokio async runtime. Communication between the web server’s worker threads and the central batch processor is achieved via a multi-producer, single-consumer (MPSC) channel.
Project Setup (Cargo.toml)
[package]
name = "batching_sidecar_rust"
version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
config = "0.13"
lazy_static = "1.4"
env_logger = "0.10"
log = "0.4"
# Required by the anyhow::Error return type in src/batch_processor.rs
anyhow = "1"
Configuration (src/config.rs)
We use the config crate for managing settings.
// src/config.rs
use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
pub struct Settings {
    pub batch_max_size: usize,
    pub batch_max_wait_ms: u64,
    pub server_addr: String,
}

impl Settings {
    pub fn new() -> Result<Self, ConfigError> {
        // Uses the ConfigBuilder API; the set_default/merge methods on
        // Config itself are deprecated in recent versions of the crate.
        Config::builder()
            .set_default("batch_max_size", 100_i64)?
            .set_default("batch_max_wait_ms", 10_i64)?
            .set_default("server_addr", "127.0.0.1:8080")?
            // Load a configuration file `config/default.toml`
            .add_source(File::with_name("config/default").required(false))
            // Allow environment variables to override (e.g., BATCH_MAX_SIZE=200)
            .add_source(Environment::default())
            .build()?
            .try_deserialize()
    }
}
The Batching Logic (src/batch_processor.rs)
This is where the power and complexity of Rust become apparent. We define explicit types for communication over the channel.
// src/batch_processor.rs
use crate::config::Settings;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;

// The data we receive in a request
#[derive(Debug, Serialize, Deserialize)]
pub struct JobData {
    pub id: u64,
    pub payload: String,
}

// The response we send back
#[derive(Debug, Serialize, Deserialize)]
pub struct JobResult {
    pub processed: bool,
    pub original_id: u64,
}

// The message sent over the MPSC channel. It includes a
// one-shot channel sender to send the result back.
type Job = (JobData, oneshot::Sender<JobResult>);

#[derive(Clone)]
pub struct BatchProcessorHandle {
    sender: mpsc::Sender<Job>,
}

impl BatchProcessorHandle {
    pub async fn submit_job(&self, data: JobData) -> Result<JobResult, anyhow::Error> {
        let (response_tx, response_rx) = oneshot::channel();
        // A send error means the processor task has exited; report that
        // instead of carrying the rejected job inside the error.
        self.sender
            .send((data, response_tx))
            .await
            .map_err(|_| anyhow::anyhow!("batch processor has shut down"))?;
        Ok(response_rx.await?)
    }
}

pub fn start_batch_processor(settings: Settings) -> BatchProcessorHandle {
    let (tx, mut rx) = mpsc::channel::<Job>(100_000); // Large buffer

    tokio::spawn(async move {
        let batch_max_size = settings.batch_max_size;
        let wait_duration = Duration::from_millis(settings.batch_max_wait_ms);
        let mut batch: Vec<Job> = Vec::with_capacity(batch_max_size);

        loop {
            // Use tokio::timeout to wait for the first item.
            // This is the equivalent of our asyncio.wait_for logic.
            match timeout(wait_duration, rx.recv()).await {
                Ok(Some(job)) => {
                    batch.push(job);
                    // After getting one, greedily pull more items without waiting.
                    while batch.len() < batch_max_size {
                        match rx.try_recv() {
                            Ok(job) => batch.push(job),
                            // Empty: process what we have. Disconnected: every
                            // sender is gone, so flush this final batch; the next
                            // recv() returns None and ends the loop.
                            Err(_) => break,
                        }
                    }
                }
                Ok(None) => {
                    // Channel closed and fully drained
                    info!("Channel closed. Shutting down processor.");
                    break;
                }
                Err(_) => {
                    // Timed out waiting for the first item.
                    // Continue the loop to start a new wait.
                    continue;
                }
            }

            info!("Processing batch of size {}", batch.len());
            // In a real application, this would be an async HTTP call.
            // We're just simulating the work.
            let results = process_batch_downstream(&batch).await;

            // Pair each job with its result and send the result back
            // on that job's one-shot channel.
            for (job, result) in batch.drain(..).zip(results) {
                let (_, response_tx) = job;
                if response_tx.send(result).is_err() {
                    warn!("Failed to send response back to a waiting request. Receiver dropped.");
                }
            }
        }
    });

    BatchProcessorHandle { sender: tx }
}

async fn process_batch_downstream(batch: &[Job]) -> Vec<JobResult> {
    tokio::time::sleep(Duration::from_millis(50)).await; // Simulate downstream latency
    batch
        .iter()
        .map(|(data, _)| JobResult {
            processed: true,
            original_id: data.id,
        })
        .collect()
}
Actix-web Application (src/main.rs)
The main function initializes the processor and shares its handle with all web workers.
// src/main.rs
mod batch_processor;
mod config;

use actix_web::{web, App, HttpServer, Responder, HttpResponse, Error};
use batch_processor::{start_batch_processor, BatchProcessorHandle, JobData};
use config::Settings;
use log::info;

async fn process_item(
    handle: web::Data<BatchProcessorHandle>,
    item: web::Json<JobData>,
) -> Result<impl Responder, Error> {
    match handle.submit_job(item.into_inner()).await {
        Ok(result) => Ok(HttpResponse::Ok().json(result)),
        Err(e) => {
            log::error!("Failed to process job: {}", e);
            Ok(HttpResponse::InternalServerError().finish())
        }
    }
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

    let settings = Settings::new().expect("Failed to load configuration.");

    // Start the single, shared batch processor and get its handle.
    let processor_handle = start_batch_processor(settings.clone());

    info!("Starting server at {}", &settings.server_addr);

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(processor_handle.clone()))
            .route("/process", web::post().to(process_item))
    })
    .bind(&settings.server_addr)?
    .run()
    .await
}
The Rust version is undeniably more verbose. However, it achieves something the Python version cannot do without external dependencies: a single batch processor task serves requests from all the web server’s worker threads. Actix-web starts multiple threads, but because the BatchProcessorHandle (containing a channel sender) is cheap to clone and thread-safe (Send + Sync), it can be shared across all threads. This provides true, centralized batching that scales with the number of CPU cores, all within a single process.
Final Choice and Rationale
For a non-critical internal service where time-to-market is the primary driver, the FastAPI solution is compelling. Its simplicity and the vast Python ecosystem make it a pragmatic choice.
However, for this specific technical directive—a critical path sidecar with stringent latency requirements—the potential for unpredictable performance in the Python version presented an unacceptable operational risk. The GIL limitation and the process-per-core model for web workers fundamentally clash with the requirement for centralized, high-throughput batching. While solvable with external infrastructure, that solution introduces complexity, cost, and additional failure modes.
The Rust solution, despite its higher initial development cost, was selected. The rationale is as follows:
- Performance Guarantee: The absence of a GIL and GC allows us to meet the sub-15ms p99 latency target with much higher confidence. The system’s behavior under load is far more predictable.
- Architectural Simplicity: The ability to share state safely across threads within a single process allows for a self-contained application. This reduces deployment complexity and eliminates dependencies on external message brokers for the core batching logic.
- Resource Efficiency: For the same workload, the Rust service is expected to consume significantly less CPU and memory, since it avoids interpreter and garbage-collector overhead. Over the long term, in a cloud environment, this translates to direct cost savings.
The cost of the steeper learning curve and longer development time was deemed a necessary investment to ensure the service’s long-term stability and performance in a critical infrastructure role.
Extensibility and Limitations
The chosen Rust architecture, while robust, is not without its own limitations. The current implementation uses a single-consumer channel, meaning only one tokio task is processing all batches. On a machine with a very high core count (e.g., 32+ cores), this single processor task could itself become a bottleneck, especially if the post-processing logic (distributing results) involves any non-trivial computation.
A future iteration could explore a sharded processor model. This would involve spawning multiple batch processor tasks, each with its own MPSC channel. Incoming requests would be hashed by a key (e.g., a user ID) and dispatched to the corresponding processor’s channel. This would parallelize the batch formation and processing work, scaling the system beyond a single consumer task. Furthermore, the current implementation assumes the downstream service is purely I/O-bound. If batch processing itself required significant CPU work, the tokio::spawn task would need to be augmented with tokio::task::spawn_blocking to move that work onto a dedicated thread pool, preventing it from starving the async runtime.
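The routing step of the sharded model can be sketched independently of the runtime. The example below is shown in Python for brevity and is an illustration rather than part of the chosen implementation; `shard_for` and `route` are hypothetical names, CRC32 is just one reasonable stable hash, and plain lists stand in for the per-shard channels.

```python
import zlib
from typing import List


def shard_for(key: str, shard_count: int) -> int:
    """Return a shard index for key that is stable across processes and restarts."""
    # zlib.crc32 is used instead of the built-in hash(), which is salted
    # per process for strings.
    return zlib.crc32(key.encode("utf-8")) % shard_count


def route(keys: List[str], shard_count: int) -> List[List[str]]:
    """Group keys by shard, standing in for 'send to that shard's channel'."""
    shards: List[List[str]] = [[] for _ in range(shard_count)]
    for key in keys:
        shards[shard_for(key, shard_count)].append(key)
    return shards


shards = route(["user-1", "user-2", "user-1", "user-3"], shard_count=4)
print(shards)
```

Because the same key always lands on the same shard, requests for one user are still batched together, while different users can be processed by different consumer tasks in parallel.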