Building a Stateful CDC Processor with Micronaut and XState for Apache Hudi Upserts from MongoDB


The standard approach to Change Data Capture (CDC) pipelines often results in a messy, difficult-to-query data lake. Simply mirroring raw insert, update, and delete operations from an operational database like MongoDB into an analytical store creates a change history, not a clean, current-state view. For any given entity, you’re left with multiple rows representing its evolution, forcing complex and inefficient queries with ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) just to find the latest version. This becomes untenable when the business logic governing an entity’s lifecycle is complex. An Order isn’t just a document; it’s an entity that transitions through states like PENDING, PAID, SHIPPED, and CANCELLED. A raw CDC stream loses this critical business context.

Our initial attempt to solve this involved building a custom state-management service within our Micronaut-based ingestion pipeline. It would consume Kafka events from Debezium, look up the previous state of an entity from a cache, apply business rules, and then write the updated state to our Apache Hudi data lake. This quickly devolved into a maze of if-else blocks and boolean flags, becoming a nightmare to maintain, test, and reason about. Every new business rule added exponential complexity. The core issue was that we were implicitly building a state machine but without the formal structure to manage it, leading to brittle, error-prone code.

The turning point was the decision to model the entity lifecycle explicitly using a formal state machine. We chose XState, a library typically associated with UI development, for its robust implementation of SCXML (State Chart XML) and its actor model concepts. By embedding an XState interpreter within our Micronaut CDC consumer, we could formally define, validate, and execute the lifecycle of each data entity. This decouples the complex state transition logic from the data transport mechanics, transforming our pipeline from a simple data replicator into a stateful, business-aware processing engine that lands clean, transactionally consistent, and analysis-ready data in Hudi.

The architecture is a clear progression of data refinement:

flowchart TD
    A[MongoDB Replica Set] -- Oplog --> B[Debezium Connector];
    B -- CDC Events --> C[Apache Kafka Topic: 'mongo.inventory.applications'];
    C --> D{Micronaut CDC Processor};
    D -- Fetches/Saves State --> E[MongoDB: 'fsm_states' collection];
    D -- Manages Lifecycle --> F[XState Interpreter];
    D -- Upserts Final State --> G[Apache Hudi Table on HDFS/S3];

    subgraph "Kafka Connect"
        B
    end

    subgraph "Stateful Ingestion Service"
        D
        F
    end

    subgraph "State Management"
        E
    end

    subgraph "Data Lake"
        G
    end

This post documents the build process, focusing on the integration of these disparate technologies to solve a concrete data engineering problem.

The Environment and Data Model

To create a runnable, production-grade example, we first need a complete local environment. This is best managed with Docker Compose. Our setup includes MongoDB configured as a replica set (a requirement for Debezium’s oplog reader), Kafka, Zookeeper, and Kafka Connect with the Debezium MongoDB connector.

docker-compose.yml:

version: '3.8'
services:
  mongo:
    image: mongo:5.0
    container_name: mongo
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db
    command: ["--replSet", "rs0", "--bind_ip_all"]

  mongo-setup:
    image: mongo:5.0
    container_name: mongo-setup
    depends_on:
      - mongo
    restart: "no"
    entrypoint: |
      bash -c '
      sleep 10 &&
      mongo --host mongo:27017 --eval "rs.initiate({_id: \"rs0\", members: [{_id: 0, host: \"mongo:27017\"}]})"
      '

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  connect:
    image: confluentinc/cp-kafka-connect:7.3.0
    container_name: connect
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"

volumes:
  mongo_data:

Once the environment is running, we configure the Debezium connector to watch our source inventory.applications collection.

debezium-mongo-connector.json:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/mongo:27017",
    "mongodb.name": "inventory",
    "collection.include.list": "inventory.applications",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "value.json.passthrough.enable": "true",
    "publish.full.document.only": "true"
  }
}

A key configuration here is "publish.full.document.only": "true". Instead of receiving just a patch of what changed, we get the entire MongoDB document after the update. This simplifies our processor logic immensely, as we always have the full context of the entity.

Defining the Business Lifecycle with XState

The core of our solution is the formal state machine definition. We’ll model a loan application process. An application can be a DRAFT, then SUBMITTED, go UNDER_REVIEW, and finally end up APPROVED or REJECTED. This is a perfect use case for a finite state machine.

We define this logic in an SCXML file, which is a standard format for state charts. The XState Java library can directly parse and execute this.

src/main/resources/application-state-machine.scxml:

