Constructing a Polyglot Data Ingestion Pipeline with Elixir and Rust for Apache Hudi on AWS


The monolith was failing. Our IoT analytics platform, responsible for ingesting real-time sensor data from millions of devices, was built on a massive PostgreSQL cluster. It worked for the first few hundred billion records, but the constant UPDATE statements to capture the latest device state created unbearable write amplification and lock contention. Analytical queries, which needed to scan the latest state of all devices, would time out, and the cost of the vertically-scaled database instances on AWS was becoming unjustifiable. We were facing a classic write-heavy, analytical-read workload that relational databases are simply not designed to handle at petabyte scale.

Our initial concept was to pivot to a data lakehouse architecture on AWS S3. The appeal was clear: decouple storage from compute, leverage cheap, infinitely scalable storage, and use query engines like AWS Athena or Spark for analytics. The core of this strategy rested on selecting the right transactional table format. We evaluated Apache Hudi, Delta Lake, and Apache Iceberg. We chose Hudi for one critical feature: its Merge-on-Read (MOR) table type. This allows for extremely fast ingestion by writing new data into delta log files, while a separate, asynchronous compaction process merges them into columnar base files. This perfectly matched our requirement for low-latency writes and near-real-time data visibility for our analytical queries.

The first architectural decision was the ingestion service itself. The sheer number of concurrent, long-lived connections from devices screamed for a platform designed for this exact problem. Elixir, running on the Erlang BEAM VM, was the obvious choice. Its lightweight processes, actor-model concurrency via GenServers, and built-in supervision trees provide an incredibly robust foundation for building a C10M-scale ingestion gateway. A prototype was simple: a GenServer per device connection, buffering data and flushing it to S3.

This is where we hit the first major roadblock. Writing to a Hudi table is not just a simple file dump. It involves creating versioned, columnar Parquet files, generating and updating complex metadata in a timeline, and handling commit protocols to ensure atomicity. The canonical Hudi libraries are Java-based. While interoperability tools exist, running a JVM inside the BEAM via something like JInterface felt clunky and would introduce an entirely separate, heavyweight runtime to manage. More critically, the CPU-intensive work of serializing data to Parquet and managing Hudi’s file layout is a terrible fit for the BEAM’s cooperative scheduler. A long-running CPU-bound task in one process can starve other processes, destroying the very latency characteristics we chose Elixir for.

The solution was a polyglot architecture. Elixir would do what it does best: manage I/O, concurrency, and application state. For the CPU-bound heavy lifting, we would delegate to a service written in a language built for performance: Rust. Communication between them would be handled by gRPC for a strongly-typed, high-performance contract. Rust’s ecosystem, with libraries like Tonic for gRPC, arrow-rs and parquet for data serialization, and the official aws-sdk-rust, provided all the necessary building blocks for a lean, efficient “Hudi Writer” service. The overall project also included a complex real-time monitoring dashboard; for its front-end, we chose Turbopack to accelerate our React development cycle, as its Rust-based architecture provided near-instantaneous hot module replacement even with a large codebase. This post, however, focuses on the data plane: the Elixir gateway and its Rust-based writer sidecar.

The Architectural Blueprint: Elixir Gateway and Rust Writer

The data flow is designed to isolate responsibilities.

