知识广场
按学科筛选:计算机科学 / 分布式与云计算 / Python
«计算机科学 / 分布式与云计算 / Python» 分类下共 1 篇帖子
## 起因 需要分布式跑 Python: - ML 训练(多 GPU 多机) - 批 inference(百万张图过 model) - hyperparameter sweep(试 1000 个组合) - 数据预处理(pandas 不够大) 老办法: - spark:scala-friendly,PySpark 写得别扭,启动重 - multiprocessing:单机 - celery:任务队列,不为 compute 设计 `Ray`(UC Berkeley 出,2018+)让 Python 函数加 `@ray.remote` 即分布式。 轻量 + Python 原生。 ## 装 ```bash pip install ray ``` ## 本地用 ```python import ray @ray.remote def square(x): return x * x ray.init() futures = [square.remote(i) for i in range(100)] results = ray.get(futures) # 等结果 print(results) ``` `square.remote(i)` 异步丢 task,返回 future。 `ray.get()` 阻塞拿结果。 本地多核就用多核(自动)。 ## actor (持久 state) ```python @ray.remote class Counter: def __init__(self): self.count = 0 def inc(self): self.count += 1 return self.count c = Counter.remote() c.inc.remote() # 异步调 c.inc.remote() print(ray.get(c.inc.remote())) # 3 ``` actor 跑在某 worker 上,state 持久。多 caller 共享。 适合:load 一个大 model 在 actor 里,多个 request 调它(避免重复加载)。 ## 多机集群 启动 head node: ```bash ray start --head --port=6379 ``` worker node 加入: ```bash ray start --address='head_node:6379' ``` 代码不变: ```python ray.init(address='auto') # 连接 cluster ``` `square.remote` 自动 schedule 到任意 worker。 ## 资源 / GPU 请求 ```python @ray.remote(num_cpus=2, num_gpus=1) def train(data): # 跑 GPU 训练 ... ``` Ray scheduler 把 task 放到有 GPU + 2 CPU 的 worker 上。 不够资源 → 排队等。 ## 数据:Ray Data ```python import ray ds = ray.data.read_parquet('s3://bucket/data/*.parquet') # 分布式 map processed = ds.map(lambda r: {**r, 'doubled': r['x'] * 2}) # 分布式 group + agg result = processed.groupby('country').sum() result.write_parquet('s3://bucket/out/') ``` 类似 spark RDD / dataframe,但 Python 原生。 ## ML:Ray Train ```python from ray.train.torch import TorchTrainer from ray.train import ScalingConfig def train_func(config): model = ... for epoch in range(config['epochs']): ... trainer = TorchTrainer( train_func, scaling_config=ScalingConfig(num_workers=4, use_gpu=True), train_loop_config={'lr': 1e-3, 'epochs': 10}, ) result = trainer.fit() ``` 4 GPU 分布式 train,PyTorch DDP 自动配置。 ## ML:Ray Tune (hyperparameter) ```python from ray import tune def trainable(config): score = train_model(config['lr'], config['dropout']) tune.report(score=score) tuner = tune.Tuner( trainable, param_space={ 'lr': tune.loguniform(1e-5, 1e-2), 'dropout': tune.uniform(0, 0.5), }, tune_config=tune.TuneConfig(num_samples=100), ) results = tuner.fit() ``` 跑 100 个 trial 在集群 → 自动 schedule。 内置 HyperBand / Population-Based Training 等算法。 ## ML:Ray Serve (推理 server) ```python from ray import serve @serve.deployment(num_replicas=4, ray_actor_options={'num_gpus': 1}) class Predictor: def __init__(self): self.model = load_model() async def __call__(self, request): data = await request.json() return self.model.predict(data) serve.run(Predictor.bind()) ``` 4 replica 在 4 个 GPU,前面 router 自动 LB。 比手写 FastAPI + load model worker 简单。 ## 与 spark 对比 | | Ray | Spark | |---|---|---| | 语言 | Python first | Scala / Java first | | 数据 size | < 10 TB | PB | | ML 集成 | 强(Ray Train/Tune/Serve) | 中(MLlib) | | 启动 | < 5s | 30s+ | | 心智模型 | Python actor / task | RDD / dataframe | | 生态 | ML / RL / serving | 通用大数据 | - 数据 < 10 TB + Python 重 → Ray - 数据 > 10 TB + Java/Scala 团队 → Spark - ML + 分布式训练 → Ray - ETL 跑数仓 → Spark / dbt ## 与 dask 对比 dask 也是 Python 分布式。 | | Ray | dask | |---|---|---| | API | actor + task | array / dataframe / bag | | ML | Ray Train/Tune/Serve | dask-ml(弱) | | 资源 model | 强(GPU / 自定义) | 中 | | 通用 compute | 是 | 是 | dask 更"分布式 pandas/numpy",Ray 更"分布式 Python compute + ML"。 ML 选 Ray;纯数据 ETL 选 dask 或 polars streaming。 ## 一个 case:分布式批 inference 需求:1 亿张图过 vision model。 ```python import ray import torch ray.init(address='auto') # cluster 8 GPU node @ray.remote(num_gpus=1) class Inferencer: def __init__(self): self.model = torch.load('model.pt').cuda().eval() @torch.no_grad() def predict(self, batch): return self.model(batch.cuda()).cpu() # 4 actor 4 GPU predictors = [Inferencer.remote() for _ in range(4)] # stream 数据 ds = ray.data.read_images('s3://bucket/images/').map_batches(preprocess, batch_size=64) # 分布式调 results = ds.map_batches( lambda batch: ray.get(predictors[0].predict.remote(batch)), # 简化 compute=ray.data.ActorPoolStrategy(size=4), ) results.write_parquet('s3://bucket/predictions/') ``` 8 GPU 跑满 → 1 亿张图几小时跑完。 ## 监控 ray dashboard 8265 端口默认开: - 集群资源使用 - task / actor 状态 - log 聚合 - profiler 调试比 spark UI 友好。 ## 部署 - **KubeRay**:k8s operator 部署 Ray cluster - **Anyscale**:Ray 公司的托管服务 - **手动**:ssh 到每 node `ray start` prod 用 KubeRay 多。 ## 踩过的坑 1. **object size 大**:ray 传 object 用 plasma store,几 GB object 序列化贵。共享大 dataset 用 `ray.put(data)` 一次 + 引用。 2. **import 依赖差**:worker node 没装跟 head 一样的 Python package → import error。runtime_env 指定 deps,或者 docker image 统一。 3. **GPU 假资源**:`num_gpus=1` 只是 Ray scheduler 视角,task 内部 仍要 `cuda()` 实际用。配错 → 多 task 抢同 GPU。 4. **schedule 不平衡**:data skew → 某 worker 一直累。 `ray.data.repartition()` 重新均衡。 5. **Ray 版本兼容**:head + worker Ray 版本必须一致。docker image pin 版本。