A Persistent State Machine Architecture for Asynchronous OpenCV Processing with JPA and Algolia


The initial implementation of our media asset pipeline was a single, monolithic Java controller method. An image would be uploaded, and the request thread would block while we performed validation, resizing, feature extraction with OpenCV, and finally, updated a database record. This worked for a dozen assets but failed catastrophically under any real load. A single corrupt image file could crash an OpenCV native process, bringing down the JVM. A network blip during database insertion would require a full re-upload and re-processing. We had no visibility into failures and no way to resume an interrupted job. It was a classic example of a brittle, synchronous design buckling under asynchronous, real-world conditions.

Our second attempt had to be built on a foundation of resilience and observability. The core conceptual shift was to model the entire asset lifecycle not as a linear script, but as a formal Finite State Machine (FSM). Every asset processing job would be an instance of this machine, and its state would be durably persisted at every step. This approach would allow any worker to crash and restart, pick up the job from its last known state, and continue, providing the durability we desperately needed.

For this, we settled on a somewhat unconventional but highly effective stack. We chose XState to formally define our statechart. Its language-agnostic nature (JSON definition) and rigorous adherence to state machine theory provided the clarity and testability our chaotic workflow lacked. To persist the state of these machines, we used JPA/Hibernate backed by a PostgreSQL database, a robust and familiar tool in our Java ecosystem. The heavy lifting of image analysis remained with OpenCV, but it would now be invoked as a discrete, idempotent service within a state transition. Finally, once all metadata was extracted, it would be pushed to Algolia to provide the fast, typo-tolerant search experience our users required. This is the post-mortem of building that system.

The State Machine Definition: Our Single Source of Truth

Before writing a single line of Java, we defined the entire processing flow in an XState machine. This forced us to think through every possible success and failure path explicitly. In a real-world project, this definition becomes the contract between different services and teams.

The pitfall here is to create a state machine that is too granular, leading to a state explosion. We focused on meaningful, coarse-grained business states that represented significant milestones in the asset’s lifecycle.

// machine-definition.js
// This file defines the logic of our processing pipeline.
// It is consumed by our Java orchestrator to drive the process.
import { createMachine, assign } from 'xstate';

export const assetProcessingMachine = createMachine({
  id: 'assetProcessing',
  initial: 'awaitingUpload',
  // The context stores all relevant data for a single processing job.
  context: {
    assetId: null,
    s3Bucket: null,
    s3Key: null,
    validationErrors: [],
    extractedMetadata: null,
    algoliaObjectId: null,
    retries: 0,
    errorMessage: null,
  },
  states: {
    awaitingUpload: {
      on: {
        UPLOAD_INITIATED: {
          target: 'validating',
          actions: ['assignAssetDetails']
        }
      }
    },
    validating: {
      invoke: {
        id: 'validationService',
        src: 'runValidation', // This string maps to a Java service implementation
        onDone: {
          target: 'processing',
        },
        onError: {
          target: 'validationFailed',
          actions: ['assignValidationError']
        }
      }
    },
    processing: {
      invoke: {
        id: 'opencvService',
        src: 'runOpenCVAnalysis',
        onDone: {
          target: 'indexing',
          actions: ['assignExtractedMetadata']
        },
        onError: {
          target: 'processingFailed',
          actions: ['assignProcessingError']
        }
      }
    },
    indexing: {
      invoke: {
        id: 'algoliaService',
        src: 'runAlgoliaIndexing',
        onDone: {
          target: 'completed',
          actions: ['assignAlgoliaObjectId']
        },
        onError: {
          target: 'indexingFailed',
          actions: ['assignIndexingError']
        }
      }
    },
    completed: {
      type: 'final'
    },
    // Failure states
    validationFailed: {
      type: 'final'
    },
    processingFailed: {
      on: {
        RETRY: [
          {
            target: 'processing',
            cond: 'canRetry',
            actions: ['incrementRetryCount']
          },
          { target: 'permanentlyFailed' }
        ]
      }
    },
    indexingFailed: {
      on: {
        RETRY: [
          {
            target: 'indexing',
            cond: 'canRetry',
            actions: ['incrementRetryCount']
          },
          { target: 'permanentlyFailed' }
        ]
      }
    },
    permanentlyFailed: {
      type: 'final'
    }
  }
}, {
  actions: {
    // XState treats context as immutable; all updates must go through assign().
    assignAssetDetails: assign({
      assetId: (context, event) => event.data.assetId,
      s3Bucket: (context, event) => event.data.s3Bucket,
      s3Key: (context, event) => event.data.s3Key,
    }),
    assignValidationError: assign({
      errorMessage: (context, event) => "Validation failed: " + event.data.message,
    }),
    assignExtractedMetadata: assign({
      extractedMetadata: (context, event) => event.data,
    }),
    assignProcessingError: assign({
      errorMessage: (context, event) => "OpenCV processing failed: " + event.data.message,
    }),
    assignAlgoliaObjectId: assign({
      algoliaObjectId: (context, event) => event.data.objectID,
    }),
    assignIndexingError: assign({
      errorMessage: (context, event) => "Algolia indexing failed: " + event.data.message,
    }),
    incrementRetryCount: assign({
      retries: (context) => context.retries + 1,
    }),
  },
  guards: {
    canRetry: (context) => context.retries < 3
  }
});

Key decisions in this definition:

  1. Invoked Services: States like validating, processing, and indexing are not static. They invoke asynchronous services. XState manages the onDone and onError transitions automatically, which maps perfectly to our distributed task execution model.
  2. Explicit Failure States: We don’t just have one failed state. We have validationFailed, processingFailed, and indexingFailed. This is critical for diagnostics and for implementing targeted recovery logic.
  3. Retry Logic: The processingFailed state isn’t final. It can receive a RETRY event. A guard condition (canRetry) prevents infinite retry loops, transitioning to permanentlyFailed after a set number of attempts. This logic is now declarative and lives with the state definition, not scattered in imperative code.

Persisting the Machine State with JPA

The XState definition is pure logic. To make it durable, we needed a way to save the current state and context of each processing job. This is where JPA and Hibernate come in. We created a single entity, AssetProcessingJob, to represent an instance of the state machine.

A common mistake is to try to model the entire state machine structure in the database with multiple tables. This is overly complex; all we actually need to store is the current state value and the machine’s context blob.

// AssetProcessingJob.java
package com.example.pipeline.persistence;

import io.hypersistence.utils.hibernate.type.json.JsonType;
import jakarta.persistence.*;
import org.hibernate.annotations.Type;

import java.time.Instant;
import java.util.UUID;
import java.util.Map;

@Entity
@Table(name = "asset_processing_jobs")
public class AssetProcessingJob {

    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private UUID id;

    @Column(name = "asset_id", nullable = false, unique = true)
    private String assetId;

    @Column(name = "current_state", nullable = false, length = 50)
    private String currentState;

    // We use the Hypersistence Utils library (formerly hibernate-types) to map the
    // column to a Map<String, Object>. This is crucial for storing the dynamic
    // XState context. For PostgreSQL, the column type is JSONB.
    @Type(JsonType.class)
    @Column(name = "state_context", columnDefinition = "jsonb")
    private Map<String, Object> stateContext;

    @Column(name = "created_at", nullable = false, updatable = false)
    private Instant createdAt = Instant.now();

    @Column(name = "updated_at", nullable = false)
    private Instant updatedAt = Instant.now();

    @Version
    private int version; // For optimistic locking

    @PreUpdate
    protected void onUpdate() {
        this.updatedAt = Instant.now();
    }

    // Getters and Setters omitted for brevity
}

This entity is the linchpin of the system’s durability.

  • currentState: A simple String that holds the current XState node (e.g., "processing", "indexingFailed").
  • stateContext: This is the most critical field. It’s a JSONB column that stores the entire XState context object. The Hypersistence Utils library (formerly hibernate-types) provides seamless mapping between the database JSON and a Java Map<String, Object>.
  • version: We include a version field for optimistic locking. This prevents race conditions where two workers might try to process and update the same job simultaneously. If a worker tries to save a job with an old version number, Hibernate will throw an OptimisticLockException, and we can handle it by reloading the job and retrying the transition.
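That reload-and-retry handling is short enough to show. The sketch below is ours only in spirit: it assumes the save goes through Spring Data (which translates the JPA exception into OptimisticLockingFailureException), and the class name, helper name, and retry budget are illustrative, not part of the entity above.

// OptimisticRetry.java (sketch)
// Wraps a state transition attempt and retries when another worker wins the version race.
import org.springframework.dao.OptimisticLockingFailureException;

public final class OptimisticRetry {

    private OptimisticRetry() {}

    // Runs a transition attempt, retrying a few times on optimistic-lock conflicts.
    public static void run(Runnable transitionAttempt) {
        int attempts = 0;
        while (true) {
            try {
                // The attempt must reload the job (and its current @Version) itself,
                // so every retry works against fresh state, not a stale copy.
                transitionAttempt.run();
                return;
            } catch (OptimisticLockingFailureException e) {
                if (++attempts >= 3) {
                    throw e; // persistent contention: surface the failure
                }
            }
        }
    }
}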

The Orchestrator: Bridging XState and Java

With the state definition and persistence layer in place, we needed the “brain” of the system: an orchestrator service that could load a job, interpret the state machine, execute the required side effects (like calling OpenCV), and save the new state.

This is where the system gets interesting. We are not running a full Node.js environment. Instead, our Java service interprets the XState machine’s definition to determine the next action. For this, we built a simple Java-based state machine interpreter that consumes the JSON definition.
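The full interpreter is not worth reproducing here, but its core is a lookup against the parsed statechart. The sketch below is illustrative only: it assumes the statechart has been exported to plain JSON and packaged as a classpath resource (asset-processing-machine.json is a name invented for this sketch), and it omits the guard evaluation and context-updating actions that the real component handles.

// XStateInterpreter.java (illustrative sketch of the core lookup)
package com.example.pipeline.orchestration;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;

import java.io.InputStream;
import java.util.Map;

@Component
public class XStateInterpreter {

    private final JsonNode machine;

    public XStateInterpreter(ObjectMapper objectMapper) throws Exception {
        // The statechart exported from the JS definition, bundled as a resource.
        try (InputStream in = getClass().getResourceAsStream("/asset-processing-machine.json")) {
            this.machine = objectMapper.readTree(in);
        }
    }

    // Resolves the target state for an event fired while in the given state.
    public TransitionResult transition(String currentState, Map<String, Object> context,
                                       String eventType, Map<String, Object> eventData) {
        JsonNode state = machine.path("states").path(currentState);

        // onDone/onError live under 'invoke'; business events live under 'on'.
        JsonNode transition = state.path("invoke").has(eventType)
                ? state.path("invoke").path(eventType)
                : state.path("on").path(eventType);

        String target = transition.path("target").asText(currentState);
        // The real component also evaluates 'cond' guards and applies the
        // transition's 'actions' to produce the new context; omitted here.
        return new TransitionResult(target, context);
    }

    // Returns the 'src' of the invoked service for a state, or null if none.
    public String getServiceForState(String state) {
        JsonNode src = machine.path("states").path(state).path("invoke").path("src");
        return src.isMissingNode() ? null : src.asText();
    }
}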

// StateMachineOrchestrator.java
package com.example.pipeline.orchestration;

import com.example.pipeline.persistence.AssetProcessingJob;
import com.example.pipeline.persistence.JobRepository;
import com.example.pipeline.services.OpenCVService;
import com.example.pipeline.services.AlgoliaService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Map;
import java.util.UUID;

// A simplified representation of an XState machine transition result
record TransitionResult(String nextState, Map<String, Object> newContext) {}

@Service
public class StateMachineOrchestrator {

    private static final Logger logger = LoggerFactory.getLogger(StateMachineOrchestrator.class);

    private final JobRepository jobRepository;
    private final XStateInterpreter xstateInterpreter; // A custom interpreter for the XState JSON
    private final ServiceRegistry serviceRegistry;   // Maps service names from XState to Java beans
    private final ObjectMapper objectMapper;

    public StateMachineOrchestrator(/*...inject dependencies...*/) { /*...*/ }

    @Transactional
    public void triggerEvent(UUID jobId, String eventType, Map<String, Object> eventData) {
        AssetProcessingJob job = jobRepository.findById(jobId)
            .orElseThrow(() -> new IllegalArgumentException("Job not found: " + jobId));

        logger.info("Job {} found in state '{}'. Triggering event '{}'", jobId, job.getCurrentState(), eventType);

        // 1. Calculate the next state based on the current state and event
        TransitionResult transition = xstateInterpreter.transition(
            job.getCurrentState(),
            job.getStateContext(),
            eventType,
            eventData
        );

        // Update the job with the new state and context
        job.setCurrentState(transition.nextState());
        job.setStateContext(transition.newContext());
        jobRepository.saveAndFlush(job); // Persist state change immediately

        logger.info("Job {} transitioned to state '{}'", jobId, job.getCurrentState());

        // 2. Check for any 'invoked services' in the new state
        String serviceToInvoke = xstateInterpreter.getServiceForState(transition.nextState());
        if (serviceToInvoke != null) {
            executeInvokedService(job, serviceToInvoke);
        }
    }

    private void executeInvokedService(AssetProcessingJob job, String serviceName) {
        logger.info("Job {} invoking service '{}'", job.getId(), serviceName);
        try {
            // ServiceRegistry maps "runOpenCVAnalysis" to the actual OpenCVService bean
            StatefulService service = serviceRegistry.getService(serviceName);
            
            // Execute the service. This can be asynchronous.
            // In a real system, this would likely submit a task to a thread pool or message queue.
            Object result = service.execute(job.getStateContext());

            // On success, trigger the 'onDone' event for the machine
            Map<String, Object> successEventData = Map.of("data", result);
            triggerEvent(job.getId(), "onDone", successEventData);

        } catch (Exception e) {
            logger.error("Service '{}' failed for job {}", serviceName, job.getId(), e);
            
            // On error, trigger the 'onError' event
            Map<String, Object> errorEventData = Map.of("data", Map.of("message", e.getMessage()));
            triggerEvent(job.getId(), "onError", errorEventData);
        }
    }
}

The ordering of this flow is critical:

  1. Load and Transition: An event (e.g., triggered by an API call or a message queue consumer) calls triggerEvent. We load the job from the DB. The XStateInterpreter (sketched in outline above; essentially a logic engine that parses the JSON definition) computes the next state and context.
  2. Persist Immediately: We save the new state (processing) to the database before executing the long-running task. This is the key to durability. One caveat: saveAndFlush only flushes, so the transition survives a crash only once the surrounding transaction commits; the long-running work should therefore run after that commit (for example in a separate transaction or via a queue), not inside it. With that in place, if the server crashes during the OpenCV analysis, on restart the job is already in the processing state, ready to be resumed.
  3. Execute Side-Effect: After persistence, we check if the new state requires an invoked service. If so, we look it up in a ServiceRegistry and execute it. The ServiceRegistry is a simple pattern that maps the string identifiers from the XState definition (like "runOpenCVAnalysis") to the actual Spring-managed beans (OpenCVService.class); a minimal sketch of it and the StatefulService interface follows this list.
  4. Feedback Loop: The result of the service (success or failure) is not handled with a simple if/else. Instead, it is fed back into the state machine as a new event (onDone or onError). This closes the loop and lets XState’s logic decide the next transition (indexing or processingFailed), maintaining the FSM as the single source of truth for control flow.
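Neither StatefulService nor ServiceRegistry appears in the listing above, so here is roughly what ours look like. Treat it as a sketch: resolving beans by name through the ApplicationContext is one of several reasonable implementations.

// StatefulService.java and ServiceRegistry.java (sketch)
package com.example.pipeline.services;

import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import java.util.Map;

// Contract for every worker invoked from a state: all input arrives via the context map.
public interface StatefulService {
    Object execute(Map<String, Object> context);
}

// Resolves the 'src' strings from the XState definition to Spring beans of the same name.
@Component
class ServiceRegistry {

    private final ApplicationContext applicationContext;

    ServiceRegistry(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    StatefulService getService(String serviceName) {
        // e.g. "runOpenCVAnalysis" -> the @Service("runOpenCVAnalysis") bean
        return applicationContext.getBean(serviceName, StatefulService.class);
    }
}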

The Heavy Lifting: OpenCV and Algolia Services

The services themselves are standard Java components, but they are designed to be idempotent and stateless. All the state they need is passed in via the stateContext map.

Here is a simplified version of the OpenCVService. In a production environment, this would have far more robust error handling, especially around the native library interactions.

// OpenCVService.java
package com.example.pipeline.services;

import org.opencv.core.Mat;
import org.opencv.core.MatOfByte;
import org.opencv.core.MatOfRect;
import org.opencv.imgcodecs.Imgcodecs;
import org.opencv.objdetect.CascadeClassifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.InputStream;
import java.util.Map;
import java.util.List;
import java.util.stream.Collectors;

@Service("runOpenCVAnalysis") // Name matches the ID in the XState definition
public class OpenCVService implements StatefulService {
    
    private static final Logger logger = LoggerFactory.getLogger(OpenCVService.class);
    private final S3Downloader s3Downloader;
    private final CascadeClassifier faceDetector;

    public OpenCVService(S3Downloader s3Downloader) {
        this.s3Downloader = s3Downloader;
        // In a real app, this path would be configured and the file loaded robustly.
        // The XML file must be available to the application.
        System.loadLibrary(org.opencv.core.Core.NATIVE_LIBRARY_NAME);
        this.faceDetector = new CascadeClassifier("haarcascade_frontalface_alt.xml");
    }

    @Override
    public Object execute(Map<String, Object> context) {
        String bucket = (String) context.get("s3Bucket");
        String key = (String) context.get("s3Key");
        logger.info("Starting OpenCV analysis for s3://{}/{}", bucket, key);

        try (InputStream imageStream = s3Downloader.download(bucket, key)) {
            byte[] imageBytes = imageStream.readAllBytes();
            Mat image = Imgcodecs.imdecode(new MatOfByte(imageBytes), Imgcodecs.IMREAD_UNCHANGED);

            if (image.empty()) {
                throw new RuntimeException("Failed to decode image.");
            }
            
            MatOfRect faceDetections = new MatOfRect();
            faceDetector.detectMultiScale(image, faceDetections);
            
            List<Map<String, Integer>> faces = List.of(faceDetections.toArray()).stream()
                .map(rect -> Map.of("x", rect.x, "y", rect.y, "width", rect.width, "height", rect.height))
                .collect(Collectors.toList());
                
            logger.info("Detected {} faces in {}", faces.size(), key);

            // This map becomes the 'data' payload for the 'onDone' event.
            return Map.of(
                "faceCount", faces.size(),
                "faces", faces,
                "imageHeight", image.height(),
                "imageWidth", image.width()
            );

        } catch (Exception e) {
            // Any exception here will be caught by the orchestrator
            // and trigger the 'onError' transition.
            throw new RuntimeException("OpenCV processing failed", e);
        }
    }
}

The Algolia indexing service follows a similar pattern, taking the extractedMetadata from the context, formatting it into an Algolia record, and using the Algolia client to push it. Error handling for network calls is paramount here, as transient failures are common. The state machine’s retry logic handles these gracefully.
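For completeness, the shape of that service is shown below. This is a sketch, not our production code: it assumes the v3 Algolia Java client (DefaultSearchClient/SearchIndex), hard-codes credentials that would really come from configuration, and invents the index name.

// AlgoliaIndexingService.java (sketch, assuming the v3 Algolia Java client)
package com.example.pipeline.services;

import com.algolia.search.DefaultSearchClient;
import com.algolia.search.SearchClient;
import com.algolia.search.SearchIndex;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service("runAlgoliaIndexing") // Bean name matches the 'src' in the XState definition
public class AlgoliaIndexingService implements StatefulService {

    private final SearchIndex<Map> index;

    public AlgoliaIndexingService() {
        // Credentials and index name would come from configuration in practice.
        SearchClient client = DefaultSearchClient.create("APP_ID", "ADMIN_API_KEY");
        this.index = client.initIndex("assets", Map.class);
    }

    @Override
    public Object execute(Map<String, Object> context) {
        // Build the Algolia record from the metadata extracted by OpenCV.
        Map<String, Object> record = new HashMap<>();
        record.put("objectID", context.get("assetId")); // idempotent: re-runs overwrite the same record
        record.put("metadata", context.get("extractedMetadata"));
        record.put("s3Key", context.get("s3Key"));

        index.saveObject(record).waitTask(); // block until Algolia acknowledges the record

        // Becomes the 'data' payload of onDone; assignAlgoliaObjectId reads 'objectID' from it.
        return Map.of("objectID", record.get("objectID"));
    }
}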

System Architecture Overview

This design effectively decouples the workflow logic from the implementation. The orchestrator is the heart, JPA is the memory, and the services are the hands.

graph TD
    A[API Gateway] -- 1. Upload Request --> B(Controller);
    B -- 2. Create Job Entity & Trigger UPLOAD_INITIATED --> C{StateMachineOrchestrator};
    C -- 3. Persist State (awaitingUpload -> validating) --> D[(PostgreSQL DB)];
    C -- 4. Invoke 'runValidation' --> E[ValidationService];
    E -- 5. Validation Result --> C;
    C -- 6. Persist State (validating -> processing) & Trigger onDone/onError --> D;
    C -- 7. Invoke 'runOpenCVAnalysis' --> F[OpenCVService];
    F -- Downloads image --> G[S3 Bucket];
    F -- 8. Analysis Result --> C;
    C -- 9. Persist State (processing -> indexing) & Trigger onDone/onError --> D;
    C -- 10. Invoke 'runAlgoliaIndexing' --> H[AlgoliaIndexingService];
    H -- 11. Pushes metadata --> I[(Algolia API)];
    H -- 12. Indexing Result --> C;
    C -- 13. Persist Final State (completed / failed) --> D;

    subgraph "Durable State Core"
        direction LR
        C <--> D
    end
    
    subgraph "Business Logic Workers"
        direction TB
        E
        F
        H
    end

Lingering Issues and Future Iterations

This architecture provides the resilience we needed, but it’s not without its own complexities and trade-offs. The current implementation of the StateMachineOrchestrator invokes services synchronously within its own execution thread. For a high-throughput system, this is a bottleneck. The logical next step is to replace the direct service execution with a message queue. When the orchestrator determines a service like runOpenCVAnalysis needs to run, it would publish a message to a RabbitMQ or Kafka topic. A separate pool of workers would consume these messages, execute the OpenCV logic, and then post the result back to another queue, which in turn triggers the onDone or onError event in the state machine. This would fully decouple the orchestration from the execution and allow for independent scaling of worker pools.
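One possible shape of that change is below. It assumes Spring AMQP and an exchange layout we have not actually built; the class, exchange, and routing-key names are all hypothetical.

// TaskPublisher.java (sketch of the queue-based future iteration)
// Inside the orchestrator, executeInvokedService would shrink to a publish like this.
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.util.Map;
import java.util.UUID;

public class TaskPublisher {

    private final RabbitTemplate rabbitTemplate;

    public TaskPublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // Publishes the invoked-service request; a worker pool consumes it and
    // posts the result back on a reply queue, which triggers onDone/onError.
    public void publish(UUID jobId, String serviceName, Map<String, Object> context) {
        Map<String, Object> message = Map.of(
            "jobId", jobId.toString(),
            "service", serviceName,   // e.g. "runOpenCVAnalysis"
            "context", context
        );
        // One routing key per service lets each worker pool scale independently.
        rabbitTemplate.convertAndSend("pipeline.tasks", serviceName, message);
    }
}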

Furthermore, our custom XStateInterpreter in Java is functional but simplistic. It doesn’t support all the advanced features of XState, like parallel states or history states. A more robust implementation might involve using GraalVM to execute the official XState JavaScript library directly within the JVM, ensuring perfect compatibility with the statechart definition. This would eliminate any risk of behavioral drift between our definition and our Java interpretation.
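The polyglot API keeps that experiment fairly small. The sketch below assumes the XState library and our machine have been bundled into a single script that exposes assetProcessingMachine as a global; producing that bundle is the real work and is not shown here.

// GraalXStateInterpreter.java (sketch of the GraalVM approach)
import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.Value;

public class GraalXStateInterpreter implements AutoCloseable {

    private final Context context;
    private final Value transitionFn;

    public GraalXStateInterpreter(String bundleSource) {
        this.context = Context.newBuilder("js").build();
        // Evaluate the pre-built bundle that defines the global 'assetProcessingMachine'.
        this.context.eval("js", bundleSource);
        // A small JS helper: run the official machine.transition() and return the result as JSON.
        this.transitionFn = this.context.eval("js",
            "(state, eventJson) => JSON.stringify(" +
            "  assetProcessingMachine.transition(state, JSON.parse(eventJson)))");
    }

    // Returns the resulting XState state (value, context, etc.) as a JSON string.
    public String transition(String currentState, String eventJson) {
        return transitionFn.execute(currentState, eventJson).asString();
    }

    @Override
    public void close() {
        context.close();
    }
}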

Finally, while we have failure states, our recovery is limited to a generic retry. A more sophisticated implementation would have specific retry strategies based on the failure type. For example, an indexingFailed state due to a 4xx error from Algolia should not be retried, while a 5xx error should be. This logic could be encoded directly into the state machine, further solidifying it as the central nervous system of our entire processing pipeline.
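On the Java side, that distinction is only a few lines of guard evaluation. The sketch below assumes the onError handler records the HTTP status into the context under a lastErrorStatus key, which the current pipeline does not yet do; the class and key names are illustrative.

// RetryGuards.java (sketch of failure-type-aware retry guards)
import java.util.Map;

final class RetryGuards {

    private RetryGuards() {}

    // Mirrors the 'canRetry' guard, but refuses to retry client errors (4xx).
    static boolean canRetryIndexing(Map<String, Object> context) {
        int retries = ((Number) context.getOrDefault("retries", 0)).intValue();
        int status = ((Number) context.getOrDefault("lastErrorStatus", 500)).intValue();
        boolean transientFailure = status >= 500; // 5xx: Algolia-side, worth retrying
        return transientFailure && retries < 3;
    }
}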

