Projecting MLOps Event Sourcing Streams into a Recoil-Managed Micro-frontend within an Angular Application


The core challenge in building user interfaces for MLOps platforms is not rendering forms or tables, but accurately reflecting the state of long-running, asynchronous, and often distributed processes. A typical machine learning pipeline involves stages like data validation, training, evaluation, and deployment, each emitting a stream of events over minutes or even hours. A traditional RESTful approach, relying on polling a status endpoint, is inefficient, latent, and fails to capture the rich history of the process. The state model on the frontend becomes a fragile, periodically synchronized copy of a server-side state machine, leading to inconsistencies and a poor user experience.

A more robust architecture models the MLOps pipeline itself as a stream of immutable facts, or events. This is Event Sourcing. Instead of storing the current state of a model training job, we store the full sequence of events that led to that state: TrainingJobInitiated, DatasetVersionVerified, EpochCompleted, ValidationAccuracyCalculated, DeploymentToStagingSucceeded. The current state is merely a projection, or a left-fold, over this history of events. This architectural pattern, typically confined to the backend, can be extended directly to the frontend, creating a powerful, reactive, and auditable system.

The fundamental problem this solves is transforming the UI from a state-polling entity into a real-time event consumer. The source of truth becomes the event stream itself, which the frontend can subscribe to. This allows the UI to not only display the current status but also to reconstruct the entire history of a pipeline run, providing invaluable diagnostic and observability capabilities. In a real-world project, this means the UI is no longer guessing or lagging; it is reacting to the same immutable facts that drive the backend logic.

Our implementation context introduces a common enterprise constraint: an existing, large-scale portal built with Angular serves as the shell for all new functionality. The team tasked with building the new MLOps monitoring module, however, has expertise in React and prefers its compositional model for building highly dynamic data visualizations. This dictates a micro-frontend (MFE) architecture. We will use Webpack’s Module Federation to load a React-based MFE, which will manage its complex state using Recoil, into the Angular shell.

The architecture can be visualized as a clear data flow from the event source to the rendered component.

graph TD
    subgraph MLOps Backend
        A[Event Store] -->|Persists Events| B(ML Pipeline Process);
        B -->|Emits Events| A;
        A -->|Publishes to Stream| C{Event Bus / Message Queue};
    end

    subgraph Backend For Frontend
        C --> D[WebSocket Gateway];
    end

    subgraph Frontend Application
        subgraph Angular Shell
            E[Angular Host Component] -->|Loads MFE| F;
        end
        subgraph React MFE
            F[React Root] --> G(WebSocket Client Service);
            D -->|Pushes Events| G;
            G -->|Updates Recoil State| H{Recoil Atom: rawEventsState};
            H -->|Input for Projection| I[Recoil Selector: derivedModelState];
            I -->|Provides Projected State| J(React Component);
        end
    end

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#bbf,stroke:#333,stroke-width:2px
    style H fill:#9f9,stroke:#333,stroke-width:2px
    style I fill:#9f9,stroke:#333,stroke-width:2px

This design decouples the event production on the backend from its consumption on the frontend, using a WebSocket gateway as the real-time bridge. The critical logic resides within the React MFE, where Recoil is used not just as a state container, but as a client-side projection engine.

Backend Event Emitter Simulation

To focus on the frontend architecture, we’ll simulate the MLOps backend. The following Node.js server uses the ws library to create a WebSocket server. When a client connects and requests a specific modelRunId, it begins streaming a plausible sequence of MLOps events for that run.

In a production system, this would be connected to a real event bus like Kafka or a database change-data-capture (CDC) stream.

// mfe-event-sourcing-backend/server.js

const WebSocket = require('ws');
const url = require('url');

const wss = new WebSocket.Server({ port: 8085 });

console.log('WebSocket Event Emitter running on port 8085');

