Canary Rollout of IAM Policies in Kubernetes via a Message-Driven Custom Controller


A cluster-wide IAM policy change pushed via kubectl apply triggered a cascading failure last quarter. A service account permission intended for a single namespace was accidentally applied far more broadly, locking out critical monitoring components that relied on a more specific role. The rollback was manual, stressful, and involved waking up three on-call engineers. The incident made it painfully clear that the deployment practices we use for application code (canary releases, progressive delivery) were completely absent from our security and policy management. Treating critical configuration like IAM as a simple atomic update is a significant source of operational risk.

We needed a system to canary-release these IAM policy changes. The goal was to apply a new, potentially risky policy to a small subset of service instances, monitor for negative effects like a spike in access denials, and only then proceed with a full rollout.

Our first thought was a complex CI pipeline with shell scripts. This is a common mistake. Scripts lack state management, are difficult to test, and fail ungracefully. A real-world project requires a declarative, self-healing mechanism. This led us to the standard cloud-native solution: a custom Kubernetes controller. To decouple the trigger mechanism from the controller’s reconciliation loop, we placed a message queue in between. This prevents a failed deployment job from being tightly coupled to the policy enforcement logic; it just fires an event and moves on.

The final architecture settled on these components:

  1. A NATS Message Queue: For lightweight, high-performance messaging. It serves as the event bus for initiating policy rollouts.
  2. A Go-based Kubernetes Controller: This is the brain. It subscribes to NATS, receives policy change requests, and manipulates Kubernetes resources to enact the canary release.
  3. Istio Service Mesh: For policy enforcement. Instead of application-level IAM checks, we leverage the service mesh sidecar. This is critical because it requires no changes to the application code. The controller will manage Istio’s AuthorizationPolicy resources.
  4. A “Frontend” Client: In our case, this isn’t a web UI but our CI/CD system, represented here by a simple command-line Go application that publishes the initial request to NATS.

The entire flow is event-driven and asynchronous.

sequenceDiagram
    participant Client as CI/CD System (Frontend)
    participant NATS
    participant Controller as IAM Policy Controller
    participant K8s_API as Kubernetes API Server
    participant Istio_Sidecar as Istio Sidecar (Proxy)

    Client->>+NATS: Publish PolicyCanaryRequest message
    NATS-->>-Controller: Delivers message
    Controller->>+K8s_API: LIST Pods for target service
    K8s_API-->>-Controller: Return Pod list
    Controller->>Controller: Calculate canary subset (e.g., 10%)
    Controller->>+K8s_API: ADD label `iam-canary=true` to subset of Pods
    K8s_API-->>-Controller: Confirm label update
    Controller->>+K8s_API: CREATE Istio AuthorizationPolicy for `iam-canary=true` (New Policy)
    K8s_API-->>-Controller: Confirm policy creation
    Controller->>+K8s_API: CREATE/UPDATE Istio AuthorizationPolicy for `!iam-canary` (Old Policy)
    K8s_API-->>-Controller: Confirm policy creation
    Note right of Istio_Sidecar: Istio distributes policies to sidecars
    Istio_Sidecar->>Istio_Sidecar: New policy is now enforced on canary pods

The Communication Contract: Defining the Policy Rollout Message

Before writing any logic, we defined the contract. What information does the controller need to perform a rollout? We settled on a JSON payload for the NATS message. Using Protobuf would be a better choice in a mature system for schema enforcement, but JSON is sufficient for demonstrating the principle.

This is the PolicyCanaryRequest structure.

{
  "requestId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
  "target": {
    "namespace": "production",
    "serviceName": "billing-api",
    "appLabel": "billing-api"
  },
  "canaryPercentage": 10,
  "policy": {
    "version": "v2",
    "rules": [
      {
        "from": {
          "principals": ["cluster.local/ns/production/sa/frontend-app"]
        },
        "to": {
          "methods": ["GET"],
          "paths": ["/api/v1/invoices/*"]
        }
      },
      {
        "from": {
          "principals": ["cluster.local/ns/monitoring/sa/prometheus"]
        },
        "to": {
          "methods": ["GET"],
          "paths": ["/metrics"]
        }
      }
    ]
  },
  "command": "START"
}

The command field is important; it allows us to later send PROMOTE or ROLLBACK commands over the same channel, making the system more versatile. The policy block is a simplified abstraction that our controller will translate into a formal Istio AuthorizationPolicy.
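As an illustrative sketch only (not part of the system as built), the command values and a basic pre-publish check could be factored into shared code so publisher and controller stay in agreement. It assumes the PolicyCanaryRequest and Target types defined in the publisher in the next section, plus the standard fmt package.

// Command values accepted on the rollout subject.
const (
	CommandStart    = "START"
	CommandPromote  = "PROMOTE"
	CommandRollback = "ROLLBACK"
)

// validateRequest runs the cheap structural checks a publisher should perform
// before putting a request on the wire.
func validateRequest(req PolicyCanaryRequest) error {
	switch req.Command {
	case CommandStart, CommandPromote, CommandRollback:
	default:
		return fmt.Errorf("unknown command %q", req.Command)
	}
	if req.Target.Namespace == "" || req.Target.AppName == "" {
		return fmt.Errorf("target namespace and appLabel are required")
	}
	if req.CanaryPercentage < 1 || req.CanaryPercentage > 100 {
		return fmt.Errorf("canaryPercentage must be between 1 and 100, got %d", req.CanaryPercentage)
	}
	return nil
}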

The Initiator: A Simple Go Client

Our CI pipeline is represented by this client. It’s responsible for validating the request locally and then publishing it to NATS. It’s built to be robust, with connection retries and structured logging.

cmd/publisher/main.go:

package main

import (
	"context"
	"encoding/json"
	"log/slog"
	"os"
	"time"

	"github.com/google/uuid"
	"github.comcom/nats-io/nats.go"
)

const (
	natsURL      = "nats://127.0.0.1:4222"
	natsSubject  = "iam.policy.rollouts"
	connectRetry = 5 * time.Second
)

// PolicyCanaryRequest defines the structure for a new policy rollout.
// It's the contract between the publisher and the controller.
type PolicyCanaryRequest struct {
	RequestID        string      `json:"requestId"`
	Target           Target      `json:"target"`
	CanaryPercentage int         `json:"canaryPercentage"`
	Policy           Policy      `json:"policy"`
	Command          string      `json:"command"` // START, PROMOTE, ROLLBACK
}

type Target struct {
	Namespace   string `json:"namespace"`
	ServiceName string `json:"serviceName"`
	AppName     string `json:"appLabel"`
}

type Policy struct {
	Version string `json:"version"`
	Rules   []Rule `json:"rules"`
}

type Rule struct {
	From From `json:"from"`
	To   To   `json:"to"`
}

type From struct {
	Principals []string `json:"principals"`
}

type To struct {
	Methods []string `json:"methods"`
	Paths   []string `json:"paths"`
}

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	
	nc, err := connectToNATS(logger)
	if err != nil {
		logger.Error("Failed to connect to NATS after multiple retries", "error", err)
		os.Exit(1)
	}
	defer nc.Close()
	logger.Info("Successfully connected to NATS")

	// In a real CI/CD job, this request would be generated from a Git commit.
	request := PolicyCanaryRequest{
		RequestID:        uuid.New().String(),
		Target: Target{
			Namespace: "production",
			AppName:   "billing-api",
		},
		CanaryPercentage: 10,
		Policy: Policy{
			Version: "v2-restrictive-read",
			Rules: []Rule{
				{
					From: From{Principals: []string{"cluster.local/ns/production/sa/frontend-app"}},
					To: To{
						Methods: []string{"GET"},
						Paths:   []string{"/api/v1/invoices/*"},
					},
				},
                // The original policy allowed access to /metrics, this one removes it.
                // This is the kind of subtle, dangerous change we want to canary.
			},
		},
		Command: "START",
	}

	payload, err := json.Marshal(request)
	if err != nil {
		logger.Error("Failed to marshal request to JSON", "error", err)
		os.Exit(1)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := nc.PublishMsg(&nats.Msg{Subject: natsSubject, Data: payload}); err != nil {
		logger.Error("Failed to publish message to NATS", "error", err)
		os.Exit(1)
	}

	// Flush to confirm the published message has reached the NATS server before exiting.
	if err := nc.FlushWithContext(ctx); err != nil {
		logger.Error("Failed to flush NATS connection", "error", err)
	}

	logger.Info("Successfully published policy canary request", "requestId", request.RequestID, "subject", natsSubject)
}

func connectToNATS(logger *slog.Logger) (*nats.Conn, error) {
	var nc *nats.Conn
	var err error
	for i := 0; i < 5; i++ {
		nc, err = nats.Connect(natsURL)
		if err == nil {
			return nc, nil
		}
		logger.Warn("Failed to connect to NATS, retrying...", "attempt", i+1, "error", err)
		time.Sleep(connectRetry)
	}
	return nil, err
}

This client is simple but production-minded. It logs in a structured format and handles connection logic gracefully.
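The hand-rolled retry loop does the job, but the nats.go client also ships built-in reconnect options. A minimal alternative sketch, reusing the natsURL and connectRetry constants from above:

// connectToNATSWithOptions delegates retry behavior to the NATS client itself.
// With RetryOnFailedConnect, Connect returns immediately even if the first
// dial fails, and published messages are buffered until a connection exists.
func connectToNATSWithOptions(logger *slog.Logger) (*nats.Conn, error) {
	return nats.Connect(natsURL,
		nats.RetryOnFailedConnect(true),
		nats.MaxReconnects(5),
		nats.ReconnectWait(connectRetry),
		nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
			logger.Warn("NATS connection lost", "error", err)
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			logger.Info("Reconnected to NATS", "url", nc.ConnectedUrl())
		}),
	)
}

Whether the buffering behavior is desirable in a CI job is debatable; the explicit loop makes the failure mode more obvious, which is why the publisher above keeps it.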

The Controller: Logic for Safe Policy Rollout

This is the core of the system. It’s a long-running process deployed in the Kubernetes cluster with permissions to manage Pods and Istio AuthorizationPolicy resources.

The main components of the controller are:

  1. Kubernetes Client: Using client-go to interact with the API server (client construction is shown in the wiring sketch after this list).
  2. Istio Client: Using the official Istio client library for managing AuthorizationPolicy custom resources.
  3. NATS Subscriber: Listens for PolicyCanaryRequest messages.
  4. Reconciliation Logic: The handler that executes the canary deployment steps.
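None of the client wiring appears in the controller package itself. A minimal cmd/controller/main.go sketch could look like the following; the file path, module path, and NATS_URL environment variable are assumptions for illustration.

package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	"github.com/nats-io/nats.go"
	istioclient "istio.io/client-go/pkg/clientset/versioned"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"example.com/iam-canary/pkg/controller" // module path is illustrative
)

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	// The controller runs in-cluster as a Deployment whose ServiceAccount can
	// manage Pods and Istio AuthorizationPolicies.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		logger.Error("Failed to load in-cluster config", "error", err)
		os.Exit(1)
	}
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		logger.Error("Failed to build Kubernetes client", "error", err)
		os.Exit(1)
	}
	istioClient, err := istioclient.NewForConfig(cfg)
	if err != nil {
		logger.Error("Failed to build Istio client", "error", err)
		os.Exit(1)
	}

	nc, err := nats.Connect(os.Getenv("NATS_URL"))
	if err != nil {
		logger.Error("Failed to connect to NATS", "error", err)
		os.Exit(1)
	}
	defer nc.Close()

	// Shut down cleanly on SIGTERM so the NATS subscription is unsubscribed.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	c := controller.New(logger, kubeClient, istioClient, nc)
	if err := c.Run(ctx, "iam.policy.rollouts"); err != nil {
		logger.Error("Controller exited with error", "error", err)
		os.Exit(1)
	}
}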

Here is a stripped-down but functional version of the controller.

pkg/controller/controller.go:

package controller

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"math"

	"github.com/nats-io/nats.go"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes"
	
	istioclient "istio.io/client-go/pkg/clientset/versioned"
	istiosecurity "istio.io/api/security/v1beta1"
	istioapiv1beta1 "istio.io/api/security/v1beta1"
	istionetworkingv1beta1 "istio.io/client-go/pkg/apis/security/v1beta1"

)

const (
	CanaryLabelKey   = "iam-canary-policy"
	CanaryLabelValue = "active"
	ManagedByLabel   = "iam-policy-controller"
)

type Controller struct {
	logger      *slog.Logger
	kubeClient  kubernetes.Interface
	istioClient istioclient.Interface
	natsConn    *nats.Conn
}

func New(logger *slog.Logger, kc kubernetes.Interface, ic istioclient.Interface, nc *nats.Conn) *Controller {
	return &Controller{
		logger:      logger,
		kubeClient:  kc,
		istioClient: ic,
		natsConn:    nc,
	}
}

func (c *Controller) Run(ctx context.Context, subject string) error {
	sub, err := c.natsConn.Subscribe(subject, c.handleMessage)
	if err != nil {
		return fmt.Errorf("failed to subscribe to NATS subject %s: %w", subject, err)
	}
	
	c.logger.Info("Controller started, waiting for messages", "subject", subject)
	<-ctx.Done()
	
	// Gracefully unsubscribe
	if err := sub.Unsubscribe(); err != nil {
		c.logger.Error("Failed to unsubscribe from NATS", "error", err)
	}
	
	return nil
}

// handleMessage is the core reconciliation loop triggered by a NATS message.
func (c *Controller) handleMessage(msg *nats.Msg) {
	var req PolicyCanaryRequest
	if err := json.Unmarshal(msg.Data, &req); err != nil {
		c.logger.Error("Failed to unmarshal NATS message", "error", err, "subject", msg.Subject)
		return
	}

	logger := c.logger.With("requestId", req.RequestID, "targetApp", req.Target.AppName, "namespace", req.Target.Namespace)
	logger.Info("Received new policy canary request")

	// A real controller would have a state machine here, but for now, we only handle START.
	if req.Command != "START" {
		logger.Warn("Ignoring non-START command", "command", req.Command)
		return
	}
	
	// The main logic for starting a canary rollout
	if err := c.startCanary(context.Background(), logger, req); err != nil {
		logger.Error("Failed to process canary request", "error", err)
		// Here you would publish a failure event back to NATS
	} else {
		logger.Info("Successfully initiated canary policy rollout")
	}
}

func (c *Controller) startCanary(ctx context.Context, logger *slog.Logger, req PolicyCanaryRequest) error {
	// Step 1: Clean up any previous canary state for this app. This is crucial for idempotency.
	if err := c.cleanupPreviousCanary(ctx, logger, req.Target); err != nil {
		return fmt.Errorf("pre-flight cleanup failed: %w", err)
	}

	// Step 2: Find the target pods.
	podSelector := labels.Set{"app": req.Target.AppName}.AsSelector()
	podList, err := c.kubeClient.CoreV1().Pods(req.Target.Namespace).List(ctx, metav1.ListOptions{LabelSelector: podSelector.String()})
	if err != nil {
		return fmt.Errorf("failed to list pods for app %s: %w", req.Target.AppName, err)
	}
	if len(podList.Items) == 0 {
		return fmt.Errorf("no pods found for app %s", req.Target.AppName)
	}

	// Step 3: Select pods for the canary group.
	canaryPods := c.selectCanaryPods(podList.Items, req.CanaryPercentage)
	if len(canaryPods) == 0 {
		logger.Warn("Canary percentage resulted in zero pods, no action taken")
		return nil
	}
	logger.Info("Selected pods for canary", "total", len(podList.Items), "canaryCount", len(canaryPods))

	// Step 4: Label the canary pods. This labeling is what segments the fleet; note that it happens pod by pod, not atomically.
	for _, pod := range canaryPods {
		updatedPod := pod.DeepCopy()
		if updatedPod.Labels == nil {
			updatedPod.Labels = make(map[string]string)
		}
		updatedPod.Labels[CanaryLabelKey] = CanaryLabelValue
		_, err := c.kubeClient.CoreV1().Pods(req.Target.Namespace).Update(ctx, updatedPod, metav1.UpdateOptions{})
		if err != nil {
			// A single failure here could leave the system in a partial state.
			// A production controller would need a rollback mechanism.
			return fmt.Errorf("failed to apply canary label to pod %s: %w", pod.Name, err)
		}
	}
	
	// Step 5: Apply the new Istio AuthorizationPolicy for the canary group.
	if err := c.applyCanaryPolicy(ctx, logger, req); err != nil {
		return fmt.Errorf("failed to apply canary AuthorizationPolicy: %w", err)
	}

	// Step 6: Ensure the stable policy is in place for non-canary pods.
	// In a real system, you'd fetch the existing policy instead of assuming none exists.
	// For this example, we assume no policy existed before.
	if err := c.applyStablePolicy(ctx, logger, req.Target); err != nil {
		return fmt.Errorf("failed to apply stable AuthorizationPolicy: %w", err)
	}
	
	return nil
}

// selectCanaryPods calculates which pods to include in the canary group.
func (c *Controller) selectCanaryPods(pods []corev1.Pod, percentage int) []corev1.Pod {
	if percentage <= 0 {
		return []corev1.Pod{}
	}
	if percentage >= 100 {
		return pods
	}
	
	numCanary := int(math.Ceil(float64(len(pods)) * (float64(percentage) / 100.0)))
	return pods[:numCanary]
}

// applyCanaryPolicy creates the Istio AuthorizationPolicy targeting only the labeled canary pods.
func (c *Controller) applyCanaryPolicy(ctx context.Context, logger *slog.Logger, req PolicyCanaryRequest) error {
	policyName := fmt.Sprintf("%s-canary-policy-%s", req.Target.AppName, req.Policy.Version)
	logger.Info("Applying canary policy", "policyName", policyName)

	rules := []*istiosecurity.Rule{}
	for _, rule := range req.Policy.Rules {
		rules = append(rules, &istiosecurity.Rule{
			From: []*istiosecurity.Rule_From{
				{Source: &istiosecurity.Source{Principals: rule.From.Principals}},
			},
			To: []*istiosecurity.Rule_To{
				{Operation: &istiosecurity.Operation{
					Methods: rule.To.Methods,
					Paths:   rule.To.Paths,
				}},
			},
		})
	}
	
	authPolicy := &istioclientsecurity.AuthorizationPolicy{
		ObjectMeta: metav1.ObjectMeta{
			Name:      policyName,
			Namespace: req.Target.Namespace,
			Labels: map[string]string{
				"app":          req.Target.AppName,
				ManagedByLabel: "true",
			},
		},
		Spec: istiosecurity.AuthorizationPolicy{
			Selector: &istiotype.WorkloadSelector{
				MatchLabels: map[string]string{
					"app":          req.Target.AppName,
					CanaryLabelKey: CanaryLabelValue, // This is key: it only targets canary pods.
				},
			},
			Action: istiosecurity.AuthorizationPolicy_ALLOW,
			Rules:  rules,
		},
	}

	_, err := c.istioClient.SecurityV1beta1().AuthorizationPolicies(req.Target.Namespace).Create(ctx, authPolicy, metav1.CreateOptions{})
	return err
}

// applyStablePolicy applies a default-deny policy to non-canary pods.
// A real implementation would be more nuanced, perhaps preserving the "old" known-good policy.
func (c *Controller) applyStablePolicy(ctx context.Context, logger *slog.Logger, target Target) error {
	policyName := fmt.Sprintf("%s-stable-deny", target.AppName)
	logger.Info("Applying stable default-deny policy", "policyName", policyName)

	authPolicy := &istioclientsecurity.AuthorizationPolicy{
		ObjectMeta: metav1.ObjectMeta{
			Name:      policyName,
			Namespace: target.Namespace,
			Labels: map[string]string{
				"app":          target.AppName,
				ManagedByLabel: "true",
			},
		},
		Spec: istiosecurity.AuthorizationPolicy{
			Selector: &istiotype.WorkloadSelector{
				MatchLabels: map[string]string{
					"app": target.AppName,
				},
			},
			// An ALLOW policy with no rules never matches a request, so every
			// request to the selected workloads is denied. This is simplistic;
			// a better approach is to apply the PREVIOUS known-good policy to
			// pods that do NOT carry the canary label.
		},
	}

	_, err := c.istioClient.SecurityV1beta1().AuthorizationPolicies(target.Namespace).Create(ctx, authPolicy, metav1.CreateOptions{})
	if apierrors.IsAlreadyExists(err) {
		// The stable policy can already exist from a previous run; tolerate that here.
		// A real controller would use create-or-update logic and reconcile the spec.
		return nil
	}
	return err
}

// cleanupPreviousCanary ensures a clean state before starting a new rollout.
func (c *Controller) cleanupPreviousCanary(ctx context.Context, logger *slog.Logger, target Target) error {
	logger.Info("Cleaning up previous canary state")
	
	// Remove canary labels from all pods for the app
	podSelector := labels.Set{"app": target.AppName}.AsSelector()
	podList, err := c.kubeClient.CoreV1().Pods(target.Namespace).List(ctx, metav1.ListOptions{LabelSelector: podSelector.String()})
	if err != nil {
		return err
	}
	for _, pod := range podList.Items {
		if _, ok := pod.Labels[CanaryLabelKey]; ok {
			updatedPod := pod.DeepCopy()
			delete(updatedPod.Labels, CanaryLabelKey)
			_, err := c.kubeClient.CoreV1().Pods(target.Namespace).Update(ctx, updatedPod, metav1.UpdateOptions{})
			if err != nil {
				logger.Warn("Failed to remove canary label from pod", "pod", pod.Name, "error", err)
				// Continue cleanup effort
			}
		}
	}
	
	// Delete all AuthorizationPolicies managed by this controller for the app
	policySelector := labels.Set{ManagedByLabel: "true", "app": target.AppName}.AsSelector()
	err = c.istioClient.SecurityV1beta1().AuthorizationPolicies(target.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
		LabelSelector: policySelector.String(),
	})
	if err != nil {
		return fmt.Errorf("failed to delete old authorization policies: %w", err)
	}

	logger.Info("Cleanup complete")
	return nil
}

The pitfall here is state management. The controller as written is largely stateless, recalculating everything from the cluster state and the incoming message. This works for simple START commands but becomes insufficient for multi-stage PROMOTE/ROLLBACK workflows. A production version would need to persist the “old” policy definition somewhere, perhaps in a ConfigMap or a Custom Resource Definition (CRD), so it can be reapplied to stable pods or during a rollback. The applyStablePolicy function is overly simplistic; it just adds a deny-all policy. The correct approach would be to create a second policy with the previous rules, targeting pods that do not have the canary label.
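As a sketch of that persistence step, the controller could snapshot the incoming stable policy into a ConfigMap before starting a canary. The method below is illustrative only; it reuses the controller's existing clients and types and assumes k8s.io/apimachinery/pkg/api/errors is imported as apierrors, as in the import block above. The ConfigMap name and data key are arbitrary choices.

// snapshotStablePolicy records the pre-canary policy rules so a later PROMOTE
// or ROLLBACK can reconstruct them.
func (c *Controller) snapshotStablePolicy(ctx context.Context, target Target, policy Policy) error {
	data, err := json.Marshal(policy)
	if err != nil {
		return fmt.Errorf("failed to marshal policy snapshot: %w", err)
	}

	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-iam-policy-snapshot", target.AppName),
			Namespace: target.Namespace,
			Labels:    map[string]string{ManagedByLabel: "true", "app": target.AppName},
		},
		Data: map[string]string{"stable-policy.json": string(data)},
	}

	_, err = c.kubeClient.CoreV1().ConfigMaps(target.Namespace).Create(ctx, cm, metav1.CreateOptions{})
	if apierrors.IsAlreadyExists(err) {
		_, err = c.kubeClient.CoreV1().ConfigMaps(target.Namespace).Update(ctx, cm, metav1.UpdateOptions{})
	}
	return err
}

A CRD would carry the same information with better validation and status reporting; the ConfigMap version is simply the smallest step up from keeping no state at all.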

Lingering Issues and Future Iterations

This implementation provides a solid foundation for canarying IAM policies, but it’s not a complete production system. Several aspects require further development.

First, the observability loop is manual. The system successfully segments the pods and applies the policies, but an operator still needs to watch dashboards (e.g., Grafana for Istio metrics) to check for an increase in 403 Forbidden responses from the canary group. A more advanced system would integrate with a metrics provider like Prometheus. The controller could query Prometheus after a set period and, if a predefined SLO for error rates is breached, automatically trigger a ROLLBACK event.
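A hedged sketch of such a check, using the Prometheus Go client's HTTP API, is shown below. The istio_requests_total metric and its response_code/destination_app labels are standard Istio telemetry, but the exact label set depends on mesh configuration, and splitting canary from stable pods would additionally require exposing the canary label as a metric dimension; treat the query as a starting point, not a drop-in.

package observer

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
)

// canaryDenialRate returns the per-second rate of 403 responses for the target
// app over the last five minutes, as seen by Istio telemetry.
func canaryDenialRate(ctx context.Context, promURL, app string) (float64, error) {
	client, err := api.NewClient(api.Config{Address: promURL})
	if err != nil {
		return 0, err
	}
	promAPI := promv1.NewAPI(client)

	query := fmt.Sprintf(
		`sum(rate(istio_requests_total{destination_app=%q, response_code="403"}[5m]))`, app)
	result, _, err := promAPI.Query(ctx, query, time.Now())
	if err != nil {
		return 0, err
	}

	vec, ok := result.(model.Vector)
	if !ok || len(vec) == 0 {
		return 0, nil // no samples yet; treat as zero denials
	}
	return float64(vec[0].Value), nil
}

The controller could call something like this on a timer after a START, compare the result against a threshold carried in the request, and publish a ROLLBACK message to NATS when the threshold is exceeded.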

Second, the rollback mechanism is not fully implemented. We need to define ROLLBACK and PROMOTE commands. A ROLLBACK would delete the canary policy and remove the labels. A PROMOTE would update the stable policy to match the canary one, then clean up the canary resources. This requires careful state management to know what the “canary” and “stable” policies are at any given time. This is where a dedicated CRD would be superior to a simple message-based system.
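The dispatch that would replace the START-only check in handleMessage might take the following shape; promoteCanary and rollbackCanary are placeholders that do not exist in the controller above.

// dispatch routes a request to the appropriate workflow. Only startCanary is
// implemented in this post; the other two branches indicate intended behavior.
func (c *Controller) dispatch(ctx context.Context, logger *slog.Logger, req PolicyCanaryRequest) error {
	switch req.Command {
	case "START":
		return c.startCanary(ctx, logger, req)
	case "PROMOTE":
		// Rewrite the stable policy with the canary rules, then remove the
		// canary policy and labels.
		return c.promoteCanary(ctx, logger, req)
	case "ROLLBACK":
		// Delete the canary policy and strip the canary label; the persisted
		// pre-canary policy stays (or is restored) for the whole fleet.
		return c.rollbackCanary(ctx, logger, req)
	default:
		return fmt.Errorf("unknown command %q", req.Command)
	}
}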

Finally, the current pod selection is naive (first N pods). In a real-world project, this is insufficient. A better approach might be to integrate with a progressive delivery tool like Flagger, which can manage the canary pod set and traffic shifting, while our controller focuses purely on syncing the correct IAM policy to the state managed by Flagger. This would separate concerns, allowing each component to do what it does best. The core principle of decoupling the policy rollout from the application rollout, however, remains a powerful technique for reducing operational risk.

