Our move to Kubernetes was not a clean slate. We had a decade’s worth of infrastructure automation built on Chef, and our SRE team was accustomed to its declarative, convergent model for managing fleets of VMs. When we introduced a Kubernetes cluster using Cilium for networking and security, we inadvertently created an observability gap. The dynamic, API-driven nature of Cilium’s network policies and the sheer volume of real-time flow data were invisible to our existing toolchains. The immediate pain point was troubleshooting network connectivity issues. kubectl
commands were not a scalable solution for an operations team that needed a centralized, real-time view.
The initial, naive approach was a simple dashboard that would periodically poll the Kubernetes API and Cilium’s custom resources. This failed within hours. The polling interval was either too long to be useful for real-time debugging or so short that it hammered the API server, creating more problems than it solved. It became clear we needed a push-based, streaming architecture.
This led us to Cilium’s hidden gem: Hubble. Hubble provides an observability layer that can stream network flow data via a gRPC API. This was the source we needed. However, directly connecting multiple operator browsers to a single gRPC stream was not a production-ready design. It lacked scalability, resiliency, and a mechanism for fan-out. A common mistake is to build a direct, tightly coupled link between a backend data source and a UI. In any real-world project, this will break under load or during backend restarts.
We decided to introduce a message queue as a durable, scalable buffer between the data source and the consumers. We chose RabbitMQ because our team had extensive operational experience with it. It would ingest the high-frequency stream from Hubble and distribute it to any number of front-end clients. For the front-end itself, we needed a way to handle this firehose of data without freezing the browser. Raw DOM manipulation was out. This is where MobX came in. Its transparent reactive programming model was a perfect fit, allowing us to update our data model and have the UI react efficiently with minimal boilerplate. The final, non-negotiable constraint was that the deployment and configuration of all the new backend components—the gRPC adapter and the RabbitMQ cluster itself—had to be managed via Chef to fit into our existing CI/CD and infrastructure management pipelines. This created a peculiar but realistic hybrid architecture.
The final design looked like this:
graph TD
    subgraph Kubernetes Cluster
        Cilium -- eBPF --> Hubble[Hubble gRPC Server]
    end
    subgraph Chef Managed Infra
        A[Node.js gRPC Adapter] -- Consumes gRPC Stream --> Hubble
        A -- Publishes to Exchange --> MQ[RabbitMQ Broker]
    end
    subgraph Operator Workstation
        B[Browser Frontend] -- Subscribes via Web-STOMP --> MQ
        subgraph B
            C[JavaScript App] -- Uses --> D[MobX Store]
            D -- Reactively Updates --> E[DOM]
        end
    end
    style A fill:#f9f,stroke:#333,stroke-width:2px
    style MQ fill:#ff9,stroke:#333,stroke-width:2px
The gRPC-to-MessageQueue Adapter
The core of the backend is a Node.js service responsible for a single task: connect to the Hubble gRPC stream, parse the flow events, and publish them to a RabbitMQ fanout exchange. It has to be resilient to connection drops from either end and configurable without code changes.
Here’s the project structure for this adapter:
/hubble-mq-adapter
├── protos/
│   └── observer.proto      # Hubble API definitions vendored from the Cilium repo (plus the files it imports)
├── src/
│   ├── adapter.js          # Main application logic
│   ├── grpcClient.js       # Hubble gRPC connection handling
│   └── mqPublisher.js      # RabbitMQ connection and publishing
├── test/
│   └── mqPublisher.test.js # Unit tests for the publisher
├── .env.example            # Environment variable template
├── package.json
└── README.md
The package.json
defines our dependencies. We need libraries for gRPC, RabbitMQ, and configuration management.
{
"name": "hubble-mq-adapter",
"version": "1.0.0",
"description": "Streams Cilium Hubble flows to RabbitMQ",
"main": "src/adapter.js",
"scripts": {
"start": "node src/adapter.js",
"test": "jest"
},
"dependencies": {
"@grpc/grpc-js": "^1.8.0",
"@grpc/proto-loader": "^0.7.4",
"amqplib": "^0.10.3",
"dotenv": "^16.0.3"
},
"devDependencies": {
"jest": "^29.3.1"
}
}
Configuration is managed through environment variables, loaded by dotenv. This is crucial for integration with Chef, which will generate the .env file from a template.
.env.example
# Hubble gRPC Server configuration
HUBBLE_RELAY_ADDRESS=localhost:4245
HUBBLE_REQUEST_TIMEOUT_SECONDS=30
# RabbitMQ configuration
RABBITMQ_URL=amqp://guest:guest@localhost:5672
RABBITMQ_EXCHANGE_NAME=hubble_flows
RABBITMQ_EXCHANGE_TYPE=fanout
# Flow filtering options
# Comma-separated list of verdict strings to include (e.g., FORWARDED,DROPPED)
# Leave empty to include all.
VERDICT_WHITELIST=DROPPED
The gRPC client module handles the complexity of connecting to Hubble and managing the stream. A critical aspect here is retry logic. The connection will fail in a production environment, and the service must recover gracefully.
src/grpcClient.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
const PROTO_PATH = path.resolve(__dirname, '../protos/observer.proto');
// Note: You must obtain the .proto files from the Cilium repository.
// For this example, we assume observer.proto and the files it imports
// (e.g. flow/flow.proto) live under a local `protos` directory.
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
  // Allow proto-loader to resolve observer.proto's relative imports.
  includeDirs: [path.resolve(__dirname, '../protos')],
});
const observer = grpc.loadPackageDefinition(packageDefinition).observer;
// A wrapper class to handle connection and streaming logic
class HubbleClient {
constructor(address, onDataCallback, onErrorCallback) {
this.address = address;
this.onData = onDataCallback;
this.onError = onErrorCallback;
this.client = new observer.Observer(this.address, grpc.credentials.createInsecure());
    this.stream = null;
    this.reconnectTimer = null;
    this.reconnectInterval = 5000; // 5 seconds
  }
startStreaming() {
console.log(`[gRPC] Attempting to connect to Hubble at ${this.address}`);
    const request = {
      follow: true,
      // Example of a blacklist filter to reduce noise.
      // In a real system, this would be more sophisticated.
      // FlowFilter fields such as source_pod are repeated, so they take arrays.
      blacklist: [{
        source_pod: ['kube-system/'] // Ignore flows originating in kube-system
      }]
    };
this.stream = this.client.getFlows(request);
this.stream.on('data', (response) => {
// We only care about actual flow data
if (response.flow) {
this.onData(response.flow);
}
});
this.stream.on('error', (err) => {
console.error(`[gRPC] Stream error: ${err.message}`);
this.onError(err);
this.scheduleReconnect();
});
this.stream.on('end', () => {
console.log('[gRPC] Stream ended. Attempting to reconnect.');
this.scheduleReconnect();
});
}
  scheduleReconnect() {
    if (this.stream) {
      this.stream.cancel();
      this.stream = null;
    }
    // 'error' and 'end' may both fire for the same broken stream; avoid stacking timers.
    if (this.reconnectTimer) return;
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.startStreaming();
    }, this.reconnectInterval);
  }
}
module.exports = HubbleClient;
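The reconnect loop only covers failures the client actually notices. A Hubble Relay pod rescheduled behind a Service can leave a half-open TCP connection that never emits an error. One mitigation is to enable gRPC keepalives via channel options when constructing the client. The sketch below shows this as a hypothetical createHubbleClient helper reusing the grpc and observer objects defined above; the interval values are assumptions to tune for your environment.
// Sketch: construct the Observer client with keepalive pings enabled, so that
// silently dropped connections surface as stream errors and trigger a reconnect.
function createHubbleClient(address) {
  const channelOptions = {
    'grpc.keepalive_time_ms': 30000,    // ping the server every 30s while idle
    'grpc.keepalive_timeout_ms': 10000, // declare the connection dead after 10s with no reply
    'grpc.keepalive_permit_without_calls': 1,
  };
  return new observer.Observer(address, grpc.credentials.createInsecure(), channelOptions);
}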
Similarly, the RabbitMQ publisher needs robust error handling. If the message broker is unavailable, it should not crash the adapter but rather attempt to reconnect.
src/mqPublisher.js
const amqp = require('amqplib');
class MqPublisher {
constructor(url, exchangeName, exchangeType) {
this.url = url;
this.exchangeName = exchangeName;
this.exchangeType = exchangeType;
this.connection = null;
this.channel = null;
this.reconnectInterval = 5000;
}
async connect() {
try {
console.log(`[RabbitMQ] Connecting to ${this.url}`);
this.connection = await amqp.connect(this.url);
this.channel = await this.connection.createChannel();
await this.channel.assertExchange(this.exchangeName, this.exchangeType, { durable: false });
console.log('[RabbitMQ] Connection successful. Exchange asserted.');
this.connection.on('error', (err) => {
console.error('[RabbitMQ] Connection error:', err.message);
});
this.connection.on('close', () => {
console.error('[RabbitMQ] Connection closed. Reconnecting...');
this.scheduleReconnect();
});
} catch (err) {
console.error('[RabbitMQ] Failed to connect:', err.message);
this.scheduleReconnect();
}
}
scheduleReconnect() {
this.connection = null;
this.channel = null;
setTimeout(() => this.connect(), this.reconnectInterval);
}
publish(message) {
if (!this.channel) {
console.warn('[RabbitMQ] Channel not available. Message dropped.');
return;
}
try {
// In a fanout exchange, the routing key is ignored.
this.channel.publish(
this.exchangeName,
'',
Buffer.from(JSON.stringify(message))
);
} catch (err) {
console.error('[RabbitMQ] Failed to publish message:', err.message);
}
}
}
module.exports = MqPublisher;
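The publisher above is deliberately fire-and-forget: if a publish is lost during a broker hiccup, a few flow events disappear, which is acceptable for ephemeral telemetry. If that ever becomes unacceptable, amqplib also supports publisher confirms via createConfirmChannel, where the broker acknowledges each message. Below is a minimal sketch of two drop-in methods for MqPublisher using confirms; treat it as an illustration rather than part of this design.
  // Variant using a confirm channel: the broker acks every publish, so
  // failures become visible instead of silently dropped messages.
  async connectWithConfirms() {
    this.connection = await amqp.connect(this.url);
    this.channel = await this.connection.createConfirmChannel();
    await this.channel.assertExchange(this.exchangeName, this.exchangeType, { durable: false });
  }
  publishWithConfirm(message) {
    return new Promise((resolve, reject) => {
      this.channel.publish(
        this.exchangeName,
        '',
        Buffer.from(JSON.stringify(message)),
        {},
        (err) => (err ? reject(err) : resolve())
      );
    });
  }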
Finally, the main adapter file ties them together. It processes and filters the data before publishing. A key lesson learned was that Hubble can produce an overwhelming amount of data. Filtering for specific events (like DROPPED
verdicts) at the source is critical to avoid saturating the message queue and the front-end.
src/adapter.js
require('dotenv').config();
const HubbleClient = require('./grpcClient');
const MqPublisher = require('./mqPublisher');
// --- Configuration ---
const HUBBLE_RELAY_ADDRESS = process.env.HUBBLE_RELAY_ADDRESS;
const RABBITMQ_URL = process.env.RABBITMQ_URL;
const RABBITMQ_EXCHANGE_NAME = process.env.RABBITMQ_EXCHANGE_NAME;
const RABBITMQ_EXCHANGE_TYPE = process.env.RABBITMQ_EXCHANGE_TYPE;
const VERDICT_WHITELIST = (process.env.VERDICT_WHITELIST || '').split(',').filter(Boolean);
if (!HUBBLE_RELAY_ADDRESS || !RABBITMQ_URL) {
console.error("Missing critical environment variables. Exiting.");
process.exit(1);
}
// --- Main Logic ---
const publisher = new MqPublisher(RABBITMQ_URL, RABBITMQ_EXCHANGE_NAME, RABBITMQ_EXCHANGE_TYPE);
const processFlow = (flow) => {
// A common pitfall is to forward the raw, verbose protobuf object.
// We transform it into a leaner JSON payload for the frontend.
  const simplifiedFlow = {
    time: flow.time?.seconds,
    verdict: flow.verdict,
    // Optional chaining is crucial as not all flows have all fields.
    source: `${flow.source?.pod_name} (${flow.IP?.source})`,
    destination: `${flow.destination?.pod_name} (${flow.IP?.destination})`,
    // l4 is a protobuf oneof; unset members come back as null, so skip them.
    l4_protocol: flow.l4 ? Object.keys(flow.l4).find((k) => flow.l4[k]) : undefined, // e.g. 'TCP', 'UDP'
    dest_port: flow.l4?.TCP?.destination_port || flow.l4?.UDP?.destination_port,
    type: flow.Type,
    summary: flow.Summary,
  };
// Apply server-side filtering. This is much more efficient than filtering in the browser.
if (VERDICT_WHITELIST.length > 0 && !VERDICT_WHITELIST.includes(simplifiedFlow.verdict)) {
return;
}
publisher.publish(simplifiedFlow);
};
const handleGrpcError = (err) => {
  // The client schedules its own reconnects; in a production system this
  // should also increment an error metric or trigger an alert.
  console.error('gRPC stream error (reconnect scheduled):', err.message);
};
async function main() {
console.log("Starting Hubble MQ Adapter...");
await publisher.connect();
const hubbleClient = new HubbleClient(HUBBLE_RELAY_ADDRESS, processFlow, handleGrpcError);
hubbleClient.startStreaming();
}
main();
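The project layout above lists test/mqPublisher.test.js, which has not been shown yet. Here is a minimal sketch of what it might contain, mocking amqplib with Jest so no broker is needed; the mock shapes only cover the calls MqPublisher actually makes.
test/mqPublisher.test.js
// Minimal sketch: verify that connect() asserts the exchange and that
// publish() serializes the payload onto the configured fanout exchange.
jest.mock('amqplib');
const amqp = require('amqplib');
const MqPublisher = require('../src/mqPublisher');
test('publishes messages to the configured fanout exchange', async () => {
  const channel = {
    assertExchange: jest.fn().mockResolvedValue({}),
    publish: jest.fn().mockReturnValue(true),
  };
  const connection = {
    createChannel: jest.fn().mockResolvedValue(channel),
    on: jest.fn(),
  };
  amqp.connect.mockResolvedValue(connection);
  const publisher = new MqPublisher('amqp://localhost', 'hubble_flows', 'fanout');
  await publisher.connect();
  publisher.publish({ verdict: 'DROPPED', summary: 'TCP Flags: SYN' });
  expect(channel.assertExchange).toHaveBeenCalledWith('hubble_flows', 'fanout', { durable: false });
  expect(channel.publish).toHaveBeenCalledWith(
    'hubble_flows',
    '',
    Buffer.from(JSON.stringify({ verdict: 'DROPPED', summary: 'TCP Flags: SYN' }))
  );
});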
Deployment with Chef
Integrating this service into our existing Chef-based workflow was mandatory. This meant creating a cookbook to handle the deployment, configuration, and service management.
A simplified Chef recipe to deploy this adapter would look like this. It uses a template for the .env
file, pulling values from Chef attributes. This ensures configuration is managed centrally and is consistent across environments.
cookbooks/hubble_adapter/recipes/default.rb
# Install Node.js and npm
include_recipe 'nodejs'
app_dir = '/opt/hubble-mq-adapter'
app_user = 'hubble'
user app_user do
system true
shell '/bin/false'
home '/nonexistent'
end
directory app_dir do
owner app_user
group app_user
mode '0755'
recursive true
end
# Deploy application code from a source (e.g., git or artifact repository)
git app_dir do
  repository 'https://your-git-repo/hubble-mq-adapter.git'
  revision 'main'
  user app_user
  group app_user
  action :sync
  notifies :run, 'execute[npm install]', :immediately
end
execute 'npm install' do
  command 'npm install --production'
  cwd app_dir
  user app_user
  # npm needs a writable HOME for its cache; the service account has none.
  environment 'HOME' => app_dir
  action :nothing # Only run on notification
end
# Manage configuration via a template
template "#{app_dir}/.env" do
source 'env.erb'
owner app_user
group app_user
mode '0640'
variables(
hubble_address: node['hubble_adapter']['hubble_address'],
rabbitmq_url: node['hubble_adapter']['rabbitmq_url'],
verdict_whitelist: node['hubble_adapter']['verdict_whitelist'].join(',')
)
notifies :restart, 'systemd_unit[hubble-adapter.service]', :delayed
end
# Manage the service with systemd
systemd_unit 'hubble-adapter.service' do
content({
Unit: {
Description: 'Hubble to Message Queue Adapter',
After: 'network.target',
},
Service: {
Type: 'simple',
User: app_user,
WorkingDirectory: app_dir,
ExecStart: '/usr/bin/node src/adapter.js',
Restart: 'on-failure',
},
Install: {
WantedBy: 'multi-user.target',
},
})
action [:create, :enable, :start]
end
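The recipe references an env.erb source for the .env file. A minimal sketch of that template, using the three variables passed by the template resource above and hardcoding the exchange settings the adapter expects:
cookbooks/hubble_adapter/templates/default/env.erb
# Generated by Chef. Do not edit by hand.
HUBBLE_RELAY_ADDRESS=<%= @hubble_address %>
RABBITMQ_URL=<%= @rabbitmq_url %>
RABBITMQ_EXCHANGE_NAME=hubble_flows
RABBITMQ_EXCHANGE_TYPE=fanout
VERDICT_WHITELIST=<%= @verdict_whitelist %>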
This approach, while feeling somewhat archaic in a cloud-native context, provided the bridge we needed between our old and new infrastructure paradigms.
The MobX-Powered Frontend
The frontend’s job is to present a live, scrolling view of the network flows. The primary challenge is performance. A busy cluster can generate hundreds of flow events per second. Updating the DOM that frequently with naive methods would lock up the browser.
This is where MobX shines. We define an observable state object, and our rendering logic automatically subscribes to changes in that state. MobX ensures that only the necessary parts of the UI are re-rendered when data changes.
index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Cilium Real-Time Flow Monitor</title>
<style>
body { font-family: monospace; background-color: #1e1e1e; color: #d4d4d4; }
table { width: 100%; border-collapse: collapse; }
th, td { padding: 4px 8px; text-align: left; border-bottom: 1px solid #444; }
.DROPPED { color: #f48771; }
.FORWARDED { color: #8cc265; }
#status { position: fixed; top: 10px; right: 10px; padding: 5px; background: #555; }
</style>
</head>
<body>
<h1>Live Cilium Network Flows</h1>
<div id="status">Connecting...</div>
<table>
<thead>
<tr>
<th>Timestamp</th>
<th>Verdict</th>
<th>Source</th>
<th>Destination</th>
<th>Port</th>
<th>Summary</th>
</tr>
</thead>
<tbody id="flow-container"></tbody>
</table>
<!-- Dependencies -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/mobx/6.7.0/mobx.umd.production.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script src="app.js"></script>
</body>
</html>
The JavaScript application consists of a MobX store and a UI rendering part that reacts to store changes.
app.js
// --- MobX State Store ---
const flowStore = mobx.observable({
flows: [],
maxFlows: 200, // Keep the list from growing indefinitely
connectionStatus: 'Connecting',
addFlow(flow) {
this.flows.unshift(flow); // Add to the top
if (this.flows.length > this.maxFlows) {
this.flows.pop(); // Remove from the bottom
}
},
setStatus(status) {
this.connectionStatus = status;
}
});
// --- UI Rendering ---
const flowContainer = document.getElementById('flow-container');
const statusDiv = document.getElementById('status');
// This autorun function is the magic of MobX.
// It runs once initially, and then re-runs automatically
// whenever any observable data it reads (flowStore.flows) changes.
// MobX handles the dependency tracking and efficient re-rendering.
mobx.autorun(() => {
// A common mistake is to re-render the entire table, which is slow.
// A more sophisticated implementation would use keys to patch the DOM,
// but for this example, regenerating the content is simple and clear.
const rowsHtml = flowStore.flows.map(flow => `
<tr class="verdict-${flow.verdict}">
<td>${new Date(flow.time * 1000).toLocaleTimeString()}</td>
<td class="${flow.verdict}">${flow.verdict}</td>
<td>${flow.source}</td>
<td>${flow.destination}</td>
<td>${flow.dest_port}</td>
<td>${flow.summary}</td>
</tr>
`).join('');
flowContainer.innerHTML = rowsHtml;
});
mobx.autorun(() => {
statusDiv.textContent = `Status: ${flowStore.connectionStatus}`;
statusDiv.style.color = flowStore.connectionStatus === 'Connected' ? '#8cc265' : '#f48771';
});
// --- WebSocket/STOMP Connection ---
function connectToMq() {
// Assumes RabbitMQ has the Web-STOMP plugin enabled
const client = Stomp.client('ws://localhost:15674/ws');
const onConnect = () => {
mobx.runInAction(() => flowStore.setStatus('Connected'));
client.subscribe('/exchange/hubble_flows', (message) => {
try {
const flow = JSON.parse(message.body);
// mobx.runInAction is used to batch state changes
mobx.runInAction(() => {
flowStore.addFlow(flow);
});
} catch (e) {
console.error("Failed to parse message", e);
}
});
};
const onError = (error) => {
mobx.runInAction(() => flowStore.setStatus(`Error: ${error.body || 'Connection failed'}`));
console.error('STOMP Error:', error);
// Attempt to reconnect after a delay
setTimeout(connectToMq, 5000);
};
client.connect('guest', 'guest', onConnect, onError, '/');
}
// Initial connection
connectToMq();
With this setup, the browser remains responsive even with a high rate of incoming messages. The autorun function re-renders the table whenever flowStore.flows is modified; MobX tracks that dependency automatically and re-runs the reaction only when the data it reads actually changes, while capping the list at 200 entries keeps each render cheap. The state logic is completely decoupled from the rendering logic, which is a massive win for maintainability.
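If the flow rate ever outgrows this approach, autorun also accepts a delay option that throttles the effect, so bursts of changes result in fewer re-renders. A small sketch, reusing flowStore and flowContainer from app.js; the 100 ms interval and the shortened row template are arbitrary choices for illustration.
// Re-render at most every 100 ms, no matter how fast flows arrive.
// MobX still tracks flowStore.flows; it simply delays running the effect.
mobx.autorun(() => {
  flowContainer.innerHTML = flowStore.flows
    .map(flow => `<tr><td class="${flow.verdict}">${flow.verdict}</td><td>${flow.summary}</td></tr>`)
    .join('');
}, { delay: 100 });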
Limitations and Future Paths
This architecture, while effective, is a product of its constraints. Using Chef to manage a cloud-native component’s adapter feels like a temporary bridge. The natural evolution is to package the adapter into a container, deploy it to the Kubernetes cluster, and manage its configuration via a ConfigMap or a custom operator. This would fully align its lifecycle with the component it’s observing.
The front-end is purely for real-time visualization; it has no memory of past events. A logical next step would be another consumer on the message queue, a data sink service, that either aggregates the flows into Prometheus metrics or writes them into a time-series store like VictoriaMetrics. This would unlock historical analysis, alerting, and more sophisticated dashboarding with tools like Grafana.
Finally, the current data flow is unidirectional. The true power of such a system would be to make it bidirectional, allowing operators to send commands back through the queue—for instance, to temporarily create a CiliumNetworkPolicy to isolate a problematic pod—turning this observability tool into a rudimentary control plane. That, however, opens up a new set of challenges around security, authorization, and idempotency.