Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions services/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async-trait = "0.1"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sqlx = { version = "0.8", default-features = false, features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid"] }
sqlx = { version = "0.8", default-features = false, features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid", "derive"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "time", "sync"] }
tokio-util = { version = "0.7", features = ["rt"] }
tower = { version = "0.5", features = ["util"] }
Expand All @@ -47,11 +47,12 @@ hex = "0.4"
base64 = "0.22"
subtle = "2.5"
ipnet = "2"
fastrand = "2.4.1"

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }
testcontainers = { version = "0.23", features = ["tokio"] }
testcontainers-modules = { version = "0.11", features = ["redis", "tokio"] }
testcontainers = { version = "0.23" }
testcontainers-modules = { version = "0.11", features = ["redis", "postgres"] }

[[bench]]
name = "api_key_auth"
Expand Down
3 changes: 1 addition & 2 deletions services/api/src/audit.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::net::IpAddr;

pub mod client_ip;
pub use client_ip::{extract_client_ip, trusted_cidrs_from_env};
pub use crate::client_ip::{extract_client_ip, trusted_cidrs_from_env};

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
Expand Down
3 changes: 1 addition & 2 deletions services/api/src/audit_middleware.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod body_redact;
pub use body_redact::{body_logging_enabled, redact_sensitive, truncate_body};
pub use crate::body_redact::{body_logging_enabled, redact_sensitive, truncate_body};

use std::sync::Arc;

Expand Down
2 changes: 1 addition & 1 deletion services/api/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ impl BlockchainClient {
total_events = all_events.len(),
"fetch_events_since paginated"
);
self.metrics.observe_invalidation("events_pagination_pages", pages);
self.metrics.observe_invalidation("events_pagination_pages", pages as usize);
}

Ok(all_events)
Expand Down
70 changes: 38 additions & 32 deletions services/api/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ use std::{
time::{Duration, Instant},
};

use redis::redis_module::RedisResult;


use anyhow::Context;
use deadpool_redis::{Config as PoolConfig, Pool, Runtime};
use deadpool_redis::{Config as PoolConfig, Pool};
use redis::AsyncCommands;
use serde::{de::DeserializeOwned, Serialize};

Expand Down Expand Up @@ -245,8 +242,9 @@ impl RedisCache {
}

// Deterministically hash the tag so the metadata key is stable.
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
std::hash::Hash::hash(&tag, &mut hasher);
tag.cache_keys().join("|").hash(&mut hasher);
let tag_hash = format!("{:x}", hasher.finish());

let zset_key = self.tag_cfg.tag_key(&tag_hash);
Expand Down Expand Up @@ -293,18 +291,19 @@ impl RedisCache {
"#,
);