// A library of potential MLOps events for a training run.
const generateMockEvents = (runId) => {
    const startTime = Date.now();
    let eventCounter = 0;

    return [
        { type: 'TRAINING_INITIATED', payload: { runId, framework: 'TensorFlow', version: '2.9.1' }, delay: 500 },
        { type: 'DATASET_VALIDATION_STARTED', payload: { dataset: 'CIFAR-100', size: '5.2GB' }, delay: 1000 },
        { type: 'DATASET_VALIDATION_SUCCEEDED', payload: {}, delay: 1500 },
        ...Array.from({ length: 10 }).map((_, i) => ({
            type: 'EPOCH_COMPLETED',
            payload: { epoch: i + 1, loss: Math.max(0.1, 1.5 - i * 0.14), accuracy: 0.65 + i * 0.03 },
            delay: 1000 + i * 800
        })),
        { type: 'TRAINING_COMPLETED', payload: { finalLoss: 0.12, finalAccuracy: 0.92 }, delay: 9000 },
        { type: 'MODEL_EVALUATION_STARTED', payload: { on: 'holdout_set' }, delay: 9500 },
        { type: 'MODEL_EVALUATION_SUCCEEDED', payload: { precision: 0.91, recall: 0.93 }, delay: 11000 },
        { type: 'MODEL_DEPLOYMENT_STARTED', payload: { target: 'staging-environment' }, delay: 12000 },
        { type: 'MODEL_DEPLOYMENT_FAILED', payload: { reason: 'Insufficient GPU quota in staging cluster.' }, delay: 14000 },
    ].map(event => ({
        ...event,
        metadata: {
            eventId: `${runId}-${++eventCounter}`,
            streamId: runId,
            timestamp: new Date(startTime + event.delay).toISOString(),
        }
    }));
};

const streams = new Map();

wss.on('connection', (ws, req) => {
    const parameters = new url.URL(req.url, `http://${req.headers.host}`).searchParams;
    const runId = parameters.get('runId');

    if (!runId) {
        console.warn('Connection attempt without runId. Closing.');
        ws.close();
        return;
    }

    console.log(`Client connected for runId: ${runId}`);

    if (streams.has(runId)) {
        // If a stream is already running, kill the old one.
        clearInterval(streams.get(runId).intervalId);
    }
    
    const events = generateMockEvents(runId);
    let eventIndex = 0;

    const intervalId = setInterval(() => {
        if (eventIndex < events.length) {
            const eventToSend = events[eventIndex];
            ws.send(JSON.stringify(eventToSend));
            console.log(`Sent event ${eventToSend.type} for ${runId}`);
            eventIndex++;
        } else {
            console.log(`Finished event stream for ${runId}`);
            clearInterval(intervalId);
            // Optionally close the connection
            // ws.close();
        }
    }, 1000); // Send an event every second

    streams.set(runId, { intervalId });

    ws.on('close', () => {
        console.log(`Client disconnected for runId: ${runId}`);
        if (streams.has(runId)) {
            clearInterval(streams.get(runId).intervalId);
            streams.delete(runId);
        }
    });

    ws.on('error', (error) => {
        console.error(`WebSocket error for ${runId}:`, error);
    });
});

To run this, initialize a Node.js project, install ws (npm i ws), and run node server.js. It provides the real-time event source our frontend will consume.

The Angular Shell Configuration

The Angular application acts as the host. Its primary responsibilities are to provide the overall application layout and to load the React MFE using Module Federation. A common pitfall here is improper Webpack configuration, leading to dependency conflicts or runtime errors.

The shell’s Webpack configuration (webpack.config.js) must define itself as a host and list the remotes it will consume.

// angular-shell/webpack.config.js

const ModuleFederationPlugin = require("webpack/lib/container/ModuleFederationPlugin");
const deps = require("./package.json").dependencies;

module.exports = {
  output: {
    uniqueName: "angularShell",
    publicPath: "auto"
  },
  optimization: {
    runtimeChunk: false
  },
  resolve: {
    alias: {
      ...deps,
    }
  },
  plugins: [
    new ModuleFederationPlugin({
        remotes: {
            "reactMFE": "reactMFE@http://localhost:4201/remoteEntry.js",
        },
        shared: {
          ...deps,
          "react": { singleton: true, eager: true, requiredVersion: deps.react },
          "react-dom": { singleton: true, eager: true, requiredVersion: deps["react-dom"] },
        }
    }),
  ],
};

The Angular component that hosts the MFE will then dynamically load the remote component. We create a wrapper component to handle mounting and unmounting the React application inside an Angular view.

// angular-shell/src/app/mfe-wrapper/mfe-wrapper.component.ts

import { Component, AfterViewInit, ElementRef, ViewChild, OnDestroy } from '@angular/core';
import { loadRemoteModule } from '@angular-architects/module-federation';
import type { ComponentType } from 'react';
import { createRoot, Root } from 'react-dom/client';

@Component({
  selector: 'app-mfe-wrapper',
  template: '<div #reactContainer></div>',
})
export class MfeWrapperComponent implements AfterViewInit, OnDestroy {
  @ViewChild('reactContainer', { static: true }) containerRef!: ElementRef;
  
  private reactRoot: Root | null = null;
  private modelRunId = `run-${Date.now()}`; // Example ID

