Silent row drop in db.sql(query).pl(lazy=True) when the LazyFrame is self-rejoined and collected via engine="streaming" with a window expression #452

@mishavanbeek

Description

What happens?

When two db.sql(query).pl(lazy=True) LazyFrames are joined, the joined LazyFrame is referenced twice in the same plan (a self-rejoin via a derived sub-LazyFrame), a window expression (e.g. rolling_sum, rolling_std, or cum_prod with .over("g")) sits downstream, and the plan is collected via collect(engine="streaming"), the output silently drops ~80% of rows. Adding an explicit .sort([partition_key, within_partition_key]) before the .over(), which is required for a correct window result anyway, makes the row count deterministic across runs, but the count remains ~80% short of the reference. The same plan against pl.scan_parquet LazyFrames (no duckdb bridge) returns the full row count.

The minimal repro below produces:

A) pl.scan_parquet, in-memory engine
   shape=(50597009, 3)  y non-null=45,647,009

B) db.sql(...).pl(lazy=True), streaming engine, sorted plan
   shape=(10301109, 3)  y non-null=9,287,943   shape_match=False

Environment

  • polars 1.40.1
  • duckdb 1.5.1 / 1.5.2 / 1.6.0.dev12 (all reproduce, with identical behaviour)
  • pyarrow latest
  • macOS arm64 (Darwin 25.3.0), Python 3.11

Notes

The bug also manifests in our real ETL pipeline at much larger scale (47k groups, ~313M source rows, 5 source joins), where the symptom was a fraction of expected rows in window columns and an outright zero-row cum_prod column. A duckdb→parquet→pl.scan_parquet boundary at the data-source edge fixes it deterministically.

