起因
需要分布式跑 Python:
- ML 训练(多 GPU 多机)
- 批 inference(百万张图过 model)
- hyperparameter sweep(试 1000 个组合)
- 数据预处理(pandas 不够大)
老办法:
- spark:scala-friendly,PySpark 写得别扭,启动重
- multiprocessing:单机
- celery:任务队列,不为 compute 设计
Ray(UC Berkeley 出,2018+)让 Python 函数加 @ray.remote 即分布式。
轻量 + Python 原生。
装
pip install ray
本地用
import ray
@ray.remote
def square(x):
return x * x
ray.init()
futures = [square.remote(i) for i in range(100)]
results = ray.get(futures) # 等结果
print(results)
square.remote(i) 异步丢 task,返回 future。
ray.get() 阻塞拿结果。
本地多核就用多核(自动)。
actor (持久 state)
@ray.remote
class Counter:
def __init__(self):
self.count = 0
def inc(self):
self.count += 1
return self.count
c = Counter.remote()
c.inc.remote() # 异步调
c.inc.remote()
print(ray.get(c.inc.remote())) # 3
actor 跑在某 worker 上,state 持久。多 caller 共享。
适合:load 一个大 model 在 actor 里,多个 request 调它(避免重复加载)。
多机集群
启动 head node:
ray start --head --port=6379
worker node 加入:
ray start --address='head_node:6379'
代码不变:
ray.init(address='auto') # 连接 cluster
square.remote 自动 schedule 到任意 worker。
资源 / GPU 请求
@ray.remote(num_cpus=2, num_gpus=1)
def train(data):
# 跑 GPU 训练
...
Ray scheduler 把 task 放到有 GPU + 2 CPU 的 worker 上。
不够资源 → 排队等。
数据:Ray Data
import ray
ds = ray.data.read_parquet('s3://bucket/data/*.parquet')
# 分布式 map
processed = ds.map(lambda r: {**r, 'doubled': r['x'] * 2})
# 分布式 group + agg
result = processed.groupby('country').sum()
result.write_parquet('s3://bucket/out/')
类似 spark RDD / dataframe,但 Python 原生。
ML:Ray Train
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
def train_func(config):
model = ...
for epoch in range(config['epochs']):
...
trainer = TorchTrainer(
train_func,
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
train_loop_config={'lr': 1e-3, 'epochs': 10},
)
result = trainer.fit()
4 GPU 分布式 train,PyTorch DDP 自动配置。
ML:Ray Tune (hyperparameter)
from ray import tune
def trainable(config):
score = train_model(config['lr'], config['dropout'])
tune.report(score=score)
tuner = tune.Tuner(
trainable,
param_space={
'lr': tune.loguniform(1e-5, 1e-2),
'dropout': tune.uniform(0, 0.5),
},
tune_config=tune.TuneConfig(num_samples=100),
)
results = tuner.fit()
跑 100 个 trial 在集群 → 自动 schedule。
内置 HyperBand / Population-Based Training 等算法。
ML:Ray Serve (推理 server)
from ray import serve
@serve.deployment(num_replicas=4, ray_actor_options={'num_gpus': 1})
class Predictor:
def __init__(self):
self.model = load_model()
async def __call__(self, request):
data = await request.json()
return self.model.predict(data)
serve.run(Predictor.bind())
4 replica 在 4 个 GPU,前面 router 自动 LB。
比手写 FastAPI + load model worker 简单。
与 spark 对比
| Ray | Spark | |
|---|---|---|
| 语言 | Python first | Scala / Java first |
| 数据 size | < 10 TB | PB |
| ML 集成 | 强(Ray Train/Tune/Serve) | 中(MLlib) |
| 启动 | < 5s | 30s+ |
| 心智模型 | Python actor / task | RDD / dataframe |
| 生态 | ML / RL / serving | 通用大数据 |
- 数据 < 10 TB + Python 重 → Ray
- 数据 > 10 TB + Java/Scala 团队 → Spark
- ML + 分布式训练 → Ray
- ETL 跑数仓 → Spark / dbt
与 dask 对比
dask 也是 Python 分布式。
| Ray | dask | |
|---|---|---|
| API | actor + task | array / dataframe / bag |
| ML | Ray Train/Tune/Serve | dask-ml(弱) |
| 资源 model | 强(GPU / 自定义) | 中 |
| 通用 compute | 是 | 是 |
dask 更"分布式 pandas/numpy",Ray 更"分布式 Python compute + ML"。
ML 选 Ray;纯数据 ETL 选 dask 或 polars streaming。
一个 case:分布式批 inference
需求:1 亿张图过 vision model。
import ray
import torch
ray.init(address='auto') # cluster 8 GPU node
@ray.remote(num_gpus=1)
class Inferencer:
def __init__(self):
self.model = torch.load('model.pt').cuda().eval()
@torch.no_grad()
def predict(self, batch):
return self.model(batch.cuda()).cpu()
# 4 actor 4 GPU
predictors = [Inferencer.remote() for _ in range(4)]
# stream 数据
ds = ray.data.read_images('s3://bucket/images/').map_batches(preprocess, batch_size=64)
# 分布式调
results = ds.map_batches(
lambda batch: ray.get(predictors[0].predict.remote(batch)), # 简化
compute=ray.data.ActorPoolStrategy(size=4),
)
results.write_parquet('s3://bucket/predictions/')
8 GPU 跑满 → 1 亿张图几小时跑完。
监控
ray dashboard 8265 端口默认开:
- 集群资源使用
- task / actor 状态
- log 聚合
- profiler
调试比 spark UI 友好。
部署
- KubeRay:k8s operator 部署 Ray cluster
- Anyscale:Ray 公司的托管服务
- 手动:ssh 到每 node
ray start
prod 用 KubeRay 多。
踩过的坑
-
object size 大:ray 传 object 用 plasma store,几 GB
object 序列化贵。共享大 dataset 用ray.put(data)一次 + 引用。 -
import 依赖差:worker node 没装跟 head 一样的 Python package
→ import error。runtime_env 指定 deps,或者 docker image 统一。 -
GPU 假资源:
num_gpus=1只是 Ray scheduler 视角,task 内部
仍要cuda()实际用。配错 → 多 task 抢同 GPU。 -
schedule 不平衡:data skew → 某 worker 一直累。
ray.data.repartition()重新均衡。 -
Ray 版本兼容:head + worker Ray 版本必须一致。docker image
pin 版本。
登录后参与评论。