Go-Gin 应用中基于 Redis Lua 的动态多策略API限流器实现复盘


新功能上线后用户增长曲线很漂亮,直到市场部开启了新一轮推广。数据库主节点的CPU使用率在晚高峰直接触及95%的告警线。经过紧急排查,根源锁定在一个计算密集型的查询接口上,它正被部分自动化脚本和少数高频用户无情地调用。一个健壮的API限流器,在那一刻从“技术债列表”上的一个待办项,变成了必须在下一个迭代周期内解决的P0级生产问题。

敏捷开发的核心是快速响应变化,但这也意味着初始版本往往只实现了核心业务逻辑。像限流、熔断这类非功能性需求,通常是在系统面临实际压力后才被提上日程。我们团队的第一个想法是,在Go-Gin中间件里用一个内存map[string]rate.Limiter来实现,key是用户IP。这是一个典型的、在分布式环境中会立刻失效的方案。我们的API服务是无状态、多副本部署的,本地内存限流无法在多个实例间共享状态,最终的总请求量会是单机限制的N倍(N为实例数)。

我们需要一个集中式的、高性能的、支持原子操作的存储来同步状态。Redis自然而然地成为了首选。它基于内存,网络开销极低,并且其单线程模型确保了命令执行的原子性,这对于实现计数器类的功能至关重要。

V1: 基于 INCR 的朴素实现

最初的方案非常直接:利用Redis的INCREXPIRE命令。对于每个请求,我们以IP地址为key,在Redis中递增一个计数器。如果这是窗口期内的第一个请求,我们就设置一个过期时间。

这是一个简单的Go-Gin中间件实现:

package ratelimiter

import (
	"context"
	"net/http"
	"strconv"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/redis/go-redis/v9"
)

// SimpleLimiter 是一个基础的、非生产级的限流器
type SimpleLimiter struct {
	rdb    *redis.Client
	limit  int64
	window time.Duration
}

func NewSimpleLimiter(rdb *redis.Client, limit int, window time.Duration) *SimpleLimiter {
	return &SimpleLimiter{
		rdb:    rdb,
		limit:  int64(limit),
		window: window,
	}
}

func (l *SimpleLimiter) Middleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		ctx := context.Background()
		key := "rate_limit:" + c.ClientIP()

		// 使用pipeline减少网络往返
		pipe := l.rdb.Pipeline()
		countCmd := pipe.Incr(ctx, key)
		pipe.Expire(ctx, key, l.window)
		_, err := pipe.Exec(ctx)

		if err != nil {
			// 在真实项目中,这里应该有详细的日志记录
			// 如果Redis故障,是放行还是拒绝?这是一个需要权衡的决策。
			// 通常为了服务可用性,会选择放行。
			c.Next()
			return
		}

		count := countCmd.Val()

		// 设置响应头,让客户端了解当前的限流状态
		c.Header("X-RateLimit-Limit", strconv.FormatInt(l.limit, 10))
		c.Header("X-RateLimit-Remaining", strconv.FormatInt(l.limit-count, 10))

		if count > l.limit {
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": "Too many requests"})
			return
		}

		c.Next()
	}
}

这个V1版本很快就部署了。它确实挡住了一部分流量,但新的问题接踵而至。

  1. 竞态条件: INCREXPIRE是两个独立的操作。尽管使用了Pipeline,它们在Redis服务端依然是分两步执行。如果应用在INCR之后、EXPIRE之前崩溃,这个key就会永久存在,导致对应IP被永久封禁。
  2. 策略单一: 产品团队提出了更复杂的需求:“我们需要为VIP用户提供更高的API调用额度”,“某个特定接口需要更严格的限制,而不是全局限制”。只基于IP的单一策略已经无法满足业务需求。
  3. 配置僵化: limitwindow是硬编码或在服务启动时加载的。每次调整限流策略,我们都必须重新部署整个服务,这完全违背了敏捷的初衷。

V2: 引入 Lua 脚本解决原子性

为了解决INCREXPIRE的原子性问题,唯一可靠的方式是使用Redis Lua脚本。Lua脚本在Redis服务端被视为一个原子操作,它要么完全执行,要么完全不执行,不会出现执行到一半的状态。

