The operational gap was becoming untenable. Our data analytics team relied on a series of complex, multi-stage Trino queries for generating critical business reports. These weren’t simple SELECT * FROM table jobs; they involved creating temporary tables, running heavy aggregations, joining federated data sources, and finally, cleaning up intermediate artifacts. A typical workflow could take anywhere from five minutes to over an hour. The execution mechanism was a collection of brittle Python scripts, triggered manually. If step three of a five-step process failed, the entire chain halted. There was no automatic rollback, leaving orphaned temporary tables and forcing a manual, error-prone cleanup. For the analyst, it was a black box—they’d kick off a job and pray, with no visibility into progress until a success or failure message appeared in a Slack channel much later.
This lack of transactional integrity and real-time feedback was not just an inconvenience; it was a direct drain on productivity and compute resources. We needed a robust orchestrator to manage these long-running workflows as a single, logical transaction. The solution had to provide atomicity—all steps succeed, or the system compensates for partial success. It also had to be observable, both for the developers debugging it and for the end-users monitoring their jobs.
This led us to an architecture centered on the Saga pattern, specifically an orchestration-based implementation. A central service would define and execute the workflow, step-by-step, and would be responsible for triggering compensation actions in case of failure. To solve the black-box problem, we decided to build a simple Next.js interface that would provide real-time progress updates pushed from the orchestrator over WebSockets. Finally, the entire process would be heavily instrumented with metrics and structured logs, feeding into a Grafana dashboard to give us a god’s-eye view of the system’s health and performance.
The Initial Architectural Blueprint
Our goal was to build a system where a user action in a Next.js frontend would trigger a long-running, reliable, and observable backend process.
The core components were:
- Next.js Frontend: A simple UI to define query parameters and initiate the job. It would also feature a real-time progress view.
- API Layer: A Next.js API route to accept the initial request and hand it off to the Saga Orchestrator.
- Saga Orchestrator (Node.js): The heart of the system. A stateful service that manages the lifecycle of the query workflow.
- WebSocket Server: Pushes real-time status updates from the orchestrator to the specific client that initiated the job.
- Trino Client: A dedicated module for interacting with our Trino cluster, capable of executing queries and handling cancellation.
- Observability Stack: Prometheus for metrics, Loki for logs, and Grafana for visualization.
Here’s the high-level data flow:
sequenceDiagram
    participant User
    participant Next.js Frontend
    participant Next.js API
    participant Saga Orchestrator
    participant WebSocket Server
    participant Trino
    User->>Next.js Frontend: Submits Query Job
    Next.js Frontend->>Next.js API: POST /api/execute-query
    Next.js API->>Saga Orchestrator: startSaga(queryDefinition)
    Saga Orchestrator-->>Next.js API: { executionId: 'xyz-123' }
    Next.js API-->>Next.js Frontend: { executionId: 'xyz-123' }
    Next.js Frontend->>WebSocket Server: Connect & subscribe to channel 'xyz-123'
    loop For each Saga step
        Saga Orchestrator->>Trino: Execute Query (Step N)
        Saga Orchestrator->>WebSocket Server: Push status_update (Step N Started)
        WebSocket Server->>Next.js Frontend: Real-time update
        Trino-->>Saga Orchestrator: Query Result (Success/Failure)
        alt Step N Fails
            Saga Orchestrator->>Trino: Execute Compensation Query (Step N-1, N-2, ...)
            Saga Orchestrator->>WebSocket Server: Push status_update (Step N Failed, Compensating...)
            WebSocket Server->>Next.js Frontend: Real-time error/rollback update
        end
        Saga Orchestrator->>WebSocket Server: Push status_update (Step N Completed)
        WebSocket Server->>Next.js Frontend: Real-time update
    end
    Saga Orchestrator->>WebSocket Server: Push final_result
    WebSocket Server->>Next.js Frontend: Display final result/status
Implementing the Saga Orchestrator
The first and most critical piece was the orchestrator. We built this in a standalone Node.js service using TypeScript. The orchestrator’s responsibility is to manage the state machine of the Saga. We defined a simple in-memory orchestrator for our V1. A production-ready system must persist this state to a database like Redis or Postgres to handle service restarts, but an in-memory approach was sufficient to validate the logic.
A SagaDefinition outlines the steps and their corresponding compensation actions.
// src/saga/types.ts
// The context is a shared state object passed between steps.
export interface SagaContext {
executionId: string;
userId: string;
queryParams: Record<string, any>;
results: Record<string, any>; // Store results from each step
// ... other shared state
}
// A single step in the saga
export interface SagaStep {
name: string;
// The main action to perform
invoke: (context: SagaContext) => Promise<any>;
// The action to perform if a subsequent step fails
compensate: (context: SagaContext) => Promise<void>;
}
export interface SagaDefinition {
name: string;
steps: SagaStep[];
}
The orchestrator itself is a class that takes the saga's steps and a shared context, then executes the steps sequentially. The critical part is the try...catch block that triggers the compensation flow.
// src/saga/SagaOrchestrator.ts
import { SagaContext, SagaStep } from './types';
import { logger } from '../utils/logger'; // A structured logger (e.g., Winston)
export class SagaOrchestrator {
private completedSteps: SagaStep[] = [];
constructor(
private readonly steps: SagaStep[],
private readonly context: SagaContext
) {}
public async execute(): Promise<boolean> {
for (const step of this.steps) {
try {
logger.info(`[Saga ${this.context.executionId}] Invoking step: ${step.name}`);
// In a real implementation, you would emit a WebSocket event here.
// this.eventEmitter.emit('step:start', { executionId: this.context.executionId, step: step.name });
const result = await step.invoke(this.context);
this.context.results[step.name] = result;
this.completedSteps.push(step);
logger.info(`[Saga ${this.context.executionId}] Completed step: ${step.name}`);
// this.eventEmitter.emit('step:success', { executionId: this.context.executionId, step: step.name });
} catch (error) {
logger.error(`[Saga ${this.context.executionId}] Step ${step.name} failed. Starting compensation.`, { error });
// this.eventEmitter.emit('step:failure', { executionId: this.context.executionId, step: step.name, error });
await this.compensate();
return false; // Saga failed
}
}
logger.info(`[Saga ${this.context.executionId}] Saga completed successfully.`);
// this.eventEmitter.emit('saga:success', { executionId: this.context.executionId });
return true; // Saga succeeded
}
private async compensate(): Promise<void> {
logger.warn(`[Saga ${this.context.executionId}] Starting rollback for ${this.completedSteps.length} steps.`);
for (const step of this.completedSteps.reverse()) {
try {
logger.info(`[Saga ${this.context.executionId}] Compensating for step: ${step.name}`);
// this.eventEmitter.emit('step:compensating', { executionId: this.context.executionId, step: step.name });
await step.compensate(this.context);
logger.info(`[Saga ${this.context.executionId}] Compensation for step ${step.name} successful.`);
// this.eventEmitter.emit('step:compensated', { executionId: this.context.executionId, step: step.name });
} catch (compensationError) {
// This is a critical failure. Manual intervention is required.
logger.error(`[Saga ${this.context.executionId}] CRITICAL: Compensation for step ${step.name} failed.`, { compensationError });
// this.eventEmitter.emit('saga:critical-failure', { executionId: this.context.executionId, step: step.name });
// In a real system, this would trigger a high-priority alert.
return;
}
}
}
}
This orchestrator is generic. The actual logic for our Trino workflow is defined separately.
Defining the Trino Query Saga
Our specific workflow involves creating a temporary summary table, running the main analytical query against it, and then dropping the temporary table.
// src/services/TrinoClient.ts
import { BasicAuth, Trino } from 'trino-client';
// A simplified Trino client wrapper. A real one would have connection pooling,
// query cancellation logic, and more robust error handling.
export const trinoClient = Trino.create({
  server: process.env.TRINO_SERVER || 'http://localhost:8080',
  catalog: 'hive',
  schema: 'default',
  // trino-client identifies the calling user via its auth option;
  // BasicAuth with just a username is the simplest form.
  auth: new BasicAuth('analytics-service'),
});
// A production-grade implementation would include methods to check query status and cancel it.
// For example: `cancelQuery(queryId: string)`.
export async function executeTrinoQuery(sql: string): Promise<any[]> {
  // trino-client returns an async iterator of result pages; drain it fully.
  const iterator = await trinoClient.query(sql);
  const rows: any[] = [];
  for await (const page of iterator) {
    if (page.error) {
      // Enrich the error with query details for better debugging
      const enrichedError = new Error(`Trino query failed: ${page.error.message}`);
      (enrichedError as any).query = sql;
      (enrichedError as any).errorCode = page.error.errorCode;
      throw enrichedError;
    }
    if (page.data) {
      rows.push(...page.data);
    }
  }
  return rows;
}
// src/sagas/trinoReportSaga.ts
import { SagaDefinition, SagaStep, SagaContext } from '../saga/types';
import { executeTrinoQuery } from '../services/TrinoClient';
const createTempTableStep: SagaStep = {
name: 'CreateTempSummaryTable',
invoke: async (context: SagaContext) => {
const { executionId, queryParams } = context;
const tempTableName = `tmp_summary_${executionId.replace(/-/g, '_')}`;
context.results.tempTableName = tempTableName; // Save for subsequent steps
const sql = `
CREATE TABLE ${tempTableName} AS
SELECT user_id, COUNT(*) as event_count
FROM events
WHERE event_date BETWEEN DATE '${queryParams.startDate}' AND DATE '${queryParams.endDate}'
GROUP BY user_id
HAVING COUNT(*) > ${queryParams.minEvents};
`;
return executeTrinoQuery(sql);
},
compensate: async (context: SagaContext) => {
const { tempTableName } = context.results;
if (tempTableName) {
const sql = `DROP TABLE IF EXISTS ${tempTableName};`;
await executeTrinoQuery(sql);
}
},
};
const runMainAnalyticsStep: SagaStep = {
name: 'RunMainAnalyticsQuery',
invoke: async (context: SagaContext) => {
const { tempTableName } = context.results;
const sql = `
SELECT u.country, AVG(s.event_count) as avg_events
FROM users u
JOIN ${tempTableName} s ON u.id = s.user_id
GROUP BY u.country
ORDER BY avg_events DESC;
`;
return executeTrinoQuery(sql);
},
compensate: async () => {
// This step is read-only, so no compensation is needed.
return Promise.resolve();
},
};
// The cleanup step is crucial. Its compensation is empty because if the cleanup
// itself fails, we don't want to try and re-create the table.
const dropTempTableStep: SagaStep = {
name: 'DropTempTable',
invoke: async (context: SagaContext) => {
const { tempTableName } = context.results;
const sql = `DROP TABLE IF EXISTS ${tempTableName};`;
return executeTrinoQuery(sql);
},
compensate: async () => {
// A failure here is problematic but we don't want to trigger other compensations.
// Logging and alerting are key.
return Promise.resolve();
},
};
export const trinoReportSagaDefinition: SagaDefinition = {
name: 'TrinoDailyReportSaga',
steps: [createTempTableStep, runMainAnalyticsStep, dropTempTableStep],
};
This definition cleanly separates the what (the SQL queries) from the how (the orchestration logic).
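Before moving to the frontend, it helps to see how the API layer from the component list ties these pieces together. The following is only a minimal sketch: in the real setup the Next.js route hands the request off to the standalone orchestrator service, whereas here, for brevity, the saga runs in-process; the import paths, the randomUUID-based executionId, and the 202 response shape are illustrative assumptions.
// pages/api/execute-query.ts: a sketch only; in production this hands off to the
// standalone orchestrator service rather than running the saga in-process.
import type { NextApiRequest, NextApiResponse } from 'next';
import { randomUUID } from 'crypto';
import { SagaOrchestrator } from '../../src/saga/SagaOrchestrator';
import type { SagaContext } from '../../src/saga/types';
import { trinoReportSagaDefinition } from '../../src/sagas/trinoReportSaga';

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method not allowed' });
  }

  const executionId = randomUUID();
  const context: SagaContext = {
    executionId,
    userId: req.body.userId,
    queryParams: req.body.queryParams,
    results: {},
  };

  // Fire and forget: the client tracks progress over the WebSocket channel
  // keyed by executionId, so we return immediately with 202 Accepted.
  const orchestrator = new SagaOrchestrator(trinoReportSagaDefinition.steps, context);
  orchestrator.execute().catch(() => {
    // Failures are logged and compensated inside the orchestrator itself.
  });

  return res.status(202).json({ executionId });
}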
Real-Time Frontend with Next.js and WebSockets
With the backend logic defined, we turned to the user-facing component. The goal was to avoid polling at all costs. WebSockets were the natural choice. We used the ws library on our Node.js server for its simplicity and performance.
The WebSocket server logic is straightforward: when a client connects, it expects a message to subscribe to a specific executionId. All subsequent events for that Saga are published only to clients subscribed to that ID.
// src/services/WebSocketServer.ts
import { WebSocketServer, WebSocket } from 'ws';
import { EventEmitter } from 'events';
// A simple pub/sub manager for WebSocket connections
const subscriptions = new Map<string, Set<WebSocket>>();
export const sagaEventEmitter = new EventEmitter();
export function setupWebSocketServer(server: any) {
const wss = new WebSocketServer({ server });
wss.on('connection', (ws) => {
ws.on('message', (message) => {
try {
const data = JSON.parse(message.toString());
if (data.type === 'subscribe' && data.executionId) {
const { executionId } = data;
if (!subscriptions.has(executionId)) {
subscriptions.set(executionId, new Set());
}
subscriptions.get(executionId)?.add(ws);
ws.send(JSON.stringify({ type: 'subscribed', executionId }));
}
} catch (e) {
console.error('Invalid WebSocket message:', e);
}
});
ws.on('close', () => {
// Clean up subscriptions when a client disconnects
subscriptions.forEach((clients, executionId) => {
if (clients.has(ws)) {
clients.delete(ws);
if (clients.size === 0) {
subscriptions.delete(executionId);
}
}
});
});
});
// Hook into the saga events
const broadcast = (executionId: string, payload: any) => {
const clients = subscriptions.get(executionId);
if (clients) {
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(payload));
}
});
}
};
// Now we wire up the events from the orchestrator
sagaEventEmitter.on('step:start', (data) => broadcast(data.executionId, { type: 'step:start', ...data }));
sagaEventEmitter.on('step:success', (data) => broadcast(data.executionId, { type: 'step:success', ...data }));
sagaEventEmitter.on('step:failure', (data) => broadcast(data.executionId, { type: 'step:failure', ...data }));
// ... and so on for all other events
}
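The emit calls inside SagaOrchestrator are still commented out above. One possible way to wire them to sagaEventEmitter without modifying the orchestrator itself is a small decorator over the steps; the withEvents helper below is purely illustrative and does not exist in the code shown so far.
// src/saga/withEvents.ts: a sketch of a step decorator that emits the events the
// WebSocket server listens for, without touching SagaOrchestrator itself.
import { EventEmitter } from 'events';
import { SagaContext, SagaStep } from './types';

export function withEvents(steps: SagaStep[], emitter: EventEmitter): SagaStep[] {
  return steps.map((step) => ({
    name: step.name,
    invoke: async (context: SagaContext) => {
      emitter.emit('step:start', { executionId: context.executionId, step: step.name });
      try {
        const result = await step.invoke(context);
        emitter.emit('step:success', { executionId: context.executionId, step: step.name });
        return result;
      } catch (error) {
        emitter.emit('step:failure', {
          executionId: context.executionId,
          step: step.name,
          error: error instanceof Error ? error.message : String(error),
        });
        throw error; // Let the orchestrator run its compensation flow.
      }
    },
    compensate: async (context: SagaContext) => {
      emitter.emit('step:compensating', { executionId: context.executionId, step: step.name });
      await step.compensate(context);
      emitter.emit('step:compensated', { executionId: context.executionId, step: step.name });
    },
  }));
}

// Example usage when constructing the orchestrator:
// new SagaOrchestrator(withEvents(trinoReportSagaDefinition.steps, sagaEventEmitter), context)
Keeping the event wiring out of the orchestrator keeps it reusable for sagas that do not need WebSocket updates.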
On the Next.js frontend, we created a custom hook, useSagaMonitor, to encapsulate the WebSocket logic.
// components/useSagaMonitor.ts
import { useState, useEffect, useRef } from 'react';
export interface SagaEvent {
type: string;
step?: string;
timestamp: string;
error?: string;
// ... any other event data
}
export function useSagaMonitor(executionId: string | null) {
const [events, setEvents] = useState<SagaEvent[]>([]);
const [isConnected, setIsConnected] = useState(false);
const ws = useRef<WebSocket | null>(null);
useEffect(() => {
if (!executionId) {
return;
}
// Connect to the WebSocket server
ws.current = new WebSocket(process.env.NEXT_PUBLIC_WEBSOCKET_URL || 'ws://localhost:8081');
ws.current.onopen = () => {
console.log('WebSocket connected');
setIsConnected(true);
// Subscribe to events for this execution
ws.current?.send(JSON.stringify({ type: 'subscribe', executionId }));
};
ws.current.onmessage = (event) => {
const newEvent = JSON.parse(event.data);
setEvents((prevEvents) => [...prevEvents, { ...newEvent, timestamp: new Date().toISOString() }]);
};
ws.current.onclose = () => {
console.log('WebSocket disconnected');
setIsConnected(false);
};
ws.current.onerror = (error) => {
console.error('WebSocket error:', error);
setIsConnected(false);
};
// Cleanup on unmount
return () => {
ws.current?.close();
};
}, [executionId]);
return { events, isConnected };
}
A React component can then use this hook to display the real-time progress.
// pages/query/[executionId].tsx
import { useRouter } from 'next/router';
import { useSagaMonitor, SagaEvent } from '../../components/useSagaMonitor';
function renderEvent(event: SagaEvent) {
// Simple rendering logic for different event types
switch (event.type) {
case 'step:start':
return `[${event.timestamp}] Step '${event.step}' started...`;
case 'step:success':
return `[${event.timestamp}] Step '${event.step}' completed successfully.`;
case 'step:failure':
return `[${event.timestamp}] ERROR: Step '${event.step}' failed. Reason: ${event.error}`;
case 'step:compensating':
return `[${event.timestamp}] ROLLING BACK: Compensating for '${event.step}'.`;
// ... other cases
default:
return `[${event.timestamp}] Unknown event: ${event.type}`;
}
}
export default function QueryStatusPage() {
const router = useRouter();
const { executionId } = router.query;
const { events, isConnected } = useSagaMonitor(typeof executionId === 'string' ? executionId : null);
return (
<div>
<h1>Query Status: {executionId}</h1>
<p>Connection Status: {isConnected ? 'Connected' : 'Disconnected'}</p>
<div style={{ fontFamily: 'monospace', whiteSpace: 'pre', backgroundColor: '#f0f0f0', padding: '1rem' }}>
{events.map((event, index) => (
<div key={index}>{renderEvent(event)}</div>
))}
</div>
</div>
);
}
Making It Observable in Grafana
The final piece was closing the loop on observability. Without it, we’re just creating a different kind of black box. We instrumented the orchestrator with two key signals: structured logs and Prometheus metrics.
1. Structured Logging with a Correlation ID
We used Winston to produce JSON logs. Every saga execution is assigned a unique executionId, which acts as our correlation ID. This ID is injected into every single log statement related to that execution.
// src/utils/logger.ts
import winston from 'winston';
export const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
// In production, this would write to stdout to be collected by a log aggregator like Fluentd
new winston.transports.Console(),
],
});
// Example usage within the orchestrator
// logger.info(`My log message`, { executionId: 'xyz-123', component: 'SagaOrchestrator' });
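To avoid threading executionId through every log call by hand, winston's child loggers can attach it automatically; a minimal sketch, with a hypothetical sagaLogger helper:
// src/utils/sagaLogger.ts: a sketch of a per-execution child logger so the
// correlation ID is attached to every statement automatically.
import { logger } from './logger';

export function sagaLogger(executionId: string) {
  // winston's child() merges this metadata into every entry the child emits.
  return logger.child({ executionId, component: 'SagaOrchestrator' });
}

// Example usage inside the orchestrator:
// const log = sagaLogger(this.context.executionId);
// log.info(`Invoking step: ${step.name}`);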
This allows us to use Loki in Grafana to instantly pull all logs for a specific failed job. A simple LogQL query like {app="saga-orchestrator"} | json | executionId="xyz-123" will show the entire lifecycle of that one execution across all components.
2. Key Performance Metrics with Prometheus
Metrics give us the high-level view. We used the prom-client library to expose a /metrics endpoint. The crucial metrics for a Saga orchestrator are:
- saga_executions_total: A counter for started, completed, and failed sagas.
- saga_duration_seconds: A histogram to track the latency of entire saga executions.
- saga_step_duration_seconds: A histogram to pinpoint which specific steps are slow.
// src/utils/metrics.ts
import client from 'prom-client';
export const registry = new client.Registry();
// Enable default metrics
client.collectDefaultMetrics({ register: registry });
export const sagaExecutionsTotal = new client.Counter({
name: 'saga_executions_total',
help: 'Total number of saga executions',
labelNames: ['saga_name', 'status'], // status can be 'started', 'completed', 'failed'
});
registry.register(sagaExecutionsTotal);
export const sagaStepDurationSeconds = new client.Histogram({
name: 'saga_step_duration_seconds',
help: 'Duration of saga steps in seconds',
labelNames: ['saga_name', 'step_name', 'status'],
buckets: [0.1, 0.5, 1, 5, 10, 30, 60, 300], // Buckets from 100ms to 5 minutes
});
registry.register(sagaStepDurationSeconds);
We instrumented the orchestrator to update these metrics at key points in the lifecycle. For example, we’d start a timer before step.invoke and record the duration upon completion or failure.
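A sketch of that timing logic, using a hypothetical invokeWithMetrics wrapper around a single step (the orchestrator's loop would call this instead of step.invoke directly):
// src/saga/instrumentedInvoke.ts: a sketch of timing one step against the
// saga_step_duration_seconds histogram defined above.
import { SagaContext, SagaStep } from './types';
import { sagaStepDurationSeconds } from '../utils/metrics';

export async function invokeWithMetrics(
  sagaName: string,
  step: SagaStep,
  context: SagaContext
): Promise<any> {
  // startTimer() returns a function that records the elapsed seconds when called;
  // the status label is added at that point so success and failure are split apart.
  const endTimer = sagaStepDurationSeconds.startTimer({ saga_name: sagaName, step_name: step.name });
  try {
    const result = await step.invoke(context);
    endTimer({ status: 'success' });
    return result;
  } catch (error) {
    endTimer({ status: 'failure' });
    throw error;
  }
}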
In Grafana, we can now build a dashboard that shows:
- A time-series graph of saga success/failure rates: sum(rate(saga_executions_total{status="completed"}[5m])) / sum(rate(saga_executions_total[5m]))
- A heatmap of step latencies to identify bottlenecks, using the saga_step_duration_seconds histogram.
- A table of the slowest-running sagas.
When an alert fires for a high failure rate, an engineer can look at the Grafana dashboard to see which step is failing, then pivot directly to the logs for a specific failed executionId to see the exact error message and context, all within a few clicks.
Lingering Issues and Future Work
This architecture solved our immediate problems, but it’s not without its own set of trade-offs and areas for improvement. The current in-memory orchestrator is a single point of failure. If the Node.js process crashes mid-saga, the state is lost, and we could have orphaned resources in Trino. The immediate next step is to persist the saga’s state and current step to a durable store like PostgreSQL after each successful step invocation. This would allow a new orchestrator instance to resume interrupted sagas upon startup.
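A minimal sketch of that checkpointing, assuming a hypothetical saga_executions table in PostgreSQL and the pg driver (neither exists in the current codebase):
// src/saga/SagaStateStore.ts: a sketch of the durable checkpointing described above.
// The saga_executions table and its columns are illustrative, not an existing schema.
import { Pool } from 'pg';

export interface SagaCheckpoint {
  executionId: string;
  sagaName: string;
  lastCompletedStep: string;
  context: Record<string, any>; // serialized SagaContext
  status: 'running' | 'compensating' | 'completed' | 'failed';
}

export class PostgresSagaStateStore {
  constructor(private readonly pool: Pool) {}

  // Called by the orchestrator after each successful step.invoke().
  async saveCheckpoint(checkpoint: SagaCheckpoint): Promise<void> {
    await this.pool.query(
      `INSERT INTO saga_executions (execution_id, saga_name, last_completed_step, context, status)
       VALUES ($1, $2, $3, $4, $5)
       ON CONFLICT (execution_id)
       DO UPDATE SET last_completed_step = $3, context = $4, status = $5`,
      [
        checkpoint.executionId,
        checkpoint.sagaName,
        checkpoint.lastCompletedStep,
        JSON.stringify(checkpoint.context),
        checkpoint.status,
      ]
    );
  }

  // Called on startup so a fresh orchestrator instance can resume or compensate
  // sagas that were interrupted by a crash.
  async findInterrupted(): Promise<SagaCheckpoint[]> {
    const { rows } = await this.pool.query(
      `SELECT execution_id, saga_name, last_completed_step, context, status
       FROM saga_executions
       WHERE status IN ('running', 'compensating')`
    );
    return rows.map((row) => ({
      executionId: row.execution_id,
      sagaName: row.saga_name,
      lastCompletedStep: row.last_completed_step,
      context: row.context,
      status: row.status,
    }));
  }
}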
Furthermore, our WebSocket error handling is simplistic. A client’s network connection can be flaky. A more robust solution would involve an event queue on the client-side and a sequence numbering system to ensure that events are not missed during brief disconnections. The client could request any missed events from the server upon reconnection.
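A rough sketch of that client-side idea, assuming the server stamps each event with a per-execution seq number and accepts a resumeAfterSeq field on subscribe; neither exists in the current protocol:
// components/resumingSubscriber.ts: a sketch of sequence-aware resubscription.
// Assumes server-side seq stamping and resumeAfterSeq support, which do not exist yet.
export interface SequencedSagaEvent {
  seq: number;
  type: string;
  step?: string;
  error?: string;
}

export function createResumingSubscriber(
  url: string,
  executionId: string,
  onEvent: (event: SequencedSagaEvent) => void
): WebSocket {
  let lastSeq = 0;

  function connect(): WebSocket {
    const ws = new WebSocket(url);
    ws.onopen = () => {
      // Ask the server to replay anything after the last event we processed.
      ws.send(JSON.stringify({ type: 'subscribe', executionId, resumeAfterSeq: lastSeq }));
    };
    ws.onmessage = (message) => {
      const event: SequencedSagaEvent = JSON.parse(message.data);
      if (event.seq > lastSeq) {
        lastSeq = event.seq;
        onEvent(event);
      }
    };
    // Naive reconnect; a real implementation would back off and stop after the final event.
    ws.onclose = () => {
      setTimeout(connect, 1000);
    };
    return ws;
  }

  return connect();
}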
Finally, the Trino client itself is a potential bottleneck. It executes queries without any concurrency control or backpressure. In a scenario with many simultaneous users, we could easily overwhelm the Trino cluster. A proper implementation would require a queuing mechanism in front of the Trino client, with configurable concurrency limits and priorities to ensure fair resource usage and system stability.
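A first cut could be a small in-process semaphore in front of executeTrinoQuery; the sketch below is illustrative, and the limit of five concurrent queries is arbitrary:
// src/services/trinoQueue.ts: a sketch of a minimal concurrency gate in front of Trino.
import { executeTrinoQuery } from './TrinoClient';

const MAX_CONCURRENT_QUERIES = 5; // illustrative; would be configurable in practice
let running = 0;
const waiting: Array<() => void> = [];

async function acquire(): Promise<void> {
  if (running < MAX_CONCURRENT_QUERIES) {
    running++;
    return;
  }
  // Wait until release() hands this caller a slot; `running` is not
  // decremented in that case, so the count stays accurate.
  await new Promise<void>((resolve) => waiting.push(resolve));
}

function release(): void {
  const next = waiting.shift();
  if (next) {
    next(); // Hand the slot directly to the next waiting query.
  } else {
    running--;
  }
}

export async function executeTrinoQueryThrottled(sql: string): Promise<any[]> {
  await acquire();
  try {
    return await executeTrinoQuery(sql);
  } finally {
    release();
  }
}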