@@ -2,13 +2,10 @@ package net.sansa_stack.inference.flink.data
 
 import java.net.URI
 
-import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
+import net.sansa_stack.rdf.flink.io.ntriples.NTriplesReader
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.jena.riot.{Lang, RDFDataMgr}
-
-import net.sansa_stack.rdf.benchmark.io.ReadableByteChannelFromIterator
 
 
 /**
@@ -28,35 +25,16 @@ object RDFGraphLoader {
   }
 
   def loadFromDisk(paths: Seq[URI], env: ExecutionEnvironment): RDFGraph = {
-//    // create a configuration object
-//    val parameters = new Configuration
-//
-//    // set the recursive enumeration parameter
-//    parameters.setBoolean("recursive.file.enumeration", true)
-//    env.readTextFile(f).withParameters(parameters)
-
-    val tmp: List[String] = paths.map(path => path.toString).toList
-
-    val triples = tmp
-      .map(f => env.readTextFile(f)) // no support to read from multiple paths at once, thus, map + union here
-      .reduce(_ union _) // TODO Flink 1.5.0 supports multiple paths via FileInputFormat
-      .mapPartition(p => {
-        // convert iterator to input stream
-        val is = ReadableByteChannelFromIterator.toInputStream(p.asJava)
-
-        RDFDataMgr.createIteratorTriples(is, Lang.NTRIPLES, null).asScala
-      })
-      .name("triples")
-
-    RDFGraph(triples)
+    RDFGraph(NTriplesReader.load(env, paths))
   }
 
   def main(args: Array[String]): Unit = {
     if (args.length == 0) println("Usage: RDFGraphLoader <PATH_TO_FILE>")
 
     val path = args(0)
 
-    val env = ExecutionEnvironment.getExecutionEnvironment
+//    val env = ExecutionEnvironment.getExecutionEnvironment
+    val env = ExecutionEnvironment.createLocalEnvironment(parallelism = 2)
 
     val ds = RDFGraphLoader.loadFromDisk(path, env).triples
 
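For reference, a minimal usage sketch of the loader after this change. The signatures it relies on (`loadFromDisk(paths: Seq[URI], env)`, `RDFGraph.triples`, `NTriplesReader.load`) are taken from the diff above; the object name, the example file path, and the `count()` action are illustrative assumptions, not part of the commit.

```scala
import java.net.URI

import org.apache.flink.api.scala._

import net.sansa_stack.inference.flink.data.RDFGraphLoader

// Hypothetical driver, only to show the refactored call path
object LoaderUsageSketch {
  def main(args: Array[String]): Unit = {
    // Local environment with fixed parallelism, mirroring the change in main()
    val env = ExecutionEnvironment.createLocalEnvironment(parallelism = 2)

    // loadFromDisk now delegates all N-Triples parsing to NTriplesReader.load
    val graph = RDFGraphLoader.loadFromDisk(Seq(URI.create("file:///tmp/data.nt")), env)

    // count() triggers execution of the Flink job (illustrative action)
    println(s"loaded ${graph.triples.count()} triples")
  }
}
```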