This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit fe1a371

Read directory. Write to disk.
1 parent: 75fcddc

2 files changed: 47 additions & 1 deletion


sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/data/RDFGraphLoader.scala

Lines changed: 33 additions & 0 deletions
@@ -1,7 +1,10 @@
 package net.sansa_stack.inference.flink.data
 
+import java.io.File
+
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
 import net.sansa_stack.inference.data.RDFTriple
+import org.apache.flink.configuration.Configuration
 
 /**
   * @author Lorenz Buehmann
@@ -16,4 +19,34 @@ object RDFGraphLoader {
     RDFGraph(triples)
   }
 
+  def loadFromDisk(path: File, env: ExecutionEnvironment): RDFGraph = {
+    val paths = if (path.isFile) Seq(path.getAbsolutePath) else path.listFiles().map(f => f.getAbsolutePath).toSeq
+
+    // create a configuration object
+    val parameters = new Configuration
+
+    // set the recursive enumeration parameter
+    parameters.setBoolean("recursive.file.enumeration", true)
+
+    // pass the configuration to the data source
+    val triples = env.readTextFile(path.getAbsolutePath).withParameters(parameters)
+      .map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
+      .map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2))) // tokens to triple
+
+    RDFGraph(triples)
+  }
+
+  def loadFromDisk(paths: Seq[File], env: ExecutionEnvironment): RDFGraph = {
+
+    val tmp: Seq[String] = paths.map(path => {
+      if (path.isFile) Seq(path.getAbsolutePath) else path.listFiles().map(f => f.getAbsolutePath).toSeq
+    }).flatMap(identity)
+
+    val triples = env.fromCollection(tmp)
+      .map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
+      .map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2))) // tokens to triple
+
+    RDFGraph(triples)
+  }
+
 }
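
The single-File overload turns on Flink's "recursive.file.enumeration" option, so passing a directory also picks up files in nested subdirectories (the paths value it computes is left unused by the data source). Below is a minimal usage sketch; the local execution environment and the /data/triples input directory are assumptions for illustration, not part of the commit:

    import java.io.File

    import org.apache.flink.api.scala.ExecutionEnvironment

    import net.sansa_stack.inference.flink.data.RDFGraphLoader

    object LoadFromDiskExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // hypothetical input: a directory of N-Triples files; nested
        // subdirectories are included via "recursive.file.enumeration"
        val graph = RDFGraphLoader.loadFromDisk(new File("/data/triples"), env)

        // count() triggers execution of the source and both map steps
        println("loaded " + graph.triples.count() + " triples")
      }
    }

One caveat: the Seq[File] overload builds its DataSet with env.fromCollection(tmp), i.e. from the path strings themselves, so its tokenizing maps appear to parse the paths rather than the file contents; unioning one readTextFile source per path would read the actual files.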

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/data/RDFGraphWriter.scala

Lines changed: 14 additions & 1 deletion
@@ -1,6 +1,6 @@
 package net.sansa_stack.inference.flink.data
 
-import java.io.ByteArrayInputStream
+import java.io.{ByteArrayInputStream, File}
 import java.nio.charset.StandardCharsets
 
 import org.apache.flink.api.common.operators.Order
@@ -33,6 +33,19 @@ object RDFGraphWriter {
     logger.info("finished writing triples to disk in " + (System.currentTimeMillis() - startTime) + "ms.")
   }
 
+  def writeToDisk(graph: RDFGraph, path: File): Unit = {
+    logger.info("writing triples to disk...")
+    val startTime = System.currentTimeMillis()
+
+    implicit val ordering = RDFTripleOrdering
+
+    graph.triples.map(t => (t, t)).sortPartition(1, Order.DESCENDING).map(_._1)
+      .map(t => "<" + t.subject + "> <" + t.predicate + "> <" + t.`object` + "> .") // to N-TRIPLES string
+      .writeAsText(path.getAbsolutePath, writeMode = FileSystem.WriteMode.OVERWRITE)
+
+    logger.info("finished writing triples to disk in " + (System.currentTimeMillis() - startTime) + "ms.")
+  }
+
   def convertToModel(graph: RDFGraph) : Model = {
     val modelString = graph.triples.map(t =>
       "<" + t.subject + "> <" + t.predicate + "> <" + t.`object` + "> .") // to N-TRIPLES string
