Implementing a Type-Safe Asynchronous NLP Pipeline with tRPC, Java, spaCy, and AWS SQS


The requirement was straightforward on the surface: provide an API endpoint that accepts a large body of text, performs complex Named Entity Recognition (NER), and returns the results. The immediate problem is that any meaningful NLP processing with a library like spaCy is too slow for a synchronous HTTP request-response cycle. A simple request can take anywhere from seconds to over a minute, leading to client timeouts and a terrible user experience. This immediately pushes the architecture towards an asynchronous, job-based pattern.

The complexity arose from our existing technical landscape. Our front-facing API layer is built on TypeScript with tRPC for end-to-end type safety, our core business logic and data persistence are managed by a suite of battle-tested Java services, and our data science team produces NLP models for Python’s spaCy ecosystem. A single-stack solution was never on the table. The challenge became one of orchestration: how to build a resilient, observable, and type-safe pipeline across three different language stacks, glued together by a message queue.

Our initial whiteboard sketch settled on AWS SQS as the asynchronous backbone. It’s managed, reliable, and simple enough for our decoupling needs. The flow would be:

  1. A TypeScript/tRPC server accepts the request, validates it, and immediately returns a jobId.
  2. In the background, it serializes the job details and pushes a message to an SQS queue.
  3. A pool of Python workers constantly polls this queue. Upon receiving a message, a worker performs the intensive spaCy processing.
  4. The Python worker, upon completion or failure, updates the job’s status via a dedicated Java state-management service.
  5. The original client can poll a separate tRPC endpoint using the jobId to get the status (PENDING, PROCESSING, COMPLETED, FAILED) and the final results.

This design uses each technology for its strength: tRPC for the type-safe API contract, Java for robust state management and persistence, Python for specialized NLP computation, and SQS for resilient decoupling.

sequenceDiagram
    participant Client
    participant TRPC_Server as tRPC BFF (TypeScript)
    participant AWSSQS as AWS SQS
    participant JavaService as Job Status Service (Java)
    participant PythonWorker as spaCy Worker (Python)

    Client->>+TRPC_Server: POST /submitJob (text: "...")
    TRPC_Server->>+JavaService: POST /jobs (Create job record)
    JavaService-->>-TRPC_Server: Response (jobId: "xyz-123", status: PENDING)
    TRPC_Server->>+AWSSQS: SendMessage({jobId, text})
    TRPC_Server-->>-Client: Response (jobId: "xyz-123")
    AWSSQS-->>-TRPC_Server: Ack

    loop Poll for messages
        PythonWorker->>+AWSSQS: ReceiveMessage
        AWSSQS-->>-PythonWorker: Message({jobId, text})
    end

    PythonWorker->>+JavaService: PUT /jobs/xyz-123 (status: PROCESSING)
    JavaService-->>-PythonWorker: Ack

    Note over PythonWorker: Perform heavy spaCy NLP processing...

    alt Processing Success
        PythonWorker->>+JavaService: PUT /jobs/xyz-123 (status: COMPLETED, results: [...])
        JavaService-->>-PythonWorker: Ack
        PythonWorker->>+AWSSQS: DeleteMessage
        AWSSQS-->>-PythonWorker: Ack
    else Processing Failure
        PythonWorker->>+JavaService: PUT /jobs/xyz-123 (status: FAILED, error: "...")
        JavaService-->>-PythonWorker: Ack
        Note over PythonWorker: Message moves to DLQ after visibility timeout expires.
        PythonWorker->>AWSSQS: (No DeleteMessage call)
    end


    loop Poll for status
        Client->>+TRPC_Server: GET /getJobStatus?jobId=xyz-123
        TRPC_Server->>+JavaService: GET /jobs/xyz-123
        JavaService-->>-TRPC_Server: Response ({status, results})
        TRPC_Server-->>-Client: Response ({status, results})
    end

Part 1: The tRPC API Entrypoint

The API layer, built with Node.js, Express, and tRPC, is the client-facing gatekeeper. Its primary responsibilities are validating input, creating the initial job record in the Java service, and enqueuing the task in SQS. A common mistake here is to enqueue the message before confirming the job record has been created in the persistent store. If the database call fails after the message is sent, you end up with an orphaned message in the queue that a worker might process without a corresponding state record.

First, let’s define the tRPC router. We use Zod for robust input validation.

