Skip to content

[rust][writer] Add DynamicWriteBatchSizeEstimator#532

Open
fresh-borzoni wants to merge 2 commits intoapache:mainfrom
fresh-borzoni:feat/dynamic-batch-size
Open

[rust][writer] Add DynamicWriteBatchSizeEstimator#532
fresh-borzoni wants to merge 2 commits intoapache:mainfrom
fresh-borzoni:feat/dynamic-batch-size

Conversation

@fresh-borzoni
Copy link
Copy Markdown
Member

@fresh-borzoni fresh-borzoni commented May 7, 2026

Summary

closes #445

Adds DynamicWriteBatchSizeEstimator, mirroring Java Fluss's per-table batch size tuning. When writer.dynamic-batch-size-enabled is on, each table's target grows 10% after a batch fills past 80% and shrinks 5% below 50%, clamped to [writer.batch-size-min, writer.batch-size].

Wired into the writer's allocation path: append reads the per-table target instead of the static writer_batch_size; drain feeds the actual size back. The estimator lives on BucketAndWriteBatches behind a parking_lot::Mutex - both touch points short-circuit on the config flag, so the disabled path locks nothing.

@fresh-borzoni
Copy link
Copy Markdown
Member Author

@leekeiabstraction @charlesdong1991 PTAL 🙏

Copy link
Copy Markdown
Contributor

@charlesdong1991 charlesdong1991 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice PR!! i left some thoughts! @fresh-borzoni PTAL 🙏

partition_id: Option<PartitionId>,
config: &Config,
) -> Self {
let estimator = DynamicWriteBatchSizeEstimator::new(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this estimator is always being constructed no matter the dynamic setting is true or false?

let dynamic_target = self
.config
.writer_dynamic_batch_size_enabled
.then(|| bucket_and_batches.dynamic_batch_size.lock().current());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given how we use dynamic_batch_size here, i wonder if AtomicUsize will be a better option here than Mutex to avoid locking?

| `writer_acks` | `std::string` | `"all"` | Acknowledgment setting (`"all"`, `"0"`, `"1"`, or `"-1"`) |
| `writer_retries` | `int32_t` | `INT32_MAX` | Number of retries on failure |
| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes |
| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes (also the upper bound when dynamic sizing is enabled) |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel a little bit counterintuitive from user perspective to have writer_batch_size for max, while writer_batch_size_min for min 😅 but very personal feeling.

Nit: on doc itself, maybe clearer to mention it is default batch size when dynamic sizing is disabled?

return;
}
if let Some(entry) = self.write_batches.get(table_path) {
entry.dynamic_batch_size.lock().update(actual);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: i wonder in production environment, would it be helpful to see tracing/debugging message when value changes since it might be quite important change that impacts performance?

}

#[test]
fn dynamic_batch_size_shrinks_after_small_drained_batch() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we add a complementary test to verify grow direction end-to-end? aka, it shrinks estimate first, so drain small batches, then append that drained batch goes beyond 80% of target, and then we should verify that next allocation will increase?

this is very minor, we can ignore too

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement DynamicWriteBatchSizeEstimator

2 participants