The initial system was a predictable failure. A Vue.js dashboard polling a MongoDB collection every three seconds via a simple REST API. It worked for a dozen devices generating metrics. When the client scaled to a fleet of 5,000 industrial sensors, each reporting multiple readings per second, the architecture crumbled. The polling interval became a choice between stale data and a denial-of-service attack on our own database. The frontend would lock up for hundreds of milliseconds trying to parse and render the massive JSON payloads. This wasn’t a feature request; it was an operational crisis. The requirement was clear: build a push-based, real-time pipeline capable of ingesting a high-frequency data stream, processing it, and visualizing it without causing browser seizures.
The architectural pivot was away from the pull model entirely. In a real-world project, a stateful, persistent connection is the only sane way to handle this kind of data velocity.
Event Source: Instead of polling, we needed the database to tell us when new data arrived. MongoDB Change Streams are tailor-made for this. They provide a server-side push API for notifications on collection changes, effectively turning MongoDB into a message bus. This avoids the overhead of constant queries and provides near-instant data access. A replica set is a prerequisite, which is a standard production practice anyway.
Processing Layer: Directly piping raw sensor data to thousands of web clients is wildly inefficient. The data is often noisy, verbose, and requires aggregation to be meaningful. A middle-tier service was non-negotiable. Python, with its unparalleled scientific computing stack, was the obvious choice, specifically NumPy for high-performance numerical operations. This service would subscribe to the MongoDB Change Stream, batch the incoming documents, perform calculations like moving averages or downsampling, and then broadcast the processed results.
Transport Protocol: For server-to-client push, WebSockets are the industry standard. They offer a persistent, full-duplex communication channel over a single TCP connection, drastically reducing the latency and overhead associated with HTTP’s request-response cycle.
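As a rough illustration of the aggregation the processing layer is meant to do, the sketch below shows a moving average and a simple mean-based downsampler in NumPy. The function names `moving_average` and `downsample_mean` and the sample values are assumptions made for this example; they are not taken from the service itself.

```python
import numpy as np


def moving_average(values, window):
    """Moving average over full windows; yields len(values) - window + 1 points."""
    values = np.asarray(values, dtype=float)
    if values.size < window:
        return np.empty(0)
    kernel = np.ones(window) / window
    return np.convolve(values, kernel, mode="valid")


def downsample_mean(values, factor):
    """Average consecutive groups of `factor` samples, dropping any remainder."""
    values = np.asarray(values, dtype=float)
    usable = (values.size // factor) * factor
    return values[:usable].reshape(-1, factor).mean(axis=1)


readings = np.arange(10, dtype=float)       # 0.0, 1.0, ..., 9.0 (made-up data)
smoothed = moving_average(readings, window=3)
coarse = downsample_mean(readings, factor=5)
```

Both helpers operate on a whole array at once, which is where NumPy pays off compared with looping over readings in Python.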
Frontend Rendering: The final bottleneck is the browser’s DOM. Attempting to render tens of thousands of data points as individual DOM elements using a naive v-for loop is a recipe for disaster. The browser’s rendering engine will stall, leading to a frozen UI. The solution must be DOM virtualization: rendering only the small subset of data currently visible in the viewport.
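The arithmetic behind virtualization is small enough to sketch. The example below is written in Python to match the backend code in this article; a virtual scroller does the equivalent in JavaScript. It assumes fixed-height rows, and `visible_range` and its `overscan` parameter are hypothetical names used only for illustration.

```python
def visible_range(scroll_top, viewport_height, item_height, total_items, overscan=2):
    """Return (start, end) row indices worth rendering; end is exclusive."""
    first = scroll_top // item_height
    # Ceiling division: index just past the last row that intersects the viewport
    last = -(-(scroll_top + viewport_height) // item_height)
    # Render a few extra rows on each side so fast scrolling does not show gaps
    start = max(0, first - overscan)
    end = min(total_items, last + overscan)
    return start, end


start, end = visible_range(
    scroll_top=12_000, viewport_height=600, item_height=60, total_items=5_000
)
rendered = end - start
```

With these made-up numbers only 14 of the 5,000 rows exist in the DOM at any moment, regardless of how long the list is.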
This defines the architecture: a high-throughput data pipeline flowing from MongoDB’s core to the user’s screen.
graph TD
    A[Data Source Simulators] -- Inserts --> B{MongoDB Replica Set};
    B -- Change Stream --> C[Python Processor Service];
    subgraph C
        direction LR
        C1[Motor Client] --> C2[Data Batching];
        C2 --> C3[NumPy Aggregation];
        C3 --> C4[WebSocket Server];
    end
    C4 -- WebSocket Push --> D[Vue.js Clients];
    subgraph D
        direction LR
        D1[WebSocket Client] --> D2[Data Store];
        D2 --> D3[Virtualized List Component];
    end
Phase 1: Infrastructure and Data Simulation
Before writing any application code, the foundation must be solid. This entire system runs within Docker containers, ensuring a reproducible environment. The critical component is the MongoDB replica set.
A common mistake is developing against a standalone MongoDB instance, only to find Change Streams are unavailable. It must be a replica set.
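One way to catch this early is to inspect the server's reply to the `hello` command at startup. The sketch below works on the reply as a plain dictionary so it can be read without a running database; `supports_change_streams` is a hypothetical helper, and the two sample replies are trimmed-down illustrations, not real server output.

```python
def supports_change_streams(hello_response):
    """Infer change stream support from the reply to MongoDB's `hello` command."""
    # Replica set members report the set name in `setName`
    is_replica_set_member = "setName" in hello_response
    # mongos routers (sharded clusters) identify themselves with msg == "isdbgrid"
    is_mongos = hello_response.get("msg") == "isdbgrid"
    return is_replica_set_member or is_mongos


# Illustrative, abbreviated replies (assumed shapes for this example)
standalone_reply = {"isWritablePrimary": True, "maxWireVersion": 17}
replica_reply = {"isWritablePrimary": True, "setName": "rs0", "maxWireVersion": 17}

standalone_ok = supports_change_streams(standalone_reply)
replica_ok = supports_change_streams(replica_reply)
```

With Motor, the reply would typically come from `await client.admin.command("hello")`, so a service could fail fast with a clear message instead of erroring when it first opens the stream.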
docker-compose.yml:
version: '3.8'
services:
  mongo1:
    image: mongo:6.0
    container_name: mongo1
    command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
    ports:
      - "27017:27017"
    healthcheck:
      # Initiates the replica set on first run; healthy once the command prints 1
      test: test $$(mongosh --port 27017 --quiet --eval "try { rs.status().ok } catch (e) { rs.initiate({_id:'rs0',members:[{_id:0,host:'mongo1:27017'},{_id:1,host:'mongo2:27018'},{_id:2,host:'mongo3:27019'}]}).ok }") -eq 1
      interval: 5s
      timeout: 30s
      start_period: 5s
      retries: 5
  mongo2:
    image: mongo:6.0
    container_name: mongo2
    command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27018"]
    ports:
      - "27018:27018"
  mongo3:
    image: mongo:6.0
    container_name: mongo3
    command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27019"]
    ports:
      - "27019:27019"
  processor:
    build:
      context: ./processor
    container_name: processor
    ports:
      - "8765:8765"
    depends_on:
      mongo1:
        condition: service_healthy
      # mongo2 and mongo3 define no healthcheck, so only wait for them to start
      mongo2:
        condition: service_started
      mongo3:
        condition: service_started
    environment:
      - MONGO_URI=mongodb://mongo1:27017,mongo2:27018,mongo3:27019/?replicaSet=rs0
    volumes:
      - ./processor:/app
  frontend:
    build:
      context: ./frontend
    container_name: frontend
    ports:
      - "8080:80"
    depends_on:
      - processor
networks:
  default:
    driver: bridge
To test this pipeline under realistic load, a data simulator is essential. This standalone Python script continuously inserts documents into the sensor_data collection.
data_simulator.py:
import asyncio
import random
from datetime import datetime, timezone

from motor.motor_asyncio import AsyncIOMotorClient

# directConnection=true talks to the published port only. Without it the driver
# discovers the members by their container hostnames (mongo1, mongo2, mongo3),
# which normally do not resolve from the host. The node must be the primary.
MONGO_URI = "mongodb://localhost:27017/?directConnection=true"
DB_NAME = "iot_platform"
COLLECTION_NAME = "sensor_data"
NUM_SENSORS = 5000
BATCH_SIZE = 1000


async def main():
    """Continuously inserts batches of simulated sensor data."""
    client = AsyncIOMotorClient(MONGO_URI)
    db = client[DB_NAME]
    collection = db[COLLECTION_NAME]

    # Ensure collection exists
    await collection.insert_one({})
    await collection.delete_many({})
    print(f"Connected to MongoDB. Simulating data for {NUM_SENSORS} sensors.")

    while True:
        docs = []
        current_time = datetime.now(timezone.utc)
        for _ in range(BATCH_SIZE):
            doc = {
                "sensorId": f"sensor_{random.randint(1, NUM_SENSORS)}",
                "timestamp": current_time,
                "reading": {
                    "temperature": random.uniform(15.0, 35.0),
                    "humidity": random.uniform(40.0, 60.0),
                    "pressure": random.uniform(980.0, 1020.0),
                },
                "status": "ok",
            }
            docs.append(doc)
        try:
            await collection.insert_many(docs, ordered=False)
            print(f"Inserted batch of {len(docs)} documents.")
        except Exception as e:
            print(f"Error inserting documents: {e}")
        await asyncio.sleep(0.1)  # Control insertion rate


if __name__ == "__main__":
    asyncio.run(main())
This script mimics the high-volume environment that broke the original system, providing a continuous stream of new documents for the Change Stream to capture.
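It helps to know what load those constants actually produce. The back-of-the-envelope calculation below reuses the simulator's values plus the 50-sample window the processor uses later; the derived variable names are introduced here for illustration, and the result is an upper bound because it assumes `insert_many` takes no time.

```python
NUM_SENSORS = 5000
BATCH_SIZE = 1000
SLEEP_SECONDS = 0.1
MOVING_AVG_WINDOW = 50  # window size used by the processor service

# Upper bound: one batch per sleep interval, ignoring the time spent inserting
max_docs_per_second = BATCH_SIZE / SLEEP_SECONDS

# Sensor IDs are drawn uniformly, so on average the load spreads evenly
avg_readings_per_sensor = max_docs_per_second / NUM_SENSORS

# Time until a typical sensor has filled one full moving-average window
seconds_to_fill_window = MOVING_AVG_WINDOW / avg_readings_per_sensor

print(f"{max_docs_per_second:.0f} docs/s, "
      f"{avg_readings_per_sensor:.1f} readings/sensor/s, "
      f"{seconds_to_fill_window:.0f} s to fill a window")
```

In other words, expect roughly 25 seconds of warm-up before the first moving averages reach the dashboard.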
Phase 2: The Python Processing Service
This service is the heart of the architecture. It must be asynchronous, resilient, and computationally efficient. It performs four key functions:
- Listens to the MongoDB Change Stream using motor.
- Batches incoming data changes to avoid processing every single event individually.
- Uses NumPy to perform calculations on the batch.
- Broadcasts the processed data to all connected WebSocket clients.
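The batching step in that list can be sketched in isolation, without MongoDB or WebSockets. In the example below a stand-in producer plays the role of the change stream watcher, and the consumer wakes up on a timer and drains whatever has accumulated. The names `producer`, `batch_consumer`, and `demo` and the timing values are assumptions chosen for this illustration.

```python
import asyncio
from collections import deque


async def producer(buffer, count):
    """Stands in for the change stream watcher: appends one event at a time."""
    for i in range(count):
        buffer.append({"seq": i})
        await asyncio.sleep(0)  # yield to the event loop between events


async def batch_consumer(buffer, interval, rounds):
    """Wakes up every `interval` seconds and drains whatever has accumulated."""
    batches = []
    for _ in range(rounds):
        await asyncio.sleep(interval)
        # No await between these two lines, so no other task runs mid-drain
        batch = list(buffer)
        buffer.clear()
        if batch:
            batches.append(batch)
    return batches


async def demo():
    buffer = deque()
    _, batches = await asyncio.gather(
        producer(buffer, count=100),
        batch_consumer(buffer, interval=0.05, rounds=3),
    )
    return batches


batches = asyncio.run(demo())
total_events = sum(len(batch) for batch in batches)
```

The consumer handles 100 events in a handful of wake-ups instead of 100, which is the whole point of batching on a timer.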
processor/main.py:
import asyncio
import json
import logging
import os
from collections import deque
from datetime import datetime, timezone

import numpy as np
import websockets
from motor.motor_asyncio import AsyncIOMotorClient

# --- Configuration ---
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017/?replicaSet=rs0")
DB_NAME = "iot_platform"
COLLECTION_NAME = "sensor_data"
WEBSOCKET_PORT = 8765
BATCH_INTERVAL_SECONDS = 0.2  # Process data every 200ms
MOVING_AVG_WINDOW = 50  # Window for NumPy moving average

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# --- Global State ---
# A set of connected WebSocket clients. Using a set provides fast add/remove.
CONNECTED_CLIENTS = set()
# Buffer for incoming documents from MongoDB. All tasks share one event loop
# thread, so a plain deque needs no locking. Note that it has no size limit.
DATA_BUFFER = deque()


def calculate_moving_average(data):
    """Calculates a moving average using NumPy's convolution method for performance."""
    if len(data) < MOVING_AVG_WINDOW:
        return None  # Not enough data for a meaningful average
    # The 'valid' mode ensures the output is the size of data - window + 1
    return np.convolve(data, np.ones(MOVING_AVG_WINDOW), 'valid') / MOVING_AVG_WINDOW


async def register_client(websocket):
    """Adds a new client to the set of connected clients."""
    CONNECTED_CLIENTS.add(websocket)
    logging.info(f"Client connected: {websocket.remote_address}. Total clients: {len(CONNECTED_CLIENTS)}")


async def unregister_client(websocket):
    """Removes a client from the set."""
    CONNECTED_CLIENTS.discard(websocket)
    logging.info(f"Client disconnected: {websocket.remote_address}. Total clients: {len(CONNECTED_CLIENTS)}")


async def serve_clients(websocket, path=None):
    """Handles incoming WebSocket connections.

    `path` defaults to None because older websockets releases pass it to the
    handler while newer ones call the handler with the connection only.
    """
    await register_client(websocket)
    try:
        # Keep the connection alive, but we don't expect messages from clients.
        await websocket.wait_closed()
    finally:
        await unregister_client(websocket)


async def broadcast_data(data):
    """Sends data to all connected clients."""
    if not CONNECTED_CLIENTS:
        return
    # A common pitfall is modifying the set while iterating. Create a copy.
    clients_to_send = list(CONNECTED_CLIENTS)

    # Custom JSON encoder to handle NumPy arrays and datetime objects
    def custom_encoder(obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, datetime):
            return obj.isoformat()
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

    message = json.dumps(data, default=custom_encoder)
    # websockets.broadcast is efficient for sending the same message to many clients.
    websockets.broadcast(clients_to_send, message)


async def data_processor():
    """
    The core loop that periodically processes the buffered data.
    This decouples database reading from client broadcasting.
    """
    logging.info("Data processor task started.")
    sensor_history = {}  # Store recent readings per sensor for calculations
    while True:
        await asyncio.sleep(BATCH_INTERVAL_SECONDS)
        # Drain the buffer into a processing batch. There is no await during
        # the drain, so the watcher task cannot append in the middle of it.
        batch_size = len(DATA_BUFFER)
        if batch_size == 0:
            continue
        processing_batch = [DATA_BUFFER.popleft() for _ in range(batch_size)]

        # --- NumPy Processing Logic ---
        # A real-world scenario would be more complex. Here, we calculate a moving
        # average temperature for each sensor that reported in this batch.
        updated_sensors = {}
        for doc in processing_batch:
            sensor_id = doc.get("sensorId")
            temp = doc.get("reading", {}).get("temperature")
            if not sensor_id or temp is None:
                continue
            if sensor_id not in sensor_history:
                sensor_history[sensor_id] = deque(maxlen=MOVING_AVG_WINDOW)
            sensor_history[sensor_id].append(temp)
            # Only calculate if we have a full window
            if len(sensor_history[sensor_id]) == MOVING_AVG_WINDOW:
                temps = np.array(list(sensor_history[sensor_id]))
                avg = np.mean(temps)  # Simple mean is faster than convolve for a single value
                updated_sensors[sensor_id] = {
                    "last_read": temp,
                    "moving_avg_temp": round(float(avg), 2),
                    "timestamp": doc.get("timestamp"),
                }
        if not updated_sensors:
            continue
        payload = {
            "type": "data_update",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "processed_docs_count": len(processing_batch),
            "updated_sensors_count": len(updated_sensors),
            "data": updated_sensors,
        }
        await broadcast_data(payload)


async def watch_mongodb():
    """Establishes a connection to MongoDB and listens to the Change Stream."""
    logging.info(f"Connecting to MongoDB at {MONGO_URI}")
    # One client is enough; it is reused across retries instead of recreated.
    client = AsyncIOMotorClient(MONGO_URI)
    collection = client[DB_NAME][COLLECTION_NAME]
    # The pipeline ensures we only get notifications for 'insert' operations.
    pipeline = [{"$match": {"operationType": "insert"}}]
    while True:
        try:
            async with collection.watch(pipeline) as stream:
                logging.info("Successfully started watching MongoDB change stream.")
                async for change in stream:
                    # 'fullDocument' contains the inserted document
                    DATA_BUFFER.append(change["fullDocument"])
        except asyncio.CancelledError:
            # Propagate cancellation so shutdown is not swallowed
            raise
        except Exception as e:
            logging.error(f"MongoDB connection error: {e}. Retrying in 5 seconds...")
            await asyncio.sleep(5)


async def main():
    """Starts all concurrent tasks."""
    websocket_server = await websockets.serve(serve_clients, "0.0.0.0", WEBSOCKET_PORT)
    logging.info(f"WebSocket server started on port {WEBSOCKET_PORT}")
    # Run watcher and processor as concurrent tasks
    watcher_task = asyncio.create_task(watch_mongodb())
    processor_task = asyncio.create_task(data_processor())
    await asyncio.gather(watcher_task, processor_task)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Shutting down.")
The key design decision here is the decoupling of data ingestion (watch_mongodb) from data processing (data_processor) via the DATA_BUFFER deque. If processing or broadcasting falls behind, ingestion from MongoDB is not blocked; the cost is that the backlog accumulates in memory, since the deque has no size limit. The batching approach is also critical for performance; running NumPy calculations once on a batch of 1000 points is far more efficient than running them 1000 individual times.
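One way to put a ceiling on DATA_BUFFER is a bounded buffer that discards the oldest entries and keeps count of what it dropped. The sketch below assumes that losing old readings is acceptable when the processor falls behind, which may not hold for every workload; `BoundedBuffer` and its `capacity` are hypothetical names, not part of the service above.

```python
from collections import deque


class BoundedBuffer:
    """Keeps at most `capacity` items, discarding the oldest and counting losses."""

    def __init__(self, capacity):
        self._items = deque(maxlen=capacity)
        self.dropped = 0

    def append(self, item):
        if len(self._items) == self._items.maxlen:
            # deque(maxlen=...) evicts the oldest entry silently, so count it here
            self.dropped += 1
        self._items.append(item)

    def drain(self):
        """Return everything buffered so far and leave the buffer empty."""
        batch = list(self._items)
        self._items.clear()
        return batch


buffer = BoundedBuffer(capacity=3)
for reading in range(5):
    buffer.append(reading)
batch = buffer.drain()
```

Logging `dropped` periodically would turn a silent memory problem into a visible metric. If dropping data is not acceptable, an `asyncio.Queue` with a `maxsize` would instead make the watcher wait when the buffer is full.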
Phase 3: The Vue.js Frontend
The final piece is the client application. Its primary challenge is to render a high-frequency stream of data without stuttering.
First, the WebSocket service to manage the connection.
frontend/src/services/WebSocketService.js:
import { ref } from 'vue';

const WS_URL = 'ws://localhost:8765';

class WebSocketService {
  constructor() {
    this.socket = null;
    this.connectionState = ref('disconnected'); // 'connecting', 'connected', 'disconnected', 'error'
    this.messageHandler = null;
    // Cleared by disconnect() so an intentional close does not trigger a reconnect.
    this.shouldReconnect = true;
  }

  connect(onMessage) {
    if (this.socket && this.socket.readyState === WebSocket.OPEN) {
      console.warn('WebSocket is already connected.');
      return;
    }
    this.messageHandler = onMessage;
    this.shouldReconnect = true;
    this.connectionState.value = 'connecting';
    try {
      this.socket = new WebSocket(WS_URL);
    } catch (error) {
      console.error('Failed to create WebSocket:', error);
      this.connectionState.value = 'error';
      this.reconnect();
      return;
    }

    this.socket.onopen = () => {
      console.log('WebSocket connection established.');
      this.connectionState.value = 'connected';
    };

    this.socket.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        if (this.messageHandler) {
          this.messageHandler(data);
        }
      } catch (e) {
        console.error('Error parsing WebSocket message:', e);
      }
    };

    this.socket.onclose = (event) => {
      this.connectionState.value = 'disconnected';
      if (this.shouldReconnect) {
        console.warn(`WebSocket connection closed: ${event.code}. Reconnecting...`);
        this.reconnect();
      }
    };

    this.socket.onerror = (error) => {
      console.error('WebSocket error:', error);
      this.connectionState.value = 'error';
      // onclose will be called subsequently, triggering reconnection.
    };
  }

  reconnect() {
    // Fixed 3-second delay between attempts to avoid hammering the server.
    setTimeout(() => {
      if (!this.shouldReconnect) return;
      console.log('Attempting to reconnect WebSocket...');
      this.connect(this.messageHandler);
    }, 3000);
  }

  disconnect() {
    this.shouldReconnect = false;
    if (this.socket) {
      this.socket.close();
      this.socket = null;
    }
  }
}

export const webSocketService = new WebSocketService();
This service tracks connection state and reconnects automatically, a must for any real-world application.
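The reconnect() method waits a fixed 3000 ms between attempts. If true exponential backoff is wanted, the delay schedule could be computed along the lines of the sketch below. It is written in Python to stay consistent with the backend code in this article and would need porting to JavaScript for the service; `backoff_delay` and its `base`, `cap`, and `rng` parameters are hypothetical, and the jitter strategy (a random fraction of the capped delay) is one common choice among several.

```python
import random


def backoff_delay(attempt, base=1.0, cap=30.0, rng=random.random):
    """Seconds to wait before reconnect number `attempt` (0-based), with jitter."""
    # Double the ceiling on every attempt, but never exceed `cap`
    ceiling = min(cap, base * (2 ** attempt))
    # Pick a random point below the ceiling so clients do not reconnect in lockstep
    return rng() * ceiling


# With the random factor pinned to 1.0 the underlying schedule becomes visible
schedule = [backoff_delay(n, rng=lambda: 1.0) for n in range(7)]
```

The jitter matters most after a processor restart, when thousands of dashboards would otherwise retry at the same instant.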
Now, the component. Using vue-virtual-scroller is a pragmatic choice to avoid reinventing the wheel for DOM virtualization. It efficiently handles rendering only the visible items in a large list.
frontend/src/components/RealTimeMonitor.vue:
<template>
  <div class="monitor-container">
    <div class="header">
      <h1>Real-Time Sensor Monitor</h1>
      <div class="status">
        <span :class="['status-light', connectionStatus]"></span>
        Connection: {{ connectionStatus }} | Updates/sec: {{ updatesPerSecond.toFixed(1) }} | Total Sensors: {{ totalSensors }}
      </div>
    </div>
    <!-- The core of the performance solution: a virtual scroller -->
    <RecycleScroller
      class="scroller"
      :items="sensorList"
      :item-size="60"
      key-field="id"
      v-slot="{ item }"
    >
      <div class="sensor-item" :class="{ 'highlight': item.highlight }">
        <div class="sensor-id">{{ item.id }}</div>
        <div class="sensor-data">
          <span>Last Temp: <strong>{{ item.last_read.toFixed(2) }}°C</strong></span>
          <span>Moving Avg: <strong>{{ item.moving_avg_temp.toFixed(2) }}°C</strong></span>
        </div>
        <div class="sensor-time">{{ formatTimestamp(item.timestamp) }}</div>
      </div>
    </RecycleScroller>
  </div>
</template>
<script setup>
import { ref, onMounted, onUnmounted, computed } from 'vue';
import { webSocketService } from '@/services/WebSocketService';
import 'vue-virtual-scroller/dist/vue-virtual-scroller.css';
import { RecycleScroller } from 'vue-virtual-scroller';

// Reuse the service's ref directly so the template tracks state changes.
// Assigning the ref into another ref's .value would store the ref object itself.
const connectionStatus = webSocketService.connectionState;
const sensorData = ref({});
const updateCount = ref(0);
const updatesPerSecond = ref(0);

// Computed property to transform the sensor data map into a sorted list for the scroller.
// The list must be stable for the virtual scroller to work correctly.
const sensorList = computed(() => {
  return Object.values(sensorData.value).sort((a, b) => a.id.localeCompare(b.id));
});
const totalSensors = computed(() => sensorList.value.length);

// This is the message handler passed to the WebSocket service.
const handleWebSocketMessage = (message) => {
  if (message.type === 'data_update' && message.data) {
    const updates = message.data;
    let localUpdateCount = 0;
    // The pitfall here is reactivity overhead. Mutating a large reactive object
    // key-by-key can be slow. It's often better to build a new object and assign.
    // However, for high-frequency streams, direct mutation is often more memory-efficient.
    // We'll proceed with direct mutation but be mindful of its performance characteristics.
    for (const sensorId in updates) {
      const update = updates[sensorId];
      if (sensorData.value[sensorId]) {
        // Update existing entry
        sensorData.value[sensorId].last_read = update.last_read;
        sensorData.value[sensorId].moving_avg_temp = update.moving_avg_temp;
        sensorData.value[sensorId].timestamp = update.timestamp;
        sensorData.value[sensorId].highlight = true;
      } else {
        // Add new entry
        sensorData.value[sensorId] = {
          id: sensorId,
          ...update,
          highlight: true
        };
      }
      localUpdateCount++;
      // Remove the highlight effect after a short period to provide visual feedback.
      setTimeout(() => {
        if (sensorData.value[sensorId]) {
          sensorData.value[sensorId].highlight = false;
        }
      }, 500);
    }
    updateCount.value += localUpdateCount;
  }
};

const formatTimestamp = (isoString) => {
  if (!isoString) return '';
  return new Date(isoString).toLocaleTimeString();
};

let updateInterval;

onMounted(() => {
  webSocketService.connect(handleWebSocketMessage);
  // Set up a timer to calculate updates per second.
  updateInterval = setInterval(() => {
    updatesPerSecond.value = updateCount.value;
    updateCount.value = 0;
  }, 1000);
});

onUnmounted(() => {
  webSocketService.disconnect();
  clearInterval(updateInterval);
});
</script>
<style scoped>
.monitor-container {
  display: flex;
  flex-direction: column;
  height: 100vh;
  background-color: #1a1a1a;
  color: #e0e0e0;
  font-family: 'Courier New', Courier, monospace;
}
.header {
  padding: 1rem;
  background-color: #2c2c2c;
  border-bottom: 1px solid #444;
}
h1 {
  margin: 0;
  font-size: 1.5rem;
}
.status {
  display: flex;
  align-items: center;
  gap: 1rem;
  font-size: 0.9rem;
  margin-top: 0.5rem;
}
.status-light {
  width: 12px;
  height: 12px;
  border-radius: 50%;
  background-color: #555;
}
.status-light.connected { background-color: #4caf50; }
.status-light.disconnected { background-color: #f44336; }
.status-light.connecting { background-color: #ffeb3b; }
.status-light.error { background-color: #f44336; animation: blinker 1s linear infinite; }
@keyframes blinker { 50% { opacity: 0; } }
.scroller {
  flex-grow: 1;
  overflow-y: auto;
}
.sensor-item {
  display: flex;
  align-items: center;
  justify-content: space-between;
  padding: 0 1rem;
  height: 60px;
  border-bottom: 1px solid #333;
  transition: background-color 0.5s ease;
}
.sensor-item.highlight {
  background-color: #3a523a;
}
.sensor-id {
  font-weight: bold;
  width: 20%;
  color: #7fdbff;
}
.sensor-data {
  display: flex;
  gap: 2rem;
  width: 50%;
}
.sensor-time {
  text-align: right;
  width: 20%;
  color: #aaa;
}
</style>
The resulting system achieves its goals. The database is no longer under polling load; it pushes changes as they happen. The Python service efficiently absorbs bursts of data, aggregates it, and broadcasts meaningful updates. The Vue.js frontend remains fluid and responsive, effortlessly displaying a real-time view into thousands of data streams, thanks to the critical decision to use DOM virtualization.
This architecture, however, is not without its own set of trade-offs and future considerations. The Python processor is a single point of failure and a potential bottleneck. A production-grade system would require multiple processor instances, necessitating a message bus like Redis Pub/Sub between the MongoDB listener and the WebSocket broadcasters to distribute the load. Furthermore, the client-side data store never evicts entries, so it grows with every new sensor ID it sees; for long-running sessions, a memory-capping strategy, such as using a fixed-size circular buffer for sensor data, would be essential to prevent browser crashes. Finally, the example connects over unencrypted ws://; production deployment would mandate TLS encryption (wss://), adding another layer of operational complexity.
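As a sketch of the memory-capping idea, the store below keeps the latest reading per sensor and evicts whichever sensor has gone longest without an update once a limit is reached. This is a variation on the circular-buffer suggestion, not the only option. It is written in Python for consistency with the rest of the backend code, and the client would need an equivalent in JavaScript; `CappedSensorStore` and `max_sensors` are hypothetical names.

```python
from collections import OrderedDict


class CappedSensorStore:
    """Holds the latest reading per sensor, evicting the least recently updated."""

    def __init__(self, max_sensors):
        self.max_sensors = max_sensors
        self._latest = OrderedDict()

    def update(self, sensor_id, reading):
        self._latest[sensor_id] = reading
        self._latest.move_to_end(sensor_id)  # mark as most recently updated
        while len(self._latest) > self.max_sensors:
            self._latest.popitem(last=False)  # drop the stalest sensor

    def sensor_ids(self):
        return list(self._latest)


store = CappedSensorStore(max_sensors=2)
store.update("sensor_1", 21.5)
store.update("sensor_2", 19.0)
store.update("sensor_1", 22.0)  # refreshes sensor_1, so sensor_2 is now the stalest
store.update("sensor_3", 30.1)  # exceeds the cap and evicts sensor_2
remaining = store.sensor_ids()
```

Evicting by staleness keeps active sensors on screen, at the cost of silently hiding sensors that have stopped reporting, which a monitoring dashboard may well want to surface instead.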