Skip to content

Commit 07629d4

Browse files
authored
Support profile option for PPL - Part II Implement operator level metrics (#5044)
Signed-off-by: Peng Huo <penghuo@gmail.com>
1 parent 66529d9 commit 07629d4

17 files changed

Lines changed: 693 additions & 27 deletions

File tree

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.profile;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.Objects;
11+
import java.util.stream.Collectors;
12+
import org.apache.calcite.adapter.enumerable.EnumerableRel;
13+
import org.apache.calcite.rel.RelNode;
14+
import org.opensearch.sql.calcite.plan.Scannable;
15+
import org.opensearch.sql.monitor.profile.ProfilePlanNode;
16+
17+
/** Builds a profiled EnumerableRel plan tree and matching plan node structure. */
18+
public final class PlanProfileBuilder {
19+
20+
private PlanProfileBuilder() {}
21+
22+
public static ProfilePlan profile(RelNode root) {
23+
Objects.requireNonNull(root, "root");
24+
return profileRel(root);
25+
}
26+
27+
private static ProfilePlan profileRel(RelNode rel) {
28+
List<ProfilePlan> childPlans = new ArrayList<>();
29+
for (RelNode input : rel.getInputs()) {
30+
childPlans.add(profileRel(input));
31+
}
32+
33+
List<RelNode> newInputs =
34+
childPlans.stream().map(ProfilePlan::rel).collect(Collectors.toList());
35+
List<ProfilePlanNode> childNodes =
36+
childPlans.stream().map(ProfilePlan::planRoot).collect(Collectors.toList());
37+
38+
ProfilePlanNode planNode = new ProfilePlanNode(nodeName(rel), childNodes);
39+
RelNode wrappedRel = wrap(rel, newInputs, planNode);
40+
return new ProfilePlan(wrappedRel, planNode);
41+
}
42+
43+
private static RelNode wrap(RelNode rel, List<RelNode> inputs, ProfilePlanNode planNode) {
44+
if (!(rel instanceof EnumerableRel)) {
45+
try {
46+
return rel.copy(rel.getTraitSet(), inputs);
47+
} catch (UnsupportedOperationException e) {
48+
return rel;
49+
}
50+
}
51+
if (rel instanceof Scannable) {
52+
return new ProfileScannableRel((EnumerableRel) rel, inputs, planNode);
53+
}
54+
return new ProfileEnumerableRel((EnumerableRel) rel, inputs, planNode);
55+
}
56+
57+
private static String nodeName(RelNode rel) {
58+
return rel.getRelTypeName();
59+
}
60+
61+
/** Pair of the profiled RelNode tree and its root plan node. */
62+
public static final class ProfilePlan {
63+
private final RelNode rel;
64+
private final ProfilePlanNode planRoot;
65+
66+
public ProfilePlan(RelNode rel, ProfilePlanNode planRoot) {
67+
this.rel = rel;
68+
this.planRoot = planRoot;
69+
}
70+
71+
public RelNode rel() {
72+
return rel;
73+
}
74+
75+
public ProfilePlanNode planRoot() {
76+
return planRoot;
77+
}
78+
}
79+
}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.profile;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.Objects;
11+
import org.apache.calcite.adapter.enumerable.EnumerableRel;
12+
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
13+
import org.apache.calcite.linq4j.AbstractEnumerable;
14+
import org.apache.calcite.linq4j.Enumerable;
15+
import org.apache.calcite.linq4j.Enumerator;
16+
import org.apache.calcite.linq4j.tree.BlockBuilder;
17+
import org.apache.calcite.linq4j.tree.BlockStatement;
18+
import org.apache.calcite.linq4j.tree.Expression;
19+
import org.apache.calcite.linq4j.tree.Expressions;
20+
import org.apache.calcite.linq4j.tree.GotoExpressionKind;
21+
import org.apache.calcite.linq4j.tree.GotoStatement;
22+
import org.apache.calcite.linq4j.tree.Statement;
23+
import org.apache.calcite.plan.RelTraitSet;
24+
import org.apache.calcite.rel.AbstractRelNode;
25+
import org.apache.calcite.rel.RelNode;
26+
import org.apache.calcite.rel.RelWriter;
27+
import org.apache.calcite.rel.type.RelDataType;
28+
import org.opensearch.sql.monitor.profile.ProfilePlanNode;
29+
import org.opensearch.sql.monitor.profile.ProfilePlanNodeMetrics;
30+
31+
/** EnumerableRel wrapper that records inclusive time and row counts. */
32+
public class ProfileEnumerableRel extends AbstractRelNode implements EnumerableRel {
33+
34+
private final EnumerableRel delegate;
35+
private final List<RelNode> inputs;
36+
protected final ProfilePlanNode planNode;
37+
38+
public ProfileEnumerableRel(
39+
EnumerableRel delegate, List<RelNode> inputs, ProfilePlanNode planNode) {
40+
super(
41+
Objects.requireNonNull(delegate, "delegate").getCluster(),
42+
Objects.requireNonNull(delegate, "delegate").getTraitSet());
43+
this.delegate = delegate;
44+
this.inputs = new ArrayList<>(Objects.requireNonNull(inputs, "inputs"));
45+
this.planNode = Objects.requireNonNull(planNode, "planNode");
46+
}
47+
48+
@Override
49+
public List<RelNode> getInputs() {
50+
return inputs;
51+
}
52+
53+
@Override
54+
public void replaceInput(int ordinalInParent, RelNode p) {
55+
inputs.set(ordinalInParent, p);
56+
}
57+
58+
@Override
59+
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
60+
return new ProfileEnumerableRel(delegate, inputs, planNode);
61+
}
62+
63+
@Override
64+
protected RelDataType deriveRowType() {
65+
return delegate.getRowType();
66+
}
67+
68+
@Override
69+
public String getRelTypeName() {
70+
return delegate.getRelTypeName();
71+
}
72+
73+
@Override
74+
public RelWriter explainTerms(RelWriter pw) {
75+
RelWriter writer = pw;
76+
for (RelNode input : inputs) {
77+
writer = writer.input("input", input);
78+
}
79+
return writer;
80+
}
81+
82+
@Override
83+
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
84+
EnumerableRel rewritten = (EnumerableRel) delegate.copy(delegate.getTraitSet(), inputs);
85+
Result result = rewritten.implement(implementor, pref);
86+
return new Result(
87+
wrapBlock(
88+
result.block, implementor.stash(planNode.metrics(), ProfilePlanNodeMetrics.class)),
89+
result.physType,
90+
result.format);
91+
}
92+
93+
/**
94+
* Rewrite the generated block by wrapping the returned Enumerable with profiling.
95+
*
96+
* <p>Example (simplified):
97+
*
98+
* <pre>
99+
* // Before:
100+
* {
101+
* final Enumerable<Object[]> input = ...;
102+
* return input;
103+
* }
104+
*
105+
* // After:
106+
* {
107+
* final Enumerable<Object[]> input = ...;
108+
* return ProfileEnumerableRel.profile(input, metrics);
109+
* }
110+
* </pre>
111+
*/
112+
private static BlockStatement wrapBlock(BlockStatement block, Expression metricsExpression) {
113+
List<Statement> statements = block.statements;
114+
if (statements.isEmpty()) {
115+
return block;
116+
}
117+
Statement last = statements.get(statements.size() - 1);
118+
// Expect Calcite to end blocks with a return GotoStatement; skip profiling if it doesn't.
119+
if (!(last instanceof GotoStatement)) {
120+
return block;
121+
}
122+
GotoStatement gotoStatement = (GotoStatement) last;
123+
// Only rewrite blocks that return an expression; otherwise keep the original block.
124+
if (gotoStatement.kind != GotoExpressionKind.Return || gotoStatement.expression == null) {
125+
return block;
126+
}
127+
Expression profiled =
128+
Expressions.call(
129+
ProfileEnumerableRel.class, "profile", gotoStatement.expression, metricsExpression);
130+
BlockBuilder builder = new BlockBuilder();
131+
for (int i = 0; i < statements.size() - 1; i++) {
132+
builder.add(statements.get(i));
133+
}
134+
builder.add(Expressions.return_(gotoStatement.labelTarget, profiled));
135+
return builder.toBlock();
136+
}
137+
138+
public static <T> Enumerable<T> profile(
139+
Enumerable<T> enumerable, ProfilePlanNodeMetrics metrics) {
140+
if (metrics == null) {
141+
return enumerable;
142+
}
143+
return new AbstractEnumerable<>() {
144+
@Override
145+
public Enumerator<T> enumerator() {
146+
long start = System.nanoTime();
147+
Enumerator<T> delegate = enumerable.enumerator();
148+
metrics.addTimeNanos(System.nanoTime() - start);
149+
return new ProfileEnumerator<>(delegate, metrics);
150+
}
151+
};
152+
}
153+
154+
private static final class ProfileEnumerator<T> implements Enumerator<T> {
155+
private final Enumerator<T> delegate;
156+
private final ProfilePlanNodeMetrics metrics;
157+
158+
private ProfileEnumerator(Enumerator<T> delegate, ProfilePlanNodeMetrics metrics) {
159+
this.delegate = delegate;
160+
this.metrics = metrics;
161+
}
162+
163+
@Override
164+
public T current() {
165+
return delegate.current();
166+
}
167+
168+
@Override
169+
public boolean moveNext() {
170+
long start = System.nanoTime();
171+
try {
172+
boolean hasNext = delegate.moveNext();
173+
if (hasNext) {
174+
metrics.incrementRows();
175+
}
176+
return hasNext;
177+
} finally {
178+
metrics.addTimeNanos(System.nanoTime() - start);
179+
}
180+
}
181+
182+
@Override
183+
public void reset() {
184+
delegate.reset();
185+
}
186+
187+
@Override
188+
public void close() {
189+
long start = System.nanoTime();
190+
try {
191+
delegate.close();
192+
} finally {
193+
metrics.addTimeNanos(System.nanoTime() - start);
194+
}
195+
}
196+
}
197+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.profile;
7+
8+
import java.util.List;
9+
import org.apache.calcite.adapter.enumerable.EnumerableRel;
10+
import org.apache.calcite.linq4j.Enumerable;
11+
import org.apache.calcite.rel.RelNode;
12+
import org.checkerframework.checker.nullness.qual.Nullable;
13+
import org.opensearch.sql.calcite.plan.Scannable;
14+
import org.opensearch.sql.monitor.profile.ProfilePlanNode;
15+
16+
/** EnumerableRel wrapper that also supports Scannable plans. */
17+
public final class ProfileScannableRel extends ProfileEnumerableRel implements Scannable {
18+
19+
private final Scannable scannable;
20+
21+
public ProfileScannableRel(
22+
EnumerableRel delegate, List<RelNode> inputs, ProfilePlanNode planNode) {
23+
super(delegate, inputs, planNode);
24+
this.scannable = (Scannable) delegate;
25+
}
26+
27+
@Override
28+
public Enumerable<@Nullable Object> scan() {
29+
return profile(scannable.scan(), planNode.metrics());
30+
}
31+
}

