Automating a Cross-Cloud RAG System with Weaviate on GCP and AWS Lambda via CircleCI


Our RAG prototype, born in a Jupyter Notebook, worked. It could ingest documents, embed them, and answer questions with surprising accuracy. But it ran on a single laptop, backed by a local Docker container, and was manually triggered. The path from this fragile proof-of-concept to a production-ready, automated system presented the real engineering challenge. The core problem was architectural and operational: our data ingestion and long-term storage felt most at home on Google Cloud Platform (GCP), close to our data lakes in GCS, while our existing API infrastructure and serverless expertise were firmly rooted in AWS. A multi-cloud solution was not a choice but a necessity. This is the breakdown of how we built a CI/CD pipeline using CircleCI to deploy and manage a Weaviate vector database on Google Kubernetes Engine (GKE) and a query-serving AWS Lambda function, creating a truly heterogeneous MLOps workflow.

The initial architecture was sketched on a whiteboard, aiming for a clear separation of concerns.

graph TD
    subgraph CircleCI Pipeline
        A[Git Commit] --> B{Build & Test};
        B --> C{Terraform Apply GKE};
        C --> D{Deploy Weaviate on GKE};
        D --> E{Run K8s Ingestion Job};
        B --> F{Deploy Lambda to AWS};
    end

    subgraph GCP Project
        GCS[GCS Bucket: Source Docs]
        GKE[GKE Cluster]
        Weaviate[Weaviate Pod w/ PVC]
        IngestionJob[K8s Job: ingest.py]

        GCS --> IngestionJob;
        IngestionJob -- "Embeds & Writes" --> Weaviate;
    end

    subgraph AWS Account
        APIGW[API Gateway]
        Lambda[Query Lambda: handler.py]

        APIGW --> Lambda;
    end

    CircleCI[CircleCI] -- Manages --> GKE;
    CircleCI -- Manages --> Weaviate;
    CircleCI -- Triggers --> IngestionJob;
    CircleCI -- Deploys --> Lambda;
    User[End User] --> APIGW;
    Lambda -- "Queries (public LB)" --> Weaviate;

This design embraces the strengths of each platform. GCP’s GKE provides a robust, managed environment for the stateful Weaviate instance, keeping it co-located with our source data. AWS Lambda offers a cost-effective, scalable, and stateless compute layer for the public-facing API. CircleCI acts as the central nervous system, orchestrating deployments across both clouds. The most significant trade-off made upfront was network connectivity; for this initial build, we opted for a public LoadBalancer for Weaviate, secured by a strong API key. In a real-world project, this would be the first thing to replace with a more secure VPC Peering or interconnect solution.

Phase 1: Establishing the Weaviate Infrastructure on GKE

Before any application code, we needed a reliable home for our vector database. Managing Kubernetes resources manually is a recipe for disaster. We committed to Infrastructure as Code using Terraform from day one.

The first step was provisioning a GKE cluster. We needed a minimal, cost-effective cluster but with enough power to handle Weaviate’s indexing and query load. A common mistake is over-provisioning at the start. We chose a regional cluster with a single node pool of e2-standard-4 machines and autoscaling enabled.

gke/main.tf

# main.tf - GKE Cluster Definition

provider "google" {
  project = var.gcp_project_id
  region  = var.gcp_region
}

resource "google_container_cluster" "primary" {
  name     = "weaviate-cluster"
  location = var.gcp_region

  # The default node pool is removed so we can manage our own pool below.
  # Setting location to a region (rather than a zone) makes this a regional
  # cluster, spreading nodes across zones for higher availability.
  remove_default_node_pool = true
  initial_node_count       = 1

  network    = "default"
  subnetwork = "default"

  # Define workload identity to allow K8s service accounts to impersonate GCP service accounts.
  # This is crucial for secure access to other GCP services like GCS from our pods.
  workload_identity_config {
    workload_pool = "${var.gcp_project_id}.svc.id.goog"
  }
}

resource "google_container_node_pool" "primary_nodes" {
  name       = "primary-node-pool"
  location   = var.gcp_region
  cluster    = google_container_cluster.primary.name
  node_count = 1

  management {
    auto_repair  = true
    auto_upgrade = true
  }

  node_config {
    preemptible  = false
    machine_type = "e2-standard-4" # 4 vCPU, 16GB RAM is a decent starting point.

    # Granting the necessary scopes for the node service account.
    oauth_scopes = [
      "https://www.googleapis.com/auth/cloud-platform"
    ]

    # Use a GCP service account for the nodes.
    service_account = var.gke_node_service_account_email
  }

  autoscaling {
    min_node_count = 1
    max_node_count = 3
  }
}

# Output the cluster name and endpoint for kubectl configuration in CircleCI.
output "cluster_name" {
  value = google_container_cluster.primary.name
}

output "cluster_endpoint" {
  value = google_container_cluster.primary.endpoint
}
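
main.tf references three input variables, and because the pipeline initializes Terraform with a GCS backend, the configuration also needs an (empty) gcs backend block declared somewhere. A minimal gke/variables.tf covering both might look like this sketch:

gke/variables.tf

# variables.tf - Inputs referenced by main.tf
variable "gcp_project_id" {
  type        = string
  description = "GCP project to deploy into."
}

variable "gcp_region" {
  type        = string
  description = "Region for the regional GKE cluster."
}

variable "gke_node_service_account_email" {
  type        = string
  description = "Service account email attached to the GKE nodes."
}

# The bucket name is supplied by CircleCI via -backend-config at init time.
terraform {
  backend "gcs" {}
}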

With the cluster definition in place, the next piece was the Kubernetes manifests for Weaviate itself. We needed a Deployment to manage the pod, a PersistentVolumeClaim (PVC) to ensure our vector data survives pod restarts, and a Service of type LoadBalancer to expose it to the outside world (specifically, our AWS Lambda).

k8s/weaviate-deployment.yaml

# weaviate-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: weaviate
spec:
  replicas: 1
  selector:
    matchLabels:
      app: weaviate
  template:
    metadata:
      labels:
        app: weaviate
    spec:
      containers:
      - name: weaviate
        image: semitechnologies/weaviate:1.23.7 # Pinning the version is critical for stable deployments.
        ports:
        - containerPort: 8080
        env:
        - name: QUERY_DEFAULTS_LIMIT
          value: "25"
        - name: AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED
          value: "false" # Explicitly disable anonymous access.
        - name: AUTHENTICATION_APIKEY_ENABLED
          value: "true"
        - name: AUTHENTICATION_APIKEY_ALLOWED_KEYS
          valueFrom:
            secretKeyRef:
              name: weaviate-secrets
              key: api-key # The key will be managed by CircleCI and applied as a K8s secret.
        - name: AUTHENTICATION_APIKEY_USERS
          value: "[email protected]"
        - name: DEFAULT_VECTORIZER_MODULE
          value: "none" # We will handle embedding client-side.
        - name: ENABLE_MODULES
          value: ""
        - name: PERSISTENCE_DATA_PATH
          value: "/var/lib/weaviate"
        - name: CLUSTER_HOSTNAME
          value: "node1"
        volumeMounts:
        - name: weaviate-storage
          mountPath: /var/lib/weaviate
      volumes:
      - name: weaviate-storage
        persistentVolumeClaim:
          claimName: weaviate-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: weaviate-pvc
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 50Gi # Start with a reasonable size, but monitor usage.
---
apiVersion: v1
kind: Service
metadata:
  name: weaviate-lb
spec:
  type: LoadBalancer
  ports:
  - port: 80
    targetPort: 8080
  selector:
    app: weaviate

The pitfall here is secret management. Hardcoding API keys in YAML is unacceptable. The valueFrom directive points to a Kubernetes Secret named weaviate-secrets. This secret would be created dynamically by our CircleCI pipeline just before applying the deployment.
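
The key itself only needs to be generated once and stored in the CircleCI context as WEAVIATE_API_KEY; a sketch using openssl:

# Generate a strong random key; paste the output into the CircleCI context.
openssl rand -base64 32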

Phase 2: The Cross-Cloud CI/CD Orchestration with CircleCI

This is where the complexity culminates. The .circleci/config.yml file becomes the blueprint for our entire MLOps workflow. It needs to handle authentication to both GCP and AWS, manage Terraform state, build Docker images, run Kubernetes commands, and deploy serverless functions.

We leveraged CircleCI’s Orbs to simplify interactions with cloud providers. The gcp-cli, gcp-gke, aws-cli, and serverless-framework orbs were indispensable. Contexts were used to securely store credentials like GCP_SERVICE_KEY, AWS_ACCESS_KEY_ID, and the WEAVIATE_API_KEY.

.circleci/config.yml (Abridged for clarity)

# .circleci/config.yml
version: 2.1
orbs:
  gcp-cli: circleci/gcp-[email protected]
  gcp-gke: circleci/gcp-[email protected]
  aws-cli: circleci/aws-[email protected]
  serverless-framework: circleci/serverless-[email protected]

# Define reusable executor with required tools
executors:
  python-node-gcloud:
    docker:
      - image: cimg/python:3.10-node
    resource_class: medium

jobs:
  # Job to apply GCP infrastructure via Terraform
  apply_gke_infrastructure:
    executor: python-node-gcloud
    steps:
      - checkout
      - gcp-cli/install
      - run:
          name: "Authenticate with GCP"
          command: |
            echo "$GCP_SERVICE_KEY" > "${HOME}/gcp-key.json"
            gcloud auth activate-service-account --key-file="${HOME}/gcp-key.json"
            gcloud config set project "$GCP_PROJECT_ID"
      - run:
          name: "Terraform Apply GKE Cluster"
          working_directory: ./gke
          command: |
            # Using GCS backend for Terraform state is non-negotiable in a team environment.
            terraform init -backend-config="bucket=my-tf-state-bucket"
            terraform apply -auto-approve \
              -var="gcp_project_id=${GCP_PROJECT_ID}" \
              -var="gcp_region=${GCP_REGION}"
  
  # Job to deploy Weaviate application to the GKE cluster
  deploy_weaviate_to_gke:
    executor: python-node-gcloud
    steps:
      - checkout
      - gcp-cli/install
      - run:
          name: "Authenticate with GCP"
          command: |
            echo "$GCP_SERVICE_KEY" > "${HOME}/gcp-key.json"
            gcloud auth activate-service-account --key-file="${HOME}/gcp-key.json"
            gcloud config set project "$GCP_PROJECT_ID"
      - gcp-gke/install
      - gcp-gke/update-kubeconfig-with-credentials:
          cluster: weaviate-cluster
          region: $GCP_REGION
      - run:
          name: "Create/Update Weaviate Secret"
          command: |
            kubectl delete secret weaviate-secrets --ignore-not-found
            kubectl create secret generic weaviate-secrets --from-literal=api-key="${WEAVIATE_API_KEY}"
      - run:
          name: "Apply Weaviate K8s Manifests"
          command: kubectl apply -f k8s/
  
  # Job to deploy the AWS Lambda function
  deploy_query_lambda:
    executor: python-node-gcloud # Reusing executor
    steps:
      - checkout
      - aws-cli/setup
      - serverless-framework/setup
      - run:
          name: "Install Python Dependencies"
          command: |
            cd api/
            pip install -r requirements.txt -t .
      - run:
          name: "Deploy Lambda via Serverless Framework"
          command: |
            cd api/
            # We pass secrets as environment variables to Serverless, which should then
            # ideally use SSM or Secrets Manager for runtime configuration.
            sls deploy --stage prod
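
  # Sketch of the ingestion trigger shown in the architecture diagram; it is
  # omitted from the abridged workflow above. The Job manifest path
  # (ingestion/job.yaml) is an assumption, shown in Phase 3.
  run_ingestion_job:
    executor: python-node-gcloud
    steps:
      - checkout
      - gcp-cli/install
      - run:
          name: "Authenticate with GCP"
          command: |
            echo "$GCP_SERVICE_KEY" > "${HOME}/gcp-key.json"
            gcloud auth activate-service-account --key-file="${HOME}/gcp-key.json"
            gcloud config set project "$GCP_PROJECT_ID"
      - gcp-gke/install
      - gcp-gke/update-kubeconfig-with-credentials:
          cluster: weaviate-cluster
          region: $GCP_REGION
      - run:
          name: "Run Ingestion Job"
          command: |
            # K8s Jobs are immutable once created, so clear any previous run.
            kubectl delete job weaviate-ingestion --ignore-not-found
            kubectl apply -f ingestion/job.yaml
            kubectl wait --for=condition=complete --timeout=30m job/weaviate-ingestion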

workflows:
  build-and-deploy:
    jobs:
      - apply_gke_infrastructure:
          context: cloud-creds # Context containing GCP/AWS secrets
      - deploy_weaviate_to_gke:
          requires:
            - apply_gke_infrastructure
          context: cloud-creds
      - deploy_query_lambda:
          requires:
            - apply_gke_infrastructure # Lambda depends on the infra existing
          context: cloud-creds
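      - run_ingestion_job:
          requires:
            - deploy_weaviate_to_gke
          context: cloud-creds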

This configuration defines a dependency chain: infrastructure must exist before applications can be deployed. As written, every commit runs the full workflow, with Terraform turning the infrastructure job into a fast no-op when nothing has changed. To run only the jobs relevant to a given change, CircleCI's dynamic configuration with the path-filtering orb is the usual approach.
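
A minimal sketch of that setup workflow, assuming dynamic configuration is enabled for the project and the jobs above are moved to .circleci/continue_config.yml behind matching pipeline parameters (the orb version and mappings here are illustrative):

# .circleci/config.yml - setup workflow for path filtering (sketch)
version: 2.1
setup: true
orbs:
  path-filtering: circleci/path-filtering@1.0.0
workflows:
  generate-config:
    jobs:
      - path-filtering/filter:
          base-revision: main
          config-path: .circleci/continue_config.yml
          # <path regex> <pipeline parameter> <value>
          mapping: |
            gke/.* run-infra true
            k8s/.* run-weaviate true
            api/.* run-lambda true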

Phase 3: The Data Ingestion and Query Logic

The logic prototyped in Jupyter needed to be hardened into two separate, production-grade Python scripts: one for batch ingestion and one for the Lambda handler.

The ingestion script was designed to run as a Kubernetes Job. It connects to the internal Weaviate service, pulls data from GCS, generates embeddings using the sentence-transformers library, and batch-loads them into Weaviate.

ingestion/ingest.py

# ingest.py - Batch data ingestion script
import os
import sys
import logging
import weaviate
from sentence_transformers import SentenceTransformer
from google.cloud import storage

# Production-grade logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Configuration from environment variables
WEAVIATE_URL = os.environ.get("WEAVIATE_INTERNAL_URL") # e.g., http://weaviate-lb.default.svc.cluster.local:80
WEAVIATE_API_KEY = os.environ.get("WEAVIATE_API_KEY")
GCS_BUCKET = os.environ.get("GCS_BUCKET_NAME")
SOURCE_BLOB_NAME = os.environ.get("SOURCE_BLOB_NAME")
MODEL_NAME = 'all-MiniLM-L6-v2'

def download_from_gcs(bucket_name, blob_name):
    """Downloads a blob from GCS and returns its content."""
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        logging.info(f"Downloading data from gs://{bucket_name}/{blob_name}")
        return blob.download_as_text().splitlines()
    except Exception as e:
        logging.error(f"Failed to download from GCS: {e}")
        raise

def main():
    """Main ingestion logic."""
    if not all([WEAVIATE_URL, WEAVIATE_API_KEY, GCS_BUCKET, SOURCE_BLOB_NAME]):
        logging.error("Missing required environment variables.")
        sys.exit(1)  # Exit non-zero so the Kubernetes Job is marked failed.

    try:
        auth_config = weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY)
        client = weaviate.Client(url=WEAVIATE_URL, auth_client_secret=auth_config)
    except Exception as e:
        logging.error(f"Failed to connect to Weaviate: {e}")
        sys.exit(1)

    # Schema definition - should be idempotent
    class_obj = {
        "class": "Document",
        "vectorizer": "none", # Critical, as we provide our own vectors
    }
    try:
        client.schema.create_class(class_obj)
        logging.info("Created 'Document' schema.")
    except weaviate.exceptions.UnexpectedStatusCodeException:
        logging.warning("'Document' schema already exists. Skipping creation.")

    logging.info(f"Loading sentence transformer model: {MODEL_NAME}")
    model = SentenceTransformer(MODEL_NAME)
    
    documents = download_from_gcs(GCS_BUCKET, SOURCE_BLOB_NAME)

    # Configure batching for efficient ingestion
    client.batch.configure(batch_size=100, dynamic=True)
    with client.batch as batch:
        for i, doc_text in enumerate(documents):
            if not doc_text.strip():
                continue
            
            try:
                vector = model.encode(doc_text).tolist()
                properties = {"content": doc_text}
                batch.add_data_object(
                    data_object=properties,
                    class_name="Document",
                    vector=vector
                )
                if (i + 1) % 100 == 0:
                    logging.info(f"Processed {i+1}/{len(documents)} documents.")
            except Exception as e:
                logging.error(f"Error processing document {i}: {e}")
    
    logging.info(f"Finished ingestion. Total documents processed: {len(documents)}")

if __name__ == "__main__":
    main()
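
The Job wrapper for this script lives under ingestion/ rather than k8s/, so the blanket kubectl apply -f k8s/ in the deploy job does not re-run it. A sketch, where the image URI, bucket, and service account names are assumptions:

ingestion/job.yaml

# job.yaml - One-shot ingestion Job, applied by the run_ingestion_job CI step
apiVersion: batch/v1
kind: Job
metadata:
  name: weaviate-ingestion
spec:
  backoffLimit: 2
  template:
    spec:
      restartPolicy: Never
      # Workload Identity maps this KSA to a GCP SA with read access to GCS.
      serviceAccountName: ingestion-ksa
      containers:
      - name: ingest
        image: gcr.io/my-project/rag-ingest:latest # Assumed image built by CI.
        env:
        - name: WEAVIATE_INTERNAL_URL
          value: "http://weaviate-lb.default.svc.cluster.local:80"
        - name: WEAVIATE_API_KEY
          valueFrom:
            secretKeyRef:
              name: weaviate-secrets
              key: api-key
        - name: GCS_BUCKET_NAME
          value: "my-source-docs-bucket" # Assumed bucket.
        - name: SOURCE_BLOB_NAME
          value: "documents.txt" # Assumed object name.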

The AWS Lambda handler, on the other hand, is optimized for low-latency queries. It receives a query via API Gateway, embeds it using the same model, and performs a vector search against the Weaviate instance.

api/handler.py

# handler.py - AWS Lambda function for querying Weaviate
import os
import json
import logging
import weaviate
from sentence_transformers import SentenceTransformer

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# A common mistake is initializing expensive resources on every invocation.
# We initialize the model and Weaviate client outside the handler to leverage
# Lambda's execution context reuse for better performance.
try:
    WEAVIATE_URL = os.environ['WEAVIATE_PUBLIC_URL']
    WEAVIATE_API_KEY = os.environ['WEAVIATE_API_KEY']
    MODEL_NAME = 'all-MiniLM-L6-v2'
    
    logger.info("Initializing SentenceTransformer model...")
    model = SentenceTransformer(MODEL_NAME)
    logger.info("Model initialized.")

    auth_config = weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY)
    client = weaviate.Client(url=WEAVIATE_URL, auth_client_secret=auth_config)
    logger.info(f"Weaviate client configured for {WEAVIATE_URL}")

except Exception as e:
    # This will cause subsequent invocations to fail until the container is recycled.
    logger.error(f"FATAL: Cold start initialization failed: {e}")
    model = None
    client = None

def handler(event, context):
    """API Gateway Lambda handler for vector search."""
    if not client or not model:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Service is not initialized. Check logs.'})
        }
    
    try:
        body = json.loads(event.get('body', '{}'))
        query = body.get('query')
        if not query:
            return {'statusCode': 400, 'body': json.dumps({'error': 'Missing "query" in request body.'})}
    except json.JSONDecodeError:
        return {'statusCode': 400, 'body': json.dumps({'error': 'Invalid JSON in request body.'})}

    logger.info(f"Received query: {query}")

    try:
        # 1. Embed the query
        query_vector = model.encode(query).tolist()

        # 2. Perform the vector search
        response = (
            client.query
            .get("Document", ["content"])
            .with_near_vector({"vector": query_vector})
            .with_limit(3)
            .do()
        )
        
        results = response['data']['Get']['Document']
        logger.info(f"Found {len(results)} results.")

        return {
            'statusCode': 200,
            'headers': { 'Content-Type': 'application/json' },
            'body': json.dumps(results)
        }

    except Exception as e:
        logger.error(f"An error occurred during query processing: {e}", exc_info=True)
        return {
            'statusCode': 503,
            'body': json.dumps({'error': 'Failed to query the vector database.'})
        }

The serverless.yml file defines the AWS resources needed for this handler.

api/serverless.yml

# serverless.yml
service: rag-query-api

provider:
  name: aws
  runtime: python3.10
  stage: ${opt:stage, 'dev'}
  region: us-east-1
  environment:
    # In a real project, these would be populated from AWS SSM Parameter Store using ${ssm:/path/to/param} syntax
    # For this example, we pass them from CircleCI during deployment.
    WEAVIATE_PUBLIC_URL: ${env:WEAVIATE_PUBLIC_URL}
    WEAVIATE_API_KEY: ${env:WEAVIATE_API_KEY}

package:
  # Dependencies are pip-installed directly into this directory by the CI job,
  # so the whole directory is bundled; exclude only local artifacts. Never
  # package .env files.
  patterns:
    - "!.env"
    - "!__pycache__/**"
  # Caveat: sentence-transformers pulls in torch, which typically exceeds
  # Lambda's 250MB unzipped package limit; a container-image Lambda or a
  # lighter embedding runtime may be needed in practice.

functions:
  query:
    handler: handler.handler
    memorySize: 1024 # Embedding models can be memory-intensive.
    timeout: 30
    events:
      - http:
          path: /query
          method: post
          cors: true

The final result is a fully automated, cross-cloud MLOps pipeline. A git push triggers CircleCI, which provisions or updates the GKE cluster, deploys the Weaviate instance with the correct secrets, triggers a data ingestion job, and deploys the query API to AWS Lambda. It bridges the gap between data science experimentation and production reality by codifying every step of the process.
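
End to end, a query is a single POST (the API Gateway URL below is a placeholder; sls deploy prints the real one):

curl -s -X POST \
  "https://abc123.execute-api.us-east-1.amazonaws.com/prod/query" \
  -H "Content-Type: application/json" \
  -d '{"query": "How do we rotate the Weaviate API key?"}'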

This architecture, however, is not without its limitations and next steps. The public LoadBalancer for Weaviate is a security risk that must be addressed, likely through a Cloud VPN or Interconnect between GCP and AWS. Observability is another major gap; implementing OpenTelemetry to trace a request from the user’s API call through Lambda and into the Weaviate instance on GKE would be crucial for debugging performance issues. Finally, the batch ingestion job is rudimentary. The next iteration should involve a more robust, event-driven data pipeline, perhaps using GCS event triggers and a Pub/Sub queue to process documents as they arrive, ensuring the vector database is always up-to-date.
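
As a concrete first step toward that event-driven design, GCS can publish object-change events to Pub/Sub with one command (bucket and topic names here are placeholders):

# Emit OBJECT_FINALIZE events from the source bucket to a Pub/Sub topic;
# a subscriber can then trigger per-document ingestion instead of batch runs.
gsutil notification create -t doc-ingest-events -f json \
  -e OBJECT_FINALIZE gs://my-source-docs-bucket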

