Integrating Asynchronous BentoML Inference with Rails SSE Streams for Real-Time Progress Reporting


The initial requirement sounded straightforward: execute a machine learning model from our Rails application and show the results. But a critical constraint emerged—these weren’t 50ms classification tasks. They were complex simulations and data generation jobs, served via BentoML, that could run for several minutes. A standard synchronous request-response cycle was out of the question; it would lead to a cascade of HTTP timeouts and a terrible user experience. Polling from the client would hammer our infrastructure and still feel sluggish.

The technical pain point was clear: we needed a robust, scalable way to provide real-time, granular feedback for long-running, asynchronous background tasks initiated from a web interface. The user needed to see not just a spinner, but meaningful progress updates, logs, and eventually, the final result, without keeping a fragile HTTP connection open for minutes on end. This is a classic distributed system problem disguised as a simple feature request.

Our initial concept was an event-driven, decoupled architecture. A user’s request would trigger a background process. The client would then subscribe to a dedicated channel to receive progress updates pushed from the server. This decouples the lifecycle of the web request from the lifecycle of the ML job.

This led to our technology selection, driven by a mix of existing infrastructure and the specific demands of the problem:

  1. Ruby on Rails with ActionController::Live: Our web application is built on Rails. ActionController::Live provides a straightforward implementation of Server-Sent Events (SSE), a perfect protocol for one-way, server-to-client communication. It’s simpler than WebSockets and operates over standard HTTP, making it firewall-friendly.
  2. BentoML: The data science team had already containerized their models using BentoML. Our task was to integrate with this existing service, not replace it. The challenge was its black-box nature from the perspective of our Rails application.
  3. HBase: These ML jobs generated a significant volume of intermediate state and structured log data. We anticipated high write throughput. Storing this progress data in our primary PostgreSQL database was a non-starter; it would cause table bloat and lock contention. HBase, with its wide-column model and horizontal scalability, was chosen as the state store for these jobs. It could handle the write velocity and flexible schema of the progress data.
  4. Sentry: This was the most critical piece for production viability. When a 10-minute job fails at the 9-minute mark, the user sees a generic “Error” message. The support ticket simply says “it didn’t work.” To debug this, we needed unified, end-to-end tracing that spanned the entire distributed call chain: from the user’s click in the browser, through the Rails controller, the Sidekiq background job, the HTTP call to the BentoML service, the Python code executing the model, and the database writes to HBase. Without this, we would be flying blind.

Here is the high-level architecture we landed on.

sequenceDiagram
    participant Client
    participant Rails as Rails Server (Puma)
    participant Sidekiq
    participant BentoML as BentoML Service
    participant HBase
    participant Sentry

    Client->>+Rails: POST /api/v1/simulations (start job)
    Rails->>Sentry: Start Transaction 'POST /api/v1/simulations'
    Rails->>Sidekiq: Enqueue SimulationJob (job_id, trace_id)
    Rails-->>-Client: { "job_id": "uuid-1234" }
    Note over Client, Rails: Initial request is fast.

    Client->>+Rails: GET /api/v1/simulations/uuid-1234/stream (SSE)
    Note over Client, Rails: Client subscribes to SSE stream.

    Sidekiq->>+BentoML: POST /run_simulation (job_id, sentry-trace header)
    BentoML->>Sentry: Continue Transaction from header
    BentoML->>HBase: Write { status: 'RUNNING' }
    loop Job Execution
        BentoML->>BentoML: Run ML model chunk
        BentoML->>Sentry: Create Span 'process_chunk_N'
        BentoML->>HBase: Write { progress: 25, log: '...' }
    end
    BentoML->>HBase: Write { status: 'COMPLETED', result: '...' }
    BentoML-->>-Sidekiq: { "status": "ok" }
    Sidekiq->>Sentry: Finish Span

    loop SSE Stream
        Rails->>HBase: Read progress for 'uuid-1234'
        HBase-->>Rails: { progress: 25, log: '...' }
        Rails-->>Client: event: progress, data: {...}
    end
    Rails->>HBase: Read final result
    HBase-->>Rails: { status: 'COMPLETED', result: '...' }
    Rails-->>Client: event: complete, data: {...}
    Rails-->>-Client: Close SSE connection

Step 1: The Rails Foundation - Triggering and Streaming

First, we need two endpoints: one to kick off the job and return a job_id immediately, and another to stream progress and results for that job_id.

The job creation endpoint is a standard controller action. Its only responsibilities are to generate a unique ID, create an initial record in HBase to signify the job exists, and enqueue a background job. A crucial addition is capturing the Sentry trace information to pass along.
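Everything that follows assumes Sentry is already initialized on the Rails side with tracing enabled (via the sentry-ruby, sentry-rails, and sentry-sidekiq gems). As a minimal sketch, the initializer might look like this:

# config/initializers/sentry.rb
Sentry.init do |config|
  config.dsn = ENV['SENTRY_DSN']
  config.breadcrumbs_logger = [:active_support_logger, :http_logger]
  # Sample every transaction for illustration; tune this down in production.
  config.traces_sample_rate = 1.0
end

With that in place, the job creation action looks like this: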

# app/controllers/api/v1/simulations_controller.rb
class Api::V1::SimulationsController < ApplicationController
  include ActionController::Live

  # HBase connection is managed via a singleton or connection pool
  # For simplicity, we'll imagine a `HBaseClient` class.
  before_action :set_hbase_client

  def create
    job_id = SecureRandom.uuid
    # Sentry.get_traceparent and Sentry.get_baggage return the serialized
    # trace propagation headers for the current scope.
    sentry_trace = Sentry.get_traceparent
    baggage = Sentry.get_baggage

    # Create an initial placeholder record in HBase
    @hbase_client.put(
      table: 'simulations',
      row: job_id,
      data: { 'meta:status' => 'PENDING', 'meta:created_at' => Time.now.utc.iso8601 }
    )

    # Enqueue the actual work. Pass all necessary context.
    SimulationRunnerJob.perform_async(job_id, sentry_trace, baggage)

    render json: { job_id: job_id }, status: :accepted
  end

  # ... stream action below
end

The SimulationRunnerJob uses Sidekiq to make an HTTP call to the BentoML service. This is the handoff point. Notice how we explicitly pass the sentry-trace and baggage headers. This is the key to linking the transactions in Sentry.

# app/jobs/simulation_runner_job.rb
require 'net/http'

class SimulationRunnerJob
  include Sidekiq::Job

  def perform(job_id, sentry_trace, baggage)
    # A real-world project would use a more robust HTTP client like Faraday
    # and pull the BentoML URL from configuration.
    uri = URI.parse(ENV.fetch('BENTOML_SERVICE_URL') + '/run_simulation')
    http = Net::HTTP.new(uri.host, uri.port)
    # The BentoML call can run for minutes; Net::HTTP's default 60-second
    # read timeout would abort it, so raise the timeout explicitly.
    http.read_timeout = 15 * 60
    request = Net::HTTP::Post.new(uri.request_uri, 'Content-Type' => 'application/json')
    
    # This is the critical part for distributed tracing.
    request['sentry-trace'] = sentry_trace
    request['baggage'] = baggage

    request.body = { job_id: job_id, params: { some_param: 'value' } }.to_json

    Sentry.with_scope do |scope|
      scope.set_context('simulation_job', { job_id: job_id })
      begin
        response = http.request(request)
        unless response.is_a?(Net::HTTPSuccess)
          # If the call to BentoML fails, update HBase and raise an error
          # so Sentry captures it within the job's context.
          HBaseClient.instance.put(
            table: 'simulations',
            row: job_id,
            data: { 'meta:status' => 'FAILED', 'meta:error' => "BentoML service returned #{response.code}" }
          )
          raise "BentoMLServiceError: #{response.body}"
        end
      rescue StandardError => e
        HBaseClient.instance.put(
          table: 'simulations',
          row: job_id,
          data: { 'meta:status' => 'FAILED', 'meta:error' => "Network error connecting to BentoML: #{e.message}" }
        )
        # Re-raise to let Sidekiq's retry mechanism and Sentry handle it.
        raise e
      end
    end
  end
end

Now for the streaming part. The stream action uses ActionController::Live. It sets the response headers for an SSE stream and then enters a loop. Inside the loop, it reads the latest status from HBase and pushes it to the client. A common mistake here is to hold a database connection open for the duration of the stream. It’s vital to ensure connections are checked back into the pool on each iteration.

# app/controllers/api/v1/simulations_controller.rb (continued)
  def stream
    response.headers['Content-Type'] = 'text/event-stream'
    # NGINX/proxy configuration is critical here to disable response buffering.
    response.headers['X-Accel-Buffering'] = 'no'
    response.headers['Cache-Control'] = 'no-cache'

    sse = SSE.new(response.stream)

    begin
      job_id = params[:id]
      last_timestamp = 0
      started_at = Time.now

      # Loop until a terminal state is reached
      loop do
        # Guard against infinitely running streams for orphaned jobs.
        if Time.now - started_at > 15.minutes
          sse.write({ data: 'Stream timed out' }, event: 'error')
          break
        end
        
        # Fetch the latest updates from HBase since the last check.
        # The `get_updates` method is hypothetical: it queries HBase for all
        # cells in the row with a timestamp > last_timestamp and returns a
        # Hash of { 'family:qualifier' => [value, timestamp] }.
        updates = @hbase_client.get_updates(table: 'simulations', row: job_id, since: last_timestamp)

        unless updates.empty?
          # The column family from HBase becomes the SSE event name,
          # e.g. 'progress:percent' -> event: 'progress'
          updates.each do |column, (value, _timestamp)|
            event_name = column.split(':').first
            sse.write({ data: value }, event: event_name)
          end
          last_timestamp = updates.values.map(&:last).max
        end

        # Check for terminal state
        status = updates.dig('meta:status', 0) || @hbase_client.get(table: 'simulations', row: job_id, column: 'meta:status')
        if ['COMPLETED', 'FAILED'].include?(status)
          # Send one final "complete" event with the final state
          final_data = @hbase_client.get_row(table: 'simulations', row: job_id)
          sse.write({ data: final_data }, event: 'complete')
          break
        end

        sleep 2 # Poll HBase every 2 seconds.
      end
    rescue ClientDisconnected
      # This is expected when the user closes the browser.
      # Log it for information, but it's not an error.
      logger.info "SSE Client Disconnected for job #{params[:id]}"
    ensure
      sse.close
    end
  end

  private

  def set_hbase_client
    @hbase_client = HBaseClient.instance # Assuming a singleton for connection management
  end
end

Step 2: The BentoML Service - Execution and State Reporting

The Python side is where the actual work happens. We define a BentoML service that exposes an endpoint. This service needs to do three things:

  1. Parse the incoming request, including the job_id and Sentry headers.
  2. Perform the long-running ML task, instrumenting key stages with Sentry spans.
  3. Periodically connect to HBase to write progress updates.

Here’s the bentofile.yaml which defines the service and its dependencies.

# bentofile.yaml
service: "service:svc"
labels:
  owner: data-science-team
  project: real-time-sim
include:
  - "*.py"
python:
  packages:
    - bentoml
    - numpy
    - pandas
    - sentry-sdk
    - happybase # A python client for HBase

And here’s the Python service implementation. Pay close attention to how the Sentry transaction is continued from the incoming headers and how happybase is used to communicate with HBase.

# service.py
import json
import os
import time

import bentoml
import happybase
import sentry_sdk
from bentoml.io import JSON
from sentry_sdk.tracing import Transaction

# --- Sentry and HBase Configuration ---
# In production, use environment variables for configuration.
SENTRY_DSN = os.environ.get("SENTRY_DSN")
HBASE_HOST = os.environ.get("HBASE_HOST", "localhost")
HBASE_TABLE = "simulations"

sentry_sdk.init(
    dsn=SENTRY_DSN,
    traces_sample_rate=1.0,
    profiles_sample_rate=1.0,
)

# HBase connection pooling is crucial for performance.
# happybase provides a ConnectionPool.
hbase_pool = happybase.ConnectionPool(size=10, host=HBASE_HOST)

# --- Runnable Definition ---

class SimulationRunnable(bentoml.Runnable):
    SUPPORTED_RESOURCES = ("cpu",)
    SUPPORTS_CPU_MULTI_THREADING = True

    def __init__(self):
        # In a real model, you would load artifacts here.
        # self.model = bentoml.sklearn.load_model("my_model:latest")
        pass

    @bentoml.Runnable.method(batchable=False)
    def run_simulation(self, data: dict):
        job_id = data.get("job_id")
        if not job_id:
            raise ValueError("job_id is required")

        # The core simulation logic.
        total_steps = 20
        for i in range(total_steps):
            with sentry_sdk.start_span(op="simulation.step", description=f"Processing step {i+1}") as span:
                # Simulate work
                time.sleep(1.5)
                progress = int(((i + 1) / total_steps) * 100)
                
                # Write progress to HBase
                with hbase_pool.connection() as connection:
                    table = connection.table(HBASE_TABLE)
                    table.put(
                        job_id.encode('utf-8'),
                        {
                            b'progress:percent': str(progress).encode('utf-8'),
                            b'progress:step': str(i + 1).encode('utf-8'),
                            b'logs:info': f"Completed step {i+1}/{total_steps}".encode('utf-8')
                        }
                    )
                span.set_data("progress", progress)
                
        # Return final result
        return {"result": "simulation completed successfully", "steps_processed": total_steps}

simulation_runner = bentoml.Runner(SimulationRunnable, name="simulation_runner")

# --- BentoML Service Definition ---
# Runners are registered when the Service is constructed.
svc = bentoml.Service("long_running_simulation", runners=[simulation_runner])

@svc.api(input=JSON(), output=JSON())
async def run_simulation(input_data: dict, context: bentoml.Context):
    job_id = input_data.get("job_id")

    # --- Sentry Transaction Continuation ---
    # Continuing from the incoming `sentry-trace` and `baggage` headers is
    # what links the Rails and Python traces into a single distributed trace.
    transaction = Transaction.continue_from_headers(
        dict(context.request.headers), op="bentoml.request", name="run_simulation"
    )
    with sentry_sdk.start_transaction(transaction):
        sentry_sdk.set_tag("job_id", job_id)
        
        with hbase_pool.connection() as connection:
            table = connection.table(HBASE_TABLE)
            table.put(job_id.encode('utf-8'), {b'meta:status': b'RUNNING'})

        # Run the task asynchronously. BentoML runners handle this.
        result = await simulation_runner.run_simulation.async_run(input_data)

        # Write final result to HBase
        with hbase_pool.connection() as connection:
            table = connection.table(HBASE_TABLE)
            table.put(
                job_id.encode('utf-8'),
                {
                    b'meta:status': b'COMPLETED',
                    # JSON-encode the result so the Rails side can parse it directly.
                    b'results:final': json.dumps(result).encode('utf-8'),
                    b'meta:finished_at': str(time.time()).encode('utf-8')
                }
            )

        return {"status": "job completed", "job_id": job_id}

Step 3: HBase Schema and Data Model

The HBase table design is intentionally simple but effective for this use case. We use the job_id as the row key, which guarantees fast lookups. We use column families (meta, progress, logs, results) to group related data.

  • Table Name: simulations
  • Row Key: job_id (e.g., 550e8400-e29b-41d4-a716-446655440000)
  • Column Families:
    • meta: Stores job metadata (e.g., meta:status, meta:created_at, meta:error).
    • progress: Stores frequently updated progress indicators (e.g., progress:percent, progress:step).
    • logs: For streaming log lines (e.g., logs:info, logs:warn). Each write can use a different qualifier (like the timestamp) to store multiple log entries.
    • results: For storing final or intermediate structured results.

This schema allows the Rails SSE streamer to efficiently fetch just the columns it needs, and HBase’s timestamping of each cell (row, column, version) allows us to fetch only the new data since the last poll.
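For completeness, the table and its column families can be created up front from the HBase shell (itself a JRuby REPL); the retention settings shown here (e.g. VERSIONS on logs) are illustrative tuning choices, not requirements of the design.

# HBase shell
create 'simulations', { NAME => 'meta' }, { NAME => 'progress' }, { NAME => 'logs', VERSIONS => 10 }, { NAME => 'results' }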

The Full Picture: Observability in Action

With Sentry configured on both sides and the trace context propagated, a single failed job generates a complete, unified trace in the Sentry UI. We can see the initial Rails request, the duration of the Sidekiq job, the latency of the network call to BentoML, and a full breakdown of the execution within the Python service, including each simulation.step span and any database interactions.

If the BentoML service throws an exception while writing to HBase, the exception is captured in Sentry and associated with the same trace that originated in Rails. We can immediately identify the failing component, see the request payload, the logs leading up to the error, and the exact line of Python code that failed. This transforms debugging from a multi-hour archaeological dig through separate log files into a five-minute analysis.

This architecture, while involving several moving parts, solves the core problem effectively. It provides a responsive user experience for long-running tasks, leverages specialized tools (BentoML, HBase) for their intended purpose, and, most importantly, is built from the ground up to be observable and maintainable in a production environment. The pitfall here would have been to build a “happy path” solution without considering how it would fail. In a distributed system, failure is a certainty, and designing for observability is not an optional add-on; it is a fundamental requirement.

The current implementation relies on the Rails server polling HBase. This is a pragmatic and simple approach that decouples the web tier from the compute tier, but it introduces a minor latency (up to the poll interval). A future iteration could evolve this into a true push-based system. The BentoML service could publish completion events for each step to a Redis Pub/Sub channel or a Kafka topic. The Rails application could have a separate listener process (not within the Puma web server) that subscribes to these events and pushes them into a cache or directly to the active SSE streams. This would reduce the load on HBase for status checks and provide more immediate updates, at the cost of introducing and managing another piece of infrastructure.
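As a rough sketch of that direction, assuming a Pub/Sub channel named simulation_events and the redis gem, the Rails-side listener could be a small standalone process like the one below; how its events are fanned out to the active SSE streams (a cache that the stream action polls, ActionCable, or an in-process registry) is the real design decision and is left open here.

# bin/simulation_event_listener
# Hypothetical standalone listener process; runs outside Puma.
require_relative '../config/environment'
require 'redis'

redis = Redis.new(url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/0'))

# Blocks forever. Each published message is assumed to be a JSON payload
# such as { "job_id": "...", "event": "progress", "data": { "percent": 25 } }.
redis.subscribe('simulation_events') do |on|
  on.message do |_channel, raw|
    event = JSON.parse(raw)
    # Hand off to whatever feeds the SSE streams; here, the latest event per
    # job is written to the Rails cache for the stream action to pick up.
    Rails.cache.write("simulation:#{event['job_id']}:latest", event)
  end
end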

