Skip to content

Commit e0c6633

Browse files
[Feature] PPL Command: MvExpand (#5144)
* MvExpand new PR - After resolving the merge issues Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> # Conflicts: # core/src/main/java/org/opensearch/sql/analysis/Analyzer.java # core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java # core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java # docs/category.json # docs/user/ppl/index.md # integ-test/src/test/java/org/opensearch/sql/calcite/CalciteNoPushdownIT.java # integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java # integ-test/src/test/java/org/opensearch/sql/ppl/NewAddedCommandsIT.java # integ-test/src/test/java/org/opensearch/sql/security/CalciteCrossClusterSearchIT.java # ppl/src/main/antlr/OpenSearchPPLParser.g4 # ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java # ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java # ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java * Address coderrabbit comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address coderrabbit comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * merge main to my branch Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address corerabbit suggestions Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address corerabbit suggestions Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * ci: trigger Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * ci: trigger Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address corerabbit suggestions Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * ci: trigger Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address comments. Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Change the exception from illegalarg to SyntaxCheckException Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * change the exception message for consistency Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * ci: rerun Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> --------- Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> Co-authored-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent 5a1448f commit e0c6633

27 files changed

Lines changed: 1241 additions & 12 deletions

File tree

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.opensearch.sql.ast.tree.ML;
8282
import org.opensearch.sql.ast.tree.Multisearch;
8383
import org.opensearch.sql.ast.tree.MvCombine;
84+
import org.opensearch.sql.ast.tree.MvExpand;
8485
import org.opensearch.sql.ast.tree.NoMv;
8586
import org.opensearch.sql.ast.tree.Paginate;
8687
import org.opensearch.sql.ast.tree.Parse;
@@ -552,6 +553,11 @@ public LogicalPlan visitNoMv(NoMv node, AnalysisContext context) {
552553
throw getOnlyForCalciteException("nomv");
553554
}
554555

556+
@Override
557+
public LogicalPlan visitMvExpand(MvExpand node, AnalysisContext context) {
558+
throw getOnlyForCalciteException("mvexpand");
559+
}
560+
555561
/** Build {@link ParseExpression} to context and skip to child nodes. */
556562
@Override
557563
public LogicalPlan visitParse(Parse node, AnalysisContext context) {

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.opensearch.sql.ast.tree.ML;
7070
import org.opensearch.sql.ast.tree.Multisearch;
7171
import org.opensearch.sql.ast.tree.MvCombine;
72+
import org.opensearch.sql.ast.tree.MvExpand;
7273
import org.opensearch.sql.ast.tree.NoMv;
7374
import org.opensearch.sql.ast.tree.Paginate;
7475
import org.opensearch.sql.ast.tree.Parse;
@@ -480,4 +481,8 @@ public T visitMvCombine(MvCombine node, C context) {
480481
public T visitNoMv(NoMv node, C context) {
481482
return visitChildren(node, context);
482483
}
484+
485+
public T visitMvExpand(MvExpand node, C context) {
486+
return visitChildren(node, context);
487+
}
483488
}

core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.opensearch.sql.ast.tree.Limit;
6464
import org.opensearch.sql.ast.tree.MinSpanBin;
6565
import org.opensearch.sql.ast.tree.MvCombine;
66+
import org.opensearch.sql.ast.tree.MvExpand;
6667
import org.opensearch.sql.ast.tree.Parse;
6768
import org.opensearch.sql.ast.tree.Patterns;
6869
import org.opensearch.sql.ast.tree.Project;
@@ -477,6 +478,16 @@ public static MvCombine mvcombine(Field field, String delim) {
477478
return new MvCombine(field, delim);
478479
}
479480

481+
/**
482+
* Build an MVEXPAND plan node and attach it to the input plan.
483+
*
484+
* <p>`@param` input input plan `@param` field field to expand `@param` limit optional
485+
* per-document limit `@return` MvExpand plan attached to the input
486+
*/
487+
public static UnresolvedPlan mvexpand(UnresolvedPlan input, Field field, Integer limit) {
488+
return new MvExpand(field, limit).attach(input);
489+
}
490+
480491
public static List<Argument> sortOptions() {
481492
return exprList(argument("desc", booleanLiteral(false)));
482493
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import javax.annotation.Nullable;
11+
import lombok.EqualsAndHashCode;
12+
import lombok.Getter;
13+
import lombok.ToString;
14+
import org.opensearch.sql.ast.AbstractNodeVisitor;
15+
import org.opensearch.sql.ast.expression.Field;
16+
17+
/** AST node representing the {@code mvexpand} PPL command: {@code mvexpand <field> [limit=N]}. */
18+
@ToString
19+
@EqualsAndHashCode(callSuper = false)
20+
public class MvExpand extends UnresolvedPlan {
21+
22+
private UnresolvedPlan child;
23+
@Getter private final Field field;
24+
@Getter @Nullable private final Integer limit;
25+
26+
public MvExpand(Field field, @Nullable Integer limit) {
27+
this.field = field;
28+
this.limit = limit;
29+
}
30+
31+
@Override
32+
public MvExpand attach(UnresolvedPlan child) {
33+
this.child = child;
34+
return this;
35+
}
36+
37+
@Override
38+
public List<UnresolvedPlan> getChild() {
39+
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
40+
}
41+
42+
@Override
43+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
44+
return nodeVisitor.visitMvExpand(this, context);
45+
}
46+
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
import org.opensearch.sql.ast.tree.ML;
127127
import org.opensearch.sql.ast.tree.Multisearch;
128128
import org.opensearch.sql.ast.tree.MvCombine;
129+
import org.opensearch.sql.ast.tree.MvExpand;
129130
import org.opensearch.sql.ast.tree.NoMv;
130131
import org.opensearch.sql.ast.tree.Paginate;
131132
import org.opensearch.sql.ast.tree.Parse;
@@ -930,7 +931,11 @@ public RelNode visitPatterns(Patterns node, CalcitePlanContext context) {
930931
.toList();
931932
context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggCall);
932933
buildExpandRelNode(
933-
context.relBuilder.field(node.getAlias()), node.getAlias(), node.getAlias(), context);
934+
context.relBuilder.field(node.getAlias()),
935+
node.getAlias(),
936+
node.getAlias(),
937+
null,
938+
context);
934939
flattenParsedPattern(
935940
node.getAlias(),
936941
context.relBuilder.field(node.getAlias()),
@@ -3127,7 +3132,7 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) {
31273132
RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context);
31283133
String alias = expand.getAlias();
31293134

3130-
buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, context);
3135+
buildExpandRelNode(arrayFieldRex, arrayField.getField().toString(), alias, null, context);
31313136

31323137
return context.relBuilder.peek();
31333138
}
@@ -3320,6 +3325,61 @@ public RelNode visitNoMv(NoMv node, CalcitePlanContext context) {
33203325
return visitEval((Eval) node.rewriteAsEval(), context);
33213326
}
33223327

