Implementing a LlamaIndex HDFS Reader for Real-time RAG on a Legacy Big Data Cluster


The project mandate was deceptively simple: unlock decades of institutional knowledge trapped in our on-premise Hadoop cluster. We’re talking petabytes of design documents, incident post-mortems, and architectural decision records, all sitting in HDFS. The business wanted a modern, conversational AI interface to query this data. A full migration to a cloud-native object store was off the table—a multi-year, multi-million dollar effort. The constraint was clear: build on top of what exists. This immediately ruled out most off-the-shelf RAG solutions that assume data lives in S3, GCS, or a tidy database.

Our initial concept was to create a bridge between the old and the new. We would leave the data at rest in HDFS but build a modern indexing and querying layer on top. This layer would need to be smart enough to handle HDFS’s peculiarities—its high-latency, batch-oriented nature—while providing a low-latency query experience expected from a modern application. The core challenge was clear: creating a production-grade LlamaIndex connector that could efficiently and safely read from HDFS without crippling the NameNode, and then serving this through a responsive front-end.

The technology selection process was driven by these constraints. Hadoop/HDFS was the immovable object. For the RAG framework, LlamaIndex was a clear choice due to its highly modular and extensible architecture, specifically the BaseReader interface, which seemed designed for custom data source integration. For the vector store, we needed a dedicated system; querying vectors stored as flat files on HDFS would be impossibly slow. We started with an in-memory ChromaDB for the proof-of-concept. On the front-end, the choice was Vite with React and TypeScript for its unmatched developer experience and performance, coupled with Emotion for maintainable, component-scoped styling.

The architecture began to solidify:

  1. A Python-based indexing service responsible for periodically scanning HDFS, using a custom LlamaIndex HDFSReader to load documents.
  2. A vector index populated by the service and stored in ChromaDB.
  3. A FastAPI server exposing a query endpoint that leverages the LlamaIndex query engine.
  4. A Vite/React single-page application providing the user interface.
graph TD
    subgraph Frontend
        A[Vite + React UI]
    end

    subgraph Backend API
        B[FastAPI Endpoint: /query]
    end

    subgraph RAG Pipeline
        C[LlamaIndex Query Engine]
        D[Vector Store: ChromaDB]
        E[Custom HDFSReader]
    end

    subgraph Legacy Data Store
        F[Hadoop HDFS Cluster]
    end

    A -- HTTP Request --> B
    B -- query() --> C
    C -- Similarity Search --> D
    C -- Retrieve Context --> E
    E -- Read Document Chunks --> F

The most critical and unknown piece was the HDFSReader. A naive implementation could easily cause a denial-of-service on our production cluster.

The HDFSReader: A Bridge to the Past

The first step was establishing a reliable connection to HDFS from Python. The pyarrow library is the standard for this, providing a hdfs.connect() method. In a real-world project, this connection is never simple. Our cluster is Kerberized, requiring proper ticket management.

Here’s the initial, bare-bones structure of our reader. A common mistake is to try and list the entire directory tree recursively (hdfs.walk()) in the load_data method. On a cluster with millions of files, this will time out or exhaust NameNode resources.

# hdfs_reader/reader.py

import os
import logging
from typing import List, Optional, Dict

from pyarrow import hdfs
from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document

