Our initial implementation of semantic search was failing. It was a simple batch process: a nightly job scanned our primary document store, generated embeddings, and bulk-loaded them into a vector index, which the user-facing application then queried. The core problem was data staleness. Documents updated in our primary OLTP database (a sharded MySQL cluster at the time) were not reflected in vector search for up to 24 hours. For a product where document relevance is time-sensitive, that latency was unacceptable: users were finding content that had already been modified or deleted, which cascaded into downstream issues. This was the technical pain point that forced a complete architectural rethink.
The goal was to build a system where changes in our structured source of truth would propagate to the unstructured vector index in near real-time. We also needed to combine traditional keyword-based search with this new semantic capability, as our users relied on both precise term matching and conceptual similarity. The design settled on a Change Data Capture (CDC) pipeline feeding a vector database, with a unified query layer to orchestrate both stores and fuse their results.
The technology selection process was rigorous. For the primary database, we migrated from sharded MySQL to TiDB. This choice was deliberate. TiDB’s horizontal scalability and MySQL compatibility made the migration manageable, but the killer feature for this project was its native, stable CDC component, TiCDC. It provides low-latency, transactional change data streams without imposing significant overhead on the transactional workload. This was the foundation for our real-time pipeline.
For the vector search component, we selected Milvus. Its horizontal scalability and its support for index types such as HNSW (Hierarchical Navigable Small World) were critical for our performance requirements. Just as important, its SDKs are mature and it supports filtering on scalar fields alongside vector similarity search, which is crucial for implementing hybrid search logic.
The most unconventional choice was for the query orchestration layer. Instead of a dedicated Go or Python microservice, we chose Astro. While known as a static site generator, its server-side rendering (SSR) mode and API routes provide a lightweight, high-performance Node.js environment. By collocating the query orchestration logic within the web framework itself, we created a Backend-for-Frontend (BFF) that simplified our deployment topology and reduced network latency between the presentation layer and the data aggregation logic. A common mistake is to over-architect solutions; in our case, Astro provided just enough backend capability without the operational overhead of another full-fledged microservice.
The final architecture can be visualized as two distinct flows: a real-time ingestion flow and a user-facing query flow.
graph TD
    subgraph Ingestion Pipeline
        TiDB(TiDB Cluster) -- TiCDC --> Kafka("Kafka Topic: document_changes")
        Kafka --> Consumer(Embedding Consumer Service)
        Consumer -- Generate Embedding --> EmbeddingModel(Sentence Transformer API)
        Consumer -- "Insert/Update Vector" --> Milvus(Milvus Cluster)
    end
    subgraph Query Flow
        User(User) --> Astro(Astro App)
        Astro -- "API Route: /api/search" --> Orchestrator{Query Orchestrator}
        Orchestrator -- Keyword Query --> TiDB
        Orchestrator -- Vector Query --> Milvus
        TiDB -- Results --> Reranker("Result Fusion & Reranking")
        Milvus -- Results --> Reranker
        Reranker -- Fused Results --> Astro
        Astro -- Renders Page --> User
    end
The remainder of this log details the implementation of these two core components, focusing on the production-grade code required to make this system robust and maintainable.
Part 1: The Real-Time Ingestion Pipeline
The pipeline’s reliability is paramount. If it fails, the system reverts to its original problem of data staleness. The core components are the TiDB schema, the TiCDC configuration, and a resilient Kafka consumer service.
TiDB Schema and Data Model
We started by defining a simple schema in TiDB for our documents. The updated_at column is automatically managed by TiDB and is crucial for tracking changes.
CREATE TABLE documents (
id VARCHAR(36) PRIMARY KEY,
tenant_id VARCHAR(36) NOT NULL,
title VARCHAR(255) NOT NULL,
content TEXT,
category VARCHAR(50),
status ENUM('published', 'draft', 'archived') DEFAULT 'draft',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
KEY idx_tenant_status (tenant_id, status),
FULLTEXT KEY ft_idx_title_content (title, content)
);
A pitfall here is neglecting indexes on the metadata fields like tenant_id and status, which are essential for filtering in the query phase. We also added a FULLTEXT index for the keyword search component; note that full-text support varies across TiDB versions and deployments, so verify it in your environment, and more advanced use cases might offload keyword search to a dedicated search engine.
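On the Milvus side, the pipeline needs a collection whose scalar fields mirror this schema before anything can be written to it. The collection setup is not shown elsewhere in this log, so the following is only a minimal sketch using a recent version of the Milvus Go SDK; the field names match the consumer code later on, while the 768-dimension embedding, the inner-product metric, and the HNSW parameters (M=16, efConstruction=200) are assumptions to tune for your embedding model and workload.
package main

import (
	"context"
	"log"

	"github.com/milvus-io/milvus-sdk-go/v2/client"
	"github.com/milvus-io/milvus-sdk-go/v2/entity"
)

func main() {
	ctx := context.Background()
	mc, err := client.NewClient(ctx, client.Config{Address: "localhost:19530"})
	if err != nil {
		log.Fatalf("failed to connect to Milvus: %v", err)
	}
	defer mc.Close()

	// Scalar fields mirror the TiDB schema; the vector field holds the embedding.
	schema := &entity.Schema{
		CollectionName: "documents",
		Description:    "documents synced from TiDB via TiCDC",
		Fields: []*entity.Field{
			{Name: "id", DataType: entity.FieldTypeVarChar, PrimaryKey: true, TypeParams: map[string]string{"max_length": "36"}},
			{Name: "tenant_id", DataType: entity.FieldTypeVarChar, TypeParams: map[string]string{"max_length": "36"}},
			{Name: "category", DataType: entity.FieldTypeVarChar, TypeParams: map[string]string{"max_length": "50"}},
			{Name: "embedding", DataType: entity.FieldTypeFloatVector, TypeParams: map[string]string{"dim": "768"}},
		},
	}
	if err := mc.CreateCollection(ctx, schema, 2); err != nil { // 2 shards
		log.Fatalf("create collection failed: %v", err)
	}

	// HNSW index on the vector field; the inner-product metric assumes normalized embeddings.
	idx, err := entity.NewIndexHNSW(entity.IP, 16, 200)
	if err != nil {
		log.Fatalf("build index definition failed: %v", err)
	}
	if err := mc.CreateIndex(ctx, "documents", "embedding", idx, false); err != nil {
		log.Fatalf("create index failed: %v", err)
	}

	// Load the collection into memory so it is searchable.
	if err := mc.LoadCollection(ctx, "documents", false); err != nil {
		log.Fatalf("load collection failed: %v", err)
	}
	log.Println("documents collection ready")
}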
Configuring TiCDC
With the table in place, we configured TiCDC to capture changes and stream them to a Kafka topic. This is done via the cdc command-line tool. The configuration specifies the Kafka broker address and the topic to which changes should be sent. We use the canal-json protocol, which provides a clean format containing both the old and new data for each row-level change.
# Changefeed configuration for the documents table
cat > changefeed.toml <<EOF
[filter]
rules = ['docs_db.documents']

[mounter]
worker-num = 8

[sink]
# Dispatch by the primary key (id) so all changes to the same document land on the same partition.
dispatchers = [
    {matcher = ['docs_db.documents'], topic = "document_changes", partition = "index-value"},
]
EOF

# Create the changefeed from TiDB to Kafka
# Assumes the TiDB cluster PD endpoint is at 10.0.0.1:2379 and Kafka is at 10.0.0.2:9092
tiup cdc cli changefeed create \
    --pd="http://10.0.0.1:2379" \
    --sink-uri="kafka://10.0.0.2:9092/document_changes?protocol=canal-json&kafka-version=2.4.0" \
    --changefeed-id="tidb-to-milvus-pipeline" \
    --config=changefeed.toml
A critical detail in this configuration is the partition = "index-value" dispatcher rule. It ensures that all changes for a specific document id are sent to the same Kafka partition, guaranteeing ordered processing and preventing race conditions where an older update might overwrite a newer one in Milvus.
The Embedding Consumer Service
This service is the heart of the ingestion pipeline. It’s a Go application that consumes messages from Kafka, generates embeddings, and interacts with Milvus. We chose Go for its performance and robust concurrency model, which is ideal for building high-throughput data processing services.
Here is a simplified but functional structure for the consumer. It uses the sarama library for Kafka and the official Milvus Go SDK.
main.go - Application Entrypoint and Setup
package main
import (
"context"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/IBM/sarama"
// Local packages
"tidb-milvus-sync/consumer"
"tidb-milvus-sync/embedding"
"tidb-milvus-sync/milvus"
"tidb-milvus-sync/config"
)
func main() {
// Load configuration from environment variables or a config file
cfg, err := config.Load()
if err != nil {
log.Fatalf("Failed to load configuration: %v", err)
}
// Initialize Milvus client
milvusClient, err := milvus.NewClient(cfg.Milvus.Address, cfg.Milvus.APIKey)
if err != nil {
log.Fatalf("Failed to initialize Milvus client: %v", err)
}
defer milvusClient.Close()
log.Println("Milvus client initialized successfully.")
// Initialize embedding service client
embeddingService := embedding.NewClient(cfg.Embedding.Address)
log.Println("Embedding service client initialized.")
// Setup Sarama consumer group
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_8_0_0 // Use a specific version
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(cfg.Kafka.Brokers, cfg.Kafka.Group, saramaConfig)
if err != nil {
log.Fatalf("Error creating consumer group client: %v", err)
}
// Create the consumer handler
consumerHandler := consumer.NewHandler(milvusClient, embeddingService, cfg.Milvus.CollectionName)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will end,
// and `Consume` will return.
if err := client.Consume(ctx, []string{cfg.Kafka.Topic}, consumerHandler); err != nil {
log.Printf("Error from consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumerHandler.Ready() // Mark as ready for the next session
}
}()
<-consumerHandler.ReadyChan() // Wait until the consumer has been set up
log.Println("Sarama consumer up and running...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Fatalf("Error closing client: %v", err)
}
}
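The local milvus and embedding packages imported above are thin wrappers, and their exact contents are not central to the pipeline. One plausible shape is sketched below; the Milvus part assumes a recent Go SDK, and the embedding client assumes a hypothetical HTTP service exposing a POST /embed endpoint that returns a JSON body with an embedding field, neither of which is specified elsewhere in this log.
// milvus/client.go - thin wrapper around the Milvus Go SDK client.
package milvus

import (
	"context"

	"github.com/milvus-io/milvus-sdk-go/v2/client"
)

// NewClient connects to Milvus. apiKey may be empty for unauthenticated clusters.
func NewClient(address, apiKey string) (client.Client, error) {
	return client.NewClient(context.Background(), client.Config{
		Address: address,
		APIKey:  apiKey,
	})
}

// embedding/client.go - HTTP client for the (assumed) embedding service.
package embedding

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
)

type Client struct {
	baseURL    string
	httpClient *http.Client
}

func NewClient(baseURL string) *Client {
	return &Client{baseURL: baseURL, httpClient: &http.Client{}}
}

// CreateEmbedding sends the text to the embedding service and returns the vector.
func (c *Client) CreateEmbedding(ctx context.Context, text string) ([]float32, error) {
	payload, err := json.Marshal(map[string]string{"text": text})
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/embed", bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("embedding service returned status %d", resp.StatusCode)
	}
	var out struct {
		Embedding []float32 `json:"embedding"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return out.Embedding, nil
}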
consumer/handler.go - Core Message Processing Logic
This is where each CDC message is processed. The logic must handle INSERT, UPDATE, and DELETE events from TiDB.
package consumer
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/IBM/sarama"
"github.com/milvus-io/milvus-sdk-go/v2/client"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"tidb-milvus-sync/embedding"
"tidb-milvus-sync/model"
)
// Handler represents a Sarama consumer group handler
type Handler struct {
ready chan bool
milvusClient client.Client
embedder *embedding.Client
collectionName string
}
func NewHandler(mc client.Client, embed *embedding.Client, collName string) *Handler {
return &Handler{
ready: make(chan bool),
milvusClient: mc,
embedder: embed,
collectionName: collName,
}
}
func (h *Handler) Ready() {
h.ready = make(chan bool)
}
func (h *Handler) ReadyChan() <-chan bool {
return h.ready
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (h *Handler) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(h.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
var canalMsg model.CanalMessage
if err := json.Unmarshal(message.Value, &canalMsg); err != nil {
log.Printf("Failed to unmarshal Canal JSON message: %v", err)
// Acknowledge the message to avoid reprocessing a poison pill
session.MarkMessage(message, "")
continue
}
// We only care about DML events for the documents table
if canalMsg.Type == "INSERT" || canalMsg.Type == "UPDATE" {
err := h.processUpsert(canalMsg)
if err != nil {
log.Printf("Error processing upsert for doc ID %s: %v", canalMsg.Data[0]["id"], err)
// In a real-world project, this is where you'd implement a dead-letter queue (DLQ)
// For simplicity, we log and continue.
}
} else if canalMsg.Type == "DELETE" {
err := h.processDelete(canalMsg)
if err != nil {
log.Printf("Error processing delete for doc ID %s: %v", canalMsg.PKNames[0], err)
}
}
// Mark the message as processed
session.MarkMessage(message, "")
}
return nil
}
func (h *Handler) processUpsert(msg model.CanalMessage) error {
// Canal message for INSERT/UPDATE contains data in the 'data' field
if len(msg.Data) == 0 {
return nil // No data to process
}
docData := msg.Data[0]
// Only process 'published' documents
status, ok := docData["status"].(string)
if !ok || status != "published" {
// If a document is updated to a non-published state, treat it as a delete from the index
id, idOk := docData["id"].(string)
if idOk {
log.Printf("Document %s is not published, deleting from Milvus.", id)
return h.deleteFromMilvus(id)
}
return nil
}
// Construct the text to be embedded. A common practice is to combine title and content.
title, _ := docData["title"].(string)
content, _ := docData["content"].(string)
textToEmbed := title + "\n" + content
// Call the embedding service
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
vector, err := h.embedder.CreateEmbedding(ctx, textToEmbed)
if err != nil {
return fmt.Errorf("embedding failed: %w", err)
}
// Prepare data for Milvus
// A common pitfall is forgetting the metadata. Storing it in Milvus allows for pre-filtering.
// Use comma-ok assertions: nullable columns such as category arrive as nil in canal-json,
// and a bare type assertion on nil would panic the consumer.
id, _ := docData["id"].(string)
tenantID, _ := docData["tenant_id"].(string)
category, _ := docData["category"].(string)
idColumn := entity.NewColumnVarChar("id", []string{id})
tenantIDColumn := entity.NewColumnVarChar("tenant_id", []string{tenantID})
categoryColumn := entity.NewColumnVarChar("category", []string{category})
vectorColumn := entity.NewColumnFloatVector("embedding", 768, [][]float32{vector}) // Assuming an embedding dimension of 768
// Use Upsert for simplicity, which handles both inserts and updates based on primary key.
_, err = h.milvusClient.Upsert(ctx, h.collectionName, "", idColumn, tenantIDColumn, categoryColumn, vectorColumn)
if err != nil {
return fmt.Errorf("milvus upsert failed: %w", err)
}
log.Printf("Successfully upserted document %s into Milvus.", docData["id"])
return nil
}
func (h *Handler) processDelete(msg model.CanalMessage) error {
// For DELETE events, Canal/TiCDC puts the old data in the 'data' field.
// The primary key is what matters.
if len(msg.Data) == 0 {
return nil
}
docData := msg.Data[0]
id, ok := docData["id"].(string)
if !ok {
return fmt.Errorf("delete event missing document id")
}
return h.deleteFromMilvus(id)
}
func (h *Handler) deleteFromMilvus(docID string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Milvus deletes entities based on a boolean expression over primary or scalar fields.
expr := fmt.Sprintf("id in ['%s']", docID)
err := h.milvusClient.Delete(ctx, h.collectionName, "", expr)
if err != nil {
return fmt.Errorf("milvus delete failed for docID %s: %w", docID, err)
}
log.Printf("Successfully deleted document %s from Milvus.", docID)
return nil
}
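For reference, the model.CanalMessage type the handler unmarshals into lives in a small model package. A minimal sketch, keeping only the canal-json fields this pipeline actually reads:
// model/canal.go - subset of the canal-json payload emitted by TiCDC.
package model

// CanalMessage maps the canal-json row change event. For INSERT/UPDATE the new
// row is in Data; for DELETE the deleted row is in Data and Old is empty.
type CanalMessage struct {
	Database string                   `json:"database"`
	Table    string                   `json:"table"`
	PKNames  []string                 `json:"pkNames"`
	IsDDL    bool                     `json:"isDdl"`
	Type     string                   `json:"type"` // "INSERT", "UPDATE", or "DELETE"
	ES       int64                    `json:"es"`   // commit timestamp (ms)
	TS       int64                    `json:"ts"`   // time the message was produced (ms)
	Data     []map[string]interface{} `json:"data"` // row values; canal-json encodes values as strings or null
	Old      []map[string]interface{} `json:"old"`  // previous values of changed columns on UPDATE
}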
This consumer code demonstrates several production-ready concepts: graceful shutdown, contexts for timeouts, clear separation of concerns (config, embedding, milvus packages), and explicit handling of the different CDC event types. Errors are currently only logged; a real system needs a robust dead-letter queue strategy, one version of which is sketched below.
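One workable approach is to republish the raw Kafka message, along with the processing error, to a side topic using a sarama.SyncProducer, and only then mark the original offset. The topic name and header layout below are assumptions rather than part of the system described above.
// dlq/producer.go - sketch of a dead-letter queue publisher for failed CDC events.
package dlq

import (
	"github.com/IBM/sarama"
)

type Producer struct {
	producer sarama.SyncProducer
	topic    string
}

func NewProducer(brokers []string, topic string) (*Producer, error) {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by SyncProducer
	p, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		return nil, err
	}
	return &Producer{producer: p, topic: topic}, nil
}

// Publish forwards the original message plus the processing error so the event
// can be inspected and replayed later.
func (p *Producer) Publish(original *sarama.ConsumerMessage, processingErr error) error {
	msg := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.ByteEncoder(original.Key),
		Value: sarama.ByteEncoder(original.Value),
		Headers: []sarama.RecordHeader{
			{Key: []byte("error"), Value: []byte(processingErr.Error())},
			{Key: []byte("source-topic"), Value: []byte(original.Topic)},
		},
	}
	_, _, err := p.producer.SendMessage(msg)
	return err
}

func (p *Producer) Close() error {
	return p.producer.Close()
}
The handler would construct one of these producers at startup and call Publish(message, err) in the branches that currently only log, before marking the message as consumed.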
Part 2: The Hybrid Query Orchestration Layer in Astro
With the ingestion pipeline running, the focus shifts to the query side. An Astro API endpoint (src/pages/api/search.ts) serves as the single point of entry for user queries. It federates requests to TiDB and Milvus, then fuses the results.
Connecting to Data Sources
First, we need clients to connect to our databases from the Astro server environment. These should be initialized once and reused.
src/lib/tidb.ts - TiDB Connection Pool
import mysql from 'mysql2/promise';
// In a real-world project, these credentials should come from environment variables.
const pool = mysql.createPool({
host: process.env.TIDB_HOST,
user: process.env.TIDB_USER,
password: process.env.TIDB_PASSWORD,
database: process.env.TIDB_DATABASE,
port: parseInt(process.env.TIDB_PORT || '4000', 10),
ssl: {
// For TiDB Cloud or other secure connections
minVersion: 'TLSv1.2',
rejectUnauthorized: true,
},
connectionLimit: 10,
queueLimit: 0,
enableKeepAlive: true,
keepAliveInitialDelay: 0,
});
export default pool;
src/lib/milvus.ts - Milvus Client
import { MilvusClient } from "@zilliz/milvus2-sdk-node";
// The client should be a singleton to manage connections effectively.
const address = process.env.MILVUS_ADDRESS || 'localhost:19530';
const token = process.env.MILVUS_API_KEY; // For Zilliz Cloud
const milvusClient = new MilvusClient({ address, token });
export default milvusClient;
The Astro API Endpoint
This endpoint encapsulates the core query logic. It’s responsible for receiving the query, parallelizing the database lookups, and merging the results.
src/pages/api/search.ts
import type { APIRoute } from 'astro';
import tidbPool from '../../lib/tidb';
import milvusClient from '../../lib/milvus';
import { getEmbedding } from '../../lib/embedding'; // Assume an embedding utility exists
const COLLECTION_NAME = 'documents';
const VECTOR_DIMENSION = 768;
const TOP_K = 10;
interface SearchResult {
id: string;
score: number;
source: 'keyword' | 'vector' | 'fused';
}
// A simple Reciprocal Rank Fusion implementation
function reciprocalRankFusion(results: SearchResult[][], k: number = 60): SearchResult[] {
const rankedLists = results.filter(list => list && list.length > 0);
if (rankedLists.length === 0) {
return [];
}
const scores: { [id: string]: number } = {};
for (const list of rankedLists) {
for (let i = 0; i < list.length; i++) {
const doc = list[i];
const rank = i + 1;
if (!scores[doc.id]) {
scores[doc.id] = 0;
}
scores[doc.id] += 1 / (k + rank);
}
}
const fused = Object.entries(scores).map(([id, score]) => ({ id, score, source: 'fused' as const }));
fused.sort((a, b) => b.score - a.score);
return fused;
}
export const POST: APIRoute = async ({ request }) => {
try {
const body = await request.json();
const { query, tenantId } = body;
if (!query || typeof query !== 'string' || !tenantId) {
return new Response(JSON.stringify({ error: 'Missing query or tenantId' }), { status: 400 });
}
// 1. Generate embedding for the user query
const queryVector = await getEmbedding(query);
// 2. Execute keyword and vector searches in parallel
const [keywordResults, vectorResults] = await Promise.all([
// Keyword search against TiDB
(async (): Promise<SearchResult[]> => {
try {
const [rows] = await tidbPool.execute(
`SELECT id, MATCH (title, content) AGAINST (? IN NATURAL LANGUAGE MODE) AS score
FROM documents
WHERE tenant_id = ? AND status = 'published' AND MATCH (title, content) AGAINST (? IN NATURAL LANGUAGE MODE) > 0
ORDER BY score DESC LIMIT ?`,
[query, tenantId, query, TOP_K]
);
return (rows as any[]).map(row => ({ id: row.id, score: row.score, source: 'keyword' }));
} catch (e) {
console.error('TiDB search failed:', e);
return []; // Return empty on error to not fail the whole request
}
})(),
// Vector search against Milvus
(async (): Promise<SearchResult[]> => {
try {
// loadCollection is cheap when the collection is already in memory; in production,
// load it once at startup rather than per request, and never release it here, since
// releasing would evict the collection out from under concurrent searches.
await milvusClient.loadCollection({ collection_name: COLLECTION_NAME });
const searchResults = await milvusClient.search({
collection_name: COLLECTION_NAME,
vector: queryVector,
limit: TOP_K,
filter: `tenant_id == "${tenantId}"`, // Pre-filtering on scalar fields
output_fields: ["id"], // Only fetch the ID
});
return searchResults.results.map(res => ({ id: res.id, score: res.score, source: 'vector' }));
} catch (e) {
console.error('Milvus search failed:', e);
return [];
}
})()
]);
// 3. Fuse the results
const fusedResults = reciprocalRankFusion([keywordResults, vectorResults]);
// 4. Hydrate the fused results with full document data from TiDB
if (fusedResults.length === 0) {
return new Response(JSON.stringify({ data: [] }), { status: 200 });
}
const resultIds = fusedResults.slice(0, TOP_K).map(r => r.id);
const placeholders = resultIds.map(() => '?').join(',');
const [hydratedRows] = await tidbPool.query(
`SELECT id, title, category, LEFT(content, 200) as snippet FROM documents WHERE id IN (${placeholders})`,
resultIds
);
// A common mistake is to lose the fused order. We need to re-order based on the fused list.
const idToDocMap = new Map((hydratedRows as any[]).map(row => [row.id, row]));
const finalResults = resultIds.map(id => idToDocMap.get(id)).filter(Boolean);
return new Response(JSON.stringify({ data: finalResults }), {
status: 200,
headers: { 'Content-Type': 'application/json' },
});
} catch (error) {
console.error("Search API error:", error);
return new Response(JSON.stringify({ error: 'Internal Server Error' }), { status: 500 });
}
};
This endpoint demonstrates several key patterns. Promise.all is used to execute the two different types of searches concurrently, minimizing latency. Error handling is localized to each search function so that a failure in one (e.g., Milvus is temporarily unavailable) doesn't completely break the user experience; the endpoint can gracefully fall back to the other search method. The use of Reciprocal Rank Fusion (RRF) is a simple yet effective way to combine ranked lists from different sources without needing to normalize scores, which is a notoriously difficult problem. Finally, the results are "hydrated" in a final batch query to TiDB, which is much more efficient than fetching full document content during the initial search phases.
The system described here is not a trivial “hello world” example. It’s a robust foundation for a production-grade hybrid search system. However, it’s not without its limitations and potential future enhancements. The RRF fusion algorithm is basic; a more advanced system might use a machine-learned ranking (LTR) model to combine features from both keyword and vector scores for superior relevance. The keyword search, relying on MySQL’s full-text capabilities within TiDB, might hit performance limits at extreme scale; offloading this responsibility to a dedicated search engine like Elasticsearch could be a future iteration. Finally, the entire pipeline, from CDC latency to query orchestration performance, requires comprehensive observability with metrics, logs, and traces to identify and resolve bottlenecks in a production environment.