We tried a workaround that opens a fresh duckdb connection per register_io_source generator invocation (so no relation object is shared across polars's repeated calls into the source). That fixes the minimal repro but not the larger production case, suggesting the shared-relation pattern is only one of multiple failure modes in this bridge under the streaming engine.

To Reproduce

Self-contained script below. Variable-length groups are required to trigger the bug at this scale — with uniform-length groups the same plan returns the correct row count.

uv run --with 'polars==1.40.1' --with numpy --with pyarrow --with duckdb \
    python duckdb_polars_streaming_repro.py
"""Minimal reproducer for a silent ~80% row drop in duckdb-python's
polars bridge under collect(engine="streaming").

When two ``db.sql(query).pl(lazy=True)`` LazyFrames are joined, the
joined LazyFrame is referenced twice in the plan (a self-rejoin), a
window expression with ``.over("g")`` sits downstream of an explicit
``.sort(["g","t"])``, and the plan is collected via
``collect(engine="streaming")``, the output drops ~80% of rows compared
to the pure ``pl.scan_parquet`` reference. The sort guarantees a stable
within-group order so the ``rolling_sum`` is deterministic — but the
row count is still ~80% short of the reference.

Example output on the repro below (50,597,009 rows total in reference):

    A) pl.scan_parquet, in-memory engine
       shape=(50597009, 3)  y non-null=45,647,009

    B) db.sql(...).pl(lazy=True), streaming engine, .sort(['g','t']) added
       shape=(10400465, 3)  y non-null=9,377,597   shape_match=False

The same plan against ``pl.scan_parquet`` LazyFrames produces the full
50.6M rows. The duckdb bridge is dropping rows during streaming.

Tested with:
    polars  1.40.1
    duckdb  1.5.1 / 1.5.2 / 1.6.0.dev12 (all reproduce)
    pyarrow latest

Run:
    uv run --with 'polars==1.40.1' --with numpy --with pyarrow --with duckdb \\
        python duckdb_polars_streaming_repro.py
"""
from __future__ import annotations

import shutil
import tempfile
from pathlib import Path

import duckdb
import numpy as np
import polars as pl

print(f"polars {pl.__version__}  duckdb {duckdb.__version__}")

# ---------------------------------------------------------------------------
# 1) Two parquet files with variable-length groups.
#    `left`:  (g, t, x)  — group id, index, payload
#    `right`: (g, t)     — same key columns, no payload
# Variable per-group length matters: uniform-length groups don't reproduce
# at this scale.
# ---------------------------------------------------------------------------
N_GROUPS = 50_000
rng = np.random.default_rng(42)
group_lens = np.clip(
    rng.lognormal(mean=6.8, sigma=0.5, size=N_GROUPS).astype(int), 30, 2900
)
N = int(group_lens.sum())
print(f"rows: {N:,}  groups: {N_GROUPS:,}")

g = np.repeat(np.arange(N_GROUPS, dtype=np.int32), group_lens)
t = np.concatenate([np.arange(n, dtype=np.int32) for n in group_lens])
x = rng.uniform(-1.0, 1.0, N).astype(np.float32)

tmpdir = Path(tempfile.mkdtemp(prefix="repro_"))
left_path = tmpdir / "left.parquet"
right_path = tmpdir / "right.parquet"
pl.DataFrame({"g": g, "t": t, "x": x}).write_parquet(
    left_path, row_group_size=200_000
)
pl.DataFrame({"g": g, "t": t}).write_parquet(
    right_path, row_group_size=200_000
)
del g, t, x


# ---------------------------------------------------------------------------
# 2) The plan:
#    - `left` and `right` are joined into `joined`.
#    - `joined` is referenced TWICE: once directly, once via a derived
#      LazyFrame that selects the group keys uniquely.
#    - A rolling-sum window over `g` is applied at the end.
# ---------------------------------------------------------------------------
def build(left_lf: pl.LazyFrame, right_lf: pl.LazyFrame) -> pl.LazyFrame:
    joined = left_lf.join(right_lf, on=["g", "t"], how="inner")
    keys = joined.select("g").unique()                # ← reference #1
    plan = joined.join(keys, on="g")                  # ← reference #2
    # Explicit sort by (partition key, within-partition key) so the
    # downstream rolling_sum().over("g") sees within-g rows in t-order.
    # Inner joins do NOT preserve sort order, so this sort is required
    # for the .over result to be deterministic at all.
    return plan.sort(["g", "t"]).select(
        "g",
        "t",
        pl.col("x").rolling_sum(window_size=100).over("g").alias("y"),
    )


# ---------------------------------------------------------------------------
# 3) Reference: pure pl.scan_parquet, in-memory engine.
# ---------------------------------------------------------------------------
print("\nA) pl.scan_parquet, in-memory engine")
ref = build(pl.scan_parquet(left_path), pl.scan_parquet(right_path)).collect()
ref_shape = ref.shape
ref_nn = ref["y"].drop_nulls().drop_nans().len()
print(f"   shape={ref_shape}  y non-null={ref_nn:,}")

# ---------------------------------------------------------------------------
# 4) Bug path: db.sql(...).pl(lazy=True) + collect(engine="streaming").
# Single run — the sort makes the output deterministic across runs.
# ---------------------------------------------------------------------------
print("\nB) db.sql(...).pl(lazy=True), streaming engine, sorted plan")
db_l = duckdb.connect(":memory:")
db_r = duckdb.connect(":memory:")
try:
    left_lf = db_l.sql(
        f"select * from read_parquet('{left_path}')"
    ).pl(lazy=True)
    right_lf = db_r.sql(
        f"select * from read_parquet('{right_path}')"
    ).pl(lazy=True)
    out = build(left_lf, right_lf).collect(engine="streaming")
    nn = out["y"].drop_nulls().drop_nans().len()
    print(
        f"   shape={out.shape}  y non-null={nn:,}  "
        f"shape_match={out.shape == ref_shape}  nn_match={nn == ref_nn}"
    )
finally:
    db_l.close()
    db_r.close()

shutil.rmtree(tmpdir)

OS:

macOS arm64 (Darwin 25.3.0)

DuckDB Package Version:

1.5.1 / 1.5.2 / 1.6.0.dev12

Python Version:

3.11

Full Name:

Misha van Beek

Affiliation:

Bayesline

What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.

I have tested with a stable release

Did you include all relevant data sets for reproducing the issue?

Yes

Did you include all code required to reproduce the issue?

  • Yes, I have

Did you include all relevant configuration to reproduce the issue?

  • Yes, I have