  async ngAfterViewInit() {
    try {
      // Dynamically load the remote React component
      const { MLOpsDashboard } = await loadRemoteModule({
        remoteEntry: 'http://localhost:4201/remoteEntry.js',
        remoteName: 'reactMFE',
        exposedModule: './MLOpsDashboard',
      });
      
      this.reactRoot = createRoot(this.containerRef.nativeElement);
      // The React component is cast here. In a real project, type definitions should be shared.
      const ReactComponent = MLOpsDashboard as ComponentType<{ modelRunId: string }>;
      
      // Mount the React component with props
      this.reactRoot.render(<ReactComponent modelRunId={this.modelRunId} />);

    } catch (error) {
      console.error('Error loading remote module', error);
      this.containerRef.nativeElement.innerText = 'Failed to load MLOps Dashboard.';
    }
  }

  ngOnDestroy() {
    // A crucial step to prevent memory leaks is unmounting the React app.
    if (this.reactRoot) {
      this.reactRoot.unmount();
    }
  }
}

The React MFE with Recoil for State Projection

This is the core of our solution. The React application receives a modelRunId, establishes a WebSocket connection, and feeds the incoming event stream into a Recoil state management system. Selectors then act as the projection functions, transforming the raw event list into meaningful, derived state for UI components.

First, the Webpack configuration for the React MFE declares its exposed modules.

// react-mfe/webpack.config.js

const ModuleFederationPlugin = require("webpack/lib/container/ModuleFederationPlugin");
const deps = require("./package.json").dependencies;

module.exports = {
  // ... other webpack config ...
  output: {
    publicPath: "http://localhost:4201/",
  },
  plugins: [
    new ModuleFederationPlugin({
      name: "reactMFE",
      filename: "remoteEntry.js",
      exposes: {
        './MLOpsDashboard': './src/MLOpsDashboard',
      },
      shared: {
        ...deps,
        "react": { singleton: true, eager: true, requiredVersion: deps.react },
        "react-dom": { singleton: true, eager: true, requiredVersion: deps["react-dom"] },
        "recoil": { singleton: true, eager: true, requiredVersion: deps.recoil },
      }
    }),
    // ... other plugins ...
  ],
};

Next, we define the WebSocket service. A common mistake is to place connection logic inside a React component, leading to multiple connections and complex lifecycle management. A dedicated, non-React class is cleaner.

// react-mfe/src/services/event-stream-service.ts

export type MLOpsEvent = {
  type: string;
  payload: any;
  metadata: {
    eventId: string;
    streamId: string;
    timestamp: string;
  };
};

type EventHandler = (event: MLOpsEvent) => void;

export class EventStreamService {
  private ws: WebSocket | null = null;
  private readonly runId: string;
  private readonly url = 'ws://localhost:8085';
  private subscribers: EventHandler[] = [];
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;

  constructor(runId: string) {
    this.runId = runId;
  }

  public connect() {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      console.warn('WebSocket is already connected.');
      return;
    }

    try {
      this.ws = new WebSocket(`${this.url}?runId=${this.runId}`);
      
      this.ws.onopen = () => {
        console.log(`WebSocket connected for runId: ${this.runId}`);
        this.reconnectAttempts = 0;
      };

      this.ws.onmessage = (message) => {
        try {
          const event: MLOpsEvent = JSON.parse(message.data as string);
          this.subscribers.forEach(handler => handler(event));
        } catch (error) {
          console.error('Failed to parse incoming event:', message.data, error);
        }
      };

      this.ws.onclose = () => {
        console.log('WebSocket disconnected.');
        this.handleReconnect();
      };
      
      this.ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        // The 'onclose' event will be fired subsequently, triggering reconnection logic.
      };

    } catch (error) {
        console.error('Failed to initialize WebSocket connection:', error);
    }
  }

  private handleReconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = Math.pow(2, this.reconnectAttempts) * 1000; // Exponential backoff
      console.log(`Attempting to reconnect in ${delay / 1000}s...`);
      setTimeout(() => this.connect(), delay);
    } else {
      console.error('Max reconnection attempts reached. Giving up.');
    }
  }

  public subscribe(handler: EventHandler): () => void {
    this.subscribers.push(handler);
    // Return an unsubscribe function
    return () => {
      this.subscribers = this.subscribers.filter(sub => sub !== handler);
    };
  }
  
  public disconnect() {
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
    this.reconnectAttempts = this.maxReconnectAttempts; // Prevent reconnection
  }
}

Now, we set up the Recoil state. We have one core atom to store the raw, unordered list of events as they arrive. The real power comes from selectors, which compute derived state.

