Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/blocks/tipset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
networks::{calibnet, mainnet},
shim::clock::ChainEpoch,
utils::{
ShallowClone,
cid::CidCborExt,
db::{CborStoreExt, car_stream::CarBlock},
get_size::nunny_vec_heap_size_helper,
Expand Down Expand Up @@ -195,6 +196,15 @@ pub struct Tipset {
key: Arc<OnceLock<TipsetKey>>,
}

impl ShallowClone for Tipset {
fn shallow_clone(&self) -> Self {
Self {
headers: self.headers.shallow_clone(),
key: self.key.shallow_clone(),
}
}
}

impl From<RawBlockHeader> for Tipset {
fn from(value: RawBlockHeader) -> Self {
Self::from(CachingBlockHeader::from(value))
Expand Down
5 changes: 3 additions & 2 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::cid_collections::CidHashSet;
use crate::db::car::forest::{self, ForestCarFrame, finalize_frame};
use crate::db::{SettingsStore, SettingsStoreExt};
use crate::ipld::stream_chain;
use crate::utils::ShallowClone as _;
use crate::utils::db::car_stream::{CarBlock, CarBlockWrite};
use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
use crate::utils::multihash::MultihashCode;
Expand Down Expand Up @@ -167,8 +168,8 @@ async fn export_to_forest_car<D: Digest>(
// block size is between 1kb and 2kb.
1024,
stream_chain(
Arc::clone(db),
tipset.clone().chain_owned(Arc::clone(db)),
db.shallow_clone(),
tipset.shallow_clone().chain_owned(db.shallow_clone()),
stateroot_lookup_limit,
)
.with_seen(seen)
Expand Down
41 changes: 20 additions & 21 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::networks::{ChainConfig, Height};
use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash};
use crate::shim::clock::ChainEpoch;
use crate::shim::{executor::Receipt, message::Message, version::NetworkVersion};
use crate::utils::ShallowClone;
use crate::utils::db::{BlockstoreExt, CborStoreExt};
use crate::{
blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta},
Expand Down Expand Up @@ -76,7 +77,7 @@ pub struct ChainStore<DB> {
f3_finalized_tipset: Arc<RwLock<Option<Tipset>>>,

/// Used as a cache for tipset `lookbacks`.
chain_index: Arc<ChainIndex<Arc<DB>>>,
chain_index: ChainIndex<DB>,

/// Tracks blocks for the purpose of forming tipsets.
tipset_tracker: TipsetTracker<DB>,
Expand Down Expand Up @@ -144,22 +145,20 @@ where
};
let heaviest_tipset = Arc::new(RwLock::new(head));
let f3_finalized_tipset: Arc<RwLock<Option<Tipset>>> = Default::default();
let chain_index = Arc::new(
ChainIndex::new(db.clone()).with_is_tipset_finalized(Box::new({
let chain_finality = chain_config.policy.chain_finality;
let heaviest_tipset = heaviest_tipset.clone();
let f3_finalized_tipset = f3_finalized_tipset.clone();
move |ts| {
let finalized = f3_finalized_tipset
.read()
.as_ref()
.map(|ts| ts.epoch())
.unwrap_or_default()
.max(heaviest_tipset.read().epoch() - chain_finality);
ts.epoch() <= finalized
}
})),
);
let chain_index = ChainIndex::new(db.clone()).with_is_tipset_finalized(Arc::new({
let chain_finality = chain_config.policy.chain_finality;
let heaviest_tipset = heaviest_tipset.clone();
let f3_finalized_tipset = f3_finalized_tipset.clone();
move |ts| {
let finalized = f3_finalized_tipset
.read()
.as_ref()
.map(|ts| ts.epoch())
.unwrap_or_default()
.max(heaviest_tipset.read().epoch() - chain_finality);
ts.epoch() <= finalized
}
}));
let cs = Self {
head_changes_tx: publisher,
chain_index,
Expand Down Expand Up @@ -290,7 +289,7 @@ where
}

/// Returns the chain index
pub fn chain_index(&self) -> &Arc<ChainIndex<Arc<DB>>> {
pub fn chain_index(&self) -> &ChainIndex<DB> {
&self.chain_index
}

Expand Down Expand Up @@ -393,7 +392,7 @@ where
/// is usually 900. The `heaviest_tipset` is a reference point in the
/// blockchain. It must be a child of the look-back tipset.
pub fn get_lookback_tipset_for_round(
chain_index: &Arc<ChainIndex<Arc<DB>>>,
chain_index: &ChainIndex<DB>,
chain_config: &Arc<ChainConfig>,
heaviest_tipset: &Tipset,
round: ChainEpoch,
Expand All @@ -417,8 +416,8 @@ where
let beacon = Arc::new(chain_config.get_beacon_schedule(genesis_timestamp));
let ExecutedTipset { state_root, .. } = crate::state_manager::apply_block_messages(
genesis_timestamp,
Arc::clone(chain_index),
Arc::clone(chain_config),
chain_index.shallow_clone(),
chain_config.shallow_clone(),
beacon,
// Using shared WASM engine here as creating new WASM engines is expensive
// (takes seconds to minutes). It's only acceptable here because this situation is
Expand Down
22 changes: 17 additions & 5 deletions src/chain/store/index.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::num::NonZeroUsize;
use std::{num::NonZeroUsize, sync::Arc};

use crate::beacon::{BeaconEntry, IGNORE_DRAND};
use crate::blocks::{Tipset, TipsetKey};
use crate::chain::Error;
use crate::metrics;
use crate::shim::clock::ChainEpoch;
use crate::utils::ShallowClone;
use crate::utils::cache::SizeTrackingLruCache;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
Expand All @@ -20,7 +21,7 @@ type TipsetCache = SizeTrackingLruCache<TipsetKey, Tipset>;

type TipsetHeightCache = SizeTrackingLruCache<ChainEpoch, TipsetKey>;

type IsTipsetFinalizedFn = Box<dyn Fn(&Tipset) -> bool + Send + Sync>;
type IsTipsetFinalizedFn = Arc<dyn Fn(&Tipset) -> bool + Send + Sync>;

/// Keeps look-back tipsets in cache at a given interval `skip_length` and can
/// be used to look-back at the chain to retrieve an old tipset.
Expand All @@ -30,11 +31,22 @@ pub struct ChainIndex<DB> {
/// epoch to tipset key mappings.
ts_height_cache: TipsetHeightCache,
/// `Blockstore` pointer needed to load tipsets from cold storage.
db: DB,
db: Arc<DB>,
/// check whether a tipset is finalized
is_tipset_finalized: Option<IsTipsetFinalizedFn>,
}

impl<DB> ShallowClone for ChainIndex<DB> {
fn shallow_clone(&self) -> Self {
Self {
ts_cache: self.ts_cache.shallow_clone(),
ts_height_cache: self.ts_height_cache.shallow_clone(),
db: self.db.shallow_clone(),
is_tipset_finalized: self.is_tipset_finalized.clone(),
}
}
}

#[derive(Debug, Clone, Copy)]
/// Methods for resolving fetches of null tipsets.
/// Imagine epoch 10 is null but epoch 9 and 11 exist. If epoch we request epoch
Expand All @@ -45,7 +57,7 @@ pub enum ResolveNullTipset {
}

impl<DB: Blockstore> ChainIndex<DB> {
pub fn new(db: DB) -> Self {
pub fn new(db: Arc<DB>) -> Self {
let ts_cache =
SizeTrackingLruCache::new_with_metrics("tipset".into(), DEFAULT_TIPSET_CACHE_SIZE);
let ts_height_cache: SizeTrackingLruCache<ChainEpoch, TipsetKey> =
Expand All @@ -68,7 +80,7 @@ impl<DB: Blockstore> ChainIndex<DB> {
self
}

pub fn db(&self) -> &DB {
pub fn db(&self) -> &Arc<DB> {
&self.db
}

Expand Down
12 changes: 10 additions & 2 deletions src/chain_sync/bad_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::num::NonZeroUsize;
use cid::Cid;
use nonzero_ext::nonzero;

use crate::utils::{cache::SizeTrackingLruCache, get_size};
use crate::utils::{ShallowClone, cache::SizeTrackingLruCache, get_size};

/// Default capacity for CID caches (32768 entries).
/// That's about 4 MiB.
Expand Down Expand Up @@ -47,11 +47,19 @@ impl BadBlockCache {

/// Thread-safe LRU cache for tracking recently seen gossip block CIDs.
/// Used to de-duplicate gossip blocks before expensive message fetching.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct SeenBlockCache {
cache: SizeTrackingLruCache<get_size::CidWrapper, ()>,
}

impl ShallowClone for SeenBlockCache {
fn shallow_clone(&self) -> Self {
Self {
cache: self.cache.shallow_clone(),
}
}
}

impl Default for SeenBlockCache {
fn default() -> Self {
Self::new(DEFAULT_CID_CACHE_CAPACITY)
Expand Down
21 changes: 11 additions & 10 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{
networks::calculate_expected_epoch,
shim::clock::ChainEpoch,
state_manager::StateManager,
utils::ShallowClone as _,
};
use ahash::{HashMap, HashSet};
use chrono::Utc;
Expand Down Expand Up @@ -153,12 +154,12 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(

// Increment metrics, update peer information, and forward tipsets to the state machine.
set.spawn({
let state_manager = state_manager.clone();
let state_changed = state_changed.clone();
let state_machine = state_machine.clone();
let network = network.clone();
let bad_block_cache = bad_block_cache.clone();
let seen_block_cache = seen_block_cache.clone();
let state_manager = state_manager.shallow_clone();
let state_changed = state_changed.shallow_clone();
let state_machine = state_machine.shallow_clone();
let network = network.shallow_clone();
let bad_block_cache = bad_block_cache.shallow_clone();
let seen_block_cache = seen_block_cache.shallow_clone();
async move {
while let Ok(event) = network_rx.recv_async().await {
inc_gossipsub_event_metrics(&event);
Expand Down Expand Up @@ -272,10 +273,10 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
let new = tasks_set.insert(task.clone());
if new {
let action = task.clone().execute(
network.clone(),
state_manager.clone(),
network.shallow_clone(),
state_manager.shallow_clone(),
stateless_mode,
bad_block_cache.clone(),
bad_block_cache.shallow_clone(),
);
tokio::spawn({
let tasks = tasks.clone();
Expand Down Expand Up @@ -389,7 +390,7 @@ fn update_peer_info<DB: Blockstore + Sync + Send + 'static>(
let genesis_cid = *genesis.block_headers().first().cid();
// Spawn and immediately move on to the next event
tokio::task::spawn(handle_peer_connected_event(
network.clone(),
network.shallow_clone(),
chain_store,
*peer_id,
genesis_cid,
Expand Down
9 changes: 5 additions & 4 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
rpc::RequestResponseError,
},
utils::{
ShallowClone,
misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
stats::Stats,
},
Expand Down Expand Up @@ -66,12 +67,12 @@ pub struct SyncNetworkContext<DB> {
db: Arc<DB>,
}

impl<DB> Clone for SyncNetworkContext<DB> {
fn clone(&self) -> Self {
impl<DB> ShallowClone for SyncNetworkContext<DB> {
fn shallow_clone(&self) -> Self {
Self {
network_send: self.network_send.clone(),
peer_manager: self.peer_manager.clone(),
db: self.db.clone(),
peer_manager: self.peer_manager.shallow_clone(),
db: self.db.shallow_clone(),
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions src/chain_sync/tipset_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::shim::{
};
use crate::state_manager::ExecutedTipset;
use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
use crate::utils::ShallowClone as _;
use crate::{
blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
Expand Down Expand Up @@ -224,17 +225,17 @@ async fn validate_block<DB: Blockstore + Sync + Send + 'static>(

// Check block messages
validations.spawn(check_block_messages(
state_manager.clone(),
block.clone(),
base_tipset.clone(),
state_manager.shallow_clone(),
block.shallow_clone(),
base_tipset.shallow_clone(),
));

// Base fee check
validations.spawn_blocking({
let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
let base_tipset = base_tipset.clone();
let base_tipset = base_tipset.shallow_clone();
let block_store = state_manager.blockstore_owned();
let block = Arc::clone(&block);
let block = block.shallow_clone();
move || {
let base_fee = crate::chain::compute_base_fee(&block_store, &base_tipset, smoke_height)
.map_err(|e| {
Expand All @@ -253,7 +254,7 @@ async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
// Parent weight calculation check
validations.spawn_blocking({
let block_store = state_manager.blockstore_owned();
let base_tipset = base_tipset.clone();
let base_tipset = base_tipset.shallow_clone();
let weight = header.weight.clone();
move || {
let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
Expand Down
3 changes: 2 additions & 1 deletion src/cli/subcommands/chain_cmd/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
},
},
shim::econ::{BLOCK_GAS_LIMIT, TokenAmount},
utils::ShallowClone as _,
};

/// View a segment of the chain
Expand All @@ -43,7 +44,7 @@ impl ChainListCommand {
};
let mut tipsets = Vec::with_capacity(count);
loop {
tipsets.push(ts.clone());
tipsets.push(ts.shallow_clone());
if ts.epoch() == 0 || tipsets.len() >= count {
break;
}
Expand Down
5 changes: 3 additions & 2 deletions src/daemon/db_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::networks::ChainConfig;
use crate::rpc::sync::SnapshotProgressTracker;
use crate::shim::clock::ChainEpoch;
use crate::state_manager::StateManager;
use crate::utils::ShallowClone as _;
use crate::utils::db::car_stream::CarStream;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use crate::utils::net::{DownloadFileOption, download_to};
Expand Down Expand Up @@ -390,7 +391,7 @@ where
match spec {
RangeSpec::To(to_epoch) => {
for ts in head_ts
.clone()
.shallow_clone()
.chain(&state_manager.chain_store().blockstore())
.take_while(|ts| ts.epoch() >= to_epoch)
{
Expand All @@ -400,7 +401,7 @@ where
}
RangeSpec::NumTipsets(n_tipsets) => {
for ts in head_ts
.clone()
.shallow_clone()
.chain(&state_manager.chain_store().blockstore())
.take(n_tipsets)
{
Expand Down
Loading
Loading