Our primary OLTP workload runs on a geo-distributed CockroachDB cluster, a choice made for its promise of serializable isolation and resilience against regional failures. This worked flawlessly for transactional integrity. The problem emerged six months into production: our query patterns became pathologically complex. Users demanded not just transactional lookups but faceted full-text search and, critically, “concept-based” semantic search over product catalogs. A simple SELECT
statement with a LIKE
clause wasn’t just slow; it was functionally incorrect for the user experience we needed to build. Initial attempts to solve this with materialized views and complex secondary indexes in CockroachDB itself led to write amplification and crippling query latency. The indexes required to satisfy our read patterns were bloating the storage footprint and slowing down the very transactions the database was chosen to protect.
We committed to a full CQRS (Command Query Responsibility Segregation) architecture. The write model would remain pristine on CockroachDB. The read models, however, would be offloaded to specialized engines. After evaluating our needs, we selected two: Apache Solr for its mature, powerful faceted text search capabilities, and Milvus for vector similarity search to power the “semantic search” features. The architectural decision was sound, but the implementation devil was in the data synchronization. Polling the database was a non-starter due to the load it would place on the cluster and the unacceptable latency. The only viable path was Change Data Capture (CDC). CockroachDB’s native CHANGEFEED
feature, which can stream row-level changes to Kafka, became the backbone of our new architecture.
Our first step was to establish the data firehose from our products
table. The table itself contained structured data, some JSONB for flexible attributes, and a reference to an image URL which would be the source for our vector embeddings.
-- The source-of-truth table in CockroachDB
CREATE TABLE products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sku STRING(50) UNIQUE,
name STRING(255),
description STRING,
brand STRING(100),
category STRING(100),
-- Flexible attributes for different product types
attributes JSONB,
-- URL to the primary product image
image_url STRING,
-- Stock and pricing info
stock_count INT8,
price DECIMAL(10, 2),
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
INDEX(brand),
INDEX(category)
);
With the table defined, we created the changefeed. A critical decision here was the diff
option. It provides both the “before” and “after” state of the row, which is invaluable for certain downstream logic, though we primarily used the “after” state. We chose the json
format for maximum interoperability.
-- production_kafka_uri should be replaced with the actual broker address
-- e.g., 'kafka://broker1:9092,broker2:9092?topic_prefix=crdb_cdc_'
CREATE CHANGEFEED FOR TABLE products
INTO 'kafka://production_kafka_uri'
WITH
format = json,
updated,
resolved,
diff;
This produced a stream of messages in a Kafka topic, typically named crdb_cdc_products
. A raw message looked like this:
{
"after": {
"id": "a1b2c3d4-...",
"sku": "HW-TS-001",
"name": "Quantum Weave T-Shirt",
"description": "A comfortable t-shirt made from next-gen materials.",
"brand": "HexaCorp",
"category": "Apparel",
"attributes": {"color": "black", "size": "L"},
"image_url": "https://cdn.example.com/images/qw-tshirt.jpg",
"stock_count": 150,
"price": 39.99,
"created_at": "2023-10-27T10:00:00.000000Z",
"updated_at": "2023-10-27T10:00:00.000000Z"
},
"before": null,
"updated": "1698399600000000000.0000000000"
}
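Because the changefeed runs with diff, a subsequent UPDATE to the same row (here, a sale decrementing stock) emits both images. The values are illustrative and the unchanged columns are elided for brevity; the actual feed always carries the full row in both before and after:
{
  "after": {
    "id": "a1b2c3d4-...",
    "sku": "HW-TS-001",
    "stock_count": 149,
    "updated_at": "2023-10-27T10:05:00.000000Z"
  },
  "before": {
    "id": "a1b2c3d4-...",
    "sku": "HW-TS-001",
    "stock_count": 150,
    "updated_at": "2023-10-27T10:00:00.000000Z"
  },
  "updated": "1698399900000000000.0000000000"
}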
This raw data, however, was useless to our read models. Solr expected a flat or specific nested structure for indexing, and Milvus required a high-dimensional vector, not a URL. Our first attempt to solve this was a bespoke Go microservice. It consumed from the Kafka topic, performed the transformations, called an external ML model service to get embeddings for the image_url
, and then wrote to Solr and Milvus.
// A snippet from our first, failed attempt at a custom processor.
// Do not use this in production.
package main
import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"

	"github.com/segmentio/kafka-go"
)
// Simplified structures
type CockroachCDCMessage struct {
After map[string]interface{} `json:"after"`
}
type SolrProduct struct {
ID string `json:"id"`
SKU string `json:"sku_s"`
Name string `json:"name_t"`
Description string `json:"description_t"`
Brand string `json:"brand_s"`
Category string `json:"category_s"`
Color string `json:"color_s,omitempty"`
Size string `json:"size_s,omitempty"`
Price float64 `json:"price_f"`
InStock bool `json:"in_stock_b"`
}
// This approach couples business logic directly into the data pipeline.
func processMessage(msg kafka.Message) {
var cdcMsg CockroachCDCMessage
if err := json.Unmarshal(msg.Value, &cdcMsg); err != nil {
log.Printf("ERROR: could not unmarshal cdc message: %v", err)
return // In reality, send to Dead Letter Queue (DLQ)
}
// Tedious and brittle manual mapping
after := cdcMsg.After
solrDoc := SolrProduct{
ID: after["id"].(string),
SKU: after["sku"].(string),
Name: after["name"].(string),
Description: after["description"].(string),
Brand: after["brand"].(string),
Category: after["category"].(string),
Price: after["price"].(float64),
InStock: after["stock_count"].(float64) > 0,
}
// Manual extraction from JSONB
if attrs, ok := after["attributes"].(map[string]interface{}); ok {
if color, ok := attrs["color"].(string); ok {
solrDoc.Color = color
}
if size, ok := attrs["size"].(string); ok {
solrDoc.Size = size
}
}
// Call external service for embedding
imageUrl := after["image_url"].(string)
embedding, err := getEmbedding(imageUrl)
if err != nil {
log.Printf("ERROR: failed to get embedding for %s: %v", imageUrl, err)
// DLQ this message
return
}
	// Write to Solr and Milvus (client logic omitted for brevity)
	// postToSolr(solrDoc)
	// insertIntoMilvus(after["id"].(string), embedding)
	_ = embedding // referenced here so the snippet compiles with the writes commented out
log.Printf("Processed product %s", solrDoc.ID)
}
func getEmbedding(url string) ([]float32, error) {
// Dummy implementation of calling an embedding service.
// In a real project, this would be a gRPC or HTTP client with retries, timeouts, etc.
reqBody, _ := json.Marshal(map[string]string{"image_url": url})
resp, err := http.Post("http://embedding-service/vectorize", "application/json", bytes.NewBuffer(reqBody))
if err != nil { return nil, err }
defer resp.Body.Close()
var result struct {
Vector []float32 `json:"vector"`
}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, err
	}
	return result.Vector, nil
}
This custom service approach was a disaster. It worked for one entity, but our platform had hundreds. Each new entity or schema modification on an existing one required a code change, a PR, a review, and a deployment of this critical pipeline component. It became the single largest bottleneck in our development lifecycle. The code was a brittle mess of type assertions and manual JSON traversals. This pain forced a radical rethink. We needed to abstract the transformation logic away from the pipeline’s plumbing. Developers should declare transformations, not code them.
This led us to the core of our solution: a declarative mapping system powered by an unlikely tool from the front-end world, Babel. We would define a simple JavaScript-based DSL for developers to express the mapping from a CockroachDB table to various read models. A build-time script using Babel would then transpile this DSL into a simple, intermediate JSON representation. That JSON configuration would in turn be consumed by a generic Kafka Connect processor.
Here is an example of what a developer would write in a product.projection.js
file:
// product.projection.js
// This is our domain-specific language (DSL) for defining projections.
// It uses a JavaScript-like syntax that is intuitive for developers.
import { defineProjection, source, to, field, from, expr, call } from './projection-dsl-types';
export default defineProjection(
source('products'), // Source CockroachDB table
// Projection target: Solr core
to('solr', { core: 'products_v1' },
field('id').mapsFrom(from('id')),
field('sku_s').mapsFrom(from('sku')),
field('name_t').mapsFrom(from('name')),
field('description_t').mapsFrom(from('description')),
field('brand_s').mapsFrom(from('brand')),
field('category_s').mapsFrom(from('category')),
// Flattening a nested JSONB attribute
field('color_s').mapsFrom(from('attributes.color')),
field('size_s').mapsFrom(from('attributes.size')),
// Simple transformation expression
field('price_f').mapsFrom(from('price').as('float')),
field('in_stock_b').mapsFrom(expr('stock_count > 0')),
),
// Projection target: Milvus collection
to('milvus', { collection: 'product_images_v1' },
// The primary key for the vector
field('id').mapsFrom(from('id')),
// The vector field, generated by an external service call
field('vector').mapsFrom(call('embedding-service.vectorize', {
image_url: from('image_url')
})),
)
);
The beauty of this is its declarative nature. It describes the “what,” not the “how.” To make this work, we built a Node.js script that uses @babel/core
to parse these files and generate a JSON configuration.
// scripts/transpile-projections.js
// This script is the heart of our build process. It uses Babel to
// understand the DSL and output a machine-readable JSON format.
const babel = require('@babel/core');
const fs = require('fs');
const path = require('path');
const glob = require('glob');
// A custom Babel plugin to interpret our DSL
const projectionDslPlugin = ({ types: t }) => {
return {
visitor: {
CallExpression(path, state) {
// We're looking for the top-level `defineProjection` call
if (path.get('callee').isIdentifier({ name: 'defineProjection' })) {
const [sourceNode, ...toNodes] = path.node.arguments;
const projectionConfig = {
source: null,
targets: []
};
// Extract source table name
if (t.isCallExpression(sourceNode) && sourceNode.callee.name === 'source') {
projectionConfig.source = sourceNode.arguments[0].value;
}
// Process each `to(...)` block
toNodes.forEach(toNode => {
if (!t.isCallExpression(toNode) || toNode.callee.name !== 'to') return;
const targetType = toNode.arguments[0].value;
const targetOptions = parseObjectExpression(toNode.arguments[1]);
const fields = toNode.arguments.slice(2).map(fieldNode => parseField(fieldNode));
projectionConfig.targets.push({
type: targetType,
options: targetOptions,
fields: fields,
});
});
// Attach the final config to the file's state
state.file.metadata.projectionConfig = projectionConfig;
}
}
}
};
// Helper to parse `field(name).mapsFrom(...)` chains into a structured object.
// Each argument after the target options is the outer `mapsFrom(...)` call; its
// callee is the member expression `field(name).mapsFrom`, so the field name
// lives on `callee.object` and the mapping is the call's single argument.
function parseField(fieldNode) {
  if (!t.isCallExpression(fieldNode) || !t.isMemberExpression(fieldNode.callee)) return null;
  if (fieldNode.callee.property.name !== 'mapsFrom') return null;
  const fieldCall = fieldNode.callee.object;
  if (!t.isCallExpression(fieldCall) || fieldCall.callee.name !== 'field') return null;
  return {
    name: fieldCall.arguments[0].value,
    mapping: parseMapping(fieldNode.arguments[0]),
  };
}
// This is where we interpret `from()`, `expr()`, and `call()`
function parseMapping(mappingNode) {
if (t.isCallExpression(mappingNode)) {
const calleeName = mappingNode.callee.name || mappingNode.callee.property.name;
switch(calleeName) {
case 'from':
return { type: 'path', value: mappingNode.arguments[0].value };
case 'expr':
return { type: 'expression', value: mappingNode.arguments[0].value };
case 'call':
return {
type: 'service_call',
service: mappingNode.arguments[0].value,
params: parseObjectExpression(mappingNode.arguments[1])
};
case 'as': // Handles chained calls like from('price').as('float')
const sourceMapping = parseMapping(mappingNode.callee.object);
sourceMapping.cast = mappingNode.arguments[0].value;
return sourceMapping;
}
}
return null;
}
// A simplified utility to convert Babel AST ObjectExpression to a plain JS object
function parseObjectExpression(node) {
if (!node || !t.isObjectExpression(node)) return {};
const obj = {};
node.properties.forEach(prop => {
if (t.isObjectProperty(prop)) {
const key = prop.key.name || prop.key.value;
// Recursively parse mappings for values within service calls
if (t.isCallExpression(prop.value)) {
obj[key] = parseMapping(prop.value);
} else if (t.isStringLiteral(prop.value) || t.isNumericLiteral(prop.value) || t.isBooleanLiteral(prop.value)) {
obj[key] = prop.value.value;
}
}
});
return obj;
}
};
// Main execution logic
const inputDir = path.resolve(__dirname, '../src/projections');
const outputDir = path.resolve(__dirname, '../dist/configs');
if (!fs.existsSync(outputDir)) {
fs.mkdirSync(outputDir, { recursive: true });
}
glob.sync(`${inputDir}/**/*.projection.js`).forEach(file => {
console.log(`Transpiling ${file}...`);
const sourceCode = fs.readFileSync(file, 'utf-8');
// Run Babel transformation
const { metadata } = babel.transformSync(sourceCode, {
plugins: [projectionDslPlugin],
// We don't need to generate JS code, only extract metadata
code: false,
});
if (metadata.projectionConfig) {
const outputFile = path.join(outputDir, `${path.basename(file, '.projection.js')}.json`);
fs.writeFileSync(outputFile, JSON.stringify(metadata.projectionConfig, null, 2));
console.log(` -> Wrote config to ${outputFile}`);
} else {
console.error(` -> ERROR: No projection config found in ${file}.`);
}
});
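We exposed the transpiler as an npm script so the CI/CD pipeline regenerates the configs on every build. A package.json excerpt along these lines is all that is needed (the version ranges shown are illustrative):
{
  "scripts": {
    "build:projections": "node scripts/transpile-projections.js"
  },
  "devDependencies": {
    "@babel/core": "^7.23.0",
    "glob": "^8.1.0"
  }
}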
Running this script (node scripts/transpile-projections.js
) against product.projection.js
generates the following product.json
file:
{
"source": "products",
"targets": [
{
"type": "solr",
"options": {
"core": "products_v1"
},
"fields": [
{ "name": "id", "mapping": { "type": "path", "value": "id" } },
{ "name": "sku_s", "mapping": { "type": "path", "value": "sku" } },
{ "name": "name_t", "mapping": { "type": "path", "value": "name" } },
{ "name": "description_t", "mapping": { "type": "path", "value": "description" } },
{ "name": "brand_s", "mapping": { "type": "path", "value": "brand" } },
{ "name": "category_s", "mapping": { "type": "path", "value": "category" } },
{ "name": "color_s", "mapping": { "type": "path", "value": "attributes.color" } },
{ "name": "size_s", "mapping": { "type": "path", "value": "attributes.size" } },
{ "name": "price_f", "mapping": { "type": "path", "value": "price", "cast": "float" } },
{ "name": "in_stock_b", "mapping": { "type": "expression", "value": "stock_count > 0" } }
]
},
{
"type": "milvus",
"options": {
"collection": "product_images_v1"
},
"fields": [
{ "name": "id", "mapping": { "type": "path", "value": "id" } },
{ "name": "vector", "mapping": {
"type": "service_call",
"service": "embedding-service.vectorize",
"params": {
"image_url": { "type": "path", "value": "image_url" }
}
}
}
]
}
]
}
This JSON file is the portable, machine-readable instruction set for our data pipeline. We now use standard Kafka Connect with a custom, but generic, Single Message Transform (SMT). The SMT is configured with the path to this JSON file. Its job is to read the configuration on startup and apply the specified transformations to each Kafka message that passes through it. The SMT is now completely decoupled from any specific entity’s business logic.
A simplified view of the Kafka Connect connector configuration (the error-tolerance and dead letter queue settings are critical in production for handling bad messages):
{
"name": "crdb-products-to-solr",
"config": {
"connector.class": "org.apache.kafka.connect.solr.SolrSinkConnector",
"tasks.max": "2",
"topics": "crdb_cdc_products",
"solr.url": "http://solr-host:8983/solr/products_v1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "CqrsProjection",
"transforms.CqrsProjection.type": "com.mycompany.kafka.connect.smt.CqrsProjectionSmt",
"transforms.CqrsProjection.config.path": "/etc/kafka-connect/configs/product.json",
"transforms.CqrsProjection.target.name": "solr",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq_cdc_products_solr",
"errors.deadletterqueue.topic.replication.factor": "3"
}
}
The SMT implementation in Java parses the JSON config at startup and dynamically builds a transformation pipeline. For a path mapping, it uses a library like JsonPath to extract values; for an expression, a lightweight expression engine; for a service_call, an HTTP client that invokes the specified service. The result is a robust, configurable, and incredibly flexible system. The developer workflow is now simple: define a projection, run an npm
script, and deploy a standard Kafka Connect configuration file. No custom Go service deployments are needed for 95% of cases.
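To make that workflow concrete, here is a minimal sketch of what the generic SMT could look like. The class name and the config.path / target.name settings mirror the connector configuration above; the Jackson-based config loading, the JsonPath traversal of the "after" image, and the stubbed expression and service-call branches are assumptions about one reasonable implementation, not a reproduction of our production code.
// A minimal, illustrative sketch of the generic projection SMT (not the production code).
// The projection JSON is parsed with Jackson; JsonPath resolves "path" mappings.
package com.mycompany.kafka.connect.smt;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.Transformation;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;

public class CqrsProjectionSmt<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ObjectMapper MAPPER = new ObjectMapper();
    private JsonNode fields; // the "fields" array of the selected target

    @Override
    public void configure(Map<String, ?> configs) {
        // Load the transpiled projection config once, at startup.
        String path = (String) configs.get("config.path");
        String target = (String) configs.get("target.name");
        try {
            for (JsonNode t : MAPPER.readTree(new File(path)).get("targets")) {
                if (t.get("type").asText().equals(target)) {
                    this.fields = t.get("fields");
                }
            }
        } catch (IOException e) {
            throw new ConnectException("Cannot load projection config " + path, e);
        }
        if (fields == null) {
            throw new ConnectException("No target '" + target + "' in projection config");
        }
    }

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record; // tombstones pass through untouched
        }
        // With the schemaless JsonConverter the value arrives as a Map; JsonPath can
        // navigate it directly. We only project from the "after" image of the row.
        DocumentContext doc = JsonPath.parse(record.value());
        Map<String, Object> projected = new HashMap<>();
        for (JsonNode field : fields) {
            String name = field.get("name").asText();
            JsonNode mapping = field.get("mapping");
            switch (mapping.get("type").asText()) {
                case "path":
                    projected.put(name, doc.read("$.after." + mapping.get("value").asText()));
                    break;
                case "expression":
                    projected.put(name, evaluate(mapping.get("value").asText(), doc));
                    break;
                case "service_call":
                    projected.put(name, callService(mapping, doc));
                    break;
            }
        }
        // Emit a schemaless map; the sink's converter serializes it for Solr or Milvus.
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), null, projected, record.timestamp());
    }

    private Object evaluate(String expr, DocumentContext doc) {
        throw new UnsupportedOperationException("expression engine omitted from this sketch");
    }

    private Object callService(JsonNode mapping, DocumentContext doc) {
        throw new UnsupportedOperationException("enrichment HTTP call omitted from this sketch");
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("config.path", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                        "Path to the transpiled projection JSON")
                .define("target.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                        "Which projection target in the config to apply");
    }

    @Override
    public void close() {}
}
Because apply() is driven entirely by the loaded JSON, adding or changing a projection never touches this class; only the generated configuration changes.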
graph TD
    subgraph CockroachDB Cluster
        A[products Table]
    end
    subgraph Developer Workflow
        D[product.projection.js DSL] -->|"npm run build:projections"| E(Babel Transpiler)
        E -- Generates --> F[product.json Config]
    end
    subgraph Data Pipeline
        subgraph Kafka Connect
            K[Kafka Consumer] -->|Raw CDC Msg| SMT[Generic CqrsProjectionSMT]
            F -- Loaded by --> SMT
            SMT -->|Solr Document| P1[Solr Sink Connector]
            SMT -->|Milvus Payload| P2[HTTP Sink Connector]
        end
        Kafka["Kafka Topic: crdb_cdc_products"]
    end
    subgraph Read Models
        Solr[Apache Solr]
        Milvus[Milvus]
    end
    A -- CDC Changefeed --> Kafka
    Kafka --> K
    P1 --> Solr
    P2 --> Milvus
    style SMT fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px
This architecture, born from the failure of a monolithic transformation service, has proven its worth. It scales horizontally with Kafka Connect, and it scales organizationally by letting developers self-serve their read model projections without involving the data platform team. The use of Babel, a tool typically associated with web development, was unconventional but solved the core problem of developer ergonomics and abstraction.
However, this system isn’t without its own set of trade-offs and potential issues. The Babel-based transpilation step adds complexity to the CI/CD pipeline and requires front-end tooling in a back-end project, which can be a cultural hurdle. The generic SMT, while flexible, will never be as performant as a hand-optimized, compiled Go or Java application for a specific transformation; for extremely high-throughput tables, we may still need to fall back to a custom service. Furthermore, the reliance on synchronous calls to external services like the embedding model within the SMT can introduce latency and a single point of failure into the data pipeline. A future iteration might involve a two-step process: the SMT enriches the message with an “embedding_request” event, and a separate, asynchronous processor handles the service call and updates the Milvus index, introducing eventual consistency for the vector data but improving the resilience of the main pipeline.