The problem was fragmentation. Our mobile engineering velocity had increased, but our ability to diagnose regressions was stuck in the past. A 2% drop in app startup time would trigger a multi-day investigation involving three teams, each with their own dashboards and data silos. The mobile team would point to the CI build logs showing artifact size growth. The frontend team, responsible for the JavaScript bundles loaded by the app, would look at their Webpack build stats in isolation. The backend team would scrutinize their API latency graphs. There was no single source of truth, no thread connecting a specific commit to its impact across the entire stack.
We needed a correlation ID, but not for a single user request. We needed a correlation ID for an entire release artifact. The initial concept was to use the CI pipeline’s unique identifier—our CI_PIPELINE_IID—as this unifying key. This ID would be injected into every piece of telemetry generated during the build and subsequent runtime of that specific app version. This would allow us to pivot on a single ID in a centralized datastore and see the CI build duration, the resulting Webpack bundle size, the mobile artifact size, and even backend API performance metrics tied to that specific client version.
Our initial architecture discussions revolved around sending this data directly to our observability platform. This was a non-starter. Our CI system runs hundreds of jobs concurrently, and a high-frequency release schedule meant we’d be hammering the analytics backend with spiky, unpredictable write loads. This is a classic use case for a durable, high-throughput buffer. We chose Redis Streams over a heavier message queue like Kafka. For this specific telemetry pipeline, Redis offered lower operational overhead, sufficient persistence guarantees, and the consumer group functionality we needed to build a resilient ingestion service.
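To make that concrete, producing into the buffer is a single XADD against the stream. Here is a minimal Go sketch using the go-redis client (the same library our consumer uses later); the Redis address and the correlation ID are placeholder values for illustration only.
// producer_sketch.go
// A minimal sketch of appending one telemetry event to the Redis Stream.
// Producers return as soon as XADD is acknowledged, so spiky CI load never
// hits the analytics backend directly.
package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // placeholder address

	event := map[string]interface{}{
		"type":           "ci_build_metadata",
		"correlation_id": "mobile-build-4242", // hypothetical CI_PIPELINE_IID-derived ID
		"metrics":        map[string]interface{}{"duration_seconds": 612},
	}
	payload, err := json.Marshal(event)
	if err != nil {
		log.Fatal(err)
	}

	// XADD with "*" lets Redis assign the entry ID; the entry then sits in the
	// stream until a consumer group member reads and acknowledges it.
	id, err := rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: "stream:telemetry:mobile-ci",
		Values: map[string]interface{}{"payload": payload},
	}).Result()
	if err != nil {
		log.Fatalf("XADD failed: %v", err)
	}
	log.Printf("queued telemetry entry %s", id)
}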
The final piece was the platform. We needed a flexible search and visualization engine. OpenSearch was the logical choice for its powerful query DSL, dashboarding capabilities, and open-source nature. The entire pipeline would run on our existing Kubernetes platform, which meant we had to consider the security and observability of the pipeline itself. This is where Cilium came in. We would use its eBPF-powered network policies to lock down communication between the pipeline components and use Hubble to gain visibility into the network traffic flow, ensuring the observability system itself was observable and secure.
Here is the high-level data flow we settled on:
graph TD
    subgraph "GitLab CI Runner"
        A[Mobile Build Job] -->|Generates build_metadata| B
        C[Webpack Build Job] -->|Generates bundle_stats| B
    end
    A -- Injects CI_PIPELINE_IID --> C
    B(Redis Stream: telemetry-events)
    subgraph "Kubernetes Cluster (Cilium CNI)"
        D[Consumer Service] -- XREADGROUP --> B
        D -- CiliumNetworkPolicy --> E
        E[OpenSearch Cluster]
        D -- Bulk Index --> E
    end
    F[OpenSearch Dashboards] -->|Queries| E
    style A fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#ccf,stroke:#333,stroke-width:2px
Phase 1: Instrumenting the CI Pipeline
The first step was to collect metadata directly from the source: our GitLab CI pipeline. We needed build durations, artifact sizes, and job statuses, all tagged with the unique pipeline ID. We added a separate telem-collector stage that runs after all build and test jobs.
This job uses a simple shell script and redis-cli to push a structured JSON payload into our Redis Stream. A common mistake is to try to manage complex JSON payloads directly in shell scripts. Instead, we use jq to construct the payload safely and pipe it to redis-cli.
Here’s the relevant section of our .gitlab-ci.yml:
# .gitlab-ci.yml
variables:
# The correlation key for the entire pipeline
BUILD_CORRELATION_ID: "mobile-build-${CI_PIPELINE_IID}"
REDIS_STREAM_KEY: "stream:telemetry:mobile-ci"
# ... other stages like build, test, package ...
publish_build_telemetry:
stage: telem-collector
image: alpine:3.18
script:
- apk add --no-cache redis jq coreutils # GNU date (coreutils) parses the ISO 8601 timestamps below
# NOTE: In a real-world project, REDIS_HOST and REDIS_PASSWORD
# would be configured as protected CI/CD variables.
- |
echo "Publishing build telemetry for ${BUILD_CORRELATION_ID}..."
# GitLab exposes CI_PIPELINE_CREATED_AT and CI_JOB_STARTED_AT as ISO 8601
# timestamps, so convert them to epoch seconds before doing shell arithmetic.
# (For precise per-job timings you would query the GitLab API instead.)
PIPELINE_CREATED_EPOCH=$(date -d "$CI_PIPELINE_CREATED_AT" +%s)
JOB_STARTED_EPOCH=$(date -d "$CI_JOB_STARTED_AT" +%s)
BUILD_DURATION_SECONDS=$((JOB_STARTED_EPOCH - PIPELINE_CREATED_EPOCH))
# Construct the JSON payload using jq
JSON_PAYLOAD=$(jq -n \
--arg id "$BUILD_CORRELATION_ID" \
--arg project "$CI_PROJECT_PATH" \
--arg ref "$CI_COMMIT_REF_NAME" \
--arg commit "$CI_COMMIT_SHORT_SHA" \
--arg status "$CI_JOB_STATUS" \
--argjson duration "$BUILD_DURATION_SECONDS" \
--arg type "ci_build_metadata" \
'{
"timestamp": (now | todate),
"type": $type,
"correlation_id": $id,
"project": $project,
"git_ref": $ref,
"commit_sha": $commit,
"job_status": $status,
"metrics": {
"duration_seconds": $duration
}
}')
# Push to Redis Stream using XADD
# The '*' tells Redis to generate a new entry ID automatically.
redis-cli -h $REDIS_HOST -a $REDIS_PASSWORD XADD $REDIS_STREAM_KEY '*' payload "$JSON_PAYLOAD"
echo "Telemetry published successfully."
when: always # Ensure this job runs even if previous stages fail
The when: always clause is critical. We want telemetry even for failed builds, as analyzing patterns in build failures is a key part of improving engineering efficiency.
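Although the consumer in Phase 3 treats each payload as an untyped JSON document, it helps to see the envelope every producer agrees on. The Go type below is purely illustrative (no such struct exists in the pipeline code); it simply captures the fields emitted by the CI job above and by the Webpack plugin in the next phase, all keyed by correlation_id.
// telemetry_envelope.go
// Illustrative only: a typed view of the JSON envelope shared by the CI job
// and the Webpack plugin. The real consumer indexes untyped maps.
package telemetry

import "time"

type Envelope struct {
	Timestamp     time.Time              `json:"timestamp"`
	Type          string                 `json:"type"`           // "ci_build_metadata" or "webpack_build_stats"
	CorrelationID string                 `json:"correlation_id"` // "mobile-build-${CI_PIPELINE_IID}"
	Project       string                 `json:"project,omitempty"`
	GitRef        string                 `json:"git_ref,omitempty"`
	CommitSHA     string                 `json:"commit_sha,omitempty"`
	JobStatus     string                 `json:"job_status,omitempty"`
	Metrics       map[string]float64     `json:"metrics"`
	Details       map[string]interface{} `json:"details,omitempty"`
}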
Phase 2: A Custom Webpack Plugin for Deep Bundle Analysis
Getting CI metadata was the easy part. The real challenge was extracting detailed, structured data from our Webpack build process. We have a complex monorepo where the mobile app’s embedded webviews are built alongside our main web app. We needed to know not just the total bundle size, but the size of individual chunks, the number of modules, and the contribution of third-party libraries for each specific build.
A standard Webpack build log is unstructured and difficult to parse. We decided to write a custom Webpack plugin to emit structured JSON telemetry.
The plugin hooks into Webpack’s compilation lifecycle. The done hook is the perfect place to tap in, as it provides access to the final stats object after the compilation is complete. This object contains a wealth of information that we can cherry-pick.
Here’s the code for TelemetryPlugin.js:
// build/webpack/TelemetryPlugin.js
const Redis = require('ioredis');
class WebpackTelemetryPlugin {
constructor(options) {
// A pragmatic approach: Fail fast if critical config is missing.
if (!options || !options.correlationId) {
throw new Error('WebpackTelemetryPlugin: correlationId is required.');
}
if (!options.redisStreamKey) {
throw new Error('WebpackTelemetryPlugin: redisStreamKey is required.');
}
this.correlationId = options.correlationId;
this.redisStreamKey = options.redisStreamKey;
// In a production environment, connection details should come from a secure source.
// For simplicity, we use environment variables here.
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT, 10) || 6379,
password: process.env.REDIS_PASSWORD,
lazyConnect: true, // Don't connect until we need to
maxRetriesPerRequest: 3,
showFriendlyErrorStack: process.env.NODE_ENV !== 'production',
});
}
apply(compiler) {
compiler.hooks.done.tapPromise('WebpackTelemetryPlugin', async (stats) => {
// Don't generate telemetry for failed builds from this plugin;
// the CI pipeline will report the failure.
if (stats.hasErrors()) {
console.warn('WebpackTelemetryPlugin: Build has errors. Skipping telemetry.');
return;
}
console.log(`WebpackTelemetryPlugin: Compiling telemetry for ${this.correlationId}...`);
const statsJson = stats.toJson({
assets: true,
chunks: true,
modules: false, // Module list can be huge, we only need counts.
});
try {
await this.redisClient.connect();
const payload = this.constructPayload(statsJson);
// XADD stream-key * field value [field value ...]
await this.redisClient.xadd(
this.redisStreamKey,
'*',
'payload',
JSON.stringify(payload)
);
console.log('WebpackTelemetryPlugin: Telemetry successfully sent to Redis.');
} catch (error) {
// This is a critical point. Telemetry failure should not fail the build.
// We log the error but allow the CI job to succeed.
console.error('WebpackTelemetryPlugin: Failed to send telemetry to Redis.', error);
} finally {
// Ensure we disconnect to allow the process to exit cleanly.
if (this.redisClient.status === 'ready') {
this.redisClient.disconnect();
}
}
});
}
constructPayload(statsJson) {
// A key part of production-grade code is transforming raw data into a clean,
// well-structured format for the downstream consumer.
const totalAssetSize = statsJson.assets.reduce((sum, asset) => sum + asset.size, 0);
const largestAssets = [...statsJson.assets] // copy before sorting to avoid mutating the stats object
.sort((a, b) => b.size - a.size)
.slice(0, 5)
.map(asset => ({ name: asset.name, size_bytes: asset.size }));
const chunkMetrics = statsJson.chunks.map(chunk => ({
id: chunk.id,
name: chunk.names.join(', ') || 'unnamed',
size_bytes: chunk.size,
modules: chunk.modules ? chunk.modules.length : 0,
initial: chunk.initial,
}));
const payload = {
timestamp: new Date().toISOString(),
type: 'webpack_build_stats',
correlation_id: this.correlationId,
metrics: {
compile_time_ms: statsJson.time,
total_asset_size_bytes: totalAssetSize,
asset_count: statsJson.assets.length,
chunk_count: statsJson.chunks.length,
},
details: {
largest_assets: largestAssets,
chunks: chunkMetrics,
}
};
return payload;
}
}
module.exports = WebpackTelemetryPlugin;
And here is how we integrate it into our webpack.config.js:
// webpack.config.js
const WebpackTelemetryPlugin = require('./build/webpack/TelemetryPlugin');
// The correlation ID is passed from the CI environment variable we defined earlier.
const buildCorrelationId = process.env.BUILD_CORRELATION_ID;
const redisStreamKey = process.env.REDIS_STREAM_KEY;
module.exports = {
// ... other webpack configuration ...
plugins: [
// ... other plugins ...
// The plugin is only added if the correlation ID is present.
// This prevents it from running during local development builds.
buildCorrelationId && new WebpackTelemetryPlugin({
correlationId: buildCorrelationId,
redisStreamKey: redisStreamKey
}),
].filter(Boolean), // .filter(Boolean) is a clean way to remove falsy values from the array.
};
This plugin is now a core part of our build process. It reliably sends detailed bundle analysis to the same Redis Stream as our general CI metadata, all linked by the BUILD_CORRELATION_ID. The error handling is pragmatic: a failure to send telemetry should never block a critical release build.
Phase 3: The Redis Stream Consumer and OpenSearch Ingestor
With data flowing into Redis, we needed a service to consume it, batch it, and index it into OpenSearch. This consumer is a long-running service deployed in our Kubernetes cluster. We chose Go for its performance and robust concurrency model, which is well-suited for a high-throughput I/O-bound task like this.
The core logic uses Redis consumer groups. This pattern is powerful for building resilient consumers. If one instance of our consumer service dies, another can pick up pending messages from where the last one left off, preventing data loss.
Here is a simplified but functional version of the consumer’s core processing loop:
// internal/consumer/processor.go
package consumer
import (
"context"
"encoding/json"
"log"
"time"
"github.com/go-redis/redis/v8"
"github.com/olivere/elastic/v7" // OpenSearch is compatible with the Elasticsearch v7 client
)
const (
streamKey = "stream:telemetry:mobile-ci"
consumerGroup = "opensearch-ingestor-group"
consumerName = "ingestor-1" // In production, this would be the pod name
batchSize = 100
batchTimeout = 5 * time.Second
)
type Processor struct {
redisClient *redis.Client
osClient *elastic.Client
}
func NewProcessor(redisAddr, osAddr string) (*Processor, error) {
// ... client initialization code omitted for brevity ...
// It would include robust connection logic and health checks.
// A critical step: ensure the consumer group exists.
// The '$' means only read new messages arriving after group creation.
_, err := rdb.XGroupCreateMkStream(context.Background(), streamKey, consumerGroup, "$").Result()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
log.Fatalf("Failed to create consumer group: %v", err)
}
return &Processor{redisClient: rdb, osClient: esc}, nil
}
func (p *Processor) Run(ctx context.Context) {
log.Println("Starting Redis Stream consumer...")
for {
select {
case <-ctx.Done():
log.Println("Shutting down consumer...")
return
default:
p.processMessages(ctx)
}
}
}
func (p *Processor) processMessages(ctx context.Context) {
// XReadGroup reads from the stream as part of a consumer group.
// 'Block' makes it wait for messages if the stream is empty.
streams, err := p.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: consumerGroup,
Consumer: consumerName,
Streams: []string{streamKey, ">"}, // ">" means read new, unread messages
Count: batchSize,
Block: batchTimeout,
}).Result()
if err != nil && err != redis.Nil {
log.Printf("Error reading from stream: %v", err)
time.Sleep(1 * time.Second) // Backoff on error
return
}
if len(streams) == 0 || len(streams[0].Messages) == 0 {
return // No new messages
}
messages := streams[0].Messages
bulkRequest := p.osClient.Bulk()
messageIDs := make([]string, 0, len(messages))
for _, msg := range messages {
payload, ok := msg.Values["payload"].(string)
if !ok {
log.Printf("Skipping message %s: payload is not a string", msg.ID)
continue
}
// Important: We must validate the JSON. Never trust upstream data.
var data map[string]interface{}
if err := json.Unmarshal([]byte(payload), &data); err != nil {
log.Printf("Skipping message %s: invalid JSON: %v", msg.ID, err)
// In a real system, this would go to a dead-letter queue.
continue
}
// Dynamically determine the index name, e.g., based on date
indexName := "telemetry-mobile-ci-" + time.Now().UTC().Format("2006.01.02")
// Create a bulk index request for OpenSearch
req := elastic.NewBulkIndexRequest().Index(indexName).Doc(data)
bulkRequest.Add(req)
messageIDs = append(messageIDs, msg.ID)
}
if bulkRequest.NumberOfActions() == 0 {
return
}
// Execute the bulk request to OpenSearch
bulkResponse, err := bulkRequest.Do(ctx)
if err != nil {
log.Printf("Failed to bulk index documents: %v", err)
// Don't ACK messages on failure, they will be re-delivered.
return
}
if bulkResponse.Errors {
log.Printf("Bulk indexing had errors. Succeeded: %d, Failed: %d",
len(bulkResponse.Succeeded()), len(bulkResponse.Failed()))
// More sophisticated logic would handle partial failures.
}
// Acknowledge the messages in Redis so they are not re-delivered.
// This gives us at-least-once delivery: a crash after indexing but before
// XACK means the batch is re-processed, so duplicates are possible unless
// documents are indexed with deterministic IDs.
if err := p.redisClient.XAck(ctx, streamKey, consumerGroup, messageIDs...).Err(); err != nil {
log.Printf("Failed to ACK messages: %v", err)
}
log.Printf("Successfully processed and ACKed %d messages.", len(messageIDs))
}
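For completeness, the client initialization elided from NewProcessor could look roughly like the following. The option names are real go-redis v8 and olivere/elastic v7 options, but disabling sniffing and the built-in health check is our assumption for reaching OpenSearch through a Kubernetes Service, not something the pipeline dictates.
// internal/consumer/clients.go (sketch of the setup omitted above)
package consumer

import (
	"context"
	"fmt"
	"os"

	"github.com/go-redis/redis/v8"
	"github.com/olivere/elastic/v7"
)

func newClients(redisAddr, osAddr string) (*redis.Client, *elastic.Client, error) {
	rdb := redis.NewClient(&redis.Options{
		Addr:     redisAddr, // e.g. "redis-master.database:6379"
		Password: os.Getenv("REDIS_PASSWORD"),
	})
	if err := rdb.Ping(context.Background()).Err(); err != nil {
		return nil, nil, fmt.Errorf("redis ping failed: %w", err)
	}

	esc, err := elastic.NewClient(
		elastic.SetURL(osAddr),        // e.g. "http://opensearch-data.observability:9200"
		elastic.SetSniff(false),       // sniffed node IPs are unreachable through the Service
		elastic.SetHealthcheck(false), // rely on the pod's readiness probe instead
	)
	if err != nil {
		return nil, nil, fmt.Errorf("opensearch client init failed: %w", err)
	}
	return rdb, esc, nil
}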
This consumer is the heart of the pipeline’s reliability. The combination of XReadGroup and XAck ensures that if the consumer pod crashes after reading messages but before successfully indexing them, those messages remain in the consumer group’s pending entries list, where another consumer instance can claim and re-process them once they have been idle long enough (for example with XAUTOCLAIM).
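A sketch of what that claim-and-retry step could look like in the same internal/consumer package, using go-redis's XAutoClaim (which requires Redis 6.2 or newer). The five-minute idle threshold is an arbitrary assumption, and this helper is not part of the consumer shown above.
// reclaimPending claims messages that another consumer read but never ACKed,
// e.g. because it crashed mid-batch. Sketch only; in practice the reclaimed
// messages would be re-indexed and XACKed exactly as in processMessages.
func (p *Processor) reclaimPending(ctx context.Context) {
	start := "0-0"
	for {
		msgs, next, err := p.redisClient.XAutoClaim(ctx, &redis.XAutoClaimArgs{
			Stream:   streamKey,
			Group:    consumerGroup,
			Consumer: consumerName,
			MinIdle:  5 * time.Minute, // only steal messages idle at least this long
			Start:    start,
			Count:    batchSize,
		}).Result()
		if err != nil {
			log.Printf("XAUTOCLAIM failed: %v", err)
			return
		}
		if len(msgs) > 0 {
			log.Printf("reclaimed %d pending messages", len(msgs))
		}
		if next == "0-0" {
			return // finished scanning the pending entries list
		}
		start = next
	}
}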
Phase 4: Securing the Pipeline with Cilium
Our observability pipeline is now a critical piece of infrastructure. Its components—the CI runners, Redis, and OpenSearch—are all running in Kubernetes. Unrestricted network access between these components is an unnecessary security risk.
We use Cilium to enforce a zero-trust network policy. The principle is simple: deny all traffic by default and explicitly allow only the specific communication paths required for the pipeline to function.
First, we need a policy that allows our CI runner pods (which we label with app=gitlab-runner) to talk to our Redis primary instance (labeled app=redis,role=master) on the standard Redis port.
# k8s/policies/allow-runner-to-redis.yaml
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: "allow-runner-to-redis"
  namespace: ci-cd
spec:
  endpointSelector:
    matchLabels:
      app: gitlab-runner
  egress:
    - toEndpoints:
        - matchLabels:
            app: redis
            role: master
            # Select the Redis pods in the 'database' namespace
            k8s:io.kubernetes.pod.namespace: database
      toPorts:
        - ports:
            - port: "6379"
              protocol: TCP
Next, we need a policy for our Go consumer service (labeled app=telemetry-consumer) to connect to Redis for reading the stream and to OpenSearch for writing the data.
# k8s/policies/allow-consumer-dependencies.yaml
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: "allow-consumer-dependencies"
  namespace: observability
spec:
  endpointSelector:
    matchLabels:
      app: telemetry-consumer
  egress:
    # Allow traffic to Redis
    - toEndpoints:
        - matchLabels:
            app: redis
            role: master
            k8s:io.kubernetes.pod.namespace: database
      toPorts:
        - ports:
            - port: "6379"
              protocol: TCP
    # Allow traffic to OpenSearch
    - toEndpoints:
        - matchLabels:
            app: opensearch
            role: data
            k8s:io.kubernetes.pod.namespace: observability
      toPorts:
        - ports:
            - port: "9200"
              protocol: TCP
These policies, enforced at the kernel level by eBPF, ensure that even if a container in the CI runner pod were compromised, it could not reach the OpenSearch cluster directly or any other service in the cluster. It can only talk to Redis.
Furthermore, we can use Cilium’s Hubble CLI to observe the network flows in real time, verify that our policies are working as intended, and debug connectivity issues without needing to exec into pods.
# Observe traffic from the telemetry consumer
$ hubble observe --namespace observability --from-label app=telemetry-consumer -f
# Sample output showing allowed traffic
Oct 27 11:45:01.234 [ether] observability/telemetry-consumer-78f... (ID:1234) -> database/redis-master-0 (ID:5678) TCP 10.0.1.56:45212 -> 10.0.2.10:6379 FORWARDED
Oct 27 11:45:01.890 [ether] observability/telemetry-consumer-78f... (ID:1234) -> observability/opensearch-data-0 (ID:9012) TCP 10.0.1.56:45218 -> 10.0.1.99:9200 FORWARDED
Putting It All Together: The Unified Dashboard
With data flowing and the pipeline secured, the final step was creating a dashboard in OpenSearch. This dashboard has a single filter at the top: correlation_id. When an engineer investigates a performance issue in a new release, they paste the build’s correlation ID (derived from its CI_PIPELINE_IID) into this filter.
Instantly, they see a set of visualizations:
- A timeseries graph showing Webpack chunk sizes for this build compared to the last 10 builds.
- A data table listing the largest assets in the bundle.
- A metric visualization showing the CI build duration.
- If integrated with backend logs, another panel would show API error rates and latencies for clients reporting this specific build version.
This unified view broke down the data silos. A spike in mobile app startup time was directly correlated with a 300KB increase in the main JavaScript chunk size and a longer CI build duration, all from a single query. The “blame game” was replaced by data-driven analysis, shortening the feedback loop from days to minutes.
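That “single query” is, at its core, just a term filter on the correlation ID. Here is a rough sketch of the equivalent lookup using the same olivere/elastic client as the consumer; the correlation_id.keyword field name assumes OpenSearch’s default dynamic mapping for strings, and the ID and URL are placeholders.
// query_sketch.go
// A rough sketch of pivoting on one correlation ID across all telemetry
// documents. Assumes default dynamic mapping (hence ".keyword") and the
// daily index naming used by the consumer.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/olivere/elastic/v7"
)

func main() {
	ctx := context.Background()
	client, err := elastic.NewClient(
		elastic.SetURL("http://localhost:9200"), // placeholder: e.g. a port-forwarded OpenSearch
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatal(err)
	}

	// CI metadata, Webpack bundle stats, and any future runtime telemetry
	// all share this one ID.
	q := elastic.NewTermQuery("correlation_id.keyword", "mobile-build-4242") // hypothetical ID

	res, err := client.Search().
		Index("telemetry-mobile-ci-*").
		Query(q).
		Sort("timestamp", true).
		Size(50).
		Do(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for _, hit := range res.Hits.Hits {
		fmt.Printf("%s\n", hit.Source) // one raw JSON document per telemetry event
	}
}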
The current implementation is a massive step forward, but it’s not the end of the road. The telemetry is still limited to the build-time process. The next logical evolution is to propagate this correlation_id into the mobile application binary itself. The app would then include this ID in the headers of every API request it makes. This would allow us to link backend traces and real-user monitoring (RUM) data directly to the specific build artifact that generated them, completing the observability loop from commit to production runtime. Furthermore, our single-instance Go consumer is a potential bottleneck; we will need to implement leader election or a sharded processing model to scale it horizontally as our build volume continues to grow.
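To make that future direction concrete, here is a hypothetical sketch of the backend half of the loop: an HTTP middleware that lifts a build correlation ID from a request header into the request context, where logging or tracing hooks can pick it up. The X-Build-Correlation-Id header name is an assumption; nothing in the current pipeline defines it yet.
// middleware/build_correlation.go (hypothetical future work, not yet built)
package middleware

import (
	"context"
	"net/http"
)

type ctxKey struct{}

// WithBuildCorrelation copies the build correlation ID sent by the mobile app
// into the request context so downstream logging/tracing can emit it and the
// resulting backend telemetry joins the same OpenSearch pivot.
func WithBuildCorrelation(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if id := r.Header.Get("X-Build-Correlation-Id"); id != "" { // assumed header name
			r = r.WithContext(context.WithValue(r.Context(), ctxKey{}, id))
		}
		next.ServeHTTP(w, r)
	})
}

// BuildCorrelationFrom retrieves the ID further down the stack.
func BuildCorrelationFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(ctxKey{}).(string)
	return id, ok
}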