Skip to content

Commit 3e3499a

Browse files
authored
Merge pull request #27 from balhoff/queue
Use graph traversal approach
2 parents 48f9314 + 5a4127f commit 3e3499a

3 files changed

Lines changed: 145 additions & 81 deletions

File tree

build.sbt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ libraryDependencies ++= {
2222
Seq(
2323
"dev.zio" %% "zio" % zioVersion,
2424
"dev.zio" %% "zio-streams" % zioVersion,
25-
"dev.zio" %% "zio-interop-monix" % "3.2.2.0-RC2",
26-
"io.monix" %% "monix" % "3.2.2",
2725
"org.geneontology" %% "whelk-owlapi" % "1.1",
2826
"com.outr" %% "scribe-slf4j" % "3.5.5",
2927
"com.github.alexarchambault" %% "case-app" % "2.0.6",

src/main/scala/org/renci/relationgraph/Main.scala

Lines changed: 142 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,11 @@
11
package org.renci.relationgraph
22

3-
import java.io.{File, FileOutputStream, OutputStream}
4-
import java.lang.{Runtime => JRuntime}
5-
import java.nio.charset.StandardCharsets
6-
import java.security.MessageDigest
7-
import java.util.Base64
8-
93
import caseapp._
10-
import monix.eval.Task
11-
import monix.execution.Scheduler.Implicits.global
12-
import monix.reactive._
134
import org.apache.jena.graph.{Node, NodeFactory, Triple}
145
import org.apache.jena.riot.RDFFormat
156
import org.apache.jena.riot.system.{StreamRDF, StreamRDFWriter}
167
import org.apache.jena.vocabulary.{OWL2, RDF, RDFS}
8+
import org.geneontology.whelk.BuiltIn.{Bottom, Top}
179
import org.geneontology.whelk._
1810
import org.renci.relationgraph.Config.{OWLMode, RDFMode}
1911
import org.semanticweb.owlapi.apibinding.OWLFunctionalSyntaxFactory.{OWLNothing, OWLThing}
@@ -22,8 +14,13 @@ import org.semanticweb.owlapi.model._
2214
import org.semanticweb.owlapi.model.parameters.Imports
2315
import zio._
2416
import zio.blocking._
25-
import zio.interop.monix._
17+
import zio.stream._
2618

19+
import java.io.{File, FileOutputStream, OutputStream}
20+
import java.lang.{Runtime => JRuntime}
21+
import java.nio.charset.StandardCharsets
22+
import java.security.MessageDigest
23+
import java.util.Base64
2724
import scala.io.Source
2825
import scala.jdk.CollectionConverters._
2926

@@ -36,8 +33,6 @@ object Main extends ZCaseApp[Config] {
3633
private val OWLOnProperty = OWL2.onProperty.asNode
3734
private val OWLSomeValuesFrom = OWL2.someValuesFrom.asNode
3835
private val OWLOntology = OWL2.Ontology.asNode
39-
private val df = OWLManager.getOWLDataFactory
40-
private val OWLTopObjectProperty = df.getOWLTopObjectProperty
4136

4237
override def run(config: Config, arg: RemainingArgs): ZIO[ZEnv, Nothing, ExitCode] = {
4338
val ontologyFile = new File(config.ontologyFile)
@@ -52,8 +47,8 @@ object Main extends ZCaseApp[Config] {
5247
val program = streamsManaged.use {
5348
case (nonredundantRDFWriter, redundantRDFWriter) =>
5449
for {
55-
fileProperties <- config.propertiesFile.map(readPropertiesFile).getOrElse(ZIO.succeed(Set.empty[OWLObjectProperty]))
56-
specifiedProperties = fileProperties ++ config.property.map(prop => df.getOWLObjectProperty(IRI.create(prop))).to(Set)
50+
fileProperties <- config.propertiesFile.map(readPropertiesFile).getOrElse(ZIO.succeed(Set.empty[AtomicConcept]))
51+
specifiedProperties = fileProperties ++ config.property.map(prop => AtomicConcept(prop)).to(Set)
5752
manager <- ZIO.effect(OWLManager.createOWLOntologyManager())
5853
ontology <- ZIO.effect(manager.loadOntologyFromOntologyDocument(ontologyFile))
5954
whelkOntology = Bridge.ontologyToAxioms(ontology)
@@ -65,28 +60,39 @@ object Main extends ZCaseApp[Config] {
6560
effectBlockingIO(redundantRDFWriter.triple(Triple.create(NodeFactory.createBlankNode("redundant"), RDFType, OWLOntology))))
6661
.when(config.mode == OWLMode)
6762
start <- ZIO.effectTotal(System.currentTimeMillis())
68-
classes = allClasses(ontology)
69-
classesTasks = classes.map(c => Task(processSuperclasses(c, whelk, config)))
70-
restrictions = extractAllRestrictions(ontology, specifiedProperties)
71-
restrictionsTasks = restrictions.map(r => Task(processRestriction(r, whelk, config.mode)))
72-
allTasks = classesTasks ++ restrictionsTasks
73-
processed = allTasks.mapParallelUnordered(JRuntime.getRuntime.availableProcessors)(identity)
74-
monixTask = processed.foreachL {
63+
processed <- computeRelations(ontology, whelk, specifiedProperties, config.reflexiveSubclasses.bool, config.equivalenceAsSubclass.bool, config.mode)
64+
_ <- processed.foreach {
7565
case TriplesGroup(nonredundant, redundant) =>
76-
nonredundant.foreach(nonredundantRDFWriter.triple)
77-
redundant.foreach(redundantRDFWriter.triple)
66+
ZIO.effect {
67+
nonredundant.foreach(nonredundantRDFWriter.triple)
68+
redundant.foreach(redundantRDFWriter.triple)
69+
}
7870
}
79-
_ <- IO.fromTask(monixTask)
8071
stop <- ZIO.effectTotal(System.currentTimeMillis())
8172
_ <- ZIO.effectTotal(scribe.info(s"Computed relations in ${(stop - start) / 1000.0}s"))
8273
} yield ()
8374
}
8475
program.exitCode
8576
}
8677

87-
def readPropertiesFile(file: String): ZIO[Blocking, Throwable, Set[OWLObjectProperty]] =
78+
/**
 * Builds the stream of triple groups for an ontology.
 *
 * Two kinds of work are combined into one stream and evaluated in parallel, in no particular order:
 * one `processSuperclasses` job per class returned by `allClasses`, and one
 * `processRestrictionAndExtendQueue` job per restriction taken from a work queue. The queue is
 * seeded by `traverse` and extended by the restriction jobs themselves.
 *
 * @param ontology              ontology supplying the classes and the property hierarchy
 * @param whelk                 reasoner state used to answer subsumption queries
 * @param specifiedProperties   properties to query; when empty, the property hierarchy is descended instead
 * @param reflexiveSubclasses   forwarded to `processSuperclasses`
 * @param equivalenceAsSubclass forwarded to `processSuperclasses`
 * @param mode                  output mode forwarded to restriction processing
 * @return an effect that seeds the queue and yields the (not yet run) result stream
 */
def computeRelations(ontology: OWLOntology, whelk: ReasonerState, specifiedProperties: Set[AtomicConcept], reflexiveSubclasses: Boolean, equivalenceAsSubclass: Boolean, mode: Config.OutputMode): UIO[UStream[TriplesGroup]] = {
  val classTaxonomy = classHierarchy(whelk)
  val propertyTaxonomy = propertyHierarchy(ontology)
  val superclassJobs = allClasses(ontology).map { cls =>
    ZIO.effectTotal(processSuperclasses(cls, whelk, reflexiveSubclasses, equivalenceAsSubclass))
  }
  for {
    pending <- Queue.unbounded[Restriction]
    inFlight <- Ref.make(0)
    visited <- Ref.make(Map.empty[AtomicConcept, Set[AtomicConcept]])
    _ <- traverse(specifiedProperties, propertyTaxonomy, classTaxonomy, pending, inFlight, visited)
  } yield {
    // Each queued restriction becomes a job that may enqueue further restrictions when it runs.
    val restrictionJobs = Stream.fromQueue(pending).map { restriction =>
      processRestrictionAndExtendQueue(restriction, propertyTaxonomy, classTaxonomy, whelk, mode, specifiedProperties.isEmpty, pending, inFlight, visited)
    }
    (superclassJobs ++ restrictionJobs).mapMParUnordered(JRuntime.getRuntime.availableProcessors)(identity)
  }
}
92+
93+
// Reads a properties file as UTF-8 on the blocking pool: each line is trimmed, empty lines are
// dropped, and the remaining lines are collected into a set. The source is opened with
// bracketAuto so that it is released after the lines have been read.
def readPropertiesFile(file: String): ZIO[Blocking, Throwable, Set[AtomicConcept]] =
8894
effectBlocking(Source.fromFile(file, "utf-8")).bracketAuto { source =>
89-
effectBlocking(source.getLines().map(_.trim).filter(_.nonEmpty).map(line => df.getOWLObjectProperty(IRI.create(line))).to(Set))
95+
effectBlocking(source.getLines().map(_.trim).filter(_.nonEmpty).map(line => AtomicConcept(line)).to(Set))
9096
}
9197

9298
def createStreamRDF(output: OutputStream): Managed[Throwable, StreamRDF] =
@@ -98,60 +104,69 @@ object Main extends ZCaseApp[Config] {
98104
}
99105
}(stream => ZIO.effectTotal(stream.finish()))
100106

101-
def allClasses(ont: OWLOntology): Observable[OWLClass] = Observable.fromIterable(ont.getClassesInSignature(Imports.INCLUDED).asScala.to(Set) - OWLThing - OWLNothing)
107+
/**
 * Streams every class in the signature of `ont` (imports included), leaving out
 * `owl:Thing` and `owl:Nothing`.
 */
def allClasses(ont: OWLOntology): ZStream[Any, Nothing, OWLClass] = {
  val namedClasses = ont.getClassesInSignature(Imports.INCLUDED).asScala.to(Set) - OWLThing - OWLNothing
  Stream.fromIterable(namedClasses)
}
102108

103-
def processSuperclasses(cls: OWLClass, whelk: ReasonerState, config: Config): TriplesGroup = {
104-
val subject = NodeFactory.createURI(cls.getIRI.toString)
105-
val concept = AtomicConcept(cls.getIRI.toString)
106-
val allSuperclasses = (whelk.closureSubsBySubclass.getOrElse(concept, Set.empty) - BuiltIn.Top)
107-
.collect { case ac @ AtomicConcept(_) => ac }
108-
if (allSuperclasses(BuiltIn.Bottom)) TriplesGroup.empty //unsatisfiable
109-
else {
110-
val (equivs, directSuperclasses) = whelk.directlySubsumedBy(concept)
111-
val adjustedEquivs = if (config.reflexiveSubclasses.bool) equivs + concept else equivs - concept
112-
val directSuperclassTriples = directSuperclasses.map(c => Triple.create(subject, RDFSSubClassOf, NodeFactory.createURI(c.id)))
113-
val equivalentClassTriples = if (config.equivalenceAsSubclass.bool)
114-
adjustedEquivs.map(c => Triple.create(subject, RDFSSubClassOf, NodeFactory.createURI(c.id)))
115-
else
116-
adjustedEquivs.map(c => Triple.create(subject, OWLEquivalentClass, NodeFactory.createURI(c.id)))
117-
val nonredundantTriples = directSuperclassTriples ++ equivalentClassTriples
118-
val adjustedSuperclasses = if (config.reflexiveSubclasses.bool) allSuperclasses + concept else allSuperclasses - concept
119-
val redundantTriples = if (config.equivalenceAsSubclass.bool)
120-
adjustedSuperclasses.map(c => Triple.create(subject, RDFSSubClassOf, NodeFactory.createURI(c.id)))
121-
else {
122-
val superclassesMinusEquiv = adjustedSuperclasses -- adjustedEquivs
123-
superclassesMinusEquiv.map(c => Triple.create(subject, RDFSSubClassOf, NodeFactory.createURI(c.id))) ++
124-
equivalentClassTriples
125-
}
126-
TriplesGroup(nonredundantTriples, redundantTriples)
109+
/**
 * Seeds the work queue with the first restrictions to query.
 *
 * When no properties were specified, the starting points are the entries listed under `Top` in
 * the property hierarchy (minus `Bottom`); otherwise only the specified properties are used.
 * Each starting property is handed to `traverseProperty`, in parallel.
 *
 * If there is no property to query at all, nothing is ever enqueued, so no worker would ever
 * reach the "no active restrictions left" check that shuts the queue down. A stream built from
 * the queue would then never terminate. In that case the queue is shut down here.
 *
 * @param specifiedProperties properties requested by the caller; empty means "descend the hierarchy"
 * @param properties          property hierarchy
 * @param classes             class hierarchy
 * @param queue               work queue receiving the initial restrictions
 * @param activeRestrictions  counter of restrictions that are queued or being processed
 * @param seenRef             per-property record of fillers that were already enqueued
 */
def traverse(specifiedProperties: Set[AtomicConcept], properties: Hierarchy, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRef: Ref[Map[AtomicConcept, Set[AtomicConcept]]]): UIO[Unit] = {
  val descendProperties = specifiedProperties.isEmpty
  val queryProperties =
    if (descendProperties) properties.subclasses.getOrElse(Top, Set.empty) - Bottom
    else specifiedProperties
  if (queryProperties.isEmpty) queue.shutdown
  else
    ZIO.foreachPar_(queryProperties) { subprop =>
      traverseProperty(subprop, classes, queue, activeRestrictions, seenRef)
    }
}
129116

130-
def extractAllRestrictions(ont: OWLOntology, specifiedProperties: Set[OWLObjectProperty]): Observable[Restriction] = {
131-
val properties =
132-
if (specifiedProperties.nonEmpty) specifiedProperties
133-
else ont.getObjectPropertiesInSignature(Imports.INCLUDED).asScala.to(Set) - OWLTopObjectProperty
134-
val propertiesStream = Observable.fromIterable(properties)
135-
val classesStream = allClasses(ont)
117+
// Enqueues the initial restrictions for one property: one restriction per class listed under
// Top in the class hierarchy (Bottom excluded), each using that class as the filler.
// The fillers are recorded in seenRef, the active counter is raised by the number of
// restrictions, and only then are they offered to the queue.
def traverseProperty(property: AtomicConcept, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRef: Ref[Map[AtomicConcept, Set[AtomicConcept]]]): UIO[Unit] = {
118+
val restrictions = (classes.subclasses.getOrElse(Top, Set.empty) - Bottom).map(filler => Restriction(Role(property.id), filler))
136119
for {
137-
property <- propertiesStream
138-
cls <- classesStream
139-
} yield Restriction(property, cls)
120+
_ <- seenRef.update { seen =>
121+
val seenForThisProperty = seen.getOrElse(property, Set.empty) ++ restrictions.map(_.filler)
122+
seen.updated(property, seenForThisProperty)
123+
}
124+
// The counter is updated before the offer, so queued work is already counted when it becomes visible.
_ <- activeRestrictions.update(current => current + restrictions.size)
125+
_ <- queue.offerAll(restrictions).unit
126+
active <- activeRestrictions.get
127+
// With nothing counted as active, the queue is shut down.
_ <- queue.shutdown.when(active < 1)
128+
} yield ()
140129
}
141130

142-
def processRestriction(combo: Restriction, whelk: ReasonerState, mode: Config.OutputMode): TriplesGroup = {
143-
val Restriction(property, cls) = combo
144-
val propertyID = property.getIRI.toString
145-
val clsID = cls.getIRI.toString
146-
val queryConcept = AtomicConcept(s"$propertyID$clsID")
147-
val restriction = ExistentialRestriction(Role(propertyID), AtomicConcept(clsID))
148-
val axioms = Set(ConceptInclusion(queryConcept, restriction), ConceptInclusion(restriction, queryConcept))
131+
// Processes one restriction and, if it produced any redundant triples, enqueues follow-up
// restrictions: the same property with each not-yet-seen subclass of the filler taken from the
// class hierarchy and, when descendProperties is set, each subproperty from the property
// hierarchy paired with the same filler (if that pair was not seen before).
// Returns the triples computed for the given restriction.
def processRestrictionAndExtendQueue(restriction: Restriction, properties: Hierarchy, classes: Hierarchy, whelk: ReasonerState, mode: Config.OutputMode, descendProperties: Boolean, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRef: Ref[Map[AtomicConcept, Set[AtomicConcept]]]): UIO[TriplesGroup] =
132+
for {
133+
triples <- ZIO.effectTotal(processRestriction(restriction, whelk, mode))
134+
// seenRef.modify computes the new restrictions and the updated "seen" map in one atomic step.
// A restriction with no redundant triples yields no follow-up work.
directFillerSubclassesRestrictions <- if (triples.redundant.nonEmpty) seenRef.modify { seen =>
135+
val propertyConcept = AtomicConcept(restriction.property.id)
136+
val seenForThisProperty = seen.getOrElse(propertyConcept, Set.empty)
137+
val subClasses = classes.subclasses.getOrElse(restriction.filler, Set.empty) - Bottom
138+
val unseenSubClasses = subClasses -- seenForThisProperty
139+
val updatedSeen = seen.updated(propertyConcept, seenForThisProperty ++ subClasses)
140+
val newRestrictions = unseenSubClasses.map(c => Restriction(restriction.property, c))
141+
// When descending properties, also queue each subproperty with the current filler.
if (descendProperties) {
142+
val subProperties = properties.subclasses.getOrElse(propertyConcept, Set.empty) - Bottom
143+
subProperties.foldLeft((newRestrictions, updatedSeen)) { case ((accRestrictions, accSeen), subProperty) =>
144+
val seenClassesForSubProperty = accSeen.getOrElse(subProperty, Set.empty)
145+
val updatedRestrictions = if (!seenClassesForSubProperty(restriction.filler))
146+
accRestrictions + Restriction(Role(subProperty.id), restriction.filler)
147+
else accRestrictions
148+
val updatedAccSeen = accSeen.updated(subProperty, seenClassesForSubProperty + restriction.filler)
149+
(updatedRestrictions, updatedAccSeen)
150+
}
151+
} else (newRestrictions, updatedSeen)
152+
} else ZIO.succeed(Set.empty[Restriction])
153+
// This restriction is finished (-1) and the newly found ones become active, in a single update.
_ <- activeRestrictions.update(current => current - 1 + directFillerSubclassesRestrictions.size)
154+
_ <- queue.offerAll(directFillerSubclassesRestrictions)
155+
active <- activeRestrictions.get
156+
// Nothing queued or in progress any more: shut the queue down.
_ <- queue.shutdown.when(active < 1)
157+
} yield triples
158+
159+
def processRestriction(restriction: Restriction, whelk: ReasonerState, mode: Config.OutputMode): TriplesGroup = {
160+
val queryConcept = AtomicConcept(s"${restriction.property.id}${restriction.filler.id}")
161+
val er = ExistentialRestriction(restriction.property, restriction.filler)
162+
val axioms = Set(ConceptInclusion(queryConcept, er), ConceptInclusion(er, queryConcept))
149163
val updatedWhelk = Reasoner.assert(axioms, whelk)
150-
val predicate = NodeFactory.createURI(property.getIRI.toString)
151-
val target = NodeFactory.createURI(cls.getIRI.toString)
164+
//FIXME don't create these if result is empty
165+
val predicate = NodeFactory.createURI(restriction.property.id)
166+
val target = NodeFactory.createURI(restriction.filler.id)
152167
val (equivalents, directSubclasses) = updatedWhelk.directlySubsumes(queryConcept)
153168
val subclasses =
154-
updatedWhelk.closureSubsBySuperclass(queryConcept).collect { case x: AtomicConcept => x } - queryConcept - BuiltIn.Bottom
169+
updatedWhelk.closureSubsBySuperclass(queryConcept).collect { case x: AtomicConcept => x } - queryConcept - Bottom
155170
if (!equivalents(BuiltIn.Bottom)) {
156171
val nonredundantTerms = directSubclasses - BuiltIn.Bottom ++ equivalents
157172
val nonredundantTriples = mode match {
@@ -166,6 +181,60 @@ object Main extends ZCaseApp[Config] {
166181
} else TriplesGroup.empty
167182
}
168183

184+
/**
 * Derives a [[Hierarchy]] from the taxonomy computed by the reasoner.
 *
 * The taxonomy maps each concept to a pair of (equivalents, superclasses). The equivalents are
 * kept as they are; the superclass listing is inverted so that every superclass maps to the set
 * of concepts that name it as a superclass.
 *
 * @param reasoner reasoner state whose taxonomy is read
 * @return the equivalence map together with the inverted (superclass to subclasses) map
 */
def classHierarchy(reasoner: ReasonerState): Hierarchy = {
  val taxonomy = reasoner.computeTaxonomy
  // Flatten to (superclass, subclass) pairs, then group the pairs by superclass.
  val superToSubPairs = for {
    (concept, (_, superclasses)) <- taxonomy.toList
    superclass <- superclasses
  } yield superclass -> concept
  val subclassesBySuperclass = superToSubPairs.groupMap(_._1)(_._2).view.mapValues(_.toSet).toMap
  val equivalentsByConcept = taxonomy.map { case (concept, (equivalents, _)) => concept -> equivalents }
  Hierarchy(equivalentsByConcept, subclassesBySuperclass)
}
195+
196+
/**
 * Builds the object property hierarchy of an ontology by encoding properties as atomic concepts
 * and reusing [[classHierarchy]].
 *
 * Every sub-property axiom between two named properties (except those whose super-property is the
 * top object property) becomes a concept inclusion. Every named property in the signature, other
 * than the top object property, additionally gets a trivial inclusion of itself, so that
 * properties without any sub-property axiom are still asserted to the reasoner.
 *
 * Sub-property axioms are read from the imports closure, matching the signature lookup below,
 * which already uses `Imports.INCLUDED`. Reading them from the root ontology only would make
 * sub-properties declared in imported ontologies look like unrelated top-level properties.
 *
 * @param ont ontology whose object properties are arranged into a hierarchy
 * @return hierarchy over the property IRIs, expressed as atomic concepts
 */
def propertyHierarchy(ont: OWLOntology): Hierarchy = {
  def asConcept(property: OWLObjectProperty): AtomicConcept = AtomicConcept(property.getIRI.toString)

  val subPropAxioms = ont.getAxioms(AxiomType.SUB_OBJECT_PROPERTY, Imports.INCLUDED).asScala.to(Set).collect {
    case ax if ax.getSubProperty.isNamed && ax.getSuperProperty.isNamed && !ax.getSuperProperty.isOWLTopObjectProperty =>
      ConceptInclusion(
        asConcept(ax.getSubProperty.asOWLObjectProperty),
        asConcept(ax.getSuperProperty.asOWLObjectProperty))
  }
  val allProps = ont.getObjectPropertiesInSignature(Imports.INCLUDED).asScala.to(Set)
    .filterNot(_.isOWLTopObjectProperty)
    .map(prop => ConceptInclusion(asConcept(prop), asConcept(prop)))
  val allAxioms = (subPropAxioms ++ allProps).toSet[Axiom]
  val whelk = Reasoner.assert(allAxioms)
  classHierarchy(whelk)
}
210+
211+
/**
 * Computes the subclass and equivalence triples for a single class.
 *
 * The nonredundant group holds the triples for the direct superclasses plus the equivalents; the
 * redundant group holds the triples for all named superclasses in the closure. A class whose
 * superclasses include `Bottom` is unsatisfiable and yields an empty group.
 *
 * @param cls                   class used as the subject of every triple
 * @param whelk                 reasoner state supplying closure and direct subsumers
 * @param reflexiveSubclasses   when true the class itself is added to its equivalents and
 *                              superclasses; when false it is removed from both
 * @param equivalenceAsSubclass when true equivalents are written with `rdfs:subClassOf` instead of
 *                              `owl:equivalentClass`
 * @return the nonredundant and redundant triples for `cls`
 */
def processSuperclasses(cls: OWLClass, whelk: ReasonerState, reflexiveSubclasses: Boolean, equivalenceAsSubclass: Boolean): TriplesGroup = {
  val iri = cls.getIRI.toString
  val subject = NodeFactory.createURI(iri)
  val concept = AtomicConcept(iri)
  val namedSuperclasses = (whelk.closureSubsBySubclass.getOrElse(concept, Set.empty) - BuiltIn.Top)
    .collect { case named: AtomicConcept => named }
  if (namedSuperclasses(BuiltIn.Bottom)) TriplesGroup.empty //unsatisfiable
  else {
    def subClassOf(target: AtomicConcept): Triple = Triple.create(subject, RDFSSubClassOf, NodeFactory.createURI(target.id))
    def equivalentTo(target: AtomicConcept): Triple = Triple.create(subject, OWLEquivalentClass, NodeFactory.createURI(target.id))
    // Adds or removes the class itself, depending on the reflexivity setting.
    def adjustForReflexivity(concepts: Set[AtomicConcept]): Set[AtomicConcept] =
      if (reflexiveSubclasses) concepts + concept else concepts - concept

    val (equivs, directSuperclasses) = whelk.directlySubsumedBy(concept)
    val equivalents = adjustForReflexivity(equivs)
    val superclasses = adjustForReflexivity(namedSuperclasses)
    val equivalentTriples =
      if (equivalenceAsSubclass) equivalents.map(subClassOf)
      else equivalents.map(equivalentTo)
    val nonredundant = directSuperclasses.map(subClassOf) ++ equivalentTriples
    val redundant =
      if (equivalenceAsSubclass) superclasses.map(subClassOf)
      else (superclasses -- equivalents).map(subClassOf) ++ equivalentTriples
    TriplesGroup(nonredundant, redundant)
  }
}
237+
169238
def owlTriples(subj: Node, pred: Node, obj: Node): Set[Triple] = {
170239
val hash = MessageDigest.getInstance("SHA-256").digest(s"$subj$pred$obj".getBytes(StandardCharsets.UTF_8))
171240
val id = Base64.getEncoder.encodeToString(hash)
@@ -178,7 +247,9 @@ object Main extends ZCaseApp[Config] {
178247
)
179248
}
180249

181-
final case class Restriction(property: OWLObjectProperty, filler: OWLClass)
250+
// Two lookup tables over atomic concepts: `equivs` maps a concept to its equivalents, and
// `subclasses` maps a concept to the concepts that list it as a superclass.
final case class Hierarchy(equivs: Map[AtomicConcept, Set[AtomicConcept]], subclasses: Map[AtomicConcept, Set[AtomicConcept]])
251+
252+
// A property/filler pair to be queried; it is turned into an existential restriction over
// `property` with `filler` when processed.
final case class Restriction(property: Role, filler: AtomicConcept)
182253

183254
// Result of one unit of work: the triples destined for the nonredundant output and those
// destined for the redundant output.
final case class TriplesGroup(nonredundant: Set[Triple], redundant: Set[Triple])
184255

src/test/scala/org/renci/relationgraph/TestRelationGraph.scala

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package org.renci.relationgraph
22

3-
import monix.execution.Scheduler.Implicits.global
43
import org.apache.jena.graph.{Node, NodeFactory, Triple}
54
import org.geneontology.whelk.{Bridge, Reasoner}
65
import org.renci.relationgraph.Main.TriplesGroup
76
import org.semanticweb.owlapi.apibinding.OWLManager
87
import zio._
9-
import zio.interop.monix._
108
import zio.test.Assertion._
119
import zio.test._
1210

@@ -23,14 +21,11 @@ object TestRelationGraph extends DefaultRunnableSpec {
2321
for {
2422
manager <- ZIO.effect(OWLManager.createOWLOntologyManager())
2523
ontology <- ZIO.effect(manager.loadOntologyFromOntologyDocument(this.getClass.getResourceAsStream("materialize_test.ofn")))
26-
restrictions = Main.extractAllRestrictions(ontology, Set.empty)
2724
whelkOntology = Bridge.ontologyToAxioms(ontology)
2825
whelk = Reasoner.assert(whelkOntology)
29-
triples <- IO.fromTask(
30-
restrictions
31-
.map(Main.processRestriction(_, whelk, Config.RDFMode))
32-
.reduce((left, right) => TriplesGroup(left.nonredundant ++ right.nonredundant, left.redundant ++ right.redundant))
33-
.headL)
26+
resultsStream <- Main.computeRelations(ontology, whelk, Set.empty, false, false, Config.RDFMode)
27+
results <- resultsStream.runCollect
28+
triples <- ZIO.fromOption(results.reduceOption((left, right) => TriplesGroup(left.nonredundant ++ right.nonredundant, left.redundant ++ right.redundant)))
3429
TriplesGroup(nonredundant, redundant) = triples
3530
} yield assert(nonredundant)(contains(Triple.create(n(s"$Prefix#A"), P, n(s"$Prefix#D")))) &&
3631
assert(redundant)(contains(Triple.create(n(s"$Prefix#A"), P, n(s"$Prefix#D")))) &&

0 commit comments

Comments
 (0)