@@ -6,7 +6,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
66import org .apache .flink .api .scala .{DataSet , _ }
77import org .apache .flink .util .Collector
88import org .apache .jena .graph .{Node , Triple }
9+ import org .apache .jena .sparql .util .NodeComparator
910
11+ import net .sansa_stack .inference .flink .utils .NodeKey
1012import net .sansa_stack .inference .utils .Profiler
1113
1214/**
@@ -186,28 +188,65 @@ trait TransitiveReasoner extends Profiler{
186188 * @return a DataSet containing the transitive closure of the triples
187189 */
188190 def computeTransitiveClosureOptSemiNaive (triples : DataSet [Triple ]): DataSet [Triple ] = {
191+
192+ // apparently, we have to use pairs for (subject, object) because the Jena Triple is not a Scala tuple
193+ // and we have to provide positions of key and value in the iterate method
194+ // the initial set of edges is used as input for both, the workset and the solutionset
195+ val initialTC = triples.map(t => (NodeKey (t.getSubject), NodeKey (t.getObject)))
196+ val pred = triples.first(1 ).collect().head.getPredicate
197+
189198 log.info(" computing TC..." )
190- def iterate (s : DataSet [Triple ], ws : DataSet [Triple ]): (DataSet [Triple ], DataSet [Triple ]) = {
191- val resolvedRedirects = triples.join(ws)
192- .where { _.getSubject }
193- .equalTo { _.getObject }
199+ def iterate (s : DataSet [(NodeKey , NodeKey )], ws : DataSet [(NodeKey , NodeKey )])
200+ : (DataSet [(NodeKey , NodeKey )], DataSet [(NodeKey , NodeKey )]) = {
201+ val resolvedRedirects = initialTC.join(ws)
202+ .where(0 )
203+ .equalTo(1 )
194204 .map { joinResult => joinResult match {
195- case (redirect, link) =>
196- Triple .create(link.getSubject, redirect.getPredicate, redirect.getObject)
205+ case (redirect, link) => (link._1, redirect._2)
197206 }
198207 }.name(" TC-From-Iteration" )
199208 (resolvedRedirects, resolvedRedirects)
200209 }
201210
202- val tc = triples
203- .iterateDelta(triples , 10 , Array (" s " , " o " ))(iterate)
211+ val tc = initialTC
212+ .iterateDelta(initialTC , 10 , Array (0 ))(iterate)
204213 .name(" Final-TC" )
205214 log.info(" finished computing TC" )
206215 // .map { cl => cl}
207216 // .name("Final-Redirect-Result")
208- tc
217+ tc.map(t => Triple .create(t._1.node, pred, t._2.node))
209218 }
210219
211-
220+ // /**
221+ // * Computes the transitive closure on a DataSet of triples.
222+ // * Note, that the assumption is that all triples do have the same predicate.
223+ // * This implementation uses the Flink iterate operator (see
224+ // * [[https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/iterations.html]])
225+ // *
226+ // * @param triples the DataSet of triples
227+ // * @return a DataSet containing the transitive closure of the triples
228+ // */
229+ // def computeTransitiveClosureOptSemiNaive(triples: DataSet[Triple]): DataSet[Triple] = {
230+ // log.info("computing TC...")
231+ // def iterate(s: DataSet[Triple], ws: DataSet[Triple]): (DataSet[Triple], DataSet[Triple]) = {
232+ // val resolvedRedirects = triples.join(ws)
233+ // .where { _.getSubject }
234+ // .equalTo { _.getObject }
235+ // .map { joinResult => joinResult match {
236+ // case (redirect, link) =>
237+ // Triple.create(link.getSubject, redirect.getPredicate, redirect.getObject)
238+ // }
239+ // }.name("TC-From-Iteration")
240+ // (resolvedRedirects, resolvedRedirects)
241+ // }
242+ //
243+ // val tc = triples
244+ // .iterateDelta(triples, 10, Array("s", "o"))(iterate)
245+ // .name("Final-TC")
246+ // log.info("finished computing TC")
247+ // // .map { cl => cl}
248+ // // .name("Final-Redirect-Result")
249+ // tc
250+ // }
212251
213252}
0 commit comments