let mut over_evicted: i64 = 0;
let script = std::sync::Arc::new(script);
self.exec(|mut conn| {
let zset_key = zset_key.clone();
let seq_key = seq_key.clone();
let keys = tag_keys.clone();
let script = script.clone();
async move {
let mut argv: Vec<String> = Vec::with_capacity(2 + keys.len());
argv.push(tag_ttl_secs.to_string());
argv.push(cap.to_string());
argv.extend(keys);

over_evicted = script
let _: i64 = script
.key(&zset_key)
.key(&seq_key)
.arg(tag_ttl_secs)
Expand All @@ -316,7 +315,6 @@ impl RedisCache {
})
.await?;

// Note: we don't need the evicted count for correctness.
Ok(())
}

Expand Down Expand Up @@ -399,11 +397,14 @@ impl RedisCache {
T: DeserializeOwned,
{
let key = key.to_owned();
self.exec(|mut conn| async move {
let val: Option<String> = conn.get(&key).await?;
match val {
Some(raw) => Ok(Some(serde_json::from_str(&raw)?)),
None => Ok(None),
self.exec(|mut conn| {
let key = key.clone();
async move {
let val: Option<String> = conn.get(&key).await?;
match val {
Some(raw) => Ok(Some(serde_json::from_str(&raw)?)),
None => Ok(None),
}
}
})
.await
Expand All @@ -429,9 +430,12 @@ impl RedisCache {

pub async fn del(&self, key: &str) -> anyhow::Result<()> {
let key = key.to_owned();
self.exec(|mut conn| async move {
let _: usize = conn.del(&key).await?;
Ok(())
self.exec(|mut conn| {
let key = key.clone();
async move {
let _: usize = conn.del(&key).await?;
Ok(())
}
})
.await
}
Expand Down Expand Up @@ -460,23 +464,25 @@ impl RedisCache {
let pattern = pattern.to_owned();

loop {
let pattern_clone = pattern.clone();
let (next_cursor, batch_deleted) = self
.exec(|mut conn| async move {
let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern_clone)
.arg("COUNT")
.arg(100u64)
.query_async(&mut conn)
.await?;
let deleted = if keys.is_empty() {
0
} else {
conn.del(keys).await?
};
Ok((next_cursor, deleted))
.exec(|mut conn| {
let pattern_clone = pattern.clone();
async move {
let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern_clone)
.arg("COUNT")
.arg(100u64)
.query_async(&mut conn)
.await?;
let deleted = if keys.is_empty() {
0
} else {
conn.del(keys).await?
};
Ok((next_cursor, deleted))
}
})
.await?;

Expand Down
51 changes: 14 additions & 37 deletions services/api/src/compression.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,25 @@
use tower_http::compression::predicate::{NotForContentType, Predicate};
use axum::http::{header, Extensions, HeaderMap, StatusCode, Version};
use tower_http::compression::CompressionLayer;

fn should_compress_text_based(content_type: Option<&str>) -> bool {
let Some(ct) = content_type else {
// If we can't determine content type, avoid wasting CPU.
return false;
};
type CompressFn = fn(StatusCode, Version, &HeaderMap, &Extensions) -> bool;

// Remove common parameters like `charset=utf-8`.
fn should_compress(
_: StatusCode,
_: Version,
headers: &HeaderMap,
_: &Extensions,
) -> bool {
let ct = headers
.get(header::CONTENT_TYPE)
.and_then(|h| h.to_str().ok())
.unwrap_or("");
let ct = ct.split(';').next().unwrap_or(ct).trim();

// Only compress text-ish payloads.
// Note: application/json is explicitly included.
ct == "application/json" || ct.starts_with("text/")
}

pub fn compression_layer() -> CompressionLayer {
// Exclude already-compressed/binary formats to avoid CPU waste.
// (This primarily protects against cases where `content_type` might be
// missing/incorrect while still keeping the middleware safe.)
let not_for_binary = NotForContentType::new(vec![
"application/zip",
"application/gzip",
"application/x-gzip",
"application/x-zip-compressed",
"application/pdf",
"image/jpeg",
"image/png",
"image/webp",
"image/gif",
"image/svg+xml",
"audio/mpeg",
"audio/mp4",
"video/mp4",
"application/octet-stream",
"application/x-bzip2",
"application/x-7z-compressed",
]);

pub fn compression_layer() -> CompressionLayer<CompressFn> {
CompressionLayer::new()
.gzip(true)
.br(true)
// Only apply compression to text-based responses.
.compress_when(Predicate::from_fn(should_compress_text_based))
.filter(not_for_binary)
.compress_when(should_compress as CompressFn)
}

4 changes: 2 additions & 2 deletions services/api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl CorsConfig {
.filter(|s| !s.is_empty())
.collect()
})
.unwrap_or_else(|| {
.unwrap_or_else(|_| {
["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"]
.iter()
.map(|s| s.to_string())
Expand All @@ -80,7 +80,7 @@ impl CorsConfig {
.filter(|s| !s.is_empty())
.collect()
})
.unwrap_or_else(|| {
.unwrap_or_else(|_| {
["content-type", "authorization"]
.iter()
.map(|s| s.to_string())
Expand Down
3 changes: 1 addition & 2 deletions services/api/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl Database {
/// Snapshot pool size/idle into Prometheus gauges.
/// Call this just before rendering `/metrics` so the values are current.
pub fn record_pool_metrics(&self) {
self.metrics.record_pool_metrics(self.pool.size(), self.pool.num_idle());
self.metrics.observe_pool_connections("primary", self.pool.size() as i64, self.pool.num_idle() as i64);
}

pub async fn new(
Expand Down Expand Up @@ -760,7 +760,6 @@ impl Database {
Ok(count > 0)
}
}
}

#[cfg(test)]
mod tests {
Expand Down
3 changes: 1 addition & 2 deletions services/api/src/email/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{Context, Result};
use redis::AsyncCommands as _;
use serde_json::Value;
use sha2::{Digest, Sha256};
use std::time::Duration;
Expand Down Expand Up @@ -139,7 +138,7 @@ impl EmailService {
// --- idempotency check ---
if let (Some(cache), Some(key)) = (&self.cache, idem_key) {
let redis_key = format!("email:idem:{key}");
let mut conn = cache.manager.clone();
let mut conn = cache.get_connection().await.context("idempotency Redis connection failed")?;

// Try SET NX — only succeeds for the first send.
let acquired: Option<String> = redis::cmd("SET")
Expand Down
29 changes: 14 additions & 15 deletions services/api/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::content_type::require_json_content_type;
use std::{
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -565,7 +564,7 @@ pub async fn statistics(State(state): State<Arc<AppState>>) -> Result<impl IntoR
} else {
state.metrics.observe_miss("api", endpoint);
}
state.metrics.observe_request(endpoint, start.elapsed());
state.metrics.observe_request(endpoint, "200", start.elapsed());

Ok((StatusCode::OK, Json(payload)))
}
Expand Down Expand Up @@ -633,7 +632,7 @@ pub async fn featured_markets(
} else {
state.metrics.observe_miss("api", endpoint);
}
state.metrics.observe_request(endpoint, start.elapsed());
state.metrics.observe_request(endpoint, "200", start.elapsed());

Ok((StatusCode::OK, Json(paginated)))
}
Expand All @@ -647,13 +646,13 @@ pub async fn content(
let cursor = query.cursor();
let endpoint = "content";

let cache_key = keys::api_content(limit);
let cache_key = keys::api_content(limit.into());
let ttl = Duration::from_secs(60 * 60);

let (payload, hit) = state
.cache
.get_or_set_json(&cache_key, ttl, || async {
let data = state.db.content_cached(limit).await?;
let data = state.db.content_cached(limit.into()).await?;
Ok(data)
})
.await
Expand Down Expand Up @@ -683,7 +682,7 @@ pub async fn content(
} else {
state.metrics.observe_miss("api", endpoint);
}
state.metrics.observe_request(endpoint, start.elapsed());
state.metrics.observe_request(endpoint, "200", start.elapsed());

Ok((StatusCode::OK, Json(paginated)))
}
Expand Down Expand Up @@ -811,11 +810,11 @@ pub async fn blockchain_user_bets(

let page_data = state
.blockchain
.user_bets_page(&user, page, page_size)
.user_bets_page(&user, page, page_size.into())
.await
.map_err(into_api_error)?;

let has_more = (page + 1) * page_size < page_data.total;
let has_more = (page + 1) * (page_size as i64) < page_data.total;
let next_cursor = if has_more {
Some((page + 1).to_string())
} else {
Expand Down Expand Up @@ -883,13 +882,13 @@ pub async fn warm_critical_caches(state: Arc<AppState>) -> anyhow::Result<()> {

let (mut succeeded, mut failed) = (0usize, 0usize);

warm!("db.statistics", state.db.statistics_cached().map(|r| r.map(|_| ())), succeeded, failed);
warm!("db.featured_markets", state.db.featured_markets_cached(state.config.featured_limit).map(|r| r.map(|_| ())), succeeded, failed);
warm!("blockchain.health", state.blockchain.health_check_cached().map(|r| r.map(|_| ())), succeeded, failed);
warm!("blockchain.platform_stats", state.blockchain.platform_statistics_cached().map(|r| r.map(|_| ())), succeeded, failed);
warm!("api.statistics", statistics(State(state.clone())).map(|r| r.map(|_| ()).map_err(|e| anyhow::anyhow!("{e:?}"))), succeeded, failed);
warm!("api.featured_markets", featured_markets(State(state.clone()), Query(PaginationQuery::default())).map(|r| r.map(|_| ()).map_err(|e| anyhow::anyhow!("{e:?}"))), succeeded, failed);
warm!("api.content", content(State(state.clone()), Query(PaginationQuery::default())).map(|r| r.map(|_| ()).map_err(|e| anyhow::anyhow!("{e:?}"))), succeeded, failed);
warm!("db.statistics", state.db.statistics_cached(), succeeded, failed);
warm!("db.featured_markets", state.db.featured_markets_cached(state.config.featured_limit), succeeded, failed);
warm!("blockchain.health", state.blockchain.health_check_cached(), succeeded, failed);
warm!("blockchain.platform_stats", state.blockchain.platform_statistics_cached(), succeeded, failed);
warm!("api.statistics", async { statistics(State(state.clone())).await.map(|_| ()).map_err(|e| anyhow::anyhow!("{e:?}")) }, succeeded, failed);
warm!("api.featured_markets", async { featured_markets(State(state.clone()), Query(PaginationQuery::default())).await.map(|_| ()).map_err(|e| anyhow::anyhow!("{e:?}")) }, succeeded, failed);
warm!("api.content", async { content(State(state.clone()), Query(PaginationQuery::default())).await.map(|_| ()).map_err(|e| anyhow::anyhow!("{e:?}")) }, succeeded, failed);

tracing::info!(succeeded, failed, total = succeeded + failed, "cache warming complete");
Ok(())
Expand Down
3 changes: 3 additions & 0 deletions services/api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
pub mod audit;
pub mod audit_middleware;
pub mod body_redact;
pub mod client_ip;
pub mod content_type;
#[cfg(test)]
mod resolve_market_tests;
pub mod blockchain;
Expand Down
2 changes: 1 addition & 1 deletion services/api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async fn main() -> anyhow::Result<()> {
)?;

// Validate required configuration before proceeding
config.validate()?;
config.validate().map_err(|e| anyhow::anyhow!("{e}"))?;

let metrics = Metrics::new()?;
let cache = RedisCache::new(&config.redis_url).await?;
Expand Down
Loading
Loading