From e81a6a4f6b18013eb0b40d11dbf37f71d0c455c7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?=
Date: Thu, 7 May 2026 10:41:20 +0800
Subject: [PATCH 1/4] split scan partition by conf

---
 .../BucketOffsetsRetrieverImpl.java           | 19 ++++-
 .../apache/fluss/spark/SparkFlussConf.scala   | 10 +++
 .../apache/fluss/spark/read/FlussBatch.scala  | 61 +++++++++++++++----
 .../fluss/spark/SparkLogTableReadTest.scala   |  9 +++
 4 files changed, 84 insertions(+), 15 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
index e868a84cc1..037790f0be 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
@@ -37,10 +37,17 @@ public class BucketOffsetsRetrieverImpl
         implements OffsetsInitializer.BucketOffsetsRetriever {
     private final Admin flussAdmin;
     private final TablePath tablePath;
+    private final Boolean fetchEarliestOffset;
 
     public BucketOffsetsRetrieverImpl(Admin flussAdmin, TablePath tablePath) {
+        this(flussAdmin, tablePath, false);
+    }
+
+    public BucketOffsetsRetrieverImpl(
+            Admin flussAdmin, TablePath tablePath, Boolean fetchEarliestOffset) {
         this.flussAdmin = flussAdmin;
         this.tablePath = tablePath;
+        this.fetchEarliestOffset = fetchEarliestOffset;
     }
 
     @Override
@@ -52,11 +59,15 @@ public Map<Integer, Long> latestOffsets(
             @Nullable String partitionName, Collection<Integer> buckets) {
     @Override
     public Map<Integer, Long> earliestOffsets(
             @Nullable String partitionName, Collection<Integer> buckets) {
-        Map<Integer, Long> bucketWithOffset = new HashMap<>(buckets.size());
-        for (Integer bucket : buckets) {
-            bucketWithOffset.put(bucket, EARLIEST_OFFSET);
+        if (!fetchEarliestOffset) {
+            Map<Integer, Long> bucketWithOffset = new HashMap<>(buckets.size());
+            for (Integer bucket : buckets) {
+                bucketWithOffset.put(bucket, EARLIEST_OFFSET);
+            }
+            return bucketWithOffset;
+        } else {
+            return listOffsets(partitionName, buckets, new OffsetSpec.EarliestSpec());
         }
-        return bucketWithOffset;
     }
     @Override
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
index 28fb633b52..00d6400f64 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
@@ -50,4 +50,14 @@
       .durationType()
       .defaultValue(Duration.ofMillis(10000L))
       .withDescription("The timeout for log scanner to poll records.")
+
+  val SCAN_MAX_RECORDS_PER_PARTITION: ConfigOption[java.lang.Long] =
+    ConfigBuilder
+      .key("scan.max.records.per.partition")
+      .longType()
+      .noDefaultValue()
+      .withDescription(
+        "The maximum number of records per Spark input partition when reading a log table. " +
+          "When set, each Fluss bucket whose offset range exceeds this value will be split " +
+          "into multiple partitions. Disabled by default (one partition per bucket).")
 }
Disabled by default (one partition per bucket).") } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala index 128094a04c..88bf09e8e6 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala @@ -26,6 +26,7 @@ import org.apache.fluss.config.Configuration import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath} import org.apache.fluss.predicate.Predicate import org.apache.fluss.spark.utils.SparkPartitionPredicate +import org.apache.fluss.spark.SparkFlussConf import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory} import org.apache.spark.sql.types.StructType @@ -93,26 +94,64 @@ class FlussAppendBatch( } override def planInputPartitions(): Array[InputPartition] = { - val bucketOffsetsRetrieverImpl = new BucketOffsetsRetrieverImpl(admin, tablePath) + val maxRecordsPerPartition: Option[Long] = { + val opt = flussConfig.getOptional(SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION) + if (opt.isPresent) Some(opt.get().longValue()) else None + } + + val bucketOffsetsRetrieverImpl = maxRecordsPerPartition match { + case Some(_) => new BucketOffsetsRetrieverImpl(admin, tablePath, true) + case None => new BucketOffsetsRetrieverImpl(admin, tablePath) + } val buckets = (0 until tableInfo.getNumBuckets).toSeq + def splitOffsetRange( + tableBucket: TableBucket, + startOffset: Long, + stopOffset: Long, + maxRecords: Long): Seq[InputPartition] = { + if ( + startOffset < 0 || stopOffset <= startOffset || stopOffset <= (startOffset + maxRecords) + ) { + return Seq( + FlussAppendInputPartition(tableBucket, startOffset, stopOffset) + .asInstanceOf[InputPartition]) + } + val rangeSize = stopOffset - startOffset + val numSplits = ((rangeSize + maxRecords - 1) / maxRecords).toInt + val step = (rangeSize + numSplits - 1) / numSplits + + Iterator + .from(0) + .take(numSplits) + .map(i => startOffset + i * step) + .map { + from => + FlussAppendInputPartition(tableBucket, from, math.min(from + step, stopOffset)) + .asInstanceOf[InputPartition] + } + .toSeq + } + def createPartitions( partitionId: Option[Long], startBucketOffsets: Map[Integer, Long], stoppingBucketOffsets: Map[Integer, Long]): Array[InputPartition] = { - buckets.map { + buckets.flatMap { bucketId => - val (startBucketOffset, stoppingBucketOffset) = + val (startOffset, stopOffset) = (startBucketOffsets(bucketId), stoppingBucketOffsets(bucketId)) - partitionId match { - case Some(partitionId) => - val tableBucket = new TableBucket(tableInfo.getTableId, partitionId, bucketId) - FlussAppendInputPartition(tableBucket, startBucketOffset, stoppingBucketOffset) - .asInstanceOf[InputPartition] + val tableBucket = partitionId match { + case Some(pid) => new TableBucket(tableInfo.getTableId, pid, bucketId) + case None => new TableBucket(tableInfo.getTableId, bucketId) + } + maxRecordsPerPartition match { + case Some(maxRecs) => + splitOffsetRange(tableBucket, startOffset, stopOffset, maxRecs) case None => - val tableBucket = new TableBucket(tableInfo.getTableId, bucketId) - FlussAppendInputPartition(tableBucket, startBucketOffset, stoppingBucketOffset) - .asInstanceOf[InputPartition] + Seq( + FlussAppendInputPartition(tableBucket, startOffset, stopOffset) + .asInstanceOf[InputPartition]) } }.toArray } diff --git 
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
index 42b0aa62d0..e09e82f434 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
@@ -603,4 +603,13 @@
       assert(numRowsRead == 5L, s"Expected 5 rows read, got $numRowsRead")
     }
   }
+
+  test("Spark Read: split partition by config") {
+    withSampleTable {
+      withSQLConf("spark.sql.fluss.scan.max.records.per.partition" -> "2") {
+        val query = sql(s"SELECT amount FROM $DEFAULT_DATABASE.t ORDER BY orderId")
+        checkAnswer(query, Row(601) :: Row(602) :: Row(603) :: Row(604) :: Row(605) :: Nil)
+      }
+    }
+  }
 }
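Note on the arithmetic in splitOffsetRange above: numSplits is a ceiling division of the bucket's offset range by the configured maximum, and step is a second ceiling division so the resulting slices stay balanced. The following standalone Scala sketch reproduces just that arithmetic; the object and value names are illustrative and not part of the patch, and the guard is condensed to rangeSize <= maxRecords, which subsumes the patch's stopOffset checks when maxRecords is positive:

    object SplitMathDemo {
      // Mirror of splitOffsetRange's arithmetic: cut [startOffset, stopOffset)
      // into slices holding at most maxRecords offsets each.
      def splitRanges(startOffset: Long, stopOffset: Long, maxRecords: Long): Seq[(Long, Long)] = {
        val rangeSize = stopOffset - startOffset
        if (startOffset < 0 || rangeSize <= maxRecords) return Seq((startOffset, stopOffset))
        // Ceiling division: how many slices are needed at all.
        val numSplits = ((rangeSize + maxRecords - 1) / maxRecords).toInt
        // Ceiling division again: a balanced slice width.
        val step = (rangeSize + numSplits - 1) / numSplits
        (0 until numSplits).map { i =>
          val from = startOffset + i * step
          (from, math.min(from + step, stopOffset))
        }
      }

      def main(args: Array[String]): Unit = {
        // Offsets [0, 5) with at most 2 records per slice.
        println(splitRanges(0L, 5L, 2L))
      }
    }

Running it prints Vector((0,2), (2,4), (4,5)): five offsets under a cap of 2 become three slices, which is the 3-input-partition count the follow-up patch below asserts for the sample table.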
From bdaf93b21eaa5ff0992dc11a82f610ddc86cce2c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?=
Date: Wed, 13 May 2026 18:45:57 +0800
Subject: [PATCH 2/4] fix comments

---
 .../BucketOffsetsRetrieverImpl.java           |  4 +-
 .../apache/fluss/spark/read/FlussBatch.scala  | 16 ++---
 .../fluss/spark/SparkLogTableReadTest.scala   | 59 ++++++++++++++++++-
 3 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
index 037790f0be..2abd998828 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/initializer/BucketOffsetsRetrieverImpl.java
@@ -37,14 +37,14 @@ public class BucketOffsetsRetrieverImpl
         implements OffsetsInitializer.BucketOffsetsRetriever {
     private final Admin flussAdmin;
     private final TablePath tablePath;
-    private final Boolean fetchEarliestOffset;
+    private final boolean fetchEarliestOffset;
 
     public BucketOffsetsRetrieverImpl(Admin flussAdmin, TablePath tablePath) {
         this(flussAdmin, tablePath, false);
     }
 
     public BucketOffsetsRetrieverImpl(
-            Admin flussAdmin, TablePath tablePath, Boolean fetchEarliestOffset) {
+            Admin flussAdmin, TablePath tablePath, boolean fetchEarliestOffset) {
         this.flussAdmin = flussAdmin;
         this.tablePath = tablePath;
         this.fetchEarliestOffset = fetchEarliestOffset;
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
index 88bf09e8e6..d036113b12 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
@@ -113,9 +113,7 @@
       if (
        startOffset < 0 || stopOffset <= startOffset || stopOffset <= (startOffset + maxRecords)
       ) {
-        return Seq(
-          FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
-            .asInstanceOf[InputPartition])
+        return Seq(FlussAppendInputPartition(tableBucket, startOffset, stopOffset))
       }
       val rangeSize = stopOffset - startOffset
       val numSplits = ((rangeSize + maxRecords - 1) / maxRecords).toInt
@@ -126,9 +124,7 @@
         .take(numSplits)
         .map(i => startOffset + i * step)
         .map {
-          from =>
-            FlussAppendInputPartition(tableBucket, from, math.min(from + step, stopOffset))
-              .asInstanceOf[InputPartition]
+          from => FlussAppendInputPartition(tableBucket, from, math.min(from + step, stopOffset))
         }
         .toSeq
     }
@@ -146,12 +142,10 @@
             case None => new TableBucket(tableInfo.getTableId, bucketId)
           }
           maxRecordsPerPartition match {
-            case Some(maxRecs) =>
+            case Some(maxRecs) if maxRecs > 0 =>
               splitOffsetRange(tableBucket, startOffset, stopOffset, maxRecs)
-            case None =>
-              Seq(
-                FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
-                  .asInstanceOf[InputPartition])
+            case _ =>
+              Seq(FlussAppendInputPartition(tableBucket, startOffset, stopOffset))
           }
       }.toArray
     }
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
index e09e82f434..4ccc58f928 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
@@ -21,8 +21,8 @@ import org.apache.fluss.spark.read.{FlussMetrics, FlussScan}
 import org.apache.fluss.spark.read.FlussAppendScan
 
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation}
 
 import org.assertj.core.api.Assertions.assertThat
@@ -607,9 +607,62 @@
   test("Spark Read: split partition by config") {
     withSampleTable {
       withSQLConf("spark.sql.fluss.scan.max.records.per.partition" -> "2") {
-        val query = sql(s"SELECT amount FROM $DEFAULT_DATABASE.t ORDER BY orderId")
-        checkAnswer(query, Row(601) :: Row(602) :: Row(603) :: Row(604) :: Row(605) :: Nil)
+        val df = sql(s"SELECT amount FROM $DEFAULT_DATABASE.t ORDER BY orderId")
+        checkAnswer(df, Row(601) :: Row(602) :: Row(603) :: Row(604) :: Row(605) :: Nil)
+
+        val partitions = getInputPartitions(df)
+        assertThat(partitions.length).isEqualTo(3)
       }
     }
+
+    withTable("t_partition") {
+      sql(
+        s"""
+           |CREATE TABLE $DEFAULT_DATABASE.t_partition (orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING)
+           |PARTITIONED BY (dt)
+           |""".stripMargin
+      )
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_partition VALUES
+             |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, "addr2", "2026-01-01"),
+             |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, "addr4", "2026-01-02"),
+             |(1000L, 25L, 605, "addr5", "2026-01-03")
+             |""".stripMargin)
+      Seq((0, 3), (1, 5), (2, 3)).foreach {
+        case (maxRecords, expectedPartitions) =>
+          withClue(s"maxRecords = $maxRecords, expectedPartitions = $expectedPartitions") {
+            withSQLConf("spark.sql.fluss.scan.max.records.per.partition" -> maxRecords.toString) {
+              val df = sql(s"SELECT * FROM $DEFAULT_DATABASE.t_partition ORDER BY orderId")
+              checkAnswer(
+                df,
+                Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+                  Row(700L, 22L, 602, "addr2", "2026-01-01") ::
+                  Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+                  Row(900L, 24L, 604, "addr4", "2026-01-02") ::
+                  Row(1000L, 25L, 605, "addr5", "2026-01-03") :: Nil
+              )
+
+              val partitions = getInputPartitions(df)
+              assertThat(partitions.length).isEqualTo(expectedPartitions)
+            }
+          }
+      }
+    }
   }
+
+  private def getInputPartitions(df: DataFrame): Array[InputPartition] = {
+    // Try executedPlan first (after AQE), then optimizedPlan
+    val fromExecutedPlan = df.queryExecution.executedPlan.collect {
+      case b: BatchScanExec => b.inputPartitions.toArray
+    }
+    if (fromExecutedPlan.nonEmpty) {
+      fromExecutedPlan.head
+    } else {
+      val scans = df.queryExecution.optimizedPlan.collect {
+        case DataSourceV2ScanRelation(_, scan: FlussAppendScan, _, _, _) => scan
+      }
+      scans.headOption.map(_.toBatch.planInputPartitions()).getOrElse(Array.empty[InputPartition])
+    }
+  }
 }
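The tests above set the option per query through withSQLConf; in user code the same key goes on the Spark session conf. A minimal usage sketch, assuming the session-conf route behaves as it does in the tests -- the spark.conf.set call, application name, and table name are illustrative, and only the "spark.sql.fluss.scan.max.records.per.partition" key itself comes from the patch:

    import org.apache.spark.sql.SparkSession

    object SplitScanUsage {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("fluss-split-scan-demo").getOrCreate()
        // Cap each Fluss bucket at 10000 records per Spark input partition; a
        // bucket with a larger offset range is then read by several tasks.
        // Values <= 0 are ignored (this patch guards with `if maxRecs > 0`).
        spark.conf.set("spark.sql.fluss.scan.max.records.per.partition", "10000")
        val df = spark.sql("SELECT * FROM my_fluss_db.my_log_table")
        df.show()
      }
    }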
From 4bc780598f071a88c3f90199e4f819f509ca614b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?=
Date: Wed, 13 May 2026 18:52:59 +0800
Subject: [PATCH 3/4] fix rebase

---
 .../src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
index d036113b12..c5a9a3e49c 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
@@ -25,8 +25,8 @@ import org.apache.fluss.client.table.scanner.log.LogScanner
 import org.apache.fluss.config.Configuration
 import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath}
 import org.apache.fluss.predicate.Predicate
-import org.apache.fluss.spark.utils.SparkPartitionPredicate
 import org.apache.fluss.spark.SparkFlussConf
+import org.apache.fluss.spark.utils.SparkPartitionPredicate
 
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
 import org.apache.spark.sql.types.StructType

From 2a6a55bb7dd9b2e65db07f2a8ae54e575640a47f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?=
Date: Thu, 14 May 2026 10:25:21 +0800
Subject: [PATCH 4/4] trigger CI
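For completeness: the getInputPartitions helper in PATCH 2/4 reads the exact planned splits out of the physical plan (BatchScanExec.inputPartitions, falling back to planning the FlussAppendScan batch directly). Outside a test, a rougher check of scan parallelism is the RDD partition count. A sketch, continuing the `spark` session from the previous example and with the same illustrative table name; the caveat is that AQE coalescing or a shuffle can make this number diverge from the planned split count:

    // Number of partitions Spark actually uses for the scan stage.
    val scanDf = spark.sql("SELECT * FROM my_fluss_db.my_log_table")
    println(s"input partitions: ${scanDf.rdd.getNumPartitions}")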