Skip to content
This repository was archived by the owner on Oct 8, 2020. It is now read-only.

Commit 2e84e5b

Browse files
Improved Flink conformance test setup
1 parent 9da7479 commit 2e84e5b

3 files changed

Lines changed: 53 additions & 19 deletions

File tree

sansa-inference-flink/src/test/scala/net/sansa_stack/inference/flink/conformance/SharedOWLHorstReasonerContext.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,13 @@ import org.scalatest.{BeforeAndAfterAll, Suite}
1313
* @author Lorenz Buehmann
1414
*/
1515
@RunWith(classOf[Parameterized])
16-
trait SharedOWLHorstReasonerContext extends BeforeAndAfterAll with ReasonerContextProvider{
16+
trait SharedOWLHorstReasonerContext
17+
extends SharedReasonerContext[ForwardRuleReasonerOWLHorst] {
1718
self: Suite =>
1819

19-
@transient private var _reasoner: ForwardRuleReasonerOWLHorst = _
20-
21-
val reasoner: ForwardRuleReasoner = _reasoner
22-
23-
@transient private var _env: ExecutionEnvironment = _
24-
def env: ExecutionEnvironment = _env
25-
2620

2721
override def beforeAll(): Unit = {
2822
super.beforeAll()
29-
_env = ExecutionEnvironment.getExecutionEnvironment
3023
_reasoner = new ForwardRuleReasonerOWLHorst(env)
3124
}
3225
}

sansa-inference-flink/src/test/scala/net/sansa_stack/inference/flink/conformance/SharedRDFSReasonerContext.scala

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package net.sansa_stack.inference.flink.conformance
33
import net.sansa_stack.inference.flink.forwardchaining.{ForwardRuleReasoner, ForwardRuleReasonerRDFS}
44
import net.sansa_stack.inference.rules.RDFSLevel
55
import org.apache.flink.api.scala.ExecutionEnvironment
6+
import org.apache.flink.test.util.AbstractTestBase
67
import org.junit.runner.RunWith
78
import org.junit.runners.Parameterized
89
import org.scalatest.{BeforeAndAfterAll, Suite}
@@ -13,20 +14,12 @@ import org.scalatest.{BeforeAndAfterAll, Suite}
1314
* @author Lorenz Buehmann
1415
*/
1516
@RunWith(classOf[Parameterized])
16-
trait SharedRDFSReasonerContext extends BeforeAndAfterAll with ReasonerContextProvider{
17+
trait SharedRDFSReasonerContext
18+
extends SharedReasonerContext[ForwardRuleReasonerRDFS] {
1719
self: Suite =>
1820

19-
@transient private var _reasoner: ForwardRuleReasonerRDFS = _
20-
def reasoner: ForwardRuleReasoner = _reasoner
21-
22-
@transient private var _env: ExecutionEnvironment = _
23-
def env: ExecutionEnvironment = _env
24-
25-
2621
override def beforeAll(): Unit = {
2722
super.beforeAll()
28-
_env = ExecutionEnvironment.getExecutionEnvironment
29-
_env.getConfig.disableSysoutLogging()
3023
_reasoner = new ForwardRuleReasonerRDFS(env)
3124
_reasoner.level = RDFSLevel.SIMPLE
3225
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package net.sansa_stack.inference.flink.conformance
2+
3+
import net.sansa_stack.rdf.common.kryo.jena.JenaKryoSerializers.{NodeSerializer, TripleSerializer}
4+
import org.apache.flink.api.scala.ExecutionEnvironment
5+
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
6+
import org.apache.jena.graph.{Node, Triple}
7+
import org.scalatest.{BeforeAndAfterAll, Suite}
8+
9+
import net.sansa_stack.inference.flink.forwardchaining.ForwardRuleReasoner
10+
11+
/**
12+
* A shared reasoner and Flink environment used for multiple test cases using the same resources.
13+
*
14+
* @author Lorenz Buehmann
15+
*/
16+
trait SharedReasonerContext[R <: ForwardRuleReasoner]
17+
extends BeforeAndAfterAll
18+
with ReasonerContextProvider {
19+
self: Suite =>
20+
21+
@transient protected var _reasoner: R = _
22+
def reasoner: R = _reasoner
23+
24+
@transient private var _env: ExecutionEnvironment = _
25+
def env: ExecutionEnvironment = _env
26+
27+
override def beforeAll(): Unit = {
28+
super.beforeAll()
29+
_env = ExecutionEnvironment.getExecutionEnvironment
30+
_env.setParallelism(4)
31+
_env.getConfig.disableSysoutLogging()
32+
_env.getConfig.addDefaultKryoSerializer(classOf[Triple], classOf[TripleSerializer])
33+
_env.getConfig.addDefaultKryoSerializer(classOf[Node], classOf[NodeSerializer])
34+
}
35+
36+
import org.apache.flink.test.util.MiniClusterWithClientResource
37+
import org.junit.ClassRule
38+
39+
private val DEFAULT_PARALLELISM = 4
40+
41+
@ClassRule val miniClusterResource = new MiniClusterWithClientResource(
42+
new MiniClusterResourceConfiguration.Builder()
43+
.setNumberTaskManagers(1)
44+
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
45+
.build
46+
)
47+
48+
}

0 commit comments

Comments
 (0)