The initial Python prototype for our Retrieval-Augmented Generation (RAG) system hit a performance wall. A simple vector search against Milvus was fast enough, but it lacked the contextual depth our domain required. Our knowledge base isn’t a flat collection of documents; it’s an interconnected web of technical specifications, design documents, and incident reports. A query for a “V-engine fuel injector failure” needs to retrieve not just documents mentioning that phrase, but also specifications for related components, parent system design notes, and historical reports from similar engine models. This required a hybrid approach, but the prototype, which serially queried Milvus, then Dgraph for relationships, and finally Couchbase for full documents, clocked in at over 400ms per query. For a user-facing interactive system, this was unacceptable. The mandate was a sub-50ms P99 latency for the entire retrieval pipeline. This forced a ground-up rewrite, leading us to Rust and a concurrent, federated query architecture.
Our core architectural decision was to build a stateless Rust service that acts as a federation layer over three distinct databases, each chosen for its specific strengths.
- Milvus: The clear choice for high-throughput, low-latency approximate nearest neighbor (ANN) search on high-dimensional vectors. It would serve as the entry point for semantic retrieval.
- Dgraph: Stores our knowledge graph: the metadata and relationships between documents, components, and systems. Its DQL query language (formerly GraphQL+-) is well suited to traversing the complex entity connections that a vector search alone would miss.
- Couchbase: A high-performance document database acting as our “source of truth” store. It holds the full text and metadata for every document chunk, addressable by a stable key. Its key-value speed is critical for the final hydration step.
The orchestration of these three systems demanded performance and reliability that pointed directly to Rust. Its fearless concurrency, compile-time guarantees, and freedom from garbage-collector pauses were non-negotiable for meeting our strict latency budget. For the internal administration dashboard (a tool for visualizing the knowledge graph and debugging retrieval results), we needed an equally performant developer experience. The sluggishness of our previous Webpack-based toolchain was a constant drag, so esbuild was selected for its near-instantaneous build times, allowing for rapid iteration on what became a surprisingly complex UI.
The foundation of the Rust service is its Cargo.toml. In a real-world project, dependency management is the first line of defense for stability.
[package]
name = "federated-rag-engine"
version = "0.1.0"
edition = "2021"
[dependencies]
# Core async runtime
tokio = { version = "1", features = ["full"] }
# gRPC communication for the service interface and Milvus
tonic = "0.10"
prost = "0.12"
# Dgraph client (wraps tonic); used in src/clients/dgraph.rs
dgraph-tonic = "0.11"
# Couchbase SDK
couchbase = "1.0.0-alpha.12"
futures = "0.3"
# Serialization and Deserialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Configuration management
config = "0.13"
# Logging and tracing
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[build-dependencies]
tonic-build = "0.10"
The core logic of the federation service revolves around a multi-stage, concurrent query execution flow. A request comes in with a query vector, and the service orchestrates a fan-out/gather pattern to build a rich context for the language model.
graph TD
A[Incoming gRPC Request w/ Query Vector] --> B{Federation Service};
B --> C[Phase 1: Vector Search];
C --> D{Milvus};
D --> E[Top-K Chunk IDs];
B --> F[Phase 2: Graph Expansion];
E --> F;
F --> G{Dgraph};
G --> H[Related Entity IDs];
B --> I[Phase 3: Document Hydration];
E --> I;
H --> I;
I --> J{Couchbase};
J --> K[Full Text Chunks];
K --> L[Phase 4: Rerank & Aggregate];
L --> M[Final Context Block];
M --> B;
B --> N[gRPC Response];
The first step is establishing robust, configurable clients for each database. A common mistake is to hardcode connection details. We use the config crate to load settings from a TOML file (config/default.toml), with environment-variable overrides, allowing for different configurations in development, staging, and production without code changes.
// src/config.rs
use serde::Deserialize;
#[derive(Debug, Deserialize)]
pub struct MilvusSettings {
pub uri: String,
pub collection_name: String,
}
#[derive(Debug, Deserialize)]
pub struct DgraphSettings {
pub endpoint: String,
}
#[derive(Debug, Deserialize)]
pub struct CouchbaseSettings {
pub connection_string: String,
pub username: String,
pub password: String,
pub bucket_name: String,
}
#[derive(Debug, Deserialize)]
pub struct Settings {
pub milvus: MilvusSettings,
pub dgraph: DgraphSettings,
pub couchbase: CouchbaseSettings,
pub grpc_server_address: String,
}
impl Settings {
pub fn new() -> Result<Self, config::ConfigError> {
let s = config::Config::builder()
.add_source(config::File::with_name("config/default"))
// e.g., `APP_GRPC_SERVER_ADDRESS=0.0.0.0:50052`
.add_source(config::Environment::with_prefix("APP"))
.build()?;
s.try_deserialize()
}
}
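For reference, a matching config/default.toml might look like the following. The values here are placeholders rather than our actual endpoints; only the structure mirrors the Settings struct above.
File: config/default.toml
grpc_server_address = "0.0.0.0:50051"

[milvus]
uri = "http://milvus.internal:19530"
collection_name = "doc_chunks"

[dgraph]
endpoint = "http://dgraph.internal:9080"

[couchbase]
connection_string = "couchbase://couchbase.internal"
username = "rag_service"
password = "change-me"
bucket_name = "documents"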
The core of the application is the FederationService. Let’s define the main structures and the gRPC service interface using protobuf.
File: proto/federation.proto
syntax = "proto3";
package federation;
service FederatedRetriever {
rpc GetContext(GetContextRequest) returns (GetContextResponse);
}
message GetContextRequest {
repeated float query_vector = 1;
int32 top_k = 2;
// Metadata filters can be added here
}
message DocumentChunk {
string id = 1;
string content = 2;
double score = 3;
map<string, string> metadata = 4;
}
message GetContextResponse {
repeated DocumentChunk chunks = 1;
}
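Before the service can implement this interface, the proto has to be compiled. The tonic-build entry in [build-dependencies] implies a build.rs roughly like the sketch below; how the generated code is exposed (as a small federation_proto crate or as a re-exported module) is a layout detail left open here.
File: build.rs
// Compiles proto/federation.proto at build time, generating the message types
// and the FederatedRetriever server/client stubs used in the service code.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/federation.proto")?;
    Ok(())
}
The generated items are then brought into scope with tonic::include_proto!("federation") and surfaced under the federation_proto path that the service code imports.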
Now, we implement the service in Rust. The get_context function is where the orchestration happens. The critical pattern here is using tokio::spawn and futures::future::join_all to execute independent I/O operations concurrently.
// src/service.rs
use crate::clients::{MilvusClient, DgraphClient, CouchbaseClient};
use futures::future::join_all;
use std::collections::{HashMap, HashSet};
use tonic::{Request, Response, Status};
// Protobuf generated structs
use federation_proto::federated_retriever_server::FederatedRetriever;
use federation_proto::{GetContextRequest, GetContextResponse, DocumentChunk};
pub struct FederationService {
milvus: MilvusClient,
dgraph: DgraphClient,
couchbase: CouchbaseClient,
}
impl FederationService {
// Constructor to initialize with clients
pub fn new(milvus: MilvusClient, dgraph: DgraphClient, couchbase: CouchbaseClient) -> Self {
Self { milvus, dgraph, couchbase }
}
}
#[tonic::async_trait]
impl FederatedRetriever for FederationService {
async fn get_context(
&self,
request: Request<GetContextRequest>,
) -> Result<Response<GetContextResponse>, Status> {
let inner_req = request.into_inner();
let query_vector = inner_req.query_vector;
let top_k = inner_req.top_k;
// Phase 1: Vector Search in Milvus
// This is the initial retrieval step based on semantic similarity.
let initial_results = self.milvus.search(query_vector, top_k as i64).await
.map_err(|e| Status::internal(format!("Milvus search failed: {}", e)))?;
let initial_ids: HashSet<String> = initial_results.keys().cloned().collect();
// Phase 2: Concurrent Graph Expansion in Dgraph
// For each result from Milvus, we look for related entities.
// A common pitfall here is a N+1 query problem. We batch this by spawning
// a Tokio task for each ID.
let graph_expansion_tasks: Vec<_> = initial_ids.iter()
.map(|id| {
let dgraph_clone = self.dgraph.clone();
let id_clone = id.clone();
tokio::spawn(async move {
dgraph_clone.find_related_docs(&id_clone).await
})
})
.collect();
let expansion_results = join_all(graph_expansion_tasks).await;
let mut all_ids_to_fetch = initial_ids;
for result in expansion_results {
match result {
Ok(Ok(related_ids)) => {
for id in related_ids {
all_ids_to_fetch.insert(id);
}
},
Ok(Err(e)) => tracing::error!("Dgraph expansion task failed: {}", e),
Err(e) => tracing::error!("Tokio task for Dgraph failed: {}", e),
}
}
// Phase 3: Batch Document Hydration from Couchbase
// We now have a unique set of IDs. We fetch all their content in one go.
let final_ids: Vec<String> = all_ids_to_fetch.into_iter().collect();
let documents = self.couchbase.get_many(&final_ids).await
.map_err(|e| Status::internal(format!("Couchbase get_many failed: {}", e)))?;
// Phase 4: Rerank and Aggregate
// The scoring logic here is rudimentary. A production system would use a more
// sophisticated reranker model. For now, we prioritize original vector search hits.
let chunks = documents.into_iter().map(|(id, content)| {
let score = initial_results.get(&id).copied().unwrap_or(0.5); // Downgrade score for expanded docs
DocumentChunk {
id,
content,
score,
metadata: HashMap::new(), // Metadata would be fetched as well
}
}).collect();
let response = GetContextResponse { chunks };
Ok(Response::new(response))
}
}
The implementation of each client is crucial. It must be Clone-able so it can be shared across async tasks without expensive re-initialization. This is typically achieved by wrapping the actual client connection pool in an Arc.
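The Milvus client follows this pattern. The sketch below is illustrative rather than production code: the constructor signature and the stubbed-out search body are assumptions, but the search return type (a map of chunk ID to similarity score) matches how the federation service consumes it above.
// src/clients/milvus.rs (illustrative sketch; the actual Milvus gRPC call is elided)
use std::collections::HashMap;
use std::sync::Arc;
use tonic::transport::{Channel, Endpoint};

#[derive(Clone)]
pub struct MilvusClient {
    // tonic channels are cheap to clone, but wrapping shared state in an Arc
    // keeps the pattern uniform across all three clients.
    channel: Arc<Channel>,
    collection_name: String,
}

impl MilvusClient {
    pub async fn new(uri: String, collection_name: String) -> Result<Self, Box<dyn std::error::Error>> {
        let channel = Endpoint::from_shared(uri)?.connect().await?;
        Ok(Self {
            channel: Arc::new(channel),
            collection_name,
        })
    }

    /// Returns a map of chunk ID to similarity score, which is the contract
    /// the federation service relies on. The body here is a stub: a real
    /// implementation would issue a search RPC against `self.collection_name`
    /// over `self.channel` and convert the hits into (id, score) pairs.
    pub async fn search(
        &self,
        _query_vector: Vec<f32>,
        _top_k: i64,
    ) -> Result<HashMap<String, f64>, Box<dyn std::error::Error + Send + Sync>> {
        Ok(HashMap::new())
    }
}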
Here’s a sketch of the Dgraph client. The key is the DQL (formerly GraphQL+-) query that traverses the graph from a starting document node.
// src/clients/dgraph.rs
use dgraph_tonic::{Client, Query};
use std::sync::Arc;
use serde::Deserialize;
#[derive(Clone)]
pub struct DgraphClient {
client: Arc<Client>,
}
#[derive(Deserialize)]
struct RelatedDocsResponse {
related: Vec<Node>,
}
#[derive(Deserialize)]
struct Node {
uid: String,
}
impl DgraphClient {
pub async fn new(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
let client = Client::new(endpoint).await?;
Ok(Self { client: Arc::new(client) })
}
// A real implementation would have more robust error handling and response parsing.
pub async fn find_related_docs(&self, doc_id: &str) -> Result<Vec<String>, String> {
let mut txn = self.client.new_read_only_txn();
let query_str = r#"
query related($id: string) {
var(func: eq(doc_id, $id)) {
~belongs_to @normalize {
component_of @normalize {
related_spec as ~specification_for
}
}
}
related(func: uid(related_spec)) {
uid
}
}
"#;
// The Query trait from dgraph-tonic provides query_with_vars, which binds the
// $id variable server-side instead of interpolating it into the query string.
let mut vars = std::collections::HashMap::new();
vars.insert("$id".to_string(), doc_id.to_string());
let response = txn.query_with_vars(query_str, vars).await.map_err(|e| e.to_string())?;
let data: RelatedDocsResponse = serde_json::from_slice(&response.json)
.map_err(|e| format!("Failed to parse Dgraph response: {}", e))?;
Ok(data.related.into_iter().map(|n| n.uid).collect())
}
}
The Couchbase client uses the official Rust SDK’s get_many feature, which is heavily optimized for batch key-value lookups, a perfect fit for our hydration phase.
// src/clients/couchbase.rs
use couchbase::{Cluster, GetOptions, MultiGetResult};
use futures::stream::StreamExt;
use std::sync::Arc;
#[derive(Clone)]
pub struct CouchbaseClient {
cluster: Arc<Cluster>,
bucket_name: String,
}
impl CouchbaseClient {
pub async fn new(conn_str: &str, user: &str, pass: &str, bucket: &str) -> Result<Self, couchbase::Error> {
let cluster = Cluster::connect(conn_str, user, pass).await?;
let _ = cluster.bucket(bucket).wait_until_ready(std::time::Duration::from_secs(5)).await?;
Ok(Self {
cluster: Arc::new(cluster),
bucket_name: bucket.to_string(),
})
}
// This method leverages the SDK's ability to perform parallel GET operations.
pub async fn get_many(&self, ids: &[String]) -> Result<Vec<(String, String)>, couchbase::Error> {
let bucket = self.cluster.bucket(&self.bucket_name);
let collection = bucket.default_collection();
let mut results = Vec::with_capacity(ids.len());
let mut stream = collection.get_many(ids.to_vec(), GetOptions::default());
while let Some(result) = stream.next().await {
match result {
Ok(res) => {
if let Ok(content) = res.content::<String>() {
results.push((res.id().to_string(), content));
} else {
tracing::warn!("Failed to decode content for key: {}", res.id());
}
},
Err(e) => {
// In a production system, we might want to collect partial results
// instead of failing the whole batch.
tracing::error!("Couchbase get operation failed for a key: {}", e);
}
}
}
Ok(results)
}
}
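With all three clients defined, wiring them into a tonic server is mostly plumbing. The following main.rs is a sketch of one plausible layout: it assumes the generated FederatedRetrieverServer lives alongside the FederatedRetriever trait in federation_proto, and that MilvusClient exposes the constructor sketched earlier.
File: src/main.rs
use tonic::transport::Server;

// Generated gRPC server wrapper for our FederationService implementation.
use federation_proto::federated_retriever_server::FederatedRetrieverServer;

mod clients;
mod config;
mod service;

use crate::clients::{CouchbaseClient, DgraphClient, MilvusClient};
use crate::config::Settings;
use crate::service::FederationService;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Structured logging, filterable via RUST_LOG.
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    let settings = Settings::new()?;

    // Initialize all database clients up front so connection problems surface
    // at startup rather than on the first user-facing request.
    let milvus = MilvusClient::new(
        settings.milvus.uri.clone(),
        settings.milvus.collection_name.clone(),
    )
    .await?;
    let dgraph = DgraphClient::new(&settings.dgraph.endpoint).await?;
    let couchbase = CouchbaseClient::new(
        &settings.couchbase.connection_string,
        &settings.couchbase.username,
        &settings.couchbase.password,
        &settings.couchbase.bucket_name,
    )
    .await?;

    let service = FederationService::new(milvus, dgraph, couchbase);
    let addr: std::net::SocketAddr = settings.grpc_server_address.parse()?;

    tracing::info!("federated-rag-engine listening on {}", addr);
    Server::builder()
        .add_service(FederatedRetrieverServer::new(service))
        .serve(addr)
        .await?;

    Ok(())
}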
On the frontend, the administrative UI required a simple yet powerful build tool. Our esbuild script replaced a multi-file Webpack configuration, drastically improving build times.
File: ui/build.js
const esbuild = require('esbuild');
const path = require('path');
const isProduction = process.env.NODE_ENV === 'production';
esbuild.build({
entryPoints: [path.resolve(__dirname, 'src/index.tsx')],
bundle: true,
minify: isProduction,
sourcemap: !isProduction,
outfile: path.resolve(__dirname, 'dist/bundle.js'),
define: {
'process.env.NODE_ENV': `"${process.env.NODE_ENV}"`,
'process.env.API_ENDPOINT': `"${process.env.API_ENDPOINT || 'http://localhost:8080'}"`
},
loader: {
'.ts': 'tsx'
},
// For live reloading during development
watch: !isProduction ? {
onRebuild(error, result) {
if (error) console.error('watch build failed:', error)
else console.log('watch build succeeded:', result)
},
} : false,
}).catch(() => process.exit(1));
console.log(`[esbuild] starting ${isProduction ? 'production' : 'watch'} build...`);
This script is executed via package.json scripts. The speed increase was not trivial; full rebuilds dropped from over 15 seconds to under 200 milliseconds. This tight feedback loop was invaluable when building complex data visualizations for our knowledge graph.
The final system, benchmarked with ghz against the gRPC endpoint, achieved a P99 latency of 42ms under a load of 500 concurrent requests, successfully meeting our target. The key was the move to Rust and the fully concurrent execution model for the fan-out stages of retrieval. The parallel graph expansion and document hydration steps effectively masked the I/O latency of the individual databases.
However, this architecture is not without its own set of trade-offs. The current error handling is blunt: a Milvus or Couchbase failure fails the whole request, and if a Dgraph expansion task fails, that branch of context is silently dropped. A more resilient implementation might surface partial results explicitly, allowing the downstream LLM to work with what’s available. Furthermore, the data consistency between the three systems is maintained by an out-of-band ETL process. For true real-time synchronization, a Change Data Capture (CDC) pipeline would be necessary, but that introduces significant operational complexity. The current reranking logic is also naive; a future path involves integrating a lightweight cross-encoder model as a final reranking step within the Rust service itself, which would add computational cost but likely yield significant relevance gains.