Implementing a Distributed Sliding Window Rate Limiter for a Scala API on Azure AKS with Redis


The initial system architecture was straightforward: a suite of Scala microservices, built with Akka HTTP, serving RESTful APIs and deployed as containers on Azure Kubernetes Service (AKS). For a while, this worked. Then, a bug in a downstream client caused it to enter a tight retry loop, hammering one of our non-critical but computationally expensive endpoints. The AKS Horizontal Pod Autoscaler kicked in, but not before the surge in latency cascaded, impacting other, more critical services sharing the same node pools. The immediate fix was a crude in-memory request counter, but we all knew that was a temporary patch. In a distributed environment like AKS with multiple pods, an in-memory solution is worse than useless—it provides a false sense of security, as the actual rate limit is N * limit, where N is the number of pods. The real problem required a distributed, consistent, and low-latency state store.

Our first thought was to build a proper rate-limiting service. This idea was quickly discarded. Introducing another gRPC or REST service just to manage rate limits would add significant operational overhead and a new point of failure. The core requirement was a shared atomic counter. This immediately brought Redis to the forefront. Its single-threaded nature, atomic commands like INCR, and support for key expiration (TTL) make it a near-perfect fit for this use case. We didn’t need the durability of a relational database, and the latency of hitting a disk-backed store for every API request was unacceptable. Azure Cache for Redis provided a managed, low-latency option within our existing VNet, making the choice clear.

The next decision was the algorithm. The simplest is a Fixed Window Counter: you define a window (e.g., 60 seconds) and increment a counter for each request, rejecting once the counter exceeds the limit. The problem is the window boundary: a client can make the full limit of requests in the last second of one window and the full limit again in the first second of the next, effectively doubling its allowance over a very short period. This burstiness was precisely what we needed to prevent. The most accurate alternative is the Sliding Window Log, which stores a timestamp for every request. It’s perfectly accurate but memory-intensive, requiring a Redis Sorted Set per client, which felt like overkill for our needs.

The pragmatic middle ground is the approximate Sliding Window Counter algorithm. It offers a good balance of accuracy and efficiency. The approach requires tracking only two counters per client: one for the current fixed window and one for the previous. The effective rate is the current window’s count plus the previous window’s count, weighted by how much of the previous window still overlaps the sliding window that ends now. This avoids the edge-of-window bursts of the fixed-window approach without the storage overhead of the log-based one. This was our chosen path.
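To make the weighting concrete, here is a small, illustrative Scala sketch (the numbers are made up for the example): with a 60-second window, 80 requests in the previous window, 30 so far in the current one, and 15 seconds elapsed, the estimate over the trailing 60 seconds comes out to 90.

// Illustrative numbers only: the weighting used by the approximate sliding window counter.
val windowMillis  = 60000L // 60-second window
val elapsedMillis = 15000L // 15 seconds into the current window
val previousCount = 80L    // requests counted in the previous window
val currentCount  = 30L    // requests counted so far in the current window

// The previous window still covers 45 of the trailing 60 seconds, so its weight is 0.75.
val previousWindowWeight = (windowMillis - elapsedMillis).toDouble / windowMillis
val estimatedCount = currentCount + (previousCount * previousWindowWeight).toLong // 30 + 60 = 90

// Against a limit of 100, a request arriving now would still be allowed.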

Setting the Foundation: Project Structure and Dependencies

We started by setting up our Scala project with Akka HTTP and a reliable Redis client. For a modern Scala stack, we opted for redis4cats, a purely functional, non-blocking client built on Lettuce and Cats Effect. This fits seamlessly into the asynchronous nature of Akka HTTP.

Here is the build.sbt file defining our dependencies. In a real-world project, versions would be managed more rigorously, but this illustrates the core components.

// build.sbt

val AkkaVersion = "2.6.20"
val AkkaHttpVersion = "10.2.10"
val CatsEffectVersion = "3.4.8"
val Redis4CatsVersion = "1.3.0"
val LogbackVersion = "1.4.5"

lazy val root = (project in file("."))
  .settings(
    name := "distributed-rate-limiter",
    version := "0.1.0-SNAPSHOT",
    scalaVersion := "2.13.10",
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
      "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
      "org.typelevel" %% "cats-effect" % CatsEffectVersion,
      "dev.profunktor" %% "redis4cats-effects" % Redis4CatsVersion,
      "dev.profunktor" %% "redis4cats-log4cats" % Redis4CatsVersion,
      "ch.qos.logback" % "logback-classic" % LogbackVersion,
      "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
      // For configuration
      "com.github.pureconfig" %% "pureconfig" % "0.17.2",
      // Testing
      "com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion % Test,
      "org.scalatest" %% "scalatest" % "3.2.15" % Test
    )
  )
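One note on packaging: the Dockerfile later in this post copies a fat JAR produced by sbt-assembly, which is not declared above. A minimal project/plugins.sbt for it would look like the following (the version is an assumption; pin whichever your build actually uses).

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")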

Configuration is managed via application.conf using pureconfig for type-safe loading. This allows us to separate runtime parameters from the code, which is critical for deployments across different environments (dev, staging, prod) on AKS.

// src/main/resources/application.conf

app {
  host = "0.0.0.0"
  port = 8080

  redis {
    uri = "redis://localhost:6379" // This would be overridden by env vars in AKS
  }

  rate-limiter {
    limit = 100 // requests
    window-seconds = 60 // per minute
    enabled = true
  }
}
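As an alternative to passing a JVM system property at container start, Typesafe Config (which pureconfig builds on) can fall back to an environment variable directly via an optional substitution. Here is a small variant of the block above, assuming the pod exposes REDIS_URI:

// src/main/resources/application.conf (variant)
app {
  redis {
    uri = "redis://localhost:6379"
    uri = ${?REDIS_URI} // if REDIS_URI is set in the environment, it overrides the default above
  }
}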

We define a case class structure to load this configuration in a type-safe manner.

// src/main/scala/com/example/config/AppConfig.scala

package com.example.config

import pureconfig.ConfigSource
import pureconfig.generic.auto._

case class RedisConfig(uri: String)
case class RateLimiterConfig(limit: Int, windowSeconds: Int, enabled: Boolean)
case class AppConfig(host: String, port: Int, redis: RedisConfig, rateLimiter: RateLimiterConfig)

object AppConfig {
  def load(): AppConfig = ConfigSource.default.at("app").loadOrThrow[AppConfig]
}

The Core Logic: An Asynchronous and Atomic Rate Limiter

The heart of the solution is the RateLimiter service. Its primary responsibility is to encapsulate the interaction with Redis and implement the sliding window algorithm. The key design considerations were asynchronicity and atomicity. Every call to Redis is a network call; blocking the request-processing thread would be disastrous for performance. Using cats.effect.IO and redis4cats ensures that all I/O is non-blocking.

Atomicity is even more critical. A naive implementation might perform a GET on the counters, calculate the rate in the application, and then perform an INCR if the limit is not reached. This introduces a severe race condition in a high-concurrency environment. Between the GET and the INCR, another request could read the same stale value, leading to the limit being exceeded. The entire sequence of operations must be atomic. Redis provides two primary ways to achieve this: MULTI/EXEC transactions or Lua scripting. For this logic, a transaction block is sufficient and easier to reason about.
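For contrast, this is roughly what the naive check-then-act version would look like. It is a sketch only, reusing the same redis commands, limit, and cats syntax imports as the real implementation below, and it should not be used: two concurrent requests can both read the same count below the limit before either increments it.

// Race-prone sketch: the check and the increment are two separate Redis round trips.
def naiveIsAllowed(key: String): F[Boolean] =
  for {
    current <- redis.get(key)                                     // requests A and B both read 99
    count    = current.flatMap(_.toLongOption).getOrElse(0L)
    allowed  = count < limit
    _       <- if (allowed) redis.incr(key).void else ().pure[F]  // both then increment, ending at 101
  } yield allowed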

Here is the implementation of the DistributedRateLimiter.

// src/main/scala/com/example/limiter/DistributedRateLimiter.scala

package com.example.limiter

import cats.effect._
import cats.syntax.all._
import com.example.config.RateLimiterConfig
import com.typesafe.scalalogging.LazyLogging
import dev.profunktor.redis4cats.RedisCommands
import scala.concurrent.duration._

import java.time.Instant

trait RateLimiter[F[_]] {
  def isAllowed(key: String): F[Boolean]
}

object DistributedRateLimiter {

  // A simple value class for clarity
  case class RequestCount(value: Long) extends AnyVal

  def apply[F[_]: Async](
    redis: RedisCommands[F, String, String],
    config: RateLimiterConfig
  ): RateLimiter[F] = new RateLimiter[F] with LazyLogging {

    private val limit: Long = config.limit.toLong
    private val windowMillis: Long = config.windowSeconds * 1000L

    override def isAllowed(key: String): F[Boolean] = {
      if (!config.enabled) {
        // If disabled in config, always allow.
        true.pure[F]
      } else {
        for {
          now <- Sync[F].delay(Instant.now().toEpochMilli)
          currentWindow = now / windowMillis
          
          // Define keys for the current and previous time windows
          currentKey = s"ratelimit:$key:$currentWindow"
          previousKey = s"ratelimit:$key:${currentWindow - 1}"

          // The core logic is executed within a Redis transaction (MULTI/EXEC)
          // to ensure atomicity. This prevents race conditions under high load.
          results <- redis.multi.use { multi =>
            val commands = (
              // 1. Increment the counter for the current window.
              multi.incr(currentKey),
              // 2. Set an expiry on the current key, slightly longer than two windows
              //    to ensure the previous window's data is available for the calculation.
              //    This is crucial for cleanup and preventing memory leaks in Redis.
              multi.expire(currentKey, (config.windowSeconds * 2).seconds),
              // 3. Get the count from the previous window.
              multi.get(previousKey)
            )
            commands.tupled // Combine the commands to be executed in the transaction
          }
          
          result <- handleTransactionResult(now, currentWindow, results)
        } yield result
      }
    }

    private def handleTransactionResult(
      now: Long,
      currentWindow: Long,
      results: (Long, Boolean, Option[String])
    ): F[Boolean] = {
      val (currentCount, _, previousCountOpt) = results
      
      val previousCount = previousCountOpt.flatMap(_.toLongOption).getOrElse(0L)

      // Calculate the weight of the previous window in the current sliding frame.
      // This is the core of the approximate sliding window algorithm.
      val overlapMillis = now % windowMillis
      val previousWindowWeight = (windowMillis - overlapMillis).toDouble / windowMillis

      val weightedPreviousCount = (previousCount * previousWindowWeight).toLong

      val totalCount = currentCount + weightedPreviousCount

      logger.debug(s"Rate check: currentCount=$currentCount, previousCount=$previousCount, weight=$previousWindowWeight, total=$totalCount, limit=$limit")

      if (totalCount <= limit) {
        true.pure[F]
      } else {
        // A potential optimization: if the limit is exceeded, we could DECR the counter.
        // However, this adds complexity and another Redis command. For now, we accept
        // that a rejected request still consumes a slot. This is a common trade-off.
        false.pure[F]
      }
    }
  }
}
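A quick way to exercise the limiter against a local Redis before wiring it into the API is a throwaway IOApp. This is a hypothetical smoke test, not part of the service; it assumes Redis is running on localhost:6379 and uses the stdout Log instance that ships with redis4cats.

// src/test/scala/com/example/limiter/LimiterSmokeTest.scala (hypothetical, for local experimentation)

package com.example.limiter

import cats.effect.{IO, IOApp}
import cats.syntax.all._
import com.example.config.RateLimiterConfig
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.effect.Log.Stdout._

object LimiterSmokeTest extends IOApp.Simple {

  def run: IO[Unit] =
    Redis[IO].utf8("redis://localhost:6379").use { redis =>
      val config  = RateLimiterConfig(limit = 5, windowSeconds = 60, enabled = true)
      val limiter = DistributedRateLimiter(redis, config)

      // Fire seven checks for the same client; with a limit of 5, the last two should be denied.
      (1 to 7).toList.traverse_ { i =>
        limiter.isAllowed("smoke-test-client").flatMap(allowed => IO.println(s"request $i allowed=$allowed"))
      }
    }
}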

Integrating with Akka HTTP via a Custom Directive

With the core logic in place, the next step was to integrate it cleanly into our REST API layer. Akka HTTP’s Directive model is perfect for this. Directives are composable building blocks that can wrap an inner route, adding behavior like authentication, header extraction, or, in our case, rate limiting.

We created a RateLimiterDirectives trait that can be mixed into our route definitions. It defines a rateLimit directive that extracts a client identifier (e.g., the remote IP address) and invokes our DistributedRateLimiter. If the request is allowed, an informative X-RateLimit-Limit header is attached and the inner route is executed. If not, the directive completes the request with a 429 Too Many Requests status code.

// src/main/scala/com/example/api/RateLimiterDirectives.scala

package com.example.api

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.{Directive, Directive0, Directive1}
import akka.http.scaladsl.server.Directives._
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import com.example.config.RateLimiterConfig
import com.example.limiter.RateLimiter
import com.typesafe.scalalogging.LazyLogging

import scala.util.{Failure, Success}

trait RateLimiterDirectives extends LazyLogging {

  // This runtime is needed to bridge the world of cats-effect IO
  // with Akka HTTP's Future-based world.
  implicit val runtime: IORuntime

  def rateLimiter: RateLimiter[IO]
  def rateLimiterConfig: RateLimiterConfig

  def getClientIdentifier: Directive1[String] = {
    // In a real-world scenario, this might extract an API key from a header
    // or a user ID from a JWT. For this example, we use the client's IP address.
    // A pitfall here is that clients behind a NAT will share the same IP and thus the same limit.
    extractClientIP.map(_.toOption.map(_.getHostAddress).getOrElse("unknown"))
  }

  def rateLimit: Directive0 = Directive[Unit] { inner =>
    getClientIdentifier { identifier =>
      // unsafeToFuture() is the bridge from the cats-effect IO to Akka HTTP's Future-based onComplete.
      onComplete(rateLimiter.isAllowed(identifier).unsafeToFuture()) {
        case Success(true) =>
          // Request is allowed, proceed to the inner route.
          // We add a header to inform the client about the configured limit.
          respondWithHeader(RawHeader("X-RateLimit-Limit", rateLimiterConfig.limit.toString)) {
            inner(())
          }
        case Success(false) =>
          // Request is denied. Complete with a 429 status code.
          complete(
            StatusCodes.TooManyRequests,
            s"Rate limit of ${rateLimiterConfig.limit} requests per ${rateLimiterConfig.windowSeconds} seconds exceeded."
          )
        case Failure(ex) =>
          // A failure to reach Redis should not bring down the entire API.
          // The pragmatic choice is to "fail open" and allow the request, but log the error.
          // "Failing closed" (rejecting the request) could cause a complete outage if Redis is down.
          // This is a critical architectural decision.
          logger.error(s"Rate limiter check failed, failing open: ${ex.getMessage}")
          inner(())
      }
    }
  }
}
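If clients are identified by API key rather than by IP, as the comment inside getClientIdentifier hints, a variant is straightforward. This sketch would sit alongside getClientIdentifier in the trait; the X-Api-Key header name is hypothetical, so use whatever your gateway or clients actually send.

  // Hypothetical variant: identify clients by an X-Api-Key header, falling back to the IP address.
  def getClientIdentifierByApiKey: Directive1[String] =
    optionalHeaderValueByName("X-Api-Key").flatMap {
      case Some(apiKey) => provide(apiKey)
      case None         => getClientIdentifier
    }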

Assembling the API and Deployment to AKS

Now we can wire everything together in our main application object. This involves initializing the ActorSystem, loading the config, creating the Redis connection pool, instantiating our rate limiter, and defining the routes that use the new directive.

// src/main/scala/com/example/Main.scala

package com.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.unsafe.IORuntime
import com.example.api.RateLimiterDirectives
import com.example.config.{AppConfig, RateLimiterConfig}
import com.example.limiter.{DistributedRateLimiter, RateLimiter}
import com.typesafe.scalalogging.LazyLogging
import dev.profunktor.redis4cats.effect.Log.Stdout._
import dev.profunktor.redis4cats.{Redis, RedisCommands}

import scala.util.{Failure, Success}

object Main extends App with LazyLogging {

  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "RateLimiterSystem")
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor = system.executionContext
  implicit val runtime: IORuntime = IORuntime.global

  val appConfig = AppConfig.load()

  // redis4cats requires an implicit Log instance. The Stdout instance imported above keeps
  // this example simple; in production we would wire it to our logging backend via the
  // redis4cats-log4cats module that is already on the classpath.

  // Use cats.effect.Resource for managing the Redis connection lifecycle.
  // This ensures the connection is properly closed on application shutdown.
  val redisResource: Resource[IO, RedisCommands[IO, String, String]] =
    Redis[IO].utf8(appConfig.redis.uri)

  redisResource.use { redisCmd =>
    val limiter = DistributedRateLimiter(redisCmd, appConfig.rateLimiter)
    val apiRoutes = new ApiRoutes(limiter, appConfig.rateLimiter)
    
    val bindingFuture = Http().newServerAt(appConfig.host, appConfig.port).bind(apiRoutes.routes)

    logger.info(s"Server online at http://${appConfig.host}:${appConfig.port}/")

    bindingFuture.onComplete {
      case Success(_) => logger.info("Server binding successful.")
      case Failure(ex) =>
        logger.error(s"Server binding failed: ${ex.getMessage}")
        system.terminate()
    }
    
    // In a real app, we'd wait for a shutdown signal.
    // For this example, we keep it running.
    IO.never
  }.unsafeRunSync()

}

class ApiRoutes(
  val rateLimiter: RateLimiter[IO],
  val rateLimiterConfig: RateLimiterConfig
)(implicit val runtime: IORuntime) extends RateLimiterDirectives {

  val routes: Route =
    path("status") {
      get {
        complete("OK")
      }
    } ~
    pathPrefix("data") {
      // The rateLimit directive is applied here.
      // Any request to /data/* will be subject to the limit.
      rateLimit {
        path("public") {
          get {
            complete("This is a public resource, but it is rate-limited.")
          }
        } ~
        path("sensitive") {
          get {
            complete("This is a sensitive resource, also rate-limited.")
          }
        }
      }
    }
}

The final pieces are the Dockerfile and Kubernetes deployment manifests for AKS.

# Dockerfile
FROM eclipse-temurin:11-jre-focal

WORKDIR /app

# Copy the fat JAR from the sbt build stage
COPY target/scala-2.13/distributed-rate-limiter-assembly-0.1.0-SNAPSHOT.jar app.jar

EXPOSE 8080

# REDIS_URI is injected by the Kubernetes deployment. Shell form is used so the variable is
# expanded at container start, and the -D flag must come before -jar to be read as a system property.
CMD java -Dapp.redis.uri="${REDIS_URI}" -jar app.jar

The Kubernetes deployment manifest specifies three replicas to demonstrate the distributed nature of the problem and solution. The REDIS_URI is injected from a ConfigMap or Secret, pointing to our Azure Cache for Redis instance.

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: scala-api-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: scala-api
  template:
    metadata:
      labels:
        app: scala-api
    spec:
      containers:
      - name: scala-api
        image: youracr.azurecr.io/distributed-rate-limiter:latest
        ports:
        - containerPort: 8080
        env:
        - name: REDIS_URI
          valueFrom:
            secretKeyRef:
              name: redis-secrets
              key: connectionString
        resources:
          requests:
            cpu: "250m"
            memory: "512Mi"
          limits:
            cpu: "500m"
            memory: "1Gi"
---
apiVersion: v1
kind: Service
metadata:
  name: scala-api-service
spec:
  type: LoadBalancer
  ports:
  - port: 80
    targetPort: 8080
  selector:
    app: scala-api

This setup, once deployed to AKS, provides a scalable, resilient API layer protected by a distributed rate limiter. A simple test, with the limit temporarily lowered to 10 in application.conf, using a shell loop like for i in {1..12}; do curl -I http://<your-service-ip>/data/public; done would show the first 10 requests succeeding and the subsequent ones receiving an HTTP/1.1 429 Too Many Requests response, regardless of which pod serves the request.

The resulting request flow, rendered as a Mermaid diagram:

graph TD
    subgraph Azure Cloud
        subgraph AKS Cluster
            LB(Load Balancer)
            P1(Pod 1: Scala App)
            P2(Pod 2: Scala App)
            P3(Pod 3: Scala App)
        end
        subgraph VNet
            Redis[Azure Cache for Redis]
        end
    end

    Client -- Request --> LB
    LB -- Distributes Traffic --> P1
    LB -- Distributes Traffic --> P2
    LB -- Distributes Traffic --> P3

    P1 -- INCR/GET --> Redis
    P2 -- INCR/GET --> Redis
    P3 -- INCR/GET --> Redis

While this solution effectively solved our immediate problem, it’s not without its own set of trade-offs and potential future enhancements. The choice to “fail open” on Redis failure is a critical policy decision; in a high-security context, “failing closed” might be preferable, but that requires a highly available Redis cluster (e.g., using Sentinel) to be viable. The current implementation also places the burden of rate limiting entirely on the application pods. For a very large-scale system, this logic could be offloaded to an API Gateway or a service mesh sidecar (like Envoy), which provides sophisticated rate-limiting capabilities out of the box, though often with less customizability in the algorithm itself. Finally, the rate limits are static. A more advanced system might adjust limits dynamically based on overall system health or user subscription tiers, pulling configuration from a service like Azure App Configuration.
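As a closing sketch of that last point, one small step toward tiered limits, while staying with static configuration, is to carry one RateLimiterConfig per tier and pick the right one when building the directive. The Tier type below is hypothetical.

// Hypothetical sketch: static per-tier limits as a stepping stone toward dynamic configuration.
sealed trait Tier
object Tier {
  case object Free extends Tier
  case object Pro  extends Tier
}

final case class TieredRateLimiterConfig(free: RateLimiterConfig, pro: RateLimiterConfig) {
  def forTier(tier: Tier): RateLimiterConfig = tier match {
    case Tier.Free => free
    case Tier.Pro  => pro
  }
}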