<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml"
       version="1.0"
       initial="DRAFT">

    <state id="DRAFT">
        <onentry>
            <log expr="'Entering DRAFT state'"/>
        </onentry>
        <transition event="SUBMIT" target="SUBMITTED">
            <assign location="document.status" expr="'SUBMITTED'"/>
        </transition>
    </state>

    <state id="SUBMITTED">
        <onentry>
            <log expr="'Entering SUBMITTED state'"/>
        </onentry>
        <transition event="BEGIN_REVIEW" target="UNDER_REVIEW">
            <assign location="document.status" expr="'UNDER_REVIEW'"/>
        </transition>
    </state>

    <state id="UNDER_REVIEW">
        <onentry>
            <log expr="'Entering UNDER_REVIEW state'"/>
        </onentry>
        <transition event="APPROVE" target="APPROVED">
             <assign location="document.status" expr="'APPROVED'"/>
        </transition>
        <transition event="REJECT" target="REJECTED">
             <assign location="document.status" expr="'REJECTED'"/>
        </transition>
    </state>

    <state id="APPROVED">
        <onentry>
            <log expr="'Entering final state: APPROVED'"/>
        </onentry>
        <!-- Final state -->
        <type>final</type>
    </state>

    <state id="REJECTED">
        <onentry>
            <log expr="'Entering final state: REJECTED'"/>
        </onentry>
        <!-- Final state -->
        <type>final</type>
    </state>

</scxml>

This XML defines the valid states and the events that cause transitions between them. A critical piece is the <assign> action, which ensures the status field in our data context is updated to reflect the current state. This allows the state machine to be the single source of truth for the entity’s status.

The Micronaut Processor: Orchestrating State and Storage

The Micronaut application is the glue. It listens for Kafka messages, manages the state machine instances, and writes to Hudi.

Project Dependencies

The build.gradle.kts file highlights the key technologies involved. We need Micronaut’s Kafka integration, the official Apache Hudi Java client, a MongoDB driver for state persistence, and the XState Java library.

// build.gradle.kts (partial)
dependencies {
    implementation("io.micronaut:micronaut-runtime")
    implementation("io.micronaut.kafka:micronaut-kafka")
    implementation("jakarta.annotation:jakarta.annotation-api")
    implementation("org.mongodb:mongodb-driver-sync:4.9.0")

    // XState for state machine management
    implementation("com.github.davidmoten:xstate-java:0.1.2")

    // Apache Hudi for Data Lake writes
    implementation("org.apache.hudi:hudi-java-client:0.14.0")
    implementation("org.apache.spark:spark-core_2.12:3.3.2")
    implementation("org.apache.spark:spark-sql_2.12:3.3.2")
    implementation("org.apache.hadoop:hadoop-client:3.3.4") {
        exclude(group = "org.slf4j", module = "slf4j-reload4j")
    }

    // Logging
    runtimeOnly("ch.qos.logback:logback-classic")
}

A common pitfall here is dependency hell, especially with Hadoop and Spark dependencies required by the Hudi client. Careful exclusion of conflicting transitive dependencies like logging frameworks is essential for a stable build.

State Persistence Service

Before processing an event, we need to know the entity’s last known state. After processing, we must save its new state. This must be an atomic operation to prevent race conditions and ensure idempotency. We use a separate MongoDB collection, fsm_states, for this purpose.

StatePersistenceService.java:

package com.example.cdc;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import io.micronaut.context.annotation.Context;
import jakarta.inject.Singleton;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

@Singleton
public class StatePersistenceService {

    private static final Logger LOG = LoggerFactory.getLogger(StatePersistenceService.class);
    private final MongoCollection<Document> stateCollection;

    public StatePersistenceService(MongoClient mongoClient) {
        MongoDatabase database = mongoClient.getDatabase("fsm_state_store");
        this.stateCollection = database.getCollection("fsm_states");
    }

    public Optional<String> getCurrentState(String entityId) {
        Document stateDoc = stateCollection.find(Filters.eq("_id", entityId)).first();
        if (stateDoc != null) {
            return Optional.of(stateDoc.getString("state"));
        }
        return Optional.empty();
    }

    public void saveState(String entityId, String newState) {
        LOG.info("Persisting new state '{}' for entity '{}'", newState, entityId);
        Document stateDoc = new Document("_id", entityId).append("state", newState);
        // Using replace with upsert=true is an atomic operation.
        stateCollection.replaceOne(Filters.eq("_id", entityId), stateDoc, new ReplaceOptions().upsert(true));
    }
}

The Kafka Listener

