Building a Resilient Polyglot ML Inference Pipeline with Haskell and OpenFaaS


The initial technical constraint was non-negotiable: our public-facing API services must be written in Haskell for maximum type safety and robustness. Simultaneously, the machine learning team’s output was, and would continue to be, Python models leveraging the Hugging Face Transformers ecosystem. The task was to deploy a new text summarization model as a scalable, reliable service. A monolithic approach, attempting to embed a Python interpreter within a Haskell service, was immediately discarded as an operational nightmare. A simple Haskell proxy forwarding requests to a Python backend was a slightly better, but still inadequate, solution. It created a synchronous coupling that would tie up Haskell threads waiting for potentially slow ML inference, introducing a significant point of failure and resource contention.

The core of the problem was architectural: how to integrate two disparate runtime environments (GHC for Haskell, CPython for the ML model) while ensuring independent scalability, fault isolation, and resilience. This led us to a serverless, event-driven architecture. The concept was to build a multi-stage, polyglot workflow on OpenFaaS. A Haskell function would serve as the public API endpoint—the orchestrator—responsible for request validation, sanitization, and job submission. Instead of waiting for a response, it would asynchronously invoke a dedicated Python function to perform the actual summarization. This decouples the components entirely; a slowdown or failure in the Python inference workload would not directly impact the availability of the primary Haskell API.

OpenFaaS was selected for its simplicity, Kubernetes-native design, and explicit support for both synchronous and asynchronous invocations via its built-in NATS message queue. This asynchronous capability was the linchpin of the entire design. It allows the front-facing Haskell function to accept a request, validate it, offload the heavy computation by placing a message on the queue, and immediately return a job identifier to the client. The resource-intensive Python function, which can be scaled independently, consumes jobs from this queue at its own pace.
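As a rough illustration of the asynchronous call described above, the sketch below builds (but does not send) a request to the gateway's /async-function/ route using only the Python standard library. The helper name build_async_request and the callback URL are hypothetical; the X-Callback-Url header is the optional OpenFaaS mechanism for having the result posted elsewhere once the job finishes.

```python
from typing import Optional
from urllib.request import Request


def build_async_request(gateway: str, function: str, body: bytes,
                        callback_url: Optional[str] = None) -> Request:
    """Build an asynchronous invocation request for an OpenFaaS function.

    The request is only constructed here; sending it is left to the caller.
    """
    url = f"{gateway.rstrip('/')}/async-function/{function}"
    headers = {"Content-Type": "text/plain; charset=utf-8"}
    if callback_url is not None:
        # Optional: ask the platform to POST the function's result to this URL.
        headers["X-Callback-Url"] = callback_url
    return Request(url, data=body, headers=headers, method="POST")


req = build_async_request(
    "http://gateway.openfaas:8080", "python-summarizer", b"Some long text ..."
)
print(req.get_method(), req.full_url)
```

Sending the request (for example with urllib.request.urlopen) should yield a 202 Accepted response when the job has been queued, which is the behavior the orchestrator relies on later in this article.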

The front-end, while not the focus of the backend architecture, needed a clean and responsive interface to interact with this asynchronous system. We opted for Shadcn UI on a Next.js foundation, as its component-based, accessible-first approach allowed us to build the user-facing portion quickly without compromising quality.

The overall architecture can be visualized as a chain of responsibilities executed across different serverless functions.

sequenceDiagram
    participant C as Client (Browser with Shadcn UI)
    participant GW as OpenFaaS Gateway
    participant HO as Haskell Orchestrator
    participant NATS as NATS Queue
    participant PS as Python Summarizer

    C->>+GW: POST /function/haskell-orchestrator (Text to summarize)
    GW->>+HO: Invoke with request body
    HO-->>HO: Validate and sanitize input
    HO->>+GW: POST /async-function/python-summarizer (Validated Text)
    GW->>+NATS: Enqueue job for 'python-summarizer'
    NATS-->>-GW: Acknowledge enqueue
    GW-->>-HO: HTTP 202 Accepted
    HO-->>-GW: HTTP 202 Accepted (Job ID)
    GW-->>-C: HTTP 202 Accepted (Job ID)
    C-->>C: UI displays "Processing..."

    Note over NATS, PS: Later...

    NATS->>+PS: Deliver job
    PS-->>PS: Load model and perform inference
    PS-->>PS: (Future) Store result in DB with Job ID
    PS-->>-NATS: Acknowledge job completion

This design isolates concerns effectively. The Haskell function is a lightweight, fast, and secure API layer. The Python function is a heavyweight, compute-bound worker. OpenFaaS and NATS provide the connective tissue, turning a blocking operation into a resilient, asynchronous workflow.

The OpenFaaS Stack Definition

The foundation of any OpenFaaS project is the stack.yml file. It declaratively defines the functions, their runtimes, resource requirements, and environment. A common mistake in polyglot projects is to treat this file as a simple list. In a real-world project, it’s an architectural document that codifies the boundaries between services.

Here, we define two distinct functions: haskell-orchestrator and python-summarizer.

version: 1.0
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080

functions:
  haskell-orchestrator:
    lang: haskell-http
    handler: ./haskell-orchestrator
    image: your-docker-hub-id/haskell-orchestrator:latest
    labels:
      com.openfaas.scale.min: 1
      com.openfaas.scale.max: 5
    environment:
      # The internal gateway URL for function-to-function communication
      OPENFAAS_INTERNAL_GATEWAY: http://gateway.openfaas:8080
      # The name of the downstream function to invoke
      SUMMARIZER_FUNCTION_NAME: python-summarizer
    requests:
      cpu: 100m
      memory: 128Mi
    limits:
      cpu: 200m
      memory: 256Mi

  python-summarizer:
    lang: python3-http
    handler: ./python-summarizer
    image: your-docker-hub-id/python-summarizer:latest
    labels:
      com.openfaas.scale.min: 0 # Allow scaling to zero to save resources
      com.openfaas.scale.max: 3
      com.openfaas.queue-worker.workers: 2 # Number of concurrent jobs per replica
    environment:
      # Hugging Face cache configuration to avoid re-downloading models in the container
      TRANSFORMERS_CACHE: /tmp/transformers_cache
      HF_HOME: /tmp/huggingface
    requests:
      cpu: 500m
      memory: 2Gi # ML models are memory-intensive
    limits:
      cpu: 1000m
      memory: 3Gi

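The critical details here are the resource requests and limits. The Haskell function is lightweight, while python-summarizer is allocated significantly more memory and CPU. This independent resource allocation is a primary benefit of this architecture. Furthermore, python-summarizer is configured with scale.min: 0 so that it can scale to zero when idle, a crucial cost-saving measure for bursty inference workloads. Depending on the OpenFaaS edition and version, scale-to-zero may also have to be enabled explicitly (for example through the com.openfaas.scale.zero label together with an idler component), so check the autoscaling documentation for your installation. The queue-worker.workers label is intended to control how many concurrent jobs are processed from the NATS queue. In a stock OpenFaaS installation this concurrency is usually configured on the queue-worker itself (its max_inflight setting), so verify that your deployment honors a per-function label before relying on it.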
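To make the consequence of these limits concrete, the following sketch computes the worst-case footprint if both functions scale to their configured maximum. The FunctionBudget class is a hypothetical helper for this arithmetic only; the numbers are taken from the stack.yml above (256Mi is expressed as 0.25 GiB).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FunctionBudget:
    """Upper bounds for one function, taken from its stack.yml entry."""
    max_replicas: int
    memory_limit_gib: float
    cpu_limit_millicores: int

    def worst_case_memory_gib(self) -> float:
        # Every replica running at its memory limit.
        return self.max_replicas * self.memory_limit_gib

    def worst_case_cpu_cores(self) -> float:
        # Every replica running at its CPU limit (1000m = 1 core).
        return self.max_replicas * self.cpu_limit_millicores / 1000


orchestrator = FunctionBudget(max_replicas=5, memory_limit_gib=0.25, cpu_limit_millicores=200)
summarizer = FunctionBudget(max_replicas=3, memory_limit_gib=3.0, cpu_limit_millicores=1000)

total_memory = orchestrator.worst_case_memory_gib() + summarizer.worst_case_memory_gib()
total_cpu = orchestrator.worst_case_cpu_cores() + summarizer.worst_case_cpu_cores()
print(f"worst case: {total_memory} GiB memory, {total_cpu} CPU cores")
```

Under these settings the summarizer accounts for 9 of the 10.25 GiB, which is why sizing it separately from the orchestrator matters.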

The Python Summarizer: A Production-Ready Inference Worker

The Python function’s role is singular: perform summarization. The pitfall here is naive implementation. A “hello world” version might load the model inside the handler function, leading to disastrous performance due to repeated model loading on every invocation. Production-grade code must treat the function instance as a long-lived process, loading the model once at startup.

First, the dependencies in python-summarizer/requirements.txt:

# Use specific versions for reproducible builds
# handler-proxy provides the HTTP server for the function
handler-proxy==0.3.2
# The core ML libraries
transformers==4.34.0
torch==2.1.0
sentencepiece==0.1.99

Now, the core logic in python-summarizer/handler.py. This implementation includes a singleton pattern for the model and tokenizer to ensure they are loaded only once per container instance.

import os
from http import HTTPStatus

from transformers import pipeline, T5ForConditionalGeneration, T5Tokenizer

class SummarizationModel:
    """
    Singleton class to hold the loaded model and tokenizer.
    This ensures the model is loaded only once per function replica,
    avoiding massive overhead on each invocation.
    """
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            print("Model instance not found. Initializing...")
            cls._instance = super(SummarizationModel, cls).__new__(cls)
            
            model_name = "t5-small"
            cache_dir = os.getenv("TRANSFORMERS_CACHE", "/tmp/model_cache")
            
            try:
                # In a production scenario, the model should be baked into the Docker image
                # or loaded from a persistent, shared volume. Downloading on first run
                # can lead to long cold starts.
                print(f"Loading tokenizer: {model_name} into {cache_dir}")
                tokenizer = T5Tokenizer.from_pretrained(model_name, cache_dir=cache_dir)
                
                print(f"Loading model: {model_name} into {cache_dir}")
                model = T5ForConditionalGeneration.from_pretrained(model_name, cache_dir=cache_dir)
                
                cls._instance.summarizer = pipeline(
                    "summarization",
                    model=model,
                    tokenizer=tokenizer,
                    framework="pt" # pt for PyTorch
                )
                print("Model initialization complete.")
            except Exception as e:
                # If model loading fails, the container is useless.
                # We log the error and store None to signal failure.
                print(f"FATAL: Failed to load model '{model_name}': {e}")
                cls._instance.summarizer = None
        return cls._instance

# Initialize the model when the Python module is first imported.
# This happens during the function's cold start.
model_singleton = SummarizationModel()

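def _response(status: HTTPStatus, body: str) -> dict:
    """Builds the response dict that the python3-http template expects."""
    return {"statusCode": int(status), "body": body}

def handle(event, context) -> dict:
    """
    Handles an incoming request forwarded by the python3-http template.
    
    Args:
        event: Request wrapper from the template; event.body holds the raw body.
        context: Invocation metadata supplied by the template (unused here).
    
    Returns:
        A dict with "statusCode" and "body" keys.
    """
    if model_singleton.summarizer is None:
        return _response(HTTPStatus.INTERNAL_SERVER_ERROR, "Model is not available. Check function logs for initialization errors.")

    body = event.body
    try:
        # The template passes the body as bytes; tolerate str for local testing.
        input_text = body.decode('utf-8') if isinstance(body, bytes) else str(body or "")
    except UnicodeDecodeError:
        return _response(HTTPStatus.BAD_REQUEST, "Request body must be valid UTF-8.")
    
    if len(input_text.strip()) < 50:
        return _response(HTTPStatus.BAD_REQUEST, "Input text must be at least 50 characters long.")

    try:
        # These parameters are important for controlling output
        # in a production setting.
        summary = model_singleton.summarizer(
            input_text,
            max_length=150,
            min_length=30,
            do_sample=False
        )
        
        if not summary or 'summary_text' not in summary[0]:
            raise ValueError("Model did not return a valid summary.")

        result = summary[0]['summary_text']
        print(f"Successfully summarized input of length {len(input_text)} to {len(result)}")
        
        return _response(HTTPStatus.OK, result)

    except Exception as e:
        # Log the details, but do not leak internal error text to the caller.
        # In a real system, you would add more structured logging here.
        print(f"Error during summarization: {e}")
        return _response(HTTPStatus.INTERNAL_SERVER_ERROR, "An error occurred during processing.")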

This code is significantly more robust than a simple example. It explicitly handles model loading failures, validates input, and includes error handling for the inference step itself. This is the level of detail required for a production system.
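The load-once behavior is easy to check in isolation without downloading a model. The sketch below swaps the real tokenizer and model for a hypothetical stand-in (expensive_load) and uses functools.lru_cache instead of the singleton class shown above; unlike that class, it loads lazily on the first request rather than at import time.

```python
from functools import lru_cache

load_count = 0  # how many times the "model" has been loaded


def expensive_load() -> dict:
    """Stand-in for loading a tokenizer and model (hypothetical)."""
    global load_count
    load_count += 1
    return {"name": "fake-model"}


@lru_cache(maxsize=1)
def get_model() -> dict:
    # The first call performs the load; later calls reuse the cached object.
    return expensive_load()


def handle_fake(text: str) -> str:
    model = get_model()
    return f"{model['name']} saw {len(text)} chars"


for _ in range(3):
    handle_fake("x" * 60)

print(load_count)  # 1: three invocations, a single load
```

Eager loading at import time, as in the handler above, moves the cost into the cold start; the lazy variant moves it onto the first request instead. Which is preferable depends on how the platform's readiness checks treat a slow start.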

The Haskell Orchestrator: Type-Safe Request Handling and Invocation

The Haskell function is the brains of the operation. Its responsibilities are to expose a clean API, enforce data contracts, and correctly trigger the asynchronous downstream workflow. We use the aeson library for JSON serialization and deserialization. Decoding itself happens at runtime and can fail, but once a request has been parsed into a typed record, the compiler guarantees that the rest of the code only ever handles data of the expected shape.

First, the dependencies in haskell-orchestrator/package.yaml:

name:                haskell-orchestrator
version:             0.1.0.0
# ... other boilerplate ...

dependencies:
- base >= 4.7 && < 5
- aeson
- bytestring
- text
- wreq # For making HTTP requests
- lens # Lens operators used with wreq

executables:
  haskell-orchestrator:
    main:                Main.hs
    source-dirs:         app
    ghc-options:
    - -threaded
    - -rtsopts
    - -with-rtsopts=-N

Next, the haskell-orchestrator/app/Main.hs. This code defines the data types, the main handler, and the logic for making the asynchronous call to the Python function.

{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Main where

import GHC.Generics (Generic)
import Data.Aeson (FromJSON, ToJSON, encode, eitherDecode)
import qualified Data.ByteString.Lazy as LBS
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import Network.Wreq (postWith, defaults, checkResponse, responseStatus, statusCode, responseHeader)
import Control.Lens ((&), (.~), (^.))
import System.Environment (lookupEnv)
import System.IO (hPutStrLn, stderr)

-- Data type for the incoming request from the client
data SummarizeRequest = SummarizeRequest {
  textToSummarize :: T.Text
} deriving (Show, Generic)

instance FromJSON SummarizeRequest

-- Data type for the response sent back to the client
data JobSubmissionResponse = JobSubmissionResponse {
  jobId :: T.Text,
  status :: T.Text,
  message :: T.Text
} deriving (Show, Generic)

instance ToJSON JobSubmissionResponse

-- Configuration loaded from environment variables
data AppConfig = AppConfig {
  gatewayUrl :: String,
  summarizerFn :: String
}

-- The core handler logic.
-- All logging goes to stderr: stdout carries the response body, so writing
-- log lines there would corrupt the JSON returned to the client.
handler :: AppConfig -> LBS.ByteString -> IO LBS.ByteString
handler config requestBody = do
  hPutStrLn stderr "Haskell orchestrator invoked."
  case eitherDecode requestBody :: Either String SummarizeRequest of
    Left err -> do
      hPutStrLn stderr $ "Failed to decode request: " ++ err
      -- A common mistake is returning a simple string. Always return structured JSON errors.
      return $ encode $ JobSubmissionResponse "" "error" "Invalid request body format."

    Right req -> do
      let inputText = textToSummarize req
      
      -- Basic validation in the orchestrator
      if T.length inputText < 50
      then do
        hPutStrLn stderr "Input text too short."
        return $ encode $ JobSubmissionResponse "" "error" "Input text must be at least 50 characters."
      else do
        hPutStrLn stderr $ "Submitting job for text of length: " ++ show (T.length inputText)
        -- The crucial part: perform the async invocation
        let asyncUrl = gatewayUrl config ++ "/async-function/" ++ summarizerFn config
        
        -- Disable wreq's default behavior of throwing on non-2xx responses so the
        -- status code can be inspected below. Connection failures still throw;
        -- in a production system, you'd add retry logic and exception handling here.
        let opts = defaults & checkResponse .~ Just (\_ _ -> return ())
        
        -- wreq has no Postable instance for Text, so send the UTF-8 encoded bytes.
        resp <- postWith opts asyncUrl (TE.encodeUtf8 inputText)

        let code = resp ^. responseStatus . statusCode
        
        -- OpenFaaS returns a 202 Accepted for successful async invocations.
        if code == 202
        then do
          hPutStrLn stderr "Successfully submitted job to python-summarizer."
          -- The X-Call-Id header can be used as a pseudo job ID.
          -- The lens yields an empty string if the header is missing.
          let callId = TE.decodeUtf8 (resp ^. responseHeader "X-Call-Id")
          return $ encode $ JobSubmissionResponse callId "submitted" "Summarization job accepted for processing."
        else do
          hPutStrLn stderr $ "Failed to invoke summarizer function. Status: " ++ show code
          return $ encode $ JobSubmissionResponse "" "error" "Failed to submit job to processing backend."

main :: IO ()
main = do
  -- Load configuration once, before handling the request,
  -- rather than reading environment variables inside the handler.
  gateway <- fromMaybe "http://gateway.openfaas:8080" <$> lookupEnv "OPENFAAS_INTERNAL_GATEWAY"
  summarizer <- fromMaybe "python-summarizer" <$> lookupEnv "SUMMARIZER_FUNCTION_NAME"
  let config = AppConfig gateway summarizer
  
  -- This entry point follows the stdin/stdout contract:
  -- it reads the request body from stdin and writes the response to stdout.
  hPutStrLn stderr "Haskell orchestrator is ready."
  LBS.getContents >>= handler config >>= LBS.putStr

This Haskell code demonstrates several best practices. Configuration is loaded once at startup. Incoming JSON is decoded into a strongly typed record, preventing a whole class of runtime errors. The function makes an explicit asynchronous call to the downstream service using the /async-function/ URL prefix provided by OpenFaaS. It correctly checks for the 202 Accepted status code, which confirms that the job has been successfully queued in NATS.
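When testing the orchestrator from outside, it can help to have the request contract written down in a second place. The sketch below is a hypothetical test-harness helper (check_request is not part of the service) that mirrors the orchestrator's two rejection rules and the JSON error shape it returns, so that integration tests can compare actual responses against expected ones.

```python
import json

MIN_CHARS = 50  # mirrors the orchestrator's length check


def error(message: str) -> dict:
    """Error payload in the same shape as JobSubmissionResponse."""
    return {"jobId": "", "status": "error", "message": message}


def check_request(raw: bytes) -> dict:
    """Predict how the orchestrator should classify a raw request body."""
    try:
        payload = json.loads(raw)
        text = payload["textToSummarize"]
    except (ValueError, KeyError, TypeError):
        # Not JSON, not an object, or the required field is missing.
        return error("Invalid request body format.")
    if not isinstance(text, str):
        return error("Invalid request body format.")
    if len(text) < MIN_CHARS:
        return error("Input text must be at least 50 characters.")
    return {"status": "valid"}


print(check_request(b'{"textToSummarize": "too short"}'))
```

Keeping this mirror in the test suite rather than in the service means a change to the Haskell validation rules shows up as a failing comparison instead of silent drift.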

The User Interface: A Simple Shadcn UI Client

To complete the end-to-end flow, we need a simple client. A Next.js application with Shadcn UI components provides a clean and modern interface. This is not a deep dive into front-end development, but showing the client code is essential to illustrate how the backend architecture is consumed.

Here is a simplified React component SummarizerForm.tsx:

'use client';

import { useState } from 'react';
import { Button } from '@/components/ui/button';
import { Textarea } from '@/components/ui/textarea';
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card';
import { Alert, AlertDescription, AlertTitle } from '@/components/ui/alert';

type JobStatus = 'idle' | 'loading' | 'submitted' | 'error';

interface ApiResponse {
  jobId?: string;
  status: string;
  message: string;
}

export function SummarizerForm() {
  const [text, setText] = useState('');
  const [status, setStatus] = useState<JobStatus>('idle');
  const [responseMessage, setResponseMessage] = useState('');

  const handleSubmit = async (event: React.FormEvent) => {
    event.preventDefault();
    setStatus('loading');
    setResponseMessage('');

    try {
      // In a real app, this URL would come from environment variables.
      const res = await fetch('/api/summarize', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ textToSummarize: text }),
      });

      const data: ApiResponse = await res.json();

      if (!res.ok || data.status === 'error') {
        throw new Error(data.message || 'An unknown error occurred.');
      }
      
      setStatus('submitted');
      setResponseMessage(`Job ${data.jobId} submitted successfully. The result will be available shortly.`);

    } catch (err: any) {
      setStatus('error');
      setResponseMessage(err.message);
    }
  };

  return (
    <Card className="w-full max-w-2xl">
      <CardHeader>
        <CardTitle>Text Summarization Service</CardTitle>
        <CardDescription>
          Enter text below to submit it for asynchronous summarization.
        </CardDescription>
      </CardHeader>
      <CardContent>
        <form onSubmit={handleSubmit}>
          <Textarea
            placeholder="Paste your long text here..."
            value={text}
            onChange={(e) => setText(e.target.value)}
            rows={10}
            disabled={status === 'loading'}
          />
          <Button type="submit" className="mt-4" disabled={status === 'loading'}>
            {status === 'loading' ? 'Submitting...' : 'Submit for Summarization'}
          </Button>
        </form>

        {status === 'submitted' && (
          <Alert variant="default" className="mt-4">
            <AlertTitle>Success</AlertTitle>
            <AlertDescription>{responseMessage}</AlertDescription>
          </Alert>
        )}

        {status === 'error' && (
          <Alert variant="destructive" className="mt-4">
            <AlertTitle>Error</AlertTitle>
            <AlertDescription>{responseMessage}</AlertDescription>
          </Alert>
        )}
      </CardContent>
    </Card>
  );
}

This component calls a Next.js API route (/api/summarize) which in turn would proxy the request to the haskell-orchestrator function’s public URL. This demonstrates the full, end-to-end flow from the user’s browser to the asynchronous backend.

Limitations and Future Iterations

This architecture, while resilient, is not complete. The most glaring omission is the result retrieval mechanism. The current implementation is “fire and forget”; the client knows a job was submitted but has no way to get the final summary. A production system would require two additional components:

  1. Result Storage: The python-summarizer function, upon completion, would need to write the result to a persistent store (e.g., Redis, PostgreSQL, or S3) against the jobId.
  2. Result Endpoint: A new HTTP endpoint, likely on the Haskell orchestrator, would be needed (e.g., GET /job-status/{jobId}). The client would poll this endpoint to check for job completion and retrieve the result. Alternatively, for a more advanced solution, WebSockets could be used to push the result to the client once it’s ready.
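The two pieces above can be sketched with an in-memory store standing in for Redis, PostgreSQL, or S3. Everything here is hypothetical (the JobStore class, the status names, the example job ID); it only illustrates the lifecycle a status endpoint would expose to a polling client.

```python
from dataclasses import asdict, dataclass
from typing import Dict, Optional


@dataclass
class Job:
    status: str = "pending"          # pending -> done | failed
    summary: Optional[str] = None
    error: Optional[str] = None


class JobStore:
    """In-memory stand-in for a persistent result store keyed by job ID."""

    def __init__(self) -> None:
        self._jobs: Dict[str, Job] = {}

    def submit(self, job_id: str) -> None:
        # Called by the orchestrator when the job is accepted.
        self._jobs[job_id] = Job()

    def complete(self, job_id: str, summary: str) -> None:
        # Called by the summarizer once inference succeeds.
        self._jobs[job_id] = Job(status="done", summary=summary)

    def fail(self, job_id: str, error: str) -> None:
        self._jobs[job_id] = Job(status="failed", error=error)

    def lookup(self, job_id: str) -> dict:
        # What a GET /job-status/{jobId} endpoint could return.
        job = self._jobs.get(job_id)
        return {"status": "not_found"} if job is None else asdict(job)


store = JobStore()
store.submit("example-call-id")
print(store.lookup("example-call-id"))
store.complete("example-call-id", "A short summary.")
print(store.lookup("example-call-id"))
```

A real implementation would also need expiry for old results and a decision on what the endpoint returns for unknown IDs, since a job ID can be valid but not yet visible to the store.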

Another area for improvement is observability. While we have basic logging, true production readiness demands distributed tracing. Using a standard like OpenTelemetry, we could inject a trace context at the Haskell function, propagate it through NATS, and pick it up in the Python function. This would allow us to visualize the entire lifecycle of a request across all components, making debugging complex failures tractable.

Finally, the model loading in the Python function still presents a cold start challenge. While caching helps, the initial download and loading can take time. For applications requiring lower latency, strategies like pre-warmed instances or baking the model directly into a custom Docker image from a private registry would be necessary optimizations. The current approach strikes a balance between simplicity and performance suitable for many non-real-time workloads.

