
Commit 1e250f2: Made RDF SQL schema definition more flexible
Parent: 82bddbf

43 files changed: 250 additions & 160 deletions

RDFTuple.scala (new file)

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+package net.sansa_stack.inference.data
+
+/**
+ * An RDF tuple `(s, o)`, i.e. only subject and object are represented.
+ *
+ * @param s the subject
+ * @param o the object
+ * @author Lorenz Buehmann
+ */
+case class RDFTuple(s: String, o: String) extends Product2[String, String] {
+  override def _1: String = s
+  override def _2: String = o
+}
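Why Product2? Extending Product2[String, String] lets an RDFTuple be consumed positionally like a pair. A minimal sketch, not part of the commit (example values invented; assumes RDFTuple above is in scope):

object RDFTupleExample extends App {
  val t = RDFTuple("http://example.org/Alice", "http://example.org/Bob")
  // the Product2 accessors make conversion to a plain Scala pair trivial,
  // e.g. before handing the data to Spark's key-value (pair RDD) operations
  val pair: (String, String) = (t._1, t._2)
  println(pair) // (http://example.org/Alice,http://example.org/Bob)
}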
SQLSchema.scala (new file)

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+package net.sansa_stack.inference.spark.data.model
+
+/**
+ * The SQL schema used for RDF triples in a DataFrame.
+ *
+ * @param triplesTable the name of the triples table
+ * @param subjectCol the name of the subject column
+ * @param predicateCol the name of the predicate column
+ * @param objectCol the name of the object column
+ *
+ * @author Lorenz Buehmann
+ */
+class SQLSchema(val triplesTable: String, val subjectCol: String, val predicateCol: String, val objectCol: String) {}
+
+object SQLSchemaDefault extends SQLSchema("TRIPLES", "s", "p", "o") {}
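This class is the point of the commit: downstream code no longer hard-codes the TRIPLES(s, p, o) layout but reads it from a SQLSchema value. A minimal sketch of defining a custom schema (the table and column names are invented for illustration; assumes the SQLSchema class above is in scope):

object SchemaExample extends App {
  val mySchema = new SQLSchema(
    triplesTable = "RDF_STATEMENTS",
    subjectCol   = "subj",
    predicateCol = "pred",
    objectCol    = "obj"
  )
  // the kind of query the schema is meant to drive (see RDFGraphDataFrame.find below)
  println(s"SELECT ${mySchema.subjectCol}, ${mySchema.predicateCol}, ${mySchema.objectCol} FROM ${mySchema.triplesTable}")

  // the default layout is still available as a singleton
  println(SQLSchemaDefault.triplesTable) // TRIPLES
}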

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/RDFGraphMaterializer.scala

Lines changed: 2 additions & 1 deletion
@@ -8,7 +8,8 @@ import org.apache.spark.sql.SparkSession
 import net.sansa_stack.inference.data.RDFTriple
 import net.sansa_stack.inference.rules.ReasoningProfile._
 import net.sansa_stack.inference.rules.{RDFSLevel, ReasoningProfile}
-import net.sansa_stack.inference.spark.data.{RDFGraphLoader, RDFGraphWriter}
+import net.sansa_stack.inference.spark.data.loader.RDFGraphLoader
+import net.sansa_stack.inference.spark.data.writer.RDFGraphWriter
 import net.sansa_stack.inference.spark.forwardchaining.{ForwardRuleReasonerOWLHorst, ForwardRuleReasonerRDFS, ForwardRuleReasonerRDFSDataset, TransitiveReasoner}

 /**

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/abstraction/TypeComputorDefault.scala

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, SparkSession}

 import net.sansa_stack.inference.data.RDFTriple
-import net.sansa_stack.inference.spark.data.RDFGraphLoader
+import net.sansa_stack.inference.spark.data.loader.RDFGraphLoader

 /**
  * @author Lorenz Buehmann

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/SQLSchema.scala

Lines changed: 0 additions & 18 deletions
This file was deleted.

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/RDFGraphLoader.scala renamed to sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/loader/RDFGraphLoader.scala

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,4 @@
-package net.sansa_stack.inference.spark.data
+package net.sansa_stack.inference.spark.data.loader

 import java.net.URI

@@ -9,6 +9,7 @@ import org.apache.spark.sql.{Dataset, SparkSession}
 import org.slf4j.LoggerFactory

 import net.sansa_stack.inference.data.RDFTriple
+import net.sansa_stack.inference.spark.data.model.{RDFGraph, RDFGraphDataFrame, RDFGraphDataset, RDFGraphNative}
 import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple

 /**

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/AbstractRDFGraph.scala renamed to sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/model/AbstractRDFGraph.scala

Lines changed: 3 additions & 3 deletions
@@ -1,10 +1,10 @@
-package net.sansa_stack.inference.spark.data
+package net.sansa_stack.inference.spark.data.model

 import org.apache.jena.graph.Triple
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}

-import net.sansa_stack.inference.data.RDFTriple
+import net.sansa_stack.inference.data.{RDFTriple, SQLSchema, SQLSchemaDefault}

 /**
  * A data structure that comprises a collection of triples. Note, due to the implementation of the Spark
@@ -71,7 +71,7 @@ abstract class AbstractRDFGraph[T, G <: AbstractRDFGraph[T, G]](val triples: T)



-  def toDataFrame(sparkSession: SparkSession = null): DataFrame
+  def toDataFrame(sparkSession: SparkSession = null, schema: SQLSchema = SQLSchemaDefault): DataFrame

   def toRDD(): RDD[RDFTriple]

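What the widened toDataFrame signature enables, as a hedged sketch (not from the commit; `spark` and `graph` are placeholders, and the SANSA classes from this commit are assumed to be on the classpath):

import org.apache.spark.sql.SparkSession

object ToDataFrameExample {
  // `graph` can be any concrete AbstractRDFGraph implementation,
  // e.g. the RDFGraphDataFrame shown further below
  def example(spark: SparkSession, graph: RDFGraphDataFrame): Unit = {
    val customSchema = new SQLSchema("MY_TRIPLES", "subj", "pred", "obj")

    val df1 = graph.toDataFrame(spark)               // default layout: TRIPLES(s, p, o)
    val df2 = graph.toDataFrame(spark, customSchema) // caller-defined layout

    df1.show()
    df2.show()
  }
}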

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/EmptyRDFGraphDataFrame.scala renamed to sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/model/EmptyRDFGraphDataFrame.scala

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package net.sansa_stack.inference.spark.data
+package net.sansa_stack.inference.spark.data.model

 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/RDFGraph.scala renamed to sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/model/RDFGraph.scala

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package net.sansa_stack.inference.spark.data
+package net.sansa_stack.inference.spark.data.model

 import org.apache.jena.graph.Triple
 import org.apache.spark.rdd.RDD

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/RDFGraphDataFrame.scala renamed to sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/model/RDFGraphDataFrame.scala

Lines changed: 20 additions & 15 deletions
@@ -1,18 +1,18 @@
-package net.sansa_stack.inference.spark.data
+package net.sansa_stack.inference.spark.data.model

-import org.apache.jena.graph.Triple
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}

-import net.sansa_stack.inference.data.RDFTriple
+import net.sansa_stack.inference.data.{RDFTriple, SQLSchema, SQLSchemaDefault}

 /**
  * A data structure that comprises a set of triples.
  *
  * @author Lorenz Buehmann
  *
  */
-class RDFGraphDataFrame(override val triples: DataFrame) extends AbstractRDFGraph[DataFrame, RDFGraphDataFrame](triples) {
+class RDFGraphDataFrame(override val triples: DataFrame, val schema: SQLSchema = SQLSchemaDefault)
+  extends AbstractRDFGraph[DataFrame, RDFGraphDataFrame](triples) {

   /**
    * Returns an RDD of triples that match with the given input.
@@ -23,17 +23,17 @@ class RDFGraphDataFrame(override val triples: DataFrame) extends AbstractRDFGrap
    * @return RDD of triples
    */
   override def find(s: Option[String] = None, p: Option[String] = None, o: Option[String] = None): RDFGraphDataFrame = {
-    var sql = "SELECT subject, predicate, object FROM TRIPLES"
+    var sql = s"SELECT ${schema.subjectCol}, ${schema.predicateCol}, ${schema.objectCol} FROM ${schema.triplesTable}"

     // corner case is when nothing is set, i.e. all triples will be returned
-    if(s.isDefined || p.isDefined || o.isDefined) {
+    if (s.isDefined || p.isDefined || o.isDefined) {
       sql += " WHERE "

       val conditions = scala.collection.mutable.ListBuffer[String]()

-      if(s.isDefined) conditions += "subject = '" + s.get + "'"
-      if(p.isDefined) conditions += "predicate = '" + p.get + "'"
-      if(o.isDefined) conditions += "object = '" + o.get + "'"
+      if (s.isDefined) conditions += s"${schema.subjectCol} = '${s.get}'"
+      if (p.isDefined) conditions += s"${schema.predicateCol} = '${p.get}'"
+      if (o.isDefined) conditions += s"${schema.objectCol} = '${o.get}'"

       sql += conditions.mkString(" AND ")
     }
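To make the schema-parameterised query concrete, here is a standalone re-creation of the query-building logic above (illustration only; the schema values are invented, and the SQLSchema class from this commit is assumed to be in scope):

object FindSqlExample extends App {
  // mirrors RDFGraphDataFrame.find, minus the Spark execution step
  def buildSql(schema: SQLSchema, s: Option[String], p: Option[String], o: Option[String]): String = {
    var sql = s"SELECT ${schema.subjectCol}, ${schema.predicateCol}, ${schema.objectCol} FROM ${schema.triplesTable}"
    if (s.isDefined || p.isDefined || o.isDefined) {
      val conditions = scala.collection.mutable.ListBuffer[String]()
      if (s.isDefined) conditions += s"${schema.subjectCol} = '${s.get}'"
      if (p.isDefined) conditions += s"${schema.predicateCol} = '${p.get}'"
      if (o.isDefined) conditions += s"${schema.objectCol} = '${o.get}'"
      sql += " WHERE " + conditions.mkString(" AND ")
    }
    sql
  }

  val schema = new SQLSchema("RDF_STATEMENTS", "subj", "pred", "obj")
  println(buildSql(schema, Some("http://example.org/Alice"), Some("http://example.org/knows"), None))
  // SELECT subj, pred, obj FROM RDF_STATEMENTS WHERE subj = 'http://example.org/Alice' AND pred = 'http://example.org/knows'
}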
@@ -43,7 +43,7 @@ class RDFGraphDataFrame(override val triples: DataFrame) extends AbstractRDFGrap

   /**
    * Return the union of the current RDF graph with the given RDF graph
-   * 
+   *
    * @param graph the other RDF graph
    * @return the union of both graphs
    */
@@ -58,10 +58,15 @@ class RDFGraphDataFrame(override val triples: DataFrame) extends AbstractRDFGrap
     // to limit the lineage, we convert to RDDs first, and use the SparkContext union method for a sequence of RDDs
     val df: Option[DataFrame] = graphs match {
       case g :: Nil => Some(g.toDataFrame())
-      case g :: _ => Some(g.toDataFrame().sqlContext.createDataFrame(
-        g.toDataFrame().sqlContext.sparkContext.union(graphs.map(_.toDataFrame().rdd)),
-        g.toDataFrame().schema
-      ))
+      case g :: _ =>
+        Some(
+          g.toDataFrame()
+            .sqlContext
+            .createDataFrame(
+              g.toDataFrame().sqlContext.sparkContext.union(graphs.map(_.toDataFrame().rdd)),
+              g.toDataFrame().schema
+            )
+        )
       case _ => None
     }
     new RDFGraphDataFrame(df.get)
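The reformatted branch above keeps the existing trick intact: instead of chaining DataFrame unions, which grows the logical plan with every graph, all underlying RDDs are merged in one SparkContext.union call. The idea in isolation, as a sketch not taken from the commit:

import org.apache.spark.sql.{DataFrame, SparkSession}

object UnionExample {
  // merge many DataFrames with one flat union over their RDDs;
  // assumes a non-empty list whose members share a single schema
  def unionAll(spark: SparkSession, dfs: Seq[DataFrame]): DataFrame =
    spark.sqlContext.createDataFrame(
      spark.sparkContext.union(dfs.map(_.rdd)),
      dfs.head.schema
    )
}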
@@ -80,7 +85,7 @@ class RDFGraphDataFrame(override val triples: DataFrame) extends AbstractRDFGrap
     triples.count()
   }

-  def toDataFrame(sparkSession: SparkSession): DataFrame = triples
+  def toDataFrame(sparkSession: SparkSession, schema: SQLSchema = SQLSchemaDefault): DataFrame = triples

   def toRDD(): RDD[RDFTriple] = triples.rdd.map(row => RDFTriple(row.getString(0), row.getString(1), row.getString(2)))