graph TD
    A[IoT Devices] -->|TCP/WebSocket Streams| B(Elixir Phoenix Channel);
    B --> C{Device Supervisor};
    C -->|Spawns| D[DeviceWorker GenServer];
    D -->|Buffer & Batch| E{Batcher GenServer};
    E -->|gRPC Request| F[Rust HudiWriter Service];
    F -->|Parquet & Hudi Metadata| G[AWS S3 Bucket];
    H[AWS Athena] -->|Queries| G;
    I[Spark Compaction Job] -->|Compacts Log Files| G;

    subgraph "Elixir Ingestion Gateway (On EC2/K8s)"
        B
        C
        D
        E
    end

    subgraph "Rust gRPC Service (On EC2/K8s)"
        F
    end

    subgraph "AWS Data Lake"
        G
        H
        I
    end
  1. Elixir Gateway: A Phoenix application accepts connections. For each device, a DeviceWorker GenServer is dynamically supervised.
  2. Buffering: The DeviceWorker buffers incoming sensor readings. To avoid overwhelming the Rust service with tiny requests, it forwards batches to a Batcher GenServer.
  3. gRPC Handoff: The Batcher consolidates records from many devices into larger payloads and sends them via gRPC to the Rust service.
  4. Rust Hudi Writer: This service receives the data, transforms it into an Arrow RecordBatch, writes it as a Parquet file, and handles the Hudi commit protocol by writing metadata files to S3.
  5. Data Lake: The data lands in S3, structured as a Hudi Merge-on-Read table, immediately available for query by Athena. A periodic Spark job handles compaction.

The Contract: Defining the gRPC Service

Everything starts with the Protobuf contract. This defines the immutable interface between Elixir and Rust. We need a way to send a collection of records, each with a map of key-value pairs representing sensor readings, along with Hudi-specific metadata.

proto/ingester.proto:

syntax = "proto3";

package ingester;

// The core service for writing data to the Hudi table
service HudiWriter {
  // Writes a batch of records. Returns the commit time upon success.
  rpc WriteBatch(WriteRequest) returns (WriteResponse) {}
}

// Represents a single logical record from a device
message Record {
  // Required Hudi fields: record key, partition path
  string hudi_record_key = 1;
  string hudi_partition_path = 2;
  // Precombine key, used for deduplication before writing.
  // Typically a timestamp of the event.
  int64 hudi_precombine_key = 3;

  // The actual payload as a map of string keys to typed values.
  map<string, Value> payload = 4;
}

// A wrapper to support different data types in the payload map.
message Value {
  oneof kind {
    string string_value = 1;
    int64 int_value = 2;
    double double_value = 3;
    bool bool_value = 4;
  }
}

// The request payload containing a batch of records
message WriteRequest {
  string request_id = 1; // For logging and tracing
  repeated Record records = 2;
}

// The response after a successful write
message WriteResponse {
  string commit_time = 1;
  uint32 records_written = 2;
}

This contract is fundamental. The hudi_record_key, hudi_partition_path, and hudi_precombine_key are essential for Hudi to correctly place data and perform upserts. The payload is flexible enough to handle varying sensor schemas.

The Heavy Lifter: Implementing the Rust Hudi Writer with Tonic

The Rust service is where the performance-critical logic resides. It needs to be lean, fast, and robust.

Cargo.toml dependencies:

[package]
name = "hudi_writer"
version = "0.1.0"
edition = "2021"

[dependencies]
tonic = "0.8"
prost = "0.11"
tokio = { version = "1", features = ["full"] }
arrow = "35.0"
parquet = { version = "35.0", features = ["arrow", "async"] }
aws-config = "0.55"
aws-sdk-s3 = "0.28"
uuid = { version = "1.3", features = ["v4"] }
chrono = "0.4"
log = "0.4"
env_logger = "0.10"
futures = "0.3"
bytes = "1"

The core of the service is the Tonic server implementation.

src/main.rs:

use tonic::{transport::Server, Request, Response, Status};
use futures::stream::{self, StreamExt};
use ingester::hudi_writer_server::{HudiWriter, HudiWriterServer};
use ingester::{WriteRequest, WriteResponse, Record, Value};

// Omitted: Protobuf generated code module `ingester`
pub mod ingester {
    tonic::include_proto!("ingester");
}

// --- Data Conversion Logic ---
// ... functions to convert protobuf `Record` to Arrow `StructArray` ...
// This part is non-trivial and involves mapping the `map<string, Value>`
// to a fixed Arrow schema. For production, you'd likely fetch the schema
// from a schema registry. For this example, we'll assume a fixed schema.

