知识广场

按学科筛选:计算机科学 / 数据库 / Redis
清除筛选

«计算机科学 / 数据库 / Redis» 分类下共 1 篇帖子

用 Redis 实现分布式锁:单实例 + Redlock 各自怎么用

分布式锁是绝大多数后端早晚都要的"防止并发执行某操作"机制。 Redis 是最常用的实现,但实现细节不对会导致**重复执行 / 死锁 / 误释放**。 下面讲两种正确写法。 ## 单实例 SET NX PX(最常用) 适合非关键路径:"同一任务同一时刻只跑一份",可以接受极小概率冲突。 ```python import secrets import redis r = redis.Redis(decode_responses=True) def acquire_lock(key: str, ttl_ms: int) -> str | None: """Return token if locked, None otherwise.""" token = secrets.token_hex(16) ok = r.set(key, token, nx=True, px=ttl_ms) return token if ok else None def release_lock(key: str, token: str) -> bool: """只在 token 匹配时才删 —— 防止误释放别人的锁。""" lua = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """ return bool(r.eval(lua, 1, key, token)) ``` 使用: ```python token = acquire_lock('job:nightly-report', ttl_ms=60000) if not token: print('another worker is running this; skip') return try: do_the_work() finally: release_lock('job:nightly-report', token) ``` ## 三个关键点 1. **TTL 必须**:worker 崩了 / 网断了,锁自动过期,否则死锁 2. **token 必须**:删锁时验证是不是自己的;否则你的锁超时了, 别人拿了同 key 的锁,你回头 del 把别人的锁删了 3. **释放用 Lua**:GET + DEL 不是原子的,中间可能锁到期被别人拿走 不做 token 验证的"`del`" 释放是大坑。 ## 上下文管理器封装 ```python import contextlib @contextlib.contextmanager def redis_lock(key, ttl_ms=30000): token = acquire_lock(key, ttl_ms) if not token: raise RuntimeError(f'lock {key} already held') try: yield finally: release_lock(key, token) # 用法 with redis_lock('job:sync', ttl_ms=120000): do_sync() ``` ## 长任务的 TTL 续期(看门狗) 如果 do_the_work 可能跑超过 ttl,需要在后台周期性续期: ```python import threading, time class WatchdogLock: def __init__(self, key, ttl_ms=30000): self.key, self.ttl_ms, self.token = key, ttl_ms, None self._stop = threading.Event() self._thread = None def __enter__(self): self.token = acquire_lock(self.key, self.ttl_ms) if not self.token: raise RuntimeError('lock busy') self._thread = threading.Thread(target=self._renew, daemon=True) self._thread.start() return self def __exit__(self, *args): self._stop.set() self._thread.join() release_lock(self.key, self.token) def _renew(self): # 每 ttl/3 续一次 while not self._stop.wait(self.ttl_ms / 3 / 1000): lua = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end """ r.eval(lua, 1, self.key, self.token, self.ttl_ms) ``` ## Redlock —— 多 Redis 实例时 如果 Redis 是单机,主从切换的瞬间锁可能丢(旧 master 上有锁, 新 master 上没有)。**Redlock 算法** 在多个独立 Redis 实例上同时 申请锁,多数派同意才算成功。 但 Redlock 有争议(Martin Kleppmann vs antirez 之争):分布式系统专家 认为它在某些时钟漂移场景下不安全。**结论:除非你真的不能容忍上面这种 极端故障,否则单实例 SET NX PX 已经够好**。 如果一定要 Redlock,用 [redis-py-lock](https://github.com/redis/redis-py) 内置的 `r.lock()` 实现(基于 Redlock): ```python import redis r = redis.Redis() with r.lock('myresource', timeout=60, blocking_timeout=5) as locked: do_work() ``` 或者多实例版: ```python from redis import Redis from redis.lock import Lock rs = [Redis(host=h) for h in ('r1', 'r2', 'r3')] # 你需要自己写 Redlock 算法:在 majority 上获取 ``` Python 生态用 [`redlock-py`](https://github.com/SPSCommerce/redlock-py) 更省事。 ## 什么时候**不要**用 Redis 锁 - **严格不可重复执行**:付款扣库存、唯一 ID 申领。 数据库的事务 / unique constraint 是真正的源(Redis 锁可能因为时钟 / 网络分区误放过两次)。 - **跨数据中心**:Redis 跨 DC 复制延迟通常 > 锁的 TTL;用 etcd / Zookeeper 的 lease 机制。 - **强一致需求**:见上,用支持 raft / paxos 的工具。 ## 整数 ID / 任务队列防重复 如果你要"任务 X 在 30 分钟内只处理一次",比锁更简单的是 dedup key: ```python key = f'dedup:taskX:{job_id}' if not r.set(key, '1', nx=True, ex=1800): return # 已经处理过 / 正在处理 do_work() ``` 不需要释放——靠 TTL 自然过期。简单粗暴。 ## 踩过的坑 - 没 token 直接 `del`:worker A 拿锁后任务跑超时锁过期 → worker B 拿到 同 key 锁 → A 任务结束直接 del → A 把 B 的锁删了 → C 又拿到锁 → 重复执行。 - TTL 太短:网络抖动 / GC 暂停 / 数据库慢一下,TTL 到了任务还没完, 下一个 worker 进来。要么调长 TTL 要么用看门狗。 - 用 Redis 锁保护 DB 操作:注意 Redis 锁本身只是建议性,DB 一致性还是 靠 DB 事务 / UNIQUE。锁主要是"避免重复劳动"而不是"保证正确性"。 - redis-py 默认 connection pool 在多线程不安全。每个线程拿自己的 Redis 实例,或者用 redis-py 4+ 的 connection pool(已优化)。