3328+
/**
3329+
* MVExpand command visitor.
3330+
*
3331+
* <p>Expands a multi-value (array) field into separate rows using Calcite's CORRELATE join with
3332+
* UNCOLLECT. Each element of the array becomes a separate row while preserving all other fields
3333+
* from the original row.
3334+
*
3335+
* <p>Implementation uses {@link #buildExpandRelNode} to create a correlate join between the
3336+
* original relation and an uncollected (unnested) version of the target array field.
3337+
*
3338+
* <p>Behavior:
3339+
*
3340+
* <ul>
3341+
* <li>Array fields: Each array element is expanded into a separate row
3342+
* <li>Non-array fields: Treated as single-element arrays (returns original row unchanged)
3343+
* <li>Missing fields: Throws {@link SemanticCheckException}
3344+
* <li>Optional limit parameter: Limits the number of expanded elements per document
3345+
* </ul>
3346+
*
3347+
* @param mvExpand MVExpand command containing the field to expand and optional limit
3348+
* @param context CalcitePlanContext containing the RelBuilder and planning context
3349+
* @return RelNode representing the relation with the expanded multi-value field
3350+
* @throws SemanticCheckException if the target field does not exist in the schema
3351+
*/
3352+
@Override
3353+
public RelNode visitMvExpand(MvExpand mvExpand, CalcitePlanContext context) {
3354+
visitChildren(mvExpand, context);
3355+
3356+
final RelBuilder relBuilder = context.relBuilder;
3357+
final Field field = mvExpand.getField();
3358+
final String fieldName = field.getField().toString();
3359+
3360+
final RelDataType inputType = relBuilder.peek().getRowType();
3361+
final RelDataTypeField inputField =
3362+
inputType.getField(fieldName, /*caseSensitive*/ true, /*elideRecord*/ false);
3363+
3364+
if (inputField == null) {
3365+
throw new SemanticCheckException(
3366+
String.format("Field '%s' not found in the schema", fieldName));
3367+
}
3368+
3369+
final RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(field, context);
3370+
3371+
final RelDataType fieldType = arrayFieldRex.getType();
3372+
if (!(SqlTypeUtil.isArray(fieldType) || SqlTypeUtil.isMultiset(fieldType))) {
3373+
// For non-array/multiset fields (scalars), mvexpand just returns the field unchanged.
3374+
// This treats single-value fields as if they were arrays with one element.
3375+
return relBuilder.peek();
3376+
}
3377+
3378+
buildExpandRelNode(arrayFieldRex, fieldName, fieldName, mvExpand.getLimit(), context);
3379+
3380+
return relBuilder.peek();
3381+
}
3382+
33233383
@Override
33243384
public RelNode visitValues(Values values, CalcitePlanContext context) {
33253385
if (values.getValues() == null || values.getValues().isEmpty()) {
@@ -3564,7 +3624,11 @@ private void flattenParsedPattern(
35643624
}
35653625

35663626
private void buildExpandRelNode(
3567-
RexInputRef arrayFieldRex, String arrayFieldName, String alias, CalcitePlanContext context) {
3627+
RexInputRef arrayFieldRex,
3628+
String arrayFieldName,
3629+
String alias,
3630+
@Nullable Integer perDocLimit,
3631+
CalcitePlanContext context) {
35683632
// 3. Capture the outer row in a CorrelationId
35693633
Holder<RexCorrelVariable> correlVariable = Holder.empty();
35703634
context.relBuilder.variable(correlVariable::set);
@@ -3579,14 +3643,17 @@ private void buildExpandRelNode(
35793643
RelNode leftNode = context.relBuilder.build();
35803644

35813645
// 5. Build join right node and expand the array field using uncollect
3582-
RelNode rightNode =
3583-
context
3584-
.relBuilder
3585-
// fake input, see convertUnnest and convertExpression in Calcite SqlToRelConverter
3586-
.push(LogicalValues.createOneRow(context.relBuilder.getCluster()))
3587-
.project(List.of(correlArrayFieldAccess), List.of(arrayFieldName))
3588-
.uncollect(List.of(), false)
3589-
.build();
3646+
context
3647+
.relBuilder
3648+
// fake input, see convertUnnest and convertExpression in Calcite SqlToRelConverter
3649+
.push(LogicalValues.createOneRow(context.relBuilder.getCluster()))
3650+
.project(List.of(correlArrayFieldAccess), List.of(arrayFieldName))
3651+
.uncollect(List.of(), false);
3652+
3653+
if (perDocLimit != null) {
3654+
context.relBuilder.limit(0, perDocLimit);
3655+
}
3656+
RelNode rightNode = context.relBuilder.build();
35903657

35913658
// 6. Perform a nested-loop join (correlate) between the original table and the expanded
35923659
// array field.

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,6 +1105,10 @@ void populate() {
11051105
OperandTypes.family(SqlTypeFamily.ARRAY, SqlTypeFamily.INTEGER)
11061106
.or(OperandTypes.family(SqlTypeFamily.MAP, SqlTypeFamily.ANY)),
11071107
false));
1108+
registerOperator(
1109+
INTERNAL_ITEM,
1110+
SqlStdOperatorTable.ITEM,
1111+
PPLTypeChecker.family(SqlTypeFamily.IGNORE, SqlTypeFamily.CHARACTER));
11081112
registerOperator(
11091113
XOR,
11101114
SqlStdOperatorTable.NOT_EQUALS,

docs/category.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
"user/ppl/cmd/lookup.md",
2727
"user/ppl/cmd/mvcombine.md",
2828
"user/ppl/cmd/nomv.md",
29+
"user/ppl/cmd/mvexpand.md",
2930
"user/ppl/cmd/parse.md",
3031
"user/ppl/cmd/patterns.md",
3132
"user/ppl/cmd/rare.md",

docs/user/ppl/cmd/mvexpand.md

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# mvexpand
2+
3+
## Description
4+
The `mvexpand` command expands each value in a multivalue (array) field into a separate row. For each document, every element in the specified array field is returned as a new row.
5+
6+
7+
## Syntax
8+
```
9+
mvexpand <field> [limit=<int>]
10+
```
11+
12+
- `<field>`: The multivalue (array) field to expand. (Required)
13+
- `limit`: Maximum number of values per document to expand. If not specified, all array elements are expanded. (Optional)
14+
15+
16+
### Output field naming
17+
After `mvexpand`, the expanded value remains under the same field name (for example, `tags` or `ids`).
18+
If the array contains objects, you can reference subfields (for example, `skills.name`).
19+
20+
21+
## Examples
22+
23+
### Example 1: Basic Expansion (single document)
24+
Input document (case "basic") contains three tag values.
25+
26+
PPL query:
27+
```ppl
28+
source=people
29+
| eval tags = array('error', 'warning', 'info')
30+
| fields tags
31+
| head 1
32+
| mvexpand tags
33+
| fields tags
34+
```
35+
36+
Expected output:
37+
```text
38+
fetched rows / total rows = 3/3
39+
+---------+
40+
| tags |
41+
|---------|
42+
| error |
43+
| warning |
44+
| info |
45+
+---------+
46+
```
47+
48+
### Example 2: Expansion with Limit
49+
Input document (case "ids") contains an array of integers; expand and apply limit.
50+
51+
PPL query:
52+
```ppl
53+
source=people
54+
| eval ids = array(1, 2, 3, 4, 5)
55+
| fields ids
56+
| head 1
57+
| mvexpand ids limit=3
58+
| fields ids
59+
```
60+
61+
Expected output:
62+
```text
63+
fetched rows / total rows = 3/3
64+
+-----+
65+
| ids |
66+
|-----|
67+
| 1 |
68+
| 2 |
69+
| 3 |
70+
+-----+
71+
```
72+
73+
### Example 3: Expand projects
74+
This example demonstrates expanding a multivalue `projects` field into one row per project.
75+
76+
PPL query:
77+
```ppl
78+
source=people
79+
| head 1
80+
| fields projects
81+
| mvexpand projects
82+
| fields projects.name
83+
```
84+
85+
Expected output:
86+
```text
87+
fetched rows / total rows = 3/3
88+
+--------------------------------+
89+
| projects.name |
90+
|--------------------------------|
91+
| AWS Redshift Spectrum querying |
92+
| AWS Redshift security |
93+
| AWS Aurora security |
94+
+--------------------------------+
95+
```
96+
97+
### Example 4: Single-value array (case "single")
98+
Single-element array should expand to one row.
99+
100+
PPL query:
101+
```ppl
102+
source=people
103+
| eval tags = array('error')
104+
| fields tags
105+
| head 1
106+
| mvexpand tags
107+
| fields tags
108+
```
109+
110+
Expected output:
111+
```text
112+
fetched rows / total rows = 1/1
113+
+-------+
114+
| tags |
115+
|-------|
116+
| error |
117+
+-------+
118+
```
119+
120+
### Example 5: Missing Field
121+
If the field does not exist in the input schema (for example, it is not mapped or was projected out earlier), mvexpand throws a semantic check exception.
122+
123+
PPL query:
124+
```ppl
125+
source=people
126+
| eval some_field = 'x'
127+
| fields some_field
128+
| head 1
129+
| mvexpand tags
130+
| fields tags
131+
```
132+
133+
Expected output:
134+
```text
135+
{'reason': 'Invalid Query', 'details': "Field 'tags' not found in the schema", 'type': 'SemanticCheckException'}
136+
Error: Query returned no data
137+
```

0 commit comments

Comments
 (0)