Architecting a High-Throughput Ingestion Pipeline Using FastAPI Batching for ClickHouse and SSE Streaming to a Flutter PWA


The initial system was deceptively simple and fundamentally broken. The requirement was to build a real-time monitoring dashboard for a fleet of IoT devices, each emitting telemetry events several times a minute. The proof-of-concept, hastily assembled, consisted of a FastAPI endpoint that received a JSON payload and immediately inserted it into a ClickHouse table. The front-end, a Flutter web app, polled another endpoint every two seconds to refresh its charts. It worked flawlessly with ten simulated devices. With ten thousand, it didn’t just slow down; it collapsed.

The logs told a clear story of resource exhaustion. The API server’s connection pool to ClickHouse was perpetually maxed out. CPU usage on the database node spiked under thousands of tiny, concurrent INSERT statements, each carrying a fixed per-insert overhead and each creating a new data part. The database could not keep up with merging those parts. On the client side, every open dashboard polled on the same two-second cadence, hammering the API’s query endpoint and yielding a user experience that was anything but “real-time.” The data on screen was often 30-60 seconds stale. This wasn’t a scaling problem; it was an architectural failure.

The redesign had to address two core bottlenecks: write amplification on the ingestion path and read amplification on the visualization path. The solution required a complete shift from a synchronous, request-response model to an asynchronous, batched, and push-based architecture.

Decoupling Ingestion with an Asynchronous Batching Service

The root cause of the ingestion failure was treating ClickHouse like a transactional OLTP database. ClickHouse, with its MergeTree engine family, is optimized for bulk inserts of large batches of data, not for high-frequency, single-row writes. Each small INSERT creates a new data part on disk, which later requires an expensive merge operation.
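
To make the cost concrete, here is a rough back-of-the-envelope calculation. The rate of four events per device per minute and the batch size of 5,000 rows are illustrative assumptions for this sketch, not measurements from the system.

```python
# Rough estimate of how many INSERT statements (and therefore new data parts)
# ClickHouse sees per second, with and without batching.
# ASSUMPTIONS: 4 events per device per minute and 5,000 rows per batch are
# illustrative values chosen for this sketch.

def inserts_per_second(devices: int, events_per_minute: float, rows_per_insert: int) -> float:
    """Return the number of INSERT statements issued per second."""
    events_per_second = devices * events_per_minute / 60
    return events_per_second / rows_per_insert

per_row = inserts_per_second(10_000, 4, rows_per_insert=1)
batched = inserts_per_second(10_000, 4, rows_per_insert=5_000)

print(f"single-row inserts/s: {per_row:.1f}")
print(f"batched inserts/s:    {batched:.3f}")
```

Under these assumptions the database sees several hundred inserts per second without batching, and well under one per second with it.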

Our first principle became: never write to ClickHouse on a per-request basis. We needed an intermediate buffer within our FastAPI application to collect events and flush them to the database periodically. asyncio provides the building blocks for this: a queue to hold events and a long-running task to drain it.

Here is the architecture we settled on for the ingestion service:

graph TD
    subgraph FastAPI Application
        A[Ingestion Endpoint /ingest] --> B{asyncio.Queue};
        C[Background Writer Task] -- reads from --> B;
        C -- bulk insert --> D[ClickHouse];
    end
    E[IoT Devices] -- POST JSON --> A;

The core components are an asyncio.Queue acting as an in-memory buffer and a background task that runs for the lifetime of the application. The HTTP endpoint’s only job is to validate the event and put it on the queue, so it can respond without waiting on the database.

Here is the implementation of the BatchingClickHouseClient. This class encapsulates the queue, the background writer, and the graceful shutdown logic. In a real-world project, this would live in its own module.

# src/analytics/batch_client.py

import asyncio
import json
import logging
from typing import List, Dict, Any, Optional

import backoff
from aiohttp import ClientSession, ClientError
from fastapi import FastAPI

# Configure log formatting
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class BatchingClickHouseClient:
    """
    Manages asynchronous batching and insertion of data into ClickHouse.
    This service collects events in an in-memory queue and flushes them
    periodically in a background task.
    """

    def __init__(
        self,
        app: FastAPI,
        host: str,
        port: int,
        database: str,
        table: str,
        batch_size: int = 1000,
        flush_interval_seconds: float = 5.0,
    ):
        self._app = app
        self._host = host
        self._port = port
        self._database = database
        self._table = table
        self._batch_size = batch_size
        self._flush_interval = flush_interval_seconds

        self._queue: asyncio.Queue = asyncio.Queue()
        self._writer_task: Optional[asyncio.Task] = None
        self._http_session: Optional[ClientSession] = None

        # Register startup and shutdown event handlers
        self._app.add_event_handler("startup", self.startup)
        self._app.add_event_handler("shutdown", self.shutdown)

    async def startup(self):
        """Initializes resources and starts the background writer task."""
        logger.info("Starting ClickHouse batching client...")
        self._http_session = ClientSession()
        self._writer_task = asyncio.create_task(self._batch_writer())
        logger.info("ClickHouse batch writer task started.")

    async def shutdown(self):
        """Gracefully shuts down the client, flushing any remaining events."""
        logger.info("Shutting down ClickHouse batching client...")
        if self._writer_task:
            # Signal the writer to finish and wait for it
            await self._queue.put(None)
            try:
                await asyncio.wait_for(self._writer_task, timeout=self._flush_interval + 5)
            except asyncio.TimeoutError:
                logger.warning("Writer task shutdown timed out. Some data may be lost.")

        if self._http_session and not self._http_session.closed:
            await self._http_session.close()

        logger.info("ClickHouse client shut down complete.")

    async def push_event(self, event: Dict[str, Any]):
        """Asynchronously adds a single event to the processing queue."""
        await self._queue.put(event)

    @backoff.on_exception(backoff.expo, ClientError, max_tries=5, jitter=backoff.full_jitter)
    async def _flush_batch(self, batch: List[Dict[str, Any]]):
        """
        Sends a batch of data to ClickHouse via its HTTP interface.
        Uses an exponential backoff strategy for transient network errors.
        """
        if not batch:
            return

        # The ClickHouse HTTP interface accepts several input formats. JSONEachRow
        # expects one JSON object per row, separated by newlines. json.dumps
        # handles None, booleans, and embedded quotes correctly.
        payload = "\n".join(json.dumps(row) for row in batch)

        params = {
            "database": self._database,
            "query": f"INSERT INTO {self._table} FORMAT JSONEachRow",
            # Accept ISO 8601 timestamps with a "T" separator and a UTC offset
            "date_time_input_format": "best_effort",
        }
        
        url = f"http://{self._host}:{self._port}/"

        try:
            async with self._http_session.post(url, params=params, data=payload.encode('utf-8')) as response:
                response.raise_for_status()
                logger.info(f"Successfully flushed {len(batch)} events to ClickHouse.")
        except ClientError as e:
            logger.error(f"Failed to flush batch to ClickHouse: {e}. Batch size: {len(batch)}. Retrying...")
            raise # Re-raise to trigger backoff

    async def _batch_writer(self):
        """
        The core background task. It continuously consumes from the queue
        and flushes batches based on size or time.
        """
        batch = []
        loop = asyncio.get_running_loop()
        # Absolute deadline for the next time-based flush
        deadline = loop.time() + self._flush_interval
        while True:
            try:
                # Wait for an item, but only until the flush deadline, so a steady
                # trickle of events cannot postpone the periodic flush indefinitely
                timeout = max(deadline - loop.time(), 0)
                event = await asyncio.wait_for(self._queue.get(), timeout=timeout)

                if event is None: # Shutdown signal
                    logger.info("Shutdown signal received. Flushing final batch.")
                    if batch:
                        await self._flush_batch(batch)
                    break # Exit the loop

                batch.append(event)
                if len(batch) >= self._batch_size:
                    await self._flush_batch(batch)
                    batch.clear()
                    deadline = loop.time() + self._flush_interval

            except asyncio.TimeoutError:
                # Flush interval reached
                if batch:
                    logger.info(f"Flush interval reached. Flushing batch of size {len(batch)}.")
                    await self._flush_batch(batch)
                    batch.clear()
                deadline = loop.time() + self._flush_interval
            except Exception as e:
                logger.critical(f"Unhandled exception in batch writer: {e}. Dropping the current batch and continuing.", exc_info=True)
                # Reached when _flush_batch exhausts its retries, among other failures.
                # In a real system, you might want more robust recovery here.
                # For now, we log and continue, which loses the current batch.
                batch.clear()
                deadline = loop.time() + self._flush_interval
                await asyncio.sleep(5) # Avoid tight loop on persistent errors
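
As a small illustration of the JSONEachRow wire format mentioned in the comments above: each row is one JSON object on its own line. The sketch below uses only the standard library; the helper name to_json_each_row and the sample rows are made up for this example.

```python
import json
from typing import Any, Dict, List

def to_json_each_row(rows: List[Dict[str, Any]]) -> str:
    """Serialize rows as newline-delimited JSON, one object per line."""
    return "\n".join(json.dumps(row) for row in rows)

# Hypothetical sample rows, including a None and a value with an apostrophe
rows = [
    {"device_id": "dev-1", "event_type": "temp", "value_float": 21.5, "value_string": None},
    {"device_id": "dev-2", "event_type": "note", "value_float": None, "value_string": "it's fine"},
]

payload = to_json_each_row(rows)
print(payload)

# Every line must parse back to the original row
assert [json.loads(line) for line in payload.splitlines()] == rows
```

Serializing with json.dumps matters because Python's own dict representation is not JSON: None would be written as None rather than null, and a value containing an apostrophe would be corrupted by naive quote replacement.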

The corresponding ClickHouse table must be designed for this workload. We use a MergeTree engine, sorting by device_id and timestamp as this is our primary query pattern.

-- ddl/events_table.sql
CREATE TABLE default.telemetry_events (
    `timestamp` DateTime64(3, 'UTC'),
    `device_id` String,
    `event_type` LowCardinality(String),
    `value_float` Float64,
    `value_string` String,
    `metadata` Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (device_id, timestamp);

The FastAPI application ties this together. The configuration is loaded from environment variables for production readiness.

# src/main.py

from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel, Field
from typing import Dict, Any
import datetime
import os

from .analytics.batch_client import BatchingClickHouseClient

app = FastAPI(title="Telemetry Ingestion Service")

# --- Configuration ---
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
# 8123 is ClickHouse's default HTTP interface port; 9000 is the native TCP protocol
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", "8123"))
CLICKHOUSE_DB = os.getenv("CLICKHOUSE_DB", "default")
CLICKHOUSE_TABLE = "telemetry_events"
BATCH_SIZE = int(os.getenv("BATCH_SIZE", 5000))
FLUSH_INTERVAL = float(os.getenv("FLUSH_INTERVAL", 2.0))

# --- Instantiate the batching client ---
# This automatically registers startup/shutdown hooks
ch_client = BatchingClickHouseClient(
    app=app,
    host=CLICKHOUSE_HOST,
    port=CLICKHOUSE_PORT,
    database=CLICKHOUSE_DB,
    table=CLICKHOUSE_TABLE,
    batch_size=BATCH_SIZE,
    flush_interval_seconds=FLUSH_INTERVAL,
)

# --- Pydantic Models for Validation ---
class TelemetryEvent(BaseModel):
    device_id: str = Field(..., min_length=1)
    event_type: str = Field(..., min_length=1)
    value_float: float | None = None
    value_string: str | None = None
    metadata: Dict[str, str] = {}
    
    # We'll add the timestamp on the server side to ensure consistency
    timestamp: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc), exclude=True)

class IngestionPayload(BaseModel):
    events: list[TelemetryEvent]

# --- API Endpoint ---
@app.post("/ingest", status_code=202)
async def ingest_events(payload: IngestionPayload):
    """
    Receives a batch of telemetry events and queues them for processing.
    Responds immediately with 202 Accepted.
    """
    for event in payload.events:
        event_dict = event.model_dump()
        # Add the server-generated timestamp
        event_dict["timestamp"] = event.timestamp.isoformat()
        await ch_client.push_event(event_dict)

    return {"status": "queued", "event_count": len(payload.events)}
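
For reference, a device-side request to this endpoint might be built as follows. This is a sketch using only the standard library: the base URL http://localhost:8000 is an assumed address for the service, build_ingest_request is a hypothetical helper, and the request is constructed but not sent.

```python
import json
import urllib.request

def build_ingest_request(base_url: str, events: list[dict]) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying a batch of events."""
    # The endpoint expects a JSON object with an "events" array
    body = json.dumps({"events": events}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/ingest",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

request = build_ingest_request(
    "http://localhost:8000",  # assumed address of the ingestion service
    [{"device_id": "dev-1", "event_type": "temperature", "value_float": 21.5}],
)
print(request.get_method(), request.full_url)

# To send it against a running service:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode())
```

Sending several events per request, as the IngestionPayload model allows, also reduces HTTP overhead on the device side.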

This architecture transformed the ingestion performance. The API endpoint response time dropped to under 5ms, as its only job was validation and an in-memory queue operation. The database load stabilized, as it was now receiving large, orderly batches of data perfectly suited for its MergeTree engine.

Real-Time Updates with Server-Sent Events

With ingestion solved, we turned to the visualization side. Polling was out. WebSockets were an option, but for a one-way stream of data from server to client they were more than we needed. We chose Server-Sent Events (SSE) instead. SSE runs over standard HTTP, is simpler to implement on both client and server, and the browser’s EventSource API reconnects automatically, which is a critical feature for a long-lived dashboard PWA.
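
To show how little protocol is involved, here is a sketch of SSE framing: the response body is a text/event-stream made of "field: value" lines, and a blank line ends each message. format_sse is a hypothetical helper written for illustration; in the implementation that follows, EventSourceResponse takes care of this.

```python
from typing import Optional

def format_sse(data: str, event: Optional[str] = None) -> str:
    """Format one Server-Sent Events frame as it appears on the wire."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines is sent as several consecutive data: lines
    for chunk in data.split("\n"):
        lines.append(f"data: {chunk}")
    # A blank line terminates the frame
    return "\n".join(lines) + "\n\n"

frame = format_sse('{"event_count": 42}', event="update")
print(repr(frame))
```

Because a frame is just text over a long-lived HTTP response, it passes through ordinary HTTP infrastructure, provided intermediaries do not buffer the response.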

The naive approach would be to have the SSE endpoint query ClickHouse directly. This couples the real-time layer directly to the analytics database, creating a new performance bottleneck. A better approach is to introduce another decoupled component: an in-memory aggregator.

graph TD
    subgraph FastAPI Application
        D[ClickHouse]
        F[Aggregator Task] -- periodically SELECT from --> D;
        F -- updates state --> G[In-memory State Cache];
        H[SSE Broadcaster] -- reads from --> G;
        H -- pushes updates to --> I[Connected Clients];
        J[SSE Endpoint /dashboard-stream] -- registers client with --> H;
    end

The Aggregator Task is another asyncio background task. It wakes up every few seconds, runs a fast, pre-defined aggregation query against ClickHouse, and stores the result in a simple Python dictionary. The SSE Broadcaster then pushes this cached result to all connected clients. This decouples client demand from database load. One hundred connected clients result in only one database query every N seconds, not one hundred.
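
The fan-out property can be demonstrated in isolation. The sketch below replaces the ClickHouse query with a stub (fake_query, a made-up name) and the SSE connections with plain asyncio queues, then counts how many queries are issued for a given number of clients.

```python
import asyncio

async def demo(num_clients: int, ticks: int) -> tuple[int, list[int]]:
    """Run one aggregator loop and fan its results out to every client queue."""
    client_queues = [asyncio.Queue() for _ in range(num_clients)]
    query_count = 0

    async def fake_query() -> dict:
        # Stand-in for the ClickHouse aggregation query
        nonlocal query_count
        query_count += 1
        return {"tick": query_count}

    for _ in range(ticks):
        result = await fake_query()      # one query per tick...
        for queue in client_queues:      # ...shared by every connected client
            queue.put_nowait(result)

    return query_count, [q.qsize() for q in client_queues]

queries, delivered = asyncio.run(demo(num_clients=100, ticks=3))
print(f"queries issued: {queries}, messages per client: {set(delivered)}")
```

The number of queries depends only on the number of ticks, not on how many clients are connected.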

Here’s the implementation for the SSE streaming components.

# src/streaming/sse_manager.py
import asyncio
import logging
import json
from typing import Set

from sse_starlette.sse import EventSourceResponse

logger = logging.getLogger(__name__)

class SSEManager:
    """
    Manages active SSE connections and broadcasts data to all of them.
    """
    def __init__(self):
        self.active_connections: Set[asyncio.Queue] = set()
        logger.info("SSE Manager initialized.")

    async def add_connection(self, queue: asyncio.Queue):
        """Registers a new client connection."""
        self.active_connections.add(queue)
        logger.info(f"New SSE client connected. Total clients: {len(self.active_connections)}")

    def remove_connection(self, queue: asyncio.Queue):
        """Removes a client connection."""
        self.active_connections.remove(queue)
        logger.info(f"SSE client disconnected. Total clients: {len(self.active_connections)}")

    async def broadcast(self, data: dict):
        """Sends data to all connected clients."""
        if not self.active_connections:
            return
        
        # The data must be a JSON string for SSE
        message = json.dumps(data)
        
        # Use asyncio.gather to send to all clients concurrently
        tasks = [conn.put(message) for conn in self.active_connections]
        await asyncio.gather(*tasks, return_exceptions=False)
        logger.debug(f"Broadcasted update to {len(self.active_connections)} clients.")

    async def stream_generator(self):
        """
        An async generator that yields events for a single client connection.
        This is what EventSourceResponse consumes.
        """
        queue = asyncio.Queue()
        await self.add_connection(queue)
        try:
            while True:
                # Wait for a message from the broadcaster
                message = await queue.get()
                yield {
                    "event": "update",
                    "data": message
                }
        finally:
            # Runs when the client disconnects, whether the generator is
            # cancelled or closed, so the connection is always unregistered
            self.remove_connection(queue)
            logger.info("SSE stream closed for a client.")

# Create a singleton instance
sse_manager = SSEManager()

And the aggregation task, which we’ll add to our main.py:

# src/main.py (additions)
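import asyncio
import logging

import aiohttp
from sse_starlette.sse import EventSourceResponse

# ... (previous code) ...
from .streaming.sse_manager import sse_manager

logger = logging.getLogger(__name__)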

# --- Aggregation and Streaming Logic ---
AGGREGATION_INTERVAL = float(os.getenv("AGGREGATION_INTERVAL", 5.0))
_aggregator_task = None
_http_session_for_aggregator = None

async def run_aggregation():
    """Background task to periodically query ClickHouse for aggregated data."""
    global _http_session_for_aggregator
    _http_session_for_aggregator = aiohttp.ClientSession()
    
    while True:
        try:
            await asyncio.sleep(AGGREGATION_INTERVAL)
            logger.info("Running aggregation query...")

            # Example query: count events by type in the last 10 minutes
            query = """
            SELECT
                event_type,
                count() AS event_count,
                avg(value_float) AS avg_value
            FROM default.telemetry_events
            WHERE timestamp >= now() - INTERVAL 10 MINUTE
            GROUP BY event_type
            ORDER BY event_count DESC
            LIMIT 10
            """
            
            params = {"query": query, "default_format": "JSON"}
            url = f"http://{CLICKHOUSE_HOST}:{CLICKHOUSE_PORT}/"

            async with _http_session_for_aggregator.get(url, params=params) as response:
                response.raise_for_status()
                data = await response.json()
                
                # The data structure from ClickHouse is specific
                payload = {
                    "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                    "aggregations": data.get("data", [])
                }
                
                # Broadcast the fresh data to all connected SSE clients
                await sse_manager.broadcast(payload)

        except Exception as e:
            logger.error(f"Error in aggregation task: {e}", exc_info=True)


@app.on_event("startup")
async def startup_aggregator():
    global _aggregator_task
    _aggregator_task = asyncio.create_task(run_aggregation())
    logger.info("Aggregation background task started.")

@app.on_event("shutdown")
async def shutdown_aggregator():
    if _aggregator_task:
        _aggregator_task.cancel()
        logger.info("Aggregation task cancelled.")
    if _http_session_for_aggregator:
        await _http_session_for_aggregator.close()

@app.get("/dashboard-stream")
async def dashboard_stream(request: Request):
    """Endpoint for clients to connect for real-time dashboard updates."""
    # EventSourceResponse handles the SSE protocol details
    return EventSourceResponse(sse_manager.stream_generator())

The Flutter PWA Client: Taming the Data Stream

The final piece was the client. Flutter for web was chosen to maintain a single codebase for a potential native mobile app. While powerful, building a high-performance, data-heavy PWA in Flutter presents its own challenges. The key is efficient state management and rendering. A naive implementation that calls setState() on the root widget with every SSE message would cause the entire UI to rebuild multiple times per second, leading to a sluggish experience.

We used the riverpod package for state management, which allows for granular control over which widgets rebuild. The connection logic was encapsulated in a service that exposes a Stream of parsed data.

Here is the Dart service for handling the SSE connection. For a real PWA, you would use the html.EventSource class from dart:html.

// lib/services/sse_service.dart
import 'dart:async';
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:flutter_riverpod/flutter_riverpod.dart';

// A simple model for our aggregated data
class DashboardUpdate {
  final DateTime timestamp;
  final List<Map<String, dynamic>> aggregations;

  DashboardUpdate({required this.timestamp, required this.aggregations});

  factory DashboardUpdate.fromJson(Map<String, dynamic> json) {
    return DashboardUpdate(
      timestamp: DateTime.parse(json['timestamp']),
      aggregations: List<Map<String, dynamic>>.from(json['aggregations']),
    );
  }
}

class SseService {
  final _controller = StreamController<DashboardUpdate>();
  http.Client? _client;

  Stream<DashboardUpdate> get updates => _controller.stream;

  void connect() {
    // In a real browser environment, you'd use dart:html's EventSource.
    // This package:http implementation is for demonstration only.
    _client = http.Client();
    final request = http.Request("GET", Uri.parse("http://localhost:8000/dashboard-stream"));
    request.headers["Cache-Control"] = "no-cache";
    request.headers["Accept"] = "text/event-stream";

    Future<http.StreamedResponse> responseFuture = _client!.send(request);

    responseFuture.then((response) {
      print("SSE Connection established.");
      response.stream.transform(utf8.decoder).transform(const LineSplitter()).listen(
        (line) {
          if (line.startsWith('data:')) {
            final dataString = line.substring(5).trim();
            if (dataString.isNotEmpty) {
              try {
                final json = jsonDecode(dataString);
                final update = DashboardUpdate.fromJson(json);
                _controller.add(update);
              } catch (e) {
                print("Error parsing SSE data: $e");
              }
            }
          }
        },
        onDone: () {
          print("SSE Connection closed by server.");
          _reconnect();
        },
        onError: (error) {
          print("SSE Stream error: $error");
          _controller.addError(error);
          _reconnect();
        },
        cancelOnError: true,
      );
    }).catchError((error) {
        print("Error establishing SSE connection: $error");
        _reconnect();
    });
  }

  void _reconnect() {
    dispose();
    print("Attempting to reconnect in 5 seconds...");
    Timer(const Duration(seconds: 5), connect);
  }

  void dispose() {
    _client?.close();
    _client = null;
    print("SSE Service disposed.");
  }
}

// Riverpod provider for the service and stream
final sseServiceProvider = Provider<SseService>((ref) {
  final service = SseService();
  service.connect();
  ref.onDispose(() => service.dispose());
  return service;
});

final dashboardUpdateProvider = StreamProvider<DashboardUpdate>((ref) {
  return ref.watch(sseServiceProvider).updates;
});
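
The stream can also be checked from a script, without the Flutter client. Below is a Python sketch of the same parsing rule: collect data: lines and dispatch the message when a blank line arrives. parse_sse is a hypothetical helper, and the input is a hard-coded list of lines rather than a live connection.

```python
import json
from typing import Iterable, Iterator

def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per SSE frame with its event name and decoded JSON data."""
    event_name = "message"  # default event type when no event: line is present
    data_parts: list[str] = []
    for line in lines:
        if line == "":
            # Blank line: the frame is complete, dispatch it if it carried data
            if data_parts:
                yield {"event": event_name, "data": json.loads("\n".join(data_parts))}
            event_name, data_parts = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive ping
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            # strip() is more lenient than the spec, which removes one leading space
            data_parts.append(line[len("data:"):].strip())

sample = [
    ": ping",
    "event: update",
    'data: {"timestamp": "2024-01-01T00:00:00+00:00", "aggregations": []}',
    "",
]
for frame in parse_sse(sample):
    print(frame["event"], frame["data"]["aggregations"])
```

Dispatching on the blank line rather than on each data: line keeps multi-line payloads intact and lets the parser skip keep-alive comments.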

The UI can then extend ConsumerWidget and call ref.watch on the dashboardUpdateProvider. This ensures that only the widgets that depend on this stream are rebuilt when new data arrives.

// lib/widgets/dashboard_view.dart
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';

import '../services/sse_service.dart';

class DashboardView extends ConsumerWidget {
  const DashboardView({super.key});

  
  @override
  Widget build(BuildContext context, WidgetRef ref) {
    final asyncUpdate = ref.watch(dashboardUpdateProvider);

    return Scaffold(
      appBar: AppBar(title: const Text("Real-Time Telemetry Dashboard")),
      body: Center(
        child: asyncUpdate.when(
          data: (update) => _buildDataTable(context, update),
          loading: () => const CircularProgressIndicator(),
          error: (err, stack) => Text("Error: $err"),
        ),
      ),
    );
  }
  
  Widget _buildDataTable(BuildContext context, DashboardUpdate update) {
    final aggregations = update.aggregations;
    if (aggregations.isEmpty) {
        return const Text("Waiting for data...");
    }

    return Column(
      children: [
        Text("Last updated: ${update.timestamp.toLocal()}"),
        DataTable(
          columns: const [
            DataColumn(label: Text('Event Type')),
            DataColumn(label: Text('Count (last 10m)')),
            DataColumn(label: Text('Average Value')),
          ],
          rows: aggregations.map((agg) => DataRow(
            cells: [
              DataCell(Text(agg['event_type'].toString())),
              DataCell(Text(agg['event_count'].toString())),
              DataCell(Text(
                  (agg['avg_value'] as num?)?.toStringAsFixed(2) ?? 'N/A')),
            ],
          )).toList(),
        ),
      ],
    );
  }
}

This final architecture proved robust and scalable. It could comfortably handle hundreds of thousands of events per minute, with end-to-end latency (from event generation to UI update) consistently under 10 seconds. The database remained stable, the API server’s resource usage was minimal, and the PWA was responsive.

The primary limitation of this design is its statefulness. Both the batching queue and the SSE connection manager live in the memory of a single FastAPI process. Events that are queued but not yet flushed are lost if that process crashes, and every additional instance would run its own writer and its own aggregator, so scaling out cleanly requires an external dependency. A production-grade evolution would replace the in-memory asyncio.Queue with Redis Streams or Kafka for durable, scalable ingestion. Similarly, the SSE broadcaster would use Redis Pub/Sub to allow multiple API instances to broadcast updates to their respective connected clients, enabling true stateless horizontal scaling of the web tier. Furthermore, while Flutter for Web proved capable, its initial load time and bundle size remain a consideration for public-facing PWAs compared to JavaScript-native frameworks, requiring careful performance tuning and code splitting strategies.

