Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions crates/hotblocks/src/dataset_controller/write_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,27 @@ impl WriteController {
finalized_head = valuable(&finalized_head),
))]
pub fn new_chunk(&mut self, finalized_head: Option<&BlockRef>, chunk: &StorageChunk) -> anyhow::Result<()> {
// A data source that has fallen behind can re-deliver a pack that lies entirely
// within the already-finalized region: its blocks end at or below our current
// finalized head and the finalized head it reports is below ours. Finalized blocks
// are immutable, so this is stale, already-committed data from a lagging endpoint,
// not a genuine fork. Skip it instead of failing the dataset update task: otherwise
// a single behind endpoint crash-loops the task every minute (see the bail! below)
// and the dataset stops ingesting from the endpoints that are still ahead.
if let Some(current) = self.finalized_head.as_ref() {
let pack_below_finalized = chunk.last_block() <= current.number
&& finalized_head.map_or(true, |new| new.number < current.number);
if pack_below_finalized {
warn!(
first_block = chunk.first_block(),
last_block = chunk.last_block(),
current_finalized_head = current.number,
"ignoring stale data pack below the current finalized head (lagging data source)"
);
return Ok(());
}
}

// FIXME: accept self.first_block rollback limit
let finalized_head = self.db.update_dataset(self.dataset_id, |tx| {
let new_finalized_head = match (finalized_head, tx.label().finalized_head()) {
Expand Down
Loading