在 Azure AKS 上构建从 Caddy 到 MobX 的全栈可观测性数据管道


在一个多租户SaaS平台中,可观测性的挑战远不止于收集后端服务的CPU和内存使用率。真正的痛点在于数据孤岛:前端的用户感知性能数据(如页面加载、API交互耗时)与后端服务的健康状况、乃至边缘入口的流量指标完全割裂。当客户报告“系统很慢”时,问题可能出在Caddy的TLS握手延迟、AKS的Pod调度压力、后端服务的数据库慢查询,或是MobX驱动的前端应用在处理某个复杂状态变更时导致的UI卡顿。我们需要一个能将这些孤立信号关联起来的统一数据管道。

摆在面前的方案有两个。方案A,采用业界主流的Prometheus + Grafana生态。通过Prometheus Operator在AKS中快速部署,利用prometheus-pushgateway接收来自前端的事件型指标,再通过blackbox-exporter探测后端服务。这是一个成熟、功能强大的方案。

方案B,构建一个更具侵入性但控制力更强的自定义数据管道。该方案以InfluxDB为核心时序数据库,因为它在处理高基数(high-cardinality)标签(例如tenant_id, user_id, session_id)方面表现优于Prometheus的某些默认配置。数据流如下:

  1. Caddy作为Ingress,利用其内置的metrics模块暴露Prometheus格式的指标,由Telegraf Agent采集并推送至InfluxDB。
  2. 后端服务(Go语言实现)内建一个轻量级遥测(Telemetry)API端点。
  3. 前端(使用MobX)通过封装的工具函数,在关键的用户交互和状态变更后,将性能指标(例如组件渲染耗时、数据获取时长)打包推送到后端的遥测端点。
  4. 后端服务接收到前端遥测数据后,会用服务端的上下文信息(如用户归属的租户ID、服务器实例名)进行丰富,然后以结构化行协议(Line Protocol)格式写入InfluxDB。

对比两种方案:

方案A (Prometheus生态)

  • 优势:
    • 社区标准,生态成熟,开箱即用的组件多。
    • PromQL查询语言功能强大。
    • 与Kubernetes生态集成紧密。
  • 劣势:
    • Pushgateway并非为大规模事件型指标设计,可能成为单点瓶颈。
    • 处理高基数标签时,Prometheus的TSDB存储引擎可能会面临性能挑战和巨大的内存消耗,即所谓的“基数爆炸”。对于需要精细到每个用户的多租户平台,这是一个现实风险。
    • 数据关联需要在Grafana的查询层面手动实现,缺乏原生的数据丰富(enrichment)流程。

方案B (自定义InfluxDB管道)

  • 优势:
    • InfluxDB v2/v3在设计上对高基数场景更加友好。
    • Push模型天然适合前端和无状态后端服务的指标上报。
    • 自定义遥测后端允许我们在数据写入前进行任意的丰富和预处理,例如将前端的会话ID与后端的租户ID关联,这是实现深度下钻分析的关键。
    • 架构相对简单,组件职责清晰。
  • 劣势:
    • 需要编写和维护自定义的遥测端点代码。
    • 相比Prometheus Operator,在AKS上的部署和维护需要更多手动配置。
    • 查询语言Flux(InfluxDB 2.0)的学习曲线可能比PromQL陡峭。

在我们的场景中,对“租户”和“用户会话”这两个高基数维度的精细化分析是核心诉求。我们需要回答“为什么A租户的P99响应时间比B租户高500ms?”这类问题。这种需求使得方案B的优势——特别是数据丰富和对高基数的友好支持——变得至关重要。因此,我们选择方案B。

核心实现概览

整个系统的架构和数据流可以用下图表示:

graph TD
    subgraph Browser
        A[User Interaction] --> B{MobX State Change};
        B --> C[Performance Measurement];
        C --> D[Telemetry Beacon Sender];
    end

    subgraph Azure AKS Cluster
        subgraph Ingress
            E[Caddy Server] -- Exposes /metrics --> F[Telegraf Agent];
        end
        
        subgraph Backend Pod
            G[Go Backend Service];
            D -- HTTP POST /v1/telemetry --> G;
        end

        subgraph Monitoring
            H[InfluxDB Service];
        end

        F -- Influx Line Protocol --> H;
        G -- Influx Line Protocol --> H;
    end

    I[Developer/SRE] -- Flux Query --> H;

1. InfluxDB 数据模型设计

在InfluxDB中,数据模型至关重要。我们定义一个名为app_observability的Bucket,其中包含几个关键的Measurement:

  • caddy_http_requests: 存储来自Caddy的HTTP请求指标。
    • Tags: host, method, status_code, instance
    • Fields: count, duration_seconds_sum, duration_seconds_bucket
  • backend_api_latency: 存储后端API处理延迟。
    • Tags: tenant_id, endpoint, http_method
    • Fields: latency_ms (integer)
  • frontend_performance: 存储来自前端的性能指标。
    • Tags: tenant_id, session_id, view_name, action_name
    • Fields: duration_ms (integer), element_count (integer)

