任何对外 API 都需要限流,否则一波突发流量 / 恶意刷接口能把服务打挂。
四种主流算法各有适用场景,下面手写各一份 + 选择建议。
算法对比
| 算法 | 适合 | 突发支持 | 实现复杂度 |
|---|---|---|---|
| Fixed Window | 简单计数器 | 边界双倍突发 | 极简 |
| Sliding Window | 平滑限流 | 可控 | 中 |
| Leaky Bucket | 强制匀速输出 | 不支持 | 中 |
| Token Bucket | 突发友好 | 支持 | 中 |
1. Fixed Window(最简,但有边界问题)
import redis
r = redis.Redis()
def fixed_window_allow(key: str, limit: int, window_sec: int) -> bool:
bucket = f'{key}:{int(time.time()) // window_sec}'
n = r.incr(bucket)
if n == 1:
r.expire(bucket, window_sec)
return n <= limit
# 用法:limit=100/分钟
allowed = fixed_window_allow(f'rl:user:{uid}', 100, 60)
问题:59:59 一波 100 次 + 00:01 一波 100 次 = 2 秒内 200 次,违反"每分钟 100"
意图。
2. Sliding Window(用 sorted set 精确,开销稍高)
def sliding_window_allow(key: str, limit: int, window_sec: int) -> bool:
now = time.time()
pipe = r.pipeline()
pipe.zremrangebyscore(key, 0, now - window_sec) # 删窗口外
pipe.zcard(key) # 当前窗口内的请求数
pipe.zadd(key, {str(now): now}) # 加这次
pipe.expire(key, window_sec)
_, count, _, _ = pipe.execute()
return count < limit
精确:当前到过去 window_sec 秒内最多 limit 次。
代价:每个 user 一个 sorted set;高并发下 Redis 内存增加。
3. Leaky Bucket(强制匀速出)
def leaky_bucket_allow(key: str, capacity: int, leak_per_sec: float) -> bool:
"""
模拟一个桶:用户每次请求往桶里加一滴水;桶在背景以 leak_per_sec 漏水。
桶满 → 拒绝。
"""
now = time.time()
state = r.hgetall(key) or {}
last = float(state.get(b'last', now))
water = float(state.get(b'water', 0))
# 时间差里漏掉的水
water = max(0, water - (now - last) * leak_per_sec)
if water >= capacity:
return False
water += 1
r.hset(key, mapping={'water': water, 'last': now})
r.expire(key, int(capacity / leak_per_sec) + 1)
return True
# 用法:bucket cap=10, 漏速 2/秒 → 最大允许短期 10 突发,长期 2/秒
allowed = leaky_bucket_allow(f'rl:lb:{uid}', 10, 2)
漏速恒定 → 输出节奏稳定,但不允许突发"全速消费"。
4. Token Bucket(突发友好,最常用)
def token_bucket_allow(key: str, capacity: int, refill_per_sec: float) -> bool:
"""
桶里装 token:请求消耗一个 token;桶在背景以 refill_per_sec 添加 token。
没 token → 拒绝。
"""
now = time.time()
state = r.hgetall(key) or {}
last = float(state.get(b'last', now))
tokens = float(state.get(b'tokens', capacity))
tokens = min(capacity, tokens + (now - last) * refill_per_sec)
if tokens < 1:
return False
tokens -= 1
r.hset(key, mapping={'tokens': tokens, 'last': now})
r.expire(key, int(capacity / refill_per_sec) + 1)
return True
# 用法:cap=100, 速率 10/秒 → 短期可一次性 100 个,平均 10/秒
allowed = token_bucket_allow(f'rl:tb:{uid}', 100, 10)
桶满了允许突发用 capacity 个,之后稳态 refill_per_sec。
实际生产 99% 用这个:API 客户端通常零散调用,偶尔一波,token bucket
最贴合。
5. 原子性问题
上面 Python 版本有 race condition(get → 算 → set 之间被别的请求抢插)。
生产用 Lua 脚本一次性原子完成:
-- token_bucket.lua
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local state = redis.call('HMGET', key, 'tokens', 'last')
local tokens = tonumber(state[1]) or capacity
local last = tonumber(state[2]) or now
tokens = math.min(capacity, tokens + (now - last) * refill_rate)
local allowed = 0
if tokens >= 1 then
tokens = tokens - 1
allowed = 1
end
redis.call('HMSET', key, 'tokens', tokens, 'last', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
return allowed
Python 调用:
TOKEN_BUCKET_SCRIPT = r.register_script(open('token_bucket.lua').read())
def allow(key, capacity, refill_rate):
return TOKEN_BUCKET_SCRIPT(
keys=[key],
args=[capacity, refill_rate, time.time()],
)
6. nginx 限流
不要总在应用层做。nginx 内置 limit_req(leaky bucket)很强:
# 每客户端 IP 10 req/s,最大突发 20
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
server {
location /api/ {
limit_req zone=api burst=20 nodelay;
proxy_pass http://backend;
}
}
burst=20 nodelay = 突发 20 个立即过,超过的直接 503。
不带 nodelay 是排队处理(leaky bucket 标准行为)。
API gateway(Kong / Tyk / Envoy)也都内置。
7. 分布式限流的精度
单 Redis 实例 + Lua 是强一致;多 Redis cluster 时同一 user 的请求被 hash
到不同分片,限流可能略松。一般可接受。
需要严格全局限流可用 sentinel pattern + 唯一 master。
8. 限流的响应
HTTP/1.1 429 Too Many Requests
Retry-After: 60
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1735689600
Retry-After(秒或日期)告诉客户端何时再试。
9. 按维度限流
不同维度组合:
- 按 IP:防匿名滥用
- 按 API key:按付费等级
- 按 endpoint:贵 endpoint 单独严格
- 按 user-id:登录用户
def check_rate_limits(req):
if not allow(f'ip:{req.ip}', 1000, 100):
return 'IP too fast'
if not allow(f'user:{req.user_id}', 100, 10):
return 'user quota exceeded'
if req.path == '/expensive':
if not allow(f'user:{req.user_id}:exp', 5, 0.1):
return 'expensive endpoint quota exceeded'
return None
10. 反例 + 注意
- 用全局锁实现限流:性能差,单点故障
- 用 DB 计数:DB 不是限流工具,QPS 上来扛不住
- 限流粒度太粗(all users 共享):恶意用户拖累所有人
11. 第三方库
- slowapi(FastAPI / Starlette):装饰器限流
- django-ratelimit:Django 装饰器
- redis-cell:Redis module,提供原子 CL.THROTTLE 命令
# slowapi 例
from slowapi import Limiter
limiter = Limiter(key_func=lambda req: req.client.host)
@app.get('/api/x')
@limiter.limit('100/minute')
def x(request: Request): ...
踩过的坑
- 限流 key 包含敏感信息(user email),泄露 Redis = 数据泄漏。
hash 一下当 key。 - TTL 算错让 key 永远不过期,Redis 占用涨。
- 限流应用在 readiness probe 上 → 监控系统刷它把自己 ban 了。
排除 health endpoint。 - 测试期忘了关限流 → CI 测试因 429 间歇性失败。环境变量分场景。
登录后参与评论。