// react-mfe/src/state/mlops.state.ts

import { atom, selector } from 'recoil';
import { MLOpsEvent } from '../services/event-stream-service';

// The source of truth: a simple array of all received events.
export const mlOpsEventsState = atom<MLOpsEvent[]>({
  key: 'mlOpsEventsState',
  default: [],
});

// Selector to derive the current high-level status of the pipeline.
export const pipelineStatusSelector = selector<string>({
  key: 'pipelineStatusSelector',
  get: ({ get }) => {
    const events = get(mlOpsEventsState);
    if (events.length === 0) return 'PENDING';

    // Find the latest, most definitive event type
    const eventTypes = new Set(events.map(e => e.type));
    if (eventTypes.has('MODEL_DEPLOYMENT_FAILED') || eventTypes.has('DATASET_VALIDATION_FAILED')) {
        return 'FAILED';
    }
    if (eventTypes.has('MODEL_DEPLOYMENT_SUCCEEDED')) {
        return 'DEPLOYED';
    }
    if (eventTypes.has('MODEL_DEPLOYMENT_STARTED')) {
        return 'DEPLOYING';
    }
    if (eventTypes.has('TRAINING_COMPLETED')) {
        return 'TRAINING_COMPLETE';
    }
    if (eventTypes.has('TRAINING_INITIATED')) {
        return 'TRAINING';
    }
    return 'INITIALIZING';
  },
});

// Selector to project the training progress from EPOCH_COMPLETED events.
export const trainingProgressSelector = selector<{ epoch: number; loss: number; accuracy: number }[]>({
    key: 'trainingProgressSelector',
    get: ({ get }) => {
        const events = get(mlOpsEventsState);
        return events
            .filter(e => e.type === 'EPOCH_COMPLETED')
            .map(e => e.payload)
            // A real-world project must handle out-of-order events. Sorting ensures correctness.
            .sort((a, b) => a.epoch - b.epoch);
    }
});

// Selector to extract the final failure reason, if any.
export const failureReasonSelector = selector<string | null>({
    key: 'failureReasonSelector',
    get: ({get}) => {
        const events = get(mlOpsEventsState);
        const failureEvent = events.find(e => e.type.endsWith('_FAILED'));
        return failureEvent ? failureEvent.payload.reason || 'Unknown error' : null;
    }
})

Finally, the React component ties everything together. It instantiates the service, subscribes to events to update the Recoil atom, and consumes the derived state from the selectors to render the UI. The component itself remains declarative and simple.

// react-mfe/src/MLOpsDashboard.tsx

import React, { useEffect, useMemo } from 'react';
import { RecoilRoot, useRecoilValue, useSetRecoilState } from 'recoil';
import { EventStreamService, MLOpsEvent } from './services/event-stream-service';
import { mlOpsEventsState, pipelineStatusSelector, trainingProgressSelector, failureReasonSelector } from './state/mlops.state';

// The component that manages the connection and state updates
const EventConnector = ({ runId }: { runId: string }) => {
  const setEvents = useSetRecoilState(mlOpsEventsState);
  
  const eventStreamService = useMemo(() => new EventStreamService(runId), [runId]);
  
  useEffect(() => {
    eventStreamService.connect();
    
    const unsubscribe = eventStreamService.subscribe((event: MLOpsEvent) => {
      // It's critical to use the callback form of setState to avoid stale closures
      setEvents(prevEvents => {
          // Prevent duplicate events, a common issue in distributed systems.
          if (prevEvents.some(e => e.metadata.eventId === event.metadata.eventId)) {
              return prevEvents;
          }
          return [...prevEvents, event];
      });
    });
    
    // Cleanup on component unmount
    return () => {
      unsubscribe();
      eventStreamService.disconnect();
    };
  }, [eventStreamService, setEvents]);

  return null; // This component has no UI
};


// The presentational component
const DashboardView = () => {
  const status = useRecoilValue(pipelineStatusSelector);
  const progress = useRecoilValue(trainingProgressSelector);
  const failureReason = useRecoilValue(failureReasonSelector);
  const allEvents = useRecoilValue(mlOpsEventsState);

  return (
    <div style={{ fontFamily: 'sans-serif', padding: '1em', border: '1px solid #ccc' }}>
      <h2>MLOps Pipeline Status</h2>
      <p><strong>Status:</strong> {status}</p>
      {failureReason && <p style={{ color: 'red' }}><strong>Failure:</strong> {failureReason}</p>}
      
      <h3>Training Progress</h3>
      <pre style={{ background: '#f0f0f0', padding: '10px', maxHeight: '200px', overflowY: 'auto' }}>
        {progress.length > 0 ? progress.map(p => 
          `Epoch ${p.epoch}: Loss=${p.loss.toFixed(4)}, Accuracy=${p.accuracy.toFixed(4)}\n`
        ).join('') : 'Waiting for training to start...'}
      </pre>

      <h3>Raw Event Log</h3>
      <pre style={{ background: '#333', color: '#eee', padding: '10px', maxHeight: '300px', overflowY: 'auto' }}>
        {allEvents.map(e => `[${e.metadata.timestamp}] ${e.type}`).join('\n')}
      </pre>
    </div>
  );
};


