Skip to content

Commit 582b4d2

Browse files
authored
[BugFix] Fix multiple appendpipe error while revisiting parent AST (#5322)
* Fix double appendpipe planner assertion error (#5173) visitAppendPipe re-visits parent AST in new planner context, causing assertion failure. Use relBuilder stack directly instead of AST re-visitation to stay within the same planner context. Signed-off-by: Songkan Tang <songkant@amazon.com> * Add regression tests for chained appendpipe (double and triple) Signed-off-by: Songkan Tang <songkant@amazon.com> * Minor change of assertion on rows Signed-off-by: Songkan Tang <songkant@amazon.com> * Fix chained appendpipe planner mismatch (#5173) Chained appendpipe queries can produce literal-only projections during prepare-time field trimming. Calcite may simplify those projections into Values, which can trigger planner mismatch assertions during execution. Preserve the Project shape for this narrow case in OpenSearchRelFieldTrimmer and add YAML REST regression coverage for double and triple appendpipe. Signed-off-by: Songkan Tang <songkant@amazon.com> * Use consistent prepareStatement to resolve the issue Signed-off-by: Songkan Tang <songkant@amazon.com> * Preserve agg hint after reusing cluster during prepare and optimize Signed-off-by: Songkan Tang <songkant@amazon.com> --------- Signed-off-by: Songkan Tang <songkant@amazon.com>
1 parent 8c40c39 commit 582b4d2

4 files changed

Lines changed: 381 additions & 3 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.calcite.plan.RelOptSchema;
6464
import org.apache.calcite.plan.RelOptTable;
6565
import org.apache.calcite.plan.RelOptTable.ViewExpander;
66+
import org.apache.calcite.plan.RelOptUtil;
6667
import org.apache.calcite.plan.hep.HepPlanner;
6768
import org.apache.calcite.plan.hep.HepProgram;
6869
import org.apache.calcite.plan.hep.HepProgramBuilder;
@@ -74,6 +75,7 @@
7475
import org.apache.calcite.rel.RelRoot;
7576
import org.apache.calcite.rel.RelShuttle;
7677
import org.apache.calcite.rel.core.TableScan;
78+
import org.apache.calcite.rel.hint.HintStrategyTable;
7779
import org.apache.calcite.rel.logical.LogicalTableScan;
7880
import org.apache.calcite.rel.rules.FilterMergeRule;
7981
import org.apache.calcite.rel.type.RelDataType;
@@ -367,6 +369,36 @@ protected SqlToRelConverter getSqlToRelConverter(
367369
return new OpenSearchSqlToRelConverter(
368370
this, validator, catalogReader, this.cluster, convertletTable, config);
369371
}
372+
373+
// NOTE(review): diff-view capture — the bare "NNN+" lines below are diff line markers, not code.
/**
 * Prepare-time trimming of unused fields.
 *
 * <p>Unlike Calcite's default, the trimming converter is constructed on the incoming RelNode's
 * own cluster (root.rel.getCluster()) so replacement nodes created during trimming stay under
 * the same planner as the rest of the tree (fix for chained appendpipe planner mismatch, #5173).
 *
 * @param root relational root produced by analysis
 * @return root with the (possibly trimmed) rel substituted back in
 */
@Override
374+
protected RelRoot trimUnusedFields(RelRoot root) {
375+
// Build a converter config mirroring Calcite's Prepare defaults (trim/expand/sub-query
// threshold come from the thread-local prepare settings).
final SqlToRelConverter.Config config =
376+
SqlToRelConverter.config()
377+
.withTrimUnusedFields(shouldTrim(root.rel))
378+
.withExpand(THREAD_EXPAND.get())
379+
.withInSubQueryThreshold(requireNonNull(THREAD_INSUBQUERY_THRESHOLD.get()));
380+
// PPL analyzes into a pre-built RelNode before prepareStatement(rel). Reuse the incoming
381+
// RelNode's cluster here so prepare-time trimming does not create replacement nodes under a
382+
// different planner than the rest of the tree.
383+
final SqlToRelConverter converter =
384+
new OpenSearchSqlToRelConverter(
385+
this,
386+
getSqlValidator(),
387+
catalogReader,
388+
root.rel.getCluster(),
389+
convertletTable,
390+
config);
391+
// Ordered or DML roots must keep their full field list; trimUnusedFields honors that flag.
final boolean ordered = !root.collation.getFieldCollations().isEmpty();
392+
final boolean dml = SqlKind.DML.contains(root.kind);
393+
return root.withRel(converter.trimUnusedFields(dml || ordered, root.rel));
394+
}
395+
396+
// Decide whether prepare-time field trimming should run for this tree.
// Trim when the thread-local trim flag is set, or when the tree has fewer than 2 joins:
private static boolean shouldTrim(RelNode rootRel) {
397+
// Don't trim trees with 2 or more joins. The projects near the leaves created by trim
398+
// migrate past joins and seem to prevent join-reordering.
399+
// (Mirrors Calcite's Prepare#shouldTrim; upstream's "more than 3 joins" wording is stale.)
400+
return THREAD_TRIM.get() || RelOptUtil.countJoins(rootRel) < 2;
401+
}
370402
}
371403

372404
public static class OpenSearchSqlToRelConverter extends SqlToRelConverter {
@@ -379,22 +411,51 @@ public OpenSearchSqlToRelConverter(
379411
RelOptCluster cluster,
380412
SqlRexConvertletTable convertletTable,
381413
Config config) {
382-
super(viewExpander, validator, catalogReader, cluster, convertletTable, config);
414+
this(
415+
viewExpander,
416+
validator,
417+
catalogReader,
418+
cluster,
419+
convertletTable,
420+
preserveHintStrategies(cluster, config),
421+
true);
422+
}
423+
424+
// Private delegate constructor: receives the *effective* config (hint strategies already
// reconciled with the cluster by preserveHintStrategies) and builds the RelBuilder from it.
// The trailing boolean only disambiguates this overload from the public constructor.
// NOTE(review): diff-view capture — bare number lines and the two "config" lines marked
// removed in the diff are rendering artifacts of the hunk, not live code.
private OpenSearchSqlToRelConverter(
425+
ViewExpander viewExpander,
426+
@Nullable SqlValidator validator,
427+
CatalogReader catalogReader,
428+
RelOptCluster cluster,
429+
SqlRexConvertletTable convertletTable,
430+
Config effectiveConfig,
431+
boolean ignored) {
432+
super(viewExpander, validator, catalogReader, cluster, convertletTable, effectiveConfig);
383433
// Recreate the RelBuilder exactly as the superclass would, but from the effective config
// so the builder sees the same hint strategies and transforms as the converter itself.
this.relBuilder =
384-
config
434+
effectiveConfig
385435
.getRelBuilderFactory()
386436
.create(
387437
cluster,
388438
validator != null
389439
? validator.getCatalogReader().unwrap(RelOptSchema.class)
390440
: null)
391-
.transform(config.getRelBuilderConfigTransform());
441+
.transform(effectiveConfig.getRelBuilderConfigTransform());
392442
}
393443

394444
// Use the OpenSearch-specific trimmer (which, per the commit message, preserves literal-only
// Project shapes so Calcite cannot simplify them into Values during prepare-time trimming).
@Override
395445
protected RelFieldTrimmer newFieldTrimmer() {
396446
return new OpenSearchRelFieldTrimmer(validator, this.relBuilder);
397447
}
448+
449+
// SqlToRelConverter always installs the hint strategy table from its config onto the cluster.
450+
// When prepare-time trimming reuses an incoming RelNode cluster, preserve any PPL-specific
451+
// aggregate hint strategies that were already registered during analysis.
452+
// Returns the config unchanged unless it carries an EMPTY table while the cluster has a
// non-EMPTY one (i.e. only fills a gap; never overwrites an explicitly configured table).
private static Config preserveHintStrategies(RelOptCluster cluster, Config config) {
453+
if (config.getHintStrategyTable() == HintStrategyTable.EMPTY
454+
&& cluster.getHintStrategies() != HintStrategyTable.EMPTY) {
455+
return config.withHintStrategyTable(cluster.getHintStrategies());
456+
}
457+
return config;
458+
}
398459
}
399460

400461
public static class OpenSearchRelRunners {

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,74 @@ public void testAppendpipeWithConflictTypeColumn() throws IOException {
8787
TEST_INDEX_ACCOUNT)));
8888
assertTrue(exception.getMessage().contains("due to incompatible types"));
8989
}
90+
91+
/**
 * Regression test: double appendpipe with different aggregations (issue #5173).
 *
 * <p>Each appendpipe unions its subpipeline's result onto the main stream; columns not
 * produced by a given branch are null-padded, so the schema widens with each appendpipe.
 */
