Skip to content

Commit a151ec5

Browse files
committed
fix: address PR apache#548 review feedback for S3 FileIO integration
Key changes based on reviewer (wgtmac) feedback:

1. Add an ICEBERG_S3 CMake option to conditionally enable ARROW_S3, replacing the unconditional `set(ARROW_S3 ON)`.
2. Replace `#if __has_include(<arrow/filesystem/s3fs.h>)` with an `#ifdef ICEBERG_HAVE_S3` compile definition controlled by CMake.
3. Remove the ArrowUriFileIO class and reuse the existing ArrowFileSystemFileIO by wrapping an Arrow S3FileSystem created via the Arrow API.
4. Create missing files:
   - s3_properties.h: S3 configuration property key constants
   - file_io_registry.h/.cc: FileIO factory registry (Register/Load)
   - file_io_register.cc: Arrow FileIO factory registration
   - file_io_util.h/.cc: Reusable FileIO creation utility
   - file_io_registry_test.cc: Unit tests for the registry
5. Extract the FileIO creation logic from rest_catalog.cc into iceberg/util/file_io_util.h for reusability.
6. Fix code issues:
   - Implement path-style access (force_virtual_addressing = false)
   - Add timeout value validation (non-negative check)
   - Replace rfind("s3://", 0) with starts_with("s3://")
   - Fix format string bug in rest_catalog_test.cc
7. Update the CI workflow and build script to pass ICEBERG_S3=ON.

https://claude.ai/code/session_01GzV7A8VoYyWUN8QdtqgiMq
1 parent acf12cf commit a151ec5

17 files changed

Lines changed: 459 additions & 138 deletions

.github/workflows/test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ jobs:
6464
env:
6565
CC: gcc-14
6666
CXX: g++-14
67-
run: ci/scripts/build_iceberg.sh $(pwd) ON
67+
run: ci/scripts/build_iceberg.sh $(pwd) ON OFF ON
6868
- name: Build Example
6969
shell: bash
7070
env:
@@ -92,7 +92,7 @@ jobs:
9292
run: bash ci/scripts/start_minio.sh
9393
- name: Build Iceberg
9494
shell: bash
95-
run: ci/scripts/build_iceberg.sh $(pwd)
95+
run: ci/scripts/build_iceberg.sh $(pwd) OFF OFF ON
9696
- name: Build Example
9797
shell: bash
9898
run: ci/scripts/build_example.sh $(pwd)/example
@@ -127,7 +127,7 @@ jobs:
127127
SCCACHE_GHA_ENABLED: "true"
128128
run: |
129129
call "C:\Program Files\Microsoft Visual Studio\2022\Enterprise\VC\Auxiliary\Build\vcvarsall.bat" x64
130-
bash -c "ci/scripts/build_iceberg.sh $(pwd) OFF ON"
130+
bash -c "ci/scripts/build_iceberg.sh $(pwd) OFF ON ON"
131131
sccache --show-stats
132132
- name: Build Example
133133
shell: cmd

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ option(ICEBERG_BUILD_TESTS "Build tests" ON)
4545
option(ICEBERG_BUILD_BUNDLE "Build the battery included library" ON)
4646
option(ICEBERG_BUILD_REST "Build rest catalog client" ON)
4747
option(ICEBERG_BUILD_REST_INTEGRATION_TESTS "Build rest catalog integration tests" OFF)
48+
option(ICEBERG_S3 "Enable S3 support via Arrow S3" OFF)
4849
option(ICEBERG_ENABLE_ASAN "Enable Address Sanitizer" OFF)
4950
option(ICEBERG_ENABLE_UBSAN "Enable Undefined Behavior Sanitizer" OFF)
5051

ci/scripts/build_iceberg.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ source_dir=${1}
2323
build_dir=${1}/build
2424
build_rest_integration_test=${2:-OFF}
2525
build_enable_sccache=${3:-OFF}
26+
build_s3=${4:-OFF}
2627

2728
mkdir ${build_dir}
2829
pushd ${build_dir}
@@ -37,6 +38,7 @@ CMAKE_ARGS=(
3738
"-DICEBERG_BUILD_STATIC=ON"
3839
"-DICEBERG_BUILD_SHARED=ON"
3940
"-DICEBERG_BUILD_REST_INTEGRATION_TESTS=${build_rest_integration_test}"
41+
"-DICEBERG_S3=${build_s3}"
4042
)
4143

