Skip to content

Commit 4c3ce8d

Browse files
Sandeep GottimukkalaSandeep Gottimukkala
authored andcommitted
feat(rest): add scan plan endpoints to REST catalog client
- Add PlanTableScan, FetchPlanningResult, CancelPlanning, FetchScanTasks methods to RestCatalog, wiring them to the corresponding REST endpoints - Add scan plan endpoint definitions and resource path helpers - Add ScanPlanErrorHandler with proper 404/406 error dispatch (NoSuchPlanIdException, NoSuchPlanTaskException, etc.) - Add PlanTableScanRequest/FetchScanTasksRequest serialization helpers - Add kNoSuchPlanId and kNoSuchPlanTask ErrorKind values - Add DataFileFromJson and FileScanTasksFromJson overloads with partition spec and schema support - Add endpoint and integration tests for all 4 scan plan operations
1 parent c27fd15 commit 4c3ce8d

19 files changed

Lines changed: 1164 additions & 3 deletions

src/iceberg/catalog.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
#include <unordered_set>
2727
#include <vector>
2828

29+
#include "iceberg/catalog/rest/types.h"
2930
#include "iceberg/result.h"
3031
#include "iceberg/table_identifier.h"
32+
#include "iceberg/table_scan.h"
3133
#include "iceberg/type_fwd.h"
3234

3335
namespace iceberg {
@@ -188,6 +190,22 @@ class ICEBERG_EXPORT Catalog {
188190
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
189191
virtual Result<std::shared_ptr<Table>> RegisterTable(
190192
const TableIdentifier& identifier, const std::string& metadata_file_location) = 0;
193+
194+
virtual Result<rest::PlanTableScanResponse> PlanTableScan(
195+
const Table& table, const internal::TableScanContext& context) {
196+
return NotImplemented("PlanTableScan is not supported by this catalog");
197+
}
198+
virtual Result<rest::FetchPlanningResultResponse> FetchPlanningResult(
199+
const Table& table, const std::string& plan_id) {
200+
return NotImplemented("FetchPlanningResult is not supported by this catalog");
201+
}
202+
virtual Status CancelPlanning(const Table& table, const std::string& plan_id) {
203+
return NotImplemented("CancelPlanning is not supported by this catalog");
204+
}
205+
virtual Result<rest::FetchScanTasksResponse> FetchScanTasks(
206+
const Table& table, const std::string& plan_task) {
207+
return NotImplemented("FetchScanTasks is not supported by this catalog");
208+
}
191209
};
192210

193211
} // namespace iceberg

src/iceberg/catalog/rest/endpoint.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,23 @@ class ICEBERG_REST_EXPORT Endpoint {
128128
return {HttpMethod::kPost, "/v1/{prefix}/transactions/commit"};
129129
}
130130

