Constructing a Full-Stack Observability Pipeline for a Valtio-Driven iOS App with Sanic and Fluentd


The core technical pain point was a complete lack of visibility into the user journey. Our iOS application, built on React Native, uses Valtio for state management. Users would report intermittent failures or extreme latency, and our support teams were flying blind. We had backend logs from our Sanic services and client-side error reports, but they were disjointed silos of information. Correlating a specific user’s sequence of UI interactions—represented by rapid-fire Valtio state changes—with a subsequent backend error or slow API response was impossible. We were debugging with anecdotes instead of data. The goal was to build a system that could trace a single logical operation from the moment a user taps a button on their iPhone, through the corresponding Valtio state mutation, across the network to our Sanic API, and finally visualize the entire distributed trace, including associated logs, in one unified Datadog view.

Our initial concept revolved around creating a high-fidelity, context-aware observability pipeline. Technology selection was driven by this requirement and our existing stack.

  • Sanic: Chosen for its high-throughput, async capabilities, which are essential for our real-time features. Any observability solution had to integrate seamlessly with its event loop without introducing significant blocking I/O.
  • Valtio: Its proxy-based architecture was the linchpin of our client-side strategy. Unlike other state managers, Valtio allows for transparent interception of state mutations. We theorized we could wrap the state proxy to automatically generate trace spans for critical user interactions without littering the UI code with manual instrumentation calls.
  • Fluentd: In a production environment, having our Sanic application send logs directly to the Datadog API introduces unacceptable coupling and risk. A network blip or a rate-limiting issue at the Datadog endpoint could block our web workers or cause log loss. Fluentd serves as a robust, asynchronous, and persistent buffer. It runs as a sidecar or daemon, accepts logs over a local network socket, and handles the complexities of batching, retrying, and forwarding to Datadog. This decouples the application’s lifecycle from the logging backend’s availability.
  • Datadog: We needed a platform that could ingest and correlate three distinct data types: Real User Monitoring (RUM) events from the iOS client, APM traces from the Sanic backend, and structured logs from Fluentd. Datadog’s ability to automatically link these via trace and span IDs was the primary reason for its selection.

The implementation began with the backend, establishing a solid foundation for tracing and structured logging.

1. Sanic Backend Instrumentation and Structured Logging

The first step is to configure the Sanic application to participate in distributed tracing and to emit logs in a structured JSON format that includes tracing identifiers.

We use ddtrace-py for automatic instrumentation and structlog for flexible, structured logging. The key is to configure structlog to automatically inject the active Datadog trace and span IDs into every log record.

# file: sanic_app/app.py
import asyncio
import logging
import sys
from os import getenv

from sanic import Sanic, response
from sanic.log import logger

import structlog
from ddtrace import patch_all, tracer
from ddtrace.contrib.sanic import TracingSanic

# --- Datadog Configuration ---
# In a real project, these would come from environment variables.
# DD_AGENT_HOST, DD_LOGS_INJECTION=true, DD_SERVICE, DD_ENV
patch_all()

