Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 79e2e3a

Browse files
minor changes in I/O
1 parent f66a9e0 commit 79e2e3a

2 files changed

Lines changed: 21 additions & 15 deletions

File tree

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/data/RDFGraphLoader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ object RDFGraphLoader {
2525
}
2626

2727
def loadFromDisk(paths: Seq[URI], env: ExecutionEnvironment): RDFGraph = {
28-
RDFGraph(NTriplesReader.load(env, paths))
28+
RDFGraph(NTriplesReader.load(env, paths).name("triples"))
2929
}
3030

3131
def main(args: Array[String]): Unit = {

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/data/RDFGraphWriter.scala

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import java.nio.charset.StandardCharsets
77
import org.apache.flink.api.common.operators.Order
88
import org.apache.flink.api.scala._
99
import org.apache.flink.core.fs.FileSystem
10+
import org.apache.jena.graph.GraphUtil
1011
import org.apache.jena.rdf.model.{Model, ModelFactory}
12+
import org.apache.jena.sparql.graph.GraphFactory
1113
import org.apache.jena.sparql.util.TripleComparator
1214
import org.slf4j.LoggerFactory
1315

@@ -52,32 +54,36 @@ object RDFGraphWriter {
5254

5355
// sort triples if enabled
5456
val tmp = if (sorted) {
55-
graph.triples// .sortPartition(t => t, Order.ASCENDING) // map(t => (t, t)).sortPartition(1, Order.DESCENDING).map(_._1)
57+
graph.triples.sortPartition(_.hashCode(), Order.ASCENDING)
5658
} else {
5759
graph.triples
5860
}
5961

60-
if (singleFile) {
61-
tmp.setParallelism(1)
62-
}
63-
64-
tmp
62+
val sink = tmp
6563
.map(new JenaTripleToNTripleString()) // to N-TRIPLES string
6664
.writeAsText(path.toString, writeMode = FileSystem.WriteMode.OVERWRITE)
6765

66+
// write to single file if enabled
67+
if (singleFile) {
68+
sink.setParallelism(1)
69+
}
70+
6871
logger.info("finished writing triples to disk in " + (System.currentTimeMillis()-startTime) + "ms.")
6972
}
7073

74+
/**
75+
* Converts an RDF graph to an Apache Jena in-memory model.
76+
*
77+
* @note For large graphs this can be too expensive
78+
* and lead to an OOM exception
79+
*
80+
* @param graph the RDF graph
81+
*
82+
* @return the in-memory Apache Jena model containing the triples
83+
*/
7184
def convertToModel(graph: RDFGraph) : Model = {
72-
val modelString = graph.triples.map(new JenaTripleToNTripleString())
73-
.collect().mkString("\n")
74-
7585
val model = ModelFactory.createDefaultModel()
76-
77-
if(!modelString.trim.isEmpty) {
78-
model.read(new ByteArrayInputStream(modelString.getBytes(StandardCharsets.UTF_8)), null, "N-TRIPLES")
79-
}
80-
86+
GraphUtil.add(model.getGraph, graph.triples.collect().toArray)
8187
model
8288
}
8389
}

0 commit comments

Comments
 (0)