Automating Time Series Vector Embedding Pipelines with Tekton and Weaviate under Phoenix Observability


The operational bottleneck wasn’t model training; it was the deployment and validation of time-series embeddings for our anomaly detection system. Every update was a brittle, multi-step manual process involving Python scripts, kubectl apply, and a great deal of hope. When a new set of vector embeddings caused a spike in query latency or a drop in detection accuracy, backtracking to the root cause—a specific data preprocessing run or a faulty embedding model—was a painful forensic exercise. The core problem was a complete lack of observability connecting the CI/CD pipeline that built the embeddings to the production system that served them. We were flying blind.

Our initial concept was to build a fully automated, GitOps-driven system. A git push to our model repository should trigger a resilient pipeline that ingests raw time-series data, generates vector representations of temporal patterns, loads them into a vector database, and makes them available for querying. The non-negotiable requirement was end-to-end traceability. I needed to be able to select a single slow query in production and trace it back to the specific Tekton PipelineRun and Git commit that produced the underlying data.

This led to a specific technology stack. For CI/CD, Tekton was the obvious choice for its Kubernetes-native, declarative nature. It avoids the impedance mismatch of forcing traditional CI systems into a cloud-native world. For the vector store, Weaviate was selected over simpler alternatives due to its robust metadata storage and filtering capabilities, which are critical for time-series workloads. We needed to associate each vector with a model version, a timestamp range, and sensor IDs. Finally, for observability, we chose Phoenix. Standard APM tools are excellent for tracing RPC calls between microservices but are largely unaware of the data flow within asynchronous, containerized batch jobs like a Tekton pipeline. Phoenix, with its focus on ML observability, gave us the tooling to inject context and trace a data lineage from source to production service.

The architecture we settled on can be visualized as a continuous loop.

graph TD
    A[Developer pushes model update to Git] --> B{Tekton EventListener};
    B --> C[Trigger Tekton PipelineRun];
    C --> D{Pipeline};

    subgraph Pipeline
        direction LR
        D1[1. Clone Repo] --> D2[2. Fetch Time Series Data];
        D2 --> D3[3. Generate Embeddings];
        D3 --> D4[4. Load to Weaviate];
        D4 --> D5[5. Validate Deployment];
    end

    D -- Traces sent to --> E[Phoenix Collector];

    F[Anomaly Detection Service] -- Queries --> G[Weaviate Cluster];
    G -- Serves Embeddings --> F;
    F -- Traces sent to --> E;
    A -.-> F;

    style D1 fill:#cde4ff
    style D2 fill:#cde4ff
    style D3 fill:#cde4ff
    style D4 fill:#cde4ff
    style D5 fill:#cde4ff

This post-mortem details the implementation of this system, focusing on the configuration of Tekton tasks, the core Python logic for data processing and instrumentation, and the practical challenges encountered.

Defining the CI/CD Contract with Tekton

The foundation of the system is a set of declarative Tekton Tasks and a Pipeline that orchestrates them. A common mistake is to create monolithic tasks. In a real-world project, it’s far more maintainable to create smaller, reusable tasks, each with a single responsibility. This mirrors the single responsibility principle in software design.

Our first Task is responsible for fetching the raw time-series data. In this case, from a GCS bucket. Note the use of workspaces for sharing data between tasks and params for configuration.

