Skip to content

Commit 3f646d5

Browse files
authored
Add unified function interface with function discovery API (#5039)
* Add initial impl for unified function and repository Signed-off-by: Chen Dai <daichen@amazon.com> * Simplify unified function impl by built-in rex executor Signed-off-by: Chen Dai <daichen@amazon.com> * Extract unified function builder abstraction Signed-off-by: Chen Dai <daichen@amazon.com> * Refactor unified function repository internal and javadoc Signed-off-by: Chen Dai <daichen@amazon.com> * Remove type converter and use Calcite's sql type name as unified type for now Signed-off-by: Chen Dai <daichen@amazon.com> * Update readme Signed-off-by: Chen Dai <daichen@amazon.com> * Refactor unified function repository API and javadoc Signed-off-by: Chen Dai <daichen@amazon.com> * Refactor unified function calcite adapter to remove rex executable field Signed-off-by: Chen Dai <daichen@amazon.com> * Update readme Signed-off-by: Chen Dai <daichen@amazon.com> * Polish all javadoc comments Signed-off-by: Chen Dai <daichen@amazon.com> * Refactor calcite adapter create with utility methods Signed-off-by: Chen Dai <daichen@amazon.com> * Address comments from CodeRabbit Signed-off-by: Chen Dai <daichen@amazon.com> --------- Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent 5dbcd28 commit 3f646d5

6 files changed

Lines changed: 467 additions & 1 deletion

File tree

api/README.md

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ This module provides components organized into two main areas aligned with the [
1414
### Unified Execution Runtime
1515

1616
- **`UnifiedQueryCompiler`**: Compiles Calcite logical plans (`RelNode`) into executable JDBC `PreparedStatement` objects for separation of compilation and execution.
17+
- **`UnifiedFunction`**: Engine-agnostic function interface that enables functions to be evaluated across different execution engines without engine-specific code duplication.
18+
- **`UnifiedFunctionRepository`**: Repository for discovering and loading functions as `UnifiedFunction` instances, providing a bridge between function definitions and external execution engines.
1719

18-
Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, or compile and execute queries directly for testing and conformance validation.
20+
Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, compile and execute queries directly, or export PPL functions for use in external execution engines.
1921

2022
### Experimental API Design
2123

@@ -86,6 +88,58 @@ try (PreparedStatement statement = compiler.compile(plan)) {
8688
}
8789
```
8890

91+
### UnifiedFunction and UnifiedFunctionRepository
92+
93+
The Unified Function API provides an engine-agnostic abstraction for functions, enabling them to be evaluated across different execution engines (Spark, Flink, Calcite, etc.) without engine-specific code duplication.
94+
95+
#### Type System
96+
97+
Types are represented as SQL type name strings for engine-agnostic serialization:
98+
99+
- **Primitive types**: `"VARCHAR"`, `"INTEGER"`, `"BIGINT"`, `"DOUBLE"`, `"BOOLEAN"`, `"DATE"`, `"TIMESTAMP"`
100+
- **Array types**: `"ARRAY<ELEMENT_TYPE>"` (e.g., `"ARRAY<INTEGER>"`)
101+
- **Struct types**: `"STRUCT<field1:TYPE1, field2:TYPE2>"` (e.g., `"STRUCT<name:VARCHAR, age:INTEGER>"`)
102+
103+
#### Loading Functions
104+
105+
Use `UnifiedFunctionRepository` to discover and load unified functions:
106+
107+
```java
108+
// Create repository with context
109+
UnifiedFunctionRepository repository = new UnifiedFunctionRepository(context);
110+
111+
// Load all available functions
112+
List<UnifiedFunctionDescriptor> allFunctions = repository.loadFunctions();
113+
for (UnifiedFunctionDescriptor descriptor : allFunctions) {
114+
String name = descriptor.getFunctionName();
115+
UnifiedFunctionBuilder builder = descriptor.getBuilder();
116+
// Use builder to create function instances
117+
}
118+
119+
// Load a specific function by name
120+
UnifiedFunctionDescriptor upperDescriptor = repository.loadFunction("UPPER").orElseThrow();
121+
```
122+
123+
#### Creating and Using Functions
124+
125+
Functions are created using builders with specific input types:
126+
127+
```java
128+
// Get function descriptor
129+
UnifiedFunctionDescriptor descriptor = repository.loadFunction("UPPER").orElseThrow();
130+
131+
// Build function with specific input types
132+
UnifiedFunction upperFunc = descriptor.getBuilder().build(List.of("VARCHAR"));
133+
134+
// Get function metadata
135+
String name = upperFunc.getFunctionName(); // "UPPER"
136+
List<String> inputTypes = upperFunc.getInputTypes(); // ["VARCHAR"]
137+
String returnType = upperFunc.getReturnType(); // "VARCHAR"
138+
139+
// Evaluate function
140+
Object result = upperFunc.eval(List.of("hello")); // "HELLO"
141+
```
142+
89143
### Complete Workflow Examples
90144

91145
Combining all components for a complete PPL query workflow:
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.function;
7+
8+
import java.io.Serializable;
9+
import java.util.List;
10+
11+
/**
12+
* A unified function abstraction that provides an engine-agnostic way to represent and evaluate
13+
* functions, enabling functions to be implemented once and used across multiple execution engines
14+
* without engine-specific code duplication.
15+
*
16+
* <p>Note: types are represented as engine-agnostic SQL type name strings (e.g., {@code "VARCHAR"},
17+
* {@code "INTEGER"}, {@code "ARRAY<T>"}, {@code "STRUCT<...>"}) to avoid introducing a dedicated
18+
* {@code UnifiedType} abstraction until it’s needed.
19+
*
20+
* @see java.io.Serializable
21+
*/
22+
public interface UnifiedFunction extends Serializable {
23+
24+
/**
25+
* Returns the name of the function.
26+
*
27+
* @return the function name
28+
*/
29+
String getFunctionName();
30+
31+
/**
32+
* Returns the unified type names expected for the input arguments.
33+
*
34+
* @return list of unified type names for input arguments
35+
*/
36+
List<String> getInputTypes();
37+
38+
/**
39+
* Returns the unified type name of the function result.
40+
*
41+
* @return unified type name of the function result
42+
*/
43+
String getReturnType();
44+
45+
/**
46+
* Evaluates the function with the provided input values.
47+
*
48+
* @param inputs argument values evaluated by the caller
49+
* @return the evaluated result, may be null depending on the function implementation
50+
*/
51+
Object eval(List<Object> inputs);
52+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.function;
7+
8+
import java.util.List;
9+
import java.util.Map;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.RequiredArgsConstructor;
13+
import lombok.ToString;
14+
import org.apache.calcite.DataContext;
15+
import org.apache.calcite.DataContexts;
16+
import org.apache.calcite.rel.type.RelDataType;
17+
import org.apache.calcite.rel.type.RelDataTypeFactory;
18+
import org.apache.calcite.rex.RexBuilder;
19+
import org.apache.calcite.rex.RexExecutable;
20+
import org.apache.calcite.rex.RexExecutorImpl;
21+
import org.apache.calcite.rex.RexNode;
22+
import org.apache.calcite.sql.type.SqlTypeName;
23+
import org.opensearch.sql.expression.function.PPLFuncImpTable;
24+
25+
/** Adapter that implements {@link UnifiedFunction} using Calcite's {@link RexExecutorImpl}. */
26+
@ToString
27+
@EqualsAndHashCode(exclude = "compiledCode")
28+
@RequiredArgsConstructor
29+
public class UnifiedFunctionCalciteAdapter implements UnifiedFunction {
30+
31+
private static final long serialVersionUID = 1L;
32+
33+
/**
34+
* Key used by RexExecutorImpl's InputGetter to retrieve input values from DataContext. This is a
35+
* Calcite internal convention.
36+
*/
37+
private static final String INPUT_RECORD_KEY = "inputRecord";
38+
39+
/** Unified function name. */
40+
@Getter private final String functionName;
41+
42+
/** Unified type name of the return value. */
43+
@Getter private final String returnType;
44+
45+
/** Unified type names of the input arguments. */
46+
@Getter private final List<String> inputTypes;
47+
48+
/**
49+
* Compiled Java source for evaluating the function.
50+
*
51+
* <p>The generated code reads inputs from the {@code "inputRecord"} entry in {@link DataContext}.
52+
* Arguments are mapped to field variables named {@code "_0"}, {@code "_1"}, etc.
53+
*
54+
* <pre>{@code
55+
* // For UPPER(input) function:
56+
* Object[] inputRecord = (Object[]) dataContext.get("inputRecord");
57+
* String _0 = (String) inputRecord[0];
58+
* return _0 == null ? null : _0.toUpperCase();
59+
* }</pre>
60+
*/
61+
private final String compiledCode;
62+
63+
@Override
64+
public Object eval(List<Object> inputs) {
65+
RexExecutable rexExecutor = new RexExecutable(compiledCode, functionName);
66+
DataContext dataContext = DataContexts.of(Map.of(INPUT_RECORD_KEY, inputs.toArray()));
67+
rexExecutor.setDataContext(dataContext);
68+
69+
Object[] results = rexExecutor.execute();
70+
return (results == null || results.length == 0) ? null : results[0];
71+
}
72+
73+
/**
74+
* Creates Calcite RexNode adapter for a unified function.
75+
*
76+
* <p>Note: this method pre-compiles the resolved function expression and stores the generated
77+
* source code as a string. This avoids serializing {@link RexNode} instances and simplifies
78+
* distribution across execution engines. If performance or security concerns arise, we can change
79+
* this internal implementation.
80+
*
81+
* @param rexBuilder RexBuilder for creating expressions
82+
* @param functionName function name
83+
* @param inputTypes function argument types
84+
* @return configured adapter instance
85+
*/
86+
public static UnifiedFunctionCalciteAdapter create(
87+
RexBuilder rexBuilder, String functionName, List<String> inputTypes) {
88+
RexNode[] inputRefs = makeInputRefs(rexBuilder, inputTypes);
89+
RexNode resolved = PPLFuncImpTable.INSTANCE.resolve(rexBuilder, functionName, inputRefs);
90+
RelDataType inputRowType = buildInputRowType(rexBuilder, inputTypes);
91+
RexExecutable executable =
92+
RexExecutorImpl.getExecutable(rexBuilder, List.of(resolved), inputRowType);
93+
String returnType = resolved.getType().getSqlTypeName().getName();
94+
95+
return new UnifiedFunctionCalciteAdapter(
96+
functionName, returnType, List.copyOf(inputTypes), executable.getSource());
97+
}
98+
99+
private static RelDataType buildInputRowType(RexBuilder rexBuilder, List<String> inputTypes) {
100+
RelDataTypeFactory typeFactory = rexBuilder.getTypeFactory();
101+
RelDataTypeFactory.Builder builder = typeFactory.builder();
102+
for (int i = 0; i < inputTypes.size(); i++) {
103+
RelDataType relType = typeFactory.createSqlType(SqlTypeName.valueOf(inputTypes.get(i)));
104+
builder.add("_" + i, relType);
105+
}
106+
return builder.build();
107+
}
108+
109+
private static RexNode[] makeInputRefs(RexBuilder rexBuilder, List<String> inputTypes) {
110+
RelDataTypeFactory typeFactory = rexBuilder.getTypeFactory();
111+
RexNode[] inputRefs = new RexNode[inputTypes.size()];
112+
for (int i = 0; i < inputTypes.size(); i++) {
113+
RelDataType relType = typeFactory.createSqlType(SqlTypeName.valueOf(inputTypes.get(i)));
114+
inputRefs[i] = rexBuilder.makeInputRef(relType, i);
115+
}
116+
return inputRefs;
117+
}
118+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.function;
7+
8+
import java.util.List;
9+
import java.util.Optional;
10+
import java.util.stream.Collectors;
11+
import lombok.RequiredArgsConstructor;
12+
import lombok.Value;
13+
import org.apache.calcite.rex.RexBuilder;
14+
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
15+
import org.opensearch.sql.api.UnifiedQueryContext;
16+
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
17+
18+
/** Repository for discovering and loading PPL functions as {@link UnifiedFunction} instances. */
19+
@RequiredArgsConstructor
20+
public class UnifiedFunctionRepository {
21+
22+
/** Unified query context containing CalcitePlanContext for creating Rex expressions. */
23+
private final UnifiedQueryContext context;
24+
25+
/**
26+
* Loads all PPL functions from {@link PPLBuiltinOperators} as descriptors.
27+
*
28+
* @return list of function descriptors
29+
*/
30+
public List<UnifiedFunctionDescriptor> loadFunctions() {
31+
RexBuilder rexBuilder = context.getPlanContext().rexBuilder;
32+
return PPLBuiltinOperators.instance().getOperatorList().stream()
33+
.filter(SqlUserDefinedFunction.class::isInstance)
34+
.map(
35+
operator -> {
36+
String functionName = operator.getName();
37+
UnifiedFunctionBuilder builder =
38+
inputTypes ->
39+
UnifiedFunctionCalciteAdapter.create(rexBuilder, functionName, inputTypes);
40+
return new UnifiedFunctionDescriptor(functionName, builder);
41+
})
42+
.collect(Collectors.toList());
43+
}
44+
45+
/**
46+
* Loads a specific PPL function by name.
47+
*
48+
* @param functionName the name of the function to load (case-insensitive)
49+
* @return optional function descriptor, empty if not found
50+
*/
51+
public Optional<UnifiedFunctionDescriptor> loadFunction(String functionName) {
52+
return loadFunctions().stream()
53+
.filter(desc -> desc.getFunctionName().equalsIgnoreCase(functionName))
54+
.findFirst();
55+
}
56+
57+
/** Function descriptor with name and builder for creating {@link UnifiedFunction} instances. */
58+
@Value
59+
public static class UnifiedFunctionDescriptor {
60+
/** The name of the function in upper case. */
61+
String functionName;
62+
63+
/** Builder for creating {@link UnifiedFunction} instances with specific input types. */
64+
UnifiedFunctionBuilder builder;
65+
}
66+
67+
/** Builder for creating {@link UnifiedFunction} instances with specific input types. */
68+
@FunctionalInterface
69+
public interface UnifiedFunctionBuilder {
70+
71+
/**
72+
* Builds a {@link UnifiedFunction} instance for the specified input types.
73+
*
74+
* @param inputTypes Unified type names for function arguments
75+
* @return a UnifiedFunction instance configured for the specified input types
76+
*/
77+
UnifiedFunction build(List<String> inputTypes);
78+
}
79+
}

0 commit comments

Comments
 (0)