Implementing a High-Throughput Polyglot Data Pipeline with Phoenix, Rust, and AWS SQS


The initial bottleneck was a Phoenix controller action that timed out. Users uploaded multi-gigabyte CSV files for financial analysis, and our Elixir-based background processing, even when offloaded via Task.async, began to starve the BEAM schedulers under heavy load. The validation and transformation logic was long-running, CPU-bound, and memory-hungry, which is a poor fit for the BEAM: its preemptive, reduction-counting schedulers excel at large numbers of short-lived, I/O-bound processes, not at sustained number crunching over large binaries. The result was unacceptable scheduler latency spikes that hurt the responsiveness of the entire real-time application. A fundamental architectural shift was necessary.

The decision was made to build a completely decoupled, asynchronous processing pipeline. The core requirements were fault tolerance, scalability of individual components, and the ability to use the best tool for each specific job. This led us to a polyglot architecture glued together by AWS SQS.

  • Phoenix/Elixir: Remains the user-facing web layer. Its responsibility is solely to handle the file upload, persist it to S3, and enqueue the initial processing job. It must return a response to the user immediately.
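  • AWS SQS: The central nervous system. We use two standard queues: one for initial job submissions and another for jobs that have passed primary validation and are ready for statistical analysis. The queues buffer work between stages, so a burst of uploads accumulates as queued messages instead of overloading the workers, and each message persists until a worker deletes it or the retention period expires.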
  • Rust: The first-stage worker. Chosen for its raw performance, memory safety, and excellent support for parallel processing. It will handle the heavy lifting: downloading the large file from S3, performing validation, normalization, and transformation on a massive scale.
  • Python/Pandas: The second-stage worker. While Rust could perform statistical analysis, the required models and data manipulations were already well-established and battle-tested within the Python data science ecosystem, specifically with Pandas and NumPy. Re-implementing this in Rust would be time-consuming and error-prone. In a real-world project, leveraging an existing, powerful ecosystem is often the more pragmatic choice.
  • Sentry: Observability in a distributed, polyglot system is non-negotiable. Sentry’s ability to propagate trace context across service and language boundaries via message queue attributes is critical for debugging a single user’s request as it flows through the entire pipeline.
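
To make the trace propagation concrete: the value carried between services is Sentry's `sentry-trace` header, which has the form `<trace_id>-<span_id>` with an optional `-<sampled>` suffix (32 hex characters, 16 hex characters, and `0` or `1`). The sketch below parses that value as a worker might before handing it to its SDK. The names `TraceContext` and `parse_sentry_trace` are illustrative and not part of any Sentry SDK, and the trace ID in the usage note is made up.

```python
import re
from typing import NamedTuple, Optional

# `sentry-trace` format: 32 hex chars (trace id), 16 hex chars (span id),
# and an optional sampling flag ("0" or "1"), separated by hyphens.
_SENTRY_TRACE_RE = re.compile(r"^([0-9a-f]{32})-([0-9a-f]{16})(?:-([01]))?$")


class TraceContext(NamedTuple):
    trace_id: str
    parent_span_id: str
    sampled: Optional[bool]  # None when the sender left the decision open


def parse_sentry_trace(header: str) -> Optional[TraceContext]:
    """Return the parsed context, or None if the header is empty or malformed."""
    match = _SENTRY_TRACE_RE.match(header.strip().lower())
    if match is None:
        return None
    trace_id, span_id, sampled = match.groups()
    return TraceContext(trace_id, span_id, None if sampled is None else sampled == "1")
```

For example, `parse_sentry_trace("771a43a4192642f0b136d5159a501700-1234567890abcdef-1")` yields a context with `sampled=True`, while an empty or malformed attribute yields `None`, in which case a worker would simply start a fresh trace.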

The architecture can be visualized as follows:

graph TD
    subgraph User Interaction
        Client[User's Browser] -- Uploads CSV --> Phoenix
    end

    subgraph AWS
        Phoenix -- 1. Uploads to --> S3[(S3 Bucket)]
        Phoenix -- 2. Enqueues Job --> SQS_A[SQS: validation_queue]
        S3 -- 4. Downloads CSV --> RustWorker
        RustWorker -- 3. Polls Job --> SQS_A
        RustWorker -- 5. Uploads Processed Data --> S3
        RustWorker -- 6. Enqueues Job --> SQS_B[SQS: analysis_queue]
        S3 -- 8. Downloads Processed Data --> PythonWorker
        PythonWorker -- 7. Polls Job --> SQS_B
        PythonWorker -- 9. Writes Final Report --> S3
    end

    subgraph Workers
        RustWorker[Rust Validation Worker]
        PythonWorker[Python Pandas Worker]
    end

    subgraph Observability
        Phoenix -- Reports Errors/Traces --> SentryService[(Sentry)]
        RustWorker -- Reports Errors/Traces --> SentryService
        PythonWorker -- Reports Errors/Traces --> SentryService
    end

Phoenix Producer: Enqueuing the Initial Job

The Phoenix controller’s only job is to get the file off its hands as quickly as possible. We stream the upload directly to an S3 bucket to avoid loading the entire file into the web server’s memory. Once the upload is confirmed, a message is dispatched to SQS.

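First, the necessary configuration for the ex_aws and ex_aws_sqs libraries. Because these values come from environment variables, they belong in config/runtime.exs, which is evaluated when the application boots. In config/config.exs, System.get_env is evaluated at compile time and the result is baked into the release.

# config/runtime.exs
import Config
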
config :ex_aws,
  access_key_id: System.get_env("AWS_ACCESS_KEY_ID"),
  secret_access_key: System.get_env("AWS_SECRET_ACCESS_KEY"),
  region: System.get_env("AWS_REGION")

config :my_app, :sqs,
  validation_queue_url: System.get_env("SQS_VALIDATION_QUEUE_URL")

The controller action handles the multipart upload and enqueues the task. A critical piece is initializing a Sentry transaction and passing its trace context into the SQS message attributes. This is the starting point for our distributed trace.

# lib/my_app_web/controllers/upload_controller.ex
defmodule MyAppWeb.UploadController do
  use MyAppWeb, :controller

  alias ExAws.SQS

  def create(conn, %{"upload" => %Plug.Upload{} = upload}) do
    job_id = Ecto.UUID.generate()
    s3_key = "uploads/#{job_id}/#{upload.filename}"

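    # Start a Sentry transaction for this request.
    # NOTE: verify the transaction helpers used in this module (`start_transaction`,
    # `get_trace_header`, `finish_transaction`) against the Sentry SDK version in use;
    # tracing support in the Elixir SDK differs between releases.
    sentry_ctx = Sentry.start_transaction("upload.process", "request")
    Sentry.Context.set_user_context(%{id: conn.assigns.current_user.id})
    Sentry.Context.set_tags_context(%{job_id: job_id})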

    # The actual upload logic to S3. Assume `Uploader.stream_to_s3/2` is a function
    # that handles streaming the Plug.Upload to S3 and returns {:ok, s3_key}
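    # Both the upload and the enqueue must succeed before we report success;
    # otherwise the client would receive a job_id for a job that was never queued.
    with {:ok, saved_key} <- Uploader.stream_to_s3(upload, s3_key),
         # Sentry.get_trace_header returns the `sentry-trace` string
         trace_header = Sentry.get_trace_header(sentry_ctx),
         {:ok, _response} <- enqueue_validation_job(job_id, saved_key, trace_header) do
      Sentry.finish_transaction(sentry_ctx)

      conn
      |> put_status(:accepted)
      |> json(%{status: "processing", job_id: job_id})
    else
      {:error, reason} ->
        # `reason` is an error term, not an exception, and __STACKTRACE__ is only
        # available inside rescue/catch, so report it as a message instead.
        Sentry.capture_message("Upload pipeline failed: #{inspect(reason)}")
        Sentry.finish_transaction(sentry_ctx, :error)

        conn
        |> put_status(:internal_server_error)
        |> json(%{error: "failed to submit file for processing"})
    end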
  end

  defp enqueue_validation_job(job_id, s3_key, trace_header) do
    queue_url = Application.fetch_env!(:my_app, :sqs)[:validation_queue_url]

    message_body =
      Jason.encode!(%{
        job_id: job_id,
        s3_input_path: s3_key,
        s3_output_path_prefix: "processed/#{job_id}/"
      })

    # This is the key to distributed tracing. We pass the trace context as a message attribute.
    # ex_aws_sqs expects a list of maps with :name, :data_type, and :value keys.
    message_attributes = [
      %{
        name: "sentry_trace",
        data_type: :string,
        value: trace_header
      }
    ]

    SQS.send_message(queue_url, message_body, message_attributes: message_attributes)
    |> ExAws.request()
  end
end

The message contract is simple JSON. It’s vital to establish and document this contract as it’s the API between our services.

{
  "job_id": "uuid-goes-here",
  "s3_input_path": "uploads/uuid-goes-here/raw_data.csv",
  "s3_output_path_prefix": "processed/uuid-goes-here/"
}
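
Since this JSON is the API between services, each consumer benefits from validating it at the boundary and rejecting malformed messages early. A minimal sketch of such a check, assuming the three fields above are all required, non-empty strings; the `ValidationJob` and `ContractError` names are illustrative:

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ValidationJob:
    job_id: str
    s3_input_path: str
    s3_output_path_prefix: str


class ContractError(ValueError):
    """Raised when a message body does not match the contract."""


def parse_validation_job(body: str) -> ValidationJob:
    """Parse an SQS message body into a ValidationJob or raise ContractError."""
    try:
        data = json.loads(body)
    except json.JSONDecodeError as exc:
        raise ContractError(f"body is not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ContractError("body must be a JSON object")

    fields = {}
    for name in ("job_id", "s3_input_path", "s3_output_path_prefix"):
        value = data.get(name)
        if not isinstance(value, str) or not value:
            raise ContractError(f"missing or non-string field: {name}")
        fields[name] = value
    return ValidationJob(**fields)
```

A consumer can treat `ContractError` as a signal that the message is a poison pill: retrying it will never succeed, so it should be logged and removed from the queue instead of being left to reappear.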

Rust Worker: High-Performance Validation

This is a standalone Rust binary, designed to be deployed in a container and run as a long-lived process. It polls SQS, processes files in parallel, and enqueues jobs for the next stage.

The Cargo.toml specifies dependencies for AWS, serialization, logging, error handling, and parallel processing.

# Cargo.toml
[package]
name = "validation-worker"
version = "0.1.0"
edition = "2021"

[dependencies]
aws-config = "0.56"
aws-sdk-s3 = "0.29"
aws-sdk-sqs = "0.29"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
csv = "1.2"
rayon = "1.7"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
sentry = "0.31"
thiserror = "1.0"
anyhow = "1.0"

The core logic resides in main.rs. We set up logging, Sentry, and the AWS SDK clients for SQS and S3. The main loop continuously polls SQS for messages.

// src/main.rs
use aws_sdk_s3 as s3;
use aws_sdk_sqs as sqs;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{error, info, instrument};

#[derive(Debug, Deserialize)]
struct JobPayload {
    job_id: String,
    s3_input_path: String,
    s3_output_path_prefix: String,
}

#[derive(Debug, Serialize)]
struct NextJobPayload {
    job_id: String,
    s3_processed_path: String,
}

// Define custom error types for better error handling
#[derive(thiserror::Error, Debug)]
enum WorkerError {
    #[error("S3 operation failed: {0}")]
    S3Error(String),
    #[error("SQS operation failed: {0}")]
    SqsError(String),
    #[error("CSV processing failed: {0}")]
    CsvError(#[from] csv::Error),
    #[error("Data validation failed: {0}")]
    ValidationError(String),
}

const VALIDATION_QUEUE_URL_ENV: &str = "SQS_VALIDATION_QUEUE_URL";
const ANALYSIS_QUEUE_URL_ENV: &str = "SQS_ANALYSIS_QUEUE_URL";
const S3_BUCKET_ENV: &str = "S3_BUCKET";

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    tracing_subscriber::fmt::init();
    
    // Initialize Sentry. DSN is read from the SENTRY_DSN environment variable.
    let _guard = sentry::init((
        std::env::var("SENTRY_DSN").expect("SENTRY_DSN must be set"),
        sentry::ClientOptions {
            release: sentry::release_name!(),
            ..Default::default()
        },
    ));
    
    let config = aws_config::load_from_env().await;
    let sqs_client = sqs::Client::new(&config);
    let s3_client = s3::Client::new(&config);

    let validation_queue_url = std::env::var(VALIDATION_QUEUE_URL_ENV)?;
    info!(queue = %validation_queue_url, "Worker starting, polling SQS queue");

    loop {
        let receive_output = sqs_client
            .receive_message()
            .queue_url(&validation_queue_url)
            .max_number_of_messages(1) // Process one at a time for simplicity
            .wait_time_seconds(20) // Use long polling
            .message_attribute_names("All")
            .send()
            .await;

        match receive_output {
            Ok(output) => {
                if let Some(messages) = output.messages {
                    for message in messages {
                        match process_message(&s3_client, &sqs_client, &message).await {
                            Ok(_) => {
                                info!("Successfully processed message, deleting from queue.");
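                                if let Some(receipt_handle) = message.receipt_handle() {
                                    // A failed delete means the message will be redelivered,
                                    // so log it instead of silently discarding the error.
                                    if let Err(e) = sqs_client
                                        .delete_message()
                                        .queue_url(&validation_queue_url)
                                        .receipt_handle(receipt_handle)
                                        .send()
                                        .await
                                    {
                                        error!(error = %e, "Failed to delete processed message.");
                                    }
                                }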
                            }
                            Err(e) => {
                                error!(error = %e, "Failed to process message. It will become visible again.");
                                sentry::capture_error(&e);
                                // Do not delete the message, let it be retried after visibility timeout
                            }
                        }
                    }
                }
            }
            Err(e) => {
                error!(error = %e, "Failed to receive messages from SQS.");
                tokio::time::sleep(Duration::from_secs(5)).await;
            }
        }
    }
}

// The instrument macro automatically creates a tracing span for this function.
#[instrument(skip(s3_client, sqs_client, message), fields(job_id))]
async fn process_message(
    s3_client: &s3::Client,
    sqs_client: &sqs::Client,
    message: &sqs::types::Message,
) -> Result<(), WorkerError> {
    // Sentry Distributed Tracing integration
    let sentry_trace = message
        .message_attributes()
        .and_then(|attrs| attrs.get("sentry_trace"))
        .and_then(|attr| attr.string_value());
    
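    // Start a new transaction that continues the trace from the Phoenix app.
    // `continue_from_headers` takes (name, value) pairs; a missing attribute
    // yields an empty iterator and therefore a fresh trace.
    let tx_ctx = sentry::TransactionContext::continue_from_headers(
        "rust-worker.process_job",
        "queue.process",
        sentry_trace.map(|value| ("sentry-trace", value)),
    );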
    let transaction = sentry::start_transaction(tx_ctx);
    sentry::configure_scope(|scope| scope.set_span(Some(transaction.clone().into())));

    let body = message.body().unwrap_or_default();
    let payload: JobPayload = match serde_json::from_str(body) {
        Ok(p) => p,
        Err(e) => {
            error!(error = %e, "Failed to parse SQS message body");
            transaction.finish();
            // This is an unrecoverable error for this message.
            return Ok(()); // Returning Ok to delete the poison pill message.
        }
    };
    
    // Attach job_id to tracing span and Sentry scope for context.
    tracing::Span::current().record("job_id", &payload.job_id);
    sentry::configure_scope(|scope| scope.set_tag("job_id", &payload.job_id));
    
    info!(job_id = %payload.job_id, "Starting processing");

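    // Run validation, then enqueue for the next stage. The result is captured
    // instead of returned early so the transaction is finished on failure too.
    let result: Result<(), WorkerError> = async {
        let processed_file_key = run_validation_logic(s3_client, &payload).await?;
        enqueue_analysis_job(sqs_client, &payload, &processed_file_key).await
    }
    .await;

    match &result {
        Ok(()) => info!(job_id = %payload.job_id, "Processing complete, enqueued for analysis"),
        Err(_) => transaction.set_status(sentry::protocol::SpanStatus::InternalError),
    }
    transaction.finish();
    result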
}

async fn run_validation_logic(
    s3_client: &s3::Client,
    payload: &JobPayload,
) -> Result<String, WorkerError> {
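    // Return an error instead of panicking so the failure is logged and reported.
    let bucket = std::env::var(S3_BUCKET_ENV)
        .map_err(|e| WorkerError::S3Error(format!("{} is not set: {}", S3_BUCKET_ENV, e)))?;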

    // 1. Download file from S3
    let get_obj_output = s3_client
        .get_object()
        .bucket(&bucket)
        .key(&payload.s3_input_path)
        .send()
        .await
        .map_err(|e| WorkerError::S3Error(e.to_string()))?;
    
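    // For simplicity the whole object is buffered in memory; a production worker
    // handling multi-gigabyte files should stream the body instead.
    let body_bytes = get_obj_output
        .body
        .collect()
        .await
        .map_err(|e| WorkerError::S3Error(e.to_string()))?
        .into_bytes();
    // `csv::Reader` treats the first row as a header by default, so `records()`
    // below yields data rows only and the processed file has no header row.
    let mut rdr = csv::Reader::from_reader(body_bytes.as_ref());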
    
    // 2. Process records using Rayon for parallelism
    let records: Vec<csv::StringRecord> = rdr.records().collect::<Result<_, _>>()?;
    
    let processed_data: Vec<String> = records
        .par_iter() // Parallel iterator
        .filter_map(|record| {
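            // Dummy validation: ensure record has 5 columns and the 3rd column is a valid float
            if record.len() == 5 && record.get(2).unwrap_or("").parse::<f64>().is_ok() {
                // Dummy transformation: uppercase the first column. `StringRecord`
                // offers no mutable field access, so the output fields are rebuilt.
                // Note: a plain join does not quote fields that contain commas;
                // use `csv::Writer` when the data may include them.
                let fields: Vec<String> = record
                    .iter()
                    .enumerate()
                    .map(|(i, field)| if i == 0 { field.to_uppercase() } else { field.to_string() })
                    .collect();
                Some(fields.join(","))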
            } else {
                None // Filter out invalid records
            }
        })
        .collect();

    // 3. Upload processed file back to S3
    let processed_content = processed_data.join("\n");
    let output_key = format!("{}processed.csv", payload.s3_output_path_prefix);
    
    s3_client
        .put_object()
        .bucket(&bucket)
        .key(&output_key)
        .body(s3::primitives::ByteStream::from(processed_content.into_bytes()))
        .send()
        .await
        .map_err(|e| WorkerError::S3Error(e.to_string()))?;

    Ok(output_key)
}

async fn enqueue_analysis_job(
    sqs_client: &sqs::Client,
    original_payload: &JobPayload,
    processed_file_key: &str,
) -> Result<(), WorkerError> {
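    // Return an error instead of panicking so the failure is logged and reported.
    let analysis_queue_url = std::env::var(ANALYSIS_QUEUE_URL_ENV)
        .map_err(|e| WorkerError::SqsError(format!("{} is not set: {}", ANALYSIS_QUEUE_URL_ENV, e)))?;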
    
    let next_payload = NextJobPayload {
        job_id: original_payload.job_id.clone(),
        s3_processed_path: processed_file_key.to_string(),
    };
    
    let body = serde_json::to_string(&next_payload).unwrap();

    // Propagate the Sentry trace context to the next worker. `iter_headers`
    // yields the `sentry-trace` header for the span currently on the scope.
    let trace_header = sentry::configure_scope(|scope| scope.get_span()).and_then(|span| {
        span.iter_headers()
            .find(|(name, _)| *name == "sentry-trace")
            .map(|(_, value)| value)
    });

    let mut request = sqs_client
        .send_message()
        .queue_url(analysis_queue_url)
        .message_body(body);

    // SQS rejects empty attribute values, so attach the header only when present.
    if let Some(header) = trace_header {
        let sentry_trace_attribute = sqs::types::MessageAttributeValue::builder()
            .data_type("String")
            .string_value(header)
            .build();
        request = request.message_attributes("sentry_trace", sentry_trace_attribute);
    }

    request
        .send()
        .await
        .map_err(|e| WorkerError::SqsError(e.to_string()))?;
        
    Ok(())
}

Error handling deserves attention here. If run_validation_logic fails, process_message returns an Err and the message is not deleted from SQS. It becomes visible again after its visibility timeout, and a worker retries it. This is crucial for resilience, but without a limit, a message that always fails is retried until the queue's retention period expires. A real-world deployment needs a dead-letter queue (DLQ) with a maximum receive count to set such messages aside. The visibility timeout must also exceed the worst-case processing time, or a second worker will pick up a job that is still in progress.
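
As a sketch of the DLQ strategy: SQS routes a message to a dead-letter queue once it has been received more than `maxReceiveCount` times, and this is configured through the source queue's `RedrivePolicy` attribute. The helper below only builds that attribute. The function name, the default count of 5, and the ARN are assumptions for illustration.

```python
import json


def redrive_attributes(dlq_arn: str, max_receive_count: int = 5) -> dict:
    """Build queue attributes that send repeatedly failing messages to a DLQ."""
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    return {
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": max_receive_count}
        )
    }


# Hypothetical ARN, for illustration only.
attrs = redrive_attributes("arn:aws:sqs:us-east-1:123456789012:validation_queue_dlq")

# With boto3, the attributes would be applied to the source queue like this:
#   sqs.set_queue_attributes(QueueUrl=validation_queue_url, Attributes=attrs)
```

A low receive count surfaces broken jobs quickly, while a higher one tolerates transient S3 or network failures; the right value depends on how often retries actually succeed in practice.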

Python Worker: Pandas-Powered Analysis

This Python script performs a similar role to the Rust worker but for the analysis stage. It polls the analysis_queue, downloads the processed data, performs some Pandas operations, and stores the final result.

The dependencies are managed in requirements.txt.

# requirements.txt
boto3
pandas
sentry-sdk

The worker script is structured similarly, with a main polling loop.

# analysis_worker.py
import os
import json
import logging
import time

import boto3
import pandas as pd
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Sentry Initialization ---
SENTRY_DSN = os.getenv("SENTRY_DSN")
if SENTRY_DSN:
    sentry_sdk.init(
        dsn=SENTRY_DSN,
        integrations=[LoggingIntegration(level=logging.INFO, event_level=logging.ERROR)],
        traces_sample_rate=1.0,
        release=os.getenv("SENTRY_RELEASE")
    )

# --- AWS Clients and Config ---
AWS_REGION = os.getenv("AWS_REGION")
SQS_ANALYSIS_QUEUE_URL = os.getenv("SQS_ANALYSIS_QUEUE_URL")
S3_BUCKET = os.getenv("S3_BUCKET")

sqs = boto3.client("sqs", region_name=AWS_REGION)
s3 = boto3.client("s3", region_name=AWS_REGION)

def perform_analysis(s3_path: str) -> str:
    """
    Downloads data from S3, performs analysis with Pandas, and returns a summary.
    A common mistake is to not handle potential empty files or parsing errors gracefully.
    """
    try:
        obj = s3.get_object(Bucket=S3_BUCKET, Key=s3_path)
        try:
            # The Rust worker writes the processed file without a header row, so
            # header=None labels the columns 0..4 instead of consuming the first record.
            df = pd.read_csv(obj['Body'], header=None)
        except pd.errors.EmptyDataError:
            # read_csv raises on a zero-byte file instead of returning an empty frame.
            logging.warning(f"File at {s3_path} is empty.")
            return json.dumps({"status": "empty_input"})

        # Example analysis: calculate descriptive statistics on the third column
        # (which we know is numeric from the Rust validation step)
        description = df.iloc[:, 2].describe().to_json()
        
        logging.info(f"Analysis complete for {s3_path}. Mean value: {df.iloc[:, 2].mean()}")
        return description

    except Exception as e:
        logging.error(f"Failed during pandas analysis for {s3_path}: {e}")
        sentry_sdk.capture_exception(e)
        raise  # Re-raise to signal failure to the main loop

def process_message(message: dict):
    """Processes a single SQS message."""
    sentry_trace_header = ""
    if 'MessageAttributes' in message and 'sentry_trace' in message['MessageAttributes']:
        sentry_trace_header = message['MessageAttributes']['sentry_trace']['StringValue']
    
    # Defined before the try block so the except handler can always reference it.
    job_id = None

    # Continue the distributed trace from the Rust worker. `continue_trace` reads
    # the `sentry-trace` header and links the new transaction to its parent;
    # an empty header simply starts a new trace.
    transaction = sentry_sdk.start_transaction(
        sentry_sdk.continue_trace(
            {"sentry-trace": sentry_trace_header},
            op="queue.process",
            name="python-worker.analyze_data",
        )
    )

    with transaction.start_child(op="process", description="process_message") as span:
        try:
            body = json.loads(message['Body'])
            job_id = body.get("job_id")
            s3_path = body.get("s3_processed_path")

            if not job_id or not s3_path:
                logging.error("Invalid message body received, missing keys.")
                return # Acknowledge and drop the poison pill message

            span.set_tag("job_id", job_id)
            sentry_sdk.set_tag("job_id", job_id)
            logging.info(f"Processing job {job_id} for file {s3_path}")

            # Run the core logic
            analysis_result = perform_analysis(s3_path)

            # Store the final result in S3
            result_key = f"results/{job_id}/analysis_summary.json"
            s3.put_object(
                Bucket=S3_BUCKET,
                Key=result_key,
                Body=analysis_result.encode('utf-8'),
                ContentType='application/json'
            )
            logging.info(f"Successfully stored analysis result for job {job_id} at {result_key}")

        except Exception as e:
            logging.error(f"Unhandled exception in process_message for job {job_id}: {e}")
            sentry_sdk.capture_exception(e)
            transaction.set_status("internal_error")
            raise # Propagate error to main loop to prevent message deletion
        finally:
            transaction.finish()

def main():
    """Main polling loop."""
    logging.info(f"Starting Python analysis worker, polling {SQS_ANALYSIS_QUEUE_URL}")
    while True:
        try:
            response = sqs.receive_message(
                QueueUrl=SQS_ANALYSIS_QUEUE_URL,
                MaxNumberOfMessages=1,
                WaitTimeSeconds=20,
                MessageAttributeNames=['All']
            )

            messages = response.get('Messages', [])
            if not messages:
                continue

            message = messages[0]
            receipt_handle = message['ReceiptHandle']

            try:
                process_message(message)
                sqs.delete_message(
                    QueueUrl=SQS_ANALYSIS_QUEUE_URL,
                    ReceiptHandle=receipt_handle
                )
            except Exception:
                # Error already logged and sent to Sentry inside process_message.
                # Do not delete the message, allow it to be retried.
                logging.error("Processing failed, not deleting message.")
                
        except Exception as e:
            logging.error(f"Error in main SQS polling loop: {e}")
            sentry_sdk.capture_exception(e)
            time.sleep(5)

if __name__ == "__main__":
    main()

The final result is a system where each component is specialized. The Phoenix web server remains light and responsive. The computationally intensive validation is handled by a scalable pool of Rust workers optimized for performance. The complex, domain-specific statistical analysis is handled by a separate pool of Python workers, leveraging the mature data science ecosystem without compromising the performance of the core validation stage. An error in any single processing attempt is captured in Sentry with a full, cross-language trace and does not bring down the rest of the system; the job is retried automatically once its visibility timeout expires.

This architecture is not without its trade-offs. The operational complexity is higher than a monolith; we now have three distinct services to deploy, monitor, and manage. The reliance on S3 for intermediate data storage introduces latency and cost, which would be a consideration for smaller, faster jobs. For future iterations, replacing the Python/Pandas worker with a Rust-based solution using a library like Polars could be explored. This would reduce the polyglot footprint and potentially improve performance, but at the cost of sacrificing the vast ecosystem and ease of use that Pandas provides for complex, exploratory data analysis. The boundary of applicability for this pattern lies where the processing time and resource isolation benefits decisively outweigh the increased architectural complexity.