131+
// Scan planning endpoints
132+
static Endpoint PlanTableScan() {
133+
return {HttpMethod::kPost, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"};
134+
}
135+
136+
static Endpoint FetchPlanningResult() {
137+
return {HttpMethod::kGet, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"};
138+
}
139+
140+
static Endpoint CancelPlanning() {
141+
return {HttpMethod::kDelete, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"};
142+
}
143+
144+
static Endpoint FetchScanTasks() {
145+
return {HttpMethod::kPost, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks"};
146+
}
147+
131148
private:
132149
Endpoint(HttpMethod method, std::string_view path) : method_(method), path_(path) {}
133150

src/iceberg/catalog/rest/error_handlers.cc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ namespace {
3030
constexpr std::string_view kIllegalArgumentException = "IllegalArgumentException";
3131
constexpr std::string_view kNoSuchNamespaceException = "NoSuchNamespaceException";
3232
constexpr std::string_view kNamespaceNotEmptyException = "NamespaceNotEmptyException";
33+
constexpr std::string_view kNoSuchTableException = "NoSuchTableException";
34+
constexpr std::string_view kNoSuchPlanIdException = "NoSuchPlanIdException";
35+
constexpr std::string_view kNoSuchPlanTaskException = "NoSuchPlanTaskException";
3336

3437
} // namespace
3538

@@ -183,4 +186,34 @@ Status ViewCommitErrorHandler::Accept(const ErrorResponse& error) const {
183186
return DefaultErrorHandler::Accept(error);
184187
}
185188

189+
const std::shared_ptr<ScanPlanErrorHandler>& ScanPlanErrorHandler::Instance() {
190+
static const std::shared_ptr<ScanPlanErrorHandler> instance{
191+
new ScanPlanErrorHandler()};
192+
return instance;
193+
}
194+
195+
Status ScanPlanErrorHandler::Accept(const ErrorResponse& error) const {
196+
switch (error.code) {
197+
case 404:
198+
if (error.type == kNoSuchNamespaceException) {
199+
return NoSuchNamespace(error.message);
200+
}
201+
if (error.type == kNoSuchTableException) {
202+
return NoSuchTable(error.message);
203+
}
204+
if (error.type == kNoSuchPlanIdException) {
205+
return NoSuchPlanId(error.message);
206+
}
207+
if (error.type == kNoSuchPlanTaskException) {
208+
return NoSuchPlanTask(error.message);
209+
}
210+
return NotFound(error.message);
211+
case 406:
212+
return NotSupported(error.message);
213+
}
214+
215+
return DefaultErrorHandler::Accept(error);
216+
}
217+
218+
186219
} // namespace iceberg::rest

src/iceberg/catalog/rest/error_handlers.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,14 @@ class ICEBERG_REST_EXPORT ViewCommitErrorHandler final : public DefaultErrorHand
127127
constexpr ViewCommitErrorHandler() = default;
128128
};
129129

130+
class ICEBERG_REST_EXPORT ScanPlanErrorHandler final : public DefaultErrorHandler {
131+
public:
132+
static const std::shared_ptr<ScanPlanErrorHandler>& Instance();
133+
134+
Status Accept(const ErrorResponse& error) const override;
135+
136+
private:
137+
constexpr ScanPlanErrorHandler() = default;
138+
};
139+
130140
} // namespace iceberg::rest

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626

2727
#include "iceberg/catalog/rest/json_serde_internal.h"
2828
#include "iceberg/catalog/rest/types.h"
29+
#include "iceberg/expression/json_serde_internal.h"
2930
#include "iceberg/json_serde_internal.h"
3031
#include "iceberg/partition_spec.h"
32+
#include "iceberg/schema.h"
3133
#include "iceberg/sort_order.h"
3234
#include "iceberg/table_identifier.h"
3335
#include "iceberg/table_requirement.h"
@@ -78,6 +80,21 @@ constexpr std::string_view kExpiresIn = "expires_in";
7880
constexpr std::string_view kIssuedTokenType = "issued_token_type";
7981
constexpr std::string_view kRefreshToken = "refresh_token";
8082
constexpr std::string_view kOAuthScope = "scope";
83+
constexpr std::string_view kPlanStatus = "status";
84+
constexpr std::string_view kPlanId = "plan-id";
85+
constexpr std::string_view kPlanTasks = "plan-tasks";
86+
constexpr std::string_view kFileScanTasks = "file-scan-tasks";
87+
constexpr std::string_view kDeleteFiles = "delete-files";
88+
constexpr std::string_view kSnapshotId = "snapshot-id";
89+
constexpr std::string_view kSelect = "select";
90+
constexpr std::string_view kFilter = "filter";
91+
constexpr std::string_view kCaseSensitive = "case-sensitive";
92+
constexpr std::string_view kUseSnapshotSchema = "use-snapshot-schema";
93+
constexpr std::string_view kStartSnapshotId = "start-snapshot-id";
94+
constexpr std::string_view kEndSnapshotId = "end-snapshot-id";
95+
constexpr std::string_view kStatsFields = "stats-fields";
96+
constexpr std::string_view kMinRowsRequired = "min-rows-required";
97+
constexpr std::string_view kPlanTask = "plan-task";
8198

8299
} // namespace
83100

@@ -506,6 +523,114 @@ Result<OAuthTokenResponse> OAuthTokenResponseFromJson(const nlohmann::json& json
506523
return response;
507524
}
508525

