The GraphQL endpoint for our primary metadata service became a performance bottleneck. The P99 latency was unacceptable, but traditional application-level tracing offered no clear answers. Traces showed time spent in “business logic,” a black box that involved complex data joins, serialization, and ultimately, interactions with the underlying storage and network subsystems. The root cause was invisible to the application runtime; it was buried in kernel-space activities like I/O contention, excessive context switching, or noisy neighbors on shared infrastructure. We needed a mechanism to correlate a specific GraphQL query with the syscalls and network packets it generated, without modifying or redeploying the application.
Our initial concept was to build a zero-instrumentation pipeline. The core idea was to use eBPF to tap into kernel events, enrich this firehose of data with application-level context (specifically, a GraphQL trace ID), and process it into a queryable format for incident analysis. This required stitching together several distinct technologies. For data capture, eBPF was the only viable choice due to its low overhead and programmability. For processing the immense volume of raw event data, Apache Spark’s structured streaming provided the necessary power for stateful correlation. For storing and analyzing the resulting structured telemetry, OpenSearch offered the required flexibility and search capabilities. Finally, to expose this correlated data to engineers, we decided to build a dedicated GraphQL API—an observability control plane for querying the observability data plane.
The architecture settled on the following flow:
- An eBPF program, loaded by a user-space agent, attaches to syscall tracepoints (sys_enter_read, sys_enter_write, etc.) and TCP connection events.
- The user-space agent reads events from the eBPF perf buffer. Crucially, it also tails application logs to find the mapping between a process/thread ID and our application's trace ID.
- Both raw eBPF events and the trace ID mappings are published to separate Kafka topics.
- A Spark Structured Streaming job consumes both streams, performs a windowed stream-stream join to enrich eBPF events with the correct trace ID, aggregates the data, and writes the results to an OpenSearch index.
- A GraphQL server provides an API for SREs to execute complex queries against the OpenSearch index, allowing them to pivot from a trace ID to the underlying kernel activity.
graph TD
    subgraph Host Machine
        A[GraphQL Service] -- Writes log --> L[Log File with TraceID & PID]
        K[Kernel] -- Events --> BPF[eBPF Probes]
        BPF -- Perf Buffer --> C[Userspace Agent in Go]
        C -- Reads --> L
    end
    subgraph Data Pipeline
        C -- Publishes Events --> Kafka1[Kafka Topic: ebpf-raw-events]
        C -- Publishes Mappings --> Kafka2[Kafka Topic: trace-pid-mappings]
        Kafka1 --> Spark[Spark Structured Streaming Job]
        Kafka2 --> Spark
        Spark -- Writes Enriched Data --> OS[OpenSearch Cluster]
    end
    subgraph Query Layer
        SRE[SRE/Developer] -- GraphQL Query --> GQL[Observability GraphQL API]
        GQL -- OpenSearch DSL Query --> OS
    end
The most significant challenge was the correlation logic. An eBPF program running in the kernel is sandboxed and has no direct knowledge of a user-space trace ID. The link had to be established through something the kernel does know: the process ID (PID) and thread ID (TID). Our application was modified to log a single, structured line upon receiving a request: {"trace_id": "...", "pid": ..., "tid": ...}. The user-space agent reads this and creates a short-lived mapping. The eBPF program, on every event, captures the current PID/TID. The final correlation happens downstream in Spark.
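To make that concrete, here is a trimmed-down sketch of the log-reading side of the agent. The log path, function names, and the publish callback are assumptions for illustration; the real agent also follows appended lines, handles log rotation, and expires stale mappings.
// agent/mapping_scanner.go (hypothetical sketch, separate from the Phase 1 agent below)
package main

import (
	"bufio"
	"encoding/json"
	"log"
	"os"
)

// TraceMapping mirrors the structured line the application logs per request:
// {"trace_id": "...", "pid": ..., "tid": ...}
type TraceMapping struct {
	TraceID string `json:"trace_id"`
	Pid     uint64 `json:"pid"`
	Tid     uint64 `json:"tid"`
}

// scanMappings reads the application log once and forwards every mapping line
// to publish, which stands in for the Kafka producer writing to trace-pid-mappings.
func scanMappings(path string, publish func(TraceMapping)) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		var m TraceMapping
		// Lines that are not trace mappings fail to decode and are skipped.
		if err := json.Unmarshal(scanner.Bytes(), &m); err != nil || m.TraceID == "" {
			continue
		}
		publish(m)
	}
	return scanner.Err()
}

func main() {
	// The log path is an assumption for this sketch.
	err := scanMappings("/var/log/graphql-server/requests.log", func(m TraceMapping) {
		log.Printf("mapping: trace_id=%s pid=%d tid=%d", m.TraceID, m.Pid, m.Tid)
	})
	if err != nil {
		log.Fatalf("scanning mappings: %v", err)
	}
}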
Phase 1: Kernel Data Collection with eBPF and Go
We used the cilium/ebpf library in Go to manage the eBPF lifecycle. The eBPF program itself is written in restricted C. The goal is to capture file I/O syscalls and associate them with the process command name and PID/TID.
Here is the core eBPF C code (bpf_program.c). It defines a perf event map to send data to user space and attaches to the sys_enter_write tracepoint.
// bpf_program.c
// "vmlinux.h" is generated from kernel BTF (e.g. `bpftool btf dump file /sys/kernel/btf/vmlinux format c`)
// and provides struct trace_event_raw_sys_enter used below.
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
// Define a structure to hold event data sent to user space.
// In a real-world project, this would be much richer,
// including file descriptors, byte counts, etc.
struct event {
__u64 pid_tgid;
__u32 fd;
__u64 count;
char comm[16]; // TASK_COMM_LEN
};
// Define the perf event map. This is the channel from kernel to user space.
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(__u32));
__uint(value_size, sizeof(__u32));
__uint(max_entries, 1024);
} events SEC(".maps");
// SEC("tp/syscalls/sys_enter_write") is the attachment point.
// This function will be executed every time any process on the system
// enters the `write` syscall.
SEC("tracepoint/syscalls/sys_enter_write")
int tracepoint__sys_enter_write(struct trace_event_raw_sys_enter* ctx) {
struct event event = {};
// Upper 32 bits hold the TGID (the user-space PID); lower 32 bits hold the thread ID.
event.pid_tgid = bpf_get_current_pid_tgid();
// Get the command name of the current process.
bpf_get_current_comm(&event.comm, sizeof(event.comm));
// For simplicity, we only trace our target GraphQL service.
// A production system might filter based on cgroup or a map of PIDs.
char target_comm[] = "graphql-server";
for (int i = 0; i < sizeof(target_comm) - 1; ++i) {
if (event.comm[i] != target_comm[i]) {
return 0; // Not our target process, exit.
}
}
// Read syscall arguments from context. `ctx->args` is an array.
// For `write(int fd, const void *buf, size_t count)`:
// args[0] is fd, args[1] is buf, args[2] is count.
event.fd = (unsigned int)ctx->args[0];
event.count = (size_t)ctx->args[2];
// Submit the event to the perf buffer.
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &event, sizeof(event));
return 0;
}
char LICENSE[] SEC("license") = "GPL";
The user-space Go agent is responsible for loading this C code into the kernel, setting up the perf buffer reader, and forwarding events.
// agent/main.go
package main
import (
"bytes"
"encoding/binary"
"errors"
"log"
"os"
"os/signal"
"syscall"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
"golang.org/x/sys/unix"
)
// Event must match the C struct layout, including the compiler-inserted
// padding between `fd` and `count` (the C struct aligns `count` to 8 bytes).
type Event struct {
PidTgid uint64
Fd uint32
_ [4]byte // padding to match the C struct's alignment
Count uint64
Comm [16]byte
}
func main() {
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
// Increase rlimit memory lock. This is required for eBPF.
if err := unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{
Cur: unix.RLIM_INFINITY,
Max: unix.RLIM_INFINITY,
}); err != nil {
log.Fatalf("failed to set rlimit: %v", err)
}
// Load the compiled eBPF object file.
// In a production build, this would be embedded.
spec, err := ebpf.LoadCollectionSpec("bpf_program.o")
if err != nil {
log.Fatalf("could not load BPF spec: %v", err)
}
coll, err := ebpf.NewCollection(spec)
if err != nil {
log.Fatalf("could not create BPF collection: %v", err)
}
defer coll.Close()
// Attach the tracepoint.
tp, err := link.Tracepoint("syscalls", "sys_enter_write", coll.Programs["tracepoint__sys_enter_write"], nil)
if err != nil {
log.Fatalf("could not attach tracepoint: %v", err)
}
defer tp.Close()
log.Println("eBPF program loaded and attached. Waiting for events...")
// Open a perf event reader.
rd, err := perf.NewReader(coll.Maps["events"], os.Getpagesize()*64)
if err != nil {
log.Fatalf("creating perf event reader: %s", err)
}
defer rd.Close()
// Kafka Producer setup would go here.
// For this example, we log to stdout.
// producer := setupKafkaProducer()
go func() {
<-stopper
log.Println("Received stop signal, exiting.")
rd.Close()
}()
var event Event
for {
record, err := rd.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
return
}
log.Printf("reading from perf reader: %s", err)
continue
}
if record.LostSamples > 0 {
log.Printf("perf buffer dropped %d samples", record.LostSamples)
continue
}
// Deserialize the raw data into our Go struct.
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Printf("parsing perf event: %s", err)
continue
}
comm := string(bytes.TrimRight(event.Comm[:], "\x00"))
pid := event.PidTgid >> 32
tid := uint32(event.PidTgid)
// In production, this would be a structured log or a Kafka message.
log.Printf("Syscall 'write': PID=%d, TID=%d, Comm='%s', FD=%d, Count=%d", pid, tid, comm, event.Fd, event.Count)
// message := createKafkaMessage(pid, tid, comm, event)
// producer.SendMessage(message)
}
}
A common mistake here is under-provisioning the perf buffer size (os.Getpagesize()*64). If the user-space agent cannot read events fast enough, the kernel will start dropping them. Monitoring record.LostSamples is critical for production health.
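The Kafka publishing step is stubbed out in the agent above. A minimal sketch of what it might look like with the sarama client follows; the import path, broker address, message shape, and keying strategy are assumptions rather than the exact production code.
// agent/kafka.go (hypothetical sketch; pairs with the agent's commented-out producer calls)
package main

import (
	"encoding/json"
	"log"
	"strconv"

	"github.com/IBM/sarama"
)

// RawEvent is the JSON shape the Spark job later parses from ebpf-raw-events.
type RawEvent struct {
	Pid            uint64 `json:"pid"`
	Tid            uint32 `json:"tid"`
	Syscall        string `json:"syscall"`
	Comm           string `json:"comm"`
	Fd             uint32 `json:"fd"`
	Count          uint64 `json:"count"`
	EventTimestamp string `json:"event_timestamp"` // RFC 3339, parsed downstream as a timestamp
}

func setupKafkaProducer() sarama.SyncProducer {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by the sync producer
	producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, cfg)
	if err != nil {
		log.Fatalf("creating kafka producer: %v", err)
	}
	return producer
}

func publishEvent(producer sarama.SyncProducer, ev RawEvent) {
	payload, err := json.Marshal(ev)
	if err != nil {
		log.Printf("marshaling event: %v", err)
		return
	}
	msg := &sarama.ProducerMessage{
		Topic: "ebpf-raw-events",
		// Keying by PID keeps a single process's events ordered within a partition.
		Key:   sarama.StringEncoder(strconv.FormatUint(ev.Pid, 10)),
		Value: sarama.ByteEncoder(payload),
	}
	if _, _, err := producer.SendMessage(msg); err != nil {
		log.Printf("publishing event: %v", err)
	}
}
In production, a synchronous send per event would throttle the read loop; an async producer with batching is the more likely choice.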
Phase 2: Data Correlation with Spark Structured Streaming
With raw events flowing into Kafka, the next task is to enrich them. We have two streams:
- ebpf-raw-events: JSON messages containing { "pid": ..., "tid": ..., "syscall": "write", "event_timestamp": ... }
- trace-pid-mappings: JSON messages from the application log shipper: { "trace_id": "...", "pid": ..., "tid": ..., "mapping_timestamp": ... }
The Spark job needs to join these two streams on pid and tid. Because there can be a delay between the application logging its trace mapping and the eBPF event occurring, a simple join is not enough. We need a stateful stream-stream join with a time window and watermarking to handle out-of-order events.
Here is a simplified Scala implementation for the Spark job.
// spark/src/main/scala/TraceCorrelator.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object TraceCorrelator {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("eBPF Trace Correlator")
.master("local[*]") // For local testing; use YARN/K8s in prod
.getOrCreate()
import spark.implicits._
// Define schemas for our Kafka topics
val ebpfEventSchema = new StructType()
.add("pid", LongType)
.add("tid", LongType)
.add("syscall", StringType)
.add("details", MapType(StringType, StringType))
.add("event_timestamp", TimestampType)
val traceMappingSchema = new StructType()
.add("trace_id", StringType)
.add("pid", LongType)
.add("tid", LongType)
.add("mapping_timestamp", TimestampType)
// Read from Kafka streams
val ebpfEventsDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "ebpf-raw-events")
.load()
.select(from_json($"value".cast("string"), ebpfEventSchema).as("data"))
.select("data.*")
.withWatermark("event_timestamp", "10 seconds") // Handle late events
val traceMappingsDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "trace-pid-mappings")
.load()
.select(from_json($"value".cast("string"), traceMappingSchema).as("data"))
.select("data.*")
.withWatermark("mapping_timestamp", "20 seconds") // Mappings can be slightly older
// The core of the logic: a stream-stream join
val joinCondition = expr(
"""
ebpf.pid = mapping.pid AND
ebpf.tid = mapping.tid AND
ebpf.event_timestamp >= mapping.mapping_timestamp AND
ebpf.event_timestamp <= mapping.mapping_timestamp + interval 5 minutes
"""
)
val enrichedEventsDF = ebpfEventsDF.as("ebpf")
.join(
traceMappingsDF.as("mapping"),
joinCondition,
"leftOuter" // Use left outer to not lose events if mapping is missed
)
.select(
coalesce($"mapping.trace_id", lit("unmapped")).as("trace_id"),
$"ebpf.*"
)
.withColumn("ingestion_timestamp", current_timestamp())
// Write the enriched stream to OpenSearch
val query = enrichedEventsDF.writeStream
.format("opensearch")
.option("es.nodes", "opensearch-node1")
.option("es.port", "9200")
.option("es.resource", "ebpf-traces-{trace_id}") // Dynamic index per trace
.option("es.mapping.id", "trace_id") // Use trace_id for document ID
.option("checkpointLocation", "/tmp/spark-checkpoints/trace-correlator")
.outputMode("append")
.start()
query.awaitTermination()
}
}
The pitfall here is the join condition and watermarking. ebpf.event_timestamp <= mapping.mapping_timestamp + interval 5 minutes assumes a GraphQL request doesn't last longer than 5 minutes. This needs to be tuned based on application behavior. If this window is too small, we miss correlations. If it's too large, the state stored by Spark grows, increasing memory pressure and recovery time on failure. The leftOuter join is a defensive measure; it ensures that even if we fail to find a trace_id, the raw eBPF event is still indexed and not lost, just marked as "unmapped."
Phase 3: Indexing and Querying
The data is now flowing into OpenSearch. A well-defined index mapping is not optional; it’s fundamental to query performance.
// opensearch/mapping.json
{
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"trace_id": { "type": "keyword" },
"pid": { "type": "long" },
"tid": { "type": "long" },
"syscall": { "type": "keyword" },
"event_timestamp": { "type": "date" },
"ingestion_timestamp": { "type": "date" },
"details": {
"type": "object",
"enabled": false
}
}
}
}
Notice details is an object with enabled: false. The raw details of every syscall can be voluminous and aren't typically used for filtering, only for display. Disabling indexing on this field saves significant storage and indexing overhead. trace_id and syscall are keyword fields, as they are used for exact matches and aggregations.
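One operational detail: the Spark writer creates indices dynamically (ebpf-traces-{trace_id}), so the settings and mappings above have to be registered as an index template matching ebpf-traces-* rather than applied to a single index. Below is a minimal sketch in Go using the plain REST API; the template name, file path, and endpoint are assumptions.
// tools/apply_template/main.go (hypothetical helper)
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

func main() {
	// mapping.json is the settings/mappings document shown above.
	raw, err := os.ReadFile("mapping.json")
	if err != nil {
		log.Fatalf("reading mapping: %v", err)
	}
	var mapping map[string]interface{}
	if err := json.Unmarshal(raw, &mapping); err != nil {
		log.Fatalf("parsing mapping: %v", err)
	}

	// Wrap it in a composable index template so every ebpf-traces-* index
	// created by the Spark writer inherits the same settings and mappings.
	template := map[string]interface{}{
		"index_patterns": []string{"ebpf-traces-*"},
		"template":       mapping,
	}
	body, err := json.Marshal(template)
	if err != nil {
		log.Fatalf("building template body: %v", err)
	}

	req, err := http.NewRequest(http.MethodPut,
		"http://opensearch-node1:9200/_index_template/ebpf-traces", bytes.NewReader(body))
	if err != nil {
		log.Fatalf("building request: %v", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatalf("applying template: %v", err)
	}
	defer resp.Body.Close()
	fmt.Println("index template status:", resp.Status)
}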
Finally, we build the GraphQL API for SREs. This server, written in Go, acts as a translator between GraphQL queries and OpenSearch DSL.
// gql-server/schema.graphqls
type Query {
trace(id: String!): Trace
syscalls(filter: SyscallFilter!): [SyscallEvent!]
}
type Trace {
id: String!
syscalls(limit: Int = 100): [SyscallEvent!]
networkConnections: [NetworkEvent!]
}
type SyscallEvent {
id: String!
timestamp: String!
pid: Int!
tid: Int!
syscall: String!
details: String! # JSON string
}
type NetworkEvent {
# ... similar structure for network events
}
input SyscallFilter {
traceID: String
pid: Int
startTime: String
endTime: String
syscallName: String
}
The resolver for the trace query demonstrates the core logic. It receives a traceID and fires off multiple queries to OpenSearch to assemble the complete picture.
// gql-server/resolvers.go
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
// OpenSearch Go client (import path assumed for the v2 client)
"github.com/opensearch-project/opensearch-go/v2"
)
type Resolver struct {
osClient *opensearch.Client
}
type Trace struct {
ID string
}
// TraceResolver resolves the Trace type
func (r *Resolver) Trace(ctx context.Context, args struct{ ID string }) (*Trace, error) {
// Simple validation
if args.ID == "" {
return nil, fmt.Errorf("trace ID cannot be empty")
}
return &Trace{ID: args.ID}, nil
}
// Syscalls is a field resolver on the Trace type
func (r *Resolver) Syscalls(ctx context.Context, t *Trace, args struct{ Limit int }) ([]*SyscallEvent, error) {
if t == nil {
return nil, nil // Should not happen
}
// Construct an OpenSearch query
query := map[string]interface{}{
"size": args.Limit,
"query": map[string]interface{}{
"bool": map[string]interface{}{
"filter": []map[string]interface{}{
{"term": map[string]interface{}{"trace_id": t.ID}},
{"term": map[string]interface{}{"event_type": "syscall"}}, // Assuming we add this field
},
},
},
"sort": []map[string]interface{}{
{"event_timestamp": "asc"},
},
}
// This is a simplified representation of executing the query.
// A production implementation requires proper error handling, response parsing, etc.
res, err := r.osClient.Search(
r.osClient.Search.WithContext(ctx),
r.osClient.Search.WithIndex("ebpf-traces-*"),
r.osClient.Search.WithBody(buildQuery(query)),
)
if err != nil {
// Log the error
return nil, fmt.Errorf("failed to query opensearch for syscalls")
}
defer res.Body.Close()
// Parse the OpenSearch response into []*SyscallEvent
var events []*SyscallEvent
// ... parsing logic ...
return events, nil
}
// buildQuery is a helper to marshal the query map to JSON for the request body.
// ...
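A minimal version of that helper might look like this, using the bytes and encoding/json packages from the import block above.
// buildQuery marshals the query map and returns it as an io.Reader,
// which is what the client's Search.WithBody option expects.
func buildQuery(query map[string]interface{}) *bytes.Reader {
	body, err := json.Marshal(query)
	if err != nil {
		// The query is assembled from static maps above, so this should not
		// happen; fall back to an empty body rather than panicking.
		return bytes.NewReader(nil)
	}
	return bytes.NewReader(body)
}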
This resolver pattern allows engineers to ask questions like, "For trace abc-123, show me the first 100 syscalls it made." A more advanced resolver for networkConnections would perform a similar query but filter for event_type: "net". The power of GraphQL here is allowing the client to request both syscalls and network data for a single trace in one round trip.
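To illustrate that single round trip, a small Go client could POST one query that selects both fields at once. The endpoint URL is an assumption, and the NetworkEvent selection is a placeholder since its fields are elided in the schema above.
// client/main.go (hypothetical sketch of a tool calling the observability API)
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// One query fetches both syscall and network data for a trace in a single round trip.
const combinedQuery = `
query($id: String!) {
  trace(id: $id) {
    syscalls(limit: 100) { timestamp syscall pid tid }
    networkConnections { id } # NetworkEvent fields are elided above; "id" is assumed
  }
}`

func main() {
	payload, err := json.Marshal(map[string]interface{}{
		"query":     combinedQuery,
		"variables": map[string]string{"id": "abc-123"},
	})
	if err != nil {
		log.Fatalf("building request body: %v", err)
	}

	resp, err := http.Post("http://observability-gql:8080/query",
		"application/json", bytes.NewReader(payload))
	if err != nil {
		log.Fatalf("querying observability API: %v", err)
	}
	defer resp.Body.Close()

	var out map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		log.Fatalf("decoding response: %v", err)
	}
	fmt.Printf("%v\n", out["data"])
}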
The primary limitation of this entire system is its latency. The path through Kafka and a Spark micro-batch introduces a delay of seconds to minutes. It is a forensics and deep-analysis tool, not a real-time monitoring system. Furthermore, the correlation based on PID/TID can be fragile. In environments with rapid process creation and destruction, or with complex threading models, mappings can be missed. Future iterations could explore using eBPF’s cgroup tracking capabilities for a more robust correlation context that survives process restarts. The operational cost is also non-trivial; maintaining Kafka, Spark, and OpenSearch clusters requires significant expertise. However, for debugging otherwise intractable performance problems in a complex distributed system, the visibility gained is invaluable.