feat: add support of iceberg key encoder #308
Pull request overview
This PR adds support for Iceberg key encoding by implementing the IcebergKeyEncoder struct that follows Fluss Java's encoding specifications. The implementation enables Fluss to encode row keys for the Iceberg data lake format.
Changes:
- Added new IcebergKeyEncoder implementation with support for scalar types (Int, BigInt, Float, Double, Date, Time, Timestamp, Decimal, String, Char, Binary, Bytes)
- Integrated IcebergKeyEncoder into the KeyEncoderFactory to handle Iceberg format
- Added comprehensive test coverage for supported data types
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| crates/fluss/src/row/encode/mod.rs | Replaces the "not yet implemented" error for Iceberg format with the new IcebergKeyEncoder implementation |
| crates/fluss/src/row/encode/iceberg_key_encoder.rs | Complete implementation of the Iceberg key encoder with type validation, encoding logic, and tests for supported types |

    (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => {
        let micros =
            ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000);
        micros.to_le_bytes().to_vec()

Potential integer overflow in timestamp conversion. At line 138, ts.get_millisecond() * 1000 could overflow for very large timestamp values. The maximum i64 value is 9,223,372,036,854,775,807, so the multiplication overflows once the millisecond value exceeds i64::MAX / 1000, i.e. for timestamps roughly 292,000 years from the epoch (around the year 294,247). Consider using checked_mul() to detect overflow, or documenting this limitation if it is acceptable for the expected range of timestamps in Iceberg format.
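
A minimal sketch of the checked_mul() suggestion, assuming the millisecond part is an i64 and the nanosecond-of-millisecond part is an i32 (the actual accessor types are not shown in this thread). The free function `timestamp_to_micros` is a hypothetical helper, not part of the PR:

```rust
/// Hypothetical helper: converts a millisecond timestamp plus its
/// sub-millisecond nanosecond part into microseconds.
/// Returns None instead of silently wrapping when the result does not fit in i64.
fn timestamp_to_micros(millis: i64, nano_of_millisecond: i32) -> Option<i64> {
    millis
        // Fails for |millis| larger than i64::MAX / 1000.
        .checked_mul(1000)?
        // Nanoseconds are truncated to whole microseconds, as in the reviewed code.
        .checked_add(i64::from(nano_of_millisecond) / 1000)
}
```

The caller could map `None` to an IllegalArgument error, or the limitation could simply be documented if such timestamps are out of scope.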

    fn validate_supported_type(field_type: &DataType) -> Result<()> {
        match field_type {
            DataType::Int(_)
            | DataType::BigInt(_)
            | DataType::Float(_)
            | DataType::Double(_)
            | DataType::Date(_)
            | DataType::Time(_)
            | DataType::Timestamp(_)
            | DataType::Decimal(_)
            | DataType::String(_)
            | DataType::Char(_)
            | DataType::Binary(_)
            | DataType::Bytes(_) => Ok(()),

            DataType::Array(_) => Err(IllegalArgument {
                message:
                    "Array types cannot be used as bucket keys. Bucket keys must be scalar types."
                        .to_string(),
            }),
            DataType::Map(_) => Err(IllegalArgument {
                message:
                    "Map types cannot be used as bucket keys. Bucket keys must be scalar types."
                        .to_string(),
            }),
            other => Err(IllegalArgument {
                message: format!("Unsupported type for Iceberg key encoder: {other}"),
            }),
        }
    }

The type validation is incomplete. Several data types are supported by the FieldGetter but are missing from this validation:
- Boolean (DataType::Boolean) - which would be returned as Datum::Bool
- TinyInt (DataType::TinyInt) - which would be returned as Datum::Int8
- SmallInt (DataType::SmallInt) - which would be returned as Datum::Int16
- TimestampLtz (DataType::TimestampLTz) - which would be returned as Datum::TimestampLtz
Without explicit handling for these types, if a user attempts to use them as key fields, they would get a generic "Unsupported type" error at validation time. However, if this is intentional and matches the Java implementation behavior, then these types should be listed explicitly in validation with appropriate error messages explaining they are not supported for Iceberg keys.
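
One way to make the rejection explicit is to drop the wildcard arm so that every variant is listed. The sketch below uses a hypothetical, simplified `KeyType` enum as a stand-in for the crate's DataType (which carries payloads and has more variants), and a plain String error instead of IllegalArgument. Whether these types should be rejected at all depends on the Java implementation, which this thread does not confirm:

```rust
/// Hypothetical, simplified stand-in for the crate's DataType.
#[derive(Debug, Clone, Copy, PartialEq)]
enum KeyType {
    Int,
    BigInt,
    String,
    Boolean,
    TinyInt,
    SmallInt,
    TimestampLtz,
    Array,
    Map,
}

/// Validates a key type with no wildcard arm: adding a new variant to
/// KeyType becomes a compile error here until it is classified.
fn validate_key_type(t: KeyType) -> Result<(), String> {
    match t {
        KeyType::Int | KeyType::BigInt | KeyType::String => Ok(()),
        // Assumption: these scalar types are deliberately unsupported.
        KeyType::Boolean | KeyType::TinyInt | KeyType::SmallInt | KeyType::TimestampLtz => {
            Err(format!("{t:?} is not supported as an Iceberg key type"))
        }
        KeyType::Array | KeyType::Map => Err(format!(
            "{t:?} types cannot be used as bucket keys. Bucket keys must be scalar types."
        )),
    }
}
```

The exhaustive match trades a little verbosity for a compiler check whenever the set of data types changes.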

    let bytes: Vec<u8> = match (&self.field_type, value) {
        (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(),
        (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(),

        (DataType::Time(_), Datum::Time(v)) => {
            let micros = v.get_inner() as i64 * 1000;
            micros.to_le_bytes().to_vec()
        }

        (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(),
        (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(),
        (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(),

        (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => {
            let micros =
                ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000);
            micros.to_le_bytes().to_vec()
        }

        (DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(),
        (DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(),
        (DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(),
        (DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(),
        (DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(),

        // FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES.
        (expected_type, actual) => {
            return Err(IllegalArgument {
                message: format!(
                    "IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}"
                ),
            });
        }
    };

The encode_key function is missing match arms for several data types that could potentially be returned by FieldGetter:
- Boolean (DataType::Boolean) - would return Datum::Bool
- TinyInt (DataType::TinyInt) - would return Datum::Int8
- SmallInt (DataType::SmallInt) - would return Datum::Int16
- TimestampLtz (DataType::TimestampLTz) - would return Datum::TimestampLtz
If these types pass validation (e.g., if validation is updated), the match statement would fail at runtime with an "IcebergKeyEncoder type mismatch" error at lines 150-154. Consider adding explicit handling for these types in the match statement, either to encode them (for the numeric types) or to return a more specific error message explaining that they are unsupported for Iceberg keys.
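
If the numeric types were to be encoded, one option is to mirror the existing Int arm, which widens the value to i64 and writes it little-endian. The sketch below is only an illustration under that assumption: `SmallValue` and `encode_widened` are hypothetical names, and whether the Java encoder treats TinyInt and SmallInt this way is not confirmed in this thread:

```rust
/// Hypothetical stand-in for the integer variants of the crate's Datum.
enum SmallValue {
    Int8(i8),
    Int16(i16),
    Int32(i32),
}

/// Widens any small integer to i64 (sign-extending) and returns its
/// 8-byte little-endian representation, like the reviewed Int arm does.
fn encode_widened(value: SmallValue) -> Vec<u8> {
    let wide: i64 = match value {
        SmallValue::Int8(v) => i64::from(v),
        SmallValue::Int16(v) => i64::from(v),
        SmallValue::Int32(v) => i64::from(v),
    };
    wide.to_le_bytes().to_vec()
}
```

Using `i64::from` rather than `as` keeps the conversion lossless by construction, so the same value encodes to the same bytes regardless of its declared width.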
Will you have some time to update the PR following the review? Thanks a lot! @zuston
Sure, I will update it this week.
e3addf9 to 04b3a08
@qzyu999 Thank you for offering to finish up the PR. I pushed changes but reverted them from this PR because rebase conflicts caused build failures.
@zuston Note there was a CI failure on your last push: https://github.com/apache/fluss-rust/actions/runs/24123013769/job/70381044371?pr=308 Can you please take a look? Also, it seems a further rebase is needed now.
leekeiabstraction left a comment
Added some comments. PTAL

    ))
    /// Helper function to create a ListBuilder for a given element Arrow type.
    /// This uses boxed builders to allow dynamic dispatch for nested types.
    fn create_list_builder(

The functions create_list_builder, create_builder_for_type and create_boxed_builder are fairly similar. Can we apply DRY here, e.g. through generics or a macro?
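
As a rough illustration of the macro route, the sketch below generates a type tag, the builder types, and a single dispatch function from one table. Everything here is a hypothetical stand-in: the `Builder` trait, `ElemType`, and the `Demo*` builders only mimic the shape of the problem and are not the Arrow builder API used in the PR:

```rust
/// Hypothetical stand-in for a builder trait object.
trait Builder {
    fn name(&self) -> &'static str;
}

/// Generates the element-type enum, one unit builder per entry, and the
/// dispatch function, so the type-to-builder table is written only once.
macro_rules! define_builders {
    ($($tag:ident => $builder:ident),+ $(,)?) => {
        #[derive(Debug, Clone, Copy)]
        enum ElemType { $($tag),+ }

        $(
            struct $builder;
            impl Builder for $builder {
                fn name(&self) -> &'static str { stringify!($builder) }
            }
        )+

        /// Returns a boxed builder for the given element type.
        fn create_boxed_builder(ty: ElemType) -> Box<dyn Builder> {
            match ty {
                $(ElemType::$tag => Box::new($builder) as Box<dyn Builder>,)+
            }
        }
    };
}

define_builders! {
    Int32 => DemoInt32Builder,
    Int64 => DemoInt64Builder,
    Utf8 => DemoStringBuilder,
}
```

In the real code the three functions may differ in more than the mapping table (for example in how nested list builders are constructed), in which case a shared generic helper might fit better than a macro.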

    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder,
    Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
    TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder,
    Int32Builder, Int64Builder, ListBuilder, StringBuilder, StructBuilder,

I am not sure why this PR includes changes to the column writer.
Sure, let me take a look tomorrow.