Constructing a Real-Time Raft Consensus Visualizer with Jetpack Compose for a GKE Cluster


Observing the internal state transitions of a distributed consensus algorithm is notoriously difficult. Standard logging across multiple nodes provides a fragmented, after-the-fact view that fails to capture the real-time dynamics of leader elections, log replication races, or split-brain scenarios. We faced this exact problem while building a fault-tolerant coordination service. Our initial tooling was inadequate for debugging the subtle timing issues inherent in our Raft implementation. This led to a critical decision: build a dedicated, real-time visualization and control plane.

The concept was to run our Raft cluster within a controlled, production-like environment (GKE) and interact with it via a rich desktop client. This client would not just be a passive observer but an active tool for inducing failures and monitoring recovery. For the implementation, Go was the natural choice for the Raft nodes due to its strong concurrency support. For the visualization tool, Jetpack Compose for Desktop offered a modern, declarative UI framework that could run on our engineering machines without browser-based complexities. A simple Web API would bridge these two worlds.

The Core Raft Implementation in Go

Before any visualization is possible, a functional Raft node is necessary. We opted to implement the core logic from scratch to maintain full control over instrumentation and state exposure. The central component is the Node struct, which encapsulates the entire state machine of a single participant in the cluster. In a real-world project, relying on a vetted library like hashicorp/raft is the pragmatic choice, but for this exercise, building it ourselves was key to understanding the failure modes we needed to visualize.

The node’s state includes persistent elements like the current term, the vote cast in that term, and the log, as well as volatile state like the commit index and the node’s current role (follower, candidate, or leader).

package raft

import (
	"context"
	"log"
	"math/rand"
	"net/http"
	"sync"
	"time"
)

type State string

const (
	Follower  State = "Follower"
	Candidate State = "Candidate"
	Leader    State = "Leader"
)

type LogEntry struct {
	Term    int
	Command interface{}
}

// Node represents a single participant in the Raft cluster.
type Node struct {
	mu sync.RWMutex

	// Identity
	id    string   // Unique ID for this node
	peers []string // Addresses of other nodes in the cluster

	// Persistent state on all servers
	currentTerm int
	votedFor    string
	log         []LogEntry

	// Volatile state on all servers
	commitIndex int
	lastApplied int
	state       State

	// Volatile state on leaders
	nextIndex  map[string]int
	matchIndex map[string]int

	// Channels for communication and control
	electionTimer *time.Timer
	heartbeatChan chan struct{}
	shutdownChan  chan struct{}

	// API exposure
	apiServer *http.Server
}

func NewNode(id string, peers []string, apiAddr string) *Node {
	n := &Node{
		id:           id,
		peers:        peers,
		currentTerm:  0,
		votedFor:     "",
		log:          []LogEntry{{Term: 0}}, // Start with a dummy entry at index 0
		commitIndex:  0,
		lastApplied:  0,
		state:        Follower,
		heartbeatChan: make(chan struct{}, 1),
		shutdownChan: make(chan struct{}),
	}
	
	// The API server is configured here to expose state; Run starts it.
	// setupApiHandlers is defined in the next section.
	mux := http.NewServeMux()
	n.apiServer = &http.Server{Addr: apiAddr, Handler: mux}
	n.setupApiHandlers(mux)

	return n
}

// The main loop running the Raft state machine.
func (n *Node) Run() {
	log.Printf("[Node %s] Starting up in Follower state", n.id)
	n.resetElectionTimer()

	go func() {
		if err := n.apiServer.ListenAndServe(); err != http.ErrServerClosed {
			log.Fatalf("[Node %s] API server failed: %v", n.id, err)
		}
	}()

	for {
		select {
		case <-n.shutdownChan:
			log.Printf("[Node %s] Shutting down.", n.id)
			// Gracefully shutdown the API server
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			n.apiServer.Shutdown(ctx)
			return
		default:
			n.mu.RLock()
			currentState := n.state
			n.mu.RUnlock()

			switch currentState {
			case Follower:
				n.runFollower()
			case Candidate:
				n.runCandidate()
			case Leader:
				n.runLeader()
			}
		}
	}
}

// electionTimeout returns a randomized timeout (300-600ms) to make split votes less likely.
func electionTimeout() time.Duration {
	return time.Duration(300+rand.Intn(300)) * time.Millisecond
}

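func (n *Node) resetElectionTimer() {
	if n.electionTimer == nil {
		n.electionTimer = time.NewTimer(electionTimeout())
		return
	}
	// Reuse the existing timer rather than allocating a new one: a goroutine
	// already blocked on electionTimer.C would never see a replacement timer fire.
	if !n.electionTimer.Stop() {
		// Drain a stale expiry, if any, so the next receive reflects the new deadline.
		select {
		case <-n.electionTimer.C:
		default:
		}
	}
	n.electionTimer.Reset(electionTimeout())
}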

func (n *Node) runFollower() {
	select {
	case <-n.electionTimer.C:
		// Timeout elapsed without hearing from a leader, start an election.
		log.Printf("[Node %s] Election timeout reached. Transitioning to Candidate.", n.id)
		n.mu.Lock()
		n.state = Candidate
		n.mu.Unlock()
		n.resetElectionTimer()
	case <-n.heartbeatChan:
		// Received a heartbeat or AppendEntries, so reset the timer.
		n.resetElectionTimer()
	}
}

// A simplified version of the candidate logic for brevity
func (n *Node) runCandidate() {
	n.mu.Lock()
	n.currentTerm++
	n.votedFor = n.id
	votesNeeded := (len(n.peers)+1)/2 + 1 // Strict majority of the full cluster (peers + self)
	log.Printf("[Node %s] Starting election for term %d (%d votes needed).", n.id, n.currentTerm, votesNeeded)
	n.mu.Unlock()
	
	// In a real implementation, you'd send RequestVote RPCs to peers here.
	// For this simulation, we'll just wait and transition.
	// This part is complex and involves handling RPC responses concurrently.
	
	// Simulate winning or losing the election after a timeout
	select {
	case <-time.After(electionTimeout()):
		// Assume we won for demonstration purposes
		log.Printf("[Node %s] Election won. Transitioning to Leader.", n.id)
		n.mu.Lock()
		n.state = Leader
		n.mu.Unlock()
	case <-n.heartbeatChan:
		// Received AppendEntries from a new leader, revert to follower.
		log.Printf("[Node %s] Discovered new leader. Reverting to Follower.", n.id)
		n.mu.Lock()
		n.state = Follower
		n.mu.Unlock()
	}
}

// A simplified leader loop
func (n *Node) runLeader() {
	// A real implementation sends AppendEntries RPCs periodically.
	// This ensures other nodes don't time out and start new elections.
	heartbeatInterval := 100 * time.Millisecond
	ticker := time.NewTicker(heartbeatInterval)
	defer ticker.Stop()

	for {
		// Before doing anything, check if we are still the leader.
		n.mu.RLock()
		if n.state != Leader {
			n.mu.RUnlock()
			return // Exit the leader loop
		}
		term := n.currentTerm // Copy under the lock so the log line below is race-free.
		n.mu.RUnlock()
		
		// Send heartbeats to all peers
		log.Printf("[Node %s] Sending heartbeats as leader for term %d.", n.id, term)
		// RPC sending logic would be here.
		
		<-ticker.C
	}
}

A critical pitfall here is race conditions. The Raft state machine is constantly being modified by timers and incoming RPCs. Every access to shared state like n.state or n.currentTerm must be protected by a mutex. We use sync.RWMutex to allow concurrent reads, which is a slight optimization as state is read more often than it’s written.
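
The candidate loop above only simulates the outcome of an election. As a rough illustration of the locking discipline, here is a minimal sketch of how votes might be gathered concurrently and counted behind a mutex. The names collectVotes, voteTally, and defaultVoteTimeout are hypothetical, and requestVote stands in for a real RequestVote RPC that the article's code does not implement.

```go
package main

import (
	"sync"
	"time"
)

// defaultVoteTimeout is a hypothetical bound on how long a candidate waits
// for replies before giving up on the current election round.
const defaultVoteTimeout = 500 * time.Millisecond

// voteTally counts granted votes behind a mutex, mirroring how Node guards
// its own fields.
type voteTally struct {
	mu      sync.Mutex
	granted int
}

// add records one granted vote and returns the running total.
func (t *voteTally) add() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.granted++
	return t.granted
}

// collectVotes asks every peer for its vote concurrently and reports whether
// the candidate reached a strict majority of the full cluster (peers + self)
// before the timeout elapsed.
func collectVotes(peers []string, requestVote func(peer string) bool, timeout time.Duration) bool {
	votesNeeded := (len(peers)+1)/2 + 1
	tally := &voteTally{granted: 1} // the candidate always votes for itself
	if tally.granted >= votesNeeded {
		return true // single-node cluster
	}

	won := make(chan struct{})
	var once sync.Once // several goroutines may cross the threshold; close only once
	for _, peer := range peers {
		go func(p string) {
			if requestVote(p) && tally.add() >= votesNeeded {
				once.Do(func() { close(won) })
			}
		}(peer)
	}

	select {
	case <-won:
		return true
	case <-time.After(timeout):
		return false
	}
}
```

In this sketch a slow peer cannot stall the election: the candidate either sees a majority or gives up at the timeout. A fuller version would also need to cancel outstanding requests and step down if a reply carries a higher term.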

Exposing Internal State via a Web API

To make the node observable, we need an interface. A simple REST-like API served over HTTP is sufficient. The key endpoint is /status, which returns a JSON representation of the node’s current state. Another endpoint, /control, allows us to manually trigger state changes, simulating failures.

package raft

import (
	"encoding/json"
	"net/http"
	"log"
)

type NodeStatus struct {
	ID          string `json:"id"`
	State       State  `json:"state"`
	CurrentTerm int    `json:"currentTerm"`
	VotedFor    string `json:"votedFor"`
	CommitIndex int    `json:"commitIndex"`
	LogLength   int    `json:"logLength"`
}

func (n *Node) setupApiHandlers(mux *http.ServeMux) {
	mux.HandleFunc("/status", n.handleStatus)
	mux.HandleFunc("/control", n.handleControl)
}

func (n *Node) handleStatus(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	n.mu.RLock()
	status := NodeStatus{
		ID:          n.id,
		State:       n.state,
		CurrentTerm: n.currentTerm,
		VotedFor:    n.votedFor,
		CommitIndex: n.commitIndex,
		LogLength:   len(n.log),
	}
	n.mu.RUnlock()

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(status); err != nil {
		log.Printf("[Node %s] Error encoding status: %v", n.id, err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
	}
}

// handleControl allows external tools to manipulate the node's state for testing.
type ControlCommand struct {
	Action string `json:"action"` // e.g., "stop", "start"
}

func (n *Node) handleControl(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var cmd ControlCommand
	if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	n.mu.Lock()
	defer n.mu.Unlock()

	switch cmd.Action {
	case "stop":
		// A "soft stop" just makes it a passive follower, not a real shutdown.
		log.Printf("[Node %s] Received 'stop' command. Forcing Follower state and disabling timer.", n.id)
		n.state = Follower
		if n.electionTimer != nil {
			n.electionTimer.Stop() // Stop it from starting elections
		}
		w.WriteHeader(http.StatusOK)
	case "start":
		log.Printf("[Node %s] Received 'start' command. Resetting election timer.", n.id)
		n.resetElectionTimer()
		w.WriteHeader(http.StatusOK)
	default:
		http.Error(w, "Unknown action", http.StatusBadRequest)
	}
}

The handler for /status must use a read lock (n.mu.RLock()) to ensure it provides a consistent snapshot of the node’s state without blocking the core consensus logic for long. The /control handler uses a full write lock as it directly mutates the node’s state.
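
To make the locking point concrete, the following sketch copies the fields under the read lock and encodes the JSON only after the lock is released, so a slow client never holds the lock. The node type here is a trimmed-down, hypothetical stand-in for the article's Node, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"sync"
)

type State string

const Leader State = "Leader"

// NodeStatus mirrors the struct served by /status.
type NodeStatus struct {
	ID          string `json:"id"`
	State       State  `json:"state"`
	CurrentTerm int    `json:"currentTerm"`
	VotedFor    string `json:"votedFor"`
	CommitIndex int    `json:"commitIndex"`
	LogLength   int    `json:"logLength"`
}

// node is a hypothetical, reduced version of the Raft Node struct.
type node struct {
	mu          sync.RWMutex
	id          string
	state       State
	currentTerm int
	votedFor    string
	commitIndex int
	log         []string
}

// snapshot copies the observable fields while holding only the read lock.
func (n *node) snapshot() NodeStatus {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return NodeStatus{
		ID:          n.id,
		State:       n.state,
		CurrentTerm: n.currentTerm,
		VotedFor:    n.votedFor,
		CommitIndex: n.commitIndex,
		LogLength:   len(n.log),
	}
}

// statusJSON encodes the snapshot after the lock has been released.
func statusJSON(n *node) (string, error) {
	b, err := json.Marshal(n.snapshot())
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// sampleNode returns a node with illustrative values.
func sampleNode() *node {
	return &node{
		id:          "node-0",
		state:       Leader,
		currentTerm: 3,
		votedFor:    "node-0",
		commitIndex: 7,
		log:         make([]string, 8),
	}
}
```

For the sample node, statusJSON returns {"id":"node-0","state":"Leader","currentTerm":3,"votedFor":"node-0","commitIndex":7,"logLength":8}, which is the shape the desktop client deserializes.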

Containerizing and Deploying to GKE

Running a distributed stateful application like a Raft cluster on Kubernetes requires careful configuration. A simple Deployment is insufficient because Raft nodes require stable network identities and persistent storage for their logs. The correct tool for this job is a StatefulSet.

First, the Dockerfile for our Go application:

# Dockerfile
FROM golang:1.21-alpine

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .

RUN go build -o /raft-node ./cmd/server

EXPOSE 8080

CMD ["/raft-node"]

Next, the Kubernetes manifests. The crucial parts are the StatefulSet and the Headless Service. The Headless Service provides the stable DNS entries (raft-node-0.raft-service, raft-node-1.raft-service, etc.) that the nodes use to discover each other.

# raft-gke-deployment.yaml

# Headless service for stable network identities
apiVersion: v1
kind: Service
metadata:
  name: raft-service
  labels:
    app: raft
spec:
  ports:
  - port: 8080
    name: api
  clusterIP: None
  selector:
    app: raft
---
# StatefulSet for the Raft nodes
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: raft-node
spec:
  serviceName: "raft-service"
  replicas: 3
  selector:
    matchLabels:
      app: raft
  template:
    metadata:
      labels:
        app: raft
    spec:
      terminationGracePeriodSeconds: 10
      containers:
      - name: raft
        # Replace with your GCR/Artifact Registry image path
        image: gcr.io/your-project-id/raft-visualizer:v1.0.0
        ports:
        - containerPort: 8080
          name: api
        env:
        # Pass the pod name (which ends in the ordinal index) and namespace to the application
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: REPLICA_COUNT
          value: "3"
        # Liveness and readiness probes are critical for production; only a readiness probe is shown here
        readinessProbe:
          httpGet:
            path: /status
            port: api
          initialDelaySeconds: 5
          periodSeconds: 5
---
# Service to expose the APIs externally via a Load Balancer
apiVersion: v1
kind: Service
metadata:
  name: raft-api-external
spec:
  type: LoadBalancer
  selector:
    app: raft
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080

The application code needs to be aware of the Kubernetes environment to construct its peer list. It can use the POD_NAME and NAMESPACE environment variables to dynamically build the FQDNs for its peers. A common mistake is to hardcode peer lists, which makes the system brittle. Our Go main function would parse this:

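// In cmd/server/main.go
package main

import (
    "fmt"
    "log"
    "os"
    "strconv"
    "strings"
    // Also import the raft package defined above; its module path is not shown here.
)

func main() {
    podName := os.Getenv("POD_NAME")
    namespace := os.Getenv("NAMESPACE")
    replicaCountStr := os.Getenv("REPLICA_COUNT")
    // ... error handling and parsing ...

    // Example podName: "raft-node-1"
    parts := strings.Split(podName, "-")
    ordinal := parts[len(parts)-1]
    id := fmt.Sprintf("node-%s", ordinal)

    var peers []string
    replicaCount, err := strconv.Atoi(replicaCountStr)
    if err != nil {
        log.Fatalf("invalid REPLICA_COUNT %q: %v", replicaCountStr, err)
    }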
    for i := 0; i < replicaCount; i++ {
        peerOrdinal := strconv.Itoa(i)
        if peerOrdinal != ordinal {
            // Construct the FQDN for each peer
            peerAddr := fmt.Sprintf("raft-node-%d.raft-service.%s.svc.cluster.local:8080", i, namespace)
            peers = append(peers, peerAddr)
        }
    }
    
    node := raft.NewNode(id, peers, ":8080")
    node.Run()
}

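This configuration ensures that if the raft-node-1 pod dies and is rescheduled, it comes back with the same network identity and can rejoin the cluster. Note that the manifest above defines no volumeClaimTemplates, so the node’s term and log live only in memory, and a restarted pod rejoins with empty state.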
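
The main function above elides its validation. One way that step might look, pulled into a pure function so it can be unit-tested without a cluster, is sketched below. The function name peerConfig and its error messages are assumptions; the address format follows the manifest above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// peerConfig derives a node ID and the peer addresses from a StatefulSet pod
// name such as "raft-node-1". It assumes the headless service is named
// "raft-service" and that every node listens on port 8080.
func peerConfig(podName, namespace string, replicas int) (string, []string, error) {
	if namespace == "" {
		return "", nil, fmt.Errorf("namespace must not be empty")
	}
	idx := strings.LastIndex(podName, "-")
	if idx < 0 || idx == len(podName)-1 {
		return "", nil, fmt.Errorf("pod name %q has no ordinal suffix", podName)
	}
	base := podName[:idx]
	ordinal, err := strconv.Atoi(podName[idx+1:])
	if err != nil {
		return "", nil, fmt.Errorf("pod name %q has a non-numeric ordinal: %w", podName, err)
	}
	if replicas < 1 || ordinal < 0 || ordinal >= replicas {
		return "", nil, fmt.Errorf("ordinal %d is out of range for %d replicas", ordinal, replicas)
	}

	peers := make([]string, 0, replicas-1)
	for i := 0; i < replicas; i++ {
		if i == ordinal {
			continue // a node is not its own peer
		}
		peers = append(peers, fmt.Sprintf("%s-%d.raft-service.%s.svc.cluster.local:8080", base, i, namespace))
	}
	return fmt.Sprintf("node-%d", ordinal), peers, nil
}
```

Calling peerConfig("raft-node-1", "default", 3) yields the ID node-1 and the addresses of raft-node-0 and raft-node-2. A malformed pod name or an ordinal beyond the replica count is reported as an error instead of producing a silently wrong peer list.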

The Jetpack Compose Visualizer

The final piece is the desktop client. Using Jetpack Compose for Desktop with Kotlin allows for a clean, declarative approach to building the UI. The core of the application is a ViewModel that manages the state of the cluster by polling the API endpoints exposed by our GKE service.

First, the data model and API client in Kotlin:

// In shared/src/commonMain/kotlin/models.kt
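import io.ktor.client.HttpClient
import io.ktor.client.call.body
import io.ktor.client.engine.cio.CIO
import io.ktor.client.plugins.HttpTimeout
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.client.request.get
import io.ktor.serialization.kotlinx.json.json
import kotlinx.serialization.Serializable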

@Serializable
data class NodeStatus(
    val id: String,
    val state: String,
    val currentTerm: Int,
    val votedFor: String,
    val commitIndex: Int,
    val logLength: Int
)

// A simple Ktor-based client
class RaftApiClient(private val baseUrls: List<String>) {
    private val client = HttpClient(CIO) {
        install(ContentNegotiation) { json() }
        install(HttpTimeout) { requestTimeoutMillis = 2000 }
    }

    suspend fun fetchAllStatuses(): List<Result<NodeStatus>> {
        return baseUrls.map { url ->
            try {
                // Name the target type explicitly so deserialization does not depend on inference.
                Result.success(client.get("$url/status").body<NodeStatus>())
            } catch (e: Exception) {
                Result.failure(e)
            }
        }
    }
    
    // ... methods for sending control commands
}

The ViewModel uses Kotlin Coroutines and StateFlow to manage the UI state. It continuously polls the nodes and updates a single flow that the UI can observe.

// In desktop/src/jvmMain/kotlin/ClusterViewModel.kt
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

data class ClusterUiState(
    val nodeStatuses: Map<String, Result<NodeStatus>> = emptyMap(),
    val isLoading: Boolean = true
)

class ClusterViewModel(private val apiClient: RaftApiClient) {
    private val _uiState = MutableStateFlow(ClusterUiState())
    val uiState: StateFlow<ClusterUiState> = _uiState.asStateFlow()

    private val viewModelScope = CoroutineScope(Dispatchers.Default)

    init {
        startPolling()
    }

    private fun startPolling() {
        viewModelScope.launch {
            while (isActive) {
                val results = apiClient.fetchAllStatuses()
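                // Key failed results by their position in the URL list so the key
                // stays stable across polls and two failures never collide.
                val statusMap = results.withIndex().associate { (index, result) ->
                    (result.getOrNull()?.id ?: "unknown-$index") to result
                }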
                
                _uiState.update {
                    it.copy(nodeStatuses = statusMap, isLoading = false)
                }
                delay(1000) // Poll every second
            }
        }
    }

    fun shutdown() {
        viewModelScope.cancel()
    }
}

A potential issue here is the simple polling mechanism. With many nodes or a need for lower latency, this would be inefficient. A better production approach would involve WebSockets or gRPC streaming for push-based updates from the server.
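
On the server side, the core of a push-based design is independent of the transport: publish a status only when it changes and fan it out to subscribers. The sketch below shows that idea in Go with plain channels. The statusHub type and its methods are hypothetical, NodeStatus is trimmed for brevity, and wiring it to WebSockets or gRPC streaming is left out.

```go
package main

import "sync"

// NodeStatus is a trimmed version of the /status payload.
type NodeStatus struct {
	ID          string
	State       string
	CurrentTerm int
	CommitIndex int
}

// statusHub fans out status changes to subscribers. A streaming handler
// would hold one subscription per connected client.
type statusHub struct {
	mu   sync.Mutex
	last NodeStatus
	seen bool
	subs []chan NodeStatus
}

// Subscribe registers a new listener. If a status has already been published,
// the listener receives it immediately so it starts from the current state.
func (h *statusHub) Subscribe() <-chan NodeStatus {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan NodeStatus, 8)
	if h.seen {
		ch <- h.last
	}
	h.subs = append(h.subs, ch)
	return ch
}

// Publish delivers s to all subscribers unless it equals the previous status.
// It returns the number of subscribers that received the update.
func (h *statusHub) Publish(s NodeStatus) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.seen && s == h.last {
		return 0 // nothing changed, so nothing is sent
	}
	h.last, h.seen = s, true
	delivered := 0
	for _, ch := range h.subs {
		select {
		case ch <- s:
			delivered++
		default:
			// The subscriber's buffer is full: drop the update rather than block consensus.
		}
	}
	return delivered
}
```

The Raft node would call Publish wherever it changes state or term. Dropping updates for slow subscribers is a deliberate trade-off here: the visualizer may miss an intermediate state, but it can never stall the node.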

Finally, the UI itself. We can create a Composable function that renders the cluster state. Each node is represented by a circle, color-coded by its state (e.g., green for Leader, yellow for Candidate, gray for Follower).

// In desktop/src/jvmMain/kotlin/Main.kt
import androidx.compose.desktop.ui.tooling.preview.Preview
import androidx.compose.foundation.Canvas
import androidx.compose.foundation.layout.*
import androidx.compose.material.MaterialTheme
import androidx.compose.material.Text
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.geometry.Offset
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.unit.dp
import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application

@Composable
fun ClusterView(uiState: ClusterUiState) {
    // Recompute the layout whenever the set of node keys changes, not just its size.
    val nodePositions = remember(uiState.nodeStatuses.keys) {
        // Simple circular layout logic
        // This would be more sophisticated in a real app
        val center = Offset(250f, 250f)
        val radius = 200f
        val angleStep = 2 * Math.PI / uiState.nodeStatuses.size
        uiState.nodeStatuses.keys.mapIndexed { index, id ->
            id to Offset(
                center.x + radius * kotlin.math.cos(angleStep * index).toFloat(),
                center.y + radius * kotlin.math.sin(angleStep * index).toFloat()
            )
        }.toMap()
    }

    Canvas(modifier = Modifier.fillMaxSize()) {
        uiState.nodeStatuses.forEach { (id, result) ->
            val position = nodePositions[id] ?: return@forEach

            val (nodeColor, nodeInfo) = if (result.isSuccess) {
                val status = result.getOrThrow()
                val color = when (status.state) {
                    "Leader" -> Color.Green
                    "Candidate" -> Color.Yellow
                    "Follower" -> Color.Gray
                    else -> Color.Red
                }
                color to "Term: ${status.currentTerm}\nLog: ${status.logLength}"
            } else {
                Color.Red to "Offline"
            }
            
            drawCircle(
                color = nodeColor,
                radius = 40f,
                center = position
            )
            // Drawing text on Canvas is more complex; in a real app, use overlays.
        }

        // Here you would draw lines for votes or connections
    }
}


@Composable
@Preview
fun App() {
    // In a real app, this would be injected via dependency injection.
    val viewModel = remember {
        // Assume GKE LoadBalancer IP is known
        val nodeUrls = listOf("http://34.123.45.67", "http://34.123.45.67", "http://34.123.45.67")
        // This is a simplification; you'd hit the load balancer which routes to different pods.
        // A better approach would be to get IPs from the K8s API or use NodePorts.
        val apiClient = RaftApiClient(nodeUrls)
        ClusterViewModel(apiClient)
    }

    val uiState by viewModel.uiState.collectAsState()

    MaterialTheme {
        Column(modifier = Modifier.padding(16.dp)) {
            Text("Raft Cluster Status", style = MaterialTheme.typography.h4)
            Spacer(Modifier.height(16.dp))
            ClusterView(uiState)
        }
    }
    
    DisposableEffect(Unit) {
        onDispose {
            viewModel.shutdown()
        }
    }
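}

fun main() = application {
    Window(onCloseRequest = ::exitApplication, title = "Raft Cluster Visualizer") {
        App()
    }
}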

The architecture connecting these components can be visualized as follows:

graph TD
    A[Jetpack Compose App] -- HTTP API Calls --> B{GKE LoadBalancer};
    B -- Routes Traffic --> C1[Pod: raft-node-0];
    B -- Routes Traffic --> C2[Pod: raft-node-1];
    B -- Routes Traffic --> C3[Pod: raft-node-2];
    
    subgraph GKE Cluster
        C1 -- Raft RPC --> C2;
        C2 -- Raft RPC --> C3;
        C3 -- Raft RPC --> C1;
        
        D[Headless Service: raft-service] -.-> C1;
        D -.-> C2;
        D -.-> C3;
    end
    
    style A fill:#87CEEB,stroke:#333,stroke-width:2px
    style B fill:#f9f,stroke:#333,stroke-width:2px

The resulting system provides a powerful feedback loop. By using the UI to issue a stop command to the current leader, we can immediately observe the other nodes’ election timers expire, see them transition to candidates, vote for a new leader, and watch the system stabilize under a new term, all in real time.
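
To send the stop command to the right node, the tool first has to decide which node is the leader, and it helps to flag the case where more than one node claims that role. A minimal sketch of that selection follows; the function currentLeader is hypothetical and operates on a trimmed NodeStatus.

```go
package main

// NodeStatus is a trimmed version of the /status payload.
type NodeStatus struct {
	ID          string
	State       string
	CurrentTerm int
}

// currentLeader returns the self-reported leader with the highest term.
// Any other node that also reports itself as leader is listed in stale,
// which is the situation a visualizer would want to highlight. Two leaders
// in the same term would violate Raft's safety guarantee; this sketch simply
// keeps the first one it sees and reports the other as stale.
func currentLeader(statuses []NodeStatus) (leader NodeStatus, stale []string, ok bool) {
	for _, s := range statuses {
		if s.State != "Leader" {
			continue
		}
		switch {
		case !ok:
			leader, ok = s, true
		case s.CurrentTerm > leader.CurrentTerm:
			stale = append(stale, leader.ID)
			leader = s
		default:
			stale = append(stale, s.ID)
		}
	}
	return leader, stale, ok
}
```

If ok is false, no node currently claims leadership, which is the expected picture in the middle of an election. A non-empty stale list points at a node that has not yet learned about a newer term.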

The current implementation, while functional as a diagnostic tool, has clear boundaries. The Raft logic itself lacks critical production features like log compaction and dynamic membership changes, and the RequestVote and AppendEntries RPCs are only simulated in the code shown here. The client’s one-second polling loop is inefficient and should be replaced with a streaming protocol for true real-time updates. Furthermore, the GKE deployment lacks robust security hardening, such as NetworkPolicies to restrict traffic between pods, and comprehensive observability through Prometheus metrics, both of which would be non-negotiable in a production environment. Future iterations would need to address these areas to move from a powerful visualizer to a production-grade system.

