feat(merge-executor): YAML flag — streaming engine as default for regular merges, in-memory as fallback#6441
Open
g-talbot wants to merge 9 commits into
Conversation
0690fe1 to
4ab52a4
Compare
Adds `merge::tests::parity` with two tests that run the same realistic input fixture through both `merge_sorted_parquet_files` (in-memory engine) and `execute_merge_operation` (streaming engine over the same `LocalFileByteSource` the executor uses in production), then asserts row-by-row equivalence on every visible column. These gate the upcoming YAML flag that flips regular merges to the streaming engine: parity must hold before the default is flipped in production. The streaming engine writes a process-global atomic (`PEAK_BODY_COL_PAGE_CACHE_LEN`) that the MS-7 tests reset-then-read. Any test that runs a streaming merge must serialise against MS-7 or inflate its readings. Move `ms7_serial_lock` from the streaming-tests submodule to module scope (still `#[cfg(test)] pub(crate)`) so the new parity tests acquire the same lock. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…treaming engine Wires the streaming Parquet merge engine into the regular (non-promotion) merge path behind a node-level YAML flag, `indexer.parquet_merge_use_streaming_engine`, defaulted to false. When true, `ParquetMergeExecutor::handle` runs every merge through `execute_merge_operation` (the column-major, page-bounded streaming engine) instead of the in-memory `merge_sorted_parquet_files`. Promotion merges (`target_prefix_len_override.is_some()`) continue to take the streaming path unconditionally — the in-memory engine can't handle mixed `rg_partition_prefix_len` inputs. The in-memory engine stays in place as the runtime fallback. If the streaming engine hits a bug in production, an operator can flip the flag back to `false` via YAML without redeploying. Once the streaming path has soaked, the fallback branch and `merge_sorted_parquet_files` itself can be removed. The flag is plumbed `IndexerConfig` → `IndexingService` → `ParquetMergePipelineParams` → `ParquetMergeExecutor::new`, and exercised end-to-end by the engine parity tests in the previous commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…N/TOML fixtures Adds the new flag to the YAML, JSON, and TOML node-config test fixtures and bumps the expected `IndexerConfig` in `node_config_parse_*` to `parquet_merge_use_streaming_engine: true`. Catches parse / serde regressions on the field — e.g., a rename or a default-fn typo would fail the test instead of silently parsing as `false`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds `test_merge_pipeline_end_to_end_with_streaming_engine_flag`, an integration test that runs the full actor chain (planner → downloader → executor → uploader → publisher) with `ParquetMergePipelineParams::use_streaming_engine = true`. Asserts: 1. Publish fired with the right replaced_split_ids (merge ran end-to-end through the executor). 2. `PEAK_BODY_COL_PAGE_CACHE_LEN > 0` after the merge. The streaming engine increments this on every body-col page assembly; the in-memory engine never touches it. Non-zero is direct evidence the streaming path executed — not a silent fallback to in-memory. 3. The merge output row count and metric names are correct. To make assertion (2) work cross-crate, exposes `PEAK_BODY_COL_PAGE_CACHE_LEN` as `pub` under `#[cfg(any(test, feature = "testsuite"))]`. The visibility widening is test-only — production builds never see the symbol. This is the closest analog to the sesh-mode "production-path" rule that is feasible today: the metrics pipeline's OTLP gRPC ingest path is not yet wired into `quickwit-serve`, so the closest end-to-end test is the actor-chain integration test that this PR adds. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Captures the intentional, time-bounded divergence from ADR-003 §4 introduced by the streaming-engine wire-in: two engines coexist in production behind `IndexerConfig::parquet_merge_use_streaming_engine`, with the in-memory engine retained as the runtime fallback. Documents: - The ADR-003 §4 quote the deviation diverges from (page-granular streaming, bounded memory). - The current dual-engine implementation and routing logic. - Why this exists (production safety, staged rollout, parity is strong-but-not-total). - Explicit exit criteria: default flipped to `true`, ≥ 2-week production soak with no merge-correctness incidents, no rollback. When met, a follow-up PR deletes the in-memory branch and engine, the flag, and the parity tests. This is the first deviation recorded under the EVOLUTION.md framework. Indexes the doc in `deviations/README.md`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…gine tests Extracts the steps-5-through-8 assertions (replaced_split_ids, staged metadata, Parquet file content, Parquet KV headers) into `assert_cpu_mem_merge_outputs_correct` and calls it from both `test_merge_pipeline_end_to_end` (in-memory engine) and `test_merge_pipeline_end_to_end_with_streaming_engine_flag` (streaming engine). The streaming-engine test had been doing only a small subset of the checks — row count and metric names. It now runs the full contract: time_range, num_merge_ops, sort_fields, row_keys_proto, zonemap_regexes, low_cardinality_tags, all 100 timestamps, sorted_series monotonicity, cpu/mem sort-order semantics, and every `qh.*` Parquet KV header. By construction both engines must produce a file that satisfies the same contract — the helper is the executable parity between engines at the pipeline-integration level, complementing the column-level parity tests in `quickwit-parquet-engine::merge::tests::parity`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ointness Expands test coverage along three axes the existing helpers didn't hit: 1. **Multi-input, multi-metric pipeline tests** (new file `parquet_merge_pipeline_multi_metric_test.rs`). Three inputs, each carrying three metrics with overlapping per-metric timeseries IDs and overlapping-but-distinct timestamps — the merge must row-by-row interleave across all three inputs. Output writer uses `row_group_size = 50` so the 180-row merge output breaks into four row groups, exercising the writer's multi-RG path in both engines. Both engine variants (in-memory + streaming) covered. Streaming-engine test asserts `PEAK_BODY_COL_PAGE_CACHE_LEN > 0` to confirm the flag routed through the streaming path. 2. **Engine-level multi-output contract** in `merge::tests::parity::assert_engine_parity`. Beyond the existing engine-vs-engine column equivalence, every parity test now also verifies on the in-memory engine's outputs (equivalent to the streaming engine's): sum of per-output row counts equals total input rows, each output internally monotonic on `sorted_series`, and across outputs the partition is disjoint (no two outputs share any `sorted_series` value). This is the m:n non-overlap contract. 3. **Multi-metric overlapping-input m:n** test `parity_multi_metric_overlapping_inputs_multi_output` exercises the strengthened contract with three inputs × three metrics where per-metric keyspaces overlap across inputs. n = 3 outputs target. Honest scope note in the new pipeline test module's doc: the actor pipeline today hardcodes `num_outputs = 1` in `ParquetMergeExecutor`, so n > 1 is not reachable end-to-end through the actor system. The new engine-level test covers the n > 1 correctness contract for now; when the executor is taught to accept `num_outputs > 1` from the merge policy, the pipeline tests can grow an n > 1 variant. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the hardcoded `MergeConfig { num_outputs: 1, ... }` in
`ParquetMergeExecutor::handle` with a per-merge computation:
num_outputs = max(1, ceil(total_input_bytes / target_split_size_bytes))
So a merge that ingests more than one target's worth of data spreads
across multiple output files; merges that fit in one target keep
producing a single output (preserving today's behavior for the
common case). The engine clamps the request to the number of
`sorted_series` boundaries actually available, so the value is an
upper bound, not an exact count.
Plumbing: `IndexerConfig` already carries `target_split_size_bytes`
in `ParquetMergePolicyConfig`. Pass that through
`ParquetMergePipelineParams.target_split_size_bytes` →
`ParquetMergeExecutor::new`. Default for tests:
`256 * 1024 * 1024` (matches the production default).
Latent multi-output bug fixed at the same time: with n>1, the
executor used to assign the planner-supplied `merge_split_id` to
**every** output split, which would have collided on the rename to
`{split_id}.parquet`. First output keeps the planner ID for
observability continuity; subsequent outputs use the fresh IDs
generated by `merge_parquet_split_metadata`.
Also exposes `quickwit_parquet_engine::merge::streaming::ms7_serial_lock`
as `pub` under the `testsuite` feature so cross-crate streaming tests
(in `quickwit-indexing`) can serialise against the same global lock
the in-crate MS-7 tests use. The streaming engine writes to a
process-global atomic on every merge — without shared locking, the
existing pipeline streaming-engine test races `store(0)` against
other tests' merges. Adds the appropriate
`#[allow(clippy::await_holding_lock)]` to the in-crate
`test_merge_pipeline_end_to_end_with_streaming_engine_flag` to
match.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the bonus scenario: three multi-metric inputs each written with `rg_partition_prefix_len = 1` and one row group per distinct metric_name (via `row_group_size = ROWS_PER_METRIC_PER_INPUT` so the writer flushes at every metric boundary after sorting). Merged with a small `ParquetMergePipelineParams::target_split_size_bytes = 500` that forces the executor's `num_outputs` calculation to ask the engine for multiple outputs — exercising the m:n merge path now reachable through the actor pipeline (PR's earlier commit removed the `num_outputs = 1` hardcode). Both engines covered: - `test_prefix_aligned_multi_metric_three_input_multi_output_in_memory_engine` - `test_prefix_aligned_multi_metric_three_input_multi_output_streaming_engine` The streaming-engine variant also asserts `PEAK_BODY_COL_PAGE_CACHE_LEN > 0` (under `ms7_serial_lock`) so a silent fallback to the in-memory path would fail. The shared assertion helper `assert_three_input_three_metric_multi_output_correct` checks the m:n contract end-to-end at the pipeline level: - All three input splits replaced. - ≥ 2 output splits staged (proves splitting happened). - Sum of per-output row counts = total input rows. - Each output internally monotonic on `sorted_series`. - Across outputs, the `sorted_series` partition is disjoint — no two outputs share any key, which is the "non-overlapping output" contract the engine promises. - Union of metric_names / services across outputs = full input set. - Every output has `num_merge_ops = 1`, `row_keys_proto`, and a `metric_name` zonemap regex. To pin the test to exactly one merge (not a cascade of merges over the now-multiple staged outputs), `make_pipeline_params` now takes `max_merge_ops` and the bonus tests set it to `1`: outputs land at `num_merge_ops = 1`, equal to the policy ceiling, and the planner refuses to merge them again. The existing n=1 tests stay at 5 (headroom — they produce a single output that can't trigger another merge anyway, since `merge_factor = 3`). Updates the module doc to drop the now-stale scope note about m:n not being reachable through the pipeline. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
452a5d2 to
6be2cd9
Compare
adamtobey
approved these changes
May 18, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Wires the streaming Parquet merge engine into the regular (non-promotion) merge path behind a new YAML feature flag.
indexer.parquet_merge_use_streaming_engine: booltoIndexerConfig(default:false).IndexerConfig→IndexingService→ParquetMergePipelineParams→ParquetMergeExecutor::new.ParquetMergeExecutor::handleto route on the flag:if promotion || use_streaming { streaming } else { in-memory fallback }. Promotion merges still always use the streaming engine (the in-memory path can't handle mixedrg_partition_prefix_len).merge_sorted_parquet_filespath stays in place as the runtime fallback so production can flip back via YAML without a redeploy if the streaming engine hits a bug. To be removed once the streaming engine has soaked.quickwit-parquet-engine/src/merge/tests.rsthat run both engines on the same realistic fixture and assert row-by-row column equivalence. These gate the eventual default flip.test_merge_pipeline_end_to_end_with_streaming_engine_flag— an actor-chain integration test that runs the full planner → downloader → executor → uploader → publisher pipeline with the flag set, and asserts (a) the merge published, (b)PEAK_BODY_COL_PAGE_CACHE_LEN > 0after the merge (the streaming engine writes to this atomic; in-memory never does — non-zero is direct evidence the streaming path executed), (c) the output row count and metric names are correct.quickwit-configYAML/JSON/TOML test fixtures so parse / serde regressions get caught.PEAK_BODY_COL_PAGE_CACHE_LEN).docs/internals/adr/deviations/. The doc cites the ADR-003 §4 streaming-merge requirement, lays out exit criteria (default flipped + ≥ 2-week production soak + no rollback), and lists the cleanup PR's tasks.This is the GAP-011-adjacent wire-in described in the streaming-merge roadmap memory: until the planner is taught to emit
promote_legacyoperations (GAP-011), the streaming engine sees no regular merges in production. Flipping this flag changes that — but only when the operator opts in.Rollout plan
false. CI parity tests + the new pipeline integration test guard correctness on synthetic fixtures.trueon a soak fleet, run for a planned window (≥ 2 weeks), monitor for divergence vs. the in-memory path.truein a follow-up PR.merge_sorted_parquet_files, the flag, the parity tests, and closes Deviation Add a CLA #1.Test plan
cargo test -p quickwit-parquet-engine --lib— 504 tests pass (incl. two new parity tests).cargo test -p quickwit-indexing --all-features --lib parquet_merge_pipeline_test— both end-to-end tests pass (the original in-memory one and the new streaming-engine-flag one).cargo test -p quickwit-config --lib node_config— 27 tests pass with the new field exercised in YAML/JSON/TOML.cargo clippy --workspace --all-features --testswithRUSTFLAGS="--cfg tokio_unstable -Dwarnings"— clean.cargo doc -p quickwit-config -p quickwit-parquet-engine -p quickwit-indexing --no-deps --all-featureswithRUSTDOCFLAGS="-Dwarnings"— clean.cargo machete— no unused deps.cargo +nightly fmt --all -- --check— clean on touched files.Architecture evolution
Stack
Stacked on #6428 (
gtt/adversarial-review-test-coverage).🤖 Generated with Claude Code