起因
要接 Stripe webhook 处理支付成功事件,"用户付款成功 → 给账户加余额"。
看似简单:
@app.post('/webhook/stripe')
def handle(req):
event = req.json()
if event['type'] == 'payment_intent.succeeded':
add_credit(event['data']['object']['customer'], 100)
return {'ok': True}
实际几个问题:
- Stripe 重发同一事件(network 重试 / 我们一时返回 500)→ 用户余额被加多次
- 没校验签名 → 任何人 POST 这个 URL 都能给账户加钱
- 处理慢 → Stripe 超时 → 它重试 → 雪崩
- add_credit 失败 → 错过事件 → 数据丢
正确实现需要 4 个东西:签名校验、幂等、异步处理、死信队列。
解决方案
1. 签名校验(永远第一步)
Stripe 在 header Stripe-Signature 里发 HMAC:
import stripe
STRIPE_WEBHOOK_SECRET = 'whsec_...'
@app.post('/webhook/stripe')
def handle(req):
payload = req.body_bytes # 原始 bytes,不是 parsed JSON
sig = req.headers.get('Stripe-Signature')
try:
event = stripe.Webhook.construct_event(
payload, sig, STRIPE_WEBHOOK_SECRET
)
except stripe.SignatureVerificationError:
return Response(status=400)
# ... 继续处理
construct_event 内部验签 + parse。验签失败 → 400 拒绝。
用原始 bytes,不要先 parse JSON 再 stringify——序列化可能改字节
(key 顺序 / 空格),HMAC 算不对。
自己实现 webhook(不用 SDK)的签名校验:
import hmac, hashlib, time
def verify(payload: bytes, sig_header: str, secret: str, tolerance=300):
# 解 header: t=timestamp,v1=hex_signature
parts = dict(p.split('=') for p in sig_header.split(','))
ts = int(parts['t'])
if abs(time.time() - ts) > tolerance:
raise ValueError('timestamp too old (replay?)')
signed = f'{ts}.{payload.decode()}'.encode()
expected = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, parts['v1']):
raise ValueError('signature mismatch')
hmac.compare_digest 防时序攻击;tolerance 防 replay。
2. 幂等:去重表 + 唯一 event id
Stripe 每个 event 有唯一 id 字段,重发时 id 不变。
建一个表存已处理 id:
CREATE TABLE webhook_events (
id TEXT PRIMARY KEY, -- Stripe event id (evt_...)
type TEXT NOT NULL,
received_at TIMESTAMPTZ DEFAULT now(),
processed_at TIMESTAMPTZ,
payload JSONB NOT NULL
);
接收端:
@app.post('/webhook/stripe')
def handle(req):
# 1. 验签(上面)
event = construct_event(...)
# 2. 幂等检查 + 入库
try:
db.execute(
'INSERT INTO webhook_events (id, type, payload) VALUES (%s, %s, %s)',
(event.id, event.type, event.to_dict())
)
except UniqueViolation:
# 重复事件,已经处理过,直接 200 OK
return {'ok': True, 'duplicate': True}
# 3. 异步处理
enqueue_event_processing.delay(event.id)
# 4. 立刻返回 200(< 100ms)
return {'ok': True}
INSERT + 唯一约束就是去重。返回前不真正处理业务——业务逻辑放到后台。
3. 异步处理
@celery_task(bind=True, max_retries=5)
def enqueue_event_processing(self, event_id):
e = db.fetch_event(event_id)
if e.processed_at:
return # 已处理
try:
if e.type == 'payment_intent.succeeded':
obj = e.payload['data']['object']
add_credit(obj['customer'], obj['amount_received'])
elif e.type == 'invoice.payment_failed':
send_payment_failed_email(...)
# ... 其它事件
db.execute(
'UPDATE webhook_events SET processed_at = now() WHERE id = %s',
(event_id,)
)
except Exception as exc:
# Celery 自动按 exponential backoff 重试
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
关键:
- webhook endpoint 只做"放入队列",秒级返回 200
- 真处理在后台 worker,失败有重试 + 死信
processed_at字段防"重试 N 次都成功"被算多次
4. 处理顺序
Webhook 可能乱序到达(network 重试)。如果业务关心顺序(如订单状态
pending → paid → shipped),需要:
def handle_event(e):
obj_id = e.payload['data']['object']['id']
# 加锁防同对象并发处理
with db_lock(f'order:{obj_id}'):
current = db.get_order(obj_id)
if e.payload['data']['object']['updated'] < current.updated_at:
return # 这是老事件,丢
update_order(obj_id, e.payload['data']['object'])
用对象上的 timestamp / version 字段判断"这事件是不是 stale"。
5. 死信队列 (DLQ)
重试 N 次都失败的事件不要丢,放 DLQ 人工 review:
@celery_task(bind=True, max_retries=5)
def enqueue_event_processing(self, event_id):
try:
process(event_id)
except Exception as exc:
if self.request.retries >= self.max_retries:
# 最后一次重试还失败
move_to_dlq.delay(event_id, str(exc))
else:
raise self.retry(countdown=2 ** self.request.retries)
CREATE TABLE webhook_dlq (
id BIGSERIAL PRIMARY KEY,
event_id TEXT REFERENCES webhook_events(id),
error TEXT,
moved_at TIMESTAMPTZ DEFAULT now(),
resolved_at TIMESTAMPTZ
);
定时 / Slack 告警:"DLQ 有 N 条未处理",人工修业务后从 DLQ 重放。
6. 监控
# Prometheus metrics
webhook_received = Counter('webhook_received_total', '', ['provider', 'type'])
webhook_processed = Counter('webhook_processed_total', '', ['provider', 'type', 'result'])
webhook_lag = Histogram('webhook_lag_seconds', '')
# 上报
webhook_received.labels('stripe', event.type).inc()
# 处理完
webhook_processed.labels('stripe', event.type, 'ok').inc()
仪表盘看:
- 每分钟事件量
- 处理延迟 P95
- 错误率(按 event type 分)
- DLQ 数量
完整流程
1. Stripe POST 事件
↓
2. 验签(HMAC)→ 失败 400 拒绝
↓
3. INSERT 去重表(唯一 id)→ 已存在返 200 (dup)
↓
4. 推入 Celery 队列
↓
5. 返回 200(end-to-end < 100ms)
后台:
6. worker 拿事件
↓
7. 业务处理(add_credit 等)
↓
8. 成功 → 标记 processed_at
失败 → exponential retry
↓
9. 重试 5 次都失败 → 进 DLQ + 告警
效果
按这套设计后:
- 重复扣款 / 加钱 bug 归零
- 即使 Stripe 一秒发 1000 个事件,endpoint 不挂(异步队列削峰)
- DLQ 偶尔有 2-3 条(被告警捕获,人工处理 < 1 小时)
- 整套对 Stripe 的接入测试通过 Stripe 官方的 webhook test
安全 checklist
- ✅ HMAC 签名校验
- ✅ 时间戳验证防 replay
- ✅ 用原始 bytes 验签
- ✅ 不在 endpoint 里做业务
- ✅ 业务幂等(DB unique constraint 等)
- ✅ HTTPS only
- ✅ Webhook secret 进 vault / env,不进 git
踩过的坑
-
request.json()之后再验签:FastAPI / Flask 默认 parse JSON
后再处理,原始 body 拿不到。要用await request.body()/
request.get_data()拿 bytes。 -
endpoint 返回慢 → Stripe 重发:处理逻辑写在 endpoint 里 →
8 秒返回 → Stripe 5 秒超时认为失败 → 重发 → 你又花 8 秒 → 死循环。
异步是必须的。 -
重试时业务"看起来 OK 但 DB 没改":DB transaction 中间 commit
失败 → 业务部分生效 → 重试又跑一次。所有 update 必须 atomic
transaction。 -
DLQ 没人看:上 DLQ 后没监控 / 告警,几个月后发现 1000 条
未处理。alert 是必须的。 -
多 webhook 端点共享一个 secret:Stripe / 其它 SaaS 给每个
endpoint 独立 secret 才对,混用一个 secret 出问题难定位来源。
登录后参与评论。