perf: simplify HashJoinExec dynamic filter, drop CASE routing #21931
adriangb wants to merge 6 commits into apache:main from
Conversation
|
run benchmark tcph baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
run benchmark tcph baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
🤖 Criterion benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (7a8272f) to main (diff) |
|
Benchmark for this request failed. Last 20 lines of output: [collapsed] |
|
🤖 Criterion benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (7a8272f) to main (diff) |
|
Benchmark for this request failed. Last 20 lines of output: [collapsed] |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (7a8272f) to main (diff) using: tpch |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (7a8272f) to main (diff) using: tpch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details
Resource Usage: tpch — base (merge-base) vs. tpch — branch [results collapsed] |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details
Resource Usage: tpch — base (merge-base) vs. tpch — branch [results collapsed] |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS=128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS=128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
Hi @adriangb, your benchmark configuration could not be parsed (#21931 (comment)). Error: Supported benchmarks: [list collapsed]
Usage: Per-side configuration:
env:
  SHARED_SETTING: enabled
baseline:
  ref: v45.0.0
  env:
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 1G
changed:
  ref: v46.0.0
  env:
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 2G |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS=128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS=128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
Hi @adriangb, your benchmark configuration could not be parsed (#21931 (comment)). Error: Supported benchmarks: [list collapsed]
Usage: Per-side configuration:
env:
  SHARED_SETTING: enabled
baseline:
  ref: v45.0.0
  env:
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 1G
changed:
  ref: v46.0.0
  env:
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 2G |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS: 128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS: 128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: false |
|
run benchmark tpch baseline:
ref: main
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS: 128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
ref: HEAD
env:
DATAFUSION_EXECUTION_TARGET_PARTITIONS: 128
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (18129fe) to main (diff) using: tpch |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (18129fe) to main (diff) using: tpch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details
Resource Usage: tpch — base (merge-base) vs. tpch — branch [results collapsed] |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details
Resource Usage: tpch — base (merge-base) vs. tpch — branch [results collapsed] |
force-pushed from 474734c to f717a99
|
run benchmarks clickbench_partitioned tpch tpch10 tpcds hj env:
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: "false"
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: "false"
baseline:
ref: main
changed:
ref: HEAD |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (f717a99) to main (diff) using: tpcds |
|
Benchmark for this request failed. Last 20 lines of output: [collapsed] |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (f717a99) to main (diff) using: tpch10 |
|
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing HEAD (f717a99) to main (diff) using: tpch |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details
Resource Usage: clickbench_partitioned — base (merge-base) vs. clickbench_partitioned — branch [results collapsed] |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details
Resource Usage: clickbench_partitioned — base (merge-base) vs. clickbench_partitioned — branch [results collapsed] |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details
Resource Usage: tpcds — base (merge-base) vs. tpcds — branch [results collapsed] |
|
Benchmark for this request failed. Last 20 lines of output: [collapsed] |
|
Benchmark for this request failed. Last 20 lines of output: [collapsed] |
|
🤖 Benchmark completed (GKE) | trigger | Instance: CPU Details (lscpu) | Details
Resource Usage: tpcds — base (merge-base) vs. tpcds — branch [results collapsed] |
|
Benchmark for this request failed. Last 20 lines of output: [collapsed] |
|
Benchmark for this request failed. Last 20 lines of output: [collapsed] |
|
@gene-bordegaray @Dandandan I'm curious what you think of this. It's a bit of a compromise:
What this gets us is:
|
|
I'm quite in favor of this change. It also avoids blowing up the expression based on number of partitions, which can happen when partition count is high. |
Well, it doesn't avoid it completely, and in some ways it makes it worse. We still have 1 hash map per partition (cannot be avoided unless we pay the memory and build-time cost of combining them). And we now scale our probes with the number of partitions; they used to be constant in the number of partitions. But probes are much faster than hashes, which is why I think that unless the partition count is high this will likely be faster. |
I am not 100% following this.
I think this would be worthwhile to add so we avoid both the expression blow-up and having to probe each of them? Perhaps we can also use #21900 here, as for primitive columns the cost of |
The point is: it is probably still slower at some very high partition count. But it seems to not matter in reasonable workloads.
That brings back coupling to hash routing, which IMO would be nice to avoid. I will try incorporating |
In Partitioned-mode HashJoinExec, when every reported partition's build
side uses a hash-table strategy, replace the routing CASE expression
(`CASE hash_repartition % N WHEN p THEN bounds AND hash_lookup ELSE
false END`) with `global_minmax AND multi_hash_lookup`.
The new MultiMapLookupExpr hashes the join keys once with HASH_JOIN_SEED
and ORs `contain_hashes()` across every partition's hash table,
eliminating both the routing-hash computation and the per-branch
re-hashing that CaseExpr does. Any non-Map partition (InList, Empty)
disqualifies the fast path and we use the legacy CASE unchanged; same
for partitions that were canceled before reporting build data.
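In sketch form, the probe path is one shared hashing pass OR-folded across the per-partition maps. A minimal illustration (the `PartitionMap` type here is a stand-in; the real `MultiMapLookupExpr` operates on Arrow arrays and the join's actual hash-table type):

```rust
use std::collections::HashSet;

// Stand-in for one partition's join hash table.
struct PartitionMap {
    hashes: HashSet<u64>,
}

impl PartitionMap {
    // OR-accumulate: a row stays true if *any* partition's map holds its hash.
    fn contain_hashes(&self, hashes: &[u64], out: &mut [bool]) {
        for (i, h) in hashes.iter().enumerate() {
            out[i] |= self.hashes.contains(h);
        }
    }
}

// Hash the join keys once (with HASH_JOIN_SEED in the real code),
// then probe every partition's map with the same hash values.
fn multi_map_lookup(key_hashes: &[u64], maps: &[PartitionMap]) -> Vec<bool> {
    let mut matches = vec![false; key_hashes.len()];
    for map in maps {
        map.contain_hashes(key_hashes, &mut matches);
    }
    matches
}
```

This is what eliminates the per-branch re-hashing: under the CASE shape each branch re-hashed the keys, whereas here the hash values are computed once and reused for every probe.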
Benchmarks (TPC-H SF=1, 7 iters back-to-back):
TOTAL min vs no-DF:
legacy CASE: +3.0%
multi_hash_lookup: +1.6% (~halves the regression)
Per-query (multi_hash_lookup vs CASE):
Q4 -6.0% Q5 -3.3% Q7 -6.0% Q8 -4.0%
Q9 -3.5% Q12 -3.2% Q17 -1.6% Q21 -3.8%
Refs: apache#19858
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…on IN (SET)

When every reported partition for a Partitioned hash join uses InList
pushdown and the cross-partition union would be ≤ 20 array entries,
concatenate the per-partition `ArrayRef`s and emit
`global_minmax AND struct(c0,c1,…) IN (SET)` instead of the routing CASE.

The cap is set so the merged set can participate in parquet stats /
bloom-filter pruning at the scan, which a per-partition CASE or a
`multi_hash_lookup` cannot. A TPC-H SF=1 cap sweep (cap=20/50/100/200/2000)
confirmed 20–50 is the sweet spot — past ~200 the larger static_filter hash
set blows out of L1 and runtime regresses below the legacy CASE.

The tightened path also subsumes the `force_hash_collisions` optimization
(when the runtime collapses every key into one partition we get the same
shape, just for a different reason), so both `#[cfg]` snapshot branches in
test_hashjoin_dynamic_filter_pushdown_partitioned now produce the merged
`IN (SET)` form.

Refs: apache#19858
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
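A minimal sketch of the merge gate (assuming each partition reported its InList values as an `ArrayRef`; the constant mirrors the cap described above, and error handling is simplified):

```rust
use arrow::array::{Array, ArrayRef};
use arrow::compute::concat;
use arrow::error::ArrowError;

const MERGED_INLIST_MAX_TOTAL_LEN: usize = 20;

// Concatenate per-partition InList arrays only if the union stays small
// enough to be useful for parquet stats / bloom-filter pruning at the scan.
fn try_merge_inlists(per_partition: &[ArrayRef]) -> Result<Option<ArrayRef>, ArrowError> {
    let total: usize = per_partition.iter().map(|a| a.len()).sum();
    if total > MERGED_INLIST_MAX_TOTAL_LEN {
        return Ok(None); // too large: fall back to the non-InList path
    }
    let refs: Vec<&dyn Array> = per_partition.iter().map(|a| a.as_ref()).collect();
    Ok(Some(concat(&refs)?))
}
```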
Decouple the dynamic filter from the build-side repartition strategy
entirely. The filter for `PartitionMode::Partitioned` is now always
`global_minmax AND (merged_in_list | multi_hash_lookup)`, regardless
of whether individual partitions chose Map or InList for pushdown:
* `merged_in_list` fires when every reported partition contributed
an InList array AND the cross-partition union stays within
`MERGED_INLIST_MAX_TOTAL_LEN` (= 20). This is the path that
participates in parquet stats / bloom-filter pruning.
* Otherwise `multi_hash_lookup` probes every partition's hash table
in one shared hashing pass.
Key change: `PushdownStrategy` is now a struct that always carries the
`Map` (the join's hash table is built unconditionally) plus an optional
InList array. With the map always available we don't need a per-row
`CASE hash_repartition % N WHEN p THEN per_partition_filter ELSE …`
expression to route rows to the right partition's data — the shared
multi-map probe finds matches in whichever partition holds them.
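A rough sketch of the reshaped strategy (field names illustrative, not the exact DataFusion definitions):

```rust
use std::sync::Arc;
use arrow::array::ArrayRef;

// Stand-in for the join's hash-table type.
struct JoinHashTable;

// The hash table is built unconditionally, so every reported partition
// always carries its map; InList values are attached only when the build
// side was small enough to extract them.
struct PushdownStrategy {
    map: Arc<JoinHashTable>,
    inlist: Option<ArrayRef>,
}
```

Because `map` is always present, the filter never needs to know which partition a probe row would have been routed to.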
Removed:
* `build_case_routing_filter` (~70 LoC) and its `CaseExpr` /
`HashExpr` plumbing in shared_bounds
* `repartition_random_state` field on `SharedBuildAccumulator`
* The `REPARTITION_RANDOM_STATE` import in `exec.rs`
* Conditional `force_hash_collisions` snapshot — both the normal
and force-collision paths now produce the same shape
The canceled-partition fallback collapses to `lit(true)`: with a
canceled partition we don't have its map, so we can't include it in
multi_hash_lookup; emitting the no-op filter is safe (correctness is
preserved) and the query is in the middle of being torn down anyway.
Costs: TPC-H SF=1 shows a small (≈0.5–2pp) regression vs the
multimap+CASE-fallback design on noisy back-to-back runs — the
moderate-InList shape (Q11/Q14 etc.) used to use small per-partition
InLists inside CASE; now those joins use multi_hash_lookup. Per the
issue discussion the simplification is the goal even when there's no
perf win.
Refs: apache#19858
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The cross-partition merged-InList gate now reuses the existing
`optimizer.hash_join_inlist_pushdown_max_distinct_values` knob: one
configuration option caps both the per-partition InList pushdown and the
cross-partition merged set.

The combine path explicitly deduplicates by `ScalarValue` (via a HashSet
first-seen walk + a single `arrow::compute::take`) and then re-gates on the
distinct count rather than the previous total-array-length heuristic. With
the cap defaulting to 20, the worst-case dedup input is N×20 entries, which
is microseconds at the partition counts we see in practice.

The previous hardcoded 20-entry length cap and `MERGED_INLIST_MAX_TOTAL_LEN`
constant are gone — the threshold is now configurable. Default lowered from
150 → 20 to align with parquet stats / bloom-filter pruning practicality
(a small `IN (SET)` that scans can use to drop row groups is the entire
reason for keeping this path). Users that want the wider per-partition
InList behavior can raise the value.

Within-partition build still ships duplicates: the build code in `exec.rs`
doesn't dedupe before populating `PushdownStrategy::inlist`, relying on the
join hash map's `num_of_distinct_key()` for the per-partition gate and the
static filter inside `InListExpr` to dedupe at filter-evaluation time.
Adding explicit per-partition dedup is a follow-up — not required for
correctness because the cross-partition dedup catches everything.

Refs: apache#19858
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
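A sketch of the dedup-then-gate step (simplified signatures; the real code lives in the cross-partition combine path):

```rust
use std::collections::HashSet;
use arrow::array::{Array, ArrayRef, UInt32Array};
use arrow::compute::take;
use datafusion_common::ScalarValue;

// First-seen walk keyed by ScalarValue, then one `take` to materialize the
// distinct values; returns None when the distinct count exceeds the cap.
fn dedup_and_gate(values: &ArrayRef, max_distinct: usize) -> Option<ArrayRef> {
    let mut seen = HashSet::new();
    let mut keep: Vec<u32> = Vec::new();
    for i in 0..values.len() {
        let v = ScalarValue::try_from_array(values.as_ref(), i).ok()?;
        if seen.insert(v) {
            keep.push(i as u32);
        }
    }
    if keep.len() > max_distinct {
        return None; // re-gate on the *distinct* count, not raw length
    }
    take(values.as_ref(), &UInt32Array::from(keep), None).ok()
}
```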
- `information_schema.slt`: bumps the baked-in default and doc string for
  `optimizer.hash_join_inlist_pushdown_max_distinct_values` to match the
  150 → 20 default change (sqllogictest, extended_tests, sqlite suite,
  verify-benchmark-results all hit this slt).
- `partitioned_hash_eval.rs`: drop the redundant explicit-target on a
  `BooleanArray` doc link. Adding `BooleanArray` to imports for
  `MultiMapLookupExpr` made the existing
  `[`BooleanArray`](arrow::array::BooleanArray)` link redundant under
  `-D rustdoc::redundant_explicit_links`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drop comments that read like PR-review notes ("X no longer Y", "the
legacy CASE", "this drops the routing") in favour of comments that
describe the current behaviour for someone reading the file cold.
Trim some now-redundant field-level docs and tighten doc strings on
`MultiMapLookupExpr`, `PushdownStrategy`, `build_partitioned_filter`,
and `try_build_merged_inlist`.
No functional change.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
force-pushed from f717a99 to 17b9dce
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). [details collapsed] |
|
I like this direction as a default, seems less brittle and a cleaner baseline. This approach will definitely provide some value for
I'd still like to discuss the possibility of a partition-aware filters path, once DataFusion can prove both sides share the same partition mapping and, as discussed in #21207, the proper infrastructure for expressing more types of partitioning is in place. For the use cases we are seeing, the selectivity of the partition-specific filters is quite nice. I imagine that if we provided information from partitioning that two partitioning spaces (build and probe sides) are compatible (such as the same range partitioning) we would not only choose from |
|
Yes, agreed. My thought is that this is a good default for now that is less brittle to things messing with partitioning. And down the line, once we've got a good story for range partitioning, we can use #21900 for hash-partitioned cases (assuming performance looks good) and some other system for range partitioning.
@Dandandan I looked into #21900 but immediately realized that the point is it would make the partition routing faster, while part of the goal here is to not have any partition routing at all, because it introduces brittleness and is slower than just probing more hash tables (although #21900 could flip the performance story; we'd have to confirm). In any case - we can always merge this now and follow up with a version of |
I am very skeptical that probing `partition_count` hash tables for every row is very efficient, but I see it can still be faster than evaluating a long nested expression which grows with the number of partitions, as DF doesn't have special knowledge of the "routing" (+ if everything matches it will be pure overhead). I feel like doing the
That said - I like removing the big |
Sounds good |
Which issue does this PR close?
Rationale for this change
Today the Partitioned-mode `HashJoinExec` builds a dynamic filter that's structured around the repartition layout (`CASE hash_repartition % N WHEN p THEN bounds AND hash_lookup ELSE false END`). Two problems with this:

- The expression grows with the number of partitions: one CASE branch per partition, which blows up at high partition counts.
- Every probe row pays for a routing hash, `hash_repartition % N`, even though the partition's hash table will be probed anyway with a different seed (`HASH_JOIN_SEED`).

This PR replaces the routing CASE with a structure that depends only on the content of the build side, not its layout, and adds a cross-partition merged `IN (SET)` fast path so small joins can participate in parquet stats / bloom-filter pruning at the scan side.
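In sketch form, the assembled filter (hypothetical helper names; the PR's `build_partitioned_filter` is the real counterpart):

```rust
use std::sync::Arc;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::PhysicalExpr;

fn and(l: Arc<dyn PhysicalExpr>, r: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
    Arc::new(BinaryExpr::new(l, Operator::And, r))
}

// global_minmax AND (merged IN (SET) when it fits, else multi_hash_lookup).
// Neither operand depends on which partition a row would repartition into.
fn partitioned_filter(
    global_minmax: Arc<dyn PhysicalExpr>,
    merged_inlist: Option<Arc<dyn PhysicalExpr>>,
    multi_hash_lookup: Arc<dyn PhysicalExpr>,
) -> Arc<dyn PhysicalExpr> {
    and(global_minmax, merged_inlist.unwrap_or(multi_hash_lookup))
}
```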
What changes are included in this PR?
Five commits:
Final filter-shape matrix
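As described in the commits above, the resulting shapes are:

| Build-side situation (Partitioned mode) | Dynamic filter |
|---|---|
| Every partition reported InList values and the deduped union fits `hash_join_inlist_pushdown_max_distinct_values` | `global_minmax AND struct(keys) IN (SET)` |
| Otherwise (any Map-only partition, or union too large) | `global_minmax AND multi_hash_lookup` |
| Any partition canceled before reporting | `lit(true)` (no-op) |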
No more `CASE`, no more `hash_repartition`, no more `REPARTITION_RANDOM_STATE` in the dynamic-filter path.
Performance
Per-row cost (Partitioned mode)
Let `N` = number of build-side partitions. For each probe row evaluated by the dynamic filter:

- legacy CASE: 1 routing hash (`hash_repartition % N`) + 1 probe of the owning partition's branch;
- new shape: 1 join-key hash (`HASH_JOIN_SEED`) + up to `N` hash-table probes (the merged `IN (SET)` path skips the probes entirely when it applies).

Two countervailing forces shape the result: the routing hash and the per-branch `CaseExpr` re-hashing are gone (and probes are much cheaper than hashes), but the probe count now scales with `N` instead of staying constant.
Benchmarks (TPC-H, runner = c4a-highmem-16, ARM Neoverse-V2, 16 cores)
Triggered four runs covering both N regimes and both pushdown configs.
Default partitioning (N ≈ ncores ≈ 16)
In the `pushdown=true` run the wins concentrate on queries where the dynamic filter feeds parquet stats / bloom-filter pruning at the scan:
Q17 alone contributes 109 ms of improvement, more than the 93 ms net total wall-clock gain — that's the original issue's regression, fixed. The small-query regressions (Q3/Q5/Q9/Q13/Q14) eat the difference: they're the all-Map shape paying `O(N)` probes per row for moderate-to-large build sides where the bounds prefix doesn't prune much.
High partition count (`target_partitions=128`)
A stress test of partition-count scaling on the same 16-core box.
Pushdown=false @ N=128: 11 queries faster, 0 slower, 11 unchanged. `multi_hash_lookup` cleanly beats the legacy 128-branch `CaseExpr` evaluation. Big wins on Q3 (1.68×), Q5 (1.82×), Q7 (1.75×), Q8 (2.15×), Q9 (1.64×), Q12 (1.81×), Q13 (1.76×), Q17 (1.59×), Q18 (1.29×), Q20 (2.05×), Q21 (1.43×).
Pushdown=true @ N=128: 4 faster, 9 slower. This is the case where the filter runs in the scan hot loop and `multi_hash_lookup`'s `O(N)` probes per row dominate. The wins (Q17 1.87×, Q18 1.39×, Q20 1.76×) survive because their merged `IN (SET)` prunes whole row groups before the per-row filter ever runs. The losses (Q5 1.88×, Q9 1.64×, Q21 1.62×, Q14 1.51×, Q8 1.37×) are the same all-Map shape paying 128 probes per row.
Summary of the regime grid
Three of the four configs are wins (one big), one is a regression. The single regressing config is `high N + scan-side pushdown`, which is exactly the scenario `OptionalFilterPhysicalExpr` from #20363 is designed to absorb: the adaptive tracker would measure `multi_hash_lookup`'s low `bytes_pruned_per_second_of_eval_time` for queries like Q5/Q9/Q21 and drop the filter, while keeping Q17/Q18/Q20 (which prune scans aggressively).
A possible structural follow-up — re-introducing partition routing inside `MultiMapLookupExpr` (1 routing hash + 1 probe, so per-row cost matches legacy CASE) — would close the regression at any N, with or without #20363.
Are these changes tested?
Are there any user-facing changes?
🤖 Generated with Claude Code