The data-consistency problem across microservices isn’t theoretical. We had a cascading failure during a peak sales event last quarter. A user’s order was confirmed, the payment service successfully processed the charge, but the inventory service failed to decrement stock due to a network blip. The result was a confirmed, paid order for an out-of-stock item. The customer support fallout and manual data reconciliation cost us dearly. Two-phase commit is a non-starter in our distributed environment due to its synchronous, blocking nature. This incident forced a mandate: build a reliable, lightweight, and maintainable mechanism for managing distributed transactions.
The initial concept was to implement the Saga pattern via orchestration. A central orchestrator would be responsible for issuing commands to participant services and reacting to their outcomes. If any step fails, the orchestrator would issue compensating commands to undo the work of previous successful steps. The key was selecting the right toolset. We needed something more robust than basic Redis Pub/Sub, which offers no persistence, but lighter than Kafka or RabbitMQ, which felt like overkill for this specific internal component.
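The rollback rule itself is easy to state as a pure function. A minimal sketch, assuming the saga-definition shape used later in this post (steps with optional compensation entries); compensationPlan is a hypothetical helper, not part of the orchestrator code:

```javascript
// Minimal sketch of the rollback ordering rule: given a saga definition and
// the index of the failed step, return the compensating commands for the
// previously completed steps, most recent first, skipping steps that define
// no compensation.
function compensationPlan(definition, failedStepIndex) {
  const plan = [];
  for (let i = failedStepIndex - 1; i >= 0; i--) {
    const step = definition.steps[i];
    if (step.compensation) {
      plan.push({ step: i, ...step.compensation }); // e.g. { step, stream, command }
    }
  }
  return plan;
}
```

For an order saga whose third step fails, this yields the second step's compensation, then the first's.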
Our technology selection process was rigorous:
- Message Bus: Redis Streams. This was the critical choice. Streams provide the persistence and consumer groups that are essential for a reliable Saga. A command sent to a service isn't lost if the service is down. Consumer groups allow us to have multiple orchestrator instances for high availability, while ensuring each message is processed by only one instance. The ability to inspect pending messages (XPENDING) and re-claim them is perfect for handling failed service nodes.
- Language: JavaScript (Node.js). The orchestrator is primarily an I/O-bound component, waiting for messages and dispatching new ones. Node.js's event-driven, non-blocking model is a perfect fit. The ecosystem is mature, and our team has deep expertise.
- Build Tool: Rollup. This was a contentious choice. Why use a front-end bundler for a Node.js service? The goal was a minimal, production-ready container image. By bundling our entire orchestrator logic into a single, tree-shaken JavaScript file, we strip node_modules down to a handful of production dependencies in the final container image. This drastically reduces the image size, attack surface, and potential for dependency issues in production. It creates a nearly self-contained artifact.
- Containerization: Podman. For development and deployment, we opted for Podman. Its daemonless, rootless architecture simplifies local development workflows and enhances security, aligning with our principle of least privilege for production workloads.
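The XPENDING-based recovery mentioned above is ultimately a policy decision over pending entries. A sketch of one such policy, with hypothetical thresholds; the entry shape mirrors what XPENDING reports per message (id, idle time, delivery count):

```javascript
// Hypothetical triage policy for pending stream entries: entries idle past a
// threshold are reclaimed (via XCLAIM/XAUTOCLAIM) for reprocessing; entries
// that have already been redelivered too many times are routed to a
// dead-letter stream instead of looping forever.
function triagePending(pendingEntries, { maxIdleMs = 60000, maxDeliveries = 5 } = {}) {
  const reclaim = [];
  const deadLetter = [];
  for (const entry of pendingEntries) {
    if (entry.idleMs < maxIdleMs) continue; // the owning consumer may still be working
    (entry.deliveryCount >= maxDeliveries ? deadLetter : reclaim).push(entry.id);
  }
  return { reclaim, deadLetter };
}
```

The thresholds are tuning knobs: too aggressive and a slow-but-healthy consumer loses its messages; too lax and recovery stalls.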
The architecture we settled on is straightforward but powerful. The client initiates a saga by sending a definition and initial payload to a saga_start stream. The orchestrator, a dedicated containerized service, is the sole consumer of this stream. It then manages the entire lifecycle of the saga, communicating with participant services over dedicated command streams and listening for replies on a shared saga_reply stream.
sequenceDiagram
    participant Client
    participant Orchestrator
    participant Redis Streams
    participant ServiceA
    participant ServiceB
    Client->>+Redis Streams: XADD saga_start ... (Saga Definition)
    Redis Streams-->>-Orchestrator: XREADGROUP ...
    Orchestrator->>+Redis Streams: XADD service_a_commands ... (Execute Step 1)
    Redis Streams-->>-ServiceA: XREADGROUP ...
    ServiceA-->>ServiceA: Process Logic
    ServiceA->>+Redis Streams: XADD saga_reply ... (Step 1 Success)
    Redis Streams-->>-Orchestrator: XREADGROUP ...
    Orchestrator->>+Redis Streams: XADD service_b_commands ... (Execute Step 2)
    Redis Streams-->>-ServiceB: XREADGROUP ...
    ServiceB-->>ServiceB: Process Logic (Failure)
    ServiceB->>+Redis Streams: XADD saga_reply ... (Step 2 Failure)
    Redis Streams-->>-Orchestrator: XREADGROUP ...
    Orchestrator-->>Orchestrator: Initiate Compensation
    Orchestrator->>+Redis Streams: XADD service_a_commands ... (Compensate Step 1)
    Redis Streams-->>-ServiceA: XREADGROUP ...
    ServiceA-->>ServiceA: Process Compensation
    ServiceA->>+Redis Streams: XADD saga_reply ... (Compensation Success)
    Redis Streams-->>-Orchestrator: XREADGROUP ...
    Orchestrator-->>Orchestrator: Saga Finished (Failed)
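Every hop in the diagram is an XADD of a flat field/value map. Redis Stream entries hold only strings, so step numbers and payloads are serialized on the way in. These builders are hypothetical helpers illustrating the envelope shape used throughout:

```javascript
// Stream entries are flat string maps, so commands and replies must be
// serialized before XADD. These (hypothetical) builders show the envelopes
// the orchestrator and participants exchange.
function buildCommand({ sagaId, step, command, payload }) {
  return { sagaId, step: String(step), command, payload: JSON.stringify(payload) };
}

function buildReply({ sagaId, step, status, result }) {
  return { sagaId, step: String(step), status, result: JSON.stringify(result) };
}
```

A consumer parses payload or result back with JSON.parse before acting on it.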
Core Orchestrator Implementation
The heart of the system is the SagaOrchestrator. It's a state machine driven by messages from Redis Streams. First, let's define the connection and utility module for Redis. We'll use the popular redis npm package.
src/redis-client.js:
import { createClient } from 'redis';
// A singleton pattern for the Redis client ensures we don't create multiple connections.
let client;
export async function getRedisClient() {
if (client) {
return client;
}
client = createClient({
url: process.env.REDIS_URL || 'redis://localhost:6379'
});
client.on('error', (err) => console.error('Redis Client Error', err));
await client.connect();
return client;
}
/**
* Normalize an XREADGROUP response into a flat list of messages.
* node-redis v4 already returns [{ name, messages: [{ id, message: { field: value, ... } }] }],
* so we simply flatten it into [{ stream: streamName, id: messageId, message: { ... } }].
*/
export function parseXReadResponse(response) {
if (!response) return [];
const messages = [];
for (const stream of response) {
for (const { id, message } of stream.messages) {
messages.push({ stream: stream.name, id, message });
}
}
return messages;
}
/**
* A robust XADD wrapper that includes retry logic.
* In a production system, transient network errors are a reality.
*/
export async function resilientXAdd(stream, message, retries = 3, delay = 100) {
const redis = await getRedisClient();
for (let i = 0; i < retries; i++) {
try {
return await redis.xAdd(stream, '*', message);
} catch (err) {
console.error(`XADD to ${stream} failed. Attempt ${i + 1}/${retries}. Error: ${err.message}`);
if (i < retries - 1) {
await new Promise(resolve => setTimeout(resolve, delay * (i + 1)));
} else {
// After all retries, re-throw the error to be handled by the caller.
throw new Error(`Failed to add message to stream ${stream} after ${retries} attempts.`);
}
}
}
}
Now for the orchestrator itself. It needs to define the Saga structure, manage state in memory, and process messages from the reply stream.
src/orchestrator.js:
import { getRedisClient, parseXReadResponse, resilientXAdd } from './redis-client.js';
import { randomUUID } from 'crypto';
const SAGA_REPLY_STREAM = 'saga_reply';
const CONSUMER_GROUP = 'orchestrator_group';
const CONSUMER_NAME = `orchestrator_instance_${randomUUID()}`;
class SagaOrchestrator {
constructor() {
this.activeSagas = new Map(); // In-memory state of running sagas.
this.redis = null;
}
async initialize() {
this.redis = await getRedisClient();
console.log(`Orchestrator instance ${CONSUMER_NAME} starting...`);
// Ensure the consumer group exists. 'MKSTREAM' creates the stream if it doesn't exist.
try {
await this.redis.xGroupCreate(SAGA_REPLY_STREAM, CONSUMER_GROUP, '0', {
MKSTREAM: true,
});
console.log(`Consumer group '${CONSUMER_GROUP}' created or already exists for stream '${SAGA_REPLY_STREAM}'.`);
} catch (err) {
if (!err.message.includes('BUSYGROUP')) {
throw err;
}
// Group already exists, which is fine.
}
}
// The core processing loop.
async run() {
await this.initialize();
console.log('Orchestrator is waiting for replies...');
while (true) {
try {
// BLOCK 0 means wait indefinitely for new messages.
// '>' means only read messages that have not been delivered to any consumer in the group.
const response = await this.redis.xReadGroup(
CONSUMER_GROUP,
CONSUMER_NAME,
{ key: SAGA_REPLY_STREAM, id: '>' },
{ BLOCK: 0 }
);
const messages = parseXReadResponse(response);
for (const { id, message } of messages) {
await this.processReply(message);
// Acknowledge the message so it's not re-delivered.
await this.redis.xAck(SAGA_REPLY_STREAM, CONSUMER_GROUP, id);
}
} catch (err) {
console.error('Error in main processing loop:', err);
// In a real project, we might need a more robust back-off strategy.
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
startSaga(definition, initialPayload) {
const sagaId = randomUUID();
const sagaState = {
id: sagaId,
definition,
payload: initialPayload,
currentStep: 0,
isCompensating: false,
history: [],
};
this.activeSagas.set(sagaId, sagaState);
console.log(`Starting new saga: ${sagaId}`);
this.executeNextStep(sagaState);
}
async processReply({ sagaId, step, status, result }) {
const sagaState = this.activeSagas.get(sagaId);
if (!sagaState) {
console.warn(`Received reply for unknown or completed saga: ${sagaId}`);
return;
}
console.log(`Received reply for saga ${sagaId}, step ${step}: ${status}`);
sagaState.history.push({ step, status, result, timestamp: new Date().toISOString() });
const stepIndex = parseInt(step, 10);
if (status === 'SUCCESS') {
if (sagaState.isCompensating) {
// A compensation step succeeded, continue rolling back.
sagaState.currentStep = stepIndex - 1;
this.executeCompensation(sagaState);
} else {
// A forward step succeeded, proceed to the next one.
sagaState.currentStep = stepIndex + 1;
this.executeNextStep(sagaState);
}
} else { // status === 'FAILURE'
if (sagaState.isCompensating) {
// A critical error: a compensating action failed.
// This requires manual intervention.
console.error(`CRITICAL: Compensation failed for saga ${sagaId} at step ${step}. Manual intervention required.`);
this.activeSagas.delete(sagaId); // End the saga.
} else {
// A forward step failed, begin compensation.
sagaState.isCompensating = true;
sagaState.currentStep = stepIndex - 1; // Compensate the steps completed before the failure (the failed step is assumed to have made no durable change).
this.executeCompensation(sagaState);
}
}
}
async executeNextStep(sagaState) {
if (sagaState.currentStep >= sagaState.definition.steps.length) {
console.log(`Saga ${sagaState.id} completed successfully.`);
this.activeSagas.delete(sagaState.id);
return;
}
const stepDefinition = sagaState.definition.steps[sagaState.currentStep];
console.log(`Executing step ${sagaState.currentStep} for saga ${sagaState.id}: ${stepDefinition.action.command}`);
await resilientXAdd(stepDefinition.action.stream, {
sagaId: sagaState.id,
step: String(sagaState.currentStep),
command: stepDefinition.action.command,
payload: JSON.stringify(sagaState.payload), // Pass current payload
});
}
async executeCompensation(sagaState) {
if (sagaState.currentStep < 0) {
console.log(`Saga ${sagaState.id} compensation finished. Final status: FAILED.`);
this.activeSagas.delete(sagaState.id);
return;
}
const stepDefinition = sagaState.definition.steps[sagaState.currentStep];
const compensation = stepDefinition.compensation;
if (!compensation) {
// If a step has no compensation, we skip it and try to compensate the previous one.
console.warn(`No compensation defined for step ${sagaState.currentStep} in saga ${sagaState.id}. Skipping.`);
sagaState.currentStep--;
this.executeCompensation(sagaState);
return;
}
console.log(`Compensating step ${sagaState.currentStep} for saga ${sagaState.id}: ${compensation.command}`);
await resilientXAdd(compensation.stream, {
sagaId: sagaState.id,
step: String(sagaState.currentStep),
command: compensation.command,
// Pass the original payload to the compensation handler.
payload: JSON.stringify(sagaState.payload),
});
}
}
export default SagaOrchestrator;
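The reply handling in processReply is a small state machine, and its transition logic can be isolated as a pure function for unit testing without Redis. A sketch (nextTransition is a hypothetical helper; on a forward-step failure it follows the common policy of compensating the steps completed before the failure):

```javascript
// Pure model of the orchestrator's reply transitions, handy for unit tests
// that never touch Redis. Given the saga's direction and a reply, it returns
// the next action the orchestrator should take.
function nextTransition({ isCompensating }, { step, status }) {
  const stepIndex = parseInt(step, 10);
  if (status === 'SUCCESS') {
    return isCompensating
      ? { type: 'COMPENSATE', step: stepIndex - 1 } // keep rolling back
      : { type: 'EXECUTE', step: stepIndex + 1 };   // proceed forward
  }
  return isCompensating
    ? { type: 'MANUAL_INTERVENTION', step: stepIndex } // a compensation itself failed
    : { type: 'COMPENSATE', step: stepIndex - 1 };     // undo completed work
}
```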
A mock entrypoint is needed to simulate starting a saga. In a real system, this would be triggered by an API call or another event.
src/index.js:
import SagaOrchestrator from './orchestrator.js';
// Example Saga Definition for an E-commerce order
const orderSagaDefinition = {
name: 'CreateOrderSaga',
steps: [
{
name: 'ReserveInventory',
action: { stream: 'inventory_commands', command: 'RESERVE' },
compensation: { stream: 'inventory_commands', command: 'RELEASE' },
},
{
name: 'ProcessPayment',
action: { stream: 'payment_commands', command: 'CHARGE' },
compensation: { stream: 'payment_commands', command: 'REFUND' },
},
{
name: 'CreateShipment',
action: { stream: 'shipping_commands', command: 'SCHEDULE' },
// Some actions might be non-compensatable once done.
compensation: { stream: 'shipping_commands', command: 'CANCEL' },
},
],
};
async function main() {
const orchestrator = new SagaOrchestrator();
// This part is just for demonstration. In reality, the run() method would be the main process,
// and startSaga would be called from another source (e.g., an API request handler).
// Here we start a saga after a delay to ensure the orchestrator is running.
setTimeout(() => {
console.log('--- Triggering a new Saga ---');
const initialPayload = {
userId: 'user-123',
itemId: 'item-abc',
quantity: 2,
amount: 99.98,
};
orchestrator.startSaga(orderSagaDefinition, initialPayload);
}, 5000);
// This will run forever, processing replies.
await orchestrator.run();
}
main().catch(console.error);
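A malformed definition only fails deep inside the orchestrator at runtime, so it's worth validating up front. A sketch of a validator (hypothetical helper) for the definition shape shown above:

```javascript
// Hypothetical validator for saga definitions: every step needs an action
// with a stream and a command; compensation entries are optional but, when
// present, must be complete. Returns a list of problems (empty = valid).
function validateSagaDefinition(definition) {
  const errors = [];
  if (!definition.name) errors.push('definition requires a name');
  if (!Array.isArray(definition.steps) || definition.steps.length === 0) {
    errors.push('definition requires a non-empty steps array');
    return errors;
  }
  definition.steps.forEach((step, i) => {
    if (!step.action || !step.action.stream || !step.action.command) {
      errors.push(`step ${i} needs action.stream and action.command`);
    }
    if (step.compensation && (!step.compensation.stream || !step.compensation.command)) {
      errors.push(`step ${i} compensation needs stream and command`);
    }
  });
  return errors;
}
```

startSaga could reject any definition whose error list is non-empty before dispatching a single command.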
Participant Service Simulation
To test the orchestrator, we need services that act on its commands. Here is a simple, generic participant service that can be configured to succeed or fail to test the compensation flow.
src/participant.js:
import { getRedisClient, parseXReadResponse, resilientXAdd } from './redis-client.js';
import { randomUUID } from 'crypto';
async function createParticipant(streamName, shouldFailCommand) {
const redis = await getRedisClient();
const consumerGroup = `${streamName}_group`;
const consumerName = `${streamName}_consumer_${randomUUID()}`;
try {
await redis.xGroupCreate(streamName, consumerGroup, '0', {
MKSTREAM: true,
});
console.log(`Consumer group '${consumerGroup}' created for stream '${streamName}'.`);
} catch (err) {
// Ignore "group already exists" errors
if (!err.message.includes('BUSYGROUP')) throw err;
}
console.log(`Participant for stream '${streamName}' starting...`);
while (true) {
try {
const response = await redis.xReadGroup(
consumerGroup,
consumerName,
{ key: streamName, id: '>' },
{ BLOCK: 0 }
);
const messages = parseXReadResponse(response);
for (const { id, message } of messages) {
const { sagaId, step, command, payload } = message;
console.log(`[${streamName}] Received command '${command}' for saga ${sagaId}`);
let status = 'SUCCESS';
let result = 'OK';
// Simulate a failure for testing compensation logic
if (command === shouldFailCommand) {
status = 'FAILURE';
result = `Simulated failure for command: ${command}`;
console.error(`[${streamName}] SIMULATING FAILURE for saga ${sagaId}`);
} else {
// Simulate work
await new Promise(resolve => setTimeout(resolve, 500 + Math.random() * 500));
}
await resilientXAdd('saga_reply', {
sagaId,
step,
status,
result: JSON.stringify(result),
});
await redis.xAck(streamName, consumerGroup, id);
}
} catch (err) {
console.error(`[${streamName}] Error in participant loop:`, err);
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
// Read command-line arguments to configure the participant
const stream = process.argv[2];
const failCommand = process.argv[3] || null;
if (!stream) {
console.error('Usage: node participant.js <stream_name> [fail_command]');
process.exit(1);
}
createParticipant(stream, failCommand).catch(console.error);
Bundling for Production with Rollup
The pitfall of a typical Node.js container is the bulky node_modules directory. Rollup allows us to bypass most of this. We need to install Rollup and its plugins for resolving Node modules and handling CommonJS syntax, which is prevalent in the ecosystem.
package.json (relevant sections):
"dependencies": {
"redis": "^4.6.10"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-node-resolve": "^15.2.3",
"rollup": "^4.1.0"
}
The Rollup configuration is key. It needs to know the entry point and how to handle external dependencies.
rollup.config.js:
import commonjs from '@rollup/plugin-commonjs';
import resolve from '@rollup/plugin-node-resolve';
export default [
{
input: 'src/index.js',
output: {
file: 'dist/orchestrator.js',
format: 'cjs', // CommonJS format is suitable for Node.js
sourcemap: true,
},
plugins: [
resolve({
preferBuiltins: true, // Important for Node.js built-ins like 'crypto'
}),
commonjs(), // Convert CommonJS modules to ES6
],
// The 'redis' package has dependencies that can't be bundled cleanly.
// We mark it as external, which means it must be present in node_modules
// where the final bundle is run. This is a common trade-off: for a truly
// dependency-free bundle, one might need a different redis client or
// accept a larger bundle size. Here, we install it in the final image.
external: ['redis'],
},
{
input: 'src/participant.js',
output: {
file: 'dist/participant.js',
format: 'cjs',
sourcemap: true,
},
plugins: [
resolve({ preferBuiltins: true }),
commonjs(),
],
external: ['redis'],
}
];
After running npx rollup -c, we get two optimized files in the dist/ directory. These are the only application code we need for our container image.
Containerizing with Podman
A multi-stage Containerfile is the most efficient way to build our image. The first stage is a "builder" with the full Node.js toolchain. The final stage is a minimal image that only contains the bundled output and the production node_modules.
Containerfile:
# ---- Builder Stage ----
FROM node:18-alpine AS builder
WORKDIR /usr/src/app
# Copy package files and install all dependencies (including dev)
COPY package*.json ./
RUN npm install
# Copy source code
COPY . .
# Run the rollup build
RUN npm run build
# ---- Production Stage ----
FROM node:18-alpine
WORKDIR /usr/src/app
# Only copy necessary files from the builder stage
COPY package*.json ./
# Install ONLY production dependencies. This is much smaller than the dev dependencies.
RUN npm install --omit=dev
# Copy the bundled application code from the builder stage
COPY --from=builder /usr/src/app/dist/ ./dist/
# Set environment variables
ENV NODE_ENV=production
ENV REDIS_URL=redis://redis:6379
# Expose ports if needed (not for this worker service)
# EXPOSE 3000
# The command will be provided when running the container
CMD [ "node", "./dist/orchestrator.js" ]
We can create a simple composition file for podman-compose or docker-compose to run the entire stack locally.
compose.yml:
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
orchestrator:
build: .
image: saga-orchestrator
command: ["node", "./dist/orchestrator.js"]
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
inventory_service:
image: saga-orchestrator # Reuse the same image
command: ["node", "./dist/participant.js", "inventory_commands"]
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
payment_service:
image: saga-orchestrator
# This service is configured to fail the 'CHARGE' command to test compensation
command: ["node", "./dist/participant.js", "payment_commands", "CHARGE"]
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
shipping_service:
image: saga-orchestrator
command: ["node", "./dist/participant.js", "shipping_commands"]
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
volumes:
redis_data:
To run this stack, first build the image with podman build -t saga-orchestrator . and then launch with podman-compose up. The logs will show the saga starting, the inventory service succeeding, the payment service failing, and then the orchestrator commanding the inventory service to execute its RELEASE compensation action.
This implementation provides a solid foundation. The state of active sagas is currently held in memory within the orchestrator. For a production system with high-availability requirements, this state would need to be persisted, perhaps in Redis itself using Hashes. This would allow a standby orchestrator instance to take over mid-saga if the active one fails. Furthermore, the current model lacks robust observability; integrating distributed tracing by passing a trace context ID through the saga messages would be a critical next step for debugging complex flows in a production environment. The error handling for a failed compensation also remains a significant challenge—this often signifies a “business-level” inconsistency that requires manual intervention, and the system should be designed to flag these events clearly for operations teams.
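If saga state does move into Redis Hashes, one wrinkle is that hash fields are strings, so nested values need JSON encoding. A sketch of that round-trip (hypothetical helpers; the hash might live at a key like saga:<id> via HSET/HGETALL):

```javascript
// Sketch of persisting saga state as a Redis Hash: flatten everything to
// string fields for HSET, and rebuild the in-memory shape from HGETALL so a
// standby orchestrator instance could resume a saga mid-flight.
function toHashFields(sagaState) {
  return {
    id: sagaState.id,
    currentStep: String(sagaState.currentStep),
    isCompensating: String(sagaState.isCompensating),
    definition: JSON.stringify(sagaState.definition),
    payload: JSON.stringify(sagaState.payload),
    history: JSON.stringify(sagaState.history),
  };
}

function fromHashFields(fields) {
  return {
    id: fields.id,
    currentStep: Number(fields.currentStep),
    isCompensating: fields.isCompensating === 'true',
    definition: JSON.parse(fields.definition),
    payload: JSON.parse(fields.payload),
    history: JSON.parse(fields.history),
  };
}
```

Writing the hash after each transition, and deleting it when the saga ends, would give a takeover instance everything it needs to continue.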