The project mandate was deceptively simple: unlock decades of institutional knowledge trapped in our on-premises Hadoop cluster. We’re talking petabytes of design documents, incident post-mortems, and architectural decision records, all sitting in HDFS. The business wanted a modern, conversational AI interface to query this data. A full migration to a cloud-native object store was off the table; it would have been a multi-year, multi-million-dollar effort. The constraint was clear: build on top of what exists. This immediately ruled out most off-the-shelf RAG solutions that assume data lives in S3, GCS, or a tidy database.
Our initial concept was to create a bridge between the old and the new. We would leave the data at rest in HDFS but build a modern indexing and querying layer on top. This layer would need to be smart enough to handle HDFS’s peculiarities—its high-latency, batch-oriented nature—while providing a low-latency query experience expected from a modern application. The core challenge was clear: creating a production-grade LlamaIndex connector that could efficiently and safely read from HDFS without crippling the NameNode, and then serving this through a responsive front-end.
The technology selection process was driven by these constraints. Hadoop/HDFS was the immovable object. For the RAG framework, LlamaIndex was a clear choice due to its modular, extensible architecture, specifically the BaseReader interface, which seemed designed for custom data source integration. For the vector store, we needed a dedicated system; querying vectors stored as flat files on HDFS would be impossibly slow. We started with an in-memory ChromaDB for the proof-of-concept. On the front-end, we chose Vite with React and TypeScript for a fast development loop, coupled with Emotion for maintainable, component-scoped styling.
The architecture began to solidify:
- A Python-based indexing service responsible for periodically scanning HDFS, using a custom LlamaIndex HDFSReader to load documents.
- A vector index populated by the service and stored in ChromaDB.
- A FastAPI server exposing a query endpoint that leverages the LlamaIndex query engine.
- A Vite/React single-page application providing the user interface.
graph TD
    subgraph Frontend
        A[Vite + React UI]
    end
    subgraph Backend API
        B[FastAPI Endpoint: /query]
    end
    subgraph RAG Pipeline
        C[LlamaIndex Query Engine]
        D[Vector Store: ChromaDB]
        E[Custom HDFSReader]
    end
    subgraph Legacy Data Store
        F[Hadoop HDFS Cluster]
    end
    A -- HTTP Request --> B
    B -- query() --> C
    C -- Similarity Search --> D
    C -- Retrieve Context --> E
    E -- Read Document Chunks --> F
The most critical and unknown piece was the HDFSReader. A naive implementation could easily cause a denial-of-service on our production cluster.
The HDFSReader: A Bridge to the Past
The first step was establishing a reliable connection to HDFS from Python. The pyarrow library is the standard for this, providing an hdfs.connect() method. In a real-world project, this connection is never simple. Our cluster is Kerberized, requiring proper ticket management.
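In practice that means a valid ticket must already exist in a credential cache that libhdfs can read before hdfs.connect() is called. We handle this outside the reader itself; the sketch below shows the general shape, with the keytab path, principal, and cache location as placeholders rather than our real values.
# scripts/renew_kerberos_ticket.py
# Sketch of ticket management before connecting. The keytab path, principal,
# and cache location are placeholders, not our real deployment values.
import os
import subprocess

KEYTAB_PATH = "/etc/security/keytabs/rag-indexer.keytab"  # hypothetical keytab
PRINCIPAL = "rag-indexer@EXAMPLE.COM"                      # hypothetical principal
TICKET_CACHE = "/tmp/krb5cc_rag_indexer"                   # passed to HDFSReader as kerb_ticket

def renew_ticket() -> str:
    """Obtain or refresh a Kerberos TGT and return the ticket cache path."""
    env = {**os.environ, "KRB5CCNAME": TICKET_CACHE}
    subprocess.run(["kinit", "-kt", KEYTAB_PATH, PRINCIPAL], check=True, env=env)
    return TICKET_CACHE

if __name__ == "__main__":
    cache = renew_ticket()
    print(f"Kerberos ticket cache ready at {cache}")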
Here’s the initial, bare-bones structure of our reader. A common mistake is to try to list the entire directory tree recursively (hdfs.walk()) inside the load_data method. On a cluster with millions of files, this will time out or exhaust NameNode resources.
# hdfs_reader/reader.py
import os
import time
import logging
from typing import List, Optional, Dict
from pyarrow import hdfs
from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document
# Setup structured logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class HDFSReader(BaseReader):
"""
A reader for Hadoop Distributed File System (HDFS).
Connects to an HDFS cluster to read files and convert them into
LlamaIndex Document objects.
A key consideration in a production environment is not to overwhelm
the HDFS NameNode. This implementation reads from a manifest file
that lists the absolute paths of files to be ingested.
"""
def __init__(
self,
host: str,
port: int,
user: Optional[str] = None,
kerb_ticket: Optional[str] = None,
extra_conf: Optional[Dict] = None,
):
"""
Initializes the HDFSReader.
Args:
host (str): The HDFS NameNode host.
port (int): The HDFS NameNode port.
user (Optional[str]): The user to connect as.
kerb_ticket (Optional[str]): Path to the Kerberos ticket cache.
extra_conf (Optional[Dict]): Extra configuration for pyarrow.
"""
self.host = host
self.port = port
self.user = user
self.kerb_ticket = kerb_ticket
self.extra_conf = extra_conf
self._fs = None
# A simple retry mechanism for connection
try:
self._connect_with_retry()
except Exception as e:
logging.error(f"Failed to connect to HDFS after multiple retries: {e}")
raise
def _connect_with_retry(self, retries=3, delay=5):
for i in range(retries):
try:
self._fs = hdfs.connect(
host=self.host,
port=self.port,
user=self.user,
kerb_ticket=self.kerb_ticket,
extra_conf=self.extra_conf
)
logging.info(f"Successfully connected to HDFS at {self.host}:{self.port}")
return
except Exception as e:
logging.warning(f"HDFS connection attempt {i+1} failed: {e}. Retrying in {delay}s...")
time.sleep(delay)
raise ConnectionError("Could not establish connection to HDFS.")
def load_data(self, file_manifest_path: str) -> List[Document]:
"""
Loads data from HDFS based on a manifest file.
The manifest file should contain one HDFS file path per line. This
avoids expensive recursive directory listing operations on the NameNode.
Args:
file_manifest_path (str): The HDFS path to the manifest file.
Returns:
List[Document]: A list of Document objects.
"""
documents = []
if not self._fs:
logging.error("HDFS filesystem not available.")
return []
try:
with self._fs.open(file_manifest_path, 'rb') as f:
manifest_content = f.read().decode('utf-8')
file_paths = [path for path in manifest_content.splitlines() if path.strip()]
logging.info(f"Found {len(file_paths)} files to process from manifest: {file_manifest_path}")
except Exception as e:
logging.error(f"Failed to read manifest file from HDFS: {e}")
return []
for path in file_paths:
try:
# Check file status once to get metadata
file_info = self._fs.info(path)
file_size = file_info.get('size', 0)
# A simple safeguard against ingesting massive files by mistake
if file_size > 100 * 1024 * 1024: # 100MB limit
logging.warning(f"Skipping large file {path} ({file_size} bytes)")
continue
with self._fs.open(path, 'rb') as f:
content = f.read().decode('utf-8', errors='ignore')
# Metadata is crucial for RAG context
metadata = {
"hdfs_path": path,
"file_name": os.path.basename(path),
"last_modified": file_info.get('last_modified'),
"source": "HDFS",
}
documents.append(Document(text=content, extra_info=metadata))
except Exception as e:
logging.error(f"Failed to read or process file {path}: {e}")
# Continue processing other files
continue
return documents
Our solution to the NameNode pressure problem was to shift the responsibility of discovery. Instead of the reader walking the filesystem, it consumes a manifest file. A separate, scheduled, and resource-managed process (like a simple MapReduce or Spark job) is responsible for generating this manifest. This decouples the high-intensity discovery phase from the data-reading phase.
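As a concrete illustration, the generator can be as small as the sketch below; ours is a scheduled Spark job, but the same walk-and-write logic applies. The source subtree, suffix filter, and throttle are illustrative placeholders, not our production values.
# scripts/generate_manifest.py
# Sketch of an off-peak manifest generator. Our production version is a
# scheduled Spark job; the subtree, suffix filter, and throttle are illustrative.
import time
from pyarrow import hdfs

SOURCE_ROOT = "/data/archives"                # hypothetical subtree to index
MANIFEST_PATH = "/data/manifests/latest_docs.txt"
ALLOWED_SUFFIXES = (".md", ".txt", ".adoc")   # hypothetical document types

def generate_manifest(host: str, port: int, user: str) -> int:
    fs = hdfs.connect(host=host, port=port, user=user)
    paths = []
    # walk() is the expensive NameNode operation; it lives in a scheduled,
    # resource-managed job instead of the reader for exactly that reason.
    for root, _dirs, files in fs.walk(SOURCE_ROOT):
        for name in files:
            if name.endswith(ALLOWED_SUFFIXES):
                paths.append(f"{root}/{name}")
        time.sleep(0.05)  # crude throttle between directory listings
    with fs.open(MANIFEST_PATH, "wb") as out:
        out.write("\n".join(paths).encode("utf-8"))
    return len(paths)

if __name__ == "__main__":
    count = generate_manifest("namenode", 8020, "hadoop")
    print(f"Wrote {count} paths to {MANIFEST_PATH}")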
The Indexing and Query Service
With the reader in place, the backend service was straightforward. We used FastAPI for its performance and simplicity. The service has two main parts: an indexing script and the query endpoint.
First, the indexing script that ties everything together. The pitfall here is memory management. Loading hundreds of thousands of documents into memory before indexing is not viable. LlamaIndex embeds and inserts in batches, which helps, but the document list itself still lives in memory, so large manifests need to be processed in slices (a sketch of that follows the script below).
# services/indexing_service.py
import os
import chromadb
import logging
from llama_index.core import VectorStoreIndex, ServiceContext, StorageContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.openai import OpenAI # For this example
from hdfs_reader.reader import HDFSReader
# --- Configuration ---
HDFS_HOST = os.getenv("HDFS_HOST", "namenode")
HDFS_PORT = int(os.getenv("HDFS_PORT", 8020))  # NameNode RPC port (9870 is the HTTP web UI)
HDFS_USER = os.getenv("HDFS_USER", "hadoop")
MANIFEST_PATH = "/data/manifests/latest_docs.txt"
CHROMA_PATH = "./chroma_db"
COLLECTION_NAME = "hdfs_docs"
EMBED_MODEL_NAME = "BAAI/bge-small-en-v1.5"
LLM_MODEL = "gpt-4-turbo" # Replace with your model
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def setup_services():
"""Initializes and returns the service context and vector store."""
logging.info(f"Initializing embedding model: {EMBED_MODEL_NAME}")
embed_model = HuggingFaceEmbedding(model_name=EMBED_MODEL_NAME)
# In a real setup, OpenAI key would be managed via a secrets manager
llm = OpenAI(model=LLM_MODEL, api_key=os.getenv("OPENAI_API_KEY"))
service_context = ServiceContext.from_defaults(
llm=llm,
embed_model=embed_model,
chunk_size=512,
chunk_overlap=50
)
db = chromadb.PersistentClient(path=CHROMA_PATH)
chroma_collection = db.get_or_create_collection(COLLECTION_NAME)
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
return service_context, vector_store
def run_indexing():
"""Runs the full indexing pipeline."""
logging.info("Starting indexing process...")
try:
reader = HDFSReader(host=HDFS_HOST, port=HDFS_PORT, user=HDFS_USER)
documents = reader.load_data(file_manifest_path=MANIFEST_PATH)
if not documents:
logging.warning("No documents found to index. Exiting.")
return
        service_context, vector_store = setup_services()
        # Wrap the Chroma-backed vector store in a StorageContext so the new
        # embeddings land in ChromaDB rather than the default in-memory store.
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        logging.info(f"Creating or updating VectorStoreIndex with {len(documents)} documents.")
        index = VectorStoreIndex.from_documents(
            documents,
            storage_context=storage_context,
            service_context=service_context,
            show_progress=True
        )
logging.info("Indexing process completed successfully.")
except Exception as e:
logging.critical(f"An unrecoverable error occurred during indexing: {e}", exc_info=True)
if __name__ == "__main__":
run_indexing()
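The slicing mentioned above looks roughly like this. It does not make the reader itself streaming (load_data still returns the full list), but it bounds how many parsed nodes and embeddings are in flight at once; the batch size is an arbitrary illustrative value.
# services/batched_indexing.py
# Sketch of slice-by-slice ingestion to keep embedding memory bounded.
# Reuses the indexing service above; BATCH_SIZE is an illustrative value.
import logging
from llama_index.core import VectorStoreIndex, StorageContext
from hdfs_reader.reader import HDFSReader
from services.indexing_service import (
    setup_services, HDFS_HOST, HDFS_PORT, HDFS_USER, MANIFEST_PATH,
)

BATCH_SIZE = 500  # documents per slice; tune to available memory

def run_batched_indexing():
    reader = HDFSReader(host=HDFS_HOST, port=HDFS_PORT, user=HDFS_USER)
    documents = reader.load_data(file_manifest_path=MANIFEST_PATH)
    service_context, vector_store = setup_services()
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    # Build the index shell on top of the existing Chroma collection once,
    # then insert documents incrementally instead of all at once.
    index = VectorStoreIndex.from_vector_store(
        vector_store, storage_context=storage_context, service_context=service_context
    )
    for start in range(0, len(documents), BATCH_SIZE):
        for doc in documents[start:start + BATCH_SIZE]:
            index.insert(doc)
        done = min(start + BATCH_SIZE, len(documents))
        logging.info("Indexed %d / %d documents", done, len(documents))

if __name__ == "__main__":
    run_batched_indexing()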
The query API is then built on top of this indexed data.
# services/api.py
import os
import chromadb
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from llama_index.core import VectorStoreIndex, ServiceContext, StorageContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.openai import OpenAI
# --- Configuration ---
# (Same as indexing_service.py)
CHROMA_PATH = "./chroma_db"
COLLECTION_NAME = "hdfs_docs"
EMBED_MODEL_NAME = "BAAI/bge-small-en-v1.5"
LLM_MODEL = "gpt-4-turbo"
# --- API Setup ---
app = FastAPI()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Global variable to hold the query engine
# In a production app, this would be managed more robustly.
query_engine = None
class QueryRequest(BaseModel):
query: str
top_k: int = 3
@app.on_event("startup")
def load_model():
"""Load the index and prepare the query engine on startup."""
global query_engine
try:
logging.info("Loading model and index on startup...")
db = chromadb.PersistentClient(path=CHROMA_PATH)
chroma_collection = db.get_collection(COLLECTION_NAME)
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
embed_model = HuggingFaceEmbedding(model_name=EMBED_MODEL_NAME)
llm = OpenAI(model=LLM_MODEL, api_key=os.getenv("OPENAI_API_KEY"))
service_context = ServiceContext.from_defaults(llm=llm, embed_model=embed_model)
index = VectorStoreIndex.from_vector_store(
vector_store,
storage_context=storage_context,
service_context=service_context,
)
query_engine = index.as_query_engine(similarity_top_k=5, response_mode="compact")
logging.info("Query engine ready.")
except Exception as e:
logging.critical(f"Failed to initialize query engine: {e}", exc_info=True)
# The app will fail to start if the engine can't load, which is desired behavior.
raise
@app.post("/query")
async def handle_query(request: QueryRequest):
if query_engine is None:
raise HTTPException(status_code=503, detail="Query engine is not available.")
try:
logging.info(f"Received query: {request.query}")
response = query_engine.query(request.query)
# In a real app, you'd want to return source nodes for citation.
source_nodes = [
{"path": node.metadata.get("hdfs_path"), "score": node.score}
for node in response.source_nodes
]
return {
"response": str(response),
"sources": source_nodes
}
except Exception as e:
logging.error(f"Error processing query '{request.query}': {e}", exc_info=True)
raise HTTPException(status_code=500, detail="An error occurred while processing the query.")
A problem surfaced during testing. Query latency was acceptable, but the context returned by the RAG pipeline wasn’t always sufficient. Because LlamaIndex fetches the text chunks associated with the top-k vectors, it retrieves from the chunk text stored alongside the vectors in ChromaDB and never touches HDFS at query time. This is good for performance. However, our initial chunk size of 512 tokens was often too small to capture enough context. Increasing it improved answers but also increased indexing time and vector store size. This is a classic trade-off in RAG systems, exacerbated by the diversity of our document types. We settled on a dynamic chunking strategy for a future iteration, where different document types get different chunking parameters.
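A sketch of that direction, using LlamaIndex’s SentenceSplitter with per-extension parameters; the extension-to-parameter mapping below is a set of guesses we have not yet validated, not tuned values.
# services/chunking.py
# Sketch of per-document-type chunking for a future iteration. The mapping of
# extensions to chunk parameters is illustrative and untuned.
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document

# Hypothetical mapping: file extension -> (chunk_size, chunk_overlap)
CHUNKING_PROFILES = {
    ".md": (1024, 100),   # design docs: larger chunks keep sections intact
    ".txt": (512, 50),    # incident post-mortems: default-sized chunks
    ".adr": (2048, 0),    # decision records: keep them (nearly) whole
}
DEFAULT_PROFILE = (512, 50)

def split_document(doc: Document):
    """Pick chunking parameters based on the source file's extension."""
    file_name = doc.metadata.get("file_name", "")
    ext = "." + file_name.rsplit(".", 1)[-1].lower() if "." in file_name else ""
    chunk_size, chunk_overlap = CHUNKING_PROFILES.get(ext, DEFAULT_PROFILE)
    splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    return splitter.get_nodes_from_documents([doc])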
The Front-end: A Responsive Interface
The front-end’s job is to be a clean, fast, and usable window into this complex backend. Vite, React, and Emotion are a powerful combination for this.
First, setting up the API communication layer. It’s crucial to handle loading states and errors gracefully to provide a good user experience.
// src/services/api.ts
export interface SourceNode {
path: string;
score: number;
}
export interface QueryResponse {
response: string;
sources: SourceNode[];
}
export class ApiError extends Error {
constructor(message: string) {
super(message);
this.name = 'ApiError';
}
}
export const postQuery = async (query: string): Promise<QueryResponse> => {
const response = await fetch('/api/query', { // Assuming Vite proxy is set up
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ query }),
});
if (!response.ok) {
const errorData = await response.json().catch(() => ({ detail: 'An unknown error occurred' }));
throw new ApiError(errorData.detail || 'Failed to fetch response from the server.');
}
return response.json();
};
The main UI component uses Emotion for styling. This keeps the styles colocated with the component logic, making it highly maintainable.
// src/components/ChatInterface.tsx
import React, { useState, FormEvent, useRef, useEffect } from 'react';
import styled from '@emotion/styled';
import { postQuery, QueryResponse, SourceNode } from '../services/api';
// --- Styled Components with Emotion ---
const ChatContainer = styled.div`
display: flex;
flex-direction: column;
height: 100vh;
width: 100%;
max-width: 800px;
margin: 0 auto;
background-color: #f0f2f5;
`;
const MessageList = styled.div`
flex-grow: 1;
overflow-y: auto;
padding: 20px;
display: flex;
flex-direction: column;
gap: 15px;
`;
interface MessageProps {
isUser: boolean;
}
const MessageBubble = styled.div<MessageProps>`
padding: 12px 18px;
border-radius: 20px;
max-width: 75%;
align-self: ${props => (props.isUser ? 'flex-end' : 'flex-start')};
background-color: ${props => (props.isUser ? '#0084ff' : '#e4e6eb')};
color: ${props => (props.isUser ? 'white' : 'black')};
word-wrap: break-word;
`;
const InputArea = styled.form`
display: flex;
padding: 10px;
border-top: 1px solid #ccc;
`;
const TextInput = styled.input`
flex-grow: 1;
padding: 10px 15px;
border: 1px solid #ddd;
border-radius: 20px;
margin-right: 10px;
font-size: 16px;
&:focus {
outline: none;
border-color: #0084ff;
}
`;
const SubmitButton = styled.button`
padding: 10px 20px;
border-radius: 20px;
border: none;
background-color: #0084ff;
color: white;
cursor: pointer;
&:disabled {
background-color: #a0c3ff;
cursor: not-allowed;
}
`;
// --- Component Logic ---
interface Message {
text: string;
isUser: boolean;
sources?: SourceNode[];
}
export const ChatInterface: React.FC = () => {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState('');
const [isLoading, setIsLoading] = useState(false);
const messageListRef = useRef<HTMLDivElement>(null);
useEffect(() => {
// Auto-scroll to the latest message
messageListRef.current?.scrollTo(0, messageListRef.current.scrollHeight);
}, [messages]);
const handleSubmit = async (e: FormEvent) => {
e.preventDefault();
if (!input.trim() || isLoading) return;
const userMessage: Message = { text: input, isUser: true };
setMessages(prev => [...prev, userMessage]);
setInput('');
setIsLoading(true);
try {
const data: QueryResponse = await postQuery(userMessage.text);
const aiMessage: Message = { text: data.response, isUser: false, sources: data.sources };
setMessages(prev => [...prev, aiMessage]);
} catch (error) {
const errorMessage: Message = {
text: error instanceof Error ? error.message : 'An unexpected error occurred.',
isUser: false,
};
setMessages(prev => [...prev, errorMessage]);
} finally {
setIsLoading(false);
}
};
return (
<ChatContainer>
<MessageList ref={messageListRef}>
{messages.map((msg, index) => (
<MessageBubble key={index} isUser={msg.isUser}>
{msg.text}
</MessageBubble>
))}
{isLoading && <MessageBubble isUser={false}>Thinking...</MessageBubble>}
</MessageList>
<InputArea onSubmit={handleSubmit}>
<TextInput
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Ask about the project archives..."
disabled={isLoading}
/>
<SubmitButton type="submit" disabled={isLoading}>
Send
</SubmitButton>
</InputArea>
</ChatContainer>
);
};
The final result was a functional, performant system that successfully bridged our legacy data store with a modern AI interface. It met the primary business goal without requiring a painful data migration.
However, this solution has clear boundaries and lingering issues. The indexing process is still batch-oriented; for data that changes frequently, this introduces unacceptable staleness. A true near-real-time system would require a change-data-capture mechanism on HDFS, such as Kafka Connect sinks or parsing the HDFS audit logs (sketched below), which adds significant architectural complexity. The current security model is also simplistic; a production system would need to propagate the end-user’s credentials all the way down to the HDFS read to enforce file-level permissions, a non-trivial task. Finally, the scalability of the single FastAPI instance and the embedded ChromaDB store is limited. A production deployment would mean moving to a distributed vector database such as Milvus or Weaviate and running the API server on a scalable platform like Kubernetes.
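For a sense of what the audit-log route involves, the sketch below parses the standard key=value lines in hdfs-audit.log and extracts paths that were written under the indexed subtree; the watched prefix, command set, and log location are assumptions rather than a tested design.
# scripts/audit_log_watcher.py
# Sketch of change detection by parsing HDFS audit logs. The regex targets the
# standard key=value audit format; the watched prefix, command set, and log
# path are illustrative assumptions.
import re
from typing import Iterable, Set

AUDIT_LINE = re.compile(r"cmd=(?P<cmd>\S+)\s+src=(?P<src>\S+)")
WRITE_COMMANDS = {"create", "append", "rename", "delete"}  # commands that should trigger re-indexing
WATCHED_PREFIX = "/data/archives"  # hypothetical subtree we index

def changed_paths(audit_lines: Iterable[str]) -> Set[str]:
    """Extract HDFS paths under the watched prefix touched by write commands."""
    changed: Set[str] = set()
    for line in audit_lines:
        match = AUDIT_LINE.search(line)
        if not match:
            continue
        if match.group("cmd") in WRITE_COMMANDS and match.group("src").startswith(WATCHED_PREFIX):
            changed.add(match.group("src"))
    return changed

if __name__ == "__main__":
    # Log location varies by distribution; this path is an assumption.
    with open("/var/log/hadoop/hdfs-audit.log") as log_file:
        for path in sorted(changed_paths(log_file)):
            print(path)  # candidates for an incremental re-index manifest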