4244
if is_windows; then

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ function(resolve_arrow_dependency)
8787
# Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*)
8888
set(ARROW_IPC ON)
8989
set(ARROW_FILESYSTEM ON)
90-
set(ARROW_S3 ON)
90+
set(ARROW_S3 ${ICEBERG_S3})
9191
set(ARROW_JSON ON)
9292
set(ARROW_PARQUET ON)
9393
set(ARROW_SIMD_LEVEL "NONE")

src/iceberg/CMakeLists.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ set(ICEBERG_SOURCES
101101
update/update_statistics.cc
102102
util/bucket_util.cc
103103
util/content_file_util.cc
104+
util/file_io_util.cc
104105
util/conversions.cc
105106
util/decimal.cc
106107
util/gzip_internal.cc
@@ -239,6 +240,15 @@ if(ICEBERG_BUILD_BUNDLE)
239240
OUTPUTS
240241
ICEBERG_BUNDLE_LIBRARIES)
241242

243+
if(ICEBERG_S3)
244+
if(TARGET iceberg_bundle_static)
245+
target_compile_definitions(iceberg_bundle_static PUBLIC ICEBERG_HAVE_S3=1)
246+
endif()
247+
if(TARGET iceberg_bundle_shared)
248+
target_compile_definitions(iceberg_bundle_shared PUBLIC ICEBERG_HAVE_S3=1)
249+
endif()
250+
endif()
251+
242252
add_subdirectory(arrow)
243253
add_subdirectory(avro)
244254
add_subdirectory(parquet)

src/iceberg/arrow/arrow_s3_file_io.cc

Lines changed: 19 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,8 @@
2323

2424
#include <arrow/filesystem/filesystem.h>
2525
#include <arrow/filesystem/localfs.h>
26-
#if __has_include(<arrow/filesystem/s3fs.h>)
26+
#ifdef ICEBERG_HAVE_S3
2727
#include <arrow/filesystem/s3fs.h>
28-
#define ICEBERG_ARROW_HAS_S3 1
29-
#else
30-
#define ICEBERG_ARROW_HAS_S3 0
3128
#endif
3229

