Our initial system for running nightly data processing jobs was held together with cron and bash scripts. It was a classic single-point-of-failure architecture that caused silent failures and frequent manual intervention. When the host machine running a critical job died, the job simply vanished. We needed a system in which every job's state was durable, consistently managed, and auditable across its entire lifecycle. The goal was a custom, lightweight orchestrator without the operational overhead of a full-blown Kubernetes cluster, which, for our bare-metal environment, was overkill.
The core pain point was state management. We needed a database that could survive node failures without manual failover, which immediately ruled out traditional single-leader databases. We selected CockroachDB because survivability is a core design principle: it is built directly on the Raft consensus protocol and provides serializable isolation, so our scheduler's view of the world stays consistent even through node outages and network partitions.
For execution, we decided to interface directly with containerd on our worker nodes. This gives us precise control over the container lifecycle without the layers of abstraction present in Docker or Kubernetes. The control plane (an API for defining and monitoring jobs) was a natural fit for Django, given its robust ORM and our team's familiarity with it.
The final piece of the puzzle was auditing. We couldn't afford to store an infinite history of job state transitions in our primary operational database. This is where CockroachDB's Change Data Capture (CDC) became the linchpin of the design. We could stream every single row change from our job execution table to a webhook, process it, and archive it in a data lake. This provides a completely decoupled, immutable log of the entire system's operation for analytics and post-mortem analysis.
graph TD
    subgraph Control Plane
        A[User/API Client] -- HTTP API --> B(Django Application)
        B -- SQL (pgwire) --> C{CockroachDB Cluster}
    end
    subgraph Scheduler & Workers
        D(Scheduler Process) -- Reads PENDING jobs --> C
        D -- gRPC/HTTP --> E[Worker Node Agent]
        E -- containerd API --> F(containerd daemon)
        F -- Manages --> G[Job Container]
    end
    subgraph Audit Trail via CDC
        C -- Core-based CDC --> H(CDC Webhook Endpoint)
        H -- JSON Payload --> I(CDC Ingestion Service)
        I -- Writes Parquet --> J[(Data Lake - S3/MinIO)]
    end
    style C fill:#4CAF50,color:#fff
    style J fill:#FF9800,color:#fff
Part 1: The Core State Model in CockroachDB via Django
The foundation of the system is the state model. In a real-world project, keeping this model simple and robust is critical. We defined two primary models: Job, representing the template for a task, and JobExecution, representing a specific run of that job.
The Job model stores the definition: the image to run, the command, and scheduling information. The JobExecution is the state machine; its status field is the single source of truth for the entire system.
Here are the Django models. Notice the use of models.TextChoices for status, which is a clean way to manage enumerated states.
orchestrator/jobs/models.py:
import uuid

from django.db import models


class Job(models.Model):
    """
    Represents a job template that can be scheduled for execution.
    """
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=255, unique=True, db_index=True)
    container_image = models.CharField(max_length=512)
    command = models.JSONField(
        help_text="Command to run in the container, e.g., ['python', 'main.py']"
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def __str__(self):
        return self.name


class JobExecution(models.Model):
    """
    Represents a single run of a Job. This is our state machine.
    """
    class Status(models.TextChoices):
        PENDING = 'PENDING', 'Pending'
        SCHEDULED = 'SCHEDULED', 'Scheduled'
        RUNNING = 'RUNNING', 'Running'
        COMPLETED = 'COMPLETED', 'Completed'
        FAILED = 'FAILED', 'Failed'

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    job = models.ForeignKey(Job, on_delete=models.CASCADE, related_name='executions')
    status = models.CharField(
        max_length=20,
        choices=Status.choices,
        default=Status.PENDING,
        db_index=True
    )

    # Execution details
    worker_node_id = models.CharField(max_length=255, null=True, blank=True, db_index=True)
    container_id = models.CharField(max_length=255, null=True, blank=True)

    # Timestamps
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    scheduled_at = models.DateTimeField(null=True, blank=True)
    started_at = models.DateTimeField(null=True, blank=True)
    finished_at = models.DateTimeField(null=True, blank=True)

    # Store exit code and logs for diagnostics
    exit_code = models.IntegerField(null=True, blank=True)
    logs_tail = models.TextField(null=True, blank=True)

    def __str__(self):
        return f"{self.job.name} - {self.id} [{self.status}]"

    class Meta:
        ordering = ['-created_at']
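For completeness, enqueuing a run is just inserting a PENDING JobExecution row; the scheduler in Part 3 picks it up on its next poll. A minimal sketch from the Django shell, with a placeholder job name and image:

# Hypothetical example: create a job template and enqueue one run of it.
from orchestrator.jobs.models import Job, JobExecution

job, _ = Job.objects.get_or_create(
    name="nightly-report",
    defaults={
        "container_image": "docker.io/library/python:3.11-slim",
        "command": ["python", "main.py"],
    },
)
execution = JobExecution.objects.create(job=job)
print(execution.id, execution.status)  # -> <uuid> PENDING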
Connecting Django to CockroachDB is straightforward, as CockroachDB is compatible with the PostgreSQL wire protocol; we use the django-cockroachdb backend on top of psycopg2. A common mistake is to assume transaction retries come for free. CockroachDB runs at serializable isolation and can return retryable transaction errors (SQLSTATE 40001) under contention, and neither psycopg2 nor the backend retries these automatically, so contention-prone transactions need client-side retry logic (a minimal sketch follows the settings below).
orchestrator/settings.py:
# ... other settings

# Ensure the backend is installed: pip install django-cockroachdb psycopg2-binary
DATABASES = {
    'default': {
        'ENGINE': 'django_cockroachdb',
        'NAME': 'orchestrator',
        'USER': 'your_user',
        'PASSWORD': 'your_password',
        'HOST': 'localhost',  # Or your CRDB load balancer
        'PORT': '26257',
        'OPTIONS': {
            'sslmode': 'verify-full',
            'sslrootcert': '/path/to/ca.crt',
            # Identifies this service in CockroachDB's statement and metrics pages.
            # Note: nothing here retries 40001 serialization errors for you;
            # see the retry helper below.
            'application_name': 'django_orchestrator'
        },
    }
}

# ... rest of settings
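Since nothing in the driver stack retries serialization failures for us, we wrap contention-prone writes in a small helper. This is a minimal sketch, assuming psycopg2 exposes the SQLSTATE on the underlying exception's pgcode attribute; tune the retry count and backoff to your workload.

# A minimal client-side retry helper for CockroachDB serialization failures
# (SQLSTATE 40001). Wrap it around functions whose writes may conflict.
import functools
import time

from django.db import OperationalError, transaction

def retry_on_serialization_failure(max_retries=3, backoff=0.1):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    with transaction.atomic():
                        return func(*args, **kwargs)
                except OperationalError as exc:
                    # psycopg2 exposes the SQLSTATE on the underlying error.
                    pgcode = getattr(exc.__cause__, 'pgcode', None)
                    if pgcode != '40001' or attempt == max_retries - 1:
                        raise
                    time.sleep(backoff * (2 ** attempt))  # exponential backoff
        return wrapper
    return decorator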
Part 2: The Worker Agent with containerd
Each worker node runs a small agent. This agent exposes a simple API that the central scheduler calls to manage containers. We chose to build this with FastAPI for its speed and simplicity, and we talk to containerd directly over its gRPC API using Python protobuf bindings. The pitfall here is underestimating the complexity of the containerd API: it is powerful but low-level.
This agent needs to handle image pulling, container creation, starting, stopping, and log streaming. Production-grade code must include robust error handling for each of these steps.
worker/agent.py:
import logging
from contextlib import asynccontextmanager

from containerd.services.containers.v1 import containers_pb2
from containerd.services.images.v1 import images_pb2
from containerd.services.tasks.v1 import tasks_pb2
from containerd.types.v1 import oci_pb2
from google.protobuf import any_pb2
from grpclib.client import Channel
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# --- Configuration ---
CONTAINERD_SOCK = "/run/containerd/containerd.sock"
NAMESPACE = "default"
# containerd scopes every request to a namespace passed as gRPC metadata.
NS_METADATA = [("containerd-namespace", NAMESPACE)]

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- gRPC Clients ---
# In a real app, you would manage the lifecycle of these channels better.
channel = Channel(path=CONTAINERD_SOCK)
containers_client = containers_pb2.ContainersStub(channel)
images_client = images_pb2.ImagesStub(channel)
tasks_client = tasks_pb2.TasksStub(channel)


@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Worker agent starting up...")
    yield
    logger.info("Worker agent shutting down...")
    channel.close()


app = FastAPI(lifespan=lifespan)


class ContainerSpec(BaseModel):
    execution_id: str
    image: str
    command: list[str]


async def pull_image_if_not_exists(image_name: str):
    """Pulls a container image, handling potential failures."""
    try:
        req = images_pb2.GetImageRequest(name=image_name)
        await images_client.get(req, metadata=NS_METADATA)
        logger.info(f"Image '{image_name}' already exists locally.")
        return
    except Exception:
        logger.info(f"Image '{image_name}' not found. Pulling...")

    try:
        # NOTE: a "pull" in containerd is really a client-side composite of
        # resolve/fetch/unpack steps; this single streaming call is a
        # simplification. A production implementation would consume the
        # stream to report progress.
        pull_req = images_pb2.PullRequest(image=image_name, platform="linux/amd64")
        async for _ in images_client.pull(pull_req, metadata=NS_METADATA):
            pass
        logger.info(f"Successfully pulled image '{image_name}'.")
    except Exception as e:
        logger.error(f"Failed to pull image '{image_name}': {e}")
        raise HTTPException(status_code=500, detail=f"Image pull failed: {e}")


@app.post("/containers/create_and_run")
async def create_and_run_container(spec: ContainerSpec):
    """
    The core endpoint to create and start a container for a job execution.
    This function demonstrates the multi-step process in containerd.
    """
    logger.info(f"Received request to run execution {spec.execution_id} with image {spec.image}")
    await pull_image_if_not_exists(spec.image)

    # Step 1: Get image metadata to create the OCI spec
    try:
        img_resp = await images_client.get(
            images_pb2.GetImageRequest(name=spec.image), metadata=NS_METADATA
        )
    except Exception as e:
        logger.error(f"Could not get image metadata for {spec.image}: {e}")
        raise HTTPException(status_code=404, detail=f"Image metadata not found: {e}")

    # Step 2: Create an OCI runtime spec
    # This is a very minimal spec. Production use would require configuring
    # networking, resource limits, volumes, etc.
    oci_spec = oci_pb2.Spec(
        version="1.0.2",
        process=oci_pb2.Process(
            args=spec.command,
            cwd="/",
        ),
        root=oci_pb2.Root(
            path=f"rootfs-{spec.execution_id}",  # Relative to containerd's state dir
        )
    )

    # The spec must be wrapped in a protobuf Any on the container record.
    spec_any = any_pb2.Any()
    spec_any.Pack(oci_spec)

    # Step 3: Create the container object in containerd
    container_id = f"exec-{spec.execution_id}"
    container = containers_pb2.Container(
        id=container_id,
        image=spec.image,
        spec=spec_any,
        runtime=containers_pb2.Container.Runtime(name="io.containerd.runc.v2")
    )
    try:
        create_req = containers_pb2.CreateContainerRequest(container=container)
        await containers_client.create(create_req, metadata=NS_METADATA)
        logger.info(f"Created container '{container_id}' for execution {spec.execution_id}")
    except Exception as e:
        # It might already exist from a previous failed attempt.
        logger.warning(f"Container creation failed, possibly already exists: {e}. Attempting to proceed.")

    # Step 4: Create and start the task (the running process)
    try:
        task_req = tasks_pb2.CreateTaskRequest(container_id=container_id)
        await tasks_client.create(task_req, metadata=NS_METADATA)

        start_req = tasks_pb2.StartRequest(container_id=container_id)
        await tasks_client.start(start_req, metadata=NS_METADATA)

        logger.info(f"Started task for container '{container_id}'")
        return {"container_id": container_id, "status": "RUNNING"}
    except Exception as e:
        logger.error(f"Failed to create or start task for container '{container_id}': {e}")
        # Clean up the container object if task creation fails
        await containers_client.delete(
            containers_pb2.DeleteContainerRequest(id=container_id), metadata=NS_METADATA
        )
        raise HTTPException(status_code=500, detail=f"Failed to start task: {e}")

# ... other endpoints for status, logs, stop would be needed ...

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)
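A quick way to smoke-test the agent before wiring up the scheduler is to hit the endpoint directly. This sketch assumes the agent is running locally on port 8001 as configured above; the execution ID and image are throwaway values.

# Manual smoke test for the worker agent.
import uuid
import requests

resp = requests.post(
    "http://localhost:8001/containers/create_and_run",
    json={
        "execution_id": str(uuid.uuid4()),
        "image": "docker.io/library/busybox:latest",
        "command": ["echo", "hello from the orchestrator"],
    },
    timeout=60,
)
print(resp.status_code, resp.json())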
Part 3: The Scheduler Brain
The scheduler is a continuously running process that polls CockroachDB for PENDING jobs. Its logic is simple but must be robust. We implemented it as a Django management command.
A critical design decision here is how to handle transactions. The scheduler’s loop should be:
- BEGIN a transaction.
- SELECT a PENDING job, locking the row for update (SELECT ... FOR UPDATE).
- UPDATE the job's status to SCHEDULED.
- COMMIT the transaction.
- After the commit, make the RPC call to the worker.
This ensures that no two scheduler instances can pick up the same job. If the RPC call fails, a separate reconciliation pass handles the SCHEDULED job that never started (a sketch of that pass follows the scheduler code).
orchestrator/jobs/management/commands/run_scheduler.py:
import time
import logging

import requests
from django.core.management.base import BaseCommand
from django.db import transaction

from orchestrator.jobs.models import JobExecution

logger = logging.getLogger(__name__)

# In a real system, this would come from a service discovery mechanism.
WORKER_NODES = {
    "worker-01": "http://localhost:8001",
}


class Command(BaseCommand):
    help = 'Runs the main job scheduler loop.'

    def handle(self, *args, **options):
        logger.info("Scheduler process started. Polling for jobs...")
        while True:
            try:
                self.schedule_one_job()
            except Exception as e:
                logger.error(f"Scheduler loop encountered an error: {e}", exc_info=True)
            time.sleep(5)  # Polling interval

    def select_worker(self):
        # Extremely basic round-robin. A real implementation needs to
        # check worker health, load, capabilities, etc.
        return list(WORKER_NODES.keys())[0], list(WORKER_NODES.values())[0]

    def schedule_one_job(self):
        execution_to_run = None
        worker_url = None

        # This transaction is the heart of preventing double-scheduling.
        with transaction.atomic():
            # Find a pending job and lock it to prevent other schedulers from picking it up.
            # `select_for_update(skip_locked=True)` is crucial for concurrency.
            # It tells the database to skip any rows that are already locked by another transaction.
            job_exec = JobExecution.objects.select_for_update(skip_locked=True).filter(
                status=JobExecution.Status.PENDING
            ).order_by('created_at').first()  # oldest first (Meta ordering is newest-first)

            if not job_exec:
                return  # No jobs to schedule

            worker_id, worker_url = self.select_worker()
            job_exec.status = JobExecution.Status.SCHEDULED
            job_exec.worker_node_id = worker_id
            job_exec.save(update_fields=['status', 'worker_node_id', 'updated_at'])
            execution_to_run = job_exec

        if execution_to_run:
            logger.info(f"Scheduled job execution {execution_to_run.id} on worker {execution_to_run.worker_node_id}")
            self.dispatch_to_worker(execution_to_run, worker_url)

    def dispatch_to_worker(self, execution: JobExecution, worker_url: str):
        job = execution.job
        spec = {
            "execution_id": str(execution.id),
            "image": job.container_image,
            "command": job.command,
        }
        try:
            response = requests.post(f"{worker_url}/containers/create_and_run", json=spec, timeout=30)
            response.raise_for_status()
            data = response.json()
            container_id = data.get("container_id")

            # Update the state in the database upon successful dispatch
            execution.container_id = container_id
            execution.status = JobExecution.Status.RUNNING
            execution.save(update_fields=['container_id', 'status', 'updated_at'])
            logger.info(f"Successfully started container {container_id} for execution {execution.id}")
        except requests.RequestException as e:
            logger.error(f"Failed to dispatch job {execution.id} to worker: {e}")
            # Mark the job as FAILED so it can be investigated or retried.
            execution.status = JobExecution.Status.FAILED
            execution.logs_tail = f"Failed to dispatch to worker: {e}"
            execution.save(update_fields=['status', 'logs_tail', 'updated_at'])
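The reconciliation pass mentioned earlier can live in another management command or a periodic task. A minimal sketch: executions that have sat in SCHEDULED longer than a timeout are pushed back to PENDING so the scheduler retries them. The five-minute staleness window is an assumption, not a tuned value.

# Requeue executions that were claimed but never reported RUNNING.
from datetime import timedelta

from django.db import transaction
from django.utils import timezone

from orchestrator.jobs.models import JobExecution

STALE_AFTER = timedelta(minutes=5)  # assumed window, tune for your workloads

def requeue_stale_scheduled_jobs():
    cutoff = timezone.now() - STALE_AFTER
    with transaction.atomic():
        stale = JobExecution.objects.select_for_update(skip_locked=True).filter(
            status=JobExecution.Status.SCHEDULED,
            updated_at__lt=cutoff,
        )
        for execution in stale:
            execution.status = JobExecution.Status.PENDING
            execution.worker_node_id = None
            execution.save(update_fields=['status', 'worker_node_id', 'updated_at'])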
Part 4: The CDC Audit Trail to a Data Lake
This is where the observability aspect shines. We don't want to query our production CockroachDB for analytics. Instead, we stream every change to our JobExecution table into an S3-compatible data lake.
First, enable rangefeeds on the cluster, a prerequisite for CDC.
-- Connect to CockroachDB with `cockroach sql`
SET CLUSTER SETTING kv.rangefeed.enabled = true;
Next, create the changefeed. We'll use the webhook sink, which sends JSON-formatted changes to an HTTP endpoint. The updated option adds each row's commit timestamp to the messages, and resolved emits periodic progress markers for the stream.
CREATE CHANGEFEED FOR TABLE jobs_jobexecution
INTO 'webhook-https://your-webhook-receiver.com/cdc?insecure_tls_skip_verify=true'
WITH updated, resolved;
Now, we need a service to receive these webhooks. This service will parse the JSON, format it into a queryable format like Parquet, and write it to object storage. We’ll use MinIO as our self-hosted S3-compatible lake.
cdc_ingestor/main.py:
import logging
import json
from datetime import datetime

from fastapi import FastAPI, Request, HTTPException
import pyarrow as pa
import pyarrow.parquet as pq
from s3fs import S3FileSystem

# --- Configuration ---
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS_KEY = "minioadmin"
S3_SECRET_KEY = "minioadmin"
S3_BUCKET = "job-executions-lake"

# --- Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- S3 Client ---
s3 = S3FileSystem(
    key=S3_ACCESS_KEY,
    secret=S3_SECRET_KEY,
    client_kwargs={'endpoint_url': S3_ENDPOINT}
)

app = FastAPI()

# Ensure bucket exists
if not s3.exists(S3_BUCKET):
    s3.mkdir(S3_BUCKET)
    logger.info(f"Created S3 bucket: {S3_BUCKET}")


@app.post("/cdc")
async def handle_cdc_webhook(request: Request):
    """Receives CDC data, transforms it, and writes to the data lake."""
    body = await request.body()

    # The webhook sink posts a JSON envelope of the form
    # {"payload": [<change messages>], "length": N}; resolved-timestamp
    # heartbeats arrive separately as {"resolved": "<timestamp>"}.
    try:
        envelope = json.loads(body)
    except json.JSONDecodeError:
        logger.warning("Could not decode CDC webhook body as JSON")
        raise HTTPException(status_code=400, detail="Invalid CDC payload")

    if 'resolved' in envelope:
        # Progress marker only; nothing to persist.
        return {"status": "ok", "records_processed": 0}

    records_to_write = []
    for message in envelope.get('payload', []):
        # We are interested in the 'after' state of the row (None for deletes).
        record = message.get('after')
        if not record:
            continue
        # Basic type coercion and cleaning for Parquet
        record['updated_at_cdc_ts'] = datetime.utcnow().isoformat()
        for key, val in record.items():
            if val is None:
                record[key] = ""  # Parquet handles nulls, but empty strings can be simpler
        records_to_write.append(record)

    if not records_to_write:
        return {"status": "ok", "records_processed": 0}

    try:
        # Use PyArrow to create a table and write to Parquet
        table = pa.Table.from_pylist(records_to_write)

        # Partition the data by date for efficient querying
        now = datetime.utcnow()
        partition_path = f"year={now.year}/month={now.month:02d}/day={now.day:02d}"
        file_path = f"{S3_BUCKET}/{partition_path}/{now.isoformat()}.parquet"

        with s3.open(file_path, 'wb') as f:
            pq.write_table(table, f)

        logger.info(f"Wrote {len(records_to_write)} records to {file_path}")
        return {"status": "ok", "records_processed": len(records_to_write)}
    except Exception as e:
        logger.error(f"Failed to write to data lake: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to persist CDC data")
When we now create and run a job, we can watch the state transitions get persisted to MinIO almost instantly. A job might go from PENDING -> SCHEDULED -> RUNNING -> COMPLETED. Each of these updates creates a new row in our Parquet files, giving us a full, time-series history of every execution. We can later query this data with tools like DuckDB, Spark, or Trino to analyze job performance, failure rates, and resource usage without ever touching the production database.
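As an illustration, here is how the lake could be queried with DuckDB from Python. The endpoint and credentials mirror the ingestor's assumed MinIO defaults above, and the column names (status, job_id) come from the CDC row images of the jobs_jobexecution table.

# Ad-hoc analytics over the Parquet audit trail with DuckDB (illustrative only).
import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")
# Point DuckDB's S3 layer at the local MinIO instance used by the ingestor.
con.execute("SET s3_endpoint='localhost:9000'")
con.execute("SET s3_access_key_id='minioadmin'")
con.execute("SET s3_secret_access_key='minioadmin'")
con.execute("SET s3_use_ssl=false")
con.execute("SET s3_url_style='path'")

# Completed vs. failed transitions per job, straight off the lake.
rows = con.execute("""
    SELECT job_id,
           count(*) FILTER (WHERE status = 'COMPLETED') AS completed,
           count(*) FILTER (WHERE status = 'FAILED')    AS failed
    FROM read_parquet('s3://job-executions-lake/*/*/*/*.parquet')
    GROUP BY job_id
    ORDER BY failed DESC
""").fetchall()
for row in rows:
    print(row)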
Limitations and Future Iterations
This implementation provides a resilient foundation, but it’s far from complete. The scheduler’s placement logic is non-existent; it simply picks the first available worker. A production system would require workers to heartbeat their status and resource availability back into CockroachDB, allowing the scheduler to make intelligent decisions.
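One way to get there is a heartbeat table that workers upsert into every few seconds and that the scheduler filters on during placement. A hedged sketch follows; the WorkerNode model and the 30-second freshness window are hypothetical, not part of the current code.

# Hypothetical WorkerNode heartbeat model; workers would upsert their row on a
# timer, and the scheduler would only consider recently-seen nodes.
from datetime import timedelta

from django.db import models
from django.utils import timezone

class WorkerNode(models.Model):
    id = models.CharField(max_length=255, primary_key=True)
    url = models.URLField()
    last_heartbeat = models.DateTimeField(db_index=True)
    cpu_available = models.FloatField(default=0)
    memory_available_mb = models.IntegerField(default=0)

    @classmethod
    def healthy(cls, max_age=timedelta(seconds=30)):
        """Nodes that have checked in recently enough to receive work."""
        return cls.objects.filter(last_heartbeat__gte=timezone.now() - max_age)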
The communication between the scheduler and workers is simple REST over HTTP. This is fine for commands but inefficient for log streaming or more complex interactions. Migrating this to gRPC would be a logical next step for performance and type safety.
Furthermore, the system currently offers no mechanism for resource isolation (CPU and memory limits). The containerd API and the OCI spec fully support this, but it requires adding more complexity to the worker agent when it builds the container spec.
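For reference, those knobs live under linux.resources in the OCI runtime spec. Shown here in illustrative dict form; how these fields map onto the generated protobuf types the agent builds depends on the bindings in use, so treat this as a sketch of intent rather than drop-in code.

# Illustrative only: OCI runtime spec fields that express resource limits.
resource_limits = {
    "linux": {
        "resources": {
            "memory": {"limit": 512 * 1024 * 1024},       # 512 MiB hard cap
            "cpu": {"quota": 50_000, "period": 100_000},  # roughly half a core
        }
    }
}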
Finally, while the CDC pipeline is robust, the webhook ingestor itself is a single point of failure. Running multiple instances of it and ensuring idempotent writes to the data lake (e.g., by using the CDC event’s timestamp and primary key to name files) would be necessary to make the audit trail as resilient as the core job execution system.