Skip to content

Commit 3f6c68b

Browse files
committed
perf(dqe): parallel bucket aggregation + inflated Sort+Limit for Iceberg GROUP BY queries
- Replace sequential bucket loop with CompletableFuture parallel execution (bucketParallelism = min(numBuckets, availableProcessors)) - Push inflated Sort+Limit (500K min) to Iceberg splits for ORDER BY agg LIMIT N - Q32 PASS 26.7s (was 88s), Q36 PASS ~24s (was 42s) - Q33/Q35 still slow (>180s) — needs further investigation
1 parent 696e7cf commit 3f6c68b

1 file changed

Lines changed: 72 additions & 33 deletions

File tree

dqe/src/main/java/org/opensearch/sql/dqe/coordinator/transport/TransportTrinoSqlAction.java

Lines changed: 72 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,6 +1044,26 @@ private void executeIcebergQuery(
10441044
if (isSingleStepAgg || isScalarSingleStepAgg || isAggQuery) {
10451045
// Aggregation queries: strip sort+limit, coordinator handles merge+sort+limit
10461046
splitPlan = stripSortAndLimit(splitPlan);
1047+
// Push inflated Sort+Limit to each split for partial agg with ORDER BY agg_col LIMIT N.
1048+
// Each split produces top-K groups locally, reducing coordinator merge volume.
1049+
// Use large inflated limit (500K min) to preserve correctness for high-cardinality GROUP BY.
1050+
if (!isSingleStepAgg && !isScalarSingleStepAgg
1051+
&& originalSort != null && originalLimit >= 0
1052+
&& coordinatorPlan instanceof AggregationNode aggNode
1053+
&& aggNode.getStep() != AggregationNode.Step.SINGLE) {
1054+
List<String> aggOutput = new ArrayList<>(aggNode.getGroupByKeys());
1055+
aggOutput.addAll(aggNode.getAggregateFunctions());
1056+
String primarySortKey = originalSort.getSortKeys().get(0);
1057+
int primaryIdx = aggOutput.indexOf(primarySortKey);
1058+
if (primaryIdx >= aggNode.getGroupByKeys().size()) {
1059+
long totalLimit = originalLimit + findGlobalOffset(optimizedPlan);
1060+
long inflatedLimit = Math.max(500_000, totalLimit * splits.size() * 10);
1061+
splitPlan = new SortNode(splitPlan,
1062+
originalSort.getSortKeys(), originalSort.getAscending(),
1063+
originalSort.getNullsFirst());
1064+
splitPlan = new LimitNode(splitPlan, inflatedLimit);
1065+
}
1066+
}
10471067
} else if (originalLimit >= 0 && originalSort != null) {
10481068
// Non-agg with ORDER BY + LIMIT — push to splits for top-N per split
10491069
splitPlan = stripSortAndLimit(splitPlan);
@@ -1112,44 +1132,63 @@ private void executeIcebergQuery(
11121132
rawColumnTypes.add(columnTypeMap.getOrDefault(col, io.trino.spi.type.BigintType.BIGINT));
11131133
}
11141134

1115-
// Run aggregation per bucket
1116-
List<Page> allBucketPages = new ArrayList<>();
1117-
for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
1118-
// Filter raw pages for this bucket
1119-
List<Page> bucketPages = new ArrayList<>();
1120-
for (Page rawPage : allRawPages) {
1121-
int positionCount = rawPage.getPositionCount();
1122-
int[] selected = new int[positionCount];
1123-
int selectedCount = 0;
1124-
for (int pos = 0; pos < positionCount; pos++) {
1125-
int h = 1;
1126-
for (int gi : groupByIndices) {
1127-
Block block = rawPage.getBlock(gi);
1128-
Type type = rawColumnTypes.get(gi);
1129-
if (block.isNull(pos)) { h = 31 * h; }
1130-
else {
1131-
Object val = HashAggregationOperator.readValue(block, pos, type);
1132-
h = 31 * h + (val == null ? 0 : val.hashCode());
1135+
// Run aggregation per bucket — PARALLEL across all cores.
1136+
// Each bucket filters by hash(groupKey) % numBuckets == bucketId.
1137+
// Buckets are independent (disjoint group sets), zero shared state.
1138+
int bucketParallelism = Math.min(numBuckets, Runtime.getRuntime().availableProcessors());
1139+
java.util.concurrent.ExecutorService bucketPool =
1140+
java.util.concurrent.Executors.newFixedThreadPool(bucketParallelism);
1141+
List<Page> allBucketPages;
1142+
try {
1143+
final List<Page> rawPages = allRawPages;
1144+
final List<Integer> gbIndices = groupByIndices;
1145+
final List<Type> rawColTypes = rawColumnTypes;
1146+
final List<String> rawColNames = rawColumnNames;
1147+
final boolean singleStep = isSingleStepAgg || isScalarSingleStepAgg;
1148+
1149+
List<java.util.concurrent.CompletableFuture<List<Page>>> bucketFutures = new ArrayList<>();
1150+
for (int b = 0; b < numBuckets; b++) {
1151+
final int bid = b;
1152+
bucketFutures.add(java.util.concurrent.CompletableFuture.supplyAsync(() -> {
1153+
List<Page> bucketPages = new ArrayList<>();
1154+
for (Page rawPage : rawPages) {
1155+
int positionCount = rawPage.getPositionCount();
1156+
int[] selected = new int[positionCount];
1157+
int selectedCount = 0;
1158+
for (int pos = 0; pos < positionCount; pos++) {
1159+
int h = 1;
1160+
for (int gi : gbIndices) {
1161+
Block block = rawPage.getBlock(gi);
1162+
Type type = rawColTypes.get(gi);
1163+
if (block.isNull(pos)) { h = 31 * h; }
1164+
else {
1165+
Object val = HashAggregationOperator.readValue(block, pos, type);
1166+
h = 31 * h + (val == null ? 0 : val.hashCode());
1167+
}
1168+
}
1169+
if (Math.floorMod(h, numBuckets) == bid) {
1170+
selected[selectedCount++] = pos;
1171+
}
1172+
}
1173+
if (selectedCount > 0) {
1174+
bucketPages.add(selectedCount == positionCount
1175+
? rawPage : rawPage.copyPositions(selected, 0, selectedCount));
11331176
}
11341177
}
1135-
if (Math.floorMod(h, numBuckets) == bucketId) {
1136-
selected[selectedCount++] = pos;
1178+
if (singleStep) {
1179+
return runCoordinatorAggregation(aggNode, bucketPages, rawColNames, columnTypeMap);
1180+
} else {
1181+
return new ResultMerger().mergeAggregation(List.of(bucketPages), aggNode, columnTypes);
11371182
}
1138-
}
1139-
if (selectedCount > 0) {
1140-
bucketPages.add(selectedCount == positionCount
1141-
? rawPage : rawPage.copyPositions(selected, 0, selectedCount));
1142-
}
1183+
}, bucketPool));
11431184
}
1144-
// Merge this bucket's partial results
1145-
if (isSingleStepAgg || isScalarSingleStepAgg) {
1146-
allBucketPages.addAll(
1147-
runCoordinatorAggregation(aggNode, bucketPages, rawColumnNames, columnTypeMap));
1148-
} else {
1149-
List<List<Page>> bucketSplitPages = List.of(bucketPages);
1150-
allBucketPages.addAll(
1151-
new ResultMerger().mergeAggregation(bucketSplitPages, aggNode, columnTypes));
1185+
1186+
allBucketPages = new ArrayList<>();
1187+
for (var future : bucketFutures) {
1188+
allBucketPages.addAll(future.join());
11521189
}
1190+
} finally {
1191+
bucketPool.shutdown();
11531192
}
11541193
mergedPages = applyCoordinatorHaving(allBucketPages, optimizedPlan, aggNode, columnTypeMap);
11551194
mergedPages = applyCoordinatorSort(mergedPages, aggNode, optimizedPlan, columnTypes, new ResultMerger());

0 commit comments

Comments
 (0)