Implementing a Multi-Model CQRS Read Projection Pipeline from CockroachDB Changefeeds


Our primary OLTP workload runs on a geo-distributed CockroachDB cluster, a choice made for its promise of serializable isolation and resilience against regional failures. This worked flawlessly for transactional integrity. The problem emerged six months into production: our query patterns became pathologically complex. Users demanded not just transactional lookups but faceted full-text search and, critically, “concept-based” semantic search over product catalogs. A simple SELECT statement with a LIKE clause wasn’t just slow; it was functionally incorrect for the user experience we needed to build. Initial attempts to solve this with materialized views and complex secondary indexes in CockroachDB itself led to write amplification and crippling query latency. The indexes required to satisfy our read patterns were bloating the storage footprint and slowing down the very transactions the database was chosen to protect.

We committed to a full CQRS (Command Query Responsibility Segregation) architecture. The write model would remain pristine on CockroachDB. The read models, however, would be offloaded to specialized engines. After evaluating our needs, we selected two: Apache Solr for its mature, powerful faceted text search capabilities, and Milvus for vector similarity search to power the “semantic search” features. The architectural decision was sound, but the implementation devil was in the data synchronization. Polling the database was a non-starter due to the load it would place on the cluster and the unacceptable latency. The only viable path was Change Data Capture (CDC). CockroachDB’s native CHANGEFEED feature, which can stream row-level changes to Kafka, became the backbone of our new architecture.

Our first step was to establish the data firehose from our products table. The table itself contained structured data, some JSONB for flexible attributes, and a reference to an image URL which would be the source for our vector embeddings.

-- The source-of-truth table in CockroachDB
CREATE TABLE products (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    sku STRING(50) UNIQUE,
    name STRING(255),
    description STRING,
    brand STRING(100),
    category STRING(100),
    -- Flexible attributes for different product types
    attributes JSONB,
    -- URL to the primary product image
    image_url STRING,
    -- Stock and pricing info
    stock_count INT8,
    price DECIMAL(10, 2),
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now(),
    INDEX(brand),
    INDEX(category)
);

With the table defined, we created the changefeed. A critical decision here was the diff option. It provides both the “before” and “after” state of the row, which is invaluable for certain downstream logic, though we primarily used the “after” state. We chose the json format for maximum interoperability.

-- production_kafka_uri should be replaced with the actual broker address
-- e.g., 'kafka://broker1:9092,broker2:9092?topic_prefix=crdb_cdc_'
CREATE CHANGEFEED FOR TABLE products
INTO 'kafka://production_kafka_uri'
WITH
    format = json,
    updated,
    resolved,
    diff;

This produced a stream of messages in a Kafka topic, typically named crdb_cdc_products. A raw message looked like this:

{
  "after": {
    "id": "a1b2c3d4-...",
    "sku": "HW-TS-001",
    "name": "Quantum Weave T-Shirt",
    "description": "A comfortable t-shirt made from next-gen materials.",
    "brand": "HexaCorp",
    "category": "Apparel",
    "attributes": {"color": "black", "size": "L"},
    "image_url": "https://cdn.example.com/images/qw-tshirt.jpg",
    "stock_count": 150,
    "price": 39.99,
    "created_at": "2023-10-27T10:00:00.000000Z",
    "updated_at": "2023-10-27T10:00:00.000000Z"
  },
  "before": null,
  "updated": "1698399600000000000.0000000000"
}

This raw data, however, was useless to our read models. Solr expected a flat or specific nested structure for indexing, and Milvus required a high-dimensional vector, not a URL. Our first attempt to solve this was a bespoke Go microservice. It consumed from the Kafka topic, performed the transformations, called an external ML model service to get embeddings for the image_url, and then wrote to Solr and Milvus.

// A snippet from our first, failed attempt at a custom processor.
// Do not use this in production.
package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "bytes"

    "github.com/segmentio/kafka-go"
)

// Simplified structures
type CockroachCDCMessage struct {
    After map[string]interface{} `json:"after"`
}

type SolrProduct struct {
    ID          string   `json:"id"`
    SKU         string   `json:"sku_s"`
    Name        string   `json:"name_t"`
    Description string   `json:"description_t"`
    Brand       string   `json:"brand_s"`
    Category    string   `json:"category_s"`
    Color       string   `json:"color_s,omitempty"`
    Size        string   `json:"size_s,omitempty"`
    Price       float64  `json:"price_f"`
    InStock     bool     `json:"in_stock_b"`
}

