package net.sansa_stack.inference.spark.utils

import java.net.URI

import net.sansa_stack.inference.spark.data.loader.RDFGraphLoader
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

// a lightweight triple representation that maps to a flat (s, p, o) Parquet schema
case class RDFTriple(s: String, p: String, o: String)

/**
 * Converts N-Triples data into the [[https://parquet.apache.org/ Apache Parquet format]].
 *
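 * A minimal usage sketch, assuming an existing [[org.apache.spark.sql.SparkSession]] `session`
 * (the paths are hypothetical placeholders):
 * {{{
 *   val converter = new NTriplesToParquetConverter(session)
 *   converter.saveAsParquet(URI.create("/data/triples.nt"), URI.create("/data/triples.parquet"))
 * }}}
 *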
 * @author Lorenz Buehmann
 */
class NTriplesToParquetConverter(val session: SparkSession) {

  import session.implicits._

  // renders a sequence of URIs as a single comma-separated path string wherever a plain String path is expected
  private implicit def pathURIsConverter(uris: Seq[URI]): String = uris.map(p => p.toString).mkString(",")

  def saveAsParquet(inputPath: URI, outputPath: URI): Unit = {
    saveAsParquet(Seq(inputPath), outputPath)
  }

  def saveAsParquet(inputPaths: Seq[URI], outputPath: URI): Unit = {
    // load the triples from disk as an RDD of plain-string triples
    val triplesRDD: RDD[RDFTriple] = RDFGraphLoader.loadFromDisk(session, inputPaths).triples
      .map(t => RDFTriple(t.getSubject.toString, t.getPredicate.toString, t.getObject.toString))

    // generate a Dataset, using the implicit Encoder for the RDFTriple case class
    val tripleDS = session.createDataset(triplesRDD)

    // write to disk in Parquet format; SaveMode.Append adds new part files to an existing target directory
    tripleDS
//      .repartition(tripleDS("s"))
      .write.mode(SaveMode.Append).parquet(outputPath.toString)
  }
}
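
// Reading the Parquet output back is symmetric. A sketch, assuming the same session
// and a hypothetical output path:
//   import session.implicits._
//   val triples = session.read.parquet("/data/triples.parquet").as[RDFTriple]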

object NTriplesToParquetConverter {

  // defaults for spark.default.parallelism / spark.sql.shuffle.partitions and the number of local worker threads
  val DEFAULT_PARALLELISM = 200
  val DEFAULT_NUM_THREADS = 4

  def main(args: Array[String]): Unit = {
    if (args.length < 2) sys.error("USAGE: NTriplesToParquetConverter <INPUT_PATH>+ <OUTPUT_PATH> <NUM_THREADS>? <PARALLELISM>?")

    // multiple input paths are passed as a single comma-separated argument
    val inputPaths = args(0).split(",").map(URI.create)
    val outputPath = URI.create(args(1))
    val numThreads = if (args.length > 2) args(2).toInt else DEFAULT_NUM_THREADS
    val parallelism = if (args.length > 3) args(3).toInt else DEFAULT_PARALLELISM

    // the Spark config: a local session with the given number of worker threads
    val session = SparkSession.builder
      .appName("N-Triples to Parquet Conversion")
      .master(s"local[$numThreads]")
      .config("spark.eventLog.enabled", "true")
      .config("spark.hadoop.validateOutputSpecs", "false") // allow overriding existing output files
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", parallelism)
      .config("spark.ui.showConsoleProgress", "false")
      .config("spark.sql.shuffle.partitions", parallelism)
      .config("parquet.enable.summary-metadata", "false")
      .getOrCreate()

    new NTriplesToParquetConverter(session).saveAsParquet(inputPaths, outputPath)

    session.stop()
  }
}
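
// Example invocation (the jar name and paths are hypothetical):
//   spark-submit --class net.sansa_stack.inference.spark.utils.NTriplesToParquetConverter \
//     sansa-inference-spark.jar /data/in1.nt,/data/in2.nt /data/out.parquet 8 400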