[fix](be) Preserve agg hash shuffle after non-hash exchange#63766
[fix](be) Preserve agg hash shuffle after non-hash exchange#63766BiteTheDDDDt wants to merge 1 commit into
Conversation
### What problem does this PR solve? Issue Number: None Related PR: apache#63529, apache#62438 Problem Summary: PR apache#63529 preserved local hash shuffle for serial merge aggregation, but the condition only covered serial children. A non-finalize merge aggregation can also receive input from a child that requires a non-hash local exchange, such as the adaptive passthrough exchange used by nested loop join after apache#62438. In that case identical group keys can be split across local aggregation instances and later count distinct aggregation may count duplicate partial groups. This change preserves local hash shuffle for merge aggregation when the child breaks local key distribution through either serial execution or a non-hash local exchange. ### Release note Fix wrong results for merge aggregation after non-hash local exchange. ### Check List (For Author) - Test: Unit Test - ./run-be-ut.sh --run --filter=AggOperatorRequiredDistributionTest.* - Behavior changed: Yes. Merge aggregation now keeps local hash shuffle when its child requires a non-hash local exchange. - Does this need documentation: No Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
|
/review |
There was a problem hiding this comment.
Pull request overview
This PR fixes a correctness edge case in BE pipeline local-exchange planning: when a non-finalize merge aggregation follows a child operator that requires a non-hash local exchange (e.g. ADAPTIVE_PASSTHROUGH from nested loop join), the local key distribution can be broken and identical group keys may be split across local agg instances, causing wrong results (notably for later DISTINCT-related aggregation stages).
Changes:
- Extend the “don’t skip local hash shuffle” condition for merge aggregations from “serial child” to “child that breaks local key distribution” (serial or requires a non-hash local exchange).
- Add a helper to detect whether a child’s required distribution implies a non-hash local exchange that would break key distribution.
- Add a BE unit test covering the
ADAPTIVE_PASSTHROUGHchild case to ensureHASH_SHUFFLEis required.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| be/src/exec/operator/aggregation_sink_operator.h | Refines merge-agg distribution requirement logic to preserve hash shuffle when a child requires non-hash local exchange (or is serial). |
| be/test/exec/operator/agg_operator_test.cpp | Adds a unit test validating merge-agg requires HASH_SHUFFLE after a child with ADAPTIVE_PASSTHROUGH distribution. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Review result: no blocking issues found in the changed code.
Critical checkpoint conclusions:
- Goal/test: The PR preserves aggregation hash shuffle when a merge aggregation follows a child that requires a non-hash local exchange. The added unit test covers the new ADAPTIVE_PASSTHROUGH case at the required_data_distribution level.
- Scope: The change is small and focused on AggSinkOperatorX distribution selection plus one targeted unit test.
- Concurrency/lifecycle: No new shared mutable state, threads, dependencies, or non-trivial lifecycle management are introduced.
- Configuration/compatibility: No new configuration, storage format, protocol, or rolling-upgrade compatibility concerns found.
- Parallel paths: The modified path is the relevant non-streaming aggregation sink distribution path; no distinct parallel BE path appeared to require the same exact change in this PR.
- Conditional logic: The new condition preserves the existing serial-child behavior and extends it to child-required non-hash local exchanges; this matches the local-exchange planning flow inspected.
- Test coverage: Coverage is targeted but limited to BE unit validation of distribution selection, not a full SQL regression. This is a residual coverage limitation, not a blocking defect from the reviewed diff.
- Observability/transactions/data writes: Not applicable; no new transaction, persistence, data-write, or observability-sensitive path is introduced.
- Performance: The added child distribution check is lightweight and only occurs during pipeline distribution planning; no hot-path performance issue found.
User focus response: no additional user-provided review focus was present.
Verification: Attempted ./run-be-ut.sh --run --filter=AggOperatorRequiredDistributionTest.*, but the command timed out after 120s while initializing/building third-party submodules, before the targeted test could complete.
TPC-H: Total hot run time: 31690 ms |
TPC-DS: Total hot run time: 171410 ms |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
What problem does this PR solve?
Issue Number: None
Related PR: #63529, #62438
Problem Summary: PR #63529 preserved local hash shuffle for serial merge aggregation, but the condition only covered serial children. A non-finalize merge aggregation can also receive input from a child that requires a non-hash local exchange, such as the adaptive passthrough exchange used by nested loop join after #62438. In that case identical group keys can be split across local aggregation instances and later count distinct aggregation may count duplicate partial groups.
This PR keeps the existing serial-child protection and additionally preserves local hash shuffle when a merge aggregation's child requires a non-hash local exchange such as ADAPTIVE_PASSTHROUGH.
Release note
Fix wrong results for merge aggregation after non-hash local exchange.
Check List (For Author)