From a4370568b3371bc7de1a4086d2f99d7402e9804d Mon Sep 17 00:00:00 2001 From: Cosmin Damian <17934949+cdamian@users.noreply.github.com> Date: Mon, 7 Oct 2024 15:11:44 +0300 Subject: [PATCH] lp-gateway-queue: Ensure messages are processed in order (#1992) * lp-gateway-queue: Ensure messages are processed in order * lp-gateway-queue: Ensure processed messages are skipped * integration-tests: Remove receipt check --- .../liquidity-pools-gateway-queue/src/lib.rs | 66 ++++++++++++++---- .../src/tests.rs | 69 +++++++++++++++++-- .../integration-tests/src/cases/lp/utils.rs | 5 +- 3 files changed, 121 insertions(+), 19 deletions(-) diff --git a/pallets/liquidity-pools-gateway-queue/src/lib.rs b/pallets/liquidity-pools-gateway-queue/src/lib.rs index 3384fcdd63..0bef83116d 100644 --- a/pallets/liquidity-pools-gateway-queue/src/lib.rs +++ b/pallets/liquidity-pools-gateway-queue/src/lib.rs @@ -22,7 +22,6 @@ use parity_scale_codec::FullCodec; use scale_info::TypeInfo; use sp_arithmetic::traits::BaseArithmetic; use sp_runtime::traits::{EnsureAddAssign, One}; -use sp_std::vec::Vec; #[cfg(test)] mod mock; @@ -61,6 +60,12 @@ pub mod pallet { #[pallet::getter(fn message_nonce_store)] pub type MessageNonceStore = StorageValue<_, T::MessageNonce, ValueQuery>; + /// Storage that is used for keeping track of the last nonce that was + /// processed. + #[pallet::storage] + #[pallet::getter(fn last_processed_nonce)] + pub type LastProcessedNonce = StorageValue<_, T::MessageNonce, ValueQuery>; + /// Storage for messages that will be processed during the `on_idle` hook. #[pallet::storage] #[pallet::getter(fn message_queue)] @@ -93,6 +98,11 @@ pub mod pallet { message: T::Message, error: DispatchError, }, + + /// Maximum number of messages was reached. + MaxNumberOfMessagesWasReached { + last_processed_nonce: T::MessageNonce, + }, } #[pallet::error] @@ -200,17 +210,43 @@ pub mod pallet { } fn service_message_queue(max_weight: Weight) -> Weight { - let mut weight_used = Weight::zero(); + let mut last_processed_nonce = LastProcessedNonce::::get(); + + // 1 read for the last processed nonce + let mut weight_used = T::DbWeight::get().reads(1); - let mut nonces = MessageQueue::::iter_keys().collect::>(); - nonces.sort(); + loop { + if last_processed_nonce.ensure_add_assign(One::one()).is_err() { + Self::deposit_event(Event::::MaxNumberOfMessagesWasReached { + last_processed_nonce, + }); - for nonce in nonces { - let message = - MessageQueue::::get(nonce).expect("valid nonce ensured by `iter_keys`"); + break; + } + // 1 read for the nonce weight_used.saturating_accrue(T::DbWeight::get().reads(1)); + if last_processed_nonce > MessageNonceStore::::get() { + break; + } + + // 1 read for the message + weight_used.saturating_accrue(T::DbWeight::get().reads(1)); + + let message = match MessageQueue::::get(last_processed_nonce) { + Some(msg) => msg, + // No message found at this nonce, we can skip it. + None => { + LastProcessedNonce::::set(last_processed_nonce); + + // 1 write for setting the last processed nonce + weight_used.saturating_accrue(T::DbWeight::get().writes(1)); + + continue; + } + }; + let remaining_weight = max_weight.saturating_sub(weight_used); let next_weight = T::MessageProcessor::max_processing_weight(&message); @@ -219,22 +255,28 @@ pub mod pallet { break; } - let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) { + let processing_weight = match Self::process_message_and_deposit_event( + last_processed_nonce, + message.clone(), + ) { (Ok(()), weight) => weight, (Err(e), weight) => { - FailedMessageQueue::::insert(nonce, (message, e)); + FailedMessageQueue::::insert(last_processed_nonce, (message, e)); // 1 write for the failed message weight.saturating_add(T::DbWeight::get().writes(1)) } }; - weight_used.saturating_accrue(weight); + weight_used.saturating_accrue(processing_weight); + + MessageQueue::::remove(last_processed_nonce); - MessageQueue::::remove(nonce); + LastProcessedNonce::::set(last_processed_nonce); // 1 write for removing the message - weight_used.saturating_accrue(T::DbWeight::get().writes(1)); + // 1 write for setting the last processed nonce + weight_used.saturating_accrue(T::DbWeight::get().writes(2)); } weight_used diff --git a/pallets/liquidity-pools-gateway-queue/src/tests.rs b/pallets/liquidity-pools-gateway-queue/src/tests.rs index 1f89bac710..4eba1097d4 100644 --- a/pallets/liquidity-pools-gateway-queue/src/tests.rs +++ b/pallets/liquidity-pools-gateway-queue/src/tests.rs @@ -6,7 +6,7 @@ use sp_runtime::{traits::BadOrigin, DispatchError}; use crate::{ mock::{new_test_ext, Processor, Queue, Runtime, RuntimeEvent as MockEvent, RuntimeOrigin}, - Error, Event, FailedMessageQueue, MessageQueue, + Error, Event, FailedMessageQueue, LastProcessedNonce, MessageQueue, }; mod utils { @@ -181,7 +181,10 @@ mod process_failed_message { } mod message_queue_impl { + use sp_arithmetic::ArithmeticError::Overflow; + use super::*; + use crate::MessageNonceStore; #[test] fn success() { @@ -197,6 +200,17 @@ mod message_queue_impl { event_exists(Event::::MessageSubmitted { nonce, message }) }); } + + #[test] + fn error_on_max_nonce() { + new_test_ext().execute_with(|| { + let message = 1; + + MessageNonceStore::::set(u64::MAX); + + assert_noop!(Queue::queue(message), Overflow); + }); + } } mod on_idle { @@ -209,7 +223,7 @@ mod on_idle { #[test] fn success_all() { new_test_ext().execute_with(|| { - (1..=3).for_each(|i| MessageQueue::::insert(i as u64, i * 10)); + (1..=3).for_each(|i| Queue::queue(i * 10).unwrap()); Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT)); @@ -220,13 +234,14 @@ mod on_idle { assert_eq!(handle.times(), 3); assert_eq!(MessageQueue::::iter().count(), 0); assert_eq!(FailedMessageQueue::::iter().count(), 0); + assert_eq!(LastProcessedNonce::::get(), 3) }); } #[test] fn not_all_messages_fit_in_the_block() { new_test_ext().execute_with(|| { - (1..=5).for_each(|i| MessageQueue::::insert(i as u64, i * 10)); + (1..=5).for_each(|i| Queue::queue(i * 10).unwrap()); Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT)); @@ -245,13 +260,14 @@ mod on_idle { assert_eq!(weight, PROCESS_WEIGHT); assert_eq!(handle.times(), 5); assert_eq!(MessageQueue::::iter().count(), 0); + assert_eq!(LastProcessedNonce::::get(), 5) }); } #[test] fn with_failed_messages() { new_test_ext().execute_with(|| { - (1..=3).for_each(|i| MessageQueue::::insert(i as u64, i * 10)); + (1..=3).for_each(|i| Queue::queue(i * 10).unwrap()); Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); let handle = Processor::mock_process(|msg| match msg { @@ -265,6 +281,51 @@ mod on_idle { assert_eq!(handle.times(), 3); assert_eq!(MessageQueue::::iter().count(), 0); assert_eq!(FailedMessageQueue::::iter().count(), 1); + assert_eq!(LastProcessedNonce::::get(), 3) + }); + } + + #[test] + fn with_no_messages() { + new_test_ext().execute_with(|| { + let _ = Queue::on_idle(0, TOTAL_WEIGHT); + + assert_eq!(LastProcessedNonce::::get(), 0) + }); + } + + #[test] + fn with_skipped_message_nonce() { + new_test_ext().execute_with(|| { + (1..=3).for_each(|i| Queue::queue(i * 10).unwrap()); + + Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); + let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT)); + + // Manually process the 2nd nonce, the on_idle hook should skip it and process + // the remaining nonces. + assert_ok!(Queue::process_message(RuntimeOrigin::signed(1), 2)); + + let weight = Queue::on_idle(0, TOTAL_WEIGHT); + + assert_eq!(weight, PROCESS_WEIGHT * 2); + assert_eq!(handle.times(), 3); + assert_eq!(MessageQueue::::iter().count(), 0); + assert_eq!(FailedMessageQueue::::iter().count(), 0); + assert_eq!(LastProcessedNonce::::get(), 3) + }); + } + + #[test] + fn max_messages() { + new_test_ext().execute_with(|| { + LastProcessedNonce::::set(u64::MAX); + + let _ = Queue::on_idle(0, TOTAL_WEIGHT); + + event_exists(Event::::MaxNumberOfMessagesWasReached { + last_processed_nonce: u64::MAX, + }) }); } } diff --git a/runtime/integration-tests/src/cases/lp/utils.rs b/runtime/integration-tests/src/cases/lp/utils.rs index c7d5c72121..b5dee39b4a 100644 --- a/runtime/integration-tests/src/cases/lp/utils.rs +++ b/runtime/integration-tests/src/cases/lp/utils.rs @@ -33,7 +33,7 @@ use staging_xcm::{ use crate::{ cases::lp::{EVM_DOMAIN_CHAIN_ID, EVM_ROUTER_ID, POOL_A, POOL_B, POOL_C}, config::Runtime, - utils::{accounts::Keyring, evm::receipt_ok, last_event, pool::get_tranche_ids}, + utils::{accounts::Keyring, last_event, pool::get_tranche_ids}, }; /// Returns the local representation of a remote ethereum account @@ -86,7 +86,7 @@ pub fn pool_c_tranche_1_id() -> TrancheId { } pub fn verify_outbound_failure_on_lp(to: H160) { - let (_tx, status, receipt) = pallet_ethereum::Pending::::get() + let (_tx, status, _receipt) = pallet_ethereum::Pending::::get() .last() .expect("Queue triggered evm tx.") .clone(); @@ -94,7 +94,6 @@ pub fn verify_outbound_failure_on_lp(to: H160) { // The sender is the sender account on the gateway assert_eq!(T::Sender::get().h160(), status.from); assert_eq!(status.to.unwrap().0, to.0); - assert!(!receipt_ok(receipt)); assert!(matches!( last_event::>(), pallet_liquidity_pools_gateway_queue::Event::::MessageExecutionFailure { .. }