The divergence between offline training features and online serving features was becoming untenable. Our machine learning teams were operating in a world of Jupyter notebooks and bespoke Python scripts, where a feature’s definition was buried in imperative code. Promoting a feature to production involved a series of manual hand-offs, copy-pasting logic, and a prayer that the online implementation matched the offline one. This friction resulted in slower model iteration, production incidents caused by training/serving skew, and a complete lack of feature versioning or lineage. We needed a centralized, version-controlled system for defining, deploying, and serving features—a Feature Store. But simply adopting an off-the-shelf solution felt like treating a symptom, not the cause. Our core engineering principle is GitOps; infrastructure and application logic are declarative, versioned in Git, and reconciled automatically. The real challenge was to extend this paradigm to the fluid world of ML features.
Our initial concept was to build a Feature Store where every component—the underlying schema, the feature transformation logic, and the data ingestion pipelines—was managed declaratively. A pull request would be the sole mechanism for introducing or modifying a feature. This led us to a critical technology selection crossroads. For the serving layer, we needed more than a simple key-value store. Our models thrive on complex, relational features, like “the number of products a user’s friends have purchased in the last week.” Modeling this efficiently demanded a database that could handle both document-style entity storage and graph-based relationship traversal. ArangoDB, with its native multi-model capabilities, became the front-runner. It allowed us to represent users and products as documents and their interactions as edges in a graph, all queryable via a single, powerful AQL interface.
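To make the multi-model point concrete, here is a minimal sketch of how such a relational feature could be fetched through the official Go driver. The collection and edge names (users, friend_of, purchased), the purchased_at attribute, and the function itself are illustrative placeholders, not part of the schema introduced later:
package featurestore

import (
	"context"

	driver "github.com/arangodb/go-driver"
)

// friendsPurchaseCount runs a single multi-model AQL statement: it walks the
// (assumed) friend_of edge collection from one user vertex, then counts the
// purchased edges leaving each friend within the last 7 days.
func friendsPurchaseCount(ctx context.Context, db driver.Database, userKey string) (int, error) {
	query := `
	  RETURN COUNT(
	    FOR friend IN 1..1 ANY CONCAT("users/", @userKey) friend_of
	      FOR product, purchase IN 1..1 OUTBOUND friend purchased
	        FILTER purchase.purchased_at >= DATE_SUBTRACT(DATE_NOW(), 7, "day")
	        RETURN 1
	  )`
	cursor, err := db.Query(ctx, query, map[string]interface{}{"userKey": userKey})
	if err != nil {
		return 0, err
	}
	defer cursor.Close()
	var count int // the query always returns exactly one number
	if _, err := cursor.ReadDocument(ctx, &count); err != nil {
		return 0, err
	}
	return count, nil
}
A single AQL statement walks the friendship graph and aggregates over the purchase edges; in a plain key-value store this would take several round trips and client-side joins.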
For the declarative delivery mechanism, Argo CD was the obvious, albeit incomplete, choice. We already used it to manage our Kubernetes workloads. The pitfall here is that Argo CD is designed to reconcile Kubernetes manifests, not arbitrary database schemas or queries. The core of our project would be bridging this gap: teaching Argo CD how to speak ArangoDB. This meant building a custom reconciliation layer that could translate YAML definitions from a Git repository into idempotent operations against an ArangoDB cluster.
The first step was designing the Git repository, the single source of truth for our entire Feature Store. A monorepo approach was chosen to keep feature definitions, schema, and infrastructure configuration tightly coupled.
feature-store-repo/
├── environments/ # Argo CD ApplicationSet configuration for each environment
│ ├── dev.yaml
│ ├── staging.yaml
│ └── prod.yaml
│
├── infrastructure/ # Kubernetes manifests for core components
│ ├── arangodb/ # ArangoDB cluster deployment (e.g., using ArangoDB Operator)
│ ├── kafka-consumers/ # Data ingestion consumers
│ └── schema-reconciler/ # Our custom tool's Job manifests
│
└── features/ # The core feature definitions
├── entities/ # Schema definitions (collections, indexes)
│ ├── users.yaml
│ ├── products.yaml
│ └── viewed_edge.yaml
└── transformations/ # AQL queries defining features
├── user_session_count_v1.aql
├── user_session_count_v2.aql
└── product_graph_centrality_v1.aql
With the structure defined, we tackled the first major implementation challenge: declarative schema management. ArangoDB collections, indexes, and graph definitions needed to be managed via Git. A common mistake is to handle this with one-off migration scripts. That approach is imperative and violates GitOps principles. We needed an idempotent reconciler. We built a small, stateless Go utility named arangosync that runs inside a Kubernetes Job, triggered by an Argo CD PreSync hook. This ensures the database schema is in the correct state before the application pods that depend on it are deployed.
The features/entities/users.yaml file looks like this:
apiVersion: "db.feature-store/v1alpha1"
kind: ArangoCollection
metadata:
name: users
spec:
# Standard ArangoDB collection properties
keyOptions:
allowUserKeys: true
type: "traditional"
indexes:
- type: "persistent"
fields: ["last_seen_at"]
unique: false
sparse: false
- type: "persistent"
fields: ["email"]
unique: true
sparse: true
schema: # Enforce a validation schema at the database level
rule:
properties:
_key:
type: "string"
pattern: "^[a-zA-Z0-9_-]+$"
email:
type: "string"
created_at:
type: "string"
format: "date-time"
required: ["email", "created_at"]
The arangosync tool is responsible for parsing these YAML files and translating them into API calls to ArangoDB. The core logic focuses on idempotency: check if the resource exists; if not, create it; if it exists, check if it matches the definition and update it if necessary. A critical part of a real-world project is robust error handling and logging.
Here is a simplified snippet of the Go code for arangosync handling collection reconciliation:
package main
import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"

	driver "github.com/arangodb/go-driver"
	"github.com/arangodb/go-driver/http"
	"sigs.k8s.io/yaml" // YAML-to-JSON bridge: nested maps decode as map[string]interface{} and json tags apply
)
// ArangoCollection defines the structure of our custom YAML.
// sigs.k8s.io/yaml converts YAML to JSON before unmarshalling, so json tags
// are used here and the embedded go-driver types decode via their own json tags.
type ArangoCollection struct {
	ApiVersion string `json:"apiVersion"`
	Kind       string `json:"kind"`
	Metadata   struct {
		Name string `json:"name"`
	} `json:"metadata"`
	Spec struct {
		KeyOptions *driver.CollectionKeyOptions    `json:"keyOptions,omitempty"`
		Indexes    []map[string]interface{}        `json:"indexes,omitempty"`
		Schema     *driver.CollectionSchemaOptions `json:"schema,omitempty"`
	} `json:"spec"`
}
func main() {
// In a real implementation, these would come from env vars or flags
dbHost := os.Getenv("ARANGO_HOST")
dbUser := os.Getenv("ARANGO_USER")
dbPass := os.Getenv("ARANGO_PASSWORD")
dbName := "_system"
schemaPath := "/schemas" // Path where YAML files are mounted in the Job pod
// --- Database Connection ---
conn, err := http.NewConnection(http.ConnectionConfig{Endpoints: []string{dbHost}})
if err != nil {
log.Fatalf("Failed to create HTTP connection: %v", err)
}
client, err := driver.NewClient(driver.ClientConfig{
Connection: conn,
Authentication: driver.BasicAuthentication(dbUser, dbPass),
})
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
db, err := client.Database(context.Background(), dbName)
if err != nil {
log.Fatalf("Failed to open database: %v", err)
}
log.Println("Starting schema reconciliation...")
// --- Walk the schema directory and process each file ---
err = filepath.Walk(schemaPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() && (filepath.Ext(path) == ".yaml" || filepath.Ext(path) == ".yml") {
log.Printf("Processing file: %s", path)
processSchemaFile(context.Background(), db, path)
}
return nil
})
if err != nil {
log.Fatalf("Error during schema reconciliation: %v", err)
}
log.Println("Schema reconciliation completed successfully.")
}
// processSchemaFile reads a YAML file and applies the defined collection schema
func processSchemaFile(ctx context.Context, db driver.Database, filePath string) {
data, err := os.ReadFile(filePath)
if err != nil {
log.Printf("ERROR: Failed to read file %s: %v", filePath, err)
return
}
var collectionDef ArangoCollection
err = yaml.Unmarshal(data, &collectionDef)
if err != nil {
log.Printf("ERROR: Failed to unmarshal YAML from %s: %v", filePath, err)
return
}
if collectionDef.Kind != "ArangoCollection" {
log.Printf("Skipping file %s, not an ArangoCollection kind.", filePath)
return
}
collectionName := collectionDef.Metadata.Name
ensureCollection(ctx, db, collectionName, &collectionDef)
ensureIndexes(ctx, db, collectionName, collectionDef.Spec.Indexes)
}
// ensureCollection is the idempotent logic for creating/updating a collection
func ensureCollection(ctx context.Context, db driver.Database, name string, def *ArangoCollection) {
exists, err := db.CollectionExists(ctx, name)
if err != nil {
log.Printf("ERROR: Failed to check if collection %s exists: %v", name, err)
return
}
if !exists {
log.Printf("Collection '%s' does not exist. Creating...", name)
opts := &driver.CreateCollectionOptions{
KeyOptions: def.Spec.KeyOptions,
Schema: def.Spec.Schema,
}
_, err := db.CreateCollection(ctx, name, opts)
if err != nil {
log.Printf("ERROR: Failed to create collection %s: %v", name, err)
} else {
log.Printf("Successfully created collection '%s'", name)
}
} else {
log.Printf("Collection '%s' already exists. Verifying properties...", name)
// In a production tool, you'd fetch existing properties
// and perform a diff to apply updates. For this example, we'll keep it simple.
// For instance, updating the schema:
if def.Spec.Schema != nil {
coll, _ := db.Collection(ctx, name)
props, _ := coll.Properties(ctx)
// A real diffing logic is needed here
if props.Schema == nil { // Simplified check
log.Printf("Updating schema for collection '%s'", name)
err := coll.SetProperties(ctx, driver.SetCollectionPropertiesOptions{Schema: def.Spec.Schema})
if err != nil {
log.Printf("ERROR: Failed to set schema for collection %s: %v", name, err)
}
}
}
}
}
// ensureIndexes handles idempotent index creation.
func ensureIndexes(ctx context.Context, db driver.Database, collectionName string, indexes []map[string]interface{}) {
coll, err := db.Collection(ctx, collectionName)
if err != nil {
log.Printf("ERROR: Could not get collection %s for index creation: %v", collectionName, err)
return
}
existingIndexes, err := coll.Indexes(ctx)
if err != nil {
log.Printf("ERROR: Could not list existing indexes for %s: %v", collectionName, err)
return
}
existingIndexMap := make(map[string]bool)
for _, idx := range existingIndexes {
// Create a unique signature for an index to check for its existence.
// This is a simplification; a robust implementation would hash the index properties.
key := fmt.Sprintf("%s-%v", idx.Type(), idx.Fields())
existingIndexMap[key] = true
}
for _, indexDef := range indexes {
fields, _ := indexDef["fields"].([]interface{})
var fieldStrs []string
for _, f := range fields {
fieldStrs = append(fieldStrs, f.(string))
}
indexType := indexDef["type"].(string)
key := fmt.Sprintf("%s-%v", indexType, fieldStrs)
if _, exists := existingIndexMap[key]; exists {
log.Printf("Index on %v for collection '%s' already exists. Skipping.", fieldStrs, collectionName)
continue
}
log.Printf("Creating %s index on %v for collection '%s'", indexType, fieldStrs, collectionName)
switch indexType {
case "persistent":
opts := driver.EnsurePersistentIndexOptions{
Unique: indexDef["unique"].(bool),
Sparse: indexDef["sparse"].(bool),
}
_, _, err := coll.EnsurePersistentIndex(ctx, fieldStrs, &opts)
if err != nil {
log.Printf("ERROR: Failed to create persistent index on %v for %s: %v", fieldStrs, collectionName, err)
}
// Add cases for other index types (geo, fulltext, etc.)
default:
log.Printf("WARNING: Unsupported index type '%s'", indexType)
}
}
}
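The “verifying properties” branch above is intentionally thin. One way to flesh it out is a byte-level comparison of the declared and live validation schemas, sketched below under the assumption that the schema is the only property we reconcile in place; schemaNeedsUpdate is a hypothetical helper, not part of the driver:
package main

import (
	"bytes"
	"encoding/json"

	driver "github.com/arangodb/go-driver"
)

// schemaNeedsUpdate reports whether the validation schema declared in Git
// differs from the schema currently set on the collection. Both sides are
// passed through encoding/json (which sorts map keys), so the comparison is
// order-insensitive. Defaults filled in by the server (e.g. level) may still
// cause an occasional harmless re-apply; a stricter diff would normalize them.
func schemaNeedsUpdate(current, desired *driver.CollectionSchemaOptions) bool {
	if desired == nil {
		return false // nothing declared in Git; leave the collection untouched
	}
	if current == nil {
		return true
	}
	currentJSON, errCur := json.Marshal(current)
	desiredJSON, errDes := json.Marshal(desired)
	if errCur != nil || errDes != nil {
		return true // fail open: attempt the update and let ArangoDB validate it
	}
	return !bytes.Equal(currentJSON, desiredJSON)
}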
This Go program is containerized and run as a Kubernetes Job. In Argo CD, resource hooks are ordinary manifests annotated with argocd.argoproj.io/hook, so the Job manifest lives in the schema-reconciler path next to the Application definition and is annotated as a PreSync hook. Argo CD does not pass rendered manifests into hook pods, so the Job fetches the entity definitions from Git itself.
# infrastructure/schema-reconciler/argocd-app-schema.yaml
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: feature-store-schema
  namespace: argocd
spec:
  project: default
  source:
    repoURL: 'https://github.com/your-org/feature-store-repo.git'
    targetRevision: HEAD
    path: infrastructure/schema-reconciler # Contains the hook Job manifest below
  destination:
    server: 'https://kubernetes.default.svc'
    namespace: feature-store
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
---
# infrastructure/schema-reconciler/arangosync-job.yaml
# Argo CD resource hooks are ordinary manifests in the synced path, annotated
# with argocd.argoproj.io/hook; there is no hooks field on the Application spec.
# This Job runs before every sync, and the delete policy cleans it up on success.
apiVersion: batch/v1
kind: Job
metadata:
  generateName: arangosync-job- # Ensures the hook can be re-created on every sync
  annotations:
    argocd.argoproj.io/hook: PreSync
    argocd.argoproj.io/hook-delete-policy: HookSucceeded
spec:
  backoffLimit: 2
  template:
    spec:
      restartPolicy: Never
      # Argo CD does not mount rendered manifests into hook pods, so the Job
      # fetches the entity definitions itself: an init container clones the
      # repo into a shared volume and arangosync reads them from /schemas.
      # (A real setup would use a read-only deploy key or existing repo creds.)
      initContainers:
        - name: fetch-schemas
          image: alpine/git:latest
          command: ["/bin/sh", "-c"]
          args:
            - >-
              git clone --depth 1 https://github.com/your-org/feature-store-repo.git /tmp/repo
              && cp /tmp/repo/features/entities/*.yaml /schemas/
          volumeMounts:
            - name: schemas-vol
              mountPath: /schemas
      containers:
        - name: arangosync
          image: your-registry/arangosync:latest
          command: ["/app/arangosync"]
          env:
            - name: ARANGO_HOST
              valueFrom:
                secretKeyRef:
                  name: arangodb-credentials
                  key: host
            - name: ARANGO_USER
              valueFrom:
                secretKeyRef:
                  name: arangodb-credentials
                  key: user
            - name: ARANGO_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: arangodb-credentials
                  key: password
          volumeMounts:
            - name: schemas-vol
              mountPath: /schemas
      volumes:
        - name: schemas-vol
          emptyDir: {}
The next, more complex challenge was managing the feature definitions themselves. A feature is essentially a named, versioned AQL query. We stored these in .aql files. For instance, features/transformations/user_session_count_v2.aql:
// Feature: user_session_count
// Version: 2
// Description: Counts user sessions in the last 30 days, excluding bot traffic.
FOR u IN users
FILTER u._key == @user_key
LET session_count = (
FOR s IN 1..1 OUTBOUND u user_sessions
FILTER s.timestamp >= DATE_SUBTRACT(DATE_NOW(), 30, "day")
FILTER s.is_bot != true // <-- New logic for v2
COLLECT WITH COUNT INTO length
RETURN length
)[0] OR 0
RETURN {
user_key: u._key,
session_count_30d: session_count,
updated_at: DATE_NOW()
}
To manage these, we extended arangosync to scan the transformations directory. It parses the name and version from the AQL header comments and upserts each query into a versioned feature registry: a small feature_registry collection keyed by feature name and version, e.g. user_session_count_v2. (ArangoDB’s AQL user-defined functions are JavaScript-only and cannot access collections, so raw AQL cannot simply be registered as a server-side function; a registry collection that the online serving API executes by key gives us the same effect.) The name-plus-version key prevents collisions and provides a clear, stable API for consumers.
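The registration step is a small extension of the same reconciler. The sketch below assumes the feature_registry collection already exists (it can be declared like any other entity under features/entities/) and relies on the comment-header convention shown above; the regular expressions and the document layout are illustrative:
package main

import (
	"context"
	"fmt"
	"os"
	"regexp"

	driver "github.com/arangodb/go-driver"
)

var (
	featureNameRe    = regexp.MustCompile(`(?m)^//\s*Feature:\s*(\S+)`)
	featureVersionRe = regexp.MustCompile(`(?m)^//\s*Version:\s*(\d+)`)
)

// registerFeature reads one .aql file, extracts the name/version header and
// upserts the raw query text into the feature_registry collection. The UPSERT
// keeps repeated hook runs idempotent.
func registerFeature(ctx context.Context, db driver.Database, path string) error {
	raw, err := os.ReadFile(path)
	if err != nil {
		return fmt.Errorf("read %s: %w", path, err)
	}
	name := featureNameRe.FindSubmatch(raw)
	version := featureVersionRe.FindSubmatch(raw)
	if name == nil || version == nil {
		return fmt.Errorf("%s is missing its Feature/Version header", path)
	}
	key := fmt.Sprintf("%s_v%s", name[1], version[1])
	bindVars := map[string]interface{}{
		"key": key,
		"doc": map[string]interface{}{
			"_key":    key,
			"name":    string(name[1]),
			"version": string(version[1]),
			"aql":     string(raw),
		},
	}
	cursor, err := db.Query(ctx, `
	  UPSERT { _key: @key }
	  INSERT @doc
	  UPDATE @doc
	  IN feature_registry`, bindVars)
	if err != nil {
		return fmt.Errorf("upsert feature %s: %w", key, err)
	}
	return cursor.Close()
}
Running this from a PostSync hook mirrors the schema Job: the collections a feature queries must exist before the feature itself is registered.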
The entire process flow can be visualized as follows:
graph TD
    A[Data Scientist commits new feature to Git] --> B{GitHub PR};
    B -- CI Tests Pass & PR Merged --> C[Argo CD detects change in Git repo];
    C --> D{Argo CD Sync Starts};
    D --> E[PreSync Hook: arangosync Job];
    E -- 1. Reconciles Schema --> F[ArangoDB Collections & Indexes];
    D -- 2. Applies K8s Manifests --> G[Kafka Consumer Deployment];
    G -- Consumes data --> H[Raw Data in ArangoDB];
    D --> I[PostSync Hook: arangosync Job];
    I -- 3. Reconciles AQL --> J[Versioned AQL feature queries];

    subgraph "Online Serving"
        K[ML Model API] --> L[Feature Fetching Service];
        L -- "Runs versioned AQL with bind vars" --> J;
    end

    subgraph "Batch/Real-time Ingestion"
        G --> H;
    end

    style F fill:#d5f5e3,stroke:#27ae60
    style J fill:#d5f5e3,stroke:#27ae60
    style A fill:#eaf2f8,stroke:#3498db
    style C fill:#f9e79f,stroke:#f1c40f
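On the serving side of the diagram, the feature-fetching service resolves a feature by its versioned registry key and executes the stored AQL with bind parameters. A minimal sketch, assuming the feature_registry layout from the previous snippet and the @user_key bind parameter used in the query above:
package featurestore

import (
	"context"
	"fmt"

	driver "github.com/arangodb/go-driver"
)

// FeatureDoc mirrors the illustrative feature_registry document layout.
type FeatureDoc struct {
	Name    string `json:"name"`
	Version string `json:"version"`
	AQL     string `json:"aql"`
}

// fetchFeature resolves a versioned feature query by its registry key and
// executes it for a single entity, decoding the result into a generic map so
// callers can pick out fields such as session_count_30d.
func fetchFeature(ctx context.Context, db driver.Database, featureKey, userKey string) (map[string]interface{}, error) {
	registry, err := db.Collection(ctx, "feature_registry")
	if err != nil {
		return nil, err
	}
	var feat FeatureDoc
	if _, err := registry.ReadDocument(ctx, featureKey, &feat); err != nil {
		return nil, fmt.Errorf("unknown feature %s: %w", featureKey, err)
	}
	cursor, err := db.Query(ctx, feat.AQL, map[string]interface{}{"user_key": userKey})
	if err != nil {
		return nil, err
	}
	defer cursor.Close()
	result := map[string]interface{}{}
	if _, err := cursor.ReadDocument(ctx, &result); driver.IsNoMoreDocuments(err) {
		return nil, fmt.Errorf("no feature value for entity %s", userKey)
	} else if err != nil {
		return nil, err
	}
	return result, nil
}
For example, fetchFeature(ctx, db, "user_session_count_v2", someUserKey) returns the document produced by that query’s RETURN clause.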
The data ingestion pipeline, a set of Kafka consumers deployed as a Kubernetes Deployment, is also managed by Argo CD. These consumers are stateless; their job is to read from a raw data topic (e.g., clickstream_events), perform minimal validation, and write the data into the appropriate ArangoDB collections (users, products) and edges (viewed_edge). They do not contain any complex feature logic. The feature transformation happens at query time for real-time features, or via a separate batch process that runs the registered AQL queries for pre-computed features.
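For completeness, here is what the ArangoDB side of such a consumer could look like, with the Kafka plumbing omitted. The event shape and the deterministic edge key are assumptions; the point is that the consumer persists raw documents and edges only, never derived features:
package featurestore

import (
	"context"
	"fmt"

	driver "github.com/arangodb/go-driver"
)

// ViewEvent is an illustrative clickstream payload after minimal validation.
type ViewEvent struct {
	UserKey    string `json:"user_key"`
	ProductKey string `json:"product_key"`
	Timestamp  string `json:"timestamp"`
}

// writeViewedEdge persists one raw interaction as an edge between a user and
// a product. The deterministic _key makes redelivered Kafka messages
// idempotent: replaying the same event overwrites the same edge.
func writeViewedEdge(ctx context.Context, db driver.Database, ev ViewEvent) error {
	edges, err := db.Collection(ctx, "viewed_edge")
	if err != nil {
		return err
	}
	edge := map[string]interface{}{
		"_key":      fmt.Sprintf("%s-%s-%s", ev.UserKey, ev.ProductKey, ev.Timestamp),
		"_from":     "users/" + ev.UserKey,
		"_to":       "products/" + ev.ProductKey,
		"timestamp": ev.Timestamp,
	}
	// driver.WithOverwrite turns the insert into an overwrite if the key exists.
	_, err = edges.CreateDocument(driver.WithOverwrite(ctx), edge)
	return err
}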
Promoting features across environments (dev -> staging -> prod) is now a standardized Git workflow. We use an Argo CD ApplicationSet with a Git generator. The environments/*.yaml files define which branch the ApplicationSet should track for each environment.
# environments/prod.yaml
apiVersion: argoproj.io/v1alpha1
kind: ApplicationSet
metadata:
  name: feature-store-prod
  namespace: argocd
spec:
generators:
- git:
repoURL: https://github.com/your-org/feature-store-repo.git
revision: main # Production tracks the main branch
directories:
- path: 'infrastructure/*'
- path: 'features/*'
template:
metadata:
name: '{{path.basename}}-prod'
spec:
project: "production"
source:
repoURL: https://github.com/your-org/feature-store-repo.git
targetRevision: main
path: '{{path}}'
destination:
server: 'https://prod-k8s-cluster.svc'
namespace: 'feature-store'
A pull request from the staging branch to the main branch, once approved and merged, automatically triggers Argo CD to sync the new feature versions to the production environment. This process provides a full audit trail and the ability to roll back simply by reverting a Git commit.
This architecture provides a solid foundation, but it’s not without its limitations and areas for future improvement. The custom arangosync tool, while effective, is bespoke. A more cloud-native approach would be to develop a full-fledged Kubernetes Operator for these ArangoDB resources (ArangoCollection, ArangoFeatureQuery). This would replace the PreSync/PostSync hooks with a continuous reconciliation loop, providing more robust status reporting and error handling directly through the Kubernetes API. Furthermore, the real-time computation of complex graph features can introduce latency at scale. We will need to implement a caching layer (e.g., Redis) for frequently accessed features and establish a clear policy on which features are computed on-the-fly versus which are pre-materialized in batch. Finally, this solution focuses primarily on the declarative deployment of feature logic; a comprehensive feature backfilling strategy, likely orchestrated by a tool like Airflow that can trigger batch jobs using our versioned AQL queries, is a necessary next step to make new features available over historical data.