Observing the internal state transitions of a distributed consensus algorithm is notoriously difficult. Standard logging across multiple nodes provides a fragmented, after-the-fact view that fails to capture the real-time dynamics of leader elections, log replication races, or split-brain scenarios. We faced this exact problem while building a fault-tolerant coordination service. Our initial tooling was inadequate for debugging the subtle timing issues inherent in our Raft implementation. This led to a critical decision: build a dedicated, real-time visualization and control plane.
The concept was to run our Raft cluster within a controlled, production-like environment (GKE) and interact with it via a rich desktop client. This client would not just be a passive observer but an active tool for inducing failures and monitoring recovery. For the implementation, Go was the natural choice for the Raft nodes due to its strong concurrency support. For the visualization tool, Jetpack Compose for Desktop offered a modern, declarative UI framework that could run on our engineering machines without browser-based complexities. A simple Web API would bridge these two worlds.
The Core Raft Implementation in Go
Before any visualization is possible, a functional Raft node is necessary. We opted to implement the core logic from scratch to maintain full control over instrumentation and state exposure. The central component is the Node struct, which encapsulates the entire state machine of a single participant in the cluster. In a real-world project, relying on a vetted library like hashicorp/raft is the pragmatic choice, but for this exercise, building it ourselves was key to understanding the failure modes we needed to visualize.
The node’s state includes persistent elements like the current term and log, as well as volatile state like the commit index and the identity of the current leader.
package raft
import (
	"context"
	"log"
	"math/rand"
	"net/http"
	"sync"
	"time"
)
type State string
const (
Follower State = "Follower"
Candidate State = "Candidate"
Leader State = "Leader"
)
type LogEntry struct {
Term int
Command interface{}
}
// Node represents a single participant in the Raft cluster.
type Node struct {
mu sync.RWMutex
// Identity
id string // Unique ID for this node
peers []string // Addresses of other nodes in the cluster
// Persistent state on all servers
currentTerm int
votedFor string
log []LogEntry
// Volatile state on all servers
commitIndex int
lastApplied int
state State
// Volatile state on leaders
nextIndex map[string]int
matchIndex map[string]int
// Channels for communication and control
electionTimer *time.Timer
heartbeatChan chan struct{}
shutdownChan chan struct{}
// API exposure
apiServer *http.Server
}
func NewNode(id string, peers []string, apiAddr string) *Node {
n := &Node{
id: id,
peers: peers,
currentTerm: 0,
votedFor: "",
log: []LogEntry{{Term: 0}}, // Start with a dummy entry at index 0
commitIndex: 0,
lastApplied: 0,
state: Follower,
heartbeatChan: make(chan struct{}, 1),
shutdownChan: make(chan struct{}),
}
// The API server is started here to expose state.
// We will define setupApiHandlers later.
mux := http.NewServeMux()
n.apiServer = &http.Server{Addr: apiAddr, Handler: mux}
n.setupApiHandlers(mux)
return n
}
// The main loop running the Raft state machine.
func (n *Node) Run() {
log.Printf("[Node %s] Starting up in Follower state", n.id)
n.resetElectionTimer()
go func() {
if err := n.apiServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("[Node %s] API server failed: %v", n.id, err)
}
}()
for {
select {
case <-n.shutdownChan:
log.Printf("[Node %s] Shutting down.", n.id)
// Gracefully shutdown the API server
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
n.apiServer.Shutdown(ctx)
return
default:
n.mu.RLock()
currentState := n.state
n.mu.RUnlock()
switch currentState {
case Follower:
n.runFollower()
case Candidate:
n.runCandidate()
case Leader:
n.runLeader()
}
}
}
}
// electionTimeout calculates a random timeout to prevent split votes.
func electionTimeout() time.Duration {
return time.Duration(300+rand.Intn(300)) * time.Millisecond
}
func (n *Node) resetElectionTimer() {
if n.electionTimer != nil {
n.electionTimer.Stop()
}
n.electionTimer = time.NewTimer(electionTimeout())
}
func (n *Node) runFollower() {
select {
case <-n.electionTimer.C:
// Timeout elapsed without hearing from a leader, start an election.
log.Printf("[Node %s] Election timeout reached. Transitioning to Candidate.", n.id)
n.mu.Lock()
n.state = Candidate
n.mu.Unlock()
n.resetElectionTimer()
case <-n.heartbeatChan:
// Received a heartbeat or AppendEntries, so reset the timer.
n.resetElectionTimer()
}
}
// A simplified version of the candidate logic for brevity
func (n *Node) runCandidate() {
	n.mu.Lock()
	n.currentTerm++
	n.votedFor = n.id
	votesNeeded := (len(n.peers)+1)/2 + 1
	log.Printf("[Node %s] Starting election for term %d (need %d votes).", n.id, n.currentTerm, votesNeeded)
	n.mu.Unlock()
// In a real implementation, you'd send RequestVote RPCs to peers here.
// For this simulation, we'll just wait and transition.
// This part is complex and involves handling RPC responses concurrently.
// Simulate winning or losing the election after a timeout
select {
case <-time.After(electionTimeout()):
// Assume we won for demonstration purposes
log.Printf("[Node %s] Election won. Transitioning to Leader.", n.id)
n.mu.Lock()
n.state = Leader
n.mu.Unlock()
case <-n.heartbeatChan:
// Received AppendEntries from a new leader, revert to follower.
log.Printf("[Node %s] Discovered new leader. Reverting to Follower.", n.id)
n.mu.Lock()
n.state = Follower
n.mu.Unlock()
}
}
// A simplified leader loop
func (n *Node) runLeader() {
// A real implementation sends AppendEntries RPCs periodically.
// This ensures other nodes don't time out and start new elections.
heartbeatInterval := 100 * time.Millisecond
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()
	for {
		// Before doing anything, check if we are still the leader.
		n.mu.RLock()
		if n.state != Leader {
			n.mu.RUnlock()
			return // Exit the leader loop
		}
		term := n.currentTerm // capture the term while holding the read lock
		n.mu.RUnlock()
		// Send heartbeats to all peers
		log.Printf("[Node %s] Sending heartbeats as leader for term %d.", n.id, term)
		// RPC sending logic would be here.
		<-ticker.C
}
}
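The vote-gathering step that runCandidate elides can be sketched over the same HTTP transport. Everything below is illustrative rather than part of the node above: the /requestVote endpoint, the RequestVoteArgs and RequestVoteReply types, and the requestVotes helper are assumptions, the peer calls run sequentially for readability, and a real version would fan them out concurrently, honor the election timeout, and step down if a reply carries a higher term.
// Illustrative sketch only: assumes a /requestVote handler exists on each peer
// and that the raft package also imports "bytes" and "encoding/json".
type RequestVoteArgs struct {
	Term        int    `json:"term"`
	CandidateID string `json:"candidateId"`
}

type RequestVoteReply struct {
	Term        int  `json:"term"`
	VoteGranted bool `json:"voteGranted"`
}

// requestVotes asks every peer for its vote and reports whether a majority was reached.
func (n *Node) requestVotes(term int, votesNeeded int) bool {
	votes := 1 // we always vote for ourselves
	payload, _ := json.Marshal(RequestVoteArgs{Term: term, CandidateID: n.id})
	for _, peer := range n.peers {
		resp, err := http.Post("http://"+peer+"/requestVote", "application/json", bytes.NewReader(payload))
		if err != nil {
			continue // an unreachable peer simply does not vote
		}
		var reply RequestVoteReply
		if json.NewDecoder(resp.Body).Decode(&reply) == nil && reply.VoteGranted {
			votes++
		}
		resp.Body.Close()
	}
	return votes >= votesNeeded
}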
A critical pitfall here is race conditions: the Raft state machine is constantly being modified by timers and incoming RPCs, so every access to shared state like n.state or n.currentTerm must be protected by the mutex. We use sync.RWMutex to allow concurrent reads, a small optimization since state is read far more often than it is written.
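One lightweight way to keep that discipline in one place is a snapshot helper that copies the frequently read fields under the read lock. This is a sketch of the pattern, not code the node above currently uses.
// snapshot returns a consistent view of the hot fields so callers never read
// n.state or n.currentTerm without holding the lock. (Sketch only.)
func (n *Node) snapshot() (State, int) {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.state, n.currentTerm
}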
Exposing Internal State via a Web API
To make the node observable, we need an interface, and a simple REST-like API served over HTTP is sufficient. The key endpoint is /status, which returns a JSON representation of the node's current state. A second endpoint, /control, lets us manually trigger state changes to simulate failures.
package raft
import (
	"encoding/json"
	"log"
	"net/http"
)
type NodeStatus struct {
ID string `json:"id"`
State State `json:"state"`
CurrentTerm int `json:"currentTerm"`
VotedFor string `json:"votedFor"`
CommitIndex int `json:"commitIndex"`
LogLength int `json:"logLength"`
}
func (n *Node) setupApiHandlers(mux *http.ServeMux) {
mux.HandleFunc("/status", n.handleStatus)
mux.HandleFunc("/control", n.handleControl)
}
func (n *Node) handleStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
n.mu.RLock()
status := NodeStatus{
ID: n.id,
State: n.state,
CurrentTerm: n.currentTerm,
VotedFor: n.votedFor,
CommitIndex: n.commitIndex,
LogLength: len(n.log),
}
n.mu.RUnlock()
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(status); err != nil {
log.Printf("[Node %s] Error encoding status: %v", n.id, err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
}
}
// ControlCommand is the payload accepted by /control, letting external tools
// manipulate the node's state for testing.
type ControlCommand struct {
	Action string `json:"action"` // e.g., "stop", "start"
}
func (n *Node) handleControl(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var cmd ControlCommand
if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
n.mu.Lock()
defer n.mu.Unlock()
switch cmd.Action {
case "stop":
// A "soft stop" just makes it a passive follower, not a real shutdown.
log.Printf("[Node %s] Received 'stop' command. Forcing Follower state and disabling timer.", n.id)
n.state = Follower
if n.electionTimer != nil {
n.electionTimer.Stop() // Stop it from starting elections
}
w.WriteHeader(http.StatusOK)
case "start":
log.Printf("[Node %s] Received 'start' command. Resetting election timer.", n.id)
n.resetElectionTimer()
w.WriteHeader(http.StatusOK)
default:
http.Error(w, "Unknown action", http.StatusBadRequest)
}
}
The handler for /status must use a read lock (n.mu.RLock()) so that it returns a consistent snapshot of the node's state without blocking the core consensus logic for long. The /control handler takes the full write lock because it directly mutates the node's state.
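Because the handlers are plain http.HandlerFunc methods, they are easy to exercise in isolation with net/http/httptest. The test below is a sketch of that; the file name and peer addresses are illustrative.
// In raft/api_test.go (sketch). Verifies that /status returns the freshly
// created node's state as JSON.
package raft

import (
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"testing"
)

func TestHandleStatus(t *testing.T) {
	n := NewNode("node-0", []string{"node-1:8080", "node-2:8080"}, ":0")

	req := httptest.NewRequest(http.MethodGet, "/status", nil)
	rec := httptest.NewRecorder()
	n.handleStatus(rec, req)

	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d", rec.Code)
	}
	var got NodeStatus
	if err := json.NewDecoder(rec.Body).Decode(&got); err != nil {
		t.Fatalf("decoding status: %v", err)
	}
	if got.ID != "node-0" || got.State != Follower {
		t.Errorf("unexpected status: %+v", got)
	}
}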
Containerizing and Deploying to GKE
Running a distributed stateful application like a Raft cluster on Kubernetes requires careful configuration. A simple Deployment is insufficient because Raft nodes need stable, predictable network identities (and, in a full implementation that persists its log to disk, stable storage as well). The right tool for this job is a StatefulSet.
First, the Dockerfile for our Go application:
# Dockerfile
FROM golang:1.21-alpine
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o /raft-node ./cmd/server
EXPOSE 8080
CMD ["/raft-node"]
Next, the Kubernetes manifests. The crucial parts are the StatefulSet and the headless Service. The headless Service provides the stable DNS entries (raft-node-0.raft-service, raft-node-1.raft-service, etc.) that the nodes use to discover each other.
# raft-gke-deployment.yaml
# Headless service for stable network identities
apiVersion: v1
kind: Service
metadata:
name: raft-service
labels:
app: raft
spec:
ports:
- port: 8080
name: api
clusterIP: None
selector:
app: raft
---
# StatefulSet for the Raft nodes
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: raft-node
spec:
serviceName: "raft-service"
replicas: 3
selector:
matchLabels:
app: raft
template:
metadata:
labels:
app: raft
spec:
terminationGracePeriodSeconds: 10
containers:
- name: raft
# Replace with your GCR/Artifact Registry image path
image: gcr.io/your-project-id/raft-visualizer:v1.0.0
ports:
- containerPort: 8080
name: api
env:
# Pass the node's ordinal index and namespace to the application
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: REPLICA_COUNT
value: "3"
# Liveness/Readiness probes are critical for production
readinessProbe:
httpGet:
path: /status
port: api
initialDelaySeconds: 5
periodSeconds: 5
---
# Service to expose the APIs externally via a Load Balancer
apiVersion: v1
kind: Service
metadata:
name: raft-api-external
spec:
type: LoadBalancer
selector:
app: raft
ports:
- protocol: TCP
port: 80
targetPort: 8080
The application code needs to be aware of the Kubernetes environment to construct its peer list. It can use the POD_NAME and NAMESPACE environment variables to dynamically build the FQDNs of its peers. A common mistake is to hardcode peer lists, which makes the system brittle. Our Go main function parses this as follows:
// In cmd/server/main.go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"

	"raft-visualizer/raft" // assumed module path; adjust to match your go.mod
)
func main() {
podName := os.Getenv("POD_NAME")
namespace := os.Getenv("NAMESPACE")
replicaCountStr := os.Getenv("REPLICA_COUNT")
// ... error handling and parsing ...
// Example podName: "raft-node-1"
parts := strings.Split(podName, "-")
ordinal := parts[len(parts)-1]
id := fmt.Sprintf("node-%s", ordinal)
var peers []string
replicaCount, _ := strconv.Atoi(replicaCountStr)
for i := 0; i < replicaCount; i++ {
peerOrdinal := strconv.Itoa(i)
if peerOrdinal != ordinal {
// Construct the FQDN for each peer
peerAddr := fmt.Sprintf("raft-node-%d.raft-service.%s.svc.cluster.local:8080", i, namespace)
peers = append(peers, peerAddr)
}
}
node := raft.NewNode(id, peers, ":8080")
node.Run()
}
This configuration ensures that if the raft-node-1 pod dies and is rescheduled, it comes back with the same network identity and can rejoin the cluster seamlessly.
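One operational wrinkle worth guarding against (not covered in the manifests above) is that a freshly scheduled pod can start before the headless Service has published DNS records for its peers. A retry loop at startup, sketched below with illustrative timeout values, keeps the node from crash-looping in that window; it requires the "net" and "time" imports in main.go.
// Sketch: block until every peer FQDN resolves, or give up after the timeout.
// Intended to run in main() before raft.NewNode; values are illustrative.
func waitForPeers(peers []string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for _, peer := range peers {
		host, _, err := net.SplitHostPort(peer)
		if err != nil {
			host = peer // the peer address had no port
		}
		for {
			if _, lookupErr := net.LookupHost(host); lookupErr == nil {
				break // DNS record is visible
			}
			if time.Now().After(deadline) {
				return fmt.Errorf("peer %s did not become resolvable within %s", peer, timeout)
			}
			time.Sleep(2 * time.Second)
		}
	}
	return nil
}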
The Jetpack Compose Visualizer
The final piece is the desktop client. Using Jetpack Compose for Desktop with Kotlin allows for a clean, declarative approach to building the UI. The core of the application is a ViewModel that manages the cluster state by polling the API endpoints exposed by our GKE service.
First, the data model and API client in Kotlin:
// In shared/src/commonMain/kotlin/models.kt
import kotlinx.serialization.Serializable
@Serializable
data class NodeStatus(
val id: String,
val state: String,
val currentTerm: Int,
val votedFor: String,
val commitIndex: Int,
val logLength: Int
)
// A simple Ktor-based client (in its own file so the imports sit at the top; Ktor 2.x client APIs)
import io.ktor.client.*
import io.ktor.client.call.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.client.request.*
import io.ktor.serialization.kotlinx.json.*

class RaftApiClient(private val baseUrls: List<String>) {
private val client = HttpClient(CIO) {
install(ContentNegotiation) { json() }
install(HttpTimeout) { requestTimeoutMillis = 2000 }
}
suspend fun fetchAllStatuses(): List<Result<NodeStatus>> {
return baseUrls.map { url ->
try {
Result.success(client.get("$url/status").body())
} catch (e: Exception) {
Result.failure(e)
}
}
}
// ... methods for sending control commands
}
The ViewModel uses Kotlin Coroutines and a StateFlow to manage the UI state. It continuously polls the nodes and updates a single flow that the UI observes.
// In desktop/src/jvmMain/kotlin/ClusterViewModel.kt
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
data class ClusterUiState(
val nodeStatuses: Map<String, Result<NodeStatus>> = emptyMap(),
val isLoading: Boolean = true
)
class ClusterViewModel(private val apiClient: RaftApiClient) {
private val _uiState = MutableStateFlow(ClusterUiState())
val uiState: StateFlow<ClusterUiState> = _uiState.asStateFlow()
private val viewModelScope = CoroutineScope(Dispatchers.Default)
init {
startPolling()
}
private fun startPolling() {
viewModelScope.launch {
while (isActive) {
val results = apiClient.fetchAllStatuses()
                // Key by node ID; fall back to a per-index key so failed probes don't collide.
                val statusMap = results.mapIndexed { index, result ->
                    (result.getOrNull()?.id ?: "unknown-$index") to result
                }.toMap()
_uiState.update {
it.copy(nodeStatuses = statusMap, isLoading = false)
}
delay(1000) // Poll every second
}
}
}
fun shutdown() {
viewModelScope.cancel()
}
}
A potential issue here is the simple polling mechanism. With many nodes or a need for lower latency, this would be inefficient. A better production approach would involve WebSockets or gRPC streaming for push-based updates from the server.
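For reference, a push-based variant needs nothing beyond the Go standard library on the server side. The Server-Sent-Events-style handler below is an illustrative alternative, not something wired into the node above; WebSockets or gRPC streaming, as mentioned, would be the fuller solutions.
// Sketch: a /status/stream handler that pushes NodeStatus as Server-Sent Events
// once per second. Illustrative only; would also need the "fmt" import in the
// raft package and a mux.HandleFunc registration in setupApiHandlers.
func (n *Node) handleStatusStream(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-r.Context().Done():
			return // client disconnected
		case <-ticker.C:
			n.mu.RLock()
			status := NodeStatus{
				ID:          n.id,
				State:       n.state,
				CurrentTerm: n.currentTerm,
				VotedFor:    n.votedFor,
				CommitIndex: n.commitIndex,
				LogLength:   len(n.log),
			}
			n.mu.RUnlock()
			payload, _ := json.Marshal(status)
			fmt.Fprintf(w, "data: %s\n\n", payload)
			flusher.Flush()
		}
	}
}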
Finally, the UI itself. We can create a Composable function that renders the cluster state. Each node is represented by a circle, color-coded by its state (e.g., green for Leader, yellow for Candidate, gray for Follower).
// In desktop/src/jvmMain/kotlin/Main.kt
import androidx.compose.desktop.ui.tooling.preview.Preview
import androidx.compose.foundation.Canvas
import androidx.compose.foundation.layout.*
import androidx.compose.material.MaterialTheme
import androidx.compose.material.Text
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.geometry.Offset
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.unit.dp
import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application
@Composable
fun ClusterView(uiState: ClusterUiState) {
val nodePositions = remember(uiState.nodeStatuses.size) {
// Simple circular layout logic
// This would be more sophisticated in a real app
val center = Offset(250f, 250f)
val radius = 200f
        val angleStep = 2 * Math.PI / uiState.nodeStatuses.size.coerceAtLeast(1)
uiState.nodeStatuses.keys.mapIndexed { index, id ->
id to Offset(
center.x + radius * kotlin.math.cos(angleStep * index).toFloat(),
center.y + radius * kotlin.math.sin(angleStep * index).toFloat()
)
}.toMap()
}
Canvas(modifier = Modifier.fillMaxSize()) {
uiState.nodeStatuses.forEach { (id, result) ->
val position = nodePositions[id] ?: return@forEach
val (nodeColor, nodeInfo) = if (result.isSuccess) {
val status = result.getOrThrow()
val color = when (status.state) {
"Leader" -> Color.Green
"Candidate" -> Color.Yellow
"Follower" -> Color.Gray
else -> Color.Red
}
color to "Term: ${status.currentTerm}\nLog: ${status.logLength}"
} else {
Color.Red to "Offline"
}
drawCircle(
color = nodeColor,
radius = 40f,
center = position
)
// Drawing text on Canvas is more complex; in a real app, use overlays.
}
// Here you would draw lines for votes or connections
}
}
@Composable
@Preview
fun App() {
// In a real app, this would be injected via dependency injection.
val viewModel = remember {
// Assume GKE LoadBalancer IP is known
val nodeUrls = listOf("http://34.123.45.67", "http://34.123.45.67", "http://34.123.45.67")
// This is a simplification; you'd hit the load balancer which routes to different pods.
// A better approach would be to get IPs from the K8s API or use NodePorts.
val apiClient = RaftApiClient(nodeUrls)
ClusterViewModel(apiClient)
}
val uiState by viewModel.uiState.collectAsState()
MaterialTheme {
Column(modifier = Modifier.padding(16.dp)) {
Text("Raft Cluster Status", style = MaterialTheme.typography.h4)
Spacer(Modifier.height(16.dp))
ClusterView(uiState)
}
}
DisposableEffect(Unit) {
onDispose {
viewModel.shutdown()
}
}
}

// Entry point: host the App composable in a desktop window.
fun main() = application {
    Window(onCloseRequest = ::exitApplication, title = "Raft Cluster Visualizer") {
        App()
    }
}
The architecture connecting these components can be visualized as follows:
graph TD
    A[Jetpack Compose App] -- HTTP API Calls --> B{GKE LoadBalancer};
    B -- Routes Traffic --> C1[Pod: raft-node-0];
    B -- Routes Traffic --> C2[Pod: raft-node-1];
    B -- Routes Traffic --> C3[Pod: raft-node-2];
    subgraph GKE Cluster
        C1 -- Raft RPC --> C2;
        C2 -- Raft RPC --> C3;
        C3 -- Raft RPC --> C1;
        D[Headless Service: raft-service] -.-> C1;
        D -.-> C2;
        D -.-> C3;
    end
    style A fill:#87CEEB,stroke:#333,stroke-width:2px
    style B fill:#f9f,stroke:#333,stroke-width:2px
The resulting system provides a powerful feedback loop. By using the UI to issue a stop command to the current leader, we can immediately observe the other nodes' election timers expire, watch them transition to candidates, vote for a new leader, and see the system stabilize under a new term, all in real time.
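The same experiment can be driven headlessly. The sketch below assumes each node's API is reachable at its own address (for example, via three kubectl port-forwards rather than the single LoadBalancer), finds the node currently reporting Leader, stops it, and polls until a different node takes over.
// Sketch of the leader-failover experiment from the command line.
// The addresses are illustrative; adjust them to your port-forwards.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"time"
)

var nodes = []string{"http://localhost:8081", "http://localhost:8082", "http://localhost:8083"}

type nodeStatus struct {
	ID          string `json:"id"`
	State       string `json:"state"`
	CurrentTerm int    `json:"currentTerm"`
}

func getStatus(addr string) (nodeStatus, error) {
	var s nodeStatus
	resp, err := http.Get(addr + "/status")
	if err != nil {
		return s, err
	}
	defer resp.Body.Close()
	return s, json.NewDecoder(resp.Body).Decode(&s)
}

func main() {
	// Find the current leader and force it into a passive follower state.
	var oldLeader string
	for _, addr := range nodes {
		if s, err := getStatus(addr); err == nil && s.State == "Leader" {
			oldLeader = s.ID
			fmt.Printf("stopping leader %s (term %d)\n", s.ID, s.CurrentTerm)
			if resp, postErr := http.Post(addr+"/control", "application/json",
				strings.NewReader(`{"action":"stop"}`)); postErr == nil {
				resp.Body.Close()
			}
		}
	}
	// Poll until a different node reports itself as leader.
	for {
		for _, addr := range nodes {
			if s, err := getStatus(addr); err == nil && s.State == "Leader" && s.ID != oldLeader {
				fmt.Printf("new leader: %s at term %d\n", s.ID, s.CurrentTerm)
				return
			}
		}
		time.Sleep(500 * time.Millisecond)
	}
}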
The current implementation, while functional as a diagnostic tool, has clear boundaries. The Raft logic itself lacks critical production features such as log compaction and dynamic membership changes. The client's polling mechanism is inefficient and should be replaced with a streaming protocol for true real-time updates. Furthermore, the GKE deployment lacks robust security hardening, such as NetworkPolicies to restrict traffic between pods, and comprehensive observability through Prometheus metrics, both of which would be non-negotiable in a production environment. Future iterations would need to address these areas to move from a powerful visualizer to a production-grade system.