Building a Kernel-Level IAM Guardrail for Data Warehouses with eBPF


Our existing Identity and Access Management (IAM) framework for the corporate data warehouse was, by all accounts, robust. Role-based access control was granular, column-level security was enforced, and audit logs from the warehouse provider were diligently shipped to a SIEM. The problem was one of fundamental visibility. The audit logs only told us about authenticated sessions. We couldn’t definitively answer questions about the originating processes or network traffic patterns before authentication. A developer’s misconfigured script, a compromised container, or an internal port scan could generate thousands of failed connection attempts against our warehouse endpoints, and we would be largely blind to the source and intent until it was too late. This is a classic blind spot: treating the data warehouse as a trusted black box and monitoring only its internal logs. In production, that assumption is dangerous.

The goal became to build a zero-instrumentation, high-performance guardrail that could observe and audit every single TCP connection attempt destined for our data warehouse endpoints, enriching this network data with process-level context directly from the kernel. The system had to be tamper-resistant and impose negligible performance overhead on the source machines. Traditional host agents were ruled out; they are too intrusive, require application-specific integration, and increase the attack surface. We needed to operate at a lower level.

This led us to eBPF. By attaching a small, sandboxed program to the kernel’s tcp_v4_connect function, we could intercept the metadata of every outbound IPv4 TCP connection system-wide. This approach is powerful because it’s application-agnostic. It doesn’t matter if the connection originates from a Python script using a Snowflake driver, a Java application with a JDBC connector, or even a simple curl command. If it opens a TCP socket, we see it. The data captured would then be streamed to a dedicated, high-throughput analytics warehouse—a security data lakehouse—for correlation against our central IAM policies.

The architecture is straightforward but effective:

graph TD
    subgraph "Application Hosts"
        A[Application Process] -- "syscall: connect()" --> B{Kernel};
        B -- "kprobe: tcp_v4_connect" --> C[eBPF Program];
        C -- "bpf_perf_event_output" --> D[Perf Buffer];
    end

    subgraph "Userspace Agent (Go)"
        E[Go Agent] -- "Reads" --> D;
        E -- "Enriches Data (Container ID, Hostname)" --> F[Serialized Event];
    end

    G[Messaging Queue - Kafka]

    subgraph "Security Analytics Platform"
        H[Data Warehouse Ingester]
        I[Security Data Warehouse - ClickHouse]
        J[Alerting Engine] -- "SQL Query" --> I
        K[IAM Policy Store] -- "Provides Context" --> J
    end

    F --> G;
    G --> H;
    H --> I;
    J --> L[Security Team];

The core of the solution lies in two components: the eBPF program running in the kernel and the user-space agent responsible for collecting, enriching, and forwarding the data.

The eBPF Kernel Probe

The eBPF program is the heart of the data collection. Written in restricted C, it’s compiled to BPF bytecode and loaded into the kernel. We attach it to the tcp_v4_connect kernel function using a kprobe. A common mistake here is to use a less stable function or to try and read too much data from kernel memory, which can lead to verifier rejections or performance degradation. We focused on the bare minimum required for effective auditing.

The program’s primary job is to populate a custom event_t struct and push it into a BPF perf buffer, a high-performance mechanism for sending data from kernel to user space.

Here is the complete eBPF C code.

// SPDX-License-Identifier: GPL-2.0
// +build ignore

#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
#include <bpf/bpf_endian.h> // provides bpf_ntohs

// Define a structure to hold our event data.
// This is the data that will be sent from the kernel to user space.
// The `__attribute__((packed))` is important to avoid padding issues
// between the kernel and user space Go struct.
struct event_t {
    u64 ts_ns;         // Timestamp in nanoseconds
    u32 pid;           // Kernel task ID (the thread ID seen from user space)
    u32 tgid;          // Thread group ID (the PID seen from user space)
    u32 uid;           // User ID
    u32 gid;           // Group ID
    u32 saddr;         // Source IPv4 address
    u32 daddr;         // Destination IPv4 address
    u16 dport;         // Destination port
    char comm[16];     // Process name
} __attribute__((packed));

// BPF_MAP_TYPE_PERF_EVENT_ARRAY is a standard way to send data to user space.
// We create a map named 'events' that can be accessed by the user-space agent.
struct {
    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
    __uint(key_size, sizeof(u32));
    __uint(value_size, sizeof(u32));
} events SEC(".maps");

// The kprobe is attached to the `tcp_v4_connect` function.
// This function is called for every attempt to initiate a TCP connection over IPv4.
SEC("kprobe/tcp_v4_connect")
int BPF_KPROBE(trace_tcp_v4_connect, struct sock *sk, struct sockaddr *uaddr) {
    // Get current process/thread identifiers
    u64 id = bpf_get_current_pid_tgid();
    u32 tgid = id >> 32;
    u32 pid = id;

    // Get user and group ID
    u64 uid_gid = bpf_get_current_uid_gid();
    u32 uid = uid_gid;
    u32 gid = uid_gid >> 32;

    // Bail out early if either argument is missing.
    if (sk == NULL || uaddr == NULL) {
        return 0;
    }

    struct event_t event = {};

    // At function entry the socket's destination fields are not populated yet:
    // tcp_v4_connect itself copies them from `uaddr`, which the connect()
    // syscall path has already copied into kernel memory. We therefore read
    // the destination from `uaddr`. bpf_core_read is a CO-RE (Compile Once -
    // Run Everywhere) helper that keeps the program portable across kernels.
    struct sockaddr_in *sin = (struct sockaddr_in *)uaddr;
    bpf_core_read(&event.daddr, sizeof(event.daddr), &sin->sin_addr.s_addr);
    // The source address is only set at this point if the socket was
    // explicitly bound; otherwise it reads as 0.
    bpf_core_read(&event.saddr, sizeof(event.saddr), &sk->__sk_common.skc_rcv_saddr);
    // sin_port is stored in network byte order, so we need bpf_ntohs to convert it.
    u16 dport = 0;
    bpf_core_read(&dport, sizeof(dport), &sin->sin_port);
    event.dport = bpf_ntohs(dport);

    // --- Critical Filter ---
    // In a production environment, you don't want to trace every single connection.
    // This is a major pitfall. You filter as early as possible inside the kernel
    // to minimize overhead. For example, only trace connections to known data
    // warehouse ports.
    // For this example, we'll imagine our warehouse runs on port 10000.
    // A more robust implementation would use a BPF map to store a set of target IPs/ports.
    if (event.dport != 10000) {
        return 0;
    }

    event.pid = pid;
    event.tgid = tgid;
    event.uid = uid;
    event.gid = gid;
    event.ts_ns = bpf_ktime_get_ns();

    // Get the process command name.
    bpf_get_current_comm(&event.comm, sizeof(event.comm));

    // Submit the event to the perf buffer. The BPF_F_CURRENT_CPU flag
    // specifies the CPU-specific buffer. The user-space agent will read
    // from all CPU buffers.
    bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &event, sizeof(event));

    return 0;
}

// A license is required for the eBPF program to be loaded by the kernel.
char LICENSE[] SEC("license") = "GPL";

A critical detail here is the in-kernel filter: if (event.dport != 10000) { return 0; }. Without this, the program would capture every TCP connection on the host, creating an overwhelming amount of noise and performance overhead. The most efficient filtering happens inside the kernel itself. In a real deployment, we replaced this hardcoded port with a BPF hash map that the user-space agent could dynamically update with a list of monitored data warehouse IPs and ports.
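As a rough illustration of the agent side of such a map, the sketch below encodes an `ip:port` target into a fixed-size key. The key layout (4 address bytes and 2 port bytes, both in network byte order, padded to 8 bytes) and the names `targetKey` and `newTargetKey` are hypothetical and not taken from our production code; writing the key into the kernel would go through the map handle exposed by the eBPF library and is not shown.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net/netip"
)

// targetKey is a hypothetical key layout for a BPF hash map of monitored
// endpoints: IPv4 address and port, both in network byte order, padded to
// 8 bytes so a matching C struct needs no packing attribute.
type targetKey [8]byte

// newTargetKey parses "ip:port" and encodes it into the assumed key layout.
func newTargetKey(endpoint string) (targetKey, error) {
	var key targetKey
	ap, err := netip.ParseAddrPort(endpoint)
	if err != nil {
		return key, fmt.Errorf("parse %q: %w", endpoint, err)
	}
	if !ap.Addr().Is4() {
		return key, fmt.Errorf("%q is not an IPv4 endpoint", endpoint)
	}
	addr := ap.Addr().As4()
	copy(key[0:4], addr[:])
	binary.BigEndian.PutUint16(key[4:6], ap.Port())
	return key, nil
}

func main() {
	for _, ep := range []string{"10.20.30.40:10000", "[::1]:10000"} {
		key, err := newTargetKey(ep)
		if err != nil {
			fmt.Println("skipping:", err)
			continue
		}
		fmt.Printf("%s -> % x\n", ep, key[:])
	}
}
```

Keeping both fields in network byte order means the kernel side could compare them against the raw socket address without any conversion, which is one reason to prefer this layout in a sketch like this.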

The User-space Collection Agent in Go

The user-space agent has several responsibilities:

  1. Load the compiled eBPF bytecode into the kernel.
  2. Attach the eBPF program to the specified kprobe.
  3. Listen to the perf buffer for events sent by the eBPF program.
  4. Deserialize the event data from raw bytes into a structured format.
  5. Enrich the event with user-space context (e.g., hostname, container metadata).
  6. Forward the enriched data to a downstream system like Kafka.

We chose Go for its strong concurrency support, excellent ecosystem for observability, and the mature cilium/ebpf library for interacting with the kernel’s eBPF subsystem.
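Step 4 is easy to get subtly wrong, so it can help to exercise it in isolation. The following standalone sketch decodes a hand-built 50-byte sample that follows the packed event_t layout and converts the fields to readable values. It assumes a little-endian host, and the names `rawEvent`, `decodeEvent`, `addrFromKernel`, and `commString` are illustrative only.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"net/netip"
)

// rawEvent mirrors the packed 50-byte event_t layout from the eBPF program.
type rawEvent struct {
	TsNs  uint64
	Pid   uint32
	Tgid  uint32
	Uid   uint32
	Gid   uint32
	Saddr uint32
	Daddr uint32
	Dport uint16
	Comm  [16]byte
}

// decodeEvent parses one raw sample; it fails if the sample is too short.
func decodeEvent(sample []byte) (rawEvent, error) {
	var ev rawEvent
	err := binary.Read(bytes.NewReader(sample), binary.LittleEndian, &ev)
	return ev, err
}

// addrFromKernel restores the in-memory byte sequence of an address field
// that was decoded as a little-endian integer. The kernel keeps IPv4
// addresses in network byte order, so those bytes are the dotted-quad octets.
func addrFromKernel(v uint32) netip.Addr {
	var b [4]byte
	binary.LittleEndian.PutUint32(b[:], v)
	return netip.AddrFrom4(b)
}

// commString cuts the fixed-size comm buffer at the first NUL byte.
func commString(c [16]byte) string {
	if i := bytes.IndexByte(c[:], 0); i >= 0 {
		return string(c[:i])
	}
	return string(c[:])
}

func main() {
	// Hand-built sample: only daddr, dport and comm are filled in.
	sample := make([]byte, 50)
	copy(sample[28:32], []byte{10, 20, 30, 40})         // daddr, network order
	binary.LittleEndian.PutUint16(sample[32:34], 10000) // dport, already host order
	copy(sample[34:], "psql")

	ev, err := decodeEvent(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(commString(ev.Comm), addrFromKernel(ev.Daddr), ev.Dport)
}
```

The point worth noticing is the byte order: the port was converted to host order in the kernel, while the addresses arrive as raw network-order bytes and must be turned back into octets with the same endianness that was used to decode them.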

Here is a significant portion of the agent’s main logic.

package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"errors"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/cilium/ebpf/link"
	"github.com/cilium/ebpf/perf"
	"github.com/segmentio/kafka-go"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang bpf connect.c -- -I./headers

// Event mirrors the C struct in the eBPF program.
// The layout and padding must be identical.
type Event struct {
	TsNs  uint64
	Pid   uint32
	Tgid  uint32
	Uid   uint32
	Gid   uint32
	Saddr uint32
	Daddr uint32
	Dport uint16
	Comm  [16]byte
}

// EnrichedEvent adds user-space context.
type EnrichedEvent struct {
	Timestamp   time.Time `json:"timestamp"`
	Hostname    string    `json:"hostname"`
	ProcessName string    `json:"process_name"`
	PID         uint32    `json:"pid"`
	TGID        uint32    `json:"tgid"`
	UID         uint32    `json:"uid"`
	GID         uint32    `json:"gid"`
	SrcIP       string    `json:"src_ip"`
	DstIP       string    `json:"dst_ip"`
	DstPort     uint16    `json:"dst_port"`
}

const (
	// In a real-world project, these should come from configuration.
	kafkaBroker = "kafka:9092"
	kafkaTopic  = "dw-connect-audit"
)

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	hostname, err := os.Hostname()
	if err != nil {
		log.Fatalf("Failed to get hostname: %v", err)
	}

	// --- eBPF Setup ---
	objs := bpfObjects{}
	if err := loadBpfObjects(&objs, nil); err != nil {
		log.Fatalf("Loading eBPF objects failed: %v", err)
	}
	defer objs.Close()

	// Attach the kprobe.
	kp, err := link.Kprobe("tcp_v4_connect", objs.TraceTcpV4Connect, nil)
	if err != nil {
		log.Fatalf("Attaching kprobe failed: %v", err)
	}
	defer kp.Close()

	log.Println("eBPF probe attached. Waiting for events...")

	// --- Perf Buffer Reader ---
	// The perf reader will listen for events from the kernel.
	rd, err := perf.NewReader(objs.Events, os.Getpagesize()*64)
	if err != nil {
		log.Fatalf("Creating perf event reader failed: %v", err)
	}
	defer rd.Close()

	// --- Kafka Producer ---
	// Using a robust Kafka client is crucial. segmentio/kafka-go provides
	// good defaults for retries and batching.
	kafkaWriter := &kafka.Writer{
		Addr:         kafka.TCP(kafkaBroker),
		Topic:        kafkaTopic,
		Balancer:     &kafka.LeastBytes{},
		RequiredAcks: kafka.RequireOne,
		Async:        true, // Use async for high throughput.
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				log.Printf("Kafka async write failed: %v", err)
			}
		},
	}
	defer kafkaWriter.Close()

	// --- Main Event Loop ---
	eventChan := make(chan []byte, 1000)
	go processEvents(ctx, eventChan, kafkaWriter, hostname)

	go func() {
		<-ctx.Done()
		rd.Close()
	}()

	for {
		record, err := rd.Read()
		if err != nil {
			if errors.Is(err, perf.ErrClosed) {
				log.Println("Perf reader closed. Exiting...")
				return
			}
			log.Printf("Error reading from perf reader: %v", err)
			continue
		}

		if record.LostSamples > 0 {
			log.Printf("Lost %d samples due to buffer overflow", record.LostSamples)
		}

		// Non-blocking send to the processing channel.
		// A potential pitfall is a slow consumer (Kafka); if the channel fills up,
		// we start dropping events. Monitoring channel depth is important.
		select {
		case eventChan <- record.RawSample:
		default:
			log.Println("Event processing channel is full. Dropping event.")
		}
	}
}

// processEvents handles deserialization, enrichment, and forwarding.
func processEvents(ctx context.Context, eventChan <-chan []byte, writer *kafka.Writer, hostname string) {
	for {
		select {
		case <-ctx.Done():
			return
		case rawSample := <-eventChan:
			var event Event
			// The memory layout of the Go struct must match the C struct exactly.
			if err := binary.Read(bytes.NewReader(rawSample), binary.LittleEndian, &event); err != nil {
				log.Printf("Error parsing event: %v", err)
				continue
			}

			enriched := enrichEvent(event, hostname)
			payload, err := json.Marshal(enriched)
			if err != nil {
				log.Printf("Error marshalling event: %v", err)
				continue
			}

			// Write to Kafka. Error handling for async writes is done in the Completion func.
			err = writer.WriteMessages(ctx, kafka.Message{Value: payload})
			if err != nil {
				// This might happen if the context is cancelled.
				if !errors.Is(err, context.Canceled) {
					log.Printf("Failed to enqueue message to Kafka: %v", err)
				}
			}
		}
	}
}

// enrichEvent converts kernel data types to human-readable formats and adds metadata.
func enrichEvent(event Event, hostname string) EnrichedEvent {
	return EnrichedEvent{
		Timestamp:   time.Now(), // Agent wall-clock time. event.TsNs is monotonic time since boot, not an epoch timestamp.
		Hostname:    hostname,
		ProcessName: string(bytes.TrimRight(event.Comm[:], "\x00")),
		PID:         event.Pid,
		TGID:        event.Tgid,
		UID:         event.Uid,
		GID:         event.Gid,
		SrcIP:       intToIP(event.Saddr).String(),
		DstIP:       intToIP(event.Daddr).String(),
		DstPort:     event.Dport,
	}
}

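func intToIP(ipInt uint32) net.IP {
	// The kernel stores IPv4 addresses in network byte order. binary.Read decoded
	// the field as a little-endian integer, so writing it back little-endian
	// restores the original octet order.
	ip := make(net.IP, 4)
	binary.LittleEndian.PutUint32(ip, ipInt)
	return ip
}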

This agent code includes graceful shutdown handling, a separate goroutine for processing events to avoid blocking the high-priority perf buffer reader, and an asynchronous Kafka producer for performance. It also handles a key issue: record.LostSamples. If user space cannot read from the perf buffer fast enough, the kernel will start dropping events. Logging this metric is vital for understanding if the agent is keeping up with the data volume.
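Logging drops is a start, but counters are easier to alert on. Below is a minimal sketch of how the two drop paths (kernel-side lost samples and a full processing channel) could be tracked separately. The `dropStats` type and its `offer` method are hypothetical helpers, not part of the agent shown above, and exporting the counters to a metrics system is left out.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// dropStats keeps separate counters for the two places events can be lost.
type dropStats struct {
	kernelLost  atomic.Uint64 // samples the kernel dropped (record.LostSamples)
	channelFull atomic.Uint64 // samples dropped because the worker fell behind
}

// offer attempts a non-blocking send and counts the sample as dropped
// when the channel has no free slot.
func (s *dropStats) offer(ch chan<- []byte, sample []byte) bool {
	select {
	case ch <- sample:
		return true
	default:
		s.channelFull.Add(1)
		return false
	}
}

func main() {
	var stats dropStats
	ch := make(chan []byte, 2) // tiny buffer to force drops in the demo

	for i := 0; i < 5; i++ {
		stats.offer(ch, []byte{byte(i)})
	}
	stats.kernelLost.Add(3) // e.g. the LostSamples value from one read

	fmt.Printf("queued=%d channel_full=%d kernel_lost=%d\n",
		len(ch), stats.channelFull.Load(), stats.kernelLost.Load())
}
```

Separating the counters matters for diagnosis: kernel-side losses suggest a larger perf buffer or a faster reader, while channel-side losses point at the downstream path, typically Kafka.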

Analysis and Alerting in the Security Warehouse

With a steady stream of connection events flowing into our security data warehouse (we used ClickHouse for its excellent ingestion speed and query performance on time-series data), the final step was correlation. The dw_connect_audit table was designed for this purpose.

CREATE TABLE dw_connect_audit (
    `timestamp` DateTime64(3, 'UTC'),
    `hostname` String,
    `process_name` String,
    `pid` UInt32,
    `tgid` UInt32,
    `uid` UInt32,
    `src_ip` IPv4,
    `dst_ip` IPv4,
    `dst_port` UInt16
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (hostname, process_name, timestamp);

We also created a table representing our IAM policy, which was periodically synced from our central IAM system.

CREATE TABLE iam_dw_policy (
    `process_name` String,
    `allowed_hostname_glob` String,
    `allowed_uid` UInt32
) ENGINE = ReplacingMergeTree()
ORDER BY (process_name, allowed_hostname_glob, allowed_uid);

Now, we could write simple SQL queries to detect anomalies. For instance, a query to find connections from unauthorized processes or users:

SELECT
    t1.timestamp,
    t1.hostname,
    t1.process_name,
    t1.uid,
    t1.dst_ip
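FROM dw_connect_audit AS t1
LEFT JOIN iam_dw_policy AS t2 ON t1.process_name = t2.process_name AND t1.uid = t2.allowed_uid
WHERE t2.process_name IS NULL
AND t1.timestamp > now() - INTERVAL 5 MINUTE
-- ClickHouse fills unmatched LEFT JOIN columns with defaults ('' for String)
-- unless join_use_nulls is enabled, so the IS NULL test needs this setting.
SETTINGS join_use_nulls = 1;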

This query, run every five minutes, became a high-fidelity alerting mechanism. Within weeks of deployment, it flagged a developer’s local analytics tool, which had a cached production credential, attempting to connect directly to the production data warehouse. This was a policy violation that our traditional IAM logs would never have seen because the connection likely failed authentication, and the failure event would have been lost in a sea of noise. Here, the context of process_name='jupyter-notebook' and hostname='dev-laptop-123' was the critical piece of information that made the event actionable.
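The query joins on process_name and uid only; the allowed_hostname_glob column of the policy table is not evaluated there. As a sketch of what the full check could look like, for example when unit-testing policy rows before they are synced, the Go snippet below applies all three conditions. The `policyRow` and `connEvent` types are hypothetical, and it assumes that the glob semantics of the standard library's `path.Match` are close enough to what the policy store intends.

```go
package main

import (
	"fmt"
	"path"
)

// policyRow mirrors one row of the iam_dw_policy table.
type policyRow struct {
	ProcessName  string
	HostnameGlob string
	UID          uint32
}

// connEvent holds the fields of an audit event that the policy looks at.
type connEvent struct {
	ProcessName string
	Hostname    string
	UID         uint32
}

// allowed reports whether any policy row matches the event on process name,
// UID, and hostname glob.
func allowed(ev connEvent, policy []policyRow) bool {
	for _, p := range policy {
		if p.ProcessName != ev.ProcessName || p.UID != ev.UID {
			continue
		}
		// path.Match only errors on a malformed pattern; treat that as no match.
		if ok, err := path.Match(p.HostnameGlob, ev.Hostname); err == nil && ok {
			return true
		}
	}
	return false
}

func main() {
	policy := []policyRow{
		{ProcessName: "etl-runner", HostnameGlob: "etl-worker-*", UID: 1001},
	}
	events := []connEvent{
		{ProcessName: "etl-runner", Hostname: "etl-worker-07", UID: 1001},
		{ProcessName: "etl-runner", Hostname: "dev-laptop-123", UID: 1001},
		{ProcessName: "curl", Hostname: "etl-worker-07", UID: 0},
	}
	for _, ev := range events {
		fmt.Printf("%s@%s uid=%d allowed=%v\n", ev.ProcessName, ev.Hostname, ev.UID, allowed(ev, policy))
	}
}
```

In this sketch a known process connecting from an unexpected host is rejected just like an unknown process, which is the case the join on process name and UID alone would miss.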

This system is not without its limitations. The process name (comm) in the kernel is limited to 16 bytes including the terminating NUL, so longer names are truncated to 15 characters, and a process can change its own comm, which makes it easy for a sophisticated attacker to spoof. A more robust solution could involve hashing the process binary on disk and verifying its integrity. Because the probe is attached to tcp_v4_connect, IPv6 connections (which go through tcp_v6_connect) are not observed. Furthermore, this implementation only monitors connection initiation; it provides no visibility into the encrypted data stream itself. Extending the eBPF probes to user-space SSL/TLS libraries (uprobes) could potentially extract unencrypted data or TLS metadata, but this significantly increases complexity and fragility.

The current solution, however, strikes a pragmatic balance between visibility, performance, and implementation overhead, effectively closing a critical gap in our data warehouse security posture. Future iterations will focus on automating the IAM policy synchronization and building a BPF map-based mechanism for the agent to push real-time blocking rules directly into the kernel, transforming this from a passive auditing tool into an active, dynamic firewall.
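To make the binary-hashing idea from the limitations above a little more concrete, here is a minimal Go sketch that computes a SHA-256 digest of the executable behind a PID by reading /proc/<pid>/exe. It is Linux-only, the function names `hashFile` and `hashProcessBinary` are hypothetical, and it glosses over real-world issues: the process may have exited by the time the agent looks, the agent needs permission to read the target's /proc entry, and interpreted workloads would hash the interpreter rather than the script.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

// hashFile streams a file through SHA-256 and returns the hex digest.
func hashFile(p string) (string, error) {
	f, err := os.Open(p)
	if err != nil {
		return "", err
	}
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, f); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

// hashProcessBinary hashes the executable of a running process via the
// /proc/<pid>/exe symlink (Linux only).
func hashProcessBinary(pid uint32) (string, error) {
	return hashFile(fmt.Sprintf("/proc/%d/exe", pid))
}

func main() {
	sum, err := hashProcessBinary(uint32(os.Getpid()))
	if err != nil {
		fmt.Println("could not hash binary:", err)
		return
	}
	fmt.Println("sha256 of own executable:", sum)
}
```

An enrichment step along these lines could attach the digest to each event and let the warehouse compare it against an allowlist, at the cost of extra I/O per event unless digests are cached per executable.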

