Building a Secure Distributed Computation Gateway Using Ray, Consul Connect, and Azure Service Bus


The mandate was clear: expose our internal Ray-based computational workloads to a new web-based UI for interactive analysis. The immediate and non-negotiable blocker was security. Exposing a Ray head node’s client port (or worse, its dashboard) directly to anything beyond a trusted private network is a production incident waiting to happen. The initial suggestions involved complex network ACLs, jump boxes, and VPN tunnels—all brittle, operationally intensive solutions that hinder velocity. We needed a programmatic, zero-trust approach to security that could scale with the system.

This led to a design centered on a secure gateway pattern. The architecture would physically and logically isolate the Ray cluster, with all communication brokered through controlled, authenticated, and encrypted channels. Simply securing the entry point wasn’t enough; long-running computations, sometimes lasting hours, demanded an asynchronous communication backbone to prevent client timeouts and provide reliable status updates. The resulting architecture became a synthesis of a service mesh for security, a distributed compute framework for processing, and a message bus for robust, asynchronous communication.

Our final technology stack to solve this was:

  1. FastAPI API Gateway: A lightweight Python service acting as the sole, authenticated entry point for UI requests.
  2. Consul Connect: The service mesh providing service discovery and automatic, transparent mTLS encryption for all backend traffic.
  3. Ray: The distributed computing cluster, now residing within the secure service mesh.
  4. Azure Service Bus: The durable messaging backbone for decoupling the synchronous API request from the asynchronous Ray job execution and results reporting.

Here’s the logical flow we engineered:

sequenceDiagram
    participant UI
    participant API Gateway
    participant Consul Sidecar (Client)
    participant Consul Sidecar (Server)
    participant Ray Head
    participant Ray Worker
    participant Azure Service Bus

    UI ->>+ API Gateway: POST /jobs (payload)
    API Gateway ->> API Gateway: Validate Request & Generate Job ID
    API Gateway ->>+ Azure Service Bus: Send "Job Created" message
    API Gateway ->>+ Consul Sidecar (Client): ray.init(address='localhost:proxy_port')
    Consul Sidecar (Client) ->>+ Consul Sidecar (Server): Encrypted gRPC (mTLS)
    Consul Sidecar (Server) ->>+ Ray Head: Decrypted gRPC
    Ray Head -->>- Consul Sidecar (Server):
    Consul Sidecar (Server) -->>- Consul Sidecar (Client):
    Consul Sidecar (Client) -->>- API Gateway: Connection established
    API Gateway ->> Consul Sidecar (Client): Submit Ray job(job_id, payload)
    Consul Sidecar (Client) ->> Consul Sidecar (Server): Encrypted Job Submission
    Consul Sidecar (Server) ->> Ray Head:
    Ray Head ->>+ Ray Worker: Schedule Actor Task
    API Gateway -->>- UI: HTTP 202 Accepted (job_id)
    Ray Worker ->> Ray Worker: Execute Computation
    Ray Worker ->>+ Azure Service Bus: Send "Progress: 50%" message
    Ray Worker ->> Ray Worker: Finish Computation
    Ray Worker ->>+ Azure Service Bus: Send "Job Complete" message
    deactivate Ray Worker
    deactivate Azure Service Bus
    deactivate Azure Service Bus

The core challenge was not in using any single one of these components, but in correctly configuring their intersections—specifically, forcing Ray’s internal communication protocol to flow through the Consul Connect sidecar proxies.

Phase 1: Constructing the Service Mesh with Consul

Before any application code, the foundation is the mesh. We deployed Consul agents to every node participating in the system: the API Gateway server, the Ray head node, and all Ray worker nodes.

A typical Consul client agent configuration (/etc/consul.d/consul.hcl) on a Ray worker node looks like this. In a real-world project, this would be managed by configuration management tools like Ansible or baked into a machine image.

// /etc/consul.d/consul.hcl
// Base configuration for a Consul client agent.

data_dir = "/opt/consul"
log_level = "INFO"

// This agent is a client. It does not store state.
server = false

// Address to bind for agent-to-agent communication.
// It's critical this is a reachable IP within the cluster's private network.
bind_addr = "{{ GetInterfaceIP \"eth0\" }}"

// Advertise this address to other cluster members.
advertise_addr = "{{ GetInterfaceIP \"eth0\" }}"

// Join the Consul cluster by specifying one or more existing members.
// In production, use more than one for redundancy.
retry_join = ["10.0.1.10", "10.0.1.11", "10.0.1.12"]

// Enable the Connect feature for this agent.
connect {
  enabled = true
}

// Enable the gRPC listener for xDS, required by modern sidecar proxies like Envoy.
// Ports are configured inside the ports block.
ports {
  grpc = 8502
}

With agents running, services must be registered. This is where we define the application’s identity within the mesh. For the Ray head node, the service definition is critical. We register ray-head as a service and, most importantly, define a connect stanza declaring it as a mesh-aware service.

Service definition for the Ray Head node (/etc/consul.d/ray-head.json):

{
  "service": {
    "name": "ray-head",
    "id": "ray-head-node-1",
    "port": 6379,
    "tags": ["compute", "python"],
    "address": "10.0.2.5",
    "connect": {
      "sidecar_service": {}
    },
    "checks": [
      {
        "id": "ray-head-port-check",
        "name": "Ray GCS Port TCP",
        "tcp": "10.0.2.5:6379",
        "interval": "10s",
        "timeout": "1s"
      }
    ]
  }
}

A common mistake here is to point health checks at the sidecar proxy port. The check should validate the health of the actual application process (ray-head itself), not the proxy. The proxy’s health is managed by the Consul agent.
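The rule above can be checked mechanically before a definition is shipped. What follows is a minimal sketch rather than part of our deployment: check_targets_app is a hypothetical helper, and it only understands TCP checks written as host:port.

```python
import json


def check_targets_app(definition: dict) -> bool:
    """Return True if every TCP check dials the service's own port."""
    service = definition["service"]
    app_port = service["port"]
    for check in service.get("checks", []):
        target = check.get("tcp")
        if target is None:
            continue  # non-TCP checks are out of scope for this sketch
        # "10.0.2.5:6379" -> 6379
        port = int(target.rsplit(":", 1)[1])
        if port != app_port:
            return False
    return True


# A trimmed copy of the ray-head definition shown earlier.
ray_head = json.loads("""
{
  "service": {
    "name": "ray-head",
    "port": 6379,
    "checks": [
      {"name": "Ray GCS Port TCP", "tcp": "10.0.2.5:6379", "interval": "10s"}
    ]
  }
}
""")

print(check_targets_app(ray_head))
```

A check like this could run in CI against every file under /etc/consul.d before the configuration is rolled out.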

Phase 2: Forcing Ray Traffic Through the Mesh

This was the most difficult and least documented part of the integration. Ray is not “service mesh aware.” It doesn’t know how to look up services in Consul. It expects to be given a direct IP address and port. The trick is to manipulate network bindings so that Ray’s only path to communication is through the local Consul sidecar proxy.

First, on the API Gateway’s node, we define our api-gateway service and declare an upstreams block. This tells the gateway’s sidecar proxy to open a listener on a local port that securely forwards traffic to the ray-head service somewhere in the mesh.

API Gateway service definition (/etc/consul.d/api-gateway.json):

{
  "service": {
    "name": "api-gateway",
    "port": 8000,
    "connect": {
      "sidecar_service": {
        "proxy": {
          "upstreams": [
            {
              "destination_name": "ray-head",
              "local_bind_port": 20000
            }
          ]
        }
      }
    }
  }
}

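After reloading Consul and starting the gateway’s sidecar proxy, a listener appears on the gateway node at 127.0.0.1:20000. Any traffic sent to this port is wrapped in mTLS, authenticated with the mesh’s certificates, checked against whatever Consul intentions are defined, and forwarded to a healthy ray-head service instance. One caveat: the ray:// scheme the gateway uses in Phase 3 talks to the Ray Client server, which listens on port 10001 by default rather than on the GCS port 6379, so the port registered for ray-head has to match the endpoint the client actually dials.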
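A quick way to confirm that the upstream listener exists is to attempt a plain TCP connection to it. The sketch below is an illustration only: upstream_listening is a hypothetical helper, and a successful connect shows that something is listening on the port, not that the mTLS hop to ray-head works.

```python
import socket


def upstream_listening(host: str = "127.0.0.1", port: int = 20000,
                       timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        # create_connection raises OSError on refusal or timeout.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 20000 is the local_bind_port declared in the api-gateway definition.
    print("upstream listener up:", upstream_listening())
```

Running this on the gateway node before the service starts gives an early, cheap signal that the sidecar is missing or misconfigured.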

Second, we must modify how Ray itself is started. By default, Ray binds to the public network interface to allow workers to connect. We must force it to bind only to the loopback interface (127.0.0.1). This prevents any direct, unproxied connections from the outside. The Consul sidecar, running on the same machine, will listen on the public interface and forward traffic locally to Ray.

On the Ray head node, the startup command changes:

# Start the Consul agent and sidecar proxy in the background
consul agent -config-dir=/etc/consul.d &
consul connect proxy -sidecar-for ray-head-node-1 &

# CRITICAL: Start Ray head, binding ONLY to localhost.
# The Consul sidecar proxy will handle external traffic.
ray start --head \
  --port=6379 \
  --node-ip-address='127.0.0.1' \
  --dashboard-host='127.0.0.1' # Keep the dashboard on loopback too; expose it only through an authenticated proxy.

A similar change is required for Ray workers. They must connect to the head node via its proxied address, not its direct IP. The pitfall here is that Ray workers also need to communicate with each other. For a full mesh implementation, each worker would also be a Consul service, and they would discover the head node via its Consul DNS name or a local upstream proxy. For simplicity in this build, we assumed a shared network where workers could resolve the head node’s IP, but all traffic is still forced through the head node’s sidecar.

Phase 3: The API Gateway and Asynchronous Job Submission

The API gateway is the gatekeeper. It handles authentication (omitted here for brevity, but would use something like OAuth2), validates input, and orchestrates the initial steps of the pipeline. We used FastAPI for its performance and ease of use.

# api_gateway/main.py
import os
import uuid
import logging
from contextlib import asynccontextmanager

import ray
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from azure.servicebus import ServiceBusMessage  # used by ComputationActor below
from azure.servicebus.aio import ServiceBusClient
from azure.identity.aio import DefaultAzureCredential
from azure.core.exceptions import AzureError
from pybreaker import CircuitBreaker

# --- Configuration ---
# In a real project, this comes from a config file or environment variables.
RAY_CONSUL_PROXY_ADDRESS = "127.0.0.1:20000"
AZURE_SERVICE_BUS_NAMESPACE = os.environ.get("AZURE_SERVICE_BUS_NAMESPACE")
AZURE_JOB_QUEUE_NAME = "job-submissions"

# --- Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- State & Clients ---
# Use a dictionary to hold state that needs to be initialized on startup.
app_state = {}

# Circuit breaker to protect against a failing Ray cluster connection.
# If 5 consecutive calls to ray.init fail, it will open for 60 seconds.
ray_breaker = CircuitBreaker(fail_max=5, reset_timeout=60)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup logic
    logging.info("API Gateway starting up...")
    
    # Use Azure Managed Identity for secure, passwordless authentication
    credential = DefaultAzureCredential()
    app_state["service_bus_client"] = ServiceBusClient(
        fully_qualified_namespace=f"{AZURE_SERVICE_BUS_NAMESPACE}.servicebus.windows.net",
        credential=credential
    )
    logging.info("Azure Service Bus client initialized.")
    
    # Yield control to the application
    yield
    
    # Shutdown logic
    logging.info("API Gateway shutting down...")
    await app_state["service_bus_client"].close()
    if ray.is_initialized():
        ray.shutdown()
    logging.info("Cleanup complete.")


app = FastAPI(lifespan=lifespan)

class JobRequest(BaseModel):
    simulation_type: str
    parameters: dict

class JobResponse(BaseModel):
    job_id: str
    status: str
    message: str

# Define a custom Ray Actor that will run the computation
# This would typically be in a separate, shared library.
@ray.remote
class ComputationActor:
    def __init__(self, job_id: str):
        self._job_id = job_id
        self._sb_namespace = os.environ.get("AZURE_SERVICE_BUS_NAMESPACE")
        self._status_topic_name = "job-status"
        self._credential = DefaultAzureCredential() # Each actor gets its own credentials
        
        # This client is created inside the actor, not serialized from the driver.
        self._sb_client = ServiceBusClient(
            fully_qualified_namespace=f"{self._sb_namespace}.servicebus.windows.net",
            credential=self._credential
        )

    async def _publish_status(self, status_message: str, progress: int):
        import json
        logging.info(f"Job {self._job_id}: Publishing status '{status_message}'")
        sender = self._sb_client.get_topic_sender(self._status_topic_name)
        async with sender:
            # json.dumps emits real JSON; str() on a dict yields a Python repr
            # (single quotes), which does not match the declared content type.
            message = ServiceBusMessage(
                body=json.dumps({ "job_id": self._job_id, "status": status_message, "progress": progress }),
                content_type="application/json",
                correlation_id=self._job_id,
            )
            await sender.send_messages(message)

    async def run_simulation(self, params: dict) -> dict:
        import asyncio
        try:
            await self._publish_status("RUNNING", 10)
            
            # Simulate a long-running, multi-step computation.
            # asyncio.sleep yields to the actor's event loop; time.sleep would block it.
            await asyncio.sleep(15) # Step 1
            await self._publish_status("RUNNING", 50)
            await asyncio.sleep(15) # Step 2

            await self._publish_status("COMPLETED", 100)
            
            return {"job_id": self._job_id, "result": "simulation_successful"}
        except Exception as e:
            logging.error(f"Job {self._job_id} failed: {e}")
            await self._publish_status(f"FAILED: {str(e)}", -1)
            # Re-raise to make the Ray task fail
            raise
        finally:
            # Close the client and its credential within the actor to release resources
            await self._sb_client.close()
            await self._credential.close()

# The CircuitBreaker protected function for connecting to Ray
@ray_breaker
def connect_to_ray():
    if not ray.is_initialized():
        logging.info(f"Connecting to Ray via Consul Proxy at {RAY_CONSUL_PROXY_ADDRESS}...")
        ray.init(address=f"ray://{RAY_CONSUL_PROXY_ADDRESS}", ignore_reinit_error=True)
    logging.info("Successfully connected to Ray cluster.")


@app.post("/jobs", response_model=JobResponse, status_code=status.HTTP_202_ACCEPTED)
async def submit_job(job: JobRequest):
    job_id = str(uuid.uuid4())
    logging.info(f"Received job submission request. Assigned Job ID: {job_id}")

    # 1. Place a message on Azure Service Bus to indicate submission
    try:
        sender = app_state["service_bus_client"].get_queue_sender(AZURE_JOB_QUEUE_NAME)
        async with sender:
            # Use ServiceBusMessage for more control over properties
            import json
            from azure.servicebus import ServiceBusMessage
            message = ServiceBusMessage(
                # json.dumps so the body matches the declared JSON content type
                body=json.dumps({ "job_id": job_id, "params": job.parameters }),
                content_type="application/json",
                message_id=job_id
            )
            await sender.send_messages(message)
        logging.info(f"Job {job_id} successfully queued in Azure Service Bus.")
    except AzureError as e:
        logging.error(f"Failed to publish job {job_id} to Azure Service Bus: {e}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Messaging service is currently unavailable."
        )

    # 2. Connect to Ray cluster and submit the job
    try:
        connect_to_ray()
        
        # Instantiate the actor with a unique name based on job_id
        actor_name = f"actor_{job_id}"
        computation_actor = ComputationActor.options(name=actor_name).remote(job_id=job_id)
        
        # Submit the task. This returns a Ray ObjectRef immediately. We don't wait for it.
        computation_actor.run_simulation.remote(job.parameters)
        
        logging.info(f"Job {job_id} submitted to Ray cluster as actor '{actor_name}'.")

    except Exception as e:
        logging.error(f"Failed to submit job {job_id} to Ray: {e}")
        # Here we could attempt to send a "submission_failed" message to ASB
        # for reconciliation.
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Compute service is currently unavailable."
        )

    return JobResponse(
        job_id=job_id,
        status="SUBMITTED",
        message="Job has been accepted and is pending execution."
    )

This code demonstrates several production-grade patterns:

  • Lifespan Management: The asynccontextmanager-based lifespan creates the Azure Service Bus client on startup and closes it on shutdown. The Ray connection is opened lazily on the first job submission and torn down in the same shutdown hook, preventing resource leaks.
  • Circuit Breaker: The pybreaker library wraps the ray.init call. If the Ray cluster is down or unreachable through the mesh, the gateway will quickly fail subsequent requests for a period of time instead of waiting for TCP timeouts, protecting itself from cascading failures.
  • Asynchronous Submission: The API returns 202 Accepted almost instantly. The actual work is done by computation_actor.run_simulation.remote(), which is a fire-and-forget operation from the gateway’s perspective. The state is now managed by Ray and Azure Service Bus.
  • Secure Credentials: DefaultAzureCredential is used within both the gateway and the Ray actor. This allows the application to authenticate to Azure services using Managed Identity when deployed on an Azure VM, eliminating the need to store secrets in code or configuration files.
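To make the circuit breaker bullet concrete, here is a minimal, standard-library sketch of the closed, open, and half-open states. It is not pybreaker's implementation; SimpleBreaker and CircuitOpenError are hypothetical names, and the injectable clock exists only so the behavior can be exercised without waiting.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class SimpleBreaker:
    def __init__(self, fail_max: int = 5, reset_timeout: float = 60.0,
                 clock=time.monotonic):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_timeout:
                # Open: fail fast without touching the backend.
                raise CircuitOpenError("circuit open; failing fast")
            # Timeout elapsed: fall through and allow one trial call (half-open).
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self._opened_at is not None or self._failures >= self.fail_max:
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and resets the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

With fail_max=5 and reset_timeout=60, the sixth request after five straight failures is rejected immediately instead of waiting on a TCP timeout, which is the behavior the gateway relies on.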

Phase 4: Testing and Validation

A unit testing strategy is vital. For the API gateway, we can mock the ray and azure libraries to test the API logic in isolation.

Example using pytest and unittest.mock:

# tests/test_api.py
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi.testclient import TestClient
from api_gateway.main import app


@patch('api_gateway.main.ComputationActor')
@patch('api_gateway.main.ray')
@patch('api_gateway.main.ServiceBusClient')
def test_submit_job_success(MockServiceBusClient, mock_ray, MockComputationActor):
    # Arrange
    # Report no existing Ray connection so connect_to_ray() calls ray.init()
    mock_ray.is_initialized.return_value = False
    
    # Mock the actor and its remote call. ComputationActor was already wrapped by
    # @ray.remote at import time, so the class itself is patched, not ray.remote.
    mock_actor_instance = MagicMock()
    MockComputationActor.options.return_value.remote.return_value = mock_actor_instance
    
    # Mock the Azure Service Bus client. send_messages() and close() are awaited,
    # so they must be AsyncMocks; MagicMock already supports `async with`.
    mock_sender = MagicMock()
    mock_sender.send_messages = AsyncMock()
    mock_sb_client_instance = MagicMock()
    mock_sb_client_instance.get_queue_sender.return_value = mock_sender
    mock_sb_client_instance.close = AsyncMock()
    MockServiceBusClient.return_value = mock_sb_client_instance
    
    # Entering the context manager runs the lifespan startup logic with our mocks
    with TestClient(app) as client:
        # Act
        response = client.post("/jobs", json={
            "simulation_type": "monte_carlo",
            "parameters": {"iterations": 10000}
        })

        # Assert
        assert response.status_code == 202
        data = response.json()
        assert "job_id" in data
        assert data["status"] == "SUBMITTED"

        # Verify that Ray was initialized and the job submitted
        mock_ray.init.assert_called_once()
        mock_actor_instance.run_simulation.remote.assert_called_once()
        
        # Verify a message was sent to ASB
        mock_sb_client_instance.get_queue_sender.assert_called_with("job-submissions")
        mock_sender.send_messages.assert_awaited_once()

This test validates the success path without needing a live Ray cluster or Azure connection. Similar tests should be written for failure scenarios, such as the ray.init call raising an exception to ensure the circuit breaker logic is triggered and the correct HTTP 503 error is returned.

Limitations and Future Work

This architecture provides a secure and scalable foundation, but it’s not without its own complexities and areas for improvement. The primary operational burden is managing the lifecycle of the Consul agents and sidecar proxies alongside the Ray processes. A containerized environment orchestrated by Kubernetes, using the official Consul Helm chart, would significantly automate this deployment and management.

The current observability is limited to logs. A crucial next step would be to implement distributed tracing using OpenTelemetry. We could generate a trace in the API gateway, propagate the trace context through the Azure Service Bus message properties, and have the Ray actor continue the trace. This would give us end-to-end visibility into job execution times, pinpointing bottlenecks within the Ray computation itself or in the messaging pipeline.
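As a rough illustration of that propagation step, the sketch below builds and continues a W3C traceparent value using only the standard library. A real implementation would rely on OpenTelemetry's propagators instead; new_traceparent and child_traceparent are hypothetical helpers, and carrying the value under a "traceparent" key in the message's application properties is an assumption, not something our current code does.

```python
import re
import secrets

# version "00", 32-hex trace id, 16-hex parent span id, 2-hex flags
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def new_traceparent() -> str:
    """Start a new trace (gateway side)."""
    trace_id = secrets.token_hex(16)  # 32 hex characters
    span_id = secrets.token_hex(8)    # 16 hex characters
    return f"00-{trace_id}-{span_id}-01"  # 01 = sampled


def child_traceparent(parent: str) -> str:
    """Continue an existing trace with a fresh span id (actor side)."""
    match = TRACEPARENT_RE.match(parent)
    if match is None:
        raise ValueError(f"malformed traceparent: {parent!r}")
    trace_id, _parent_span, flags = match.groups()
    return f"00-{trace_id}-{secrets.token_hex(8)}-{flags}"


# Gateway: attach the context to the outgoing message's properties.
application_properties = {"traceparent": new_traceparent()}

# Actor: read it back and continue the same trace.
actor_context = child_traceparent(application_properties["traceparent"])
print(application_properties["traceparent"], actor_context)
```

Because both values share one trace id, spans emitted by the gateway and by the actor would be stitched into a single trace by the tracing backend.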

Finally, the security model can be hardened further. We are using Consul Connect for transport security (mTLS), but not application-level authorization. By defining Consul intentions, we could create rules stating that only the api-gateway service is allowed to communicate with the ray-head service, effectively blocking a compromised Ray worker from attempting to submit new jobs, thereby achieving true least-privilege networking.
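One possible shape for such a rule is a service-intentions config entry that allows api-gateway and denies everything else. The sketch below only generates the JSON; intentions_for is a hypothetical helper, and writing the output to a file and applying it with consul config write is an assumed workflow, not one we have deployed.

```python
import json


def intentions_for(destination: str, allowed_sources: list[str]) -> dict:
    """Build a service-intentions entry: allow the listed sources, deny the rest."""
    sources = [{"Name": name, "Action": "allow"} for name in allowed_sources]
    # The wildcard source catches every service not listed above.
    sources.append({"Name": "*", "Action": "deny"})
    return {
        "Kind": "service-intentions",
        "Name": destination,
        "Sources": sources,
    }


entry = intentions_for("ray-head", ["api-gateway"])
print(json.dumps(entry, indent=2))
```

Any service that legitimately needs to reach ray-head through the mesh, such as workers registered as mesh services, would need its own allow entry before the wildcard deny is applied.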

