The pain point wasn’t a catastrophic failure. It was the slow, creeping degradation of service quality in our event-sourced payment processing system. Our batch-oriented monitoring, which aggregated events every five minutes, was consistently late to the party. By the time an alert fired for a spike in PaymentFailed events, the underlying issue—a subtle misconfiguration in a downstream partner API—had already impacted thousands of transactions. We were blind to the patterns leading up to the failure. We needed to see the system’s pulse, not just read its autopsy report.
The initial concept was to build a system that could tap directly into our event firehose (a Kafka topic), analyze event patterns in real-time, and push insights to a live dashboard. A key, and somewhat unusual, requirement was that the dashboard client must be exceptionally lightweight. All heavy lifting—data aggregation, machine learning inference, and even chart rendering—had to occur on the server. This constraint stemmed from a need to deploy this dashboard on low-power devices in our operations center and ensure absolute consistency in visualization, irrespective of the client’s browser or hardware.
This led to a specific and deliberate technology stack. Our Event Sourcing architecture was a given. For anomaly detection, TensorFlow was the natural choice, as our data science team had already developed a robust autoencoder model trained on historical event data. For pushing data to the client, Server-Sent Events (SSE) were selected over WebSockets. The communication is strictly one-way (server-to-client), and SSE’s simplicity, automatic reconnection handling, and use of standard HTTP made it a more resilient and less complex choice for this use case. The most debated component was visualization. Instead of sending aggregated JSON data for a client-side library like D3.js or Chart.js to render, we opted to generate static plots on the server using Seaborn. This decision, while unorthodox for web applications, directly addressed our core requirements: it offloaded all rendering logic from the client, guaranteed pixel-perfect consistency, and enabled us to use sophisticated statistical plots that are often difficult to replicate in JavaScript.
The resulting architecture is a pipeline of specialized services. It starts with an event consumer, feeds data into an inference engine, and terminates at an SSE endpoint that streams both anomaly scores and fully rendered visualizations.
graph TD
    subgraph Event Store
        A[Kafka Topic: payment_events]
    end
    subgraph Real-Time Pipeline
        B(Event Consumer) -- Batched Events --> C{Task Queue: Redis}
        C -- Inference Task --> D[Inference Worker Pool]
        D -- Loads Model --> E(TensorFlow Autoencoder)
        D -- Inference Result --> F{Result Store: Redis Pub/Sub}
    end
    subgraph Dashboard Backend
        G(SSE Server: FastAPI) -- Subscribes to --> F
        G -- Queries Recent Events --> B
        H(Seaborn Plot Generator) -- Generates Image --> G
        G -- SSE Stream --> I[Client Dashboard]
    end
    A --> B
This isn’t a simple script; it’s a decoupled system designed for resilience. A common mistake is to build this as a monolithic process. In a real-world project, if the TensorFlow inference or plot generation blocks, the entire event consumption pipeline halts, and you fall behind reality. Decoupling with a task queue is non-negotiable.
Part 1: The Event Consumer and Windowing Logic
The first component is a Python service that consumes from the event stream. Its sole responsibilities are to ingest events, group them into fixed-time windows (e.g., 10 seconds), and dispatch them for processing. It must be efficient and non-blocking. A critical pitfall here is naive consumption. Simply pulling one message at a time is inefficient. We need batching, but also a timeout to ensure we process data even during periods of low traffic.
Here is the core logic for a robust Kafka consumer. This example assumes events are JSON-serialized dictionaries with event_type and timestamp fields.
# consumer/consumer.py
import asyncio
import json
import logging
import uuid
from collections import Counter

from aiokafka import AIOKafkaConsumer
from redis import asyncio as aioredis
from tenacity import retry, stop_after_attempt, wait_fixed

# --- Configuration ---
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
KAFKA_TOPIC = "payment_events"
KAFKA_CONSUMER_GROUP = "anomaly_detector_group"
REDIS_URL = "redis://redis:6379"
WINDOW_SECONDS = 10   # Aggregate events in 10-second windows.
MAX_BATCH_SIZE = 500  # Max events to process in one go.

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# --- Redis Connection ---
redis_client = aioredis.from_url(REDIS_URL, decode_responses=True)


@retry(stop=stop_after_attempt(5), wait=wait_fixed(2))
async def connect_to_kafka():
    """Attempt to connect to Kafka with retries."""
    logger.info("Attempting to connect to Kafka...")
    consumer = AIOKafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=KAFKA_CONSUMER_GROUP,
        auto_offset_reset="latest",
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
    await consumer.start()
    logger.info("Successfully connected to Kafka.")
    return consumer


async def dispatch_window_for_analysis(window_data: dict):
    """
    Pushes the aggregated event window to a Redis Stream for processing.
    Using a Stream is more robust than a simple queue.
    """
    if not window_data:
        return
    task_id = str(uuid.uuid4())
    payload = {
        "task_id": task_id,
        "window_start_ts": window_data["start_ts"],
        "window_end_ts": window_data["end_ts"],
        "event_counts": json.dumps(window_data["counts"]),
    }
    await redis_client.xadd("inference_tasks", payload)
    logger.info(f"Dispatched task {task_id} with {sum(window_data['counts'].values())} events.")


async def consume_events():
    """Main consumer loop."""
    consumer = await connect_to_kafka()
    try:
        window_events = []
        last_flush_time = asyncio.get_event_loop().time()
        while True:
            try:
                # The timeout_ms is crucial. It prevents the loop from blocking indefinitely
                # and allows us to process windows based on time.
                result = await consumer.getmany(timeout_ms=1000, max_records=MAX_BATCH_SIZE)
                for tp, messages in result.items():
                    for msg in messages:
                        # In a production system, you'd add validation here,
                        # e.g., using Pydantic to parse and validate the event.
                        window_events.append(msg.value)

                current_time = asyncio.get_event_loop().time()
                time_since_flush = current_time - last_flush_time

                # We flush the window if it's full OR if the time limit is exceeded.
                # This ensures both throughput during bursts and timeliness during lulls.
                if window_events and (len(window_events) >= MAX_BATCH_SIZE or time_since_flush >= WINDOW_SECONDS):
                    # Aggregate events by type.
                    event_counts = Counter(event.get("event_type", "unknown") for event in window_events)

                    # We pass this to a background task to avoid blocking the consumer loop.
                    asyncio.create_task(dispatch_window_for_analysis({
                        "counts": dict(event_counts),
                        "start_ts": min(e["timestamp"] for e in window_events),
                        "end_ts": max(e["timestamp"] for e in window_events),
                    }))
                    logger.info(f"Flushed window with {len(window_events)} events. Time elapsed: {time_since_flush:.2f}s")
                    window_events = []
                    last_flush_time = current_time
            except Exception as e:
                logger.error(f"Error in consumer loop: {e}", exc_info=True)
                # Sleep briefly to prevent a tight loop on persistent errors.
                await asyncio.sleep(5)
    finally:
        await consumer.stop()
        logger.info("Kafka consumer stopped.")


if __name__ == "__main__":
    try:
        asyncio.run(consume_events())
    except KeyboardInterrupt:
        logger.info("Consumer process interrupted.")
The key design decision here is the dual-condition flush: process a batch when it’s full or when a time window expires. This is a pragmatic compromise between latency and efficiency. We also dispatch the task using asyncio.create_task to prevent the network call to Redis from blocking the tight consumption loop.
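One practical benefit of that dual condition is that it is easy to reason about in isolation. If the flush decision is pulled out into a small pure helper (a hypothetical refactor; the loop above keeps it inline), it can be unit-tested without Kafka or Redis involved. A minimal sketch:

# tests/test_flush_policy.py -- sketch for a hypothetical should_flush() helper
def should_flush(buffered: int, elapsed_s: float,
                 max_batch: int = 500, window_s: float = 10.0) -> bool:
    """Flush when the buffer is non-empty AND (it is full OR the window has expired)."""
    return buffered > 0 and (buffered >= max_batch or elapsed_s >= window_s)


def test_flush_when_batch_is_full():
    assert should_flush(buffered=500, elapsed_s=1.2)


def test_flush_when_window_expires_during_a_lull():
    assert should_flush(buffered=3, elapsed_s=10.0)


def test_never_flush_an_empty_window():
    assert not should_flush(buffered=0, elapsed_s=60.0)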
Part 2: The TensorFlow Inference Worker
This component is a standalone process that listens for tasks on the Redis Stream, performs inference, and publishes the result. It’s designed to be horizontally scalable. If inference becomes a bottleneck, we simply launch more instances of this worker.
The worker loads a pre-trained Keras autoencoder model. The model’s job is to reconstruct the input vector of event counts. A large reconstruction error signifies that the current pattern of events is unusual compared to what the model learned during training, indicating a potential anomaly.
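The training pipeline itself belongs to the data science team and is out of scope here, but for orientation, a dense autoencoder over the seven-dimensional event-count vector might look roughly like the sketch below. The layer sizes, epoch count, and file paths are illustrative assumptions; the only hard requirements are that the input dimension and feature order match EVENT_VECTOR_ORDER, and that the training data contains only "normal" windows.

# training/train_autoencoder.py -- illustrative sketch, not the production training job
import numpy as np
import tensorflow as tf

NUM_FEATURES = 7  # Must equal len(EVENT_VECTOR_ORDER) used by the worker.


def build_autoencoder(num_features: int) -> tf.keras.Model:
    """A small dense autoencoder: compress to a bottleneck, then reconstruct."""
    inputs = tf.keras.Input(shape=(num_features,))
    encoded = tf.keras.layers.Dense(16, activation="relu")(inputs)
    encoded = tf.keras.layers.Dense(4, activation="relu")(encoded)   # Bottleneck
    decoded = tf.keras.layers.Dense(16, activation="relu")(encoded)
    outputs = tf.keras.layers.Dense(num_features, activation="linear")(decoded)
    model = tf.keras.Model(inputs, outputs)
    model.compile(optimizer="adam", loss="mse")
    return model


if __name__ == "__main__":
    # X would be a matrix of per-window event counts from normal operation.
    # If the counts are scaled here, the same scaling must be applied at inference time.
    X = np.load("normal_windows.npy")  # Hypothetical exported training set.
    model = build_autoencoder(NUM_FEATURES)
    model.fit(X, X, epochs=50, batch_size=64, validation_split=0.1)
    model.save("event_autoencoder.h5")

The inference worker below only has to load the resulting artifact and apply the same feature order.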
# worker/inference_worker.py
import asyncio
import json
import logging
import os

import numpy as np
import tensorflow as tf
from redis import asyncio as aioredis
from tenacity import retry, stop_after_attempt, wait_fixed

# --- Configuration ---
REDIS_URL = "redis://redis:6379"
MODEL_PATH = "/models/event_autoencoder.h5"

# Define the order of events the model was trained on. This is CRITICAL.
EVENT_VECTOR_ORDER = [
    "PaymentInitiated",
    "PaymentSuccess",
    "PaymentFailed",
    "FraudCheckPassed",
    "FraudCheckFailed",
    "UserAuthenticated",
    "APILimitExceeded",
]
ANOMALY_THRESHOLD = 0.8  # Determined experimentally.

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# --- Global Model Variable ---
# A common practice to avoid reloading the model for every inference.
model = None


def load_model():
    """Loads the TensorFlow/Keras model into memory."""
    global model
    if not os.path.exists(MODEL_PATH):
        logger.error(f"Model file not found at {MODEL_PATH}")
        raise FileNotFoundError(f"Model file not found at {MODEL_PATH}")
    try:
        model = tf.keras.models.load_model(MODEL_PATH)
        logger.info(f"TensorFlow model loaded successfully from {MODEL_PATH}")
        # Perform a dummy prediction to "warm up" the model.
        dummy_input = np.zeros((1, len(EVENT_VECTOR_ORDER)))
        model.predict(dummy_input)
        logger.info("Model warmed up.")
    except Exception as e:
        logger.error(f"Failed to load model: {e}", exc_info=True)
        raise


def prepare_input_vector(event_counts: dict) -> np.ndarray:
    """
    Converts a dictionary of event counts into a fixed-order numpy array.
    The order must match the training data precisely.
    """
    vector = [event_counts.get(event_type, 0) for event_type in EVENT_VECTOR_ORDER]
    return np.array(vector, dtype=np.float32).reshape(1, -1)


async def process_inference_task(task_data: dict, redis_client):
    """Performs inference and publishes the result."""
    global model
    task_id = task_data.get("task_id", "unknown")
    logger.info(f"Processing task {task_id}")
    try:
        event_counts = json.loads(task_data["event_counts"])
        # 1. Prepare input.
        input_vector = prepare_input_vector(event_counts)
        # 2. Perform inference. predict() is blocking, which is acceptable here
        #    because inference is this worker's only job.
        reconstructed_vector = model.predict(input_vector)
        # 3. Calculate reconstruction error (Mean Squared Error).
        mse = np.mean(np.power(input_vector - reconstructed_vector, 2))
        is_anomaly = mse > ANOMALY_THRESHOLD
        logger.info(f"Task {task_id}: MSE={mse:.4f}, Anomaly={is_anomaly}")
        # 4. Publish result. Cast numpy types to plain Python types so json.dumps works.
        result_payload = json.dumps({
            "task_id": task_id,
            "reconstruction_error": float(mse),
            "is_anomaly": bool(is_anomaly),
            "timestamp": task_data["window_end_ts"],
            "event_counts": event_counts,
        })
        await redis_client.publish("anomaly_results", result_payload)
    except Exception as e:
        logger.error(f"Error processing task {task_id}: {e}", exc_info=True)


@retry(stop=stop_after_attempt(5), wait=wait_fixed(2))
async def connect_to_redis():
    logger.info("Connecting to Redis...")
    client = aioredis.from_url(REDIS_URL, decode_responses=True)
    await client.ping()
    logger.info("Successfully connected to Redis.")
    return client


async def main():
    """Main worker loop listening for tasks."""
    load_model()
    redis_client = await connect_to_redis()

    stream_name = "inference_tasks"
    group_name = "inference_workers"
    consumer_name = f"worker-{os.getpid()}"

    # The consumer group tracks delivery state server-side, so workers can crash
    # and resume without re-processing tasks that were already acknowledged.
    try:
        await redis_client.xgroup_create(stream_name, group_name, id="0", mkstream=True)
    except Exception:
        logger.info(f"Group '{group_name}' already exists.")

    logger.info(f"Worker {consumer_name} starting to listen for tasks...")
    while True:
        try:
            # XREADGROUP ensures that messages are distributed among workers in the group.
            # block=0 makes it wait indefinitely for a new message.
            response = await redis_client.xreadgroup(
                group_name,
                consumer_name,
                {stream_name: ">"},  # '>' means only new, never-delivered messages.
                count=1,
                block=0,
            )
            if response:
                stream, messages = response[0]
                message_id, task_data = messages[0]
                await process_inference_task(task_data, redis_client)
                # Acknowledge the message so it's not redelivered.
                await redis_client.xack(stream_name, group_name, message_id)
        except Exception as e:
            logger.error(f"Error in worker main loop: {e}", exc_info=True)
            await asyncio.sleep(5)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Inference worker interrupted.")
A critical implementation detail is the EVENT_VECTOR_ORDER list. A common mistake is to feed data to a model without guaranteeing the feature order. If the training data had PaymentSuccess as the second feature and the inference code puts it third, the results will be meaningless. This mapping must be treated as a contract between the model and the application.
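One way to enforce that contract, rather than merely document it, is to ship the feature order alongside the model artifact and assert against it at load time. A minimal sketch; the sidecar filename convention is an assumption:

# worker/feature_contract.py -- sketch: persist and verify the feature-order contract
import json
from pathlib import Path

EVENT_VECTOR_ORDER = [
    "PaymentInitiated", "PaymentSuccess", "PaymentFailed",
    "FraudCheckPassed", "FraudCheckFailed", "UserAuthenticated",
    "APILimitExceeded",
]


def save_contract(model_path: str) -> None:
    """Called by the training job: write the feature order next to the model file."""
    Path(model_path + ".features.json").write_text(json.dumps(EVENT_VECTOR_ORDER))


def verify_contract(model_path: str, expected: list) -> None:
    """Called by the inference worker at startup: fail fast on any mismatch."""
    stored = json.loads(Path(model_path + ".features.json").read_text())
    if stored != expected:
        raise RuntimeError(
            f"Feature order mismatch: model was trained with {stored}, "
            f"worker is configured with {expected}"
        )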
Part 3: The SSE Server and Visualization Backend
This is the public-facing component. It uses FastAPI for its excellent async support, which is ideal for handling long-lived SSE connections. It does two things:
- Listens to the anomaly_results Redis Pub/Sub channel for new inference results.
- Upon receiving a result (especially an anomaly), it generates a statistical plot with Seaborn and pushes both the score and the plot to all connected clients via SSE.
The trickiest part is handling the Seaborn plot generation, which is a blocking, CPU-bound operation. Running it directly in the async event loop would be a disaster, freezing the entire server. The solution is to run it in a separate thread pool using asyncio.to_thread (or loop.run_in_executor on older Python versions).
# sse_server/main.py
import asyncio
import base64
import io
import json
import logging
from collections import deque

import pandas as pd
import seaborn as sns
import matplotlib
matplotlib.use("Agg")  # Headless backend: we only render to in-memory buffers.
import matplotlib.pyplot as plt
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from sse_starlette.sse import EventSourceResponse
from redis import asyncio as aioredis

# --- Configuration ---
REDIS_URL = "redis://redis:6379"
MAX_HISTORY_POINTS = 100  # Keep last 100 data points for plotting trends.

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - [%(levelname)s] - %(message)s")
logger = logging.getLogger(__name__)

# --- Application State ---
# Using a deque with maxlen for efficient appends and automatic eviction of old points.
reconstruction_errors = deque(maxlen=MAX_HISTORY_POINTS)
# A simple in-memory cache for recent event counts, keyed by timestamp.
recent_event_counts = {}

# --- FastAPI App ---
app = FastAPI()


# --- SSE Stream Generator ---
async def sse_generator(request: Request):
    """Yields server-sent events to the client."""
    redis_client = aioredis.from_url(REDIS_URL)
    pubsub = redis_client.pubsub()
    await pubsub.subscribe("anomaly_results")
    logger.info("Client connected to SSE stream.")
    try:
        while True:
            # Check if the client has disconnected.
            if await request.is_disconnected():
                logger.info("Client disconnected.")
                break
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if message and message["type"] == "message":
                data = json.loads(message["data"])
                # Append data for trend plotting.
                error = data["reconstruction_error"]
                timestamp = data["timestamp"]
                reconstruction_errors.append({"timestamp": timestamp, "error": error})
                # Cache event counts for detailed visualization.
                recent_event_counts[timestamp] = data["event_counts"]
                # Prune old cache entries to prevent a memory leak.
                if len(recent_event_counts) > MAX_HISTORY_POINTS:
                    oldest_ts = sorted(recent_event_counts.keys())[0]
                    del recent_event_counts[oldest_ts]
                # Yield the raw anomaly score immediately for responsiveness.
                yield {
                    "event": "anomaly_score",
                    "data": json.dumps({"timestamp": timestamp, "error": error, "is_anomaly": data["is_anomaly"]}),
                }
                # If it's an anomaly, generate and send a detailed plot.
                if data["is_anomaly"]:
                    logger.info("Anomaly detected. Generating detailed plot...")
                    # This is the critical part: run blocking code in a thread pool.
                    plot_b64 = await asyncio.to_thread(generate_anomaly_plot, data["event_counts"])
                    yield {
                        "event": "anomaly_plot",
                        "data": json.dumps({"image": plot_b64}),
                    }
            # On a regular interval, a trend plot could also be sent regardless of
            # anomalies (omitted here for continuous visualization).
            await asyncio.sleep(0.1)  # Small sleep to yield control.
    finally:
        await pubsub.unsubscribe("anomaly_results")
        await redis_client.close()
        logger.info("SSE connection closed and resources released.")


def generate_anomaly_plot(event_counts: dict) -> str:
    """
    Generates a bar plot of event counts using Seaborn and returns it as a
    base64-encoded data URI. This is a BLOCKING function.
    """
    fig = None
    try:
        plt.style.use("seaborn-v0_8-darkgrid")
        fig, ax = plt.subplots(figsize=(10, 6))
        events_df = pd.DataFrame(
            list(event_counts.items()), columns=["EventType", "Count"]
        ).sort_values("Count", ascending=False)
        sns.barplot(x="Count", y="EventType", data=events_df, ax=ax, palette="viridis")
        ax.set_title("Event Distribution During Anomaly")
        ax.set_xlabel("Event Count")
        ax.set_ylabel("Event Type")
        plt.tight_layout()
        # Save the plot to an in-memory buffer.
        buf = io.BytesIO()
        fig.savefig(buf, format="png", bbox_inches="tight")
        buf.seek(0)
        # Encode to base64.
        img_str = base64.b64encode(buf.read()).decode("utf-8")
        return f"data:image/png;base64,{img_str}"
    finally:
        # IMPORTANT: Close the figure to free memory. Matplotlib state is global.
        if fig is not None:
            plt.close(fig)


@app.get("/stream")
async def stream(request: Request):
    return EventSourceResponse(sse_generator(request))


@app.get("/", response_class=HTMLResponse)
async def root():
    # Simple HTML page to display the stream.
    return """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Real-Time Anomaly Detection</title>
        <style>
            body { font-family: sans-serif; background-color: #121212; color: #e0e0e0; }
            #content { max-width: 900px; margin: auto; padding: 20px; }
            #plot-container { margin-top: 20px; border: 1px solid #444; padding: 10px; min-height: 400px; }
            img { max-width: 100%; }
            .anomaly { color: #ff6b6b; font-weight: bold; }
        </style>
    </head>
    <body>
        <div id="content">
            <h1>System Status</h1>
            <p><strong>Current Reconstruction Error:</strong> <span id="error">N/A</span></p>
            <div id="plot-container">
                <p>Waiting for anomaly plot...</p>
                <img id="plot-img" src="" alt="Anomaly Plot"/>
            </div>
        </div>
        <script>
            const errorEl = document.getElementById('error');
            const plotImgEl = document.getElementById('plot-img');
            const plotContainer = document.getElementById('plot-container');
            const sse = new EventSource('/stream');
            sse.addEventListener('anomaly_score', function(e) {
                const data = JSON.parse(e.data);
                errorEl.textContent = parseFloat(data.error).toFixed(5);
                if (data.is_anomaly) {
                    errorEl.classList.add('anomaly');
                    errorEl.textContent += ' (ANOMALY DETECTED)';
                } else {
                    errorEl.classList.remove('anomaly');
                }
            });
            sse.addEventListener('anomaly_plot', function(e) {
                const data = JSON.parse(e.data);
                plotImgEl.src = data.image;
                plotContainer.querySelector('p').style.display = 'none';
            });
            sse.onerror = function() {
                errorEl.textContent = "Connection lost. Reconnecting...";
                errorEl.classList.add('anomaly');
            };
        </script>
    </body>
    </html>
    """
The frontend JavaScript is minimal, as intended. It establishes an EventSource connection and listens for two custom events: anomaly_score for the raw metric and anomaly_plot for the visualization. When a plot event arrives, it simply updates the src attribute of an <img> tag with the base64 data URI. This architecture proves robust and fulfills the “thin client” requirement perfectly.
Limitations and Future Work
This system, while effective, has clear boundaries. The server-side rendering with Seaborn is its greatest strength and its primary bottleneck. For a handful of operators watching dashboards, it’s efficient. For hundreds of concurrent users, the CPU load for plot generation would become prohibitive. A potential optimization path would be to render plots to an in-memory cache (e.g., Redis) with a short TTL, so that multiple clients viewing the same anomaly event would receive the same cached image instead of triggering redundant renders.
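A sketch of that caching idea, assuming the cache key is derived from the window’s end timestamp, that the Redis client uses decode_responses=True, and that a 60-second TTL is acceptable (all assumptions, not measured values):

# sse_server/plot_cache.py -- sketch: de-duplicate plot renders across SSE clients
import asyncio

# generate_anomaly_plot is the blocking renderer defined in main.py above;
# the import path depends on your package layout.
from main import generate_anomaly_plot

PLOT_TTL_SECONDS = 60  # Assumed TTL; tune to how long an anomaly stays "current".


async def get_or_render_plot(redis_client, window_end_ts, event_counts) -> str:
    """Return a cached base64 plot for this window, rendering it at most once per TTL."""
    cache_key = f"anomaly_plot:{window_end_ts}"
    cached = await redis_client.get(cache_key)
    if cached:
        return cached
    # Render in a worker thread so the event loop stays responsive.
    plot_b64 = await asyncio.to_thread(generate_anomaly_plot, event_counts)
    # NX keeps the first writer's image if two clients happen to render concurrently.
    await redis_client.set(cache_key, plot_b64, ex=PLOT_TTL_SECONDS, nx=True)
    return plot_b64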
Another significant consideration is model drift. The autoencoder is trained on historical data. As the system evolves and event patterns naturally change, the model’s definition of “normal” will become outdated, leading to false positives or negatives. A complete MLOps pipeline for periodic model retraining, validation, and seamless deployment is a necessary next step for a true production-grade system.
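Even before a full retraining pipeline exists, the threshold itself can be recalibrated periodically. One common, if simplistic, approach is to recompute it as a high percentile of reconstruction errors over a recent window of known-good traffic. A sketch; the percentile value and the source of the validation windows are assumptions:

# tooling/recalibrate_threshold.py -- sketch: derive ANOMALY_THRESHOLD from recent normal data
import numpy as np
import tensorflow as tf


def recalibrate_threshold(model: tf.keras.Model, normal_windows: np.ndarray,
                          percentile: float = 99.5) -> float:
    """
    Compute per-window reconstruction MSE on recent *normal* windows and return
    the chosen percentile as a candidate anomaly threshold.
    """
    reconstructed = model.predict(normal_windows)
    per_window_mse = np.mean(np.square(normal_windows - reconstructed), axis=1)
    return float(np.percentile(per_window_mse, percentile))


if __name__ == "__main__":
    model = tf.keras.models.load_model("/models/event_autoencoder.h5")
    # Hypothetical export of recent, manually verified "normal" windows.
    windows = np.load("recent_normal_windows.npy")
    print(f"Suggested ANOMALY_THRESHOLD: {recalibrate_threshold(model, windows):.4f}")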
Finally, the in-memory state of the SSE server (the deque of recent errors and event counts) is ephemeral. If the server restarts, that history is lost. For more critical applications, this state could be persisted to an external store such as a time-series database (e.g., InfluxDB or Prometheus) to provide durable trend visualizations across service interruptions.
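As one possible direction, the reconstruction error could be exposed as a Prometheus metric from the inference worker, letting an existing monitoring stack own durability and long-term trend charts. A sketch using prometheus_client; the metric names and port are assumptions:

# worker/metrics.py -- sketch: expose reconstruction error for Prometheus to scrape
from prometheus_client import Counter, Gauge, start_http_server

RECONSTRUCTION_ERROR = Gauge(
    "payment_events_reconstruction_error",
    "Latest autoencoder reconstruction error for the event-count window",
)
ANOMALIES_TOTAL = Counter(
    "payment_events_anomalies_total",
    "Number of windows flagged as anomalous",
)


def start_metrics_server(port: int = 9100) -> None:
    """Call once at worker startup; Prometheus then scrapes http://worker:9100/metrics."""
    start_http_server(port)


def record_result(mse: float, is_anomaly: bool) -> None:
    """Call after each inference, e.g. at the end of process_inference_task()."""
    RECONSTRUCTION_ERROR.set(mse)
    if is_anomaly:
        ANOMALIES_TOTAL.inc()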