Skip to content

feat(merge-executor): YAML flag — streaming engine as default for regular merges, in-memory as fallback#6441

Open
g-talbot wants to merge 9 commits into
gtt/adversarial-review-test-coveragefrom
gtt/parquet-merge-streaming-engine-flag
Open

feat(merge-executor): YAML flag — streaming engine as default for regular merges, in-memory as fallback#6441
g-talbot wants to merge 9 commits into
gtt/adversarial-review-test-coveragefrom
gtt/parquet-merge-streaming-engine-flag

Conversation

@g-talbot
Copy link
Copy Markdown
Contributor

@g-talbot g-talbot commented May 18, 2026

Summary

Wires the streaming Parquet merge engine into the regular (non-promotion) merge path behind a new YAML feature flag.

  • Adds indexer.parquet_merge_use_streaming_engine: bool to IndexerConfig (default: false).
  • Plumbs the flag from IndexerConfigIndexingServiceParquetMergePipelineParamsParquetMergeExecutor::new.
  • Updates ParquetMergeExecutor::handle to route on the flag: if promotion || use_streaming { streaming } else { in-memory fallback }. Promotion merges still always use the streaming engine (the in-memory path can't handle mixed rg_partition_prefix_len).
  • The in-memory merge_sorted_parquet_files path stays in place as the runtime fallback so production can flip back via YAML without a redeploy if the streaming engine hits a bug. To be removed once the streaming engine has soaked.
  • Adds engine-parity tests in quickwit-parquet-engine/src/merge/tests.rs that run both engines on the same realistic fixture and assert row-by-row column equivalence. These gate the eventual default flip.
  • Adds test_merge_pipeline_end_to_end_with_streaming_engine_flag — an actor-chain integration test that runs the full planner → downloader → executor → uploader → publisher pipeline with the flag set, and asserts (a) the merge published, (b) PEAK_BODY_COL_PAGE_CACHE_LEN > 0 after the merge (the streaming engine writes to this atomic; in-memory never does — non-zero is direct evidence the streaming path executed), (c) the output row count and metric names are correct.
  • Exercises the new YAML field in the quickwit-config YAML/JSON/TOML test fixtures so parse / serde regressions get caught.
  • Moves the MS-7 serial lock out of the streaming-tests submodule to module scope so the new parity tests can share it (any streaming-merge test must serialise against MS-7's reset/load of PEAK_BODY_COL_PAGE_CACHE_LEN).
  • Records the dual-engine state as Deviation Add a CLA #1 under docs/internals/adr/deviations/. The doc cites the ADR-003 §4 streaming-merge requirement, lays out exit criteria (default flipped + ≥ 2-week production soak + no rollback), and lists the cleanup PR's tasks.

This is the GAP-011-adjacent wire-in described in the streaming-merge roadmap memory: until the planner is taught to emit promote_legacy operations (GAP-011), the streaming engine sees no regular merges in production. Flipping this flag changes that — but only when the operator opts in.

Rollout plan

  1. Merge with default false. CI parity tests + the new pipeline integration test guard correctness on synthetic fixtures.
  2. Set true on a soak fleet, run for a planned window (≥ 2 weeks), monitor for divergence vs. the in-memory path.
  3. Flip the default to true in a follow-up PR.
  4. After production has soaked at the new default, a separate PR deletes the in-memory branch, merge_sorted_parquet_files, the flag, the parity tests, and closes Deviation Add a CLA #1.

Test plan

  • cargo test -p quickwit-parquet-engine --lib — 504 tests pass (incl. two new parity tests).
  • cargo test -p quickwit-indexing --all-features --lib parquet_merge_pipeline_test — both end-to-end tests pass (the original in-memory one and the new streaming-engine-flag one).
  • cargo test -p quickwit-config --lib node_config — 27 tests pass with the new field exercised in YAML/JSON/TOML.
  • cargo clippy --workspace --all-features --tests with RUSTFLAGS="--cfg tokio_unstable -Dwarnings" — clean.
  • cargo doc -p quickwit-config -p quickwit-parquet-engine -p quickwit-indexing --no-deps --all-features with RUSTDOCFLAGS="-Dwarnings" — clean.
  • cargo machete — no unused deps.
  • cargo +nightly fmt --all -- --check — clean on touched files.
  • Soak on a single-fleet rollout once base PRs land.

Architecture evolution

Stack

Stacked on #6428 (gtt/adversarial-review-test-coverage).

🤖 Generated with Claude Code

@g-talbot g-talbot requested review from a team as code owners May 18, 2026 16:38
@g-talbot g-talbot force-pushed the gtt/adversarial-review-test-coverage branch from 0690fe1 to 4ab52a4 Compare May 18, 2026 18:17
g-talbot and others added 9 commits May 18, 2026 14:18
Adds `merge::tests::parity` with two tests that run the same realistic
input fixture through both `merge_sorted_parquet_files` (in-memory
engine) and `execute_merge_operation` (streaming engine over the same
`LocalFileByteSource` the executor uses in production), then asserts
row-by-row equivalence on every visible column. These gate the upcoming
YAML flag that flips regular merges to the streaming engine: parity
must hold before the default is flipped in production.

The streaming engine writes a process-global atomic
(`PEAK_BODY_COL_PAGE_CACHE_LEN`) that the MS-7 tests reset-then-read.
Any test that runs a streaming merge must serialise against MS-7 or
inflate its readings. Move `ms7_serial_lock` from the streaming-tests
submodule to module scope (still `#[cfg(test)] pub(crate)`) so the new
parity tests acquire the same lock.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…treaming engine

Wires the streaming Parquet merge engine into the regular (non-promotion)
merge path behind a node-level YAML flag,
`indexer.parquet_merge_use_streaming_engine`, defaulted to false. When
true, `ParquetMergeExecutor::handle` runs every merge through
`execute_merge_operation` (the column-major, page-bounded streaming
engine) instead of the in-memory `merge_sorted_parquet_files`.
Promotion merges (`target_prefix_len_override.is_some()`) continue to
take the streaming path unconditionally — the in-memory engine can't
handle mixed `rg_partition_prefix_len` inputs.

The in-memory engine stays in place as the runtime fallback. If the
streaming engine hits a bug in production, an operator can flip the
flag back to `false` via YAML without redeploying. Once the streaming
path has soaked, the fallback branch and `merge_sorted_parquet_files`
itself can be removed.

The flag is plumbed `IndexerConfig` → `IndexingService` →
`ParquetMergePipelineParams` → `ParquetMergeExecutor::new`, and exercised
end-to-end by the engine parity tests in the previous commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…N/TOML fixtures

Adds the new flag to the YAML, JSON, and TOML node-config test fixtures
and bumps the expected `IndexerConfig` in `node_config_parse_*` to
`parquet_merge_use_streaming_engine: true`. Catches parse / serde
regressions on the field — e.g., a rename or a default-fn typo would
fail the test instead of silently parsing as `false`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds `test_merge_pipeline_end_to_end_with_streaming_engine_flag`, an
integration test that runs the full actor chain (planner → downloader
→ executor → uploader → publisher) with
`ParquetMergePipelineParams::use_streaming_engine = true`. Asserts:

1. Publish fired with the right replaced_split_ids (merge ran
   end-to-end through the executor).
2. `PEAK_BODY_COL_PAGE_CACHE_LEN > 0` after the merge. The streaming
   engine increments this on every body-col page assembly; the
   in-memory engine never touches it. Non-zero is direct evidence
   the streaming path executed — not a silent fallback to in-memory.
3. The merge output row count and metric names are correct.

To make assertion (2) work cross-crate, exposes
`PEAK_BODY_COL_PAGE_CACHE_LEN` as `pub` under
`#[cfg(any(test, feature = "testsuite"))]`. The visibility widening is
test-only — production builds never see the symbol.

This is the closest analog to the sesh-mode "production-path" rule
that is feasible today: the metrics pipeline's OTLP gRPC ingest path
is not yet wired into `quickwit-serve`, so the closest end-to-end
test is the actor-chain integration test that this PR adds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Captures the intentional, time-bounded divergence from ADR-003 §4
introduced by the streaming-engine wire-in: two engines coexist in
production behind `IndexerConfig::parquet_merge_use_streaming_engine`,
with the in-memory engine retained as the runtime fallback.

Documents:
- The ADR-003 §4 quote the deviation diverges from (page-granular
  streaming, bounded memory).
- The current dual-engine implementation and routing logic.
- Why this exists (production safety, staged rollout, parity is
  strong-but-not-total).
- Explicit exit criteria: default flipped to `true`, ≥ 2-week
  production soak with no merge-correctness incidents, no rollback.
  When met, a follow-up PR deletes the in-memory branch and engine,
  the flag, and the parity tests.

This is the first deviation recorded under the EVOLUTION.md framework.
Indexes the doc in `deviations/README.md`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…gine tests

Extracts the steps-5-through-8 assertions (replaced_split_ids, staged
metadata, Parquet file content, Parquet KV headers) into
`assert_cpu_mem_merge_outputs_correct` and calls it from both
`test_merge_pipeline_end_to_end` (in-memory engine) and
`test_merge_pipeline_end_to_end_with_streaming_engine_flag` (streaming
engine). The streaming-engine test had been doing only a small subset
of the checks — row count and metric names. It now runs the full
contract: time_range, num_merge_ops, sort_fields, row_keys_proto,
zonemap_regexes, low_cardinality_tags, all 100 timestamps,
sorted_series monotonicity, cpu/mem sort-order semantics, and every
`qh.*` Parquet KV header.

By construction both engines must produce a file that satisfies the
same contract — the helper is the executable parity between engines
at the pipeline-integration level, complementing the column-level
parity tests in `quickwit-parquet-engine::merge::tests::parity`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ointness

Expands test coverage along three axes the existing helpers didn't hit:

1. **Multi-input, multi-metric pipeline tests** (new file
   `parquet_merge_pipeline_multi_metric_test.rs`). Three inputs, each
   carrying three metrics with overlapping per-metric timeseries IDs
   and overlapping-but-distinct timestamps — the merge must
   row-by-row interleave across all three inputs. Output writer uses
   `row_group_size = 50` so the 180-row merge output breaks into
   four row groups, exercising the writer's multi-RG path in both
   engines. Both engine variants (in-memory + streaming) covered.
   Streaming-engine test asserts `PEAK_BODY_COL_PAGE_CACHE_LEN > 0`
   to confirm the flag routed through the streaming path.

2. **Engine-level multi-output contract** in
   `merge::tests::parity::assert_engine_parity`. Beyond the existing
   engine-vs-engine column equivalence, every parity test now also
   verifies on the in-memory engine's outputs (equivalent to the
   streaming engine's): sum of per-output row counts equals total
   input rows, each output internally monotonic on `sorted_series`,
   and across outputs the partition is disjoint (no two outputs
   share any `sorted_series` value). This is the m:n
   non-overlap contract.

3. **Multi-metric overlapping-input m:n** test
   `parity_multi_metric_overlapping_inputs_multi_output` exercises
   the strengthened contract with three inputs × three metrics where
   per-metric keyspaces overlap across inputs. n = 3 outputs target.

Honest scope note in the new pipeline test module's doc: the actor
pipeline today hardcodes `num_outputs = 1` in `ParquetMergeExecutor`,
so n > 1 is not reachable end-to-end through the actor system. The
new engine-level test covers the n > 1 correctness contract for now;
when the executor is taught to accept `num_outputs > 1` from the
merge policy, the pipeline tests can grow an n > 1 variant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the hardcoded `MergeConfig { num_outputs: 1, ... }` in
`ParquetMergeExecutor::handle` with a per-merge computation:

    num_outputs = max(1, ceil(total_input_bytes / target_split_size_bytes))

So a merge that ingests more than one target's worth of data spreads
across multiple output files; merges that fit in one target keep
producing a single output (preserving today's behavior for the
common case). The engine clamps the request to the number of
`sorted_series` boundaries actually available, so the value is an
upper bound, not an exact count.

Plumbing: `IndexerConfig` already carries `target_split_size_bytes`
in `ParquetMergePolicyConfig`. Pass that through
`ParquetMergePipelineParams.target_split_size_bytes` →
`ParquetMergeExecutor::new`. Default for tests:
`256 * 1024 * 1024` (matches the production default).

Latent multi-output bug fixed at the same time: with n>1, the
executor used to assign the planner-supplied `merge_split_id` to
**every** output split, which would have collided on the rename to
`{split_id}.parquet`. First output keeps the planner ID for
observability continuity; subsequent outputs use the fresh IDs
generated by `merge_parquet_split_metadata`.

Also exposes `quickwit_parquet_engine::merge::streaming::ms7_serial_lock`
as `pub` under the `testsuite` feature so cross-crate streaming tests
(in `quickwit-indexing`) can serialise against the same global lock
the in-crate MS-7 tests use. The streaming engine writes to a
process-global atomic on every merge — without shared locking, the
existing pipeline streaming-engine test races `store(0)` against
other tests' merges. Adds the appropriate
`#[allow(clippy::await_holding_lock)]` to the in-crate
`test_merge_pipeline_end_to_end_with_streaming_engine_flag` to
match.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the bonus scenario: three multi-metric inputs each written with
`rg_partition_prefix_len = 1` and one row group per distinct
metric_name (via `row_group_size = ROWS_PER_METRIC_PER_INPUT` so the
writer flushes at every metric boundary after sorting). Merged with a
small `ParquetMergePipelineParams::target_split_size_bytes = 500`
that forces the executor's `num_outputs` calculation to ask the
engine for multiple outputs — exercising the m:n merge path now
reachable through the actor pipeline (PR's earlier commit removed
the `num_outputs = 1` hardcode).

Both engines covered:

- `test_prefix_aligned_multi_metric_three_input_multi_output_in_memory_engine`
- `test_prefix_aligned_multi_metric_three_input_multi_output_streaming_engine`

The streaming-engine variant also asserts
`PEAK_BODY_COL_PAGE_CACHE_LEN > 0` (under `ms7_serial_lock`) so a
silent fallback to the in-memory path would fail.

The shared assertion helper
`assert_three_input_three_metric_multi_output_correct` checks the
m:n contract end-to-end at the pipeline level:

- All three input splits replaced.
- ≥ 2 output splits staged (proves splitting happened).
- Sum of per-output row counts = total input rows.
- Each output internally monotonic on `sorted_series`.
- Across outputs, the `sorted_series` partition is disjoint — no
  two outputs share any key, which is the "non-overlapping output"
  contract the engine promises.
- Union of metric_names / services across outputs = full input set.
- Every output has `num_merge_ops = 1`, `row_keys_proto`, and a
  `metric_name` zonemap regex.

To pin the test to exactly one merge (not a cascade of merges over
the now-multiple staged outputs), `make_pipeline_params` now takes
`max_merge_ops` and the bonus tests set it to `1`: outputs land at
`num_merge_ops = 1`, equal to the policy ceiling, and the planner
refuses to merge them again. The existing n=1 tests stay at 5
(headroom — they produce a single output that can't trigger another
merge anyway, since `merge_factor = 3`).

Updates the module doc to drop the now-stale scope note about m:n
not being reachable through the pipeline.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/parquet-merge-streaming-engine-flag branch from 452a5d2 to 6be2cd9 Compare May 18, 2026 18:18
@g-talbot g-talbot requested a review from adamtobey May 18, 2026 18:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants