|
1 | 1 | package net.sansa_stack.inference.spark.data.model |
2 | 2 |
|
3 | | -import org.apache.jena.graph.{Node, Triple} |
| 3 | +import org.apache.jena.graph.{ Node, Triple } |
4 | 4 | import org.apache.spark.rdd.RDD |
5 | | -import org.apache.spark.sql.types.{StringType, StructField, StructType} |
6 | | -import org.apache.spark.sql.{DataFrame, Row, SQLContext} |
| 5 | +import org.apache.spark.sql.types.{ StringType, StructField, StructType } |
| 6 | +import org.apache.spark.sql.{ DataFrame, Row, SQLContext } |
7 | 7 |
|
8 | 8 | import net.sansa_stack.inference.spark.data.model.TripleUtils._ |
9 | 9 |
|
10 | | - |
11 | | - |
12 | 10 | /** |
13 | | - * A data structure that comprises a set of triples. |
14 | | - * |
15 | | - * @author Lorenz Buehmann |
16 | | - * |
17 | | - */ |
18 | | -case class RDFGraph (triples: RDD[Triple]) { |
| 11 | + * A data structure that comprises a set of triples. |
| 12 | + * |
| 13 | + * @author Lorenz Buehmann |
| 14 | + * |
| 15 | + */ |
| 16 | +case class RDFGraph(triples: RDD[Triple]) { |
19 | 17 |
|
20 | 18 | /** |
21 | | - * Returns an RDD of triples that match with the given input. |
22 | | - * |
23 | | - * @param s the subject |
24 | | - * @param p the predicate |
25 | | - * @param o the object |
26 | | - * @return RDD of triples |
27 | | - */ |
| 19 | + * Returns an RDD of triples that match with the given input. |
| 20 | + * |
| 21 | + * @param s the subject |
| 22 | + * @param p the predicate |
| 23 | + * @param o the object |
| 24 | + * @return RDD of triples |
| 25 | + */ |
28 | 26 | def find(s: Option[Node] = None, p: Option[Node] = None, o: Option[Node] = None): RDD[Triple] = { |
29 | | - triples.filter(t => |
30 | | - (s == None || t.getSubject == s.get) && |
31 | | - (p == None || t.getPredicate == p.get) && |
32 | | - (o == None || t.getObject == o.get) |
33 | | - ) |
| 27 | + triples.filter(t => |
| 28 | + (s == None || t.getSubject == s.get) && |
| 29 | + (p == None || t.getPredicate == p.get) && |
| 30 | + (o == None || t.getObject == o.get)) |
34 | 31 | } |
35 | 32 |
|
36 | 33 | /** |
37 | | - * Returns an RDD of triples that match with the given input. |
38 | | - * |
39 | | - * @return RDD of triples |
40 | | - */ |
| 34 | + * Returns an RDD of triples that match with the given input. |
| 35 | + * |
| 36 | + * @return RDD of triples |
| 37 | + */ |
41 | 38 | def find(triple: Triple): RDD[Triple] = { |
42 | 39 | find( |
43 | 40 | if (triple.getSubject.isVariable) None else Option(triple.getSubject), |
44 | 41 | if (triple.getPredicate.isVariable) None else Option(triple.getPredicate), |
45 | | - if (triple.getObject.isVariable) None else Option(triple.getObject) |
46 | | - ) |
| 42 | + if (triple.getObject.isVariable) None else Option(triple.getObject)) |
47 | 43 | } |
48 | 44 |
|
49 | 45 | /** |
50 | | - * Return the union of the current RDF graph with the given RDF graph |
51 | | - * @param graph the other RDF graph |
52 | | - * @return the union of both graphs |
53 | | - */ |
| 46 | + * Return the union of the current RDF graph with the given RDF graph |
| 47 | + * @param graph the other RDF graph |
| 48 | + * @return the union of both graphs |
| 49 | + */ |
54 | 50 | def union(graph: RDFGraph): RDFGraph = { |
55 | 51 | RDFGraph(triples.union(graph.triples)) |
56 | 52 | } |
57 | 53 |
|
58 | 54 | /** |
59 | | - * Persist the triples RDD with the default storage level (`MEMORY_ONLY`). |
| 55 | + * Returns a new RDF graph that contains the intersection of the current RDF graph with the given RDF graph. |
| 56 | + * |
| 57 | + * @param graph the other RDF graph |
| 58 | + * @return the intersection of both RDF graphs |
| 59 | + */ |
| 60 | + def intersection(graph: RDFGraph): RDFGraph = |
| 61 | + new RDFGraph(this.triples.intersection(graph.triples)) |
| 62 | + |
| 63 | + /** |
| 64 | + * Returns a new RDF graph that contains the difference between the current RDF graph and the given RDF graph. |
| 65 | + * |
| 66 | + * @param graph the other RDF graph |
| 67 | + * @return the difference of both RDF graphs |
60 | 68 | */ |
| 69 | + def difference(graph: RDFGraph): RDFGraph = |
| 70 | + new RDFGraph(this.triples.subtract(graph.triples)) |
| 71 | + |
| 72 | + /** |
| 73 | + * Persist the triples RDD with the default storage level (`MEMORY_ONLY`). |
| 74 | + */ |
61 | 75 | def cache(): RDFGraph = { |
62 | 76 | triples.cache() |
63 | 77 | this |
64 | 78 | } |
65 | 79 |
|
66 | 80 | /** |
67 | | - * Return the number of triples. |
68 | | - * @return the number of triples |
69 | | - */ |
| 81 | + * Return the number of triples. |
| 82 | + * @return the number of triples |
| 83 | + */ |
70 | 84 | def size(): Long = { |
71 | 85 | triples.count() |
72 | 86 | } |
73 | 87 |
|
74 | | - |
75 | | - |
76 | 88 | def toDataFrame(sqlContext: SQLContext): DataFrame = { |
77 | 89 | // convert RDD to DataFrame |
78 | 90 | val schemaString = "subject predicate object" |
|
0 commit comments