A high-frequency C++ application began exhibiting sporadic but critical message processing delays. The application communicates with an ActiveMQ broker, and while our own application-level timers showed consistent sub-millisecond processing, end-to-end latency occasionally spiked into double-digit milliseconds. The root cause was elusive; logs were clean, CPU and memory usage were nominal, and the ActiveMQ broker itself reported healthy metrics. The latency originated in a black box somewhere between the C++ client library invoking a send operation and the message hitting the wire, or vice versa on the receive path. Traditional profilers were too coarse, and instrumenting the activemq-cpp library or the kernel was not a viable option in our production environment. This situation demanded a non-intrusive, kernel-level observation tool.
Our initial concept was to build a pipeline to precisely measure the time spent within the kernel for network system calls originating from our specific C++ process. The core idea was to leverage eBPF to attach probes to the tcp_sendmsg and tcp_recvmsg kernel functions. By capturing nanosecond-resolution timestamps at the entry and exit of these functions, we could calculate the exact duration of each socket I/O operation without altering a single line of application code. This raw data, once collected, would be passed to an offline analysis script using Python with Pandas and Seaborn to visualize the latency distribution in a statistically meaningful way, helping us hunt down the long-tail latency events.
The technology selection was driven by this need for precision and non-invasiveness.
- eBPF with BCC (BPF Compiler Collection): We chose eBPF because it provides a safe and efficient way to run custom programs within the Linux kernel. BCC was selected for its Python bindings, which significantly accelerate development and prototyping. While a production-grade tool might eventually be rewritten in C/C++ with libbpf for minimal overhead, BCC is ideal for this kind of investigative engineering.
- C++ with activemq-cpp: This represents the system under test. We needed to build a minimal but realistic client that could generate a sustained load of messages, mimicking the behavior of our production application. It needed proper connection handling, session management, and exception handling.
- ActiveMQ: The incumbent message broker. Our focus is on the client-side interaction with it, not the broker itself. For this test, running it in a standard Docker container is sufficient.
- Python with Seaborn and Pandas: Standard tools for data analysis. Pandas is for data manipulation, and Seaborn excels at creating rich statistical visualizations like violin plots and distribution plots, which are far more insightful for latency analysis than simple averages or line charts.
The entire analysis pipeline can be visualized as follows:
graph TD
    subgraph System Under Test
        A[C++ ActiveMQ Client] -- Sends/Receives Data --> B{Network Socket};
    end
    subgraph Kernel Space
        B -- syscall --> C[tcp_sendmsg / tcp_recvmsg];
        D[eBPF Probe] -- kprobe --> C;
        D -- Records Timestamps --> E[BPF Perf Buffer];
    end
    subgraph User Space Analysis
        F[BCC Python Script] -- Reads Events --> E;
        F -- Calculates Latency --> G[latency_data.csv];
        H[Seaborn/Pandas Script] -- Reads --> G;
        H -- Generates Plots --> I[Latency Visualizations];
    end
    A -- Monitored by --> F;
The System Under Test: A Resilient C++ ActiveMQ Client
First, we need a C++ application that reliably connects to ActiveMQ and produces/consumes messages at a steady rate. A common pitfall in creating such test harnesses is neglecting robust error handling and resource management, leading to a brittle tool that fails under load. This implementation uses RAII principles for resource cleanup and includes comprehensive exception handling.
The project structure requires activemq-cpp and its dependencies. The CMakeLists.txt file handles finding the necessary libraries.
CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(ActiveMQLatencyTester CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Find ActiveMQ-CPP and its dependencies (APR)
find_package(PkgConfig REQUIRED)
pkg_check_modules(APR REQUIRED apr-1)
pkg_check_modules(APRUTIL REQUIRED apr-util-1)
find_path(ACTIVEMQ_CPP_INCLUDE_DIR activemq/library/ActiveMQCPP.h)
find_library(ACTIVEMQ_CPP_LIBRARY NAMES activemq-cpp)
if(NOT ACTIVEMQ_CPP_INCLUDE_DIR OR NOT ACTIVEMQ_CPP_LIBRARY)
message(FATAL_ERROR "ActiveMQ-CPP library or headers not found. Please ensure it is installed.")
endif()
include_directories(
${ACTIVEMQ_CPP_INCLUDE_DIR}
${APR_INCLUDE_DIRS}
${APRUTIL_INCLUDE_DIRS}
)
add_executable(latency_client main.cpp)
target_link_libraries(latency_client
${ACTIVEMQ_CPP_LIBRARY}
${APR_LIBRARIES}
${APRUTIL_LIBRARIES}
pthread
)
main.cpp
The C++ client is designed to run in one of two modes: producer or consumer. It establishes a connection, creates a session, and then enters a loop to send or receive messages. Logging is included to track its state, and std::chrono is used for managing message rates.
#include <iostream>
#include <string>
#include <memory>
#include <thread>
#include <chrono>
#include <stdexcept>
#include <vector>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/Destination.h>
#include <cms/MessageProducer.h>
#include <cms/MessageConsumer.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <cms/DeliveryMode.h>
#include <cms/ExceptionListener.h>
#include <cms/CMSException.h>
using namespace activemq::core;
using namespace cms;
using namespace std;
class SimpleExceptionListener : public ExceptionListener {
public:
virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
cerr << "CMS Exception occurred. Shutting down client." << endl;
// In a real application, you might attempt to reconnect.
exit(1);
}
};
void run_producer(const string& brokerURI, const string& queueName) {
unique_ptr<Connection> connection;
unique_ptr<Session> session;
unique_ptr<Destination> destination;
unique_ptr<MessageProducer> producer;
// Declared at function scope so it outlives the connection that holds a pointer to it
SimpleExceptionListener exceptionListener;
try {
cout << "Producer: Initializing ActiveMQ library" << endl;
activemq::library::ActiveMQCPP::initializeLibrary();
auto connectionFactory = make_unique<ActiveMQConnectionFactory>(brokerURI);
connection.reset(connectionFactory->createConnection());
connection->setExceptionListener(&exceptionListener);
connection->start();
cout << "Producer: Connection started" << endl;
session.reset(connection->createSession(Session::AUTO_ACKNOWLEDGE));
destination.reset(session->createQueue(queueName));
producer.reset(session->createProducer(destination.get()));
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
cout << "Producer: Starting message loop" << endl;
long long message_count = 0;
string payload_base = "Test message body ";
while (true) {
string text = payload_base + to_string(message_count++);
unique_ptr<TextMessage> message(session->createTextMessage(text));
producer->send(message.get());
if (message_count % 1000 == 0) {
cout << "Producer: Sent " << message_count << " messages" << endl;
}
// Regulate send rate to avoid overwhelming the system
this_thread::sleep_for(chrono::milliseconds(5));
}
} catch (CMSException& e) {
cerr << "Producer Error: " << e.getMessage() << endl;
e.printStackTrace();
}
cout << "Producer: Shutting down" << endl;
if (connection) {
connection->close();
}
activemq::library::ActiveMQCPP::shutdownLibrary();
}
void run_consumer(const string& brokerURI, const string& queueName) {
unique_ptr<Connection> connection;
unique_ptr<Session> session;
unique_ptr<Destination> destination;
unique_ptr<MessageConsumer> consumer;
// Declared at function scope so it outlives the connection that holds a pointer to it
SimpleExceptionListener exceptionListener;
try {
cout << "Consumer: Initializing ActiveMQ library" << endl;
activemq::library::ActiveMQCPP::initializeLibrary();
auto connectionFactory = make_unique<ActiveMQConnectionFactory>(brokerURI);
connection.reset(connectionFactory->createConnection());
connection->setExceptionListener(&exceptionListener);
connection->start();
cout << "Consumer: Connection started" << endl;
session.reset(connection->createSession(Session::AUTO_ACKNOWLEDGE));
destination.reset(session->createQueue(queueName));
consumer.reset(session->createConsumer(destination.get()));
cout << "Consumer: Starting message receive loop" << endl;
long long message_count = 0;
while (true) {
unique_ptr<Message> message(consumer->receive());
if (message.get() != nullptr) {
message_count++;
if (message_count % 1000 == 0) {
cout << "Consumer: Received " << message_count << " messages" << endl;
}
} else {
cout << "Consumer: Received null message, maybe broker shutdown?" << endl;
break;
}
}
} catch (CMSException& e) {
cerr << "Consumer Error: " << e.getMessage() << endl;
e.printStackTrace();
}
cout << "Consumer: Shutting down" << endl;
if (connection) {
connection->close();
}
activemq::library::ActiveMQCPP::shutdownLibrary();
}
int main(int argc, char* argv[]) {
if (argc != 3) {
cerr << "Usage: " << argv[0] << " <producer|consumer> <broker_uri>" << endl;
return 1;
}
string mode = argv[1];
string brokerURI = argv[2]; // e.g., "tcp://127.0.0.1:61616"
string queueName = "latency.test.queue";
if (mode == "producer") {
run_producer(brokerURI, queueName);
} else if (mode == "consumer") {
run_consumer(brokerURI, queueName);
} else {
cerr << "Invalid mode. Use 'producer' or 'consumer'." << endl;
return 1;
}
return 0;
}
To compile and run this, you first need ActiveMQ running, for example via Docker:
docker run -p 61616:61616 -p 8161:8161 rmohr/activemq:5.15.9
Then compile and run the clients in separate terminals:
# Terminal 1: Compile
mkdir build && cd build
cmake ..
make
# Terminal 2: Run Consumer
./latency_client consumer "tcp://127.0.0.1:61616"
# Terminal 3: Run Producer
./latency_client producer "tcp://127.0.0.1:61616"
The eBPF Probe for Kernel-Level Latency
Now we construct the eBPF probe using BCC. This Python script has two parts: an embedded C program that will be compiled and loaded into the kernel, and the Python user-space logic to manage the BPF program and collect the data.
The C program attaches kprobes and kretprobes to tcp_sendmsg and tcp_recvmsg. When a function is entered, we store the current timestamp in a BPF map, keyed by the combined process/thread ID (pid_tgid). When the function returns, we retrieve the start timestamp, calculate the delta, and push an event containing the PID, function name, and latency into a perf buffer for the user-space script to read.
tcp_latency_probe.py
#!/usr/bin/python3
from bcc import BPF
import argparse
from time import strftime
import signal
# --- Arguments Parsing ---
parser = argparse.ArgumentParser(
description="Trace TCP send/recv latency at the kernel level.",
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("-p", "--pid", type=int, help="Trace this PID only.")
parser.add_argument("-o", "--output", type=str, help="Output file path (CSV).")
args = parser.parse_args()
if not args.pid:
print("Error: A specific PID must be provided with the -p flag.")
exit(1)
if not args.output:
print("Error: An output file path must be provided with the -o flag.")
exit(1)
# --- eBPF C Program ---
bpf_text = """
#include <uapi/linux/ptrace.h>
#include <net/sock.h>
#include <linux/sched.h>
// Data structure for an event to be sent to user-space
struct data_t {
u64 pid;
u64 latency_ns;
char comm[TASK_COMM_LEN];
char func[16]; // Holds the traced function name: "tcp_sendmsg" or "tcp_recvmsg"
};
BPF_PERF_OUTPUT(events);
// Map to store start timestamps, keyed by thread ID
BPF_HASH(start_ts, u64);
// Kprobe for tcp_sendmsg entry
int trace_tcp_sendmsg_entry(struct pt_regs *ctx, struct sock *sk) {
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = pid_tgid >> 32;
// Filter by the target PID if specified
if (FILTER_PID != 0 && pid != FILTER_PID) {
return 0;
}
u64 ts = bpf_ktime_get_ns();
start_ts.update(&pid_tgid, &ts);
return 0;
}
// Kretprobe for tcp_sendmsg return
int trace_tcp_sendmsg_return(struct pt_regs *ctx) {
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = pid_tgid >> 32;
// Filter by the target PID if specified
if (FILTER_PID != 0 && pid != FILTER_PID) {
return 0;
}
u64 *tsp = start_ts.lookup(&pid_tgid);
if (tsp == 0) {
return 0; // Missed the entry event
}
u64 latency = bpf_ktime_get_ns() - *tsp;
start_ts.delete(&pid_tgid);
// Prepare data to send to user-space
struct data_t data = {};
data.pid = pid;
data.latency_ns = latency;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
__builtin_memcpy(data.func, "tcp_sendmsg", sizeof("tcp_sendmsg"));
events.perf_submit(ctx, &data, sizeof(data));
return 0;
}
// Kprobe for tcp_recvmsg entry
int trace_tcp_recvmsg_entry(struct pt_regs *ctx, struct sock *sk) {
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = pid_tgid >> 32;
if (FILTER_PID != 0 && pid != FILTER_PID) {
return 0;
}
u64 ts = bpf_ktime_get_ns();
start_ts.update(&pid_tgid, &ts);
return 0;
}
// Kretprobe for tcp_recvmsg return
int trace_tcp_recvmsg_return(struct pt_regs *ctx) {
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = pid_tgid >> 32;
if (FILTER_PID != 0 && pid != FILTER_PID) {
return 0;
}
u64 *tsp = start_ts.lookup(&pid_tgid);
if (tsp == 0) {
return 0;
}
u64 latency = bpf_ktime_get_ns() - *tsp;
start_ts.delete(&pid_tgid);
struct data_t data = {};
data.pid = pid;
data.latency_ns = latency;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
__builtin_memcpy(data.func, "tcp_recvmsg", sizeof("tcp_recvmsg"));
events.perf_submit(ctx, &data, sizeof(data));
return 0;
}
"""
# --- Python User-space Logic ---
print(f"Attaching eBPF probes to process PID: {args.pid}")
print(f"Data will be written to {args.output}")
# Substitute the PID filter into the BPF code
bpf_text = bpf_text.replace('FILTER_PID', str(args.pid))
# Initialize BPF
b = BPF(text=bpf_text)
b.attach_kprobe(event="tcp_sendmsg", fn_name="trace_tcp_sendmsg_entry")
b.attach_kretprobe(event="tcp_sendmsg", fn_name="trace_tcp_sendmsg_return")
b.attach_kprobe(event="tcp_recvmsg", fn_name="trace_tcp_recvmsg_entry")
b.attach_kretprobe(event="tcp_recvmsg", fn_name="trace_tcp_recvmsg_return")
print("Probes attached. Press Ctrl-C to stop.")
# Open output file
try:
output_file = open(args.output, 'w')
output_file.write("timestamp,comm,pid,func,latency_ns\n")
except IOError as e:
print(f"Error opening output file: {e}")
exit(1)
# Process events from the perf buffer
def print_event(cpu, data, size):
event = b["events"].event(data)
output_file.write("%s,%s,%d,%s,%d\n" % (
strftime("%H:%M:%S"),
event.comm.decode('utf-8', 'replace'),
event.pid,
event.func.decode('utf-8', 'replace'),
event.latency_ns
))
# This is the 'events' BPF_PERF_OUTPUT buffer we defined in C
b["events"].open_perf_buffer(print_event)
# Signal handler for clean exit
def signal_handler(sig, frame):
print("\nDetaching probes and closing file...")
output_file.close()
exit(0)
signal.signal(signal.SIGINT, signal_handler)
# Main loop to poll for events
while True:
try:
b.perf_buffer_poll()
except KeyboardInterrupt:
signal_handler(None, None)
To run this, find the PID of your latency_client producer or consumer (e.g., with pgrep latency_client) and then execute the probe:
sudo python3 tcp_latency_probe.py -p <PID> -o /tmp/latency_data.csv
The script will run until you press Ctrl-C, at which point /tmp/latency_data.csv will contain the captured latency data.
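Before moving on to full visualization, it is worth sanity-checking the capture. The snippet below is a minimal sketch against the CSV written by tcp_latency_probe.py above; it confirms both kernel functions were traced and gives a first feel for the numbers.
import pandas as pd

# Quick sanity check of the captured data before full analysis
df = pd.read_csv("/tmp/latency_data.csv")

# How many events were captured per traced kernel function?
print(df["func"].value_counts())

# Rough p50/p99 per function, converted from nanoseconds to microseconds
print(df.groupby("func")["latency_ns"].quantile([0.5, 0.99]) / 1000.0)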
Analysis and Visualization with Seaborn
The final step is to analyze the collected raw data. A CSV file full of nanosecond timings is not intuitive. We need to visualize the distribution to understand the performance characteristics. A common mistake is to rely solely on averages, which can be heavily skewed by outliers and hide the true user experience. Statistical plots give a much clearer picture.
This Python script reads the CSV file generated by our eBPF probe and creates three key visualizations:
- Histogram with KDE: Shows the frequency of latencies across different buckets.
- Violin Plot: Combines a box plot with a kernel density plot. It’s excellent for visualizing the distribution shape and identifying multiple modes (clusters) of latency.
- ECDF Plot: The Empirical Cumulative Distribution Function plot shows the percentile distribution, answering questions like “What percentage of operations completed within X microseconds?”.
analyze_latency.py
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import argparse
def analyze_and_visualize(filepath):
"""
Reads latency data from a CSV file and generates statistical plots.
"""
try:
df = pd.read_csv(filepath)
except FileNotFoundError:
print(f"Error: The file '{filepath}' was not found.")
return
except pd.errors.EmptyDataError:
print(f"Error: The file '{filepath}' is empty. No data to analyze.")
return
# Data validation and cleaning
if 'latency_ns' not in df.columns or 'func' not in df.columns:
print("Error: CSV must contain 'latency_ns' and 'func' columns.")
return
# Convert latency to a more readable unit, like microseconds
df['latency_us'] = df['latency_ns'] / 1000.0
# Basic statistical summary
print("--- Latency Statistical Summary (in microseconds) ---")
print(df.groupby('func')['latency_us'].describe(percentiles=[.5, .9, .99, .999]))
print("-" * 55)
# Set plot style
sns.set_theme(style="whitegrid")
# 1. Histogram and KDE Plot
plt.figure(figsize=(12, 7))
sns.histplot(data=df, x='latency_us', hue='func', kde=True, element="step")
plt.title('Distribution of Kernel TCP Send/Recv Latency')
plt.xlabel('Latency (microseconds)')
plt.ylabel('Count')
plt.xscale('log') # Use a log scale if there is a wide range of values
plt.savefig('latency_histogram.png')
plt.close()
print("Generated latency_histogram.png")
# 2. Violin Plot
plt.figure(figsize=(12, 7))
sns.violinplot(data=df, x='func', y='latency_us', inner='quartile', cut=0)
plt.title('Violin Plot of Kernel TCP Latency')
plt.xlabel('Kernel Function')
plt.ylabel('Latency (microseconds)')
plt.yscale('log') # Log scale is often necessary for latency
plt.savefig('latency_violinplot.png')
plt.close()
print("Generated latency_violinplot.png")
# 3. ECDF Plot
plt.figure(figsize=(12, 7))
sns.ecdfplot(data=df, x='latency_us', hue='func')
plt.title('ECDF of Kernel TCP Latency')
plt.xlabel('Latency (microseconds)')
plt.ylabel('Proportion of Operations')
plt.grid(True, which="both", ls="--")
plt.xscale('log')
plt.savefig('latency_ecdf.png')
plt.close()
print("Generated latency_ecdf.png")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Analyze and visualize TCP latency data from a CSV file.")
parser.add_argument("filepath", type=str, help="Path to the latency data CSV file.")
args = parser.parse_args()
analyze_and_visualize(args.filepath)
Running this script (python3 analyze_latency.py /tmp/latency_data.csv) would produce the image files. In our hypothetical investigation, the violin plot was the most revealing. It showed that while the median latency for tcp_sendmsg was very low (around 15µs), there was a distinct “long tail” and a smaller bump in the distribution around the 1-2ms mark. This bimodal distribution suggested that most operations were fast, but a periodic, systemic issue was causing a subset of calls to be significantly delayed. This allowed us to rule out the C++ application’s logic and focus our investigation on kernel-level phenomena like scheduler contention, TCP buffer management, or even interactions with the virtualization layer.
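The same data can put a number on that tail. The following is a minimal sketch; the 1 ms cutoff is an assumption based on the bump we observed around 1-2 ms, and should be adjusted to wherever the second mode sits in your own capture. It counts the slow tcp_sendmsg calls and checks whether they cluster in time.
import pandas as pd

df = pd.read_csv("/tmp/latency_data.csv")
df["latency_us"] = df["latency_ns"] / 1000.0
send = df[df["func"] == "tcp_sendmsg"]

# Calls slower than 1 ms (1000 us) are treated as the "slow mode"
slow = send[send["latency_us"] > 1000.0]
print(f"slow sends: {len(slow)} of {len(send)} "
      f"({100.0 * len(slow) / max(len(send), 1):.3f}%)")

# The probe records a wall-clock second per event; if slow sends pile up in
# particular seconds, that points to a periodic, systemic cause rather than noise.
print(slow.groupby("timestamp").size().sort_values(ascending=False).head())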
The eBPF-based approach provided objective, high-resolution data that was previously inaccessible. The combination of a targeted eBPF probe and statistical visualization proved to be an effective toolchain for diagnosing these kinds of deep-system performance anomalies.
This methodology, however, is not without its limitations. The BCC Python frontend introduces a degree of overhead that might be unacceptable in the most performance-sensitive environments; a migration to a pure C/libbpf solution would be necessary for permanent production monitoring. Furthermore, this approach only measures the time spent within the traced kernel functions. It does not account for time spent within the C++ client library before the syscall is made, nor does it capture delays on the network or at the broker. It is a precise tool for one specific part of the stack. A more complete picture would require correlating this kernel data with application-level tracing and network packet captures, which represents a logical next step in building a comprehensive observability platform.
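As a rough illustration of that next step, the sketch below is entirely hypothetical: it assumes the probe has been extended to emit a high-resolution ts_ns column (the version above only records second-granularity wall-clock time) and that the application writes its own per-message trace to a file such as app_trace.csv with ts_ns and app_latency_ns columns. With both in hand, pandas can align each application-level operation with the nearest preceding kernel event.
import pandas as pd

# Hypothetical inputs: neither file is produced by the tooling shown above as-is.
kernel = pd.read_csv("/tmp/latency_data_hires.csv").sort_values("ts_ns")
app = pd.read_csv("app_trace.csv").sort_values("ts_ns")

# Align each application-level record with the closest earlier kernel event,
# within a 5 ms window, to see how much of the app-level latency is kernel time.
merged = pd.merge_asof(app, kernel, on="ts_ns", direction="backward",
                       tolerance=5_000_000)
print(merged[["app_latency_ns", "latency_ns"]].describe())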