The system began dropping acknowledgements under load. Our event consumer, designed for at-least-once delivery from a Kafka topic, started retrying messages against a third-party payment API. The result was predictable and unacceptable: duplicate financial transactions. This forced a hard stop and an immediate design review. The core requirement that emerged was not just desirable but business-critical: our event processing handlers must be strictly idempotent. Any given event, identified by a unique key, must be processed to completion exactly once, regardless of how many times the handler receives it.
Our initial whiteboard sketch involved a simple SQL table to track processed event IDs. A SELECT followed by an INSERT inside a transaction seemed viable. However, our stack was evolving. We were committed to Haskell for its compile-time guarantees in mission-critical code, and DynamoDB was our strategic choice for scalable, low-latency data storage. A traditional relational database felt like a step backward. The challenge became how to implement a robust, lock-free idempotency mechanism using the specific capabilities of this stack.
The decision to stick with Haskell was straightforward. For a financial process, modeling states like Pending, Completed, and Failed within a rigid type system eliminates an entire category of runtime errors. The compiler becomes your first line of defense against invalid state transitions.
DynamoDB was chosen not just for its performance characteristics, but for one specific feature: Conditional Writes. The ability to execute a PutItem or UpdateItem operation that only succeeds if a specific condition on the item is met—atomically, on the server side—is the perfect primitive for this problem. It allows for an atomic “check-and-set” operation, sidestepping the classic read-modify-write race condition that would otherwise require pessimistic locking or complex application-level logic.
Finally, we adopted Behavior-Driven Development (BDD). The logic for idempotency is subtle. It’s not a binary “it works” or “it doesn’t.” It’s a collection of behaviors. What happens on the first call? What happens on a second, immediate call? What if a call fails partway through and is retried? How are concurrent requests for the same event handled? Using Gherkin syntax with the hspec-gherkin library allowed us to specify these scenarios in plain English, get sign-off from stakeholders, and then write tests that directly mirror this expected behavior.
The project structure was set up using stack. The key dependencies in our package.yaml are:
# package.yaml
# ...
dependencies:
- base >= 4.7 && < 5
- aeson
- amazonka
- amazonka-dynamodb
- bytestring
- exceptions
- hspec
- hspec-gherkin
- lens
- mtl
- text
- time
- unordered-containers
- uuid
Our first step was to model the domain. Haskell’s algebraic data types are exceptionally well-suited for this. We defined the event payload, its unique identifier, and the possible states of its processing lifecycle.
-- src/Domain.hs
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Domain where
import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import Data.Time (UTCTime)
import GHC.Generics (Generic)
import Data.UUID (UUID)
newtype EventId = EventId { unEventId :: UUID }
deriving (Show, Eq, Generic)
instance ToJSON EventId
instance FromJSON EventId
data PaymentDetails = PaymentDetails
{ accountFrom :: Text
, accountTo :: Text
, amount :: Int
} deriving (Show, Eq, Generic)
instance ToJSON PaymentDetails
instance FromJSON PaymentDetails
data PaymentEvent = PaymentEvent
{ eventId :: EventId
, payload :: PaymentDetails
, timestamp :: UTCTime
} deriving (Show, Eq, Generic)
instance ToJSON PaymentEvent
instance FromJSON PaymentEvent
-- Represents the state of processing for an idempotency key.
data ProcessingState = Pending | Completed | Failed
deriving (Show, Eq, Enum, Bounded, Generic)
-- JSON instances; the DynamoDB layer below stores the state as a plain string attribute.
instance ToJSON ProcessingState
instance FromJSON ProcessingState
-- Represents the record we store in DynamoDB for idempotency tracking.
data IdempotencyRecord = IdempotencyRecord
{ recordId :: Text -- Partition Key, e.g., "IDEMPOTENCY#<event-id>"
, state :: ProcessingState
, lastUpdated :: UTCTime
, ttl :: Integer -- DynamoDB TTL attribute (Unix epoch seconds)
} deriving (Show, Eq, Generic)
instance ToJSON IdempotencyRecord
instance FromJSON IdempotencyRecord
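The recordId and ttl fields are derived values rather than something callers fill in by hand. The two helpers below are purely illustrative (the data access layer later in the article inlines the same logic); they assume Domain.hs's Data.Text import is extended with pack:
-- Illustrative helpers only; not part of the module as written above.
-- Derive the DynamoDB partition key for an event, e.g. "IDEMPOTENCY#<uuid>".
idempotencyKey :: EventId -> Text
idempotencyKey (EventId uuid) = "IDEMPOTENCY#" <> pack (show uuid)
-- Build a fresh record in the Pending state. The caller supplies the expiry
-- time as Unix epoch seconds, which is what DynamoDB's TTL feature expects.
mkIdempotencyRecord :: EventId -> UTCTime -> Integer -> IdempotencyRecord
mkIdempotencyRecord eid now expiryEpoch = IdempotencyRecord
  { recordId    = idempotencyKey eid
  , state       = Pending
  , lastUpdated = now
  , ttl         = expiryEpoch
  }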
With the types defined, we wrote the behavior specification before writing any implementation code. This is the core of the BDD cycle.
features/Idempotency.feature:
Feature: Idempotent Event Processing
Scenario: A new event is processed successfully
Given a new payment event
When the event handler processes the event
Then the processing should succeed
And an idempotency record for the event should be marked as 'Completed'
Scenario: A previously completed event is reprocessed
Given a payment event that has already been processed successfully
When the event handler processes the same event again
Then the processing should be skipped due to idempotency
And the idempotency record for the event should remain 'Completed'
Scenario: A failed event is retried
Given a payment event that failed during its first processing attempt
When the event handler processes the same event again
Then the processing should succeed
And the idempotency record for the event should be updated to 'Completed'
Scenario: Concurrent attempts to process the same new event
Given a new payment event
When two handlers attempt to process the event concurrently
Then only one handler should succeed in processing
And the other handler should skip processing due to idempotency
This Gherkin file acts as our executable specification. The next step is to write the hspec test suite that will parse this file and fail until we implement the required logic.
The core of the solution lies in the interaction with DynamoDB. We needed a data access layer that correctly leverages ConditionExpression. Our design uses a single DynamoDB table with a generic primary key schema (PK, SK) to allow for future extensibility, but for this component, we only use the partition key (PK). The idempotency record for an event with ID 123 will have a PK of IDEMPOTENCY#123.
The critical function is acquireLock. It attempts to create an idempotency record in a Pending state. The magic is in the ConditionExpression: attribute_not_exists(PK). This expression tells DynamoDB to only allow the PutItem operation to succeed if an item with that partition key does not already exist. If it does exist, DynamoDB rejects the write and the AWS SDK returns a ConditionalCheckFailedException. This is the atomic primitive we build upon.
Here’s the data access layer implementation. Note the explicit handling of the specific exception from amazonka. In a real-world project, robust logging would be added here.
-- src/DynamoDB.hs
{-# LANGUAGE OverloadedStrings #-}
module DynamoDB where
import qualified Control.Exception.Lens as Lens
import Control.Lens ((&), (.~), (?~))
import Control.Monad.Catch (Exception, MonadCatch, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader, ReaderT, asks)
import qualified Data.HashMap.Strict as HashMap
import Data.Text (Text, pack)
import Data.Time (getCurrentTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
import Network.AWS
import Network.AWS.DynamoDB
import Domain
-- A simple environment for our application. The amazonka Env already carries
-- its own HTTP manager, so all we need is the Env and the table name.
data AppEnv = AppEnv
  { appEnvAwsEnv    :: Env
  , appEnvTableName :: Text
  }
type AppM = ReaderT AppEnv IO
data LockAcquisitionResult
  = LockAcquired
  | AlreadyProcessed -- The event was already in 'Completed' state
  | LockHeld         -- Another process holds the lock ('Pending' state)
  deriving (Show, Eq)
data IdempotencyError
  = LockAcquisitionConflict -- The conditional write was rejected
  | AwsError ServiceError
  deriving (Show)
instance Exception IdempotencyError
-- This function attempts to atomically create an idempotency record.
acquireLock :: (MonadIO m, MonadReader AppEnv m) => EventId -> m LockAcquisitionResult
acquireLock eventId = do
  env <- asks appEnvAwsEnv
  tableName <- asks appEnvTableName
  now <- liftIO getCurrentTime
  let ttlInSeconds = round (utcTimeToPOSIXSeconds now) + (3600 * 24) :: Integer -- 24-hour TTL
      recordKey = "IDEMPOTENCY#" <> pack (show (unEventId eventId))
      -- The PutItem request with the crucial condition expression
      putRequest = putItem tableName
        & piItem .~ HashMap.fromList
            [ ("PK",          attributeValue & avS ?~ recordKey)
            , ("State",       attributeValue & avS ?~ "Pending")
            , ("LastUpdated", attributeValue & avS ?~ pack (show now))
            , ("TTL",         attributeValue & avN ?~ pack (show ttlInSeconds))
            ]
        -- This is the core of the idempotency check.
        & piConditionExpression ?~ "attribute_not_exists(PK)"
  result <- liftIO . runResourceT . runAWS env . within NorthVirginia $
    Lens.trying _ConditionalCheckFailedException (send putRequest)
  case result of
    Right _ -> pure LockAcquired
    Left _  -> do
      -- The condition failed, so an idempotency record already exists. A more
      -- robust implementation would GET the item and inspect its State
      -- attribute: "Completed" maps to AlreadyProcessed, "Pending" to LockHeld.
      -- For this example, we simplify and treat any conflict as a held lock.
      liftIO $ putStrLn "Conditional check failed: idempotency record already exists."
      pure LockHeld
-- Updates the state of an existing idempotency record.
updateState :: (MonadIO m, MonadReader AppEnv m, MonadCatch m) => EventId -> ProcessingState -> m ()
updateState eventId newState = do
  env <- asks appEnvAwsEnv
  tableName <- asks appEnvTableName
  now <- liftIO getCurrentTime
  let recordKey = "IDEMPOTENCY#" <> pack (show (unEventId eventId))
      updateRequest = updateItem tableName
        & uiKey .~ HashMap.singleton "PK" (attributeValue & avS ?~ recordKey)
        & uiUpdateExpression ?~ "SET #s = :s, #lu = :lu"
        & uiExpressionAttributeNames .~ HashMap.fromList [("#s", "State"), ("#lu", "LastUpdated")]
        & uiExpressionAttributeValues .~ HashMap.fromList
            [ (":s",  attributeValue & avS ?~ pack (show newState))
            , (":lu", attributeValue & avS ?~ pack (show now))
            ]
  -- Surface AWS service errors as AwsError so the handler can fold them into
  -- its own error type.
  result <- liftIO . runResourceT . runAWS env . within NorthVirginia $
    Lens.trying _ServiceError (send updateRequest)
  case result of
    Left err -> throwM (AwsError err)
    Right _  -> pure ()
A pitfall we discovered during implementation was handling stale locks. If a process acquires the lock (writes a Pending record) and then crashes, the event is locked forever. The solution is to use DynamoDB’s Time To Live (TTL) feature. We add a TTL attribute to each record containing a Unix epoch timestamp. When configured on the table, DynamoDB automatically deletes items whose TTL timestamp is in the past. This provides an automatic cleanup mechanism for orphaned locks, ensuring the system is self-healing. The TTL value must be chosen carefully; it should be longer than the maximum expected processing time for an event.
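To keep that choice in one place, the expiry can be derived from a single configurable lock duration rather than the 24 hours hard-coded in acquireLock above. A minimal sketch (the helper name and parameter are ours, not part of the production code):
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
-- Translate "now plus a lock duration" into the Unix-epoch-seconds value stored
-- in the TTL attribute. The duration should comfortably exceed the slowest
-- realistic processing time for a single event.
lockExpiryEpoch :: NominalDiffTime -> UTCTime -> Integer
lockExpiryEpoch lockDuration now =
  round (utcTimeToPOSIXSeconds (addUTCTime lockDuration now))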
Now we can write the main event handler logic. It orchestrates the calls to the DynamoDB layer and contains the actual business logic (which we’ll mock for this article). Using a monad transformer stack like ExceptT AppError AppM is idiomatic in Haskell for handling computations that can fail.
-- src/Handler.hs
{-# LANGUAGE OverloadedStrings #-}
module Handler where
import Control.Monad.Catch (MonadCatch, catch)
import Control.Monad.Except (ExceptT, runExceptT, throwError, catchError)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader)
import Domain
import DynamoDB
-- Define potential outcomes of processing an event
data ProcessingResult
  = ProcessedSuccessfully
  | SkippedDuplicate
  | ProcessingFailed
  deriving (Show, Eq)
data AppError
  = IdempotencyError IdempotencyError -- wraps the DynamoDB layer's error type
  | BusinessLogicError String
  deriving (Show)
-- The main handler function
processEvent :: (MonadIO m, MonadReader AppEnv m, MonadCatch m) => PaymentEvent -> m ProcessingResult
processEvent event = do
  let eid = eventId event
  -- Exceptions thrown by the DynamoDB layer (e.g. AwsError) are folded into
  -- the same error channel as the ExceptT failures.
  result <- runExceptT (processEventInternal eid) `catch` (pure . Left . IdempotencyError)
  case result of
    Left (IdempotencyError LockAcquisitionConflict) -> do
      liftIO $ putStrLn $ "Event " ++ show eid ++ " is already being processed or is complete. Skipping."
      pure SkippedDuplicate
    Left (BusinessLogicError msg) -> do
      liftIO $ putStrLn $ "Business logic failed for event " ++ show eid ++ ": " ++ msg
      -- The idempotency record was already marked as Failed inside
      -- processEventInternal before the error propagated here.
      pure ProcessingFailed
    Left (IdempotencyError (AwsError e)) -> do
      liftIO $ putStrLn $ "AWS error during processing: " ++ show e
      pure ProcessingFailed
    Right () -> do
      liftIO $ putStrLn $ "Event " ++ show eid ++ " processed successfully."
      pure ProcessedSuccessfully
processEventInternal :: (MonadIO m, MonadReader AppEnv m, MonadCatch m) => EventId -> ExceptT AppError m ()
processEventInternal eid = do
  -- Step 1: Acquire the idempotency lock
  lockResult <- acquireLock eid
  case lockResult of
    LockAcquired -> do
      -- Step 2: Execute the core business logic
      -- This is where you would call the payment gateway, etc.
      executeBusinessLogic eid `catchError` (\e -> do
        -- If business logic fails, we still own the lock. We record the
        -- failure before propagating the error so the record reflects reality.
        updateState eid Failed
        throwError e)
      -- Step 3: If business logic succeeds, mark the event as Completed
      updateState eid Completed
    -- Any other outcome means the conditional write was rejected: the event is
    -- either complete or another processor currently holds the lock.
    _ -> throwError (IdempotencyError LockAcquisitionConflict)
-- A mock of the actual work being done
executeBusinessLogic :: MonadIO m => EventId -> ExceptT AppError m ()
executeBusinessLogic eid = do
  liftIO $ putStrLn $ "Executing business logic for event: " ++ show eid
  -- To simulate a failure, replace the final 'pure ()' with:
  --   throwError (BusinessLogicError "Payment Gateway Timeout")
  pure ()
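For completeness, here is a minimal wiring sketch showing how the pieces fit together at an entry point. The table name, the fabricated event, and the use of amazonka's credential discovery (newEnv Discover) are illustrative assumptions, not our production setup, where events arrive from the Kafka consumer:
-- app/Main.hs -- minimal wiring sketch (table name and event are illustrative)
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Monad.Reader (runReaderT)
import Data.Time (getCurrentTime)
import qualified Data.UUID.V4 as UUID
import Network.AWS (Credentials (Discover), newEnv)
import Domain
import DynamoDB
import Handler
main :: IO ()
main = do
  awsEnv <- newEnv Discover -- picks up credentials from the environment
  let appEnv = AppEnv { appEnvAwsEnv = awsEnv, appEnvTableName = "payments-idempotency" }
  -- In production this event would come from the Kafka consumer; here we fabricate one.
  eid <- EventId <$> UUID.nextRandom
  now <- getCurrentTime
  let event = PaymentEvent eid (PaymentDetails "acc-a" "acc-b" 100) now
  result <- runReaderT (processEvent event) appEnv
  print result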
The flow can be visualized as follows:
sequenceDiagram
    participant Client
    participant Handler
    participant DynamoDB
    Client->>Handler: processEvent(event)
    Handler->>DynamoDB: PUT item with ConditionExpression: attribute_not_exists(PK)
    alt Lock Acquisition Succeeds
        DynamoDB-->>Handler: PUT Success
        Handler->>Handler: Execute Business Logic
        alt Business Logic Succeeds
            Handler->>DynamoDB: UPDATE item state to 'Completed'
            DynamoDB-->>Handler: UPDATE Success
            Handler-->>Client: ProcessedSuccessfully
        else Business Logic Fails
            Handler->>DynamoDB: UPDATE item state to 'Failed'
            DynamoDB-->>Handler: UPDATE Success
            Handler-->>Client: ProcessingFailed
        end
    else Lock Acquisition Fails (ConditionalCheckFailedException)
        DynamoDB-->>Handler: ConditionalCheckFailedException
        Handler-->>Client: SkippedDuplicate
    end
With the implementation in place, we returned to the BDD tests. We used dynamodb-local as a test dependency to run a local instance of DynamoDB, allowing our tests to be self-contained integration tests. The hspec step definitions connect the Gherkin steps to our Haskell code, setting up initial database states and asserting on the outcomes.
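Pointing the AWS environment at the local instance is a small amount of glue. The sketch below assumes amazonka's configure and setEndpoint helpers, a dynamodb-local instance on port 8000, and a pre-created test table; the module and table names are placeholders:
-- test/TestEnv.hs -- sketch of a test AppEnv aimed at dynamodb-local
{-# LANGUAGE OverloadedStrings #-}
module TestEnv where
import Network.AWS (Credentials (Discover), configure, newEnv, setEndpoint)
import Network.AWS.DynamoDB (dynamoDB)
import DynamoDB (AppEnv (..))
mkTestEnv :: IO AppEnv
mkTestEnv = do
  awsEnv <- newEnv Discover -- dynamodb-local accepts whatever credentials are discovered
  -- Redirect every DynamoDB call to the local instance instead of AWS.
  let localDynamo = setEndpoint False "localhost" 8000 dynamoDB
  pure AppEnv
    { appEnvAwsEnv    = configure localDynamo awsEnv
    , appEnvTableName = "idempotency-test"
    }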
For the concurrent scenario, the test code spawns two lightweight Haskell threads (via the async library) that both attempt to call processEvent with the same EventId. We then collect the results and assert that one returns ProcessedSuccessfully and the other returns SkippedDuplicate. This directly validates that our conditional write prevents double processing under race conditions.
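Stripped of the Gherkin glue, the heart of that concurrency test looks roughly like this; mkTestEnv is the illustrative helper sketched above, and the event construction is inlined for brevity:
-- test/ConcurrencySpec.hs -- plain-hspec sketch of the concurrent scenario
{-# LANGUAGE OverloadedStrings #-}
module ConcurrencySpec where
import Control.Concurrent.Async (concurrently)
import Control.Monad.Reader (runReaderT)
import Data.Time (getCurrentTime)
import qualified Data.UUID.V4 as UUID
import Test.Hspec
import Domain
import Handler (ProcessingResult (..), processEvent)
import TestEnv (mkTestEnv)
spec :: Spec
spec = describe "concurrent processing of the same event" $
  it "lets exactly one handler do the work" $ do
    env <- mkTestEnv
    eid <- EventId <$> UUID.nextRandom
    now <- getCurrentTime
    let event = PaymentEvent eid (PaymentDetails "acc-a" "acc-b" 100) now
    -- Race two handlers against the same event; the conditional write ensures
    -- only one of them acquires the idempotency lock.
    (r1, r2) <- concurrently
      (runReaderT (processEvent event) env)
      (runReaderT (processEvent event) env)
    length (filter (== ProcessedSuccessfully) [r1, r2]) `shouldBe` 1
    length (filter (== SkippedDuplicate)      [r1, r2]) `shouldBe` 1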
The final solution is robust but not without its boundaries. The idempotency protection is scoped to this specific handler. It does not provide distributed transaction guarantees. If executeBusinessLogic makes two distinct external API calls and the process fails after the first one succeeds, the system is left in an inconsistent state. The idempotency key will prevent the event from being re-processed from the beginning, but it won’t roll back the partial work. For such cases, a more complex pattern like the Saga pattern would be required, where each step of the business logic is its own idempotent action that can be compensated.
Furthermore, the choice of TTL is a critical trade-off between liveness (how quickly a failed event can be retried) and safety (ensuring a slow-running process isn’t preempted by its own lock expiring). A too-short TTL could lead to the very duplicate processing we aimed to prevent if a valid process takes longer than expected. A potential future iteration could involve the handler periodically “heartbeating” to extend the TTL on its lock for long-running jobs, but this adds significant complexity to the state management.
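To make that trade-off concrete, here is roughly the shape such a heartbeat could take with the same amazonka building blocks already used in the data access layer: a conditional UpdateItem that pushes the TTL forward only while the record is still Pending. We did not ship this; it is a sketch only, and it assumes it lives alongside the DynamoDB.hs module above so its imports are in scope:
-- A sketch only: extend the lock's TTL while processing is still in flight.
-- The condition ensures we never refresh a lock we no longer own; a rejected
-- condition surfaces as a ConditionalCheckFailedException, which the caller
-- can treat as "lock lost".
heartbeat :: (MonadIO m, MonadReader AppEnv m) => EventId -> Integer -> m ()
heartbeat eventId extraSeconds = do
  env <- asks appEnvAwsEnv
  tableName <- asks appEnvTableName
  now <- liftIO getCurrentTime
  let newTtl = round (utcTimeToPOSIXSeconds now) + extraSeconds :: Integer
      recordKey = "IDEMPOTENCY#" <> pack (show (unEventId eventId))
      req = updateItem tableName
        & uiKey .~ HashMap.singleton "PK" (attributeValue & avS ?~ recordKey)
        & uiUpdateExpression ?~ "SET #t = :t"
        & uiConditionExpression ?~ "#s = :pending"
        & uiExpressionAttributeNames .~ HashMap.fromList [("#t", "TTL"), ("#s", "State")]
        & uiExpressionAttributeValues .~ HashMap.fromList
            [ (":t",       attributeValue & avN ?~ pack (show newTtl))
            , (":pending", attributeValue & avS ?~ "Pending")
            ]
  _ <- liftIO . runResourceT . runAWS env . within NorthVirginia $ send req
  pure ()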