Building a Compensating Transaction Framework with Celery for Oracle DB Interactions on EKS


Our core order processing was trapped inside a monolithic 3,000-line Oracle PL/SQL stored procedure. For years, it was the unassailable heart of the system, a black box that handled everything from inventory checks and payment processing to customer notifications. It worked, until it didn’t. Scaling was impossible, adding a new step (like calling a third-party fraud detection API) was a high-risk, month-long project, and any failure mid-execution left the database in an inconsistent state requiring hours of manual data correction by a DBA. The mandate was clear: break it apart. But doing so introduced the classic distributed systems problem—how do you maintain transactional integrity across multiple, independent services and a legacy database without a distributed transaction coordinator?

The initial concept was to decompose the monolith into a sequence of services running on AWS EKS, orchestrated by Celery tasks. This would give us scalability, independent deployments, and the ability to use the right tool for each job. However, a simple chain of Celery tasks (task1.s() | task2.s() | task3.s()) is brittle. If task3 fails, the work done by task1 and task2 remains committed. Standard database rollbacks don’t work across service boundaries. Two-phase commit (2PC) was immediately ruled out. It’s a protocol that requires holding locks across distributed resources, which would be disastrous for performance and is notoriously difficult to implement correctly in a heterogeneous environment spanning Kubernetes services and a standalone Oracle instance.
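
To make the failure mode concrete, here is what that naive approach looks like (a sketch; the task names are hypothetical):

# A naive Celery chain. Each task commits its own local work, so when
# process_payment raises, the inventory reserved by reserve_inventory
# stays committed -- nothing undoes it.
from celery import chain

workflow = chain(
    validate_order.s(order_id=42),   # each task's result feeds the next
    reserve_inventory.s(),
    process_payment.s(),
)
workflow.apply_async()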

This led us directly to the Saga pattern. Instead of an atomic ACID transaction, a saga is a sequence of local transactions. Each local transaction updates the database and publishes a message or event to trigger the next one. The key is that for every forward action (e.g., ReserveInventory), there must be a corresponding compensating action (e.g., ReleaseInventory). If any step in the saga fails, the previously completed steps are compensated for by executing their compensating actions in reverse order. We chose an orchestration-based saga, where a central coordinator calls each service in turn, over a choreography-based one. For a critical business process, having a single, observable point of control for the entire workflow is far more manageable than trying to trace a cascade of events across a dozen different services. Celery, as a mature task queue system, was a natural fit for building this orchestrator.

The Saga Definition and Orchestrator

The first step was to design a declarative way to define a saga. Hard-coding the workflow into a single, massive Celery task would just recreate the monolith problem in Python. The solution was a data structure that defines the sequence of steps, their parameters, and their corresponding compensating actions.

A saga is defined as a list of steps. Each step is a dictionary containing the forward action task signature and the compensation action task signature.

# project/sagas/definitions.py

# A declarative definition for a complex order processing saga.
# Each step includes the forward task, its arguments, the compensating task,
# and the arguments for compensation.
ORDER_PROCESSING_SAGA = [
    {
        "name": "VALIDATE_PAYMENT_METHOD",
        "forward": {
            "task": "project.tasks.payments.validate_payment_method",
            # Placeholders like {{ order_id }} will be resolved at runtime
            "args_template": {
                "order_id": "{{ order_id }}",
                "customer_id": "{{ customer_id }}",
            }
        },
        # No compensation needed for a read-only validation
        "compensation": None
    },
    {
        "name": "RESERVE_INVENTORY",
        "forward": {
            "task": "project.tasks.inventory.reserve_inventory_in_oracle",
            "args_template": {
                "order_id": "{{ order_id }}",
                "items": "{{ items }}",
            }
        },
        "compensation": {
            "task": "project.tasks.inventory.release_inventory_in_oracle",
            "args_template": {
                "order_id": "{{ order_id }}",
                "reason": "Saga failure at step: {{ failed_step_name }}"
            }
        }
    },
    {
        "name": "PROCESS_PAYMENT",
        "forward": {
            "task": "project.tasks.payments.process_external_payment",
            "args_template": {
                "order_id": "{{ order_id }}",
                "amount": "{{ amount }}",
                "payment_token": "{{ payment_token }}"
            }
        },
        "compensation": {
            "task": "project.tasks.payments.refund_payment",
            "args_template": {
                "order_id": "{{ order_id }}",
                "transaction_id": "{{ forward_result.transaction_id }}",
                "reason": "Saga failure at step: {{ failed_step_name }}"
            }
        }
    },
    {
        "name": "CREATE_SHIPMENT_RECORD",
        "forward": {
            "task": "project.tasks.shipping.create_shipment_in_oracle",
            "args_template": {
                "order_id": "{{ order_id }}",
                "shipping_address": "{{ shipping_address }}"
            }
        },
        "compensation": {
            "task": "project.tasks.shipping.cancel_shipment_in_oracle",
            "args_template": {
                "order_id": "{{ order_id }}",
                "reason": "Saga failure"
            }
        }
    }
]

With this definition, the orchestrator becomes a generic Celery task that interprets the structure. It doesn’t know or care about inventory or payments; it only knows how to execute a sequence of tasks and their compensations.

The core of the system is the run_saga task. It maintains the state of the saga’s execution, including the results of each successful forward step. If a step fails, this state is used to execute the necessary compensations.

# project/sagas/orchestrator.py
import logging
import re

from celery import shared_task, chord
from jinja2 import Template

from project.app import celery_app

logger = logging.getLogger(__name__)

# Matches a value that is exactly one placeholder, e.g. "{{ items }}"
_SINGLE_VAR = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")

def _render_args(args_template, context):
    """Renders Jinja2 templates in task arguments.

    A value that is exactly one placeholder is resolved directly from the
    context so lists and dicts keep their native types; Jinja2 rendering
    would otherwise stringify them. Any other string is rendered as text.
    """
    if not args_template:
        return {}
    rendered_args = {}
    for key, value in args_template.items():
        if isinstance(value, str):
            match = _SINGLE_VAR.match(value)
            if match and match.group(1) in context:
                # Preserve the native type instead of stringifying it
                rendered_args[key] = context[match.group(1)]
            else:
                rendered_args[key] = Template(value).render(context)
        else:
            # Pass through non-string values (like lists or dicts)
            rendered_args[key] = value
    return rendered_args

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def run_saga(self, saga_definition, initial_context):
    """
    Orchestrates a saga defined by a list of steps.
    
    This task is the heart of the compensating transaction framework. It
    maintains a log of completed steps. If any forward task fails, it uses
    this log to trigger compensating tasks in reverse order.
    
    :param self: Celery task instance.
    :param saga_definition: A list of dicts defining the saga steps.
    :param initial_context: A dict with the initial data for the saga.
    """
    # saga_state stores the history of completed forward tasks and their results
    # In a real-world project, this state should be persisted to a durable
    # store (e.g., Redis, DynamoDB, or an Oracle table) to survive worker restarts.
    # For this implementation, we store it within the task's request context,
    # which is risky but sufficient for demonstration.
    if 'saga_state' not in self.request.kwargs:
        self.request.kwargs['saga_state'] = {
            "completed_steps": [],
            "context": initial_context.copy()
        }
    
    saga_state = self.request.kwargs['saga_state']
    start_index = len(saga_state["completed_steps"])

    for i in range(start_index, len(saga_definition)):
        step = saga_definition[i]
        step_name = step.get("name", f"step_{i}")
        logger.info(f"Saga '{self.request.id}': Executing step '{step_name}'")

        try:
            forward_task_def = step["forward"]
            forward_task = celery_app.signature(forward_task_def["task"])
            
            # Dynamically build task arguments from the current context
            forward_args = _render_args(
                forward_task_def.get("args_template", {}),
                saga_state["context"]
            )

            # Execute the forward task synchronously so failures surface here.
            # Celery blocks result.get() inside a task by default; we opt in
            # explicitly and accept the deadlock risk (mitigated by running
            # the orchestrator on its own queue and worker pool).
            result = forward_task.apply_async(kwargs=forward_args).get(
                propagate=True, disable_sync_subtasks=False
            )
            
            logger.info(f"Saga '{self.request.id}': Step '{step_name}' completed successfully.")

            # Record results under per-step keys so compensation templates can
            # reference the right step's output (e.g. result_PROCESS_PAYMENT)
            # rather than whichever step happened to run last.
            saga_state["context"][f"forward_result_{i}"] = result
            saga_state["context"][f"result_{step_name}"] = result
            
            # Record successful completion
            saga_state["completed_steps"].append({
                "index": i,
                "name": step_name,
                "compensation": step.get("compensation")
            })

        except Exception as e:
            logger.error(
                f"Saga '{self.request.id}': Step '{step_name}' failed. Error: {e}. "
                "Initiating compensation."
            )
            # Add context about the failure for compensation tasks
            saga_state["context"]["failed_step_name"] = step_name
            saga_state["context"]["failure_reason"] = str(e)

            trigger_compensations.delay(
                saga_id=self.request.id,
                completed_steps=saga_state["completed_steps"],
                context=saga_state["context"]
            )
            # We must re-raise the exception to signal the failure to Celery
            # and potentially trigger retries of the orchestrator itself if desired.
            raise

    logger.info(f"Saga '{self.request.id}': All steps completed successfully.")
    return {"status": "SUCCESS", "final_context": saga_state["context"]}


@shared_task
def trigger_compensations(saga_id, completed_steps, context):
    """
    Creates and executes a group of compensating tasks in reverse order.
    """
    logger.warning(f"Saga '{saga_id}': Triggering compensations for {len(completed_steps)} steps.")
    
    compensation_signatures = []
    # Iterate in reverse order
    for step_info in reversed(completed_steps):
        comp_def = step_info.get("compensation")
        if not comp_def:
            logger.info(f"Saga '{saga_id}': Step '{step_info['name']}' has no compensation. Skipping.")
            continue

        comp_task = celery_app.signature(comp_def["task"])
        comp_args = _render_args(
            comp_def.get("args_template", {}),
            context
        )
        
        # Bind the rendered arguments to the compensation signature. If a
        # compensation itself fails, the chord callback below fails with a
        # ChordError -- an event that must surface as a high-priority alert.
        comp_task.kwargs.update(comp_args)
        compensation_signatures.append(comp_task)

    if not compensation_signatures:
        logger.warning(f"Saga '{saga_id}': No compensation tasks to execute.")
        return

    # A chord is used here to execute all compensations in parallel
    # and then call a final task once all are complete.
    # In some scenarios, sequential compensation might be required.
    callback = log_compensation_completion.s(saga_id=saga_id)
    chord(compensation_signatures)(callback)


@shared_task
def log_compensation_completion(results, saga_id):
    logger.info(f"Saga '{saga_id}': All compensation tasks have been dispatched. Results: {results}")
    # In a production system, this task would update the saga's status
    # to 'COMPENSATED' and trigger alerts if any compensation failed.

A critical pitfall here is state management. The saga_state in the example above is stored in the task’s request context, which is volatile. A production-grade implementation would persist this state to a database or a key-value store like Redis after each successful step. This ensures that if the orchestrator pod dies and Celery restarts the task, it can resume from the last completed step instead of starting over.
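
A minimal sketch of that persistence layer, assuming a Redis client and a saga:{saga_id} key convention (both hypothetical):

# project/sagas/state.py -- sketch: durable saga state in Redis
import json
import redis

r = redis.Redis(host="redis", port=6379, decode_responses=True)

def save_saga_state(saga_id, saga_state, ttl_seconds=86400):
    """Persist the saga state after every successful forward step."""
    r.set(f"saga:{saga_id}", json.dumps(saga_state), ex=ttl_seconds)

def load_saga_state(saga_id):
    """Return the persisted state, or None for a fresh saga."""
    raw = r.get(f"saga:{saga_id}")
    return json.loads(raw) if raw else None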

Oracle Interaction Tasks: Idempotency and Connection Management

The real work happens in the individual tasks that interact with the Oracle database. These tasks must be written defensively. A common mistake is to assume a task will only ever run once. Out of the box, Celery acknowledges a message just before execution (at-most-once); for reliability you enable acks_late, which gives "at-least-once" delivery: a task can execute more than once if a worker dies after doing the work but before acknowledging the message, or if the broker redelivers it after a visibility timeout. Therefore, every database-mutating task must be idempotent.
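
Opting into at-least-once semantics is a small configuration change (a sketch against our celery_app object):

# project/app.py -- reliability-oriented Celery settings (sketch)
celery_app.conf.update(
    task_acks_late=True,              # ack only after the task finishes
    task_reject_on_worker_lost=True,  # requeue work if a worker dies mid-task
)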

For our reserve_inventory_in_oracle task, we can’t just run UPDATE products SET stock = stock - 1. If that runs twice, we’ve double-counted the reservation. Instead, we need a way to track that this specific action for this order has already been performed. A common pattern is to use a separate “processed messages” or “idempotency key” table.

-- DDL for idempotency tracking in Oracle
CREATE TABLE SAGA_PROCESSED_TASKS (
    task_id VARCHAR2(255) PRIMARY KEY,
    saga_id VARCHAR2(255) NOT NULL,
    task_name VARCHAR2(255) NOT NULL,
    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

The task code must then handle connections and transactions carefully. Each task should manage its own database transaction.

# project/tasks/inventory.py
import logging
import oracledb
from celery import shared_task
from contextlib import contextmanager

from project.config import ORACLE_DSN, ORACLE_USER, ORACLE_PASSWORD

logger = logging.getLogger(__name__)

# A connection pool is essential for performance in a multi-worker environment.
# This should be initialized once when the Celery worker process starts.
try:
    pool = oracledb.create_pool(user=ORACLE_USER, password=ORACLE_PASSWORD, dsn=ORACLE_DSN, min=2, max=5, increment=1)
    logger.info("Oracle connection pool created successfully.")
except Exception as e:
    logger.error(f"Failed to create Oracle connection pool: {e}")
    pool = None

@contextmanager
def oracle_connection():
    """Provides a managed Oracle database connection from the pool."""
    if not pool:
        raise ConnectionError("Oracle connection pool is not available.")
    
    connection = None
    try:
        connection = pool.acquire()
        yield connection
    finally:
        if connection:
            pool.release(connection)

def is_task_processed(cursor, task_id, saga_id, task_name):
    """Checks if a task has already been processed using the idempotency table."""
    cursor.execute("SELECT 1 FROM SAGA_PROCESSED_TASKS WHERE task_id = :task_id", task_id=task_id)
    return cursor.fetchone() is not None

def mark_task_as_processed(cursor, task_id, saga_id, task_name):
    """Marks a task as processed in the idempotency table."""
    cursor.execute(
        """
        INSERT INTO SAGA_PROCESSED_TASKS (task_id, saga_id, task_name)
        VALUES (:task_id, :saga_id, :task_name)
        """,
        task_id=task_id, saga_id=saga_id, task_name=task_name
    )

@shared_task(bind=True)
def reserve_inventory_in_oracle(self, order_id, items):
    """
    Idempotent task to reserve inventory in the Oracle database.
    """
    with oracle_connection() as connection:
        with connection.cursor() as cursor:
            # python-oracledb opens the transaction implicitly on the first
            # statement; only commit and rollback need to be explicit.
            try:
                # 1. Idempotency Check
                if is_task_processed(cursor, self.request.id, self.request.root_id, self.name):
                    logger.warning(f"Task {self.request.id} for order {order_id} already processed. Skipping.")
                    connection.rollback()  # nothing to undo for a read-only check
                    return {"status": "SKIPPED", "reason": "already_processed"}
                
                # 2. Business Logic
                for item in items:
                    # This is a simplified example. A real implementation would check
                    # for sufficient stock before updating.
                    cursor.execute(
                        """
                        UPDATE INVENTORY
                        SET reserved_stock = reserved_stock + :quantity
                        WHERE product_id = :product_id AND (total_stock - reserved_stock) >= :quantity
                        """,
                        quantity=item['quantity'],
                        product_id=item['product_id']
                    )
                    if cursor.rowcount == 0:
                        raise ValueError(f"Insufficient stock for product {item['product_id']}")

                # 3. Log idempotency key
                mark_task_as_processed(cursor, self.request.id, self.request.root_id, self.name)

                # 4. Commit transaction
                connection.commit()
                logger.info(f"Successfully reserved inventory for order {order_id}.")
                return {"status": "SUCCESS"}

            except Exception as e:
                logger.error(f"Failed to reserve inventory for order {order_id}: {e}")
                connection.rollback()
                # Re-raise to let Celery and the saga orchestrator handle the failure
                raise

@shared_task(bind=True)
def release_inventory_in_oracle(self, order_id, reason):
    """
    Idempotent compensation task to release reserved inventory.
    """
    with oracle_connection() as connection:
        with connection.cursor() as cursor:
            # Transaction begins implicitly with the first statement.
            try:
                if is_task_processed(cursor, self.request.id, self.request.root_id, self.name):
                    logger.warning(f"Compensation task {self.request.id} already processed. Skipping.")
                    connection.rollback()
                    return {"status": "SKIPPED"}
                
                # This logic assumes a previous reservation was successful.
                # It needs to find what was reserved for the given order_id.
                # For simplicity, we assume another table tracks reservations per order.
                # Oracle has no UPDATE ... FROM; a correlated subquery
                # against the reservation table is used instead.
                cursor.execute(
                    """
                    UPDATE INVENTORY i
                    SET i.reserved_stock = i.reserved_stock - (
                        SELECT r.quantity FROM ORDER_RESERVATIONS r
                        WHERE r.order_id = :order_id AND r.product_id = i.product_id)
                    WHERE EXISTS (
                        SELECT 1 FROM ORDER_RESERVATIONS r
                        WHERE r.order_id = :order_id AND r.product_id = i.product_id)
                    """,
                    order_id=order_id
                )
                
                mark_task_as_processed(cursor, self.request.id, self.request.root_id, self.name)
                connection.commit()
                logger.info(f"Successfully released inventory for order {order_id}. Reason: {reason}")
                return {"status": "SUCCESS"}
            except Exception as e:
                logger.error(f"Compensation for order {order_id} failed: {e}")
                connection.rollback()
                # A failed compensation is a critical error that requires manual intervention.
                # This should trigger a high-priority alert.
                raise

This code illustrates several production-grade practices: using a connection pool, explicit commit/rollback around each unit of work (python-oracledb starts transactions implicitly, so no explicit begin is needed), and a robust idempotency check performed inside the same transaction as the business logic.
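
One caveat with the module-level pool above: under Celery's default prefork worker, modules are imported in the parent process, and connections created before the fork are not safe to share across child processes. A safer variant (a sketch using Celery's worker_process_init signal) builds the pool once per worker process:

# project/tasks/db.py -- sketch: create the pool after fork, per worker process
import oracledb
from celery.signals import worker_process_init

from project.config import ORACLE_DSN, ORACLE_USER, ORACLE_PASSWORD

pool = None

@worker_process_init.connect
def init_oracle_pool(**kwargs):
    """Runs in each worker child process right after it starts."""
    global pool
    pool = oracledb.create_pool(
        user=ORACLE_USER, password=ORACLE_PASSWORD, dsn=ORACLE_DSN,
        min=2, max=5, increment=1,
    )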

Deployment on AWS EKS

Running this system on EKS requires careful configuration of the Celery workers, the message broker (e.g., RabbitMQ), and access to the Oracle database.

Architecture on EKS

graph TD
    subgraph AWS Cloud
        subgraph EKS Cluster
            A[API Service Deployment] -->|Triggers Saga| B(RabbitMQ / Redis)
            B --> C{Celery Worker Deployment}
        end
    end
    subgraph On-Premises Data Center
        D[(Oracle DB)]
    end
    C -->|oracledb client via VPC Peering / Direct Connect| D

    style A fill:#cde4ff
    style C fill:#cde4ff
    style B fill:#ffe4b5
    style D fill:#f9f,stroke:#333,stroke-width:2px

The Celery worker is containerized. The python-oracledb driver runs in a client-less "thin" mode by default, but features that require "thick" mode (enabled with oracledb.init_oracle_client()) depend on the Oracle Instant Client libraries, so the Dockerfile installs them.

# Use an official Python base image
FROM python:3.9-slim

# Set environment variables for Oracle Instant Client
ENV LD_LIBRARY_PATH=/opt/oracle/instantclient_21_5
ENV ORACLE_HOME=/opt/oracle/instantclient_21_5

# Download and install Oracle Instant Client
# A real-world Dockerfile would pull this from an internal artifact repository
# for reliability and security.
WORKDIR /tmp
ADD https://download.oracle.com/otn_software/linux/instantclient/215000/instantclient-basic-linux.x64-21.5.0.0.0dbru.zip .
RUN apt-get update && apt-get install -y libaio1 unzip && \
    unzip instantclient-basic-linux.x64-21.5.0.0.0dbru.zip && \
    mkdir -p /opt/oracle && \
    mv instantclient_21_5 /opt/oracle/ && \
    rm -f instantclient-basic-linux.x64-21.5.0.0.0dbru.zip && \
    apt-get purge -y --auto-remove unzip && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Command to run the Celery worker
CMD ["celery", "-A", "project.app", "worker", "-l", "info", "-c", "4"]

The Kubernetes deployment manifest for the workers must inject the Oracle credentials and broker URL securely using Kubernetes Secrets and ConfigMaps.

# celery-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-saga-worker
  labels:
    app: celery-saga-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: celery-saga-worker
  template:
    metadata:
      labels:
        app: celery-saga-worker
    spec:
      containers:
      - name: worker
        image: your-repo/celery-oracle-worker:latest
        env:
          - name: CELERY_BROKER_URL
            valueFrom:
              secretKeyRef:
                name: celery-secrets
                key: broker-url
          - name: ORACLE_USER
            valueFrom:
              secretKeyRef:
                name: oracle-credentials
                key: username
          - name: ORACLE_PASSWORD
            valueFrom:
              secretKeyRef:
                name: oracle-credentials
                key: password
          - name: ORACLE_DSN
            valueFrom:
              configMapKeyRef:
                name: oracle-config
                key: dsn
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "1"
            memory: "2Gi"

This setup provides a scalable and resilient execution environment. EKS automatically replaces failed worker pods, and the worker count can be scaled on queue depth, either with the Horizontal Pod Autoscaler (HPA) backed by an external metrics adapter or, more simply, with an event-driven autoscaler such as KEDA.
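
A sketch of queue-based scaling with KEDA (the resource names and thresholds are illustrative):

# keda-scaledobject.yaml -- scale workers on RabbitMQ queue depth (sketch)
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: celery-saga-worker-scaler
spec:
  scaleTargetRef:
    name: celery-saga-worker         # the Deployment defined above
  minReplicaCount: 3
  maxReplicaCount: 20
  triggers:
    - type: rabbitmq
      metadata:
        queueName: celery            # default Celery queue name
        mode: QueueLength
        value: "50"                  # target messages per replica
      authenticationRef:
        name: rabbitmq-trigger-auth  # TriggerAuthentication holding the AMQP URL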

The final architecture successfully decomposes the monolithic PL/SQL procedure into a managed, observable, and scalable workflow. Each step is an independent, idempotent task. The saga orchestrator provides the transactional guarantees, ensuring that a failure in one step triggers a clean rollback of all previously completed work. We have replaced a brittle, tightly-coupled system with a resilient, distributed one that can evolve with business needs.

This saga orchestration model is not a silver bullet. It introduces eventual consistency: during a failure and its compensation there is a window in which the system state is inconsistent, and that trade-off must be acceptable for the business process. A compensation task can itself fail, a risk that must be mitigated with robust monitoring and alerting, since a failed compensation generally demands manual intervention. Observability of a distributed saga is also non-trivial; tracing a single order_id through a dozen task executions requires a distributed tracing solution like OpenTelemetry to be truly effective. Future iterations will focus on persisting the saga state to a durable store and integrating comprehensive tracing.
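
As a starting point for that tracing work, the OpenTelemetry Celery instrumentation can be hooked into each worker process (a sketch; it assumes opentelemetry-instrumentation-celery is installed and an exporter is configured elsewhere):

# project/tracing.py -- sketch: instrument Celery workers with OpenTelemetry
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor

@worker_process_init.connect(weak=False)
def init_tracing(**kwargs):
    CeleryInstrumentor().instrument()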

