The fundamental flaw in many Retrieval-Augmented Generation (RAG) proof-of-concepts is their security model: it’s often an all-or-nothing affair. Once a user is authenticated, the backend LlamaIndex pipeline can access the entire vector store. In any real-world enterprise scenario, this is a non-starter. Data access must be granular, dynamic, and tied directly to the user’s identity—their department, role, or project access. The technical challenge is not merely authenticating the user, but making the RAG pipeline itself identity-aware at the retrieval stage.
This problem is compounded when the client is a Progressive Web Application (PWA). A PWA introduces complexities around the authentication lifecycle, specifically how to manage OpenID Connect (OIDC) tokens securely for offline access and background operations managed by a service worker. The naive approach of passing a JWT and having a monolithic backend switch between pre-built indices for different roles is brittle and doesn’t scale to handle complex, overlapping permissions.
Our goal was to build a system where OIDC claims are used to dynamically construct metadata filters within the LlamaIndex query pipeline. This ensures that the retriever only ever sees data nodes the user is authorized to access, enforcing a zero-trust model at the data layer. This requires deep integration between a custom Webpack configuration for the PWA’s service worker, a robust client-side OIDC token management strategy, and a bespoke LlamaIndex query transformation component on the backend.
The Architectural Pain Point: Secure Token Handling in a PWA Service Worker
A service worker runs in a separate thread and doesn't have direct access to the DOM or window objects like localStorage. This presents an immediate problem for API calls initiated from the service worker (e.g., for background sync or pre-caching). If the API requires an Authorization header, how does the service worker obtain the access token?
A common mistake is to hack around this, for example by having the main thread periodically write the token to IndexedDB. That adds significant complexity and, unless the stored token is encrypted, real security risk. The more robust solution is a message-based channel: the main application thread is the single source of truth for the OIDC tokens and pushes them to the service worker whenever they are issued or refreshed.
Our Webpack configuration uses workbox-webpack-plugin's InjectManifest mode, which gives us full control over the service worker file (sw.js), allowing us to write custom logic instead of relying on generated code.
// webpack.config.js
const { InjectManifest } = require('workbox-webpack-plugin');
const path = require('path');

// ... other webpack config ...

module.exports = {
  // ...
  plugins: [
    // ... other plugins
    new InjectManifest({
      swSrc: './src/sw.js', // Our custom service worker file
      swDest: 'sw.js', // Output file in the build directory
      // We exclude sourcemaps and the manifest itself from precaching
      exclude: [/\.map$/, /manifest\.json$/],
    }),
  ],
};
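For completeness, the application shell must also register the compiled worker. A minimal sketch of that step, assuming sw.js is served from the site root (the src/index.js path is illustrative):

// src/index.js -- registering the built service worker (minimal sketch)
if ('serviceWorker' in navigator) {
  window.addEventListener('load', () => {
    navigator.serviceWorker
      .register('/sw.js')
      .then((registration) => console.log('SW registered with scope:', registration.scope))
      .catch((err) => console.error('SW registration failed:', err));
  });
}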
The service worker itself needs to maintain an internal, volatile state for the access token. It should not persist it to disk. It listens for messages from the client and updates its internal token variable. The fetch event listener then injects this token into outgoing API requests.
// src/sw.js
import { precacheAndRoute } from 'workbox-precaching';
import { registerRoute } from 'workbox-routing';
import { StaleWhileRevalidate } from 'workbox-strategies';

// This variable will hold the token in memory for the life of the service worker.
// It is volatile and disappears when the worker is terminated.
let accessToken = null;

// Workbox precaching for the PWA shell
precacheAndRoute(self.__WB_MANIFEST);

// Listen for messages from the main application thread
self.addEventListener('message', (event) => {
  if (event.data && event.data.type === 'SET_TOKEN') {
    console.log('Service Worker: Received new access token.');
    accessToken = event.data.token;
  }
});

// The core logic: an interception handler for API fetch requests
const apiAuthHandler = async ({ request }) => {
  try {
    // Copy the headers so we can attach the Authorization header
    const modifiedHeaders = new Headers(request.headers);
    if (accessToken) {
      modifiedHeaders.set('Authorization', `Bearer ${accessToken}`);
    } else {
      // In a production scenario, you might queue the request or signal an error.
      // For now, we log a warning. The request will likely fail with a 401.
      console.warn('Service Worker: No access token available for API request.');
    }
    const modifiedRequest = new Request(request, {
      headers: modifiedHeaders,
      mode: 'cors', // Important for cross-origin API calls
    });
    // Go straight to the network so API responses are always fresh when online
    return await fetch(modifiedRequest);
  } catch (error) {
    // This is a critical failure point. Proper logging is essential.
    console.error('Service Worker: Failed to handle API request.', error);
    // You could return a cached response here if available and appropriate
    throw error;
  }
};

// Register the route for our specific API endpoint
registerRoute(
  ({ url }) => url.pathname.startsWith('/api/rag/query'),
  apiAuthHandler,
  'POST'
);

// A StaleWhileRevalidate strategy for static assets
registerRoute(
  ({ request }) => request.destination === 'style' || request.destination === 'script',
  new StaleWhileRevalidate({
    cacheName: 'static-resources',
  })
);
On the client side, the application logic responsible for OIDC must post the token to the service worker upon successful login and every time the token is refreshed.
// src/authService.js
import { UserManager } from 'oidc-client-ts';

// Assume this is configured with your OIDC provider details
const userManager = new UserManager({
  authority: 'https://your-oidc-provider.com',
  client_id: 'your-client-id',
  redirect_uri: window.location.origin + '/callback',
  // ... other settings like scope, response_type, etc.
});

// Function to notify the service worker of the new token.
// Note: navigator.serviceWorker.controller is null until a service worker
// controls the page (e.g., on the very first load, before activation).
const notifyServiceWorker = (token) => {
  if ('serviceWorker' in navigator && navigator.serviceWorker.controller) {
    navigator.serviceWorker.controller.postMessage({
      type: 'SET_TOKEN',
      token: token,
    });
  }
};

// After a successful sign-in
userManager.signinRedirectCallback().then((user) => {
  if (user && !user.expired) {
    notifyServiceWorker(user.access_token);
    // Store user session in a way accessible to your app (e.g., state management)
  }
});

// Event fired when the access token is about to expire
userManager.events.addAccessTokenExpiring(() => {
  userManager.signinSilent().then((user) => {
    if (user && user.access_token) {
      console.log('Token silently refreshed.');
      notifyServiceWorker(user.access_token);
    }
  }).catch((err) => {
    // Handle silent refresh failure, possibly by forcing a full redirect
    console.error('Silent refresh failed:', err);
  });
});
This establishes a secure communication channel. The main thread manages the complex OIDC flow, and the service worker is just a passive recipient of the latest valid token, using it to authenticate its own network requests. The pitfall here is race conditions during startup; the service worker might activate and attempt a background fetch before the main client has authenticated and sent the first token. Robust error handling and request queuing strategies are necessary for production-grade reliability.
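One mitigation, as a sketch: have the service worker queue API requests that arrive before the first SET_TOKEN message and replay them once a token is available. The helper names and timeout below are illustrative additions, not part of the worker shown above:

// src/sw.js (sketch) -- deferring API requests until a token arrives
let tokenWaiters = [];

// Extend the existing 'message' handler to release queued requests
self.addEventListener('message', (event) => {
  if (event.data && event.data.type === 'SET_TOKEN') {
    accessToken = event.data.token;
    tokenWaiters.forEach((resolve) => resolve(accessToken));
    tokenWaiters = [];
  }
});

// Resolve immediately if a token exists; otherwise wait, bounded by a timeout
const waitForToken = (timeoutMs = 5000) =>
  accessToken
    ? Promise.resolve(accessToken)
    : new Promise((resolve, reject) => {
        tokenWaiters.push(resolve);
        setTimeout(() => reject(new Error('Timed out waiting for access token')), timeoutMs);
      });

// apiAuthHandler would then `await waitForToken()` before building the
// modified request, instead of warning and proceeding without a token.

An alternative is for the worker to pull rather than wait: on activation it can call self.clients.matchAll() and postMessage each client to request the current token.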
Backend: Building the Identity-Aware LlamaIndex Pipeline
With the PWA now attaching the OIDC access_token to its requests, the backend's responsibility shifts to validating that token and using its claims to enforce data boundaries. The first step is standard API security: validating the JWT. We use a FastAPI dependency for this.
# backend/security.py
import jwt
from fastapi import HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from typing import Dict

# In a real app, these would come from config and be fetched from the OIDC .well-known endpoint
JWKS_CLIENT = jwt.PyJWKClient("https://your-oidc-provider.com/.well-known/jwks.json")
AUDIENCE = "your-api-audience"
ISSUER = "https://your-oidc-provider.com"

security = HTTPBearer()

async def validate_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> Dict:
    token = credentials.credentials
    try:
        signing_key = JWKS_CLIENT.get_signing_key_from_jwt(token).key
        decoded_token = jwt.decode(
            token,
            signing_key,
            algorithms=["RS256"],
            audience=AUDIENCE,
            issuer=ISSUER,
        )
        return decoded_token
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token has expired")
    except jwt.InvalidTokenError as e:
        # Log the specific error for debugging, but return a generic message to the client
        print(f"JWT Validation Error: {e}")
        raise HTTPException(status_code=401, detail="Invalid token")
The core innovation is in the LlamaIndex pipeline. Instead of having separate indices per department, we ingest all documents into a single index but enrich them with metadata during ingestion.
# backend/data_ingestion.py
from llama_index.core import Document

# Example document with metadata
doc1 = Document(
    text="This is a confidential engineering document about project 'Phoenix'.",
    metadata={
        "department": "engineering",
        "clearance_level": "3",
        "project_id": "phoenix-001"
    }
)

doc2 = Document(
    text="This is a Q3 financial report for the sales department.",
    metadata={
        "department": "sales",
        "clearance_level": "2",
        "doc_type": "finance_report"
    }
)

# ... Ingest these documents into your VectorStoreIndex ...
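The elided ingestion step is standard LlamaIndex; a minimal sketch (the persist directory is an arbitrary choice for this example):

# backend/data_ingestion.py (continued) -- minimal ingestion sketch
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_documents([doc1, doc2])
index.storage_context.persist(persist_dir="./storage")

Document-level metadata is inherited by the nodes produced during chunking, which is what makes node-level filtering possible at query time.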
Now, we create a custom query transform. This component sits in the query pipeline after the user's initial query is received but before the retriever is invoked. It inspects the validated JWT claims and dynamically constructs MetadataFilters that are passed to the retriever.
# backend/query_engine.py
from typing import Any, Dict, List, Optional, Tuple

from llama_index.core.postprocessor.types import BaseNodePostprocessor
from llama_index.core.query_engine import BaseQueryEngine
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import QueryBundle
from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters
class OidcClaimsFilterTransform:
    """
    A LlamaIndex query transformation that constructs metadata filters
    based on validated OIDC JWT claims.
    """
    def __init__(self, claims: Dict[str, Any]):
        self.claims = claims

    def run(self, query_bundle: QueryBundle) -> Tuple[QueryBundle, Dict[str, Any]]:
        """
        Applies the transformation. Returns the query bundle unchanged, plus
        keyword arguments for the retriever containing the dynamically built
        metadata filters. Injecting state this way is a bit of a hack, but it
        keeps the retrieval process itself stateless.
        """
        # A common mistake is to fail silently. If claims are missing, it's a security risk.
        if not self.claims:
            # This should be a hard failure.
            raise ValueError("OIDC claims are missing. Cannot proceed with query.")

        filters = []

        # Map OIDC claims to document metadata keys.
        # In production, this mapping should be configurable.
        claim_to_metadata_map = {
            'department': 'department',
            'user_clearance': 'clearance_level'
        }

        # Example: the 'department' claim must match exactly.
        if 'department' in self.claims and 'department' in claim_to_metadata_map:
            dept_claim = self.claims['department']
            # Support for a single string or a list of departments in the claim
            if isinstance(dept_claim, list):
                # Note: MetadataFilters combines filters with AND by default.
                # To OR multiple departments, pass condition=FilterCondition.OR
                # (or use a nested filter group) when constructing MetadataFilters.
                department_filters = [
                    ExactMatchFilter(key='department', value=dept) for dept in dept_claim
                ]
                filters.extend(department_filters)
            else:
                filters.append(ExactMatchFilter(key='department', value=str(dept_claim)))

        # Example: clearance level should be less than or equal to the user's clearance.
        # ExactMatchFilter cannot express <= directly (newer LlamaIndex versions expose
        # MetadataFilter with comparison operators such as FilterOperator.LTE, but
        # backend support varies). A portable workaround is to tag documents with all
        # valid levels, e.g., a level 3 doc gets tags ["level_1", "level_2", "level_3"].
        # For this example, we assume an exact match for simplicity.
        if 'user_clearance' in self.claims and 'user_clearance' in claim_to_metadata_map:
            filters.append(ExactMatchFilter(key='clearance_level', value=str(self.claims['user_clearance'])))

        # In a real-world project, you'd add more sophisticated logic here,
        # perhaps even calling out to an external policy engine like OPA.
        if not filters:
            # If no relevant claims are found, we must decide on a default policy.
            # Defaulting to "deny" is the safest posture.
            # We can create an impossible filter to return no results.
            print("Warning: No applicable claims found for filtering. Denying access.")
            filters.append(ExactMatchFilter(key="deny_access_tag", value="impossible_value"))

        # The key step: return the filters as retriever kwargs, under a 'filters'
        # kwarg that our custom retriever (sketched below) knows how to apply.
        retriever_kwargs = {'filters': MetadataFilters(filters=filters)}
        return query_bundle, retriever_kwargs
class SecureRAGQueryEngine(BaseQueryEngine):
    """A custom query engine that integrates the OIDC filter transform."""
    def __init__(
        self,
        retriever: BaseRetriever,
        response_synthesizer: Any,  # BaseSynthesizer
        node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,
    ):
        self._retriever = retriever
        self._response_synthesizer = response_synthesizer
        self._node_postprocessors = node_postprocessors or []
        # BaseQueryEngine requires a callback_manager argument.
        super().__init__(callback_manager=None)

    def _get_prompt_modules(self) -> Dict[str, Any]:
        # Required by the PromptMixin interface; we have no prompt sub-modules.
        return {}

    def query(self, query_str: str, claims: Dict[str, Any]) -> Any:
        # BaseQueryEngine.query() does not accept extra kwargs, so we expose
        # our own entry point that threads the validated claims through.
        return self._query(QueryBundle(query_str=query_str), claims)

    def _query(self, query_bundle: QueryBundle, claims: Dict[str, Any]) -> Any:  # Response
        """The core query logic, now accepting claims."""
        # 1. Apply the OIDC claims transformation
        filter_transform = OidcClaimsFilterTransform(claims)
        query_bundle, retriever_kwargs = filter_transform.run(query_bundle)

        # 2. Retrieve nodes with the dynamically generated filters
        nodes = self._retriever.retrieve(query_bundle.query_str, **retriever_kwargs)

        # 3. Apply post-processing (e.g., reranking)
        for postprocessor in self._node_postprocessors:
            nodes = postprocessor.postprocess_nodes(nodes, query_bundle=query_bundle)

        # 4. Synthesize the final response
        response = self._response_synthesizer.synthesize(
            query=query_bundle,
            nodes=nodes,
        )
        return response

    async def _aquery(self, query_bundle: QueryBundle, claims: Dict[str, Any]) -> Any:
        # The async version would follow the same logic with async retriever
        # and synthesizer calls; we delegate to the sync path for brevity.
        return self._query(query_bundle, claims)
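Note that stock LlamaIndex retrievers do not accept per-query filters; VectorIndexRetriever takes filters at construction time. The custom retriever the transform assumes could look like the following sketch (the class name and wrapper approach are ours, not a LlamaIndex API):

# backend/retriever.py -- hypothetical wrapper assumed by SecureRAGQueryEngine
from typing import List, Optional

from llama_index.core import VectorStoreIndex
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore, QueryBundle
from llama_index.core.vector_stores import MetadataFilters

class FilterableRetriever(BaseRetriever):
    """Builds a filtered retriever per call so the filters stay query-scoped."""

    def __init__(self, index: VectorStoreIndex, similarity_top_k: int = 5):
        self._index = index
        self._similarity_top_k = similarity_top_k
        super().__init__()

    def retrieve(self, query: str, filters: Optional[MetadataFilters] = None) -> List[NodeWithScore]:
        # VectorIndexRetriever accepts filters at construction time, so we
        # construct a fresh inner retriever for each query.
        inner = self._index.as_retriever(
            similarity_top_k=self._similarity_top_k,
            filters=filters,
        )
        return inner.retrieve(query)

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        # Required by BaseRetriever; unfiltered fallback path.
        return self.retrieve(query_bundle.query_str)

Constructing a retriever per query is cheap relative to the vector search itself, and it guarantees no filter state leaks between users.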
The FastAPI endpoint now ties everything together. It validates the token and passes the resulting claims dictionary directly to our custom query engine.
# backend/main.py
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from typing import Dict, Any

# Assuming query_engine_setup initializes your index, retriever, synthesizer, etc.
from .query_engine_setup import get_secure_query_engine
from .security import validate_token

app = FastAPI()

class QueryRequest(BaseModel):
    query: str

@app.post("/api/rag/query")
async def handle_query(
    request: QueryRequest,
    claims: Dict[str, Any] = Depends(validate_token)
):
    try:
        query_engine = get_secure_query_engine()  # This should be a singleton instance
        response = query_engine.query(request.query, claims=claims)
        return {"response": str(response), "source_nodes": response.source_nodes}
    except ValueError as e:
        # Catch specific errors from our pipeline for proper HTTP responses
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        # Generic catch-all for unexpected errors
        print(f"Unhandled error in RAG pipeline: {e}")
        raise HTTPException(status_code=500, detail="Internal server error during query processing")
The complete data flow is now secure and identity-aware:
sequenceDiagram
    participant PWA Client
    participant Service Worker
    participant Backend API
    participant LlamaIndex Pipeline
    PWA Client->>PWA Client: OIDC Login/Refresh
    PWA Client->>Service Worker: postMessage({type: 'SET_TOKEN', token: ...})
    Service Worker-->>Service Worker: Stores token in memory
    PWA Client->>Service Worker: fetch('/api/rag/query', {body: ...})
    Service Worker->>Service Worker: Intercept fetch event
    Service Worker->>Service Worker: Attach 'Authorization' header
    Service Worker->>Backend API: POST /api/rag/query
    Backend API->>Backend API: JWT Middleware validates token
    alt Token Invalid
        Backend API-->>Service Worker: 401 Unauthorized
    else Token Valid
        Backend API->>LlamaIndex Pipeline: query(text, claims)
        LlamaIndex Pipeline->>LlamaIndex Pipeline: OidcClaimsFilterTransform runs
        LlamaIndex Pipeline->>LlamaIndex Pipeline: Creates MetadataFilters from claims
        LlamaIndex Pipeline->>LlamaIndex Pipeline: Retriever fetches nodes using filters
        LlamaIndex Pipeline->>LlamaIndex Pipeline: Synthesizer generates response
        LlamaIndex Pipeline-->>Backend API: Returns response object
        Backend API-->>Service Worker: 200 OK with response data
    end
    Service Worker-->>PWA Client: Returns response
This architecture is robust because the security logic is completely encapsulated on the backend. The PWA client does not need to know which documents it is allowed to see; it simply authenticates and sends a query. The LlamaIndex pipeline itself enforces the access control, preventing data leakage at the retrieval step, long before an LLM could see and summarize unauthorized information.
The current implementation using exact match filters is a starting point. A more advanced system would require more flexible filtering logic, potentially integrating an external policy engine like Open Policy Agent (OPA). The query transformation could call out to OPA with the user’s claims and the document’s metadata to get a simple allow/deny decision, enabling far more complex rules than simple string matching. Furthermore, the performance of metadata filtering at scale in the underlying vector database is a critical consideration; the metadata fields must be properly indexed to avoid a significant latency penalty on retrieval. These represent the next frontier of optimization for this architecture.
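As a sketch of what that OPA integration might look like, assuming a policy published at rag/allow and an input shape of our choosing (both are assumptions about how the policy would be written, not a fixed contract):

# backend/opa_client.py -- hypothetical allow/deny check against OPA's REST API
import httpx

OPA_URL = "http://localhost:8181/v1/data/rag/allow"  # assumed policy path

def is_allowed(claims: dict, doc_metadata: dict) -> bool:
    # OPA's data API takes the decision input under the "input" key
    # and returns the policy's value under "result".
    response = httpx.post(
        OPA_URL,
        json={"input": {"claims": claims, "document": doc_metadata}},
    )
    response.raise_for_status()
    return response.json().get("result", False)  # absent result -> deny

A per-document OPA call fits more naturally in a node postprocessor than in a vector-store metadata filter, since the store cannot evaluate arbitrary policy; the added round-trip latency is exactly the kind of optimization question raised above.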