Implementing Full-Stack Traceability in an MLOps Platform Using Redux Middleware and OpenTelemetry


The operational visibility into our ML model serving platform was effectively zero. When a user reported that a prediction request was “slow,” the subsequent investigation was a painful, multi-team effort involving sifting through disconnected logs from the front-end Nginx, the React application’s console output, the Python model server, and the feature store database. Correlating a specific user click with a specific backend model inference execution was guesswork. This lack of end-to-end context propagation made root cause analysis inefficient and proactive performance monitoring impossible. Our MLOps pipeline, while functional, was an opaque black box.

The initial concept was to establish a single, continuous distributed trace that originated within the user’s browser and persisted across every microservice hop until a prediction was returned. This required a mechanism to generate a trace context on the client-side, inject it into API requests, and have every subsequent service honor and propagate that context. Our front-end stack was built on React and Redux, while the backend consisted of Python-based model serving APIs. To capture and visualize this data, we needed a robust observability backend.
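To make the idea of generating, injecting, and propagating a trace context concrete, here is a minimal standard-library sketch of the W3C `traceparent` value that carries it. The function names (`new_traceparent`, `inject`, `next_hop`) are illustrative assumptions, not part of any OpenTelemetry API; in the real system the OpenTelemetry propagators do this work.

```python
import secrets


def new_traceparent(sampled: bool = True) -> str:
    """Build a version-00 traceparent value for a brand-new trace."""
    trace_id = secrets.token_hex(16)  # 16 random bytes -> 32 hex characters
    span_id = secrets.token_hex(8)    # 8 random bytes  -> 16 hex characters
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def inject(headers: dict, traceparent: str) -> dict:
    """Return a copy of the request headers with the trace context added."""
    return {**headers, "traceparent": traceparent}


def next_hop(traceparent: str) -> str:
    """Propagate to the next service: same trace ID, fresh span ID."""
    version, trace_id, _parent_span_id, flags = traceparent.split("-")
    return f"{version}-{trace_id}-{secrets.token_hex(8)}-{flags}"
```

Every hop keeps the 32-character trace ID and replaces only the span ID, which is what lets the backend stitch browser and server spans into one trace.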

A contentious point in our technology selection was the choice between Jaeger and SkyWalking. The two are often treated as competitors, but in practice their strengths are complementary. We selected Jaeger for its detailed per-trace visualization and flame graph analysis, which suit debugging a specific, complex request. We chose SkyWalking for its service topology mapping, APM-centric dashboards, and metric correlation, which are better suited to high-level system health monitoring. The pivotal decision was not to choose one over the other but to use both, fed by a single, standardized data pipeline built on OpenTelemetry. This avoids vendor lock-in and lets each team use the tool best suited to its task.

For the front-end, instead of introducing a new library purely for tracing, we leveraged our existing Redux architecture. The Redux middleware pattern provided a natural interception point for API-triggering actions. For deployment, managing the configuration sprawl of all these components (OTel Collector, Jaeger, SkyWalking, multiple model servers) called for a declarative, version-controlled approach. GitOps, implemented with ArgoCD, became our choice for managing the Kubernetes-based infrastructure, with Tekton handling CI: building and publishing our application images.

The Redux Middleware for Trace Initiation

The entire chain of observability begins here. A common mistake is to only start tracing at the API gateway. This misses crucial client-side time: rendering delays, request preparation, and network latency from the user’s perspective. Our solution was a custom Redux middleware that instruments fetch calls triggered by specific Redux actions.

Here is the complete implementation of the traceInjectorMiddleware.js. It uses the official OpenTelemetry JS libraries to create a root span and inject W3C Trace Context headers.

// src/store/traceInjectorMiddleware.js

import { trace, context } from '@opentelemetry/api';
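// The W3C Trace Context header name. Defined locally because
// `@opentelemetry/core` does not export a `W3C_TRACE_CONTEXT_HEADER` constant.
const W3C_TRACE_CONTEXT_HEADER = 'traceparent';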

// Assume tracer is initialized elsewhere and exported, e.g., in `src/instrumentation.js`
import { tracer } from '../instrumentation'; 

// A helper to inject trace context into headers
const injectContextIntoHeaders = (headers) => {
  const carrier = {};
  const activeContext = context.active();
  // The W3CTraceContextPropagator is what creates the `traceparent` header
  // This is typically configured globally when setting up instrumentation
  // but the core logic is to populate the carrier object.
  // For simplicity, we assume a global propagator is active.
  // In a real setup, you would import and use your configured propagator.
  // Example: opentelemetry.propagation.inject(activeContext, carrier);
  
  // Manual demonstration of what the propagator does:
  const spanContext = trace.getSpanContext(activeContext);
  if (spanContext) {
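    // Trace flags are serialized as two lowercase hex digits (e.g. "01" when sampled).
    const flags = spanContext.traceFlags.toString(16).padStart(2, '0');
    const headerValue = `00-${spanContext.traceId}-${spanContext.spanId}-${flags}`;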
    carrier[W3C_TRACE_CONTEXT_HEADER] = headerValue;
  }
  
  return { ...headers, ...carrier };
};

const isApiAction = (action) => {
  // We use a convention for our API actions: they have a `meta.api` property.
  return action.meta && action.meta.api === true;
};

export const traceInjectorMiddleware = store => next => action => {
  if (!isApiAction(action)) {
    return next(action);
  }

  // 1. Define the attributes for our root span
  const spanAttributes = {
    'redux.action.type': action.type,
    'app.component': action.meta.componentName || 'unknown',
    // Add any other relevant metadata from the action payload
    'model.name': action.payload.request.modelName,
  };

  // 2. Create the root span. This span represents the entire user interaction
  // from the moment the action is dispatched until the API call completes.
  const span = tracer.startSpan(`Redux Action: ${action.type}`, {
    attributes: spanAttributes,
    kind: 2 // SpanKind.CLIENT (1 would be SpanKind.SERVER)
  });

  // 3. Execute the span within an active context. This is critical.
  // Any child spans (like the one created by the fetch instrumentation)
  // will automatically become children of this root span.
  return context.with(trace.setSpan(context.active(), span), () => {
    try {
      // Modify the original action to include instrumented fetch logic
      const originalFetch = action.payload.request.fetcher;
      
      const instrumentedFetcher = async (...args) => {
        const [url, options = {}] = args;
        const headers = options.headers || {};
        
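        // 4. Inject the W3C Trace Context headers into the request.
        // The fetcher may run after the enclosing `context.with` callback has
        // returned, so re-activate the span's context while building the headers.
        const headersWithTrace = context.with(
          trace.setSpan(context.active(), span),
          () => injectContextIntoHeaders(headers)
        );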
        
        const newOptions = { ...options, headers: headersWithTrace };
        
        console.log(`[Trace Middleware] Injected traceparent: ${headersWithTrace[W3C_TRACE_CONTEXT_HEADER]}`);

        // The actual fetch call is now wrapped.
        // If you have automatic fetch instrumentation configured for OpenTelemetry JS,
        // it will create a child span for the HTTP request itself.
        return originalFetch(url, newOptions);
      };

      // Replace the fetcher in the action payload
      const newAction = {
        ...action,
        payload: {
          ...action.payload,
          request: {
            ...action.payload.request,
            fetcher: instrumentedFetcher,
          },
        },
      };

      const result = next(newAction);

      // 5. End the span when the action has been processed.
      // If a downstream middleware (e.g. redux-thunk) returned a promise,
      // keep the span open until it settles so it covers the whole request.
      if (result && typeof result.then === 'function') {
        return result.then(
          (value) => {
            span.end();
            return value;
          },
          (error) => {
            span.recordException(error);
            span.setStatus({ code: 2, message: error.message }); // 2 is ERROR
            span.end();
            throw error;
          }
        );
      }

      // Synchronous action: nothing left to wait for.
      span.end();
      return result;
    } catch (error) {
      span.recordException(error);
      span.setStatus({ code: 2, message: error.message }); // 2 is ERROR
      span.end();
      throw error;
    }
  });
};

This middleware intercepts any action with meta.api: true. It creates a parent span, injects the traceparent header into the outgoing request, and then passes the action along. The key pitfall here is handling asynchronous actions correctly. If you’re using redux-thunk, the middleware must be structured to await the promise resolution to accurately capture the entire duration of the operation before ending the span.
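The lifecycle rule described above is language-independent, so here is a small sketch of it in Python, the same language as the backend examples. `SketchSpan` and `traced_dispatch` are hypothetical stand-ins rather than OpenTelemetry APIs; the point is only that the span ends when the awaited work settles, not when the dispatch call returns.

```python
import asyncio
import inspect
import time


class SketchSpan:
    """Hypothetical stand-in for a real span: records start, end, and error."""

    def __init__(self, name):
        self.name = name
        self.start = time.monotonic()
        self.end_time = None
        self.error = None

    def end(self):
        if self.end_time is None:  # ending twice is a no-op
            self.end_time = time.monotonic()

    @property
    def duration(self):
        return self.end_time - self.start


def traced_dispatch(name, next_handler, action):
    """Call next_handler and end the span only once its work has finished."""
    span = SketchSpan(name)
    try:
        result = next_handler(action)
    except Exception as exc:
        span.error = exc
        span.end()
        raise
    if inspect.isawaitable(result):
        async def finish():
            try:
                return await result
            except Exception as exc:
                span.error = exc
                raise
            finally:
                span.end()  # the async work has settled, so the span can close
        return span, finish()
    span.end()  # synchronous handler: already complete
    return span, result


async def demo():
    async def slow_handler(action):
        await asyncio.sleep(0.02)
        return "done"

    span, pending = traced_dispatch("predict", slow_handler, {"type": "PREDICT"})
    still_open = span.end_time is None  # True: the request is still in flight
    value = await pending
    return still_open, value, span.duration
```

Ending the span in the synchronous path instead would record a duration near zero for every asynchronous request, which is exactly the premature-end pitfall.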

Backend Instrumentation: Propagating Context in a Python Model Server

Once the request hits our backend, the trace context must be extracted and propagated. Our model server is a FastAPI application. The opentelemetry-python SDK provides ASGI middleware that handles this extraction automatically. The real work is in creating meaningful child spans for the internal operations of the model server.
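For intuition about what that automatic extraction involves, the following standard-library sketch parses and validates an incoming `traceparent` header. `RemoteContext` and `extract` are illustrative names, not the SDK's API, and the sketch accepts only the fixed version-00 layout, which is a simplification of the W3C rules for future versions.

```python
import re
from dataclasses import dataclass
from typing import Mapping, Optional

_TRACEPARENT = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


@dataclass(frozen=True)
class RemoteContext:
    trace_id: str
    parent_span_id: str
    sampled: bool


def extract(headers: Mapping[str, str]) -> Optional[RemoteContext]:
    """Return the caller's trace context, or None if it is absent or invalid."""
    # HTTP header names are case-insensitive.
    value = next(
        (v for k, v in headers.items() if k.lower() == "traceparent"), None
    )
    if value is None:
        return None
    match = _TRACEPARENT.match(value.strip())
    if match is None:
        return None
    version, trace_id, span_id, flags = match.groups()
    # Version "ff" and all-zero IDs are invalid per the W3C specification.
    if version == "ff" or set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None
    return RemoteContext(trace_id, span_id, bool(int(flags, 16) & 0x01))
```

When extraction returns nothing, a server starts a new trace, which is why a malformed header shows up as a broken trace rather than as an error.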

Here is a snippet from our main.py for the model serving application.

# model_server/main.py

import os
import logging
import random
import time
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

# --- OpenTelemetry Boilerplate ---
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Service name is crucial for filtering in Jaeger/SkyWalking
resource = Resource(attributes={
    "service.name": "model-serving-api"
})

# Configure the OTLP exporter to send data to our collector
otel_collector_endpoint = os.environ.get("OTEL_COLLECTOR_ENDPOINT", "otel-collector:4317")
trace_provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(endpoint=otel_collector_endpoint, insecure=True)
trace_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(trace_provider)

# Get a tracer for manual instrumentation
tracer = trace.get_tracer(__name__)

# --- Application Logic ---
app = FastAPI()

# Auto-instrument FastAPI and the `requests` library
FastAPIInstrumentor.instrument_app(app)
RequestsInstrumentor().instrument()

# A dummy feature store client
class FeatureStoreClient:
    def get_features(self, user_id: str):
        # In a real system, this would be a network call to Redis, DynamoDB, etc.
        # We create a child span to measure its performance.
        with tracer.start_as_current_span("feature_store_lookup") as span:
            span.set_attribute("db.system", "redis")
            span.set_attribute("db.statement", f"GET features FOR user:{user_id}")
            
            # Simulate network latency and processing
            latency = random.uniform(0.05, 0.15)
            time.sleep(latency)
            
            if random.random() < 0.05: # 5% chance of failure
                raise ConnectionError("Feature store connection timed out")
            
            span.set_attribute("features.retrieved_count", 10)
            return {"feature_vector": [random.random() for _ in range(10)]}

feature_store = FeatureStoreClient()

@app.post("/predict/{model_name}")
async def predict(model_name: str, request: Request):
    try:
        # The FastAPI instrumentor has already created a parent span from incoming headers.
        # All spans created here will be children of that request span.
        current_span = trace.get_current_span()
        current_span.set_attribute("model.name", model_name)

        # 1. First child span: Data Preprocessing
        with tracer.start_as_current_span("data_preprocessing") as span:
            payload = await request.json()
            user_id = payload.get("user_id")
            if not user_id:
                return JSONResponse(status_code=400, content={"error": "user_id is required"})
            
            span.set_attribute("input.payload.size", len(str(payload)))
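            # Simulated work. Note that time.sleep blocks the event loop inside an
            # async handler; real code should await async I/O or use a thread pool.
            time.sleep(random.uniform(0.01, 0.03))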
            logger.info(f"Preprocessing data for user {user_id}")
        
        # 2. Second child span: Feature Fetching
        try:
            features = feature_store.get_features(user_id)
        except ConnectionError as e:
            # Record exceptions on the span for analysis in Jaeger
            current_span.record_exception(e)
            current_span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            return JSONResponse(status_code=503, content={"error": "Feature store unavailable"})

        # 3. Third child span: Model Inference
        with tracer.start_as_current_span("model_inference") as span:
            span.set_attribute("model.version", "v1.2.3")
            
            # Simulate model inference time
            inference_time = random.uniform(0.2, 0.5)
            time.sleep(inference_time)
            
            prediction = random.random()
            span.set_attribute("model.prediction.score", prediction)
            logger.info(f"Model {model_name} produced prediction {prediction}")

        return {"model": model_name, "prediction": prediction}

    except Exception as e:
        # Generic error handling
        current_span = trace.get_current_span()
        current_span.record_exception(e)
        current_span.set_status(trace.Status(trace.StatusCode.ERROR, "Internal Server Error"))
        return JSONResponse(status_code=500, content={"error": "An internal error occurred"})

This code demonstrates several production-grade practices. It uses environment variables for configuration (OTEL_COLLECTOR_ENDPOINT), instruments external calls (FeatureStoreClient), adds custom attributes to spans for rich context (model.name, db.system), and correctly records exceptions. A common pitfall is forgetting to instrument I/O-bound operations like database calls or requests to other services, which are often the primary sources of latency.

The OpenTelemetry Collector: The Central Hub

The Collector is the lynchpin of this architecture, decoupling our applications from the observability backends. It receives data in OTLP format and can be configured to process it and export it to multiple destinations simultaneously. This is how we feed both Jaeger and SkyWalking from a single data stream.
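Conceptually, the fan-out amounts to buffering incoming spans and handing each full batch to every configured exporter. The sketch below models only that idea with the standard library; `FanOutPipeline` is a hypothetical class, and it omits the real Collector's timeout-based flushing, per-exporter queues, and retries.

```python
from typing import Callable, Dict, List

Span = Dict[str, object]
Exporter = Callable[[List[Span]], None]


class FanOutPipeline:
    """Buffer spans, then send every full batch to all exporters."""

    def __init__(self, exporters: Dict[str, Exporter], send_batch_size: int = 512):
        self.exporters = exporters
        self.send_batch_size = send_batch_size
        self.buffer: List[Span] = []
        self.failures: Dict[str, int] = {name: 0 for name in exporters}

    def receive(self, span: Span) -> None:
        self.buffer.append(span)
        if len(self.buffer) >= self.send_batch_size:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        batch, self.buffer = self.buffer, []
        for name, export in self.exporters.items():
            try:
                export(list(batch))  # each backend receives its own copy
            except Exception:
                # One failing backend must not block delivery to the others.
                self.failures[name] += 1
```

Because the applications only ever talk to the pipeline, adding or removing a backend is a configuration change rather than a code change.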

Here is the otel-collector-config.yaml that enables this fan-out architecture:

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    # Batches spans to reduce the number of outgoing requests
    timeout: 1s
    send_batch_size: 512
  memory_limiter:
    # Prevents the collector from consuming too much memory
    check_interval: 1s
    limit_mib: 500
    spike_limit_mib: 100
  attributes:
    # Adds a common attribute to all telemetry data
    actions:
      - key: deployment.environment
        value: "production"
        action: insert

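exporters:
  logging:
    # Useful for debugging the collector itself.
    # Note: newer Collector releases replace this exporter with `debug`
    # (configured via `verbosity`), so adjust to the version you run.
    loglevel: info
  
  jaeger:
    # Note: the dedicated `jaeger` exporter was removed from newer Collector
    # releases. Jaeger ingests OTLP natively, so an `otlp` exporter pointed at
    # Jaeger's OTLP port is the usual replacement there.
    endpoint: "jaeger-collector.observability.svc.cluster.local:14250"
    tls:
      insecure: true

  skywalking:
    # Contrib component: confirm that your Collector distribution ships it.
    endpoint: "skywalking-oap.observability.svc.cluster.local:11800"
    tls:
      insecure: true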

service:
  pipelines:
    traces:
      receivers: [otlp]
      # memory_limiter runs first and batch last, so spans are enriched before batching
      processors: [memory_limiter, attributes, batch]
      # This is the key part: send traces to multiple exporters
      exporters: [logging, jaeger, skywalking]

This configuration is critical. It defines a single pipeline for traces that receives data via OTLP, processes it (batching, adding attributes), and then sends the exact same data to the logging, jaeger, and skywalking exporters. The pitfall here is network configuration within Kubernetes. The exporter endpoints must use the correct Kubernetes internal DNS names (<service-name>.<namespace>.svc.cluster.local). Misconfiguration here is a frequent source of silent data loss.
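Because a wrong endpoint fails silently, a small preflight check can be worth running from a pod in the cluster. The following is a sketch using only the standard library; the helper names are assumptions, and a successful TCP connection shows only that the name resolves and the port is open, not that the backend accepts the data.

```python
import socket


def cluster_dns_name(service: str, namespace: str,
                     cluster_domain: str = "cluster.local") -> str:
    """Build the internal DNS name of a Kubernetes Service."""
    return f"{service}.{namespace}.svc.{cluster_domain}"


def split_endpoint(endpoint: str):
    """Split 'host:port' into (host, port), rejecting malformed values."""
    host, _, port = endpoint.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {endpoint!r}")
    return host, int(port)


def check_endpoint(endpoint: str, timeout: float = 2.0) -> str:
    """Try a TCP connection and classify the outcome."""
    host, port = split_endpoint(endpoint)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "ok"
    except socket.gaierror:
        return "dns-failure"  # the name did not resolve: check service/namespace
    except OSError as exc:
        return f"unreachable: {exc}"  # resolved, but nothing answered on the port
```

For example, `check_endpoint(cluster_dns_name("jaeger-collector", "observability") + ":14250")` distinguishes a mistyped namespace (DNS failure) from a Service with no ready pods (connection refused or timeout).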

Declarative Deployment with GitOps

Managing this stack manually is not scalable. We use ArgoCD to synchronize our cluster state with a Git repository containing our Kubernetes manifests. This repository becomes the single source of truth for our entire application and its observability infrastructure. We use Kustomize to manage environment-specific overlays.

Here is an example of the ArgoCD Application manifest that deploys our entire stack:

# argocd/app-of-apps/mlops-platform.yaml
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: mlops-platform
  namespace: argocd
  finalizers:
    - resources-finalizer.argocd.argoproj.io
spec:
  project: default
  source:
    repoURL: 'https://github.com/your-org/mlops-platform-config.git'
    targetRevision: HEAD
    path: kubernetes/overlays/production
  destination:
    server: 'https://kubernetes.default.svc'
    namespace: mlops-production
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true

This manifest tells ArgoCD to monitor the kubernetes/overlays/production directory in our Git repo and apply it to the mlops-production namespace. The actual application manifests (Deployments, Services, ConfigMaps for the OTel Collector, Jaeger, etc.) reside in that directory.

To illustrate the structure, here is the kustomization.yaml for our model server:

# kubernetes/components/model-server/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- deployment.yaml
- service.yaml

configMapGenerator:
- name: model-server-env
  literals:
  - OTEL_SERVICE_NAME=model-serving-api

images:
- name: model-server-image
  newName: your-registry/model-server
  newTag: latest # In production, this would be a specific Git SHA

Our CI pipeline (using Tekton) is responsible for building the image, tagging it with the Git commit SHA, and then updating this kustomization.yaml file in the Git repository. ArgoCD detects the change in Git and automatically rolls out the new version to the cluster. This Git-centric workflow provides a full audit trail and simplifies rollbacks.

graph TD
    subgraph User Browser
        A[Redux Action Dispatch] --> B{Trace Middleware};
        B -- Injects traceparent --> C[Fetch API Call];
    end

    subgraph Kubernetes Cluster
        C --> D[Ingress/Gateway];
        D --> E[FastAPI Model Server];
        E --> F[OTel Auto-Instrumentation];
        F -- Extracts traceparent & Creates Spans --> G(Manual Spans for Inference/Features);
        E --> H[Feature Store];

        subgraph Observability Namespace
            I[OTel Collector]
            J[Jaeger]
            K[SkyWalking]
        end
        
        G -- OTLP --> I;
        F -- OTLP --> I;
        I -- Traces --> J;
        I -- Traces/Metrics --> K;
    end
    
    subgraph CI/CD GitOps
        L[Developer Push to Git] --> M[Tekton Pipeline];
        M -- Builds & Pushes Image --> N[Container Registry];
        M -- Updates kustomization.yaml in Git --> O[Config Git Repo];
        P[ArgoCD] -- Watches --> O;
        P -- Deploys/Updates --> E;
    end

With this architecture, the original problem is solved. When a slow prediction is reported, we can look up the trace in Jaeger initiated by the Redux action. The resulting flame graph clearly shows every step: the time spent in the Redux middleware, the network transit time, the FastAPI request processing, and the precise duration of each child span for data_preprocessing, feature_store_lookup, and model_inference. Simultaneously, in SkyWalking, we can view the overall health of the model-serving-api, see its dependency graph, and correlate a spike in its average latency with, for example, a rise in CPU usage on the feature store pods.

The current implementation uses head-based sampling configured in the OpenTelemetry Collector, which is cost-effective but can miss intermittent, low-frequency errors. The next architectural iteration involves implementing a tail-based sampling strategy, which would require a more complex collector deployment with a tier that buffers all traces before making a sampling decision. Furthermore, our front-end tracing is limited to API calls; it does not yet capture component rendering times or other user experience metrics. Integrating these with the backend trace would provide an even more complete picture but requires a deeper, more framework-specific instrumentation on the client-side. The observability backends themselves, deployed here for simplicity, would need robust, scalable storage solutions like a managed Elasticsearch or Cassandra cluster in a large-scale production environment, adding significant operational complexity.
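The difference between the two sampling strategies can be sketched in a few lines. This is an illustrative model under stated assumptions, not the Collector's implementation: `head_keep` and `TailSampler` are hypothetical names, spans are plain dictionaries, and the caller decides when a trace is complete, whereas a real deployment typically waits a fixed window before deciding.

```python
import hashlib
from collections import defaultdict
from typing import Dict, List


def head_keep(trace_id: str, ratio: float) -> bool:
    """Head-based: decide from the trace ID alone, before any work has run."""
    digest = hashlib.sha256(trace_id.encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0xFFFFFFFF  # stable value in [0, 1]
    return bucket < ratio


class TailSampler:
    """Tail-based: buffer a trace's spans, then decide with full knowledge."""

    def __init__(self, latency_threshold_ms: float, baseline_ratio: float):
        self.latency_threshold_ms = latency_threshold_ms
        self.baseline_ratio = baseline_ratio
        self._buffer: Dict[str, List[dict]] = defaultdict(list)

    def add_span(self, span: dict) -> None:
        self._buffer[span["trace_id"]].append(span)

    def decide(self, trace_id: str) -> List[dict]:
        """Return the spans to export for a finished trace (possibly none)."""
        spans = self._buffer.pop(trace_id, [])
        if not spans:
            return []
        has_error = any(s.get("error") for s in spans)
        started = min(s["start_ms"] for s in spans)
        ended = max(s["end_ms"] for s in spans)
        too_slow = (ended - started) >= self.latency_threshold_ms
        # Always keep failures and slow traces; sample the rest at a baseline rate.
        if has_error or too_slow or head_keep(trace_id, self.baseline_ratio):
            return spans
        return []
```

The buffering in `_buffer` is the cost the paragraph above refers to: every span must be held in memory, and all spans of a trace must reach the same sampler, until the decision is made.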

