Skip to content

Commit b683034

Browse files
committed
multiple join
1 parent df2d178 commit b683034

7 files changed

Lines changed: 357 additions & 22 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public class Main {
101101

102102
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
103103

104-
private static Config config = org.apache.calcite.sql.parser.SqlParser
104+
public static Config config = org.apache.calcite.sql.parser.SqlParser
105105
.configBuilder()
106106
.setLex(Lex.MYSQL)
107107
.build();

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2626

2727
import java.io.Serializable;
28+
import java.util.Map;
2829

2930
/**
3031
* Join信息
@@ -41,6 +42,8 @@ public class JoinInfo implements Serializable {
4142
//左表是否是维表
4243
private boolean leftIsSideTable;
4344

45+
private boolean leftIsMidTable;
46+
4447
//右表是否是维表
4548
private boolean rightIsSideTable;
4649

@@ -63,6 +66,8 @@ public class JoinInfo implements Serializable {
6366
private SqlNode selectNode;
6467

6568
private JoinType joinType;
69+
// 左边是中间转换表,做表映射关系,给替换属性名称使用
70+
private Map<String, String> leftTabMapping;
6671

6772
public String getSideTableName(){
6873
if(leftIsSideTable){
@@ -87,6 +92,22 @@ public String getNewTableName(){
8792
return leftStr + "_" + rightTableName;
8893
}
8994

95+
public boolean isLeftIsMidTable() {
96+
return leftIsMidTable;
97+
}
98+
99+
public void setLeftIsMidTable(boolean leftIsMidTable) {
100+
this.leftIsMidTable = leftIsMidTable;
101+
}
102+
103+
public Map<String, String> getLeftTabMapping() {
104+
return leftTabMapping;
105+
}
106+
107+
public void setLeftTabMapping(Map<String, String> leftTabMapping) {
108+
this.leftTabMapping = leftTabMapping;
109+
}
110+
90111
public String getNewTableAlias(){
91112
return leftTableAlias + "_" + rightTableAlias;
92113
}

core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,12 @@
4141

4242
public class ParserJoinField {
4343

44+
4445
/**
45-
* Need to parse the fields of information and where selectlist
46+
* build row by field
47+
* @param sqlNode select node
48+
* @param scope join left and right table all info
49+
* @param getAll true,get all fields from two tables; false, extract useful field from select node
4650
* @return
4751
*/
4852
public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, boolean getAll){

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 156 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23+
import com.dtstack.flink.sql.Main;
24+
import com.dtstack.flink.sql.util.ParseUtils;
2325
import org.apache.calcite.config.Lex;
2426
import org.apache.calcite.sql.JoinType;
2527
import org.apache.calcite.sql.SqlAsOperator;
@@ -38,9 +40,16 @@
3840
import org.apache.calcite.sql.parser.SqlParseException;
3941
import org.apache.calcite.sql.parser.SqlParser;
4042
import org.apache.calcite.sql.parser.SqlParserPos;
43+
import org.apache.commons.collections.CollectionUtils;
4144
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
45+
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
46+
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
4247
import org.apache.flink.calcite.shaded.com.google.common.collect.Queues;
48+
import org.slf4j.Logger;
49+
import org.slf4j.LoggerFactory;
4350

51+
import java.util.List;
52+
import java.util.Map;
4453
import java.util.Queue;
4554
import java.util.Set;
4655

@@ -54,6 +63,11 @@
5463
*/
5564

5665
public class SideSQLParser {
66+
private static final Logger LOG = LoggerFactory.getLogger(SideSQLParser.class);
67+
68+
private final char SPLIT = '_';
69+
70+
private String tempSQL = "SELECT * FROM TMP";
5771

5872
public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws SqlParseException {
5973
System.out.println("---exeSql---");
@@ -63,7 +77,7 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws
6377
.configBuilder()
6478
.setLex(Lex.MYSQL)
6579
.build();
66-
SqlParser sqlParser = SqlParser.create(exeSql,config);
80+
SqlParser sqlParser = SqlParser.create(exeSql,Main.config);
6781
SqlNode sqlNode = sqlParser.parseStmt();
6882
parseSql(sqlNode, sideTableSet, queueInfo);
6983
queueInfo.offer(sqlNode);
@@ -143,18 +157,37 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
143157
return "";
144158
}
145159

146-
private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<Object> queueInfo){
160+
private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<Object> queueInfo) {
147161
SqlNode leftNode = joinNode.getLeft();
148162
SqlNode rightNode = joinNode.getRight();
149163
JoinType joinType = joinNode.getJoinType();
150164
String leftTbName = "";
151165
String leftTbAlias = "";
166+
Map<String, String> midTableMapping = null ;
167+
boolean leftIsMidTable = false;
168+
152169

153170
if(leftNode.getKind() == IDENTIFIER){
154171
leftTbName = leftNode.toString();
155172
}else if(leftNode.getKind() == JOIN){
156-
Object leftNodeJoinInfo = parseSql(leftNode, sideTableSet, queueInfo);
157-
System.out.println(leftNodeJoinInfo);
173+
JoinInfo leftNodeJoinInfo = (JoinInfo)parseSql(leftNode, sideTableSet, queueInfo);//解析多JOIN
174+
// select * from xxx
175+
SqlNode sqlNode = buildSelectByLeftNode(leftNode);
176+
// ( select * from xxx) as xxx_0
177+
SqlBasicCall newAsNode = buildAsNodeByJoinInfo(leftNodeJoinInfo, sqlNode);
178+
179+
leftNode = newAsNode;
180+
181+
joinNode.setLeft(leftNode);
182+
183+
leftIsMidTable = true;
184+
185+
midTableMapping = saveTabMapping(leftNodeJoinInfo);
186+
187+
AliasInfo aliasInfo = (AliasInfo) parseSql(newAsNode, sideTableSet, queueInfo);
188+
leftTbName = aliasInfo.getName();
189+
leftTbAlias = aliasInfo.getAlias();
190+
158191
}else if(leftNode.getKind() == AS){
159192
AliasInfo aliasInfo = (AliasInfo) parseSql(leftNode, sideTableSet, queueInfo);
160193
leftTbName = aliasInfo.getName();
@@ -184,6 +217,12 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
184217
throw new RuntimeException("side join not support join type of right[current support inner join and left join]");
185218
}
186219

220+
if (leftIsMidTable) {
221+
// 替换右边 on语句 中的字段别名
222+
SqlNode afterReplaceNameCondition = ParseUtils.replaceJoinConditionTabName(joinNode.getCondition(), midTableMapping);
223+
joinNode.setOperand(5, afterReplaceNameCondition);
224+
}
225+
187226
JoinInfo tableInfo = new JoinInfo();
188227
tableInfo.setLeftTableName(leftTbName);
189228
tableInfo.setRightTableName(rightTableName);
@@ -204,11 +243,86 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
204243
tableInfo.setJoinType(joinType);
205244
tableInfo.setCondition(joinNode.getCondition());
206245

246+
tableInfo.setLeftIsMidTable(leftIsMidTable);
247+
tableInfo.setLeftTabMapping(midTableMapping);
248+
207249
return tableInfo;
208250
}
209251

252+
private Map<String, String> saveTabMapping(JoinInfo leftNodeJoinInfo) {
253+
Map<String, String> midTableMapping;
254+
255+
String midTab = buidTableName(leftNodeJoinInfo.getLeftTableAlias(), SPLIT, leftNodeJoinInfo.getRightTableAlias());
256+
midTab += "_0";
257+
258+
midTableMapping = Maps.newHashMap();
259+
260+
if(leftNodeJoinInfo.isLeftIsMidTable()) {
261+
midTableMapping.putAll(leftNodeJoinInfo.getLeftTabMapping());
262+
}
263+
264+
midTableMapping.put(leftNodeJoinInfo.getLeftTableAlias(), midTab);
265+
midTableMapping.put(leftNodeJoinInfo.getRightTableAlias(), midTab);
266+
return midTableMapping;
267+
}
268+
269+
private SqlNode buildSelectByLeftNode(SqlNode leftNode) {
270+
SqlParser sqlParser = SqlParser.create(tempSQL, Main.config);
271+
SqlNode sqlNode = null;
272+
try {
273+
sqlNode = sqlParser.parseStmt();
274+
}catch (Exception e) {
275+
LOG.error("tmp sql parse error..", e);
276+
}
277+
278+
((SqlSelect) sqlNode).setFrom(leftNode);
279+
return sqlNode;
280+
}
281+
210282

211283
private void dealSelectResultWithJoinInfo(JoinInfo joinInfo, SqlSelect sqlNode, Queue<Object> queueInfo){
284+
// 中间虚拟表进行表名称替换
285+
if (joinInfo.isLeftIsMidTable()){
286+
SqlNode whereNode = sqlNode.getWhere();
287+
SqlNodeList sqlGroup = sqlNode.getGroup();
288+
SqlNodeList sqlSelectList = sqlNode.getSelectList();
289+
List<SqlNode> newSelectNodeList = Lists.newArrayList();
290+
291+
for( int i=0; i<sqlSelectList.getList().size(); i++){
292+
SqlNode selectNode = sqlSelectList.getList().get(i);
293+
294+
SqlNode replaceNode = ParseUtils.replaceSelectFieldTabName(selectNode, joinInfo.getLeftTabMapping());
295+
296+
if(replaceNode == null){
297+
continue;
298+
}
299+
//sqlSelectList.set(i, replaceNode);
300+
newSelectNodeList.add(replaceNode);
301+
}
302+
303+
SqlNodeList newSelectList = new SqlNodeList(newSelectNodeList, sqlSelectList.getParserPosition());
304+
sqlNode.setSelectList(newSelectList);
305+
306+
307+
//where
308+
if(whereNode != null){
309+
SqlNode[] sqlNodeList = ((SqlBasicCall)whereNode).getOperands();
310+
for(int i =0; i<sqlNodeList.length; i++) {
311+
SqlNode whereSqlNode = sqlNodeList[i];
312+
SqlNode replaceNode = ParseUtils.replaceNodeInfo(whereSqlNode, joinInfo.getLeftTabMapping());
313+
sqlNodeList[i] = replaceNode;
314+
}
315+
}
316+
317+
if(sqlGroup != null && CollectionUtils.isNotEmpty(sqlGroup.getList())){
318+
for( int i=0; i<sqlGroup.getList().size(); i++){
319+
SqlNode selectNode = sqlGroup.getList().get(i);
320+
SqlNode replaceNode = ParseUtils.replaceNodeInfo(selectNode, joinInfo.getLeftTabMapping());
321+
sqlGroup.set(i, replaceNode);
322+
}
323+
}
324+
}
325+
212326
//SideJoinInfo rename
213327
if(joinInfo.checkIsSide()){
214328
joinInfo.setSelectFields(sqlNode.getSelectList());
@@ -228,30 +342,52 @@ private void dealSelectResultWithJoinInfo(JoinInfo joinInfo, SqlSelect sqlNode,
228342

229343
queueInfo.offer(joinInfo);
230344
}
345+
replaceFromNodeForJoin(joinInfo, sqlNode);
231346

232-
//Update from node
233-
SqlOperator operator = new SqlAsOperator();
234-
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
235-
String joinLeftTableName = joinInfo.getLeftTableName();
236-
String joinLeftTableAlias = joinInfo.getLeftTableAlias();
237-
joinLeftTableName = Strings.isNullOrEmpty(joinLeftTableName) ? joinLeftTableAlias : joinLeftTableName;
238-
String newTableName = joinLeftTableName + "_" + joinInfo.getRightTableName();
239-
String newTableAlias = joinInfo.getLeftTableAlias() + "_" + joinInfo.getRightTableAlias();
240-
SqlIdentifier sqlIdentifier = new SqlIdentifier(newTableName, null, sqlParserPos);
241-
SqlIdentifier sqlIdentifierAlias = new SqlIdentifier(newTableAlias, null, sqlParserPos);
242-
SqlNode[] sqlNodes = new SqlNode[2];
243-
sqlNodes[0] = sqlIdentifier;
244-
sqlNodes[1] = sqlIdentifierAlias;
245-
SqlBasicCall sqlBasicCall = new SqlBasicCall(operator, sqlNodes, sqlParserPos);
246-
sqlNode.setFrom(sqlBasicCall);
247347
}
248348
}
249349

350+
private void replaceFromNodeForJoin(JoinInfo joinInfo, SqlSelect sqlNode) {
351+
//Update from node
352+
SqlBasicCall sqlBasicCall = buildAsNodeByJoinInfo(joinInfo, null);
353+
sqlNode.setFrom(sqlBasicCall);
354+
}
355+
356+
private SqlBasicCall buildAsNodeByJoinInfo(JoinInfo joinInfo, SqlNode sqlNode0) {
357+
SqlOperator operator = new SqlAsOperator();
358+
359+
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
360+
String joinLeftTableName = joinInfo.getLeftTableName();
361+
String joinLeftTableAlias = joinInfo.getLeftTableAlias();
362+
joinLeftTableName = Strings.isNullOrEmpty(joinLeftTableName) ? joinLeftTableAlias : joinLeftTableName;
363+
String newTableName = buidTableName(joinLeftTableName, SPLIT, joinInfo.getRightTableName());
364+
String newTableAlias = buidTableName(joinInfo.getLeftTableAlias(), SPLIT, joinInfo.getRightTableAlias());
365+
366+
// mid table alias a_b_0
367+
if (null != sqlNode0) {
368+
newTableAlias += "_0";
369+
}
370+
371+
if (null == sqlNode0) {
372+
sqlNode0 = new SqlIdentifier(newTableName, null, sqlParserPos);
373+
}
374+
375+
SqlIdentifier sqlIdentifierAlias = new SqlIdentifier(newTableAlias, null, sqlParserPos);
376+
SqlNode[] sqlNodes = new SqlNode[2];
377+
sqlNodes[0] = sqlNode0;
378+
sqlNodes[1] = sqlIdentifierAlias;
379+
return new SqlBasicCall(operator, sqlNodes, sqlParserPos);
380+
}
381+
382+
private String buidTableName(String left, char split, String right) {
383+
StringBuilder sb = new StringBuilder();
384+
return sb.append(left).append(split).append(right).toString();
385+
}
386+
250387
private boolean checkIsSideTable(String tableName, Set<String> sideTableList){
251388
if(sideTableList.contains(tableName)){
252389
return true;
253390
}
254-
255391
return false;
256392
}
257393
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
112112
}
113113

114114
if(pollSqlNode.getKind() == INSERT){
115+
System.out.println("----------real exec sql-----------" );
116+
System.out.println(pollSqlNode.toString());
115117
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
116118
if(LOG.isInfoEnabled()){
117119
LOG.info("exec sql: " + pollSqlNode.toString());

0 commit comments

Comments
 (0)