polars lazy 模式处理 50GB CSV:分块、流式、out-of-core

起因

业务给了一个 50GB 用户行为日志 CSV,让我做个简单的"每个城市 PV /
UV / 转化率"统计。pandas 直接 read_csv 把机器爆了(机器只有 32GB
内存)。改成 chunksize=100000 一块块读、自己累加?写起来又难维护
(要管 partial state)。

polars 的 scan_csv + lazy + streaming 是这种"out-of-core"分析的
标准解法。

解决方案

uv add polars

一行做完聚合

import polars as pl

result = (
    pl.scan_csv('events_50gb.csv')           # 不读,只 plan
      .filter(pl.col('event_type') == 'pageview')
      .group_by(['city', 'date'])
      .agg([
          pl.col('user_id').n_unique().alias('uv'),
          pl.len().alias('pv'),
      ])
      .sort('pv', descending=True)
      .collect(streaming=True)               # 流式执行
)

print(result.head(20))

关键点:

  1. scan_csv 而不是 read_csv:只构建 query plan,不读文件
  2. 链式 lazy 操作:filter → group_by → agg → sort 都在 plan 里
  3. collect(streaming=True):让 polars 流式跑(边读边算,
    不把全部数据放内存)

50GB 文件在我 32GB 机器上跑了 8 分钟,内存峰值 ~4 GB。

看 query plan

q = (
    pl.scan_csv('events.csv')
      .filter(pl.col('event_type') == 'pageview')
      .group_by('city')
      .agg(pl.col('user_id').n_unique())
)
print(q.explain())
# AGGREGATE
#   [col("user_id").n_unique()]
#   BY [col("city")]
#   FROM
#     Csv SCAN events.csv
#       PROJECT 3/15 COLUMNS
#       SELECTION: [col("event_type") == "pageview"]

polars 自动做:

  • column pruning:只读用到的 3 列(不读 15 列里的其它 12 列)
  • predicate pushdown:filter 下推到 scan 层(不读不匹配的行)
  • operator fusion:能合并的操作合并

这些都是数据库 query optimizer 的标准手法,polars 把它带到 DataFrame
里。

转 Parquet:以后查询快 10x

# 一次性把 CSV 转 Parquet(列存 + zstd 压缩)
pl.scan_csv('events_50gb.csv').sink_parquet('events.parquet',
                                              compression='zstd')
# 50GB CSV → ~12GB Parquet

# 之后所有查询用 parquet
result = pl.scan_parquet('events.parquet').filter(...).group_by(...).agg(...).collect()
# 同样查询 8 分钟 → 30 秒

Parquet 列存让"只读需要的列"在 IO 层就生效,比 CSV 快得多。

按字段分区写入

# 数据按 date 分区写
pl.scan_csv('events.csv').sink_parquet(
    'events/',
    partition_by=['date'],
)
# 生成:events/date=2024-01-01/data_0.parquet ...
# 之后查询某一天的数据自动 prune 到该分区
pl.scan_parquet('events/').filter(pl.col('date') == '2024-01-01')

巨大效率提升 —— 比扫整个 50GB 高几个数量级。

join 大表

# 50GB 主表 join 100MB 维度表
result = (
    pl.scan_parquet('events.parquet')
      .join(pl.scan_csv('cities.csv'), on='city_id', how='left')
      .group_by('city_name')
      .agg(pl.len().alias('events'))
      .collect(streaming=True)
)

polars 自动选 hash join(小表 build hash,大表 stream probe),
内存友好。

性能 vs pandas / dask

50GB CSV → count by city

工具 时间 内存峰值 写代码量
pandas chunked 25 min ~6 GB 30 行(手动累加)
dask 12 min ~8 GB 8 行
polars lazy + streaming 8 min ~4 GB 6 行
polars on parquet 30 s ~2 GB 6 行

转 Parquet 一次后所有后续查询都飞快。

效果

  • 50GB CSV 单机能跑通(之前要上集群)
  • 探索性分析迭代速度 25 倍快
  • 代码 30 行 → 6 行,可维护性大幅提升
  • 后续团队类似分析都按"CSV → Parquet → polars query"模板做

高级 tip

流式写入

巨大计算结果直接落盘,不全部放内存:

(pl.scan_csv('events.csv')
   .filter(pl.col('amount') > 100)
   .sink_parquet('high_value.parquet'))

sink_* 系列函数都是流式写。

profile 查询

import time
t0 = time.time()
result = q.collect(streaming=True)
print(f'took {time.time()-t0:.1f}s')

# 更细:每个 node 时间
result, profile = q.profile()
print(profile)

字符串列消耗内存大

df = df.with_columns(
    pl.col('country').cast(pl.Categorical),   # 像 pandas categorical
)

categorical 把字符串映射到 int,对低基数字段省 80% 内存。

踩过的坑

  1. streaming=True 不是所有操作都支持:极个别 window 操作目前
    还回落到 in-memory。运行时如果没真的流式,看 explain 输出会有
    "STREAMING" 字样标记。

  2. 多文件 scan 时 schema 不一致scan_csv('logs/*.csv') 假设所有
    文件 schema 一样。第 N 个文件多一列就出错。infer_schema_length=10000
    或显式指定 schema。

  3. group_by(['col1', 'col2']) 在 streaming 模式不一定流式
    分组键基数极高(如 user_id)时内存爆。改成"先按 user_id hash 分桶
    再聚合"的策略。

  4. 写 Parquet 时 dtype 不对:polars null 列写 Parquet 是 null type,
    其它工具读不出来。cast 到具体类型再写。

  5. collect 后切回 eager 操作collect() 返回 DataFrame
    不再是 lazy。后续操作如果想再 lazy 就 .lazy()

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。