Skip to content

Commit 1610a5a

Browse files
authored
Merge pull request #140 from Tuntii/copilot/sub-pr-139
Fix review feedback: deduplication, pagination validation, lifecycle error handling, cache correctness
2 parents f8f1c76 + d5a2aeb commit 1610a5a

7 files changed

Lines changed: 256 additions & 41 deletions

File tree

crates/rustapi-core/src/app.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,9 +1065,13 @@ impl RustApi {
10651065
self.layers.prepend(Box::new(BodyLimitLayer::new(limit)));
10661066
}
10671067

1068-
// Run on_start lifecycle hooks before accepting connections
1069-
for hook in self.lifecycle_hooks.on_start {
1070-
hook().await;
1068+
// Run on_start lifecycle hooks before accepting connections.
1069+
// Each hook is spawned as a separate task so that a failing hook
1070+
// does not prevent subsequent hooks from running.
1071+
for (i, hook) in self.lifecycle_hooks.on_start.into_iter().enumerate() {
1072+
if let Err(e) = tokio::task::spawn(hook()).await {
1073+
tracing::error!("on_start lifecycle hook #{} failed: {:?}", i, e);
1074+
}
10711075
}
10721076

10731077
let server = Server::new(self.router, self.layers, self.interceptors);
@@ -1093,18 +1097,26 @@ impl RustApi {
10931097
self.layers.prepend(Box::new(BodyLimitLayer::new(limit)));
10941098
}
10951099

1096-
// Run on_start lifecycle hooks before accepting connections
1097-
for hook in self.lifecycle_hooks.on_start {
1098-
hook().await;
1100+
// Run on_start lifecycle hooks before accepting connections.
1101+
// Each hook is spawned as a separate task so that a failing hook
1102+
// does not prevent subsequent hooks from running.
1103+
for (i, hook) in self.lifecycle_hooks.on_start.into_iter().enumerate() {
1104+
if let Err(e) = tokio::task::spawn(hook()).await {
1105+
tracing::error!("on_start lifecycle hook #{} failed: {:?}", i, e);
1106+
}
10991107
}
11001108

11011109
// Wrap the shutdown signal to run on_shutdown hooks after signal fires
11021110
let shutdown_hooks = self.lifecycle_hooks.on_shutdown;
11031111
let wrapped_signal = async move {
11041112
signal.await;
1105-
// Run on_shutdown hooks after the shutdown signal fires
1106-
for hook in shutdown_hooks {
1107-
hook().await;
1113+
// Run on_shutdown hooks after the shutdown signal fires.
1114+
// Each hook is spawned as a separate task so that a failing hook
1115+
// does not prevent subsequent hooks from running.
1116+
for (i, hook) in shutdown_hooks.into_iter().enumerate() {
1117+
if let Err(e) = tokio::task::spawn(hook()).await {
1118+
tracing::error!("on_shutdown lifecycle hook #{} failed: {:?}", i, e);
1119+
}
11081120
}
11091121
};
11101122

crates/rustapi-core/src/events.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ type AsyncHandler = Arc<dyn Fn(String) -> Pin<Box<dyn Future<Output = ()> + Send
8989
/// Supports both synchronous and asynchronous event handlers.
9090
/// Multiple handlers can be registered for the same event topic.
9191
///
92+
/// # Handler registration
93+
///
94+
/// Handlers are stored behind a `RwLock`. Reads (event emission) are cheap and
95+
/// fully concurrent, but writes (handler registration via [`on`][EventBus::on] /
96+
/// [`on_async`][EventBus::on_async]) acquire an exclusive lock. To avoid
97+
/// contention, **register all handlers during application startup** before
98+
/// the server begins serving requests, rather than registering them at
99+
/// runtime inside request handlers.
100+
///
92101
/// # Example
93102
///
94103
/// ```rust
@@ -175,7 +184,16 @@ impl EventBus {
175184
/// Emit an event synchronously
176185
///
177186
/// Calls all synchronous handlers for the topic in registration order.
178-
/// Also spawns tokio tasks for any async handlers.
187+
/// Also spawns a separate tokio task for each registered async handler.
188+
///
189+
/// # Backpressure
190+
///
191+
/// Async handlers are spawned without any concurrency limit. If many events
192+
/// are emitted rapidly and the registered async handlers are slow, an
193+
/// unbounded number of tokio tasks may be created, potentially exhausting
194+
/// system resources. For high-throughput use cases consider using
195+
/// [`emit_async`][EventBus::emit_async] and managing concurrency at the call
196+
/// site, or keep async handler logic lightweight (e.g. send to a channel).
179197
///
180198
/// # Example
181199
///

crates/rustapi-core/src/extract.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,10 +1259,8 @@ impl FromRequestParts for Paginate {
12591259
per_page: Option<u64>,
12601260
}
12611261

1262-
let params: PaginateQuery = serde_urlencoded::from_str(query).unwrap_or(PaginateQuery {
1263-
page: None,
1264-
per_page: None,
1265-
});
1262+
let params: PaginateQuery = serde_urlencoded::from_str(query)
1263+
.map_err(|e| ApiError::bad_request(format!("Invalid pagination parameters: {}", e)))?;
12661264

12671265
Ok(Paginate::new(
12681266
params.page.unwrap_or(DEFAULT_PAGE),
@@ -1345,10 +1343,8 @@ impl FromRequestParts for CursorPaginate {
13451343
limit: Option<u64>,
13461344
}
13471345

1348-
let params: CursorQuery = serde_urlencoded::from_str(query).unwrap_or(CursorQuery {
1349-
cursor: None,
1350-
limit: None,
1351-
});
1346+
let params: CursorQuery = serde_urlencoded::from_str(query)
1347+
.map_err(|e| ApiError::bad_request(format!("Invalid pagination parameters: {}", e)))?;
13521348

13531349
Ok(CursorPaginate::new(
13541350
params.cursor,

crates/rustapi-core/src/handler.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ pub struct Route {
351351
/// Supported types: "uuid", "integer", "string", "boolean", "number"
352352
pub(crate) param_schemas: std::collections::BTreeMap<String, String>,
353353
/// Custom error responses for OpenAPI (status_code -> description)
354-
pub(crate) error_responses: Vec<(u16, String)>,
354+
pub(crate) error_responses: std::collections::BTreeMap<u16, String>,
355355
}
356356

357357
impl Route {
@@ -370,7 +370,7 @@ impl Route {
370370
handler: into_boxed_handler(handler),
371371
operation,
372372
param_schemas: std::collections::BTreeMap::new(),
373-
error_responses: Vec::new(),
373+
error_responses: std::collections::BTreeMap::new(),
374374
}
375375
}
376376
/// Set the operation summary
@@ -453,7 +453,7 @@ impl Route {
453453
/// ```
454454
pub fn error_response(mut self, status: u16, description: impl Into<String>) -> Self {
455455
let desc = description.into();
456-
self.error_responses.push((status, desc.clone()));
456+
self.error_responses.insert(status, desc.clone());
457457

458458
// Also add directly to the operation's responses
459459
let mut content = std::collections::BTreeMap::new();
@@ -480,7 +480,7 @@ impl Route {
480480
}
481481

482482
/// Get the custom error responses
483-
pub fn error_responses(&self) -> &[(u16, String)] {
483+
pub fn error_responses(&self) -> &std::collections::BTreeMap<u16, String> {
484484
&self.error_responses
485485
}
486486
}

crates/rustapi-core/src/hateoas.rs

Lines changed: 161 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -643,9 +643,22 @@ impl<T: Serialize> Paginated<T> {
643643
}
644644

645645
impl<T: Serialize + Send> crate::response::IntoResponse for Paginated<T> {
646+
/// Convert to an HTTP response.
647+
///
648+
/// The response includes `X-Total-Count`, `X-Total-Pages`, and RFC 8288
649+
/// `Link` headers. Navigation links (first/prev/next/last) are generated as
650+
/// **relative query strings** (e.g. `?page=2&per_page=20`) because
651+
/// `IntoResponse` does not have access to the original request URI.
652+
///
653+
/// If you need absolute URLs in the Link header, wrap this type in a
654+
/// `ResponseModifier` or interceptor that has access to the request URI,
655+
/// or build the response manually using [`Paginated::link_header`] and
656+
/// [`Paginated::to_body_with_path`].
646657
fn into_response(self) -> crate::response::Response {
647-
// Use a generic base path since we don't have access to the request URI
648-
// in IntoResponse. Users can override via ResponseModifier or interceptors.
658+
// Use an empty base path since IntoResponse has no access to the
659+
// request URI. Navigation links will be relative query strings only
660+
// (e.g. `?page=2`). Callers that need absolute URLs should use
661+
// link_header(base_path) / to_body_with_path(base_path) directly.
649662
let base_path = "";
650663
let link_header = self.link_header(base_path);
651664
let body = self.to_body_with_path(base_path);
@@ -868,4 +881,150 @@ mod tests {
868881
let resource = user.with_links().self_link("/users/1");
869882
assert!(resource.links.contains_key("self"));
870883
}
884+
885+
// ─── IntoResponse tests ─────────────────────────────────────────────────
886+
887+
/// Collect a [`crate::response::Body`] into [`bytes::Bytes`] synchronously
888+
/// using a one-shot tokio runtime (avoids pulling in `#[tokio::test]`).
889+
fn collect_body(body: crate::response::Body) -> bytes::Bytes {
890+
use http_body_util::BodyExt;
891+
tokio::runtime::Builder::new_current_thread()
892+
.build()
893+
.unwrap()
894+
.block_on(async { body.collect().await.unwrap().to_bytes() })
895+
}
896+
897+
#[test]
898+
fn test_paginated_into_response_status_and_headers() {
899+
use crate::response::IntoResponse;
900+
901+
let users = vec![
902+
User { id: 1, name: "Alice".to_string() },
903+
User { id: 2, name: "Bob".to_string() },
904+
];
905+
let paginated = Paginated::new(users, 1, 10, 25);
906+
let response = paginated.into_response();
907+
908+
assert_eq!(response.status(), http::StatusCode::OK);
909+
assert_eq!(
910+
response.headers().get(http::header::CONTENT_TYPE).unwrap(),
911+
"application/json"
912+
);
913+
assert_eq!(
914+
response.headers().get("X-Total-Count").unwrap(),
915+
"25"
916+
);
917+
assert_eq!(
918+
response.headers().get("X-Total-Pages").unwrap(),
919+
"3" // ceil(25 / 10) = 3
920+
);
921+
// Link header should be present (non-first page has prev/next/first/last)
922+
assert!(response.headers().contains_key(http::header::LINK));
923+
}
924+
925+
#[test]
926+
fn test_paginated_into_response_json_body() {
927+
use crate::response::IntoResponse;
928+
929+
let users = vec![User { id: 42, name: "Carol".to_string() }];
930+
let paginated = Paginated::new(users, 2, 5, 10);
931+
let response = paginated.into_response();
932+
933+
let (parts, body) = response.into_parts();
934+
assert_eq!(parts.status, http::StatusCode::OK);
935+
936+
let bytes = collect_body(body);
937+
let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
938+
939+
assert!(json.get("items").is_some());
940+
assert!(json.get("meta").is_some());
941+
// Links use the HAL `_links` convention
942+
assert!(json.get("_links").is_some());
943+
944+
let items = json["items"].as_array().unwrap();
945+
assert_eq!(items.len(), 1);
946+
assert_eq!(items[0]["id"], 42);
947+
948+
let meta = &json["meta"];
949+
assert_eq!(meta["page"], 2);
950+
assert_eq!(meta["per_page"], 5);
951+
assert_eq!(meta["total"], 10);
952+
assert_eq!(meta["total_pages"], 2);
953+
}
954+
955+
#[test]
956+
fn test_paginated_into_response_empty_items() {
957+
use crate::response::IntoResponse;
958+
959+
let paginated: Paginated<User> = Paginated::new(vec![], 1, 10, 0);
960+
let response = paginated.into_response();
961+
962+
assert_eq!(response.status(), http::StatusCode::OK);
963+
assert_eq!(response.headers().get("X-Total-Count").unwrap(), "0");
964+
assert_eq!(response.headers().get("X-Total-Pages").unwrap(), "0");
965+
// At minimum the `first` link is always included in the Link header
966+
let link = response.headers().get(http::header::LINK);
967+
let link_str = link.map(|v| v.to_str().unwrap_or("")).unwrap_or("");
968+
// Empty result set has no next or prev links
969+
assert!(!link_str.contains("rel=\"next\""));
970+
assert!(!link_str.contains("rel=\"prev\""));
971+
}
972+
973+
#[test]
974+
fn test_cursor_paginated_into_response_status_and_headers() {
975+
use crate::response::IntoResponse;
976+
977+
let users = vec![User { id: 1, name: "Dave".to_string() }];
978+
let paginated = CursorPaginated::new(users, Some("cursor_abc".to_string()), true);
979+
let response = paginated.into_response();
980+
981+
assert_eq!(response.status(), http::StatusCode::OK);
982+
assert_eq!(
983+
response.headers().get(http::header::CONTENT_TYPE).unwrap(),
984+
"application/json"
985+
);
986+
}
987+
988+
#[test]
989+
fn test_cursor_paginated_into_response_json_body() {
990+
use crate::response::IntoResponse;
991+
992+
let users = vec![User { id: 7, name: "Eve".to_string() }];
993+
let paginated =
994+
CursorPaginated::new(users, Some("next_cursor_xyz".to_string()), true);
995+
let response = paginated.into_response();
996+
997+
let (_parts, body) = response.into_parts();
998+
let bytes = collect_body(body);
999+
let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1000+
1001+
assert!(json.get("items").is_some());
1002+
assert!(json.get("meta").is_some());
1003+
1004+
let items = json["items"].as_array().unwrap();
1005+
assert_eq!(items.len(), 1);
1006+
assert_eq!(items[0]["id"], 7);
1007+
1008+
let meta = &json["meta"];
1009+
assert_eq!(meta["next_cursor"], "next_cursor_xyz");
1010+
assert_eq!(meta["has_more"], true);
1011+
}
1012+
1013+
#[test]
1014+
fn test_cursor_paginated_into_response_last_page() {
1015+
use crate::response::IntoResponse;
1016+
1017+
let users = vec![User { id: 9, name: "Frank".to_string() }];
1018+
let paginated = CursorPaginated::new(users, None, false);
1019+
let response = paginated.into_response();
1020+
1021+
let (_parts, body) = response.into_parts();
1022+
let bytes = collect_body(body);
1023+
let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1024+
1025+
let meta = &json["meta"];
1026+
// next_cursor should be omitted when None
1027+
assert!(meta.get("next_cursor").is_none());
1028+
assert_eq!(meta["has_more"], false);
1029+
}
8711030
}

0 commit comments

Comments
 (0)