Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 1bfd712

Browse files
Dedicated transitive reasoner class.
1 parent 3f14795 commit 1bfd712

3 files changed

Lines changed: 200 additions & 142 deletions

File tree

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/forwardchaining/ForwardRuleReasoner.scala

Lines changed: 1 addition & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import scala.reflect.ClassTag
1414
*
1515
* @author Lorenz Buehmann
1616
*/
17-
trait ForwardRuleReasoner extends Profiler{
17+
trait ForwardRuleReasoner extends Profiler with TransitiveReasoner{
1818

1919
/**
2020
* Applies forward chaining to the given RDF graph and returns a new RDF graph that contains all additional
@@ -25,137 +25,6 @@ trait ForwardRuleReasoner extends Profiler{
2525
*/
2626
def apply(graph: RDFGraph) : RDFGraph
2727

28-
// def computeTransitiveClosure[A, B, C](s: mutable.Set[(A, B, C)]): mutable.Set[(A, B, C)] = {
29-
// val t = addTransitive(s)
30-
// // recursive call if set changed, otherwise stop and return
31-
// if (t.size == s.size) s else computeTransitiveClosure(t)
32-
// }
33-
34-
def computeTransitiveClosure(s: mutable.Set[RDFTriple]): mutable.Set[RDFTriple] = {
35-
val t = addTransitive(s)
36-
// recursive call if set changed, otherwise stop and return
37-
if (t.size == s.size) s else computeTransitiveClosure(t)
38-
}
39-
40-
// def addTransitive[A, B, C](s: mutable.Set[(A, B, C)]) = {
41-
// s ++ (for ((s1, p1, o1) <- s; (s2, p2, o2) <- s if o1 == s2) yield (s1, p1, o2))
42-
// }
43-
44-
def addTransitive(s: mutable.Set[RDFTriple]) = {
45-
s ++ (for (t1 <- s; t2 <- s if t1.`object` == t2.subject) yield RDFTriple(t1.subject, t1.predicate, t2.`object`))
46-
}
47-
48-
def computeTransitiveClosure(triples: RDD[RDFTriple]): RDD[RDFTriple] = {
49-
if(triples.isEmpty()) return triples
50-
log.info("computing TC...")
51-
52-
profile {
53-
// keep the predicate
54-
val predicate = triples.take(1)(0).predicate
55-
56-
// compute the TC
57-
var subjectObjectPairs = triples.map(t => (t.subject, t.`object`)).cache()
58-
59-
// because join() joins on keys, in addition the pairs are stored in reversed order (o, s)
60-
val objectSubjectPairs = subjectObjectPairs.map(t => (t._2, t._1))
61-
62-
// the join is iterated until a fixed point is reached
63-
var i = 1
64-
var oldCount = 0L
65-
var nextCount = triples.count()
66-
do {
67-
log.info(s"iteration $i...")
68-
oldCount = nextCount
69-
// perform the join (s1, o1) x (o2, s2), obtaining an RDD of (s1=o2, (o1, s2)) pairs,
70-
// then project the result to obtain the new (s2, o1) paths.
71-
subjectObjectPairs = subjectObjectPairs
72-
.union(subjectObjectPairs.join(objectSubjectPairs).map(x => (x._2._2, x._2._1)))
73-
.filter(tuple => tuple._1 != tuple._2) // omit (s1, s1)
74-
.distinct()
75-
.cache()
76-
nextCount = subjectObjectPairs.count()
77-
i += 1
78-
} while (nextCount != oldCount)
79-
80-
println("TC has " + nextCount + " triples.")
81-
subjectObjectPairs.map(p => new RDFTriple(p._1, predicate, p._2))
82-
}
83-
}
84-
85-
def computeTransitiveClosure[A:ClassTag](edges: RDD[(A, A)]): RDD[(A, A)] = {
86-
log.info("computing TC...")
87-
// we keep the transitive closure cached
88-
var tc = edges
89-
tc.cache()
90-
91-
// because join() joins on keys, in addition the pairs are stored in reversed order (o, s)
92-
val edgesReversed = tc.map(t => (t._2, t._1))
93-
94-
// the join is iterated until a fixed point is reached
95-
var i = 1
96-
var oldCount = 0L
97-
var nextCount = tc.count()
98-
do {
99-
log.info(s"iteration $i...")
100-
oldCount = nextCount
101-
// perform the join (x, y) x (y, x), obtaining an RDD of (x=y, (y, x)) pairs,
102-
// then project the result to obtain the new (x, y) paths.
103-
tc = tc
104-
.union(tc.join(edgesReversed).map(x => (x._2._2, x._2._1)))
105-
.distinct()
106-
.cache()
107-
nextCount = tc.count()
108-
i += 1
109-
} while (nextCount != oldCount)
110-
111-
println("TC has " + nextCount + " edges.")
112-
tc
113-
}
114-
115-
def computeTransitiveClosure[A:ClassTag](edges: DataFrame): DataFrame = {
116-
log.info("computing TC...")
117-
118-
profile {
119-
// we keep the transitive closure cached
120-
var tc = edges
121-
tc.cache()
122-
123-
// the join is iterated until a fixed point is reached
124-
var i = 1
125-
var oldCount = 0L
126-
var nextCount = tc.count()
127-
do {
128-
log.info(s"iteration $i...")
129-
oldCount = nextCount
130-
131-
// val df1 = tc.alias("df1")
132-
// val df2 = tc.alias("df2")
133-
// perform the join (x, y) x (y, x), obtaining an RDD of (x=y, (y, x)) pairs,
134-
// then project the result to obtain the new (x, y) paths.
135-
136-
tc.createOrReplaceTempView("SC")
137-
var joined = tc.sqlContext.sql("SELECT A.subject, A.predicate, B.object FROM SC A INNER JOIN SC B ON A.object = B.subject")
138-
139-
// joined.explain()
140-
// var joined = df1.join(df2, df1("object") === df2("subject"), "inner")
141-
// println("JOINED:\n" + joined.collect().mkString("\n"))
142-
// joined = joined.select(df2(s"df1.$col1"), df1(s"df1.$col2"))
143-
// println(joined.collect().mkString("\n"))
144-
145-
tc = tc
146-
.union(joined)
147-
.distinct()
148-
.cache()
149-
nextCount = tc.count()
150-
i += 1
151-
} while (nextCount != oldCount)
152-
153-
tc.sqlContext.uncacheTable("SC")
154-
log.info("TC has " + nextCount + " edges.")
155-
tc
156-
}
157-
}
158-
15928
/**
16029
* Extracts all triples for the given predicate.
16130
*

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/forwardchaining/ForwardRuleReasonerRDFS.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ class ForwardRuleReasonerRDFS(sc: SparkContext) extends ForwardRuleReasoner{
3434
* yyy rdfs:subClassOf zzz . xxx rdfs:subClassOf zzz .
3535
*/
3636
val subClassOfTriples = extractTriples(triplesRDD, RDFS.subClassOf.getURI) // extract rdfs:subClassOf triples
37-
val subClassOfTriplesTrans = computeTransitiveClosure(subClassOfTriples).setName("rdfs11")//mutable.Set()++subClassOfTriples.collect())
37+
val subClassOfTriplesTrans = computeTransitiveClosure(subClassOfTriples, RDFS.subClassOf.getURI).setName("rdfs11")//mutable.Set()++subClassOfTriples.collect())
3838

3939
/*
4040
rdfs5 xxx rdfs:subPropertyOf yyy .
4141
yyy rdfs:subPropertyOf zzz . xxx rdfs:subPropertyOf zzz .
4242
*/
4343
val subPropertyOfTriples = extractTriples(triplesRDD, RDFS.subPropertyOf.getURI) // extract rdfs:subPropertyOf triples
44-
val subPropertyOfTriplesTrans = computeTransitiveClosure(subPropertyOfTriples).setName("rdfs5")//extractTriples(mutable.Set()++subPropertyOfTriples.collect(), RDFS.subPropertyOf.getURI))
44+
val subPropertyOfTriplesTrans = computeTransitiveClosure(subPropertyOfTriples, RDFS.subPropertyOf.getURI).setName("rdfs5")//extractTriples(mutable.Set()++subPropertyOfTriples.collect(), RDFS.subPropertyOf.getURI))
4545

4646
// a map structure should be more efficient
4747
val subClassOfMap = CollectionUtils.toMultiMap(subClassOfTriplesTrans.map(t => (t.subject, t.`object`)).collect)
@@ -99,31 +99,33 @@ class ForwardRuleReasonerRDFS(sc: SparkContext) extends ForwardRuleReasoner{
9999
.map(t => RDFTriple(t.`object`, RDF.`type`.getURI, rangeMapBC.value(t.predicate)))
100100
.setName("rdfs3")
101101

102+
// extract the rdf:type triples
103+
var typeTriples = triplesRDD
104+
.filter(t => t.predicate == RDF.`type`.getURI)
105+
106+
// rdfs2 and rdfs3 generated rdf:type triples which we'll add to the existing ones
102107
val triples23 = triplesRDFS2.union(triplesRDFS3)
103108

104-
// get rdf:type tuples here as intermediate result
105-
val typeTriples = triplesRDD
106-
.filter(t => t.predicate == RDF.`type`.getURI)
107-
.union(triples23)
109+
// all rdf:type triples here as intermediate result
110+
typeTriples = typeTriples.union(triples23)
108111

109-
// 4. SubClass inheritance according to rdfs9
110112

113+
// 4. SubClass inheritance according to rdfs9
111114
/*
112115
rdfs9 xxx rdfs:subClassOf yyy .
113116
zzz rdf:type xxx . zzz rdf:type yyy .
114117
*/
115118
val triplesRDFS9 =
116119
typeTriples // all rdf:type triples (s a A)
117120
.filter(t => subClassOfMapBC.value.contains(t.`object`)) // such that A has a super class B
118-
.flatMap(t => subClassOfMapBC.value(t.`object`)
119-
.map(supCls => RDFTriple(t.subject, RDF.`type`.getURI, supCls))) // create triple (s a B)
121+
.flatMap(t => subClassOfMapBC.value(t.`object`).map(supCls => RDFTriple(t.subject, RDF.`type`.getURI, supCls))) // create triple (s a B)
120122
.setName("rdfs9")
121123

122124
// 5. merge triples and remove duplicates
123125
val allTriples = sc.union(Seq(
124126
subClassOfTriplesTrans,
125127
subPropertyOfTriplesTrans,
126-
triples23,
128+
typeTriples,
127129
triplesRDFS7,
128130
triplesRDFS9))
129131
.distinct()
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package net.sansa_stack.inference.spark.forwardchaining
2+
3+
import net.sansa_stack.inference.data.RDFTriple
4+
import net.sansa_stack.inference.utils.Profiler
5+
import org.apache.spark.rdd.RDD
6+
import org.apache.spark.sql.DataFrame
7+
8+
import scala.collection.mutable
9+
import scala.reflect.ClassTag
10+
11+
/**
12+
* An engine to compute the transitive closure (TC) for a set of triples given in several datastructures.
13+
*
14+
* @author Lorenz Buehmann
15+
*/
16+
trait TransitiveReasoner extends Profiler{
17+
18+
// def computeTransitiveClosure[A, B, C](s: mutable.Set[(A, B, C)]): mutable.Set[(A, B, C)] = {
19+
// val t = addTransitive(s)
20+
// // recursive call if set changed, otherwise stop and return
21+
// if (t.size == s.size) s else computeTransitiveClosure(t)
22+
// }
23+
24+
/**
25+
* Computes the transitive closure on a set of triples, i.e. it is computed in-memory by the driver.
26+
* Note, that the assumption is that all triples do have the same predicate.
27+
*
28+
* @param triples the set of triples
29+
* @return a set containing the transitive closure of the triples
30+
*/
31+
def computeTransitiveClosure(triples: mutable.Set[RDFTriple]): mutable.Set[RDFTriple] = {
32+
val tc = addTransitive(triples)
33+
// recursive call if set changed, otherwise stop and return
34+
if (tc.size == triples.size) triples else computeTransitiveClosure(tc)
35+
}
36+
37+
// def addTransitive[A, B, C](s: mutable.Set[(A, B, C)]) = {
38+
// s ++ (for ((s1, p1, o1) <- s; (s2, p2, o2) <- s if o1 == s2) yield (s1, p1, o2))
39+
// }
40+
41+
private def addTransitive(triples: mutable.Set[RDFTriple]) = {
42+
triples ++ (for (t1 <- triples; t2 <- triples if t1.`object` == t2.subject) yield RDFTriple(t1.subject, t1.predicate, t2.`object`))
43+
}
44+
45+
/**
46+
* Computes the transitive closure on an RDD of triples.
47+
* Note, that the assumption is that all triples do have the same predicate.
48+
*
49+
* @param triples the RDD of triples
50+
* @return an RDD containing the transitive closure of the triples
51+
*/
52+
def computeTransitiveClosure(triples: RDD[RDFTriple]): RDD[RDFTriple] = {
53+
// get the predicate
54+
val predicate = triples.take(1)(0).predicate
55+
56+
// compute TC
57+
computeTransitiveClosure(triples, predicate)
58+
}
59+
60+
/**
61+
* Computes the transitive closure for the given predicate on an RDD of triples.
62+
*
63+
* @param triples the RDD of triples
64+
* @param predicate the predicate
65+
* @return an RDD containing the transitive closure of the triples
66+
*/
67+
def computeTransitiveClosure(triples: RDD[RDFTriple], predicate: String): RDD[RDFTriple] = {
68+
if(triples.isEmpty()) return triples
69+
log.info(s"computing TC for property $predicate...")
70+
71+
profile {
72+
// compute the TC
73+
var subjectObjectPairs = triples.map(t => (t.subject, t.`object`)).cache()
74+
75+
// because join() joins on keys, in addition the pairs are stored in reversed order (o, s)
76+
val objectSubjectPairs = subjectObjectPairs.map(t => (t._2, t._1))
77+
78+
// the join is iterated until a fixed point is reached
79+
var i = 1
80+
var oldCount = 0L
81+
var nextCount = triples.count()
82+
do {
83+
log.info(s"iteration $i...")
84+
oldCount = nextCount
85+
// perform the join (s1, o1) x (o2, s2), obtaining an RDD of (s1=o2, (o1, s2)) pairs,
86+
// then project the result to obtain the new (s2, o1) paths.
87+
subjectObjectPairs = subjectObjectPairs
88+
.union(subjectObjectPairs.join(objectSubjectPairs).map(x => (x._2._2, x._2._1)))
89+
.filter(tuple => tuple._1 != tuple._2) // omit (s1, s1)
90+
.distinct()
91+
.cache()
92+
nextCount = subjectObjectPairs.count()
93+
i += 1
94+
} while (nextCount != oldCount)
95+
96+
log.info(s"TC for $predicate has " + nextCount + " triples.")
97+
subjectObjectPairs.map(p => new RDFTriple(p._1, predicate, p._2))
98+
}
99+
}
100+
101+
/**
102+
* Computes the transitive closure for an RDD of tuples
103+
*
104+
* @param edges the RDD of triples
105+
* @return an RDD containing the transitive closure of the tuples
106+
*/
107+
def computeTransitiveClosure[A:ClassTag](edges: RDD[(A, A)]): RDD[(A, A)] = {
108+
log.info("computing TC...")
109+
// we keep the transitive closure cached
110+
var tc = edges
111+
tc.cache()
112+
113+
// because join() joins on keys, in addition the pairs are stored in reversed order (o, s)
114+
val edgesReversed = tc.map(t => (t._2, t._1))
115+
116+
// the join is iterated until a fixed point is reached
117+
var i = 1
118+
var oldCount = 0L
119+
var nextCount = tc.count()
120+
do {
121+
log.info(s"iteration $i...")
122+
oldCount = nextCount
123+
// perform the join (x, y) x (y, x), obtaining an RDD of (x=y, (y, x)) pairs,
124+
// then project the result to obtain the new (x, y) paths.
125+
tc = tc
126+
.union(tc.join(edgesReversed).map(x => (x._2._2, x._2._1)))
127+
.distinct()
128+
.cache()
129+
nextCount = tc.count()
130+
i += 1
131+
} while (nextCount != oldCount)
132+
133+
println("TC has " + nextCount + " edges.")
134+
tc
135+
}
136+
137+
/**
138+
* Computes the transitive closure for a Dataframe of triples
139+
*
140+
* @param edges the Dataframe of triples
141+
* @return a Dataframe containing the transitive closure of the triples
142+
*/
143+
def computeTransitiveClosure[A:ClassTag](edges: DataFrame): DataFrame = {
144+
log.info("computing TC...")
145+
146+
profile {
147+
// we keep the transitive closure cached
148+
var tc = edges
149+
tc.cache()
150+
151+
// the join is iterated until a fixed point is reached
152+
var i = 1
153+
var oldCount = 0L
154+
var nextCount = tc.count()
155+
do {
156+
log.info(s"iteration $i...")
157+
oldCount = nextCount
158+
159+
// val df1 = tc.alias("df1")
160+
// val df2 = tc.alias("df2")
161+
// perform the join (x, y) x (y, x), obtaining an RDD of (x=y, (y, x)) pairs,
162+
// then project the result to obtain the new (x, y) paths.
163+
164+
tc.createOrReplaceTempView("SC")
165+
var joined = tc.sqlContext.sql("SELECT A.subject, A.predicate, B.object FROM SC A INNER JOIN SC B ON A.object = B.subject")
166+
167+
// joined.explain()
168+
// var joined = df1.join(df2, df1("object") === df2("subject"), "inner")
169+
// println("JOINED:\n" + joined.collect().mkString("\n"))
170+
// joined = joined.select(df2(s"df1.$col1"), df1(s"df1.$col2"))
171+
// println(joined.collect().mkString("\n"))
172+
173+
tc = tc
174+
.union(joined)
175+
.distinct()
176+
.cache()
177+
nextCount = tc.count()
178+
i += 1
179+
} while (nextCount != oldCount)
180+
181+
tc.sqlContext.uncacheTable("SC")
182+
log.info("TC has " + nextCount + " edges.")
183+
tc
184+
}
185+
}
186+
187+
}

0 commit comments

Comments (0)