packages/api/src/router.ts

import { initTRPC } from '@trpc/server';
import { z } from 'zod';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import axios from 'axios';
import { randomUUID } from 'crypto';

// Minimal context setup for demonstration
export const t = initTRPC.create();

const publicProcedure = t.procedure;
const router = t.router;

// Environment variables should be loaded securely (e.g., via dotenv)
const SQS_QUEUE_URL = process.env.SQS_QUEUE_URL!;
const JAVA_SERVICE_URL = process.env.JAVA_SERVICE_URL!;

// It's best practice to initialize clients outside the request path
const sqsClient = new SQSClient({
  region: process.env.AWS_REGION,
  // Credentials should be handled by the environment (e.g., IAM roles in production)
});

// A simple client for our Java service
const jobServiceClient = axios.create({
  baseURL: JAVA_SERVICE_URL,
  headers: { 'Content-Type': 'application/json' },
});

export const appRouter = router({
  submitJob: publicProcedure
    .input(
      z.object({
        text: z.string().min(10, 'Text must be at least 10 characters long.'),
      })
    )
    .mutation(async ({ input }) => {
      const jobId = randomUUID();

      // Step 1: Create the job record in the Java service FIRST.
      // This is the source of truth for the job's existence.
      try {
        await jobServiceClient.post('/jobs', {
          id: jobId,
          status: 'PENDING',
        });
      } catch (error) {
        console.error(`Failed to create job record for ${jobId}`, error);
        // Throwing here will propagate a proper tRPC error to the client
        throw new Error('Failed to initialize job processing.');
      }
      
      // Step 2: If and only if the record is created, enqueue the message.
      const command = new SendMessageCommand({
        QueueUrl: SQS_QUEUE_URL,
        MessageBody: JSON.stringify({
          jobId: jobId,
          textContent: input.text,
        }),
        // MessageGroupId is required for FIFO queues. 
        // We use it to ensure messages related to a job are processed in order, though not strictly necessary for this use case.
        MessageGroupId: 'nlp-jobs',
        // MessageDeduplicationId prevents duplicate messages within the 5-minute deduplication window.
        MessageDeduplicationId: jobId 
      });

      try {
        await sqsClient.send(command);
        console.log(`Successfully enqueued job ${jobId}`);
      } catch (error) {
        console.error(`Failed to enqueue job ${jobId}`, error);
        // If enqueuing fails, we have an orphan record in the DB.
        // A production system would need a cleanup mechanism (e.g., a reaper process for old PENDING jobs)
        // or attempt to update the job status to FAILED.
        await jobServiceClient.put(`/jobs/${jobId}`, {
            status: 'FAILED',
            result: JSON.stringify({ error: 'Failed to enqueue processing task.' })
        });
        throw new Error('Failed to enqueue job for processing.');
      }
      
      return { jobId };
    }),

  getJobStatus: publicProcedure
    .input(z.object({ jobId: z.string().uuid() }))
    .query(async ({ input }) => {
      try {
        const response = await jobServiceClient.get(`/jobs/${input.jobId}`);
        // The Java service returns the full job entity. We shape it for the client.
        return {
          jobId: response.data.id,
          status: response.data.status,
          result: response.data.result ? JSON.parse(response.data.result) : null,
          createdAt: response.data.createdAt,
          updatedAt: response.data.updatedAt,
        };
      } catch (error) {
        if (axios.isAxiosError(error) && error.response?.status === 404) {
          throw new Error('Job not found.');
        }
        console.error(`Failed to retrieve status for job ${input.jobId}`, error);
        throw new Error('Failed to retrieve job status.');
      }
    }),
});

export type AppRouter = typeof appRouter;

The key takeaway here is the transactional-like behavior we’re simulating. Create the state record, then dispatch the work. If dispatch fails, attempt to roll back the state. This prevents workers from picking up jobs that don’t officially exist.

Part 2: The Java State Management Service

This service is the system’s memory. It’s a standard Spring Boot application with a JPA entity for persistence. Its role is simple but critical: provide synchronous, consistent CRUD operations on the Job entity.

Job.java (JPA Entity)

package com.example.jobstatus;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import org.hibernate.annotations.CreationTimestamp;
import org.hibernate.annotations.UpdateTimestamp;

import java.time.Instant;

@Entity
@Table(name = "nlp_jobs")
public class Job {