// This approach couples business logic directly into the data pipeline.
func processMessage(msg kafka.Message) {
    var cdcMsg CockroachCDCMessage
    if err := json.Unmarshal(msg.Value, &cdcMsg); err != nil {
        log.Printf("ERROR: could not unmarshal cdc message: %v", err)
        return // In reality, send to Dead Letter Queue (DLQ)
    }

    // Tedious and brittle manual mapping
    after := cdcMsg.After
    solrDoc := SolrProduct{
        ID:          after["id"].(string),
        SKU:         after["sku"].(string),
        Name:        after["name"].(string),
        Description: after["description"].(string),
        Brand:       after["brand"].(string),
        Category:    after["category"].(string),
        Price:       after["price"].(float64),
        InStock:     after["stock_count"].(float64) > 0,
    }
    
    // Manual extraction from JSONB
    if attrs, ok := after["attributes"].(map[string]interface{}); ok {
        if color, ok := attrs["color"].(string); ok {
            solrDoc.Color = color
        }
        if size, ok := attrs["size"].(string); ok {
            solrDoc.Size = size
        }
    }

    // Call external service for embedding
    imageUrl := after["image_url"].(string)
    embedding, err := getEmbedding(imageUrl)
    if err != nil {
        log.Printf("ERROR: failed to get embedding for %s: %v", imageUrl, err)
        // DLQ this message
        return
    }

    // Write to Solr and Milvus (client logic omitted for brevity)
    // postToSolr(solrDoc)
    // insertIntoMilvus(after["id"].(string), embedding)

    log.Printf("Processed product %s", solrDoc.ID)
}

func getEmbedding(url string) ([]float32, error) {
    // Dummy implementation of calling an embedding service.
    // In a real project, this would be a gRPC or HTTP client with retries, timeouts, etc.
    reqBody, _ := json.Marshal(map[string]string{"image_url": url})
    resp, err := http.Post("http://embedding-service/vectorize", "application/json", bytes.NewBuffer(reqBody))
    if err != nil { return nil, err }
    defer resp.Body.Close()
    
    var result struct {
        Vector []float32 `json:"vector"`
    }
    json.NewDecoder(resp.Body).Decode(&result)
    return result.Vector, nil
}

This custom service approach was a disaster. It worked for one entity, but our platform had hundreds. Each new entity or schema modification on an existing one required a code change, a PR, a review, and a deployment of this critical pipeline component. It became the single largest bottleneck in our development lifecycle. The code was a brittle mess of type assertions and manual JSON traversals. This pain forced a radical rethink. We needed to abstract the transformation logic away from the pipeline’s plumbing. Developers should declare transformations, not code them.

This led us to the core of our solution: a declarative mapping system powered by a surprising tool from the front-end world: Babel. We would define a simple JavaScript-based DSL for developers to express the mapping from a CockroachDB table to various read models. Then, a build-time script using Babel would transpile this DSL into a simple, intermediate JSON representation. This JSON configuration would then be consumed by a generic Kafka Connect processor.

Here is an example of what a developer would write in a product.projection.js file:

// product.projection.js
// This is our domain-specific language (DSL) for defining projections.
// It uses a JavaScript-like syntax that is intuitive for developers.

import { defineProjection, source, to, field, from, expr, call } from './projection-dsl-types';

export default defineProjection(
    source('products'), // Source CockroachDB table

    // Projection target: Solr core
    to('solr', { core: 'products_v1' },
        field('id').mapsFrom(from('id')),
        field('sku_s').mapsFrom(from('sku')),
        field('name_t').mapsFrom(from('name')),
        field('description_t').mapsFrom(from('description')),
        field('brand_s').mapsFrom(from('brand')),
        field('category_s').mapsFrom(from('category')),

        // Flattening a nested JSONB attribute
        field('color_s').mapsFrom(from('attributes.color')),
        field('size_s').mapsFrom(from('attributes.size')),

        // Simple transformation expression
        field('price_f').mapsFrom(from('price').as('float')),
        field('in_stock_b').mapsFrom(expr('stock_count > 0')),
    ),

    // Projection target: Milvus collection
    to('milvus', { collection: 'product_images_v1' },
        // The primary key for the vector
        field('id').mapsFrom(from('id')),

        // The vector field, generated by an external service call
        field('vector').mapsFrom(call('embedding-service.vectorize', {
            image_url: from('image_url')
        })),
    )
);

The beauty of this is its declarative nature. It describes the “what,” not the “how.” To make this work, we built a Node.js script that uses @babel/core to parse these files and generate a JSON configuration.

// scripts/transpile-projections.js
// This script is the heart of our build process. It uses Babel to
// understand the DSL and output a machine-readable JSON format.

const babel = require('@babel/core');
const fs = require('fs');
const path = require('path');
const glob = require('glob');