# Setup structured logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class HDFSReader(BaseReader):
    """
    A reader for Hadoop Distributed File System (HDFS).

    Connects to an HDFS cluster to read files and convert them into
    LlamaIndex Document objects.

    A key consideration in a production environment is not to overwhelm
    the HDFS NameNode. This implementation reads from a manifest file

    that lists the absolute paths of files to be ingested.
    """
    def __init__(
        self,
        host: str,
        port: int,
        user: Optional[str] = None,
        kerb_ticket: Optional[str] = None,
        extra_conf: Optional[Dict] = None,
    ):
        """
        Initializes the HDFSReader.

        Args:
            host (str): The HDFS NameNode host.
            port (int): The HDFS NameNode port.
            user (Optional[str]): The user to connect as.
            kerb_ticket (Optional[str]): Path to the Kerberos ticket cache.
            extra_conf (Optional[Dict]): Extra configuration for pyarrow.
        """
        self.host = host
        self.port = port
        self.user = user
        self.kerb_ticket = kerb_ticket
        self.extra_conf = extra_conf
        self._fs = None
        
        # A simple retry mechanism for connection
        try:
            self._connect_with_retry()
        except Exception as e:
            logging.error(f"Failed to connect to HDFS after multiple retries: {e}")
            raise

    def _connect_with_retry(self, retries=3, delay=5):
        for i in range(retries):
            try:
                self._fs = hdfs.connect(
                    host=self.host,
                    port=self.port,
                    user=self.user,
                    kerb_ticket=self.kerb_ticket,
                    extra_conf=self.extra_conf
                )
                logging.info(f"Successfully connected to HDFS at {self.host}:{self.port}")
                return
            except Exception as e:
                logging.warning(f"HDFS connection attempt {i+1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)
        raise ConnectionError("Could not establish connection to HDFS.")

    def load_data(self, file_manifest_path: str) -> List[Document]:
        """
        Loads data from HDFS based on a manifest file.

        The manifest file should contain one HDFS file path per line. This
        avoids expensive recursive directory listing operations on the NameNode.

        Args:
            file_manifest_path (str): The HDFS path to the manifest file.

        Returns:
            List[Document]: A list of Document objects.
        """
        documents = []
        
        if not self._fs:
            logging.error("HDFS filesystem not available.")
            return []
            
        try:
            with self._fs.open(file_manifest_path, 'rb') as f:
                manifest_content = f.read().decode('utf-8')
            
            file_paths = [path for path in manifest_content.splitlines() if path.strip()]
            logging.info(f"Found {len(file_paths)} files to process from manifest: {file_manifest_path}")

        except Exception as e:
            logging.error(f"Failed to read manifest file from HDFS: {e}")
            return []

        for path in file_paths:
            try:
                # Check file status once to get metadata
                file_info = self._fs.info(path)
                file_size = file_info.get('size', 0)
                
                # A simple safeguard against ingesting massive files by mistake
                if file_size > 100 * 1024 * 1024: # 100MB limit
                    logging.warning(f"Skipping large file {path} ({file_size} bytes)")
                    continue

                with self._fs.open(path, 'rb') as f:
                    content = f.read().decode('utf-8', errors='ignore')
                
                # Metadata is crucial for RAG context
                metadata = {
                    "hdfs_path": path,
                    "file_name": os.path.basename(path),
                    "last_modified": file_info.get('last_modified'),
                    "source": "HDFS",
                }
                documents.append(Document(text=content, extra_info=metadata))

            except Exception as e:
                logging.error(f"Failed to read or process file {path}: {e}")
                # Continue processing other files
                continue
                
        return documents

Our solution to the NameNode pressure problem was to shift the responsibility of discovery. Instead of the reader walking the filesystem, it consumes a manifest file. A separate, scheduled, and resource-managed process (like a simple MapReduce or Spark job) is responsible for generating this manifest. This decouples the high-intensity discovery phase from the data-reading phase.

The Indexing and Query Service

With the reader in place, the backend service was straightforward. We used FastAPI for its performance and simplicity. The service has two main parts: an indexing script and the query endpoint.

First, the indexing script that ties everything together. The pitfall here is memory management. Loading hundreds of thousands of documents into memory before indexing is not viable. LlamaIndex’s pipeline processes documents in a streaming fashion, which mitigates this.

# services/indexing_service.py

import os
import chromadb
import logging
from llama_index.core import VectorStoreIndex, ServiceContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.openai import OpenAI # For this example
from hdfs_reader.reader import HDFSReader

# --- Configuration ---
HDFS_HOST = os.getenv("HDFS_HOST", "namenode")
HDFS_PORT = int(os.getenv("HDFS_PORT", 9870))
HDFS_USER = os.getenv("HDFS_USER", "hadoop")
MANIFEST_PATH = "/data/manifests/latest_docs.txt"
CHROMA_PATH = "./chroma_db"
COLLECTION_NAME = "hdfs_docs"
EMBED_MODEL_NAME = "BAAI/bge-small-en-v1.5"
LLM_MODEL = "gpt-4-turbo" # Replace with your model

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def setup_services():
    """Initializes and returns the service context and vector store."""
    logging.info(f"Initializing embedding model: {EMBED_MODEL_NAME}")
    embed_model = HuggingFaceEmbedding(model_name=EMBED_MODEL_NAME)
    
    # In a real setup, OpenAI key would be managed via a secrets manager
    llm = OpenAI(model=LLM_MODEL, api_key=os.getenv("OPENAI_API_KEY"))

    service_context = ServiceContext.from_defaults(
        llm=llm, 
        embed_model=embed_model,
        chunk_size=512,
        chunk_overlap=50
    )

    db = chromadb.PersistentClient(path=CHROMA_PATH)
    chroma_collection = db.get_or_create_collection(COLLECTION_NAME)
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    
    return service_context, vector_store

def run_indexing():
    """Runs the full indexing pipeline."""
    logging.info("Starting indexing process...")
    
    try:
        reader = HDFSReader(host=HDFS_HOST, port=HDFS_PORT, user=HDFS_USER)
        documents = reader.load_data(file_manifest_path=MANIFEST_PATH)
        
        if not documents:
            logging.warning("No documents found to index. Exiting.")
            return

        service_context, vector_store = setup_services()
        
        logging.info(f"Creating or updating VectorStoreIndex with {len(documents)} documents.")
        index = VectorStoreIndex.from_documents(
            documents,
            service_context=service_context,
            vector_store=vector_store,
            show_progress=True
        )
        logging.info("Indexing process completed successfully.")
        
    except Exception as e:
        logging.critical(f"An unrecoverable error occurred during indexing: {e}", exc_info=True)

if __name__ == "__main__":
    run_indexing()

The query API is then built on top of this indexed data.

# services/api.py

import os
import chromadb
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from llama_index.core import VectorStoreIndex, ServiceContext, StorageContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.openai import OpenAI

# --- Configuration ---
# (Same as indexing_service.py)
CHROMA_PATH = "./chroma_db"
COLLECTION_NAME = "hdfs_docs"
EMBED_MODEL_NAME = "BAAI/bge-small-en-v1.5"
LLM_MODEL = "gpt-4-turbo"

# --- API Setup ---
app = FastAPI()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Global variable to hold the query engine
# In a production app, this would be managed more robustly.
query_engine = None

class QueryRequest(BaseModel):
    query: str
    top_k: int = 3

@app.on_event("startup")
def load_model():
    """Load the index and prepare the query engine on startup."""
    global query_engine
    try:
        logging.info("Loading model and index on startup...")
        db = chromadb.PersistentClient(path=CHROMA_PATH)
        chroma_collection = db.get_collection(COLLECTION_NAME)
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        
        embed_model = HuggingFaceEmbedding(model_name=EMBED_MODEL_NAME)
        llm = OpenAI(model=LLM_MODEL, api_key=os.getenv("OPENAI_API_KEY"))
        service_context = ServiceContext.from_defaults(llm=llm, embed_model=embed_model)

        index = VectorStoreIndex.from_vector_store(
            vector_store,
            storage_context=storage_context,
            service_context=service_context,
        )
        
        query_engine = index.as_query_engine(similarity_top_k=5, response_mode="compact")
        logging.info("Query engine ready.")
    except Exception as e:
        logging.critical(f"Failed to initialize query engine: {e}", exc_info=True)
        # The app will fail to start if the engine can't load, which is desired behavior.
        raise

@app.post("/query")
async def handle_query(request: QueryRequest):
    if query_engine is None:
        raise HTTPException(status_code=503, detail="Query engine is not available.")
    
    try:
        logging.info(f"Received query: {request.query}")
        response = query_engine.query(request.query)
        
        # In a real app, you'd want to return source nodes for citation.
        source_nodes = [
            {"path": node.metadata.get("hdfs_path"), "score": node.score}
            for node in response.source_nodes
        ]
        
        return {
            "response": str(response),
            "sources": source_nodes
        }
    except Exception as e:
        logging.error(f"Error processing query '{request.query}': {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="An error occurred while processing the query.")

A problem surfaced during testing. Query latency was acceptable, but the context returned by the RAG pipeline wasn’t always sufficient. Because LlamaIndex fetches the text chunks associated with the top-k vectors, it’s retrieving from its own cache of the data, not hitting HDFS during the query. This is good for performance. However, our initial chunk size of 512 was often too small. Increasing it improved context but also increased indexing time and vector store size. This is a classic trade-off in RAG systems, exacerbated by the diversity of our document types. We settled on a dynamic chunking strategy for a future iteration, where different document types would get different chunking parameters.

The Front-end: A Responsive Interface

The front-end’s job is to be a clean, fast, and usable window into this complex backend. Vite, React, and Emotion are a powerful combination for this.

First, setting up the API communication layer. It’s crucial to handle loading states and errors gracefully to provide a good user experience.

// src/services/api.ts

export interface SourceNode {
  path: string;
  score: number;
}

export interface QueryResponse {
  response: string;
  sources: SourceNode[];
}

export class ApiError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ApiError';
  }
}

