The system had grown beyond our mental models. What started as a handful of Node.js microservices had metastasized into a web of over fifty, and our ability to reason about its behavior during incidents was collapsing. Our existing observability stack—Loki for logs, OpenSearch for traces—was effective for isolated debugging but offered no holistic view. A cascading failure originating from a downstream dependency could take hours to untangle, chasing trace_ids across a dozen OpenSearch Dashboards tabs. The fundamental pain point was our inability to answer a simple question quickly: “What does our service topology actually look like right now, and where are the performance hotspots?”
Our initial concept was to leverage the OpenTelemetry traces we were already collecting. Every parent-child span relationship within a trace implicitly defines a directed edge between two services. If we could process this stream of traces in near-real-time, we could construct a living graph of our architecture. This wasn’t about static, manually drawn diagrams; this was about automated, data-driven discovery.
The technology selection process was driven by pragmatism, not hype. For the graph itself, a relational database felt wrong. Modeling transitive dependencies would involve painful recursive queries. We evaluated several graph databases, ultimately settling on Dgraph. Its GraphQL-native API was a major draw for our frontend and backend teams, and its performance characteristics for the kind of deep traversal queries we anticipated (e.g., “find all services three hops away from the api-gateway”) were superior in our benchmarks. A common mistake here is to shoehorn graph-shaped problems into relational models; we chose to use the right tool for the job.
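To make that concrete, here is a rough sketch, in Dgraph's query language (DQL), of the kind of traversal we wanted to be fast, written against the service_name and calls predicates introduced later in Phase 3 (a production query would more likely use @recurse than hand-nested blocks):
{
  downstream(func: eq(service_name, "api-gateway")) {
    service_name
    calls {
      service_name
      calls {
        service_name
        calls {
          service_name
        }
      }
    }
  }
}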
We made a conscious decision to keep OpenSearch. Dgraph would store the topological structure—the nodes (services) and edges (calls)—but not the high-cardinality raw trace data. OpenSearch excels at searching and aggregating this raw data. Our architecture would therefore be dual-store: OpenSearch as the source of truth for trace events, Dgraph as the derived, queryable model of the service graph.
The front-end and API layer was straightforward. Our teams are fluent in Node.js, so an Express.js server was the path of least resistance. It would serve a simple UI and provide an API to query the graph data from Dgraph.
The most contentious decision was visualization. The team wanted to overlay performance metrics, like p99 latency, as a heatmap onto the service graph. The obvious path was a complex client-side library like D3.js. However, this represented a significant development effort we couldn’t afford. A senior data engineer proposed a contrarian solution: a small Python script using networkx for graph layout and seaborn for plotting. The Express.js backend could invoke this script on-demand to generate a high-quality, information-dense static PNG. It’s not interactive, but it delivers the core value—a visual hotspot analysis—with a fraction of the implementation cost. In a real-world project, this kind of trade-off, sacrificing glamour for velocity and utility, is critical.
Loki’s role was not diminished. It remained our primary interface for log exploration. The dashboard we planned would integrate with it by providing deep links from each service node in the graph directly to a pre-filtered Loki query, bridging the gap between the high-level topological view and the low-level log data.
The overall data flow looks like this:
graph TD
  subgraph "Microservices"
    A[Express.js Service A] --> B[Express.js Service B]
    B --> C[Express.js Service C]
  end
  subgraph "Data Collection"
    A -- OTel Traces --> OTelCollector
    B -- OTel Traces --> OTelCollector
    C -- OTel Traces --> OTelCollector
    A -- Structured Logs --> Promtail
    B -- Structured Logs --> Promtail
    C -- Structured Logs --> Promtail
  end
  subgraph "Observability Backend"
    OTelCollector --> OpenSearch[(OpenSearch)]
    Promtail --> Loki[(Loki)]
  end
  subgraph "Topology Processor"
    Processor[Trace Processor Service] -- Polls Traces --> OpenSearch
    Processor -- Writes Graph Data --> Dgraph[(Dgraph)]
  end
  subgraph "Dashboard"
    DashboardAPI[Express.js Dashboard API] -- Queries Graph --> Dgraph
    DashboardAPI -- Calls Script --> PythonViz[Seaborn Script]
    PythonViz -- Queries Metrics --> OpenSearch
    User[User] --> DashboardAPI
    DashboardAPI -- Deep Links --> Loki
  end
Phase 1: Instrumenting the Foundation
Everything starts with good telemetry. Our Express.js services needed to be consistently instrumented to produce the structured logs and traces our system would depend on. We created a standard instrumentation wrapper to be included in all new services.
Here is a simplified instrumentation.js file using OpenTelemetry. The key is configuring the exporter to send data to an OTel Collector and ensuring the logger includes trace context.
// instrumentation.js
const opentelemetry = require('@opentelemetry/api');
const { NodeSDK } = require('@opentelemetry/sdk-node');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-grpc');
const { PinoInstrumentation } = require('@opentelemetry/instrumentation-pino');
// Configure the OTLP exporter to point to your OTel Collector
const traceExporter = new OTLPTraceExporter({
// url: 'grpc://otel-collector:4317', // In production, use your collector's address
});
const sdk = new NodeSDK({
traceExporter,
instrumentations: [
getNodeAutoInstrumentations({
// We disable fs instrumentation as it's too noisy for our use case
'@opentelemetry/instrumentation-fs': {
enabled: false,
},
}),
// Ensure logs are correlated with traces
new PinoInstrumentation(),
],
// The service name is crucial for identifying nodes in our graph
serviceName: process.env.SERVICE_NAME || 'unknown-service',
});
// Graceful shutdown
process.on('SIGTERM', () => {
sdk.shutdown()
.then(() => console.log('Tracing terminated'))
.catch((error) => console.log('Error terminating tracing', error))
.finally(() => process.exit(0));
});
try {
sdk.start();
console.log('OpenTelemetry SDK started successfully.');
} catch (error) {
console.error('Error starting OpenTelemetry SDK:', error);
}
// Helper to get trace context for manual logging
const getTraceContext = () => {
const span = opentelemetry.trace.getSpan(opentelemetry.context.active());
if (!span) {
return {};
}
const spanContext = span.spanContext();
return {
trace_id: spanContext.traceId,
span_id: spanContext.spanId,
};
};
module.exports = { getTraceContext };
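Note that this file must be loaded before anything else in the process so the auto-instrumentation can patch modules as they are imported; either require it at the very top of the service entrypoint (as the sample below does) or preload it with node --require ./instrumentation.js server.js.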
And a sample server.js for an Express service showing how this is used alongside a structured logger like Pino, which outputs to stdout for Promtail to collect.
// server.js for 'user-service'
// This must be required before any other module
require('./instrumentation');
const express = require('express');
const axios = require('axios');
const pino = require('pino');
const { getTraceContext } = require('./instrumentation');
const app = express();
const PORT = process.env.PORT || 3001;
const AUTH_SERVICE_URL = process.env.AUTH_SERVICE_URL || 'http://localhost:3002';
// Standard pino logger. It will be auto-instrumented to add trace context.
const logger = pino({ level: 'info' });
app.get('/user/:id', async (req, res) => {
const userId = req.params.id;
logger.info({ userId }, `Fetching user data for user ${userId}`);
try {
// This outgoing HTTP call will be automatically traced by OpenTelemetry
const authResponse = await axios.get(`${AUTH_SERVICE_URL}/auth/check/${userId}`, {
headers: { 'Content-Type': 'application/json' }
});
if (authResponse.data.authorized) {
logger.info({ userId }, 'Authorization successful');
res.status(200).json({ id: userId, name: `User ${userId}`, email: `user${userId}@example.com` });
} else {
logger.warn({ userId }, 'Authorization failed');
res.status(403).json({ error: 'Forbidden' });
}
} catch (error) {
// Log the error with trace context for better debugging
const traceContext = getTraceContext();
logger.error({ err: error.message, ...traceContext }, `Error processing request for user ${userId}`);
res.status(500).json({ error: 'Internal Server Error' });
}
});
// Basic health check
app.get('/health', (req, res) => {
res.status(200).send('OK');
});
// A simple error handling middleware
app.use((err, req, res, next) => {
const traceContext = getTraceContext();
logger.error({ err, ...traceContext }, 'An unhandled error occurred');
res.status(500).send('Something broke!');
});
app.listen(PORT, () => {
logger.info(`User service listening on port ${PORT}`);
});
With this setup, every log line related to a request has a trace_id, and every HTTP call between services generates a span that OpenSearch will receive. This is the raw material for our graph.
Phase 2: The Trace Processor
This is the heart of the system: a standalone Node.js application that polls OpenSearch for recent traces, parses them to find service-to-service calls, and then writes this dependency information to Dgraph. A common pitfall is processing each span individually; a much more robust approach is to query for completed traces and process all spans of a trace together.
// processor/index.js
const { Client } = require('@opensearch-project/opensearch');
const dgraph = require('dgraph-js-http');
const OPENSEARCH_HOST = process.env.OPENSEARCH_HOST || 'http://localhost:9200';
const DGRAPH_HOST = process.env.DGRAPH_HOST || 'http://localhost:8080';
const POLLING_INTERVAL_MS = 30000; // 30 seconds
const LOOKBACK_MINUTES = 5;
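// Note: a 5-minute lookback polled every 30 seconds means consecutive windows
// overlap, so the same spans can be processed more than once. Dedupe by span id
// or track a high-water-mark timestamp if exact per-window counts matter.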
const osClient = new Client({ node: OPENSEARCH_HOST });
const dgraphClient = new dgraph.DgraphClient(
new dgraph.DgraphClientStub(DGRAPH_HOST)
);
async function pollAndProcessTraces() {
console.log('Polling OpenSearch for new traces...');
try {
const now = new Date();
const lookback = new Date(now.getTime() - LOOKBACK_MINUTES * 60 * 1000);
const response = await osClient.search({
index: 'otel-v1-apm-span-*', // Adjust to your index pattern
body: {
size: 10000, // Process up to 10k spans per run
query: {
range: {
startTime: {
gte: lookback.toISOString(),
lt: now.toISOString(),
},
},
},
// Sort to process traces in chronological order
sort: [{ "startTime": "asc" }]
},
});
const spans = response.body.hits.hits.map(hit => hit._source);
if (spans.length === 0) {
console.log('No new spans found.');
return;
}
const traces = groupSpansByTraceId(spans);
console.log(`Found ${spans.length} spans across ${Object.keys(traces).length} traces.`);
const edges = extractEdgesFromTraces(traces);
if (edges.size > 0) {
await upsertEdgesToDgraph(edges);
}
} catch (error) {
console.error('Error during trace processing:', error.meta ? error.meta.body : error);
}
}
function groupSpansByTraceId(spans) {
return spans.reduce((acc, span) => {
const traceId = span.traceId;
if (!acc[traceId]) {
acc[traceId] = [];
}
acc[traceId].push(span);
return acc;
}, {});
}
function extractEdgesFromTraces(traces) {
const edges = new Map();
for (const traceId in traces) {
const spans = traces[traceId];
const spanMap = new Map(spans.map(s => [s.spanId, s]));
for (const span of spans) {
// We are interested in SERVER spans, as they represent a service receiving a call.
// The parent of a SERVER span is typically a CLIENT span from the calling service.
if (span.kind !== 'SPAN_KIND_SERVER' || !span.parentSpanId) {
continue;
}
const parentSpan = spanMap.get(span.parentSpanId);
if (!parentSpan || parentSpan.kind !== 'SPAN_KIND_CLIENT') {
continue;
}
// Extract service names from resource attributes
const sourceService = parentSpan.resource.attributes['service.name'];
const destinationService = span.resource.attributes['service.name'];
if (sourceService && destinationService && sourceService !== destinationService) {
const edgeKey = `${sourceService}->${destinationService}`;
if (!edges.has(edgeKey)) {
edges.set(edgeKey, { source: sourceService, destination: destinationService, count: 0 });
}
edges.get(edgeKey).count++;
}
}
}
return edges;
}
async function upsertEdgesToDgraph(edges) {
  console.log(`Upserting ${edges.size} edges to Dgraph.`);
  // Build a single upsert block: one pair of query variables per edge, plus a
  // JSON mutation per edge that references those variables.
  const queryParts = [];
  const mutations = [];
  let i = 0;
  for (const edge of edges.values()) {
    const { source, destination, count } = edge;
    queryParts.push(`s${i} as var(func: eq(service_name, "${source}"))`);
    queryParts.push(`d${i} as var(func: eq(service_name, "${destination}"))`);
    // Each mutation:
    // 1. Creates the source service node if it doesn't exist (`service_name` is the upsert key).
    // 2. Creates the destination service node if it doesn't exist.
    // 3. Creates (or re-uses) the `calls` edge from source to destination.
    // 4. Writes the observed call count as the `call_count` facet on that edge.
    //    Writing a facet replaces its previous value, so this records the count for
    //    the current window; a cumulative total would require reading the existing
    //    facet first and writing the sum, or aggregating per-window counts at query time.
    mutations.push({
      uid: `uid(s${i})`,
      'dgraph.type': 'Service',
      service_name: source,
      calls: {
        uid: `uid(d${i})`,
        'dgraph.type': 'Service',
        service_name: destination,
        'calls|call_count': count,
      },
    });
    i++;
  }
  const query = `query {\n${queryParts.join('\n')}\n}`;
  const txn = dgraphClient.newTxn();
  try {
    // One transaction pairs the query with all mutations; adjust the request
    // shape here to the Dgraph client library you are using.
    await txn.doRequest({
      query,
      mutations: mutations.map((setJson) => ({ setJson })),
      commitNow: true,
    });
    console.log('Successfully upserted data to Dgraph.');
  } catch (err) {
    console.error('Error during Dgraph transaction:', err);
  } finally {
    await txn.discard();
  }
}
// Initial run and then set interval
pollAndProcessTraces();
setInterval(pollAndProcessTraces, POLLING_INTERVAL_MS);
Phase 3: Dgraph Schema and Data Model
Before the processor can write data, we need a schema in Dgraph. We opted for a simple model: a Service type with a service_name predicate (which doubles as our upsert key) and a calls edge that points to other Service nodes. The edge itself carries a call_count facet. The DQL schema:
# Dgraph DQL schema, applied via Dgraph's /alter endpoint.
# The exact index on service_name supports the eq() lookups in the processor's
# upserts; @reverse gives us the "called by" direction (~calls) for free.
# Facets such as call_count need no schema declaration.
service_name: string @index(exact) .
calls: [uid] @reverse .

type Service {
  service_name
  calls
}
Once the schema is applied, the processor's upsert logic uses Dgraph's upsert block (a query paired with a mutation in a single request) to create service nodes only when they don't already exist and to write the call_count facet on the calls edge between them.
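As a concrete, hypothetical example, the upsert for a single edge from user-service to auth-service with 17 observed calls corresponds to the following raw DQL, with the facet written in parentheses on the triple:
upsert {
  query {
    s as var(func: eq(service_name, "user-service"))
    d as var(func: eq(service_name, "auth-service"))
  }
  mutation {
    set {
      uid(s) <dgraph.type> "Service" .
      uid(s) <service_name> "user-service" .
      uid(d) <dgraph.type> "Service" .
      uid(d) <service_name> "auth-service" .
      uid(s) <calls> uid(d) (call_count=17) .
    }
  }
}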
Phase 4: The Dashboard API Layer
The Express.js API server has two primary responsibilities: querying Dgraph for the topology and orchestrating the on-demand visualization generation.
// dashboard-api/server.js
const express = require('express');
const { spawn } = require('child_process');
const dgraph = require('dgraph-js-http');
const app = express();
const PORT = process.env.PORT || 4000;
const DGRAPH_HOST = process.env.DGRAPH_HOST || 'http://localhost:8080';
const LOKI_HOST = process.env.LOKI_HOST || 'http://localhost:3100';
const dgraphClient = new dgraph.DgraphClient(
new dgraph.DgraphClientStub(DGRAPH_HOST)
);
// Endpoint to fetch the entire service topology graph
app.get('/api/topology', async (req, res) => {
const query = `
query allServices {
services(func: has(service_name)) {
uid
service_name
calls @facets(call_count: call_count) {
uid
service_name
}
}
}
`;
try {
const dgraphRes = await dgraphClient.newTxn({ readOnly: true }).query(query);
const data = dgraphRes.data;
    // Add a deep link to a pre-filtered log view for each service. The LogQL
    // selector is the useful part; the URL shape is a placeholder and depends on
    // how you expose Loki (typically via Grafana Explore), so adjust as needed.
    data.services.forEach(service => {
      const logql = `{job="${service.service_name}"}`;
      service.loki_url = `${LOKI_HOST}/explore?q=${encodeURIComponent(logql)}`;
    });
res.json(data);
} catch (error) {
console.error("Error querying Dgraph:", error);
res.status(500).json({ error: 'Failed to fetch topology from Dgraph' });
}
});
// Endpoint to generate and stream a visualization
app.get('/api/visualize/latency-heatmap', (req, res) => {
const timeRange = req.query.range || '1h'; // default to 1 hour
console.log(`Generating latency heatmap for time range: ${timeRange}`);
const pythonProcess = spawn('python3', ['./visualizer/main.py', timeRange]);
// The Python script will write the PNG image data to its stdout
res.setHeader('Content-Type', 'image/png');
pythonProcess.stdout.pipe(res);
  // Error handling is crucial for spawned processes
  pythonProcess.on('error', (err) => {
    // Fires if the process could not be spawned at all (e.g. python3 not on PATH)
    console.error('Failed to start Python visualizer:', err);
    if (!res.headersSent) {
      res.status(500).send('Failed to generate visualization');
    }
  });
  pythonProcess.stderr.on('data', (data) => {
    console.error(`[Python Visualizer STDERR]: ${data}`);
  });
  pythonProcess.on('close', (code) => {
    if (code !== 0) {
      console.error(`Python visualizer script exited with code ${code}`);
      if (!res.headersSent) {
        res.status(500).send('Failed to generate visualization');
      }
      return;
    }
    console.log('Visualization generated successfully.');
  });
});
app.listen(PORT, () => {
console.log(`Dashboard API server listening on port ${PORT}`);
});
The pitfall to avoid here is blocking the Node.js event loop. By using spawn and piping the output, we stream the image data efficiently without buffering the entire file in memory.
Phase 5: The Python Visualization Core
This script is the pragmatic core of our visualization strategy. It connects to both Dgraph (for topology) and OpenSearch (for performance metrics), builds a graph with networkx, and plots it with matplotlib and seaborn.
# visualizer/main.py
import sys
import os
import json
import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns
from opensearchpy import OpenSearch
from pydgraph import DgraphClient, DgraphClientStub
from datetime import datetime, timedelta
import io
OPENSEARCH_HOST = os.getenv('OPENSEARCH_HOST', 'http://localhost:9200')
# pydgraph talks to Dgraph Alpha over gRPC (default port 9080), not the HTTP port
DGRAPH_GRPC = os.getenv('DGRAPH_GRPC', 'localhost:9080')
def get_topology_from_dgraph(client):
query = """
query {
services(func: has(service_name)) {
source: service_name
calls {
destination: service_name
}
}
}
"""
res = client.txn(read_only=True).query(query)
data = json.loads(res.json)
edges = []
for service in data.get('services', []):
source = service.get('source')
for call in service.get('calls', []):
destination = call.get('destination')
if source and destination:
edges.append((source, destination))
return edges
def get_latency_p99_from_opensearch(client, source, destination, time_range):
# This is a simplified query; a production one would be more robust.
# It calculates the p99 latency for spans where a client (source)
# calls a server (destination).
now = datetime.utcnow()
# Simple time range parsing, e.g., '1h', '6h'
hours = int(time_range.replace('h', ''))
start_time = now - timedelta(hours=hours)
query = {
"size": 0,
"query": {
"bool": {
"must": [
{"term": {"resource.attributes.service.name": destination}},
{"term": {"kind": "SPAN_KIND_SERVER"}},
{"exists": {"field": "parentSpanId"}},
{
"range": {
"startTime": {
"gte": start_time.isoformat() + "Z",
"lt": now.isoformat() + "Z"
}
}
}
]
}
},
"aggs": {
"latency_percentiles": {
"percentiles": {
"field": "durationInNanos",
"percents": [99]
}
}
}
}
# A real implementation would need to correlate parent spans to confirm the source.
# This is simplified for brevity but highlights the concept.
try:
response = client.search(index="otel-v1-apm-span-*", body=query)
p99_value_ns = response['aggregations']['latency_percentiles']['values']['99.0']
return p99_value_ns / 1_000_000 if p99_value_ns else 0 # convert to ms
except Exception:
return 0 # Default to 0 if no data or error
def main(time_range):
# 1. Fetch topology from Dgraph
    # pydgraph connects to Dgraph Alpha over gRPC (see DGRAPH_GRPC above)
    dgraph_stub = DgraphClientStub(DGRAPH_GRPC)
dgraph_client = DgraphClient(dgraph_stub)
edges = get_topology_from_dgraph(dgraph_client)
if not edges:
# Handle empty graph case
fig, ax = plt.subplots(figsize=(10, 8))
ax.text(0.5, 0.5, 'No service topology data available.', ha='center', va='center')
ax.set_axis_off()
fig.savefig(sys.stdout.buffer, format='png')
return
# 2. Fetch performance metrics from OpenSearch
os_client = OpenSearch(OPENSEARCH_HOST)
edge_latencies = {}
for source, dest in edges:
p99 = get_latency_p99_from_opensearch(os_client, source, dest, time_range)
edge_latencies[(source, dest)] = p99
# 3. Build and draw the graph
G = nx.DiGraph()
G.add_edges_from(edges)
pos = nx.spring_layout(G, k=0.5, iterations=50)
plt.figure(figsize=(16, 12))
latencies = [edge_latencies.get(edge, 0) for edge in G.edges()]
# Use seaborn for a nice color palette
cmap = sns.color_palette("coolwarm", as_cmap=True)
    nx.draw_networkx_nodes(G, pos, node_size=3000, node_color="#3498db")
    # Colour edges by latency (avoid reusing the name `edges` here, which would
    # shadow the topology list built above)
    nx.draw_networkx_edges(G, pos, arrowstyle="->",
                           arrowsize=20,
                           edge_color=latencies,
                           edge_cmap=cmap,
                           width=2)
    nx.draw_networkx_labels(G, pos, font_size=10, font_color="white")
    # Add a color bar for the latency heatmap
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(vmin=min(latencies), vmax=max(latencies) or 1))
    sm.set_array([])
    cbar = plt.colorbar(sm, ax=plt.gca())
    cbar.set_label('p99 Latency (ms)', rotation=270, labelpad=20)
plt.title(f'Service Dependency Graph with p99 Latency Heatmap ({time_range})')
plt.axis('off')
# 4. Write image to stdout
plt.savefig(sys.stdout.buffer, format='png', bbox_inches='tight')
dgraph_stub.close()
if __name__ == '__main__':
time_range_arg = sys.argv[1] if len(sys.argv) > 1 else '1h'
main(time_range_arg)
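The script can also be exercised on its own, e.g. python3 visualizer/main.py 6h > heatmap.png, which makes it easy to iterate on the layout without going through the API.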
This system provides an immediate, high-level view of our architecture’s health. When an alert fires for the user-service, the on-call engineer no longer starts by guessing. Their first action is to view the generated heatmap. A bright red edge pointing to the auth-service immediately narrows the investigation, and a single click on the auth-service node takes them directly to the relevant logs in Loki.
The solution is not without its limitations. The on-demand visualization can be slow if the graph is large or the OpenSearch query is complex. A more robust implementation might cache the generated images or pre-compute them on a schedule. The service discovery mechanism is also inherently reactive; it only knows about services and dependencies that have recently handled traffic. Furthermore, the correlation between client and server spans in the OpenSearch query is simplified; a production-grade version would require a more sophisticated aggregation to accurately attribute latency to a specific call signature. These are not deal-breakers, but rather avenues for future iteration as the platform matures.
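As a minimal sketch of the caching idea, assuming a simple in-memory store keyed by time range (the module and helper names here are illustrative, not part of the codebase):
// dashboard-api/cache.js (hypothetical helper)
// Caches generated PNG buffers per time range so repeated dashboard loads within
// the TTL reuse the last render instead of re-running the Python script.
const CACHE_TTL_MS = 60 * 1000;
const cache = new Map(); // timeRange -> { buffer, generatedAt }

function getCachedHeatmap(timeRange) {
  const entry = cache.get(timeRange);
  if (entry && Date.now() - entry.generatedAt < CACHE_TTL_MS) {
    return entry.buffer;
  }
  cache.delete(timeRange);
  return null;
}

function storeHeatmap(timeRange, buffer) {
  cache.set(timeRange, { buffer, generatedAt: Date.now() });
}

module.exports = { getCachedHeatmap, storeHeatmap };
The visualization route would then buffer the script's stdout into memory rather than piping it straight to the response, so a finished image can be stored and reused until it expires.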