Building a High-Throughput containerd Event Collector for ClickHouse with Idempotent Ingestion


The standard observability stack for our container fleet was failing us. Scraping Prometheus metrics and shipping logs via Fluentd provided a lagging, high-level view. When a critical service began experiencing cascading failures due to OOM kills, we found our existing tools were too slow and lacked the granularity to pinpoint the root cause. The “container killed” events were buried in a sea of application logs, often delayed by minutes. We needed direct, near real-time access to the container runtime’s event stream, across thousands of nodes, with the ability to perform complex analytical queries on that data immediately.

The initial concept was to bypass traditional logging agents and tap directly into the source: the containerd events API. This gRPC-based stream provides structured, low-level information about container lifecycle events (Create, Start, OOM, Exit, etc.). For the destination, ClickHouse was the obvious choice. Its columnar architecture and high ingestion speed suit the firehose of event data we anticipated. The missing piece was the transport: a lightweight, resilient agent to bridge containerd’s gRPC stream with ClickHouse’s HTTP interface. We chose Node.js with TypeScript for its asynchronous I/O model, which fits this kind of I/O-bound task.

This entire endeavor hinged on the reliability of this new collector agent. Data loss during collection or transport was unacceptable. Therefore, a significant portion of the effort was dedicated to building a comprehensive test suite using Jest to simulate failure modes and validate the agent’s behavior under duress.

ClickHouse Schema: The Foundation for Analysis

Before writing a single line of collector code, we defined the destination schema in ClickHouse. A poorly designed schema would cripple query performance, regardless of ingestion speed. In a real-world project, getting this right upfront saves immense refactoring pain later.

We settled on a MergeTree engine, partitioned by day and ordered by the event timestamp and a unique identifier for the node.

CREATE TABLE containerd_events (
    `event_timestamp` DateTime64(9, 'UTC'),
    `ingest_timestamp` DateTime64(3, 'UTC') DEFAULT now64(3),
    `node_id` String,
    `event_uuid` UUID,
    `namespace` String,
    `topic` String,
    `event_type` String,
    `container_id` String,
    `image_name` String,
    `event_payload` String
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(event_timestamp)
ORDER BY (event_timestamp, node_id)
TTL toDateTime(event_timestamp) + INTERVAL 30 DAY;

A few key decisions here:

  • event_timestamp: A DateTime64(9) to accommodate nanosecond precision from containerd. This is our primary sorting key.
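  • event_uuid: A client-generated UUID for each event. Because it is assigned once, when the event is formatted, a retried batch carries the same IDs. MergeTree does not deduplicate rows on its own, so this column is what makes downstream deduplication possible if the agent accidentally sends the same batch twice.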
  • node_id: A configurable identifier for the host running the collector. Essential for filtering and aggregation.
  • event_payload: A JSON string containing the raw event. This “schema-on-read” approach provides flexibility: we don’t need to alter the table if containerd adds new event fields, and we can parse the payload at query time using ClickHouse’s JSON functions.
  • TTL: Raw event data is valuable for a limited time. Automatically expiring rows older than 30 days is a simple, effective data lifecycle management strategy.
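
The idempotency point can also be approached from the insert side. The following is a minimal sketch, assuming the event_uuid values in a batch stay the same across retries: it derives a deterministic token for a batch, so a resent batch is recognizable as the same one. The helper name `batchDedupToken` is hypothetical. ClickHouse has an `insert_deduplication_token` setting that can carry such a token, but whether it takes effect on a non-replicated MergeTree table depends on table settings such as `non_replicated_deduplication_window`, so treat that part as an assumption to verify against your server version.

```typescript
import { createHash } from 'node:crypto';

// Hypothetical helper: derives a stable token from the event UUIDs in a batch.
// Retrying the same batch yields the same token; a different batch yields a different one.
function batchDedupToken(batch: ReadonlyArray<{ event_uuid: string }>): string {
    const hash = createHash('sha256');
    for (const row of batch) {
        hash.update(row.event_uuid);
        hash.update('\n'); // delimiter so ['ab', 'c'] and ['a', 'bc'] hash differently
    }
    return hash.digest('hex');
}
```

The token could be computed once per batch, before the first insert attempt, and reused unchanged on every retry of that batch.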

The Collector Service Implementation

The core of the system is a TypeScript class that manages the gRPC connection, event processing, batching, and ClickHouse insertion. The implementation prioritizes resilience over raw throughput, employing batching and a retry mechanism with exponential backoff.

Here is the main structure of our collector service. Note the dependencies: @grpc/grpc-js for containerd communication, @clickhouse/client for database interaction, and uuid for generating unique event identifiers.

src/containerd-collector.ts

import { credentials, ServiceError } from '@grpc/grpc-js';
import { ClickHouseClient, createClient } from '@clickhouse/client';
import { EventsClient } from './generated/containerd/events/v1/events_grpc_pb';
import { SubscribeRequest, Envelope } from './generated/containerd/events/v1/events_pb';
import { v4 as uuidv4 } from 'uuid';
import { Any } from 'google-protobuf/google/protobuf/any_pb';
import { TaskOOM } from './generated/containerd/events/v1/events_pb'; // Example event type

// A common mistake is to not define types for your processed events.
// This leads to difficult-to-maintain code.
interface FormattedEvent {
    event_timestamp: string;
    node_id: string;
    event_uuid: string;
    namespace: string;
    topic: string;
    event_type: string;
    container_id: string;
    image_name: string; // Extracted for easier querying
    event_payload: string;
}

export class ContainerdCollector {
    private readonly grpcClient: EventsClient;
    private readonly clickhouseClient: ClickHouseClient;
    private readonly nodeId: string;

    private buffer: FormattedEvent[] = [];
    private timer: NodeJS.Timeout | null = null;
    private isShuttingDown = false;

    constructor(
        containerdSocketPath: string,
        clickhouseConfig: any,
        nodeId: string
    ) {
        // In production, you'd want more robust credentials, but for a local socket, this is sufficient.
        this.grpcClient = new EventsClient(containerdSocketPath, credentials.createInsecure());
        this.clickhouseClient = createClient(clickhouseConfig);
        this.nodeId = nodeId;
    }

    public async start(): Promise<void> {
        console.log(`[Collector] Starting on node ${this.nodeId}...`);
        this.scheduleFlush();
        await this.subscribeToEvents();
    }

    public async stop(): Promise<void> {
        this.isShuttingDown = true;
        console.log('[Collector] Shutdown initiated. Flushing remaining buffer...');
        if (this.timer) {
            clearTimeout(this.timer);
        }
        await this.flushBuffer();
        this.grpcClient.close();
        await this.clickhouseClient.close();
        console.log('[Collector] Shutdown complete.');
    }

    private async subscribeToEvents() {
        if (this.isShuttingDown) return;

        const request = new SubscribeRequest();
        const stream = this.grpcClient.subscribe(request);

        stream.on('data', (envelope: Envelope) => {
            try {
                const formattedEvent = this.formatEvent(envelope);
                if (formattedEvent) {
                    this.buffer.push(formattedEvent);
                }
            } catch (err) {
                console.error('[Collector] Error formatting event:', err);
            }
        });

        stream.on('error', (err: ServiceError) => {
            console.error(`[Collector] gRPC stream error: ${err.message}. Reconnecting in 5s...`);
            // The pitfall here is not handling stream errors. A simple crash is bad;
            // a resilient service must attempt to reconnect.
            if (!this.isShuttingDown) {
                setTimeout(() => this.subscribeToEvents(), 5000);
            }
        });

        stream.on('end', () => {
            console.log('[Collector] gRPC stream ended. Reconnecting in 5s...');
            if (!this.isShuttingDown) {
                setTimeout(() => this.subscribeToEvents(), 5000);
            }
        });
    }

    private formatEvent(envelope: Envelope): FormattedEvent | null {
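        // Note: toDate() returns a JS Date, which has millisecond resolution, so the
        // sub-millisecond part of the protobuf timestamp is dropped here.
        const timestamp = envelope.getTimestamp()?.toDate();
        if (!timestamp) return null;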

        const eventAny = envelope.getEvent();
        if (!eventAny) return null;
        
        // Example of unpacking a specific event type to extract key fields.
        // In a real implementation, this would be a large switch statement or a map.
        const typeUrl = eventAny.getTypeUrl();
        let containerId = 'N/A';
        let imageName = 'N/A';

        // This demonstrates how to unpack a well-known type.
        // You'll need protobuf definitions for all event types you care about.
        if (typeUrl.includes('TaskOOM')) {
            const oomEvent = eventAny.unpack(TaskOOM.deserializeBinary, 'containerd.events.TaskOOM');
            if (oomEvent) {
                containerId = oomEvent.getContainerId();
            }
        }
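        // ... add more unpackers for TaskStart, TaskExit, etc.
        // Extracting common fields like `image_name` at ingestion time drastically improves query performance.
        // Note: TaskOOM carries only the container ID, so `imageName` stays 'N/A' in this excerpt;
        // populating it requires a lookup, for example from an earlier container-create event.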

        return {
            event_timestamp: timestamp.toISOString(),
            node_id: this.nodeId,
            event_uuid: uuidv4(),
            namespace: envelope.getNamespace(),
            topic: envelope.getTopic(),
            event_type: typeUrl.substring(typeUrl.lastIndexOf('/') + 1),
            container_id: containerId,
            image_name: imageName,
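            // Note: Any.toObject() yields { typeUrl, value } with `value` base64-encoded.
            // Serialize the unpacked message's toObject() instead if decoded fields
            // should be readable with JSON functions at query time.
            event_payload: JSON.stringify(eventAny.toObject()),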
        };
    }

    private scheduleFlush() {
        if (this.isShuttingDown) return;
        // The timer ensures data is sent even if the event rate is low.
        this.timer = setTimeout(async () => {
            if (this.buffer.length > 0) {
                await this.flushBuffer();
            }
            this.scheduleFlush();
        }, 5000); // Flush every 5 seconds
    }
    
    // This is the most critical part for data integrity.
    private async flushBuffer() {
        if (this.buffer.length === 0) {
            return;
        }

        const batch = [...this.buffer];
        this.buffer = []; // Clear buffer immediately to start collecting the next batch

        console.log(`[Collector] Flushing batch of ${batch.length} events.`);

        let attempt = 0;
        const maxRetries = 5;
        const baseDelay = 1000;

        while (attempt < maxRetries) {
            try {
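                await this.clickhouseClient.insert({
                    table: 'containerd_events',
                    values: batch,
                    format: 'JSONEachRow',
                    // event_timestamp is an ISO 8601 string ('...T...Z'), which needs
                    // best-effort date parsing on the server side.
                    clickhouse_settings: { date_time_input_format: 'best_effort' },
                });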
                console.log(`[Collector] Successfully flushed ${batch.length} events.`);
                return; // Success
            } catch (err) {
                attempt++;
                console.error(`[Collector] ClickHouse insert failed (attempt ${attempt}/${maxRetries}):`, err);
                if (attempt >= maxRetries) {
                    // This excerpt discards the batch once retries are exhausted.
                    // For critical data, write it to a dead-letter queue or local disk instead.
                    console.error('[Collector] Max retries reached. Discarding batch.', {
                        count: batch.length,
                        firstEventId: batch[0]?.event_uuid
                    });
                    break;
                }
                // Exponential backoff with jitter
                const delay = baseDelay * Math.pow(2, attempt - 1) + Math.random() * 1000;
                await new Promise(res => setTimeout(res, delay));
            }
        }
    }
}

This implementation handles the core logic: connecting, receiving, formatting, batching, and resiliently inserting. The error handling for the gRPC stream and the ClickHouse client is non-negotiable for a production-grade service.
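
One detail worth illustrating is timestamp precision. The schema stores nanoseconds, but a JavaScript Date only carries milliseconds. The sketch below is one possible way to keep all nine fractional digits, assuming the seconds and nanoseconds are read separately from the protobuf Timestamp (its `getSeconds()` and `getNanos()` accessors). The function name `formatNanoTimestamp` is hypothetical, and the output uses the plain 'YYYY-MM-DD HH:MM:SS.fraction' text form.

```typescript
// Formats a protobuf-style timestamp (whole seconds + nanoseconds) as
// 'YYYY-MM-DD HH:MM:SS.nnnnnnnnn' in UTC, keeping all nine fractional digits.
function formatNanoTimestamp(seconds: number, nanos: number): string {
    if (!Number.isInteger(seconds) || !Number.isInteger(nanos) || nanos < 0 || nanos > 999999999) {
        throw new RangeError(`invalid timestamp: seconds=${seconds}, nanos=${nanos}`);
    }
    // Date handles the calendar math; it only ever sees whole seconds here.
    const iso = new Date(seconds * 1000).toISOString();
    const dateTime = iso.slice(0, 19).replace('T', ' ');
    return `${dateTime}.${String(nanos).padStart(9, '0')}`;
}

console.log(formatNanoTimestamp(1700000000, 123456789)); // 2023-11-14 22:13:20.123456789
```

Because this string has no 'T' or 'Z', it does not rely on best-effort date parsing at insert time.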

Rigorous Validation with Jest

This collector is a critical piece of infrastructure. If it fails silently, we lose visibility. If it corrupts data, our analysis is worthless. Unit and integration testing are not optional. We used Jest to simulate the entire environment, allowing us to test edge cases that are difficult to reproduce in a live system.

Our testing strategy involved two primary mocks:

  1. Mock gRPC Server: A mock containerd events server that we can control to emit specific event sequences, including malformed data or sudden stream terminations.
  2. Mock ClickHouse Client: A mock of the @clickhouse/client that allows us to verify the data being sent and simulate database failures to test our retry logic.

tests/containerd-collector.test.ts

import { ContainerdCollector } from '../src/containerd-collector';
import { Envelope } from '../src/generated/containerd/events/v1/events_pb';
import { TaskOOM } from '../src/generated/containerd/events/v1/events_pb';
import { Any } from 'google-protobuf/google/protobuf/any_pb';
import { Timestamp } from 'google-protobuf/google/protobuf/timestamp_pb';
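import { EventEmitter } from 'events';

// In-depth mocking is essential for testing infrastructure components.
// We mock the entire client module.
const mockClickHouseClient = {
    insert: jest.fn(),
    close: jest.fn().mockResolvedValue(undefined),
};
const mockCreateClient = jest.fn(() => mockClickHouseClient);
jest.mock('@clickhouse/client', () => ({
    // Call the mock factory so the collector receives the client object,
    // not the factory function itself.
    createClient: () => mockCreateClient(),
}));

// Mocking the gRPC client is more complex. We mock the constructor and its methods.
// The fake stream is a plain EventEmitter whose write() emits 'data' synchronously,
// so buffer assertions do not depend on stream scheduling under Jest's fake timers.
class MockGrpcStream extends EventEmitter {
    write(envelope: unknown): boolean {
        return this.emit('data', envelope);
    }
}
const mockGrpcStream = new MockGrpcStream();
const mockGrpcClient = {
    subscribe: jest.fn(() => mockGrpcStream),
    close: jest.fn(),
};
jest.mock('../src/generated/containerd/events/v1/events_grpc_pb', () => ({
    EventsClient: jest.fn(() => mockGrpcClient),
}));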

describe('ContainerdCollector', () => {
    let collector: ContainerdCollector;

    beforeEach(() => {
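        // Reset mocks before each test to ensure isolation.
        // clearAllMocks() only clears call history, so the insert mock's implementation
        // is reset explicitly; otherwise a persistent rejection leaks into the next test.
        jest.clearAllMocks();
        mockClickHouseClient.insert.mockReset();
        jest.useFakeTimers();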

        collector = new ContainerdCollector('unix:///fake.sock', {}, 'test-node-01');
    });

    afterEach(async () => {
        await collector.stop();
        jest.useRealTimers();
    });

    function createFakeOOMEvent(containerId: string): Envelope {
        const oomEvent = new TaskOOM();
        oomEvent.setContainerId(containerId);

        const any = new Any();
        any.setTypeUrl('/containerd.events.TaskOOM');
        any.setValue(oomEvent.serializeBinary());

        const envelope = new Envelope();
        const ts = new Timestamp();
        ts.fromDate(new Date());
        envelope.setTimestamp(ts);
        envelope.setNamespace('default');
        envelope.setTopic('/tasks/oom');
        envelope.setEvent(any);

        return envelope;
    }

    it('should collect, batch, and flush events on a timer', async () => {
        await collector.start();

        // Simulate receiving two events from the gRPC stream
        mockGrpcStream.write(createFakeOOMEvent('container-1'));
        mockGrpcStream.write(createFakeOOMEvent('container-2'));

        // The buffer should contain 2 items before the flush
        // Note: Direct inspection of private members is for testing only.
        // @ts-ignore
        expect(collector.buffer.length).toBe(2);

        // Advance timers past the 5-second flush interval
        await jest.advanceTimersByTimeAsync(5001);

        // Verify that clickhouseClient.insert was called exactly once
        expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(1);
        const insertCall = mockClickHouseClient.insert.mock.calls[0][0];

        // Deeply inspect the payload sent to ClickHouse
        expect(insertCall.table).toBe('containerd_events');
        expect(insertCall.values.length).toBe(2);
        expect(insertCall.values[0].container_id).toBe('container-1');
        expect(insertCall.values[1].container_id).toBe('container-2');
        expect(insertCall.values[0].node_id).toBe('test-node-01');

        // After flushing, the buffer should be empty
        // @ts-ignore
        expect(collector.buffer.length).toBe(0);
    });

    it('should retry flushing on ClickHouse failure and succeed eventually', async () => {
        await collector.start();

        // Configure the mock client to fail twice then succeed.
        // This is a powerful pattern for testing resilience.
        mockClickHouseClient.insert
            .mockRejectedValueOnce(new Error('Connection timeout'))
            .mockRejectedValueOnce(new Error('Service unavailable'))
            .mockResolvedValueOnce({} as any);

        mockGrpcStream.write(createFakeOOMEvent('container-retry'));
        
        // Fast-forward past the flush interval
        await jest.advanceTimersByTimeAsync(5001);

        // It should have failed the first time
        expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(1);

        // Fast-forward past the first retry delay (1s + jitter)
        await jest.advanceTimersByTimeAsync(2000);
        expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(2);

        // Fast-forward past the second retry delay (2s + jitter)
        await jest.advanceTimersByTimeAsync(3000);
        expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(3);

        // The third call should have succeeded, and the buffer should be empty.
        // @ts-ignore
        expect(collector.buffer.length).toBe(0);
    });

    it('should discard a batch after max retries are exceeded', async () => {
        await collector.start();

        // Mock persistent failure
        mockClickHouseClient.insert.mockRejectedValue(new Error('Permanent failure'));

        mockGrpcStream.write(createFakeOOMEvent('container-discard'));
        
        // Trigger the initial flush
        await jest.advanceTimersByTimeAsync(5001);

        // Trigger all retries
        await jest.advanceTimersByTimeAsync(30000); // More than enough time for all backoffs

        // It should have tried 1 initial + 4 retries = 5 times.
        expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(5);
        
        // Buffer should be empty as the batch was discarded
        // @ts-ignore
        expect(collector.buffer.length).toBe(0);
    });
    
    it('should flush remaining events on graceful shutdown', async () => {
        await collector.start();
        mockGrpcStream.write(createFakeOOMEvent('container-shutdown'));

        // @ts-ignore
        expect(collector.buffer.length).toBe(1);

        // Stop the collector without waiting for the timer
        await collector.stop();

        // The flush should have been triggered by the stop() method
        expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(1);
        expect(mockClickHouseClient.insert.mock.calls[0][0].values[0].container_id).toBe('container-shutdown');

        // Ensure resources are released
        expect(mockGrpcClient.close).toHaveBeenCalledTimes(1);
        expect(mockClickHouseClient.close).toHaveBeenCalledTimes(1);
    });
});

This test suite validates our core requirements: correct batching, timer-based flushing, robust retry logic for transient failures, data discarding for permanent failures (an explicit design choice), and graceful shutdown. This level of testing provides high confidence before deploying the agent to the fleet.
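
The retry tests above advance the clock by generous margins because the jitter is random. A small refactoring can make them exact. The following sketch, under the assumption that the delay calculation is pulled out of `flushBuffer` into a pure function, lets a test pin the jitter by injecting the random source. The name `backoffDelay` and its parameters are hypothetical.

```typescript
// Computes the delay before retry number `attempt` (1-based):
// baseDelayMs * 2^(attempt - 1), plus up to maxJitterMs of random jitter.
// `random` is injectable so tests can pin the jitter to a known value.
function backoffDelay(
    attempt: number,
    baseDelayMs = 1000,
    maxJitterMs = 1000,
    random: () => number = Math.random,
): number {
    return baseDelayMs * Math.pow(2, attempt - 1) + random() * maxJitterMs;
}

// With the jitter pinned to zero, the schedule is exactly 1s, 2s, 4s, 8s.
const schedule = [1, 2, 3, 4].map((attempt) => backoffDelay(attempt, 1000, 1000, () => 0));
console.log(schedule); // [ 1000, 2000, 4000, 8000 ]
```

With the default arguments the function matches the formula used in the collector, so swapping it in would not change production behavior.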

Deployment and Lingering Issues

The collector, packaged as a minimal Docker container, is deployed as a Kubernetes DaemonSet, ensuring it runs on every node in the cluster. It mounts the host’s containerd.sock to gain access to the runtime. The node_id is passed via the Downward API, typically using the node’s name.
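
On the process side, the wiring for such a deployment might look like the sketch below. It is illustrative only: the environment variable name `NODE_NAME` is an assumption (the Downward API exposes the node name under whatever variable the DaemonSet manifest declares), and `resolveNodeId` and `installShutdownHandlers` are hypothetical helpers. The signal handlers matter because Kubernetes sends SIGTERM before killing a pod, which is the chance to flush the in-memory buffer.

```typescript
import { hostname } from 'node:os';

// NODE_NAME is an assumed variable name; fall back to the OS hostname if it is unset.
function resolveNodeId(env: Record<string, string | undefined>): string {
    const fromEnv = env.NODE_NAME?.trim();
    return fromEnv ? fromEnv : hostname();
}

// Anything with an async stop() works here, for example the collector class.
interface Stoppable {
    stop(): Promise<void>;
}

// Registers SIGTERM/SIGINT handlers that stop the service once, then let the process exit.
function installShutdownHandlers(service: Stoppable): void {
    let stopping = false;
    const shutdown = async (signal: string): Promise<void> => {
        if (stopping) return; // ignore repeated signals while a shutdown is in progress
        stopping = true;
        console.log(`[Collector] Received ${signal}, shutting down...`);
        try {
            await service.stop();
            process.exitCode = 0;
        } catch (err) {
            console.error('[Collector] Error during shutdown:', err);
            process.exitCode = 1;
        }
    };
    process.on('SIGTERM', () => void shutdown('SIGTERM'));
    process.on('SIGINT', () => void shutdown('SIGINT'));
}
```

An entry point would call `resolveNodeId(process.env)`, pass the result to the collector's constructor, and hand the collector to `installShutdownHandlers` before calling `start()`.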

The result was transformative. We could now run queries like this in ClickHouse and get answers in milliseconds:

SELECT
    image_name,
    count() AS oom_kills
FROM containerd_events
WHERE
    event_type = 'containerd.events.TaskOOM' AND
    event_timestamp >= now() - INTERVAL 1 HOUR
GROUP BY image_name
ORDER BY oom_kills DESC
LIMIT 10;

This gave us the immediate insight we lacked, directly tying OOM events to specific container images across the entire fleet.

However, this solution is not without its trade-offs. The current implementation uses an in-memory buffer, so a crash of the collector process loses every event collected since the last successful flush. For our use case, observability data, this small potential for loss was an acceptable trade-off for simplicity and performance. A system requiring zero data loss would need to augment this with a persistent buffer, such as writing batches to a local file before attempting to send them, which would add significant complexity.

Furthermore, the single-threaded nature of Node.js could become a CPU bottleneck on nodes with extremely high container churn. While performance has been more than adequate so far, a future iteration for a 10x larger fleet might necessitate a rewrite in a language like Go or Rust to use multiple cores for event deserialization and processing.
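
To make the persistent-buffer idea concrete, here is a minimal sketch of spilling a batch to local disk as newline-delimited JSON and reading it back after a restart. It is not what the collector does today. All function names and the file layout are hypothetical, and the sketch does not call fsync, so it protects against a process crash but not against power loss.

```typescript
import { mkdirSync, readFileSync, readdirSync, renameSync, rmSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';

// Writes a batch as newline-delimited JSON. The data goes to a temporary name first and
// is renamed into place, so a reader never sees a half-written .ndjson file.
function spillBatch(dir: string, batchId: string, rows: object[]): string {
    mkdirSync(dir, { recursive: true });
    const finalPath = join(dir, `${batchId}.ndjson`);
    const tmpPath = `${finalPath}.tmp`;
    writeFileSync(tmpPath, rows.map((row) => JSON.stringify(row)).join('\n') + '\n');
    renameSync(tmpPath, finalPath);
    return finalPath;
}

// Loads every spilled batch in filename order, for replay after a restart.
function loadSpilledBatches(dir: string): object[][] {
    return readdirSync(dir)
        .filter((name) => name.endsWith('.ndjson'))
        .sort()
        .map((name) =>
            readFileSync(join(dir, name), 'utf8')
                .split('\n')
                .filter((line) => line.length > 0)
                .map((line) => JSON.parse(line) as object),
        );
}

// Removes a spilled batch once it has been inserted successfully.
function discardSpilledBatch(path: string): void {
    rmSync(path, { force: true });
}
```

In this scheme a batch would be spilled before the first insert attempt and discarded only after the insert succeeds. A replayed batch may then be inserted twice, which is where the event_uuid column earns its keep.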

