This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit c86bd2a

Aligned IO methods
1 parent 7c6ec88 commit c86bd2a

14 files changed: 183 additions & 98 deletions

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/RDFGraphMaterializer.scala

Lines changed: 5 additions & 10 deletions
@@ -1,8 +1,6 @@
 package net.sansa_stack.inference.spark
 
-import java.io.File
 import java.net.URI
-import java.nio.file.{Path, Paths}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
@@ -49,21 +47,18 @@ object RDFGraphMaterializer {
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .config("spark.default.parallelism", parallelism)
       .config("spark.ui.showConsoleProgress", "false")
+      .config("spark.sql.shuffle.partitions", parallelism)
       .config(conf)
       .getOrCreate()
 
     // println(session.conf.getAll.mkString("\n"))
 
-    // val g = RDFGraphLoader.loadGraphFromDiskAsDataset(session, input)
-    // g.triples.toDF().printSchema()
-    // g.triples.show(10)
+    // val g = RDFGraphLoader.loadFromDiskAsDataset(session, input).distinct()
     // val g_inf = new ForwardRuleReasonerRDFSDataset(session).apply(g)
-    // println(g_inf.size())
+    // println(s"|G_inf| = ${g_inf.size()}")
 
     // load triples from disk
-    val graph = RDFGraphLoader.loadFromDisk(input, session, parallelism)
-
-
+    val graph = RDFGraphLoader.loadFromDisk(session, input, parallelism)
     // println(s"|G| = ${graph.size()}")
 
     // create reasoner
@@ -82,7 +77,7 @@ object RDFGraphMaterializer {
     // println(s"|G_inf| = ${inferredGraph.size()}")
 
     // write triples to disk
-    RDFGraphWriter.writeGraphToFile(inferredGraph, output.toString, writeToSingleFile, sortedOutput)
+    RDFGraphWriter.writeToDisk(inferredGraph, output.toString, writeToSingleFile, sortedOutput)
 
     session.stop()
   }
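
The changes above are the pattern the commit message names: the IO helpers now use `FromDisk`/`ToDisk` names and take the `SparkSession` as their first argument. A minimal sketch of the aligned call flow, assuming the loader overloads shown in this diff, assuming `RDFGraphWriter.writeToDisk` keeps the `(graph, path, writeToSingleFile, sortedOutput)` shape seen at the call site, and using hypothetical input/output paths:

```scala
import java.net.URI

import org.apache.spark.sql.SparkSession

import net.sansa_stack.inference.spark.data.{RDFGraphLoader, RDFGraphWriter}

object AlignedIOSketch {
  def main(args: Array[String]): Unit = {
    val parallelism = 4
    val session = SparkSession.builder()
      .appName("RDF graph materializer (sketch)")
      .master("local[4]")
      // this commit sets spark.sql.shuffle.partitions alongside spark.default.parallelism,
      // so the SQL/Dataset code paths use the same parallelism as the RDD ones
      .config("spark.default.parallelism", parallelism)
      .config("spark.sql.shuffle.partitions", parallelism)
      .getOrCreate()

    // hypothetical stand-ins for the CLI arguments parsed elsewhere
    val input = Seq(URI.create("/data/input.nt"))
    val output = URI.create("/data/materialized")

    // session-first argument order, as aligned in this commit
    val graph = RDFGraphLoader.loadFromDisk(session, input, parallelism)

    // the reasoning step between load and write is untouched by this commit and omitted here
    RDFGraphWriter.writeToDisk(graph, output.toString, false, false) // writeToSingleFile, sortedOutput

    session.stop()
  }
}
```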

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/abstraction/TypeComputorDefault.scala

Lines changed: 1 addition & 1 deletion
@@ -81,7 +81,7 @@ object TypeComputorDefault {
       .config("spark.ui.showConsoleProgress", "false")
       .getOrCreate()
 
-    val graph = RDFGraphLoader.loadFromDisk(Seq(URI.create(args(0))), session, parallelism)
+    val graph = RDFGraphLoader.loadFromDisk(session, Seq(URI.create(args(0))), parallelism)
 
     val typeComputor = new TypeComputorDefault
 
sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/RDFGraph.scala

Lines changed: 1 addition & 1 deletion
@@ -83,7 +83,7 @@ case class RDFGraph (triples: RDD[RDFTriple]) {
     val triplesDataFrame = sqlContext.createDataFrame(rowRDD, schema)
 
     // register the DataFrame as a table
-    triplesDataFrame.registerTempTable("TRIPLES")
+    triplesDataFrame.createOrReplaceTempView("TRIPLES")
 
     triplesDataFrame
   }
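
The one-line change above is a Spark 2.x migration: `registerTempTable` was deprecated in Spark 2.0 in favor of `createOrReplaceTempView`, which additionally replaces an existing view of the same name instead of colliding with it. A self-contained sketch of the API, using toy triples with an (s, p, o) schema like the one built above (the data is illustrative, not from the SANSA codebase):

```scala
import org.apache.spark.sql.SparkSession

object TempViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("temp-view-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // toy triple table with an (s, p, o) schema
    val triplesDataFrame = Seq(
      ("s1", "rdf:type", "Person"),
      ("s2", "rdf:type", "Place")
    ).toDF("s", "p", "o")

    // Spark 2.x replacement for the deprecated registerTempTable;
    // safe to call again if the view already exists
    triplesDataFrame.createOrReplaceTempView("TRIPLES")

    spark.sql("SELECT s FROM TRIPLES WHERE p = 'rdf:type'").show()
    spark.stop()
  }
}
```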
sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/RDFGraphLoader.scala

Lines changed: 103 additions & 44 deletions
@@ -1,18 +1,18 @@
 package net.sansa_stack.inference.spark.data
 
-import java.io.File
 import java.net.URI
 
-import org.apache.jena.rdf.model.Resource
+import scala.language.implicitConversions
+
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, SparkSession}
 import org.slf4j.LoggerFactory
 
 import net.sansa_stack.inference.data.RDFTriple
 import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple
 
 /**
-  * Load an RDF graph from disk.
+  * A class that provides methods to load an RDF graph from disk.
   *
   * @author Lorenz Buehmann
   *
@@ -21,65 +21,66 @@ object RDFGraphLoader {
 
   private val logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(this.getClass.getName))
 
-  implicit def filesConverter(files: Seq[File]): String = files.map(p => p.getAbsolutePath).mkString(",")
+  private implicit def pathURIsConverter(uris: Seq[URI]): String = uris.map(p => p.toString).mkString(",")
 
   /**
     * Load an RDF graph from a file or directory. The path can also contain multiple paths
     * and even wildcards, e.g.
-    * "/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file"
+    * `"/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file"`
    *
    * @param path the absolute path of the file
    * @param session the Spark session
    * @param minPartitions min number of partitions for Hadoop RDDs ([[SparkContext.defaultMinPartitions]])
    * @return an RDF graph
    */
-  def loadFromFile(path: String, session: SparkSession, minPartitions: Int = 2): RDFGraph = {
+  def loadFromDisk(session: SparkSession, path: String, minPartitions: Int = 2): RDFGraph = {
     logger.info("loading triples from disk...")
     val startTime = System.currentTimeMillis()
 
     val triples = session.sparkContext
       .textFile(path, minPartitions) // read the text file
       .map(new NTriplesStringToRDFTriple()) // convert to triple object
+    // .repartition(minPartitions)
 
     // logger.info("finished loading " + triples.count() + " triples in " + (System.currentTimeMillis()-startTime) + "ms.")
     new RDFGraph(triples)
   }
 
   /**
-    * Load an RDF graph from multiple files.
+    * Load an RDF graph from multiple files or directories.
     *
-    * @param paths the files
+    * @param paths the files or directories
     * @param session the Spark session
     * @param minPartitions min number of partitions for Hadoop RDDs ([[SparkContext.defaultMinPartitions]])
     * @return an RDF graph
     */
-  def loadFromDisk(paths: Seq[URI], session: SparkSession, minPartitions: Int = 2): RDFGraph = {
-    logger.info("loading triples from disk...")
-    val startTime = System.currentTimeMillis()
-    println("Input Paths: " + paths.mkString(","))
-    val pathsConcat = paths.mkString(",") // make concatenated string of paths
-
-    val triples = session.sparkContext
-      .textFile(pathsConcat, minPartitions) // read the text files
-      .map(new NTriplesStringToRDFTriple()) // convert to triple object
-      // .repartition(minPartitions)
+  def loadFromDisk(session: SparkSession, paths: Seq[URI], minPartitions: Int): RDFGraph = {
+    loadFromDisk(session, paths.mkString(","), minPartitions)
+  }
 
-    // logger.info("finished loading " + triples.count() + " triples in " +
-    //   (System.currentTimeMillis()-startTime) + "ms.")
-    new RDFGraph(triples)
+  /**
+    * Load an RDF graph from a single file or directory.
+    *
+    * @param path the path to the file or directory
+    * @param session the Spark session
+    * @param minPartitions min number of partitions for Hadoop RDDs ([[SparkContext.defaultMinPartitions]])
+    * @return an RDF graph
+    */
+  def loadFromDisk(session: SparkSession, path: URI, minPartitions: Int): RDFGraph = {
+    loadFromDisk(session, Seq(path), minPartitions)
   }
 
   /**
-    * Load an RDF graph from a file or directory. The path can also contain multiple paths
-    * and even wildcards, e.g.
+    * Load an RDF graph from a file or directory with a Spark RDD as underlying datastructure.
+    * The path can also contain multiple paths and even wildcards, e.g.
     * "/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file"
     *
     * @param path the files
     * @param session the Spark session
     * @param minPartitions min number of partitions for Hadoop RDDs ([[SparkContext.defaultMinPartitions]])
     * @return an RDF graph
     */
-  def loadGraphFromFile(path: String, session: SparkSession, minPartitions: Int = 2): RDFGraphNative = {
+  def loadFromDiskAsRDD(session: SparkSession, path: String, minPartitions: Int): RDFGraphNative = {
     logger.info("loading triples from disk...")
     val startTime = System.currentTimeMillis()
 
@@ -92,9 +93,26 @@ println("Input Paths: " + paths.mkString(","))
     new RDFGraphNative(triples)
   }
 
-  case class RDFTriple2(s: String, p: String, o: String)
+  private case class RDFTriple2(s: String, p: String, o: String) extends Product3[String, String, String] {
+    override def _1: String = s
+    override def _2: String = p
+    override def _3: String = o
+
+    def subject: String = s
 
-  def loadGraphFromDiskAsDataset(implicit session: SparkSession, paths: scala.Seq[File]): RDFGraphDataset = {
+    override def toString: String = s + " " + p + " " + o
+  }
+
+  /**
+    * Load an RDF graph from multiple files or directories with a Spark Dataset as underlying datastructure.
+    * The path can also contain multiple paths and even wildcards, e.g.
+    * `"/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file"`
+    *
+    * @param path the absolute path of the file
+    * @param session the Spark session
+    * @return an RDF graph based on a [[Dataset]]
+    */
+  def loadFromDiskAsDataset(session: SparkSession, path: String): RDFGraphDataset = {
     logger.info("loading triples from disk...")
     val startTime = System.currentTimeMillis()
 
@@ -104,25 +122,43 @@ println("Input Paths: " + paths.mkString(","))
         Array(splitted(0), splitted(1), splitted(2))
       })
 
-    implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[RDFTriple]
+    // val rdfTripleEncoder = org.apache.spark.sql.Encoders.kryo[RDFTriple]
     val spark = session.sqlContext
     import spark.implicits._
 
+
+
     val triples = session.read
-      .textFile(paths) // read the text file
-      .map(s => {
-        val tokens = s.split(" ")
-        RDFTriple(tokens(0), tokens(1), tokens(2))
-      })
-    session.read
-      .textFile(paths) // read the text file
-      .map(s => {
-        val tokens = s.split(" ")
-        RDFTriple(tokens(0), tokens(1), tokens(2))
-      }).toDF().show(10)
-    // .select(splitter($"value") as "tokens")
-    // .select($"tokens"(0) as "s", $"tokens"(1) as "p", $"tokens"(2) as "o")
-    // .as[RDFTriple]
+      .textFile(path) // read the text file
+      .map(new NTriplesStringToRDFTriple())
+      .as[RDFTriple]
+      .as("triples")
+    // (rdfTripleEncoder)
+    // val rowRDD = session.sparkContext
+    //   .textFile(paths) // read the text file
+    //   .map(s => {
+    //     val tokens = s.split(" ")
+    //     Row(tokens(0), tokens(1), tokens(2))
+    //     // RDFTriple(tokens(0), tokens(1), tokens(2))
+    //   })
+    //
+    // val encoder = Encoders.product[RDFTriple]
+    // val schema =
+    //   StructType(Array(
+    //     StructField("s", StringType, true),
+    //     StructField("p", StringType, true),
+    //     StructField("o", StringType, true)))
+    // val triplesDF = spark.createDataFrame(rowRDD, schema)
+    // val triples = triplesDF.as[RDFTriple](encoder)
+    // session.read
+    //   .textFile(paths) // read the text file
+    //   .map(s => {
+    //     val tokens = s.split(" ")
+    //     RDFTriple2(tokens(0), tokens(1), tokens(2))
+    //   }).as[RDFTriple2].show(10)
    // .select(splitter($"value") as "tokens")
    // .select($"tokens"(0) as "s", $"tokens"(1) as "p", $"tokens"(2) as "o")
    // .as[RDFTriple]
 
 
     // convert to triple object
@@ -132,7 +168,30 @@ println("Input Paths: " + paths.mkString(","))
     new RDFGraphDataset(triples)
   }
 
-  def loadGraphDataFrameFromFile(path: String, session: SparkSession, minPartitions: Int = 2): RDFGraphDataFrame = {
-    new RDFGraphDataFrame(loadGraphFromFile(path, session, minPartitions).toDataFrame(session))
+  /**
+    * Load an RDF graph from a file or directory with a Spark Dataset as underlying datastructure.
+    * The path can also contain multiple paths and even wildcards, e.g.
+    * `"/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file"`
+    *
+    * @param paths the absolute path of the file
+    * @param session the Spark session
+    * @return an RDF graph based on a [[Dataset]]
+    */
+  def loadFromDiskAsDataset(session: SparkSession, paths: scala.Seq[URI]): RDFGraphDataset = {
+    loadFromDiskAsDataset(session, paths.mkString(","))
+  }
+
+  /**
+    * Load an RDF graph from a file or directory with a Spark DataFrame as underlying datastructure.
+    * The path can also contain multiple paths and even wildcards, e.g.
+    * `"/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file"`
+    *
+    * @param path the absolute path of the file
+    * @param session the Spark session
+    * @param minPartitions min number of partitions for Hadoop RDDs ([[SparkContext.defaultMinPartitions]])
+    * @return an RDF graph based on a [[org.apache.spark.sql.DataFrame]]
+    */
+  def loadFromDiskAsDataFrame(session: SparkSession, path: String, minPartitions: Int): RDFGraphDataFrame = {
+    new RDFGraphDataFrame(loadFromDiskAsRDD(session, path, minPartitions).toDataFrame(session))
   }
 }
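
After this commit, `RDFGraphLoader` exposes one consistently ordered family of overloads: the session first, then the path(s), with the backing datastructure encoded in the method name (`RDFGraph`, `RDFGraphNative`, `RDFGraphDataset`, `RDFGraphDataFrame`). A sketch of how a caller can exercise them, based on the signatures in this diff; the N-Triples paths are hypothetical:

```scala
import java.net.URI

import org.apache.spark.sql.SparkSession

import net.sansa_stack.inference.spark.data.RDFGraphLoader

object LoaderOverloadsSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("loader-overloads-sketch")
      .master("local[*]")
      .getOrCreate()
    val minPartitions = 4

    // RDD-backed RDFGraph: a comma-separated path string (wildcards allowed),
    // a single URI, or a Seq[URI]; all reduce to the String variant
    val g1 = RDFGraphLoader.loadFromDisk(session, "/data/part-00[0-5]*", minPartitions)
    val g2 = RDFGraphLoader.loadFromDisk(session, URI.create("/data/file.nt"), minPartitions)
    val g3 = RDFGraphLoader.loadFromDisk(session,
      Seq(URI.create("/data/a.nt"), URI.create("/data/b.nt")), minPartitions)

    // the other backing datastructures
    val native  = RDFGraphLoader.loadFromDiskAsRDD(session, "/data/file.nt", minPartitions)
    val dataset = RDFGraphLoader.loadFromDiskAsDataset(session, "/data/file.nt")
    val frame   = RDFGraphLoader.loadFromDiskAsDataFrame(session, "/data/file.nt", minPartitions)

    session.stop()
  }
}
```

The `Seq[URI]` and single-`URI` overloads simply `mkString(",")` down to the `String` form, matching the comma-separated multi-path convention that `SparkContext.textFile` inherits from Hadoop.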
