Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit c9d9177

Browse files
committed
Implement intersection and difference on RDFGraph model
1 parent bb1a149 commit c9d9177

1 file changed

Lines changed: 51 additions & 39 deletions

File tree

  • sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/model

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/model/RDFGraph.scala

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,90 @@
11
package net.sansa_stack.inference.spark.data.model
22

3-
import org.apache.jena.graph.{Node, Triple}
3+
import org.apache.jena.graph.{ Node, Triple }
44
import org.apache.spark.rdd.RDD
5-
import org.apache.spark.sql.types.{StringType, StructField, StructType}
6-
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
5+
import org.apache.spark.sql.types.{ StringType, StructField, StructType }
6+
import org.apache.spark.sql.{ DataFrame, Row, SQLContext }
77

88
import net.sansa_stack.inference.spark.data.model.TripleUtils._
99

10-
11-
1210
/**
13-
* A data structure that comprises a set of triples.
14-
*
15-
* @author Lorenz Buehmann
16-
*
17-
*/
18-
case class RDFGraph (triples: RDD[Triple]) {
11+
* A data structure that comprises a set of triples.
12+
*
13+
* @author Lorenz Buehmann
14+
*
15+
*/
16+
case class RDFGraph(triples: RDD[Triple]) {
1917

2018
/**
21-
* Returns an RDD of triples that match with the given input.
22-
*
23-
* @param s the subject
24-
* @param p the predicate
25-
* @param o the object
26-
* @return RDD of triples
27-
*/
19+
* Returns an RDD of triples that match with the given input.
20+
*
21+
* @param s the subject
22+
* @param p the predicate
23+
* @param o the object
24+
* @return RDD of triples
25+
*/
2826
def find(s: Option[Node] = None, p: Option[Node] = None, o: Option[Node] = None): RDD[Triple] = {
29-
triples.filter(t =>
30-
(s == None || t.getSubject == s.get) &&
31-
(p == None || t.getPredicate == p.get) &&
32-
(o == None || t.getObject == o.get)
33-
)
27+
triples.filter(t =>
28+
(s == None || t.getSubject == s.get) &&
29+
(p == None || t.getPredicate == p.get) &&
30+
(o == None || t.getObject == o.get))
3431
}
3532

3633
/**
37-
* Returns an RDD of triples that match with the given input.
38-
*
39-
* @return RDD of triples
40-
*/
34+
* Returns an RDD of triples that match with the given input.
35+
*
36+
* @return RDD of triples
37+
*/
4138
def find(triple: Triple): RDD[Triple] = {
4239
find(
4340
if (triple.getSubject.isVariable) None else Option(triple.getSubject),
4441
if (triple.getPredicate.isVariable) None else Option(triple.getPredicate),
45-
if (triple.getObject.isVariable) None else Option(triple.getObject)
46-
)
42+
if (triple.getObject.isVariable) None else Option(triple.getObject))
4743
}
4844

4945
/**
50-
* Return the union of the current RDF graph with the given RDF graph
51-
* @param graph the other RDF graph
52-
* @return the union of both graphs
53-
*/
46+
* Return the union of the current RDF graph with the given RDF graph
47+
* @param graph the other RDF graph
48+
* @return the union of both graphs
49+
*/
5450
def union(graph: RDFGraph): RDFGraph = {
5551
RDFGraph(triples.union(graph.triples))
5652
}
5753

5854
/**
59-
* Persist the triples RDD with the default storage level (`MEMORY_ONLY`).
55+
* Returns a new RDF graph that contains the intersection of the current RDF graph with the given RDF graph.
56+
*
57+
* @param graph the other RDF graph
58+
* @return the intersection of both RDF graphs
59+
*/
60+
def intersection(graph: RDFGraph): RDFGraph =
61+
new RDFGraph(this.triples.intersection(graph.triples))
62+
63+
/**
64+
* Returns a new RDF graph that contains the difference between the current RDF graph and the given RDF graph.
65+
*
66+
* @param graph the other RDF graph
67+
* @return the difference of both RDF graphs
6068
*/
69+
def difference(graph: RDFGraph): RDFGraph =
70+
new RDFGraph(this.triples.subtract(graph.triples))
71+
72+
/**
73+
* Persist the triples RDD with the default storage level (`MEMORY_ONLY`).
74+
*/
6175
def cache(): RDFGraph = {
6276
triples.cache()
6377
this
6478
}
6579

6680
/**
67-
* Return the number of triples.
68-
* @return the number of triples
69-
*/
81+
* Return the number of triples.
82+
* @return the number of triples
83+
*/
7084
def size(): Long = {
7185
triples.count()
7286
}
7387

74-
75-
7688
def toDataFrame(sqlContext: SQLContext): DataFrame = {
7789
// convert RDD to DataFrame
7890
val schemaString = "subject predicate object"

0 commit comments

Comments
 (0)