Implementing a Paxos-Managed State Machine for a Distributed LlamaIndex Cluster on Alibaba Cloud


The initial deployment of our internal diagnostics RAG agent was a success. Built on LlamaIndex, it queried a vector index of our entire knowledge base—runbooks, incident post-mortems, architecture decision records—to provide instant troubleshooting advice. The problems began when we moved from a single-node proof-of-concept to a highly available, multi-node cluster on Alibaba Cloud ECS. During a rolling update to deploy a new version of the knowledge base index, we hit a critical split-brain failure. For a 15-minute window, half of our query engine pods were serving from index v1.1 while the newer pods were serving from v1.2. The result was two separate pods giving conflicting, and dangerously incorrect, advice for the same critical incident query. Eventual consistency was not an acceptable risk; the switch to a new active index had to happen atomically across every query node.

The core challenge was distributed consensus. All active nodes needed to agree on exactly one value: the identifier of the canonical “live” index version. Our first instinct was to reach for a distributed lock built on Redis, or a full-fledged consensus system like etcd. In a real-world project, though, pulling in a dependency like etcd or ZooKeeper just to manage a single string value felt like operational overkill: the maintenance burden, network overhead, and additional failure domain were disproportionate to the problem. That led us down the path of implementing a lean, special-purpose consensus module directly within the application, based on the classic Paxos algorithm. It gave us deep control and avoided introducing a heavy external dependency.

Our stack was already defined: OCI-compliant containers deployed on Alibaba Cloud ECS instances, with the cluster topology and software state managed declaratively by our existing Puppet infrastructure. The task was to weave a Paxos implementation into our Python-based LlamaIndex application to create a resilient, strongly consistent distributed query engine.

Architectural Blueprint

The final architecture consists of several components working in concert. A Server Load Balancer (SLB) on Alibaba Cloud distributes query traffic to a fleet of ECS instances. Each ECS instance runs an identical OCI container. Inside the container, our custom Python application serves the LlamaIndex query API. Crucially, before loading the vector index from Alibaba Cloud Object Storage Service (OSS), the application instance joins a Paxos consensus group with its peers. They execute the Paxos protocol to agree on the target index version string (e.g., "2023-10-27T10:00:00Z_v2.5.1"). Only after consensus is reached does the LlamaIndex engine load the corresponding index files from OSS. The entire cluster configuration, including the IP addresses of peer nodes, is rendered and managed by Puppet.

graph TD
    subgraph Alibaba Cloud
        SLB[Server Load Balancer] --> Node1[ECS 1: OCI Container];
        SLB --> Node2[ECS 2: OCI Container];
        SLB --> Node3[ECS 3: OCI Container];

        subgraph "Paxos Consensus Group"
            Node1 <--> Node2;
            Node2 <--> Node3;
            Node3 <--> Node1;
        end

        OSS[Object Storage Service Bucket]
        Node1 -- "Reads index data" --> OSS;
        Node2 -- "Reads index data" --> OSS;
        Node3 -- "Reads index data" --> OSS;
    end

    subgraph "Puppet Master"
        Puppet[Puppet Server] -- "Manages config for" --> Node1;
        Puppet -- "Manages config for" --> Node2;
        Puppet -- "Manages config for" --> Node3;
    end

    style Node1 fill:#f9f,stroke:#333,stroke-width:2px
    style Node2 fill:#f9f,stroke:#333,stroke-width:2px
    style Node3 fill:#f9f,stroke:#333,stroke-width:2px

The Paxos Core Implementation

Implementing Paxos from scratch is notoriously difficult to get right. For our specific use case—deciding a single value per rollout (the live index version), which is then treated as immutable until the next deployment—a simplified “single-decree” Paxos is enough. The code is structured around the three classic roles: Proposer, Acceptor, and Learner. In our implementation, every node embodies all three.
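The message flow, using the exact message types from the module below, is two round trips plus a final learn broadcast:

sequenceDiagram
    participant P as Node 1 (Proposer)
    participant A2 as Node 2 (Acceptor)
    participant A3 as Node 3 (Acceptor)
    P->>A2: prepare(n)
    P->>A3: prepare(n)
    A2-->>P: promise(n, last accepted id/value)
    A3-->>P: promise(n, last accepted id/value)
    Note over P: Quorum of promises reached, adopt any previously accepted value
    P->>A2: propose(n, value)
    P->>A3: propose(n, value)
    A2-->>P: accepted(n, value)
    A3-->>P: accepted(n, value)
    Note over P: Quorum of acceptances reached, consensus achieved
    P->>A2: learn(value)
    P->>A3: learn(value)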

The state that needs to be persisted by an Acceptor on disk is minimal: the highest promise number (promised_id) it has seen and the details of the last value it accepted (accepted_id and accepted_value). A common pitfall here is to keep this state in memory, which would violate the protocol’s safety guarantees if a node crashes and restarts. We use simple JSON files for persistence, ensuring fsync is called after every write.
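On disk, that state is nothing more than a small JSON document. An illustrative example (the values are made up) looks like this; tuples are serialized as JSON arrays and converted back to tuples on load:

{
  "promised_id": [1698400800, "10.0.1.10:8000"],
  "accepted_id": [1698400800, "10.0.1.10:8000"],
  "accepted_value": "2023-10-27T10:00:00Z_v2.5.1"
}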

This is the core network and logic module. It uses Python’s asyncio for non-blocking network I/O, which is essential for handling messages from multiple peers without blocking the main application thread.

paxos_core/consensus.py

import asyncio
import json
import logging
import os
import random
import time
from typing import Dict, Any, Optional, List, Tuple

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
logger = logging.getLogger(__name__)

STATE_FILE_PATH = "/var/lib/rag_agent/paxos_state.json"
# In a real-world project, this directory must be a persistent volume.

class PaxosNode:
    """
    A single node participating in the Paxos consensus protocol.
    Each node acts as a Proposer, Acceptor, and Learner.
    """

    def __init__(self, node_id: str, peer_nodes: List[str]):
        self.node_id = node_id
        self.peer_nodes = [p for p in peer_nodes if p != self.node_id]
        # Quorum is a majority of the FULL cluster (peers plus this node).
        # This node's own Acceptor vote is counted locally in run_consensus().
        self.quorum_size = (len(self.peer_nodes) + 1) // 2 + 1
        
        # Proposer state
        self.proposal_id = (0, self.node_id) # (n, node_id) to ensure uniqueness
        
        # Acceptor state - this MUST be persistent
        self.promised_id = (-1, "")
        self.accepted_id = (-1, "")
        self.accepted_value = None
        self._load_state()

        # Learner state
        self.final_value = None
        self.final_consensus_achieved = False

    def _load_state(self):
        """Load Acceptor state from a persistent file."""
        if os.path.exists(STATE_FILE_PATH):
            try:
                with open(STATE_FILE_PATH, 'r') as f:
                    state = json.load(f)
                    self.promised_id = tuple(state.get('promised_id', (-1, "")))
                    self.accepted_id = tuple(state.get('accepted_id', (-1, "")))
                    self.accepted_value = state.get('accepted_value')
                    logger.info(f"Loaded persistent Paxos state: {state}")
            except (IOError, json.JSONDecodeError) as e:
                logger.error(f"Failed to load Paxos state: {e}. Starting fresh.")
                self._ensure_dir_exists()
        else:
            logger.info("No persistent Paxos state file found. Starting fresh.")
            self._ensure_dir_exists()


    def _persist_state(self):
        """Persist Acceptor state to a file, ensuring it's written to disk."""
        self._ensure_dir_exists()
        state = {
            'promised_id': self.promised_id,
            'accepted_id': self.accepted_id,
            'accepted_value': self.accepted_value,
        }
        try:
            # Atomic write pattern
            temp_path = STATE_FILE_PATH + ".tmp"
            with open(temp_path, 'w') as f:
                json.dump(state, f)
                f.flush()
                os.fsync(f.fileno()) # Critical for durability
            os.rename(temp_path, STATE_FILE_PATH)
            logger.debug(f"Persisted Paxos state: {state}")
        except IOError as e:
            logger.error(f"CRITICAL: Failed to persist Paxos state: {e}")
    
    def _ensure_dir_exists(self):
        os.makedirs(os.path.dirname(STATE_FILE_PATH), exist_ok=True)


    async def _send_message(self, target_node_ip: str, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Sends a message to a peer and awaits a response."""
        # This simulates a network layer. In production, this would be a robust
        # client using HTTP, gRPC, or another protocol with proper error handling.
        host, port_str = target_node_ip.split(':')
        port = int(port_str)
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port),
                timeout=2.0
            )
            writer.write(json.dumps(message).encode())
            await writer.drain()

            data = await asyncio.wait_for(reader.read(4096), timeout=2.0)
            response = json.loads(data.decode())
            
            writer.close()
            await writer.wait_closed()
            return response
        except (asyncio.TimeoutError, ConnectionRefusedError, OSError) as e:
            logger.warning(f"Could not connect to peer {target_node_ip}: {e}")
            return None

    # --- Acceptor Logic ---
    def handle_prepare(self, proposal_id: Tuple[int, str]) -> Dict[str, Any]:
        """Phase 1b: Acceptor receives a prepare request."""
        if proposal_id > self.promised_id:
            self.promised_id = proposal_id
            self._persist_state()
            logger.info(f"Promised to proposal {proposal_id}. Last accepted: {self.accepted_id} with value {self.accepted_value}")
            return {"type": "promise", "promised_id": self.promised_id, "accepted_id": self.accepted_id, "accepted_value": self.accepted_value}
        else:
            logger.warning(f"Rejected prepare for {proposal_id}, already promised {self.promised_id}")
            return {"type": "nack", "promised_id": self.promised_id}

    # --- Acceptor Logic ---
    def handle_propose(self, proposal_id: Tuple[int, str], value: Any) -> Dict[str, Any]:
        """Phase 2b: Acceptor receives a propose request."""
        if proposal_id >= self.promised_id:
            self.promised_id = proposal_id
            self.accepted_id = proposal_id
            self.accepted_value = value
            self._persist_state()
            logger.info(f"Accepted proposal {proposal_id} with value '{value}'")
            return {"type": "accepted", "accepted_id": proposal_id, "value": value}
        else:
            logger.warning(f"Rejected propose for {proposal_id}, already promised {self.promised_id}")
            return {"type": "nack", "promised_id": self.promised_id}

    # --- Proposer Logic ---
    async def run_consensus(self, value_to_propose: Any) -> Optional[Any]:
        """Drives the consensus process for a given value."""
        # Use a timestamp-based proposal ID to increase chances of success
        # The node_id tie-breaks in case of identical timestamps
        self.proposal_id = (int(time.time()), self.node_id)
        
        while not self.final_consensus_achieved:
            logger.info(f"Starting new consensus round with proposal ID {self.proposal_id} for value '{value_to_propose}'")
            
            # Phase 1a: Send Prepare requests
            prepare_message = {"type": "prepare", "proposal_id": self.proposal_id}
            tasks = [self._send_message(peer, prepare_message) for peer in self.peer_nodes]
            responses = list(await asyncio.gather(*tasks))
            # This node is also an Acceptor: handle our own prepare locally so
            # that our vote counts toward the quorum.
            responses.append(self.handle_prepare(self.proposal_id))

            promises = [r for r in responses if r and r.get("type") == "promise"]
            
            if len(promises) < self.quorum_size:
                logger.warning(f"Quorum not achieved in prepare phase ({len(promises)}/{self.quorum_size}). Retrying after a delay.")
                # Backoff and retry with a higher proposal ID
                await asyncio.sleep(random.uniform(1, 3))
                self.proposal_id = (self.proposal_id[0] + 1, self.node_id)
                continue

            # Phase 2a: Send Propose requests
            highest_accepted_id = (-1, "")
            value_from_promises = value_to_propose
            
            for promise in promises:
                # accepted_id arrives as a JSON list from remote peers; normalize
                # it to a tuple before comparing.
                accepted_id = tuple(promise['accepted_id'])
                if accepted_id > highest_accepted_id:
                    highest_accepted_id = accepted_id
                    # If a value was already accepted by another proposer, we MUST propose that value.
                    # This is the core safety property of Paxos.
                    value_from_promises = promise['accepted_value']
                    logger.info(f"Found a previously accepted value '{value_from_promises}' from proposal {highest_accepted_id}. Adopting it.")
            
            propose_message = {"type": "propose", "proposal_id": self.proposal_id, "value": value_from_promises}
            tasks = [self._send_message(peer, propose_message) for peer in self.peer_nodes]
            responses = list(await asyncio.gather(*tasks))
            # Record our own acceptance as well; it also counts toward the quorum.
            responses.append(self.handle_propose(self.proposal_id, value_from_promises))

            acceptances = [r for r in responses if r and r.get("type") == "accepted"]

            if len(acceptances) >= self.quorum_size:
                logger.info(f"CONSENSUS REACHED! Value '{value_from_promises}' was accepted by a quorum.")
                self.final_value = value_from_promises
                self.final_consensus_achieved = True
                
                # Learner phase: Inform all other nodes of the final value (optional but good practice)
                learn_message = {"type": "learn", "value": self.final_value}
                await asyncio.gather(*[self._send_message(peer, learn_message) for peer in self.peer_nodes])
                
                return self.final_value

            else:
                logger.warning(f"Quorum not achieved in accept phase ({len(acceptances)}/{self.quorum_size}). Retrying.")
                await asyncio.sleep(random.uniform(1, 3))
                self.proposal_id = (self.proposal_id[0] + 1, self.node_id)
        
        return self.final_value


async def handle_paxos_client(reader, writer, paxos_node: PaxosNode):
    """Asyncio server callback to handle incoming messages."""
    data = await reader.read(4096)
    if not data:  # Peer closed the connection without sending a message
        writer.close()
        return
    message = json.loads(data.decode())
    addr = writer.get_extra_info('peername')
    logger.debug(f"Received from {addr}: {message}")

    msg_type = message.get("type")
    response = {}
    
    if msg_type == "prepare":
        response = paxos_node.handle_prepare(tuple(message["proposal_id"]))
    elif msg_type == "propose":
        response = paxos_node.handle_propose(tuple(message["proposal_id"]), message["value"])
    elif msg_type == "learn": # Learner logic
        paxos_node.final_value = message["value"]
        paxos_node.final_consensus_achieved = True
        logger.info(f"Learned final value '{paxos_node.final_value}' from a peer.")
        response = {"status": "ok"}
    else:
        response = {"error": "unknown message type"}

    writer.write(json.dumps(response).encode())
    await writer.drain()
    writer.close()

This code is a direct, albeit simplified, translation of the Paxos algorithm. The uniqueness of proposal IDs is guaranteed by pairing a timestamp or an incrementing number with a unique node ID. The critical safety logic lies in the Proposer, which must adopt a previously accepted value if it discovers one during the promise phase.
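One detail worth making explicit: the total order on proposal IDs comes for free from Python's lexicographic tuple comparison. A couple of assertions (with illustrative values) show exactly the behaviour the Acceptor's comparisons rely on:

# Proposal IDs are (timestamp_or_counter, node_id) tuples. Python compares
# tuples element by element, so the node_id only ever acts as a tie-breaker.
p_old = (1698400800, "10.0.1.10:8000")
p_tie = (1698400800, "10.0.1.11:8000")  # same timestamp, different node
p_new = (1698400801, "10.0.1.10:8000")  # strictly later timestamp

assert p_tie > p_old  # tie broken deterministically by node_id
assert p_new > p_tie  # a higher timestamp wins regardless of node_id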

Integration with LlamaIndex and the Application Server

With the consensus module in place, the application’s startup logic needs to be altered. Instead of directly loading the latest index, it must first participate in a consensus round.

We create a wrapper class that orchestrates this process. It starts the Paxos network listener and then triggers the consensus-seeking process.

app/main.py

import asyncio
import logging
import os
from llama_index.core import StorageContext, load_index_from_storage
from paxos_core.consensus import PaxosNode, handle_paxos_client

# Setup logging
logger = logging.getLogger(__name__)

# Environment variables provided by Puppet
NODE_ID = os.environ.get("NODE_ID") # e.g., "10.0.1.10:8000"
PEER_NODES_STR = os.environ.get("PEER_NODES") # e.g., "10.0.1.10:8000,10.0.1.11:8000"
PROPOSED_INDEX_VERSION = os.environ.get("INDEX_VERSION") # The version this node wants to propose
INDEX_STORE_PATH = "/data/indices" # Path to local storage for index files from OSS
PORT = int(NODE_ID.split(':')[1]) if NODE_ID else None  # Validated in main() before use

# Global variable to hold the loaded index
query_engine = None

class RAGService:
    def __init__(self, node_id, peer_nodes):
        self.paxos_node = PaxosNode(node_id, peer_nodes)
        self.index = None

    async def initialize_service(self):
        """
        Orchestrates service startup: start network listener, achieve consensus,
        then load the correct LlamaIndex index.
        """
        logger.info(f"Node {self.paxos_node.node_id} starting up...")
        
        # Start the Paxos server to listen for messages from peers
        server = await asyncio.start_server(
            lambda r, w: handle_paxos_client(r, w, self.paxos_node),
            '0.0.0.0',
            PORT
        )
        logger.info(f"Paxos listener started on port {PORT}")

        # A small delay to allow other nodes to start their listeners.
        # In production, a more robust discovery mechanism would be better.
        await asyncio.sleep(5)

        # Trigger the consensus process.
        # All nodes will attempt to propose their configured INDEX_VERSION.
        # Paxos guarantees they will all eventually agree on one of them.
        agreed_version = await self.paxos_node.run_consensus(PROPOSED_INDEX_VERSION)

        if agreed_version:
            logger.info(f"Consensus achieved. Loading index version: {agreed_version}")
            self.load_index(agreed_version)
            global query_engine
            query_engine = self.index.as_query_engine()
            logger.info("Service is ready to accept queries.")
        else:
            logger.error("CRITICAL: Could not achieve consensus. Shutting down.")
            # In a real system, this should trigger an alert.
            # The pod should be marked as unhealthy.
            raise SystemExit("Failed to reach consensus on index version.")

        # Keep the server running
        async with server:
            await server.serve_forever()

    def load_index(self, version: str):
        """
        Loads the LlamaIndex VectorStoreIndex from a specified version.
        This function would be responsible for downloading the correct
        index files from Alibaba Cloud OSS to a local path.
        """
        # A common mistake is to not handle the case where the index files are missing.
        # The download logic must be resilient.
        
        # Step 1: Simulate downloading from OSS
        # In a real implementation, you'd use the Alibaba Cloud OSS SDK here.
        logger.info(f"Simulating download of index '{version}' from OSS...")
        version_path = os.path.join(INDEX_STORE_PATH, version)
        
        if not os.path.exists(version_path):
            logger.error(f"Index files for version '{version}' not found at {version_path}!")
            # This is a critical failure. The consensus agreed on a version
            # that doesn't exist in storage. This points to a deployment process issue.
            raise FileNotFoundError(f"Index data for {version} is missing.")

        # Step 2: Load the index using LlamaIndex
        try:
            storage_context = StorageContext.from_defaults(persist_dir=version_path)
            self.index = load_index_from_storage(storage_context)
            logger.info(f"Successfully loaded index '{version}'.")
        except Exception as e:
            logger.error(f"Failed to load LlamaIndex from path {version_path}: {e}")
            raise

# --- Entrypoint ---
async def main():
    if not all([NODE_ID, PEER_NODES_STR, PROPOSED_INDEX_VERSION]):
        raise ValueError("Missing environment variables: NODE_ID, PEER_NODES, INDEX_VERSION")
    
    peer_nodes = PEER_NODES_STR.split(',')
    service = RAGService(NODE_ID, peer_nodes)
    await service.initialize_service()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, SystemExit) as e:
        logger.info(f"Service shutting down: {e}")

The RAGService.initialize_service method is the critical orchestrator. It ensures no queries can be served until consensus is reached and the correct index is loaded into memory. This design completely eliminates the split-brain problem we initially faced.
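The load_index method above only simulates the fetch from OSS. For completeness, here is a minimal sketch of what that download step could look like with the oss2 SDK; the bucket layout (an indices/<version>/ prefix) and the credential environment variables are assumptions made for illustration, not part of the deployment described here.

import os
import oss2  # Alibaba Cloud OSS SDK for Python ("pip install oss2")

def download_index_from_oss(version: str, local_root: str) -> str:
    """Download every object under the (assumed) prefix indices/<version>/
    into a local directory and return that directory's path."""
    auth = oss2.Auth(os.environ["OSS_ACCESS_KEY_ID"], os.environ["OSS_ACCESS_KEY_SECRET"])
    bucket = oss2.Bucket(auth, os.environ["OSS_ENDPOINT"], os.environ["OSS_BUCKET_NAME"])

    prefix = f"indices/{version}/"
    target_dir = os.path.join(local_root, version)
    os.makedirs(target_dir, exist_ok=True)

    for obj in oss2.ObjectIterator(bucket, prefix=prefix):
        if obj.key.endswith("/"):
            continue  # skip "directory" placeholder objects
        local_path = os.path.join(target_dir, os.path.relpath(obj.key, prefix))
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        bucket.get_object_to_file(obj.key, local_path)

    return target_dir

load_index would call a helper like this before building the StorageContext, and the returned directory becomes the persist_dir.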

Declarative Cluster Management with Puppet

The final piece is managing the cluster declaratively. Puppet is responsible for ensuring each ECS instance is configured correctly. This includes installing the container runtime, managing the application’s systemd service, and, most importantly, generating the configuration that feeds into the application container.

Here is a simplified Puppet manifest demonstrating this principle.

manifests/rag_agent.pp

# This class defines the entire RAG agent node configuration.
class rag_agent::node (
  String $docker_image,              # e.g., 'registry.aliyuncs.com/my-org/rag-agent:1.2.0'
  String $index_version,             # The version this node should propose, e.g., '2023-10-27T10:00:00Z_v2.5.1'
  Array[String] $cluster_peer_ips,   # An array of all node IPs in the cluster, e.g., ['10.0.1.10', '10.0.1.11', '10.0.1.12']
  Integer $paxos_port = 8000,
) {

  # Ensure Docker (or any OCI-compliant runtime) is installed and running.
  include docker

  # Pull the correct OCI image for the application.
  docker::image { $docker_image:
    ensure => present,
  }

  # Prepare persistent storage directories on the host.
  # The Paxos state file and index data must survive container restarts.
  file { ['/opt/rag_agent', '/opt/rag_agent/data', '/opt/rag_agent/state']:
    ensure => directory,
    owner  => '1001', # The user ID inside the container
    group  => '1001',
  }

  # Generate the string of peer nodes for the environment variable.
  # Example output: "10.0.1.10:8000,10.0.1.11:8000,10.0.1.12:8000"
  $peer_nodes_str = join($cluster_peer_ips.map |$ip| { "${ip}:${paxos_port}" }, ',')

  # Run the application container using the Puppet Docker module.
  docker::run { 'rag-agent-service':
    ensure  => running,
    image   => $docker_image,
    ports   => ["${paxos_port}:${paxos_port}"],
    volumes => [
      '/opt/rag_agent/data:/data/indices:rw',
      '/opt/rag_agent/state:/var/lib/rag_agent:rw'
    ],
    env     => [
      # The node's own identity. $facts['networking']['ip'] is a Facter fact.
      "NODE_ID=${facts['networking']['ip']}:${paxos_port}",
      "PEER_NODES=${peer_nodes_str}",
      "INDEX_VERSION=${index_version}",
      # Other app config
      "LOG_LEVEL=INFO",
    ],
    restart_on_unhealthy => true,
    # A proper healthcheck is critical for orchestrators to know when the
    # service is actually ready (i.e., after consensus is reached).
    # This would typically be an HTTP endpoint exposed by the app.
  }
}

This manifest is applied to every node in the cluster. When a rolling update is performed to deploy a new index, we update the $index_version variable in our Puppet data (e.g., Hiera) and re-apply the configuration. The Puppet agent on each node will restart the container with the new environment variable. As the new containers come online, they will initiate a new Paxos round, agree on the new version, and safely switch over without any period of inconsistency.
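The healthcheck called out in the manifest's comment is what ties the consensus step to the load balancer and to rolling updates: a node should only report itself ready once consensus has been reached and the index is loaded. The query API layer is not shown in this post, so the sketch below uses aiohttp purely as an assumed example of how such a readiness endpoint could be wired to the RAGService state.

from aiohttp import web

def make_health_app(service: "RAGService") -> web.Application:
    """Readiness probe: returns 200 only after consensus is reached and the index is loaded."""

    async def healthz(request: web.Request) -> web.Response:
        node = service.paxos_node
        if node.final_consensus_achieved and service.index is not None:
            return web.json_response(
                {"status": "ready", "index_version": node.final_value}
            )
        # 503 keeps the SLB and the container healthcheck from sending traffic
        # to a node that has not yet agreed on (and loaded) an index version.
        return web.json_response({"status": "starting"}, status=503)

    app = web.Application()
    app.router.add_get("/healthz", healthz)
    return app

The container's healthcheck command can then be a simple curl against /healthz, giving an orchestrated rolling restart a signal to wait on before moving to the next node.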

This entire exercise reveals a crucial lesson in distributed systems design. The decision to implement a core consensus algorithm was not driven by academic curiosity, but by a pragmatic need for strong consistency where simpler solutions fell short. The resulting system is more complex internally, but its external behavior is now predictable and safe. The Paxos implementation, while simplified, is not production-ready for all scenarios. It lacks features like dynamic membership changes, which would require a more complex protocol like Multi-Paxos or Raft. Furthermore, its performance is suited for low-frequency, high-importance decisions like configuration changes, not for high-throughput transactional data. For our specific problem of atomically updating a knowledge base index across a cluster, it is a robust and dependency-light solution.

