Skip to content

Commit

Permalink
fix: indexer event scanning is broken after epoch change (#1121)
Browse files Browse the repository at this point in the history
Description
---
* In the indexer:
    * Refactored the event scanner to take into account epoch changes
* New db methods `get_oldest_scanned_epoch` and
`delete_scanned_epochs_older_than` to support the event scanning
* In the validator node:
* Refactored the block sync task to take into account epoch changes and
to use the existing `up_to_epoch` parameter (it was ignored previously)
* New db method `blocks_get_first_in_epoch` to support the new block
sync logic

Motivation and Context
---
Recently the epoch change logic was reworked, which broke the event
scanning functionality in the indexer. This PR fixes it.

Fixes #1111

How Has This Been Tested?
---
Manually by running `tari_swarm`, doing transactions, and
stopping/resuming the network

What process can a PR reviewer use to test or verify this change?
---
See previous section


Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
mrnaveira committed Aug 23, 2024
1 parent 7f0a322 commit 6dc89c8
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 28 deletions.
90 changes: 69 additions & 21 deletions applications/tari_indexer/src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,28 +127,64 @@ impl EventScanner {

let mut event_count = 0;

let current_epoch = self.epoch_manager.current_epoch().await?;
let current_committees = self.epoch_manager.get_committees(current_epoch).await?;
for (shard_group, mut committee) in current_committees {
let newest_epoch = self.epoch_manager.current_epoch().await?;
let oldest_scanned_epoch = self.get_oldest_scanned_epoch().await?;

match oldest_scanned_epoch {
Some(oldest_epoch) => {
// we could spawn multiple concurrent epoch scans,
// but we want to avoid gaps in the latest scanned value if any of the intermediate epoch scans fail
for epoch_idx in oldest_epoch.0..=newest_epoch.0 {
let epoch = Epoch(epoch_idx);
event_count += self.scan_events_of_epoch(epoch).await?;

// at this point we can assume the previous epochs have been fully scanned
self.delete_scanned_epochs_older_than(epoch).await?;
}
},
None => {
// by default we start scanning since the current epoch
// TODO: it would be nice to have a new parameter in the indexer to specify a custom starting epoch
event_count += self.scan_events_of_epoch(newest_epoch).await?;
},
}

info!(
target: LOG_TARGET,
"Scanned {} events",
event_count
);

Ok(event_count)
}

async fn scan_events_of_epoch(&self, epoch: Epoch) -> Result<usize, anyhow::Error> {
let committees = self.epoch_manager.get_committees(epoch).await?;

let mut event_count = 0;

for (shard_group, mut committee) in committees {
info!(
target: LOG_TARGET,
"Scanning committee epoch={}, sg={}",
current_epoch,
epoch,
shard_group
);
let new_blocks = self
.get_new_blocks_from_committee(shard_group, &mut committee, current_epoch)
.get_new_blocks_from_committee(shard_group, &mut committee, epoch)
.await?;
info!(
target: LOG_TARGET,
"Scanned {} blocks",
new_blocks.len()
"Scanned {} blocks in epoch={}",
new_blocks.len(),
epoch,
);
let transactions = self.extract_transactions_from_blocks(new_blocks);
info!(
target: LOG_TARGET,
"Scanned {} transactions",
transactions.len()
"Scanned {} transactions in epoch={}",
transactions.len(),
epoch,
);

for transaction in transactions {
Expand All @@ -161,22 +197,23 @@ impl EventScanner {
events.into_iter().filter(|ev| self.should_persist_event(ev)).collect();
info!(
target: LOG_TARGET,
"Filtered events: {}",
"Filtered events in epoch {}: {}",
epoch,
filtered_events.len()
);
self.store_events_in_db(&filtered_events, transaction).await?;
}
}

info!(
target: LOG_TARGET,
"Scanned {} events",
event_count
);

Ok(event_count)
}

/// Prunes scanned-epoch bookkeeping: removes every entry strictly older than
/// `epoch` from the substate store. Called once an epoch has been fully
/// scanned, so earlier epochs no longer need tracking.
async fn delete_scanned_epochs_older_than(&self, epoch: Epoch) -> Result<(), anyhow::Error> {
    let result = self
        .substate_store
        .with_write_tx(|tx| tx.delete_scanned_epochs_older_than(epoch));
    Ok(result?)
}

fn should_persist_event(&self, event_data: &EventData) -> bool {
for filter in &self.event_filters {
if Self::event_matches_filter(filter, &event_data.event) {
Expand Down Expand Up @@ -416,6 +453,12 @@ impl EventScanner {
*start_block.id()
}

/// Fetches the earliest epoch that has a scanned-block entry in the database,
/// or `None` when no scan has been recorded yet (e.g. on a fresh indexer).
async fn get_oldest_scanned_epoch(&self) -> Result<Option<Epoch>, anyhow::Error> {
    let oldest = self
        .substate_store
        .with_read_tx(|tx| tx.get_oldest_scanned_epoch());
    Ok(oldest?)
}

#[allow(unused_assignments)]
async fn get_new_blocks_from_committee(
&self,
Expand Down Expand Up @@ -451,7 +494,7 @@ impl EventScanner {
epoch,
shard_group
);
let resp = self.get_blocks_from_vn(member, start_block_id).await;
let resp = self.get_blocks_from_vn(member, start_block_id, Some(epoch)).await;

match resp {
Ok(blocks) => {
Expand All @@ -464,11 +507,15 @@ impl EventScanner {
epoch,
shard_group,
);
if let Some(block) = blocks.last() {

// get the most recent block among all scanned blocks in the epoch
let last_block = blocks.iter().max_by_key(|b| (b.epoch(), b.height()));

if let Some(block) = last_block {
last_block_id = *block.id();
// Store the latest scanned block id in the database for future scans
self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
}
// Store the latest scanned block id in the database for future scans
self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
return Ok(blocks);
},
Err(e) => {
Expand Down Expand Up @@ -524,6 +571,7 @@ impl EventScanner {
&self,
vn_addr: &PeerAddress,
start_block_id: BlockId,
up_to_epoch: Option<Epoch>,
) -> Result<Vec<Block>, anyhow::Error> {
let mut blocks = vec![];

Expand All @@ -533,7 +581,7 @@ impl EventScanner {
let mut stream = client
.sync_blocks(SyncBlocksRequest {
start_block_id: start_block_id.as_bytes().to_vec(),
up_to_epoch: None,
up_to_epoch: up_to_epoch.map(|epoch| epoch.into()),
})
.await?;
while let Some(resp) = stream.next().await {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ pub trait SubstateStoreReadTransaction {
limit: u32,
) -> Result<Vec<Event>, StorageError>;
fn event_exists(&mut self, event: NewEvent) -> Result<bool, StorageError>;
fn get_oldest_scanned_epoch(&mut self) -> Result<Option<Epoch>, StorageError>;
fn get_last_scanned_block_id(
&mut self,
epoch: Epoch,
Expand Down Expand Up @@ -601,6 +602,26 @@ impl SubstateStoreReadTransaction for SqliteSubstateStoreReadTransaction<'_> {
Ok(exists)
}

fn get_oldest_scanned_epoch(&mut self) -> Result<Option<Epoch>, StorageError> {
use crate::substate_storage_sqlite::schema::scanned_block_ids;

let res: Option<i64> = scanned_block_ids::table
.select(diesel::dsl::min(scanned_block_ids::epoch))
.first(self.connection())
.map_err(|e| StorageError::QueryError {
reason: format!("get_oldest_scanned_epoch: {}", e),
})?;

let oldest_epoch = res
.map(|r| {
let epoch_as_u64 = r.try_into().map_err(|_| StorageError::InvalidIntegerCast)?;
Ok::<Epoch, StorageError>(Epoch(epoch_as_u64))
})
.transpose()?;

Ok(oldest_epoch)
}

fn get_last_scanned_block_id(
&mut self,
epoch: Epoch,
Expand Down Expand Up @@ -656,6 +677,7 @@ pub trait SubstateStoreWriteTransaction {
fn add_non_fungible_index(&mut self, new_nft_index: NewNonFungibleIndex) -> Result<(), StorageError>;
fn save_event(&mut self, new_event: NewEvent) -> Result<(), StorageError>;
fn save_scanned_block_id(&mut self, new_scanned_block_id: NewScannedBlockId) -> Result<(), StorageError>;
fn delete_scanned_epochs_older_than(&mut self, epoch: Epoch) -> Result<(), StorageError>;
}

impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {
Expand Down Expand Up @@ -818,6 +840,19 @@ impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {

Ok(())
}

/// Deletes all `scanned_block_ids` rows whose epoch is strictly less than
/// `epoch`. Rows at exactly `epoch` are kept, since that epoch may still be
/// mid-scan.
fn delete_scanned_epochs_older_than(&mut self, epoch: Epoch) -> Result<(), StorageError> {
    use crate::substate_storage_sqlite::schema::scanned_block_ids;

    // Checked cast instead of `as i64`, so an out-of-range epoch surfaces as an
    // error rather than silently wrapping — consistent with the cast handling
    // in `get_oldest_scanned_epoch` in this file.
    let epoch_as_i64: i64 = epoch
        .0
        .try_into()
        .map_err(|_| StorageError::InvalidIntegerCast)?;

    diesel::delete(scanned_block_ids::table)
        .filter(scanned_block_ids::epoch.lt(epoch_as_i64))
        .execute(&mut *self.connection())
        .map_err(|e| StorageError::QueryError {
            reason: format!("delete_scanned_epochs_older_than: {}", e),
        })?;

    Ok(())
}
}

impl<'a> Deref for SqliteSubstateStoreWriteTransaction<'a> {
Expand Down
32 changes: 27 additions & 5 deletions applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::collections::HashSet;

use log::*;
use tari_dan_common_types::Epoch;
use tari_dan_common_types::{optional::Optional, Epoch};
use tari_dan_p2p::proto::rpc::{sync_blocks_response::SyncData, QuorumCertificates, SyncBlocksResponse, Transactions};
use tari_dan_storage::{
consensus_models::{Block, BlockId, QuorumCertificate, SubstateUpdate, TransactionRecord},
Expand All @@ -30,7 +30,7 @@ type BlockBuffer = Vec<BlockData>;
pub struct BlockSyncTask<TStateStore: StateStore> {
store: TStateStore,
start_block: Block,
_up_to_epoch: Option<Epoch>,
up_to_epoch: Option<Epoch>,
sender: mpsc::Sender<Result<SyncBlocksResponse, RpcStatus>>,
}

Expand All @@ -44,7 +44,7 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
Self {
store,
start_block,
_up_to_epoch: up_to_epoch,
up_to_epoch,
sender,
}
}
Expand Down Expand Up @@ -110,11 +110,33 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
let mut current_block_id = *current_block_id;
let mut last_block_id = current_block_id;
loop {
let children = tx.blocks_get_all_by_parent(&current_block_id)?;
let Some(child) = children.into_iter().find(|b| b.is_committed()) else {
let current_block = tx.blocks_get(&current_block_id)?;

// Find the next block in the database
let child = if current_block.is_epoch_end() {
// The current block is the last one in the epoch,
// so we need to find the first block in the next epoch
tx.blocks_get_genesis_for_epoch(current_block.epoch() + Epoch(1))
.optional()?
} else {
// The current block is NOT the last one in the epoch,
// so we need to find a child block
let children = tx.blocks_get_all_by_parent(&current_block_id)?;
children.into_iter().find(|b| b.is_committed())
};

// If there is not a new block then we stop streaming
let Some(child) = child else {
break;
};

// If we hit the max allowed epoch then we stop streaming
if let Some(epoch) = self.up_to_epoch {
if child.epoch() > epoch {
break;
}
}

current_block_id = *child.id();
if child.is_dummy() {
continue;
Expand Down
8 changes: 6 additions & 2 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,16 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found(format!("start_block_id {start_block_id} not found")))?;

// Check that the start block
// Check that the start block is not after the locked block
let locked_block = store
.with_read_tx(|tx| LockedBlock::get(tx).optional())
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found("No locked block"))?;
if start_block.height() > locked_block.height() {
let epoch_is_after = start_block.epoch() > locked_block.epoch();
let height_is_after =
(start_block.epoch() == locked_block.epoch) && (start_block.height() > locked_block.height());

if epoch_is_after || height_is_after {
return Err(RpcStatus::not_found(format!(
"start_block_id {} is after locked block {}",
start_block_id, locked_block
Expand Down
25 changes: 25 additions & 0 deletions dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,31 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor
block.try_convert(qc)
}

/// Loads the first block of `epoch`, i.e. the block at height 0 within that
/// epoch, together with its quorum certificate. Returns a not-found storage
/// error when the epoch has no such block (callers may use `.optional()`).
fn blocks_get_genesis_for_epoch(&self, epoch: Epoch) -> Result<Block, StorageError> {
    use crate::schema::{blocks, quorum_certificates};

    let (block, maybe_qc) = blocks::table
        .left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id)))
        .select((blocks::all_columns, quorum_certificates::all_columns.nullable()))
        .filter(blocks::epoch.eq(epoch.as_u64() as i64))
        .filter(blocks::height.eq(0))
        .first::<(sql_models::Block, Option<sql_models::QuorumCertificate>)>(self.connection())
        .map_err(|e| SqliteStorageError::DieselError {
            operation: "blocks_get_genesis_for_epoch",
            source: e,
        })?;

    // A block without its QC means the two tables are out of sync.
    let Some(qc) = maybe_qc else {
        return Err(SqliteStorageError::DbInconsistency {
            operation: "blocks_get_genesis_for_epoch",
            details: format!(
                "block {} references non-existent quorum certificate {}",
                block.id, block.qc_id
            ),
        }
        .into());
    };

    block.try_convert(qc)
}

fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result<Vec<Block>, StorageError> {
use crate::schema::{blocks, quorum_certificates};

Expand Down
1 change: 1 addition & 0 deletions dan_layer/storage/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub trait StateStoreReadTransaction: Sized {
from_block_id: &BlockId,
) -> Result<BlockTransactionExecution, StorageError>;
fn blocks_get(&self, block_id: &BlockId) -> Result<Block, StorageError>;
fn blocks_get_genesis_for_epoch(&self, epoch: Epoch) -> Result<Block, StorageError>;
fn blocks_get_last_n_in_epoch(&self, n: usize, epoch: Epoch) -> Result<Vec<Block>, StorageError>;
/// Returns all blocks from and excluding the start block (lower height) to the end block (inclusive)
fn blocks_get_all_between(
Expand Down

0 comments on commit 6dc89c8

Please sign in to comment.