We need Polars .over(...) window expressions (GroupedRollingWindow) to be supported for streaming execution with the rapidsmpf runtime. Today those expressions either fall back to single-partition execution (where guarded), are not supported by GPUEngine translation at all, or are treated as if they could run partition-locally without a proper group shuffle. This limits scalability and, in the partition-local case, can produce incorrect results for multi-partition queries.
Scope
Implement .over() for rapidsmpf by ensuring rows are partitioned by the window keys (and ordered within each partition) according to Polars semantics, then evaluating the windows locally on each stream partition. In other words: a shuffle / sort-by-keys subgraph followed by local GroupedRollingWindow evaluation. Wire this through Select lowering and expression decomposition so that window subexpressions compile to the appropriate collectives and streaming actors, rather than assuming embarrassingly parallel evaluation across arbitrary input partitions.
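As a rough illustration of why the shuffle is required, here is a pure-Python toy model. It is not rapidsmpf or cudf-polars code, and every name in it (shuffle_by_key, local_over, and so on) is hypothetical. Each row carries a global row index, a key, and a value, and local_over emulates pl.col("v").sum().over("k") together with pl.col("v").cum_sum().over("k"). The shuffle hash-partitions rows by key (zlib.crc32 is an arbitrary stand-in for whatever hash the real shuffle uses) and then restores the original row order within each output partition, so order-dependent windows such as cumulative sums still match single-partition results.

```python
import zlib
from collections import defaultdict

# Toy model (hypothetical): a partition is a list of (row_id, key, value) tuples,
# where row_id is the row's position in the original, unpartitioned frame.


def shuffle_by_key(partitions, n_out):
    """Hash-partition rows so every row with a given key lands in one output partition."""
    out = [[] for _ in range(n_out)]
    for part in partitions:
        for row in part:
            out[zlib.crc32(row[1].encode()) % n_out].append(row)
    # Restore the original row order within each partition; cumulative windows depend on it.
    for part in out:
        part.sort(key=lambda r: r[0])
    return out


def local_over(partition):
    """Evaluate sum().over(key) and cum_sum().over(key) using only rows in this partition."""
    totals = defaultdict(int)
    for _, key, value in partition:
        totals[key] += value
    running = defaultdict(int)
    result = {}
    for row_id, key, value in partition:
        running[key] += value
        result[row_id] = (totals[key], running[key])
    return result


def over_with_shuffle(partitions, n_out):
    """Shuffle by key, then evaluate the window locally on each output partition."""
    merged = {}
    for part in shuffle_by_key(partitions, n_out):
        merged.update(local_over(part))
    return merged


def over_without_shuffle(partitions):
    """Naive partition-local evaluation on the input partitions (incorrect in general)."""
    merged = {}
    for part in partitions:
        merged.update(local_over(part))
    return merged


if __name__ == "__main__":
    partitions = [
        [(0, "a", 1), (1, "b", 2), (2, "a", 3)],
        [(3, "b", 4), (4, "a", 5)],
    ]
    # Reference: evaluate over the whole frame as a single partition.
    reference = local_over([row for part in partitions for row in part])
    assert over_with_shuffle(partitions, n_out=3) == reference
    assert over_without_shuffle(partitions) != reference
    print(reference)  # {0: (9, 1), 1: (6, 2), 2: (9, 4), 3: (6, 6), 4: (9, 9)}
```

Evaluating each input partition locally without the shuffle (over_without_shuffle) yields wrong group totals and running sums whenever a key spans more than one input partition, which is the correctness gap described above.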
Future Work
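The proposed implementation will be insufficient for skewed data distributions with oversized groups, since shuffling by keys places every row of a group on a single partition. We may need to perform a global sort in those cases (which will likely also require halo exchange between neighboring partitions).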