Engineering a Low-Latency Visualization Pipeline with MongoDB Change Streams, NumPy Aggregation, and a Vue.js WebSocket Client


The initial system was a predictable failure. A Vue.js dashboard polling a MongoDB collection every three seconds via a simple REST API. It worked for a dozen devices generating metrics. When the client scaled to a fleet of 5,000 industrial sensors, each reporting multiple readings per second, the architecture crumbled. The polling interval became a choice between stale data and a denial-of-service attack on our own database. The frontend would lock up for hundreds of milliseconds trying to parse and render the massive JSON payloads. This wasn’t a feature request; it was an operational crisis. The requirement was clear: build a push-based, real-time pipeline capable of ingesting a high-frequency data stream, processing it, and visualizing it without causing browser seizures.

The architectural pivot was away from the pull model entirely. In a real-world project, a stateful, persistent connection is the only sane way to handle this kind of data velocity.

  1. Event Source: Instead of polling, we needed the database to tell us when new data arrived. MongoDB Change Streams are tailor-made for this. They provide a server-side push API for notifications on collection changes, effectively turning MongoDB into a message bus. This avoids the overhead of constant queries and provides near-instant data access. A replica set is a prerequisite, which is a standard production practice anyway.

  2. Processing Layer: Directly piping raw sensor data to thousands of web clients is wildly inefficient. The data is often noisy, verbose, and requires aggregation to be meaningful. A middle-tier service was non-negotiable. Python, with its unparalleled scientific computing stack, was the obvious choice. Specifically, NumPy for high-performance numerical operations. This service would subscribe to the MongoDB Change Stream, batch the incoming documents, perform calculations like moving averages or downsampling, and then broadcast the processed results.

  3. Transport Protocol: For server-to-client push, WebSockets are the industry standard. They offer a persistent, full-duplex communication channel over a single TCP connection, drastically reducing the latency and overhead associated with HTTP’s request-response cycle.

  4. Frontend Rendering: The final bottleneck is the browser’s DOM. Attempting to render tens of thousands of data points as individual DOM elements using a naive v-for loop is a recipe for disaster. The browser’s rendering engine will stall, leading to a frozen UI. The solution must be DOM virtualization—only rendering the small subset of data currently visible in the viewport.

This defines the architecture: a high-throughput data pipeline flowing from MongoDB’s core to the user’s screen.

graph TD
    A[Data Source Simulators] -- Inserts --> B{MongoDB Replica Set};
    B -- Change Stream --> C[Python Processor Service];
    subgraph C
        direction LR
        C1[Motor Client] --> C2[Data Batching];
        C2 --> C3[NumPy Aggregation];
        C3 --> C4[WebSocket Server];
    end
    C4 -- WebSocket Push --> D[Vue.js Clients];
    subgraph D
        direction LR
        D1[WebSocket Client] --> D2[Data Store];
        D2 --> D3[Virtualized List Component];
    end

Phase 1: Infrastructure and Data Simulation

Before writing any application code, the foundation must be solid. This entire system runs within Docker containers, ensuring a reproducible environment. The critical component is the MongoDB replica set.

A common mistake is developing against a standalone MongoDB instance, only to find Change Streams are unavailable. It must be a replica set.
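
For quick experiments outside Docker, a single-node replica set is enough to unlock Change Streams (the dbpath here is an arbitrary choice):

mongod --replSet rs0 --dbpath /tmp/rs0 --port 27017 &
mongosh --port 27017 --eval "rs.initiate()"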

docker-compose.yml:

version: '3.8'

services:
  mongo1:
    image: mongo:6.0
    container_name: mongo1
    command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
    ports:
      - 27017:27017
    healthcheck:
      test: test $$(mongosh --port 27017 --quiet --eval "try { rs.status().ok } catch (e) { rs.initiate({_id:'rs0',members:[{_id:0,host:'mongo1:27017'},{_id:1,host:'mongo2:27018'},{_id:2,host:'mongo3:27019'}]}).ok }") -eq 1
      interval: 5s
      timeout: 30s
      start_period: 5s
      retries: 5

  mongo2:
    image: mongo:6.0
    container_name: mongo2
    command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27018"]
    ports:
      - 27018:27018

  mongo3:
    image: mongo:6.0
    container_name: mongo3
    command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27019"]
    ports:
      - 27019:27019
      
  processor:
    build:
      context: ./processor
    container_name: processor
    ports:
      - "8765:8765"
    depends_on:
      mongo1:
        condition: service_healthy
      # mongo2/mongo3 define no healthcheck, so service_healthy would fail
      # compose validation for them.
      mongo2:
        condition: service_started
      mongo3:
        condition: service_started
    environment:
      - MONGO_URI=mongodb://mongo1:27017,mongo2:27018,mongo3:27019/?replicaSet=rs0
    volumes:
      - ./processor:/app

  frontend:
    build:
      context: ./frontend
    container_name: frontend
    ports:
      - "8080:80"
    depends_on:
      - processor

networks:
  default:
    driver: bridge
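
Bringing the stack up and confirming that a primary was actually elected takes two commands (service and container names as defined above):

docker compose up -d --build
docker exec mongo1 mongosh --quiet --eval "rs.status().members.map(m => m.name + ' ' + m.stateStr)"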

To test this pipeline under realistic load, a data simulator is essential. This standalone Python script continuously inserts documents into the sensor_data collection.

data_simulator.py:

import asyncio
import random
from datetime import datetime, timezone

from motor.motor_asyncio import AsyncIOMotorClient

# directConnection is required when running the simulator on the host: the
# replica set advertises in-container hostnames (mongo1, ...) that aren't
# resolvable outside the compose network.
MONGO_URI = "mongodb://localhost:27017/?directConnection=true"
DB_NAME = "iot_platform"
COLLECTION_NAME = "sensor_data"
NUM_SENSORS = 5000
BATCH_SIZE = 1000

async def main():
    """Continuously inserts batches of simulated sensor data."""
    client = AsyncIOMotorClient(MONGO_URI)
    db = client[DB_NAME]
    collection = db[COLLECTION_NAME]
    
    # Touch the collection so it exists and starts empty. Change Streams can
    # watch a collection created later, so this is convenience, not a requirement.
    await collection.insert_one({})
    await collection.delete_many({})
    print(f"Connected to MongoDB. Simulating data for {NUM_SENSORS} sensors.")
    
    while True:
        docs = []
        current_time = datetime.now(timezone.utc)
        for _ in range(BATCH_SIZE):
            doc = {
                "sensorId": f"sensor_{random.randint(1, NUM_SENSORS)}",
                "timestamp": current_time,
                "reading": {
                    "temperature": random.uniform(15.0, 35.0),
                    "humidity": random.uniform(40.0, 60.0),
                    "pressure": random.uniform(980.0, 1020.0),
                },
                "status": "ok"
            }
            docs.append(doc)
        
        try:
            await collection.insert_many(docs, ordered=False)
            print(f"Inserted batch of {len(docs)} documents.")
        except Exception as e:
            print(f"Error inserting documents: {e}")
            
        await asyncio.sleep(0.1) # Control insertion rate

if __name__ == "__main__":
    asyncio.run(main())

This script mimics the high-volume environment that broke the original system, providing a continuous stream of new documents for the Change Stream to capture.
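
Not part of the original system, but worth flagging: if anything later queries this collection historically (rather than only tailing the change stream), a compound index pays for itself at this write volume. A one-liner that could sit in main() right after the collection handle is created (the index shape is an assumption):

    # Hypothetical index for per-sensor historical queries; the change stream
    # itself does not need it, and it adds some write overhead.
    await collection.create_index([("sensorId", 1), ("timestamp", -1)])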

Phase 2: The Python Processing Service

This service is the heart of the architecture. It must be asynchronous, resilient, and computationally efficient. It performs four key functions:

  1. Listens to the MongoDB Change Stream using motor.
  2. Batches incoming data changes to avoid processing every single event individually.
  3. Uses NumPy to perform calculations on the batch.
  4. Broadcasts the processed data to all connected WebSocket clients.

processor/main.py:

import asyncio
import json
import logging
import os
from collections import deque
from datetime import datetime, timezone

import numpy as np
import websockets
from motor.motor_asyncio import AsyncIOMotorClient

# --- Configuration ---
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017/?replicaSet=rs0")
DB_NAME = "iot_platform"
COLLECTION_NAME = "sensor_data"
WEBSOCKET_PORT = 8765
BATCH_INTERVAL_SECONDS = 0.2  # Process data every 200ms
MOVING_AVG_WINDOW = 50 # Window for NumPy moving average

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# --- Global State ---
# A set of connected WebSocket clients. Using a set provides fast add/remove.
CONNECTED_CLIENTS = set()
# Buffer of incoming documents, shared by the watcher and processor tasks.
# No lock is needed: both tasks run on the same single-threaded event loop.
DATA_BUFFER = deque()

def calculate_moving_average(data):
    """Moving average via NumPy convolution, efficient when smoothing an entire
    series at once. (Illustrative: the hot loop below only needs the latest
    window's mean, so it uses np.mean instead.)"""
    if len(data) < MOVING_AVG_WINDOW:
        return None  # Not enough data for a meaningful average

    # 'valid' mode yields an output of length len(data) - window + 1
    return np.convolve(data, np.ones(MOVING_AVG_WINDOW), 'valid') / MOVING_AVG_WINDOW

async def register_client(websocket):
    """Adds a new client to the set of connected clients."""
    CONNECTED_CLIENTS.add(websocket)
    logging.info(f"Client connected: {websocket.remote_address}. Total clients: {len(CONNECTED_CLIENTS)}")

async def unregister_client(websocket):
    """Removes a client from the set."""
    CONNECTED_CLIENTS.remove(websocket)
    logging.info(f"Client disconnected: {websocket.remote_address}. Total clients: {len(CONNECTED_CLIENTS)}")

async def serve_clients(websocket):
    """Handles incoming WebSocket connections. (With websockets >= 10.1 the
    handler receives only the connection object; older versions also passed a
    path argument.)"""
    await register_client(websocket)
    try:
        # Keep the connection alive, but we don't expect messages from clients.
        await websocket.wait_closed()
    finally:
        await unregister_client(websocket)

async def broadcast_data(data):
    """Sends data to all connected clients."""
    if not CONNECTED_CLIENTS:
        return

    # A common pitfall is modifying the set while iterating. Create a copy.
    clients_to_send = list(CONNECTED_CLIENTS)
    
    # Custom JSON encoder to handle NumPy arrays and datetime objects
    def custom_encoder(obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, datetime):
            return obj.isoformat()
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

    message = json.dumps(data, default=custom_encoder)
    
    # websockets.broadcast is efficient for sending the same message to many clients.
    websockets.broadcast(clients_to_send, message)

async def data_processor():
    """
    The core loop that periodically processes the buffered data.
    This decouples database reading from client broadcasting.
    """
    logging.info("Data processor task started.")
    sensor_history = {} # Store recent readings per sensor for calculations

    while True:
        await asyncio.sleep(BATCH_INTERVAL_SECONDS)
        
        # Drain the buffer into a processing batch. Safe without a lock: the
        # event loop runs one task at a time and there is no await in the drain.
        batch_size = len(DATA_BUFFER)
        if batch_size == 0:
            continue
        
        processing_batch = [DATA_BUFFER.popleft() for _ in range(batch_size)]

        # --- NumPy Processing Logic ---
        # A real-world scenario would be more complex. Here, we calculate a moving
        # average temperature for each sensor that reported in this batch.
        updated_sensors = {}
        for doc in processing_batch:
            sensor_id = doc.get("sensorId")
            temp = doc.get("reading", {}).get("temperature")

            if not sensor_id or temp is None:
                continue

            if sensor_id not in sensor_history:
                sensor_history[sensor_id] = deque(maxlen=MOVING_AVG_WINDOW)
            
            sensor_history[sensor_id].append(temp)
            
            # Only calculate if we have a full window
            if len(sensor_history[sensor_id]) == MOVING_AVG_WINDOW:
                temps = np.array(list(sensor_history[sensor_id]))
                avg = np.mean(temps) # Simple mean is faster than convolve for a single value
                updated_sensors[sensor_id] = {
                    "last_read": temp,
                    "moving_avg_temp": round(avg, 2),
                    "timestamp": doc.get("timestamp")
                }
        
        if not updated_sensors:
            continue

        payload = {
            "type": "data_update",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "processed_docs_count": len(processing_batch),
            "updated_sensors_count": len(updated_sensors),
            "data": updated_sensors
        }

        await broadcast_data(payload)

async def watch_mongodb():
    """Establishes a connection to MongoDB and listens to the Change Stream."""
    logging.info(f"Connecting to MongoDB at {MONGO_URI}")
    # Create the client once; the driver reconnects internally, so the retry
    # loop only needs to reopen the stream itself.
    client = AsyncIOMotorClient(MONGO_URI)
    collection = client[DB_NAME][COLLECTION_NAME]
    # The pipeline ensures we only get notifications for 'insert' operations.
    pipeline = [{"$match": {"operationType": "insert"}}]

    while True:
        try:
            async with collection.watch(pipeline) as stream:
                logging.info("Successfully started watching MongoDB change stream.")
                async for change in stream:
                    # 'fullDocument' contains the inserted document
                    DATA_BUFFER.append(change["fullDocument"])
        except asyncio.CancelledError:
            break
        except Exception as e:
            logging.error(f"MongoDB connection error: {e}. Retrying in 5 seconds...")
            await asyncio.sleep(5)


async def main():
    """Starts all concurrent tasks."""
    websocket_server = await websockets.serve(serve_clients, "0.0.0.0", WEBSOCKET_PORT)
    logging.info(f"WebSocket server started on port {WEBSOCKET_PORT}")

    # Run watcher and processor as concurrent tasks
    watcher_task = asyncio.create_task(watch_mongodb())
    processor_task = asyncio.create_task(data_processor())
    
    await asyncio.gather(watcher_task, processor_task)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Shutting down.")

The key design decision here is the decoupling of data ingestion (watch_mongodb) from data processing (data_processor) via the DATA_BUFFER deque. If the WebSocket broadcast is slow, it doesn't block ingestion from MongoDB. Note the trade-off: the deque is unbounded, so a persistently slow processor converts backpressure into memory growth; a bounded buffer or high-water-mark check is the production-grade answer. The batching approach is also critical for performance; running NumPy calculations on a batch of 1,000 points is far more efficient than invoking them 1,000 separate times.
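
One gap the retry loop leaves open: after a transient failure, watch_mongodb reopens the stream at "now," silently dropping anything inserted during the outage. Change Streams emit a resume token with every event for exactly this situation. A minimal sketch of a resumable variant, reusing the module's globals (the token lives only in memory here; persisting it, e.g. back into MongoDB, is what crash recovery would require):

async def watch_mongodb_resumable():
    """Like watch_mongodb, but resumes from the last-seen event after errors."""
    client = AsyncIOMotorClient(MONGO_URI)
    collection = client[DB_NAME][COLLECTION_NAME]
    pipeline = [{"$match": {"operationType": "insert"}}]
    resume_token = None

    while True:
        try:
            async with collection.watch(pipeline, resume_after=resume_token) as stream:
                async for change in stream:
                    DATA_BUFFER.append(change["fullDocument"])
                    resume_token = change["_id"]  # opaque resume token
        except Exception as e:
            # Caveat: if the token has aged out of the oplog, the resume will
            # keep failing; production code should detect that and clear it.
            logging.error(f"Change stream error: {e}. Retrying in 5 seconds...")
            await asyncio.sleep(5)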

Phase 3: The Vue.js Frontend

The final piece is the client application. Its primary challenge is to render a high-frequency stream of data without stuttering.

First, the WebSocket service to manage the connection.

frontend/src/services/WebSocketService.js:

import { ref } from 'vue';

const WS_URL = 'ws://localhost:8765';

class WebSocketService {
    constructor() {
        this.socket = null;
        this.connectionState = ref('disconnected'); // 'connecting', 'connected', 'disconnected', 'error'
        this.messageHandler = null;
        this.reconnectAttempts = 0;
        this.intentionalClose = false; // suppresses reconnection after an explicit disconnect()
    }

    connect(onMessage) {
        if (this.socket && this.socket.readyState === WebSocket.OPEN) {
            console.warn('WebSocket is already connected.');
            return;
        }

        this.messageHandler = onMessage;
        this.intentionalClose = false;
        this.connectionState.value = 'connecting';
        
        try {
            this.socket = new WebSocket(WS_URL);
        } catch (error) {
            console.error('Failed to create WebSocket:', error);
            this.connectionState.value = 'error';
            this.reconnect();
            return;
        }

        this.socket.onopen = () => {
            console.log('WebSocket connection established.');
            this.connectionState.value = 'connected';
            this.reconnectAttempts = 0; // reset the backoff after a successful connection
        };

        this.socket.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                if (this.messageHandler) {
                    this.messageHandler(data);
                }
            } catch (e) {
                console.error('Error parsing WebSocket message:', e);
            }
        };

        this.socket.onclose = (event) => {
            this.connectionState.value = 'disconnected';
            if (this.intentionalClose) return; // don't fight a deliberate disconnect()
            console.warn(`WebSocket connection closed: ${event.code}. Reconnecting...`);
            this.reconnect();
        };

        this.socket.onerror = (error) => {
            console.error('WebSocket error:', error);
            this.connectionState.value = 'error';
            // onclose will be called subsequently, triggering reconnection.
        };
    }

    reconnect() {
        // Exponential backoff, capped at 30s, to prevent hammering the server.
        const delay = Math.min(30000, 1000 * 2 ** this.reconnectAttempts);
        this.reconnectAttempts += 1;
        setTimeout(() => {
            console.log(`Attempting to reconnect WebSocket (attempt ${this.reconnectAttempts})...`);
            this.connect(this.messageHandler);
        }, delay);
    }
    
    disconnect() {
        this.intentionalClose = true;
        if (this.socket) {
            this.socket.close();
            this.socket = null;
        }
    }
}

export const webSocketService = new WebSocketService();

This service provides robust connection management, including automatic reconnection—a must for any real-world application.

Now, the component. Using vue-virtual-scroller is a pragmatic choice to avoid reinventing the wheel for DOM virtualization. It efficiently handles rendering only the visible items in a large list.
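
(For Vue 3, the compatible build of vue-virtual-scroller is published under the next dist-tag at the time of writing: npm install vue-virtual-scroller@next.)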

frontend/src/components/RealTimeMonitor.vue:

<template>
  <div class="monitor-container">
    <div class="header">
      <h1>Real-Time Sensor Monitor</h1>
      <div class="status">
        <span :class="['status-light', connectionStatus]"></span>
        Connection: {{ connectionStatus }} | Updates/sec: {{ updatesPerSecond.toFixed(1) }} | Total Sensors: {{ totalSensors }}
      </div>
    </div>

    <!-- The core of the performance solution: a virtual scroller -->
    <RecycleScroller
      class="scroller"
      :items="sensorList"
      :item-size="60" 
      key-field="id"
      v-slot="{ item }"
    >
      <div class="sensor-item" :class="{ 'highlight': item.highlight }">
        <div class="sensor-id">{{ item.id }}</div>
        <div class="sensor-data">
          <span>Last Temp: <strong>{{ item.last_read.toFixed(2) }}°C</strong></span>
          <span>Moving Avg: <strong>{{ item.moving_avg_temp.toFixed(2) }}°C</strong></span>
        </div>
        <div class="sensor-time">{{ formatTimestamp(item.timestamp) }}</div>
      </div>
    </RecycleScroller>
  </div>
</template>

<script setup>
import { ref, onMounted, onUnmounted, computed } from 'vue';
import { webSocketService } from '@/services/WebSocketService';
import 'vue-virtual-scroller/dist/vue-virtual-scroller.css';
import { RecycleScroller } from 'vue-virtual-scroller';

// Reuse the service's reactive ref directly; copying it into another ref's
// .value would only snapshot the object, not track state changes.
const connectionStatus = webSocketService.connectionState;
const sensorData = ref({});
const updateCount = ref(0);
const updatesPerSecond = ref(0);

// Computed property to transform the sensor data map into a sorted list for the scroller.
// The list must be stable for the virtual scroller to work correctly.
const sensorList = computed(() => {
    return Object.values(sensorData.value).sort((a, b) => a.id.localeCompare(b.id));
});

const totalSensors = computed(() => sensorList.value.length);

// This is the message handler passed to the WebSocket service.
const handleWebSocketMessage = (message) => {
    if (message.type === 'data_update' && message.data) {
        const updates = message.data;
        let localUpdateCount = 0;

        // The pitfall here is reactivity overhead. Mutating a large reactive object
        // key-by-key can be slow. It's often better to build a new object and assign.
        // However, for high-frequency streams, direct mutation is often more memory-efficient.
        // We'll proceed with direct mutation but be mindful of its performance characteristics.
        for (const sensorId in updates) {
            const update = updates[sensorId];
            if (sensorData.value[sensorId]) {
                // Update existing entry
                sensorData.value[sensorId].last_read = update.last_read;
                sensorData.value[sensorId].moving_avg_temp = update.moving_avg_temp;
                sensorData.value[sensorId].timestamp = update.timestamp;
                sensorData.value[sensorId].highlight = true;
            } else {
                // Add new entry
                sensorData.value[sensorId] = {
                    id: sensorId,
                    ...update,
                    highlight: true
                };
            }
            localUpdateCount++;
            
            // Remove the highlight effect after a short period to provide visual feedback.
            setTimeout(() => {
              if (sensorData.value[sensorId]) {
                sensorData.value[sensorId].highlight = false;
              }
            }, 500);
        }
        updateCount.value += localUpdateCount;
    }
};

const formatTimestamp = (isoString) => {
    if (!isoString) return '';
    return new Date(isoString).toLocaleTimeString();
};

let updateInterval;

onMounted(() => {
    webSocketService.connect(handleWebSocketMessage);
    
    // Set up a timer to calculate updates per second.
    updateInterval = setInterval(() => {
        updatesPerSecond.value = updateCount.value;
        updateCount.value = 0;
    }, 1000);
});

onUnmounted(() => {
    webSocketService.disconnect();
    clearInterval(updateInterval);
});
</script>

<style scoped>
.monitor-container {
  display: flex;
  flex-direction: column;
  height: 100vh;
  background-color: #1a1a1a;
  color: #e0e0e0;
  font-family: 'Courier New', Courier, monospace;
}
.header {
  padding: 1rem;
  background-color: #2c2c2c;
  border-bottom: 1px solid #444;
}
h1 {
  margin: 0;
  font-size: 1.5rem;
}
.status {
  display: flex;
  align-items: center;
  gap: 1rem;
  font-size: 0.9rem;
  margin-top: 0.5rem;
}
.status-light {
  width: 12px;
  height: 12px;
  border-radius: 50%;
  background-color: #555;
}
.status-light.connected { background-color: #4caf50; }
.status-light.disconnected { background-color: #f44336; }
.status-light.connecting { background-color: #ffeb3b; }
.status-light.error { background-color: #f44336; animation: blinker 1s linear infinite; }

@keyframes blinker { 50% { opacity: 0; } }

.scroller {
  flex-grow: 1;
  overflow-y: auto;
}
.sensor-item {
  display: flex;
  align-items: center;
  justify-content: space-between;
  padding: 0 1rem;
  height: 60px;
  border-bottom: 1px solid #333;
  transition: background-color 0.5s ease;
}
.sensor-item.highlight {
  background-color: #3a523a;
}
.sensor-id {
  font-weight: bold;
  width: 20%;
  color: #7fdbff;
}
.sensor-data {
  display: flex;
  gap: 2rem;
  width: 50%;
}
.sensor-time {
  text-align: right;
  width: 20%;
  color: #aaa;
}
</style>

The resulting system achieves its goals. The database is no longer under polling load; it pushes changes as they happen. The Python service efficiently absorbs bursts of data, aggregates it, and broadcasts meaningful updates. The Vue.js frontend remains fluid and responsive, effortlessly displaying a real-time view into thousands of data streams, thanks to the critical decision to use DOM virtualization.

This architecture, however, is not without its own set of trade-offs and future considerations. The Python processor is a single point of failure and a potential bottleneck. A production-grade system would require multiple processor instances, necessitating a message bus like Redis Pub/Sub between the MongoDB listener and the WebSocket broadcasters to distribute the load. Furthermore, the client-side data store grows indefinitely; for long-running sessions, a memory-capping strategy, such as using a fixed-size circular buffer for sensor data, would be essential to prevent browser crashes. Finally, the WebSocket protocol itself is unencrypted (ws://); production deployment would mandate TLS encryption (wss://), adding another layer of operational complexity.
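
To make the fan-out point concrete, the seam would sit at broadcast_data: the change-stream listener publishes to Redis instead of to sockets, and each WebSocket-serving instance relays the channel to its own clients. A sketch using redis-py's asyncio API (the channel name and Redis host are assumptions layered on top of the original design):

import redis.asyncio as redis  # redis-py >= 4.2
import websockets

CHANNEL = "sensor_updates"  # hypothetical channel name

async def publish_payload(r: redis.Redis, message: str) -> None:
    """Listener instance: publish instead of broadcasting directly."""
    await r.publish(CHANNEL, message)

async def relay_to_clients(r: redis.Redis) -> None:
    """Each broadcaster instance: relay the channel to its own CONNECTED_CLIENTS."""
    pubsub = r.pubsub()
    await pubsub.subscribe(CHANNEL)
    async for msg in pubsub.listen():
        if msg["type"] == "message":
            websockets.broadcast(CONNECTED_CLIENTS, msg["data"].decode())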

