Securing Multi-Tenant Spark Workloads with Dynamically Generated Cilium Policies via OAuth 2.0


The technical pain point was stark: our shared Kubernetes cluster for running Apache Spark jobs had become a security liability. We were scaling a data platform for multiple internal teams (tenants), and the default network model was flat. A Spark job submitted by Team A could, in theory, probe and connect to a database service intended only for Team B. Static NetworkPolicy objects were a non-starter. Spark executors are ephemeral, their IPs are unpredictable, and the sheer number of jobs meant manually managing policies was operationally impossible. We needed dynamic, job-specific, least-privilege network isolation that was tied directly to the identity of the user submitting the job.

Our initial concept was to build a mandatory gateway for all Spark job submissions. This gateway would act as a security checkpoint. Instead of users interacting directly with the Spark Operator or Kubernetes API, they would submit their job configurations to our service. The key idea was to leverage the OAuth 2.0 JWT bearer token they already used for authentication. This token, containing the user’s identity and specific data access scopes, would become the source of truth for generating a transient, hyper-specific CiliumNetworkPolicy for that job and only that job. Once the policy was in place, the gateway would then delegate the actual job submission to Spark.

The technology selection process was driven by pragmatism. We chose Java with Spring Boot for the gateway service. The Spark ecosystem is JVM-native, and Java’s mature libraries for JWT validation (spring-security-oauth2-resource-server) and Kubernetes API interaction (the Fabric8 kubernetes-client) made it a reliable choice. For the network enforcement layer, Cilium was the best fit. Its identity-based security model, which keys policy on pod labels instead of brittle IP addresses, suits the dynamic nature of Spark pods. More importantly, Cilium enforces policy in the kernel through eBPF, which avoids the overhead of sidecar proxies, an overhead that would be unacceptable for data-intensive Spark shuffles. OAuth 2.0 was the natural choice for authorization, allowing us to define fine-grained scopes like data:read:sensitive-pii-db or data:write:analytics-kafka-topic, which map directly to network egress rules.

The high-level architecture follows this flow:

sequenceDiagram
    participant User
    participant Gateway as Gateway (Java)
    participant Auth Server
    participant Kubernetes API
    participant Cilium Agent
    participant Spark Pods

    User->>Gateway: POST /submit (Spark App, JWT)
    Gateway->>Auth Server: Validate JWT
    Auth Server-->>Gateway: Validation OK
    Gateway->>Gateway: Parse scopes from token
    Gateway->>Gateway: Generate job-specific CiliumNetworkPolicy
    Gateway->>Kubernetes API: Apply CiliumNetworkPolicy
    Kubernetes API-->>Cilium Agent: Propagate policy
    Cilium Agent->>Cilium Agent: Enforce via eBPF
    Gateway->>Kubernetes API: Submit SparkApplication CRD
    Kubernetes API->>Kubernetes API: Create Spark Driver/Executor Pods
    Note over Spark Pods: Pods inherit job-specific labels
    Spark Pods->>Spark Pods: Network traffic governed by new policy

Our first implementation step was the Java gateway. We used Spring Boot for rapid development. The core is a controller that accepts the Spark application details and a JWT.

// File: src/main/java/com/secure/spark/gateway/controller/SparkSubmissionController.java
package com.secure.spark.gateway.controller;

import com.secure.spark.gateway.service.CiliumPolicyManager;
import com.secure.spark.gateway.service.SparkJobSubmitter;
import com.secure.spark.gateway.model.SubmissionRequest;
import com.secure.spark.gateway.model.SubmissionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("/api/v1/jobs")
public class SparkSubmissionController {

    private static final Logger log = LoggerFactory.getLogger(SparkSubmissionController.class);

    private final CiliumPolicyManager policyManager;
    private final SparkJobSubmitter jobSubmitter;

    public SparkSubmissionController(CiliumPolicyManager policyManager, SparkJobSubmitter jobSubmitter) {
        this.policyManager = policyManager;
        this.jobSubmitter = jobSubmitter;
    }

    @PostMapping("/submit")
    public ResponseEntity<SubmissionResponse> submitJob(
            @RequestBody SubmissionRequest request,
            @AuthenticationPrincipal Jwt jwt) {
        
        String jobId = "spark-job-" + UUID.randomUUID().toString();
        String userId = jwt.getSubject();
        log.info("Received job submission request for user '{}'. Assigning jobId '{}'", userId, jobId);

        try {
            // Step 1: Create and apply the network policy BEFORE submitting the job.
            // This is critical to avoid a race condition where pods start before their sandbox is ready.
            policyManager.createAndApplyPolicy(jobId, userId, jwt);
            log.info("Successfully applied Cilium network policy for jobId '{}'", jobId);

            // Step 2: Submit the Spark job, injecting the necessary labels for the policy to match.
            jobSubmitter.submit(jobId, userId, request);
            log.info("Successfully submitted Spark job '{}' to Kubernetes", jobId);

            return ResponseEntity.ok(new SubmissionResponse(jobId, "Job submitted successfully."));

        } catch (Exception e) {
            log.error("Failed to process job submission for user '{}', jobId '{}'", userId, jobId, e);
            
            // Critical cleanup step: If job submission fails after the policy was created, we must remove the orphaned policy.
            // A real-world project would use a more robust transactional saga pattern, but for now, a try-catch cleanup is essential.
            policyManager.cleanupPolicy(jobId);
            log.warn("Attempted cleanup of orphaned policy for failed jobId '{}'", jobId);
            
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(new SubmissionResponse(jobId, "Job submission failed: " + e.getMessage()));
        }
    }
}

Security is handled by Spring Security’s OAuth 2.0 resource server dependency. The configuration is straightforward, pointing to our identity provider’s JWK Set URI.

// File: src/main/java/com/secure/spark/gateway/config/SecurityConfig.java
package com.secure.spark.gateway.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.oauth2.jwt.JwtDecoder;
import org.springframework.security.oauth2.jwt.NimbusJwtDecoder;
import org.springframework.security.web.SecurityFilterChain;

@Configuration
@EnableWebSecurity
public class SecurityConfig {

    @Value("${spring.security.oauth2.resourceserver.jwt.jwk-set-uri}")
    private String jwkSetUri;

    @Bean
    public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
        http
            .authorizeHttpRequests(authorize -> authorize
                .requestMatchers("/api/v1/jobs/**").authenticated()
                .anyRequest().permitAll()
            )
            .oauth2ResourceServer(oauth2 -> oauth2.jwt(jwt -> {}))
            .sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
            .csrf(csrf -> csrf.disable()); // Assuming this is an internal API service
        return http.build();
    }

    @Bean
    public JwtDecoder jwtDecoder() {
        // This configuration assumes the Auth server provides a standard JWK Set endpoint.
        return NimbusJwtDecoder.withJwkSetUri(jwkSetUri).build();
    }
}

The core logic resides in the CiliumPolicyManager. This service uses the Fabric8 Kubernetes client to dynamically build a CiliumNetworkPolicy resource. The policy’s rules are derived directly from the scope claim in the JWT.

A common mistake here is to create overly complex policies. We kept the design simple: each scope maps to a pre-defined set of pod labels and a port. For instance, the scope data:read:postgres-main allows egress to any pod labeled db: postgres-main on port 5432.

// File: src/main/java/com/secure/spark/gateway/service/CiliumPolicyManager.java
package com.secure.spark.gateway.service;

import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.Collections;

// These are Java model classes for the Cilium CRDs. They are not part of Fabric8 itself:
// you would generate them from the CRD (for example with Fabric8's java-generator) or write them by hand.
import io.cilium.api.v2.CiliumNetworkPolicy;
import io.cilium.api.v2.EndpointSelector;
import io.cilium.api.v2.Rule;
import io.cilium.api.v2.EgressRule;
import io.cilium.api.v2.ToPort;

@Service
public class CiliumPolicyManager {

    private static final Logger log = LoggerFactory.getLogger(CiliumPolicyManager.class);
    private static final String SPARK_JOB_ID_LABEL = "spark-job-id";
    private static final String SPARK_USER_ID_LABEL = "spark-user-id";
    private static final String POLICY_NAMESPACE = "spark-jobs";

    private final KubernetesClient k8sClient;
    private final MixedOperation<CiliumNetworkPolicy, ?, ?> ciliumPolicyClient;

    public CiliumPolicyManager(KubernetesClient k8sClient) {
        this.k8sClient = k8sClient;
        // The client needs to be configured for the Cilium CRD
        this.ciliumPolicyClient = k8sClient.resources(CiliumNetworkPolicy.class);
    }

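    public void createAndApplyPolicy(String jobId, String userId, Jwt jwt) {
        // The "scope" claim is usually a single space-delimited string; some providers
        // emit a JSON array instead. Handle both forms explicitly so that "a b" is
        // never treated as one scope.
        Object rawScope = jwt.getClaim("scope");
        List<String> scopes;
        if (rawScope instanceof String) {
            String scopeString = ((String) rawScope).trim();
            scopes = scopeString.isEmpty() ? Collections.emptyList() : List.of(scopeString.split("\\s+"));
        } else {
            scopes = jwt.getClaimAsStringList("scope");
            if (scopes == null) {
                scopes = Collections.emptyList();
            }
        }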

        log.debug("Building policy for jobId '{}' with scopes: {}", jobId, scopes);
        
        CiliumNetworkPolicy policy = buildPolicy(jobId, userId, scopes);
        
        // Job IDs are unique, so this is effectively always a create.
        // createOrReplace is not atomic; it only makes a retried submission idempotent.
        ciliumPolicyClient.inNamespace(POLICY_NAMESPACE).resource(policy).createOrReplace();
        
        // In a production system, you might add a short wait/check loop here
        // to confirm the policy is active before proceeding, further mitigating race conditions.
    }

    private CiliumNetworkPolicy buildPolicy(String jobId, String userId, List<String> scopes) {
        CiliumNetworkPolicy policy = new CiliumNetworkPolicy();
        policy.setApiVersion("cilium.io/v2");
        policy.setKind("CiliumNetworkPolicy");

        ObjectMeta metadata = new ObjectMeta();
        metadata.setName("cnp-" + jobId);
        metadata.setNamespace(POLICY_NAMESPACE);
        metadata.setLabels(Map.of(
            "owner", "spark-gateway",
            SPARK_JOB_ID_LABEL, jobId
        ));
        policy.setMetadata(metadata);
        
        // The spec defines what this policy applies to.
        // It selects all pods with our unique job ID label.
        EndpointSelector endpointSelector = new EndpointSelector();
        LabelSelector selector = new LabelSelector();
        selector.setMatchLabels(Map.of(SPARK_JOB_ID_LABEL, jobId));
        endpointSelector.setMatchLabels(selector.getMatchLabels());
        policy.setSpec(new CiliumNetworkPolicy.Spec());
        policy.getSpec().setEndpointSelector(endpointSelector);

        // Build egress rules based on OAuth scopes
        List<EgressRule> egressRules = new ArrayList<>();
        
        // Rule 1: Always allow DNS traffic. A common pitfall is forgetting this,
        // which causes jobs to fail resolving hostnames.
        egressRules.add(createDnsEgressRule());
        
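        // Rule 2: Allow communication between driver and executors within the same job.
        // Note: in cluster mode the Spark driver also needs egress to the Kubernetes API server
        // to request executor pods. This listing does not include such a rule.
        egressRules.add(createIntraJobEgressRule(jobId));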

        // Rule 3: Dynamically add rules based on scopes.
        egressRules.addAll(scopes.stream()
                .map(this::scopeToEgressRule)
                .filter(java.util.Objects::nonNull)
                .collect(Collectors.toList()));
        
        policy.getSpec().setEgress(egressRules);
        
        // Once a policy with an ingress section selects an endpoint, Cilium denies all other
        // ingress traffic by default. Only pods of the same job are allowed to connect in.
        policy.getSpec().setIngress(List.of(createIntraJobIngressRule(jobId)));

        log.trace("Generated CiliumNetworkPolicy YAML:\n{}", Serialization.asYaml(policy));
        return policy;
    }

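    private EgressRule createDnsEgressRule() {
        // Allows DNS traffic on port 53 to the cluster DNS pods in kube-system.
        // "kube-dns" is not a Cilium entity, so the pods are selected by label instead.
        EgressRule dnsRule = new EgressRule();
        EndpointSelector dnsSelector = new EndpointSelector();
        dnsSelector.setMatchLabels(Map.of(
            "k8s:io.kubernetes.pod.namespace", "kube-system",
            "k8s:k8s-app", "kube-dns"));
        dnsRule.setTo(List.of(dnsSelector));
        // DNS falls back to TCP for large responses, so both protocols are allowed.
        dnsRule.setToPorts(List.of(new ToPort(53, "UDP"), new ToPort(53, "TCP")));
        return dnsRule;
    }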

    private EgressRule createIntraJobEgressRule(String jobId) {
        // Allows all pods within the same job to communicate with each other.
        // This is essential for Spark driver/executor communication.
        EgressRule intraJobRule = new EgressRule();
        EndpointSelector toEndpoints = new EndpointSelector();
        toEndpoints.setMatchLabels(Map.of(SPARK_JOB_ID_LABEL, jobId));
        intraJobRule.setTo(List.of(toEndpoints));
        return intraJobRule;
    }
    
    private Rule createIntraJobIngressRule(String jobId) {
        Rule ingressRule = new Rule();
        EndpointSelector fromEndpoints = new EndpointSelector();
        fromEndpoints.setMatchLabels(Map.of(SPARK_JOB_ID_LABEL, jobId));
        ingressRule.setFrom(List.of(fromEndpoints));
        return ingressRule;
    }

    private EgressRule scopeToEgressRule(String scope) {
        // This is where the mapping logic lives.
        // e.g., scope "data:read:postgres-main" -> egress rule to pods with label "db: postgres-main" on port 5432
        if ("data:read:postgres-main".equals(scope)) {
            EgressRule rule = new EgressRule();
            EndpointSelector selector = new EndpointSelector();
            selector.setMatchLabels(Map.of("db", "postgres-main"));
            rule.setTo(List.of(selector));
            rule.setToPorts(List.of(new ToPort(5432, "TCP")));
            return rule;
        }
        if ("data:write:kafka-main".equals(scope)) {
            EgressRule rule = new EgressRule();
            EndpointSelector selector = new EndpointSelector();
            selector.setMatchLabels(Map.of("app", "kafka-broker"));
            rule.setTo(List.of(selector));
            rule.setToPorts(List.of(new ToPort(9092, "TCP")));
            return rule;
        }
        // A more advanced implementation might query a central policy definition service.
        log.warn("Unmapped scope '{}', ignoring for network policy.", scope);
        return null;
    }

    public void cleanupPolicy(String jobId) {
        log.warn("Cleaning up policy for jobId '{}'", jobId);
        try {
            ciliumPolicyClient.inNamespace(POLICY_NAMESPACE).withName("cnp-" + jobId).delete();
        } catch (Exception e) {
            log.error("Failed to cleanup policy for jobId '{}'. It may require manual intervention.", jobId, e);
        }
    }
}
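
The hard-coded branches in scopeToEgressRule can also be expressed as data, which makes the mapping easier to review and to externalize later. The following is a minimal sketch under stated assumptions, not the gateway's actual code: EgressTarget and ScopeEgressMapper are hypothetical names, and the two table entries simply mirror the scopes used in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical value type: the pods and port a scope is allowed to reach. */
record EgressTarget(Map<String, String> podLabels, int port, String protocol) {}

final class ScopeEgressMapper {

    // Illustrative table; the entries mirror the two scopes handled in scopeToEgressRule.
    private static final Map<String, EgressTarget> TARGETS = Map.of(
        "data:read:postgres-main", new EgressTarget(Map.of("db", "postgres-main"), 5432, "TCP"),
        "data:write:kafka-main", new EgressTarget(Map.of("app", "kafka-broker"), 9092, "TCP"));

    private ScopeEgressMapper() {}

    /** Returns the targets of all known scopes in token order; unknown scopes are skipped. */
    static List<EgressTarget> resolve(List<String> scopes) {
        List<EgressTarget> targets = new ArrayList<>();
        for (String scope : scopes) {
            if (scope == null) {
                continue; // immutable maps reject null keys
            }
            EgressTarget target = TARGETS.get(scope);
            if (target != null) {
                targets.add(target);
            }
        }
        return targets;
    }
}
```

With a table like this, scopeToEgressRule would shrink to a lookup followed by the construction of one EgressRule per EgressTarget, and unknown scopes would still grant no network access.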

The final piece of the puzzle was configuring the Spark submission itself. We used the Spark on Kubernetes Operator, which accepts a YAML definition for a SparkApplication. The gateway service is responsible for creating this resource. The crucial part is injecting the spark-job-id and spark-user-id labels into the pod templates for both the driver and executors.

// File: src/main/java/com/secure/spark/gateway/service/SparkJobSubmitter.java
package com.secure.spark.gateway.service;

import com.secure.spark.gateway.model.SubmissionRequest;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

// You need the Spark Operator CRD dependency for this class to compile
// import com.google.cloud.spark.operator.v1beta2.SparkApplication;
// import com.google.cloud.spark.operator.v1beta2.SparkApplicationSpec;
// ... and so on

@Service
public class SparkJobSubmitter {
    
    private static final Logger log = LoggerFactory.getLogger(SparkJobSubmitter.class);
    private static final String SPARK_JOB_ID_LABEL = "spark-job-id";
    private static final String SPARK_USER_ID_LABEL = "spark-user-id";
    private static final String POLICY_NAMESPACE = "spark-jobs";

    private final KubernetesClient k8sClient;
    // private final MixedOperation<SparkApplication, ?, ?> sparkAppClient;

    public SparkJobSubmitter(KubernetesClient k8sClient) {
        this.k8sClient = k8sClient;
        // this.sparkAppClient = k8sClient.resources(SparkApplication.class);
    }
    
    public void submit(String jobId, String userId, SubmissionRequest request) {
        // This is a conceptual implementation. The actual SparkApplication object is complex.
        // The key is to add labels to driver and executor specs.
        
        // SparkApplication sparkApp = new SparkApplication();
        // sparkApp.getMetadata().setName(jobId);
        // sparkApp.getMetadata().setNamespace(POLICY_NAMESPACE);
        
        // SparkApplicationSpec spec = request.getSparkApplicationSpec();
        
        // Inject labels for the driver
        // spec.getDriver().getLabels().put(SPARK_JOB_ID_LABEL, jobId);
        // spec.getDriver().getLabels().put(SPARK_USER_ID_LABEL, userId);
        
        // Inject labels for the executors
        // spec.getExecutor().getLabels().put(SPARK_JOB_ID_LABEL, jobId);
        // spec.getExecutor().getLabels().put(SPARK_USER_ID_LABEL, userId);
        
        // sparkApp.setSpec(spec);
        
        // sparkAppClient.inNamespace(POLICY_NAMESPACE).resource(sparkApp).create();

        log.info("Conceptual: Submitting SparkApplication CRD with injected labels for jobId: {}", jobId);
        
        // A simpler approach without the operator is to use spark-submit with config flags:
        // --conf spark.kubernetes.driver.label.spark-job-id=...
        // --conf spark.kubernetes.executor.label.spark-job-id=...
        // The gateway would build and execute this command.
    }
}
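
One detail the label injection depends on: Kubernetes label values may be at most 63 characters long, may contain only alphanumerics, '-', '_' and '.', and must begin and end with an alphanumeric character. A JWT subject such as an email address does not always satisfy this, so the spark-user-id value may need to be normalized first. The helper below is a sketch of one way to do that; LabelValues and sanitize are hypothetical names, not part of the gateway shown above.

```java
final class LabelValues {

    private static final int MAX_LENGTH = 63; // Kubernetes limit for label values

    private LabelValues() {}

    /** Converts an arbitrary string into a syntactically valid Kubernetes label value. */
    static String sanitize(String raw) {
        // Replace every character outside [A-Za-z0-9_.-] with '-'.
        String cleaned = raw.replaceAll("[^A-Za-z0-9_.-]", "-");
        if (cleaned.length() > MAX_LENGTH) {
            cleaned = cleaned.substring(0, MAX_LENGTH);
        }
        // Label values must start and end with an alphanumeric character.
        return cleaned
            .replaceAll("^[^A-Za-z0-9]+", "")
            .replaceAll("[^A-Za-z0-9]+$", "");
    }
}
```

The normalization is lossy, so two different subjects can map to the same label value. That is acceptable only as long as the label is informational; the policy itself selects pods by spark-job-id, which the gateway generates.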

A significant problem we encountered was policy lifecycle management. What happens to the CiliumNetworkPolicy after a job finishes? Leaving them around would pollute the cluster with thousands of useless policies. Our solution was twofold. First, the policy is given an ownerReference pointing to the SparkApplication created by our gateway, so deleting the job cascades to the policy through Kubernetes garbage collection. Because the policy is created before the SparkApplication, the reference can only be added once the application exists and its UID is known. However, this coupling felt wrong. The more robust solution was to implement a background reaper thread in the gateway. It periodically queries for policies owned by the gateway and cross-references them with active SparkApplication resources. If a policy exists for a job that is no longer running (status COMPLETED or FAILED), it gets deleted. This asynchronous cleanup is more resilient to gateway restarts or submission failures.
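
The reaper's decision can be kept separate from the Kubernetes calls, which makes it easy to test. The sketch below shows only that decision step, under assumptions of our own: PolicyInfo and PolicyReaperPlan are hypothetical names, the job states are passed in as plain strings, and the grace period for policies without a SparkApplication is an addition not described above. It is there so that a policy created moments before its job is submitted is not deleted prematurely.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical view of a gateway-owned policy: its job ID label and creation time. */
record PolicyInfo(String jobId, Instant createdAt) {}

final class PolicyReaperPlan {

    // Terminal application states named in the text.
    private static final Set<String> TERMINAL_STATES = Set.of("COMPLETED", "FAILED");

    private PolicyReaperPlan() {}

    /**
     * Returns the job IDs whose policies should be deleted.
     * jobStates maps job ID to the current SparkApplication state; a missing entry
     * means no SparkApplication exists for that job.
     */
    static List<String> jobsToReap(List<PolicyInfo> policies,
                                   Map<String, String> jobStates,
                                   Instant now,
                                   Duration orphanGrace) {
        List<String> reap = new ArrayList<>();
        for (PolicyInfo policy : policies) {
            String state = jobStates.get(policy.jobId());
            boolean finished = state != null && TERMINAL_STATES.contains(state);
            // Assumption: a policy without an application is orphaned only after the grace period.
            boolean orphaned = state == null
                && Duration.between(policy.createdAt(), now).compareTo(orphanGrace) > 0;
            if (finished || orphaned) {
                reap.add(policy.jobId());
            }
        }
        return reap;
    }
}
```

The reaper thread would list policies labeled owner=spark-gateway, build the state map from the SparkApplication resources in the same namespace, and pass each returned job ID to cleanupPolicy.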

The final result is a system where security policy is no longer a static, manual configuration. It is ephemeral, automated, and directly derived from user identity at the moment of execution. A job submitted with a token that only carries the scope data:read:postgres-main cannot establish a TCP connection to our Kafka cluster: Cilium’s eBPF datapath drops the packets in the kernel, providing a powerful layer of Zero Trust networking for our entire Spark platform.

This solution, however, has its boundaries. The gateway is a potential bottleneck and a single point of failure; it requires a highly available deployment with multiple replicas. The scope-to-policy mapping logic is currently hardcoded in Java; a more mature version should externalize this logic to a dedicated policy engine like OPA, allowing security teams to manage policies without code changes. Furthermore, this system addresses network-layer isolation. It does not replace the need for authentication at the application layer (e.g., database credentials, which should be mounted via secrets) or resource quotas managed by Kubernetes. It is one critical, dynamic layer in a comprehensive multi-tenant security strategy.

