Skip to content

Commit 6ce3ac5

Browse files
committed
Add query size limit to RelNode plan instead of post-processing
Add LogicalSystemLimit to the RelNode plan before passing it to the analytics engine, consistent with PPL V3 (QueryService.convertToCalcitePlan). This ensures the analytics engine enforces the limit during execution rather than returning all rows for post-processing truncation. Remove post-processing querySizeLimit truncation from AnalyticsExecutionEngine -- the limit is now part of the plan the executor receives. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 7613c44 commit 6ce3ac5

3 files changed

Lines changed: 26 additions & 29 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,10 @@ public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listene
6565
public void execute(
6666
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
6767
try {
68-
Integer querySizeLimit = context.sysLimit.querySizeLimit();
6968
Iterable<Object[]> rows = planExecutor.execute(plan, null);
7069

7170
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
72-
List<ExprValue> results = convertRows(rows, fields, querySizeLimit);
71+
List<ExprValue> results = convertRows(rows, fields);
7372
Schema schema = buildSchema(fields);
7473

7574
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
@@ -78,13 +77,9 @@ public void execute(
7877
}
7978
}
8079

81-
private List<ExprValue> convertRows(
82-
Iterable<Object[]> rows, List<RelDataTypeField> fields, Integer querySizeLimit) {
80+
private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeField> fields) {
8381
List<ExprValue> results = new ArrayList<>();
8482
for (Object[] row : rows) {
85-
if (querySizeLimit != null && results.size() >= querySizeLimit) {
86-
break;
87-
}
8883
Map<String, ExprValue> valueMap = new LinkedHashMap<>();
8984
for (int i = 0; i < fields.size(); i++) {
9085
String columnName = fields.get(i).getName();

core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@
1212
import static org.mockito.Mockito.when;
1313

1414
import java.lang.reflect.Field;
15-
import java.util.ArrayList;
1615
import java.util.Arrays;
1716
import java.util.Collections;
18-
import java.util.List;
1917
import java.util.concurrent.atomic.AtomicReference;
2018
import java.util.stream.Collectors;
2119
import org.apache.calcite.rel.RelNode;
@@ -148,24 +146,8 @@ void executeRelNode_temporalTypes() {
148146
assertEquals(0, response.getResults().size(), "Should have 0 rows. " + dump);
149147
}
150148

151-
@Test
152-
void executeRelNode_querySizeLimit() throws Exception {
153-
RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER);
154-
List<Object[]> manyRows = new ArrayList<>();
155-
for (int i = 0; i < 100; i++) {
156-
manyRows.add(new Object[] {i});
157-
}
158-
when(mockExecutor.execute(relNode, null)).thenReturn(manyRows);
159-
setSysLimit(mockContext, new SysLimit(10, 10000, 50000));
160-
161-
QueryResponse response = executeAndCapture(relNode);
162-
String dump = dumpResponse(response);
163-
164-
assertEquals(
165-
10,
166-
response.getResults().size(),
167-
"Should truncate to querySizeLimit=10, got " + response.getResults().size() + ". " + dump);
168-
}
149+
// Query size limit is now enforced in the RelNode plan (LogicalSystemLimit) before it reaches
150+
// AnalyticsExecutionEngine. The engine trusts the executor to honor the limit.
169151

170152
@Test
171153
void executeRelNode_emptyResults() {

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.opensearch.sql.api.UnifiedQueryContext;
2525
import org.opensearch.sql.api.UnifiedQueryPlanner;
2626
import org.opensearch.sql.calcite.CalcitePlanContext;
27+
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
2728
import org.opensearch.sql.common.antlr.SyntaxCheckException;
2829
import org.opensearch.sql.common.response.ResponseListener;
2930
import org.opensearch.sql.datasources.exceptions.DataSourceClientException;
@@ -129,13 +130,18 @@ private void doExecuteViaTransport(
129130

130131
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
131132
RelNode plan = planner.plan(query);
133+
134+
// Add query size limit to the plan so the analytics engine can enforce it
135+
// during execution, consistent with PPL V3 (see QueryService.convertToCalcitePlan)
136+
CalcitePlanContext planContext = context.getPlanContext();
137+
plan = addQuerySizeLimit(plan, planContext);
138+
132139
long planTime = System.nanoTime();
133140
LOG.info(
134141
"[unified] Planning completed in {}ms for {} query",
135142
(planTime - startTime) / 1_000_000,
136143
queryType);
137144

138-
CalcitePlanContext planContext = context.getPlanContext();
139145
analyticsEngine.execute(
140146
plan, planContext, createTransportQueryListener(queryType, planTime, listener));
141147
}
@@ -144,6 +150,17 @@ private void doExecuteViaTransport(
144150
}
145151
}
146152

153+
/**
154+
* Add a system-level query size limit to the plan. This ensures the analytics engine enforces the
155+
* limit during execution rather than returning all rows for post-processing truncation.
156+
*/
157+
private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext context) {
158+
return LogicalSystemLimit.create(
159+
LogicalSystemLimit.SystemLimitType.QUERY_SIZE_LIMIT,
160+
plan,
161+
context.relBuilder.literal(context.sysLimit.querySizeLimit()));
162+
}
163+
147164
private ResponseListener<QueryResponse> createTransportQueryListener(
148165
QueryType queryType,
149166
long planEndTime,
@@ -188,13 +205,16 @@ private void doExecute(String query, QueryType queryType, RestChannel channel) {
188205

189206
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
190207
RelNode plan = planner.plan(query);
208+
209+
CalcitePlanContext planContext = context.getPlanContext();
210+
plan = addQuerySizeLimit(plan, planContext);
211+
191212
long planTime = System.nanoTime();
192213
LOG.info(
193214
"[unified] Planning completed in {}ms for {} query",
194215
(planTime - startTime) / 1_000_000,
195216
queryType);
196217

197-
CalcitePlanContext planContext = context.getPlanContext();
198218
analyticsEngine.execute(
199219
plan, planContext, createQueryListener(queryType, channel, planTime));
200220
}

0 commit comments

Comments
 (0)