Our deployment process was a liability. A single feature release required orchestrating changes across three distinct services hosted on separate DigitalOcean droplets: a primary PostgreSQL database, a core application service, and a configuration service. The deployment “script” was a collection of brittle shell commands chained together with &&. When step two failed, step one was already committed, leaving the system in an inconsistent state that required manual intervention. The process was unreliable enough that deployments became a scheduled, all-hands-on-deck event. This was the technical pain point that kicked off our initiative in the first sprint.
The initial goal was simple: atomicity. Either all three services deploy successfully, or none of them do. Our team, working within a Scrum framework, dedicated a sprint to exploring solutions. The immediate candidates were the Saga and Two-Phase Commit (2PC) patterns. Sagas, with their emphasis on choreography and compensating transactions, felt like the modern, scalable choice. However, when we whiteboarded the compensating actions for each failure mode, the rollback logic started to look more complex than the deployment logic itself.
A common mistake is to dismiss 2PC outright due to its blocking nature and single-point-of-failure coordinator. In a real-world project, however, you must evaluate patterns within their specific context. Our use case was low-frequency (a few deployments a day), low-concurrency (typically one at a time), but demanded absolute consistency. For this internal administrative tool, the operational simplicity of a strongly consistent, atomic transaction outweighed the scalability concerns. The deciding factor was the desire for real-time observability. We wanted a UI where an operator could watch the transaction progress across all participants. This requirement made Elixir, with OTP and Phoenix Channels, a perfect fit. We decided to build a 2PC Coordinator.
The Architectural Blueprint
The system consists of three main components:
- The Coordinator: An Elixir GenServer responsible for managing the state of a single distributed transaction. It orchestrates the two phases: prepare and commit/abort.
- The Participants: Three simple web services (we used Plug in Elixir for mocks) representing our real services. They expose /prepare, /commit, and /abort endpoints, and must be able to persist their state between the prepare and commit phases.
- The Real-Time Interface: A Phoenix Channel that allows a web client to connect via WebSockets and receive live updates on the transaction’s state as the Coordinator progresses.
The entire system would run on our DigitalOcean infrastructure, with services communicating over the private network. Network latency and partitions are a real concern, making robust timeout handling in the coordinator non-negotiable.
sequenceDiagram
    participant Client
    participant PhoenixChannel
    participant Coordinator as Coordinator (GenServer)
    participant PubSub
    participant ServiceA
    participant ServiceB
    participant ServiceC
    Client->>PhoenixChannel: Initiates transaction
    PhoenixChannel->>Coordinator: start_transaction(services)
    Coordinator->>Coordinator: State -> :preparing
    Coordinator->>PubSub: Broadcasts {tx_id, state: :preparing}
    PubSub-->>PhoenixChannel: Pushes update to Client
    loop Prepare Phase
        Coordinator->>ServiceA: POST /prepare
        Coordinator->>ServiceB: POST /prepare
        Coordinator->>ServiceC: POST /prepare
    end
    ServiceA-->>Coordinator: 200 OK (:prepared)
    ServiceB-->>Coordinator: 200 OK (:prepared)
    ServiceC-->>Coordinator: 500 Error (:abort)
    Coordinator->>Coordinator: Receives responses, decides to Abort
    Coordinator->>Coordinator: State -> :aborting
    Coordinator->>PubSub: Broadcasts {tx_id, state: :aborting}
    PubSub-->>PhoenixChannel: Pushes update to Client
    loop Abort Phase
        Coordinator->>ServiceA: POST /abort
        Coordinator->>ServiceB: POST /abort
        Coordinator->>ServiceC: POST /abort
    end
    ServiceA-->>Coordinator: 200 OK
    ServiceB-->>Coordinator: 200 OK
    ServiceC-->>Coordinator: 200 OK
    Coordinator->>Coordinator: State -> :aborted
    Coordinator->>PubSub: Broadcasts {tx_id, state: :aborted}
    PubSub-->>PhoenixChannel: Pushes update to Client
The Coordinator: An OTP GenServer Core
The heart of the system is a GenServer that models the state machine of a 2PC transaction. Each transaction gets its own GenServer process, supervised by a DynamicSupervisor. This isolates failures and makes the system resilient.
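To make that concrete, here is a minimal sketch of how the Registry and DynamicSupervisor could be wired into the supervision tree; the module names (TwoPc.Application, TwoPc.TransactionSupervisor) and the commented start_child call are illustrative assumptions, not the exact production wiring.
# lib/two_pc/application.ex (illustrative supervision wiring)
defmodule TwoPc.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Registry backing the coordinator's via-tuple names
      {Registry, keys: :unique, name: TwoPc.TransactionRegistry},
      # One coordinator process per transaction is started under this supervisor
      {DynamicSupervisor, strategy: :one_for_one, name: TwoPc.TransactionSupervisor},
      TwoPcWeb.Endpoint
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: TwoPc.Supervisor)
  end
end

# Starting a coordinator for a new transaction, e.g. from a context module:
#
#   DynamicSupervisor.start_child(TwoPc.TransactionSupervisor, %{
#     id: {:coordinator, tx_id},
#     start: {TwoPc.Transaction.Coordinator, :start_link, [tx_id, participants]},
#     restart: :temporary
#   })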
The state for each transaction needs to track the participants, their individual responses, and the overall transaction state.
# lib/two_pc/transaction/coordinator.ex
defmodule TwoPc.Transaction.Coordinator do
use GenServer
require Logger
alias TwoPc.Transaction.ParticipantClient
# Timeout for participant responses (prepare, commit, abort)
# In a DigitalOcean private network, this should be low, but high enough to handle a service restart.
@participant_timeout 30_000 # 30 seconds
# State structure for the GenServer
defstruct tx_id: nil,
participants: %{}, # %{name => %{url: url, state: :pending}}
state: :new,
decision: :pending,
caller: nil
# Client API
def start_link(tx_id, participants) do
GenServer.start_link(__MODULE__, {tx_id, participants}, name: via(tx_id))
end
def start_transaction(tx_id, participants, caller) do
GenServer.cast(via(tx_id), {:start, participants, caller})
end
defp via(tx_id), do: {:via, Registry, {TwoPc.TransactionRegistry, tx_id}}
# Server Callbacks
@impl true
def init({tx_id, participants_list}) do
participants =
Enum.into(participants_list, %{}, fn %{name: name, url: url} ->
{name, %{url: url, state: :pending}}
end)
    # Use the struct defined above so defaults such as :decision and :caller are present.
    state = %__MODULE__{
      tx_id: tx_id,
      participants: participants
    }
Logger.info("[#{tx_id}] Coordinator initialized for participants: #{inspect(Map.keys(participants))}")
{:ok, state}
end
@impl true
  def handle_cast({:start, _participants, caller}, state) do
    # Participants were already normalized in init/1; only the caller is recorded here.
    updated_state = %{state | caller: caller}
# Asynchronously kick off the prepare phase
Process.send(self(), :begin_prepare_phase, [])
{:noreply, updated_state}
end
@impl true
def handle_info(:begin_prepare_phase, state) do
Logger.info("[#{state.tx_id}] State -> :preparing. Broadcasting prepare requests.")
new_state = update_state(state, :preparing)
Enum.each(new_state.participants, fn {name, %{url: url}} ->
ParticipantClient.request_prepare(state.tx_id, name, url)
end)
# Set a master timeout for the whole prepare phase.
Process.send_after(self(), :prepare_timeout, @participant_timeout)
{:noreply, new_state}
end
# Handling responses from participants
@impl true
def handle_info({:participant_response, name, :prepared}, %{state: :preparing} = state) do
Logger.info("[#{state.tx_id}] Participant '#{name}' prepared successfully.")
    # put_in/2 with a path returns the whole updated state, not just the participants map.
    new_state = put_in(state.participants[name].state, :prepared)
if all_participants_prepared?(new_state) do
Logger.info("[#{state.tx_id}] All participants prepared. Moving to commit phase.")
Process.send(self(), :begin_commit_phase, [])
{:noreply, new_state}
else
{:noreply, new_state}
end
end
@impl true
def handle_info({:participant_response, name, :abort}, %{state: :preparing} = state) do
Logger.warn("[#{state.tx_id}] Participant '#{name}' voted to abort.")
# One abort vote is enough to doom the transaction.
new_state = %{state | decision: :abort}
Process.send(self(), :begin_abort_phase, [])
{:noreply, new_state}
end
@impl true
def handle_info(:prepare_timeout, %{state: :preparing} = state) do
# A common pitfall is not handling timeouts correctly. If we timeout, we MUST abort.
  # Any participant that hasn't responded is treated as having voted to abort.
Logger.error("[#{state.tx_id}] Timed out waiting for prepare responses. Aborting transaction.")
new_state = %{state | decision: :abort}
Process.send(self(), :begin_abort_phase, [])
{:noreply, new_state}
end
# Commit Phase Logic
@impl true
def handle_info(:begin_commit_phase, state) do
Logger.info("[#{state.tx_id}] State -> :committing. Broadcasting commit requests.")
new_state = update_state(state, :committing)
Enum.each(new_state.participants, fn {name, %{url: url}} ->
ParticipantClient.request_commit(state.tx_id, name, url)
end)
{:noreply, new_state}
end
# Abort Phase Logic
@impl true
def handle_info(:begin_abort_phase, state) do
Logger.warn("[#{state.tx_id}] State -> :aborting. Broadcasting abort requests.")
new_state = update_state(state, :aborting)
Enum.each(new_state.participants, fn {name, %{url: url}} ->
ParticipantClient.request_abort(state.tx_id, name, url)
end)
{:noreply, new_state}
end
# Final state transitions. In a real system, you'd add timeouts and retries here.
@impl true
def handle_info({:participant_response, name, :committed}, %{state: :committing} = state) do
    new_state = put_in(state.participants[name].state, :committed)
if all_participants_done?(new_state, :committed) do
Logger.info("[#{state.tx_id}] Transaction successfully committed.")
final_state = update_state(new_state, :committed)
# Terminate the process after completion.
{:stop, :normal, final_state}
else
{:noreply, new_state}
end
end
@impl true
def handle_info({:participant_response, name, :aborted}, %{state: :aborting} = state) do
    new_state = put_in(state.participants[name].state, :aborted)
if all_participants_done?(new_state, :aborted) do
Logger.warn("[#{state.tx_id}] Transaction successfully aborted.")
final_state = update_state(new_state, :aborted)
{:stop, :normal, final_state}
else
{:noreply, new_state}
end
end
# Catch-all for unexpected messages
@impl true
def handle_info(msg, state) do
Logger.warn("[#{state.tx_id}] Received unexpected message in state #{state.state}: #{inspect(msg)}")
{:noreply, state}
end
# Private helpers
defp update_state(state, new_status) do
updated_state = %{state | state: new_status}
# This is the hook for our real-time UI
payload = %{
tx_id: state.tx_id,
state: new_status,
participants: updated_state.participants
}
    TwoPcWeb.Endpoint.broadcast("transaction:#{state.tx_id}", "state_change", payload)
updated_state
end
defp all_participants_prepared?(state) do
Enum.all?(state.participants, fn {_, p_state} -> p_state.state == :prepared end)
end
defp all_participants_done?(state, expected_state) do
Enum.all?(state.participants, fn {_, p_state} -> p_state.state == expected_state end)
end
end
The ParticipantClient is a simple wrapper around an HTTP client (like Tesla or HTTPoison) that sends asynchronous requests to the participants and messages the responses back to the coordinator GenServer’s PID.
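For completeness, here is a minimal sketch of such a client, assuming HTTPoison and Jason as dependencies; the module layout, the vote mapping, and the lack of retries are simplifications of what a production client would need.
# lib/two_pc/transaction/participant_client.ex (illustrative sketch)
defmodule TwoPc.Transaction.ParticipantClient do
  @moduledoc """
  Fire-and-forget HTTP calls to participants. Each request runs in its own
  Task so the coordinator is never blocked; the outcome is delivered back to
  the coordinator as a {:participant_response, name, vote} message.
  """
  require Logger

  def request_prepare(tx_id, name, url), do: request(tx_id, name, url, "prepare", :prepared)
  def request_commit(tx_id, name, url), do: request(tx_id, name, url, "commit", :committed)
  def request_abort(tx_id, name, url), do: request(tx_id, name, url, "abort", :aborted)

  defp request(tx_id, name, url, phase, success_vote) do
    # self() is the coordinator GenServer, because these functions are
    # called from inside its handle_info callbacks.
    coordinator = self()
    body = Jason.encode!(%{tx_id: tx_id})

    Task.start(fn ->
      vote =
        case HTTPoison.post("#{url}/#{phase}", body, [{"content-type", "application/json"}]) do
          {:ok, %HTTPoison.Response{status_code: 200}} ->
            success_vote

          {:ok, %HTTPoison.Response{status_code: status}} ->
            Logger.warn("[#{tx_id}] '#{name}' answered #{phase} with status #{status}")
            :abort

          {:error, %HTTPoison.Error{reason: reason}} ->
            Logger.warn("[#{tx_id}] #{phase} request to '#{name}' failed: #{inspect(reason)}")
            :abort
        end

      # During the commit/abort phases a real client would retry instead of
      # reporting :abort; here the coordinator's catch-all clause absorbs it.
      send(coordinator, {:participant_response, name, vote})
    end)
  end
end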
The Mock Participant Service
To test the coordinator, we need mock participant services. A simple Plug.Router works well. The key challenge for a participant is maintaining state: after responding prepared, it must remember this decision and be ready to either commit or abort later, even if the service restarts. For our mock, we use an in-memory ETS table to simulate this transactional state. In a production system, this would be a write-ahead log (WAL) or a specific status field in a database record.
# lib/two_pc/mock_participant.ex
defmodule TwoPc.MockParticipant do
use Plug.Router
require Logger
plug :match
plug :dispatch
# In-memory store for transaction states
# In a real service, this would be backed by a persistent store.
defmacrop ets_table, do: :mock_participant_states
def start_link(port) do
# Create the ETS table on start
:ets.new(ets_table(), [:set, :public, :named_table, read_concurrency: true])
Plug.Cowboy.http(__MODULE__, [], port: port)
end
post "/prepare" do
body = decode_body(conn)
tx_id = body["tx_id"]
# Simulate a potential failure. The database service might fail to prepare.
should_succeed = conn.port != 4002 # Let's make service on port 4002 the unreliable one
if should_succeed do
:ets.insert(ets_table(), {tx_id, :prepared})
Logger.info("[Participant #{conn.port}] Transaction #{tx_id} prepared.")
send_resp(conn, 200, "prepared")
else
:ets.insert(ets_table(), {tx_id, :aborted})
Logger.warn("[Participant #{conn.port}] Transaction #{tx_id} voting to ABORT.")
send_resp(conn, 500, "abort")
end
end
post "/commit" do
tx_id = decode_body(conn)["tx_id"]
case :ets.lookup(ets_table(), tx_id) do
[{^tx_id, :prepared}] ->
:ets.insert(ets_table(), {tx_id, :committed})
Logger.info("[Participant #{conn.port}] Transaction #{tx_id} committed.")
send_resp(conn, 200, "committed")
_ ->
# This is a critical state. A commit request was received for a non-prepared transaction.
Logger.error("[Participant #{conn.port}] Received commit for unknown/aborted transaction #{tx_id}.")
send_resp(conn, 409, "conflict")
end
end
post "/abort" do
tx_id = decode_body(conn)["tx_id"]
:ets.insert(ets_table(), {tx_id, :aborted})
Logger.info("[Participant #{conn.port}] Transaction #{tx_id} aborted.")
send_resp(conn, 200, "aborted")
end
match _ do
send_resp(conn, 404, "not found")
end
defp decode_body(conn) do
{:ok, body, _conn} = Plug.Conn.read_body(conn)
Jason.decode!(body)
end
end
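Wiring the pieces together from iex looks roughly like the snippet below; the ports, participant names, and transaction ID are illustrative, and since the mock on port 4002 is hard-coded to vote abort on prepare, this run exercises the abort path end to end.
# In iex -S mix (illustrative wiring; ports, names, and tx_id are assumptions)
Enum.each([4001, 4002, 4003], &TwoPc.MockParticipant.start_link/1)

participants = [
  %{name: "database", url: "http://localhost:4001"},
  # The mock on port 4002 is hard-coded above to vote abort on prepare.
  %{name: "app", url: "http://localhost:4002"},
  %{name: "config", url: "http://localhost:4003"}
]

tx_id = "tx-#{System.unique_integer([:positive])}"
{:ok, _pid} = TwoPc.Transaction.Coordinator.start_link(tx_id, participants)
:ok = TwoPc.Transaction.Coordinator.start_transaction(tx_id, participants, self())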
The Phoenix Channel and Real-Time UI
This is where the user-facing value appears. We define a Phoenix Channel that clients can join to listen for updates on a specific transaction. The Coordinator GenServer broadcasts every state change to a PubSub topic, like "transaction:<tx_id>". The channel is just a conduit for these messages.
# lib/two_pc_web/channels/transaction_channel.ex
defmodule TwoPcWeb.TransactionChannel do
  use TwoPcWeb, :channel
  require Logger
@impl true
def join("transaction:" <> tx_id, _payload, socket) do
Logger.info("Client joined channel for transaction #{tx_id}")
{:ok, "Joined transaction channel for #{tx_id}", socket}
end
# This channel is receive-only from the client's perspective.
# The server pushes all state changes.
@impl true
def handle_in("ping", payload, socket) do
{:reply, {:ok, payload}, socket}
end
end
# In user_socket.ex, you'd have:
# channel "transaction:*", TwoPcWeb.TransactionChannel
The frontend JavaScript is straightforward. It uses phoenix.js to connect to the socket and join the channel for the transaction it wants to monitor.
// assets/js/socket.js
import { Socket } from "phoenix";
let socket = new Socket("/socket", { params: { token: window.userToken } });
socket.connect();
// Assuming we get the transaction ID from the page, e.g., from a data attribute.
const transactionId = document.getElementById("transaction-view").dataset.txId;
if (transactionId) {
let channel = socket.channel(`transaction:${transactionId}`, {});
channel.on("state_change", (payload) => {
console.log("New state received:", payload);
// Here you would update the UI
// For example, update a status header and the state of each participant.
document.getElementById("tx-status").innerText = `Status: ${payload.state}`;
Object.keys(payload.participants).forEach(name => {
const pState = payload.participants[name].state;
const el = document.getElementById(`participant-${name}-status`);
if (el) {
el.innerText = pState;
el.className = `status-${pState}`; // for CSS styling
}
});
});
channel.join()
.receive("ok", resp => { console.log("Joined successfully", resp) })
.receive("error", resp => { console.log("Unable to join", resp) });
}
export default socket;
This simple setup gave our operations team exactly what they needed: a clear, unambiguous, real-time view into the state of a critical deployment process. The Scrum process was invaluable here. The initial story was just “make deployments atomic.” It was through sprint reviews and feedback that the requirement for granular, real-time observability became a primary driver, which in turn validated our choice of the Phoenix/OTP stack.
Lingering Issues and Production Hardening
This implementation is a solid foundation, but it’s not a complete, production-ready system. The primary remaining weakness of 2PC is the coordinator recovery process. If the coordinator GenServer crashes (or the whole node goes down) after the prepare phase but before sending the final commit or abort decision, the participants are left in a blocked, “in-doubt” state. They have locked resources but don’t know whether to finalize or roll back the change.
To solve this, the coordinator’s state transitions must be persisted to a durable store before broadcasting messages. A simple approach is writing to a write-ahead log (WAL) on disk. On restart, a supervisor would read the log for any unfinished transactions, query the participants for their state, and correctly resume the commit or abort phase. This recovery logic is non-trivial to get right and represents the true cost of implementing a robust 2PC protocol.
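A hypothetical sketch of such a decision log is shown below; the module name, log path, and recovery helper are assumptions, and completion records plus log compaction are deliberately elided.
# lib/two_pc/transaction/decision_log.ex (hypothetical sketch)
defmodule TwoPc.Transaction.DecisionLog do
  @moduledoc """
  Append-only decision log. The coordinator writes its commit/abort decision
  here *before* broadcasting it to any participant, so a restarted node can
  finish the second phase instead of leaving participants in doubt.
  """
  @log_path "/var/lib/two_pc/decisions.log"

  def record(tx_id, decision) when decision in [:commit, :abort] do
    entry = Jason.encode!(%{tx_id: tx_id, decision: decision, at: DateTime.utc_now()})
    # :sync forces the write through to disk before this function returns.
    File.write!(@log_path, entry <> "\n", [:append, :sync])
  end

  # On restart, replay the log to find transactions whose second phase may not
  # have completed. Because completion records are elided in this sketch, a
  # recovery process would still need to query each participant's state.
  def logged_decisions do
    @log_path
    |> File.stream!()
    |> Enum.map(&Jason.decode!/1)
  end
end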
Furthermore, the current coordinator is a single process. While OTP supervision makes it resilient, it doesn’t make it horizontally scalable. For our internal tool, this is an acceptable trade-off. For a high-throughput application, this single bottleneck would be a showstopper, pushing the architecture back toward eventually consistent models like Sagas. The applicability of this pattern is therefore limited to scenarios where strong consistency is paramount and transaction volume is manageable.