// --- S3 and Hudi Logic ---
mod hudi_ops;

#[derive(Debug, Default)]
pub struct HudiWriterService {
    s3_client: aws_sdk_s3::Client,
    bucket_name: String,
    table_path: String,
}

impl HudiWriterService {
    async fn new() -> Self {
        let config = aws_config::load_from_env().await;
        let s3_client = aws_sdk_s3::Client::new(&config);
        let bucket_name = std::env::var("HUDI_BUCKET_NAME").expect("HUDI_BUCKET_NAME not set");
        let table_path = std::env::var("HUDI_TABLE_PATH").expect("HUDI_TABLE_PATH not set");
        HudiWriterService { s3_client, bucket_name, table_path }
    }
}

#[tonic::async_trait]
impl HudiWriter for HudiWriterService {
    async fn write_batch(
        &self,
        request: Request<WriteRequest>,
    ) -> Result<Response<WriteResponse>, Status> {
        let req = request.into_inner();
        let record_count = req.records.len();
        log::info!("Received batch {} with {} records", req.request_id, record_count);

        if req.records.is_empty() {
            return Err(Status::invalid_argument("Cannot process an empty batch"));
        }

        // The real complexity is here. Hudi requires grouping records by partition path.
        // A real-world implementation would use a HashMap<String, Vec<Record>>.
        // For simplicity, we assume all records in a batch share a partition path.
        let partition_path = req.records[0].hudi_partition_path.clone();

        let commit_time = hudi_ops::generate_commit_time();
        
        // 1. Mark commit as inflight
        hudi_ops::create_inflight_commit_file(&self.s3_client, &self.bucket_name, &self.table_path, &commit_time)
            .await
            .map_err(|e| Status::internal(format!("Failed to create inflight commit: {}", e)))?;

        // 2. Convert to Arrow and write Parquet data file
        // This is a major simplification. Real logic involves `arrow::record_batch::RecordBatch`.
        let file_id = uuid::Uuid::new_v4().to_string();
        let parquet_data = hudi_ops::convert_to_parquet(req.records)
            .map_err(|e| Status::internal(format!("Parquet conversion failed: {}", e)))?;
        
        hudi_ops::write_log_file(&self.s3_client, &self.bucket_name, &self.table_path, &partition_path, &file_id, &commit_time, parquet_data)
            .await
            .map_err(|e| Status::internal(format!("S3 write failed: {}", e)))?;

        // 3. Complete the commit by moving from inflight to committed
        let commit_metadata = hudi_ops::generate_commit_metadata(record_count, &partition_path, &file_id);
        hudi_ops::complete_commit(&self.s3_client, &self.bucket_name, &self.table_path, &commit_time, commit_metadata)
            .await
            .map_err(|e| Status::internal(format!("Failed to finalize commit: {}", e)))?;

        Ok(Response::new(WriteResponse {
            commit_time,
            records_written: record_count as u32,
        }))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    let addr = "0.0.0.0:50051".parse()?;
    let writer_service = HudiWriterService::new().await;
    
    log::info!("HudiWriter service listening on {}", addr);

    Server::builder()
        .add_service(HudiWriterServer::new(writer_service))
        .serve(addr)
        .await?;

    Ok(())
}

The pitfall here is underestimating the complexity of the Hudi commit protocol. Manually creating timeline files (.inflight, .commit), writing versioned data files, and generating commit metadata is brittle. A production-grade service would need to be much more robust, handling partial failures and retries with care to avoid corrupting the Hudi timeline.

src/hudi_ops.rs:

use aws_sdk_s3::{Client, ByteStream};
use parquet::arrow::arrow_writer::ArrowWriter;
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
// ... other necessary imports

pub fn generate_commit_time() -> String {
    chrono::Utc::now().format("%Y%m%d%H%M%S%3f").to_string()
}

// Simplified function to create a placeholder for an inflight commit
pub async fn create_inflight_commit_file(
    client: &Client, bucket: &str, table_path: &str, commit_time: &str
) -> Result<(), aws_sdk_s3::Error> {
    let key = format!("{}/.hoodie/{}.commit.inflight", table_path, commit_time);
    client.put_object().bucket(bucket).key(key).body(ByteStream::from(Vec::new())).send().await?;
    Ok(())
}

// This function is the core of the data writing logic.
// In reality, it would take Vec<Record> and build a RecordBatch.
// We'll mock this part to focus on the flow.
pub fn convert_to_parquet(records: Vec<super::ingester::Record>) -> Result<Vec<u8>, anyhow::Error> {
    //
    // ... Assume implementation that uses `arrow` crate to build a `RecordBatch`
    // from the `records` vector based on a predefined schema.
    //
    // let schema = ...
    // let rb = RecordBatch::try_new(schema, columns)?;
    //
    // let mut buffer: Vec<u8> = Vec::new();
    // let mut writer = ArrowWriter::try_new(&mut buffer, rb.schema(), None)?;
    // writer.write(&rb)?;
    // writer.close()?;
    // Ok(buffer)
    //
    // For now, return dummy data
    Ok(b"dummy parquet data".to_vec())
}

// For MOR tables, we write to `.log` files which are deltas.
pub async fn write_log_file(
    client: &Client, bucket: &str, table_path: &str, partition: &str,
    file_id: &str, commit_time: &str, data: Vec<u8>
) -> Result<(), aws_sdk_s3::Error> {
    // The filename structure is critical for Hudi.
    // .[fileId]_[writeToken]_[baseCommitTime].log.[version]
    let key = format!(
        "{}/{}/.{}_{}_0_{}.log.1",
        table_path, partition, file_id, "0-0-1", commit_time
    );
    client.put_object().bucket(bucket).key(key).body(ByteStream::from(data)).send().await?;
    Ok(())
}

// ... more helper functions for `generate_commit_metadata` and `complete_commit` ...
// `complete_commit` involves writing a JSON structure to the final .commit file
// and deleting the .inflight file. This must be atomic from Hudi's perspective.

This Rust code is optimized for one thing: taking a batch of data and efficiently writing it to S3 in the format Hudi expects. It does no business logic, only mechanical transformation and I/O, making it a perfect offload target.

The Conductor: The Elixir Ingestion Gateway

Back in the Elixir world, we build the fault-tolerant supervisor and worker system.

mix.exs:

def deps do
  [
    {:phoenix, "~> 1.7.0"},
    {:grpc, "~> 0.6.2"},
    {:protobuf, "~> 0.10.0"},
    {:jason, "~> 1.2"}
  ]
end

The supervision tree is key. We use a DynamicSupervisor to manage a GenServer for each connected device.

lib/device_ingestion/supervisor.ex:

defmodule DeviceIngestion.Supervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end

