Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 2df2c0c

Browse files
Cont. Flink RDF
1 parent 24d641b commit 2df2c0c

10 files changed

Lines changed: 110 additions & 90 deletions

File tree

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@
111111
<artifactId>sansa-rdf-partition-core</artifactId>
112112
<version>${sansa.rdf.version}</version>
113113
</dependency>
114+
<dependency>
115+
<groupId>net.sansa-stack</groupId>
116+
<artifactId>sansa-rdf-flink_${scala.binary.version}</artifactId>
117+
<version>${sansa.rdf.version}</version>
118+
</dependency>
114119

115120
<!-- Query Layer -->
116121
<dependency>

sansa-inference-flink/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ under the License.
4747
<scope>test</scope>
4848
</dependency>
4949

50+
<!-- RDF Layer -->
51+
<dependency>
52+
<groupId>net.sansa-stack</groupId>
53+
<artifactId>sansa-rdf-flink_${scala.binary.version}</artifactId>
54+
</dependency>
55+
5056
<!-- Apache Flink -->
5157
<dependency>
5258
<groupId>org.apache.flink</groupId>

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/RDFGraphMaterializer.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ object RDFGraphMaterializer {
7474

7575
// load triples from disk
7676
val graph = RDFGraphLoader.loadFromDisk(input, env)
77-
// println(s"|G| = ${graph.size()}")
7877

7978
// create reasoner
8079
val reasoner = profile match {
@@ -90,7 +89,7 @@ object RDFGraphMaterializer {
9089

9190
// compute inferred graph
9291
val inferredGraph = reasoner.apply(graph)
93-
println(s"|G_inf| = ${inferredGraph.size()}")
92+
println(s"|G_inf| = ${inferredGraph.size}")
9493

9594
// write triples to disk
9695
// RDFGraphWriter.writeToDisk(inferredGraph, output, writeToSingleFile, sortedOutput)
@@ -119,7 +118,7 @@ object RDFGraphMaterializer {
119118

120119
// the CLI parser
121120
val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
122-
head("RDFGraphMaterializer", "0.1.0")
121+
head("RDFGraphMaterializer", "0.4.0")
123122

124123
// opt[Seq[File]]('i', "input").required().valueName("<path1>,<path2>,...").
125124
// action((x, c) => c.copy(in = x)).
@@ -128,7 +127,7 @@ object RDFGraphMaterializer {
128127
.required()
129128
.valueName("<path>")
130129
.action((x, c) => c.copy(in = x))
131-
.text("path to file or directory that contains the input files (in N-Triple format)")
130+
.text("path to file or directory that contains the input files (in N-Triples format)")
132131

133132
opt[URI]('o', "out")
134133
.required()

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/data/RDFGraph.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,5 @@ case class RDFGraph(triples: DataSet[Triple]) {
6767
*
6868
* @return the number of triples
6969
*/
70-
def size(): Long = {
71-
triples.count()
72-
}
70+
lazy val size: Long = triples.count()
7371
}
Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
package net.sansa_stack.inference.flink.data
22

3-
import java.io.File
43
import java.net.URI
54

6-
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
7-
8-
import net.sansa_stack.inference.data.RDFTriple
9-
import org.apache.flink.configuration.Configuration
5+
import scala.collection.JavaConverters._
106
import scala.language.implicitConversions
117

12-
import org.apache.jena.rdf.model.impl.NTripleReader
8+
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
9+
import org.apache.jena.riot.{Lang, RDFDataMgr}
10+
11+
import net.sansa_stack.rdf.benchmark.io.ReadableByteChannelFromIterator
1312

14-
import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple
1513

1614
/**
1715
* @author Lorenz Buehmann
@@ -20,41 +18,50 @@ object RDFGraphLoader {
2018

2119
implicit def pathURIsConverter(uris: Seq[URI]): String = uris.map(p => p.toString).mkString(",")
2220

23-
def loadFromFile(path: String, env: ExecutionEnvironment): RDFGraph = {
24-
val triples = env.readTextFile(path)
25-
.map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
26-
.map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2))) // tokens to triple
2721

28-
RDFGraph(triples)
22+
def loadFromDisk(path: String, env: ExecutionEnvironment): RDFGraph = {
23+
loadFromDisk(URI.create(path), env)
2924
}
3025

3126
def loadFromDisk(path: URI, env: ExecutionEnvironment): RDFGraph = {
32-
// create a configuration object
33-
val parameters = new Configuration
27+
loadFromDisk(Seq(path), env)
28+
}
29+
30+
def loadFromDisk(paths: Seq[URI], env: ExecutionEnvironment): RDFGraph = {
31+
// // create a configuration object
32+
// val parameters = new Configuration
33+
//
34+
// // set the recursive enumeration parameter
35+
// parameters.setBoolean("recursive.file.enumeration", true)
36+
// env.readTextFile(f).withParameters(parameters)
3437

35-
// set the recursive enumeration parameter
36-
parameters.setBoolean("recursive.file.enumeration", true)
38+
val tmp: List[String] = paths.map(path => path.toString).toList
3739

40+
val triples = tmp
41+
.map(f => env.readTextFile(f)) // no support to read from multiple paths at once, thus, map + union here
42+
.reduce(_ union _) // TODO Flink 1.5.0 supports multiple paths via FileInputFormat
43+
.mapPartition(p => {
44+
// convert iterator to input stream
45+
val is = ReadableByteChannelFromIterator.toInputStream(p.asJava)
3846

39-
// pass the configuration to the data source
40-
val triples = env.readTextFile(path.toString).withParameters(parameters)
41-
.map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
42-
.map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2)))
43-
.name("triples") // tokens to triple
47+
RDFDataMgr.createIteratorTriples(is, Lang.NTRIPLES, null).asScala
48+
})
49+
.name("triples")
4450

4551
RDFGraph(triples)
4652
}
4753

48-
def loadFromDisk(paths: Seq[URI], env: ExecutionEnvironment): RDFGraph = {
54+
def main(args: Array[String]): Unit = {
55+
if (args.length == 0) println("Usage: RDFGraphLoader <PATH_TO_FILE>")
4956

50-
val tmp: List[String] = paths.map(path => path.toString).toList
57+
val path = args(0)
5158

52-
val converter = new NTriplesStringToRDFTriple()
59+
val env = ExecutionEnvironment.getExecutionEnvironment
5360

54-
val triples = tmp
55-
.map(f => env.readTextFile(f).flatMap(line => converter.apply(line))).reduce(_ union _).name("triples")
61+
val ds = RDFGraphLoader.loadFromDisk(path, env).triples
5662

57-
RDFGraph(triples)
63+
println(s"size:${ds.count}")
64+
println("sample data:\n" + ds.first(10).map { _.toString.replaceAll("[\\x00-\\x1f]","???")}.collect().mkString("\n"))
5865
}
5966

6067
}

sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/data/RDFGraphWriter.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package net.sansa_stack.inference.flink.data
22

3-
import java.io.{ByteArrayInputStream, File}
3+
import java.io.ByteArrayInputStream
44
import java.net.URI
55
import java.nio.charset.StandardCharsets
66

77
import org.apache.flink.api.common.operators.Order
88
import org.apache.flink.api.scala._
99
import org.apache.flink.core.fs.FileSystem
1010
import org.apache.jena.rdf.model.{Model, ModelFactory}
11-
12-
import net.sansa_stack.inference.utils.{RDFTripleOrdering, RDFTripleToNTripleString}
11+
import org.apache.jena.sparql.util.TripleComparator
1312
import org.slf4j.LoggerFactory
1413

14+
import net.sansa_stack.inference.utils.{JenaTripleToNTripleString, RDFTripleOrdering}
15+
1516
/**
1617
* Writes an RDF graph to disk.
1718
*
@@ -26,10 +27,10 @@ object RDFGraphWriter {
2627
logger.info("writing triples to disk...")
2728
val startTime = System.currentTimeMillis()
2829

29-
implicit val ordering = RDFTripleOrdering
30+
implicit val ordering = new TripleComparator()
3031

3132
graph.triples.map(t => (t, t)).sortPartition(1, Order.DESCENDING).map(_._1)
32-
.map(new RDFTripleToNTripleString()) // to N-TRIPLES string
33+
.map(new JenaTripleToNTripleString()) // to N-Triples string
3334
.writeAsText(path, writeMode = FileSystem.WriteMode.OVERWRITE)
3435

3536
logger.info("finished writing triples to disk in " + (System.currentTimeMillis()-startTime) + "ms.")
@@ -61,14 +62,14 @@ object RDFGraphWriter {
6162
}
6263

6364
tmp
64-
.map(new RDFTripleToNTripleString()) // to N-TRIPLES string
65+
.map(new JenaTripleToNTripleString()) // to N-TRIPLES string
6566
.writeAsText(path.toString, writeMode = FileSystem.WriteMode.OVERWRITE)
6667

6768
logger.info("finished writing triples to disk in " + (System.currentTimeMillis()-startTime) + "ms.")
6869
}
6970

7071
def convertToModel(graph: RDFGraph) : Model = {
71-
val modelString = graph.triples.map(new RDFTripleToNTripleString())
72+
val modelString = graph.triples.map(new JenaTripleToNTripleString())
7273
.collect().mkString("\n")
7374

7475
val model = ModelFactory.createDefaultModel()

sansa-inference-flink/src/test/scala/net/sansa_stack/inference/flink/RDFGraphTestCase.scala

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,20 @@ package net.sansa_stack.inference.flink
33
import java.util
44
import java.util.Comparator
55

6+
import scala.collection.JavaConverters._
7+
68
import com.google.common.collect.ComparisonChain
7-
import net.sansa_stack.inference.flink.data.RDFGraph
89
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
910
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
1011
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
12+
import org.apache.jena.graph.{NodeFactory, Triple}
13+
import org.apache.jena.sparql.util.TripleComparator
1114
import org.junit.Test
1215
import org.junit.runner.RunWith
1316
import org.junit.runners.Parameterized
14-
import net.sansa_stack.inference.data.RDFTriple
1517

16-
import scala.collection.JavaConverters._
18+
import net.sansa_stack.inference.data.RDFTriple
19+
import net.sansa_stack.inference.flink.data.RDFGraph
1720

1821
/**
1922
* A test case for the computation of the transitive closure (TC).
@@ -26,19 +29,24 @@ class RDFGraphTestCase(mode: TestExecutionMode) extends MultipleProgramsTestBase
2629
def testSubtract(): Unit = {
2730
val env = ExecutionEnvironment.getExecutionEnvironment
2831

32+
val s1 = NodeFactory.createURI("s1")
33+
val p1 = NodeFactory.createURI("p1")
34+
val o1 = NodeFactory.createURI("o1")
35+
val o2 = NodeFactory.createURI("o2")
36+
val o3 = NodeFactory.createURI("o3")
2937

3038
// generate dataset
3139
val g1 = RDFGraph(env.fromCollection(
3240
Seq(
33-
RDFTriple("s1", "p1", "o1"),
34-
RDFTriple("s1", "p1", "o2"),
35-
RDFTriple("s1", "p1", "o3")
41+
Triple.create(s1, p1, o1),
42+
Triple.create(s1, p1, o2),
43+
Triple.create(s1, p1, o3)
3644
)
3745
))
3846
val g2 = RDFGraph(env.fromCollection(
3947
Seq(
40-
RDFTriple("s1", "p1", "o1"),
41-
RDFTriple("s1", "p1", "o2")
48+
Triple.create(s1, p1, o1),
49+
Triple.create(s1, p1, o2)
4250
)
4351
))
4452

@@ -47,17 +55,12 @@ class RDFGraphTestCase(mode: TestExecutionMode) extends MultipleProgramsTestBase
4755

4856
val result = g_diff.triples.collect()
4957
val expected = Seq(
50-
RDFTriple("s1", "p1", "o3")
58+
Triple.create(s1, p1, o3)
5159
)
5260

53-
TestBaseUtils.compareResultCollections(new util.ArrayList(result.asJava), new util.ArrayList(expected.asJava), new Comparator[RDFTriple] {
54-
override def compare(t1: RDFTriple, t2: RDFTriple): Int =
55-
ComparisonChain.start()
56-
.compare(t1.s, t2.s)
57-
.compare(t1.p, t2.p)
58-
.compare(t1.o, t2.o)
59-
.result()
60-
})
61+
TestBaseUtils.compareResultCollections(
62+
new util.ArrayList(result.asJava),
63+
new util.ArrayList(expected.asJava),
64+
new TripleComparator())
6165
}
62-
6366
}

sansa-inference-flink/src/test/scala/net/sansa_stack/inference/flink/conformance/OWLHorstConformanceTest.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
11
package net.sansa_stack.inference.flink.conformance
22

3-
import net.sansa_stack.inference.flink.data.RDFGraphWriter
4-
import net.sansa_stack.test.conformance.{IntegrationTestSuite, OWLHorstConformanceTestBase}
3+
import scala.collection.mutable
4+
55
import org.apache.flink.api.scala._
6+
import org.apache.jena.graph.Triple
67
import org.apache.jena.rdf.model.Model
7-
import net.sansa_stack.inference.data.{RDFTriple, SimpleRDFOps}
8-
import net.sansa_stack.inference.flink.data.{RDFGraph, RDFGraphWriter}
9-
import org.scalatest.Ignore
108

11-
import scala.collection.mutable
9+
import net.sansa_stack.inference.data.{Jena, JenaOps}
10+
import net.sansa_stack.inference.flink.data.{RDFGraph, RDFGraphWriter}
11+
import net.sansa_stack.test.conformance.OWLHorstConformanceTestBase
1212

1313
/**
1414
* The class is to test the conformance of each materialization rule of OWL Horst entailment.
1515
*
1616
* @author Lorenz Buehmann
1717
*
1818
*/
19-
@IntegrationTestSuite
20-
class OWLHorstConformanceTest extends OWLHorstConformanceTestBase(rdfOps = new SimpleRDFOps) with SharedOWLHorstReasonerContext{
19+
class OWLHorstConformanceTest
20+
extends OWLHorstConformanceTestBase[Jena](rdfOps = new JenaOps)
21+
with SharedOWLHorstReasonerContext{
2122

22-
override def computeInferredModel(triples: mutable.HashSet[RDFTriple]): Model = {
23+
override def computeInferredModel(triples: mutable.HashSet[Triple]): Model = {
2324
// distribute triples
2425
val triplesRDD = env.fromCollection(triples)
2526

sansa-inference-flink/src/test/scala/net/sansa_stack/inference/flink/conformance/RDFSConformanceTest.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
11
package net.sansa_stack.inference.flink.conformance
22

3-
import net.sansa_stack.inference.flink.data.RDFGraphWriter
4-
import net.sansa_stack.test.conformance.{IntegrationTestSuite, RDFSConformanceTestBase}
5-
import org.apache.jena.rdf.model.Model
6-
import net.sansa_stack.inference.data.{RDFTriple, SimpleRDFOps}
7-
import net.sansa_stack.inference.flink.data.RDFGraph
3+
import scala.collection.mutable
4+
85
import org.apache.flink.api.scala._
9-
import org.scalatest.Ignore
6+
import org.apache.jena.graph.Triple
7+
import org.apache.jena.rdf.model.Model
108

11-
import scala.collection.mutable
9+
import net.sansa_stack.inference.data.{Jena, JenaOps}
10+
import net.sansa_stack.inference.flink.data.{RDFGraph, RDFGraphWriter}
11+
import net.sansa_stack.test.conformance.RDFSConformanceTestBase
1212

1313
/**
1414
* The class is to test the conformance of each materialization rule of RDFS(simple) entailment.
1515
*
1616
* @author Lorenz Buehmann
1717
*
1818
*/
19-
@IntegrationTestSuite
20-
class RDFSConformanceTest extends RDFSConformanceTestBase(rdfOps = new SimpleRDFOps) with SharedRDFSReasonerContext{
19+
class RDFSConformanceTest
20+
extends RDFSConformanceTestBase[Jena](rdfOps = new JenaOps)
21+
with SharedRDFSReasonerContext{
2122

22-
override def computeInferredModel(triples: mutable.HashSet[RDFTriple]): Model = {
23+
override def computeInferredModel(triples: mutable.HashSet[Triple]): Model = {
2324
// distribute triples
2425
val triplesRDD = env.fromCollection(triples)
2526

0 commit comments

Comments (0)