Skip to content

Commit

Permalink
fix(diri): ensure events are handled sorted by block number
Browse files Browse the repository at this point in the history
  • Loading branch information
ptisserand committed Aug 23, 2024
1 parent 9fdf559 commit 2cf652a
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions crates/diri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,15 @@ impl<S: Storage, E: EventHandler> Diri<S, E> {
)
.await?;

for (block_number, events) in blocks_events {
let block_timestamp = self.block_time(BlockId::Number(block_number)).await?;
// Handle events sorted by block number
let mut block_numbers: Vec<&u64> = blocks_events.keys().collect();
block_numbers.sort();

for block_number in block_numbers {
let block_timestamp = self.block_time(BlockId::Number(*block_number)).await?;
let events = blocks_events.get(block_number).unwrap();
for any_event in events {
let orderbook_event: Event = match any_event.try_into() {
let orderbook_event: Event = match any_event.clone().try_into() {
Ok(ev) => ev,
Err(e) => {
trace!("Event can't be deserialized: {e}");
Expand All @@ -92,38 +96,38 @@ impl<S: Storage, E: EventHandler> Diri<S, E> {
Event::OrderPlaced(ev) => {
trace!("OrderPlaced found: {:?}", ev);
self.storage
.register_placed(block_number, block_timestamp, &ev.into())
.register_placed(*block_number, block_timestamp, &ev.into())
.await?;
}
Event::OrderCancelled(ev) => {
trace!("OrderCancelled found: {:?}", ev);
self.storage
.register_cancelled(block_number, block_timestamp, &ev.into())
.register_cancelled(*block_number, block_timestamp, &ev.into())
.await?;
}
Event::OrderFulfilled(ev) => {
trace!("OrderFulfilled found: {:?}", ev);
self.storage
.register_fulfilled(block_number, block_timestamp, &ev.into())
.register_fulfilled(*block_number, block_timestamp, &ev.into())
.await?;
}
Event::OrderExecuted(ev) => {
trace!("OrderExecuted found: {:?}", ev);
self.storage
.register_executed(block_number, block_timestamp, &ev.into())
.register_executed(*block_number, block_timestamp, &ev.into())
.await?;
}
Event::RollbackStatus(ev) => {
trace!("RollbackStatus found: {:?}", ev);
self.storage
.status_back_to_open(block_number, block_timestamp, &ev.into())
.status_back_to_open(*block_number, block_timestamp, &ev.into())
.await?;
}
_ => warn!("Orderbook event not handled: {:?}", orderbook_event),
};
}

self.event_handler.on_block_processed(block_number).await;
self.event_handler.on_block_processed(*block_number).await;
}

Ok(())
Expand Down

0 comments on commit 2cf652a

Please sign in to comment.