Implementing a Polyglot Asynchronous Task Pipeline Across Rails, Ray, and a JPA Backend


The core business application, a monolithic Ruby on Rails service, began to exhibit severe performance degradation under a specific workload: generating complex, multi-stage financial projection reports. These tasks, initiated via a standard RESTful endpoint, could lock up a Puma worker for 30 to 90 seconds. Moving them to Sidekiq background jobs merely shifted the problem. The jobs were CPU-intensive, not I/O-bound, and CRuby's Global VM Lock (the GVL, commonly called the GIL) became a significant bottleneck. Scaling Sidekiq workers horizontally led to contention on the shared PostgreSQL database, while vertical scaling of the host machines yielded diminishing returns at a prohibitive cost. The root issue was clear: the Rails monolith, designed for transactional web workflows, was fundamentally unsuited for heavy, stateful computation.

Our initial concept was to build a dedicated compute service. The data science team already operated a Python-based stack, making a Python solution preferable for talent reuse. We evaluated Dask, Celery with RabbitMQ, and Ray. We settled on Ray for its native support for stateful actors, which felt like a natural model for our long-running, multi-stage computations. Each report generation could be encapsulated within an actor, isolating its state and execution. The Rails application would transition from an executor to an orchestrator, offloading these tasks to the Ray cluster.

This decision introduced a new set of architectural complexities. The source of truth for our financial data resides in a legacy Java/Spring Boot service that uses JPA/Hibernate to manage a complex relational schema. This service exposes a set of RESTful APIs, but they were designed for fetching individual entities, not for the high-throughput data access our new compute workers would require. Furthermore, the computations performed by the Ray workers were not stateless; they involved fetching a large base dataset and then iteratively applying transformations. Repeatedly fetching this base data from the sluggish Java service for every task was a non-starter.

To solve the worker state problem, we needed a fast, persistent, local storage solution. A shared Redis cache was considered but rejected; it would introduce another networked dependency and a single point of failure. We wanted to keep the workers self-contained. This led us to LevelDB, an embedded key-value store. Each Ray actor could manage its own private LevelDB instance on its local filesystem, using it as a persistent, process-local cache for data fetched from the Java service and for storing intermediate computation results. This design choice created a polyglot system with three distinct runtimes (Ruby, Python, Java) and four major components that needed to communicate reliably and efficiently.

graph TD
    subgraph Rails Orchestrator
        A[Rails Controller] --> B{ActiveJob};
    end

    subgraph Ray Compute Cluster
        C[Ray Serve API Gateway] --> D{Ray Actor Pool};
        D -- Manages --> E[Local LevelDB Instance];
    end

    subgraph Legacy Data Service
        F[Spring Boot REST API];
        F -- Uses --> G[JPA/Hibernate];
        G -- Accesses --> H[(PostgreSQL DB)];
    end

    B -- HTTP POST --> C;
    C -- dispatches task --> D;
    D -- Cache Miss --> F;
    A -- Polls Status --> C;

    style D fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px

Rails as the Orchestration Layer

The Rails application’s role was simplified to accepting user requests, validating them, dispatching the job to the Ray cluster, and providing an endpoint for clients to poll for the result. We used ActiveJob to decouple the web request from the outbound API call to Ray. A real-world project requires robust configuration management for external services.
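
Before looking at configuration, here is roughly what that thin controller layer looks like. This is an illustrative sketch only: the route names and the Report model's status, parameters, and result columns are assumptions, not the actual schema.

app/controllers/reports_controller.rb (illustrative)

# frozen_string_literal: true

# Illustrative sketch; model and column names are assumptions.
class ReportsController < ApplicationController
  # POST /reports -- validate input, persist a Report record, enqueue the job.
  def create
    report = Report.create!(status: 'pending', parameters: report_params.to_h)
    FinancialProjectionJob.perform_later(report.id, report_params.to_h)
    render json: { report_id: report.id, status: report.status }, status: :accepted
  end

  # GET /reports/:id -- clients poll this endpoint until the report completes.
  def show
    report = Report.find(params[:id])
    render json: { report_id: report.id, status: report.status, result: report.result }
  end

  private

  def report_params
    params.require(:report).permit(:customer_id, :time_horizon_years, :risk_profile)
  end
end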

Here is the Rails configuration for the Ray service client. A common mistake is to hardcode URLs and timeouts directly in the code. Using Rails’ encrypted credentials and a dedicated configuration object is mandatory for production.

config/initializers/ray_client.rb

# frozen_string_literal: true

# A dedicated configuration object for the Ray cluster client.
# This prevents scattering configuration details across the codebase.
module RayClient
  class Configuration
    attr_accessor :endpoint_url, :api_key, :open_timeout, :read_timeout

    def initialize
      @endpoint_url = nil
      @api_key = nil
      @open_timeout = 5 # seconds
      @read_timeout = 10 # seconds
    end
  end

  def self.configuration
    @configuration ||= Configuration.new
  end

  def self.configure
    yield(configuration)
  end
end

# Load configuration from Rails credentials for the current environment.
# This ensures secrets are not checked into version control.
RayClient.configure do |config|
  credentials = Rails.application.credentials.config.dig(:ray_service)
  if credentials
    config.endpoint_url = credentials[:endpoint_url]
    config.api_key = credentials[:api_key]
  elsif Rails.env.production?
    # Fail fast during boot if configuration is missing in production.
    raise "Ray service configuration not found in Rails credentials."
  else
    Rails.logger.warn("Ray service configuration not found in credentials.") unless Rails.env.test?
  end
end
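
For reference, the decrypted credentials this initializer expects (edited via bin/rails credentials:edit) look roughly like the following; the values shown are placeholders.

config/credentials.yml.enc (decrypted excerpt)

ray_service:
  endpoint_url: "https://ray-gateway.internal.example.com"
  api_key: "replace-with-a-generated-api-key"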

The ActiveJob class handles the actual communication. We use HTTParty for simplicity, but in a production system, a more robust client like Faraday with middleware for retries and logging would be preferable. The key is that the job is fire-and-forget from the Rails perspective. It dispatches the task and stores a job_id provided by our Ray API gateway.

app/jobs/financial_projection_job.rb

# frozen_string_literal: true

require 'httparty'
require 'securerandom'

class FinancialProjectionJob < ApplicationJob
  queue_as :high_priority # These are user-facing, so they get a high-priority queue.

  # A custom error class for better error tracking in services like Sentry.
  class RayServiceError < StandardError; end

  def perform(report_id, parameters)
    @report = Report.find(report_id)

    # The job_id doubles as an idempotency key. It is persisted on the report
    # before dispatch so that ActiveJob retries resend the same ID rather than
    # generating a fresh UUID on every attempt.
    idempotency_key = @report.external_job_id || SecureRandom.uuid
    @report.update!(status: 'processing', external_job_id: idempotency_key)

    response = dispatch_to_ray_cluster(idempotency_key, parameters)

    if response.success?
      external_job_id = response.parsed_response['job_id']
      @report.update!(status: 'queued', external_job_id: external_job_id)
    else
      # Log detailed error information for debugging.
      Rails.logger.error "Ray service dispatch failed. Status: #{response.code}, Body: #{response.body}"
      # The job will be retried automatically by ActiveJob's retry mechanism.
      raise RayServiceError, "Failed to dispatch job to Ray. Status: #{response.code}"
    end
  rescue ActiveRecord::RecordNotFound
    Rails.logger.warn "Report #{report_id} not found while running FinancialProjectionJob. Job will not be retried."
  end

  private

  def dispatch_to_ray_cluster(job_id, params)
    config = RayClient.configuration
    headers = {
      'Content-Type' => 'application/json',
      'X-API-Key' => config.api_key,
      'X-Idempotency-Key' => job_id
    }
    body = {
      job_id: job_id,
      # Sanitize and structure parameters for the Python service.
      parameters: {
        customer_id: params[:customer_id],
        time_horizon_years: params[:time_horizon_years],
        # Snake case is conventional for Python.
        risk_profile: params[:risk_profile]
      }
    }.to_json

    HTTParty.post(
      "#{config.endpoint_url}/generate_projection",
      headers: headers,
      body: body,
      open_timeout: config.open_timeout,
      read_timeout: config.read_timeout
    )
  end
end
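
As mentioned above, Faraday with retry and logging middleware is the more resilient choice for production. A minimal sketch, assuming the faraday and faraday-retry gems, might look like this; the retry parameters are illustrative rather than tuned values, and retrying POSTs is only reasonable here because the request carries an idempotency key.

# Illustrative Faraday connection with retries; not the client used above.
require 'faraday'
require 'faraday/retry'

def ray_connection
  config = RayClient.configuration
  Faraday.new(
    url: config.endpoint_url,
    request: { open_timeout: config.open_timeout, timeout: config.read_timeout }
  ) do |f|
    f.request :json                    # encode request bodies as JSON
    f.request :retry,
              max: 3,
              interval: 0.5,
              backoff_factor: 2,
              retry_statuses: [429, 500, 502, 503],
              methods: %i[get post]    # POST retries are safe thanks to the idempotency key
    f.response :json                   # parse JSON responses
    f.response :raise_error            # raise Faraday::Error on 4xx/5xx
    f.adapter Faraday.default_adapter
  end
end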

The Ray Serve API Gateway

On the Python side, we needed a stable, production-ready HTTP interface. While Ray Core allows direct task submission, Ray Serve gives us an HTTP ingress layer, replica-based scaling, and a clean separation of concerns. The Serve deployment acts as a lightweight API gateway: its only job is to receive requests, validate them, and dispatch the work to a pool of stateful worker actors.

We use Pydantic for robust data validation at the boundary. A common failure mode in polyglot systems is mismatched data schemas; Pydantic catches these errors early.

serve_gateway.py

import logging
import os
from typing import Dict

import ray
from ray import serve
from fastapi import FastAPI, Request, Response, status
from pydantic import BaseModel, Field

from worker_actor import ProjectionWorker, ACTOR_POOL_SIZE

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Pydantic Models for type-safe request validation ---
class ProjectionParameters(BaseModel):
    customer_id: str
    time_horizon_years: int = Field(gt=0, le=50)
    risk_profile: str

class TaskPayload(BaseModel):
    job_id: str
    parameters: ProjectionParameters

# --- Ray Serve Application ---
app = FastAPI()

@serve.deployment(
    num_replicas=2, 
    ray_actor_options={"num_cpus": 0.5, "num_gpus": 0}
)
@serve.ingress(app)
class ApiGateway:
    def __init__(self):
        # Create (or attach to) a pool of stateful worker actors. Passing
        # get_if_exists=True lets multiple gateway replicas share one named
        # actor pool instead of colliding on the actor names.
        # This is a simple round-robin router. In a real-world project,
        # one might implement more sophisticated routing (e.g., based on data locality).
        logger.info("Initializing ApiGateway and creating worker actor pool...")
        self.workers = [
            ProjectionWorker.options(name=f"worker_{i}", get_if_exists=True).remote(worker_id=i)
            for i in range(ACTOR_POOL_SIZE)
        ]
        self.counter = 0
        self.api_key = os.environ.get("SERVICE_API_KEY")
        if not self.api_key:
            raise ValueError("SERVICE_API_KEY environment variable not set.")
        logger.info("ApiGateway initialized.")

    @app.post("/generate_projection")
    async def generate_projection(self, payload: TaskPayload, request: Request, response: Response):
        # --- Security and Idempotency ---
        if request.headers.get("X-API-Key") != self.api_key:
            response.status_code = status.HTTP_401_UNAUTHORIZED
            return {"error": "Invalid API key"}

        # Idempotency check could be added here using a shared store like Redis
        # to ensure the same job_id isn't processed twice. For now, we assume
        # the client provides a unique ID.
        
        # --- Dispatching Logic ---
        # Use a simple round-robin to distribute load across actors.
        selected_worker_index = self.counter % ACTOR_POOL_SIZE
        worker = self.workers[selected_worker_index]
        self.counter += 1
        
        logger.info(f"Dispatching job {payload.job_id} to worker_{selected_worker_index}")

        # `process.remote` returns an ObjectRef immediately and the actor runs
        # the task in the background. We deliberately do NOT call ray.get()
        # here, since that would block the gateway until the computation
        # finishes.
        worker.process.remote(payload.job_id, payload.parameters.dict())

        return {"status": "queued", "job_id": payload.job_id}

    @app.get("/job_status/{job_id}")
    async def get_job_status(self, job_id: str, request: Request, response: Response):
        if request.headers.get("X-API-Key") != self.api_key:
            response.status_code = status.HTTP_401_UNAUTHORIZED
            return {"error": "Invalid API key"}

        # This is a simplistic approach. It requires knowing which worker has the job.
        # A better pattern is for workers to write their status to a shared database
        # (e.g., Redis, DynamoDB) keyed by job_id.
        # For demonstration, we'll query all actors, which is inefficient.
        for worker in self.workers:
            status_result = ray.get(worker.get_status.remote(job_id))
            if status_result is not None:
                return status_result
        
        response.status_code = status.HTTP_404_NOT_FOUND
        return {"error": "Job not found"}

# Bind the deployment to a name for access.
api_gateway_app = ApiGateway.bind()
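
Deploying this gateway is a separate concern. During development, one plausible way to launch the bound application is the Serve CLI, which needs the same environment variables the gateway and workers read; a production rollout would typically go through a Serve config file and serve deploy instead.

# Launch locally; SERVICE_API_KEY and JAVA_API_ENDPOINT must be set in the environment.
serve run serve_gateway:api_gateway_app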

The Stateful Worker with a LevelDB Cache

This is the core of the compute logic. Each ProjectionWorker is a stateful Ray actor. Upon initialization, it creates its own private LevelDB database in a designated directory. The key decision here is to embed the database directly into the actor, ensuring data locality and eliminating network overhead for cache access. We use the plyvel library, a Python binding for LevelDB.

The interaction with the legacy Java service is handled by a dedicated client class that includes retries with exponential backoff using the tenacity library. This is critical for resilience when dealing with potentially unreliable downstream services.

worker_actor.py

import logging
import os
import json
import time
import shutil

import ray
import plyvel
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

# --- Constants and Configuration ---
ACTOR_POOL_SIZE = 4
BASE_DB_PATH = "/tmp/ray_leveldb_cache"
JAVA_API_ENDPOINT = os.environ.get("JAVA_API_ENDPOINT", "http://localhost:8080/api")
logger = logging.getLogger(__name__)


# --- Client for the legacy Java service ---
class LegacyDataClient:
    """A resilient client for fetching data from the Java service."""
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    def get_customer_portfolio(self, customer_id: str) -> dict:
        """Fetches customer portfolio data with retries."""
        try:
            logger.info(f"Fetching portfolio for customer {customer_id} from Java service.")
            url = f"{JAVA_API_ENDPOINT}/customers/{customer_id}/portfolio"
            response = requests.get(url, timeout=15)
            response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx)
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to fetch data for customer {customer_id}: {e}")
            raise # Reraise to trigger tenacity's retry mechanism.

# --- Ray Actor Definition ---
@ray.remote(num_cpus=1)
class ProjectionWorker:
    def __init__(self, worker_id: int):
        self.worker_id = worker_id
        self.db_path = os.path.join(BASE_DB_PATH, f"worker_{self.worker_id}")
        
        # Clean up previous state on actor restart, a practical step for development.
        if os.path.exists(self.db_path):
            shutil.rmtree(self.db_path)
        os.makedirs(self.db_path, exist_ok=True)
        
        # Each actor gets its own private LevelDB instance.
        self.db = plyvel.DB(self.db_path, create_if_missing=True)
        self.data_client = LegacyDataClient()
        # In-memory dict to track job statuses handled by this worker.
        self.job_statuses = {}
        logger.info(f"Worker {self.worker_id} initialized with DB at {self.db_path}")

    def process(self, job_id: str, params: dict):
        """Main entry point for processing a projection task."""
        self.job_statuses[job_id] = {"status": "processing", "start_time": time.time()}
        logger.info(f"Worker {self.worker_id} started processing job {job_id}")
        
        try:
            customer_id = params['customer_id']
            
            # --- The core caching logic ---
            # The cache key includes a version prefix to allow for easy invalidation.
            cache_key = f"v1:portfolio:{customer_id}".encode('utf-8')
            
            cached_data = self.db.get(cache_key)
            if cached_data:
                logger.info(f"Cache HIT for customer {customer_id} on worker {self.worker_id}")
                portfolio_data = json.loads(cached_data.decode('utf-8'))
            else:
                logger.info(f"Cache MISS for customer {customer_id} on worker {self.worker_id}")
                portfolio_data = self.data_client.get_customer_portfolio(customer_id)
                # Cache under the versioned key. There is no expiry here; a real
                # implementation would store (data, expiry_timestamp) and check it
                # on read, or bump the key version to invalidate stale entries.
                self.db.put(cache_key, json.dumps(portfolio_data).encode('utf-8'))

            # --- Simulate a CPU-intensive computation ---
            # This is where the actual financial modeling would happen.
            time.sleep(15) # Simulates a 15-second computation
            
            result = {"projection_value": len(portfolio_data.get("assets", [])) * 1000}
            
            # Persist final result to DB before updating status
            result_key = f"result:{job_id}".encode('utf-8')
            self.db.put(result_key, json.dumps(result).encode('utf-8'))
            
            self.job_statuses[job_id] = {
                "status": "completed",
                "result": result,
                "duration_seconds": time.time() - self.job_statuses[job_id]["start_time"]
            }
            logger.info(f"Worker {self.worker_id} completed job {job_id}")

        except Exception as e:
            logger.error(f"Worker {self.worker_id} failed job {job_id}: {e}", exc_info=True)
            self.job_statuses[job_id] = {"status": "failed", "error": str(e)}

    def get_status(self, job_id: str) -> dict | None:
        """Allows the gateway to poll for job status."""
        return self.job_statuses.get(job_id)
        
    def __del__(self):
        """Best-effort close of the LevelDB handle when the actor is destroyed."""
        db = getattr(self, "db", None)
        if db is not None:
            db.close()

The Legacy Java Service Constraint

The final piece is the Java service. Its implementation is standard for a Spring Boot application. The critical part to analyze from an architectural perspective is not the code itself, but its performance characteristics and how they forced the design of our caching solution. A common pitfall in systems using Hibernate is the N+1 select problem, where fetching a list of entities triggers one query for the list and then N additional queries for each entity’s related data. When our Python workers requested a customer’s portfolio, the original implementation of this endpoint was triggering dozens of queries, resulting in response times of 5-10 seconds.
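
To make the failure mode concrete, the textbook shape of an N+1-prone mapping is sketched below. This is a generic illustration rather than the production schema: every lazily loaded collection touched while building the DTO costs one extra SELECT, and nested lazy associations multiply that cost.

// Generic illustration of an N+1-prone mapping; not the production entities.
@Entity
public class Customer {

    @Id
    @GeneratedValue
    private Long id;

    @Column(unique = true)
    private String externalId;

    // Collections are LAZY by default. Without a fetch join, accessing
    // getAssets() after the initial query triggers a separate SELECT, and any
    // lazy association on Asset adds one more query per asset.
    @OneToMany(mappedBy = "customer")
    private List<Asset> assets = new ArrayList<>();

    // getters and setters omitted
}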

CustomerPortfolioController.java

@RestController
@RequestMapping("/api/customers")
public class CustomerPortfolioController {

    private final CustomerService customerService;

    // Constructor injection
    public CustomerPortfolioController(CustomerService customerService) {
        this.customerService = customerService;
    }

    @GetMapping("/{customerId}/portfolio")
    public ResponseEntity<PortfolioDTO> getCustomerPortfolio(@PathVariable String customerId) {
        // The DTO (Data Transfer Object) is crucial. It decouples the API representation
        // from the internal JPA entity structure. This prevents leaking internal details
        // and allows us to optimize the data fetching.
        PortfolioDTO portfolio = customerService.getPortfolioForCustomer(customerId);
        if (portfolio == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(portfolio);
    }
}

CustomerService.java

@Service
public class CustomerService {

    private final CustomerRepository customerRepository;

    // Constructor injection
    public CustomerService(CustomerRepository customerRepository) {
        this.customerRepository = customerRepository;
    }

    @Transactional(readOnly = true) // Use read-only transactions for performance
    public PortfolioDTO getPortfolioForCustomer(String customerId) {
        // The key optimization is here: using a custom query with JOIN FETCH.
        // This tells Hibernate to fetch the Customer and their associated Assets
        // in a single SQL query, avoiding the N+1 problem.
        Customer customer = customerRepository.findByIdWithAssets(customerId)
                .orElse(null);

        if (customer == null) {
            return null;
        }

        // Map entities to a DTO for the API response.
        return mapToPortfolioDTO(customer);
    }

    // ... DTO mapping logic ...
}

CustomerRepository.java

public interface CustomerRepository extends JpaRepository<Customer, Long> {

    // This custom query is the fix for the N+1 problem.
    // 'LEFT JOIN FETCH c.assets' instructs JPA to load the assets collection
    // eagerly in the same query that loads the customer.
    @Query("SELECT c FROM Customer c LEFT JOIN FETCH c.assets WHERE c.externalId = :customerId")
    Optional<Customer> findByIdWithAssets(@Param("customerId") String customerId);
}

Even with this JPA optimization, the service was still a bottleneck due to network latency, JSON serialization overhead, and load on the Java service’s application server. The LevelDB cache at the Ray actor level was not just a performance enhancement; it was a required architectural component to shield the legacy service and enable the entire pipeline to function at the required throughput.

This architecture, while complex, effectively decouples the user-facing application from the resource-intensive computation. The Rails app remains responsive, the Ray cluster can be scaled independently to handle computational load, and the legacy Java service is protected from being overwhelmed by a barrage of requests. The use of a local, embedded database like LevelDB provides a powerful pattern for managing state in distributed compute workers without introducing additional network dependencies.

The current implementation has known limitations. Cache invalidation is essentially manual: entries are only evicted by bumping the version prefix on the cache keys, so serving stale data is a real risk. A more robust solution would involve the Java service publishing invalidation events (e.g., via Kafka or a Redis pub/sub channel) that the Ray actors could subscribe to. Furthermore, the job status polling mechanism is inefficient. A centralized, shared data store (like Redis or DynamoDB) where workers write their status would be a much more scalable pattern. Finally, data serialization between services is currently JSON, which is human-readable but inefficient. Migrating the interfaces to use a binary format like Protocol Buffers would significantly reduce network bandwidth and serialization/deserialization CPU time across the entire polyglot stack.
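
As a sketch of the shared status store idea, workers could write into Redis instead of an in-process dict; the hostname, key scheme, and TTL below are assumptions for illustration.

# Illustrative sketch of a shared job-status store (not part of the pipeline above).
import json

import redis

r = redis.Redis(host="redis.internal", port=6379, decode_responses=True)


def write_status(job_id: str, status: str, result: dict | None = None) -> None:
    # Workers call this instead of mutating a per-actor dict, so any gateway
    # replica can answer a status poll with a single key lookup.
    payload = json.dumps({"status": status, "result": result})
    r.set(f"job_status:{job_id}", payload, ex=86400)  # expire after 24 hours


def read_status(job_id: str) -> dict | None:
    raw = r.get(f"job_status:{job_id}")
    return json.loads(raw) if raw else None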

