feat(streaming-merge): per-region engine + multi-output sorted_series splitting#6424
Conversation
61163cd to
eac4f9c
Compare
0c3ae7c to
bc10992
Compare
…ocstrings
Addresses adamtobey's review on PR-6409.
- Rename `advance_decoder_to_row` → `fill_page_cache_to_row`. The
function's effect on the world is "add pages to the per-input cache"
— it never advances a cursor or skips data. The old name primed
reviewers to ask "are we skipping rows?" (which is exactly what
Adam asked).
- Use a `rows_for_current_output` register inside
`compute_input_row_destinations` and write to
`rows_per_output[out_idx]` once after the inner loop; saves the
per-row indexed store.
- Expand `body_col_page_cache` docstring with the horizontal-vs-vertical
memory bound argument and a pointer to the MS-7 invariant test
(`test_ms7_body_col_page_cache_bounded_regardless_of_input_size`).
- Add context comments at the cross-file invariant sites Adam flagged:
- Sort-cols-first storage-ordering contract on the sort-col drain.
- Single-RG-input restriction with forward pointer to PR-6c.2
(#6424) which relaxes it.
- `rg_partition_prefix_len` defaulting to 0 (with reference to the
legacy-promotion `mixed_prefix_ok` escape in PR-6423).
No behaviour change. 461 lib tests pass; workspace clippy + nightly
fmt + rustdoc clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2085c8d to
ceed395
Compare
4d246e9 to
d5118cf
Compare
…ocstrings
Addresses adamtobey's review on PR-6409.
- Rename `advance_decoder_to_row` → `fill_page_cache_to_row`. The
function's effect on the world is "add pages to the per-input cache"
— it never advances a cursor or skips data. The old name primed
reviewers to ask "are we skipping rows?" (which is exactly what
Adam asked).
- Use a `rows_for_current_output` register inside
`compute_input_row_destinations` and write to
`rows_per_output[out_idx]` once after the inner loop; saves the
per-row indexed store.
- Expand `body_col_page_cache` docstring with the horizontal-vs-vertical
memory bound argument and a pointer to the MS-7 invariant test
(`test_ms7_body_col_page_cache_bounded_regardless_of_input_size`).
- Add context comments at the cross-file invariant sites Adam flagged:
- Sort-cols-first storage-ordering contract on the sort-col drain.
- Single-RG-input restriction with forward pointer to PR-6c.2
(#6424) which relaxes it.
- `rg_partition_prefix_len` defaulting to 0 (with reference to the
legacy-promotion `mixed_prefix_ok` escape in PR-6423).
No behaviour change. 461 lib tests pass; workspace clippy + nightly
fmt + rustdoc clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ceed395 to
7b8317b
Compare
… (PR-6b.2) (#6409) * feat: streaming column-major merge engine with page-bounded body cols (PR-6b.2) Rebuilds PR-6b on top of PR-6a.2's per-page Arrow decoder. The streaming merge engine now keeps body-col memory bounded by output page size (not column-chunk size) while preserving caller-specified M:N output splitting at sorted_series boundaries. Architecture (Husky multi-input → multi-output sorted merge): Phase 0 (async) — drain sort cols from each input. With Husky column ordering, sort cols + sorted_series are the prefix of each row group's body bytes, so the decoder can stop after they are fully decoded; the remaining body col pages stay un-read in the input stream, ready for phase 3. Phase 1 — compute_merge_order over the per-input sort-col RecordBatches using the existing k-way (sorted_series, timestamp_secs) heap. Phase 2 — compute_output_boundaries with the caller's num_outputs, splitting at sorted_series transitions. Phase 3 (blocking + block_on bridges) — streaming write. All M output writers are alive for the duration. For each column in Husky order, every output's col K is written in turn: - Sort col / sorted_series: applied via arrow::interleave from the already-buffered phase-0 data. - Body col: each output page is assembled via arrow::interleave from input page slices, with decoders advanced page-by-page via handle.block_on from inside the sync iterator passed to write_next_column_arrays. Pages flush to the writer's sink as SerializedColumnWriter's page-size threshold trips — memory stays bounded by the in-flight output page plus a small number of in-flight input pages. After all M outputs' col K is done, every input decoder is at the start of col K+1 in its single row group. Move to col K+1. PR-6b.2 only handles single-row-group inputs (real or PR-5- adapter-presented). Multi-RG metric-aligned inputs are rejected with a clear error message; supporting them requires either consuming + discarding body cols of RG[i-1] from the stream to reach RG[i]'s sort cols, or a second body GET — both are larger scope changes that land in a follow-up. Page-bounded contract verified by test_body_col_streams_many_pages_per_column_chunk: with data_page_row_count_limit=1000 on an 8000-row merge, the output value column spans ≥ 2 pages, demonstrating that body col writes respect data_page_size and do not materialise whole column chunks. Tests (9, all passing): two-input merge, single-RG output for single-metric_name input, total-rows-preserved across M:N, sort-schema mismatch rejection, KV metadata propagation, all-empty-inputs no-output, output drainable by StreamDecoder, multi-RG input rejection, page-bounded body col streaming. Also exposes existing helpers in merge/writer.rs as pub(super) (apply_merge_permutation, build_merge_kv_metadata, build_sorting_columns, resolve_sort_field_names, verify_sort_order) so streaming.rs can reuse the same MC-3 / KV / sorting-columns construction the non-streaming engine uses. PR-7 will fold the non-streaming engine away. PR-6c.2 will add file-size monitoring on top: close the current output at the next sorted_series transition when an in-progress file approaches the size cap, producing additional splits beyond the caller's N. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix: persist decoder + page cache across body-col passes Address two Codex review findings on PR-6b.2 (#6409): * P1 — Preserve decoder/page cache across output chunks. The merge engine was constructing a fresh `StreamDecoder` for every `advance_decoder_to_row` call, which reset the per-column `rows_decoded` counter so the second decoded page reported `row_start = 0` after the stream had already advanced. The page cache also lived on the per-output assembler, so pages whose row range straddled two outputs were dropped when the first output finished even though the stream couldn't be rewound. Both scenarios produced silently wrong rows or out-of-bounds panics on any input large enough to require multi-page advances per output or multi-output coverage of a single page. The decoder now lives on `InputDecoderState` (owned via the new `StreamDecoder::from_owned` constructor), and the per-input body- col page cache + cursor are reset only at the start of each body column. * P2 — Stream body pages instead of collecting `Vec<ArrayRef>`. The per-output body-col write now feeds `write_next_column_arrays` one page at a time via `StreamingBodyColIter`, which captures assembly errors in a side cell so memory stays bounded by output- page size rather than column-chunk size. Two regression tests cover the bug shapes — multi-page body col within one output (2500 rows × 50-row pages) and multi-output input where pages span output boundaries. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix: guard body-col path against zero-row-group inputs Address Codex P1 (third comment) on PR-6b.2 (#6409): phase 0 explicitly accepts inputs with `num_row_groups() == 0` (returning a zero-row sort batch), but `write_body_col_for_all_outputs` unconditionally called `state.metadata.row_group(0)` for every input, panicking with "index out of bounds" before the first body column was written. Treat zero-RG inputs the same as inputs whose schema lacks the current column: push `None` into `input_col_indices` and skip them for this body col. Also drop the unused `input_target_rows` vec that was being built only for its row-group lookup side effect. Regression test `test_zero_row_input_mixed_with_non_empty` builds a 0-row + 50-row pair and merges them through the streaming engine; without this fix the merge blocking task panics inside parquet-rs's `row_group()` indexing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix: drop all-null sort fields from per-output streaming schema Address Codex P2 (fourth comment) on PR-6b.2 (#6409): the schema derivation condition `sort_optimised.has(name) || full_union.has(name)` was tautologically true for every iterated field — every `field` came from `full_union_schema`, so the second disjunct was always satisfied and the intended "drop all-null sort fields" branch never fired. Pass the sort union schema in explicitly so we can tell sort fields apart from body fields. Sort field present in `sort_union_schema` → keep only if `optimize_output_batch` kept it (not all-null for this output's rows). Body field → keep unconditionally; tracking per-output body-col presence would require pre-reading every body column for every output, which is the column-chunk-bounded buffering the streaming path exists to avoid. Regression test `test_derive_output_schema_drops_all_null_sort_field` calls the helper directly with a synthetic union + sort-optimised pair and asserts an all-null sort field is dropped while a body field with the same union-schema position is preserved. Verified the test fails against the pre-fix logic with the expected `['metric_name', 'env', 'timestamp_secs', 'value']` vs `['metric_name', 'timestamp_secs', 'value']` mismatch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * chore: code-quality fixes + MC-2 type round-trip test on streaming merge Bundle three pieces: - **Husky → neutral phrasing.** Replaced the seven "Husky" mentions in the streaming engine's doc-comments and error messages with neutral "sort-cols-first storage ordering" / "column ordering" phrasing. Project is Quickwit; the internal column-ordering scheme didn't need a separate brand in user-visible error strings. - **One `.unwrap()` → `.expect()` in lib code.** The hashmap lookup in `drain_sort_cols_one_input` is guarded by a `contains_key` check; promote the implicit invariant to a documented panic message per CODE_STYLE.md. - **`align_inputs_to_union_schema` nullability fix.** The first-sight branch unconditionally marked new fields nullable; the existing comment claims "columns that don't appear in every input must be nullable" but the code applied that rule to every field. Replaced with a two-pass scheme: track `any_nullable` and `appears_in` per field across all inputs, then mark nullable iff some input had it nullable OR the field is missing from some input. This unblocks `List<Float64>` columns end-to-end (the writer rejects nullable List; the previous behaviour forced every list column nullable on first sight even when every input declared it non-null). - **MC-2 round-trip integration test.** New `test_mc2_all_types_round_trip_through_streaming_merge` builds two inputs covering every parquet physical type the decoder accepts — Int8/16/32/64, UInt8/16/32/64, Float32/64, Bool, Utf8, Dictionary<Int32, Utf8>, LargeBinary, and non-nullable `List<Float64>` — merges them through the streaming engine, and asserts every `(sorted_series_key → body-col tuple)` pair survives byte-equal. Storage-encoding transitions (Dict→Utf8, LargeBinary→ Binary) are normalised in the render helper because MC-2 promises value preservation, not internal representation preservation. This test caught two real bugs while being written: 1. Body cols must be declared in lexicographic order — the streaming engine assumes the storage convention and crashes mid-merge if they aren't. Fixture re-ordered accordingly. (Worth adding upfront validation in a follow-up; not in scope here.) 2. The schema-union nullability bug above. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(MS-7): page-cache bounded-memory contract is observable + asserted Add a `#[cfg(test)] static AtomicUsize` PEAK_BODY_COL_PAGE_CACHE_LEN that records the maximum length any input's `body_col_page_cache` ever reached during the current merge, bumped on every page push in `advance_decoder_to_row`. Zero production overhead — the `record_*` helper compiles to a no-op outside test builds. New test `test_ms7_body_col_page_cache_bounded_regardless_of_input_size` runs the streaming merge over three input sizes (300 / 3 000 / 30 000 rows at 50-row pages) and asserts: 1. Peak resident pages stays below a fixed ceiling (24, for the ratio of OUTPUT_PAGE_ROWS=1024 to input page_rows=50, plus a few-page slack for decoder lookahead + transients). 2. Growth from 3 000 to 30 000 rows (10× more input pages) yields at most a 2-page increase in peak. The whole MS-7 claim is that peak does not scale with input size. Verified the test catches a deliberate regression: removing the per- output-page eviction loop in `assemble_one_output_page` pushed the 3 000-row peak to 60 (60 > 24) and the test failed with the expected message. Fixture support: `write_input_parquet_with_small_pages` now also sets `write_batch_size` and `data_page_size` proportional to the requested page row count. Without those, the arrow writer's defaults (64 KiB / 1 MiB) caused `data_page_row_count_limit` to be silently ignored and produced one giant page per column. Probed the output via `get_column_page_reader` — 30 000 rows now produces 600 pages per col as expected. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix: drive col loop from full union schema + collect service tags from sort col Address two new Codex P2 findings on PR-6b.2 (#6409): - **Use the full union schema when driving column writes.** The old `build_parent_union_schema` picked one per-output schema by field count and used it as the column-iteration driver. If two outputs drop *different* all-null sort fields and end up with the same field count, the first wins — and any column it dropped is never iterated, leaving the other output's writer missing a column or writing subsequent columns into the wrong slot. The doc comment already said "process the FULL union schema's cols in order"; the implementation diverged. Drive `write_all_columns` from `full_union_schema` directly and delete the broken heuristic. - **Collect service names from the sort-col path too.** If the sort schema places `service` in the sort key (`metric_name|service|...`), the streaming engine writes it via the sort-col path and the body-col `track_service` branch never runs. `MergeOutputFile.low_cardinality_tags[TAG_SERVICE]` came back empty even though every row had a service value. Extract service names from `static_meta.sort_optimised` at `finalize_output_writer` time so the TAG_SERVICE metadata is accurate regardless of which write path the column took. Two regression tests: - `test_heterogeneous_dropped_fields_drive_from_full_union_schema` builds two inputs whose per-output schemas drop different all-null sort fields with the same field count. Each kept tag must survive to its output. Verified the test fails (panic on missing column) against the pre-fix logic. - `test_service_as_sort_column_still_populates_low_cardinality_tags` uses a sort schema `metric_name|service|-timestamp_secs/V2` and asserts the output's `low_cardinality_tags[TAG_SERVICE]` covers every distinct service value. Verified the test fails against pre-fix `finalize_output_writer` with the expected "must contain TAG_SERVICE" message. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(streaming): rename to fill_page_cache_to_row + cross-input docstrings Addresses adamtobey's review on PR-6409. - Rename `advance_decoder_to_row` → `fill_page_cache_to_row`. The function's effect on the world is "add pages to the per-input cache" — it never advances a cursor or skips data. The old name primed reviewers to ask "are we skipping rows?" (which is exactly what Adam asked). - Use a `rows_for_current_output` register inside `compute_input_row_destinations` and write to `rows_per_output[out_idx]` once after the inner loop; saves the per-row indexed store. - Expand `body_col_page_cache` docstring with the horizontal-vs-vertical memory bound argument and a pointer to the MS-7 invariant test (`test_ms7_body_col_page_cache_bounded_regardless_of_input_size`). - Add context comments at the cross-file invariant sites Adam flagged: - Sort-cols-first storage-ordering contract on the sort-col drain. - Single-RG-input restriction with forward pointer to PR-6c.2 (#6424) which relaxes it. - `rg_partition_prefix_len` defaulting to 0 (with reference to the legacy-promotion `mixed_prefix_ok` escape in PR-6423). No behaviour change. 461 lib tests pass; workspace clippy + nightly fmt + rustdoc clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(streaming): assert per-input body cols are in Husky order Adam's question on L194 asked whether body-col ordering was a hard cross-file requirement. My first answer said "no" — true for which array we read (we look up by name), but wrong for the body-col memory bound: Phase 3 iterates the union schema's body cols alphabetically and asks each input's decoder to advance to that col. Parquet emits column chunks in declared schema order, so the decoder reads pages in that input's storage order. If an input's body cols aren't in the same alphabetical-after-sort-cols order ("Husky order"), fill_page_cache_to_row has to drain every body col preceding the requested one on the wire — those pages land in body_col_page_caches[col_idx] until that col's turn in the union iteration. The cache grows to a full column-chunk's worth per misaligned col. Vertical, not horizontal. Defeats streaming. Catch this at merge entry instead of silently degrading to vertical caching: - `assert_inputs_in_husky_body_col_order` runs after `build_input_decoders_state` and before phase 0. Bails with a concrete error message naming the offending pair of column names. - New regression test `test_assert_inputs_in_husky_body_col_order_rejects_misaligned_input` builds an input with body cols `[value, metric_type]` (alphabetical would be `[metric_type, value]`) and asserts the merge errors out before phase 3. No production producer violates this today (streaming writer and legacy Husky writer both emit lexicographic body cols), so the assertion catches future producer drift, not current traffic. 462 lib tests pass (461 prior + 1 new); workspace clippy + nightly fmt + rustdoc clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…PR-6c.2) Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region loop. Unlocks multi-RG metric-aligned input support and produces multi-RG output naturally — one output row group per merge region (typically one per metric_name when `rg_partition_prefix_len == 1`). Sort-prefix alignment (`prefix_len >= 1`) guarantees that any merge region has AT MOST one row group per input. That single invariant unlocks the restructure: 1. Pre-compute regions from RG metadata. For `prefix_len >= 1`, read each RG's metric_name min stat (must equal max — verifies metric-alignment). Group RGs across inputs by prefix_key. Sort regions by prefix_key. For `prefix_len == 0` (single-RG inputs only, validated earlier), one region covers everything. 2. Assign regions to output files by cumulative row count. Caller's `num_outputs` preserved as the upper bound. Each output file gets a contiguous slice of the region list, so output files have non-overlapping key ranges. 3. Per-region processing: for each region, advance contributing inputs' decoders through their RGs (drain sort cols of that RG, then stream body cols via the existing page-bounded BodyColOutputPageAssembler). Each region becomes one output RG in the current writer; when the assignment moves to a new output file, close the previous writer and open a new one. The streaming body-col mechanism from PR-6b.2 (arrow::compute:: interleave + handle.block_on driven decoder) is unchanged; it just runs over smaller row ranges (one region instead of one whole output). PR-6b.2's check that rejected any multi-RG input is replaced with: reject only `prefix_len == 0` AND multi-RG (those still need PR-5's LegacyMultiRGAdapter). Multi-RG metric-aligned inputs are now accepted natively. PR-6b.2 optimised the per-output schema based on per-output sort col data (drop all-null cols, re-dict-encode low-cardinality strings). With per-region streaming we don't know each region's content until we drain it, so PR-6c.2 declares the writer's schema as the full union schema and leaves output strings as Utf8. Per-output dict re-encoding can be reintroduced later by tracking cardinality during the streaming pass. - All 9 PR-6b.2 tests still pass (single-RG input regression — behaviour preserved). - New test_multi_rg_metric_aligned_input_produces_multi_rg_output: feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 = cpu.usage, RG 1 = memory.used); the streaming engine accepts it and produces a 2-RG output (one RG per metric_name region). - Renamed test_multi_rg_input_rejected → test_legacy_multi_rg_input_rejected to reflect the new rejection scope (only prefix_len == 0 multi-RG is rejected; metric-aligned is accepted). 10/10 streaming tests pass. Clippy, doc, machete, fmt all clean. 1. File-size cap with sort-key-boundary splits. 2. Per-output schema optimisation (track region body-col cardinality during the streaming pass). 3. Mid-region splits at sorted_series transitions for finer-grained M:N control when callers want more outputs than regions. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two P1 bugs flagged by Codex on PR-6c.2 (#6410): 1. **Duplicate input row groups silently dropped.** When one input contained two RGs with the same composite prefix key, `process_region` overwrote `sort_col_batches[input_idx]` while `Region::total_rows` still counted both — losing rows and misaligning the body-col / sort-col mapping. Now enforce at-most-one-RG-per-input-per-prefix as a strong invariant at three sites: the merge read path (`extract_regions_from_metadata`), the streaming merge output finalize, and the indexing writer (`ParquetWriter::write_to_bytes` / `write_to_file_with_metadata`). The new `assert_unique_rg_prefix_keys` helper is shared. 2. **Byte-array prefix encoding broke lex order across lengths.** The 4-byte length prefix made `"b"` sort before `"aa"`, violating the declared ASC order. Switched to byte-stuffed escape encoding (`0x00` → `0x00 0x01`, terminator `0x00 0x00`), which preserves single-column lex order AND retains unambiguous concatenation for composite keys (the terminator is the smallest 2-byte sequence under escaping, so shorter values still sort before longer ones with the same prefix). Tests: - `test_byte_array_prefix_preserves_lex_order_across_lengths` — `"aa" < "b"`, empty < non-empty, shared-prefix shorter < longer, null-byte escaping preserves order. - `test_streaming_merge_rejects_duplicate_prefix_rgs_in_one_input` — end-to-end bail with clear error. - `test_write_to_bytes_rejects_duplicate_rg_prefix_when_claimed_aligned` + the `write_to_file` and single-RG positive counterparts. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…i-output When inputs declare rg_partition_prefix_len = 0 (legacy single-RG) and the caller asks for num_outputs > 1, the engine subdivides the single region at sorted_series transitions in the merge order so it can honor the output count. A single sorted_series run is never broken; if one run exceeds the remaining budget the whole run lands in one output anyway. The output inherits the input's rg_partition_prefix_len (=0) — the engine does not synthesize a prefix it can't unconditionally guarantee. Also handles the giant-single-metric case (prefix_len=0, one metric_name, num_outputs > 1): sorted_series transitions still split the merge order even though there are no metric_name transitions to drive a prefix synthesis. Implementation: - New `split_region_at_sorted_series` in region_grouping: walks the merge order and splits at sorted_series transitions when accumulated rows reach the target budget. - Main engine loop: when num_outputs > current_output_idx + 1 AND region's rows exceed the remaining budget, drain sort cols for the region, compute merge order, call split_region_at_sorted_series, process sub-regions. - Per-col page cache + cursor keyed by col_idx so the body-col path can read pages once and re-use them across sub-regions within the same top-level region. Resets between top-level regions (different RGs). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
7b8317b to
a5dfd72
Compare
The MS-2 validation path returns `Err` via `bail!()` (anyhow), not a panic / abort. Five doc-comment / inline-comment sites described the failure as "the engine would crash mid-merge" — overstated. Callers get a `Result::Err` propagated up the spawn_blocking task and the `streaming_merge_sorted_parquet_files` return. Sites updated: - `region_grouping.rs` module doc. - `validate_region_order_matches_physical_rg_order` doc. - streaming.rs MS-2 validation call-site comment. - Test docstrings for `test_streaming_merge_with_desc_prefix_col` and `test_ms2_region_order_disagrees_with_physical_rg_order_rejected`. No behaviour change. 477 lib tests pass; clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two sites referenced a non-existent `LegacyMultiRGAdapter` — the actual type, introduced in PR-5 (#6408), is `LegacyInputAdapter` in `storage::legacy_adapter`. Fixed both references. Also expanded the rejection-block comment to make the *intent* of the guard explicit: it catches caller bugs (wiring a raw legacy multi-RG `StreamingParquetReader` straight into the streaming merge), not a degraded-input fallback. Production code routes legacy splits through `merge::execute_merge_operation` which wraps them in `LegacyInputAdapter` first. No behaviour change. Targeted test passes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
@codex review |
adamtobey nit on PR #6424: `rows_emitted >= expected_rows` accepts `emitted > expected` as a normal termination condition, which would actually be a real accounting bug. The math rules `>` out by construction — `page_size = remaining.min(OUTPUT_PAGE_ROWS)` where `remaining = expected_rows - rows_emitted`, so each `rows_emitted += page_size` keeps `rows_emitted ≤ expected_rows`. Two changes: - Termination becomes `rows_emitted == expected_rows` so we don't silently accept an overshoot. - `debug_assert!(rows_emitted <= expected_rows, …)` at the top of `next()` documents the invariant and surfaces a regression loudly (panic in debug + tests) instead of silently terminating one iteration too late. No behaviour change in the happy path; bugs that would have produced `>` now fail tests instead of producing wrong output. 477 lib tests pass; clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d8881b927c
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
…ndary Codex P1 finding on PR #6424: when a top-level region exactly fills the current output (so `remaining_in_current == 0`) and the next prefix-aligned region needs splitting, the split's first-sub-region budget was the stale zero remainder of the about-to-be-finalized output. `split_region_at_sorted_series` therefore cut after the first sorted_series run, producing a tiny leftover plus a large continuation that both inherited the parent region's prefix key. The sub-region loop then rolled over to a fresh output and wrote both pieces there, tripping the PA-3 duplicate-prefix-RG check in `finalize_output`. Fix: detect the rollover at decision time and compute `effective_first_target` / `effective_outputs_remaining` against the *next* output's empty budget. With the fix, the example above just chooses `needs_split = false` (region fits the fresh output's full target), processes the region whole, and rolls over cleanly. Regression test `test_region_exactly_fills_output_does_not_split_next_aligned_region` exercises the exact scenario Codex described: three 50-row RGs with distinct (metric, service) prefixes, `num_outputs = 3`, target = 50. Pre-fix, the merge bailed with PA-3 on output 1; post-fix, three clean outputs each with one unique prefix key. Verified by reading each output's parquet metadata back through `assert_unique_rg_prefix_keys`. 478 lib tests pass (477 prior + 1 new); clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 56e773f9fe
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Codex P1 on PR #6424: `extract_aligned_prefix_value` decided prefix alignment purely from `min` / `max` statistics. Parquet records those over non-null cells only, with `null_count` reported separately, so two real failure modes slipped through: 1. **Mixed null + non-null.** A row group with `N` nulls plus a single non-null cell `"x"` reports `min == max == "x"` and the `min == max` check silently accepted it — but two distinct prefix keys (null and `"x"`) lived in that RG, breaking the at-most-one-prefix-value-per-RG invariant (PA-1). 2. **All-null RG.** Parquet records no `min` / `max` for an all- null chunk, so the legacy check bailed with the misleading "no min in stats" error. Logically the RG carries one prefix value (null) and is aligned — but supporting it cleanly requires a null marker in the composite-key encoding that agrees with SS-2's "nulls last" rule. `encode_byte_array_prefix(&[])` puts nulls *first*; coordinating that with SS-2 is a follow-up. Fix: read `null_count_opt()` from stats and `num_values()` from the column-chunk metadata. Bail explicitly in both cases — mixed with a PA-1 message naming the (nulls, non-null) split, all-null with a clear "not yet supported" pointer. Two regression tests in `streaming.rs::tests`: - `test_mixed_null_and_value_prefix_rg_rejected`: 1 RG, 3 cells `"cpu.usage"` + 1 null. Asserts PA-1 bail. - `test_all_null_prefix_rg_rejected`: 1 RG, 3 nulls. Asserts the "all-null … not yet supported" bail. 480 lib tests pass (+2 new); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The per-RG composite prefix key now uses the same storekey-based encoding as `sorted_series` — same `(ordinal, value)` layout, same direction-inversion, same null-skip pattern — so a per-RG prefix key is a literal byte prefix of every `sorted_series` value emitted by rows in that RG. Why: the prior byte-stuffed escape encoding had no in-line way to represent an all-null prefix RG (an empty marker would lex-sort before any present-value key, conflicting with SS-2 nulls-last). With the shared encoding, an all-null column is skipped entirely and the next column's higher ordinal byte appears in its place, giving nulls-last ordering for free — the same trick already proven in `sorted_series::encode_row_key`. Per-column logic now goes through one helper: `crate::sorted_series::append_prefix_col_to_key(buf, ord, val, desc)` shared between `sorted_series` (per-row keys) and `merge::streaming::region_grouping` (per-RG keys). It writes `storekey(ord) || storekey(val)` and inverts only the value bytes for DESC columns. `sorted_series::encode_row_key` was refactored to call the helper; the open-coded inline encoding is gone. Trailing **prefix-length sentinel**: each per-RG key ends with a `u8(prefix_len)` ordinal byte. This handles the prefix_len=1 edge case where an all-null RG's empty body would otherwise lex-sort *before* any non-null RG — with the sentinel, the all-null key becomes `[prefix_len]` and non-null keys still start with `ord(0)` (< prefix_len), so non-null sorts first. The sentinel is also what `sorted_series` writes immediately after the prefix cols, so the literal-prefix property is preserved. Null handling in `extract_rg_composite_prefix_key`: - **All-null RG**: column skipped, RG groups into its own region (after non-null regions). - **Mixed null + non-null**: rejected as a PA-1 violation (rows in the same RG would encode to two distinct prefix keys; producer is supposed to start a new RG at the null/non-null transition). - **Otherwise**: standard `min == max` check, then the type-dispatched storekey encoding via the helper. Removed: - `extract_aligned_prefix_value` (replaced by `encode_prefix_col_value` which calls the helper). - `encode_byte_array_prefix` (byte-stuffed escape, no longer used). - `invert_for_descending` (the helper handles inversion per-column). - `test_invert_for_descending_reverses_lex_order` and `test_byte_array_prefix_preserves_lex_order_across_lengths` (byte-level tests of the removed encoding; semantic properties remain enforced by `storekey`'s own tests plus the higher-level prefix tests). Replaced `test_all_null_prefix_rg_rejected` with `test_all_null_prefix_rg_groups_into_separate_region_sorted_last`: builds two inputs (one with `metric_name = "cpu.usage"`, one with `metric_name = NULL`) and verifies the merged output has two RGs with the all-null region in RG 1 (sorted after the non-null region) — pinning the nulls-last ordering that the sentinel encoding produces. Updated `test_extract_rg_composite_prefix_key_two_byte_array_cols` for the new byte layout (`storekey(ord) || storekey(val)` per col plus the trailing sentinel byte). `PrefixColumn` gains an `ordinal: u8` field, populated from each column's position in `qh.sort_fields` so it matches the ordinal `sorted_series` would assign. 478 lib tests pass; workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fb85855ee6
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
CI's nightly rustfmt (1.9.0-nightly 2026-05-17) wrapped a handful of comment / bail!-message / where-clause / vec! literal lines slightly differently than my local nightly at commit time (1.9.0-nightly 2026-05-11). Re-formatting all three affected files catches the drift in this commit so CI Lints stops complaining; local nightly is now updated to match CI. No behaviour change. 478 lib tests still pass on the slice. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…KV stamp The streaming merge engine produces sort-prefix-aligned multi-RG output and stamps `qh.rg_partition_prefix_len = input_meta.rg_partition_prefix_len` in the file's KV (verified by `assert_unique_rg_prefix_keys` before close). `merge_parquet_split_metadata` then ran after and unconditionally demoted to 0 whenever `output.num_row_groups > 1` — breaking CS-1 (metastore must mirror on-disk KV) for every multi-RG streaming-engine output. Aligned splits got tagged 0 in the metastore on every merge and leaked out of the prefix-aligned compaction bucket on the next pass. Carry the value the writer actually stamped via a new `MergeOutputFile.output_rg_partition_prefix_len` field, then propagate it as-is in metadata aggregation. Both engines populate the field: - Legacy `merge/writer.rs` reports its demoted value (row-count-driven RG boundaries can't honor prefix alignment, so it stamps 0 on multi-RG). - Streaming `merge/streaming/output.rs` reports the inputs' prefix unchanged (it splits at prefix transitions and the writer verifies). CS-1 holds by construction — same source of truth, no re-derivation. Tests: - `test_output_prefix_len_demoted_when_multi_rg` → renamed to `test_output_prefix_len_carries_writers_value_when_demoted`; now asserts that the metastore mirrors the writer's reported value. - New `test_output_prefix_len_preserved_on_multi_rg_streaming_engine` asserts that a multi-RG streaming output (writer reports prefix_len=2) keeps the prefix in the metastore — the regression case for F1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
@codex review |
|
Codex Review: Didn't find any major issues. Already looking forward to the next diff. ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
Same nightly-rustfmt drift as the storekey commit on #6424 (local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok` binding and the `merge_parquet_split_metadata` call now fit single-line under the newer width heuristics. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Same nightly-rustfmt drift as the storekey commit on #6424 (local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok` binding and the `merge_parquet_split_metadata` call now fit single-line under the newer width heuristics. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…_prefix_len (#6425) * feat(legacy-adapter): synthesize prefix-aligned row groups The legacy adapter previously consolidated multi-RG legacy inputs into a single oversized row group and left `rg_partition_prefix_len` at the original's (typically `0`). The streaming merge engine then sent these single-RG/prefix=0 inputs through the new sub-region splitting path — correct, but it forfeits the prefix-aware fast path for outputs derived from legacy inputs and gives up the row-group pruning that prefix alignment enables. After consolidating, the adapter now slices the resulting record batch at first-sort-col transitions (typically `metric_name`) and emits one parquet row group per slice, stamping the re-encoded file with `qh.rg_partition_prefix_len = 1`. The merge engine then reads it through the prefix-aware fast path: one region per metric_name, the existing duplicate-prefix invariant on read validates uniqueness. Fallback: if the original file has no `qh.sort_fields` KV, the sort-fields string fails to parse, the first column can't be resolved in the arrow schema, or the consolidated batch is empty, the adapter reverts to a single-RG re-encode without claiming any prefix alignment. That input still works — the engine's prefix_len=0 sub-region splitting path picks it up. This keeps the adapter robust for files written by very early versions of the indexer that may pre-date the standard KV layout. Implementation: `reencode_prefix_aligned` replaces `reencode_as_single_row_group` and either dispatches to the new multi-RG writer or to the legacy single-RG writer based on whether the first sort col is resolvable. `RowConverter` handles the prefix-value equality check uniformly across dictionary, utf8, and primitive types. The KV injection helper replaces (rather than appends) any existing `qh.rg_partition_prefix_len` so re-runs and files mistakenly carrying a stale value still land at the freshly synthesized prefix. Tests: - `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg` — 3 metrics × 40 rows, multi-RG input → 3 prefix-aligned output RGs and `qh.rg_partition_prefix_len = 1` KV. - `test_legacy_input_single_metric_yields_one_rg_with_prefix_kv` — one metric → one RG, prefix KV still stamped (vacuously aligned). - `test_legacy_input_without_sort_fields_falls_back_to_single_rg` — fallback path preserved when sort-fields KV is missing. - All existing tests pass unchanged (they use empty KVs or unparseable sort-fields strings, both of which exercise the fallback path). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(legacy-adapter): parameterize on target_prefix_len with composite-prefix support `LegacyInputAdapter::try_open` now takes `target_prefix_len: u32` chosen by the caller, matching the merge plan's consensus prefix length. The adapter slices the consolidated batch at every transition of the first N sort columns (composite key, via `RowConverter` over all N fields) and emits one output row group per slice, stamping the output with `qh.rg_partition_prefix_len = target_prefix_len`. With `target_prefix_len = 0` the adapter takes the original single-RG passthrough path with no prefix-alignment claim. A sort column that is named in `qh.sort_fields` but missing from the file's arrow schema is treated as implicitly null at every row per SS-3. A constantly-null column trivially satisfies alignment on that column (null == null) and contributes no transitions, so the split boundaries are driven by the columns that are present. This matches the merge engine's compaction-time treatment of missing columns and keeps a legacy file with an evolved schema usable as a prefix-aligned input. `PrefixUnresolvable` now fires only on cases where the file doesn't advertise enough sort *names* to honor the request: - `qh.sort_fields` absent or unparseable - `qh.sort_fields` declares fewer sort columns than `target_prefix_len` A column missing from the arrow schema no longer counts as unresolvable; the adapter materialises a `NullArray` of the batch's length in that slot and proceeds. Tests: - `test_target_prefix_len_zero_passes_through_as_single_rg` — explicit N=0 fallback, no prefix KV stamped. - `test_target_prefix_len_two_splits_by_metric_and_service` — composite prefix (`metric_name`, `service`) → 4 RGs, KV declares prefix_len=2. - `test_target_prefix_len_one_without_sort_fields_returns_unresolvable` — no `qh.sort_fields` KV → `PrefixUnresolvable`. - `test_target_prefix_len_exceeds_declared_sort_cols_returns_unresolvable` — sort schema declares 2 cols, caller asks 3 → `PrefixUnresolvable`. - `test_missing_prefix_col_treated_as_null_satisfies_alignment` — sort schema declares `metric_name|env|-timestamp_secs` but `env` is absent from the arrow schema → no error, only metric_name transitions split RGs, KV still stamps prefix_len=2. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(legacy_adapter): note where reader-side SS-3 handling lands Codex P2 on PR #6425: the adapter records `None` for missing prefix columns and stamps `rg_partition_prefix_len = target_prefix_len` anyway. In isolation that produces a file with an advertised prefix the current reader (`find_prefix_parquet_col_indices` on the #6425 state) bails on. The reader-side fix — returning `Vec<Option<PrefixColumn>>` and synthesizing a constant `[0x00, 0x00]` byte for `None` slots — lands in PR #6426 (the hardening slice, F12 from the adversarial review). The only caller of this adapter is `execute_merge_operation`, introduced in PR #6423 which sits above #6426 in the stack, so no production caller can produce a missing- column prefix until the reader fix is in place. Adding the in-code pointer so a future reader bisecting the stack doesn't have to trace the relationship from scratch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(merge): consumer honors SS-3 (move F12 forward from #6426 to #6425) Previously the F12 fix — "consumer side honors SS-3 missing prefix columns" — lived in the hardening PR (#6426). At the #6425 isolation level, the legacy adapter records `None` for a prefix column absent from the parquet schema and stamps `rg_partition_prefix_len = target_prefix_len` on the output, but the reader's `find_prefix_parquet_col_indices` bails on any missing column. So #6425 + #6424 alone would produce a legacy-adapter file that the streaming-merge reader rejects mid-merge — i.e. a known- incoherent intermediate stack state. Move F12 into this PR so the adapter and reader agree at the same slice: - `find_prefix_parquet_col_indices` now returns `Result<Vec<Option<PrefixColumn>>>`. `Some(_)` when the column is present in the parquet schema; `None` per SS-3 when the column is named in `qh.sort_fields` but absent from the schema. - `extract_rg_composite_prefix_key` skips `None` slots entirely (no ordinal byte, no value bytes for that column). The trailing `u8(prefix_len)` sentinel introduced in the storekey refactor keeps the resulting key well-formed across present/absent columns. - Callers that index into `prefix_cols` updated to use `.as_ref().expect(…)` where they assume presence. Existing SS-3 test `test_missing_prefix_col_treated_as_null_satisfies_alignment` in `legacy_adapter.rs` gets an `assert_unique_rg_prefix_keys` call verifying the adapter's output is consumable by the reader — pins the "stack-coherent at #6425" property the F12 hop establishes. Also incidental nightly-fmt cleanups in `sorted_series::append_prefix_col_to_key` and the two-input fixture in `test_all_null_prefix_rg_groups_into_separate_region_sorted_last`. The hardening PR (#6426) will be re-cascaded to drop the now- duplicated F12 hunks (keeping its F8 adapter-rejects-unsorted + F2 verifier-strength changes intact). 485 lib tests pass on this slice; workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(legacy-adapter): strip stale rg_partition_prefix_len when target=0 Codex P2 on PR #6425: when the legacy adapter is called with `target_prefix_len == 0` it consolidates the input into a single RG, but the previous version preserved the input's footer KVs unchanged. If the input itself already carried a stale nonzero `qh.rg_partition_prefix_len` claim (e.g., a prefix-aware split being re-encoded through the legacy fallback path), the single-RG output would still advertise that claim. Downstream metadata extraction would take the prefix-aware path against an RG carrying multiple first-prefix values — failing the PA-1 min/max alignment check on read despite the caller explicitly asking for the legacy path. Strip `PARQUET_META_RG_PARTITION_PREFIX_LEN` from `original_kv` in the `target_prefix_len == 0` branch. Absence of the KV is the legacy convention for "no alignment claim", matching the existing `test_target_prefix_len_zero_passes_through_as_single_rg` test's `prefix_kv.is_none()` assertion. New regression test `test_target_prefix_len_zero_strips_stale_prefix_kv_from_input`: inputs a 2-RG file with `qh.rg_partition_prefix_len = "1"` AND opens through adapter with `target_prefix_len = 0`; asserts the re-encoded output has no prefix KV. Pre-fix this test caught the leak; post-fix the stale value is dropped. 487 lib tests pass on the slice; clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Same nightly-rustfmt drift as the storekey commit on #6424 (local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok` binding and the `merge_parquet_split_metadata` call now fit single-line under the newer width heuristics. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…plit Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The streaming Parquet merge stack landing in #6424–#6428 ships the full legacy-promotion *mechanism* (engine + adapter + executor wiring) but not the planner-level *trigger*. In production today, `MergePolicyState::record_split` buckets by `CompactionScope::from_split` which includes `rg_partition_prefix_len`, so legacy (prefix=0) and aligned (prefix>0) splits are separated before `ParquetMergePolicy::operations` runs. The policy only emits `ParquetMergeOperation::new`; a repo-wide search finds `promote_legacy` only in tests. Legacy splits therefore never migrate without an explicit trigger. Tracking this as GAP-011 so we pick it up at the right time. The gap doc walks three resolution options (merge buckets in the scope key, dedicated promotion pass, or hybrid prefer-multi-input-promotion) and the cost trade-offs between them, so the eventual implementation PR has a starting point. Raised by Codex review comment id 4311184497 on PR #6423. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…plit Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract: **F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge — but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk. **F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere) so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end. Known limitation: cross-file SS-3 — where some inputs have a sort column and others don't — uses [0x00, 0x00] for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today. **F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests: - streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col` - legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12). Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files — they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(merge): legacy-prefix promotion path + schema-evolution body cols Two adversarial-review follow-ups grouped because they share the streaming engine's input-routing and union-schema seams. ## (b) Legacy-prefix promotion A new operation type pairs a prefix_len=0 split with prefix_len>0 peers in one merge, so legacy splits can be folded into prefix- aligned buckets instead of aging out via retention. Adds: - `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality unchanged. - `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion target; `None` is the default regular-merge form. - `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality check in promotion mode. The output prefix_len still comes from the writer's KV stamp via `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1). - `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam PR-7 will wire from above. Tests: - `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`, `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`. - `test_mixed_prefix_ok_skips_input_equality_check`. - `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1. - `test_executor_mismatched_sources_count_bails`. ## F6 + F13: Schema evolution for body columns The merger now supports MC-4 across heterogeneous body-col schemas: - F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array flavour merge cleanly; before this they hit a "type conflict" at alignment time. - F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the union to be nullable when a List col is present in only some inputs; before this the writer rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 = element present) for nullable outer; non-nullable path unchanged. Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B with the same sort schema but different body cols (Utf8 vs Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32 A-only, Int64 B-only, common Float64). The merge produces the union schema; per-row rendering via `render_cell` matches across flavour boundaries; List cells from B render as nulls. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt Same nightly-rustfmt drift as the storekey commit on #6424 (local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok` binding and the `merge_parquet_split_metadata` call now fit single-line under the newer width heuristics. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(merge-executor): route promotion merges through execute_merge_operation Codex P1 on PR #6423: the executor unconditionally called the in-memory `merge_sorted_parquet_files` path, which routes through `extract_and_validate_input_metadata` and bails on mixed `qh.rg_partition_prefix_len` before any output is produced. So a real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with `target_prefix_len_override = Some(1)` — failed before reaching the downstream `mixed_prefix_ok` plumbing in `merge_parquet_split_metadata`. The escape hatch existed but was unreachable for actual promotion inputs. Fix: branch in the executor's handle on `target_prefix_len_override.is_some()`. Promotion merges go through the engine's streaming entry point `quickwit_parquet_engine::merge::execute_merge_operation`, which opens each below-target input via `LegacyInputAdapter` and each at-target input directly. The streaming merge then sees a homogeneous stream advertising `prefix_len = target` on every input. Regular (non-promotion) merges keep the in-memory path. `execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>` parallel to `op.splits` — the engine deliberately doesn't depend on `quickwit-storage` (would invert layering and pull cloud SDKs into a pure parquet library). So this commit adds `LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by `tokio::fs::File`, one instance per downloaded split, each bound to its scratch-directory path. The `path: &Path` argument on the trait surface is ignored — the downloader has already resolved each split to a concrete local file before the executor runs. Coverage: - Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end` already exercises `execute_merge_operation` with a `prefix_len = 0` + `prefix_len = 1` pair, verifying the output advertises `prefix_len = 1` and passes PA-1 + PA-3 on the composite key. That's now the same code path the in-tree executor takes. - Module doc on the executor rewritten to spell out which path runs when. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track legacy promotion planner gap as GAP-011 The streaming Parquet merge stack landing in #6424–#6428 ships the full legacy-promotion *mechanism* (engine + adapter + executor wiring) but not the planner-level *trigger*. In production today, `MergePolicyState::record_split` buckets by `CompactionScope::from_split` which includes `rg_partition_prefix_len`, so legacy (prefix=0) and aligned (prefix>0) splits are separated before `ParquetMergePolicy::operations` runs. The policy only emits `ParquetMergeOperation::new`; a repo-wide search finds `promote_legacy` only in tests. Legacy splits therefore never migrate without an explicit trigger. Tracking this as GAP-011 so we pick it up at the right time. The gap doc walks three resolution options (merge buckets in the scope key, dedicated promotion pass, or hybrid prefer-multi-input-promotion) and the cost trade-offs between them, so the eventual implementation PR has a starting point. Raised by Codex review comment id 4311184497 on PR #6423. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): track download-vs-streaming merge executor gap as GAP-012 The Parquet streaming merge engine is built around `RemoteByteSource` and was designed to pull pages directly from object storage — two GETs per input, overlap fetch with merge, no scratch disk. The production actor pipeline doesn't take that path: a downloader actor materializes every input on local disk first, and the executor wraps the local files in a `LocalFileByteSource` to feed `execute_merge_operation` (or just calls the in-memory `merge_sorted_parquet_files` path). The streaming engine's central design benefit is unused. This isn't a correctness bug — both paths give the same result. It's a perf/architecture gap: every merge pays 2× I/O per input (network → scratch + scratch → merger), serializes phases (`max(input download time)` first-byte latency), and consumes scratch disk that scales with concurrent merges. Tracking as GAP-012 so we pick it up at the right time. The gap doc walks four options (stream-directly with download fallback, stream- by-default with circuit breaker, eliminate in-memory path only, stream-directly for promotion merges only) and the trade-offs between them — including the mid-merge retry surface, which is the main reason download-first is the current default. Surfaced during PR #6423 code walkthrough. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…plit Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Slice 1 of 3 from the former PR #6410. Stacked underneath #6425 (adapter) and #6426 (hardening).
The streaming merge engine's per-region foundation. Refactors the engine from "one big merge" to "one region per prefix-key value", and adds intra-region splitting at sorted_series transitions so prefix_len=0 multi-output cases honor the requested file count.
What's in here
merge/streaming/region_grouping.rs: extracts regions from input metadata by composite prefix key (BTreeMap-driven), validates MS-2 (region order matches each input's physical RG order), and verifies PA-3 uniqueness viaassert_unique_rg_prefix_keys.merge/streaming/body_assembler.rs: page-bounded body-col write driven from per-input page caches.merge/streaming/output.rs: per-output writer + finalize that derives row_keys / zonemap / metric_names from the rows that landed in that output.merge/streaming.rsfrom a single-region engine to a per-region processor with sub-region splitting.0x00 → 0x00 0x01, terminator0x00 0x00) with bytewise complement for DESC columns, so BTreeMap iteration matches the declared sort order across any composite of leaf primitives.05dfb).split_region_at_sorted_series: when prefix_len=0 + num_outputs > 1, walks the merge order and splits at sorted_series transitions so even single-prefix inputs honor the file count. Single sorted_series runs are never broken.Test plan
cargo test -p quickwit-parquet-engine --all-features— 476 unit tests pass at this slice's HEAD.Out of scope
🤖 Generated with Claude Code