Propagating OpenTelemetry Context from Redux Actions Through an OAuth 2.0 Flow to a LangChain Backend


Our generative AI application was functionally complete, but operationally it was a black box. A user action, dispatched through Redux on the client, would trigger a request to a Python backend. This backend, secured by OAuth 2.0, would invoke a LangChain agent. That agent, in turn, might make multiple calls to an LLM, use a few tools, and query a database before returning a response. When a request took 15 seconds, the critical question was: where was that time spent? Was it network latency, a slow tool, the LLM itself, or contention in our own API? Without end-to-end distributed tracing, we were just guessing.

The initial goal was to adopt OpenTelemetry. The theory is simple: start a trace on the client, propagate a context header with the request, and have the backend continue that trace. The reality, however, presented a significant architectural challenge. The path wasn’t a simple client-server hop. It involved three distinct domains: the client-side state machine (Redux), the security layer (OAuth 2.0), and the backend’s complex execution graph (LangChain). A common mistake is to instrument these in isolation, resulting in broken, disconnected traces that are more noise than signal. The real task was to stitch them into a single, cohesive narrative for every request.

The Backend Foundation: Instrumenting LangChain Beyond the Surface

Before even touching the client, the backend needed to provide deep visibility. A standard OpenTelemetry instrumentation for a web framework like FastAPI is a solved problem, but it’s insufficient for our needs. It would create a single span for the entire API request, hiding the complex inner workings of the LangChain agent.

In a real-world project, the value isn’t knowing that /api/v1/chat took 5 seconds; it’s knowing that within that request, LLMChain_1 took 3.5s, CalculatorTool took 0.2s, and SearchAPI took 1.1s. To achieve this, we had to hook directly into LangChain’s execution lifecycle using its callback system.
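To make that arithmetic concrete, here is a minimal sketch using the illustrative figures above. `SpanTiming` and `unaccounted_ms` are hypothetical helpers for this example, not OpenTelemetry types, and the calculation assumes the child steps run one after another.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SpanTiming:
    """Toy stand-in for a finished span: a name and a duration in milliseconds."""
    name: str
    duration_ms: int


def unaccounted_ms(parent: SpanTiming, children: List[SpanTiming]) -> int:
    """Time inside the parent that no child span explains (children assumed sequential)."""
    return parent.duration_ms - sum(child.duration_ms for child in children)


request = SpanTiming("POST /api/v1/chat", 5000)
steps = [
    SpanTiming("LLMChain_1", 3500),
    SpanTiming("CalculatorTool", 200),
    SpanTiming("SearchAPI", 1100),
]

slowest = max(steps, key=lambda s: s.duration_ms)
print(f"slowest step: {slowest.name} ({slowest.duration_ms} ms)")
print(f"unaccounted for: {unaccounted_ms(request, steps)} ms")
```

With only the request-level span, all 5000 ms look the same. With child spans, the slowest step and the remainder spent in our own code become visible.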

First, the basic FastAPI server setup with OpenTelemetry. We use opentelemetry-instrumentation-fastapi for automatic instrumentation of inbound requests.

# file: main.py
import os
import logging
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from jose import JWTError, jwt
from typing import Dict

# OpenTelemetry Setup
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

# --- Basic Observability Configuration ---
# In a production setup, swap ConsoleSpanExporter for OTLPSpanExporter
# (opentelemetry-exporter-otlp) to send spans to a collector, Jaeger, Datadog, etc.
provider = TracerProvider()
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

# --- Application Setup ---
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)

# --- Mock LangChain and Custom Callback ---
# The callback import is a placeholder; our detailed implementation comes later
from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.language_models.llms import LLM
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate

# A simple mock LLM so this snippet runs without an API key.
# It subclasses the generic LLM base class rather than OpenAI, whose
# constructor expects credentials.
class MockLLM(LLM):
    temperature: float = 0.0  # accepted so MockLLM(temperature=0) mirrors a real model

    @property
    def _llm_type(self) -> str:
        return "mock"

    def _call(self, prompt, stop=None, run_manager=None, **kwargs):
        return "This is a mocked response."

    async def _acall(self, prompt, stop=None, run_manager=None, **kwargs):
        return "This is a mocked async response."

# --- OAuth 2.0 Security Setup ---
# These should be in a secure config management system
SECRET_KEY = os.environ.get("SECRET_KEY", "a_very_secret_key_for_dev")
ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # Dummy tokenUrl

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # In a real app, this would also check against a DB of users/sessions
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    return {"username": username}

class ChatRequest(BaseModel):
    query: str

@app.post("/api/v1/chat")
async def chat_endpoint(request: ChatRequest, current_user: Dict = Depends(get_current_user)):
    # The real work happens here
    # We will insert our custom LangChain handler soon
    llm = MockLLM(temperature=0)
    prompt = PromptTemplate.from_template("What is the capital of {place}?")
    chain = LLMChain(llm=llm, prompt=prompt)

    # This is where we need more detailed spans
    response = chain.invoke({"place": request.query})
    
    return {"response": response, "user": current_user['username']}

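# Add a dummy token generator for testing
from datetime import datetime, timedelta, timezone
from fastapi.security import OAuth2PasswordRequestForm

@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # OAuth2PasswordRequestForm parses the submitted username/password form fields;
    # OAuth2PasswordBearer only extracts a bearer token and has no .username.
    # In a real app, you'd validate username/password
    user = {"username": form_data.username}
    access_token_expires = timedelta(minutes=30)
    # Timezone-aware replacement for the deprecated datetime.utcnow()
    expire = datetime.now(timezone.utc) + access_token_expires
    to_encode = {"sub": user['username'], "exp": expire}
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return {"access_token": encoded_jwt, "token_type": "bearer"}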

Running this yields one span covering the whole HTTP request and nothing about the chain inside it. The solution is a custom callback handler that creates spans for each LangChain event.

# file: telemetry_callback.py
import logging
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

from opentelemetry import trace
from opentelemetry.trace.span import Span

from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.outputs import LLMResult

tracer = trace.get_tracer(__name__)
logger = logging.getLogger(__name__)

class OpenTelemetryCallbackHandler(BaseCallbackHandler):
    """Callback Handler for OpenTelemetry."""

    def __init__(self) -> None:
        super().__init__()
        # A dictionary to store the current span for each run_id.
        # This is crucial for nesting spans correctly.
        self.span_map: Dict[UUID, Span] = {}

    def _get_parent_span(self, run_id: UUID, parent_run_id: Optional[UUID]) -> Span:
        """Return the span for parent_run_id, or fall back to the currently active span."""
        if parent_run_id and parent_run_id in self.span_map:
            return self.span_map[parent_run_id]
        # get_current_span() is a module-level function in opentelemetry.trace,
        # not a method on Tracer.
        return trace.get_current_span()

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any
    ) -> None:
        """Create a new span for the LLM execution."""
        parent_span = self._get_parent_span(run_id, parent_run_id)
        with trace.use_span(parent_span, end_on_exit=False):
            span = tracer.start_span(f"LLM - {serialized.get('name', 'unknown')}")
            span.set_attribute("llm.prompts", "\n---\n".join(prompts))
            span.set_attribute("run_id", str(run_id))
            if parent_run_id:
                span.set_attribute("parent_run_id", str(parent_run_id))
            self.span_map[run_id] = span

    def on_llm_end(self, response: LLMResult, *, run_id: UUID, **kwargs: Any) -> None:
        """End the span for the LLM execution."""
        if run_id in self.span_map:
            span = self.span_map.pop(run_id)
            # You can add token usage and other metrics here
            if response.llm_output:
                span.set_attribute("llm.token_usage", str(response.llm_output.get("token_usage", {})))
            span.end()
        else:
            logger.warning(f"on_llm_end called for run_id {run_id} but no span was found.")

    def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], *, run_id: UUID, **kwargs: Any) -> None:
        if run_id in self.span_map:
            span = self.span_map.pop(run_id)
            span.record_exception(error)
            span.set_status(trace.Status(trace.StatusCode.ERROR, description=str(error)))
            span.end()

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any
    ) -> None:
        parent_span = self._get_parent_span(run_id, parent_run_id)
        with trace.use_span(parent_span, end_on_exit=False):
            span = tracer.start_span(f"Chain - {serialized.get('name', 'unknown')}")
            span.set_attribute("chain.inputs", str(inputs))
            span.set_attribute("run_id", str(run_id))
            if parent_run_id:
                span.set_attribute("parent_run_id", str(parent_run_id))
            self.span_map[run_id] = span

    def on_chain_end(self, outputs: Dict[str, Any], *, run_id: UUID, **kwargs: Any) -> None:
        if run_id in self.span_map:
            span = self.span_map.pop(run_id)
            span.set_attribute("chain.outputs", str(outputs))
            span.end()
            
    def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], *, run_id: UUID, **kwargs: Any) -> None:
        if run_id in self.span_map:
            span = self.span_map.pop(run_id)
            span.record_exception(error)
            span.set_status(trace.Status(trace.StatusCode.ERROR, description=str(error)))
            span.end()
            
    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, *, run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any
    ) -> None:
        parent_span = self._get_parent_span(run_id, parent_run_id)
        with trace.use_span(parent_span, end_on_exit=False):
            span = tracer.start_span(f"Tool - {serialized.get('name', 'unknown')}")
            span.set_attribute("tool.input", input_str)
            span.set_attribute("run_id", str(run_id))
            if parent_run_id:
                span.set_attribute("parent_run_id", str(parent_run_id))
            self.span_map[run_id] = span
            
    def on_tool_end(self, output: str, *, run_id: UUID, **kwargs: Any) -> None:
        if run_id in self.span_map:
            span = self.span_map.pop(run_id)
            # Tools may return non-string objects; attribute values must be primitives.
            span.set_attribute("tool.output", str(output))
            span.end()

    def on_tool_error(self, error: Union[Exception, KeyboardInterrupt], *, run_id: UUID, **kwargs: Any) -> None:
        if run_id in self.span_map:
            span = self.span_map.pop(run_id)
            span.record_exception(error)
            span.set_status(trace.Status(trace.StatusCode.ERROR, description=str(error)))
            span.end()

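The pitfall here is managing parent-child relationships between spans. LangChain passes a run_id and a parent_run_id to every callback, and both are essential: we maintain a span_map keyed by run_id so each new event can look up its parent's span, which keeps the trace correctly nested. When an event has no known parent run, the handler falls back to the currently active span, which is the FastAPI request span. Now, we integrate this into our endpoint.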

# In main.py, update the chat_endpoint
from telemetry_callback import OpenTelemetryCallbackHandler

@app.post("/api/v1/chat")
async def chat_endpoint(request: ChatRequest, current_user: Dict = Depends(get_current_user)):
    otel_callback = OpenTelemetryCallbackHandler()
    
    llm = MockLLM(temperature=0)
    prompt = PromptTemplate.from_template("What is the capital of {place}?")
    chain = LLMChain(llm=llm, prompt=prompt)

    # Invoke the chain with the callback attached
    response = chain.invoke(
        {"place": request.query},
        config={"callbacks": [otel_callback]}
    )
    
    return {"response": response, "user": current_user['username']}

With this in place, a single API request now generates a parent span for the FastAPI request, and nested within it, spans for the LLMChain execution and the underlying LLM call. The backend is ready.
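The nesting logic can also be seen in isolation, without LangChain or an exporter. The following is a minimal sketch: `ToySpan` and `ToySpanTree` are hypothetical stand-ins that mimic only the span_map bookkeeping of the handler, and the "Tool - Calculator" run is invented for illustration.

```python
from typing import Dict, List, Optional
from uuid import UUID, uuid4


class ToySpan:
    """Minimal stand-in for a span: a name plus child spans."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.children: List["ToySpan"] = []


class ToySpanTree:
    """Rebuilds nesting from (run_id, parent_run_id) pairs, as span_map does."""

    def __init__(self, root_name: str) -> None:
        self.root = ToySpan(root_name)  # plays the role of the active request span
        self.span_map: Dict[UUID, ToySpan] = {}

    def start(self, name: str, run_id: UUID, parent_run_id: Optional[UUID] = None) -> None:
        # Known parent run: nest under it. Otherwise fall back to the request span.
        parent = self.span_map.get(parent_run_id, self.root) if parent_run_id else self.root
        span = ToySpan(name)
        parent.children.append(span)
        self.span_map[run_id] = span

    def end(self, run_id: UUID) -> None:
        # Mirrors span_map.pop(run_id) in the on_*_end handlers.
        self.span_map.pop(run_id, None)


def render(span: ToySpan, depth: int = 0) -> List[str]:
    """Return the tree as indented lines, two spaces per nesting level."""
    lines = ["  " * depth + span.name]
    for child in span.children:
        lines.extend(render(child, depth + 1))
    return lines


tree = ToySpanTree("POST /api/v1/chat")
chain_id, llm_id, tool_id = uuid4(), uuid4(), uuid4()

tree.start("Chain - LLMChain", chain_id)                     # no parent run: child of the request
tree.start("LLM - MockLLM", llm_id, parent_run_id=chain_id)  # nested under the chain
tree.end(llm_id)
tree.start("Tool - Calculator", tool_id, parent_run_id=chain_id)
tree.end(tool_id)
tree.end(chain_id)

print("\n".join(render(tree.root)))
```

Every start is paired with an end that removes the entry, so the map is empty once the request finishes. A handler that misses an end or error callback would leak entries, which is why the real handler pops the span in all three.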

The Client-Side Challenge: Tying Traces to Redux Actions

On the client, we need to initiate the trace. The trace must start not when the component renders, but precisely when the user action that triggers the API call occurs. In a Redux application, this means hooking into the action dispatch mechanism. The perfect tool for this is a custom Redux middleware.

We’ll use Redux Toolkit and its createAsyncThunk for managing API calls. Our middleware will observe the lifecycle actions (pending, fulfilled, rejected) that these thunks dispatch.

First, the OpenTelemetry JS setup. This should be done at the application’s entry point.

// file: telemetry/opentelemetry.js
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { SimpleSpanProcessor, ConsoleSpanExporter } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';

// In production, use OTLPTraceExporter to send data to a collector.
// const exporter = new OTLPTraceExporter({
//   url: 'http://localhost:4318/v1/traces',
// });
const exporter = new ConsoleSpanExporter();

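const provider = new WebTracerProvider();
// Note: on SDK 2.x, addSpanProcessor is gone; pass `spanProcessors: [...]` to the constructor instead.
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));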

// We use ZoneContextManager to automatically handle context propagation in async operations.
provider.register({
  contextManager: new ZoneContextManager(),
  propagator: new W3CTraceContextPropagator(),
});

registerInstrumentations({
  instrumentations: [
    new FetchInstrumentation({
      // We must tell the instrumentation to propagate trace headers to our API backend.
      // (`\.*` would only match literal dots; `\/.*` matches any path on that origin.)
      propagateTraceHeaderCorsUrls: [
        /^http:\/\/localhost:8000\/.*/,
      ],
    }),
  ],
});

export const tracer = provider.getTracer('my-react-app');

Now, the core piece: the Redux middleware. It will watch for the pending, fulfilled, and rejected states of our async thunks.

// file: store/telemetryMiddleware.js
import { isAsyncThunkAction } from '@reduxjs/toolkit';
import { trace, context, SpanStatusCode } from '@opentelemetry/api';

const activeSpans = new Map();

// Run `fn` with the span for this thunk request active, so that
// FetchInstrumentation parents its span (and the `traceparent` header) under it.
export const runInThunkSpan = (requestId, fn) => {
  const span = activeSpans.get(requestId);
  if (!span) return fn();
  return context.with(trace.setSpan(context.active(), span), fn);
};

export const openTelemetryMiddleware = (store) => (next) => (action) => {
  const tracer = trace.getTracer('redux-middleware');

  // Check if the action is a pending async thunk
  if (isAsyncThunkAction(action) && action.type.endsWith('/pending')) {
    const actionName = action.type.replace('/pending', '');
    const span = tracer.startSpan(`Redux Async Thunk: ${actionName}`);

    // Associate span with the unique request ID of the thunk
    activeSpans.set(action.meta.requestId, span);

    // Keep the span active while the pending action passes through the
    // remaining middleware and reducers. This scope ends when next() returns,
    // so the thunk re-activates the span around its fetch via runInThunkSpan.
    return context.with(trace.setSpan(context.active(), span), () => next(action));
  }

  // Check if the action is a fulfilled or rejected async thunk
  if (isAsyncThunkAction(action) && (action.type.endsWith('/fulfilled') || action.type.endsWith('/rejected'))) {
    const requestId = action.meta.requestId;
    const span = activeSpans.get(requestId);

    if (span) {
      if (action.type.endsWith('/rejected')) {
        // With rejectWithValue the real message is in action.payload;
        // action.error.message is then just "Rejected".
        const message =
          typeof action.payload === 'string' ? action.payload : action.error?.message;
        // The status enum is exported as SpanStatusCode; `trace.StatusCode` does not exist.
        span.setStatus({ code: SpanStatusCode.ERROR, message });
        span.recordException({
          code: action.error?.code,
          message,
          stack: action.error?.stack,
        });
      } else {
        span.setStatus({ code: SpanStatusCode.OK });
      }
      span.end();
      activeSpans.delete(requestId);
    }
  }

  return next(action);
};

The middleware uses the unique requestId generated by createAsyncThunk to track spans, so concurrent API calls don’t interfere with each other’s traces. The subtle part is context scope. context.with(...) makes the span “active” only for the duration of the callback it wraps, and createAsyncThunk dispatches the pending action before it invokes the payload creator. By the time the thunk calls fetch, that dispatch has already returned and the span is no longer active. That is why the middleware exports runInThunkSpan: the thunk wraps its fetch call in it, and the FetchInstrumentation we configured earlier then sees the Redux span as the active one, creates its own span as a child, and injects that context into the request headers.

Here is how you would define a thunk and configure the store:

// file: store/chatSlice.js
import { createSlice, createAsyncThunk } from '@reduxjs/toolkit';
import { runInThunkSpan } from './telemetryMiddleware';

// A utility to get the OAuth token. In a real app, this would come from a secure storage.
const getAuthToken = () => localStorage.getItem('access_token');

export const fetchChatResponse = createAsyncThunk(
  'chat/fetchResponse',
  async (query, { rejectWithValue, requestId }) => {
    try {
      const token = getAuthToken();
      if (!token) {
        return rejectWithValue('No authentication token found.');
      }

      // Issue the request with the Redux span active so `traceparent` continues its trace.
      const response = await runInThunkSpan(requestId, () =>
        fetch('http://localhost:8000/api/v1/chat', {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'Authorization': `Bearer ${token}`,
          },
          body: JSON.stringify({ query }),
        })
      );

      if (!response.ok) {
        // The error body may not be JSON (e.g. a proxy error page)
        const errorData = await response.json().catch(() => ({}));
        // Propagate a meaningful error message
        throw new Error(errorData.detail || `Request failed with status ${response.status}`);
      }

      return await response.json();
    } catch (error) {
      return rejectWithValue(error.message);
    }
  }
);

const chatSlice = createSlice({
  name: 'chat',
  initialState: { messages: [], status: 'idle', error: null },
  reducers: {},
  extraReducers: (builder) => {
    builder
      .addCase(fetchChatResponse.pending, (state) => {
        state.status = 'loading';
      })
      .addCase(fetchChatResponse.fulfilled, (state, action) => {
        state.status = 'succeeded';
        state.messages.push(action.payload.response);
      })
      .addCase(fetchChatResponse.rejected, (state, action) => {
        state.status = 'failed';
        state.error = action.payload;
      });
  },
});

export default chatSlice.reducer;

// file: store/store.js
import { configureStore } from '@reduxjs/toolkit';
import chatReducer from './chatSlice';
import { openTelemetryMiddleware } from './telemetryMiddleware';

export const store = configureStore({
  reducer: {
    chat: chatReducer,
  },
  middleware: (getDefaultMiddleware) =>
    getDefaultMiddleware().concat(openTelemetryMiddleware),
});

The Bridge: OAuth 2.0 and Header Propagation

The final piece of the puzzle is ensuring the traceparent header, injected by the client, is correctly received by the FastAPI backend, even with the Authorization header being processed. This turned out to be less of a technical hurdle and more of a conceptual one. A common misconception is that the tracing headers need to be part of the OAuth 2.0 redirect flow itself. This is incorrect and overly complex.

The OAuth flow (e.g., Authorization Code) is purely for obtaining an access token. The tracing context belongs to the subsequent API calls that use that token. Our FetchInstrumentation and Redux middleware ensure that when fetchChatResponse is called, the request to /api/v1/chat contains two crucial headers:

  1. Authorization: Bearer <your_jwt_token>
  2. traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01

The FastAPIInstrumentor on the backend automatically inspects incoming requests for the traceparent header. If found, it extracts the context and continues the trace, making the client-side span the parent of the new server-side span. The OAuth 2.0 dependency (get_current_user) simply validates the Authorization header; it does not interfere with other headers.
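For readers who want to see what the instrumentor is reading, the header has four dash-separated hex fields: version, trace ID, parent span ID, and flags. The following is a minimal sketch of a parser for the version-00 shape. `parse_traceparent` and `TraceParent` are hypothetical names for illustration; in the running application the OpenTelemetry propagator does this work.

```python
import re
from typing import NamedTuple, Optional

# version-traceid-parentid-flags, all lowercase hex (W3C Trace Context, version 00 shape)
_TRACEPARENT_RE = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class TraceParent(NamedTuple):
    version: str
    trace_id: str   # shared by every span in the trace
    parent_id: str  # ID of the client-side span that made the request
    sampled: bool   # lowest bit of the flags field


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Return the parsed fields, or None if the header is malformed."""
    match = _TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    version, trace_id, parent_id, flags = match.groups()
    # Version "ff" and all-zero IDs are invalid per the spec.
    if version == "ff" or set(trace_id) == {"0"} or set(parent_id) == {"0"}:
        return None
    return TraceParent(version, trace_id, parent_id, bool(int(flags, 16) & 0x01))


parsed = parse_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
print(parsed)
```

The trace ID is what ties the browser spans and the backend spans into one trace. The parent ID is what makes the server span a child of the specific client span that issued the request.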

The complete, end-to-end flow can be visualized as follows:

sequenceDiagram
    participant User
    participant ReactUI
    participant ReduxStore
    participant OTelMiddleware
    participant BrowserFetch
    participant FastAPI
    participant LangChainAgent
    participant LLM

    User->>ReactUI: Clicks "Send"
    ReactUI->>ReduxStore: dispatch(fetchChatResponse("Paris"))
    ReduxStore->>OTelMiddleware: Intercepts action
    OTelMiddleware->>OTelMiddleware: tracer.startSpan("Redux Thunk")
    OTelMiddleware-->>BrowserFetch: Injects `traceparent` header
    ReduxStore->>BrowserFetch: fetch('/api/v1/chat', {headers})
    BrowserFetch->>FastAPI: POST /api/v1/chat
    Note right of FastAPI: OTel instrumentor reads `traceparent`, continues the trace.
    FastAPI->>FastAPI: Validates OAuth 2.0 token
    FastAPI->>LangChainAgent: invoke(query, callbacks)
    Note right of LangChainAgent: OTel callback handler starts "Chain - LLMChain" span.
    LangChainAgent->>LLM: API Call
    Note right of LLM: OTel callback handler starts "LLM - OpenAI" span.
    LLM-->>LangChainAgent: Response
    Note right of LLM: OTel callback handler ends "LLM" span.
    LangChainAgent-->>FastAPI: Response
    Note right of LangChainAgent: OTel callback handler ends "Chain" span.
    FastAPI-->>BrowserFetch: 200 OK
    BrowserFetch-->>ReduxStore: Promise resolves
    ReduxStore->>OTelMiddleware: Intercepts 'fulfilled' action
    OTelMiddleware->>OTelMiddleware: span.end()
    ReduxStore-->>ReactUI: Updates state with response

This diagram shows the unbroken chain of context. The traceparent header acts as the baton in a relay race, passed from the Redux middleware to the browser’s fetch, and finally caught by the FastAPI instrumentation, allowing the LangChain callback handler to build its part of the trace on the correct foundation.
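One place the baton can be dropped in a browser is the CORS preflight. When the frontend and the API live on different origins, the browser asks the server whether headers such as Authorization and traceparent are allowed before it sends the real request. The following is a configuration sketch, assuming the React dev server runs on http://localhost:3000 (an assumption, not something stated above). In main.py the middleware would be added to the existing app rather than to a new one.

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()  # in main.py, reuse the existing `app` instead

# Assumption: the frontend origin used during development.
FRONTEND_ORIGINS = ["http://localhost:3000"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=FRONTEND_ORIGINS,
    allow_methods=["POST", "OPTIONS"],
    # The browser preflights requests carrying these headers. Leaving out
    # `traceparent` would make the preflight fail once tracing is switched on.
    allow_headers=["Authorization", "Content-Type", "traceparent", "tracestate"],
)
```

This pairs with propagateTraceHeaderCorsUrls on the client: that option decides whether the browser instrumentation adds the header to cross-origin requests, while the server-side list decides whether the browser is allowed to send it.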

This architecture, while involving several moving parts, is not overly complex. Its primary limitation is its reliance on explicit instrumentation within the LangChain callback system. If a tool used by an agent makes its own HTTP requests without being instrumented, that part of the trace will be a black box. Future iterations would involve creating instrumented base classes for our custom tools to ensure context is propagated even further, potentially using contextvars in Python for robust context passing in complex asynchronous tool executions. Additionally, productionizing this requires moving from console exporters to a proper OpenTelemetry collector and backend like Jaeger or Honeycomb, and implementing a sensible sampling strategy to manage costs without sacrificing visibility into critical or erroneous requests.
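As a rough illustration of the contextvars idea, here is a minimal standard-library sketch. `current_trace_id`, `fake_tool`, and `handle_request` are hypothetical names for this example. OpenTelemetry's Python context is itself built on contextvars, so instrumented tools would rely on the same mechanism rather than on a hand-rolled variable like this one.

```python
import asyncio
import contextvars
from typing import List, Optional

# Hypothetical variable holding the ID of the trace the current request belongs to.
current_trace_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_trace_id", default=None
)


async def fake_tool(name: str) -> str:
    """Stand-in for a custom tool; it reads the trace ID without it being passed in."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return f"{name}:{current_trace_id.get()}"


async def handle_request(trace_id: str) -> List[str]:
    current_trace_id.set(trace_id)
    # Tasks created by gather() copy the current context, so each tool sees this request's ID.
    return await asyncio.gather(fake_tool("search"), fake_tool("calculator"))


async def main() -> List[List[str]]:
    # Two concurrent requests; each runs in its own task and therefore its own context copy.
    return await asyncio.gather(
        asyncio.create_task(handle_request("trace-a")),
        asyncio.create_task(handle_request("trace-b")),
    )


results = asyncio.run(main())
print(results)
```

Each request's tools see only that request's ID even though the tasks interleave on one event loop, and nothing leaks into the context outside the tasks. Code that hops to a thread pool or a separate process does not get this for free and needs the context handed over explicitly.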

