Skip to content

Commit

Permalink
Logs
Browse files Browse the repository at this point in the history
  • Loading branch information
squadgazzz committed Aug 29, 2024
1 parent ca10f6f commit 6a7f7e5
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions crates/shared/src/event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,15 @@ where
// first get the blocks needed to update `last_handled_blocks` because if it
// fails, it's safer to fail at the beginning of the function before we
// update Storage
tracing::info!("newlog update_events_from_old_blocks 0");
let blocks = self
.block_retriever
.blocks(RangeInclusive::try_new(
range.end().saturating_sub(MAX_REORG_BLOCK_COUNT),
*range.end(),
)?)
.await?;
tracing::info!("newlog update_events_from_old_blocks 1");

let events = self
.past_events_by_block_number_range(&range)
Expand All @@ -290,6 +292,7 @@ where
.chunks(INSERT_EVENT_BATCH_SIZE)
.map(|chunk| chunk.into_iter().collect::<Result<Vec<_>, _>>());
futures::pin_mut!(events);
tracing::info!("newlog update_events_from_old_blocks 2");
// We intentionally do not go with the obvious approach of deleting old events
// first and then inserting new ones. Instead, we make sure that the
// deletion and the insertion of the first batch of events happen in one
Expand Down Expand Up @@ -318,28 +321,36 @@ where
// we would all have to in one transaction.
let mut have_deleted_old_events = false;
while let Some(events_chunk) = events.next().await {
tracing::info!("newlog update_events_from_old_blocks 3");
// Early return on error (through `?`) is important here so that the second
// !have_deleted_old_events check (after the loop) is correct.
let unwrapped_events = events_chunk.context("failed to get next chunk of events")?;
if !have_deleted_old_events {
tracing::info!("newlog update_events_from_old_blocks 4");
self.store
.replace_events(unwrapped_events, range.clone())
.await?;
tracing::info!("newlog update_events_from_old_blocks 5");
have_deleted_old_events = true;
} else {
tracing::info!("newlog update_events_from_old_blocks 6");
self.store.append_events(unwrapped_events).await?;
tracing::info!("newlog update_events_from_old_blocks 7");
};
}
// The `chunks` adaptor does not return an empty chunk if the stream was
// completely empty. However we do want to delete old events in this
// case as a rerorg might have removed events without adding new ones.
tracing::info!("newlog update_events_from_old_blocks 8");
if !have_deleted_old_events {
tracing::info!("newlog update_events_from_old_blocks 9");
self.store.replace_events(Vec::new(), range.clone()).await?;
tracing::info!("newlog update_events_from_old_blocks 10");
}
tracing::info!("newlog update_events_from_old_blocks 1");
tracing::info!("newlog update_events_from_old_blocks 11");

self.update_last_handled_blocks(&blocks);
tracing::info!("newlog update_events_from_old_blocks 2");
tracing::info!("newlog update_events_from_old_blocks 12");
Ok(())
}

Expand Down

0 comments on commit 6a7f7e5

Please sign in to comment.