起因
业务给了一个 50GB 用户行为日志 CSV,让我做个简单的"每个城市 PV /
UV / 转化率"统计。pandas 直接 read_csv 把机器爆了(机器只有 32GB
内存)。改成 chunksize=100000 一块块读、自己累加?写起来又难维护
(要管 partial state)。
polars 的 scan_csv + lazy + streaming 是这种"out-of-core"分析的
标准解法。
解决方案
装
uv add polars
一行做完聚合
import polars as pl
result = (
pl.scan_csv('events_50gb.csv') # 不读,只 plan
.filter(pl.col('event_type') == 'pageview')
.group_by(['city', 'date'])
.agg([
pl.col('user_id').n_unique().alias('uv'),
pl.len().alias('pv'),
])
.sort('pv', descending=True)
.collect(streaming=True) # 流式执行
)
print(result.head(20))
关键点:
scan_csv而不是read_csv:只构建 query plan,不读文件- 链式 lazy 操作:filter → group_by → agg → sort 都在 plan 里
collect(streaming=True):让 polars 流式跑(边读边算,
不把全部数据放内存)
50GB 文件在我 32GB 机器上跑了 8 分钟,内存峰值 ~4 GB。
看 query plan
q = (
pl.scan_csv('events.csv')
.filter(pl.col('event_type') == 'pageview')
.group_by('city')
.agg(pl.col('user_id').n_unique())
)
print(q.explain())
# AGGREGATE
# [col("user_id").n_unique()]
# BY [col("city")]
# FROM
# Csv SCAN events.csv
# PROJECT 3/15 COLUMNS
# SELECTION: [col("event_type") == "pageview"]
polars 自动做:
- column pruning:只读用到的 3 列(不读 15 列里的其它 12 列)
- predicate pushdown:filter 下推到 scan 层(不读不匹配的行)
- operator fusion:能合并的操作合并
这些都是数据库 query optimizer 的标准手法,polars 把它带到 DataFrame
里。
转 Parquet:以后查询快 10x
# 一次性把 CSV 转 Parquet(列存 + zstd 压缩)
pl.scan_csv('events_50gb.csv').sink_parquet('events.parquet',
compression='zstd')
# 50GB CSV → ~12GB Parquet
# 之后所有查询用 parquet
result = pl.scan_parquet('events.parquet').filter(...).group_by(...).agg(...).collect()
# 同样查询 8 分钟 → 30 秒
Parquet 列存让"只读需要的列"在 IO 层就生效,比 CSV 快得多。
按字段分区写入
# 数据按 date 分区写
pl.scan_csv('events.csv').sink_parquet(
'events/',
partition_by=['date'],
)
# 生成:events/date=2024-01-01/data_0.parquet ...
# 之后查询某一天的数据自动 prune 到该分区
pl.scan_parquet('events/').filter(pl.col('date') == '2024-01-01')
巨大效率提升 —— 比扫整个 50GB 高几个数量级。
join 大表
# 50GB 主表 join 100MB 维度表
result = (
pl.scan_parquet('events.parquet')
.join(pl.scan_csv('cities.csv'), on='city_id', how='left')
.group_by('city_name')
.agg(pl.len().alias('events'))
.collect(streaming=True)
)
polars 自动选 hash join(小表 build hash,大表 stream probe),
内存友好。
性能 vs pandas / dask
50GB CSV → count by city:
| 工具 | 时间 | 内存峰值 | 写代码量 |
|---|---|---|---|
| pandas chunked | 25 min | ~6 GB | 30 行(手动累加) |
| dask | 12 min | ~8 GB | 8 行 |
| polars lazy + streaming | 8 min | ~4 GB | 6 行 |
| polars on parquet | 30 s | ~2 GB | 6 行 |
转 Parquet 一次后所有后续查询都飞快。
效果
- 50GB CSV 单机能跑通(之前要上集群)
- 探索性分析迭代速度 25 倍快
- 代码 30 行 → 6 行,可维护性大幅提升
- 后续团队类似分析都按"CSV → Parquet → polars query"模板做
高级 tip
流式写入
巨大计算结果直接落盘,不全部放内存:
(pl.scan_csv('events.csv')
.filter(pl.col('amount') > 100)
.sink_parquet('high_value.parquet'))
sink_* 系列函数都是流式写。
profile 查询
import time
t0 = time.time()
result = q.collect(streaming=True)
print(f'took {time.time()-t0:.1f}s')
# 更细:每个 node 时间
result, profile = q.profile()
print(profile)
字符串列消耗内存大
df = df.with_columns(
pl.col('country').cast(pl.Categorical), # 像 pandas categorical
)
categorical 把字符串映射到 int,对低基数字段省 80% 内存。
踩过的坑
-
streaming=True不是所有操作都支持:极个别 window 操作目前
还回落到 in-memory。运行时如果没真的流式,看 explain 输出会有
"STREAMING" 字样标记。 -
多文件 scan 时 schema 不一致:
scan_csv('logs/*.csv')假设所有
文件 schema 一样。第 N 个文件多一列就出错。infer_schema_length=10000
或显式指定 schema。 -
group_by(['col1', 'col2'])在 streaming 模式不一定流式:
分组键基数极高(如 user_id)时内存爆。改成"先按 user_id hash 分桶
再聚合"的策略。 -
写 Parquet 时 dtype 不对:polars
null列写 Parquet 是 null type,
其它工具读不出来。cast到具体类型再写。 -
collect 后切回 eager 操作:
collect()返回DataFrame,
不再是 lazy。后续操作如果想再 lazy 就.lazy()。
登录后参与评论。