The performance degradation was not subtle. A core financial modeling endpoint, /api/v1/risk-analysis, built in Django REST Framework, was bringing the entire service to its knees. Under a load of just ten concurrent users, response times skyrocketed from a baseline of 200ms to over 30 seconds, eventually causing a cascade of Gunicorn worker timeouts. The root cause was clear: a synchronous, CPU-bound Monte Carlo simulation using NumPy was executing directly within the Django view. Each request seized a worker process for its entire duration, effectively starving the service of its ability to handle any other traffic.
The initial, problematic view looked something like this. It’s a classic example of mixing concerns, where the web request-response handler is also the long-running compute engine.
# project/risk_analysis/views.py (The Monolithic Anti-Pattern)
import numpy as np
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
def run_monte_carlo_simulation(portfolio_data, num_simulations=100000):
"""
A placeholder for a computationally expensive NumPy operation.
In a real-world project, this could be matrix multiplication,
statistical modeling, or complex option pricing.
"""
# Simulate complex portfolio analysis
initial_prices = np.array(portfolio_data['prices'])
volatility = np.array(portfolio_data['volatility'])
time_horizon = portfolio_data.get('horizon_days', 252)
# Generate random walks
    # volatility is per-asset, so add an axis so it broadcasts across the time and simulation dimensions
    daily_returns = np.random.normal(0, volatility[:, np.newaxis], (time_horizon, len(initial_prices), num_simulations))
# This creates a large array and performs element-wise operations
price_paths = initial_prices[:, np.newaxis] * np.exp(np.cumsum(daily_returns, axis=0))
# Calculate Value at Risk (VaR) at 95% confidence
final_prices = price_paths[-1]
portfolio_values = np.sum(final_prices, axis=0)
var_95 = np.percentile(portfolio_values, 5)
return {"value_at_risk_95": var_95, "num_simulations": num_simulations}
class RiskAnalysisView(APIView):
def post(self, request, *args, **kwargs):
# Basic validation omitted for brevity
portfolio_data = request.data
if not all(k in portfolio_data for k in ['prices', 'volatility']):
return Response(
{"error": "Missing 'prices' or 'volatility' fields."},
status=status.HTTP_400_BAD_REQUEST
)
# The blocking call. This is the bottleneck.
# A Gunicorn worker is held captive here for seconds.
try:
result = run_monte_carlo_simulation(portfolio_data)
return Response(result, status=status.HTTP_200_OK)
except Exception as e:
# A real project needs proper logging here
return Response(
{"error": f"Calculation failed: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
The immediate temptation was to throw Celery at the problem. While a valid approach, it introduces a new set of infrastructure to manage: a message broker like Redis or RabbitMQ, and a dedicated fleet of Celery worker VMs. In our Oracle Cloud Infrastructure (OCI) environment, this felt like trading one problem for another: taking on permanent operational overhead and cost for what was ultimately a sporadic, bursty workload.
The architectural pivot was to reframe the problem: Django’s job is to manage state and orchestrate tasks, not to perform heavy computation. The computation itself is a self-contained unit of work that can be executed elsewhere. This pointed directly towards a serverless model using OCI Functions. The final piece of the puzzle was OCI API Gateway, which could act as a sophisticated reverse proxy, presenting a single, unified API to the outside world while intelligently routing traffic to different backends—the Django application for orchestration and the OCI Function for the raw computation.
graph TD
    subgraph "Monolithic Architecture (Problem)"
        Client_M[Client] --> Gunicorn_M[Django/Gunicorn]
        Gunicorn_M -- "Blocks Worker" --> NumPy_M[NumPy Calculation]
    end
    subgraph "Decoupled Architecture (Solution)"
        Client_S[Client] --> APIGW[OCI API Gateway]
        APIGW -- "POST /tasks" --> Django[Django App: Orchestration]
        APIGW -- "POST /compute" --> OCIFunc[OCI Function: NumPy Calc]
        Django -- "Creates Task Record" --> DB[(Database)]
        OCIFunc -- "Callback to update status" --> Django
    end
Step 1: Isolating the Computation into an OCI Function
The first task was to extract the run_monte_carlo_simulation logic into a standalone Python function deployable on OCI Functions. This requires defining the function’s entry point and its dependencies.
The func.py file is the core of the serverless function. It uses the Functions Development Kit (FDK) for Python to handle the request-response lifecycle. Crucially, it must include robust error handling for malformed input, as it’s now an independent service boundary.
# oci_function/func.py
import io
import json
import logging
import traceback
import numpy as np
from fdk import response
# It's a good practice to configure logging at the start
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def run_monte_carlo_simulation(portfolio_data, num_simulations=100000):
"""
This is the same expensive function, now isolated.
It's critical to validate its inputs thoroughly.
"""
if not isinstance(portfolio_data, dict) or not all(k in portfolio_data for k in ['prices', 'volatility']):
raise ValueError("Input must be a JSON object with 'prices' and 'volatility' keys.")
initial_prices = np.array(portfolio_data['prices'])
volatility = np.array(portfolio_data['volatility'])
if initial_prices.ndim != 1 or volatility.ndim != 1 or len(initial_prices) != len(volatility):
raise ValueError("'prices' and 'volatility' must be 1D arrays of the same length.")
time_horizon = int(portfolio_data.get('horizon_days', 252))
logger.info(f"Starting simulation for {len(initial_prices)} assets over {time_horizon} days.")
    # volatility is per-asset, so add an axis so it broadcasts across the time and simulation dimensions
    daily_returns = np.random.normal(0, volatility[:, np.newaxis], (time_horizon, len(initial_prices), num_simulations))
price_paths = initial_prices[:, np.newaxis] * np.exp(np.cumsum(daily_returns, axis=0))
final_prices = price_paths[-1]
portfolio_values = np.sum(final_prices, axis=0)
var_95 = np.percentile(portfolio_values, 5)
logger.info("Simulation completed successfully.")
return {"value_at_risk_95": float(var_95), "num_simulations": num_simulations}
def handler(ctx, data: io.BytesIO = None):
"""
OCI Function entry point.
"""
try:
body = json.loads(data.getvalue())
result = run_monte_carlo_simulation(body)
return response.Response(
ctx,
response_data=json.dumps(result),
headers={"Content-Type": "application/json"}
)
except (ValueError, json.JSONDecodeError) as e:
logger.error(f"Bad Request: {str(e)}")
error_payload = json.dumps({"error": "Invalid input data", "details": str(e)})
return response.Response(
ctx,
response_data=error_payload,
status_code=400,
headers={"Content-Type": "application/json"}
)
except Exception:
logger.error(f"Internal Server Error: {traceback.format_exc()}")
error_payload = json.dumps({"error": "An unexpected error occurred during computation."})
return response.Response(
ctx,
response_data=error_payload,
status_code=500,
headers={"Content-Type": "application/json"}
)
The function’s dependencies are declared in requirements.txt, and its runtime metadata is in func.yaml. A common pitfall here is version mismatch: the NumPy version should be pinned to avoid unexpected behavior changes between local testing and the cloud deployment.
# oci_function/func.yaml
schema_version: 20180708
name: numpy-risk-analysis-func
version: 0.0.1
runtime: python
build_image: fnproject/python:3.9-dev
run_image: fnproject/python:3.9
entrypoint: /python/bin/fdk /function/func.py handler
memory: 1024
# oci_function/requirements.txt
fdk>=0.1.58
numpy==1.24.3
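Before packaging and deploying, the simulation logic can be sanity-checked locally with a small driver script (with the dependencies from requirements.txt installed in a local virtualenv). This is a minimal sketch; the sample portfolio values are arbitrary, and the reduced simulation count simply keeps the local run fast.
# oci_function/local_check.py (local sanity check only, not deployed)
from func import run_monte_carlo_simulation

sample_payload = {
    "prices": [100.0, 250.0, 75.5],
    "volatility": [0.02, 0.015, 0.03],
    "horizon_days": 252,
}

# A smaller simulation count for a quick local run; the deployed default is 100,000.
result = run_monte_carlo_simulation(sample_payload, num_simulations=10_000)
print(result)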
Once deployed, the function is invocable in its own right (through the Functions invoke API or, as configured in Step 3, through the API Gateway), completely independent of the Django application.
Step 2: Refactoring Django for Asynchronous Orchestration
With the computation offloaded, the Django application’s role transforms from a worker into a manager. It now needs to:
- Accept a computation request.
- Persist the task state in a database.
- Provide an endpoint for clients to check the status of the task.
- Provide a secure callback endpoint for the OCI Function to report its completion.
This requires a new model and a set of non-blocking views.
# project/tasks/models.py
import uuid
from django.db import models
from django.utils.translation import gettext_lazy as _
class ComputationTask(models.Model):
class TaskStatus(models.TextChoices):
PENDING = 'PENDING', _('Pending')
PROCESSING = 'PROCESSING', _('Processing')
SUCCESS = 'SUCCESS', _('Success')
FAILED = 'FAILED', _('Failed')
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
status = models.CharField(
max_length=20,
choices=TaskStatus.choices,
default=TaskStatus.PENDING
)
# The original request payload is stored for auditing/retries
input_payload = models.JSONField()
# The result from the OCI function will be stored here
output_result = models.JSONField(null=True, blank=True)
error_message = models.TextField(blank=True, null=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
def __str__(self):
return f"Task {self.id} - {self.status}"
The views are now split. One view initiates the process, and another provides status updates. This is a fundamental shift to an asynchronous pattern. The client is no longer waiting for the result in the same HTTP request.
A critical design decision is how to trigger the OCI Function. The Django view could call it directly, but this reintroduces a blocking call (albeit a faster network call). A better production pattern is for the Django view to simply create the task record and return immediately. The actual invocation of the OCI Function can then be handled by a separate lightweight process or, simpler still for this use case, by instructing the client to call the compute endpoint itself. We will use the API Gateway to facilitate this clean separation.
The Django app exposes two endpoints: one to create a task and one to get its status.
# project/tasks/views.py
import hmac
import hashlib
import os
import logging
from django.shortcuts import get_object_or_404
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .models import ComputationTask
from .serializers import ComputationTaskSerializer, TaskCreateSerializer
logger = logging.getLogger(__name__)
# This shared secret must be configured securely in both Django settings and as
# an OCI Function configuration variable. Do NOT hardcode it.
CALLBACK_SECRET = os.environ.get("OCI_CALLBACK_SECRET", "default-secret-for-dev")
class TaskCreateView(APIView):
"""
Creates a new computation task record.
This endpoint is extremely fast as it only performs a database write.
"""
def post(self, request, *args, **kwargs):
serializer = TaskCreateSerializer(data=request.data)
if serializer.is_valid():
task = ComputationTask.objects.create(
input_payload=serializer.validated_data['payload']
)
response_data = {
"message": "Task created. Proceed to call the compute endpoint.",
"task_id": task.id,
"status_url": request.build_absolute_uri(f"/api/tasks/{task.id}/"),
# Instruct the client which endpoint to call next
"compute_url": "/api/compute/",
}
return Response(response_data, status=status.HTTP_202_ACCEPTED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class TaskStatusView(APIView):
"""
Retrieves the status and result of a computation task.
Clients poll this endpoint.
"""
def get(self, request, task_id, *args, **kwargs):
task = get_object_or_404(ComputationTask, id=task_id)
serializer = ComputationTaskSerializer(task)
return Response(serializer.data, status=status.HTTP_200_OK)
class TaskUpdateCallbackView(APIView):
"""
A secure, internal-only endpoint for the OCI Function to post back results.
This should not be exposed directly to the public internet.
The API Gateway will protect this.
"""
def post(self, request, task_id, *args, **kwargs):
# Security: Verify the request came from our OCI function
# A simple HMAC signature is used for this purpose.
provided_signature = request.headers.get("X-Callback-Signature")
if not provided_signature:
return Response({"error": "Signature missing"}, status=status.HTTP_401_UNAUTHORIZED)
signature = hmac.new(
CALLBACK_SECRET.encode('utf-8'),
request.body,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, provided_signature):
logger.warning(f"Invalid callback signature for task {task_id}")
return Response({"error": "Invalid signature"}, status=status.HTTP_401_UNAUTHORIZED)
task = get_object_or_404(ComputationTask, id=task_id)
data = request.data
if data.get("status") == "SUCCESS":
task.status = ComputationTask.TaskStatus.SUCCESS
task.output_result = data.get("result")
elif data.get("status") == "FAILED":
task.status = ComputationTask.TaskStatus.FAILED
task.error_message = data.get("error_details")
else:
return Response({"error": "Invalid status in callback"}, status=status.HTTP_400_BAD_REQUEST)
task.save()
logger.info(f"Task {task_id} updated via callback to status {task.status}")
return Response({"message": "Task updated successfully"}, status=status.HTTP_200_OK)
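The views above import two serializers and assume URL routes that are not shown elsewhere in this post. A minimal sketch of both follows; the field choices and route names are assumptions rather than part of the original codebase, and the app is assumed to be mounted under the /api/ prefix in the project’s root urls.py.
# project/tasks/serializers.py (minimal sketch; field choices are assumptions)
from rest_framework import serializers
from .models import ComputationTask

class TaskCreateSerializer(serializers.Serializer):
    # The raw portfolio payload that the client will later send to the compute endpoint.
    payload = serializers.JSONField()

class ComputationTaskSerializer(serializers.ModelSerializer):
    class Meta:
        model = ComputationTask
        fields = ["id", "status", "output_result", "error_message", "created_at", "updated_at"]
        read_only_fields = fields

# project/tasks/urls.py (minimal sketch; route names are assumptions)
from django.urls import path
from .views import TaskCreateView, TaskStatusView, TaskUpdateCallbackView

urlpatterns = [
    path("tasks/", TaskCreateView.as_view(), name="task-create"),
    path("tasks/<uuid:task_id>/", TaskStatusView.as_view(), name="task-status"),
    path("tasks/<uuid:task_id>/callback", TaskUpdateCallbackView.as_view(), name="task-callback"),
]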
This design now requires the client to perform a two-step process, but it fully decouples the systems. For this to work, the OCI Function needs to be modified to perform this callback.
# oci_function/func.py (Modified for callback)
# ... (imports and run_monte_carlo_simulation function remain the same) ...
import requests # Add 'requests' to requirements.txt
import os
import hmac
import hashlib
# These must be set as configuration variables for the OCI Function
DJANGO_CALLBACK_URL = os.environ.get("DJANGO_CALLBACK_URL") # e.g., https://api.mydomain.com/api/tasks/{task_id}/callback
CALLBACK_SECRET = os.environ.get("CALLBACK_SECRET")
def post_callback(task_id, payload):
if not DJANGO_CALLBACK_URL or not CALLBACK_SECRET:
logger.error("Callback URL or secret not configured. Skipping callback.")
return
url = DJANGO_CALLBACK_URL.format(task_id=task_id)
body = json.dumps(payload).encode('utf-8')
signature = hmac.new(
CALLBACK_SECRET.encode('utf-8'),
body,
hashlib.sha256
).hexdigest()
headers = {
"Content-Type": "application/json",
"X-Callback-Signature": signature
}
try:
        # Use a distinct name to avoid shadowing the fdk 'response' module imported at the top of the file
        resp = requests.post(url, data=body, headers=headers, timeout=5)
        resp.raise_for_status()
logger.info(f"Successfully posted callback for task {task_id}")
except requests.exceptions.RequestException as e:
logger.error(f"Failed to post callback for task {task_id}: {e}")
# In a production system, this failure should be sent to a dead-letter queue for retry.
def handler(ctx, data: io.BytesIO = None):
task_id = None
try:
body = json.loads(data.getvalue())
# The client must now pass the task_id in the payload
task_id = body.get("task_id")
computation_payload = body.get("payload")
if not task_id or not computation_payload:
raise ValueError("Request must include 'task_id' and 'payload'.")
result = run_monte_carlo_simulation(computation_payload)
callback_payload = {"status": "SUCCESS", "result": result}
post_callback(task_id, callback_payload)
# The function's direct response to the client can still be the result
return response.Response(
ctx, response_data=json.dumps(result), headers={"Content-Type": "application/json"}
)
except Exception as e:
error_details = traceback.format_exc()
logger.error(f"Error processing task {task_id}: {error_details}")
if task_id:
callback_payload = {"status": "FAILED", "error_details": str(e)}
post_callback(task_id, callback_payload)
return response.Response(
ctx,
response_data=json.dumps({"error": str(e)}),
status_code=500,
headers={"Content-Type": "application/json"}
)
Step 3: Configuring the OCI API Gateway as the Unified Front Door
The API Gateway is the glue that makes this architecture seamless for the client. It will be configured with a single public endpoint and multiple routes that direct traffic to the correct backend based on the request path.
sequenceDiagram
    participant Client
    participant APIGW as OCI API Gateway
    participant DjangoApp as Django Backend
    participant OCIFunc as OCI Function Backend
    Client->>+APIGW: POST /api/tasks (payload)
    APIGW->>+DjangoApp: (Forward Request)
    DjangoApp-->>-APIGW: 202 ACCEPTED (task_id)
    APIGW-->>-Client: 202 ACCEPTED (task_id)
    Client->>+APIGW: POST /api/compute (task_id, payload)
    APIGW->>+OCIFunc: (Forward Request)
    OCIFunc-->>-APIGW: 200 OK (computation_result)
    APIGW-->>-Client: 200 OK (computation_result)
    OCIFunc->>+APIGW: POST /api/tasks/{id}/callback (signed_payload)
    Note right of OCIFunc: (Internal call)
    APIGW->>+DjangoApp: (Forward Request)
    DjangoApp-->>-APIGW: 200 OK
    APIGW-->>-OCIFunc: 200 OK
    loop Poll for status
        Client->>+APIGW: GET /api/tasks/{id}/
        APIGW->>+DjangoApp: (Forward Request)
        DjangoApp-->>-APIGW: 200 OK (status: SUCCESS, result)
        APIGW-->>-Client: 200 OK (status: SUCCESS, result)
    end
The configuration within OCI involves these key steps:
- Define Backends:
  - django-backend: Type HTTP, pointing to the URL of the Load Balancer in front of the Django application VMs.
  - oci-function-backend: Type Oracle Functions, pointing to the specific OCI Function application and function name.
- Define Routes in a Deployment:
  - Route 1: Task Creation
    - Path: /api/tasks
    - Methods: POST
    - Backend: django-backend
  - Route 2: Task Status
    - Path: /api/tasks/{taskId} (uses a path parameter)
    - Methods: GET
    - Backend: django-backend
  - Route 3: Task Update Callback
    - Path: /api/tasks/{taskId}/callback
    - Methods: POST
    - Backend: django-backend
    - Security: This route should have an authorizer function or IP whitelisting policy applied to ensure it can only be called from the OCI Function’s egress IPs.
  - Route 4: Computation
    - Path: /api/compute
    - Methods: POST
    - Backend: oci-function-backend
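For reference, the same routing can also be captured declaratively in an API deployment specification. The sketch below is illustrative only: the backend hostname and function OCID are placeholders, and the field names should be checked against the current OCI API Gateway documentation before use.
{
  "routes": [
    {
      "path": "/api/tasks",
      "methods": ["POST"],
      "backend": { "type": "HTTP_BACKEND", "url": "https://django-lb.example.internal/api/tasks/" }
    },
    {
      "path": "/api/tasks/{taskId}",
      "methods": ["GET"],
      "backend": { "type": "HTTP_BACKEND", "url": "https://django-lb.example.internal/api/tasks/${request.path[taskId]}/" }
    },
    {
      "path": "/api/tasks/{taskId}/callback",
      "methods": ["POST"],
      "backend": { "type": "HTTP_BACKEND", "url": "https://django-lb.example.internal/api/tasks/${request.path[taskId]}/callback" }
    },
    {
      "path": "/api/compute",
      "methods": ["POST"],
      "backend": { "type": "ORACLE_FUNCTIONS_BACKEND", "functionId": "ocid1.fnfunc.oc1..exampleuniqueID" }
    }
  ]
}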
This configuration ensures the client interacts with a single host (api.mydomain.com) while the gateway handles the complex routing logic. The Django application is no longer burdened with the heavy compute traffic, and its workers remain free to handle the fast orchestration requests. The result is a system that can handle a much higher concurrent load, scales compute resources on demand, and maintains a clean architectural separation of concerns.
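To make the new contract concrete from a client’s perspective, here is a minimal sketch of the submit-then-poll flow through the gateway. The hostname is a placeholder, and the portfolio values, timeouts, and polling interval are arbitrary illustrative choices.
# client_example.py (illustrative only; api.mydomain.com is a placeholder host)
import time
import requests

BASE_URL = "https://api.mydomain.com"
portfolio = {"prices": [100.0, 250.0, 75.5], "volatility": [0.02, 0.015, 0.03], "horizon_days": 252}

# Step 1: register the task with the Django orchestrator (fast, returns 202 immediately).
create_resp = requests.post(f"{BASE_URL}/api/tasks", json={"payload": portfolio}, timeout=10)
create_resp.raise_for_status()
task_id = create_resp.json()["task_id"]

# Step 2: trigger the heavy computation on the OCI Function backend.
compute_resp = requests.post(
    f"{BASE_URL}/api/compute",
    json={"task_id": task_id, "payload": portfolio},
    timeout=120,  # the synchronous compute call can take tens of seconds
)
compute_resp.raise_for_status()

# Step 3: poll the status endpoint until the callback has marked the task finished.
for _ in range(30):
    status_resp = requests.get(f"{BASE_URL}/api/tasks/{task_id}", timeout=10)
    status_resp.raise_for_status()
    body = status_resp.json()
    if body["status"] in ("SUCCESS", "FAILED"):
        print(body)
        break
    time.sleep(2)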
Limitations and Future Considerations
This architecture, while vastly superior to the monolith, is not without its own complexities and trade-offs. The client-side logic is now more involved, requiring a multi-step, asynchronous flow instead of a simple request-response. The reliance on client polling for status updates is inefficient at scale; a production system would likely evolve to use WebSockets or Server-Sent Events pushed from the Django backend to notify clients of task completion, eliminating the need for polling.
Furthermore, passing large data payloads (e.g., multi-gigabyte matrices) through JSON in API calls is impractical. A more robust pattern would involve the client uploading input data directly to OCI Object Storage, passing only the object’s OCID (Oracle Cloud Identifier) to the task creation endpoint. The OCI Function would then be granted permission to read this object, perform the computation, and write the result to another object, updating the Django task record with the output object’s OCID. This avoids shuffling large amounts of data through the application and function layers entirely.
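As a rough sketch of that pattern, the function could load its input with the OCI Python SDK using a resource principal instead of receiving it inline. The bucket and object names below are assumptions, and the function’s dynamic group would need an IAM policy granting it read access to the bucket.
# Sketch: loading a large input payload from OCI Object Storage inside the function.
import json
import oci

def load_input_from_object_storage(bucket_name: str, object_name: str) -> dict:
    # Resource principals let the function authenticate without embedded credentials.
    signer = oci.auth.signers.get_resource_principals_signer()
    client = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
    namespace = client.get_namespace().data
    obj = client.get_object(namespace, bucket_name, object_name)
    return json.loads(obj.data.content)

# Hypothetical usage inside the handler, with the object name supplied in the request body:
# portfolio_data = load_input_from_object_storage("risk-inputs", body["input_object_name"])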
Finally, observability becomes more challenging. A single logical operation now spans multiple services (API Gateway, Django, OCI Function, Database). Implementing distributed tracing with a shared trace ID passed through all components via HTTP headers is no longer a “nice-to-have” but a necessity for debugging failures that span across these service boundaries. The current HMAC signature for callbacks is sufficient for basic authentication, but leveraging OCI’s native IAM policies and resource principals offers a more secure and manageable long-term solution.