Implementing Idempotent Event Handlers in Haskell Driven by BDD and DynamoDB Conditional Writes


The system began dropping acknowledgements under load. Our event consumer, designed for at-least-once delivery from a Kafka topic, started retrying messages against a third-party payment API. The result was predictable and unacceptable: duplicate financial transactions. This forced a hard stop and an immediate design review. The core requirement that emerged was not just desirable but business-critical: our event processing handlers must be strictly idempotent. Any given event, identified by a unique key, must be processed to completion exactly once, regardless of how many times the handler receives it.

Our initial whiteboard sketch involved a simple SQL table to track processed event IDs. A SELECT followed by an INSERT inside a transaction seemed viable. However, our stack was evolving. We were committed to Haskell for its compile-time guarantees in mission-critical code, and DynamoDB was our strategic choice for scalable, low-latency data storage. A traditional relational database felt like a step backward. The challenge became how to implement a robust, lock-free idempotency mechanism using the specific capabilities of this stack.

The decision to stick with Haskell was straightforward. For a financial process, modeling states like Pending, Completed, and Failed within a rigid type system eliminates an entire category of runtime errors. The compiler becomes your first line of defense against invalid state transitions.
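To make the point about state transitions concrete, here is a minimal sketch of how the legal moves between states could be written down in one place. The ProcessingState type mirrors the one defined later in this article; Transition and applyTransition are hypothetical names introduced only for this illustration, and the set of allowed transitions is our assumption about the lifecycle.

```haskell
-- Mirrors the article's ProcessingState; Transition and applyTransition are
-- hypothetical names used only in this sketch.
data ProcessingState = Pending | Completed | Failed
  deriving (Show, Eq)

data Transition = MarkCompleted | MarkFailed | Retry
  deriving (Show, Eq)

-- Returns the next state, or Nothing when the transition is not allowed.
-- Completed is terminal: no transition leads out of it.
applyTransition :: ProcessingState -> Transition -> Maybe ProcessingState
applyTransition Pending MarkCompleted = Just Completed
applyTransition Pending MarkFailed    = Just Failed
applyTransition Failed  Retry         = Just Pending
applyTransition _       _             = Nothing
```

With this in place, applyTransition Completed Retry evaluates to Nothing, so a caller has to handle the rejected case explicitly instead of silently overwriting a finished record.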

DynamoDB was chosen not just for its performance characteristics, but for one specific feature: Conditional Writes. The ability to execute a PutItem or UpdateItem operation that only succeeds if a specific condition on the item is met—atomically, on the server side—is the perfect primitive for this problem. It allows for an atomic “check-and-set” operation, sidestepping the classic read-modify-write race condition that would otherwise require pessimistic locking or complex application-level logic.
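The check-and-set semantics can be illustrated without DynamoDB at all. The following is a small in-memory model, not the real API: Table, PutResult, and putIfAbsent are hypothetical names, and an IORef holding an association list stands in for the table. What it shows is the property we rely on: the existence check and the write happen as one atomic step.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef)

-- Hypothetical in-memory stand-in for a table: partition key -> state.
type Table = IORef [(String, String)]

data PutResult = Written | ConditionFailed
  deriving (Show, Eq)

-- Mimics PutItem with attribute_not_exists(PK): the existence check and the
-- write happen inside a single atomic modification, so no other writer can
-- slip in between them.
putIfAbsent :: Table -> String -> String -> IO PutResult
putIfAbsent table key value =
  atomicModifyIORef' table $ \items ->
    case lookup key items of
      Just _  -> (items, ConditionFailed)
      Nothing -> ((key, value) : items, Written)

-- Two writes for the same key: the first wins, the second is rejected.
demo :: IO (PutResult, PutResult)
demo = do
  table  <- newIORef []
  first  <- putIfAbsent table "IDEMPOTENCY#123" "Pending"
  second <- putIfAbsent table "IDEMPOTENCY#123" "Pending"
  pure (first, second)
```

A separate read followed by a write would leave a window in which two handlers both see "no record" and both proceed; folding the check into the write removes that window.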

Finally, we adopted Behavior-Driven Development (BDD). The logic for idempotency is subtle. It’s not a binary “it works” or “it doesn’t.” It’s a collection of behaviors. What happens on the first call? What happens on a second, immediate call? What if a call fails partway through and is retried? How are concurrent requests for the same event handled? Using Gherkin syntax with the hspec-gherkin library allowed us to specify these scenarios in plain English, get sign-off from stakeholders, and then write tests that directly mirror this expected behavior.

The project structure was set up using stack. The key dependencies in our package.yaml are:

# package.yaml
# ...
dependencies:
- base >= 4.7 && < 5
- aeson
- amazonka
- amazonka-dynamodb
- bytestring
- hspec
- hspec-gherkin
- lens
- text
- unordered-containers
- uuid
- mtl
- exceptions   # Control.Monad.Catch
- time         # Data.Time
- http-client  # Manager

Our first step was to model the domain. Haskell’s algebraic data types are exceptionally well-suited for this. We defined the event payload, its unique identifier, and the possible states of its processing lifecycle.

-- src/Domain.hs
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Domain where

import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import Data.Time (UTCTime)
import GHC.Generics (Generic)
import Data.UUID (UUID)

newtype EventId = EventId { unEventId :: UUID }
  deriving (Show, Eq, Generic)

instance ToJSON EventId
instance FromJSON EventId

data PaymentDetails = PaymentDetails
  { accountFrom :: Text
  , accountTo   :: Text
  , amount      :: Int
  } deriving (Show, Eq, Generic)

instance ToJSON PaymentDetails
instance FromJSON PaymentDetails

data PaymentEvent = PaymentEvent
  { eventId   :: EventId
  , payload   :: PaymentDetails
  , timestamp :: UTCTime
  } deriving (Show, Eq, Generic)

instance ToJSON PaymentEvent
instance FromJSON PaymentEvent

-- Represents the state of processing for an idempotency key.
data ProcessingState = Pending | Completed | Failed
  deriving (Show, Eq, Enum, Bounded, Generic)

-- Generic JSON instances. The DynamoDB layer below stores the state as the
-- constructor name in a string attribute rather than going through JSON.
instance ToJSON ProcessingState
instance FromJSON ProcessingState

-- Represents the record we store in DynamoDB for idempotency tracking.
data IdempotencyRecord = IdempotencyRecord
  { recordId      :: Text -- Partition Key, e.g., "IDEMPOTENCY#<event-id>"
  , state         :: ProcessingState
  , lastUpdated   :: UTCTime
  , ttl           :: Integer -- DynamoDB TTL attribute (Unix epoch seconds)
  } deriving (Show, Eq, Generic)

instance ToJSON IdempotencyRecord
instance FromJSON IdempotencyRecord

With the types defined, we wrote the behavior specification before writing any implementation code. This is the core of the BDD cycle.

features/Idempotency.feature:

Feature: Idempotent Event Processing

  Scenario: A new event is processed successfully
    Given a new payment event
    When the event handler processes the event
    Then the processing should succeed
    And an idempotency record for the event should be marked as 'Completed'

  Scenario: A previously completed event is reprocessed
    Given a payment event that has already been processed successfully
    When the event handler processes the same event again
    Then the processing should be skipped due to idempotency
    And the idempotency record for the event should remain 'Completed'

  Scenario: A failed event is retried
    Given a payment event that failed during its first processing attempt
    When the event handler processes the same event again
    Then the processing should succeed
    And the idempotency record for the event should be updated to 'Completed'

  Scenario: Concurrent attempts to process the same new event
    Given a new payment event
    When two handlers attempt to process the event concurrently
    Then only one handler should succeed in processing
    And the other handler should skip processing due to idempotency

This Gherkin file acts as our executable specification. The next step is to write the hspec test suite that will parse this file and fail until we implement the required logic.

The core of the solution lies in the interaction with DynamoDB. We needed a data access layer that correctly leverages ConditionExpression. Our design uses a single DynamoDB table with a generic primary key schema (PK, SK) to allow for future extensibility, but for this component, we only use the partition key (PK). The idempotency record for an event with ID 123 will have a PK of IDEMPOTENCY#123.

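The critical function is acquireLock. It attempts to create an idempotency record in a Pending state. The key is the ConditionExpression attribute_not_exists(PK), which tells DynamoDB to let the PutItem operation succeed only if no item with that partition key exists. If one does exist, DynamoDB rejects the write with a ConditionalCheckFailedException, which the AWS SDK surfaces as a service error. This is the atomic primitive we build upon. Note that this condition rejects any existing record regardless of its state, including one left in Failed. For the "failed event is retried" scenario above to pass, the condition has to be widened to admit Failed records, or the Failed record has to be removed before the retry.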
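As a sketch of how the lock condition interacts with the retry scenario, the model below compares the plain condition with a widened variant that also admits Failed records. LockCondition, expressionText, and admits are hypothetical names, and the widened expression is our assumption about one way to express the rule, not something taken from the implementation that follows.

```haskell
data ProcessingState = Pending | Completed | Failed
  deriving (Show, Eq)

-- Hypothetical names for the two variants of the lock condition.
data LockCondition = NewOnly | NewOrFailed
  deriving (Show, Eq)

-- The expression text that would be sent to DynamoDB. "#s" and ":failed" are
-- placeholders that would be bound to the State attribute name and the
-- string "Failed" respectively.
expressionText :: LockCondition -> String
expressionText NewOnly     = "attribute_not_exists(PK)"
expressionText NewOrFailed = "attribute_not_exists(PK) OR #s = :failed"

-- Models how each condition evaluates against the stored record
-- (Nothing means no item exists for the key).
admits :: LockCondition -> Maybe ProcessingState -> Bool
admits NewOnly     existing = existing == Nothing
admits NewOrFailed existing = existing == Nothing || existing == Just Failed
```

Under NewOnly a Failed record blocks the retry, while NewOrFailed lets it through and still rejects Pending and Completed records.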

Here’s the data access layer implementation. Note the explicit handling of the specific exception from amazonka. In a real-world project, robust logging would be added here.

-- src/DynamoDB.hs
{-# LANGUAGE OverloadedStrings #-}

module DynamoDB where

import Control.Lens ((&), (.~), (?~))
import Control.Monad.Catch (Exception, MonadCatch, catch, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader, ReaderT, asks)
import qualified Data.HashMap.Strict as HashMap
import Data.Text (Text, pack)
import Data.Time (UTCTime, getCurrentTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
import Network.AWS
import Network.AWS.DynamoDB
import Network.HTTP.Client (Manager) -- from the http-client package
import Domain

-- A simple environment for our application
data AppEnv = AppEnv
  { appEnvManager :: Manager
  , appEnvAwsEnv  :: Env
  , appEnvTableName :: Text
  }

type AppM = ReaderT AppEnv IO

data LockAcquisitionResult
  = LockAcquired
  | AlreadyProcessed -- The event was already in 'Completed' state
  | LockHeld         -- Another process holds the lock ('Pending' state)
  deriving (Show, Eq)

data IdempotencyError
  = LockAcquisitionConflict -- ConditionalCheckFailedException
  | AwsError ServiceError
  deriving (Show)

-- Required so that IdempotencyError can be raised with throwM.
instance Exception IdempotencyError

-- This function attempts to atomically create an idempotency record.
acquireLock :: (MonadIO m, MonadReader AppEnv m, MonadCatch m) => EventId -> m LockAcquisitionResult
acquireLock eventId = do
  env <- asks appEnvAwsEnv
  tableName <- asks appEnvTableName
  now <- liftIO getCurrentTime
  let ttlInSeconds = round (utcTimeToPOSIXSeconds now) + (3600 * 24) -- 24-hour TTL
      recordKey = "IDEMPOTENCY#" <> pack (show $ unEventId eventId)

  -- The PutItem request with the crucial condition expression
  let putRequest = putItem tableName
        & piItem .~ HashMap.fromList
            [ ("PK", attributeValue & avS ?~ recordKey)
            , ("State", attributeValue & avS ?~ "Pending")
            , ("LastUpdated", attributeValue & avS ?~ pack (show now))
            , ("TTL", attributeValue & avN ?~ pack (show ttlInSeconds))
            ]
        -- This is the core of the idempotency check.
        & piConditionExpression ?~ "attribute_not_exists(PK)"

  -- amazonka raises failures as exceptions of its own Error type, so we select
  -- service errors with the _ServiceError prism (re-exported by Network.AWS in
  -- amazonka 1.x, together with trying) and inspect the error code.
  outcome <- liftIO . runResourceT . runAWS env . within NorthVirginia $
    trying _ServiceError (send putRequest)
  case outcome of
    Right _ -> pure LockAcquired
    Left err
      | _serviceCode err == "ConditionalCheckFailedException" -> do
          -- In a real system, you'd fetch the item to check its state:
          -- "Completed" maps to AlreadyProcessed, "Pending" to LockHeld.
          -- For this example, we simplify and treat every conflict alike.
          liftIO $ putStrLn "Conditional check failed. Item likely exists."
          throwM LockAcquisitionConflict
      | otherwise -> throwM (AwsError err)


-- Updates the state of an existing idempotency record.
updateState :: (MonadIO m, MonadReader AppEnv m, MonadCatch m) => EventId -> ProcessingState -> m ()
updateState eventId newState = do
  env <- asks appEnvAwsEnv
  tableName <- asks appEnvTableName
  now <- liftIO getCurrentTime
  let recordKey = "IDEMPOTENCY#" <> pack (show $ unEventId eventId)

  let updateRequest = updateItem tableName
        & uiKey .~ HashMap.singleton "PK" (attributeValue & avS ?~ recordKey)
        & uiUpdateExpression ?~ "SET #s = :s, #lu = :lu"
        & uiExpressionAttributeNames .~ HashMap.fromList [("#s", "State"), ("#lu", "LastUpdated")]
        & uiExpressionAttributeValues .~ HashMap.fromList
            [ (":s", attributeValue & avS ?~ pack (show newState))
            , (":lu", attributeValue & avS ?~ pack (show now))
            ]

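  -- Surface service errors as AwsError so callers can handle them uniformly.
  result <- liftIO . runResourceT . runAWS env . within NorthVirginia $
    trying _ServiceError (send updateRequest)
  case result of
    Right _  -> pure ()
    Left err -> throwM (AwsError err)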

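A pitfall we discovered during implementation was handling stale locks. If a process acquires the lock (writes a Pending record) and then crashes, the event is locked forever. The solution is to use DynamoDB's Time To Live (TTL) feature. We add a TTL attribute to each record containing a Unix epoch timestamp. When TTL is enabled on the table for that attribute, DynamoDB deletes items whose timestamp is in the past, which cleans up orphaned locks without any work on our side. Two caveats apply. First, the TTL value must be longer than the maximum expected processing time for an event. Second, TTL deletion is a background process and is not immediate: AWS documents it as typically happening within a few days of expiry, and until then the expired item still exists and still fails the attribute_not_exists(PK) check. TTL alone therefore bounds the lifetime of a stale lock only loosely; if faster recovery matters, the handler has to compare the stored expiry against the current time itself.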
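One way to avoid depending on when DynamoDB physically removes an expired item is to treat expiry as a property the handler checks for itself. The sketch below models that rule with plain integers for epoch seconds; Existing, expiryFor, and canAcquire are hypothetical names, and the condition expression quoted in the comment is an assumption about how the same rule could be pushed into the conditional write.

```haskell
-- Epoch seconds as plain Integers to keep the sketch dependency-free.
type EpochSeconds = Integer

-- What the handler might find under an event's idempotency key.
data Existing
  = NoRecord
  | PendingUntil EpochSeconds  -- lock held, with the expiry stored in TTL
  | Finished                   -- record already marked Completed
  deriving (Show, Eq)

-- Expiry written into the record's TTL attribute when the lock is taken.
expiryFor :: EpochSeconds -> Integer -> EpochSeconds
expiryFor now leaseSeconds = now + leaseSeconds

-- A Pending record whose expiry has passed is treated as abandoned, whether
-- or not it has been deleted yet. A server-side equivalent could be
-- (assumption, with placeholders for attribute names and values):
--   attribute_not_exists(PK) OR (#s = :pending AND #ttl < :now)
canAcquire :: EpochSeconds -> Existing -> Bool
canAcquire _   NoRecord              = True
canAcquire now (PendingUntil expiry) = expiry < now
canAcquire _   Finished              = False
```

The trade-off is unchanged: a lease that is too short lets a second handler take over while the first is still working.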

Now we can write the main event handler logic. It orchestrates the calls to the DynamoDB layer and contains the actual business logic (which we’ll mock for this article). Using a monad transformer stack like ExceptT AppError AppM is idiomatic in Haskell for handling computations that can fail.

-- src/Handler.hs
{-# LANGUAGE OverloadedStrings #-}

module Handler where

import Control.Monad.Catch (MonadCatch, try)
import Control.Monad.Except (ExceptT (..), runExceptT, throwError, catchError)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader)
import Data.Bifunctor (first)
import Domain
import DynamoDB

-- Define potential outcomes of processing an event
data ProcessingResult
  = ProcessedSuccessfully
  | SkippedDuplicate
  | ProcessingFailed
  deriving (Show, Eq)

data AppError
  = IdempotencyError IdempotencyError
  | BusinessLogicError String
  deriving (Show)

-- The main handler function
processEvent :: (MonadIO m, MonadReader AppEnv m, MonadCatch m) => PaymentEvent -> m ProcessingResult
processEvent event = do
  let eid = eventId event
  result <- runExceptT $ processEventInternal eid
  case result of
    Left (IdempotencyError LockAcquisitionConflict) -> do
      liftIO $ putStrLn $ "Event " ++ show eid ++ " is already being processed or is complete. Skipping."
      pure SkippedDuplicate
    Left (BusinessLogicError msg) -> do
      -- processEventInternal has already marked the record as Failed.
      liftIO $ putStrLn $ "Business logic failed for event " ++ show eid ++ ": " ++ msg
      pure ProcessingFailed
    Left (IdempotencyError (AwsError e)) -> do
      liftIO $ putStrLn $ "AWS Error during processing: " ++ show e
      pure ProcessingFailed
    Right () -> do
      liftIO $ putStrLn $ "Event " ++ show eid ++ " processed successfully."
      pure ProcessedSuccessfully

-- Runs a DynamoDB-layer action and converts a thrown IdempotencyError into a
-- Left. The DynamoDB layer signals failure with throwM, which catchError on
-- ExceptT would not see. Relies on IdempotencyError having an Exception instance.
liftIdempotency :: MonadCatch m => m a -> ExceptT AppError m a
liftIdempotency action = ExceptT (first IdempotencyError <$> try action)

processEventInternal :: (MonadIO m, MonadReader AppEnv m, MonadCatch m) => EventId -> ExceptT AppError m ()
processEventInternal eid = do
  -- Step 1: Acquire the idempotency lock
  _ <- liftIdempotency (acquireLock eid)

  -- Step 2: Execute the core business logic
  -- This is where you would call the payment gateway, etc.
  -- It already runs in ExceptT, so its failures arrive as Left values.
  executeBusinessLogic eid `catchError` (\e -> do
    -- If business logic fails, we still own the lock. We update the
    -- state to Failed before propagating the error.
    liftIdempotency (updateState eid Failed)
    throwError e
    )

  -- Step 3: If business logic succeeds, mark the event as Completed
  liftIdempotency (updateState eid Completed)

-- A mock of the actual work being done
executeBusinessLogic :: MonadIO m => EventId -> ExceptT AppError m ()
executeBusinessLogic eid = do
  liftIO $ putStrLn $ "Executing business logic for event: " ++ show eid
  -- To simulate a failure, you could uncomment the following line:
  -- if someCondition then throwError (BusinessLogicError "Payment Gateway Timeout") else pure ()
  pure ()

The flow can be visualized as follows:

sequenceDiagram
    participant Client
    participant Handler
    participant DynamoDB
    Client->>Handler: processEvent(event)
    Handler->>DynamoDB: PUT item with ConditionExpression: attribute_not_exists(PK)
    alt Lock Acquisition Succeeds
        DynamoDB-->>Handler: PUT Success
        Handler->>Handler: Execute Business Logic
        alt Business Logic Succeeds
            Handler->>DynamoDB: UPDATE item state to 'Completed'
            DynamoDB-->>Handler: UPDATE Success
            Handler-->>Client: ProcessedSuccessfully
        else Business Logic Fails
            Handler->>DynamoDB: UPDATE item state to 'Failed'
            DynamoDB-->>Handler: UPDATE Success
            Handler-->>Client: ProcessingFailed
        end
    else Lock Acquisition Fails (ConditionalCheckFailedException)
        DynamoDB-->>Handler: ConditionalCheckFailedException
        Handler-->>Client: SkippedDuplicate
    end

With the implementation in place, we returned to the BDD tests. We used dynamodb-local as a test dependency to run a local instance of DynamoDB, allowing our tests to be self-contained integration tests. The hspec step definitions connect the Gherkin steps to our Haskell code, setting up initial database states and asserting on the outcomes.

For the concurrent scenario, the test code spawns two lightweight Haskell threads (async) that both attempt to call processEvent with the same EventId. We then collect the results and assert that one returns ProcessedSuccessfully and the other returns SkippedDuplicate. This directly validates that our conditional write prevents double processing under race conditions.
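The shape of that concurrent test can be sketched without DynamoDB or the async package. In the model below an IORef plays the role of the table and forkIO stands in for async; processOnce and race are hypothetical names, and the Outcome type only mirrors the two results the scenario cares about. It is an illustration of the assertion, not our actual step definition.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM)
import Data.IORef (IORef, atomicModifyIORef', newIORef)

data Outcome = ProcessedSuccessfully | SkippedDuplicate
  deriving (Show, Eq)

-- Stand-in for the handler: claim the key atomically, then report whether
-- this caller was the one that claimed it.
processOnce :: IORef [String] -> String -> IO Outcome
processOnce claimed key = do
  won <- atomicModifyIORef' claimed $ \keys ->
    if key `elem` keys then (keys, False) else (key : keys, True)
  pure (if won then ProcessedSuccessfully else SkippedDuplicate)

-- Starts n handlers for the same key on separate threads and collects
-- their outcomes once all of them have finished.
race :: Int -> String -> IO [Outcome]
race n key = do
  claimed <- newIORef []
  boxes   <- replicateM n newEmptyMVar
  forM_ boxes $ \box -> forkIO (processOnce claimed key >>= putMVar box)
  mapM takeMVar boxes
```

Whichever thread wins, the list returned by race 2 contains exactly one ProcessedSuccessfully and one SkippedDuplicate, which is the property the Gherkin scenario asserts.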

The final solution is robust but not without its boundaries. The idempotency protection is scoped to this specific handler. It does not provide distributed transaction guarantees. If executeBusinessLogic makes two distinct external API calls and the process fails after the first one succeeds, the system is left in an inconsistent state. The idempotency key will prevent the event from being re-processed from the beginning, but it won’t roll back the partial work. For such cases, a more complex pattern like the Saga pattern would be required, where each step of the business logic is its own idempotent action that can be compensated.
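To give a feel for what the Saga approach involves, here is a deliberately small sketch: each step carries its own compensation, and a failure triggers the compensations of the steps that already succeeded, most recent first. Step, runSaga, and demo are hypothetical names, the steps only write to an in-memory log, and a production version would also need each step and compensation to be idempotent and durable, which this sketch does not attempt.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)

-- A step pairs an action with the compensation that undoes it.
data Step = Step
  { stepName   :: String
  , runStep    :: IO (Either String ())
  , compensate :: IO ()
  }

-- Runs steps in order. On the first failure, runs the compensations of the
-- already-completed steps in reverse order and returns the error.
runSaga :: [Step] -> IO (Either String ())
runSaga = go []
  where
    go _ [] = pure (Right ())
    go done (step : rest) = do
      result <- runStep step
      case result of
        Right () -> go (step : done) rest
        Left err -> do
          mapM_ compensate done  -- 'done' is already most-recent-first
          pure (Left (stepName step ++ ": " ++ err))

-- The debit succeeds, the credit fails, so the debit is compensated.
demo :: IO (Either String (), [String])
demo = do
  logRef <- newIORef []
  let record msg = modifyIORef' logRef (++ [msg])
      debit  = Step "debit"  (record "debited" >> pure (Right ())) (record "refunded")
      credit = Step "credit" (pure (Left "gateway timeout"))        (record "credit reversed")
  outcome <- runSaga [debit, credit]
  entries <- readIORef logRef
  pure (outcome, entries)
```

The failing step's own compensation is not run, since it never took effect; only the debit is undone.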

Furthermore, the choice of TTL is a critical trade-off between liveness (how quickly a failed event can be retried) and safety (ensuring a slow-running process isn’t preempted by its own lock expiring). A too-short TTL could lead to the very duplicate processing we aimed to prevent if a valid process takes longer than expected. A potential future iteration could involve the handler periodically “heartbeating” to extend the TTL on its lock for long-running jobs, but this adds significant complexity to the state management.

