Constructing a Resilient Configuration Plane for Android Edge LLMs via etcd and TimescaleDB Telemetry


The initial deployment of our on-device LLM across a fleet of Android devices felt like a success. The model, a quantized 3B parameter variant, handled localized NLP tasks without constant server roundtrips. But within weeks, the operational telemetry painted a grim picture. Performance was erratic. A configuration that worked flawlessly on a high-end device with ample RAM caused constant crashes on older hardware. Prompt templates optimized for one user cohort performed poorly for another. The static, one-size-fits-all configuration model was a clear failure. We needed a closed-loop, self-optimizing system where each device could be tuned dynamically based on its own real-world performance data.

Our first whiteboard sketch outlined the desired flow:

  1. Android devices collect and periodically report performance telemetry (e.g., query latency, tokens per second, memory usage).
  2. A central database ingests and aggregates this time-series data.
  3. An analysis service processes the aggregated data to generate new, optimized configurations.
  4. A delivery mechanism pushes these new configurations back to the specific devices that need them, reliably and with minimal latency.

The choice for the telemetry database was straightforward. We were dealing with timestamped performance metrics from thousands of sources, a classic time-series workload. We chose TimescaleDB. Its hypertable abstraction on top of PostgreSQL gave us the powerful analytical capabilities of SQL for complex cohort analysis, combined with the performance characteristics of a purpose-built TSDB. A simple schema was enough to get started. In a real-world project, you’d add more metadata, but the core is the time-series measurement.

-- DDL for the core device telemetry table in TimescaleDB
CREATE TABLE device_telemetry (
    ts              TIMESTAMPTZ       NOT NULL,
    device_id       TEXT              NOT NULL,
    model_version   TEXT              NOT NULL,
    -- Performance Metrics
    query_latency_ms    INT               NOT NULL,
    tokens_per_second   DOUBLE PRECISION  NOT NULL,
    peak_memory_mb      INT               NOT NULL,
    -- Contextual Tags for Cohort Analysis
    device_class        TEXT, -- e.g., 'high-end', 'mid-range'
    app_version         TEXT
);

-- Convert the table into a TimescaleDB hypertable, partitioned by time.
-- This is the magic that makes time-series queries fast.
SELECT create_hypertable('device_telemetry', 'ts', chunk_time_interval => INTERVAL '1 day');

-- Add compression to save space on older data. A must for production.
ALTER TABLE device_telemetry SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'device_id, model_version'
);

-- Set a policy to automatically compress data older than 7 days.
SELECT add_compression_policy('device_telemetry', compress_after => INTERVAL '7 days');

-- Example Index for common query patterns
CREATE INDEX ON device_telemetry (device_id, ts DESC);
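
Getting rows into this table is the boring half of the pipeline: devices batch their metrics and POST them to a small collector service that writes into the hypertable. The collector itself isn't the focus of this post, so what follows is only a minimal sketch of what it might look like, assuming a plain Flask endpoint and psycopg2; the /telemetry path and the payload field names simply mirror the columns above and are illustrative, not prescriptive.

# telemetry/collector.py (hypothetical ingestion endpoint, shown only to illustrate
# how device reports map onto the device_telemetry hypertable)
import psycopg2
from flask import Flask, jsonify, request

app = Flask(__name__)
TIMESCALE_CONN_STRING = "..."

INSERT_SQL = """
INSERT INTO device_telemetry
    (ts, device_id, model_version, query_latency_ms, tokens_per_second,
     peak_memory_mb, device_class, app_version)
VALUES
    (NOW(), %(device_id)s, %(model_version)s, %(query_latency_ms)s,
     %(tokens_per_second)s, %(peak_memory_mb)s, %(device_class)s, %(app_version)s);
"""

@app.route("/telemetry", methods=["POST"])
def ingest_telemetry():
    # A production collector would authenticate the device, validate the payload,
    # batch inserts, and use the device-reported timestamp instead of NOW().
    sample = request.get_json()
    conn = psycopg2.connect(TIMESCALE_CONN_STRING)
    try:
        with conn, conn.cursor() as cur:
            cur.execute(INSERT_SQL, sample)
    finally:
        conn.close()
    return jsonify({"status": "accepted"}), 202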

The real architectural contention arose around the configuration delivery mechanism. The initial, naive suggestion was a standard REST endpoint on the device that would poll for updates every few minutes. This was immediately discarded. Polling from thousands of battery-constrained devices is a recipe for disaster—it’s inefficient, creates thundering herd problems, and isn’t real-time.

The next proposal was to use WebSockets or an MQTT broker. This was a much better approach, providing persistent connections for server-side pushes. However, it introduced its own complexity. We would need to manage the lifecycle of these connections across a potentially distributed fleet of gateway servers. Ensuring exactly-once delivery, handling configuration versioning, and preventing split-brain scenarios where different gateways might have different states of the “correct” configuration would require us to build a significant amount of coordination logic from scratch.

This is where we made an unconventional choice: etcd. Known as the coordination backbone of Kubernetes, etcd is a distributed, consistent key-value store. Its core strength isn’t just storing data, but its Watch API. A client can watch a key or a key prefix and receive a stream of notifications for every change. This was precisely the push mechanism we needed, with consistency and versioning built-in. The pitfall here is that etcd is designed for a few dozen or hundred trusted servers in a cluster, not for direct exposure to thousands or millions of untrusted mobile clients.
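
Before getting to how we bridged that gap, it helps to see the Watch behavior in isolation. The snippet below is purely illustrative (Python with the python-etcd3 client, the same library our analysis service uses; in the deployed system only trusted backend services talk to etcd): it watches the device-config prefix and receives an ordered event for every change any writer makes under it.

# watch_demo.py (illustrative only; etcd is never exposed to devices)
import etcd3
import etcd3.events

client = etcd3.client(host="localhost", port=2379)

# Watch every key under the device-config prefix. Any put() against this prefix,
# from any process in the cluster, arrives here as an ordered stream of events.
events_iterator, cancel = client.watch_prefix("/configs/devices/")

for event in events_iterator:
    if isinstance(event, etcd3.events.PutEvent):
        print("config updated:", event.key, event.value)
        break  # demo only: stop after the first update

cancel()  # tears down the server-side watcher

Run a put against any key under the prefix from another shell and the event arrives immediately; etcd also records the modification revision of every write, which is what the gateway described next forwards to clients so they can discard stale or out-of-order updates.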

Our solution was a bridge: a lightweight, stateless gRPC gateway service written in Go. This gateway would terminate gRPC connections from the Android devices and multiplex them into a smaller number of long-lived Watch streams against the etcd cluster. This architecture confines the direct etcd client connections to our trusted backend, while providing a modern, efficient, and scalable RPC mechanism for the mobile clients.

graph TD
    subgraph Backend Infrastructure
        A[etcd Cluster]
        B(TimescaleDB)
        C{Analysis Service<br/>Python + LLM}
        D[gRPC Gateway<br/>Go]
    end

    subgraph Android Device Fleet
        E[Device 1]
        F[Device 2]
        G[...]
        H[Device N]
    end

    E -- "Telemetry (HTTP/POST)" --> B
    F -- "Telemetry (HTTP/POST)" --> B
    H -- "Telemetry (HTTP/POST)" --> B
    C -- Reads Aggregated Data --> B
    C -- Writes New Config --> A
    A -- Watch Events --> D
    D -- gRPC Stream --> E
    D -- gRPC Stream --> F
    D -- gRPC Stream --> H

The gRPC gateway became the linchpin of the system. Its responsibility is simple but critical: translate etcd watch events into a gRPC stream for a specific device.

Here is the protobuf definition for the gateway’s API:

// proto/config_watcher.proto
syntax = "proto3";

package configwatcher;

option go_package = "gen/configwatcher";

// The service exposed by our Go gateway
service ConfigWatcher {
  // A client establishes a long-lived stream to watch for its configuration.
  rpc Watch(WatchRequest) returns (stream WatchResponse);
}

message WatchRequest {
  string device_id = 1;
  // Could include auth tokens, current config version, etc.
  string auth_token = 2; 
}

message WatchResponse {
  // The full configuration payload, typically a JSON string.
  string config_json = 1;
  // The etcd modification revision. The client can use this to
  // ignore stale or out-of-order updates.
  int64 revision = 2;
}

And the core logic of the Go gateway server. The critical section is managing the lifecycle of the etcd watch and piping events to the gRPC stream, with robust error handling for both sides of the connection.

// cmd/gateway/main.go
package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "path/to/gen/configwatcher" // Generated protobuf code
	clientv3 "go.etcd.io/etcd/client/v3"
)

const (
	etcdKeyPrefix = "/configs/devices/"
)

// server implements the ConfigWatcher service.
type server struct {
	pb.UnimplementedConfigWatcherServer
	etcdClient *clientv3.Client
}

// newServer creates a new server instance.
func newServer(etcdEndpoints []string) (*server, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   etcdEndpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to connect to etcd: %w", err)
	}
	return &server{etcdClient: cli}, nil
}

// Watch is the core RPC method. It establishes a watch on etcd for a device's config
// and streams updates back to the Android client.
func (s *server) Watch(req *pb.WatchRequest, stream pb.ConfigWatcher_WatchServer) error {
	if req.DeviceId == "" {
		return status.Error(codes.InvalidArgument, "device_id is required")
	}
	// In a real project, the auth_token would be validated here.

	ctx := stream.Context()
	deviceKey := etcdKeyPrefix + req.DeviceId
	log.Printf("Starting watch for device: %s on key: %s", req.DeviceId, deviceKey)

	// First, get the current value to send immediately upon connection.
	resp, err := s.etcdClient.Get(ctx, deviceKey)
	if err != nil {
		log.Printf("Error getting initial config for %s: %v", req.DeviceId, err)
		return status.Errorf(codes.Internal, "could not fetch initial config")
	}

	// Send initial config if it exists
	if len(resp.Kvs) > 0 {
		kv := resp.Kvs[0]
		err := stream.Send(&pb.WatchResponse{
			ConfigJson: string(kv.Value),
			Revision:   kv.ModRevision,
		})
		if err != nil {
			log.Printf("Error sending initial config to %s: %v", req.DeviceId, err)
			return status.Errorf(codes.Internal, "connection closed by client")
		}
	}

	// Start watching from the revision AFTER the initial GET.
	watchChan := s.etcdClient.Watch(ctx, deviceKey, clientv3.WithRev(resp.Header.Revision+1))

	for {
		select {
		case <-ctx.Done():
			log.Printf("Client %s disconnected, closing watch stream.", req.DeviceId)
			return ctx.Err()
		case watchResp, ok := <-watchChan:
			if !ok {
				log.Printf("etcd watch channel closed for device %s. Re-establishing may be needed.", req.DeviceId)
				return status.Errorf(codes.Aborted, "etcd watch channel closed")
			}
			if err := watchResp.Err(); err != nil {
				log.Printf("etcd watch error for %s: %v", req.DeviceId, err)
				return status.Errorf(codes.Internal, "etcd watch error: %v", err)
			}

			for _, ev := range watchResp.Events {
				// We only care about PUT events (creations or updates)
				if ev.Type == clientv3.EventTypePut {
					log.Printf("Detected config change for %s at revision %d", req.DeviceId, ev.Kv.ModRevision)
					err := stream.Send(&pb.WatchResponse{
						ConfigJson: string(ev.Kv.Value),
						Revision:   ev.Kv.ModRevision,
					})
					// If sending fails, the client has likely disconnected.
					if err != nil {
						log.Printf("Failed to send update to %s: %v. Terminating stream.", req.DeviceId, err)
						return status.Errorf(codes.Unavailable, "client stream unavailable")
					}
				}
			}
		}
	}
}

func main() {
	etcdEndpoints := []string{"localhost:2379"} // From config in production
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	srv, err := newServer(etcdEndpoints)
	if err != nil {
		log.Fatalf("failed to create server: %v", err)
	}
	defer srv.etcdClient.Close()

	pb.RegisterConfigWatcherServer(s, srv)
	log.Printf("gRPC gateway listening at %v", lis.Addr())

	// Graceful shutdown
	go func() {
		if err := s.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down server...")
	s.GracefulStop()
}

On the Android side, we built a ConfigurationRepository using Kotlin Coroutines and Flow to manage the gRPC stream and expose the configuration as a reactive data source to the rest of the app.

// Android client-side implementation
import android.content.Context
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.grpc.StatusException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.cancellable
import java.util.concurrent.TimeUnit
import com.google.gson.Gson // Or any other JSON library

// Represents the parsed configuration
data class LLMConfig(
    val promptTemplate: String = "default_template",
    val contextWindowSize: Int = 2048
)

// A singleton repository to manage the config lifecycle
class ConfigurationRepository(
    private val context: Context,
    private val deviceId: String,
    private val gatewayHost: String,
    private val gatewayPort: Int
) {
    private val repositoryScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    private var channel: ManagedChannel? = null

    private val _configState = MutableStateFlow(LLMConfig()) // Start with a safe default
    val configState = _configState.asStateFlow()
    
    private val gson = Gson()

    init {
        startWatching()
    }

    private fun startWatching() {
        repositoryScope.launch {
            while (true) { // Retry loop
                try {
                    val currentChannel = getOrCreateChannel()
                    // The grpc-kotlin coroutine stub exposes the server-streaming RPC as a Flow<WatchResponse>.
                    val stub = ConfigWatcherGrpcKt.ConfigWatcherCoroutineStub(currentChannel)
                    val request = WatchRequest.newBuilder()
                        .setDeviceId(deviceId)
                        .setAuthToken("dummy-auth-token") // Use real auth
                        .build()

                    stub.watch(request).cancellable().collect { response ->
                        // A common mistake is to perform heavy processing on this collector thread.
                        // Here we just parse and update the StateFlow.
                        try {
                            val newConfig = gson.fromJson(response.configJson, LLMConfig::class.java)
                            _configState.value = newConfig
                            // Log success
                        } catch (e: Exception) {
                            // Log JSON parsing error
                        }
                    }
                } catch (e: StatusException) {
                    // Log gRPC error (e.g., UNAVAILABLE when server is down)
                    channel?.shutdownNow()
                    channel = null
                } catch (e: Exception) {
                    // Log other exceptions
                }
                
                // Back off before retrying. A fixed delay keeps the example short;
                // production code should use exponential backoff with jitter.
                delay(5000)
            }
        }
    }

    private fun getOrCreateChannel(): ManagedChannel {
        return channel ?: synchronized(this) {
            channel ?: ManagedChannelBuilder.forAddress(gatewayHost, gatewayPort)
                .usePlaintext() // Use TLS in production!
                .keepAliveTime(30, TimeUnit.SECONDS)
                .keepAliveWithoutCalls(true)
                .build().also { channel = it }
        }
    }

    fun shutdown() {
        repositoryScope.cancel()
        channel?.shutdown()?.awaitTermination(5, TimeUnit.SECONDS)
    }
}

Finally, the analysis service. This was a Python script designed to run periodically. It would query TimescaleDB for underperforming device cohorts, format the findings into a structured prompt, and ask a larger, server-side LLM (we used a fine-tuned Llama-3 8B model) to act as a “systems administrator” and propose a new JSON configuration. This introduces a layer of non-deterministic, AI-driven optimization into our control loop.

# analysis/optimizer.py
import os
import json
import psycopg2
import etcd3
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM

# --- Configuration ---
TIMESCALE_CONN_STRING = "..."
ETCD_HOST = "localhost"
ETCD_PORT = 2379
LLM_MODEL_PATH = "path/to/finetuned-llama3-8b"

# --- Setup Clients ---
etcd_client = etcd3.client(host=ETCD_HOST, port=ETCD_PORT)
# In a real project, the LLM would be behind an API, but for demonstration:
tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL_PATH)
model = AutoModelForCausalLM.from_pretrained(LLM_MODEL_PATH)
llm_pipeline = pipeline("text-generation", model=model, tokenizer=tokenizer)

def get_underperforming_devices():
    """Queries TimescaleDB for devices with high latency in the last hour."""
    conn = psycopg2.connect(TIMESCALE_CONN_STRING)
    cur = conn.cursor()
    
    query = """
    SELECT 
        device_id, 
        AVG(query_latency_ms) as avg_latency, 
        AVG(tokens_per_second) as avg_tps
    FROM device_telemetry
    WHERE ts > NOW() - INTERVAL '1 hour'
    GROUP BY device_id
    HAVING AVG(query_latency_ms) > 1500; -- Arbitrary threshold
    """
    
    cur.execute(query)
    devices = cur.fetchall()
    cur.close()
    conn.close()
    return devices

def generate_new_config_with_llm(device_id, telemetry_data):
    """Uses an LLM to generate a new configuration based on telemetry."""
    
    prompt = f"""
    You are an expert system administrator for an on-device LLM.
    A device with ID '{device_id}' is underperforming.
    
    Current Telemetry (last hour average):
    - Average Latency: {telemetry_data['avg_latency']:.2f} ms
    - Average Tokens/Second: {telemetry_data['avg_tps']:.2f}
    
    The current configuration is likely too demanding. Propose a new, less resource-intensive
    configuration. Your response MUST be a single, valid JSON object with keys
    "promptTemplate" and "contextWindowSize".
    
    A smaller contextWindowSize (e.g., 1024) reduces memory usage and may improve latency.
    The promptTemplate could be simplified.
    
    Example valid response:
    {{
      "promptTemplate": "simplified_v2",
      "contextWindowSize": 1024
    }}
    
    Generate the new configuration now:
    """
    
    response = llm_pipeline(prompt, max_new_tokens=100, num_return_sequences=1)
    
    # The pitfall here is that LLMs can return text that isn't valid JSON.
    # Robust parsing and validation are absolutely critical.
    try:
        # Extract JSON from the potentially messy output
        json_str = response[0]['generated_text'].split('{')[-1].split('}')[0]
        json_str = '{' + json_str + '}'
        config = json.loads(json_str)
        
        # Basic validation
        if "contextWindowSize" in config and isinstance(config["contextWindowSize"], int):
            return config
        else:
            return None
    except (json.JSONDecodeError, IndexError):
        print(f"LLM generated invalid JSON for device {device_id}")
        return None

def apply_config_to_etcd(device_id, config):
    """Writes the new configuration to etcd, triggering the watch."""
    key = f"/configs/devices/{device_id}"
    value = json.dumps(config)
    print(f"Updating config for {device_id}: {value}")
    etcd_client.put(key, value)

def main():
    devices = get_underperforming_devices()
    for device_id, avg_latency, avg_tps in devices:
        telemetry = {"avg_latency": avg_latency, "avg_tps": avg_tps}
        new_config = generate_new_config_with_llm(device_id, telemetry)
        
        if new_config:
            apply_config_to_etcd(device_id, new_config)

if __name__ == "__main__":
    main()

The system now functions as a complete feedback loop. Poor performance on a device leads to telemetry being sent to TimescaleDB. The analysis service detects this, uses an LLM to generate a corrective configuration, and writes it to etcd. The etcd watch fires, our Go gateway pushes the update over gRPC, and the Android client reconfigures its local model in near real-time.

This architecture, while functional, is not without its own set of production challenges. The gRPC gateway, though stateless, is a potential bottleneck and needs to be horizontally scaled behind a load balancer. A common mistake would be to put a plain L4 load balancer in front of it: because gRPC multiplexes calls over long-lived HTTP/2 connections, an L4 balancer pins all of a client's streams to a single backend and balances load poorly. An L7 load balancer that understands gRPC is required.

Furthermore, the "admin LLM" is a source of non-determinism; it could produce harmful or nonsensical configurations. A safer approach would involve a canarying stage, where a new configuration is first pushed to a small percentage of devices and its impact is monitored in TimescaleDB before a fleet-wide rollout is approved, potentially with a human in the loop.

Lastly, the security model is currently non-existent: the gateway-client channel must be secured with mTLS, and the gateway must perform robust authentication and authorization so that a compromised device cannot request configurations for another. The path to a truly production-grade system requires addressing these resilience and security dimensions.
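
Of those follow-ups, the canary stage is the cheapest to prototype because it reuses the pieces already in place: etcd for the rollout bookkeeping and TimescaleDB for the before-and-after comparison. The sketch below is hypothetical (nothing under a /configs/canary/ prefix exists in the current system, and the 5% sample size and 10% improvement threshold are arbitrary), but it shows the shape of the loop: push the proposed configuration to a small sample of the affected cohort, then let a later run compare their telemetry against the cohort baseline before promoting.

# analysis/canary.py (hypothetical sketch, not part of the system described above)
import json
import random
import psycopg2
import etcd3

TIMESCALE_CONN_STRING = "..."
CANARY_FRACTION = 0.05                    # push to ~5% of the affected cohort first
CANARY_STATE_PREFIX = "/configs/canary/"  # bookkeeping keys, separate from live configs
CONFIG_KEY_PREFIX = "/configs/devices/"

etcd_client = etcd3.client(host="localhost", port=2379)

def start_canary(cohort_device_ids, new_config):
    """Apply new_config to a small random sample and record the experiment in etcd."""
    sample_size = max(1, int(len(cohort_device_ids) * CANARY_FRACTION))
    canary_ids = random.sample(cohort_device_ids, sample_size)
    for device_id in canary_ids:
        etcd_client.put(CONFIG_KEY_PREFIX + device_id, json.dumps(new_config))
    etcd_client.put(
        CANARY_STATE_PREFIX + "current",
        json.dumps({"devices": canary_ids, "config": new_config}),
    )
    return canary_ids

def canary_improved(canary_ids, baseline_latency_ms, min_improvement=0.10):
    """Check whether the canary devices beat the cohort baseline by min_improvement."""
    conn = psycopg2.connect(TIMESCALE_CONN_STRING)
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT AVG(query_latency_ms)
                FROM device_telemetry
                WHERE ts > NOW() - INTERVAL '1 hour'
                  AND device_id = ANY(%s);
                """,
                (list(canary_ids),),
            )
            canary_latency = cur.fetchone()[0]
    finally:
        conn.close()
    if canary_latency is None:
        return False  # no post-rollout telemetry yet; do not promote
    return float(canary_latency) < baseline_latency_ms * (1 - min_improvement)

A later run (or a human reviewer) would call canary_improved() with the cohort's pre-rollout average latency and only then write the configuration to the remaining devices' keys; if the canary regresses, the handful of canary keys is simply reverted and the blast radius stays small.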