// A custom Babel plugin to interpret our DSL
const projectionDslPlugin = ({ types: t }) => {
    return {
        visitor: {
            CallExpression(path, state) {
                // We're looking for the top-level `defineProjection` call
                if (path.get('callee').isIdentifier({ name: 'defineProjection' })) {
                    const [sourceNode, ...toNodes] = path.node.arguments;
                    
                    const projectionConfig = {
                        source: null,
                        targets: []
                    };

                    // Extract source table name
                    if (t.isCallExpression(sourceNode) && sourceNode.callee.name === 'source') {
                        projectionConfig.source = sourceNode.arguments[0].value;
                    }

                    // Process each `to(...)` block
                    toNodes.forEach(toNode => {
                        if (!t.isCallExpression(toNode) || toNode.callee.name !== 'to') return;
                        
                        const targetType = toNode.arguments[0].value;
                        const targetOptions = parseObjectExpression(toNode.arguments[1]);
                        const fields = toNode.arguments.slice(2).map(fieldNode => parseField(fieldNode));
                        
                        projectionConfig.targets.push({
                            type: targetType,
                            options: targetOptions,
                            fields: fields,
                        });
                    });
                    
                    // Attach the final config to the file's state
                    state.file.metadata.projectionConfig = projectionConfig;
                }
            }
        }
    };

    // Helper to parse the `field(...)` calls into a structured object
    function parseField(fieldNode) {
        if (!t.isCallExpression(fieldNode) || fieldNode.callee.name !== 'field') return null;

        const fieldConfig = { name: fieldNode.arguments[0].value, mapping: null };
        const mapsFromCall = fieldNode.get('callee.object').node;

        if (t.isCallExpression(mapsFromCall) && mapsFromCall.callee.property.name === 'mapsFrom') {
            const mappingNode = mapsFromCall.arguments[0];
            fieldConfig.mapping = parseMapping(mappingNode);
        }
        return fieldConfig;
    }
    
    // This is where we interpret `from()`, `expr()`, and `call()`
    function parseMapping(mappingNode) {
        if (t.isCallExpression(mappingNode)) {
            const calleeName = mappingNode.callee.name || mappingNode.callee.property.name;
            switch(calleeName) {
                case 'from':
                    return { type: 'path', value: mappingNode.arguments[0].value };
                case 'expr':
                    return { type: 'expression', value: mappingNode.arguments[0].value };
                case 'call':
                    return {
                        type: 'service_call',
                        service: mappingNode.arguments[0].value,
                        params: parseObjectExpression(mappingNode.arguments[1])
                    };
                case 'as': // Handles chained calls like from('price').as('float')
                    const sourceMapping = parseMapping(mappingNode.callee.object);
                    sourceMapping.cast = mappingNode.arguments[0].value;
                    return sourceMapping;
            }
        }
        return null;
    }

    // A simplified utility to convert Babel AST ObjectExpression to a plain JS object
    function parseObjectExpression(node) {
        if (!node || !t.isObjectExpression(node)) return {};
        const obj = {};
        node.properties.forEach(prop => {
            if (t.isObjectProperty(prop)) {
                const key = prop.key.name || prop.key.value;
                // Recursively parse mappings for values within service calls
                if (t.isCallExpression(prop.value)) {
                    obj[key] = parseMapping(prop.value);
                } else if (t.isStringLiteral(prop.value) || t.isNumericLiteral(prop.value) || t.isBooleanLiteral(prop.value)) {
                    obj[key] = prop.value.value;
                }
            }
        });
        return obj;
    }
};

// Main execution logic
const inputDir = path.resolve(__dirname, '../src/projections');
const outputDir = path.resolve(__dirname, '../dist/configs');

if (!fs.existsSync(outputDir)) {
    fs.mkdirSync(outputDir, { recursive: true });
}

glob.sync(`${inputDir}/**/*.projection.js`).forEach(file => {
    console.log(`Transpiling ${file}...`);
    const sourceCode = fs.readFileSync(file, 'utf-8');
    
    // Run Babel transformation
    const { metadata } = babel.transformSync(sourceCode, {
        plugins: [projectionDslPlugin],
        // We don't need to generate JS code, only extract metadata
        code: false, 
    });

    if (metadata.projectionConfig) {
        const outputFile = path.join(outputDir, `${path.basename(file, '.projection.js')}.json`);
        fs.writeFileSync(outputFile, JSON.stringify(metadata.projectionConfig, null, 2));
        console.log(`  -> Wrote config to ${outputFile}`);
    } else {
        console.error(`  -> ERROR: No projection config found in ${file}.`);
    }
});

Running this script (node scripts/transpile-projections.js) against product.projection.js generates the following product.json file:

{
  "source": "products",
  "targets": [
    {
      "type": "solr",
      "options": {
        "core": "products_v1"
      },
      "fields": [
        { "name": "id", "mapping": { "type": "path", "value": "id" } },
        { "name": "sku_s", "mapping": { "type": "path", "value": "sku" } },
        { "name": "name_t", "mapping": { "type": "path", "value": "name" } },
        { "name": "description_t", "mapping": { "type": "path", "value": "description" } },
        { "name": "brand_s", "mapping": { "type": "path", "value": "brand" } },
        { "name": "category_s", "mapping": { "type": "path", "value": "category" } },
        { "name": "color_s", "mapping": { "type": "path", "value": "attributes.color" } },
        { "name": "size_s", "mapping": { "type": "path", "value": "attributes.size" } },
        { "name": "price_f", "mapping": { "type": "path", "value": "price", "cast": "float" } },
        { "name": "in_stock_b", "mapping": { "type": "expression", "value": "stock_count > 0" } }
      ]
    },
    {
      "type": "milvus",
      "options": {
        "collection": "product_images_v1"
      },
      "fields": [
        { "name": "id", "mapping": { "type": "path", "value": "id" } },
        { "name": "vector", "mapping": {
            "type": "service_call",
            "service": "embedding-service.vectorize",
            "params": {
              "image_url": { "type": "path", "value": "image_url" }
            }
          }
        }
      ]
    }
  ]
}

This JSON file is the portable, machine-readable instruction set for our data pipeline. We now use standard Kafka Connect with a custom, but generic, Single Message Transform (SMT). The SMT is configured with the path to this JSON file. Its job is to read the configuration on startup and apply the specified transformations to each Kafka message that passes through it. The SMT is now completely decoupled from any specific entity’s business logic.

A simplified view of the Kafka Connect connector configuration:

{
    "name": "crdb-products-to-solr",
    "config": {
        "connector.class": "org.apache.kafka.connect.solr.SolrSinkConnector",
        "tasks.max": "2",
        "topics": "crdb_cdc_products",
        "solr.url": "http://solr-host:8983/solr/products_v1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        
        "transforms": "CqrsProjection",
        "transforms.CqrsProjection.type": "com.mycompany.kafka.connect.smt.CqrsProjectionSmt",
        "transforms.CqrsProjection.config.path": "/etc/kafka-connect/configs/product.json",
        "transforms.CqrsProjection.target.name": "solr",

        // Critical for production: handle bad messages
        "errors.log.enable": true,
        "errors.log.include.messages": true,
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "dlq_cdc_products_solr",
        "errors.deadletterqueue.topic.replication.factor": "3"
    }
}

The SMT implementation in Java would parse the JSON config and dynamically build a transformation pipeline. For a path mapping, it uses a library like JsonPath to extract values. For an expression, it uses a lightweight expression engine. For a service_call, it uses an HTTP client to call the specified service. The result is a robust, configurable, and incredibly flexible system. The developer workflow is now simple: define a projection, run an npm script, and deploy a standard Kafka Connect configuration file. No custom Go service deployments are needed for 95% of cases.

graph TD
    subgraph CockroachDB Cluster
        A[products Table]
    end
    subgraph Developer Workflow
        D[product.projection.js DSL] -- npm run build:projections --> E(Babel Transpiler)
        E -- Generates --> F[product.json Config]
    end
    subgraph Data Pipeline
        subgraph Kafka Connect
            K[Kafka Consumer] -->|Raw CDC Msg| SMT[Generic CqrsProjectionSMT]
            F -- Loaded by --> SMT
            SMT -->|Solr Document| P1[Solr Sink Connector]
            SMT -->|Milvus Payload| P2[HTTP Sink Connector]
        end
        Kafka[Kafka Topic: crdb_cdc_products]
    end
    subgraph Read Models
        Solr[Apache Solr]
        Milvus[Milvus]
    end
    
    A -- CORE Changefeed --> Kafka
    Kafka --> K
    P1 --> Solr
    P2 --> Milvus
    
    style SMT fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px

This architecture, born from the failure of a monolithic transformation service, has proven its worth. It scales horizontally with Kafka Connect and vertically by allowing developers to self-serve their read model projection needs without involving the data platform team. The use of Babel, a tool typically associated with web development, was unconventional but solved the core problem of developer ergonomics and abstraction.

However, this system isn’t without its own set of trade-offs and potential issues. The Babel-based transpilation step adds complexity to the CI/CD pipeline and requires front-end tooling in a back-end project, which can be a cultural hurdle. The generic SMT, while flexible, will never be as performant as a hand-optimized, compiled Go or Java application for a specific transformation; for extremely high-throughput tables, we may still need to fall back to a custom service. Furthermore, the reliance on synchronous calls to external services like the embedding model within the SMT can introduce latency and a single point of failure into the data pipeline. A future iteration might involve a two-step process: the SMT enriches the message with an “embedding_request” event, and a separate, asynchronous processor handles the service call and updates the Milvus index, introducing eventual consistency for the vector data but improving the resilience of the main pipeline.


  TOC