The challenge was unambiguous: unlock decades of unstructured text data—technical manuals, project reports, operational logs—siloed within a multi-petabyte Hadoop HDFS cluster for use with modern Large Language Models. The business objective was to build a Retrieval-Augmented Generation (RAG) system, allowing internal engineering teams to perform natural language queries against this massive, dormant knowledge base. A direct interface was out of the question due to data volume, security constraints, and the prohibitive cost of feeding terabytes of context to an LLM API.
The architectural constraints were equally rigid. The solution had to be performant enough to handle batch indexing of millions of documents, deployable within a CI/CD environment that strictly forbids Docker daemons, and maintainable by a team with mixed expertise in both Java/Scala (the Hadoop ecosystem) and modern backend development.
Defining the Architectural Crossroads
Two primary architectural paths presented themselves. Each carried significant trade-offs concerning performance, complexity, and long-term operational cost.
Solution A: The Python Monolith
The most direct path involved building the entire ingestion pipeline in Python. This approach leverages the native ecosystem of tools like LlamaIndex and LangChain.
```mermaid
graph TD
    subgraph Python Service Container
        A[HDFS Client Lib] --> B(Data Transformation);
        B --> C{LlamaIndex Core};
        C --> D[Vector Store];
    end
    HDFS[(HDFS Cluster)] --> A;
    style HDFS fill:#f9f,stroke:#333,stroke-width:2px
```
Pros:
- Ecosystem Homogeneity: LlamaIndex is Python-native. Keeping the entire stack in Python simplifies dependency management and reduces context switching for developers working on the core AI logic.
- Rapid Prototyping: A vast collection of Python libraries for HDFS access (`hdfs`, `pyarrow`), data manipulation (`pandas`), and AI orchestration makes initial development exceptionally fast.
Cons:
- Performance Ceiling: In a real-world project, reading and processing millions of small-to-medium-sized files from HDFS is an I/O- and CPU-bound task. Python’s Global Interpreter Lock (GIL) prevents true thread-level parallelism on multi-core machines, and `multiprocessing` only sidesteps it at the cost of inter-process serialization overhead. Async I/O helps on the I/O side but doesn’t solve the compute-bound transformation steps.
- HDFS Client Stability: Python HDFS clients, which often wrap the `webhdfs` REST API or use JNI bridges, can introduce performance overhead and become a point of instability, especially when dealing with complex Hadoop security configurations like Kerberos.
- Deployment Dependency: The standard deployment artifact is a Docker container. This ties our CI/CD process to a running Docker daemon, which was explicitly forbidden due to security policies and resource contention on build agents.
Solution B: The Polyglot Go-Centric Architecture
This alternative proposes a separation of concerns, using the right tool for each specific job. A high-performance Go service is responsible for the heavy lifting of data extraction and pre-processing, communicating with a specialized Python service that handles only the final LLM-specific indexing.
```mermaid
graph TD
    subgraph Go Ingestor Container
        direction LR
        A[Native Go HDFS Client] --> B{Parallel Processing Worker Pool};
        B --> C[gRPC Client];
    end
    subgraph Python Indexer Container
        direction LR
        D[gRPC Server] --> E{LlamaIndex Core};
        E --> F[Vector Store];
    end
    subgraph User Interaction Flow
        direction LR
        G[Browser UI] --> H{Go Query Service};
        H --> F;
    end
    HDFS[(HDFS Cluster)] --> A;
    C -- gRPC --> D;
    style HDFS fill:#f9f,stroke:#333,stroke-width:2px
```
Pros:
- I/O and Concurrency Performance: Go’s goroutines and channel-based concurrency model are purpose-built for high-throughput, concurrent I/O operations. A native Go HDFS client can communicate directly with NameNodes and DataNodes, bypassing REST API overhead and offering superior performance and control.
- Daemonless Container Builds: Go compiles to a single, static binary. This artifact is perfect for `Buildah`, which can construct a minimal `scratch`-based container image without requiring a daemon, aligning perfectly with our infrastructure constraints.
- Robustness and Maintainability: The separation is clean. The Go service is a durable, high-performance data-plumbing utility. The Python service is a focused, potentially more volatile component that can be updated or swapped independently as the LLM landscape evolves.
Cons:
- Increased Complexity: A polyglot system introduces a new failure point: the network boundary between the services (managed via gRPC). It also requires expertise in both Go and Python, and a well-defined contract (the `.proto` file) must be maintained.
- Development Overhead: The initial setup is more involved than a single Python script. Defining protobuf messages, generating stubs, and implementing gRPC clients/servers adds to the initial workload.
The Decision and Rationale
We selected Solution B. The critical factor was long-term scalability and operational robustness. In a system expected to process petabytes, the performance limitations of the Python-only approach were not a risk we were willing to take. The overhead of managing a gRPC interface was deemed a worthwhile trade-off for the raw performance gains and the deployment flexibility afforded by the Go/Buildah combination. A common mistake is to optimize for initial development speed at the expense of production performance, leading to costly rewrites. This architecture confronts the performance challenge head-on.
Babel, which might seem out of place in this architecture, fits in as a front-end concern. A sophisticated user interface for data scientists to construct complex queries, visualize results, and manage indexing jobs would be built using a modern JavaScript framework, and Babel is a key part of that toolchain, transpiling ESNext code for browser compatibility. That UI consumes the API exposed by a separate Go query service, but the core ingestion pipeline architecture remains independent of it.
Core Implementation: The Go Ingestor Service
The heart of the solution is the Go service responsible for connecting to HDFS, orchestrating the processing, and communicating with the Python indexer.
1. Configuration and HDFS Connection Management
A production-grade service starts with robust configuration. We use a simple YAML structure and a library like `Viper` to manage it.

`config/config.yml`:
```yaml
hadoop:
  namenodes:
    - "nn1.hadoop.example.com:8020"
    - "nn2.hadoop.example.com:8020"
  user: "gouser"
  kerberos:
    enabled: true
    keytab_path: "/etc/security/keytabs/gouser.keytab"
    krb5_conf_path: "/etc/krb5.conf"
    realm: "EXAMPLE.COM"
    principal: "gouser/[email protected]"
ingestion:
  source_path: "/user/raw_docs/text_data/"
  worker_pool_size: 32
  file_batch_size: 100
grpc_client:
  indexer_address: "llama-indexer.service.local:50051"
  timeout_seconds: 120
logging:
  level: "info"
  format: "json"
```
The Go code for initializing the HDFS client must handle Kerberos authentication, which is non-negotiable in an enterprise Hadoop environment.
`internal/hadoop/client.go`:
```go
package hadoop

import (
	"fmt"
	"log/slog"
	"net"
	"os"
	"strings"
	"time"

	"github.com/colinmarc/hdfs/v2"
	krbclient "github.com/jcmturner/gokrb5/v8/client"
	krbconfig "github.com/jcmturner/gokrb5/v8/config"
	"github.com/jcmturner/gokrb5/v8/keytab"
)

// Config holds all necessary HDFS configuration.
type Config struct {
	Namenodes []string
	User      string
	Kerberos  KerberosConfig
}

// KerberosConfig defines Kerberos authentication parameters.
type KerberosConfig struct {
	Enabled      bool
	KeytabPath   string
	Krb5ConfPath string
	Realm        string
	Principal    string
}

// NewClient creates and returns a new HDFS client.
// It handles both simple and Kerberos authentication.
func NewClient(cfg Config) (*hdfs.Client, error) {
	options := hdfs.ClientOptions{
		Addresses: cfg.Namenodes,
		User:      cfg.User,
		// Bound the dial to the NameNodes so a hung connection fails fast.
		NamenodeDialFunc: (&net.Dialer{Timeout: 20 * time.Second}).DialContext,
	}

	if cfg.Kerberos.Enabled {
		slog.Info("Kerberos authentication enabled", "principal", cfg.Kerberos.Principal)

		// A common pitfall is assuming keytabs or config files exist.
		// Always check for file existence before proceeding.
		if _, err := os.Stat(cfg.Kerberos.KeytabPath); os.IsNotExist(err) {
			return nil, fmt.Errorf("kerberos keytab file not found at %s", cfg.Kerberos.KeytabPath)
		}
		if _, err := os.Stat(cfg.Kerberos.Krb5ConfPath); os.IsNotExist(err) {
			return nil, fmt.Errorf("krb5.conf file not found at %s", cfg.Kerberos.Krb5ConfPath)
		}

		kt, err := keytab.Load(cfg.Kerberos.KeytabPath)
		if err != nil {
			return nil, fmt.Errorf("failed to load keytab: %w", err)
		}
		krbConf, err := krbconfig.Load(cfg.Kerberos.Krb5ConfPath)
		if err != nil {
			return nil, fmt.Errorf("failed to load krb5.conf: %w", err)
		}

		// gokrb5 wants the principal without the realm suffix, e.g.
		// "gouser/host.example.com" rather than "gouser/[email protected]".
		principal := strings.TrimSuffix(cfg.Kerberos.Principal, "@"+cfg.Kerberos.Realm)
		options.KerberosClient = krbclient.NewWithKeytab(principal, cfg.Kerberos.Realm, kt, krbConf)

		// Must match the NameNode's principal (dfs.namenode.kerberos.principal);
		// "nn/_HOST" is the common default. Note the library's spelling.
		options.KerberosServicePrincipleName = "nn/_HOST"
	}

	slog.Info("Connecting to HDFS NameNode(s)", "namenodes", cfg.Namenodes)
	client, err := hdfs.NewClient(options)
	if err != nil {
		return nil, fmt.Errorf("failed to create HDFS client: %w", err)
	}

	// Perform a simple read operation to verify the connection.
	if _, err := client.ReadDir("/"); err != nil {
		client.Close()
		return nil, fmt.Errorf("failed to verify HDFS connection with ReadDir: %w", err)
	}

	slog.Info("Successfully connected to HDFS")
	return client, nil
}
```
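Wiring this up with Viper is straightforward. Here is a minimal sketch, assuming the `config.yml` above and `mapstructure:"keytab_path"`-style tags on the snake_case fields of `KerberosConfig` (Viper will not match snake_case keys to CamelCase fields without them):

```go
package main

import (
	"log"

	"github.com/spf13/viper"

	"github.com/my-org/ingestor/internal/hadoop"
)

func main() {
	v := viper.New()
	v.SetConfigFile("/etc/ingestor/config.yml")
	if err := v.ReadInConfig(); err != nil {
		log.Fatalf("failed to read config: %v", err)
	}

	// Unmarshal only the "hadoop" subtree into the typed config.
	var hdfsCfg hadoop.Config
	if err := v.UnmarshalKey("hadoop", &hdfsCfg); err != nil {
		log.Fatalf("failed to parse hadoop config: %v", err)
	}

	client, err := hadoop.NewClient(hdfsCfg)
	if err != nil {
		log.Fatalf("failed to connect to HDFS: %v", err)
	}
	defer client.Close()
}
```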
2. The Worker Pool for Parallel Ingestion
To process files concurrently, we implement a worker pool pattern. A main goroutine walks the HDFS directory tree and dispatches file paths to a channel. A pool of worker goroutines consumes from this channel, reads the file content, performs any pre-processing, and sends the result to the gRPC client.
`internal/ingestor/pipeline.go`:
```go
package ingestor

import (
	"context"
	"fmt"
	"io"
	"log/slog"
	"sync"
	"time"

	"github.com/colinmarc/hdfs/v2"

	"github.com/my-org/ingestor/internal/indexer" // Our gRPC client package
)

// Document represents a single piece of content to be indexed.
type Document struct {
	SourcePath string
	Content    []byte
	Metadata   map[string]string
}

// Pipeline orchestrates the ingestion process.
type Pipeline struct {
	hdfsClient    *hdfs.Client
	indexerClient indexer.IndexerClient // gRPC client interface
	cfg           Config
	logger        *slog.Logger
}

// Config for the pipeline.
type Config struct {
	SourcePath     string
	WorkerPoolSize int
}

func NewPipeline(hdfsClient *hdfs.Client, indexerClient indexer.IndexerClient, cfg Config, logger *slog.Logger) *Pipeline {
	return &Pipeline{
		hdfsClient:    hdfsClient,
		indexerClient: indexerClient,
		cfg:           cfg,
		logger:        logger,
	}
}

// Run starts the entire ingestion workflow.
func (p *Pipeline) Run(ctx context.Context) error {
	pathsChan := make(chan string, p.cfg.WorkerPoolSize*2)
	docsChan := make(chan Document, p.cfg.WorkerPoolSize)

	var wg sync.WaitGroup

	// Start the file walker.
	go func() {
		defer close(pathsChan)
		if err := p.walkHDFS(p.cfg.SourcePath, pathsChan); err != nil {
			p.logger.Error("HDFS walk failed", "error", err)
			// In a real system, you'd need a way to signal this error back.
		}
	}()

	// Start the file reader workers.
	for i := 0; i < p.cfg.WorkerPoolSize; i++ {
		wg.Add(1)
		go p.fileReaderWorker(ctx, i, &wg, pathsChan, docsChan)
	}

	// Wait for all readers to finish, then close docsChan.
	go func() {
		wg.Wait()
		close(docsChan)
	}()

	// Process and send documents to the indexer.
	// This part could also be parallelized if the gRPC server can handle it.
	return p.processDocuments(ctx, docsChan)
}

// fileReaderWorker reads file paths from a channel and fetches content from HDFS.
func (p *Pipeline) fileReaderWorker(ctx context.Context, id int, wg *sync.WaitGroup, paths <-chan string, docs chan<- Document) {
	defer wg.Done()
	logger := p.logger.With("worker_id", id)
	logger.Info("Starting file reader worker")

	for path := range paths {
		select {
		case <-ctx.Done():
			logger.Warn("Context cancelled, shutting down worker")
			return
		default:
		}

		logger.Debug("Processing file", "path", path)
		content, err := p.readFile(path)
		if err != nil {
			logger.Error("Failed to read file from HDFS", "path", path, "error", err)
			continue // Skip problematic files.
		}

		docs <- Document{
			SourcePath: path,
			Content:    content,
			Metadata: map[string]string{
				"hdfs_path":   path,
				"ingested_at": time.Now().UTC().Format(time.RFC3339),
			},
		}
	}
	logger.Info("File reader worker shutting down")
}

func (p *Pipeline) readFile(path string) ([]byte, error) {
	file, err := p.hdfsClient.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open HDFS file %s: %w", path, err)
	}
	defer file.Close()

	// In production, you'd add constraints on file size.
	// Reading a 10GB file into memory is a recipe for disaster.
	// This example assumes reasonably sized text documents.
	return io.ReadAll(file)
}

// walkHDFS recursively finds files and sends their paths to a channel.
func (p *Pipeline) walkHDFS(root string, paths chan<- string) error {
	// Omitted here for brevity; a sketch using hdfs.Client.Walk follows below.
	// It's crucial to handle errors during the walk gracefully.
	return nil
}

// processDocuments consumes documents and sends them to the indexer via gRPC.
func (p *Pipeline) processDocuments(ctx context.Context, docs <-chan Document) error {
	// This function batches documents and calls the gRPC client.
	// Batching is critical for performance to reduce network round trips.
	// A sketch of the streaming call follows below.
	return nil
}
```
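The two elided methods are where project-specific policy lives. As a rough sketch of what they might look like, assuming `indexer.IndexerClient` is (or wraps) the protoc-generated client interface and `indexerpb` aliases the generated `gen/go/indexer` package:

```go
// Continuing internal/ingestor/pipeline.go. Assumes the generated stubs are
// imported as: indexerpb "github.com/my-org/ingestor/gen/go/indexer".

// walkHDFS traverses the source tree using the client's Walk helper, which
// mirrors filepath.Walk semantics, and forwards regular files to the channel.
func (p *Pipeline) walkHDFS(root string, paths chan<- string) error {
	return p.hdfsClient.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// Log and skip unreadable entries rather than aborting the walk.
			p.logger.Error("Error visiting path", "path", path, "error", err)
			return nil
		}
		if !info.IsDir() {
			paths <- path
		}
		return nil
	})
}

// processDocuments streams documents to the Python indexer over a single
// client-streaming RPC and receives one summary response at the end.
func (p *Pipeline) processDocuments(ctx context.Context, docs <-chan Document) error {
	stream, err := p.indexerClient.IndexDocuments(ctx)
	if err != nil {
		return fmt.Errorf("failed to open IndexDocuments stream: %w", err)
	}
	for doc := range docs {
		if err := stream.Send(&indexerpb.Document{
			SourcePath: doc.SourcePath,
			Content:    doc.Content,
			Metadata:   doc.Metadata,
		}); err != nil {
			return fmt.Errorf("failed to send document %s: %w", doc.SourcePath, err)
		}
	}
	resp, err := stream.CloseAndRecv()
	if err != nil {
		return fmt.Errorf("indexer stream failed: %w", err)
	}
	p.logger.Info("Indexing complete", "indexed_count", resp.IndexedCount)
	return nil
}
```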
3. Defining the gRPC Contract
The interface between the Go and Python services is defined in a `.proto` file. This contract is the source of truth for communication.

`protos/indexer.proto`:
```protobuf
syntax = "proto3";

package indexer;

option go_package = "github.com/my-org/ingestor/gen/go/indexer";

service Indexer {
  // IndexDocuments processes a stream of documents and adds them to the vector store.
  rpc IndexDocuments(stream Document) returns (IndexResponse);
}

message Document {
  string source_path = 1;
  bytes content = 2;
  map<string, string> metadata = 3;
}

message IndexResponse {
  uint32 indexed_count = 1;
  bool success = 2;
  string error_message = 3;
}
```
From this file, we generate Go client code and Python server code using `protoc`. This strongly typed interface prevents a wide class of integration errors that are common with REST/JSON APIs.
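The exact invocations depend on your repository layout; assuming the standard Go plugins (`protoc-gen-go`, `protoc-gen-go-grpc`) and Python's `grpcio-tools`, they look roughly like this (adjust output paths to your module layout):

```bash
# Go stubs: protoc-gen-go and protoc-gen-go-grpc must be on PATH.
protoc -I protos \
  --go_out=gen/go --go_opt=paths=source_relative \
  --go-grpc_out=gen/go --go-grpc_opt=paths=source_relative \
  protos/indexer.proto

# Python stubs: grpcio-tools bundles its own protoc.
python -m grpc_tools.protoc -I protos \
  --python_out=. --grpc_python_out=. \
  protos/indexer.proto
```

The Python command emits `indexer_pb2.py` and `indexer_pb2_grpc.py`, which the indexer server in the next section imports directly.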
The Python LlamaIndex Service
The Python service is deliberately minimal. Its only job is to expose a gRPC endpoint, receive documents, and pass them to LlamaIndex.
`indexer_server.py`:
```python
import logging
from concurrent import futures

import grpc

# Import generated protobuf stubs.
import indexer_pb2
import indexer_pb2_grpc

# Import LlamaIndex components.
import chromadb
from llama_index.core import Document, StorageContext, VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.chroma import ChromaVectorStore

# Basic logging setup.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class IndexerService(indexer_pb2_grpc.IndexerServicer):
    """gRPC service to handle document indexing with LlamaIndex."""

    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.node_parser = SentenceSplitter(chunk_size=1024, chunk_overlap=100)

    def IndexDocuments(self, request_iterator, context):
        """Receives a stream of documents and indexes them.

        A real-world implementation should handle transactions or batching
        more carefully.
        """
        logging.info("Received IndexDocuments request stream.")
        count = 0
        documents_to_process = []
        try:
            for doc_proto in request_iterator:
                # The pitfall here is processing one by one. It's better to
                # batch them for more efficient processing by LlamaIndex.
                doc = Document(
                    text=doc_proto.content.decode('utf-8', errors='ignore'),
                    metadata=dict(doc_proto.metadata),
                )
                documents_to_process.append(doc)
                count += 1
                logging.debug(f"Received document: {doc_proto.source_path}")

            if documents_to_process:
                logging.info(f"Processing batch of {len(documents_to_process)} documents.")
                storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
                # This is where the core LlamaIndex work happens: chunking,
                # embedding generation, and storage. Note the default embedding
                # model is remote (OpenAI) unless overridden via Settings.embed_model.
                VectorStoreIndex.from_documents(
                    documents_to_process,
                    storage_context=storage_context,
                    transformations=[self.node_parser],
                    show_progress=True,
                )
                logging.info("Batch successfully indexed.")
            return indexer_pb2.IndexResponse(indexed_count=count, success=True)
        except Exception as e:
            logging.error(f"An error occurred during indexing: {e}", exc_info=True)
            context.set_details(f"Indexing failed: {e}")
            context.set_code(grpc.StatusCode.INTERNAL)
            return indexer_pb2.IndexResponse(indexed_count=0, success=False, error_message=str(e))


def serve():
    """Starts the gRPC server."""
    logging.info("Initializing vector store...")
    # In a production setup, ChromaDB would be a persistent, standalone service.
    chroma_client = chromadb.PersistentClient(path="/var/data/chroma_db")
    chroma_collection = chroma_client.get_or_create_collection("hdfs_docs")
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    indexer_pb2_grpc.add_IndexerServicer_to_server(
        IndexerService(vector_store=vector_store), server
    )
    server.add_insecure_port('[::]:50051')
    logging.info("Starting gRPC server on port 50051.")
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    serve()
```
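For reference, the service's direct dependencies can be installed like this (package names assume the post-0.10 split of LlamaIndex into `llama-index-core` plus per-integration packages; verify against the version you pin):

```bash
pip install grpcio grpcio-tools chromadb \
    llama-index-core llama-index-vector-stores-chroma
```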
Building the Go Service with Buildah
The final piece of the puzzle is containerizing the Go service without a Docker daemon. `Buildah` allows us to do this with simple shell scripts, ideal for CI environments.

`build.sh`:
```bash
#!/bin/bash
set -eo pipefail

IMAGE_NAME="registry.example.com/ingestion/hdfs-go-ingestor"
IMAGE_TAG="1.0.0"

# 1. Use a builder container to compile the Go application.
#    This ensures a reproducible build environment.
echo "--- Building Go binary ---"
builder=$(buildah from golang:1.21-alpine)
buildah copy "$builder" . /src
buildah config --workingdir /src "$builder"

# CGO_ENABLED=0 is crucial for creating a truly static binary
# that can run in a 'scratch' image.
buildah run "$builder" -- sh -c "CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o /app/ingestor ./cmd/ingestor/"

# 2. Create the final, minimal image from scratch.
echo "--- Building final image ---"
final_image=$(buildah from scratch)

# Copy the compiled binary from the builder container.
# This is the core of a multi-stage build using Buildah.
buildah copy --from "$builder" "$final_image" /app/ingestor /usr/local/bin/ingestor

# Copy necessary files like the Kerberos config and keytabs.
# In a real pipeline, these would be mounted as secrets, not baked
# into the image. For demonstration, we copy them.
buildah copy "$final_image" ./config/config.yml /etc/ingestor/config.yml
buildah copy "$final_image" /path/to/krb5.conf /etc/krb5.conf

buildah config --cmd "/usr/local/bin/ingestor -config /etc/ingestor/config.yml" "$final_image"
buildah config --author "SysArchitect" "$final_image"
buildah config --label name="hdfs-go-ingestor" "$final_image"

# 3. Commit and push the final image.
echo "--- Pushing image to registry ---"
buildah commit "$final_image" "${IMAGE_NAME}:${IMAGE_TAG}"
buildah push "${IMAGE_NAME}:${IMAGE_TAG}"

# 4. Clean up the working containers.
echo "--- Cleaning up ---"
buildah rm "$builder"
buildah rm "$final_image"
```
This script is self-contained, auditable, and runs without any privileged daemon, addressing the core infrastructure requirement.
Architectural Limitations and Future Iterations
This architecture, while robust for batch processing, has clear boundaries. It is not designed for real-time indexing; a change to a document in HDFS will not be reflected until the next scheduled batch run. Integrating a CDC (Change Data Capture) mechanism or a message queue like Kafka would be the next logical step to address this limitation.
The gRPC interface, while efficient, funnels all traffic through a single Python indexer instance, creating a single point of failure. A production deployment would require load balancing across multiple indexer instances and retry logic with exponential backoff in the Go client; a sketch of that retry logic follows below. Furthermore, distributed tracing across the Go and Python services is essential for debugging performance issues in this polyglot environment.
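As a sketch of what hand-rolled backoff might look like (in practice, gRPC's built-in retry policy via service config or an interceptor library could do this instead):

```go
package indexer

import (
	"context"
	"math/rand"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// sendWithRetry wraps a gRPC call with capped exponential backoff and jitter.
// Only transient status codes are retried; for a streaming RPC, "call" must
// restart the whole stream, since a broken stream cannot be resumed.
func sendWithRetry(ctx context.Context, attempts int, call func(context.Context) error) error {
	backoff := 500 * time.Millisecond
	const maxBackoff = 30 * time.Second
	var err error
	for i := 0; i < attempts; i++ {
		if err = call(ctx); err == nil {
			return nil
		}
		s, ok := status.FromError(err)
		if !ok || (s.Code() != codes.Unavailable && s.Code() != codes.DeadlineExceeded) {
			return err // Not a transient failure; don't retry.
		}
		jitter := time.Duration(rand.Int63n(int64(backoff / 2)))
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff + jitter):
		}
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	return err
}
```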
Finally, the error handling is currently service-level. A more refined implementation would include a dead-letter queue for documents that fail to process, allowing for manual inspection and reprocessing without halting the entire pipeline. The current approach of logging and skipping is pragmatic for an initial build but insufficient for a mission-critical system.