# --- Structlog Configuration ---
# This setup is critical for connecting logs to traces in Datadog.
def configure_structlog():
    """
    Configures structlog to output JSON with Datadog trace correlation.
    """
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=logging.INFO,
    )

    # These processors run in order for each log record.
    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        # This is the magic part that adds dd.trace_id and dd.span_id
        structlog.processors.format_exc_info,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]

    structlog.configure(
        processors=processors,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

# Apply the configuration at application startup
configure_structlog()
log = structlog.get_logger()

# Create a Sanic app with Datadog tracing enabled
app = TracingSanic("sanic-datadog-app")

@app.middleware("request")
async def inject_request_id(request):
    """
    Example of binding context to logs for a request's lifetime.
    """
    structlog.contextvars.bind_contextvars(
        http_method=request.method,
        http_path=request.path,
        # Other useful context can be added here
    )

@app.route("/api/user/profile", methods=["POST"])
@tracer.trace(name="user.profile.update", service="user-api", resource="/api/user/profile")
async def update_user_profile(request):
    """
    A simulated endpoint that performs some async work and logs progress.
    """
    user_id = request.json.get("userId")
    if not user_id:
        log.warning("update_user_profile.missing_userid", request_body=request.json)
        return response.json({"error": "userId is required"}, status=400)

    log.info(
        "update_user_profile.start",
        user_id=user_id,
        new_profile_data_keys=list(request.json.keys())
    )

    try:
        # Simulate a slow async database call
        await perform_database_update(user_id)
        # Simulate a call to another microservice
        await notify_analytics_service(user_id)

    except Exception as e:
        log.error("update_user_profile.failed", user_id=user_id, exc_info=True)
        # The tracer will automatically mark the span as an error
        return response.json({"status": "failed", "error": str(e)}, status=500)

    log.info("update_user_profile.success", user_id=user_id)
    return response.json({"status": "success", "userId": user_id})


@tracer.trace(name="db.update_user")
async def perform_database_update(user_id: str):
    """
    Simulates an async I/O operation like a database write.
    Custom tracing gives us visibility into this specific step.
    """
    log.debug("db.update.begin", user_id=user_id, db_table="users")
    await asyncio.sleep(0.15)  # Simulate network latency to DB
    log.info("db.update.complete", user_id=user_id, rows_affected=1)


@tracer.trace(name="analytics.notify")
async def notify_analytics_service(user_id: str):
    """
    Simulates an async HTTP call to another internal service.
    """
    log.debug("analytics.notify.begin", user_id=user_id)
    await asyncio.sleep(0.05)
    log.info("analytics.notify.complete", user_id=user_id)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=False, access_log=False)

The critical detail here is the structlog configuration. By default, ddtrace only patches the standard logging module. structlog needs to be explicitly configured to be aware of the context variables that ddtrace uses. When DD_LOGS_INJECTION=true is enabled, the ddtrace library automatically injects the necessary IDs. Running this application now produces JSON logs that look like this, ready to be shipped by Fluentd:

{
  "log_level": "info",
  "logger_name": "__main__",
  "event": "update_user_profile.start",
  "timestamp": "2023-10-27T10:45:12.123456Z",
  "http_method": "POST",
  "http_path": "/api/user/profile",
  "user_id": "user-abc-123",
  "new_profile_data_keys": ["userId", "displayName"],
  "dd.trace_id": "4825132948192236511",
  "dd.span_id": "7721652484439113925"
}

2. Fluentd Configuration for Log Aggregation and Forwarding

Next, we set up Fluentd to act as the log forwarder. In a containerized environment like Kubernetes, the application container would log to stdout, and Fluentd would run as a DaemonSet, collecting these logs. For this example, we’ll configure Fluentd to listen on a TCP port.

The Sanic application needs a minor change to its logging configuration to forward logs instead of writing to stdout. We’d add a logging.handlers.SocketHandler for production use. For simplicity in this example, we assume container orchestration handles the stdout capture.

Here is the fluent.conf file. It’s designed for robustness.

# file: fluentd/fluent.conf

# -------------------------------------------
# Source: Listen for logs from our Sanic app
# -------------------------------------------
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# -------------------------------------------
# Filter: Parse and process the incoming logs
# -------------------------------------------
<filter sanic.app.**>
  @type parser
  key_name log
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>

# Add kubernetes metadata if running in K8s (highly recommended for production)
# <filter sanic.app.**>
#   @type kubernetes_metadata
# </filter>

# -------------------------------------------
# Output: Send logs to Datadog
# -------------------------------------------
<match sanic.app.**>
  @type datadog
  @id datadog_output

  api_key <YOUR_DATADOG_API_KEY>
  
  # Use HTTPS for security
  use_ssl true
  
  # These tags will be applied to all logs from this source
  dd_source "python"
  dd_tags "env:staging,app:sanic-backend"

  # Service name mapping. Connects logs to the APM service.
  # This MUST match the DD_SERVICE name in the Sanic app.
  <service>
    sanic-datadog-app
  </service>
  
  # --- Buffer Configuration (CRITICAL for production) ---
  # This prevents data loss and stalls in the application.
  <buffer>
    @type file
    path /fluentd/log/buffer/datadog
    
    # Flush logs to Datadog every 5 seconds
    flush_interval 5s
    
    # Try to flush at least once per minute in quiet periods
    flush_at_shutdown true
    
    # Use larger chunks for better network efficiency
    chunk_limit_size 8m
    
    # Keep up to 512MB of logs buffered on disk if Datadog is down
    total_limit_size 512m
    
    # Retry logic for network failures
    retry_type exponential_backoff
    retry_wait 1s
    retry_max_interval 60s
    retry_timeout 12h
  </buffer>
