Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit de347e4

Browse files
Cntd. first backward chaining impl for Spark.
1 parent cce6848 commit de347e4

1 file changed

Lines changed: 39 additions & 0 deletions

File tree

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/backwardchaining/BackwardChainingReasoner.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package net.sansa_stack.inference.spark.backwardchaining
22

33
import java.io.PrintWriter
44

5+
import org.apache.calcite.interpreter.Bindables.{BindableFilter, BindableJoin, BindableProject}
6+
import org.apache.calcite.rel.{RelNode, RelVisitor}
7+
58
import net.sansa_stack.inference.rules.RuleSets
69
import net.sansa_stack.inference.rules.plan.{SimplePlanGenerator, SimpleSQLGenerator, TriplesSchema}
710
import net.sansa_stack.inference.spark.backwardchaining.tree.{AndNode, OrNode}
@@ -13,11 +16,14 @@ import org.apache.jena.reasoner.TriplePattern
1316
import org.apache.jena.reasoner.rulesys.Rule
1417
import org.apache.jena.reasoner.rulesys.impl.BindingVector
1518
import org.apache.jena.vocabulary.RDF
19+
1620
import net.sansa_stack.inference.utils.RuleUtils._
1721
import net.sansa_stack.inference.utils.TripleUtils._
1822
import org.apache.calcite.rel.externalize.RelWriterImpl
23+
import org.apache.calcite.rex.RexCall
1924
import org.apache.spark.rdd.RDD
2025
import org.apache.spark.sql.SparkSession
26+
import org.apache.spark.sql.catalyst.plans.logical.Project
2127

2228
/**
2329
* @author Lorenz Buehmann
@@ -56,6 +62,8 @@ class BackwardChainingReasoner(val rules: Set[Rule], val graph: RDFGraph) extend
5662
false
5763
}
5864

65+
val planGenerator = new SimplePlanGenerator(TriplesSchema.get())
66+
5967
private def processTree(tree: AndNode): RDD[Triple] = {
6068
// 1. look for asserted triples in the graph
6169
var rdd = graph.triples // lookup(tree.element)
@@ -64,6 +72,8 @@ class BackwardChainingReasoner(val rules: Set[Rule], val graph: RDFGraph) extend
6472
tree.children.foreach(child => {
6573
println(s"processing rule ${child.element}")
6674

75+
processRule(child.element)
76+
6777
val targetTp = child.element.headTriplePatterns().head
6878

6979
// recursively process each instantiated body atom of the rule
@@ -82,6 +92,35 @@ class BackwardChainingReasoner(val rules: Set[Rule], val graph: RDFGraph) extend
8292
rdd
8393
}
8494

95+
class RDDRelVisitor(rdd: RDD[Triple]) extends RelVisitor {
96+
override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
97+
println(node)
98+
99+
val rdd = node match {
100+
case project: BindableProject =>
101+
102+
103+
case join: BindableJoin =>
104+
105+
106+
case filter: BindableFilter =>
107+
val operands = filter.getCondition.asInstanceOf[RexCall].getOperands
108+
operands.get(0).
109+
110+
case _ =>
111+
}
112+
113+
super.visit(node, ordinal, parent)
114+
}
115+
116+
override def go(node: RelNode): RelNode = super.go(node)
117+
}
118+
119+
private def processRule(rule: Rule) = {
120+
val plan = planGenerator.generateLogicalPlan(rule)
121+
new RDDRelVisitor(graph.triples).go(plan)
122+
}
123+
85124
private def selectedVars(body: TriplePattern, head: TriplePattern): Seq[Int] = {
86125
var selectedIndexes: Seq[Int] = Seq()
87126

0 commit comments

Comments
 (0)