diff --git a/applications/tari_indexer/src/event_scanner.rs b/applications/tari_indexer/src/event_scanner.rs
index 3851f5c98..20da8e6e9 100644
--- a/applications/tari_indexer/src/event_scanner.rs
+++ b/applications/tari_indexer/src/event_scanner.rs
@@ -127,28 +127,64 @@ impl EventScanner {
         let mut event_count = 0;
 
-        let current_epoch = self.epoch_manager.current_epoch().await?;
-        let current_committees = self.epoch_manager.get_committees(current_epoch).await?;
-        for (shard_group, mut committee) in current_committees {
+        let newest_epoch = self.epoch_manager.current_epoch().await?;
+        let oldest_scanned_epoch = self.get_oldest_scanned_epoch().await?;
+
+        match oldest_scanned_epoch {
+            Some(oldest_epoch) => {
+                // we could spawn multiple concurrent epoch scans,
+                // but we want to avoid gaps in the latest scanned value if any of the intermediate epoch scans fail
+                for epoch_idx in oldest_epoch.0..=newest_epoch.0 {
+                    let epoch = Epoch(epoch_idx);
+                    event_count += self.scan_events_of_epoch(epoch).await?;
+
+                    // at this point we can assume the previous epochs have been fully scanned
+                    self.delete_scanned_epochs_older_than(epoch).await?;
+                }
+            },
+            None => {
+                // by default we start scanning from the current epoch
+                // TODO: it would be nice to have a new parameter in the indexer to specify a custom starting epoch
+                event_count += self.scan_events_of_epoch(newest_epoch).await?;
+            },
+        }
+
+        info!(
+            target: LOG_TARGET,
+            "Scanned {} events",
+            event_count
+        );
+
+        Ok(event_count)
+    }
+
+    async fn scan_events_of_epoch(&self, epoch: Epoch) -> Result<usize, anyhow::Error> {
+        let committees = self.epoch_manager.get_committees(epoch).await?;
+
+        let mut event_count = 0;
+
+        for (shard_group, mut committee) in committees {
             info!(
                 target: LOG_TARGET,
                 "Scanning committee epoch={}, sg={}",
-                current_epoch,
+                epoch,
                 shard_group
             );
             let new_blocks = self
-                .get_new_blocks_from_committee(shard_group, &mut committee, current_epoch)
+                .get_new_blocks_from_committee(shard_group, &mut committee, epoch)
                 .await?;
             info!(
                 target: LOG_TARGET,
-                "Scanned {} blocks",
-                new_blocks.len()
+                "Scanned {} blocks in epoch={}",
+                new_blocks.len(),
+                epoch,
             );
             let transactions = self.extract_transactions_from_blocks(new_blocks);
             info!(
                 target: LOG_TARGET,
-                "Scanned {} transactions",
-                transactions.len()
+                "Scanned {} transactions in epoch={}",
+                transactions.len(),
+                epoch,
             );
 
             for transaction in transactions {
@@ -161,22 +197,23 @@ impl EventScanner {
                     events.into_iter().filter(|ev| self.should_persist_event(ev)).collect();
                 info!(
                     target: LOG_TARGET,
-                    "Filtered events: {}",
+                    "Filtered events in epoch {}: {}",
+                    epoch,
                     filtered_events.len()
                 );
                 self.store_events_in_db(&filtered_events, transaction).await?;
             }
         }
 
-        info!(
-            target: LOG_TARGET,
-            "Scanned {} events",
-            event_count
-        );
-
         Ok(event_count)
     }
 
+    async fn delete_scanned_epochs_older_than(&self, epoch: Epoch) -> Result<(), anyhow::Error> {
+        self.substate_store
+            .with_write_tx(|tx| tx.delete_scanned_epochs_older_than(epoch))
+            .map_err(|e| e.into())
+    }
+
     fn should_persist_event(&self, event_data: &EventData) -> bool {
         for filter in &self.event_filters {
             if Self::event_matches_filter(filter, &event_data.event) {
@@ -416,6 +453,12 @@ impl EventScanner {
         *start_block.id()
     }
 
+    async fn get_oldest_scanned_epoch(&self) -> Result<Option<Epoch>, anyhow::Error> {
+        self.substate_store
+            .with_read_tx(|tx| tx.get_oldest_scanned_epoch())
+            .map_err(|e| e.into())
+    }
+
     #[allow(unused_assignments)]
     async fn get_new_blocks_from_committee(
         &self,
@@ -451,7 +494,7 @@ impl EventScanner {
                 epoch,
                 shard_group
            );
-            let resp = self.get_blocks_from_vn(member, start_block_id).await;
+            let resp = self.get_blocks_from_vn(member, start_block_id, Some(epoch)).await;
 
             match resp {
                 Ok(blocks) => {
@@ -464,11 +507,15 @@ impl EventScanner {
                         epoch,
                         shard_group,
                     );
-                    if let Some(block) = blocks.last() {
+
+                    // get the most recent block among all scanned blocks in the epoch
+                    let last_block = blocks.iter().max_by_key(|b| (b.epoch(), b.height()));
+
+                    if let Some(block) = last_block {
                         last_block_id = *block.id();
+                        // Store the latest scanned block id in the database for future scans
+                        self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
                     }
-                    // Store the latest scanned block id in the database for future scans
-                    self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
                     return Ok(blocks);
                 },
                 Err(e) => {
@@ -524,6 +571,7 @@ impl EventScanner {
         &self,
         vn_addr: &PeerAddress,
         start_block_id: BlockId,
+        up_to_epoch: Option<Epoch>,
     ) -> Result<Vec<Block>, anyhow::Error> {
         let mut blocks = vec![];
 
@@ -533,7 +581,7 @@
         let mut stream = client
             .sync_blocks(SyncBlocksRequest {
                 start_block_id: start_block_id.as_bytes().to_vec(),
-                up_to_epoch: None,
+                up_to_epoch: up_to_epoch.map(|epoch| epoch.into()),
             })
             .await?;
         while let Some(resp) = stream.next().await {
diff --git a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs
index f8cb3d1bb..972a58840 100644
--- a/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs
+++ b/applications/tari_indexer/src/substate_storage_sqlite/sqlite_substate_store_factory.rs
@@ -230,6 +230,7 @@ pub trait SubstateStoreReadTransaction {
         limit: u32,
     ) -> Result<Vec<Event>, StorageError>;
     fn event_exists(&mut self, event: NewEvent) -> Result<bool, StorageError>;
+    fn get_oldest_scanned_epoch(&mut self) -> Result<Option<Epoch>, StorageError>;
     fn get_last_scanned_block_id(
         &mut self,
         epoch: Epoch,
@@ -601,6 +602,26 @@ impl SubstateStoreReadTransaction for SqliteSubstateStoreReadTransaction<'_> {
         Ok(exists)
     }
 
+    fn get_oldest_scanned_epoch(&mut self) -> Result<Option<Epoch>, StorageError> {
+        use crate::substate_storage_sqlite::schema::scanned_block_ids;
+
+        let res: Option<i64> = scanned_block_ids::table
+            .select(diesel::dsl::min(scanned_block_ids::epoch))
+            .first(self.connection())
+            .map_err(|e| StorageError::QueryError {
+                reason: format!("get_oldest_scanned_epoch: {}", e),
+            })?;
+
+        let oldest_epoch = res
+            .map(|r| {
+                let epoch_as_u64 = r.try_into().map_err(|_| StorageError::InvalidIntegerCast)?;
+                Ok::<Epoch, StorageError>(Epoch(epoch_as_u64))
+            })
+            .transpose()?;
+
+        Ok(oldest_epoch)
+    }
+
     fn get_last_scanned_block_id(
         &mut self,
         epoch: Epoch,
@@ -656,6 +677,7 @@ pub trait SubstateStoreWriteTransaction {
     fn add_non_fungible_index(&mut self, new_nft_index: NewNonFungibleIndex) -> Result<(), StorageError>;
     fn save_event(&mut self, new_event: NewEvent) -> Result<(), StorageError>;
     fn save_scanned_block_id(&mut self, new_scanned_block_id: NewScannedBlockId) -> Result<(), StorageError>;
+    fn delete_scanned_epochs_older_than(&mut self, epoch: Epoch) -> Result<(), StorageError>;
 }
 
 impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {
@@ -818,6 +840,19 @@ impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {
 
         Ok(())
     }
+
+    fn delete_scanned_epochs_older_than(&mut self, epoch: Epoch) -> Result<(), StorageError> {
+        use crate::substate_storage_sqlite::schema::scanned_block_ids;
+
+        diesel::delete(scanned_block_ids::table)
+            .filter(scanned_block_ids::epoch.lt(epoch.0 as i64))
+            .execute(&mut *self.connection())
+            .map_err(|e| StorageError::QueryError {
+                reason: format!("delete_scanned_epochs_older_than: {}", e),
+            })?;
+
+        Ok(())
+    }
 }
 
 impl<'a> Deref for SqliteSubstateStoreWriteTransaction<'a> {
diff --git a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs
index b20ecb6d1..18171f008 100644
--- a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs
+++ b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs
@@ -4,7 +4,7 @@ use std::collections::HashSet;
 
 use log::*;
-use tari_dan_common_types::Epoch;
+use tari_dan_common_types::{optional::Optional, Epoch};
 use tari_dan_p2p::proto::rpc::{sync_blocks_response::SyncData, QuorumCertificates, SyncBlocksResponse, Transactions};
 use tari_dan_storage::{
     consensus_models::{Block, BlockId, QuorumCertificate, SubstateUpdate, TransactionRecord},
@@ -30,7 +30,7 @@ type BlockBuffer = Vec<BlockData>;
 
 pub struct BlockSyncTask<TStateStore: StateStore> {
     store: TStateStore,
     start_block: Block,
-    _up_to_epoch: Option<Epoch>,
+    up_to_epoch: Option<Epoch>,
     sender: mpsc::Sender<Result<SyncBlocksResponse, RpcStatus>>,
 }
 
@@ -44,7 +44,7 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
         Self {
             store,
             start_block,
-            _up_to_epoch: up_to_epoch,
+            up_to_epoch,
             sender,
         }
     }
@@ -110,11 +110,33 @@
         let mut current_block_id = *current_block_id;
         let mut last_block_id = current_block_id;
         loop {
-            let children = tx.blocks_get_all_by_parent(&current_block_id)?;
-            let Some(child) = children.into_iter().find(|b| b.is_committed()) else {
+            let current_block = tx.blocks_get(&current_block_id)?;
+
+            // Find the next block in the database
+            let child = if current_block.is_epoch_end() {
+                // The current block is the last one in the epoch,
+                // so we need to find the first block in the next epoch
+                tx.blocks_get_genesis_for_epoch(current_block.epoch() + Epoch(1))
+                    .optional()?
+            } else {
+                // The current block is NOT the last one in the epoch,
+                // so we need to find a child block
+                let children = tx.blocks_get_all_by_parent(&current_block_id)?;
+                children.into_iter().find(|b| b.is_committed())
+            };
+
+            // If there is no new block then we stop streaming
+            let Some(child) = child else {
                 break;
             };
 
+            // If we hit the max allowed epoch then we stop streaming
+            if let Some(epoch) = self.up_to_epoch {
+                if child.epoch() > epoch {
+                    break;
+                }
+            }
+
             current_block_id = *child.id();
             if child.is_dummy() {
                 continue;
diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs
index 68c6ba465..cc045f3f8 100644
--- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs
+++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs
@@ -279,12 +279,16 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
             .map_err(RpcStatus::log_internal_error(LOG_TARGET))?
             .ok_or_else(|| RpcStatus::not_found(format!("start_block_id {start_block_id} not found")))?;
 
-        // Check that the start block
+        // Check that the start block is not after the locked block
         let locked_block = store
             .with_read_tx(|tx| LockedBlock::get(tx).optional())
             .map_err(RpcStatus::log_internal_error(LOG_TARGET))?
            .ok_or_else(|| RpcStatus::not_found("No locked block"))?;
 
-        if start_block.height() > locked_block.height() {
+        let epoch_is_after = start_block.epoch() > locked_block.epoch();
+        let height_is_after =
+            (start_block.epoch() == locked_block.epoch()) && (start_block.height() > locked_block.height());
+
+        if epoch_is_after || height_is_after {
             return Err(RpcStatus::not_found(format!(
                 "start_block_id {} is after locked block {}",
                 start_block_id, locked_block
diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs
index fafa61375..8e6a8ec5f 100644
--- a/dan_layer/state_store_sqlite/src/reader.rs
+++ b/dan_layer/state_store_sqlite/src/reader.rs
@@ -720,6 +720,31 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStoreReadTransaction
         block.try_convert(qc)
     }
 
+    fn blocks_get_genesis_for_epoch(&self, epoch: Epoch) -> Result<Block, StorageError> {
+        use crate::schema::{blocks, quorum_certificates};
+
+        let (block, qc) = blocks::table
+            .left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id)))
+            .select((blocks::all_columns, quorum_certificates::all_columns.nullable()))
+            .filter(blocks::epoch.eq(epoch.as_u64() as i64))
+            .filter(blocks::height.eq(0))
+            .first::<(sql_models::Block, Option<sql_models::QuorumCertificate>)>(self.connection())
+            .map_err(|e| SqliteStorageError::DieselError {
+                operation: "blocks_get_genesis_for_epoch",
+                source: e,
+            })?;
+
+        let qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency {
+            operation: "blocks_get_genesis_for_epoch",
+            details: format!(
+                "block {} references non-existent quorum certificate {}",
+                block.id, block.qc_id
+            ),
+        })?;
+
+        block.try_convert(qc)
+    }
+
     fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result<Vec<Block>, StorageError> {
         use crate::schema::{blocks, quorum_certificates};
 
diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs
index 0d54114ae..215f3d964 100644
--- a/dan_layer/storage/src/state_store/mod.rs
+++ b/dan_layer/storage/src/state_store/mod.rs
@@ -140,6 +140,7 @@ pub trait StateStoreReadTransaction: Sized {
         from_block_id: &BlockId,
     ) -> Result;
     fn blocks_get(&self, block_id: &BlockId) -> Result<Block, StorageError>;
+    fn blocks_get_genesis_for_epoch(&self, epoch: Epoch) -> Result<Block, StorageError>;
     fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result<Vec<Block>, StorageError>;
     /// Returns all blocks from and excluding the start block (lower height) to the end block (inclusive)
     fn blocks_get_all_between(
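
A note on the resume logic in `event_scanner.rs` above: scanning strictly from the oldest scanned epoch up to the newest, and pruning `scanned_block_ids` rows only for epochs strictly older than the epoch just completed, means the persisted scan state never develops a gap, even when a later epoch's scan fails mid-loop. A minimal sketch of that invariant, using a `BTreeSet` of epoch numbers as a hypothetical stand-in for the `scanned_block_ids` table (not the real store API):

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for the scanned_block_ids table, modelling only the
// epoch column; `fail_at` simulates scan_events_of_epoch failing for an epoch.
fn scan_range(scanned: &mut BTreeSet<u64>, newest: u64, fail_at: Option<u64>) {
    let oldest = *scanned.iter().next().expect("at least one scanned epoch");
    for epoch in oldest..=newest {
        if fail_at == Some(epoch) {
            return; // a failed epoch scan aborts the whole loop, like `?` above
        }
        scanned.insert(epoch); // scan_events_of_epoch + save_scanned_block_id
        scanned.retain(|&e| e >= epoch); // delete_scanned_epochs_older_than
    }
}

fn main() {
    // A run that fails at epoch 7 leaves epoch 6 behind as the resume point...
    let mut scanned = BTreeSet::from([5]);
    scan_range(&mut scanned, 8, Some(7));
    assert_eq!(*scanned.iter().next().unwrap(), 6);

    // ...so the next run rescans 6..=8 and ends with only the newest epoch kept.
    scan_range(&mut scanned, 8, None);
    assert_eq!(scanned, BTreeSet::from([8]));
}
```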
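Both the `max_by_key(|b| (b.epoch(), b.height()))` selection in `event_scanner.rs` and the locked-block guard in `service_impl.rs` rely on the same ordering: blocks compare by epoch first, with height only breaking ties within an epoch, which is exactly lexicographic tuple ordering in Rust. A standalone sketch (with a simplified stand-in struct, not the real `consensus_models::Block`):

```rust
// Simplified stand-in for consensus_models::Block, used only to illustrate
// the (epoch, height) ordering; not the real type.
#[derive(Debug, Clone, Copy)]
struct Blk {
    epoch: u64,
    height: u64,
}

fn main() {
    let blocks = [
        Blk { epoch: 2, height: 9 },
        Blk { epoch: 3, height: 0 }, // a new epoch's genesis outranks any older-epoch block
        Blk { epoch: 2, height: 5 },
    ];

    // Same selection as the indexer's `max_by_key`: tuples compare
    // lexicographically, so epoch dominates and height only breaks ties.
    let last = blocks.iter().max_by_key(|b| (b.epoch, b.height)).unwrap();
    assert_eq!((last.epoch, last.height), (3, 0));

    // Same predicate as the locked-block guard: `start` is after `locked` iff
    // its epoch is greater, or epochs are equal and its height is greater.
    let (start, locked) = (Blk { epoch: 3, height: 1 }, Blk { epoch: 3, height: 0 });
    let is_after =
        start.epoch > locked.epoch || (start.epoch == locked.epoch && start.height > locked.height);
    assert!(is_after);
}
```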
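The epoch-boundary rule in `block_sync_task.rs` is the subtle part of the sync change: the first block of an epoch is a fresh genesis at height 0, so it is not reachable from the previous epoch's last block by walking parent links. A toy sketch of the traversal under those assumptions (`Blk` and `next_block` here are hypothetical, not the real `StateStore` API):

```rust
// Toy in-memory chain; `Blk` is a hypothetical stand-in, not the real Block.
#[derive(Clone)]
struct Blk {
    id: u64,
    epoch: u64,
    height: u64,
    is_epoch_end: bool,
    parent: u64,
}

// Mirrors the loop above: at an epoch-end block, jump to the next epoch's
// genesis (height 0) instead of looking for a child by parent id.
fn next_block(chain: &[Blk], current: &Blk) -> Option<Blk> {
    if current.is_epoch_end {
        chain
            .iter()
            .find(|b| b.epoch == current.epoch + 1 && b.height == 0)
            .cloned()
    } else {
        chain.iter().find(|b| b.parent == current.id).cloned()
    }
}

fn main() {
    let chain = vec![
        Blk { id: 1, epoch: 1, height: 41, is_epoch_end: false, parent: 0 },
        Blk { id: 2, epoch: 1, height: 42, is_epoch_end: true, parent: 1 },
        // the next epoch starts from its own genesis; no parent link to block 2
        Blk { id: 3, epoch: 2, height: 0, is_epoch_end: false, parent: 0 },
    ];

    let mut cur = chain[0].clone();
    let mut visited = vec![cur.id];
    while let Some(next) = next_block(&chain, &cur) {
        visited.push(next.id);
        cur = next;
    }

    // The walk crosses the epoch boundary that a parent-only lookup would miss.
    assert_eq!(visited, vec![1, 2, 3]);
}
```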