Constructing a Raft-Based Distributed Lock Manager with Go and ScyllaDB for a C# Environment


Our existing Redis-based distributed locking, leveraging SET NX PX, began to show consistency cracks under specific network partition scenarios. For a critical financial transaction workflow, simple lease-based locks were insufficient; we required linearizable semantics to guarantee that only one process could orchestrate the final settlement at any given moment. This forced the decision to build a custom, fault-tolerant lock manager backed by a proper consensus protocol. The goal was not just correctness but also integrating the lock’s state directly into our primary data platform, ScyllaDB, for auditing and near real-time analytics, a feature impossible with off-the-shelf solutions like a standalone etcd cluster.

The technology selection process was rigorous. Using a full etcd or ZooKeeper cluster was ruled out early. We needed to embed custom logic directly into the state machine’s application layer—specifically, interacting with ScyllaDB upon log commitment. This pointed towards using a Raft library rather than a complete server. The etcd/raft library in Go was the obvious choice due to its production-proven stability and straightforward API. Go provided the performance and concurrency primitives needed for the server implementation. For communication, gRPC was a non-negotiable choice for its performance and strongly-typed contracts, essential for both the internal Raft transport and the public-facing API. The most unusual part of this architecture is the state store. Instead of an in-memory map or a local embedded database like BoltDB, we designated ScyllaDB as the durable persistence layer for the committed state. This decouples the consensus log from the state itself, allowing the lock metadata to be queried at scale. Finally, the primary consumer of this new service is our core .NET transaction engine, mandating a robust, production-ready C# client.

The Raft Node and gRPC Transport Layer

The foundation of the system is a Go application where each instance acts as a potential Raft node. The core logic revolves around wrapping the etcd/raft Node interface and managing its lifecycle. This involves feeding it messages from peers, proposing new commands from clients, and processing the resulting Ready struct that etcd/raft emits on each tick. The Ready struct contains new log entries to be persisted, messages to be sent to peers, and committed entries to be applied to the state machine.

Our transport layer is built entirely on gRPC. Each node exposes an internal gRPC service that other nodes use to exchange Raft messages. This is more complex than a simple HTTP transport but provides streaming capabilities and strict message schemas.

First, the internal protocol definition for Raft message exchange:

// internal/transport/raft_internal.proto
syntax = "proto3";

package transport;
option go_package = ".;transport";

// Raft messages are defined by the etcd/raft library.
// We just wrap them in our own message for transport.
import "go.etcd.io/etcd/raft/v3/raftpb/raft.proto";

service RaftInternal {
  // A streaming RPC for sending Raft messages.
  // This allows for a persistent connection between peers, reducing handshake overhead.
  rpc MessageStream(stream raftpb.Message) returns (stream raftpb.Message);
}

The server-side implementation of this transport is a Go struct that holds references to its peers.

// internal/node/transport.go
package node

import (
	"context"
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"go.etcd.io/etcd/raft/v3/raftpb"
	"path/to/project/internal/transport"
)

// Transport handles communication between Raft nodes.
type Transport struct {
	mu         sync.RWMutex
	NodeID     uint64
	ListenAddr string
	Peers      map[uint64]string // Map of node ID to its gRPC address
	clients    map[uint64]transport.RaftInternalClient
	server     *grpc.Server
	receiver   chan raftpb.Message // Channel to pass received messages to the Raft node
}

func NewTransport(nodeID uint64, listenAddr string, peers map[uint64]string) *Transport {
	return &Transport{
		NodeID:     nodeID,
		ListenAddr: listenAddr,
		Peers:      peers,
		clients:    make(map[uint64]transport.RaftInternalClient),
		receiver:   make(chan raftpb.Message, 256), // Buffered channel
	}
}

// Start initiates the gRPC server and connects to peers.
func (t *Transport) Start() error {
	lis, err := net.Listen("tcp", t.ListenAddr)
	if err != nil {
		return fmt.Errorf("failed to listen on %s: %w", t.ListenAddr, err)
	}

	t.server = grpc.NewServer()
	transport.RegisterRaftInternalServer(t.server, t)

	go func() {
		if err := t.server.Serve(lis); err != nil {
			log.Fatalf("gRPC server failed: %v", err)
		}
	}()

	return t.connectToPeers()
}

