Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 8a4d6d3

Browse files
Flink reasoning on Triple DataSet
1 parent af1ff94 commit 8a4d6d3

4 files changed

Lines changed: 270 additions & 121 deletions

File tree

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/ForwardRuleReasoner.scala

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import org.apache.flink.api.scala.DataSet
55
import scala.collection.mutable
66

77
import org.apache.jena.graph.{Node, Triple}
8+
import org.apache.jena.shared.PrefixMapping
9+
import org.apache.jena.sparql.util.FmtUtils
810

911
/**
1012
* A forward chaining based reasoner.
@@ -50,7 +52,7 @@ trait ForwardRuleReasoner extends TransitiveReasoner{
5052
* @return the DataSet of triples that contain the predicate
5153
*/
5254
def extractTriples(triples: DataSet[Triple], predicate: Node): DataSet[Triple] = {
53-
triples.filter(triple => triple.predicateMatches(predicate))
55+
triples.filter(triple => triple.predicateMatches(predicate)).name(s"${FmtUtils.stringForNode(predicate)} triples")
5456
}
5557

5658
/**
@@ -66,21 +68,33 @@ trait ForwardRuleReasoner extends TransitiveReasoner{
6668
subject: Option[Node],
6769
predicate: Option[Node],
6870
obj: Option[Node]): DataSet[Triple] = {
69-
var extractedTriples = triples
70-
71-
if(subject.isDefined) {
72-
extractedTriples = extractedTriples.filter(triple => triple.subjectMatches(subject.get))
73-
}
74-
75-
if(predicate.isDefined) {
76-
extractedTriples = extractedTriples.filter(triple => triple.predicateMatches(predicate.get))
77-
}
71+
// import net.sansa_stack.inference.utils.PredicateUtils._
72+
// var extractedTriples = triples
73+
// var filter = (t: Triple) => true
74+
//
75+
// if(subject.isDefined) {
76+
// filter = filter || (_.subjectMatches(subject.get))
77+
//// extractedTriples = extractedTriples.filter(triple => triple.subjectMatches(subject.get))
78+
// }
79+
//
80+
// if(predicate.isDefined) {
81+
// filter = filter || (_.predicateMatches(predicate.get))
82+
//// extractedTriples = extractedTriples.filter(triple => triple.predicateMatches(predicate.get))
83+
// }
84+
//
85+
// if(obj.isDefined) {
86+
// filter = filter || (_.objectMatches(obj.get))
87+
//// extractedTriples = extractedTriples.filter(triple => triple.objectMatches(obj.get))
88+
// }
89+
//
90+
// extractedTriples.filter(filter)
7891

79-
if(obj.isDefined) {
80-
extractedTriples = extractedTriples.filter(triple => triple.objectMatches(obj.get))
81-
}
92+
val filterFct = (t: Triple) =>
93+
t.subjectMatches(subject.orNull) ||
94+
t.predicateMatches(predicate.orNull) ||
95+
t.objectMatches(obj.orNull)
8296

83-
extractedTriples
97+
triples.filter(filterFct)
8498
}
8599

86100
}

0 commit comments

Comments
 (0)