Skip to content

Commit 23227fd

Browse files
authored
Add query routing and execution handoff for Parquet-backed indices (#5267)
* [Mustang] Add query routing and execution handoff for Parquet-backed indices (#5247) Implements the query routing and AnalyticsExecutionEngine for Project Mustang's unified query pipeline. PPL queries targeting parquet_ prefixed indices are routed through UnifiedQueryPlanner and executed via a stub QueryPlanExecutor, with results formatted through the existing JDBC response pipeline. New files: - QueryPlanExecutor: @FunctionalInterface contract for analytics engine - AnalyticsExecutionEngine: converts Iterable<Object[]> to QueryResponse with type mapping and query size limit enforcement - RestUnifiedQueryAction: orchestrates schema building, planning, execution on sql-worker thread pool, with client/server error classification and metrics - StubQueryPlanExecutor: canned data for parquet_logs and parquet_metrics tables for development and testing Modified files: - RestPPLQueryAction: routing branch for parquet_ indices - SQLPlugin: passes ClusterService and NodeClient to RestPPLQueryAction - plugin/build.gradle: adds :api dependency for UnifiedQueryPlanner Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Align isClientError with RestPPLQueryAction classification Add missing exception types to isClientError(): IndexNotFoundException, ExpressionEvaluationException, QueryEngineException, DataSourceClientException, IllegalAccessException. Matches the full list in RestPPLQueryAction.isClientError(). Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Move stub code into analytics/stub sub-package Extract StubSchemaProvider, StubQueryPlanExecutor, and StubIndexDetector into plugin/.../rest/analytics/stub/ package to clearly separate temporary stub code from production code. RestUnifiedQueryAction now delegates to these stub classes. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Use ErrorMessageFactory for error responses Replace hand-crafted JSON error response with ErrorMessageFactory.createErrorMessage(), matching the standard error format used in RestPPLQueryAction.reportError(). Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Make metrics and response formatting query-type-aware Use QueryType to select the correct metrics (PPL_REQ_TOTAL vs REQ_TOTAL, PPL_FAILED_REQ_COUNT_* vs FAILED_REQ_COUNT_*) and LangSpec (PPL_SPEC vs SQL_SPEC) so this class can serve both PPL and SQL queries when unified SQL support is added. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Move analytics routing from REST layer to transport layer Move the analytics index routing check from RestPPLQueryAction into TransportPPLQueryAction.doExecute(). This ensures the analytics path gets the same PPL enabled check, metrics, request ID, and inter-plugin transport support as the existing Lucene path. RestPPLQueryAction and SQLPlugin are reverted to their original state. Added executeViaTransport() to RestUnifiedQueryAction which returns results via ActionListener<TransportPPLQueryResponse> instead of RestChannel, integrating properly with the transport action pattern. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Add query size limit to RelNode plan instead of post-processing Add LogicalSystemLimit to the RelNode plan before passing it to the analytics engine, consistent with PPL V3 (QueryService.convertToCalcitePlan). This ensures the analytics engine enforces the limit during execution rather than returning all rows for post-processing truncation. Remove post-processing querySizeLimit truncation from AnalyticsExecutionEngine -- the limit is now part of the plan the executor receives. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Propagate security context to sql-worker thread pool Wrap scheduled lambdas in both execute() and executeViaTransport() with withCurrentContext() to capture and restore ThreadContext (user identity, permissions, audit trail) on the worker thread. Follows the same pattern as OpenSearchQueryManager.withCurrentContext(). Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Move analytics routing after profiling setup Move the analytics index routing check after QueryContext.setProfile() and wrapWithProfilingClear(listener). Use clearingListener instead of raw listener so profiling thread-local state is properly cleaned up after analytics path execution. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Remove NPE from isClientError classification NPE in the analytics path is a server bug (null schema field, missing row), not bad user input. Removed from client error list. Will sync this classification with RestPPLQueryAction updates in #5266. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Remove duplicate REST execution path from RestUnifiedQueryAction Remove execute(query, queryType, channel), doExecute(), createQueryListener(channel), recordSuccessMetric(), recordFailureMetric(), reportError(), and related REST imports. Since routing now goes through TransportPPLQueryAction, the REST-specific path was unused. Renamed executeViaTransport() to execute() as the sole entry point. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> --------- Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 513e1b2 commit 23227fd

11 files changed

Lines changed: 1119 additions & 1 deletion

File tree

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor.analytics;
7+
8+
import java.util.ArrayList;
9+
import java.util.LinkedHashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import org.apache.calcite.rel.RelNode;
13+
import org.apache.calcite.rel.type.RelDataType;
14+
import org.apache.calcite.rel.type.RelDataTypeField;
15+
import org.opensearch.sql.calcite.CalcitePlanContext;
16+
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
17+
import org.opensearch.sql.common.response.ResponseListener;
18+
import org.opensearch.sql.data.model.ExprTupleValue;
19+
import org.opensearch.sql.data.model.ExprValue;
20+
import org.opensearch.sql.data.model.ExprValueUtils;
21+
import org.opensearch.sql.data.type.ExprType;
22+
import org.opensearch.sql.executor.ExecutionContext;
23+
import org.opensearch.sql.executor.ExecutionEngine;
24+
import org.opensearch.sql.executor.pagination.Cursor;
25+
import org.opensearch.sql.planner.physical.PhysicalPlan;
26+
27+
/**
28+
* Execution engine adapter for the analytics engine (Project Mustang).
29+
*
30+
* <p>Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link
31+
* ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the
32+
* analytics engine, and converts the raw results into {@link QueryResponse}.
33+
*/
34+
public class AnalyticsExecutionEngine implements ExecutionEngine {
35+
36+
private final QueryPlanExecutor planExecutor;
37+
38+
public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) {
39+
this.planExecutor = planExecutor;
40+
}
41+
42+
/** Not supported. Analytics queries use the RelNode path exclusively. */
43+
@Override
44+
public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
45+
listener.onFailure(
46+
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
47+
}
48+
49+
/** Not supported. Analytics queries use the RelNode path exclusively. */
50+
@Override
51+
public void execute(
52+
PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
53+
listener.onFailure(
54+
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
55+
}
56+
57+
/** Not supported. Analytics queries use the RelNode path exclusively. */
58+
@Override
59+
public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
60+
listener.onFailure(
61+
new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
62+
}
63+
64+
@Override
65+
public void execute(
66+
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
67+
try {
68+
Iterable<Object[]> rows = planExecutor.execute(plan, null);
69+
70+
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
71+
List<ExprValue> results = convertRows(rows, fields);
72+
Schema schema = buildSchema(fields);
73+
74+
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
75+
} catch (Exception e) {
76+
listener.onFailure(e);
77+
}
78+
}
79+
80+
private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeField> fields) {
81+
List<ExprValue> results = new ArrayList<>();
82+
for (Object[] row : rows) {
83+
Map<String, ExprValue> valueMap = new LinkedHashMap<>();
84+
for (int i = 0; i < fields.size(); i++) {
85+
String columnName = fields.get(i).getName();
86+
Object value = (i < row.length) ? row[i] : null;
87+
valueMap.put(columnName, ExprValueUtils.fromObjectValue(value));
88+
}
89+
results.add(ExprTupleValue.fromExprValueMap(valueMap));
90+
}
91+
return results;
92+
}
93+
94+
private Schema buildSchema(List<RelDataTypeField> fields) {
95+
List<Schema.Column> columns = new ArrayList<>();
96+
for (RelDataTypeField field : fields) {
97+
ExprType exprType = convertType(field.getType());
98+
columns.add(new Schema.Column(field.getName(), null, exprType));
99+
}
100+
return new Schema(columns);
101+
}
102+
103+
private ExprType convertType(RelDataType type) {
104+
try {
105+
return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);
106+
} catch (IllegalArgumentException e) {
107+
return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN;
108+
}
109+
}
110+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor.analytics;
7+
8+
import org.apache.calcite.rel.RelNode;
9+
10+
/**
11+
* Executes a Calcite {@link RelNode} logical plan against the analytics engine.
12+
*
13+
* <p>This is a local equivalent of {@code org.opensearch.analytics.exec.QueryPlanExecutor} from the
14+
* analytics-framework library. It will be replaced by the upstream interface once the
15+
* analytics-framework JAR is published.
16+
*
17+
* @see <a
18+
* href="https://github.com/opensearch-project/OpenSearch/blob/9142d0e789c6a6c4708f1bc015745ed55202eefe/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/exec/QueryPlanExecutor.java">Upstream
19+
* QueryPlanExecutor</a>
20+
*/
21+
@FunctionalInterface
22+
public interface QueryPlanExecutor {
23+
24+
/**
25+
* Executes the given logical plan and returns result rows.
26+
*
27+
* @param plan the Calcite RelNode subtree to execute
28+
* @param context execution context (opaque to avoid server dependency)
29+
* @return rows produced by the engine
30+
*/
31+
Iterable<Object[]> execute(RelNode plan, Object context);
32+
}

0 commit comments

Comments
 (0)