What happens?
The output silently drops ~80% of rows when all of the following hold:
- two db.sql(query).pl(lazy=True) LazyFrames are joined, and the joined LazyFrame is referenced twice in the same plan (a self-rejoin via a derived sub-LazyFrame),
- a window expression (e.g. rolling_sum, rolling_std, or cum_prod with .over("g")) sits downstream, and
- the plan is collected via collect(engine="streaming").

An explicit .sort([partition_key, within_partition_key]) before the .over() (required for any correct window result anyway) makes the row count deterministic across runs, but it remains ~80% short of the reference. The same plan against pl.scan_parquet (no duckdb bridge) returns the full row count.
The minimal repro below produces:
A) pl.scan_parquet, in-memory engine
shape=(50597009, 3) y non-null=45,647,009
B) db.sql(...).pl(lazy=True), streaming engine, sorted plan
shape=(10301109, 3) y non-null=9,287,943 shape_match=False
Environment
- polars 1.40.1
- duckdb 1.5.1 / 1.5.2 / 1.6.0.dev12 (all reproduce, with identical behaviour)
- pyarrow latest
- macOS arm64 (Darwin 25.3.0), Python 3.11
Notes
The bug also manifests in our real ETL pipeline at much larger scale (47k groups, ~313M source rows, 5 source joins), where the symptom was a fraction of expected rows in window columns and an outright zero-row cum_prod column. A duckdb→parquet→pl.scan_parquet boundary at the data-source edge fixes it deterministically.
We tried a workaround that opens a fresh duckdb connection per register_io_source generator invocation, so no relation object is shared across the repeated calls polars makes into the source. That fixes the minimal repro but not the larger production case, suggesting the shared-relation pattern is only one of several failure modes in this bridge under the streaming engine.
To Reproduce
Self-contained script below. Variable-length groups are required to trigger the bug at this scale — with uniform-length groups the same plan returns the correct row count.
uv run --with 'polars==1.40.1' --with numpy --with pyarrow --with duckdb \
python duckdb_polars_streaming_repro.py
"""Minimal reproducer for a silent ~80% row drop in duckdb-python's
polars bridge under collect(engine="streaming").
When two ``db.sql(query).pl(lazy=True)`` LazyFrames are joined, the
joined LazyFrame is referenced twice in the plan (a self-rejoin), a
window expression with ``.over("g")`` sits downstream of an explicit
``.sort(["g","t"])``, and the plan is collected via
``collect(engine="streaming")``, the output drops ~80% of rows compared
to the pure ``pl.scan_parquet`` reference. The sort guarantees a stable
within-group order so the ``rolling_sum`` is deterministic — but the
row count is still ~80% short of the reference.
Example output on the repro below (50,597,009 rows total in reference):
A) pl.scan_parquet, in-memory engine
shape=(50597009, 3) y non-null=45,647,009
B) db.sql(...).pl(lazy=True), streaming engine, .sort(['g','t']) added
shape=(10400465, 3) y non-null=9,377,597 shape_match=False
The same plan against ``pl.scan_parquet`` LazyFrames produces the full
50.6M rows. The duckdb bridge is dropping rows during streaming.
Tested with:
polars 1.40.1
duckdb 1.5.1 / 1.5.2 / 1.6.0.dev12 (all reproduce)
pyarrow latest
Run:
uv run --with 'polars==1.40.1' --with numpy --with pyarrow --with duckdb \\
python duckdb_polars_streaming_repro.py
"""
from __future__ import annotations
import shutil
import tempfile
from pathlib import Path
import duckdb
import numpy as np
import polars as pl
print(f"polars {pl.__version__} duckdb {duckdb.__version__}")
# ---------------------------------------------------------------------------
# 1) Two parquet files with variable-length groups.
# `left`: (g, t, x) — group id, index, payload
# `right`: (g, t) — same key columns, no payload
# Variable per-group length matters: uniform-length groups don't reproduce
# at this scale.
# ---------------------------------------------------------------------------
N_GROUPS = 50_000
rng = np.random.default_rng(42)
group_lens = np.clip(
    rng.lognormal(mean=6.8, sigma=0.5, size=N_GROUPS).astype(int), 30, 2900
)
N = int(group_lens.sum())
print(f"rows: {N:,} groups: {N_GROUPS:,}")
g = np.repeat(np.arange(N_GROUPS, dtype=np.int32), group_lens)
t = np.concatenate([np.arange(n, dtype=np.int32) for n in group_lens])
x = rng.uniform(-1.0, 1.0, N).astype(np.float32)
tmpdir = Path(tempfile.mkdtemp(prefix="repro_"))
left_path = tmpdir / "left.parquet"
right_path = tmpdir / "right.parquet"
pl.DataFrame({"g": g, "t": t, "x": x}).write_parquet(
    left_path, row_group_size=200_000
)
pl.DataFrame({"g": g, "t": t}).write_parquet(
    right_path, row_group_size=200_000
)
del g, t, x
# ---------------------------------------------------------------------------
# 2) The plan:
# - `left` and `right` are joined into `joined`.
# - `joined` is referenced TWICE: once directly, once via a derived
# LazyFrame that selects the group keys uniquely.
# - A rolling-sum window over `g` is applied at the end.
# ---------------------------------------------------------------------------
def build(left_lf: pl.LazyFrame, right_lf: pl.LazyFrame) -> pl.LazyFrame:
    joined = left_lf.join(right_lf, on=["g", "t"], how="inner")
    keys = joined.select("g").unique()  # ← reference #1
    plan = joined.join(keys, on="g")  # ← reference #2
    # Explicit sort by (partition key, within-partition key) so the
    # downstream rolling_sum().over("g") sees within-g rows in t-order.
    # Inner joins do NOT preserve sort order, so this sort is required
    # for the .over result to be deterministic at all.
    return plan.sort(["g", "t"]).select(
        "g",
        "t",
        pl.col("x").rolling_sum(window_size=100).over("g").alias("y"),
    )
