Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/hotblocks/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl CLI {

let api_controlled_datasets = datasets
.iter()
.filter_map(|(id, cfg)| (cfg.retention_strategy == RetentionConfig::Api).then_some(*id))
.filter_map(|(id, cfg)| matches!(cfg.retention_strategy, RetentionConfig::Api { .. }).then_some(*id))
.collect();

let data_service = DataService::start(db.clone(), datasets).await.map(Arc::new)?;
Expand Down
11 changes: 6 additions & 5 deletions crates/hotblocks/src/data_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ impl DataService {
.map(|url| ReqwestDataClient::new(http_client.clone(), url))
.collect();

let retention = match cfg.retention_strategy {
let (retention, max_blocks) = match &cfg.retention_strategy {
RetentionConfig::FromBlock { number, parent_hash } => {
RetentionStrategy::FromBlock { number, parent_hash }
(RetentionStrategy::FromBlock { number: *number, parent_hash: parent_hash.clone() }, None)
}
RetentionConfig::Head(n) => RetentionStrategy::Head(n),
RetentionConfig::Api | RetentionConfig::None => RetentionStrategy::None
RetentionConfig::Head(n) => (RetentionStrategy::Head(*n), None),
RetentionConfig::Api { max_blocks } => (RetentionStrategy::None, *max_blocks),
RetentionConfig::None => (RetentionStrategy::None, None)
};

tokio::task::spawn_blocking(move || {
DatasetController::new(db, dataset_id, cfg.kind, retention, data_sources).map(|c| {
DatasetController::new(db, dataset_id, cfg.kind, retention, max_blocks, data_sources).map(|c| {
c.enable_compaction(!cfg.disable_compaction);
Arc::new(c)
})
Expand Down
143 changes: 139 additions & 4 deletions crates/hotblocks/src/dataset_config.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::collections::BTreeMap;
use std::fmt;

use serde::{Deserialize, Serialize};
use serde::de::{self, IgnoredAny, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use sqd_query::BlockNumber;
use sqd_storage::db::DatasetId;
use url::Url;

use crate::types::DatasetKind;

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
pub enum RetentionConfig {
// Fixed, starting from the block number
FromBlock {
Expand All @@ -16,11 +18,94 @@ pub enum RetentionConfig {
},
// Moving window that keeps up to N blocks
Head(u64),
// Retention is set dynamically from the portal
Api,
// Retention is set dynamically from the portal. `max_blocks`, if set, caps
// storage at N blocks behind the tip as a safety net for when the portal
// stops advancing the floor. Accepts both the bare `Api` form and the
// `Api: { max_blocks: N }` form.
Api {
max_blocks: Option<u64>
},
None
}

const RETENTION_VARIANTS: &[&str] = &["FromBlock", "Head", "Api", "None"];

// `RetentionConfig` is read through `serde_yaml`'s `singleton_map_recursive`,
// which represents non-unit variants as a single-key map and unit variants as a
// bare string. A struct variant alone could therefore not accept the bare `Api`
// string, so deserialization is implemented by hand to allow both `Api` and
// `Api: { max_blocks: N }`.
//
// This can be reverted to a normal derived `Deserialize` once the bare `Api`
// form is no longer used in any config.
impl<'de> Deserialize<'de> for RetentionConfig {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
struct FromBlockCfg {
number: BlockNumber,
#[serde(default)]
parent_hash: Option<String>
}

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
struct ApiCfg {
#[serde(default)]
max_blocks: Option<u64>
}

struct RetentionVisitor;

impl<'de> Visitor<'de> for RetentionVisitor {
type Value = RetentionConfig;

fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("a retention strategy")
}

fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
match value {
"Api" => Ok(RetentionConfig::Api { max_blocks: None }),
"None" => Ok(RetentionConfig::None),
other => Err(E::unknown_variant(other, RETENTION_VARIANTS))
}
}

fn visit_map<A: MapAccess<'de>>(self, mut map: A) -> Result<Self::Value, A::Error> {
let tag: String = map
.next_key()?
.ok_or_else(|| de::Error::custom("expected a retention strategy"))?;

let strategy = match tag.as_str() {
"FromBlock" => {
let cfg: FromBlockCfg = map.next_value()?;
RetentionConfig::FromBlock { number: cfg.number, parent_hash: cfg.parent_hash }
}
"Head" => RetentionConfig::Head(map.next_value()?),
"Api" => {
let cfg: ApiCfg = map.next_value()?;
RetentionConfig::Api { max_blocks: cfg.max_blocks }
}
"None" => {
map.next_value::<IgnoredAny>()?;
RetentionConfig::None
}
other => return Err(de::Error::unknown_variant(other, RETENTION_VARIANTS))
};

if map.next_key::<IgnoredAny>()?.is_some() {
return Err(de::Error::custom("retention strategy must have a single key"));
}

Ok(strategy)
}
}

deserializer.deserialize_any(RetentionVisitor)
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DatasetConfig {
Expand All @@ -39,3 +124,53 @@ impl DatasetConfig {
Ok(config)
}
}

#[cfg(test)]
mod tests {
use super::*;

fn parse(yaml: &str) -> RetentionConfig {
// Mirror how the field is read inside `read_config_file`.
let deser = serde_yaml::Deserializer::from_str(yaml);
serde_yaml::with::singleton_map_recursive::deserialize(deser).unwrap()
}

#[test]
fn bare_api_is_uncapped() {
assert_eq!(parse("Api"), RetentionConfig::Api { max_blocks: None });
}

#[test]
fn api_with_max_blocks() {
assert_eq!(parse("Api:\n max_blocks: 100000"), RetentionConfig::Api { max_blocks: Some(100000) });
}

#[test]
fn api_with_empty_map_is_uncapped() {
assert_eq!(parse("Api: {}"), RetentionConfig::Api { max_blocks: None });
}

#[test]
fn other_strategies_still_parse() {
assert_eq!(parse("None"), RetentionConfig::None);
assert_eq!(parse("Head: 2000"), RetentionConfig::Head(2000));
assert_eq!(
parse("FromBlock:\n number: 10\n parent_hash: '0xabc'"),
RetentionConfig::FromBlock { number: 10, parent_hash: Some("0xabc".to_owned()) }
);
}

#[test]
fn unknown_strategy_is_rejected() {
let deser = serde_yaml::Deserializer::from_str("Bogus");
let res: Result<RetentionConfig, _> = serde_yaml::with::singleton_map_recursive::deserialize(deser);
assert!(res.is_err());
}

#[test]
fn unknown_field_is_rejected() {
let deser = serde_yaml::Deserializer::from_str("Api:\n max_blcks: 5");
let res: Result<RetentionConfig, _> = serde_yaml::with::singleton_map_recursive::deserialize(deser);
assert!(res.is_err());
}
}
32 changes: 29 additions & 3 deletions crates/hotblocks/src/dataset_controller/dataset_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl DatasetController {
dataset_id: DatasetId,
dataset_kind: DatasetKind,
retention: RetentionStrategy,
max_blocks: Option<u64>,
data_sources: Vec<ReqwestDataClient>
) -> anyhow::Result<Self> {
let mut write = WriteController::new(db.clone(), dataset_id, dataset_kind)?;
Expand All @@ -63,6 +64,7 @@ impl DatasetController {
dataset_id,
dataset_kind,
data_sources,
max_blocks,
retention_recv,
head_sender,
finalized_head_sender
Expand Down Expand Up @@ -283,6 +285,10 @@ struct Ctl {
dataset_id: DatasetId,
dataset_kind: DatasetKind,
data_sources: Vec<ReqwestDataClient>,
// Static safety cap on retained blocks for Api-controlled datasets: even
// if the portal stops advancing the floor, the tail is trimmed to keep at
// most this many blocks behind the tip. `None` means grow indefinitely.
max_blocks: Option<u64>,
retention_recv: tokio::sync::watch::Receiver<RetentionStrategy>,
head_sender: tokio::sync::watch::Sender<Option<BlockRef>>,
finalized_head_sender: tokio::sync::watch::Sender<Option<BlockRef>>
Expand Down Expand Up @@ -343,17 +349,18 @@ impl Ctl {
let retention = self.retention_recv.borrow_and_update().clone();
let mut state = match retention {
RetentionStrategy::FromBlock { number, parent_hash } => {
let (number, parent_hash) = self.clamp_floor(&write, number, parent_hash);
if !write.starts_at(number, &parent_hash) {
blocking! {
write.retain(number, parent_hash)
}?;
}
State::Init { head: None }
State::Init { head: self.max_blocks }
}
RetentionStrategy::Head(n) => State::Init { head: Some(n) },
RetentionStrategy::None => {
if write.write.head().is_some() {
State::Init { head: None }
State::Init { head: self.max_blocks }
} else {
State::Idle
}
Expand Down Expand Up @@ -456,17 +463,36 @@ impl Ctl {
}
}

// With a storage cap, the tail trimmer may have already advanced the front
// past a stale/lagging portal floor. Retaining below the current front is
// treated as a gap and would drop the whole dataset, so never move the floor
// backwards. The posted parent_hash belongs to the original (lower) block,
// so it must be dropped when clamping or the hash check would wipe data.
fn clamp_floor(
&self,
write: &WriteCtx,
number: BlockNumber,
parent_hash: Option<String>
) -> (BlockNumber, Option<String>) {
if self.max_blocks.is_some() && number < write.write.start_block() {
(write.write.start_block(), None)
} else {
(number, parent_hash)
}
}

async fn handle_retention_change(&mut self, state: &mut State, mut write: WriteCtx) -> anyhow::Result<WriteCtx> {
// need this variable to please the compiler
let retention = self.retention_recv.borrow_and_update().clone();
match retention {
RetentionStrategy::FromBlock { number, parent_hash } => {
let (number, parent_hash) = self.clamp_floor(&write, number, parent_hash);
let will_erase_head = write.write.head().map_or(false, |h| h.number < number) || // FromBlock is greater than current head, so everything is cleared
write.write.start_block() > number; // FromBlock is less than current front, dropping everything by design
blocking_write!(write, write.retain(number, parent_hash))?;
match state {
State::Ingest { .. } if !will_erase_head => {} // Keep ingesting, head is valid
_ => *state = State::Init { head: None } // New ingest needed
_ => *state = State::Init { head: self.max_blocks } // New ingest needed
}
}
RetentionStrategy::Head(n) => match state {
Expand Down
Loading