Implementing a Resilient gRPC-Go Observability Pipeline on Azure AKS with OpenSearch


The transition to a microservices architecture on Azure Kubernetes Service (AKS) left our operations team flying blind. With dozens of gRPC services communicating, tracking a single user request through the system became an exercise in forensic archaeology. Our initial approach, manually trawling kubectl logs output pod by pod, was untenable during incidents and led to an unacceptable Mean Time to Resolution (MTTR). The core technical pain point was clear: we lacked a centralized, structured, and correlated logging system capable of handling the high-throughput, low-latency demands of our gRPC-based infrastructure. A mandate came down to solve this, and our platform team tackled it using a series of two-week Scrum sprints.

Our initial concept during the first sprint planning was deceptively simple: embed a logging library in each Go service to send JSON logs to a collector. This was quickly dismissed. Direct logging from the request path, especially over the network, introduces unacceptable latency and couples application logic with observability concerns. A common mistake is to treat logging as a fire-and-forget operation without considering its performance impact. If the logging endpoint is slow or unavailable, it can stall the entire service. This realization shifted our focus towards a non-blocking, asynchronous architecture built around gRPC interceptors.

The technology selection debate centered on three key components: the transport mechanism, the collection agent, and the storage backend. For transport, gRPC interceptors were the obvious choice. They provide a perfect cross-cutting concern entry point, allowing us to capture request/response metadata without polluting business logic. For the agent, we chose Fluent Bit over alternatives like Logstash or Fluentd due to its minimal resource footprint and first-class Kubernetes integration. For the backend, we opted to self-host an OpenSearch cluster directly on AKS. While managed services are easier, self-hosting gave us fine-grained control over versioning, plugin installation, and cost—a trade-off we were willing to make, though not without acknowledging the significant operational overhead it would introduce.

The architecture we settled on was a multi-stage pipeline. A gRPC interceptor in each Go service would capture request data, format it as structured JSON, and write it to a non-blocking in-memory channel. A separate goroutine would consume from this channel and write the JSON to stdout. From there, a Fluent Bit DaemonSet running on each AKS node would tail the container logs, parse the JSON, enrich it with Kubernetes metadata, and forward it to our internal OpenSearch cluster for indexing and analysis.

graph TD
    subgraph Client
        A[gRPC Client]
    end

    subgraph "AKS Pod: Go Microservice"
        B(gRPC Server) --> C{Unary Interceptor};
        C -- Request Path --> D[Business Logic];
        C -- Non-blocking --> E[In-Memory Log Channel];
        F[Log Processor Goroutine] -- reads --> E;
        F --> G[stdout as structured JSON];
    end

    subgraph "AKS Node"
        H[Fluent Bit DaemonSet] -- tails --> G;
        H -- enrich with k8s metadata --> H;
    end
    
    subgraph "AKS: OpenSearch Cluster"
        I(OpenSearch Service)
        J(OpenSearch StatefulSet)
        I --> J
    end
    
    A --> B;
    H -- ships logs --> I;

Our second sprint focused on building the core of this system: the Go interceptor. The goal was to create a resilient, non-blocking logger that would not impact service performance.

// internal/interceptor/observability.go

package interceptor

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/status"
)

const (
	// A common pitfall is not defining a clear, consistent key for correlation IDs.
	correlationIDKey = "x-correlation-id"
	// Buffer size for the log channel. A small buffer risks blocking on bursty traffic,
	// while a large one consumes memory. 1000 is a reasonable starting point for tuning.
	logChannelBufferSize = 1000
)

// logEntry defines the structured format for our logs.
// Using a struct ensures consistency and simplifies JSON marshaling.
type logEntry struct {
	Timestamp     time.Time              `json:"timestamp"`
	CorrelationID string                 `json:"correlation_id"`
	Service       string                 `json:"service"`
	Method        string                 `json:"grpc_method"`
	StatusCode    string                 `json:"grpc_status_code"`
	DurationMs    float64                `json:"duration_ms"`
	PeerAddress   string                 `json:"peer_address"`
	UserAgent     string                 `json:"user_agent"`
	Error         string                 `json:"error,omitempty"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`
}

// AsyncLogger is a non-blocking logger that writes to stdout.
type AsyncLogger struct {
	serviceName string
	logChannel  chan logEntry
	writer      *json.Encoder
	wg          sync.WaitGroup
	shutdown    chan struct{}
}

// NewAsyncLogger creates and starts a new asynchronous logger.
// In a real-world project, this would be part of the application's startup lifecycle.
func NewAsyncLogger(serviceName string) *AsyncLogger {
	logger := &AsyncLogger{
		serviceName: serviceName,
		logChannel:  make(chan logEntry, logChannelBufferSize),
		writer:      json.NewEncoder(os.Stdout),
		shutdown:    make(chan struct{}),
	}

	logger.wg.Add(1)
	go logger.processLogs()

	return logger
}

// processLogs is the core worker goroutine. It reads from the channel
// and writes to stdout. This decouples logging I/O from the request path.
func (l *AsyncLogger) processLogs() {
	defer l.wg.Done()
	for {
		select {
		case entry := <-l.logChannel:
			// The encoder writes directly to the stream (stdout), which is
			// more efficient than marshaling to a byte slice and then writing.
			if err := l.writer.Encode(entry); err != nil {
				// If we can't write to stdout, something is seriously wrong.
				// We log to stderr as a last resort.
				// This avoids an infinite loop if stdout logging itself is the problem.
				fmt.Fprintf(os.Stderr, "failed to write log entry: %v\n", err)
			}
		case <-l.shutdown:
			// Graceful shutdown: process remaining logs in the channel.
			for len(l.logChannel) > 0 {
				entry := <-l.logChannel
				_ = l.writer.Encode(entry)
			}
			return
		}
	}
}

// Log sends a log entry to the processing channel.
// It includes a non-blocking send to prevent stalling the caller.
func (l *AsyncLogger) Log(entry logEntry) {
	select {
	case l.logChannel <- entry:
		// Log successfully queued
	default:
		// Channel is full. This indicates the logging pipeline can't keep up.
		// Dropping the log is preferable to blocking the service.
		// In a real-world project, you would increment a metric here to monitor log drop rate.
		fmt.Fprintf(os.Stderr, "log channel buffer full, dropping log for method: %s\n", entry.Method)
	}
}

// Shutdown initiates a graceful shutdown of the logger.
func (l *AsyncLogger) Shutdown(ctx context.Context) error {
	close(l.shutdown)
	
	// Wait for the WaitGroup with a timeout to prevent hanging.
	done := make(chan struct{})
	go func() {
		l.wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("timed out waiting for logger to shut down: %w", ctx.Err())
	}
}


// UnaryServerInterceptor returns a new unary server interceptor that logs request details.
func (l *AsyncLogger) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		startTime := time.Now()

		md, _ := metadata.FromIncomingContext(ctx)
		correlationID := getOrSetCorrelationID(md)

		// Propagate the correlation ID to the handler and to any downstream calls.
		// Crucially, derive the new context from the incoming ctx rather than
		// context.Background(); otherwise deadlines, cancellation, and peer
		// information are silently dropped.
		md = md.Copy()
		md.Set(correlationIDKey, correlationID)
		newCtx := metadata.NewIncomingContext(ctx, md)
		newCtx = metadata.AppendToOutgoingContext(newCtx, correlationIDKey, correlationID)

		// Invoke the actual RPC handler
		resp, err := handler(newCtx, req)

		duration := time.Since(startTime)
		st, _ := status.FromError(err)

		var peerAddress string
		if p, ok := peer.FromContext(ctx); ok {
			peerAddress = p.Addr.String()
		}

		var userAgent string
		if ua := md.Get("user-agent"); len(ua) > 0 {
			userAgent = ua[0]
		}
		
		entry := logEntry{
			Timestamp:     startTime.UTC(),
			CorrelationID: correlationID,
			Service:       l.serviceName,
			Method:        info.FullMethod,
			StatusCode:    st.Code().String(),
			DurationMs:    float64(duration.Nanoseconds()) / 1e6,
			PeerAddress:   peerAddress,
			UserAgent:     userAgent,
		}

		if err != nil {
			entry.Error = st.Message()
		}

		l.Log(entry)

		return resp, err
	}
}

func getOrSetCorrelationID(md metadata.MD) string {
	if id := md.Get(correlationIDKey); len(id) > 0 && id[0] != "" {
		return id[0]
	}
	// No correlation ID found, generate a new one.
	return uuid.New().String()
}

The implementation of AsyncLogger was critical. It uses a buffered channel to decouple the gRPC request goroutine from the I/O-bound operation of writing logs. The processLogs goroutine acts as a singleton consumer. The non-blocking send in the Log method is a key resilience pattern: if the logging system is overwhelmed, we drop logs instead of degrading service performance. Monitoring the “log channel buffer full” error rate becomes an essential SLI for our observability platform itself. The graceful shutdown logic is equally important to prevent log loss during deployments or pod restarts.
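Wiring the logger into a service is straightforward. The sketch below shows one way to do it, assuming a hypothetical module path, service name, and port; each real service registers its generated gRPC handlers where indicated.

// cmd/server/main.go (sketch: service registration omitted; module path and names are assumptions)

package main

import (
	"context"
	"log"
	"net"
	"os/signal"
	"syscall"
	"time"

	"google.golang.org/grpc"

	"example.com/app/internal/interceptor" // hypothetical module path
)

func main() {
	// Start the async logger once per process; it owns the stdout writer.
	logger := interceptor.NewAsyncLogger("checkout")

	srv := grpc.NewServer(
		grpc.ChainUnaryInterceptor(logger.UnaryServerInterceptor()),
	)
	// pb.RegisterCheckoutServiceServer(srv, &service{}) // generated handlers go here

	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}

	// Stop serving on SIGTERM (sent by Kubernetes before the pod is killed) or Ctrl-C.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	go func() {
		<-ctx.Done()
		srv.GracefulStop() // drain in-flight RPCs first
	}()

	if err := srv.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}

	// Flush any queued log entries before the process exits.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := logger.Shutdown(shutdownCtx); err != nil {
		log.Printf("logger shutdown: %v", err)
	}
}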

The next sprint was dedicated to deploying a production-ready OpenSearch cluster on AKS. A common mistake is to deploy it as a standard Deployment, which provides neither stable network identities nor per-replica persistent volumes, both of which OpenSearch nodes need for discovery and data durability. A StatefulSet is the correct approach.

Here is a simplified but functional set of manifests for a three-node OpenSearch cluster.

# opensearch-storage.yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: opensearch-sc
provisioner: disk.csi.azure.com
parameters:
  skuName: Premium_LRS # Use premium SSD for better I/O performance
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
---
# opensearch-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: opensearch-config
  namespace: observability
data:
  opensearch.yml: |
    cluster.name: my-opensearch-cluster
    node.name: ${NODE_NAME}
    network.host: 0.0.0.0
    discovery.seed_hosts:
     - opensearch-cluster-0.opensearch-headless.observability.svc.cluster.local
     - opensearch-cluster-1.opensearch-headless.observability.svc.cluster.local
     - opensearch-cluster-2.opensearch-headless.observability.svc.cluster.local
    cluster.initial_master_nodes:
     - opensearch-cluster-0
     - opensearch-cluster-1
     - opensearch-cluster-2
---
# opensearch-headless-svc.yaml
apiVersion: v1
kind: Service
metadata:
  name: opensearch-headless
  namespace: observability
spec:
  clusterIP: None # This makes it a headless service
  selector:
    app: opensearch-cluster
  ports:
    - name: transport
      port: 9300
      protocol: TCP
---
# opensearch-svc.yaml
apiVersion: v1
kind: Service
metadata:
  name: opensearch-http
  namespace: observability
spec:
  selector:
    app: opensearch-cluster
  ports:
    - name: http
      port: 9200
      protocol: TCP
---
# opensearch-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: opensearch-cluster
  namespace: observability
spec:
  serviceName: opensearch-headless
  replicas: 3
  selector:
    matchLabels:
      app: opensearch-cluster
  template:
    metadata:
      labels:
        app: opensearch-cluster
    spec:
      # OpenSearch's bootstrap checks require vm.max_map_count >= 262144;
      # many node images default to a lower value, so raise it before startup.
      initContainers:
      - name: set-max-map-count
        image: busybox:1.36
        command: ["sysctl", "-w", "vm.max_map_count=262144"]
        securityContext:
          privileged: true
      containers:
      - name: opensearch
        image: opensearchproject/opensearch:2.9.0
        env:
          - name: NODE_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          # Production-grade JVM settings are critical. Never use defaults.
          - name: OPENSEARCH_JAVA_OPTS
            value: "-Xms1g -Xmx1g" # Set heap to 50% of container memory limit, up to 30GB
          - name: "DISABLE_INSTALL_DEMO_CONFIG"
            value: "true"
          - name: "DISABLE_SECURITY_PLUGIN"
            value: "true" # For simplicity in this example. In production, security is mandatory.
        ports:
        - containerPort: 9200
          name: http
        - containerPort: 9300
          name: transport
        volumeMounts:
        - name: config
          mountPath: /usr/share/opensearch/config/opensearch.yml
          subPath: opensearch.yml
        - name: data
          mountPath: /usr/share/opensearch/data
      volumes:
      - name: config
        configMap:
          name: opensearch-config
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: "opensearch-sc"
      resources:
        requests:
          storage: 20Gi # Start with a reasonable size and allow for expansion.

The pitfall here is under-provisioning resources. OPENSEARCH_JAVA_OPTS must be set explicitly. A common rule is to set Xms and Xmx to the same value, keep the heap at no more than 50% of the container's memory limit, and never exceed ~31GB so the JVM can keep using compressed ordinary object pointers (compressed OOPs).
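To make the rule concrete, the opensearch container above could carry a resources block like this sketch. The numbers are illustrative starting points rather than measured values; the 2Gi limit keeps the 1g heap at the 50% mark.

# Fragment of the opensearch container spec in the StatefulSet above (sketch).
        resources:
          requests:
            cpu: "1"
            memory: 2Gi
          limits:
            memory: 2Gi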

With the interceptor logging to stdout and OpenSearch running, the final piece of the puzzle, tackled in our fourth sprint, was connecting them with Fluent Bit. We deployed it as a DaemonSet to ensure an instance runs on every node, collecting logs from all pods on that node.

# fluent-bit-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: observability
  labels:
    app.kubernetes.io/name: fluent-bit
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush        1
        Daemon       Off
        Log_Level    info
        # Load the stock parser definitions (docker, cri, json) shipped in the image.
        Parsers_File parsers.conf
        HTTP_Server  On
        HTTP_Listen  0.0.0.0
        HTTP_Port    2020

    [INPUT]
        Name             tail
        Tag              kube.*
        Path             /var/log/containers/*.log
        # AKS nodes run containerd, so handle both CRI and docker log formats.
        multiline.parser docker, cri
        DB               /var/log/flb_kube.db
        Mem_Buf_Limit    5MB
        Skip_Long_Lines  On

    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log.containers.
        Merge_Log           On
        Keep_Log            Off
        K8S-Logging.Parser  On
        K8S-Logging.Exclude Off

    [FILTER]
        Name           parser
        Match          kube.*
        Key_Name       log
        Parser         json
        Reserve_Data   On # Important: keeps original fields

    [OUTPUT]
        Name           opensearch
        Match          *
        Host           opensearch-http.observability.svc.cluster.local
        Port           9200
        # OpenSearch 2.x rejects the legacy _type field, so suppress it in bulk requests.
        Suppress_Type_Name On
        Logstash_Format On
        Logstash_Prefix service-logs
        Retry_Limit    False

The kubernetes filter is the most powerful part of this configuration. It automatically enriches each log record with metadata from the Kubernetes API server, such as pod name, namespace, labels, and annotations. This provides invaluable context when debugging. The second parser filter then processes the log field itself, which contains our structured JSON from the Go application, promoting the nested JSON fields to top-level fields in the final document sent to OpenSearch.
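For completeness, the ConfigMap above is consumed by a Fluent Bit DaemonSet along the following lines. This is a sketch: the image tag, tolerations, and the fluent-bit ServiceAccount name are assumptions, and the RBAC the kubernetes filter needs (get/list/watch on pods and namespaces) is omitted for brevity.

# fluent-bit-daemonset.yaml (sketch)
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: observability
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: fluent-bit
  template:
    metadata:
      labels:
        app.kubernetes.io/name: fluent-bit
    spec:
      # ServiceAccount bound to RBAC that allows reading pods and namespaces.
      serviceAccountName: fluent-bit
      tolerations:
        - operator: Exists # run on every node, including tainted system pools
      containers:
      - name: fluent-bit
        image: fluent/fluent-bit:2.1.10
        ports:
        - containerPort: 2020
          name: http-metrics
        volumeMounts:
        # /var/log covers /var/log/containers and /var/log/pods on containerd nodes,
        # and must stay writable for the tail plugin's position DB (flb_kube.db).
        - name: varlog
          mountPath: /var/log
        # Mount only the main config so the image's stock parsers.conf stays in place.
        - name: config
          mountPath: /fluent-bit/etc/fluent-bit.conf
          subPath: fluent-bit.conf
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: config
        configMap:
          name: fluent-bit-config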

With all components deployed, a request to one of our gRPC services now generates a rich document in OpenSearch. A query for a specific correlation_id instantly returns all log entries for that request, in order, across all services it traversed. This capability, demonstrated at the end-of-sprint review, was a game-changer for our development and SRE teams, drastically reducing the time spent debugging distributed workflows.
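A typical lookup, run from inside the cluster or through a port-forward, looks something like the sketch below. The index pattern follows the Logstash_Prefix configured above, and the correlation ID value is a placeholder.

# <correlation-id> is a placeholder for the value taken from an edge log or response header.
curl -s "http://opensearch-http.observability.svc.cluster.local:9200/service-logs-*/_search" \
  -H "Content-Type: application/json" \
  -d '{
        "query": { "term": { "correlation_id.keyword": "<correlation-id>" } },
        "sort":  [ { "timestamp": { "order": "asc" } } ],
        "size":  100
      }'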

The solution, however, is not without its limitations and areas for future iteration. This implementation focuses solely on logs and a simple correlation ID. It’s not a full distributed tracing solution. The next logical step would be to integrate OpenTelemetry, propagating trace and span IDs through the gRPC metadata and emitting trace data to an OpenTelemetry collector. This would allow us to visualize request latency breakdowns in tools like Jaeger or Grafana Tempo. Furthermore, self-managing OpenSearch carries a high operational burden for backups, upgrades, and scaling. As our data volume grows, we will need robust automation around OpenSearch’s Index State Management (ISM) policies to roll over indices and move older data to warm or cold storage tiers to manage costs. Finally, the Scrum process itself highlighted a recurring challenge: while it enabled iterative progress, the unpredictable nature of R&D and debugging in a complex distributed environment made story point estimation notoriously difficult, often leading to sprint spillover that required careful management and communication with stakeholders.

