The core problem was a familiar one: a monolithic Oracle database, hammered by thousands of transactions per second, held the keys to critical real-time business decisions. The business intelligence team needed to see aggregated transaction patterns as they happened, not 15 minutes later from a batch ETL job. Direct queries against the OLTP instance were strictly forbidden; a single inefficient query could cascade into a production outage. Our mandate was clear: build a low-latency pipeline to stream changes out of this database, perform continuous aggregations, and push the results to a live dashboard, all without touching the primary application’s performance profile.
Our first attempt was naive but necessary to establish a baseline. We built a simple Python service that polled a high-volume TRANSACTIONS table every two seconds, querying for rows with a CREATED_AT timestamp greater than the last check. In a low-volume dev environment, it worked. In staging, under a simulated load, it collapsed. The constant, high-frequency queries on a non-covering index created significant database CPU pressure. Adding a covering index helped the query but introduced write overhead on the very table we were trying not to impact. The fundamental flaw was the pull-based model; we were asking the database “anything new?” 1,800 times an hour, whether or not anything had changed.
The second iteration involved database triggers. We created an AFTER INSERT trigger on the TRANSACTIONS table that copied the new row into a dedicated TRANSACTION_EVENTS queue table. This decoupled the read from the primary table; our Python service now polled the queue table, processed new events, and deleted them. This was an improvement, but it introduced a new set of problems. In a real-world project, every trigger adds transactional overhead and increases lock contention. Under heavy insert load, we saw a measurable increase in commit times on the primary table. Furthermore, the queue table itself became a point of contention. We were essentially shifting the bottleneck, not eliminating it. The DBAs vetoed this approach for production deployment, and rightly so.
This led to the architecture that ultimately worked. We needed a push-based mechanism from Oracle, but the cost and operational complexity of a full-blown GoldenGate or a Debezium-Kafka setup were prohibitive for this specific use case. The solution was Oracle’s lesser-known but powerful Continuous Query Notification (CQN) feature. It allows a client to register a query with the database and receive a notification when the data underlying that query changes. For the distributed processing, our analytics stack was already built on Dask, making it the natural choice. For the final delivery, WebSockets provided the necessary real-time push to clients. The resulting system is a chain of three distinct, asynchronous services: an Oracle Listener, a Dask computation cluster, and a WebSocket broadcast hub.
graph TD
    subgraph OracleDB["Oracle Database"]
        A[OLTP Transactions Table] -- INSERT/UPDATE --> B{Oracle Notification Service};
    end
    subgraph ListenerSvc["Listener Service"]
        B -- 1. Notification --> C[Python CQN Listener];
        C -- 2. Fetch Changed Data --> A;
        C -- 3. Push to Queue --> D[Dask Distributed Queue];
    end
    subgraph DaskCluster["Dask Cluster"]
        D -- 4. Data Batch --> E[Dask Workers];
        E -- 5. Partial Aggregation --> F[Dask Aggregator Actor];
    end
    subgraph WebSocketHub["WebSocket Hub"]
        F -- 6. Push Updated State --> G[WebSocket Server];
    end
    H[Browser Clients] -- WebSocket Connection --> G;
    G -- 7. Broadcast Results --> H;
    style ListenerSvc fill:#f9f,stroke:#333,stroke-width:2px
    style DaskCluster fill:#ccf,stroke:#333,stroke-width:2px
    style WebSocketHub fill:#cfc,stroke:#333,stroke-width:2px
The Oracle CQN Listener
The listener is the heart of the change data capture mechanism. It’s a standalone Python service whose only job is to maintain a persistent connection to Oracle, subscribe to notifications, and forward change data into the Dask ecosystem. The implementation details here are critical, as this component must be exceptionally resilient to network partitions and database failures.
A common mistake when working with CQN is misunderstanding what it provides. The notification itself is minimalistic; it essentially just tells you that something in the result set of your registered query has changed. It does not provide the changed data itself. The listener must then re-execute a query to fetch the actual delta.
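The notify-then-fetch pattern can be illustrated without an Oracle instance. The following is a minimal sketch in which `sqlite3` stands in for the Oracle connection; the table layout and the `fetch_delta` helper are assumptions made for illustration, not part of the listener itself. The point is that the notification carries no rows, so the delta has to be recovered by comparing against a stored high-water mark.

```python
import sqlite3


def fetch_delta(conn, last_seen_id):
    """Return rows with an ID above the watermark, plus the new watermark."""
    rows = conn.execute(
        "SELECT transaction_id, amount FROM transactions "
        "WHERE transaction_id > ? ORDER BY transaction_id",
        (last_seen_id,),
    ).fetchall()
    # Leave the watermark unchanged when nothing new was found.
    new_watermark = rows[-1][0] if rows else last_seen_id
    return rows, new_watermark


def demo():
    conn = sqlite3.connect(":memory:")  # stand-in for the Oracle connection
    conn.execute(
        "CREATE TABLE transactions (transaction_id INTEGER PRIMARY KEY, amount REAL)"
    )
    conn.executemany("INSERT INTO transactions VALUES (?, ?)", [(1, 10.0), (2, 20.0)])
    watermark = 2  # rows 1 and 2 have already been processed

    # New rows are committed; a notification would arrive at this point,
    # saying only that something changed.
    conn.executemany("INSERT INTO transactions VALUES (?, ?)", [(3, 5.0), (4, 7.5)])
    return fetch_delta(conn, watermark)
```

One caveat worth checking in any real deployment: if IDs come from a sequence, a transaction holding a lower ID can commit after one holding a higher ID, and a strict greater-than watermark would then skip the late row.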
Here is the core of the listener service. It uses cx_Oracle, asyncio for non-blocking operations, and a simple exponential backoff for reconnection logic.
# listener_service.py
import asyncio
import logging
import os

import cx_Oracle
from dask.distributed import Client, Queue
from dotenv import load_dotenv

load_dotenv()

# --- Configuration ---
# A pragmatic approach for managing configs without complex frameworks.
ORACLE_USER = os.getenv("ORACLE_USER")
ORACLE_PASSWORD = os.getenv("ORACLE_PASSWORD")
ORACLE_DSN = os.getenv("ORACLE_DSN")
DASK_SCHEDULER = os.getenv("DASK_SCHEDULER", "tcp://127.0.0.1:8786")
QUEUE_NAME = "oracle-cdc-stream"
# Bound the queue so put() blocks when consumers fall behind (backpressure).
# The value is a starting point to tune, not a measured optimum.
QUEUE_MAXSIZE = 1000
# Subscription lifetime in seconds. Oracle drops the registration once it
# expires, so a long-running listener uses 0 (never expire).
CQN_TIMEOUT_SECONDS = 0

FETCH_SQL = (
    "SELECT TRANSACTION_ID, AMOUNT, CURRENCY, PRODUCT_CATEGORY "
    "FROM REALTIME_TRANSACTIONS "
    "WHERE TRANSACTION_ID > :last_id ORDER BY TRANSACTION_ID"
)

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)


class OracleCqnListener:
    def __init__(self, dask_client, cdc_queue):
        self.dask_client = dask_client
        self.cdc_queue = cdc_queue
        self.connection = None
        self.subscription = None
        self.last_seen_tx_id = 0  # Simple state management for fetching deltas
        self.watermark_initialized = False
        self.loop = asyncio.get_running_loop()  # loop that owns the Dask client
        self.fetch_lock = asyncio.Lock()  # one fetch at a time protects the watermark
        self.reconnect_needed = asyncio.Event()

    def _query(self, sql, **binds):
        """Blocking helper: run a query and return all rows."""
        with self.connection.cursor() as cursor:
            cursor.execute(sql, **binds)
            return cursor.fetchall()

    async def connect_to_oracle(self):
        """
        Establishes and re-establishes connection to Oracle DB with exponential backoff.
        This is crucial for production stability.
        """
        retry_delay = 1
        while True:
            try:
                logging.info("Attempting to connect to Oracle...")
                # cx_Oracle is a blocking driver with no asyncio API, so its calls
                # run in a worker thread. events=True is required for CQN.
                self.connection = await asyncio.to_thread(
                    cx_Oracle.connect,
                    user=ORACLE_USER,
                    password=ORACLE_PASSWORD,
                    dsn=ORACLE_DSN,
                    threaded=True,
                    events=True,
                )
                logging.info(f"Oracle connection successful. DB Version: {self.connection.version}")
                # Only on first connect: a reconnect keeps the old watermark so
                # rows committed during the outage are still picked up.
                if not self.watermark_initialized:
                    await self.initialize_watermark()
                return
            except cx_Oracle.Error as e:
                logging.error(f"Oracle connection failed: {e}. Retrying in {retry_delay}s.")
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 60)  # Cap retry delay at 60s

    async def initialize_watermark(self):
        """Set the initial high-water mark to avoid processing stale data on startup."""
        try:
            rows = await asyncio.to_thread(
                self._query, "SELECT MAX(TRANSACTION_ID) FROM REALTIME_TRANSACTIONS"
            )
            max_id = rows[0][0] if rows else None
            self.last_seen_tx_id = max_id if max_id is not None else 0
            self.watermark_initialized = True
            logging.info(f"Initialized high-water mark to transaction ID: {self.last_seen_tx_id}")
        except cx_Oracle.Error as e:
            logging.error(f"Failed to initialize high-water mark: {e}")
            raise

    def cqn_callback(self, message):
        """
        Called by cx_Oracle when Oracle reports a data change.
        It runs in its own thread, so it must be a plain function and stay
        lightweight: it only hands the work over to the event loop.
        """
        logging.info("CQN Notification received.")
        for table in message.tables or []:
            # In a more complex scenario, you'd check table.name.
            # operation is a bitmask, so test it with & rather than ==.
            if table.operation & cx_Oracle.OPCODE_INSERT:
                logging.info("Insert operation detected. Fetching new rows.")
                asyncio.run_coroutine_threadsafe(self.fetch_and_publish_changes(), self.loop)
                break

    async def fetch_and_publish_changes(self):
        """
        Queries the database for rows newer than the last seen ID and puts them on the Dask queue.
        """
        async with self.fetch_lock:
            try:
                rows = await asyncio.to_thread(
                    self._query, FETCH_SQL, last_id=self.last_seen_tx_id
                )
                if not rows:
                    logging.warning("CQN triggered, but no new rows found. This can happen.")
                    return
                # Convert rows to a more usable format (dicts)
                new_events = [
                    {"tx_id": row[0], "amount": row[1], "currency": row[2], "category": row[3]}
                    for row in rows
                ]
                # The handoff point to the Dask cluster. With a bounded queue,
                # this put() waits while the queue is full (backpressure).
                await self.cdc_queue.put(new_events)
                # IMPORTANT: Update the watermark *after* successfully publishing.
                self.last_seen_tx_id = new_events[-1]["tx_id"]
                logging.info(f"Published {len(new_events)} events. New watermark: {self.last_seen_tx_id}")
            except cx_Oracle.Error as e:
                logging.error(f"Error fetching/publishing changes: {e}. Connection might be stale.")
                self.reconnect_needed.set()  # let run() rebuild the connection
            except Exception as e:
                logging.error(f"An unexpected error occurred in fetch_and_publish_changes: {e}")

    async def run(self):
        """Main execution loop: connect, subscribe, and resubscribe after failures."""
        while True:
            await self.connect_to_oracle()
            try:
                # The subscription object is the key to CQN.
                self.subscription = self.connection.subscribe(
                    callback=self.cqn_callback,
                    timeout=CQN_TIMEOUT_SECONDS,
                    qos=cx_Oracle.SUBSCR_QOS_ROWIDS,  # rowids are delivered but unused here
                )
                logging.info(f"CQN Subscription created with ID: {self.subscription.id}")
                # Register the query to monitor; registerquery() takes the SQL text.
                # Any INSERT into this table will trigger the notification.
                self.subscription.registerquery("SELECT TRANSACTION_ID FROM REALTIME_TRANSACTIONS")
                logging.info("CQN registration complete. Waiting for notifications...")
                # Rows committed before registration produce no notification,
                # so do one catch-up fetch, then wait until a reconnect is requested.
                await self.fetch_and_publish_changes()
                await self.reconnect_needed.wait()
            except cx_Oracle.Error as e:
                logging.error(f"A database error occurred in the main run loop: {e}")
                await asyncio.sleep(5)  # Avoid tight error loops
            finally:
                await self.stop()
            self.reconnect_needed.clear()

    async def stop(self):
        """Idempotent cleanup; tolerates a connection that is already dead."""
        if self.subscription and self.connection:
            try:
                logging.info("Unregistering CQN subscription.")
                self.connection.unsubscribe(self.subscription)
            except cx_Oracle.Error as e:
                logging.warning(f"Unsubscribe failed: {e}")
        if self.connection:
            try:
                logging.info("Closing Oracle connection.")
                self.connection.close()
            except cx_Oracle.Error as e:
                logging.warning(f"Close failed: {e}")
        self.connection = None
        self.subscription = None


async def main():
    logging.info("Initializing Dask client...")
    dask_client = await Client(DASK_SCHEDULER, asynchronous=True)
    cdc_queue = Queue(QUEUE_NAME, client=dask_client, maxsize=QUEUE_MAXSIZE)
    listener = OracleCqnListener(dask_client, cdc_queue)
    try:
        await listener.run()
    finally:
        # Also runs on Ctrl+C: asyncio.run() cancels main() on KeyboardInterrupt.
        await listener.stop()
        await dask_client.close()


if __name__ == "__main__":
    # Ensure the required DB privilege is granted:
    # GRANT CHANGE NOTIFICATION TO your_user;
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Shutdown signal received.")
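Because the notification callback runs on a thread owned by the database driver, any work it hands to asyncio has to cross a thread boundary safely. The sketch below shows one way to do that with the standard library only; `handle_change`, `make_callback` and the simulated driver thread are hypothetical names used for illustration, and the real fetch-and-publish step is replaced by a placeholder.

```python
import asyncio
import threading


async def handle_change(results, payload):
    """Coroutine that does the real work on the event loop."""
    await asyncio.sleep(0)  # placeholder for the fetch-and-publish step
    results.append(payload)


def make_callback(loop, results):
    """Build a plain (non-async) callback that is safe to call from another thread."""

    def callback(payload):
        # Schedule the coroutine on the loop's thread and return immediately.
        return asyncio.run_coroutine_threadsafe(handle_change(results, payload), loop)

    return callback


async def demo():
    results = []
    callback = make_callback(asyncio.get_running_loop(), results)

    # Simulate the driver invoking the callback from its own thread.
    futures = []
    thread = threading.Thread(target=lambda: futures.append(callback("insert")))
    thread.start()
    await asyncio.to_thread(thread.join)

    # Wait for the scheduled coroutine to finish on the event loop.
    await asyncio.wrap_future(futures[0])
    return results
```

Calling an `async def` function directly from a foreign thread would only create a coroutine object that never runs; `asyncio.run_coroutine_threadsafe` is the standard-library route for this handoff.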
The Dask Processing Pipeline
Once events land in the dask.distributed.Queue, the listener’s job is done. The Dask cluster now takes over. The pitfall here is to treat this like a typical batch job. A streaming system requires careful state management. For our use case, calculating live aggregates like total transaction volume per category, we need a component that can hold and update state efficiently. Dask Actors are a perfect fit for this.
An Actor is essentially a stateful object that lives on one of the Dask workers. Other tasks can call its methods, and these calls are executed one at a time, ensuring there are no race conditions when updating the state. We will create an Aggregator actor to hold our running counts.
Another key aspect is backpressure. If Oracle generates events faster than Dask can process them, an unbounded dask.distributed.Queue will grow indefinitely, consuming all available memory. The queue’s maxsize defaults to 0, which means unbounded, so put() never blocks unless maxsize is set explicitly when the queue is created. With a bound in place, the listener’s awaited queue.put(data) call suspends once the queue is full, and the listener naturally stops pulling data from Oracle while the downstream system is overloaded. This is a simple but effective form of passive backpressure.
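To make the blocking behaviour concrete, here is a minimal sketch that uses `asyncio.Queue` as a stand-in for `dask.distributed.Queue`. Both accept a `maxsize` argument, and in both cases `put()` only waits when that bound has been set. The `producer`, `consumer` and `demo` names, and the chosen sizes, are illustrative assumptions.

```python
import asyncio


async def producer(queue, n_items, log):
    for i in range(n_items):
        await queue.put(i)  # suspends here whenever the queue is full
        log.append(("put", i))


async def consumer(queue, n_items, log):
    for _ in range(n_items):
        await asyncio.sleep(0.01)  # simulate slow downstream processing
        item = await queue.get()
        log.append(("got", item))


async def demo(maxsize=2, n_items=5):
    queue = asyncio.Queue(maxsize=maxsize)  # the bound is what creates backpressure
    log = []
    await asyncio.gather(
        producer(queue, n_items, log),
        consumer(queue, n_items, log),
    )
    return log
```

Replaying the returned log shows that the producer is never more than `maxsize` items ahead of the consumer; with `maxsize=0` the same producer would run to completion immediately and the queue would simply grow.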
# processor_service.py
import asyncio
import logging
import time

from dask.distributed import Client, Queue, fire_and_forget

# This service would run on a machine with access to the Dask scheduler.
# It can be run multiple times to create more "consumers".
DASK_SCHEDULER = "tcp://127.0.0.1:8786"
QUEUE_NAME = "oracle-cdc-stream"
# Whichever service creates the queue first fixes its bound, so use the same
# value as the listener.
QUEUE_MAXSIZE = 1000
AGGREGATOR_ACTOR_NAME = "metrics-aggregator"
WEBSOCKET_HUB_URL = "http://127.0.0.1:8765/update"  # Endpoint to push updates to

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - [%(name)s] - %(levelname)s - %(message)s",
)


class MetricsAggregator:
    """
    Maintains the live state of our aggregates. Submitted with actor=True, it
    becomes a Dask Actor that lives on a single worker to ensure state consistency.
    Actors are plain classes; there is no base class to inherit from.
    """

    def __init__(self):
        # State is just a dictionary in memory. For production, you'd
        # want to snapshot this periodically to persistent storage.
        self.category_volume = {}
        self.total_transactions = 0
        logging.info("MetricsAggregator Actor initialized.")

    def update_batch(self, events: list):
        """
        Processes a batch of transaction events and updates the state.
        Actor method calls run one at a time, so no locking is needed.
        """
        if not events:
            return None
        for event in events:
            category = event.get("category", "UNKNOWN")
            amount = event.get("amount", 0)
            self.category_volume[category] = self.category_volume.get(category, 0) + amount
            self.total_transactions += 1
        # After updating, return the new state.
        return self.get_current_state()

    def get_current_state(self):
        """Returns the current aggregated metrics."""
        return {
            "total_transactions": self.total_transactions,
            "volume_by_category": self.category_volume,
            "timestamp": time.time(),
        }


async def process_events(aggregator, batch):
    """
    This is the function that gets submitted to a Dask worker.
    It calls the actor's method to update the state.
    """
    import httpx  # Dask workers need to have necessary libraries installed

    if not batch:
        return
    logging.info(f"Processing batch of {len(batch)} events.")
    # The actual update happens on the actor's worker
    updated_state = await aggregator.update_batch(batch)
    if updated_state:
        logging.info(f"New state: {updated_state}")
        # Push the new state to the WebSocket hub for broadcasting
        try:
            async with httpx.AsyncClient() as client:
                await client.post(WEBSOCKET_HUB_URL, json=updated_state, timeout=5.0)
        except httpx.RequestError as exc:
            logging.error(f"Failed to push update to WebSocket hub: {exc}")


async def consumer_loop(client, aggregator):
    """
    The main loop for a consumer. It pulls batches from the queue and submits
    them for processing.
    """
    # Queues are shared by name; this connects to the listener's queue.
    cdc_queue = Queue(QUEUE_NAME, client=client, maxsize=QUEUE_MAXSIZE)
    logging.info("Consumer started. Waiting for events from the queue...")
    while True:
        try:
            # This will block until there is data in the queue.
            batch = await cdc_queue.get()
            # Submit the processing task to the cluster. Don't wait for it.
            # pure=False stops identical batches from being deduplicated, and
            # fire_and_forget keeps the task alive after the future is dropped.
            future = client.submit(process_events, aggregator, batch, pure=False)
            fire_and_forget(future)
        except Exception as e:
            logging.error(f"Error in consumer loop: {e}")
            await asyncio.sleep(5)  # Avoid tight error loops


async def get_or_create_aggregator(client):
    """
    Dask has no registry of named actors, so the actor's future is shared as a
    published dataset. Assumption: only one processor creates it at a time.
    """
    try:
        # Try to get it first, in case the service restarted.
        future = await client.get_dataset(AGGREGATOR_ACTOR_NAME)
        logging.info("Found existing MetricsAggregator actor.")
    except KeyError:
        # If not found, create it and publish the future under a known name.
        logging.info("MetricsAggregator actor not found. Creating a new one.")
        future = client.submit(MetricsAggregator, actor=True)
        await client.publish_dataset(future, name=AGGREGATOR_ACTOR_NAME)
    return await future  # resolves to the Actor proxy


async def main():
    async with Client(DASK_SCHEDULER, asynchronous=True) as client:
        aggregator = await get_or_create_aggregator(client)
        # We can run multiple consumers for higher throughput.
        await consumer_loop(client, aggregator)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Shutdown signal received.")
The WebSocket Broadcast Hub
The final piece is the simplest but most visible. It’s a lightweight asyncio server that accepts WebSocket connections from clients (e.g., a browser dashboard). It exposes an HTTP endpoint that the Dask processing tasks can push updates to. When an update is received, the server serializes it to JSON and broadcasts it to all currently connected clients.
A key consideration in a production system is managing the set of connected clients. The websockets library handles this gracefully, but you must structure your code to add new clients to a shared set and remove them when they disconnect or an error occurs.
# websocket_hub.py
import asyncio
import json
import logging

import websockets
from aiohttp import web

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)


class BroadcastServer:
    def __init__(self):
        # This set will store all active WebSocket connections.
        # It must be managed carefully to avoid memory leaks from stale connections.
        self.connected_clients = set()
        self.latest_state = {}

    async def register_client(self, websocket):
        """Adds a new client to the set of connected clients."""
        self.connected_clients.add(websocket)
        logging.info(f"Client connected: {websocket.remote_address}. Total clients: {len(self.connected_clients)}")
        # Send the latest known state immediately upon connection
        if self.latest_state:
            try:
                await websocket.send(json.dumps(self.latest_state))
            except websockets.ConnectionClosed:
                pass  # Client disconnected before we could send

    async def unregister_client(self, websocket):
        """Removes a client from the set."""
        # discard() tolerates a client that broadcast() has already pruned.
        self.connected_clients.discard(websocket)
        logging.info(f"Client disconnected: {websocket.remote_address}. Total clients: {len(self.connected_clients)}")

    async def broadcast(self, message: str):
        """Sends a message to all connected clients."""
        if not self.connected_clients:
            return
        # Clients can connect or disconnect while we await send(), which would
        # modify the set mid-iteration, so iterate over a copy.
        disconnected_clients = set()
        for websocket in list(self.connected_clients):
            try:
                await websocket.send(message)
            except websockets.ConnectionClosed:
                logging.warning(f"Failed to send to client {websocket.remote_address}; connection closed.")
                disconnected_clients.add(websocket)
        # Clean up any clients that disconnected during the broadcast.
        if disconnected_clients:
            self.connected_clients.difference_update(disconnected_clients)

    async def websocket_handler(self, websocket, path=None):
        """The main handler for incoming WebSocket connections."""
        # path is optional because newer websockets releases call the handler
        # with the connection only.
        await self.register_client(websocket)
        try:
            # Keep the connection alive. We don't expect messages from the client,
            # but this loop handles disconnects cleanly.
            async for message in websocket:
                # We could implement client-side commands here if needed.
                pass
        except websockets.ConnectionClosed as e:
            logging.info(f"Connection closed with code {e.code}: {e.reason}")
        finally:
            await self.unregister_client(websocket)

    async def http_update_handler(self, request):
        """
        AIOHTTP handler for the POST endpoint that Dask workers will call.
        """
        try:
            data = await request.json()
            logging.info(f"Received update from Dask: {data}")
            self.latest_state = data
            await self.broadcast(json.dumps(data))
            return web.Response(text="Update received and broadcasted.", status=200)
        except json.JSONDecodeError:
            return web.Response(text="Invalid JSON.", status=400)
        except Exception as e:
            logging.error(f"Error in HTTP update handler: {e}")
            return web.Response(text="Internal server error.", status=500)


async def main():
    server = BroadcastServer()

    # Setup AIOHTTP for the webhook
    app = web.Application()
    app.router.add_post("/update", server.http_update_handler)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "0.0.0.0", 8765)
    await site.start()
    logging.info("HTTP update endpoint started on http://0.0.0.0:8765")

    # Setup WebSocket server
    websocket_server = await websockets.serve(
        server.websocket_handler, "0.0.0.0", 8766
    )
    logging.info("WebSocket server started on ws://0.0.0.0:8766")

    await asyncio.Event().wait()  # Run forever


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Shutting down servers.")
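The client-set bookkeeping is easy to get subtly wrong, so it can help to exercise it in isolation. The sketch below is a simplified model, not the hub itself: `FakeClient` is a hypothetical stand-in for a WebSocket connection, and `ConnectionError` plays the role of the library's connection-closed exception. It shows the two habits that matter: iterate over a snapshot of the set, and remove failed clients with `discard()` so that a double removal is harmless.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a WebSocket connection."""

    def __init__(self, name, alive=True):
        self.name = name
        self.alive = alive
        self.received = []

    async def send(self, message):
        if not self.alive:
            raise ConnectionError(f"{self.name} is gone")
        self.received.append(message)


async def broadcast(clients, message):
    """Send to every client; drop the ones whose connection has failed."""
    # Iterate over a snapshot so that removals cannot disturb the loop.
    for client in list(clients):
        try:
            await client.send(message)
        except ConnectionError:
            clients.discard(client)


async def demo():
    a, b, c = FakeClient("a"), FakeClient("b", alive=False), FakeClient("c")
    clients = {a, b, c}
    await broadcast(clients, "state-1")
    return clients, a, b, c
```

After `demo()` runs, the dead client has been pruned from the set while the two live clients have each received the message.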
This architecture, while more complex to build than an off-the-shelf solution, provided a highly resilient and performant pipeline tailored to our specific technology stack. It successfully isolated the read load from the primary OLTP database and enabled real-time analytics without introducing massive new infrastructure overhead.
However, the solution is not without its limitations. The use of Oracle CQN is a pragmatic compromise; it is not a true, log-based CDC and can miss events during network partitions or brief database outages if the listener is down. A more robust implementation for guaranteed delivery would involve mining the Oracle redo logs, a significantly more complex undertaking. The state management within the Dask Actor is also ephemeral. A restart of the Dask worker hosting the actor will wipe its state. A production-grade system would require periodic state snapshots to a persistent store like Redis or S3, and logic to recover from that snapshot on actor initialization. Finally, the single WebSocket hub can become a bottleneck; scaling it would require a message bus like Redis Pub/Sub to decouple the broadcast logic from individual server instances.
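The snapshot-and-restore idea can be sketched without Dask. Everything below is illustrative: `SnapshottingAggregator`, `snapshot_path` and the `last_tx_id` field are hypothetical names, and a local JSON file stands in for a store such as Redis or S3. Recording the last processed transaction ID alongside the aggregates is an added assumption that would let an upstream component know where to resume.

```python
import json
import os
import tempfile


class SnapshottingAggregator:
    """Aggregator that can persist its state and reload it on construction."""

    def __init__(self, snapshot_path):
        self.snapshot_path = snapshot_path
        self.category_volume = {}
        self.total_transactions = 0
        self.last_tx_id = 0
        self._restore()

    def _restore(self):
        """Load the previous snapshot, if one exists."""
        if not os.path.exists(self.snapshot_path):
            return
        with open(self.snapshot_path) as f:
            state = json.load(f)
        self.category_volume = state["category_volume"]
        self.total_transactions = state["total_transactions"]
        self.last_tx_id = state["last_tx_id"]

    def update_batch(self, events):
        for event in events:
            category = event.get("category", "UNKNOWN")
            self.category_volume[category] = (
                self.category_volume.get(category, 0) + event.get("amount", 0)
            )
            self.total_transactions += 1
            self.last_tx_id = max(self.last_tx_id, event["tx_id"])

    def snapshot(self):
        """Write state atomically: write a temp file, then rename over the old one."""
        state = {
            "category_volume": self.category_volume,
            "total_transactions": self.total_transactions,
            "last_tx_id": self.last_tx_id,
        }
        directory = os.path.dirname(os.path.abspath(self.snapshot_path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
        os.replace(tmp_path, self.snapshot_path)
```

Events processed after the most recent snapshot are still lost on a crash, so the snapshot interval bounds how much has to be replayed or accepted as missing.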