The mandate was clear: provide near real-time analytical access to our core operational data without compromising the performance of the primary OLTP database. This database, a battle-hardened PostgreSQL instance, serves a monolithic Java application that relies heavily on MyBatis for its data access layer. Any attempt to run complex, long-running analytical queries directly against it resulted in lock contention, query timeouts, and palpable operational risk. The initial thought of using a read replica was dismissed; it solved the read isolation problem but failed to address the need for data in an analytics-friendly columnar format like Parquet, and the costs of maintaining a powerful, idle replica were unjustifiable.
The chosen path was architectural decoupling. We decided to stream changes from the production database into a Delta Lake table. This would provide a transactionally consistent, versioned, and columnar representation of our data, perfect for analytics. A new, lightweight service built with FastAPI would then expose this data to downstream consumers. This approach isolates workloads, leverages modern data formats, and opens the door for future data science initiatives. The real challenge, however, lay in building the bridge—a custom service capable of capturing database transactions faithfully and replaying them onto the Delta Lake table without data loss or corruption.
This is the log of that build. We deliberately avoided a heavyweight stack involving Kafka, Debezium, and Spark Streaming. While powerful, that stack introduced operational complexity and infrastructure costs we weren’t ready to incur for this specific use case. Instead, we opted for a leaner, more direct approach using PostgreSQL’s native logical replication and a custom Python consumer.
Phase 1: Preparing the Source Database for Logical Replication
Before any data can be streamed, the source PostgreSQL instance must be configured to emit changes through its Write-Ahead Log (WAL) in a logical format. This is a non-default setting and a critical first step. In a real-world project, this requires DBA involvement and a scheduled maintenance window.
The core setting is wal_level; it must be set to logical.
-- This configuration change requires a server restart.
-- Connect to the PostgreSQL instance as a superuser.
ALTER SYSTEM SET wal_level = 'logical';
-- Additionally, ensure you have enough WAL senders to handle replication clients.
-- The default is often 10. Increase if you plan on having multiple replication clients.
ALTER SYSTEM SET max_wal_senders = '12';
-- A restart of the PostgreSQL service is required for these to take effect.
-- e.g., using pg_ctl: pg_ctl restart -D /path/to/your/data_directory
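After the restart, it is worth confirming the settings actually took effect before building anything on top of them. Here is a minimal check using the same psycopg2 dependency the consumer relies on later; the file name and connection string are placeholders for your environment.
# scripts/check_wal_settings.py (hypothetical helper, not part of the pipeline)
import psycopg2

# Placeholder connection string; point it at your primary.
CONN_INFO = "dbname=postgres user=postgres password=postgres host=localhost port=5432"

def main():
    with psycopg2.connect(CONN_INFO) as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT name, setting FROM pg_settings "
            "WHERE name IN ('wal_level', 'max_wal_senders', 'max_replication_slots')"
        )
        for name, setting in cur.fetchall():
            print(f"{name} = {setting}")

if __name__ == "__main__":
    main()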
With the server configured, we can create the specific table that our legacy MyBatis application interacts with. For this demonstration, we'll use a simple products table.
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price NUMERIC(10, 2) NOT NULL,
stock_quantity INTEGER NOT NULL,
last_updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- For logical replication, the table must have a REPLICA IDENTITY.
-- 'FULL' logs the full previous row content on UPDATE and DELETE, which is
-- essential for our MERGE logic later. 'DEFAULT' (primary key) is not sufficient.
ALTER TABLE products REPLICA IDENTITY FULL;
The final piece of setup on the database side is creating a logical replication slot. This is a crucial concept. The slot ensures that the PostgreSQL server retains the WAL files needed by our consumer, even if the consumer disconnects. Without a slot, the server would prune old WAL files, and our consumer could lose changes permanently.
-- Create a publication that declares which tables we intend to replicate.
-- (Publications are what pgoutput-based subscribers consume; our text-parsing
-- consumer below does not strictly need it, but declaring the table set up front
-- keeps a later move to pgoutput painless.)
CREATE PUBLICATION cdc_publication FOR TABLE products;
-- Create the logical replication slot. 'pgoutput' is the plugin used by
-- PostgreSQL's built-in subscriptions, but it speaks a binary protocol.
-- Our Python consumer parses human-readable text, so we use 'test_decoding',
-- which ships with PostgreSQL.
-- 'products_cdc_slot' is the name our client will use to connect.
SELECT * FROM pg_create_logical_replication_slot('products_cdc_slot', 'test_decoding');
A common mistake is to forget REPLICA IDENTITY FULL. Without it, UPDATE and DELETE events in the WAL stream carry at most the old primary-key values instead of the full previous row, and the parser and soft-delete logic below would not have the data they expect to identify and modify the corresponding records in the target Delta table.
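The slot also needs watching once it exists: because it pins WAL on the primary, a stalled or dead consumer will quietly grow disk usage until someone notices. The following is a small monitoring sketch, again with psycopg2; the connection string is a placeholder, and the lag columns used are available on PostgreSQL 10 and later.
# scripts/monitor_slot.py (illustrative; wire the output into your own alerting)
import psycopg2

CONN_INFO = "dbname=postgres user=postgres password=postgres host=localhost port=5432"
SLOT_NAME = "products_cdc_slot"

def main():
    with psycopg2.connect(CONN_INFO) as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT slot_name,
                   active,
                   pg_size_pretty(
                       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
                   ) AS retained_wal
            FROM pg_replication_slots
            WHERE slot_name = %s
            """,
            (SLOT_NAME,),
        )
        row = cur.fetchone()
        if row is None:
            raise SystemExit(f"Replication slot '{SLOT_NAME}' does not exist")
        slot_name, active, retained_wal = row
        print(f"{slot_name}: active={active}, retained WAL={retained_wal}")

if __name__ == "__main__":
    main()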
Phase 2: The Python CDC Consumer Service
This service is the heart of our custom pipeline. It connects to the replication slot, consumes the stream of changes, buffers them transactionally, and writes them to Delta Lake. We'll use the psycopg2 library, which has built-in support for the streaming replication protocol.
The core logic needs to be robust: it must handle connection drops, parse different message types, and, most importantly, respect transaction boundaries (BEGIN and COMMIT messages).
# cdc_consumer/consumer.py
import logging
import os
import signal
import sys
import time
from typing import Generator
import psycopg2
from psycopg2.extras import LogicalReplicationConnection, ReplicationMessage
from .delta_writer import DeltaWriter
from .transaction_buffer import TransactionBuffer
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
# --- Configuration ---
# In a real app, use Pydantic BaseSettings or similar
DB_CONN_INFO = os.environ.get("DB_CONN_INFO", "dbname=postgres user=postgres password=postgres host=postgres port=5432")
REPLICATION_SLOT_NAME = os.environ.get("REPLICATION_SLOT_NAME", "products_cdc_slot")
PUBLICATION_NAME = os.environ.get("PUBLICATION_NAME", "cdc_publication")
DELTA_TABLE_PATH = os.environ.get("DELTA_TABLE_PATH", "/app/delta/products")
MESSAGES_PER_ITERATION = 100
class PostgresCdcConsumer:
"""
Connects to a PostgreSQL logical replication slot and consumes changes,
passing them to a writer in transaction batches.
"""
def __init__(self, conn_info: str, slot_name: str, publication_name: str):
self.conn_info = conn_info
self.slot_name = slot_name
self.publication_name = publication_name
self.conn = None
self.cur = None
self.running = True
self.last_lsn = None
def _connect(self):
"""Establishes a logical replication connection."""
logging.info("Attempting to connect to PostgreSQL for replication...")
try:
self.conn = psycopg2.connect(
self.conn_info,
connection_factory=LogicalReplicationConnection
)
self.cur = self.conn.cursor()
logging.info("Successfully connected.")
except psycopg2.OperationalError as e:
logging.error(f"Failed to connect to database: {e}")
raise
def _start_replication(self):
"""Starts the replication stream."""
logging.info(f"Starting replication on slot '{self.slot_name}' for publication '{self.publication_name}'")
try:
            # test_decoding options: include transaction IDs and commit timestamps
            # in the text output. (A pgoutput-based consumer would instead pass
            # 'proto_version' and 'publication_names', but would then have to
            # decode pgoutput's binary messages rather than parse text.)
            options = {'include-xids': 'on', 'include-timestamp': 'on'}
self.cur.start_replication(
slot_name=self.slot_name,
options=options,
decode=True
)
logging.info("Replication started.")
except psycopg2.ProgrammingError as e:
logging.error(f"Could not start replication: {e}. Check if slot and publication exist.")
raise
    def consume_stream(self) -> Generator[ReplicationMessage, None, None]:
"""
A generator that yields raw messages from the replication stream.
Manages connection and keepalives.
"""
self._connect()
self._start_replication()
while self.running:
try:
                # read_message() is non-blocking and returns None when nothing is
                # pending; the periodic send_feedback() in the caller keeps an idle
                # connection from being timed out by the server.
msg = self.cur.read_message()
if msg:
yield msg
else:
time.sleep(0.1) # Avoid busy-waiting when no messages
except psycopg2.OperationalError as e:
logging.warning(f"Connection issue, attempting to reconnect: {e}")
self.close()
time.sleep(5)
self._connect()
self._start_replication()
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
self.running = False
def close(self):
if self.cur:
self.cur.close()
if self.conn:
self.conn.close()
logging.info("Connection closed.")
def main():
consumer = PostgresCdcConsumer(DB_CONN_INFO, REPLICATION_SLOT_NAME, PUBLICATION_NAME)
delta_writer = DeltaWriter(DELTA_TABLE_PATH)
buffer = TransactionBuffer()
# Graceful shutdown handler
def shutdown_handler(signum, frame):
logging.info("Shutdown signal received, stopping consumer...")
consumer.running = False
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
logging.info("Starting CDC consumer service.")
try:
for msg in consumer.consume_stream():
# The WAL message payload contains critical metadata
# like the LSN (Log Sequence Number), which is the cursor/offset.
# A production system MUST persist `msg.data_start` after each successful
# write to Delta Lake to ensure at-least-once processing on restart.
# For simplicity, we are not implementing LSN checkpointing here.
buffer.add_message(msg.payload)
# When a transaction is complete, process it
if buffer.is_transaction_complete():
txn_data = buffer.get_completed_transaction()
if txn_data:
logging.info(f"Processing transaction with {len(txn_data.changes)} changes.")
try:
delta_writer.apply_transaction(txn_data)
logging.info("Successfully applied transaction to Delta Lake.")
except Exception as e:
# A critical pitfall: How to handle write failures?
# A robust solution needs a dead-letter queue (DLQ) and alerting.
# For now, we log the error and stop, which is not production-ready.
logging.error(f"Fatal error applying transaction to Delta Lake: {e}")
consumer.running = False # Stop processing
buffer.clear_completed_transaction()
            # Acknowledge how far we have received; this also acts as a keepalive.
            # (Only flush_lsn -- sent once the corresponding Delta write is durable
            # and checkpointed -- lets the server discard the retained WAL.)
if msg.data_start > (consumer.last_lsn or 0):
consumer.cur.send_feedback(write_lsn=msg.data_start)
consumer.last_lsn = msg.data_start
finally:
consumer.close()
logging.info("CDC consumer service stopped.")
if __name__ == "__main__":
main()
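The comments in main() flag the biggest gap: LSN checkpointing. As a sketch of what closing it could look like, the snippet below persists the last fully processed LSN to a local file and shows where it would hook into the loop. It is illustrative only: the module, path, and wiring are assumptions, and a production deployment would use more durable storage and only acknowledge flush_lsn for positions whose Delta writes have committed.
# cdc_consumer/lsn_checkpoint.py (hypothetical module, not wired into the code above)
import os

class LsnCheckpoint:
    """Persists the last fully processed LSN so a restart can resume safely."""

    def __init__(self, path: str):
        self.path = path

    def load(self) -> int:
        """Returns the last stored LSN, or 0 if no checkpoint exists yet."""
        if not os.path.exists(self.path):
            return 0
        with open(self.path, "r") as f:
            return int(f.read().strip() or "0")

    def store(self, lsn: int) -> None:
        """Atomically replaces the checkpoint file with the new LSN."""
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w") as f:
            f.write(str(lsn))
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, self.path)

# Sketch of how it would slot into the consumer:
#   checkpoint = LsnCheckpoint("/app/state/products_cdc.lsn")
#   cur.start_replication(slot_name=..., options=..., decode=True,
#                         start_lsn=checkpoint.load())
#   ...after delta_writer.apply_transaction(txn_data) succeeds...
#   checkpoint.store(msg.data_start)
#   cur.send_feedback(flush_lsn=msg.data_start)  # lets the server recycle WAL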
The logic of parsing the WAL stream and buffering it is encapsulated in a TransactionBuffer. This is where we translate the raw message stream into structured change events, respecting transaction boundaries.
# cdc_consumer/transaction_buffer.py
import logging
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
@dataclass
class ChangeEvent:
"""Represents a single DML operation (INSERT, UPDATE, DELETE)."""
table: str
schema: str
op_type: str # 'INSERT', 'UPDATE', 'DELETE'
# For UPDATE/DELETE, this contains the old row values
old_tuple: Optional[Dict[str, Any]] = None
# For INSERT/UPDATE, this contains the new row values
new_tuple: Optional[Dict[str, Any]] = None
@dataclass
class Transaction:
"""A collection of change events that occurred within a single DB transaction."""
xid: int # Transaction ID
changes: List[ChangeEvent] = field(default_factory=list)
class TransactionBuffer:
"""
Parses raw logical replication messages and buffers them into transactions.
"""
def __init__(self):
self._current_transaction: Optional[Transaction] = None
self._is_complete = False
def is_transaction_complete(self) -> bool:
return self._is_complete
def get_completed_transaction(self) -> Optional[Transaction]:
return self._current_transaction
def clear_completed_transaction(self):
self._current_transaction = None
self._is_complete = False
    def add_message(self, payload: str):
        # start_replication(decode=True) hands us str payloads already; be tolerant
        # of raw bytes in case decoding is ever turned off upstream.
        payload_str = payload.decode("utf-8") if isinstance(payload, bytes) else payload
        if payload_str.startswith("BEGIN"):
            xid_match = re.search(r"BEGIN (\d+)", payload_str)
            xid = int(xid_match.group(1)) if xid_match else -1
            self._current_transaction = Transaction(xid=xid)
            self._is_complete = False
elif payload_str.startswith("COMMIT"):
if self._current_transaction:
self._is_complete = True
elif self._current_transaction:
# This is a very basic parser. A production version would need to be
# more robust, handling various data types, quoting, etc.
change = self._parse_change_message(payload_str)
if change:
self._current_transaction.changes.append(change)
@staticmethod
def _parse_change_message(payload: str) -> Optional[ChangeEvent]:
# Example payload for an INSERT:
# table public.products: INSERT: id[integer]:1 name[character varying]:'Laptop' price[numeric]:1200.50 stock_quantity[integer]:50 last_updated_at[timestamp with time zone]:'2023-10-27 10:00:00+00'
# Example for an UPDATE:
# table public.products: UPDATE: old-key: id[integer]:1 ... new-tuple: id[integer]:1 ... stock_quantity[integer]:49 ...
match = re.match(r"table (\w+)\.(\w+): (\w+): (.*)", payload)
if not match:
return None
schema, table, op_type, data = match.groups()
old_tuple, new_tuple = None, None
if op_type == "INSERT":
new_tuple = TransactionBuffer._parse_tuple_data(data)
elif op_type == "DELETE":
old_tuple = TransactionBuffer._parse_tuple_data(data)
elif op_type == "UPDATE":
# The format for UPDATE is more complex
if 'old-key:' in data and 'new-tuple:' in data:
_, old_data, new_data = re.split(r'old-key:|new-tuple:', data)
old_tuple = TransactionBuffer._parse_tuple_data(old_data.strip())
new_tuple = TransactionBuffer._parse_tuple_data(new_data.strip())
            else:
                # Without an 'old-key:' section (e.g. REPLICA IDENTITY DEFAULT and an
                # unchanged key), the payload is simply the new tuple.
                new_tuple = TransactionBuffer._parse_tuple_data(data)
return ChangeEvent(
schema=schema,
table=table,
op_type=op_type,
old_tuple=old_tuple,
new_tuple=new_tuple
)
@staticmethod
def _parse_tuple_data(data: str) -> Dict[str, Any]:
"""A simplified parser for the column data string."""
parsed = {}
# Regex to find 'col_name[type]:value' patterns. Handles quoted strings.
# This is a simplification and has limitations with complex types.
pattern = re.compile(r"(\w+)\[.*?\]:('([^']*)'|[^ ]+)")
for match in pattern.finditer(data):
key, value, quoted_value = match.groups()
# If the value was quoted, use the content inside the quotes.
# Otherwise, use the unquoted value.
final_value = quoted_value if quoted_value is not None else value
# A real implementation needs proper type casting based on the `[type]` part.
# For now, we treat everything as a string and let Delta Lake/Spark infer schema.
parsed[key] = final_value
return parsed
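Before wiring the buffer into the live stream, it is easy to exercise it with hand-written payloads in the same text format test_decoding emits. The snippet below is a throwaway check rather than a test suite, and it assumes the project root is on PYTHONPATH so the cdc_consumer package is importable.
# scratch/exercise_buffer.py (ad-hoc check of the parser)
from cdc_consumer.transaction_buffer import TransactionBuffer

buffer = TransactionBuffer()

# Payloads mimicking test_decoding's output for a single transaction.
buffer.add_message("BEGIN 1234")
buffer.add_message(
    "table public.products: INSERT: id[integer]:1 "
    "name[character varying]:'Laptop' price[numeric]:1200.50 "
    "stock_quantity[integer]:50"
)
buffer.add_message(
    "table public.products: UPDATE: old-key: id[integer]:1 "
    "new-tuple: id[integer]:1 name[character varying]:'Laptop' "
    "price[numeric]:1150.00 stock_quantity[integer]:49"
)
buffer.add_message("COMMIT 1234")

assert buffer.is_transaction_complete()
txn = buffer.get_completed_transaction()
for change in txn.changes:
    print(change.op_type, change.new_tuple or change.old_tuple)
buffer.clear_completed_transaction()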
Phase 3: Transactional Writes to Delta Lake
Once we have a complete transaction buffered, we must apply it to the Delta table. This is the most critical step for maintaining consistency. A naive append operation would fail to handle UPDATEs and DELETEs. The correct tool for this is Delta Lake's MERGE operation, which allows us to perform "UPSERT" logic atomically.
We'll use the deltalake Python library for this.
# cdc_consumer/delta_writer.py
import logging
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import TableNotFoundError
import pyarrow as pa
from .transaction_buffer import Transaction, ChangeEvent
class DeltaWriter:
"""
Handles writing transactional data to a Delta Lake table.
"""
def __init__(self, table_path: str):
self.table_path = table_path
self._ensure_table_exists()
def _ensure_table_exists(self):
"""Creates the Delta table if it doesn't exist."""
try:
DeltaTable(self.table_path)
logging.info(f"Delta table found at {self.table_path}")
except TableNotFoundError:
logging.warning(f"Delta table not found at {self.table_path}. Creating a new one.")
# Define an initial schema. A production system would infer this
# from the database or have it predefined.
schema = pa.schema([
pa.field("id", pa.int32(), nullable=False),
pa.field("name", pa.string()),
pa.field("price", pa.decimal128(10, 2)),
pa.field("stock_quantity", pa.int32()),
pa.field("last_updated_at", pa.timestamp('us', tz='UTC')),
# We add a metadata column to handle deletes
pa.field("__is_deleted", pa.bool_(), nullable=False)
])
            # Initialize the Delta log with an empty table that carries this schema.
            empty_table = schema.empty_table()
            write_deltalake(self.table_path, empty_table)
logging.info("New Delta table created.")
def _convert_changes_to_arrow(self, changes: list[ChangeEvent]) -> pa.Table:
"""Converts a list of change events into a PyArrow Table for the MERGE operation."""
        # Build an Arrow table holding the *final* state of each changed row.
        # Keyed by id so that multiple changes to one row within a transaction
        # collapse to the last one (duplicate source keys would break the MERGE).
        rows_by_id: dict = {}
for change in changes:
# We care about the final state of the row in the transaction
row_data = change.new_tuple or change.old_tuple
if not row_data:
continue
# A common pitfall is type mismatch. We need to cast our string values
# from the parser to the correct types for the Arrow schema.
processed_row = {
'id': int(row_data['id']),
'name': str(row_data.get('name')),
'price': float(row_data.get('price', 0.0)),
'stock_quantity': int(row_data.get('stock_quantity', 0)),
'last_updated_at': str(row_data.get('last_updated_at')),
# This flag is key for our MERGE logic
'__is_deleted': change.op_type == 'DELETE'
}
            rows_by_id[processed_row['id']] = processed_row
        data = list(rows_by_id.values())
if not data:
return None
# Convert dictionary to PyArrow Table
# A more robust solution would use the existing Delta table's schema
# to ensure type consistency.
return pa.Table.from_pylist(data)
def apply_transaction(self, transaction: Transaction):
"""
Applies a database transaction to the Delta table using a single MERGE operation.
"""
if not transaction.changes:
logging.info(f"Transaction {transaction.xid} had no relevant changes. Skipping.")
return
source_table = self._convert_changes_to_arrow(transaction.changes)
if source_table is None:
logging.info(f"Transaction {transaction.xid} resulted in no data to merge. Skipping.")
return
dt = DeltaTable(self.table_path)
# The MERGE operation is the atomic unit that ensures consistency.
# 'source' is an alias for our incoming Arrow table of changes.
# 'target' is an alias for the existing Delta table.
# The predicate `target.id = source.id` links rows from the change data to existing rows.
(dt.merge(
source=source_table,
predicate='target.id = source.id',
source_alias='source',
target_alias='target')
        .when_matched_update(
            # A matched row whose incoming change is a DELETE is soft-deleted.
            predicate="source.__is_deleted = true",
            updates={"__is_deleted": "true"}
        )
        .when_matched_update(
            # A matched row whose incoming change is an INSERT/UPDATE gets all
            # columns refreshed from the source.
            predicate="source.__is_deleted = false",
            updates={
                "name": "source.name",
                "price": "source.price",
                "stock_quantity": "source.stock_quantity",
                "last_updated_at": "source.last_updated_at",
                "__is_deleted": "source.__is_deleted"
            }
        )
        .when_not_matched_insert(
            # A new id arrives and is not a delete: insert it. Rows created and
            # deleted within the same transaction are skipped entirely.
            predicate="source.__is_deleted = false",
            updates={
                "id": "source.id",
                "name": "source.name",
                "price": "source.price",
                "stock_quantity": "source.stock_quantity",
                "last_updated_at": "source.last_updated_at",
                "__is_deleted": "source.__is_deleted"
            }
        ).execute())
The use of a soft-delete flag (__is_deleted) is a pragmatic choice. Physically deleting rows in a data lake can be inefficient, while soft deletes preserve history and are simpler to express in the MERGE statement. Downstream consumers, like our FastAPI service, simply need to filter on __is_deleted = false.
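One operational consequence of applying every transaction as its own MERGE is that the Delta table accumulates many small files and a long commit history. A periodic maintenance job keeps scans fast and storage bounded; the sketch below uses the optimize/vacuum APIs available in recent versions of the deltalake package, with a schedule and retention window that are assumptions to tune for your environment.
# maintenance/compact_delta.py (illustrative maintenance job, run on a schedule)
import logging

from deltalake import DeltaTable

DELTA_TABLE_PATH = "/app/delta/products"

def main():
    dt = DeltaTable(DELTA_TABLE_PATH)

    # Rewrite the many small files produced by per-transaction MERGEs into larger ones.
    metrics = dt.optimize.compact()
    logging.info("Compaction metrics: %s", metrics)

    # Drop data files no longer referenced by the current version. A week of
    # retention keeps time travel available for recent history.
    removed = dt.vacuum(retention_hours=168, dry_run=False, enforce_retention_duration=True)
    logging.info("Vacuumed %d files", len(removed))

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()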
Phase 4: The FastAPI Analytical Service
With data flowing reliably into Delta Lake, the final step is to expose it. FastAPI is an excellent choice due to its performance, async capabilities, and integration with Pydantic for data validation. This service will read from the Delta table, providing a clean, RESTful interface for analytical queries.
# api/main.py
import logging
import os
from contextlib import asynccontextmanager
from deltalake import DeltaTable
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pyarrow.dataset as ds
# --- Configuration ---
DELTA_TABLE_PATH = os.environ.get("DELTA_TABLE_PATH", "/app/delta/products")
# --- Global State (managed by lifespan) ---
delta_table = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global delta_table
logging.info("FastAPI starting up...")
try:
        # Load the Delta table once on startup. A DeltaTable object is a snapshot
        # pinned at the version read here; the endpoints call update_incremental()
        # before querying so they see commits made after startup.
        delta_table = DeltaTable(DELTA_TABLE_PATH)
logging.info(f"Successfully loaded Delta table version {delta_table.version()}.")
except Exception as e:
logging.error(f"Failed to load Delta table on startup: {e}")
# Or let it fail hard depending on requirements
delta_table = None
yield
logging.info("FastAPI shutting down...")
app = FastAPI(lifespan=lifespan)
# --- Pydantic Models for API responses ---
class Product(BaseModel):
id: int
name: str
price: float
    stock_quantity: int
# --- API Endpoints ---
@app.get("/health")
def health_check():
if delta_table is None:
raise HTTPException(status_code=503, detail="Delta table is not available.")
return {"status": "ok", "delta_table_version": delta_table.version()}
@app.get("/products/{product_id}", response_model=Product)
def get_product_by_id(product_id: int):
"""Fetches a single product by its ID from the Delta table."""
if delta_table is None:
raise HTTPException(status_code=503, detail="Delta table is not available.")
    # The read path: pick up any commits made since the last query, then scan the
    # table as a PyArrow dataset with a pushdown filter, excluding soft-deleted rows.
    try:
        delta_table.update_incremental()
        data = delta_table.to_pyarrow_dataset().to_table(
            filter=(ds.field("id") == product_id) & (ds.field("__is_deleted") == False)
        ).to_pylist()
except Exception as e:
# This could happen if the table is being compacted or schemas are changing.
logging.error(f"Error querying Delta table for product {product_id}: {e}")
raise HTTPException(status_code=500, detail="Error querying data warehouse.")
if not data:
raise HTTPException(status_code=404, detail=f"Product with id {product_id} not found.")
return data[0]
@app.get("/products", response_model=list[Product])
def get_all_products(limit: int = 100):
"""Fetches a list of all active products."""
if delta_table is None:
raise HTTPException(status_code=503, detail="Delta table is not available.")
    try:
        delta_table.update_incremental()
        data = (
            delta_table.to_pyarrow_dataset()
            .to_table(filter=(ds.field("__is_deleted") == False))
            .slice(length=limit)
            .to_pylist()
        )
except Exception as e:
logging.error(f"Error querying all products: {e}")
raise HTTPException(status_code=500, detail="Error querying data warehouse.")
return data
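To exercise the service end to end without standing up infrastructure, FastAPI's TestClient (which needs the httpx package installed) runs the lifespan handler and serves requests in-process. Below is a rough smoke test, assuming the Delta table at DELTA_TABLE_PATH already exists and the module layout matches the file headers above.
# api/smoke_test.py (hypothetical local smoke test)
from fastapi.testclient import TestClient

from api.main import app

def main():
    # Entering the context manager triggers the lifespan startup, loading the table.
    with TestClient(app) as client:
        health = client.get("/health")
        print(health.status_code, health.json())

        listing = client.get("/products", params={"limit": 5})
        print(listing.status_code, listing.json())

        single = client.get("/products/1")
        print(single.status_code, single.json())

if __name__ == "__main__":
    main()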
System Architecture Overview
The final architecture is a linear flow that successfully decouples the OLTP and OLAP workloads.
graph TD
    subgraph "Legacy System"
        A[Java Monolith with MyBatis] -->|Writes/Updates| B(PostgreSQL Primary DB);
    end
    subgraph "CDC Pipeline"
        B -->|WAL Logical Replication| C{Replication Slot};
        C --> D[Python CDC Consumer Service];
        D -->|Transactional MERGE| E(Delta Lake Table);
    end
    subgraph "Analytical Service"
        E -->|Reads| F[FastAPI Service];
        F --> G[API Client/Dashboard];
    end
    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#cce,stroke:#333,stroke-width:2px
    style E fill:#9cf,stroke:#333,stroke-width:2px
    style F fill:#9f9,stroke:#333,stroke-width:2px
This design achieved its primary goal. The legacy application continues to operate unchanged, while the analytical services now have access to fresh, transactionally consistent data in a scalable format. The choice to build a custom consumer provided fine-grained control and avoided significant infrastructure overhead.
However, this solution is not without its limitations. The single-node Python consumer represents a single point of failure and a potential performance bottleneck. For true high-throughput environments, scaling this component would be the next engineering challenge, likely involving a message queue like Kafka to decouple consumption from processing and allow for multiple parallel writers. Furthermore, the current implementation lacks robust LSN checkpointing, meaning a crash could lead to reprocessing or, in worse scenarios, lost data. A production-grade version would need to persist the last successfully processed LSN to a reliable store. Finally, handling schema evolution (e.g., ALTER TABLE commands) automatically remains an unsolved and complex problem in this architecture, requiring a mechanism to detect schema changes in the WAL stream and apply corresponding DDL to the Delta table.
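Automatic schema evolution is out of scope here, but even a crude drift detector buys time: compare the live PostgreSQL column list with the Delta table's schema and alert before the consumer starts silently dropping fields. The sketch below reuses libraries already in the stack; the connection string, paths, and the choice to treat __is_deleted as pipeline-internal are all assumptions.
# maintenance/schema_drift.py (illustrative drift check, not automatic evolution)
import psycopg2
from deltalake import DeltaTable

CONN_INFO = "dbname=postgres user=postgres password=postgres host=localhost port=5432"
DELTA_TABLE_PATH = "/app/delta/products"
INTERNAL_COLUMNS = {"__is_deleted"}  # maintained by the pipeline, not by PostgreSQL

def postgres_columns() -> set:
    with psycopg2.connect(CONN_INFO) as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_schema = 'public' AND table_name = 'products'"
        )
        return {row[0] for row in cur.fetchall()}

def delta_columns() -> set:
    dt = DeltaTable(DELTA_TABLE_PATH)
    return {field.name for field in dt.schema().fields} - INTERNAL_COLUMNS

def main():
    source, target = postgres_columns(), delta_columns()
    missing_in_delta = source - target
    orphaned_in_delta = target - source
    if missing_in_delta or orphaned_in_delta:
        # In production this would page someone or open a ticket.
        print(f"Schema drift detected: missing={missing_in_delta}, orphaned={orphaned_in_delta}")
    else:
        print("Source and Delta schemas are in sync.")

if __name__ == "__main__":
    main()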