526+
Result<nlohmann::json> ToJson(const PlanTableScanRequest& request) {
527+
nlohmann::json json;
528+
if (request.snapshot_id.has_value()) {
529+
json[kSnapshotId] = request.snapshot_id.value();
530+
}
531+
if (!request.select.empty()) {
532+
json[kSelect] = request.select;
533+
}
534+
if (request.filter) {
535+
ICEBERG_ASSIGN_OR_RAISE(auto filter_json, iceberg::ToJson(*request.filter));
536+
json[kFilter] = std::move(filter_json);
537+
}
538+
json[kCaseSensitive] = request.case_sensitive;
539+
json[kUseSnapshotSchema] = request.use_snapshot_schema;
540+
if (request.start_snapshot_id.has_value()) {
541+
json[kStartSnapshotId] = request.start_snapshot_id.value();
542+
}
543+
if (request.end_snapshot_id.has_value()) {
544+
json[kEndSnapshotId] = request.end_snapshot_id.value();
545+
}
546+
if (!request.statsFields.empty()) {
547+
json[kStatsFields] = request.statsFields;
548+
}
549+
if (request.min_rows_required.has_value()) {
550+
json[kMinRowsRequired] = request.min_rows_required.value();
551+
}
552+
return json;
553+
}
554+
555+
nlohmann::json ToJson(const FetchScanTasksRequest& request) {
556+
nlohmann::json json;
557+
json[kPlanTask] = request.planTask;
558+
return json;
559+
}
560+
561+
Status BaseScanTaskResponseFromJson(
562+
const nlohmann::json& json, BaseScanTaskResponse* response,
563+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_specs_by_id,
564+
const Schema& schema) {
565+
// 1. plan_tasks
566+
ICEBERG_ASSIGN_OR_RAISE(auto plan_tasks,
567+
GetJsonValue<nlohmann::json>(json, kPlanTasks));
568+
if (!plan_tasks.is_array()) {
569+
return JsonParseError("Cannot parse plan tasks from non-array: {}",
570+
SafeDumpJson(plan_tasks));
571+
}
572+
ICEBERG_ASSIGN_OR_RAISE(response->plan_tasks,
573+
GetTypedJsonValue<std::vector<std::string>>(plan_tasks));
574+
575+
// 2. delete_files
576+
ICEBERG_ASSIGN_OR_RAISE(auto delete_files_json,
577+
GetJsonValue<nlohmann::json>(json, kDeleteFiles));
578+
if (!delete_files_json.is_array()) {
579+
return JsonParseError("Cannot parse delete files from non-array: {}",
580+
SafeDumpJson(delete_files_json));
581+
}
582+
for (const auto& entry_json : delete_files_json) {
583+
ICEBERG_ASSIGN_OR_RAISE(auto delete_file,
584+
DataFileFromJson(entry_json, partition_specs_by_id, schema));
585+
response->delete_files.push_back(std::move(delete_file));
586+
}
587+
588+
// 3. file_scan_tasks
589+
ICEBERG_ASSIGN_OR_RAISE(auto file_scan_tasks_json,
590+
GetJsonValue<nlohmann::json>(json, kFileScanTasks));
591+
ICEBERG_ASSIGN_OR_RAISE(
592+
response->file_scan_tasks,
593+
FileScanTasksFromJson(file_scan_tasks_json, response->delete_files,
594+
partition_specs_by_id, schema));
595+
return {};
596+
}
597+
598+
Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
599+
const nlohmann::json& json,
600+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_specs_by_id,
601+
const Schema& schema) {
602+
PlanTableScanResponse response;
603+
ICEBERG_ASSIGN_OR_RAISE(response.plan_status,
604+
GetJsonValue<std::string>(json, kPlanStatus));
605+
ICEBERG_ASSIGN_OR_RAISE(response.plan_id, GetJsonValue<std::string>(json, kPlanId));
606+
ICEBERG_RETURN_UNEXPECTED(
607+
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
608+
return response;
609+
}
610+
611+
Result<FetchPlanningResultResponse> FetchPlanningResultResponseFromJson(
612+
const nlohmann::json& json,
613+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_specs_by_id,
614+
const Schema& schema) {
615+
FetchPlanningResultResponse response;
616+
ICEBERG_ASSIGN_OR_RAISE(auto status_str,
617+
GetJsonValue<std::string>(json, kPlanStatus));
618+
response.plan_status = PlanStatus(PlanStatus::FromString(status_str));
619+
ICEBERG_RETURN_UNEXPECTED(
620+
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
621+
return response;
622+
}
623+
624+
Result<FetchScanTasksResponse> FetchScanTasksResponseFromJson(
625+
const nlohmann::json& json,
626+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_specs_by_id,
627+
const Schema& schema) {
628+
FetchScanTasksResponse response;
629+
ICEBERG_RETURN_UNEXPECTED(
630+
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
631+
return response;
632+
}
633+
509634
#define ICEBERG_DEFINE_FROM_JSON(Model) \
510635
template <> \
511636
Result<Model> FromJson<Model>(const nlohmann::json& json) { \

src/iceberg/catalog/rest/json_serde_internal.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919

2020
#pragma once
2121

22+
#include <unordered_map>
23+
2224
#include <nlohmann/json_fwd.hpp>
2325

2426
#include "iceberg/catalog/rest/iceberg_rest_export.h"
2527
#include "iceberg/catalog/rest/types.h"
28+
#include "iceberg/partition_spec.h"
2629
#include "iceberg/result.h"
30+
#include "iceberg/schema.h"
2731

2832
/// \file iceberg/catalog/rest/json_serde_internal.h
2933
/// JSON serialization and deserialization for Iceberg REST Catalog API types.
@@ -62,4 +66,22 @@ ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse)
6266

6367
#undef ICEBERG_DECLARE_JSON_SERDE
6468

69+
ICEBERG_REST_EXPORT Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
70+
const nlohmann::json& json,
71+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_specs_by_id,
72+
const Schema& schema);
73+
74+
ICEBERG_REST_EXPORT Result<FetchPlanningResultResponse> FetchPlanningResultResponseFromJson(
75+
const nlohmann::json& json,
76+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_specs_by_id,
77+
const Schema& schema);
78+
79+
ICEBERG_REST_EXPORT Result<FetchScanTasksResponse> FetchScanTasksResponseFromJson(
80+
const nlohmann::json& json,
81+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_specs_by_id,
82+
const Schema& schema);
83+
84+
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const PlanTableScanRequest& request);
85+
ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksRequest& request);
86+
6587
} // namespace iceberg::rest

