[rust][writer] Add DynamicWriteBatchSizeEstimator #532
fresh-borzoni wants to merge 2 commits into apache:main from
Conversation
charlesdong1991 left a comment
Nice PR!! I left some thoughts! @fresh-borzoni PTAL 🙏
```rust
    partition_id: Option<PartitionId>,
    config: &Config,
) -> Self {
    let estimator = DynamicWriteBatchSizeEstimator::new(
```
So this estimator is always constructed, no matter whether the dynamic setting is true or false?
```rust
let dynamic_target = self
    .config
    .writer_dynamic_batch_size_enabled
    .then(|| bucket_and_batches.dynamic_batch_size.lock().current());
```
Given how we use dynamic_batch_size here, I wonder if an AtomicUsize would be a better option than a Mutex, to avoid locking?
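A minimal sketch of what the lock-free variant suggested above could look like. All names here (`AtomicBatchSize`, `current`, `set`) are hypothetical stand-ins, not the PR's actual API; the point is that the append-path read becomes a single atomic load.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical lock-free holder for the per-table target batch size.
// The derived fields of the estimator would need to live elsewhere or
// also be atomics; this only covers the read/publish paths.
struct AtomicBatchSize {
    target: AtomicUsize,
}

impl AtomicBatchSize {
    fn new(initial: usize) -> Self {
        Self { target: AtomicUsize::new(initial) }
    }

    // Reader side (append path): a relaxed load, no lock taken.
    fn current(&self) -> usize {
        self.target.load(Ordering::Relaxed)
    }

    // Writer side (drain path): publish the newly estimated target.
    fn set(&self, new_target: usize) {
        self.target.store(new_target, Ordering::Relaxed);
    }
}

fn main() {
    let size = AtomicBatchSize::new(2 * 1024 * 1024);
    assert_eq!(size.current(), 2_097_152);
    size.set(1_048_576);
    assert_eq!(size.current(), 1_048_576);
}
```

The trade-off: if `update` needs to read-modify-write more state than a single counter (e.g. the clamped grow/shrink computation), a Mutex keeps that transition atomic, whereas the AtomicUsize version only makes the *published target* lock-free.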
```diff
 | `writer_acks` | `std::string` | `"all"` | Acknowledgment setting (`"all"`, `"0"`, `"1"`, or `"-1"`) |
 | `writer_retries` | `int32_t` | `INT32_MAX` | Number of retries on failure |
-| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes |
+| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes (also the upper bound when dynamic sizing is enabled) |
```
Feels a little counterintuitive from a user perspective to have writer_batch_size be the max while writer_batch_size_min is the min 😅 but that's a very personal feeling.
Nit: in the doc itself, maybe it's clearer to mention that this is the default batch size when dynamic sizing is disabled?
```rust
    return;
}
if let Some(entry) = self.write_batches.get(table_path) {
    entry.dynamic_batch_size.lock().update(actual);
}
```
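One way the suggestion above could look on the drain path. Everything here is a hedged sketch with hypothetical names (`Estimator`, `update_with_trace`, and a deliberately toy `update` rule); the real code would presumably call `tracing::debug!` instead of `eprintln!`.

```rust
// Minimal stand-in estimator so the sketch is self-contained. The toy
// rule here just halves the target when a drained batch is very small;
// it is NOT the PR's actual grow/shrink logic.
struct Estimator { target: usize }

impl Estimator {
    fn current(&self) -> usize { self.target }
    fn update(&mut self, actual: usize) {
        if actual < self.target / 2 {
            self.target /= 2;
        }
    }
}

// Log only when the target actually changes, so the steady state stays quiet.
fn update_with_trace(est: &mut Estimator, table: &str, actual: usize) {
    let before = est.current();
    est.update(actual);
    let after = est.current();
    if before != after {
        // In the real writer this would be tracing::debug! (or log::debug!).
        eprintln!("batch size target for {table} changed: {before} -> {after}");
    }
}

fn main() {
    let mut est = Estimator { target: 2_097_152 };
    update_with_trace(&mut est, "db.table", 1024);
    assert_eq!(est.current(), 1_048_576);
}
```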
```rust
#[test]
fn dynamic_batch_size_shrinks_after_small_drained_batch() {
```
Nit: can we add a complementary test to verify the grow direction end-to-end? I.e., shrink the estimate first by draining small batches, then drain a batch that fills beyond 80% of the target, and verify that the next allocation increases.
This is very minor, we can ignore it too.
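A sketch of the grow-direction test the comment asks for. The `Est` struct here is a self-contained stand-in implementing the PR's stated rules (grow 10% past 80% fill, shrink 5% below 50%, clamped to [min, max]) in integer arithmetic; the real test would exercise `DynamicWriteBatchSizeEstimator` itself.

```rust
// Stand-in estimator so the test sketch compiles on its own.
struct Est { target: usize, min: usize, max: usize }

impl Est {
    fn current(&self) -> usize { self.target }
    fn update(&mut self, actual: usize) {
        if actual * 10 > self.target * 8 {
            // filled past 80%: grow 10%, capped at max
            self.target = (self.target * 110 / 100).min(self.max);
        } else if actual * 2 < self.target {
            // filled below 50%: shrink 5%, floored at min
            self.target = (self.target * 95 / 100).max(self.min);
        }
    }
}

#[test]
fn dynamic_batch_size_grows_after_well_filled_batch() {
    let mut est = Est { target: 1000, min: 100, max: 2000 };
    est.update(400); // 40% fill: shrink 5% -> 950
    assert_eq!(est.current(), 950);
    est.update(900); // ~95% fill: grow 10% -> 1045
    assert_eq!(est.current(), 1045);
    assert!(est.current() > 950); // next allocation increases
}
```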
Summary
closes #445
Adds DynamicWriteBatchSizeEstimator, mirroring Java Fluss's per-table batch size tuning. When writer.dynamic-batch-size-enabled is on, each table's target grows 10% after a batch fills past 80% and shrinks 5% below 50%, clamped to [writer.batch-size-min, writer.batch-size].
Wired into the writer's allocation path: append reads the per-table target instead of the static writer_batch_size; drain feeds the actual size back. The estimator lives on BucketAndWriteBatches behind a parking_lot::Mutex - both touch points short-circuit on the config flag, so the disabled path locks nothing.
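The two touch points described above can be sketched roughly as follows. Names and signatures are assumptions for illustration, not the PR's exact API, and `std::sync::Mutex` stands in for `parking_lot::Mutex` to keep the sketch dependency-free (hence the `.unwrap()` on `lock()`).

```rust
use std::sync::Mutex; // the PR uses parking_lot::Mutex

struct Estimator { target: usize, min: usize, max: usize }

impl Estimator {
    fn current(&self) -> usize { self.target }
    fn update(&mut self, actual: usize) {
        if actual * 10 > self.target * 8 {
            // past 80% fill: grow 10%, capped at writer.batch-size
            self.target = (self.target * 110 / 100).min(self.max);
        } else if actual * 2 < self.target {
            // below 50% fill: shrink 5%, floored at writer.batch-size-min
            self.target = (self.target * 95 / 100).max(self.min);
        }
    }
}

struct BucketAndWriteBatches { dynamic_batch_size: Mutex<Estimator> }

struct Config { writer_dynamic_batch_size_enabled: bool, writer_batch_size: usize }

// Append path: read the per-table target; disabled path never locks.
fn target_batch_size(cfg: &Config, b: &BucketAndWriteBatches) -> usize {
    if cfg.writer_dynamic_batch_size_enabled {
        b.dynamic_batch_size.lock().unwrap().current()
    } else {
        cfg.writer_batch_size
    }
}

// Drain path: feed the actual batch size back into the estimator.
fn on_drain(cfg: &Config, b: &BucketAndWriteBatches, actual: usize) {
    if cfg.writer_dynamic_batch_size_enabled {
        b.dynamic_batch_size.lock().unwrap().update(actual);
    }
}

fn main() {
    let cfg = Config { writer_dynamic_batch_size_enabled: true, writer_batch_size: 2_097_152 };
    let b = BucketAndWriteBatches {
        dynamic_batch_size: Mutex::new(Estimator { target: 2_097_152, min: 262_144, max: 2_097_152 }),
    };
    on_drain(&cfg, &b, 100_000); // ~5% fill -> shrink 5%
    assert_eq!(target_batch_size(&cfg, &b), 1_992_294);
}
```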