Implementing a High-Throughput Distributed Tracing and Metrics Pipeline Using Spring Boot, Consul, and ClickHouse


Our existing observability stack, built on Prometheus and Jaeger, started to buckle under the load of a rapidly expanding microservices architecture. The core of the problem was high cardinality. Tagging metrics with user_id, tenant_id, or order_id caused a cardinality explosion in Prometheus, leading to severe performance degradation and memory pressure. Similarly, querying traces in Jaeger across millions of requests to pinpoint a single user’s slow experience was an exercise in frustration, often timing out. We required sub-second query latency on terabytes of trace and metric data, something those tools simply couldn’t deliver.

This technical pain point forced us to reconsider our approach. The initial concept was to build a custom pipeline, designed from the ground up for high-cardinality data and analytical query performance. The architecture would consist of three main components: a lightweight instrumentation library embedded in our Spring Boot services, a stateless aggregator tier for batching and processing, and a high-performance analytical database as the storage backend.

The technology selection process was rigorous. For the storage layer, ClickHouse was the obvious front-runner. Its columnar storage engine is purpose-built for large-scale analytical queries (OLAP), making it exceptionally fast for the kind of slicing and dicing we needed. Its native support for complex data types like nested structures and maps was perfect for storing trace metadata. Our services are standardized on the Spring Framework, so leveraging Spring Boot for the instrumentation and aggregator components was a natural choice. It provided a rich ecosystem with AOP for transparent instrumentation and mature libraries for building robust services. Finally, for service discovery, we already had a production-hardened Consul cluster. Using Consul would allow our instrumentation agents to dynamically discover the aggregator instances, making the entire observability platform as resilient and scalable as the microservices it monitored.

ClickHouse Schema: The Foundation of Performance

Before a single line of application code was written, we focused on the most critical piece: the ClickHouse schema. A poor schema design would negate all other efforts. In a real-world project, this is where you win or lose the performance battle. The pitfall here is treating ClickHouse like a relational database; its power lies in its MergeTree engine family and understanding how the ORDER BY clause physically sorts data on disk.

Our primary table, traces.spans, was designed to store raw span data.

CREATE TABLE traces.spans (
    `timestamp` DateTime64(9, 'UTC'),
    `trace_id` UUID,
    `span_id` UInt64,
    `parent_span_id` UInt64,
    `service_name` LowCardinality(String),
    `operation_name` String,
    `duration_ms` Float64,
    `is_error` Bool,
    `tags` Map(String, String),
    `logs` Array(Tuple(ts DateTime64(9, 'UTC'), level LowCardinality(String), message String))
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (service_name, operation_name, is_error, timestamp, trace_id)
SETTINGS index_granularity = 8192;

Several key decisions were made here:

  1. PARTITION BY toYYYYMM(timestamp): Partitioning by month is a coarse but effective strategy. It allows ClickHouse to prune entire directories of data for queries that have a time range filter, drastically reducing the amount of data scanned.
  2. ORDER BY (service_name, operation_name, is_error, timestamp, trace_id): This is the most important line in the schema. ClickHouse physically sorts the data on disk according to this tuple. It creates a sparse primary key index based on these columns. This means queries that filter or group by service_name, operation_name, and is_error will be incredibly fast because ClickHouse can jump directly to the relevant data blocks. We put the lowest cardinality columns first to maximize compression and query efficiency. timestamp and trace_id are included to ensure uniqueness and provide good locality for trace reconstruction queries.
  3. LowCardinality(String): For columns like service_name and the level field inside logs, which have a limited set of distinct values, this dictionary-encoded type significantly reduces the storage footprint and speeds up queries involving these fields. A common mistake is to use String for everything, which bloats storage and slows down aggregations.
  4. Map(String, String) for tags: This is how we handle high-cardinality data. Instead of creating a column for every potential tag, we store them in a flexible map. This prevents schema alterations and manages the cardinality explosion gracefully. Specific keys are read with map subscripting, e.g. tags['customer_id']; pairing the map with data-skipping indexes keeps those lookups from scanning whole partitions (see the sketch after this list).
  5. DateTime64(9, 'UTC'): Using nanosecond precision is crucial for accurate duration calculations and ordering of events within a trace. Storing in UTC is a non-negotiable best practice for distributed systems.
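
As flagged in point 4, neither map lookups nor raw trace_id lookups can use the sparse primary index, since neither leads the sort key. A hypothetical refinement we would layer on top (index names and parameters are our own starting points, assuming a reasonably recent ClickHouse release):

ALTER TABLE traces.spans
    ADD INDEX idx_trace_id trace_id TYPE bloom_filter(0.01) GRANULARITY 4;

ALTER TABLE traces.spans
    ADD INDEX idx_tag_keys mapKeys(tags) TYPE bloom_filter(0.01) GRANULARITY 4;

ALTER TABLE traces.spans
    ADD INDEX idx_tag_values mapValues(tags) TYPE bloom_filter(0.01) GRANULARITY 4;

-- Build the new indexes for parts written before the ALTER:
ALTER TABLE traces.spans MATERIALIZE INDEX idx_trace_id;
ALTER TABLE traces.spans MATERIALIZE INDEX idx_tag_keys;
ALTER TABLE traces.spans MATERIALIZE INDEX idx_tag_values;

The mapKeys/mapValues indexes let ClickHouse skip granules for tags[...] predicates, and the trace_id index pays off in the trace-reconstruction query shown at the end of this post.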

Spring Boot Instrumentation with AOP

With the schema defined, we built a custom Spring Boot starter to handle instrumentation. The goal was to make tracing transparent to developers. We chose AspectJ weaving over proxy-based Spring AOP because it instruments method executions directly in the bytecode, so it also covers self-invocations and classes that aren’t Spring beans, and it avoids per-call proxy overhead.

The core of the instrumentation is an aspect that intercepts key communication points.

// src/main/java/com/mycompany/tracing/instrument/TracingAspect.java
package com.mycompany.tracing.instrument;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class TracingAspect {

    private static final Logger log = LoggerFactory.getLogger(TracingAspect.class);
    private final SpanManager spanManager;
    private final SpanHttpHeaderPropagator headerPropagator;

    public TracingAspect(SpanManager spanManager, SpanHttpHeaderPropagator headerPropagator) {
        this.spanManager = spanManager;
        this.headerPropagator = headerPropagator;
    }

    @Pointcut("@within(org.springframework.web.bind.annotation.RestController)")
    public void restController() {}

    @Pointcut("execution(* org.springframework.cloud.openfeign.FeignClient+.*(..))")
    public void feignClient() {}

    @Around("restController() || feignClient()")
    public Object trace(ProceedingJoinPoint joinPoint) throws Throwable {
        // In a real implementation, we'd extract context from headers for incoming requests
        // For simplicity here, we start a new trace if none exists.
        Span span = spanManager.startSpan(joinPoint.getSignature().toShortString());
        
        try (SpanManager.Scope scope = spanManager.activateSpan(span)) {
            // For outgoing Feign calls, inject trace context into headers
            if (isFeignClient(joinPoint)) {
                 headerPropagator.inject(span.getContext(), /* ... get request headers object */);
            }
            
            return joinPoint.proceed();
        } catch (Throwable throwable) {
            span.setError(true);
            span.addTag("error.message", throwable.getMessage());
            throw throwable;
        } finally {
            span.finish();
            log.trace("Finished and reported span: {}", span.getSpanId());
        }
    }
    
    private boolean isFeignClient(ProceedingJoinPoint joinPoint) {
        // Logic to determine if the join point is a Feign client execution
        return joinPoint.getSignature().getDeclaringType().isInterface();
    }
}
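
A caveat on the weaving itself: declared as plain @Aspect @Component beans, this advice would be applied through runtime proxies (Spring AOP’s default with spring-boot-starter-aop). To get the AspectJ load-time weaving described above, the JVM needs the AspectJ agent plus an explicit opt-in. A minimal sketch, assuming the standard Spring mechanism:

// src/main/java/com/mycompany/tracing/instrument/TracingWeavingConfig.java (sketch)
package com.mycompany.tracing.instrument;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableLoadTimeWeaving;

// Requires starting the JVM with -javaagent:/path/to/aspectjweaver.jar and a
// META-INF/aop.xml on the classpath that declares TracingAspect for weaving.
@Configuration
@EnableLoadTimeWeaving
public class TracingWeavingConfig {
}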

The SpanManager is a critical component responsible for creating, activating (via ThreadLocal), and reporting spans to a buffered transport layer.

// src/main/java/com/mycompany/tracing/instrument/SpanManager.java
package com.mycompany.tracing.instrument;

import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

public class SpanManager {

    private static final ThreadLocal<Span> ACTIVE_SPAN = new ThreadLocal<>();
    private final SpanReporter spanReporter;
    private final String serviceName;
    
    // Injected via Spring configuration
    public SpanManager(SpanReporter spanReporter, String serviceName) {
        this.spanReporter = spanReporter;
        this.serviceName = serviceName;
    }

    public Span startSpan(String operationName) {
        Span activeSpan = ACTIVE_SPAN.get();
        Span newSpan;
        if (activeSpan != null) {
            // Create a child span
            newSpan = new Span(
                activeSpan.getContext().getTraceId(),
                generateSpanId(),
                activeSpan.getContext().getSpanId(), // parentId
                serviceName,
                operationName
            );
        } else {
            // Create a new root span for a new trace
            newSpan = new Span(
                UUID.randomUUID(),
                generateSpanId(),
                0L, // No parent
                serviceName,
                operationName
            );
        }
        return newSpan;
    }

    public Scope activateSpan(Span span) {
        // Remember the previously active span so nested spans restore their parent
        Span previous = ACTIVE_SPAN.get();
        ACTIVE_SPAN.set(span);
        return new Scope(span, previous);
    }

    public void report(Span span) {
        spanReporter.report(span); // Hand off to the buffered transport layer
    }

    private long generateSpanId() {
        return ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE);
    }

    // A simple Scope implementation for try-with-resources. It only manages
    // the ThreadLocal; finishing and reporting happen in the aspect's finally
    // block, so a span is never reported before its error tags are set.
    public class Scope implements AutoCloseable {
        private final Span spanToDeactivate;
        private final Span previousSpan;

        private Scope(Span span, Span previous) {
            this.spanToDeactivate = span;
            this.previousSpan = previous;
        }

        @Override
        public void close() {
            if (ACTIVE_SPAN.get() == spanToDeactivate) {
                if (previousSpan != null) {
                    ACTIVE_SPAN.set(previousSpan);
                } else {
                    ACTIVE_SPAN.remove();
                }
            }
        }
    }
}
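
For reference, Span itself is little more than a mutable holder around an immutable identity. A minimal sketch consistent with the calls above (the field names, nested SpanContext, and timing logic are our assumptions):

// src/main/java/com/mycompany/tracing/instrument/Span.java (sketch)
package com.mycompany.tracing.instrument;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class Span {

    // Immutable identity of the span within its trace
    public static class SpanContext {
        private final UUID traceId;
        private final long spanId;
        private final long parentSpanId;

        SpanContext(UUID traceId, long spanId, long parentSpanId) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentSpanId = parentSpanId;
        }

        public UUID getTraceId() { return traceId; }
        public long getSpanId() { return spanId; }
        public long getParentSpanId() { return parentSpanId; }
    }

    private final SpanContext context;
    private final String serviceName;
    private final String operationName;
    private final long startNanos = System.nanoTime();
    private final Map<String, String> tags = new HashMap<>();
    private volatile double durationMs = -1;
    private volatile boolean error;

    public Span(UUID traceId, long spanId, long parentSpanId,
                String serviceName, String operationName) {
        this.context = new SpanContext(traceId, spanId, parentSpanId);
        this.serviceName = serviceName;
        this.operationName = operationName;
    }

    public void addTag(String key, String value) { tags.put(key, value); }
    public void setError(boolean error) { this.error = error; }
    public boolean isError() { return error; }

    // Idempotent: the first call fixes the duration, later calls are no-ops
    public void finish() {
        if (durationMs < 0) {
            durationMs = (System.nanoTime() - startNanos) / 1_000_000.0;
        }
    }

    public SpanContext getContext() { return context; }
    public long getSpanId() { return context.getSpanId(); }
}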

The SpanReporter doesn’t send spans directly over the network. That would be inefficient and add latency to the request thread. Instead, it adds them to a non-blocking queue, and a background thread handles batching and sending.
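A sketch of that reporter, built on the interfaces already shown (the class name and sizing are our own; a real implementation would also drain the buffer on shutdown): spans land in a bounded queue, and a single daemon thread batches them into the transport covered in the next section.

// src/main/java/com/mycompany/tracing/transport/BufferedSpanReporter.java (sketch)
package com.mycompany.tracing.transport;

import com.mycompany.tracing.instrument.Span;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BufferedSpanReporter implements SpanReporter {

    private static final int MAX_BATCH = 500;

    private final BlockingQueue<Span> buffer = new LinkedBlockingQueue<>(10_000);
    private final SpanTransport transport;

    public BufferedSpanReporter(SpanTransport transport) {
        this.transport = transport;
        Thread flusher = new Thread(this::flushLoop, "span-reporter-flusher");
        flusher.setDaemon(true); // never keep the JVM alive just for telemetry
        flusher.start();
    }

    @Override
    public void report(Span span) {
        // Drop rather than block the request thread when the buffer is full
        buffer.offer(span);
    }

    private void flushLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Block for the first span, then grab whatever else is ready
                Span first = buffer.poll(1, TimeUnit.SECONDS);
                if (first == null) {
                    continue;
                }
                List<Span> batch = new ArrayList<>(MAX_BATCH);
                batch.add(first);
                buffer.drainTo(batch, MAX_BATCH - 1);
                transport.send(batch);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Dropping on overflow is a deliberate choice: the one thing an instrumentation library must never do is stall or crash the host service.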

Dynamic Aggregator Discovery via Consul

Our instrumentation agent, now embedded in every microservice, needs to know where to send its batches of spans. Hardcoding aggregator IPs is not an option in a dynamic, cloud-native environment. This is where Consul comes in.

The aggregator services register themselves with Consul using a specific service ID, say observability-aggregator. The agent in each microservice then uses Spring Cloud’s DiscoveryClient to query Consul for healthy instances of this service.

Here’s the configuration in the microservice’s application.yml:

spring:
  application:
    name: my-cool-service
  cloud:
    consul:
      host: consul.my-company.internal
      port: 8500
      discovery:
        service-name: ${spring.application.name}
        instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${server.port}
        # We need this service to be discoverable itself
        register: true
        # We also use Consul for config, but discovery is key here
        
# Custom properties for our tracing starter
tracing:
  service-name: ${spring.application.name}
  aggregator:
    service-id: "observability-aggregator"

The component responsible for finding and sending data to an aggregator instance:

// src/main/java/com/mycompany/tracing/transport/ConsulAwareHttpSpanTransport.java
package com.mycompany.tracing.transport;

import com.mycompany.tracing.instrument.Span;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.client.RestTemplate;

import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsulAwareHttpSpanTransport implements SpanTransport {

    private static final Logger log = LoggerFactory.getLogger(ConsulAwareHttpSpanTransport.class);

    private final DiscoveryClient discoveryClient;
    private final RestTemplate restTemplate;
    private final String aggregatorServiceId;
    private final AtomicInteger counter = new AtomicInteger(0);

    public ConsulAwareHttpSpanTransport(DiscoveryClient discoveryClient, RestTemplate restTemplate, String aggregatorServiceId) {
        this.discoveryClient = discoveryClient;
        this.restTemplate = restTemplate;
        this.aggregatorServiceId = aggregatorServiceId;
    }

    @Override
    public void send(List<Span> spans) {
        URI aggregatorUri = selectAggregatorInstance();
        if (aggregatorUri == null) {
            // A real-world project would have robust error handling:
            // increment a metric, buffer, or deliberately drop the spans.
            log.warn("No healthy aggregator instance found in Consul; dropping {} spans.", spans.size());
            return;
        }

        try {
            // The endpoint on the aggregator would be something like /v1/spans
            restTemplate.postForEntity(aggregatorUri.toString() + "/v1/spans", spans, Void.class);
        } catch (Exception e) {
            // Similarly, handle network errors, retries with backoff, etc.
            System.err.println("Failed to send spans to " + aggregatorUri + ": " + e.getMessage());
        }
    }

    private URI selectAggregatorInstance() {
        List<ServiceInstance> instances = discoveryClient.getInstances(aggregatorServiceId);
        if (instances == null || instances.isEmpty()) {
            return null;
        }
        // Simple round-robin for load balancing. A more advanced strategy
        // could use a library like Resilience4j or Spring Cloud LoadBalancer.
        // floorMod keeps the index non-negative after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        return instances.get(index).getUri();
    }
}

This setup makes our transport layer resilient. If an aggregator instance fails, Consul’s health checks will mark it as unhealthy, and the DiscoveryClient will stop routing traffic to it automatically. We can scale the aggregator tier up or down, and the clients will adapt without any configuration changes.

The Aggregator Service: Buffering and Batching

The aggregator is a simple Spring Boot application whose sole purpose is to receive spans, batch them efficiently, and write them to ClickHouse. A common mistake here is to write to the database one row at a time. This is catastrophic for ClickHouse’s performance, as it creates many small data parts, leading to high merge overhead. Batching is mandatory.

sequenceDiagram
    participant MS as Microservice Agent
    participant Aggregator
    participant ClickHouse

    MS->>+Aggregator: POST /v1/spans (Batch of Spans)
    Aggregator-->>-MS: 202 Accepted
    Aggregator->>Aggregator: Add spans to internal queue (e.g., Disruptor)
    loop Background Writer Thread
        Aggregator->>Aggregator: Dequeue spans, form large batch
        Aggregator->>+ClickHouse: INSERT INTO traces.spans VALUES (...)
        ClickHouse-->>-Aggregator: Acknowledgment
    end

The core logic resides in a background service that consumes from a queue.

// src/main/java/com/mycompany/aggregator/service/ClickHouseWriterService.java
package com.mycompany.aggregator.service;

import com.mycompany.aggregator.model.SpanDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Service
public class ClickHouseWriterService {
    private static final Logger log = LoggerFactory.getLogger(ClickHouseWriterService.class);
    private static final int BATCH_SIZE = 10000;
    private static final String INSERT_SQL = "INSERT INTO traces.spans (timestamp, trace_id, span_id, parent_span_id, service_name, operation_name, duration_ms, is_error, tags, logs) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
    
    private final BlockingQueue<SpanDto> spanQueue = new LinkedBlockingQueue<>(1_000_000);
    private final JdbcTemplate jdbcTemplate;

    public ClickHouseWriterService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public void accept(List<SpanDto> spans) {
        // Called by the REST controller. offer() drops spans when the bounded
        // queue is full instead of throwing (addAll would); a high-throughput
        // system should surface this as a metric and apply backpressure.
        for (SpanDto span : spans) {
            if (!spanQueue.offer(span)) {
                log.warn("Span queue full; dropping span {}", span.getSpanId());
            }
        }
    }
    
    @Scheduled(fixedDelayString = "${aggregator.writer.interval-ms:1000}")
    public void processQueue() {
        // Drains at most BATCH_SIZE spans per tick; for sustained rates above
        // that, shorten the interval or loop here until the queue is empty.
        List<SpanDto> batch = new ArrayList<>(BATCH_SIZE);
        spanQueue.drainTo(batch, BATCH_SIZE);

        if (batch.isEmpty()) {
            return;
        }

        log.info("Writing batch of {} spans to ClickHouse.", batch.size());
        
        try {
            jdbcTemplate.batchUpdate(INSERT_SQL, batch, BATCH_SIZE,
                (PreparedStatement ps, SpanDto span) -> {
                    ps.setTimestamp(1, Timestamp.from(Instant.ofEpochSecond(0, span.getTimestampNanos())));
                    ps.setObject(2, UUID.fromString(span.getTraceId()));
                    ps.setLong(3, span.getSpanId());
                    ps.setLong(4, span.getParentSpanId());
                    ps.setString(5, span.getServiceName());
                    ps.setString(6, span.getOperationName());
                    ps.setDouble(7, span.getDurationMs());
                    ps.setBoolean(8, span.isError());
                    
                    // The ClickHouse JDBC driver handles Maps and Arrays surprisingly well.
                    // However, serialization to string (e.g., JSON) might be more robust in some drivers.
                    // Here we assume native type support.
                    ps.setObject(9, span.getTags());
                    ps.setObject(10, span.getLogs()); // This part is tricky with JDBC and might require custom TypeHandlers or serialization.
                });
        } catch (Exception e) {
            log.error("Failed to write batch to ClickHouse. Spans will be lost.", e);
            // In a production system, failed batches should be retried or sent to a dead-letter queue.
        }
    }
}
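
The ingest endpoint sitting in front of this writer is deliberately thin. A sketch (the path and DTO match the sequence diagram; the 202 response signals “queued”, not “persisted”):

// src/main/java/com/mycompany/aggregator/web/SpanIngestController.java (sketch)
package com.mycompany.aggregator.web;

import com.mycompany.aggregator.model.SpanDto;
import com.mycompany.aggregator.service.ClickHouseWriterService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
public class SpanIngestController {

    private final ClickHouseWriterService writerService;

    public SpanIngestController(ClickHouseWriterService writerService) {
        this.writerService = writerService;
    }

    @PostMapping("/v1/spans")
    public ResponseEntity<Void> ingest(@RequestBody List<SpanDto> spans) {
        // Hand off to the in-memory queue and return immediately;
        // durability is only guaranteed once the batch reaches ClickHouse.
        writerService.accept(spans);
        return ResponseEntity.accepted().build();
    }
}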

This configuration allows the aggregator to ingest data very quickly while writing to the database in optimal, large batches. The fixedDelay scheduler ensures that even in periods of low traffic, data is flushed periodically.
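One easy-to-miss wiring detail: the @Scheduled flush only fires if scheduling is enabled somewhere in the application. A minimal sketch:

// src/main/java/com/mycompany/aggregator/AggregatorApplication.java (sketch)
package com.mycompany.aggregator;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // required for the @Scheduled flush in ClickHouseWriterService
public class AggregatorApplication {
    public static void main(String[] args) {
        SpringApplication.run(AggregatorApplication.class, args);
    }
}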

Unleashing Analytical Power with ClickHouse Queries

With data flowing in, the true power of this system becomes apparent when we run queries. The performance is night and day compared to our previous solution.

Find all traces for a specific customer that resulted in an error:

SELECT
    trace_id,
    min(timestamp) as trace_start_time,
    max(timestamp) as trace_end_time,
    max(duration_ms) as max_span_duration
FROM traces.spans
WHERE
    tags['customer_id'] = 'customer-abc-123'
    AND is_error = true
    AND timestamp >= now() - interval 1 day
GROUP BY trace_id
ORDER BY trace_start_time DESC
LIMIT 100;

Calculate P95 and P99 latency for a specific API endpoint, grouped by service version, a tag read straight out of the high-cardinality map:

SELECT
    tags['service.version'] as service_version,
    count(),
    quantile(0.95)(duration_ms) as p95,
    quantile(0.99)(duration_ms) as p99
FROM traces.spans
WHERE
    service_name = 'api-gateway'
    AND operation_name = 'POST /v1/orders'
    AND timestamp >= now() - interval 6 hour
GROUP BY service_version
ORDER BY service_version;

Reconstruct a full trace by trace_id:

SELECT
    timestamp,
    service_name,
    operation_name,
    duration_ms,
    is_error,
    parent_span_id,
    span_id,
    tags
FROM traces.spans
WHERE
    trace_id = '...' -- a specific trace_id
    -- trace_id is last in the sort key, so always bound the time range as well:
    AND timestamp >= now() - interval 1 day
ORDER BY timestamp;

Note the time bound: trace_id comes last in the sort key, so on its own it cannot exploit the sparse primary index. Constraining timestamp lets ClickHouse prune partitions, and the bloom-filter skip index on trace_id from the schema section narrows the scan to a handful of granules, which keeps trace reconstruction fast.

The current implementation provides a massive leap in our observability capabilities. However, it’s not without its limitations. The direct HTTP communication from service agent to aggregator introduces coupling; a more robust architecture would place a message broker like Kafka between them. This would buffer data during aggregator outages and provide better backpressure handling. Furthermore, our custom AspectJ-based instrumentation, while effective, creates a maintenance burden. Migrating the data collection to the OpenTelemetry standard, while keeping our custom ClickHouse backend, would be a logical next step, offering the best of both worlds: standardization at the edge and extreme performance at the core. Finally, operating a ClickHouse cluster at scale is a non-trivial task requiring dedicated expertise in cluster management, replication, and backup strategies, which must not be underestimated.
