Skip to content

Commit a72e462

Browse files
authored
[Experimental] Add engine extensions interface (#5298)
* Add engine extensions interface Signed-off-by: Lantao Jin <ltjin@amazon.com> * Add engine extensions interface Signed-off-by: Lantao Jin <ltjin@amazon.com> * Fix TransportPPLQueryAction Signed-off-by: Lantao Jin <ltjin@amazon.com> * address comments Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 2c8b706 commit a72e462

7 files changed

Lines changed: 329 additions & 8 deletions

File tree

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor;
7+
8+
import java.util.List;
9+
import java.util.Optional;
10+
import lombok.RequiredArgsConstructor;
11+
import lombok.extern.log4j.Log4j2;
12+
import org.apache.calcite.rel.RelNode;
13+
import org.opensearch.sql.ast.statement.ExplainMode;
14+
import org.opensearch.sql.calcite.CalcitePlanContext;
15+
import org.opensearch.sql.common.response.ResponseListener;
16+
import org.opensearch.sql.planner.physical.PhysicalPlan;
17+
18+
/**
19+
* An {@link ExecutionEngine} that delegates Calcite RelNode execution to the first extension whose
20+
* {@link ExecutionEngine#canVectorize(RelNode)} returns {@code true}, falling back to the default
21+
* engine otherwise. Non-Calcite ({@link PhysicalPlan}) methods and unmatched RelNode plans are
22+
* forwarded to the default engine.
23+
*/
24+
@RequiredArgsConstructor
25+
@Log4j2
26+
public class DelegatingExecutionEngine implements ExecutionEngine {
27+
28+
private final ExecutionEngine defaultEngine;
29+
private final List<ExecutionEngine> extensions;
30+
31+
@Override
32+
public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
33+
defaultEngine.execute(plan, listener);
34+
}
35+
36+
@Override
37+
public void execute(
38+
PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
39+
defaultEngine.execute(plan, context, listener);
40+
}
41+
42+
@Override
43+
public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
44+
defaultEngine.explain(plan, listener);
45+
}
46+
47+
@Override
48+
public boolean canVectorize(RelNode plan) {
49+
return findExtension(plan).isPresent();
50+
}
51+
52+
@Override
53+
public void execute(
54+
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
55+
Optional<ExecutionEngine> ext = findExtension(plan);
56+
if (ext.isPresent()) {
57+
log.info("Routing query to extension engine : {}", ext.get().getClass().getSimpleName());
58+
ext.get().execute(plan, context, listener);
59+
} else {
60+
defaultEngine.execute(plan, context, listener);
61+
}
62+
}
63+
64+
@Override
65+
public void explain(
66+
RelNode plan,
67+
ExplainMode mode,
68+
CalcitePlanContext context,
69+
ResponseListener<ExplainResponse> listener) {
70+
Optional<ExecutionEngine> ext = findExtension(plan);
71+
if (ext.isPresent()) {
72+
ext.get().explain(plan, mode, context, listener);
73+
} else {
74+
defaultEngine.explain(plan, mode, context, listener);
75+
}
76+
}
77+
78+
private Optional<ExecutionEngine> findExtension(RelNode plan) {
79+
return extensions.stream().filter(ext -> ext.canVectorize(plan)).findFirst();
80+
}
81+
}

core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,32 @@ void execute(
4747
*/
4848
void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener);
4949

50+
/**
51+
* Check if this engine supports vectorized execution of the given Calcite RelNode plan.
52+
* Vectorized execution engines (e.g. Velox) override this to advertise support for specific plan
53+
* shapes. The default returns {@code false}.
54+
*/
55+
default boolean canVectorize(RelNode plan) {
56+
return false;
57+
}
58+
5059
/** Execute calcite RelNode plan with {@link ExecutionContext} and call back response listener. */
5160
default void execute(
52-
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {}
61+
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
62+
listener.onFailure(
63+
new UnsupportedOperationException(
64+
getClass().getSimpleName() + " does not support RelNode execution"));
65+
}
5366

5467
default void explain(
5568
RelNode plan,
5669
ExplainMode mode,
5770
CalcitePlanContext context,
58-
ResponseListener<ExplainResponse> listener) {}
71+
ResponseListener<ExplainResponse> listener) {
72+
listener.onFailure(
73+
new UnsupportedOperationException(
74+
getClass().getSimpleName() + " does not support RelNode explain"));
75+
}
5976

