Ray:把 Python 函数变分布式(不学 Spark)

起因

需要分布式跑 Python:

  • ML 训练(多 GPU 多机)
  • 批 inference(百万张图过 model)
  • hyperparameter sweep(试 1000 个组合)
  • 数据预处理(pandas 不够大)

老办法:

  • spark:scala-friendly,PySpark 写得别扭,启动重
  • multiprocessing:单机
  • celery:任务队列,不为 compute 设计

Ray(UC Berkeley 出,2018+)让 Python 函数加 @ray.remote 即分布式。
轻量 + Python 原生。

pip install ray

本地用

import ray

@ray.remote
def square(x):
    return x * x

ray.init()

futures = [square.remote(i) for i in range(100)]
results = ray.get(futures)        # 等结果
print(results)

square.remote(i) 异步丢 task,返回 future。
ray.get() 阻塞拿结果。

本地多核就用多核(自动)。

actor (持久 state)

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0
    def inc(self):
        self.count += 1
        return self.count

c = Counter.remote()
c.inc.remote()           # 异步调
c.inc.remote()
print(ray.get(c.inc.remote()))    # 3

actor 跑在某 worker 上,state 持久。多 caller 共享。

适合:load 一个大 model 在 actor 里,多个 request 调它(避免重复加载)。

多机集群

启动 head node:

ray start --head --port=6379

worker node 加入:

ray start --address='head_node:6379'

代码不变:

ray.init(address='auto')          # 连接 cluster

square.remote 自动 schedule 到任意 worker。

资源 / GPU 请求

@ray.remote(num_cpus=2, num_gpus=1)
def train(data):
    # 跑 GPU 训练
    ...

Ray scheduler 把 task 放到有 GPU + 2 CPU 的 worker 上。
不够资源 → 排队等。

数据:Ray Data

import ray

ds = ray.data.read_parquet('s3://bucket/data/*.parquet')

# 分布式 map
processed = ds.map(lambda r: {**r, 'doubled': r['x'] * 2})

# 分布式 group + agg
result = processed.groupby('country').sum()
result.write_parquet('s3://bucket/out/')

类似 spark RDD / dataframe,但 Python 原生。

ML:Ray Train

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func(config):
    model = ...
    for epoch in range(config['epochs']):
        ...

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    train_loop_config={'lr': 1e-3, 'epochs': 10},
)
result = trainer.fit()

4 GPU 分布式 train,PyTorch DDP 自动配置。

ML:Ray Tune (hyperparameter)

from ray import tune

def trainable(config):
    score = train_model(config['lr'], config['dropout'])
    tune.report(score=score)

tuner = tune.Tuner(
    trainable,
    param_space={
        'lr': tune.loguniform(1e-5, 1e-2),
        'dropout': tune.uniform(0, 0.5),
    },
    tune_config=tune.TuneConfig(num_samples=100),
)
results = tuner.fit()

跑 100 个 trial 在集群 → 自动 schedule。
内置 HyperBand / Population-Based Training 等算法。

ML:Ray Serve (推理 server)

from ray import serve

@serve.deployment(num_replicas=4, ray_actor_options={'num_gpus': 1})
class Predictor:
    def __init__(self):
        self.model = load_model()
    async def __call__(self, request):
        data = await request.json()
        return self.model.predict(data)

serve.run(Predictor.bind())

4 replica 在 4 个 GPU,前面 router 自动 LB。
比手写 FastAPI + load model worker 简单。

与 spark 对比

Ray Spark
语言 Python first Scala / Java first
数据 size < 10 TB PB
ML 集成 强(Ray Train/Tune/Serve) 中(MLlib)
启动 < 5s 30s+
心智模型 Python actor / task RDD / dataframe
生态 ML / RL / serving 通用大数据
  • 数据 < 10 TB + Python 重 → Ray
  • 数据 > 10 TB + Java/Scala 团队 → Spark
  • ML + 分布式训练 → Ray
  • ETL 跑数仓 → Spark / dbt

与 dask 对比

dask 也是 Python 分布式。

Ray dask
API actor + task array / dataframe / bag
ML Ray Train/Tune/Serve dask-ml(弱)
资源 model 强(GPU / 自定义)
通用 compute

dask 更"分布式 pandas/numpy",Ray 更"分布式 Python compute + ML"。
ML 选 Ray;纯数据 ETL 选 dask 或 polars streaming。

一个 case:分布式批 inference

需求:1 亿张图过 vision model。

import ray
import torch

ray.init(address='auto')        # cluster 8 GPU node

@ray.remote(num_gpus=1)
class Inferencer:
    def __init__(self):
        self.model = torch.load('model.pt').cuda().eval()
    @torch.no_grad()
    def predict(self, batch):
        return self.model(batch.cuda()).cpu()

# 4 actor 4 GPU
predictors = [Inferencer.remote() for _ in range(4)]

# stream 数据
ds = ray.data.read_images('s3://bucket/images/').map_batches(preprocess, batch_size=64)

# 分布式调
results = ds.map_batches(
    lambda batch: ray.get(predictors[0].predict.remote(batch)),  # 简化
    compute=ray.data.ActorPoolStrategy(size=4),
)
results.write_parquet('s3://bucket/predictions/')

8 GPU 跑满 → 1 亿张图几小时跑完。

监控

ray dashboard 8265 端口默认开:

  • 集群资源使用
  • task / actor 状态
  • log 聚合
  • profiler

调试比 spark UI 友好。

部署

  • KubeRay:k8s operator 部署 Ray cluster
  • Anyscale:Ray 公司的托管服务
  • 手动:ssh 到每 node ray start

prod 用 KubeRay 多。

踩过的坑

  1. object size 大:ray 传 object 用 plasma store,几 GB
    object 序列化贵。共享大 dataset 用 ray.put(data) 一次 + 引用。

  2. import 依赖差:worker node 没装跟 head 一样的 Python package
    → import error。runtime_env 指定 deps,或者 docker image 统一。

  3. GPU 假资源num_gpus=1 只是 Ray scheduler 视角,task 内部
    仍要 cuda() 实际用。配错 → 多 task 抢同 GPU。

  4. schedule 不平衡:data skew → 某 worker 一直累。
    ray.data.repartition() 重新均衡。

  5. Ray 版本兼容:head + worker Ray 版本必须一致。docker image
    pin 版本。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。