Declarative Provisioning of a Heterogeneous Real-Time Event Pipeline


The initial system was a single, monolithic Java application tasked with an impossible combination of responsibilities: ingesting a high-throughput stream of telemetry events, fanning them out to real-time dashboards, and performing complex, stateful analytical processing on the same data. During peak load, the HTTP ingestion threads would starve the analytics background jobs, causing processing lags. Conversely, a heavy analytics cycle would spike CPU and GC pauses, increasing ingestion latency and leading to dropped events. This operational friction forced a decomposition, not just into services, but into specialized runtimes, each chosen for a specific performance characteristic. The challenge then became managing this polyglot environment without succumbing to infrastructural chaos.

The architectural hypothesis was straightforward: a pipeline of specialized components.

  1. Ingestion Tier: A stateless, brutally efficient service to accept incoming HTTP requests, perform minimal validation, and immediately hand off the payload. The only metrics that matter here are requests per second, p99 latency, and memory footprint. Rust, with its compile-time guarantees and the Actix-web framework, was the clear choice.
  2. Distribution Tier: A service capable of maintaining tens of thousands of persistent, low-latency connections (WebSockets) for real-time subscribers. This is a concurrency and state-management problem. The BEAM VM, via Elixir and its battle-tested OTP framework, is purpose-built for this exact scenario. Its lightweight process model and fault-tolerance primitives are non-negotiable advantages.
  3. Processing Tier: A robust service for durable, at-least-once processing of events. This requires a mature ecosystem for database interaction, complex business logic, and reliable transaction management. The existing team’s expertise and the vast library support made Java, specifically with Spring Boot, the pragmatic choice.
  4. Infrastructure Glue: Defining, deploying, and connecting these disparate services, along with their networking, messaging bus (RabbitMQ), and database (PostgreSQL), demanded an Infrastructure as Code solution. We opted for Pulumi over alternatives like Terraform primarily for its use of general-purpose programming languages. Defining infrastructure in Java allowed us to create higher-level abstractions and share code patterns with the Java processing service team, reducing context switching.

This is the overall flow we set out to build:

graph TD
    subgraph "Public Internet"
        A[Event Producers]
    end

    subgraph "AWS VPC (Managed by Pulumi)"
        B(ALB)
        C[Actix-web Ingest Service]
        D{RabbitMQ Cluster}
        E[Elixir Fan-out Service]
        F[Java Processing Service]
        G[(RDS PostgreSQL)]
        H[Real-time Subscribers]

        A -->|HTTPS| B
        B -->|HTTP| C
        C -->|AMQP Publish| D
        D -- "events.raw queue" --> E
        D -- "events.raw queue" --> F
        E -->|WebSocket| H
        F -->|SQL Writes| G
    end

The entire stack, from the VPC to the individual service deployment configurations, is managed within a single Pulumi project written in Java. This provides a unified source of truth for an otherwise dangerously fragmented system.

The Pulumi Foundation: A Unified Infrastructure Definition

A common pitfall in polyglot systems is that infrastructure definition becomes a collection of disparate scripts and manual configurations. Using Pulumi with Java allowed us to build a cohesive model. The entry point defines the foundational components: networking, the message broker, and the database.

// Pulumi.java
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.ec2.SecurityGroup;
import com.pulumi.aws.ec2.SecurityGroupArgs;
import com.pulumi.aws.ec2.Subnet;
import com.pulumi.aws.ec2.SubnetArgs;
import com.pulumi.aws.ec2.Vpc;
import com.pulumi.aws.ec2.VpcArgs;
import com.pulumi.aws.ec2.inputs.SecurityGroupEgressArgs;
import com.pulumi.aws.ec2.inputs.SecurityGroupIngressArgs;
import com.pulumi.aws.rds.Instance;
import com.pulumi.aws.rds.InstanceArgs;
import com.pulumi.aws.rds.SubnetGroup;
import com.pulumi.aws.rds.SubnetGroupArgs;
import com.pulumi.aws.ecr.Repository;
import com.pulumi.awsx.ecr.Image;
import com.pulumi.awsx.ecr.ImageArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;

import java.util.List;
import java.util.Map;

public class App {
    public static void main(String[] args) {
        Pulumi.run(ctx -> {
            // 1. Core Networking
            var vpc = new Vpc("app-vpc", VpcArgs.builder()
                .cidrBlock("10.0.0.0/16")
                .enableDnsHostnames(true)
                .enableDnsSupport(true)
                .build());

            // Explicit public subnets in two AZs (the RDS subnet group below needs at least two).
            // AZs are hardcoded for brevity; a real project would look them up at runtime.
            var subnetA = new Subnet("public-subnet-a", SubnetArgs.builder()
                .vpcId(vpc.id())
                .cidrBlock("10.0.1.0/24")
                .availabilityZone("us-east-1a")
                .mapPublicIpOnLaunch(true)
                .build());

            var subnetB = new Subnet("public-subnet-b", SubnetArgs.builder()
                .vpcId(vpc.id())
                .cidrBlock("10.0.2.0/24")
                .availabilityZone("us-east-1b")
                .mapPublicIpOnLaunch(true)
                .build());

            var publicSubnets = Output.all(subnetA.id(), subnetB.id());

            var appSg = new SecurityGroup("app-sg", SecurityGroupArgs.builder()
                .vpcId(vpc.id())
                .description("Allow all internal traffic and HTTP from anywhere")
                .ingress(
                    SecurityGroupIngressArgs.builder()
                        .protocol("tcp")
                        .fromPort(80)
                        .toPort(80)
                        .cidrBlocks("0.0.0.0/0")
                        .build(),
                    SecurityGroupIngressArgs.builder()
                        .protocol("-1")
                        .fromPort(0)
                        .toPort(0)
                        .self(true)
                        .build()
                )
                .egress(SecurityGroupEgressArgs.builder()
                    .protocol("-1")
                    .fromPort(0)
                    .toPort(0)
                    .cidrBlocks("0.0.0.0/0")
                    .build())
                .build());

            // 2. RabbitMQ Broker (Simplified using an existing AMI for this example)
            // In a real-world project, this would be a managed service or a custom component resource.
            // For brevity, we'll assume it exists and we have its connection string.
            var rabbitMqConnectionString = "amqp://user:password@rabbitmq-host:5672";

            // 3. RDS Database for the Java Processor
            var dbPassword = new RandomPassword("db-password", RandomPasswordArgs.builder()
                .length(16)
                .special(true)
                .build());

            var db = new Instance("app-db", InstanceArgs.builder()
                .engine("postgres")
                .instanceClass("db.t3.micro")
                .allocatedStorage(20)
                .dbSubnetGroupName(new SubnetGroup("db-subnet-group", SubnetGroupArgs.builder()
                    .subnetIds(publicSubnets) // For simplicity; use private subnets in production.
                    .build()).name())
                .vpcSecurityGroupIds(appSg.id().applyValue(List::of))
                .username("processor")
                .password(dbPassword.result())
                .skipFinalSnapshot(true)
                .build());
                
            // Each service will be defined below, referencing these core resources.
            // ... service definitions ...

            ctx.export("vpcId", vpc.id());
            ctx.export("databaseEndpoint", db.endpoint());
        });
    }
}

The critical decision here was to centralize the creation of shared resources like the VPC and security groups. Each microservice deployment can then reference these outputs, ensuring they are deployed into the correct network context without duplicating definitions.
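
Because these definitions are ordinary Java, the shared handles can be bundled into a plain value object and handed to per-service factory methods rather than re-resolved in every deployment block. A minimal sketch of that pattern (SharedInfra and defineIngestService are our own illustrative names, not Pulumi APIs):

// A small value object bundling the shared outputs created above.
record SharedInfra(
    Output<String> vpcId,
    Output<List<String>> publicSubnetIds,
    Output<String> appSecurityGroupId,
    String rabbitMqUrl) {}

// Each tier gets a factory method that receives the bundle, so every service
// lands in the same network context without copy-pasted lookups.
static void defineIngestService(SharedInfra infra) {
    // ... new FargateService("ingest-service", ...) using infra.vpcId(),
    // infra.publicSubnetIds() and infra.appSecurityGroupId() ...
}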

Tier 1: The Actix-web Ingestion Endpoint in Rust

The Rust service’s main.rs is lean by design. Its sole purpose is to deserialize JSON, validate a key field, and publish to RabbitMQ. We use lapin for AMQP communication and tokio as the runtime.

// ingest-service/src/main.rs
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use lapin::{
    options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties,
};
use serde::{Deserialize, Serialize};
use std::env;
use std::sync::Arc;
use tokio::sync::Mutex;
use log::{info, error};

#[derive(Deserialize, Serialize, Debug)]
struct TelemetryEvent {
    device_id: String,
    timestamp: u64,
    payload: serde_json::Value,
}

// A simple connection wrapper to handle reconnects, a production necessity.
struct AmqpClient {
    conn: Connection,
}

impl AmqpClient {
    async fn connect() -> Result<Self, lapin::Error> {
        let addr = env::var("RABBITMQ_URL").expect("RABBITMQ_URL must be set");
        let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;
        info!("Successfully connected to RabbitMQ");
        Ok(Self { conn })
    }

    async fn get_channel(&self) -> Result<lapin::Channel, lapin::Error> {
        self.conn.create_channel().await
    }
}

async fn ingest_event(
    event: web::Json<TelemetryEvent>,
    amqp_client: web::Data<Arc<Mutex<AmqpClient>>>,
) -> impl Responder {
    let body = match serde_json::to_string(&*event) {
        Ok(body) => body,
        Err(_) => return HttpResponse::BadRequest().finish(),
    };

    let client = amqp_client.lock().await;

    // The pitfall here is not handling channel/connection errors gracefully.
    // A production implementation would have a robust retry/reconnect loop.
    let channel = match client.get_channel().await {
        Ok(ch) => ch,
        Err(e) => {
            error!("Failed to create AMQP channel: {}", e);
            // Attempting a reconnect in a background task might be a strategy.
            return HttpResponse::ServiceUnavailable().body("Broker connection failed");
        }
    };

    // Put the channel into confirm mode; without this, the PublisherConfirm returned
    // by basic_publish resolves immediately instead of waiting for a broker ack.
    if let Err(e) = channel.confirm_select(ConfirmSelectOptions::default()).await {
        error!("Failed to enable publisher confirms: {}", e);
        return HttpResponse::InternalServerError().body("Failed to enable publisher confirms");
    }

    let exchange_name = "events.direct";
    let routing_key = "events.raw";

    // Ensure the exchange exists. This is idempotent, but the options must match every
    // other declarer: the Spring config below declares this exchange as durable, so we do too.
    if let Err(e) = channel.exchange_declare(
        exchange_name,
        lapin::ExchangeKind::Direct,
        ExchangeDeclareOptions { durable: true, ..ExchangeDeclareOptions::default() },
        FieldTable::default(),
    ).await {
        error!("Failed to declare exchange: {}", e);
        return HttpResponse::InternalServerError().body("Failed to declare AMQP exchange");
    }

    let confirm = channel
        .basic_publish(
            exchange_name,
            routing_key,
            BasicPublishOptions::default(),
            body.as_bytes().to_vec(),
            BasicProperties::default().with_content_type("application/json".into()),
        )
        .await;

    match confirm {
        Ok(publisher_confirm) => {
            // Wait for broker confirmation, bounded by a timeout so a sick broker
            // cannot stall ingestion. This is crucial for at-least-once delivery.
            match tokio::time::timeout(std::time::Duration::from_secs(2), publisher_confirm).await {
                Ok(Ok(_)) => HttpResponse::Accepted().finish(),
                Ok(Err(e)) => {
                    error!("Failed to receive publish confirmation: {}", e);
                    HttpResponse::InternalServerError().body("Broker did not confirm message")
                }
                Err(_) => {
                    error!("Timed out waiting for publish confirmation");
                    HttpResponse::ServiceUnavailable().body("Broker did not confirm message in time")
                }
            }
        },
        Err(e) => {
            error!("Failed to publish message: {}", e);
            HttpResponse::InternalServerError().body("Failed to publish to broker")
        }
    }
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init();

    // The AMQP client is shared across all worker threads.
    let amqp_client = AmqpClient::connect().await.expect("Failed to connect to RabbitMQ at startup");
    let amqp_client_arc = Arc::new(Mutex::new(amqp_client));

    info!("Starting ingest server on 0.0.0.0:8080");

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(amqp_client_arc.clone()))
            .route("/ingest", web::post().to(ingest_event))
    })
    .bind("0.0.0.0:8080")?
    .run()
    .await
}

The corresponding Dockerfile is minimal:

# Dockerfile for ingest-service
FROM rust:1.73-slim-bookworm as builder

WORKDIR /usr/src/ingest-service
COPY . .
RUN cargo install --path .

FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y libssl-dev ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /usr/local/cargo/bin/ingest-service /usr/local/bin/ingest-service
EXPOSE 8080
CMD ["ingest-service"]

Deploying this service with Pulumi involves building the Docker image, pushing it to ECR, and defining an ECS service.

// Inside Pulumi.java's main method
// ... after shared resources are defined ...

// 4. Actix-web Ingest Service Definition
var ingestRepo = new Repository("ingest-repo");

var ingestImage = new Image("ingest-image", ImageArgs.builder()
    .repositoryUrl(ingestRepo.repositoryUrl())
    .path("./ingest-service") // Path to the Rust project
    .build());

// This is a custom component resource we'd write to abstract ECS Fargate service creation.
// It simplifies creating the task definition, service, IAM roles, and load balancer integration.
var ingestService = new FargateService("ingest-service", FargateServiceArgs.builder()
    .clusterArn(cluster.arn()) // Assuming an ECS cluster was created
    .vpcId(vpc.id())
    .subnetIds(publicSubnets)
    .securityGroupIds(List.of(appSg.id()))
    .taskDefinitionArgs(TaskDefinitionArgs.builder()
        .image(ingestImage.imageUri())
        .port(8080)
        .cpu(256)
        .memory(512)
        .environment(Map.of("RABBITMQ_URL", rabbitMqConnectionString))
        .build())
    .desiredCount(2)
    .build());

ctx.export("ingestServiceUrl", ingestService.getLoadBalancerUrl());

The first major hurdle we encountered was broker unavailability. If RabbitMQ went down, the naive implementation would block and eventually timeout, causing cascading failures. The solution was to implement publisher confirms (publisher_confirm.await) and a timeout. If the confirmation doesn’t arrive within a reasonable period (e.g., 2 seconds), the request fails with a 503 Service Unavailable, signaling to the upstream client to retry. This prevents the ingestion service from becoming a bottleneck itself.
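
The 503 contract only pays off if producers actually back off and retry. A minimal sketch of a well-behaved producer using only the JDK HttpClient (the endpoint URL, attempt count, and backoff values are illustrative):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TelemetryProducer {

    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // Sends one event, retrying with exponential backoff while the ingest tier returns 503.
    static boolean sendEvent(String json) throws Exception {
        long backoffMs = 250;
        for (int attempt = 0; attempt < 5; attempt++) {
            HttpRequest request = HttpRequest.newBuilder(URI.create("https://ingest.example.com/ingest"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();

            HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == 202) {
                return true;  // broker confirmed the publish
            }
            if (response.statusCode() != 503) {
                return false; // anything other than 503 will not be cured by retrying
            }
            Thread.sleep(backoffMs);
            backoffMs *= 2;   // exponential backoff before the next attempt
        }
        return false;
    }
}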

Tier 2: The Elixir Fan-out Service

The Elixir application’s role is to consume from RabbitMQ and broadcast to WebSocket clients managed by Phoenix Channels. We use the amqp library for RabbitMQ consumption.

# fanout-service/lib/fanout_service/event_consumer.ex
defmodule FanoutService.EventConsumer do
  use GenServer
  require Logger

  alias FanoutServiceWeb.Endpoint

  @exchange "events.direct"
  @queue "events.raw.fanout"
  @routing_key "events.raw"

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  @impl true
  def init(_args) do
    # In a real project, connection details come from config and the process
    # monitors the connection so it can reconnect on failure.
    {:ok, conn} = AMQP.Connection.open(System.get_env("RABBITMQ_URL"))
    {:ok, chan} = AMQP.Channel.open(conn)

    # Declarations must match the other services: a durable direct exchange,
    # with a dedicated durable queue for the fan-out tier.
    :ok = AMQP.Exchange.declare(chan, @exchange, :direct, durable: true)
    {:ok, _} = AMQP.Queue.declare(chan, @queue, durable: true)
    :ok = AMQP.Queue.bind(chan, @queue, @exchange, routing_key: @routing_key)
    {:ok, _consumer_tag} = AMQP.Basic.consume(chan, @queue)

    {:ok, %{channel: chan}}
  end

  @impl true
  def handle_info({:basic_deliver, payload, meta}, %{channel: chan} = state) do
    Logger.info("Received event, routing_key: #{meta.routing_key}")

    case Jason.decode(payload) do
      {:ok, %{"device_id" => device_id} = event} ->
        # Broadcast to any client subscribed to this device's topic
        Endpoint.broadcast("device:#{device_id}", "new_event", event)
        :ok = AMQP.Basic.ack(chan, meta.delivery_tag)

      {:error, _} ->
        Logger.error("Failed to decode event payload.")
        # Reject and do not requeue. It's malformed.
        :ok = AMQP.Basic.reject(chan, meta.delivery_tag, requeue: false)
    end

    {:noreply, state}
  end

  # Broker lifecycle notifications (consume confirmation, cancellation, etc.)
  @impl true
  def handle_info(_msg, state), do: {:noreply, state}
end

The Phoenix Channel setup is standard. A client connects to a WebSocket and subscribes to a topic like "device:12345".

# fanout-service/lib/fanout_service_web/channels/device_channel.ex
defmodule FanoutServiceWeb.DeviceChannel do
  use Phoenix.Channel
  require Logger

  def join("device:" <> device_id, _payload, socket) do
    Logger.info("Client joined channel for device #{device_id}")
    {:ok, socket}
  end

  def handle_in("ping", _payload, socket) do
    {:reply, {:ok, %{status: "pong"}}, socket}
  end
end

The deployment process with Pulumi mirrors the Rust service: build a Docker image from a Dockerfile and deploy it as another FargateService. The key difference is the environment, which needs the same RABBITMQ_URL.

The primary production issue with this tier was slow consumers. If a WebSocket client was on a poor network, the mailbox of its channel process on the server could fill up, consuming memory. BEAM's per-process heap isolation is a lifesaver here: the growth stays attributable to a single process instead of silently destabilizing the whole node. We addressed it by monitoring mailbox lengths (the :message_queue_len item from Process.info/2) and forcefully disconnecting clients whose mailboxes grew beyond a threshold, a standard OTP practice for protecting system stability.

Tier 3: The Java Processing Service

This service performs the “heavy lifting”. It’s a standard Spring Boot application that listens to the same RabbitMQ exchange but with its own durable queue.

// processing-service/src/main/java/com/example/processor/EventProcessor.java
package com.example.processor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class EventProcessor {

    private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
    private final TelemetryRepository telemetryRepository;

    public EventProcessor(TelemetryRepository telemetryRepository) {
        this.telemetryRepository = telemetryRepository;
    }

    // A common mistake is to not define queues, exchanges, and bindings declaratively
    // in a @Configuration class. The `queues` property here assumes the queue exists.
    @RabbitListener(queues = "events.raw.processor")
    @Transactional // The DB write happens in a transaction; the message is acked only after the method returns successfully (at-least-once, not atomic)
    public void processEvent(TelemetryEvent event) {
        log.info("Processing event for device: {}", event.getDeviceId());

        try {
            // Simulate complex business logic and database persistence
            if (event.getPayload().path("critical").asBoolean(false)) { // path() avoids an NPE when the flag is absent
                // Perform some intensive calculation or lookup
                Thread.sleep(100); 
            }
            telemetryRepository.save(new TelemetryEntity(event));
            log.info("Successfully persisted event for device: {}", event.getDeviceId());
        } catch (Exception e) {
            log.error("Failed to process event for device: {}. It will be requeued or sent to DLQ.", event.getDeviceId(), e);
            // Let the exception propagate to trigger RabbitMQ's retry/DLQ mechanism.
            throw new RuntimeException("Processing failed", e);
        }
    }
}

The configuration for RabbitMQ is crucial for resilience. We define a Dead Letter Exchange (DLX) so that messages that repeatedly fail processing are routed somewhere for inspection rather than being endlessly requeued.

// processing-service/src/main/java/com/example/processor/RabbitMqConfig.java
package com.example.processor;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    public static final String EXCHANGE_NAME = "events.direct";
    public static final String QUEUE_NAME = "events.raw.processor";
    public static final String ROUTING_KEY = "events.raw";

    public static final String DLX_NAME = "events.dlx";
    public static final String DLQ_NAME = "events.raw.processor.dlq";

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange(DLX_NAME);
    }

    @Bean
    Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME)
            .withArgument("x-dead-letter-exchange", DLX_NAME)
            .withArgument("x-dead-letter-routing-key", ROUTING_KEY)
            .build();
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_NAME).build();
    }
    
    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

    @Bean
    Binding dlqBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(ROUTING_KEY);
    }
}
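
One detail the listener depends on: the ingest service publishes JSON with content type application/json, so the @RabbitListener method only receives a populated TelemetryEvent if a JSON message converter is registered. With Spring Boot, declaring the converter bean is enough for the auto-configured listener container factory to pick it up. A minimal addition to the same configuration class:

// Added to RabbitMqConfig: convert incoming JSON payloads into TelemetryEvent.
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;

    @Bean
    MessageConverter jsonMessageConverter() {
        // Picked up by both RabbitTemplate and the auto-configured
        // SimpleRabbitListenerContainerFactory.
        return new Jackson2JsonMessageConverter();
    }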

The main operational challenge for the Java tier was handling processing spikes. When a batch of complex events arrived, the consumer slowed down and the events.raw.processor queue grew. The solution was to autoscale the ECS service on queue depth. Because the bus is RabbitMQ rather than SQS (where the ApproximateNumberOfMessagesVisible metric would be available out of the box), a small exporter publishes the queue depth as a custom CloudWatch metric and the scaling policy tracks that. This was configured directly in Pulumi, allowing the infrastructure to react dynamically to application load.
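
A sketch of that wiring in the same Pulumi project (imports from com.pulumi.aws.appautoscaling and its inputs package omitted; the metric name and namespace are whatever our queue-depth exporter publishes, and processorService stands for the Fargate component of this tier, so treat the snippet as illustrative rather than copy-paste):

// Register the processing service's desired count as a scalable target.
var scalingTarget = new Target("processor-scaling-target", TargetArgs.builder()
    .serviceNamespace("ecs")
    .scalableDimension("ecs:service:DesiredCount")
    // "service/<cluster-name>/<service-name>", built from the ECS outputs.
    .resourceId(Output.format("service/%s/%s", cluster.name(), processorService.name()))
    .minCapacity(2)
    .maxCapacity(10)
    .build());

// Track the custom CloudWatch metric that mirrors the events.raw.processor queue depth.
var scalingPolicy = new Policy("processor-queue-depth-policy", PolicyArgs.builder()
    .policyType("TargetTrackingScaling")
    .resourceId(scalingTarget.resourceId())
    .scalableDimension(scalingTarget.scalableDimension())
    .serviceNamespace(scalingTarget.serviceNamespace())
    .targetTrackingScalingPolicyConfiguration(PolicyTargetTrackingScalingPolicyConfigurationArgs.builder()
        .targetValue(1000.0) // aim for roughly 1000 ready messages per task
        .customizedMetricSpecification(
            PolicyTargetTrackingScalingPolicyConfigurationCustomizedMetricSpecificationArgs.builder()
                .metricName("RabbitMQQueueDepth")
                .namespace("Custom/EventPipeline")
                .statistic("Average")
                .build())
        .build())
    .build());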

The greatest value of this polyglot, best-tool-for-the-job architecture is performance isolation. A GC pause in the Java service has zero impact on the ingestion latency of the Rust service. A massive influx of WebSocket clients connecting to the Elixir service does not slow down the batch processing. However, this isolation comes at the cost of operational complexity. Managing three different build pipelines, container artifacts, and runtime environments would be untenable without a unifying layer like Pulumi. It provides the single pane of glass and the declarative control plane that turns a collection of disparate services into a cohesive, manageable system.

The current implementation still has limitations. The system’s observability is fragmented; correlating a single event’s journey from the Actix-web endpoint, through Elixir, and into the Java processor’s database record is difficult. A full-fledged distributed tracing solution using OpenTelemetry, with context propagation across AMQP messages, is the necessary next step to achieve true end-to-end visibility. Furthermore, the reliance on a single RabbitMQ cluster in one availability zone remains a significant single point of failure. A future iteration must explore a multi-AZ cluster or a more resilient messaging technology to meet high-availability requirements.