core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@
103103
import org.opensearch.sql.calcite.plan.Scannable;
104104
import org.opensearch.sql.calcite.plan.rule.OpenSearchRules;
105105
import org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule;
106+
import org.opensearch.sql.calcite.profile.PlanProfileBuilder;
106107
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
108+
import org.opensearch.sql.monitor.profile.ProfileContext;
107109
import org.opensearch.sql.monitor.profile.ProfileMetric;
108110
import org.opensearch.sql.monitor.profile.QueryProfiling;
109111

@@ -318,6 +320,12 @@ public OpenSearchCalcitePreparingStmt(
318320

319321
@Override
320322
protected PreparedResult implement(RelRoot root) {
323+
ProfileContext profileContext = QueryProfiling.current();
324+
if (profileContext.isEnabled()) {
325+
PlanProfileBuilder.ProfilePlan plan = PlanProfileBuilder.profile(root.rel);
326+
profileContext.setPlanRoot(plan.planRoot());
327+
root = root.withRel(plan.rel());
328+
}
321329
if (root.rel instanceof Scannable scannable) {
322330
Hook.PLAN_BEFORE_IMPLEMENTATION.run(root);
323331
RelDataType resultType = root.rel.getRowType();

core/src/main/java/org/opensearch/sql/monitor/profile/DefaultProfileContext.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,30 @@ public class DefaultProfileContext implements ProfileContext {
1616
private final long startNanos = System.nanoTime();
1717
private boolean finished;
1818
private final Map<MetricName, DefaultMetricImpl> metrics = new ConcurrentHashMap<>();
19+
private ProfilePlanNode planRoot;
1920
private QueryProfile profile;
2021

2122
public DefaultProfileContext() {}
2223

24+
@Override
25+
public boolean isEnabled() {
26+
return true;
27+
}
28+
2329
/** {@inheritDoc} */
2430
@Override
2531
public ProfileMetric getOrCreateMetric(MetricName name) {
2632
Objects.requireNonNull(name, "name");
2733
return metrics.computeIfAbsent(name, key -> new DefaultMetricImpl(key.name()));
2834
}
2935

36+
@Override
37+
public synchronized void setPlanRoot(ProfilePlanNode planRoot) {
38+
if (this.planRoot == null) {
39+
this.planRoot = planRoot;
40+
}
41+
}
42+
3043
/** {@inheritDoc} */
3144
@Override
3245
public synchronized QueryProfile finish() {
@@ -38,15 +51,12 @@ public synchronized QueryProfile finish() {
3851
Map<MetricName, Double> snapshot = new LinkedHashMap<>(MetricName.values().length);
3952
for (MetricName metricName : MetricName.values()) {
4053
DefaultMetricImpl metric = metrics.get(metricName);
41-
double millis = metric == null ? 0d : roundToMillis(metric.value());
54+
double millis = metric == null ? 0d : ProfileUtils.roundToMillis(metric.value());
4255
snapshot.put(metricName, millis);
4356
}
44-
double totalMillis = roundToMillis(endNanos - startNanos);
45-
profile = new QueryProfile(totalMillis, snapshot);
57+
double totalMillis = ProfileUtils.roundToMillis(endNanos - startNanos);
58+
QueryProfile.PlanNode planSnapshot = planRoot == null ? null : planRoot.snapshot();
59+
profile = new QueryProfile(totalMillis, snapshot, planSnapshot);
4660
return profile;
4761
}
48-
49-
private double roundToMillis(long nanos) {
50-
return Math.round((nanos / 1_000_000.0d) * 100.0d) / 100.0d;
51-
}
5262
}

0 commit comments

Comments
 (0)