    @Id
    @Column(nullable = false, updatable = false)
    private String id;

    @Column(nullable = false)
    private String status;

    @Column(columnDefinition = "TEXT") // Use TEXT for potentially large JSON results
    private String result;

    @CreationTimestamp
    @Column(updatable = false)
    private Instant createdAt;

    @UpdateTimestamp
    private Instant updatedAt;

    // Getters and Setters omitted for brevity
}

JobController.java (REST Controller)

package com.example.jobstatus;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ResponseStatusException;

@RestController
@RequestMapping("/jobs")
public class JobController {

    private final JobRepository jobRepository;

    public JobController(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    @PostMapping
    public ResponseEntity<Job> createJob(@RequestBody JobRequest.Create createRequest) {
        if (jobRepository.existsById(createRequest.id())) {
             throw new ResponseStatusException(HttpStatus.CONFLICT, "Job with this ID already exists.");
        }
        Job newJob = new Job();
        newJob.setId(createRequest.id());
        newJob.setStatus(createRequest.status());
        Job savedJob = jobRepository.save(newJob);
        return ResponseEntity.status(HttpStatus.CREATED).body(savedJob);
    }

    @GetMapping("/{id}")
    public ResponseEntity<Job> getJob(@PathVariable String id) {
        return jobRepository.findById(id)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }

    @PutMapping("/{id}")
    public ResponseEntity<Job> updateJob(@PathVariable String id, @RequestBody JobRequest.Update updateRequest) {
        Job job = jobRepository.findById(id)
                .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Job not found"));

        job.setStatus(updateRequest.status());
        if (updateRequest.result() != null) {
            job.setResult(updateRequest.result());
        }
        
        Job updatedJob = jobRepository.save(job);
        return ResponseEntity.ok(updatedJob);
    }
}

// Using Java 17 records for DTOs
class JobRequest {
    record Create(String id, String status) {}
    record Update(String status, String result) {}
}

This Java service is deliberately minimal. In a real-world project, it would include more robust validation, security (e.g., ensuring only authorized workers can update status), and potentially optimistic locking (@Version annotation on the entity) to handle concurrent update attempts on the same job, though that’s less likely in this specific SQS-based architecture.

Part 3: The Python spaCy Worker

This is where the heavy lifting happens. The worker is a long-running Python process that polls the SQS queue. A critical concept here is SQS’s “visibility timeout.” When a worker receives a message, SQS makes it invisible to other consumers for a configured duration. If the worker doesn’t explicitly delete the message within this timeout, SQS assumes the worker failed and makes the message visible again for another worker to process.

This is a double-edged sword. It provides at-least-once delivery guarantees but introduces the possibility of a message being processed multiple times if a worker is slow or crashes. This is why our state updates must be idempotent. Updating a job status from PROCESSING to COMPLETED multiple times is harmless.

We also configure a Dead-Letter Queue (DLQ). If a message fails processing a certain number of times, SQS will automatically move it to the DLQ. This prevents a “poison pill” message from blocking the queue indefinitely.

worker.py

import os
import time
import json
import logging
import boto3
import requests
import spacy

# --- Configuration ---
# Load from environment variables for production readiness
SQS_QUEUE_URL = os.environ.get("SQS_QUEUE_URL")
JAVA_SERVICE_URL = os.environ.get("JAVA_SERVICE_URL")
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")
POLL_INTERVAL_SECONDS = int(os.environ.get("POLL_INTERVAL_SECONDS", 5))
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()

# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Global Clients ---
# It's crucial to initialize clients and models outside the main loop
# to avoid re-initialization overhead on every poll.
try:
    sqs_client = boto3.client("sqs", region_name=AWS_REGION)
    # Load a medium-sized English model from spaCy.
    # This can take time and memory, so it's done once at startup.
    nlp_model = spacy.load("en_core_web_md")
    logging.info("spaCy model 'en_core_web_md' loaded successfully.")
except Exception as e:
    logging.critical(f"Failed to initialize clients or models: {e}")
    # Exit if essential components fail to load.
    exit(1)


def update_job_status(job_id, status, result=None):
    """Updates the job status via the Java service."""
    url = f"{JAVA_SERVICE_URL}/jobs/{job_id}"
    payload = {"status": status}
    if result:
        # A common pitfall is not handling complex data structures during serialization.
        # json.dumps is essential.
        payload["result"] = json.dumps(result)
    