3330
#include "iceberg/arrow/arrow_file_io.h"
@@ -40,10 +37,10 @@ namespace iceberg::arrow {
4037

4138
namespace {
4239

43-
bool IsS3Uri(std::string_view uri) { return uri.rfind("s3://", 0) == 0; }
40+
bool IsS3Uri(std::string_view uri) { return uri.starts_with("s3://"); }
4441

4542
Status EnsureS3Initialized() {
46-
#if ICEBERG_ARROW_HAS_S3
43+
#ifdef ICEBERG_HAVE_S3
4744
static std::once_flag init_flag;
4845
static ::arrow::Status init_status = ::arrow::Status::OK();
4946
std::call_once(init_flag, []() {
@@ -64,7 +61,7 @@ Status EnsureS3Initialized() {
6461
#endif
6562
}
6663

67-
#if ICEBERG_ARROW_HAS_S3
64+
#ifdef ICEBERG_HAVE_S3
6865
/// \brief Configure S3Options from a properties map.
6966
///
7067
/// \param properties The configuration properties map.
@@ -104,9 +101,8 @@ ::arrow::fs::S3Options ConfigureS3Options(
104101

105102
// Configure path-style access (needed for MinIO)
106103
auto path_style_it = properties.find(S3Properties::kPathStyleAccess);
107-
if (path_style_it != properties.end()) {
108-
// Arrow's S3 path-style is controlled via endpoint scheme
109-
// For path-style access, we need to ensure the endpoint is properly configured
104+
if (path_style_it != properties.end() && path_style_it->second == "true") {
105+
options.force_virtual_addressing = false;
110106
}
111107

112108
// Configure SSL
@@ -118,12 +114,18 @@ ::arrow::fs::S3Options ConfigureS3Options(
118114
// Configure timeouts
119115
auto connect_timeout_it = properties.find(S3Properties::kConnectTimeoutMs);
120116
if (connect_timeout_it != properties.end()) {
121-
options.connect_timeout = std::stod(connect_timeout_it->second) / 1000.0;
117+
double timeout_ms = std::stod(connect_timeout_it->second);
118+
if (timeout_ms >= 0) {
119+
options.connect_timeout = timeout_ms / 1000.0;
120+
}
122121
}
123122

124123
auto socket_timeout_it = properties.find(S3Properties::kSocketTimeoutMs);
125124
if (socket_timeout_it != properties.end()) {
126-
options.request_timeout = std::stod(socket_timeout_it->second) / 1000.0;
125+
double timeout_ms = std::stod(socket_timeout_it->second);
126+
if (timeout_ms >= 0) {
127+
options.request_timeout = timeout_ms / 1000.0;
128+
}
127129
}
128130

129131
return options;
@@ -141,83 +143,19 @@ Result<std::shared_ptr<::arrow::fs::FileSystem>> MakeS3FileSystem(
141143
}
142144
#endif
143145

144-
Result<std::shared_ptr<::arrow::fs::FileSystem>> ResolveFileSystemFromUri(
145-
const std::string& uri, std::string* out_path) {
146-
if (IsS3Uri(uri)) {
147-
ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
148-
}
149-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::FileSystemFromUri(uri, out_path));
150-
return fs;
151-
}
152-
153-
/// \brief ArrowUriFileIO resolves FileSystem from URI for each operation.
154-
///
155-
/// This implementation is thread-safe as it creates a new FileSystem instance
156-
/// for each operation. However, it may be less efficient than caching the
157-
/// FileSystem. S3 initialization is done once per process.
158-
class ArrowUriFileIO : public FileIO {
159-
public:
160-
Result<std::string> ReadFile(const std::string& file_location,
161-
std::optional<size_t> length) override {
162-
std::string path;
163-
ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(file_location, &path));
164-
::arrow::fs::FileInfo file_info(path);
165-
if (length.has_value()) {
166-
file_info.set_size(length.value());
167-
}
168-
std::string content;
169-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, fs->OpenInputFile(file_info));
170-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
171-
172-
content.resize(file_size);
173-
size_t remain = file_size;
174-
size_t offset = 0;
175-
while (remain > 0) {
176-
size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
177-
ICEBERG_ARROW_ASSIGN_OR_RETURN(
178-
auto read_bytes,
179-
file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
180-
remain -= read_bytes;
181-
offset += read_bytes;
182-
}
183-
184-
return content;
185-
}
186-
187-
Status WriteFile(const std::string& file_location,
188-
std::string_view content) override {
189-
std::string path;
190-
ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(file_location, &path));
191-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, fs->OpenOutputStream(path));
192-
ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
193-
ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
194-
ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
195-
return {};
196-
}
197-
198-
Status DeleteFile(const std::string& file_location) override {
199-
std::string path;
200-
ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(file_location, &path));
201-
ICEBERG_ARROW_RETURN_NOT_OK(fs->DeleteFile(path));
202-
return {};
203-
}
204-
};
205-
206146
} // namespace
207147

208148
Result<std::unique_ptr<FileIO>> MakeS3FileIO(const std::string& uri) {
209149
if (!IsS3Uri(uri)) {
210150
return InvalidArgument("S3 URI must start with s3://");
211151
}
212-
#if !ICEBERG_ARROW_HAS_S3
152+
#ifndef ICEBERG_HAVE_S3
213153
return NotImplemented("Arrow S3 support is not enabled");
214154
#else
215-
// Validate that S3 can be initialized and the URI is valid
155+
ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
216156
std::string path;
217-
ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(uri, &path));
218-
(void)path;
219-
(void)fs;
220-
return std::make_unique<ArrowUriFileIO>();
157+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::FileSystemFromUri(uri, &path));
158+
return std::make_unique<ArrowFileSystemFileIO>(std::move(fs));
221159
#endif
222160
}
223161

