Constructing a Real-Time Telemetry Pipeline from Cilium to a MobX Frontend via a Message Queue


Our move to Kubernetes was not a clean slate. We had a decade’s worth of infrastructure automation built on Chef, and our SRE team was accustomed to its declarative, convergent model for managing fleets of VMs. When we introduced a Kubernetes cluster using Cilium for networking and security, we inadvertently created an observability gap. The dynamic, API-driven nature of Cilium’s network policies and the sheer volume of real-time flow data were invisible to our existing toolchains. The immediate pain point was troubleshooting network connectivity issues. kubectl commands were not a scalable solution for an operations team that needed a centralized, real-time view.

The initial, naive approach was a simple dashboard that would periodically poll the Kubernetes API and Cilium’s custom resources. This failed within hours. The polling interval was either too long to be useful for real-time debugging or so short that it hammered the API server, creating more problems than it solved. It became clear we needed a push-based, streaming architecture.

This led us to Cilium’s hidden gem: Hubble. Hubble provides an observability layer that can stream network flow data via a gRPC API. This was the source we needed. However, directly connecting multiple operator browsers to a single gRPC stream was not a production-ready design. It lacked scalability, resiliency, and a mechanism for fan-out. A common mistake is to build a direct, tightly coupled link between a backend data source and a UI. In any real-world project, this will break under load or during backend restarts.

We decided to introduce a message queue as a durable, scalable buffer between the data source and the consumers. We chose RabbitMQ because our team had extensive operational experience with it. It would ingest the high-frequency stream from Hubble and distribute it to any number of front-end clients. For the front-end itself, we needed a way to handle this firehose of data without freezing the browser. Raw DOM manipulation was out. This is where MobX came in. Its transparent reactive programming model was a perfect fit, allowing us to update our data model and have the UI react efficiently with minimal boilerplate. The final, non-negotiable constraint was that the deployment and configuration of all the new backend components—the gRPC adapter and the RabbitMQ cluster itself—had to be managed via Chef to fit into our existing CI/CD and infrastructure management pipelines. This created a peculiar but realistic hybrid architecture.

The final design looked like this:

graph TD
    subgraph Kubernetes Cluster
        Cilium -- eBPF --> Hubble[Hubble gRPC Server]
    end

    subgraph Chef Managed Infra
        A[Node.js gRPC Adapter] -- Consumes gRPC Stream --> Hubble
        A -- Publishes to Exchange --> MQ[RabbitMQ Broker]
    end

    subgraph Operator Workstation
        B[Browser Frontend] -- Subscribes via Web-STOMP --> MQ
        subgraph B
            C[JavaScript App] -- Uses --> D[MobX Store]
            D -- Reactively Updates --> E[DOM]
        end
    end

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style MQ fill:#ff9,stroke:#333,stroke-width:2px

The gRPC-to-MessageQueue Adapter

The core of the backend is a Node.js service responsible for a single task: connect to the Hubble gRPC stream, parse the flow events, and publish them to a RabbitMQ fanout exchange. It has to be resilient to connection drops from either end and configurable without code changes.

Here’s the project structure for this adapter:

/hubble-mq-adapter
├── src/
│   ├── adapter.js        # Main application logic
│   ├── grpcClient.js     # Hubble gRPC connection handling
│   └── mqPublisher.js    # RabbitMQ connection and publishing
├── test/
│   └── mqPublisher.test.js # Unit tests for the publisher
├── .env.example          # Environment variable template
├── package.json
└── README.md

The package.json defines our dependencies. We need libraries for gRPC, RabbitMQ, and configuration management.

{
  "name": "hubble-mq-adapter",
  "version": "1.0.0",
  "description": "Streams Cilium Hubble flows to RabbitMQ",
  "main": "src/adapter.js",
  "scripts": {
    "start": "node src/adapter.js",
    "test": "jest"
  },
  "dependencies": {
    "@grpc/grpc-js": "^1.8.0",
    "@grpc/proto-loader": "^0.7.4",
    "amqplib": "^0.10.3",
    "dotenv": "^16.0.3"
  },
  "devDependencies": {
    "jest": "^29.3.1"
  }
}

Configuration is managed through environment variables, loaded by dotenv. This is crucial for integration with Chef, which will generate the .env file from a template.

.env.example

# Hubble gRPC Server configuration
HUBBLE_RELAY_ADDRESS=localhost:4245
HUBBLE_REQUEST_TIMEOUT_SECONDS=30

# RabbitMQ configuration
RABBITMQ_URL=amqp://guest:guest@localhost:5672
RABBITMQ_EXCHANGE_NAME=hubble_flows
RABBITMQ_EXCHANGE_TYPE=fanout

# Flow filtering options
# Comma-separated list of verdict strings to include (e.g., FORWARDED,DROPPED)
# Leave empty to include all.
VERDICT_WHITELIST=DROPPED

The gRPC client module handles the complexity of connecting to Hubble and managing the stream. A critical aspect here is retry logic. The connection will fail in a production environment, and the service must recover gracefully.

src/grpcClient.js

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');

const PROTO_PATH = path.resolve(__dirname, '../protos/observer.proto');
// Note: You must obtain the .proto files from the Cilium repository (cilium/api).
// observer.proto imports flow/flow.proto and other files, so those must also be
// resolvable under the `protos` directory.

const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true,
    includeDirs: [path.resolve(__dirname, '../protos')], // resolve the imports above
});
const observer = grpc.loadPackageDefinition(packageDefinition).observer;

// A wrapper class to handle connection and streaming logic
class HubbleClient {
    constructor(address, onDataCallback, onErrorCallback) {
        this.address = address;
        this.onData = onDataCallback;
        this.onError = onErrorCallback;
        this.client = new observer.Observer(this.address, grpc.credentials.createInsecure());
        this.stream = null;
        this.reconnectTimer = null;
        this.reconnectInterval = 5000; // 5 seconds
    }

    startStreaming() {
        console.log(`[gRPC] Attempting to connect to Hubble at ${this.address}`);
        const request = {
            follow: true,
            // Example of a blacklist filter to reduce noise. FlowFilter fields
            // are repeated, so values are arrays.
            // In a real system, this would be more sophisticated.
            blacklist: [{
                source_pod: ['kube-system/'] // Ignore flows from kube-system
            }]
        };

        this.stream = this.client.getFlows(request);

        this.stream.on('data', (response) => {
            // We only care about actual flow data
            if (response.flow) {
                this.onData(response.flow);
            }
        });

        this.stream.on('error', (err) => {
            console.error(`[gRPC] Stream error: ${err.message}`);
            this.onError(err);
            this.scheduleReconnect();
        });

        this.stream.on('end', () => {
            console.log('[gRPC] Stream ended. Attempting to reconnect.');
            this.scheduleReconnect();
        });
    }

    scheduleReconnect() {
        // 'error' and 'end' can both fire for the same broken stream;
        // guard against scheduling two competing reconnect attempts.
        if (this.reconnectTimer) {
            return;
        }
        if (this.stream) {
            this.stream.cancel();
            this.stream = null;
        }
        this.reconnectTimer = setTimeout(() => {
            this.reconnectTimer = null;
            this.startStreaming();
        }, this.reconnectInterval);
    }
}

module.exports = HubbleClient;
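
In many clusters Hubble Relay is exposed over TLS rather than plaintext. A minimal sketch of swapping in TLS credentials, assuming certificate paths supplied via extra environment variables (HUBBLE_TLS_CA, HUBBLE_TLS_CERT and HUBBLE_TLS_KEY are our own naming, not anything Hubble mandates):

const fs = require('fs');
const grpc = require('@grpc/grpc-js');

// Hypothetical helper: build gRPC credentials from env-provided cert paths.
// Falls back to an insecure channel when no CA is configured.
function buildCredentials() {
    const { HUBBLE_TLS_CA, HUBBLE_TLS_CERT, HUBBLE_TLS_KEY } = process.env;
    if (!HUBBLE_TLS_CA) {
        return grpc.credentials.createInsecure();
    }
    return grpc.credentials.createSsl(
        fs.readFileSync(HUBBLE_TLS_CA),                            // root CA
        HUBBLE_TLS_KEY ? fs.readFileSync(HUBBLE_TLS_KEY) : null,   // client key (mTLS)
        HUBBLE_TLS_CERT ? fs.readFileSync(HUBBLE_TLS_CERT) : null  // client cert (mTLS)
    );
}

// Usage inside the HubbleClient constructor:
// this.client = new observer.Observer(this.address, buildCredentials());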

Similarly, the RabbitMQ publisher needs robust error handling. If the message broker is unavailable, it should not crash the adapter but rather attempt to reconnect.

src/mqPublisher.js

const amqp = require('amqplib');

class MqPublisher {
    constructor(url, exchangeName, exchangeType) {
        this.url = url;
        this.exchangeName = exchangeName;
        this.exchangeType = exchangeType;
        this.connection = null;
        this.channel = null;
        this.reconnectInterval = 5000;
    }

    async connect() {
        try {
            console.log(`[RabbitMQ] Connecting to ${this.url}`);
            this.connection = await amqp.connect(this.url);
            this.channel = await this.connection.createChannel();

            await this.channel.assertExchange(this.exchangeName, this.exchangeType, { durable: false });

            console.log('[RabbitMQ] Connection successful. Exchange asserted.');

            this.connection.on('error', (err) => {
                console.error('[RabbitMQ] Connection error:', err.message);
            });

            this.connection.on('close', () => {
                console.error('[RabbitMQ] Connection closed. Reconnecting...');
                this.scheduleReconnect();
            });

        } catch (err) {
            console.error('[RabbitMQ] Failed to connect:', err.message);
            this.scheduleReconnect();
        }
    }

    scheduleReconnect() {
        this.connection = null;
        this.channel = null;
        setTimeout(() => this.connect(), this.reconnectInterval);
    }

    publish(message) {
        if (!this.channel) {
            console.warn('[RabbitMQ] Channel not available. Message dropped.');
            return;
        }
        try {
            // In a fanout exchange, the routing key is ignored.
            this.channel.publish(
                this.exchangeName,
                '',
                Buffer.from(JSON.stringify(message))
            );
        } catch (err) {
            console.error('[RabbitMQ] Failed to publish message:', err.message);
        }
    }
}

module.exports = MqPublisher;
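
The project layout above lists test/mqPublisher.test.js. A minimal Jest sketch of what it might contain, mocking amqplib so no broker is required; the specific assertions are illustrative, not the original test suite:

test/mqPublisher.test.js

const amqp = require('amqplib');
const MqPublisher = require('../src/mqPublisher');

jest.mock('amqplib');

describe('MqPublisher', () => {
    test('drops messages when no channel is available', () => {
        const publisher = new MqPublisher('amqp://localhost', 'hubble_flows', 'fanout');
        // No connect() call, so the channel is null; publish must not throw.
        expect(() => publisher.publish({ verdict: 'DROPPED' })).not.toThrow();
    });

    test('publishes JSON to the configured exchange', async () => {
        const channel = { assertExchange: jest.fn(), publish: jest.fn() };
        amqp.connect.mockResolvedValue({
            createChannel: jest.fn().mockResolvedValue(channel),
            on: jest.fn(),
        });

        const publisher = new MqPublisher('amqp://localhost', 'hubble_flows', 'fanout');
        await publisher.connect();
        publisher.publish({ verdict: 'DROPPED' });

        expect(channel.publish).toHaveBeenCalledWith(
            'hubble_flows',
            '',
            Buffer.from(JSON.stringify({ verdict: 'DROPPED' }))
        );
    });
});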

Finally, the main adapter file ties them together. It processes and filters the data before publishing. A key lesson learned was that Hubble can produce an overwhelming amount of data. Filtering for specific events (like DROPPED verdicts) at the source is critical to avoid saturating the message queue and the front-end.

src/adapter.js

require('dotenv').config();

const HubbleClient = require('./grpcClient');
const MqPublisher = require('./mqPublisher');

// --- Configuration ---
const HUBBLE_RELAY_ADDRESS = process.env.HUBBLE_RELAY_ADDRESS;
const RABBITMQ_URL = process.env.RABBITMQ_URL;
const RABBITMQ_EXCHANGE_NAME = process.env.RABBITMQ_EXCHANGE_NAME;
const RABBITMQ_EXCHANGE_TYPE = process.env.RABBITMQ_EXCHANGE_TYPE;
const VERDICT_WHITELIST = (process.env.VERDICT_WHITELIST || '').split(',').filter(Boolean);

if (!HUBBLE_RELAY_ADDRESS || !RABBITMQ_URL) {
    console.error("Missing critical environment variables. Exiting.");
    process.exit(1);
}

// --- Main Logic ---
const publisher = new MqPublisher(RABBITMQ_URL, RABBITMQ_EXCHANGE_NAME, RABBITMQ_EXCHANGE_TYPE);

const processFlow = (flow) => {
    // A common pitfall is to forward the raw, verbose protobuf object.
    // We transform it into a leaner JSON payload for the frontend.
    const simplifiedFlow = {
        time: flow.time?.seconds,
        verdict: flow.verdict,
        // Optional chaining is crucial as not all flows have all fields.
        source: `${flow.source?.pod_name} (${flow.IP?.source})`,
        destination: `${flow.destination?.pod_name} (${flow.IP?.destination})`,
        // The `protocol` oneof name is exposed because we load protos with oneofs: true.
        l4_protocol: flow.l4?.protocol, // e.g., 'TCP', 'UDP'
        dest_port: flow.l4?.TCP?.destination_port || flow.l4?.UDP?.destination_port,
        type: flow.Type,
        summary: flow.Summary,
    };
    
    // Apply server-side filtering. This is much more efficient than filtering in the browser.
    if (VERDICT_WHITELIST.length > 0 && !VERDICT_WHITELIST.includes(simplifiedFlow.verdict)) {
        return;
    }
    
    publisher.publish(simplifiedFlow);
};

const handleGrpcError = (err) => {
    // The client schedules its own reconnects; repeated errors here should
    // trigger an alert in a production system.
    console.error("gRPC client stream error:", err.message);
};

async function main() {
    console.log("Starting Hubble MQ Adapter...");
    await publisher.connect();
    const hubbleClient = new HubbleClient(HUBBLE_RELAY_ADDRESS, processFlow, handleGrpcError);
    hubbleClient.startStreaming();
}

main();
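
Because the adapter runs under systemd (see the Chef recipe below), it is worth exiting cleanly on SIGTERM so that a routine systemctl restart during a Chef converge is not recorded as a failure. A minimal sketch appended to adapter.js; the exit code and log message are our convention, nothing more:

// Exit cleanly when systemd stops or restarts the service.
['SIGTERM', 'SIGINT'].forEach((signal) => {
    process.on(signal, () => {
        console.log(`Received ${signal}, shutting down.`);
        // Flows are fire-and-forget on a non-durable fanout exchange,
        // so there is nothing to flush; exiting with 0 marks a clean stop.
        process.exit(0);
    });
});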

Deployment with Chef

Integrating this service into our existing Chef-based workflow was mandatory. This meant creating a cookbook to handle the deployment, configuration, and service management.

A simplified Chef recipe to deploy this adapter would look like this. It uses a template for the .env file, pulling values from Chef attributes. This ensures configuration is managed centrally and is consistent across environments.

cookbooks/hubble_adapter/recipes/default.rb

# Install Node.js and npm
include_recipe 'nodejs'

app_dir = '/opt/hubble-mq-adapter'
app_user = 'hubble'

user app_user do
  system true
  shell '/bin/false'
  home '/nonexistent'
end

directory app_dir do
  owner app_user
  group app_user
  mode '0755'
  recursive true
end

# Deploy application code from a source (e.g., git or artifact repository).
# Checking out as the app user keeps file ownership consistent with the
# npm install step and the systemd service below.
git app_dir do
  repository 'https://your-git-repo/hubble-mq-adapter.git'
  revision 'main'
  user app_user
  group app_user
  action :sync
  notifies :run, 'execute[npm install]', :immediately
end

execute 'npm install' do
  command 'npm install --production'
  cwd app_dir
  user app_user
  environment 'HOME' => app_dir # npm needs a writable HOME for its cache
  action :nothing # Only run on notification
  notifies :restart, 'systemd_unit[hubble-adapter.service]', :delayed
end

# Manage configuration via a template
template "#{app_dir}/.env" do
  source 'env.erb'
  owner app_user
  group app_user
  mode '0640'
  variables(
    hubble_address: node['hubble_adapter']['hubble_address'],
    rabbitmq_url: node['hubble_adapter']['rabbitmq_url'],
    verdict_whitelist: node['hubble_adapter']['verdict_whitelist'].join(',')
  )
  notifies :restart, 'systemd_unit[hubble-adapter.service]', :delayed
end

# Manage the service with systemd
systemd_unit 'hubble-adapter.service' do
  content({
    Unit: {
      Description: 'Hubble to Message Queue Adapter',
      After: 'network.target',
    },
    Service: {
      Type: 'simple',
      User: app_user,
      WorkingDirectory: app_dir,
      ExecStart: '/usr/bin/node src/adapter.js',
      Restart: 'on-failure',
    },
    Install: {
      WantedBy: 'multi-user.target',
    },
  })
  action [:create, :enable, :start]
end

This approach, while feeling somewhat archaic in a cloud-native context, provided the bridge we needed between our old and new infrastructure paradigms.

The MobX-Powered Frontend

The frontend’s job is to present a live, scrolling view of the network flows. The primary challenge is performance. A busy cluster can generate hundreds of flow events per second. Updating the DOM that frequently with naive methods would lock up the browser.

This is where MobX shines. We define an observable state object, and our rendering logic automatically subscribes to changes in that state. MobX ensures that only the necessary parts of the UI are re-rendered when data changes.
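
Before looking at the real application, here is the reactive model in miniature (a toy sketch using the same UMD global as below, not code from our frontend):

// A toy sketch of MobX reactivity, independent of the flow monitor.
const counter = mobx.observable({ count: 0 });

// Runs once now, then again whenever counter.count changes.
mobx.autorun(() => {
    console.log(`count is ${counter.count}`);
});

// State changes go through actions; the autorun above reacts automatically.
mobx.runInAction(() => {
    counter.count = 1; // logs "count is 1"
});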

index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Cilium Real-Time Flow Monitor</title>
    <style>
        body { font-family: monospace; background-color: #1e1e1e; color: #d4d4d4; }
        table { width: 100%; border-collapse: collapse; }
        th, td { padding: 4px 8px; text-align: left; border-bottom: 1px solid #444; }
        .DROPPED { color: #f48771; }
        .FORWARDED { color: #8cc265; }
        #status { position: fixed; top: 10px; right: 10px; padding: 5px; background: #555; }
    </style>
</head>
<body>
    <h1>Live Cilium Network Flows</h1>
    <div id="status">Connecting...</div>
    <table>
        <thead>
            <tr>
                <th>Timestamp</th>
                <th>Verdict</th>
                <th>Source</th>
                <th>Destination</th>
                <th>Port</th>
                <th>Summary</th>
            </tr>
        </thead>
        <tbody id="flow-container"></tbody>
    </table>

    <!-- Dependencies -->
    <script src="https://cdnjs.cloudflare.com/ajax/libs/mobx/6.7.0/mobx.umd.production.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script src="app.js"></script>
</body>
</html>

The JavaScript application consists of a MobX store and a UI rendering part that reacts to store changes.

app.js

// --- MobX State Store ---
const flowStore = mobx.observable({
    flows: [],
    maxFlows: 200, // Keep the list from growing indefinitely
    connectionStatus: 'Connecting',

    addFlow(flow) {
        this.flows.unshift(flow); // Add to the top
        if (this.flows.length > this.maxFlows) {
            this.flows.pop(); // Remove from the bottom
        }
    },
    
    setStatus(status) {
        this.connectionStatus = status;
    }
});

// --- UI Rendering ---
const flowContainer = document.getElementById('flow-container');
const statusDiv = document.getElementById('status');

// This autorun is the heart of the MobX integration.
// It runs once initially, and then re-runs automatically
// whenever any observable data it reads (flowStore.flows) changes.
// MobX handles the dependency tracking and batches the updates.
mobx.autorun(() => {
    // MobX decides *when* to re-run this function, but the DOM work below is
    // still a full rewrite of the table body. The maxFlows cap keeps that
    // affordable; a more sophisticated implementation would patch rows by key
    // or throttle the redraw (see the delay option discussed below).
    const rowsHtml = flowStore.flows.map(flow => `
        <tr class="verdict-${flow.verdict}">
            <td>${new Date(flow.time * 1000).toLocaleTimeString()}</td>
            <td class="${flow.verdict}">${flow.verdict}</td>
            <td>${flow.source}</td>
            <td>${flow.destination}</td>
            <td>${flow.dest_port ?? ''}</td>
            <td>${flow.summary}</td>
        </tr>
    `).join('');
    flowContainer.innerHTML = rowsHtml;
});

mobx.autorun(() => {
    statusDiv.textContent = `Status: ${flowStore.connectionStatus}`;
    statusDiv.style.color = flowStore.connectionStatus === 'Connected' ? '#8cc265' : '#f48771';
});

// --- WebSocket/STOMP Connection ---
function connectToMq() {
    // Assumes RabbitMQ has the Web-STOMP plugin enabled
    const client = Stomp.client('ws://localhost:15674/ws');
    
    const onConnect = () => {
        mobx.runInAction(() => flowStore.setStatus('Connected'));
        
        client.subscribe('/exchange/hubble_flows', (message) => {
            try {
                const flow = JSON.parse(message.body);
                // mobx.runInAction is used to batch state changes
                mobx.runInAction(() => {
                    flowStore.addFlow(flow);
                });
            } catch (e) {
                console.error("Failed to parse message", e);
            }
        });
    };

    const onError = (error) => {
        mobx.runInAction(() => flowStore.setStatus(`Error: ${error.body || 'Connection failed'}`));
        console.error('STOMP Error:', error);
        // Attempt to reconnect after a delay
        setTimeout(connectToMq, 5000);
    };

    client.connect('guest', 'guest', onConnect, onError, '/');
}

// Initial connection
connectToMq();

With this setup, the browser stays responsive under a high rate of incoming messages. MobX re-runs the autorun whenever flowStore.flows changes and batches the state updates, while the maxFlows cap keeps the full table rewrite cheap. Just as importantly, the state logic is completely decoupled from the rendering logic, which is a big win for maintainability.
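
One knob worth knowing about if the flow rate grows: autorun accepts a delay option (in milliseconds) that defers and coalesces re-runs, so a burst of messages produces a single redraw. A sketch of the same renderer with a 100 ms delay; the value is our tuning choice, not a MobX default:

// Variant of the table renderer above: MobX defers the re-run by up to
// 100 ms, so bursts of addFlow() calls collapse into one DOM update.
mobx.autorun(() => {
    const rowsHtml = flowStore.flows.map(flow => `
        <tr>
            <td>${new Date(flow.time * 1000).toLocaleTimeString()}</td>
            <td class="${flow.verdict}">${flow.verdict}</td>
            <td>${flow.source}</td>
            <td>${flow.destination}</td>
            <td>${flow.dest_port ?? ''}</td>
            <td>${flow.summary}</td>
        </tr>
    `).join('');
    flowContainer.innerHTML = rowsHtml;
}, { delay: 100 });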

Limitations and Future Paths

This architecture, while effective, is a product of its constraints. Using Chef to manage a cloud-native component’s adapter feels like a temporary bridge. The natural evolution is to package the adapter into a container, deploy it to the Kubernetes cluster, and manage its configuration via a ConfigMap or a custom operator. This would fully align its lifecycle with the component it’s observing.

The front-end is purely for real-time visualization. It has no memory of past events. A logical next step would be to have another consumer on the message queue—a data sink service—that writes the flow data into a time-series database like Prometheus or VictoriaMetrics. This would unlock historical analysis, alerting, and more sophisticated dashboarding with tools like Grafana.
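
As a sketch of what that second consumer could look like on the queue side (Node.js with amqplib, binding its own queue to the same fanout exchange; the actual write to the time-series backend is left as a placeholder):

const amqp = require('amqplib');

async function runSink() {
    const connection = await amqp.connect(process.env.RABBITMQ_URL);
    const channel = await connection.createChannel();

    // Same exchange the adapter publishes to; each consumer gets its own
    // exclusive queue, so the dashboard and the sink both see every flow.
    await channel.assertExchange('hubble_flows', 'fanout', { durable: false });
    const { queue } = await channel.assertQueue('', { exclusive: true });
    await channel.bindQueue(queue, 'hubble_flows', '');

    await channel.consume(queue, (msg) => {
        if (!msg) return;
        const flow = JSON.parse(msg.content.toString());
        // Placeholder: write `flow` to the time-series backend of choice.
        channel.ack(msg);
    });
}

runSink().catch((err) => {
    console.error('Sink failed:', err.message);
    process.exit(1);
});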

Finally, the current data flow is unidirectional. The true power of such a system would be to make it bidirectional, allowing operators to send commands back through the queue—for instance, to temporarily create a CiliumNetworkPolicy to isolate a problematic pod—turning this observability tool into a rudimentary control plane. That, however, opens up a new set of challenges around security, authorization, and idempotency.