func (t *Transport) connectToPeers() error {
	t.mu.Lock()
	defer t.mu.Unlock()

	for id, addr := range t.Peers {
		if id == t.NodeID {
			continue // Don't connect to self
		}

		// A real-world project would require proper credentials and connection options.
		conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			return fmt.Errorf("failed to dial peer %d at %s: %w", id, addr, err)
		}
		t.clients[id] = transport.NewRaftInternalClient(conn)
		log.Printf("Node %d connected to peer %d at %s", t.NodeID, id, addr)
	}
	return nil
}

// MessageStream is the gRPC handler for receiving messages from peers.
func (t *Transport) MessageStream(stream transport.RaftInternal_MessageStreamServer) error {
	for {
		msg, err := stream.Recv()
		if err != nil {
			// Connection closed or error.
			return err
		}
		// Pass the received message to the Raft node's processing loop.
		t.receiver <- *msg
	}
}

// Send dispatches Raft messages to the appropriate peer.
func (t *Transport) Send(messages []raftpb.Message) {
	for _, msg := range messages {
		peerID := msg.To
		t.mu.RLock()
		client, ok := t.clients[peerID]
		t.mu.RUnlock()

		if !ok {
			log.Printf("Node %d: No client found for peer %d", t.NodeID, peerID)
			continue
		}

		go func(m raftpb.Message) {
			// A simple stream implementation for demonstration.
			// A production system would manage streams more robustly,
			// potentially having one long-lived stream per peer.
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			stream, err := client.MessageStream(ctx)
			if err != nil {
				log.Printf("Node %d: Failed to create stream to peer %d: %v", t.NodeID, m.To, err)
				return
			}

			if err := stream.Send(&m); err != nil {
				log.Printf("Node %d: Failed to send message to peer %d: %v", t.NodeID, m.To, err)
			}
			// In this simple model, we close the stream immediately.
			// For performance, a long-lived stream is better.
			stream.CloseSend()
		}(msg)
	}
}

func (t *Transport) Stop() {
	if t.server != nil {
		t.server.GracefulStop()
	}
}

This transport layer forms the nervous system of our cluster. The architecture looks like this:

graph TD
    subgraph Node 1
        R1[Raft Core 1] -- msgs --> T1[Transport 1]
        T1 -- gRPC stream --> T2
        T1 -- gRPC stream --> T3
    end
    subgraph Node 2
        R2[Raft Core 2] -- msgs --> T2[Transport 2]
        T2 -- gRPC stream --> T1
        T2 -- gRPC stream --> T3
    end
    subgraph Node 3
        R3[Raft Core 3] -- msgs --> T3[Transport 3]
        T3 -- gRPC stream --> T1
        T3 -- gRPC stream --> T2
    end

    R1 <--> SM1[State Machine 1]
    R2 <--> SM2[State Machine 2]
    R3 <--> SM3[State Machine 3]
    
    SM1 -- LWT --> ScyllaDB
    SM2 -- LWT --> ScyllaDB
    SM3 -- LWT --> ScyllaDB

The State Machine and ScyllaDB Integration

This is where our design diverges significantly from a standard Raft implementation. The state machine’s role is to apply a committed log entry to a durable store. In our case, that store is ScyllaDB. The core challenge is ensuring that the application of the state is idempotent and correct, especially for lock acquisition. We solve this using ScyllaDB’s Lightweight Transactions (LWTs), which provide a conditional IF NOT EXISTS clause.

First, the schema in ScyllaDB:

CREATE KEYSPACE IF NOT EXISTS lock_manager
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };

USE lock_manager;

CREATE TABLE locks (
    resource_name TEXT PRIMARY KEY,
    owner_id TEXT,
    lease_id UUID,
    acquired_timestamp TIMESTAMP
);

The Go state machine interacts with this table. It receives commands from the Raft log, decodes them, and executes the corresponding ScyllaDB query.

// internal/fsm/scylla_fsm.go
package fsm

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/gocql/gocql"
)

// Command represents a proposal to be applied to the state machine.
type Command struct {
	Op       string `json:"op"` // "acquire", "release"
	Resource string `json:"resource"`
	OwnerID  string `json:"owner_id"`
	LeaseID  string `json:"lease_id"` // Used to ensure only the owner can release.
}

