Skip to content

Commit e4ac74c

Browse files
authored
Fix MAP path resolution for top/rare, join, lookup and streamstats (#5206)
1 parent 7d6d983 commit e4ac74c

5 files changed

Lines changed: 267 additions & 18 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,6 +1494,9 @@ private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
14941494
public RelNode visitJoin(Join node, CalcitePlanContext context) {
14951495
List<UnresolvedPlan> children = node.getChildren();
14961496
children.forEach(c -> analyze(c, context));
1497+
// Trigger manually since Join bypass visitChildren and getChild
1498+
mapPathMaterializer.materializePaths(node, context);
1499+
14971500
if (node.getJoinCondition().isEmpty()) {
14981501
// join-with-field-list grammar
14991502
List<String> leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames();

core/src/main/java/org/opensearch/sql/calcite/MapPathPreMaterializer.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.sql.calcite;
77

8+
import com.google.common.collect.ImmutableList;
89
import java.util.ArrayList;
910
import java.util.Collection;
1011
import java.util.HashSet;
@@ -18,12 +19,17 @@
1819
import org.apache.logging.log4j.LogManager;
1920
import org.apache.logging.log4j.Logger;
2021
import org.opensearch.sql.ast.expression.Field;
22+
import org.opensearch.sql.ast.expression.QualifiedName;
2123
import org.opensearch.sql.ast.tree.AddTotals;
2224
import org.opensearch.sql.ast.tree.FillNull;
25+
import org.opensearch.sql.ast.tree.Join;
26+
import org.opensearch.sql.ast.tree.Lookup;
2327
import org.opensearch.sql.ast.tree.MvCombine;
2428
import org.opensearch.sql.ast.tree.Project;
29+
import org.opensearch.sql.ast.tree.RareTopN;
2530
import org.opensearch.sql.ast.tree.Rename;
2631
import org.opensearch.sql.ast.tree.Replace;
32+
import org.opensearch.sql.ast.tree.StreamWindow;
2733
import org.opensearch.sql.ast.tree.UnresolvedPlan;
2834

2935
/**
@@ -58,13 +64,26 @@ public void materializePaths(UnresolvedPlan plan, CalcitePlanContext context) {
5864

5965
private List<Field> extractFieldOperands(UnresolvedPlan node) {
6066
return switch (node) {
67+
// Symbol-based commands require the map path symbol present in the symbol table
6168
case Rename rename -> toFields(rename.getRenameList(), m -> m.getOrigin());
6269
case FillNull fillNull -> toFields(fillNull.getReplacementPairs(), Pair::getLeft);
6370
case Replace replace -> toFields(replace.getFieldList());
64-
// Only fields-exclusion needs pre-materialization; inclusion resolves paths via visitProject
6571
case Project project -> project.isExcluded() ? toFields(project.getProjectList()) : List.of();
6672
case AddTotals addTotals -> toFields(addTotals.getFieldList());
6773
case MvCombine mvCombine -> List.of(mvCombine.getField());
74+
// Commands broken by fragmented resolve API but direct surgical fix is highly complex
75+
case RareTopN rareTopN ->
76+
ImmutableList.<Field>builder()
77+
.addAll(rareTopN.getFields())
78+
.addAll(toFields(rareTopN.getGroupExprList()))
79+
.build();
80+
case StreamWindow streamWindow -> toFields(streamWindow.getGroupList());
81+
case Lookup lookup ->
82+
toFields(
83+
lookup.getMappingAliasMap().values(),
84+
name -> new Field(QualifiedName.of(List.of(name.split("\\.")))));
85+
case Join join ->
86+
join.getJoinFields().map(MapPathPreMaterializer::toFields).orElse(List.of());
6887
default -> List.of();
6988
};
7089
}
@@ -81,9 +100,10 @@ private void doMaterializeMapPaths(List<Field> fields, CalcitePlanContext contex
81100
newColumns.add(context.relBuilder.alias(resolved, name));
82101
existingFields.add(name);
83102
}
84-
} catch (RuntimeException e) {
103+
} catch (RuntimeException | AssertionError e) {
104+
// FIXME: QualifiedNameResolver throws error for dotted path on non-map field
85105
// Skip unresolvable fields (e.g. wildcards); let the command itself handle them
86-
log.debug("Skipping field resolution for '{}': {}", field.getField(), e.getMessage(), e);
106+
log.debug("Skipping field resolution for '{}': {}", field.getField(), e.getMessage());
87107
}
88108
}
89109

core/src/main/java/org/opensearch/sql/calcite/QualifiedNameResolver.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,9 @@ private static RexNode resolveFieldAccess(
288288
if (length == parts.size() - start) {
289289
return field;
290290
} else {
291-
String itemName = joinParts(parts, length + start, parts.size() - length);
291+
int remainingStart = length + start;
292+
int remainingLength = parts.size() - remainingStart;
293+
String itemName = joinParts(parts, remainingStart, remainingLength);
292294
return createItemAccess(field, itemName, context);
293295
}
294296
}

core/src/test/java/org/opensearch/sql/calcite/MapPathPreMaterializerTest.java

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
import static org.mockito.Mockito.when;
1717
import static org.opensearch.sql.ast.dsl.AstDSL.argument;
1818
import static org.opensearch.sql.ast.dsl.AstDSL.booleanLiteral;
19+
import static org.opensearch.sql.ast.dsl.AstDSL.defaultTopArgs;
1920
import static org.opensearch.sql.ast.dsl.AstDSL.field;
2021
import static org.opensearch.sql.ast.dsl.AstDSL.fillNull;
2122
import static org.opensearch.sql.ast.dsl.AstDSL.filter;
2223
import static org.opensearch.sql.ast.dsl.AstDSL.map;
2324
import static org.opensearch.sql.ast.dsl.AstDSL.mvcombine;
2425
import static org.opensearch.sql.ast.dsl.AstDSL.project;
2526
import static org.opensearch.sql.ast.dsl.AstDSL.projectWithArg;
27+
import static org.opensearch.sql.ast.dsl.AstDSL.rareTopN;
2628
import static org.opensearch.sql.ast.dsl.AstDSL.relation;
2729
import static org.opensearch.sql.ast.dsl.AstDSL.rename;
2830
import static org.opensearch.sql.ast.dsl.AstDSL.stringLiteral;
@@ -31,6 +33,7 @@
3133
import java.util.LinkedHashMap;
3234
import java.util.List;
3335
import java.util.Map;
36+
import java.util.Optional;
3437
import java.util.Set;
3538
import java.util.stream.Stream;
3639
import org.apache.calcite.rel.RelNode;
@@ -45,11 +48,16 @@
4548
import org.mockito.Mock;
4649
import org.mockito.MockedStatic;
4750
import org.mockito.junit.jupiter.MockitoExtension;
51+
import org.opensearch.sql.ast.expression.Argument.ArgumentMap;
4852
import org.opensearch.sql.ast.expression.Field;
4953
import org.opensearch.sql.ast.expression.UnresolvedExpression;
5054
import org.opensearch.sql.ast.tree.AddTotals;
55+
import org.opensearch.sql.ast.tree.Join;
56+
import org.opensearch.sql.ast.tree.Lookup;
57+
import org.opensearch.sql.ast.tree.RareTopN;
5158
import org.opensearch.sql.ast.tree.Replace;
5259
import org.opensearch.sql.ast.tree.ReplacePair;
60+
import org.opensearch.sql.ast.tree.StreamWindow;
5361
import org.opensearch.sql.ast.tree.UnresolvedPlan;
5462
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
5563
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelBuilder;
@@ -139,6 +147,69 @@ void testMvcombine() {
139147
.shouldProject("doc.user.name");
140148
}
141149

150+
@Test
151+
void testRareTopN() {
152+
givenMapPaths("doc.user.name", "doc.user.city")
153+
.whenCommand(
154+
rareTopN(
155+
DUMMY_CHILD,
156+
RareTopN.CommandType.TOP,
157+
defaultTopArgs(),
158+
List.of(field("doc.user.city")),
159+
field("doc.user.name")))
160+
.shouldProject("doc.user.name", "doc.user.city");
161+
}
162+
163+
@Test
164+
void testStreamWindow() {
165+
givenMapPaths("doc.user.city")
166+
.whenCommand(
167+
new StreamWindow(
168+
List.of(), List.of(field("doc.user.city")), false, 2, true, false, null, null))
169+
.shouldProject("doc.user.city");
170+
}
171+
172+
@Test
173+
void testLookup() {
174+
givenMapPaths("doc.user.name")
175+
.whenCommand(
176+
new Lookup(
177+
null, Map.of("name", "doc.user.name"), Lookup.OutputStrategy.REPLACE, Map.of()))
178+
.shouldProject("doc.user.name");
179+
}
180+
181+
@Test
182+
void testJoinWithFieldList() {
183+
givenMapPaths("doc.user.name")
184+
.whenCommand(
185+
new Join(
186+
DUMMY_CHILD,
187+
Optional.empty(),
188+
Optional.empty(),
189+
Join.JoinType.INNER,
190+
Optional.empty(),
191+
new Join.JoinHint(),
192+
Optional.of(List.of(field("doc.user.name"))),
193+
new ArgumentMap(List.of())))
194+
.shouldProject("doc.user.name");
195+
}
196+
197+
@Test
198+
void testJoinWithoutFieldList() {
199+
givenMapPaths("doc.user.name")
200+
.whenCommand(
201+
new Join(
202+
DUMMY_CHILD,
203+
Optional.empty(),
204+
Optional.empty(),
205+
Join.JoinType.INNER,
206+
Optional.empty(),
207+
new Join.JoinHint(),
208+
Optional.empty(),
209+
new ArgumentMap(List.of())))
210+
.shouldNotProject();
211+
}
212+
142213
// ---- Multiple fields cases ----
143214

144215
@Test
@@ -166,7 +237,7 @@ void testMixedMapAndNonMapFields() {
166237
.shouldProject("doc.user.name");
167238
}
168239

169-
// ---- No-op cases ----
240+
// ---- No-op and edge cases ----
170241

171242
@Test
172243
void testNoOpForFilter() {
@@ -198,6 +269,30 @@ void testNoOpWhenFieldResolvesToNonItemAccess() {
198269
.shouldNotProject();
199270
}
200271

272+
@Test
273+
void testSkipsErrorField() {
274+
givenMapPaths()
275+
.givenErrorPaths("message.process.name", new AssertionError())
276+
.whenCommand(
277+
new Replace(
278+
List.of(new ReplacePair(stringLiteral("a"), stringLiteral("b"))),
279+
Set.of(field("message.process.name"))))
280+
.shouldNotProject();
281+
}
282+
283+
@Test
284+
void testSkipsErrorFieldButProjectsValidMapPath() {
285+
givenMapPaths("doc.user.name")
286+
.givenErrorPaths("bad.field", new AssertionError())
287+
.whenCommand(
288+
fillNull(
289+
DUMMY_CHILD,
290+
List.of(
291+
Pair.of(field("doc.user.name"), stringLiteral("N/A")),
292+
Pair.of(field("bad.field"), stringLiteral("0")))))
293+
.shouldProject("doc.user.name");
294+
}
295+
201296
// ---- Fluent test helper ----
202297

203298
private MapPathAssertion givenMapPaths(String... fieldNames) {
@@ -227,6 +322,11 @@ MapPathAssertion givenNonMapPaths(String... fieldNames) {
227322
return this;
228323
}
229324

325+
MapPathAssertion givenErrorPaths(String fieldName, Throwable error) {
326+
lenient().when(rexVisitor.analyze(fieldMatching(fieldName), eq(context))).thenThrow(error);
327+
return this;
328+
}
329+
230330
MapPathAssertion whenCommand(UnresolvedPlan command) {
231331
materializer.materializePaths(command, context);
232332
return this;

0 commit comments

Comments
 (0)