export const postQuery = async (query: string): Promise<QueryResponse> => {
  const response = await fetch('/api/query', { // Assuming Vite proxy is set up
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ query }),
  });

  if (!response.ok) {
    const errorData = await response.json().catch(() => ({ detail: 'An unknown error occurred' }));
    throw new ApiError(errorData.detail || 'Failed to fetch response from the server.');
  }

  return response.json();
};

The main UI component uses Emotion for styling. This keeps the styles colocated with the component logic, making it highly maintainable.

// src/components/ChatInterface.tsx

import React, { useState, FormEvent, useRef, useEffect } from 'react';
import styled from '@emotion/styled';
import { postQuery, QueryResponse, SourceNode } from '../services/api';

// --- Styled Components with Emotion ---
const ChatContainer = styled.div`
  display: flex;
  flex-direction: column;
  height: 100vh;
  width: 100%;
  max-width: 800px;
  margin: 0 auto;
  background-color: #f0f2f5;
`;

const MessageList = styled.div`
  flex-grow: 1;
  overflow-y: auto;
  padding: 20px;
  display: flex;
  flex-direction: column;
  gap: 15px;
`;

interface MessageProps {
  isUser: boolean;
}

const MessageBubble = styled.div<MessageProps>`
  padding: 12px 18px;
  border-radius: 20px;
  max-width: 75%;
  align-self: ${props => (props.isUser ? 'flex-end' : 'flex-start')};
  background-color: ${props => (props.isUser ? '#0084ff' : '#e4e6eb')};
  color: ${props => (props.isUser ? 'white' : 'black')};
  word-wrap: break-word;
`;

