Building a Zero-Downtime Milvus Index Promotion Pipeline with MLflow and CircleCI


The initial deployment was simple enough. A single embedding model, a single Milvus collection, and a cron job to refresh the index weekly. This worked for about three months. Then the first model update request came through. The process was a catastrophic failure. We took the service endpoint down, dropped the old collection, created a new one, spent six hours re-indexing 100 million vectors, and brought the service back up. That was eight hours of downtime for a critical feature. The second model update involved a hot-swap that led to a period of mixed results, where queries would return vectors from both the old and new models, poisoning recommendations for hours. It became painfully clear our manual, ad-hoc process for promoting new embedding models into our vector search system was an active liability.

The core of the problem is that an embedding model and the vector index it produces are a tightly coupled pair. Deploying a new model without its corresponding index is useless. Deploying a new index without the correct model version querying it is equally disastrous. We needed to treat the “Model-Index Pair” as a single, atomic, versioned artifact. The goal became to build a fully automated, zero-downtime promotion pipeline. When a data scientist promotes a model version from “Development” to “Staging” in our MLflow registry, a process must kick off to build a corresponding Milvus index. After validation, a single, manual approval in CircleCI should atomically promote that new index into production by swapping an alias, with zero service interruption. This is the build log for that MLOps “Kit”.

The Architectural Blueprint: Model-Index Atomicity

Our technology stack was already in place: MLflow for model lifecycle management, Milvus as our vector database, and CircleCI for CI/CD. The challenge was weaving them together into a coherent, resilient system.

The central concept we adopted was using Milvus aliases as our abstraction for production traffic. Instead of applications querying a specific collection name like product_embeddings_v1_0_2, they would query a stable alias, product_embeddings_prod. Our CI/CD pipeline’s ultimate job would be to atomically re-point this alias from an old, stable collection to a new, validated one.

This leads to a clear workflow:

  1. Trigger: A model is transitioned to the “Staging” stage in the MLflow Model Registry.
  2. Build: A CircleCI pipeline triggers. It fetches the new “Staging” model from MLflow.
  3. Index Creation: The pipeline creates a new, version-stamped Milvus collection (e.g., product_embeddings_v1_1_0_build_af8c19).
  4. Re-Indexing: It pulls the source data (e.g., product catalog from a data warehouse) and uses the new model to generate and insert all vectors into this new collection.
  5. Validation: An automated job runs a set of “golden queries” against the new collection to ensure its performance and accuracy meet baseline standards. This is a critical quality gate.
  6. Manual Gate: The workflow pauses for a manual approval within the CircleCI UI. This gives a human final sign-off before impacting production.
  7. Promotion: Upon approval, a job executes the atomic alias switch in Milvus. It changes product_embeddings_prod to point to the new collection.
  8. Cleanup: The old collection is marked for deprecation but not immediately deleted, allowing for an instantaneous rollback by simply switching the alias back if a problem is discovered post-release.
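The promote and rollback steps can be pictured with a small in-memory model of an alias table. This is only a sketch of the idea, not Milvus code: AliasRouter and its methods are hypothetical names introduced for illustration, and a real deployment would delegate these operations to the Milvus server.

```python
from dataclasses import dataclass, field


@dataclass
class AliasRouter:
    """In-memory stand-in for an alias table: alias name -> collection name."""
    targets: dict[str, str] = field(default_factory=dict)
    previous: dict[str, str] = field(default_factory=dict)

    def promote(self, alias: str, collection: str) -> None:
        # Remember the outgoing collection so the swap can be undone.
        if alias in self.targets:
            self.previous[alias] = self.targets[alias]
        self.targets[alias] = collection

    def rollback(self, alias: str) -> str:
        # Re-point the alias at the collection it served before the last promote.
        if alias not in self.previous:
            raise LookupError(f"No previous collection recorded for '{alias}'")
        current = self.targets[alias]
        self.targets[alias] = self.previous[alias]
        self.previous[alias] = current
        return self.targets[alias]

    def resolve(self, alias: str) -> str:
        # What a client querying the alias would actually hit.
        return self.targets[alias]
```

Clients only ever call resolve on the stable alias, so neither a promotion nor a rollback requires a client-side change. Rollback is cheap only as long as the previous collection still exists, which is why the cleanup step defers deletion.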

Here is a Mermaid diagram visualizing this flow within the CircleCI context.

graph TD
    A[MLflow Webhook: Model to 'Staging'] --> B{CircleCI Workflow Starts};
    B --> C[Job: Setup Environment & Fetch Model];
    C --> D[Job: Create New Milvus Collection];
    D --> E[Job: Parallel Re-indexing];
    E --> F[Job: Validate New Index];
    F --> G{Hold: Manual Approval Gate};
    G -- Approve --> H[Job: Promote Alias to Production];
    H --> I[Job: Mark Old Collection for Cleanup];
    G -- Deny --> J[Job: Cleanup Failed Build];

The “Kit” itself is a collection of Python scripts and a .circleci/config.yml file, forming a reusable pattern for any vector search service we build in the future.

The MLOps Kit: Code and Configuration

Let’s break down the implementation. The project is structured as follows:

.
├── .circleci
│   └── config.yml
├── scripts
│   ├── __init__.py
│   ├── milvus_manager.py
│   ├── mlflow_handler.py
│   ├── validator.py
│   └── main.py
└── requirements.txt

1. Configuration and Secrets Management

A common mistake is hardcoding connection details. In a real-world project, this is unacceptable. CircleCI provides Contexts for securely storing environment variables. Our pipeline requires:

  • MLFLOW_TRACKING_URI: The URL for our MLflow server.
  • MILVUS_URI: The connection URI for Milvus.
  • MILVUS_TOKEN: Authentication token for Milvus.
  • AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY: For fetching source data from S3.

The Python scripts are designed to read these from the environment.
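One way to read these variables is to validate them all up front, so a misconfigured Context fails the job immediately with a complete list of what is missing. The sketch below is an illustration rather than part of the Kit: PipelineSettings and load_settings are hypothetical names, and the AWS keys are left out on the assumption that the AWS SDK picks them up from the environment on its own.

```python
import os
from dataclasses import dataclass
from typing import Mapping

# Variables the pipeline cannot run without (names taken from the list above).
REQUIRED_VARS = ("MLFLOW_TRACKING_URI", "MILVUS_URI", "MILVUS_TOKEN")


@dataclass(frozen=True)
class PipelineSettings:
    mlflow_tracking_uri: str
    milvus_uri: str
    milvus_token: str


def load_settings(env: Mapping[str, str] = os.environ) -> PipelineSettings:
    """Read required settings, reporting every missing variable at once."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return PipelineSettings(
        mlflow_tracking_uri=env["MLFLOW_TRACKING_URI"],
        milvus_uri=env["MILVUS_URI"],
        milvus_token=env["MILVUS_TOKEN"],
    )
```

Passing the environment in as a mapping keeps the function easy to exercise locally with a plain dictionary instead of real secrets.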

2. The MLflow Handler (scripts/mlflow_handler.py)

This script is responsible for interacting with the MLflow registry. Its primary job is to download the model that has just been promoted to “Staging”.

# scripts/mlflow_handler.py
import os
import logging
import mlflow
from mlflow.tracking import MlflowClient

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class MLflowHandler:
    """
    Handles interactions with the MLflow Model Registry.
    """
    def __init__(self, tracking_uri: str, model_name: str):
        if not tracking_uri:
            raise ValueError("MLFLOW_TRACKING_URI cannot be None or empty.")
        self.model_name = model_name
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def get_latest_staging_model(self) -> tuple[str, str]:
        """
        Fetches the latest model version in the 'Staging' stage.

        Returns:
            A tuple containing the model version string and the local path to the downloaded model.
        """
        try:
            latest_versions = self.client.get_latest_versions(self.model_name, stages=["Staging"])
            if not latest_versions:
                logging.error(f"No model versions found in 'Staging' for model '{self.model_name}'.")
                raise RuntimeError(f"No staging model for {self.model_name}")

            model_version_info = latest_versions[0]
            version = model_version_info.version
            model_uri = f"models:/{self.model_name}/{version}"
            
            logging.info(f"Found latest staging model: version {version}. Downloading from {model_uri}...")
            
            # This downloads the model artifacts to a local path
            local_path = mlflow.artifacts.download_artifacts(model_uri)
            
            logging.info(f"Model version {version} downloaded successfully to {local_path}.")
            return version, local_path

        except Exception as e:
            logging.error(f"Failed to get or download staging model: {e}", exc_info=True)
            raise

if __name__ == '__main__':
    # Example usage for local testing
    handler = MLflowHandler(
        tracking_uri=os.getenv("MLFLOW_TRACKING_URI"),
        model_name="product-embedding-model"
    )
    model_version, path = handler.get_latest_staging_model()
    print(f"Downloaded model version {model_version} to {path}")

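The key here is resilience. We check if a staging model even exists and raise a clear error if not. The function returns both the version number and the path, as the version string is crucial for naming our new Milvus collection, ensuring traceability. One caveat: recent MLflow releases (2.9 onward, to our knowledge) deprecate registry stages in favor of model version aliases, so get_latest_versions may emit a deprecation warning on newer servers.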
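Because the version string ends up inside a collection name, it helps to normalize the parts in one place. The helper below is a minimal sketch under two assumptions: that Milvus collection names may contain only letters, digits and underscores and must start with a letter or underscore, and that the length limit is 255 characters. build_collection_name is a hypothetical function, not part of the Kit.

```python
import re

MAX_NAME_LENGTH = 255  # assumed Milvus limit on collection name length


def build_collection_name(model_name: str, version: str, build_id: str) -> str:
    """Join the parts into a name made only of letters, digits and underscores."""
    raw = f"{model_name}_v{version}_{build_id}"
    # Replace anything outside the allowed character set (hyphens, dots, ...).
    name = re.sub(r"[^0-9A-Za-z_]", "_", raw)
    if not re.match(r"[A-Za-z_]", name):
        name = f"_{name}"  # names must start with a letter or underscore
    if len(name) > MAX_NAME_LENGTH:
        raise ValueError(f"Collection name too long ({len(name)} characters)")
    return name
```

For example, build_collection_name("product-embedding-model", "7", "af8c-19") yields product_embedding_model_v7_af8c_19, which keeps the model, its registry version and the CI run visible in the name.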

3. The Milvus Manager (scripts/milvus_manager.py)

This is the heart of the operation, managing the lifecycle of Milvus collections and aliases. It needs to be robust, handling connection errors and providing clear logging.

# scripts/milvus_manager.py
import os
import logging
from pymilvus import (
    connections, utility, Collection, FieldSchema, CollectionSchema, DataType
)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class MilvusManager:
    """
    Manages connections, collections, and aliases in Milvus.
    Production-grade code must include extensive error handling and retries.
    """
    def __init__(self, uri: str, token: str):
        self.uri = uri
        self.token = token
        self._connect()

    def _connect(self):
        try:
            logging.info(f"Connecting to Milvus at {self.uri}...")
            connections.connect("default", uri=self.uri, token=self.token)
            logging.info("Milvus connection successful.")
        except Exception as e:
            logging.error(f"Failed to connect to Milvus: {e}", exc_info=True)
            raise

    def create_collection(self, collection_name: str, dim: int) -> Collection:
        """
        Creates a new collection with a predefined schema and index.
        The pitfall here is not defining a proper schema or index upfront.
        """
        if utility.has_collection(collection_name):
            logging.warning(f"Collection '{collection_name}' already exists. Re-creating.")
            utility.drop_collection(collection_name)

        fields = [
            FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=256),
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
        ]
        schema = CollectionSchema(fields, f"Collection for model versioning: {collection_name}")
        
        logging.info(f"Creating collection '{collection_name}' with dimension {dim}.")
        collection = Collection(collection_name, schema)

        # Create the index before the collection is loaded. Without one, searches
        # fall back to brute force at best, and recent Milvus releases refuse to
        # load a collection whose vector field has no index.
        index_params = {
            "metric_type": "L2",
            "index_type": "IVF_FLAT",
            "params": {"nlist": 1024}
        }
        collection.create_index(field_name="embeddings", index_params=index_params)
        logging.info(f"Created IVF_FLAT index on '{collection_name}'.")
        return collection

    def switch_alias(self, alias: str, new_collection_name: str):
        """
        Atomically points an alias to a new collection.
        This is the core of the zero-downtime swap.
        """
        try:
            logging.info(f"Switching alias '{alias}' to point to '{new_collection_name}'.")
            
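            # Check that the target collection exists before touching the alias
            if not utility.has_collection(new_collection_name):
                raise ValueError(f"Target collection '{new_collection_name}' does not exist.")
            
            # pymilvus expects the collection name first, then the alias.
            # alter_alias only re-points an existing alias; the very first
            # deployment has to create it with utility.create_alias.
            utility.alter_alias(new_collection_name, alias)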
            logging.info(f"Successfully switched alias '{alias}' to '{new_collection_name}'.")
        except Exception as e:
            logging.error(f"Failed to switch alias '{alias}': {e}", exc_info=True)
            raise

    def get_collection_for_alias(self, alias: str) -> str | None:
        """
        Finds which collection an alias currently points to. Useful for cleanup.
        """
        try:
            # utility.list_aliases takes a collection name and returns that
            # collection's alias strings, so scan every collection for a match.
            for name in utility.list_collections():
                if alias in utility.list_aliases(name):
                    return name
            return None
        except Exception as e:
            logging.error(f"Could not retrieve collection for alias '{alias}': {e}")
            return None

    def drop_collection(self, collection_name: str):
        if utility.has_collection(collection_name):
            logging.info(f"Dropping collection '{collection_name}'.")
            utility.drop_collection(collection_name)
            logging.info(f"Collection '{collection_name}' dropped.")

The switch_alias method is the critical piece. Milvus’s alter_alias is an atomic operation on the server side, which is exactly what we need for a seamless transition.
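One gap worth noting is the very first deployment, when the production alias does not exist yet and there is nothing to alter. The sketch below shows a create-or-alter wrapper. To keep it runnable without a Milvus server it talks to FakeUtility, an in-memory stand-in that mirrors the argument order of the pymilvus utility calls as we understand them (collection name first, alias second). Both FakeUtility and point_alias are hypothetical names.

```python
class FakeUtility:
    """In-memory stand-in mimicking the pymilvus.utility alias calls used here."""

    def __init__(self, collections):
        self.aliases = {name: [] for name in collections}

    def list_collections(self):
        return list(self.aliases)

    def list_aliases(self, collection_name):
        return list(self.aliases[collection_name])

    def create_alias(self, collection_name, alias):
        self.aliases[collection_name].append(alias)

    def alter_alias(self, collection_name, alias):
        # Detach the alias from whichever collection holds it, then re-attach.
        for names in self.aliases.values():
            if alias in names:
                names.remove(alias)
        self.aliases[collection_name].append(alias)


def point_alias(util, alias: str, collection_name: str) -> str:
    """Create the alias on first use, otherwise re-point it. Returns the action taken."""
    if collection_name not in util.list_collections():
        raise ValueError(f"Target collection '{collection_name}' does not exist.")
    exists = any(alias in util.list_aliases(c) for c in util.list_collections())
    if exists:
        util.alter_alias(collection_name, alias)
        return "altered"
    util.create_alias(collection_name, alias)
    return "created"
```

In the real pipeline the util argument would be the pymilvus utility module after connecting. The existence check and the alias change are two separate calls, so this wrapper assumes only one promotion job runs at a time.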

4. The Main Orchestration Script (scripts/main.py)

This script ties everything together. It’s designed to be called from CircleCI with a command argument naming the action to perform. The listing below implements build_and_validate and promote; a cleanup command is the natural third action but is not implemented here. This modularity makes the CircleCI configuration much cleaner.

# scripts/main.py
import os
import sys
import logging
import time
import pandas as pd
import numpy as np
from milvus_manager import MilvusManager
from mlflow_handler import MLflowHandler
from validator import GoldenQueryValidator

# Assume an embedding model loader utility exists
# For this example, we'll mock it.
def load_embedding_model(path):
    # In a real project, this would load a PyTorch, TF, or ONNX model.
    logging.info(f"Mock loading model from {path}. It will generate random vectors.")
    class MockModel:
        def encode(self, data, batch_size=32):
            num_samples = len(data)
            return np.random.rand(num_samples, 768).astype('float32')
    return MockModel()


def get_source_data() -> pd.DataFrame:
    # In a real project, this would pull from a database or data lake.
    logging.info("Fetching mock source data...")
    data = {
        'pk': list(range(1000)),
        'doc_id': [f'doc_{i}' for i in range(1000)],
        'text': ['sample text to embed'] * 1000
    }
    return pd.DataFrame(data)

def run_indexing(milvus_mgr: MilvusManager, collection_name: str, model_path: str):
    """
    The main data ingestion and indexing logic.
    """
    model = load_embedding_model(model_path)
    source_df = get_source_data()
    collection = milvus_mgr.create_collection(collection_name, dim=768)

    batch_size = 500
    total_inserted = 0
    start_time = time.time()

    for i in range(0, len(source_df), batch_size):
        batch_df = source_df.iloc[i:i+batch_size]
        embeddings = model.encode(batch_df['text'].tolist())
        
        entities = [
            batch_df['pk'].tolist(),
            batch_df['doc_id'].tolist(),
            embeddings
        ]
        
        try:
            mr = collection.insert(entities)
            total_inserted += mr.insert_count
            logging.info(f"Inserted batch {i//batch_size + 1}, total inserted: {total_inserted}")
        except Exception as e:
            logging.error(f"Failed to insert batch: {e}", exc_info=True)
            raise
    
    # Crucial step: flush data to disk and load the collection into memory for searching.
    logging.info("Flushing collection...")
    collection.flush()
    logging.info(f"Total time for indexing: {time.time() - start_time:.2f}s")
    
    logging.info(f"Loading collection '{collection_name}' into memory...")
    collection.load()
    logging.info("Collection loaded.")


def build_and_validate(model_name: str, new_collection_name: str):
    """
    Orchestrates downloading the model, creating a new index, and validating it.
    """
    mlflow_uri = os.getenv("MLFLOW_TRACKING_URI")
    milvus_uri = os.getenv("MILVUS_URI")
    milvus_token = os.getenv("MILVUS_TOKEN")

    mlflow_handler = MLflowHandler(mlflow_uri, model_name)
    milvus_mgr = MilvusManager(milvus_uri, milvus_token)
    
    model_version, model_path = mlflow_handler.get_latest_staging_model()
    
    run_indexing(milvus_mgr, new_collection_name, model_path)

    # Validation Step
    validator = GoldenQueryValidator(milvus_uri, milvus_token)
    is_valid = validator.run_validation(new_collection_name)

    if not is_valid:
        logging.error("Validation failed for new collection. Aborting.")
        # Perform cleanup of the failed build
        milvus_mgr.drop_collection(new_collection_name)
        sys.exit(1)
        
    logging.info("Validation successful. The new collection is ready for promotion.")


def promote(alias: str, new_collection_name: str):
    """
    Promotes the new collection by switching the alias.
    """
    milvus_uri = os.getenv("MILVUS_URI")
    milvus_token = os.getenv("MILVUS_TOKEN")
    milvus_mgr = MilvusManager(milvus_uri, milvus_token)
    
    milvus_mgr.switch_alias(alias, new_collection_name)
    logging.info(f"Promotion complete. Alias '{alias}' now points to '{new_collection_name}'.")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python main.py <command> [args]")
        sys.exit(1)

    command = sys.argv[1]
    
    MODEL_NAME = "product-embedding-model"
    PROD_ALIAS = "product_embeddings_prod"
    # The build ID is crucial for uniquely identifying artifacts from a specific CI run.
    # CIRCLE_WORKFLOW_ID is a UUID with hyphens, and Milvus collection names may
    # contain only letters, digits and underscores, so normalize it here.
    BUILD_ID = os.getenv("CIRCLE_WORKFLOW_ID", "local_test").replace('-', '_')

    if command == "build_and_validate":
        # Look up the staging version first so it can be stamped into a unique,
        # traceable collection name. build_and_validate() downloads the model itself.
        mlflow_handler = MLflowHandler(os.getenv("MLFLOW_TRACKING_URI"), MODEL_NAME)
        model_version, _ = mlflow_handler.get_latest_staging_model()
        
        safe_model_version = model_version.replace('.', '_')
        collection_name = f"{MODEL_NAME.replace('-', '_')}_v{safe_model_version}_{BUILD_ID}"
        
        build_and_validate(MODEL_NAME, collection_name)

    elif command == "promote":
        if len(sys.argv) != 3:
            print("Usage: python main.py promote <collection_name>")
            sys.exit(1)
        # Apply the same normalization as at build time, so a name that still
        # carries the raw workflow UUID resolves to the collection that was built.
        collection_to_promote = sys.argv[2].replace('-', '_')
        promote(PROD_ALIAS, collection_to_promote)
        
    else:
        print(f"Unknown command: {command}")
        sys.exit(1)

This script is the entry point for CircleCI jobs. It uses environment variables provided by CircleCI, like CIRCLE_WORKFLOW_ID, to create unique names for the Milvus collections. This prevents collisions and makes debugging failed builds much easier.
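The insert loop in run_indexing aborts the whole build on the first failed batch, which is costly when a six-hour re-index hits a transient network error. A retry wrapper with exponential backoff is one way to soften that. The sketch below is an assumption-laden illustration: insert_with_retry is a hypothetical helper, and insert_fn stands for any callable that inserts a batch and returns the number of rows written.

```python
import logging
import time
from typing import Callable, Sequence


def insert_with_retry(
    insert_fn: Callable[[Sequence], int],
    batch: Sequence,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call insert_fn(batch), retrying with exponential backoff on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return insert_fn(batch)
        except Exception as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            delay = base_delay * 2 ** (attempt - 1)
            logging.warning(
                "Insert attempt %d failed (%s); retrying in %.1fs", attempt, exc, delay
            )
            sleep(delay)
    raise ValueError("max_attempts must be at least 1")
```

In run_indexing this could wrap the existing call as insert_with_retry(lambda b: collection.insert(b).insert_count, entities). One caution: if a failed attempt actually wrote some rows before erroring, a retry may write them again, so a post-build row-count check against the source data is a sensible companion.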

5. The CircleCI Configuration (.circleci/config.yml)

This YAML file defines the entire workflow, orchestrating the Python scripts. We use CircleCI features like workspaces to pass the generated collection name between jobs.

# .circleci/config.yml
version: 2.1

orbs:
  python: circleci/python@x.y.z # substitute the orb release you pin

jobs:
  build_index:
    executor: python/default
    steps:
      - checkout
      - python/install-packages:
          pkg-manager: pip
          pip-dependency-file: requirements.txt
      - run:
          name: Set Environment Variables
          command: |
            # Resolve the staging model version once, in this step only. The handler
            # logs to stderr, so stdout is expected to carry just the version string.
            MODEL_VERSION=$(python -c 'import os; from scripts.mlflow_handler import MLflowHandler; h = MLflowHandler(os.getenv("MLFLOW_TRACKING_URI"), "product-embedding-model"); v, _ = h.get_latest_staging_model(); print(v)')
            SAFE_VERSION=$(echo "${MODEL_VERSION}" | tr '.' '_')
            MODEL_NAME='product-embedding-model'
            # Milvus collection names allow only letters, digits and underscores,
            # so replace the hyphens in both the model name and the workflow UUID.
            FINAL_COLLECTION_NAME="${MODEL_NAME//-/_}_v${SAFE_VERSION}_${CIRCLE_WORKFLOW_ID//-/_}"
            
            echo "export MODEL_NAME='${MODEL_NAME}'" >> $BASH_ENV
            echo "export PROD_ALIAS='product_embeddings_prod'" >> $BASH_ENV
            echo "export FINAL_COLLECTION_NAME='${FINAL_COLLECTION_NAME}'" >> $BASH_ENV
            
            echo "Generated Collection Name: ${FINAL_COLLECTION_NAME}"
            echo "${FINAL_COLLECTION_NAME}" > /tmp/collection_name.txt
      - run:
          name: Build and Validate New Milvus Index
          command: |
            source $BASH_ENV
            python scripts/main.py build_and_validate
      - persist_to_workspace:
          root: /tmp
          paths:
            - collection_name.txt

  promote_to_production:
    executor: python/default
    steps:
      - checkout
      - python/install-packages:
          pkg-manager: pip
          pip-dependency-file: requirements.txt
      - attach_workspace:
          at: /tmp/workspace
      - run:
          name: Promote Index by Switching Alias
          command: |
            COLLECTION_TO_PROMOTE=$(cat /tmp/workspace/collection_name.txt)
            if [ -z "$COLLECTION_TO_PROMOTE" ]; then
              echo "Error: Collection name not found in workspace."
              exit 1
            fi
            echo "Promoting collection: ${COLLECTION_TO_PROMOTE}"
            python scripts/main.py promote ${COLLECTION_TO_PROMOTE}

workflows:
  version: 2
  build-and-deploy-model-index:
    jobs:
      - build_index:
          context: my-mlops-secrets # Context storing MLFLOW_TRACKING_URI, MILVUS_URI etc.
      - hold_for_approval:
          type: approval
          requires:
            - build_index
      - promote_to_production:
          context: my-mlops-secrets
          requires:
            - hold_for_approval

The key parts of this CircleCI config are:

  • Contexts: my-mlops-secrets safely injects our credentials.
  • Workspace Persistence: The build_index job calculates the unique collection name and saves it to a file (/tmp/collection_name.txt). persist_to_workspace makes this file available to downstream jobs. The promote_to_production job then uses attach_workspace to read this file and know which collection to promote. This is how state is passed between jobs in a workflow.
  • Approval Job: The hold_for_approval job uses type: approval, a native CircleCI feature that pauses the workflow until a user clicks “Approve” in the UI, serving as our manual gate.

Lingering Issues and Future Iterations

This pipeline solved our immediate problem of unsafe, manual deployments. It guarantees that what we validate is what goes to production, and the switch is instantaneous for users. However, it’s not a perfect system.

First, the re-indexing process is still a “full build”. For datasets with billions of vectors, this can be incredibly time-consuming and computationally expensive. A future iteration would explore delta-indexing, where we only compute and insert embeddings for new or changed source data. This, however, introduces significant complexity in managing data consistency.
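The change-detection half of delta-indexing can be sketched with content hashes. This is a minimal illustration, assuming a stored map of doc_id to the hash of the text that was last embedded; content_hash and plan_delta are hypothetical names, and nothing like this exists in the current Kit.

```python
import hashlib


def content_hash(text: str) -> str:
    """Stable fingerprint of the text that feeds the embedding model."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def plan_delta(previous_hashes: dict[str, str], current_docs: dict[str, str]):
    """Compare stored hashes with current text; return (to_upsert, to_delete) doc IDs."""
    # New documents have no stored hash; changed documents have a different one.
    to_upsert = sorted(
        doc_id
        for doc_id, text in current_docs.items()
        if previous_hashes.get(doc_id) != content_hash(text)
    )
    # Documents that vanished from the source must be removed from the index.
    to_delete = sorted(set(previous_hashes) - set(current_docs))
    return to_upsert, to_delete
```

This only helps when the embedding model is unchanged between runs. A new model version invalidates every stored vector, so a model promotion still needs the full rebuild described above; the delta path would cover the weekly data refresh instead.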

Second, our validation is based on a static set of “golden queries”. This can catch regressions but may not catch more subtle semantic drifts in the new model. A more advanced validation step could involve shadow-deploying the new model-index pair, routing a small percentage of live traffic to it, and comparing its performance metrics (like click-through rate) against the existing production version before making the full switch.
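Since validator.py is not reproduced in this write-up, here is a sketch of what a static golden-query gate could look like. It is not the GoldenQueryValidator used by the pipeline: the function names, the recall@k metric and the 0.9 threshold are all assumptions chosen for illustration, and search_fn stands for any callable that returns ranked doc IDs for a query.

```python
from typing import Callable, Sequence


def recall_at_k(expected: Sequence[str], returned: Sequence[str], k: int) -> float:
    """Fraction of expected doc IDs that appear in the top-k returned IDs."""
    if not expected:
        return 1.0
    top_k = set(returned[:k])
    return sum(1 for doc_id in expected if doc_id in top_k) / len(expected)


def validate_golden_queries(
    golden: dict[str, Sequence[str]],
    search_fn: Callable[[str, int], Sequence[str]],
    k: int = 10,
    min_mean_recall: float = 0.9,
) -> bool:
    """Run every golden query through search_fn and gate on mean recall@k."""
    if not golden:
        raise ValueError("Golden query set is empty")
    scores = [
        recall_at_k(expected, search_fn(query, k), k)
        for query, expected in golden.items()
    ]
    return sum(scores) / len(scores) >= min_mean_recall
```

A gate like this makes the limitation concrete: it only measures whether known-good documents still surface for known queries, and says nothing about queries outside the golden set.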

Finally, the cleanup strategy is primitive. We mark the old collection but rely on a separate, manual process to eventually delete it. A robust system would have an automated garbage collection policy, for example, deleting any non-aliased collection older than 7 days, while ensuring at least one previous version is always retained for immediate rollback. This prevents orphaned collections from consuming expensive memory resources indefinitely.
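The retention rule described above is simple enough to express as a pure function. The sketch below assumes each collection is described by a record with name, created_at and aliased fields, gathered however the cleanup job sees fit, and it treats the newest non-aliased collection as the rollback target. select_for_deletion and the record layout are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def select_for_deletion(collections, now, max_age=timedelta(days=7), keep_previous=1):
    """Pick collection names that are safe to drop.

    collections: iterable of dicts with 'name', 'created_at' and 'aliased' keys.
    Aliased collections are never selected, and the newest keep_previous
    non-aliased collections are spared as rollback targets.
    """
    unaliased = sorted(
        (c for c in collections if not c["aliased"]),
        key=lambda c: c["created_at"],
        reverse=True,  # newest first
    )
    candidates = unaliased[keep_previous:]
    return [c["name"] for c in candidates if now - c["created_at"] > max_age]
```

Keeping the policy separate from the code that actually drops collections means it can be reviewed and dry-run against a listing before anything is deleted.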

