Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 272f0e7

Browse files
Flink reasoning with Jena data structures.
1 parent 7d231ec commit 272f0e7

9 files changed

Lines changed: 253 additions & 240 deletions

File tree

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/data/RDFGraph.scala

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package net.sansa_stack.inference.flink.data
22

3-
import net.sansa_stack.inference.flink.utils.DataSetUtils
43
import org.apache.flink.api.scala.{DataSet, _}
5-
import org.apache.jena.graph.Triple
6-
import net.sansa_stack.inference.data.RDFTriple
4+
import org.apache.jena.graph.{Node, Triple}
5+
76
import net.sansa_stack.inference.flink.utils.DataSetUtils.DataSetOps
87

98
/**
@@ -12,7 +11,7 @@ import net.sansa_stack.inference.flink.utils.DataSetUtils.DataSetOps
1211
* @author Lorenz Buehmann
1312
*
1413
*/
15-
case class RDFGraph(triples: DataSet[RDFTriple]) {
14+
case class RDFGraph(triples: DataSet[Triple]) {
1615

1716
/**
1817
* Returns a DataSet of triples that match the given input.
@@ -22,11 +21,11 @@ case class RDFGraph(triples: DataSet[RDFTriple]) {
2221
* @param o the object
2322
* @return DataSet of triples
2423
*/
25-
def find(s: Option[String] = None, p: Option[String] = None, o: Option[String] = None): DataSet[RDFTriple] = {
24+
def find(s: Option[Node] = None, p: Option[Node] = None, o: Option[Node] = None): DataSet[Triple] = {
2625
triples.filter(t =>
27-
(s == None || t.s == s.get) &&
28-
(p == None || t.p == p.get) &&
29-
(o == None || t.o == o.get)
26+
(s.isEmpty || t.subjectMatches(s.get)) &&
27+
(p.isEmpty || t.predicateMatches(p.get)) &&
28+
(o.isEmpty || t.objectMatches(o.get))
3029
)
3130
}
3231

@@ -35,11 +34,11 @@ case class RDFGraph(triples: DataSet[RDFTriple]) {
3534
*
3635
* @return DataSet of triples
3736
*/
38-
def find(triple: Triple): DataSet[RDFTriple] = {
37+
def find(triple: Triple): DataSet[Triple] = {
3938
find(
40-
if (triple.getSubject.isVariable) None else Option(triple.getSubject.toString),
41-
if (triple.getPredicate.isVariable) None else Option(triple.getPredicate.toString),
42-
if (triple.getObject.isVariable) None else Option(triple.getObject.toString)
39+
if (triple.getSubject.isVariable) None else Option(triple.getSubject),
40+
if (triple.getPredicate.isVariable) None else Option(triple.getPredicate),
41+
if (triple.getObject.isVariable) None else Option(triple.getObject)
4342
)
4443
}
4544

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/data/RDFGraphLoader.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import net.sansa_stack.inference.data.RDFTriple
99
import org.apache.flink.configuration.Configuration
1010
import scala.language.implicitConversions
1111

12+
import org.apache.jena.rdf.model.impl.NTripleReader
13+
1214
import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple
1315

1416
/**
@@ -33,6 +35,7 @@ object RDFGraphLoader {
3335
// set the recursive enumeration parameter
3436
parameters.setBoolean("recursive.file.enumeration", true)
3537

38+
3639
// pass the configuration to the data source
3740
val triples = env.readTextFile(path.toString).withParameters(parameters)
3841
.map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
@@ -48,7 +51,8 @@ object RDFGraphLoader {
4851

4952
val converter = new NTriplesStringToRDFTriple()
5053

51-
val triples = tmp.map(f => env.readTextFile(f).flatMap(line => converter.apply(line))).reduce(_ union _).name("triples")
54+
val triples = tmp
55+
.map(f => env.readTextFile(f).flatMap(line => converter.apply(line))).reduce(_ union _).name("triples")
5256

5357
RDFGraph(triples)
5458
}

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/extraction/OWLHorstSchemaExtractor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ class OWLHorstSchemaExtractor()
4242
OWL2.allValuesFrom,
4343
OWL2.hasValue,
4444
OWL2.onProperty
45-
).map(p => p.getURI)
45+
).map(p => p.asNode())
4646
)(
4747
Set(
4848
OWL2.TransitiveProperty,
4949
OWL2.FunctionalProperty,
5050
OWL2.InverseFunctionalProperty,
5151
OWL2.SymmetricProperty
52-
).map(p => p.getURI)
52+
).map(p => p.asNode())
5353
) {}

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/extraction/RDFSSchemaExtractor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import org.apache.jena.vocabulary.RDFS
1515
* @author Lorenz Buehmann
1616
*/
1717
class RDFSSchemaExtractor()
18-
extends SchemaExtractor()(Set(RDFS.subClassOf, RDFS.subPropertyOf, RDFS.domain, RDFS.range).map(p => p.getURI))() {}
18+
extends SchemaExtractor()(Set(RDFS.subClassOf, RDFS.subPropertyOf, RDFS.domain, RDFS.range).map(p => p.asNode()))() {}
1919

2020
object RDFSSchemaExtractor {
2121
def apply: RDFSSchemaExtractor = new RDFSSchemaExtractor()

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/extraction/SchemaExtractor.scala

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,41 @@
11
package net.sansa_stack.inference.flink.extraction
22

33
import org.apache.flink.api.scala.DataSet
4-
import org.apache.jena.vocabulary.RDFS
4+
import org.apache.jena.graph.{Node, Triple}
55

6-
import net.sansa_stack.inference.data.RDFTriple
76
import net.sansa_stack.inference.flink.data.RDFGraph
87
import net.sansa_stack.inference.utils.Logging
98

109
/**
1110
* @author Lorenz Buehmann
1211
*/
1312
abstract class SchemaExtractor
14-
(subjects: Set[String] = Set())
15-
(predicates: Set[String] = Set())
16-
(objects: Set[String] = Set())
13+
(subjects: Set[Node] = Set())
14+
(predicates: Set[Node] = Set())
15+
(objects: Set[Node] = Set())
1716
extends Logging with Serializable{
1817

19-
val subjectsFilter: ((RDFTriple) => Boolean) = t => subjects.contains(t.s)
20-
val predicatesFilter: ((RDFTriple) => Boolean) = t => predicates.contains(t.p)
21-
val objectsFilter: ((RDFTriple) => Boolean) = t => objects.contains(t.o)
18+
val subjectsFilter: ((Triple) => Boolean) = t => subjects.contains(t.getSubject)
19+
val predicatesFilter: ((Triple) => Boolean) = t => predicates.contains(t.getPredicate)
20+
val objectsFilter: ((Triple) => Boolean) = t => objects.contains(t.getObject)
2221

23-
private def or(ps: (RDFTriple => Boolean)*) = (a: RDFTriple) => ps.exists(_(a))
22+
private def or(ps: (Triple => Boolean)*) = (a: Triple) => ps.exists(_(a))
2423

2524
/**
2625
* Extract a graph that contains only the schema triples.
2726
*
2827
* @param graph the graph
2928
* @return a graph containing only the schema triples
3029
*/
31-
def extract(graph: RDFGraph): RDFGraph =
32-
new RDFGraph(extract(graph.triples))
30+
def extract(graph: RDFGraph): RDFGraph = RDFGraph(extract(graph.triples))
3331

3432
/**
3533
* Extract a DataSet that contains only the schema triples.
3634
*
3735
* @param triples the triples
3836
* @return the schema triples
3937
*/
40-
def extract(triples: DataSet[RDFTriple]): DataSet[RDFTriple] =
38+
def extract(triples: DataSet[Triple]): DataSet[Triple] =
4139
triples
4240
.filter(or(subjectsFilter, predicatesFilter, objectsFilter))
4341
.name("schema-triples")

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/ForwardRuleReasoner.scala

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package net.sansa_stack.inference.flink.forwardchaining
22

3-
import net.sansa_stack.inference.data.RDFTriple
43
import net.sansa_stack.inference.flink.data.RDFGraph
54
import org.apache.flink.api.scala.DataSet
6-
75
import scala.collection.mutable
86

7+
import org.apache.jena.graph.{Node, Triple}
8+
99
/**
1010
* A forward chaining based reasoner.
1111
*
@@ -22,15 +22,24 @@ trait ForwardRuleReasoner extends TransitiveReasoner{
2222
*/
2323
def apply(graph: RDFGraph) : RDFGraph
2424

25+
/**
26+
* Applies forward chaining to the given set of RDF triples and returns a new set of RDF triples that
27+
* contains all additional triples based on the underlying set of rules.
28+
*
29+
* @param triples the RDF triples
30+
* @return the materialized RDF triples
31+
*/
32+
def apply(triples: DataSet[Triple]) : DataSet[Triple] = apply(RDFGraph(triples)).triples
33+
2534
/**
2635
* Extracts all triples for the given predicate.
2736
*
2837
* @param triples the triples
2938
* @param predicate the predicate
3039
* @return the set of triples that contain the predicate
3140
*/
32-
def extractTriples(triples: mutable.Set[RDFTriple], predicate: String): mutable.Set[RDFTriple] = {
33-
triples.filter(triple => triple.p == predicate)
41+
def extractTriples(triples: mutable.Set[Triple], predicate: Node): mutable.Set[Triple] = {
42+
triples.filter(triple => triple.predicateMatches(predicate))
3443
}
3544

3645
/**
@@ -40,8 +49,8 @@ trait ForwardRuleReasoner extends TransitiveReasoner{
4049
* @param predicate the predicate
4150
* @return the DataSet of triples that contain the predicate
4251
*/
43-
def extractTriples(triples: DataSet[RDFTriple], predicate: String): DataSet[RDFTriple] = {
44-
triples.filter(triple => triple.p == predicate)
52+
def extractTriples(triples: DataSet[Triple], predicate: Node): DataSet[Triple] = {
53+
triples.filter(triple => triple.predicateMatches(predicate))
4554
}
4655

4756
/**
@@ -53,22 +62,22 @@ trait ForwardRuleReasoner extends TransitiveReasoner{
5362
* @param obj the object
5463
* @return the DataSet of triples that match
5564
*/
56-
def extractTriples(triples: DataSet[RDFTriple],
57-
subject: Option[String],
58-
predicate: Option[String],
59-
obj: Option[String]): DataSet[RDFTriple] = {
65+
def extractTriples(triples: DataSet[Triple],
66+
subject: Option[Node],
67+
predicate: Option[Node],
68+
obj: Option[Node]): DataSet[Triple] = {
6069
var extractedTriples = triples
6170

6271
if(subject.isDefined) {
63-
extractedTriples = extractedTriples.filter(triple => triple.s == subject.get)
72+
extractedTriples = extractedTriples.filter(triple => triple.subjectMatches(subject.get))
6473
}
6574

6675
if(predicate.isDefined) {
67-
extractedTriples = extractedTriples.filter(triple => triple.p == predicate.get)
76+
extractedTriples = extractedTriples.filter(triple => triple.predicateMatches(predicate.get))
6877
}
6978

7079
if(obj.isDefined) {
71-
extractedTriples = extractedTriples.filter(triple => triple.o == obj.get)
80+
extractedTriples = extractedTriples.filter(triple => triple.objectMatches(obj.get))
7281
}
7382

7483
extractedTriples

0 commit comments

Comments
 (0)