// ScyllaFSM is the finite state machine that persists state to ScyllaDB.
type ScyllaFSM struct {
	session *gocql.Session
}

func NewScyllaFSM(hosts []string) (*ScyllaFSM, error) {
	cluster := gocql.NewCluster(hosts...)
	cluster.Keyspace = "lock_manager"
	cluster.Consistency = gocql.Quorum
	cluster.Timeout = 5 * time.Second

	// In a real project, connection pooling and retry policies need careful tuning.
	session, err := cluster.CreateSession()
	if err != nil {
		return nil, fmt.Errorf("failed to connect to ScyllaDB: %w", err)
	}
	log.Println("ScyllaDB session created successfully.")

	return &ScyllaFSM{session: session}, nil
}

// Apply takes a committed log entry data and applies it to ScyllaDB.
func (fsm *ScyllaFSM) Apply(data []byte) error {
	var cmd Command
	if err := json.Unmarshal(data, &cmd); err != nil {
		// This indicates a bug or corrupted data, as proposals should be validated.
		log.Printf("Failed to unmarshal command: %v", err)
		return err
	}

	log.Printf("Applying command: Op=%s, Resource=%s", cmd.Op, cmd.Resource)

	switch cmd.Op {
	case "acquire":
		return fsm.handleAcquire(cmd)
	case "release":
		return fsm.handleRelease(cmd)
	default:
		return fmt.Errorf("unknown command op: %s", cmd.Op)
	}
}

func (fsm *ScyllaFSM) handleAcquire(cmd Command) error {
	// The core of our lock safety: a Lightweight Transaction.
	// This query will only succeed if the row for `resource_name` does not exist.
	query := `INSERT INTO locks (resource_name, owner_id, lease_id, acquired_timestamp) VALUES (?, ?, ?, ?) IF NOT EXISTS`
	
	applied, _, err := fsm.session.Query(query,
		cmd.Resource,
		cmd.OwnerID,
		gocql.TimeUUID(), // Generate a new lease ID on acquisition
		time.Now().UTC(),
	).WithContext(context.Background()).ScanCAS()

	if err != nil {
		log.Printf("Error during LWT for acquiring lock '%s': %v", cmd.Resource, err)
		return err
	}

	if !applied {
		// This is not an error, it's expected if the lock is already taken.
		// The client-facing API will handle returning a "lock not acquired" response.
		log.Printf("Lock acquisition failed for resource '%s' (already held).", cmd.Resource)
		// For the state machine, this is a successful application of a "no-op".
		return nil
	}

	log.Printf("Lock successfully acquired for resource '%s' by owner '%s'.", cmd.Resource, cmd.OwnerID)
	return nil
}

func (fsm *ScyllaFSM) handleRelease(cmd Command) error {
	// Releasing also uses LWT to ensure only the rightful owner releases the lock.
	// The lease_id is critical here to prevent a stale client from releasing a lock
	// that it no longer holds.
	parsedLeaseID, err := gocql.ParseUUID(cmd.LeaseID)
	if err != nil {
		return fmt.Errorf("invalid lease ID format for release: %w", err)
	}

	query := `DELETE FROM locks WHERE resource_name = ? IF lease_id = ?`

	applied, _, err := fsm.session.Query(query, cmd.Resource, parsedLeaseID).WithContext(context.Background()).ScanCAS()
	if err != nil {
		log.Printf("Error during LWT for releasing lock '%s': %v", cmd.Resource, err)
		return err
	}

	if !applied {
		log.Printf("Lock release failed for resource '%s' (lease ID mismatch or not held).", cmd.Resource)
		// Again, this is a successful no-op for the FSM.
		return nil
	}

	log.Printf("Lock successfully released for resource '%s'.", cmd.Resource)
	return nil
}

