The initial system architecture was straightforward: a suite of Scala microservices, built with Akka HTTP, serving RESTful APIs and deployed as containers on Azure Kubernetes Service (AKS). For a while, this worked. Then a bug in a downstream client caused it to enter a tight retry loop, hammering one of our non-critical but computationally expensive endpoints. The AKS Horizontal Pod Autoscaler kicked in, but not before the surge in latency cascaded, impacting other, more critical services sharing the same node pools. The immediate fix was a crude in-memory request counter, but we all knew that was a temporary patch. In a distributed environment like AKS with multiple pods, an in-memory solution is worse than useless: it provides a false sense of security, since the effective limit becomes N * limit, where N is the number of pods. The real problem required a distributed, consistent, and low-latency state store.
Our first thought was to build a proper rate-limiting service. This idea was quickly discarded. Introducing another gRPC or REST service just to manage rate limits would add significant operational overhead and a new point of failure. The core requirement was a shared atomic counter. This immediately brought Redis to the forefront. Its single-threaded command execution, atomic commands like INCR, and support for key expiration (TTL) make it a near-perfect fit for this use case. We didn’t need the durability of a relational database, and the latency of hitting a disk-backed store for every API request was unacceptable. Azure Cache for Redis provided a managed, low-latency option within our existing VNet, making the choice clear.
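The primitive we actually need from Redis is just an atomic counter that expires on its own. Below is a minimal redis4cats sketch of that idea; the object name, key name, and 60-second TTL are illustrative only and not the values used later in the post.
import scala.concurrent.duration._
import cats.effect.IO
import cats.syntax.apply._
import dev.profunktor.redis4cats.RedisCommands

object CounterPrimitiveSketch {
  // Illustrative only: INCR is atomic on the Redis server, and EXPIRE keeps the key
  // from living forever once the window has passed.
  def touchCounter(redis: RedisCommands[IO, String, String]): IO[Long] = {
    val key = "ratelimit:client-42" // hypothetical client key
    redis.incr(key) <* redis.expire(key, 60.seconds)
  }
}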
The next decision was the algorithm. The simplest is a Fixed Window Counter. You define a window (e.g., 60 seconds) and increment a counter for each request. If the counter exceeds the limit, you reject. The problem is the edge condition: a user could make limit requests at second 59 and another limit requests at second 60, effectively doubling their allowance over a very short period. This burstiness was precisely what we needed to prevent. The most accurate alternative is the Sliding Window Log, which stores a timestamp for every request. It’s perfectly accurate but memory-intensive, requiring a Redis Sorted Set for each user, which felt like overkill for our needs.
The pragmatic middle ground is the approximate Sliding Window Counter algorithm. It offers a good balance of accuracy and efficiency. The approach requires tracking only two counters per client: one for the current time window and one for the previous. The rate is estimated by weighting the previous window’s count by the fraction of the sliding window that still overlaps it, then adding the current window’s count. This avoids the edge-of-window bursts of the fixed-window approach without the storage overhead of the log-based one. This was our chosen path.
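A quick worked example of that estimate, as a small sketch; the object and function names and the sample counts are illustrative only and not part of the service built below.
object SlidingWindowEstimateSketch {
  // Weight the previous window by how much of it still falls inside the sliding
  // window that ends "now", then add the current window's count.
  def estimatedCount(currentCount: Long, previousCount: Long,
                     elapsedInWindowMillis: Long, windowMillis: Long): Double = {
    val previousWeight = (windowMillis - elapsedInWindowMillis).toDouble / windowMillis
    currentCount + previousCount * previousWeight
  }
}
// Example: 60s window, 15s into the current window, 80 requests in the previous window,
// 20 so far in this one: estimatedCount(20, 80, 15000, 60000) == 20 + 80 * 0.75 == 80.0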
Setting the Foundation: Project Structure and Dependencies
We started by setting up our Scala project with Akka HTTP and a reliable Redis client. For a modern Scala stack, we opted for redis4cats, a purely functional, non-blocking client built on Lettuce and Cats Effect. This fits seamlessly into the asynchronous nature of Akka HTTP.
Here is the build.sbt file defining our dependencies. In a real-world project, versions would be managed more rigorously, but this illustrates the core components.
// build.sbt
val AkkaVersion = "2.6.20"
val AkkaHttpVersion = "10.2.10"
val CatsEffectVersion = "3.4.8"
val Redis4CatsVersion = "1.3.0"
val LogbackVersion = "1.4.5"

lazy val root = (project in file("."))
  .settings(
    name := "distributed-rate-limiter",
    version := "0.1.0-SNAPSHOT",
    scalaVersion := "2.13.10",
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
      "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
      "org.typelevel" %% "cats-effect" % CatsEffectVersion,
      "dev.profunktor" %% "redis4cats-effects" % Redis4CatsVersion,
      "dev.profunktor" %% "redis4cats-log4cats" % Redis4CatsVersion,
      "ch.qos.logback" % "logback-classic" % LogbackVersion,
      "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
      // For configuration
      "com.github.pureconfig" %% "pureconfig" % "0.17.2",
      // Testing
      "com.typesafe.akka" %% "akka-http-testkit" % AkkaHttpVersion % Test,
      "org.scalatest" %% "scalatest" % "3.2.15" % Test
    )
  )
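One note on packaging: the Dockerfile later in the post copies an assembly (fat) JAR, which assumes the sbt-assembly plugin is available. A likely project/plugins.sbt entry would look like the following; the version shown is an assumption, so use whatever is current for your build.
// project/plugins.sbt (assumed; needed to build the fat JAR referenced in the Dockerfile)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")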
Configuration is managed via application.conf using pureconfig for type-safe loading. This allows us to separate runtime parameters from the code, which is critical for deployments across different environments (dev, staging, prod) on AKS.
// src/main/resources/application.conf
app {
  host = "0.0.0.0"
  port = 8080

  redis {
    uri = "redis://localhost:6379" // This would be overridden by env vars in AKS
  }

  rate-limiter {
    limit = 100         // requests
    window-seconds = 60 // per minute
    enabled = true
  }
}
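As the comment on the Redis URI suggests, the value is overridden at deploy time. The Dockerfile below does this with a JVM system property; a sketch of the alternative, HOCON's optional environment substitution, which picks up a REDIS_URI variable when it is set, is shown here for illustration.
// application.conf (alternative override, illustrative)
app {
  redis {
    uri = "redis://localhost:6379"
    uri = ${?REDIS_URI} // when REDIS_URI is set in the pod, it replaces the default above
  }
}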
We define a case class structure to load this configuration in a type-safe manner.
// src/main/scala/com/example/config/AppConfig.scala
package com.example.config

import pureconfig.ConfigSource
import pureconfig.generic.auto._

case class RedisConfig(uri: String)
case class RateLimiterConfig(limit: Int, windowSeconds: Int, enabled: Boolean)
// host and port are included so Main can read them from the same "app" block.
case class AppConfig(host: String, port: Int, redis: RedisConfig, rateLimiter: RateLimiterConfig)

object AppConfig {
  def load(): AppConfig = ConfigSource.default.at("app").loadOrThrow[AppConfig]
}
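A quick usage sketch with a throwaway ConfigSmokeTest object (a hypothetical name). The override mentioned in the comment relies on standard Typesafe Config precedence, where JVM system properties win over application.conf.
// Illustrative only: load once at startup and hand the parts to whoever needs them.
// Starting the JVM with -Dapp.rate-limiter.limit=10 overrides the value from application.conf.
object ConfigSmokeTest extends App {
  val config = com.example.config.AppConfig.load()
  println(s"Rate limit: ${config.rateLimiter.limit} requests per ${config.rateLimiter.windowSeconds}s")
}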
The Core Logic: An Asynchronous and Atomic Rate Limiter
The heart of the solution is the RateLimiter service. Its primary responsibility is to encapsulate the interaction with Redis and implement the sliding window algorithm. The key design considerations were asynchronicity and atomicity. Every call to Redis is a network call; blocking the request-processing thread would be disastrous for performance. Using cats.effect.IO and redis4cats ensures that all I/O is non-blocking.
Atomicity is even more critical. A naive implementation might perform a GET on the counters, calculate the rate in the application, and then perform an INCR if the limit is not reached. This introduces a severe race condition in a high-concurrency environment: between the GET and the INCR, another request could read the same stale value, leading to the limit being exceeded. The entire sequence of operations must be atomic. Redis provides two primary ways to achieve this: MULTI/EXEC transactions or Lua scripting. For this logic, a transaction block is sufficient and easier to reason about.
Here is the implementation of the DistributedRateLimiter.
// src/main/scala/com/example/limiter/DistributedRateLimiter.scala
package com.example.limiter

import cats.effect._
import cats.syntax.all._
import com.example.config.RateLimiterConfig
import com.typesafe.scalalogging.LazyLogging
import dev.profunktor.redis4cats.RedisCommands

import java.time.Instant
import scala.concurrent.duration._

trait RateLimiter[F[_]] {
  def isAllowed(key: String): F[Boolean]
}

object DistributedRateLimiter {

  def apply[F[_]: Async](
      redis: RedisCommands[F, String, String],
      config: RateLimiterConfig
  ): RateLimiter[F] = new RateLimiter[F] with LazyLogging {

    private val limit: Long = config.limit.toLong
    private val windowMillis: Long = config.windowSeconds * 1000L

    override def isAllowed(key: String): F[Boolean] = {
      if (!config.enabled) {
        // If disabled in config, always allow.
        true.pure[F]
      } else {
        for {
          now <- Sync[F].delay(Instant.now().toEpochMilli)
          currentWindow = now / windowMillis
          // Define keys for the current and previous time windows
          currentKey = s"ratelimit:$key:$currentWindow"
          previousKey = s"ratelimit:$key:${currentWindow - 1}"
          // The core logic is executed within a Redis transaction (MULTI/EXEC)
          // to ensure atomicity. This prevents race conditions under high load.
          results <- redis.multi.use { multi =>
            (
              // 1. Increment the counter for the current window.
              multi.incr(currentKey),
              // 2. Set an expiry of two full windows on the current key so the previous
              //    window's data is still available for the calculation. This is crucial
              //    for cleanup and preventing memory leaks in Redis.
              multi.expire(currentKey, (config.windowSeconds * 2).seconds),
              // 3. Get the count from the previous window.
              multi.get(previousKey)
            ).tupled // Combine the commands to be executed in the transaction
          }
          result <- handleTransactionResult(now, results)
        } yield result
      }
    }

    private def handleTransactionResult(
        now: Long,
        results: (Long, Boolean, Option[String])
    ): F[Boolean] = {
      val (currentCount, _, previousCountOpt) = results
      val previousCount = previousCountOpt.flatMap(_.toLongOption).getOrElse(0L)

      // Calculate the weight of the previous window in the current sliding frame.
      // This is the core of the approximate sliding window algorithm.
      val overlapMillis = now % windowMillis
      val previousWindowWeight = (windowMillis - overlapMillis).toDouble / windowMillis
      val weightedPreviousCount = (previousCount * previousWindowWeight).toLong
      val totalCount = currentCount + weightedPreviousCount

      logger.debug(
        s"Rate check: currentCount=$currentCount, previousCount=$previousCount, " +
          s"weight=$previousWindowWeight, total=$totalCount, limit=$limit"
      )

      if (totalCount <= limit) {
        true.pure[F]
      } else {
        // A potential optimization: if the limit is exceeded, we could DECR the counter.
        // However, this adds complexity and another Redis command. For now, we accept
        // that a rejected request still consumes a slot. This is a common trade-off.
        false.pure[F]
      }
    }
  }
}
Integrating with Akka HTTP via a Custom Directive
With the core logic in place, the next step was to integrate it cleanly into our REST API layer. Akka HTTP’s Directive model is perfect for this. Directives are composable building blocks that can wrap an inner route, adding behavior like authentication, header extraction, or, in our case, rate limiting.
We created a RateLimiterDirectives trait that can be mixed into our route definitions. It defines a rateLimit directive that extracts a client identifier (e.g., the remote IP address) and invokes our DistributedRateLimiter. If the request is allowed, the inner route is executed. If not, the directive rejects the request with a 429 Too Many Requests status code and an informative X-RateLimit-Limit header.
// src/main/scala/com/example/api/RateLimiterDirectives.scala
package com.example.api

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.{Directive0, Directive1}
import akka.http.scaladsl.server.Directives._
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import com.example.config.RateLimiterConfig
import com.example.limiter.RateLimiter

import scala.util.{Failure, Success}

trait RateLimiterDirectives {

  // This runtime is needed to bridge the world of cats-effect IO
  // with Akka HTTP's Future-based world.
  implicit val runtime: IORuntime

  def rateLimiter: RateLimiter[IO]
  def rateLimiterConfig: RateLimiterConfig

  def getClientIdentifier: Directive1[String] = {
    // In a real-world scenario, this might extract an API key from a header
    // or a user ID from a JWT. For this example, we use the client's IP address.
    // A pitfall here is that clients behind a NAT will share the same IP and thus the same limit.
    // Note: extractClientIP only yields an address if the remote-address attribute/header
    // is enabled in the Akka HTTP server configuration.
    extractClientIP.map(_.toOption.map(_.getHostAddress).getOrElse("unknown"))
  }

  def rateLimit: Directive0 = {
    getClientIdentifier.flatMap { identifier =>
      val isAllowedIO = rateLimiter.isAllowed(identifier)
      // onComplete is the bridge from an async operation to a directive.
      onComplete(isAllowedIO.unsafeToFuture()).flatMap {
        case Success(true) =>
          // Request is allowed, proceed to the inner route.
          // We add a header to inform the client about their current limit.
          respondWithHeader(RawHeader("X-RateLimit-Limit", rateLimiterConfig.limit.toString))
        case Success(false) =>
          // Request is denied. Short-circuit with a 429 status code.
          complete(
            StatusCodes.TooManyRequests ->
              s"Rate limit of ${rateLimiterConfig.limit} requests per ${rateLimiterConfig.windowSeconds} seconds exceeded."
          )
        case Failure(ex) =>
          // A failure to connect to Redis should not bring down the entire API.
          // The pragmatic choice is to "fail open" and allow the request, but log the error.
          // "Failing closed" (rejecting the request) could cause a complete outage if Redis is down.
          // This is a critical architectural decision.
          extractLog.flatMap { log =>
            log.error(s"Rate limiter check failed, failing open: ${ex.getMessage}")
            pass // Fail open
          }
      }
    }
  }
}
Assembling the API and Deployment to AKS
Now we can wire everything together in our main application object. This involves initializing the ActorSystem, loading the config, creating the Redis connection pool, instantiating our rate limiter, and defining the routes that use the new directive.
// src/main/scala/com/example/Main.scala
package com.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.unsafe.IORuntime
import com.example.api.RateLimiterDirectives
import com.example.config.{AppConfig, RateLimiterConfig}
import com.example.limiter.{DistributedRateLimiter, RateLimiter}
import com.typesafe.scalalogging.LazyLogging
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.{Redis, RedisCommands}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object Main extends App with LazyLogging {

  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "RateLimiterSystem")
  implicit val executionContext: ExecutionContextExecutor = system.executionContext
  implicit val runtime: IORuntime = IORuntime.global

  val appConfig = AppConfig.load()

  // redis4cats needs a Log instance; we back it with scala-logging.
  implicit val log: Log[IO] = new Log[IO] {
    def debug(msg: => String): IO[Unit] = IO(logger.debug(msg))
    def info(msg: => String): IO[Unit] = IO(logger.info(msg))
    def error(msg: => String): IO[Unit] = IO(logger.error(msg))
  }

  // Use cats.effect.Resource for managing the Redis connection lifecycle.
  // This ensures the connection is properly closed on application shutdown.
  val redisResource: Resource[IO, RedisCommands[IO, String, String]] =
    Redis[IO].utf8(appConfig.redis.uri)

  redisResource.use { redisCmd =>
    val limiter = DistributedRateLimiter(redisCmd, appConfig.rateLimiter)
    val apiRoutes = new ApiRoutes(limiter, appConfig.rateLimiter)

    val bindingFuture = Http().newServerAt(appConfig.host, appConfig.port).bind(apiRoutes.routes)
    logger.info(s"Server online at http://${appConfig.host}:${appConfig.port}/")

    bindingFuture.onComplete {
      case Success(_) => logger.info("Server binding successful.")
      case Failure(ex) =>
        logger.error(s"Server binding failed: ${ex.getMessage}")
        system.terminate()
    }

    // In a real app, we'd wait for a shutdown signal.
    // For this example, we keep it running.
    IO.never
  }.unsafeRunSync()
}

class ApiRoutes(
    val rateLimiter: RateLimiter[IO],
    val rateLimiterConfig: RateLimiterConfig
)(implicit val runtime: IORuntime) extends RateLimiterDirectives {

  val routes: Route =
    path("status") {
      get {
        complete("OK")
      }
    } ~
      pathPrefix("data") {
        // The rateLimit directive is applied here.
        // Any request to /data/* will be subject to the limit.
        rateLimit {
          path("public") {
            get {
              complete("This is a public resource, but it is rate-limited.")
            }
          } ~
            path("sensitive") {
              get {
                complete("This is a sensitive resource, also rate-limited.")
              }
            }
        }
      }
}
The final pieces are the Dockerfile and the Kubernetes deployment manifests for AKS.
# Dockerfile
FROM eclipse-temurin:11-jre-focal
WORKDIR /app

# Copy the fat JAR produced by sbt assembly
COPY target/scala-2.13/distributed-rate-limiter-assembly-0.1.0-SNAPSHOT.jar app.jar

EXPOSE 8080

# The REDIS_URI env var is configured in the Kubernetes deployment.
# Shell form is used so the variable is expanded, and the -D flag must come
# before -jar to be picked up as a JVM system property.
CMD java -Dapp.redis.uri="$REDIS_URI" -jar app.jar
The Kubernetes deployment manifest specifies three replicas to demonstrate the distributed nature of the problem and solution. The REDIS_URI is injected from a ConfigMap or Secret, pointing to our Azure Cache for Redis instance.
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: scala-api-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: scala-api
  template:
    metadata:
      labels:
        app: scala-api
    spec:
      containers:
        - name: scala-api
          image: youracr.azurecr.io/distributed-rate-limiter:latest
          ports:
            - containerPort: 8080
          env:
            - name: REDIS_URI
              valueFrom:
                secretKeyRef:
                  name: redis-secrets
                  key: connectionString
          resources:
            requests:
              cpu: "250m"
              memory: "512Mi"
            limits:
              cpu: "500m"
              memory: "1Gi"
---
apiVersion: v1
kind: Service
metadata:
  name: scala-api-service
spec:
  type: LoadBalancer
  ports:
    - port: 80
      targetPort: 8080
  selector:
    app: scala-api
This setup, once deployed to AKS, provides a scalable, resilient API layer protected by a distributed rate limiter. A simple test with the limit temporarily lowered to 10, using a shell loop like for i in {1..12}; do curl -I http://<your-service-ip>/data/public; done, would show the first 10 requests succeeding and the subsequent ones receiving an HTTP/1.1 429 Too Many Requests response, regardless of which pod serves the request.
graph TD
  subgraph Azure Cloud
    subgraph AKS Cluster
      LB(Load Balancer)
      P1(Pod 1: Scala App)
      P2(Pod 2: Scala App)
      P3(Pod 3: Scala App)
    end
    subgraph VNet
      ACR[Azure Cache for Redis]
    end
  end
  Client -- Request --> LB
  LB -- Distributes Traffic --> P1
  LB -- Distributes Traffic --> P2
  LB -- Distributes Traffic --> P3
  P1 -- INCR/GET --> ACR
  P2 -- INCR/GET --> ACR
  P3 -- INCR/GET --> ACR
While this solution effectively solved our immediate problem, it’s not without its own set of trade-offs and potential future enhancements. The choice to “fail open” on Redis failure is a critical policy decision; in a high-security context, “failing closed” might be preferable, but that requires a highly available Redis cluster (e.g., using Sentinel) to be viable. The current implementation also places the burden of rate limiting entirely on the application pods. For a very large-scale system, this logic could be offloaded to an API Gateway or a service mesh sidecar (like Envoy), which provides sophisticated rate-limiting capabilities out of the box, though often with less customizability in the algorithm itself. Finally, the rate limits are static. A more advanced system might adjust limits dynamically based on overall system health or user subscription tiers, pulling configuration from a service like Azure App Configuration.