11package net .sansa_stack .inference .rules .plan
22
33import java .io .PrintWriter
4+ import java .util .Collections
45
6+ import com .google .common .collect .ImmutableList
57import org .apache .calcite .config .Lex
6- import org .apache .calcite .plan .{ Contexts , ConventionTraitDef , RelTrait , RelTraitDef }
8+ import org .apache .calcite .plan ._
79import org .apache .calcite .rel .{RelCollationTraitDef , RelNode }
810import org .apache .calcite .schema .SchemaPlus
911import org .apache .calcite .sql .parser .SqlParser
10- import org .apache .calcite .tools .{FrameworkConfig , Frameworks , Planner , RuleSets }
12+ import org .apache .calcite .tools ._
13+
1114import collection .JavaConverters ._
1215import scala .util .Try
13-
1416import org .apache .calcite .rel .`type` .RelDataTypeSystem
1517import org .apache .calcite .rel .externalize .RelWriterImpl
16- import org .apache .calcite .rel .rules .{ FilterJoinRule , ProjectJoinTransposeRule }
18+ import org .apache .calcite .rel .rules ._
1719import org .apache .jena .reasoner .rulesys .Rule
18-
1920import net .sansa_stack .inference .utils .RuleUtils
21+ import org .apache .calcite .adapter .enumerable .{EnumerableConvention , EnumerableRules }
22+ import org .apache .calcite .interpreter .{BindableConvention , Bindables }
23+ import org .apache .calcite .plan .RelOptPlanner .CannotPlanException
24+ import org .apache .calcite .plan .hep .{HepMatchOrder , HepPlanner , HepProgramBuilder }
25+ import org .apache .calcite .plan .volcano .VolcanoPlanner
26+ import org .apache .calcite .sql2rel .{RelDecorrelator , SqlToRelConverter }
2027
2128/**
2229 * @author Lorenz Buehmann
2330 */
2431class SimplePlanGenerator (schema : SchemaPlus ) {
2532
26- val traitDefs : List [RelTraitDef [_ <: RelTrait ]] = List (ConventionTraitDef .INSTANCE , RelCollationTraitDef .INSTANCE )
33+ val traitDefs : List [RelTraitDef [_ <: RelTrait ]] = List (
34+ // BindableConvention.INSTANCE.getTraitDef,
35+ ConventionTraitDef .INSTANCE
36+ // ,
37+ ,RelCollationTraitDef .INSTANCE
38+ )
2739
2840 val optRuleSet = RuleSets .ofList(
2941 FilterJoinRule .FILTER_ON_JOIN ,// push a filter into a join
3042 FilterJoinRule .JOIN ,// push filter into the children of a join
3143 ProjectJoinTransposeRule .INSTANCE // push a projection to the children of a join
44+ // ,
45+ // // push and merge filter rules
46+ // FilterAggregateTransposeRule.INSTANCE,
47+ // FilterProjectTransposeRule.INSTANCE,
48+ // FilterMergeRule.INSTANCE,
49+ // FilterJoinRule.FILTER_ON_JOIN,
50+ // FilterJoinRule.JOIN, /*push filter into the children of a join*/
51+ // FilterTableScanRule.INSTANCE,
52+ // // push and merge projection rules
53+ // /*check the effectiveness of pushing down projections*/
54+ // ProjectRemoveRule.INSTANCE,
55+ // ProjectJoinTransposeRule.INSTANCE
56+ )
57+
58+ var program = Programs .ofRules(
59+ // push and merge filter rules
60+ FilterProjectTransposeRule .INSTANCE ,
61+ FilterMergeRule .INSTANCE ,
62+ FilterJoinRule .FILTER_ON_JOIN ,
63+ FilterJoinRule .JOIN , /* push filter into the children of a join*/
64+ FilterTableScanRule .INSTANCE ,
65+ // push and merge projection rules
66+ /* check the effectiveness of pushing down projections*/
67+ ProjectRemoveRule .INSTANCE ,
68+ ProjectJoinTransposeRule .INSTANCE ,
69+ // JoinProjectTransposeRule.BOTH_PROJECT,
70+ // ProjectFilterTransposeRule.INSTANCE, /*it is better to use filter first an then project*/
71+ ProjectTableScanRule .INSTANCE ,
72+ ProjectWindowTransposeRule .INSTANCE ,
73+ ProjectMergeRule .INSTANCE ,
74+ // join rules
75+ /* A simple trick is to consider a window size equal to stream cardinality.
76+ * For tuple-based windows, the window size is equal to the number of tuples.
77+ * For time-based windows, the window size is equal to the (input_rate*time_of_window).*/
78+ // JoinToMultiJoinRule.INSTANCE ,
79+ // LoptOptimizeJoinRule.INSTANCE ,
80+ // MultiJoinOptimizeBushyRule.INSTANCE,
81+ JoinPushThroughJoinRule .RIGHT ,
82+ JoinPushThroughJoinRule .LEFT , /* choose between right and left*/
83+ JoinPushExpressionsRule .INSTANCE ,
84+ JoinAssociateRule .INSTANCE
3285 )
86+ program = Programs .ofRules(Bindables .RULES )
3387
3488 val calciteFrameworkConfig : FrameworkConfig =
3589 Frameworks .newConfigBuilder
@@ -47,6 +101,7 @@ class SimplePlanGenerator(schema: SchemaPlus) {
47101 .ruleSets(optRuleSet)
48102 // Custom cost factory to use during optimization
49103 .costFactory(null )
104+ // .programs(program)
50105 .typeSystem(RelDataTypeSystem .DEFAULT )
51106 .build()
52107
@@ -61,7 +116,6 @@ class SimplePlanGenerator(schema: SchemaPlus) {
61116 * @return the root node of the logical plan
62117 */
63118 def generateLogicalPlan (rule : Rule ): RelNode = {
64- planner.close()
65119 // generate SQL query
66120 val sqlQuery = sqlGenerator.generateSQLQuery(rule)
67121 println(sqlQuery)
@@ -73,7 +127,186 @@ class SimplePlanGenerator(schema: SchemaPlus) {
73127 val validatedSqlNode = planner.validate(sqlNode.get)
74128
75129 // return the root node
76- planner.rel(validatedSqlNode).project
130+ var root = planner.rel(validatedSqlNode).project() // .project
131+ // root.register(basePlanner)
132+ //
133+ //
134+ // basePlanner.setRoot(root)
135+ // optimizePlan(root)
136+
137+ import org .apache .calcite .adapter .enumerable .EnumerableConvention
138+ import org .apache .calcite .plan .RelTraitSet
139+ val desiredTraits = root.getTraitSet.replace(EnumerableConvention .INSTANCE ).simplify()
140+
141+ // root = root.getCluster.getPlanner.chooseDelegate.changeTraits(root, desiredTraits)
142+
143+ val node = planner.transform(0 ,
144+ root.getTraitSet.simplify().plus(BindableConvention .INSTANCE ),
145+ // planner.getEmptyTraitSet
146+ // .replace(ConventionTraitDef.INSTANCE.getDefault)
147+ // .replace(RelCollationTraitDef.INSTANCE.getDefault),
148+ root)
149+ planner.reset()
150+ node
151+ }
152+
153+ val DATASET_NORM_RULES : RuleSet = RuleSets .ofList(
154+ // simplify expressions rules
155+ ReduceExpressionsRule .FILTER_INSTANCE ,
156+ ReduceExpressionsRule .PROJECT_INSTANCE ,
157+ ReduceExpressionsRule .CALC_INSTANCE ,
158+ ReduceExpressionsRule .JOIN_INSTANCE ,
159+ ProjectToWindowRule .PROJECT
160+ )
161+
162+ val DATASET_OPT_RULES : RuleSet = RuleSets .ofList(
163+
164+ // convert a logical table scan to a relational expression
165+ TableScanRule .INSTANCE ,
166+ // EnumerableToLogicalTableScan.INSTANCE,
167+
168+ // push a filter into a join
169+ FilterJoinRule .FILTER_ON_JOIN ,
170+ // push filter into the children of a join
171+ FilterJoinRule .JOIN ,
172+ // push filter through an aggregation
173+ FilterAggregateTransposeRule .INSTANCE ,
174+
175+ // aggregation and projection rules
176+ AggregateProjectMergeRule .INSTANCE ,
177+ AggregateProjectPullUpConstantsRule .INSTANCE ,
178+ // push a projection past a filter or vice versa
179+ ProjectFilterTransposeRule .INSTANCE ,
180+ FilterProjectTransposeRule .INSTANCE ,
181+ // push a projection to the children of a join
182+ ProjectJoinTransposeRule .INSTANCE ,
183+ // remove identity project
184+ ProjectRemoveRule .INSTANCE ,
185+ // reorder sort and projection
186+ SortProjectTransposeRule .INSTANCE ,
187+ ProjectSortTransposeRule .INSTANCE ,
188+
189+ // join rules
190+ JoinPushExpressionsRule .INSTANCE ,
191+
192+ // remove union with only a single child
193+ UnionEliminatorRule .INSTANCE ,
194+ // convert non-all union into all-union + distinct
195+ UnionToDistinctRule .INSTANCE ,
196+
197+ // remove aggregation if it does not aggregate and input is already distinct
198+ AggregateRemoveRule .INSTANCE ,
199+ // push aggregate through join
200+ AggregateJoinTransposeRule .EXTENDED ,
201+ // aggregate union rule
202+ AggregateUnionAggregateRule .INSTANCE ,
203+ // expand distinct aggregate to normal aggregate with groupby
204+ AggregateExpandDistinctAggregatesRule .JOIN ,
205+
206+ // remove unnecessary sort rule
207+ SortRemoveRule .INSTANCE ,
208+
209+ // prune empty results rules
210+ PruneEmptyRules .AGGREGATE_INSTANCE ,
211+ PruneEmptyRules .FILTER_INSTANCE ,
212+ PruneEmptyRules .JOIN_LEFT_INSTANCE ,
213+ PruneEmptyRules .JOIN_RIGHT_INSTANCE ,
214+ PruneEmptyRules .PROJECT_INSTANCE ,
215+ PruneEmptyRules .SORT_INSTANCE ,
216+ PruneEmptyRules .UNION_INSTANCE ,
217+
218+ // calc rules
219+ FilterCalcMergeRule .INSTANCE ,
220+ ProjectCalcMergeRule .INSTANCE ,
221+ FilterToCalcRule .INSTANCE ,
222+ ProjectToCalcRule .INSTANCE ,
223+ CalcMergeRule .INSTANCE
224+
225+ )
226+
227+ def optimizePlan (relNode : RelNode ): RelNode = {
228+
229+ def runHepPlanner (
230+ hepMatchOrder : HepMatchOrder ,
231+ ruleSet : RuleSet ,
232+ input : RelNode ,
233+ targetTraits : RelTraitSet ): RelNode = {
234+ val builder = new HepProgramBuilder
235+ builder.addMatchOrder(hepMatchOrder)
236+
237+ val it = ruleSet.iterator()
238+ while (it.hasNext) {
239+ builder.addRuleInstance(it.next())
240+ }
241+
242+ val planner = new HepPlanner (builder.build, calciteFrameworkConfig.getContext)
243+ planner.setRoot(input)
244+ if (input.getTraitSet != targetTraits) {
245+ planner.changeTraits(input, targetTraits.simplify)
246+ }
247+ planner.findBestExp
248+ }
249+
250+ def runVolcanoPlanner (
251+ ruleSet : RuleSet ,
252+ input : RelNode ,
253+ targetTraits : RelTraitSet ): RelNode = {
254+ val optProgram = Programs .ofRules(ruleSet)
255+
256+ val output = try {
257+ optProgram.run(basePlanner, input, targetTraits,
258+ ImmutableList .of(), ImmutableList .of())
259+ } catch {
260+ case e : Exception =>
261+ throw e
262+ }
263+ output
264+ }
265+
266+ // 1. decorrelate
267+ val decorPlan = RelDecorrelator .decorrelateQuery(relNode)
268+
269+ // 2. normalize the logical plan
270+ val normRuleSet = DATASET_NORM_RULES
271+ val normalizedPlan = if (normRuleSet.iterator().hasNext) {
272+ runHepPlanner(HepMatchOrder .BOTTOM_UP , normRuleSet, decorPlan, decorPlan.getTraitSet)
273+ } else {
274+ decorPlan
275+ }
276+
277+ // 3. optimize the logical Flink plan
278+ val optRuleSet = DATASET_OPT_RULES
279+ val flinkOutputProps = relNode.getTraitSet.replace(ConventionTraitDef .INSTANCE .getDefault).simplify()
280+ val optimizedPlan = if (optRuleSet.iterator().hasNext) {
281+ runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps)
282+ } else {
283+ normalizedPlan
284+ }
285+
286+ optimizedPlan
287+ }
288+
289+ val basePlanner : RelOptPlanner = {
290+ val planner = new VolcanoPlanner (calciteFrameworkConfig.getCostFactory, Contexts .empty())
291+ planner.setExecutor(calciteFrameworkConfig.getExecutor)
292+ planner.addRelTraitDef(ConventionTraitDef .INSTANCE )
293+ planner
294+ }
295+
296+ def plan (rule : Rule ) = {
297+ // // generate SQL query
298+ // val sqlQuery = sqlGenerator.generateSQLQuery(rule)
299+ //
300+ // val sqlParser = SqlParser.create(sqlQuery, SqlParser.configBuilder().build())
301+ // val sqlNode = sqlParser.parseStmt()
302+ // val catalogReader = null
303+ //
304+ // val pl = new VolcanoPlanner()
305+ //
306+ // val sqlToRelConverter = new SqlToRelConverter(new ViewExpanderImpl(), )
307+ //
308+ // program.run(pl, , planner.getEmptyTraitSet, Collections.emptyList(), Collections.emptyList())
309+
77310 }
78311
79312 /**
0 commit comments