Skip to content

Commit 4d13549

Browse files
committed
Shutdown queue when finished.
1 parent 6d525e3 commit 4d13549

1 file changed

Lines changed: 13 additions & 9 deletions

File tree

src/main/scala/org/renci/relationgraph/Main.scala

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,17 +66,20 @@ object Main extends ZCaseApp[Config] {
6666
classesStream = allClasses(ontology)
6767
classesTasks = classesStream.map(c => ZIO.effectTotal(processSuperclasses(c, whelk, config)))
6868
queue <- Queue.unbounded[Restriction]
69-
_ <- traverse(properties, classes, whelk, config.mode, queue)
70-
restrictionsStream = Stream.fromQueue(queue).map(r => processRestrictionAndExtendQueue(r, classes, whelk, config.mode, queue))
69+
activeRestrictions <- SubscriptionRef.make(Set.empty[Restriction])
70+
_ <- traverse(properties, classes, whelk, config.mode, queue, activeRestrictions.ref)
71+
restrictionsStream = Stream.fromQueue(queue).map(r => processRestrictionAndExtendQueue(r, classes, whelk, config.mode, queue, activeRestrictions.ref))
7172
allTasks = classesTasks ++ restrictionsStream
7273
processed = allTasks.mapMParUnordered(JRuntime.getRuntime.availableProcessors)(identity)
74+
watchFiber <- activeRestrictions.changes.dropUntil(_.isEmpty).take(1).foreach(_ => queue.shutdown).fork
7375
_ <- processed.foreach {
7476
case TriplesGroup(nonredundant, redundant) =>
7577
ZIO.effect {
7678
nonredundant.foreach(nonredundantRDFWriter.triple)
7779
redundant.foreach(redundantRDFWriter.triple)
7880
}
7981
}
82+
_ <- watchFiber.join
8083
stop <- ZIO.effectTotal(System.currentTimeMillis())
8184
_ <- ZIO.effectTotal(scribe.info(s"Computed relations in ${(stop - start) / 1000.0}s"))
8285
} yield ()
@@ -100,16 +103,16 @@ object Main extends ZCaseApp[Config] {
100103

101104
def allClasses(ont: OWLOntology): ZStream[Any, Nothing, OWLClass] = Stream.fromIterable(ont.getClassesInSignature(Imports.INCLUDED).asScala.to(Set) - OWLThing - OWLNothing)
102105

103-
def traverse(properties: Hierarchy, classes: Hierarchy, reasoner: ReasonerState, mode: Config.OutputMode, queue: Queue[Restriction]): UIO[Unit] =
106+
def traverse(properties: Hierarchy, classes: Hierarchy, reasoner: ReasonerState, mode: Config.OutputMode, queue: Queue[Restriction], activeRestrictions: RefM[Set[Restriction]]): UIO[Unit] =
104107
ZIO.foreach_(properties.subclasses.getOrElse(Top, Set.empty)) { subprop =>
105-
traverseProperty(subprop, properties, classes, reasoner, mode, queue)
108+
traverseProperty(subprop, properties, classes, reasoner, mode, queue, activeRestrictions)
106109
}
107110

108-
def traverseProperty(property: AtomicConcept, properties: Hierarchy, classes: Hierarchy, whelk: ReasonerState, mode: Config.OutputMode, queue: Queue[Restriction]): UIO[Unit] = {
111+
def traverseProperty(property: AtomicConcept, properties: Hierarchy, classes: Hierarchy, whelk: ReasonerState, mode: Config.OutputMode, queue: Queue[Restriction], activeRestrictions: RefM[Set[Restriction]]): UIO[Unit] = {
109112
val restrictions = if (hasSubClasses(property, whelk)) {
110113
(classes.subclasses.getOrElse(Top, Set.empty) - Bottom).map(filler => Restriction(Role(property.id), filler))
111114
} else Set.empty
112-
queue.offerAll(restrictions).unit
115+
activeRestrictions.update(current => ZIO.succeed(current ++ restrictions)) *> queue.offerAll(restrictions).unit
113116
}
114117

115118
def hasSubClasses(property: AtomicConcept, whelk: ReasonerState): Boolean = {
@@ -120,17 +123,18 @@ object Main extends ZCaseApp[Config] {
120123
(updatedWhelk.closureSubsBySuperclass.getOrElse(queryConcept, Set.empty) - Bottom).nonEmpty
121124
}
122125

123-
def processRestrictionAndExtendQueue(restriction: Restriction, classes: Hierarchy, whelk: ReasonerState, mode: Config.OutputMode, queue: Queue[Restriction]): UIO[TriplesGroup] = {
126+
def processRestrictionAndExtendQueue(restriction: Restriction, classes: Hierarchy, whelk: ReasonerState, mode: Config.OutputMode, queue: Queue[Restriction], activeRestrictions: RefM[Set[Restriction]]): UIO[TriplesGroup] = {
124127
for {
125128
triples <- ZIO.effectTotal(processRestriction(restriction, whelk, mode))
126-
directFillerSubclassesRestrictions = if (triples.redundant.nonEmpty) (classes.subclasses.getOrElse(restriction.filler, Set.empty) - Bottom).map(c => Restriction(restriction.property, c))
129+
directFillerSubclassesRestrictions = if (triples.redundant.nonEmpty)
130+
(classes.subclasses.getOrElse(restriction.filler, Set.empty) - Bottom).map(c => Restriction(restriction.property, c))
127131
else Set.empty
132+
_ <- activeRestrictions.update(current => ZIO.succeed(current - restriction ++ directFillerSubclassesRestrictions))
128133
_ <- queue.offerAll(directFillerSubclassesRestrictions)
129134
} yield triples
130135
}
131136

132137
def processRestriction(restriction: Restriction, whelk: ReasonerState, mode: Config.OutputMode): TriplesGroup = {
133-
println(s"Querying: ${restriction.property.id} || ${restriction.filler.id}")
134138
val queryConcept = AtomicConcept(s"${restriction.property.id}${restriction.filler.id}")
135139
val er = ExistentialRestriction(restriction.property, restriction.filler)
136140
val axioms = Set(ConceptInclusion(queryConcept, er), ConceptInclusion(er, queryConcept))

0 commit comments

Comments
 (0)