The initial pain point was deceptive. A request for “real-time collaboration” often defaults to a simple WebSocket broadcaster: a client sends a message, and the server forwards it to all other connected clients. This pattern works for chat rooms but fails for stateful applications like a collaborative text editor. Each client applies its own edits immediately and remote edits only when they arrive, so concurrent edits end up applied in a different order on each client, and without a conflict resolution strategy the document state on each client diverges. The classic solution, Operational Transformation (OT), is notoriously difficult to implement correctly, because its transform functions must account for every possible pair of concurrent operations.
This led our investigation toward Conflict-free Replicated Data Types (CRDTs). The core premise of a CRDT is that concurrent operations are designed to commute and, in many designs, to be idempotent, so replicas converge to the same state regardless of the order in which those operations are applied. (Operation-based CRDTs typically still assume that causally related operations are delivered in order.) This sidesteps most of the problem of out-of-order message delivery. The technical challenge then shifts from complex transformation logic to designing a robust, persistent, and scalable backend capable of managing these CRDT-based sessions.
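To make the convergence property concrete, here is a minimal sketch of one of the simplest CRDTs, a grow-only counter. It is not the text CRDT used later in this article; the class name `GCounter` and the replica IDs are illustrative assumptions. The point is only that merging in either order, or merging the same state twice, gives the same result.

```java
import java.util.HashMap;
import java.util.Map;

// Grow-only counter (G-Counter): each replica increments only its own slot,
// and merging two replicas takes the per-replica maximum.
final class GCounter {
    private final Map<String, Long> counts = new HashMap<>();

    void increment(String replicaId) {
        counts.merge(replicaId, 1L, Long::sum);
    }

    // Merge is commutative, associative, and idempotent because max() is.
    void merge(GCounter other) {
        other.counts.forEach((replica, n) -> counts.merge(replica, n, Long::max));
    }

    long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    static GCounter copyOf(GCounter source) {
        GCounter copy = new GCounter();
        copy.counts.putAll(source.counts);
        return copy;
    }

    public static void main(String[] args) {
        GCounter a = new GCounter();
        GCounter b = new GCounter();
        a.increment("A");
        a.increment("A");
        b.increment("B");

        GCounter ab = copyOf(a);
        ab.merge(b);            // a's state, then b's
        GCounter ba = copyOf(b);
        ba.merge(a);            // b's state, then a's
        ba.merge(a);            // a duplicate delivery changes nothing

        System.out.println(ab.value() + " " + ba.value()); // prints "3 3"
    }
}
```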
Our stack selection was driven by production pragmatism. We needed a lightweight, high-performance JVM framework for the WebSocket server, a database that could handle a high-throughput, append-only workload without falling over, and a clear path to horizontal scalability.
- Backend Framework: Micronaut was chosen because it performs dependency injection and annotation processing ahead of time, at compile time, rather than through runtime reflection. This results in a low memory footprint and fast startup, both of which matter in a microservice architecture. Its first-class support for WebSockets provides a clean, annotation-driven model for handling the connection lifecycle.
- Database: Persisting every single keystroke or operation is an intense write workload. A traditional monolithic RDBMS would quickly become an I/O bottleneck. This is where TiDB entered the picture. As a distributed, NewSQL database with MySQL compatibility, it offers horizontal scalability for writes and a Hybrid Transactional/Analytical Processing (HTAP) capability that allows us to run analytics on collaboration patterns later without a separate ETL pipeline.
- Frontend: While this exploration focuses on the backend, the frontend would leverage a Headless UI library. This approach separates logic from presentation, allowing the frontend team to build accessible, custom UI components while the state management is dictated purely by the CRDT library’s data structures.
The initial architecture was simple: a stateful Micronaut service holding CRDT document states in memory. However, a real-world project must account for server crashes, restarts, and scaling. This meant the in-memory state had to be backed by a durable persistence layer in TiDB, leading to a more nuanced implementation involving state snapshots and an append-only operation log.
Phase 1: Establishing the WebSocket Backbone with Micronaut
The entry point is a Micronaut WebSocket server. The goal is to manage connections on a per-document basis. A user connecting to edit document-123 should only receive updates for that specific document.
We define a URI template /ws/documents/{documentId} to handle this routing. Micronaut’s WebSocket support makes it trivial to bind path variables directly to method parameters.
// src/main/java/com/example/collaboration/DocumentSocketServer.java
package com.example.collaboration;

import io.micronaut.websocket.WebSocketBroadcaster;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import io.micronaut.websocket.annotation.ServerWebSocket;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Predicate;

@Singleton
@ServerWebSocket("/ws/documents/{documentId}")
public class DocumentSocketServer {

    private static final Logger LOG = LoggerFactory.getLogger(DocumentSocketServer.class);

    private final WebSocketBroadcaster broadcaster;
    private final DocumentSessionManager sessionManager;

    // Micronaut's dependency injection provides the necessary components.
    public DocumentSocketServer(WebSocketBroadcaster broadcaster, DocumentSessionManager sessionManager) {
        this.broadcaster = broadcaster;
        this.sessionManager = sessionManager;
    }

    @OnOpen
    public void onOpen(String documentId, WebSocketSession session) {
        LOG.info("Session {} opened for document {}", session.getId(), documentId);
        // On connection, the client is added to the document's session.
        // The session manager handles loading or creating the document state.
        sessionManager.join(documentId, session);
    }

    @OnMessage
    public void onMessage(String documentId, String message, WebSocketSession session) {
        LOG.debug("Message received from session {} for document {}: {}", session.getId(), documentId, message);
        // Delegate the message processing to the session manager.
        // It will apply the CRDT operation and get the new state or delta to broadcast.
        String response = sessionManager.handleMessage(documentId, session, message);
        if (response != null) {
            // The broadcaster sends the message only to sessions for the same document.
            broadcaster.broadcastSync(response, forDocument(documentId));
        }
    }

    @OnClose
    public void onClose(String documentId, WebSocketSession session) {
        LOG.info("Session {} closed for document {}", session.getId(), documentId);
        // Clean up when a client disconnects.
        sessionManager.leave(documentId, session);
    }

    /**
     * A predicate to filter WebSocket sessions. This is crucial for ensuring
     * that broadcasts are scoped to a single document and not sent to all
     * connected clients across the entire server.
     *
     * @param documentId The ID of the document to target.
     * @return A predicate that returns true for sessions associated with the given documentId.
     */
    private Predicate<WebSocketSession> forDocument(String documentId) {
        return s -> documentId.equals(s.getUriVariables().get("documentId", String.class, null));
    }
}
This controller is stateless. The real work is delegated to DocumentSessionManager, which is responsible for the lifecycle of each collaborative document. This separation of concerns is critical for maintainability.
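For trying the endpoint by hand, a client can be sketched with the JDK's built-in `java.net.http.WebSocket`. This is only an illustration: the class name `DocumentClient`, the host and port, and the assumption that the document ID is already URL-safe are ours, not part of the service.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class DocumentClient {

    // Builds the URI matching the server's /ws/documents/{documentId} template.
    // Assumes documentId contains only URL-safe characters.
    static URI documentUri(String host, int port, String documentId) {
        return URI.create("ws://" + host + ":" + port + "/ws/documents/" + documentId);
    }

    // Opens a connection and prints every text frame the server sends.
    static CompletableFuture<WebSocket> connect(String host, int port, String documentId) {
        WebSocket.Listener listener = new WebSocket.Listener() {
            @Override
            public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                // Large messages may arrive in several parts; 'last' marks the final one.
                System.out.println("Update for " + documentId + ": " + data);
                webSocket.request(1); // ask for the next frame
                return null;
            }
        };
        return HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(documentUri(host, port, documentId), listener);
    }
}
```

Calling `DocumentClient.connect("localhost", 8080, "document-123")` against a running instance should receive only the traffic for that one document, since the server scopes broadcasts by the `documentId` path variable.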
Phase 2: Managing State with CRDTs in a Session
The DocumentSessionManager is the stateful heart of the application. It maintains a map of active document sessions. When a user joins a document for the first time, it loads the document’s state from TiDB. If subsequent users join, they are added to the existing in-memory session.
For this example, we’ll simulate a simple sequence CRDT for text. A real implementation would use a robust library providing structures like LSEQ, Logoot, or Yjs-compatible types. Our model will consist of CrdtOperation objects.
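As a rough idea of what such a simulated sequence CRDT could look like, the sketch below keeps characters in a map sorted by a unique position identifier and remembers deleted positions as tombstones. The class name `SimpleSequence`, the tombstone set, and identifiers such as "a1" are assumptions made for illustration. Real algorithms like LSEQ or Logoot also define how to generate a fresh identifier between two neighbours, which is omitted here.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;

final class SimpleSequence {
    // Characters ordered by their position identifier.
    private final ConcurrentSkipListMap<String, Character> chars = new ConcurrentSkipListMap<>();
    // Positions that have been deleted; a late INSERT for one of them is ignored.
    private final Set<String> tombstones = new HashSet<>();

    synchronized void insert(String position, char c) {
        if (!tombstones.contains(position)) {
            chars.putIfAbsent(position, c); // applying the same INSERT twice is a no-op
        }
    }

    synchronized void delete(String position) {
        tombstones.add(position);
        chars.remove(position);
    }

    synchronized String text() {
        StringBuilder sb = new StringBuilder();
        for (char c : chars.values()) {
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        SimpleSequence first = new SimpleSequence();
        first.insert("a1", 'H');
        first.insert("a2", 'i');
        first.delete("a2");

        // The same three operations, delivered in a different order.
        SimpleSequence second = new SimpleSequence();
        second.delete("a2");
        second.insert("a2", 'i');
        second.insert("a1", 'H');

        System.out.println(first.text() + " " + second.text()); // prints "H H"
    }
}
```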
First, let’s define the communication protocol. We’ll use a simple JSON structure.
// Example INSERT operation from a client
{
  "type": "INSERT",
  "sessionId": "client-session-id-abc",
  "payload": {
    "char": "H",
    "position": "unique_position_id_1"
  }
}

// Example DELETE operation
{
  "type": "DELETE",
  "sessionId": "client-session-id-xyz",
  "payload": {
    "position": "unique_position_id_2"
  }
}
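Because these messages come from untrusted clients, it is worth checking their shape before applying them. The following is a minimal sketch of such a check, assuming the message has already been parsed into a type string and a flat payload map; the class `OperationValidator` and its error strings are hypothetical and not part of the service code shown in this article.

```java
import java.util.Map;
import java.util.Optional;

final class OperationValidator {

    // Returns a description of the first problem found, or empty if the operation is well-formed.
    static Optional<String> validate(String type, Map<String, String> payload) {
        if (type == null) {
            return Optional.of("missing type");
        }
        String position = payload == null ? null : payload.get("position");
        if (position == null || position.isEmpty()) {
            return Optional.of("missing payload.position");
        }
        switch (type) {
            case "INSERT": {
                String ch = payload.get("char");
                // Count code points so that a single emoji still counts as one character.
                if (ch == null || ch.codePointCount(0, ch.length()) != 1) {
                    return Optional.of("INSERT requires exactly one character in payload.char");
                }
                return Optional.empty();
            }
            case "DELETE":
                return Optional.empty();
            default:
                return Optional.of("unknown type: " + type);
        }
    }
}
```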
The DocumentSessionManager orchestrates loading, updating, and persisting this state.
// src/main/java/com/example/collaboration/DocumentSessionManager.java
package com.example.collaboration;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.websocket.WebSocketSession;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

@Singleton
public class DocumentSessionManager {

    private static final Logger LOG = LoggerFactory.getLogger(DocumentSessionManager.class);

    // In-memory cache of active document sessions. A production system might use a
    // distributed cache or a more sophisticated eviction policy.
    private final ConcurrentHashMap<String, DocumentSession> activeSessions = new ConcurrentHashMap<>();
    private final DocumentRepository documentRepository;
    private final ObjectMapper objectMapper;

    public DocumentSessionManager(DocumentRepository documentRepository, ObjectMapper objectMapper) {
        this.documentRepository = documentRepository;
        this.objectMapper = objectMapper;
    }

    public void join(String documentId, WebSocketSession session) {
        // compute() runs atomically per key. Registering the participant inside it means a
        // concurrent leave() cannot evict the session between the lookup and the join.
        DocumentSession docSession = activeSessions.compute(documentId, (id, existing) -> {
            DocumentSession target = existing;
            if (target == null) {
                LOG.info("No active session for document {}. Creating and loading from persistence.", id);
                // This is a critical step: loading state from TiDB on first access.
                target = new DocumentSession(id, documentRepository);
            }
            target.addParticipant(session);
            return target;
        });
        // When a new user joins, send them the full current state of the document.
        session.sendSync(docSession.getFullStateAsJson());
    }

    public void leave(String documentId, WebSocketSession session) {
        // computeIfPresent() is atomic for the same key, so removal and eviction cannot
        // interleave with a concurrent join() for this document.
        activeSessions.computeIfPresent(documentId, (id, docSession) -> {
            docSession.removeParticipant(session);
            // A common mistake is to keep empty sessions in memory forever.
            // If no one is editing the document, we can evict it from the cache.
            if (docSession.getParticipantCount() > 0) {
                return docSession;
            }
            LOG.info("Last participant left document {}. Evicting session from memory.", id);
            // The session should handle its own final persistence logic before being evicted.
            docSession.flushAndClose();
            return null; // Returning null removes the mapping.
        });
    }

    public String handleMessage(String documentId, WebSocketSession session, String message) {
        DocumentSession docSession = activeSessions.get(documentId);
        if (docSession == null) {
            LOG.warn("Received message for document {} but no active session found. Ignoring.", documentId);
            return null;
        }
        try {
            // Deserialize the incoming message into a structured operation.
            CrdtOperation operation = objectMapper.readValue(message, CrdtOperation.class);
            // Stamp the server-side session ID before applying the operation, so that the
            // copy written to the operations log never carries a client-supplied or missing ID.
            operation.setSessionId(session.getId());
            // The session applies the operation to its internal CRDT state.
            docSession.applyOperation(operation);
            // Return the serialized operation to be broadcast to other clients.
            // The session ID is included so the originating client can ignore its own echo.
            return objectMapper.writeValueAsString(operation);
        } catch (IOException e) {
            LOG.error("Failed to deserialize or process message for document {}", documentId, e);
            // In a real project, we would send an error message back to the client.
            return null;
        }
    }
}
The DocumentSession class itself contains the CRDT logic and interacts with the persistence layer.
Phase 3: Durable Persistence with TiDB
Here lies the most critical architectural decision. Relying solely on in-memory state is fragile. A service restart wipes all active collaboration. We need a durable store. TiDB is a good fit because it can absorb the high-volume, small-write workload of an operations log.
Our persistence strategy is twofold:
- Operations Log: An append-only table (document_operations) that stores every single CRDT operation. This provides a full audit trail and is the source of truth for reconstructing state.
- Snapshots: A second table (document_snapshots) that periodically stores the full materialized state of a document. This is an optimization to speed up recovery. Instead of replaying millions of operations from the beginning of time, we can load the latest snapshot and replay only the operations that occurred since.
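The recovery rule can be sketched independently of the database. In the in-memory illustration below, `LoggedOp`, `Snapshot`, and `recover` are hypothetical names, and the document state is reduced to a sorted map of position to character. It shows that loading a snapshot and replaying only the later operations gives the same state as replaying the whole log.

```java
import java.util.List;
import java.util.TreeMap;

final class RecoveryDemo {

    // One row of the operations log (hypothetical shape).
    static final class LoggedOp {
        final long version;
        final String type;      // "INSERT" or "DELETE"
        final String position;
        final char ch;          // ignored for DELETE

        LoggedOp(long version, String type, String position, char ch) {
            this.version = version;
            this.type = type;
            this.position = position;
            this.ch = ch;
        }
    }

    // Materialized state as of a given version (hypothetical shape).
    static final class Snapshot {
        final long version;
        final TreeMap<String, Character> state;

        Snapshot(long version, TreeMap<String, Character> state) {
            this.version = version;
            this.state = state;
        }
    }

    // Start from the snapshot (or empty state) and replay only newer operations.
    // The log is assumed to be sorted by version in ascending order.
    static TreeMap<String, Character> recover(Snapshot snapshot, List<LoggedOp> log) {
        TreeMap<String, Character> state =
                snapshot == null ? new TreeMap<>() : new TreeMap<>(snapshot.state);
        long fromVersion = snapshot == null ? 0 : snapshot.version;
        for (LoggedOp op : log) {
            if (op.version <= fromVersion) {
                continue; // already reflected in the snapshot
            }
            if ("INSERT".equals(op.type)) {
                state.put(op.position, op.ch);
            } else {
                state.remove(op.position);
            }
        }
        return state;
    }
}
```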
Here is the TiDB schema (MySQL-compatible DDL):
CREATE TABLE document_snapshots (
    document_id VARCHAR(255) NOT NULL,
    version BIGINT NOT NULL,
    full_state JSON NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (document_id, version)
);

CREATE TABLE document_operations (
    id BIGINT AUTO_RANDOM NOT NULL,
    document_id VARCHAR(255) NOT NULL,
    version BIGINT NOT NULL,
    operation_payload JSON NOT NULL,
    session_id VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    INDEX idx_doc_version (document_id, version)
);
Note the use of AUTO_RANDOM for the primary key in document_operations. This is a TiDB-specific feature that helps avoid write hotspots on a single Region when dealing with high-throughput inserts, a common problem with auto-incrementing keys in distributed databases.
The DocumentSession class will now manage the interaction with TiDB. A key pitfall to avoid is writing to the database on every single keystroke. This would create enormous pressure on both the application and the database. The solution is to batch writes.
// src/main/java/com/example/collaboration/DocumentSession.java
package com.example.collaboration;

// ... imports

public class DocumentSession {

    // ... (Logger, documentId, participants list, etc.)

    // The core CRDT data structure. This would be a complex object in a real system.
    private final ConcurrentSkipListMap<String, Character> crdtState;
    private final DocumentRepository documentRepository;
    private final String documentId;
    private long currentVersion;

    // Buffer for batching operations before writing to TiDB.
    private final List<CrdtOperation> operationBuffer = Collections.synchronizedList(new ArrayList<>());

    private static final int BATCH_SIZE_THRESHOLD = 100; // Persist after 100 operations
    private static final int SNAPSHOT_INTERVAL = 1000; // Create a snapshot every 1000 versions

    public DocumentSession(String documentId, DocumentRepository documentRepository) {
        this.documentId = documentId;
        this.documentRepository = documentRepository;
        this.crdtState = new ConcurrentSkipListMap<>();
        loadFromPersistence();
    }

    private void loadFromPersistence() {
        // 1. Try to load the latest snapshot.
        Optional<DocumentSnapshot> snapshot = documentRepository.findLatestSnapshot(documentId);
        if (snapshot.isPresent()) {
            DocumentSnapshot snap = snapshot.get();
            this.currentVersion = snap.getVersion();
            // In a real system, deserialize `snap.getFullState()` into the CRDT structure.
            LOG.info("Loaded snapshot for document {} at version {}", documentId, this.currentVersion);
        } else {
            this.currentVersion = 0;
            LOG.info("No snapshot found for document {}. Starting from version 0.", documentId);
        }

        // 2. Replay all operations that occurred after the snapshot was taken.
        List<DocumentOperation> operations = documentRepository.findOperationsAfterVersion(documentId, this.currentVersion);
        LOG.info("Replaying {} operations for document {} since version {}", operations.size(), documentId, this.currentVersion);
        operations.forEach(op -> {
            // Apply operation to in-memory state without adding to buffer.
            applyOperationInternal(op.getOperationPayload());
            this.currentVersion = op.getVersion();
        });
    }

    public synchronized void applyOperation(CrdtOperation operation) {
        // Apply to the in-memory state immediately for real-time broadcast.
        applyOperationInternal(operation.getPayload());
        this.currentVersion++;
        operation.setVersion(this.currentVersion);
        operationBuffer.add(operation);

        // Check if the buffer is full enough to warrant a database write.
        if (operationBuffer.size() >= BATCH_SIZE_THRESHOLD) {
            flushBufferToDb();
        }

        // Check if it's time to create a new snapshot.
        if (this.currentVersion % SNAPSHOT_INTERVAL == 0) {
            createSnapshot();
        }
    }

    // Callers must hold the session lock; both callers are synchronized methods.
    private void flushBufferToDb() {
        if (operationBuffer.isEmpty()) {
            return;
        }
        List<CrdtOperation> batch = new ArrayList<>(operationBuffer);
        LOG.debug("Flushing {} operations to TiDB for document {}", batch.size(), documentId);
        documentRepository.saveOperations(documentId, batch);
        // Clear the buffer only after the write succeeds, so a failed write does not
        // silently drop operations; they stay buffered for the next flush attempt.
        operationBuffer.clear();
    }

    // Synchronized so that a final flush cannot interleave with a concurrent applyOperation().
    public synchronized void flushAndClose() {
        // Ensure any remaining operations are persisted when the session is closed.
        flushBufferToDb();
    }

    // ... methods for add/remove participants, getFullState, etc.
}
This batching logic is a crucial performance optimization. It balances real-time responsiveness (in-memory updates) with durable persistence (batched database writes).
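One gap in a purely size-based threshold is a quiet document: with fewer than 100 pending operations, nothing is written until the last participant leaves. A possible refinement, sketched below under our own assumptions, is to also flush when the oldest buffered item exceeds a maximum age. `BatchBuffer`, its parameters, and the explicit clock argument are illustrative; in the service, `flushIfStale` would be called from a scheduled task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchBuffer<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final Consumer<List<T>> sink;   // e.g. a call that writes the batch to the database
    private final List<T> buffer = new ArrayList<>();
    private long oldestMillis;              // arrival time of the oldest buffered item

    BatchBuffer(int maxSize, long maxAgeMillis, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.sink = sink;
    }

    // The current time is passed in to keep the class deterministic and easy to test.
    synchronized void add(T item, long nowMillis) {
        if (buffer.isEmpty()) {
            oldestMillis = nowMillis;
        }
        buffer.add(item);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    // Intended to be called periodically.
    synchronized void flushIfStale(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - oldestMillis >= maxAgeMillis) {
            flush();
        }
    }

    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        sink.accept(batch); // if this throws, the buffer is left intact for a retry
        buffer.clear();
    }
}
```

The age limit bounds how much work a crash can lose in time as well as in count, at the cost of smaller, more frequent batches for slow-moving documents.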
Phase 4: Micronaut Data Repository for TiDB
Micronaut Data simplifies database access significantly. We can define a repository interface, and Micronaut will provide the implementation at compile time.
// src/main/java/com/example/collaboration/DocumentRepository.java
package com.example.collaboration;

import io.micronaut.data.annotation.Query;
import io.micronaut.data.jdbc.annotation.JdbcRepository;
import io.micronaut.data.model.query.builder.sql.Dialect;
import io.micronaut.data.repository.CrudRepository;

import java.util.List;
import java.util.Optional;

// We specify MySQL dialect as TiDB is MySQL-compatible.
@JdbcRepository(dialect = Dialect.MYSQL)
public interface DocumentRepository extends CrudRepository<DocumentOperation, Long> {

    @Query("SELECT * FROM document_snapshots WHERE document_id = :documentId ORDER BY version DESC LIMIT 1")
    Optional<DocumentSnapshot> findLatestSnapshot(String documentId);

    @Query("SELECT * FROM document_operations WHERE document_id = :documentId AND version > :version ORDER BY version ASC")
    List<DocumentOperation> findOperationsAfterVersion(String documentId, long version);

    // CrudRepository already offers saveAll(Iterable) for DocumentOperation entities.
    // The two methods below are different: they take types that are not this repository's
    // entity, so Micronaut Data cannot derive an implementation from their names alone.
    // They are shown here as the intended contract. In practice the repository would
    // become an abstract class with an injected `JdbcOperations` (or `DataSource`),
    // and `saveOperations` would use batch JDBC statements for performance.
    void saveOperations(String documentId, List<CrdtOperation> operations);

    void saveSnapshot(DocumentSnapshot snapshot);
}
The configuration in application.yml ties everything together, pointing Micronaut to our TiDB cluster.
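For reference, the batch insert behind saveOperations could look roughly like the following plain JDBC sketch. The class `OperationBatchWriter` and its `Row` holder are hypothetical; the column names come from the document_operations schema above, and the id column is left out so that TiDB assigns the AUTO_RANDOM value. How the Connection is obtained (for example from the configured datasource) is outside the sketch.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class OperationBatchWriter {

    static final String INSERT_SQL =
            "INSERT INTO document_operations (document_id, version, operation_payload, session_id) "
                    + "VALUES (?, ?, ?, ?)";

    // Minimal stand-in for one buffered operation (hypothetical shape).
    static final class Row {
        final long version;
        final String payloadJson;
        final String sessionId;

        Row(long version, String payloadJson, String sessionId) {
            this.version = version;
            this.payloadJson = payloadJson;
            this.sessionId = sessionId;
        }
    }

    // Writes all rows in one JDBC batch inside a single transaction.
    // Returns the number of statements in the executed batch.
    static int writeBatch(Connection conn, String documentId, List<Row> rows) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            for (Row row : rows) {
                ps.setString(1, documentId);
                ps.setLong(2, row.version);
                ps.setString(3, row.payloadJson);
                ps.setString(4, row.sessionId);
                ps.addBatch();
            }
            int[] results = ps.executeBatch();
            conn.commit();
            return results.length;
        } catch (SQLException e) {
            conn.rollback(); // either the whole batch is stored or none of it
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```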
# src/main/resources/application.yml
micronaut:
  application:
    name: collaboration-service
datasources:
  default:
    url: jdbc:mysql://your-tidb-host:4000/collaborate?useSSL=false
    username: root
    password: ""
    driverClassName: com.mysql.cj.jdbc.Driver
    # Connection pool settings are critical for production.
    maximum-pool-size: 20
    minimum-idle: 5
    connection-timeout: 30000
The Full Architectural Flow
A diagram helps visualize the complete system.
sequenceDiagram
    participant ClientA
    participant ClientB
    participant MicronautService
    participant TiDBCluster

    ClientA->>+MicronautService: WebSocket Connect (/ws/documents/doc123)
    MicronautService->>+TiDBCluster: SELECT latest snapshot for doc123
    TiDBCluster-->>-MicronautService: Snapshot @ v5000
    MicronautService->>+TiDBCluster: SELECT operations where version > 5000
    TiDBCluster-->>-MicronautService: Operations 5001-5025
    Note over MicronautService: Reconstructs CRDT state in-memory
    MicronautService-->>-ClientA: Send full document state

    ClientB->>+MicronautService: WebSocket Connect (/ws/documents/doc123)
    Note over MicronautService: Session for doc123 already in memory
    MicronautService-->>-ClientB: Send full document state

    ClientA->>MicronautService: Send INSERT operation
    MicronautService->>MicronautService: Apply to in-memory CRDT
    MicronautService-->>ClientB: Broadcast INSERT operation
    Note over MicronautService: Add operation to buffer (size now 1)

    loop 99 times
        ClientA->>MicronautService: Send more operations
        MicronautService->>MicronautService: Apply & add to buffer
        MicronautService-->>ClientB: Broadcast operations
    end

    Note over MicronautService: Buffer reaches threshold (100)
    MicronautService->>+TiDBCluster: Batch INSERT 100 operations into document_operations
    TiDBCluster-->>-MicronautService: Batch write successful
This architecture provides real-time collaboration backed by a scalable and durable persistence layer. The choice of CRDTs simplifies the backend logic, while Micronaut offers a high-performance runtime. TiDB underpins the entire system with a database that won’t buckle under the intense, append-only write load characteristic of such applications.
The current implementation still has limitations in a massive-scale environment. The state for each active document is held in the memory of a single service instance, which creates a ceiling on scalability. A future iteration would need to route WebSocket connections for a specific document to the service instance that holds its state, perhaps using a distributed cache or a service discovery mechanism with consistent hashing. Batching also trades some durability for throughput: if an instance crashes, up to BATCH_SIZE_THRESHOLD - 1 buffered operations per document have been broadcast to clients but not yet written to TiDB. Finally, the operations log grows without bound; a production system requires a strategy for archiving or summarizing historical operations to manage storage growth.
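To give a feel for the consistent hashing idea, here is a small sketch of a hash ring that maps a document ID to a service instance. It is an illustration only: the class `ConsistentHashRing`, the instance names, and the use of SHA-256 with virtual nodes are our assumptions, and a real deployment would also need membership tracking and a way to hand over in-memory sessions when the ring changes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

final class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    // Each instance is placed on the ring several times to even out the distribution.
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i));
        }
    }

    // The owner is the first ring entry at or after the document's hash, wrapping around.
    String nodeFor(String documentId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no service instances registered");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(documentId));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // First 8 bytes of SHA-256 interpreted as a long.
    private static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The useful property is that removing an instance only remaps the documents that instance owned; every other document keeps its current owner, so most in-memory sessions stay where they are.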