Building a Resilient Distributed Job Executor Using CockroachDB CDC and Direct containerd Integration


Our initial system for running nightly data processing jobs was held together with cron and bash scripts. It was a classic single-point-of-failure architecture that caused silent failures and frequent manual interventions. When the host machine running a critical job died, the job simply vanished. We needed a system where the state of every job was durable and consistently managed, and where the entire job lifecycle was auditable. The goal was a custom, lightweight orchestrator without the operational overhead of a full-blown Kubernetes cluster, which was overkill for our bare-metal environment.

The core pain point was state management. We needed a database that could survive node failures without manual failover. This immediately ruled out traditional single-primary databases that depend on external failover tooling. We selected CockroachDB because survivability is a core design principle: every range of data is replicated with the Raft consensus protocol. It provides serializable isolation, so our scheduler never acts on an inconsistent view of the world. During a network partition or node outage, replicas that cannot reach a quorum stop accepting writes rather than diverge.

For execution, we decided to interface directly with containerd on our worker nodes. This gives us precise control over the container lifecycle without the layers of abstraction present in Docker or Kubernetes. The control plane—an API for defining and monitoring jobs—was a natural fit for Django, given its robust ORM and our team’s familiarity with it.

The final piece of the puzzle was auditing. We couldn’t afford to store an infinite history of job state transitions in our primary operational database. This is where CockroachDB’s Change Data Capture (CDC) became the lynchpin of the design. We could stream every single row change from our jobs table to a webhook, process it, and archive it in a data lake. This provides a completely decoupled, immutable log of the entire system’s operation for analytics and post-mortem analysis.

graph TD
    subgraph Control Plane
        A[User/API Client] -- HTTP API --> B(Django Application)
        B -- SQL (pgwire) --> C{CockroachDB Cluster}
    end

    subgraph Scheduler & Workers
        D(Scheduler Process) -- Reads PENDING jobs --> C
        D -- gRPC/HTTP --> E[Worker Node Agent]
        E -- containerd API --> F(containerd daemon)
        F -- Manages --> G[Job Container]
    end

    subgraph Audit Trail via CDC
        C -- CDC changefeed --> H(CDC Webhook Endpoint)
        H -- JSON Payload --> I(CDC Ingestion Service)
        I -- Writes Parquet --> J[(Data Lake - S3/MinIO)]
    end

    style C fill:#4CAF50,color:#fff
    style J fill:#FF9800,color:#fff

Part 1: The Core State Model in CockroachDB via Django

The foundation of the system is the state model, and keeping it simple and robust is critical. We defined two primary models: Job, representing the template for a task, and JobExecution, representing a specific run of that job.

The Job model stores the definition: the image to run, the command, and scheduling information. The JobExecution is the state machine. Its status field is the single source of truth for the entire system.

Here are the Django models. Notice the use of models.TextChoices for status, which is a clean way to manage enumerated states.

orchestrator/jobs/models.py:

import uuid
from django.db import models
from django.utils import timezone

class Job(models.Model):
    """
    Represents a job template that can be scheduled for execution.
    """
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    # unique=True already creates an index, so db_index=True is not needed.
    name = models.CharField(max_length=255, unique=True)
    container_image = models.CharField(max_length=512)
    command = models.JSONField(
        help_text="Command to run in the container, e.g., ['python', 'main.py']"
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def __str__(self):
        return self.name

class JobExecution(models.Model):
    """
    Represents a single run of a Job. This is our state machine.
    """
    class Status(models.TextChoices):
        PENDING = 'PENDING', 'Pending'
        SCHEDULED = 'SCHEDULED', 'Scheduled'
        RUNNING = 'RUNNING', 'Running'
        COMPLETED = 'COMPLETED', 'Completed'
        FAILED = 'FAILED', 'Failed'

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    job = models.ForeignKey(Job, on_delete=models.CASCADE, related_name='executions')
    status = models.CharField(
        max_length=20,
        choices=Status.choices,
        default=Status.PENDING,
        db_index=True
    )
    
    # Execution details
    worker_node_id = models.CharField(max_length=255, null=True, blank=True, db_index=True)
    container_id = models.CharField(max_length=255, null=True, blank=True)
    
    # Timestamps
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    scheduled_at = models.DateTimeField(null=True, blank=True)
    started_at = models.DateTimeField(null=True, blank=True)
    finished_at = models.DateTimeField(null=True, blank=True)
    
    # Store exit code and logs for diagnostics
    exit_code = models.IntegerField(null=True, blank=True)
    logs_tail = models.TextField(null=True, blank=True)

    def __str__(self):
        return f"{self.job.name} - {self.id} [{self.status}]"

    class Meta:
        ordering = ['-created_at']
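
The models above store the status but do not restrict which transitions are legal. As a minimal sketch of how the state machine could be made explicit, the table below lists one plausible set of transitions. The table itself, the helper names, and the SCHEDULED -> PENDING edge (for re-queuing an execution that never started) are assumptions for illustration, not part of the original models.

```python
# Hypothetical transition table for JobExecution.status.
# The article's models do not enforce this; it is an illustrative assumption.
ALLOWED_TRANSITIONS = {
    "PENDING": {"SCHEDULED"},
    # SCHEDULED -> PENDING lets a reconciler re-queue a job that never started.
    "SCHEDULED": {"RUNNING", "FAILED", "PENDING"},
    "RUNNING": {"COMPLETED", "FAILED"},
    "COMPLETED": set(),  # terminal
    "FAILED": set(),     # terminal
}


def can_transition(current: str, new: str) -> bool:
    """Return True if moving from `current` to `new` is a legal state change."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


def advance(current: str, new: str) -> str:
    """Return the new status, or raise ValueError for an illegal transition."""
    if not can_transition(current, new):
        raise ValueError(f"Illegal transition {current} -> {new}")
    return new
```

A check like this could live in JobExecution.save() or in the code paths that update the status, so that a bug in the scheduler cannot move a COMPLETED execution back to RUNNING.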

Connecting Django to CockroachDB is straightforward, as CockroachDB is compatible with the PostgreSQL wire protocol. The django-cockroachdb package provides the database backend on top of a PostgreSQL driver (psycopg2 in our case). A common mistake here is to assume that the backend retries transactions for you. CockroachDB runs transactions under serializable isolation and may abort one under contention with a retryable error (SQLSTATE 40001). Neither psycopg2 nor the Django backend retries these automatically, so code that writes to the database needs an application-level retry loop.

orchestrator/settings.py:

# ... other settings

# Ensure you have `django-cockroachdb` installed: pip install django-cockroachdb
# (pick the release that matches your Django version)
DATABASES = {
    'default': {
        'ENGINE': 'django_cockroachdb',
        'NAME': 'orchestrator',
        'USER': 'your_user',
        'PASSWORD': 'your_password',
        'HOST': 'localhost',  # Or your CRDB load balancer
        'PORT': '26257',
        'OPTIONS': {
            'sslmode': 'verify-full',
            'sslrootcert': '/path/to/ca.crt',
            # Labels this service's sessions in CockroachDB's SQL activity views.
            # It does NOT enable retries: 40001 serialization errors must be
            # retried by the application itself.
            'application_name': 'django_orchestrator'
        },
    }
}

# ... rest of settings
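
Since 40001 errors have to be retried by the application, a small helper is useful. The following is a minimal sketch, not part of the original code base: run_with_retry and its parameters are hypothetical names, and it assumes the driver error exposes the SQLSTATE as a pgcode attribute (as psycopg2 does), either directly or on the __cause__ of the exception Django raises.

```python
import random
import time

SERIALIZATION_FAILURE = "40001"  # SQLSTATE for a retryable transaction error


def run_with_retry(fn, max_attempts=5, base_delay=0.1, sleep=time.sleep):
    """Call fn() and retry it with exponential backoff on SQLSTATE 40001.

    fn should wrap exactly one transaction and be safe to run again from
    the start, because a retry re-executes the whole callable.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Django wraps driver errors and keeps the original as __cause__.
            cause = exc.__cause__ or exc
            code = getattr(cause, "pgcode", None)
            if code != SERIALIZATION_FAILURE or attempt == max_attempts:
                raise
            # Exponential backoff with jitter to avoid retrying in lockstep.
            delay = base_delay * (2 ** (attempt - 1)) * (0.5 + random.random())
            sleep(delay)
```

In the orchestrator, fn would typically be a function whose body is a single `with transaction.atomic():` block. Side effects outside the database, such as an RPC to a worker, should stay out of the retried callable.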

Part 2: The Worker Agent with containerd

Each worker node runs a small agent. This agent exposes a simple API that the central scheduler calls to manage containers. We chose to build this with FastAPI for its speed and simplicity, and it talks to containerd over its gRPC API using Python stubs generated from containerd’s protobuf definitions. containerd’s first-party client is written in Go, so there is no official Python client to lean on. The pitfall here is underestimating the complexity of the containerd API. It’s powerful but low-level.

This agent needs to handle image pulling, container creation, starting, stopping, and log streaming. Production-grade code must include robust error handling for each of these steps.

worker/agent.py:

import asyncio
import json
import logging
from contextlib import asynccontextmanager

import grpc
from containerd.services.containers.v1 import containers_pb2, containers_pb2_grpc
from containerd.services.images.v1 import images_pb2, images_pb2_grpc
from containerd.services.tasks.v1 import tasks_pb2, tasks_pb2_grpc
from fastapi import FastAPI, HTTPException
from google.protobuf.any_pb2 import Any
from pydantic import BaseModel

# NOTE: the module paths above assume grpcio stubs generated from containerd's
# .proto files (for example the community `containerd` package on PyPI).
# Adjust them to match your generated code.

# --- Configuration ---
CONTAINERD_SOCK = "/run/containerd/containerd.sock"
NAMESPACE = "default"
# Every containerd API call must carry its namespace as gRPC metadata.
NS_METADATA = (("containerd-namespace", NAMESPACE),)
# Type URL under which containerd expects a JSON-encoded OCI runtime spec.
OCI_SPEC_TYPE_URL = "types.containerd.io/opencontainers/runtime-spec/1/Spec"

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- gRPC Clients ---
# Created in lifespan() so the channel is bound to the server's event loop.
channel = None
containers_client = None
images_client = None
tasks_client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global channel, containers_client, images_client, tasks_client
    logger.info("Worker agent starting up...")
    channel = grpc.aio.insecure_channel(f"unix://{CONTAINERD_SOCK}")
    containers_client = containers_pb2_grpc.ContainersStub(channel)
    images_client = images_pb2_grpc.ImagesStub(channel)
    tasks_client = tasks_pb2_grpc.TasksStub(channel)
    yield
    logger.info("Worker agent shutting down...")
    await channel.close()

app = FastAPI(lifespan=lifespan)

class ContainerSpec(BaseModel):
    execution_id: str
    image: str
    command: list[str]

async def pull_image_if_not_exists(image_name: str):
    """Pulls a container image, handling potential failures."""
    try:
        await images_client.Get(images_pb2.GetImageRequest(name=image_name), metadata=NS_METADATA)
        logger.info(f"Image '{image_name}' already exists locally.")
        return
    except grpc.aio.AioRpcError as e:
        if e.code() != grpc.StatusCode.NOT_FOUND:
            logger.error(f"Image lookup failed for '{image_name}': {e.details()}")
            raise HTTPException(status_code=500, detail=f"Image lookup failed: {e.details()}")

    logger.info(f"Image '{image_name}' not found. Pulling...")
    # containerd's Images gRPC service has no pull RPC; pulling (resolve, fetch,
    # unpack) is implemented client-side. As a stand-in, this sketch shells out
    # to the `ctr` CLI that ships with containerd.
    proc = await asyncio.create_subprocess_exec(
        "ctr", "--address", CONTAINERD_SOCK, "--namespace", NAMESPACE,
        "images", "pull", image_name,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        message = stderr.decode("utf-8", errors="replace").strip()
        logger.error(f"Failed to pull image '{image_name}': {message}")
        raise HTTPException(status_code=500, detail=f"Image pull failed: {message}")
    logger.info(f"Successfully pulled image '{image_name}'.")

@app.post("/containers/create_and_run")
async def create_and_run_container(spec: ContainerSpec):
    """
    The core endpoint to create and start a container for a job execution.
    This function demonstrates the multi-step process in containerd.
    """
    logger.info(f"Received request to run execution {spec.execution_id} with image {spec.image}")

    await pull_image_if_not_exists(spec.image)

    # Step 1: Confirm the image is visible to containerd. A fuller implementation
    # would read the image config (env, entrypoint, working dir) from here.
    try:
        await images_client.Get(images_pb2.GetImageRequest(name=spec.image), metadata=NS_METADATA)
    except grpc.aio.AioRpcError as e:
        logger.error(f"Could not get image metadata for {spec.image}: {e.details()}")
        raise HTTPException(status_code=404, detail=f"Image metadata not found: {e.details()}")

    # Step 2: Create an OCI runtime spec
    # This is a very minimal spec. Production use would require configuring
    # networking, resource limits, volumes, etc.
    oci_spec = {
        "ociVersion": "1.0.2",
        "process": {"args": spec.command, "cwd": "/"},
        "root": {"path": "rootfs"},  # Relative to the container's bundle directory
    }
    # containerd carries the spec as JSON inside a protobuf Any.
    spec_any = Any(type_url=OCI_SPEC_TYPE_URL, value=json.dumps(oci_spec).encode("utf-8"))

    # Step 3: Create the container object in containerd
    container_id = f"exec-{spec.execution_id}"
    container = containers_pb2.Container(
        id=container_id,
        image=spec.image,
        spec=spec_any,
        runtime=containers_pb2.Container.Runtime(name="io.containerd.runc.v2"),
    )

    try:
        create_req = containers_pb2.CreateContainerRequest(container=container)
        await containers_client.Create(create_req, metadata=NS_METADATA)
        logger.info(f"Created container '{container_id}' for execution {spec.execution_id}")
    except grpc.aio.AioRpcError as e:
        # Only proceed if the container is left over from a previous attempt.
        if e.code() != grpc.StatusCode.ALREADY_EXISTS:
            logger.error(f"Container creation failed for '{container_id}': {e.details()}")
            raise HTTPException(status_code=500, detail=f"Failed to create container: {e.details()}")
        logger.warning(f"Container '{container_id}' already exists. Reusing it.")

    # Step 4: Create and start the task (the running process)
    # NOTE: this sketch passes no rootfs mounts. A working agent must first prepare
    # a snapshot of the image and pass its mounts in CreateTaskRequest.rootfs.
    try:
        task_req = tasks_pb2.CreateTaskRequest(container_id=container_id)
        await tasks_client.Create(task_req, metadata=NS_METADATA)

        start_req = tasks_pb2.StartRequest(container_id=container_id)
        await tasks_client.Start(start_req, metadata=NS_METADATA)

        logger.info(f"Started task for container '{container_id}'")
        return {"container_id": container_id, "status": "RUNNING"}
    except grpc.aio.AioRpcError as e:
        logger.error(f"Failed to create or start task for container '{container_id}': {e.details()}")
        # Clean up the container object; a cleanup failure must not mask the real error.
        try:
            delete_req = containers_pb2.DeleteContainerRequest(id=container_id)
            await containers_client.Delete(delete_req, metadata=NS_METADATA)
        except grpc.aio.AioRpcError as cleanup_err:
            logger.warning(f"Cleanup of container '{container_id}' failed: {cleanup_err.details()}")
        raise HTTPException(status_code=500, detail=f"Failed to start task: {e.details()}")

# ... other endpoints for status, logs, stop would be needed ...

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)

Part 3: The Scheduler Brain

The scheduler is a continuously running process that polls CockroachDB for PENDING jobs. Its logic is simple but must be robust. We implemented it as a Django management command.

A critical design decision here is how to handle transactions. The scheduler’s loop should be:

  1. BEGIN a transaction.
  2. SELECT a PENDING job, locking the row for update (SELECT ... FOR UPDATE).
  3. UPDATE the job’s status to SCHEDULED.
  4. COMMIT the transaction.
  5. After the commit, make the RPC call to the worker.

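This ensures that no two scheduler instances can pick up the same job. Because the RPC happens only after the commit, a scheduler that crashes between steps 4 and 5 leaves the execution in SCHEDULED with no container behind it. A separate reconciliation process must detect such executions and either re-dispatch or fail them. RPC errors that the scheduler does observe are handled inline by marking the execution FAILED.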
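
The reconciliation pass is not shown in this article. A minimal sketch of its core logic follows, written against plain dataclasses so it runs without a database. ExecutionRow, find_stale_scheduled, reconcile and the two-minute timeout are all hypothetical; in the real system this would be a queryset filter on status and updated_at followed by an update inside a transaction.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class ExecutionRow:
    """Stand-in for the JobExecution columns the reconciler needs."""
    id: str
    status: str
    updated_at: datetime


def find_stale_scheduled(rows, now, timeout=timedelta(minutes=2)):
    """Return executions that have sat in SCHEDULED for longer than `timeout`."""
    return [
        row for row in rows
        if row.status == "SCHEDULED" and now - row.updated_at > timeout
    ]


def reconcile(rows, now, timeout=timedelta(minutes=2)):
    """Reset stale SCHEDULED executions to PENDING and return their ids."""
    stale = find_stale_scheduled(rows, now, timeout)
    for row in stale:
        row.status = "PENDING"
        row.updated_at = now
    return [row.id for row in stale]
```

Re-queuing is only safe if dispatch is idempotent on the worker side, for example because the container id is derived from the execution id. Otherwise the reconciler should mark such executions FAILED instead.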

orchestrator/jobs/management/commands/run_scheduler.py:

import time
import logging
import requests
from django.core.management.base import BaseCommand
from django.db import transaction
from orchestrator.jobs.models import JobExecution

logger = logging.getLogger(__name__)

# In a real system, this would come from a service discovery mechanism.
WORKER_NODES = {
    "worker-01": "http://localhost:8001",
}

class Command(BaseCommand):
    help = 'Runs the main job scheduler loop.'

    def handle(self, *args, **options):
        logger.info("Scheduler process started. Polling for jobs...")
        while True:
            try:
                self.schedule_one_job()
            except Exception as e:
                logger.error(f"Scheduler loop encountered an error: {e}", exc_info=True)
            
            time.sleep(5) # Polling interval

    def select_worker(self):
        # Placeholder: always returns the first configured worker. A real
        # implementation needs to check worker health, load, capabilities, etc.
        worker_id, worker_url = next(iter(WORKER_NODES.items()))
        return worker_id, worker_url

    def schedule_one_job(self):
        execution_to_run = None
        
        # This transaction is the heart of preventing double-scheduling.
        with transaction.atomic():
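            # Find the oldest pending job and lock it to prevent other schedulers from picking it up.
            # `select_for_update(skip_locked=True)` is crucial for concurrency.
            # It tells the database to skip any rows that are already locked by another transaction.
            # The explicit order_by overrides Meta.ordering ('-created_at'), which would
            # otherwise hand out the newest execution first and starve older ones.
            job_exec = JobExecution.objects.select_for_update(skip_locked=True).filter(
                status=JobExecution.Status.PENDING
            ).order_by('created_at').first()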

            if not job_exec:
                return # No jobs to schedule

            worker_id, worker_url = self.select_worker()
            
            job_exec.status = JobExecution.Status.SCHEDULED
            job_exec.worker_node_id = worker_id
            job_exec.save(update_fields=['status', 'worker_node_id', 'updated_at'])
            
            execution_to_run = job_exec

        if execution_to_run:
            logger.info(f"Scheduled job execution {execution_to_run.id} on worker {worker_id}")
            self.dispatch_to_worker(execution_to_run, worker_url)

    def dispatch_to_worker(self, execution: JobExecution, worker_url: str):
        job = execution.job
        spec = {
            "execution_id": str(execution.id),
            "image": job.container_image,
            "command": job.command,
        }
        
        try:
            response = requests.post(f"{worker_url}/containers/create_and_run", json=spec, timeout=30)
            response.raise_for_status()
            
            data = response.json()
            container_id = data.get("container_id")
            
            # Update the state in the database upon successful dispatch
            execution.container_id = container_id
            execution.status = JobExecution.Status.RUNNING
            execution.save(update_fields=['container_id', 'status', 'updated_at'])
            logger.info(f"Successfully started container {container_id} for execution {execution.id}")

        except requests.RequestException as e:
            logger.error(f"Failed to dispatch job {execution.id} to worker: {e}")
            # Mark the job as FAILED so it can be investigated or retried.
            execution.status = JobExecution.Status.FAILED
            execution.logs_tail = f"Failed to dispatch to worker: {e}"
            execution.save(update_fields=['status', 'logs_tail', 'updated_at'])

Part 4: The CDC Audit Trail to a Data Lake

This is where the observability aspect shines. We don’t want to query our production CockroachDB for analytics. Instead, we stream every change to our JobExecution table into an S3-compatible data lake.

First, enable rangefeeds on the cluster, a prerequisite for CDC.

-- Connect to CockroachDB with `cockroach sql`
SET CLUSTER SETTING kv.rangefeed.enabled = true;

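Next, create the changefeed. We’ll use the webhook sink, which sends JSON-formatted changes to an HTTP endpoint. The updated option adds the commit timestamp to every emitted row, and resolved periodically emits resolved-timestamp messages that mark how far the stream has progressed. Note that insecure_tls_skip_verify=true disables certificate verification; it is acceptable for a local test but should be dropped in production.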

CREATE CHANGEFEED FOR TABLE jobs_jobexecution 
INTO 'webhook-https://your-webhook-receiver.com/cdc?insecure_tls_skip_verify=true'
WITH updated, resolved;
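
It helps to know what the sink delivers before writing the receiver. As far as we can tell from the CockroachDB documentation, the webhook sink posts a JSON object whose payload array holds the row changes, while resolved-timestamp messages arrive as a separate object with a resolved field. The sketch below parses both shapes; extract_row_changes and the sample values are illustrative assumptions, so verify the exact format against your cluster version.

```python
import json


def extract_row_changes(body: bytes):
    """Split a webhook-sink request body into (row_changes, resolved_timestamp).

    Row-change batches are assumed to look like
    {"payload": [{"after": {...}, "key": [...], "updated": "..."}], "length": N},
    and resolved messages like {"resolved": "..."}.
    """
    message = json.loads(body)
    resolved = message.get("resolved")
    rows = []
    for event in message.get("payload", []):
        rows.append({
            "key": event.get("key"),
            "updated": event.get("updated"),
            "after": event.get("after"),  # None when the row was deleted
        })
    return rows, resolved


# Illustrative sample only; the values are made up.
sample_body = json.dumps({
    "payload": [
        {"after": {"id": "e1", "status": "RUNNING"}, "key": ["e1"],
         "updated": "1700000000000000000.0000000000"},
        {"after": None, "key": ["e2"],
         "updated": "1700000001000000000.0000000000"},
    ],
    "length": 2,
}).encode("utf-8")
```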

Now, we need a service to receive these webhooks. This service will parse the JSON, format it into a queryable format like Parquet, and write it to object storage. We’ll use MinIO as our self-hosted S3-compatible lake.

cdc_ingestor/main.py:

import logging
import json
from datetime import datetime
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
import pyarrow as pa
import pyarrow.parquet as pq
from s3fs import S3FileSystem

# --- Configuration ---
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS_KEY = "minioadmin"
S3_SECRET_KEY = "minioadmin"
S3_BUCKET = "job-executions-lake"

# --- Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- S3 Client ---
s3 = S3FileSystem(
    key=S3_ACCESS_KEY,
    secret=S3_SECRET_KEY,
    client_kwargs={'endpoint_url': S3_ENDPOINT}
)

app = FastAPI()

class CdcPayload(BaseModel):
    # This is a simplified model of the actual CockroachDB CDC payload
    payload: list[dict]

# Ensure bucket exists
if not s3.exists(S3_BUCKET):
    s3.mkdir(S3_BUCKET)
    logger.info(f"Created S3 bucket: {S3_BUCKET}")


@app.post("/cdc")
async def handle_cdc_webhook(request: Request):
    """Receives CDC data, transforms it, and writes to the data lake."""
    body = await request.body()
    try:
        message = json.loads(body)
    except json.JSONDecodeError:
        logger.warning("Could not decode webhook body as JSON")
        return {"status": "ok", "records_processed": 0}

    # The webhook sink batches row changes into one JSON object of the form
    # {"payload": [{"after": {...}, "key": [...], "updated": "..."}, ...], "length": N}.
    # Resolved-timestamp messages ({"resolved": "..."}) carry no "payload" key.
    records_to_write = []
    for event in message.get("payload", []):
        # We are interested in the 'after' state of the row
        after = event.get("after")
        if after is None:
            continue  # Deleted rows arrive with a null 'after'

        record = dict(after)
        # Keep the commit timestamp emitted by the `updated` changefeed option.
        record['updated_at_cdc_ts'] = event.get("updated")
        # None values are left as-is: Parquet stores nulls natively, and replacing
        # them with "" would mix strings into integer columns such as exit_code.
        records_to_write.append(record)

    if not records_to_write:
        return {"status": "ok", "records_processed": 0}

    try:
        # Use PyArrow to create a table and write to Parquet
        table = pa.Table.from_pylist(records_to_write)
        
        # Partition the data by date for efficient querying
        now = datetime.utcnow()
        partition_path = f"year={now.year}/month={now.month:02d}/day={now.day:02d}"
        file_path = f"{S3_BUCKET}/{partition_path}/{now.isoformat()}.parquet"
        
        with s3.open(file_path, 'wb') as f:
            pq.write_table(table, f)

        logger.info(f"Wrote {len(records_to_write)} records to {file_path}")
        return {"status": "ok", "records_processed": len(records_to_write)}
    except Exception as e:
        logger.error(f"Failed to write to data lake: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to persist CDC data")

When we now create and run a job, we can watch the state transitions get persisted to MinIO within moments. A job might go from PENDING -> SCHEDULED -> RUNNING -> COMPLETED. Each of these updates creates a new row in our Parquet files, giving us a full, time-series history of every execution. We can later query this data with tools like DuckDB, Spark, or Trino to analyze job durations and failure rates without ever touching the production database.
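
To make the kind of analysis concrete, here is a small, engine-independent sketch of one metric that falls out of this history: the time an execution spent in each status. It works on plain dicts with status and updated_at fields, as they would appear after loading the archived rows for one execution. The function names are hypothetical, and the same computation would normally be a window-function query in DuckDB or Trino.

```python
from collections import defaultdict
from datetime import datetime


def _parse_ts(value: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def time_in_status(history):
    """Return seconds spent in each status for one execution.

    `history` is a list of {"status": ..., "updated_at": ...} dicts in any
    order. The final status has no following transition, so it gets no entry.
    """
    ordered = sorted(history, key=lambda row: _parse_ts(row["updated_at"]))
    totals = defaultdict(float)
    for current, following in zip(ordered, ordered[1:]):
        elapsed = _parse_ts(following["updated_at"]) - _parse_ts(current["updated_at"])
        totals[current["status"]] += elapsed.total_seconds()
    return dict(totals)
```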

Limitations and Future Iterations

This implementation provides a resilient foundation, but it’s far from complete. The scheduler’s placement logic is non-existent; it simply picks the first available worker. A production system would require workers to heartbeat their status and resource availability back into CockroachDB, allowing the scheduler to make intelligent decisions.
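
As a rough sketch of what heartbeat-driven placement could look like, the function below picks the least-loaded worker among those that reported recently. WorkerHeartbeat, its fields, and the 30-second freshness window are assumptions made for illustration; in practice the heartbeats would be rows in a CockroachDB table that each worker agent updates.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class WorkerHeartbeat:
    """Hypothetical heartbeat row written periodically by each worker agent."""
    worker_id: str
    url: str
    last_seen: datetime
    running_jobs: int
    capacity: int


def select_worker(heartbeats, now, max_age=timedelta(seconds=30)) -> Optional[WorkerHeartbeat]:
    """Pick the live worker with the lowest load ratio, or None if none qualifies."""
    live = [
        hb for hb in heartbeats
        if now - hb.last_seen <= max_age and hb.running_jobs < hb.capacity
    ]
    if not live:
        return None
    # Ties on load are broken by worker_id so the choice is deterministic.
    return min(live, key=lambda hb: (hb.running_jobs / hb.capacity, hb.worker_id))
```

Returning None when no worker qualifies lets the scheduler leave the execution in PENDING and try again on the next polling cycle.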

The communication between the scheduler and workers is simple REST over HTTP. This is fine for commands but inefficient for log streaming or more complex interactions. Migrating this to gRPC would be a logical next step for performance and type safety.

Furthermore, the system currently offers no mechanism for resource isolation (CPU, memory limits). The containerd API and OCI spec fully support this, but it requires adding more complexity to the worker agent when creating the container spec.
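
For a sense of what that extra complexity looks like, the OCI runtime spec expresses limits under linux.resources. The sketch below adds a CPU quota and a memory limit to a spec held as a plain dict mirroring the spec’s JSON layout. with_resource_limits and its parameters are hypothetical helpers, and how the resulting spec is handed to containerd depends on how the agent builds its spec.

```python
import copy


def with_resource_limits(oci_spec: dict, cpu_cores: float, memory_bytes: int,
                         period_us: int = 100_000) -> dict:
    """Return a copy of `oci_spec` with CPU and memory limits applied.

    CPU is limited via CFS bandwidth control: the container may use
    `quota` microseconds of CPU time in every `period` microseconds.
    """
    spec = copy.deepcopy(oci_spec)  # leave the caller's spec untouched
    resources = spec.setdefault("linux", {}).setdefault("resources", {})
    resources["cpu"] = {"period": period_us, "quota": int(cpu_cores * period_us)}
    resources["memory"] = {"limit": memory_bytes}
    return spec
```

For example, with_resource_limits(spec, cpu_cores=0.5, memory_bytes=256 * 1024 * 1024) caps the job at half a core and 256 MiB.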

Finally, while the CDC pipeline is robust, the webhook ingestor itself is a single point of failure. Running multiple instances of it and ensuring idempotent writes to the data lake (e.g., by using the CDC event’s timestamp and primary key to name files) would be necessary to make the audit trail as resilient as the core job execution system.
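
One way to approach the idempotent naming is to derive the object key purely from the event itself, so a redelivered event overwrites the same object instead of creating a duplicate. The sketch below assumes each event carries key (the primary key as a JSON array) and updated (a timestamp string whose part before the dot is nanoseconds since the Unix epoch). object_key_for_event is a hypothetical helper, and a real ingestor would likely batch many events per file rather than write one object per event.

```python
import hashlib
import json
from datetime import datetime, timezone


def object_key_for_event(bucket: str, event: dict) -> str:
    """Build a deterministic object key from a CDC event's key and timestamp."""
    # Assumed format: "<wall-clock nanoseconds>.<logical counter>".
    wall_nanos = int(event["updated"].split(".")[0])
    ts = datetime.fromtimestamp(wall_nanos // 1_000_000_000, tz=timezone.utc)

    # Hash the primary key together with the commit timestamp so that every
    # distinct row version maps to exactly one object name.
    key_json = json.dumps(event["key"], separators=(",", ":"))
    identity = f"{key_json}|{event['updated']}".encode("utf-8")
    digest = hashlib.sha256(identity).hexdigest()[:32]

    # Partition by the event's commit date, not by the ingestion date.
    partition = f"year={ts.year}/month={ts.month:02d}/day={ts.day:02d}"
    return f"{bucket}/{partition}/{digest}.parquet"
```

Because the name depends only on the event, any number of ingestor instances can process the same delivery and converge on the same object.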

