From 25e7ea5010c621ceb312046afd6699e1f756a742 Mon Sep 17 00:00:00 2001 From: Cosmin Damian <17934949+cdamian@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:20:53 +0300 Subject: [PATCH] Fix/gateway message processing (#1991) * lp-gateway: Update message recovery comment * lp-gateway: Use transactional in MessageProcessor implementation * lp-gateway: Iterate over inbound sub-messages only during execution * lp-gateway: Add session ID check for message entries * lp-gateway: Use defensive weight during message recovery * lp-gateway: Make MessageProcessor implementation transactional * lp-gateway-queue: Use when checking servicing weight * lp-gateway: Move import * lp-gateway-queue: Remove extra weight check * lp-gateway-queue: Get ordered MessageQueue keys and iterate over them * lp-gateway: Add session ID change tests * lp-gateway: Drop support for inbound batch messages * lp-gateway-queue: Ensure messages are processed in order (#1992) * lp-gateway-queue: Ensure messages are processed in order * lp-gateway-queue: Ensure processed messages are skipped * integration-tests: Remove receipt check * pallet: Rename event --- libs/traits/src/liquidity_pools.rs | 2 +- .../liquidity-pools-gateway-queue/src/lib.rs | 85 +++++-- .../src/tests.rs | 69 +++++- pallets/liquidity-pools-gateway/src/lib.rs | 61 ++--- .../src/message_processing.rs | 63 +++-- pallets/liquidity-pools-gateway/src/tests.rs | 225 ++++++++++++------ .../integration-tests/src/cases/lp/utils.rs | 5 +- 7 files changed, 338 insertions(+), 172 deletions(-) diff --git a/libs/traits/src/liquidity_pools.rs b/libs/traits/src/liquidity_pools.rs index 8d983d272b..f7e3eeba50 100644 --- a/libs/traits/src/liquidity_pools.rs +++ b/libs/traits/src/liquidity_pools.rs @@ -152,7 +152,7 @@ pub trait MessageProcessor { /// Process a message. fn process(msg: Self::Message) -> (DispatchResult, Weight); - /// Max weight that processing a message can take + /// Max weight that processing a message can take. fn max_processing_weight(msg: &Self::Message) -> Weight; } diff --git a/pallets/liquidity-pools-gateway-queue/src/lib.rs b/pallets/liquidity-pools-gateway-queue/src/lib.rs index ff3a45b7af..b454ce5a7b 100644 --- a/pallets/liquidity-pools-gateway-queue/src/lib.rs +++ b/pallets/liquidity-pools-gateway-queue/src/lib.rs @@ -22,7 +22,6 @@ use parity_scale_codec::FullCodec; use scale_info::TypeInfo; use sp_arithmetic::traits::BaseArithmetic; use sp_runtime::traits::{EnsureAddAssign, One}; -use sp_std::vec::Vec; #[cfg(test)] mod mock; @@ -61,6 +60,12 @@ pub mod pallet { #[pallet::getter(fn message_nonce_store)] pub type MessageNonceStore = StorageValue<_, T::MessageNonce, ValueQuery>; + /// Storage that is used for keeping track of the last nonce that was + /// processed. + #[pallet::storage] + #[pallet::getter(fn last_processed_nonce)] + pub type LastProcessedNonce = StorageValue<_, T::MessageNonce, ValueQuery>; + /// Storage for messages that will be processed during the `on_idle` hook. #[pallet::storage] #[pallet::getter(fn message_queue)] @@ -93,6 +98,11 @@ pub mod pallet { message: T::Message, error: DispatchError, }, + + /// Maximum number of messages was reached. + MaxNumberOfMessagesReached { + last_processed_nonce: T::MessageNonce, + }, } #[pallet::error] @@ -200,11 +210,43 @@ pub mod pallet { } fn service_message_queue(max_weight: Weight) -> Weight { - let mut weight_used = Weight::zero(); + let mut last_processed_nonce = LastProcessedNonce::::get(); + + // 1 read for the last processed nonce + let mut weight_used = T::DbWeight::get().reads(1); + + loop { + if last_processed_nonce.ensure_add_assign(One::one()).is_err() { + Self::deposit_event(Event::::MaxNumberOfMessagesReached { + last_processed_nonce, + }); + + break; + } - let mut processed_entries = Vec::new(); + // 1 read for the nonce + weight_used.saturating_accrue(T::DbWeight::get().reads(1)); + + if last_processed_nonce > MessageNonceStore::::get() { + break; + } + + // 1 read for the message + weight_used.saturating_accrue(T::DbWeight::get().reads(1)); + + let message = match MessageQueue::::get(last_processed_nonce) { + Some(msg) => msg, + // No message found at this nonce, we can skip it. + None => { + LastProcessedNonce::::set(last_processed_nonce); + + // 1 write for setting the last processed nonce + weight_used.saturating_accrue(T::DbWeight::get().writes(1)); + + continue; + } + }; - for (nonce, message) in MessageQueue::::iter() { let remaining_weight = max_weight.saturating_sub(weight_used); let next_weight = T::MessageProcessor::max_processing_weight(&message); @@ -213,37 +255,28 @@ pub mod pallet { break; } - let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) { - (Ok(()), weight) => { - // Extra weight breakdown: - // - // 1 read for the message - // 1 write for the message removal - weight.saturating_add(T::DbWeight::get().reads_writes(1, 1)) - } + let processing_weight = match Self::process_message_and_deposit_event( + last_processed_nonce, + message.clone(), + ) { + (Ok(()), weight) => weight, (Err(e), weight) => { - FailedMessageQueue::::insert(nonce, (message, e)); + FailedMessageQueue::::insert(last_processed_nonce, (message, e)); - // Extra weight breakdown: - // - // 1 read for the message // 1 write for the failed message - // 1 write for the message removal - weight.saturating_add(T::DbWeight::get().reads_writes(1, 2)) + weight.saturating_add(T::DbWeight::get().writes(1)) } }; - processed_entries.push(nonce); + weight_used.saturating_accrue(processing_weight); - weight_used = weight_used.saturating_add(weight); + MessageQueue::::remove(last_processed_nonce); - if weight_used.all_gte(max_weight) { - break; - } - } + LastProcessedNonce::::set(last_processed_nonce); - for entry in processed_entries { - MessageQueue::::remove(entry); + // 1 write for removing the message + // 1 write for setting the last processed nonce + weight_used.saturating_accrue(T::DbWeight::get().writes(2)); } weight_used diff --git a/pallets/liquidity-pools-gateway-queue/src/tests.rs b/pallets/liquidity-pools-gateway-queue/src/tests.rs index 1f89bac710..18e56ea0b7 100644 --- a/pallets/liquidity-pools-gateway-queue/src/tests.rs +++ b/pallets/liquidity-pools-gateway-queue/src/tests.rs @@ -6,7 +6,7 @@ use sp_runtime::{traits::BadOrigin, DispatchError}; use crate::{ mock::{new_test_ext, Processor, Queue, Runtime, RuntimeEvent as MockEvent, RuntimeOrigin}, - Error, Event, FailedMessageQueue, MessageQueue, + Error, Event, FailedMessageQueue, LastProcessedNonce, MessageQueue, }; mod utils { @@ -181,7 +181,10 @@ mod process_failed_message { } mod message_queue_impl { + use sp_arithmetic::ArithmeticError::Overflow; + use super::*; + use crate::MessageNonceStore; #[test] fn success() { @@ -197,6 +200,17 @@ mod message_queue_impl { event_exists(Event::::MessageSubmitted { nonce, message }) }); } + + #[test] + fn error_on_max_nonce() { + new_test_ext().execute_with(|| { + let message = 1; + + MessageNonceStore::::set(u64::MAX); + + assert_noop!(Queue::queue(message), Overflow); + }); + } } mod on_idle { @@ -209,7 +223,7 @@ mod on_idle { #[test] fn success_all() { new_test_ext().execute_with(|| { - (1..=3).for_each(|i| MessageQueue::::insert(i as u64, i * 10)); + (1..=3).for_each(|i| Queue::queue(i * 10).unwrap()); Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT)); @@ -220,13 +234,14 @@ mod on_idle { assert_eq!(handle.times(), 3); assert_eq!(MessageQueue::::iter().count(), 0); assert_eq!(FailedMessageQueue::::iter().count(), 0); + assert_eq!(LastProcessedNonce::::get(), 3) }); } #[test] fn not_all_messages_fit_in_the_block() { new_test_ext().execute_with(|| { - (1..=5).for_each(|i| MessageQueue::::insert(i as u64, i * 10)); + (1..=5).for_each(|i| Queue::queue(i * 10).unwrap()); Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT)); @@ -245,13 +260,14 @@ mod on_idle { assert_eq!(weight, PROCESS_WEIGHT); assert_eq!(handle.times(), 5); assert_eq!(MessageQueue::::iter().count(), 0); + assert_eq!(LastProcessedNonce::::get(), 5) }); } #[test] fn with_failed_messages() { new_test_ext().execute_with(|| { - (1..=3).for_each(|i| MessageQueue::::insert(i as u64, i * 10)); + (1..=3).for_each(|i| Queue::queue(i * 10).unwrap()); Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); let handle = Processor::mock_process(|msg| match msg { @@ -265,6 +281,51 @@ mod on_idle { assert_eq!(handle.times(), 3); assert_eq!(MessageQueue::::iter().count(), 0); assert_eq!(FailedMessageQueue::::iter().count(), 1); + assert_eq!(LastProcessedNonce::::get(), 3) + }); + } + + #[test] + fn with_no_messages() { + new_test_ext().execute_with(|| { + let _ = Queue::on_idle(0, TOTAL_WEIGHT); + + assert_eq!(LastProcessedNonce::::get(), 0) + }); + } + + #[test] + fn with_skipped_message_nonce() { + new_test_ext().execute_with(|| { + (1..=3).for_each(|i| Queue::queue(i * 10).unwrap()); + + Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT); + let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT)); + + // Manually process the 2nd nonce, the on_idle hook should skip it and process + // the remaining nonces. + assert_ok!(Queue::process_message(RuntimeOrigin::signed(1), 2)); + + let weight = Queue::on_idle(0, TOTAL_WEIGHT); + + assert_eq!(weight, PROCESS_WEIGHT * 2); + assert_eq!(handle.times(), 3); + assert_eq!(MessageQueue::::iter().count(), 0); + assert_eq!(FailedMessageQueue::::iter().count(), 0); + assert_eq!(LastProcessedNonce::::get(), 3) + }); + } + + #[test] + fn max_messages() { + new_test_ext().execute_with(|| { + LastProcessedNonce::::set(u64::MAX); + + let _ = Queue::on_idle(0, TOTAL_WEIGHT); + + event_exists(Event::::MaxNumberOfMessagesReached { + last_processed_nonce: u64::MAX, + }) }); } } diff --git a/pallets/liquidity-pools-gateway/src/lib.rs b/pallets/liquidity-pools-gateway/src/lib.rs index acd59b023e..b341e7e1e6 100644 --- a/pallets/liquidity-pools-gateway/src/lib.rs +++ b/pallets/liquidity-pools-gateway/src/lib.rs @@ -35,14 +35,17 @@ use cfg_traits::liquidity_pools::{ OutboundMessageHandler, RouterProvider, }; use cfg_types::domain_address::{Domain, DomainAddress}; -use frame_support::{dispatch::DispatchResult, pallet_prelude::*}; +use frame_support::{ + dispatch::DispatchResult, + pallet_prelude::*, + storage::{with_transaction, TransactionOutcome}, +}; use frame_system::pallet_prelude::{ensure_signed, OriginFor}; use message::GatewayMessage; use orml_traits::GetByKey; pub use pallet::*; use parity_scale_codec::FullCodec; use sp_arithmetic::traits::{BaseArithmetic, EnsureAddAssign, One}; -use sp_runtime::SaturatedConversion; use sp_std::convert::TryInto; use crate::{ @@ -469,8 +472,8 @@ pub mod pallet { router_ids.iter().any(|x| x == &router_id), Error::::UnknownRouter ); - // Message recovery shouldn't be supported for setups that have less than 1 - // router since no proofs are required in that case. + // Message recovery shouldn't be supported for setups that have less than 2 + // routers since no proofs are required in that case. ensure!(router_ids.len() > 1, Error::::NotEnoughRoutersForDomain); let session_id = SessionIdStore::::get(); @@ -621,45 +624,33 @@ pub mod pallet { type Message = GatewayMessage; fn process(msg: Self::Message) -> (DispatchResult, Weight) { - match msg { - GatewayMessage::Inbound { - domain_address, - message, - router_id, - } => { - let mut counter = 0; - - let res = Self::process_inbound_message( + // The #[transactional] macro only works for functions that return a + // `DispatchResult` therefore, we need to manually add this here. + let res = with_transaction(|| { + let res = match msg { + GatewayMessage::Inbound { domain_address, message, router_id, - &mut counter, - ); - - let weight = match counter { - 0 => LP_DEFENSIVE_WEIGHT / 10, - n => LP_DEFENSIVE_WEIGHT.saturating_mul(n), - }; + } => Self::process_inbound_message(domain_address, message, router_id), + GatewayMessage::Outbound { message, router_id } => { + T::MessageSender::send(router_id, T::Sender::get(), message) + } + }; - (res, weight) + if res.is_ok() { + TransactionOutcome::Commit(res) + } else { + TransactionOutcome::Rollback(res) } - GatewayMessage::Outbound { message, router_id } => { - let res = T::MessageSender::send(router_id, T::Sender::get(), message); + }); - (res, LP_DEFENSIVE_WEIGHT) - } - } + (res, LP_DEFENSIVE_WEIGHT) } - /// Returns the max processing weight for a message, based on its - /// direction. - fn max_processing_weight(msg: &Self::Message) -> Weight { - match msg { - GatewayMessage::Inbound { message, .. } => { - LP_DEFENSIVE_WEIGHT.saturating_mul(message.submessages().len().saturated_into()) - } - GatewayMessage::Outbound { .. } => LP_DEFENSIVE_WEIGHT, - } + /// Returns the maximum weight for processing one message. + fn max_processing_weight(_: &Self::Message) -> Weight { + LP_DEFENSIVE_WEIGHT } } diff --git a/pallets/liquidity-pools-gateway/src/message_processing.rs b/pallets/liquidity-pools-gateway/src/message_processing.rs index 5d681f41bd..2e71f9e103 100644 --- a/pallets/liquidity-pools-gateway/src/message_processing.rs +++ b/pallets/liquidity-pools-gateway/src/message_processing.rs @@ -1,6 +1,5 @@ use cfg_traits::liquidity_pools::{ - InboundMessageHandler, LpMessageBatch, LpMessageHash, LpMessageProof, MessageHash, - MessageQueue, RouterProvider, + InboundMessageHandler, LpMessageHash, LpMessageProof, MessageHash, MessageQueue, RouterProvider, }; use cfg_types::domain_address::{Domain, DomainAddress}; use frame_support::{ @@ -333,7 +332,11 @@ impl Pallet { // we can return. None => return Ok(()), Some(stored_inbound_entry) => match stored_inbound_entry { - InboundEntry::Message(message_entry) => message = Some(message_entry.message), + InboundEntry::Message(message_entry) + if message_entry.session_id == session_id => + { + message = Some(message_entry.message) + } InboundEntry::Proof(proof_entry) if proof_entry.has_valid_vote_for_session(session_id) => { @@ -349,10 +352,10 @@ impl Pallet { } if let Some(msg) = message { - Self::execute_post_voting_dispatch(message_hash, router_ids, expected_proof_count)?; - T::InboundMessageHandler::handle(domain_address.clone(), msg)?; + Self::execute_post_voting_dispatch(message_hash, router_ids, expected_proof_count)?; + Self::deposit_event(Event::::InboundMessageExecuted { domain_address, message_hash, @@ -401,42 +404,36 @@ impl Pallet { domain_address: DomainAddress, message: T::Message, router_id: T::RouterId, - counter: &mut u64, ) -> DispatchResult { let router_ids = Self::get_router_ids_for_domain(domain_address.domain())?; let session_id = SessionIdStore::::get(); let expected_proof_count = Self::get_expected_proof_count(&router_ids)?; + let message_hash = message.get_message_hash(); + let inbound_entry: InboundEntry = InboundEntry::create( + message.clone(), + session_id, + domain_address.clone(), + expected_proof_count, + ); - for submessage in message.submessages() { - counter.ensure_add_assign(1)?; + inbound_entry.validate(&router_ids, &router_id.clone())?; - let message_hash = submessage.get_message_hash(); + Self::upsert_pending_entry(message_hash, &router_id, inbound_entry)?; - let inbound_entry: InboundEntry = InboundEntry::create( - submessage.clone(), - session_id, - domain_address.clone(), - expected_proof_count, - ); - - inbound_entry.validate(&router_ids, &router_id.clone())?; - Self::upsert_pending_entry(message_hash, &router_id, inbound_entry)?; - - Self::deposit_processing_event( - domain_address.clone(), - submessage, - message_hash, - router_id.clone(), - ); + Self::deposit_processing_event( + domain_address.clone(), + message, + message_hash, + router_id.clone(), + ); - Self::execute_if_requirements_are_met( - message_hash, - &router_ids, - session_id, - expected_proof_count, - domain_address.clone(), - )?; - } + Self::execute_if_requirements_are_met( + message_hash, + &router_ids, + session_id, + expected_proof_count, + domain_address.clone(), + )?; Ok(()) } diff --git a/pallets/liquidity-pools-gateway/src/tests.rs b/pallets/liquidity-pools-gateway/src/tests.rs index 4c24b5d31b..59597298c1 100644 --- a/pallets/liquidity-pools-gateway/src/tests.rs +++ b/pallets/liquidity-pools-gateway/src/tests.rs @@ -1,9 +1,7 @@ use std::collections::HashMap; use cfg_primitives::LP_DEFENSIVE_WEIGHT; -use cfg_traits::liquidity_pools::{ - LpMessageHash, LpMessageSerializer, MessageProcessor, OutboundMessageHandler, -}; +use cfg_traits::liquidity_pools::{LpMessageHash, MessageProcessor, OutboundMessageHandler}; use cfg_types::domain_address::*; use frame_support::{assert_err, assert_noop, assert_ok}; use itertools::Itertools; @@ -14,10 +12,6 @@ use sp_runtime::{ DispatchError, DispatchError::{Arithmetic, BadOrigin}, }; -use sp_std::sync::{ - atomic::{AtomicU32, Ordering}, - Arc, -}; use super::{ mock::{RuntimeEvent as MockEvent, *}, @@ -451,68 +445,6 @@ mod extrinsics { ); }); } - - #[test] - fn process_inbound() { - new_test_ext().execute_with(|| { - let address = H160::from_slice(&get_test_account_id().as_slice()[..20]); - let domain_address = DomainAddress::Evm(TEST_EVM_CHAIN, address); - - let router_id_1 = ROUTER_ID_1; - - Routers::::set(BoundedVec::try_from(vec![router_id_1]).unwrap()); - SessionIdStore::::set(1); - - let handler = MockLiquidityPools::mock_handle(|_, _| Ok(())); - - let submessage_count = 5; - - let (result, weight) = LiquidityPoolsGateway::process(GatewayMessage::Inbound { - domain_address, - message: Message::deserialize(&(1..=submessage_count).collect::>()) - .unwrap(), - router_id: ROUTER_ID_1, - }); - - let expected_weight = LP_DEFENSIVE_WEIGHT.saturating_mul(submessage_count.into()); - - assert_ok!(result); - assert_eq!(weight, expected_weight); - assert_eq!(handler.times(), submessage_count as u32); - }); - } - - #[test] - fn process_inbound_with_errors() { - new_test_ext().execute_with(|| { - let address = H160::from_slice(&get_test_account_id().as_slice()[..20]); - let domain_address = DomainAddress::Evm(1, address); - - let router_id_1 = ROUTER_ID_1; - - Routers::::set(BoundedVec::try_from(vec![router_id_1]).unwrap()); - SessionIdStore::::set(1); - - let counter = Arc::new(AtomicU32::new(0)); - - let handler = MockLiquidityPools::mock_handle(move |_, _| { - match counter.fetch_add(1, Ordering::Relaxed) { - 2 => Err(DispatchError::Unavailable), - _ => Ok(()), - } - }); - - let (result, _) = LiquidityPoolsGateway::process(GatewayMessage::Inbound { - domain_address, - message: Message::deserialize(&(1..=5).collect::>()).unwrap(), - router_id: ROUTER_ID_1, - }); - - assert_err!(result, DispatchError::Unavailable); - // 2 correct messages and 1 failed message processed. - assert_eq!(handler.times(), 3); - }); - } } mod execute_message_recovery { @@ -1108,7 +1040,6 @@ mod implementations { mod inbound { use super::*; - #[macro_use] mod util { use super::*; @@ -2105,6 +2036,160 @@ mod implementations { assert_noop!(res, Error::::ProofNotExpectedFromFirstRouter); }); } + + #[test] + fn storage_rollback_on_failure() { + new_test_ext().execute_with(|| { + Routers::::set( + BoundedVec::<_, _>::try_from(vec![ROUTER_ID_1, ROUTER_ID_2]) + .unwrap(), + ); + SessionIdStore::::set(1); + + let err = DispatchError::Unavailable; + + MockLiquidityPools::mock_handle(move |_, _| Err(err)); + + let (res, _) = + LiquidityPoolsGateway::process(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Simple, + router_id: ROUTER_ID_1, + }); + assert_ok!(res); + assert!(PendingInboundEntries::::get( + MESSAGE_HASH, + ROUTER_ID_1 + ) + .is_some()); + + let (res, _) = + LiquidityPoolsGateway::process(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Proof(MESSAGE_HASH), + router_id: ROUTER_ID_2, + }); + assert_noop!(res, err); + assert!(PendingInboundEntries::::get( + MESSAGE_HASH, + ROUTER_ID_1 + ) + .is_some()); + assert!(PendingInboundEntries::::get( + MESSAGE_HASH, + ROUTER_ID_2 + ) + .is_none()); + }); + } + } + + mod session_id_change { + use super::*; + + #[derive(Debug)] + enum TestAction { + SetRouters(Vec), + ProcessMessage(GatewayMessage), + } + + #[test] + fn no_execution_after_session_change() { + let tests = vec![ + vec![ + TestAction::SetRouters(vec![ROUTER_ID_1, ROUTER_ID_2]), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Simple, + router_id: ROUTER_ID_1, + }), + TestAction::SetRouters(vec![ROUTER_ID_1, ROUTER_ID_2]), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Proof(MESSAGE_HASH), + router_id: ROUTER_ID_2, + }), + ], + vec![ + TestAction::SetRouters(vec![ROUTER_ID_1, ROUTER_ID_2]), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Proof(MESSAGE_HASH), + router_id: ROUTER_ID_2, + }), + TestAction::SetRouters(vec![ROUTER_ID_1, ROUTER_ID_2]), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Simple, + router_id: ROUTER_ID_1, + }), + ], + vec![ + TestAction::SetRouters(vec![ROUTER_ID_1, ROUTER_ID_2]), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Simple, + router_id: ROUTER_ID_1, + }), + TestAction::SetRouters(vec![ROUTER_ID_1, ROUTER_ID_2, ROUTER_ID_3]), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Proof(MESSAGE_HASH), + router_id: ROUTER_ID_2, + }), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Proof(MESSAGE_HASH), + router_id: ROUTER_ID_3, + }), + ], + vec![ + TestAction::SetRouters(vec![ROUTER_ID_1, ROUTER_ID_2]), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Proof(MESSAGE_HASH), + router_id: ROUTER_ID_2, + }), + TestAction::SetRouters(vec![ROUTER_ID_1, ROUTER_ID_2, ROUTER_ID_3]), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Simple, + router_id: ROUTER_ID_1, + }), + TestAction::ProcessMessage(GatewayMessage::Inbound { + domain_address: TEST_DOMAIN_ADDRESS, + message: Message::Proof(MESSAGE_HASH), + router_id: ROUTER_ID_3, + }), + ], + ]; + + for test in tests { + println!("Executing session id change test for - {:?}", test); + + new_test_ext().execute_with(|| { + for action in test { + let mock_handler = + MockLiquidityPools::mock_handle(move |_, _| Ok(())); + + match action { + TestAction::SetRouters(routers) => { + assert_ok!(LiquidityPoolsGateway::set_routers( + RuntimeOrigin::root(), + BoundedVec::<_, _>::try_from(routers).unwrap(), + )); + } + TestAction::ProcessMessage(message) => { + let (res, _) = LiquidityPoolsGateway::process(message); + assert_ok!(res); + } + } + + assert_eq!(mock_handler.times(), 0) + } + }); + } + } } } diff --git a/runtime/integration-tests/src/cases/lp/utils.rs b/runtime/integration-tests/src/cases/lp/utils.rs index c7d5c72121..b5dee39b4a 100644 --- a/runtime/integration-tests/src/cases/lp/utils.rs +++ b/runtime/integration-tests/src/cases/lp/utils.rs @@ -33,7 +33,7 @@ use staging_xcm::{ use crate::{ cases::lp::{EVM_DOMAIN_CHAIN_ID, EVM_ROUTER_ID, POOL_A, POOL_B, POOL_C}, config::Runtime, - utils::{accounts::Keyring, evm::receipt_ok, last_event, pool::get_tranche_ids}, + utils::{accounts::Keyring, last_event, pool::get_tranche_ids}, }; /// Returns the local representation of a remote ethereum account @@ -86,7 +86,7 @@ pub fn pool_c_tranche_1_id() -> TrancheId { } pub fn verify_outbound_failure_on_lp(to: H160) { - let (_tx, status, receipt) = pallet_ethereum::Pending::::get() + let (_tx, status, _receipt) = pallet_ethereum::Pending::::get() .last() .expect("Queue triggered evm tx.") .clone(); @@ -94,7 +94,6 @@ pub fn verify_outbound_failure_on_lp(to: H160) { // The sender is the sender account on the gateway assert_eq!(T::Sender::get().h160(), status.from); assert_eq!(status.to.unwrap().0, to.0); - assert!(!receipt_ok(receipt)); assert!(matches!( last_event::>(), pallet_liquidity_pools_gateway_queue::Event::::MessageExecutionFailure { .. }