The requirement was seemingly simple: ensure an atomic update across two distinct microservices—a Node.js service managing user account balances and a new Rust-based service handling a high-performance, append-only transaction ledger. A user action initiated via a REST API call on the Node service needed to debit a balance and simultaneously write a corresponding, immutable record to the Rust ledger. Failure in either service must result in a complete rollback. In a real-world project, this is where data consistency nightmares begin.
Initial discussions floated event-driven approaches like the Saga pattern. While powerful, Saga introduces significant complexity for what needed to be a synchronous, user-facing operation. The business logic demanded a clear, immediate success-or-fail response. Compensating transactions would add asynchronous reconciliation logic that felt like overkill for this tightly-coupled operation. After weighing the trade-offs, we made a controversial choice: implement a lightweight, synchronous Two-Phase Commit (2PC) protocol. The primary pitfall of 2PC—its blocking nature—was deemed an acceptable risk given the low-latency internal network and the short-lived nature of the transaction. The coordinator, however, would be a potential single point of failure, a risk we accepted for this specific critical path, with plans for future hardening.
Our stack was polyglot: Express.js for the user-facing API and WebSocket notifications, and Tonic for the high-performance gRPC service in Rust. The glue binding them together for the transaction coordination would be Redis. Its atomic operations and pub/sub capabilities provided the perfect primitives to build our transaction manager without pulling in heavier dependencies like ZooKeeper or etcd. This is the log of that implementation.
Defining the 2PC State Machine in Redis
Before writing a single line of service code, the protocol had to be solidified. We defined the entire lifecycle of a distributed transaction using a set of Redis keys and state transitions. A transaction ID (txnId
) would be the primary identifier.
The core states for any transaction txn:{txnId}
would be:
-
INIT
: The transaction has been created but the prepare phase has not started. -
PREPARING
: The coordinator has issued thePREPARE
command to all participants. -
PREPARED
: All participants have successfully prepared and voted to commit. -
ABORTING
: At least one participant voted to abort, or a timeout occurred. The coordinator is issuing theROLLBACK
command. -
COMMITTING
: The coordinator is issuing theCOMMIT
command. -
COMMITTED
: All participants have confirmed the commit. -
ABORTED
: All participants have confirmed the rollback.
We’d manage this using several Redis data structures:
-
txn:{txnId}:state
(String): Stores the current state (e.g.,PREPARING
). -
txn:{txnId}:participants
(Set): Contains the names of the participating services (e.g.,account-service
,ledger-service
). -
txn:{txnId}:votes
(Set): Stores the votes from participants (VOTE_COMMIT
orVOTE_ABORT
). -
txn:{txnId}:data
(Hash): Holds the transaction payload.
Here is the state transition diagram we worked from:
stateDiagram-v2 [*] --> INIT INIT --> PREPARING: start_transaction PREPARING --> PREPARED: all_vote_commit PREPARING --> ABORTING: any_vote_abort / timeout PREPARED --> COMMITTING: coordinator_decision COMMITTING --> COMMITTED: all_confirm_commit ABORTING --> ABORTED: all_confirm_abort COMMITTED --> [*] ABORTED --> [*]
The Coordinator and API Service: Express.js
The account-service
written in Node.js served as both the API entry point and the 2PC coordinator. It exposes an HTTP endpoint to trigger the transaction and a WebSocket endpoint to notify the client of the final outcome.
Project Setup (account-service/
)
# Directory structure
# /account-service
# |- src/
# | |- coordinator.js
# | |- participant.js
# | |- server.js
# | |- websocket.js
# |- test/
# |- package.json
We’ll need a few dependencies: express
, redis
, ws
, and uuid
.
// package.json
{
"name": "account-service",
"version": "1.0.0",
"main": "src/server.js",
"type": "module",
"scripts": {
"start": "node src/server.js"
},
"dependencies": {
"express": "^4.18.2",
"redis": "^4.6.10",
"uuid": "^9.0.1",
"ws": "^8.14.2"
}
}
WebSocket Server (websocket.js
)
This is a straightforward ws
server setup to manage client connections and broadcast transaction status updates.
// src/websocket.js
import { WebSocketServer } from 'ws';
import { v4 as uuidv4 } from 'uuid';
import { createLogger } from './logger.js'; // A simple logger utility
const logger = createLogger('WebSocket');
const clients = new Map();
export function setupWebSocketServer(server) {
const wss = new WebSocketServer({ server });
wss.on('connection', (ws) => {
const clientId = uuidv4();
clients.set(clientId, ws);
logger.info(`Client connected: ${clientId}`);
ws.on('message', (message) => {
logger.info(`Received message from ${clientId}: ${message}`);
});
ws.on('close', () => {
clients.delete(clientId);
logger.info(`Client disconnected: ${clientId}`);
});
ws.on('error', (err) => {
logger.error(`Client error for ${clientId}:`, err);
clients.delete(clientId);
});
});
logger.info('WebSocket server initialized');
}
export function broadcast(message) {
if (clients.size === 0) {
logger.warn('No WebSocket clients connected to broadcast to.');
return;
}
logger.info(`Broadcasting message to ${clients.size} clients: ${JSON.stringify(message)}`);
for (const client of clients.values()) {
if (client.readyState === 1) { // WebSocket.OPEN
client.send(JSON.stringify(message));
}
}
}
The 2PC Coordinator Logic (coordinator.js
)
This is the heart of the system. The TransactionCoordinator
class manages the entire 2PC flow. It uses Redis multi/exec for atomic state updates and pub/sub for communication. A critical detail is the use of timeouts at every stage to prevent the system from getting stuck indefinitely.
// src/coordinator.js
import { createClient } from 'redis';
import { v4 as uuidv4 } from 'uuid';
import { broadcast } from './websocket.js';
import { createLogger } from './logger.js';
const logger = createLogger('Coordinator');
const PREPARE_TIMEOUT = 5000; // 5 seconds
const COMMIT_TIMEOUT = 5000; // 5 seconds
const PARTICIPANTS = ['account-service', 'ledger-service'];
export class TransactionCoordinator {
constructor(redisClient) {
this.redis = redisClient;
this.subscriber = redisClient.duplicate();
}
async init() {
await this.subscriber.connect();
}
async executeTransaction(data) {
const txnId = uuidv4();
logger.info(`[${txnId}] Starting transaction with data:`, data);
const wsMessage = (status, details) => ({ txnId, status, details });
try {
await this.initializeTransactionState(txnId, data);
broadcast(wsMessage('INITIATED', 'Transaction coordinator started.'));
const vote = await this.requestPrepare(txnId);
if (vote === 'COMMIT') {
await this.requestGlobalCommit(txnId);
logger.info(`[${txnId}] Transaction COMMITTED successfully.`);
broadcast(wsMessage('COMMITTED', 'All services confirmed commit.'));
return { status: 'COMMITTED', txnId };
} else {
await this.requestGlobalAbort(txnId, 'Received VOTE_ABORT or timeout');
logger.warn(`[${txnId}] Transaction ABORTED.`);
broadcast(wsMessage('ABORTED', 'Transaction was rolled back.'));
return { status: 'ABORTED', txnId };
}
} catch (error) {
logger.error(`[${txnId}] Unhandled error during transaction, attempting global abort:`, error);
// This is a last-resort cleanup.
await this.requestGlobalAbort(txnId, `Coordinator error: ${error.message}`).catch(err => {
logger.error(`[${txnId}] Failed to execute global abort during error handling:`, err);
});
broadcast(wsMessage('ABORTED', `Critical error: ${error.message}`));
return { status: 'ABORTED', txnId, error: error.message };
} finally {
// In a real system, you might not clean up immediately for post-mortem analysis.
// await this.cleanupTransaction(txnId);
}
}
async initializeTransactionState(txnId, data) {
const key = `txn:${txnId}`;
const multi = this.redis.multi();
multi.hSet(`${key}:data`, data);
multi.sAdd(`${key}:participants`, PARTICIPANTS);
multi.set(`${key}:state`, 'INIT');
multi.expire(`${key}:data`, 60); // TTL for cleanup
multi.expire(`${key}:participants`, 60);
multi.expire(`${key}:state`, 60);
await multi.exec();
}
async requestPrepare(txnId) {
return new Promise(async (resolve) => {
const timeout = setTimeout(() => {
logger.warn(`[${txnId}] Prepare phase timed out.`);
resolve('ABORT');
}, PREPARE_TIMEOUT);
const voteListener = (message, channel) => {
if (channel !== `txn:${txnId}:vote_cast`) return;
this.redis.sMembers(`txn:${txnId}:votes`).then(votes => {
if (votes.includes('VOTE_ABORT')) {
clearTimeout(timeout);
this.subscriber.unsubscribe(`txn:${txnId}:vote_cast`, voteListener);
resolve('ABORT');
} else if (votes.length === PARTICIPANTS.length) {
clearTimeout(timeout);
this.subscriber.unsubscribe(`txn:${txnId}:vote_cast`, voteListener);
resolve('COMMIT');
}
});
};
await this.subscriber.subscribe(`txn:${txnId}:vote_cast`, voteListener);
// Atomically update state and publish prepare command
const multi = this.redis.multi();
multi.set(`txn:${txnId}:state`, 'PREPARING');
multi.publish('txn:prepare', JSON.stringify({ txnId }));
await multi.exec();
logger.info(`[${txnId}] Published PREPARE command.`);
broadcast({ txnId, status: 'PREPARING', details: 'Waiting for participant votes...' });
});
}
async requestGlobalCommit(txnId) {
logger.info(`[${txnId}] All participants voted to commit. Proceeding with global commit.`);
await this.redis.set(`txn:${txnId}:state`, 'COMMITTING');
await this.redis.publish('txn:decision', JSON.stringify({ txnId, decision: 'COMMIT' }));
broadcast({ txnId, status: 'COMMITTING', details: 'Broadcasting commit decision...' });
// In a production system, you would wait for acknowledgements here with a timeout.
}
async requestGlobalAbort(txnId, reason) {
logger.warn(`[${txnId}] Initiating global abort. Reason: ${reason}`);
await this.redis.set(`txn:${txnId}:state`, 'ABORTING');
await this.redis.publish('txn:decision', JSON.stringify({ txnId, decision: 'ABORT' }));
broadcast({ txnId, status: 'ABORTING', details: `Rolling back. Reason: ${reason}` });
}
}
The Participant Logic in Node.js (participant.js
)
The account-service
itself is also a participant. It needs to listen for the PREPARE
command and perform its local work. A key aspect of any 2PC participant is writing to a Write-Ahead Log (WAL) before voting COMMIT
. This ensures that if the service crashes after voting but before receiving the final COMMIT
command, it can recover its state upon restart. Here, we’ll simulate the WAL by writing to another Redis key.
// src/participant.js
import { createLogger } from './logger.js';
const logger = createLogger('Participant-Node');
const SERVICE_NAME = 'account-service';
export class TransactionParticipant {
constructor(redisClient) {
this.redis = redisClient;
this.subscriber = redisClient.duplicate();
}
async init() {
await this.subscriber.connect();
await this.subscriber.subscribe('txn:prepare', (message) => this.handlePrepare(message));
await this.subscriber.subscribe('txn:decision', (message) => this.handleDecision(message));
logger.info('Participant subscribed to transaction channels.');
}
async handlePrepare(message) {
const { txnId } = JSON.parse(message);
logger.info(`[${txnId}] Received PREPARE command.`);
try {
// 1. Perform local work (e.g., check balance, reserve funds).
// This is a critical step. A common mistake is to perform the final action here.
// You should only check for possibility and reserve resources.
const canCommit = await this.performLocalPreparation(txnId);
if (canCommit) {
// 2. Write to a Write-Ahead Log (WAL).
await this.writeToWAL(txnId, 'prepared');
// 3. Vote commit.
await this.castVote(txnId, 'VOTE_COMMIT');
} else {
throw new Error('Insufficient funds or invalid operation.');
}
} catch (error) {
logger.error(`[${txnId}] Failed to prepare. Voting ABORT.`, error);
await this.castVote(txnId, 'VOTE_ABORT');
}
}
async handleDecision(message) {
const { txnId, decision } = JSON.parse(message);
logger.info(`[${txnId}] Received decision: ${decision}`);
// A real implementation needs to check if this service participated in this txn.
const walState = await this.redis.get(`wal:${SERVICE_NAME}:${txnId}`);
if (!walState || walState !== 'prepared') {
// This service didn't vote commit, or already cleaned up. Ignore.
return;
}
if (decision === 'COMMIT') {
// Apply the actual changes from the WAL
logger.info(`[${txnId}] Applying changes from WAL and committing.`);
// e.g., `await applyDebitFromTxnData(txnId)`
await this.redis.set(`wal:${SERVICE_NAME}:${txnId}`, 'committed');
} else {
// Rollback changes
logger.warn(`[${txnId}] Rolling back changes based on ABORT decision.`);
// e.g., `await releaseReservedFunds(txnId)`
await this.redis.set(`wal:${SERVICE_NAME}:${txnId}`, 'aborted');
}
}
async performLocalPreparation(txnId) {
// Simulate checking balance and reserving it.
const { amount } = await this.redis.hGetAll(`txn:${txnId}:data`);
logger.info(`[${txnId}] Checking if amount ${amount} can be debited.`);
// In a real system, this would be an atomic check-and-hold operation.
return parseFloat(amount) > 0 && parseFloat(amount) < 1000; // Business rule
}
async writeToWAL(txnId, state) {
// WAL ensures we can recover if we crash after voting.
logger.info(`[${txnId}] Writing to WAL with state: ${state}`);
await this.redis.set(`wal:${SERVICE_NAME}:${txnId}`, state, { EX: 120 });
}
async castVote(txnId, vote) {
const multi = this.redis.multi();
multi.sAdd(`txn:${txnId}:votes`, vote);
multi.publish(`txn:${txnId}:vote_cast`, vote);
await multi.exec();
logger.info(`[${txnId}] Cast vote: ${vote}`);
}
}
The Rust Participant: Tonic gRPC Service
Now for the ledger-service
. This service is built in Rust for performance and safety. It will act as the second participant in our distributed transaction, listening to the same Redis channels.
Project Setup (ledger-service/
)
# Directory structure
# /ledger-service
# |- src/
# | |- main.rs
# | |- participant.rs
# | |- service.rs
# |- build.rs
# |- proto/
# | |- ledger.proto
# |- Cargo.toml
Cargo.toml
dependencies:
[package]
name = "ledger-service"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
tonic = "0.10"
prost = "0.12"
redis = { version = "0.23", features = ["tokio-comp"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
[build-dependencies]
tonic-build = "0.10"
The participant.rs
module in the Rust service mirrors the logic of its Node.js counterpart, demonstrating the polyglot nature of the protocol.
The Rust Participant Logic (participant.rs
)
// src/participant.rs
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info};
const SERVICE_NAME: &str = "ledger-service";
#[derive(Serialize, Deserialize)]
struct TxnMessage {
#[serde(rename = "txnId")]
txn_id: String,
}
#[derive(Serialize, Deserialize)]
struct DecisionMessage {
#[serde(rename = "txnId")]
txn_id: String,
decision: String,
}
// Cloneable handle to the Redis connection
#[derive(Clone)]
pub struct TransactionParticipant {
redis_client: Arc<Mutex<redis::aio::MultiplexedConnection>>,
}
impl TransactionParticipant {
pub fn new(conn: redis::aio::MultiplexedConnection) -> Self {
Self {
redis_client: Arc::new(Mutex::new(conn)),
}
}
pub async fn listen_for_events(&self) {
info!("Participant starting to listen for transaction events...");
let mut pubsub_conn = self
.redis_client
.lock()
.await
.clone()
.into_pubsub();
pubsub_conn.subscribe("txn:prepare").await.unwrap();
pubsub_conn.subscribe("txn:decision").await.unwrap();
let participant_clone = self.clone();
tokio::spawn(async move {
use futures_util::stream::StreamExt;
while let Some(msg) = pubsub_conn.on_message().next().await {
let channel_name = msg.get_channel_name();
let payload: String = msg.get_payload().unwrap_or_default();
let p_clone = participant_clone.clone();
tokio::spawn(async move {
match channel_name {
"txn:prepare" => p_clone.handle_prepare(&payload).await,
"txn:decision" => p_clone.handle_decision(&payload).await,
_ => (),
}
});
}
});
}
async fn handle_prepare(&self, message: &str) {
let Ok(msg) = serde_json::from_str::<TxnMessage>(message) else { return };
let txn_id = &msg.txn_id;
info!("[{}] Received PREPARE command", txn_id);
let can_commit = self.perform_local_preparation(txn_id).await;
if can_commit {
self.write_to_wal(txn_id, "prepared").await;
self.cast_vote(txn_id, "VOTE_COMMIT").await;
} else {
error!("[{}] Failed local preparation, voting ABORT.", txn_id);
self.cast_vote(txn_id, "VOTE_ABORT").await;
}
}
async fn handle_decision(&self, message: &str) {
let Ok(msg) = serde_json::from_str::<DecisionMessage>(message) else { return };
let txn_id = &msg.txn_id;
info!("[{}] Received decision: {}", txn_id, msg.decision);
let wal_key = format!("wal:{}:{}", SERVICE_NAME, txn_id);
let mut conn = self.redis_client.lock().await;
let wal_state: Option<String> = conn.get(&wal_key).await.unwrap_or(None);
if wal_state.as_deref() != Some("prepared") {
return; // Not in a state to act on a decision
}
if msg.decision == "COMMIT" {
info!("[{}] Applying changes from WAL and committing.", txn_id);
// In a real app, append to ledger file or database table.
let _: () = conn.set(wal_key, "committed").await.unwrap();
} else {
info!("[{}] Rolling back based on ABORT decision.", txn_id);
let _: () = conn.set(wal_key, "aborted").await.unwrap();
}
}
async fn perform_local_preparation(&self, txn_id: &str) -> bool {
// Simulate checking ledger constraints.
info!("[{}] Verifying ledger constraints.", txn_id);
// This could involve I/O, so it's async.
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
true // Always succeed for this demo.
}
async fn write_to_wal(&self, txn_id: &str, state: &str) {
info!("[{}] Writing to WAL with state: {}", txn_id, state);
let key = format!("wal:{}:{}", SERVICE_NAME, txn_id);
let mut conn = self.redis_client.lock().await;
let _: () = conn.set_ex(key, state, 120).await.unwrap();
}
async fn cast_vote(&self, txn_id: &str, vote: &str) {
info!("[{}] Casting vote: {}", txn_id, vote);
let votes_key = format!("txn:{}:votes", txn_id);
let channel = format!("txn:{}:vote_cast", txn_id);
let mut conn = self.redis_client.lock().await;
let mut pipe = redis::pipe();
pipe.atomic()
.sadd(&votes_key, vote)
.publish(&channel, vote);
let _: redis::Value = pipe.query_async(&mut *conn).await.unwrap();
}
}
The Tonic Service Boilerplate (main.rs
)
// src/main.rs
mod participant;
use participant::TransactionParticipant;
use tonic::{transport::Server, Request, Response, Status};
// Assuming a simple proto definition for a health check
pub mod ledger {
tonic::include_proto!("ledger");
}
use ledger::ledger_server::{Ledger, LedgerServer};
use ledger::{HealthCheckRequest, HealthCheckResponse};
#[derive(Default)]
pub struct MyLedger {}
#[tonic::async_trait]
impl Ledger for MyLedger {
async fn health_check(
&self,
_request: Request<HealthCheckRequest>,
) -> Result<Response<HealthCheckResponse>, Status> {
Ok(Response::new(HealthCheckResponse { status: "OK".into() }))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let redis_client = redis::Client::open("redis://redis:6379/")?;
let conn = redis_client.get_multiplexed_async_connection().await?;
let participant = TransactionParticipant::new(conn);
// Spawn the listener in the background.
participant.listen_for_events().await;
let addr = "[::0]:50051".parse()?;
let ledger_service = MyLedger::default();
tracing::info!("LedgerService listening on {}", addr);
Server::builder()
.add_service(LedgerServer::new(ledger_service))
.serve(addr)
.await?;
Ok(())
}
Running the Polyglot System
To tie this all together, we use docker-compose
.
# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
command: redis-server --loglevel verbose
account-service:
build: ./account-service
ports:
- "3000:3000"
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
ledger-service:
build: ./ledger-service
ports:
- "50051:50051"
depends_on:
- redis
With this setup, starting the services via docker-compose up --build
brings the entire distributed system online. A client can connect to the WebSocket server at ws://localhost:3000
and then trigger a transaction by sending a POST request to http://localhost:3000/transfer
. The client will receive real-time updates on the WebSocket connection, from INITIATED
through PREPARING
to the final COMMITTED
or ABORTED
state. This provides the synchronous feedback the business logic required while leveraging a robust, albeit complex, distributed consensus protocol.
The implementation is not without its sharp edges. The coordinator in the account-service
is a single point of failure. If it crashes after participants have voted but before it can broadcast a decision, the transaction is stuck until manual intervention or a recovery process kicks in. The timeouts mitigate indefinite blocking, but they don’t solve the core problem of a lost coordinator decision. Furthermore, the recovery logic (re-reading the WAL on service startup) has been omitted for brevity but is non-negotiable for a production system. This architecture was a calculated trade-off, prioritizing strong consistency for a critical, synchronous operation over the operational complexity of eventual consistency patterns. Future iterations would likely involve making the coordinator itself highly available, perhaps using a Raft-based cluster, or replacing this manual 2PC implementation with a dedicated transaction manager.