[fix](be) Preserve agg hash shuffle after non-hash exchange#63766

Open
BiteTheDDDDt wants to merge 1 commit into apache:master from BiteTheDDDDt:fix-agg-local-exchange-nlj

@BiteTheDDDDt
Contributor

What problem does this PR solve?

Issue Number: None

Related PR: #63529, #62438

Problem Summary: PR #63529 preserved local hash shuffle for serial merge aggregation, but the condition only covered serial children. A non-finalize merge aggregation can also receive input from a child that requires a non-hash local exchange, such as the adaptive passthrough exchange used by nested loop join after #62438. In that case, identical group keys can be split across local aggregation instances, and a later count distinct aggregation may count the same group more than once because it sees duplicate partial groups.

This PR keeps the existing serial-child protection and additionally preserves local hash shuffle when a merge aggregation's child requires a non-hash local exchange such as ADAPTIVE_PASSTHROUGH.
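
The failure mode described above can be illustrated with a toy model. This is a minimal sketch, not Doris code: `groups_emitted` is a hypothetical helper, and plain round-robin routing stands in for any non-hash exchange (the real ADAPTIVE_PASSTHROUGH routing is more involved). Each local instance tracks the group keys it has seen, and the function returns how many partial groups all instances emit in total.

```cpp
#include <cstddef>
#include <functional>
#include <set>
#include <vector>

// Toy model (hypothetical, not the Doris implementation): each local
// aggregation instance keeps the set of group keys routed to it.
// Returns the total number of partial groups emitted by all instances.
std::size_t groups_emitted(const std::vector<int>& keys, std::size_t instances,
                           bool hash_shuffle) {
    std::vector<std::set<int>> per_instance(instances);
    for (std::size_t row = 0; row < keys.size(); ++row) {
        // Hash routing sends equal keys to the same instance; the
        // round-robin branch ignores the key, like a non-hash exchange.
        std::size_t target = hash_shuffle
                                     ? std::hash<int>{}(keys[row]) % instances
                                     : row % instances;
        per_instance[target].insert(keys[row]);
    }
    std::size_t total = 0;
    for (const auto& groups : per_instance) {
        total += groups.size();
    }
    return total;
}
```

With keys `{1, 1, 2, 2, 1, 2}` and two instances, hash routing emits one partial group per distinct key, while round-robin routing emits each key from both instances. A downstream stage that assumes each key arrives once would then overcount.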

Release note

Fix wrong results for merge aggregation after non-hash local exchange.

Check List (For Author)

  • Test: Unit Test
    • ./run-be-ut.sh --run --filter=AggOperatorRequiredDistributionTest.*
  • Behavior changed: Yes. Merge aggregation now keeps local hash shuffle when its child requires a non-hash local exchange.
  • Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings May 27, 2026 14:06
@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@BiteTheDDDDt
Contributor Author

run buildall

@BiteTheDDDDt
Contributor Author

/review

Contributor

Copilot AI left a comment

Pull request overview

This PR fixes a correctness edge case in BE pipeline local-exchange planning: when a non-finalize merge aggregation follows a child operator that requires a non-hash local exchange (e.g. ADAPTIVE_PASSTHROUGH from nested loop join), the local key distribution can be broken and identical group keys may be split across local agg instances, causing wrong results (notably for later DISTINCT-related aggregation stages).

Changes:

  • Extend the “don’t skip local hash shuffle” condition for merge aggregations from “serial child” to “child that breaks local key distribution” (serial or requires a non-hash local exchange).
  • Add a helper to detect whether a child’s required distribution implies a non-hash local exchange that would break key distribution.
  • Add a BE unit test covering the ADAPTIVE_PASSTHROUGH child case to ensure HASH_SHUFFLE is required.
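
The extended condition in the list above might look roughly like the following. This is a sketch under stated assumptions: `ExchangeKind`, `ChildInfo`, and all three function names are hypothetical stand-ins, not the identifiers used in `aggregation_sink_operator.h`, and treating every exchange other than "none" or a hash variant as key-breaking is an assumption made for illustration.

```cpp
// Hypothetical stand-ins; the real Doris types and names may differ.
enum class ExchangeKind {
    Noop,
    HashShuffle,
    BucketHashShuffle,
    Passthrough,
    AdaptivePassthrough
};

struct ChildInfo {
    bool is_serial;                  // child runs as a single instance
    ExchangeKind required_exchange;  // local exchange the child asks for
};

// Assumption: any required exchange other than "none" or a hash variant
// redistributes rows without regard to the group keys.
bool requires_non_hash_local_exchange(ExchangeKind kind) {
    return kind != ExchangeKind::Noop && kind != ExchangeKind::HashShuffle &&
           kind != ExchangeKind::BucketHashShuffle;
}

// Serial execution or a non-hash exchange both lose key locality.
bool child_breaks_local_key_distribution(const ChildInfo& child) {
    return child.is_serial ||
           requires_non_hash_local_exchange(child.required_exchange);
}

// True when a non-finalize merge aggregation should still require a
// local hash shuffle instead of skipping it.
bool must_keep_hash_shuffle(bool is_merge, bool is_finalize,
                            const ChildInfo& child) {
    return is_merge && !is_finalize &&
           child_breaks_local_key_distribution(child);
}
```

Splitting the check into a small predicate per concern keeps the existing serial-child case and the new non-hash case visible side by side, which is the shape the review summary describes.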

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| be/src/exec/operator/aggregation_sink_operator.h | Refines merge-agg distribution requirement logic to preserve hash shuffle when a child requires non-hash local exchange (or is serial). |
| be/test/exec/operator/agg_operator_test.cpp | Adds a unit test validating merge-agg requires HASH_SHUFFLE after a child with ADAPTIVE_PASSTHROUGH distribution. |

Contributor

@github-actions github-actions Bot left a comment

Review result: no blocking issues found in the changed code.

Critical checkpoint conclusions:

  • Goal/test: The PR preserves aggregation hash shuffle when a merge aggregation follows a child that requires a non-hash local exchange. The added unit test covers the new ADAPTIVE_PASSTHROUGH case at the required_data_distribution level.
  • Scope: The change is small and focused on AggSinkOperatorX distribution selection plus one targeted unit test.
  • Concurrency/lifecycle: No new shared mutable state, threads, dependencies, or non-trivial lifecycle management are introduced.
  • Configuration/compatibility: No new configuration, storage format, protocol, or rolling-upgrade compatibility concerns found.
  • Parallel paths: The modified path is the relevant non-streaming aggregation sink distribution path; no distinct parallel BE path appeared to require the same exact change in this PR.
  • Conditional logic: The new condition preserves the existing serial-child behavior and extends it to child-required non-hash local exchanges; this matches the local-exchange planning flow inspected.
  • Test coverage: Coverage is targeted but limited to BE unit validation of distribution selection, not a full SQL regression. This is a residual coverage limitation, not a blocking defect from the reviewed diff.
  • Observability/transactions/data writes: Not applicable; no new transaction, persistence, data-write, or observability-sensitive path is introduced.
  • Performance: The added child distribution check is lightweight and only occurs during pipeline distribution planning; no hot-path performance issue found.

User focus response: no additional user-provided review focus was present.

Verification: Attempted ./run-be-ut.sh --run --filter=AggOperatorRequiredDistributionTest.*, but the command timed out after 120s while initializing/building third-party submodules, before the targeted test could complete.

@hello-stephen
Contributor

TPC-H: Total hot run time: 31690 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit 00f9e27ab019737d869a3696042b0961068aae49, data reload: false

------ Round 1 ----------------------------------
orders	Doris	NULL	NULL	0	0	0	NULL	0	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	17646	4011	4019	4011
q2	q3	10802	1408	821	821
q4	4688	475	345	345
q5	7580	2296	2138	2138
q6	265	183	145	145
q7	982	797	667	667
q8	9408	1721	1720	1720
q9	6590	5013	4914	4914
q10	6448	2250	1895	1895
q11	456	280	249	249
q12	697	439	295	295
q13	18188	3390	2774	2774
q14	271	266	239	239
q15	q16	824	773	713	713
q17	1006	920	1005	920
q18	6992	5745	5493	5493
q19	1196	1449	1154	1154
q20	515	414	297	297
q21	5921	2753	2581	2581
q22	480	374	319	319
Total cold run time: 100955 ms
Total hot run time: 31690 ms

----- Round 2, with runtime_filter_mode=off -----
orders	Doris	NULL	NULL	150000000	42	6422171781	NULL	22778155	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	4789	4715	4874	4715
q2	q3	4873	5268	4678	4678
q4	2093	2188	1379	1379
q5	4895	4724	4680	4680
q6	238	184	133	133
q7	1885	1794	1561	1561
q8	2279	1941	1939	1939
q9	7417	7421	7426	7421
q10	4755	4662	4224	4224
q11	529	388	357	357
q12	726	735	523	523
q13	3038	3353	2800	2800
q14	283	286	256	256
q15	q16	688	717	622	622
q17	1273	1239	1242	1239
q18	7244	6865	6801	6801
q19	1129	1085	1121	1085
q20	2218	2223	1929	1929
q21	5240	4553	4331	4331
q22	522	456	409	409
Total cold run time: 56114 ms
Total hot run time: 51082 ms
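
The column meanings in the tables above are not stated in the report. The numbers are consistent with this reading, which is an inference rather than documented behavior: the first value is the cold run, the next two are repeat runs, the last is the smaller of the two repeats, and the totals sum the first and last columns. A minimal sketch of that arithmetic, with the hypothetical names `Totals` and `summarize`:

```cpp
#include <algorithm>
#include <array>
#include <vector>

struct Totals {
    long cold;  // sum of first-run times, in ms
    long hot;   // sum of the best repeat-run times, in ms
};

// Each row holds three run times in ms: the first (cold) run followed by
// two repeat runs. Assumed layout, inferred from the reported numbers.
Totals summarize(const std::vector<std::array<long, 3>>& runs) {
    Totals totals{0, 0};
    for (const auto& run : runs) {
        totals.cold += run[0];
        totals.hot += std::min(run[1], run[2]);
    }
    return totals;
}
```

Applied to the 20 rows of Round 1, this reproduces the reported 100955 ms cold and 31690 ms hot totals.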

@hello-stephen
Contributor

TPC-DS: Total hot run time: 171410 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit 00f9e27ab019737d869a3696042b0961068aae49, data reload: false

query5	4325	638	538	538
query6	345	214	197	197
query7	4232	565	309	309
query8	331	236	217	217
query9	8783	3995	4019	3995
query10	455	382	292	292
query11	5759	2403	2256	2256
query12	198	126	135	126
query13	1383	605	452	452
query14	6102	5479	5141	5141
query14_1	4492	4486	4468	4468
query15	215	203	184	184
query16	989	461	441	441
query17	1156	769	611	611
query18	2587	483	347	347
query19	213	204	164	164
query20	136	135	125	125
query21	211	135	115	115
query22	13547	13479	13264	13264
query23	17275	16507	16142	16142
query23_1	16285	16336	16356	16336
query24	7618	1766	1300	1300
query24_1	1307	1302	1316	1302
query25	544	484	404	404
query26	1318	332	172	172
query27	2738	556	339	339
query28	4430	1994	1981	1981
query29	958	625	496	496
query30	309	241	196	196
query31	1130	1082	976	976
query32	87	73	73	73
query33	542	344	303	303
query34	1202	1137	650	650
query35	776	809	701	701
query36	1422	1446	1286	1286
query37	150	110	93	93
query38	3217	3144	3182	3144
query39	941	944	953	944
query39_1	922	914	925	914
query40	230	146	128	128
query41	72	62	62	62
query42	109	110	111	110
query43	333	336	306	306
query44	
query45	212	203	202	202
query46	1161	1251	738	738
query47	2366	2381	2306	2306
query48	409	427	297	297
query49	627	497	388	388
query50	998	351	250	250
query51	4366	4306	4261	4261
query52	105	105	92	92
query53	261	266	194	194
query54	325	269	250	250
query55	92	89	84	84
query56	302	301	320	301
query57	1441	1412	1321	1321
query58	304	273	259	259
query59	1590	1674	1439	1439
query60	311	320	304	304
query61	156	161	146	146
query62	705	651	586	586
query63	255	200	204	200
query64	2407	794	605	605
query65	
query66	1699	469	358	358
query67	29036	29594	29407	29407
query68	
query69	452	335	301	301
query70	1054	1017	962	962
query71	295	314	265	265
query72	2901	2646	2329	2329
query73	821	782	420	420
query74	5089	4980	4747	4747
query75	2690	2615	2244	2244
query76	2298	1160	770	770
query77	413	411	330	330
query78	12429	12348	11852	11852
query79	1350	1017	739	739
query80	584	543	460	460
query81	449	280	236	236
query82	398	160	121	121
query83	359	280	249	249
query84	253	172	115	115
query85	887	555	454	454
query86	399	370	313	313
query87	3469	3384	3245	3245
query88	3621	2736	2727	2727
query89	433	399	342	342
query90	1833	180	179	179
query91	182	165	140	140
query92	78	76	73	73
query93	1438	1319	845	845
query94	546	334	273	273
query95	686	391	342	342
query96	1112	819	317	317
query97	2746	2725	2592	2592
query98	235	228	225	225
query99	1173	1164	1043	1043
Total cold run time: 252016 ms
Total hot run time: 171410 ms

@hello-stephen
Contributor

BE UT Coverage Report

Increment line coverage 6.25% (1/16) 🎉

Increment coverage report
Complete coverage report

| Category | Coverage |
| --- | --- |
| Function Coverage | 53.92% (20953/38856) |
| Line Coverage | 37.51% (198722/529741) |
| Region Coverage | 33.79% (155684/460745) |
| Branch Coverage | 34.79% (67789/194877) |

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 6.25% (1/16) 🎉

Increment coverage report
Complete coverage report

| Category | Coverage |
| --- | --- |
| Function Coverage | 63.74% (24256/38055) |
| Line Coverage | 47.46% (250774/528384) |
| Region Coverage | 44.53% (207151/465166) |
| Branch Coverage | 45.64% (89276/195603) |

4 participants