Python: GIL / asyncio / multiprocessing / threading 选谁

起因

新人经常困惑:"Python 怎么做并发?asyncio / threading / multiprocessing
都是干嘛的?什么时候用哪个?"

下面用具体场景拆开。

GIL 是什么

Python(CPython 实现)的 GIL(Global Interpreter Lock)让同一时刻
只有一个 thread 在跑 Python bytecode。
threading 在 CPU 密集任务上得不到并行加速

CPython 3.13+ 实验性的 "free-threaded" build 移除 GIL,但默认还是有
GIL,下面假设默认。

三种并发选哪个

场景 推荐
IO 密集(HTTP / DB / file) + 高并发 asyncio
IO 密集 + 现有同步代码 + 中等并发 threading
CPU 密集(数学 / 加密 / 解析) multiprocessing
数据科学(numpy / pandas) 用 numpy 内部并行
跑 N 个独立 task(如批处理) concurrent.futures.ProcessPoolExecutor

详细 1:asyncio (IO 密集)

import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as r:
            return await r.text()

async def main():
    urls = ['https://x.com', 'https://y.com', 'https://z.com']
    results = await asyncio.gather(*[fetch(u) for u in urls])
    print(len(results))

asyncio.run(main())

3 个 fetch 并发跑(不是真的并行,但 IO 等待时事件循环切换)。

适合:

  • web server(FastAPI / Sanic / aiohttp)
  • API gateway
  • crawler
  • WebSocket 服务端

GIL 不阻碍 IO,所以 asyncio 单进程能处理 10k+ 并发连接。

详细 2:threading (IO 密集 + legacy code)

import threading
import requests

def fetch(url):
    return requests.get(url).text

threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads: t.start()
for t in threads: t.join()

跟 asyncio 类似的 IO 并发,但用同步代码 + thread。

threading 优势:

  • 不用改成 async
  • 现有 sync code 直接用
  • thread pool 简单

劣势:

  • 创建 thread 开销大(asyncio coroutine 几 KB / thread 几 MB)
  • 上下文切换贵
  • 并发上限低(几百 thread vs asyncio 万级 coroutine)

实际:

  • Django / Flask 用 thread-based server (gunicorn sync workers):
    通常够用
  • 高并发 / 长连接 → 改 asyncio

详细 3:multiprocessing (CPU 密集)

from multiprocessing import Pool

def cpu_heavy(n):
    return sum(i * i for i in range(n))

with Pool(processes=8) as pool:
    results = pool.map(cpu_heavy, [10_000_000] * 8)

8 个 Python 进程并行跑 → 真正利用多核 CPU。

GIL 是 per-process 的。多进程绕过 GIL → 多核并行计算。

代价:

  • 进程启动慢(几十 ms)
  • 进程间通信只能 pickle(大数据传输贵)
  • 每进程独立 RAM(不共享 Python 对象)

适合:

  • 图像处理批量
  • 机器学习预处理
  • 复杂数学 / 解析

详细 4:concurrent.futures(统一 API)

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# IO 密集 → thread
with ThreadPoolExecutor(max_workers=20) as ex:
    results = list(ex.map(fetch_url, urls))

# CPU 密集 → process
with ProcessPoolExecutor(max_workers=8) as ex:
    results = list(ex.map(cpu_heavy, datas))

concurrent.futures 是 thread / process 的统一封装。
简单"batch 处理"场景首选。

详细 5:用 numpy / pandas / pytorch 替代手写 CPU 并发

import numpy as np

# ❌ 手写循环(慢 + GIL)
result = [x * 2 + y for x, y in zip(arr1, arr2)]

# ✅ numpy vectorize
result = arr1 * 2 + arr2     # C 层并行 + SIMD

NumPy / pandas / PyTorch 内部 release GIL 跑 C 代码 + 多核 SIMD。
"用对工具"比"加并发"快得多。

混合:asyncio + thread pool

asyncio 里需要调同步代码(如 pandas / requests):

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_function, arg)

run_in_executor 把同步函数丢线程池跑,async 继续。
小心:仍受 GIL 限制(同步函数还是单核)。CPU 密集时换 ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor()
result = await loop.run_in_executor(pool, cpu_heavy, data)

