The model predictions were drifting, but not because of concept drift. The root cause was more insidious: data corruption at the feature level. Our real-time fraud detection system was processing multiple event streams for a single user—clickstream data, transaction initiations, profile updates—and attempting to update the user’s feature vector in near real-time. In high-traffic scenarios, race conditions were leading to partial updates. A transaction feature might be updated, but the corresponding session activity feature was overwritten by a concurrent process, resulting in a feature vector that represented an impossible state. This lack of atomicity was poisoning our training data and causing erratic real-time inference.
Our initial architecture used a document database, optimized for fast writes and schema flexibility. It became clear, however, that we had sacrificed consistency for speed, a trade-off that was no longer acceptable. The business demanded transactional integrity for feature sets. The problem wasn’t just about updating a single value; it was about ensuring that a group of logically related features changed as a single, atomic unit. This is a classic ACID problem, but solving it within a distributed, high-throughput MLOps pipeline running on Kubernetes presented a significant challenge.
The decision was made to re-architect the core of our real-time feature engineering service. We needed a design that could enforce business invariants and guarantee atomicity. This led us directly to Domain-Driven Design (DDD). Specifically, the Aggregate pattern felt like the perfect tool. An Aggregate is a cluster of associated objects that are treated as a single unit for data changes. The root of the Aggregate is the only object that external clients can interact with, and it’s responsible for upholding the consistency of the entire cluster. Our “Feature Aggregate” would encapsulate all features for a given entity (e.g., a customer) and ensure any modifications were valid and atomic.
To support this, we required a persistence layer that offered true ACID guarantees. We selected PostgreSQL, a battle-tested relational database known for its robustness and transactional integrity. The final piece of the puzzle was our runtime environment: Azure Kubernetes Service (AKS). The challenge here was to run this stateful, transactional component reliably on a platform typically geared towards stateless services. This meant leveraging StatefulSets and PersistentVolumes to provide stable network identities and durable storage for our service and its database.
This is the log of that build, moving from a conceptual DDD model to a production-grade, ACID-compliant service on AKS.
Defining the Domain Model: The Feature Aggregate
The first step was to model the domain using C#. We are not just storing key-value pairs; we are modeling a business concept with rules and behaviors. The FeatureAggregate is the heart of this model.
The core principles are:
- The FeatureAggregate is the consistency boundary.
- External objects can only hold a reference to the Aggregate Root.
- The Aggregate Root enforces all invariants.
Here is the C# implementation of our domain model. Note the private setters and the exposure of behavior through methods (UpdateFeature, UpdateFeatures), which is a key tenet of DDD.
// --- Domain/FeatureAggregate.cs ---
using System;
using System.Collections.Generic;
using System.Linq;
namespace TransactionalFeatureStore.Domain
{
/// <summary>
/// Represents the Aggregate Root for a collection of features for a specific entity.
/// It is the only way to modify the state of the features within this boundary,
/// ensuring all invariants are maintained.
/// </summary>
public class FeatureAggregate
{
// EF Core needs a private constructor for materialization
private FeatureAggregate()
{
_features = new List<Feature>();
}
public FeatureAggregate(Guid entityId)
{
if (entityId == Guid.Empty)
throw new ArgumentException("Entity ID cannot be empty.", nameof(entityId));
EntityId = entityId;
Id = Guid.NewGuid();
Version = 1;
LastModifiedUtc = DateTime.UtcNow;
_features = new List<Feature>();
}
// The unique identifier for the entity this feature set belongs to (e.g., customer ID)
public Guid EntityId { get; private set; }
// The aggregate's own unique identifier
public Guid Id { get; private set; }
// Row version used for optimistic concurrency control.
// Mapped to PostgreSQL's xmin system column, which Npgsql exposes as a uint.
public uint Version { get; private set; }
public DateTime LastModifiedUtc { get; private set; }
// Backing field for the collection navigation. EF Core materializes Features
// into this field, so it must be enumerable as Feature (a Dictionary keyed by
// feature name cannot be used as a backing field for the navigation).
private readonly List<Feature> _features;
public IReadOnlyCollection<Feature> Features => _features.AsReadOnly();
/// <summary>
/// Business logic for updating a single feature. Encapsulates validation.
/// </summary>
public void UpdateFeature(string name, object value, DateTime timestamp)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Feature name cannot be empty.", nameof(name));
// In a real-world project, value validation would be far more complex.
// This could involve checking types, ranges, or cross-feature rules.
if (value == null)
throw new ArgumentNullException(nameof(value), "Feature value cannot be null.");
var existingFeature = _features.FirstOrDefault(f => f.Name == name);
if (existingFeature != null)
{
    existingFeature.UpdateValue(value, timestamp);
}
else
{
    var newFeature = new Feature(name, value, timestamp, this.Id);
    _features.Add(newFeature);
}
this.LastModifiedUtc = DateTime.UtcNow;
}
/// <summary>
/// Updates a batch of features. The key is that this entire operation
/// will be persisted within a single transaction.
/// </summary>
public void UpdateFeatures(Dictionary<string, object> featureUpdates, DateTime timestamp)
{
// This loop is part of a single unit of work. If one update fails,
// the entire transaction should be rolled back by the persistence layer.
foreach (var update in featureUpdates)
{
UpdateFeature(update.Key, update.Value, timestamp);
}
}
}
/// <summary>
/// Represents a single feature as an Entity within the Aggregate.
/// It has its own identity but its lifecycle is managed by the FeatureAggregate.
/// </summary>
public class Feature
{
private Feature() { } // For EF Core
public Feature(string name, object value, DateTime timestamp, Guid aggregateId)
{
// Internal consistency checks
Name = name;
Value = value.ToString(); // Persisting as string for simplicity, real-world might use JSONB or specific types
ValueType = value.GetType().AssemblyQualifiedName;
Timestamp = timestamp;
AggregateId = aggregateId;
Id = Guid.NewGuid();
}
public Guid Id { get; private set; }
public string Name { get; private set; }
public string Value { get; private set; }
public string ValueType { get; private set; }
public DateTime Timestamp { get; private set; }
public Guid AggregateId { get; private set; }
internal void UpdateValue(object value, DateTime timestamp)
{
Value = value.ToString();
ValueType = value.GetType().AssemblyQualifiedName;
Timestamp = timestamp;
}
}
}
This model is pure domain logic, with no dependencies on infrastructure. The FeatureAggregate ensures that no Feature can exist or be modified outside of its control, which is the foundation of our consistency guarantee.
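Before wiring up persistence, here is a minimal usage sketch showing how a caller interacts only with the Aggregate Root. It is purely illustrative and not part of the production service; the feature names are made up.
// --- Illustrative usage sketch (not production code) ---
using System;
using System.Collections.Generic;
using TransactionalFeatureStore.Domain;

var aggregate = new FeatureAggregate(entityId: Guid.NewGuid());

// A batch of logically related features goes through the root in one call,
// so validation and the consistency boundary live in a single place.
aggregate.UpdateFeatures(new Dictionary<string, object>
{
    ["transaction_count_1h"] = 4,
    ["session_click_rate"] = 0.37
}, DateTime.UtcNow);

foreach (var feature in aggregate.Features)
    Console.WriteLine($"{feature.Name} = {feature.Value}");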
Persistence and ACID Transactions
With the domain model defined, the next task was to persist it transactionally. We chose Entity Framework (EF) Core to map our objects to PostgreSQL. The DbContext in EF Core conveniently provides the Unit of Work pattern, where all changes tracked within a single DbContext instance are saved in one atomic transaction when SaveChangesAsync() is called.
A critical mistake in such systems is to neglect concurrency. What happens if two processes fetch the same aggregate, both make changes, and then try to save? The last one wins, overwriting the changes of the first. To prevent this, we implemented optimistic concurrency control using a version property.
Here’s the repository implementation and the EF Core configuration.
// --- Infrastructure/Persistence/FeatureRepository.cs ---
using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;
using TransactionalFeatureStore.Domain;
namespace TransactionalFeatureStore.Infrastructure.Persistence
{
public interface IFeatureRepository
{
Task<FeatureAggregate?> GetByEntityIdAsync(Guid entityId);
Task AddAsync(FeatureAggregate aggregate);
Task SaveChangesAsync();
}
public class FeatureRepository : IFeatureRepository
{
private readonly FeatureDbContext _context;
public FeatureRepository(FeatureDbContext context)
{
_context = context;
}
public async Task AddAsync(FeatureAggregate aggregate)
{
await _context.FeatureAggregates.AddAsync(aggregate);
}
public async Task<FeatureAggregate?> GetByEntityIdAsync(Guid entityId)
{
// We must include the Features for them to be tracked by the DbContext
return await _context.FeatureAggregates
.Include(a => a.Features)
.SingleOrDefaultAsync(a => a.EntityId == entityId);
}
public async Task SaveChangesAsync()
{
// EF Core's SaveChangesAsync wraps all tracked changes (adds, updates, deletes)
// in a single database transaction. If any part of it fails, the entire
// operation is rolled back, guaranteeing ACID.
// The optimistic concurrency check also happens here. If the 'Version'
// in the database does not match the 'Version' of the in-memory object,
// a DbUpdateConcurrencyException will be thrown.
await _context.SaveChangesAsync();
}
}
}
The DbContext configuration is where we tell EF Core how to map our domain model to database tables and, crucially, how to handle the optimistic locking.
// --- Infrastructure/Persistence/FeatureDbContext.cs ---
using Microsoft.EntityFrameworkCore;
using TransactionalFeatureStore.Domain;
namespace TransactionalFeatureStore.Infrastructure.Persistence
{
public class FeatureDbContext : DbContext
{
public FeatureDbContext(DbContextOptions<FeatureDbContext> options) : base(options) { }
public DbSet<FeatureAggregate> FeatureAggregates { get; set; }
public DbSet<Feature> Features { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
var aggregate = modelBuilder.Entity<FeatureAggregate>();
aggregate.ToTable("FeatureAggregates");
aggregate.HasKey(a => a.Id);
// EntityId is how we will look up the aggregate, so it needs an index.
aggregate.HasIndex(a => a.EntityId).IsUnique();
// This is the critical piece for optimistic concurrency.
// We map the 'Version' property to a PostgreSQL xmin system column,
// which is an internal transaction ID. It's automatically updated by Postgres
// on every row update, making it a perfect, reliable row version.
aggregate.Property(a => a.Version)
.IsRowVersion()
.HasColumnName("xmin")
.HasColumnType("xid");
// Define the one-to-many relationship.
// The features are owned by the aggregate. If the aggregate is deleted, so are its features.
aggregate.HasMany(a => a.Features)
.WithOne()
.HasForeignKey(f => f.AggregateId)
.OnDelete(DeleteBehavior.Cascade);
// EF Core needs to know how to access the private collection.
aggregate.Metadata.FindNavigation(nameof(FeatureAggregate.Features))!
.SetPropertyAccessMode(PropertyAccessMode.Field);
var feature = modelBuilder.Entity<Feature>();
feature.ToTable("Features");
feature.HasKey(f => f.Id);
feature.HasIndex(f => f.Name);
}
}
}
Using PostgreSQL’s internal xmin column for row versioning is a robust technique. It avoids the need for manual version incrementing and is guaranteed to be updated by the database itself on every UPDATE. If EF Core issues an UPDATE with a WHERE clause that includes the old xmin value and that row has already been changed by another transaction, the update will affect zero rows, and EF Core will correctly throw a DbUpdateConcurrencyException.
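For context on how these pieces hang together at runtime, here is a rough Program.cs sketch. It is illustrative rather than a copy of our production startup: it assumes the Npgsql.EntityFrameworkCore.PostgreSQL provider, ASP.NET Core minimal APIs with implicit usings, and a hypothetical /features endpoint that applies a batch of updates through the repository.
// --- Program.cs (illustrative sketch, not the full production startup) ---
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using TransactionalFeatureStore.Domain;
using TransactionalFeatureStore.Infrastructure.Persistence;

var builder = WebApplication.CreateBuilder(args);

// The pod env var "ConnectionStrings__Database" surfaces here as "Database".
builder.Services.AddDbContext<FeatureDbContext>(options =>
    options.UseNpgsql(builder.Configuration.GetConnectionString("Database")));

// Scoped lifetime: one DbContext (one Unit of Work) per request.
builder.Services.AddScoped<IFeatureRepository, FeatureRepository>();

var app = builder.Build();

// Hypothetical endpoint: fetch-or-create the aggregate, apply the batch,
// and persist it atomically via SaveChangesAsync.
app.MapPost("/features/{entityId:guid}", async (
    Guid entityId, Dictionary<string, string> updates, IFeatureRepository repo) =>
{
    var aggregate = await repo.GetByEntityIdAsync(entityId);
    if (aggregate is null)
    {
        aggregate = new FeatureAggregate(entityId);
        await repo.AddAsync(aggregate);
    }

    aggregate.UpdateFeatures(
        updates.ToDictionary(kv => kv.Key, kv => (object)kv.Value),
        DateTime.UtcNow);

    await repo.SaveChangesAsync(); // may throw DbUpdateConcurrencyException
    return Results.NoContent();
});

app.Run();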
Containerization and Deployment to AKS
With the core logic in place, we needed to deploy it. The service was packaged as a Docker container.
Dockerfile:
# Use the official ASP.NET Core runtime image
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 8080
# Use the SDK image to build the application
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["TransactionalFeatureStore.csproj", "./"]
RUN dotnet restore "./TransactionalFeatureStore.csproj"
COPY . .
WORKDIR "/src/."
RUN dotnet build "TransactionalFeatureStore.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "TransactionalFeatureStore.csproj" -c Release -o /app/publish /p:UseAppHost=false
# Final stage/image
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "TransactionalFeatureStore.dll"]
The real challenge was deploying this stateful application and its PostgreSQL database to AKS. We needed:
- A stable identity and storage for our service pods.
- A persistent, reliable database instance.
For our service, a StatefulSet is the correct Kubernetes workload. Unlike a Deployment, it provides stable, unique network identifiers (pod-name-0, pod-name-1) and stable, persistent storage. Each pod gets its own PersistentVolumeClaim based on a template.
For PostgreSQL, a production setup would use a managed service like Azure Database for PostgreSQL or a robust Kubernetes Operator. For this build log, we’ll deploy a single-instance PostgreSQL as a StatefulSet as well, to demonstrate the principle.
Here’s the combined Kubernetes manifest (feature-store.yaml):
apiVersion: v1
kind: Service
metadata:
name: postgres-svc
labels:
app: postgres
spec:
ports:
- port: 5432
name: postgres
clusterIP: None # Headless service for direct StatefulSet pod access
selector:
app: postgres
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres-sts
spec:
serviceName: "postgres-svc"
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:15
ports:
- containerPort: 5432
env:
- name: POSTGRES_DB
value: "featuredb"
- name: POSTGRES_USER
value: "user"
- name: POSTGRES_PASSWORD
value: "password"
volumeMounts:
- name: postgres-storage
mountPath: /var/lib/postgresql/data
volumeClaimTemplates:
- metadata:
name: postgres-storage
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 5Gi # In production, use a proper storage class
---
apiVersion: v1
kind: Service
metadata:
name: feature-store-svc
spec:
selector:
app: feature-store
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: feature-store-sts
spec:
serviceName: "feature-store-svc"
replicas: 2
selector:
matchLabels:
app: feature-store
template:
metadata:
labels:
app: feature-store
spec:
containers:
- name: feature-store
image: youracr.azurecr.io/transactional-feature-store:v1.0.0
ports:
- containerPort: 8080
env:
- name: ConnectionStrings__Database
value: "Host=postgres-svc;Port=5432;Database=featuredb;Username=user;Password=password"
# Liveness and Readiness probes are CRITICAL for stateful services
# They tell Kubernetes if the pod is healthy and ready to accept traffic.
readinessProbe:
httpGet:
path: /health/ready
port: 8080
initialDelaySeconds: 10
periodSeconds: 5
livenessProbe:
httpGet:
path: /health/live
port: 8080
initialDelaySeconds: 20
periodSeconds: 15
Deploying this with kubectl apply -f feature-store.yaml creates the PostgreSQL instance and two replicas of our feature store service. Each service pod connects to the single PostgreSQL instance via the headless service postgres-svc. The transactional logic is handled by the database, and our application simply needs to manage the Unit of Work and handle the occasional DbUpdateConcurrencyException.
Here is the architectural flow:
graph TD
  subgraph "Azure AKS Cluster"
    subgraph "StatefulSet: feature-store-sts"
      FS1[Pod: feature-store-0]
      FS2[Pod: feature-store-1]
    end
    subgraph "StatefulSet: postgres-sts"
      DB_POD[Pod: postgres-0] --- PV[Persistent Volume]
    end
    SVC[Service: feature-store-svc] --> FS1
    SVC --> FS2
    FS1 --> PG_SVC[Headless Service: postgres-svc]
    FS2 --> PG_SVC
    PG_SVC --> DB_POD
  end
  Client[ML Pipeline / API Client] -- HTTP Request --> Ingress[AKS Ingress]
  Ingress --> SVC
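One detail the manifest takes for granted: the /health/live and /health/ready paths must actually exist in the application. Below is a minimal sketch of that wiring, assuming the Microsoft.Extensions.Diagnostics.HealthChecks.EntityFrameworkCore package and the Program.cs shape sketched earlier; it is one reasonable way to do it, not the only one.
// --- Program.cs excerpt (sketch): endpoints the probes expect ---
// Requires Microsoft.Extensions.Diagnostics.HealthChecks.EntityFrameworkCore
// and using Microsoft.AspNetCore.Diagnostics.HealthChecks; for HealthCheckOptions.

builder.Services.AddHealthChecks()
    // Readiness includes a database round-trip; liveness stays cheap.
    .AddDbContextCheck<FeatureDbContext>("database", tags: new[] { "ready" });

// Liveness: the process is up and the request pipeline responds.
app.MapHealthChecks("/health/live", new HealthCheckOptions
{
    Predicate = _ => false // run no checks; return 200 as long as the app is alive
});

// Readiness: only route traffic to the pod once PostgreSQL is reachable.
app.MapHealthChecks("/health/ready", new HealthCheckOptions
{
    Predicate = check => check.Tags.Contains("ready")
});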
Testing for Concurrency and Atomicity
A common mistake is to write domain logic and persistence without adequately testing the failure modes. We needed to prove that our optimistic locking and transactional writes worked as expected.
Here is an integration test snippet using xUnit and Testcontainers to spin up a real PostgreSQL instance for the test run, providing high fidelity.
// --- Tests/Integration/ConcurrencyTests.cs ---
using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;
using Testcontainers.PostgreSql;
using Xunit;
using TransactionalFeatureStore.Domain;
// ... other usings
public class ConcurrencyTests : IAsyncLifetime
{
private readonly PostgreSqlContainer _dbContainer = new PostgreSqlBuilder().Build();
public async Task InitializeAsync() => await _dbContainer.StartAsync();
public async Task DisposeAsync() => await _dbContainer.DisposeAsync().AsTask();
private FeatureDbContext CreateDbContext()
{
var options = new DbContextOptionsBuilder<FeatureDbContext>()
.UseNpgsql(_dbContainer.GetConnectionString())
.Options;
var context = new FeatureDbContext(options);
context.Database.EnsureCreated();
return context;
}
[Fact]
public async Task When_Two_Processes_Update_Same_Aggregate_Concurrently_One_Should_Fail()
{
// ARRANGE
var entityId = Guid.NewGuid();
var initialAggregate = new FeatureAggregate(entityId);
initialAggregate.UpdateFeature("initial_feature", 100, DateTime.UtcNow);
// Save the initial state
await using var setupContext = CreateDbContext();
var setupRepo = new FeatureRepository(setupContext);
await setupRepo.AddAsync(initialAggregate);
await setupRepo.SaveChangesAsync();
// ACT
// Simulate two concurrent processes fetching the same aggregate
await using var context1 = CreateDbContext();
var repo1 = new FeatureRepository(context1);
var aggregate1 = await repo1.GetByEntityIdAsync(entityId);
await using var context2 = CreateDbContext();
var repo2 = new FeatureRepository(context2);
var aggregate2 = await repo2.GetByEntityIdAsync(entityId);
Assert.NotNull(aggregate1);
Assert.NotNull(aggregate2);
// Process 1 updates and saves successfully
aggregate1.UpdateFeature("clicks", 5, DateTime.UtcNow);
await repo1.SaveChangesAsync();
// Process 2, working with STALE data, now tries to update and save
aggregate2.UpdateFeature("time_on_page", 120.5, DateTime.UtcNow);
// ASSERT
// This should fail because the 'Version' (xmin) of aggregate2 is now out of date.
// The database row was modified by the first process.
await Assert.ThrowsAsync<DbUpdateConcurrencyException>(() => repo2.SaveChangesAsync());
}
}
This test explicitly simulates the race condition we aimed to solve. It proves that our persistence strategy correctly prevents stale writes, thus preserving the integrity of our feature vectors. The process that receives the DbUpdateConcurrencyException would then be responsible for retrying the operation: re-fetch the latest version of the aggregate, re-apply its changes, and attempt to save again.
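A minimal sketch of what that retry loop might look like follows. It is illustrative rather than our production handler: the attempt count and backoff are arbitrary, and it assumes an IServiceScopeFactory so each attempt gets a fresh DbContext and therefore fresh state.
// --- Sketch: retry-on-conflict helper (illustrative) ---
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using TransactionalFeatureStore.Infrastructure.Persistence;

public static class FeatureUpdateRetry
{
    public static async Task UpdateWithRetryAsync(
        IServiceScopeFactory scopeFactory, Guid entityId,
        Dictionary<string, object> updates, int maxAttempts = 3)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            // A fresh scope gives a fresh DbContext, discarding stale tracked state.
            using var scope = scopeFactory.CreateScope();
            var repo = scope.ServiceProvider.GetRequiredService<IFeatureRepository>();

            // Re-fetch the latest version of the aggregate on every attempt.
            var aggregate = await repo.GetByEntityIdAsync(entityId)
                            ?? throw new InvalidOperationException("Aggregate not found.");

            aggregate.UpdateFeatures(updates, DateTime.UtcNow);

            try
            {
                await repo.SaveChangesAsync();
                return; // success
            }
            catch (DbUpdateConcurrencyException) when (attempt < maxAttempts)
            {
                // Another writer won the race; back off briefly and retry with fresh state.
                await Task.Delay(TimeSpan.FromMilliseconds(50 * attempt));
            }
        }
    }
}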
Lingering Issues and Future Iterations
This architecture successfully solves the problem of atomic feature updates for our MLOps platform. However, it’s not a silver bullet. The pragmatic senior engineer knows every solution introduces new trade-offs.
The primary limitation is the contention on a single database row for a highly active entity. While optimistic locking prevents corruption, it doesn’t solve the “thundering herd” problem. If hundreds of events per second are trying to update the same user’s feature aggregate, most will fail with concurrency exceptions and have to retry, leading to high latency and wasted compute. This design is perfect for moderately active entities or for updates that are naturally batched, but it is not suitable for hyper-scale, per-event updates on the same entity.
A potential future iteration could explore a CQRS (Command Query Responsibility Segregation) pattern. Writes would still go through the transactional aggregate, but reads (for model inference) could be served from a denormalized, eventually consistent read model (e.g., in Redis or a document DB). This would optimize for fast reads, which is often the more critical path in real-time inference.
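As a rough illustration of that direction, the projection step might look something like the sketch below. IReadModelStore is a hypothetical abstraction standing in for Redis or a document DB; it does not exist in the current codebase.
// --- Sketch: projecting the aggregate into a denormalized read model (hypothetical) ---
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using TransactionalFeatureStore.Domain;

// Hypothetical abstraction over Redis / a document DB.
public interface IReadModelStore
{
    Task SetFeatureVectorAsync(Guid entityId, IReadOnlyDictionary<string, string> features);
}

public class FeatureReadModelProjector
{
    private readonly IReadModelStore _store;
    public FeatureReadModelProjector(IReadModelStore store) => _store = store;

    // Called after a successful transactional write; readers get an eventually
    // consistent, flattened view optimized for low-latency inference.
    public Task ProjectAsync(FeatureAggregate aggregate) =>
        _store.SetFeatureVectorAsync(
            aggregate.EntityId,
            aggregate.Features.ToDictionary(f => f.Name, f => f.Value));
}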
Furthermore, managing database schema migrations in a StatefulSet-based deployment on Kubernetes requires a careful, managed process. A simple pod restart is not enough. This often involves using init containers for migration scripts or dedicated database migration tools that understand how to perform rolling updates without downtime. This operational complexity is a non-trivial cost of running stateful workloads on Kubernetes.
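As one illustration of the kind of tooling involved, the sketch below applies EF Core migrations from a dedicated one-off process, for example a Kubernetes Job or init container that runs before the StatefulSet rolls. It assumes migrations have been added to the project and reuses the same connection-string environment variable as the service; it is a sketch, not our actual migration pipeline.
// --- Sketch: applying EF Core migrations from a dedicated one-off process ---
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using TransactionalFeatureStore.Infrastructure.Persistence;

// Assumes the same DbContext registration as the main service.
var services = new ServiceCollection()
    .AddDbContext<FeatureDbContext>(o =>
        o.UseNpgsql(Environment.GetEnvironmentVariable("ConnectionStrings__Database")))
    .BuildServiceProvider();

using var scope = services.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<FeatureDbContext>();

// MigrateAsync applies any pending migrations and is idempotent, but the schema
// changes themselves still need to stay backward-compatible with pods that are
// running the previous image during the rolling update.
await db.Database.MigrateAsync();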