    try:
        response = requests.put(url, json=payload, timeout=10)
        response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx)
        logging.info(f"Successfully updated job {job_id} to status {status}")
        return True
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to update job {job_id} status: {e}")
        return False

def process_message(message):
    """The core logic for processing a single SQS message."""
    receipt_handle = message['ReceiptHandle']
    
    try:
        body = json.loads(message['Body'])
        job_id = body.get('jobId')
        text_content = body.get('textContent')

        if not job_id or not text_content:
            logging.error(f"Invalid message format received: {body}")
            # Delete the malformed message to prevent it from blocking the queue.
            sqs_client.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
            return

        logging.info(f"Processing job {job_id}...")
        
        # First, update status to PROCESSING. This gives visibility into ongoing work.
        update_job_status(job_id, "PROCESSING")

        # --- The actual NLP work ---
        doc = nlp_model(text_content)
        entities = [{"text": ent.text, "label": ent.label_} for ent in doc.ents]
        
        # This could be any complex result structure.
        result_payload = {
            "entities": entities,
            "token_count": len(doc)
        }

        # --- Final status update ---
        if update_job_status(job_id, "COMPLETED", result_payload):
            # The final and most critical step: delete the message from the queue.
            # If this fails, the message will become visible again after the timeout,
            # leading to reprocessing. The idempotency of our Java service handles this gracefully.
            sqs_client.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
            logging.info(f"Job {job_id} completed and message deleted.")
        else:
            # If we fail to update the final status, we should NOT delete the message.
            # Let it time out and be reprocessed. This is a core resiliency pattern.
            logging.warning(f"Failed to record final status for job {job_id}. Message will be reprocessed.")

    except Exception as e:
        # Catch-all for any unexpected errors during processing.
        logging.error(f"An unexpected error occurred while processing message: {e}", exc_info=True)
        # Attempt to mark the job as FAILED.
        # If this fails, the message will timeout and retry, eventually landing in the DLQ.
        if 'job_id' in locals():
            update_job_status(job_id, "FAILED", {"error": str(e)})


def main_loop():
    """The main polling loop for the worker."""
    logging.info(f"Worker started. Polling SQS queue: {SQS_QUEUE_URL}")
    while True:
        try:
            response = sqs_client.receive_message(
                QueueUrl=SQS_QUEUE_URL,
                MaxNumberOfMessages=1, # Process one message at a time for simplicity
                WaitTimeSeconds=20, # Use long polling to reduce costs and empty receives
                AttributeNames=['All']
            )

            messages = response.get("Messages", [])
            if not messages:
                # No messages, continue loop
                continue

            for message in messages:
                process_message(message)

        except Exception as e:
            logging.critical(f"Error in main polling loop: {e}. Retrying in {POLL_INTERVAL_SECONDS}s...")
            time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    main_loop()

The worker’s logic is designed for resilience. It updates state proactively, handles failures gracefully, and relies on SQS features like visibility timeouts and long polling. The separation of update_job_status and process_message makes the code cleaner and easier to test.

Limitations and Future Iterations

This architecture provides a robust baseline for polyglot asynchronous processing. However, it’s not without its trade-offs and areas for improvement. The client-side polling for status is inefficient and can be replaced with a more advanced pattern. A WebSocket connection could be established, allowing the Java service to push status updates directly to the client in real-time upon receiving the update from the Python worker. This would create a more responsive user experience but adds complexity in managing persistent connections.

The communication from the Python worker and tRPC server back to the Java service is synchronous REST. In a high-throughput scenario, the Java service could become a bottleneck. These synchronous calls could be replaced with another SQS queue, turning the Java service into a consumer that processes a stream of status update events. This would further decouple the system but makes reasoning about the state of a job more complex, as updates become eventually consistent.

Finally, scaling the Python workers is straightforward if they are CPU-bound. However, if the NLP models require GPUs, scaling becomes a matter of managing expensive, specialized hardware. This would necessitate more sophisticated orchestration, perhaps using Kubernetes with GPU node pools and tools like KEDA (Kubernetes Event-driven Autoscaling) to scale worker deployments based on the SQS queue depth. The current design is a solid foundation, but scaling it to handle massive volume requires addressing these deeper infrastructure challenges.


  TOC