The promise of microservices—decoupled, independently deployable units—often conceals a lurking beast: operational blindness. In one of our systems, this blindness became a critical failure. A user-facing request would trigger a chain of events across services communicating via NATS. When a downstream consumer service failed silently, we were left scrambling. The logs from the consumer showed an error, but tracing it back to the originating API call that caused it was a manual, time-consuming process of correlating timestamps and log messages. The asynchronous boundary of NATS had become an informational black hole.
Our initial concept was straightforward: inject distributed tracing context into NATS messages. The challenge wasn’t the idea, but the execution. How do we build a propagation mechanism that is reliable, transparent to the application developer, and, most importantly, verifiably correct without depending on a running Zipkin instance for validation? In a production environment, infrastructure components like this cannot be “hope-driven.” They must be provably robust. This led us to a non-negotiable requirement: the entire context propagation layer must be developed using a strict Test-Driven Development (TDD) methodology.
The technology selection followed from this core requirement. For tracing, OpenTelemetry was the clear choice over vendor-specific libraries. Its separation of API, SDK, and exporters provides the flexibility to switch backends (from Zipkin to Jaeger, for instance) without rewriting instrumentation code. For context propagation, the W3C Trace Context standard is the default and most widely supported format. NATS messages support headers, a natural and non-intrusive place to carry this metadata without polluting the primary payload. TypeScript, with its static typing, would provide the guardrails to prevent common errors when handling complex context objects.
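For reference, the W3C traceparent header that ends up in the NATS headers is a single dash-separated string. It holds a two-hex-digit version, a 32-hex-digit trace ID, a 16-hex-digit parent span ID, and two hex digits of trace flags. The sketch below formats and validates that value by hand, purely to show what travels with each message. The function names are hypothetical, and in the actual client this work is done by OpenTelemetry's W3CTraceContextPropagator.

```typescript
// Hypothetical helpers showing the layout of a W3C traceparent header:
//   version - trace-id (32 hex) - parent-id (16 hex) - trace-flags (2 hex)
// This sketch only accepts version "00", the only version defined so far.

interface TraceParent {
  traceId: string; // 32 lowercase hex characters, not all zeros
  spanId: string; // 16 lowercase hex characters, not all zeros
  sampled: boolean; // lowest bit of trace-flags
}

const TRACEPARENT_RE = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;
const ALL_ZEROS = /^0+$/;

function formatTraceparent(tp: TraceParent): string {
  return `00-${tp.traceId}-${tp.spanId}-${tp.sampled ? '01' : '00'}`;
}

// Returns undefined for a missing or malformed header instead of throwing,
// so a consumer can fall back to starting a new trace.
function parseTraceparent(header: string | undefined): TraceParent | undefined {
  const match = header ? TRACEPARENT_RE.exec(header.trim()) : null;
  if (!match) return undefined;
  const [, traceId, spanId, flags] = match;
  if (ALL_ZEROS.test(traceId) || ALL_ZEROS.test(spanId)) return undefined;
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 0x01) === 1 };
}
```

Take formatting the trace ID 4bf92f3577b34da6a3ce929d0e0e4736 with span ID 00f067aa0ba902b7 and the sampled flag set. The result is 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01, which is the example value used in the W3C specification.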
The core of our solution would be a wrapper around the standard NATS client. This wrapper would automatically handle the injection and extraction of trace contexts, making observability a default feature, not an afterthought. And every step of its creation would begin with a failing test.
Project Foundation and Initial Failing Test
First, the development environment needs to be set up. This includes the core dependencies for our services and the testing framework.
package.json
{
  "name": "nats-tracing-tdd",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "test": "jest"
  },
  "devDependencies": {
    "@types/jest": "^29.5.12",
    "@types/node": "^20.11.0",
    "jest": "^29.7.0",
    "ts-jest": "^29.1.2",
    "typescript": "^5.3.3"
  },
  "dependencies": {
    "@opentelemetry/api": "^1.8.0",
    "@opentelemetry/context-async-hooks": "^1.22.0",
    "@opentelemetry/core": "^1.22.0",
    "@opentelemetry/exporter-zipkin": "^1.22.0",
    "@opentelemetry/instrumentation": "^0.50.0",
    "@opentelemetry/resources": "^1.22.0",
    "@opentelemetry/sdk-node": "^0.50.0",
    "@opentelemetry/sdk-trace-base": "^1.22.0",
    "@opentelemetry/sdk-trace-node": "^1.22.0",
    "@opentelemetry/semantic-conventions": "^1.22.0",
    "nats": "^2.23.0"
  }
}
We also need a Jest configuration that works with TypeScript.
jest.config.js
module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
  clearMocks: true,
};
Our first goal is to prove, via a test, that a trace context created in a “producer” can be successfully passed through a NATS message and reconstituted in a “consumer.” The consumer’s span must be a direct child of the producer’s span.
To do this, we need a minimal, in-memory OpenTelemetry setup for testing purposes. We’ll use an InMemorySpanExporter, which stores each finished span as a ReadableSpan, so we can inspect the exported spans and assert on their properties.
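The parent/child check that this test will rely on is small enough to express on its own. The sketch below assumes only the two ReadableSpan members the tests already use, parentSpanId and spanContext(), as they exist in the 1.x SDK. SpanLike, assertDirectChild and findSpan are hypothetical names.

```typescript
// Minimal structural view of an exported span. In the 1.x SDK, ReadableSpan
// exposes `parentSpanId` and `spanContext()`; only those members are modeled here.
interface SpanLike {
  name: string;
  parentSpanId?: string;
  spanContext(): { traceId: string; spanId: string };
}

// Throws unless `child` was started with `parent` as its direct parent:
// same trace ID, and the child's parentSpanId equals the parent's span ID.
function assertDirectChild(child: SpanLike, parent: SpanLike): void {
  const c = child.spanContext();
  const p = parent.spanContext();
  if (c.traceId !== p.traceId) {
    throw new Error(`${child.name} is in trace ${c.traceId}, expected ${p.traceId}`);
  }
  if (child.parentSpanId !== p.spanId) {
    throw new Error(`${child.name} has parent ${child.parentSpanId ?? '(none)'}, expected ${p.spanId}`);
  }
}

// Lookup by span name that fails loudly instead of returning undefined.
function findSpan<T extends SpanLike>(spans: T[], name: string): T {
  const span = spans.find((s) => s.name === name);
  if (!span) throw new Error(`no finished span named ${name}`);
  return span;
}
```

In the Jest tests that follow, assertDirectChild(findSpan(spans, 'test-child-span'), findSpan(spans, 'test-parent-span')) could stand in for the pair of expect calls. Jest's own matchers give friendlier diffs, so this is mostly a matter of taste.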
src/testing/tracing.ts
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { InMemorySpanExporter, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import { ContextManager } from '@opentelemetry/api';
import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks';

// This setup is specifically for testing. It uses an in-memory exporter
// so we can inspect the spans generated during a test run.
export function setupTestTracer(): {
  provider: NodeTracerProvider;
  exporter: InMemorySpanExporter;
  propagator: W3CTraceContextPropagator;
  contextManager: ContextManager;
} {
  const provider = new NodeTracerProvider();
  const exporter = new InMemorySpanExporter();
  // SimpleSpanProcessor hands each span to the exporter as soon as it ends.
  provider.addSpanProcessor(new SimpleSpanProcessor(exporter));

  // The context manager must be enabled before use; it tracks the active
  // context across async boundaries via Node's async_hooks.
  const contextManager = new AsyncHooksContextManager();
  contextManager.enable();
  const propagator = new W3CTraceContextPropagator();

  // Pass both to register() so they become the globals. Calling
  // setGlobalContextManager()/setGlobalPropagator() after a bare register()
  // would be rejected: register() has already installed its own defaults,
  // and the OpenTelemetry API refuses duplicate global registrations.
  provider.register({ contextManager, propagator });

  return { provider, exporter, propagator, contextManager };
}
Now, the first failing test. We will define a test suite for our yet-to-be-created TraceableNatsClient. The test will simulate the full publish-subscribe cycle.
src/TraceableNatsClient.test.ts
import { setupTestTracer } from './testing/tracing';
// @opentelemetry/api has no named `api` export; a namespace import gives us
// api.context, api.trace, api.propagation, and so on.
import * as api from '@opentelemetry/api';

// Placeholder for the class we are about to build
class TraceableNatsClient {
  // Methods will be implemented here
}

describe('TraceableNatsClient', () => {
  // Setup in-memory tracer for testing
  const { exporter, provider } = setupTestTracer();
  const tracer = provider.getTracer('test-tracer');

  beforeEach(() => {
    // Clear captured spans before each test
    exporter.reset();
  });

  it('should propagate trace context from publisher to subscriber', () => {
    // This is the core test that drives our implementation.
    // It will fail until we build the TraceableNatsClient.
    const parentSpan = tracer.startSpan('test-parent-span');

    // The test needs to run within the context of the parent span
    api.context.with(api.trace.setSpan(api.context.active(), parentSpan), () => {
      // 1. ARRANGE
      // const natsClient = new TraceableNatsClient();
      // const subject = 'test.subject';
      // const payload = 'hello world';

      // 2. ACT
      // natsClient.publish(subject, payload);
      // natsClient.subscribe(subject, (msg) => {
      //   const childSpan = tracer.startSpan('test-child-span');
      //   childSpan.end();
      // });
    });
    parentSpan.end();

    // 3. ASSERT
    const spans = exporter.getFinishedSpans();
    // For now, this fails: only the parent span is recorded, because nothing
    // creates a child span yet. The goal is to see two spans: a parent and a child.
    expect(spans).toHaveLength(2);
    const child = spans.find(s => s.name === 'test-child-span');
    const parent = spans.find(s => s.name === 'test-parent-span');
    expect(child).toBeDefined();
    expect(parent).toBeDefined();
    // The critical assertion: the child's parentSpanId must match the parent's spanId.
    expect(child!.parentSpanId).toEqual(parent!.spanContext().spanId);
    expect(child!.spanContext().traceId).toEqual(parent!.spanContext().traceId);
  });
});
Running npm test at this point correctly produces a failure. The test expects two spans but finds only one. The parent span is recorded, but nothing creates a child span yet. This is the starting point for our TDD cycle.
Implementation: The TraceableNatsClient
Our goal is to create a wrapper that intercepts publish and subscribe calls to manage trace context. The wrapper will hold a reference to a real nats.js client.
Let’s start building the class and the publish method. The publish method must:
- Get the active OpenTelemetry context.
- Use the global propagator to inject that context into a NATS headers object, which serves as the carrier.
- Call the underlying NATS client’s publish method with the payload and the populated headers.
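propagation.inject takes a setter, and extract takes a getter, so that the propagator never touches the carrier directly. That lets the same propagator write into NATS headers, HTTP headers, or a plain object. The sketch below mimics that shape using hypothetical Setter and Getter interfaces and a stand-in FakeHeaders class. It is neither the OpenTelemetry API nor nats.js MsgHdrs, only an illustration of the pattern, and the traceparent value is passed in already formatted.

```typescript
// Hypothetical setter/getter shapes, modeled loosely on OpenTelemetry's
// TextMapSetter/TextMapGetter: the "propagator" only ever calls these.
interface Setter<C> {
  set(carrier: C, key: string, value: string): void;
}
interface Getter<C> {
  get(carrier: C | undefined, key: string): string | undefined;
}

// A toy propagator: it knows which header to use, not what the carrier is.
function inject<C>(traceparent: string, carrier: C, setter: Setter<C>): void {
  setter.set(carrier, 'traceparent', traceparent);
}
function extract<C>(carrier: C | undefined, getter: Getter<C>): string | undefined {
  // Treat an empty string the same as a missing header.
  return getter.get(carrier, 'traceparent') || undefined;
}

// Stand-in header object (hypothetical); returns '' for a missing key.
class FakeHeaders {
  private readonly values = new Map<string, string>();
  set(key: string, value: string): void { this.values.set(key, value); }
  get(key: string): string { return this.values.get(key) ?? ''; }
}

const headerSetter: Setter<FakeHeaders> = { set: (c, k, v) => c.set(k, v) };
// Guard against a missing header object, e.g. from an uninstrumented publisher.
const headerGetter: Getter<FakeHeaders> = { get: (c, k) => c?.get(k) };

// The same inject() also works for a plain object carrier with its own setter.
type PlainCarrier = Record<string, string>;
const recordSetter: Setter<PlainCarrier> = { set: (c, k, v) => { c[k] = v; } };
```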
src/TraceableNatsClient.ts
import { NatsConnection, StringCodec, headers, MsgHdrs } from 'nats';
import * as api from '@opentelemetry/api';
import { propagation, SpanStatusCode, Tracer } from '@opentelemetry/api';

const sc = StringCodec();

export class TraceableNatsClient {
  private readonly tracer: Tracer;

  constructor(private natsConnection: NatsConnection) {
    // api.trace.getTracer() resolves against the globally registered
    // TracerProvider, so the wrapper uses whatever SDK the application configured.
    this.tracer = api.trace.getTracer('nats-instrumentation', '0.1.0');
  }

  publish(subject: string, payload: string): void {
    const natsHeaders = headers();

    // 1. Get the active context. This contains the current span, if any.
    const activeContext = api.context.active();

    // 2. Use the global propagator (the W3CTraceContextPropagator registered at
    // startup) to write `traceparent` (and `tracestate`, if present) into the headers.
    propagation.inject(activeContext, natsHeaders, {
      // The setter tells the propagator how to set a key/value on our carrier.
      // For NATS headers, the carrier is the `MsgHdrs` object itself.
      set: (carrier: MsgHdrs, key: string, value: string) => {
        carrier.set(key, value);
      },
    });

    // Core NATS publish is fire-and-forget, but it can still throw synchronously
    // (for example, on a closed connection); a production version would log that.
    this.natsConnection.publish(subject, sc.encode(payload), {
      headers: natsHeaders,
    });
  }

  // subscribe method to be implemented next
}
This implementation of publish is incomplete without its counterpart, subscribe. The subscribe method is more complex: it needs to wrap the user-provided message handler so that context is extracted before the user’s logic runs.
The subscribe method must:
- Extract the context from the incoming message headers using the propagator.
- Create a new span representing the “processing” of the message. This new span’s parent will be the context extracted from the headers.
- Execute the user’s callback within the context of this new span.
- Ensure the span is correctly ended, even if the user’s callback throws an error.
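Stripped of OpenTelemetry specifics, the last two requirements come down to a try/catch/finally around the handler. The sketch below uses a hypothetical MinimalSpan interface and a recording fake so that the control flow can be checked in isolation. It mirrors the catch-and-continue policy of the implementation: a failing handler marks the span as errored but does not stop the message loop.

```typescript
type Status = 'UNSET' | 'OK' | 'ERROR';

// Hypothetical subset of a span's API: just what the wrapper needs.
interface MinimalSpan {
  setStatus(status: Status, message?: string): void;
  recordException(error: Error): void;
  end(): void;
}

// Runs `handler`, records its outcome on `span`, and always ends the span.
// Errors are recorded and then swallowed, so one bad message does not
// break the caller's loop.
async function runInSpan(span: MinimalSpan, handler: () => void | Promise<void>): Promise<void> {
  try {
    await handler();
    span.setStatus('OK');
  } catch (err) {
    // Handlers may throw non-Error values; normalize before recording.
    const error = err instanceof Error ? err : new Error(String(err));
    span.setStatus('ERROR', error.message);
    span.recordException(error);
  } finally {
    span.end();
  }
}

// A fake span that remembers what happened to it, for assertions.
class RecordingSpan implements MinimalSpan {
  status: Status = 'UNSET';
  message?: string;
  exceptions: Error[] = [];
  ended = 0;
  setStatus(status: Status, message?: string): void { this.status = status; this.message = message; }
  recordException(error: Error): void { this.exceptions.push(error); }
  end(): void { this.ended += 1; }
}
```

The instanceof check is a small hardening over a plain error as Error cast. If a handler throws a string, the cast would yield undefined for message, whereas this version still records something readable.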
// ... inside TraceableNatsClient.ts
// The user provides a handler that just cares about the message payload.
type MessageHandler = (payload: string) => void | Promise<void>;

export class TraceableNatsClient {
  // ... constructor and publish method ...

  async subscribe(subject: string, handler: MessageHandler): Promise<void> {
    const subscription = this.natsConnection.subscribe(subject);

    (async () => {
      for await (const msg of subscription) {
        // 1. Extract context from NATS headers. The carrier is the headers object,
        // which is undefined when the publisher attached no headers at all
        // (e.g. an uninstrumented service), so the getter must tolerate that.
        const parentContext = propagation.extract(api.context.active(), msg.headers, {
          keys: (carrier: MsgHdrs | undefined) => (carrier ? carrier.keys() : []),
          get: (carrier: MsgHdrs | undefined, key: string) => carrier?.get(key),
        });

        const span = this.tracer.startSpan(
          `${subject} process`,
          {
            kind: api.SpanKind.CONSUMER,
            attributes: {
              'messaging.system': 'nats',
              'messaging.destination': subject,
              'messaging.protocol': 'nats',
            },
          },
          parentContext // This links the new span to the producer's span
        );

        // 2. Execute user code within the new span's context.
        // This is the most critical step. `context.with` ensures that any further
        // async operations or spans created inside the handler will automatically
        // become children of this new consumer span.
        await api.context.with(api.trace.setSpan(parentContext, span), async () => {
          try {
            await handler(sc.decode(msg.data));
            span.setStatus({ code: SpanStatusCode.OK });
          } catch (error) {
            span.setStatus({
              code: SpanStatusCode.ERROR,
              message: (error as Error).message,
            });
            span.recordException(error as Error);
            // This example records the error on the span and moves on to the next
            // message; a production consumer might also re-throw or dead-letter it.
            console.error('Error processing message:', error);
          } finally {
            // 3. Always end the span.
            span.end();
          }
        });
      }
    })().catch(err => console.error(`Subscription to ${subject} failed`, err));
  }
}
Making the Test Pass
With the implementation in place, we need to wire it into our test file. A true integration test would run the real nats.js client against a running NATS server, which gives the highest confidence. To keep this test fast and self-contained, we instead mock the NatsConnection and simulate message delivery directly.
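The mock that follows delivers exactly one message per subscription, which is enough for these tests. If later tests need several messages, a small queue-backed async iterable is a more faithful stand-in for a nats.js Subscription, which is also consumed with for await. Here is a sketch, where MessageQueue is a hypothetical test helper and not part of nats.js:

```typescript
// Hypothetical test helper: an async iterable that yields pushed items in
// order and finishes once closed and drained.
class MessageQueue<T> implements AsyncIterable<T> {
  private readonly buffered: T[] = [];
  private readonly waiting: Array<(result: IteratorResult<T>) => void> = [];
  private closed = false;

  push(item: T): void {
    if (this.closed) throw new Error('queue is closed');
    const waiter = this.waiting.shift();
    // Hand the item straight to a pending consumer, or buffer it.
    if (waiter) waiter({ value: item, done: false });
    else this.buffered.push(item);
  }

  close(): void {
    this.closed = true;
    // Release every consumer still waiting for a message.
    for (const waiter of this.waiting.splice(0)) waiter({ value: undefined, done: true });
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> => {
        // Drain buffered items before reporting completion.
        if (this.buffered.length > 0) {
          return Promise.resolve({ value: this.buffered.shift() as T, done: false });
        }
        if (this.closed) return Promise.resolve({ value: undefined, done: true });
        return new Promise((resolve) => this.waiting.push(resolve));
      },
    };
  }
}
```

With this helper, a mocked subscribe could return a fresh MessageQueue, and the mocked publish could push({ data, headers }) into the queue registered for that subject.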
Updated src/TraceableNatsClient.test.ts
import { setupTestTracer } from './testing/tracing';
import * as api from '@opentelemetry/api';
import { TraceableNatsClient } from './TraceableNatsClient';
import { NatsConnection, MsgHdrs } from 'nats';

type MockMsg = { data: Uint8Array; headers?: MsgHdrs };

// A simple mock for the NATS connection: one subscriber, one delivered message.
const createMockNatsConnection = () => {
  let handler: ((msg: MockMsg) => void) | undefined;
  return {
    publish: jest.fn((subject: string, payload: Uint8Array, options?: { headers?: MsgHdrs }) => {
      // When publish is called, immediately deliver the message to the mocked subscriber
      if (handler) {
        handler({ data: payload, headers: options?.headers });
      }
    }),
    subscribe: jest.fn().mockImplementation(() => ({
      [Symbol.asyncIterator]: async function* () {
        // This allows the for-await-of loop in our implementation to work.
        // The generator waits until `publish` resolves this promise, then yields once.
        yield await new Promise<MockMsg>(resolve => {
          handler = resolve;
        });
      },
    })),
    // Add any other methods from NatsConnection that might be called.
    closed: jest.fn().mockResolvedValue(undefined),
  };
};

describe('TraceableNatsClient', () => {
  const { exporter, provider } = setupTestTracer();
  const tracer = provider.getTracer('test-tracer');
  let mockNatsConn: ReturnType<typeof createMockNatsConnection>;
  let traceableClient: TraceableNatsClient;

  beforeEach(() => {
    exporter.reset();
    mockNatsConn = createMockNatsConnection();
    traceableClient = new TraceableNatsClient(mockNatsConn as unknown as NatsConnection);
  });

  it('should propagate trace context from publisher to subscriber', async () => {
    const subject = 'test.subject';
    const payload = 'hello world';

    // Arrange: Set up a subscriber
    await traceableClient.subscribe(subject, (receivedPayload) => {
      expect(receivedPayload).toBe(payload);
      // The code inside this handler should run within the consumer span's context.
      // So starting a new span here should make it a child of the consumer span.
      const grandchildSpan = tracer.startSpan('test-grandchild-span');
      grandchildSpan.end();
    });

    // Act: Publish a message within a parent span context
    const parentSpan = tracer.startSpan('test-parent-span');
    await api.context.with(api.trace.setSpan(api.context.active(), parentSpan), () => {
      traceableClient.publish(subject, payload);
    });
    parentSpan.end();

    // A small delay to ensure async processing completes in the mock.
    await new Promise(resolve => setTimeout(resolve, 10));

    // Assert
    const spans = exporter.getFinishedSpans();
    expect(spans).toHaveLength(3); // parent, consumer, grandchild
    const parent = spans.find(s => s.name === 'test-parent-span');
    const consumer = spans.find(s => s.name === `${subject} process`);
    const grandchild = spans.find(s => s.name === 'test-grandchild-span');
    expect(parent).toBeDefined();
    expect(consumer).toBeDefined();
    expect(grandchild).toBeDefined();

    // The key assertions for our propagation logic
    expect(consumer!.parentSpanId).toEqual(parent!.spanContext().spanId);
    expect(consumer!.spanContext().traceId).toEqual(parent!.spanContext().traceId);

    // Verify the context was correctly activated in the handler
    expect(grandchild!.parentSpanId).toEqual(consumer!.spanContext().spanId);
    expect(grandchild!.spanContext().traceId).toEqual(parent!.spanContext().traceId);

    expect(mockNatsConn.publish).toHaveBeenCalledTimes(1);
    const publishOptions = mockNatsConn.publish.mock.calls[0][2];
    expect(publishOptions?.headers?.has('traceparent')).toBe(true);
  });
});
With this detailed test and the full implementation, running npm test now yields a pass. We have programmatically proven that our context propagation logic works as intended.
TDD Cycle 2: Handling Messages Without a Trace Context
A pragmatic engineer knows systems don’t always operate under ideal conditions. What if a message is published to NATS from a legacy service that isn’t instrumented? Our subscriber should not fail. It should gracefully handle the absence of trace headers and simply start a new trace.
Here is the test for this scenario:
src/TraceableNatsClient.test.ts
// ... inside the describe block ...
it('should start a new trace if no context is present in the message', async () => {
  const subject = 'untraced.subject';
  const payload = 'legacy message';

  // Arrange: Subscribe like normal
  await traceableClient.subscribe(subject, (receivedPayload) => {
    expect(receivedPayload).toBe(payload);
    // Create a span inside the handler
    tracer.startSpan('span-inside-handler').end();
  });

  // Act: Publish with no active span. The propagator has nothing to inject,
  // so the message arrives without a `traceparent` header.
  traceableClient.publish(subject, payload);
  await new Promise(resolve => setTimeout(resolve, 10));

  // Assert
  const publishOptions = mockNatsConn.publish.mock.calls[0][2];
  expect(publishOptions?.headers?.has('traceparent')).toBe(false);

  const spans = exporter.getFinishedSpans();
  expect(spans).toHaveLength(2); // The consumer span and the one inside the handler
  const consumerSpan = spans.find(s => s.name === `${subject} process`);
  expect(consumerSpan).toBeDefined();
  // The critical assertion: This span should be a root span, having no parent.
  expect(consumerSpan!.parentSpanId).toBeUndefined();
  const innerSpan = spans.find(s => s.name === 'span-inside-handler');
  expect(innerSpan).toBeDefined();
  expect(innerSpan!.parentSpanId).toEqual(consumerSpan!.spanContext().spanId);
});
This test passes without any code changes. The OpenTelemetry API’s propagation.extract method is designed to handle this case. When the propagator finds no traceparent header, it returns the context it was given, unchanged; here that is the empty root context. tracer.startSpan then has no parent to link to, so it creates a new root span. This is a great example of where relying on a well-designed library pays off. The “pitfall” here would have been to parse the headers by hand and fail to account for their absence. TDD confirms the behavior is correct.
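The fallback described above can be spelled out without the library. In this sketch, which is a simplified illustration rather than OpenTelemetry’s code, a consumer either continues the incoming trace or starts a fresh one with a random 16-byte trace ID. The names startConsumerSpanIds and ConsumerSpanIds are hypothetical, a Map stands in for the header object, and the regex only accepts version-00 headers.

```typescript
import { randomBytes } from 'node:crypto';

// Hypothetical result type: the IDs a consumer span would be created with.
interface ConsumerSpanIds {
  traceId: string;
  spanId: string;
  parentSpanId?: string; // undefined means a root span of a new trace
}

const TRACEPARENT_RE = /^00-([0-9a-f]{32})-([0-9a-f]{16})-[0-9a-f]{2}$/;
const ALL_ZEROS = /^0+$/;

// `headers` is undefined when the publisher attached no headers at all.
function startConsumerSpanIds(headers: Map<string, string> | undefined): ConsumerSpanIds {
  const spanId = randomBytes(8).toString('hex'); // 16 hex characters
  const match = TRACEPARENT_RE.exec(headers?.get('traceparent') ?? '');
  if (match && !ALL_ZEROS.test(match[1]) && !ALL_ZEROS.test(match[2])) {
    // Continue the producer's trace: same trace ID, producer span as parent.
    return { traceId: match[1], spanId, parentSpanId: match[2] };
  }
  // No usable context: this span becomes the root of a brand-new trace.
  return { traceId: randomBytes(16).toString('hex'), spanId };
}
```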
Full System Integration and Visualization
While TDD gives us confidence in the logic, seeing the end-to-end result is the final validation. We’ll set up a small project with two services, api-gateway and email-service, that communicate over NATS. We’ll also configure a Zipkin exporter.
A docker-compose.yml is needed to run the dependencies:
version: '3.8'
services:
  nats:
    image: nats:2.10
    ports:
      - "4222:4222"
  zipkin:
    image: openzipkin/zipkin:latest
    ports:
      - "9411:9411"
A shared tracing setup file is crucial for consistency.
src/tracing.ts
import { NodeSDK } from '@opentelemetry/sdk-node';
import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks';
import { ZipkinExporter } from '@opentelemetry/exporter-zipkin';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-node';

export function configureTracing(serviceName: string) {
  const sdk = new NodeSDK({
    resource: new Resource({
      [SemanticResourceAttributes.SERVICE_NAME]: serviceName,
    }),
    // ZipkinExporter posts to http://localhost:9411/api/v2/spans by default.
    // SimpleSpanProcessor exports each span as it ends, which suits a demo;
    // BatchSpanProcessor is the usual choice under real load.
    spanProcessor: new SimpleSpanProcessor(new ZipkinExporter()),
    contextManager: new AsyncHooksContextManager(),
    // NodeSDK names this option `textMapPropagator`, not `propagator`.
    textMapPropagator: new W3CTraceContextPropagator(),
  });
  sdk.start();
  console.log(`Tracing configured for service: ${serviceName}`);

  const shutdown = () => {
    sdk.shutdown()
      .then(() => console.log('Tracing terminated'))
      .catch((error) => console.log('Error terminating tracing', error))
      .finally(() => process.exit(0));
  };
  // Flush spans on `docker stop` (SIGTERM) as well as Ctrl+C (SIGINT).
  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);
}
Now, the two services using our TraceableNatsClient.
src/service-a.ts
import { connect } from 'nats';
import { configureTracing } from './tracing';
import { TraceableNatsClient } from './TraceableNatsClient';
import * as api from '@opentelemetry/api';

async function main() {
  configureTracing('api-gateway');
  const tracer = api.trace.getTracer('api-gateway-tracer');

  const nc = await connect({ servers: 'nats://localhost:4222' });
  const traceableClient = new TraceableNatsClient(nc);
  console.log('API Gateway connected to NATS');

  // Simulate an incoming HTTP request every 5 seconds
  setInterval(() => {
    const span = tracer.startSpan('POST /users/register');
    api.context.with(api.trace.setSpan(api.context.active(), span), () => {
      console.log('API Gateway: Handling user registration, publishing event.');
      const userEmail = `user-${Date.now()}@example.com`;
      // publish() runs inside this span's context, so the span's
      // traceparent is injected into the message headers.
      traceableClient.publish('user.registered', JSON.stringify({ email: userEmail }));
      span.end();
    });
  }, 5000);
}

main().catch(console.error);
src/service-b.ts
import { connect } from 'nats';
import { configureTracing } from './tracing';
import { TraceableNatsClient } from './TraceableNatsClient';

async function main() {
  configureTracing('email-service');

  const nc = await connect({ servers: 'nats://localhost:4222' });
  const traceableClient = new TraceableNatsClient(nc);
  console.log('Email Service connected to NATS, subscribing to user.registered');

  await traceableClient.subscribe('user.registered', async (payload) => {
    // This handler runs inside a new span created by our traceable client
    const data = JSON.parse(payload);
    console.log(`Email Service: Received registration for ${data.email}. Sending email...`);
    // Simulate async work like a database call or external API call
    await new Promise(resolve => setTimeout(resolve, 150));
    console.log(`Email Service: Email sent to ${data.email}.`);
  });
}

main().catch(console.error);
After running docker-compose up and then starting both services, the Zipkin UI at localhost:9411 will show the complete, correlated trace, crossing the process and asynchronous NATS boundary seamlessly.
sequenceDiagram
    participant User
    participant ServiceA as api-gateway
    participant NATS
    participant ServiceB as email-service
    participant Zipkin
    User->>ServiceA: POST /users/register
    activate ServiceA
    Note over ServiceA: Start Span A ("POST /users/register")
    ServiceA->>NATS: PUBLISH user.registered (with Trace Context of A)
    deactivate ServiceA
    NATS-->>ServiceB: DELIVER user.registered
    activate ServiceB
    Note over ServiceB: Extract Trace Context from headers
    Note over ServiceB: Start Span B ("user.registered process")<br/>Parent: Span A
    ServiceB->>ServiceB: Simulate sending email
    Note over ServiceB: End Span B
    deactivate ServiceB
    ServiceA->>Zipkin: Report Span A
    ServiceB->>Zipkin: Report Span B
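Each service reports its spans to Zipkin independently. The UI can stitch them into one trace only because the spans share a trace ID and the consumer span carries the producer’s span ID as its parent. The sketch below groups spans by those fields. Its field names follow Zipkin’s v2 JSON span model (traceId, id, parentId, name, localEndpoint.serviceName), but buildTraceTrees is a hypothetical illustration, not Zipkin’s implementation.

```typescript
// Subset of Zipkin's v2 span JSON model used here.
interface ZipkinSpan {
  traceId: string;
  id: string;
  parentId?: string;
  name: string;
  localEndpoint?: { serviceName?: string };
}

interface TraceNode {
  span: ZipkinSpan;
  children: TraceNode[];
}

// Groups spans by trace ID and links each span under its parent, regardless
// of which service reported it. Spans whose parent is missing become roots.
function buildTraceTrees(spans: ZipkinSpan[]): Map<string, TraceNode[]> {
  const nodes = new Map<string, TraceNode>();
  for (const span of spans) nodes.set(`${span.traceId}/${span.id}`, { span, children: [] });

  const roots = new Map<string, TraceNode[]>();
  nodes.forEach((node) => {
    const { traceId, parentId } = node.span;
    const parent = parentId ? nodes.get(`${traceId}/${parentId}`) : undefined;
    if (parent) {
      parent.children.push(node);
    } else {
      const list = roots.get(traceId) ?? [];
      list.push(node);
      roots.set(traceId, list);
    }
  });
  return roots;
}
```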
The presented solution, built via TDD, provides a robust foundation for observability in a NATS-based architecture. However, it is not without limitations. The current TraceableNatsClient is a simple wrapper; in a larger framework, this logic would be better encapsulated as middleware that could be applied more declaratively. Furthermore, while W3C Trace Context is a strong default, systems that must interoperate with existing Zipkin-instrumented services may need the B3 propagation format instead, which OpenTelemetry supports through a separate propagator package. The next logical iteration would be to extend this pattern to NATS JetStream. There, message acknowledgements and retry logic make the lifecycle of a consumer span more complex and call for careful state management, so that spans accurately reflect each processing attempt of a message.