// Snapshot and Restore are critical for log compaction in Raft.
// A naive implementation is shown here. A real-world system needs to handle
// large state efficiently, perhaps by streaming data or using Scylla's snapshot tools.
func (fsm *ScyllaFSM) GetSnapshot() ([]byte, error) {
    // This is a major pitfall in practice. Iterating over the entire table
    // can be extremely slow and put heavy load on the database.
    // For a system with many locks, this is not a viable production strategy.
    iter := fsm.session.Query(`SELECT resource_name, owner_id, lease_id FROM locks`).Iter()
    var locks []map[string]interface{}
    var resource, owner string
    var lease gocql.UUID
    for iter.Scan(&resource, &owner, &lease) {
        locks = append(locks, map[string]interface{}{
            "resource": resource,
            "owner_id": owner,
            "lease_id": lease.String(),
        })
    }
    if err := iter.Close(); err != nil {
        return nil, err
    }
    return json.Marshal(locks)
}

func (fsm *ScyllaFSM) RestoreFromSnapshot(data []byte) error {
    // Another dangerous operation. This would typically involve clearing the table
    // and batch inserting, which requires careful planning during a node restart.
    log.Println("Restoring from snapshot... (This is a simplified implementation)")
    // ... logic to TRUNCATE and batch INSERT the restored data ...
    return nil
}

func (fsm *ScyllaFSM) Close() {
	fsm.session.Close()
}

The snapshotting implementation here is deliberately simplistic to highlight a common pitfall. Reading the entire state from ScyllaDB to create a snapshot is not scalable. A production system would need a more sophisticated approach, possibly involving timestamp-based queries or integrating with ScyllaDB’s backup mechanisms.

The Public API and Client Request Flow

The client-facing API is also defined using gRPC. It exposes methods to acquire and release locks.

// api/lock_service.proto
syntax = "proto3";

package api;
option go_package = ".;api";

service LockService {
  rpc AcquireLock(AcquireLockRequest) returns (AcquireLockResponse);
  rpc ReleaseLock(ReleaseLockRequest) returns (ReleaseLockResponse);
}

message AcquireLockRequest {
  string resource_name = 1;
  string owner_id = 2;
  // Timeout in milliseconds for the client waiting for the operation to complete.
  int64 client_timeout_ms = 3; 
}

message AcquireLockResponse {
  bool acquired = 1;
  string lease_id = 2; // Returned on successful acquisition.
  string error_message = 3;
}

message ReleaseLockRequest {
  string resource_name = 1;
  string lease_id = 2; // Must match the one from AcquireLockResponse.
}

message ReleaseLockResponse {
  bool released = 1;
  string error_message = 3;
}

The Go implementation of this service acts as the bridge between a client request and the Raft node. When an AcquireLock request arrives, the server serializes it into our internal Command struct and proposes it to the Raft cluster using node.Propose(). The handler then waits for a notification that this specific command has been committed and applied. This requires a mechanism to correlate proposals with their outcomes. A map of channels, indexed by the proposal ID, is a common pattern.

sequenceDiagram
    participant C as C# Client
    participant S as Lock Service (gRPC)
    participant R as Raft Node (Leader)
    participant FSM as State Machine
    participant DB as ScyllaDB

    C->>+S: AcquireLock(resource="res1", owner="ownerA")
    S->>S: Create Command{op: "acquire", ...}
    S->>+R: Propose(command_data)
    R->>R: Replicate log entry to followers
    Note over R: Wait for majority acknowledgement
    R->>-FSM: Apply(committed_entry)
    FSM->>+DB: INSERT ... IF NOT EXISTS
    DB-->>-FSM: applied=true
    FSM->>S: Notify(commit_index, result)
    S-->>-C: AcquireLockResponse{acquired=true, lease_id="..."}

The C# Client Implementation

The final piece is the C# client library. A production-grade client needs to be more than just a thin wrapper around the generated gRPC stubs. It must handle connection lifecycle, retries, deadlines, and cancellation.

First, the .csproj file would include the necessary gRPC packages:

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net8.0</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <Protobuf Include="Protos\lock_service.proto" GrpcServices="Client" />
    <PackageReference Include="Google.Protobuf" Version="3.24.4" />
    <PackageReference Include="Grpc.Net.Client" Version="2.57.0" />
    <PackageReference Include="Grpc.Tools" Version="2.58.0">
      <PrivateAssets>all</PrivateAssets>
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
    </PackageReference>
  </ItemGroup>
</Project>

The client wrapper class focuses on usability and resilience.

// LockManagerClient.cs
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Net.Client;
using Api; // Namespace from generated code