92+
@Test
93+
public void testDoubleAppendPipe() throws IOException {
94+
JSONObject actual =
95+
executeQuery(
96+
String.format(
97+
Locale.ROOT,
98+
"source=%s | stats sum(age) as sum_age by gender"
99+
+ " | appendpipe [ stats avg(sum_age) as avg_sum_age ]"
100+
+ " | appendpipe [ stats max(sum_age) as max_sum_age ]",
101+
TEST_INDEX_ACCOUNT));
102+
verifySchemaInOrder(
103+
actual,
104+
schema("sum_age", "bigint"),
105+
schema("gender", "string"),
106+
schema("avg_sum_age", "double"),
107+
schema("max_sum_age", "bigint"));
108+
// 2 original rows + 1 avg row + 1 max row
109+
verifyDataRows(
110+
actual,
111+
rows(14947, "F", null, null),
112+
rows(15224, "M", null, null),
113+
rows(null, null, 15085.5, null),
114+
rows(null, null, null, 15224));
115+
}
116+
117+
/**
 * Regression test: triple appendpipe with different aggregations (issue #5173).
 *
 * <p>Extends the double-appendpipe case by one more chained subpipeline; verifies the schema
 * keeps widening and each appended aggregate lands in its own null-padded row.
 */
118+
@Test
119+
public void testTripleAppendPipe() throws IOException {
120+
JSONObject actual =
121+
executeQuery(
122+
String.format(
123+
Locale.ROOT,
124+
"source=%s | stats sum(age) as sum_age by gender"
125+
+ " | appendpipe [ stats avg(sum_age) as avg_sum_age ]"
126+
+ " | appendpipe [ stats max(sum_age) as max_sum_age ]"
127+
+ " | appendpipe [ stats min(sum_age) as min_sum_age ]",
128+
TEST_INDEX_ACCOUNT));
129+
verifySchemaInOrder(
130+
actual,
131+
schema("sum_age", "bigint"),
132+
schema("gender", "string"),
133+
schema("avg_sum_age", "double"),
134+
schema("max_sum_age", "bigint"),
135+
schema("min_sum_age", "bigint"));
136+
// 2 original rows + 1 avg + 1 max + 1 min
137+
verifyDataRows(
138+
actual,
139+
rows(14947, "F", null, null, null),
140+
rows(15224, "M", null, null, null),
141+
rows(null, null, 15085.5, null, null),
142+
rows(null, null, null, 15224, null),
143+
rows(null, null, null, null, 14947));
144+
}
145+
146+
/**
 * Regression test: double appendpipe with non-aggregation (filter) subpipeline.
 *
 * <p>Note the second appendpipe filters the *cumulative* stream (original rows plus the rows
 * appended by the first appendpipe), which is why four rows are expected.
 */
