Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit edbdfe3

Browse files
Use less UNION operations and reverted execution of the HLRDG layers.
1 parent ae1674f commit edbdfe3

1 file changed

Lines changed: 47 additions & 6 deletions

File tree

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/forwardchaining/ForwardRuleReasonerOptimized.scala

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ abstract class ForwardRuleReasonerOptimized[V, G <: AbstractRDFGraph[V, G]]
5252
var newGraph = graph.cache()
5353

5454
// generate the rule dependency graph
55-
val dependencyGraph = RuleDependencyGraphGenerator.generate(rules)
55+
val dependencyGraph = RuleDependencyGraphGenerator.generate(rules, pruned = true)
5656

5757
// generate the high-level dependency graph
5858
val highLevelDependencyGraph = HighLevelRuleDependencyGraphGenerator.generate(dependencyGraph)
5959

6060
// apply topological sort and get the layers
61-
val layers = highLevelDependencyGraph.layers()
61+
val layers = highLevelDependencyGraph.layers().foldLeft(List[(Int, scala.Iterable[RuleDependencyGraph])]())((b, a) => a :: b)
6262

6363
// each layer contains a set of rule dependency graphs
6464
// for each layer we process those
@@ -84,12 +84,53 @@ abstract class ForwardRuleReasonerOptimized[V, G <: AbstractRDFGraph[V, G]]
8484

8585
var newGraph = graph
8686

87-
layer._2.foreach{rdg =>
87+
val processedRDGs = layer._2.map{rdg =>
8888
logger.info("Processing dependency graph " + rdg.printNodes())
89-
newGraph = newGraph.union(applyRules(rdg.rules().toSeq, newGraph)).distinct().cache()
90-
unionCnt += 1
91-
distinctCnt += 1
89+
applyRules(rdg, newGraph)
90+
}
91+
92+
newGraph = newGraph.unionAll(processedRDGs.toSeq).distinct().cache()
93+
unionCnt += 1
94+
distinctCnt += 1
95+
96+
newGraph
97+
}
98+
99+
/**
100+
* Apply the set of rules on the given graph by doing fix-point iteration.
101+
*
102+
* @param rdg the rule dependency graph
103+
* @param graph the RDF graph
104+
*/
105+
def applyRules(rdg: RuleDependencyGraph, graph: G): G = {
106+
var newGraph = graph.cache()
107+
108+
val rules = rdg.rules().toSeq
109+
110+
if(rdg.hasCycle()) {
111+
var newGraph = graph.cache()
112+
var iteration = 1
113+
var oldCount = 0L
114+
var nextCount = newGraph.size()
115+
logger.info(s"initial size:$nextCount")
116+
117+
do {
118+
logger.info("Iteration " + iteration)
119+
iteration += 1
120+
oldCount = nextCount
121+
122+
newGraph = newGraph.union(applyRulesOnce(rules, newGraph)).distinct().cache()
123+
unionCnt += 1
124+
distinctCnt += 1
125+
126+
nextCount = newGraph.size()
127+
logger.info(s"new size:$nextCount")
128+
countCnt += 1
129+
} while (nextCount != oldCount)
130+
} else {
131+
newGraph = newGraph.union(applyRulesOnce(rules, newGraph))
92132
}
133+
93134
newGraph
94135
}
95136

0 commit comments

Comments
 (0)