feat: per-merge-region streaming engine — multi-RG inputs + outputs (PR-6c.2)#6410

Closed
g-talbot wants to merge 8 commits into gtt/streaming-merge-engine-merger from gtt/streaming-merge-engine-multi-rg

Conversation

@g-talbot
Contributor

@g-talbot g-talbot commented May 8, 2026

Summary

Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region loop. Unlocks multi-RG metric-aligned input support and produces multi-RG output naturally — one output row group per merge region (typically one per metric_name when rg_partition_prefix_len == 1).

Architectural insight

Sort-prefix alignment (prefix_len >= 1) guarantees that any merge region has AT MOST one row group per input. That single invariant unlocks the restructure: phase 0 no longer has to span all RGs upfront (which would force either column-chunk buffering or a second body GET); it drains one RG's sort cols per input per region as we go.

Pipeline

  1. Pre-compute regions from RG metadata. For prefix_len >= 1, read each RG's metric_name column-chunk min stat (must equal max — verifies metric-alignment). Group RGs across inputs by prefix_key. Sort regions by prefix_key.

    For prefix_len == 0 (single-RG inputs only, validated earlier), one region covers everything — behaviour matches PR-6b.2 for today's single-RG inputs.

  2. Assign regions to output files by cumulative row count. Caller's num_outputs is preserved. Each output file gets a contiguous slice of the region list, so output files have non-overlapping key ranges.

  3. Per-region processing: for each region in order, advance contributing inputs' decoders through their RGs (drain sort cols of that RG, then stream body cols via the existing page-bounded BodyColOutputPageAssembler). Each region becomes one output RG in the current writer; when the assignment moves to a new output file, close the previous writer and open a new one.
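Steps 1 and 2 can be sketched as follows. This is a minimal illustration, not the PR's code: `RgInfo`, `group_into_regions`, and `assign_to_outputs` are hypothetical names, the prefix key is assumed to be already extracted as bytes, and the "close a file once it reaches roughly total_rows / num_outputs" policy is an assumption about how the cumulative row count is used.

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Hypothetical stand-in for the metadata read from one input row group.
#[derive(Debug, Clone, PartialEq)]
struct RgInfo {
    input_idx: usize,
    rg_idx: usize,
    prefix_key: Vec<u8>,
    num_rows: u64,
}

/// Hypothetical region: every row group, across inputs, sharing one prefix key.
#[derive(Debug, Clone, PartialEq)]
struct Region {
    prefix_key: Vec<u8>,
    contributing: Vec<RgInfo>,
}

impl Region {
    fn total_rows(&self) -> u64 {
        self.contributing.iter().map(|rg| rg.num_rows).sum()
    }
}

/// Step 1: group row groups by prefix key. Iterating a BTreeMap yields the
/// regions already sorted by key.
fn group_into_regions(rgs: Vec<RgInfo>) -> Vec<Region> {
    let mut by_key: BTreeMap<Vec<u8>, Vec<RgInfo>> = BTreeMap::new();
    for rg in rgs {
        by_key.entry(rg.prefix_key.clone()).or_default().push(rg);
    }
    by_key
        .into_iter()
        .map(|(prefix_key, contributing)| Region { prefix_key, contributing })
        .collect()
}

/// Step 2: give each output file a contiguous run of regions (as a range of
/// region indices), closing a file once it holds about total / num_outputs
/// rows. Never produces more than `num_outputs` files.
fn assign_to_outputs(regions: &[Region], num_outputs: usize) -> Vec<Range<usize>> {
    let num_outputs = num_outputs.max(1);
    let total: u64 = regions.iter().map(Region::total_rows).sum();
    let target = (total / num_outputs as u64).max(1);
    let mut out = Vec::new();
    let (mut start, mut rows) = (0usize, 0u64);
    for (i, region) in regions.iter().enumerate() {
        rows += region.total_rows();
        if rows >= target && out.len() + 1 < num_outputs {
            out.push(start..i + 1);
            start = i + 1;
            rows = 0;
        }
    }
    if start < regions.len() {
        out.push(start..regions.len());
    }
    out
}
```

Because every output file receives a contiguous range of the sorted region list, the key ranges of different files cannot overlap, which is the property step 2 relies on.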

The streaming body-col mechanism from PR-6b.2 (arrow::compute::interleave + handle.block_on-driven decoder) is unchanged; it just runs over smaller row ranges (one region instead of one whole output).

Single-RG-input restriction lifted

PR-6b.2's check that rejected any multi-RG input is replaced with: reject only prefix_len == 0 AND multi-RG (those still need PR-5's LegacyMultiRGAdapter). Multi-RG metric-aligned inputs are now accepted natively.

Per-output schema simplification

PR-6b.2 optimised the per-output schema based on per-output sort col data (drop all-null cols, re-dict-encode low-cardinality strings). With per-region streaming we don't know each region's content until we drain it, so PR-6c.2 declares the writer's schema as the full union schema and leaves output strings as Utf8. Per-output dict re-encoding can be reintroduced later by tracking cardinality during the streaming pass.

Tests

  • All 9 PR-6b.2 tests still pass (single-RG input regression — behaviour preserved).
  • New test_multi_rg_metric_aligned_input_produces_multi_rg_output: feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 = cpu.usage, RG 1 = memory.used); the streaming engine accepts it and produces a 2-RG output (one RG per metric_name region).
  • Renamed test_multi_rg_input_rejected → test_legacy_multi_rg_input_rejected to reflect the new rejection scope (only prefix_len == 0 multi-RG is rejected; metric-aligned is accepted).

10/10 streaming tests pass. 442/442 crate tests pass. Clippy, doc, machete, fmt all clean.

What changed

Added:

  • Region struct + extract_regions_from_metadata + assign_regions_to_output_files (region pre-computation; reads parquet column chunk statistics).
  • OutputAccumulator — gathers per-output static metadata (row_keys, zonemap, metric_names, time_range, service names) across the regions assigned to each output file. Computed at finalize_output time.
  • open_output_writer_for_streaming, process_region, build_sorting_columns_from_schema, finalize_output.

Removed (PR-6b.2 multi-output-parallel helpers — no longer needed):

  • write_streaming_outputs, write_all_columns, write_sort_col_for_all_outputs, write_body_col_for_all_outputs, build_parent_union_schema, writer_states_index_view.
  • Old OutputWriterStorage struct, PerOutputStatic, build_per_output_static, derive_output_schema, open_output_writer, finalize_output_writer.

Kept:

  • BodyColOutputPageAssembler and its iterator (the page-bounded body col fan-out — still the streaming core).
  • collect_service_names_from_page, build_full_union_schema_from_arrow_schemas.

Follow-ups deferred

  1. File-size cap with sort-key-boundary splits — when an output file's cumulative bytes-written approach a threshold, close at the next region boundary and open a new file. Adds extra splits beyond caller's N. Easy add on top of this PR's structure.
  2. Per-output schema optimisation — track region body-col cardinality during the streaming pass; reintroduce per-output dict encoding and all-null drops.
  3. Mid-region splits at sorted_series transitions — for finer-grained M:N control when callers want more outputs than there are metric_name regions.

Stack

Base: gtt/streaming-merge-engine-merger (PR-6b.2 #6409).

PR-7 (TBD) wires ParquetMergeExecutor to the streaming engine and deletes ParquetMergeSplitDownloader.

@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 6226032 to add52f0 Compare May 8, 2026 13:33
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from bcaa08b to 5d486d5 Compare May 8, 2026 13:33
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from add52f0 to e1990b2 Compare May 8, 2026 20:49
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from 5d486d5 to 3ee8ef9 Compare May 8, 2026 20:50
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from e1990b2 to 2af2fa8 Compare May 8, 2026 21:31
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from 3ee8ef9 to 307a981 Compare May 8, 2026 21:32
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 2af2fa8 to b2eee32 Compare May 8, 2026 21:47
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from 307a981 to 0f6892a Compare May 8, 2026 21:47
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from b2eee32 to 85b679a Compare May 9, 2026 00:08
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from 0f6892a to 4ce95e5 Compare May 9, 2026 00:08
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 85b679a to 38d4763 Compare May 11, 2026 11:06
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from 4ce95e5 to ebd9683 Compare May 11, 2026 11:06
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 38d4763 to 6adf05d Compare May 11, 2026 11:15
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from ebd9683 to b6842c9 Compare May 11, 2026 11:15
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch 2 times, most recently from 3c6e227 to 83153a2 Compare May 11, 2026 16:36
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from b6842c9 to bf75201 Compare May 11, 2026 17:58
@g-talbot g-talbot changed the title from "feat: multi-RG output at metric_name boundaries (PR-6c)" to "feat: per-merge-region streaming engine — multi-RG inputs + outputs (PR-6c.2)" May 11, 2026
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 83153a2 to f0a2d99 Compare May 11, 2026 18:18
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from bf75201 to 0a168c1 Compare May 11, 2026 18:18
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from f0a2d99 to f748b94 Compare May 12, 2026 11:12
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from 0a168c1 to bd639ce Compare May 12, 2026 11:12
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from f748b94 to 618e366 Compare May 12, 2026 11:30
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from bd639ce to 9d805a7 Compare May 12, 2026 11:30
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 618e366 to ecd10a2 Compare May 12, 2026 12:40
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from 9d805a7 to 9651c63 Compare May 12, 2026 12:40
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from ecd10a2 to 26b9257 Compare May 12, 2026 13:48
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from 9651c63 to 339d0eb Compare May 12, 2026 13:50
@g-talbot
Contributor Author

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 339d0eb6f7

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread quickwit/quickwit-parquet-engine/src/merge/streaming.rs Outdated
Comment thread quickwit/quickwit-parquet-engine/src/merge/streaming.rs Outdated
Comment thread quickwit/quickwit-parquet-engine/src/merge/streaming.rs Outdated
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch 9 times, most recently from 91a0a77 to f2588e6 Compare May 12, 2026 19:58
…PR-6c.2)

Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region
loop. Unlocks multi-RG metric-aligned input support and produces
multi-RG output naturally — one output row group per merge region
(typically one per metric_name when `rg_partition_prefix_len == 1`).

## Architecture

Sort-prefix alignment (`prefix_len >= 1`) guarantees that any merge
region has AT MOST one row group per input. That single invariant
unlocks the restructure:

1. Pre-compute regions from RG metadata. For `prefix_len >= 1`, read
   each RG's metric_name min stat (must equal max — verifies
   metric-alignment). Group RGs across inputs by prefix_key. Sort
   regions by prefix_key. For `prefix_len == 0` (single-RG inputs
   only, validated earlier), one region covers everything.

2. Assign regions to output files by cumulative row count. Caller's
   `num_outputs` preserved as the upper bound. Each output file gets
   a contiguous slice of the region list, so output files have
   non-overlapping key ranges.

3. Per-region processing: for each region, advance contributing
   inputs' decoders through their RGs (drain sort cols of that RG,
   then stream body cols via the existing page-bounded
   BodyColOutputPageAssembler). Each region becomes one output RG in
   the current writer; when the assignment moves to a new output
   file, close the previous writer and open a new one.

The streaming body-col mechanism from PR-6b.2
(arrow::compute::interleave + handle.block_on-driven decoder) is
unchanged; it just runs over smaller row ranges (one region instead
of one whole output).

## Single-RG-input restriction lifted

PR-6b.2's check that rejected any multi-RG input is replaced with:
reject only `prefix_len == 0` AND multi-RG (those still need PR-5's
LegacyMultiRGAdapter). Multi-RG metric-aligned inputs are now
accepted natively.

## Per-output schema simplification

PR-6b.2 optimised the per-output schema based on per-output sort col
data (drop all-null cols, re-dict-encode low-cardinality strings).
With per-region streaming we don't know each region's content until
we drain it, so PR-6c.2 declares the writer's schema as the full
union schema and leaves output strings as Utf8. Per-output dict
re-encoding can be reintroduced later by tracking cardinality during
the streaming pass.

## Tests

- All 9 PR-6b.2 tests still pass (single-RG input regression —
  behaviour preserved).
- New test_multi_rg_metric_aligned_input_produces_multi_rg_output:
  feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 =
  cpu.usage, RG 1 = memory.used); the streaming engine accepts it
  and produces a 2-RG output (one RG per metric_name region).
- Renamed test_multi_rg_input_rejected →
  test_legacy_multi_rg_input_rejected to reflect the new rejection
  scope (only prefix_len == 0 multi-RG is rejected; metric-aligned
  is accepted).

10/10 streaming tests pass. Clippy, doc, machete, fmt all clean.

## Follow-ups deferred

1. File-size cap with sort-key-boundary splits.
2. Per-output schema optimisation (track region body-col cardinality
   during the streaming pass).
3. Mid-region splits at sorted_series transitions for finer-grained
   M:N control when callers want more outputs than regions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg branch from f2588e6 to d93f8e6 Compare May 12, 2026 21:38
@g-talbot
Contributor Author

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: d93f8e6c65

Comment thread quickwit/quickwit-parquet-engine/src/merge/streaming/region_grouping.rs Outdated
Two P1 bugs flagged by Codex on PR-6c.2 (#6410):

1. **Duplicate input row groups silently dropped.** When one input
   contained two RGs with the same composite prefix key,
   `process_region` overwrote `sort_col_batches[input_idx]` while
   `Region::total_rows` still counted both — losing rows and
   misaligning the body-col / sort-col mapping. Now enforce
   at-most-one-RG-per-input-per-prefix as a strong invariant at three
   sites: the merge read path (`extract_regions_from_metadata`), the
   streaming merge output finalize, and the indexing writer
   (`ParquetWriter::write_to_bytes` / `write_to_file_with_metadata`).
   The new `assert_unique_rg_prefix_keys` helper is shared.

2. **Byte-array prefix encoding broke lex order across lengths.**
   The 4-byte length prefix made `"b"` sort before `"aa"`, violating
   the declared ASC order. Switched to byte-stuffed escape encoding
   (`0x00` → `0x00 0x01`, terminator `0x00 0x00`), which preserves
   single-column lex order AND retains unambiguous concatenation for
   composite keys (the terminator is the smallest 2-byte sequence
   under escaping, so shorter values still sort before longer ones
   with the same prefix).
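A minimal sketch of the byte-stuffed encoding described in item 2, written only from the description above (`0x00` → `0x00 0x01`, terminator `0x00 0x00`). The function names `encode_component` and `encode_composite` are hypothetical and are not taken from the PR.

```rust
/// Order-preserving encoding of one byte-array value: every 0x00 byte is
/// escaped as 0x00 0x01, and the value ends with the terminator 0x00 0x00.
fn encode_component(value: &[u8], out: &mut Vec<u8>) {
    for &b in value {
        out.push(b);
        if b == 0x00 {
            // Escape so that a literal zero byte can never look like the terminator.
            out.push(0x01);
        }
    }
    out.extend_from_slice(&[0x00, 0x00]);
}

/// Composite key: the encoded components, concatenated in column order.
/// The terminator makes the component boundaries unambiguous.
fn encode_composite(values: &[&[u8]]) -> Vec<u8> {
    let mut out = Vec::new();
    for v in values {
        encode_component(v, &mut out);
    }
    out
}
```

Comparing two encoded keys bytewise then matches comparing the original values column by column: where a shorter value ends, its terminator starts with `0x00`, which is no greater than any byte that could continue a longer value, and the escape byte `0x01` breaks the tie when the longer value continues with a literal zero.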

Tests:
- `test_byte_array_prefix_preserves_lex_order_across_lengths` —
  `"aa" < "b"`, empty < non-empty, shared-prefix shorter < longer,
  null-byte escaping preserves order.
- `test_streaming_merge_rejects_duplicate_prefix_rgs_in_one_input` —
  end-to-end bail with clear error.
- `test_write_to_bytes_rejects_duplicate_rg_prefix_when_claimed_aligned`
  + the `write_to_file` and single-RG positive counterparts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot
Contributor Author

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 05dfba44c3

Comment on lines +390 to +393
    return Ok(vec![Region {
        prefix_key: Vec::new(),
        contributing,
    }]);

P2: Preserve num_outputs on the prefix_len=0 path

When the inputs have rg_partition_prefix_len == 0 (the normal single-row-group/legacy path), this branch always returns exactly one region. The outer merge loop now assigns files only at region boundaries, so num_outputs > 1 can never produce more than one output even when the merged data contains many sorted_series split points; the previous implementation used compute_output_boundaries to split the merge order within this case. Compactions of ordinary single-RG inputs requesting multiple outputs will silently create one oversized output file.

Contributor Author

Fixed in c85bb8e — taking the broader approach we discussed offline: the prefix_len=0 path now synthesizes prefix-aligned regions during the merge rather than restoring the old compute_output_boundaries split.

Specifically: for rg_partition_prefix_len == 0 inputs, the engine drains all sort cols upfront, computes the global k-way merge order, and walks runs to find first-sort-col (e.g. metric_name) transitions. Each transition opens a synthesized region with per-input row ranges; the output writer materialises one parquet RG per region. The output is therefore prefix-aligned and declares rg_partition_prefix_len = 1 so future compactions take the fast streaming path.

Multi-output splitting falls out for free — assign_regions_to_output_files distributes the synthesized regions across files balancing row count. Regression test test_prefix_len_zero_multi_output_splits_at_prefix_transitions covers the exact num_outputs > 1 → multiple files case you flagged (6 metric_names × 50 rows + num_outputs = 3 produces 3 outputs, each declaring prefix_len = 1). Companion test test_prefix_len_zero_single_output_is_prefix_aligned_multi_rg covers num_outputs = 1 producing multi-RG output.

Plumbing: Region::contributing carries per-input start_row/num_rows so a single RG can be sliced across adjacent synthesized regions; InputDecoderState switched to per-parquet-col page cache + cursor (a HashMap keyed by col_idx) so reads of a later col that pull leftover earlier-col pages into the stream still cache them for the next synthesized region.

Contributor Author

Follow-up to my previous reply: the synthesis-at-metric_name approach was wrong on two counts (caught in review). First, the engine declared prefix_len = 1 on output regardless of input. Second, the synthesized boundaries couldn't honor num_outputs > 1 for a giant single-metric input, which has no metric_name transitions to split at.

Replaced in b642371 with a sub-region splitter that breaks at sorted_series transitions whenever a region's row count would push the current output past total_rows / num_outputs, never inside a single sorted_series run. Output rg_partition_prefix_len now inherits from InputMetadata unchanged — the engine no longer declares a prefix it can't unconditionally guarantee.

New tests:

  • test_prefix_len_zero_multi_output_splits_at_sorted_series — original P2 case (6 metrics × 50 rows + num_outputs = 3 → 3 balanced outputs).
  • test_prefix_len_zero_giant_single_metric_splits_into_multiple_outputs — 1 metric × 200 rows + num_outputs = 2 → 2 outputs split at sorted_series within the one metric (the case the previous fix missed).
  • test_prefix_len_zero_single_output_is_single_rg — no split needed, one output, one RG, prefix KV absent.

g-talbot and others added 6 commits May 13, 2026 07:41
…regions

Codex P2 on PR-6c.2 (#6410): for `rg_partition_prefix_len == 0`
inputs, `extract_regions_from_metadata` returned a single region
spanning all inputs, and the region-to-output assigner could only
split work at region boundaries. `num_outputs > 1` therefore silently
produced one oversized file. The previous (non-streaming) engine
sidestepped this with `compute_output_boundaries`, splitting the
merge order at `sorted_series` transitions.

Rather than restore the legacy split, unify the two paths: for
prefix_len=0 inputs the engine now drains all sort cols upfront,
computes the global k-way merge order, and walks runs to find
first-sort-col (e.g. `metric_name`) transitions. Each transition
opens a new synthesized region carrying per-input row ranges; the
output writer materialises one parquet row group per region. The
output is therefore prefix-aligned (each RG carries a single first-
sort-col value) and the writer advertises
`rg_partition_prefix_len = 1` so future compactions take the fast
streaming path.

Multi-output splitting falls out for free: the existing
`assign_regions_to_output_files` distributes synthesized regions
across files balancing row count.

Mechanical pieces:

- `Region::contributing` becomes `Vec<RegionContribution>` with
  `start_row` per input, so a single RG can be sliced across multiple
  adjacent regions.
- `InputDecoderState` switches from one-active-col page cache + cursor
  to per-parquet-col HashMaps. `advance_decoder_to_row` now stores
  pages under their actual `col_idx`, so reads of a later col that
  pull leftover earlier-col pages into the stream keep them cached
  for the next synthesized region to consume.
- `set_body_col_cursor` replaces `reset_body_col_state`: it positions
  the cursor and drops only pages strictly below it, preserving rows
  that still belong to future regions.
- `process_region` accepts an optional `prefetched_sort_batches`
  argument. When supplied (the synthesized path) it slices the pre-
  drained sort batches by each contribution's row range; when `None`
  (the existing prefix_len>0 path) it drains a fresh whole-RG batch
  from the decoder, same as before.

Tests:

- `test_prefix_len_zero_multi_output_splits_at_prefix_transitions` —
  6 metric_names × 50 rows + `num_outputs = 3` produces 3 output files;
  each declares `rg_partition_prefix_len = 1`.
- `test_prefix_len_zero_single_output_is_prefix_aligned_multi_rg` —
  3 metric_names + `num_outputs = 1` produces one file with 3 RGs,
  declaring `rg_partition_prefix_len = 1`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…sizing prefix

Per review feedback: the prior commit incorrectly conflated "fix the
prefix_len=0 multi-output regression" with "make every legacy output
prefix-aligned at the engine layer." Two issues with that approach:

1. The engine was declaring `rg_partition_prefix_len = 1` on outputs
   regardless of what the inputs claimed. That's a property the
   engine guessed and forced; it should come from the inputs (or
   from a future legacy-converter layer that translates legacy
   inputs into prefix-aligned form before the engine sees them).

2. Synthesizing regions at first-sort-col transitions doesn't honor
   `num_outputs` when there are no such transitions — a giant single
   metric with `prefix_len = 0` would still collapse into one
   output even with `num_outputs = 3`.

New strategy: keep the per-col page cache and `RegionContribution`
row-range plumbing — both are still needed — but replace the
metric_name-transition synthesis with a sub-region splitter that
breaks at `sorted_series` transitions whenever a region's row count
would push the current output past `target_per_output = total_rows /
num_outputs`. Splitting happens at run boundaries only — never
inside a `sorted_series` run — so a single huge run lands in one
output regardless of size. The output's `rg_partition_prefix_len`
is inherited from `InputMetadata` unchanged.
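
The splitting rule can be sketched as below. This is an illustration under assumptions, not the PR's `split_region_at_sorted_series`: the function name `split_runs_into_outputs` is hypothetical, the input is simplified to a list of `sorted_series` run lengths in merge order, and the cap at `num_outputs` groups is an assumed detail.

```rust
use std::ops::Range;

/// Groups consecutive `sorted_series` runs into at most `num_outputs`
/// outputs. A new output is started only at a run boundary, and only when
/// adding the next run would push the current output past
/// `total_rows / num_outputs`. Returns ranges of run indices.
fn split_runs_into_outputs(run_lens: &[u64], num_outputs: usize) -> Vec<Range<usize>> {
    let num_outputs = num_outputs.max(1);
    let total_rows: u64 = run_lens.iter().sum();
    let target_per_output = total_rows / num_outputs as u64;
    let mut groups = Vec::new();
    let (mut start, mut rows) = (0usize, 0u64);
    for (i, &len) in run_lens.iter().enumerate() {
        // An empty output always takes the next run, so one huge run is
        // never split and never produces an empty output.
        let would_overflow = rows > 0 && rows + len > target_per_output;
        if would_overflow && groups.len() + 1 < num_outputs {
            groups.push(start..i);
            start = i;
            rows = 0;
        }
        rows += len;
    }
    if start < run_lens.len() {
        groups.push(start..run_lens.len());
    }
    groups
}
```

With six runs of 50 rows and `num_outputs = 3` the target is 100 rows, so the runs pair up into three groups; a single run, however large, stays in one group.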

Main-loop shape: for each top-level region the engine first asks
"would this region fit in the current output's remaining budget?"
If yes, `process_region` drains internally as before (per-region
memory bound preserved). If not, pre-drain the region's sort cols,
compute its merge order, call `split_region_at_sorted_series` to
get sub-regions, then process each sub-region with the prefetched
batches.

Between top-level regions we now `reset_all_body_col_state` on
every input — different RGs have overlapping row-index spaces and
their cached pages would collide. Sub-regions of one top-level
region share an RG and keep the cache; this is what lets an
earlier col's stream-tail get cached during a later col's read and
remain available for the next sub-region.

Tests rewritten:
- `test_prefix_len_zero_multi_output_splits_at_sorted_series` —
  6 metrics × 50 rows, `num_outputs = 3` → 3 outputs balanced near
  100 rows each. Asserts the output's `rg_partition_prefix_len` KV
  is absent (inherits input's 0).
- `test_prefix_len_zero_giant_single_metric_splits_into_multiple_outputs`
  — covers the giant-metric case: 1 metric × 200 rows,
  `num_outputs = 2` → 2 outputs, splitting at `sorted_series`
  transitions inside the single metric.
- `test_prefix_len_zero_single_output_is_single_rg` — `num_outputs =
  1` produces a single output with a single RG and no prefix KV.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The legacy adapter previously consolidated multi-RG legacy inputs
into a single oversized row group and left `rg_partition_prefix_len`
at the original's (typically `0`). The streaming merge engine then
sent these single-RG/prefix=0 inputs through the new sub-region
splitting path — correct, but it forfeits the prefix-aware fast path
for outputs derived from legacy inputs and gives up the row-group
pruning that prefix alignment enables.

After consolidating, the adapter now slices the resulting record
batch at first-sort-col transitions (typically `metric_name`) and
emits one parquet row group per slice, stamping the re-encoded file
with `qh.rg_partition_prefix_len = 1`. The merge engine then reads
it through the prefix-aware fast path: one region per metric_name,
the existing duplicate-prefix invariant on read validates uniqueness.

Fallback: if the original file has no `qh.sort_fields` KV, the
sort-fields string fails to parse, the first column can't be
resolved in the arrow schema, or the consolidated batch is empty,
the adapter reverts to a single-RG re-encode without claiming any
prefix alignment. That input still works — the engine's
prefix_len=0 sub-region splitting path picks it up. This keeps the
adapter robust for files written by very early versions of the
indexer that may pre-date the standard KV layout.

Implementation: `reencode_prefix_aligned` replaces
`reencode_as_single_row_group` and either dispatches to the new
multi-RG writer or to the legacy single-RG writer based on whether
the first sort col is resolvable. `RowConverter` handles the
prefix-value equality check uniformly across dictionary, utf8, and
primitive types. The KV injection helper replaces (rather than
appends) any existing `qh.rg_partition_prefix_len` so re-runs and
files mistakenly carrying a stale value still land at the freshly
synthesized prefix.

Tests:
- `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`
  — 3 metrics × 40 rows, multi-RG input → 3 prefix-aligned output
  RGs and `qh.rg_partition_prefix_len = 1` KV.
- `test_legacy_input_single_metric_yields_one_rg_with_prefix_kv` —
  one metric → one RG, prefix KV still stamped (vacuously aligned).
- `test_legacy_input_without_sort_fields_falls_back_to_single_rg` —
  fallback path preserved when sort-fields KV is missing.
- All existing tests pass unchanged (they use empty KVs or
  unparseable sort-fields strings, both of which exercise the
  fallback path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e-prefix support

`LegacyInputAdapter::try_open` now takes `target_prefix_len: u32`
chosen by the caller, matching the merge plan's consensus prefix
length. The adapter slices the consolidated batch at every transition
of the first N sort columns (composite key, via `RowConverter` over
all N fields) and emits one output row group per slice, stamping the
output with `qh.rg_partition_prefix_len = target_prefix_len`. With
`target_prefix_len = 0` the adapter takes the original single-RG
passthrough path with no prefix-alignment claim.

A sort column that is named in `qh.sort_fields` but missing from the
file's arrow schema is treated as implicitly null at every row per
SS-3. A constantly-null column trivially satisfies alignment on that
column (null == null) and contributes no transitions, so the split
boundaries are driven by the columns that are present. This matches
the merge engine's compaction-time treatment of missing columns and
keeps a legacy file with an evolved schema usable as a prefix-aligned
input.

`PrefixUnresolvable` now fires only on cases where the file doesn't
advertise enough sort *names* to honor the request:
- `qh.sort_fields` absent or unparseable
- `qh.sort_fields` declares fewer sort columns than `target_prefix_len`

A column missing from the arrow schema no longer counts as
unresolvable; the adapter materialises a `NullArray` of the batch's
length in that slot and proceeds.

Tests:
- `test_target_prefix_len_zero_passes_through_as_single_rg` — explicit
  N=0 fallback, no prefix KV stamped.
- `test_target_prefix_len_two_splits_by_metric_and_service` — composite
  prefix (`metric_name`, `service`) → 4 RGs, KV declares prefix_len=2.
- `test_target_prefix_len_one_without_sort_fields_returns_unresolvable`
  — no `qh.sort_fields` KV → `PrefixUnresolvable`.
- `test_target_prefix_len_exceeds_declared_sort_cols_returns_unresolvable`
  — sort schema declares 2 cols, caller asks 3 → `PrefixUnresolvable`.
- `test_missing_prefix_col_treated_as_null_satisfies_alignment` —
  sort schema declares `metric_name|env|-timestamp_secs` but `env`
  is absent from the arrow schema → no error, only metric_name
  transitions split RGs, KV still stamps prefix_len=2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…KV stamp

The streaming merge engine produces sort-prefix-aligned multi-RG output
and stamps `qh.rg_partition_prefix_len = input_meta.rg_partition_prefix_len`
in the file's KV (verified by `assert_unique_rg_prefix_keys` before close).
`merge_parquet_split_metadata` then ran after and unconditionally demoted
to 0 whenever `output.num_row_groups > 1` — breaking CS-1 (metastore must
mirror on-disk KV) for every multi-RG streaming-engine output. Aligned
splits got tagged 0 in the metastore on every merge and leaked out of
the prefix-aligned compaction bucket on the next pass.

Carry the value the writer actually stamped via a new
`MergeOutputFile.output_rg_partition_prefix_len` field, then propagate
it as-is in metadata aggregation. Both engines populate the field:
- Legacy `merge/writer.rs` reports its demoted value (row-count-driven
  RG boundaries can't honor prefix alignment, so it stamps 0 on multi-RG).
- Streaming `merge/streaming/output.rs` reports the inputs' prefix
  unchanged (it splits at prefix transitions and the writer verifies).

CS-1 holds by construction — same source of truth, no re-derivation.

Tests:
- `test_output_prefix_len_demoted_when_multi_rg` → renamed to
  `test_output_prefix_len_carries_writers_value_when_demoted`; now
  asserts that the metastore mirrors the writer's reported value.
- New `test_output_prefix_len_preserved_on_multi_rg_streaming_engine`
  asserts that a multi-RG streaming output (writer reports prefix_len=2)
  keeps the prefix in the metastore — the regression case for F1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…onger test verifiers

Three adversarial-review findings on the prefix/RG machinery, bundled
because they touch the same producer/consumer contract:

**F8: Legacy adapter rejects SS-1-violating input upfront.**
The adapter walked rows in physical order and emitted one RG per
prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`)
produced a 3-RG file where two RGs shared prefix `A`, violating PA-3.
The streaming merge engine would later reject it mid-merge — but only
after a quietly-bad file had been built. Now `compute_prefix_value_slices`
tracks each slice's composite prefix-value bytes and bails with
`LegacyAdapterError::InputNotSorted` on duplicates, surfacing the
SS-1 violation before any file lands on disk.
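
A minimal sketch of the F8 check, assuming prefix values are plain strings and using hypothetical names (`prefix_value_slices`, `SliceError`); the real `compute_prefix_value_slices` works on composite prefix-value bytes and reports `LegacyAdapterError::InputNotSorted`. It slices rows into runs of equal prefix and fails as soon as a prefix value opens a second run:

```rust
use std::collections::HashSet;

/// Hypothetical error type for this sketch only.
#[derive(Debug, PartialEq)]
pub enum SliceError {
    /// `row` is the first row of the run that repeats an earlier prefix.
    InputNotSorted { row: usize },
}

/// Splits rows into runs of equal prefix value and returns each run as a
/// half-open `(start, end)` row range. Fails if a prefix value starts a
/// second run, because that would yield two row groups with one prefix.
pub fn prefix_value_slices(prefixes: &[&str]) -> Result<Vec<(usize, usize)>, SliceError> {
    let mut slices = Vec::new();
    let mut seen: HashSet<&str> = HashSet::new();
    let mut start = 0;
    for row in 0..prefixes.len() {
        let starts_run = row == 0 || prefixes[row] != prefixes[row - 1];
        if starts_run {
            if row > 0 {
                // Close the previous run before opening the new one.
                slices.push((start, row));
                start = row;
            }
            // `insert` returns false when the value was already present.
            if !seen.insert(prefixes[row]) {
                return Err(SliceError::InputNotSorted { row });
            }
        }
    }
    if !prefixes.is_empty() {
        slices.push((start, prefixes.len()));
    }
    Ok(slices)
}
```

With the `[A,A,B,B,A,A]` input from above, the sketch fails at row 4 instead of emitting a third slice.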

**F12: Consumer-side SS-3 (cross-layer divergence, discovered while
wiring F2's chunk-level verifier into the SS-3 test).** The adapter
implements SS-3 correctly (missing-from-schema → synthesized NullArray
during slice computation, file stamps `prefix_len = N`). The streaming
engine's reader did not: `find_prefix_parquet_col_indices` hard-required
every named prefix column to be physically present, so a file the
adapter produced from an SS-3 input was unreadable by the merge engine.
Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>`
and `extract_rg_composite_prefix_key` emits a constant null marker
(`encode_byte_array_prefix(&[])`) for None slots. The column contributes
no cross-RG ordering signal (constant everywhere) so region boundaries
are driven entirely by the present columns. Both halves of SS-3 now
agree end-to-end.

Known limitation: cross-file SS-3 — where some inputs have a sort
column and others don't — uses [0x00, 0x00] for the null contribution,
which sorts BEFORE non-null per the encoded-empty-string convention.
That weakly violates SS-2 (nulls sort last). Single-file SS-3 is
correct because every RG in such a file contributes the same constant.
If cross-file SS-3 becomes a production scenario, the encoding needs
a leading-0xff sentinel instead. Not exercised today.

**F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming
tests.** Tests asserting `num_row_groups == N` + KV stamped to N would
have passed even with an off-by-one in slice-boundary detection or
column-content scrambling. The verifier reads chunk-level statistics
directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness)
on the composite key. Wired into six tests:
- streaming engine: `test_streaming_merge_with_prefix_len_two`,
  `test_multi_rg_metric_aligned_input_produces_multi_rg_output`,
  `test_streaming_merge_with_desc_prefix_col`
- legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`,
  `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`,
  `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now
  passes thanks to F12).

Also: `assert_unique_rg_prefix_keys` no longer short-circuits on
single-RG files — they still go through PA-1 because an unsorted
single-RG file CAN have `min != max` on a prefix column.
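
Sketch of the two checks the verifier applies, under the assumption that each row group's composite prefix key is available as a `(min, max)` pair of byte strings. `RgPrefixStats` and `check_unique_rg_prefix_keys` are hypothetical names; the real `assert_unique_rg_prefix_keys` reads chunk-level parquet statistics:

```rust
use std::collections::HashSet;

/// Min/max of the composite prefix key for one row group.
/// Hypothetical stand-in for chunk-level parquet statistics.
pub struct RgPrefixStats {
    pub min: Vec<u8>,
    pub max: Vec<u8>,
}

/// PA-1: within a row group, min must equal max.
/// PA-3: no two row groups may share a prefix key.
/// There is deliberately no early return for single-RG files.
pub fn check_unique_rg_prefix_keys(rgs: &[RgPrefixStats]) -> Result<(), String> {
    let mut seen: HashSet<&[u8]> = HashSet::new();
    for (idx, rg) in rgs.iter().enumerate() {
        if rg.min != rg.max {
            return Err(format!("PA-1: row group {} spans more than one prefix key", idx));
        }
        if !seen.insert(rg.min.as_slice()) {
            return Err(format!("PA-3: row group {} repeats an earlier prefix key", idx));
        }
    }
    Ok(())
}
```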

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot
Contributor Author

Sliced into three stacked PRs for easier review — same final tree, just reorganized commits:

PR #6423 (legacy promotion + body-col schema evolution) is being rebased onto #6426.

The branch backing this PR (gtt/streaming-merge-engine-multi-rg) is preserved for reference; no force-push happened here. The three new branches contain an identical tree at the top via cherry-pick (forward synth + correction pair squashed into one commit).

@g-talbot g-talbot closed this May 13, 2026
g-talbot added a commit that referenced this pull request May 18, 2026
… splitting (#6424)

* feat: per-merge-region streaming engine — multi-RG inputs + outputs (PR-6c.2)

Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region
loop. Unlocks multi-RG metric-aligned input support and produces
multi-RG output naturally — one output row group per merge region
(typically one per metric_name when `rg_partition_prefix_len == 1`).

Sort-prefix alignment (`prefix_len >= 1`) guarantees that any merge
region has AT MOST one row group per input. That single invariant
unlocks the restructure:

1. Pre-compute regions from RG metadata. For `prefix_len >= 1`, read
   each RG's metric_name min stat (must equal max — verifies
   metric-alignment). Group RGs across inputs by prefix_key. Sort
   regions by prefix_key. For `prefix_len == 0` (single-RG inputs
   only, validated earlier), one region covers everything.

2. Assign regions to output files by cumulative row count. Caller's
   `num_outputs` preserved as the upper bound. Each output file gets
   a contiguous slice of the region list, so output files have
   non-overlapping key ranges.

3. Per-region processing: for each region, advance contributing
   inputs' decoders through their RGs (drain sort cols of that RG,
   then stream body cols via the existing page-bounded
   BodyColOutputPageAssembler). Each region becomes one output RG in
   the current writer; when the assignment moves to a new output
   file, close the previous writer and open a new one.
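
Steps 1 and 2 can be sketched roughly as follows. This is an illustration only: `RgRef`, `Region`, `group_regions` and `assign_outputs` are hypothetical names, prefix keys are assumed to be already extracted as bytes, and the row target is assumed to be the ceiling of total rows over `num_outputs`.

```rust
use std::collections::BTreeMap;

/// One row group of one input, reduced to what the sketch needs.
#[derive(Debug, Clone, PartialEq)]
pub struct RgRef {
    pub input_idx: usize,
    pub rg_idx: usize,
    pub num_rows: usize,
}

#[derive(Debug, PartialEq)]
pub struct Region {
    pub prefix_key: Vec<u8>,
    pub rgs: Vec<RgRef>,
    pub total_rows: usize,
}

/// Step 1: group row groups across inputs by prefix key.
/// The `BTreeMap` yields regions sorted by key.
pub fn group_regions(rgs: Vec<(Vec<u8>, RgRef)>) -> Vec<Region> {
    let mut by_key: BTreeMap<Vec<u8>, Vec<RgRef>> = BTreeMap::new();
    for (key, rg) in rgs {
        by_key.entry(key).or_default().push(rg);
    }
    by_key
        .into_iter()
        .map(|(prefix_key, rgs)| {
            let total_rows = rgs.iter().map(|r| r.num_rows).sum();
            Region { prefix_key, rgs, total_rows }
        })
        .collect()
}

/// Step 2: return the output index for each region. Outputs receive
/// contiguous runs of regions; a new output starts once the current one
/// has reached the row target and another output is still available.
pub fn assign_outputs(regions: &[Region], num_outputs: usize) -> Vec<usize> {
    let num_outputs = num_outputs.max(1);
    let total: usize = regions.iter().map(|r| r.total_rows).sum();
    let target = ((total + num_outputs - 1) / num_outputs).max(1);
    let mut assignment = Vec::with_capacity(regions.len());
    let (mut out, mut filled) = (0usize, 0usize);
    for region in regions {
        if filled >= target && out + 1 < num_outputs {
            out += 1;
            filled = 0;
        }
        assignment.push(out);
        filled += region.total_rows;
    }
    assignment
}
```

Because the assignment only ever moves forward through the sorted region list, each output covers a contiguous key range, which is what keeps output files non-overlapping.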

The streaming body-col mechanism from PR-6b.2 (arrow::compute::
interleave + handle.block_on driven decoder) is unchanged; it just
runs over smaller row ranges (one region instead of one whole
output).

PR-6b.2's check that rejected any multi-RG input is replaced with:
reject only `prefix_len == 0` AND multi-RG (those still need PR-5's
LegacyMultiRGAdapter). Multi-RG metric-aligned inputs are now
accepted natively.

PR-6b.2 optimised the per-output schema based on per-output sort col
data (drop all-null cols, re-dict-encode low-cardinality strings).
With per-region streaming we don't know each region's content until
we drain it, so PR-6c.2 declares the writer's schema as the full
union schema and leaves output strings as Utf8. Per-output dict
re-encoding can be reintroduced later by tracking cardinality during
the streaming pass.

Tests:
- All 9 PR-6b.2 tests still pass (single-RG input regression —
  behaviour preserved).
- New test_multi_rg_metric_aligned_input_produces_multi_rg_output:
  feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 =
  cpu.usage, RG 1 = memory.used); the streaming engine accepts it
  and produces a 2-RG output (one RG per metric_name region).
- Renamed test_multi_rg_input_rejected →
  test_legacy_multi_rg_input_rejected to reflect the new rejection
  scope (only prefix_len == 0 multi-RG is rejected; metric-aligned
  is accepted).

10/10 streaming tests pass. Clippy, doc, machete, fmt all clean.

Follow-ups:
1. File-size cap with sort-key-boundary splits.
2. Per-output schema optimisation (track region body-col cardinality
   during the streaming pass).
3. Mid-region splits at sorted_series transitions for finer-grained
   M:N control when callers want more outputs than regions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): reject duplicate-prefix RGs + use escape encoding

Two P1 bugs flagged by Codex on PR-6c.2 (#6410):

1. **Duplicate input row groups silently dropped.** When one input
   contained two RGs with the same composite prefix key,
   `process_region` overwrote `sort_col_batches[input_idx]` while
   `Region::total_rows` still counted both — losing rows and
   misaligning the body-col / sort-col mapping. Now enforce
   at-most-one-RG-per-input-per-prefix as a strong invariant at three
   sites: the merge read path (`extract_regions_from_metadata`), the
   streaming merge output finalize, and the indexing writer
   (`ParquetWriter::write_to_bytes` / `write_to_file_with_metadata`).
   The new `assert_unique_rg_prefix_keys` helper is shared.

2. **Byte-array prefix encoding broke lex order across lengths.**
   The 4-byte length prefix made `"b"` sort before `"aa"`, violating
   the declared ASC order. Switched to byte-stuffed escape encoding
   (`0x00` → `0x00 0x01`, terminator `0x00 0x00`), which preserves
   single-column lex order AND retains unambiguous concatenation for
   composite keys (the terminator is the smallest 2-byte sequence
   under escaping, so shorter values still sort before longer ones
   with the same prefix).
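
To illustrate the ordering difference, here is a small sketch of both schemes. The function names are hypothetical, and the big-endian length is an assumption (the commit only says "4-byte length prefix"):

```rust
/// Byte-stuffed escape encoding: `0x00` becomes `0x00 0x01`, and the value
/// ends with the terminator `0x00 0x00`. Byte-wise comparison of encoded
/// values then matches byte-wise comparison of the raw values.
pub fn encode_escaped(value: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(value.len() + 2);
    for &b in value {
        out.push(b);
        if b == 0x00 {
            out.push(0x01);
        }
    }
    out.extend_from_slice(&[0x00, 0x00]);
    out
}

/// The earlier scheme, for contrast: a 4-byte length (big-endian assumed
/// here) followed by the raw bytes. Shorter values always sort first.
pub fn encode_length_prefixed(value: &[u8]) -> Vec<u8> {
    let mut out = (value.len() as u32).to_be_bytes().to_vec();
    out.extend_from_slice(value);
    out
}
```

Under the length-prefixed form, `"b"` compares below `"aa"` because length 1 is compared before any content byte; under the escaped form, `"aa"` encodes to `61 61 00 00` and `"b"` to `62 00 00`, so the first content byte decides.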

Tests:
- `test_byte_array_prefix_preserves_lex_order_across_lengths` —
  `"aa" < "b"`, empty < non-empty, shared-prefix shorter < longer,
  null-byte escaping preserves order.
- `test_streaming_merge_rejects_duplicate_prefix_rgs_in_one_input` —
  end-to-end bail with clear error.
- `test_write_to_bytes_rejects_duplicate_rg_prefix_when_claimed_aligned`
  + the `write_to_file` and single-RG positive counterparts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(streaming): split regions at sorted_series for prefix_len=0 multi-output

When inputs declare rg_partition_prefix_len = 0 (legacy single-RG)
and the caller asks for num_outputs > 1, the engine subdivides the
single region at sorted_series transitions in the merge order so it
can honor the output count. A single sorted_series run is never
broken; if one run exceeds the remaining budget the whole run lands
in one output anyway. The output inherits the input's
rg_partition_prefix_len (=0) — the engine does not synthesize a
prefix it can't unconditionally guarantee.

Also handles the giant-single-metric case (prefix_len=0, one
metric_name, num_outputs > 1): sorted_series transitions still
split the merge order even though there are no metric_name
transitions to drive a prefix synthesis.

Implementation:
- New `split_region_at_sorted_series` in region_grouping: walks the merge order and splits at
  sorted_series transitions when accumulated rows reach the target budget.
- Main engine loop: when num_outputs > current_output_idx + 1 AND region's rows exceed the
  remaining budget, drain sort cols for the region, compute merge order, call
  split_region_at_sorted_series, process sub-regions.
- Per-col page cache + cursor keyed by col_idx so the body-col path can read pages once and re-use
  them across sub-regions within the same top-level region. Resets between top-level regions
  (different RGs).
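
A rough sketch of the splitting rule, assuming a single uniform row target and a hypothetical function name; the real `split_region_at_sorted_series` also has to account for the partially filled current output. A cut is placed only where the `sorted_series` value changes, and only once the current sub-region has reached the target:

```rust
/// Splits a merge-ordered sequence of `sorted_series` values into
/// sub-regions, returned as half-open row ranges. A run of equal values is
/// never broken, so a long run can push a sub-region past `target_rows`.
pub fn split_at_series_transitions<T: PartialEq>(
    series: &[T],
    target_rows: usize,
    max_parts: usize,
) -> Vec<(usize, usize)> {
    let mut parts = Vec::new();
    let mut start = 0;
    for row in 1..series.len() {
        let at_transition = series[row] != series[row - 1];
        let budget_reached = row - start >= target_rows;
        // Keep room for the final part that is pushed after the loop.
        let may_cut = parts.len() + 1 < max_parts;
        if at_transition && budget_reached && may_cut {
            parts.push((start, row));
            start = row;
        }
    }
    if !series.is_empty() {
        parts.push((start, series.len()));
    }
    parts
}
```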

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(streaming): correct 'crash' → 'bail' in MS-2 doc comments

The MS-2 validation path returns `Err` via `bail!()` (anyhow), not a
panic / abort. Five doc-comment / inline-comment sites described the
failure as "the engine would crash mid-merge" — overstated. Callers
get a `Result::Err` propagated up the spawn_blocking task and the
`streaming_merge_sorted_parquet_files` return.

Sites updated:
- `region_grouping.rs` module doc.
- `validate_region_order_matches_physical_rg_order` doc.
- streaming.rs MS-2 validation call-site comment.
- Test docstrings for `test_streaming_merge_with_desc_prefix_col` and
  `test_ms2_region_order_disagrees_with_physical_rg_order_rejected`.

No behaviour change. 477 lib tests pass; clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(streaming): fix wrong adapter type name + explain rejection intent

Two sites referenced a non-existent `LegacyMultiRGAdapter` — the
actual type, introduced in PR-5 (#6408), is `LegacyInputAdapter`
in `storage::legacy_adapter`. Fixed both references.

Also expanded the rejection-block comment to make the *intent* of
the guard explicit: it catches caller bugs (wiring a raw legacy
multi-RG `StreamingParquetReader` straight into the streaming
merge), not a degraded-input fallback. Production code routes
legacy splits through `merge::execute_merge_operation` which
wraps them in `LegacyInputAdapter` first.

No behaviour change. Targeted test passes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(body_assembler): tighten output-iter termination + assert invariant

adamtobey nit on PR #6424: `rows_emitted >= expected_rows` accepts
`emitted > expected` as a normal termination condition, which would
actually be a real accounting bug. The math rules `>` out by
construction — `page_size = remaining.min(OUTPUT_PAGE_ROWS)` where
`remaining = expected_rows - rows_emitted`, so each
`rows_emitted += page_size` keeps `rows_emitted ≤ expected_rows`.

Two changes:
- Termination becomes `rows_emitted == expected_rows` so we don't
  silently accept an overshoot.
- `debug_assert!(rows_emitted <= expected_rows, …)` at the top of
  `next()` documents the invariant and surfaces a regression loudly
  (panic in debug + tests) instead of silently terminating one
  iteration too late.

No behaviour change in the happy path; bugs that would have produced
`>` now fail tests instead of producing wrong output.
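
The accounting can be sketched with a stand-alone iterator that yields page sizes only. `PageSizes` is a hypothetical type and the `OUTPUT_PAGE_ROWS` value is an arbitrary assumption for the example:

```rust
/// Page size used by this sketch; the real constant's value may differ.
const OUTPUT_PAGE_ROWS: usize = 1024;

/// Yields the row count of each output page until exactly
/// `expected_rows` rows have been emitted.
pub struct PageSizes {
    expected_rows: usize,
    rows_emitted: usize,
}

impl PageSizes {
    pub fn new(expected_rows: usize) -> Self {
        PageSizes { expected_rows, rows_emitted: 0 }
    }
}

impl Iterator for PageSizes {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        // Invariant: an overshoot is an accounting bug, not a stop signal.
        debug_assert!(
            self.rows_emitted <= self.expected_rows,
            "emitted {} rows, expected at most {}",
            self.rows_emitted,
            self.expected_rows
        );
        if self.rows_emitted == self.expected_rows {
            return None;
        }
        let remaining = self.expected_rows - self.rows_emitted;
        let page_size = remaining.min(OUTPUT_PAGE_ROWS);
        self.rows_emitted += page_size;
        Some(page_size)
    }
}
```

Since `page_size` is capped by `remaining`, the running total can reach `expected_rows` but never pass it, which is why `==` is a sufficient termination test.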

477 lib tests pass; clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): recompute split budget across the output-rollover boundary

Codex P1 finding on PR #6424: when a top-level region exactly fills
the current output (so `remaining_in_current == 0`) and the next
prefix-aligned region needs splitting, the split's first-sub-region
budget was the stale zero remainder of the about-to-be-finalized
output. `split_region_at_sorted_series` therefore cut after the
first sorted_series run, producing a tiny leftover plus a large
continuation that both inherited the parent region's prefix key.
The sub-region loop then rolled over to a fresh output and wrote
both pieces there, tripping the PA-3 duplicate-prefix-RG check in
`finalize_output`.

Fix: detect the rollover at decision time and compute
`effective_first_target` / `effective_outputs_remaining` against the
*next* output's empty budget. With the fix, the example above just
chooses `needs_split = false` (region fits the fresh output's full
target), processes the region whole, and rolls over cleanly.
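
One way to picture the decision, as a sketch only: the struct, the function names and the exact split condition below are assumptions chosen to mirror the description above, not the engine's code.

```rust
/// Budget seen by the split decision for the next region (hypothetical).
#[derive(Debug, PartialEq)]
pub struct SplitBudget {
    /// Rows the first sub-region may take.
    pub first_target: usize,
    /// Outputs still available, counting the one being written to.
    pub outputs_remaining: usize,
}

/// If the current output is exactly full and another output exists, the
/// next region starts a fresh output, so budget against that one.
pub fn effective_budget(
    target_per_output: usize,
    rows_in_current: usize,
    num_outputs: usize,
    current_output_idx: usize,
) -> SplitBudget {
    let remaining_in_current = target_per_output.saturating_sub(rows_in_current);
    let outputs_after_current = num_outputs.saturating_sub(current_output_idx + 1);
    if remaining_in_current == 0 && outputs_after_current > 0 {
        SplitBudget {
            first_target: target_per_output,
            outputs_remaining: outputs_after_current,
        }
    } else {
        SplitBudget {
            first_target: remaining_in_current,
            outputs_remaining: outputs_after_current + 1,
        }
    }
}

/// A region is split only if it overflows the budget and there is
/// another output to spill into.
pub fn needs_split(region_rows: usize, budget: &SplitBudget) -> bool {
    budget.outputs_remaining > 1 && region_rows > budget.first_target
}
```

For the regression scenario (target 50, three outputs, output 0 already holding 50 rows), the budget for the second 50-row region becomes a fresh 50 rows, so no split is requested.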

Regression test `test_region_exactly_fills_output_does_not_split_next_aligned_region`
exercises the exact scenario Codex described: three 50-row RGs with
distinct (metric, service) prefixes, `num_outputs = 3`, target = 50.
Pre-fix, the merge bailed with PA-3 on output 1; post-fix, three
clean outputs each with one unique prefix key. Verified by reading
each output's parquet metadata back through
`assert_unique_rg_prefix_keys`.

478 lib tests pass (477 prior + 1 new); clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): reject null-mixed + all-null prefix RGs

Codex P1 on PR #6424: `extract_aligned_prefix_value` decided
prefix alignment purely from `min` / `max` statistics. Parquet
records those over non-null cells only, with `null_count` reported
separately, so two real failure modes slipped through:

1. **Mixed null + non-null.** A row group with `N` nulls plus a
   single non-null cell `"x"` reports `min == max == "x"` and the
   `min == max` check silently accepted it — but two distinct
   prefix keys (null and `"x"`) lived in that RG, breaking the
   at-most-one-prefix-value-per-RG invariant (PA-1).
2. **All-null RG.** Parquet records no `min` / `max` for an all-
   null chunk, so the legacy check bailed with the misleading "no
   min in stats" error. Logically the RG carries one prefix value
   (null) and is aligned — but supporting it cleanly requires a
   null marker in the composite-key encoding that agrees with
   SS-2's "nulls last" rule. `encode_byte_array_prefix(&[])` puts
   nulls *first*; coordinating that with SS-2 is a follow-up.

Fix: read `null_count_opt()` from stats and `num_values()` from
the column-chunk metadata. Bail explicitly in both cases — mixed
with a PA-1 message naming the (nulls, non-null) split, all-null
with a clear "not yet supported" pointer.
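
A sketch of the classification as of this commit, taking the two counts as plain integers. It assumes `num_values` includes null cells; `PrefixChunk`, `classify_prefix_chunk` and `validate_prefix_chunk` are hypothetical names:

```rust
#[derive(Debug, PartialEq)]
pub enum PrefixChunk {
    /// No nulls: the `min == max` check on statistics can be trusted.
    NoNulls,
    /// Every cell is null: parquet records no min/max.
    AllNull,
    /// Nulls and values share the row group: two prefix keys in one RG.
    Mixed { nulls: u64, non_null: u64 },
}

pub fn classify_prefix_chunk(null_count: u64, num_values: u64) -> PrefixChunk {
    if null_count == 0 {
        PrefixChunk::NoNulls
    } else if null_count >= num_values {
        PrefixChunk::AllNull
    } else {
        PrefixChunk::Mixed { nulls: null_count, non_null: num_values - null_count }
    }
}

/// Behaviour described by this commit: both null cases are rejected,
/// each with its own message.
pub fn validate_prefix_chunk(null_count: u64, num_values: u64) -> Result<(), String> {
    match classify_prefix_chunk(null_count, num_values) {
        PrefixChunk::NoNulls => Ok(()),
        PrefixChunk::Mixed { nulls, non_null } => Err(format!(
            "PA-1 violation: {} null and {} non-null prefix cells in one row group",
            nulls, non_null
        )),
        PrefixChunk::AllNull => {
            Err("all-null prefix row group is not yet supported".to_string())
        }
    }
}
```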

Two regression tests in `streaming.rs::tests`:
- `test_mixed_null_and_value_prefix_rg_rejected`: 1 RG, 3 cells
  `"cpu.usage"` + 1 null. Asserts PA-1 bail.
- `test_all_null_prefix_rg_rejected`: 1 RG, 3 nulls. Asserts the
  "all-null … not yet supported" bail.

480 lib tests pass (+2 new); workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(streaming): share storekey prefix encoding with sorted_series

The per-RG composite prefix key now uses the same storekey-based
encoding as `sorted_series` — same `(ordinal, value)` layout, same
direction-inversion, same null-skip pattern — so a per-RG prefix
key is a literal byte prefix of every `sorted_series` value emitted
by rows in that RG.

Why: the prior byte-stuffed escape encoding had no in-line way to
represent an all-null prefix RG (an empty marker would lex-sort
before any present-value key, conflicting with SS-2 nulls-last).
With the shared encoding, an all-null column is skipped entirely
and the next column's higher ordinal byte appears in its place,
giving nulls-last ordering for free — the same trick already proven
in `sorted_series::encode_row_key`.

Per-column logic now goes through one helper:

  `crate::sorted_series::append_prefix_col_to_key(buf, ord, val, desc)`

shared between `sorted_series` (per-row keys) and
`merge::streaming::region_grouping` (per-RG keys). It writes
`storekey(ord) || storekey(val)` and inverts only the value bytes
for DESC columns. `sorted_series::encode_row_key` was refactored
to call the helper; the open-coded inline encoding is gone.

Trailing **prefix-length sentinel**: each per-RG key ends with a
`u8(prefix_len)` ordinal byte. This handles the prefix_len=1
edge case where an all-null RG's empty body would otherwise lex-sort
*before* any non-null RG — with the sentinel, the all-null key
becomes `[prefix_len]` and non-null keys still start with `ord(0)`
(< prefix_len), so non-null sorts first. The sentinel is also what
`sorted_series` writes immediately after the prefix cols, so the
literal-prefix property is preserved.
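
The layout can be illustrated with a simplified key builder. This is a sketch under loud assumptions: the value encoding (raw bytes plus a `0x00` terminator) is a placeholder rather than storekey's real format, DESC inversion is omitted, and `rg_prefix_key` is a hypothetical name.

```rust
/// Builds a per-RG composite key from the prefix columns' values.
/// `None` means the column is all-null in this row group.
///
/// Layout: for every non-null column, its ordinal byte followed by the
/// encoded value; null columns are skipped; the key ends with
/// `prefix_len` as a sentinel byte.
pub fn rg_prefix_key(cols: &[Option<&[u8]>]) -> Vec<u8> {
    let mut key = Vec::new();
    for (ordinal, col) in cols.iter().enumerate() {
        if let Some(value) = col {
            key.push(ordinal as u8);
            // Placeholder value encoding, NOT storekey's format.
            key.extend_from_slice(value);
            key.push(0x00);
        }
    }
    // Trailing sentinel: larger than every ordinal used above.
    key.push(cols.len() as u8);
    key
}
```

With `prefix_len = 1`, an all-null row group encodes to `[1]` while any non-null key starts with ordinal `0`, so the non-null key sorts first. With `prefix_len = 2`, a null second column leaves the sentinel `2` where a present column would have written ordinal `1`, again placing the null case last.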

Null handling in `extract_rg_composite_prefix_key`:
- **All-null RG**: column skipped, RG groups into its own region (after non-null regions).
- **Mixed null + non-null**: rejected as a PA-1 violation (rows in the same RG would encode to
  two distinct prefix keys; producer is supposed to start a new RG at the null/non-null
  transition).
- **Otherwise**: standard `min == max` check, then the type-dispatched storekey encoding via the
  helper.

Removed:
- `extract_aligned_prefix_value` (replaced by `encode_prefix_col_value` which calls the helper).
- `encode_byte_array_prefix` (byte-stuffed escape, no longer used).
- `invert_for_descending` (the helper handles inversion per-column).
- `test_invert_for_descending_reverses_lex_order` and
  `test_byte_array_prefix_preserves_lex_order_across_lengths` (byte-level tests of the removed
  encoding; semantic properties remain enforced by `storekey`'s own tests plus the higher-level
  prefix tests).

Replaced `test_all_null_prefix_rg_rejected` with
`test_all_null_prefix_rg_groups_into_separate_region_sorted_last`:
builds two inputs (one with `metric_name = "cpu.usage"`, one with
`metric_name = NULL`) and verifies the merged output has two RGs
with the all-null region in RG 1 (sorted after the non-null
region) — pinning the nulls-last ordering that the sentinel
encoding produces.

Updated `test_extract_rg_composite_prefix_key_two_byte_array_cols`
for the new byte layout (`storekey(ord) || storekey(val)` per col
plus the trailing sentinel byte).

`PrefixColumn` gains an `ordinal: u8` field, populated from each
column's position in `qh.sort_fields` so it matches the ordinal
`sorted_series` would assign.

478 lib tests pass; workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(streaming): re-fmt to latest nightly rustfmt

CI's nightly rustfmt (1.9.0-nightly 2026-05-17) wrapped a handful
of comment / bail!-message / where-clause / vec! literal lines
slightly differently than my local nightly at commit time
(1.9.0-nightly 2026-05-11). Re-formatting all three affected files
catches the drift in this commit so CI Lints stops complaining;
local nightly is now updated to match CI.

No behaviour change. 478 lib tests still pass on the slice.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(merge): preserve metastore rg_partition_prefix_len from writer's KV stamp

The streaming merge engine produces sort-prefix-aligned multi-RG output
and stamps `qh.rg_partition_prefix_len = input_meta.rg_partition_prefix_len`
in the file's KV (verified by `assert_unique_rg_prefix_keys` before close).
`merge_parquet_split_metadata` then ran afterwards and unconditionally demoted the value
to 0 whenever `output.num_row_groups > 1` — breaking CS-1 (metastore must
mirror on-disk KV) for every multi-RG streaming-engine output. Aligned
splits got tagged 0 in the metastore on every merge and leaked out of
the prefix-aligned compaction bucket on the next pass.

Carry the value the writer actually stamped via a new
`MergeOutputFile.output_rg_partition_prefix_len` field, then propagate
it as-is in metadata aggregation. Both engines populate the field:
- Legacy `merge/writer.rs` reports its demoted value (row-count-driven
  RG boundaries can't honor prefix alignment, so it stamps 0 on multi-RG).
- Streaming `merge/streaming/output.rs` reports the inputs' prefix
  unchanged (it splits at prefix transitions and the writer verifies).

CS-1 holds by construction — same source of truth, no re-derivation.

Tests:
- `test_output_prefix_len_demoted_when_multi_rg` → renamed to
  `test_output_prefix_len_carries_writers_value_when_demoted`; now
  asserts that the metastore mirrors the writer's reported value.
- New `test_output_prefix_len_preserved_on_multi_rg_streaming_engine`
  asserts that a multi-RG streaming output (writer reports prefix_len=2)
  keeps the prefix in the metastore — the regression case for F1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>