Our initial MLOps platform was a monument to technical debt. Model training jobs ran on a handful of large, statically configured Ray clusters. Experiment metadata, artifacts, and run states were all logged to a single, monolithic PostgreSQL instance. Every time a data science team needed a new environment, it involved manual configuration, IP address wrangling, and updating a dozen YAML files. When a Ray head node crashed overnight, the on-call engineer had to manually fail it over. The PostgreSQL server was a single point of failure we simply chose to ignore, hoping it would never go down during a critical model release. This was not sustainable.
The goal became clear: build a dynamic, resilient compute plane. Ray clusters needed to be treated as cattle, not pets. They should be able to appear, join a logical fabric, perform work, and disappear without manual intervention. The state management layer had to be as resilient and scalable as the compute itself. This required a fundamental shift from static configuration files to a dynamic service-oriented architecture, even for our internal training infrastructure.
We decided on a technology stack that embraced this dynamic nature. Ray would remain our compute framework. For the state management backend, CockroachDB was a natural fit, offering PostgreSQL wire compatibility with horizontal scalability and fault tolerance. The real linchpin, however, was HashiCorp Consul. We needed more than just DNS-based service discovery; we needed a reliable key-value store for distributed configuration and a robust health-checking mechanism to automatically prune unhealthy nodes from our compute fabric. This is the log of how we stitched them together.
The Foundation: Resilient State and Discovery
Before touching any Python or Ray code, we had to establish a solid foundation with CockroachDB and Consul. In a real-world project, these would be managed, highly available clusters. For development and demonstration, a docker-compose setup is sufficient to simulate the multi-node environment.
# docker-compose.yml
version: '3.8'
services:
  consul-server:
    image: hashicorp/consul:1.13.1
    container_name: consul-server
    ports:
      - "8500:8500"
      - "8600:8600/udp"
    command: "agent -server -ui -node=server-1 -bootstrap-expect=1 -client=0.0.0.0"
  roach-1:
    image: cockroachdb/cockroach:v22.1.8
    container_name: roach-1
    hostname: roach-1
    ports:
      - "26257:26257"
      - "8080:8080"
    command: start-single-node --insecure --advertise-addr=roach-1
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health?ready=1"]
      interval: 10s
      timeout: 5s
      retries: 5
  roach-2:
    image: cockroachdb/cockroach:v22.1.8
    container_name: roach-2
    hostname: roach-2
    command: start --insecure --join=roach-1 --advertise-addr=roach-2
    depends_on:
      roach-1:
        condition: service_healthy
  roach-3:
    image: cockroachdb/cockroach:v22.1.8
    container_name: roach-3
    hostname: roach-3
    command: start --insecure --join=roach-1 --advertise-addr=roach-3
    depends_on:
      roach-1:
        condition: service_healthy
  # This service registers the CockroachDB cluster in Consul.
  # In a real system, this might be handled by an agent or sidecar.
  crdb-consul-registrator:
    image: curlimages/curl:7.85.0
    container_name: crdb-registrator
    command: >
      /bin/sh -c '
        echo "Waiting for CockroachDB and Consul...";
        sleep 15;
        echo "Registering CockroachDB service in Consul...";
        curl -X PUT --data @/config/crdb-service.json http://consul-server:8500/v1/agent/service/register;
        echo "Registration attempt complete. Container will exit.";
      '
    volumes:
      - ./config:/config
    depends_on:
      - consul-server
      - roach-1
networks:
  default:
    name: mlops_fabric
The accompanying configuration file, config/crdb-service.json, defines how CockroachDB presents itself to the Consul catalog. A key detail here is the health check. Consul will actively probe the CockroachDB node's health endpoint. If a node becomes unresponsive, Consul will automatically mark it as unhealthy and stop routing traffic to it.
/* config/crdb-service.json */
{
  "ID": "crdb-cluster-1",
  "Name": "cockroachdb",
  "Tags": ["mlops", "metadata-store"],
  "Address": "roach-1",
  "Port": 26257,
  "Check": {
    "HTTP": "http://roach-1:8080/health?ready=1",
    "Interval": "10s",
    "Timeout": "2s",
    "DeregisterCriticalServiceAfter": "1m"
  }
}
With this setup running (docker-compose up -d), we have a 3-node CockroachDB cluster and a Consul server. The crdb-consul-registrator is a simple one-shot container that registers the database service. In a production scenario, a Consul agent running on each CockroachDB host would handle this registration lifecycle more gracefully.
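A quick way to confirm the registration landed is to query Consul's health API directly. The sketch below uses the same python-consul2 client we rely on later and assumes Consul is exposed on localhost:8500 as in the compose file.
# verify_registration.py -- sanity-check sketch, not part of the platform code.
import consul

c = consul.Consul(host="localhost", port=8500)  # assumes the compose port mapping
_index, entries = c.health.service("cockroachdb", passing=True)
if entries:
    svc = entries[0]["Service"]
    print(f"cockroachdb is registered and healthy at {svc['Address']}:{svc['Port']}")
else:
    print("cockroachdb is not registered or not passing its health check yet")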
Next, we define the schema for our MLOps metadata. This needs to be robust enough to track experiments, model versions, and the runs that produce them.
-- init_db.sql
-- Run this against the CockroachDB cluster.
-- Example: cockroach sql --insecure --host=localhost:26257 < init_db.sql

CREATE DATABASE IF NOT EXISTS mlops;
USE mlops;

CREATE TABLE IF NOT EXISTS experiments (
    experiment_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    experiment_name STRING NOT NULL UNIQUE,
    created_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS model_versions (
    model_version_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    experiment_id UUID NOT NULL REFERENCES experiments(experiment_id),
    version_string STRING NOT NULL,
    artifact_path STRING NOT NULL,
    metrics JSONB,
    created_at TIMESTAMPTZ DEFAULT now(),
    UNIQUE (experiment_id, version_string)
);

CREATE TABLE IF NOT EXISTS pipeline_runs (
    run_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    model_version_id UUID REFERENCES model_versions(model_version_id),
    run_status STRING NOT NULL CHECK (run_status IN ('STARTED', 'COMPLETED', 'FAILED')),
    start_time TIMESTAMPTZ DEFAULT now(),
    end_time TIMESTAMPTZ,
    run_logs TEXT,
    ray_cluster_id STRING -- ID of the Ray cluster that executed this run
);

-- Indexes are critical for performance in a real-world project; they are named
-- explicitly so IF NOT EXISTS has something to check against.
CREATE INDEX IF NOT EXISTS model_versions_created_at_idx ON model_versions (created_at DESC);
CREATE INDEX IF NOT EXISTS pipeline_runs_start_time_idx ON pipeline_runs (start_time DESC);
This schema provides the basic structure for tracking our MLOps lifecycle. The ray_cluster_id in pipeline_runs is crucial; it will link a specific training job back to the ephemeral compute cluster that executed it.
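As an illustration of why that link matters, a query like the one below can trace recent failures back to the clusters that produced them. This is only a sketch, assuming the local insecure cluster from the compose file is reachable on localhost:26257.
# trace_failed_runs.py -- illustrative sketch against the schema above.
import psycopg2

conn = psycopg2.connect("postgresql://root@localhost:26257/mlops?sslmode=disable")
with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT r.run_id, r.ray_cluster_id, v.version_string, r.end_time
        FROM pipeline_runs AS r
        JOIN model_versions AS v ON v.model_version_id = r.model_version_id
        WHERE r.run_status = 'FAILED'
        ORDER BY r.start_time DESC
        LIMIT 10
    """)
    for run_id, cluster_id, version, ended in cur.fetchall():
        print(f"run {run_id} (model {version}) failed on cluster {cluster_id} at {ended}")
conn.close()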
Dynamic Ray Cluster Lifecycle Management
The core of the problem was decoupling Ray workers from a hardcoded head node address. A worker node, upon startup, should be able to ask the service discovery layer, “Where is a healthy Ray head node for my pool?” and get back a usable address.
We achieve this with a Python script that wraps the ray start command. This script is responsible for two things:
- For a head node: start Ray, then register it with Consul, complete with a meaningful service ID and a health check.
- For a worker node: query Consul for a healthy ray-head service, then use the returned address to join the cluster.
Here is the implementation of this cluster management script. It uses the python-consul2 library and standard Python modules. Note the extensive logging and error handling; in a distributed system, assuming a network call will succeed is a recipe for disaster.
# cluster_manager.py
import argparse
import logging
import os
import socket
import subprocess
import sys
import time
import uuid

import consul
import requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    stream=sys.stdout,
)

CONSUL_HOST = os.environ.get("CONSUL_HOST", "consul-server")
CONSUL_PORT = int(os.environ.get("CONSUL_PORT", 8500))


# In a production setup, this would be more sophisticated, possibly
# coming from instance metadata.
def get_host_ip():
    """A reliable way to get the host's primary IP address."""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip
    except Exception as e:
        logging.error(f"Could not determine host IP: {e}")
        return "127.0.0.1"
class ConsulManager:
    """Handles all interactions with the Consul agent."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.c = None
        self.connect()

    def connect(self):
        try:
            self.c = consul.Consul(host=self.host, port=self.port)
            self.c.agent.self()  # Test connection
            logging.info(f"Successfully connected to Consul at {self.host}:{self.port}")
        except requests.exceptions.ConnectionError as e:
            logging.error(f"Failed to connect to Consul: {e}")
            self.c = None
            raise ConnectionError("Consul connection failed")

    def register_service(self, name, service_id, address, port, tags, check):
        if not self.c:
            raise ConnectionError("Consul client not initialized.")
        logging.info(f"Registering service '{name}' with ID '{service_id}' at {address}:{port}")
        # A common mistake is to not include a health check. Without it,
        # a dead service remains in the catalog indefinitely.
        return self.c.agent.service.register(
            name=name,
            service_id=service_id,
            address=address,
            port=port,
            tags=tags,
            check=check,
        )

    def deregister_service(self, service_id):
        if not self.c:
            logging.warning("Cannot deregister; Consul client not initialized.")
            return
        logging.info(f"Deregistering service ID '{service_id}'")
        self.c.agent.service.deregister(service_id)

    def find_healthy_service(self, name, tag=None):
        if not self.c:
            raise ConnectionError("Consul client not initialized.")
        # The passing=True flag is critical. It filters for services
        # that are passing their health checks.
        _index, services = self.c.health.service(name, tag=tag, passing=True)
        if not services:
            logging.warning(f"No healthy instances of service '{name}' found.")
            return None
        # In a more complex scenario, you might use tags to select a specific
        # type of cluster or implement load balancing logic here.
        service = services[0]
        address = service['Service']['Address']
        port = service['Service']['Port']
        logging.info(f"Found healthy service '{name}' at {address}:{port}")
        return address, port
def start_ray_head(args):
    """Starts a Ray head node and registers it with Consul."""
    host_ip = get_host_ip()
    service_id = f"ray-head-{uuid.uuid4()}"
    ray_port = 6379  # Default Ray head port

    # This TCP check is basic but effective. For production, a more robust
    # check might query Ray's metrics endpoint.
    check = consul.Check.tcp(host_ip, ray_port, interval="10s", timeout="2s", deregister="1m")

    consul_manager = None
    ray_process = None
    try:
        consul_manager = ConsulManager(CONSUL_HOST, CONSUL_PORT)
        command = [
            "ray", "start", "--head", f"--port={ray_port}",
            "--dashboard-host=0.0.0.0", f"--node-ip-address={host_ip}",
            # --block keeps `ray start` in the foreground so wait() can monitor it;
            # without it, the command daemonizes and returns immediately.
            "--block",
        ]
        ray_process = subprocess.Popen(command)
        # Give Ray a moment to start up before registering.
        time.sleep(5)
        if ray_process.poll() is not None:
            raise RuntimeError("Ray head process failed to start.")
        consul_manager.register_service(
            name="ray-head",
            service_id=service_id,
            address=host_ip,
            port=ray_port,
            tags=["mlops-compute", f"pool:{args.pool}"],
            check=check,
        )
        logging.info("Ray head started and registered. Monitoring process...")
        ray_process.wait()
    except ConnectionError as e:
        logging.error(f"Consul connection error: {e}. Shutting down.")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}", exc_info=True)
    finally:
        if ray_process and ray_process.poll() is None:
            logging.info("Terminating Ray process.")
            ray_process.terminate()
        if consul_manager:
            consul_manager.deregister_service(service_id)
        logging.info("Cleanup complete. Exiting head node wrapper.")
def start_ray_worker(args):
    """Starts a Ray worker node by discovering the head via Consul."""
    try:
        consul_manager = ConsulManager(CONSUL_HOST, CONSUL_PORT)
        # The pitfall here is a race condition. If workers start before a head
        # is available, they'll fail. A robust solution needs a retry loop.
        head_address = None
        for i in range(args.retries):
            logging.info(f"Attempt {i+1}/{args.retries} to discover Ray head for pool '{args.pool}'...")
            result = consul_manager.find_healthy_service("ray-head", tag=f"pool:{args.pool}")
            if result:
                head_ip, head_port = result
                head_address = f"{head_ip}:{head_port}"
                break
            time.sleep(5)
        if not head_address:
            logging.error(f"Could not discover a healthy Ray head for pool '{args.pool}' after {args.retries} retries.")
            sys.exit(1)

        host_ip = get_host_ip()
        # As above, --block keeps the worker process in the foreground.
        command = [
            "ray", "start", f"--address={head_address}", f"--node-ip-address={host_ip}", "--block",
        ]
        logging.info(f"Starting Ray worker to join cluster at {head_address}...")
        ray_process = subprocess.Popen(command)
        ray_process.wait()
    except ConnectionError as e:
        logging.error(f"Consul connection error: {e}. Exiting.")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}", exc_info=True)
    finally:
        logging.info("Ray worker process exited.")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Ray Cluster Manager with Consul")
    subparsers = parser.add_subparsers(dest="command", required=True)

    head_parser = subparsers.add_parser("head", help="Start a Ray head node.")
    head_parser.add_argument("--pool", type=str, default="default", help="The compute pool this head belongs to.")
    head_parser.set_defaults(func=start_ray_head)

    worker_parser = subparsers.add_parser("worker", help="Start a Ray worker node.")
    worker_parser.add_argument("--pool", type=str, default="default", help="The compute pool to join.")
    worker_parser.add_argument("--retries", type=int, default=12, help="Number of retries to discover the head.")
    worker_parser.set_defaults(func=start_ray_worker)

    args = parser.parse_args()
    args.func(args)
This script now orchestrates the entire lifecycle. To start a head node for the gpu-intensive pool, one would run python cluster_manager.py head --pool gpu-intensive. A worker would use python cluster_manager.py worker --pool gpu-intensive. The use of tags (pool:gpu-intensive) is a powerful Consul feature that allows us to partition our compute plane into logical groups without any static configuration.
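Because the tags live in the Consul catalog, the available pools can be inspected at runtime rather than read from a config file. The following sketch (assuming a Consul agent on localhost:8500) lists every healthy head node and the pool it advertises.
# list_pools.py -- sketch of runtime pool discovery via Consul tags.
import consul

c = consul.Consul(host="localhost", port=8500)
_index, entries = c.health.service("ray-head", passing=True)
for entry in entries:
    svc = entry["Service"]
    pools = [t.split(":", 1)[1] for t in svc.get("Tags", []) if t.startswith("pool:")]
    print(f"{svc['Address']}:{svc['Port']} -> pools {pools}")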
Integrating the MLOps Application Logic
With the infrastructure in place, the final step is to make the application logic—the actual model training code—aware of this dynamic environment. The code running inside a Ray task must now discover the CockroachDB service through Consul instead of reading a static connection string from a config file.
Here is a utility module for database interactions and a sample Ray training task.
# mlops_lib/db.py
import os
import logging
import threading

import consul
import psycopg2
from psycopg2 import pool

CONSUL_HOST = os.environ.get("CONSUL_HOST", "consul-server")
CONSUL_PORT = int(os.environ.get("CONSUL_PORT", 8500))

# Using a thread-local cache to avoid querying Consul for every single connection request.
# The cache can be invalidated periodically in a real application.
local_cache = threading.local()


def find_db_service_from_consul():
    """Queries Consul for a healthy cockroachdb service."""
    if hasattr(local_cache, 'db_conn_string') and local_cache.db_conn_string:
        return local_cache.db_conn_string
    try:
        c = consul.Consul(host=CONSUL_HOST, port=CONSUL_PORT)
        _index, services = c.health.service("cockroachdb", passing=True)
        if not services:
            raise ConnectionError("No healthy CockroachDB instances found in Consul.")
        service = services[0]['Service']
        address = service['Address']
        port = service['Port']
        conn_string = f"postgresql://root@{address}:{port}/mlops?sslmode=disable"
        local_cache.db_conn_string = conn_string
        logging.info("Discovered DB connection string via Consul: postgresql://root@***:****/mlops")
        return conn_string
    except Exception as e:
        logging.error(f"Failed to discover DB service from Consul: {e}")
        return None


# A production-grade connection pool is non-negotiable. Opening/closing
# connections for every query is extremely inefficient.
connection_pool = None
pool_lock = threading.Lock()


def get_connection_pool():
    global connection_pool
    # Double-checked locking to ensure the pool is initialized only once.
    if connection_pool is None:
        with pool_lock:
            if connection_pool is None:
                conn_string = find_db_service_from_consul()
                if not conn_string:
                    raise RuntimeError("Failed to initialize DB pool: could not get connection string.")
                logging.info("Initializing CockroachDB connection pool.")
                connection_pool = pool.SimpleConnectionPool(
                    minconn=2,
                    maxconn=10,
                    dsn=conn_string,
                )
    return connection_pool


def get_db_connection():
    """Gets a connection from the pool."""
    p = get_connection_pool()
    return p.getconn()


def release_db_connection(conn):
    """Releases a connection back to the pool."""
    p = get_connection_pool()
    p.putconn(conn)
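Used directly, the helpers look roughly like this; a minimal sketch, assuming the schema from init_db.sql has already been applied.
# Example usage of mlops_lib/db.py -- a minimal sketch.
from mlops_lib import db

conn = db.get_db_connection()
try:
    with conn.cursor() as cur:
        cur.execute("SELECT count(*) FROM experiments")
        print("experiments:", cur.fetchone()[0])
    conn.commit()
finally:
    db.release_db_connection(conn)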
Now, the training task can use these utilities to interact with the database in a resilient way.
# train_task.py
import json
import logging
import os
import random
import time

import consul
import ray

from mlops_lib import db

CONSUL_HOST = os.environ.get("CONSUL_HOST", "consul-server")
CONSUL_PORT = int(os.environ.get("CONSUL_PORT", 8500))


@ray.remote
class TrainingActor:
    def __init__(self, experiment_name: str, ray_cluster_id: str):
        self.experiment_name = experiment_name
        self.experiment_id = None
        self.ray_cluster_id = ray_cluster_id
        self._ensure_experiment_exists()
    def _execute_query(self, query, params=None, fetch=None):
        """A robust query execution wrapper with connection management."""
        conn = None
        try:
            conn = db.get_db_connection()
            with conn.cursor() as cur:
                cur.execute(query, params)
                if fetch == 'one':
                    result = cur.fetchone()
                elif fetch == 'all':
                    result = cur.fetchall()
                else:
                    result = None
            conn.commit()
            return result
        except Exception as e:
            logging.error(f"Database query failed: {e}")
            if conn:
                conn.rollback()
            raise
        finally:
            if conn:
                db.release_db_connection(conn)

    def _ensure_experiment_exists(self):
        query = "SELECT experiment_id FROM experiments WHERE experiment_name = %s"
        result = self._execute_query(query, (self.experiment_name,), fetch='one')
        if result:
            self.experiment_id = result[0]
        else:
            insert_query = "INSERT INTO experiments (experiment_name) VALUES (%s) RETURNING experiment_id"
            self.experiment_id = self._execute_query(insert_query, (self.experiment_name,), fetch='one')[0]
        logging.info(f"Using experiment ID: {self.experiment_id} for '{self.experiment_name}'")
    def run_training(self, version_string: str, params: dict):
        # 1. Create the model version record
        insert_version_query = """
            INSERT INTO model_versions (experiment_id, version_string, artifact_path, metrics)
            VALUES (%s, %s, %s, %s) RETURNING model_version_id
        """
        artifact_path = f"s3://models/{self.experiment_name}/{version_string}.pkl"
        # Metrics are stored as JSONB; serialize the params properly.
        metrics_json = json.dumps({"initial_params": params})
        model_version_id = self._execute_query(
            insert_version_query,
            (self.experiment_id, version_string, artifact_path, metrics_json),
            fetch='one',
        )[0]

        # 2. Create the pipeline run record
        insert_run_query = """
            INSERT INTO pipeline_runs (model_version_id, run_status, ray_cluster_id)
            VALUES (%s, 'STARTED', %s) RETURNING run_id
        """
        run_id = self._execute_query(insert_run_query, (model_version_id, self.ray_cluster_id), fetch='one')[0]
        logging.info(f"Starting run {run_id} for model version {model_version_id}")

        # 3. Simulate the training process
        try:
            logging.info(f"Simulating training for version {version_string}...")
            time.sleep(random.randint(5, 15))
            # Simulate a potential failure
            if random.random() < 0.1:
                raise RuntimeError("Random training failure!")
            final_accuracy = 0.85 + (random.random() * 0.1)
            final_metrics = json.dumps({"accuracy": final_accuracy, "loss": 1 - final_accuracy})

            # 4. Update run status and model metrics on success
            update_run_query = "UPDATE pipeline_runs SET run_status = 'COMPLETED', end_time = now() WHERE run_id = %s"
            self._execute_query(update_run_query, (run_id,))
            update_version_query = "UPDATE model_versions SET metrics = metrics || %s::jsonb WHERE model_version_id = %s"
            self._execute_query(update_version_query, (final_metrics, model_version_id))
            logging.info(f"Run {run_id} completed successfully.")
            return True
        except Exception as e:
            # 5. Update run status on failure
            logging.error(f"Run {run_id} failed: {e}")
            update_run_query = "UPDATE pipeline_runs SET run_status = 'FAILED', end_time = now(), run_logs = %s WHERE run_id = %s"
            self._execute_query(update_run_query, (str(e), run_id))
            return False
if __name__ == "__main__":
    # This part would be the entrypoint for submitting jobs.
    # It discovers the Ray head via Consul and connects to it.
    c = consul.Consul(host=CONSUL_HOST, port=CONSUL_PORT)
    _index, services = c.health.service("ray-head", tag="pool:default", passing=True)
    if not services:
        raise RuntimeError("No healthy Ray head found in Consul for pool 'default'.")
    head = services[0]['Service']
    ray.init(address=f"{head['Address']}:{head['Port']}", namespace="mlops")

    actor = TrainingActor.remote(
        experiment_name="fraud-detection-v3",
        ray_cluster_id=os.environ.get("RAY_CLUSTER_ID", "local"),
    )
    # Submit multiple training jobs in parallel
    futures = [
        actor.run_training.remote(f"v3.1.{i}", {"learning_rate": 0.01 * i})
        for i in range(1, 4)
    ]
    results = ray.get(futures)
    print(f"Training run results: {results}")
The final architecture looks like this:
graph TD
    subgraph "MLOps Platform"
        subgraph "Dynamic Compute Plane"
            RayHead1[Ray Head 1 - Pool A]
            RayWorker1A[Ray Worker 1A]
            RayWorker1B[Ray Worker 1B]
            RayHead2[Ray Head 2 - Pool B]
            RayWorker2A[Ray Worker 2A]
        end
        subgraph "Control & State Plane"
            Consul[Consul Cluster]
            CRDB[CockroachDB Cluster]
        end
    end
    Client[ML Engineer / CI/CD] -- Submits Job --> RayHead1
    RayHead1 -- 1. Registers Service --> Consul
    RayHead2 -- 1. Registers Service --> Consul
    RayWorker1A -- 2. Discovers Head --> Consul
    RayWorker1B -- 2. Discovers Head --> Consul
    RayWorker2A -- 2. Discovers Head --> Consul
    RayWorker1A --> RayHead1
    RayWorker1B --> RayHead1
    RayWorker2A --> RayHead2
    subgraph "Training Task on RayWorker1A"
        AppCode[Application Code]
    end
    AppCode -- 3. Discovers DB --> Consul
    AppCode -- 4. Writes Metadata --> CRDB
    Consul -- Health Checks --> RayHead1
    Consul -- Health Checks --> CRDB
This system effectively solves the initial problems. Ray clusters are now ephemeral. We can scale the number of workers or entire clusters up and down based on demand. A failure in a Ray head node is handled gracefully; Consul’s health checks remove it from the available pool, and new workers will automatically find a different, healthy head. The database is no longer a single point of failure. This architecture provides a level of resilience and automation that is essential for any serious MLOps platform, moving far beyond brittle, static configurations.
The current implementation, however, is not without its limitations. The health check for the Ray head node is a simple TCP check; a more production-ready solution would query Ray’s own metrics API to ensure the scheduler is responsive, not just that the port is open. Security is also a significant consideration for the next iteration. All communication is currently insecure; integrating Consul ACLs and enabling mTLS between all services via Consul Connect would be a critical next step before exposing this to a wider range of internal teams. Finally, while the service discovery is robust, the job submission logic is still manual. The next evolution would involve building a scheduler service that programmatically selects a compute pool based on job requirements, provisions a Ray cluster via an infrastructure-as-code tool, and monitors its lifecycle, making the entire process fully automated.
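As a concrete example of the first point, one option is to replace the TCP check with an HTTP check against the Ray dashboard. The sketch below assumes the dashboard is reachable on its default port 8265 and that probing its version endpoint is an acceptable liveness signal; it is not the implementation we run today.
# head_health_check.py -- sketch of a richer Consul check for the Ray head.
import consul

from cluster_manager import get_host_ip  # helper defined earlier

host_ip = get_host_ip()
dashboard_check = consul.Check.http(
    f"http://{host_ip}:8265/api/version",  # assumed dashboard endpoint
    interval="10s",
    timeout="2s",
    deregister="1m",
)
# dashboard_check would then be passed to ConsulManager.register_service()
# in start_ray_head() in place of the TCP check.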