Commit 3fc479f
feat: add k-way sorted merge engine for Parquet compaction (#6335)
* fix: SS-2 invariant — nulls always sort last, regardless of direction
The SS-2 invariant incorrectly specified that nulls sort before non-null
for descending columns. The actual production code uses nulls_first=false
everywhere — nulls always sort after non-null values, regardless of
direction. This is required for sorted_series key correctness: columns
that are null in a row are omitted from that row's key, and such keys
naturally sort after keys that include those columns.
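As a hedged sketch (the real compare_with_null_ordering lives in invariants/sort.rs and is typed over Arrow values; this version is simplified to Option<i64>), the nulls-last rule looks like:

```rust
use std::cmp::Ordering;

// Sketch of the shared comparison: nulls compare Greater than any
// non-null value, regardless of the column's sort direction. Only the
// non-null vs non-null branch is affected by `descending`.
fn compare_with_null_ordering(a: Option<i64>, b: Option<i64>, descending: bool) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => Ordering::Greater, // null sorts last
        (Some(_), None) => Ordering::Less,    // non-null sorts first
        (Some(x), Some(y)) => {
            if descending { y.cmp(&x) } else { x.cmp(&y) }
        }
    }
}
```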
Fixed in all locations:
- TLA+ SortSchema.tla: CompareValues and SS2_NullOrdering
- Rust invariants/sort.rs: compare_with_null_ordering
- Rust models/sort_schema.rs: SS-2 property check and compare_values test
- Rust invariants/registry.rs: SS-2 description
- ADR-002: SS-2 invariant table
- Phase 1 design doc: null handling descriptions
Stateright exhaustive_sort_schema BFS passes with corrected model.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* test: verify model detects null ordering violations
Added two tests to the Stateright sort_schema model:
- ss2_detects_null_before_non_null_in_descending: constructs rows
with null before non-null in a descending column, verifies both
row_leq and is_sorted reject this ordering (nulls always last).
- ss2_accepts_non_null_before_null_in_descending: verifies the
correct ordering (non-null then null) is accepted.
These confirm the model can catch the exact class of bug that the
SS-2 invariant fix addresses.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* feat: add SS-2 production invariant check using shared model function
Adds verify_ss2_null_ordering to the writer's sort verification path.
This calls quickwit_dst::invariants::sort::compare_with_null_ordering —
the same function the Stateright model uses for exhaustive BFS
verification — to confirm that:
1. The shared invariant function returns Greater for (null, non-null)
regardless of sort direction (debug_assert at function entry)
2. No adjacent row pair in the sorted output has null before non-null
in any sort column (when earlier columns are equal)
This bridges the gap where SS-2 was verified by the model but never
checked at runtime in production code. The check runs in debug builds
(via check_invariant! / debug_assert!) and reports to the invariant
recorder in all builds.
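A minimal sketch of item 2, the adjacent-pair check (simplified to a single Option<i64> sort column; the production check also requires earlier sort columns to be equal before comparing):

```rust
// Simplified SS-2 runtime check: in sorted output, a null value must
// never appear immediately before a non-null value in a sort column.
// (Assumption: single column, no earlier-column equality handling.)
fn verify_ss2_nulls_last(column: &[Option<i64>]) -> bool {
    column
        .windows(2)
        .all(|pair| !(pair[0].is_none() && pair[1].is_some()))
}
```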
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* refactor: use shared invariant functions for all TW-2 checks
All TW-2 (window_duration divides 3600) checks now call the shared
quickwit_dst::invariants::window::is_valid_window_duration instead
of inlining the `3600 % duration_secs == 0` logic. This ensures
production and model execute identical validation:
- sort_fields/window.rs: validate_window_duration + window_start
- storage/writer.rs: build_compaction_key_value_metadata
- split/metadata.rs: ParquetSplitMetadataBuilder::build
The shared functions used by production check_invariant! calls are
now the same code the Stateright models use:
- SS-2: compare_with_null_ordering (sort.rs)
- TW-2: is_valid_window_duration (window.rs)
- TW-1: window_start_secs (window.rs) — already shared
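The shared TW-2 predicate is, in sketch form (the signature is an assumption; the `3600 % duration == 0` logic is the one named above):

```rust
// TW-2: a valid window duration must evenly divide one hour (3600 s).
// Guarding against zero avoids a divide-by-zero in the modulo check.
fn is_valid_window_duration(duration_secs: u64) -> bool {
    duration_secs > 0 && 3600 % duration_secs == 0
}
```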
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: encode descending sort columns correctly in sorted_series key
The sorted_series key uses storekey for order-preserving binary encoding,
but storekey always encodes ascending. For sort schemas with descending
pre-timestamp columns (e.g. "-service"), the encoded bytes must be
bitwise-NOTed so that ascending memcmp on the composite key gives the
correct descending order. This is the standard ordered-code technique
(see Google's OrderedCode).
Without this fix, a descending pre-timestamp column would produce
sorted_series bytes that are not monotonically ascending within the
file, breaking DataFusion streaming aggregation assumptions and merge
correctness.
Changes:
- KeyColumn now carries `descending: bool` from the parsed sort schema
- encode_row_key inverts storekey bytes for descending columns
(ordinal + value together)
- Added invert_bytes helper
- timeseries_id is always ascending (it's a hash tiebreaker)
Test: ascending schema gives alpha < zebra in key bytes; descending
schema gives zebra < alpha (inverted). Keys from different schemas
differ for the same data.
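A sketch of the ordered-code inversion (storekey's string encoding is simplified here to raw bytes; `encode_descending` is a hypothetical helper name):

```rust
// Bitwise-NOT an order-preserving encoding so that ascending memcmp on
// the inverted bytes yields descending order on the original values.
fn invert_bytes(bytes: &mut [u8]) {
    for b in bytes.iter_mut() {
        *b = !*b;
    }
}

// Hypothetical helper: encode a value for a descending key column.
fn encode_descending(value: &[u8]) -> Vec<u8> {
    let mut out = value.to_vec();
    invert_bytes(&mut out);
    out
}
```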
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: only invert value bytes for descending columns, not ordinal
When a descending sort column has a null value, the null row skips
the column entirely in the key. If the ordinal bytes were inverted
(!ordinal = 0xFE), null rows (whose next byte is the small
timeseries_id ordinal, e.g. 0x02) would sort before non-null rows,
violating the writer's nulls_first=false behavior.
Fix: write the ordinal normally (ascending) and only invert the
storekey-encoded value bytes. This ensures:
- Non-null descending: ordinal(small) + !value
- Null (skipped): next ordinal(slightly larger)
- So non-null keys < null keys, matching physical nulls-last order
Added test: descending service with null — verifies
key(beta) < key(alpha) < key(null).
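The resulting layout can be sketched as follows (the ordinal values and the null-skip rule are assumptions drawn from the description above; real keys carry full storekey encodings):

```rust
// Key fragment for one descending column: write the column ordinal
// ascending, invert only the value bytes; a null row skips the column
// entirely, so its next byte is the (larger) following ordinal.
fn key_fragment(ordinal: u8, value: Option<&[u8]>, next_ordinal: u8) -> Vec<u8> {
    match value {
        Some(v) => {
            let mut key = vec![ordinal];
            key.extend(v.iter().map(|b| !*b)); // invert value bytes only
            key
        }
        None => vec![next_ordinal], // column omitted for null rows
    }
}
```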
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: reject sort schemas with tag columns after timeseries_id
The sorted_series key encodes columns up to (and including)
timeseries_id. Any tag column after timeseries_id would be in the
physical sort order but not in the key, breaking merge correctness —
two rows with the same timeseries_id but different extra_tag values
would have identical sorted_series keys.
Added validation in validate_schema: after timeseries_id, only
timestamp and tiebreaker may follow. Any other column is rejected
at parse time.
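A sketch of the parse-time rule (column names, the error type, and the helper name are assumptions):

```rust
// After timeseries_id, only the timestamp and tiebreaker columns may
// appear; any other column would be in the physical sort order but
// absent from the sorted_series key.
fn validate_tail(columns: &[&str]) -> Result<(), String> {
    if let Some(pos) = columns.iter().position(|c| *c == "timeseries_id") {
        for col in &columns[pos + 1..] {
            if *col != "timestamp_secs" && *col != "tiebreaker" {
                return Err(format!("column {col:?} not allowed after timeseries_id"));
            }
        }
    }
    Ok(())
}
```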
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* feat: add k-way sorted merge engine for Parquet compaction
Implements Phase 1 of Parquet compaction: a self-contained merge engine
in quickwit-parquet-engine/src/merge/ that takes N sorted Parquet files
and produces M sorted output files.
The merge key is (sorted_series ASC, timestamp_secs DESC), using Arrow's
RowConverter for binary-comparable key encoding and a BinaryHeap for
k-way merge. Outputs are split at sorted_series boundaries to ensure
non-overlapping key ranges across output files.
Key components:
- merge_order.rs: k-way merge via min-heap, RLE merge order, output
boundary computation at series transitions
- schema.rs: union schema resolution with Husky column ordering
(sort cols → sorted_series → remaining alphabetical)
- writer.rs: applies merge permutation via Arrow take kernel, extracts
row keys and zonemap regexes from merged data, embeds qh.* KV metadata
Invariant verification (sesh-mode):
- MC-1: check_invariant! verifies total rows in == total rows out
- MC-3: check_invariant! verifies output sorted by (sorted_series, timestamp_secs)
- MC-4: check_invariant! verifies output schema is union of all input schemas
- MC-2: unit test verifies all data values survive merge unchanged
- DM-5: unit test verifies sorted_series bytes identical through merge
- Stateright model (check_compaction_small) passes all TW/CS/MC invariants
Tests (13 total):
- 8 unit tests: non-overlapping, interleaved, single input, multiple
outputs, empty inputs, KV metadata, Husky column ordering
- 3 edge case tests: schema evolution (MC-4), row contents (MC-2),
sorted_series preservation (DM-5)
- 2 proptest DST: random inputs verify MC-1, MC-2, MC-3 for arbitrary data
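In miniature, the heap-based merge looks like this (byte keys stand in for RowConverter rows; the real engine's HeapEntry stores (input index, row position) rather than owning keys):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Minimal k-way merge over byte-comparable keys: a min-heap of
// (next key, input index, position) entries yields the global sorted
// order as a permutation of (input, row) pairs.
fn k_way_merge_order(inputs: &[Vec<Vec<u8>>]) -> Vec<(usize, usize)> {
    let mut heap = BinaryHeap::new();
    for (i, input) in inputs.iter().enumerate() {
        if let Some(first) = input.first() {
            heap.push(Reverse((first.clone(), i, 0usize)));
        }
    }
    let mut order = Vec::new();
    while let Some(Reverse((_, i, pos))) = heap.pop() {
        order.push((i, pos));
        if let Some(next) = inputs[i].get(pos + 1) {
            heap.push(Reverse((next.clone(), i, pos + 1)));
        }
    }
    order
}
```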
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: address PR review — heap optimization, per-output schema, type compat
Addresses all review comments on #6335:
1. **Heap key optimization**: HeapEntry no longer copies row bytes.
Stores (input_index, row_pos) and looks up from shared MergeHeapState
via raw pointer during comparison. Zero per-row allocation.
2. **Per-output schema optimization**: After merge permutation, each
output file's schema is determined by its actual data:
- All-null columns are stripped (schema evolution, M>1 splitting)
- String columns are dictionary-encoded when cardinality is low
(distinct/total <= 0.5), plain Utf8 otherwise
3. **String type normalization**: Union schema normalizes all string-like
types (Utf8, LargeUtf8, Dictionary) to plain Utf8 internally for
uniform `take`. Output type is determined by data, not input types.
4. **New tests** (4 added, 17 total):
- Deep interleaving: rows from middle of different inputs must
interleave correctly (verifies row-level merge, not append)
- Duplicate timestamps: 4 rows with same series+timestamp all
survive (MC-1, no deduplication)
- Per-output schema stripping: M=2 where output 2 lacks a column
that's all-null (extra_tag stripped from output, present in other)
- Output type reflects data: low-cardinality metric_name is
dictionary-encoded based on output data characteristics
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* test: add cross-row-group interleaving test
Writes inputs with row_group_size=3 so each file has multiple Parquet
row groups, then verifies that the merge correctly interleaves rows
from different row groups of different inputs.
Input 1: 8 rows across 3 row groups (alpha, beta, gamma series)
Input 2: 8 rows across 3 row groups (alpha, beta, delta series)
Output: 16 rows with alpha and beta timestamps interleaved from both
inputs, proving the merge operates at individual row granularity
regardless of row group boundaries.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* test: add cross-RecordBatch interleaving test
Forces read_batch_size=2 so each 6-row input file yields 3 Arrow
RecordBatches during reading. Proves that concat_batches in read_inputs
correctly merges them and that RecordBatch boundaries within a file
don't affect the merge — rows from different batches of different
inputs interleave correctly.
Also refactors merge entry point to support test-only read_batch_size
parameter via merge_sorted_parquet_files_with_read_batch_size.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: replace raw pointer with Arc in merge heap
HeapEntry now holds Arc<MergeHeapState> instead of *const MergeHeapState.
Removes all unsafe code (raw pointer dereference and unsafe impl Send).
Arc refcount bump per heap push is negligible compared to the row
comparison cost.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: derive timestamp sort direction from sort schema, validate inputs
Two changes:
1. The timestamp_secs sort direction in the RowConverter is now derived
from the parsed sort schema instead of being hardcoded to DESC. The
sort schema is the source of truth for column ordering.
2. Added validate_sort_schemas() — reads qh.sort_fields from each input
file's Parquet KV metadata and verifies it matches the merge config's
sort_fields using equivalent_schemas_for_compaction(). Files without
sort metadata are accepted with a warning (pre-Phase-1 compatibility).
Mismatches are a hard error.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: missing sort_fields metadata is an error, not a warning
Every file entering the merge pipeline was written by our ingestion
pipeline, which always embeds qh.sort_fields. Missing metadata
indicates a bug, not a pre-Phase-1 file. Changed from warn to bail.
Also added build_test_metadata() helper so all test files embed proper
qh.sort_fields metadata, which the sort schema validation now requires.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: remove hardcoded DESC from comments and MC-3 check
All comments and the verify_sort_order function now derive the
timestamp_secs sort direction from the sort schema instead of
assuming descending. verify_sort_order takes the sort_fields_str
parameter and parses it to determine the direction.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* refactor: derive sort_fields/window/merge_ops from input files
Removes sort_fields, window_start_secs, window_duration_secs, and
input_num_merge_ops from MergeConfig. These are now read from the
input files' Parquet KV metadata via extract_and_validate_input_metadata(),
which:
- Reads qh.sort_fields, qh.window_start, qh.window_duration_secs,
qh.num_merge_ops from each input file
- Validates all inputs agree on sort schema, window_start, window_duration
- Computes max(num_merge_ops) + 1 for the output
- Fails on any mismatch
MergeConfig now only contains num_outputs and writer_config — the merge
engine is self-describing from its inputs.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* style: apply nightly rustfmt to merge module
Runs nightly rustfmt with imports_granularity=Module and
group_imports=StdExternalCrate. Also replaces let-chains with
nested if-let to avoid rustfmt parse errors on older nightly
versions used in CI.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: allow collapsible_if where let-chains break rustfmt
Clippy wants let-chains but the CI nightly rustfmt cannot parse them.
Added targeted #[allow(clippy::collapsible_if)] on the two spots with
a comment explaining why.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: normalize legacy "timestamp" to "timestamp_secs" in merge
Legacy sort schemas (e.g. "metric_name|host|timestamp/V2") use
"timestamp" while the physical column is "timestamp_secs". The
merge now accepts both names when looking up the timestamp column
in the sort schema, matching the normalization already done in
row_keys::extract_row_keys.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: split merge runs at sorted_series boundaries
When a single input contains multiple series (alpha, beta, gamma),
the k-way merge produced one contiguous MergeRun covering all rows.
compute_output_boundaries only checked transitions between runs, not
within them, so num_outputs=3 would still produce 1 output file.
Fix: during run extension in compute_merge_order, verify the new row's
sorted_series matches the run's start row. If the series changes, start
a new run. This guarantees every MergeRun contains exactly one series,
so output boundary computation finds all transitions by comparing
adjacent runs.
Added regression test: single input with 3 series, num_outputs=3,
verifies 3 output files are produced with one series each.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: split eagerly when transitions are scarce relative to outputs
When an early series has far more rows than target_rows_per_output,
the boundary algorithm would refuse to split at subsequent transitions
until the small remainder caught up to the target. For example, series
row counts [6,1,1] with num_outputs=3 produced only 2 files.
Fix: track transitions_remaining and split eagerly when
transitions_remaining < outputs_remaining, ensuring we don't skip
transitions we need to fill all requested outputs.
Added regression test with oversized first series.
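A sketch of the eager rule over per-series run lengths (a simplification: the real algorithm works on MergeRuns, and the target computation here is an assumption):

```rust
// Split at a series transition once the accumulated rows reach the
// target, or as soon as the remaining transitions are no more than the
// outputs still needed (the eager rule from this fix). Returns the run
// indices at which new output files start.
fn compute_output_boundaries(run_rows: &[usize], num_outputs: usize) -> Vec<usize> {
    let total: usize = run_rows.iter().sum();
    let target = (total + num_outputs - 1) / num_outputs;
    let mut boundaries = Vec::new();
    let mut acc = 0;
    for (i, &rows) in run_rows.iter().enumerate() {
        acc += rows;
        let outputs_remaining = num_outputs - boundaries.len() - 1;
        let transitions_remaining = run_rows.len() - 1 - i;
        if outputs_remaining == 0 {
            break;
        }
        if acc >= target || transitions_remaining <= outputs_remaining {
            boundaries.push(i + 1); // split after run i
            acc = 0;
        }
    }
    boundaries
}
```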
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: normalize legacy timestamp in sorting_columns and field names
build_sorting_columns and resolve_sort_field_names looked up sort
schema column names against the batch schema without normalizing
"timestamp" to "timestamp_secs". For inputs written with a legacy
sort schema containing "timestamp", the timestamp column was silently
omitted from the Parquet SortingColumn footer metadata.
Added normalize_column_name helper used by both functions, consistent
with the normalization already in compute_merge_order, verify_sort_order,
and row_keys::extract_row_keys.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* style: rustfmt new test functions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: encode descending sort columns correctly in sorted_series key
The sorted_series key uses storekey for order-preserving binary encoding,
but storekey always encodes ascending. For descending sort columns, the
encoded bytes must be bitwise-NOTed so that ascending memcmp on the
composite key gives the correct descending order (the standard
ordered-code technique — see Google's OrderedCode).
Changes:
- KeyColumn now carries a `descending: bool` from the sort schema
- encode_row_key inverts storekey bytes for descending columns
- Added invert_bytes helper
- Removed the rejected validation approach (no longer needed)
This ensures that sorted_series bytes are monotonically ascending in
files sorted by the writer, regardless of per-column sort direction.
The merge key (sorted_series ASC, timestamp <dir>) is therefore correct
for all valid sort schemas, not just all-ascending ones.
Added test: merge with -service (descending) pre-timestamp column
verifies zebra sorts before alpha in both input and output.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: use UInt64 take indices to avoid u32 overflow on large merges
Take indices were Vec<u32> / UInt32Array, which silently truncates
when concatenated inputs exceed u32::MAX (~4B) rows. Large metrics
compactions can plausibly cross that threshold. Switched to Vec<u64>
/ UInt64Array.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* refactor: use shared compare_with_null_ordering in MC-3 timestamp check
The merge module's MC-3 sort order verification now uses the shared
quickwit_dst::invariants::sort::compare_with_null_ordering for the
timestamp comparison within equal sorted_series groups. Previously
this was an inline ascending/descending check.
This ensures the merge module's sort verification uses the same
comparison logic as the Stateright sort_schema model.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* refactor: share timestamp column normalization across all code paths
Five independent implementations of "timestamp" → "timestamp_secs"
normalization are replaced by shared functions in sort_fields/column_type.rs:
- normalize_column_name(name) → physical column name
- is_timestamp_column_name(name) → true for "timestamp" or "timestamp_secs"
- TIMESTAMP_SECS constant for the physical column name
Updated call sites:
- row_keys/mod.rs: extract_row_keys
- sorted_series/mod.rs: resolve_key_columns
- merge/merge_order.rs: compute_merge_order (removed local constants)
- merge/writer.rs: build_sorting_columns, resolve_sort_field_names,
verify_sort_order (removed local normalize_column_name function)
- sort_fields/validation.rs: uses shared TIMESERIES_ID constant
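In sketch form (constant and helper names are taken from the commit; the bodies are assumed):

```rust
// Physical column name for the timestamp.
const TIMESTAMP_SECS: &str = "timestamp_secs";

// Map the legacy logical name "timestamp" to the physical column name.
fn normalize_column_name(name: &str) -> &str {
    if name == "timestamp" { TIMESTAMP_SECS } else { name }
}

// True for either spelling of the timestamp column.
fn is_timestamp_column_name(name: &str) -> bool {
    name == "timestamp" || name == TIMESTAMP_SECS
}
```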
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Parent commit: 6702d41
14 files changed: 3339 additions & 18 deletions
File tree
- quickwit/quickwit-parquet-engine/src
- merge
- row_keys
- sort_fields
- sorted_series
- storage