The operational bottleneck wasn’t model training; it was the deployment and validation of time-series embeddings for our anomaly detection system. Every update was a brittle, multi-step manual process involving Python scripts, kubectl apply, and a great deal of hope. When a new set of vector embeddings caused a spike in query latency or a drop in detection accuracy, backtracking to the root cause—a specific data preprocessing run or a faulty embedding model—was a painful forensic exercise. The core problem was a complete lack of observability connecting the CI/CD pipeline that built the embeddings to the production system that served them. We were flying blind.
Our initial concept was to build a fully automated, GitOps-driven system. A git push to our model repository should trigger a resilient pipeline that ingests raw time-series data, generates vector representations of temporal patterns, loads them into a vector database, and makes them available for querying. The non-negotiable requirement was end-to-end traceability. I needed to be able to select a single slow query in production and trace it back to the specific Tekton PipelineRun and Git commit that produced the underlying data.
This led to a specific technology stack. For CI/CD, Tekton was the obvious choice for its Kubernetes-native, declarative nature. It avoids the impedance mismatch of forcing traditional CI systems into a cloud-native world. For the vector store, Weaviate was selected over simpler alternatives due to its robust metadata storage and filtering capabilities, which are critical for time-series workloads. We needed to associate each vector with a model version, a timestamp range, and sensor IDs. Finally, for observability, we chose Phoenix. Standard APM tools are excellent for tracing RPC calls between microservices but are largely unaware of the data flow within asynchronous, containerized batch jobs like a Tekton pipeline. Phoenix, with its focus on ML observability, gave us the tooling to inject context and trace a data lineage from source to production service.
The architecture we settled on can be visualized as a continuous loop.
graph TD
    A[Developer pushes model update to Git] --> B{Tekton EventListener};
    B --> C[Trigger Tekton PipelineRun];
    C --> D{Pipeline};
    subgraph Pipeline
        direction LR
        D1[1. Clone Repo] --> D2[2. Fetch Time Series Data];
        D2 --> D3[3. Generate Embeddings];
        D3 --> D4[4. Load to Weaviate];
        D4 --> D5[5. Validate Deployment];
    end
    D -- Traces sent to --> E[Phoenix Collector];
    F[Anomaly Detection Service] -- Queries --> G[Weaviate Cluster];
    G -- Serves Embeddings --> F;
    F -- Traces sent to --> E;
    A -.-> F;
    style D1 fill:#cde4ff
    style D2 fill:#cde4ff
    style D3 fill:#cde4ff
    style D4 fill:#cde4ff
    style D5 fill:#cde4ff
This post-mortem details the implementation of this system, focusing on the configuration of Tekton tasks, the core Python logic for data processing and instrumentation, and the practical challenges encountered.
Defining the CI/CD Contract with Tekton
The foundation of the system is a set of declarative Tekton Tasks and a Pipeline that orchestrates them. A common mistake is to create monolithic tasks. In a real-world project, it’s far more maintainable to create smaller, reusable tasks, each with a single responsibility. This mirrors the single responsibility principle in software design.
Our first Task is responsible for fetching the raw time-series data, in this case from a GCS bucket. Note the use of workspaces for sharing data between tasks and params for configuration.
# tekton/tasks/fetch-data.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: fetch-timeseries-data
spec:
description: >-
Fetches raw time series data from a specified GCS bucket
and places it into the workspace.
workspaces:
- name: source
description: The workspace where fetched data will be stored.
params:
- name: gcs-bucket-uri
description: The full GCS URI of the source data file (e.g., gs://my-bucket/data.csv).
type: string
    - name: gcp-service-account
      description: The name of the Kubernetes secret holding the GCP service account key used for authentication.
      type: string
steps:
- name: gcs-fetch
image: google/cloud-sdk:slim
script: |
#!/bin/sh
set -e
echo "Authenticating with GCP..."
gcloud auth activate-service-account --key-file=/var/secrets/gcp/key.json
echo "Fetching data from $(params.gcs-bucket-uri) to $(workspaces.source.path)/timeseries.csv"
gsutil cp $(params.gcs-bucket-uri) $(workspaces.source.path)/timeseries.csv
echo "Data fetch complete."
volumeMounts:
- name: gcp-credentials
mountPath: /var/secrets/gcp
readOnly: true
volumes:
- name: gcp-credentials
secret:
secretName: $(params.gcp-service-account)
The most complex Task is generate-embeddings. This is where the core logic resides. The task takes raw data from the workspace, processes it, and outputs serialized vector embeddings. It’s critical to inject environment variables for observability and configuration.
# tekton/tasks/generate-embeddings.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: generate-embeddings
spec:
workspaces:
- name: data
description: The workspace containing input data and for storing output embeddings.
params:
- name: source-file
description: The name of the input CSV file.
type: string
default: "timeseries.csv"
- name: output-file
description: The name of the output file for embeddings.
type: string
default: "embeddings.pkl"
- name: model-version
description: The version of the embedding model being used.
type: string
- name: phoenix-endpoint
description: The endpoint for the Phoenix OTLP collector.
type: string
    # Populated by the Pipeline via $(context.pipelineRun.name); useful for tracing
- name: pipeline-run-name
description: The name of the parent PipelineRun.
type: string
default: "unknown-pipelinerun"
steps:
- name: process-and-embed
image: my-registry/timeseries-embedder:1.0.0
env:
- name: SOURCE_FILE_PATH
value: "$(workspaces.data.path)/$(params.source-file)"
- name: OUTPUT_FILE_PATH
value: "$(workspaces.data.path)/$(params.output-file)"
- name: MODEL_VERSION
value: "$(params.model-version)"
- name: PHOENIX_ENDPOINT
value: "$(params.phoenix-endpoint)"
- name: TEKTON_PIPELINE_RUN
value: "$(params.pipeline-run-name)"
script: |
        #!/bin/sh
        # The container image packages our main Python script, which reads
        # its configuration from the environment variables defined above.
        python /app/main.py
Finally, the Pipeline definition ties everything together, defining the execution graph and the flow of data through the shared workspace. The pitfall here is managing the workspace size; for massive datasets, a PVC-backed workspace is essential, but for moderately sized data, an emptyDir can be faster. A PipelineRun sketch showing how the workspace is bound follows the pipeline definition below.
# tekton/pipeline.yaml
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
name: ts-embedding-pipeline
spec:
workspaces:
- name: shared-data
params:
- name: git-repo-url
type: string
- name: git-revision
type: string
default: "main"
- name: gcs-data-uri
type: string
- name: model-version
type: string
- name: phoenix-endpoint
type: string
tasks:
- name: fetch-source-code
taskRef:
name: git-clone
workspaces:
- name: output
workspace: shared-data
params:
- name: url
value: $(params.git-repo-url)
- name: revision
value: $(params.git-revision)
- name: fetch-raw-data
taskRef:
name: fetch-timeseries-data
runAfter: [ "fetch-source-code" ]
workspaces:
- name: source
workspace: shared-data
params:
- name: gcs-bucket-uri
value: $(params.gcs-data-uri)
# Assuming a secret 'gcp-sa' exists in the namespace
- name: gcp-service-account
value: "gcp-sa"
- name: build-embeddings
taskRef:
name: generate-embeddings
runAfter: [ "fetch-raw-data" ]
workspaces:
- name: data
workspace: shared-data
params:
- name: model-version
value: $(params.model-version)
- name: phoenix-endpoint
value: $(params.phoenix-endpoint)
- name: pipeline-run-name
# This value is populated at runtime by the PipelineRun
value: $(context.pipelineRun.name)
- name: load-vectors-to-weaviate
taskRef:
name: load-weaviate
runAfter: [ "build-embeddings" ]
workspaces:
- name: data
workspace: shared-data
params:
- name: weaviate-url
value: "http://weaviate.weaviate.svc.cluster.local:8080"
- name: model-version
value: $(params.model-version)
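For reference, a PipelineRun that binds the shared workspace might look like the sketch below. The parameter values are illustrative, and the volumeClaimTemplate can be swapped for an emptyDir when the dataset is small enough for node-local storage.
# Sketch: an illustrative PipelineRun; parameter values are hypothetical.
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
  generateName: ts-embedding-pipeline-run-
spec:
  pipelineRef:
    name: ts-embedding-pipeline
  params:
    - name: git-repo-url
      value: "https://example.com/org/model-repo.git"
    - name: gcs-data-uri
      value: "gs://my-bucket/data.csv"
    - name: model-version
      value: "1.4.0"
    - name: phoenix-endpoint
      value: "http://phoenix.observability.svc.cluster.local:4317"
  workspaces:
    - name: shared-data
      volumeClaimTemplate:
        spec:
          accessModes: ["ReadWriteOnce"]
          resources:
            requests:
              storage: 5Gi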
Core Processing Logic and Observability Instrumentation
The Python code running inside the generate-embeddings
container is where the data processing and critical instrumentation occur. We used OpenTelemetry, configured with a Phoenix exporter, to create detailed traces of the batch job. This is not your typical web request tracing; we are tracing stages of a data transformation.
# /app/main.py
import os
import pickle
import logging
import time
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
# Assume an autoencoder model is packaged with the container
# from model import TimeSeriesAutoencoder
# --- Observability Setup ---
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Export spans to the Phoenix OTLP collector whose endpoint the Tekton task
# injects via PHOENIX_ENDPOINT. (Phoenix's px.launch_app() starts a local UI,
# which suits notebooks but not a headless batch container.)
resource = Resource.create({"service.name": "timeseries-embedder"})
provider = TracerProvider(resource=resource)
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint=os.getenv("PHOENIX_ENDPOINT")))
)
trace.set_tracer_provider(provider)
# --- End Observability Setup ---
# --- Configuration from Environment ---
SOURCE_FILE = os.getenv("SOURCE_FILE_PATH")
OUTPUT_FILE = os.getenv("OUTPUT_FILE_PATH")
MODEL_VERSION = os.getenv("MODEL_VERSION", "0.0.0")
PIPELINE_RUN_NAME = os.getenv("TEKTON_PIPELINE_RUN", "unknown")
WINDOW_SIZE = 128
STEP_SIZE = 32
tracer = trace.get_tracer("timeseries.embedder.job")
def create_windows(data: np.ndarray, window_size: int, step_size: int):
"""Creates sliding windows from a time series."""
windows = []
for i in range(0, len(data) - window_size + 1, step_size):
windows.append(data[i:i + window_size])
return np.array(windows)
def run_embedding_process():
"""
Main function to orchestrate the data processing and embedding generation,
with detailed tracing for each step.
"""
# The parent span ties all work in this job to the Tekton PipelineRun
with tracer.start_as_current_span(
"generate-embeddings-job",
attributes={
"tekton.pipeline_run": PIPELINE_RUN_NAME,
"model.version": MODEL_VERSION,
"data.source_file": SOURCE_FILE,
}
) as parent_span:
try:
# 1. Load Data
with tracer.start_as_current_span("load-data") as span:
logging.info(f"Loading data from {SOURCE_FILE}...")
if not os.path.exists(SOURCE_FILE):
raise FileNotFoundError(f"Source file not found: {SOURCE_FILE}")
df = pd.read_csv(SOURCE_FILE, parse_dates=['timestamp'], index_col='timestamp')
span.set_attribute("data.rows_loaded", len(df))
logging.info(f"Loaded {len(df)} rows.")
# 2. Preprocess Data
with tracer.start_as_current_span("preprocess-data") as span:
logging.info("Scaling data...")
scaler = StandardScaler()
scaled_data = scaler.fit_transform(df[['value']])
span.set_attribute("data.features", df.columns.tolist())
logging.info("Data scaling complete.")
# 3. Windowing
with tracer.start_as_current_span("create-time-windows") as span:
logging.info(f"Creating windows of size {WINDOW_SIZE} with step {STEP_SIZE}...")
windows = create_windows(scaled_data.flatten(), WINDOW_SIZE, STEP_SIZE)
span.set_attribute("data.window_count", len(windows))
span.set_attribute("data.window_size", WINDOW_SIZE)
span.set_attribute("data.step_size", STEP_SIZE)
logging.info(f"Created {len(windows)} windows.")
# 4. Generate Embeddings
with tracer.start_as_current_span("model-inference") as span:
logging.info("Generating embeddings using autoencoder...")
# This would be a real model in a production system
# model = TimeSeriesAutoencoder()
# model.load_weights('model.h5')
# embeddings = model.encoder(windows).numpy()
# For demonstration, we use random data
time.sleep(5) # Simulate inference time
embeddings = np.random.rand(len(windows), 64).astype(np.float32)
span.set_attribute("ml.model.name", "TimeSeriesAutoencoder")
span.set_attribute("ml.embedding_dimensions", embeddings.shape[1])
logging.info(f"Generated {len(embeddings)} embeddings.")
# 5. Package for Output
with tracer.start_as_current_span("serialize-output") as span:
output_data = []
for i, embedding in enumerate(embeddings):
start_index = i * STEP_SIZE
end_index = start_index + WINDOW_SIZE
output_data.append({
"vector": embedding.tolist(),
"timestamp_start": df.index[start_index],
"timestamp_end": df.index[end_index-1],
"model_version": MODEL_VERSION,
})
with open(OUTPUT_FILE, 'wb') as f:
pickle.dump(output_data, f)
span.set_attribute("data.output_file", OUTPUT_FILE)
logging.info(f"Serialized output to {OUTPUT_FILE}.")
parent_span.set_status(trace.Status(trace.StatusCode.OK))
logging.info("Embedding generation job completed successfully.")
except Exception as e:
logging.error(f"Job failed: {e}", exc_info=True)
parent_span.record_exception(e)
parent_span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
raise
if __name__ == "__main__":
    run_embedding_process()
    # Flush buffered spans before the container exits, rather than
    # sleeping and hoping the BatchSpanProcessor has drained.
    provider.force_flush()
The key here is the use of tracer.start_as_current_span to delineate each logical step of the process. The attributes we add, like tekton.pipeline_run and model.version, are the critical pieces of metadata that allow us to later correlate this specific job run with production behavior in Phoenix.
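On the serving side, the anomaly detection service attaches the same attributes to its query spans, which is what lets Phoenix correlate a slow production query with the PipelineRun that produced the vectors it touched. A minimal sketch, assuming the weaviate-client v3 query API and a hypothetical nearest_chunks helper:
# Sketch: hypothetical serving-side instrumentation in the anomaly service.
import weaviate
from opentelemetry import trace

client = weaviate.Client("http://weaviate.weaviate.svc.cluster.local:8080")
tracer = trace.get_tracer("anomaly.detection.service")

def nearest_chunks(query_vector, k=5):
    with tracer.start_as_current_span("weaviate-nearest-neighbors") as span:
        result = (
            client.query.get("TimeSeriesChunk", ["model_version", "source_pipeline_run"])
            .with_near_vector({"vector": query_vector})
            .with_limit(k)
            .do()
        )
        hits = result["data"]["Get"]["TimeSeriesChunk"]
        if hits:
            # Tag the query span with the provenance of the best match so a
            # slow or inaccurate query traces back to its PipelineRun.
            span.set_attribute("tekton.pipeline_run", hits[0]["source_pipeline_run"])
            span.set_attribute("model.version", hits[0]["model_version"])
        return hits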
Weaviate Schema and Data Loading
The schema in Weaviate must be designed to support the queries from our anomaly detection service. This means not only storing the vector but also indexing the metadata properties we’ll filter on.
# Part of the loading script in the 'load-weaviate' Tekton task
import weaviate
import pickle
import os
from weaviate.util import get_valid_uuid
from uuid import uuid4
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
SOURCE_FILE = os.getenv("SOURCE_FILE_PATH") # e.g., /workspace/data/embeddings.pkl
MODEL_VERSION = os.getenv("MODEL_VERSION")
# Weaviate Class Definition
class_obj = {
"class": "TimeSeriesChunk",
"description": "A vector representation of a chunk of time series data.",
"vectorizer": "none", # We provide our own vectors
"properties": [
{
"name": "timestamp_start",
"dataType": ["date"],
"description": "The start timestamp of the time series window.",
},
{
"name": "timestamp_end",
"dataType": ["date"],
"description": "The end timestamp of the time series window.",
},
{
"name": "model_version",
"dataType": ["text"],
"description": "The version of the model used to generate the embedding.",
},
{
"name": "source_pipeline_run",
"dataType": ["text"],
"description": "The Tekton PipelineRun that generated this vector.",
}
],
"vectorIndexConfig": {
"distance": "cosine",
# HNSW parameters are crucial for performance tuning.
# A common mistake is to leave these as default.
"efConstruction": 256, # Higher value leads to better graph quality but slower build time.
"maxConnections": 32, # Connections per node in the HNSW graph.
}
}
client = weaviate.Client(WEAVIATE_URL)
# create_class raises if the class already exists, so guard the call
# to keep this task re-runnable (idempotent).
existing_classes = client.schema.get().get("classes") or []
if not any(c["class"] == "TimeSeriesChunk" for c in existing_classes):
    client.schema.create_class(class_obj)
# Load data from the workspace
with open(SOURCE_FILE, 'rb') as f:
embeddings_data = pickle.load(f)
# Use batching for efficient loading
client.batch.configure(batch_size=200, dynamic=True)
with client.batch as batch:
for item in embeddings_data:
properties = {
"timestamp_start": item["timestamp_start"].isoformat() + "Z",
"timestamp_end": item["timestamp_end"].isoformat() + "Z",
"model_version": item["model_version"],
"source_pipeline_run": os.getenv("TEKTON_PIPELINE_RUN", "unknown")
}
batch.add_data_object(
properties,
"TimeSeriesChunk",
uuid=get_valid_uuid(uuid4()),
vector=item["vector"]
)
print("Data loading to Weaviate complete.")
In a production environment, configuring efConstruction and maxConnections is a trade-off. Higher values improve search accuracy at the cost of ingestion speed and memory usage. This must be tuned based on the specific requirements of the application. The source_pipeline_run property is the final link in our traceability chain.
Limitations and Future Iterations
This system represents a significant step up from manual processes, but it’s not without its own set of challenges. The current Tekton pipeline triggers on every commit to main, which is too naive for a production system. A more robust approach would involve Git tags or a promotion process managed through a tool like ArgoCD to control deployments to the production Weaviate instance.
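As a sketch, a CEL interceptor on the EventListener could restrict triggering to tag pushes; the binding, template, and service account names below are hypothetical.
# Sketch: tag-filtered EventListener (names are hypothetical).
apiVersion: triggers.tekton.dev/v1beta1
kind: EventListener
metadata:
  name: ts-embedding-listener
spec:
  serviceAccountName: tekton-triggers-sa
  triggers:
    - name: on-release-tag
      interceptors:
        - ref:
            name: "cel"
          params:
            - name: "filter"
              value: "body.ref.startsWith('refs/tags/')"
      bindings:
        - ref: ts-embedding-binding
      template:
        ref: ts-embedding-template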
Secondly, the error handling for data quality is primitive. A faulty data export could poison the vector space. A future iteration should include a validation Task in the Tekton pipeline that runs statistical checks on the generated embeddings (e.g., checking for NaN values, variance, and distribution shifts) before loading them into Weaviate. This task could use Phoenix to log these quality metrics, creating a complete picture of data health alongside performance traces.
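The core check of such a validation Task could be a few lines of numpy, assuming the pickle format produced by generate-embeddings:
# Sketch: proposed (not yet implemented) embedding quality gate.
import os
import pickle
import numpy as np

with open(os.getenv("OUTPUT_FILE_PATH", "/workspace/data/embeddings.pkl"), "rb") as f:
    vectors = np.array([item["vector"] for item in pickle.load(f)], dtype=np.float32)

# Fail the pipeline before bad vectors reach Weaviate.
if np.isnan(vectors).any():
    raise ValueError("embeddings contain NaN values")
if vectors.std(axis=0).min() < 1e-6:
    raise ValueError("degenerate (near-constant) embedding dimension detected")
print(f"Validated {len(vectors)} embeddings.")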
Finally, the rollback strategy is undefined. If a bad set of embeddings is loaded, how do we efficiently purge them? Weaviate’s filtering allows for deletion based on model_version or source_pipeline_run, but performing this on millions of objects can be a slow operation. A more sophisticated strategy might involve a blue-green approach where a new Weaviate class is populated and, once validated, traffic is switched over at the application layer. This adds complexity but significantly improves the resilience of the system.
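For the simpler purge path, weaviate-client v3 exposes a filtered batch delete; the run name below is hypothetical.
# Sketch: purging a bad batch by provenance (weaviate-client v3).
import weaviate

client = weaviate.Client("http://weaviate.weaviate.svc.cluster.local:8080")
result = client.batch.delete_objects(
    class_name="TimeSeriesChunk",
    where={
        "path": ["source_pipeline_run"],
        "operator": "Equal",
        "valueText": "ts-embedding-pipeline-run-abc123",
    },
)
print(result["results"])  # matched / successful / failed counts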