11package net .sansa_stack .inference .spark .backwardchaining
22
33
4- import java .net .URI
5-
6- import org .apache .jena .graph .{Node , NodeFactory , Triple }
7- import org .apache .jena .reasoner .TriplePattern
8- import org .apache .jena .reasoner .rulesys .Rule
9- import org .apache .jena .reasoner .rulesys .impl .BindingVector
10- import org .apache .jena .sparql .util .FmtUtils
11- import org .apache .jena .vocabulary .{RDF , RDFS }
12- import org .apache .spark .sql .{Dataset , SaveMode , SparkSession }
134import net .sansa_stack .inference .rules .RuleSets
145import net .sansa_stack .inference .rules .plan .SimpleSQLGenerator
15- import net .sansa_stack .inference .spark .backwardchaining .BackwardChainingReasonerDataframe .time
166import net .sansa_stack .inference .spark .backwardchaining .tree .{AndNode , OrNode }
17- import net .sansa_stack .inference .spark .data .loader .RDFGraphLoader
18- import net .sansa_stack .inference .spark .utils .NTriplesToParquetConverter .{DEFAULT_NUM_THREADS , DEFAULT_PARALLELISM }
197import net .sansa_stack .inference .utils .RuleUtils ._
208import net .sansa_stack .inference .utils .{Logging , TripleUtils }
9+ import org .apache .jena .graph .{Node , NodeFactory , Triple }
2110import org .apache .jena .rdf .model .Resource
11+ import org .apache .jena .reasoner .TriplePattern
12+ import org .apache .jena .reasoner .rulesys .Rule
13+ import org .apache .jena .reasoner .rulesys .impl .BindingVector
14+ import org .apache .jena .sparql .util .FmtUtils
15+ import org .apache .jena .vocabulary .{RDF , RDFS }
16+ import org .apache .spark .sql .{Dataset , SparkSession }
2217
2318import scala .concurrent .duration .FiniteDuration
2419
@@ -47,11 +42,11 @@ class BackwardChainingReasonerDataframe(
4742
4843 def isEntailed (tp : TriplePattern ): Boolean = {
4944 val tree = buildTree(new AndNode (tp), Seq ())
50- println (tree.toString)
45+ log.info (tree.toString)
5146
5247 val triples = processTree(tree)
5348 triples.explain(true )
54- println (triples.distinct().count())
49+ log.info (triples.distinct().count().toString )
5550
5651 false
5752 }
@@ -66,7 +61,7 @@ class BackwardChainingReasonerDataframe(
6661
6762 // 2. process the inference rules that can infer the triple pattern
6863 val inferredTriples = tree.children.map(child => {
69- println (s " processing rule ${child.element}" )
64+ log.info (s " processing rule ${child.element}" )
7065
7166 // first process the children, i.e. we get the data for each triple pattern in the body of the rule
7267 val childrenTriples : Seq [Dataset [RDFTriple ]] = child.children.map(processTree(_))
@@ -142,7 +137,7 @@ class BackwardChainingReasonerDataframe(
142137
143138 rules.filterNot(visited.contains(_)).foreach(r => {
144139 // check if the head is more general than the triple in question
145- var head = r.headTriplePatterns()
140+ val head = r.headTriplePatterns()
146141
147142 head.foreach(headTP => {
148143 val subsumes = headTP.subsumes(tp)
@@ -210,7 +205,7 @@ class BackwardChainingReasonerDataframe(
210205
211206 val tableName = s " TRIPLES_ ${rule.getName}"
212207 sql = sql.replace(" TRIPLES" , tableName)
213- println (s " SQL NEW: $sql" )
208+ log.info (s " SQL NEW: $sql" )
214209 dataset.createOrReplaceTempView(tableName)
215210 dataset.sparkSession.sql(sql).as[RDFTriple ]
216211 }
@@ -220,7 +215,7 @@ class BackwardChainingReasonerDataframe(
220215 (RDFS .subPropertyOf, true , " SPO" ),
221216 (RDFS .domain, false , " DOM" ),
222217 (RDFS .range, false , " RAN" ))
223- val DUMMY_VAR = NodeFactory .createVariable(" VAR" );
218+ val DUMMY_VAR = NodeFactory .createVariable(" VAR" )
224219
225220 /**
226221 * Computes the triples for each schema property p, e.g. `rdfs:subClassOf` and returns it as mapping from p
@@ -470,6 +465,7 @@ object BackwardChainingReasonerDataframe extends Logging{
470465 .config(" spark.sql.shuffle.partitions" , parallelism)
471466 .config(" spark.sql.autoBroadcastJoinThreshold" , " 10485760" )
472467 .config(" parquet.enable.summary-metadata" , " false" )
468+ // .config("spark.sql.cbo.enabled", "true")
473469// .config("spark.local.dir", "/home/user/work/datasets/spark/tmp")
474470 .getOrCreate()
475471
@@ -480,9 +476,7 @@ object BackwardChainingReasonerDataframe extends Logging{
480476// .triples.map(t => RDFTriple(t.getSubject.toString(), t.getPredicate.toString(), t.getObject.toString()))
481477// // .triples.map(t => RDFTriple(FmtUtils.stringForNode(t.getSubject), FmtUtils.stringForNode(t.getPredicate), FmtUtils.stringForNode(t.getObject)))
482478//
483- // val tableDir = "/home/user/work/datasets/lubm/table/1000"
484479// val graph = session.createDataset(triples)//.cache()
485- // graph.write.mode(SaveMode.Append).parquet(tableDir)
486480
487481 val graph = session.read.parquet(inputPath).as[RDFTriple ].cache()
488482 graph.createOrReplaceTempView(" TRIPLES" )
0 commit comments