diff --git a/crates/hotblocks/src/dataset_controller/write_controller.rs b/crates/hotblocks/src/dataset_controller/write_controller.rs index 1d74f5c6..269af73e 100644 --- a/crates/hotblocks/src/dataset_controller/write_controller.rs +++ b/crates/hotblocks/src/dataset_controller/write_controller.rs @@ -344,6 +344,27 @@ impl WriteController { finalized_head = valuable(&finalized_head), ))] pub fn new_chunk(&mut self, finalized_head: Option<&BlockRef>, chunk: &StorageChunk) -> anyhow::Result<()> { + // A data source that has fallen behind can re-deliver a pack that lies entirely + // within the already-finalized region: its blocks end at or below our current + // finalized head and the finalized head it reports is below ours. Finalized blocks + // are immutable, so this is stale, already-committed data from a lagging endpoint, + // not a genuine fork. Skip it instead of failing the dataset update task: otherwise + // a single behind endpoint crash-loops the task every minute (see the bail! below) + // and the dataset stops ingesting from the endpoints that are still ahead. + if let Some(current) = self.finalized_head.as_ref() { + let pack_below_finalized = chunk.last_block() <= current.number + && finalized_head.map_or(true, |new| new.number < current.number); + if pack_below_finalized { + warn!( + first_block = chunk.first_block(), + last_block = chunk.last_block(), + current_finalized_head = current.number, + "ignoring stale data pack below the current finalized head (lagging data source)" + ); + return Ok(()); + } + } + // FIXME: accept self.first_block rollback limit let finalized_head = self.db.update_dataset(self.dataset_id, |tx| { let new_finalized_head = match (finalized_head, tx.label().finalized_head()) {