Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 50cf3b4

Browse files
Split RDD into rdf:type triples and other triples.
1 parent ef512d1 commit 50cf3b4

1 file changed

Lines changed: 14 additions & 10 deletions

File tree

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/forwardchaining/ForwardRuleReasonerRDFS.scala

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.apache.jena.vocabulary.{RDF, RDFS}
44
import org.apache.spark.SparkContext
55
import net.sansa_stack.inference.data.RDFTriple
66
import net.sansa_stack.inference.spark.data.RDFGraph
7+
import net.sansa_stack.inference.spark.utils.RDDUtils
78
import net.sansa_stack.inference.utils.CollectionUtils
89
import org.slf4j.LoggerFactory
910

@@ -52,6 +53,12 @@ class ForwardRuleReasonerRDFS(sc: SparkContext) extends ForwardRuleReasoner{
5253
val subClassOfMapBC = sc.broadcast(subClassOfMap)
5354
val subPropertyMapBC = sc.broadcast(subPropertyMap)
5455

56+
import net.sansa_stack.inference.spark.utils.RDDUtils.RDDOps
57+
58+
// split by rdf:type
59+
val split = triplesRDD.partitionBy(t => t.predicate == RDF.`type`.getURI)
60+
var typeTriples = split._1
61+
var otherTriples = split._2
5562

5663
// 2. SubPropertyOf inheritance according to rdfs7 is computed
5764

@@ -60,14 +67,14 @@ class ForwardRuleReasonerRDFS(sc: SparkContext) extends ForwardRuleReasoner{
6067
xxx aaa yyy . xxx bbb yyy .
6168
*/
6269
val triplesRDFS7 =
63-
triplesRDD // all triples (s p1 o)
70+
otherTriples // all triples (s p1 o)
6471
.filter(t => subPropertyMapBC.value.contains(t.predicate)) // such that p1 has a super property p2
6572
.flatMap(t => subPropertyMapBC.value(t.predicate)
6673
.map(supProp => RDFTriple(t.subject, supProp, t.`object`))) // create triple (s p2 o)
6774
.setName("rdfs7")
6875

6976
// add triples
70-
triplesRDD = triplesRDD.union(triplesRDFS7)
77+
otherTriples = otherTriples.union(triplesRDFS7)
7178

7279
// 3. Domain and Range inheritance according to rdfs2 and rdfs3 is computed
7380

@@ -80,7 +87,7 @@ class ForwardRuleReasonerRDFS(sc: SparkContext) extends ForwardRuleReasoner{
8087
val domainMapBC = sc.broadcast(domainMap)
8188

8289
val triplesRDFS2 =
83-
triplesRDD
90+
otherTriples
8491
.filter(t => domainMapBC.value.contains(t.predicate))
8592
.map(t => RDFTriple(t.subject, RDF.`type`.getURI, domainMapBC.value(t.predicate)))
8693
.setName("rdfs2")
@@ -94,15 +101,11 @@ class ForwardRuleReasonerRDFS(sc: SparkContext) extends ForwardRuleReasoner{
94101
val rangeMapBC = sc.broadcast(rangeMap)
95102

96103
val triplesRDFS3 =
97-
triplesRDD
104+
otherTriples
98105
.filter(t => rangeMapBC.value.contains(t.predicate))
99106
.map(t => RDFTriple(t.`object`, RDF.`type`.getURI, rangeMapBC.value(t.predicate)))
100107
.setName("rdfs3")
101108

102-
// extract the rdf:type triples
103-
var typeTriples = triplesRDD
104-
.filter(t => t.predicate == RDF.`type`.getURI)
105-
106109
// rdfs2 and rdfs3 generated rdf:type triples which we'll add to the existing ones
107110
val triples23 = triplesRDFS2.union(triplesRDFS3)
108111

@@ -123,6 +126,7 @@ class ForwardRuleReasonerRDFS(sc: SparkContext) extends ForwardRuleReasoner{
123126

124127
// 5. merge triples and remove duplicates
125128
val allTriples = sc.union(Seq(
129+
otherTriples,
126130
subClassOfTriplesTrans,
127131
subPropertyOfTriplesTrans,
128132
typeTriples,
@@ -131,8 +135,8 @@ class ForwardRuleReasonerRDFS(sc: SparkContext) extends ForwardRuleReasoner{
131135
.distinct()
132136

133137
logger.info("...finished materialization in " + (System.currentTimeMillis() - startTime) + "ms.")
134-
val newSize = allTriples.count()
135-
logger.info(s"|G_inf|=$newSize")
138+
// val newSize = allTriples.count()
139+
// logger.info(s"|G_inf|=$newSize")
136140

137141
// return graph with inferred triples
138142
new RDFGraph(allTriples)

0 commit comments

Comments (0)