Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 6aca7a1

Browse files
Additional CLI options for how to save the result to disk.
1 parent 1bfd712 commit 6aca7a1

6 files changed

Lines changed: 65 additions & 24 deletions

File tree

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/RDFGraphMaterializer.scala

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,19 @@ object RDFGraphMaterializer {
2323
def main(args: Array[String]) {
2424
parser.parse(args, Config()) match {
2525
case Some(config) =>
26-
run(config.in, config.out, config.profile)
26+
run(config.in, config.out, config.profile, config.writeToSingleFile, config.sortedOutput)
2727
case None =>
2828
println(parser.usage)
2929
}
3030
}
3131

32-
def run(input: File, output: File, profile: ReasoningProfile): Unit = {
32+
def run(input: File, output: File, profile: ReasoningProfile, writeToSingleFile: Boolean, sortedOutput: Boolean): Unit = {
3333
val conf = new SparkConf()
3434
conf.registerKryoClasses(Array(classOf[RDFTriple]))
3535

3636
// the SPARK config
3737
val session = SparkSession.builder
38-
.appName("SPARK Reasoning")
38+
.appName(s"SPARK $profile Reasoning")
3939
.master("local[4]")
4040
.config("spark.eventLog.enabled", "true")
4141
.config("spark.hadoop.validateOutputSpecs", "false") //override output files
@@ -58,13 +58,18 @@ object RDFGraphMaterializer {
5858
print(inferredGraph.size())
5959

6060
// write triples to disk
61-
RDFGraphWriter.writeToFile(inferredGraph, output.getAbsolutePath)
61+
RDFGraphWriter.writeGraphToFile(inferredGraph, output.getAbsolutePath, writeToSingleFile, sortedOutput)
6262

6363
session.stop()
6464
}
6565

6666
// the config object
67-
case class Config(in: File = new File("."), out: File = new File("."), profile: ReasoningProfile = ReasoningProfile.RDFS)
67+
case class Config(
68+
in: File = new File("."),
69+
out: File = new File("."),
70+
profile: ReasoningProfile = ReasoningProfile.RDFS,
71+
writeToSingleFile: Boolean = false,
72+
sortedOutput: Boolean = false)
6873

6974
// read ReasoningProfile enum
7075
implicit val profilesRead: scopt.Read[ReasoningProfile.Value] =
@@ -82,6 +87,12 @@ object RDFGraphMaterializer {
8287
action((x, c) => c.copy(out = x)).
8388
text("the output directory")
8489

90+
opt[Unit]("single-file").optional().action( (_, c) =>
91+
c.copy(writeToSingleFile = true)).text("write the output to a single file in the output directory")
92+
93+
opt[Unit]("sorted").optional().action( (_, c) =>
94+
c.copy(sortedOutput = true)).text("sorted output of the triples per file")
95+
8596
opt[ReasoningProfile]('p', "profile").required().valueName("{rdfs | owl-horst | owl-el | owl-rl}").
8697
action((x, c) => c.copy(profile = x)).
8798
text("the reasoning profile")

sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/RDFGraphWriter.scala

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,56 @@ object RDFGraphWriter {
2020

2121
private val logger = com.typesafe.scalalogging.Logger(LoggerFactory.getLogger(this.getClass.getName))
2222

23-
def writeToFile(graph: RDFGraph, path: String): Unit = {
23+
/**
24+
* Write the graph to disk in N-Triple format.
25+
*
26+
* @param graph the RDF graph
27+
* @param path the output directory
28+
* @param singleFile whether to put all data into a single file
29+
* @param sorted whether to sort the triples by subject, predicate, object
30+
*/
31+
def writeGraphToFile(graph: RDFGraph, path: String, singleFile: Boolean = false, sorted: Boolean = false): Unit = {
32+
writeTriplesToFile(graph.triples, path, singleFile, sorted)
33+
}
34+
35+
/**
36+
* Write the triples to disk in N-Triple format.
37+
*
38+
* @param triples the triples
39+
* @param path the output directory
40+
* @param singleFile whether to put all data into a single file
41+
* @param sorted whether to sort the triples by subject, predicate, object
42+
*/
43+
def writeTriplesToFile(triples: RDD[RDFTriple], path: String, singleFile: Boolean = false, sorted: Boolean = false): Unit = {
2444
logger.info("writing triples to disk...")
2545
val startTime = System.currentTimeMillis()
2646

2747
implicit val ordering = RDFTripleOrdering
2848

29-
graph.triples.map(t=>(t,t)).sortByKey().map(_._1)
30-
.map(t => "<" + t.subject + "> <" + t.predicate + "> <" + t.`object` + "> .") // to N-TRIPLES string
31-
.coalesce(1)
32-
.saveAsTextFile(path)
49+
// sort triples if enabled
50+
val tmp = if(sorted) {
51+
triples.map(t => (t,t)).sortByKey().map(_._1)
52+
} else {
53+
triples
54+
}
3355

34-
logger.info("finished writing triples to disk in " + (System.currentTimeMillis()-startTime) + "ms.")
35-
}
56+
// convert to N-Triple format
57+
var triplesNTFormat = tmp.map(t => "<" + t.subject + "> <" + t.predicate + "> <" + t.`object` + "> .")
3658

37-
def writeToFile(triples: RDD[RDFTriple], path: String): Unit = {
38-
writeToFile(RDFGraph(triples), path)
59+
// convert to single file, i.e. move all to one partition
60+
// (might be very expensive and contradicts the Big Data paradigm on Hadoop in general)
61+
if(singleFile) {
62+
triplesNTFormat = triplesNTFormat.coalesce(1, shuffle = true)
63+
}
64+
65+
// finally, write to disk
66+
triplesNTFormat.saveAsTextFile(path)
67+
68+
logger.info("finished writing triples to disk in " + (System.currentTimeMillis()-startTime) + "ms.")
3969
}
4070

41-
def writeToFile(dataFrame: DataFrame, path: String): Unit = {
42-
writeToFile(dataFrame.rdd.map(row => RDFTriple(row.getString(0), row.getString(1), row.getString(2))), path)
71+
def writeDataframeToFile(dataFrame: DataFrame, path: String, singleFile: Boolean = false, sorted: Boolean = false): Unit = {
72+
writeTriplesToFile(dataFrame.rdd.map(row => RDFTriple(row.getString(0), row.getString(1), row.getString(2))), path, singleFile, sorted)
4373
}
4474

4575
def convertToModel(graph: RDFGraph) : Model = {

sansa-inference-spark/src/test/scala/net/sansa_stack/inference/spark/GenericVsNativeExperiments.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ object GenericVsNativeExperiments {
4848
val targetDir = args(1)
4949

5050
// write triples to disk
51-
RDFGraphWriter.writeToFile(infGraphNative.toDataFrame(), targetDir + "/native")
52-
RDFGraphWriter.writeToFile(infGraphGeneric.toDataFrame(), targetDir + "/generic")
51+
RDFGraphWriter.writeDataframeToFile(infGraphNative.toDataFrame(), targetDir + "/native")
52+
RDFGraphWriter.writeDataframeToFile(infGraphGeneric.toDataFrame(), targetDir + "/generic")
5353

5454
session.stop()
5555

sansa-inference-spark/src/test/scala/net/sansa_stack/inference/spark/rules/RDFGraphMaterializerTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ object RDFGraphMaterializerTest {
4646
val inferredGraph = reasoner.apply(graph)
4747

4848
// write triples to disk
49-
RDFGraphWriter.writeToFile(inferredGraph, args(0))
49+
RDFGraphWriter.writeGraphToFile(inferredGraph, args(0))
5050

5151
sc.stop()
5252

sansa-inference-spark/src/test/scala/net/sansa_stack/inference/spark/rules/SetOfRulesTest.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,13 @@ object SetOfRulesTest {
118118
def runNaive(graph: RDFGraphNative, rules: Seq[Rule]) = {
119119
val reasoner = new ForwardRuleReasonerNaive(sc, rules.toSet)
120120
val res = reasoner.apply(graph)
121-
RDFGraphWriter.writeToFile(res.toRDD(), "/tmp/spark-tests/naive")
121+
RDFGraphWriter.writeTriplesToFile(res.toRDD(), "/tmp/spark-tests/naive")
122122
}
123123

124124
def runNative(graph: RDFGraphNative, rules: Seq[Rule]) = {
125125
val reasoner = new ForwardRuleReasonerOptimizedNative(sparkSession, rules.toSet)
126126
val res = reasoner.apply(graph)
127-
RDFGraphWriter.writeToFile(res.toRDD(), "/tmp/spark-tests/optimized-native")
127+
RDFGraphWriter.writeTriplesToFile(res.toRDD(), "/tmp/spark-tests/optimized-native")
128128
}
129129

130130
def runSQL(graph: RDFGraphNative, rules: Seq[Rule]) = {
@@ -133,7 +133,7 @@ object SetOfRulesTest {
133133

134134
val reasoner = new ForwardRuleReasonerOptimizedSQL(sparkSession, rules.toSet)
135135
val res = reasoner.apply(graphDataframe)
136-
RDFGraphWriter.writeToFile(res.toDataFrame(), "/tmp/spark-tests/optimized-sql")
136+
RDFGraphWriter.writeDataframeToFile(res.toDataFrame(), "/tmp/spark-tests/optimized-sql")
137137
reasoner.showExecutionStats()
138138
}
139139
}

sansa-inference-spark/src/test/scala/net/sansa_stack/inference/spark/rules/TransitivityRuleTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ object TransitivityRuleTest {
8282

8383
val planExecutor1 = new PlanExecutorNative(sc)
8484
val res2 = planExecutor1.execute(plan, graph)
85-
RDFGraphWriter.writeToFile(res2.toRDD(), "/tmp/spark-tests/native")
85+
RDFGraphWriter.writeTriplesToFile(res2.toRDD(), "/tmp/spark-tests/native")
8686

8787

8888
// 3. the SQL based rule executor
@@ -94,7 +94,7 @@ object TransitivityRuleTest {
9494
val df = new RDFGraphDataFrame(graph.toDataFrame(sparkSession))
9595
val planExecutor2 = new PlanExecutorSQL(sparkSession)
9696
val res3 = planExecutor2.execute(plan, df)
97-
RDFGraphWriter.writeToFile(res3.toRDD(), "/tmp/spark-tests/sql")
97+
RDFGraphWriter.writeTriplesToFile(res3.toRDD(), "/tmp/spark-tests/sql")
9898

9999
sc.stop()
100100
}

0 commit comments

Comments
 (0)