This is the main entry point. The listener consumes messages from the Debezium topic. For each message, it performs a sequence of critical operations:

  1. Parse the incoming JSON message to extract the entity ID and the full document.
  2. Retrieve the last known state of this entity’s state machine from our StatePersistenceService.
  3. Instantiate an XState machine interpreter with the current state.
  4. Determine the appropriate XState event to send based on the change in the document.
  5. Transition the state machine to its new state.
  6. Persist the new state back to MongoDB.
  7. If the new state is a significant one (e.g., a terminal state), trigger the Hudi writer.

ApplicationCdcListener.java:

package com.example.cdc;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.davidmoten.xstate.StateMachine;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

@KafkaListener(groupId = "hudi-stateful-ingestor")
public class ApplicationCdcListener {

    private static final Logger LOG = LoggerFactory.getLogger(ApplicationCdcListener.class);
    private final StateMachine<Map<String, Object>> stateMachine;
    private final StatePersistenceService statePersistence;
    private final HudiWriterService hudiWriter;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public ApplicationCdcListener(StatePersistenceService statePersistence, HudiWriterService hudiWriter) {
        this.statePersistence = statePersistence;
        this.hudiWriter = hudiWriter;
        // The state machine definition is loaded once at startup.
        this.stateMachine = StateMachine.fromscxml(
                ApplicationCdcListener.class.getResourceAsStream("/application-state-machine.scxml")
        );
    }

    @Topic("dbserver1.inventory.applications")
    public void receive(@KafkaKey String key, String value) {
        try {
            JsonNode payload = objectMapper.readTree(value);
            String entityId = extractEntityId(key);
            JsonNode document = payload.get("fullDocument");

            if (entityId == null || document == null) {
                LOG.warn("Skipping message with missing key or document: {}", key);
                return;
            }

            // 1. Get current state or use initial state from the machine definition
            String currentStateId = statePersistence.getCurrentState(entityId)
                    .orElse(stateMachine.getInitialState().getId());

            // 2. Determine the event based on document changes
            String event = determineEvent(document);
            if (event == null) {
                LOG.info("No relevant state change detected for entity {}. Ignoring.", entityId);
                return;
            }
            
            // 3. Transition the state machine
            LOG.info("Entity [{}], Current State [{}], Received Event [{}]", entityId, currentStateId, event);
            Map<String, Object> dataContext = Collections.singletonMap("document", document);
            StateMachine.State<Map<String, Object>> newState = stateMachine.transition(
                    new StateMachine.State<>(currentStateId, dataContext),
                    event
            );

            // 4. If a transition occurred, persist and process
            if (!newState.getId().equals(currentStateId)) {
                LOG.info("Entity [{}] transitioned from [{}] to [{}]", entityId, currentStateId, newState.getId());
                
                // This step is crucial for idempotency.
                statePersistence.saveState(entityId, newState.getId());
                
                // 5. Write to Hudi if it's a final state.
                if (newState.isFinal()) {
                    LOG.info("Entity [{}] reached a final state. Writing to Hudi.", entityId);
                    hudiWriter.upsert(document);
                }
            } else {
                LOG.warn("Event [{}] did not cause a state transition for entity [{}] in state [{}]", event, entityId, currentStateId);
            }

        } catch (JsonProcessingException e) {
            LOG.error("Failed to parse Kafka message.", e);
        } catch (Exception e) {
            LOG.error("Unexpected error processing CDC event for key {}.", key, e);
            // In a production system, this would go to a dead-letter queue.
        }
    }

    private String extractEntityId(String kafkaKey) {
        try {
            return objectMapper.readTree(kafkaKey).get("documentKey").get("_id").get("$oid").asText();
        } catch (Exception e) {
            return null;
        }
    }

    // This logic maps data changes to state machine events.
    // In a real-world project, this could be a more sophisticated rules engine.
    private String determineEvent(JsonNode document) {
        String status = document.path("status").asText();
        switch (status) {
            case "SUBMITTED": return "SUBMIT";
            case "UNDER_REVIEW": return "BEGIN_REVIEW";
            case "APPROVED": return "APPROVE";
            case "REJECTED": return "REJECTE";
            default: return null;
        }
    }
}

The logic in determineEvent is currently a simple mapping. In a production system, this could be more complex, potentially looking at combinations of field changes to derive the correct business event.

Writing to Apache Hudi

The final step is writing the consolidated record to our data lake. The Apache Hudi Java client provides a programmatic way to do this without needing a full Spark cluster for ingestion. The configuration is verbose but critical for correctness.

HudiWriterService.java:

package com.example.cdc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.inject.Singleton;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieJavaRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.payload.HoodieJsonPayload;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

@Singleton
public class HudiWriterService {

    private static final Logger LOG = LoggerFactory.getLogger(HudiWriterService.class);

    private final HoodieJavaWriteClient<HoodieRecordPayload> writeClient;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final String tableName = "applications";
    private final String basePath = "file:///tmp/hudi/applications"; // Use local filesystem for example

