
[spark] Add scan.max.records.per.partition config to split log table input partitions #3260

Status: Open
Yohahaha wants to merge 1 commit into apache:main from Yohahaha:spark-split-partition

Conversation

Yohahaha (Contributor) commented May 7, 2026

Purpose

Linked issue: close #3215

Brief change log

  • Introduce scan.max.records.per.partition config option for Spark log table reads. When set, each Fluss
    bucket whose offset range exceeds this value will be split into multiple Spark input partitions, improving
    read parallelism for large offset ranges.
  • Update BucketOffsetsRetrieverImpl to support fetching real earliest offsets when needed.
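To make the change-log entry concrete, here is a minimal sketch of the splitting arithmetic it describes: a bucket's offset range `[startOffset, stopOffset)` is cut into chunks of at most `maxRecords` records each. The class and method names below are illustrative only, not the PR's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the per-bucket split logic described above.
public class SplitSketch {
    static List<long[]> splitRange(long startOffset, long stopOffset, long maxRecords) {
        List<long[]> splits = new ArrayList<>();
        // Fall back to a single split when the range is invalid or already small enough.
        if (startOffset < 0 || stopOffset <= startOffset
                || stopOffset <= startOffset + maxRecords) {
            splits.add(new long[] {startOffset, stopOffset});
            return splits;
        }
        // Otherwise cut the range into chunks of at most maxRecords records.
        for (long from = startOffset; from < stopOffset; from += maxRecords) {
            splits.add(new long[] {from, Math.min(from + maxRecords, stopOffset)});
        }
        return splits;
    }

    public static void main(String[] args) {
        // A bucket covering offsets [0, 10) with maxRecords = 4 yields 3 splits:
        // 0..4, 4..8, 8..10
        for (long[] s : splitRange(0L, 10L, 4L)) {
            System.out.println(s[0] + ".." + s[1]);
        }
    }
}
```

Each resulting range would back one Spark input partition, which is how the config raises read parallelism for large offset ranges.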

Tests

SparkLogTableReadTest: "Spark Read: split partition by config"

API and Format

Documentation

@Yohahaha Yohahaha marked this pull request as ready for review May 7, 2026 02:52
Yohahaha (Contributor, Author) commented May 7, 2026

@YannByron

Yohahaha (Contributor, Author) commented May 7, 2026

@luoyuxia @fresh-borzoni PTAL!

fresh-borzoni (Member) left a comment:

@Yohahaha Ty for the PR, overall LGTM, left minor comments, PTAL

stopOffset: Long,
maxRecords: Long): Seq[InputPartition] = {
if (
startOffset < 0 || stopOffset <= startOffset || stopOffset <= (startOffset + maxRecords)
fresh-borzoni (Member):

For the earliest mode we have the sentinel -2L; I think that would cause a bug here, since we'd clamp to a single split.
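To illustrate the reviewer's concern: if the earliest-mode sentinel (-2L, per the review comment) is passed as `startOffset`, the `startOffset < 0` guard quoted above takes the single-split branch, so a large range that should be split is not. A minimal reproduction of that condition (names hypothetical):

```java
public class SentinelCheck {
    // Assumed sentinel for "earliest offset", as stated in the review comment.
    static final long EARLIEST_OFFSET_SENTINEL = -2L;

    // Mirrors the guard condition quoted in the review context above.
    static boolean takesSingleSplitBranch(long startOffset, long stopOffset, long maxRecords) {
        return startOffset < 0
                || stopOffset <= startOffset
                || stopOffset <= startOffset + maxRecords;
    }

    public static void main(String[] args) {
        // Even with a large range that ought to be split, the sentinel
        // forces the single-split branch. Prints "true".
        System.out.println(takesSingleSplitBranch(EARLIEST_OFFSET_SENTINEL, 1_000_000L, 100L));
    }
}
```

The fix implied by the review is to resolve the sentinel to the real earliest offset (via the retriever change) before the split computation runs.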

public class BucketOffsetsRetrieverImpl implements OffsetsInitializer.BucketOffsetsRetriever {
private final Admin flussAdmin;
private final TablePath tablePath;
private final Boolean fetchEarliestOffset;
fresh-borzoni (Member):

Maybe better to use the primitive boolean, since it doesn't allow null.
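The point behind this comment: a boxed `Boolean` field defaults to null, and auto-unboxing null throws a `NullPointerException`, whereas a primitive `boolean` defaults to false and can never be null. A small standalone illustration (not the PR's code):

```java
public class UnboxingPitfall {
    static Boolean boxed;       // boxed field: defaults to null
    static boolean primitive;   // primitive field: defaults to false

    public static void main(String[] args) {
        System.out.println(primitive); // prints "false"
        try {
            if (boxed) {               // auto-unboxing null -> NullPointerException
                System.out.println("unreachable");
            }
        } catch (NullPointerException e) {
            System.out.println("NPE from unboxing a null Boolean");
        }
    }
}
```

Using `boolean` for a flag like `fetchEarliestOffset` rules out this failure mode at the type level.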

) {
return Seq(
FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
.asInstanceOf[InputPartition])
fresh-borzoni (Member):

nit: I think this cast is redundant.

.map {
from =>
FlussAppendInputPartition(tableBucket, from, math.min(from + step, stopOffset))
.asInstanceOf[InputPartition]
fresh-borzoni (Member):

ditto

}
}

test("Spark Read: split partition by config") {
fresh-borzoni (Member):

The test only checks row order and values, not the resulting partition count. Also, what about partitioned tables? And the earliest mode?

.asInstanceOf[InputPartition]
Seq(
FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
.asInstanceOf[InputPartition])
fresh-borzoni (Member):

ditto



Development

Successfully merging this pull request may close these issues.

[spark] Add config to split input partition by input size

2 participants