Commit 794524a

perf(dqe): hash-partitioned exchange + multi-key flat-array aggregation for Q33
- Hash-partitioned exchange in coordinator merge: single-pass routing by hash(groupKeys) % N, then parallel FINAL aggregation per partition over disjoint keys. Coordinator merge: 120s → 27s for 58M groups.
- Multi-key numeric fast path in HashAggregationOperator: flat long[]/double[] arrays with open addressing, inline COUNT/SUM/AVG accumulation, zero per-row object allocation. AVG is handled as sum+count in paired double[]/long[] slots.
- Iceberg table metadata cache: a ConcurrentHashMap avoids repeated resolve+loadTable calls per split.
- Q33: 339s → 114s (PASS); Trino runs it in 26.8s, so a 4.3x gap remains.
1 parent f7ad732 commit 794524a

3 files changed

Lines changed: 505 additions & 58 deletions
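
The third changed file, HashAggregationOperator.java, is not expanded in this view. As a rough illustration of the multi-key flat-array technique the commit message describes (open addressing over primitive arrays, inline COUNT/SUM slots, no per-row allocation), here is a minimal standalone sketch. It is an assumption-laden sketch, not the actual DQE operator: the class name, sizing policy, and hash mix are all hypothetical, and it fixes the key width at two long columns for brevity.

// Hypothetical sketch of a multi-key flat-array aggregator (not DQE code).
// Group keys and accumulators live in flat primitive arrays; lookup is
// open addressing with linear probing, so the hot loop allocates nothing.
final class FlatMultiKeyAggregator {
  private final int mask;          // capacity - 1; capacity is a power of two
  private final long[] key0;       // first GROUP BY key per slot
  private final long[] key1;       // second GROUP BY key per slot
  private final boolean[] used;    // slot occupancy
  private final long[] count;      // COUNT accumulator per slot
  private final double[] sum;      // SUM accumulator per slot (also feeds AVG)
  private int groups;

  FlatMultiKeyAggregator(int expectedGroups) {
    // Size to roughly 4x expected groups, rounded up to a power of two.
    int capacity = Integer.highestOneBit(Math.max(16, expectedGroups * 4) - 1) << 1;
    mask = capacity - 1;
    key0 = new long[capacity];
    key1 = new long[capacity];
    used = new boolean[capacity];
    count = new long[capacity];
    sum = new double[capacity];
  }

  /** Accumulate one row for group (k0, k1): bumps COUNT and SUM inline. */
  void add(long k0, long k1, double value) {
    int slot = probe(k0, k1);
    if (!used[slot]) {             // claim a fresh slot for a new group
      used[slot] = true;
      key0[slot] = k0;
      key1[slot] = k1;
      groups++;
    }
    count[slot]++;
    sum[slot] += value;
  }

  /** AVG falls out of the two flat accumulators at emit time. */
  double avg(long k0, long k1) {
    int slot = probe(k0, k1);
    return used[slot] ? sum[slot] / count[slot] : Double.NaN;
  }

  private int probe(long k0, long k1) {
    int slot = (int) (mix(k0, k1) & mask);
    while (used[slot] && (key0[slot] != k0 || key1[slot] != k1)) {
      slot = (slot + 1) & mask;    // linear probing
    }
    return slot;
  }

  private static long mix(long k0, long k1) {
    long h = k0 * 0x9E3779B97F4A7C15L ^ k1;  // cheap two-key hash
    h ^= h >>> 32;
    return h & Long.MAX_VALUE;               // keep it non-negative
  }

  public static void main(String[] args) {
    FlatMultiKeyAggregator agg = new FlatMultiKeyAggregator(4);
    agg.add(1, 10, 2.0);
    agg.add(1, 10, 4.0);
    agg.add(2, 20, 7.0);
    System.out.println(agg.groups);      // 2
    System.out.println(agg.avg(1, 10));  // 3.0 = (2.0 + 4.0) / 2
  }
}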


dqe/src/main/java/org/opensearch/sql/dqe/coordinator/transport/TransportTrinoSqlAction.java

Lines changed: 126 additions & 5 deletions
@@ -1087,13 +1087,21 @@ private void executeIcebergQuery(
     boolean needsMultiPass = false;
     int estimatedGroups = 0;
     // Dispatch splits ONCE — reuse results in both fast path and multi-pass fallback.
+    long perfTotal = System.nanoTime();
+    long perfT0 = System.nanoTime();
     List<List<Page>> splitPages = dispatchIcebergSplits(splitFragments, splits, splitPlan);
+    int totalPages = splitPages.stream().mapToInt(List::size).sum();
+    LOG.info("PERF: split dispatch {}ms, {} splits, {} total pages",
+        (System.nanoTime() - perfT0) / 1_000_000, splits.size(), totalPages);
     try {
+      perfT0 = System.nanoTime();
       mergedPages = mergeIcebergResults(
           splitPages, splitPlan, coordinatorPlan, optimizedPlan,
           columnTypes, columnTypeMap, isSingleStepAgg, isScalarSingleStepAgg,
           internalColumnNames);
     } catch (HashAggregationOperator.GroupLimitExceededException e) {
+      LOG.info("PERF: first agg attempt failed at {}ms, {} groups",
+          (System.nanoTime() - perfT0) / 1_000_000, e.getGroupCount());
       needsMultiPass = true;
       estimatedGroups = e.getGroupCount() * 2;
       mergedPages = null;
@@ -1116,7 +1124,11 @@ private void executeIcebergQuery(
       LOG.info("GROUP BY overflow, retrying with {} buckets", numBuckets);

       // Reuse raw pages from the first dispatch (no re-read)
+      perfT0 = System.nanoTime();
       List<Page> allRawPages = new ResultMerger().mergePassthrough(splitPages);
+      long totalRows = allRawPages.stream().mapToLong(Page::getPositionCount).sum();
+      LOG.info("PERF: mergePassthrough {}ms, {} pages, {} total rows",
+          (System.nanoTime() - perfT0) / 1_000_000, allRawPages.size(), totalRows);

       // Determine GROUP BY column indices in the raw scan output
       List<String> rawColumnNames = resolveColumnNames(splitPlan);
@@ -1147,10 +1159,13 @@ private void executeIcebergQuery(
         final boolean singleStep = isSingleStepAgg || isScalarSingleStepAgg;

         List<java.util.concurrent.CompletableFuture<List<Page>>> bucketFutures = new ArrayList<>();
+        long perfBucketsStart = System.nanoTime();
         for (int b = 0; b < numBuckets; b++) {
           final int bid = b;
           bucketFutures.add(java.util.concurrent.CompletableFuture.supplyAsync(() -> {
+            long filterStart = System.nanoTime();
             List<Page> bucketPages = new ArrayList<>();
+            long inputRows = 0;
             for (Page rawPage : rawPages) {
               int positionCount = rawPage.getPositionCount();
               int[] selected = new int[positionCount];
@@ -1173,25 +1188,38 @@ private void executeIcebergQuery(
               if (selectedCount > 0) {
                 bucketPages.add(selectedCount == positionCount
                     ? rawPage : rawPage.copyPositions(selected, 0, selectedCount));
+                inputRows += selectedCount;
               }
             }
+            long filterMs = (System.nanoTime() - filterStart) / 1_000_000;
+            long aggStart = System.nanoTime();
+            List<Page> result;
             if (singleStep) {
-              return runCoordinatorAggregation(aggNode, bucketPages, rawColNames, columnTypeMap);
+              result = runCoordinatorAggregation(aggNode, bucketPages, rawColNames, columnTypeMap);
             } else {
-              return new ResultMerger().mergeAggregation(List.of(bucketPages), aggNode, columnTypes);
+              result = new ResultMerger().mergeAggregation(List.of(bucketPages), aggNode, columnTypes);
             }
+            long aggMs = (System.nanoTime() - aggStart) / 1_000_000;
+            long outputGroups = result.stream().mapToLong(Page::getPositionCount).sum();
+            LOG.info("PERF: bucket {} filter {}ms, agg {}ms, {} input rows, {} output groups",
+                bid, filterMs, aggMs, inputRows, outputGroups);
+            return result;
           }, bucketPool));
         }

         allBucketPages = new ArrayList<>();
         for (var future : bucketFutures) {
           allBucketPages.addAll(future.join());
         }
+        LOG.info("PERF: all buckets {}ms, {} total output pages",
+            (System.nanoTime() - perfBucketsStart) / 1_000_000, allBucketPages.size());
       } finally {
         bucketPool.shutdown();
       }
       mergedPages = applyCoordinatorHaving(allBucketPages, optimizedPlan, aggNode, columnTypeMap);
+      perfT0 = System.nanoTime();
       mergedPages = applyCoordinatorSort(mergedPages, aggNode, optimizedPlan, columnTypes, new ResultMerger());
+      LOG.info("PERF: coordinator sort {}ms", (System.nanoTime() - perfT0) / 1_000_000);
     }

     // Apply coordinator-level OFFSET + LIMIT
@@ -1206,6 +1234,7 @@ private void executeIcebergQuery(

     String schemaPrefix = buildSchemaJsonPrefix(columnNames, columnTypes);
     Type[] typeArray = columnTypes.toArray(new Type[0]);
+    LOG.info("PERF: Q total {}ms", (System.nanoTime() - perfTotal) / 1_000_000);
     String responseJson =
         formatResponse(mergedPages, columnNames, columnTypes, schemaPrefix, typeArray);
     listener.onResponse(new TrinoSqlResponse(responseJson));
@@ -1357,13 +1386,105 @@ private List<Page> mergeIcebergResults(
     if (isSingleStepAgg || isScalarSingleStepAgg) {
       AggregationNode singleAgg = (AggregationNode) coordinatorPlan;
       List<String> rawColumnNames = resolveColumnNames(splitPlan);
+      long t0 = System.nanoTime();
       List<Page> rawPages = merger.mergePassthrough(splitPages);
-      List<Page> mergedPages = runCoordinatorAggregation(singleAgg, rawPages, rawColumnNames, columnTypeMap);
-      return applyCoordinatorSort(mergedPages, singleAgg, optimizedPlan, columnTypes, merger);
+      long totalRows = rawPages.stream().mapToLong(Page::getPositionCount).sum();
+      LOG.info("PERF: mergeIcebergResults mergePassthrough {}ms, {} pages, {} rows",
+          (System.nanoTime() - t0) / 1_000_000, rawPages.size(), totalRows);
+      t0 = System.nanoTime();
+      // Parallelize: partition raw pages into N chunks, aggregate each in parallel, then merge.
+      int parallelism = Math.min(rawPages.size(), Runtime.getRuntime().availableProcessors());
+      List<Page> mergedPages;
+      if (parallelism <= 1) {
+        mergedPages = runCoordinatorAggregation(singleAgg, rawPages, rawColumnNames, columnTypeMap);
+      } else {
+        int chunkSize = (rawPages.size() + parallelism - 1) / parallelism;
+        java.util.concurrent.ExecutorService pool =
+            java.util.concurrent.Executors.newFixedThreadPool(parallelism);
+        try {
+          List<java.util.concurrent.CompletableFuture<List<Page>>> futures = new ArrayList<>();
+          for (int i = 0; i < rawPages.size(); i += chunkSize) {
+            List<Page> chunk = rawPages.subList(i, Math.min(i + chunkSize, rawPages.size()));
+            futures.add(java.util.concurrent.CompletableFuture.supplyAsync(
+                () -> runCoordinatorAggregation(singleAgg, chunk, rawColumnNames, columnTypeMap), pool));
+          }
+          // Merge partial results — convert to splitPages format for mergeAggregation
+          List<List<Page>> partialResults = new ArrayList<>();
+          for (var f : futures) partialResults.add(f.join());
+          mergedPages = merger.mergeAggregation(partialResults, singleAgg, columnTypes);
+        } finally {
+          pool.shutdown();
+        }
+      }
+      long aggGroups = mergedPages.stream().mapToLong(Page::getPositionCount).sum();
+      LOG.info("PERF: mergeIcebergResults runCoordinatorAggregation {}ms, {} output groups",
+          (System.nanoTime() - t0) / 1_000_000, aggGroups);
+      t0 = System.nanoTime();
+      List<Page> sorted = applyCoordinatorSort(mergedPages, singleAgg, optimizedPlan, columnTypes, merger);
+      LOG.info("PERF: mergeIcebergResults coordinatorSort {}ms", (System.nanoTime() - t0) / 1_000_000);
+      return sorted;
     } else if (coordinatorPlan instanceof AggregationNode aggNode && isScalarPartialMerge(aggNode)) {
       return mergeScalarAggregation(splitPages, aggNode, columnTypes);
     } else if (coordinatorPlan instanceof AggregationNode aggNode) {
-      List<Page> mergedPages = merger.mergeAggregation(splitPages, aggNode, columnTypes);
+      long t0 = System.nanoTime();
+      int numPartitions = Runtime.getRuntime().availableProcessors();
+      int numGroupByCols = aggNode.getGroupByKeys().size();
+
+      // Collect all partial pages from all splits into flat list
+      List<Page> allPartialPages = merger.mergePassthrough(splitPages);
+
+      // Hash-partitioned exchange: route each row to partition = hash(groupKeys) % N
+      @SuppressWarnings("unchecked")
+      List<Page>[] partitionPages = new List[numPartitions];
+      for (int p = 0; p < numPartitions; p++) partitionPages[p] = new ArrayList<>();
+
+      long perfRouteStart = System.nanoTime();
+      for (Page page : allPartialPages) {
+        int positionCount = page.getPositionCount();
+        int[][] selected = new int[numPartitions][positionCount];
+        int[] selectedCount = new int[numPartitions];
+        for (int pos = 0; pos < positionCount; pos++) {
+          int h = 1;
+          for (int k = 0; k < numGroupByCols; k++) {
+            Block block = page.getBlock(k);
+            if (block.isNull(pos)) { h = 31 * h; }
+            else { h = 31 * h + Long.hashCode(columnTypes.get(k).getLong(block, pos)); }
+          }
+          int partition = Math.floorMod(h, numPartitions);
+          selected[partition][selectedCount[partition]++] = pos;
+        }
+        for (int p = 0; p < numPartitions; p++) {
+          if (selectedCount[p] > 0) {
+            partitionPages[p].add(selectedCount[p] == positionCount
+                ? page : page.copyPositions(selected[p], 0, selectedCount[p]));
+          }
+        }
+      }
+      LOG.info("PERF: hash exchange routing {}ms, {} partitions",
+          (System.nanoTime() - perfRouteStart) / 1_000_000, numPartitions);
+
+      // Parallel FINAL aggregation per partition (disjoint group keys)
+      long perfAggStart = System.nanoTime();
+      java.util.concurrent.ExecutorService pool =
+          java.util.concurrent.Executors.newFixedThreadPool(numPartitions);
+      List<Page> mergedPages;
+      try {
+        List<java.util.concurrent.CompletableFuture<List<Page>>> futures = new ArrayList<>();
+        for (int p = 0; p < numPartitions; p++) {
+          List<Page> pPages = partitionPages[p];
+          futures.add(java.util.concurrent.CompletableFuture.supplyAsync(
+              () -> new ResultMerger().mergeAggregation(List.of(pPages), aggNode, columnTypes), pool));
+        }
+        mergedPages = new ArrayList<>();
+        for (var f : futures) mergedPages.addAll(f.join());
+      } finally {
+        pool.shutdown();
+      }
+      long aggGroups = mergedPages.stream().mapToLong(Page::getPositionCount).sum();
+      LOG.info("PERF: parallel FINAL agg {}ms, {} output groups",
+          (System.nanoTime() - perfAggStart) / 1_000_000, aggGroups);
+      LOG.info("PERF: mergeIcebergResults hash-exchange total {}ms",
+          (System.nanoTime() - t0) / 1_000_000);
       mergedPages = applyCoordinatorHaving(mergedPages, optimizedPlan, aggNode, columnTypeMap);
       return applyCoordinatorSort(mergedPages, aggNode, optimizedPlan, columnTypes, merger);
     } else {
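
The per-partition FINAL aggregation above is only correct because the router can never split a group across partitions: equal group keys produce equal hashes, so hash(groupKeys) % N lands every row of a group in the same partition regardless of which split produced it. A tiny standalone check of that invariant, mirroring the 31 * h + Long.hashCode(k) loop and Math.floorMod routing in the diff (hypothetical names, not DQE code):

// Hypothetical demonstration of the routing invariant (not DQE code).
public class RoutingInvariant {
  static int route(long[] groupKeys, int numPartitions) {
    int h = 1;
    for (long k : groupKeys) h = 31 * h + Long.hashCode(k);
    // floorMod, not %: h can overflow negative, and % would yield a
    // negative partition index.
    return Math.floorMod(h, numPartitions);
  }

  public static void main(String[] args) {
    long[] fromSplitA = {42L, 7L};  // same group key observed in two splits
    long[] fromSplitB = {42L, 7L};
    // Identical keys always route to the same partition, so per-partition
    // FINAL aggregation needs no cross-partition merge:
    System.out.println(route(fromSplitA, 8) == route(fromSplitB, 8));  // true
  }
}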

dqe/src/main/java/org/opensearch/sql/dqe/iceberg/transport/TransportIcebergSplitExecuteAction.java

Lines changed: 48 additions & 28 deletions
@@ -49,6 +49,21 @@ public class TransportIcebergSplitExecuteAction

   private final org.opensearch.cluster.service.ClusterService clusterService;

+  // Cache table metadata across splits — same table is loaded 125+ times per query
+  private record CachedTableMeta(TableInfo tableInfo, Table icebergTable, Schema schema, Map<String, Type> columnTypeMap) {}
+  private final java.util.concurrent.ConcurrentHashMap<String, CachedTableMeta> tableCache = new java.util.concurrent.ConcurrentHashMap<>();
+
+  private CachedTableMeta getTableMeta(String tableName) {
+    return tableCache.computeIfAbsent(tableName, name -> {
+      IcebergTableResolver resolver = new IcebergTableResolver(getWarehousePath());
+      TableInfo info = resolver.resolve(name);
+      Table table = resolver.loadTable(name);
+      Map<String, Type> typeMap = new HashMap<>();
+      for (TableInfo.ColumnInfo col : info.columns()) typeMap.put(col.name(), col.trinoType());
+      return new CachedTableMeta(info, table, table.schema(), typeMap);
+    });
+  }
+
   @Inject
   public TransportIcebergSplitExecuteAction(
       TransportService transportService, ActionFilters actionFilters,
@@ -80,15 +95,9 @@ protected void doExecute(
             new ByteArrayInputStream(req.getSerializedPlan())));

     IcebergSplitInfo splitInfo = req.getSplitInfo();
-    IcebergTableResolver resolver = new IcebergTableResolver(getWarehousePath());
-    TableInfo tableInfo = resolver.resolve(splitInfo.tableName());
-    Table icebergTable = resolver.loadTable(splitInfo.tableName());
-    Schema icebergSchema = icebergTable.schema();
-
-    Map<String, Type> columnTypeMap = new HashMap<>();
-    for (TableInfo.ColumnInfo col : tableInfo.columns()) {
-      columnTypeMap.put(col.name(), col.trinoType());
-    }
+    CachedTableMeta meta = getTableMeta(splitInfo.tableName());
+    Schema icebergSchema = meta.schema();
+    Map<String, Type> columnTypeMap = meta.columnTypeMap();

     // Build scan factory that creates ParquetPageSource for the split
     ParquetReaderOptions options = new ParquetReaderOptions();
@@ -148,22 +157,21 @@ protected void doExecute(
    */
   public IcebergSplitExecuteResponse executeLocal(
       DqePlanNode plan, IcebergSplitInfo splitInfo) throws Exception {
-    IcebergTableResolver resolver = new IcebergTableResolver(getWarehousePath());
-    TableInfo tableInfo = resolver.resolve(splitInfo.tableName());
-    Table icebergTable = resolver.loadTable(splitInfo.tableName());
-    Schema icebergSchema = icebergTable.schema();
-
-    Map<String, Type> columnTypeMap = new HashMap<>();
-    for (TableInfo.ColumnInfo col : tableInfo.columns()) {
-      columnTypeMap.put(col.name(), col.trinoType());
-    }
+    long perfSplitStart = System.nanoTime();
+    long perfMetaStart = System.nanoTime();
+    CachedTableMeta meta = getTableMeta(splitInfo.tableName());
+    Schema icebergSchema = meta.schema();
+    Map<String, Type> columnTypeMap = meta.columnTypeMap();
+    long perfMetaMs = (System.nanoTime() - perfMetaStart) / 1_000_000;

     ParquetReaderOptions options = new ParquetReaderOptions();
     ParquetPredicateConverter predicateConverter = new ParquetPredicateConverter(columnTypeMap);
+    long[] perfParquetOpenMs = {0};
     LocalExecutionPlanner planner =
         new LocalExecutionPlanner(
             scanNode -> {
               try {
+                long openStart = System.nanoTime();
                 // Read file schema for predicate mapping
                 java.io.File file = new java.io.File(splitInfo.filePath());
                 io.trino.filesystem.local.LocalInputFile inputFile =
@@ -181,26 +189,44 @@ public IcebergSplitExecuteResponse executeLocal(
                 TupleDomain<ColumnDescriptor> predicate =
                     predicateConverter.extractPredicates(plan, fileSchema);

-                return new ParquetPageSource(
+                ParquetPageSource source = new ParquetPageSource(
                     splitInfo.filePath(),
                     icebergSchema,
                     scanNode.getColumns(),
                     columnTypeMap,
                     options,
                     predicate);
+                perfParquetOpenMs[0] = (System.nanoTime() - openStart) / 1_000_000;
+                return source;
               } catch (Exception e) {
                 throw new RuntimeException("Failed to create ParquetPageSource", e);
               }
             },
             columnTypeMap);

+    long perfPlanStart = System.nanoTime();
     Operator pipeline = plan.accept(planner, null);
+    long perfPlanMs = (System.nanoTime() - perfPlanStart) / 1_000_000;
+
+    long perfExecStart = System.nanoTime();
     List<Page> pages = new ArrayList<>();
     Page page;
+    long totalRows = 0;
     while ((page = pipeline.processNextBatch()) != null) {
       pages.add(page);
+      totalRows += page.getPositionCount();
     }
     pipeline.close();
+    long perfExecMs = (System.nanoTime() - perfExecStart) / 1_000_000;
+    long perfTotalMs = (System.nanoTime() - perfSplitStart) / 1_000_000;
+
+    // Log per-split profiling
+    String fileName = splitInfo.filePath();
+    int lastSlash = fileName.lastIndexOf('/');
+    String shortName = lastSlash >= 0 ? fileName.substring(lastSlash + 1) : fileName;
+    LOG.info("PERF-SPLIT: {} total={}ms meta={}ms parquetOpen={}ms plan={}ms exec={}ms pages={} rows={}",
+        shortName, perfTotalMs, perfMetaMs, perfParquetOpenMs[0], perfPlanMs, perfExecMs,
+        pages.size(), totalRows);

     List<Type> columnTypes = resolveColumnTypes(plan, columnTypeMap);
     return new IcebergSplitExecuteResponse(pages, columnTypes);
@@ -214,15 +240,9 @@ public IcebergSplitExecuteResponse executeLocalWithBucketFilter(
       DqePlanNode plan, IcebergSplitInfo splitInfo,
       List<Integer> groupByIndices, List<Type> allColumnTypes,
       int bucket, int numBuckets) throws Exception {
-    IcebergTableResolver resolver = new IcebergTableResolver(getWarehousePath());
-    TableInfo tableInfo = resolver.resolve(splitInfo.tableName());
-    Table icebergTable = resolver.loadTable(splitInfo.tableName());
-    Schema icebergSchema = icebergTable.schema();
-
-    Map<String, Type> columnTypeMap = new HashMap<>();
-    for (TableInfo.ColumnInfo col : tableInfo.columns()) {
-      columnTypeMap.put(col.name(), col.trinoType());
-    }
+    CachedTableMeta meta = getTableMeta(splitInfo.tableName());
+    Schema icebergSchema = meta.schema();
+    Map<String, Type> columnTypeMap = meta.columnTypeMap();

     ParquetReaderOptions options = new ParquetReaderOptions();
     ParquetPredicateConverter predicateConverter = new ParquetPredicateConverter(columnTypeMap);
