Skip to content

Commit 8abc4dc

Browse files
authored
Migrate to ZIO 2. Reduce contention updating seen restrictions. (#116)
1 parent 4ab2dcc commit 8abc4dc

4 files changed

Lines changed: 85 additions & 79 deletions

File tree

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion, gitC
2626

2727
buildInfoPackage := "org.renci.relationgraph"
2828

29-
val zioVersion = "1.0.13"
29+
val zioVersion = "2.0.0-RC3"
3030

3131
libraryDependencies ++= {
3232
Seq(

src/main/scala/org/renci/relationgraph/Main.scala

Lines changed: 71 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@ import org.semanticweb.owlapi.model._
1515
import org.semanticweb.owlapi.model.parameters.Imports
1616
import scribe.Level
1717
import scribe.filter.{packageName, select}
18+
import zio.ZIO.attemptBlockingIO
1819
import zio._
19-
import zio.blocking._
2020
import zio.stream._
2121

2222
import java.io.{File, FileOutputStream}
2323
import java.lang.{Runtime => JRuntime}
2424
import java.nio.charset.StandardCharsets
2525
import java.security.MessageDigest
2626
import java.util.Base64
27+
import java.util.concurrent.TimeUnit
2728
import scala.io.Source
2829
import scala.jdk.CollectionConverters._
2930

@@ -48,27 +49,30 @@ object Main extends ZCaseApp[Config] {
4849
.withHandler(minimumLevel = Some(if (config.verbose) Level.Info else Level.Warn))
4950
.replace()
5051
}
51-
val program = createStreamRDF(config.outputFile).use { rdfWriter =>
52-
for {
53-
fileProperties <- config.propertiesFile.map(readPropertiesFile).getOrElse(ZIO.succeed(Set.empty[AtomicConcept]))
54-
specifiedProperties = fileProperties ++ config.property.map(prop => AtomicConcept(prop)).to(Set)
55-
ontology <- loadOntology(config.ontologyFile)
56-
whelkOntology = Bridge.ontologyToAxioms(ontology)
57-
_ <- ZIO.succeed(scribe.info("Running reasoner"))
58-
whelk = Reasoner.assert(whelkOntology, disableBottom = config.disableOwlNothing.bool)
59-
indexedWhelk = IndexedReasonerState(whelk)
60-
_ <- ZIO.succeed(scribe.info("Done running reasoner"))
61-
_ <- ZIO.fail(new Exception("Ontology is incoherent; please correct unsatisfiable classes.")).when(isIncoherent(whelk))
62-
_ <- effectBlockingIO(rdfWriter.triple(Triple.create(NodeFactory.createBlankNode("redundant"), RDFType, OWLOntology)))
63-
.when(config.mode == OWLMode)
64-
start <- ZIO.succeed(System.currentTimeMillis())
65-
triplesStream = computeRelations(ontology, indexedWhelk, specifiedProperties, config.outputSubclasses.bool, config.reflexiveSubclasses.bool, config.equivalenceAsSubclass.bool, config.outputClasses.bool, config.outputIndividuals.bool, config.mode)
66-
_ <- triplesStream.foreach {
67-
case TriplesGroup(triples) => ZIO.effect(triples.foreach(rdfWriter.triple))
68-
}
69-
stop <- ZIO.succeed(System.currentTimeMillis())
70-
_ <- ZIO.succeed(scribe.info(s"Computed relations in ${(stop - start) / 1000.0}s"))
71-
} yield ()
52+
53+
val program = ZIO.scoped {
54+
createStreamRDF(config.outputFile).flatMap { rdfWriter =>
55+
for {
56+
fileProperties <- config.propertiesFile.map(readPropertiesFile).getOrElse(ZIO.succeed(Set.empty[AtomicConcept]))
57+
specifiedProperties = fileProperties ++ config.property.map(prop => AtomicConcept(prop)).to(Set)
58+
ontology <- loadOntology(config.ontologyFile)
59+
whelkOntology = Bridge.ontologyToAxioms(ontology)
60+
_ <- ZIO.succeed(scribe.info("Running reasoner"))
61+
whelk = Reasoner.assert(whelkOntology, disableBottom = config.disableOwlNothing.bool)
62+
indexedWhelk = IndexedReasonerState(whelk)
63+
_ <- ZIO.succeed(scribe.info("Done running reasoner"))
64+
_ <- ZIO.fail(new Exception("Ontology is incoherent; please correct unsatisfiable classes.")).when(isIncoherent(whelk))
65+
_ <- attemptBlockingIO(rdfWriter.triple(Triple.create(NodeFactory.createBlankNode("redundant"), RDFType, OWLOntology)))
66+
.when(config.mode == OWLMode)
67+
start <- Clock.currentTime(TimeUnit.MILLISECONDS)
68+
triplesStream = computeRelations(ontology, indexedWhelk, specifiedProperties, config.outputSubclasses.bool, config.reflexiveSubclasses.bool, config.equivalenceAsSubclass.bool, config.outputClasses.bool, config.outputIndividuals.bool, config.mode)
69+
_ <- triplesStream.foreach {
70+
case TriplesGroup(triples) => ZIO.attempt(triples.foreach(rdfWriter.triple))
71+
}
72+
stop <- Clock.currentTime(TimeUnit.MILLISECONDS)
73+
_ <- ZIO.succeed(scribe.info(s"Computed relations in ${(stop - start) / 1000.0}s"))
74+
} yield ()
75+
}
7276
}
7377
configureLogging *>
7478
program.as(ExitCode.success)
@@ -81,58 +85,58 @@ object Main extends ZCaseApp[Config] {
8185
def computeRelations(ontology: OWLOntology, whelk: IndexedReasonerState, specifiedProperties: Set[AtomicConcept], outputSubclasses: Boolean, reflexiveSubclasses: Boolean, equivalenceAsSubclass: Boolean, outputClasses: Boolean, outputIndividuals: Boolean, mode: Config.OutputMode): UStream[TriplesGroup] = {
8286
val classes = classHierarchy(whelk.state)
8387
val properties = propertyHierarchy(ontology)
88+
val allProperties = properties.subclasses.keySet.map(c => Role(c.id))
8489
val classesTasks = if (outputSubclasses) {
8590
allClasses(ontology).map(c => ZIO.succeed(processSubclasses(c, whelk.state, reflexiveSubclasses, equivalenceAsSubclass, outputClasses, outputIndividuals)))
8691
} else Stream.empty
8792
val streamZ = for {
8893
queue <- Queue.unbounded[Restriction]
8994
activeRestrictions <- Ref.make(0)
90-
seenRef <- Ref.make(Map.empty[AtomicConcept, Set[AtomicConcept]])
91-
_ <- traverse(specifiedProperties, properties, classes, queue, activeRestrictions, seenRef)
92-
restrictionsStream = Stream.fromQueue(queue).map(r => processRestrictionAndExtendQueue(r, properties, classes, whelk, mode, specifiedProperties.isEmpty, outputClasses, outputIndividuals, queue, activeRestrictions, seenRef))
95+
seenRefs <- ZIO.foreach(allProperties)(p => Ref.make(Set.empty[AtomicConcept]).map(p -> _)).map(_.toMap)
96+
_ <- traverse(specifiedProperties, properties, classes, queue, activeRestrictions, seenRefs)
97+
restrictionsStream = Stream.fromQueue(queue).map(r => processRestrictionAndExtendQueue(r, properties, classes, whelk, mode, specifiedProperties.isEmpty, outputClasses, outputIndividuals, queue, activeRestrictions, seenRefs))
9398
allTasks = classesTasks ++ restrictionsStream
94-
} yield allTasks.mapMParUnordered(JRuntime.getRuntime.availableProcessors)(identity)
99+
} yield allTasks.mapZIOParUnordered(JRuntime.getRuntime.availableProcessors)(identity)
95100
Stream.unwrap(streamZ)
96101
}
97102

98-
def readPropertiesFile(file: String): ZIO[Blocking, Throwable, Set[AtomicConcept]] =
99-
effectBlocking(Source.fromFile(file, "utf-8")).bracketAuto { source =>
100-
effectBlocking(source.getLines().map(_.trim).filter(_.nonEmpty).map(line => AtomicConcept(line)).to(Set))
103+
def readPropertiesFile(file: String): ZIO[Any, Throwable, Set[AtomicConcept]] =
104+
ZIO.attemptBlocking(Source.fromFile(file, "utf-8")).acquireReleaseWithAuto { source =>
105+
ZIO.attemptBlocking(source.getLines().map(_.trim).filter(_.nonEmpty).map(line => AtomicConcept(line)).to(Set))
101106
}
102107

103108
def loadOntology(path: String): Task[OWLOntology] = for {
104-
manager <- ZIO.effect(OWLManager.createOWLOntologyManager())
105-
ontology <- ZIO.effect(manager.loadOntologyFromOntologyDocument(new File(path)))
109+
manager <- ZIO.attempt(OWLManager.createOWLOntologyManager())
110+
ontology <- ZIO.attemptBlocking(manager.loadOntologyFromOntologyDocument(new File(path)))
106111
} yield ontology
107112

108-
def createStreamRDF(path: String): TaskManaged[StreamRDF] = for {
109-
outputStream <- Managed.fromAutoCloseable(ZIO.effect(new FileOutputStream(new File(path))))
110-
rdfWriter <- Managed.make {
111-
ZIO.effect {
113+
def createStreamRDF(path: String): ZIO[Scope, Throwable, StreamRDF] = {
114+
ZIO.acquireRelease(ZIO.attempt(new FileOutputStream(new File(path))))(stream => ZIO.succeed(stream.close())).flatMap { outputStream =>
115+
ZIO.acquireRelease(ZIO.attempt {
112116
val stream = StreamRDFWriter.getWriterStream(outputStream, RDFFormat.TURTLE_FLAT, null)
113117
stream.start()
114118
stream
115-
}
116-
}(stream => ZIO.succeed(stream.finish()))
117-
} yield rdfWriter
119+
})(stream => ZIO.succeed(stream.finish()))
120+
}
121+
}
118122

119123
def allClasses(ont: OWLOntology): ZStream[Any, Nothing, OWLClass] = Stream.fromIterable(ont.getClassesInSignature(Imports.INCLUDED).asScala.to(Set) - OWLThing - OWLNothing)
120124

121-
def traverse(specifiedProperties: Set[AtomicConcept], properties: Hierarchy, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRef: Ref[Map[AtomicConcept, Set[AtomicConcept]]]): UIO[Unit] = {
125+
def traverse(specifiedProperties: Set[AtomicConcept], properties: Hierarchy, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = {
122126
val descendProperties = specifiedProperties.isEmpty
123127
val queryProperties = if (descendProperties) properties.subclasses.getOrElse(Top, Set.empty) - Bottom else specifiedProperties
124-
if (queryProperties.nonEmpty) ZIO.foreachPar_(queryProperties) { subprop =>
125-
traverseProperty(subprop, classes, queue, activeRestrictions, seenRef)
128+
if (queryProperties.nonEmpty) ZIO.foreachParDiscard(queryProperties) { subprop =>
129+
traverseProperty(subprop, classes, queue, activeRestrictions, seenRefs)
126130
}
127131
else queue.shutdown
128132
}
129133

130-
def traverseProperty(property: AtomicConcept, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRef: Ref[Map[AtomicConcept, Set[AtomicConcept]]]): UIO[Unit] = {
134+
def traverseProperty(property: AtomicConcept, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = {
131135
val restrictions = (classes.subclasses.getOrElse(Top, Set.empty) - Bottom).map(filler => Restriction(Role(property.id), filler))
136+
val propSeenRef = seenRefs(Role(property.id))
132137
for {
133-
_ <- seenRef.update { seen =>
134-
val seenForThisProperty = seen.getOrElse(property, Set.empty) ++ restrictions.map(_.filler)
135-
seen.updated(property, seenForThisProperty)
138+
_ <- propSeenRef.update { seenForThisProperty =>
139+
seenForThisProperty ++ restrictions.map(_.filler)
136140
}
137141
_ <- activeRestrictions.update(current => current + restrictions.size)
138142
_ <- queue.offerAll(restrictions).unit
@@ -141,30 +145,34 @@ object Main extends ZCaseApp[Config] {
141145
} yield ()
142146
}
143147

144-
def processRestrictionAndExtendQueue(restriction: Restriction, properties: Hierarchy, classes: Hierarchy, whelk: IndexedReasonerState, mode: Config.OutputMode, descendProperties: Boolean, outputClasses: Boolean, outputIndividuals: Boolean, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRef: Ref[Map[AtomicConcept, Set[AtomicConcept]]]): UIO[TriplesGroup] = {
148+
def processRestrictionAndExtendQueue(restriction: Restriction, properties: Hierarchy, classes: Hierarchy, whelk: IndexedReasonerState, mode: Config.OutputMode, descendProperties: Boolean, outputClasses: Boolean, outputIndividuals: Boolean, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[TriplesGroup] = {
145149
val triples = processRestriction(restriction, whelk, mode, outputClasses, outputIndividuals)
150+
val continue = triples.redundant.nonEmpty
146151
for {
147-
directFillerSubclassesRestrictions <- if (triples.redundant.nonEmpty) seenRef.modify { seen =>
148-
val propertyConcept = AtomicConcept(restriction.property.id)
149-
val seenForThisProperty = seen.getOrElse(propertyConcept, Set.empty)
152+
propSeenRef <- ZIO.fromOption(seenRefs.get(restriction.property)).orDieWith(_ => new Exception("A property was encountered that was not in the seen map. This should never happen."))
153+
directFillerSubclassesRestrictions <- if (continue) propSeenRef.modify { seenForThisProperty =>
150154
val subClasses = classes.subclasses.getOrElse(restriction.filler, Set.empty) - Bottom
151155
val unseenSubClasses = subClasses -- seenForThisProperty
152-
val updatedSeen = seen.updated(propertyConcept, seenForThisProperty ++ subClasses)
156+
val updatedSeenForThisProperty = seenForThisProperty ++ subClasses
153157
val newRestrictions = unseenSubClasses.map(c => Restriction(restriction.property, c))
154-
if (descendProperties) {
155-
val subProperties = properties.subclasses.getOrElse(propertyConcept, Set.empty) - Bottom
156-
subProperties.foldLeft((newRestrictions, updatedSeen)) { case ((accRestrictions, accSeen), subProperty) =>
157-
val seenClassesForSubProperty = accSeen.getOrElse(subProperty, Set.empty)
158-
val updatedRestrictions = if (!seenClassesForSubProperty(restriction.filler))
159-
accRestrictions + Restriction(Role(subProperty.id), restriction.filler)
160-
else accRestrictions
161-
val updatedAccSeen = accSeen.updated(subProperty, seenClassesForSubProperty + restriction.filler)
162-
(updatedRestrictions, updatedAccSeen)
158+
(newRestrictions, updatedSeenForThisProperty)
159+
} else ZIO.succeed(Set.empty[Restriction])
160+
directSubPropertyRestrictions <- if (continue && descendProperties) {
161+
val propertyConcept = AtomicConcept(restriction.property.id)
162+
val subProperties = properties.subclasses.getOrElse(propertyConcept, Set.empty) - Bottom
163+
ZIO.foreach(subProperties) { subProperty =>
164+
val subPropSeenRef = seenRefs(Role(subProperty.id))
165+
subPropSeenRef.modify { seenClassesForSubProperty =>
166+
val maybeRestriction = if (!seenClassesForSubProperty(restriction.filler))
167+
Set(Restriction(Role(subProperty.id), restriction.filler))
168+
else Set.empty[Restriction]
169+
(maybeRestriction, seenClassesForSubProperty + restriction.filler)
163170
}
164-
} else (newRestrictions, updatedSeen)
171+
}.map(_.flatten)
165172
} else ZIO.succeed(Set.empty[Restriction])
166-
_ <- activeRestrictions.update(current => current - 1 + directFillerSubclassesRestrictions.size)
167-
_ <- queue.offerAll(directFillerSubclassesRestrictions)
173+
newRestrictions = directFillerSubclassesRestrictions ++ directSubPropertyRestrictions
174+
_ <- activeRestrictions.update(current => current - 1 + newRestrictions.size)
175+
_ <- queue.offerAll(newRestrictions)
168176
active <- activeRestrictions.get
169177
_ <- queue.shutdown.when(active < 1)
170178
} yield triples
@@ -235,7 +243,7 @@ object Main extends ZCaseApp[Config] {
235243
def processSubclasses(cls: OWLClass, whelk: ReasonerState, reflexiveSubclasses: Boolean, equivalenceAsSubclass: Boolean, outputClasses: Boolean, outputIndividuals: Boolean): TriplesGroup = {
236244
val obj = NodeFactory.createURI(cls.getIRI.toString)
237245
val concept = AtomicConcept(cls.getIRI.toString)
238-
val subConcepts = (whelk.closureSubsBySuperclass.getOrElse(concept, Set.empty) - BuiltIn.Bottom)
246+
val subConcepts = whelk.closureSubsBySuperclass.getOrElse(concept, Set.empty) - BuiltIn.Bottom
239247
val individualsTriples = if (outputIndividuals) {
240248
subConcepts.collect { case Nominal(Individual(id)) =>
241249
Triple.create(NodeFactory.createURI(id), RDFType, obj)

src/main/scala/org/renci/relationgraph/ZCaseApp.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import caseapp.core.help.{Help, WithHelp}
55
import caseapp.core.parser.Parser
66
import caseapp.core.util.Formatter
77
import caseapp.core.{Error, RemainingArgs}
8+
import zio.Console.printLine
89
import zio._
9-
import zio.console.{Console, putStrLn}
1010

1111
import java.io.IOException
1212

@@ -26,13 +26,13 @@ abstract class ZCaseApp[T](implicit val parser0: Parser[T], val messages: Help[T
2626
def run(options: T, remainingArgs: RemainingArgs): ZIO[ZEnv, Nothing, ExitCode]
2727

2828
private[this] def error(message: Error): ZIO[Console, IOException, ExitCode] =
29-
putStrLn(message.message).as(ExitCode.failure)
29+
printLine(message.message).as(ExitCode.failure)
3030

3131
private[this] def helpAsked: ZIO[Console, IOException, ExitCode] =
32-
putStrLn(messages.withHelp.help).as(ExitCode.success)
32+
printLine(messages.withHelp.help).as(ExitCode.success)
3333

3434
private[this] def usageAsked: ZIO[Console, IOException, ExitCode] =
35-
putStrLn(messages.withHelp.usage).as(ExitCode.success)
35+
printLine(messages.withHelp.usage).as(ExitCode.success)
3636

3737
/**
3838
* Arguments are expanded then parsed. By default, argument expansion is the identity function.

src/test/scala/org/renci/relationgraph/TestRelationGraph.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import org.geneontology.whelk.{Bridge, Reasoner}
55
import org.renci.relationgraph.Main.{IndexedReasonerState, TriplesGroup}
66
import org.semanticweb.owlapi.apibinding.OWLManager
77
import zio._
8-
import zio.duration.durationInt
9-
import zio.test.Assertion._
108
import zio.test.TestAspect.timeout
119
import zio.test._
1210

@@ -19,16 +17,16 @@ object TestRelationGraph extends DefaultRunnableSpec {
1917

2018
def spec =
2119
suite("RelationGraphSpec")(
22-
testM("testMaterializedRelations") {
20+
test("testMaterializedRelations") {
2321
for {
24-
manager <- ZIO.effect(OWLManager.createOWLOntologyManager())
25-
ontology <- ZIO.effect(manager.loadOntologyFromOntologyDocument(this.getClass.getResourceAsStream("materialize_test.ofn")))
22+
manager <- ZIO.attempt(OWLManager.createOWLOntologyManager())
23+
ontology <- ZIO.attempt(manager.loadOntologyFromOntologyDocument(this.getClass.getResourceAsStream("materialize_test.ofn")))
2624
whelkOntology = Bridge.ontologyToAxioms(ontology)
2725
whelk = Reasoner.assert(whelkOntology)
2826
indexedWhelk = IndexedReasonerState(whelk)
2927
resultsStream = Main.computeRelations(ontology, indexedWhelk, Set.empty, true, false, false, true, false, Config.RDFMode)
3028
results <- resultsStream.runCollect
31-
triples <- ZIO.fromOption(results.reduceOption((left, right) => TriplesGroup(left.redundant ++ right.redundant)))
29+
triples <- ZIO.from(results.reduceOption((left, right) => TriplesGroup(left.redundant ++ right.redundant)))
3230
TriplesGroup(redundant) = triples
3331
} yield
3432
assertTrue(redundant.contains(Triple.create(n(s"$Prefix#A"), P, n(s"$Prefix#D")))) &&
@@ -38,19 +36,19 @@ object TestRelationGraph extends DefaultRunnableSpec {
3836
assertTrue(redundant.contains(Triple.create(n(s"$Prefix#E"), P, n(s"$Prefix#C")))) &&
3937
assertTrue(redundant.contains(Triple.create(n(s"$Prefix#E"), P, n(s"$Prefix#A"))))
4038
},
41-
(testM("exitProperlyWhenNoObjectPropertiesAreDeclared") {
39+
test("exitProperlyWhenNoObjectPropertiesAreDeclared") {
4240
for {
43-
manager <- ZIO.effect(OWLManager.createOWLOntologyManager())
44-
ontology <- ZIO.effect(manager.loadOntologyFromOntologyDocument(this.getClass.getResourceAsStream("apo.owl")))
41+
manager <- ZIO.attempt(OWLManager.createOWLOntologyManager())
42+
ontology <- ZIO.attempt(manager.loadOntologyFromOntologyDocument(this.getClass.getResourceAsStream("apo.owl")))
4543
whelkOntology = Bridge.ontologyToAxioms(ontology)
4644
whelk = Reasoner.assert(whelkOntology)
4745
indexedWhelk = IndexedReasonerState(whelk)
4846
resultsStream = Main.computeRelations(ontology, indexedWhelk, Set.empty, true, false, false, true, false, Config.RDFMode)
4947
results <- resultsStream.runCollect
50-
triples <- ZIO.fromOption(results.reduceOption((left, right) => TriplesGroup(left.redundant ++ right.redundant)))
48+
triples <- ZIO.from(results.reduceOption((left, right) => TriplesGroup(left.redundant ++ right.redundant)))
5149
TriplesGroup(redundant) = triples
5250
} yield assertTrue(ontology.getObjectPropertiesInSignature().isEmpty) && assertTrue(redundant.nonEmpty)
53-
}) @@ timeout(5.seconds)
51+
} @@ timeout(5.seconds)
5452
)
5553

5654
}

0 commit comments

Comments
 (0)