Building End-to-End WebSocket Tracing Across Swift, Java, and TypeScript


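Our real-time dashboard system had become a debugging black hole. A command leaves the user's iOS device, is pushed over WebSocket to the Java backend, and is then fanned out to the monitoring panel on the web. When a command vanished without a trace, troubleshooting was a disaster. Was it network jitter on the Swift client, connection management in the WebSocket gateway, or a business-logic exception in the Java service? All we could do was guess and practice "grep archaeology" across logs scattered everywhere, which was painfully slow. Tracing for RESTful APIs is a mature, well-understood problem, but in the long-lived-connection world of WebSocket we lacked a "scalpel" that could dissect an entire message flow.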

Initial idea: put a tracing "envelope" around every WebSocket message

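HTTP has headers, the natural carrier for distributed tracing: the W3C Trace Context fields traceparent and tracestate travel between services as headers. WebSocket is different. Headers exist only on the initial HTTP upgrade handshake; once the connection is established, its bidirectional data frames are essentially a bare stream with no per-message metadata area.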
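For reference, a traceparent value consists of four dash-separated lowercase hex fields: a 2-digit version, a 32-digit trace-id, a 16-digit parent-id, and 2-digit trace-flags. The minimal TypeScript sketch below parses and validates that format. TraceParent, parseTraceParent and formatTraceParent are illustrative names, not part of any OpenTelemetry API, and the sketch deliberately accepts only version 00.

```typescript
// Illustrative parser for the W3C traceparent field (version 00 only).
interface TraceParent {
  version: string;
  traceId: string;  // 32 lowercase hex chars (16 bytes)
  parentId: string; // 16 lowercase hex chars (8 bytes)
  flags: number;    // bit 0 = sampled
}

const TRACEPARENT_RE = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceParent(value: string): TraceParent | null {
  const m = TRACEPARENT_RE.exec(value);
  if (!m) return null;
  const [, version, traceId, parentId, flagsHex] = m;
  // This sketch only understands version 00
  if (version !== "00") return null;
  // All-zero trace-id or parent-id values are invalid per the spec
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null;
  return { version, traceId, parentId, flags: parseInt(flagsHex, 16) };
}

function formatTraceParent(tp: TraceParent): string {
  const flags = tp.flags.toString(16).padStart(2, "0");
  return `${tp.version}-${tp.traceId}-${tp.parentId}-${flags}`;
}

const parsed = parseTraceParent("00-0a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d-0123456789abcdef-01");
console.log(parsed?.traceId, parsed ? (parsed.flags & 1) === 1 : false);
```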

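The only way out is to design a protocol at the application layer: package the trace context together with the business data, like an envelope. The business data is the letter; the trace context is the postal information written on the envelope.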

This "envelope" protocol must be simple, extensible, and minimally intrusive to business logic. JSON is a reasonable choice.

{
  "metadata": {
    "traceparent": "00-0a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d-0123456789abcdef-01",
    "tracestate": "vendor=value"
  },
  "payload": {
    "command": "UPDATE_ENTITY",
    "entityId": "xyz-123",
    "data": { "status": "processing" }
  }
}

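The metadata field carries all non-business data, with traceparent at its core; payload wraps the actual business data and nothing else. Business developers can then handle data almost without noticing the tracing layer, while the tracing logic is handled uniformly by the underlying network layer or message handler. For the tooling, OpenTelemetry, as the industry standard, is the natural choice for connecting our heterogeneous stack (Swift, Java, TypeScript).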
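To make the contract explicit, the envelope can be expressed as TypeScript types with a small encode/decode pair. This is a sketch under our own assumptions: Envelope, encodeEnvelope and decodeEnvelope are illustrative names, a frame without a payload field is rejected, and a frame without a usable metadata object is treated as carrying no trace context.

```typescript
// Illustrative types for the message envelope described above.
type TraceMetadata = Record<string, string>; // e.g. { traceparent, tracestate }

interface Envelope<P> {
  metadata: TraceMetadata;
  payload: P;
}

function encodeEnvelope<P>(payload: P, metadata: TraceMetadata): string {
  const envelope: Envelope<P> = { metadata, payload };
  return JSON.stringify(envelope);
}

// Parses an incoming frame. Non-string metadata values are dropped so that
// downstream propagators only ever see string key/value pairs.
function decodeEnvelope(raw: string): Envelope<unknown> {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== "object" || parsed === null || !("payload" in parsed)) {
    throw new Error("Not an envelope: missing payload");
  }
  const obj = parsed as { metadata?: unknown; payload: unknown };
  const metadata: TraceMetadata = {};
  if (typeof obj.metadata === "object" && obj.metadata !== null) {
    for (const [key, value] of Object.entries(obj.metadata)) {
      if (typeof value === "string") metadata[key] = value;
    }
  }
  return { metadata, payload: obj.payload };
}

// Usage: round-trip the example envelope from above.
const wire = encodeEnvelope(
  { command: "UPDATE_ENTITY", entityId: "xyz-123", data: { status: "processing" } },
  { traceparent: "00-0a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d-0123456789abcdef-01" },
);
const back = decodeEnvelope(wire);
console.log(back.metadata.traceparent);
```

Keeping metadata as a flat string-to-string map is what lets each runtime plug it straight into its propagator's carrier.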

Java backend: decoupling tracing logic in the WebSocket handler

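The backend is the hub of the whole system. We use Spring Boot with the spring-boot-starter-websocket module. The key question is how to extract and inject the trace context automatically without polluting the business @Service layer.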

First, add the OpenTelemetry and Spring dependencies.

<!-- pom.xml -->
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-api</artifactId>
    <version>1.31.0</version>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-sdk</artifactId>
    <version>1.31.0</version>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-otlp</artifactId>
    <version>1.31.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

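Next comes the core of the OpenTelemetry SDK configuration. A common mistake is forgetting to configure ContextPropagators: the SDK's default propagator is a no-op, so inject and extract silently do nothing and the trace breaks at every process boundary (in our case, at every message envelope).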

// src/main/java/com/example/config/OpenTelemetryConfig.java
package com.example.config;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Configuration
public class OpenTelemetryConfig {

    private static final String SERVICE_NAME = "websocket-java-backend";
    // Standard "service.name" resource key, declared inline to avoid an extra semconv dependency
    private static final AttributeKey<String> SERVICE_NAME_KEY = AttributeKey.stringKey("service.name");

    @Bean
    public OpenTelemetry openTelemetry() {
        Resource resource = Resource.getDefault()
                .merge(Resource.create(Attributes.of(SERVICE_NAME_KEY, SERVICE_NAME)));

        SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
                .addSpanProcessor(BatchSpanProcessor.builder(
                        OtlpGrpcSpanExporter.builder()
                                .setEndpoint("http://localhost:4317") // OTel Collector, OTLP/gRPC port
                                .setTimeout(2, TimeUnit.SECONDS)
                                .build())
                        .build())
                .setResource(resource)
                .build();

        return OpenTelemetrySdk.builder()
                .setTracerProvider(sdkTracerProvider)
                .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
                .buildAndRegisterGlobal();
    }

    @Bean
    public Tracer tracer(OpenTelemetry openTelemetry) {
        return openTelemetry.getTracer(SERVICE_NAME, "1.0.0");
    }
}

The core implementation lives in a custom TextWebSocketHandler. We override handleTextMessage to extract the trace context and start a new span there.

// src/main/java/com/example/websocket/TracingWebSocketHandler.java
package com.example.websocket;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Component
public class TracingWebSocketHandler extends TextWebSocketHandler {

    private static final Logger logger = LoggerFactory.getLogger(TracingWebSocketHandler.class);
    private final Tracer tracer;
    private final OpenTelemetry openTelemetry;
    private final ObjectMapper objectMapper;
    private final BusinessLogicService businessLogicService; // injected business-logic service (application code, not shown)

    public TracingWebSocketHandler(Tracer tracer, OpenTelemetry openTelemetry, BusinessLogicService businessLogicService) {
        this.tracer = tracer;
        this.openTelemetry = openTelemetry;
        this.objectMapper = new ObjectMapper();
        this.businessLogicService = businessLogicService;
    }

    // TextMapGetter implementation: tells OpenTelemetry how to read traceparent from our custom envelope
    private static final TextMapGetter<JsonNode> getter = new TextMapGetter<>() {
        @Override
        public Iterable<String> keys(JsonNode carrier) {
            if (carrier != null && carrier.has("metadata")) {
                return () -> carrier.get("metadata").fieldNames();
            }
            return Collections.emptyList();
        }

        @Override
        public String get(JsonNode carrier, String key) {
            if (carrier != null && carrier.has("metadata") && carrier.get("metadata").has(key)) {
                return carrier.get("metadata").get(key).asText();
            }
            return null;
        }
    };
    
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        JsonNode rootNode = objectMapper.readTree(message.getPayload());

        // 1. Extract the trace context from the message envelope
        Context extractedContext = openTelemetry.getPropagators().getTextMapPropagator()
                .extract(Context.current(), rootNode, getter);

        // 2. Start a new span around message processing, parented to the extracted context
        Span span = tracer.spanBuilder("websocket.message.process")
                .setParent(extractedContext)
                .setSpanKind(SpanKind.SERVER)
                .startSpan();

        // 3. Make the new span current so downstream business logic (and trace-aware logging) runs inside it
        try (Scope scope = span.makeCurrent()) {
            span.setAttribute("websocket.session.id", session.getId());
            // path() returns a MissingNode instead of null, so a frame without payload cannot cause an NPE
            JsonNode payload = rootNode.path("payload");
            String command = payload.path("command").asText("unknown");
            span.setAttribute("websocket.payload.command", command);

            logger.info("Processing command: {}", command);

            // 4. Run the business logic
            String responsePayload = businessLogicService.handleCommand(payload);

            // 5. Build the response and inject the current trace context into it
            sendResponse(session, responsePayload, Context.current());

        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, "Error processing WebSocket message");
            span.recordException(e);
            logger.error("Error handling message for session {}", session.getId(), e);
            throw e; // rethrow and let Spring handle it
        } finally {
            span.end(); // always end the span
        }
    }

    private void sendResponse(WebSocketSession session, String payload, Context context) throws IOException {
        Map<String, String> metadata = new HashMap<>();
        
        // Setter: tells OpenTelemetry how to write traceparent into the metadata map
        openTelemetry.getPropagators().getTextMapPropagator().inject(context, metadata, (carrier, key, value) -> carrier.put(key, value));
        
        ObjectNode responseNode = objectMapper.createObjectNode();
        responseNode.set("metadata", objectMapper.valueToTree(metadata));
        responseNode.put("payload", payload);
        
        if (session.isOpen()) {
            session.sendMessage(new TextMessage(responseNode.toString()));
        }
    }
}

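The key piece is the TextMapGetter implementation, which teaches OpenTelemetry how to read traceparent out of our JSON metadata. The processing logic is wrapped in a try-with-resources block so the Scope is always closed, the key practice for avoiding context leaks.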
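The same getter idea carries over to the other runtimes. The sketch below mirrors, in plain TypeScript, the keys/get contract that the Java handler implements. The MetadataGetter interface is defined locally (its shape loosely follows the TextMapGetter interface in @opentelemetry/api, but nothing here imports that package), and readTraceContext is a hypothetical helper showing which fields a W3C propagator would pull out of a decoded frame.

```typescript
// Local mirror of the keys/get contract of a text-map getter.
interface MetadataGetter<C> {
  keys(carrier: C): string[];
  get(carrier: C, key: string): string | undefined;
}

// The carrier is the parsed JSON frame; tracing fields live under "metadata".
type Frame = { metadata?: Record<string, unknown>; payload?: unknown };

const envelopeGetter: MetadataGetter<Frame> = {
  keys(carrier) {
    return carrier.metadata ? Object.keys(carrier.metadata) : [];
  },
  get(carrier, key) {
    const value = carrier.metadata?.[key];
    // Only string values are returned; a missing field yields undefined
    // instead of throwing, like the Java getter returning null.
    return typeof value === "string" ? value : undefined;
  },
};

// Hypothetical helper: collect the W3C fields a propagator would read.
function readTraceContext<C>(carrier: C, getter: MetadataGetter<C>) {
  return {
    traceparent: getter.get(carrier, "traceparent"),
    tracestate: getter.get(carrier, "tracestate"),
  };
}

const frame: Frame = JSON.parse(
  '{"metadata":{"traceparent":"00-0a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d-0123456789abcdef-01"},"payload":{}}',
);
const ctx = readTraceContext(frame, envelopeGetter);
console.log(ctx.traceparent, ctx.tracestate);
```

One design consequence: unlike HTTP header names, JSON keys are case-sensitive, so every client must write the key exactly as lowercase traceparent.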

sequenceDiagram
    participant Client
    participant WebSocketHandler
    participant BusinessLogic
    
    Client->>WebSocketHandler: Send Message (with traceparent)
    activate WebSocketHandler
    
    WebSocketHandler->>WebSocketHandler: Extract Parent Context
    WebSocketHandler->>WebSocketHandler: Start New Span (child of parent)
    
    Note right of WebSocketHandler: try (Scope scope = span.makeCurrent())
    
    WebSocketHandler->>BusinessLogic: execute(payload)
    activate BusinessLogic
    
    BusinessLogic-->>WebSocketHandler: return result
    deactivate BusinessLogic
    
    WebSocketHandler->>WebSocketHandler: Inject Current Context into Response
    WebSocketHandler-->>Client: Send Response (with new traceparent)
    
    Note right of WebSocketHandler: span.end()
    
    deactivate WebSocketHandler

Swift client: wrapping the trace context into the envelope before sending

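On iOS we use URLSessionWebSocketTask to manage the WebSocket connection. We need a service class that encapsulates connecting, sending, and receiving, and that injects tracing information automatically on every send.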

First, configure the OpenTelemetry SDK for Swift.

// AppDelegate.swift or a dedicated TelemetryManager.swift
import OpenTelemetryApi
import OpenTelemetrySdk
import OpenTelemetryOtlpExporter

class TelemetryManager {
    static let shared = TelemetryManager()
    
    private init() {
        let resource = Resource(attributes: [
            "service.name": AttributeValue.string("swift-ios-client")
        ])
        
        let exporter = OtlpHttpTraceExporter(endpoint: URL(string: "http://localhost:4318/v1/traces")!)
        
        let processor = BatchSpanProcessor(spanExporter: exporter)
        
        OpenTelemetry.registerTracerProvider(
            tracerProvider: TracerProviderBuilder()
                .with(resource: resource)
                .add(spanProcessor: processor)
                .build()
        )
    }
    
    func getTracer() -> Tracer {
        return OpenTelemetry.instance.tracerProvider.get(instrumentationName: "com.example.websocket", instrumentationVersion: "1.0.0")
    }
}

Next is WebSocketService, the core of client-side tracing.

// WebSocketService.swift
import Foundation
import OpenTelemetryApi

// W3CTraceContextPropagator (OpenTelemetryApi) serializes a SpanContext into
// traceparent/tracestate; DictionarySetter below adapts it to a [String: String] carrier.


final class WebSocketService: NSObject, URLSessionWebSocketDelegate {
    private var webSocketTask: URLSessionWebSocketTask?
    private let tracer: Tracer
    
    override init() {
        self.tracer = TelemetryManager.shared.getTracer()
        super.init()
    }
    
    func connect(url: URL) {
        let session = URLSession(configuration: .default, delegate: self, delegateQueue: OperationQueue())
        webSocketTask = session.webSocketTask(with: url)
        webSocketTask?.resume()
        receive()
    }

    func send(command: String, data: [String: Any]) {
        // 1. Create a new span that wraps this send operation
        let span = tracer.spanBuilder(spanName: "websocket.message.send").setSpanKind(spanKind: .client).startSpan()
        
        // 2. Make this span the active context while the envelope is built
        OpenTelemetry.instance.contextProvider.withActiveSpan(span) {
            do {
                var metadata = [String: String]()
                
                // 3. Inject the W3C trace context (traceparent/tracestate) into the metadata dictionary.
                // For simplicity only the span context is propagated; a production client
                // should propagate the full context (including baggage).
                let propagator = W3CTraceContextPropagator()
                propagator.inject(spanContext: span.context, carrier: &metadata, setter: DictionarySetter())

                span.setAttribute(key: "websocket.payload.command", value: command)
                
                let payload: [String: Any] = [
                    "command": command,
                    "data": data
                ]
                
                let messageEnvelope: [String: Any] = [
                    "metadata": metadata,
                    "payload": payload
                ]
                
                // JSONSerialization raises an Objective-C exception (not a Swift error) for
                // values it cannot encode, so validate up front instead of relying on `catch`
                guard JSONSerialization.isValidJSONObject(messageEnvelope) else {
                    span.setStatus(status: .error(description: "Payload is not JSON-serializable"))
                    span.end()
                    return
                }
                
                let jsonData = try JSONSerialization.data(withJSONObject: messageEnvelope, options: [])
                guard let jsonString = String(data: jsonData, encoding: .utf8) else {
                    span.setStatus(status: .error(description: "Failed to create JSON string"))
                    span.end()
                    return
                }
                
                // Without a task the completion handler would never run and the span would dangle
                guard let task = webSocketTask else {
                    span.setStatus(status: .error(description: "WebSocket not connected"))
                    span.end()
                    return
                }
                
                task.send(.string(jsonString)) { error in
                    if let error = error {
                        span.record(error: error)
                        span.setStatus(status: .error(description: "WebSocket send error"))
                        print("WebSocket send error: \(error)")
                    } else {
                        span.setStatus(status: .ok)
                    }
                    // 4. End the span once the send completes (or fails)
                    span.end()
                }
            } catch {
                span.record(error: error)
                span.setStatus(status: .error(description: "JSON serialization failed"))
                span.end()
                print("Error serializing JSON: \(error)")
            }
        }
    }
    
    private func receive() {
        webSocketTask?.receive { [weak self] result in
            switch result {
            case .failure(let error):
                print("Failed to receive message: \(error)")
            case .success(let message):
                switch message {
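                case .string(let text):
                    print("Received string: \(text)")
                    // The response's traceparent can be parsed here to trace follow-up UI work
                case .data(let data):
                    print("Received data: \(data)")
                @unknown default:
                    // Future message kinds: log instead of crashing the app
                    print("Received unknown WebSocket message type")
                }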
                self?.receive() // 循环接收
            }
        }
    }
    
    // URLSession delegate method for connection status
    func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) {
        print("WebSocket connection established")
    }
    
    func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
        print("WebSocket connection closed with error: \(error?.localizedDescription ?? "No error")")
    }
}

// Lets OpenTelemetry inject the context into a [String: String] dictionary
public struct DictionarySetter: Setter {
    public func set(carrier: inout [String: String], key: String, value: String) {
        carrier[key] = value
    }
}

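On the Swift side, the easiest mistake to make is failing to end the span correctly in the asynchronous callback. The completion handler of webSocketTask?.send runs asynchronously, so you must make sure span.end() is called on success and on failure alike. Otherwise you get dangling spans that are never exported, leaving gaps in the trace.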
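The same rule applies to any callback-based API. A defensive pattern is to route every exit path through one completion function that ends the span at most once. The sketch below is plain TypeScript: EndableSpan is a minimal hypothetical interface (not the Swift or OpenTelemetry Span type), and the send functions passed in are simulated stand-ins for a real socket.

```typescript
// Minimal stand-in for a span: only what this pattern needs (hypothetical).
interface EndableSpan {
  setError(message: string): void;
  end(): void;
}

type SendCallback = (error?: Error) => void;
type CallbackSend = (data: string, done: SendCallback) => void;

// Wraps a callback-style send so the span is ended exactly once: on success,
// on a callback error, or when send() throws before the callback runs.
function sendWithSpan(send: CallbackSend, data: string, span: EndableSpan): void {
  let ended = false;
  const finish = (error?: Error) => {
    if (ended) return; // guard against callbacks that fire more than once
    ended = true;
    if (error) span.setError(error.message);
    span.end();
  };
  try {
    send(data, finish);
  } catch (e) {
    finish(e instanceof Error ? e : new Error(String(e)));
  }
}

// Recording span used to observe the behaviour.
class RecordingSpan implements EndableSpan {
  endCount = 0;
  error: string | null = null;
  setError(message: string): void { this.error = message; }
  end(): void { this.endCount += 1; }
}

// A misbehaving transport that invokes its callback twice.
const okSpan = new RecordingSpan();
sendWithSpan((_data, done) => { done(); done(); }, "hello", okSpan);

// A transport that throws synchronously, so the callback never runs.
const failSpan = new RecordingSpan();
sendWithSpan(() => { throw new Error("socket closed"); }, "hello", failSpan);

console.log(okSpan.endCount, failSpan.endCount, failSpan.error);
```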

TypeScript frontend: a consistent tracing experience for the web app

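The web frontend is written in TypeScript and integrates the OpenTelemetry Web SDK. Its auto-instrumentations trace page loads and XHR/Fetch requests automatically, but WebSocket again needs manual instrumentation.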

First, initialize the web tracer.

// src/telemetry.ts
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { W3CTraceContextPropagator } from '@opentelemetry/core';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
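import { getWebAutoInstrumentations } from '@opentelemetry/auto-instrumentations-web';
import { Resource } from '@opentelemetry/resources';

// In SDK 1.x the resource must be a Resource instance, not a plain object
const provider = new WebTracerProvider({
  resource: new Resource({
    'service.name': 'typescript-web-client',
  }),
});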

provider.addSpanProcessor(new BatchSpanProcessor(new OTLPTraceExporter({
  url: 'http://localhost:4318/v1/traces',
})));

provider.register({
  contextManager: new ZoneContextManager(),
  propagator: new W3CTraceContextPropagator(),
});

registerInstrumentations({
  instrumentations: [
    getWebAutoInstrumentations(),
  ],
});

export const tracer = provider.getTracer('com.example.websocket', '1.0.0');

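ZoneContextManager is essential here: it uses zone.js so that the trace context propagates correctly across the browser's asynchronous events (such as setTimeout and Promise callbacks). Note that zone.js cannot intercept native async/await, so for the context to survive an await the code must be compiled to ES2015 or lower.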

Then comes the WebSocketManager implementation.

// src/WebSocketManager.ts
import { tracer } from './telemetry';
import { trace, context, propagation, SpanKind, SpanStatusCode } from '@opentelemetry/api';

class WebSocketManager {
  private socket: WebSocket | null = null;

  public connect(url: string): void {
    this.socket = new WebSocket(url);

    this.socket.onopen = () => {
      console.log('WebSocket connection established.');
    };

    this.socket.onmessage = (event) => {
      console.log('Received message:', event.data);
      // The response's traceparent can be parsed here as well
    };

    this.socket.onclose = () => {
      console.log('WebSocket connection closed.');
    };

    this.socket.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }

  public send(command: string, data: object): void {
    if (!this.socket || this.socket.readyState !== WebSocket.OPEN) {
      console.error('WebSocket is not connected.');
      return;
    }

    // 1. Create a new span
    const span = tracer.startSpan('websocket.message.send', {
      kind: SpanKind.CLIENT,
    });

    // 2. Make the span active; the callback below runs in this span's context
    context.with(trace.setSpan(context.active(), span), () => {
      try {
        const metadata: Record<string, string> = {};

        // 3. Inject the trace context into the metadata
        propagation.inject(context.active(), metadata);
        
        span.setAttribute('websocket.payload.command', command);

        const messageEnvelope = {
          metadata,
          payload: { command, data },
        };

        const messageString = JSON.stringify(messageEnvelope);
        this.socket!.send(messageString);

        span.setStatus({ code: SpanStatusCode.OK });
      } catch (error) {
        span.recordException(error as Error);
        span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
      } finally {
        // 4. Always end the span
        span.end();
      }
    });
  }
}

export const webSocketManager = new WebSocketManager();

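The TypeScript logic closely mirrors the Swift side. context.with() makes the span active for the duration of the synchronous callback, so propagation.inject picks up the correct current context. Keep in mind that WebSocket.send() only queues the data for transmission, so this span measures serialization and enqueueing, not delivery. A common oversight is forgetting to record the exception and set the span status in the catch block; the failed operation then shows up as a success in the tracing backend.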

Limitations and future iterations

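This application-layer tracing scheme successfully ties our heterogeneous system together. The full lifecycle of any WebSocket message can now be inspected end to end in Jaeger or Zipkin.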

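However, the scheme is not free. First, it adds metadata overhead to every message (the traceparent value alone is 55 characters, before JSON keys and quotes). For high-frequency, small-message workloads this can be a significant fraction of each frame. In that case, consider a more compact binary serialization format such as Protobuf instead of JSON.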
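To put the saving in numbers: the information in a traceparent fits in 26 bytes (1 version byte, 16 trace-id bytes, 8 parent-id bytes, 1 flags byte) versus 55 characters of text. The sketch below packs it into a fixed 26-byte layout using a plain Uint8Array. The layout is our own illustrative choice, not Protobuf and not a standard wire format, but it shows the order of magnitude a binary encoding can recover.

```typescript
// Illustrative fixed layout (not a standard): [version:1][trace-id:16][parent-id:8][flags:1]
const TRACE_CONTEXT_BYTES = 26;
const TRACEPARENT_RE = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

// Writes a lowercase hex string into `out` starting at `offset`, two hex chars per byte.
function writeHex(hex: string, out: Uint8Array, offset: number): void {
  for (let i = 0; i < hex.length; i += 2) {
    out[offset + i / 2] = parseInt(hex.slice(i, i + 2), 16);
  }
}

function readHex(bytes: Uint8Array, start: number, end: number): string {
  return Array.from(bytes.subarray(start, end), (b) => b.toString(16).padStart(2, "0")).join("");
}

function packTraceParent(traceparent: string): Uint8Array {
  const m = TRACEPARENT_RE.exec(traceparent);
  if (!m) throw new Error(`Malformed traceparent: ${traceparent}`);
  const out = new Uint8Array(TRACE_CONTEXT_BYTES);
  writeHex(m[1], out, 0);  // version
  writeHex(m[2], out, 1);  // trace-id
  writeHex(m[3], out, 17); // parent-id
  writeHex(m[4], out, 25); // trace-flags
  return out;
}

function unpackTraceParent(bytes: Uint8Array): string {
  if (bytes.length !== TRACE_CONTEXT_BYTES) throw new Error("Expected 26 bytes");
  return [
    readHex(bytes, 0, 1),
    readHex(bytes, 1, 17),
    readHex(bytes, 17, 25),
    readHex(bytes, 25, 26),
  ].join("-");
}

const text = "00-0a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d-0123456789abcdef-01";
const packed = packTraceParent(text);
console.log(text.length, packed.length); // 55 26
```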

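Second, the current tracing granularity is the individual message. Establishing, closing, and failing the WebSocket connection itself are not fully captured in the trace. A more complete scheme would create a parent span for the entire WebSocket session, with every sent or received message as a child span. That requires deeper instrumentation of the connection lifecycle events on both the client and the server.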
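At the trace-context level, the session-as-parent structure means every message span shares the session span's trace-id and records the session span's id as its parent. A minimal sketch with plain objects follows. SpanRef, startSessionSpan, startMessageSpan and toTraceParent are hypothetical names, IDs come from Math.random purely for illustration (a real SDK uses its own ID generator), and nothing here is an OpenTelemetry API.

```typescript
// Hypothetical minimal span reference: just the IDs that define the tree.
interface SpanRef {
  traceId: string;       // 32 hex chars, shared by every span in the trace
  spanId: string;        // 16 hex chars, unique per span
  parentSpanId?: string; // undefined for the root (session) span
  name: string;
}

// Illustration only: not a suitable ID generator for production.
function randomHex(chars: number): string {
  let s = "";
  while (s.length < chars) s += Math.floor(Math.random() * 16).toString(16);
  return s;
}

// Root span covering the whole WebSocket session.
function startSessionSpan(sessionId: string): SpanRef {
  return { traceId: randomHex(32), spanId: randomHex(16), name: `websocket.session ${sessionId}` };
}

// Each message span becomes a child of the session span.
function startMessageSpan(session: SpanRef, name: string): SpanRef {
  return { traceId: session.traceId, spanId: randomHex(16), parentSpanId: session.spanId, name };
}

// traceparent a message span would place in the envelope metadata (sampled flag set).
function toTraceParent(span: SpanRef): string {
  return `00-${span.traceId}-${span.spanId}-01`;
}

const session = startSessionSpan("abc");
const sent = startMessageSpan(session, "websocket.message.send");
const received = startMessageSpan(session, "websocket.message.receive");
console.log(toTraceParent(sent));
```

One consequence worth weighing: once a received message's parent is the server's session span, the sender's traceparent from the envelope can no longer be its parent as well; OpenTelemetry span links are the usual way to keep that relationship.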

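Finally, the scheme depends on every endpoint following the "envelope" protocol. If a legacy component or third-party service in the system does not, the trace breaks at that point. Keeping the protocol consistent therefore requires architecture-level conventions enforced through code review.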

