Skip to content

Commit 9dc0060

Browse files
authored
Fix MAP path resolution for symbol-based PPL commands (#5198)
* Add map path materializer and tests Signed-off-by: Chen Dai <daichen@amazon.com> * Refactor map path materializer to simplify Signed-off-by: Chen Dai <daichen@amazon.com> * Refactor unit and integration tests Signed-off-by: Chen Dai <daichen@amazon.com> * Fix wildcard invalid field error and potential duplicate projection Signed-off-by: Chen Dai <daichen@amazon.com> * Add more IT for all commands on map path Signed-off-by: Chen Dai <daichen@amazon.com> * Update javadoc Signed-off-by: Chen Dai <daichen@amazon.com> --------- Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent ce68b0c commit 9dc0060

4 files changed

Lines changed: 778 additions & 0 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,17 +180,30 @@ public class CalciteRelNodeVisitor extends AbstractNodeVisitor<RelNode, CalciteP
180180
private final CalciteRexNodeVisitor rexVisitor;
181181
private final CalciteAggCallVisitor aggVisitor;
182182
private final DataSourceService dataSourceService;
183+
private final MapPathPreMaterializer mapPathMaterializer;
183184

184185
public CalciteRelNodeVisitor(DataSourceService dataSourceService) {
185186
this.rexVisitor = new CalciteRexNodeVisitor(this);
186187
this.aggVisitor = new CalciteAggCallVisitor(rexVisitor);
187188
this.dataSourceService = dataSourceService;
189+
this.mapPathMaterializer = new MapPathPreMaterializer(rexVisitor);
188190
}
189191

190192
public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
191193
return unresolved.accept(this, context);
192194
}
193195

