This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit ae1674f

Separate RDFS schema extractor
1 parent 95a560e commit ae1674f

3 files changed, 10 additions & 8 deletions


sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/forwardchaining/ForwardRuleReasonerRDFS.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ class ForwardRuleReasonerRDFS(sc: SparkContext, parallelism: Int = 2) extends Tr
 
     // as an optimization, we can extract all schema triples first which avoids to run on the whole dataset
     // for each schema triple later
-    val schemaTriples = if (extractSchemaTriplesInAdvance) new RDFSSchemaExtractor(sc).extract(triplesRDD)
+    val schemaTriples = if (extractSchemaTriplesInAdvance) new RDFSSchemaExtractor().extract(triplesRDD)
     else triplesRDD
 
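The only change here is the call site: the extractor no longer takes a SparkContext, since extracting schema triples is just a filter over the input RDD. A minimal sketch of the updated usage, assuming an RDD[RDFTriple] input as in the reasoner above (the session setup and empty sample data are illustrative, not part of the commit):

import org.apache.spark.sql.SparkSession
import net.sansa_stack.inference.data.RDFTriple // assumed import path for the triple type
import net.sansa_stack.inference.spark.utils.RDFSSchemaExtractor

val session = SparkSession.builder().master("local[*]").appName("schema-extraction").getOrCreate()

// hypothetical input; in the reasoner this is the full dataset
val triplesRDD = session.sparkContext.parallelize(Seq.empty[RDFTriple])

// parameterless constructor after this commit
val schemaTriples = new RDFSSchemaExtractor().extract(triplesRDD)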

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/forwardchaining/ForwardRuleReasonerRDFSDataframe.scala

Lines changed: 7 additions & 5 deletions
@@ -34,7 +34,7 @@ class ForwardRuleReasonerRDFSDataframe(session: SparkSession, parallelism: Int =
 
     val sqlSchema = graph.schema
 
-    val extractor = new RDFSSchemaExtractor(session.sparkContext)
+    val extractor = new RDFSSchemaExtractor()
 
     var index = extractor.extractWithIndex(graph)
 
@@ -145,7 +145,7 @@ class ForwardRuleReasonerRDFSDataframe(session: SparkSession, parallelism: Int =
 
     // get rdf:type tuples here as intermediate result
     val typeTuples = triples
-      .where(s"${sqlSchema.predicateCol} = '${RDF.`type`.getURI} '")
+      .where(s"${sqlSchema.predicateCol} = '${RDF.`type`.getURI}'")
       .select(sqlSchema.subjectCol, sqlSchema.objectCol)
       .union(tuples23)
       .alias("TYPES")
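The fix here is subtle: the interpolated filter string previously contained a stray blank between the rdf:type IRI and the closing quote, so the equality test compared against 'http://...#type ' and matched nothing. A tiny illustration of the difference (the column name is a stand-in for sqlSchema.predicateCol):

val predicateCol = "predicate" // assumed column name, for demonstration only
val typeURI = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"

val buggy = s"$predicateCol = '$typeURI '" // stray blank inside the literal
val fixed = s"$predicateCol = '$typeURI'"

// buggy: predicate = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type '
// fixed: predicate = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'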
@@ -156,7 +156,7 @@ class ForwardRuleReasonerRDFSDataframe(session: SparkSession, parallelism: Int =
        rdfs9   xxx rdfs:subClassOf yyy .
                zzz rdf:type xxx .        zzz rdf:type yyy .
      */
-    val tripleRDFS9 = typeTuples
+    val tuplesRDFS9 = typeTuples
       .join(subClassOfTriplesTrans, $"TYPES.${sqlSchema.objectCol}" === $"SC.${sqlSchema.subjectCol}", "inner")
       .select($"TYPES.${sqlSchema.subjectCol}", $"SC.${sqlSchema.objectCol}") // (zzz, yyy)
 
@@ -184,22 +184,24 @@ class ForwardRuleReasonerRDFSDataframe(session: SparkSession, parallelism: Int =
 //      .withColumn("const", lit(RDF.`type`.getURI))
 //      .select("DATA.subject", "const", "SC.object")
 //      .select($"TYPES.subject", $"SC.object") // (zzz, yyy)
+//    println("existing types:" + existingTypes.count())
 //    println("SC:" + subClassOfTriplesTrans.count())
 //    println("SP:" + subPropertyOfTriplesTrans.count())
 //    println("TYPES:" + typeTuples.count())
 //    println("R7:" + triplesRDFS7.count())
 //    println("R2:" + triplesRDFS2.count())
 //    println("R3:" + triplesRDFS3.count())
-//    println("R9:" + triplesRDFS9.count())
+//    println("R9:" + tuplesRDFS9.count())
 
     // 5. merge triples and remove duplicates
     val allTriples =
-      tuples23.union(tripleRDFS9)
+      typeTuples.union(tuples23).union(tuplesRDFS9)
         .withColumn("const", lit(RDF.`type`.getURI))
         .select(sqlSchema.subjectCol, "const", sqlSchema.objectCol)
         .union(subClassOfTriplesTrans)
         .union(subPropertyOfTriplesTrans)
         .union(triplesRDFS7)
+        .union(triples)
         .distinct()
 //      .selectExpr("subject", "'" + RDF.`type`.getURI + "' as predicate", "object")
 //    allTriples.explain()
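Besides the rename, the merge step now also folds the raw typeTuples and the original input triples into the final union, so the result contains the input graph plus all derived triples rather than only the rule outputs. A condensed sketch of the merge pattern with hypothetical sample data (the subclass/subproperty unions are elided for brevity):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").appName("merge-sketch").getOrCreate()
import spark.implicits._

val typeURI = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"

// (subject, object) pairs produced by the type-related rules; sample data only
val typeTuples = Seq(("s1", "C1")).toDF("subject", "object")
val tuples23 = Seq(("s2", "C2")).toDF("subject", "object")
val tuplesRDFS9 = Seq(("s1", "C0")).toDF("subject", "object")

// the original input triples, now kept in the result as well
val triples = Seq(("s1", typeURI, "C1")).toDF("subject", "predicate", "object")

val allTriples = typeTuples.union(tuples23).union(tuplesRDFS9)
  .withColumn("const", lit(typeURI)) // materialize rdf:type as the predicate
  .select($"subject", $"const", $"object")
  .union(triples) // positional union: (subject, const, object) lines up with (s, p, o)
  .distinct()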

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/utils/RDFSSchemaExtractor.scala

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@ import net.sansa_stack.inference.utils.{CollectionUtils, Logging}
  *
  * @author Lorenz Buehmann
  */
-class RDFSSchemaExtractor(sc : SparkContext) extends Logging{
+class RDFSSchemaExtractor() extends Logging with Serializable {
 
   val properties = Set(RDFS.subClassOf, RDFS.subPropertyOf, RDFS.domain, RDFS.range).map(p => p.getURI)
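Dropping the SparkContext field is what makes the `with Serializable` meaningful: SparkContext itself is not serializable, so any class holding one cannot be shipped inside task closures. A simplified before/after sketch of the pattern (hypothetical class names, not the actual SANSA code):

import org.apache.spark.SparkContext

// before: the sc field makes instances unserializable; referencing one
// inside an RDD closure fails with "Task not serializable"
class SchemaFilterBefore(sc: SparkContext) {
  def isSchemaProperty(p: String): Boolean = p.endsWith("subClassOf")
}

// after: no driver-side state, so instances can be captured by closures
// that run on the executors
class SchemaFilterAfter() extends Serializable {
  def isSchemaProperty(p: String): Boolean = p.endsWith("subClassOf")
}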

@@ -121,7 +121,7 @@ class RDFSSchemaExtractor(sc : SparkContext) extends Logging{
    * @return a mapping from the corresponding schema property to the broadcast variable that wraps the multimap
    *         with s-o pairs
    */
-  def extractWithIndexAndDistribute(graph: RDFGraphNative): Map[String, Broadcast[Map[String, Set[String]]]] = {
+  def extractWithIndexAndDistribute(sc : SparkContext, graph: RDFGraphNative): Map[String, Broadcast[Map[String, Set[String]]]] = {
     val schema = extractWithIndex(graph)
 
     log.info("Started schema distribution...")
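Since the extractor no longer owns a SparkContext, the one needed for broadcasting the indexed schema is passed in explicitly. A sketch of the updated call site under stated assumptions (the RDFGraphNative import path and the surrounding method are guesses; only the constructor and method signature come from the diff):

import org.apache.spark.SparkContext
import org.apache.jena.vocabulary.RDFS
import net.sansa_stack.inference.spark.data.RDFGraphNative // assumed import path
import net.sansa_stack.inference.spark.utils.RDFSSchemaExtractor

// hypothetical helper; sc and graph come from the surrounding reasoner
def distributeSchema(sc: SparkContext, graph: RDFGraphNative): Unit = {
  // the SparkContext is now an explicit argument instead of constructor state
  val index = new RDFSSchemaExtractor().extractWithIndexAndDistribute(sc, graph)

  // each entry maps a schema property IRI to a broadcast multimap of s-o pairs
  index.get(RDFS.subClassOf.getURI).foreach(bc => println(bc.value.size))
}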

0 commit comments