The standard observability stack for our container fleet was failing us. Scraping Prometheus metrics and shipping logs via Fluentd provided a lagging, high-level view. When a critical service began experiencing cascading failures due to OOM kills, we found our existing tools were too slow and lacked the granularity to pinpoint the root cause. The “container killed” events were buried in a sea of application logs, often delayed by minutes. We needed direct, near real-time access to the container runtime’s event stream, across thousands of nodes, with the ability to perform complex analytical queries on that data immediately.
The initial concept was to bypass traditional logging agents and tap directly into the source: the containerd events API. This gRPC-based stream provides structured, low-level information about container lifecycle events (Create, Start, OOM, Exit, etc.). For the destination, ClickHouse was the obvious choice. Its columnar architecture and phenomenal ingestion speed are purpose-built for the firehose of event data we anticipated. The missing piece was the transport: a lightweight, resilient agent to bridge containerd's gRPC stream with ClickHouse's HTTP interface. We chose Node.js with TypeScript for its robust asynchronous I/O capabilities, which are ideal for this kind of I/O-bound task.
This entire endeavor hinged on the reliability of this new collector agent. Data loss during collection or transport was unacceptable. Therefore, a significant portion of the effort was dedicated to building a comprehensive test suite using Jest to simulate failure modes and validate the agent's behavior under duress.
ClickHouse Schema: The Foundation for Analysis
Before writing a single line of collector code, we defined the destination schema in ClickHouse. A poorly designed schema would cripple query performance, regardless of ingestion speed. In a real-world project, getting this right upfront saves immense refactoring pain later.
We settled on a MergeTree engine, partitioned by day and ordered by the event timestamp and a unique identifier for the node.
CREATE TABLE containerd_events (
`event_timestamp` DateTime64(9, 'UTC'),
`ingest_timestamp` DateTime64(3, 'UTC') DEFAULT now(),
`node_id` String,
`event_uuid` UUID,
`namespace` String,
`topic` String,
`event_type` String,
`container_id` String,
`image_name` String,
`event_payload` String
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(event_timestamp)
ORDER BY (event_timestamp, node_id)
TTL toDateTime(event_timestamp) + INTERVAL 30 DAY;
A few key decisions here:

- event_timestamp: A DateTime64(9) to accommodate nanosecond precision from containerd. This is our primary sorting key.
- event_uuid: A client-generated UUID for each event. This is critical for idempotency and downstream deduplication if the agent accidentally sends the same batch twice.
- node_id: A configurable identifier for the host running the collector. Essential for filtering and aggregation.
- event_payload: A JSON string containing the raw event protobuf. This “schema-on-read” approach provides flexibility; we don’t need to alter the table if containerd adds new event fields. We can parse it at query time using ClickHouse’s JSON functions, as sketched just after this list.
- TTL: Raw event data is valuable for a limited time. Automatically dropping partitions older than 30 days is a simple, effective data lifecycle management strategy.
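To make the schema-on-read point concrete, here is a sketch of pulling fields straight out of event_payload at query time with ClickHouse’s JSON functions, issued through the Node.js client we introduce below. The payload key names (containerId, exitStatus) are assumptions based on the camelCase output of protobuf’s toObject() for a TaskExit event; adjust them to whatever your generated code actually emits.

// Sketch only: query-time extraction from event_payload.
// Assumes camelCase payload keys (protobuf toObject()), e.g. containerId, exitStatus.
import { createClient } from '@clickhouse/client';

async function topNonZeroExitsLastHour() {
  const client = createClient(); // defaults to http://localhost:8123
  const result = await client.query({
    query: `
      SELECT
        JSONExtractString(event_payload, 'containerId') AS container,
        JSONExtractUInt(event_payload, 'exitStatus') AS exit_status,
        count() AS exits
      FROM containerd_events
      WHERE event_type = 'TaskExit'
        AND JSONExtractUInt(event_payload, 'exitStatus') != 0
        AND event_timestamp >= now() - INTERVAL 1 HOUR
      GROUP BY container, exit_status
      ORDER BY exits DESC
      LIMIT 20
    `,
    format: 'JSONEachRow',
  });
  return result.json();
}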
The Collector Service Implementation
The core of the system is a TypeScript class that manages the gRPC connection, event processing, batching, and ClickHouse insertion. The implementation prioritizes resilience over raw throughput, employing batching and a retry mechanism with exponential backoff.
Here is the main structure of our collector service. Note the dependencies: @grpc/grpc-js for containerd communication, @clickhouse/client for database interaction, and uuid for generating unique event identifiers.
src/containerd-collector.ts
import { credentials, ServiceError } from '@grpc/grpc-js';
import { ClickHouseClient, createClient } from '@clickhouse/client';
import { EventsClient } from './generated/containerd/events/v1/events_grpc_pb';
import { SubscribeRequest, Envelope } from './generated/containerd/events/v1/events_pb';
import { v4 as uuidv4 } from 'uuid';
import { Any } from 'google-protobuf/google/protobuf/any_pb';
import { TaskOOM } from './generated/containerd/events/v1/events_pb'; // Example event type
// A common mistake is to not define types for your processed events.
// This leads to difficult-to-maintain code.
interface FormattedEvent {
event_timestamp: string;
node_id: string;
event_uuid: string;
namespace: string;
topic: string;
event_type: string;
container_id: string;
image_name: string; // Extracted for easier querying
event_payload: string;
}
export class ContainerdCollector {
private readonly grpcClient: EventsClient;
private readonly clickhouseClient: ClickHouseClient;
private readonly nodeId: string;
private buffer: FormattedEvent[] = [];
private timer: NodeJS.Timeout | null = null;
private isShuttingDown = false;
constructor(
containerdSocketPath: string,
clickhouseConfig: any,
nodeId: string
) {
// In production, you'd want more robust credentials, but for a local socket, this is sufficient.
this.grpcClient = new EventsClient(containerdSocketPath, credentials.createInsecure());
this.clickhouseClient = createClient(clickhouseConfig);
this.nodeId = nodeId;
}
public async start(): Promise<void> {
console.log(`[Collector] Starting on node ${this.nodeId}...`);
this.scheduleFlush();
await this.subscribeToEvents();
}
public async stop(): Promise<void> {
this.isShuttingDown = true;
console.log('[Collector] Shutdown initiated. Flushing remaining buffer...');
if (this.timer) {
clearTimeout(this.timer);
}
await this.flushBuffer();
this.grpcClient.close();
await this.clickhouseClient.close();
console.log('[Collector] Shutdown complete.');
}
private async subscribeToEvents() {
if (this.isShuttingDown) return;
const request = new SubscribeRequest();
const stream = this.grpcClient.subscribe(request);
stream.on('data', (envelope: Envelope) => {
try {
const formattedEvent = this.formatEvent(envelope);
if (formattedEvent) {
this.buffer.push(formattedEvent);
}
} catch (err) {
console.error('[Collector] Error formatting event:', err);
}
});
stream.on('error', (err: ServiceError) => {
console.error(`[Collector] gRPC stream error: ${err.message}. Reconnecting in 5s...`);
// The pitfall here is not handling stream errors. A simple crash is bad;
// a resilient service must attempt to reconnect.
if (!this.isShuttingDown) {
setTimeout(() => this.subscribeToEvents(), 5000);
}
});
stream.on('end', () => {
console.log('[Collector] gRPC stream ended. Reconnecting in 5s...');
if (!this.isShuttingDown) {
setTimeout(() => this.subscribeToEvents(), 5000);
}
});
}
private formatEvent(envelope: Envelope): FormattedEvent | null {
const timestamp = envelope.getTimestamp()?.toDate();
if (!timestamp) return null;
const eventAny = envelope.getEvent();
if (!eventAny) return null;
// Example of unpacking a specific event type to extract key fields.
// In a real implementation, this would be a large switch statement or a map.
const typeUrl = eventAny.getTypeUrl();
let containerId = 'N/A';
let imageName = 'N/A';
// This demonstrates how to unpack a well-known type.
// You'll need protobuf definitions for all event types you care about.
if (typeUrl.includes('TaskOOM')) {
const oomEvent = eventAny.unpack(TaskOOM.deserializeBinary, 'containerd.events.TaskOOM');
if (oomEvent) {
containerId = oomEvent.getContainerId();
}
}
// ... add more unpackers for TaskStart, TaskExit, etc.
// Extracting common fields like `image_name` at ingestion time drastically improves query performance.
return {
event_timestamp: timestamp.toISOString(),
node_id: this.nodeId,
event_uuid: uuidv4(),
namespace: envelope.getNamespace(),
topic: envelope.getTopic(),
event_type: typeUrl.substring(typeUrl.lastIndexOf('.') + 1), // e.g. 'containerd.events.TaskOOM' -> 'TaskOOM'
container_id: containerId,
image_name: imageName,
event_payload: JSON.stringify(eventAny.toObject()),
};
}
private scheduleFlush() {
if (this.isShuttingDown) return;
// The timer ensures data is sent even if the event rate is low.
this.timer = setTimeout(async () => {
if (this.buffer.length > 0) {
await this.flushBuffer();
}
this.scheduleFlush();
}, 5000); // Flush every 5 seconds
}
// This is the most critical part for data integrity.
private async flushBuffer() {
if (this.buffer.length === 0) {
return;
}
const batch = [...this.buffer];
this.buffer = []; // Clear buffer immediately to start collecting the next batch
console.log(`[Collector] Flushing batch of ${batch.length} events.`);
let attempt = 0;
const maxRetries = 5;
const baseDelay = 1000;
while (attempt < maxRetries) {
try {
await this.clickhouseClient.insert({
table: 'containerd_events',
values: batch,
format: 'JSONEachRow',
});
console.log(`[Collector] Successfully flushed ${batch.length} events.`);
return; // Success
} catch (err) {
attempt++;
console.error(`[Collector] ClickHouse insert failed (attempt ${attempt}/${maxRetries}):`, err);
if (attempt >= maxRetries) {
// A common mistake is to drop data on persistent failure.
// For critical data, you should write it to a dead-letter queue or local disk.
console.error('[Collector] Max retries reached. Discarding batch.', {
count: batch.length,
firstEventId: batch[0]?.event_uuid
});
break;
}
// Exponential backoff with jitter
const delay = baseDelay * Math.pow(2, attempt - 1) + Math.random() * 1000;
await new Promise(res => setTimeout(res, delay));
}
}
}
}
This implementation handles the core logic: connecting, receiving, formatting, batching, and resiliently inserting. The error handling for the gRPC stream and the ClickHouse client is non-negotiable for a production-grade service.
Rigorous Validation with Jest
This collector is a critical piece of infrastructure. If it fails silently, we lose visibility. If it corrupts data, our analysis is worthless. Unit and integration testing are not optional. We used Jest to simulate the entire environment, allowing us to test edge cases that are difficult to reproduce in a live system.
Our testing strategy involved two primary mocks:
- Mock gRPC Server: A mock containerd events server that we can control to emit specific event sequences, including malformed data or sudden stream terminations (the reconnection path gets its own sketch after the suite below).
- Mock ClickHouse Client: A mock of the @clickhouse/client that allows us to verify the data being sent and simulate database failures to test our retry logic.
tests/containerd-collector.test.ts
import { ContainerdCollector } from '../src/containerd-collector';
import { Envelope } from '../src/generated/containerd/events/v1/events_pb';
import { TaskOOM } from '../src/generated/containerd/events/v1/events_pb';
import { Any } from 'google-protobuf/google/protobuf/any_pb';
import { Timestamp } from 'google-protobuf/google/protobuf/timestamp_pb';
import { PassThrough } from 'stream';
// In-depth mocking is essential for testing infrastructure components.
// We mock the entire client module.
const mockClickHouseClient = {
insert: jest.fn(),
close: jest.fn().mockResolvedValue(undefined),
};
jest.mock('@clickhouse/client', () => ({
// Return the mock client lazily so the module-level const is initialized by the
// time createClient is actually called inside the collector's constructor.
createClient: () => mockClickHouseClient,
}));
// Mocking the gRPC client is more complex. We mock the constructor and its methods.
const mockGrpcStream = new PassThrough({ objectMode: true });
const mockGrpcClient = {
subscribe: jest.fn(() => mockGrpcStream),
close: jest.fn(),
};
jest.mock('../src/generated/containerd/events/v1/events_grpc_pb', () => ({
EventsClient: jest.fn(() => mockGrpcClient),
}));
describe('ContainerdCollector', () => {
let collector: ContainerdCollector;
beforeEach(() => {
// Reset mocks before each test to ensure isolation. clearAllMocks() only clears
// call history, so also drop any insert implementation left over from a previous
// test (e.g. a persistent rejection) to keep the cases independent.
jest.clearAllMocks();
mockClickHouseClient.insert.mockReset();
jest.useFakeTimers();
collector = new ContainerdCollector('unix:///fake.sock', {}, 'test-node-01');
});
afterEach(async () => {
await collector.stop();
jest.useRealTimers();
});
function createFakeOOMEvent(containerId: string): Envelope {
const oomEvent = new TaskOOM();
oomEvent.setContainerId(containerId);
const any = new Any();
any.setTypeUrl('/containerd.events.TaskOOM');
any.setValue(oomEvent.serializeBinary());
const envelope = new Envelope();
const ts = new Timestamp();
ts.fromDate(new Date());
envelope.setTimestamp(ts);
envelope.setNamespace('default');
envelope.setTopic('/tasks/oom');
envelope.setEvent(any);
return envelope;
}
it('should collect, batch, and flush events on a timer', async () => {
await collector.start();
// Simulate receiving two events from the gRPC stream
mockGrpcStream.write(createFakeOOMEvent('container-1'));
mockGrpcStream.write(createFakeOOMEvent('container-2'));
// The buffer should contain 2 items before the flush
// Note: Direct inspection of private members is for testing only.
// @ts-ignore
expect(collector.buffer.length).toBe(2);
// Advance timers past the 5-second flush interval
await jest.advanceTimersByTimeAsync(5001);
// Verify that clickhouseClient.insert was called exactly once
expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(1);
const insertCall = mockClickHouseClient.insert.mock.calls[0][0];
// Deeply inspect the payload sent to ClickHouse
expect(insertCall.table).toBe('containerd_events');
expect(insertCall.values.length).toBe(2);
expect(insertCall.values[0].container_id).toBe('container-1');
expect(insertCall.values[1].container_id).toBe('container-2');
expect(insertCall.values[0].node_id).toBe('test-node-01');
// After flushing, the buffer should be empty
// @ts-ignore
expect(collector.buffer.length).toBe(0);
});
it('should retry flushing on ClickHouse failure and succeed eventually', async () => {
await collector.start();
// Configure the mock client to fail twice then succeed.
// This is a powerful pattern for testing resilience.
mockClickHouseClient.insert
.mockRejectedValueOnce(new Error('Connection timeout'))
.mockRejectedValueOnce(new Error('Service unavailable'))
.mockResolvedValueOnce({} as any);
mockGrpcStream.write(createFakeOOMEvent('container-retry'));
// Fast-forward past the flush interval
await jest.advanceTimersByTimeAsync(5001);
// It should have failed the first time
expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(1);
// Fast-forward past the first retry delay (1s + jitter)
await jest.advanceTimersByTimeAsync(2000);
expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(2);
// Fast-forward past the second retry delay (2s + jitter)
await jest.advanceTimersByTimeAsync(3000);
expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(3);
// The third call should have succeeded, and the buffer should be empty.
// @ts-ignore
expect(collector.buffer.length).toBe(0);
});
it('should discard a batch after max retries are exceeded', async () => {
await collector.start();
// Mock persistent failure
mockClickHouseClient.insert.mockRejectedValue(new Error('Permanent failure'));
mockGrpcStream.write(createFakeOOMEvent('container-discard'));
// Trigger the initial flush
await jest.advanceTimersByTimeAsync(5001);
// Trigger all retries
await jest.advanceTimersByTimeAsync(30000); // More than enough time for all backoffs
// It should have tried 1 initial + 4 retries = 5 times.
expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(5);
// Buffer should be empty as the batch was discarded
// @ts-ignore
expect(collector.buffer.length).toBe(0);
});
it('should flush remaining events on graceful shutdown', async () => {
await collector.start();
mockGrpcStream.write(createFakeOOMEvent('container-shutdown'));
// @ts-ignore
expect(collector.buffer.length).toBe(1);
// Stop the collector without waiting for the timer
await collector.stop();
// The flush should have been triggered by the stop() method
expect(mockClickHouseClient.insert).toHaveBeenCalledTimes(1);
expect(mockClickHouseClient.insert.mock.calls[0][0].values[0].container_id).toBe('container-shutdown');
// Ensure resources are released
expect(mockGrpcClient.close).toHaveBeenCalledTimes(1);
expect(mockClickHouseClient.close).toHaveBeenCalledTimes(1);
});
});
This test suite validates our core requirements: correct batching, timer-based flushing, robust retry logic for transient failures, data discarding for permanent failures (an explicit design choice), and graceful shutdown. This level of testing provides high confidence before deploying the agent to the fleet.
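One path the suite does not exercise is reconnection after a stream failure, which the mock gRPC stream makes easy to simulate. The following sketch assumes it lives inside the same describe block and reuses the mocks above; it asserts that a second subscription is opened once the collector’s 5-second reconnect delay elapses.

it('should re-subscribe after a gRPC stream error', async () => {
  await collector.start();
  expect(mockGrpcClient.subscribe).toHaveBeenCalledTimes(1);
  // Simulate the stream dying abruptly; the collector's 'error' handler
  // should schedule a reconnect instead of letting the process crash.
  mockGrpcStream.emit('error', new Error('stream reset'));
  // Advance past the 5-second reconnect delay.
  await jest.advanceTimersByTimeAsync(5000);
  expect(mockGrpcClient.subscribe).toHaveBeenCalledTimes(2);
});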
Deployment and Lingering Issues
The collector, packaged as a minimal Docker container, is deployed as a Kubernetes DaemonSet, ensuring it runs on every node in the cluster. It mounts the host’s containerd.sock to gain access to the runtime. The node_id is passed via the Downward API, typically using the node’s name.
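For completeness, here is a sketch of the entrypoint that wires this together on each node. The environment variable names, default socket path, and ClickHouse URL are conventions from our DaemonSet spec (NODE_NAME comes from a Downward API fieldRef on spec.nodeName), not requirements of the collector itself.

// src/main.ts (illustrative) -- env var names and defaults are our DaemonSet conventions.
import { ContainerdCollector } from './containerd-collector';

const collector = new ContainerdCollector(
  process.env.CONTAINERD_ADDRESS ?? 'unix:///run/containerd/containerd.sock',
  { url: process.env.CLICKHOUSE_URL ?? 'http://localhost:8123' },
  process.env.NODE_NAME ?? 'unknown-node' // injected via the Downward API (spec.nodeName)
);

// Kubernetes sends SIGTERM before killing the pod; this is what makes the
// graceful-shutdown flush in stop() actually matter in production.
process.on('SIGTERM', async () => {
  await collector.stop();
  process.exit(0);
});

collector.start().catch((err) => {
  console.error('[Collector] Fatal error during startup:', err);
  process.exit(1);
});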
The result was transformative. We could now run queries like this in ClickHouse and get answers in milliseconds:
SELECT
image_name,
count() AS oom_kills
FROM containerd_events
WHERE
event_type = 'TaskOOM' AND
event_timestamp >= now() - INTERVAL 1 HOUR
GROUP BY image_name
ORDER BY oom_kills DESC
LIMIT 10;
This gave us the immediate insight we lacked, directly tying OOM events to specific container images across the entire fleet.
However, this solution is not without its trade-offs. The current implementation uses an in-memory buffer. A crash of the collector process between a successful flush and the next will result in the loss of events collected in that window. For our use case—observability data—this small potential for loss was an acceptable trade-off for simplicity and performance. A system requiring absolute zero data loss would need to augment this with a persistent buffer, such as writing batches to a local file before attempting to send them, which would add significant complexity. Furthermore, the single-threaded nature of Node.js could become a CPU bottleneck on nodes with extremely high container churn. While performance has been more than adequate so far, a future iteration for a 10x larger fleet might necessitate a rewrite in a concurrent language like Go or Rust to fully leverage multi-core processors for event deserialization and processing.
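For reference, the persistent-buffer variant we chose not to build would look roughly like the sketch below: batches that exhaust their retries are spilled to a node-local directory and replayed once ClickHouse is reachable again. The directory layout and file naming here are purely illustrative.

import { promises as fs } from 'fs';
import * as path from 'path';

// Sketch of a disk-backed dead-letter fallback -- not what we run today.
// Batches that exhaust their retries are written to SPILL_DIR as JSON files
// and re-inserted later; the directory and naming scheme are illustrative only.
const SPILL_DIR = '/var/lib/containerd-collector/spill';

async function spillBatch(batch: object[]): Promise<void> {
  await fs.mkdir(SPILL_DIR, { recursive: true });
  const file = path.join(SPILL_DIR, `batch-${Date.now()}-${process.pid}.json`);
  await fs.writeFile(file, JSON.stringify(batch));
}

async function replaySpilled(insert: (rows: object[]) => Promise<void>): Promise<void> {
  let files: string[];
  try {
    files = await fs.readdir(SPILL_DIR);
  } catch {
    return; // nothing has been spilled yet
  }
  for (const name of files) {
    const file = path.join(SPILL_DIR, name);
    const rows: object[] = JSON.parse(await fs.readFile(file, 'utf8'));
    await insert(rows);    // if this throws, the file stays on disk for the next pass
    await fs.unlink(file); // delete only after the insert succeeded
  }
}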