src/iceberg/catalog/rest/resource_paths.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,25 @@ Result<std::string> ResourcePaths::CommitTransaction() const {
102102
return std::format("{}/v1/{}transactions/commit", base_uri_, prefix_);
103103
}
104104

105+
Result<std::string> ResourcePaths::ScanPlan(const TableIdentifier& ident) const {
106+
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns));
107+
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name));
108+
return std::format("{}/v1/{}namespaces/{}/tables/{}/plan", base_uri_, prefix_,
109+
encoded_namespace, encoded_table_name);
110+
}
111+
112+
Result<std::string> ResourcePaths::ScanPlan(const TableIdentifier& ident, const std::string& plan_id) const {
113+
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns));
114+
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name));
115+
return std::format("{}/v1/{}namespaces/{}/tables/{}/plan/{}", base_uri_, prefix_,
116+
encoded_namespace, encoded_table_name, plan_id);
117+
}
118+
119+
Result<std::string> ResourcePaths::ScanTask(const TableIdentifier& ident) const {
120+
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns));
121+
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name));
122+
return std::format("{}/v1/{}namespaces/{}/tables/{}/tasks", base_uri_, prefix_,
123+
encoded_namespace, encoded_table_name);
124+
}
125+
105126
} // namespace iceberg::rest

src/iceberg/catalog/rest/resource_paths.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ class ICEBERG_REST_EXPORT ResourcePaths {
8181
/// \brief Get the /v1/{prefix}/transactions/commit endpoint path.
8282
Result<std::string> CommitTransaction() const;
8383

84+
Result<std::string> ScanPlan(const TableIdentifier& ident) const;
85+
Result<std::string> ScanPlan(const TableIdentifier& ident, const std::string& plan_id) const;
86+
Result<std::string> ScanTask(const TableIdentifier& ident) const;
87+
8488
private:
8589
ResourcePaths(std::string base_uri, const std::string& prefix);
8690

0 commit comments

Comments
 (0)