const InputArea = styled.form`
  display: flex;
  padding: 10px;
  border-top: 1px solid #ccc;
`;

const TextInput = styled.input`
  flex-grow: 1;
  padding: 10px 15px;
  border: 1px solid #ddd;
  border-radius: 20px;
  margin-right: 10px;
  font-size: 16px;
  &:focus {
    outline: none;
    border-color: #0084ff;
  }
`;

const SubmitButton = styled.button`
  padding: 10px 20px;
  border-radius: 20px;
  border: none;
  background-color: #0084ff;
  color: white;
  cursor: pointer;
  &:disabled {
    background-color: #a0c3ff;
    cursor: not-allowed;
  }
`;

// --- Component Logic ---
interface Message {
  text: string;
  isUser: boolean;
  sources?: SourceNode[];
}

export const ChatInterface: React.FC = () => {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isLoading, setIsLoading] = useState(false);
  const messageListRef = useRef<HTMLDivElement>(null);

  useEffect(() => {
    // Auto-scroll to the latest message
    messageListRef.current?.scrollTo(0, messageListRef.current.scrollHeight);
  }, [messages]);

  const handleSubmit = async (e: FormEvent) => {
    e.preventDefault();
    if (!input.trim() || isLoading) return;

    const userMessage: Message = { text: input, isUser: true };
    setMessages(prev => [...prev, userMessage]);
    setInput('');
    setIsLoading(true);

    try {
      const data: QueryResponse = await postQuery(userMessage.text);
      const aiMessage: Message = { text: data.response, isUser: false, sources: data.sources };
      setMessages(prev => [...prev, aiMessage]);
    } catch (error) {
      const errorMessage: Message = {
        text: error instanceof Error ? error.message : 'An unexpected error occurred.',
        isUser: false,
      };
      setMessages(prev => [...prev, errorMessage]);
    } finally {
      setIsLoading(false);
    }
  };

  return (
    <ChatContainer>
      <MessageList ref={messageListRef}>
        {messages.map((msg, index) => (
          <MessageBubble key={index} isUser={msg.isUser}>
            {msg.text}
          </MessageBubble>
        ))}
        {isLoading && <MessageBubble isUser={false}>Thinking...</MessageBubble>}
      </MessageList>
      <InputArea onSubmit={handleSubmit}>
        <TextInput
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Ask about the project archives..."
          disabled={isLoading}
        />
        <SubmitButton type="submit" disabled={isLoading}>
          Send
        </SubmitButton>
      </InputArea>
    </ChatContainer>
  );
};

The final result was a functional, performant system that successfully bridged our legacy data store with a modern AI interface. It met the primary business goal without requiring a painful data migration.

However, this solution has clear boundaries and lingering issues. The indexing process is still batch-oriented. For data that changes frequently, this introduces unacceptable latency. A true real-time system would require integrating with a Change Data Capture (CDC) tool on HDFS, like Kafka connect sinks or parsing HDFS audit logs, which adds significant architectural complexity. The current security model is also simplistic; a production system would need to propagate the end-user’s credentials all the way to the HDFS read to enforce file-level permissions, a non-trivial task. Finally, the scalability of the single FastAPI instance and the ChromaDB store is limited. A production deployment would require moving to a distributed vector database like Milvus or Weaviate and deploying the API server on a scalable platform like Kubernetes.


  TOC