</match>

# For debugging: output matched logs to stdout as well
<match **>
 @type stdout
</match>

This configuration is production-grade. The <buffer> section is non-negotiable for a real-world project. It ensures that if the Fluentd instance cannot reach the Datadog API, logs are spooled to disk and retried later, preventing data loss and backpressure on the Sanic application.

3. iOS Client Instrumentation with Valtio and Datadog RUM

This is where the observability chain originates. The core idea is to create a utility function that wraps our Valtio state proxy. This wrapper will use subscribe from valtio/utils to listen for changes and automatically generate Datadog RUM actions.

First, set up the Datadog RUM SDK in a React Native project.

// file: ios_app/services/datadog.ts
import { DatadogRum, RumFetch } from '@datadog/mobile-react-native';

const DATADOG_CONFIG = {
  clientToken: '<YOUR_RUM_CLIENT_TOKEN>',
  env: 'staging',
  applicationId: '<YOUR_RUM_APPLICATION_ID>',
  trackInteractions: true,
  trackResources: true,
  trackErrors: true,
  // This is required to link RUM sessions with APM traces
  firstPartyHosts: ['your-api-backend.com'],
};

export const initializeDatadog = async () => {
  await DatadogRum.initialize(DATADOG_CONFIG);
  
  // Start tracking user sessions
  DatadogRum.startView('main', 'MainApplicationView');
};

// Global fetch instrumentation
const { fetch: instrumentedFetch } = RumFetch.get(global);
global.fetch = instrumentedFetch;

Now for the Valtio wrapper. This is the custom piece of our solution.

// file: ios_app/state/tracedState.ts
import { proxy, ref } from 'valtio';
import { subscribeKey } from 'valtio/utils';
import { DatadogRum } from '@datadog/mobile-react-native';

type ActionContext = Record<string, string | number | boolean>;

interface TracedStateOptions<T extends object> {
  // Name for the state slice, used in action names
  name: string;
  // Function to derive context from state for richer actions
  getContext?: (state: T) => ActionContext;
}

/**
 * Creates a Valtio proxy that automatically creates Datadog RUM actions on mutations.
 * @param initialState The initial state object.
 * @param options Configuration for tracing.
 * @returns A Valtio proxy with subscribed tracing.
 */
export function createTracedState<T extends object>(
  initialState: T,
  options: TracedStateOptions<T>
) {
  const state = proxy<T>(initialState);
  const stateName = options.name;

  // The pitfall here is subscribing to the root. For deeply nested objects,
  // this can be noisy. Subscribing to specific keys is more practical.
  for (const key in initialState) {
    if (Object.prototype.hasOwnProperty.call(initialState, key)) {
      // subscribeKey is more efficient than a global subscribe
      subscribeKey(state, key as keyof T, (value) => {
        const actionName = `${stateName}.${key as string}.Update`;
        console.log(`[TRACED STATE] Firing action: ${actionName}`, { value });

        let context: ActionContext = {};
        if (options.getContext) {
          context = options.getContext(state);
        }
        
        // Add the new value to the context if it's a primitive
        if (typeof value !== 'object' && value !== null) {
          context.newValue = value;
        }

        DatadogRum.addAction('Tap', actionName, context, Date.now());
      });
    }
  }

  return state;
}

// --- Example Usage ---

interface UserProfile {
  id: string | null;
  displayName: string;
  isLoading: boolean;
  // ref() is used to mark objects that should not be proxied, like API clients
  apiClient: any; 
}

