用 Redis 实现分布式锁:单实例 + Redlock 各自怎么用

分布式锁是绝大多数后端早晚都要的"防止并发执行某操作"机制。
Redis 是最常用的实现,但实现细节不对会导致重复执行 / 死锁 / 误释放
下面讲两种正确写法。

单实例 SET NX PX(最常用)

适合非关键路径:"同一任务同一时刻只跑一份",可以接受极小概率冲突。

import secrets
import redis

r = redis.Redis(decode_responses=True)

def acquire_lock(key: str, ttl_ms: int) -> str | None:
    """Return token if locked, None otherwise."""
    token = secrets.token_hex(16)
    ok = r.set(key, token, nx=True, px=ttl_ms)
    return token if ok else None

def release_lock(key: str, token: str) -> bool:
    """只在 token 匹配时才删 —— 防止误释放别人的锁。"""
    lua = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    return bool(r.eval(lua, 1, key, token))

使用:

token = acquire_lock('job:nightly-report', ttl_ms=60000)
if not token:
    print('another worker is running this; skip')
    return

try:
    do_the_work()
finally:
    release_lock('job:nightly-report', token)

三个关键点

  1. TTL 必须:worker 崩了 / 网断了,锁自动过期,否则死锁
  2. token 必须:删锁时验证是不是自己的;否则你的锁超时了,
    别人拿了同 key 的锁,你回头 del 把别人的锁删了
  3. 释放用 Lua:GET + DEL 不是原子的,中间可能锁到期被别人拿走

不做 token 验证的"del" 释放是大坑。

上下文管理器封装

import contextlib

@contextlib.contextmanager
def redis_lock(key, ttl_ms=30000):
    token = acquire_lock(key, ttl_ms)
    if not token:
        raise RuntimeError(f'lock {key} already held')
    try:
        yield
    finally:
        release_lock(key, token)

# 用法
with redis_lock('job:sync', ttl_ms=120000):
    do_sync()

长任务的 TTL 续期(看门狗)

如果 do_the_work 可能跑超过 ttl,需要在后台周期性续期:

import threading, time

class WatchdogLock:
    def __init__(self, key, ttl_ms=30000):
        self.key, self.ttl_ms, self.token = key, ttl_ms, None
        self._stop = threading.Event()
        self._thread = None

    def __enter__(self):
        self.token = acquire_lock(self.key, self.ttl_ms)
        if not self.token:
            raise RuntimeError('lock busy')
        self._thread = threading.Thread(target=self._renew, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *args):
        self._stop.set()
        self._thread.join()
        release_lock(self.key, self.token)

    def _renew(self):
        # 每 ttl/3 续一次
        while not self._stop.wait(self.ttl_ms / 3 / 1000):
            lua = """
            if redis.call('get', KEYS[1]) == ARGV[1] then
              return redis.call('pexpire', KEYS[1], ARGV[2])
            else
              return 0
            end
            """
            r.eval(lua, 1, self.key, self.token, self.ttl_ms)

Redlock —— 多 Redis 实例时

如果 Redis 是单机,主从切换的瞬间锁可能丢(旧 master 上有锁,
新 master 上没有)。Redlock 算法 在多个独立 Redis 实例上同时
申请锁,多数派同意才算成功。

但 Redlock 有争议(Martin Kleppmann vs antirez 之争):分布式系统专家
认为它在某些时钟漂移场景下不安全。结论:除非你真的不能容忍上面这种
极端故障,否则单实例 SET NX PX 已经够好

如果一定要 Redlock,用 redis-py-lock
内置的 r.lock() 实现(基于 Redlock):

import redis
r = redis.Redis()

with r.lock('myresource', timeout=60, blocking_timeout=5) as locked:
    do_work()

或者多实例版:

from redis import Redis
from redis.lock import Lock

rs = [Redis(host=h) for h in ('r1', 'r2', 'r3')]
# 你需要自己写 Redlock 算法:在 majority 上获取

Python 生态用 redlock-py
更省事。

什么时候不要用 Redis 锁

  • 严格不可重复执行:付款扣库存、唯一 ID 申领。
    数据库的事务 / unique constraint 是真正的源(Redis 锁可能因为时钟
    / 网络分区误放过两次)。
  • 跨数据中心:Redis 跨 DC 复制延迟通常 > 锁的 TTL;用 etcd / Zookeeper
    的 lease 机制。
  • 强一致需求:见上,用支持 raft / paxos 的工具。

整数 ID / 任务队列防重复

如果你要"任务 X 在 30 分钟内只处理一次",比锁更简单的是 dedup key:

key = f'dedup:taskX:{job_id}'
if not r.set(key, '1', nx=True, ex=1800):
    return  # 已经处理过 / 正在处理
do_work()

不需要释放——靠 TTL 自然过期。简单粗暴。

踩过的坑

  • 没 token 直接 del:worker A 拿锁后任务跑超时锁过期 → worker B 拿到
    同 key 锁 → A 任务结束直接 del → A 把 B 的锁删了 → C 又拿到锁 → 重复执行。
  • TTL 太短:网络抖动 / GC 暂停 / 数据库慢一下,TTL 到了任务还没完,
    下一个 worker 进来。要么调长 TTL 要么用看门狗。
  • 用 Redis 锁保护 DB 操作:注意 Redis 锁本身只是建议性,DB 一致性还是
    靠 DB 事务 / UNIQUE。锁主要是"避免重复劳动"而不是"保证正确性"。
  • redis-py 默认 connection pool 在多线程不安全。每个线程拿自己的 Redis
    实例,或者用 redis-py 4+ 的 connection pool(已优化)。
精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。