The initial data lake ingestion pipeline was hemorrhaging data integrity. Our mandate was simple: every database change captured via CDC from our primary OLTP systems must land in our data lake’s staging zone—a MongoDB cluster—exactly once. The first iteration, a straightforward consumer reading from a RocketMQ topic and writing to MongoDB, was a disaster in staging. Under simulated failure conditions—consumer restarts, network partitions, broker rebalancing—we saw rampant data duplication and, worse, occasional data loss. The at-least-once delivery guarantee, combined with non-idempotent writes, was creating downstream analytical chaos. The problem wasn’t the individual components, but the seams between them. A message could be successfully processed and written to MongoDB, but the consumer could crash before its offset was committed to the RocketMQ broker. Upon restart, the same message would be redelivered, creating a duplicate record. This post-mortem details the journey of building a resilient, exactly-once ingestion layer that solved this fundamental distributed systems problem.
Our stack was fixed: Debezium reading the MySQL binlog, publishing JSON-formatted change events to Apache RocketMQ, and a fleet of Java consumers responsible for persisting these events into a structured MongoDB collection. The initial consumer logic was naive, relying on RocketMQ's default CONSUME_SUCCESS acknowledgement to advance the consumer offset.
// DO NOT USE THIS IN PRODUCTION - FLAWED INITIAL APPROACH
public class NaiveAtLeastOnceConsumer {
// ... setup for consumer and mongoClient ...
public void start() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cdc_ingestion_group_naive");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("mysql_binlog_topic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
byte[] body = msg.getBody();
String eventJson = new String(body, StandardCharsets.UTF_8);
Document doc = Document.parse(eventJson);
// The critical, non-atomic operation
getMongoCollection().insertOne(doc);
} catch (Exception e) {
// On failure, message will be redelivered.
// This is the "at-least-once" part.
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// If we crash here, after insertOne() but before this return,
// the message is processed but its offset is not committed.
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
// ... helper methods for mongo collection ...
}
The flaw is the gap between the database write and the message acknowledgement. In a distributed system, this gap is an invitation for inconsistency. A common mistake is to assume these two actions are atomic when they are, by nature, executed over a network against two separate systems. The fix required us to stop treating them as two independent steps and instead bind them into a single logical transaction.
Our first major pivot was to address the consumer side’s lack of atomicity. The core principle we adopted was to make the write operation to our NoSQL sink, MongoDB, fully idempotent. If we could safely re-process a message without creating duplicate data, the danger of at-least-once delivery would be neutralized.
To achieve idempotency, we redesigned our MongoDB document schema. Each document would now be uniquely identified by a composite key derived from the source database event. For a typical CDC event, a combination of the source table’s primary key and the transaction ID is a strong candidate for a natural unique key. We enforced this uniqueness with an index in MongoDB.
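As a concrete illustration, such a unique compound index can be created up front with the MongoDB Java driver. The sketch below is not part of the original pipeline code: the setup class name is hypothetical, and the field paths simply mirror the Debezium-style payload shape assumed by the consumers later in this post.
// Sketch: enforce the composite idempotency key with a unique compound index.
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import org.bson.Document;

public class IdempotencyIndexSetup {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> collection = client
                    .getDatabase("datalake_landing")
                    .getCollection("orders_cdc");
            // Unique compound index on (transaction id, source primary key).
            collection.createIndex(
                    Indexes.compoundIndex(
                            Indexes.ascending("payload.source.txId"),
                            Indexes.ascending("payload.after.order_id")),
                    new IndexOptions().unique(true));
        }
    }
}
With the unique index in place, any accidental plain insert of a duplicate fails loudly instead of silently creating a second copy of the event.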
The consumer logic was then rewritten to use replaceOne with the upsert=true option. Instead of blindly inserting, we were now effectively performing an "upsert": if a document with the unique key existed, it would be replaced (harmless for redelivered messages); if not, it would be inserted.
// A more robust consumer using an idempotent sink
// Configuration class
public class CDCIdempotentConsumerConfig {
public static final String NAME_SERVER_ADDR = "localhost:9876";
public static final String MONGO_URI = "mongodb://localhost:27017";
public static final String DATABASE_NAME = "datalake_landing";
public static final String COLLECTION_NAME = "orders_cdc";
public static final String CONSUMER_GROUP = "cdc_ingestion_group_idempotent";
public static final String TOPIC_NAME = "mysql_binlog_topic";
}
// Main consumer logic
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
public class IdempotentConsumer {
private static final Logger logger = LoggerFactory.getLogger(IdempotentConsumer.class);
private final MongoClient mongoClient;
private final MongoCollection<Document> collection;
private final DefaultMQPushConsumer consumer;
public IdempotentConsumer() throws Exception {
this.mongoClient = MongoClients.create(CDCIdempotentConsumerConfig.MONGO_URI);
this.collection = mongoClient.getDatabase(CDCIdempotentConsumerConfig.DATABASE_NAME)
.getCollection(CDCIdempotentConsumerConfig.COLLECTION_NAME);
// Ensure unique index exists for idempotency
// This should ideally be managed by an infrastructure-as-code tool
// Example: based on a composite key from the event payload
// collection.createIndex(Indexes.compoundIndex(Indexes.ascending("payload.source.txId"), Indexes.ascending("payload.after.order_id")));
this.consumer = new DefaultMQPushConsumer(CDCIdempotentConsumerConfig.CONSUMER_GROUP);
this.consumer.setNamesrvAddr(CDCIdempotentConsumerConfig.NAME_SERVER_ADDR);
this.consumer.subscribe(CDCIdempotentConsumerConfig.TOPIC_NAME, "*");
this.consumer.setConsumeMessageBatchMaxSize(32); // Process in batches
}
public void start() throws Exception {
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
String eventJson = new String(msg.getBody(), StandardCharsets.UTF_8);
Document eventDoc = Document.parse(eventJson);
// Extract a unique key from the CDC event payload
// Debezium format is nested. Let's assume a structure like: { "payload": { "source": { "txId": "..." }, "after": { "order_id": 123 } } }
Document payload = eventDoc.get("payload", Document.class);
Document source = payload.get("source", Document.class);
Document after = payload.get("after", Document.class);
if (source == null || after == null || source.getString("txId") == null || after.getInteger("order_id") == null) {
logger.error("Malformed CDC event, skipping. MsgId: {}", msg.getMsgId());
continue; // Or move to a dead-letter queue
}
// This is our idempotent key
String txId = source.getString("txId");
Integer orderId = after.getInteger("order_id");
// Use the idempotent key in the filter
var filter = Filters.and(
Filters.eq("payload.source.txId", txId),
Filters.eq("payload.after.order_id", orderId)
);
// The core of idempotency: replaceOne with upsert
ReplaceOptions options = new ReplaceOptions().upsert(true);
collection.replaceOne(filter, eventDoc, options);
logger.info("Processed messageId: {}", msg.getMsgId());
} catch (Exception e) {
logger.error("Error processing messageId: {}. Attempting redelivery.", msg.getMsgId(), e);
// Critical: if the DB operation fails, we must trigger redelivery.
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// If the process crashes here, messages are redelivered,
// but the idempotent write protects against duplicates.
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
logger.info("Idempotent Consumer Started.");
}
public void shutdown() {
if (this.consumer != null) {
this.consumer.shutdown();
}
if (this.mongoClient != null) {
this.mongoClient.close();
}
logger.info("Idempotent Consumer Shutdown.");
}
}
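For completeness, wiring this class into a runnable process might look like the sketch below. The IdempotentConsumerApp entry point and shutdown hook are illustrative additions, not part of the original listing.
// Hypothetical entry point for the idempotent consumer (not in the original listing).
public class IdempotentConsumerApp {
    public static void main(String[] args) throws Exception {
        IdempotentConsumer ingestion = new IdempotentConsumer();
        // Release the RocketMQ consumer and MongoDB client on JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(ingestion::shutdown));
        ingestion.start();
        // The push consumer delivers messages on its own threads; keep the JVM alive.
        Thread.currentThread().join();
    }
}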
This was a significant improvement. We were now safe from data duplication. However, there is a subtle performance pitfall here: during a redelivery scenario, we perform a full database write for a message that has already been processed. While correct, it is inefficient. More importantly, this architecture doesn't guarantee exactly-once semantics in a strict sense; it provides an effectively-once outcome through idempotency. For many systems that is sufficient, but our requirement was stricter, and we still needed to rule out the edge cases in which data could be lost when message ordering mattered and was not handled properly.
The ultimate solution required us to achieve true atomicity between the data write and the offset commit. Since a two-phase commit (2PC) between MongoDB and RocketMQ isn’t feasible, we simulated it by co-locating the consumer offset within the same transactional boundary as our data. We would store the RocketMQ consumer offsets in MongoDB itself.
The flow became:
- Receive a batch of messages from RocketMQ.
- Start a MongoDB transaction.
- Inside the transaction, perform the idempotent replaceOne operation for each message's data.
- Inside the same transaction, update a dedicated consumer_offsets collection in MongoDB with the offset of the last successfully processed message for that specific message queue.
- Commit the MongoDB transaction.
- Only if the transaction commits successfully, acknowledge the batch of messages to RocketMQ.
If the consumer process crashes at any point before or during the MongoDB transaction commit, the transaction is rolled back: no data is written, and no offset is updated. When the consumer restarts, RocketMQ redelivers the same batch of messages, and the process repeats safely. If the crash happens after the MongoDB commit but before the RocketMQ acknowledgement, the messages are redelivered on restart; however, the consumer logic first checks the offset stored in MongoDB, and any message whose offset is less than or equal to the stored offset is a duplicate that can be safely skipped without repeating the data write.
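The full listing below omits that pre-check for brevity. Here is a minimal sketch of how it might be layered onto the listener, assuming the consumer_offsets document shape used by updateOffsetInMongo further down (an _id of group@topic@queueId plus an offset field). The loadCommittedOffset helper and its in-memory cache are hypothetical names, not part of the original code, and the cache would need to be cleared on a rebalance.
// Hypothetical duplicate filter for the transactional listener (not in the original listing).
// Requires: java.util.Map, java.util.concurrent.ConcurrentHashMap, java.util.stream.Collectors.
private final Map<String, Long> committedOffsets = new ConcurrentHashMap<>();

private long loadCommittedOffset(MessageQueue mq) {
    String offsetStoreId = String.format("%s@%s@%s", CONSUMER_GROUP, mq.getTopic(), mq.getQueueId());
    // Read the stored offset once per queue, then serve it from memory.
    return committedOffsets.computeIfAbsent(offsetStoreId, id -> {
        Document stored = offsetCollection.find(Filters.eq("_id", id)).first();
        return stored != null ? stored.getLong("offset") : -1L;
    });
}

// At the top of consumeMessage(), before starting the MongoDB transaction:
// long committed = loadCommittedOffset(context.getMessageQueue());
// List<MessageExt> fresh = msgs.stream()
//         .filter(m -> m.getQueueOffset() > committed)
//         .collect(Collectors.toList());
// if (fresh.isEmpty()) {
//     return ConsumeOrderlyStatus.SUCCESS; // Whole batch was already persisted and committed.
// }
Because the upserts remain idempotent, a stale cache entry only costs an extra write, never correctness.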
Here is the architecture visualized with Mermaid.js:
sequenceDiagram
    participant R as RocketMQ Broker
    participant C as CDC Consumer
    participant M as MongoDB
    loop Message Batch Processing
        C->>R: Fetch messages
        R-->>C: Delivers [Msg1, Msg2, Msg3]
        C->>M: Start Transaction
        Note over C,M: Transaction boundary begins
        C->>M: upsert(data for Msg1)
        C->>M: upsert(data for Msg2)
        C->>M: upsert(data for Msg3)
        C->>M: update_offset(queueId, Msg3.offset)
        alt Transaction Succeeds
            C->>M: Commit Transaction
            Note over C,M: Transaction boundary ends
            M-->>C: Commit OK
            C->>R: Acknowledge messages up to Msg3.offset
            R-->>C: Ack OK
        else Transaction Fails
            C->>M: Abort Transaction
            Note over C,M: All writes are rolled back
            M-->>C: Abort OK
            Note over C,R: No acknowledgement sent. Messages will be redelivered.
        end
    end
The implementation of this transactional consumer is significantly more complex, requiring careful state management.
// FINAL, PRODUCTION-GRADE IMPLEMENTATION
import com.mongodb.ClientSessionOptions;
import com.mongodb.ReadConcern;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.client.*;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class TransactionalExactlyOnceConsumer {
private static final Logger logger = LoggerFactory.getLogger(TransactionalExactlyOnceConsumer.class);
// Configuration constants
private static final String NAME_SERVER_ADDR = "localhost:9876";
private static final String MONGO_URI = "mongodb://localhost:27017/?replicaSet=rs0"; // Replica set is required for transactions
private static final String DATABASE_NAME = "datalake_landing";
private static final String DATA_COLLECTION_NAME = "orders_cdc_final";
private static final String OFFSET_COLLECTION_NAME = "consumer_offsets";
private static final String CONSUMER_GROUP = "cdc_ingestion_group_exactly_once";
private static final String TOPIC_NAME = "mysql_binlog_topic";
private final MongoClient mongoClient;
private final DefaultMQPushConsumer consumer;
public TransactionalExactlyOnceConsumer() {
this.mongoClient = MongoClients.create(MONGO_URI);
this.consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
}
public void start() throws MQClientException {
// Consumer setup
consumer.setNamesrvAddr(NAME_SERVER_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(TOPIC_NAME, "*");
// Using MessageListenerOrderly ensures we process messages for a single queue sequentially.
// This simplifies offset management significantly.
consumer.registerMessageListener(new MongoTransactionalMessageListener());
consumer.start();
logger.info("Transactional Exactly-Once Consumer Started.");
}
public void shutdown() {
if (consumer != null) {
consumer.shutdown();
}
if (mongoClient != null) {
mongoClient.close();
}
logger.info("Transactional Exactly-Once Consumer Shutdown.");
}
// The core logic is encapsulated in this listener
private class MongoTransactionalMessageListener implements MessageListenerOrderly {
private final MongoDatabase database;
private final MongoCollection<Document> dataCollection;
private final MongoCollection<Document> offsetCollection;
MongoTransactionalMessageListener() {
this.database = mongoClient.getDatabase(DATABASE_NAME);
this.dataCollection = database.getCollection(DATA_COLLECTION_NAME);
this.offsetCollection = database.getCollection(OFFSET_COLLECTION_NAME);
}
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext context) {
// Disable automatic offset commit; in this design, the offset of record lives in MongoDB.
context.setAutoCommit(false);
MessageQueue mq = context.getMessageQueue();
// In a real-world project, a robust transaction retry policy is essential.
TransactionOptions txnOptions = TransactionOptions.builder()
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
try (ClientSession clientSession = mongoClient.startSession(ClientSessionOptions.builder().build())) {
// Start the MongoDB transaction
clientSession.startTransaction(txnOptions);
logger.debug("Starting MongoDB transaction for queue {} with {} messages.", mq.getQueueId(), msgs.size());
long maxOffset = -1;
for (MessageExt msg : msgs) {
try {
String eventJson = new String(msg.getBody(), StandardCharsets.UTF_8);
Document eventDoc = Document.parse(eventJson);
// Idempotency is still key. We use it as a safety net.
Document payload = eventDoc.get("payload", Document.class);
Document source = payload.get("source", Document.class);
Document after = payload.get("after", Document.class);
if (source == null || after == null || source.getString("txId") == null || after.getInteger("order_id") == null) {
logger.warn("Skipping malformed message: {}", msg.getMsgId());
continue;
}
var idempotentFilter = Filters.and(
Filters.eq("payload.source.txId", source.getString("txId")),
Filters.eq("payload.after.order_id", after.getInteger("order_id"))
);
ReplaceOptions options = new ReplaceOptions().upsert(true);
// Perform data write within the transaction
dataCollection.replaceOne(clientSession, idempotentFilter, eventDoc, options);
// Keep track of the highest offset processed in this batch
if (msg.getQueueOffset() > maxOffset) {
maxOffset = msg.getQueueOffset();
}
} catch (Exception e) {
logger.error("Failed to process message {} inside transaction. Aborting.", msg.getMsgId(), e);
clientSession.abortTransaction();
// Suspend consumption of this queue for a while before retrying
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
if (maxOffset > -1) {
// Update the offset in MongoDB, also within the transaction
updateOffsetInMongo(clientSession, mq, maxOffset);
}
// Commit the entire transaction
clientSession.commitTransaction();
logger.info("Successfully committed transaction for queue {} up to offset {}.", mq.getQueueId(), maxOffset);
} catch (Exception e) {
logger.error("MongoDB transaction failed for queue {}. Messages will be redelivered.", mq.getQueueId(), e);
// On any transaction failure, suspend and let RocketMQ redeliver.
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
// Acknowledge to RocketMQ *after* successful DB commit.
return ConsumeOrderlyStatus.SUCCESS;
}
private void updateOffsetInMongo(ClientSession session, MessageQueue mq, long offset) {
String offsetStoreId = String.format("%s@%s@%s", CONSUMER_GROUP, mq.getTopic(), mq.getQueueId());
Document filter = new Document("_id", offsetStoreId);
Document update = new Document("$set", new Document("offset", offset));
UpdateOptions options = new UpdateOptions().upsert(true);
logger.debug("Updating offset in MongoDB for {} to {}", offsetStoreId, offset);
offsetCollection.updateOne(session, filter, update, options);
}
}
}
This final architecture passed all our chaos tests. We could kill consumers mid-batch, disconnect them from the database, or simulate network partitions, and the pipeline would always self-correct upon recovery without data loss or duplication. The use of MessageListenerOrderly was a crucial simplification; by processing messages for each queue partition sequentially, we avoided the complexities of managing offsets for concurrent messages within a single batch.
The solution, however, is not without its trade-offs. The throughput is inherently limited by the transactional overhead on MongoDB and the single-threaded processing per message queue. For our use case, the volume per queue was manageable, and the guarantee of data integrity far outweighed the performance cost. The logic is also considerably more complex than a simple consumer, which increases the maintenance burden and requires developers to have a deep understanding of the failure modes of both RocketMQ and MongoDB transactions. For scenarios requiring extreme throughput, a dedicated stream processing framework like Apache Flink, with its sophisticated state backends and checkpointing mechanisms, might be a more suitable choice. Yet, for projects needing precise transactional control without introducing a new, heavy framework, this pattern of combining an idempotent NoSQL sink with co-located offset management provides a robust and powerful mechanism for building exactly-once ingestion systems.