The analytics team’s daily reports were consistently late. Our legacy ETL process, a collection of batch scripts running nightly, was buckling under the weight of growing data volumes. The core pain point was the latency between an operational event occurring in our production PostgreSQL database and its availability for analysis in our ClickHouse data warehouse. A 24-hour delay was no longer acceptable for business intelligence; the demand was for near-real-time insights. The nightly batch jobs were not only slow but also incredibly resource-intensive, causing load spikes on the production database that impacted application performance.
The initial concept was to scrap the batch ETL entirely and move to a stream-based ingestion model using Change Data Capture (CDC). This approach promised to capture row-level changes from PostgreSQL as they happened, stream them through a message bus, and apply them to ClickHouse continuously. This would drastically reduce data latency and distribute the ingestion load evenly throughout the day.
Our technology selection process was guided by pragmatism and the need for a robust, maintainable system.
- Capture Agent: Debezium. We chose Debezium as our CDC tool. Its connector-based architecture integrates seamlessly with Kafka Connect, and its PostgreSQL connector is mature. It reliably captures INSERT, UPDATE, and DELETE operations from the write-ahead log (WAL) without requiring application-level changes. Crucially, it also captures schema changes, a requirement for any evolving production system.
- Message Bus: Apache Kafka. Kafka was the obvious choice for the transport layer. It provides the necessary durability, backpressure handling, and decoupling between our source database and the ingestion service. If our ingestion service goes down, events are safely buffered in Kafka until it recovers.
- Data Warehouse: ClickHouse. The destination was already established. Its columnar storage format and MergeTree family of table engines are optimized for the fast analytical queries our BI team needs.
- Processing Service: Custom Spring Boot Application. While a simple Kafka Connect sink could theoretically move data, our requirements were more complex. We needed custom logic for batching writes to ClickHouse for performance, handling the different event types (c, u, d), transforming data payloads, and implementing a sophisticated error handling strategy with a Dead-Letter Queue (DLQ). A custom Spring Boot service provided the flexibility and control we needed, leveraging the excellent spring-kafka library for consumer management.
The first step was to establish the full infrastructure stack locally for development and testing. docker-compose is the perfect tool for this.
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
postgres:
image: debezium/postgres:14
hostname: postgres
container_name: postgres
ports:
- "5432:5432"
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=appdb
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
connect:
image: debezium/connect:2.1
hostname: connect
container_name: connect
depends_on:
- kafka
- postgres
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: 'kafka:9092'
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
clickhouse:
image: clickhouse/clickhouse-server:23.8
hostname: clickhouse
container_name: clickhouse
ports:
- "8123:8123"
- "9000:9000"
ulimits:
nproc: 65535
nofile:
soft: 262144
hard: 262144
The PostgreSQL instance is initialized with a sample table and configured for logical replication, which Debezium requires.
-- init.sql
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
    last_updated_at TIMESTAMP DEFAULT NOW() NOT NULL
);
ALTER TABLE products REPLICA IDENTITY FULL;
INSERT INTO products (name, price) VALUES ('Laptop', 1200.50);
INSERT INTO products (name, price) VALUES ('Mouse', 25.00);
With the infrastructure running, we configure the Debezium PostgreSQL connector via the Kafka Connect REST API. This configuration tells Debezium which tables to monitor and how to format the output messages: topic.prefix determines the prefix of the per-table change topics, and decimal.handling.mode=string makes numeric columns arrive as plain strings rather than encoded byte values.
// debezium-pg-connector.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "appdb",
"database.server.name": "dbserver1",
"table.include.list": "public.products",
"plugin.name": "pgoutput",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}
A curl command posts this configuration to the connect service, and Debezium starts streaming changes from the products table into a Kafka topic named dbserver1.public.products.
Our initial Spring Boot processor was a straightforward “happy path” implementation. The goal was to consume a message, parse it, and insert it into ClickHouse.
<!-- pom.xml dependencies -->
<!-- ... -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.5.0</version>
<classifier>all</classifier>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- ... -->
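These starters bring in spring-kafka and the ClickHouse JDBC driver, but the ClickHouse DataSource still has to be wired up. A minimal sketch of one way to do that, assuming the local docker-compose endpoints above and ClickHouse's default user with an empty password; the ClickHouseConfig class name is hypothetical, and the appdb database is assumed to already exist in ClickHouse:
// package com.example.cdcingestor.config;
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
@Configuration
public class ClickHouseConfig {
    @Bean
    public DataSource clickHouseDataSource() {
        // Points at the clickhouse service from docker-compose over the HTTP port (8123).
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("com.clickhouse.jdbc.ClickHouseDriver");
        dataSource.setUrl("jdbc:clickhouse://localhost:8123/appdb");
        dataSource.setUsername("default");
        dataSource.setPassword("");
        return dataSource;
    }
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource clickHouseDataSource) {
        return new JdbcTemplate(clickHouseDataSource);
    }
}
The same wiring could also be expressed through Boot's spring.datasource.* properties; a pooled DataSource would replace DriverManagerDataSource in production.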
The Kafka consumer configuration is critical. We disabled auto-commit to take full control over offset management, ensuring we only commit an offset after successfully processing the corresponding message.
// package com.example.cdcingestor.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// We disable auto-commit to manually acknowledge messages. This is key for reliability.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Manually acknowledge after the listener completes.
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
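With that factory in place, a first pass at the consumer can do nothing more than log each event, which is enough to prove the end-to-end flow. A minimal sketch of that happy-path listener as it looked before the batch changes introduced later; the class name is hypothetical, and the cdc.topic.name property is expected to resolve to dbserver1.public.products:
// package com.example.cdcingestor.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class LoggingDebeziumConsumer {
    private static final Logger log = LoggerFactory.getLogger(LoggingDebeziumConsumer.class);
    @KafkaListener(topics = "${cdc.topic.name}", containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(String message, Acknowledgment acknowledgment) {
        // Log the raw Debezium event and acknowledge it; parsing comes next.
        log.info("Received CDC event: {}", message);
        acknowledgment.acknowledge();
    }
}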
The first version of the consumer simply logged the message, which verified the connection was working. The next step was parsing the complex Debezium JSON structure and handling different database operations. A raw Debezium message for an INSERT looks something like this:
{
"before": null,
"after": {
"id": 1,
"name": "Laptop",
"price": "1200.50",
"last_updated_at": 1672531200000000
},
"source": { ... },
"op": "c",
"ts_ms": 1672531200123
}
An UPDATE includes both before and after states, and op is "u". A DELETE includes the before state, after is null, and op is "d".
This immediately presented our first major challenge: standard INSERT statements are insufficient. Applying updates and deletes as plain inserts in an analytical warehouse like ClickHouse would create duplicates. The correct approach is a table engine that can reconcile multiple versions of a row, like ReplacingMergeTree, which deduplicates rows during its background merge process based on the table's sorting key.
First, we defined the target table in ClickHouse:
CREATE TABLE appdb.products (
id Int32,
name String,
price Decimal(10, 2),
last_updated_at DateTime64(3, 'UTC'),
_sign Int8,
_version UInt64
) ENGINE = ReplacingMergeTree(_version)
ORDER BY id;
The _version column (we'll use the Debezium event timestamp ts_ms) tells ReplacingMergeTree which row is the newest version to keep. The _sign column is a common pattern for handling deletes: we insert a row with _sign = -1 for a delete and _sign = 1 for an insert or update, and queries filter out rows whose latest version carries _sign = -1. This insert-only pattern is far more flexible and performant than issuing ClickHouse ALTER TABLE ... DELETE mutations, which rewrite entire data parts.
The second problem was performance. Writing to ClickHouse one record at a time is disastrously slow: every insert pays a network round trip and creates a new data part that ClickHouse must later merge. The solution is batching. We reconfigured our Kafka listener to receive a list of messages at once and modified our writer service to use JDBC batch inserts.
// KafkaConsumerConfig.java - update
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// Enable batch listening
factory.setBatchListener(true);
return factory;
}
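The number of records handed to each listener invocation is bounded by the consumer's max.poll.records, so it is worth setting explicitly alongside the batch listener. A hedged addition to the consumerFactory() shown earlier; the value of 500 is an assumed starting point, not a tuned figure:
// KafkaConsumerConfig.java - inside consumerFactory()
// Caps how many records a single poll (and therefore a single listener batch) can return.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);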
The refactored ingestion service now parses the operation type and constructs batch statements.
// package com.example.cdcingestor.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Service
public class ClickHouseWriterService {
private static final Logger log = LoggerFactory.getLogger(ClickHouseWriterService.class);
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
public ClickHouseWriterService(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
}
@Transactional
public void writeBatch(List<String> messages) {
List<ProductRecord> records = new ArrayList<>();
        for (String message : messages) {
            // A common mistake is assuming a value is always present: after a delete,
            // Debezium emits a tombstone record whose value is null, and it shows up here as a null element.
            if (message == null || message.isBlank()) {
                log.warn("Received tombstone record. Skipping.");
                continue;
            }
            try {
                JsonNode root = objectMapper.readTree(message);
                // With schemas disabled on the JSON converter the event fields sit at the top level;
                // with schemas enabled they are nested under "payload". Handle both shapes.
                JsonNode payload = root.has("payload") ? root.get("payload") : root;
String op = payload.path("op").asText();
long version = payload.path("ts_ms").asLong();
JsonNode dataNode;
int sign;
switch (op) {
case "c": // Create
case "u": // Update
dataNode = payload.path("after");
sign = 1;
break;
case "d": // Delete
dataNode = payload.path("before");
sign = -1;
break;
default: // Read events, etc.
log.warn("Skipping non-DML operation: {}", op);
continue;
}
if (dataNode.isMissingNode() || dataNode.isNull()) {
log.error("Data node is missing for operation '{}'. Payload: {}", op, message);
continue; // Skip this malformed record
}
Map<String, Object> data = objectMapper.convertValue(dataNode, MAP_TYPE_REFERENCE);
records.add(new ProductRecord(data, sign, version));
} catch (JsonProcessingException e) {
log.error("Failed to parse message: {}", message, e);
// In a real project, this message should be routed to a DLQ.
}
}
if (records.isEmpty()) {
return;
}
String sql = "INSERT INTO appdb.products (id, name, price, last_updated_at, _sign, _version) VALUES (?, ?, ?, ?, ?, ?)";
jdbcTemplate.batchUpdate(sql,
records,
100, // Batch size
(PreparedStatement ps, ProductRecord record) -> {
ps.setInt(1, (Integer) record.data().get("id"));
ps.setString(2, (String) record.data().get("name"));
                    // With decimal.handling.mode=string, Debezium sends decimals as strings, so they need conversion.
                    ps.setBigDecimal(3, new java.math.BigDecimal((String) record.data().get("price")));
                    // Debezium emits the timestamp as microseconds since the epoch; convert to millis for java.sql.Timestamp.
                    long lastUpdatedMicros = ((Number) record.data().get("last_updated_at")).longValue();
                    ps.setTimestamp(4, Timestamp.from(Instant.ofEpochMilli(lastUpdatedMicros / 1000)));
ps.setInt(5, record.sign());
ps.setLong(6, record.version());
});
log.info("Successfully wrote {} records to ClickHouse.", records.size());
}
private record ProductRecord(Map<String, Object> data, int sign, long version) {}
}
This implementation was much more robust, but it surfaced the next critical issue: error handling. What happens if a single message in the batch is malformed or violates a ClickHouse constraint? The entire batchUpdate fails, the transaction rolls back, and because the Kafka offset isn't acknowledged, Spring Kafka will re-deliver the entire batch. This creates an infinite poison-pill loop.
The solution is a Dead-Letter Queue (DLQ). We must catch exceptions at the individual message level, send the failing message to a separate Kafka topic for later analysis, and continue processing the rest of the batch.
// package com.example.cdcingestor.consumer;
import com.example.cdcingestor.service.ClickHouseWriterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class DebeziumEventConsumer {
private static final Logger log = LoggerFactory.getLogger(DebeziumEventConsumer.class);
private final ClickHouseWriterService writerService;
// We would inject a KafkaTemplate here to send to a DLQ
public DebeziumEventConsumer(ClickHouseWriterService writerService) {
this.writerService = writerService;
}
@KafkaListener(topics = "${cdc.topic.name}", containerFactory = "kafkaListenerContainerFactory")
public void handleBatch(@Payload List<String> messages, Acknowledgment acknowledgment) {
if (messages.isEmpty()) {
acknowledgment.acknowledge();
return;
}
log.info("Received batch of {} messages.", messages.size());
try {
// The writer service needs to be internally resilient, but we still wrap the call.
writerService.writeBatch(messages);
// If the entire batch call succeeds, we acknowledge.
acknowledgment.acknowledge();
log.info("Batch processed and acknowledged.");
} catch (Exception e) {
log.error("Unrecoverable error processing batch. Acknowledging to avoid retry loop. Manual intervention required.", e);
// In a catastrophic failure (e.g., ClickHouse is down), we might choose not to acknowledge
// and let the consumer retry. But for data errors, we must acknowledge to prevent blocking.
// A more advanced strategy would involve a circuit breaker.
acknowledgment.acknowledge();
}
}
}
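The consumer above only marks where a DLQ producer would be injected. A minimal sketch of that missing piece; the DeadLetterPublisher class name is hypothetical, the ingestor_dlq topic matches the diagram further down, and a KafkaTemplate<String, String> bean is assumed to be available (Boot's auto-configured template uses Object generics, so a small producer configuration may be needed):
// package com.example.cdcingestor.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class DeadLetterPublisher {
    private static final Logger log = LoggerFactory.getLogger(DeadLetterPublisher.class);
    private static final String DLQ_TOPIC = "ingestor_dlq";
    private final KafkaTemplate<String, String> kafkaTemplate;
    public DeadLetterPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    public void publish(String originalMessage, Exception cause) {
        // Forward the raw message to the DLQ topic; a fuller version would also attach
        // the failure reason as a record header for later triage.
        log.error("Routing message to DLQ: {}", cause.getMessage());
        kafkaTemplate.send(DLQ_TOPIC, originalMessage);
    }
}
The writer service would call publish() from its per-message catch blocks instead of only logging, so one bad record no longer poisons the whole batch.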
The final major hurdle was schema evolution. A developer added a new description column to the products table in PostgreSQL. Debezium correctly picked this up and started including it in the after payload. Our processor, which assumed a fixed structure, immediately broke.
The pitfall here is hardcoding column lists. A production data pipeline must be resilient to additive schema changes. The solution is to make the ingestion logic dynamic: instead of mapping to a static DTO, we can parse the payload map and dynamically construct the INSERT statement's column list and parameter bindings based on the keys present in the event.
graph TD
    A[PostgreSQL WAL] -->|Debezium Connector| B(Kafka Topic: dbserver1.public.products);
    B --> C{Spring Boot Ingestor};
    C -->|Batch Consumer| D[Parse Debezium Envelope];
    D --> E{Check Operation Type};
    E -->|'c', 'u'| F[Extract 'after' fields];
    E -->|'d'| G[Extract 'before' fields];
    F --> H[Dynamically Build INSERT Statement];
    G --> H;
    H --> I{JDBC Batch Update};
    I --> J[ClickHouse];
    subgraph Error Handling
        D -->|Parse Error| K(DLQ Producer);
        I -->|DB Write Error| K;
        K --> L(Kafka Topic: ingestor_dlq);
    end
This diagram illustrates the final, more resilient architecture. The Spring Boot ingestor becomes a generic processor capable of handling any table’s data stream, provided the target table in ClickHouse exists and is compatible.
While this implementation solves the core problem, several aspects would need further hardening in a true production environment. The current DLQ strategy is passive; a real system requires an active monitoring and reprocessing mechanism for dead-lettered messages. The dynamic schema handling is resilient to added columns but would still require manual intervention for destructive changes like column drops or type changes. Furthermore, for extremely high-volume topics, a single ingestor instance might not keep up. Scaling the service horizontally would require careful consideration of Kafka consumer group rebalancing and ensuring processing remains idempotent across multiple consumers. The current at-least-once delivery guarantee, combined with ClickHouse's ReplacingMergeTree, provides eventual consistency, which is acceptable for most analytical use cases, but it's not a substitute for true transactional integrity.