我们的第一个Lua脚本如下:

-- ARGV[1]: limit (限制次数)
-- ARGV[2]: window (窗口时间,秒)
-- KEYS[1]: key (例如 rate_limit:127.0.0.1)

local current = redis.call('INCR', KEYS[1])
if current == 1 then
  redis.call('EXPIRE', KEYS[1], ARGV[2])
end

if current > tonumber(ARGV[1]) then
  return 0 -- 0 代表拒绝
else
  return 1 -- 1 代表允许
end

这个脚本将逻辑移到了服务端,完美解决了原子性问题。在Go代码中,我们需要加载并执行这个脚本。

// ... (部分代码)

// Lua脚本应该在应用启动时加载一次,获取其SHA1值,后续通过EVALSHA调用以提高效率
var rateLimitScript = redis.NewScript(`
    local current = redis.call('INCR', KEYS[1])
    if tonumber(current) == 1 then
        redis.call('EXPIRE', KEYS[1], ARGV[2])
    end
    if tonumber(current) > tonumber(ARGV[1]) then
        return 0
    else
        return 1
    end
`)

// 在中间件中调用
func (l *AdvancedLimiter) Middleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		// ...
		// 这里的 key, limit, window 需要根据策略动态获取
		key := "..." 
		limit := 100
		window := 60

		result, err := rateLimitScript.Run(ctx, l.rdb, []string{key}, limit, window).Result()
		if err != nil {
			// 日志 + 容错
			c.Next()
			return
		}

		if result.(int64) == 0 {
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": "Rate limit exceeded"})
			return
		}

		c.Next()
	}
}

这解决了原子性问题,但策略单一和配置僵化的问题依然存在。下一个迭代,我们的目标是构建一个支持动态配置的多策略限流器。

V3: 动态多策略限流器的设计与实现

我们的目标是设计一个系统,它能够:

  1. 支持多种限流策略: 如基于用户ID、API Key、请求路径、IP地址等。
  2. 支持策略优先级: 如果一个请求同时匹配多个规则(例如,一个VIP用户的请求,既匹配用户ID规则,也匹配IP规则),应该应用最宽松的那个。
  3. 动态配置: 无需重启服务,即可实时更新所有限流规则。

数据结构设计

首先,我们定义规则的数据结构。它可以是一个JSON,存储在配置中心(如Nacos, Consul)或就是一个Redis Hash中。

{
  "rules": [
    {
      "id": "rule-vip-user",
      "priority": 100,
      "match": {
        "header": "X-User-Level",
        "value": "VIP"
      },
      "limit": 1000,
      "window": 60,
      "key_template": "user_id:{{.UserID}}"
    },
    {
      "id": "rule-post-api",
      "priority": 50,
      "match": {
        "path": "/api/v1/posts",
        "method": "POST"
      },
      "limit": 10,
      "window": 60,
      "key_template": "ip:{{.ClientIP}}"
    },
    {
      "id": "rule-default-ip",
      "priority": 1,
      "match": {},
      "limit": 100,
      "window": 60,
      "key_template": "ip:{{.ClientIP}}"
    }
  ]
}
  • priority: 数字越大,优先级越高。
  • match: 定义了匹配请求的条件。
  • key_template: 定义了在Redis中生成key的模板,{{.UserID}}{{.ClientIP}} 是占位符,会用请求的实际信息替换。

动态配置加载

我们在限流器结构体中增加一个后台goroutine,定期从Redis或配置中心拉取最新的规则集,并原子性地更新到内存中。

package ratelimiter

import (
    // ... imports
	"sync"
	"text/template"
    "bytes"
)

type Rule struct {
	ID          string            `json:"id"`
	Priority    int               `json:"priority"`
	Match       map[string]string `json:"match"`
	Limit       int64             `json:"limit"`
	Window      int64             `json:"window"` // seconds
	KeyTemplate string            `json:"key_template"`
	template    *template.Template
}

type DynamicRateLimiter struct {
	rdb              *redis.Client
	rules            []*Rule
	mu               sync.RWMutex
	configKey        string
	configPollInterval time.Duration
	luaScriptSHA     string
}