这里的关键设计是,所有与业务上下文相关的维度,如tenant_id, session_id, view_name,都作为Tag。这使得我们可以高效地对这些维度进行GROUP BYFILTER操作。

2. Caddy与Telegraf集成配置

Caddy的配置极其简洁。在Caddyfile中,我们只需要在全局选项里启用metrics插件。

{
    # Enable the metrics endpoint, which defaults to /metrics
    # This exposes Prometheus-compatible metrics.
    metrics
}

# Example site configuration
app.yourdomain.com {
    # Reverse proxy to our backend service running in AKS
    reverse_proxy backend-service.default.svc.cluster.local:8080

    # Other configurations like TLS, logging, etc.
    tls [email protected]
    log {
        output file /var/log/caddy/access.log
    }
}

接着,我们在AKS中以DaemonSet的形式部署Telegraf。Telegraf的配置文件telegraf.conf负责抓取Caddy的指标并推送到InfluxDB。

# telegraf.conf

# Global agent configuration
[agent]
  interval = "15s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = "${HOSTNAME}" # Important for identifying the node
  omit_hostname = false

# Output plugin: InfluxDB v2
[[outputs.influxdb_v2]]
  urls = ["http://influxdb.default.svc.cluster.local:8086"]
  
  # InfluxDB credentials should be stored as Kubernetes secrets and mounted as env vars
  token = "${INFLUX_TOKEN}"
  organization = "my-org"
  bucket = "app_observability"

# Input plugin: Prometheus
[[inputs.prometheus]]
  # Scrape Caddy's metrics endpoint.
  # We assume Caddy is exposed via a service named 'caddy' on port 2019
  urls = ["http://caddy.default.svc.cluster.local:2019/metrics"]
  metric_version = 2 # Use Prometheus text format
  
  # Add tags to all metrics from this input
  [inputs.prometheus.tags]
    source = "caddy"

3. 后端遥测端点(Go)

后端服务是整个管道的核心。它不仅处理业务逻辑,还负责接收、丰富并转发前端遥测数据。我们使用官方的InfluxDB Go客户端。

package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
)

// FrontendTelemetryPayload defines the structure of the data sent from the frontend.
type FrontendTelemetryPayload struct {
	SessionID string            `json:"sessionId"`
	ViewName  string            `json:"viewName"`
	ActionName string           `json:"actionName"`
	Metrics   map[string]int64  `json:"metrics"` // e.g., {"duration_ms": 120, "element_count": 50}
}

// TelemetryHandler handles incoming telemetry data.
type TelemetryHandler struct {
	InfluxWriteAPI api.WriteAPIBlocking
}

// NewTelemetryHandler creates a new handler with a configured InfluxDB client.
func NewTelemetryHandler(influxURL, influxToken, influxOrg, influxBucket string) *TelemetryHandler {
	client := influxdb2.NewClient(influxURL, influxToken)
	writeAPI := client.WriteAPIBlocking(influxOrg, influxBucket)
	return &TelemetryHandler{InfluxWriteAPI: writeAPI}
}

func (h *TelemetryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is accepted", http.StatusMethodNotAllowed)
		return
	}

	// In a real application, you'd get tenantId from the JWT token or session.
	// Hardcoding for demonstration purposes.
	tenantID := r.Header.Get("X-Tenant-ID")
	if tenantID == "" {
		http.Error(w, "Missing X-Tenant-ID header", http.StatusBadRequest)
		return
	}

	var payload FrontendTelemetryPayload
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		log.Printf("ERROR: Failed to decode telemetry payload: %v", err)
		http.Error(w, "Invalid payload", http.StatusBadRequest)
		return
	}

	// Data enrichment happens here. We add the tenant_id from a trusted source (server-side).
	tags := map[string]string{
		"tenant_id":  tenantID,
		"session_id": payload.SessionID,
		"view_name":  payload.ViewName,
		"action_name": payload.ActionName,
	}

	fields := make(map[string]interface{}, len(payload.Metrics))
	for k, v := range payload.Metrics {
		fields[k] = v
	}
	
	if len(fields) == 0 {
		log.Println("WARN: Received telemetry payload with no metrics")
		w.WriteHeader(http.StatusNoContent)
		return
	}

	point := influxdb2.NewPoint("frontend_performance", tags, fields, time.Now())

	// Write the point to InfluxDB.
	// In a high-throughput system, you would use the non-blocking WriteAPI and batch points.
	err := h.InfluxWriteAPI.WritePoint(context.Background(), point)
	if err != nil {
		log.Printf("ERROR: Failed to write telemetry point to InfluxDB: %v", err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}

	log.Printf("INFO: Successfully logged telemetry for tenant %s, action %s", tenantID, payload.ActionName)
	w.WriteHeader(http.StatusAccepted)
}

