Implementing a Transaction-Aware Data Synchronization Pipeline from a Relational Database to Delta Lake for FastAPI Endpoints


The mandate was clear: provide near real-time analytical access to our core operational data without compromising the performance of the primary OLTP database. This database, a battle-hardened PostgreSQL instance, serves a monolithic Java application that relies heavily on MyBatis for its data access layer. Any attempt to run complex, long-running analytical queries directly against it resulted in lock contention, query timeouts, and palpable operational risk. The initial thought of using a read replica was dismissed; it solved the read isolation problem but failed to address the need for data in an analytics-friendly columnar format like Parquet, and the costs of maintaining a powerful, idle replica were unjustifiable.

The chosen path was architectural decoupling. We decided to stream changes from the production database into a Delta Lake table. This would provide a transactionally consistent, versioned, and columnar representation of our data, perfect for analytics. A new, lightweight service built with FastAPI would then expose this data to downstream consumers. This approach isolates workloads, leverages modern data formats, and opens the door for future data science initiatives. The real challenge, however, lay in building the bridge—a custom service capable of capturing database transactions faithfully and replaying them onto the Delta Lake table without data loss or corruption.

This is the log of that build. We deliberately avoided a heavyweight stack involving Kafka, Debezium, and Spark Streaming. While powerful, that stack introduced operational complexity and infrastructure costs we weren’t ready to incur for this specific use case. Instead, we opted for a leaner, more direct approach using PostgreSQL’s native logical replication and a custom Python consumer.

Phase 1: Preparing the Source Database for Logical Replication

Before any data can be streamed, the source PostgreSQL instance must be configured to emit changes through its Write-Ahead Log (WAL) in a logical format. This is a non-default setting and a critical first step. In a real-world project, this requires DBA involvement and a scheduled maintenance window.

The core setting is wal_level. It must be set to logical.

-- This configuration change requires a server restart.
-- Connect to the PostgreSQL instance as a superuser.
ALTER SYSTEM SET wal_level = 'logical';

-- Additionally, ensure you have enough WAL senders to handle replication clients.
-- The default is 10. Increase it if you plan on having multiple replication clients.
ALTER SYSTEM SET max_wal_senders = '12';

-- A restart of the PostgreSQL service is required for these to take effect.
-- e.g., using pg_ctl: pg_ctl restart -D /path/to/your/data_directory

With the server configured, we can create the specific table that our legacy MyBatis application interacts with. For this demonstration, we’ll use a simple products table.

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    price NUMERIC(10, 2) NOT NULL,
    stock_quantity INTEGER NOT NULL,
    last_updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- For logical replication, the table must have a REPLICA IDENTITY.
-- 'FULL' logs the full previous row content on UPDATE and DELETE, which is
-- essential for our MERGE logic later. 'DEFAULT' (primary key) is not sufficient.
ALTER TABLE products REPLICA IDENTITY FULL;

The final piece of setup on the database side is creating a logical replication slot. This is a crucial concept. The slot ensures that the PostgreSQL server retains the WAL files needed by our consumer, even if the consumer disconnects. Without a slot, the server would prune old WAL files, and our consumer could lose changes permanently.

-- Create a publication that specifies which tables to replicate changes for.
CREATE PUBLICATION cdc_publication FOR TABLE products;

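-- Create the logical replication slot using the 'test_decoding' plugin.
-- The consumer below parses the textual output of 'test_decoding' (lines such as
-- "BEGIN 693" and "table public.products: INSERT: ..."). 'pgoutput', the plugin
-- used by built-in logical replication, emits a binary protocol instead and takes
-- the proto_version/publication_names options, which 'test_decoding' rejects.
-- Note that 'test_decoding' ignores publications and decodes every table.
-- 'products_cdc_slot' is the name our client will use to connect.
SELECT * FROM pg_create_logical_replication_slot('products_cdc_slot', 'test_decoding');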
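
Because a slot pins WAL until its consumer confirms it, it is worth watching how far the slot lags behind the server. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (the high and low 32 bits of a 64-bit position). The sketch below converts that text form to an integer and computes the retained bytes. The function names are illustrative; in practice the two inputs would come from `pg_current_wal_lsn()` and the `confirmed_flush_lsn` column of `pg_replication_slots`.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a textual PostgreSQL LSN such as '0/16B3748' to an integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL written since the position the slot's consumer confirmed."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)


if __name__ == "__main__":
    # Hypothetical values for illustration only.
    print(slot_lag_bytes("0/16B3748", "0/16B3000"))
```

A lag that keeps growing while the consumer is running usually means the consumer is not confirming its position, and the primary's disk will eventually fill with retained WAL.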

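A common mistake is to forget REPLICA IDENTITY FULL. With the default identity, the old-row image in UPDATE and DELETE events is limited to the primary key columns, and it is omitted from UPDATE events that leave the key unchanged. That is enough to locate a row by id, but a DELETE then carries none of the other column values, and the consumer below is written on the assumption that every event includes a complete row.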

Phase 2: The Python CDC Consumer Service

This service is the heart of our custom pipeline. It connects to the replication slot, consumes the stream of changes, buffers them transactionally, and writes them to Delta Lake. We’ll use the psycopg2 library, which has support for the replication protocol.

The core logic needs to be robust, handling connection drops, parsing different message types, and most importantly, respecting transaction boundaries (BEGIN and COMMIT messages).

# cdc_consumer/consumer.py

import logging
import os
import signal
import sys
import time
from typing import Generator

import psycopg2
from psycopg2.extras import LogicalReplicationConnection

from .delta_writer import DeltaWriter
from .transaction_buffer import TransactionBuffer

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

# --- Configuration ---
# In a real app, use Pydantic BaseSettings or similar
DB_CONN_INFO = os.environ.get("DB_CONN_INFO", "dbname=postgres user=postgres password=postgres host=postgres port=5432")
REPLICATION_SLOT_NAME = os.environ.get("REPLICATION_SLOT_NAME", "products_cdc_slot")
PUBLICATION_NAME = os.environ.get("PUBLICATION_NAME", "cdc_publication")
DELTA_TABLE_PATH = os.environ.get("DELTA_TABLE_PATH", "/app/delta/products")
MESSAGES_PER_ITERATION = 100

class PostgresCdcConsumer:
    """
    Connects to a PostgreSQL logical replication slot and consumes changes,
    passing them to a writer in transaction batches.
    """
    def __init__(self, conn_info: str, slot_name: str, publication_name: str):
        self.conn_info = conn_info
        self.slot_name = slot_name
        self.publication_name = publication_name
        self.conn = None
        self.cur = None
        self.running = True
        self.last_lsn = None

    def _connect(self):
        """Establishes a logical replication connection."""
        logging.info("Attempting to connect to PostgreSQL for replication...")
        try:
            self.conn = psycopg2.connect(
                self.conn_info,
                connection_factory=LogicalReplicationConnection
            )
            self.cur = self.conn.cursor()
            logging.info("Successfully connected.")
        except psycopg2.OperationalError as e:
            logging.error(f"Failed to connect to database: {e}")
            raise

    def _start_replication(self):
        """Starts the replication stream."""
        logging.info(f"Starting replication on slot '{self.slot_name}'")
        try:
            # The text parser in TransactionBuffer expects the output of the
            # 'test_decoding' plugin, so the slot must have been created with it.
            # pgoutput's 'proto_version' and 'publication_names' options are not
            # accepted by test_decoding, which ignores publications entirely.
            options = {'include-xids': '1'}
            self.cur.start_replication(
                slot_name=self.slot_name,
                options=options,
                decode=True  # msg.payload is delivered as str, not bytes
            )
            logging.info("Replication started.")
        except psycopg2.ProgrammingError as e:
            logging.error(f"Could not start replication: {e}. Check that the slot exists and uses the expected plugin.")
            raise

    def consume_stream(self) -> Generator["psycopg2.extras.ReplicationMessage", None, None]:
        """
        A generator that yields ReplicationMessage objects from the stream.
        Reconnects when the connection drops.
        """
        self._connect()
        self._start_replication()

        while self.running:
            try:
                # read_message() does not block; it returns None when no
                # message is currently available.
                msg = self.cur.read_message()
                if msg:
                    yield msg
                else:
                    time.sleep(0.1) # Avoid busy-waiting when no messages
            except psycopg2.OperationalError as e:
                logging.warning(f"Connection issue, attempting to reconnect: {e}")
                self.close()
                time.sleep(5)
                self._connect()
                self._start_replication()
            except Exception as e:
                logging.error(f"An unexpected error occurred: {e}")
                self.running = False

    def close(self):
        if self.cur:
            self.cur.close()
        if self.conn:
            self.conn.close()
        logging.info("Connection closed.")

def main():
    consumer = PostgresCdcConsumer(DB_CONN_INFO, REPLICATION_SLOT_NAME, PUBLICATION_NAME)
    delta_writer = DeltaWriter(DELTA_TABLE_PATH)
    buffer = TransactionBuffer()

    # Graceful shutdown handler
    def shutdown_handler(signum, frame):
        logging.info("Shutdown signal received, stopping consumer...")
        consumer.running = False

    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)

    logging.info("Starting CDC consumer service.")
    
    try:
        for msg in consumer.consume_stream():
            # Each message carries its LSN (Log Sequence Number) in
            # `msg.data_start`; this is the cursor/offset into the WAL.
            buffer.add_message(msg.payload)

            # Nothing to apply until the COMMIT message closes the transaction.
            if not buffer.is_transaction_complete():
                continue

            txn_data = buffer.get_completed_transaction()
            if txn_data:
                logging.info(f"Processing transaction with {len(txn_data.changes)} changes.")
                try:
                    delta_writer.apply_transaction(txn_data)
                    logging.info("Successfully applied transaction to Delta Lake.")
                except Exception as e:
                    # A critical pitfall: How to handle write failures?
                    # A robust solution needs a dead-letter queue (DLQ) and alerting.
                    # For now, we log the error and stop, which is not production-ready.
                    # We stop *before* acknowledging, so the server redelivers
                    # this transaction on restart.
                    logging.error(f"Fatal error applying transaction to Delta Lake: {e}")
                    consumer.running = False
                    break
            buffer.clear_completed_transaction()

            # Acknowledge only after the transaction is durable in Delta Lake.
            # The server advances the slot (and recycles WAL) based on `flush_lsn`;
            # reporting only `write_lsn` leaves WAL accumulating on the primary.
            # The slot remembers this position, which gives at-least-once delivery
            # on restart. A production system would additionally persist the LSN
            # alongside the Delta commit.
            consumer.cur.send_feedback(flush_lsn=msg.data_start)
            consumer.last_lsn = msg.data_start

    finally:
        consumer.close()
        logging.info("CDC consumer service stopped.")

if __name__ == "__main__":
    main()
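
The reconnect path above sleeps for a fixed five seconds after a connection error. One possible refinement, sketched here rather than taken from the original service, is capped exponential backoff with jitter, so that a long outage does not produce a steady stream of reconnect attempts. The helper name `backoff_delays` and its defaults are assumptions made for illustration.

```python
import random
from typing import Iterator, Optional


def backoff_delays(base: float = 1.0, cap: float = 60.0,
                   rng: Optional[random.Random] = None) -> Iterator[float]:
    """Yield reconnect delays in seconds: exponential growth up to `cap`, with full jitter."""
    rng = rng or random.Random()
    attempt = 0
    while True:
        # The ceiling doubles on every attempt until it reaches the cap.
        ceiling = min(cap, base * (2 ** attempt))
        # Full jitter: pick a random delay anywhere below the ceiling.
        yield rng.uniform(0.0, ceiling)
        # Stop growing the exponent once it can no longer matter.
        attempt = min(attempt + 1, 32)
```

In `consume_stream`, a fresh generator could be created whenever a message is read successfully, and `time.sleep(next(delays))` could replace the fixed `time.sleep(5)` in the error branch.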

The logic of parsing the WAL stream and buffering it is encapsulated in a TransactionBuffer. This is where we translate the raw byte stream into structured change events, respecting transaction boundaries.

# cdc_consumer/transaction_buffer.py

import logging
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any

@dataclass
class ChangeEvent:
    """Represents a single DML operation (INSERT, UPDATE, DELETE)."""
    table: str
    schema: str
    op_type: str # 'INSERT', 'UPDATE', 'DELETE'
    # For UPDATE/DELETE, this contains the old row values
    old_tuple: Optional[Dict[str, Any]] = None
    # For INSERT/UPDATE, this contains the new row values
    new_tuple: Optional[Dict[str, Any]] = None

@dataclass
class Transaction:
    """A collection of change events that occurred within a single DB transaction."""
    xid: int # Transaction ID
    changes: List[ChangeEvent] = field(default_factory=list)

class TransactionBuffer:
    """
    Parses raw logical replication messages and buffers them into transactions.
    """
    def __init__(self):
        self._current_transaction: Optional[Transaction] = None
        self._is_complete = False

    def is_transaction_complete(self) -> bool:
        return self._is_complete

    def get_completed_transaction(self) -> Optional[Transaction]:
        return self._current_transaction
    
    def clear_completed_transaction(self):
        self._current_transaction = None
        self._is_complete = False

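    def add_message(self, payload):
        # psycopg2 delivers `str` when replication is started with decode=True
        # and `bytes` otherwise; accept both.
        payload_str = payload.decode('utf-8') if isinstance(payload, bytes) else payload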

        if payload_str.startswith("BEGIN"):
            xid = int(re.search(r"BEGIN (\d+)", payload_str).group(1))
            self._current_transaction = Transaction(xid=xid)
            self._is_complete = False
        elif payload_str.startswith("COMMIT"):
            if self._current_transaction:
                self._is_complete = True
        elif self._current_transaction:
            # This is a very basic parser. A production version would need to be
            # more robust, handling various data types, quoting, etc.
            change = self._parse_change_message(payload_str)
            if change:
                self._current_transaction.changes.append(change)

    @staticmethod
    def _parse_change_message(payload: str) -> Optional[ChangeEvent]:
        # Example payload for an INSERT:
        # table public.products: INSERT: id[integer]:1 name[character varying]:'Laptop' price[numeric]:1200.50 stock_quantity[integer]:50 last_updated_at[timestamp with time zone]:'2023-10-27 10:00:00+00'
        # Example for an UPDATE:
        # table public.products: UPDATE: old-key: id[integer]:1 ... new-tuple: id[integer]:1 ... stock_quantity[integer]:49 ...
        
        match = re.match(r"table (\w+)\.(\w+): (\w+): (.*)", payload)
        if not match:
            return None

        schema, table, op_type, data = match.groups()
        
        old_tuple, new_tuple = None, None

        if op_type == "INSERT":
            new_tuple = TransactionBuffer._parse_tuple_data(data)
        elif op_type == "DELETE":
            old_tuple = TransactionBuffer._parse_tuple_data(data)
        elif op_type == "UPDATE":
            # With an old row image the format is "old-key: ... new-tuple: ...".
            if 'old-key:' in data and 'new-tuple:' in data:
                _, old_data, new_data = re.split(r'old-key:|new-tuple:', data)
                old_tuple = TransactionBuffer._parse_tuple_data(old_data.strip())
                new_tuple = TransactionBuffer._parse_tuple_data(new_data.strip())
            else:
                # Without an old row image, test_decoding prints the new columns
                # directly after "UPDATE:" with no "new-tuple:" label.
                new_tuple = TransactionBuffer._parse_tuple_data(data)
        
        return ChangeEvent(
            schema=schema,
            table=table,
            op_type=op_type,
            old_tuple=old_tuple,
            new_tuple=new_tuple
        )
    
    @staticmethod
    def _parse_tuple_data(data: str) -> Dict[str, Any]:
        """A simplified parser for the column data string."""
        parsed = {}
        # Regex to find 'col_name[type]:value' patterns. Quoted strings may contain
        # spaces and use '' for an embedded single quote.
        # This is a simplification and has limitations with complex types.
        pattern = re.compile(r"(\w+)\[.*?\]:(?:'((?:[^']|'')*)'|(\S+))")
        for match in pattern.finditer(data):
            key, quoted_value, bare_value = match.groups()
            if quoted_value is not None:
                final_value = quoted_value.replace("''", "'")
            elif bare_value == "null":
                final_value = None  # SQL NULL is printed as an unquoted null
            else:
                final_value = bare_value
            # A real implementation needs proper type casting based on the `[type]` part.
            # For now, values stay as strings and are cast later by the Delta writer.
            parsed[key] = final_value
        return parsed
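
The parser above leaves every value as a string and notes that a real implementation would cast based on the `[type]` annotation. A minimal sketch of that idea follows. It assumes the `test_decoding` text format shown in the example payload, covers only a handful of types, and uses hypothetical names (`parse_typed_tuple`, `parse_timestamptz`) that are not part of the original service.

```python
import re
from datetime import datetime
from decimal import Decimal
from typing import Any, Dict

# col[type]:value, where value is a single-quoted string ('' escapes a quote)
# or a run of non-space characters.
_COLUMN = re.compile(r"(\w+)\[([^\]]+)\]:(?:'((?:[^']|'')*)'|(\S+))")


def parse_timestamptz(value: str) -> datetime:
    """Parse PostgreSQL's text form, e.g. '2023-10-27 10:00:00+00'."""
    if re.search(r"[+-]\d{2}$", value):
        value += ":00"  # strptime's %z needs minutes in the offset
    fmt = "%Y-%m-%d %H:%M:%S.%f%z" if "." in value else "%Y-%m-%d %H:%M:%S%z"
    return datetime.strptime(value, fmt)


# Casts keyed by the type name printed inside the brackets.
# Anything not listed here is kept as a string.
_CASTS = {
    "smallint": int,
    "integer": int,
    "bigint": int,
    "numeric": Decimal,
    "boolean": lambda v: v == "true",
    "timestamp with time zone": parse_timestamptz,
}


def parse_typed_tuple(data: str) -> Dict[str, Any]:
    """Parse 'col[type]:value ...' into a dict of typed Python values."""
    row: Dict[str, Any] = {}
    for match in _COLUMN.finditer(data):
        name, pg_type, quoted, bare = match.groups()
        if quoted is not None:
            raw = quoted.replace("''", "'")
        elif bare == "null":
            row[name] = None  # unquoted null is SQL NULL
            continue
        else:
            raw = bare
        row[name] = _CASTS.get(pg_type, str)(raw)
    return row
```

Keeping the cast table separate from the regular expression makes it straightforward to add types as new source tables are onboarded.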

Phase 3: Transactional Writes to Delta Lake

Once we have a complete transaction buffered, we must apply it to the Delta table. This is the most critical step for maintaining consistency. A naive append operation would fail to handle UPDATEs and DELETEs. The correct tool for this is Delta Lake’s MERGE operation. It allows us to perform “UPSERT” logic atomically.

We’ll use the deltalake Python library for this.

# cdc_consumer/delta_writer.py

import logging
import re
from datetime import datetime
from decimal import Decimal
from typing import Optional

from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import TableNotFoundError
import pyarrow as pa
from .transaction_buffer import Transaction, ChangeEvent

# The slot may decode changes for other tables; this writer handles only one.
SOURCE_TABLE = "products"

# Schema shared by the Delta table and by every batch of changes we MERGE.
# A production system would derive this from the database or have it predefined.
PRODUCTS_SCHEMA = pa.schema([
    pa.field("id", pa.int32(), nullable=False),
    pa.field("name", pa.string()),
    pa.field("price", pa.decimal128(10, 2)),
    pa.field("stock_quantity", pa.int32()),
    pa.field("last_updated_at", pa.timestamp('us', tz='UTC')),
    # We add a metadata column to handle deletes
    pa.field("__is_deleted", pa.bool_(), nullable=False)
])

def _parse_pg_timestamptz(value: Optional[str]) -> Optional[datetime]:
    """Parses PostgreSQL's text form, e.g. '2023-10-27 10:00:00+00'."""
    if value in (None, "null"):
        return None
    if re.search(r"[+-]\d{2}$", value):
        value += ":00"  # strptime's %z needs minutes in the offset
    fmt = "%Y-%m-%d %H:%M:%S.%f%z" if "." in value else "%Y-%m-%d %H:%M:%S%z"
    return datetime.strptime(value, fmt)

class DeltaWriter:
    """
    Handles writing transactional data to a Delta Lake table.
    """
    def __init__(self, table_path: str):
        self.table_path = table_path
        self._ensure_table_exists()

    def _ensure_table_exists(self):
        """Creates the Delta table if it doesn't exist."""
        try:
            DeltaTable(self.table_path)
            logging.info(f"Delta table found at {self.table_path}")
        except TableNotFoundError:
            logging.warning(f"Delta table not found at {self.table_path}. Creating a new one.")
            # An empty dict has no columns, so build the zero-row table
            # from the schema itself.
            write_deltalake(self.table_path, PRODUCTS_SCHEMA.empty_table())
            logging.info("New Delta table created.")

    def _convert_changes_to_arrow(self, changes: list[ChangeEvent]) -> Optional[pa.Table]:
        """Converts a list of change events into a PyArrow Table for the MERGE operation."""
        # MERGE needs at most one source row per key, so we keep only the final
        # state of each id in the transaction (later changes overwrite earlier ones).
        latest_by_id = {}
        for change in changes:
            if change.table != SOURCE_TABLE:
                continue
            row_data = change.new_tuple or change.old_tuple
            if not row_data:
                continue

            # A common pitfall is type mismatch. We need to cast our string values
            # from the parser to the correct types for the Arrow schema.
            row_id = int(row_data['id'])
            price = row_data.get('price')
            stock = row_data.get('stock_quantity')
            latest_by_id[row_id] = {
                'id': row_id,
                'name': row_data.get('name'),
                'price': Decimal(price) if price is not None else None,
                'stock_quantity': int(stock) if stock is not None else None,
                'last_updated_at': _parse_pg_timestamptz(row_data.get('last_updated_at')),
                # This flag is key for our MERGE logic
                '__is_deleted': change.op_type == 'DELETE'
            }

        if not latest_by_id:
            return None

        # Passing the schema keeps the source column types identical to the target's.
        return pa.Table.from_pylist(list(latest_by_id.values()), schema=PRODUCTS_SCHEMA)

    def apply_transaction(self, transaction: Transaction):
        """
        Applies a database transaction to the Delta table using a single MERGE operation.
        """
        if not transaction.changes:
            logging.info(f"Transaction {transaction.xid} had no relevant changes. Skipping.")
            return

        source_table = self._convert_changes_to_arrow(transaction.changes)
        if source_table is None:
            logging.info(f"Transaction {transaction.xid} resulted in no data to merge. Skipping.")
            return

        dt = DeltaTable(self.table_path)

        # The MERGE operation is the atomic unit that ensures consistency.
        # 'source' is an alias for our incoming Arrow table of changes.
        # 'target' is an alias for the existing Delta table.
        # The predicate `target.id = source.id` links rows from the change data to existing rows.
        (dt.merge(
            source=source_table,
            predicate='target.id = source.id',
            source_alias='source',
            target_alias='target')
        .when_matched_update(
            # If a row matches and is a DELETE event, mark it as deleted.
            # If it's an UPDATE, update all columns.
            predicate="source.__is_deleted = true",
            updates={"target.__is_deleted": "true"}
        )
        .when_matched_update(
            predicate="source.__is_deleted = false",
            updates={
                "target.name": "source.name",
                "target.price": "source.price",
                "target.stock_quantity": "source.stock_quantity",
                "target.last_updated_at": "source.last_updated_at",
                "target.__is_deleted": "source.__is_deleted"
            }
        )
        .when_not_matched_insert(
            # If a new ID arrives and it's not a delete event, insert it.
            # We should not insert rows that were created and deleted in the same transaction.
            predicate="source.__is_deleted = false",
            updates={
                "id": "source.id",
                "name": "source.name",
                "price": "source.price",
                "stock_quantity": "source.stock_quantity",
                "last_updated_at": "source.last_updated_at",
                "__is_deleted": "source.__is_deleted"
            }
        ).execute())

The use of a soft-delete flag (__is_deleted) is a pragmatic choice. Physically deleting rows in a data lake can be inefficient. Soft deletes preserve history and are simpler to implement in the MERGE statement. Downstream consumers, like our FastAPI service, will simply need to filter WHERE __is_deleted = false.
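
To make the three MERGE branches concrete, here is a pure-Python model of the same semantics on an in-memory dict keyed by `id`. It is a sketch for reasoning about the behaviour, not how Delta Lake executes a MERGE. It also assumes the change list is first collapsed to one row per key, since a MERGE expects at most one source row to match each target row. The names `collapse_by_key` and `merge_soft_delete` are illustrative.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def collapse_by_key(changes: List[Row]) -> List[Row]:
    """Keep only the last change per id within one transaction."""
    latest: Dict[int, Row] = {}
    for change in changes:
        latest[change["id"]] = change
    return list(latest.values())


def merge_soft_delete(target: Dict[int, Row], source: List[Row]) -> None:
    """Apply change rows to `target` using the same three branches as the MERGE."""
    for change in collapse_by_key(source):
        key = change["id"]
        if key in target:
            if change["__is_deleted"]:
                # Matched + delete: keep the row, flip the flag.
                target[key]["__is_deleted"] = True
            else:
                # Matched + insert/update: overwrite every column.
                target[key] = dict(change)
        elif not change["__is_deleted"]:
            # Not matched + not a delete: insert the new row.
            target[key] = dict(change)
        # Not matched + delete: the row never reached the lake, so do nothing.
```

The last branch is what keeps a row that was created and deleted inside one source transaction from ever appearing in the target.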

Phase 4: The FastAPI Analytical Service

With data flowing reliably into Delta Lake, the final step is to expose it. FastAPI is an excellent choice due to its performance, async capabilities, and integration with Pydantic for data validation. This service will read from the Delta table, providing a clean, RESTful interface for analytical queries.

# api/main.py

import logging
import os
from contextlib import asynccontextmanager

from deltalake import DeltaTable
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import pyarrow.dataset as ds

# --- Configuration ---
DELTA_TABLE_PATH = os.environ.get("DELTA_TABLE_PATH", "/app/delta/products")

# --- Global State (managed by lifespan) ---
delta_table = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global delta_table
    logging.info("FastAPI starting up...")
    try:
        # Load the Delta table once on startup.
        # The handle stays at the version it loaded until update_incremental() is called.
        delta_table = DeltaTable(DELTA_TABLE_PATH)
        logging.info(f"Successfully loaded Delta table version {delta_table.version()}.")
    except Exception as e:
        logging.error(f"Failed to load Delta table on startup: {e}")
        # Or let it fail hard depending on requirements
        delta_table = None 
    yield
    logging.info("FastAPI shutting down...")

app = FastAPI(lifespan=lifespan)

# --- Pydantic Models for API responses ---
class Product(BaseModel):
    id: int
    name: str
    price: float
    stock_quantity: int = Field(alias="stock_quantity")

# --- API Endpoints ---
@app.get("/health")
def health_check():
    if delta_table is None:
        raise HTTPException(status_code=503, detail="Delta table is not available.")
    return {"status": "ok", "delta_table_version": delta_table.version()}

@app.get("/products/{product_id}", response_model=Product)
def get_product_by_id(product_id: int):
    """Fetches a single product by its ID from the Delta table."""
    if delta_table is None:
        raise HTTPException(status_code=503, detail="Delta table is not available.")

    # The read goes through a PyArrow dataset, which applies the filter while
    # scanning the Parquet files. We explicitly filter out soft-deleted records.
    try:
        # Pick up commits written by the CDC consumer since the last request.
        delta_table.update_incremental()
        data = delta_table.to_pyarrow_dataset().to_table(
            filter=(ds.field("id") == product_id) & (ds.field("__is_deleted") == False)
        ).to_pylist()
    except Exception as e:
        # This could happen if the table is being compacted or schemas are changing.
        logging.error(f"Error querying Delta table for product {product_id}: {e}")
        raise HTTPException(status_code=500, detail="Error querying data warehouse.")

    if not data:
        raise HTTPException(status_code=404, detail=f"Product with id {product_id} not found.")

    return data[0]

@app.get("/products", response_model=list[Product])
def get_all_products(limit: int = 100):
    """Fetches a list of all active products."""
    if delta_table is None:
        raise HTTPException(status_code=503, detail="Delta table is not available.")

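    try:
        # Pick up commits written by the CDC consumer since the last request.
        delta_table.update_incremental()
        # head() stops scanning once `limit` matching rows have been collected.
        data = delta_table.to_pyarrow_dataset().head(
            limit, filter=ds.field("__is_deleted") == False
        ).to_pylist()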
    except Exception as e:
        logging.error(f"Error querying all products: {e}")
        raise HTTPException(status_code=500, detail="Error querying data warehouse.")

    return data
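
A `DeltaTable` handle reads the version it was loaded at until it is refreshed, for example with `update_incremental()`. Refreshing on every request adds a transaction-log check to each call, so one option is to throttle it. The sketch below is an assumption about how that could look, not part of the original service; `ThrottledRefresher` is a hypothetical helper, and it takes the refresh function and clock as parameters so it has no dependency on `deltalake`.

```python
import threading
import time
from typing import Callable, Optional


class ThrottledRefresher:
    """Call `refresh` at most once every `min_interval` seconds."""

    def __init__(self, refresh: Callable[[], None], min_interval: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self._refresh = refresh
        self._min_interval = min_interval
        self._clock = clock
        self._last: Optional[float] = None
        # Sync FastAPI endpoints run in a thread pool, so guard shared state.
        self._lock = threading.Lock()

    def maybe_refresh(self) -> bool:
        """Refresh if the interval has elapsed; return True when a refresh ran."""
        with self._lock:
            now = self._clock()
            if self._last is not None and now - self._last < self._min_interval:
                return False
            self._refresh()
            self._last = now
            return True
```

In the API this could be wired as `refresher = ThrottledRefresher(delta_table.update_incremental)` during startup, with each endpoint calling `refresher.maybe_refresh()` before reading. The interval then becomes an explicit bound on how stale a response may be.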

System Architecture Overview

The final architecture is a linear flow that successfully decouples the OLTP and OLAP workloads.

graph TD
    subgraph Legacy System
        A[Java Monolith with MyBatis] -->|Writes/Updates| B(PostgreSQL Primary DB);
    end

    subgraph CDC Pipeline
        B -->|WAL Logical Replication| C{Replication Slot};
        C --> D[Python CDC Consumer Service];
        D -->|Transactional MERGE| E(Delta Lake Table);
    end

    subgraph Analytical Service
        E -->|Reads| F[FastAPI Service];
        F --> G[API Client/Dashboard];
    end

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#cce,stroke:#333,stroke-width:2px
    style E fill:#9cf,stroke:#333,stroke-width:2px
    style F fill:#9f9,stroke:#333,stroke-width:2px

This design achieved its primary goal. The legacy application continues to operate unchanged, while the analytical services now have access to fresh, transactionally consistent data in a scalable format. The choice to build a custom consumer provided fine-grained control and avoided significant infrastructure overhead.

However, this solution is not without its limitations. The single-node Python consumer represents a single point of failure and a potential performance bottleneck. For true high-throughput environments, scaling this component would be the next engineering challenge, likely involving a message queue like Kafka to decouple consumption from processing and allow for multiple parallel writers. Furthermore, the current implementation lacks robust LSN checkpointing, meaning a crash could lead to reprocessing or, in worse scenarios, lost data. A production-grade version would need to persist the last successfully processed LSN to a reliable store. Finally, handling schema evolution (e.g., ALTER TABLE commands) automatically remains an unsolved and complex problem in this architecture, requiring a mechanism to detect schema changes in the WAL stream and apply corresponding DDL to the Delta table.
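
As an illustration of the checkpointing gap, the following sketch persists the last processed LSN to a local file using a write-then-rename pattern, so a crash never leaves a half-written checkpoint. It is a minimal example under stated assumptions: `FileLsnCheckpoint` is a hypothetical class, and a local file is only as reliable as the disk it lives on, so a production deployment would more likely use a database or object store.

```python
import json
import os
import tempfile
from typing import Optional


class FileLsnCheckpoint:
    """Stores the last successfully processed LSN in a small JSON file."""

    def __init__(self, path: str):
        self.path = path

    def load(self) -> Optional[int]:
        """Return the saved LSN, or None if no checkpoint exists yet."""
        try:
            with open(self.path, "r", encoding="utf-8") as fh:
                return int(json.load(fh)["flush_lsn"])
        except FileNotFoundError:
            return None

    def save(self, lsn: int) -> None:
        """Atomically replace the checkpoint with the new LSN."""
        directory = os.path.dirname(os.path.abspath(self.path))
        # Write to a temp file in the same directory so os.replace stays atomic.
        fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".lsn-")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as fh:
                json.dump({"flush_lsn": lsn}, fh)
                fh.flush()
                os.fsync(fh.fileno())
            os.replace(tmp_path, self.path)
        except BaseException:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise
```

The consumer would call `save()` only after a Delta commit succeeds. On startup, the loaded value could be passed as the `start_lsn` argument of psycopg2's `start_replication`, keeping in mind that the server will not replay anything older than what the slot itself has retained.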