  def start_worker(device_id) do
    spec = {DeviceIngestion.Worker, device_id}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end
end

The worker itself buffers messages and communicates with a central batcher to avoid making too many small gRPC calls. A common mistake is to have every worker call the gRPC service directly, which creates massive connection churn and loses the benefit of batching.

lib/device_ingestion/worker.ex:

defmodule DeviceIngestion.Worker do
  use GenServer
  require Logger

  # Time in ms to wait before flushing buffer
  @flush_interval 3000
  # Max records to buffer before forcing a flush
  @buffer_size 100

  def start_link(device_id) do
    GenServer.start_link(__MODULE__, device_id, name: via_tuple(device_id))
  end

  @impl true
  def init(device_id) do
    Logger.info("Starting worker for device: #{device_id}")
    # Start a timer to periodically flush the buffer
    Process.send_after(self(), :flush, @flush_interval)
    {:ok, %{device_id: device_id, buffer: []}}
  end

  # Public API to handle incoming sensor data
  def handle_reading(device_id, reading) do
    GenServer.cast(via_tuple(device_id), {:reading, reading})
  end

  @impl true
  def handle_cast({:reading, reading}, state) do
    new_buffer = [reading | state.buffer]
    
    if length(new_buffer) >= @buffer_size do
      # Buffer is full, flush immediately
      flush_buffer(new_buffer)
      {:noreply, %{state | buffer: []}}
    else
      {:noreply, %{state | buffer: new_buffer}}
    end
  end