// The root component exposed to the Angular shell
export const MLOpsDashboard = ({ modelRunId }: { modelRunId: string }) => {
  return (
    <RecoilRoot>
      <EventConnector runId={modelRunId} />
      <DashboardView />
    </RecoilRoot>
  );
};

Testing the State Logic

A major advantage of this architecture is its testability. Since the state logic is encapsulated in pure Recoil selectors, we can test the projections without needing to render components or mock WebSocket connections.

// react-mfe/src/state/mlops.state.test.ts

import { snapshot_UNSTABLE } from 'recoil';
import { pipelineStatusSelector, trainingProgressSelector } from './mlops.state';
import { MLOpsEvent } from '../services/event-stream-service';

// Mock setter for the atom state during tests
const setMockEvents = (events: MLOpsEvent[]) => ({ set }) => {
  set(mlOpsEventsState, events);
};

describe('MLOps Recoil Selectors', () => {
  
  it('pipelineStatusSelector should return PENDING for no events', () => {
    const initialSnapshot = snapshot_UNSTABLE(setMockEvents([]));
    const status = initialSnapshot.getLoadable(pipelineStatusSelector).contents;
    expect(status).toBe('PENDING');
  });
  
  it('pipelineStatusSelector should return TRAINING when initiated', () => {
    const events = [{ type: 'TRAINING_INITIATED' }];
    const snapshot = snapshot_UNSTABLE(setMockEvents(events as MLOpsEvent[]));
    const status = snapshot.getLoadable(pipelineStatusSelector).contents;
    expect(status).toBe('TRAINING');
  });

  it('pipelineStatusSelector should return FAILED on a failure event', () => {
    const events = [
        { type: 'TRAINING_INITIATED' },
        { type: 'EPOCH_COMPLETED', payload: { epoch: 1 } },
        { type: 'MODEL_DEPLOYMENT_FAILED', payload: { reason: 'test' } },
    ];
    const snapshot = snapshot_UNSTABLE(setMockEvents(events as MLOpsEvent[]));
    const status = snapshot.getLoadable(pipelineStatusSelector).contents;
    expect(status).toBe('FAILED');
  });

  it('trainingProgressSelector should correctly filter and sort epoch events', () => {
    const events = [
        { type: 'TRAINING_INITIATED' },
        // Intentionally out of order to test sorting
        { type: 'EPOCH_COMPLETED', payload: { epoch: 2, loss: 0.8, accuracy: 0.8 } },
        { type: 'DATASET_VALIDATION_SUCCEEDED' },
        { type: 'EPOCH_COMPLETED', payload: { epoch: 1, loss: 1.2, accuracy: 0.7 } },
    ];
    const snapshot = snapshot_UNSTABLE(setMockEvents(events as MLOpsEvent[]));
    const progress = snapshot.getLoadable(trainingProgressSelector).contents;
    
    expect(progress).toHaveLength(2);
    expect(progress[0].epoch).toBe(1);
    expect(progress[1].epoch).toBe(2);
  });
});

This pattern of client-side projection via Event Sourcing is not without its trade-offs. It introduces a higher level of initial complexity compared to simple polling. The memory footprint on the client will grow with the number of events in a single stream, which necessitates strategies like event stream snapshotting from the backend, where the client receives an initial projected state and then subscribes only to subsequent events. Furthermore, the frontend must be resilient to out-of-order or duplicate events, requiring idempotent state update logic based on event IDs or sequence numbers.

The applicability of this architecture is therefore bounded. It is most valuable in domains where the history of state changes is as important as the final state itself—such as in financial systems, audit logs, collaborative applications, and, as demonstrated, complex MLOps pipelines. For simple CRUD applications, the architectural overhead would be unjustified. A potential future iteration could involve moving the projection logic into a server-side CQRS read model for exceptionally large event streams, where the frontend queries a pre-materialized view but still subscribes to real-time updates for low latency.


  TOC