Implementing Serverless Backpressure for an iOS Telemetry Ingestion Pipeline from Kafka to ScyllaDB


Our initial iOS telemetry pipeline was a textbook example of a system that worked perfectly in staging but crumbled under the chaotic reality of production traffic. Millions of devices reported analytics events directly to a cluster of REST API endpoints backed by a monolithic service. During peak usage—typically app launch or following a major marketing push—the service would experience cascading failures. The database connections would saturate, response times would skyrocket, and the load balancer would start dropping requests. We were losing valuable data and, more importantly, confidence in our infrastructure.

The mandate was clear: build a new ingestion pipeline that was fundamentally elastic, resilient to massive traffic spikes, and decoupled from the core application services. The initial architectural sketch was straightforward, leveraging a common event-driven pattern.

graph TD
    A[iOS Devices] -->|HTTP POST| B(Ingestion Endpoint);
    B --> C{Apache Kafka};
    C --> D[Processing Layer];
    D --> E(ScyllaDB Cluster);
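
The ingestion endpoint itself is not the subject of this post; its only job is to validate the POSTed payload and hand it to Kafka. A minimal sketch of that hop is shown below — the Express route, field checks, and port are illustrative assumptions, not our production service.

// ingestion/server.ts (illustrative sketch only — route and validation are assumptions)
import express from 'express';
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'ios-ingestion', brokers: process.env.KAFKA_BROKERS!.split(',') });
const producer = kafka.producer();

const app = express();
app.use(express.json());

app.post('/v1/telemetry', async (req, res) => {
    const event = req.body;
    // Reject obviously malformed events before they ever reach Kafka
    if (!event?.userId || !event?.eventId || !event?.eventType) {
        return res.status(400).send({ error: 'missing required fields' });
    }
    // Key by userId so a given device's events stay ordered within a single partition
    await producer.send({
        topic: 'ios-raw-telemetry-v1',
        messages: [{ key: String(event.userId), value: JSON.stringify(event) }],
    });
    return res.status(202).send();
});

producer.connect().then(() => app.listen(8080));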

The choice of technologies was driven by specific, pragmatic requirements.

  • Apache Kafka: Selected as the central buffer. Its ability to absorb enormous write bursts and persist data with high durability provides the decoupling we desperately needed. It transforms an unpredictable, spiky workload into a more manageable, continuous stream for downstream consumers.
  • ScyllaDB: Chosen for its raw performance. We needed a database capable of sustaining millions of writes per second with predictably low latency. Its Cassandra-compatible API and focus on kernel-level optimization made it a better fit for our high-throughput needs than other NoSQL options.
  • Google Cloud Functions (GCF): The processing layer choice was less obvious. A fleet of VMs or a Kubernetes deployment were considered, but the spiky nature of our traffic made a serverless approach compelling from a cost perspective. GCF allows us to scale from zero to thousands of instances automatically, paying only for the compute we actively use.
  • Pulumi: We committed to managing this entire stack via Infrastructure as Code. Pulumi, using TypeScript, allowed us to define our multi-cloud (GCP for compute, a managed provider for Kafka) and multi-technology stack in a single, cohesive codebase. This was critical for maintainability and reproducibility.

The initial implementation felt promising. Using Pulumi, we provisioned the entire stack: Kafka topics, GCF deployments with appropriate triggers, and the necessary IAM roles.

The Foundational Infrastructure with Pulumi

Our Pulumi project started by defining the core resources. We needed a Kafka topic, a service account for our Cloud Function, and the function itself. The beauty of Pulumi is expressing these relationships directly in a language we already use.

// pulumi/index.ts
import * as gcp from "@pulumi/gcp";
import * as pulumi from "@pulumi/pulumi";
import * as aiven from "@pulumi/aiven"; // Assuming Aiven for managed Kafka

// Configuration for our environment
const config = new pulumi.Config();
const gcpProject = config.require("gcpProject");
const gcpRegion = config.require("gcpRegion");
const aivenProject = config.require("aivenProject");
const kafkaServiceName = config.require("kafkaServiceName");

// 1. Provision the Kafka Topic for raw telemetry events
const telemetryTopic = new aiven.KafkaTopic("telemetry-events", {
    project: aivenProject,
    serviceName: kafkaServiceName,
    topicName: "ios-raw-telemetry-v1",
    partitions: 64, // Start with a healthy number of partitions for parallelism
    replication: 3,
    config: {
        "retention_ms": "604800000", // Retain data for 7 days
        "min_insync_replicas": "2",
    }
});

// 2. Create a dedicated Service Account for the GCF consumer
const consumerServiceAccount = new gcp.serviceaccount.Account("consumer-sa", {
    accountId: "kafka-scylla-consumer-sa",
    displayName: "Service Account for Telemetry Consumer",
});

// 3. Define the Google Cloud Function
const functionArchive = new pulumi.asset.AssetArchive({
    ".": new pulumi.asset.FileArchive("./function"), // Path to function source code
});

// Environment variables passed to the function, sourced from our infra
const functionEnvVars = {
    KAFKA_BROKERS: kafkaServiceName, // This would be the actual broker list from Aiven output
    KAFKA_TOPIC: telemetryTopic.topicName,
    KAFKA_CLIENT_ID: "gcf-consumer-telemetry",
    SCYLLA_CONTACT_POINTS: "scylla-node1.example.com,scylla-node2.example.com", // Example ScyllaDB hosts
    SCYLLA_DATACENTER: "gcp-us-central1",
    SCYLLA_KEYSPACE: "telemetry_data",
    // We will add more config here later...
};

// Upload the function source archive to a GCS bucket so Cloud Functions can deploy it
const sourceBucket = new gcp.storage.Bucket("source-bucket", { location: gcpRegion });
const sourceObject = new gcp.storage.BucketObject("consumer-source", {
    bucket: sourceBucket.name,
    source: functionArchive,
});

const consumerFunction = new gcp.cloudfunctions.Function("kafka-scylla-consumer", {
    project: gcpProject,
    region: gcpRegion,
    runtime: "nodejs18",
    sourceArchiveBucket: sourceBucket.name,
    sourceArchiveObject: sourceObject.name,
    entryPoint: "consumeTelemetryEvents",
    // NOTE: The HTTP trigger only keeps the placeholder entry point reachable; the function itself
    // pulls from Kafka. A Pub/Sub event trigger is an alternative if events are routed through Kafka Connect.
    triggerHttp: true,
    serviceAccountEmail: consumerServiceAccount.email,
    environmentVariables: functionEnvVars,
    maxInstances: 1000, // Allow significant scale-out
    minInstances: 0,
});

export const functionName = consumerFunction.name;
export const topicName = telemetryTopic.topicName;

This Pulumi code declaratively sets up our pipeline’s backbone. It creates a durable Kafka topic, a secure service account, and a Cloud Function ready to be deployed. The problem, however, was not in the infrastructure but in the application logic running inside that function.

The Naive Consumer: A Recipe for Disaster

Our first version of the GCF consumer was functionally correct but operationally fragile. It used kafkajs to connect to the topic and the cassandra-driver to write to ScyllaDB.
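
Before the consumer code itself, a quick note on the table it writes into: the user_events DDL is not shown anywhere in this post, so the schema below is an assumption inferred from the INSERT statement, expressed as a one-off setup script using the same cassandra-driver.

// schema/setup.ts (assumed schema — inferred from the consumer's INSERT, not our actual DDL)
import { Client } from 'cassandra-driver';

// Assumes the telemetry_data keyspace already exists
const client = new Client({
    contactPoints: process.env.SCYLLA_CONTACT_POINTS!.split(','),
    localDataCenter: process.env.SCYLLA_DATACENTER,
    keyspace: process.env.SCYLLA_KEYSPACE,
});

// Partition by user_id, cluster by timestamp so each device's events are co-located and time-ordered
const createUserEvents = `
    CREATE TABLE IF NOT EXISTS user_events (
        user_id    text,
        event_id   text,
        event_type text,
        payload    text,
        timestamp  timestamp,
        PRIMARY KEY ((user_id), timestamp, event_id)
    ) WITH CLUSTERING ORDER BY (timestamp DESC, event_id ASC)`;

client.connect()
    .then(() => client.execute(createUserEvents))
    .then(() => client.shutdown());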

// function/index.ts (Initial Version - DO NOT USE IN PRODUCTION)
import { Kafka, EachBatchPayload } from 'kafkajs';
import { Client as ScyllaClient } from 'cassandra-driver';

// --- Client Initialization ---
// In a real GCF, this initialization happens outside the handler to be reused across invocations.
const kafka = new Kafka({
    clientId: process.env.KAFKA_CLIENT_ID,
    brokers: process.env.KAFKA_BROKERS!.split(','),
    // SSL/SASL configuration would go here for connecting to a managed Kafka service
});

const scyllaClient = new ScyllaClient({
    contactPoints: process.env.SCYLLA_CONTACT_POINTS!.split(','),
    localDataCenter: process.env.SCYLLA_DATACENTER,
    keyspace: process.env.SCYLLA_KEYSPACE,
    // Authentication, pooling options, etc.
});

const consumer = kafka.consumer({ groupId: 'telemetry-consumer-group' });

// --- Main Logic ---
const runConsumer = async () => {
    await consumer.connect();
    await consumer.subscribe({ topic: process.env.KAFKA_TOPIC!, fromBeginning: true });

    await consumer.run({
        eachBatch: async ({ batch, resolveOffset, heartbeat }: EachBatchPayload) => {
            const promises = batch.messages.map(async (message) => {
                if (!message.value) return;

                const event = JSON.parse(message.value.toString());

                // A simple INSERT statement
                const query = 'INSERT INTO user_events (user_id, event_id, event_type, payload, timestamp) VALUES (?, ?, ?, ?, ?)';
                const params = [event.userId, event.eventId, event.eventType, JSON.stringify(event.payload), event.timestamp];

                try {
                    await scyllaClient.execute(query, params, { prepare: true });
                    resolveOffset(message.offset);
                } catch (err) {
                    console.error(`Failed to write message at offset ${message.offset}:`, err);
                    // This is a critical failure point. What do we do here?
                    // Crashing the process is the default kafkajs behavior to force a rebalance.
                    throw err;
                }
            });

            await Promise.all(promises);
            await heartbeat();
        },
    });
};

// Start the consumer when the function cold starts
runConsumer().catch(console.error);

// The actual GCF entry point is just an HTTP placeholder that keeps the instance alive
// while the long-lived Kafka consumer above does the real work in the background.
export const consumeTelemetryEvents = (req: any, res: any) => {
    res.status(204).send();
};

During a load test simulating a traffic spike, the flaw became brutally apparent.

  1. A massive number of messages landed in the Kafka topic.
  2. Google Cloud Functions scaled up aggressively, creating hundreds of new function instances.
  3. Each new instance established its own connection pool to ScyllaDB and started consuming messages as fast as possible.
  4. The ScyllaDB cluster, despite being powerful, was overwhelmed by the sheer number of concurrent connections and write requests from this uncoordinated swarm of consumers.
  5. ScyllaDB’s write latency increased. The cassandra-driver started timing out.
  6. The catch block in our function logged the error and the process crashed.
  7. Kafka’s consumer group protocol triggered a rebalance. The message that caused the timeout was eventually redelivered to another consumer instance, which also failed.
  8. The entire pipeline ground to a halt, stuck processing the same failing batch of messages, while Kafka lag grew uncontrollably.

The root cause was a fundamental design flaw: the consumption rate was completely decoupled from the database’s actual write capacity. GCF scaled based on the input pressure from Kafka, not the output capacity of ScyllaDB. We had built a perfect distributed denial-of-service weapon aimed at our own database.

Implementing Consumer-Side Backpressure

The solution required making the consumer “intelligent.” It needed to monitor the health of the downstream system (ScyllaDB) and throttle its own consumption rate accordingly. This is a classic backpressure problem. In our serverless context, the most effective tool at our disposal was kafkajs's consumer flow control: consumer.pause() and consumer.resume().

The strategy was to wrap our ScyllaDB writes in a mechanism that could track in-flight requests and query latencies. If these metrics exceeded a predefined threshold, we would pause the Kafka consumer for the affected partitions until the database recovered.

Here is the revised, production-ready GCF code incorporating this backpressure logic.

// function/index.ts (Revised with Backpressure Control)
import { Kafka, EachBatchPayload, Consumer } from 'kafkajs';
import { Client as ScyllaClient, types } from 'cassandra-driver';
import { hrtime } from 'process';

// --- Configuration from Environment Variables ---
const KAFKA_BROKERS = process.env.KAFKA_BROKERS!.split(',');
const KAFKA_TOPIC = process.env.KAFKA_TOPIC!;
const KAFKA_CLIENT_ID = process.env.KAFKA_CLIENT_ID!;
const KAFKA_GROUP_ID = "telemetry-consumer-group-v2";

const SCYLLA_CONTACT_POINTS = process.env.SCYLLA_CONTACT_POINTS!.split(',');
const SCYLLA_DATACENTER = process.env.SCYLLA_DATACENTER!;
const SCYLLA_KEYSPACE = process.env.SCYLLA_KEYSPACE!;

// Backpressure tuning parameters
const MAX_INFLIGHT_WRITES = parseInt(process.env.MAX_INFLIGHT_WRITES || "500", 10);
const LATENCY_THRESHOLD_MS = parseInt(process.env.LATENCY_THRESHOLD_MS || "100", 10);
const PAUSE_DURATION_MS = parseInt(process.env.PAUSE_DURATION_MS || "1000", 10);

// --- State Management ---
let inflightWrites = 0;
const pausedPartitions = new Set<number>();

// --- Client Initialization ---
const kafka = new Kafka({ clientId: KAFKA_CLIENT_ID, brokers: KAFKA_BROKERS });
const scyllaClient = new ScyllaClient({
    contactPoints: SCYLLA_CONTACT_POINTS,
    localDataCenter: SCYLLA_DATACENTER,
    keyspace: SCYLLA_KEYSPACE,
    pooling: {
        // Tune pooling to align with GCF's single-threaded nature and our backpressure logic
        coreConnectionsPerHost: {
            [types.distance.local]: 2
        },
        maxRequestsPerConnection: 2048
    },
    queryOptions: {
        consistency: types.consistencies.localQuorum,
        prepare: true
    }
});
const consumer = kafka.consumer({
    groupId: KAFKA_GROUP_ID,
    maxWaitTimeInMs: 5000,
    // These fetch settings are crucial for batching to work effectively
    minBytes: 1024,
    maxBytes: 10 * 1024 * 1024 // 10MB
});

// --- Backpressure Logic ---
// Re-check periodically until the pressure has subsided, then resume the partition.
const scheduleResumeCheck = (partition: number, consumer: Consumer) => {
    setTimeout(() => {
        if (inflightWrites < MAX_INFLIGHT_WRITES * 0.8) { // Resume once load drops below 80% of the cap
            console.log(`[Partition: ${partition}] Inflight writes reduced. Resuming consumption.`);
            consumer.resume([{ topic: KAFKA_TOPIC, partitions: [partition] }]);
            pausedPartitions.delete(partition);
        } else {
            scheduleResumeCheck(partition, consumer); // Still under pressure: check again later
        }
    }, PAUSE_DURATION_MS);
};

// Pause the partition when the inflight cap is hit, or unconditionally when `force` is set
// (used by the latency check further down).
const checkAndApplyBackpressure = (partition: number, consumer: Consumer, force = false) => {
    if ((force || inflightWrites >= MAX_INFLIGHT_WRITES) && !pausedPartitions.has(partition)) {
        console.warn(`[Partition: ${partition}] Backpressure triggered (inflight writes: ${inflightWrites}). Pausing consumption.`);
        consumer.pause([{ topic: KAFKA_TOPIC, partitions: [partition] }]);
        pausedPartitions.add(partition);
        scheduleResumeCheck(partition, consumer);
    }
};

// --- Main Processing Logic ---
const handleEachBatch = async ({ batch, resolveOffset, heartbeat }: EachBatchPayload) => {
    const { partition } = batch;

    // If this partition is paused, wait briefly and return without resolving offsets;
    // the skipped messages will simply be redelivered once consumption resumes.
    if (pausedPartitions.has(partition)) {
        console.log(`[Partition: ${partition}] Currently paused, skipping batch processing.`);
        await new Promise(resolve => setTimeout(resolve, PAUSE_DURATION_MS));
        return;
    }

    const promises = batch.messages.map(async (message) => {
        if (!message.value) {
            resolveOffset(message.offset);
            return;
        }

        // Apply backpressure before starting the write
        checkAndApplyBackpressure(partition, consumer);
        inflightWrites++;

        const startTime = hrtime.bigint();

        try {
            const event = JSON.parse(message.value.toString());
            const query = 'INSERT INTO user_events (user_id, event_id, event_type, payload, timestamp) VALUES (?, ?, ?, ?, ?)';
            const params = [event.userId, event.eventId, event.eventType, JSON.stringify(event.payload), event.timestamp];
            
            await scyllaClient.execute(query, params);
            
            const endTime = hrtime.bigint();
            const latencyMs = Number(endTime - startTime) / 1_000_000;

            // Latency is a second backpressure signal: force a pause even if the inflight count is below the cap
            if (latencyMs > LATENCY_THRESHOLD_MS && !pausedPartitions.has(partition)) {
                console.warn(`[Partition: ${partition}] High ScyllaDB latency (${latencyMs.toFixed(2)}ms). Pausing consumption.`);
                checkAndApplyBackpressure(partition, consumer, true);
            }

            resolveOffset(message.offset);
        } catch (error) {
            console.error(`[Partition: ${partition}] Failed to process message at offset ${message.offset}. CRITICAL ERROR.`, error);
            // In a real-world scenario, you might send this to a DLQ before crashing.
            // For now, crashing is the safest way to force a rebalance and prevent data loss.
            throw error;
        } finally {
            inflightWrites--;
        }
    });

    await Promise.all(promises);
    await heartbeat();
};

const run = async () => {
    await scyllaClient.connect();
    console.log("ScyllaDB client connected.");
    await consumer.connect();
    console.log("Kafka consumer connected.");
    await consumer.subscribe({ topic: KAFKA_TOPIC, fromBeginning: false });

    await consumer.run({
        eachBatchAutoResolve: false, // We must manually resolve offsets
        eachBatch: handleEachBatch,
    });
};

run().catch(err => {
    console.error("Consumer encountered a fatal error:", err);
    process.exit(1);
});

export const consumeTelemetryEvents = (req: any, res: any) => {
    res.status(204).send();
};

This new implementation introduces a few critical concepts:

  1. In-flight Request Tracking: A simple counter, inflightWrites, tracks how many write operations are currently pending against ScyllaDB from this specific function instance.
  2. Latency Monitoring: We measure the duration of each scyllaClient.execute call.
  3. Conditional Pausing: If either the number of in-flight requests or the measured write latency exceeds a configured threshold, the function calls consumer.pause() for the specific Kafka partition it is processing. This immediately stops kafkajs from fetching new messages for that partition.
  4. Conditional Resuming: A setTimeout periodically checks if the pressure has subsided (e.g., in-flight requests drop below 80% of the maximum). If so, it calls consumer.resume() to begin processing messages again.
  5. Configuration via IaC: The thresholds (MAX_INFLIGHT_WRITES, LATENCY_THRESHOLD_MS) are no longer hardcoded. They are passed in as environment variables, which we can now manage and tune directly from our Pulumi code.

// pulumi/index.ts (Updated with backpressure config)

// ... existing code ...

const functionEnvVars = {
    KAFKA_BROKERS: kafkaServiceName,
    KAFKA_TOPIC: telemetryTopic.topicName,
    KAFKA_CLIENT_ID: "gcf-consumer-telemetry",
    SCYLLA_CONTACT_POINTS: "scylla-node1.example.com,scylla-node2.example.com",
    SCYLLA_DATACENTER: "gcp-us-central1",
    SCYLLA_KEYSPACE: "telemetry_data",
    
    // Injecting our tuning parameters
    MAX_INFLIGHT_WRITES: "500",
    LATENCY_THRESHOLD_MS: "100",
    PAUSE_DURATION_MS: "1000",
};

// ... function definition using these env vars ...
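
Because these values are plain stack inputs, they can also be promoted to Pulumi stack configuration instead of literals, so each environment can be tuned with a single `pulumi config set`. A small sketch of that approach — the config keys here are illustrative and not defined anywhere else in the stack:

// pulumi/index.ts (optional: promote the tuning values to stack configuration)
// Hypothetical config keys — set per stack, e.g. `pulumi config set maxInflightWrites 750`
const maxInflightWrites = config.getNumber("maxInflightWrites") ?? 500;
const latencyThresholdMs = config.getNumber("latencyThresholdMs") ?? 100;
const pauseDurationMs = config.getNumber("pauseDurationMs") ?? 1000;

const tunedEnvVars = {
    ...functionEnvVars,
    MAX_INFLIGHT_WRITES: String(maxInflightWrites),
    LATENCY_THRESHOLD_MS: String(latencyThresholdMs),
    PAUSE_DURATION_MS: String(pauseDurationMs),
};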

The System Under Load

With the new backpressure logic deployed, the results of the same load test were dramatically different.

  • When the event spike hit Kafka, GCF scaled up as before.
  • As the functions began writing to ScyllaDB, the inflightWrites counter and query latencies quickly climbed.
  • Once the MAX_INFLIGHT_WRITES threshold was breached, function instances began pausing their consumption from their assigned Kafka partitions.
  • The system reached a state of equilibrium. The GCF instances collectively throttled themselves to a rate that ScyllaDB could comfortably handle.
  • On our monitoring dashboards, we observed Kafka consumer lag increasing, which was now an expected and healthy sign that the buffer was doing its job (see the lag-check sketch below).
  • As the initial wave of events subsided, the ScyllaDB load decreased, latencies dropped, and the GCF instances automatically resumed consumption, draining the lag from the Kafka topic until it returned to near-zero.

We had successfully created a self-regulating, resilient pipeline. The serverless functions acted as an elastic valve, modulating the flow of data based on the real-time capacity of the downstream database.
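
That "healthy lag" figure does not have to come from a vendor dashboard alone; it can also be derived directly from kafkajs's admin client. A minimal sketch — the helper name and structure are illustrative, and it assumes kafkajs v2's fetchOffsets shape:

// tooling/check-lag.ts (illustrative lag check — assumes kafkajs v2's admin API)
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'lag-checker', brokers: process.env.KAFKA_BROKERS!.split(',') });

export const totalConsumerLag = async (topic: string, groupId: string): Promise<number> => {
    const admin = kafka.admin();
    await admin.connect();
    try {
        // Latest offset per partition (the "high water mark")
        const topicOffsets = await admin.fetchTopicOffsets(topic);
        // Last committed offset per partition for our consumer group
        const groupOffsets = (await admin.fetchOffsets({ groupId, topics: [topic] }))[0];

        return topicOffsets.reduce((lag, { partition, high }) => {
            const committed = groupOffsets?.partitions.find(p => p.partition === partition);
            // Treat "no committed offset yet" (-1) as 0 for a rough upper bound
            const committedOffset = Math.max(committed ? Number(committed.offset) : 0, 0);
            return lag + (Number(high) - committedOffset);
        }, 0);
    } finally {
        await admin.disconnect();
    }
};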

This architecture is not without its limitations. The backpressure mechanism is local to each GCF instance and relies on instance-level metrics. A more sophisticated implementation might use a centralized system (like Redis or a monitoring service) to make global throttling decisions based on cluster-wide ScyllaDB health, providing smoother and more coordinated control. Furthermore, the pause/resume thresholds are currently static, defined in our Pulumi configuration; a future enhancement could involve a dynamic adjustment system that learns optimal thresholds based on historical performance. The trade-off for this resilience is increased ingestion latency during periods of high load, making this pattern best suited for analytics and telemetry workloads where near-real-time, not instantaneous, processing is acceptable.
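
As one concrete illustration of that centralized option, a coordinator watching cluster-wide ScyllaDB metrics could publish a single pressure flag that every function instance consults before fetching. A rough sketch, assuming ioredis; the key name and TTL are invented for illustration:

// function/globalThrottle.ts (hypothetical global throttle — ioredis, key name, and TTL are assumptions)
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL!);
const PRESSURE_KEY = 'scylla:global_pressure'; // written by an external monitor when cluster-wide latency is too high

// Returns true if the whole fleet should pause consumption, regardless of local inflight counts.
export const globalBackpressureActive = async (): Promise<boolean> => {
    const flag = await redis.get(PRESSURE_KEY);
    return flag === '1';
};

// The monitor process would set the flag with a short TTL so stale pressure signals expire on their own.
export const signalPressure = async (ttlSeconds = 30): Promise<void> => {
    await redis.set(PRESSURE_KEY, '1', 'EX', ttlSeconds);
};

Each instance would then check globalBackpressureActive() alongside its local inflight and latency checks, pausing all of its partitions while the flag is set.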