196+
@Override
197+
public RelNode visitChildren(Node node, CalcitePlanContext context) {
198+
RelNode result = super.visitChildren(node, context);
199+
if (node instanceof UnresolvedPlan plan) {
200+
// Materialize MAP dotted paths as flat columns after children are analyzed
201+
// (so MAP/struct types are known) but before the command's own visit logic runs.
202+
mapPathMaterializer.materializePaths(plan, context);
203+
}
204+
return result;
205+
}
206+
194207
@Override
195208
public RelNode visitRelation(Relation node, CalcitePlanContext context) {
196209
DataSourceSchemaIdentifierNameResolver nameResolver =
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite;
7+
8+
import java.util.ArrayList;
9+
import java.util.Collection;
10+
import java.util.HashSet;
11+
import java.util.List;
12+
import java.util.Set;
13+
import java.util.function.Function;
14+
import lombok.RequiredArgsConstructor;
15+
import org.apache.calcite.rex.RexNode;
16+
import org.apache.calcite.sql.SqlKind;
17+
import org.apache.commons.lang3.tuple.Pair;
18+
import org.apache.logging.log4j.LogManager;
19+
import org.apache.logging.log4j.Logger;
20+
import org.opensearch.sql.ast.expression.Field;
21+
import org.opensearch.sql.ast.tree.AddTotals;
22+
import org.opensearch.sql.ast.tree.FillNull;
23+
import org.opensearch.sql.ast.tree.MvCombine;
24+
import org.opensearch.sql.ast.tree.Project;
25+
import org.opensearch.sql.ast.tree.Rename;
26+
import org.opensearch.sql.ast.tree.Replace;
27+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
28+
29+
/**
30+
* Resolves MAP dotted paths (e.g. {@code doc.user.name}) referenced by a PPL command and projects
31+
* them as flat named columns. Each dotted path that resolves to an {@code ITEM()} expression is
32+
* added to the current row type so downstream command logic can reference it by name.
33+
*/
34+
@RequiredArgsConstructor
35+
public class MapPathPreMaterializer {
36+
37+
private static final Logger log = LogManager.getLogger(MapPathPreMaterializer.class);
38+
39+
/** Visitor used to resolve field expressions to Calcite {@link RexNode}. */
40+
private final CalciteRexNodeVisitor rexVisitor;
41+
42+
/**
43+
* Resolves and projects MAP dotted paths referenced by the given command as flat named columns.
44+
*
45+
* @param plan the AST command being visited
46+
* @param context the current plan context with relBuilder state
47+
*/
48+
public void materializePaths(UnresolvedPlan plan, CalcitePlanContext context) {
49+
if (context.relBuilder.size() == 0) {
50+
return;
51+
}
52+
53+
List<Field> fields = extractFieldOperands(plan);
54+
if (!fields.isEmpty()) {
55+
doMaterializeMapPaths(fields, context);
56+
}
57+
}
58+
59+
private List<Field> extractFieldOperands(UnresolvedPlan node) {
60+
return switch (node) {
61+
case Rename rename -> toFields(rename.getRenameList(), m -> m.getOrigin());
62+
case FillNull fillNull -> toFields(fillNull.getReplacementPairs(), Pair::getLeft);
63+
case Replace replace -> toFields(replace.getFieldList());
64+
// Only fields-exclusion needs pre-materialization; inclusion resolves paths via visitProject
65+
case Project project -> project.isExcluded() ? toFields(project.getProjectList()) : List.of();
66+
case AddTotals addTotals -> toFields(addTotals.getFieldList());
67+
case MvCombine mvCombine -> List.of(mvCombine.getField());
68+
default -> List.of();
69+
};
70+
}
71+
72+
private void doMaterializeMapPaths(List<Field> fields, CalcitePlanContext context) {
73+
Set<String> existingFields =
74+
new HashSet<>(context.relBuilder.peek().getRowType().getFieldNames());
75+
List<RexNode> newColumns = new ArrayList<>();
76+
for (Field field : fields) {
77+
try {
78+
RexNode resolved = rexVisitor.analyze(field, context);
79+
String name = field.getField().toString();
80+
if (resolved.getKind() == SqlKind.ITEM && !existingFields.contains(name)) {
81+
newColumns.add(context.relBuilder.alias(resolved, name));
82+
existingFields.add(name);
83+
}
84+
} catch (RuntimeException e) {
85+
// Skip unresolvable fields (e.g. wildcards); let the command itself handle them
86+
log.debug("Skipping field resolution for '{}': {}", field.getField(), e.getMessage(), e);
87+
}
88+
}
89+
90+
if (!newColumns.isEmpty()) {
91+
context.relBuilder.projectPlus(newColumns);
92+
}
93+
}
94+
95+
private static <T> List<Field> toFields(Collection<T> items) {
96+
return toFields(items, Function.identity());
97+
}
98+
99+
private static <T> List<Field> toFields(Collection<T> items, Function<T, ?> mapper) {
100+
return items.stream()
101+
.map(mapper)
102+
.filter(Field.class::isInstance)
103+
.map(Field.class::cast)
104+
.toList();
105+
}
106+
}
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite;
7+
8+
import static org.mockito.ArgumentMatchers.any;
9+
import static org.mockito.ArgumentMatchers.argThat;
10+
import static org.mockito.ArgumentMatchers.eq;
11+
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
12+
import static org.mockito.Mockito.lenient;
13+
import static org.mockito.Mockito.mock;
14+
import static org.mockito.Mockito.never;
15+
import static org.mockito.Mockito.verify;
16+
import static org.mockito.Mockito.when;
17+
import static org.opensearch.sql.ast.dsl.AstDSL.argument;
18+
import static org.opensearch.sql.ast.dsl.AstDSL.booleanLiteral;
19+
import static org.opensearch.sql.ast.dsl.AstDSL.field;
20+
import static org.opensearch.sql.ast.dsl.AstDSL.fillNull;
21+
import static org.opensearch.sql.ast.dsl.AstDSL.filter;
22+
import static org.opensearch.sql.ast.dsl.AstDSL.map;
23+
import static org.opensearch.sql.ast.dsl.AstDSL.mvcombine;
24+
import static org.opensearch.sql.ast.dsl.AstDSL.project;
25+
import static org.opensearch.sql.ast.dsl.AstDSL.projectWithArg;
26+
import static org.opensearch.sql.ast.dsl.AstDSL.relation;
27+
import static org.opensearch.sql.ast.dsl.AstDSL.rename;
28+
import static org.opensearch.sql.ast.dsl.AstDSL.stringLiteral;
29+
30+
import java.sql.Connection;
31+
import java.util.LinkedHashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Set;
35+
import java.util.stream.Stream;
36+
import org.apache.calcite.rel.RelNode;
37+
import org.apache.calcite.rex.RexBuilder;
38+
import org.apache.calcite.rex.RexNode;
39+
import org.apache.calcite.sql.SqlKind;
40+
import org.apache.calcite.tools.FrameworkConfig;
41+
import org.apache.commons.lang3.tuple.Pair;
42+
import org.junit.jupiter.api.BeforeEach;
43+
import org.junit.jupiter.api.Test;
44+
import org.junit.jupiter.api.extension.ExtendWith;
45+
import org.mockito.Mock;
46+
import org.mockito.MockedStatic;
47+
import org.mockito.junit.jupiter.MockitoExtension;
48+
import org.opensearch.sql.ast.expression.Field;
49+
import org.opensearch.sql.ast.expression.UnresolvedExpression;
50+
import org.opensearch.sql.ast.tree.AddTotals;
51+
import org.opensearch.sql.ast.tree.Replace;
52+
import org.opensearch.sql.ast.tree.ReplacePair;
53+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
54+
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
55+
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelBuilder;
56+
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
57+
import org.opensearch.sql.executor.QueryType;
58+
59+
@ExtendWith(MockitoExtension.class)
60+
public class MapPathPreMaterializerTest {
61+
62+
@Mock private CalciteRexNodeVisitor rexVisitor;
63+
@Mock private OpenSearchRelBuilder relBuilder;
64+
65+
/** Intercepts static factory methods in {@link CalciteToolsHelper} during context creation. */
66+
@Mock private MockedStatic<CalciteToolsHelper> mockedToolsHelper;
67+
68+
/** Placeholder child plan node required by commands like rename and fillnull. */
69+
private static final UnresolvedPlan DUMMY_CHILD = relation("t");
70+
71+
private CalcitePlanContext context;
72+
private MapPathPreMaterializer materializer;
73+
74+
@BeforeEach
75+
void setUp() {
76+
materializer = new MapPathPreMaterializer(rexVisitor);
77+
mockedToolsHelper
78+
.when(() -> CalciteToolsHelper.connect(any(), any()))
79+
.thenReturn(mock(Connection.class));
80+
mockedToolsHelper
81+
.when(() -> CalciteToolsHelper.create(any(), any(), any()))
82+
.thenReturn(relBuilder);
83+
when(relBuilder.getRexBuilder()).thenReturn(new RexBuilder(OpenSearchTypeFactory.TYPE_FACTORY));
84+
context =
85+
CalcitePlanContext.create(mock(FrameworkConfig.class), SysLimit.DEFAULT, QueryType.PPL);
86+
lenient().when(relBuilder.size()).thenReturn(1);
87+
lenient().when(relBuilder.peek()).thenReturn(mock(RelNode.class, RETURNS_DEEP_STUBS));
88+
}
89+
90+
// ---- Symbol-based commands ----
91+
92+
@Test
93+
void testRename() {
94+
givenMapPaths("doc.user.name")
95+
.whenCommand(rename(DUMMY_CHILD, map("doc.user.name", "username")))
96+
.shouldProject("doc.user.name");
97+
}
98+
99+
@Test
100+
void testFillnull() {
101+
givenMapPaths("doc.user.name")
102+
.whenCommand(
103+
fillNull(DUMMY_CHILD, List.of(Pair.of(field("doc.user.name"), stringLiteral("N/A")))))
104+
.shouldProject("doc.user.name");
105+
}
106+
107+
@Test
108+
void testReplace() {
109+
givenMapPaths("doc.user.name")
110+
.whenCommand(
111+
new Replace(
112+
List.of(new ReplacePair(stringLiteral("a"), stringLiteral("b"))),
113+
Set.of(field("doc.user.name"))))
114+
.shouldProject("doc.user.name");
115+
}
116+
117+
@Test
118+
void testFieldsExclusion() {
119+
givenMapPaths("doc.user.name")
120+
.whenCommand(
121+
projectWithArg(
122+
DUMMY_CHILD,
123+
List.of(argument("exclude", booleanLiteral(true))),
124+
field("doc.user.name")))
125+
.shouldProject("doc.user.name");
126+
}
127+
128+
@Test
129+
void testAddtotals() {
130+
givenMapPaths("doc.user.name")
131+
.whenCommand(new AddTotals(List.of(field("doc.user.name")), Map.of()))
132+
.shouldProject("doc.user.name");
133+
}
134+
135+
@Test
136+
void testMvcombine() {
137+
givenMapPaths("doc.user.name")
138+
.whenCommand(mvcombine(field("doc.user.name")))
139+
.shouldProject("doc.user.name");
140+
}
141+
142+
// ---- Multiple fields cases ----
143+
144+
@Test
145+
void testMultipleMapPaths() {
146+
givenMapPaths("doc.user.name", "doc.user.age")
147+
.whenCommand(
148+
fillNull(
149+
DUMMY_CHILD,
150+
List.of(
151+
Pair.of(field("doc.user.name"), stringLiteral("N/A")),
152+
Pair.of(field("doc.user.age"), stringLiteral("0")))))
153+
.shouldProject("doc.user.name", "doc.user.age");
154+
}
155+
156+
@Test
157+
void testMixedMapAndNonMapFields() {
158+
givenMapPaths("doc.user.name")
159+
.givenNonMapPaths("regular_field")
160+
.whenCommand(
161+
fillNull(
162+
DUMMY_CHILD,
163+
List.of(
164+
Pair.of(field("doc.user.name"), stringLiteral("N/A")),
165+
Pair.of(field("regular_field"), stringLiteral("0")))))
166+
.shouldProject("doc.user.name");
167+
}
168+
169+
// ---- No-op cases ----
170+
171+
@Test
172+
void testNoOpForFilter() {
173+
givenMapPaths("doc.user.name")
174+
.whenCommand(filter(DUMMY_CHILD, field("doc.user.name")))
175+
.shouldNotProject();
176+
}
177+
178+
@Test
179+
void testNoOpForNonExcludedProject() {
180+
givenMapPaths("doc.user.name")
181+
.whenCommand(project(DUMMY_CHILD, field("doc.user.name")))
182+
.shouldNotProject();
183+
}
184+
185+
@Test
186+
void testNoOpWhenRelBuilderStackEmpty() {
187+
lenient().when(relBuilder.size()).thenReturn(0);
188+
givenMapPaths("doc.user.name")
189+
.whenCommand(rename(DUMMY_CHILD, map("doc.user.name", "u")))
190+
.shouldNotProject();
191+
}
192+
193+
@Test
194+
void testNoOpWhenFieldResolvesToNonItemAccess() {
195+
givenMapPaths()
196+
.givenNonMapPaths("regular_field")
197+
.whenCommand(rename(DUMMY_CHILD, map("regular_field", "alias")))
198+
.shouldNotProject();
199+
}
200+
201+
// ---- Fluent test helper ----
202+
203+
private MapPathAssertion givenMapPaths(String... fieldNames) {
204+
return new MapPathAssertion(fieldNames);
205+
}
206+
207+
private class MapPathAssertion {
208+
private final Map<String, RexNode> aliasedNodes = new LinkedHashMap<>();
209+
210+
MapPathAssertion(String... mapFieldNames) {
211+
for (String name : mapFieldNames) {
212+
RexNode item = mock(RexNode.class, "item:" + name);
213+
RexNode aliased = mock(RexNode.class, "aliased:" + name);
214+
lenient().when(item.getKind()).thenReturn(SqlKind.ITEM);
215+
lenient().when(rexVisitor.analyze(fieldMatching(name), eq(context))).thenReturn(item);
216+
lenient().when(relBuilder.alias(item, name)).thenReturn(aliased);
217+
aliasedNodes.put(name, aliased);
218+
}
219+
}
220+
221+
MapPathAssertion givenNonMapPaths(String... fieldNames) {
222+
for (String name : fieldNames) {
223+
RexNode ref = mock(RexNode.class, "ref:" + name);
224+
when(ref.getKind()).thenReturn(SqlKind.INPUT_REF);
225+
lenient().when(rexVisitor.analyze(fieldMatching(name), eq(context))).thenReturn(ref);
226+
}
227+
return this;
228+
}
229+
230+
MapPathAssertion whenCommand(UnresolvedPlan command) {
231+
materializer.materializePaths(command, context);
232+
return this;
233+
}
234+
235+
void shouldProject(String... expectedFields) {
236+
List<RexNode> expected = Stream.of(expectedFields).map(aliasedNodes::get).toList();
237+
verify(relBuilder).projectPlus(expected);
238+
}
239+
240+
void shouldNotProject() {
241+
verify(relBuilder, never()).projectPlus(any(List.class));
242+
}
243+
244+
private static UnresolvedExpression fieldMatching(String name) {
245+
return argThat(expr -> expr instanceof Field f && f.getField().toString().equals(name));
246+
}
247+
}
248+
}

0 commit comments

Comments
 (0)