The initial state of our feature serving was brittle. Feature logic—data sources, transformations, default values—was embedded directly within the Go service binary. Every minor tweak to a feature, like changing a Redis key pattern or adjusting a time window, necessitated a full build and deployment cycle across dozens of nodes. This process was not only slow but also introduced significant risk. A single faulty feature deployment could degrade model performance and required a cumbersome rollback. The core technical pain point was the tight coupling of feature definitions with the application’s deployment lifecycle.
Our goal became to externalize these feature definitions, allowing them to be updated dynamically across our entire serving fleet without a single restart. The initial concept was simple: a central repository for feature configurations that our Go-Gin services could poll for changes. A relational database was briefly considered, but polling is inefficient and introduces latency. We needed a push-based mechanism. The services needed to be notified instantly when a feature definition changed.
This requirement led us to evaluate tools designed for distributed coordination and configuration management. This is where etcd entered the picture. While often used for service discovery in Kubernetes, its core capabilities—a consistent, replicated key-value store with a powerful watch API—made it a perfect candidate for our control plane. Feature definitions would be stored as values in etcd, and our services would subscribe to changes on a specific key prefix. Go, with its excellent concurrency primitives, was the ideal language to build the service that would listen for these updates in a background goroutine while serving traffic via Gin.
The final architecture solidified around this principle: etcd acts as the source of truth for all feature definitions, and each Go-Gin serving instance runs a background watcher. When a feature definition is updated in etcd, the watcher triggers a hot-reload of the feature’s logic within the service, ensuring all nodes converge on the new definition within milliseconds.
Defining the Feature Schema and Control Plane
Before writing any service code, we needed a clear contract for what a “feature” is. Storing it as a structured format like JSON in etcd was the obvious choice. This schema needed to be expressive enough to handle our current and future needs. A real-world project requires a schema that defines not just the source but also the transformation pipeline.
Here is the FeatureDefinition struct we settled on, which would be serialized into JSON and stored in etcd.
package core
import "time"
// DataSourceType defines the origin of the feature data.
type DataSourceType string
const (
SourceRedis DataSourceType = "redis"
SourceSQL DataSourceType = "sql"
SourceDefault DataSourceType = "default"
)
// Transformation defines a single step in a feature processing pipeline.
type Transformation struct {
Name string `json:"name"` // e.g., "multiply_by_constant"
Arguments map[string]interface{} `json:"arguments"` // e.g., {"value": 100}
}
// FeatureDefinition represents the complete configuration for a single feature.
// This struct is serialized to JSON and stored in etcd.
type FeatureDefinition struct {
Name string `json:"name"` // Unique name of the feature, e.g., "user_7day_purchase_amount"
Version int64 `json:"version"` // Monotonically increasing version
Description string `json:"description"` // Human-readable description
ValueType string `json:"value_type"` // "float", "int", "string", "bool"
DataSourceType DataSourceType `json:"data_source_type"` // Where to fetch the raw data from
DataSource string `json:"data_source"` // e.g., a Redis key pattern "user:{user_id}:purchases" or a SQL query
Transformations []Transformation `json:"transformations"`// Ordered list of transformations to apply
CacheTTL time.Duration `json:"cache_ttl"` // How long to cache the computed feature value; as a time.Duration, a bare JSON number is interpreted as nanoseconds
DefaultValue interface{} `json:"default_value"` // Value to return if fetching fails
Enabled bool `json:"enabled"` // Feature flag for this definition
}
With this schema, an etcd key might be /features/user_7day_purchase_amount, and the value would be the JSON representation of a FeatureDefinition instance. The version number is critical for auditing and preventing stale writes.
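To make that last point concrete, whatever tool publishes definitions can guard its writes with an etcd transaction that compares the key’s ModRevision, so two operators editing the same definition cannot silently overwrite each other. Here is a minimal sketch of such a publisher-side helper; the publisher package and publishDefinition name are illustrative and not part of the serving code.
package publisher
import (
	"context"
	"encoding/json"
	"fmt"

	"go.etcd.io/etcd/clientv3"

	"feature-store/core"
)
// publishDefinition writes def under /features/<name>, but only if the key has not
// changed since it was read (compare-and-swap on ModRevision).
func publishDefinition(ctx context.Context, cli *clientv3.Client, def core.FeatureDefinition) error {
	key := "/features/" + def.Name
	getResp, err := cli.Get(ctx, key)
	if err != nil {
		return err
	}
	var modRev int64 // 0 means the key must not exist yet
	if len(getResp.Kvs) > 0 {
		modRev = getResp.Kvs[0].ModRevision
	}
	payload, err := json.Marshal(def)
	if err != nil {
		return err
	}
	txnResp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(key), "=", modRev)).
		Then(clientv3.OpPut(key, string(payload))).
		Commit()
	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		return fmt.Errorf("stale write rejected: %s was modified after it was read", key)
	}
	return nil
}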
Building the etcd-backed Feature Manager
The heart of the system is a component responsible for maintaining the in-memory state of all active features and handling the update logic triggered by etcd. We’ll call this the FeatureManager. A common mistake is to handle this with ad-hoc locking, which can easily lead to race conditions. A production-grade implementation requires careful management of concurrent reads (from API handlers) and writes (from the etcd watcher).
sequenceDiagram
    participant EtcdWatcher as etcd Watcher (Goroutine)
    participant EtcdServer as etcd Server
    participant FeatureManager as FeatureManager
    participant APIHandler as API Handler (Gin)
    EtcdWatcher->>EtcdServer: Watch("/features/...")
    EtcdServer-->>EtcdWatcher: Initial feature set
    EtcdWatcher->>FeatureManager: LoadInitialFeatures(defs)
    loop On Update
        EtcdServer-->>EtcdWatcher: PUT /features/feature_X (v2)
        EtcdWatcher->>FeatureManager: UpdateFeature(def_X_v2)
    end
    loop On Request
        APIHandler->>FeatureManager: GetFeature("feature_X")
        FeatureManager-->>APIHandler: Return compiled feature_X (v2)
    end
The FeatureManager will use a sync.RWMutex to protect its internal map of features. This allows multiple concurrent reads from API handlers but ensures that writes from the watcher are exclusive.
Here’s the core implementation of the FeatureManager. Note the clear separation between the manager itself and the etcd client, which is created in the constructor from injected endpoints.
package manager
import (
"context"
"encoding/json"
"log"
"sync"
"time"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"feature-store/core" // Assumes the structs from above are in this package
)
const (
etcdFeaturePrefix = "/features/"
etcdRequestTimeout = 5 * time.Second
)
// ExecutableFeature represents a compiled, ready-to-use feature.
// In a real system, this would contain compiled logic, not just the definition.
// For this example, it's a wrapper around the definition.
type ExecutableFeature struct {
Def *core.FeatureDefinition
// In a real implementation, this would hold pre-compiled transformation functions,
// database statement handles, etc.
}
// FeatureManager holds the live, in-memory collection of features
// and manages updates from etcd.
type FeatureManager struct {
etcdClient *clientv3.Client
logger *zap.Logger
features map[string]*ExecutableFeature
mu sync.RWMutex
}
// NewFeatureManager creates and initializes a new FeatureManager.
func NewFeatureManager(etcdEndpoints []string, logger *zap.Logger) (*FeatureManager, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: etcdEndpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
return &FeatureManager{
etcdClient: cli,
logger: logger,
features: make(map[string]*ExecutableFeature),
}, nil
}
// StartWatcher initializes the feature set from etcd and starts a watcher goroutine
// to listen for updates. This must be called to make the manager live.
func (fm *FeatureManager) StartWatcher(ctx context.Context) error {
if err := fm.loadInitialFeatures(ctx); err != nil {
return err
}
go fm.watchForChanges(ctx)
return nil
}
// GetFeature safely retrieves a feature by name. It's safe for concurrent use.
func (fm *FeatureManager) GetFeature(name string) (*ExecutableFeature, bool) {
fm.mu.RLock()
defer fm.mu.RUnlock()
feature, ok := fm.features[name]
return feature, ok
}
// loadInitialFeatures performs an initial scan of the etcd prefix to populate the
// feature map on startup.
func (fm *FeatureManager) loadInitialFeatures(ctx context.Context) error {
fm.logger.Info("Performing initial load of feature definitions from etcd")
reqCtx, cancel := context.WithTimeout(ctx, etcdRequestTimeout)
defer cancel()
resp, err := fm.etcdClient.Get(reqCtx, etcdFeaturePrefix, clientv3.WithPrefix())
if err != nil {
return err
}
fm.mu.Lock()
defer fm.mu.Unlock()
for _, kv := range resp.Kvs {
fm.processPut(kv.Value)
}
fm.logger.Info("Initial feature load complete", zap.Int("count", len(fm.features)))
return nil
}
// watchForChanges runs in a background goroutine, watching for any changes
// under the feature prefix in etcd.
func (fm *FeatureManager) watchForChanges(ctx context.Context) {
fm.logger.Info("Starting etcd watcher for feature changes", zap.String("prefix", etcdFeaturePrefix))
watchChan := fm.etcdClient.Watch(ctx, etcdFeaturePrefix, clientv3.WithPrefix())
for watchResp := range watchChan {
if err := watchResp.Err(); err != nil {
fm.logger.Error("etcd watch error", zap.Error(err))
// In a production system, you might want to implement a backoff-retry mechanism here.
// Or if the error indicates a compacted revision, trigger a full reload.
continue
}
for _, event := range watchResp.Events {
fm.mu.Lock()
switch event.Type {
case clientv3.EventTypePut:
fm.logger.Info("Received feature update event", zap.String("key", string(event.Kv.Key)))
fm.processPut(event.Kv.Value)
case clientv3.EventTypeDelete:
fm.logger.Info("Received feature delete event", zap.String("key", string(event.Kv.Key)))
fm.processDelete(string(event.Kv.Key))
}
fm.mu.Unlock()
}
}
fm.logger.Info("etcd watcher stopped")
}
// processPut deserializes and updates/adds a feature definition.
// This function MUST be called with the write lock held.
func (fm *FeatureManager) processPut(value []byte) {
var def core.FeatureDefinition
if err := json.Unmarshal(value, &def); err != nil {
fm.logger.Error("Failed to unmarshal feature definition", zap.Error(err), zap.String("payload", string(value)))
return
}
// The pitfall here is not validating the definition. In production, you'd have
// a robust validation step here.
if !def.Enabled {
delete(fm.features, def.Name)
fm.logger.Warn("Feature definition is disabled, removing from active set", zap.String("name", def.Name))
return
}
// "Compiling" the feature. For now, it's just storing the definition.
executableFeature := &ExecutableFeature{Def: &def}
fm.features[def.Name] = executableFeature
fm.logger.Info("Successfully loaded/updated feature", zap.String("name", def.Name), zap.Int64("version", def.Version))
}
// processDelete removes a feature from the map.
// It derives the feature name from the etcd key.
// This function MUST be called with the write lock held.
func (fm *FeatureManager) processDelete(key string) {
// Key is in format "/features/feature_name"
featureName := key[len(etcdFeaturePrefix):]
if _, ok := fm.features[featureName]; ok {
delete(fm.features, featureName)
fm.logger.Info("Successfully deleted feature", zap.String("name", featureName))
}
}
// Close gracefully shuts down the etcd client connection.
func (fm *FeatureManager) Close() error {
return fm.etcdClient.Close()
}
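The watcher above simply logs errors and keeps reading from the channel. As the inline comments note, a production deployment also has to survive the watch channel closing and the watched revision being compacted away. Below is a hedged sketch of a recovery loop that lives in the same manager package; the resync and watchWithRecovery names are illustrative, and StartWatcher would call watchWithRecovery instead of watchForChanges.
package manager
import (
	"context"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"
)
// resync re-reads the whole /features/ prefix, replaces the in-memory map, and
// returns the etcd revision the snapshot was taken at. Illustrative helper name.
func (fm *FeatureManager) resync(ctx context.Context) (int64, error) {
	reqCtx, cancel := context.WithTimeout(ctx, etcdRequestTimeout)
	defer cancel()
	resp, err := fm.etcdClient.Get(reqCtx, etcdFeaturePrefix, clientv3.WithPrefix())
	if err != nil {
		return 0, err
	}
	fm.mu.Lock()
	defer fm.mu.Unlock()
	// Rebuild the map so features deleted while we were disconnected do not linger.
	fm.features = make(map[string]*ExecutableFeature)
	for _, kv := range resp.Kvs {
		fm.processPut(kv.Value)
	}
	return resp.Header.Revision, nil
}
// watchWithRecovery is a more defensive variant of watchForChanges: it resyncs and
// re-establishes the watch whenever the channel closes or the revision is compacted.
func (fm *FeatureManager) watchWithRecovery(ctx context.Context) {
	const retryDelay = 2 * time.Second // fixed delay for the sketch; production code would use jittered exponential backoff
	for ctx.Err() == nil {
		rev, err := fm.resync(ctx)
		if err != nil {
			fm.logger.Error("resync failed, will retry", zap.Error(err))
			time.Sleep(retryDelay)
			continue
		}
		// Watch strictly after the snapshot revision so no event is missed or replayed.
		watchCtx, cancelWatch := context.WithCancel(ctx)
		watchChan := fm.etcdClient.Watch(watchCtx, etcdFeaturePrefix,
			clientv3.WithPrefix(), clientv3.WithRev(rev+1))
		for watchResp := range watchChan {
			if watchResp.CompactRevision != 0 {
				// The revision we were watching from was compacted; take a fresh snapshot.
				fm.logger.Warn("watch revision compacted, resyncing",
					zap.Int64("compact_revision", watchResp.CompactRevision))
				break
			}
			if err := watchResp.Err(); err != nil {
				fm.logger.Error("etcd watch error", zap.Error(err))
				continue
			}
			for _, event := range watchResp.Events {
				fm.mu.Lock()
				switch event.Type {
				case clientv3.EventTypePut:
					fm.processPut(event.Kv.Value)
				case clientv3.EventTypeDelete:
					fm.processDelete(string(event.Kv.Key))
				}
				fm.mu.Unlock()
			}
		}
		cancelWatch()
	}
	fm.logger.Info("etcd watcher stopped")
}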
This manager is the robust core. It handles the initial load, watches for real-time changes, and provides a thread-safe way for the rest of the application to access feature configurations. The logging is crucial for observability in a production environment.
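One gap flagged in the processPut comment remains: validation. A minimal sketch of a check that could run before the “compile” step might look like the following; the specific rules are illustrative.
package manager
import (
	"fmt"

	"feature-store/core"
)
// validateDefinition rejects definitions that would be unsafe to serve.
// The rules are illustrative; a real deployment would enforce its own contract.
func validateDefinition(def *core.FeatureDefinition) error {
	if def.Name == "" {
		return fmt.Errorf("feature name must not be empty")
	}
	if def.Version <= 0 {
		return fmt.Errorf("feature %q: version must be positive", def.Name)
	}
	switch def.ValueType {
	case "float", "int", "string", "bool":
	default:
		return fmt.Errorf("feature %q: unsupported value_type %q", def.Name, def.ValueType)
	}
	switch def.DataSourceType {
	case core.SourceDefault:
		// No external source required.
	case core.SourceRedis, core.SourceSQL:
		if def.DataSource == "" {
			return fmt.Errorf("feature %q: data_source is required for source type %q", def.Name, def.DataSourceType)
		}
	default:
		return fmt.Errorf("feature %q: unknown data_source_type %q", def.Name, def.DataSourceType)
	}
	return nil
}
processPut would then log and skip any definition that fails the check, and could additionally reject a Version lower than the one already loaded to guard against out-of-order writes.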
Integrating with Go-Gin for the Serving Layer
With the FeatureManager in place, creating the Gin API is straightforward. The API handler’s only job is to receive a request, ask the FeatureManager for the requested feature, execute it, and return the result. The handler is completely unaware of etcd; this separation of concerns is a key design principle.
package api
import (
"net/http"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"feature-store/manager"
)
// FeatureServer encapsulates the Gin engine and dependencies.
type FeatureServer struct {
engine *gin.Engine
fm *manager.FeatureManager
logger *zap.Logger
}
// NewFeatureServer creates a new server and sets up routing.
func NewFeatureServer(fm *manager.FeatureManager, logger *zap.Logger) *FeatureServer {
engine := gin.New()
// gin.Recovery keeps a panicking handler from taking down the process; a zap-based structured logging middleware would typically be registered here as well.
engine.Use(gin.Recovery())
server := &FeatureServer{
engine: engine,
fm: fm,
logger: logger,
}
server.setupRoutes()
return server
}
func (s *FeatureServer) setupRoutes() {
v1 := s.engine.Group("/v1")
{
v1.POST("/features/get", s.handleGetFeatures)
}
}
// Run starts the HTTP server.
func (s *FeatureServer) Run(addr string) error {
return s.engine.Run(addr)
}
// Handler exposes the underlying Gin engine as an http.Handler so callers can
// embed it in their own http.Server (as main does below for graceful shutdown).
func (s *FeatureServer) Handler() http.Handler {
return s.engine
}
type GetFeaturesRequest struct {
FeatureNames []string `json:"feature_names" binding:"required"`
Entities map[string]interface{} `json:"entities" binding:"required"` // e.g., {"user_id": 123}
}
type FeatureResult struct {
Name string `json:"name"`
Value interface{} `json:"value"`
Error string `json:"error,omitempty"`
}
func (s *FeatureServer) handleGetFeatures(c *gin.Context) {
var req GetFeaturesRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
results := make([]FeatureResult, 0, len(req.FeatureNames))
for _, name := range req.FeatureNames {
feature, ok := s.fm.GetFeature(name)
if !ok {
results = append(results, FeatureResult{Name: name, Error: "feature not found"})
continue
}
// In a real system, this is where you would execute the feature logic:
// 1. Interpolate entity IDs into the data_source string.
// 2. Connect to the specified data_source_type (Redis, SQL).
// 3. Fetch the raw value.
// 4. Apply the chain of transformations.
// 5. Handle errors and fall back to the default value.
// For this example, we'll just return the default value.
s.logger.Debug("Executing feature", zap.String("name", name), zap.Any("entities", req.Entities))
results = append(results, FeatureResult{
Name: name,
Value: feature.Def.DefaultValue, // Placeholder for actual execution
})
}
c.JSON(http.StatusOK, gin.H{"features": results})
}
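The numbered comments in handleGetFeatures outline the real execution path. As one concrete illustration, here is a hedged sketch of entity interpolation and a simple transformation chain for numeric features, with the raw lookup hidden behind a hypothetical ValueFetcher interface; executeFeature and interpolate are not part of the code above, and the multiply_by_constant case follows the example given in the schema.
package api
import (
	"context"
	"fmt"
	"strings"

	"feature-store/core"
	"feature-store/manager"
)
// ValueFetcher abstracts the raw lookup against Redis, SQL, etc. (hypothetical interface).
type ValueFetcher interface {
	Fetch(ctx context.Context, source core.DataSourceType, resolvedKey string) (float64, error)
}
// interpolate substitutes "{user_id}"-style placeholders in a data source pattern
// with the entity values supplied in the request.
func interpolate(pattern string, entities map[string]interface{}) string {
	for k, v := range entities {
		pattern = strings.ReplaceAll(pattern, "{"+k+"}", fmt.Sprintf("%v", v))
	}
	return pattern
}
// executeFeature sketches steps 1-5 from the handler comments for numeric features.
func executeFeature(ctx context.Context, f *manager.ExecutableFeature, entities map[string]interface{}, fetcher ValueFetcher) interface{} {
	def := f.Def
	if def.DataSourceType == core.SourceDefault {
		return def.DefaultValue
	}
	raw, err := fetcher.Fetch(ctx, def.DataSourceType, interpolate(def.DataSource, entities))
	if err != nil {
		return def.DefaultValue // fall back to the default value on any fetch error
	}
	value := raw
	for _, tr := range def.Transformations { // apply the ordered transformation chain
		switch tr.Name {
		case "multiply_by_constant":
			if c, ok := tr.Arguments["value"].(float64); ok {
				value *= c
			}
		default:
			// Unknown transformation names should be caught by validation before they reach here.
		}
	}
	return value
}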
Main Application and Graceful Shutdown
Tying everything together requires a main function that initializes all components, starts the background watcher, and handles graceful shutdown signals from the operating system. This is a non-negotiable part of any real-world service.
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"go.uber.org/zap"
"feature-store/api"
"feature-store/manager"
)
func main() {
// Production-grade logging is essential.
logger, err := zap.NewProduction()
if err != nil {
log.Fatalf("can't initialize zap logger: %v", err)
}
defer logger.Sync()
etcdEndpoints := []string{"localhost:2379"} // Should be configurable
// 1. Initialize the FeatureManager
fm, err := manager.NewFeatureManager(etcdEndpoints, logger)
if err != nil {
logger.Fatal("Failed to create feature manager", zap.Error(err))
}
defer fm.Close()
// 2. Start the background watcher
// Use a cancellable context for the entire application lifetime.
appCtx, cancelApp := context.WithCancel(context.Background())
defer cancelApp()
if err := fm.StartWatcher(appCtx); err != nil {
logger.Fatal("Failed to start feature manager watcher", zap.Error(err))
}
// 3. Initialize the Gin server
server := api.NewFeatureServer(fm, logger)
httpServer := &http.Server{
Addr: ":8080",
Handler: server.Handler(),
}
// 4. Start the server in a goroutine so it doesn't block.
go func() {
logger.Info("Starting HTTP server", zap.String("address", httpServer.Addr))
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Fatal("Failed to start HTTP server", zap.Error(err))
}
}()
// 5. Wait for interrupt signal to gracefully shut down the server.
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
logger.Info("Shutting down server...")
// The context is used to inform the server it has 5 seconds to finish
// the requests it is currently handling
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelShutdown()
// This cancels the context for the etcd watcher, causing it to stop.
cancelApp()
if err := httpServer.Shutdown(shutdownCtx); err != nil {
logger.Fatal("Server forced to shutdown:", zap.Error(err))
}
logger.Info("Server exiting")
}
To test this entire system, you would start an etcd instance, run this Go application, and then use etcdctl to manipulate feature definitions. For example, to create a new feature:
etcdctl put /features/user_profile_age '{"name":"user_profile_age","version":1,"description":"Age of the user from their profile.","value_type":"int","data_source_type":"default","data_source":"","transformations":[],"cache_ttl":3600,"default_value":99,"enabled":true}'
Upon executing this command, the running Go service would log that it received and processed the update. A subsequent API call to fetch user_profile_age would now succeed. Updating it or deleting the key would have similarly immediate effects across all running service instances.
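Removing the feature again is a single command; every instance’s watcher receives the delete event and drops it from the in-memory map:
etcdctl del /features/user_profile_age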
Testing the Core Logic
Unit testing the FeatureManager is critical to ensure the locking and update logic is correct. We don’t need a real etcd server for this; we can mock the interactions. However, for a system this dependent on etcd’s behavior, an integration test against an embedded or containerized etcd is a better real-world approach. For brevity, here’s the conceptual structure for a unit test of the update mechanism.
package manager
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"feature-store/core"
)
// This test focuses on the internal logic of processPut/processDelete
// It doesn't use a real etcd client.
func TestFeatureManager_ProcessEvents(t *testing.T) {
logger := zap.NewNop()
fm := &FeatureManager{
logger: logger,
features: make(map[string]*ExecutableFeature),
}
def1 := core.FeatureDefinition{
Name: "feature1",
Version: 1,
Enabled: true,
DefaultValue: 10,
}
def1Json, _ := json.Marshal(def1)
def2 := core.FeatureDefinition{
Name: "feature2",
Version: 1,
Enabled: false, // Disabled feature
DefaultValue: 20,
}
def2Json, _ := json.Marshal(def2)
// Test 1: Add a new, enabled feature
fm.mu.Lock()
fm.processPut(def1Json)
fm.mu.Unlock()
f, ok := fm.GetFeature("feature1")
require.True(t, ok)
assert.Equal(t, int64(1), f.Def.Version)
assert.Equal(t, float64(10), f.Def.DefaultValue) // JSON unmarshals numbers to float64 by default
// Test 2: Adding a disabled feature should not add it to the map
fm.mu.Lock()
fm.processPut(def2Json)
fm.mu.Unlock()
_, ok = fm.GetFeature("feature2")
require.False(t, ok)
// Test 3: Deleting a feature
fm.mu.Lock()
fm.processDelete(etcdFeaturePrefix + "feature1")
fm.mu.Unlock()
_, ok = fm.GetFeature("feature1")
require.False(t, ok)
}
While this architecture solves the primary problem of dynamic updates, it is not without its limitations. The current implementation loads all feature definitions into the memory of every service instance, which could become a bottleneck if we scale to hundreds of thousands of definitions. A more advanced design might employ a two-tiered approach with on-demand loading for less-frequently used features. Furthermore, the feature execution logic itself is a placeholder; a production system would require a sandboxed, extensible plugin engine for transformations to prevent a buggy script from crashing the entire serving process. Finally, this design does not address atomic updates of multiple interdependent features. A change that requires modifying three features simultaneously could lead to inconsistent states during the rollout. This would necessitate a more complex transaction-like mechanism orchestrated through etcd, potentially using multi-key transactions to ensure all related changes are applied as a single atomic unit.
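etcd does provide the primitive such a mechanism would build on: a single transaction can put several keys, and all of them commit at one revision, so watchers receive the related events together. A minimal sketch, with an illustrative putFeaturesAtomically helper:
package publisher
import (
	"context"

	"go.etcd.io/etcd/clientv3"
)
// putFeaturesAtomically writes several interdependent feature definitions in one
// etcd transaction. payloads maps full keys (e.g. "/features/feature_a") to JSON values.
func putFeaturesAtomically(ctx context.Context, cli *clientv3.Client, payloads map[string]string) error {
	ops := make([]clientv3.Op, 0, len(payloads))
	for key, value := range payloads {
		ops = append(ops, clientv3.OpPut(key, value))
	}
	// All puts succeed or fail together and share a single revision.
	_, err := cli.Txn(ctx).Then(ops...).Commit()
	return err
}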