public class LockManagerClient : IDisposable
{
    private readonly GrpcChannel _channel;
    private readonly LockService.LockServiceClient _client;

    public LockManagerClient(string address)
    {
        // For production, configure HttpClientHandler for TLS, etc.
        _channel = GrpcChannel.ForAddress(address);
        _client = new LockService.LockServiceClient(_channel);
    }

    public async Task<(bool Acquired, string LeaseId)> AcquireLockAsync(
        string resourceName, 
        string ownerId, 
        TimeSpan timeout, 
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(resourceName) || string.IsNullOrEmpty(ownerId))
        {
            throw new ArgumentException("Resource name and owner ID must be provided.");
        }

        try
        {
            var request = new AcquireLockRequest
            {
                ResourceName = resourceName,
                OwnerId = ownerId,
                ClientTimeoutMs = (long)timeout.TotalMilliseconds
            };

            var headers = new Metadata(); // Can be used for auth tokens, tracing info
            var deadline = DateTime.UtcNow.Add(timeout);

            var response = await _client.AcquireLockAsync(
                request, 
                headers, 
                deadline, 
                cancellationToken);

            if (!response.Acquired && !string.IsNullOrEmpty(response.ErrorMessage))
            {
                // Log the server-side error for diagnostics
                Console.WriteLine($"Server error on acquire: {response.ErrorMessage}");
                return (false, null);
            }
            
            return (response.Acquired, response.LeaseId);
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
        {
            Console.WriteLine($"AcquireLock timed out for resource '{resourceName}'. The operation may have succeeded on the server.");
            return (false, null);
        }
        catch (RpcException ex)
        {
            Console.WriteLine($"gRPC error acquiring lock '{resourceName}': {ex.Status}");
            // A real implementation would have more sophisticated error handling and retry logic
            // based on the status code (e.g., Unavailable).
            throw;
        }
    }
    
    // Test method to demonstrate usage
    public static async Task RunDemo()
    {
        // Assumes the service is running on localhost:50051
        var client = new LockManagerClient("http://localhost:50051");
        var resource = $"settlement-batch-{DateTime.UtcNow:yyyyMMdd}";
        var owner = $"worker-{Guid.NewGuid()}";

        Console.WriteLine($"Attempting to acquire lock for '{resource}' by '{owner}'...");
        
        var (acquired, leaseId) = await client.AcquireLockAsync(resource, owner, TimeSpan.FromSeconds(10));

        if (acquired)
        {
            Console.WriteLine($"Lock acquired successfully! Lease ID: {leaseId}");
            // ... perform critical work ...
            Console.WriteLine("Work complete. Releasing lock.");
            // await client.ReleaseLockAsync(resource, leaseId, ...);
        }
        else
        {
            Console.WriteLine("Failed to acquire lock.");
        }
    }

    public void Dispose()
    {
        _channel.Dispose();
    }
}

This client demonstrates key production considerations: explicit timeouts via deadlines, handling of RpcException for different status codes, and passing a CancellationToken. The comment about DeadlineExceeded is crucial: if a client times out, it has no way of knowing if the Raft proposal succeeded or not. This ambiguity must be handled by the application logic, often by designing operations to be idempotent.

Lingering Issues and Future Iterations

This architecture provides the required linearizable semantics but is not without its own set of trade-offs and complexities. The snapshotting strategy, as implemented, is a significant performance bottleneck and is not viable for a system with a large number of active locks. A more robust solution might involve the Raft leader maintaining a concurrent map of the lock state in memory for fast snapshot creation, using ScyllaDB purely as the durable write-ahead log’s ultimate destination and for external query capabilities.

Furthermore, the client does not currently implement leader discovery. It must be configured with the address of the current Raft leader. A production-grade client should be able to connect to any node in the cluster. Non-leader nodes would either reject the proposal with a redirect message containing the leader’s address or, preferably, proxy the proposal to the current leader transparently.

Finally, the system lacks a mechanism for handling client failures. A client that acquires a lock and then crashes will hold that lock indefinitely. This necessitates a lease-based system where locks have a TTL. The client would be responsible for periodically sending a “keep-alive” proposal through Raft to renew its lease. The state machine would then need a background process to scan for and expire locks whose leases have not been renewed, which adds another layer of complexity to the FSM’s design.


  TOC