@@ -1413,29 +1413,64 @@ private List<Page> mergeIcebergResults(
14131413 LOG .info ("PERF: mergeIcebergResults mergePassthrough {}ms, {} pages, {} rows" ,
14141414 (System .nanoTime () - t0 ) / 1_000_000 , rawPages .size (), totalRows );
14151415 t0 = System .nanoTime ();
1416- // Parallelize: partition raw pages into N chunks, aggregate each in parallel, then merge.
1417- int parallelism = Math .min (rawPages .size (), Runtime .getRuntime ().availableProcessors ());
1418- List <Page > mergedPages ;
1419- if (parallelism <= 1 ) {
1420- mergedPages = runCoordinatorAggregation (singleAgg , rawPages , rawColumnNames , columnTypeMap );
1421- } else {
1422- int chunkSize = (rawPages .size () + parallelism - 1 ) / parallelism ;
1423- java .util .concurrent .ExecutorService pool =
1424- java .util .concurrent .Executors .newFixedThreadPool (parallelism );
1425- try {
1426- List <java .util .concurrent .CompletableFuture <List <Page >>> futures = new ArrayList <>();
1427- for (int i = 0 ; i < rawPages .size (); i += chunkSize ) {
1428- List <Page > chunk = rawPages .subList (i , Math .min (i + chunkSize , rawPages .size ()));
1429- futures .add (java .util .concurrent .CompletableFuture .supplyAsync (
1430- () -> runCoordinatorAggregation (singleAgg , chunk , rawColumnNames , columnTypeMap ), pool ));
1416+ int numPartitions = Runtime .getRuntime ().availableProcessors ();
1417+ int numGroupByCols = singleAgg .getGroupByKeys ().size ();
1418+
1419+ // Resolve group-by column indices in raw scan output
1420+ List <Integer > gbIndices = new ArrayList <>();
1421+ for (String key : singleAgg .getGroupByKeys ()) {
1422+ gbIndices .add (rawColumnNames .indexOf (key ));
1423+ }
1424+
1425+ // Hash-partition raw rows by group keys into N partitions (disjoint keys per partition)
1426+ @ SuppressWarnings ("unchecked" )
1427+ List <Page >[] partitionPages = new List [numPartitions ];
1428+ for (int p = 0 ; p < numPartitions ; p ++) partitionPages [p ] = new ArrayList <>();
1429+
1430+ long perfRouteStart = System .nanoTime ();
1431+ for (Page page : rawPages ) {
1432+ int positionCount = page .getPositionCount ();
1433+ int [][] selected = new int [numPartitions ][positionCount ];
1434+ int [] selectedCount = new int [numPartitions ];
1435+ for (int pos = 0 ; pos < positionCount ; pos ++) {
1436+ int h = 1 ;
1437+ for (int gi : gbIndices ) {
1438+ Block block = page .getBlock (gi );
1439+ if (block .isNull (pos )) { h = 31 * h ; }
1440+ else {
1441+ Type type = columnTypeMap .getOrDefault (rawColumnNames .get (gi ), io .trino .spi .type .BigintType .BIGINT );
1442+ h = 31 * h + Long .hashCode (type .getLong (block , pos ));
1443+ }
14311444 }
1432- // Merge partial results — convert to splitPages format for mergeAggregation
1433- List <List <Page >> partialResults = new ArrayList <>();
1434- for (var f : futures ) partialResults .add (f .join ());
1435- mergedPages = merger .mergeAggregation (partialResults , singleAgg , columnTypes );
1436- } finally {
1437- pool .shutdown ();
1445+ int partition = Math .floorMod (h , numPartitions );
1446+ selected [partition ][selectedCount [partition ]++] = pos ;
14381447 }
1448+ for (int p = 0 ; p < numPartitions ; p ++) {
1449+ if (selectedCount [p ] > 0 ) {
1450+ partitionPages [p ].add (selectedCount [p ] == positionCount
1451+ ? page : page .copyPositions (selected [p ], 0 , selectedCount [p ]));
1452+ }
1453+ }
1454+ }
1455+ LOG .info ("PERF: hash exchange routing {}ms, {} partitions" ,
1456+ (System .nanoTime () - perfRouteStart ) / 1_000_000 , numPartitions );
1457+
1458+ // Parallel aggregation per partition — each has disjoint group keys
1459+ long perfAggStart = System .nanoTime ();
1460+ java .util .concurrent .ExecutorService pool =
1461+ java .util .concurrent .Executors .newFixedThreadPool (numPartitions );
1462+ List <Page > mergedPages ;
1463+ try {
1464+ List <java .util .concurrent .CompletableFuture <List <Page >>> futures = new ArrayList <>();
1465+ for (int p = 0 ; p < numPartitions ; p ++) {
1466+ List <Page > pPages = partitionPages [p ];
1467+ futures .add (java .util .concurrent .CompletableFuture .supplyAsync (
1468+ () -> runCoordinatorAggregation (singleAgg , pPages , rawColumnNames , columnTypeMap ), pool ));
1469+ }
1470+ mergedPages = new ArrayList <>();
1471+ for (var f : futures ) mergedPages .addAll (f .join ());
1472+ } finally {
1473+ pool .shutdown ();
14391474 }
14401475 long aggGroups = mergedPages .stream ().mapToLong (Page ::getPositionCount ).sum ();
14411476 LOG .info ("PERF: mergeIcebergResults runCoordinatorAggregation {}ms, {} output groups" ,
0 commit comments