实战:crawler

需求:抓 10000 个网页 + 解析(IO 密集 + 轻量 CPU)。

import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_and_parse(session, url):
    async with session.get(url) as r:
        html = await r.text()
        # 解析在主线程(轻量)
        soup = BeautifulSoup(html, 'html.parser')
        return soup.title.string if soup.title else ''

async def main(urls):
    semaphore = asyncio.Semaphore(50)   # 限并发
    async with aiohttp.ClientSession() as session:
        async def bounded(url):
            async with semaphore:
                return await fetch_and_parse(session, url)
        results = await asyncio.gather(*[bounded(u) for u in urls])
    return results

asyncio.run(main(urls))

50 并发同时跑 → 10000 URL 几分钟完成。
单 Python 进程,内存几百 MB。

如果解析很重(NLP / image),把解析丢 process pool:

async def fetch_and_dispatch(session, url, pool):
    async with session.get(url) as r:
        html = await r.text()
        loop = asyncio.get_running_loop()
        # 解析跑 process pool(绕过 GIL)
        return await loop.run_in_executor(pool, heavy_parse, html)

实战:图像批量处理

CPU 密集 → multiprocessing:

from multiprocessing import Pool
from PIL import Image

def process(path):
    img = Image.open(path)
    img.thumbnail((800, 800))
    img.save(path.replace('.jpg', '_thumb.jpg'))

if __name__ == '__main__':
    with Pool(processes=8) as pool:
        pool.map(process, image_paths)

if __name__ == '__main__': 必须(multiprocessing fork 模式要)。

8 核机器 ~ 8x 加速。

free-threaded Python (3.13t)

Python 3.13 引入 --disable-gil build:

# 装 free-threaded version
uv python install 3.13t

理论上 threading 在 CPU 密集任务能并行。
但:

  • 实验性 + 慢一些(单线程慢 10-20%)
  • C extension 兼容性问题(很多包还没支持)
  • 几年内还不是默认

生产暂时仍用 GIL build + multiprocessing。

性能对比(10000 URL fetch)

方法 时间 内存
sync requests + for loop 30 min 50 MB
threading + ThreadPool(20) 3 min 200 MB
threading + ThreadPool(200) 1.5 min 800 MB
asyncio + aiohttp + sem(50) 2 min 150 MB
asyncio + aiohttp + sem(200) 45s 250 MB
multiprocessing 没意义(IO 任务) -

IO 任务:asyncio 全胜。

CPU 任务(10000 张图缩略):

方法 时间
sync for loop 50 min
threading 48 min(GIL,几乎没加速)
multiprocessing(8) 8 min
numpy vectorize 改写(若适用) 5 min

选型决策树

你的任务是?
├── IO 密集(网络 / 文件 / DB)
│   ├── 高并发(万级)→ asyncio
│   ├── 已有 sync code 不想改 → threading + ThreadPoolExecutor
│   └── 简单批处理 → concurrent.futures.ThreadPoolExecutor
│
├── CPU 密集
│   ├── 数学 / 矩阵 → numpy / pytorch(向量化)
│   └── 一般计算 → multiprocessing / ProcessPoolExecutor
│
└── 混合
    └── asyncio + run_in_executor(ProcessPoolExecutor)

踩过的坑

  1. threading + requests:默认 requests.Session 不 thread-safe。
    每线程独立 session 或用 aiohttp async。

  2. multiprocessing + fork:Linux fork 复制整个进程 → 大数据 in
    parent 也复制 → 内存爆。改 spawn 或者把 data 写文件 worker 读。

  3. asyncio 里调 time.sleep(5) → 阻塞整个 event loop。
    await asyncio.sleep(5)

  4. mixed event loop:多个 asyncio.run 嵌套 → 报"event loop already
    running"。一般 asyncio.run 只在 main 调一次。

  5. multiprocessing on Windows / Mac:spawn 模式要求 worker 函数
    可 pickle + 子进程重新 import module → 用 if __name__ == '__main__':
    保护。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。