Leveraging MySQL Change Data Capture with a Kotlin Service to Trigger Real-Time Gatsby Incremental Builds


The operational bottleneck for our content platform, managing over 50,000 statically generated pages with Gatsby, was the build time. Sourcing data from a central MySQL database, a full production build consistently clocked in at over fifteen minutes. For our editorial team, this delay between hitting “save” and seeing the content live was an unacceptable friction point, crippling the promise of a nimble, modern content workflow. The initial, naive approach of a Kotlin backend service polling the database for last_updated_at timestamps every few seconds was a non-starter. It was resource-intensive, failed to capture deletions gracefully, and placed a constant, unnecessary load on our primary database. We needed a reactive, event-driven architecture, not a brute-force polling mechanism.

Our first real architectural pivot was towards Change Data Capture (CDC). Instead of asking the database “what’s new?” repeatedly, CDC allows us to have the database tell us precisely what changed, the moment it changed. This is a complete paradigm shift. The core idea is to tap into MySQL’s binary log (binlog), a low-level, ordered record of every data modification. This log, designed for replication and point-in-time recovery, becomes our real-time event stream.

We deliberately chose not to implement this with a full-blown Kafka and Kafka Connect deployment. While powerful, that felt like using a sledgehammer to crack a nut for this specific problem. It would introduce significant operational overhead with Zookeeper, Kafka brokers, and Connect workers. The breakthrough was discovering the Debezium embedded engine, which allows us to run the Debezium MySQL connector directly within our Kotlin application’s process. It’s a library, not a separate service. This gave us the power of Debezium’s reliable CDC processing without the infrastructural complexity, a perfect middle ground. Kotlin, with its powerful coroutines for asynchronous programming and strong JVM ecosystem, was the ideal language to host this engine and orchestrate the downstream logic.

Phase 1: Database Preparation for CDC

Before writing a single line of Kotlin, MySQL must be configured to produce the data Debezium needs. This involves enabling the binary log and setting its format to ROW. The ROW format is critical; it logs the actual changed row data, unlike STATEMENT (which logs the SQL statement) or MIXED. Debezium requires ROW to construct detailed change events.

Here are the essential parameters for the MySQL configuration file (my.cnf or mysqld.cnf):

# /etc/mysql/my.cnf

[mysqld]
# A unique ID for the server in a replication topology. Must be > 0.
server-id                = 223344
# Enable the binary log.
log_bin                  = mysql-bin
# Set binlog format to ROW. This is mandatory for Debezium.
binlog_format            = ROW
# How many days of binlog files to keep before purging. Adjust based on disk space
# and recovery needs. (On MySQL 8.0+, prefer binlog_expire_logs_seconds instead.)
expire_logs_days         = 10
# Recommended for consistency.
binlog_row_image         = FULL
# GTID (Global Transaction Identifier) mode is not strictly required by Debezium
# but is a best practice for modern replication setups.
gtid_mode                = ON
enforce_gtid_consistency = ON

After applying these settings and restarting the MySQL server, we need a dedicated user for Debezium with the appropriate replication permissions. Running as root in production is a major security anti-pattern.

-- Create a dedicated user for the CDC service.
CREATE USER 'debezium'@'%' IDENTIFIED BY 'your-secure-password';

-- Grant the necessary privileges for replication.
-- REPLICATION SLAVE: Allows reading the binary log.
-- REPLICATION CLIENT: Allows showing master status and binlog files.
-- SELECT: Allows Debezium to get an initial consistent snapshot of the data.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';

-- It's good practice to lock tables during the initial snapshot to ensure consistency.
-- This requires an additional privilege.
GRANT LOCK TABLES ON your_database.* TO 'debezium'@'%';

FLUSH PRIVILEGES;

With the database primed, the foundation is set. MySQL will now meticulously record every change, waiting for our service to listen.

Phase 2: The Kotlin CDC Service with Embedded Debezium

This service is the heart of the new architecture. It’s a lightweight, standalone Kotlin application whose sole purpose is to listen to MySQL and trigger a Gatsby build when relevant changes occur.

First, the project dependencies in build.gradle.kts. We need the Debezium embedded engine, the MySQL connector, and a logging framework. We’ll also add Ktor client to make HTTP requests to our build server.

// build.gradle.kts
plugins {
    kotlin("jvm") version "1.9.20"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    // Debezium Core
    implementation("io.debezium:debezium-api:2.4.1.Final")
    implementation("io.debezium:debezium-embedded:2.4.1.Final")
    implementation("io.debezium:debezium-connector-mysql:2.4.1.Final")

    // Kotlin Coroutines for async operations
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")

    // Ktor Client for HTTP requests
    implementation("io.ktor:ktor-client-core:2.3.6")
    implementation("io.ktor:ktor-client-cio:2.3.6") // CIO engine
    implementation("io.ktor:ktor-client-content-negotiation:2.3.6")
    implementation("io.ktor:ktor-serialization-kotlinx-json:2.3.6")

    // Logging
    implementation("ch.qos.logback:logback-classic:1.4.11")
    implementation("org.slf4j:slf4j-api:2.0.9")
}

application {
    mainClass.set("com.mycompany.cdc.CdcApplicationKt")
}

Next, we define the configuration for the Debezium engine. This is normally done via properties files, but for clarity and type safety in Kotlin, we can create a function that builds the io.debezium.config.Configuration object directly.

// src/main/kotlin/com/mycompany/cdc/DebeziumConfig.kt
package com.mycompany.cdc

import io.debezium.config.Configuration
import java.io.File

fun getDebeziumConfig(): Configuration {
    val offsetStorageFile = File("cdc-storage/offsets.dat")
    val dbHistoryFile = File("cdc-storage/dbhistory.dat")

    // Ensure storage directories exist
    offsetStorageFile.parentFile.mkdirs()
    dbHistoryFile.parentFile.mkdirs()

    return Configuration.create()
        /*
         * Unique name for the connector. This is critical as it scopes the persisted offsets.
         * If you change this name, Debezium will start from the beginning.
         */
        .with("name", "gatsby-content-connector")
        
        // The Java class for the MySQL connector
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        
        /*
         * Offset storage. For the embedded engine, a simple file-based storage is sufficient.
         * In a clustered environment, you'd use KafkaOffsetBackingStore or similar.
         */
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", offsetStorageFile.absolutePath)
        .with("offset.flush.interval.ms", "5000") // Flush offsets every 5 seconds
        
        /*
         * Schema history. This is where Debezium stores the DDL changes it has observed.
         * It's mandatory for the MySQL connector. In Debezium 2.x these properties are
         * named schema.history.internal (formerly database.history).
         */
        .with("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")
        .with("schema.history.internal.file.filename", dbHistoryFile.absolutePath)
        
        // -- Database connection properties --
        .with("database.hostname", "your-mysql-host")
        .with("database.port", "3306")
        .with("database.user", "debezium")
        .with("database.password", "your-secure-password")
        .with("database.server.id", "85744") // Must be unique and not conflict with MySQL's server-id
        .with("database.server.name", "gatsby_content_db") // A logical name for the source database server
        
        // -- What to capture --
        // A whitelist of databases. Highly recommended for production.
        .with("database.include.list", "your_database")
        // Optionally, a whitelist of tables within those databases.
        .with("table.include.list", "your_database.articles,your_database.authors")
        
        // -- Snapshot configuration --
        // 'initial' mode takes a consistent snapshot of the tables upon first startup.
        .with("snapshot.mode", "initial")
        .build()
}
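
One safeguard worth adding alongside this configuration is a startup check that the database really is producing ROW-format binlog events; a server silently left in STATEMENT mode will only surface as confusing connector failures later. The sketch below is optional and illustrative rather than part of the service proper: it queries the relevant variables over plain JDBC and assumes a MySQL driver (for example com.mysql:mysql-connector-j) has been added to the dependencies above.

// src/main/kotlin/com/mycompany/cdc/BinlogPreflight.kt (illustrative sketch)
package com.mycompany.cdc

import java.sql.DriverManager

/**
 * Fails fast if the MySQL server is not configured the way Debezium expects.
 * The JDBC URL, credentials, and the strictness of these checks are assumptions.
 */
fun verifyBinlogConfig(jdbcUrl: String, user: String, password: String) {
    DriverManager.getConnection(jdbcUrl, user, password).use { conn ->
        conn.createStatement().use { stmt ->
            val rs = stmt.executeQuery(
                "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image')"
            )
            val vars = mutableMapOf<String, String>()
            while (rs.next()) {
                vars[rs.getString("Variable_name")] = rs.getString("Value")
            }
            check(vars["log_bin"] == "ON") { "Binary logging is disabled (log_bin=${vars["log_bin"]})" }
            check(vars["binlog_format"] == "ROW") { "binlog_format must be ROW, was ${vars["binlog_format"]}" }
            check(vars["binlog_row_image"] == "FULL") { "binlog_row_image should be FULL, was ${vars["binlog_row_image"]}" }
        }
    }
}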

The core of the application is the CdcEngine class, which encapsulates the Debezium engine’s lifecycle and event handling. We use Kotlin coroutines to manage the asynchronous nature of the engine.

// src/main/kotlin/com/mycompany/cdc/CdcEngine.kt
package com.mycompany.cdc

import io.debezium.engine.ChangeEvent
import io.debezium.engine.DebeziumEngine
import io.debezium.engine.format.Json
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.util.concurrent.Executors

class CdcEngine : Closeable {
    private val logger = LoggerFactory.getLogger(javaClass)

    // Using a single-threaded executor for the Debezium engine itself.
    private val executor = Executors.newSingleThreadExecutor()
    
    // A coroutine scope for managing background tasks.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // A communication channel to pass change events from Debezium's callback to a coroutine.
    private val eventChannel = Channel<String>(Channel.BUFFERED)

    private val engine: DebeziumEngine<ChangeEvent<String, String>> = DebeziumEngine.create(Json::class.java)
        .using(getDebeziumConfig().asProperties())
        .notifying { record ->
            // This callback runs on the Debezium engine's own thread.
            // It's crucial not to block it, so we hand the payload straight off to a channel.
            record.value()?.let {
                // trySend never suspends or blocks; if the buffered channel is full it
                // fails fast, and we log the dropped event instead of stalling Debezium.
                if (!eventChannel.trySend(it).isSuccess) {
                    logger.warn("Event channel is full; dropping change event")
                }
            }
        }
        .using(DebeziumEngine.CompletionCallback { success, message, error ->
            // Invoked when the engine stops, either cleanly or because of a failure.
            if (!success || error != null) {
                logger.error("Debezium engine stopped. Message: $message", error)
                // Here you might implement a restart strategy.
                scope.cancel()
            } else {
                logger.info("Debezium engine stopped cleanly: $message")
            }
        })
        .build()

    fun start() {
        // Run the Debezium engine in its own thread.
        executor.execute(engine)
        
        // Launch a coroutine to process events from the channel.
        scope.launch {
            processEvents()
        }
        logger.info("CDC Engine started and listening for changes...")
    }

    private suspend fun processEvents() {
        // The build trigger needs to be smart to avoid overwhelming the build system.
        // We'll batch changes and trigger a build at most once every 10 seconds.
        val buildTrigger = BuildTrigger()
        
        try {
            for (event in eventChannel) {
                logger.debug("Received raw event: $event")
                // A simple notification that *something* changed is enough for now.
                // In a more advanced setup, you could parse the event to decide
                // if a build is even necessary.
                buildTrigger.notifyChange()
            }
        } catch (e: CancellationException) {
            logger.info("Event processing coroutine cancelled.")
        } catch (e: Exception) {
            logger.error("Error in event processing loop", e)
        } finally {
            buildTrigger.close()
        }
    }

    override fun close() {
        logger.info("Closing CDC Engine...")
        engine.close()
        executor.shutdown()
        scope.cancel()
        eventChannel.close()
    }
}

fun main() {
    val cdcEngine = CdcEngine()
    cdcEngine.start()

    // The engine's non-daemon executor thread keeps the JVM alive; in a real app,
    // this would be hosted inside a server framework. A shutdown hook ensures a clean stop.
    Runtime.getRuntime().addShutdownHook(Thread {
        cdcEngine.close()
    })
}

The coolest part of this setup is the decoupling between the Debezium thread and our application logic. The notifying lambda is on a hot path; blocking it can disrupt the entire CDC pipeline. By immediately handing off the event payload to a kotlinx.coroutines.channels.Channel, we free up the Debezium thread instantly and process the event asynchronously in a separate coroutine.
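
Because that handoff is so cheap, the processing coroutine is also the natural place to act on the hint in processEvents and inspect the payload before deciding whether a change is even build-relevant. The sketch below is illustrative rather than part of the service above: it assumes Debezium's default JSON envelope, where the change data sits under payload (with op and source.table), and a hypothetical isRelevantChange helper that the loop could call before buildTrigger.notifyChange(). The kotlinx-serialization-json library is already on the classpath via the Ktor JSON dependency.

// Illustrative helper, assuming the default Debezium JSON envelope.
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive

private val eventJson = Json { ignoreUnknownKeys = true }

fun isRelevantChange(rawEvent: String): Boolean {
    // With schemas enabled (the default), the interesting fields live under "payload".
    val payload = eventJson.parseToJsonElement(rawEvent).jsonObject["payload"]?.jsonObject ?: return false
    val table = payload["source"]?.jsonObject?.get("table")?.jsonPrimitive?.content
    val op = payload["op"]?.jsonPrimitive?.content // "c" = create, "u" = update, "d" = delete
    // Only content tables should schedule a build; everything else is noise.
    return table in setOf("articles", "authors") && op in setOf("c", "u", "d")
}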

Phase 3: Triggering Gatsby Builds Intelligently

Simply calling the build webhook on every single database change would be disastrous. If an editor saves a document 10 times in a minute, we would queue 10 separate builds. This would be even worse than the original problem. We need a debouncing or batching mechanism.

The BuildTrigger class handles this logic. It uses coroutines to collect change notifications and triggers a single build after a period of inactivity.

// src/main/kotlin/com/mycompany/cdc/BuildTrigger.kt
package com.mycompany.cdc

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.serialization.kotlinx.json.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import org.slf4j.LoggerFactory
import java.io.Closeable

class BuildTrigger(
    private val debounceIntervalMillis: Long = 10000 // 10 seconds
) : Closeable {
    private val logger = LoggerFactory.getLogger(javaClass)
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    private val triggerChannel = Channel<Unit>(Channel.CONFLATED)

    // A mutable state to track if a build is pending.
    @Volatile
    private var buildPending = false

    private val httpClient = HttpClient(CIO) {
        install(ContentNegotiation) {
            json()
        }
    }

    private val gatsbyBuildWebhookUrl = "https://api.gatsbyjs.com/v1/builds/YOUR_BUILD_WEBHOOK_ID"

    init {
        scope.launch {
            debounceAndTrigger()
        }
    }

    // This function is cheap to call. It just sends a signal.
    fun notifyChange() {
        if (!buildPending) {
            logger.info("Change detected. Scheduling a build.")
            buildPending = true
            triggerChannel.trySend(Unit)
        } else {
            logger.debug("Change detected, but a build is already pending. Ignoring.")
        }
    }

    private suspend fun debounceAndTrigger() {
        for (signal in triggerChannel) {
            // Wait for the debounce interval. The conflated channel ensures that
            // multiple signals during this period result in only one trigger.
            delay(debounceIntervalMillis)
            
            try {
                logger.info("Debounce period ended. Triggering Gatsby build NOW.")
                val response = httpClient.post(gatsbyBuildWebhookUrl) {
                    contentType(ContentType.Application.Json)
                }

                if (response.status.isSuccess()) {
                    logger.info("Successfully triggered Gatsby build. Status: ${response.status}")
                } else {
                    logger.error("Failed to trigger Gatsby build. Status: ${response.status}, Body: ${response.status.description}")
                    // You might want a retry mechanism here.
                }

            } catch (e: Exception) {
                logger.error("Exception occurred while triggering build", e)
            } finally {
                // Reset the state to allow the next change to schedule a new build.
                buildPending = false
                logger.info("Ready to accept new change notifications.")
            }
        }
    }

    override fun close() {
        scope.cancel()
        httpClient.close()
        triggerChannel.close()
    }
}

This implementation holds up well under bursts of changes. The buildPending flag makes repeated notifyChange() calls no-ops while a build is already scheduled, and the CONFLATED channel holds at most one pending signal even if several slip through. The debounceAndTrigger coroutine consumes that single signal, waits 10 seconds, and only then makes the HTTP call, so a flurry of edits results in one build rather than build spam.
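
A quick, throwaway way to convince yourself of this is to exercise BuildTrigger in isolation, for example from a scratch main function with a shortened debounce window; a burst of ten notifications should produce exactly one attempted webhook call (the placeholder URL will of course reject the test request, which the error handling logs).

// Illustrative scratch test, not part of the service itself.
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val trigger = BuildTrigger(debounceIntervalMillis = 2_000)
    repeat(10) {
        trigger.notifyChange() // simulate an editor hammering "save"
        delay(100)
    }
    delay(5_000) // leave time for the single debounced trigger to fire
    trigger.close()
}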

The final architecture can be visualized as follows:

graph TD
    A[MySQL Database] -- Binlog Events --> B{Kotlin CDC Service};
    subgraph B
        direction LR
        B1[Debezium Embedded Engine] -- Raw ChangeEvent --> B2{Event Channel};
        B2 -- Event --> B3{Processing Coroutine};
        B3 -- notifyChange() --> B4[BuildTrigger];
    end
    B4 -- Debounced HTTP POST --> C[Gatsby Cloud Build Webhook];
    C -- Triggers Incremental Build --> D[Live Gatsby Site];
    E[Content Editor] -- Saves Content --> A;
    D -- Served to --> F[User];

This entire flow, from a database UPDATE statement to a live site update, now completes in under 90 seconds on average, with 10 seconds of that being our deliberate debounce window. The vast majority of the time is Gatsby’s own incremental build process. This is a game-changer for our content velocity. We moved from a frustrating 15-minute polling-based cycle to a sub-2-minute event-driven reality.

The current solution, while transformative, is not without its limitations. The Debezium embedded engine, running inside our single Kotlin application instance, represents a single point of failure. If this service crashes, no updates will be processed until it is manually restarted. The file-based offset storage also means it’s not trivial to run multiple instances for high availability.

For a future, more resilient iteration, the logical step is to graduate from the embedded engine to a full Debezium connector managed by a Kafka Connect cluster. This would involve introducing a Kafka topic as a durable, persistent log for all database change events. Our Kotlin service would then transform from a CDC engine host into a simpler Kafka consumer. This would decouple the event capture from event processing, allowing each to be scaled and managed independently, providing the fault tolerance required for a true mission-critical system. Furthermore, the current webhook trigger is still a blunt instrument. A more advanced implementation could parse the change event payload in the Kotlin service, gather a batch of changed entity IDs (e.g., article-123, author-45), and pass this data in the webhook payload. The build process could then use this information to be even more surgical in what pages it regenerates, pushing us ever closer to instantaneous updates.
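
As a rough sketch of that last idea, and reusing the same payload structure as the earlier isRelevantChange example, the processing loop could extract an identifier such as articles-123 from each event and accumulate a batch for the webhook body. The id column name and the webhook's payload contract are assumptions here, not something the current Gatsby webhook defines.

// Illustrative helper for a future, more surgical trigger.
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive

private val eventJson = Json { ignoreUnknownKeys = true }

// Turns a Debezium change event into something like "articles-123", or null if it can't.
fun extractEntityId(rawEvent: String): String? {
    val payload = eventJson.parseToJsonElement(rawEvent).jsonObject["payload"]?.jsonObject ?: return null
    val table = payload["source"]?.jsonObject?.get("table")?.jsonPrimitive?.content ?: return null
    // Deletes carry the row in "before"; inserts and updates carry it in "after".
    val row = listOf("after", "before").firstNotNullOfOrNull { payload[it] as? JsonObject } ?: return null
    val id = row["id"]?.jsonPrimitive?.content ?: return null
    return "$table-$id"
}

// BuildTrigger could then collect these IDs between builds and send them in the
// debounced POST body, e.g. setBody(mapOf("changedIds" to pendingIds.toList())),
// with the ContentNegotiation plugin already installed on the client.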

