Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit c32c4e0

Browse files
Allow for multiple files/directories as input.
1 parent 0850f22 commit c32c4e0

2 files changed

Lines changed: 21 additions & 6 deletions

File tree

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/RDFGraphMaterializer.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ object RDFGraphMaterializer {
2929
}
3030
}
3131

32-
def run(input: File, output: File, profile: ReasoningProfile, writeToSingleFile: Boolean, sortedOutput: Boolean): Unit = {
32+
def run(input: Seq[File], output: File, profile: ReasoningProfile, writeToSingleFile: Boolean, sortedOutput: Boolean): Unit = {
3333
val conf = new SparkConf()
3434
conf.registerKryoClasses(Array(classOf[RDFTriple]))
3535

@@ -45,7 +45,7 @@ object RDFGraphMaterializer {
4545
.getOrCreate()
4646

4747
// load triples from disk
48-
val graph = RDFGraphLoader.loadFromFile(input.getAbsolutePath, session.sparkContext, 4)
48+
val graph = RDFGraphLoader.loadFromDisk(input, session.sparkContext, 4)
4949

5050
// create reasoner
5151
val reasoner = profile match {
@@ -65,7 +65,7 @@ object RDFGraphMaterializer {
6565

6666
// the config object
6767
case class Config(
68-
in: File = new File("."),
68+
in: Seq[File] = Seq(),
6969
out: File = new File("."),
7070
profile: ReasoningProfile = ReasoningProfile.RDFS,
7171
writeToSingleFile: Boolean = false,
@@ -79,9 +79,9 @@ object RDFGraphMaterializer {
7979
val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
8080
head("RDFGraphMaterializer", "0.1.0")
8181

82-
opt[File]('i', "input").required().valueName("<file>").
82+
opt[Seq[File]]('i', "input").required().valueName("<path1>,<path2>,...").
8383
action((x, c) => c.copy(in = x)).
84-
text("the input file in N-Triple format")
84+
text("path to file or directory that contains the input files (in N-Triple format)")
8585

8686
opt[File]('o', "out").required().valueName("<directory>").
8787
action((x, c) => c.copy(out = x)).
@@ -91,7 +91,7 @@ object RDFGraphMaterializer {
9191
c.copy(writeToSingleFile = true)).text("write the output to a single file in the output directory")
9292

9393
opt[Unit]("sorted").optional().action( (_, c) =>
94-
c.copy(sortedOutput = true)).text("sorted output of the triples per file")
94+
c.copy(sortedOutput = true)).text("sorted output of the triples (per file)")
9595

9696
opt[ReasoningProfile]('p', "profile").required().valueName("{rdfs | owl-horst | owl-el | owl-rl}").
9797
action((x, c) => c.copy(profile = x)).

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/RDFGraphLoader.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package net.sansa_stack.inference.spark.data
22

3+
import java.io.File
4+
35
import org.apache.spark.SparkContext
46
import org.apache.spark.sql.SparkSession
57
import net.sansa_stack.inference.data.RDFTriple
8+
import org.apache.spark.rdd.RDD
69
import org.slf4j.LoggerFactory
710

811
/**
@@ -28,6 +31,18 @@ object RDFGraphLoader {
2831
new RDFGraph(triples)
2932
}
3033

34+
/**
  * Loads an RDF graph from one or more files or directories on disk.
  *
  * All paths are joined into a single comma-separated string so that Spark
  * reads them as one RDD in a single `textFile` call.
  *
  * @param paths         the files and/or directories containing the input triples
  *                      (N-Triple format — see NOTE below on the tokenizer)
  * @param sc            the Spark context used to load the data
  * @param minPartitions the minimum number of partitions for the resulting RDD
  * @return the loaded RDF graph (the underlying triples RDD is cached)
  */
def loadFromDisk(paths: Seq[File], sc: SparkContext, minPartitions: Int = 2): RDFGraph = {
  logger.info("loading triples from disk...")
  val startTime = System.currentTimeMillis()

  // NOTE(review): this tokenizer strips angle brackets and splits on whitespace,
  // which assumes plain URI triples — literals containing spaces would break it.
  // Fix: minPartitions was declared but never forwarded to textFile, so the
  // parameter was silently ignored.
  val triples = sc.textFile(paths.map(_.getAbsolutePath).mkString(","), minPartitions)
    .map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
    .map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2))) // tokens to triple
    .cache()

  // Log elapsed time without forcing an action (triples.count() would eagerly
  // evaluate the whole RDD just for the log message).
  logger.info("finished loading triples in " + (System.currentTimeMillis() - startTime) + "ms.")

  new RDFGraph(triples)
}
45+
3146
def loadGraphFromFile(path: String, session: SparkSession, minPartitions: Int = 2): RDFGraphNative = {
3247
logger.info("loading triples from disk...")
3348
val startTime = System.currentTimeMillis()

0 commit comments

Comments (0)