Skip to content

Commit e30b2f8

Browse files
[Feature] Support bi-directional graph traversal command graphlookup (#5138)
* Support bi-directional graph traversal command `graphlookup` (#5113) * succeed to graph lookup single index Signed-off-by: Lantao Jin <ltjin@amazon.com> * Implement graph lookup RelNode Signed-off-by: Heng Qian <qianheng@amazon.com> * Refine - remove depth from BFS node Signed-off-by: Heng Qian <qianheng@amazon.com> * Support bidirectional mode Signed-off-by: Heng Qian <qianheng@amazon.com> * Support anonymize graph lookup Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix UT Signed-off-by: Heng Qian <qianheng@amazon.com> * Add IT Signed-off-by: Heng Qian <qianheng@amazon.com> * Add limitation for GraphLookup Signed-off-by: Heng Qian <qianheng@amazon.com> * Simplify GraphLookup param names Signed-off-by: Heng Qian <qianheng@amazon.com> * Refine IT Signed-off-by: Heng Qian <qianheng@amazon.com> * Support value of list; Support retrieve circle edges also Signed-off-by: Heng Qian <qianheng@amazon.com> * Add documentation for graph lookup Signed-off-by: Heng Qian <qianheng@amazon.com> * Don't include loop edges Signed-off-by: Heng Qian <qianheng@amazon.com> * Refine code Signed-off-by: Heng Qian <qianheng@amazon.com> * spotlessApply Signed-off-by: Heng Qian <qianheng@amazon.com> * Refine code Signed-off-by: Heng Qian <qianheng@amazon.com> * Filter visited nodes in search query Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix UT Signed-off-by: Heng Qian <qianheng@amazon.com> * Add parameter supportArray for handling fields with array values Signed-off-by: Heng Qian <qianheng@amazon.com> * Remove unused code Signed-off-by: Heng Qian <qianheng@amazon.com> * Support batch mode Signed-off-by: Heng Qian <qianheng@amazon.com> * Close lookup table scan Signed-off-by: Heng Qian <qianheng@amazon.com> * refine code Signed-off-by: Heng Qian <qianheng@amazon.com> * Add param usePIT Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com> Signed-off-by: Heng Qian <qianheng@amazon.com> Co-authored-by: Lantao Jin <ltjin@amazon.com> 
* Struct return array value instead of string Signed-off-by: Heng Qian <qianheng@amazon.com> * Support filter in GraphLookup Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix IT Signed-off-by: Heng Qian <qianheng@amazon.com> * Add experimental tag in doc Signed-off-by: Heng Qian <qianheng@amazon.com> * Support filter params in graphlookup (#5134) * Struct return array value instead of string Signed-off-by: Heng Qian <qianheng@amazon.com> * Support filter in GraphLookup Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix IT Signed-off-by: Heng Qian <qianheng@amazon.com> * Add experimental tag in doc Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com> Signed-off-by: Heng Qian <qianheng@amazon.com> Co-authored-by: qianheng <qianheng@amazon.com>
1 parent 26912a3 commit e30b2f8

32 files changed

Lines changed: 2914 additions & 10 deletions

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.opensearch.sql.ast.tree.FillNull;
7474
import org.opensearch.sql.ast.tree.Filter;
7575
import org.opensearch.sql.ast.tree.Flatten;
76+
import org.opensearch.sql.ast.tree.GraphLookup;
7677
import org.opensearch.sql.ast.tree.Head;
7778
import org.opensearch.sql.ast.tree.Join;
7879
import org.opensearch.sql.ast.tree.Kmeans;
@@ -558,6 +559,11 @@ public LogicalPlan visitMvExpand(MvExpand node, AnalysisContext context) {
558559
throw getOnlyForCalciteException("mvexpand");
559560
}
560561

562+
/**
* Handles the graphlookup command in this analyzer by always throwing the exception produced by
* {@code getOnlyForCalciteException("graphlookup")}; no logical plan is built here.
*/
@Override
563+
public LogicalPlan visitGraphLookup(GraphLookup node, AnalysisContext context) {
564+
throw getOnlyForCalciteException("graphlookup");
565+
}
566+
561567
/** Build {@link ParseExpression} to context and skip to child nodes. */
562568
@Override
563569
public LogicalPlan visitParse(Parse node, AnalysisContext context) {

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.opensearch.sql.ast.tree.FillNull;
6262
import org.opensearch.sql.ast.tree.Filter;
6363
import org.opensearch.sql.ast.tree.Flatten;
64+
import org.opensearch.sql.ast.tree.GraphLookup;
6465
import org.opensearch.sql.ast.tree.Head;
6566
import org.opensearch.sql.ast.tree.Join;
6667
import org.opensearch.sql.ast.tree.Kmeans;
@@ -485,4 +486,8 @@ public T visitNoMv(NoMv node, C context) {
485486
public T visitMvExpand(MvExpand node, C context) {
486487
return visitChildren(node, context);
487488
}
489+
490+
/** Default visit for {@link GraphLookup}: delegates to {@code visitChildren(node, context)}. */
public T visitGraphLookup(GraphLookup node, C context) {
491+
return visitChildren(node, context);
492+
}
488493
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import javax.annotation.Nullable;
11+
import lombok.AllArgsConstructor;
12+
import lombok.Builder;
13+
import lombok.EqualsAndHashCode;
14+
import lombok.Getter;
15+
import lombok.RequiredArgsConstructor;
16+
import lombok.Setter;
17+
import lombok.ToString;
18+
import org.opensearch.sql.ast.AbstractNodeVisitor;
19+
import org.opensearch.sql.ast.expression.Field;
20+
import org.opensearch.sql.ast.expression.Literal;
21+
import org.opensearch.sql.ast.expression.UnresolvedExpression;
22+
23+
/**
24+
* AST node for graphLookup command. Performs BFS graph traversal on a lookup table.
25+
*
26+
* <p>Example: source=employees | graphLookup employees fromField=manager toField=name maxDepth=3
27+
* depthField=level direction=uni as hierarchy
28+
*/
29+
@Getter
30+
@Setter
31+
@ToString
32+
@EqualsAndHashCode(callSuper = false)
33+
@RequiredArgsConstructor
34+
@AllArgsConstructor
35+
@Builder(toBuilder = true)
36+
public class GraphLookup extends UnresolvedPlan {
37+
/** Direction mode for graph traversal. */
38+
public enum Direction {
39+
/** Unidirectional - traverse edges in one direction only. */
40+
UNI,
41+
/** Bidirectional - traverse edges in both directions. */
42+
BI
43+
}
44+
45+
/** Target table for graph traversal lookup. */
46+
private final UnresolvedPlan fromTable;
47+
48+
/** Field in sourceTable to start with. */
49+
private final Field startField;
50+
51+
/** Field in fromTable that represents the outgoing edge. */
52+
private final Field fromField;
53+
54+
/** Field in input/fromTable to match against for traversal. */
55+
private final Field toField;
56+
57+
/** Output field name for collected traversal results. */
58+
private final Field as;
59+
60+
/** Maximum traversal depth. Zero means no limit. */
61+
private final Literal maxDepth;
62+
63+
/** Optional field name to include recursion depth in output. */
64+
private @Nullable final Field depthField;
65+
66+
/**
* Direction mode: {@link Direction#UNI} or {@link Direction#BI} (bidirectional).
*
* <p>NOTE(review): the original note calls UNI the default, but no default is assigned in this
* class - confirm where it is applied.
*/
67+
private final Direction direction;
68+
69+
/** Whether to support array-typed fields without early filter pushdown. */
70+
private final boolean supportArray;
71+
72+
/** Whether to batch all source start values into a single unified BFS traversal. */
73+
private final boolean batchMode;
74+
75+
/** Whether to use PIT (Point In Time) search for the lookup table to get complete results. */
76+
private final boolean usePIT;
77+
78+
/**
79+
* Optional filter condition to restrict which lookup table documents participate in traversal.
80+
*/
81+
private @Nullable final UnresolvedExpression filter;
82+
83+
/** Child plan set through {@link #attach(UnresolvedPlan)}; the only non-final field. */
private UnresolvedPlan child;
84+
85+
/**
* Returns the depth field's name as a string.
*
* @return {@code depthField.getField().toString()}, or {@code null} when no depth field is set
*/
public String getDepthFieldName() {
86+
return depthField == null ? null : depthField.getField().toString();
87+
}
88+
89+
/**
* Stores the given plan as this node's child.
*
* @param child plan to attach
* @return this node
*/
@Override
90+
public UnresolvedPlan attach(UnresolvedPlan child) {
91+
this.child = child;
92+
return this;
93+
}
94+
95+
/**
* Returns the attached child as a list.
*
* @return an empty list when no child is attached, otherwise a single-element list
*/
@Override
96+
public List<UnresolvedPlan> getChild() {
97+
return child == null ? ImmutableList.of() : ImmutableList.of(child);
98+
}
99+
100+
/** Dispatches to {@code visitor.visitGraphLookup(this, context)} and returns its result. */
@Override
101+
public <T, C> T accept(AbstractNodeVisitor<T, C> visitor, C context) {
102+
return visitor.visitGraphLookup(this, context);
103+
}
104+
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@
118118
import org.opensearch.sql.ast.tree.FillNull;
119119
import org.opensearch.sql.ast.tree.Filter;
120120
import org.opensearch.sql.ast.tree.Flatten;
121+
import org.opensearch.sql.ast.tree.GraphLookup;
122+
import org.opensearch.sql.ast.tree.GraphLookup.Direction;
121123
import org.opensearch.sql.ast.tree.Head;
122124
import org.opensearch.sql.ast.tree.Join;
123125
import org.opensearch.sql.ast.tree.Kmeans;
@@ -153,6 +155,7 @@
153155
import org.opensearch.sql.ast.tree.Window;
154156
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
155157
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
158+
import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup;
156159
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
157160
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
158161
import org.opensearch.sql.calcite.utils.BinUtils;
@@ -2539,6 +2542,67 @@ public RelNode visitAddColTotals(AddColTotals node, CalcitePlanContext context)
25392542
context, fieldsToAggregate, false, true, null, labelField, label);
25402543
}
25412544

2545+
/**
* Builds the relational plan for the graphlookup command.
*
* <p>Visits the child plan as the source, caps it with {@code builder.limit(0, 100)}, then analyzes
* {@code node.getFromTable()} as the lookup table and combines both with the command's parameters
* into a {@link LogicalGraphLookup} that is pushed onto the builder.
*
* @param node graphlookup AST node supplying the field names, flags, max depth and optional filter
* @param context plan context whose {@code relBuilder} holds the plans being assembled
* @return the node on top of the builder after the graph lookup is pushed ({@code builder.peek()})
*/
@Override
2546+
public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) {
2547+
// 1. Visit source (child) table
2548+
visitChildren(node, context);
2549+
RelBuilder builder = context.relBuilder;
2550+
// TODO: Limit the number of source rows to 100 for now, make it configurable.
2551+
builder.limit(0, 100);
2552+
// Meta fields are stripped from the source only in batch mode.
if (node.isBatchMode()) {
2553+
tryToRemoveMetaFields(context, true);
2554+
}
2555+
RelNode sourceTable = builder.build();
2556+
2557+
// 2. Extract parameters
2558+
String startFieldName = node.getStartField().getField().toString();
2559+
String fromFieldName = node.getFromField().getField().toString();
2560+
String toFieldName = node.getToField().getField().toString();
2561+
String outputFieldName = node.getAs().getField().toString();
2562+
// May be null: getDepthFieldName() returns null when no depth field was given.
String depthFieldName = node.getDepthFieldName();
2563+
boolean bidirectional = node.getDirection() == Direction.BI;
2564+
2565+
// NOTE(review): the cast assumes maxDepth always analyzes to a RexLiteral - confirm that the
// parser only accepts literals for this parameter.
RexLiteral maxDepthNode = (RexLiteral) rexVisitor.analyze(node.getMaxDepth(), context);
2566+
Integer maxDepthValue = maxDepthNode.getValueAs(Integer.class);
2567+
// A null max depth falls back to 0.
maxDepthValue = maxDepthValue == null ? 0 : maxDepthValue;
2568+
boolean supportArray = node.isSupportArray();
2569+
boolean batchMode = node.isBatchMode();
2570+
boolean usePIT = node.isUsePIT();
2571+
2572+
// 3. Visit and materialize lookup table
2573+
analyze(node.getFromTable(), context);
2574+
tryToRemoveMetaFields(context, true);
2575+
2576+
// 4. Convert filter expression to RexNode against lookup table schema
2577+
// The filter is analyzed before builder.build() below; presumably the lookup table is still on
// top of the builder at this point so field references resolve against it - TODO confirm.
RexNode filterRex = null;
2578+
if (node.getFilter() != null) {
2579+
filterRex = rexVisitor.analyze(node.getFilter(), context);
2580+
}
2581+
2582+
RelNode lookupTable = builder.build();
2583+
2584+
// 5. Create LogicalGraphLookup RelNode
2585+
// The conversion rule will extract the OpenSearchIndex from the lookup table
2586+
RelNode graphLookup =
2587+
LogicalGraphLookup.create(
2588+
sourceTable,
2589+
lookupTable,
2590+
startFieldName,
2591+
fromFieldName,
2592+
toFieldName,
2593+
outputFieldName,
2594+
depthFieldName,
2595+
maxDepthValue,
2596+
bidirectional,
2597+
supportArray,
2598+
batchMode,
2599+
usePIT,
2600+
filterRex);
2601+
2602+
builder.push(graphLookup);
2603+
return builder.peek();
2604+
}
2605+
25422606
/**
25432607
* Cast integer sum to long, real/float to double to avoid ClassCastException
25442608
*

0 commit comments

Comments
 (0)