Building a Non-Blocking Telemetry Pipeline for a Go API Gateway Using Celery for Asynchronous Processing


Telemetry collection for our primary API gateway had become the single largest source of request latency. The existing middleware, which synchronously wrote log entries and metrics to a remote service on every request, was adding over 80ms of p99 latency. Under heavy load, the synchronous call would frequently time out, causing cascading failures. Sampling was ruled out early for business reasons: billing and security auditing require 100% fidelity. The core problem was clear: the critical request path was being polluted by a non-critical, blocking I/O operation.

Our first principle for the redesign was absolute decoupling. The gateway’s data plane should do one thing: proxy requests. Telemetry capture must be a “fire-and-forget” operation with a minimal, constant-time performance footprint on the request itself. The actual processing, aggregation, and storage of this telemetry data had to occur completely out-of-band. This led us to a hybrid architecture. For the gateway’s performance-critical data plane, Go was the non-negotiable choice. For the out-of-band processing, we could leverage our organization’s extensive and mature Python ecosystem, particularly our existing Celery infrastructure which was already processing millions of background tasks daily.

The final technology selection was driven by this pragmatic constraint.

  1. API Gateway Core: A custom Go application. We used the Echo framework for the control plane API (managing routes, consumers, etc.) due to its speed and simplicity. However, the actual request proxying is handled by a barebones net/http/httputil.ReverseProxy wrapped in custom middleware. This avoids framework overhead on the hot path.
  2. Asynchronous Message Bus: Redis. While Kafka offers stronger delivery guarantees, our existing Celery farm is built on Redis. In a real-world project, leveraging battle-tested operational knowledge and tooling often outweighs the marginal benefits of a “technically superior” but unfamiliar system. The risk of telemetry data loss during a Redis outage was deemed acceptable.
  3. Asynchronous Processing: Python with Celery. Python’s rich data manipulation libraries and ecosystem make it perfect for the telemetry transformation logic. Celery provides the robust framework for distributed task queuing we already know how to scale and monitor.
  4. Data Storage: A time-series database (TSDB), specifically VictoriaMetrics. We anticipated high-cardinality labels (e.g., user_id, api_key_id, source_ip), which is a known weakness for some Prometheus-like systems. VictoriaMetrics is designed to handle this gracefully.
  5. Management Dashboard: A simple React frontend whose CSS is managed by a PostCSS pipeline. This allowed our front-end team to use their standard tooling (Tailwind CSS, Autoprefixer) to quickly build a consistent UI for visualizing the TSDB data and interacting with the gateway’s control plane API.

The architecture looks like this:

sequenceDiagram
    participant Client
    participant Gateway as Gateway (Go)
    participant Redis
    participant Worker as Celery Worker (Python)
    participant TSDB as Time Series DB

    Client->>+Gateway: API Request (e.g., GET /v1/data)
    Gateway->>Gateway: Telemetry Middleware intercepts
    Note right of Gateway: Captures req/res data (path, status, latency)
    Gateway-->>Redis: LPUSH gateway_telemetry '{"...json..."}'
    Gateway-->>-Client: API Response

    loop Batch Processing
        Worker->>Redis: RPOP (batched) to fetch telemetry data
        Note right of Worker: Deserialize JSON, transform to metrics
        Worker->>TSDB: Batch write metrics
    end

Gateway Telemetry Middleware Implementation

The heart of the low-latency solution is the Go middleware. It cannot block. It cannot wait for a network ACK. Its job is to capture the necessary data, serialize it, and hand it off to a background goroutine for delivery. A buffered channel is used to absorb bursts and prevent the delivery goroutine from blocking the request goroutine if Redis is slow.

Here is the core middleware. It’s designed to be wrapped around the httputil.ReverseProxy.

package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

// TelemetryData holds all the information we want to capture per request.
type TelemetryData struct {
	Timestamp     time.Time `json:"timestamp"`
	Latency       int64     `json:"latency_ms"`
	RequestMethod string    `json:"request_method"`
	RequestPath   string    `json:"request_path"`
	ResponseCode  int       `json:"response_code"`
	ClientIP      string    `json:"client_ip"`
	APIKeyID      string    `json:"api_key_id"` // Example of a high-cardinality label
	UserID        string    `json:"user_id"`      // Another high-cardinality label
}

// TelemetryCollector is responsible for collecting and dispatching telemetry.
type TelemetryCollector struct {
	redisClient *redis.Client
	redisQueue  string
	dataChannel chan TelemetryData
	wg          sync.WaitGroup
	ctx         context.Context
	cancel      context.CancelFunc
}

// NewTelemetryCollector creates and starts a collector.
// It uses a buffered channel to avoid blocking requests.
// A buffer size of 10000 is a starting point; this needs tuning based on traffic.
func NewTelemetryCollector(redisAddr, redisQueue string) (*TelemetryCollector, error) {
	rdb := redis.NewClient(&redis.Options{
		Addr:     redisAddr,
		PoolSize: 20, // A reasonably sized pool for the background worker.
	})

	if _, err := rdb.Ping(context.Background()).Result(); err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())

	collector := &TelemetryCollector{
		redisClient: rdb,
		redisQueue:  redisQueue,
		dataChannel: make(chan TelemetryData, 10000),
		ctx:         ctx,
		cancel:      cancel,
	}

	collector.wg.Add(1)
	go collector.worker() // Start the background worker

	return collector, nil
}

// Middleware returns the http.Handler middleware.
func (tc *TelemetryCollector) Middleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()

		// Use a custom response writer to capture the status code.
		// Default it to 200: handlers that never call WriteHeader send an implicit 200 OK.
		res := &responseObserver{ResponseWriter: w, statusCode: http.StatusOK}

		next.ServeHTTP(res, r)

		latency := time.Since(start).Milliseconds()

		// Example of extracting high-cardinality data from headers/context
		// In a real project, this would come from an auth middleware.
		apiKey := r.Header.Get("X-API-Key")
		userID := r.Header.Get("X-User-ID")

		data := TelemetryData{
			Timestamp:     start.UTC(),
			Latency:       latency,
			RequestMethod: r.Method,
			RequestPath:   r.URL.Path,
			ResponseCode:  res.statusCode,
			ClientIP:      echo.ExtractIPFromRealIPHeader()(r),
			APIKeyID:      apiKey,
			UserID:        userID,
		}

		// This is the critical part: non-blocking send.
		// If the channel is full, we drop the data. This is a conscious
		// trade-off: preserving gateway performance is more important than
		// capturing 100% of telemetry during extreme overload or a
		// downstream failure.
		select {
		case tc.dataChannel <- data:
			// Sent successfully
		default:
			// Channel is full, data dropped. Log this event.
			log.Println("WARNING: Telemetry channel full. Dropping data.")
		}
	})
}

// worker is the background goroutine that pulls from the channel and pushes to Redis.
func (tc *TelemetryCollector) worker() {
	defer tc.wg.Done()
	ticker := time.NewTicker(2 * time.Second) // Batch pushes every 2 seconds
	defer ticker.Stop()

	batch := make([]TelemetryData, 0, 1000) // Max batch size of 1000

	for {
		select {
		case <-tc.ctx.Done():
			// Drain whatever is still buffered, then flush one final batch.
			// Deliberately do not close(dataChannel) here: request goroutines
			// may still be sending, and a send on a closed channel panics.
			log.Println("Telemetry worker shutting down. Draining channel...")
			for {
				select {
				case data := <-tc.dataChannel:
					batch = append(batch, data)
				default:
					tc.pushToRedis(batch)
					log.Println("Final telemetry batch pushed.")
					return
				}
			}
		case data := <-tc.dataChannel:
			batch = append(batch, data)
			if len(batch) >= 1000 {
				tc.pushToRedis(batch)
				batch = make([]TelemetryData, 0, 1000) // Reset batch
			}
		case <-ticker.C:
			if len(batch) > 0 {
				tc.pushToRedis(batch)
				batch = make([]TelemetryData, 0, 1000) // Reset batch
			}
		}
	}
}

func (tc *TelemetryCollector) pushToRedis(batch []TelemetryData) {
	if len(batch) == 0 {
		return
	}

	// In a production system, use a more efficient serialization format like MessagePack or Protobuf.
	// JSON is used here for simplicity.
	// Use a short-lived context instead of tc.ctx: during shutdown tc.ctx is already
	// cancelled, and the final drained batch still needs to reach Redis.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	pipe := tc.redisClient.Pipeline()
	for _, data := range batch {
		payload, err := json.Marshal(data)
		if err != nil {
			log.Printf("ERROR: Failed to marshal telemetry data: %v", err)
			continue
		}
		pipe.LPush(ctx, tc.redisQueue, payload)
	}

	_, err := pipe.Exec(ctx)
	if err != nil {
		log.Printf("ERROR: Failed to push telemetry batch to Redis: %v", err)
		// Implement retry logic or dead-letter queue here if needed.
	} else {
		log.Printf("Pushed %d telemetry items to Redis.", len(batch))
	}
}

// Shutdown gracefully stops the collector.
func (tc *TelemetryCollector) Shutdown() {
	tc.cancel()
	tc.wg.Wait()
	tc.redisClient.Close()
}

// responseObserver is a wrapper around http.ResponseWriter to capture the status code.
type responseObserver struct {
	http.ResponseWriter
	statusCode int
}

func (o *responseObserver) WriteHeader(code int) {
	o.ResponseWriter.WriteHeader(code)
	o.statusCode = code
}

func main() {
	// A simple backend service to proxy to.
	backend := echo.New()
	backend.GET("/data", func(c echo.Context) error {
		time.Sleep(20 * time.Millisecond) // Simulate work
		return c.String(http.StatusOK, "Hello from backend")
	})
	go func() {
		backend.Start(":8081")
	}()

	// Setup Telemetry
	redisAddr := os.Getenv("REDIS_ADDR")
	if redisAddr == "" {
		redisAddr = "localhost:6379"
	}
	collector, err := NewTelemetryCollector(redisAddr, "gateway_telemetry")
	if err != nil {
		log.Fatalf("Failed to create telemetry collector: %v", err)
	}
	defer collector.Shutdown()

	// Setup Reverse Proxy for the data plane.
	// Real routing logic would select an upstream per request; for this example
	// everything is proxied to the single backend started above. As described
	// earlier, the hot path is a bare httputil.ReverseProxy wrapped only in the
	// telemetry middleware, with no framework overhead.
	backendURL, err := url.Parse("http://localhost:8081")
	if err != nil {
		log.Fatalf("Failed to parse backend URL: %v", err)
	}
	proxy := httputil.NewSingleHostReverseProxy(backendURL)

	// Wrap the proxy with our telemetry middleware
	gatewayHandler := collector.Middleware(proxy)

	// Use Echo for the control plane but the standard net/http for the data plane
	e := echo.New()
	e.Use(middleware.Logger())
	e.GET("/control/status", func(c echo.Context) error {
		return c.JSON(http.StatusOK, map[string]string{"status": "ok"})
	})

	// Create a separate server for the data plane to apply middleware
	dataPlaneServer := &http.Server{
		Addr:    ":8080",
		Handler: gatewayHandler,
	}

	go func() {
		if err := dataPlaneServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			e.Logger.Fatal("shutting down the data plane server")
		}
	}()

	// Start the control plane server
	go func() {
		e.Start(":9090")
	}()

	// Graceful shutdown
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down servers...")

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	if err := dataPlaneServer.Shutdown(ctx); err != nil {
		log.Fatal("Data plane server forced to shutdown:", err)
	}
	if err := e.Shutdown(ctx); err != nil {
		log.Fatal("Control plane server forced to shutdown:", err)
	}

	log.Println("Servers gracefully stopped.")
}

A key pitfall here is backpressure rather than the goroutine itself: if Redis becomes unavailable or extremely slow, the buffered dataChannel fills up. The select statement with a default case ensures that we start dropping telemetry instead of blocking web requests, which is the correct trade-off for a gateway. Logging (and ideally counting) these drops is crucial for observability into the health of the telemetry pipeline itself.
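
Beyond the drop-warning log inside the gateway, it helps to watch the hand-off point itself. The script below is a minimal operational sketch, not part of the gateway or worker code; it assumes the same Redis instance and queue name used throughout this post and simply reports the depth of the gateway_telemetry list.

# check_backlog.py -- a standalone health-check sketch, not part of the pipeline.
import os

import redis

REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
QUEUE_NAME = "gateway_telemetry"


def main() -> None:
    r = redis.Redis.from_url(REDIS_URL)
    depth = r.llen(QUEUE_NAME)
    # A depth that keeps growing between runs means the Celery worker is not
    # draining the queue fast enough, even if the gateway is not yet dropping data.
    print(f"{QUEUE_NAME} backlog: {depth} items")


if __name__ == "__main__":
    main()

Exporting that LLEN value as a metric (or alerting on it) closes the observability loop on the queue between the two halves of the pipeline.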

Celery Worker for Asynchronous Processing

On the other side of the Redis queue is our Python Celery worker. Its job is to consume the JSON blobs, parse them, transform them into a format our TSDB understands, and write them in batches.

First, the Celery application setup:

# tasks/celery_app.py
from celery import Celery
import os

REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

app = Celery(
    'telemetry_tasks',
    broker=REDIS_URL,
    backend=REDIS_URL, # Backend isn't strictly needed but good practice
    include=['tasks.process_telemetry']
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    # Acknowledge tasks only after they complete successfully.
    # If a worker crashes mid-task, the message will be redelivered.
    task_acks_late=True,
    # Prefetch multiplier of 1 with concurrency of 1 means the worker
    # fetches one message at a time. Essential for our custom batching.
    worker_prefetch_multiplier=1,
)

if __name__ == '__main__':
    app.start()

And now the task itself. A common mistake is to process one message per task. This is inefficient, as it incurs the overhead of task scheduling for every single request. A much more performant approach is to have the task itself implement a batching loop. It will try to pull multiple messages from Redis up to a certain batch size or timeout, process them all, and then write to the database once.

# tasks/process_telemetry.py
import json
import logging
import os
import time
from datetime import datetime
from typing import List, Dict, Any

import redis
import requests
from celery.signals import worker_process_init
from celery.utils.log import get_task_logger

from .celery_app import app

# Configuration
TSDB_WRITE_URL = os.environ.get("TSDB_WRITE_URL", "http://localhost:8428/api/v1/import/prometheus")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
REDIS_QUEUE_NAME = "gateway_telemetry"
BATCH_SIZE = 500
BATCH_TIMEOUT_SECONDS = 5

logger = get_task_logger(__name__)
logger.setLevel(logging.INFO)

# Use a connection pool for Redis. This is critical for performance.
# The connection is established once per worker process.
REDIS_POOL = None

@worker_process_init.connect
def init_worker(**kwargs):
    """Initialize resources once per worker process."""
    global REDIS_POOL
    logger.info("Initializing Redis connection pool for worker process.")
    REDIS_POOL = redis.ConnectionPool.from_url(REDIS_URL, decode_responses=True)

def fetch_batch_from_redis(r: redis.Redis) -> List[Dict[str, Any]]:
    """
    Fetches a batch of telemetry data from the Redis list.
    This custom fetching logic allows us to bypass Celery's message-per-task
    model and process in more efficient batches.
    """
    batch = []
    start_time = time.time()
    
    # Use a pipeline for efficiency to get multiple items
    pipe = r.pipeline()
    # RPOP is atomic and returns one item at a time
    for _ in range(BATCH_SIZE):
        pipe.rpop(REDIS_QUEUE_NAME)
    
    results = pipe.execute()
    
    for raw_data in results:
        if raw_data is None:
            # Queue is empty
            break
        try:
            batch.append(json.loads(raw_data))
        except json.JSONDecodeError:
            logger.error(f"Failed to decode JSON from Redis: {raw_data}")
            
    return batch


def format_for_prometheus(data: Dict[str, Any]) -> str:
    """
    Transforms a telemetry dictionary into Prometheus text exposition format.
    Example output:
    http_requests_latency_ms{method="GET",path="/data",status="200",api_key="key1"} 85 1672531200000
    """
    labels = {
        "method": data.get("request_method", "unknown"),
        "path": data.get("request_path", "unknown"),
        "status": str(data.get("response_code", "0")),
        "api_key_id": data.get("api_key_id", ""),
        "user_id": data.get("user_id", ""),
        "client_ip": data.get("client_ip", ""),
    }
    
    # The pitfall here is high-cardinality labels. `api_key_id`, `user_id`, and `client_ip`
    # can cause a "cardinality explosion". The TSDB must be chosen to handle this.
    # We also need to sanitize label values to be valid Prometheus label values.
    
    label_str = ",".join([f'{k}="{v}"' for k, v in labels.items() if v])
    
    # Convert the RFC 3339 timestamp emitted by Go into Unix milliseconds for the TSDB.
    # Go writes a variable-precision fractional second, so normalise it to microseconds
    # and parse as an aware UTC datetime. time.mktime/strptime would interpret the value
    # in local time (and reject nanosecond precision), skewing every data point.
    ts_str = data["timestamp"].replace("Z", "+00:00")
    if "." in ts_str:
        head, rest = ts_str.split(".")
        frac, offset = rest.split("+")
        ts_str = f"{head}.{frac[:6].ljust(6, '0')}+{offset}"
    ts = int(datetime.fromisoformat(ts_str).timestamp() * 1000)
    
    latency_metric = f"http_requests_latency_ms{{{label_str}}} {data['latency_ms']} {ts}"
    count_metric = f"http_requests_total{{{label_str}}} 1 {ts}"
    
    return f"{latency_metric}\n{count_metric}"


@app.task(bind=True, max_retries=3, default_retry_delay=30)
def process_telemetry_batch(self):
    """
    This is a long-running task that continuously polls Redis for batches.
    We only run one instance of this task per worker process.
    """
    if not REDIS_POOL:
        logger.error("Redis pool not initialized. Retrying task.")
        raise self.retry()
        
    r = redis.Redis(connection_pool=REDIS_POOL)
    logger.info("Starting telemetry batch processing loop.")
    
    while True:
        try:
            batch = fetch_batch_from_redis(r)
            if not batch:
                # Queue is empty, sleep for a bit to prevent busy-looping
                time.sleep(1)
                continue

            logger.info(f"Processing a batch of {len(batch)} items.")
            
            # Transform to Prometheus format
            payload_lines = [format_for_prometheus(item) for item in batch]
            payload = "\n".join(payload_lines)

            # Post to VictoriaMetrics
            response = requests.post(TSDB_WRITE_URL, data=payload.encode('utf-8'), timeout=10)
            response.raise_for_status()

            logger.info(f"Successfully wrote {len(batch)} metrics to TSDB.")

        except requests.RequestException as e:
            # If writing to TSDB fails, we should retry. Celery's retry mechanism
            # is not ideal for this loop. A better approach is a custom retry inside the loop.
            logger.error(f"Failed to write to TSDB: {e}. The current batch will be lost.")
            # In a more robust system, you would push this failed batch to a dead-letter queue.
            time.sleep(5) # Backoff before next attempt
            
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}", exc_info=True)
            time.sleep(5)

To run this, start a Celery worker with a concurrency of one so that a single instance of the loop owns the queue: celery -A tasks.celery_app worker -l info -c 1, then enqueue the task once (for example by calling process_telemetry_batch.delay(), or automatically at worker startup as sketched below). The task is an infinite loop, effectively turning the Celery worker into a dedicated streaming processor. This is a slightly unconventional use of Celery, which is typically reserved for discrete, short-lived tasks, but it is a pragmatic way to run a continuous processor without introducing a full-fledged stream-processing framework like Flink or Spark Streaming.
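
If you would rather not rely on a manual enqueue step, the worker can kick the loop off itself. The module below is a sketch, not part of the code above: tasks/bootstrap.py is an assumed name, and it would need to be loaded by the worker (for example by adding tasks.bootstrap to the app's include list). It uses Celery's worker_ready signal to enqueue exactly one instance of the loop when the worker comes up.

# tasks/bootstrap.py -- a sketch; the module name is an assumption, not part of
# the worker shown above. It must be loaded by the worker process, e.g. by
# adding 'tasks.bootstrap' to the Celery app's include list.
from celery.signals import worker_ready

from .celery_app import app


@worker_ready.connect
def start_processing_loop(sender=None, **kwargs):
    # Enqueue a single instance of the never-returning batch loop as soon as the
    # worker is ready. With -c 1 this task occupies the worker's only slot,
    # turning that worker into a dedicated streaming processor.
    app.send_task("tasks.process_telemetry.process_telemetry_batch")

Each worker that loads this module enqueues its own copy of the loop; that is exactly what we want for the single dedicated worker described here, and it remains safe with multiple workers because RPOP hands each queued item to only one consumer.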

Full System Deployment and Configuration

Tying this all together requires orchestrating the different components. A docker-compose.yml file illustrates the relationships:

version: '3.8'

services:
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"

  victoria-metrics:
    image: victoriametrics/victoria-metrics
    ports:
      - "8428:8428"
    volumes:
      - vm-data:/victoria-metrics-data
    command:
      - "--storageDataPath=/victoria-metrics-data"
      - "--promscrape.config=/etc/prometheus/prometheus.yml" # not used for writes, but good to have
      - "--retentionPeriod=1" # 1 month retention

  api-gateway:
    build:
      context: ./gateway-go
    ports:
      - "8080:8080" # Data plane
      - "9090:9090" # Control plane
    environment:
      - REDIS_ADDR=redis:6379
    depends_on:
      - redis

  celery-worker:
    build:
      context: ./celery-python
    command: celery -A tasks.celery_app worker -l info -c 1
    environment:
      - REDIS_URL=redis://redis:6379/0
      - TSDB_WRITE_URL=http://victoria-metrics:8428/api/v1/import/prometheus
    depends_on:
      - redis
      - victoria-metrics

  # This service would build and serve the dashboard UI.
  # The Dockerfile for this service would include the `npm run build` step
  # which executes the PostCSS pipeline.
  dashboard-ui:
    build:
      context: ./dashboard-ui # contains package.json with postcss scripts
    ports:
      - "3000:80"

volumes:
  vm-data:

The PostCSS integration happens within the dashboard-ui service. Its Dockerfile would look something like this:

# Stage 1: Build the React app
FROM node:18-alpine AS builder
WORKDIR /app
COPY package.json yarn.lock ./
RUN yarn install
COPY . .
# This command runs the build process, which includes PostCSS
# for processing Tailwind CSS, autoprefixing, etc.
RUN yarn build

# Stage 2: Serve the static files
FROM nginx:1.23-alpine
COPY --from=builder /app/build /usr/share/nginx/html
COPY nginx.conf /etc/nginx/conf.d/default.conf

This demonstrates how the front-end build toolchain, while a distinct technology, fits cleanly into the overall system’s operational deployment.

The architecture’s main limitation is its reliance on at-most-once delivery semantics between the gateway and Redis, and the potential for data loss in the Celery worker if the TSDB is down for an extended period. The Go middleware intentionally drops data if the outbound channel is full, prioritizing gateway stability. The Python worker currently logs and discards batches that fail to write; a more resilient implementation would park them in a dead-letter queue, as sketched below. Furthermore, using Celery as a pseudo-streaming engine is a pragmatic hack. For stateful processing or aggregations (like calculating unique users over a time window), this model would break down, necessitating a move to a true stream-processing system.
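
As a sketch of what that dead-letter path could look like on the Python side (the queue name gateway_telemetry_dlq and the park_failed_batch helper are assumptions, not part of the worker above), failed batches can simply be parked on a second Redis list and replayed later:

# A dead-letter sketch; gateway_telemetry_dlq and park_failed_batch are assumed
# names, not part of the worker shown earlier.
import json
from typing import Any, Dict, List

import redis

DEAD_LETTER_QUEUE = "gateway_telemetry_dlq"


def park_failed_batch(r: redis.Redis, batch: List[Dict[str, Any]]) -> None:
    """Push each item of a failed batch onto a separate Redis list for later replay."""
    pipe = r.pipeline()
    for item in batch:
        pipe.lpush(DEAD_LETTER_QUEUE, json.dumps(item))
    pipe.execute()

# Inside process_telemetry_batch, the TSDB failure branch would then become:
#     except requests.RequestException as e:
#         logger.error(f"Failed to write to TSDB: {e}. Parking batch in the DLQ.")
#         park_failed_batch(r, batch)
#         time.sleep(5)

A small replay script (or a second loop) could then drain gateway_telemetry_dlq once the TSDB recovers, turning today's data-loss window into a delay instead.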

