@@ -66,20 +66,20 @@ object Main extends ZCaseApp[Config] {
6666 classesStream = allClasses(ontology)
6767 classesTasks = classesStream.map(c => ZIO .effectTotal(processSuperclasses(c, whelk, config)))
6868 queue <- Queue .unbounded[Restriction ]
69- activeRestrictions <- SubscriptionRef .make(Set .empty[ Restriction ] )
70- _ <- traverse(properties, classes, whelk, config.mode, queue, activeRestrictions.ref )
71- restrictionsStream = Stream .fromQueue(queue).map(r => processRestrictionAndExtendQueue(r, classes, whelk, config.mode, queue, activeRestrictions.ref ))
69+ activeRestrictions <- Ref .make(0 )
70+ _ <- traverse(properties, classes, whelk, config.mode, queue, activeRestrictions)
71+ restrictionsStream = Stream .fromQueue(queue).map(r => processRestrictionAndExtendQueue(r, classes, whelk, config.mode, queue, activeRestrictions))
7272 allTasks = classesTasks ++ restrictionsStream
7373 processed = allTasks.mapMParUnordered(JRuntime .getRuntime.availableProcessors)(identity)
74- watchFiber <- activeRestrictions.changes.dropUntil(_.isEmpty).take(1 ).foreach(_ => queue.shutdown).fork
74+ // watchFiber <- activeRestrictions.changes.dropUntil(_.isEmpty).take(1).foreach(_ => queue.shutdown).fork
7575 _ <- processed.foreach {
7676 case TriplesGroup (nonredundant, redundant) =>
7777 ZIO .effect {
7878 nonredundant.foreach(nonredundantRDFWriter.triple)
7979 redundant.foreach(redundantRDFWriter.triple)
8080 }
8181 }
82- _ <- watchFiber.join
82+ // _ <- watchFiber.join
8383 stop <- ZIO .effectTotal(System .currentTimeMillis())
8484 _ <- ZIO .effectTotal(scribe.info(s " Computed relations in ${(stop - start) / 1000.0 }s " ))
8585 } yield ()
@@ -103,16 +103,22 @@ object Main extends ZCaseApp[Config] {
103103
104104 def allClasses (ont : OWLOntology ): ZStream [Any , Nothing , OWLClass ] = Stream .fromIterable(ont.getClassesInSignature(Imports .INCLUDED ).asScala.to(Set ) - OWLThing - OWLNothing )
105105
106- def traverse (properties : Hierarchy , classes : Hierarchy , reasoner : ReasonerState , mode : Config .OutputMode , queue : Queue [Restriction ], activeRestrictions : RefM [ Set [ Restriction ] ]): UIO [Unit ] =
106+ def traverse (properties : Hierarchy , classes : Hierarchy , reasoner : ReasonerState , mode : Config .OutputMode , queue : Queue [Restriction ], activeRestrictions : Ref [ Int ]): UIO [Unit ] =
107107 ZIO .foreach_(properties.subclasses.getOrElse(Top , Set .empty)) { subprop =>
108108 traverseProperty(subprop, properties, classes, reasoner, mode, queue, activeRestrictions)
109109 }
110110
111- def traverseProperty (property : AtomicConcept , properties : Hierarchy , classes : Hierarchy , whelk : ReasonerState , mode : Config .OutputMode , queue : Queue [Restriction ], activeRestrictions : RefM [ Set [ Restriction ] ]): UIO [Unit ] = {
111+ def traverseProperty (property : AtomicConcept , properties : Hierarchy , classes : Hierarchy , whelk : ReasonerState , mode : Config .OutputMode , queue : Queue [Restriction ], activeRestrictions : Ref [ Int ]): UIO [Unit ] = {
112112 val restrictions = if (hasSubClasses(property, whelk)) {
113113 (classes.subclasses.getOrElse(Top , Set .empty) - Bottom ).map(filler => Restriction (Role (property.id), filler))
114114 } else Set .empty
115- activeRestrictions.update(current => ZIO .succeed(current ++ restrictions)) *> queue.offerAll(restrictions).unit
115+ for {
116+ _ <- activeRestrictions.update(current => current + restrictions.size)
117+ _ <- queue.offerAll(restrictions).unit
118+ active <- activeRestrictions.get
119+ _ <- queue.shutdown.when(active < 1 )
120+ } yield ()
121+
116122 }
117123
118124 def hasSubClasses (property : AtomicConcept , whelk : ReasonerState ): Boolean = {
@@ -123,14 +129,16 @@ object Main extends ZCaseApp[Config] {
123129 (updatedWhelk.closureSubsBySuperclass.getOrElse(queryConcept, Set .empty) - Bottom ).nonEmpty
124130 }
125131
126- def processRestrictionAndExtendQueue (restriction : Restriction , classes : Hierarchy , whelk : ReasonerState , mode : Config .OutputMode , queue : Queue [Restriction ], activeRestrictions : RefM [ Set [ Restriction ] ]): UIO [TriplesGroup ] = {
132+ def processRestrictionAndExtendQueue (restriction : Restriction , classes : Hierarchy , whelk : ReasonerState , mode : Config .OutputMode , queue : Queue [Restriction ], activeRestrictions : Ref [ Int ]): UIO [TriplesGroup ] = {
127133 for {
128134 triples <- ZIO .effectTotal(processRestriction(restriction, whelk, mode))
129135 directFillerSubclassesRestrictions = if (triples.redundant.nonEmpty)
130136 (classes.subclasses.getOrElse(restriction.filler, Set .empty) - Bottom ).map(c => Restriction (restriction.property, c))
131137 else Set .empty
132- _ <- activeRestrictions.update(current => ZIO .succeed( current - restriction ++ directFillerSubclassesRestrictions) )
138+ _ <- activeRestrictions.update(current => current - 1 + directFillerSubclassesRestrictions.size )
133139 _ <- queue.offerAll(directFillerSubclassesRestrictions)
140+ active <- activeRestrictions.get
141+ _ <- queue.shutdown.when(active < 1 )
134142 } yield triples
135143 }
136144
0 commit comments