Implementing a Real-Time Streaming Interface for LangChain Agent Reasoning on Mobile with Angular and WebSockets


The conventional request-response cycle of HTTP is a poor fit for exposing the inner workings of a Large Language Model agent. In a mobile application context, invoking a multi-step, tool-using LangChain agent via a standard REST API endpoint results in a period of unnerving silence for the user. The application sends a query and then waits, sometimes for many seconds, before a final answer appears. This latency creates a disconnected and unresponsive user experience, which is unacceptable for a modern mobile interface. Our initial attempt involved polling a status endpoint, a clumsy and inefficient workaround that cluttered the network and still failed to provide a genuinely fluid view into the agent’s reasoning process.

The core of the problem is that an agent’s value isn’t just in its final answer, but in its transparent, step-by-step process of thought, tool invocation, and observation. To build user trust and provide a rich interactive experience, we needed to stream this process to the client in real-time. This immediately ruled out HTTP-based solutions and pointed directly toward a persistent, bidirectional communication channel. The technical objective became clear: design and implement a robust, event-driven architecture to bridge a Python-based LangChain backend with an Angular mobile frontend, capable of streaming complex agent state transitions without compromising performance or maintainability.

Our technology selection process was guided by pragmatism. For the backend, while several frameworks exist in the Python ecosystem, FastAPI was the logical choice due to its first-class async support and incredibly straightforward WebSocket implementation. It provides the performance we need without the boilerplate of heavier frameworks like Django Channels. For the frontend, Angular’s built-in reactive programming library, RxJS, is not just a convenience but a necessity for this architecture. A WebSocket is fundamentally a stream of events, and RxJS provides the powerful operators required to manage, transform, and consume this stream declaratively, preventing the spaghetti code of imperative callback handling that often plagues such implementations. The mobile shell itself, built with Capacitor, is largely incidental; the core architectural challenge lies in the communication layer between the web view and the server.

The Backend: A WebSocket-Enabled LangChain Callback

The foundation of the real-time stream is a custom AsyncCallbackHandler in LangChain. This handler intercepts events from the agent’s execution loop and pushes them down the WebSocket connection to the client. This approach decouples the agent’s logic from the communication protocol, which is a critical design choice for maintainability.

First, we define a clear, typed schema for the events we’ll be sending. Unstructured data over a WebSocket is a common source of bugs in real-world projects.

# src/schemas.py
from typing import Dict, Any, Literal
from pydantic import BaseModel, Field

# Defines the structure for events streamed to the client.
# Using a literal type for 'event_type' provides strong typing
# on both the server and client, reducing integration errors.
EventType = Literal[
    "on_agent_start",
    "on_chain_start",
s    "on_tool_start",
    "on_tool_end",
    "on_agent_action",
    "on_llm_new_token",
    "on_agent_finish",
    "on_error"
]

class WebSocketEvent(BaseModel):
    event_type: EventType
    data: Dict[str, Any] = Field(default_factory=dict)
    run_id: str

Next, we implement the AsyncCallbackHandler. Its primary responsibility is to take a WebSocket connection object and format agent lifecycle events according to our schema before sending them as JSON. A crucial detail here is robust error handling within the sending logic; if a client disconnects, the handler must catch the exception gracefully without crashing the entire agent execution.

# src/websocket_handler.py
import logging
from typing import Any, Dict, List, Optional
from uuid import UUID

from fastapi import WebSocket, WebSocketDisconnect
from langchain.callbacks.base import AsyncCallbackHandler
from langchain.schema.agent import AgentAction, AgentFinish
from langchain.schema.output import LLMResult

from src.schemas import WebSocketEvent

# Configure a dedicated logger for our callback handler.
# This helps in debugging issues specific to the streaming logic.
logger = logging.getLogger(__name__)

