The initial system bottleneck was a Phoenix controller action that timed out. Users uploaded multi-gigabyte CSV files for financial analysis, and our Elixir-based background processing, even when offloaded via Task.async, began to cripple the BEAM schedulers under heavy load. The memory consumption and long-running, CPU-bound nature of the validation and transformation logic were a poor fit for the BEAM, whose scheduling model excels at large numbers of lightweight, I/O-bound processes rather than heavyweight number crunching. The result was unacceptable scheduler latency spikes that hurt the responsiveness of the entire real-time application. A fundamental architectural shift was necessary.
The decision was made to build a completely decoupled, asynchronous processing pipeline. The core requirements were fault tolerance, scalability of individual components, and the ability to use the best tool for each specific job. This led us to a polyglot architecture glued together by AWS SQS.
- Phoenix/Elixir: Remains the user-facing web layer. Its responsibility is solely to handle the file upload, persist it to S3, and enqueue the initial processing job. It must return a response to the user immediately.
- AWS SQS: The central nervous system. A standard queue for initial job submissions and another for jobs that have passed primary validation and are ready for statistical analysis. This provides backpressure handling and ensures message durability.
- Rust: The first-stage worker. Chosen for its raw performance, memory safety, and excellent support for parallel processing. It will handle the heavy lifting: downloading the large file from S3, performing validation, normalization, and transformation on a massive scale.
- Python/Pandas: The second-stage worker. While Rust could perform statistical analysis, the required models and data manipulations were already well-established and battle-tested within the Python data science ecosystem, specifically with Pandas and NumPy. Re-implementing this in Rust would be time-consuming and error-prone. In a real-world project, leveraging an existing, powerful ecosystem is often the more pragmatic choice.
- Sentry: Observability in a distributed, polyglot system is non-negotiable. Sentry’s ability to propagate trace context across service and language boundaries via message queue attributes is critical for debugging a single user’s request as it flows through the entire pipeline.
The architecture can be visualized as follows:
graph TD
  subgraph User Interaction
    Client[User's Browser] -- Uploads CSV --> Phoenix
  end
  subgraph AWS
    Phoenix -- 1. Uploads to --> S3[(S3 Bucket)]
    Phoenix -- 2. Enqueues Job --> SQS_A[SQS: validation_queue]
    S3 -- 4. Downloads CSV --> RustWorker
    RustWorker -- 3. Polls Job --> SQS_A
    RustWorker -- 5. Uploads Processed Data --> S3
    RustWorker -- 6. Enqueues Job --> SQS_B[SQS: analysis_queue]
    S3 -- 8. Downloads Processed Data --> PythonWorker
    PythonWorker -- 7. Polls Job --> SQS_B
    PythonWorker -- 9. Writes Final Report --> S3
  end
  subgraph Workers
    RustWorker[Rust Validation Worker]
    PythonWorker[Python Pandas Worker]
  end
  subgraph Observability
    Phoenix -- Reports Errors/Traces --> SentryService[(Sentry)]
    RustWorker -- Reports Errors/Traces --> SentryService
    PythonWorker -- Reports Errors/Traces --> SentryService
  end
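Both SQS queues in the diagram are plain standard queues. As a rough provisioning sketch (boto3; the region, visibility timeout, and long-polling values are illustrative assumptions rather than values mandated by the pipeline):

# provision_queues.py -- illustrative sketch, not part of the pipeline itself
import boto3

sqs = boto3.client("sqs", region_name="us-east-1")  # assumed region

for name in ["validation_queue", "analysis_queue"]:
    sqs.create_queue(
        QueueName=name,
        Attributes={
            # Enable long polling so the workers' 20-second ReceiveMessage calls
            # return as soon as a message arrives instead of polling busily.
            "ReceiveMessageWaitTimeSeconds": "20",
            # Must exceed the worst-case processing time for one file, otherwise
            # SQS redelivers a message while a worker is still processing it.
            "VisibilityTimeout": "900",
        },
    )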
Phoenix Producer: Enqueuing the Initial Job
The Phoenix controller’s only job is to get the file off its hands as quickly as possible. We stream the upload directly to an S3 bucket to avoid loading the entire file into the web server’s memory. Once the upload is confirmed, a message is dispatched to SQS.
First, the necessary configuration in config/config.exs for the ex_aws and ex_aws_sqs libraries:
# config/config.exs
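# Note: config.exs is evaluated at compile time; in a real deployment these
# System.get_env/1 calls usually belong in config/runtime.exs instead.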
config :ex_aws,
access_key_id: System.get_env("AWS_ACCESS_KEY_ID"),
secret_access_key: System.get_env("AWS_SECRET_ACCESS_KEY"),
region: System.get_env("AWS_REGION")
config :my_app, :sqs,
validation_queue_url: System.get_env("SQS_VALIDATION_QUEUE_URL")
The controller action handles the multipart upload and enqueues the task. A critical piece is initializing a Sentry transaction and passing its trace context into the SQS message attributes. This is the starting point for our distributed trace.
# lib/my_app_web/controllers/upload_controller.ex
defmodule MyAppWeb.UploadController do
use MyAppWeb, :controller
alias ExAws.SQS
def create(conn, %{"upload" => %Plug.Upload{} = upload}) do
job_id = Ecto.UUID.generate()
s3_key = "uploads/#{job_id}/#{upload.filename}"
# Start a Sentry transaction for this request
sentry_ctx = Sentry.start_transaction("upload.process", "request")
    Sentry.Context.set_user_context(%{id: conn.assigns.current_user.id})
    Sentry.Context.set_tags_context(%{job_id: job_id})
# The actual upload logic to S3. Assume `Uploader.stream_to_s3/2` is a function
# that handles streaming the Plug.Upload to S3 and returns {:ok, s3_key}
case Uploader.stream_to_s3(upload, s3_key) do
{:ok, saved_key} ->
# Sentry.get_trace_header returns the `sentry-trace` string
trace_header = Sentry.get_trace_header(sentry_ctx)
enqueue_validation_job(job_id, saved_key, trace_header)
Sentry.finish_transaction(sentry_ctx)
conn
|> put_status(:accepted)
|> json(%{status: "processing", job_id: job_id})
{:error, reason} ->
        # `reason` is not an exception (and __STACKTRACE__ is only valid inside rescue/catch),
        # so report it as a message instead.
        Sentry.capture_message("S3 upload failed: #{inspect(reason)}")
Sentry.finish_transaction(sentry_ctx, :error)
conn
|> put_status(:internal_server_error)
|> json(%{error: "failed to upload file"})
end
end
defp enqueue_validation_job(job_id, s3_key, trace_header) do
queue_url = Application.fetch_env!(:my_app, :sqs)[:validation_queue_url]
message_body =
Jason.encode!(%{
job_id: job_id,
s3_input_path: s3_key,
s3_output_path_prefix: "processed/#{job_id}/"
})
# This is the key to distributed tracing. We pass the trace context as a message attribute.
    message_attributes = [
      %{name: "sentry_trace", data_type: :string, value: trace_header}
    ]
SQS.send_message(queue_url, message_body, message_attributes: message_attributes)
|> ExAws.request()
end
end
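The controller assumes an Uploader.stream_to_s3/2 helper rather than defining it. A minimal sketch using ExAws.S3's multipart upload could look like the following; the module name, the S3_BUCKET environment variable, and the chunk size are assumptions for illustration:

# lib/my_app/uploader.ex (hypothetical helper assumed by the controller above)
defmodule MyApp.Uploader do
  alias ExAws.S3

  # S3 multipart parts must be at least 5 MiB (except the last one).
  @chunk_size 5 * 1024 * 1024

  def stream_to_s3(%Plug.Upload{path: tmp_path}, s3_key) do
    bucket = System.fetch_env!("S3_BUCKET")

    # Stream the temp file in chunks so a multi-gigabyte upload never sits in memory.
    tmp_path
    |> File.stream!([], @chunk_size)
    |> S3.upload(bucket, s3_key)
    |> ExAws.request()
    |> case do
      {:ok, _response} -> {:ok, s3_key}
      {:error, reason} -> {:error, reason}
    end
  end
end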
The message contract is simple JSON. It’s vital to establish and document this contract as it’s the API between our services.
{
"job_id": "uuid-goes-here",
"s3_input_path": "uploads/uuid-goes-here/raw_data.csv",
"s3_output_path_prefix": "processed/uuid-goes-here/"
}
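The Rust worker emits a second, equally small contract onto the analysis queue, mirroring the NextJobPayload struct shown below:
{
  "job_id": "uuid-goes-here",
  "s3_processed_path": "processed/uuid-goes-here/processed.csv"
}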
Rust Worker: High-Performance Validation
This is a standalone Rust binary, designed to be deployed in a container and run as a long-lived process. It polls SQS, processes files in parallel, and enqueues jobs for the next stage.
The Cargo.toml specifies dependencies for AWS, serialization, logging, error handling, and parallel processing.
# Cargo.toml
[package]
name = "validation-worker"
version = "0.1.0"
edition = "2021"
[dependencies]
aws-config = "0.56"
aws-sdk-s3 = "0.29"
aws-sdk-sqs = "0.29"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
csv = "1.2"
rayon = "1.7"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
sentry = "0.31"
thiserror = "1.0"
anyhow = "1.0"
The core logic resides in main.rs. We set up logging, Sentry, and an AWS SDK client, and the main loop continuously polls SQS for messages.
// src/main.rs
use aws_sdk_s3 as s3;
use aws_sdk_sqs as sqs;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{error, info, instrument};
#[derive(Debug, Deserialize)]
struct JobPayload {
job_id: String,
s3_input_path: String,
s3_output_path_prefix: String,
}
#[derive(Debug, Serialize)]
struct NextJobPayload {
job_id: String,
s3_processed_path: String,
}
// Define custom error types for better error handling
#[derive(thiserror::Error, Debug)]
enum WorkerError {
#[error("S3 operation failed: {0}")]
S3Error(String),
#[error("SQS operation failed: {0}")]
SqsError(String),
#[error("CSV processing failed: {0}")]
CsvError(#[from] csv::Error),
#[error("Data validation failed: {0}")]
ValidationError(String),
}
const VALIDATION_QUEUE_URL_ENV: &str = "SQS_VALIDATION_QUEUE_URL";
const ANALYSIS_QUEUE_URL_ENV: &str = "SQS_ANALYSIS_QUEUE_URL";
const S3_BUCKET_ENV: &str = "S3_BUCKET";
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
// Initialize Sentry. DSN is read from the SENTRY_DSN environment variable.
let _guard = sentry::init((
std::env::var("SENTRY_DSN").expect("SENTRY_DSN must be set"),
sentry::ClientOptions {
release: sentry::release_name!(),
..Default::default()
},
));
let config = aws_config::load_from_env().await;
let sqs_client = sqs::Client::new(&config);
let s3_client = s3::Client::new(&config);
let validation_queue_url = std::env::var(VALIDATION_QUEUE_URL_ENV)?;
info!(queue = %validation_queue_url, "Worker starting, polling SQS queue");
loop {
let receive_output = sqs_client
.receive_message()
.queue_url(&validation_queue_url)
.max_number_of_messages(1) // Process one at a time for simplicity
.wait_time_seconds(20) // Use long polling
.message_attribute_names("All")
.send()
.await;
match receive_output {
Ok(output) => {
if let Some(messages) = output.messages {
for message in messages {
match process_message(&s3_client, &sqs_client, &message).await {
Ok(_) => {
info!("Successfully processed message, deleting from queue.");
if let Some(receipt_handle) = message.receipt_handle() {
let _ = sqs_client
.delete_message()
.queue_url(&validation_queue_url)
.receipt_handle(receipt_handle)
.send()
.await;
}
}
Err(e) => {
error!(error = %e, "Failed to process message. It will become visible again.");
sentry::capture_error(&e);
// Do not delete the message, let it be retried after visibility timeout
}
}
}
}
}
Err(e) => {
error!(error = %e, "Failed to receive messages from SQS.");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
}
// The instrument macro automatically creates a tracing span for this function.
#[instrument(skip(s3_client, sqs_client, message), fields(job_id))]
async fn process_message(
s3_client: &s3::Client,
sqs_client: &sqs::Client,
message: &sqs::types::Message,
) -> Result<(), WorkerError> {
// Sentry Distributed Tracing integration
let sentry_trace = message
.message_attributes()
.and_then(|attrs| attrs.get("sentry_trace"))
.and_then(|attr| attr.string_value());
// Start a new transaction that continues the trace from the Phoenix app.
    let tx_ctx = sentry::TransactionContext::continue_from_headers(
        "rust-worker.process_job",
        "queue.process",
        [("sentry-trace", sentry_trace.unwrap_or(""))],
    );
let transaction = sentry::start_transaction(tx_ctx);
sentry::configure_scope(|scope| scope.set_span(Some(transaction.clone().into())));
let body = message.body().unwrap_or_default();
let payload: JobPayload = match serde_json::from_str(body) {
Ok(p) => p,
Err(e) => {
error!(error = %e, "Failed to parse SQS message body");
transaction.finish();
// This is an unrecoverable error for this message.
return Ok(()); // Returning Ok to delete the poison pill message.
}
};
// Attach job_id to tracing span and Sentry scope for context.
tracing::Span::current().record("job_id", &payload.job_id);
sentry::configure_scope(|scope| scope.set_tag("job_id", &payload.job_id));
info!(job_id = %payload.job_id, "Starting processing");
// Main processing logic goes here.
let processed_file_key = run_validation_logic(s3_client, &payload).await?;
// Enqueue for the next stage
enqueue_analysis_job(sqs_client, &payload, &processed_file_key).await?;
info!(job_id = %payload.job_id, "Processing complete, enqueued for analysis");
transaction.finish();
Ok(())
}
async fn run_validation_logic(
s3_client: &s3::Client,
payload: &JobPayload,
) -> Result<String, WorkerError> {
    let bucket = std::env::var(S3_BUCKET_ENV)
        .map_err(|_| WorkerError::S3Error(format!("{S3_BUCKET_ENV} env var is not set")))?;
// 1. Download file from S3
let get_obj_output = s3_client
.get_object()
.bucket(&bucket)
.key(&payload.s3_input_path)
.send()
.await
.map_err(|e| WorkerError::S3Error(e.to_string()))?;
    let body_bytes = get_obj_output
        .body
        .collect()
        .await
        .map_err(|e| WorkerError::S3Error(e.to_string()))?
        .into_bytes();
let mut rdr = csv::Reader::from_reader(body_bytes.as_ref());
// 2. Process records using Rayon for parallelism
let records: Vec<csv::StringRecord> = rdr.records().collect::<Result<_, _>>()?;
let processed_data: Vec<String> = records
.par_iter() // Parallel iterator
.filter_map(|record| {
// Dummy validation: ensure record has 5 columns and the 3rd column is a valid float
if record.len() == 5 && record.get(2).unwrap_or("").parse::<f64>().is_ok() {
                // Dummy transformation: capitalize the first column. StringRecord fields
                // cannot be mutated in place, so rebuild the row as owned strings.
                let mut fields: Vec<String> = record.iter().map(|f| f.to_string()).collect();
                if let Some(first) = fields.first_mut() {
                    *first = first.to_uppercase();
                }
                Some(fields.join(","))
} else {
None // Filter out invalid records
}
})
.collect();
// 3. Upload processed file back to S3
let processed_content = processed_data.join("\n");
let output_key = format!("{}processed.csv", payload.s3_output_path_prefix);
s3_client
.put_object()
.bucket(&bucket)
.key(&output_key)
.body(s3::primitives::ByteStream::from(processed_content.into_bytes()))
.send()
.await
.map_err(|e| WorkerError::S3Error(e.to_string()))?;
Ok(output_key)
}
async fn enqueue_analysis_job(
sqs_client: &sqs::Client,
original_payload: &JobPayload,
processed_file_key: &str,
) -> Result<(), WorkerError> {
let analysis_queue_url = std::env::var(ANALYSIS_QUEUE_URL_ENV).unwrap();
let next_payload = NextJobPayload {
job_id: original_payload.job_id.clone(),
s3_processed_path: processed_file_key.to_string(),
};
let body = serde_json::to_string(&next_payload).unwrap();
// Propagate the Sentry trace context to the next worker
    // The sentry-trace header format is "{trace_id}-{span_id}", built here from the
    // trace context of the span placed on the scope in `process_message`.
    let trace_header = sentry::configure_scope(|scope| scope.get_span())
        .map(|span| {
            let trace_ctx = span.get_trace_context();
            format!("{}-{}", trace_ctx.trace_id, trace_ctx.span_id)
        })
        .unwrap_or_default();
let sentry_trace_attribute = sqs::types::MessageAttributeValue::builder()
.data_type("String")
.string_value(trace_header)
.build();
sqs_client
.send_message()
.queue_url(analysis_queue_url)
.message_body(body)
.message_attributes("sentry_trace", sentry_trace_attribute)
.send()
.await
.map_err(|e| WorkerError::SqsError(e.to_string()))?;
Ok(())
}
The pitfall here is error handling. If run_validation_logic fails, process_message returns an Err, which prevents us from deleting the message from SQS. The message becomes visible again after its visibility timeout and the worker retries it. This is crucial for resilience, but in a real-world project it requires a dead-letter queue (DLQ) strategy to handle messages that consistently fail.
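A minimal sketch of that DLQ wiring with boto3 (the DLQ name and the maxReceiveCount of 5 are assumptions for illustration):

# attach_dlq.py -- illustrative sketch of a redrive policy for the validation queue
import json
import boto3

sqs = boto3.client("sqs")

# Create the dead-letter queue and look up its ARN.
dlq_url = sqs.create_queue(QueueName="validation_queue_dlq")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# After 5 failed receives, SQS moves the message to the DLQ instead of retrying forever.
sqs.set_queue_attributes(
    QueueUrl=sqs.get_queue_url(QueueName="validation_queue")["QueueUrl"],
    Attributes={
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "5"}
        )
    },
)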
Python Worker: Pandas-Powered Analysis
This Python script performs a similar role to the Rust worker, but for the analysis stage. It polls the analysis_queue, downloads the processed data, performs some Pandas operations, and stores the final result.
The dependencies are managed in requirements.txt.
# requirements.txt
boto3
pandas
sentry-sdk
The worker script is structured similarly, with a main polling loop.
# analysis_worker.py
import os
import json
import logging
import time
import boto3
import pandas as pd
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Sentry Initialization ---
SENTRY_DSN = os.getenv("SENTRY_DSN")
if SENTRY_DSN:
sentry_sdk.init(
dsn=SENTRY_DSN,
integrations=[LoggingIntegration(level=logging.INFO, event_level=logging.ERROR)],
traces_sample_rate=1.0,
release=os.getenv("SENTRY_RELEASE")
)
# --- AWS Clients and Config ---
AWS_REGION = os.getenv("AWS_REGION")
SQS_ANALYSIS_QUEUE_URL = os.getenv("SQS_ANALYSIS_QUEUE_URL")
S3_BUCKET = os.getenv("S3_BUCKET")
sqs = boto3.client("sqs", region_name=AWS_REGION)
s3 = boto3.client("s3", region_name=AWS_REGION)
def perform_analysis(s3_path: str) -> str:
"""
Downloads data from S3, performs analysis with Pandas, and returns a summary.
A common mistake is to not handle potential empty files or parsing errors gracefully.
"""
try:
obj = s3.get_object(Bucket=S3_BUCKET, Key=s3_path)
        # The Rust stage writes a headerless CSV (csv::Reader consumed the original header
        # row), so read positionally; pandas assigns integer column labels 0-4.
        df = pd.read_csv(obj['Body'], header=None)
if df.empty:
logging.warning(f"File at {s3_path} is empty.")
return "Analysis complete: input file was empty."
# Example analysis: calculate descriptive statistics on the third column
# (which we know is numeric from the Rust validation step)
# Assuming column names are 0, 1, 2, 3, 4
description = df.iloc[:, 2].describe().to_json()
logging.info(f"Analysis complete for {s3_path}. Mean value: {df.iloc[:, 2].mean()}")
return description
except Exception as e:
logging.error(f"Failed during pandas analysis for {s3_path}: {e}")
sentry_sdk.capture_exception(e)
raise # Re-raise to signal failure to the main loop
def process_message(message: dict):
"""Processes a single SQS message."""
sentry_trace_header = ""
if 'MessageAttributes' in message and 'sentry_trace' in message['MessageAttributes']:
sentry_trace_header = message['MessageAttributes']['sentry_trace']['StringValue']
    job_id = None  # defined up front so the exception handler can log it even if parsing fails
    # Continue the distributed trace from the Rust worker. continue_trace parses the
    # sentry-trace header and links this transaction to the upstream ones.
    transaction = sentry_sdk.continue_trace(
        {"sentry-trace": sentry_trace_header},
        op="queue.process",
        name="python-worker.analyze_data",
    )
    with sentry_sdk.start_transaction(transaction), \
         sentry_sdk.start_span(op="process", description="process_message") as span:
try:
body = json.loads(message['Body'])
job_id = body.get("job_id")
s3_path = body.get("s3_processed_path")
if not job_id or not s3_path:
logging.error("Invalid message body received, missing keys.")
return # Acknowledge and drop the poison pill message
span.set_tag("job_id", job_id)
sentry_sdk.set_tag("job_id", job_id)
logging.info(f"Processing job {job_id} for file {s3_path}")
# Run the core logic
analysis_result = perform_analysis(s3_path)
# Store the final result in S3
result_key = f"results/{job_id}/analysis_summary.json"
s3.put_object(
Bucket=S3_BUCKET,
Key=result_key,
Body=analysis_result.encode('utf-8'),
ContentType='application/json'
)
logging.info(f"Successfully stored analysis result for job {job_id} at {result_key}")
except Exception as e:
logging.error(f"Unhandled exception in process_message for job {job_id}: {e}")
sentry_sdk.capture_exception(e)
transaction.set_status("internal_error")
raise # Propagate error to main loop to prevent message deletion
finally:
transaction.finish()
def main():
"""Main polling loop."""
logging.info(f"Starting Python analysis worker, polling {SQS_ANALYSIS_QUEUE_URL}")
while True:
try:
response = sqs.receive_message(
QueueUrl=SQS_ANALYSIS_QUEUE_URL,
MaxNumberOfMessages=1,
WaitTimeSeconds=20,
MessageAttributeNames=['All']
)
messages = response.get('Messages', [])
if not messages:
continue
message = messages[0]
receipt_handle = message['ReceiptHandle']
try:
process_message(message)
sqs.delete_message(
QueueUrl=SQS_ANALYSIS_QUEUE_URL,
ReceiptHandle=receipt_handle
)
except Exception:
# Error already logged and sent to Sentry inside process_message.
# Do not delete the message, allow it to be retried.
logging.error("Processing failed, not deleting message.")
except Exception as e:
logging.error(f"Error in main SQS polling loop: {e}")
sentry_sdk.capture_exception(e)
time.sleep(5)
if __name__ == "__main__":
main()
The final result is a system where each component is specialized. The Phoenix web server stays light and responsive. The computationally intensive validation is handled by a scalable pool of Rust workers optimized for raw throughput, while the complex, domain-specific statistical analysis is handled by a separate pool of Python workers that leverage the mature data science ecosystem without compromising the validation stage. An error in any single processing attempt is captured in Sentry with a full, cross-language trace; it does not bring down the rest of the system, and the job is automatically retried.
This architecture is not without trade-offs. The operational complexity is higher than a monolith's: there are now three distinct services to deploy, monitor, and manage. The reliance on S3 for intermediate data storage introduces latency and cost, which matters for smaller, faster jobs. For future iterations, replacing the Python/Pandas worker with a Rust-based solution built on a library like Polars could be explored; this would reduce the polyglot footprint and potentially improve performance, but it would sacrifice the vast ecosystem and ease of use that Pandas provides for complex, exploratory data analysis. The boundary of applicability for this pattern lies where the processing-time and resource-isolation benefits decisively outweigh the increased architectural complexity.