6077
/** Data class that encapsulates ExprValue. */
6178
@Data
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor;
7+
8+
import static org.mockito.ArgumentMatchers.any;
9+
import static org.mockito.ArgumentMatchers.eq;
10+
import static org.mockito.Mockito.never;
11+
import static org.mockito.Mockito.verify;
12+
import static org.mockito.Mockito.when;
13+
14+
import java.util.List;
15+
import org.apache.calcite.rel.RelNode;
16+
import org.junit.jupiter.api.Test;
17+
import org.junit.jupiter.api.extension.ExtendWith;
18+
import org.mockito.Mock;
19+
import org.mockito.junit.jupiter.MockitoExtension;
20+
import org.opensearch.sql.ast.statement.ExplainMode;
21+
import org.opensearch.sql.calcite.CalcitePlanContext;
22+
import org.opensearch.sql.common.response.ResponseListener;
23+
import org.opensearch.sql.planner.physical.PhysicalPlan;
24+
25+
@ExtendWith(MockitoExtension.class)
26+
class DelegatingExecutionEngineTest {
27+
28+
@Mock private ExecutionEngine defaultEngine;
29+
30+
@Mock private ExecutionEngine extension1;
31+
32+
@Mock private ExecutionEngine extension2;
33+
34+
@Mock private RelNode relNode;
35+
36+
@Mock private CalcitePlanContext calciteContext;
37+
38+
@Mock private PhysicalPlan physicalPlan;
39+
40+
@Mock private ExecutionContext executionContext;
41+
42+
@Mock private ResponseListener<ExecutionEngine.QueryResponse> queryListener;
43+
44+
@Mock private ResponseListener<ExecutionEngine.ExplainResponse> explainListener;
45+
46+
@Test
47+
void executeRelNodeRoutesToMatchingExtension() {
48+
when(extension1.canVectorize(relNode)).thenReturn(true);
49+
DelegatingExecutionEngine engine =
50+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));
51+
52+
engine.execute(relNode, calciteContext, queryListener);
53+
54+
verify(extension1).execute(relNode, calciteContext, queryListener);
55+
verify(defaultEngine, never()).execute(any(RelNode.class), any(), eq(queryListener));
56+
}
57+
58+
@Test
59+
void executeRelNodeFallsBackToDefaultWhenNoExtensionMatches() {
60+
when(extension1.canVectorize(relNode)).thenReturn(false);
61+
when(extension2.canVectorize(relNode)).thenReturn(false);
62+
DelegatingExecutionEngine engine =
63+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));
64+
65+
engine.execute(relNode, calciteContext, queryListener);
66+
67+
verify(defaultEngine).execute(relNode, calciteContext, queryListener);
68+
verify(extension1, never()).execute(any(RelNode.class), any(), eq(queryListener));
69+
verify(extension2, never()).execute(any(RelNode.class), any(), eq(queryListener));
70+
}
71+
72+
@Test
73+
void executeRelNodeRoutesToFirstMatchingExtension() {
74+
when(extension1.canVectorize(relNode)).thenReturn(true);
75+
DelegatingExecutionEngine engine =
76+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));
77+
78+
engine.execute(relNode, calciteContext, queryListener);
79+
80+
verify(extension1).execute(relNode, calciteContext, queryListener);
81+
verify(extension2, never()).execute(any(RelNode.class), any(), eq(queryListener));
82+
}
83+
84+
@Test
85+
void explainRelNodeRoutesToMatchingExtension() {
86+
when(extension1.canVectorize(relNode)).thenReturn(true);
87+
DelegatingExecutionEngine engine =
88+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
89+
90+
engine.explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
91+
92+
verify(extension1).explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
93+
verify(defaultEngine, never()).explain(any(RelNode.class), any(), any(), eq(explainListener));
94+
}
95+
96+
@Test
97+
void explainRelNodeFallsBackToDefaultWhenNoExtensionMatches() {
98+
when(extension1.canVectorize(relNode)).thenReturn(false);
99+
DelegatingExecutionEngine engine =
100+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
101+
102+
engine.explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
103+
104+
verify(defaultEngine).explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
105+
}
106+
107+
@Test
108+
void canVectorizeReturnsTrueWhenExtensionMatches() {
109+
when(extension1.canVectorize(relNode)).thenReturn(false);
110+
when(extension2.canVectorize(relNode)).thenReturn(true);
111+
DelegatingExecutionEngine engine =
112+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));
113+
114+
assert engine.canVectorize(relNode);
115+
}
116+
117+
@Test
118+
void canVectorizeReturnsFalseWhenNoExtensionMatches() {
119+
when(extension1.canVectorize(relNode)).thenReturn(false);
120+
DelegatingExecutionEngine engine =
121+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
122+
123+
assert !engine.canVectorize(relNode);
124+
}
125+
126+
@Test
127+
void physicalPlanExecuteDelegatesToDefault() {
128+
DelegatingExecutionEngine engine =
129+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
130+
131+
engine.execute(physicalPlan, queryListener);
132+
133+
verify(defaultEngine).execute(physicalPlan, queryListener);
134+
}
135+
136+
@Test
137+
void physicalPlanExecuteWithContextDelegatesToDefault() {
138+
DelegatingExecutionEngine engine =
139+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
140+
141+
engine.execute(physicalPlan, executionContext, queryListener);
142+
143+
verify(defaultEngine).execute(physicalPlan, executionContext, queryListener);
144+
}
145+
146+
@Test
147+
void physicalPlanExplainDelegatesToDefault() {
148+
DelegatingExecutionEngine engine =
149+
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));
150+
151+
engine.explain(physicalPlan, explainListener);
152+
153+
verify(defaultEngine).explain(physicalPlan, explainListener);
154+
}
155+
156+
@Test
157+
void emptyExtensionsListAlwaysFallsBackToDefault() {
158+
DelegatingExecutionEngine engine = new DelegatingExecutionEngine(defaultEngine, List.of());
159+
160+
engine.execute(relNode, calciteContext, queryListener);
161+
162+
verify(defaultEngine).execute(relNode, calciteContext, queryListener);
163+
}
164+
}

plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
4747
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
4848
import org.opensearch.plugins.ActionPlugin;
49+
import org.opensearch.plugins.ExtensiblePlugin;
4950
import org.opensearch.plugins.Plugin;
5051
import org.opensearch.plugins.ScriptPlugin;
5152
import org.opensearch.plugins.SystemIndexPlugin;
@@ -85,6 +86,7 @@
8586
import org.opensearch.sql.directquery.transport.model.ExecuteDirectQueryActionResponse;
8687
import org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse;
8788
import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse;
89+
import org.opensearch.sql.executor.ExecutionEngine;
8890
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
8991
import org.opensearch.sql.legacy.metrics.Metrics;
9092
import org.opensearch.sql.legacy.plugin.RestSqlAction;
@@ -93,6 +95,7 @@
9395
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
9496
import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory;
9597
import org.opensearch.sql.opensearch.storage.script.CompoundedScriptEngine;
98+
import org.opensearch.sql.plugin.config.EngineExtensionsHolder;
9699
import org.opensearch.sql.plugin.config.OpenSearchPluginModule;
97100
import org.opensearch.sql.plugin.rest.RestPPLGrammarAction;
98101
import org.opensearch.sql.plugin.rest.RestPPLQueryAction;
@@ -126,10 +129,15 @@
126129
import org.opensearch.watcher.ResourceWatcherService;
127130

128131
public class SQLPlugin extends Plugin
129-
implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension {
132+
implements ActionPlugin,
133+
ScriptPlugin,
134+
SystemIndexPlugin,
135+
JobSchedulerExtension,
136+
ExtensiblePlugin {
130137

131138
private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class);
132139

140+
private List<ExecutionEngine> executionEngineExtensions = List.of();
133141
private ClusterService clusterService;
134142

135143
/** Settings should be inited when bootstrap the plugin. */
@@ -148,6 +156,18 @@ public String description() {
148156
return "Use sql to query OpenSearch.";
149157
}
150158

159+
@Override
160+
public void loadExtensions(ExtensionLoader loader) {
161+
List<ExecutionEngine> loaded = loader.loadExtensions(ExecutionEngine.class);
162+
this.executionEngineExtensions = loaded != null ? List.copyOf(loaded) : List.of();
163+
if (!executionEngineExtensions.isEmpty()) {
164+
LOGGER.info(
165+
"Loaded {} execution engine extension(s): {}",
166+
executionEngineExtensions.size(),
167+
executionEngineExtensions.stream().map(e -> e.getClass().getSimpleName()).toList());
168+
}
169+
}
170+
151171
@Override
152172
public List<RestHandler> getRestHandlers(
153173
Settings settings,
@@ -252,7 +272,7 @@ public Collection<Object> createComponents(
252272
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
253273
LocalClusterState.state().setClient(client);
254274
ModulesBuilder modules = new ModulesBuilder();
255-
modules.add(new OpenSearchPluginModule());
275+
modules.add(new OpenSearchPluginModule(executionEngineExtensions));
256276
modules.add(
257277
b -> {
258278
b.bind(NodeClient.class).toInstance((NodeClient) client);
@@ -287,12 +307,15 @@ public Collection<Object> createComponents(
287307
ScheduledAsyncQueryJobRunner.getJobRunnerInstance()
288308
.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);
289309

310+
EngineExtensionsHolder extensionsHolder = new EngineExtensionsHolder(executionEngineExtensions);
311+
290312
return ImmutableList.of(
291313
dataSourceService,
292314
asyncQueryExecutorService,
293315
clusterManagerEventListener,
294316
pluginSettings,
295-
directQueryExecutorService);
317+
directQueryExecutorService,
318+
extensionsHolder);
296319
}
297320

298321
@Override
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.plugin.config;
7+
8+
import java.util.List;
9+
import org.opensearch.sql.executor.ExecutionEngine;
10+
11+
/**
12+
* Holds execution engine engines loaded via SPI. Returned from {@code SQLPlugin.createComponents()}
13+
* so that OpenSearch's Guice injector can inject it into transport actions like {@code
14+
* TransportPPLQueryAction}.
15+
*/
16+
public record EngineExtensionsHolder(List<ExecutionEngine> engines) {
17+
public EngineExtensionsHolder(List<ExecutionEngine> engines) {
18+
this.engines = engines != null ? List.copyOf(engines) : List.of();
19+
}
20+
}

0 commit comments

Comments
 (0)