func NewDynamicRateLimiter(rdb *redis.Client, configKey string, pollInterval time.Duration) (*DynamicRateLimiter, error) {
	limiter := &DynamicRateLimiter{
		rdb:              rdb,
		configKey:        configKey,
		configPollInterval: pollInterval,
	}

	// 预加载Lua脚本
	sha, err := rateLimitScript.Load(context.Background(), rdb).Result()
	if err != nil {
		// log.Fatalf(...) 在真实应用中可能更合适
		return nil, fmt.Errorf("failed to load rate limit lua script: %w", err)
	}
	limiter.luaScriptSHA = sha

	// 首次加载配置
	if err := limiter.loadConfig(); err != nil {
		// 启动时加载失败,可以根据策略决定是panic还是允许无配置启动
		// log.Printf("Warning: initial config load failed: %v", err)
	}

	// 启动后台goroutine定期更新配置
	go limiter.configPoller()

	return limiter, nil
}

// configPoller 定期从Redis拉取配置
func (l *DynamicRateLimiter) configPoller() {
	ticker := time.NewTicker(l.configPollInterval)
	defer ticker.Stop()

	for range ticker.C {
		// 在真实项目中,应该加入日志来监控配置拉取是否成功
		_ = l.loadConfig()
	}
}

// loadConfig 从数据源加载、解析并更新规则
func (l *DynamicRateLimiter) loadConfig() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	configJSON, err := l.rdb.Get(ctx, l.configKey).Result()
	if err != nil {
		if err == redis.Nil {
			// 配置不存在,可能是正常的初始状态
			return nil
		}
		return fmt.Errorf("failed to get config from redis: %w", err)
	}

	var config struct {
		Rules []*Rule `json:"rules"`
	}
	if err := json.Unmarshal([]byte(configJSON), &config); err != nil {
		return fmt.Errorf("failed to unmarshal config json: %w", err)
	}
    
    // 预编译模板以提高性能
	for _, rule := range config.Rules {
		tmpl, err := template.New(rule.ID).Parse(rule.KeyTemplate)
		if err != nil {
            // log.Printf("Warning: failed to parse template for rule %s: %v", rule.ID, err)
			continue // 跳过无效规则
		}
		rule.template = tmpl
	}
    
    // 按优先级排序
	sort.Slice(config.Rules, func(i, j int) bool {
		return config.Rules[i].Priority > config.Rules[j].Priority
	})

	l.mu.Lock()
	l.rules = config.Rules
	l.mu.Unlock()

	return nil
}

核心中间件逻辑

中间件现在需要遍历所有规则,找到第一个匹配的规则,然后执行限流逻辑。

type RequestContext struct {
    UserID   string
    ClientIP string
    Path     string
    Method   string
    Header   http.Header
}

func (l *DynamicRateLimiter) Middleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		l.mu.RLock()
		rules := l.rules
		l.mu.RUnlock()

        // 构造请求上下文,用于规则匹配和模板渲染
        reqCtx := RequestContext{
            ClientIP: c.ClientIP(),
            Path:     c.FullPath(),
            Method:   c.Request.Method,
            Header:   c.Request.Header,
            UserID:   c.GetString("userID"), // 假设之前的中间件已经解析并设置了userID
        }

		// 遍历规则,找到第一个匹配的
		var matchedRule *Rule
		for _, rule := range rules {
			if l.isMatch(rule, &reqCtx) {
				matchedRule = rule
				break
			}
		}

		if matchedRule == nil {
			c.Next() // 没有匹配的规则,直接放行
			return
		}

        // 使用模板生成Redis Key
        var keyBuf bytes.Buffer
        if err := matchedRule.template.Execute(&keyBuf, reqCtx); err != nil {
            // log.Errorf("Failed to execute key template for rule %s: %v", matchedRule.ID, err)
            c.Next() // 模板执行失败,放行
            return
        }
        redisKey := "rate_limit:" + keyBuf.String()

		// 执行Lua脚本
		result, err := l.rdb.EvalSha(context.Background(), l.luaScriptSHA, []string{redisKey}, matchedRule.Limit, matchedRule.Window).Result()
		if err != nil {
			// log.Errorf("Rate limit script execution failed: %v", err)
			c.Next() // Redis异常,放行
			return
		}

		if result.(int64) == 0 {
            // 获取key的TTL来计算重试时间
			ttl, _ := l.rdb.TTL(context.Background(), redisKey).Result()
			c.Header("Retry-After", strconv.FormatInt(int64(ttl.Seconds()), 10))
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": "rate limit exceeded", "rule_id": matchedRule.ID})
			return
		}

		c.Header("X-RateLimit-Rule", matchedRule.ID)
		c.Next()
	}
}