# tekton/tasks/fetch-data.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: fetch-timeseries-data
spec:
  description: >-
    Fetches raw time series data from a specified GCS bucket
    and places it into the workspace.
  workspaces:
    - name: source
      description: The workspace where fetched data will be stored.
  params:
    - name: gcs-bucket-uri
      description: The full GCS URI of the source data file (e.g., gs://my-bucket/data.csv).
      type: string
    - name of the service account to use
      name: gcp-service-account
      description: The GCP service account for authentication.
      type: string
  steps:
    - name: gcs-fetch
      image: google/cloud-sdk:slim
      script: |
        #!/bin/sh
        set -e
        echo "Authenticating with GCP..."
        gcloud auth activate-service-account --key-file=/var/secrets/gcp/key.json
        
        echo "Fetching data from $(params.gcs-bucket-uri) to $(workspaces.source.path)/timeseries.csv"
        gsutil cp $(params.gcs-bucket-uri) $(workspaces.source.path)/timeseries.csv
        
        echo "Data fetch complete."
      volumeMounts:
        - name: gcp-credentials
          mountPath: /var/secrets/gcp
          readOnly: true
  volumes:
    - name: gcp-credentials
      secret:
        secretName: $(params.gcp-service-account)

The most complex Task is generate-embeddings. This is where the core logic resides. The task takes raw data from the workspace, processes it, and outputs serialized vector embeddings. It’s critical to inject environment variables for observability and configuration.

# tekton/tasks/generate-embeddings.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: generate-embeddings
spec:
  workspaces:
    - name: data
      description: The workspace containing input data and for storing output embeddings.
  params:
    - name: source-file
      description: The name of the input CSV file.
      type: string
      default: "timeseries.csv"
    - name: output-file
      description: The name of the output file for embeddings.
      type: string
      default: "embeddings.pkl"
    - name: model-version
      description: The version of the embedding model being used.
      type: string
    - name: phoenix-endpoint
      description: The endpoint for the Phoenix OTLP collector.
      type: string
    # Tekton injects these automatically, useful for tracing
    - name: pipeline-run-name
      description: The name of the parent PipelineRun.
      type: string
      default: "unknown-pipelinerun"
  steps:
    - name: process-and-embed
      image: my-registry/timeseries-embedder:1.0.0
      env:
        - name: SOURCE_FILE_PATH
          value: "$(workspaces.data.path)/$(params.source-file)"
        - name: OUTPUT_FILE_PATH
          value: "$(workspaces.data.path)/$(params.output-file)"
        - name: MODEL_VERSION
          value: "$(params.model-version)"
        - name: PHOENIX_ENDPOINT
          value: "$(params.phoenix-endpoint)"
        - name: TEKTON_PIPELINE_RUN
          value: "$(params.pipeline-run-name)"
      script: |
        #!/usr/bin/env python
        # The container's entrypoint is our main Python script
        # which will read the environment variables.
        python /app/main.py

Finally, the Pipeline definition ties everything together, defining the execution graph and the flow of data through the shared workspace. The pitfall here is managing the workspace size; for massive datasets, a PVC-backed workspace is essential, but for moderately sized data, an emptyDir can be faster.

# tekton/pipeline.yaml
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: ts-embedding-pipeline
spec:
  workspaces:
    - name: shared-data
  params:
    - name: git-repo-url
      type: string
    - name: git-revision
      type: string
      default: "main"
    - name: gcs-data-uri
      type: string
    - name: model-version
      type: string
    - name: phoenix-endpoint
      type: string
  tasks:
    - name: fetch-source-code
      taskRef:
        name: git-clone
      workspaces:
        - name: output
          workspace: shared-data
      params:
        - name: url
          value: $(params.git-repo-url)
        - name: revision
          value: $(params.git-revision)

    - name: fetch-raw-data
      taskRef:
        name: fetch-timeseries-data
      runAfter: [ "fetch-source-code" ]
      workspaces:
        - name: source
          workspace: shared-data
      params:
        - name: gcs-bucket-uri
          value: $(params.gcs-data-uri)
        # Assuming a secret 'gcp-sa' exists in the namespace
        - name: gcp-service-account 
          value: "gcp-sa" 
    
    - name: build-embeddings
      taskRef:
        name: generate-embeddings
      runAfter: [ "fetch-raw-data" ]
      workspaces:
        - name: data
          workspace: shared-data
      params:
        - name: model-version
          value: $(params.model-version)
        - name: phoenix-endpoint
          value: $(params.phoenix-endpoint)
        - name: pipeline-run-name
          # This value is populated at runtime by the PipelineRun
          value: $(context.pipelineRun.name)

    - name: load-vectors-to-weaviate
      taskRef:
        name: load-weaviate
      runAfter: [ "build-embeddings" ]
      workspaces:
        - name: data
          workspace: shared-data
      params:
        - name: weaviate-url
          value: "http://weaviate.weaviate.svc.cluster.local:8080"
        - name: model-version
          value: $(params.model-version)

Core Processing Logic and Observability Instrumentation

The Python code running inside the generate-embeddings container is where the data processing and critical instrumentation occur. We used OpenTelemetry, configured with a Phoenix exporter, to create detailed traces of the batch job. This is not your typical web request tracing; we are tracing stages of a data transformation.

# /app/main.py
import os
import pickle
import logging
import time

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Assume an autoencoder model is packaged with the container
# from model import TimeSeriesAutoencoder 

# --- Observability Setup ---
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import phoenix as px

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Phoenix setup: automatically configures OpenTelemetry for us
# This is much simpler than manual OTel setup.
phoenix_session = px.launch_app()
# --- End Observability Setup ---

# --- Configuration from Environment ---
SOURCE_FILE = os.getenv("SOURCE_FILE_PATH")
OUTPUT_FILE = os.getenv("OUTPUT_FILE_PATH")
MODEL_VERSION = os.getenv("MODEL_VERSION", "0.0.0")
PIPELINE_RUN_NAME = os.getenv("TEKTON_PIPELINE_RUN", "unknown")

WINDOW_SIZE = 128
STEP_SIZE = 32

tracer = trace.get_tracer("timeseries.embedder.job")

def create_windows(data: np.ndarray, window_size: int, step_size: int):
    """Creates sliding windows from a time series."""
    windows = []
    for i in range(0, len(data) - window_size + 1, step_size):
        windows.append(data[i:i + window_size])
    return np.array(windows)

def run_embedding_process():
    """
    Main function to orchestrate the data processing and embedding generation,
    with detailed tracing for each step.
    """
    # The parent span ties all work in this job to the Tekton PipelineRun
    with tracer.start_as_current_span(
        "generate-embeddings-job",
        attributes={
            "tekton.pipeline_run": PIPELINE_RUN_NAME,
            "model.version": MODEL_VERSION,
            "data.source_file": SOURCE_FILE,
        }
    ) as parent_span:
        try:
            # 1. Load Data
            with tracer.start_as_current_span("load-data") as span:
                logging.info(f"Loading data from {SOURCE_FILE}...")
                if not os.path.exists(SOURCE_FILE):
                    raise FileNotFoundError(f"Source file not found: {SOURCE_FILE}")
                
                df = pd.read_csv(SOURCE_FILE, parse_dates=['timestamp'], index_col='timestamp')
                span.set_attribute("data.rows_loaded", len(df))
                logging.info(f"Loaded {len(df)} rows.")

            # 2. Preprocess Data
            with tracer.start_as_current_span("preprocess-data") as span:
                logging.info("Scaling data...")
                scaler = StandardScaler()
                scaled_data = scaler.fit_transform(df[['value']])
                span.set_attribute("data.features", df.columns.tolist())
                logging.info("Data scaling complete.")

            # 3. Windowing
            with tracer.start_as_current_span("create-time-windows") as span:
                logging.info(f"Creating windows of size {WINDOW_SIZE} with step {STEP_SIZE}...")
                windows = create_windows(scaled_data.flatten(), WINDOW_SIZE, STEP_SIZE)
                span.set_attribute("data.window_count", len(windows))
                span.set_attribute("data.window_size", WINDOW_SIZE)
                span.set_attribute("data.step_size", STEP_SIZE)
                logging.info(f"Created {len(windows)} windows.")

            # 4. Generate Embeddings
            with tracer.start_as_current_span("model-inference") as span:
                logging.info("Generating embeddings using autoencoder...")
                # This would be a real model in a production system
                # model = TimeSeriesAutoencoder()
                # model.load_weights('model.h5')
                # embeddings = model.encoder(windows).numpy()
                
                # For demonstration, we use random data
                time.sleep(5) # Simulate inference time
                embeddings = np.random.rand(len(windows), 64).astype(np.float32)

                span.set_attribute("ml.model.name", "TimeSeriesAutoencoder")
                span.set_attribute("ml.embedding_dimensions", embeddings.shape[1])
                logging.info(f"Generated {len(embeddings)} embeddings.")

            # 5. Package for Output
            with tracer.start_as_current_span("serialize-output") as span:
                output_data = []
                for i, embedding in enumerate(embeddings):
                    start_index = i * STEP_SIZE
                    end_index = start_index + WINDOW_SIZE
                    output_data.append({
                        "vector": embedding.tolist(),
                        "timestamp_start": df.index[start_index],
                        "timestamp_end": df.index[end_index-1],
                        "model_version": MODEL_VERSION,
                    })
                
                with open(OUTPUT_FILE, 'wb') as f:
                    pickle.dump(output_data, f)

                span.set_attribute("data.output_file", OUTPUT_FILE)
                logging.info(f"Serialized output to {OUTPUT_FILE}.")

            parent_span.set_status(trace.Status(trace.StatusCode.OK))
            logging.info("Embedding generation job completed successfully.")

        except Exception as e:
            logging.error(f"Job failed: {e}", exc_info=True)
            parent_span.record_exception(e)
            parent_span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            raise

if __name__ == "__main__":
    run_embedding_process()
    # Give the span processor time to export
    time.sleep(5) 

The key here is the use of tracer.start_as_current_span to delineate each logical step of the process. The attributes we add, like tekton.pipeline_run and model.version, are the critical pieces of metadata that allow us to later correlate this specific job run with production behavior in Phoenix.

Weaviate Schema and Data Loading

The schema in Weaviate must be designed to support the queries from our anomaly detection service. This means not only storing the vector but also indexing the metadata properties we’ll filter on.

# Part of the loading script in the 'load-weaviate' Tekton task
import weaviate
import pickle
import os
from weaviate.util import get_valid_uuid
from uuid import uuid4

WEAVIATE_URL = os.getenv("WEAVIATE_URL")
SOURCE_FILE = os.getenv("SOURCE_FILE_PATH") # e.g., /workspace/data/embeddings.pkl
MODEL_VERSION = os.getenv("MODEL_VERSION")

# Weaviate Class Definition
class_obj = {
    "class": "TimeSeriesChunk",
    "description": "A vector representation of a chunk of time series data.",
    "vectorizer": "none", # We provide our own vectors
    "properties": [
        {
            "name": "timestamp_start",
            "dataType": ["date"],
            "description": "The start timestamp of the time series window.",
        },
        {
            "name": "timestamp_end",
            "dataType": ["date"],
            "description": "The end timestamp of the time series window.",
        },
        {
            "name": "model_version",
            "dataType": ["text"],
            "description": "The version of the model used to generate the embedding.",
        },
        {
            "name": "source_pipeline_run",
            "dataType": ["text"],
            "description": "The Tekton PipelineRun that generated this vector.",
        }
    ],
    "vectorIndexConfig": {
        "distance": "cosine",
        # HNSW parameters are crucial for performance tuning.
        # A common mistake is to leave these as default.
        "efConstruction": 256, # Higher value leads to better graph quality but slower build time.
        "maxConnections": 32, # Connections per node in the HNSW graph.
    }
}

client = weaviate.Client(WEAVIATE_URL)

# Create schema if it doesn't exist (idempotent)
client.schema.create_class(class_obj)

# Load data from the workspace
with open(SOURCE_FILE, 'rb') as f:
    embeddings_data = pickle.load(f)

# Use batching for efficient loading
client.batch.configure(batch_size=200, dynamic=True)
with client.batch as batch:
    for item in embeddings_data:
        properties = {
            "timestamp_start": item["timestamp_start"].isoformat() + "Z",
            "timestamp_end": item["timestamp_end"].isoformat() + "Z",
            "model_version": item["model_version"],
            "source_pipeline_run": os.getenv("TEKTON_PIPELINE_RUN", "unknown")
        }
        
        batch.add_data_object(
            properties,
            "TimeSeriesChunk",
            uuid=get_valid_uuid(uuid4()),
            vector=item["vector"]
        )

print("Data loading to Weaviate complete.")

In a production environment, configuring efConstruction and maxConnections is a trade-off. Higher values improve search accuracy at the cost of ingestion speed and memory usage. This must be tuned based on the specific requirements of the application. The source_pipeline_run property is the final link in our traceability chain.

Limitations and Future Iterations

This system represents a significant step up from manual processes, but it’s not without its own set of challenges. The current Tekton pipeline triggers on every commit to main, which is too naive for a production system. A more robust approach would involve Git tags or a promotion process managed through a tool like ArgoCD to control deployments to the production Weaviate instance.

Secondly, the error handling for data quality is primitive. A faulty data export could poison the vector space. A future iteration should include a validation Task in the Tekton pipeline that runs statistical checks on the generated embeddings (e.g., checking for NaN values, variance, and distribution shifts) before loading them into Weaviate. This task could use Phoenix to log these quality metrics, creating a complete picture of data health alongside performance traces.

Finally, the rollback strategy is undefined. If a bad set of embeddings is loaded, how do we efficiently purge them? Weaviate’s filtering allows for deletion based on model_version or source_pipeline_run, but performing this on millions of objects can be a slow operation. A more sophisticated strategy might involve a blue-green approach where a new Weaviate class is populated and, once validated, traffic is switched over at the application layer. This adds complexity but significantly improves the resilience of the system.


  TOC