The transition to a microservices architecture on Azure Kubernetes Service (AKS) left our operations team flying blind. With dozens of gRPC services communicating, tracking a single user request through the system became an exercise in forensic archaeology. Our initial approach of manually running kubectl logs was untenable during incidents, leading to an unacceptable Mean Time to Resolution (MTTR). The core technical pain point was clear: we lacked a centralized, structured, and correlated logging system capable of handling the high-throughput, low-latency demands of our gRPC-based infrastructure. A mandate came down to solve this, and our platform team tackled it using a series of two-week Scrum sprints.
Our initial concept during the first sprint planning was deceptively simple: embed a logging library in each Go service to send JSON logs to a collector. This was quickly dismissed. Direct logging from the request path, especially over the network, introduces unacceptable latency and couples application logic with observability concerns. A common mistake is to treat logging as a fire-and-forget operation without considering its performance impact. If the logging endpoint is slow or unavailable, it can stall the entire service. This realization shifted our focus towards a non-blocking, asynchronous architecture built around gRPC interceptors.
The technology selection debate centered on three key components: the transport mechanism, the collection agent, and the storage backend. For transport, gRPC interceptors were the obvious choice. They provide a perfect cross-cutting concern entry point, allowing us to capture request/response metadata without polluting business logic. For the agent, we chose Fluent Bit over alternatives like Logstash or Fluentd due to its minimal resource footprint and first-class Kubernetes integration. For the backend, we opted to self-host an OpenSearch cluster directly on AKS. While managed services are easier, self-hosting gave us fine-grained control over versioning, plugin installation, and cost—a trade-off we were willing to make, though not without acknowledging the significant operational overhead it would introduce.
The architecture we settled on was a multi-stage pipeline. A gRPC interceptor in each Go service would capture request data, format it as structured JSON, and write it to a non-blocking in-memory channel. A separate goroutine would consume from this channel and write the JSON to stdout. From there, a Fluent Bit DaemonSet running on each AKS node would tail the container logs, parse the JSON, enrich it with Kubernetes metadata, and forward it to our internal OpenSearch cluster for indexing and analysis.
graph TD
    subgraph Client
        A[gRPC Client]
    end
    subgraph "AKS Pod: Go Microservice"
        B(gRPC Server) --> C{Unary Interceptor};
        C -- Request Path --> D[Business Logic];
        C -- Non-blocking --> E[In-Memory Log Channel];
        F[Log Processor Goroutine] -- reads --> E;
        F --> G[stdout as structured JSON];
    end
    subgraph "AKS Node"
        H[Fluent Bit DaemonSet] -- tails --> G;
        H -- enrich with k8s metadata --> H;
    end
    subgraph "AKS: OpenSearch Cluster"
        I(OpenSearch Service)
        J(OpenSearch StatefulSet)
        I --> J
    end
    A --> B;
    H -- ships logs --> I;
Our second sprint focused on building the core of this system: the Go interceptor. The goal was to create a resilient, non-blocking logger that would not impact service performance.
// internal/interceptor/observability.go
package interceptor
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
const (
// A common pitfall is not defining a clear, consistent key for correlation IDs.
correlationIDKey = "x-correlation-id"
// Buffer size for the log channel. A small buffer risks blocking on bursty traffic,
// while a large one consumes memory. 1000 is a reasonable starting point for tuning.
logChannelBufferSize = 1000
)
// logEntry defines the structured format for our logs.
// Using a struct ensures consistency and simplifies JSON marshaling.
type logEntry struct {
Timestamp time.Time `json:"timestamp"`
CorrelationID string `json:"correlation_id"`
Service string `json:"service"`
Method string `json:"grpc_method"`
StatusCode string `json:"grpc_status_code"`
DurationMs float64 `json:"duration_ms"`
PeerAddress string `json:"peer_address"`
UserAgent string `json:"user_agent"`
Error string `json:"error,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// AsyncLogger is a non-blocking logger that writes to stdout.
type AsyncLogger struct {
serviceName string
logChannel chan logEntry
writer *json.Encoder
wg sync.WaitGroup
shutdown chan struct{}
}
// NewAsyncLogger creates and starts a new asynchronous logger.
// In a real-world project, this would be part of the application's startup lifecycle.
func NewAsyncLogger(serviceName string) *AsyncLogger {
logger := &AsyncLogger{
serviceName: serviceName,
logChannel: make(chan logEntry, logChannelBufferSize),
writer: json.NewEncoder(os.Stdout),
shutdown: make(chan struct{}),
}
logger.wg.Add(1)
go logger.processLogs()
return logger
}
// processLogs is the core worker goroutine. It reads from the channel
// and writes to stdout. This decouples logging I/O from the request path.
func (l *AsyncLogger) processLogs() {
defer l.wg.Done()
for {
select {
case entry := <-l.logChannel:
// The encoder writes directly to the stream (stdout), which is
// more efficient than marshaling to a byte slice and then writing.
if err := l.writer.Encode(entry); err != nil {
// If we can't write to stdout, something is seriously wrong.
// We log to stderr as a last resort.
// This avoids an infinite loop if stdout logging itself is the problem.
fmt.Fprintf(os.Stderr, "failed to write log entry: %v\n", err)
}
case <-l.shutdown:
// Graceful shutdown: process remaining logs in the channel.
for len(l.logChannel) > 0 {
entry := <-l.logChannel
_ = l.writer.Encode(entry)
}
return
}
}
}
// Log sends a log entry to the processing channel.
// It includes a non-blocking send to prevent stalling the caller.
func (l *AsyncLogger) Log(entry logEntry) {
select {
case l.logChannel <- entry:
// Log successfully queued
default:
// Channel is full. This indicates the logging pipeline can't keep up.
// Dropping the log is preferable to blocking the service.
// In a real-world project, you would increment a metric here to monitor log drop rate.
fmt.Fprintf(os.Stderr, "log channel buffer full, dropping log for method: %s\n", entry.Method)
}
}
// Shutdown initiates a graceful shutdown of the logger.
func (l *AsyncLogger) Shutdown(ctx context.Context) error {
close(l.shutdown)
// Wait for the WaitGroup with a timeout to prevent hanging.
done := make(chan struct{})
go func() {
l.wg.Wait()
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
return fmt.Errorf("timed out waiting for logger to shut down: %w", ctx.Err())
}
}
// UnaryServerInterceptor returns a new unary server interceptor that logs request details.
func (l *AsyncLogger) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
md, _ := metadata.FromIncomingContext(ctx)
correlationID := getOrSetCorrelationID(md)
// Derive the new context from the incoming ctx rather than context.Background(),
// which would drop deadlines, cancellation, and peer information. Appending the
// correlation ID to the outgoing metadata is crucial for maintaining context
// through the entire call stack: it propagates automatically on any downstream
// gRPC calls made by the handler.
newCtx := metadata.AppendToOutgoingContext(ctx, correlationIDKey, correlationID)
// Invoke the actual RPC handler
resp, err := handler(newCtx, req)
duration := time.Since(startTime)
st, _ := status.FromError(err)
var peerAddress string
if p, ok := peer.FromContext(ctx); ok {
peerAddress = p.Addr.String()
}
var userAgent string
if ua := md.Get("user-agent"); len(ua) > 0 {
userAgent = ua[0]
}
entry := logEntry{
Timestamp: startTime.UTC(),
CorrelationID: correlationID,
Service: l.serviceName,
Method: info.FullMethod,
StatusCode: st.Code().String(),
DurationMs: float64(duration.Nanoseconds()) / 1e6,
PeerAddress: peerAddress,
UserAgent: userAgent,
}
if err != nil {
entry.Error = st.Message()
}
l.Log(entry)
return resp, err
}
}
func getOrSetCorrelationID(md metadata.MD) string {
if id := md.Get(correlationIDKey); len(id) > 0 && id[0] != "" {
return id[0]
}
// No correlation ID found, generate a new one.
return uuid.New().String()
}
The implementation of AsyncLogger was critical. It uses a buffered channel to decouple the gRPC request goroutine from the I/O-bound operation of writing logs, with the processLogs goroutine acting as the single consumer. The non-blocking send in the Log method is a key resilience pattern: if the logging system is overwhelmed, we drop logs instead of degrading service performance. Monitoring the “log channel buffer full” error rate becomes an essential SLI for our observability platform itself. The graceful shutdown logic is equally important to prevent log loss during deployments or pod restarts.
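To show how these pieces fit together in a service, here is a minimal wiring sketch, assuming a hypothetical module path, service name, and port; the five-second shutdown timeout is likewise an illustrative choice rather than our production value.

// cmd/server/main.go (illustrative sketch, not our full production entrypoint)
package main

import (
    "context"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"

    "google.golang.org/grpc"

    "example.com/internal/interceptor" // hypothetical module path for the package above
)

func main() {
    logger := interceptor.NewAsyncLogger("orders-service") // placeholder service name

    lis, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // Registering the interceptor once applies the logging to every unary RPC.
    srv := grpc.NewServer(
        grpc.UnaryInterceptor(logger.UnaryServerInterceptor()),
    )
    // pb.RegisterYourServiceServer(srv, &yourServer{}) // service registration elided

    go func() {
        if err := srv.Serve(lis); err != nil {
            log.Fatalf("gRPC server stopped: %v", err)
        }
    }()

    // On SIGTERM (e.g. a pod being drained), stop accepting new RPCs first,
    // then flush the logger so queued entries are not lost.
    stop := make(chan os.Signal, 1)
    signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
    <-stop

    srv.GracefulStop()
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := logger.Shutdown(ctx); err != nil {
        log.Printf("logger shutdown: %v", err)
    }
}

Calling Shutdown only after GracefulStop matters: in-flight handlers finish (and enqueue their log entries) before the channel is drained.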
The next sprint was dedicated to deploying a production-ready OpenSearch cluster on AKS. A common mistake is to deploy it as a standard Deployment, which doesn’t provide stable network identifiers or persistent storage volumes. A StatefulSet is the correct approach.
Here is a simplified but functional set of manifests for a three-node OpenSearch cluster.
# opensearch-storage.yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: opensearch-sc
provisioner: disk.csi.azure.com
parameters:
skuName: Premium_LRS # Use premium SSD for better I/O performance
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
---
# opensearch-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: opensearch-config
namespace: observability
data:
opensearch.yml: |
cluster.name: my-opensearch-cluster
node.name: ${NODE_NAME}
network.host: 0.0.0.0
discovery.seed_hosts:
- opensearch-cluster-0.opensearch-headless.observability.svc.cluster.local
- opensearch-cluster-1.opensearch-headless.observability.svc.cluster.local
- opensearch-cluster-2.opensearch-headless.observability.svc.cluster.local
cluster.initial_master_nodes:
- opensearch-cluster-0
- opensearch-cluster-1
- opensearch-cluster-2
---
# opensearch-headless-svc.yaml
apiVersion: v1
kind: Service
metadata:
name: opensearch-headless
namespace: observability
spec:
clusterIP: None # This makes it a headless service
selector:
app: opensearch-cluster
ports:
- name: transport
port: 9300
protocol: TCP
---
# opensearch-svc.yaml
apiVersion: v1
kind: Service
metadata:
name: opensearch-http
namespace: observability
spec:
selector:
app: opensearch-cluster
ports:
- name: http
port: 9200
protocol: TCP
---
# opensearch-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: opensearch-cluster
namespace: observability
spec:
serviceName: opensearch-headless
replicas: 3
selector:
matchLabels:
app: opensearch-cluster
template:
metadata:
labels:
app: opensearch-cluster
spec:
containers:
- name: opensearch
image: opensearchproject/opensearch:2.9.0
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
# Production-grade JVM settings are critical. Never use defaults.
- name: OPENSEARCH_JAVA_OPTS
value: "-Xms1g -Xmx1g" # Set heap to 50% of container memory limit, up to 30GB
- name: "DISABLE_INSTALL_DEMO_CONFIG"
value: "true"
- name: "DISABLE_SECURITY_PLUGIN"
value: "true" # For simplicity in this example. In production, security is mandatory.
ports:
- containerPort: 9200
name: http
- containerPort: 9300
name: transport
volumeMounts:
- name: config
mountPath: /usr/share/opensearch/config/opensearch.yml
subPath: opensearch.yml
- name: data
mountPath: /usr/share/opensearch/data
volumes:
- name: config
configMap:
name: opensearch-config
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "opensearch-sc"
resources:
requests:
storage: 20Gi # Start with a reasonable size and allow for expansion.
The pitfall here is under-provisioning resources. OPENSEARCH_JAVA_OPTS must be set explicitly: a common rule is to set Xms and Xmx to the same value, no more than 50% of the container’s memory limit, and never above ~31GB so the JVM keeps using compressed ordinary object pointers (compressed oops). Two things the simplified manifest above leaves out are resource requests and limits sized against that heap, and OpenSearch’s bootstrap requirement that the node’s vm.max_map_count be at least 262144.
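A minimal sketch of both additions to the StatefulSet's pod template follows; the CPU figures and the busybox tag are illustrative assumptions rather than benchmarked recommendations.

# Illustrative additions to the opensearch-statefulset.yaml pod template (not a complete manifest).
spec:
  template:
    spec:
      # Privileged init container raises vm.max_map_count so OpenSearch's
      # bootstrap checks pass on nodes where the default is lower.
      initContainers:
        - name: sysctl
          image: busybox:1.36
          securityContext:
            privileged: true
          command: ["sysctl", "-w", "vm.max_map_count=262144"]
      containers:
        - name: opensearch
          # Memory limit is ~2x the JVM heap (-Xms1g -Xmx1g), keeping the heap at ~50%.
          resources:
            requests:
              cpu: "500m"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "2Gi"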
With the interceptor logging to stdout and OpenSearch running, the final piece of the puzzle, tackled in our fourth sprint, was connecting them with Fluent Bit. We deployed it as a DaemonSet to ensure an instance runs on every node, collecting logs from all pods on that node.
# fluent-bit-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: fluent-bit-config
namespace: observability
labels:
app.kubernetes.io/name: fluent-bit
data:
fluent-bit.conf: |
[SERVICE]
Flush 1
Daemon Off
Log_Level info
HTTP_Server On
HTTP_Listen 0.0.0.0
        HTTP_Port    2020
        # Load the stock parser definitions shipped in the Fluent Bit image;
        # without this, the "docker" and "json" parsers referenced below are undefined.
        Parsers_File parsers.conf
[INPUT]
Name tail
Tag kube.*
Path /var/log/containers/*.log
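        # Note: this assumes Docker-formatted JSON logs. On containerd-based AKS
        # node pools the runtime writes CRI-formatted logs, so the stock "cri"
        # parser is needed here instead (with downstream key names adjusted).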
Parser docker
DB /var/log/flb_kube.db
Mem_Buf_Limit 5MB
Skip_Long_Lines On
[FILTER]
Name kubernetes
Match kube.*
Kube_URL https://kubernetes.default.svc:443
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
Kube_Tag_Prefix kube.var.log.containers.
Merge_Log On
Keep_Log Off
K8S-Logging.Parser On
K8S-Logging.Exclude Off
[FILTER]
Name parser
Match kube.*
Key_Name log
Parser json
Reserve_Data On # Important: keeps original fields
[OUTPUT]
Name opensearch
Match *
Host opensearch-http.observability.svc.cluster.local
Port 9200
Logstash_Format On
Logstash_Prefix service-logs
        Retry_Limit  False
        # OpenSearch 2.x rejects the legacy _type field in bulk requests.
        Suppress_Type_Name On
The kubernetes filter is the most powerful part of this configuration. It automatically enriches each log record with metadata from the Kubernetes API server, such as pod name, namespace, labels, and annotations, which provides invaluable context when debugging. Because Merge_Log is enabled, it also parses the structured JSON our Go application writes into the log field and promotes the nested fields to top-level fields in the final document sent to OpenSearch; the second parser filter acts as a fallback for any records the kubernetes filter could not merge.
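For completeness, here is a minimal sketch of the DaemonSet that pairs with this ConfigMap. The image tag, resource limits, and the fluent-bit ServiceAccount (which needs RBAC permission to read pod metadata for the kubernetes filter) are assumptions, not our exact production manifest.

# fluent-bit-daemonset.yaml (illustrative sketch)
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: observability
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: fluent-bit
  template:
    metadata:
      labels:
        app.kubernetes.io/name: fluent-bit
    spec:
      serviceAccountName: fluent-bit  # assumed to exist, with get/list/watch on pods
      tolerations:
        - operator: Exists  # run on every node, including tainted pools
      containers:
        - name: fluent-bit
          image: fluent/fluent-bit:2.1.8  # pin to a tested version
          resources:
            requests:
              cpu: 100m
              memory: 100Mi
            limits:
              memory: 200Mi
          volumeMounts:
            # Node container logs, read-only.
            - name: varlog
              mountPath: /var/log
              readOnly: true
            # Mount only fluent-bit.conf so the image's stock parsers.conf stays available.
            - name: config
              mountPath: /fluent-bit/etc/fluent-bit.conf
              subPath: fluent-bit.conf
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: config
          configMap:
            name: fluent-bit-config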
With all components deployed, a request to one of our gRPC services now generates a rich document in OpenSearch. A query for a specific correlation_id instantly returns all log entries for that request, in order, across all services it traversed. This capability, demonstrated at the end-of-sprint review, was a game-changer for our development and SRE teams, drastically reducing the time spent debugging distributed workflows.
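As a rough illustration of that workflow, the sketch below queries OpenSearch for every entry sharing a correlation ID, sorted by timestamp. The index pattern matches the Logstash_Prefix configured above; the in-cluster URL, the .keyword sub-field, and the result size are assumptions based on default dynamic mappings.

// cmd/tracequery/main.go (illustrative sketch, not part of the pipeline itself)
package main

import (
    "bytes"
    "fmt"
    "io"
    "net/http"
    "os"
)

func main() {
    if len(os.Args) < 2 {
        fmt.Fprintln(os.Stderr, "usage: tracequery <correlation-id>")
        os.Exit(1)
    }
    correlationID := os.Args[1]

    // Exact match on the keyword sub-field of correlation_id, sorted by
    // timestamp so the request's path across services reads top to bottom.
    query := fmt.Sprintf(`{
  "query": { "term": { "correlation_id.keyword": %q } },
  "sort": [ { "timestamp": { "order": "asc" } } ],
  "size": 100
}`, correlationID)

    // In-cluster service DNS name from the manifests above; adjust when
    // running outside the cluster (for example via a port-forward).
    url := "http://opensearch-http.observability.svc.cluster.local:9200/service-logs-*/_search"
    resp, err := http.Post(url, "application/json", bytes.NewReader([]byte(query)))
    if err != nil {
        fmt.Fprintf(os.Stderr, "query failed: %v\n", err)
        os.Exit(1)
    }
    defer resp.Body.Close()

    // Print the raw search response; in practice you would unmarshal the hits
    // and render one line per log entry.
    body, _ := io.ReadAll(resp.Body)
    fmt.Println(string(body))
}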
The solution, however, is not without its limitations and areas for future iteration. This implementation focuses solely on logs and a simple correlation ID. It’s not a full distributed tracing solution. The next logical step would be to integrate OpenTelemetry, propagating trace and span IDs through the gRPC metadata and emitting trace data to an OpenTelemetry collector. This would allow us to visualize request latency breakdowns in tools like Jaeger or Grafana Tempo. Furthermore, self-managing OpenSearch carries a high operational burden for backups, upgrades, and scaling. As our data volume grows, we will need to implement robust index lifecycle automation (Index State Management, or ISM, in OpenSearch) to move older data to warm or cold storage tiers to manage costs. Finally, the Scrum process itself highlighted a recurring challenge: while it enabled iterative progress, the unpredictable nature of R&D and debugging in a complex distributed environment made story point estimation notoriously difficult, often leading to sprint spillover that required careful management and communication with stakeholders.