Implementing an Asynchronous LLM Fine-Tuning Workflow with Vercel Functions and a Cilium-Secured Kafka Consumer


The initial requirement seemed straightforward: provide an API endpoint for users to submit datasets for LLM fine-tuning. The immediate temptation was to build a monolithic service. However, a synchronous API was a non-starter; fine-tuning can take hours, far exceeding the 60-second execution limit of Vercel Pro functions, let alone the standard 10 seconds. This immediately pushed us toward an asynchronous, decoupled architecture. The second constraint was security. In a multi-tenant environment, cross-tenant data access during the training process is an existential threat. We needed kernel-level guarantees of isolation, not just application-level checks. This led us down a path combining serverless, event streaming, and an eBPF-powered Kubernetes backend, all stitched together with a unified CI/CD pipeline.

The Architectural Pain Point and Initial Design

Our core problem was managing a workflow with two vastly different operational profiles. The front-end is a lightweight, stateless API endpoint that receives sporadic, bursty traffic. The back-end is a heavy, stateful, long-running compute task that requires GPU resources and strict network isolation.

A naive approach of putting everything into a single, always-on Kubernetes cluster would be cost-prohibitive: running idle services just to wait for sporadic API calls is wasteful. This is where a hybrid model became the obvious choice.

  1. API Ingestion: Vercel Functions. They scale to zero, are dirt cheap for low traffic, and integrate seamlessly with our Next.js frontend. They would serve as a simple webhook receiver, responsible only for validating the request and queueing the job.
  2. Job Queuing: Apache Kafka. We needed durability. If the training cluster went down, we couldn’t lose submitted jobs. While a simpler message queue might have worked, we chose Kafka with an eye toward the future. We envisioned streaming real-time training logs back to the user, and Kafka’s stream processing capabilities made it a more strategic, albeit more complex, choice.
  3. Compute Backend: A dedicated GPU-enabled Kubernetes cluster. This is the only place where it makes sense to run expensive, long-running training pods.
  4. Security Layer: Cilium. Standard Kubernetes NetworkPolicies operate at L3/L4 and, for anything outside the cluster, fall back to IP-based CIDR rules, which is clumsy and insufficient for cloud endpoints. We needed identity-based security. A pod running a job for tenant-A must be identifiable at the network layer as tenant-A’s pod and should only be allowed to talk to Kafka and tenant-A’s designated S3 bucket. Cilium, with its eBPF and service identity foundation, was designed for precisely this kind of zero-trust environment.
  5. Orchestration: GitHub Actions. We maintain a monorepo for this service. The pipeline must be intelligent enough to deploy the Vercel Function when API code changes and deploy the Kubernetes consumer when backend code changes, all from a single git push.

Here is the high-level data flow we settled on.

graph TD
    subgraph "User Environment"
        A[User via UI/CLI] --> B{POST /api/finetune};
    end

    subgraph "Vercel Edge Network"
        B -- Request --> C[Vercel Function];
    end

    subgraph "Messaging Infrastructure"
        C -- Job Payload --> D[Kafka Topic: 'finetuning-jobs'];
    end

    subgraph "Kubernetes GPU Cluster (Cilium-Secured)"
        E[Kafka Consumer Pod] -- Polls --> D;
        E -- Fetches Dataset --> F[External S3 Bucket];
        E -- Starts Training --> G[LLM Fine-Tuning Script];
        G -- Writes Model --> F;
    end

    style C fill:#000,stroke:#fff,stroke-width:2px
    style E fill:#467EEA,stroke:#fff,stroke-width:2px

Phase 1: The Serverless Ingress and Kafka Producer

The Vercel Function is the public entry point. Its only jobs are to validate the incoming payload and produce a message to Kafka. In a serverless context, managing persistent connections is a known anti-pattern. We opted to use the Upstash Kafka client, which is designed for this environment as it works over an HTTP/S REST API, avoiding TCP connection management issues.

A common mistake here is to put too much logic in the function. It should be lean. All it does is act as a secure gateway to our internal message bus.

Here is the implementation for pages/api/finetune.ts:

// pages/api/finetune.ts
import { NextApiRequest, NextApiResponse } from 'next';
import { Kafka } from '@upstash/kafka';
import { z } from 'zod';

// Zod schema for strict payload validation.
// A real-world project requires robust validation at the edge.
const FineTuneRequestSchema = z.object({
  tenantId: z.string().regex(/^[a-zA-Z0-9_-]{5,50}$/),
  modelId: z.string().min(1),
  datasetUrl: z.string().url(),
  callbackUrl: z.string().url().optional(),
});

type FineTuneRequest = z.infer<typeof FineTuneRequestSchema>;

// Initialize Kafka client using environment variables.
// In Vercel, these are configured in the project settings.
const kafka = new Kafka({
  url: process.env.UPSTASH_KAFKA_REST_URL!,
  username: process.env.UPSTASH_KAFKA_REST_USERNAME!,
  password: process.env.UPSTASH_KAFKA_REST_PASSWORD!,
});

const producer = kafka.producer();
const KAFKA_TOPIC = 'finetuning-jobs';

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  if (req.method !== 'POST') {
    res.setHeader('Allow', ['POST']);
    return res.status(405).end(`Method ${req.method} Not Allowed`);
  }

  // 1. Validate Input
  const validationResult = FineTuneRequestSchema.safeParse(req.body);
  if (!validationResult.success) {
    return res.status(400).json({
      message: 'Invalid request payload.',
      errors: validationResult.error.flatten().fieldErrors,
    });
  }

  const payload: FineTuneRequest = validationResult.data;
  const jobId = `job-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`;

  // 2. Produce to Kafka
  try {
    await producer.produce(KAFKA_TOPIC, {
        jobId: jobId,
        timestamp: new Date().toISOString(),
        ...payload,
      },
      {
        // Use tenantId as the key for partitioning.
        // This ensures all jobs for a given tenant go to the same partition,
        // which can be useful for ordered processing if needed later.
        key: payload.tenantId,
      }
    );
  } catch (error) {
    console.error('Failed to produce message to Kafka', {
      error,
      payload,
    });
    // A production system would have more robust error handling,
    // potentially with a retry mechanism or a dead-letter queue.
    return res.status(503).json({
      message: 'Service unavailable: Could not queue job.',
    });
  }

  // 3. Acknowledge Receipt
  return res.status(202).json({
    message: 'Fine-tuning job accepted.',
    jobId: jobId,
  });
}

The key takeaways here are:

  • Input Validation is Security: Using zod prevents malformed data from ever entering our system.
  • Stateless Connection: @upstash/kafka is crucial for performance and reliability in a serverless function. A traditional TCP-based Kafka client would have to re-establish broker connections and TLS sessions on every cold start, and risks exhausting broker connections as function instances scale up and down.
  • Meaningful Response: We return a 202 Accepted status code, which correctly signals that the request has been accepted for processing, but the processing is not yet complete. A jobId is returned for future status tracking.
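
Since the endpoint returns 202 Accepted with a jobId, callers have to treat submission as asynchronous and track completion separately. Below is a minimal Go sketch of such a client, matching the CLI path in the diagram above; the deployment URL, payload values, and response handling are illustrative assumptions, not production code.

// cmd/submit/main.go — hypothetical CLI client for the /api/finetune endpoint.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"
)

type fineTuneRequest struct {
	TenantID   string `json:"tenantId"`
	ModelID    string `json:"modelId"`
	DatasetURL string `json:"datasetUrl"`
}

type fineTuneResponse struct {
	Message string `json:"message"`
	JobID   string `json:"jobId"`
}

func main() {
	body, err := json.Marshal(fineTuneRequest{
		TenantID:   "tenant-a",
		ModelID:    "llama-3-8b",
		DatasetURL: "https://tenant-a-datasets.s3.amazonaws.com/corpus.jsonl",
	})
	if err != nil {
		log.Fatalf("marshal request: %v", err)
	}

	client := &http.Client{Timeout: 10 * time.Second}
	// Placeholder hostname; substitute the real Vercel deployment URL.
	resp, err := client.Post("https://example.vercel.app/api/finetune", "application/json", bytes.NewReader(body))
	if err != nil {
		log.Fatalf("submit job: %v", err)
	}
	defer resp.Body.Close()

	// 202 Accepted means the job was queued, not that training finished.
	if resp.StatusCode != http.StatusAccepted {
		log.Fatalf("unexpected status: %s", resp.Status)
	}

	var out fineTuneResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		log.Fatalf("decode response: %v", err)
	}
	fmt.Printf("queued job %s\n", out.JobID)
}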

Phase 2: The Kubernetes Kafka Consumer

This is the workhorse of the system. It’s a long-running process inside a Kubernetes cluster that polls Kafka for new messages. We chose Go for its performance, low memory footprint, and excellent concurrency primitives, which are ideal for a background worker.

The consumer’s logic is:

  1. Connect to Kafka and subscribe to the finetuning-jobs topic as part of a consumer group.
  2. In an infinite loop, poll for messages.
  3. When a message is received, deserialize and validate it.
  4. Execute the fine-tuning script (a Python script in our case, shelled out from the Go program).
  5. Commit the Kafka offset only after the job completes successfully.

Here is the core logic of the Go consumer, main.go:

// cmd/consumer/main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/exec"
	"strings"
	"time"

	"github.com/segmentio/kafka-go"
)

const (
	kafkaTopic     = "finetuning-jobs"
	consumerGroupID = "finetuning-workers"
	logPrefix      = "[KAFKA-CONSUMER] "
)

type JobPayload struct {
	JobID       string `json:"jobId"`
	TenantID    string `json:"tenantId"`
	ModelID     string `json:"modelId"`
	DatasetURL  string `json:"datasetUrl"`
	CallbackURL string `json:"callbackUrl,omitempty"`
}

func main() {
	brokers := os.Getenv("KAFKA_BROKERS")
	if brokers == "" {
		log.Fatal("KAFKA_BROKERS environment variable must be set")
	}

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        strings.Split(brokers, ","),
		GroupID:        consumerGroupID,
		Topic:          kafkaTopic,
		MinBytes:       10e3, // 10KB
		MaxBytes:       10e6, // 10MB
		CommitInterval: time.Second, // Flush offsets queued by CommitMessages roughly once per second.
		// With a CommitInterval of 0, CommitMessages commits synchronously before returning.
	})
	defer reader.Close()

	log.Printf(logPrefix+"Listening for jobs on topic: %s\n", kafkaTopic)

	for {
		// The FetchMessage call blocks until a message is available.
		msg, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Printf(logPrefix+"Error fetching message: %v\n", err)
			continue // In a real system, better error handling is needed.
		}

		log.Printf(logPrefix+"Received message for key '%s' at offset %d\n", string(msg.Key), msg.Offset)

		var payload JobPayload
		if err := json.Unmarshal(msg.Value, &payload); err != nil {
			log.Printf(logPrefix+"Error unmarshalling JSON: %v. Skipping message.\n", err)
			// Commit message to avoid reprocessing a poison pill.
			if err := reader.CommitMessages(context.Background(), msg); err != nil {
				log.Printf(logPrefix+"Failed to commit poison pill message: %v\n", err)
			}
			continue
		}

		// Process the job. In a real-world scenario, this would be more complex,
		// likely involving a state machine, database updates, and robust error handling.
		err = processFineTuningJob(payload)
		if err != nil {
			log.Printf(logPrefix+"Failed to process job %s for tenant %s: %v\n", payload.JobID, payload.TenantID, err)
			// Do NOT commit the message. Note that Kafka will not push it back to this live consumer;
			// the uncommitted offset is only re-read after a restart or a consumer-group rebalance.
			// This gives at-least-once delivery across restarts. A dead-letter queue is a must for production.
			continue
		}

		log.Printf(logPrefix+"Successfully processed job %s. Committing offset.\n", payload.JobID)
		if err := reader.CommitMessages(context.Background(), msg); err != nil {
			log.Printf(logPrefix+"Failed to commit message for job %s: %v\n", payload.JobID, err)
		}
	}
}

// processFineTuningJob shells out to a Python script to perform the actual work.
// This decouples the Go orchestrator from the Python ML code.
func processFineTuningJob(p JobPayload) error {
	log.Printf(logPrefix+"Starting job %s for tenant %s\n", p.JobID, p.TenantID)

	// In a production system, these parameters must be sanitized to prevent command injection.
	cmd := exec.Command("python", "/app/scripts/run_training.py",
		"--job-id", p.JobID,
		"--tenant-id", p.TenantID,
		"--model-id", p.ModelID,
		"--dataset-url", p.DatasetURL,
	)

	// Set environment variables for the script, such as credentials.
	// These would be mounted from Kubernetes secrets.
	cmd.Env = append(os.Environ(),
		fmt.Sprintf("S3_BUCKET_NAME=tenant-%s-models", p.TenantID),
	)

	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	err := cmd.Run()
	if err != nil {
		return fmt.Errorf("training script failed for job %s: %w", p.JobID, err)
	}

	log.Printf(logPrefix+"Training script completed for job %s\n", p.JobID)
	return nil
}
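
The command-injection comment above deserves more than a comment. Here is a minimal sketch of the kind of validation we would run before shelling out, mirroring the Zod rules enforced at the edge; it reuses the JobPayload type from main.go, and the regexes and allowed URL scheme are assumptions rather than the exact production rules. Note that exec.Command passes arguments directly without a shell, so classic shell injection is not the main risk; the concern is that unvalidated values flow into the training script, bucket names, and any subprocesses it spawns.

// cmd/consumer/validate.go — defensive checks before passing payload fields to exec.Command.
package main

import (
	"fmt"
	"net/url"
	"regexp"
)

// Mirror the edge-side Zod constraints so a replayed or hand-crafted Kafka message
// still cannot smuggle unexpected characters or schemes into the training invocation.
var (
	tenantIDPattern = regexp.MustCompile(`^[a-zA-Z0-9_-]{5,50}$`)
	modelIDPattern  = regexp.MustCompile(`^[a-zA-Z0-9._/-]{1,100}$`)
)

func validateJobPayload(p JobPayload) error {
	if !tenantIDPattern.MatchString(p.TenantID) {
		return fmt.Errorf("invalid tenantId: %q", p.TenantID)
	}
	if !modelIDPattern.MatchString(p.ModelID) {
		return fmt.Errorf("invalid modelId: %q", p.ModelID)
	}
	u, err := url.Parse(p.DatasetURL)
	if err != nil {
		return fmt.Errorf("invalid datasetUrl: %w", err)
	}
	if u.Scheme != "https" || u.Host == "" {
		return fmt.Errorf("datasetUrl must be an https URL, got %q", p.DatasetURL)
	}
	return nil
}

processFineTuningJob would call validateJobPayload first and refuse the job (routing it to a dead-letter path) if validation fails.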

The associated Kubernetes Deployment manifest would look something like this:

# k8s/consumer-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-finetune-consumer
  namespace: ml-workers
  labels:
    app: llm-finetune-consumer
spec:
  replicas: 3 # Start with a few replicas for parallel processing
  selector:
    matchLabels:
      app: llm-finetune-consumer
  template:
    metadata:
      labels:
        app: llm-finetune-consumer
        # This label is critical for Cilium policy enforcement
        # It establishes the pod's identity.
        identity: llm-worker
    spec:
      containers:
      - name: consumer
        image: ghcr.io/your-org/llm-consumer:latest
        env:
        - name: KAFKA_BROKERS
          valueFrom:
            secretKeyRef:
              name: kafka-credentials
              key: brokers
        # Mount other secrets for S3, etc.
        # ...
        resources:
          limits:
            cpu: "1"
            memory: "2Gi"
          requests:
            cpu: "500m"
            memory: "1Gi"
      # In a real setup, you would add node selectors/tolerations
      # to schedule these pods on GPU-enabled nodes.
      # nodeSelector:
      #   nvidia.com/gpu: "true"

Phase 3: Kernel-Level Security with Cilium

This is the most critical part of the architecture for a multi-tenant system. We need to enforce a strict “default deny” policy in our ml-workers namespace and then selectively allow only the necessary traffic for the consumer pods.

Our security goals are:

  1. Pods cannot talk to each other within the namespace.
  2. Pods can ONLY initiate outbound connections to our Kafka brokers and to Amazon S3.
  3. Pods can ONLY receive inbound connections from the Kafka brokers.
  4. Crucially, the S3 access must be restricted. A pod processing a job for tenant-A must never be able to even resolve the DNS for or route packets to a bucket belonging to tenant-B.

Cilium’s CiliumNetworkPolicy CRD makes this possible. Here’s our policy set.

First, a default deny policy for the entire namespace:

# k8s/policies/00-default-deny.yaml
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: "default-deny-all"
  namespace: ml-workers
spec:
  endpointSelector: {} # An empty selector matches all pods in the namespace
  ingress: [] # Empty list means deny all ingress
  egress: []  # Empty list means deny all egress

Now, the specific policy for our workers. The pitfall here is dealing with cloud service endpoints like S3, which use vast, changing IP ranges. IP-based egress rules are a non-starter. Cilium’s DNS-based policy is the correct solution. It allows egress based on DNS query patterns, which Cilium’s eBPF programs can intercept and enforce at the kernel level.

# k8s/policies/01-worker-policy.yaml
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: "allow-llm-worker-traffic"
  namespace: ml-workers
spec:
  description: "Allow LLM workers to talk to Kafka and S3 only"
  endpointSelector:
    matchLabels:
      identity: llm-worker # This policy applies only to our consumer pods

  # Ingress: Allow traffic FROM Kafka
  ingress:
  - fromEndpoints:
    # This assumes Kafka brokers are in a different namespace ('kafka')
    # and have the label 'app: kafka'. This is identity-based, not IP-based.
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": kafka
        "k8s:app": kafka
    toPorts:
    - ports:
      - port: "9092"
        protocol: TCP

  # Egress: Allow traffic TO Kafka and S3
  egress:
  # 1. Allow egress to Kafka brokers
  - toEndpoints:
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": kafka
        "k8s:app": kafka
    toPorts:
    - ports:
      - port: "9092"
        protocol: TCP

  # 2. Allow egress to AWS S3 via DNS name matching.
  # This is the key for secure cloud service access.
  - toFQDNs:
    - matchPattern: "*.s3.amazonaws.com"
    toPorts:
    - ports:
      - port: "443"
        protocol: TCP

  # 3. Allow egress for DNS queries themselves.
  # The 'toEndpoints' selector targets the CoreDNS/kube-dns pods.
  - toEndpoints:
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": kube-system
        "k8s:k8s-app": kube-dns
    toPorts:
    - ports:
      - port: "53"
        protocol: ANY
      rules:
        dns:
        - matchPattern: "*" # Allow any DNS query

With these policies applied, a compromised container in the ml-workers namespace can reach only Kafka, cluster DNS, and hosts matching the S3 pattern; any other destination fails at DNS resolution or connection setup, enforced by eBPF in the kernel long before the traffic reaches the application layer. Note that the wildcard *.s3.amazonaws.com rule on its own does not fully achieve goal #4: a tenant-A pod could still resolve and connect to tenant-b-models.s3.amazonaws.com. Closing that gap means narrowing the toFQDNs rule into per-tenant policies (for example, matchName: "tenant-a-models.s3.amazonaws.com" applied to pods carrying a per-tenant label) and pairing them with per-tenant S3 credentials. Even so, kernel-level enforcement provides a much stronger security guarantee than application-level logic alone.
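
To convince ourselves the policy does what we think, a quick connectivity probe run from inside a worker pod is useful. Below is a minimal Go sketch under our assumptions: one target covered by the toFQDNs rule and one arbitrary external host that should be caught by the default deny. We expect the blocked host to fail at DNS or connect time rather than with an HTTP error.

// probe.go — run inside a worker pod (e.g. via kubectl exec) to spot-check egress policy.
package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{Timeout: 5 * time.Second}

	targets := []string{
		"https://tenant-a-models.s3.amazonaws.com", // allowed by the *.s3.amazonaws.com rule
		"https://example.com",                      // should be blocked by default deny
	}

	for _, target := range targets {
		start := time.Now()
		resp, err := client.Get(target)
		if err != nil {
			// Policy-blocked hosts surface here as DNS failures or dial timeouts.
			fmt.Printf("BLOCKED  %-45s (%v after %s)\n", target, err, time.Since(start).Round(time.Millisecond))
			continue
		}
		resp.Body.Close()
		// The S3 endpoint will answer with 403/404, which is fine: the point is that the connection succeeded.
		fmt.Printf("ALLOWED  %-45s (HTTP %d)\n", target, resp.StatusCode)
	}
}

Watching something like cilium monitor --type drop on the node's Cilium agent while the probe runs should show the corresponding policy drops for the blocked host.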

Phase 4: The Unified CI/CD Pipeline with GitHub Actions

The final piece is automating the deployment of this hybrid system. We need a single workflow that can handle both the serverless and Kubernetes components.

The strategy:

  • Use path-based filtering to decide which jobs run. A small change-detection job determines what was touched: changes under pages/api trigger the Vercel deploy, while changes under cmd/consumer or k8s trigger the Kubernetes deploy.
  • Use GitHub Environments for secrets management (e.g., VERCEL_TOKEN for production, KUBE_CONFIG for staging).
  • Build and push the consumer Docker image to GitHub Container Registry (GHCR).

Here’s a condensed version of the .github/workflows/deploy.yml:

# .github/workflows/deploy.yml
name: Deploy Service

on:
  push:
    branches:
      - main
  workflow_dispatch:

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}/llm-consumer

jobs:
  # The push event payload does not expose a per-commit 'files' field, so we
  # detect changed paths with a dedicated filter job and gate the deploys on it.
  changes:
    name: Detect Changed Paths
    runs-on: ubuntu-latest
    outputs:
      api: ${{ steps.filter.outputs.api }}
      backend: ${{ steps.filter.outputs.backend }}
    steps:
      - uses: actions/checkout@v3
      - uses: dorny/paths-filter@v2
        id: filter
        with:
          filters: |
            api:
              - 'pages/api/**'
            backend:
              - 'cmd/**'
              - 'k8s/**'

  deploy-vercel:
    name: Deploy Vercel Function
    runs-on: ubuntu-latest
    needs: changes
    # Only run if files in the api directory changed (or on a manual dispatch)
    if: needs.changes.outputs.api == 'true' || github.event_name == 'workflow_dispatch'
    steps:
      - uses: actions/checkout@v3

      - name: Setup Node.js
        uses: actions/setup-node@v3
        with:
          node-version: 18

      - name: Install Vercel CLI
        run: npm install --global vercel@latest

      - name: Deploy to Vercel
        run: vercel --prod --token ${{ secrets.VERCEL_TOKEN }} --yes

  build-and-deploy-k8s:
    name: Build and Deploy K8s Consumer
    runs-on: ubuntu-latest
    needs: changes
    # Only run if backend code or k8s manifests changed (or on a manual dispatch)
    if: needs.changes.outputs.backend == 'true' || github.event_name == 'workflow_dispatch'
    permissions:
      contents: read
      packages: write # Needed to push to GHCR
    
    steps:
      - uses: actions/checkout@v3

      - name: Log in to GitHub Container Registry
        uses: docker/login-action@v2
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata (tags, labels) for Docker
        id: meta
        uses: docker/metadata-action@v4
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

      - name: Build and push Docker image
        uses: docker/build-push-action@v4
        with:
          context: .
          file: ./Dockerfile.consumer # Assuming a dedicated Dockerfile
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      - name: Setup Kubeconfig
        uses: azure/k8s-set-context@v3
        with:
          method: kubeconfig
          kubeconfig: ${{ secrets.KUBECONFIG_PROD }}

      - name: Deploy to Kubernetes
        run: |
          # A real-world deployment would use Kustomize or Helm
          # to manage environment-specific configurations.
          # This is a simplified example.
          IMAGE_TAG=${{ steps.meta.outputs.version }}
          sed -i "s|image:.*|image: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${IMAGE_TAG}|g" k8s/consumer-deployment.yaml
          
          kubectl apply -f k8s/policies/
          kubectl apply -f k8s/consumer-deployment.yaml
          
          # Trigger a rolling update
          kubectl rollout restart deployment/llm-finetune-consumer -n ml-workers

Current Limitations and Future Outlook

This architecture, while robust, is not without its shortcomings. The current model of one-pod-per-consumer-instance is inefficient for managing distinct jobs. If three jobs arrive simultaneously, our three replica pods will pick them up. If ten jobs arrive, seven will wait in Kafka. A more sophisticated approach would be to have a single “dispatcher” consumer that, upon receiving a message, creates a Kubernetes Job resource for each fine-tuning task. This would provide better resource utilization and job lifecycle management.
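
A sketch of that dispatcher pattern using client-go is shown below. The namespace, labels, trainer image, and the decision to disable Job-level retries are assumptions layered on top of the existing JobPayload type, not the current implementation.

// dispatch.go — create one Kubernetes Job per fine-tuning task instead of training inline.
package main

import (
	"context"
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func dispatchJob(ctx context.Context, p JobPayload) error {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return fmt.Errorf("in-cluster config: %w", err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return fmt.Errorf("clientset: %w", err)
	}

	backoffLimit := int32(0) // rely on Kafka redelivery / DLQ handling instead of Job retries
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("finetune-%s", p.JobID),
			Namespace: "ml-workers",
			Labels: map[string]string{
				"identity": "llm-worker", // keep the Cilium policy identity
				"tenant":   p.TenantID,
			},
		},
		Spec: batchv1.JobSpec{
			BackoffLimit: &backoffLimit,
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"identity": "llm-worker", "tenant": p.TenantID},
				},
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name:  "trainer",
						Image: "ghcr.io/your-org/llm-trainer:latest", // illustrative image name
						Args: []string{
							"--job-id", p.JobID,
							"--tenant-id", p.TenantID,
							"--model-id", p.ModelID,
							"--dataset-url", p.DatasetURL,
						},
					}},
				},
			},
		},
	}

	_, err = clientset.BatchV1().Jobs("ml-workers").Create(ctx, job, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("create job: %w", err)
	}
	return nil
}

The dispatcher would commit the Kafka offset as soon as the Job is created, and job completion would be tracked by watching Job status rather than by blocking the consumer for hours.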

Furthermore, our observability is fragmented. We have Vercel logs, Kafka metrics, and Kubernetes pod logs, but no unified trace that follows a single jobId from the initial API call to the final model artifact being written to S3. The next logical iteration is to implement OpenTelemetry across the entire stack—instrumenting the Vercel Function, propagating trace contexts through Kafka message headers, and continuing the trace in the Go consumer. This would provide invaluable insight into performance bottlenecks and error sources.
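
On the consumer side, continuing a trace is mostly a matter of treating Kafka headers as the propagation carrier. Here is a minimal sketch using the OpenTelemetry Go API and kafka-go headers, assuming the producer attaches a W3C traceparent header to each message; the function names and package layout are illustrative.

// cmd/consumer/tracing.go — extract a trace context from Kafka headers and start a span.
package main

import (
	"context"

	"github.com/segmentio/kafka-go"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// kafkaHeaderCarrier adapts kafka-go message headers to OpenTelemetry's TextMapCarrier.
type kafkaHeaderCarrier struct {
	msg *kafka.Message
}

func (c kafkaHeaderCarrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

func (c kafkaHeaderCarrier) Set(key, value string) {
	c.msg.Headers = append(c.msg.Headers, kafka.Header{Key: key, Value: []byte(value)})
}

func (c kafkaHeaderCarrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

// spanForMessage continues the trace started at the API layer, if a context is present.
// Assumes otel.SetTextMapPropagator(propagation.TraceContext{}) and a TracerProvider
// have been configured at startup; otherwise the extract is a no-op.
func spanForMessage(msg *kafka.Message, jobID string) (context.Context, trace.Span) {
	ctx := otel.GetTextMapPropagator().Extract(context.Background(), kafkaHeaderCarrier{msg: msg})
	ctx, span := otel.Tracer("finetune-consumer").Start(ctx, "process-finetuning-job")
	span.SetAttributes(attribute.String("job.id", jobID))
	return ctx, span
}

In the consumer loop, spanForMessage would be called right after FetchMessage, and the span ended once the job is dispatched or fails.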

Finally, the Kafka consumer’s error handling—simply failing and relying on redelivery—is primitive. For jobs that consistently fail (“poison pills”), this would create an endless loop. A dead-letter queue (DLQ) strategy is essential for production, where failed messages are shunted to a separate topic for manual inspection, preventing them from blocking the main processing pipeline.
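
A sketch of what that could look like with kafka-go: after a bounded number of attempts, the consumer publishes the raw message to a dead-letter topic and commits the original offset so the partition keeps moving. The topic name and header names below are assumptions.

// cmd/consumer/dlq.go — publish repeatedly failing messages to a dead-letter topic.
package main

import (
	"context"
	"fmt"
	"os"
	"strings"

	"github.com/segmentio/kafka-go"
)

const dlqTopic = "finetuning-jobs-dlq" // assumed topic name

func newDLQWriter() *kafka.Writer {
	brokers := strings.Split(os.Getenv("KAFKA_BROKERS"), ",")
	return &kafka.Writer{
		Addr:     kafka.TCP(brokers...),
		Topic:    dlqTopic,
		Balancer: &kafka.Hash{}, // keep per-tenant keys on stable partitions
	}
}

// sendToDLQ copies the failed message to the DLQ, annotated with the error,
// so the original offset can be committed and the partition is not blocked.
func sendToDLQ(ctx context.Context, w *kafka.Writer, msg kafka.Message, procErr error) error {
	dlqMsg := kafka.Message{
		Key:   msg.Key,
		Value: msg.Value,
		Headers: append(msg.Headers,
			kafka.Header{Key: "x-dlq-error", Value: []byte(procErr.Error())},
			kafka.Header{Key: "x-dlq-source-topic", Value: []byte(msg.Topic)},
			kafka.Header{Key: "x-dlq-source-offset", Value: []byte(fmt.Sprintf("%d", msg.Offset))},
		),
	}
	return w.WriteMessages(ctx, dlqMsg)
}

The consumer loop would call sendToDLQ after a bounded number of failed attempts and then commit the original message, so one bad payload cannot stall its partition.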

