Constructing a CDC-Based Vector Synchronization Pipeline from MariaDB to Weaviate


Our primary product catalog, a critical asset, resides in a well-structured MariaDB instance. While excellent for transactional integrity and structured queries, it’s completely inept at handling semantic search. A user searching for “summer outdoor party wear” gets zero results if the product is named “Men’s Linen Beach Shirt”. The business requirement was clear: enable powerful, meaning-based search on our catalog. The initial attempt, a nightly batch job that read the entire product table, generated embeddings, and bulk-loaded them into Weaviate, was a predictable failure. The data was stale by up to 24 hours, and the process was resource-intensive and error-prone. The search index was perpetually out of sync with our inventory and pricing reality, leading to customer frustration.

The core of the problem was the lack of real-time synchronization. This pointed directly toward a Change Data Capture (CDC) architecture. The concept was to treat database changes as a stream of events. Instead of querying the database, we listen to it. The initial architectural sketch was straightforward: MariaDB’s binary log would be tailed by Debezium, which would publish change events to Kafka. A Python consumer would then process these events, generate vector embeddings for product descriptions, and update a Weaviate vector index in near real-time. This approach promised low latency and minimal impact on the source MariaDB instance.

The technology stack selection was driven by production pragmatism. MariaDB was the immovable incumbent. Weaviate was selected for its mature Python client, hybrid search capabilities, and straightforward deployment via Docker. For the CDC backbone, the Debezium and Kafka combination is the de facto industry standard, offering reliability and scalability. Python was the natural choice for the consumer due to its rich ecosystem for machine learning, specifically the sentence-transformers library for high-quality embedding generation. We planned to use a Jupyter notebook for the initial development and debugging phase to rapidly iterate on the consumer logic before hardening it into a deployable service.

The first step was establishing the infrastructure. A docker-compose.yml file is the most effective way to manage this multi-container environment for development. It ensures all services—from the source database to the vector index—are configured correctly and can communicate.

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  mariadb:
    image: mariadb:10.6
    container_name: mariadb
    restart: unless-stopped
    environment:
      MARIADB_ROOT_PASSWORD: rootpassword
      MARIADB_DATABASE: catalog
      MARIADB_USER: user
      MARIADB_PASSWORD: password
    ports:
      - "3306:3306"
    volumes:
      - ./mariadb-conf:/etc/mysql/conf.d
    command:
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_unicode_ci

  debezium:
    image: debezium/connect:2.1
    container_name: debezium
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - mariadb
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses

  weaviate:
    image: semitechnologies/weaviate:1.22.3
    container_name: weaviate
    ports:
      - "8080:8080"
    restart: on-failure:0
    environment:
      QUERY_DEFAULTS_LIMIT: 25
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'none'
      ENABLE_MODULES: ''
      CLUSTER_HOSTNAME: 'node1'

  memcached:
    image: memcached:1.6.18-alpine
    container_name: memcached
    ports:
      - "11211:11211"
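
After `docker compose up -d`, it helps to confirm that each published port accepts connections before moving on. The following is a minimal sketch using only the standard library; the helper names `port_is_open` and `wait_for_services` are hypothetical, and the ports are the host ports from the compose file above. An open TCP port does not prove a service has finished starting, so treat this as a first sanity check rather than a health check.

```python
import socket
import time

# Host ports published by the compose file above.
SERVICES = {
    "kafka": ("localhost", 9092),
    "mariadb": ("localhost", 3306),
    "debezium": ("localhost", 8083),
    "weaviate": ("localhost", 8080),
    "memcached": ("localhost", 11211),
}


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_for_services(services: dict, attempts: int = 30, delay: float = 2.0) -> list:
    """Poll until every service accepts connections; return the names still down."""
    pending = dict(services)
    for _ in range(attempts):
        pending = {name: addr for name, addr in pending.items() if not port_is_open(*addr)}
        if not pending:
            break
        time.sleep(delay)
    return sorted(pending)
```

Calling `wait_for_services(SERVICES)` returns an empty list once everything is reachable, or the names of the services that never came up.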

A critical detail for MariaDB is enabling the binary log (binlog), which is Debezium’s data source. This is done via a configuration file mounted into the container.

# ./mariadb-conf/custom.cnf
[mysqld]
server-id=223344
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
expire_logs_days=10
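
A typo in this file tends to surface only later, as a confusing Debezium error. As a rough sketch, the file can be linted before the container starts. The function name `check_binlog_settings` is hypothetical, and the check is deliberately strict: it expects the settings to be written out explicitly, as they are in the file above.

```python
import configparser

# Values taken from the custom.cnf above; '-' and '_' are normalised before comparing.
EXPECTED = {"binlog_format": "ROW", "binlog_row_image": "FULL"}
MUST_BE_PRESENT = ("server_id", "log_bin")


def check_binlog_settings(cnf_text: str) -> list:
    """Return a list of problems; an empty list means the file matches expectations."""
    parser = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section found"]

    settings = {key.replace("-", "_"): (value or "") for key, value in parser.items("mysqld")}
    problems = [f"{name} is not set" for name in MUST_BE_PRESENT if name not in settings]
    for name, wanted in EXPECTED.items():
        actual = settings.get(name, "")
        if actual.upper() != wanted:
            problems.append(f"{name} is {actual or 'unset'!r}, expected {wanted}")
    return problems
```

For example, `check_binlog_settings(open("mariadb-conf/custom.cnf").read())` should return an empty list for the file shown above.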

With the infrastructure defined, we set up the source data. A simple products table in the catalog database serves as our model.

-- Initial setup SQL executed against MariaDB
CREATE TABLE products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    sku VARCHAR(50) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price DECIMAL(10, 2),
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Grant permissions for Debezium user
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
FLUSH PRIVILEGES;

-- Insert some initial data
INSERT INTO products (sku, name, description, price) VALUES
('TS-001', 'Cotton T-Shirt', 'A classic crew-neck t-shirt made from 100% premium cotton.', 19.99),
('JN-001', 'Slim Fit Jeans', 'Modern slim fit jeans crafted from stretch denim for comfort.', 79.50);

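Next, the Debezium connector configuration is posted to its REST API. This JSON object tells Debezium which server and tables to monitor and where to publish the change events. Because the compose file runs the Debezium 2.x Connect image, the configuration uses the 2.x property names: topic.prefix (formerly database.server.name) and schema.history.internal.kafka.* (formerly database.history.kafka.*). It also sets decimal.handling.mode to double so that the DECIMAL price column arrives as a JSON number rather than as base64-encoded bytes, which is the default encoding.

// debezium-connector-config.json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mariadb",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "password",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "catalog",
    "table.include.list": "catalog.products",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "schema.history.internal.kafka.topic": "dbhistory.inventory",
    "decimal.handling.mode": "double",
    "include.schema.changes": "false",
    "tombstones.on.delete": "false"
  }
}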

This is registered using a curl command: curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @debezium-connector-config.json.
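
The same registration can be scripted, which is convenient when the stack is rebuilt often. Below is a minimal sketch with the standard library only. The helper names are hypothetical; the two endpoints, POST /connectors and GET /connectors/{name}/status, belong to the Kafka Connect REST API that the Debezium container exposes on port 8083.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # host port published by the debezium service


def build_register_request(connector: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the same POST the curl command sends, without sending it."""
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """GET /connectors/{name}/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status", timeout=10) as resp:
        return json.load(resp)


def is_running(status: dict) -> bool:
    """True when the connector and all of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    states = [status.get("connector", {}).get("state")] + [t.get("state") for t in tasks]
    return bool(tasks) and all(state == "RUNNING" for state in states)
```

To use it, load debezium-connector-config.json with `json.load`, pass the result to `build_register_request`, send it with `urllib.request.urlopen`, and then poll `is_running(fetch_status("inventory-connector"))`. A task in the FAILED state usually carries a stack trace in its `trace` field, which is the quickest way to spot a misconfigured property.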

Now, we define the schema in Weaviate. We’ll create a class named Product with four properties. Because the class sets its vectorizer to none, Weaviate does not compute any vectors itself; the consumer generates an embedding from the description field only and supplies it alongside each object.

# weaviate_schema.py
import weaviate

client = weaviate.Client("http://localhost:8080")

schema = {
    "classes": [
        {
            "class": "Product",
            "description": "A product from the catalog",
            "properties": [
                {
                    "name": "sku",
                    "dataType": ["string"],
                    "description": "The unique stock keeping unit",
                },
                {
                    "name": "name",
                    "dataType": ["string"],
                    "description": "The name of the product",
                },
                {
                    "name": "description",
                    "dataType": ["text"],
                    "description": "The detailed product description to be vectorized",
                },
                {
                    "name": "price",
                    "dataType": ["number"],
                    "description": "The price of the product",
                }
            ],
            # We are not using a built-in vectorizer; embeddings will be provided manually.
            "vectorizer": "none"
        }
    ]
}

# Clean slate for idempotency
if client.schema.exists("Product"):
    client.schema.delete_class("Product")

client.schema.create(schema)
print("Weaviate schema for 'Product' created.")
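
Since the class has no built-in vectorizer, queries also have to supply their own vector. Once objects have been synced, a semantic query against this schema might look like the sketch below. It assumes the same v3-style client used above and its query builder (`query.get`, `with_near_vector`, `with_additional`, `with_limit`, `do`). The function name `semantic_search` and the `embed` callable are hypothetical; `embed` must use the same model that produced the stored vectors.

```python
from typing import Any, Callable, Sequence


def semantic_search(
    client: Any,
    embed: Callable[[str], Sequence[float]],
    query: str,
    limit: int = 5,
) -> list:
    """Embed the query text and return the nearest Product objects."""
    vector = list(embed(query))
    result = (
        client.query
        .get("Product", ["sku", "name", "description", "price"])
        .with_near_vector({"vector": vector})
        .with_additional(["distance"])
        .with_limit(limit)
        .do()
    )
    # The GraphQL response nests hits under data -> Get -> <class name>.
    data = result.get("data") or {}
    return (data.get("Get") or {}).get("Product") or []
```

With a sentence-transformers model loaded as `model`, the call would be along the lines of `semantic_search(client, lambda q: model.encode(q).tolist(), "summer outdoor party wear")`.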

With the pipeline infrastructure in place, the focus shifted to the consumer. The initial version was a straightforward script to prove the concept. It connects to Kafka, polls for messages, parses the Debezium JSON payload, generates an embedding for the description, and upserts the object into Weaviate.

# consumer_v1.py - A simplified initial consumer
import json
import logging
from kafka import KafkaConsumer
import weaviate
from sentence_transformers import SentenceTransformer

# ... (logging setup, constants)

def process_message(msg, weaviate_client, embedding_model):
    # Simplified parsing and processing
    message = json.loads(msg.value.decode('utf-8'))
    payload = message.get('payload', {})
    op = payload.get('op') # 'c' for create, 'u' for update, 'd' for delete
    
    if op in ['c', 'u']:
        data = payload.get('after', {})
        if not data:
            return
            
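        description_text = data.get('description') or ''  # column is nullable, so the value may be None
        # Generate embedding
        vector = embedding_model.encode(description_text).tolist()
        
        product_object = {
            "sku": data.get('sku'),
            "name": data.get('name'),
            "description": description_text,
            "price": data.get('price')
        }
        
        # UUID is generated based on the SKU for idempotency
        product_uuid = weaviate.util.generate_uuid5(data.get('sku'), 'Product')
        
        # replace() only updates an object that already exists, so create the
        # object the first time a SKU is seen.
        if weaviate_client.data_object.exists(product_uuid, class_name="Product"):
            weaviate_client.data_object.replace(
                uuid=product_uuid,
                class_name="Product",
                data_object=product_object,
                vector=vector
            )
        else:
            weaviate_client.data_object.create(
                data_object=product_object,
                class_name="Product",
                uuid=product_uuid,
                vector=vector
            )
        logging.info(f"Upserted product with SKU: {data.get('sku')}")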

    elif op == 'd':
        data = payload.get('before', {})
        if not data:
            return

        product_uuid = weaviate.util.generate_uuid5(data.get('sku'), 'Product')
        weaviate_client.data_object.delete(uuid=product_uuid, class_name="Product")
        logging.info(f"Deleted product with SKU: {data.get('sku')}")

# ... (main loop connecting to Kafka and calling process_message)
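
To make the parsing step concrete, here is a hand-written, trimmed example of the envelope the consumer receives, together with a small sketch of how the row image is picked out of it. The sample values are illustrative, not captured output, and the helper names are hypothetical. The price decoding applies only when the connector runs with decimal.handling.mode at its default, precise, where Kafka Connect encodes a DECIMAL as the base64 of its big-endian unscaled integer; with double or string the value arrives as a plain JSON number or string and the decode step is unnecessary.

```python
import base64
import json
from decimal import Decimal
from typing import Optional, Tuple

# Illustrative update event, reduced to the fields the consumer reads.
SAMPLE_EVENT = json.dumps({
    "payload": {
        "op": "u",
        "before": {"id": 1, "sku": "TS-001", "name": "Cotton T-Shirt",
                   "description": "old text", "price": "B88="},
        "after": {"id": 1, "sku": "TS-001", "name": "Cotton T-Shirt",
                  "description": "new description 1", "price": "B88="},
    }
})


def decode_connect_decimal(encoded: str, scale: int) -> Decimal:
    """Decode a base64 Kafka Connect Decimal; scale is the column's scale (2 for DECIMAL(10, 2))."""
    unscaled = int.from_bytes(base64.b64decode(encoded), byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


def extract_row(raw: str) -> Tuple[Optional[str], Optional[dict]]:
    """Return (op, row): the 'after' image for c/u/r events, the 'before' image for d."""
    payload = (json.loads(raw) or {}).get("payload") or {}
    op = payload.get("op")
    if op in ("c", "u", "r"):
        return op, payload.get("after")
    if op == "d":
        return op, payload.get("before")
    return op, None


op, row = extract_row(SAMPLE_EVENT)
price = decode_connect_decimal(row["price"], scale=2)
print(op, row["sku"], price)  # u TS-001 19.99
```

The op value r marks rows emitted during Debezium's initial snapshot, which is how rows that existed before the connector was registered reach the topic.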

This initial version worked during simple tests. An INSERT, UPDATE, or DELETE in MariaDB was reflected in Weaviate within seconds. The problem surfaced during load testing. We simulated a common production scenario: a script that rapidly updates the prices or descriptions of a small subset of products. For instance, updating a single product’s description three times in one second.

UPDATE products SET description = 'new description 1' WHERE sku = 'TS-001';
UPDATE products SET description = 'new description 2' WHERE sku = 'TS-001';
UPDATE products SET description = 'new description 3' WHERE sku = 'TS-001';

The consumer dutifully processed all three events. This meant three separate calls to the embedding model and three separate replace operations to Weaviate for the same object, all within a second. This is grossly inefficient. The embedding model call is computationally expensive, and the rapid-fire API calls to Weaviate create unnecessary network and database load. In a real-world “update storm,” where thousands of records might be updated by a batch job, the consumer would quickly fall behind, and Kafka consumer lag would skyrocket. This was the exact scenario that mandated a more intelligent solution.

The bottleneck was the redundant work performed on rapidly successive updates for the same object. The solution was to introduce a short-lived cache to deduplicate this work. Memcached was the perfect tool for this: a simple, high-speed, in-memory key-value store. The strategy was to cache the generated embedding for a given product’s description, but only for a very short duration (e.g., 1-2 seconds).

When an update event for SKU-123 arrives, the consumer would:

  1. Extract the description text.
  2. Create a cache key based on the SKU and a hash of the description text itself (e.g., embedding:SKU-123:sha256_of_description). Using a hash of the content ensures we only get a cache hit if the text is identical. The flip side is that a burst of three different descriptions still costs three embeddings; the cache pays off when the description is unchanged between events, as in price-only updates or repeated writes of the same text.
  3. Check Memcached for this key.
  4. Cache Hit: The same description for this SKU was embedded within the TTL window. We skip the expensive embedding generation and reuse the cached vector. Skipping the Weaviate call as well would only be safe if no scalar field had changed, so in this iteration the consumer still writes to Weaviate and saves only the embedding call.
  5. Cache Miss: The description is new for this SKU or the cached entry has expired. Generate the embedding, store it in Memcached with a 2-second TTL, and then proceed with the Weaviate update.
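
The steps above can be sketched without any external services. In the following illustration a small in-process `TTLCache` class stands in for Memcached and `embed` stands in for the model call; both are assumptions made so the logic can be run and tested in isolation, not part of the pipeline itself.

```python
import hashlib
import time
from typing import Callable, List, Tuple


class TTLCache:
    """Tiny in-process stand-in for Memcached, for illustration only."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._items = {}

    def get(self, key: str):
        entry = self._items.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._items[key]  # entry has outlived its TTL
            return None
        return value

    def set(self, key: str, value, expire: float) -> None:
        self._items[key] = (value, self._clock() + expire)


def cache_key(sku: str, description: str) -> str:
    """Key on the SKU plus a hash of the text, so only identical text can hit."""
    digest = hashlib.sha256(description.encode("utf-8")).hexdigest()
    return f"embedding:{sku}:{digest}"


def embedding_for(
    sku: str,
    description: str,
    cache: TTLCache,
    embed: Callable[[str], List[float]],
    ttl: float = 2.0,
) -> Tuple[List[float], bool]:
    """Return (vector, was_cache_hit) following the hit/miss steps above."""
    key = cache_key(sku, description)
    cached = cache.get(key)
    if cached is not None:
        return cached, True
    vector = embed(description)
    cache.set(key, vector, expire=ttl)
    return vector, False
```

Two events carrying the same description within the TTL produce one `embed` call; changing the text, or waiting past the TTL, produces a miss.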

This refactoring led to a more production-ready consumer implementation, complete with proper configuration management, error handling, and the caching logic.

Here is the final, hardened consumer script. It’s designed to be run as a service, not just within a notebook.

# resilient_consumer.py
import os
import json
import time
import logging
import hashlib
from typing import Optional

from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
import weaviate
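# The v3 client's base exception class is WeaviateBaseError; alias it to keep the name used below.
from weaviate.exceptions import WeaviateBaseError as WeaviateException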
from sentence_transformers import SentenceTransformer
from pymemcache.client.base import Client as MemcachedClient
from pymemcache.exceptions import MemcacheError

# --- Configuration ---
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "localhost:9092")
KAFKA_TOPIC = "dbserver1.catalog.products"
KAFKA_GROUP_ID = "weaviate_sync_group"
WEAVIATE_URL = os.getenv("WEAVIATE_URL", "http://localhost:8080")
MEMCACHED_SERVER = os.getenv("MEMCACHED_SERVER", "localhost:11211")
EMBEDDING_MODEL = 'all-MiniLM-L6-v2'
CACHE_TTL_SECONDS = 2 # Short TTL to handle update bursts

# --- Logging Setup ---
logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
)

class VectorSyncConsumer:
    """
    A resilient Kafka consumer to synchronize MariaDB changes to Weaviate
    with a Memcached-backed embedding cache.
    """
    def __init__(self):
        self.weaviate_client = self._connect_weaviate()
        self.kafka_consumer = self._connect_kafka()
        self.memcached_client = self._connect_memcached()
        logging.info(f"Loading embedding model: {EMBEDDING_MODEL}...")
        self.embedding_model = SentenceTransformer(EMBEDDING_MODEL)
        logging.info("Embedding model loaded.")

    def _connect_weaviate(self) -> weaviate.Client:
        """Connects to Weaviate with retry logic."""
        retries = 5
        for i in range(retries):
            try:
                client = weaviate.Client(WEAVIATE_URL, timeout_config=(5, 15))
                if client.is_ready():
                    logging.info("Successfully connected to Weaviate.")
                    return client
                logging.warning(f"Weaviate not ready, retrying... ({i+1}/{retries})")
            except Exception as e:
                # Client construction can raise library errors as well as
                # transport-level connection errors, so catch broadly and retry.
                logging.warning(f"Weaviate not reachable, retrying... ({i+1}/{retries}). Error: {e}")
            # Back off on both paths: an exception and a "not ready" response.
            time.sleep(5)
        logging.error("Could not connect to Weaviate after several retries.")
        raise ConnectionError("Failed to connect to Weaviate.")

    def _connect_kafka(self) -> KafkaConsumer:
        """Connects to Kafka with retry logic."""
        retries = 5
        for i in range(retries):
            try:
                consumer = KafkaConsumer(
                    KAFKA_TOPIC,
                    bootstrap_servers=[KAFKA_BROKER],
                    auto_offset_reset='earliest',
                    group_id=KAFKA_GROUP_ID,
                    enable_auto_commit=True,
                    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
                )
                logging.info("Successfully connected to Kafka.")
                return consumer
            except NoBrokersAvailable:
                logging.warning(f"Kafka broker not available, retrying... ({i+1}/{retries})")
                time.sleep(5)
        logging.error("Could not connect to Kafka after several retries.")
        raise ConnectionError("Failed to connect to Kafka.")

    def _connect_memcached(self) -> MemcachedClient:
        """Connects to Memcached."""
        client = MemcachedClient(MEMCACHED_SERVER)
        logging.info("Memcached client initialized.")
        return client

    def _generate_text_hash(self, text: str) -> str:
        """Creates a SHA256 hash of the input text."""
        return hashlib.sha256(text.encode('utf-8')).hexdigest()

    def get_embedding(self, sku: str, text: str) -> Optional[list[float]]:
        """
        Gets embedding for text, using Memcached as a short-term cache
        to avoid redundant computations during update bursts.
        """
        if not text:
            return None # Handle empty descriptions

        text_hash = self._generate_text_hash(text)
        cache_key = f"embedding:{sku}:{text_hash}"
        
        try:
            cached_embedding = self.memcached_client.get(cache_key)
            if cached_embedding:
                logging.info(f"Cache HIT for SKU {sku}. Skipping embedding generation.")
                return json.loads(cached_embedding.decode('utf-8'))
        except (MemcacheError, OSError) as e:  # OSError covers refused or dropped connections
            logging.error(f"Memcached GET error: {e}. Proceeding without cache.")

        logging.info(f"Cache MISS for SKU {sku}. Generating new embedding.")
        embedding = self.embedding_model.encode(text).tolist()

        try:
            self.memcached_client.set(
                cache_key,
                json.dumps(embedding).encode('utf-8'),
                expire=CACHE_TTL_SECONDS
            )
        except (MemcacheError, OSError) as e:  # OSError covers refused or dropped connections
            logging.error(f"Memcached SET error: {e}. Failed to cache embedding.")
        
        return embedding

    def handle_delete(self, data: dict):
        """Handles a delete event from Debezium."""
        sku = data.get('sku')
        if not sku:
            logging.warning("Delete event received without SKU. Skipping.")
            return

        product_uuid = weaviate.util.generate_uuid5(sku, 'Product')
        try:
            self.weaviate_client.data_object.delete(uuid=product_uuid, class_name="Product")
            logging.info(f"Deleted product with SKU: {sku}")
        except WeaviateException as e:
            logging.error(f"Failed to delete object {sku} from Weaviate: {e}")

    def handle_upsert(self, data: dict):
        """Handles a create or update event from Debezium."""
        sku = data.get('sku')
        if not sku:
            logging.warning("Upsert event received without SKU. Skipping.")
            return
        
        description = data.get('description') or ''  # column is nullable, so the value may be None
        
        # Core logic: Use the caching mechanism to get the vector
        vector = self.get_embedding(sku, description)

        product_object = {
            "sku": sku,
            "name": data.get('name'),
            "description": description,
            "price": data.get('price')
        }
        
        product_uuid = weaviate.util.generate_uuid5(sku, 'Product')
        
        try:
            # replace() only updates an object that already exists, so fall back
            # to create() the first time a SKU is seen.
            if self.weaviate_client.data_object.exists(product_uuid, class_name="Product"):
                self.weaviate_client.data_object.replace(
                    uuid=product_uuid,
                    class_name="Product",
                    data_object=product_object,
                    vector=vector
                )
            else:
                self.weaviate_client.data_object.create(
                    data_object=product_object,
                    class_name="Product",
                    uuid=product_uuid,
                    vector=vector
                )
            logging.info(f"Upserted product with SKU: {sku}")
        except WeaviateException as e:
            logging.error(f"Failed to upsert object {sku} to Weaviate: {e}")


    def run(self):
        """Main consumer loop."""
        logging.info("Starting consumer loop...")
        for message in self.kafka_consumer:
            payload = message.value.get('payload', {})
            if not payload:
                logging.warning(f"Empty payload in message: {message.key}")
                continue

            op = payload.get('op')
            # 'r' marks rows read during Debezium's initial snapshot; treat them as
            # upserts so rows that predate the connector are indexed too.
            if op in ['c', 'u', 'r']:
                self.handle_upsert(payload.get('after') or {})
            elif op == 'd':
                self.handle_delete(payload.get('before') or {})
            else:
                logging.debug(f"Ignoring operation type '{op}'")


if __name__ == "__main__":
    try:
        consumer = VectorSyncConsumer()
        consumer.run()
    except ConnectionError as e:
        logging.critical(f"A critical connection failed: {e}. Shutting down.")
        exit(1)
    except KeyboardInterrupt:
        logging.info("Consumer process interrupted. Shutting down.")
        exit(0)

The final architecture is more resilient and performant. The use of Jupyter in the early stages was invaluable for quickly prototyping the message parsing and Weaviate interaction. However, the move to a standalone, robust Python script was necessary for a production deployment.

flowchart TD
    subgraph "Source System"
        MariaDB[(MariaDB)]
    end

    subgraph "CDC Pipeline"
        Debezium(Debezium Connector)
        Kafka(Kafka Topic: products)
    end
    
    subgraph "Synchronization Service"
        Consumer[Python Consumer]
        Memcached[(Memcached)]
    end

    subgraph "Search & Destination"
        Weaviate[(Weaviate)]
    end

    MariaDB -- Binlog --> Debezium
    Debezium -- Change Events --> Kafka
    Kafka --> Consumer
    
    Consumer -- GET/SET Embedding --> Memcached
    Consumer -- Upsert/Delete --> Weaviate

The system, as built, effectively solves the problem of near-real-time vector synchronization, but it has limitations. The consumer is single-threaded. To scale throughput for a database with millions of writes per hour, we would need to run multiple instances of the consumer in the same Kafka consumer group, with the topic partitioned accordingly. Sharing one Memcached instance between those consumers is fine. Debezium keys each change event by the table’s primary key, so events for a given product normally land on the same partition and are processed in order by a single consumer. Ordering for a SKU can still be disturbed if the topic is repartitioned under a different key or if events are reprocessed after a consumer group rebalance, in which case an older update could overwrite a newer one. Furthermore, the embedding model runs within the consumer process itself. For larger, more resource-intensive models, this logic should be externalized to a dedicated, auto-scaling model inference service to decouple compute from I/O. Finally, the current error handling logs failures but moves on; a more robust pattern would involve a dead-letter queue (DLQ) in Kafka to isolate and replay problematic messages without halting the entire pipeline.
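
As a rough sketch of the dead-letter idea, the handler call can be wrapped so that an event which keeps failing is forwarded to a separate topic instead of being dropped. Everything here is an assumption for illustration: the topic name, the function name `process_with_dlq`, and the injected `send_to_dlq` callable, which keeps the sketch independent of any particular producer. Retries are immediate, with no backoff.

```python
import json
import logging
from typing import Any, Callable

DLQ_TOPIC = "dbserver1.catalog.products.dlq"  # hypothetical topic name


def process_with_dlq(
    event: dict,
    handler: Callable[[dict], None],
    send_to_dlq: Callable[[str, bytes], Any],
    max_attempts: int = 3,
) -> bool:
    """Run handler(event); after max_attempts failures, forward the event to the DLQ."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            handler(event)
            return True
        except Exception as exc:  # broad on purpose: any failure counts as an attempt
            last_error = exc
            logging.warning("Attempt %d/%d failed: %s", attempt, max_attempts, exc)

    # Keep the original event and the failure reason together for later replay.
    record = {"error": repr(last_error), "attempts": max_attempts, "event": event}
    send_to_dlq(DLQ_TOPIC, json.dumps(record).encode("utf-8"))
    return False
```

With kafka-python, `send_to_dlq` could be `lambda topic, value: producer.send(topic, value=value)` for a `KafkaProducer` instance named `producer`. Note that the handlers in the consumer above log and swallow Weaviate errors, so they would have to re-raise for a wrapper like this to see a failure.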

