Building a Resilient Saga Pattern with AWS SQS and Local SQLite State Machines


A critical business process involving multi-step data processing across different domains was causing recurring data consistency headaches. The initial implementation was a monolithic synchronous API call that chained these steps together. When a step failed halfway through (due to a network glitch, a third-party service outage, or a simple bug), the system was left in a corrupt, intermediate state, and manual data cleanup was becoming a significant operational burden. The textbook alternative, a two-phase commit (2PC) protocol driven by a distributed transaction coordinator, was ruled out early: for this one workflow, its complexity and performance overhead were not justifiable. We needed a solution that was resilient but also simple to operate and reason about.

Our initial concept gravitated towards the Saga pattern. A saga is a sequence of local transactions where each transaction updates data within a single service and publishes a message or event to trigger the next transaction. If a local transaction fails, the saga executes a series of compensating transactions to undo the preceding transactions. This provides eventual consistency without the need for distributed locks, which perfectly suited our asynchronous, event-driven ethos. The core challenge then became: how do we reliably manage the state of each saga instance? A centralized database like PostgreSQL or MySQL felt like overkill and would introduce a single point of failure and a performance bottleneck. The services involved were designed to be lightweight and self-contained.

This led us to an unconventional but highly effective architecture: using AWS SQS for orchestration and a local, embedded SQLite database within our orchestrator service as a durable, transactional state machine. This approach kept each orchestrator instance autonomous, with zero external database dependencies for its core logic, drastically simplifying deployment and reducing operational cost. The entire system, from shared event definitions to service implementations and end-to-end tests, was housed in a monorepo to enforce consistency. To manage the complexity of the saga’s logic, we adopted Behaviour-Driven Development (BDD), defining the saga’s flow with Gherkin specifications that became our executable documentation.

Monorepo Structure and Shared Definitions

A key pitfall in any distributed system is managing the contracts between services. In a saga, these contracts are the events or commands passed via the message queue. A monorepo structure provides a straightforward solution. We used a simple directory structure to organize our services and shared code.

/
├── packages/
│   ├── event-schemas/         # Protobuf or JSON schema definitions for events
│   │   └── booking.proto
│   └── saga-contracts/        # Shared interfaces and constants for saga states
│       └── states.go
├── services/
│   ├── booking-orchestrator/  # The main saga orchestrator service
│   │   ├── main.go
│   │   ├── Dockerfile
│   │   └── internal/
│   │       ├── application/   # Core logic
│   │       ├── infrastructure/  # SQLite and SQS implementations
│   │       └── domain/          # Saga state machine
│   ├── payment-service/       # A participant service
│   │   └── ...
│   └── notification-service/  # Another participant service
│       └── ...
└── tests/
    └── bdd/
        ├── features/
        │   └── booking_saga.feature # Gherkin feature files
        └── step_definitions/
            └── booking_steps.go     # Go code implementing the steps

All event schemas are defined in a central packages/event-schemas directory. This ensures that the producer (e.g., the orchestrator) and the consumer (e.g., the payment service) always agree on the message format. A change to an event schema requires updating it in one place, and the build system can then trigger rebuilds for all affected services. This avoids the nightmarish versioning conflicts common in systems with separately versioned client libraries.
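
The tree lists packages/saga-contracts/states.go without showing its contents. Below is a minimal sketch of what such a file could hold, assuming plain string-typed constants. The state values come from the feature file, the sequence diagram and the handler code in this article. The type name `State`, the `Cmd`/`Evt` prefixes and `IsTerminal` are illustrative choices, not the project's actual API. The sketch is written as `package main` so it runs standalone.

```go
// Hypothetical sketch of packages/saga-contracts/states.go (names illustrative).
package main

import "fmt"

// State is the value persisted in saga_instances.current_state.
type State string

const (
	StatePending              State = "PENDING"
	StateAwaitingPayment      State = "AWAITING_PAYMENT"
	StateAwaitingNotification State = "AWAITING_NOTIFICATION"
	StateCompensating         State = "COMPENSATING"
	StateCompleted            State = "COMPLETED"
	StateCompensated          State = "COMPENSATED"
)

// Message type names exchanged over SQS, shared by producers and consumers.
const (
	CmdProcessPayment    = "ProcessPayment"
	CmdSendNotification  = "SendNotification"
	CmdCancelBooking     = "CancelBooking"
	EvtPaymentSuccessful = "PaymentSuccessful"
	EvtPaymentFailed     = "PaymentFailed"
	EvtNotificationSent  = "NotificationSent"
)

// IsTerminal reports whether a saga in state s accepts no further events.
func IsTerminal(s State) bool {
	return s == StateCompleted || s == StateCompensated
}

func main() {
	for _, s := range []State{StateAwaitingPayment, StateCompleted, StateCompensated} {
		fmt.Printf("%s terminal=%v\n", s, IsTerminal(s))
	}
}
```

Every service imports these constants instead of retyping string literals. As a result, a renamed constant breaks the build at each stale reference rather than silently misrouting messages at runtime.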

Defining the Saga Flow with BDD

Before writing a single line of implementation code for the orchestrator, we defined its behavior using Gherkin. This forced us to think through all possible paths, including failures and compensations, from a user’s perspective. It became the ultimate source of truth for the saga’s logic.

Here is a simplified booking_saga.feature file:

Feature: Booking Saga Execution
  As a system, I need to reliably orchestrate a booking process
  involving payment and notification, handling failures gracefully.

  Scenario: Successful booking
    Given a new booking request is received for user "user-123"
    When the booking saga is initiated
    Then a "ProcessPayment" command should be sent to the payment queue
    And the saga state should be "AWAITING_PAYMENT"
    When a "PaymentSuccessful" event is received from the payment service
    Then a "SendNotification" command should be sent to the notification queue
    And the saga state should be "AWAITING_NOTIFICATION"
    When a "NotificationSent" event is received from the notification service
    Then the saga state should be "COMPLETED"

  Scenario: Payment fails and saga is compensated
    Given a new booking request is received for user "user-456"
    When the booking saga is initiated
    Then a "ProcessPayment" command should be sent to the payment queue
    And the saga state should be "AWAITING_PAYMENT"
    When a "PaymentFailed" event is received from the payment service
    Then a "CancelBooking" compensating command should be sent
    And the saga state should be "COMPENSATED"
    And no "SendNotification" command should be sent

These scenarios are not mere documentation; they are executable tests. They drive the design of our state machine and provide a safety net against regressions.
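
One way to keep the scenarios and the implementation in lockstep is to express the transitions they describe as data that step definitions can assert against. The sketch below illustrates that idea under stated assumptions; it is not the project's step code. `Apply` and the transition table are hypothetical. The compensation row jumps straight to COMPENSATED because that is the end state the second scenario checks.

```go
package main

import "fmt"

// State values as they appear in booking_saga.feature.
type State string

const (
	AwaitingPayment      State = "AWAITING_PAYMENT"
	AwaitingNotification State = "AWAITING_NOTIFICATION"
	Completed            State = "COMPLETED"
	Compensated          State = "COMPENSATED"
)

type transitionKey struct {
	From  State
	Event string
}

type transitionResult struct {
	To      State
	Command string // command to dispatch; empty if none
}

// transitions encodes the two scenarios in the feature file (hypothetical table).
var transitions = map[transitionKey]transitionResult{
	{AwaitingPayment, "PaymentSuccessful"}:     {AwaitingNotification, "SendNotification"},
	{AwaitingPayment, "PaymentFailed"}:         {Compensated, "CancelBooking"},
	{AwaitingNotification, "NotificationSent"}: {Completed, ""},
}

// Apply returns the next state and the command to send. The bool is false when
// the event is not valid in the current state (for example, a redelivered message).
func Apply(from State, event string) (State, string, bool) {
	r, found := transitions[transitionKey{from, event}]
	if !found {
		return from, "", false
	}
	return r.To, r.Command, true
}

func main() {
	// Walk the "Successful booking" scenario step by step.
	state := AwaitingPayment
	for _, event := range []string{"PaymentSuccessful", "NotificationSent"} {
		next, cmd, ok := Apply(state, event)
		fmt.Printf("%s + %s -> %s (command=%q, ok=%v)\n", state, event, next, cmd, ok)
		state = next
	}
}
```

A step such as `Then the saga state should be "COMPLETED"` then reduces to comparing the state returned by `Apply` with the quoted value. An event that has no row for the current state yields `false` and should be acknowledged without side effects.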

The SQLite-Backed State Machine

The heart of the orchestrator is its ability to durably remember the state of every in-flight saga. SQLite is a good fit for this: it’s a full-featured, file-based, ACID-compliant SQL database engine. Embedding it directly in the service binary removes the network hop to a database server, the external dependency and most of the connection-management complexity.

The schema for our saga state tracking is simple and effective.

-- file: schema.sql
CREATE TABLE IF NOT EXISTS saga_instances (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    current_step TEXT NOT NULL,
    current_state TEXT NOT NULL,
    payload BLOB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_saga_state ON saga_instances (current_state);

The columns serve the following purposes:
  • id: A unique identifier for this specific saga instance (e.g., a UUID).
  • name: The type of saga (e.g., “booking-saga”).
  • current_step and current_state: Track the position in the workflow (e.g., step ProcessPayment, state AWAITING_PAYMENT). The index on current_state supports queries such as finding every saga still waiting in a given state.
  • payload: A serialized representation (e.g., JSON or Protobuf) of the saga’s data, containing all information needed to resume or compensate.

Here’s a Go implementation of the repository layer that interacts with this SQLite database. Note the focus on transactional integrity. Any state update is performed within a SQL transaction.

// file: services/booking-orchestrator/internal/infrastructure/saga_repository.go
package infrastructure

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"

	// The named import both registers the "sqlite3" driver with database/sql and
	// exposes sqlite3.Error for error inspection. This driver requires cgo.
	"github.com/mattn/go-sqlite3"

	"booking-orchestrator/internal/domain"
)

// SQLiteSagaRepository manages saga state persistence in a SQLite database.
type SQLiteSagaRepository struct {
	db *sql.DB
}

// NewSQLiteSagaRepository initializes the repository and ensures the schema is created.
func NewSQLiteSagaRepository(dataSourceName string) (*SQLiteSagaRepository, error) {
	db, err := sql.Open("sqlite3", dataSourceName)
	if err != nil {
		return nil, fmt.Errorf("failed to open sqlite database: %w", err)
	}

	// SQLite permits only one writer at a time. Funnelling all access through a
	// single connection serializes writes and helps avoid "database is locked" errors.
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)
	db.SetConnMaxLifetime(0) // 0 means the connection is never closed because of age

	if err := db.Ping(); err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to connect to sqlite database: %w", err)
	}

	// In a real-world project, you would use a migration tool.
	// For this example, we execute the schema (table and index) directly.
	schemaSQL := `
    CREATE TABLE IF NOT EXISTS saga_instances (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        current_step TEXT NOT NULL,
        current_state TEXT NOT NULL,
        payload BLOB NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    CREATE INDEX IF NOT EXISTS idx_saga_state ON saga_instances (current_state);`
	if _, err := db.Exec(schemaSQL); err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to create schema: %w", err)
	}

	return &SQLiteSagaRepository{db: db}, nil
}

// Save persists the saga instance's state within a transaction.
// It handles both creation and updates.
func (r *SQLiteSagaRepository) Save(ctx context.Context, saga *domain.SagaInstance) error {
	tx, err := r.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback() // Rollback is a no-op if the transaction is committed.

	// A common mistake is to not update the timestamp. This is critical for auditing.
	now := time.Now().UTC()
	
	// Use UPSERT (ON CONFLICT) for atomic create/update operation.
	// This is more robust than a separate SELECT then INSERT/UPDATE.
	query := `
    INSERT INTO saga_instances (id, name, current_step, current_state, payload, created_at, updated_at)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(id) DO UPDATE SET
        current_step = excluded.current_step,
        current_state = excluded.current_state,
        payload = excluded.payload,
        updated_at = excluded.updated_at;`

	_, err = tx.ExecContext(ctx, query,
		saga.ID,
		saga.Name,
		saga.CurrentStep,
		saga.CurrentState,
		saga.Payload,
		now, // Using the same timestamp for both insert and update case
		now,
	)

	if err != nil {
		var sqliteErr sqlite3.Error
		if errors.As(err, &sqliteErr) {
			// Handle specific SQLite errors if needed, e.g., constraint violations
			return fmt.Errorf("sqlite error on save: %w", sqliteErr)
		}
		return fmt.Errorf("failed to save saga instance: %w", err)
	}

	return tx.Commit()
}

// FindByID retrieves a saga instance by its unique ID.
func (r *SQLiteSagaRepository) FindByID(ctx context.Context, id string) (*domain.SagaInstance, error) {
	query := `SELECT id, name, current_step, current_state, payload, created_at, updated_at FROM saga_instances WHERE id = ?`
	row := r.db.QueryRowContext(ctx, query, id)

	var saga domain.SagaInstance

	// mattn/go-sqlite3 converts columns declared as TIMESTAMP into time.Time,
	// so the timestamps can be scanned directly into time.Time fields.
	err := row.Scan(&saga.ID, &saga.Name, &saga.CurrentStep, &saga.CurrentState, &saga.Payload, &saga.CreatedAt, &saga.UpdatedAt)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, domain.ErrSagaNotFound // A domain-specific error is better than a generic one.
		}
		return nil, fmt.Errorf("failed to find saga instance by id: %w", err)
	}

	return &saga, nil
}

Orchestration Logic and SQS Integration

The orchestrator’s primary job is to listen for events, update the saga’s state in SQLite, and dispatch the next command to SQS.

A crucial aspect of working with SQS is handling message idempotency. Standard SQS queues provide at-least-once delivery, which means a message might be delivered more than once, so our application logic must tolerate duplicates. The SQLite state machine is the key. When a message arrives, the orchestrator first loads the saga’s state. If the event has already been processed (e.g., the saga’s state has already moved past the point where this event applies), the orchestrator deletes the message from the queue without re-processing it. Deleting the message is how a consumer acknowledges it in SQS.
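
The idempotency rule can be demonstrated without AWS at all. In the sketch below, the same PaymentSuccessful event is delivered twice, as at-least-once delivery permits. Only the first delivery changes state and emits a command, but both deliveries are deleted. The `delivery` and `orchestrator` types are hypothetical stand-ins, not the service's real types, and the `deleted` counter stands in for the SQS DeleteMessage call.

```go
package main

import "fmt"

// delivery models one receive of an SQS message. Under at-least-once
// delivery, the same logical message can arrive more than once.
type delivery struct {
	SagaID string
	Type   string
}

// orchestrator is a toy stand-in holding each saga's state and the commands it sent.
type orchestrator struct {
	state    map[string]string
	commands []string
}

// handle applies an event only when it is valid for the saga's current state.
// Any other combination is treated as a duplicate or stale message: it returns
// nil so the caller deletes it, and nothing else happens.
func (o *orchestrator) handle(d delivery) error {
	current, ok := o.state[d.SagaID]
	if !ok {
		// Leave the message on the queue; the redrive policy eventually moves it to the DLQ.
		return fmt.Errorf("unknown saga %q", d.SagaID)
	}
	if current == "AWAITING_PAYMENT" && d.Type == "PaymentSuccessful" {
		o.state[d.SagaID] = "AWAITING_NOTIFICATION"
		o.commands = append(o.commands, "SendNotification")
	}
	return nil
}

func main() {
	o := &orchestrator{state: map[string]string{"saga-1": "AWAITING_PAYMENT"}}
	deleted := 0
	// The same event arrives twice.
	for _, d := range []delivery{{"saga-1", "PaymentSuccessful"}, {"saga-1", "PaymentSuccessful"}} {
		if err := o.handle(d); err != nil {
			fmt.Println("leaving message for redelivery:", err)
			continue
		}
		deleted++ // stands in for the SQS DeleteMessage call
	}
	fmt.Println(o.state["saga-1"], len(o.commands), deleted) // AWAITING_NOTIFICATION 1 2
}
```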

Here’s the core logic for handling a reply message from a participant.

// file: services/booking-orchestrator/internal/application/saga_handler.go
package application

import (
	"context"
	"encoding/json"
	"log/slog"

	"booking-orchestrator/internal/domain"
)

// Message represents a simplified incoming message from SQS.
type Message struct {
	Type      string          `json:"type"`
	SagaID    string          `json:"sagaId"`
	Payload   json.RawMessage `json:"payload"`
}

// SagaOrchestrator contains the business logic for the saga.
type SagaOrchestrator struct {
	repo       domain.SagaRepository
	dispatcher domain.CommandDispatcher // Interface to send commands to SQS
	logger     *slog.Logger
}

// HandleMessage is the entry point for processing an event from a participant.
func (o *SagaOrchestrator) HandleMessage(ctx context.Context, msg Message) error {
	saga, err := o.repo.FindByID(ctx, msg.SagaID)
	if err != nil {
		// Either the saga does not exist (a misrouted or very stale message) or the
		// lookup itself failed. Returning the error leaves the message on the queue;
		// once it exceeds the queue's maxReceiveCount, SQS moves it to the DLQ.
		o.logger.Error("Failed to load saga instance for incoming message", "sagaID", msg.SagaID, "error", err)
		return err
	}

	// This is the core state machine logic.
	// In a real system, this would be more complex, likely using a state pattern.
	switch saga.CurrentState {
	case domain.StateAwaitingPayment:
		if msg.Type == "PaymentSuccessful" {
			return o.handlePaymentSuccess(ctx, saga)
		} else if msg.Type == "PaymentFailed" {
			return o.handlePaymentFailure(ctx, saga)
		}
	case domain.StateAwaitingNotification:
		if msg.Type == "NotificationSent" {
			return o.handleNotificationSuccess(ctx, saga)
		}
		// ... handle other failure cases
	default:
		// Idempotency check: if we receive a message for a saga that is already
		// completed or in a different state, we log it and ignore it.
		o.logger.Info("Received message for saga in unexpected state", "sagaID", saga.ID, "currentState", saga.CurrentState, "messageType", msg.Type)
		return nil // Return nil to acknowledge the message
	}

	return nil
}

func (o *SagaOrchestrator) handlePaymentSuccess(ctx context.Context, saga *domain.SagaInstance) error {
	o.logger.Info("Payment successful, proceeding to notification", "sagaID", saga.ID)
	
	// 1. Update state locally
	saga.CurrentStep = "SendNotification"
	saga.CurrentState = domain.StateAwaitingNotification
	if err := o.repo.Save(ctx, saga); err != nil {
		o.logger.Error("Failed to save saga state after payment success", "sagaID", saga.ID, "error", err)
		// Returning an error means the message is not deleted, so SQS redelivers
		// it after the visibility timeout and this handler runs again.
		return err
	}

	// 2. Dispatch next command
	cmd := domain.SendCommand{Queue: "notification_queue", Payload: saga.Payload}
	if err := o.dispatcher.Dispatch(ctx, cmd); err != nil {
		o.logger.Error("Failed to dispatch notification command", "sagaID", saga.ID, "error", err)
		// Critical failure point: the new state is already saved. When SQS
		// redelivers this PaymentSuccessful event, HandleMessage finds the saga in
		// AWAITING_NOTIFICATION and ignores the event. The command is never re-sent,
		// and the saga stalls unless a background recovery job re-dispatches it.
		return err
	}

	return nil
}

func (o *SagaOrchestrator) handlePaymentFailure(ctx context.Context, saga *domain.SagaInstance) error {
	o.logger.Warn("Payment failed, starting compensation", "sagaID", saga.ID)

	// 1. Update state to reflect compensation
	saga.CurrentStep = "CancelBooking"
	saga.CurrentState = domain.StateCompensating
	if err := o.repo.Save(ctx, saga); err != nil {
		return err
	}

	// 2. Dispatch compensating command
	cmd := domain.SendCommand{Queue: "booking_compensation_queue", Payload: saga.Payload}
	if err := o.dispatcher.Dispatch(ctx, cmd); err != nil {
		return err
	}

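	// Per the feature file, the saga is COMPENSATED once CancelBooking has been
	// sent. If compensation required a confirmation from the participant, this
	// transition would belong in the handler for that reply instead.
	saga.CurrentState = domain.StateCompensated
	return o.repo.Save(ctx, saga)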
}

// ... other handlers

A critical piece of infrastructure is robust configuration of AWS SQS itself, especially Dead-Letter Queues (DLQs). A message that consistently fails processing because of a bug (a "poison message") should not be retried indefinitely, because it would keep being redelivered and consuming processing capacity. Each source queue’s redrive policy sets a maxReceiveCount. Once a message has been received more times than that, SQS automatically moves it to the DLQ. This isolates the problematic message and allows developers to inspect it for debugging without halting the processing of valid messages. Setting up a DLQ is not optional; it’s a fundamental requirement for a production-grade system.

The System in Action

A Mermaid.js sequence diagram illustrates the complete “happy path” flow.

sequenceDiagram
    participant Client
    participant Orchestrator
    participant SQLite
    participant SQS_Payment
    participant PaymentSvc
    participant SQS_Notify
    participant NotificationSvc
    participant SQS_Reply

    Client->>Orchestrator: 1. Initiate Booking
    Orchestrator->>SQLite: 2. Create Saga record (State: PENDING)
    activate Orchestrator
    Orchestrator->>SQS_Payment: 3. Send ProcessPayment command
    Orchestrator->>SQLite: 4. Update Saga (State: AWAITING_PAYMENT)
    deactivate Orchestrator
    
    SQS_Payment-->>PaymentSvc: 5. Receive Message
    activate PaymentSvc
    PaymentSvc->>PaymentSvc: 6. Process local transaction
    PaymentSvc->>SQS_Reply: 7. Send PaymentSuccessful event
    deactivate PaymentSvc

    SQS_Reply-->>Orchestrator: 8. Receive Message
    activate Orchestrator
    Orchestrator->>SQLite: 9. Load Saga, verify state
    Orchestrator->>SQLite: 10. Update Saga (State: AWAITING_NOTIFICATION)
    Orchestrator->>SQS_Notify: 11. Send SendNotification command
    deactivate Orchestrator

    SQS_Notify-->>NotificationSvc: 12. Receive Message
    activate NotificationSvc
    NotificationSvc->>NotificationSvc: 13. Process local transaction
    NotificationSvc->>SQS_Reply: 14. Send NotificationSent event
    deactivate NotificationSvc

    SQS_Reply-->>Orchestrator: 15. Receive Message
    activate Orchestrator
    Orchestrator->>SQLite: 16. Load Saga, verify state
    Orchestrator->>SQLite: 17. Update Saga (State: COMPLETED)
    deactivate Orchestrator

This architecture has proven to be extremely robust. Individual participant services can fail and recover without affecting the integrity of the overall process. The orchestrator, with its simple and durable SQLite state, can be restarted, and it will pick up right where it left off by processing messages from its input queue. The BDD tests provide confidence that the complex orchestration logic is correct across dozens of potential failure and compensation scenarios.

However, this architecture is not without its limitations. The orchestrator, while its state is durable, is itself a stateful component. A given saga instance is managed by a single orchestrator process, and its state lives only in that process’s SQLite file. If that process dies, a replacement must reopen the same file (for example, from a persistent volume) before it can take over. There is no built-in replication or failover for the file itself. We rely on fast process restarts (e.g., via Kubernetes) and the durability of SQS messages and the SQLite file. Furthermore, gaining a global, real-time view of all in-flight sagas requires querying across potentially many separate SQLite database files on different hosts. A future iteration could involve streaming state change events from each orchestrator to a centralized logging or observability platform to create a consolidated dashboard. For now, the operational simplicity and resilience at the individual process level have far outweighed these concerns.

