分布式锁是绝大多数后端早晚都要的"防止并发执行某操作"机制。
Redis 是最常用的实现,但实现细节不对会导致重复执行 / 死锁 / 误释放。
下面讲两种正确写法。
单实例 SET NX PX(最常用)
适合非关键路径:"同一任务同一时刻只跑一份",可以接受极小概率冲突。
import secrets
import redis
r = redis.Redis(decode_responses=True)
def acquire_lock(key: str, ttl_ms: int) -> str | None:
"""Return token if locked, None otherwise."""
token = secrets.token_hex(16)
ok = r.set(key, token, nx=True, px=ttl_ms)
return token if ok else None
def release_lock(key: str, token: str) -> bool:
"""只在 token 匹配时才删 —— 防止误释放别人的锁。"""
lua = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
return bool(r.eval(lua, 1, key, token))
使用:
token = acquire_lock('job:nightly-report', ttl_ms=60000)
if not token:
print('another worker is running this; skip')
return
try:
do_the_work()
finally:
release_lock('job:nightly-report', token)
三个关键点
- TTL 必须:worker 崩了 / 网断了,锁自动过期,否则死锁
- token 必须:删锁时验证是不是自己的;否则你的锁超时了,
别人拿了同 key 的锁,你回头 del 把别人的锁删了 - 释放用 Lua:GET + DEL 不是原子的,中间可能锁到期被别人拿走
不做 token 验证的"del" 释放是大坑。
上下文管理器封装
import contextlib
@contextlib.contextmanager
def redis_lock(key, ttl_ms=30000):
token = acquire_lock(key, ttl_ms)
if not token:
raise RuntimeError(f'lock {key} already held')
try:
yield
finally:
release_lock(key, token)
# 用法
with redis_lock('job:sync', ttl_ms=120000):
do_sync()
长任务的 TTL 续期(看门狗)
如果 do_the_work 可能跑超过 ttl,需要在后台周期性续期:
import threading, time
class WatchdogLock:
def __init__(self, key, ttl_ms=30000):
self.key, self.ttl_ms, self.token = key, ttl_ms, None
self._stop = threading.Event()
self._thread = None
def __enter__(self):
self.token = acquire_lock(self.key, self.ttl_ms)
if not self.token:
raise RuntimeError('lock busy')
self._thread = threading.Thread(target=self._renew, daemon=True)
self._thread.start()
return self
def __exit__(self, *args):
self._stop.set()
self._thread.join()
release_lock(self.key, self.token)
def _renew(self):
# 每 ttl/3 续一次
while not self._stop.wait(self.ttl_ms / 3 / 1000):
lua = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('pexpire', KEYS[1], ARGV[2])
else
return 0
end
"""
r.eval(lua, 1, self.key, self.token, self.ttl_ms)
Redlock —— 多 Redis 实例时
如果 Redis 是单机,主从切换的瞬间锁可能丢(旧 master 上有锁,
新 master 上没有)。Redlock 算法 在多个独立 Redis 实例上同时
申请锁,多数派同意才算成功。
但 Redlock 有争议(Martin Kleppmann vs antirez 之争):分布式系统专家
认为它在某些时钟漂移场景下不安全。结论:除非你真的不能容忍上面这种
极端故障,否则单实例 SET NX PX 已经够好。
如果一定要 Redlock,用 redis-py-lock
内置的 r.lock() 实现(基于 Redlock):
import redis
r = redis.Redis()
with r.lock('myresource', timeout=60, blocking_timeout=5) as locked:
do_work()
或者多实例版:
from redis import Redis
from redis.lock import Lock
rs = [Redis(host=h) for h in ('r1', 'r2', 'r3')]
# 你需要自己写 Redlock 算法:在 majority 上获取
Python 生态用 redlock-py
更省事。
什么时候不要用 Redis 锁
- 严格不可重复执行:付款扣库存、唯一 ID 申领。
数据库的事务 / unique constraint 是真正的源(Redis 锁可能因为时钟
/ 网络分区误放过两次)。 - 跨数据中心:Redis 跨 DC 复制延迟通常 > 锁的 TTL;用 etcd / Zookeeper
的 lease 机制。 - 强一致需求:见上,用支持 raft / paxos 的工具。
整数 ID / 任务队列防重复
如果你要"任务 X 在 30 分钟内只处理一次",比锁更简单的是 dedup key:
key = f'dedup:taskX:{job_id}'
if not r.set(key, '1', nx=True, ex=1800):
return # 已经处理过 / 正在处理
do_work()
不需要释放——靠 TTL 自然过期。简单粗暴。
踩过的坑
- 没 token 直接
del:worker A 拿锁后任务跑超时锁过期 → worker B 拿到
同 key 锁 → A 任务结束直接 del → A 把 B 的锁删了 → C 又拿到锁 → 重复执行。 - TTL 太短:网络抖动 / GC 暂停 / 数据库慢一下,TTL 到了任务还没完,
下一个 worker 进来。要么调长 TTL 要么用看门狗。 - 用 Redis 锁保护 DB 操作:注意 Redis 锁本身只是建议性,DB 一致性还是
靠 DB 事务 / UNIQUE。锁主要是"避免重复劳动"而不是"保证正确性"。 - redis-py 默认 connection pool 在多线程不安全。每个线程拿自己的 Redis
实例,或者用 redis-py 4+ 的 connection pool(已优化)。
登录后参与评论。