Implementing a Real-Time Feature Engineering Pipeline with Kotlin, Pulsar, and Kubeflow


Our MLOps platform hit a wall. The core problem was feature engineering. Batch ETL jobs running nightly were too slow for our fraud detection and real-time recommendation models. Ad-hoc Python scripts used for training produced features that were subtly different from the online Java services, leading to maddening training-serving skew. We were spending more time debugging data pipelines than improving models. The initial goal was clear: build a decoupled, real-time feature engineering pipeline that guarantees consistency and is managed as part of the end-to-end ML lifecycle.

The initial whiteboard sketch was an event-driven architecture. Raw event streams (clicks, transactions, etc.) would be the source of truth. A set of microservices would process these streams to generate feature vectors, making them available for both model training and online inference. This immediately brought up a complex set of technology choices. We needed a streaming platform that could handle high throughput and also store historical data for retraining. We needed a robust language for the feature engineering services that could offer performance and maintainability. We needed an orchestration layer to manage the entire workflow from feature generation to model deployment. And critically, we needed a way to manage the infrastructure and configuration declaratively.

This led to our final stack selection. For the streaming backbone, we chose Apache Pulsar over Kafka. Its built-in multi-tenancy was a huge win for isolating different ML teams’ data, and its tiered storage architecture, offloading older data to S3, solved the historical replay problem without needing a separate data warehouse. For the feature engineering microservices, we selected Kotlin. The JVM’s performance is proven, but Kotlin’s modern syntax, null safety, and excellent coroutine support for asynchronous I/O made it a more productive and safer choice than Java or Scala. Kubeflow was the natural choice for the orchestration layer, being the de facto standard for MLOps on Kubernetes. To manage the infrastructure—the Kubernetes cluster, Pulsar, and other components—we used Terraform for its declarative approach. The non-obvious, but crucial, fifth element was HashiCorp Consul. We decided to use it not just for service discovery, but as a dynamic configuration store for our feature engineering logic, allowing us to change parameters like window sizes on the fly without a full service redeployment. This was the key to unlocking operational agility.
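Pulsar's multi-tenancy shows up in every topic name. A fully qualified topic has the form `persistent://<tenant>/<namespace>/<topic>`, and namespaces are where policies such as retention and offload thresholds are applied. The helper below is a hypothetical illustration and accepts only this three-part form. It shows how a one-tenant-per-team layout (our convention, not something Pulsar requires) could be checked by splitting topic names:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicName:
    domain: str      # "persistent" or "non-persistent"
    tenant: str      # isolation boundary, e.g. one tenant per ML team (our convention)
    namespace: str   # where retention and offload policies are applied
    local_name: str


def parse_topic(full_name: str) -> TopicName:
    """Split 'persistent://tenant/namespace/topic' into its parts.

    Simplified: only the three-part form is accepted.
    """
    domain, sep, rest = full_name.partition("://")
    if not sep or domain not in ("persistent", "non-persistent"):
        raise ValueError(f"not a fully qualified topic name: {full_name!r}")
    parts = rest.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic, got {rest!r}")
    return TopicName(domain, *parts)


print(parse_topic("persistent://public/default/user-features"))
# TopicName(domain='persistent', tenant='public', namespace='default', local_name='user-features')
```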

Phase 1: Declarative Infrastructure with Terraform

Before writing a single line of Kotlin, we needed a stable, repeatable environment. A common mistake is to manually set up clusters and dependencies; this is a recipe for disaster in production. Everything had to be code. Our foundation was an EKS cluster, with managed node groups for easier maintenance.

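Here is the core Terraform module for the EKS cluster. The VPC, with its private and public subnets, is created outside this module and passed in. The key considerations here are therefore keeping the control plane endpoint and the worker nodes on private subnets, and setting up IAM roles with the correct permissions for Kubeflow and other components to interact with AWS APIs.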

# modules/eks/main.tf

variable "cluster_name" {
  type        = string
  description = "The name of the EKS cluster."
}

variable "vpc_id" {
  type        = string
  description = "ID of the VPC where the cluster will be deployed."
}

variable "private_subnet_ids" {
  type        = list(string)
  description = "List of private subnet IDs for worker nodes."
}

resource "aws_eks_cluster" "main" {
  name     = var.cluster_name
  role_arn = aws_iam_role.cluster_role.arn
  version  = "1.27"

  vpc_config {
    subnet_ids              = var.private_subnet_ids
    endpoint_private_access = true
    endpoint_public_access  = false # Best practice for production
  }

  depends_on = [
    aws_iam_role_policy_attachment.cluster_amazon_eks_cluster_policy,
    aws_iam_role_policy_attachment.cluster_amazon_eks_service_policy,
  ]
}

resource "aws_eks_node_group" "ml_compute" {
  cluster_name    = aws_eks_cluster.main.name
  node_group_name = "${var.cluster_name}-ml-compute"
  node_role_arn   = aws_iam_role.node_group_role.arn
  subnet_ids      = var.private_subnet_ids
  instance_types  = ["m5.2xlarge"]

  scaling_config {
    desired_size = 3
    max_size     = 10
    min_size     = 2
  }

  update_config {
    max_unavailable = 1
  }

  # Ensure nodes can pull images and communicate with the control plane
  depends_on = [
    aws_iam_role_policy_attachment.node_amazon_eks_worker_node_policy,
    aws_iam_role_policy_attachment.node_amazon_ec2_container_registry_read_only,
    aws_iam_role_policy_attachment.node_amazon_eks_cni_policy,
  ]
}

# IAM roles and policies are defined below...
# ... (aws_iam_role.cluster_role, aws_iam_role.node_group_role, etc.)

With the Kubernetes cluster defined, the next step was deploying Pulsar and Consul. We used the official Helm charts, managed via Terraform’s helm_release resource. This is where things get tricky in a real-world project. The default Helm values are rarely suitable for production.

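For Pulsar, the most critical part is configuring persistence for ZooKeeper and the BookKeeper bookies. ZooKeeper holds the cluster metadata and the bookies hold the message data, so losing their volumes means losing your data. We used AWS EBS volumes provisioned via a StorageClass. Persistence value keys differ between Pulsar Helm charts and chart versions, so it is worth checking the overrides below against the values.yaml of the pinned chart. Helm typically ignores keys a chart does not define, so a mismatch is silent.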

# modules/pulsar/main.tf

resource "helm_release" "pulsar" {
  name       = "pulsar"
  repository = "https://pulsar.apache.org/charts"
  chart      = "pulsar"
  namespace  = "pulsar"
  version    = "3.1.0"
  create_namespace = true

  # Production-grade configuration overrides
  values = [
    yamlencode({
      zookeeper = {
        replicaCount = 3
        persistence = {
          enabled = true
          storageClass = "gp2-csi" # Assumes a gp2 storage class exists
          size = "20Gi"
        }
      }
      bookkeeper = {
        replicaCount = 3
        persistence = {
          enabled = true
          storageClass = "gp2-csi"
          size = "100Gi"
        }
        resources = {
          requests = { memory = "8Gi", cpu = "2000m" }
          limits = { memory = "8Gi", cpu = "4000m" }
        }
      }
      broker = {
        replicaCount = 3
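        configData = {
          # Enable tiered storage offloading to S3 for long-term retention.
          # "aws-s3" is the driver for AWS S3; "s3" expects a custom S3-compatible endpoint.
          # Offloading only starts once an offload threshold is set (e.g. as a namespace
          # policy), and acknowledged data is deleted unless a retention policy keeps it.
          "managedLedgerOffloadDriver" = "aws-s3"
          "s3ManagedLedgerOffloadBucket" = "our-pulsar-long-term-storage-bucket"
          "s3ManagedLedgerOffloadRegion" = "us-east-1"
        }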
      }
      proxy = {
        replicaCount = 2
      }
      # Disable components we don't need to save resources
      components = {
        functions = false # We run our logic in dedicated pods
        presto = false
      }
      # Enable TLS and Authentication in a real scenario
      tls = {
        enabled = false # For simplicity in this example
      }
    })
  ]
}

Similarly, for Consul, we enabled the Connect (service mesh) and Controller components, and configured ACLs from the start. Not enabling ACLs during setup is a common mistake that is very painful to fix later.

# modules/consul/main.tf

resource "helm_release" "consul" {
  name       = "consul"
  repository = "https://helm.releases.hashicorp.com"
  chart      = "consul"
  namespace  = "consul"
  version    = "1.2.2"
  create_namespace = true

  values = [
    yamlencode({
      global = {
        name = "consul"
        # Enable ACLs from the start. The bootstrap token will be created in a k8s secret.
        acls = {
          manageSystemACLs = true
        }
      }
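      # The controller manages Consul CRDs (config entries such as ServiceDefaults and
      # ServiceIntentions); syncing K8s services into the Consul catalog is syncCatalog.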
      controller = {
        enabled = true
      }
      # The connectInject component enables the automatic sidecar proxy injection for service mesh
      connectInject = {
        enabled = true
      }
      server = {
        replicas = 3
        bootstrapExpect = 3 # Important for Raft consensus
        # Production storage configuration
        storage = "10Gi"
        storageClass = "gp2-csi"
      }
    })
  ]
}
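Once ACLs are on, every client has to present a token. The sketch below is a minimal Python KV read over Consul's HTTP API. It assumes the in-cluster address used elsewhere in this article and a token with read access to the config/feature-engineering/ prefix; the helper names and the token value are placeholders. The token travels in the `X-Consul-Token` header, and values come back base64-encoded inside a JSON array:

```python
import base64
import json
import urllib.error
import urllib.request
from typing import Optional


def kv_get_request(consul_addr: str, key: str, token: str) -> urllib.request.Request:
    """Build a Consul KV read; with ACLs enabled the token must accompany every call."""
    return urllib.request.Request(
        f"http://{consul_addr}/v1/kv/{key}",
        headers={"X-Consul-Token": token},
        method="GET",
    )


def decode_kv_body(body: bytes) -> Optional[str]:
    """Consul answers with a JSON array of entries whose Value is base64-encoded."""
    entries = json.loads(body)
    if not entries or entries[0].get("Value") is None:
        return None
    return base64.b64decode(entries[0]["Value"]).decode("utf-8")


def kv_get(consul_addr: str, key: str, token: str, timeout: float = 5.0) -> Optional[str]:
    """Return the stored string, or None if the key does not exist."""
    try:
        with urllib.request.urlopen(kv_get_request(consul_addr, key, token), timeout=timeout) as resp:
            return decode_kv_body(resp.read())
    except urllib.error.HTTPError as err:
        if err.code == 404:  # key does not exist
            return None
        raise  # a 403 usually means the token lacks read access to this key
```

Keeping request construction and response decoding separate from the network call makes both testable without a running Consul server.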

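After running terraform apply from a machine or CI runner with network access to the private EKS endpoint (public access is disabled), we had a fully provisioned foundation to build our application on. Pulsar TLS and authentication, disabled above for simplicity, still have to be turned on before it is truly production-ready.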

Phase 2: The Kotlin Feature Engineering Service

The heart of the system is the microservice that performs the actual feature calculations. We chose Kotlin with coroutines to handle the high-concurrency, I/O-bound nature of consuming from Pulsar and potentially calling other services for data enrichment.

The project structure is a standard Gradle build. The build.gradle.kts file declares our key dependencies: the official Pulsar client, a Consul client for fetching configuration, and Ktor for exposing health check endpoints.

// build.gradle.kts

plugins {
    kotlin("jvm") version "1.9.0"
    application
}

// ... repository definitions

dependencies {
    // Pulsar client for messaging
    implementation("org.apache.pulsar:pulsar-client:3.1.0")

    // Consul client for dynamic configuration
    implementation("com.orbitz.consul:consul-client:1.5.3")

    // Ktor for HTTP server (health checks)
    implementation("io.ktor:ktor-server-core-jvm:2.3.4")
    implementation("io.ktor:ktor-server-netty-jvm:2.3.4")

    // Logging
    implementation("org.slf4j:slf4j-simple:2.0.7")

    // Coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")

    // Testing
    testImplementation(kotlin("test"))
    testImplementation("io.mockk:mockk:1.13.7")
}

The core logic resides in a FeatureProcessor class. On startup, it connects to both Pulsar and Consul. It fetches its configuration—like input/output topics and window duration—from the Consul KV store. This is the dynamic configuration piece that allows the platform team to tweak feature logic without requiring the ML team to rebuild and redeploy their service.

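// src/main/kotlin/com/mlops/feature/FeatureProcessor.kt

package com.mlops.feature

import com.orbitz.consul.KeyValueClient
import kotlinx.coroutines.*
import org.apache.pulsar.client.api.*
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Data classes for input events and output features. Defaults on every property give the
// JVM class a no-arg constructor, and mutable properties give it setters; Pulsar's
// Jackson-based JSON schema needs these to deserialize the class.
data class TransactionEvent(var userId: String = "", var amount: Double = 0.0, var timestamp: Long = 0L)

// Field names reflect the default 600 s window; the live window length comes from Consul.
data class UserFeatures(var userId: String = "", var transactionCount10Min: Long = 0L, var totalAmount10Min: Double = 0.0)

class FeatureProcessor(
    private val pulsarClient: PulsarClient,
    private val kvClient: KeyValueClient
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    // In-memory state for windowed aggregations. In a real-world scenario,
    // this would be backed by a state store like RocksDB or Redis for fault tolerance.
    private val userTransactionWindows = ConcurrentHashMap<String, MutableList<Pair<Long, Double>>>()

    // Window length in milliseconds, refreshed from Consul while the service runs.
    @Volatile
    private var windowDurationMs: Long = 600_000L

    private lateinit var consumer: Consumer<TransactionEvent>
    private lateinit var producer: Producer<UserFeatures>

    // Fetches configuration from Consul KV. Provides defaults for safety.
    private fun getConfig(key: String, default: String): String =
        kvClient.getValueAsString("config/feature-engineering/$key").orElse(default)

    // Malformed or non-positive values fall back to 600 s instead of crashing the service.
    private fun loadWindowMs(): Long {
        val seconds = getConfig("window.duration.seconds", "600").trim().toLongOrNull()
        return (if (seconds != null && seconds > 0) seconds else 600L) * 1000
    }

    suspend fun start() {
        // Load configuration dynamically
        val inputTopic = getConfig("input.topic", "persistent://public/default/transactions")
        val outputTopic = getConfig("output.topic", "persistent://public/default/user-features")
        val subscriptionName = getConfig("subscription.name", "feature-engineering-service")
        windowDurationMs = loadWindowMs()

        logger.info("Starting processor with config: inputTopic=$inputTopic, outputTopic=$outputTopic, window=${windowDurationMs / 1000}s")

        // Initialize Pulsar producer and consumer
        producer = pulsarClient.newProducer(Schema.JSON(UserFeatures::class.java))
            .topic(outputTopic)
            .create()

        consumer = pulsarClient.newConsumer(Schema.JSON(TransactionEvent::class.java))
            .topic(inputTopic)
            .subscriptionName(subscriptionName)
            // Key_Shared lets several instances consume in parallel while routing all messages
            // with the same key to the same instance. This assumes upstream producers key
            // messages by userId; with plain Shared, each pod would see only part of a user's window.
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe()

        coroutineScope {
            // Re-read the window size so Consul changes apply without a redeploy
            // (the 30 s interval is illustrative).
            launch(Dispatchers.IO) {
                while (isActive) {
                    delay(30_000)
                    try {
                        windowDurationMs = loadWindowMs()
                    } catch (e: Exception) {
                        logger.warn("Could not refresh config from Consul; keeping the current window", e)
                    }
                }
            }
            // Main processing loop; IO dispatcher because receive() blocks.
            launch(Dispatchers.IO) {
                processMessages()
            }
        }
    }

    private suspend fun processMessages() {
        while (currentCoroutineContext().isActive) {
            // Bounded wait so cancellation is noticed even when the topic is idle.
            val message = try {
                consumer.receive(1, TimeUnit.SECONDS) ?: continue
            } catch (e: PulsarClientException) {
                // In a real system, you'd have more nuanced retry logic.
                logger.error("Failed to receive from Pulsar. Will retry.", e)
                delay(1000) // Backoff before retrying
                continue
            }

            try {
                val event = message.value
                val windowMs = windowDurationMs

                // Add event to user's window
                val window = userTransactionWindows.computeIfAbsent(event.userId) { mutableListOf() }
                window.add(Pair(event.timestamp, event.amount))

                // Evict relative to the newest event time for this user, not the wall clock,
                // so replayed (older) events are not evicted the moment they arrive.
                val windowEnd = window.maxOf { it.first }
                window.retainAll { (timestamp, _) -> windowEnd - timestamp < windowMs }

                // Calculate features
                val feature = UserFeatures(
                    userId = event.userId,
                    transactionCount10Min = window.size.toLong(),
                    totalAmount10Min = window.sumOf { it.second }
                )

                // Produce the feature keyed by user; acknowledge the input only once the broker
                // has confirmed the write, otherwise negatively acknowledge it for redelivery.
                producer.newMessage()
                    .key(event.userId)
                    .value(feature)
                    .sendAsync()
                    .thenAccept { msgId ->
                        logger.debug("Produced feature for user ${event.userId} with message ID $msgId")
                        consumer.acknowledgeAsync(message)
                    }
                    .exceptionally { e ->
                        logger.error("Failed to produce feature for user ${event.userId}; requesting redelivery", e)
                        consumer.negativeAcknowledge(message)
                        null
                    }
            } catch (e: Exception) {
                // e.g. a payload that does not match the schema: nack it instead of killing the loop.
                // A dead-letter policy on the consumer would stop endless redelivery of such messages.
                logger.error("Failed to process message ${message.messageId}", e)
                consumer.negativeAcknowledge(message)
            }
        }
    }

    fun stop() {
        logger.info("Stopping FeatureProcessor...")
        // Close the producer first: close() waits for pending sends, whose callbacks still ack.
        if (::producer.isInitialized) producer.close()
        if (::consumer.isInitialized) consumer.close()
    }
}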
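The acknowledgment ordering is easy to get wrong with an asynchronous producer, because acknowledging right after calling sendAsync acknowledges before the broker has confirmed anything. The minimal Python model below shows the safe ordering. A `concurrent.futures.Future` stands in for the `CompletableFuture<MessageId>` returned by the Pulsar Java client's sendAsync, and `ack`/`nack` stand in for `consumer.acknowledgeAsync` and `consumer.negativeAcknowledge`. `forward_then_ack` is a hypothetical helper:

```python
from concurrent.futures import Future
from typing import Any, Callable


def forward_then_ack(
    send_async: Callable[[Any], Future],
    ack: Callable[[Any], None],
    nack: Callable[[Any], None],
    message: Any,
    feature: Any,
) -> Future:
    """Publish `feature`, then ack `message` only if the publish succeeded.

    On failure or cancellation the message is negatively acknowledged so the
    broker redelivers it (at-least-once: the feature may be produced twice).
    """
    send_future = send_async(feature)

    def on_done(done: Future) -> None:
        if not done.cancelled() and done.exception() is None:
            ack(message)
        else:
            nack(message)

    # Runs immediately if the send already finished, otherwise when it does.
    send_future.add_done_callback(on_done)
    return send_future
```

Nothing in this helper blocks, so the ack happens in a callback rather than after the call returns. Blocking on each send would be simpler, but it would cap throughput at one in-flight message.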

The error handling is critical here. If producing the feature fails, we must not acknowledge the input message. Pulsar redelivers unacknowledged messages after a negative acknowledgment, a consumer reconnect, or an ack timeout (if one is configured), which prevents data loss at the cost of at-least-once, possibly duplicate, processing. The state in userTransactionWindows is held in memory for this example, which is a major simplification. In production, this state must be externalized (e.g., using the state API of Pulsar Functions or a separate key-value store) to survive pod restarts and enable stateful scaling.
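Whatever store ends up holding it, the window logic itself is easy to isolate. Eviction is the subtle part. Comparing event timestamps against the wall clock mixes processing time with event time, so an event older than the window (for example, one replayed from tiered storage) is evicted the moment it is added. The minimal Python sketch below performs the same per-user count-and-sum aggregation but anchors the window at the newest event time instead. `SlidingWindow` is a hypothetical name, and timestamps are assumed to be epoch milliseconds:

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class SlidingWindow:
    """Per-user sliding window over (timestamp_ms, amount) pairs, keyed on event time."""

    def __init__(self, window_ms: int) -> None:
        self.window_ms = window_ms
        self._events: Dict[str, List[Tuple[int, float]]] = defaultdict(list)

    def add(self, user_id: str, ts_ms: int, amount: float) -> Tuple[int, float]:
        """Record one event and return (count, total) for the user's current window."""
        events = self._events[user_id]
        events.append((ts_ms, amount))
        # The window ends at the newest event time seen for this user, so the
        # result does not depend on when the events are processed.
        window_end = max(ts for ts, _ in events)
        events[:] = [(ts, amt) for ts, amt in events if window_end - ts < self.window_ms]
        return len(events), sum(amt for _, amt in events)


w = SlidingWindow(window_ms=600_000)  # 600 s, the service's default
print(w.add("u1", 1_000_000, 10.0))  # (1, 10.0)
print(w.add("u1", 1_300_000, 5.0))   # (2, 15.0)
print(w.add("u1", 1_700_000, 1.0))   # (2, 6.0): the first event is now 700 s old
```

A late event that is already older than the window is dropped immediately. That is one reasonable policy; a production system would also decide how long to wait for late data.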

Phase 3: Deployment and Orchestration with Kubeflow

With the service implemented, we need to deploy it and integrate it into a larger ML pipeline. First, we need a Kubernetes Deployment manifest. The key part of this manifest is the annotation that tells the Consul Connect injector to add the Envoy sidecar proxy, the first step towards a zero-trust network.

# kubernetes/feature-engineering-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: feature-engineering-service
  namespace: ml-pipelines
spec:
  replicas: 3
  selector:
    matchLabels:
      app: feature-engineering-service
  template:
    metadata:
      labels:
        app: feature-engineering-service
      annotations:
        # This annotation enables Consul service mesh sidecar injection
        'consul.hashicorp.com/connect-inject': 'true'
    spec:
      containers:
      - name: processor
        image: your-docker-registry/feature-engineering-service:v1.0.0
        ports:
        - containerPort: 8080 # For health checks
        env:
        # Inject Consul and Pulsar addresses.
        # Use Kubernetes secrets for sensitive data like ACL tokens.
        - name: CONSUL_HTTP_ADDR
          value: "consul-server.consul.svc:8500"
        - name: PULSAR_SERVICE_URL
          value: "pulsar://pulsar-proxy.pulsar.svc:6650"
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 10
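With `replicas: 3`, the subscription type decides which pod sees which event, and that matters for an in-memory per-user window. Pulsar's Shared subscription hands messages to consumers round-robin regardless of key, so each pod's window holds only part of a user's transactions. A Key_Shared subscription instead delivers all messages with the same key to the same consumer, provided the producers set a key. For this pipeline that key would be the userId, which is an assumption about the upstream producers. The toy simulation below uses hypothetical helper names to contrast the two dispatch models:

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Event = Tuple[str, float]  # (user_id, amount)


def round_robin(events: List[Event], consumers: int) -> Dict[int, List[Event]]:
    """Rough model of a Shared subscription: dispatch ignores the message key."""
    out: Dict[int, List[Event]] = defaultdict(list)
    for i, event in enumerate(events):
        out[i % consumers].append(event)
    return out


def by_key(events: List[Event], consumers: int) -> Dict[int, List[Event]]:
    """Rough model of Key_Shared: one key always lands on one consumer.

    Pulsar assigns keys to consumers with its own hash ranges; crc32 is a stand-in.
    """
    out: Dict[int, List[Event]] = defaultdict(list)
    for event in events:
        out[zlib.crc32(event[0].encode()) % consumers].append(event)
    return out


def consumers_seeing(user: str, assignment: Dict[int, List[Event]]) -> int:
    """How many consumers hold at least one event for this user."""
    return sum(1 for evs in assignment.values() if any(u == user for u, _ in evs))


events = [("u1", 10.0), ("u2", 5.0), ("u1", 3.0), ("u1", 7.0), ("u2", 1.0), ("u1", 2.0)]
print(consumers_seeing("u1", round_robin(events, 3)))  # 2: u1's window is split across pods
print(consumers_seeing("u1", by_key(events, 3)))       # 1
```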

Now, we define a Kubeflow Pipeline using the KFP SDK. This pipeline automates the workflow around the already-deployed Kotlin service. It first updates the feature engineering configuration in Consul, then waits for the service to pick up the new configuration and generate features, and finally launches a model training job that consumes the newly generated features from Pulsar.

# pipelines/training_pipeline.py

from kfp import dsl
from kfp.dsl import Output, Dataset

@dsl.container_component
def update_feature_config(consul_address: str, window_seconds: int):
    """A pipeline component to update feature configuration in Consul KV."""
    # With Consul ACLs enabled, the request also needs an "X-Consul-Token" header
    # carrying a token with write access to config/feature-engineering/.
    return dsl.ContainerSpec(
        image='bitnami/curl:latest',
        command=[
            'curl',
            '--fail',  # fail the step on HTTP errors such as a 403 from ACLs
            '-X', 'PUT',
            '--data', str(window_seconds),
            f'http://{consul_address}/v1/kv/config/feature-engineering/window.duration.seconds'
        ]
    )

@dsl.container_component
def wait_for_features():
    """A fixed wait that gives the feature service time to pick up the new config."""
    return dsl.ContainerSpec(
        image='alpine:latest',
        command=['sleep', '120']
    )

@dsl.container_component
def model_training_job(
    pulsar_address: str,
    feature_topic: str,
    model_output: Output[Dataset]
):
    """A pipeline component that reads features from Pulsar and trains a model."""
    # In a real implementation, this would be a more complex component
    # using pandas, scikit-learn, and the pulsar-client python library.
    return dsl.ContainerSpec(
        image='python:3.9-slim',
        command=['sh', '-c'],
        args=[
            'pip install pulsar-client && '
            'echo "Simulating training job: Reading from Pulsar and writing model artifact..." && '
            f'mkdir -p "$(dirname {model_output.path})" && '
            f'echo "Model trained with data from {pulsar_address}/{feature_topic}" > {model_output.path}'
        ]
    )


@dsl.pipeline(
    name='Real-time Feature Training Pipeline',
    description='A pipeline that configures features, waits, and trains a model.'
)
def feature_training_pipeline(
    consul_address: str = 'consul-server.consul.svc:8500',
    pulsar_address: str = 'pulsar-proxy.pulsar.svc:6650',
    feature_topic: str = 'persistent://public/default/user-features',
    new_window_config: int = 300
):
    # Step 1: Update the feature configuration in Consul.
    # Caching is disabled because this step has side effects that must happen on every run.
    config_task = update_feature_config(
        consul_address=consul_address,
        window_seconds=new_window_config
    )
    config_task.set_caching_options(False)

    # Step 2: Wait to give the feature engineering service time to pick up
    # the new config and generate features.
    # In a production pipeline, this would be a more sophisticated check,
    # perhaps querying Pulsar topic stats until enough new messages are present.
    wait_task = wait_for_features().after(config_task)
    wait_task.set_caching_options(False)

    # Step 3: Run the training job which consumes from the feature topic
    train_task = model_training_job(
        pulsar_address=pulsar_address,
        feature_topic=feature_topic,
    ).after(wait_task)

This pipeline, when compiled and uploaded to Kubeflow, provides a single, repeatable workflow for our data scientists. They can trigger a run, pass in a new windowing parameter, and the system automatically reconfigures the live feature generation and kicks off a new training run based on the results.
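Because every pipeline run can write a new window.duration.seconds value into Consul, the service has to treat what it reads as untrusted input. A value such as `5m`, for instance, makes Kotlin's String.toLong() throw a NumberFormatException. The minimal Python sketch below shows a defensive loader, assuming the key prefix and defaults from the FeatureProcessor listing; `load_config` and `parse_positive_int` are hypothetical names. `lookup` can be any function that returns the raw KV string or None:

```python
from typing import Callable, Optional

PREFIX = "config/feature-engineering/"

# Defaults mirror the ones hard-coded in FeatureProcessor.start().
DEFAULTS = {
    "input.topic": "persistent://public/default/transactions",
    "output.topic": "persistent://public/default/user-features",
    "subscription.name": "feature-engineering-service",
    "window.duration.seconds": "600",
}


def parse_positive_int(raw: str, default: int) -> int:
    """Return raw as a positive int, or the default if it is malformed or <= 0."""
    try:
        value = int(raw)
    except ValueError:
        return default
    return value if value > 0 else default


def load_config(lookup: Callable[[str], Optional[str]]) -> dict:
    """Resolve each known key under PREFIX; missing or empty values fall back to DEFAULTS."""
    cfg = {key: lookup(PREFIX + key) or default for key, default in DEFAULTS.items()}
    cfg["window.duration.seconds"] = parse_positive_int(
        cfg["window.duration.seconds"], int(DEFAULTS["window.duration.seconds"])
    )
    return cfg
```

Falling back to the default keeps a bad edit from crash-looping the pods. Logging the rejected value would make the mistake visible to whoever triggered the run.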

The Assembled System

The final architecture provides a clean separation of concerns and a clear data flow.

graph TD
    subgraph "AWS EKS Cluster (Managed by Terraform)"
        subgraph "Namespace: pulsar"
            PulsarBroker[Pulsar Broker]
            PulsarProxy[Pulsar Proxy]
            Bookie[BookKeeper]
            Zookeeper[Zookeeper]
        end

        subgraph "Namespace: consul"
            ConsulServer[Consul Server]
            ConsulClient[Consul Client Agent]
        end

        subgraph "Namespace: ml-pipelines"
            direction LR
            KFP[Kubeflow Pipelines UI/Server] --> KFPRunner[Pipeline Runner Pod]
            
            subgraph "Feature Engineering Deployment"
                KotlinSvc1[Kotlin Service Pod 1]
                KotlinSvc2[Kotlin Service Pod 2]
                KotlinSvc3[Kotlin Service Pod 3]
            end
            
            TrainingPod[Model Training Pod]
        end
    end

    ExternalSource[External Event Source] -->|1. Raw Events| PulsarProxy
    PulsarProxy --> PulsarBroker
    PulsarBroker -- Stores Data --> Bookie
    
    KotlinSvc1 -- Reads Config --> ConsulServer
    KotlinSvc1 -- 2. Consumes Raw Events --> PulsarProxy
    KotlinSvc1 -- 3. Publishes Features --> PulsarProxy
    
    KFP -- Triggers Pipeline --> KFPRunner
    KFPRunner -- "4. updates config" --> ConsulServer
    KFPRunner -- "5. creates pod" --> TrainingPod
    
    TrainingPod -- 6. Consumes Features --> PulsarProxy

    style Zookeeper fill:#f9f,stroke:#333,stroke-width:2px
    style Bookie fill:#f9f,stroke:#333,stroke-width:2px
    style ConsulServer fill:#bbf,stroke:#333,stroke-width:2px

This setup solved our core problems. Training-serving skew caused by duplicated feature logic was eliminated, because both the training pipeline and any future online inference service would consume the same feature vectors from the same Pulsar topic (user-features). The feature logic is encapsulated in a single, testable, and robust Kotlin service. The entire process is automated and auditable through Kubeflow. The infrastructure is repeatable and version-controlled via Terraform.

The current implementation, however, has clear boundaries and areas for the next iteration. State management for the feature engineering service is naive. Moving from in-memory state to a durable store such as RocksDB managed by the application, or leveraging the state API of Pulsar Functions, is the highest priority for fault tolerance. The feature schema is derived from the Kotlin data classes. Schema.JSON already registers it with Pulsar’s built-in schema registry, but we still need explicit compatibility policies and a process for evolving it so that changes don’t break consumers. Finally, the “wait” step in our Kubeflow pipeline is brittle. A more robust solution would involve a sensor component that actively checks for the availability of sufficient new feature data in Pulsar before triggering the training job. This architecture isn’t an end state, but a solid, pragmatic foundation for building a truly scalable MLOps platform.
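The sketch below is a minimal Python version of such a sensor. It assumes the broker's admin REST API is reachable from the pipeline pod at a base URL such as `http://pulsar-broker.pulsar.svc:8080`, which is an assumption about the chart's service name and port. It also assumes the topic stats include the `msgInCounter` field. The function names and thresholds are hypothetical. The polling loop takes its counter, sleep and clock as parameters so it can be tested without a cluster; packaged as a KFP component, it would replace the fixed sleep step:

```python
import json
import time
import urllib.request
from typing import Callable


def topic_stats_url(admin_base: str, topic: str) -> str:
    """Map persistent://tenant/ns/topic to the admin REST stats endpoint."""
    domain, _, rest = topic.partition("://")
    return f"{admin_base}/admin/v2/{domain}/{rest}/stats"


def read_msg_in_counter(admin_base: str, topic: str) -> int:
    """Total messages published to the topic, as reported by its current broker."""
    with urllib.request.urlopen(topic_stats_url(admin_base, topic), timeout=10) as resp:
        return int(json.load(resp).get("msgInCounter", 0))


def wait_for_new_messages(
    read_counter: Callable[[], int],
    min_new: int,
    timeout_s: float,
    poll_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poll until at least min_new messages arrived after the call started.

    Returns the number observed; raises TimeoutError if the deadline passes first.
    """
    baseline = read_counter()
    deadline = clock() + timeout_s
    while True:
        current = read_counter()
        if current < baseline:
            # The counter lives in broker memory and restarts from zero if the
            # topic moves to another broker; count from the reset instead.
            baseline = 0
        new = current - baseline
        if new >= min_new:
            return new
        if clock() >= deadline:
            raise TimeoutError(f"only {new} new messages within {timeout_s}s")
        sleep(poll_s)
```

Counting published messages only shows that features are flowing, not that they were computed with the new window. A stricter check could also compare message publish times against the moment the configuration was written.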