// The API client that will make the call to our Sanic backend
const apiClient = {
  updateProfile: async (id: string, name: string) => {
    // global.fetch is instrumented by Datadog RUM.
    // It will automatically add x-datadog-* headers to this request.
    const response = await fetch('http://<YOUR_SANIC_API_IP>:8000/api/user/profile', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ userId: id, displayName: name }),
    });
    if (!response.ok) {
      throw new Error('Failed to update profile');
    }
    return response.json();
  },
};


const userProfileInitialState: UserProfile = {
  id: 'user-abc-123',
  displayName: 'Jane Doe',
  isLoading: false,
  apiClient: ref(apiClient), // Use ref() for non-state objects
};

export const userProfileState = createTracedState(userProfileInitialState, {
  name: 'UserProfile',
  getContext: (state) => ({
    userId: state.id ?? 'unknown',
  }),
});

// This is the function our React component would call
export async function updateDisplayName(newName: string) {
  if (!userProfileState.id) return;
  
  // This mutation is NOT traced directly because it's a precursor.
  userProfileState.isLoading = true;

  try {
    // The API call will be part of the active trace initiated by the next state change
    await userProfileState.apiClient.updateProfile(userProfileState.id, newName);

    // THIS mutation is the one that gets traced as the primary user action
    userProfileState.displayName = newName;

  } catch (error) {
    DatadogRum.addError(error as Error, 'UpdateDisplayName');
  } finally {
    userProfileState.isLoading = false;
  }
}

4. The Complete Flow in Action

When a user types a new name in a text field and presses “Save” in the UI, the updateDisplayName('John Doe') function is called.

  1. Client-Side (iOS):
    • userProfileState.isLoading is set to true. Our current wrapper implementation will fire a UserProfile.isLoading.Update action. A more refined version might filter out such noisy, transient state changes.
    • apiClient.updateProfile is called. The instrumented fetch function from Datadog RUM injects headers like x-datadog-trace-id and x-datadog-parent-id into the outgoing HTTP POST request to /api/user/profile.
    • The request is sent to the Sanic backend.
  2. Backend (Sanic):
    • The Sanic app receives the request. The ddtrace middleware detects the incoming trace headers and continues the distributed trace instead of starting a new one. The trace_id is identical to the one from the client.
    • The update_user_profile endpoint runs. Its logs, such as "update_user_profile.start", are generated by structlog. Because this code is executing within the trace context, structlog automatically enriches the JSON log with dd.trace_id and dd.span_id.
    • The logs are sent to Fluentd.
  3. Log Pipeline (Fluentd):
    • Fluentd receives the JSON log, buffers it, and forwards it to the Datadog Logs API endpoint.
  4. Client-Side (iOS again):
    • The API call succeeds. The line userProfileState.displayName = newName; is executed.
    • Our createTracedState wrapper’s subscribeKey callback fires.
    • It creates a new Datadog RUM action named "UserProfile.displayName.Update" with the context { userId: 'user-abc-123', newValue: 'John Doe' }. This action is now part of the same RUM session and is linked to the APM trace via the SDK’s internal context.

The result in Datadog is a complete, unified view. You can open the RUM session for the user, see the UserProfile.displayName.Update action, and from there, seamlessly click through to the backend APM trace. Within that trace, you can see the breakdown of time spent—user.profile.update, db.update_user, analytics.notify—and, critically, the correlated log entries for that specific request will appear in the “Logs” tab of the trace view. The debugging mystery is solved.

This architecture, while effective, has its own set of limitations and requires careful management in a production setting. The createTracedState wrapper is a powerful but blunt instrument; instrumenting every key on a large state object can generate a high volume of RUM actions, leading to increased costs and noise. A more advanced implementation would require a denylist or allowlist to specify which state mutations are significant enough to warrant a trace. Furthermore, handling complex asynchronous flows where state is updated in response to events like WebSocket messages requires more sophisticated context propagation on the client to ensure the correct RUM action is associated with the originating event. The next iteration would focus on building a more configurable and context-aware state tracing utility, potentially leveraging OpenTelemetry to remain vendor-agnostic at the instrumentation layer.


  TOC