class WebSocketCallbackHandler(AsyncCallbackHandler):
    """
    An async callback handler that streams agent events over a WebSocket connection.
    This is the core component bridging LangChain's execution with the frontend.
    """
    def __init__(self, websocket: WebSocket):
        super().__init__()
        self.websocket = websocket

    async def _send_event(self, event: WebSocketEvent):
        """A robust method to send an event, handling potential disconnects."""
        try:
            await self.websocket.send_json(event.model_dump())
        except WebSocketDisconnect:
            logger.warning(f"Client disconnected. Run ID: {event.run_id}. Unable to send event: {event.event_type}")
        except Exception as e:
            # Catching other potential exceptions during JSON serialization or sending.
            logger.error(f"Error sending event for run ID {event.run_id}: {e}", exc_info=True)

    async def on_llm_new_token(self, token: str, *, run_id: UUID, **kwargs: Any) -> None:
        """Stream individual tokens from the LLM as they are generated."""
        event = WebSocketEvent(
            event_type="on_llm_new_token",
            run_id=str(run_id),
            data={"token": token}
        )
        await self._send_event(event)
        
    async def on_agent_action(self, action: AgentAction, *, run_id: UUID, **kwargs: Any) -> None:
        """Called when the agent decides to take an action."""
        event = WebSocketEvent(
            event_type="on_agent_action",
            run_id=str(run_id),
            data={
                "tool": action.tool,
                "tool_input": action.tool_input,
                "log": action.log,
            },
        )
        await self._send_event(event)

    async def on_tool_end(self, output: str, *, run_id: UUID, **kwargs: Any) -> None:
        """Called when a tool finishes execution."""
        event = WebSocketEvent(
            event_type="on_tool_end",
            run_id=str(run_id),
            data={"output": output}
        )
        await self._send_event(event)

    async def on_agent_finish(self, finish: AgentFinish, *, run_id: UUID, **kwargs: Any) -> None:
        """Called when the agent has a final answer."""
        event = WebSocketEvent(
            event_type="on_agent_finish",
            run_id=str(run_id),
            data={"return_values": finish.return_values, "log": finish.log}
        )
        await self._send_event(event)
        
    # We can selectively implement other callback methods like on_chain_start,
    # on_tool_start, etc., if the UI needs to react to those events.
    # For this example, we focus on the most visually impactful events.

Finally, we construct the FastAPI application. It exposes a single /ws endpoint that accepts WebSocket connections. Upon connection, it instantiates our WebSocketCallbackHandler and an agent, then awaits a message from the client to trigger the agent’s execution. The agent’s ainvoke method is called with the callback handler in its callbacks list. This is where the magic happens: as the agent runs, it automatically triggers the hooks in our handler, which in turn streams the data to the connected client.

# src/main.py
import logging
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain import hub

# Basic logging configuration for the server.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# In a production environment, secrets should be managed via environment variables
# or a secrets management system, not hardcoded.
# For simplicity, we assume OPENAI_API_KEY and TAVILY_API_KEY are in the environment.
llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0, streaming=True)
tools = [TavilySearchResults(max_results=1)]
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    logger.info("WebSocket connection accepted.")
    
    # Instantiate the callback handler with the current websocket connection.
    ws_handler = WebSocketCallbackHandler(websocket)
    
    try:
        while True:
            # Wait for a message from the client (e.g., the user's query).
            data = await websocket.receive_text()
            logger.info(f"Received message: {data}")
            
            # Run the agent with the custom callback handler.
            # This is an asynchronous, non-blocking call.
            await agent_executor.ainvoke(
                {"input": data},
                config={"callbacks": [ws_handler]}
            )

    except WebSocketDisconnect:
        logger.warning("Client disconnected.")
    except Exception as e:
        logger.error(f"An error occurred in the websocket connection: {e}", exc_info=True)
        # It's good practice to inform the client about server-side errors.
        await websocket.send_json(
            WebSocketEvent(
                event_type="on_error",
                run_id="N/A",
                data={"error": "An internal server error occurred."}
            ).model_dump()
        )
    finally:
        # Ensure the connection is closed gracefully.
        await websocket.close()
        logger.info("WebSocket connection closed.")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

The Frontend: An RxJS-Powered Angular Service

On the Angular side, we need a service to encapsulate the WebSocket connection logic. Direct manipulation of WebSocket objects in components is a common anti-pattern that leads to tightly coupled and untestable code. An injectable service provides a clean API for components to use.

The core of this service is RxJS’s webSocket subject. It handles the underlying connection, serialization, and deserialization, and exposes the connection as an Observable, which fits perfectly into Angular’s change detection and component architecture.

// src/app/services/agent-socket.service.ts
import { Injectable } from '@angular/core';
import { Observable, Subject, timer } from 'rxjs';
import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { retryWhen, delayWhen, tap, shareReplay } from 'rxjs/operators';

