起因
新人经常困惑:"Python 怎么做并发?asyncio / threading / multiprocessing
都是干嘛的?什么时候用哪个?"
下面用具体场景拆开。
GIL 是什么
Python(CPython 实现)的 GIL(Global Interpreter Lock)让同一时刻
只有一个 thread 在跑 Python bytecode。
threading 在 CPU 密集任务上得不到并行加速。
CPython 3.13+ 实验性的 "free-threaded" build 移除 GIL,但默认还是有
GIL,下面假设默认。
三种并发选哪个
| 场景 | 推荐 |
|---|---|
| IO 密集(HTTP / DB / file) + 高并发 | asyncio |
| IO 密集 + 现有同步代码 + 中等并发 | threading |
| CPU 密集(数学 / 加密 / 解析) | multiprocessing |
| 数据科学(numpy / pandas) | 用 numpy 内部并行 |
| 跑 N 个独立 task(如批处理) | concurrent.futures.ProcessPoolExecutor |
详细 1:asyncio (IO 密集)
import asyncio
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as s:
async with s.get(url) as r:
return await r.text()
async def main():
urls = ['https://x.com', 'https://y.com', 'https://z.com']
results = await asyncio.gather(*[fetch(u) for u in urls])
print(len(results))
asyncio.run(main())
3 个 fetch 并发跑(不是真的并行,但 IO 等待时事件循环切换)。
适合:
- web server(FastAPI / Sanic / aiohttp)
- API gateway
- crawler
- WebSocket 服务端
GIL 不阻碍 IO,所以 asyncio 单进程能处理 10k+ 并发连接。
详细 2:threading (IO 密集 + legacy code)
import threading
import requests
def fetch(url):
return requests.get(url).text
threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads: t.start()
for t in threads: t.join()
跟 asyncio 类似的 IO 并发,但用同步代码 + thread。
threading 优势:
- 不用改成 async
- 现有 sync code 直接用
- thread pool 简单
劣势:
- 创建 thread 开销大(asyncio coroutine 几 KB / thread 几 MB)
- 上下文切换贵
- 并发上限低(几百 thread vs asyncio 万级 coroutine)
实际:
- Django / Flask 用 thread-based server (gunicorn sync workers):
通常够用 - 高并发 / 长连接 → 改 asyncio
详细 3:multiprocessing (CPU 密集)
from multiprocessing import Pool
def cpu_heavy(n):
return sum(i * i for i in range(n))
with Pool(processes=8) as pool:
results = pool.map(cpu_heavy, [10_000_000] * 8)
8 个 Python 进程并行跑 → 真正利用多核 CPU。
GIL 是 per-process 的。多进程绕过 GIL → 多核并行计算。
代价:
- 进程启动慢(几十 ms)
- 进程间通信只能 pickle(大数据传输贵)
- 每进程独立 RAM(不共享 Python 对象)
适合:
- 图像处理批量
- 机器学习预处理
- 复杂数学 / 解析
详细 4:concurrent.futures(统一 API)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# IO 密集 → thread
with ThreadPoolExecutor(max_workers=20) as ex:
results = list(ex.map(fetch_url, urls))
# CPU 密集 → process
with ProcessPoolExecutor(max_workers=8) as ex:
results = list(ex.map(cpu_heavy, datas))
concurrent.futures 是 thread / process 的统一封装。
简单"batch 处理"场景首选。
详细 5:用 numpy / pandas / pytorch 替代手写 CPU 并发
import numpy as np
# ❌ 手写循环(慢 + GIL)
result = [x * 2 + y for x, y in zip(arr1, arr2)]
# ✅ numpy vectorize
result = arr1 * 2 + arr2 # C 层并行 + SIMD
NumPy / pandas / PyTorch 内部 release GIL 跑 C 代码 + 多核 SIMD。
"用对工具"比"加并发"快得多。
混合:asyncio + thread pool
asyncio 里需要调同步代码(如 pandas / requests):
import asyncio
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_function, arg)
run_in_executor 把同步函数丢线程池跑,async 继续。
小心:仍受 GIL 限制(同步函数还是单核)。CPU 密集时换 ProcessPoolExecutor:
from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor()
result = await loop.run_in_executor(pool, cpu_heavy, data)
实战:crawler
需求:抓 10000 个网页 + 解析(IO 密集 + 轻量 CPU)。
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def fetch_and_parse(session, url):
async with session.get(url) as r:
html = await r.text()
# 解析在主线程(轻量)
soup = BeautifulSoup(html, 'html.parser')
return soup.title.string if soup.title else ''
async def main(urls):
semaphore = asyncio.Semaphore(50) # 限并发
async with aiohttp.ClientSession() as session:
async def bounded(url):
async with semaphore:
return await fetch_and_parse(session, url)
results = await asyncio.gather(*[bounded(u) for u in urls])
return results
asyncio.run(main(urls))
50 并发同时跑 → 10000 URL 几分钟完成。
单 Python 进程,内存几百 MB。
如果解析很重(NLP / image),把解析丢 process pool:
async def fetch_and_dispatch(session, url, pool):
async with session.get(url) as r:
html = await r.text()
loop = asyncio.get_running_loop()
# 解析跑 process pool(绕过 GIL)
return await loop.run_in_executor(pool, heavy_parse, html)
实战:图像批量处理
CPU 密集 → multiprocessing:
from multiprocessing import Pool
from PIL import Image
def process(path):
img = Image.open(path)
img.thumbnail((800, 800))
img.save(path.replace('.jpg', '_thumb.jpg'))
if __name__ == '__main__':
with Pool(processes=8) as pool:
pool.map(process, image_paths)
if __name__ == '__main__': 必须(multiprocessing fork 模式要)。
8 核机器 ~ 8x 加速。
free-threaded Python (3.13t)
Python 3.13 引入 --disable-gil build:
# 装 free-threaded version
uv python install 3.13t
理论上 threading 在 CPU 密集任务能并行。
但:
- 实验性 + 慢一些(单线程慢 10-20%)
- C extension 兼容性问题(很多包还没支持)
- 几年内还不是默认
生产暂时仍用 GIL build + multiprocessing。
性能对比(10000 URL fetch)
| 方法 | 时间 | 内存 |
|---|---|---|
| sync requests + for loop | 30 min | 50 MB |
| threading + ThreadPool(20) | 3 min | 200 MB |
| threading + ThreadPool(200) | 1.5 min | 800 MB |
| asyncio + aiohttp + sem(50) | 2 min | 150 MB |
| asyncio + aiohttp + sem(200) | 45s | 250 MB |
| multiprocessing | 没意义(IO 任务) | - |
IO 任务:asyncio 全胜。
CPU 任务(10000 张图缩略):
| 方法 | 时间 |
|---|---|
| sync for loop | 50 min |
| threading | 48 min(GIL,几乎没加速) |
| multiprocessing(8) | 8 min |
| numpy vectorize 改写(若适用) | 5 min |
选型决策树
你的任务是?
├── IO 密集(网络 / 文件 / DB)
│ ├── 高并发(万级)→ asyncio
│ ├── 已有 sync code 不想改 → threading + ThreadPoolExecutor
│ └── 简单批处理 → concurrent.futures.ThreadPoolExecutor
│
├── CPU 密集
│ ├── 数学 / 矩阵 → numpy / pytorch(向量化)
│ └── 一般计算 → multiprocessing / ProcessPoolExecutor
│
└── 混合
└── asyncio + run_in_executor(ProcessPoolExecutor)
踩过的坑
-
threading + requests:默认
requests.Session不 thread-safe。
每线程独立 session 或用aiohttpasync。 -
multiprocessing + fork:Linux fork 复制整个进程 → 大数据 in
parent 也复制 → 内存爆。改spawn或者把 data 写文件 worker 读。 -
asyncio 里调
time.sleep(5)→ 阻塞整个 event loop。
await asyncio.sleep(5)。 -
mixed event loop:多个
asyncio.run嵌套 → 报"event loop already
running"。一般asyncio.run只在 main 调一次。 -
multiprocessing on Windows / Mac:spawn 模式要求 worker 函数
可 pickle + 子进程重新 import module → 用if __name__ == '__main__':
保护。
登录后参与评论。