Skip to content

Commit 7630db8

Browse files
authored
Adopt appendcol, appendpipe, multisearch to spath (#5075)
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
1 parent 4188dac commit 7630db8

6 files changed

Lines changed: 197 additions & 19 deletions

File tree

core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -350,12 +350,6 @@ public Node visitSearch(Search node, FieldResolutionContext context) {
350350
return node;
351351
}
352352

353-
@Override
354-
public Node visitAppendPipe(AppendPipe node, FieldResolutionContext context) {
355-
visitChildren(node, context);
356-
return node;
357-
}
358-
359353
@Override
360354
public Node visitRegex(Regex node, FieldResolutionContext context) {
361355
Set<String> regexFields = extractFieldsFromExpression(node.getField());
@@ -508,8 +502,10 @@ public Node visitFillNull(FillNull node, FieldResolutionContext context) {
508502

509503
@Override
510504
public Node visitAppendCol(AppendCol node, FieldResolutionContext context) {
511-
throw new IllegalArgumentException(
512-
"AppendCol command cannot be used together with spath command");
505+
// dispatch requirements to subsearch and main
506+
acceptAndVerifyNodeVisited(node.getSubSearch(), context);
507+
visitChildren(node, context);
508+
return node;
513509
}
514510

515511
@Override
@@ -521,9 +517,10 @@ public Node visitAppend(Append node, FieldResolutionContext context) {
521517
}
522518

523519
@Override
524-
public Node visitMultisearch(Multisearch node, FieldResolutionContext context) {
525-
throw new IllegalArgumentException(
526-
"Multisearch command cannot be used together with spath command");
520+
public Node visitAppendPipe(AppendPipe node, FieldResolutionContext context) {
521+
acceptAndVerifyNodeVisited(node.getSubQuery(), context);
522+
visitChildren(node, context);
523+
return node;
527524
}
528525

529526
@Override
@@ -533,7 +530,16 @@ public Node visitLookup(Lookup node, FieldResolutionContext context) {
533530

534531
@Override
535532
public Node visitValues(Values node, FieldResolutionContext context) {
536-
throw new IllegalArgumentException("Values command cannot be used together with spath command");
533+
// do nothing
534+
return node;
535+
}
536+
537+
@Override
538+
public Node visitMultisearch(Multisearch node, FieldResolutionContext context) {
539+
// dispatch requirements to subsearches and main
540+
node.getSubsearches().forEach(subsearch -> acceptAndVerifyNodeVisited(subsearch, context));
541+
visitChildren(node, context);
542+
return node;
537543
}
538544

539545
@Override

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2304,6 +2304,7 @@ public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) {
23042304
prunedSubSearch.accept(this, context);
23052305
subsearchNodes.add(context.relBuilder.build());
23062306
}
2307+
subsearchNodes = DynamicFieldsHelper.adjustInputsForDynamicFields(subsearchNodes, context);
23072308

23082309
// Use shared schema merging logic that handles type conflicts via field renaming
23092310
List<RelNode> alignedNodes =

core/src/main/java/org/opensearch/sql/calcite/DynamicFieldsHelper.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.ArrayList;
1111
import java.util.Collection;
1212
import java.util.Collections;
13+
import java.util.HashSet;
1314
import java.util.List;
1415
import java.util.Optional;
1516
import java.util.Set;
@@ -99,8 +100,9 @@ static void adjustJoinInputsForDynamicFields(
99100
// build once to modify the inputs already in the stack.
100101
RelNode right = context.relBuilder.build();
101102
RelNode left = context.relBuilder.build();
102-
left = adjustFieldsForDynamicFields(left, right, context);
103-
right = adjustFieldsForDynamicFields(right, left, context);
103+
List<RelNode> inputs = adjustInputsForDynamicFields(List.of(right, left), context);
104+
right = inputs.get(0);
105+
left = inputs.get(1);
104106
context.relBuilder.push(left);
105107
// `as(alias)` is needed since `build()` won't preserve alias
106108
leftAlias.map(alias -> context.relBuilder.as(alias));
@@ -119,6 +121,36 @@ static RelNode adjustFieldsForDynamicFields(
119121
return target;
120122
}
121123

124+
/** Adjust inputs to align the static/dynamic fields each other */
125+
static List<RelNode> adjustInputsForDynamicFields(
126+
List<RelNode> inputs, CalcitePlanContext context) {
127+
boolean requireAdjustment = inputs.stream().anyMatch(input -> hasDynamicFields(input));
128+
if (requireAdjustment) {
129+
List<String> requiredStaticFields = getRequiredStaticFields(inputs);
130+
return inputs.stream()
131+
.map(input -> adjustFieldsForDynamicFields(input, requiredStaticFields, context))
132+
.collect(Collectors.toList());
133+
} else {
134+
return inputs;
135+
}
136+
}
137+
138+
static List<String> getRequiredStaticFields(List<RelNode> inputs) {
139+
Set<String> requiredStaticFields = new HashSet<String>();
140+
for (RelNode input : inputs) {
141+
if (hasDynamicFields(input)) {
142+
requiredStaticFields.addAll(getStaticFields(input));
143+
}
144+
}
145+
return toSortedList(requiredStaticFields);
146+
}
147+
148+
private static List<String> toSortedList(Collection<String> collection) {
149+
ArrayList<String> result = new ArrayList<>(collection);
150+
Collections.sort(result);
151+
return result;
152+
}
153+
122154
/**
123155
* Project node's fields in `requiredFieldNames` as static field, and put other fields into `_MAP`
124156
* (dynamic fields) This projection is needed when merging an input with dynamic fields and an
@@ -128,16 +160,27 @@ static RelNode adjustFieldsForDynamicFields(
128160
static RelNode adjustFieldsForDynamicFields(
129161
RelNode node, List<String> staticFieldNames, CalcitePlanContext context) {
130162
context.relBuilder.push(node);
131-
List<String> existingFields = node.getRowType().getFieldNames();
163+
List<String> existingFields = getStaticFields(node);
132164
List<RexNode> project = new ArrayList<>();
133165
for (String existingField : existingFields) {
134166
if (staticFieldNames.contains(existingField)) {
135167
project.add(context.rexBuilder.makeInputRef(node, existingFields.indexOf(existingField)));
136168
}
137169
}
138-
project.add(
139-
context.relBuilder.alias(
140-
getFieldsAsMap(existingFields, staticFieldNames, context), DYNAMIC_FIELDS_MAP));
170+
if (hasDynamicFields(node)) {
171+
// _MAP = MAP_APPEND(_MAP, MAP(existingFields - staticFields))
172+
RexNode existingDynamicFieldsMap = context.relBuilder.field(DYNAMIC_FIELDS_MAP);
173+
RexNode additionalFieldsMap = getFieldsAsMap(existingFields, staticFieldNames, context);
174+
RexNode mapAppend =
175+
context.rexBuilder.makeCall(
176+
BuiltinFunctionName.MAP_APPEND, existingDynamicFieldsMap, additionalFieldsMap);
177+
project.add(context.relBuilder.alias(mapAppend, DYNAMIC_FIELDS_MAP));
178+
} else {
179+
// _MAP = MAP(existingFields - staticFields)
180+
project.add(
181+
context.relBuilder.alias(
182+
getFieldsAsMap(existingFields, staticFieldNames, context), DYNAMIC_FIELDS_MAP));
183+
}
141184
return context.relBuilder.project(project).build();
142185
}
143186

docs/user/ppl/cmd/spath.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ For more information about path syntax, see [json_extract](../functions/json.md#
4141
* **Limitation**: Field order in the result could be inconsistent with query without `spath` command, and the behavior might change in the future version.
4242
* **Limitation**: Filter with subquery (`where <field> in/exists [...]`) is not supported with `spath` command.
4343
* **Limitation**: `fillnull` command requires to specify fields when used with `spath` command.
44-
* **Limitation**: Following commands cannot be used together with `spath` command: `appendcol`, `multisearch`, `lookup`.
44+
* **Limitation**: Following commands cannot be used together with `spath` command: `lookup`.
4545
* **Performance**: Filter records before `spath` command for best performance (see Example 8)
4646

4747
* **Internal Implementation**: The auto extraction feature uses an internal `_MAP` system column to store dynamic fields during query processing. This column is automatically expanded into individual columns in the final results and users don't need to reference it directly. For more information, see [System Columns](../general/identifiers.md#system-columns).

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,4 +262,100 @@ public void testAppendWithSpathInSubsearchDynamicFields() throws IOException {
262262
rows(null, null, null, "simple", "4", sj("{'a': 1, 'b': 2, 'c': 3}")),
263263
rows("1", "3", "2", "simple", null, sj("{'a': 1, 'b': 2, 'c': 3}")));
264264
}
265+
266+
@Test
267+
public void testAppendColWithSpathInMain() throws IOException {
268+
JSONObject result =
269+
executeQuery(
270+
"source=test_json | where category='simple' | spath input=userData | appendcol [where"
271+
+ " category='simple'] | fields a, c, *");
272+
verifySchema(
273+
result,
274+
schema("a", "string"),
275+
schema("c", "string"),
276+
schema("category", "string"),
277+
schema("userData", "string"),
278+
schema("b", "string"));
279+
verifyDataRows(
280+
result,
281+
rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"),
282+
rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"));
283+
}
284+
285+
@Test
286+
public void testAppendColWithSpathInSubsearch() throws IOException {
287+
JSONObject result =
288+
executeQuery(
289+
"source=test_json | where category='simple' | appendcol [where category='simple' |"
290+
+ " spath input=userData] | fields a, c, *");
291+
verifySchema(
292+
result,
293+
schema("a", "string"),
294+
schema("c", "string"),
295+
schema("category", "string"),
296+
schema("userData", "string"),
297+
schema("b", "string"));
298+
verifyDataRows(
299+
result,
300+
rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"),
301+
rows("1", "3", "simple", sj("{'a': 1, 'b': 2, 'c': 3}"), "2"));
302+
}
303+
304+
@Test
305+
public void testAppendColWithSpathInBothInputs() throws IOException {
306+
JSONObject result =
307+
executeQuery(
308+
"source=test_json | where category='simple' | spath input=userData | appendcol [where"
309+
+ " category='simple' | spath input=userData ] | fields a, c, *");
310+
verifySchema(
311+
result,
312+
schema("a", "string"),
313+
schema("c", "string"),
314+
schema("b", "string"),
315+
schema("category", "string"),
316+
schema("userData", "string"));
317+
verifyDataRows(
318+
result,
319+
rows("1", "3", "2", "simple", sj("{'a': 1, 'b': 2, 'c': 3}")),
320+
rows("1", "3", "2", "simple", sj("{'a': 1, 'b': 2, 'c': 3}")));
321+
}
322+
323+
@Test
324+
public void testAppendPipeWithSpathInMain() throws IOException {
325+
JSONObject result =
326+
executeQuery(
327+
"source=test_json | where category='simple' | spath input=userData | stats sum(a) as"
328+
+ " total by b | appendpipe [stats sum(total) as total] | head 5");
329+
verifySchema(result, schema("total", "double"), schema("b", "string"));
330+
verifyDataRows(result, rows(2, "2"), rows(2, null));
331+
}
332+
333+
@Test
334+
public void testMultisearchWithSpath() throws IOException {
335+
JSONObject result =
336+
executeQuery(
337+
"| multisearch [source=test_json | where category='simple' | spath input=userData |"
338+
+ " head 1] [source=test_json | where category='nested' | spath input=userData] |"
339+
+ " fields a, c, *");
340+
verifySchema(
341+
result,
342+
schema("a", "string"),
343+
schema("c", "string"),
344+
schema("b", "string"),
345+
schema("category", "string"),
346+
schema("nested.d{}", "string"),
347+
schema("nested.e", "string"),
348+
schema("userData", "string"));
349+
verifyDataRows(
350+
result,
351+
rows("1", "3", "2", "simple", null, null, sj("{'a': 1, 'b': 2, 'c': 3}")),
352+
rows(
353+
null,
354+
null,
355+
null,
356+
"nested",
357+
"[1, 2, 3]",
358+
"str",
359+
sj("{'nested': {'d': [1, 2, 3], 'e': 'str'}}")));
360+
}
265361
}

ppl/src/test/java/org/opensearch/sql/ppl/parser/FieldResolutionVisitorTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,38 @@ public void testAppend() {
371371
"sub", new FieldResolutionResult(Set.of("a", "c", "testCase"), "*")));
372372
}
373373

374+
@Test
375+
public void testAppendCol() {
376+
String query =
377+
"source=main | where testCase='simple' | eval c = 4 | "
378+
+ "appendcol [where testCase='simple' ] | fields a, c, *";
379+
assertMultiRelationFields(
380+
query, Map.of("main", new FieldResolutionResult(Set.of("a", "testCase"), "*")));
381+
}
382+
383+
@Test
384+
public void testAppendpipe() {
385+
String query =
386+
"source=main | where testCase='simple' | stats sum(a) as sum_a by b | "
387+
+ "appendpipe [stats sum(sum_a) as total] | head 5";
388+
assertMultiRelationFields(
389+
query, Map.of("main", new FieldResolutionResult(Set.of("a", "b", "testCase"))));
390+
}
391+
392+
@Test
393+
public void testMultisearch() {
394+
String query =
395+
"| multisearch [source=main | where testCase='simple'] [source=sub | where"
396+
+ " testCase='simple'] | fields a, c, *";
397+
assertMultiRelationFields(
398+
query,
399+
Map.of(
400+
"main",
401+
new FieldResolutionResult(Set.of("a", "c", "testCase"), "*"),
402+
"sub",
403+
new FieldResolutionResult(Set.of("a", "c", "testCase"), "*")));
404+
}
405+
374406
@Test
375407
public void testAppendWithSpathInMain() {
376408
String query =

0 commit comments

Comments
 (0)