The requirement was deceptively simple: enable multiple users to edit a financial data grid simultaneously. The existing platform was a standard monolith with an Express.js backend, a PostgreSQL database, and a React frontend. Off-the-shelf collaborative solutions were evaluated, but they introduced significant architectural overhead and licensing costs. The decision was made to build a tailored solution leveraging our existing stack. The core technical pain point wasn’t just about real-time communication; it was about guaranteeing data consistency and providing a complete audit trail of every change without corrupting the state of the grid, even under concurrent edits.
Our initial concept avoided complex Conflict-free Replicated Data Types (CRDTs) in favor of a centralized, server-authoritative Operational Transformation (OT) model. In this model, clients don’t send their new state; they send their intent as a discrete operation. The server’s role is to be the single source of truth for the order of these operations. This approach simplifies the client-side logic immensely and maps well to a relational database, which can store an immutable log of these operations.
The technology selection was a pragmatic extension of our current stack.
- Express.js with the ws library: Instead of introducing another service, we augmented the existing Express server to handle WebSocket connections. This keeps the infrastructure footprint small.
- PostgreSQL: Chosen for its transactional integrity (ACID compliance). We would not store the grid’s current state directly. Instead, a table would act as an immutable, append-only log of every operation performed on a grid. This is a critical design choice. It gives us history, replayability, and state reconstruction for free. The performance penalty of not having the materialized state is acceptable for our use case, as grids are loaded infrequently but edited interactively.
- MobX: On the frontend, the state of a large grid can be complex. MobX’s fine-grained reactivity is ideal. When a new operation arrives from the server, we only need to mutate a specific part of our state tree (e.g., a single cell’s value), and MobX ensures that only the corresponding UI component re-renders. This is far more efficient than a top-down re-render of the entire grid.
- Cypress: End-to-end testing is non-negotiable for a feature this complex. The subtle race conditions and synchronization bugs in collaborative systems are nearly impossible to catch manually. Cypress allows us to script multi-user scenarios and assert state consistency across different client views.
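To make the "state reconstruction from a log" idea concrete, here is a minimal sketch of replaying an ordered operation log into grid state. The `Operation` shape, `applyOp`, and `replay` are illustrative names assumed for this example, not part of the actual implementation.

```typescript
// Hypothetical operation shape, modeled on a single-cell update.
type Operation = { type: 'UPDATE_CELL'; row: number; col: number; value: string };

// Sparse grid state keyed by "row-col".
type GridState = Map<string, string>;

// Apply one operation to the state, mutating and returning it.
function applyOp(state: GridState, op: Operation): GridState {
  state.set(`${op.row}-${op.col}`, op.value);
  return state;
}

// Rebuild the grid by replaying the ordered log from an empty state.
function replay(operations: Operation[]): GridState {
  return operations.reduce(applyOp, new Map<string, string>());
}

const sampleLog: Operation[] = [
  { type: 'UPDATE_CELL', row: 0, col: 0, value: '100' },
  { type: 'UPDATE_CELL', row: 1, col: 2, value: '42' },
  { type: 'UPDATE_CELL', row: 0, col: 0, value: '150' }, // later write wins
];

const rebuilt = replay(sampleLog);
```

Because the log is ordered and never rewritten, replaying any prefix of it yields the grid as it looked at that version, which is what makes the audit trail and the current state the same data.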
Backend Implementation: The Authoritative Log
The foundation of the system is the database schema. It’s shockingly simple, designed to be an append-only ledger.
-- DDL for grid_operations table in PostgreSQL
CREATE TABLE grid_operations (
id BIGSERIAL PRIMARY KEY,
grid_id VARCHAR(36) NOT NULL,
operation_version BIGINT NOT NULL,
operation_payload JSONB NOT NULL,
client_id VARCHAR(36) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT unique_version_per_grid UNIQUE (grid_id, operation_version)
);
-- No separate index is needed: the UNIQUE constraint above already creates a
-- b-tree index on (grid_id, operation_version) that serves ordered reads per grid.
The key here is the unique_version_per_grid constraint. The server is responsible for assigning a monotonically increasing operation_version to each incoming operation for a specific grid. The constraint guarantees that no two operations on the same grid can share a version, so a race between writers surfaces as a failed insert rather than as a silently corrupted log.
The Express.js server is extended to handle WebSocket upgrades on a specific route. We use the ws library for this.
// server.js
const express = require('express');
const http = require('http');
const { WebSocketServer } = require('ws');
const { Pool } = require('pg');
const { v4: uuidv4 } = require('uuid');
const app = express();
const server = http.createServer(app);
// --- Database Configuration ---
const pool = new Pool({
user: 'grid_user',
host: 'localhost',
database: 'collaborative_grid_db',
password: 'secure_password',
port: 5432,
});
// In-memory map to manage clients per grid
// In a production environment, this should be backed by Redis or similar
// to support multiple server instances.
const gridClients = new Map();
// --- WebSocket Server Setup ---
const wss = new WebSocketServer({ noServer: true });
server.on('upgrade', (request, socket, head) => {
// Simple URL parsing to extract gridId
const [pathname] = request.url.split('?');
const gridIdMatch = pathname.match(/^\/ws\/grid\/([a-zA-Z0-9-]+)$/);
if (!gridIdMatch) {
socket.destroy();
return;
}
const gridId = gridIdMatch[1];
wss.handleUpgrade(request, socket, head, (ws) => {
ws.gridId = gridId; // Attach gridId to the WebSocket instance
wss.emit('connection', ws, request);
});
});
wss.on('connection', async (ws, request) => {
const clientId = uuidv4();
ws.clientId = clientId;
const gridId = ws.gridId;
// Register client
if (!gridClients.has(gridId)) {
gridClients.set(gridId, new Set());
}
gridClients.get(gridId).add(ws);
console.log(`Client ${clientId} connected to grid ${gridId}`);
try {
// --- On connection, send the entire operation history to the new client ---
// This allows the client to build its state from scratch.
const historyResult = await pool.query(
'SELECT operation_payload, operation_version FROM grid_operations WHERE grid_id = $1 ORDER BY operation_version ASC',
[gridId]
);
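const rows = historyResult.rows;
ws.send(JSON.stringify({
type: 'INIT',
payload: {
operations: rows.map(row => row.operation_payload),
// Use the last stored version rather than the row count.
// node-postgres returns BIGINT columns as strings, so convert explicitly.
version: rows.length > 0 ? Number(rows[rows.length - 1].operation_version) : 0,
}
}));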
} catch (err) {
console.error('Failed to fetch operation history:', err);
ws.close(1011, 'Database error');
}
ws.on('message', async (message) => {
try {
const { type, payload } = JSON.parse(message);
if (type === 'SUBMIT_OPERATION') {
await handleOperation(ws, payload);
}
} catch (e) {
console.error('Failed to process message:', e);
// Do not close connection for a single bad message
}
});
ws.on('close', () => {
console.log(`Client ${clientId} disconnected from grid ${gridId}`);
const clients = gridClients.get(gridId);
if (clients) {
clients.delete(ws);
if (clients.size === 0) {
gridClients.delete(gridId);
}
}
});
ws.on('error', (err) => {
console.error(`WebSocket error for client ${clientId}:`, err);
});
});
async function handleOperation(ws, clientOperation) {
const { gridId, clientId } = ws;
const dbClient = await pool.connect();
try {
await dbClient.query('BEGIN');
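// Serialize writers per grid. PostgreSQL rejects FOR UPDATE on aggregate
// queries such as MAX(), so take a transaction-scoped advisory lock keyed
// on the grid id instead. In a high-contention scenario, a separate table
// for grid metadata with a version counter would be better.
await dbClient.query('SELECT pg_advisory_xact_lock(hashtext($1))', [gridId]);
const lastVersionResult = await dbClient.query(
'SELECT COALESCE(MAX(operation_version), 0) AS version FROM grid_operations WHERE grid_id = $1',
[gridId]
);
// node-postgres returns BIGINT values as strings; convert before adding,
// otherwise "1" + 1 would produce "11".
const lastVersion = Number(lastVersionResult.rows[0].version);
const newVersion = lastVersion + 1;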
// A critical pitfall is assuming the client's version is correct.
// The server MUST be the sole authority on versioning. We ignore
// any version number sent by the client and assign our own.
const serverOperation = {
...clientOperation,
meta: {
clientId,
timestamp: new Date().toISOString()
}
};
await dbClient.query(
'INSERT INTO grid_operations (grid_id, operation_version, operation_payload, client_id) VALUES ($1, $2, $3, $4)',
[gridId, newVersion, JSON.stringify(serverOperation), clientId]
);
await dbClient.query('COMMIT');
// Broadcast the confirmed operation to all clients on the same grid
broadcastOperation(gridId, serverOperation, clientId);
} catch (err) {
await dbClient.query('ROLLBACK');
console.error(`Transaction failed for grid ${gridId}:`, err);
// Optionally, send an error message back to the originating client
ws.send(JSON.stringify({ type: 'OPERATION_REJECTED', reason: 'Failed to commit operation' }));
} finally {
dbClient.release();
}
}
function broadcastOperation(gridId, operation, originClientId) {
const clients = gridClients.get(gridId);
if (!clients) return;
const message = JSON.stringify({
type: 'NEW_OPERATION',
payload: operation,
});
clients.forEach(client => {
// Broadcast to every connected client, including the sender. The client
// applies no optimistic updates, so this echo is what updates the sender's UI.
// originClientId is unused for now; it would let a later version skip the sender.
if (client.readyState === 1) { // WebSocket.OPEN
client.send(message);
}
});
}
const PORT = process.env.PORT || 3001;
server.listen(PORT, () => {
console.log(`Server listening on port ${PORT}`);
});
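This backend code establishes the core logic: connect, receive full history, submit new operations within a database transaction, and broadcast confirmed operations. Version assignment has to be atomic per grid, otherwise two operations arriving at nearly the same instant could both read the same latest version. Note that PostgreSQL does not allow FOR UPDATE on a query that uses an aggregate such as MAX, so the per-grid serialization must come from another mechanism, for example a transaction-scoped advisory lock or a locked row in a grid metadata table. The unique_version_per_grid constraint remains the final safeguard: if two transactions ever race to the same version, one insert fails and is rolled back.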
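An alternative to locking is to let the unique constraint arbitrate the race and retry on conflict. The sketch below illustrates that optimistic variant under stated assumptions: `OperationLog`, `appendWithRetry`, and `InMemoryLog` are hypothetical names, and the in-memory class stands in for the table so the example runs without a database. The only PostgreSQL-specific fact it relies on is that a unique violation is reported with SQLSTATE 23505.

```typescript
// SQLSTATE code PostgreSQL reports for a unique-constraint violation.
const UNIQUE_VIOLATION = '23505';

// Hypothetical storage interface; a real version would wrap pg queries.
interface OperationLog {
  maxVersion(gridId: string): Promise<number>;
  insert(gridId: string, version: number, payload: unknown): Promise<void>;
}

// Read the latest version, try to insert at version + 1, and retry if a
// concurrent writer claimed that version first.
async function appendWithRetry(
  store: OperationLog,
  gridId: string,
  payload: unknown,
  maxAttempts = 5
): Promise<number> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const version = (await store.maxVersion(gridId)) + 1;
    try {
      await store.insert(gridId, version, payload);
      return version;
    } catch (err) {
      const code = typeof err === 'object' && err !== null
        ? (err as { code?: unknown }).code
        : undefined;
      if (code !== UNIQUE_VIOLATION) throw err; // unrelated failure
    }
  }
  throw new Error(`No free version for grid ${gridId} after ${maxAttempts} attempts`);
}

// In-memory stand-in for the table, enforcing UNIQUE (grid_id, operation_version).
// Assumes grid ids never contain ':' (the route regex only allows [a-zA-Z0-9-]).
class InMemoryLog implements OperationLog {
  private rows = new Map<string, unknown>();

  async maxVersion(gridId: string): Promise<number> {
    let max = 0;
    this.rows.forEach((_payload, key) => {
      const [id, version] = key.split(':');
      if (id === gridId) max = Math.max(max, Number(version));
    });
    return max;
  }

  async insert(gridId: string, version: number, payload: unknown): Promise<void> {
    const key = `${gridId}:${version}`;
    if (this.rows.has(key)) {
      throw Object.assign(new Error('duplicate key'), { code: UNIQUE_VIOLATION });
    }
    this.rows.set(key, payload);
  }
}
```

The trade-off is the usual one: retries cost extra round trips under contention, while a lock makes writers wait up front. For a grid edited by a handful of users, either is likely adequate.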
Frontend Implementation: The Reactive State
On the frontend, MobX manages the complex state of the grid. The GridStore
is the heart of the client-side logic.
// GridStore.ts
import { makeAutoObservable, observable, action, runInAction } from 'mobx';
// A simple representation of a cell's state
interface ICell {
id: string;
row: number;
col: number;
value: string;
}
// Define the shape of operations
type OperationPayload =
| { type: 'UPDATE_CELL'; row: number; col: number; value: string; }
| { type: 'ADD_ROW'; atIndex: number; };
// ... other operations like ADD_COLUMN, etc.
class GridStore {
// Using a map is more performant for sparse grids or frequent lookups
// than a 2D array for very large grids.
cells = observable.map<string, ICell>();
rows = 10;
cols = 10;
connectionStatus: 'disconnected' | 'connecting' | 'connected' = 'disconnected';
gridId: string | null = null;
private ws: WebSocket | null = null;
private localVersion = 0;
constructor() {
// The generic argument lets TypeScript accept overrides for private members.
makeAutoObservable<GridStore, 'ws' | 'applyOperation'>(this, {
// The raw socket is not observable state.
ws: false,
applyOperation: action,
});
this.initializeGrid(this.rows, this.cols);
}
// No @action decorator is needed: makeAutoObservable already wraps methods as actions.
initializeGrid(rows: number, cols: number) {
this.cells.clear();
this.rows = rows;
this.cols = cols;
for (let r = 0; r < rows; r++) {
for (let c = 0; c < cols; c++) {
const id = `${r}-${c}`;
this.cells.set(id, { id, row: r, col: c, value: '' });
}
}
}
getCell(row: number, col: number): ICell | undefined {
return this.cells.get(`${row}-${col}`);
}
connect(gridId: string) {
if (this.ws || this.connectionStatus === 'connecting') return;
this.gridId = gridId;
this.connectionStatus = 'connecting';
// In a real app, use wss:// for secure connections
this.ws = new WebSocket(`ws://localhost:3001/ws/grid/${gridId}`);
this.ws.onopen = () => {
runInAction(() => {
this.connectionStatus = 'connected';
});
console.log('WebSocket connected');
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
switch (message.type) {
case 'INIT':
this.handleInit(message.payload);
break;
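case 'NEW_OPERATION':
// Apply the operation and bump the version inside one action so the
// observable state is never mutated outside of MobX's action tracking.
runInAction(() => {
this.applyOperation(message.payload);
this.localVersion++; // Increment version on receiving a confirmed operation
});
break;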
case 'OPERATION_REJECTED':
console.error('Operation rejected by server:', message.reason);
// Here, you would implement logic to revert optimistic updates if you had them.
break;
}
};
this.ws.onclose = () => {
runInAction(() => {
this.connectionStatus = 'disconnected';
});
console.log('WebSocket disconnected. Attempting to reconnect...');
// Implement reconnection logic here
this.ws = null;
};
this.ws.onerror = (err) => {
console.error('WebSocket error:', err);
this.ws?.close();
};
}
private handleInit(payload: { operations: OperationPayload[], version: number }) {
// On initialization, reset the grid and apply the entire history
this.initializeGrid(10, 10); // Or use metadata from payload
payload.operations.forEach(op => this.applyOperation(op));
this.localVersion = payload.version;
console.log(`Grid initialized to version ${this.localVersion}`);
}
// The core logic for mutating state based on an operation
private applyOperation(op: OperationPayload) {
switch (op.type) {
case 'UPDATE_CELL':
const cell = this.getCell(op.row, op.col);
if (cell) {
cell.value = op.value;
}
break;
// Implement other operation types here
}
}
submitCellUpdate(row: number, col: number, value: string) {
if (this.connectionStatus !== 'connected' || !this.ws) {
console.error("Can't submit operation: not connected.");
return;
}
const op: OperationPayload = { type: 'UPDATE_CELL', row, col, value };
// We are not performing optimistic updates here for simplicity.
// The UI will only update when the server broadcasts the operation back to us.
// This guarantees consistency at the cost of perceived latency.
this.ws.send(JSON.stringify({ type: 'SUBMIT_OPERATION', payload: op }));
}
}
export const gridStore = new GridStore();
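The store above leaves reconnection as a placeholder. One common approach is capped exponential backoff; the sketch below shows only the delay calculation and a thin scheduling helper. `reconnectDelay`, `scheduleReconnect`, and the default timings are assumptions made for illustration, not values from the original implementation.

```typescript
// Delay in milliseconds before reconnect attempt `attempt` (0-based):
// doubles each time, capped at maxMs.
function reconnectDelay(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Hypothetical helper: call `connect` after the delay for this attempt.
// The returned timer handle can be cleared if the component unmounts.
function scheduleReconnect(
  attempt: number,
  connect: () => void
): ReturnType<typeof setTimeout> {
  return setTimeout(connect, reconnectDelay(attempt));
}
```

In the store, onclose would call something like scheduleReconnect with a counter that resets to zero in onopen. Because the server replays the full history in INIT, a reconnecting client needs no extra catch-up logic. A production version would probably add random jitter so that many clients do not reconnect in lockstep.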
The React component becomes a simple consumer of this store, using observer
from mobx-react-lite
to ensure efficient rendering.
// DataGrid.tsx
import React, { useEffect } from 'react';
import { observer } from 'mobx-react-lite';
import { gridStore } from './GridStore';
const GRID_ID = 'default-grid'; // In a real app, this comes from URL params
const Cell = observer(({ row, col }: { row: number; col: number }) => {
const cell = gridStore.getCell(row, col);
const handleChange = (e: React.ChangeEvent<HTMLInputElement>) => {
// Each keystroke is sent as its own operation. Because this input is
// controlled by server-confirmed state, fast typing can outrun the echo;
// a production version would likely keep a local draft and submit on blur.
gridStore.submitCellUpdate(row, col, e.target.value);
};
// No key is needed here; keys belong on the list items rendered by DataGrid.
return <input value={cell?.value || ''} onChange={handleChange} />;
});
export const DataGrid = observer(() => {
useEffect(() => {
gridStore.connect(GRID_ID);
// Cleanup on unmount
return () => { /* Disconnect logic here */ };
}, []);
if (gridStore.connectionStatus !== 'connected') {
return <div>Connecting to grid...</div>;
}
const gridCells = [];
for (let r = 0; r < gridStore.rows; r++) {
const rowCells = [];
for (let c = 0; c < gridStore.cols; c++) {
rowCells.push(<td key={c}><Cell row={r} col={c} /></td>);
}
gridCells.push(<tr key={r}>{rowCells}</tr>);
}
return (
<div>
<h1>Collaborative Grid: {GRID_ID}</h1>
<p>Status: {gridStore.connectionStatus}</p>
<table>
<tbody>{gridCells}</tbody>
</table>
</div>
);
});
End-to-End Testing: Guaranteeing Correctness with Cypress
This is where the investment pays off. We can automate the validation of the entire flow. The challenge in testing collaborative features is managing multiple user sessions. Cypress doesn’t natively support two browsers in one test, but we can simulate it by having one test perform actions and another test (or the same test after a reload) verify the results, which simulates a second client connecting and receiving the state.
A prerequisite for this test is a Cypress task to reset the database.
// cypress/plugins/index.js (or cypress.config.js for newer versions)
const { Pool } = require('pg');
module.exports = (on, config) => {
const pool = new Pool({ /* ... db config ... */ });
on('task', {
async 'db:reset'() {
try {
await pool.query('TRUNCATE TABLE grid_operations');
return null;
} catch (e) {
console.error(e);
throw e;
}
}
});
};
Now, the test spec can be written.
// cypress/e2e/collaboration.cy.js
describe('Real-Time Collaborative Grid', () => {
const GRID_ID = 'test-grid-123';
const GRID_URL = `/grid/${GRID_ID}`; // Assuming you have routing set up
const getCellInput = (row, col) => {
return cy.get(`tbody tr`).eq(row).find('td').eq(col).find('input');
};
beforeEach(() => {
cy.task('db:reset');
// We will simulate two users by using two different windows/sessions conceptually.
// For this test, `cy.visit` represents a client connecting.
});
it('should reflect changes made by one user on another user’s screen', () => {
// --- User A's session ---
cy.visit(GRID_URL);
// User A types into cell (0, 0)
const userA_text = 'Hello from User A';
getCellInput(0, 0).type(userA_text);
getCellInput(0, 0).should('have.value', userA_text);
// --- User B's session ---
// Now, we simulate User B loading the page. They should receive the
// history of operations and see User A's change.
cy.visit(GRID_URL);
// Verify User B sees the text from User A
getCellInput(0, 0).should('have.value', userA_text);
// User B now types into cell (1, 1)
const userB_text = 'User B acknowledges';
getCellInput(1, 1).type(userB_text);
getCellInput(1, 1).should('have.value', userB_text);
// --- User A's session refreshed ---
// Let's go back to User A's perspective. After a reload, they should
// see both their original change and User B's change.
cy.visit(GRID_URL);
getCellInput(0, 0).should('have.value', userA_text);
getCellInput(1, 1).should('have.value', userB_text);
});
});
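This test confirms most of the data flow: client action, server persistence, the echo of confirmed operations back to the sender, and state reconstruction on a new client connection. Because each cy.visit replaces the previous session, it does not exercise live delivery to a second client that is connected at the same time; that path needs a separate check.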
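The property a multi-client test ultimately has to assert is convergence: every client, whether it was connected during the edits or joined afterwards, ends up with the same grid. The following sketch models that property in memory. `FakeClient` and `FakeHub` are hypothetical stand-ins for the browser store and the WebSocket server, not Cypress APIs.

```typescript
type CellOp = { row: number; col: number; value: string };

// A client that holds only what the server has sent it.
class FakeClient {
  cells = new Map<string, string>();

  receive(op: CellOp): void {
    this.cells.set(`${op.row}-${op.col}`, op.value);
  }

  // Stable, comparable view of the client's state.
  snapshot(): string {
    const entries: string[] = [];
    this.cells.forEach((value, key) => entries.push(`${key}=${value}`));
    return entries.sort().join('|');
  }
}

// Stand-in for the server: an ordered log plus a broadcast list.
class FakeHub {
  private log: CellOp[] = [];
  private clients: FakeClient[] = [];

  join(client: FakeClient): void {
    this.log.forEach(op => client.receive(op)); // INIT: replay history
    this.clients.push(client);
  }

  submit(op: CellOp): void {
    this.log.push(op); // the order is fixed here
    this.clients.forEach(c => c.receive(op)); // NEW_OPERATION to everyone
  }
}

const hub = new FakeHub();
const userA = new FakeClient();
const userB = new FakeClient();
hub.join(userA);
hub.join(userB);
hub.submit({ row: 0, col: 0, value: 'Hello from User A' });
hub.submit({ row: 1, col: 1, value: 'User B acknowledges' });

// A third client connects after both edits and must catch up from the log.
const lateJoiner = new FakeClient();
hub.join(lateJoiner);
```

Comparing userA.snapshot(), userB.snapshot(), and lateJoiner.snapshot() for equality is the in-memory analogue of asserting the same cell values in every browser.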
Here is a Mermaid diagram illustrating the core operation flow:
sequenceDiagram
    participant UserA as User A's Browser
    participant UserB as User B's Browser
    participant Server as Express.js / WebSocket Server
    participant DB as PostgreSQL Database
    UserA->>+Server: WebSocket Connection
    Server->>+DB: SELECT * FROM grid_operations
    DB-->>-Server: Operation History
    Server-->>-UserA: INIT message with operations
    UserB->>+Server: WebSocket Connection
    Server->>+DB: SELECT * FROM grid_operations
    DB-->>-Server: Operation History
    Server-->>-UserB: INIT message with operations
    UserA->>UserA: Edits Cell (0,0) to "New Value"
    UserA->>+Server: SUBMIT_OPERATION { type: 'UPDATE_CELL', ... }
    Server->>+DB: BEGIN TRANSACTION
    Server->>DB: INSERT new operation with version N+1
    Server->>-DB: COMMIT TRANSACTION
    Server-->>UserA: NEW_OPERATION (Confirmation)
    Server-->>UserB: NEW_OPERATION (Broadcast)
    UserA->>UserA: Apply operation to local MobX store
    UserB->>UserB: Apply operation to local MobX store
This architecture, while simple, is robust. The reliance on a transactional, append-only log in SQL provides a solid foundation for consistency and auditability.
The primary limitation of this implementation is that it has no transformation logic: it purely serializes operations. If two users simultaneously submit an “insert row at index 5” operation, the server simply orders them and applies both at index 5, so the second insert lands above the first and the two new rows end up at indices 5 and 6 in an order neither user chose. A true OT system would transform the second operation into “insert row at index 6” based on the first.
Furthermore, scaling the WebSocket server beyond a single instance would require a publish/subscribe mechanism like Redis to coordinate broadcasting across all server nodes, as the in-memory gridClients map would no longer be sufficient. The initial data load, which fetches the entire operation history, is also a performance bottleneck for grids with very long edit histories, suggesting a need for periodic state snapshotting in future iterations.
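To illustrate the missing transformation step for the row-insert case, here is a minimal sketch. It covers only two concurrent ADD_ROW operations; `transformAddRow` and `insertRow` are hypothetical helpers, and a real OT layer would need a transform for every pair of operation types.

```typescript
type AddRow = { type: 'ADD_ROW'; atIndex: number };

// Transform `incoming` so it keeps its intended position after `applied`,
// a concurrent operation the server ordered first, has shifted rows down.
// Ties go to the earlier operation, which keeps the lower index.
function transformAddRow(incoming: AddRow, applied: AddRow): AddRow {
  if (applied.atIndex <= incoming.atIndex) {
    return { ...incoming, atIndex: incoming.atIndex + 1 };
  }
  return incoming;
}

// Insert a labeled row into a copy of the row list.
function insertRow(rows: string[], op: AddRow, label: string): string[] {
  const copy = rows.slice();
  copy.splice(op.atIndex, 0, label);
  return copy;
}

const baseRows = ['r0', 'r1', 'r2', 'r3', 'r4', 'r5'];
const opA: AddRow = { type: 'ADD_ROW', atIndex: 5 };
const opB: AddRow = { type: 'ADD_ROW', atIndex: 5 };

// Pure serialization: B is applied as-is and lands above A.
const serialized = insertRow(insertRow(baseRows, opA, 'A'), opB, 'B');

// With transformation: B is shifted to index 6 and lands below A.
const transformed = insertRow(
  insertRow(baseRows, opA, 'A'),
  transformAddRow(opB, opA),
  'B'
);
```

In the serialized result the rows read r4, B, A, r5; in the transformed result they read r4, A, B, r5, matching the order in which the server accepted the operations.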