func main() {
	// Configuration should come from environment variables in a 12-factor app.
	influxURL := os.Getenv("INFLUXDB_URL")
	influxToken := os.Getenv("INFLUXDB_TOKEN")
	influxOrg := os.Getenv("INFLUXDB_ORG")
	influxBucket := "app_observability"

	if influxURL == "" || influxToken == "" || influxOrg == "" {
		log.Fatal("FATAL: InfluxDB configuration environment variables not set.")
	}

	telemetryHandler := NewTelemetryHandler(influxURL, influxToken, influxOrg, influxBucket)

	mux := http.NewServeMux()
	mux.Handle("/v1/telemetry", telemetryHandler)
	// ... other business logic handlers
	
	log.Println("INFO: Starting backend server on :8080...")
	if err := http.ListenAndServe(":8080", mux); err != nil {
		log.Fatalf("FATAL: Could not start server: %v", err)
	}
}

4. 前端性能埋点(TypeScript & MobX)

前端的挑战在于如何以非侵入性、可维护的方式捕获性能指标。MobX的reaction是实现这一点的理想工具。我们可以在状态变化时触发计时器,并在下一次相关状态变化时停止计时。

// src/services/telemetry.ts

// A simple, robust beacon sender. In a real app, this might use 'navigator.sendBeacon'
// for more reliability on page exit, and include batching and retry logic.
async function sendTelemetry(payload: object) {
  try {
    // We assume the tenant ID is available globally or passed down.
    const tenantId = window.AppConfig.tenantId;

    await fetch('/v1/telemetry', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-Tenant-ID': tenantId,
      },
      body: JSON.stringify(payload),
      keepalive: true, // Important for reliability
    });
  } catch (error) {
    console.error('Failed to send telemetry data:', error);
  }
}

// A more specific performance tracking utility.
export function trackPerformance(
    sessionId: string,
    viewName: string,
    actionName: string,
    metrics: Record<string, number>
) {
    const payload = {
        sessionId,
        viewName,
        actionName,
        metrics,
    };
    sendTelemetry(payload);
}

现在,在一个使用MobX的React组件中集成它:

// src/components/UserList.tsx

import React, { useEffect } from 'react';
import { observer } from 'mobx-react-lite';
import { reaction } from 'mobx';
import { userListStore } from '../stores/UserListStore';
import { trackPerformance } from '../services/telemetry';
import { sessionStore } from '../stores/SessionStore';

const UserList = observer(() => {
  useEffect(() => {
    // This is the core of our MobX-based performance tracking.
    // We set up a reaction that triggers when the loading state changes.
    const disposeReaction = reaction(
      () => userListStore.isLoading,
      (isLoading, previousIsLoading) => {
        if (isLoading && !previousIsLoading) {
          // Transitioned from not-loading to loading: start the timer.
          userListStore.setLoadStartTime(Date.now());
        } else if (!isLoading && previousIsLoading) {
          // Transitioned from loading to not-loading: stop the timer and send data.
          const startTime = userListStore.loadStartTime;
          if (startTime) {
            const durationMs = Date.now() - startTime;
            trackPerformance(
              sessionStore.currentSessionId,
              'UserListView',
              'fetchUsers',
              {
                duration_ms: durationMs,
                element_count: userListStore.users.length,
              }
            );
          }
        }
      },
      { fireImmediately: false } // Don't run on initial setup
    );

    // Initial data fetch
    userListStore.fetchUsers();

    // Clean up the reaction when the component unmounts.
    return () => {
      disposeReaction();
    };
  }, []);

  if (userListStore.isLoading) {
    return <div>Loading users...</div>;
  }
  
  return (
    <ul>
      {userListStore.users.map(user => <li key={user.id}>{user.name}</li>)}
    </ul>
  );
});

export default UserList;

在这个例子中,userListStore是一个MobX store,包含isLoading(一个@observable布尔值)和fetchUsers(一个@action)。reaction精确地捕获了数据加载周期的开始和结束,然后调用trackPerformance将数据发送到后端。

架构的局限性与未来展望

此套自定义管道虽然精准解决了高基数和数据关联的核心问题,但也存在其局限性。首先,它并非一个完整的APM(应用性能监控)解决方案,缺乏对分布式追踪的开箱即用支持。虽然可以通过在遥测数据中手动传递和记录trace_id来部分实现,但这需要跨前后端进行严格的协议约定和实现。

其次,前端的埋点目前是手动进行的。对于大型应用,这可能导致覆盖不全或维护成本高。未来的一个优化方向是开发更高阶的封装,例如React Hooks或高阶组件(HOC),来自动化某些通用场景(如API调用、路由切换)的性能埋点。

最后,数据的价值在于被消费。虽然InfluxDB的UI提供了强大的数据探索和仪表盘功能,但要实现智能告警和异常检测,还需要进一步的工作。例如,可以利用InfluxDB的Task引擎定期运行Flux脚本,检查每个租户的P99延迟是否超过其历史基线,并将异常推送至告警系统。这可以从一个被动的数据收集管道,演进为一个主动的、能驱动运维决策的智能可观测性平台。


  目录