Implementing a Resilient Time Series Aggregation System Using Dask with an etcd-based Control Plane


The initial deployment of our time series aggregation pipeline seemed straightforward. A Dask cluster was spun up, partitioned data was read from Parquet files, and a series of rolling window calculations were performed over several terabytes of sensor data. The logic, executed on a single machine with a LocalCluster, was flawless. In staging, with a dask.distributed cluster, the system began to show its fragility. The computation, designed to run for eight hours, would fail unpredictably around the five-hour mark. A spot instance termination, a transient network partition, a memory-overloaded worker: any one of these events would kill the entire computation and force a complete restart. This wasn’t just an inconvenience; it was an operational impossibility for a production system.

Our primary pain point was the ephemeral nature of the Dask scheduler’s state. It brilliantly manages tasks, data locality, and worker communication, but it assumes a reliable cluster. When a worker is lost, tasks assigned to it are re-run. But if the scheduler itself fails, or if a critical, long-running task chain is broken, the entire computation graph’s state is gone. Simple retries at the client level are naive; they can’t recover the intermediate state of a massive aggregation. The real requirement was not just task resilience, but computation resilience. The system had to be able to resume from the last known good state, even in the face of scheduler or multiple worker failures.

This led to the conclusion that we needed an external, highly available control plane to manage the cluster’s lifecycle and the computation’s progress. We evaluated Zookeeper and Redis but settled on etcd. Its Raft-based consensus provides strong consistency guarantees, its watch mechanism is ideal for monitoring distributed state changes, and its lease feature provides a robust mechanism for service discovery and health checking. In a real-world project, predictability and reliability trump raw performance, and etcd’s operational model gave us the confidence we needed. Our new architecture would use etcd for three core functions: dynamic worker registration, scheduler leader election, and—most critically—computation checkpointing.

Phase 1: Dynamic Worker Registration via etcd Leases

The default method of connecting Dask workers to a scheduler via a fixed address (tcp://scheduler:8786) is brittle. If the scheduler restarts with a new IP, workers are orphaned. Our first step was to decouple workers from the scheduler’s physical address. Instead, they would rely on a logical service name managed in etcd.

Furthermore, the scheduler needs a definitive, real-time list of healthy workers. A worker process could die without properly deregistering. We implemented a heartbeating system using etcd leases. Each worker acquires a lease and attaches it to its registration key. The worker is responsible for keeping this lease alive. If the worker crashes, the lease expires, and etcd automatically deletes the key, instantly notifying any watchers—like our scheduler—that the worker is gone.

Here is the core implementation for a Dask worker that registers itself.

import os
import sys
import time
import logging
import threading
import uuid

import etcd3
from dask.distributed import Worker
from tornado.ioloop import IOLoop

# --- Configuration ---
# In a production setup, these would come from environment variables or a config file.
ETCD_HOST = os.getenv("ETCD_HOST", "127.0.0.1")
ETCD_PORT = int(os.getenv("ETCD_PORT", 2379))
SCHEDULER_KEY = "/dask/cluster/scheduler"
WORKER_PREFIX = "/dask/cluster/workers/"
WORKER_LEASE_TTL = 10  # Seconds. Workers must heartbeat faster than this.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

# --- Logging Setup ---
logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s - %(threadName)s - %(levelname)s - %(message)s",
    stream=sys.stdout,
)


class ResilientDaskWorker:
    """
    A Dask Worker that registers itself with etcd and maintains its presence
    via a lease, allowing for dynamic discovery and health checking.
    """

    def __init__(self, etcd_host, etcd_port):
        self.worker_id = f"worker-{uuid.uuid4()}"
        self.etcd_key = f"{WORKER_PREFIX}{self.worker_id}"
        self.dask_worker = None
        self.lease = None
        self.stop_event = threading.Event()

        try:
            self.etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
            logging.info("Successfully connected to etcd at %s:%s", etcd_host, etcd_port)
        except Exception as e:
            logging.critical("Failed to connect to etcd: %s", e)
            raise

    def _find_scheduler(self):
        """Polls etcd to find the active scheduler's address."""
        while not self.stop_event.is_set():
            try:
                value, _ = self.etcd_client.get(SCHEDULER_KEY)
                if value is not None:
                    scheduler_address = value.decode("utf-8")
                    logging.info("Found active scheduler at %s", scheduler_address)
                    return scheduler_address
            except Exception as e:
                logging.error("Error finding scheduler in etcd: %s. Retrying...", e)
            
            time.sleep(5)
        return None

    def _start_dask_worker(self, scheduler_address, loop):
        """Initializes the Dask worker and schedules its startup on the event loop."""
        # The Dask worker must run on the Tornado event loop owned by this process.
        try:
            self.dask_worker = Worker(
                scheduler_address,
                name=self.worker_id,
                loop=loop,
            )
            # Acquire the lease up front so the heartbeat thread has something
            # to refresh; the registration key is written once the worker is
            # actually listening and its address is bound.
            self.lease = self.etcd_client.lease(WORKER_LEASE_TTL)

            async def _start_and_register():
                # Awaiting the worker starts it; only then is its address known.
                await self.dask_worker
                self.etcd_client.put(
                    self.etcd_key,
                    self.dask_worker.address.encode("utf-8"),
                    lease=self.lease,
                )
                logging.info(
                    "Dask worker started at %s and registered in etcd with key %s",
                    self.dask_worker.address, self.etcd_key,
                )

            loop.add_callback(_start_and_register)
            return True
        except Exception as e:
            logging.error("Failed to start Dask worker: %s", e)
            if self.lease:
                self.lease.revoke()
            return False

    def _keep_lease_alive(self):
        """Runs in a separate thread to periodically refresh the etcd lease."""
        logging.info("Starting lease keep-alive thread for lease ID %s", self.lease.id)
        while not self.stop_event.is_set():
            try:
                # Each refresh() call sends a single keep-alive request for the
                # lease. Refresh well within the TTL so one missed beat does
                # not deregister the worker.
                self.lease.refresh()
                time.sleep(WORKER_LEASE_TTL / 3)
            except Exception as e:
                logging.error("Failed to refresh lease: %s. Worker might be deregistered.", e)
                # Losing the lease is fatal: the worker is no longer a valid
                # cluster member, so stop everything.
                self.stop_event.set()
                break

    def run(self):
        """Main execution loop for the worker."""
        scheduler_address = self._find_scheduler()
        if not scheduler_address:
            logging.critical("Could not find scheduler. Shutting down.")
            return

        # Dask's worker needs to own the event loop.
        loop = IOLoop.current()

        if not self._start_dask_worker(scheduler_address, loop):
            logging.critical("Failed to initialize Dask worker. Aborting.")
            return

        # Start the heartbeating in a daemon thread.
        heartbeat_thread = threading.Thread(
            target=self._keep_lease_alive,
            name=f"etcd-heartbeat-{self.worker_id}",
            daemon=True
        )
        heartbeat_thread.start()

        try:
            # This starts the Tornado event loop and blocks until the worker is closed.
            loop.start()
        finally:
            self.shutdown()

    def shutdown(self):
        """Gracefully shuts down the worker and cleans up etcd registration."""
        logging.info("Shutting down resilient worker %s...", self.worker_id)
        self.stop_event.set()
        
        if self.lease:
            try:
                self.lease.revoke()
                logging.info("Revoked etcd lease %s", self.lease.id)
            except Exception as e:
                logging.error("Failed to revoke etcd lease during shutdown: %s", e)
        
        # The Dask worker's close method must be called from its own loop.
        if self.dask_worker and self.dask_worker.loop:
             self.dask_worker.loop.add_callback(self.dask_worker.close)

        logging.info("Shutdown complete.")

if __name__ == "__main__":
    worker_node = ResilientDaskWorker(etcd_host=ETCD_HOST, etcd_port=ETCD_PORT)
    try:
        worker_node.run()
    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received.")
    finally:
        worker_node.shutdown()

The key insight here is the separation of concerns. The main thread runs the Dask/Tornado event loop, while a dedicated thread manages the etcd lease. If the heartbeating fails, the entire worker process is designed to shut down, as it’s no longer a valid member of the cluster.
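The flip side of lease-based registration is consuming those keys. The active scheduler, or any controller process, can watch the worker prefix and learn about arrivals and departures the moment a key appears or a lease expires. The sketch below illustrates that consumer side under the same key layout; it is not part of the worker above, and the on_worker_joined and on_worker_lost callbacks are hypothetical placeholders.

import etcd3
from etcd3.events import DeleteEvent, PutEvent

WORKER_PREFIX = "/dask/cluster/workers/"

def watch_workers(etcd_client, on_worker_joined, on_worker_lost):
    """Stream membership changes for every key under WORKER_PREFIX."""
    events_iterator, cancel = etcd_client.watch_prefix(WORKER_PREFIX)
    for event in events_iterator:
        worker_key = event.key.decode("utf-8")
        if isinstance(event, PutEvent):
            # A worker registered (or re-registered) and published its address.
            on_worker_joined(worker_key, event.value.decode("utf-8"))
        elif isinstance(event, DeleteEvent):
            # The lease expired or was revoked: the worker is gone.
            on_worker_lost(worker_key)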

Phase 2: Scheduler High Availability with Leader Election

A dynamic pool of workers is useless if the scheduler remains a single point of failure. The next logical step was to make the scheduler itself highly available. We can run multiple scheduler instances, but only one should be active at any given time. This is a classic distributed systems problem solved by leader election. etcd provides a straightforward recipe for this using its lock mechanism.

The process is as follows:

  1. Multiple scheduler candidates start up.
  2. Each candidate attempts to acquire a distributed lock on a predefined etcd key (e.g., /dask/cluster/scheduler_lock).
  3. The instance that successfully acquires the lock becomes the leader. It writes its own network address to the SCHEDULER_KEY so workers can find it.
  4. The other instances (followers) wait, periodically re-attempting to acquire the lock.
  5. If the leader fails, its etcd session will time out, releasing the lock. One of the followers will then acquire it and become the new leader.

This pattern prevents a “split-brain” scenario and ensures a single, authoritative scheduler is coordinating the cluster.
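On the candidate side, the election loop is only a few lines with python-etcd3’s lock recipe. The following is a minimal sketch under our key layout: run_scheduler_until_failure is a placeholder for whatever actually runs the Dask scheduler process, and the leader is expected to call lock.refresh() periodically while it serves so the lock’s lease does not expire.

import logging

import etcd3

SCHEDULER_KEY = "/dask/cluster/scheduler"
SCHEDULER_LOCK = "dask/cluster/scheduler_lock"
LOCK_TTL = 15  # seconds; the lock's lease expires if the leader stops refreshing it

def elect_and_serve(etcd_client, my_address, run_scheduler_until_failure):
    """Block until this candidate wins the election, then act as leader."""
    lock = etcd_client.lock(SCHEDULER_LOCK, ttl=LOCK_TTL)
    while not lock.acquire(timeout=5):
        logging.info("Scheduler lock held by another instance; waiting...")
    try:
        # Publish our address under the lock's lease so the key disappears
        # automatically if this process dies and the lease expires.
        etcd_client.put(SCHEDULER_KEY, my_address.encode("utf-8"), lease=lock.lease)
        run_scheduler_until_failure(lock)  # must call lock.refresh() while serving
    finally:
        lock.release()

End to end, the failover sequence looks like this: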

sequenceDiagram
    participant S1 as Scheduler 1 (Follower)
    participant S2 as Scheduler 2 (Follower)
    participant ETCD
    participant W1 as Worker 1
    participant W2 as Worker 2

    S1->>ETCD: Attempt to acquire lock on /dask/scheduler_lock
    ETCD-->>S1: Lock Acquired (Success)
    S1->>ETCD: Write "tcp://s1_ip:8786" to /dask/scheduler
    Note over S1: Becomes Leader

    S2->>ETCD: Attempt to acquire lock on /dask/scheduler_lock
    ETCD-->>S2: Lock Held (Fail)
    Note over S2: Enters waiting loop

    W1->>ETCD: Read /dask/scheduler
    ETCD-->>W1: "tcp://s1_ip:8786"
    W1->>S1: Connect

    W2->>ETCD: Read /dask/scheduler
    ETCD-->>W2: "tcp://s1_ip:8786"
    W2->>S1: Connect
    
    rect rgb(255, 220, 220)
        Note over S1: S1 process crashes
        S1--xETCD: Session times out, lock is released
    end

    S2->>ETCD: Attempt to acquire lock on /dask/scheduler_lock
    ETCD-->>S2: Lock Acquired (Success)
    S2->>ETCD: Write "tcp://s2_ip:8786" to /dask/scheduler
    Note over S2: Becomes New Leader
    
    W1->>ETCD: Watch on /dask/scheduler triggers
    W1->>S2: Reconnect to new leader
    W2->>ETCD: Watch on /dask/scheduler triggers
    W2->>S2: Reconnect to new leader

Workers that keep a watch on the SCHEDULER_KEY are notified of the change and reconnect to the new leader; the startup polling loop in _find_scheduler extends naturally into such a watch, sketched below. The failover is nearly transparent to the running computation, although any tasks held only in the old scheduler’s memory will need to be re-submitted by the client.
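Here is a minimal sketch of that worker-side watch, assuming python-etcd3’s watch API; reconnect_to stands in for the worker’s actual reconnect logic.

import logging

import etcd3
from etcd3.events import PutEvent

SCHEDULER_KEY = "/dask/cluster/scheduler"

def watch_scheduler(etcd_client, reconnect_to):
    """Block on an etcd watch and reconnect whenever a new leader publishes itself."""
    events_iterator, cancel = etcd_client.watch(SCHEDULER_KEY)
    for event in events_iterator:
        if isinstance(event, PutEvent):
            new_address = event.value.decode("utf-8")
            logging.info("Scheduler moved to %s; reconnecting.", new_address)
            reconnect_to(new_address)

This brings us to the final, and most important, piece of the puzzle.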

Phase 3: Resumable Computations with etcd Checkpointing

Surviving cluster infrastructure failures is only half the battle. We must also survive the failure of the computation itself. For our long-running time series aggregation, we can’t afford to restart from scratch. The solution is to break the computation into smaller, idempotent chunks and persist the state after each chunk completes. etcd serves as the durable ledger to track which chunks have been successfully processed.

Our strategy involved redesigning the Dask graph. Instead of submitting one monolithic graph, the client application acts as a stateful driver.

  1. Define Chunks: The total time series range is divided into discrete time windows (e.g., one hour per chunk).
  2. State Tracking in etcd: A dedicated etcd key, /dask/jobs/timeseries_agg/last_processed_ts, stores the timestamp of the last successfully completed chunk.
  3. Driver Loop: The client application starts by reading this key from etcd. This determines the starting point for the computation.
  4. Transactional Updates: The client submits a Dask computation for the next chunk. When future.result() returns successfully, the client writes the intermediate aggregation result to a durable store (like S3 or a shared filesystem) and only then advances the progress key in etcd. The ordering guarantees that a chunk is never marked complete before its output is durable, and wrapping the update in a compare-and-swap transaction guards against a stale or duplicate driver moving the marker (a sketch follows the list).
  5. Recovery: If the client, scheduler, or cluster fails and restarts, the driver’s first action is always to read the progress key from etcd. It will seamlessly resume processing from the next unprocessed chunk, loading the last saved intermediate state from the durable store.
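The transactional update from step 4 maps directly onto python-etcd3’s transaction API. The helper below is a minimal compare-and-swap sketch, not part of the driver that follows, and advance_progress is a hypothetical name.

import etcd3

PROGRESS_KEY = "/dask/jobs/timeseries_agg_v1/last_processed_ts"

def advance_progress(etcd_client, expected_prev, new_ts):
    """Advance the marker only if it still holds the value this driver last observed."""
    succeeded, _ = etcd_client.transaction(
        compare=[etcd_client.transactions.value(PROGRESS_KEY) == expected_prev],
        success=[etcd_client.transactions.put(PROGRESS_KEY, new_ts)],
        failure=[],
    )
    # On the very first chunk the key does not exist yet; a plain put seeds it.
    return succeeded

For this pipeline a single driver owns the key, so the simpler put used in update_progress below is acceptable; the transaction becomes important the moment a second driver instance could be running concurrently.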

Here’s the Python code for the client-side driver that orchestrates this resumable computation.

import os
import logging
import sys
import time
from datetime import datetime, timedelta

import dask
import dask.dataframe as dd
from dask.distributed import Client, progress
import numpy as np
import pandas as pd
import etcd3

# --- Configuration ---
ETCD_HOST = os.getenv("ETCD_HOST", "127.0.0.1")
ETCD_PORT = int(os.getenv("ETCD_PORT", 2379))
SCHEDULER_KEY = "/dask/cluster/scheduler"
JOB_ID = "timeseries_agg_v1"
PROGRESS_KEY = f"/dask/jobs/{JOB_ID}/last_processed_ts"
CHECKPOINT_PATH = os.getenv("CHECKPOINT_PATH", "/tmp/dask_checkpoints") # Use S3 in production

# --- Logging Setup ---
logging.basicConfig(
    level="INFO",
    format="%(asctime)s - %(levelname)s - %(message)s",
    stream=sys.stdout,
)

class ResumableJobDriver:
    def __init__(self, etcd_host, etcd_port, start_date, end_date, chunk_size_hours):
        self.etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
        self.dask_client = None
        self.start_date = datetime.fromisoformat(start_date)
        self.end_date = datetime.fromisoformat(end_date)
        self.chunk_size = timedelta(hours=chunk_size_hours)

    def _get_scheduler_address(self):
        """Finds the scheduler from etcd."""
        value, _ = self.etcd_client.get(SCHEDULER_KEY)
        if value is None:
            raise RuntimeError("Could not find active Dask scheduler in etcd.")
        return value.decode("utf-8")

    def connect_to_dask(self):
        """Establishes connection to the Dask cluster."""
        scheduler_address = self._get_scheduler_address()
        logging.info("Connecting to Dask scheduler at %s", scheduler_address)
        self.dask_client = Client(scheduler_address)
        logging.info("Dask client connected. Dashboard at: %s", self.dask_client.dashboard_link)

    def get_last_processed_timestamp(self):
        """Retrieves the last successful checkpoint from etcd."""
        value, _ = self.etcd_client.get(PROGRESS_KEY)
        if value is None:
            logging.warning("No previous progress found. Starting from the beginning.")
            return self.start_date
        
        last_ts = datetime.fromisoformat(value.decode('utf-8'))
        logging.info("Resuming computation from checkpoint: %s", last_ts.isoformat())
        return last_ts

    def update_progress(self, timestamp):
        """Atomically updates the progress key in etcd."""
        logging.info("Updating progress checkpoint in etcd to %s", timestamp.isoformat())
        # A real implementation might use a transaction to ensure atomicity
        # if more complex state were being stored. For a single key, put is sufficient.
        self.etcd_client.put(PROGRESS_KEY, timestamp.isoformat().encode('utf-8'))

    def process_chunk(self, chunk_start, chunk_end, intermediate_state=None):
        """
        Processes a single chunk of time series data.
        This is where the actual business logic resides.
        """
        logging.info("Processing chunk from %s to %s", chunk_start, chunk_end)

        # In a real scenario, this would read data for the specific time range.
        # For this example, we generate dummy data.
        n_rows = 1_000_000
        start_ns = pd.Timestamp(chunk_start).value
        end_ns = pd.Timestamp(chunk_end).value
        df = pd.DataFrame({
            # Random timestamps uniformly distributed across the chunk's range.
            'timestamp': pd.to_datetime(start_ns + (end_ns - start_ns) * np.random.rand(n_rows)),
            'sensor_id': np.random.randint(0, 100, size=n_rows),
            'value': np.random.randn(n_rows)
        }).set_index('timestamp').sort_index()  # time-based rolling needs a sorted index

        ddf = dd.from_pandas(df, npartitions=10)

        # Example aggregation: 30-minute rolling average per sensor.
        # The `min_periods` is crucial for handling the edges of chunks.
        rolling_avg = ddf.groupby('sensor_id')['value'].rolling('30min', min_periods=1).mean()
        
        # A common pitfall is forgetting to handle state across chunks.
        # Here, we'd load the previous chunk's state if necessary.
        if intermediate_state is not None:
            logging.info("Applying intermediate state from previous chunk.")
            # This logic is highly application-specific. It might involve
            # concatenating data from the boundary of the last chunk to ensure
            # rolling windows are calculated correctly.
            # Example: final_ddf = dd.concat([intermediate_state, ddf])
            pass

        # Compute the result for this chunk
        result_future = self.dask_client.compute(rolling_avg)
        progress(result_future)
        computed_result = result_future.result()

        # Checkpoint the result to a durable store
        checkpoint_file = os.path.join(CHECKPOINT_PATH, f"result_{chunk_start.strftime('%Y%m%d%H%M%S')}.parquet")
        logging.info("Saving checkpoint to %s", checkpoint_file)
        # In a real implementation you'd call to_parquet on the lazy Dask collection
        # so the workers write directly to storage; here we demonstrate the simpler
        # client-side save of the computed pandas result.
        computed_result.to_frame().to_parquet(checkpoint_file, engine='pyarrow')
        
        # This is also where you would save any state needed for the *next* chunk.
        # For instance, the last 29 minutes of data to prime the next rolling window.
        
        return computed_result


    def run_job(self):
        """Main job execution loop."""
        if not self.dask_client:
            self.connect_to_dask()

        # The progress key stores the *end* timestamp of the last completed chunk,
        # which is exactly where the next chunk must begin; no extra offset is needed.
        current_ts = self.get_last_processed_timestamp()
        if current_ts == self.start_date:
            # First run, no intermediate state to load
            intermediate_state = None
        else:
            # On resume, load the boundary state saved by the last completed chunk.
            # intermediate_state = load_from_durable_store(...)
            intermediate_state = None  # Placeholder

        while current_ts < self.end_date:
            chunk_start_time = current_ts
            chunk_end_time = current_ts + self.chunk_size
            if chunk_end_time > self.end_date:
                chunk_end_time = self.end_date
            
            try:
                self.process_chunk(chunk_start_time, chunk_end_time, intermediate_state)
                # If processing succeeds, update the progress marker
                self.update_progress(chunk_end_time)
                current_ts = chunk_end_time
            except Exception as e:
                logging.critical("Failed to process chunk %s - %s: %s", chunk_start_time, chunk_end_time, e)
                logging.critical("System will halt. Rerun the driver to resume from last checkpoint.")
                # A more advanced system might have an automated retry policy here.
                break

        logging.info("Job finished processing all chunks up to %s", self.end_date)


if __name__ == "__main__":
    # Ensure checkpoint directory exists
    os.makedirs(CHECKPOINT_PATH, exist_ok=True)
    
    driver = ResumableJobDriver(
        etcd_host=ETCD_HOST,
        etcd_port=ETCD_PORT,
        start_date="2023-01-01T00:00:00Z",
        end_date="2023-01-01T12:00:00Z",
        chunk_size_hours=1
    )
    driver.run_job()

This driver-based approach fundamentally changes the architecture. The Dask cluster becomes a powerful but stateless execution engine, and the intelligence and state management are centralized in our resilient client application, which relies on etcd and a durable blob store as its source of truth.

The presented architecture is not without its trade-offs. The checkpointing process introduces latency, as each chunk must be fully computed and saved before the next one can begin. The performance is directly tied to the chunk size—smaller chunks allow for faster recovery but incur more overhead from saving and loading state. A common mistake is to design the checkpointing logic without considering the state required to bridge chunks, especially for operations like rolling windows or sessionization, which can lead to incorrect results at the boundaries. Furthermore, this pattern is best suited for batch-like, sequential processing tasks. It is less applicable to complex, DAG-based workflows where tasks have multiple interdependencies. For those scenarios, a more sophisticated workflow orchestrator that integrates with Dask, like Airflow or Prefect, might be a better choice, but they often come with their own complexity and infrastructure requirements. This etcd-based control plane provides a powerful, relatively lightweight solution for enhancing the resilience of long-running, linear computations.
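To make the boundary caveat concrete: for time-based rolling windows, bridging two chunks only requires carrying the trailing window of the previous chunk forward and prepending it to the next one. Below is a minimal pandas sketch of that hand-off, with hypothetical helper names that are not part of the driver above.

from typing import Optional

import pandas as pd

WINDOW = pd.Timedelta("30min")  # must match the rolling window used by the job

def tail_for_next_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
    """Keep the last WINDOW of rows; the next chunk needs them as history."""
    return chunk.loc[chunk.index >= chunk.index.max() - WINDOW]

def with_boundary_state(prev_tail: Optional[pd.DataFrame], chunk: pd.DataFrame) -> pd.DataFrame:
    """Prepend the previous chunk's tail so the earliest windows see enough history."""
    if prev_tail is None:
        return chunk
    return pd.concat([prev_tail, chunk]).sort_index()

The prepended rows exist only to prime the windows; their recomputed aggregates should be dropped before saving so the overlap is not written twice.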

