Constructing an Interactive Domain Event Simulation Platform with Jupyter and a Consul-Secured SQS Backbone


The post-mortem of the Q3 revenue miss was brutal. A subtle bug in our Inventory service’s optimistic concurrency logic, triggered only under a specific sequence of OrderPlaced and ShipmentReturned events, had silently corrupted stock levels for high-velocity products. Our unit tests were green. Our integration tests, which ran against a sanitized, smaller dataset, never simulated this specific collision of events. The core problem wasn’t a lack of testing, but a failure of imagination. We couldn’t effectively model and validate complex, stateful, asynchronous business workflows before they hit production. The cost was a multi-million dollar write-off.

This incident crystallized our primary technical pain point: the validation of cross-service business logic in an event-driven architecture is fundamentally broken. Staging environments drift, integration tests are slow and brittle, and worst of all, there’s a chasm between the domain experts who understand the edge cases and the engineers writing the tests. We needed a “flight simulator” for our business domain—an interactive environment where we could replay, manipulate, and generate domain event streams to probe the behavior of our services under controlled, observable conditions.

Our initial concept was to build an internal platform centered around Jupyter notebooks. Domain experts and developers could collaboratively author “simulation scenarios” in Python, defining sequences of domain events and assertions about the expected outcomes. This required a robust, secure, and production-faithful way to inject these events into our microservices ecosystem and observe the results.

The technology selection process was guided by pragmatism, aiming to leverage our existing stack where possible.

  1. Domain-Driven Design (DDD): Our entire architecture is built on DDD principles. Bounded Contexts like Ordering, Inventory, and Shipping communicate via Domain Events. This existing vocabulary was the perfect foundation. The events themselves (OrderPlaced, ItemsReserved, etc.) would be the atomic units of our simulations.
  2. AWS SQS: As our production message bus, SQS was the natural choice for the event transport layer. Using it maintains fidelity with production behavior, including its at-least-once delivery semantics and lack of guaranteed ordering for standard queues—a “feature” we needed our services to handle correctly anyway (a sketch of how a simulation can probe exactly that follows this list). We would provision dedicated “simulation” queues to isolate this traffic.
  3. Consul Connect: The simulation platform would need to interact with services running in our staging environment. Opening up our staging network to a central Jupyter Hub was a non-starter from a security perspective. Consul Connect provides a zero-trust solution. By running a Consul agent alongside our services and the Jupyter kernel, we could create a secure service mesh with automatic, mutual TLS (mTLS), ensuring only authenticated and authorized components could communicate.
  4. Jupyter: Its interactive, multi-language nature and rich data science ecosystem made it the ideal control plane. Python, with boto3, provides first-class AWS integration. Notebooks allow us to mix code, documentation (in Markdown), and eventually, visualizations of the simulation results.

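As a first taste of what “probing” means in practice, here is a minimal, hypothetical notebook cell that publishes the same logical OrderPlaced event twice to exercise a consumer’s idempotency under at-least-once delivery. The queue name and payload are illustrative; the real queues are provisioned in Step 2.

# Hypothetical notebook cell: probe at-least-once delivery handling.
import json
import uuid
import boto3

QUEUE_NAME = "domain-sim-order-events"  # illustrative; see the Terraform in Step 2

sqs = boto3.client("sqs", region_name="us-east-1")
queue_url = sqs.get_queue_url(QueueName=QUEUE_NAME)["QueueUrl"]

# One logical event with one eventId, sent twice to mimic an SQS redelivery.
event = {
    "metadata": {"eventId": str(uuid.uuid4()), "eventType": "OrderPlaced", "version": "1.0"},
    "orderId": "sim-dup-001",
    "customerId": "cust-dup",
    "items": [{"sku": "PROD-12345", "quantity": 1, "price": 10.0}],
}

body = json.dumps(event)
for _ in range(2):
    sqs.send_message(QueueUrl=queue_url, MessageBody=body)

# A correctly idempotent consumer should reserve stock exactly once for this eventId.
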
The architecture we settled on looks like this:

graph TD
    subgraph "Interactive Control Plane"
        Jupyter[Jupyter Notebook Server]
        style Jupyter fill:#f9f,stroke:#333,stroke-width:2px
    end

    subgraph "AWS"
        SQS_Order["SQS: order-events-sim"]
        SQS_Inventory["SQS: inventory-events-sim"]
        DynamoDB["DynamoDB: inventory-read-model"]
    end

    subgraph "Staging Environment (Consul Service Mesh)"
        Service_Ordering[Ordering Service]
        Proxy_Ordering[Consul Proxy]

        Service_Inventory[Inventory Service]
        Proxy_Inventory[Consul Proxy]
    end

    Jupyter -- "boto3: SendMessage" --> SQS_Order
    Service_Ordering -- "Consumes" --> SQS_Order
    Proxy_Ordering <--> Service_Ordering

    Service_Ordering -- "Publishes Event" --> SQS_Inventory
    Service_Inventory -- "Consumes" --> SQS_Inventory
    Proxy_Inventory <--> Service_Inventory

    Service_Inventory -- "Updates Read Model" --> DynamoDB
    Jupyter -- "boto3: Query" --> DynamoDB

The implementation journey began with defining the domain contracts.

Step 1: Codifying the Domain

Our services are written in Go, but the simulation logic in Python needs to speak the same language. We established a simple, language-agnostic JSON structure for our domain events.

Here is the core OrderPlaced event definition in Go, within the Ordering Bounded Context.

// pkg/ordering/domain/events.go
package domain

import (
	"encoding/json"
	"time"
)

// EventMetadata holds common fields for all domain events.
type EventMetadata struct {
	EventID   string    `json:"eventId"`
	EventType string    `json:"eventType"`
	Timestamp time.Time `json:"timestamp"`
	Version   string    `json:"version"`
}

// OrderItem represents a single item within an order.
type OrderItem struct {
	SKU      string  `json:"sku"`
	Quantity int     `json:"quantity"`
	Price    float64 `json:"price"`
}

// OrderPlaced is published when a new order is successfully created.
type OrderPlaced struct {
	Metadata   EventMetadata `json:"metadata"`
	OrderID    string        `json:"orderId"`
	CustomerID string        `json:"customerId"`
	Items      []OrderItem   `json:"items"`
}

// ToJSON serializes the event to a JSON string for publishing.
// In a real-world project, this would include robust error handling.
func (e *OrderPlaced) ToJSON() string {
	bytes, err := json.Marshal(e)
	if err != nil {
		// Production code would have structured logging and panic recovery.
		// For this example, we panic to halt on a critical serialization error.
		panic("failed to marshal OrderPlaced event: " + err.Error())
	}
	return string(bytes)
}

The corresponding service in the Inventory Bounded Context would consume this event and, upon success, publish its own ItemsReserved event. This chain of events is precisely what we need to simulate and verify.
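
Only OrderPlaced is formally defined above, so here is an illustrative sketch (as a Python dict, with field names assumed by analogy to the envelope above, not a published contract) of what an ItemsReserved payload might look like on the wire:

# Illustrative ItemsReserved payload; field names are assumptions mirroring the OrderPlaced envelope.
items_reserved_example = {
    "metadata": {
        "eventId": "<uuid generated by the Inventory service>",
        "eventType": "ItemsReserved",
        "timestamp": "<RFC 3339 timestamp>",
        "version": "1.0",
    },
    "orderId": "sim-order-001",
    "items": [
        {"sku": "PROD-12345", "quantity": 75},
    ],
}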

Step 2: Provisioning the SQS Backbone with Terraform

Isolating simulation traffic is paramount. We used Terraform to provision dedicated SQS queues and a DynamoDB table to act as our verifiable read model. A common pitfall here is neglecting Dead Letter Queues (DLQs). For a simulation environment, DLQs are even more critical, as they capture events that failed processing, providing a clear signal of a bug in the service logic.

# terraform/aws_infra.tf

variable "aws_region" {
  description = "The AWS region to deploy resources in."
  type        = string
  default     = "us-east-1"
}

variable "tag_prefix" {
  description = "Prefix for resource tags."
  type        = string
  default     = "domain-sim"
}

resource "aws_sqs_queue" "order_events_sim_dlq" {
  name = "${var.tag_prefix}-order-events-dlq"
}

resource "aws_sqs_queue" "order_events_sim" {
  name                       = "${var.tag_prefix}-order-events"
  delay_seconds              = 0
  max_message_size           = 262144 // 256 KB
  message_retention_seconds  = 345600 // 4 days
  visibility_timeout_seconds = 60

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.order_events_sim_dlq.arn
    maxReceiveCount     = 5
  })

  tags = {
    Environment = "simulation"
    Project     = "DomainSimulator"
  }
}

// ... similar resources for inventory-events-sim queue ...

resource "aws_dynamodb_table" "inventory_read_model" {
  name         = "${var.tag_prefix}-inventory-read-model"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "sku"

  attribute {
    name = "sku"
    type = "S"
  }

  tags = {
    Environment = "simulation"
    Project     = "DomainSimulator"
  }
}

This IaC approach ensures our simulation infrastructure is repeatable, version-controlled, and can be spun up or torn down on demand.
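
Because the DLQs are the primary signal that a service mishandled a simulated event, we also want to inspect them from the notebook. Here is a minimal sketch, assuming the same boto3 credentials the simulator uses; the helper name is ours, not part of any library.

import boto3

def peek_dlq(queue_name: str, region_name: str = "us-east-1", max_messages: int = 10) -> list[str]:
    """Non-destructively sample message bodies from a simulation DLQ."""
    sqs = boto3.client("sqs", region_name=region_name)
    queue_url = sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,
        WaitTimeSeconds=2,
        VisibilityTimeout=0,  # leave the messages visible for other inspectors
    )
    return [msg["Body"] for msg in response.get("Messages", [])]

# Any output here means a consumer failed to process a simulated event.
for body in peek_dlq("domain-sim-order-events-dlq"):
    print(body)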

Step 3: Securing the Services with Consul Connect

This is where the architecture gets interesting. Our InventoryService needs to be part of the mesh. It’s a simple Go application that listens for SQS messages.

First, the core logic of the service. It polls SQS, processes OrderPlaced messages, updates the DynamoDB read model, and publishes an ItemsReserved event.

// cmd/inventoryservice/main.go
package main

import (
    "context"
    "encoding/json"
    "log"
    "os"
    "os/signal"
    "syscall"

    // Also required: the AWS SDK SQS package and the internal ordering
    // domain package (import paths omitted here).
)

// SQSMessageHandler defines the core business logic for processing a message.
func SQSMessageHandler(ctx context.Context, msg *sqs.Message) error {
    log.Printf("Received message: %s", *msg.MessageId)

    var event domain.OrderPlaced
    if err := json.Unmarshal([]byte(*msg.Body), &event); err != nil {
        log.Printf("ERROR: Failed to unmarshal message body: %v", err)
        // Returning nil so the message is deleted from the queue.
        // In a real system, this might go to a DLQ if the message is truly malformed.
        return nil
    }

    // A real implementation would involve transactional updates,
    // robust concurrency control, and domain logic.
    log.Printf("Processing OrderPlaced for OrderID: %s", event.OrderID)
    for _, item := range event.Items {
        // Here we would interact with our database to reserve stock.
        // For simulation, we update the DynamoDB read model.
        err := updateInventoryReadModel(ctx, item.SKU, item.Quantity)
        if err != nil {
            log.Printf("ERROR: Failed to update read model for SKU %s: %v", item.SKU, err)
            // Returning error will make the message visible again after timeout.
            return err
        }
    }

    // Publish follow-up event (e.g., ItemsReserved) to another SQS queue
    // ... implementation omitted for brevity ...

    log.Printf("Successfully processed message %s", *msg.MessageId)
    return nil
}

func main() {
    // Standard setup for AWS SDK, configuration, etc.
    // ...

    // Main loop for polling SQS
    // ... code that uses the AWS SDK to receive messages and passes them to SQSMessageHandler ...

    // Graceful shutdown logic
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    log.Println("Shutting down service...")
}

// updateInventoryReadModel is a helper to interact with DynamoDB.
// ... implementation omitted ...

To integrate this service into the mesh, we define a Consul service configuration file. The key part is the connect stanza, which tells Consul to inject a sidecar proxy.

# consul/inventory-service.hcl
service {
  name = "inventory-service"
  port = 8080 // The application's actual port, though it's not exposed directly.

  connect {
    sidecar_service {}
  }

  // Health check is crucial for service discovery.
  // For a message-driven service, this could be a simple TCP check
  // or an HTTP endpoint that reports internal status.
  check {
    id       = "service-health"
    name     = "TCP Health Check"
    tcp      = "localhost:8080"
    interval = "10s"
    timeout  = "2s"
  }
}

When we run the service, we start the Consul agent first, then the application itself.

# Terminal 1: Start the Consul agent
$ consul agent -dev -node=inventory-node -config-file=consul/inventory-service.hcl

# Terminal 2: Start the sidecar proxy registered by the sidecar_service stanza.
# Consul assigns the proxy its own port and forwards authorized inbound traffic to 8080.
$ consul connect proxy -sidecar-for inventory-service

# Terminal 3: Start the Go application.
# The application itself is never exposed to the network directly.
$ go run cmd/inventoryservice/main.go

The service doesn’t need to know about the proxy. It listens on localhost:8080 as usual. The Consul proxy intercepts all inbound and outbound traffic, enforces mTLS, and manages connections based on intentions defined in Consul.
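
Intentions are the mesh’s authorization layer; on a cluster with a default-deny policy we have to explicitly allow the control plane to reach the service. One way to express that from a terminal (newer Consul versions can also use a service-intentions config entry):

# Allow the Jupyter control plane to open connections to the Inventory service.
$ consul intention create -allow jupyter-control-plane inventory-service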

Step 4: The Jupyter Control Plane

This is the interactive heart of the platform. We create a custom Docker image for our Jupyter server that includes the consul binary, Python dependencies (boto3, jupyterlab), and a startup script.

# docker/jupyter/Dockerfile
FROM jupyter/base-notebook:latest

USER root

# Install AWS CLI, HashiCorp repo, and Consul
RUN apt-get update && apt-get install -y \
    curl \
    unzip \
    gnupg \
    software-properties-common \
    && curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" \
    && unzip awscliv2.zip \
    && ./aws/install \
    && curl -fsSL https://apt.releases.hashicorp.com/gpg | apt-key add - \
    && apt-add-repository "deb [arch=amd64] https://apt.releases.hashicorp.com $(lsb_release -cs) main" \
    && apt-get update && apt-get install -y consul \
    && rm -rf /var/lib/apt/lists/*

# Switch back to the notebook user
USER ${NB_UID}

# Install Python libraries
RUN pip install --no-cache-dir \
    boto3 \
    pandas

# Add a startup script
COPY start-jupyter.sh /usr/local/bin/start-jupyter.sh
RUN chmod +x /usr/local/bin/start-jupyter.sh

ENTRYPOINT ["/usr/local/bin/start-jupyter.sh"]

The start-jupyter.sh script is critical. It starts the Consul agent in client mode before launching JupyterLab. This makes the Jupyter environment a first-class citizen of the service mesh.

# docker/jupyter/start-jupyter.sh
#!/bin/bash

# Start the Consul agent in the background.
# Cluster join settings (e.g., retry_join) live in the referenced config file.
echo "Starting Consul agent..."
consul agent -config-file=/home/jovyan/consul-config/jupyter-service.hcl &

# Wait a moment for consul to initialize
sleep 5

# Start the Jupyter Lab server
echo "Starting JupyterLab..."
start-notebook.sh "$@"

The corresponding Consul service definition for Jupyter is simple:

# consul-config/jupyter-service.hcl
service {
  name = "jupyter-control-plane"
}

Now, within a notebook, we can write Python code to orchestrate simulations. We abstract the boilerplate into a helper class.

# notebooks/utils/simulator.py
import boto3
import json
import uuid
import time
from datetime import datetime, timezone

class DomainEventSimulator:
    """A client for injecting domain events into SQS for simulation."""

    def __init__(self, region_name: str):
        """
        Initializes the simulator client.
        Assumes AWS credentials are configured via environment variables or IAM roles.
        """
        self.sqs = boto3.client("sqs", region_name=region_name)
        self.dynamodb = boto3.resource("dynamodb", region_name=region_name)
        print(f"Simulator initialized for region {region_name}")

    def _get_queue_url(self, queue_name: str) -> str:
        """Helper to resolve queue name to URL."""
        try:
            response = self.sqs.get_queue_url(QueueName=queue_name)
            return response['QueueUrl']
        except self.sqs.exceptions.QueueDoesNotExist:
            print(f"ERROR: SQS Queue '{queue_name}' does not exist.")
            raise

    def publish_event(self, queue_name: str, event_payload: dict):
        """
        Publishes a single domain event to the specified SQS queue.
        Adds standard metadata to the event.
        """
        queue_url = self._get_queue_url(queue_name)

        # Enrich with the standard metadata envelope (mirrors the Go EventMetadata
        # contract, so the consumer sees eventType inside metadata as well).
        event_payload['metadata'] = {
            'eventId': str(uuid.uuid4()),
            'eventType': event_payload.get('eventType', 'Unknown'),
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'version': '1.0',
            'source': 'JupyterDomainSimulator'
        }
        
        message_body = json.dumps(event_payload)

        try:
            response = self.sqs.send_message(
                QueueUrl=queue_url,
                MessageBody=message_body
            )
            print(f"Successfully sent event. MessageId: {response['MessageId']}")
            return response['MessageId']
        except Exception as e:
            print(f"FATAL: Failed to send message to SQS: {e}")
            raise

    def get_inventory_level(self, table_name: str, sku: str) -> dict | None:
        """Polls the inventory read model for the state of a given SKU."""
        table = self.dynamodb.Table(table_name)
        try:
            response = table.get_item(Key={'sku': sku})
            return response.get('Item')
        except Exception as e:
            print(f"ERROR: Failed to query DynamoDB table {table_name}: {e}")
            return None

A complete simulation scenario in a notebook now becomes clear and expressive.

# notebooks/scenarios/race_condition_test.ipynb

# Cell 1: Setup
from utils.simulator import DomainEventSimulator
import time

# Configuration
AWS_REGION = "us-east-1"
ORDER_QUEUE = "domain-sim-order-events"
INVENTORY_TABLE = "domain-sim-inventory-read-model"
SKU_TO_TEST = "PROD-12345"

# Initialize our simulation client
sim = DomainEventSimulator(AWS_REGION)

# --- Scenario Definition ---
# Test what happens when two large orders for the same SKU arrive concurrently.

# Cell 2: Define the events
order1_payload = {
    "eventType": "OrderPlaced",
    "orderId": "sim-order-001",
    "customerId": "cust-abc",
    "items": [{"sku": SKU_TO_TEST, "quantity": 75, "price": 10.0}]
}

order2_payload = {
    "eventType": "OrderPlaced",
    "orderId": "sim-order-002",
    "customerId": "cust-def",
    "items": [{"sku": SKU_TO_TEST, "quantity": 50, "price": 10.0}]
}


# Cell 3: Execute the simulation
print("--- Starting Simulation: Race Condition Test ---")
print(f"Publishing two concurrent orders for SKU: {SKU_TO_TEST}")

sim.publish_event(ORDER_QUEUE, order1_payload)
sim.publish_event(ORDER_QUEUE, order2_payload)

print("\nEvents published. Waiting for processing...")

# Cell 4: Verification
# In a real-world scenario, verification logic can be complex.
# We might need to poll for a certain state or wait for a resulting event.
# For simplicity, we poll the read model with a timeout.
max_wait_seconds = 30
start_time = time.time()
final_stock = None

while time.time() - start_time < max_wait_seconds:
    inventory_state = sim.get_inventory_level(INVENTORY_TABLE, SKU_TO_TEST)
    if inventory_state and inventory_state.get('reservedCount') == 125: # 75 + 50
        print("\n✅ VERIFICATION PASSED!")
        print(f"Final state for {SKU_TO_TEST}: {inventory_state}")
        final_stock = inventory_state
        break
    time.sleep(2)

if not final_stock:
    print("\n❌ VERIFICATION FAILED!")
    print(f"Timed out after {max_wait_seconds}s. Final state observed:")
    print(sim.get_inventory_level(INVENTORY_TABLE, SKU_TO_TEST))

print("\n--- Simulation Complete ---")
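
The polling loop in Cell 4 is boilerplate that every scenario ends up repeating, so an obvious refinement is a generic wait helper alongside DomainEventSimulator. A minimal sketch; the helper name and signature are our own proposal, not part of the class shown earlier.

# notebooks/utils/simulator.py (sketch): a reusable polling assertion helper.
import time
from collections.abc import Callable

def wait_for_read_model(sim, table_name: str, sku: str,
                        predicate: Callable[[dict], bool],
                        timeout_seconds: int = 30,
                        poll_interval: float = 2.0) -> dict | None:
    """Polls the inventory read model until predicate(item) is true or the timeout expires."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        item = sim.get_inventory_level(table_name, sku)
        if item and predicate(item):
            return item
        time.sleep(poll_interval)
    return None

# Cell 4 then collapses to:
# final_stock = wait_for_read_model(sim, INVENTORY_TABLE, SKU_TO_TEST,
#                                   lambda item: item.get('reservedCount') == 125)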

This setup provides an incredibly powerful feedback loop. We can now codify the complex scenario that caused our production outage, add it to a library of regression tests, and run it with a single click before every deployment of the Inventory service. The domain expert who first identified the potential issue can now work with an engineer to write the notebook, validating their understanding against running code.
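
Getting to “run with a single click” before every deployment also requires a headless execution path for the notebooks. One option we are evaluating is papermill, sketched below; it assumes papermill is added to the Jupyter image and that the scenario notebook declares a papermill “parameters” cell.

# ci/run_scenarios.py -- hypothetical CI entry point using papermill
import papermill as pm

# Execute the scenario notebook headlessly. For this to gate a deployment, the
# verification cell must raise (e.g., assert final_stock is not None) instead of
# only printing, so a failed scenario fails this script and the pipeline.
pm.execute_notebook(
    "notebooks/scenarios/race_condition_test.ipynb",
    "output/race_condition_test.run.ipynb",
    parameters={"SKU_TO_TEST": "PROD-12345"},
)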

This platform is not without its limitations and required significant initial investment. The state management of services under test is a major challenge; each simulation run should ideally start from a known, clean state. Our current solution involves manual scripts to reset the DynamoDB table, but a more robust implementation would involve database snapshotting or ephemeral, containerized databases for each test run. Furthermore, the fidelity of the simulation is bound by the fidelity of the staging environment. Any divergence in configuration, data, or dependent service versions between staging and production represents a potential blind spot.
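
For reference, a sketch of what such a reset script might look like against the Terraform-provisioned table above; the script path is illustrative.

# scripts/reset_read_model.py -- sketch of a manual reset step
import boto3

def reset_inventory_read_model(table_name: str, region_name: str = "us-east-1") -> int:
    """Deletes every item from the read model so a scenario starts from a clean slate."""
    table = boto3.resource("dynamodb", region_name=region_name).Table(table_name)
    deleted = 0
    scan = table.scan()
    while True:
        with table.batch_writer() as batch:
            for item in scan["Items"]:
                batch.delete_item(Key={"sku": item["sku"]})
                deleted += 1
        if "LastEvaluatedKey" not in scan:
            break
        scan = table.scan(ExclusiveStartKey=scan["LastEvaluatedKey"])
    return deleted

if __name__ == "__main__":
    count = reset_inventory_read_model("domain-sim-inventory-read-model")
    print(f"Deleted {count} items")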

The next logical iteration is to build more sophisticated assertion and orchestration capabilities into the DomainEventSimulator library, potentially creating a DSL for defining event choreography and state-based assertions. We are also exploring ways to automatically capture and anonymize production event streams to generate more realistic simulation scenarios, moving us from manually crafted tests towards a more continuous, data-driven validation model.
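
We do not have that DSL yet, but the direction we are leaning toward is declarative: a scenario as data that the simulator can execute, document, and diff. A purely hypothetical shape, reusing the payloads from the notebook above; nothing here is implemented.

# Hypothetical scenario definition for a future choreography DSL -- not implemented yet.
scenario = {
    "name": "concurrent-orders-same-sku",
    "publish": [
        {"queue": "domain-sim-order-events", "payload": order1_payload},
        {"queue": "domain-sim-order-events", "payload": order2_payload},
    ],
    "assert": [
        {
            "read_model": "domain-sim-inventory-read-model",
            "key": {"sku": "PROD-12345"},
            "expect": {"reservedCount": 125},
            "timeout_seconds": 30,
        },
    ],
}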

