The initial deployment of our time series aggregation pipeline seemed straightforward. A Dask cluster was spun up, partitioned data was read from Parquet files, and a series of rolling window calculations were performed over several terabytes of sensor data. The logic, executed on a single machine with a LocalCluster, was flawless. In staging, with a dask.distributed cluster, the system began to show its fragility. The computation, designed to run for eight hours, would fail unpredictably around the five-hour mark. A spot instance termination, a transient network partition, a memory-overloaded worker: any of these events would bring down the entire computation and force a complete restart. This wasn’t just an inconvenience; it was an operational impossibility for a production system.
Our primary pain point was the ephemeral nature of the Dask scheduler’s state. It brilliantly manages tasks, data locality, and worker communication, but it assumes a reliable cluster. When a worker is lost, tasks assigned to it are re-run. But if the scheduler itself fails, or if a critical, long-running task chain is broken, the entire computation graph’s state is gone. Simple retries at the client level are naive; they can’t recover the intermediate state of a massive aggregation. The real requirement was not just task resilience, but computation resilience. The system had to be able to resume from the last known good state, even in the face of scheduler or multiple worker failures.
This led to the conclusion that we needed an external, highly available control plane to manage the cluster’s lifecycle and the computation’s progress. We evaluated ZooKeeper and Redis but settled on etcd. Its Raft-based consensus provides strong consistency guarantees, its watch mechanism is ideal for monitoring distributed state changes, and its lease feature provides a robust mechanism for service discovery and health checking. In a real-world project, predictability and reliability trump raw performance, and etcd's operational model gave us the confidence we needed. Our new architecture would use etcd for three core functions: dynamic worker registration, scheduler leader election, and, most critically, computation checkpointing.
Phase 1: Dynamic Worker Registration via etcd Leases
The default method of connecting Dask workers to a scheduler via a fixed address (tcp://scheduler:8786) is brittle. If the scheduler restarts with a new IP, workers are orphaned. Our first step was to decouple workers from the scheduler's physical address. Instead, they would rely on a logical service name managed in etcd.
Furthermore, the scheduler needs a definitive, real-time list of healthy workers. A worker process could die without properly deregistering. We implemented a heartbeating system using etcd leases. Each worker acquires a lease and attaches it to its registration key. The worker is responsible for keeping this lease alive. If the worker crashes, the lease expires, and etcd automatically deletes the key, instantly notifying any watchers, such as our scheduler, that the worker is gone.
Here is the core implementation for a Dask worker that registers itself.
import os
import sys
import time
import logging
import threading
import uuid
import etcd3
from dask.distributed import Worker
from tornado.ioloop import IOLoop
# --- Configuration ---
# In a production setup, these would come from environment variables or a config file.
ETCD_HOST = os.getenv("ETCD_HOST", "127.0.0.1")
ETCD_PORT = int(os.getenv("ETCD_PORT", 2379))
SCHEDULER_KEY = "/dask/cluster/scheduler"
WORKER_PREFIX = "/dask/cluster/workers/"
WORKER_LEASE_TTL = 10 # Seconds. Workers must heartbeat faster than this.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# --- Logging Setup ---
logging.basicConfig(
level=LOG_LEVEL,
format="%(asctime)s - %(threadName)s - %(levelname)s - %(message)s",
stream=sys.stdout,
)
class ResilientDaskWorker:
"""
A Dask Worker that registers itself with etcd and maintains its presence
via a lease, allowing for dynamic discovery and health checking.
"""
def __init__(self, etcd_host, etcd_port):
self.worker_id = f"worker-{uuid.uuid4()}"
self.etcd_key = f"{WORKER_PREFIX}{self.worker_id}"
self.dask_worker = None
self.lease = None
self.stop_event = threading.Event()
try:
self.etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
logging.info("Successfully connected to etcd at %s:%s", etcd_host, etcd_port)
except Exception as e:
logging.critical("Failed to connect to etcd: %s", e)
raise
def _find_scheduler(self):
"""Polls etcd to find the active scheduler's address."""
while not self.stop_event.is_set():
try:
value, _ = self.etcd_client.get(SCHEDULER_KEY)
if value is not None:
scheduler_address = value.decode("utf-8")
logging.info("Found active scheduler at %s", scheduler_address)
return scheduler_address
except Exception as e:
logging.error("Error finding scheduler in etcd: %s. Retrying...", e)
time.sleep(5)
return None
def _start_dask_worker(self, scheduler_address, loop):
"""Initializes and starts the underlying Dask worker."""
# The Dask worker needs to run in the same event loop.
# This is critical for Tornado-based applications like Dask.
try:
self.dask_worker = Worker(
scheduler_address,
name=self.worker_id,
loop=loop
)
            # Register with etcd *after* the worker is successfully initialized.
            # Note: recent dask.distributed releases require the Worker to be
            # started (awaited on its event loop) before .address is populated.
            self.lease = self.etcd_client.lease(WORKER_LEASE_TTL)
            self.etcd_client.put(self.etcd_key, self.dask_worker.address.encode('utf-8'), lease=self.lease)
logging.info(
"Dask worker started at %s and registered in etcd with key %s",
self.dask_worker.address, self.etcd_key
)
return True
except Exception as e:
logging.error("Failed to start Dask worker: %s", e)
if self.lease:
self.lease.revoke()
return False
    def _keep_lease_alive(self):
        """Runs in a separate thread to refresh the etcd lease."""
        logging.info("Starting lease keep-alive thread for lease ID %s", self.lease.id)
        while not self.stop_event.is_set():
            try:
                # A single refresh() call sends one keep-alive request to etcd.
                self.lease.refresh()
                # Sleep for a fraction of the TTL so we always refresh in time
                # and never spin in a tight loop.
                time.sleep(WORKER_LEASE_TTL / 3)
            except Exception as e:
                logging.error("Failed to refresh lease: %s. Worker might be deregistered.", e)
                # A failed refresh is a critical error: etcd may already consider
                # this worker dead, so we stop the whole process.
                self.stop_event.set()
                break
def run(self):
"""Main execution loop for the worker."""
scheduler_address = self._find_scheduler()
if not scheduler_address:
logging.critical("Could not find scheduler. Shutting down.")
return
# Dask's worker needs to own the event loop.
loop = IOLoop.current()
if not self._start_dask_worker(scheduler_address, loop):
logging.critical("Failed to initialize Dask worker. Aborting.")
return
# Start the heartbeating in a daemon thread.
heartbeat_thread = threading.Thread(
target=self._keep_lease_alive,
name=f"etcd-heartbeat-{self.worker_id}",
daemon=True
)
heartbeat_thread.start()
try:
# This starts the Tornado event loop and blocks until the worker is closed.
loop.start()
finally:
self.shutdown()
def shutdown(self):
"""Gracefully shuts down the worker and cleans up etcd registration."""
logging.info("Shutting down resilient worker %s...", self.worker_id)
self.stop_event.set()
if self.lease:
try:
self.lease.revoke()
logging.info("Revoked etcd lease %s", self.lease.id)
except Exception as e:
logging.error("Failed to revoke etcd lease during shutdown: %s", e)
# The Dask worker's close method must be called from its own loop.
if self.dask_worker and self.dask_worker.loop:
self.dask_worker.loop.add_callback(self.dask_worker.close)
logging.info("Shutdown complete.")
if __name__ == "__main__":
worker_node = ResilientDaskWorker(etcd_host=ETCD_HOST, etcd_port=ETCD_PORT)
try:
worker_node.run()
except KeyboardInterrupt:
logging.info("Keyboard interrupt received.")
finally:
worker_node.shutdown()
The key insight here is the separation of concerns. The main thread runs the Dask/Tornado event loop, while a dedicated thread manages the etcd lease. If the heartbeating fails, the entire worker process is designed to shut down, as it’s no longer a valid member of the cluster.
Phase 2: Scheduler High Availability with Leader Election
A dynamic pool of workers is useless if the scheduler remains a single point of failure. The next logical step was to make the scheduler itself highly available. We can run multiple scheduler instances, but only one should be active at any given time. This is a classic distributed systems problem solved by leader election. etcd provides a straightforward recipe for this using its lock mechanism.
The process is as follows:
- Multiple scheduler candidates start up.
- Each candidate attempts to acquire a distributed lock on a predefined etcd key (e.g., /dask/cluster/scheduler_lock).
- The instance that successfully acquires the lock becomes the leader. It writes its own network address to the SCHEDULER_KEY so workers can find it.
- The other instances (followers) wait, periodically re-attempting to acquire the lock.
- If the leader fails, its etcd session will time out, releasing the lock. One of the followers will then acquire it and become the new leader.
This pattern prevents a “split-brain” scenario and ensures a single, authoritative scheduler is coordinating the cluster.
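To make this recipe concrete, here is a minimal sketch of a scheduler candidate's election loop built on python-etcd3's lock primitive. The lock name, TTL, and MY_ADDRESS placeholder are illustrative assumptions; a real candidate would publish the address of the Dask scheduler process it wraps, and note that etcd3 stores the lock under its own /locks/ prefix rather than the /dask/cluster/ prefix used elsewhere.
import time

import etcd3

ETCD_HOST, ETCD_PORT = "127.0.0.1", 2379
SCHEDULER_KEY = "/dask/cluster/scheduler"
LOCK_NAME = "scheduler_lock"        # hypothetical; etcd3 prefixes it with /locks/
MY_ADDRESS = "tcp://10.0.0.5:8786"  # placeholder for this candidate's own address
LOCK_TTL = 10                       # seconds; the lock is released if the holder stops refreshing

def run_leader_election():
    """Campaign forever: lead while we hold the lock, follow while we don't."""
    client = etcd3.client(host=ETCD_HOST, port=ETCD_PORT)
    while True:
        lock = client.lock(LOCK_NAME, ttl=LOCK_TTL)
        if not lock.acquire(timeout=5):
            # Another candidate is the leader; stay a follower and retry.
            time.sleep(2)
            continue
        # We are the leader: publish our address, bound to the lock's lease so
        # the key disappears automatically if this process dies.
        client.put(SCHEDULER_KEY, MY_ADDRESS.encode("utf-8"), lease=lock.lease)
        try:
            while lock.is_acquired():
                lock.refresh()          # keep the underlying lease alive
                time.sleep(LOCK_TTL / 3)
        except Exception:
            pass                        # lost the lease; fall through and re-campaign
        finally:
            lock.release()
Binding SCHEDULER_KEY to the lock's lease means the published address vanishes the moment the leader dies, which is exactly the signal the followers and the workers react to.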
sequenceDiagram
    participant S1 as Scheduler 1 (Follower)
    participant S2 as Scheduler 2 (Follower)
    participant ETCD
    participant W1 as Worker 1
    participant W2 as Worker 2
    S1->>ETCD: Attempt to acquire lock on /dask/scheduler_lock
    ETCD-->>S1: Lock Acquired (Success)
    S1->>ETCD: Write "tcp://s1_ip:8786" to /dask/scheduler
    Note over S1: Becomes Leader
    S2->>ETCD: Attempt to acquire lock on /dask/scheduler_lock
    ETCD-->>S2: Lock Held (Fail)
    Note over S2: Enters waiting loop
    W1->>ETCD: Read /dask/scheduler
    ETCD-->>W1: "tcp://s1_ip:8786"
    W1->>S1: Connect
    W2->>ETCD: Read /dask/scheduler
    ETCD-->>W2: "tcp://s1_ip:8786"
    W2->>S1: Connect
    rect rgb(255, 220, 220)
        Note over S1: S1 process crashes
        S1--xETCD: Session times out, lock is released
    end
    S2->>ETCD: Attempt to acquire lock on /dask/scheduler_lock
    ETCD-->>S2: Lock Acquired (Success)
    S2->>ETCD: Write "tcp://s2_ip:8786" to /dask/scheduler
    Note over S2: Becomes New Leader
    W1->>ETCD: Watch on /dask/scheduler triggers
    W1->>S2: Reconnect to new leader
    W2->>ETCD: Watch on /dask/scheduler triggers
    W2->>S2: Reconnect to new leader
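The worker-side half of that failover is an etcd watch on SCHEDULER_KEY. The snippet below is a minimal sketch using python-etcd3's add_watch_callback; on_new_scheduler is a hypothetical callback that would tell the local Dask worker to reconnect, for example by tearing it down and restarting it against the new address.
import logging

import etcd3

SCHEDULER_KEY = "/dask/cluster/scheduler"

def watch_scheduler(etcd_client, on_new_scheduler):
    """Invoke on_new_scheduler(address) whenever the scheduler key changes."""
    def _callback(response):
        for event in response.events:
            # Only PutEvents carry a new address; a DeleteEvent means the old
            # leader's key expired and a replacement is on its way.
            if isinstance(event, etcd3.events.PutEvent):
                address = event.value.decode("utf-8")
                logging.info("Scheduler key changed, new address: %s", address)
                on_new_scheduler(address)

    # Returns a watch id that can be passed to etcd_client.cancel_watch()
    # during shutdown.
    return etcd_client.add_watch_callback(SCHEDULER_KEY, _callback)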
The workers, which are already watching the SCHEDULER_KEY, will be notified of the change and automatically reconnect to the new leader. This failover is nearly transparent to the running computation, although any tasks held in the old scheduler’s memory will need to be re-submitted by the client. This brings us to the final, and most important, piece of the puzzle.
Phase 3: Resumable Computations with etcd Checkpointing
Surviving cluster infrastructure failures is only half the battle. We must also survive the failure of the computation itself. For our long-running time series aggregation, we can’t afford to restart from scratch. The solution is to break the computation into smaller, idempotent chunks and persist the state after each chunk completes. etcd serves as the durable ledger to track which chunks have been successfully processed.
Our strategy involved redesigning the Dask graph. Instead of submitting one monolithic graph, the client application acts as a stateful driver.
- Define Chunks: The total time series range is divided into discrete time windows (e.g., one hour per chunk).
- State Tracking in etcd: A dedicated etcd key, /dask/jobs/timeseries_agg/last_processed_ts, stores the timestamp of the last successfully completed chunk.
- Driver Loop: The client application starts by reading this key from etcd. This determines the starting point for the computation.
- Transactional Updates: The client submits a Dask computation for the next chunk. When future.result() returns successfully, the client writes the intermediate aggregation result to a durable store (like S3 or a shared filesystem) and only then updates the progress key in etcd, ideally inside a transaction so that a stale or concurrent driver cannot move the marker incorrectly (a sketch follows this list).
- Recovery: If the client, scheduler, or cluster fails and restarts, the driver’s first action is always to read the progress key from etcd. It will seamlessly resume processing from the next unprocessed chunk, loading the last saved intermediate state from the durable store.
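For the Transactional Updates step, a compare-and-swap on the progress key is enough to stop two competing driver instances from moving the marker past each other. The helper below is a sketch against python-etcd3's transaction API; update_progress_cas and its arguments are illustrative names, and the very first update (when the key does not exist yet) would need a different guard, such as a version == 0 compare.
import etcd3

PROGRESS_KEY = "/dask/jobs/timeseries_agg_v1/last_processed_ts"

def update_progress_cas(client, expected_prev_iso, new_iso):
    """Advance the checkpoint only if it still holds the value we resumed from."""
    succeeded, _ = client.transaction(
        compare=[
            # Guard: the marker must not have moved since we read it.
            client.transactions.value(PROGRESS_KEY) == expected_prev_iso.encode("utf-8"),
        ],
        success=[
            client.transactions.put(PROGRESS_KEY, new_iso.encode("utf-8")),
        ],
        failure=[],  # another driver advanced the marker; the caller should re-read it
    )
    return succeeded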
Here’s the Python code for the client-side driver that orchestrates this resumable computation.
import os
import logging
import sys
import time
from datetime import datetime, timedelta
import dask
import dask.dataframe as dd
from dask.distributed import Client, progress
import numpy as np
import pandas as pd
import etcd3
# --- Configuration ---
ETCD_HOST = os.getenv("ETCD_HOST", "127.0.0.1")
ETCD_PORT = int(os.getenv("ETCD_PORT", 2379))
SCHEDULER_KEY = "/dask/cluster/scheduler"
JOB_ID = "timeseries_agg_v1"
PROGRESS_KEY = f"/dask/jobs/{JOB_ID}/last_processed_ts"
CHECKPOINT_PATH = os.getenv("CHECKPOINT_PATH", "/tmp/dask_checkpoints") # Use S3 in production
# --- Logging Setup ---
logging.basicConfig(
level="INFO",
format="%(asctime)s - %(levelname)s - %(message)s",
stream=sys.stdout,
)
class ResumableJobDriver:
def __init__(self, etcd_host, etcd_port, start_date, end_date, chunk_size_hours):
self.etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
self.dask_client = None
self.start_date = datetime.fromisoformat(start_date)
self.end_date = datetime.fromisoformat(end_date)
self.chunk_size = timedelta(hours=chunk_size_hours)
def _get_scheduler_address(self):
"""Finds the scheduler from etcd."""
value, _ = self.etcd_client.get(SCHEDULER_KEY)
if value is None:
raise RuntimeError("Could not find active Dask scheduler in etcd.")
return value.decode("utf-8")
def connect_to_dask(self):
"""Establishes connection to the Dask cluster."""
scheduler_address = self._get_scheduler_address()
logging.info("Connecting to Dask scheduler at %s", scheduler_address)
self.dask_client = Client(scheduler_address)
logging.info("Dask client connected. Dashboard at: %s", self.dask_client.dashboard_link)
def get_last_processed_timestamp(self):
"""Retrieves the last successful checkpoint from etcd."""
value, _ = self.etcd_client.get(PROGRESS_KEY)
if value is None:
logging.warning("No previous progress found. Starting from the beginning.")
return self.start_date
last_ts = datetime.fromisoformat(value.decode('utf-8'))
logging.info("Resuming computation from checkpoint: %s", last_ts.isoformat())
return last_ts
def update_progress(self, timestamp):
"""Atomically updates the progress key in etcd."""
logging.info("Updating progress checkpoint in etcd to %s", timestamp.isoformat())
# A real implementation might use a transaction to ensure atomicity
# if more complex state were being stored. For a single key, put is sufficient.
self.etcd_client.put(PROGRESS_KEY, timestamp.isoformat().encode('utf-8'))
def process_chunk(self, chunk_start, chunk_end, intermediate_state=None):
"""
Processes a single chunk of time series data.
This is where the actual business logic resides.
"""
logging.info("Processing chunk from %s to %s", chunk_start, chunk_end)
# In a real scenario, this would read data for the specific time range.
# For this example, we generate dummy data.
        n_rows = 1_000_000
        start_ns = pd.Timestamp(chunk_start).value
        end_ns = pd.Timestamp(chunk_end).value
        df = pd.DataFrame({
            # Random timestamps uniformly distributed across the chunk.
            'timestamp': pd.to_datetime(
                (start_ns + (end_ns - start_ns) * np.random.rand(n_rows)).astype('int64')
            ),
            'sensor_id': np.random.randint(0, 100, size=n_rows),
            'value': np.random.randn(n_rows),
        }).set_index('timestamp').sort_index()
ddf = dd.from_pandas(df, npartitions=10)
# Example aggregation: 30-minute rolling average per sensor.
# The `min_periods` is crucial for handling the edges of chunks.
rolling_avg = ddf.groupby('sensor_id')['value'].rolling('30min', min_periods=1).mean()
# A common pitfall is forgetting to handle state across chunks.
# Here, we'd load the previous chunk's state if necessary.
if intermediate_state is not None:
logging.info("Applying intermediate state from previous chunk.")
# This logic is highly application-specific. It might involve
# concatenating data from the boundary of the last chunk to ensure
# rolling windows are calculated correctly.
# Example: final_ddf = dd.concat([intermediate_state, ddf])
pass
# Compute the result for this chunk
result_future = self.dask_client.compute(rolling_avg)
progress(result_future)
computed_result = result_future.result()
# Checkpoint the result to a durable store
checkpoint_file = os.path.join(CHECKPOINT_PATH, f"result_{chunk_start.strftime('%Y%m%d%H%M%S')}.parquet")
logging.info("Saving checkpoint to %s", checkpoint_file)
        # In a real implementation, you'd call to_parquet on the Dask collection
        # itself so the workers write in parallel, but we demonstrate the simpler
        # client-side save here.
        computed_result.to_frame().to_parquet(checkpoint_file, engine='pyarrow')
# This is also where you would save any state needed for the *next* chunk.
# For instance, the last 29 minutes of data to prime the next rolling window.
return computed_result
def run_job(self):
"""Main job execution loop."""
if not self.dask_client:
self.connect_to_dask()
        current_ts = self.get_last_processed_timestamp()
        if current_ts == self.start_date:
            # First run, no intermediate state to load.
            intermediate_state = None
        else:
            # On resume, the progress key already holds the *end* of the last
            # processed chunk, which is exactly where the next chunk starts,
            # so no increment is needed. Load the last checkpoint here.
            # This logic needs to be robust.
            # intermediate_state = load_from_durable_store(...)
            intermediate_state = None  # Placeholder
while current_ts < self.end_date:
chunk_start_time = current_ts
chunk_end_time = current_ts + self.chunk_size
if chunk_end_time > self.end_date:
chunk_end_time = self.end_date
try:
self.process_chunk(chunk_start_time, chunk_end_time, intermediate_state)
# If processing succeeds, update the progress marker
self.update_progress(chunk_end_time)
current_ts = chunk_end_time
except Exception as e:
logging.critical("Failed to process chunk %s - %s: %s", chunk_start_time, chunk_end_time, e)
logging.critical("System will halt. Rerun the driver to resume from last checkpoint.")
# A more advanced system might have an automated retry policy here.
break
logging.info("Job finished processing all chunks up to %s", self.end_date)
if __name__ == "__main__":
# Ensure checkpoint directory exists
os.makedirs(CHECKPOINT_PATH, exist_ok=True)
driver = ResumableJobDriver(
etcd_host=ETCD_HOST,
etcd_port=ETCD_PORT,
start_date="2023-01-01T00:00:00Z",
end_date="2023-01-01T12:00:00Z",
chunk_size_hours=1
)
driver.run_job()
This driver-based approach fundamentally changes the architecture. The Dask cluster becomes a powerful but stateless execution engine, and the intelligence and state management are centralized in our resilient client application, which relies on etcd and a durable blob store as its source of truth.
The presented architecture is not without its trade-offs. The checkpointing process introduces latency, as each chunk must be fully computed and saved before the next one can begin. The performance is directly tied to the chunk size: smaller chunks allow for faster recovery but incur more overhead from saving and loading state. A common mistake is to design the checkpointing logic without considering the state required to bridge chunks, especially for operations like rolling windows or sessionization, which can lead to incorrect results at the boundaries (one way to carry that boundary state is sketched below). Furthermore, this pattern is best suited for batch-like, sequential processing tasks. It is less applicable to complex, DAG-based workflows where tasks have multiple interdependencies. For those scenarios, a more sophisticated workflow orchestrator that integrates with Dask, like Airflow or Prefect, might be a better choice, but they often come with their own complexity and infrastructure requirements. This etcd-based control plane provides a powerful, relatively lightweight solution for enhancing the resilience of long-running, linear computations.
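As an illustration of that boundary problem, the sketch below shows one way to carry state between chunks for the 30-minute rolling average: keep the last window's worth of raw rows from each chunk and prepend them to the next chunk before computing, then discard the primed rows. It is written against plain pandas for clarity; rolling_mean_with_carry and its argument names are illustrative, and the same idea applies to the Dask version one chunk at a time.
import pandas as pd

WINDOW = pd.Timedelta("30min")

def rolling_mean_with_carry(chunk_df, carry_df=None):
    """30-minute rolling mean per sensor, primed with the previous chunk's tail.

    chunk_df / carry_df are assumed to be DataFrames indexed by timestamp with
    'sensor_id' and 'value' columns.
    """
    frames = [carry_df, chunk_df] if carry_df is not None else [chunk_df]
    combined = pd.concat(frames).sort_index()

    rolled = (
        combined.groupby("sensor_id")["value"]
        .rolling(WINDOW, min_periods=1)
        .mean()
    )
    # Drop results for the carried rows; they existed only to prime the window.
    chunk_start = chunk_df.index.min()
    rolled = rolled[rolled.index.get_level_values(-1) >= chunk_start]

    # State handed to the next chunk: the last WINDOW worth of raw data.
    next_carry = combined[combined.index >= combined.index.max() - WINDOW]
    return rolled, next_carry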