    public HudiWriterService() {
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withTableName(tableName)
            .withSchema(APPLICATION_SCHEMA) // Schema must be defined
            // Record key for upserts
            .withRecordKeyField("applicationId")
            // Partitioning for query performance
            .withPartitionField("country")
            .withPrecombineField("lastUpdatedAt") // Used for deduplication
            .withIndexType("SIMPLE")
            .withAutoCommit(true)
            .withProps(System.getProperties()) // Pass system properties for things like S3 access
            .build();

        // Using Java client requires a Hadoop configuration and an Engine Context.
        Configuration hadoopConf = new Configuration();
        HoodieJavaEngineContext context = new HoodieJavaEngineContext(hadoopConf);
        this.writeClient = new HoodieJavaWriteClient<>(context, config);
    }
    
    public void upsert(JsonNode document) {
        String commitTime = writeClient.startCommit();
        
        try {
            // Hudi Java client needs a HoodieRecordPayload. HoodieJsonPayload works well.
            ObjectNode mutableDoc = (ObjectNode) document;
            String recordKey = mutableDoc.get("applicationId").asText();
            String partitionPath = mutableDoc.get("country").asText();
            
            // Ensure precombine field exists. A common mistake is forgetting this.
            mutableDoc.put("lastUpdatedAt", System.currentTimeMillis());

            HoodieRecordPayload payload = new HoodieJsonPayload(Option.of(mutableDoc.toString()));
            HoodieRecord<HoodieRecordPayload> record = new HoodieJavaRecord<>(new HoodieKey(recordKey, partitionPath), payload);
            List<HoodieRecord<HoodieRecordPayload>> records = Collections.singletonList(record);
            
            writeClient.upsert(records, commitTime);
            LOG.info("Successfully upserted record with key {} to Hudi table {}", recordKey, tableName);
        } catch (Exception e) {
            LOG.error("Failed to write to Hudi, aborting commit {}", commitTime, e);
            writeClient.abort(commitTime);
            // Re-throw to signal failure to the Kafka consumer for potential retry.
            throw new RuntimeException("Hudi write failed", e);
        }
    }
    
    // In a real project, this schema should be loaded from an Avro schema file (.avsc)
    // or a schema registry for proper schema evolution management.
    private static final String APPLICATION_SCHEMA = "{\"type\":\"record\",\"name\":\"Application\",\"fields\":["
        + "{\"name\":\"applicationId\",\"type\":\"string\"},"
        + "{\"name\":\"applicantName\",\"type\":\"string\"},"
        + "{\"name\":\"country\",\"type\":\"string\"},"
        + "{\"name\":\"loanAmount\",\"type\":\"double\"},"
        + "{\"name\":\"status\",\"type\":\"string\"},"
        + "{\"name\":\"lastUpdatedAt\",\"type\":\"long\"}"
        + "]}";
}

The most critical configurations in HoodieWriteConfig are:

  • withRecordKeyField: This is the unique identifier for a record (applicationId). Hudi uses this for upserts.
  • withPartitionField: This determines the physical directory structure of the data (country). Queries filtering on this field will be significantly faster.
  • withPrecombineField: When two records with the same key arrive in a single commit, Hudi uses this field (lastUpdatedAt) to pick the latest one.

A major challenge when integrating Hudi’s Java client is its dependency on a Hadoop Configuration. This setup works for writing to a local filesystem, but for production systems writing to S3 or HDFS, the Hadoop configuration must be properly set up with filesystem implementations and credentials.

Limitations and Future Enhancements

While this architecture effectively solves the problem of stateless CDC replication, it introduces its own set of complexities. The state persistence in a separate MongoDB collection, while simple, could become a performance bottleneck under very high load. For massive throughput, a more specialized key-value store like Redis or even RocksDB managed by the service instances might be more appropriate.

Furthermore, the current implementation processes messages one by one, which is simple and robust but limits throughput. A batch-oriented approach where a micro-batch of Kafka messages is processed together would be more efficient. However, this dramatically complicates state management, as the batch might contain multiple events for the same entity that need to be processed in the correct order before a final state is determined and written to Hudi.

Finally, the resilience model relies on Kafka’s at-least-once delivery and our idempotent state persistence logic. If the Hudi write operation fails after the state has been persisted, a re-delivery of the Kafka message will attempt the Hudi write again. A more robust solution might involve a two-phase commit-style approach or writing to a temporary staging area before committing to the main Hudi table, though this adds significant operational overhead. The current design represents a pragmatic trade-off between architectural purity and implementation simplicity for many real-world use cases.


  TOC