The trigger for this project wasn’t a grand architectural vision; it was a production incident. A seemingly innocuous `ALTER TABLE ... ADD COLUMN ... DEFAULT ...` on a core table locked it for three minutes during peak traffic. Standard migration tools, even the more sophisticated ones, treated the database as a simple, stateful endpoint at which SQL commands were thrown. Our reality, with stringent uptime requirements and a high-transaction workload, demanded a process that was not just automated but also verifiable and inherently safe. The core technical pain point was that our CI/CD pipelines, managed by Tekton, were excellent for stateless application deployments but dangerously naive when it came to stateful, high-risk database schema changes.
Our initial concept was to adapt the blue-green deployment strategy for database schemas. The theory is straightforward: instead of altering a live table (`users`), we create a new table (`users_new`) with the desired schema. We then use PostgreSQL’s logical replication to stream all changes from `users` to `users_new` in real time. Once the new table is caught up and fully in sync, we can run a battery of verification tests against it. If everything passes, we perform a near-instantaneous switch by renaming the tables within a single transaction. This approach moves the costly and risky parts of the migration—the DDL execution and data backfill—offline, away from the critical path of application requests.
The challenge was translating this concept into a robust, repeatable pipeline within our Kubernetes-native ecosystem. Shell scripts inside a `Task` container felt brittle. We needed a purpose-built tool that could manage the complex lifecycle of this process with strong error handling, clear configuration, and verifiable outputs. This led to the decision to build a small, self-contained CLI utility in Rust.
In a real-world project, technology selection is about trade-offs.
- Why Tekton? It’s our incumbent CI/CD engine. Its container-native design means any tool that can be containerized can become a step in a pipeline. This extensibility was key.
- Why PostgreSQL Logical Replication? It’s a native feature since version 10, providing a low-overhead way to capture DML changes without triggers. It forms the backbone of the entire data synchronization process.
- Why Rust? For a tool that orchestrates critical database operations, “good enough” is not good enough.
  - Reliability: Rust’s compile-time guarantees against null pointer dereferences, data races, and other common memory errors gave us confidence that the tool itself wouldn’t be a source of failure.
  - Performance: The validation steps, particularly the performance regression testing, needed to be fast to avoid unnecessarily long pipeline runs.
  - Static Binary: The ability to compile our tool into a single, dependency-free static binary is a massive operational advantage. Our Tekton `Task` container image becomes incredibly small and simple—just the binary and nothing else.
  - Ecosystem: Crates like `tokio-postgres` for non-blocking database interaction, `serde` for robust configuration parsing, and `clap` for building a clean command-line interface are production-ready and high-quality. (A matching dependency set is sketched just below.)
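For reference, a dependency manifest consistent with the code in this post might look like the following; the crate versions here are illustrative rather than the exact ones we pinned.

# Cargo.toml (illustrative versions)
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"
clap = { version = "4", features = ["derive"] }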
The result was a CLI tool named `pg-shifter`. Its job is to act as the “brain” inside our Tekton `Task`, executing discrete, verifiable stages of the migration process.
The Core Logic: `pg-shifter` in Rust
The `pg-shifter` tool needed several commands to manage the migration lifecycle: `initiate`, `validate`, `analyze`, and `switchover`. Let’s break down the implementation of the most critical parts.
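To give a sense of the CLI surface before diving into the command internals, here is a minimal sketch of how the subcommands could be wired up with `clap`’s derive API. The structure and dispatch are illustrative, not the exact production code.

// src/main.rs (sketch)
use clap::{Parser, Subcommand};

#[derive(Parser)]
#[command(name = "pg-shifter")]
struct Cli {
    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    /// Create the shadow table, publication, and replication slot
    Initiate,
    /// Poll replication status until lag converges below the threshold
    Validate,
    /// Compare query plans between the original and shadow tables
    Analyze,
    /// Atomically rename the tables inside a single transaction
    Switchover,
}

#[tokio::main]
async fn main() {
    // Each arm dispatches into the matching command module.
    match Cli::parse().command {
        Command::Initiate => { /* commands::initiate::run_initiate(..) */ }
        Command::Validate => { /* commands::validate::run_validation(..) */ }
        Command::Analyze => { /* commands::analyze::run_analysis(..) */ }
        Command::Switchover => { /* commands::switchover::run_switchover(..) */ }
    }
}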
First, establishing a reliable, asynchronous connection to PostgreSQL is foundational. We used `tokio-postgres`. A common mistake is to hardcode credentials; our configuration is driven by environment variables, which map directly to Kubernetes Secrets mounted into the Tekton Task pod.
// src/db.rs
use tokio_postgres::{Client, Config, NoTls, Error};
use std::env;
// A struct to hold our configuration, parsed from environment variables.
// Using a struct makes the configuration explicit and easier to manage.
pub struct DbConfig {
pub primary_dsn: String,
pub shadow_dsn: String, // Connection to the same DB, but maybe with a different user
}
impl DbConfig {
pub fn from_env() -> Result<Self, &'static str> {
let primary_dsn = env::var("PRIMARY_DSN").map_err(|_| "PRIMARY_DSN not set")?;
let shadow_dsn = env::var("SHADOW_DSN").map_err(|_| "SHADOW_DSN not set")?;
Ok(Self { primary_dsn, shadow_dsn })
}
}
pub async fn connect(dsn: &str) -> Result<Client, Error> {
    // Parse the full DSN (host, port, etc.) rather than treating the whole
    // string as a hostname; the standard PG* credential variables mounted
    // from the Kubernetes secret then override the parsed values.
    let mut config: Config = dsn.parse()?;
    if let Ok(user) = env::var("PGUSER") { config.user(&user); }
    if let Ok(password) = env::var("PGPASSWORD") { config.password(&password); }
    if let Ok(dbname) = env::var("PGDATABASE") { config.dbname(&dbname); }
    let (client, connection) = config.connect(NoTls).await?;
// The connection object must be spawned as a background task.
// If it's not spawned, the client will hang waiting for messages.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
Ok(client)
}
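The `initiate` command is mostly plumbing, so it isn’t reproduced in full here. Conceptually, it creates the shadow table and the publication (and, in the same fashion, the replication slot) that the rest of the pipeline depends on. The sketch below shows the core idea; the function name, signature, and SQL are illustrative, and a production version validates the identifiers before splicing them into DDL.

// src/commands/initiate.rs (sketch)
use tokio_postgres::Client;

pub async fn run_initiate(
    client: &Client,
    original_table: &str,
    new_table: &str,
    publication_name: &str,
) -> Result<(), String> {
    // 1. Create the shadow table with the same columns, defaults, indexes,
    //    and constraints; the schema change itself is then applied to it
    //    offline, away from live traffic.
    let create_table = format!(
        "CREATE TABLE {} (LIKE {} INCLUDING ALL)",
        new_table, original_table
    );
    // 2. Publish DML on the original table; the slot/subscription that
    //    consumes this publication is created the same way.
    let create_publication = format!(
        "CREATE PUBLICATION {} FOR TABLE {}",
        publication_name, original_table
    );
    for stmt in [create_table, create_publication] {
        client
            .batch_execute(&stmt)
            .await
            .map_err(|e| format!("initiate step failed ({}): {}", stmt, e))?;
    }
    Ok(())
}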
The `validate` command is where the real intelligence lies. Its primary job is to poll PostgreSQL’s replication statistics and fail the pipeline if replication lag exceeds a defined threshold or if the replication slot enters a failed state. This prevents a premature switchover with an out-of-sync database.
// src/commands/validate.rs
use tokio_postgres::Client;
use std::time::Duration;
use tokio::time::sleep;
const LAG_THRESHOLD_BYTES: i64 = 1024 * 1024; // 1 MB
const MAX_RETRIES: u32 = 10;
const RETRY_INTERVAL_SECONDS: u64 = 30;
pub async fn run_validation(client: &Client, slot_name: &str) -> Result<(), String> {
println!("Starting replication validation for slot '{}'...", slot_name);
for i in 0..MAX_RETRIES {
println!("Validation attempt {}/{}...", i + 1, MAX_RETRIES);
        // This query reads pg_replication_slots to compute the replication lag
        // in bytes; the ::bigint cast on pg_wal_lsn_diff's numeric result lets
        // us read it directly as an i64.
        let rows = client
            .query(
                "SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::bigint AS replication_lag_bytes
FROM pg_replication_slots
WHERE slot_name = $1 AND active = 't'",
&[&slot_name],
)
.await
.map_err(|e| format!("Failed to query replication slot: {}", e))?;
if rows.is_empty() {
return Err(format!("Replication slot '{}' is not active or does not exist.", slot_name));
}
let lag_bytes: i64 = rows[0].get("replication_lag_bytes");
println!("Current replication lag: {} bytes", lag_bytes);
if lag_bytes <= LAG_THRESHOLD_BYTES {
println!("Replication lag is within the threshold ({} bytes). Validation successful.", LAG_THRESHOLD_BYTES);
return Ok(());
}
println!("Replication lag {} exceeds threshold {}. Waiting...", lag_bytes, LAG_THRESHOLD_BYTES);
sleep(Duration::from_secs(RETRY_INTERVAL_SECONDS)).await;
}
Err(format!(
"Replication lag failed to converge after {} attempts. Aborting.",
MAX_RETRIES
))
}
The pitfall here is not accounting for an inactive slot. The query explicitly checks `active = 't'`. An inactive slot means the subscriber isn’t connected, which is a critical failure state for this process. The polling loop with a timeout is essential; without it, a stuck replication could cause the pipeline to hang indefinitely.
The `analyze` command performs performance regression testing. This was a crucial requirement to prevent migrations that introduced silent performance degradations. The tool executes a set of pre-defined, critical queries (read from a config file) against both the original and the new table structures using `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)`. It then parses the JSON output to compare execution times and buffer usage.
// src/commands/analyze.rs
use tokio_postgres::Client;
use serde::Deserialize;
use serde_json::Value;
#[derive(Deserialize)]
struct PerfTestCase {
name: String,
query: String,
}
// A simplified representation of the EXPLAIN ANALYZE output.
struct QueryPlanMetrics {
execution_time_ms: f64,
total_buffers_shared_hit: i64,
}
const PERF_DEGRADATION_THRESHOLD: f64 = 1.2; // Allow for 20% degradation
async fn get_plan_metrics(client: &Client, query: &str) -> Result<QueryPlanMetrics, String> {
let statement = format!("EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {}", query);
let rows = client.query(&statement, &[])
.await
.map_err(|e| format!("EXPLAIN ANALYZE failed: {}", e))?;
    // Deserializing the json column directly into serde_json::Value requires
    // tokio-postgres's `with-serde_json-1` feature.
    let plan_json: Value = rows[0].get(0);
// The JSON output from EXPLAIN is nested. We need to carefully extract the values.
// In a production tool, this parsing would be more robust.
    let plan = &plan_json[0]["Plan"];
    // "Execution Time" (in ms) sits at the top level of the EXPLAIN output,
    // next to the root "Plan" node; buffer counts sit on the plan node itself.
    let execution_time_ms = plan_json[0]["Execution Time"].as_f64().unwrap_or(0.0);
    let total_buffers_shared_hit = plan["Shared Hit Blocks"].as_i64().unwrap_or(0) * 8192; // 8 kB default block size
Ok(QueryPlanMetrics {
execution_time_ms,
total_buffers_shared_hit,
})
}
pub async fn run_analysis(
primary_client: &Client,
shadow_client: &Client,
original_table: &str,
new_table: &str,
test_queries_path: &str,
) -> Result<(), String> {
let queries_file = std::fs::read_to_string(test_queries_path)
.map_err(|e| format!("Failed to read test queries file: {}", e))?;
let test_cases: Vec<PerfTestCase> = serde_yaml::from_str(&queries_file)
.map_err(|e| format!("Failed to parse test queries YAML: {}", e))?;
for case in test_cases {
println!("Analyzing performance for test case: {}", case.name);
let original_query = case.query.replace("{TABLE}", original_table);
let new_query = case.query.replace("{TABLE}", new_table);
let original_metrics = get_plan_metrics(primary_client, &original_query).await?;
let new_metrics = get_plan_metrics(shadow_client, &new_query).await?;
println!(
" Original -> Execution Time: {:.2} ms, Shared Buffers: {} bytes",
original_metrics.execution_time_ms, original_metrics.total_buffers_shared_hit
);
println!(
" New -> Execution Time: {:.2} ms, Shared Buffers: {} bytes",
new_metrics.execution_time_ms, new_metrics.total_buffers_shared_hit
);
if new_metrics.execution_time_ms > original_metrics.execution_time_ms * PERF_DEGRADATION_THRESHOLD {
return Err(format!(
"Performance regression detected in '{}': execution time increased by more than {}%",
case.name, (PERF_DEGRADATION_THRESHOLD - 1.0) * 100.0
));
}
}
println!("All performance analysis tests passed.");
Ok(())
}
The `{TABLE}` placeholder is a simple convention allowing us to reuse the same query templates for both the original and shadow tables. This step provides an objective, data-driven guardrail against bad index changes or suboptimal query plans introduced by the schema modification.
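For illustration, a test-queries file matching the `Vec<PerfTestCase>` shape above could look like this (the file name and queries are made up for the example):

# perf-tests.yaml (illustrative)
- name: lookup_by_email
  query: "SELECT * FROM {TABLE} WHERE email = 'user@example.com'"
- name: recent_rows
  query: "SELECT id, created_at FROM {TABLE} ORDER BY created_at DESC LIMIT 50"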
Integrating with Tekton
With the `pg-shifter` binary ready, the next step was to containerize it and define a Tekton `Task`.
Dockerfile for a minimal Rust image:
# Stage 1: Build the Rust binary
FROM rust:1.73-slim AS builder
WORKDIR /usr/src/pg-shifter
# The musl target (and its linker) must be added on top of the slim image
# to produce a fully static binary
RUN apt-get update && apt-get install -y --no-install-recommends musl-tools \
    && rustup target add x86_64-unknown-linux-musl
COPY . .
# Use a release build for production, statically linked for portability
RUN cargo install --path . --root /usr/local/cargo --target x86_64-unknown-linux-musl
# Stage 2: Create the final, minimal image
FROM alpine:latest
# We only need the compiled binary and ca-certificates for TLS (if needed)
COPY --from=builder /usr/local/cargo/bin/pg-shifter /usr/local/bin/pg-shifter
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
ENTRYPOINT ["/usr/local/bin/pg-shifter"]
This multi-stage build results in a tiny container image (around 15MB), which is ideal for fast pod startup times in Tekton.
Tekton `ClusterTask` Definition:
We define a `ClusterTask` to make it reusable across different pipelines and namespaces. It takes parameters for the action to perform, database details, and table names.
apiVersion: tekton.dev/v1beta1
kind: ClusterTask
metadata:
name: pg-shifter-task
spec:
description: >-
A task to perform safe, zero-downtime schema migrations for PostgreSQL
using a blue-green strategy.
params:
- name: PGSHIFTER_IMAGE
description: The container image for the pg-shifter tool.
type: string
default: "my-registry/pg-shifter:latest"
- name: COMMAND
description: The pg-shifter command to execute (e.g., initiate, validate, analyze, switchover).
type: string
- name: ORIGINAL_TABLE
description: The name of the live table.
type: string
- name: NEW_TABLE
description: The name of the new shadow table.
type: string
- name: PUBLICATION_NAME
description: The name of the logical replication publication.
type: string
- name: SLOT_NAME
description: The name of the logical replication slot.
type: string
- name: DB_SECRET_NAME
description: The name of the Kubernetes secret containing DB credentials.
type: string
steps:
- name: run-command
image: $(params.PGSHIFTER_IMAGE)
command: ["pg-shifter"]
args: ["$(params.COMMAND)"]
env:
# Map secrets to standard PostgreSQL environment variables
# pg-shifter will pick these up automatically.
- name: PGHOST
valueFrom:
secretKeyRef:
name: $(params.DB_SECRET_NAME)
key: host
- name: PGPORT
valueFrom:
secretKeyRef:
name: $(params.DB_SECRET_NAME)
key: port
- name: PGDATABASE
valueFrom:
secretKeyRef:
name: $(params.DB_SECRET_NAME)
key: database
- name: PGUSER
valueFrom:
secretKeyRef:
name: $(params.DB_SECRET_NAME)
key: user
- name: PGPASSWORD
valueFrom:
secretKeyRef:
name: $(params.DB_SECRET_NAME)
key: password
# Pass task params to the tool via env vars as well
- name: ORIGINAL_TABLE
value: $(params.ORIGINAL_TABLE)
- name: NEW_TABLE
value: $(params.NEW_TABLE)
- name: PUBLICATION_NAME
value: $(params.PUBLICATION_NAME)
- name: SLOT_NAME
value: $(params.SLOT_NAME)
The Full Tekton `Pipeline`:
Finally, we orchestrate these tasks into a cohesive `Pipeline`. This is where the entire migration strategy is laid out declaratively.
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
name: postgres-safe-migration-pipeline
spec:
params:
- name: git-repo-url
type: string
- name: git-revision
type: string
- name: original-table-name
type: string
- name: db-secret-name
type: string
workspaces:
- name: shared-data
tasks:
- name: fetch-migration-scripts
taskRef:
name: git-clone
workspaces:
- name: output
workspace: shared-data
params:
- name: url
value: $(params.git-repo-url)
- name: revision
value: $(params.git-revision)
- name: 1-initiate-migration
runAfter: [fetch-migration-scripts]
taskRef:
name: pg-shifter-task
params:
- name: COMMAND
value: "initiate"
# Parameters for table names are derived from conventions
# or pipeline parameters.
# ...
- name: DB_SECRET_NAME
value: $(params.db-secret-name)
# This task would apply the SQL to create the new table,
# the publication, and the replication slot.
- name: 2-validate-replication
runAfter: [1-initiate-migration]
taskRef:
name: pg-shifter-task
params:
- name: COMMAND
value: "validate"
- name: DB_SECRET_NAME
value: $(params.db-secret-name)
# ... other params
- name: 3-analyze-performance
runAfter: [2-validate-replication]
taskRef:
name: pg-shifter-task
params:
- name: COMMAND
value: "analyze"
- name: DB_SECRET_NAME
value: $(params.db-secret-name)
# ... other params
# This is a critical safety gate before the final, irreversible step.
# It requires a human to manually approve the pipeline continuation.
- name: 4-manual-approval
runAfter: [3-analyze-performance]
taskRef:
name: manual-approval
timeout: 1h
- name: 5-execute-switchover
runAfter: [4-manual-approval]
taskRef:
name: pg-shifter-task
params:
- name: COMMAND
value: "switchover"
- name: DB_SECRET_NAME
value: $(params.db-secret-name)
# ... other params
Here’s the visual flow of the pipeline:
graph TD
    A[fetch-migration-scripts] --> B(1-initiate-migration);
    B --> C(2-validate-replication);
    C --> D(3-analyze-performance);
    D --> E{4-manual-approval};
    E -- Approved --> F(5-execute-switchover);
    E -- Rejected --> G(Cleanup / Fail);
    F --> H(Final Cleanup);
This pipeline provides a fully auditable, repeatable process. Every run is logged, and the failure of any step—be it a syntax error, a replication lag timeout, or a performance regression—halts the entire process before the production state is altered. The manual approval gate is a pragmatic concession that for all our automation, a human expert should give the final sign-off for a high-stakes change.
The final `switchover` step is the moment of highest risk. `pg-shifter` handles this by executing the rename operations within a single transaction, acquiring an `ACCESS EXCLUSIVE` lock on both tables for the briefest possible moment.
BEGIN;
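-- Optional but recommended: bound how long we wait for the exclusive lock.
-- Without a lock_timeout, this request can queue behind a long-running
-- transaction and block all new queries on the table in the meantime.
SET LOCAL lock_timeout = '2s';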
-- This lock prevents any reads or writes to both tables,
-- ensuring consistency during the switch.
LOCK TABLE public.users, public.users_new IN ACCESS EXCLUSIVE MODE;
-- The switch itself.
ALTER TABLE public.users RENAME TO users_old;
ALTER TABLE public.users_new RENAME TO users;
-- Rename indexes, sequences, etc.
ALTER INDEX users_new_pkey RENAME TO users_pkey;
-- ...
COMMIT;
The duration of this lock is typically milliseconds, a massive improvement over the minutes of locking we experienced with `ALTER TABLE`.
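Inside `pg-shifter`, that script can be driven through `tokio-postgres`’s transaction API. The following is a sketch using the same hardcoded table names as the script above; the real command takes them from its parameters.

// src/commands/switchover.rs (sketch)
use tokio_postgres::Client;

pub async fn run_switchover(client: &mut Client) -> Result<(), String> {
    // Everything runs in one transaction: either every rename lands
    // atomically, or the transaction rolls back and nothing changes.
    let tx = client
        .transaction()
        .await
        .map_err(|e| format!("Failed to open transaction: {}", e))?;
    tx.batch_execute(
        "SET LOCAL lock_timeout = '2s';
         LOCK TABLE public.users, public.users_new IN ACCESS EXCLUSIVE MODE;
         ALTER TABLE public.users RENAME TO users_old;
         ALTER TABLE public.users_new RENAME TO users;
         ALTER INDEX users_new_pkey RENAME TO users_pkey;",
    )
    .await
    .map_err(|e| format!("Switchover failed and was rolled back: {}", e))?;
    tx.commit()
        .await
        .map_err(|e| format!("Commit failed: {}", e))?;
    Ok(())
}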
This system is not a silver bullet. The `switchover` still requires a very brief, exclusive lock, which might be unacceptable for systems that cannot tolerate even a few milliseconds of write interruption. The performance analysis is only as good as the test queries provided; it cannot predict the impact of novel query patterns that appear after the migration. Furthermore, this workflow is optimized for DDL changes to a single table and its associated indexes. More complex migrations involving stored procedures, triggers, or foreign key constraints across multiple tables still require a more bespoke, manual process. The current rollback strategy also relies on reversing the switchover manually, as automating stateful rollbacks in a CI/CD pipeline is an order of magnitude more complex than the forward migration itself.