Python asyncio 实战要点:gather / TaskGroup / 取消 / 限并发

asyncio 已经是 Python 标准并发模型。但坑也多——很多人以为加 async/await
就能"并行",结果代码顺序跑得跟同步一样。下面讲实际让代码并发的关键。

1. 单纯加 async/await 不会并行

async def fetch_one(url):
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as r:
            return await r.text()

async def main():
    for url in urls:
        await fetch_one(url)   # 串行!每个等上一个完

await 让出控制权,但 for + await 仍然是顺序的。要并发用 gather
TaskGroup

2. asyncio.gather

async def main():
    results = await asyncio.gather(*[fetch_one(u) for u in urls])

所有 fetch 同时进行,gather 等全部完成返回列表。

注意:

  • 任何一个抛异常默认会取消其它任务并向上抛
  • return_exceptions=True 让异常也作为结果返回,不取消其它
results = await asyncio.gather(*coros, return_exceptions=True)
for r in results:
    if isinstance(r, Exception):
        log.warning('one failed: %s', r)
    else:
        process(r)

3. TaskGroup(Python 3.11+,推荐)

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(fetch_one(u1))
        t2 = tg.create_task(fetch_one(u2))
        t3 = tg.create_task(fetch_one(u3))
    # 所有 task 自动完成 + 自动 cancel 兄弟 task 当一个失败
    print(t1.result(), t2.result())

TaskGroup 是 PEP 654 + 3.11 引入的结构化并发。优点:

  • 异常传播更清晰(aggregates as ExceptionGroup)
  • 自动 cancel 兄弟,避免 "task 还在跑" 的孤儿
  • 比 gather 更结构化

新代码优先 TaskGroup。

4. 限并发:Semaphore

如果有 10000 个 URL 要抓,全用 gather 同时跑会爆 socket / 被目标限流。
用 Semaphore 控制并发度:

sem = asyncio.Semaphore(20)

async def bounded_fetch(url):
    async with sem:
        return await fetch_one(url)

results = await asyncio.gather(*[bounded_fetch(u) for u in urls])

sem 上限 20 → 最多 20 个 fetch 同时进行。

更高级的限速(按时间,比如每秒 50 个请求):

import aiometer

async def fetch_one(url): ...

results = await aiometer.run_on_each(
    fetch_one, urls,
    max_at_once=20,
    max_per_second=50,
)

5. 取消 + 超时

# 单个任务超时
try:
    result = await asyncio.wait_for(fetch_one(url), timeout=5.0)
except asyncio.TimeoutError:
    log.warning('timeout')

asyncio.wait_for 在超时后取消 coroutine(抛 CancelledError)。
任务里要正确处理这个 cancel:

async def fetch_one(url):
    try:
        async with session.get(url) as r:
            return await r.text()
    except asyncio.CancelledError:
        # 清理资源(如果还在持有)
        log.info('cancelled')
        raise   # 必须 re-raise,否则任务不算被取消

不 re-raise 会让 cancel 信号丢失,cleanup 顺序乱。

6. 不要在 async 里跑同步阻塞

async def bad():
    time.sleep(5)        # 阻塞整个事件循环!
    result = requests.get('...')  # 同步 IO

修复:

async def good():
    await asyncio.sleep(5)
    async with aiohttp.ClientSession() as s:
        async with s.get(...) as r:
            await r.text()

如果必须用同步库(pandas / boto3 / heavy CPU 计算):

# 跑到 thread pool 不阻塞 loop
result = await asyncio.to_thread(blocking_function, args)

asyncio.to_thread 把同步函数包成 awaitable,在 thread pool 里跑。

7. CPU 密集任务用 ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor()
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, cpu_heavy_function, data)

CPU 密集(图片处理 / 加密 / 解析)放进程池,避开 GIL。

8. 信号 / 优雅退出

import signal

async def main():
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    server_task = asyncio.create_task(run_server())
    await stop.wait()
    log.info('shutting down')
    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        pass

asyncio.run(main())

收到 Ctrl-C 时优雅 cancel + 等待清理。Windows 不支持 add_signal_handler。

9. 共享状态

asyncio 是单线程协作式并发,访问全局变量不需要锁
但 await 之后状态可能被其它 coroutine 改了:

counter = 0

async def bad():
    global counter
    n = counter
    await asyncio.sleep(0.001)   # 让出执行
    counter = n + 1   # n 可能已经过时

需要原子读改写时用 asyncio.Lock

lock = asyncio.Lock()

async def good():
    async with lock:
        n = counter
        await ...
        counter = n + 1

10. 调试

import asyncio
asyncio.run(main(), debug=True)
# 或 export PYTHONASYNCIODEBUG=1

debug 模式会警告:

  • coroutine 没 await
  • 协程跑超过 100ms(可能阻塞了 loop)
  • 任务 leaks

asyncio.all_tasks() 看当前所有任务。

11. 常用第三方

  • aiohttp:HTTP client/server
  • httpx:HTTP,同步 + 异步统一 API(更现代)
  • asyncpg:PostgreSQL,比 aiopg / sqlalchemy-asyncio 快很多
  • redis-py 4+:内置 async
  • anyio:跨 asyncio / trio 的抽象层

新项目 HTTP 优先用 httpx,老项目 aiohttp 仍然稳定。

踩过的坑

  • coro = some_async_fn() 但忘 await:coroutine 不会执行 + Python
    抛 "coroutine was never awaited" 警告。
  • asyncio.run() 不能嵌套:在 Jupyter / IPython 里要用 await main()
    直接(IPython 7+ 自动包 await)。
  • 多线程跑 asyncio:每个线程需要自己 asyncio.new_event_loop()
    通常不推荐——asyncio 设计是单线程的。
  • aiohttp.ClientSession 是有 connection pool 的,每个请求建一个新
    session 性能差。整个程序生命周期共享一个 session。
精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。