# ---------------------------------------------------------------------------
# 3) Reference: pure pl.scan_parquet, in-memory engine.
# ---------------------------------------------------------------------------
print("\nA) pl.scan_parquet, in-memory engine")
ref = build(pl.scan_parquet(left_path), pl.scan_parquet(right_path)).collect()
ref_shape = ref.shape
ref_nn = ref["y"].drop_nulls().drop_nans().len()
print(f" shape={ref_shape} y non-null={ref_nn:,}")
# ---------------------------------------------------------------------------
# 4) Bug path: db.sql(...).pl(lazy=True) + collect(engine="streaming").
# Single run — the sort makes the output deterministic across runs.
# ---------------------------------------------------------------------------
print("\nB) db.sql(...).pl(lazy=True), streaming engine, sorted plan")
db_l = duckdb.connect(":memory:")
db_r = duckdb.connect(":memory:")
try:
    left_lf = db_l.sql(
        f"select * from read_parquet('{left_path}')"
    ).pl(lazy=True)
    right_lf = db_r.sql(
        f"select * from read_parquet('{right_path}')"
    ).pl(lazy=True)
    out = build(left_lf, right_lf).collect(engine="streaming")
    nn = out["y"].drop_nulls().drop_nans().len()
    print(
        f" shape={out.shape} y non-null={nn:,} "
        f"shape_match={out.shape == ref_shape} nn_match={nn == ref_nn}"
    )
finally:
    db_l.close()
    db_r.close()
    shutil.rmtree(tmpdir)
OS:
macOS arm64 (Darwin 25.3.0)
DuckDB Package Version:
1.5.1 / 1.5.2 / 1.6.0.dev12
Python Version:
3.11
Full Name:
Misha van Beek
Affiliation:
Bayesline
What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.
I have tested with a stable release
Did you include all relevant data sets for reproducing the issue?
Yes
Did you include all code required to reproduce the issue?
Did you include all relevant configuration to reproduce the issue?