func (l *DynamicRateLimiter) isMatch(rule *Rule, req *RequestContext) bool {
    if len(rule.Match) == 0 {
        return true // 空match表示默认规则
    }
	for key, val := range rule.Match {
		switch key {
		case "path":
			if req.Path != val { return false }
		case "method":
			if req.Method != val { return false }
		case "header":
            // 简单的实现,实际可能需要更复杂的header匹配逻辑
            parts := strings.SplitN(val, ":", 2)
            if len(parts) != 2 || req.Header.Get(parts[0]) != parts[1] {
                return false
            }
		// 可以扩展更多匹配条件, e.g., query, body...
		default:
			return false
		}
	}
	return true
}

Mermaid.js 请求流程图

下面是整个请求处理的流程图:

sequenceDiagram
    participant Client
    participant Gin as Gin Server
    participant Middleware as RateLimiter Middleware
    participant Redis

    Client->>Gin: HTTP Request
    Gin->>Middleware: Handle Request
    Middleware->>Middleware: Lock & Get Rules
    Middleware->>Middleware: Match Request against Rules
    alt Rule Matched
        Middleware->>Middleware: Generate Redis Key from Template
        Middleware->>Redis: EVALSHA(script, key, limit, window)
        Redis-->>Middleware: 1 (Allow) or 0 (Deny)
        alt Allowed
            Middleware->>Gin: c.Next()
            Gin-->>Client: 200 OK
        else Denied
            Middleware->>Gin: c.Abort()
            Gin-->>Client: 429 Too Many Requests
        end
    else No Rule Matched
        Middleware->>Gin: c.Next()
        Gin-->>Client: 200 OK
    end

单元测试思路

对于这样一个复杂的组件,单元测试至关重要。

  1. 规则匹配逻辑 (isMatch): 这是纯逻辑,最容易测试。可以创建各种RuleRequestContext的组合,断言匹配结果是否符合预期。
  2. 模板生成: 测试key_template能否正确地从RequestContext渲染出Redis key。
  3. 中间件核心流程: 使用httptest包来模拟Gin的HTTP请求。需要一个mock的Redis客户端(例如使用go-redis/redismock),来模拟EVALSHA返回允许或拒绝的结果,然后断言HTTP响应的状态码和头部是否正确。
  4. Lua脚本: Lua脚本本身的逻辑也需要测试。可以编写一个小的Go测试函数,连接到一个测试用的Redis实例,直接执行脚本并验证其行为。

这个V3版本的限流器已经相当健壮,它解决了原子性、单一策略和配置僵化的问题,能够很好地适应敏捷开发流程中频繁变更的业务需求。在生产环境中,它稳定地保护了我们的核心服务。

局限与未来迭代方向

尽管当前的实现已经满足了需求,但它并非没有缺点。

首先,它使用的是固定窗口算法,存在临界点问题。例如,在窗口的最后一秒和下一个窗口的第一秒,用户可以发起两倍于限制的请求。更平滑的算法,如令牌桶(Token Bucket)或漏桶(Leaky Bucket),可以解决这个问题,但这会增加Lua脚本的复杂度和Redis的内存使用。

其次,动态配置目前依赖于轮询(Polling)。对于需要极低延迟配置更新的场景,可以改造为基于Redis Pub/Sub的推送(Push)模型,当配置变更时,主动通知所有服务实例刷新。

最后,可观测性有待加强。我们应该将限流的详细信息,如哪个规则被触发、被拒绝的请求数等,作为Prometheus指标暴露出去,以便进行更精细的监控和告警。这对于遵循SRE实践、量化服务可靠性至关重要。


  目录