@@ -227,7 +165,7 @@ Result<std::unique_ptr<FileIO>> MakeS3FileIO(
227165
if (!IsS3Uri(uri)) {
228166
return InvalidArgument("S3 URI must start with s3://");
229167
}
230-
#if !ICEBERG_ARROW_HAS_S3
168+
#ifndef ICEBERG_HAVE_S3
231169
return NotImplemented("Arrow S3 support is not enabled");
232170
#else
233171
// If properties are empty, use the simple URI-based resolution
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/arrow/arrow_file_io.h"
21+
#include "iceberg/file_io_registry.h"
22+
23+
namespace iceberg::arrow {
24+
25+
namespace {
26+
27+
struct ArrowFileIORegistrar {
28+
ArrowFileIORegistrar() {
29+
FileIORegistry::Register(
30+
std::string(FileIORegistry::kArrowLocalFileIO),
31+
[](const std::string& /*warehouse*/,
32+
const std::unordered_map<std::string, std::string>& /*properties*/)
33+
-> Result<std::shared_ptr<FileIO>> { return MakeLocalFileIO(); });
34+
35+
FileIORegistry::Register(
36+
std::string(FileIORegistry::kArrowS3FileIO),
37+
[](const std::string& warehouse,
38+
const std::unordered_map<std::string, std::string>& properties)
39+
-> Result<std::shared_ptr<FileIO>> {
40+
auto result = MakeS3FileIO(warehouse, properties);
41+
if (!result.has_value()) {
42+
return std::unexpected(result.error());
43+
}
44+
return std::shared_ptr<FileIO>(std::move(result.value()));
45+
});
46+
}
47+
};
48+
49+
// Static initialization triggers registration at program startup
50+
static ArrowFileIORegistrar registrar;
51+
52+
} // namespace
53+
54+
} // namespace iceberg::arrow

src/iceberg/arrow/s3_properties.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <string_view>
23+
24+
namespace iceberg::arrow {
25+
26+
/// \brief Reserved property keys for S3 FileIO configuration.
27+
///
28+
/// These properties are used to configure S3 access when creating an S3-backed FileIO
29+
/// instance. They map to the corresponding Arrow S3Options fields.
30+
struct S3Properties {
31+
static constexpr std::string_view kAccessKeyId = "s3.access-key-id";
32+
static constexpr std::string_view kSecretAccessKey = "s3.secret-access-key";
33+
static constexpr std::string_view kSessionToken = "s3.session-token";
34+
static constexpr std::string_view kRegion = "s3.region";
35+
static constexpr std::string_view kEndpoint = "s3.endpoint";
36+
static constexpr std::string_view kPathStyleAccess = "s3.path-style-access";
37+
static constexpr std::string_view kSslEnabled = "s3.ssl.enabled";
38+
static constexpr std::string_view kConnectTimeoutMs = "s3.connect-timeout-ms";
39+
static constexpr std::string_view kSocketTimeoutMs = "s3.socket-timeout-ms";
40+
};
41+
42+
} // namespace iceberg::arrow

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
#include <nlohmann/json.hpp>
2828

2929
#include "iceberg/catalog/rest/catalog_properties.h"
30-
#include "iceberg/file_io_registry.h"
3130
#include "iceberg/catalog/rest/constant.h"
3231
#include "iceberg/catalog/rest/endpoint.h"
3332
#include "iceberg/catalog/rest/error_handlers.h"
@@ -46,6 +45,7 @@
4645
#include "iceberg/table_requirement.h"
4746
#include "iceberg/table_update.h"
4847
#include "iceberg/transaction.h"
48+
#include "iceberg/util/file_io_util.h"
4949
#include "iceberg/util/macros.h"
5050

5151
namespace iceberg::rest {
@@ -147,33 +147,11 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
147147

148148
Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
149149
const RestCatalogProperties& config) {
150-
// Get warehouse location to determine the appropriate FileIO type
151150
auto warehouse = config.Get(RestCatalogProperties::kWarehouse);
152-
if (warehouse.empty()) {
153-
return InvalidArgument(
154-
"Warehouse location is required when FileIO is not explicitly provided. "
155-
"Set the 'warehouse' property to an S3 URI (s3://...) or local path.");
156-
}
157-
158-
// Check for user-specified io-impl property
159-
auto io_impl = config.configs().find(FileIOProperties::kImpl);
160-
std::string impl_name;
161-
162-
if (io_impl != config.configs().end() && !io_impl->second.empty()) {
163-
// User specified a custom io-impl
164-
impl_name = io_impl->second;
165-
} else {
166-
// Use default based on warehouse URI scheme
167-
if (warehouse.rfind("s3://", 0) == 0) {
168-
impl_name = FileIORegistry::kArrowS3FileIO;
169-
} else {
170-
impl_name = FileIORegistry::kArrowLocalFileIO;
171-
}
172-
}
173151

174-
// Load FileIO from registry
152+
// Delegate FileIO creation to the utility function
175153
ICEBERG_ASSIGN_OR_RAISE(auto file_io,
176-
FileIORegistry::Load(impl_name, warehouse, config.configs()));
154+
CreateFileIOFromProperties(warehouse, config.configs()));
177155

178156
// Call the main Make method with the created FileIO
179157
return Make(config, std::move(file_io));

0 commit comments

Comments
 (0)