Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion bottlecap/src/traces/stats_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,23 @@ impl StatsProcessor for ServerlessStatsProcessor {
}
};

// An empty stats payload (e.g. an empty msgpack map) has nothing to buffer, so
// treat it as a harmless no-op instead of indexing into an empty vec (which panics).
let Some(first_bucket) = stats.stats.first_mut() else {
debug!("Received empty trace stats payload; nothing to aggregate.");
return Ok((
StatusCode::ACCEPTED,
"Empty stats payload; nothing to aggregate.",
)
.into_response());
};

let start = SystemTime::now();
let timestamp = start
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
stats.stats[0].start = if let Ok(result) = u64::try_from(timestamp) {
first_bucket.start = if let Ok(result) = u64::try_from(timestamp) {
result
} else {
let error_msg = "Error converting timestamp to u64";
Expand All @@ -109,3 +120,32 @@ impl StatsProcessor for ServerlessStatsProcessor {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use axum::body::Body;
use tokio::sync::mpsc;

#[tokio::test]
async fn empty_stats_payload_does_not_panic() {
// An empty msgpack map (0x80) deserializes to a ClientStatsPayload with an empty
// `stats` vec. Indexing `stats.stats[0]` used to panic here; assert we return 202
// Accepted instead and buffer nothing.
let req = Request::builder()
.body(Body::from(vec![0x80u8]))
.expect("failed to build request");

let (tx, mut rx) = mpsc::channel(1);
let response = ServerlessStatsProcessor {}
.process_stats(req, tx)
.await
.expect("process_stats returned an error");

assert_eq!(response.status(), StatusCode::ACCEPTED);
assert!(
rx.try_recv().is_err(),
"no stats payload should be buffered for an empty request"
);
}
}
Loading