From 27bbaf200ed110b43c19e6205186c7525ee98e70 Mon Sep 17 00:00:00 2001 From: Dzmitry Kalabuk Date: Wed, 24 Jun 2026 10:39:47 +0300 Subject: [PATCH 1/2] Cap storage in API retention mode Add an optional `max_blocks` to the Api config that trims the tail to keep at most `max_blocks` from the head. --- crates/hotblocks/src/cli.rs | 2 +- crates/hotblocks/src/data_service.rs | 11 ++++--- crates/hotblocks/src/dataset_config.rs | 9 ++++-- .../dataset_controller/dataset_controller.rs | 32 +++++++++++++++++-- 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/crates/hotblocks/src/cli.rs b/crates/hotblocks/src/cli.rs index 1e14ff72..3b38e46f 100644 --- a/crates/hotblocks/src/cli.rs +++ b/crates/hotblocks/src/cli.rs @@ -98,7 +98,7 @@ impl CLI { let api_controlled_datasets = datasets .iter() - .filter_map(|(id, cfg)| (cfg.retention_strategy == RetentionConfig::Api).then_some(*id)) + .filter_map(|(id, cfg)| matches!(cfg.retention_strategy, RetentionConfig::Api { .. }).then_some(*id)) .collect(); let data_service = DataService::start(db.clone(), datasets).await.map(Arc::new)?; diff --git a/crates/hotblocks/src/data_service.rs b/crates/hotblocks/src/data_service.rs index a95256c1..af7e38e5 100644 --- a/crates/hotblocks/src/data_service.rs +++ b/crates/hotblocks/src/data_service.rs @@ -46,16 +46,17 @@ impl DataService { .map(|url| ReqwestDataClient::new(http_client.clone(), url)) .collect(); - let retention = match cfg.retention_strategy { + let (retention, max_blocks) = match &cfg.retention_strategy { RetentionConfig::FromBlock { number, parent_hash } => { - RetentionStrategy::FromBlock { number, parent_hash } + (RetentionStrategy::FromBlock { number: *number, parent_hash: parent_hash.clone() }, None) } - RetentionConfig::Head(n) => RetentionStrategy::Head(n), - RetentionConfig::Api | RetentionConfig::None => RetentionStrategy::None + RetentionConfig::Head(n) => (RetentionStrategy::Head(*n), None), + RetentionConfig::Api { max_blocks } => (RetentionStrategy::None, *max_blocks), + RetentionConfig::None => (RetentionStrategy::None, None) }; tokio::task::spawn_blocking(move || { - DatasetController::new(db, dataset_id, cfg.kind, retention, data_sources).map(|c| { + DatasetController::new(db, dataset_id, cfg.kind, retention, max_blocks, data_sources).map(|c| { c.enable_compaction(!cfg.disable_compaction); Arc::new(c) }) diff --git a/crates/hotblocks/src/dataset_config.rs b/crates/hotblocks/src/dataset_config.rs index 5ae1723e..34de363b 100644 --- a/crates/hotblocks/src/dataset_config.rs +++ b/crates/hotblocks/src/dataset_config.rs @@ -16,8 +16,13 @@ pub enum RetentionConfig { }, // Moving window that keeps up to N blocks Head(u64), - // Retention is set dynamically from the portal - Api, + // Retention is set dynamically from the portal. `max_blocks`, if set, caps + // storage at N blocks behind the tip as a safety net for when the portal + // stops advancing the floor. + Api { + #[serde(default)] + max_blocks: Option + }, None } diff --git a/crates/hotblocks/src/dataset_controller/dataset_controller.rs b/crates/hotblocks/src/dataset_controller/dataset_controller.rs index 8568b79a..df290b17 100644 --- a/crates/hotblocks/src/dataset_controller/dataset_controller.rs +++ b/crates/hotblocks/src/dataset_controller/dataset_controller.rs @@ -42,6 +42,7 @@ impl DatasetController { dataset_id: DatasetId, dataset_kind: DatasetKind, retention: RetentionStrategy, + max_blocks: Option, data_sources: Vec ) -> anyhow::Result { let mut write = WriteController::new(db.clone(), dataset_id, dataset_kind)?; @@ -63,6 +64,7 @@ impl DatasetController { dataset_id, dataset_kind, data_sources, + max_blocks, retention_recv, head_sender, finalized_head_sender @@ -283,6 +285,10 @@ struct Ctl { dataset_id: DatasetId, dataset_kind: DatasetKind, data_sources: Vec, + // Static safety cap on retained blocks for Api-controlled datasets: even + // if the portal stops advancing the floor, the tail is trimmed to keep at + // most this many blocks behind the tip. `None` means grow indefinitely. + max_blocks: Option, retention_recv: tokio::sync::watch::Receiver, head_sender: tokio::sync::watch::Sender>, finalized_head_sender: tokio::sync::watch::Sender> @@ -343,17 +349,18 @@ impl Ctl { let retention = self.retention_recv.borrow_and_update().clone(); let mut state = match retention { RetentionStrategy::FromBlock { number, parent_hash } => { + let (number, parent_hash) = self.clamp_floor(&write, number, parent_hash); if !write.starts_at(number, &parent_hash) { blocking! { write.retain(number, parent_hash) }?; } - State::Init { head: None } + State::Init { head: self.max_blocks } } RetentionStrategy::Head(n) => State::Init { head: Some(n) }, RetentionStrategy::None => { if write.write.head().is_some() { - State::Init { head: None } + State::Init { head: self.max_blocks } } else { State::Idle } @@ -456,17 +463,36 @@ impl Ctl { } } + // With a storage cap, the tail trimmer may have already advanced the front + // past a stale/lagging portal floor. Retaining below the current front is + // treated as a gap and would drop the whole dataset, so never move the floor + // backwards. The posted parent_hash belongs to the original (lower) block, + // so it must be dropped when clamping or the hash check would wipe data. + fn clamp_floor( + &self, + write: &WriteCtx, + number: BlockNumber, + parent_hash: Option + ) -> (BlockNumber, Option) { + if self.max_blocks.is_some() && number < write.write.start_block() { + (write.write.start_block(), None) + } else { + (number, parent_hash) + } + } + async fn handle_retention_change(&mut self, state: &mut State, mut write: WriteCtx) -> anyhow::Result { // need this variable to please the compiler let retention = self.retention_recv.borrow_and_update().clone(); match retention { RetentionStrategy::FromBlock { number, parent_hash } => { + let (number, parent_hash) = self.clamp_floor(&write, number, parent_hash); let will_erase_head = write.write.head().map_or(false, |h| h.number < number) || // FromBlock is greater than current head, so everything is cleared write.write.start_block() > number; // FromBlock is less than current front, dropping everything by design blocking_write!(write, write.retain(number, parent_hash))?; match state { State::Ingest { .. } if !will_erase_head => {} // Keep ingesting, head is valid - _ => *state = State::Init { head: None } // New ingest needed + _ => *state = State::Init { head: self.max_blocks } // New ingest needed } } RetentionStrategy::Head(n) => match state { From d2fa0f88dc2a796fb06a17fc38a0013174733d66 Mon Sep 17 00:00:00 2001 From: Dzmitry Kalabuk Date: Wed, 24 Jun 2026 11:01:53 +0300 Subject: [PATCH 2/2] Make the retention config backward-compatible --- crates/hotblocks/src/dataset_config.rs | 138 ++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 4 deletions(-) diff --git a/crates/hotblocks/src/dataset_config.rs b/crates/hotblocks/src/dataset_config.rs index 34de363b..ad0e6ccc 100644 --- a/crates/hotblocks/src/dataset_config.rs +++ b/crates/hotblocks/src/dataset_config.rs @@ -1,13 +1,15 @@ use std::collections::BTreeMap; +use std::fmt; -use serde::{Deserialize, Serialize}; +use serde::de::{self, IgnoredAny, MapAccess, Visitor}; +use serde::{Deserialize, Deserializer, Serialize}; use sqd_query::BlockNumber; use sqd_storage::db::DatasetId; use url::Url; use crate::types::DatasetKind; -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize)] pub enum RetentionConfig { // Fixed, starting from the block number FromBlock { @@ -18,14 +20,92 @@ pub enum RetentionConfig { Head(u64), // Retention is set dynamically from the portal. `max_blocks`, if set, caps // storage at N blocks behind the tip as a safety net for when the portal - // stops advancing the floor. + // stops advancing the floor. Accepts both the bare `Api` form and the + // `Api: { max_blocks: N }` form. Api { - #[serde(default)] max_blocks: Option }, None } +const RETENTION_VARIANTS: &[&str] = &["FromBlock", "Head", "Api", "None"]; + +// `RetentionConfig` is read through `serde_yaml`'s `singleton_map_recursive`, +// which represents non-unit variants as a single-key map and unit variants as a +// bare string. A struct variant alone could therefore not accept the bare `Api` +// string, so deserialization is implemented by hand to allow both `Api` and +// `Api: { max_blocks: N }`. +// +// This can be reverted to a normal derived `Deserialize` once the bare `Api` +// form is no longer used in any config. +impl<'de> Deserialize<'de> for RetentionConfig { + fn deserialize>(deserializer: D) -> Result { + #[derive(Deserialize)] + #[serde(deny_unknown_fields)] + struct FromBlockCfg { + number: BlockNumber, + #[serde(default)] + parent_hash: Option + } + + #[derive(Deserialize)] + #[serde(deny_unknown_fields)] + struct ApiCfg { + #[serde(default)] + max_blocks: Option + } + + struct RetentionVisitor; + + impl<'de> Visitor<'de> for RetentionVisitor { + type Value = RetentionConfig; + + fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("a retention strategy") + } + + fn visit_str(self, value: &str) -> Result { + match value { + "Api" => Ok(RetentionConfig::Api { max_blocks: None }), + "None" => Ok(RetentionConfig::None), + other => Err(E::unknown_variant(other, RETENTION_VARIANTS)) + } + } + + fn visit_map>(self, mut map: A) -> Result { + let tag: String = map + .next_key()? + .ok_or_else(|| de::Error::custom("expected a retention strategy"))?; + + let strategy = match tag.as_str() { + "FromBlock" => { + let cfg: FromBlockCfg = map.next_value()?; + RetentionConfig::FromBlock { number: cfg.number, parent_hash: cfg.parent_hash } + } + "Head" => RetentionConfig::Head(map.next_value()?), + "Api" => { + let cfg: ApiCfg = map.next_value()?; + RetentionConfig::Api { max_blocks: cfg.max_blocks } + } + "None" => { + map.next_value::()?; + RetentionConfig::None + } + other => return Err(de::Error::unknown_variant(other, RETENTION_VARIANTS)) + }; + + if map.next_key::()?.is_some() { + return Err(de::Error::custom("retention strategy must have a single key")); + } + + Ok(strategy) + } + } + + deserializer.deserialize_any(RetentionVisitor) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct DatasetConfig { @@ -44,3 +124,53 @@ impl DatasetConfig { Ok(config) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn parse(yaml: &str) -> RetentionConfig { + // Mirror how the field is read inside `read_config_file`. + let deser = serde_yaml::Deserializer::from_str(yaml); + serde_yaml::with::singleton_map_recursive::deserialize(deser).unwrap() + } + + #[test] + fn bare_api_is_uncapped() { + assert_eq!(parse("Api"), RetentionConfig::Api { max_blocks: None }); + } + + #[test] + fn api_with_max_blocks() { + assert_eq!(parse("Api:\n max_blocks: 100000"), RetentionConfig::Api { max_blocks: Some(100000) }); + } + + #[test] + fn api_with_empty_map_is_uncapped() { + assert_eq!(parse("Api: {}"), RetentionConfig::Api { max_blocks: None }); + } + + #[test] + fn other_strategies_still_parse() { + assert_eq!(parse("None"), RetentionConfig::None); + assert_eq!(parse("Head: 2000"), RetentionConfig::Head(2000)); + assert_eq!( + parse("FromBlock:\n number: 10\n parent_hash: '0xabc'"), + RetentionConfig::FromBlock { number: 10, parent_hash: Some("0xabc".to_owned()) } + ); + } + + #[test] + fn unknown_strategy_is_rejected() { + let deser = serde_yaml::Deserializer::from_str("Bogus"); + let res: Result = serde_yaml::with::singleton_map_recursive::deserialize(deser); + assert!(res.is_err()); + } + + #[test] + fn unknown_field_is_rejected() { + let deser = serde_yaml::Deserializer::from_str("Api:\n max_blcks: 5"); + let res: Result = serde_yaml::with::singleton_map_recursive::deserialize(deser); + assert!(res.is_err()); + } +}