The catalyst for this entire architecture was a four-hour outage. Our monolithic, single-region sentiment analysis service, built around a fine-tuned BERT model, went offline when its cloud region experienced a networking partition. The post-mortem was clear: we needed a geo-distributed, active-active system that could survive a regional failure and allow for zero-downtime model updates. A simple DNS load balancer wasn’t enough; we required granular, real-time control over model versions and service health across continents. This log details the journey from that failure to a resilient, observable inference platform.
Technical Pain Point -> Initial Concept
The core requirements were stringent:
- Multi-Region Active-Active Deployment: Services in `us-east-1` and `eu-west-1` must both serve production traffic.
- Dynamic Model Rollouts: The ability to deploy a new model version (`v2.1`) to one region, or even a subset of nodes (canary), without affecting the other.
- Automatic Fault Detection: Unhealthy inference nodes must be automatically detected and taken out of rotation.
- Scalable Request Logging: All inference requests and their metadata must be persisted for analytics, withstanding high write throughput from all regions.
- Real-Time Observability: An internal dashboard must reflect the live state of the entire distributed system—every node, its health, and the model version it’s currently serving.
Our initial thought was to use a managed database like RDS to store the deployment configuration. This was quickly discarded. A traditional database introduces its own single point of failure and lacks the low-latency watch mechanisms required for near-instant propagation of configuration changes. We needed a distributed consensus system.
Technology Selection Decisions
In a real-world project, technology choices are about trade-offs, not just features.
- Service Coordination (`etcd`): We evaluated etcd, ZooKeeper, and Consul. We chose `etcd` for three primary reasons: its clean gRPC API, the simplicity and power of its `watch` primitive, and its use of TTL-based leases for ephemeral node registration, which is perfect for service health checks. It would act as our single source of truth for the system’s intended state; a minimal demonstration of both primitives follows this list.
- Inference Engine (Python + Hugging Face Transformers): This was the incumbent technology and performed well. The challenge was not the inference itself, but how to manage the lifecycle of the model objects within a long-running Python service that needed to react to external configuration changes.
- Distributed Logging (Cassandra): We anticipated tens of thousands of inference requests per minute, globally. A relational database would choke on this write load. We needed a database designed for multi-datacenter replication and linear write scalability. Cassandra was the obvious choice. Its tunable consistency (`LOCAL_QUORUM` for writes) allowed us to prioritize write availability over immediate global consistency for our logging data, which was an acceptable trade-off.
- Real-Time Dashboard (Jotai): For the internal monitoring dashboard, we needed to manage state derived from a WebSocket stream reflecting `etcd` changes. We considered Redux, but the boilerplate for our use case felt excessive. Jotai’s atomic, bottom-up approach was a perfect fit. Each piece of state (e.g., the health of a specific node) could be an independent atom, preventing unnecessary re-renders of the entire dashboard when a single node’s status changed.
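Before committing, it is worth seeing the two `etcd` primitives in isolation. The sketch below is a minimal illustration against a local single-member `etcd` (the `/demo/...` keys are made up for the example): it registers an ephemeral key under a 5-second lease and attaches a watch callback that fires as soon as a key changes.

```python
# demo_etcd_primitives.py -- illustrative only; assumes etcd is reachable locally.
import time
import etcd3

client = etcd3.client(host="127.0.0.1", port=2379)

# 1. Ephemeral registration: the key vanishes ~5 seconds after refreshes stop.
lease = client.lease(5)
client.put("/demo/service_registry/node-1", '{"ip": "10.0.0.1"}', lease=lease)

# 2. Watch: the callback fires almost immediately after the key changes.
def on_change(watch_response):
    for event in watch_response.events:
        print("changed:", event.key.decode(), "->", event.value.decode())

watch_id = client.add_watch_callback("/demo/deployments/model_assignment", on_change)
client.put("/demo/deployments/model_assignment", '{"model": "sentiment", "version": "v1.1.0"}')

time.sleep(1)  # give the background watch thread a moment to deliver the event
client.cancel_watch(watch_id)
```

If the process holding the lease dies or simply stops refreshing it, the registration key disappears once the TTL elapses, which is exactly the behavior the service registry below relies on.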
The etcd Key-Value Schema
The foundation of the control plane is a well-defined `etcd` key schema. A poor structure here leads to chaos.
/inference_service/
├── config/
│ └── models/
│ ├── sentiment/
│ │ ├── v1.0.0 # Value: {"path": "distilbert-base-uncased-finetuned-sst-2-english", "hash": "sha256:..."}
│ │ └── v1.1.0 # Value: {"path": "path/to/new/model", "hash": "sha256:..."}
│ └── summarization/
│ └── v1.0.0
├── deployments/
│ ├── us-east-1/
│ │ ├── model_assignment # Value: {"model": "sentiment", "version": "v1.1.0"}
│ │ └── canary_percent # Value: 0
│ └── eu-west-1/
│ ├── model_assignment # Value: {"model": "sentiment", "version": "v1.0.0"}
│ └── canary_percent # Value: 0
└── service_registry/
├── us-east-1/
│ └── inference-node-a7b2c8 # Ephemeral node with TTL, Value: {"ip": "10.0.1.23", "pid": 4512}
└── eu-west-1/
└── inference-node-d9e4f1
- `/config/models`: A catalog of all available models and their locations (e.g., Hugging Face Hub path or S3 bucket). This is our model registry.
- `/deployments/{region}/model_assignment`: This is the critical key. Our services `watch` this key. Changing its value triggers a model reload across all nodes in that region (see the operator sketch after this list).
- `/service_registry/{region}/{node_id}`: Each inference node registers itself here using an `etcd` lease. If the node dies, the lease expires, and the key is automatically deleted. This provides passive health checking.
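To show what actually drives a rollout, here is a hypothetical operator-side script (the "Admin CLI" in the architecture overview) that registers a new version in the model registry and then flips the `us-east-1` assignment. The path and hash values are the placeholders from the schema above.

```python
# rollout.py -- hypothetical operator tooling, not part of the inference node.
import json
import etcd3

client = etcd3.client(host="127.0.0.1", port=2379)

# 1. Publish the new version into the model registry.
client.put(
    "/config/models/sentiment/v1.1.0",
    json.dumps({"path": "path/to/new/model", "hash": "sha256:..."}),
)

# 2. Flip the regional assignment. Every inference node in us-east-1 that is
#    watching this key picks up the change and hot-swaps the model.
client.put(
    "/deployments/us-east-1/model_assignment",
    json.dumps({"model": "sentiment", "version": "v1.1.0"}),
)
```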
Part 1: The Resilient Python Inference Node
The core of the system is a Python service, built with FastAPI, that serves the model and listens for changes in `etcd`. The pitfall here is managing concurrency; the web server, the `etcd` watcher, and the health check heartbeat must all run without blocking each other.
Project Structure & Dependencies
/inference_node
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application
│ ├── config.py # Configuration loading
│ ├── etcd_client.py # etcd interaction and watch loop
│ ├── model_manager.py # Handles loading/swapping Hugging Face models
│ └── cassandra_logger.py # Asynchronous logging to Cassandra
├── tests/
├── Dockerfile
└── requirements.txt
requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0.post1
transformers==4.35.0
torch==2.1.0
etcd3==0.12.0  # PyPI distribution of the python-etcd3 client
cassandra-driver==3.28.0
aiohttp==3.8.6 # For async cassandra logging
pydantic-settings==2.0.3
Configuration (`app/config.py`)
Production-grade services externalize configuration. Hardcoding endpoints is a common mistake.
# app/config.py
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
APP_NAME: str = "Resilient Inference Service"
REGION: str = os.environ.get("SERVICE_REGION", "unknown")
NODE_ID: str = os.environ.get("HOSTNAME", "local-node")
ETCD_HOST: str = "127.0.0.1"
ETCD_PORT: int = 2379
ETCD_LEASE_TTL: int = 10 # Seconds
CASSANDRA_CONTACT_POINTS: list[str] = ["127.0.0.1"]
CASSANDRA_PORT: int = 9042
CASSANDRA_KEYSPACE: str = "inference_logs"
    CASSANDRA_LOCAL_DC: str = "dc1"  # Must match this node's Cassandra datacenter name (the keyspace below uses region names like "us-east-1")
LOG_LEVEL: str = "INFO"
settings = Settings()
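Since everything comes from the environment, the same container image runs in both regions; only its environment differs. A small sketch of an override (the hostnames are made up for illustration):

```python
# Illustrative only: set the environment before app.config is imported,
# as the deployment manifest or Dockerfile would.
import os

os.environ["SERVICE_REGION"] = "eu-west-1"
os.environ["ETCD_HOST"] = "etcd.eu-west-1.internal"      # assumed internal DNS name
os.environ["CASSANDRA_LOCAL_DC"] = "eu-west-1"

from app.config import Settings

settings = Settings()
print(settings.REGION, settings.ETCD_HOST, settings.CASSANDRA_LOCAL_DC)
```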
The Model Manager (`app/model_manager.py`)
This is where the magic happens. We need to be able to swap the active model in memory without any downtime. A lock-protected reference to the active pipeline, backed by a dictionary cache of loaded models, allows for an atomic switch.
# app/model_manager.py
import logging
import threading
from typing import Optional, Dict, Any
from transformers import pipeline, Pipeline
logger = logging.getLogger(__name__)
class ModelManager:
def __init__(self):
self._model_lock = threading.Lock()
self._active_model_key: Optional[str] = None
# In a real system, this cache would have eviction policies
self._model_cache: Dict[str, Pipeline] = {}
self._current_pipeline: Optional[Pipeline] = None
def get_current_pipeline(self) -> Optional[Pipeline]:
"""Returns the currently active pipeline for inference."""
with self._model_lock:
return self._current_pipeline
def load_and_swap_model(self, model_key: str, model_path: str, task: str = "sentiment-analysis"):
"""
Loads a new model and atomically swaps it into the active slot.
This is the most critical section for zero-downtime model updates.
"""
if self._active_model_key == model_key:
logger.info(f"Model {model_key} is already active. No action taken.")
return
logger.info(f"Attempting to load model '{model_key}' from path '{model_path}'...")
try:
# Check cache first
if model_key in self._model_cache:
logger.info(f"Model {model_key} found in cache.")
new_pipeline = self._model_cache[model_key]
else:
# This is a blocking I/O and CPU-intensive operation.
# In a high-performance system, this might be offloaded to a separate process.
new_pipeline = pipeline(task, model=model_path)
self._model_cache[model_key] = new_pipeline
logger.info(f"Successfully loaded model {model_key}. Swapping now.")
with self._model_lock:
self._current_pipeline = new_pipeline
self._active_model_key = model_key
logger.info(f"Model {model_key} is now active.")
except Exception as e:
logger.error(f"Failed to load or swap model {model_key}: {e}", exc_info=True)
# IMPORTANT: We do not change the state if the new model fails to load.
# The old model continues to serve traffic.
# Global instance
model_manager = ModelManager()
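To sanity-check that the swap is really hitless, a crude smoke test along these lines can be run: one thread hammers the active pipeline while the main thread performs a swap. This is a sketch, not part of the service; it reuses the public SST-2 checkpoint from the model registry example for both "versions" so it can run anywhere (the first run downloads the model).

```python
# swap_smoke_test.py -- illustrative only.
import threading
import time

from app.model_manager import ModelManager

mm = ModelManager()
mm.load_and_swap_model(
    "sentiment:v1.0.0", "distilbert-base-uncased-finetuned-sst-2-english"
)

stop = threading.Event()

def hammer():
    # Keep running inferences while the active model is being swapped out.
    while not stop.is_set():
        pipe = mm.get_current_pipeline()
        if pipe is not None:
            pipe("the rollout went smoothly")

worker = threading.Thread(target=hammer, daemon=True)
worker.start()

# Simulate a v1.1.0 rollout by loading the same checkpoint under a new key;
# in production the path would point at a genuinely different model.
mm.load_and_swap_model(
    "sentiment:v1.1.0", "distilbert-base-uncased-finetuned-sst-2-english"
)

time.sleep(2)
stop.set()
worker.join()
print("active model:", mm._active_model_key)
```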
The etcd Watcher (`app/etcd_client.py`)
This runs in a background thread, maintaining the service’s heartbeat and watching for configuration changes.
# app/etcd_client.py
import json
import logging
import os
import threading
import time
from typing import Optional
import etcd3
from .config import settings
from .model_manager import model_manager
logger = logging.getLogger(__name__)
class EtcdClient:
def __init__(self):
self.client = etcd3.client(host=settings.ETCD_HOST, port=settings.ETCD_PORT)
self.lease: Optional[etcd3.Lease] = None
self.node_key = f"/service_registry/{settings.REGION}/{settings.NODE_ID}"
self.model_assignment_key = f"/deployments/{settings.REGION}/model_assignment"
self._stop_event = threading.Event()
self._watch_thread = threading.Thread(target=self._watch_model_assignment, daemon=True)
self._heartbeat_thread = threading.Thread(target=self._keep_alive, daemon=True)
def register_node(self):
"""Register this node in etcd with a TTL lease."""
try:
self.lease = self.client.lease(settings.ETCD_LEASE_TTL)
node_value = json.dumps({"ip": "127.0.0.1", "pid": os.getpid()}) # Placeholder IP
self.client.put(self.node_key, node_value, lease=self.lease)
logger.info(f"Node registered in etcd at {self.node_key} with TTL {settings.ETCD_LEASE_TTL}s")
            # Guard so re-registration after a lease failure doesn't try to start the thread twice.
            if not self._heartbeat_thread.is_alive():
                self._heartbeat_thread.start()
except Exception as e:
logger.critical(f"Failed to register with etcd: {e}", exc_info=True)
# This is a fatal error. The service should probably exit.
raise
def _keep_alive(self):
"""Refresh the etcd lease periodically."""
while not self._stop_event.is_set() and self.lease:
try:
self.lease.refresh()
time.sleep(settings.ETCD_LEASE_TTL / 2)
except Exception as e:
logger.error(f"Failed to refresh etcd lease: {e}. Re-registering...")
try:
self.register_node()
except Exception as reg_e:
logger.critical(f"Failed to re-register after lease failure: {reg_e}")
time.sleep(5) # Backoff before retrying
def get_initial_model_config(self) -> Optional[dict]:
"""Fetch the current model assignment on startup."""
try:
value, _ = self.client.get(self.model_assignment_key)
if value:
return json.loads(value)
except Exception as e:
logger.error(f"Failed to get initial model config: {e}")
return None
def get_model_path_from_registry(self, model_name: str, version: str) -> Optional[str]:
"""Looks up the model's path in the model registry."""
key = f"/config/models/{model_name}/{version}"
try:
value, _ = self.client.get(key)
if value:
model_info = json.loads(value)
return model_info.get("path")
except Exception as e:
logger.error(f"Failed to retrieve model path for {model_name}:{version} from etcd: {e}")
return None
def start_watching(self):
"""Starts the background thread to watch for model assignment changes."""
self._watch_thread.start()
logger.info(f"Started watching for changes on key: {self.model_assignment_key}")
def _watch_model_assignment(self):
"""The core watch loop."""
while not self._stop_event.is_set():
try:
events_iterator, cancel = self.client.watch(self.model_assignment_key)
for event in events_iterator:
# We only care about PUT events (creations or updates)
if isinstance(event, etcd3.events.PutEvent):
logger.info(f"Detected change on {self.model_assignment_key}")
config = json.loads(event.value)
self._handle_config_update(config)
except Exception as e:
logger.error(f"Etcd watch encountered an error: {e}. Reconnecting in 5s...")
time.sleep(5)
def _handle_config_update(self, config: dict):
model_name = config.get("model")
version = config.get("version")
if not model_name or not version:
logger.warning(f"Invalid model assignment received: {config}")
return
model_path = self.get_model_path_from_registry(model_name, version)
if not model_path:
logger.error(f"Could not find path for model {model_name}:{version} in registry.")
return
model_key = f"{model_name}:{version}"
# Offload the blocking model load to a new thread to not block the watch loop.
loader_thread = threading.Thread(
target=model_manager.load_and_swap_model,
args=(model_key, model_path)
)
loader_thread.start()
def shutdown(self):
self._stop_event.set()
if self.lease:
self.lease.revoke()
self.client.delete(self.node_key)
logger.info("Deregistered node and shut down etcd client.")
etcd_client = EtcdClient()
Part 2: Cassandra for High-Throughput Logging
We need to log inference calls without blocking the response to the user. The write to Cassandra must be asynchronous and fault-tolerant.
Cassandra Schema
A denormalized schema optimized for our primary query pattern: retrieving a node’s recent requests over a time window. (Lookups by bare request ID would require a secondary index or a separate lookup table.)
CREATE KEYSPACE IF NOT EXISTS inference_logs
WITH replication = {'class': 'NetworkTopologyStrategy', 'us-east-1': '3', 'eu-west-1': '3'};
USE inference_logs;
CREATE TABLE IF NOT EXISTS sentiment_requests (
request_id uuid,
request_time timestamp,
region text,
node_id text,
model_key text,
request_body text,
response_body text,
latency_ms int,
PRIMARY KEY ((region, node_id), request_time, request_id)
) WITH CLUSTERING ORDER BY (request_time DESC);
The partition key `(region, node_id)` distributes writes across nodes and regions, which avoids hot spots. For long retention periods you would typically also add a time bucket (for example, a date) to the partition key so that individual partitions do not grow without bound.
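To make the intended read path concrete, here is a minimal query sketch, assuming a local contact point and the example node ID from the service registry above:

```python
# query_logs.py -- illustrative read path for the schema above.
from datetime import datetime, timedelta

from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect("inference_logs")

query = """
    SELECT request_id, request_time, model_key, latency_ms
    FROM sentiment_requests
    WHERE region = %s AND node_id = %s AND request_time > %s
    LIMIT 100
"""
since = datetime.utcnow() - timedelta(hours=1)
rows = session.execute(query, ("us-east-1", "inference-node-a7b2c8", since))
for row in rows:
    print(row.request_id, row.request_time, row.model_key, row.latency_ms)
```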
The Logger (`app/cassandra_logger.py`)
Using the `cassandra-driver` with a proper `DCAwareRoundRobinPolicy` is critical for multi-region performance. A common mistake is using the default policy, which can lead to high-latency cross-DC connections.
# app/cassandra_logger.py
import logging
import uuid
from datetime import datetime
from typing import Optional
from cassandra.cluster import Cluster, Session
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import PreparedStatement
from .config import settings
logger = logging.getLogger(__name__)
class CassandraLogger:
def __init__(self):
self.session: Optional[Session] = None
self._insert_statement: Optional[PreparedStatement] = None
def connect(self):
try:
# Policy is crucial for multi-DC awareness
policy = DCAwareRoundRobinPolicy(local_dc=settings.CASSANDRA_LOCAL_DC)
cluster = Cluster(
contact_points=settings.CASSANDRA_CONTACT_POINTS,
port=settings.CASSANDRA_PORT,
load_balancing_policy=policy,
# Add auth provider if needed
# auth_provider=PlainTextAuthProvider(username='...', password='...')
)
self.session = cluster.connect(settings.CASSANDRA_KEYSPACE)
logger.info("Successfully connected to Cassandra.")
self._prepare_statements()
except Exception as e:
logger.error(f"Could not connect to Cassandra: {e}", exc_info=True)
# The service can still run without Cassandra, but logging will be disabled.
self.session = None
def _prepare_statements(self):
if self.session:
query = """
INSERT INTO sentiment_requests (request_id, request_time, region, node_id, model_key, request_body, response_body, latency_ms)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""
self._insert_statement = self.session.prepare(query)
async def log_request_async(self, data: dict):
if not self.session or not self._insert_statement:
logger.warning("Cassandra not available. Skipping log.")
return
try:
# The driver's execute_async returns a future-like object
self.session.execute_async(self._insert_statement, (
uuid.uuid4(),
datetime.utcnow(),
data.get("region"),
data.get("node_id"),
data.get("model_key"),
data.get("request_body"),
data.get("response_body"),
data.get("latency_ms")
))
except Exception as e:
# A simple circuit breaker could be implemented here to stop trying for a while.
logger.error(f"Failed to log request to Cassandra: {e}")
def shutdown(self):
if self.session:
self.session.cluster.shutdown()
logger.info("Cassandra connection closed.")
cassandra_logger = CassandraLogger()
Putting It All Together (`app/main.py`)
The FastAPI application ties everything together.
# app/main.py
import logging
import time
import json
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
from pydantic import BaseModel
from .config import settings
from .etcd_client import etcd_client
from .model_manager import model_manager
from .cassandra_logger import cassandra_logger
logging.basicConfig(level=settings.LOG_LEVEL)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
cassandra_logger.connect()
etcd_client.register_node()
# Load initial model
initial_config = etcd_client.get_initial_model_config()
if initial_config:
etcd_client._handle_config_update(initial_config)
else:
logging.warning("No initial model configuration found in etcd.")
etcd_client.start_watching()
yield
# Shutdown
etcd_client.shutdown()
cassandra_logger.shutdown()
app = FastAPI(lifespan=lifespan)
class InferenceRequest(BaseModel):
text: str
class InferenceResponse(BaseModel):
label: str
score: float
model_key: str
node_id: str
@app.post("/predict", response_model=InferenceResponse)
async def predict(request: InferenceRequest, http_request: Request, background_tasks: BackgroundTasks):
start_time = time.time()
pipeline = model_manager.get_current_pipeline()
if not pipeline:
raise HTTPException(status_code=503, detail="Model not loaded or unavailable.")
# Core inference logic
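    # NOTE: the pipeline call below is synchronous and CPU-bound, so inside an
    # async endpoint it blocks the event loop. Under real load it should be
    # offloaded (e.g., via run_in_executor) or the endpoint declared with plain
    # `def` so FastAPI runs it in its worker threadpool.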
result = pipeline(request.text)[0]
end_time = time.time()
latency_ms = int((end_time - start_time) * 1000)
response_data = {
"label": result['label'],
"score": result['score'],
"model_key": model_manager._active_model_key,
"node_id": settings.NODE_ID,
}
# Asynchronous logging
log_data = {
"region": settings.REGION,
"node_id": settings.NODE_ID,
"model_key": response_data["model_key"],
"request_body": request.model_dump_json(),
"response_body": json.dumps(response_data),
"latency_ms": latency_ms,
}
background_tasks.add_task(cassandra_logger.log_request_async, log_data)
return InferenceResponse(**response_data)
@app.get("/health")
def health():
# A more robust health check would verify model presence and dependency connections.
return {"status": "ok", "node_id": settings.NODE_ID, "model": model_manager._active_model_key}
Part 3: The Real-Time Dashboard with Jotai
The dashboard’s purpose is to give operators a live, at-a-glance view of the system’s state. It subscribes to a WebSocket stream from a control-plane service (not detailed here, but it essentially watches `/service_registry/` and `/deployments/` in `etcd` and broadcasts changes).
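As a rough illustration of that bridge (not our production implementation), the sketch below snapshots the `/service_registry/` and `/deployments/` prefixes every two seconds and pushes the result to each connected dashboard client, rather than using a true `etcd` watch. The endpoint path and port are chosen only to match the URL used by the dashboard component below.

```python
# control_plane_sketch.py -- a simplified, polling stand-in for the control plane.
import asyncio
import json

import etcd3
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
etcd = etcd3.client(host="127.0.0.1", port=2379)


def snapshot() -> dict:
    """Read the service registry and deployments into the shape the dashboard expects."""
    nodes = {}
    for value, meta in etcd.get_prefix("/service_registry/"):
        nodes[meta.key.decode()] = json.loads(value)
    deployments = {}
    for value, meta in etcd.get_prefix("/deployments/"):
        # Key shape: /deployments/{region}/{setting}
        _, _, region, setting = meta.key.decode().split("/")
        deployments.setdefault(region, {})[setting] = json.loads(value)
    return {"nodes": nodes, "deployments": deployments}


@app.websocket("/ws/system-state")
async def system_state(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            # get_prefix() is blocking, so run it off the event loop.
            state = await asyncio.to_thread(snapshot)
            await ws.send_json(state)
            await asyncio.sleep(2)
    except WebSocketDisconnect:
        pass
```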
Jotai Atom Definitions (`src/state/atoms.js`)
We define atoms for each slice of state. This is Jotai’s strength: granular state management.
// src/state/atoms.js
import { atom } from 'jotai';
// Raw state from WebSocket
// In a real app, you'd use a more robust WebSocket atom utility or write your own.
// This is a simplified representation.
export const systemStateAtom = atom({}); // This will be populated by the WebSocket
// Derived atoms for UI components. These only recompute when their dependencies change.
export const nodesByRegionAtom = atom((get) => {
const state = get(systemStateAtom);
const nodes = state.nodes || {};
const grouped = {};
Object.entries(nodes).forEach(([key, value]) => {
    // Keys look like "/service_registry/{region}/{node_id}"; splitting on '/'
    // yields an empty first element, so region and nodeId sit at indices 2 and 3.
    const [, , region, nodeId] = key.split('/');
if (!grouped[region]) {
grouped[region] = [];
}
grouped[region].push({ id: nodeId, ...value });
});
return grouped;
});
export const deploymentsAtom = atom((get) => {
const state = get(systemStateAtom);
return state.deployments || {};
});
export const modelRegistryAtom = atom((get) => {
const state = get(systemStateAtom);
return state.models || {};
});
Note: This is pseudo-code for the WebSocket connection part. You would typically use a library or a `useEffect` hook to manage the WebSocket connection and update a writable atom.
React Component (`src/components/Dashboard.js`)
The component uses these atoms to render the state. It will automatically re-render only the necessary parts when the underlying atoms are updated.
// src/components/Dashboard.js
import React, { useEffect } from 'react';
import { useAtom, useSetAtom } from 'jotai';
import { systemStateAtom, nodesByRegionAtom, deploymentsAtom } from '../state/atoms';
// A mock WebSocket connection manager
const useSystemStateSocket = (url, setState) => {
useEffect(() => {
// In a real implementation, this would be a robust WebSocket client.
console.log('Connecting to WebSocket...');
// Mock receiving data every 2 seconds
const interval = setInterval(() => {
// MOCK DATA - this would come from the WebSocket
const mockData = {
nodes: {
"/service_registry/us-east-1/inference-node-a7b2c8": { ip: "10.0.1.23", status: "healthy" },
"/service_registry/eu-west-1/inference-node-d9e4f1": { ip: "20.0.4.55", status: "healthy" },
},
deployments: {
"us-east-1": { model_assignment: { model: "sentiment", version: "v1.1.0" } },
"eu-west-1": { model_assignment: { model: "sentiment", version: "v1.0.0" } },
}
};
setState(mockData);
}, 2000);
return () => clearInterval(interval);
}, [url, setState]);
};
const Dashboard = () => {
const [nodesByRegion] = useAtom(nodesByRegionAtom);
const [deployments] = useAtom(deploymentsAtom);
const setSystemState = useSetAtom(systemStateAtom); // Writable atom setter
// Connect to the backend and feed data into our atom
useSystemStateSocket('ws://localhost:8001/ws/system-state', setSystemState);
return (
<div style={{ fontFamily: 'monospace', display: 'flex', gap: '40px' }}>
{Object.entries(nodesByRegion).map(([region, nodes]) => (
<div key={region} style={{ border: '1px solid #ccc', padding: '10px' }}>
<h2>Region: {region}</h2>
        <h4>Active Model: {deployments[region]?.model_assignment?.model}:{deployments[region]?.model_assignment?.version}</h4>
<hr />
{nodes.map(node => (
<div key={node.id} style={{ backgroundColor: '#f0f0f0', margin: '5px', padding: '5px' }}>
<strong>Node ID:</strong> {node.id}<br />
<strong>IP:</strong> {node.ip}<br />
<strong>Status:</strong> <span style={{ color: 'green' }}>{node.status}</span>
</div>
))}
</div>
))}
</div>
);
};

export default Dashboard;
This demonstrates how Jotai allows for a clean separation between raw state fetching and derived state consumption in the UI.
Architecture Overview
graph TD
    subgraph Operator
        A[Admin CLI/UI] -- writes --> E[etcd Cluster];
    end
    subgraph Real-time Dashboard
        D[Jotai Frontend] <-. WebSocket .-> B[Control Plane Service];
        B -- watches --> E;
    end
    subgraph us-east-1
        E1(etcd Member 1)
        C1(Cassandra Node)
        IN1[Python Inference Node 1] -- inference --> IN1;
        IN2[Python Inference Node 2] -- inference --> IN2;
    end
    subgraph eu-west-1
        E2(etcd Member 2)
        C2(Cassandra Node)
        IN3[Python Inference Node 3] -- inference --> IN3;
    end
    E --- E1 --- E2 --- E;
    IN1 -- watch/lease --> E;
    IN2 -- watch/lease --> E;
    IN3 -- watch/lease --> E;
    IN1 -- async log --> C1;
    IN2 -- async log --> C1;
    IN3 -- async log --> C2;
    LB[Global Load Balancer] --> IN1;
    LB --> IN2;
    LB --> IN3;
    style IN1 fill:#d4fcd7
    style IN2 fill:#d4fcd7
    style IN3 fill:#d4fcd7
Lingering Issues and Future Iterations
This architecture provides a solid foundation, but it is not without its limitations. The process of downloading and loading a large Hugging Face model is slow and resource-intensive. A significant improvement would be a shared, read-only model cache (e.g., on EFS or a shared volume) accessible by all nodes in a region, combined with a pre-warming mechanism that loads the next model version onto nodes before making it active in `etcd`. Furthermore, the current Cassandra logging is "fire-and-forget." For critical auditing, a more robust mechanism using a message queue like Kafka would be necessary to guarantee delivery in the event of a prolonged Cassandra outage. Finally, our health check is passive (lease expiration); active health checks from a central service that probe the `/health` endpoint would provide a more immediate and accurate picture of a node’s true serviceability.