  @impl true
  def handle_info(:flush, state) do
    flush_buffer(state.buffer)
    # Reset the timer
    Process.send_after(self(), :flush, @flush_interval)
    {:noreply, %{state | buffer: []}}
  end

  defp flush_buffer([]), do: :ok
  defp flush_buffer(buffer) do
    # In a real system, we'd send to a Batcher process
    # to aggregate records from multiple devices into a single gRPC call.
    # For simplicity, we call the client directly here.
    DeviceIngestion.HudiClient.write_records(buffer)
  end

  defp via_tuple(device_id), do: {:via, Registry, {DeviceIngestion.Registry, device_id}}
end

Finally, the gRPC client module in Elixir handles the communication with the Rust service. Connection management and backoff/retry logic are critical for a production system.

lib/device_ingestion/hudi_client.ex:

defmodule DeviceIngestion.HudiClient do
  require Logger

  @grpc_host "localhost" # From config in production
  @grpc_port 50051

  def write_records(records) do
    with {:ok, channel} <- GRPC.Stub.connect("#{@grpc_host}:#{@grpc_port}"),
         request <- build_request(records) do
      
      case Ingester.HudiWriter.Stub.write_batch(channel, request, timeout: 5000) do
        {:ok, response} ->
          Logger.info("Successfully wrote #{response.records_written} records. Commit: #{response.commit_time}")
          :ok
        {:error, %GRPC.Status{code: code, message: msg}} ->
          Logger.error("gRPC call failed. Code: #{code}, Message: #{msg}")
          # In production, this error should trigger a circuit breaker
          # or a more sophisticated retry strategy with exponential backoff.
          :error
      end
    else
      # Handle connection errors
      {:error, reason} ->
        Logger.error("Failed to connect to gRPC service: #{inspect(reason)}")
        :error
    end
  end

  defp build_request(records) do
    # Logic to transform Elixir maps into Protobuf `Record` messages
    # This involves mapping Elixir types to the `Value` oneof field.
    proto_records = Enum.map(records, fn record ->
      # ... conversion logic ...
      %Ingester.Record{
        hudi_record_key: record.id,
        hudi_partition_path: record.partition,
        hudi_precombine_key: record.timestamp,
        payload: %{"sensor_a" => %Ingester.Value{double_value: record.value}}
      }
    end)

    %Ingester.WriteRequest{
      request_id: UUID.uuid4(),
      records: proto_records
    }
  end
end

The result is a highly scalable and resilient system. The Elixir layer can comfortably manage millions of connections with minimal resources, while the Rust layer provides a high-performance, memory-safe data serialization engine. The entire system deployed on AWS EKS allows us to scale the Elixir and Rust pods independently based on their specific loads—I/O bound vs. CPU bound. Our analytical queries on Athena against the Hudi table now return in seconds instead of minutes or hours, and our ingestion latency is consistently under 100ms.

This architecture is not without its own set of challenges. The Merge-on-Read table requires a separate, well-managed compaction process. We currently run a scheduled Spark job, but this can cause temporary query performance degradation during the compaction window. A future iteration will likely involve a continuous compaction service to smooth this out. Furthermore, our manual implementation of the Hudi commit logic in Rust is a technical debt. As the Hudi specification evolves, keeping this custom implementation in sync will be a maintenance burden. We are actively exploring contributing to emerging native Rust Hudi writers to eventually replace this component. Finally, managing schema evolution across three boundaries (Device -> Elixir -> Rust) requires a robust solution like a centralized schema registry, which is the next major feature on our roadmap.


  TOC