asyncio 已经是 Python 标准并发模型。但坑也多——很多人以为加 async/await
就能"并行",结果代码顺序跑得跟同步一样。下面讲实际让代码并发的关键。
1. 单纯加 async/await 不会并行
async def fetch_one(url):
async with aiohttp.ClientSession() as s:
async with s.get(url) as r:
return await r.text()
async def main():
for url in urls:
await fetch_one(url) # 串行!每个等上一个完
await 让出控制权,但 for + await 仍然是顺序的。要并发用 gather
或 TaskGroup。
2. asyncio.gather
async def main():
results = await asyncio.gather(*[fetch_one(u) for u in urls])
所有 fetch 同时进行,gather 等全部完成返回列表。
注意:
- 任何一个抛异常默认会取消其它任务并向上抛
- 用
return_exceptions=True让异常也作为结果返回,不取消其它
results = await asyncio.gather(*coros, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
log.warning('one failed: %s', r)
else:
process(r)
3. TaskGroup(Python 3.11+,推荐)
async def main():
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch_one(u1))
t2 = tg.create_task(fetch_one(u2))
t3 = tg.create_task(fetch_one(u3))
# 所有 task 自动完成 + 自动 cancel 兄弟 task 当一个失败
print(t1.result(), t2.result())
TaskGroup 是 PEP 654 + 3.11 引入的结构化并发。优点:
- 异常传播更清晰(aggregates as ExceptionGroup)
- 自动 cancel 兄弟,避免 "task 还在跑" 的孤儿
- 比 gather 更结构化
新代码优先 TaskGroup。
4. 限并发:Semaphore
如果有 10000 个 URL 要抓,全用 gather 同时跑会爆 socket / 被目标限流。
用 Semaphore 控制并发度:
sem = asyncio.Semaphore(20)
async def bounded_fetch(url):
async with sem:
return await fetch_one(url)
results = await asyncio.gather(*[bounded_fetch(u) for u in urls])
sem 上限 20 → 最多 20 个 fetch 同时进行。
更高级的限速(按时间,比如每秒 50 个请求):
import aiometer
async def fetch_one(url): ...
results = await aiometer.run_on_each(
fetch_one, urls,
max_at_once=20,
max_per_second=50,
)
5. 取消 + 超时
# 单个任务超时
try:
result = await asyncio.wait_for(fetch_one(url), timeout=5.0)
except asyncio.TimeoutError:
log.warning('timeout')
asyncio.wait_for 在超时后取消 coroutine(抛 CancelledError)。
任务里要正确处理这个 cancel:
async def fetch_one(url):
try:
async with session.get(url) as r:
return await r.text()
except asyncio.CancelledError:
# 清理资源(如果还在持有)
log.info('cancelled')
raise # 必须 re-raise,否则任务不算被取消
不 re-raise 会让 cancel 信号丢失,cleanup 顺序乱。
6. 不要在 async 里跑同步阻塞
async def bad():
time.sleep(5) # 阻塞整个事件循环!
result = requests.get('...') # 同步 IO
修复:
async def good():
await asyncio.sleep(5)
async with aiohttp.ClientSession() as s:
async with s.get(...) as r:
await r.text()
如果必须用同步库(pandas / boto3 / heavy CPU 计算):
# 跑到 thread pool 不阻塞 loop
result = await asyncio.to_thread(blocking_function, args)
asyncio.to_thread 把同步函数包成 awaitable,在 thread pool 里跑。
7. CPU 密集任务用 ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor()
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, cpu_heavy_function, data)
CPU 密集(图片处理 / 加密 / 解析)放进程池,避开 GIL。
8. 信号 / 优雅退出
import signal
async def main():
stop = asyncio.Event()
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, stop.set)
server_task = asyncio.create_task(run_server())
await stop.wait()
log.info('shutting down')
server_task.cancel()
try:
await server_task
except asyncio.CancelledError:
pass
asyncio.run(main())
收到 Ctrl-C 时优雅 cancel + 等待清理。Windows 不支持 add_signal_handler。
9. 共享状态
asyncio 是单线程协作式并发,访问全局变量不需要锁。
但 await 之后状态可能被其它 coroutine 改了:
counter = 0
async def bad():
global counter
n = counter
await asyncio.sleep(0.001) # 让出执行
counter = n + 1 # n 可能已经过时
需要原子读改写时用 asyncio.Lock:
lock = asyncio.Lock()
async def good():
async with lock:
n = counter
await ...
counter = n + 1
10. 调试
import asyncio
asyncio.run(main(), debug=True)
# 或 export PYTHONASYNCIODEBUG=1
debug 模式会警告:
- coroutine 没 await
- 协程跑超过 100ms(可能阻塞了 loop)
- 任务 leaks
asyncio.all_tasks() 看当前所有任务。
11. 常用第三方
- aiohttp:HTTP client/server
- httpx:HTTP,同步 + 异步统一 API(更现代)
- asyncpg:PostgreSQL,比 aiopg / sqlalchemy-asyncio 快很多
- redis-py 4+:内置 async
- anyio:跨 asyncio / trio 的抽象层
新项目 HTTP 优先用 httpx,老项目 aiohttp 仍然稳定。
踩过的坑
coro = some_async_fn()但忘 await:coroutine 不会执行 + Python
抛 "coroutine was never awaited" 警告。asyncio.run()不能嵌套:在 Jupyter / IPython 里要用await main()
直接(IPython 7+ 自动包 await)。- 多线程跑 asyncio:每个线程需要自己
asyncio.new_event_loop()。
通常不推荐——asyncio 设计是单线程的。 aiohttp.ClientSession是有 connection pool 的,每个请求建一个新
session 性能差。整个程序生命周期共享一个 session。
登录后参与评论。