// This interface should mirror the Pydantic schema on the backend.
// Keeping these in sync is crucial for type safety.
export interface AgentEvent {
  event_type: 'on_agent_start' | 'on_chain_start' | 'on_tool_start' | 'on_tool_end' | 'on_agent_action' | 'on_llm_new_token' | 'on_agent_finish' | 'on_error';
  data: any;
  run_id: string;
}

const WEBSOCKET_URL = 'ws://localhost:8000/ws';
const RECONNECT_INTERVAL = 5000; // ms

@Injectable({
  providedIn: 'root'
})
export class AgentSocketService {
  private socket$: WebSocketSubject<AgentEvent> | undefined;
  private messages$: Observable<AgentEvent> | undefined;
  
  // A subject to manually send messages. Components will call this.
  private messageSender$ = new Subject<string>();

  constructor() {
    this.messageSender$.subscribe(msg => {
      if (this.socket$) {
        this.socket$.next(msg as any); // The RxJS webSocket subject sends on next()
      } else {
        console.error('WebSocket is not connected.');
      }
    });
  }

  public connect(): Observable<AgentEvent> {
    if (!this.socket$ || this.socket$.closed) {
      const config: WebSocketSubjectConfig<AgentEvent> = {
        url: WEBSOCKET_URL,
        openObserver: {
          next: () => console.log('WebSocket connection established')
        },
        closeObserver: {
          next: () => console.log('WebSocket connection closed')
        },
        // We don't need custom deserializer since backend sends JSON
      };
      
      this.socket$ = webSocket(config);

      this.messages$ = this.socket$.pipe(
        // The retryWhen operator is essential for production mobile apps.
        // It automatically attempts to reconnect on failure.
        retryWhen(errors =>
          errors.pipe(
            tap(err => console.error(`WebSocket error: ${err}. Retrying in ${RECONNECT_INTERVAL}ms`)),
            delayWhen(() => timer(RECONNECT_INTERVAL))
          )
        ),
        // shareReplay ensures that multiple subscribers share the same
        // underlying WebSocket connection, preventing duplicate connections.
        shareReplay({ bufferSize: 1, refCount: true })
      );
    }
    return this.messages$!;
  }
  
  public sendMessage(message: string): void {
    this.messageSender$.next(message);
  }

  public close(): void {
    if (this.socket$) {
      this.socket$.complete();
      this.socket$ = undefined;
    }
  }
}

This service includes a robust reconnection strategy using retryWhen, a critical feature for mobile applications where network connectivity can be intermittent. The shareReplay operator is another production-grade addition, ensuring that even if multiple components subscribe to the message stream, only one WebSocket connection is ever created.

The Component: Managing State with Observables

With the service in place, the component’s role is simplified. It’s now responsible for orchestrating the connection, sending user input, and transforming the incoming stream of AgentEvent objects into a state that can be easily rendered by the template.

The most powerful pattern here is to use the scan operator from RxJS. Instead of manually pushing items into an array in the component class, scan allows us to build up the state declaratively within the observable pipeline itself. This keeps our component logic clean and side-effect-free.

// src/app/components/agent-chat/agent-chat.component.ts
import { Component, OnDestroy, OnInit } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import { scan, startWith, takeUntil, tap } from 'rxjs/operators';
import { AgentEvent, AgentSocketService } from '../../services/agent-socket.service';

// A more specific type to represent a rendered step in our UI
interface AgentStep {
  type: AgentEvent['event_type'];
  content: string;
  isStreaming?: boolean;
}

@Component({
  selector: 'app-agent-chat',
  templateUrl: './agent-chat.component.html',
  styleUrls: ['./agent-chat.component.css']
})
export class AgentChatComponent implements OnInit, OnDestroy {
  public agentSteps$: Observable<AgentStep[]>;
  public userInput = '';
  
  private destroy$ = new Subject<void>();

