Building Immutable Ingestion Infrastructure on AKS for React Native Analytics with Hudi, Packer, and Jest


Our mobile analytics ingestion pipeline was a constant source of on-call alerts. It was a collection of brittle scripts that processed raw JSON events from our React Native application and dumped them as partitioned Parquet files onto Azure Blob Storage. Late-arriving data from devices with intermittent connectivity corrupted daily partitions, schema drift introduced by new app versions broke downstream queries without warning, and our Kubernetes worker nodes on AKS accumulated configuration drift after months of manual hotfixes. The core problem was that we were treating our data infrastructure as a mutable system, which led to unpredictability and operational toil.

The shift in mindset was to treat the entire pipeline—from the infrastructure nodes to the data transformation logic—as a single, version-controlled, immutable artifact. This led us to a rather unconventional combination of technologies. We decided to build custom, hardened VM images for our AKS node pools using Packer to eliminate configuration drift. We chose Apache Hudi to create a transactional data lakehouse capable of handling updates and late data gracefully. And, in the most unorthodox move, we repurposed Jest, our frontend testing framework, to create a fast, reliable contract testing layer for our Spark-based data transformation logic, catching schema violations before they ever reached production.

Phase 1: Eradicating Node Drift with Packer-Built AKS Images

The first point of failure was the infrastructure itself. Our AKS node pools were based on standard Azure Marketplace images, customized post-deployment with startup scripts. This approach was unreliable. A script failure, a transient network issue, or a manual kubectl exec change would create a “snowflake” node, impossible to replicate and a nightmare to debug. The solution was to adopt an immutable infrastructure pattern by pre-baking our node configurations into a custom Azure VM image.

Packer was the obvious tool for this. It allows us to define an image declaratively and provision it with the exact tools and configurations we need. Our goal was to create a golden image containing a specific Java version, Spark dependencies, monitoring agents, and security hardening configurations. This image would then be stored in an Azure Shared Image Gallery (SIG), since renamed Azure Compute Gallery, for efficient distribution across regions.

Here is the Packer template (aks-node.pkr.hcl) that defines our custom node image.

packer {
  required_plugins {
    azure = {
      version = ">= 2.0.0"
      source  = "github.com/hashicorp/azure"
    }
  }
}

variable "client_id" {
  type    = string
  default = env("AZURE_CLIENT_ID")
}

variable "client_secret" {
  type      = string
  default   = env("AZURE_CLIENT_SECRET")
  sensitive = true
}

variable "subscription_id" {
  type    = string
  default = env("AZURE_SUBSCRIPTION_ID")
}

variable "tenant_id" {
  type    = string
  default = env("AZURE_TENANT_ID")
}

variable "resource_group_name" {
  type = string
}

variable "gallery_name" {
  type = string
}

variable "image_name" {
  type    = string
  default = "aks-spark-hudi-node"
}

variable "image_version" {
  type    = string
  default = "1.0.0"
}

source "azure-arm" "aks_node" {
  // Authentication
  client_id       = var.client_id
  client_secret   = var.client_secret
  subscription_id = var.subscription_id
  tenant_id       = var.tenant_id

  // Base Image
  source_image_reference {
    publisher = "canonical"
    offer     = "0001-com-ubuntu-server-focal"
    sku       = "20_04-lts-gen2"
    version   = "latest"
  }
  
  // Target VM size for building
  vm_size = "Standard_D4s_v3"
  os_type = "Linux"

  // Managed image is temporary; the final artifact is the gallery image.
  // HCL2 templates expose uuidv4(); the bare uuid function belongs to legacy JSON templates.
  managed_image_name                 = "packer-temp-img-${uuidv4()}"
  managed_image_resource_group_name  = var.resource_group_name
  
  // Final artifact destination: Shared Image Gallery
  shared_image_gallery_destination {
    resource_group       = var.resource_group_name
    gallery_name         = var.gallery_name
    image_name           = var.image_name
    image_version        = var.image_version
    replication_regions  = ["eastus", "westus2"]
  }
}

build {
  sources = ["source.azure-arm.aks_node"]

  provisioner "shell" {
    inline = [
      "echo 'Waiting for cloud-init to finish...'",
      "cloud-init status --wait",
      "sudo apt-get update -y",
      "sudo apt-get install -y openjdk-11-jdk-headless python3-pip",
      "sudo apt-get clean"
    ]
  }

  provisioner "shell" {
    script = "./scripts/install-monitoring-agents.sh"
  }
  
  provisioner "shell" {
    inline = [
      "echo 'Hardening SSH configuration...'",
      "sudo sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin no/' /etc/ssh/sshd_config",
      "sudo sed -i 's/#PasswordAuthentication yes/PasswordAuthentication no/' /etc/ssh/sshd_config",
      "sudo systemctl restart sshd"
    ]
  }

  // This step is critical for AKS custom images. It prepares the image
  // with necessary Kubernetes components. Without it, the node won't join the cluster.
  provisioner "shell" {
    execute_command = "sudo {{.Path}}"
    inline = [
        "wget https://raw.githubusercontent.com/Azure/aks-engine/master/parts/k8s/cloud-init/artifacts/cse_main.sh",
        "chmod +x cse_main.sh",
        "./cse_main.sh"
    ]
  }

  // At the end of provisioning, we must deprovision the machine to generalize it.
  provisioner "shell" {
    execute_command = "sudo {{.Path}}"
    inline = [
      "/usr/sbin/waagent -force -deprovision+user && export HISTSIZE=0 && sync"
    ]
  }
}

This Packer build does four main things:

  1. Starts from a standard Ubuntu 20.04 Gen2 image.
  2. Uses shell provisioners to install OpenJDK (for Spark) and run other setup scripts.
  3. Runs the cse_main.sh script from the Azure/aks-engine repository, which installs the required Kubernetes components (kubelet, containerd, etc.) and prepares the node to join an AKS cluster. This is a non-negotiable step.
  4. Deprovisions the VM so it can be captured as a generalized image.

Once this image is built and published to our Shared Image Gallery, attaching it to a new AKS node pool is a straightforward CLI command. We create a dedicated node pool for our data processing workloads that uses this specific, versioned image.

# Variables
RESOURCE_GROUP="my-data-rg"
AKS_CLUSTER_NAME="my-aks-cluster"
# Linux node pool names must be lowercase alphanumeric and at most 12 characters.
NODEPOOL_NAME="sparkingest"
SIG_ID=$(az sig show --resource-group "$RESOURCE_GROUP" --gallery-name myAKSSIG --query id -o tsv)
IMAGE_DEFINITION_NAME="aks-spark-hudi-node"
IMAGE_VERSION="1.0.0"
CUSTOM_IMAGE_ID="$SIG_ID/images/$IMAGE_DEFINITION_NAME/versions/$IMAGE_VERSION"

# Create a new nodepool in an existing AKS cluster using the custom image
az aks nodepool add \
  --resource-group "$RESOURCE_GROUP" \
  --cluster-name "$AKS_CLUSTER_NAME" \
  --name "$NODEPOOL_NAME" \
  --node-count 1 \
  --node-vm-size Standard_E4ds_v4 \
  --node-image-id "$CUSTOM_IMAGE_ID" \
  --labels workload=spark-ingestion \
  --node-taints workload=spark-ingestion:NoSchedule

The taint keeps other workloads off these specialized, pre-warmed nodes, and the label lets our Spark pods target them with a node selector and a matching toleration. This eliminated configuration drift on the pool and reduced pod startup times, since required dependencies were already present on the node filesystem.
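Because each image change produces a new gallery version, it helps to build the version ID programmatically instead of by hand. The following is a minimal sketch, not part of our actual tooling: the helper names gallery_image_version_id and bump_patch are hypothetical, and the subscription ID in the usage note is a placeholder. It assumes the standard ARM resource ID layout for gallery image versions and the three-part integer version format that galleries require.

```python
import re

# Gallery image versions use three dot-separated integers, e.g. 1.0.0.
_VERSION_RE = re.compile(r"^\d+\.\d+\.\d+$")


def gallery_image_version_id(subscription_id, resource_group, gallery, image, version):
    """Build the ARM resource ID of one gallery image version (hypothetical helper)."""
    if not _VERSION_RE.match(version):
        raise ValueError(f"image version must look like 1.0.0, got {version!r}")
    return (
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        f"/providers/Microsoft.Compute/galleries/{gallery}"
        f"/images/{image}/versions/{version}"
    )


def bump_patch(version):
    """Return the next patch version so every rebuild gets a fresh, immutable ID."""
    major, minor, patch = (int(part) for part in version.split("."))
    return f"{major}.{minor}.{patch + 1}"
```

A CI step could call bump_patch on the last published version, pass the result to Packer as image_version, and hand the ID from gallery_image_version_id to the node pool command.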

Phase 2: Transactional Ingestion with Apache Hudi on AKS

With a stable foundation, we tackled the data layer. Our React Native app generates events representing user interactions. A typical event payload looks like this:

{
  "eventId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
  "userId": "user-123",
  "sessionId": "session-xyz",
  "eventType": "ITEM_ADDED_TO_CART",
  "timestamp": "2023-10-27T10:00:00.123Z",
  "payload": {
    "itemId": "product-456",
    "quantity": 1,
    "price": 99.99
  },
  "deviceInfo": {
    "os": "iOS",
    "osVersion": "16.5",
    "appVersion": "2.1.0"
  }
}

The primary issues were:

  1. Updates: A user might update their profile information, which should be reflected across all their historical events. Simple Parquet files make this impossible without rewriting entire partitions.
  2. Late Data: An event from two days ago might arrive now. With a date-partitioned layout, this required complex and slow reprocessing of the old partition.
  3. Schema Evolution: App version 2.2.0 might add a new field, payload.discountCode. Downstream consumers not expecting this field would fail.
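The schema evolution problem is partly a consumer-side concern. As a rough illustration, a consumer written as a "tolerant reader" selects only the fields it understands, so an added field such as payload.discountCode is ignored instead of causing a failure. This is a sketch under stated assumptions: KNOWN_PAYLOAD_FIELDS and read_payload are hypothetical names, not part of our pipeline.

```python
import json

# Fields this hypothetical consumer understands; anything else is ignored.
KNOWN_PAYLOAD_FIELDS = ("itemId", "quantity", "price")


def read_payload(raw_json):
    """Parse an event and keep only the payload fields the consumer knows."""
    event = json.loads(raw_json)
    payload = event.get("payload") or {}
    # Missing fields become None; unknown fields such as discountCode are dropped.
    return {name: payload.get(name) for name in KNOWN_PAYLOAD_FIELDS}
```

An event from app version 2.2.0 that carries discountCode yields the same three keys as an older event, which is the behavior downstream queries were missing.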

Apache Hudi, running on Spark within our AKS cluster, solves these problems. Hudi provides an abstraction over storage (like Azure Blob), turning it into a transactional data lake. It supports UPSERT operations, which was the key feature we needed.

We designed a Spark job to read a micro-batch of raw JSON events from Kafka, transform them, and perform an upsert into a Hudi table. The table was partitioned by appVersion and eventType for efficient querying.

Here’s the core of the Spark processing script (process_events.py):

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, MapType

# Kafka and Hudi configurations from environment variables
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "react-native-events")
HUDI_TABLE_PATH = os.environ.get("HUDI_TABLE_PATH") # e.g., abfs://<container>@<storage-account>.dfs.core.windows.net/events
HUDI_TABLE_NAME = "mobile_events"

# Define the schema for the incoming JSON events from Kafka
# In a real project, this should be loaded from a schema registry.
event_schema = StructType([
    StructField("eventId", StringType(), False),
    StructField("userId", StringType(), False),
    StructField("sessionId", StringType(), True),
    StructField("eventType", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("payload", MapType(StringType(), StringType()), True),
    StructField("deviceInfo", StructType([
        StructField("os", StringType(), True),
        StructField("osVersion", StringType(), True),
        StructField("appVersion", StringType(), True)
    ]), True)
])

def main():
    spark = SparkSession.builder \
        .appName("Hudi Ingestion from React Native Events") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .getOrCreate()
    
    # Read from Kafka
    raw_events_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "latest") \
        .load()

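    # Deserialize JSON and apply schema. from_json yields nulls for malformed
    # messages, so drop rows without a record key before they reach Hudi.
    events_df = raw_events_df \
        .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
        .select("data.*") \
        .filter(col("eventId").isNotNull())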

    # Core transformation and enrichment logic could go here.
    # For this example, we'll just flatten the deviceInfo for partitioning.
    transformed_df = events_df \
        .withColumn("appVersion", col("deviceInfo.appVersion")) \
        .withColumn("os", col("deviceInfo.os"))

    # Hudi write options
    hudi_options = {
        'hoodie.table.name': HUDI_TABLE_NAME,
        'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.recordkey.field': 'eventId',
        'hoodie.datasource.write.precombine.field': 'timestamp', # On duplicate eventId, keep the one with the latest timestamp
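        'hoodie.datasource.write.partitionpath.field': 'appVersion,eventType',
        # Two partition fields need a key generator that accepts a comma-separated list.
        'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',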
        'hoodie.datasource.write.hive_style_partitioning': 'true',
        'hoodie.upsert.shuffle.parallelism': 2,
        'hoodie.insert.shuffle.parallelism': 2
    }

    def write_to_hudi(batch_df, batch_id):
        print(f"Processing batch ID: {batch_id}")
        # head(1) fetches at most one row, avoiding the full scan that count() triggers.
        if not batch_df.head(1):
            print("Empty batch, skipping.")
            return

        batch_df.write.format("hudi") \
            .options(**hudi_options) \
            .mode("append") \
            .save(HUDI_TABLE_PATH)

    # Start the streaming query
    query = transformed_df.writeStream \
        .foreachBatch(write_to_hudi) \
        .option("checkpointLocation", f"{HUDI_TABLE_PATH}/_checkpoints") \
        .trigger(processingTime='1 minute') \
        .start()

    query.awaitTermination()

if __name__ == "__main__":
    main()

We packaged this script into a Docker container and deployed it to AKS using a Kubernetes Job or a SparkApplication CRD. The key Hudi options are:

  • hoodie.datasource.write.operation: 'upsert': This is the magic. If an event with the same recordkey.field (eventId) arrives, Hudi will update the existing record instead of creating a duplicate.
  • hoodie.datasource.write.precombine.field: 'timestamp': If two records with the same key appear in the same batch, Hudi uses this field to pick the winner. We always keep the latest one.
  • hoodie.datasource.write.partitionpath.field: 'appVersion,eventType': This structures our data lake for efficient queries based on the application version or the type of event.
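The interaction of these three options can be sketched in plain Python without Spark. This is an in-memory illustration of the semantics described above, not Hudi's implementation: the function names are hypothetical, the "table" is a dict, and timestamps are assumed to be ISO-8601 strings in one consistent format so that string comparison matches chronological order.

```python
def precombine(batch, key="eventId", ordering="timestamp"):
    """Collapse a batch to one record per key, keeping the largest ordering value."""
    winners = {}
    for record in batch:
        current = winners.get(record[key])
        if current is None or record[ordering] >= current[ordering]:
            winners[record[key]] = record
    return winners


def upsert(table, batch):
    """Apply a batch to an in-memory 'table' keyed by eventId."""
    # Batch winners replace stored records with the same key; new keys are inserted.
    table.update(precombine(batch))
    return table


def partition_path(record):
    """Hive-style partition path for the two partition fields used above."""
    return f"appVersion={record['appVersion']}/eventType={record['eventType']}"
```

A duplicate eventId within one batch collapses to the record with the latest timestamp, while an event that arrives days late is simply inserted under its own key, with no partition rewrite logic in the job itself.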

Phase 3: Contract Testing the Pipeline with Jest

The Spark job was robust, but it introduced a new problem: how do we test changes to the transformation logic? A simple typo in the process_events.py script could lead to data corruption. The traditional approach of spinning up a full Spark cluster in CI for every pull request was too slow and expensive.

This is where we made our most controversial and effective decision. Our team was already proficient with JavaScript and Jest for testing the React Native app. We decided to create a parallel, lightweight implementation of the Spark transformation logic in TypeScript and use Jest to run “contract tests” against it. This test suite wouldn’t validate Spark itself; it would validate that for a given input JSON, the transformation produced the expected output structure and values. It became a quality gate for our data contracts.

First, we defined our canonical data contract using JSON Schema.

schemas/event-contract.jsonschema:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "TransformedMobileEvent",
  "description": "The final schema of a mobile event after processing, ready for Hudi.",
  "type": "object",
  "properties": {
    "eventId": { "type": "string", "format": "uuid" },
    "userId": { "type": "string" },
    "timestamp": { "type": "string", "format": "date-time" },
    "eventType": { "type": "string" },
    "appVersion": { "type": "string" },
    "os": { "type": "string" },
    "isNewUser": { "type": "boolean" },
    "payload_flat": { "type": "string" }
  },
  "required": ["eventId", "userId", "timestamp", "eventType", "appVersion", "isNewUser", "payload_flat"]
}
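The same contract can also be checked on the Python side. The sketch below is a deliberately small, standard-library-only check that covers just the required list and the primitive types from the schema above; it does not validate the uuid or date-time formats, and a real setup would more likely use a full JSON Schema validator. CONTRACT and contract_violations are hypothetical names.

```python
# Subset of the contract above: required field names and their JSON types.
CONTRACT = {
    "required": ["eventId", "userId", "timestamp", "eventType",
                 "appVersion", "isNewUser", "payload_flat"],
    "types": {"eventId": str, "userId": str, "timestamp": str, "eventType": str,
              "appVersion": str, "os": str, "isNewUser": bool, "payload_flat": str},
}


def contract_violations(record):
    """Return a list of readable violations; an empty list means the record conforms."""
    problems = [f"missing required field: {name}"
                for name in CONTRACT["required"] if name not in record]
    for name, expected in CONTRACT["types"].items():
        if name in record and not isinstance(record[name], expected):
            problems.append(f"{name} should be {expected.__name__}")
    return problems
```

Returning a list of violations instead of raising on the first one makes it easier to log every problem with a rejected record in a single pass.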

Next, we implemented the transformation logic in TypeScript. This logic must be kept in sync with the Python/Spark implementation.

src/transformer.ts:

// A simplified representation of the event structures
interface RawEvent {
  eventId: string;
  userId: string;
  eventType: string;
  timestamp: string; // ISO string
  payload: Record<string, any>;
  deviceInfo: {
    os: string;
    osVersion: string;
    appVersion: string;
  };
}

interface TransformedEvent {
  eventId: string;
  userId: string;
  timestamp: string;
  eventType: string;
  appVersion: string;
  os: string;
  isNewUser: boolean; // Example of an enriched field
  payload_flat: string; // Example of a complex type being flattened
}

// This function must mirror the logic in the Spark job
export function transformEvent(event: RawEvent, isNewUserLookup: boolean = false): TransformedEvent {
  // Optional chaining guards against events that arrive without deviceInfo at runtime.
  if (!event.eventId || !event.deviceInfo?.appVersion) {
    throw new Error('Missing required fields for transformation.');
  }

  return {
    eventId: event.eventId,
    userId: event.userId,
    timestamp: event.timestamp,
    eventType: event.eventType.toUpperCase(), // Business rule: eventType is always uppercase
    appVersion: event.deviceInfo.appVersion,
    os: event.deviceInfo.os,
    isNewUser: isNewUserLookup, // An enriched field
    payload_flat: JSON.stringify(event.payload), // Flatten complex object to string
  };
}

Finally, the Jest test suite. We used fixture files for input and Jest’s snapshot testing to detect any unintended changes in the output.

src/transformer.test.ts:

import { readFileSync } from 'fs';
import { join } from 'path';
import Ajv, { ValidateFunction } from 'ajv';
import addFormats from 'ajv-formats';
import { transformEvent } from './transformer';

// Load the contract from disk; a .jsonschema file cannot be imported as a module by default.
const eventSchema = JSON.parse(
  readFileSync(join(__dirname, '../schemas/event-contract.jsonschema'), 'utf8'),
);

// Sample raw event fixture, mimicking what comes from Kafka
const rawEventFixture = {
  eventId: 'a1b2c3d4-e5f6-7890-1234-567890abcdef',
  userId: 'user-123',
  sessionId: 'session-xyz',
  eventType: 'item_added_to_cart', // lowercase to test transformation
  timestamp: '2023-10-27T10:00:00.123Z',
  payload: {
    itemId: 'product-456',
    quantity: 1,
    price: 99.99,
  },
  deviceInfo: {
    os: 'iOS',
    osVersion: '16.5',
    appVersion: '2.1.0',
  },
};

describe('Data Transformation Logic', () => {
  let validator: ValidateFunction;

  beforeAll(() => {
    // Ajv v8 exposes the class as the default export; ajv-formats adds uuid and date-time checks.
    const ajv = new Ajv();
    addFormats(ajv);
    validator = ajv.compile(eventSchema);
  });

  it('should transform a raw event correctly and match the snapshot', () => {
    const transformed = transformEvent(rawEventFixture, true);
    
    // Snapshot testing is powerful here. If the transformation logic changes,
    // the test will fail, forcing the developer to consciously update the snapshot.
    expect(transformed).toMatchSnapshot();
  });

  it('should produce a valid record according to the JSON Schema contract', () => {
    const transformed = transformEvent(rawEventFixture, false);
    const isValid = validator(transformed);

    // If schema validation fails, we get detailed errors.
    if (!isValid) {
      console.error(validator.errors);
    }
    
    expect(isValid).toBe(true);
  });

  it('should convert eventType to uppercase', () => {
    const transformed = transformEvent(rawEventFixture);
    expect(transformed.eventType).toBe('ITEM_ADDED_TO_CART');
  });

  it('should throw an error if a required field is missing', () => {
    const invalidEvent = { ...rawEventFixture, eventId: '' };
    // @ts-ignore
    expect(() => transformEvent(invalidEvent)).toThrow('Missing required fields for transformation.');
  });
});

This Jest suite runs in seconds in our CI pipeline. If a developer changes the Spark job, they are required to update the TypeScript implementation and the corresponding Jest tests, and a failure at this stage blocks the PR. This doesn’t eliminate the need for full end-to-end tests in a staging environment, but in our experience it catches the vast majority of schema and logic errors far earlier and faster. The pitfall is the potential for divergence between the two implementations, which requires team discipline to manage. The JSON Schema contract serves as the source of truth that both sides must adhere to.
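To make the divergence risk concrete, here is a hedged sketch of what a plain-Python counterpart of transformEvent could look like. It is not the code from our Spark job (the snippet in Phase 2 only flattens deviceInfo); transform_event is a hypothetical function that mirrors the TypeScript rules so that both sides can be run against the same fixture.

```python
import json


def transform_event(event, is_new_user=False):
    """Python counterpart of transformEvent; field names follow the contract."""
    device = event.get("deviceInfo") or {}
    if not event.get("eventId") or not device.get("appVersion"):
        raise ValueError("Missing required fields for transformation.")
    return {
        "eventId": event["eventId"],
        "userId": event["userId"],
        "timestamp": event["timestamp"],
        "eventType": event["eventType"].upper(),  # Business rule: always uppercase
        "appVersion": device["appVersion"],
        "os": device.get("os"),
        "isNewUser": is_new_user,
        # Compact separators and ensure_ascii=False approximate JSON.stringify output.
        "payload_flat": json.dumps(
            event.get("payload"), separators=(",", ":"), ensure_ascii=False
        ),
    }
```

Even this small mirror shows where the two implementations can drift: Python's default json.dumps inserts spaces after separators and escapes non-ASCII characters, and the two languages can format some floating-point values differently, so a shared fixture with an expected payload_flat string is worth keeping in both test suites.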

Tying It Together: The Full Pipeline

The final architecture provides a robust, versioned, and testable flow from the client to the data lake.

graph TD
    subgraph "React Native Client"
        A[User Interaction] --> B{Event Generation};
    end

    subgraph "Ingestion & Processing on AKS"
        C[Kafka Topic: react-native-events]
        D[Spark Streaming Job on Custom Nodes]
    end

    subgraph "Data Lakehouse on Azure Blob"
        G[Apache Hudi Table]
    end

    subgraph "CI/CD Pipeline"
        H[Code Change in Git] -- Triggers --> E[Jest Contract Tests];
        E -- On Success --> I{Build Spark App Image};
        H -- Infra Change --> F[Packer Image Build];
        F -- Publishes --> J[Shared Image Gallery];
    end

    B --> C;
    C --> D;
    D -- Upserts --> G;
    I -- Deploys to --> D;
    J -- Supplies node image --> D;

This entire system, from the underlying node image to the data transformation code, is now managed through Git. The fragility is gone. A change to the node requires a Packer build and a new image version. A change to the data logic is caught by Jest before it can cause damage. The Hudi table handles the inherent messiness of mobile event data without data loss.

The most significant remaining challenge is the disciplined maintenance of the parallel transformation logic in both Python/Spark and TypeScript. We mitigate this by keeping the logic simple and relying heavily on the shared JSON Schema contract. A future iteration might explore code generation from a common DSL to create both implementations, but for now, the process relies on developer diligence. Furthermore, Spark job cost optimization on AKS is a continuous effort, requiring ongoing tuning of executor sizes and dynamic allocation settings to balance performance and spend.