147+
@Test
148+
public void testDoubleAppendPipeWithFilter() throws IOException {
149+
JSONObject actual =
150+
executeQuery(
151+
String.format(
152+
Locale.ROOT,
153+
"source=%s | stats sum(age) as sum_age by gender"
154+
+ " | appendpipe [ where gender = 'F' ]"
155+
+ " | appendpipe [ where gender = 'M' ]",
156+
TEST_INDEX_ACCOUNT));
157+
// 2 original + 1 (F filter from original) + 1 (M filter from cumulative 3 rows)
158+
verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(14947, "F"), rows(15224, "M"));
159+
}
90160
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# Fixture for the issue #5173 appendpipe regressions: enable the Calcite engine and index
# four documents (two per gender) so sum(age) by gender yields F=30, M=70.
# NOTE(review): diff-view capture — bare "N+" lines are diff markers; YAML indentation was
# stripped by the page scrape, so nesting must be confirmed against the repository file.
setup:
2+
- do:
3+
query.settings:
4+
body:
5+
transient:
6+
plugins.calcite.enabled: true
7+
8+
- do:
9+
indices.create:
10+
index: issue5173
11+
body:
12+
settings:
13+
number_of_shards: 1
14+
number_of_replicas: 0
15+
mappings:
16+
properties:
17+
gender:
18+
type: keyword
19+
age:
20+
type: integer
21+
22+
- do:
23+
bulk:
24+
refresh: true
25+
body:
26+
- '{"index": {"_index": "issue5173", "_id": "1"}}'
27+
- '{"gender": "F", "age": 10}'
28+
- '{"index": {"_index": "issue5173", "_id": "2"}}'
29+
- '{"gender": "F", "age": 20}'
30+
- '{"index": {"_index": "issue5173", "_id": "3"}}'
31+
- '{"gender": "M", "age": 30}'
32+
- '{"index": {"_index": "issue5173", "_id": "4"}}'
33+
- '{"gender": "M", "age": 40}'
34+
35+
---
36+
# Clean up: drop the fixture index and restore the Calcite engine setting to its default.
teardown:
37+
- do:
38+
indices.delete:
39+
index: issue5173
40+
ignore_unavailable: true
41+
- do:
42+
query.settings:
43+
body:
44+
transient:
45+
plugins.calcite.enabled: false
46+
47+
---
48+
"Issue 5173: double appendpipe with different aggregations should succeed":
49+
- skip:
50+
features:
51+
- headers
52+
- do:
53+
headers:
54+
Content-Type: 'application/json'
55+
ppl:
56+
body:
57+
query: "source=issue5173 | stats sum(age) as sum_age by gender | appendpipe [ stats avg(sum_age) as avg_sum_age ] | appendpipe [ stats max(sum_age) as max_sum_age ]"
58+
59+
- match: { total: 4 }
60+
- match:
61+
schema:
62+
- { name: sum_age, type: bigint }
63+
- { name: gender, type: string }
64+
- { name: avg_sum_age, type: double }
65+
- { name: max_sum_age, type: bigint }
66+
- match:
67+
datarows:
68+
- [ 30, "F", null, null ]
69+
- [ 70, "M", null, null ]
70+
- [ null, null, 50.0, null ]
71+
- [ null, null, null, 70 ]
72+
73+
---
74+
"Issue 5173: triple appendpipe with different aggregations should succeed":
75+
- skip:
76+
features:
77+
- headers
78+
- do:
79+
headers:
80+
Content-Type: 'application/json'
81+
ppl:
82+
body:
83+
query: "source=issue5173 | stats sum(age) as sum_age by gender | appendpipe [ stats avg(sum_age) as avg_sum_age ] | appendpipe [ stats max(sum_age) as max_sum_age ] | appendpipe [ stats min(sum_age) as min_sum_age ]"
84+
85+
- match: { total: 5 }
86+
- match:
87+
schema:
88+
- { name: sum_age, type: bigint }
89+
- { name: gender, type: string }
90+
- { name: avg_sum_age, type: double }
91+
- { name: max_sum_age, type: bigint }
92+
- { name: min_sum_age, type: bigint }
93+
- match:
94+
datarows:
95+
- [ 30, "F", null, null, null ]
96+
- [ 70, "M", null, null, null ]
97+
- [ null, null, 50.0, null, null ]
98+
- [ null, null, null, 70, null ]
99+
- [ null, null, null, null, 30 ]

0 commit comments

Comments
 (0)