
Commit d3c3899

Update: return the inferred graph instead of inferred axioms
1 parent 875ed40 commit d3c3899

3 files changed

Lines changed: 88 additions & 16 deletions

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/forwardchaining/axioms/ForwardRuleReasonerRDFS.scala

Lines changed: 17 additions & 0 deletions
@@ -96,6 +96,23 @@ class ForwardRuleReasonerRDFS(sc: SparkContext, parallelism: Int = 2) extends Lo
      * y rdfs:subClassOf z . x rdfs:subClassOf z .
      */
 
+    // val subClassOf = subClassofAxiom.asInstanceOf[RDD[OWLSubClassOfAxiom]]
+    // val op1 = subClassOf.map { a => (a.getSubClass, a.getSuperClass) }
+    // val op2 = subClassOf.map { a => (a.getSuperClass, a.getSubClass) }
+    // val subClass_ExtVP_OS = op2.join(op1).map(a => (a._1, a._2._1)).distinct()
+    // val subClass_ExtVP_SO = op1.join(op2).map(a => (a._1, a._2._1)).distinct()
+    //
+    // var startTime = System.currentTimeMillis()
+    // val Trans = subClass_ExtVP_OS.join(subClass_ExtVP_SO).map(a => (a._2._1, a._2._2))
+    // var endTime = System.currentTimeMillis() - startTime
+    // println("\n...first --- " + (endTime) + " millisec.")
+    //
+    // val res = Trans.map(a => dataFactory.getOWLSubClassOfAxiom(a._1, a._2)).distinct
+    //
+    // println("\n ---- Trans ----\n")
+    // Trans.collect.foreach(println(_))
+    // startTime = System.currentTimeMillis()
+
     val tr = new TransitiveReasoner()
     val subClassOfAxiomsTrans = tr.computeTransitiveClosure(subClassofAxiom, AxiomType.SUBCLASS_OF)
       .setName("rdfs11")
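The live path above delegates rule rdfs11 (transitivity of rdfs:subClassOf: x rdfs:subClassOf y and y rdfs:subClassOf z entail x rdfs:subClassOf z) to the new TransitiveReasoner; the commented-out block is a shelved ExtVP-style experiment kept for reference. Below is a minimal sketch of how this entry point can be exercised; the driver setup, the object name, and the example IRIs are illustrative assumptions, not part of the commit:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.semanticweb.owlapi.apibinding.OWLManager
import org.semanticweb.owlapi.model.{AxiomType, IRI, OWLAxiom}

object Rdfs11Sketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("rdfs11-sketch"))
    val df = OWLManager.createOWLOntologyManager().getOWLDataFactory

    // Hypothetical classes with A subClassOf B and B subClassOf C as input axioms.
    val Seq(a, b, c) = Seq("A", "B", "C").map(n => df.getOWLClass(IRI.create("http://example.org/" + n)))
    val axioms: RDD[OWLAxiom] = sc.parallelize(Seq(
      df.getOWLSubClassOfAxiom(a, b),
      df.getOWLSubClassOfAxiom(b, c)))

    // The returned RDD contains the two input axioms plus the derived A subClassOf C.
    val closure = new TransitiveReasoner().computeTransitiveClosure(axioms, AxiomType.SUBCLASS_OF)
    closure.collect().foreach(println)
    sc.stop()
  }
}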

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/forwardchaining/axioms/TransitiveReasoner.scala

Lines changed: 70 additions & 16 deletions
@@ -2,6 +2,7 @@ package net.sansa_stack.inference.spark.forwardchaining.axioms
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.HashPartitioner
 import org.semanticweb.owlapi.apibinding.OWLManager
 import org.semanticweb.owlapi.model._
 
@@ -15,16 +16,21 @@ class TransitiveReasoner extends Serializable {
 
   val m = OWLManager.createOWLOntologyManager()
   val f: OWLDataFactory = m.getOWLDataFactory
+  val parallism = 30
+  val partitioner = new HashPartitioner(parallism)
 
   def computeTransitiveClosure(axioms: RDD[OWLAxiom], T: AxiomType[_]): RDD[OWLAxiom] = {
-    if (axioms.isEmpty()) return axioms
+
+    axioms.cache()
+
+    if (axioms.partitions.isEmpty) return axioms
 
     val tcAxiom: RDD[OWLAxiom] = T match {
       case AxiomType.SUBCLASS_OF =>
         // we only need (s, o)
         val subjectObjectPairs = axioms.asInstanceOf[RDD[OWLSubClassOfAxiom]]
-          .map { a => (a.getSubClass, a.getSuperClass) }
-        val tc: RDD[(OWLClassExpression, OWLClassExpression)] = computeTransitiveClosure(subjectObjectPairs)
+          .map(a => (a.getSubClass, a.getSuperClass))
+        val tc = computeTransitiveClosure(subjectObjectPairs)
         tc.map(x => f.getOWLSubClassOfAxiom(x._1, x._2)).asInstanceOf[RDD[OWLAxiom]]
 
       case AxiomType.SUB_DATA_PROPERTY =>
@@ -53,36 +59,84 @@
     return tcAxiom
   }
 
+
   def computeTransitiveClosure[A: ClassTag](pairs: RDD[(A, A)]): RDD[(A, A)] = {
+    var nextCount = 1L
     var tc = pairs
     tc.cache()
 
-    // get the pairs in reversed order because join() performs on keys
-    val reversedPairs = tc.map(t => (t._2, t._1)).cache()
+    var q = tc
+    var l = q.map(t => t.swap).partitionBy(partitioner) // (t._2, t._1)
 
-    // the join is iterated until a fixed point is reached
-    var oldCount = 0L
-    var nextCount = tc.count()
-    do {
-      oldCount = nextCount
+    do {
+      val r = q.partitionBy(partitioner) // .map(t => (t._1, t._2))
 
       // perform the join on (a, b).(b, a) to get RDD of
       // (a=b, (b, a)) then map the output to get the new (a, b) paths
-      tc = tc.union(tc.join(reversedPairs).map(a => (a._2._2, a._2._1)))
-        .distinct().cache()
-
-      nextCount = tc.count()
-    } while (nextCount != oldCount)
-
+      val q1 = l.join(r).map(t => (t._2._1, t._2._2))
+      q = q1.subtract(tc, parallism).cache() // q contains only the new inferred axioms
+      nextCount = q.count()
+      if (nextCount != 0) {
+        l = q.map(t => (t._2, t._1)).partitionBy(partitioner)
+        val s = tc.map(t => (t._1, t._2)).partitionBy(partitioner)
+        val tc1 = s.join(l).map(t => (t._2._2, t._2._1)).cache()
+        tc = tc1.union(tc).union(q)
+      }
+    } while (nextCount != 0)
     tc
   }
 
+  // def computeTransitiveClosure[A: ClassTag](pairs: RDD[(A, A)]): RDD[(A, A)] = {
+  //   var tc = pairs
+  //   tc.cache()
+  //
+  //   // get the pairs in reversed order because join() performs on keys
+  //   val reversedPairs = tc.map(t => t.swap).partitionBy(partitioner)
+  //   reversedPairs.cache()
+  //
+  //   // the join is iterated until a fixed point is reached
+  //   var oldCount = 0L
+  //   var nextCount = tc.count()
+  //   do {
+  //     oldCount = nextCount
+  //
+  //     // perform the join on (a, b).(b, a) to get RDD of
+  //     // (a=b, (b, a)) then map the output to get the new (a, b) paths
+  //     // var startTime = System.currentTimeMillis()
+  //     // tc = tc.join(reversedPairs).map(a => (a._2._2, a._2._1))
+  //     // var endTime = System.currentTimeMillis() - startTime
+  //     // println("\n...second " + (endTime) + " millisec.")
+  //     // tc = tc.union(tc).distinct().cache()
+  //     val tc1 = tc.join(reversedPairs).map(a => (a._2._2, a._2._1))
+  //     tc = tc.union(tc1).cache()
+  //     nextCount = tc.count()
+  //   } while (nextCount != oldCount)
+  //
+  //   tc.distinct()
+  // }
+
   def computeTransitiveClosure(asserstion: RDD[OWLObjectPropertyAssertionAxiom]): RDD[OWLObjectPropertyAssertionAxiom] = {
 
     if (asserstion.isEmpty()) return asserstion
 
+    // val soPairs = asserstion.map{ a => (a.getSubject, a.getObject)}
+    // val osPairs = asserstion.map{ a => (a.getObject, a.getSubject)}
+    //
+    // val O4_ExtVP_OS = osPairs.join(soPairs).map(a => (a._1, a._2._1))
+    // val O4_ExtVP_SO = soPairs.join(osPairs).map(a => (a._1, a._2._1))
+    // val O4_ExtVP = O4_ExtVP_OS.join(O4_ExtVP_SO).map(a => (a._2._1, a._2._2)).distinct()
+    //
+    // var startTime = System.currentTimeMillis()
+    // var tc = computeTransitiveClosure(O4_ExtVP_OS)
+    // tc = tc.union(computeTransitiveClosure(O4_ExtVP_SO))
+    // var endTime = System.currentTimeMillis() - startTime
+    // println("\n1 - ...O4 object assertion " + (endTime) + " millisec.")
+
     val subjectObjectPairs = asserstion.map{ a => (a.getSubject, a.getObject)}
+    // startTime = System.currentTimeMillis()
     val tc = computeTransitiveClosure(subjectObjectPairs)
+    // endTime = System.currentTimeMillis() - startTime
+    // println("\n2 - ... O4 object assertion " + (endTime) + " millisec.")
     val prop: OWLObjectPropertyExpression = asserstion.first().getProperty
 
     tc.map(x => f.getOWLObjectPropertyAssertionAxiom(prop, x._1, x._2))
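The rewritten loop above is essentially a semi-naive fixed-point evaluation: each round joins only the delta q of pairs discovered in the previous round and subtracts what is already known, instead of re-joining and distinct()-ing the whole closure, and iteration stops once the delta is empty. The following self-contained sketch mirrors that loop on plain string pairs; the local driver, the 4-way HashPartitioner, and the sample chain are illustrative assumptions:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SemiNaiveClosureSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("tc-sketch"))
    val partitioner = new HashPartitioner(4)

    // Sample chain a -> b -> c -> d; the closure adds (a,c), (b,d) and (a,d).
    var tc: RDD[(String, String)] = sc.parallelize(Seq("a" -> "b", "b" -> "c", "c" -> "d")).cache()

    var q = tc                                      // delta: pairs discovered in the previous round
    var l = q.map(_.swap).partitionBy(partitioner)  // delta keyed by its second component
    var nextCount = 1L
    do {
      val r = q.partitionBy(partitioner)
      // compose delta with delta: (y, x) joined with (y, z) yields the candidate path (x, z)
      val q1 = l.join(r).map(t => (t._2._1, t._2._2))
      q = q1.subtract(tc).cache()                   // keep only genuinely new pairs
      nextCount = q.count()
      if (nextCount != 0) {
        // extend each new pair to the right by an existing closure edge, then fold everything in
        l = q.map(_.swap).partitionBy(partitioner)
        val s = tc.partitionBy(partitioner)
        val tc1 = s.join(l).map(t => (t._2._2, t._2._1)).cache()
        tc = tc1.union(tc).union(q)
      }
    } while (nextCount != 0)

    tc.distinct().collect().sorted.foreach(println) // six pairs for the chain above
    sc.stop()
  }
}

Using one HashPartitioner for both join sides is meant to co-partition them so the repeated joins shuffle less; the subtract against the accumulated closure is what keeps each round's join input small compared with the old union-plus-distinct variant.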

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/rules/plan/PlanExecutorNative.scala

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, EqualTo, Expression, IsNotNull, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.AliasIdentifier
 import org.apache.spark.sql.catalyst.plans.{Inner, logical}
 import net.sansa_stack.inference.spark.data.model.{EmptyRDFGraphDataFrame, RDFGraphNative}
 import net.sansa_stack.inference.utils.{Logging, Tuple0}
