限流算法实战:token bucket / leaky bucket / sliding window 各自怎么写

任何对外 API 都需要限流,否则一波突发流量 / 恶意刷接口能把服务打挂。
四种主流算法各有适用场景,下面手写各一份 + 选择建议。

算法对比

算法 适合 突发支持 实现复杂度
Fixed Window 简单计数器 边界双倍突发 极简
Sliding Window 平滑限流 可控
Leaky Bucket 强制匀速输出 不支持
Token Bucket 突发友好 支持

1. Fixed Window(最简,但有边界问题)

import redis

r = redis.Redis()

def fixed_window_allow(key: str, limit: int, window_sec: int) -> bool:
    bucket = f'{key}:{int(time.time()) // window_sec}'
    n = r.incr(bucket)
    if n == 1:
        r.expire(bucket, window_sec)
    return n <= limit

# 用法:limit=100/分钟
allowed = fixed_window_allow(f'rl:user:{uid}', 100, 60)

问题:59:59 一波 100 次 + 00:01 一波 100 次 = 2 秒内 200 次,违反"每分钟 100"
意图。

2. Sliding Window(用 sorted set 精确,开销稍高)

def sliding_window_allow(key: str, limit: int, window_sec: int) -> bool:
    now = time.time()
    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, now - window_sec)   # 删窗口外
    pipe.zcard(key)                                    # 当前窗口内的请求数
    pipe.zadd(key, {str(now): now})                    # 加这次
    pipe.expire(key, window_sec)
    _, count, _, _ = pipe.execute()
    return count < limit

精确:当前到过去 window_sec 秒内最多 limit 次。

代价:每个 user 一个 sorted set;高并发下 Redis 内存增加。

3. Leaky Bucket(强制匀速出)

def leaky_bucket_allow(key: str, capacity: int, leak_per_sec: float) -> bool:
    """
    模拟一个桶:用户每次请求往桶里加一滴水;桶在背景以 leak_per_sec 漏水。
    桶满 → 拒绝。
    """
    now = time.time()
    state = r.hgetall(key) or {}
    last = float(state.get(b'last', now))
    water = float(state.get(b'water', 0))

    # 时间差里漏掉的水
    water = max(0, water - (now - last) * leak_per_sec)

    if water >= capacity:
        return False

    water += 1
    r.hset(key, mapping={'water': water, 'last': now})
    r.expire(key, int(capacity / leak_per_sec) + 1)
    return True

# 用法:bucket cap=10, 漏速 2/秒 → 最大允许短期 10 突发,长期 2/秒
allowed = leaky_bucket_allow(f'rl:lb:{uid}', 10, 2)

漏速恒定 → 输出节奏稳定,但不允许突发"全速消费"。

4. Token Bucket(突发友好,最常用)

def token_bucket_allow(key: str, capacity: int, refill_per_sec: float) -> bool:
    """
    桶里装 token:请求消耗一个 token;桶在背景以 refill_per_sec 添加 token。
    没 token → 拒绝。
    """
    now = time.time()
    state = r.hgetall(key) or {}
    last = float(state.get(b'last', now))
    tokens = float(state.get(b'tokens', capacity))

    tokens = min(capacity, tokens + (now - last) * refill_per_sec)

    if tokens < 1:
        return False

    tokens -= 1
    r.hset(key, mapping={'tokens': tokens, 'last': now})
    r.expire(key, int(capacity / refill_per_sec) + 1)
    return True

# 用法:cap=100, 速率 10/秒 → 短期可一次性 100 个,平均 10/秒
allowed = token_bucket_allow(f'rl:tb:{uid}', 100, 10)

桶满了允许突发用 capacity 个,之后稳态 refill_per_sec。
实际生产 99% 用这个:API 客户端通常零散调用,偶尔一波,token bucket
最贴合。

5. 原子性问题

上面 Python 版本有 race condition(get → 算 → set 之间被别的请求抢插)。
生产用 Lua 脚本一次性原子完成:

-- token_bucket.lua
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

local state = redis.call('HMGET', key, 'tokens', 'last')
local tokens = tonumber(state[1]) or capacity
local last = tonumber(state[2]) or now

tokens = math.min(capacity, tokens + (now - last) * refill_rate)

local allowed = 0
if tokens >= 1 then
    tokens = tokens - 1
    allowed = 1
end

redis.call('HMSET', key, 'tokens', tokens, 'last', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)

return allowed

Python 调用:

TOKEN_BUCKET_SCRIPT = r.register_script(open('token_bucket.lua').read())

def allow(key, capacity, refill_rate):
    return TOKEN_BUCKET_SCRIPT(
        keys=[key],
        args=[capacity, refill_rate, time.time()],
    )

6. nginx 限流

不要总在应用层做。nginx 内置 limit_req(leaky bucket)很强:

# 每客户端 IP 10 req/s,最大突发 20
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

server {
    location /api/ {
        limit_req zone=api burst=20 nodelay;
        proxy_pass http://backend;
    }
}

burst=20 nodelay = 突发 20 个立即过,超过的直接 503。
不带 nodelay 是排队处理(leaky bucket 标准行为)。

API gateway(Kong / Tyk / Envoy)也都内置。

7. 分布式限流的精度

单 Redis 实例 + Lua 是强一致;多 Redis cluster 时同一 user 的请求被 hash
到不同分片,限流可能略松。一般可接受。

需要严格全局限流可用 sentinel pattern + 唯一 master。

8. 限流的响应

HTTP/1.1 429 Too Many Requests
Retry-After: 60
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1735689600

Retry-After(秒或日期)告诉客户端何时再试。

9. 按维度限流

不同维度组合:

  • 按 IP:防匿名滥用
  • 按 API key:按付费等级
  • 按 endpoint:贵 endpoint 单独严格
  • 按 user-id:登录用户
def check_rate_limits(req):
    if not allow(f'ip:{req.ip}', 1000, 100):
        return 'IP too fast'
    if not allow(f'user:{req.user_id}', 100, 10):
        return 'user quota exceeded'
    if req.path == '/expensive':
        if not allow(f'user:{req.user_id}:exp', 5, 0.1):
            return 'expensive endpoint quota exceeded'
    return None

10. 反例 + 注意

  • 用全局锁实现限流:性能差,单点故障
  • 用 DB 计数:DB 不是限流工具,QPS 上来扛不住
  • 限流粒度太粗(all users 共享):恶意用户拖累所有人

11. 第三方库

  • slowapi(FastAPI / Starlette):装饰器限流
  • django-ratelimit:Django 装饰器
  • redis-cell:Redis module,提供原子 CL.THROTTLE 命令
# slowapi 例
from slowapi import Limiter

limiter = Limiter(key_func=lambda req: req.client.host)

@app.get('/api/x')
@limiter.limit('100/minute')
def x(request: Request): ...

踩过的坑

  • 限流 key 包含敏感信息(user email),泄露 Redis = 数据泄漏。
    hash 一下当 key。
  • TTL 算错让 key 永远不过期,Redis 占用涨。
  • 限流应用在 readiness probe 上 → 监控系统刷它把自己 ban 了。
    排除 health endpoint。
  • 测试期忘了关限流 → CI 测试因 429 间歇性失败。环境变量分场景。
精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。