The initial system was deceptively simple and fundamentally broken. The requirement was to build a real-time monitoring dashboard for a fleet of IoT devices, each emitting telemetry events several times a minute. The proof-of-concept, hastily assembled, consisted of a FastAPI endpoint that received a JSON payload and immediately inserted it into a ClickHouse table. The front-end, a Flutter web app, polled another endpoint every two seconds to refresh its charts. It worked flawlessly with ten simulated devices. With ten thousand, it didn’t just slow down; it collapsed.
The logs told a clear story of resource exhaustion. The API server’s connection pool to ClickHouse was perpetually maxed out. CPU usage on the database node spiked under thousands of tiny, concurrent insert operations, each carrying significant per-insert overhead, and the database was struggling to merge the flood of small data parts those inserts created. On the client side, the constant polling created a thundering herd problem, hammering the API’s query endpoint and yielding a user experience that was anything but “real-time”: the data on screen was often 30-60 seconds stale. This wasn’t a scaling problem; it was an architectural failure.
The redesign had to address two core bottlenecks: write amplification on the ingestion path and read amplification on the visualization path. The solution required a complete shift from a synchronous, request-response model to an asynchronous, batched, and push-based architecture.
Decoupling Ingestion with an Asynchronous Batching Service
The root cause of the ingestion failure was treating ClickHouse like a transactional OLTP database. ClickHouse, with its MergeTree engine family, is optimized for bulk inserts of large batches of data, not for high-frequency, single-row writes. Each small INSERT creates a new data part on disk, which later requires an expensive merge operation.
Our first principle became: never write to ClickHouse on a per-request basis. We needed an intermediate buffer within our FastAPI application to collect events and flush them to the database periodically. asyncio provides the perfect tools for this.
Here is the architecture we settled on for the ingestion service:
graph TD
    subgraph FastAPI Application
        A[Ingestion Endpoint /events] --> B{asyncio.Queue};
        C[Background Writer Task] -- reads from --> B;
        C -- bulk insert --> D[ClickHouse];
    end
    E[IoT Devices] -- POST JSON --> A;
The core components are an asyncio.Queue acting as an in-memory buffer and a background task that runs for the lifetime of the application. The HTTP endpoint’s only job is to perform validation and put the event into the queue, making its response time incredibly fast.
Here is the implementation of the BatchingClickHouseClient. This class encapsulates the queue, the background writer, and the graceful shutdown logic. In a real-world project, this would live in its own module.
# src/analytics/batch_client.py
import asyncio
import json
import logging
from typing import List, Dict, Any
import backoff
from aiohttp import ClientSession, ClientError
from fastapi import FastAPI
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class BatchingClickHouseClient:
"""
Manages asynchronous batching and insertion of data into ClickHouse.
This service collects events in an in-memory queue and flushes them
periodically in a background task.
"""
def __init__(
self,
app: FastAPI,
host: str,
port: int,
database: str,
table: str,
batch_size: int = 1000,
flush_interval_seconds: float = 5.0,
):
self._app = app
self._host = host
self._port = port
self._database = database
self._table = table
self._batch_size = batch_size
self._flush_interval = flush_interval_seconds
self._queue = asyncio.Queue()
self._writer_task = None
self._http_session = None
# Register startup and shutdown event handlers
self._app.add_event_handler("startup", self.startup)
self._app.add_event_handler("shutdown", self.shutdown)
async def startup(self):
"""Initializes resources and starts the background writer task."""
logger.info("Starting ClickHouse batching client...")
self._http_session = ClientSession()
self._writer_task = asyncio.create_task(self._batch_writer())
logger.info("ClickHouse batch writer task started.")
async def shutdown(self):
"""Gracefully shuts down the client, flushing any remaining events."""
logger.info("Shutting down ClickHouse batching client...")
if self._writer_task:
# Signal the writer to finish and wait for it
await self._queue.put(None)
try:
await asyncio.wait_for(self._writer_task, timeout=self._flush_interval + 5)
except asyncio.TimeoutError:
logger.warning("Writer task shutdown timed out. Some data may be lost.")
if self._http_session and not self._http_session.closed:
await self._http_session.close()
logger.info("ClickHouse client shut down complete.")
async def push_event(self, event: Dict[str, Any]):
"""Asynchronously adds a single event to the processing queue."""
await self._queue.put(event)
@backoff.on_exception(backoff.expo, ClientError, max_tries=5, jitter=backoff.full_jitter)
async def _flush_batch(self, batch: List[Dict[str, Any]]):
"""
Sends a batch of data to ClickHouse via its HTTP interface.
Uses an exponential backoff strategy for transient network errors.
"""
if not batch:
return
        # The ClickHouse HTTP interface accepts JSONEachRow: one JSON object per row,
        # separated by newlines. Use json.dumps for proper escaping (str() on a dict
        # is not valid JSON and breaks on quotes, None values, etc.).
        payload = "\n".join(json.dumps(row) for row in batch)
params = {
"database": self._database,
"query": f"INSERT INTO {self._table} FORMAT JSONEachRow",
}
url = f"http://{self._host}:{self._port}/"
try:
async with self._http_session.post(url, params=params, data=payload.encode('utf-8')) as response:
response.raise_for_status()
logger.info(f"Successfully flushed {len(batch)} events to ClickHouse.")
except ClientError as e:
logger.error(f"Failed to flush batch to ClickHouse: {e}. Batch size: {len(batch)}. Retrying...")
raise # Re-raise to trigger backoff
async def _batch_writer(self):
"""
The core background task. It continuously consumes from the queue
and flushes batches based on size or time.
"""
batch = []
while True:
try:
# Wait for an item, but with a timeout to enable periodic flushes
event = await asyncio.wait_for(self._queue.get(), timeout=self._flush_interval)
if event is None: # Shutdown signal
logger.info("Shutdown signal received. Flushing final batch.")
if batch:
await self._flush_batch(batch)
break # Exit the loop
batch.append(event)
if len(batch) >= self._batch_size:
await self._flush_batch(batch)
batch.clear()
except asyncio.TimeoutError:
# Flush interval reached
if batch:
logger.info(f"Flush interval reached. Flushing batch of size {len(batch)}.")
await self._flush_batch(batch)
batch.clear()
except Exception as e:
logger.critical(f"Unhandled exception in batch writer: {e}. The writer will restart.", exc_info=True)
# In a real system, you might want more robust recovery here.
# For now, we log and continue, which might lose the current batch.
batch.clear()
await asyncio.sleep(5) # Avoid tight loop on persistent errors
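One caveat with the writer above: the queue is unbounded, so a prolonged ClickHouse outage lets it grow without limit. A minimal sketch of a backpressure variant, assuming we would rather reject events with a 503 than buffer indefinitely (the class name and maxsize value are illustrative, not part of the code above):
# Sketch: bounded-queue variant of push_event (hypothetical refinement).
import asyncio
from typing import Any, Dict
from fastapi import HTTPException

class BackpressureQueue:
    def __init__(self, max_pending: int = 50_000):
        # Bounded queue: put_nowait raises QueueFull once the buffer is saturated.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_pending)

    async def push_event(self, event: Dict[str, Any]) -> None:
        # Same call signature as the batching client's push_event.
        try:
            self._queue.put_nowait(event)
        except asyncio.QueueFull:
            # Surface overload to the caller instead of buffering without bound.
            raise HTTPException(status_code=503, detail="Ingestion buffer full, retry later")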
The corresponding ClickHouse table must be designed for this workload. We use a MergeTree engine, sorted by device_id and timestamp, as a per-device time-range scan is our primary query pattern (a sketch of that query follows the DDL).
-- ddl/events_table.sql
CREATE TABLE default.telemetry_events (
`timestamp` DateTime64(3, 'UTC'),
`device_id` String,
`event_type` LowCardinality(String),
`value_float` Float64,
`value_string` String,
`metadata` Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (device_id, timestamp);
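For reference, the read pattern that the ORDER BY (device_id, timestamp) key is designed to serve looks roughly like the sketch below, issued over the same HTTP interface (the table and columns match the DDL above; the one-hour window and device id are arbitrary examples):
# Sketch: the primary query pattern served by ORDER BY (device_id, timestamp).
import asyncio
import aiohttp

async def fetch_device_window(host: str, port: int, device_id: str) -> list[dict]:
    # ClickHouse server-side query parameters are bound via param_* URL parameters.
    query = """
        SELECT timestamp, event_type, value_float
        FROM default.telemetry_events
        WHERE device_id = {device_id:String}
          AND timestamp >= now() - INTERVAL 1 HOUR
        ORDER BY timestamp
        FORMAT JSON
    """
    params = {"query": query, "param_device_id": device_id}
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://{host}:{port}/", params=params) as response:
            response.raise_for_status()
            body = await response.json()
            return body.get("data", [])

if __name__ == "__main__":
    print(asyncio.run(fetch_device_window("localhost", 8123, "device-00001")))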
The FastAPI application ties this together. The configuration is loaded from environment variables for production readiness.
# src/main.py
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel, Field
from typing import Dict, Any
import datetime
import os
from .analytics.batch_client import BatchingClickHouseClient
app = FastAPI(title="Telemetry Ingestion Service")
# --- Configuration ---
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", 8123))  # HTTP interface port (9000 is the native TCP protocol)
CLICKHOUSE_DB = os.getenv("CLICKHOUSE_DB", "default")
CLICKHOUSE_TABLE = "telemetry_events"
BATCH_SIZE = int(os.getenv("BATCH_SIZE", 5000))
FLUSH_INTERVAL = float(os.getenv("FLUSH_INTERVAL", 2.0))
# --- Instantiate the batching client ---
# This automatically registers startup/shutdown hooks
ch_client = BatchingClickHouseClient(
app=app,
host=CLICKHOUSE_HOST,
port=CLICKHOUSE_PORT,
database=CLICKHOUSE_DB,
table=CLICKHOUSE_TABLE,
batch_size=BATCH_SIZE,
flush_interval_seconds=FLUSH_INTERVAL,
)
# --- Pydantic Models for Validation ---
class TelemetryEvent(BaseModel):
device_id: str = Field(..., min_length=1)
event_type: str = Field(..., min_length=1)
value_float: float | None = None
value_string: str | None = None
metadata: Dict[str, str] = {}
# We'll add the timestamp on the server side to ensure consistency
timestamp: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc), exclude=True)
class IngestionPayload(BaseModel):
events: list[TelemetryEvent]
# --- API Endpoint ---
@app.post("/ingest", status_code=202)
async def ingest_events(payload: IngestionPayload):
"""
Receives a batch of telemetry events and queues them for processing.
Responds immediately with 202 Accepted.
"""
for event in payload.events:
event_dict = event.model_dump()
# Add the server-generated timestamp
event_dict["timestamp"] = event.timestamp.isoformat()
await ch_client.push_event(event_dict)
return {"status": "queued", "event_count": len(payload.events)}
This architecture transformed the ingestion performance. The API endpoint response time dropped to under 5ms, as its only job was validation and an in-memory queue operation. The database load stabilized, as it was now receiving large, orderly batches of data perfectly suited for its MergeTree engine.
Real-Time Updates with Server-Sent Events
With ingestion solved, we turned to the visualization side. Polling was out. WebSockets were an option, but for a one-way stream of data from server to client, they felt like overkill. A key pragmatic decision was to use Server-Sent Events (SSE). SSE runs over standard HTTP, is simpler to implement on both client and server, and has built-in support for automatic reconnection, which is a critical feature for a long-lived dashboard PWA.
The naive approach would be to have the SSE endpoint query ClickHouse directly. This couples the real-time layer directly to the analytics database, creating a new performance bottleneck. A better approach is to introduce another decoupled component: an in-memory aggregator.
graph TD
    subgraph FastAPI Application
        D[ClickHouse]
        F[Aggregator Task] -- periodically SELECT from --> D;
        F -- updates state --> G[In-memory State Cache];
        H[SSE Broadcaster] -- reads from --> G;
        H -- pushes updates to --> I[Connected Clients];
        J[SSE Endpoint /dashboard-stream] -- registers client with --> H;
    end
The Aggregator Task is another asyncio background task. It wakes up every few seconds, runs a fast, pre-defined aggregation query against ClickHouse, and stores the result in a simple Python dictionary. The SSE Broadcaster then pushes this cached result to all connected clients. This decouples client demand from database load: one hundred connected clients result in only one database query every N seconds, not one hundred.
Here’s the implementation for the SSE streaming components.
# src/streaming/sse_manager.py
import asyncio
import logging
import json
from typing import Set
from sse_starlette.sse import EventSourceResponse
logger = logging.getLogger(__name__)
class SSEManager:
"""
Manages active SSE connections and broadcasts data to all of them.
"""
def __init__(self):
self.active_connections: Set[asyncio.Queue] = set()
logger.info("SSE Manager initialized.")
async def add_connection(self, queue: asyncio.Queue):
"""Registers a new client connection."""
self.active_connections.add(queue)
logger.info(f"New SSE client connected. Total clients: {len(self.active_connections)}")
def remove_connection(self, queue: asyncio.Queue):
"""Removes a client connection."""
self.active_connections.remove(queue)
logger.info(f"SSE client disconnected. Total clients: {len(self.active_connections)}")
async def broadcast(self, data: dict):
"""Sends data to all connected clients."""
if not self.active_connections:
return
# The data must be a JSON string for SSE
message = json.dumps(data)
# Use asyncio.gather to send to all clients concurrently
tasks = [conn.put(message) for conn in self.active_connections]
await asyncio.gather(*tasks, return_exceptions=False)
logger.debug(f"Broadcasted update to {len(self.active_connections)} clients.")
async def stream_generator(self):
"""
An async generator that yields events for a single client connection.
This is what EventSourceResponse consumes.
"""
queue = asyncio.Queue()
await self.add_connection(queue)
try:
while True:
# Wait for a message from the broadcaster
message = await queue.get()
yield {
"event": "update",
"data": message
}
        finally:
            # Runs when the client disconnects and the generator is cancelled or closed
            self.remove_connection(queue)
            logger.info("SSE stream closed for a client.")
# Create a singleton instance
sse_manager = SSEManager()
And the aggregation task, which we’ll add to our main.py:
# src/main.py (additions)
import aiohttp
from sse_starlette.sse import EventSourceResponse
# ... (previous code) ...
from .streaming.sse_manager import sse_manager
# --- Aggregation and Streaming Logic ---
AGGREGATION_INTERVAL = float(os.getenv("AGGREGATION_INTERVAL", 5.0))
_aggregator_task = None
_http_session_for_aggregator = None
async def run_aggregation():
"""Background task to periodically query ClickHouse for aggregated data."""
global _http_session_for_aggregator
_http_session_for_aggregator = aiohttp.ClientSession()
while True:
try:
await asyncio.sleep(AGGREGATION_INTERVAL)
logger.info("Running aggregation query...")
# Example query: count events by type in the last 10 minutes
query = """
SELECT
event_type,
count() AS event_count,
avg(value_float) AS avg_value
FROM default.telemetry_events
WHERE timestamp >= now() - INTERVAL 10 MINUTE
GROUP BY event_type
ORDER BY event_count DESC
LIMIT 10
"""
params = {"query": query, "default_format": "JSON"}
url = f"http://{CLICKHOUSE_HOST}:{CLICKHOUSE_PORT}/"
async with _http_session_for_aggregator.get(url, params=params) as response:
response.raise_for_status()
data = await response.json()
# The data structure from ClickHouse is specific
payload = {
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"aggregations": data.get("data", [])
}
# Broadcast the fresh data to all connected SSE clients
await sse_manager.broadcast(payload)
except Exception as e:
logger.error(f"Error in aggregation task: {e}", exc_info=True)
@app.on_event("startup")
async def startup_aggregator():
global _aggregator_task
_aggregator_task = asyncio.create_task(run_aggregation())
logger.info("Aggregation background task started.")
@app.on_event("shutdown")
async def shutdown_aggregator():
if _aggregator_task:
_aggregator_task.cancel()
logger.info("Aggregation task cancelled.")
if _http_session_for_aggregator:
await _http_session_for_aggregator.close()
@app.get("/dashboard-stream")
async def dashboard_stream(request: Request):
"""Endpoint for clients to connect for real-time dashboard updates."""
# EventSourceResponse handles the SSE protocol details
return EventSourceResponse(sse_manager.stream_generator())
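Before wiring up the Flutter client, it is worth verifying the stream with a few lines of Python. A quick debugging aid (the URL matches the endpoint above):
# Sketch: quick consumer for /dashboard-stream to eyeball the SSE output.
import asyncio
import aiohttp

async def watch_stream(url: str = "http://localhost:8000/dashboard-stream") -> None:
    timeout = aiohttp.ClientTimeout(total=None)  # keep the connection open indefinitely
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url, headers={"Accept": "text/event-stream"}) as response:
            response.raise_for_status()
            # SSE is line-oriented: each update arrives as "event: ..." / "data: ..." lines.
            async for raw_line in response.content:
                line = raw_line.decode("utf-8").strip()
                if line.startswith("data:"):
                    print(line[len("data:"):].strip())

if __name__ == "__main__":
    asyncio.run(watch_stream())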
The Flutter PWA Client: Taming the Data Stream
The final piece was the client. Flutter for web was chosen to maintain a single codebase for a potential native mobile app. While powerful, building a high-performance, data-heavy PWA in Flutter presents its own challenges. The key is efficient state management and rendering. A naive implementation that calls setState() on the root widget with every SSE message would cause the entire UI to rebuild multiple times per second, leading to a sluggish experience.
We used the riverpod package for state management, which allows for granular control over which widgets rebuild. The connection logic was encapsulated in a service that exposes a Stream of parsed data.
Here is the Dart service for handling the SSE connection. For a real PWA, you would use the html.EventSource class from dart:html.
// lib/services/sse_service.dart
import 'dart:async';
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:flutter_riverpod/flutter_riverpod.dart';
// A simple model for our aggregated data
class DashboardUpdate {
final DateTime timestamp;
final List<Map<String, dynamic>> aggregations;
DashboardUpdate({required this.timestamp, required this.aggregations});
factory DashboardUpdate.fromJson(Map<String, dynamic> json) {
return DashboardUpdate(
timestamp: DateTime.parse(json['timestamp']),
aggregations: List<Map<String, dynamic>>.from(json['aggregations']),
);
}
}
class SseService {
final _controller = StreamController<DashboardUpdate>();
http.Client? _client;
Stream<DashboardUpdate> get updates => _controller.stream;
void connect() {
// In a real browser environment, you'd use dart:html's EventSource.
// This is a dart:io http client implementation for demonstration.
_client = http.Client();
final request = http.Request("GET", Uri.parse("http://localhost:8000/dashboard-stream"));
request.headers["Cache-Control"] = "no-cache";
request.headers["Accept"] = "text/event-stream";
Future<http.StreamedResponse> responseFuture = _client!.send(request);
responseFuture.then((response) {
print("SSE Connection established.");
response.stream.transform(utf8.decoder).transform(const LineSplitter()).listen(
(line) {
if (line.startsWith('data:')) {
final dataString = line.substring(5).trim();
if (dataString.isNotEmpty) {
try {
final json = jsonDecode(dataString);
final update = DashboardUpdate.fromJson(json);
_controller.add(update);
} catch (e) {
print("Error parsing SSE data: $e");
}
}
}
},
onDone: () {
print("SSE Connection closed by server.");
_reconnect();
},
onError: (error) {
print("SSE Stream error: $error");
_controller.addError(error);
_reconnect();
},
cancelOnError: true,
);
}).catchError((error) {
print("Error establishing SSE connection: $error");
_reconnect();
});
}
void _reconnect() {
dispose();
print("Attempting to reconnect in 5 seconds...");
Timer(const Duration(seconds: 5), connect);
}
void dispose() {
_client?.close();
_client = null;
print("SSE Service disposed.");
}
}
// Riverpod provider for the service and stream
final sseServiceProvider = Provider<SseService>((ref) {
final service = SseService();
service.connect();
ref.onDispose(() => service.dispose());
return service;
});
final dashboardUpdateProvider = StreamProvider<DashboardUpdate>((ref) {
return ref.watch(sseServiceProvider).updates;
});
The UI can then use a Consumer widget and ref.watch on the dashboardUpdateProvider. This ensures that only the widgets that depend on this stream are rebuilt when new data arrives.
// lib/widgets/dashboard_view.dart
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../services/sse_service.dart';
class DashboardView extends ConsumerWidget {
const DashboardView({super.key});
  @override
  Widget build(BuildContext context, WidgetRef ref) {
final asyncUpdate = ref.watch(dashboardUpdateProvider);
return Scaffold(
appBar: AppBar(title: const Text("Real-Time Telemetry Dashboard")),
body: Center(
child: asyncUpdate.when(
data: (update) => _buildDataTable(context, update),
loading: () => const CircularProgressIndicator(),
error: (err, stack) => Text("Error: $err"),
),
),
);
}
Widget _buildDataTable(BuildContext context, DashboardUpdate update) {
final aggregations = update.aggregations;
if (aggregations.isEmpty) {
return const Text("Waiting for data...");
}
return Column(
children: [
Text("Last updated: ${update.timestamp.toLocal()}"),
DataTable(
columns: const [
DataColumn(label: Text('Event Type')),
DataColumn(label: Text('Count (last 10m)')),
DataColumn(label: Text('Average Value')),
],
rows: aggregations.map((agg) => DataRow(
cells: [
DataCell(Text(agg['event_type'].toString())),
DataCell(Text(agg['event_count'].toString())),
DataCell(Text(
(agg['avg_value'] as num?)?.toStringAsFixed(2) ?? 'N/A')),
],
)).toList(),
),
],
);
}
}
This final architecture proved robust and scalable. It could comfortably handle hundreds of thousands of events per minute, with end-to-end latency (from event generation to UI update) consistently under 10 seconds. The database remained stable, the API server’s resource usage was minimal, and the PWA was responsive.
The primary limitation of this design is its statefulness. Both the batching queue and the SSE connection manager live inside a single instance of the FastAPI application, which makes horizontal scaling impossible without introducing an external dependency. A production-grade evolution would replace the in-memory asyncio.Queue with Redis Streams or Kafka for durable, scalable ingestion. Similarly, the SSE broadcaster would use Redis Pub/Sub so that multiple API instances can broadcast updates to their respective connected clients, enabling true stateless horizontal scaling of the web tier. Furthermore, while Flutter for Web proved capable, its initial load time and bundle size remain a consideration for public-facing PWAs compared to JavaScript-native frameworks, requiring careful performance tuning and code-splitting strategies.
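To make that evolution more concrete, here is a rough sketch of what the ingestion side could look like with Redis Streams, assuming redis-py 5.x and a hypothetical telemetry_events stream key (the consumer side, a separate worker that reads the stream and bulk-inserts into ClickHouse, is omitted):
# Sketch: replacing the in-memory queue with a Redis Stream (hypothetical evolution).
import json
from typing import Any, Dict
import redis.asyncio as redis

STREAM_KEY = "telemetry_events"  # hypothetical stream name

class RedisStreamPublisher:
    def __init__(self, url: str = "redis://localhost:6379/0", max_len: int = 1_000_000):
        self._redis = redis.from_url(url)
        self._max_len = max_len

    async def push_event(self, event: Dict[str, Any]) -> None:
        # XADD appends the event; an approximate maxlen caps memory use on the Redis side.
        await self._redis.xadd(
            STREAM_KEY,
            {"payload": json.dumps(event)},
            maxlen=self._max_len,
            approximate=True,
        )

    async def close(self) -> None:
        await self._redis.aclose()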