  constructor(private agentSocketService: AgentSocketService) {
    // The core logic for transforming the event stream into a UI state array.
    this.agentSteps$ = this.agentSocketService.connect().pipe(
      scan((acc: AgentStep[], event: AgentEvent) => {
        // The logic here is the heart of the real-time UI updates.
        // It decides how to modify the array of steps based on the incoming event type.
        // This is far cleaner than a massive switch statement inside a subscribe block.
        switch (event.event_type) {
          case 'on_agent_action':
            return [...acc, {
              type: event.event_type,
              content: `Tool Invoked: ${event.data.tool} with input "${event.data.tool_input}"`
            }];

          case 'on_tool_end':
            return [...acc, {
              type: event.event_type,
              content: `Tool Result: ${event.data.output}`
            }];
            
          case 'on_llm_new_token':
            // This is the most complex part: handling token streaming.
            // We find the last step, assume it's a streaming step, and append the token.
            const lastStep = acc[acc.length - 1];
            if (lastStep && lastStep.isStreaming) {
                lastStep.content += event.data.token;
                return [...acc]; // Return a new array reference to trigger change detection
            } else {
                // If it's the first token, create a new streaming step
                return [...acc, {
                    type: 'on_agent_finish', // We'll render this as a final answer
                    content: event.data.token,
                    isStreaming: true
                }];
            }

          case 'on_agent_finish':
            // When the agent is finished, we find the streaming step and mark it as complete.
            const finalStepIndex = acc.findIndex(step => step.isStreaming);
            if (finalStepIndex > -1) {
                acc[finalStepIndex].isStreaming = false;
                // Optional: You could replace the content with the final log if desired
                // acc[finalStepIndex].content = event.data.log;
            }
            return [...acc];

          case 'on_error':
            return [...acc, {
              type: event.event_type,
              content: `Error: ${event.data.error}`
            }];
            
          default:
            // Ignore other events for this UI
            return acc;
        }
      }, []),
      startWith([]), // Ensure the observable emits an initial empty array.
      takeUntil(this.destroy$) // Automatically unsubscribe on component destruction.
    );
  }

  ngOnInit(): void {
    // It's good practice to have an explicit subscription for logging or side-effects
    // that don't fit into the main pipeline.
    this.agentSteps$.subscribe({
      next: (steps) => console.log('UI State Updated:', steps),
      error: (err) => console.error('Agent stream error:', err)
    });
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
    this.agentSocketService.close();
  }

  public sendMessage(): void {
    if (this.userInput.trim()) {
      this.agentSocketService.sendMessage(this.userInput);
      this.userInput = '';
    }
  }
}

The template then becomes remarkably simple. It uses the async pipe to subscribe to the agentSteps$ observable, and *ngFor to render the list of steps. ngSwitch is used to apply different styling or components based on the type of each step.

<!-- src/app/components/agent-chat/agent-chat.component.html -->
<div class="chat-container">
  <div class="messages">
    <div *ngFor="let step of agentSteps$ | async" class="message-row" [ngSwitch]="step.type">
      <div *ngSwitchCase="'on_agent_action'" class="step agent-action">
        <strong>Action:</strong> {{ step.content }}
      </div>
      <div *ngSwitchCase="'on_tool_end'" class="step tool-end">
        <strong>Observation:</strong> {{ step.content }}
      </div>
      <div *ngSwitchCase="'on_agent_finish'" class="step agent-finish" [class.streaming]="step.isStreaming">
        <strong>Response:</strong> <span [innerHTML]="step.content"></span>
      </div>
       <div *ngSwitchCase="'on_error'" class="step error">
        <strong>System Error:</strong> {{ step.content }}
      </div>
    </div>
  </div>

  <div class="input-area">
    <input 
      [(ngModel)]="userInput" 
      (keyup.enter)="sendMessage()" 
      placeholder="Ask the agent..."
    />
    <button (click)="sendMessage()">Send</button>
  </div>
</div>

The result of this architecture is a fluid, responsive mobile interface that provides immediate feedback to the user. As the agent on the server progresses through its reasoning loop, the UI on the device updates in lockstep, rendering each thought, tool call, and final token as it arrives. This achieves the initial goal of transparency and creates a vastly superior user experience compared to the static, high-latency nature of a traditional REST-based interaction.

This solution, however, is not without its limitations and areas for future improvement. The WebSocket server as implemented is stateful and single-instance; scaling it to handle many concurrent users would require a more sophisticated architecture, likely involving a load balancer and a backend message bus like Redis Pub/Sub to decouple the agent worker processes from the WebSocket gateway servers. Furthermore, the client-side error handling could be enhanced to provide more specific feedback on network state changes, a common occurrence in mobile environments. The current implementation also lacks a proper authentication mechanism, which would be a non-negotiable requirement for any production deployment. A token-based authentication flow during the WebSocket handshake would be the next logical step.


  TOC