Our core order processing was trapped inside a monolithic 3,000-line Oracle PL/SQL stored procedure. For years, it was the unassailable heart of the system, a black box that handled everything from inventory checks and payment processing to customer notifications. It worked, until it didn’t. Scaling was impossible, adding a new step (like calling a third-party fraud detection API) was a high-risk, month-long project, and any failure mid-execution left the database in an inconsistent state requiring hours of manual data correction by a DBA. The mandate was clear: break it apart. But doing so introduced the classic distributed systems problem—how do you maintain transactional integrity across multiple, independent services and a legacy database without a distributed transaction coordinator?
The initial concept was to decompose the monolith into a sequence of services running on AWS EKS, orchestrated by Celery tasks. This would give us scalability, independent deployments, and the ability to use the right tool for each job. However, a simple chain of Celery tasks (task1.s() | task2.s() | task3.s()) is brittle. If task3 fails, the work done by task1 and task2 remains committed. Standard database rollbacks don’t work across service boundaries. Two-phase commit (2PC) was immediately ruled out. It’s a protocol that requires holding locks across distributed resources, which would be disastrous for performance and is notoriously difficult to implement correctly in a heterogeneous environment spanning Kubernetes services and a standalone Oracle instance.
This led us directly to the Saga pattern. Instead of an atomic ACID transaction, a saga is a sequence of local transactions. Each local transaction updates the database and publishes a message or event to trigger the next one. The key is that for every forward action (e.g., ReserveInventory), there must be a corresponding compensating action (e.g., ReleaseInventory). If any step in the saga fails, the previously completed steps are compensated for by executing their compensating actions in reverse order. We chose an orchestration-based saga, where a central coordinator calls each service in turn, over a choreography-based one. For a critical business process, having a single, observable point of control for the entire workflow is far more manageable than trying to trace a cascade of events across a dozen different services. Celery, as a mature task queue system, was a natural fit for building this orchestrator.
The Saga Definition and Orchestrator
The first step was to design a declarative way to define a saga. Hard-coding the workflow into a single, massive Celery task would just recreate the monolith problem in Python. The solution was a data structure that defines the sequence of steps, their parameters, and their corresponding compensating actions.
A saga is defined as a list of steps. Each step is a dictionary containing the forward action task signature and the compensation action task signature.
# project/sagas/definitions.py
# A declarative definition for a complex order processing saga.
# Each step includes the forward task, its arguments, the compensating task,
# and the arguments for compensation.
ORDER_PROCESSING_SAGA = [
{
"name": "VALIDATE_PAYMENT_METHOD",
"forward": {
"task": "project.tasks.payments.validate_payment_method",
# Placeholders like {{ order_id }} will be resolved at runtime
"args_template": {
"order_id": "{{ order_id }}",
"customer_id": "{{ customer_id }}",
}
},
# No compensation needed for a read-only validation
"compensation": None
},
{
"name": "RESERVE_INVENTORY",
"forward": {
"task": "project.tasks.inventory.reserve_inventory_in_oracle",
"args_template": {
"order_id": "{{ order_id }}",
"items": "{{ items }}",
}
},
"compensation": {
"task": "project.tasks.inventory.release_inventory_in_oracle",
"args_template": {
"order_id": "{{ order_id }}",
"reason": "Saga failure at step: {{ failed_step_name }}"
}
}
},
{
"name": "PROCESS_PAYMENT",
"forward": {
"task": "project.tasks.payments.process_external_payment",
"args_template": {
"order_id": "{{ order_id }}",
"amount": "{{ amount }}",
"payment_token": "{{ payment_token }}"
}
},
"compensation": {
"task": "project.tasks.payments.refund_payment",
"args_template": {
"order_id": "{{ order_id }}",
"transaction_id": "{{ forward_result.transaction_id }}",
"reason": "Saga failure at step: {{ failed_step_name }}"
}
}
},
{
"name": "CREATE_SHIPMENT_RECORD",
"forward": {
"task": "project.tasks.shipping.create_shipment_in_oracle",
"args_template": {
"order_id": "{{ order_id }}",
"shipping_address": "{{ shipping_address }}"
}
},
"compensation": {
"task": "project.tasks.shipping.cancel_shipment_in_oracle",
"args_template": {
"order_id": "{{ order_id }}",
"reason": "Saga failure"
}
}
}
]
With this definition, the orchestrator becomes a generic Celery task that interprets the structure. It doesn’t know or care about inventory or payments; it only knows how to execute a sequence of tasks and their compensations.
The core of the system is the run_saga task. It maintains the state of the saga’s execution, including the results of each successful forward step. If a step fails, this state is used to execute the necessary compensations.
# project/sagas/orchestrator.py
import logging
import re

from celery import shared_task, chord
from jinja2 import Template

from project.app import celery_app

logger = logging.getLogger(__name__)

# A value that is exactly one placeholder (e.g. "{{ items }}") is resolved to
# the underlying context object so lists and dicts keep their Python types;
# any other string is rendered through Jinja2 and stays a string.
_WHOLE_VALUE_RE = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")

def _render_args(args_template, context):
    """Renders Jinja2 templates in task arguments."""
    if not args_template:
        return {}
    rendered_args = {}
    for key, value in args_template.items():
        if isinstance(value, str):
            match = _WHOLE_VALUE_RE.match(value)
            if match and match.group(1) in context:
                # Whole-value placeholder: pass the object through untouched
                rendered_args[key] = context[match.group(1)]
            else:
                rendered_args[key] = Template(value).render(context)
        else:
            # Pass through non-string values (like lists or dicts)
            rendered_args[key] = value
    return rendered_args
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def run_saga(self, saga_definition, initial_context):
"""
Orchestrates a saga defined by a list of steps.
This task is the heart of the compensating transaction framework. It
maintains a log of completed steps. If any forward task fails, it uses
this log to trigger compensating tasks in reverse order.
:param self: Celery task instance.
:param saga_definition: A list of dicts defining the saga steps.
:param initial_context: A dict with the initial data for the saga.
"""
# saga_state stores the history of completed forward tasks and their results
# In a real-world project, this state should be persisted to a durable
# store (e.g., Redis, DynamoDB, or an Oracle table) to survive worker restarts.
# For this implementation, we store it within the task's request context,
# which is risky but sufficient for demonstration.
if 'saga_state' not in self.request.kwargs:
self.request.kwargs['saga_state'] = {
"completed_steps": [],
"context": initial_context.copy()
}
saga_state = self.request.kwargs['saga_state']
start_index = len(saga_state["completed_steps"])
for i in range(start_index, len(saga_definition)):
step = saga_definition[i]
step_name = step.get("name", f"step_{i}")
logger.info(f"Saga '{self.request.id}': Executing step '{step_name}'")
try:
forward_task_def = step["forward"]
forward_task = celery_app.signature(forward_task_def["task"])
# Dynamically build task arguments from the current context
forward_args = _render_args(
forward_task_def.get("args_template", {}),
saga_state["context"]
)
            # Execute the forward task synchronously to handle failures
            # immediately. Celery blocks .get() inside a task by default
            # (deadlock risk if workers wait on each other), so we opt out
            # explicitly; the orchestrator should run on its own queue so it
            # never starves the worker pool.
            result = forward_task.apply_async(kwargs=forward_args).get(
                propagate=True, disable_sync_subtasks=False
            )
logger.info(f"Saga '{self.request.id}': Step '{step_name}' completed successfully.")
# Update context with the result of the forward task
saga_state["context"][f"forward_result_{i}"] = result
saga_state["context"]["forward_result"] = result # for easy access to last result
# Record successful completion
saga_state["completed_steps"].append({
"index": i,
"name": step_name,
"compensation": step.get("compensation")
})
except Exception as e:
logger.error(
f"Saga '{self.request.id}': Step '{step_name}' failed. Error: {e}. "
"Initiating compensation."
)
# Add context about the failure for compensation tasks
saga_state["context"]["failed_step_name"] = step_name
saga_state["context"]["failure_reason"] = str(e)
trigger_compensations.delay(
saga_id=self.request.id,
completed_steps=saga_state["completed_steps"],
context=saga_state["context"]
)
# We must re-raise the exception to signal the failure to Celery
# and potentially trigger retries of the orchestrator itself if desired.
raise
logger.info(f"Saga '{self.request.id}': All steps completed successfully.")
return {"status": "SUCCESS", "final_context": saga_state["context"]}
@shared_task
def trigger_compensations(saga_id, completed_steps, context):
"""
Creates and executes a group of compensating tasks in reverse order.
"""
logger.warning(f"Saga '{saga_id}': Triggering compensations for {len(completed_steps)} steps.")
compensation_signatures = []
# Iterate in reverse order
for step_info in reversed(completed_steps):
comp_def = step_info.get("compensation")
if not comp_def:
logger.info(f"Saga '{saga_id}': Step '{step_info['name']}' has no compensation. Skipping.")
continue
comp_task = celery_app.signature(comp_def["task"])
comp_args = _render_args(
comp_def.get("args_template", {}),
context
)
        # Bind the rendered arguments to the signature. A link_error callback
        # could be attached here so a failed rollback raises an alert.
        comp_task.kwargs.update(comp_args)
compensation_signatures.append(comp_task)
if not compensation_signatures:
logger.warning(f"Saga '{saga_id}': No compensation tasks to execute.")
return
# A chord is used here to execute all compensations in parallel
# and then call a final task once all are complete.
# In some scenarios, sequential compensation might be required.
callback = log_compensation_completion.s(saga_id=saga_id)
chord(compensation_signatures)(callback)
@shared_task
def log_compensation_completion(results, saga_id):
    logger.info(f"Saga '{saga_id}': All compensation tasks have completed. Results: {results}")
    # In a production system, this task would update the saga's status to
    # 'COMPENSATED' and raise alerts; note that with a chord, this callback
    # only runs if every compensation in the header succeeded.
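With the orchestrator and compensation plumbing in place, starting a saga from the calling service is a single dispatch. A minimal sketch (the submit_order helper and its field names are illustrative, not our actual API):

# Example: triggering the order saga (illustrative caller, assumed field names)
from project.sagas.definitions import ORDER_PROCESSING_SAGA
from project.sagas.orchestrator import run_saga

def submit_order(order):
    # The initial context seeds the {{ ... }} placeholders in the definition
    initial_context = {
        "order_id": order["id"],
        "customer_id": order["customer_id"],
        "items": order["items"],
        "amount": order["total"],
        "payment_token": order["payment_token"],
        "shipping_address": order["shipping_address"],
    }
    result = run_saga.delay(ORDER_PROCESSING_SAGA, initial_context)
    return result.id  # the orchestrator task id doubles as the saga id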
A critical pitfall here is state management. The saga_state in the example above is stored in the task’s request context, which is volatile. A production-grade implementation would persist this state to a database or a key-value store like Redis after each successful step. This ensures that if the orchestrator pod dies and Celery restarts the task, it can resume from the last completed step instead of starting over.
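One way to close that gap is a small, durable state store keyed by saga id. A minimal sketch using Redis (the key layout and client wiring are assumptions for illustration, not a production schema):

# project/sagas/state.py
# Minimal sketch of durable saga state, assuming a reachable Redis instance.
import json

import redis

redis_client = redis.Redis(host="redis", port=6379, decode_responses=True)

def save_saga_state(saga_id, saga_state):
    # Called after every successful forward step, before moving to the next one
    redis_client.set(f"saga:{saga_id}:state", json.dumps(saga_state))

def load_saga_state(saga_id):
    # Returns the last persisted state, or None for a fresh saga
    raw = redis_client.get(f"saga:{saga_id}:state")
    return json.loads(raw) if raw else None

def clear_saga_state(saga_id):
    # Remove the record once the saga completes or is fully compensated
    redis_client.delete(f"saga:{saga_id}:state")

run_saga would call load_saga_state on entry (falling back to a fresh state), save_saga_state after each append to completed_steps, and clear_saga_state on success, which makes start_index meaningful across restarts.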
Oracle Interaction Tasks: Idempotency and Connection Management
The real work happens in the individual tasks that interact with the Oracle database. These tasks must be written defensively. A common mistake is to assume a task will only ever run once. Celery delivery is effectively at-least-once when acks_late is enabled (and broker redelivery after a dropped connection can duplicate messages even without it): a worker can finish the work but die before its acknowledgement reaches the broker, which then hands the message to another worker. Therefore, every database-mutating task must be idempotent.
For our reserve_inventory_in_oracle task, we can’t just run UPDATE products SET stock = stock - 1. If that runs twice, we’ve double-counted the reservation. Instead, we need a way to track that this specific action for this order has already been performed. A common pattern is to use a separate “processed messages” or “idempotency key” table.
-- DDL for idempotency tracking in Oracle
CREATE TABLE SAGA_PROCESSED_TASKS (
task_id VARCHAR2(255) PRIMARY KEY,
saga_id VARCHAR2(255) NOT NULL,
task_name VARCHAR2(255) NOT NULL,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
The task code must then handle connections and transactions carefully. Each task should manage its own database transaction.
# project/tasks/inventory.py
import logging
import oracledb
from celery import shared_task
from contextlib import contextmanager
from project.config import ORACLE_DSN, ORACLE_USER, ORACLE_PASSWORD
logger = logging.getLogger(__name__)
# A connection pool is essential for performance in a multi-worker environment.
# Note: with prefork workers, create the pool per child process (e.g., from the
# worker_process_init signal) rather than at import time, because forked
# children must not share the parent's sockets. Module level is kept here
# for brevity.
try:
    pool = oracledb.create_pool(
        user=ORACLE_USER, password=ORACLE_PASSWORD, dsn=ORACLE_DSN,
        min=2, max=5, increment=1
    )
    logger.info("Oracle connection pool created successfully.")
except Exception as e:
    logger.error(f"Failed to create Oracle connection pool: {e}")
    pool = None
@contextmanager
def oracle_connection():
"""Provides a managed Oracle database connection from the pool."""
if not pool:
raise ConnectionError("Oracle connection pool is not available.")
connection = None
try:
connection = pool.acquire()
yield connection
finally:
if connection:
pool.release(connection)
def is_task_processed(cursor, task_id, saga_id, task_name):
"""Checks if a task has already been processed using the idempotency table."""
cursor.execute("SELECT 1 FROM SAGA_PROCESSED_TASKS WHERE task_id = :task_id", task_id=task_id)
return cursor.fetchone() is not None
def mark_task_as_processed(cursor, task_id, saga_id, task_name):
"""Marks a task as processed in the idempotency table."""
cursor.execute(
"""
INSERT INTO SAGA_PROCESSED_TASKS (task_id, saga_id, task_name)
VALUES (:task_id, :saga_id, :task_name)
""",
task_id=task_id, saga_id=saga_id, task_name=task_name
)
@shared_task(bind=True)
def reserve_inventory_in_oracle(self, order_id, items):
    """
    Idempotent task to reserve inventory in the Oracle database.
    """
    with oracle_connection() as connection:
        with connection.cursor() as cursor:
            # python-oracledb runs with autocommit off, so a transaction starts
            # implicitly with the first statement; there is no explicit begin()
            # for local transactions.
            try:
                # 1. Idempotency Check
                if is_task_processed(cursor, self.request.id, self.request.root_id, self.name):
                    logger.warning(f"Task {self.request.id} for order {order_id} already processed. Skipping.")
                    connection.rollback()  # end the read-only transaction cleanly
                    return {"status": "SKIPPED", "reason": "already_processed"}
                # 2. Business Logic: reserve stock only where enough is available
                for item in items:
                    cursor.execute(
                        """
                        UPDATE INVENTORY
                        SET reserved_stock = reserved_stock + :quantity
                        WHERE product_id = :product_id
                          AND (total_stock - reserved_stock) >= :quantity
                        """,
                        quantity=item['quantity'],
                        product_id=item['product_id']
                    )
                    if cursor.rowcount == 0:
                        raise ValueError(f"Insufficient stock for product {item['product_id']}")
                    # Record what was reserved so the compensation task can undo
                    # it. (ORDER_RESERVATIONS is assumed to have columns
                    # order_id, product_id, quantity.)
                    cursor.execute(
                        """
                        INSERT INTO ORDER_RESERVATIONS (order_id, product_id, quantity)
                        VALUES (:order_id, :product_id, :quantity)
                        """,
                        order_id=order_id,
                        product_id=item['product_id'],
                        quantity=item['quantity']
                    )
                # 3. Log idempotency key in the same transaction
                mark_task_as_processed(cursor, self.request.id, self.request.root_id, self.name)
                # 4. Commit the reservations, the reservation log, and the
                #    idempotency key atomically
                connection.commit()
                logger.info(f"Successfully reserved inventory for order {order_id}.")
                return {"status": "SUCCESS"}
            except Exception as e:
                logger.error(f"Failed to reserve inventory for order {order_id}: {e}")
                connection.rollback()
                # Re-raise to let Celery and the saga orchestrator handle the failure
                raise
@shared_task(bind=True)
def release_inventory_in_oracle(self, order_id, reason):
    """
    Idempotent compensation task to release reserved inventory.
    """
    with oracle_connection() as connection:
        with connection.cursor() as cursor:
            try:
                if is_task_processed(cursor, self.request.id, self.request.root_id, self.name):
                    logger.warning(f"Compensation task {self.request.id} already processed. Skipping.")
                    connection.rollback()
                    return {"status": "SKIPPED"}
                # Undo whatever the forward task reserved for this order, using
                # the ORDER_RESERVATIONS log it wrote. Oracle has no
                # UPDATE ... FROM syntax, so a correlated subquery is used.
                cursor.execute(
                    """
                    UPDATE INVENTORY i
                    SET i.reserved_stock = i.reserved_stock - (
                        SELECT r.quantity
                        FROM ORDER_RESERVATIONS r
                        WHERE r.order_id = :order_id
                          AND r.product_id = i.product_id
                    )
                    WHERE i.product_id IN (
                        SELECT r.product_id
                        FROM ORDER_RESERVATIONS r
                        WHERE r.order_id = :order_id
                    )
                    """,
                    order_id=order_id
                )
                mark_task_as_processed(cursor, self.request.id, self.request.root_id, self.name)
                connection.commit()
                logger.info(f"Successfully released inventory for order {order_id}. Reason: {reason}")
                return {"status": "SUCCESS"}
            except Exception as e:
                logger.error(f"Compensation for order {order_id} failed: {e}")
                connection.rollback()
                # A failed compensation is a critical error that requires manual
                # intervention and should trigger a high-priority alert.
                raise
This code illustrates several production-grade practices: using a connection pool, explicit transaction control (python-oracledb leaves autocommit off, so every task ends with an explicit connection.commit() or connection.rollback()), and an idempotency check that lives in the same transaction as the business logic.
Deployment on AWS EKS
Running this system on EKS requires careful configuration of the Celery workers, the message broker (e.g., RabbitMQ), and access to the Oracle database.
Architecture on EKS
graph TD
    subgraph AWS Cloud
        subgraph EKS Cluster
            A[API Service Deployment] -->|Triggers Saga| B(RabbitMQ / Redis)
            B --> C{Celery Worker Deployment}
            C -->|oracledb client| D[Oracle DB]
        end
        D -- VPC Peering / Direct Connect --> E[On-Premises Data Center]
    end
    style A fill:#cde4ff
    style C fill:#cde4ff
    style B fill:#ffe4b5
    style D fill:#f9f,stroke:#333,stroke-width:2px
The Celery worker is containerized. The Dockerfile must install the Oracle Instant Client libraries, which the oracledb driver needs when running in thick mode (its default thin mode connects without any client libraries, but thick mode is required for some Oracle features). The worker must also opt into thick mode at startup, shown after the Dockerfile below.
# Use an official Python base image
FROM python:3.9-slim
# Set environment variables for Oracle Instant Client
ENV LD_LIBRARY_PATH=/opt/oracle/instantclient_21_5
ENV ORACLE_HOME=/opt/oracle/instantclient_21_5
# Download and install Oracle Instant Client
# A real-world Dockerfile would pull this from an internal artifact repository
# for reliability and security.
WORKDIR /tmp
ADD https://download.oracle.com/otn_software/linux/instantclient/215000/instantclient-basic-linux.x64-21.5.0.0.0dbru.zip .
RUN apt-get update && apt-get install -y libaio1 unzip && \
unzip instantclient-basic-linux.x64-21.5.0.0.0dbru.zip && \
mkdir -p /opt/oracle && \
mv instantclient_21_5 /opt/oracle/ && \
rm -f instantclient-basic-linux.x64-21.5.0.0.0dbru.zip && \
apt-get purge -y --auto-remove unzip && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Command to run the Celery worker
CMD ["celery", "-A", "project.app", "worker", "-l", "info", "-c", "4"]
The Kubernetes deployment manifest for the workers must inject the Oracle credentials and broker URL securely using Kubernetes Secrets and ConfigMaps.
# celery-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-saga-worker
labels:
app: celery-saga-worker
spec:
replicas: 3
selector:
matchLabels:
app: celery-saga-worker
template:
metadata:
labels:
app: celery-saga-worker
spec:
containers:
- name: worker
image: your-repo/celery-oracle-worker:latest
env:
- name: CELERY_BROKER_URL
valueFrom:
secretKeyRef:
name: celery-secrets
key: broker-url
- name: ORACLE_USER
valueFrom:
secretKeyRef:
name: oracle-credentials
key: username
- name: ORACLE_PASSWORD
valueFrom:
secretKeyRef:
name: oracle-credentials
key: password
- name: ORACLE_DSN
valueFrom:
configMapKeyRef:
name: oracle-config
key: dsn
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "1"
memory: "2Gi"
This setup provides a scalable and resilient execution environment. EKS automatically replaces failed worker pods, and the Horizontal Pod Autoscaler (HPA) can scale the number of workers on queue depth. Queue length is not a built-in HPA metric, though: it has to be exposed as a custom or external metric, for example via KEDA or the Prometheus adapter.
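Feeding that metric is straightforward when Redis is the broker, since the Redis transport stores each Celery queue as a plain Redis list. A minimal sketch of a depth probe an exporter could poll (queue and host names are assumptions; a RabbitMQ broker would use its management API instead):

# queue_depth.py
# Minimal queue-depth probe, assuming Redis as the Celery broker. With the
# Redis transport, each queue is a Redis list named after the queue, so LLEN
# returns the number of waiting messages.
import redis

def celery_queue_depth(queue_name="celery", host="redis", port=6379):
    client = redis.Redis(host=host, port=port)
    return client.llen(queue_name)

if __name__ == "__main__":
    print(celery_queue_depth())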
The final architecture successfully decomposes the monolithic PL/SQL procedure into a managed, observable, and scalable workflow. Each step is an independent, idempotent task. The saga orchestrator provides the transactional guarantees, ensuring that a failure in one step triggers a clean rollback of all previously completed work. We have replaced a brittle, tightly-coupled system with a resilient, distributed one that can evolve with business needs.
This saga orchestration model is not a silver bullet. It introduces eventual consistency, which means there’s a window of time during a failure and compensation where the system state is inconsistent. This trade-off must be acceptable for the business process. Furthermore, the possibility of a compensation task failing is a real risk that must be mitigated with robust monitoring and alerting, as it will always require manual intervention. The observability of a distributed saga can also be complex; tracing a single order_id through a dozen task executions requires a distributed tracing solution like OpenTelemetry to be truly effective. Future iterations will focus on persisting the saga state to a durable store and integrating comprehensive tracing.
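For the tracing piece, OpenTelemetry ships a Celery instrumentor that turns each task execution into a span and propagates trace context through message headers. A minimal sketch of wiring it into every worker process (exporter setup omitted; assumes the opentelemetry-instrumentation-celery package is installed):

# project/tracing.py
# Instrument Celery with OpenTelemetry so a saga's order_id can be followed
# across every task execution as one distributed trace.
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor

@worker_process_init.connect(weak=False)
def init_tracing(*args, **kwargs):
    # Prefork workers fork after import, so instrument in each child process
    CeleryInstrumentor().instrument()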