The initial request was straightforward: a new analytics service required a near real-time feed of user profile changes from our primary monolithic application. The monolith’s database was a battle-hardened MySQL instance, the central source of truth. The most immediate, and ultimately flawed, idea was dual writes. The monolith would write to its own DB and then push a message to a queue for the analytics service. This was rejected almost immediately. In a real-world project, dual writes introduce unbearable consistency problems. An outage in the message queue or a network partition means the two systems silently drift apart, a nightmare for data integrity.
The next logical step was Change Data Capture (CDC). By tailing the MySQL binary log (binlog), we could capture every committed INSERT
, UPDATE
, and DELETE
operation as a stream of events. This decouples the source system from the consumer; the monolith remains blissfully unaware of who is listening to its changes. This approach is fundamentally more robust.
Our stack of choice was already established. The new services were to be built in Scala for its type safety and powerful concurrency libraries. The transport layer would be Apache Kafka. For the CDC component itself, Debezium was the obvious choice, being the industry standard for connecting databases to Kafka. The decision to house both the existing monolith’s relevant data models and the new Scala consumer service within a single SBT monorepo was made to enforce consistency and simplify cross-service changes. An atomic commit could update a database schema, the corresponding data model, and the consumer logic all at once. Or so we thought.
The initial setup was deceptively simple. We configured MySQL, spun up Kafka and Debezium via Docker Compose, and pointed a Debezium MySQL Connector at our user_profiles
table. The events flowed into a Kafka topic as JSON payloads. Our first-pass Scala consumer, built with Akka Streams Kafka (now Pekko Connectors), was trivial to write. It worked perfectly. For about a week.
Then came the first schema change. A product requirement added an optional secondary_email
field to the user profile. The monolith’s team added the column to the user_profiles
table, deployed their code, and everything seemed fine.
-- The seemingly innocent schema change
ALTER TABLE user_profiles ADD COLUMN secondary_email VARCHAR(255) NULL;
Our JSON-based consumer didn’t crash. Since the new field was nullable, it simply didn’t appear in the JSON for older records, and our deserialization logic (using a simple JSON library) handled the missing key gracefully. We dodged a bullet. The second time, we weren’t so lucky. A new feature required a non-nullable account_status
field with a default value.
-- The change that brought everything down
ALTER TABLE user_profiles ADD COLUMN account_status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE';
The monolith deployed. New users signed up. Debezium dutifully captured the INSERT
statements, now with the new account_status
field. It published a new schema structure within its JSON payload. Our consumer, hard-coded to expect the old structure, immediately hit a deserialization error upon receiving the first new message. The consumer process crashed, entered a crash loop, and the data pipeline halted entirely. The pitfall here is that relying on raw JSON for CDC events creates a brittle, implicit contract between the database schema and every consumer. This contract is not checked at compile time and breaks at the worst possible moment: in production.
This outage forced a fundamental rethink. Our pipeline needed to be not just aware of schema changes, but resilient to them. The solution was to introduce a proper schema management system. We chose the Confluent Schema Registry and switched our serialization format from JSON to Avro. Avro schemas are defined explicitly, support evolution rules (like adding or removing fields with defaults), and integrate seamlessly with the Kafka ecosystem. The Schema Registry acts as a centralized store for these schemas, versioning them as they evolve. Debezium would now publish compact Avro messages, and our Scala consumer would deserialize them by first fetching the correct schema version from the registry.
This shift required a significant refactoring of our project structure and code, but it was the only way to build a production-grade system.
The Monorepo and Infrastructure Foundation
The first step was structuring our SBT monorepo to support this new, schema-aware architecture. We needed three distinct sub-projects:
-
common
: A project to hold the Avro schema definitions (.avsc
files) and the auto-generated Scala case classes. This would be a shared dependency. -
producer-app
: A mock application simulating our monolith, responsible for database migrations and writing to MySQL. -
consumer-app
: The core Scala CDC consumer that reads from Kafka and processes the events.
Here is the top-level build.sbt
file that wires everything together.
// build.sbt
import sbt.Keys.libraryDependencies
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.12"
lazy val pekkoVersion = "1.0.2"
lazy val pekkoConnectorsVersion = "1.0.1"
lazy val avroVersion = "1.11.3"
lazy val confluentVersion = "7.5.0"
lazy val commonSettings = Seq(
organization := "com.example.cdc",
scalacOptions ++= Seq(
"-deprecation",
"-feature",
"-unchecked",
"-language:implicitConversions",
"-language:higherKinds"
)
)
// Project for shared Avro schemas and generated classes
lazy val common = (project in file("common"))
.settings(
commonSettings,
name := "cdc-common",
// sbt-avrohugger plugin automatically generates Scala case classes from .avsc files
Compile / avroSource := file("common/src/main/avro"),
Compile / avroScalaCustomTypes := Map(
"timestamp-millis" -> "java.time.Instant"
),
libraryDependencies ++= Seq(
"org.apache.avro" % "avro" % avroVersion,
"ch.qos.logback" % "logback-classic" % "1.4.11"
)
)
// Project simulating the monolith writing to MySQL
lazy val producerApp = (project in file("producer-app"))
.dependsOn(common)
.settings(
commonSettings,
name := "cdc-producer-app",
libraryDependencies ++= Seq(
"mysql" % "mysql-connector-java" % "8.0.33",
"org.flywaydb" % "flyway-core" % "9.22.3",
"org.flywaydb" % "flyway-mysql" % "9.22.3",
"com.zaxxer" % "HikariCP" % "5.0.1",
"org.slf4j" % "slf4j-api" % "2.0.9"
)
)
// Project for our schema-aware Scala consumer
lazy val consumerApp = (project in file("consumer-app"))
.dependsOn(common)
.settings(
commonSettings,
name := "cdc-consumer-app",
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-stream" % pekkoVersion,
"org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
"org.apache.pekko" %% "pekko-stream-kafka" % pekkoConnectorsVersion,
"io.confluent" % "kafka-avro-serializer" % confluentVersion,
// Test dependencies
"org.apache.pekko" %% "pekko-stream-testkit" % pekkoVersion % Test,
"net.manub" %% "scalatest-embedded-kafka" % "3.0.0" % Test,
"org.scalatest" %% "scalatest" % "3.2.17" % Test
)
)
lazy val root = (project in file("."))
.aggregate(common, producerApp, consumerApp)
.settings(commonSettings)
The core infrastructure is defined in a docker-compose.yml
file. This is not a toy setup; it includes Kafka Connect and the Schema Registry, which are critical for the final solution.
# docker-compose.yml
version: '3.8'
services:
mysql:
image: debezium/example-mysql:2.4
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
ports:
- "2181:2181"
environment:
- ZOOKEEPER_CLIENT_PORT=2181
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
- KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
depends_on:
- zookeeper
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
ports:
- "8081:8081"
environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:29092
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
depends_on:
- kafka
connect:
image: debezium/connect:2.4
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=kafka:29092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
# Use AvroConverter and point to the schema registry
- KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
- VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
depends_on:
- kafka
- schema-registry
With the infrastructure defined, we register the Debezium connector. The configuration now crucially specifies AvroConverter
.
// debezium-connector-config.json
{
"name": "user-profile-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "debezium",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"table.include.list": "inventory.user_profiles",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "schema-changes.inventory",
"include.schema.changes": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"decimal.handling.mode": "double",
"topic.prefix": "dbserver1"
}
}
To register it, we run: curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @debezium-connector-config.json
.
Evolving Schemas and the Resilient Consumer
The core of the solution lies in the common
project. We define our initial Avro schema for the user_profiles
table.
// common/src/main/avro/UserProfileV1.avsc
{
"type": "record",
"name": "UserProfile",
"namespace": "com.example.cdc.common.model",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"},
{"name": "email", "type": "string"},
{
"name": "created_at",
"type": {"type": "long", "logicalType": "timestamp-millis"}
}
]
}
The sbt-avrohugger
plugin automatically generates UserProfile.scala
from this file during compilation.
Now, we build the consumer. It uses Pekko Connectors Kafka (pekko-stream-kafka
) and Confluent’s KafkaAvroDeserializer
. The configuration for the deserializer is critical; it must point to the Schema Registry URL so it can fetch the correct schema for each message.
// consumer-app/src/main/scala/com/example/cdc/consumer/Main.scala
package com.example.cdc.consumer
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.kafka.scaladsl.Consumer
import org.apache.pekko.kafka.{ConsumerSettings, Subscriptions}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
object Main extends App {
implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "CdcConsumer")
implicit val ec: ExecutionContext = system.executionContext
private val logger = LoggerFactory.getLogger(getClass)
private val kafkaBootstrapServers = "localhost:9092"
private val schemaRegistryUrl = "http://localhost:8081"
private val topic = "dbserver1.inventory.user_profiles"
private val groupId = "user-profile-consumer-group"
// Configuration for the KafkaAvroDeserializer
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new KafkaAvroDeserializer)
.withBootstrapServers(kafkaBootstrapServers)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty("schema.registry.url", schemaRegistryUrl)
// Important for Avro deserialization of Debezium messages
.withProperty("specific.avro.reader", "false")
val stream = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
.map { record =>
// The value is now a GenericRecord, an Avro-native representation
val value = record.value().asInstanceOf[GenericRecord]
// Debezium wraps the actual data in a payload structure.
// We need to extract the 'after' state for INSERTs and UPDATEs.
// A proper implementation would also handle DELETEs ('before' state) and other op types.
val operation = value.get("op").toString
val payload = operation match {
case "c" | "u" => value.get("after").asInstanceOf[GenericRecord] // create or update
case "d" => value.get("before").asInstanceOf[GenericRecord] // delete
case _ => value // Should not happen for standard CDC
}
(operation, payload)
}
.map { case (op, payload) =>
// Now we can safely access fields by name
val id = payload.get("id")
val username = payload.get("username")
val email = payload.get("email")
// This is where the resilience comes in. We check for the new field's existence.
// Avro's GenericRecord allows for this dynamic, yet safe, access.
val secondaryEmail = if (payload.getSchema.getField("secondary_email") != null) {
Option(payload.get("secondary_email")).map(_.toString)
} else {
None
}
s"Operation: $op, ID: $id, Username: $username, Email: $email, SecondaryEmail: ${secondaryEmail.getOrElse("N/A")}"
}
.runForeach(result => logger.info(s"Processed event: $result"))
logger.info("CDC Consumer started...")
stream.onComplete {
case scala.util.Success(_) =>
logger.info("Stream completed successfully.")
system.terminate()
case scala.util.Failure(e) =>
logger.error("Stream failed.", e)
system.terminate()
}
}
A key detail is specific.avro.reader
set to false
. This tells the deserializer to give us a GenericRecord
, which is like a map, rather than trying to cast it to a specific generated class. This is crucial for handling schema evolution. When a new field like secondary_email
is added, we can check for its presence on the GenericRecord
‘s schema (payload.getSchema.getField(...)
) before attempting to access it. Our consumer no longer crashes; it adapts.
Let’s simulate the schema evolution. First, our mock producer inserts a V1 record.
// In a mock producer application
// ... database connection setup ...
val stmt = conn.prepareStatement("INSERT INTO user_profiles (id, username, email, created_at) VALUES (?, ?, ?, ?)")
stmt.setLong(1, 101)
stmt.setString(2, "john.doe")
stmt.setString(3, "[email protected]")
stmt.setTimestamp(4, new java.sql.Timestamp(System.currentTimeMillis()))
stmt.executeUpdate()
The consumer logs:Processed event: Operation: c, ID: 101, Username: john.doe, Email: [email protected], SecondaryEmail: N/A
Now, we apply the schema migration and update our Avro schema file.
-- db/migration/V2__add_secondary_email.sql
ALTER TABLE user_profiles ADD COLUMN secondary_email VARCHAR(255) NULL;
// common/src/main/avro/UserProfileV2.avsc
{
"type": "record",
"name": "UserProfile",
"namespace": "com.example.cdc.common.model",
"fields": [
{"name": "id", "type": "long"},
{"name": "username", "type": "string"},
{"name": "email", "type": "string"},
{
"name": "created_at",
"type": {"type": "long", "logicalType": "timestamp-millis"}
},
// New optional field added
{"name": "secondary_email", "type": ["null", "string"], "default": null}
]
}
Debezium detects this change and registers a new version of the schema in the Schema Registry. Then, our producer inserts a V2 record.
// In mock producer
val stmt = conn.prepareStatement("INSERT INTO user_profiles (id, username, email, secondary_email, created_at) VALUES (?, ?, ?, ?, ?)")
stmt.setLong(1, 102)
stmt.setString(2, "jane.doe")
stmt.setString(3, "[email protected]")
stmt.setString(4, "[email protected]")
stmt.setTimestamp(5, new java.sql.Timestamp(System.currentTimeMillis()))
stmt.executeUpdate()
The consumer, without any code change or restart, correctly processes the new message format.
Processed event: Operation: c, ID: 102, Username: jane.doe, Email: [email protected], SecondaryEmail: [email protected]
This architecture has solved the schema drift problem.
graph TD A[Monolith/Producer App] -- Writes to --> B(MySQL Database); B -- Binlog --> C{Debezium Connector}; C -- 1. Reads Schema --> B; C -- 2. Registers Avro Schema --> D(Schema Registry); C -- 3. Publishes Avro --> E(Kafka Topic); F[Scala Consumer App] -- 1. Consumes Avro --> E; F -- 2. Fetches Schema by ID --> D; F -- 3. Deserializes & Processes --> G[Analytics Service/Sink];
Addressing Delivery Guarantees and Testing
The current consumer uses auto-commit, which provides at-most-once semantics. In a real-world project, this is often insufficient. If the consumer crashes after processing a batch but before the offsets are committed, that data is lost upon restart. We need at-least-once semantics. This is achieved by taking manual control of offset commits.
// consumer-app/src/main/scala/com/example/cdc/consumer/ReliableConsumer.scala (excerpt)
import org.apache.pekko.kafka.scaladsl.Consumer.Control
import org.apache.pekko.kafka.{CommitterSettings, Subscriptions}
import org.apache.pekko.kafka.scaladsl.Committer
// ...
val committerSettings = CommitterSettings(system)
val stream: Control =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1) { msg =>
// The processing logic is now inside a Future
val processingFuture = Future {
val value = msg.record.value().asInstanceOf[GenericRecord]
val payload = value.get("after").asInstanceOf[GenericRecord]
val id = payload.get("id")
logger.info(s"Processing record with ID: $id")
// Potentially write to a database or call another service here
// This is the "work"
}
// After the work is done, we pass the committable offset along
processingFuture.map(_ => msg.committableOffset)
}
// Commit offsets in batches for efficiency
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
This pattern ensures that we only commit an offset to Kafka after our processing logic for that message has successfully completed. If the application crashes mid-process, it will restart from the last committed offset and re-process the message, guaranteeing at-least-once delivery. True exactly-once semantics would require the downstream sink to be idempotent (e.g., using INSERT ... ON DUPLICATE KEY UPDATE
if writing to another database).
Finally, a robust system requires testing. Using a library like scalatest-embedded-kafka
, we can write integration tests for our consumer logic without needing a full Docker environment.
// consumer-app/src/test/scala/com/example/cdc/consumer/ConsumerSpec.scala
import net.manub.scalatest.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.matchers.should.Matchers
// ... other imports
class ConsumerSpec extends AnyWordSpec with Matchers with EmbeddedKafka {
implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
kafkaPort = 9093,
zooKeeperPort = 2182
)
"UserProfile Consumer" should {
"process a valid Avro message from Kafka" in {
// Setup: This would involve a mock Schema Registry or pre-registering schemas
// For simplicity, we can use a simpler deserializer for the test if schema registry is complex to mock
// A more advanced test would use a test container for the schema registry
val topic = "dbserver1.inventory.user_profiles.test"
// We'd create a sample Avro GenericRecord here and serialize it using KafkaAvroSerializer
val sampleMessage = "..." // Serialized Avro payload
withRunningKafka {
publishStringMessageToKafka(topic, sampleMessage)
// Run the consumer stream against the embedded kafka instance
// And assert that the expected side-effects (e.g., logging, database writes) occur
// This part is highly dependent on the consumer's sink logic
}
}
}
}
A common mistake is neglecting tests for data pipelines. While mocking the full Debezium-to-Kafka flow is complex, testing the consumer’s deserialization and business logic against a local embedded Kafka is essential for maintainability.
The final architecture, centered around a schema registry and managed within a monorepo, provides a solid foundation for evolving our systems. The initial pain of the production outage forced us to build a more principled, resilient solution. The operational cost of managing a Schema Registry is not zero, and it adds a point of failure that must be monitored. Furthermore, this solution gracefully handles additive schema changes (new optional columns) but requires a more deliberate, multi-step deployment strategy for destructive changes like renaming or deleting columns to avoid breaking consumers that haven’t yet deployed the new code. The next iteration would involve building automated guardrails in our CI/CD pipeline to detect breaking schema changes within the monorepo before they are ever applied to the database, creating a truly unified and safe development lifecycle.