Our MLOps platform hit a wall. The core problem was feature engineering. Batch ETL jobs running nightly were too slow for our fraud detection and real-time recommendation models. Ad-hoc Python scripts used for training produced features that were subtly different from the online Java services, leading to maddening training-serving skew. We were spending more time debugging data pipelines than improving models. The initial goal was clear: build a decoupled, real-time feature engineering pipeline that guarantees consistency and is managed as part of the end-to-end ML lifecycle.
The initial whiteboard sketch was an event-driven architecture. Raw event streams (clicks, transactions, etc.) would be the source of truth. A set of microservices would process these streams to generate feature vectors, making them available for both model training and online inference. This immediately brought up a complex set of technology choices. We needed a streaming platform that could handle high throughput and also store historical data for retraining. We needed a robust language for the feature engineering services that could offer performance and maintainability. We needed an orchestration layer to manage the entire workflow from feature generation to model deployment. And critically, we needed a way to manage the infrastructure and configuration declaratively.
This led to our final stack selection. For the streaming backbone, we chose Apache Pulsar over Kafka. Its built-in multi-tenancy was a huge win for isolating different ML teams’ data, and its tiered storage architecture, offloading older data to S3, solved the historical replay problem without needing a separate data warehouse. For the feature engineering microservices, we selected Kotlin. The JVM’s performance is proven, but Kotlin’s modern syntax, null safety, and excellent coroutine support for asynchronous I/O made it a more productive and safer choice than Java or Scala. Kubeflow was the natural choice for the orchestration layer, being the de facto standard for MLOps on Kubernetes. To manage the infrastructure—the Kubernetes cluster, Pulsar, and other components—we used Terraform for its declarative approach. The non-obvious, but crucial, fifth element was HashiCorp Consul. We decided to use it not just for service discovery, but as a dynamic configuration store for our feature engineering logic, allowing us to change parameters like window sizes on the fly without a full service redeployment. This was the key to unlocking operational agility.
Phase 1: Declarative Infrastructure with Terraform
Before writing a single line of Kotlin, we needed a stable, repeatable environment. A common mistake is to manually set up clusters and dependencies; this is a recipe for disaster in production. Everything had to be code. Our foundation was an EKS cluster, with managed node groups for easier maintenance.
Here is the core Terraform module for the EKS cluster. The key considerations here are defining a VPC with private and public subnets, and setting up IAM roles with the correct permissions for Kubeflow and other components to interact with AWS APIs.
# modules/eks/main.tf
variable "cluster_name" {
type = string
description = "The name of the EKS cluster."
}
variable "vpc_id" {
type = string
description = "ID of the VPC where the cluster will be deployed."
}
variable "private_subnet_ids" {
type = list(string)
description = "List of private subnet IDs for worker nodes."
}
resource "aws_eks_cluster" "main" {
name = var.cluster_name
role_arn = aws_iam_role.cluster_role.arn
version = "1.27"
vpc_config {
subnet_ids = var.private_subnet_ids
endpoint_private_access = true
endpoint_public_access = false # Best practice for production
}
depends_on = [
aws_iam_role_policy_attachment.cluster_amazon_eks_cluster_policy,
aws_iam_role_policy_attachment.cluster_amazon_eks_service_policy,
]
}
resource "aws_eks_node_group" "ml_compute" {
cluster_name = aws_eks_cluster.main.name
node_group_name = "${var.cluster_name}-ml-compute"
node_role_arn = aws_iam_role.node_group_role.arn
subnet_ids = var.private_subnet_ids
instance_types = ["m5.2xlarge"]
scaling_config {
desired_size = 3
max_size = 10
min_size = 2
}
update_config {
max_unavailable = 1
}
# Ensure nodes can pull images and communicate with the control plane
depends_on = [
aws_iam_role_policy_attachment.node_amazon_eks_worker_node_policy,
aws_iam_role_policy_attachment.node_amazon_ec2_container_registry_read_only,
aws_iam_role_policy_attachment.node_amazon_eks_cni_policy,
]
}
# IAM roles and policies are defined below...
# ... (aws_iam_role.cluster_role, aws_iam_role.node_group_role, etc.)
With the Kubernetes cluster defined, the next step was deploying Pulsar and Consul. We used the official Helm charts, managed via Terraform’s helm_release resource. This is where things get tricky in a real-world project: the default Helm values are rarely suitable for production.
For Pulsar, the most critical part is configuring persistence for ZooKeeper and the BookKeeper bookies. Losing their state means losing all your data. We used AWS EBS volumes provisioned via a StorageClass.
# modules/pulsar/main.tf
resource "helm_release" "pulsar" {
name = "pulsar"
repository = "https://pulsar.apache.org/charts"
chart = "pulsar"
namespace = "pulsar"
version = "3.1.0"
create_namespace = true
# Production-grade configuration overrides
values = [
yamlencode({
zookeeper = {
replicaCount = 3
persistence = {
enabled = true
storageClass = "gp2-csi" # Assumes a gp2 storage class exists
size = "20Gi"
}
}
bookkeeper = {
replicaCount = 3
persistence = {
enabled = true
storageClass = "gp2-csi"
size = "100Gi"
}
resources = {
requests = { memory = "8Gi", cpu = "2000m" }
limits = { memory = "8Gi", cpu = "4000m" }
}
}
broker = {
replicaCount = 3
configData = {
# Enable tiered storage offloading to S3 for long-term retention
"managedLedgerOffloadDriver" = "s3"
"s3ManagedLedgerOffloadBucket" = "our-pulsar-long-term-storage-bucket"
"s3ManagedLedgerOffloadRegion" = "us-east-1"
}
}
proxy = {
replicaCount = 2
}
# Disable components we don't need to save resources
components = {
functions = false # We run our logic in dedicated pods
presto = false
}
# Enable TLS and Authentication in a real scenario
tls = {
enabled = false # For simplicity in this example
}
})
]
}
Similarly, for Consul, we enabled the Connect (service mesh) and Controller components, and configured ACLs from the start. Not enabling ACLs during setup is a common mistake that is very painful to fix later.
# modules/consul/main.tf
resource "helm_release" "consul" {
name = "consul"
repository = "https://helm.releases.hashicorp.com"
chart = "consul"
namespace = "consul"
version = "1.2.2"
create_namespace = true
values = [
yamlencode({
global = {
name = "consul"
# Enable ACLs from the start. The bootstrap token will be created in a k8s secret.
acls = {
manageSystemACLs = true
}
}
# The controller reconciles Consul configuration entries defined as Kubernetes CRDs
controller = {
enabled = true
}
# The connectInject component enables the automatic sidecar proxy injection for service mesh
connectInject = {
enabled = true
}
server = {
replicas = 3
bootstrapExpect = 3 # Important for Raft consensus
# Production storage configuration
storage = "10Gi"
storageClass = "gp2-csi"
}
})
]
}
After running terraform apply, we had a fully provisioned, production-ready foundation to build our application on.
Phase 2: The Kotlin Feature Engineering Service
The heart of the system is the microservice that performs the actual feature calculations. We chose Kotlin with coroutines to handle the high-concurrency, I/O-bound nature of consuming from Pulsar and potentially calling other services for data enrichment.
The project structure is a standard Gradle build. The build.gradle.kts file declares our key dependencies: the official Pulsar client, a Consul client for fetching configuration, and Ktor for exposing health check endpoints.
// build.gradle.kts
plugins {
kotlin("jvm") version "1.9.0"
application
}
// ... repository definitions
dependencies {
// Pulsar client for messaging
implementation("org.apache.pulsar:pulsar-client:3.1.0")
// Consul client for dynamic configuration
implementation("com.orbitz.consul:consul-client:1.5.3")
// Ktor for HTTP server (health checks)
implementation("io.ktor:ktor-server-core-jvm:2.3.4")
implementation("io.ktor:ktor-server-netty-jvm:2.3.4")
// Logging
implementation("org.slf4j:slf4j-simple:2.0.7")
// Coroutines
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
// Testing
testImplementation(kotlin("test"))
testImplementation("io.mockk:mockk:1.13.7")
}
The core logic resides in a FeatureProcessor class. On startup, it connects to both Pulsar and Consul. It fetches its configuration—like input/output topics and window duration—from the Consul KV store. This is the dynamic configuration piece that allows the platform team to tweak feature logic without requiring the ML team to rebuild and redeploy their service.
// src/main/kotlin/com/mlops/feature/FeatureProcessor.kt
import com.orbitz.consul.Consul
import com.orbitz.consul.KeyValueClient
import kotlinx.coroutines.*
import org.apache.pulsar.client.api.*
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.ConcurrentHashMap
// Data classes for input events and output features
data class TransactionEvent(val userId: String, val amount: Double, val timestamp: Long)
data class UserFeatures(val userId: String, val transactionCount10Min: Long, val totalAmount10Min: Double)
class FeatureProcessor(
private val pulsarClient: PulsarClient,
private val kvClient: KeyValueClient
) {
private val logger = LoggerFactory.getLogger(javaClass)
// In-memory state for windowed aggregations. In a real-world scenario,
// this would be backed by a state store like RocksDB or Redis for fault tolerance.
private val userTransactionWindows = ConcurrentHashMap<String, MutableList<Pair<Long, Double>>>()
private lateinit var consumer: Consumer<TransactionEvent>
private lateinit var producer: Producer<UserFeatures>
// Fetches configuration from Consul KV. Provides defaults for safety.
private fun getConfig(key: String, default: String): String =
kvClient.getValueAsString("config/feature-engineering/${key}").orElse(default)
suspend fun start() {
// Load configuration dynamically
val inputTopic = getConfig("input.topic", "persistent://public/default/transactions")
val outputTopic = getConfig("output.topic", "persistent://public/default/user-features")
val subscriptionName = getConfig("subscription.name", "feature-engineering-service")
val windowDurationSeconds = getConfig("window.duration.seconds", "600").toLong()
logger.info("Starting processor with config: inputTopic=$inputTopic, outputTopic=$outputTopic, window=${windowDurationSeconds}s")
// Initialize Pulsar producer and consumer
producer = pulsarClient.newProducer(Schema.JSON(UserFeatures::class.java))
.topic(outputTopic)
.create()
consumer = pulsarClient.newConsumer(Schema.JSON(TransactionEvent::class.java))
.topic(inputTopic)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared) // Allow multiple instances to consume in parallel
.subscribe()
// Start the main processing loop in a coroutine
coroutineScope {
launch(Dispatchers.IO) { // Use IO dispatcher for blocking calls
processMessages(windowDurationSeconds)
}
}
}
private suspend fun processMessages(windowDurationSeconds: Long) {
while (currentCoroutineContext().isActive) {
try {
val message = consumer.receive()
val event = message.value
val now = System.currentTimeMillis()
// Add event to user's window
val window = userTransactionWindows.computeIfAbsent(event.userId) { mutableListOf() }
window.add(Pair(event.timestamp, event.amount))
// Evict old events from window
window.retainAll { (timestamp, _) -> (now - timestamp) < windowDurationSeconds * 1000 }
// Calculate features
val feature = UserFeatures(
userId = event.userId,
transactionCount10Min = window.size.toLong(),
totalAmount10Min = window.sumOf { it.second }
)
// Produce the new feature vector. Acknowledge the input message only after
// the send succeeds; on failure, negatively acknowledge so Pulsar redelivers it.
producer.sendAsync(feature).thenAccept { msgId ->
logger.debug("Produced feature for user ${event.userId} with message ID $msgId")
consumer.acknowledge(message)
}.exceptionally { ex ->
logger.error("Failed to produce feature for user ${event.userId}; message will be redelivered", ex)
consumer.negativeAcknowledge(message)
null
}
} catch (e: PulsarClientException) {
// Proper error handling is crucial.
// In a real system, you'd have more nuanced retry logic.
logger.error("Failed to process message from Pulsar. Will retry.", e)
delay(1000) // Backoff before retrying
}
}
}
fun stop() {
logger.info("Stopping FeatureProcessor...")
consumer.close()
producer.close()
}
}
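As written, start() reads the configuration keys exactly once. To deliver the on-the-fly updates promised earlier, the service also needs to watch the KV prefix and react when a key changes. Below is a minimal sketch of that watch, assuming the consul-client library's KVCache helper; the onWindowChanged callback is a hypothetical hook the processor would expose for swapping its window duration at runtime.

// Sketch: watch the config prefix in Consul KV and react to changes at runtime.
// Assumes the com.orbitz.consul KVCache API; onWindowChanged is a hypothetical
// hook into the processor, shown here only to illustrate the wiring.
import com.orbitz.consul.KeyValueClient
import com.orbitz.consul.cache.KVCache

fun watchWindowConfig(kvClient: KeyValueClient, onWindowChanged: (Long) -> Unit) {
    val cache = KVCache.newCache(kvClient, "config/feature-engineering/")
    cache.addListener { newValues ->
        newValues.entries
            .firstOrNull { it.key.endsWith("window.duration.seconds") }
            ?.value?.valueAsString?.orElse(null)
            ?.toLongOrNull()
            ?.let(onWindowChanged)
    }
    cache.start() // Long-polls Consul in the background using blocking queries
}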
The error handling in processMessages is critical. If producing the feature fails, we must not acknowledge the input message; Pulsar’s acknowledgment mechanism ensures that the message will be redelivered, preventing data loss. The state management in userTransactionWindows is in-memory for this example, which is a major simplification. In production, this state must be externalized (e.g., to a durable key-value store, or by using Pulsar Functions’ built-in state) to survive pod restarts and enable stateful scaling.
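To make that concrete, here is a rough sketch of what moving the window into an external store could look like, using a Redis sorted set per user with the event timestamp as the score. The Jedis client and the features:window:&lt;userId&gt; key layout are assumptions for illustration, not what ships today.

// Sketch: durable window state in Redis instead of a process-local map.
// Assumes the Jedis client (JedisPooled); key layout and member encoding are illustrative.
import redis.clients.jedis.JedisPooled

class RedisWindowStore(private val redis: JedisPooled, private val windowMillis: Long) {

    // Record one transaction and return (count, totalAmount) for the current window.
    fun addAndAggregate(userId: String, timestamp: Long, amount: Double): Pair<Long, Double> {
        val key = "features:window:$userId"
        val now = System.currentTimeMillis()

        // Score by event timestamp; a real implementation would add a unique suffix
        // to the member so identical events are not collapsed by the sorted set.
        redis.zadd(key, timestamp.toDouble(), "$timestamp:$amount")
        redis.zremrangeByScore(key, 0.0, (now - windowMillis).toDouble())
        redis.expire(key, windowMillis / 1000)

        // Recompute aggregates from the members that survived eviction.
        val members = redis.zrange(key, 0, -1)
        val total = members.sumOf { it.substringAfterLast(':').toDouble() }
        return members.size.toLong() to total
    }
}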
Phase 3: Deployment and Orchestration with Kubeflow
With the service implemented, we need to deploy it and integrate it into a larger ML pipeline. First, we need a Kubernetes Deployment manifest. The key part of this manifest is the annotations that tell the Consul Connect injector to add the Envoy sidecar proxy. This is the first step towards a zero-trust network.
# kubernetes/feature-engineering-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: feature-engineering-service
namespace: ml-pipelines
spec:
replicas: 3
selector:
matchLabels:
app: feature-engineering-service
template:
metadata:
labels:
app: feature-engineering-service
annotations:
# These annotations enable Consul Service Mesh
'consul.hashicorp.com/connect-inject': 'true'
spec:
containers:
- name: processor
image: your-docker-registry/feature-engineering-service:v1.0.0
ports:
- containerPort: 8080 # For health checks
env:
# Inject Consul and Pulsar addresses.
# Use Kubernetes secrets for sensitive data like ACL tokens.
- name: CONSUL_HTTP_ADDR
value: "consul-server.consul.svc:8500"
- name: PULSAR_SERVICE_URL
value: "pulsar://pulsar-proxy.pulsar.svc:6650"
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 15
periodSeconds: 10
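The readinessProbe above expects a /health endpoint on port 8080. With the Ktor dependencies already declared in build.gradle.kts, that endpoint is only a few lines; here is a minimal sketch, where the isReady flag is an illustrative stand-in for whatever readiness signal the service actually tracks.

// Sketch: the Ktor health endpoint behind the readinessProbe above.
// The isReady flag is an assumption for illustration (e.g. flipped to true once
// the Pulsar consumer and Consul client have initialized successfully).
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import java.util.concurrent.atomic.AtomicBoolean

val isReady = AtomicBoolean(false)

fun startHealthServer(port: Int = 8080) {
    embeddedServer(Netty, port = port) {
        routing {
            get("/health") {
                if (isReady.get()) call.respondText("OK")
                else call.respondText("starting", status = HttpStatusCode.ServiceUnavailable)
            }
        }
    }.start(wait = false)
}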
Now, we define a Kubeflow Pipeline using the KFP SDK. This pipeline automates the entire process: it first updates the feature engineering configuration in Consul, then deploys our Kotlin service, waits for it to be ready, and finally launches a model training job that consumes the newly generated features from Pulsar.
# pipelines/training_pipeline.py
from kfp import dsl
from kfp.dsl import Input, Output, Dataset
@dsl.container_component
def update_feature_config(consul_address: str, window_seconds: int):
"""A pipeline component to update feature configuration in Consul KV."""
return dsl.ContainerSpec(
image='bitnami/curl:latest',
command=[
'curl',
'-X', 'PUT',
'--data', str(window_seconds),
f'http://{consul_address}/v1/kv/config/feature-engineering/window.duration.seconds'
]
)
@dsl.container_component
def model_training_job(
pulsar_address: str,
feature_topic: str,
model_output: Output[Dataset]
):
"""A pipeline component that reads features from Pulsar and trains a model."""
# In a real implementation, this would be a more complex component
# using pandas, scikit-learn, and the pulsar-client python library.
return dsl.ContainerSpec(
image='python:3.9-slim',
command=['sh', '-c'],
args=[
'pip install pulsar-client && '
'echo "Simulating training job: Reading from Pulsar and writing model artifact..." && '
f'echo "Model trained with data from {pulsar_address}/{feature_topic}" > {model_output.path}'
]
)
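# KFP v2 removed dsl.ContainerOp, so the wait step is defined as its own container
# component. A fixed sleep is a deliberately crude stand-in for a real readiness check.
@dsl.container_component
def wait_for_features():
    """Gives the feature engineering service time to pick up new config."""
    return dsl.ContainerSpec(
        image='alpine:latest',
        command=['sleep', '120']
    )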
@dsl.pipeline(
name='Real-time Feature Training Pipeline',
description='A pipeline that configures features, waits, and trains a model.'
)
def feature_training_pipeline(
consul_address: str = 'consul-server.consul.svc:8500',
pulsar_address: str = 'pulsar-proxy.pulsar.svc:6650',
feature_topic: str = 'persistent://public/default/user-features',
new_window_config: int = 300
):
# Step 1: Update the feature configuration in Consul
config_task = update_feature_config(
consul_address=consul_address,
window_seconds=new_window_config
)
    # Step 2: Wait to give the feature engineering service time to pick up the
    # new config and generate features. In a production pipeline, this would be
    # a more sophisticated check, perhaps querying Pulsar topic stats until
    # enough new messages are present.
    wait_task = wait_for_features().after(config_task)
# Step 3: Run the training job which consumes from the feature topic
train_task = model_training_job(
pulsar_address=pulsar_address,
feature_topic=feature_topic,
).after(wait_task)
This pipeline, when compiled and uploaded to Kubeflow, provides a single, repeatable workflow for our data scientists. They can trigger a run, pass in a new windowing parameter, and the system automatically reconfigures the live feature generation and kicks off a new training run based on the results.
The Assembled System
The final architecture provides a clean separation of concerns and a clear data flow.
graph TD
  subgraph "AWS EKS Cluster (Managed by Terraform)"
    subgraph "Namespace: pulsar"
      PulsarBroker[Pulsar Broker]
      PulsarProxy[Pulsar Proxy]
      Bookie[BookKeeper]
      Zookeeper[Zookeeper]
    end
    subgraph "Namespace: consul"
      ConsulServer[Consul Server]
      ConsulClient[Consul Client Agent]
    end
    subgraph "Namespace: ml-pipelines"
      direction LR
      KFP[Kubeflow Pipelines UI/Server] --> KFPRunner[Pipeline Runner Pod]
      subgraph "Feature Engineering Deployment"
        KotlinSvc1[Kotlin Service Pod 1]
        KotlinSvc2[Kotlin Service Pod 2]
        KotlinSvc3[Kotlin Service Pod 3]
      end
      TrainingPod[Model Training Pod]
    end
  end
  ExternalSource[External Event Source] -->|1. Raw Events| PulsarProxy
  PulsarProxy --> PulsarBroker
  PulsarBroker -- Stores Data --> Bookie
  KotlinSvc1 -- Reads Config --> ConsulServer
  KotlinSvc1 -- 2. Consumes Raw Events --> PulsarProxy
  KotlinSvc1 -- 3. Publishes Features --> PulsarProxy
  KFP -- Triggers Pipeline --> KFPRunner
  KFPRunner -- "4. updates config" --> ConsulServer
  KFPRunner -- "5. creates pod" --> TrainingPod
  TrainingPod -- 6. Consumes Features --> PulsarProxy
  style Zookeeper fill:#f9f,stroke:#333,stroke-width:2px
  style Bookie fill:#f9f,stroke:#333,stroke-width:2px
  style ConsulServer fill:#bbf,stroke:#333,stroke-width:2px
This setup solved our core problems. Training-serving skew was eliminated because both the training pipeline and any future online inference service consume from the exact same Pulsar topic (user-features). The feature logic is encapsulated in a single, testable, and robust Kotlin service. The entire process is automated and auditable through Kubeflow. The infrastructure is repeatable and version-controlled via Terraform.
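For a sense of what the online half of that guarantee could look like, an inference service can tail the same topic and keep the latest vector per user in memory for low-latency lookups. The following is a minimal sketch, reusing the UserFeatures class and Pulsar client from the processor; it is an illustration of the pattern, not a service we have built.

// Sketch: an online inference service consuming the same user-features topic and
// materializing the latest feature vector per user for low-latency lookups.
import java.util.concurrent.ConcurrentHashMap
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema

class OnlineFeatureCache(client: PulsarClient, topic: String) {
    private val latest = ConcurrentHashMap<String, UserFeatures>()
    private val reader = client.newReader(Schema.JSON(UserFeatures::class.java))
        .topic(topic)
        .startMessageId(MessageId.latest)
        .create()

    // Run in a background thread or coroutine: tail the topic and keep the newest vector per user.
    fun tail() {
        while (true) {
            val msg = reader.readNext()
            latest[msg.value.userId] = msg.value
        }
    }

    fun featuresFor(userId: String): UserFeatures? = latest[userId]
}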
The current implementation, however, has clear boundaries and areas for the next iteration. State management for the feature engineering service is naive; moving from in-memory to a durable state store like RocksDB managed by the application or leveraging Pulsar Functions’ stateful capabilities is the highest priority for fault tolerance. The feature schema is implicit in the Kotlin data classes; integrating a schema registry (like Pulsar’s) is essential to manage schema evolution without breaking consumers. Finally, the “wait” step in our Kubeflow pipeline is brittle. A more robust solution would involve a sensor component that actively checks for the availability of sufficient new feature data in Pulsar before triggering the training job. This architecture isn’t an end state, but a solid, pragmatic foundation for building a truly scalable MLOps platform.