Skip to content

Commit

Permalink
lp-gateway-queue: Ensure messages are processed in order (#1992)
Browse files Browse the repository at this point in the history
* lp-gateway-queue: Ensure messages are processed in order

* lp-gateway-queue: Ensure processed messages are skipped

* integration-tests: Remove receipt check
  • Loading branch information
cdamian authored Oct 7, 2024
1 parent ef843b2 commit a437056
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 19 deletions.
66 changes: 54 additions & 12 deletions pallets/liquidity-pools-gateway-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use parity_scale_codec::FullCodec;
use scale_info::TypeInfo;
use sp_arithmetic::traits::BaseArithmetic;
use sp_runtime::traits::{EnsureAddAssign, One};
use sp_std::vec::Vec;

#[cfg(test)]
mod mock;
Expand Down Expand Up @@ -61,6 +60,12 @@ pub mod pallet {
#[pallet::getter(fn message_nonce_store)]
pub type MessageNonceStore<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage that is used for keeping track of the last nonce that was
/// processed.
#[pallet::storage]
#[pallet::getter(fn last_processed_nonce)]
pub type LastProcessedNonce<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage for messages that will be processed during the `on_idle` hook.
#[pallet::storage]
#[pallet::getter(fn message_queue)]
Expand Down Expand Up @@ -93,6 +98,11 @@ pub mod pallet {
message: T::Message,
error: DispatchError,
},

/// Maximum number of messages was reached.
MaxNumberOfMessagesWasReached {
last_processed_nonce: T::MessageNonce,
},
}

#[pallet::error]
Expand Down Expand Up @@ -200,17 +210,43 @@ pub mod pallet {
}

fn service_message_queue(max_weight: Weight) -> Weight {
let mut weight_used = Weight::zero();
let mut last_processed_nonce = LastProcessedNonce::<T>::get();

// 1 read for the last processed nonce
let mut weight_used = T::DbWeight::get().reads(1);

let mut nonces = MessageQueue::<T>::iter_keys().collect::<Vec<_>>();
nonces.sort();
loop {
if last_processed_nonce.ensure_add_assign(One::one()).is_err() {
Self::deposit_event(Event::<T>::MaxNumberOfMessagesWasReached {
last_processed_nonce,
});

for nonce in nonces {
let message =
MessageQueue::<T>::get(nonce).expect("valid nonce ensured by `iter_keys`");
break;
}

// 1 read for the nonce
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

if last_processed_nonce > MessageNonceStore::<T>::get() {
break;
}

// 1 read for the message
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

let message = match MessageQueue::<T>::get(last_processed_nonce) {
Some(msg) => msg,
// No message found at this nonce, we can skip it.
None => {
LastProcessedNonce::<T>::set(last_processed_nonce);

// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(1));

continue;
}
};

let remaining_weight = max_weight.saturating_sub(weight_used);
let next_weight = T::MessageProcessor::max_processing_weight(&message);

Expand All @@ -219,22 +255,28 @@ pub mod pallet {
break;
}

let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) {
let processing_weight = match Self::process_message_and_deposit_event(
last_processed_nonce,
message.clone(),
) {
(Ok(()), weight) => weight,
(Err(e), weight) => {
FailedMessageQueue::<T>::insert(nonce, (message, e));
FailedMessageQueue::<T>::insert(last_processed_nonce, (message, e));

// 1 write for the failed message
weight.saturating_add(T::DbWeight::get().writes(1))
}
};

weight_used.saturating_accrue(weight);
weight_used.saturating_accrue(processing_weight);

MessageQueue::<T>::remove(last_processed_nonce);

MessageQueue::<T>::remove(nonce);
LastProcessedNonce::<T>::set(last_processed_nonce);

// 1 write for removing the message
weight_used.saturating_accrue(T::DbWeight::get().writes(1));
// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(2));
}

weight_used
Expand Down
69 changes: 65 additions & 4 deletions pallets/liquidity-pools-gateway-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sp_runtime::{traits::BadOrigin, DispatchError};

use crate::{
mock::{new_test_ext, Processor, Queue, Runtime, RuntimeEvent as MockEvent, RuntimeOrigin},
Error, Event, FailedMessageQueue, MessageQueue,
Error, Event, FailedMessageQueue, LastProcessedNonce, MessageQueue,
};

mod utils {
Expand Down Expand Up @@ -181,7 +181,10 @@ mod process_failed_message {
}

mod message_queue_impl {
use sp_arithmetic::ArithmeticError::Overflow;

use super::*;
use crate::MessageNonceStore;

#[test]
fn success() {
Expand All @@ -197,6 +200,17 @@ mod message_queue_impl {
event_exists(Event::<Runtime>::MessageSubmitted { nonce, message })
});
}

#[test]
fn error_on_max_nonce() {
new_test_ext().execute_with(|| {
let message = 1;

MessageNonceStore::<Runtime>::set(u64::MAX);

assert_noop!(Queue::queue(message), Overflow);
});
}
}

mod on_idle {
Expand All @@ -209,7 +223,7 @@ mod on_idle {
#[test]
fn success_all() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -220,13 +234,14 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn not_all_messages_fit_in_the_block() {
new_test_ext().execute_with(|| {
(1..=5).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=5).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -245,13 +260,14 @@ mod on_idle {
assert_eq!(weight, PROCESS_WEIGHT);
assert_eq!(handle.times(), 5);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 5)
});
}

#[test]
fn with_failed_messages() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|msg| match msg {
Expand All @@ -265,6 +281,51 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 1);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn with_no_messages() {
new_test_ext().execute_with(|| {
let _ = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(LastProcessedNonce::<Runtime>::get(), 0)
});
}

#[test]
fn with_skipped_message_nonce() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));

// Manually process the 2nd nonce, the on_idle hook should skip it and process
// the remaining nonces.
assert_ok!(Queue::process_message(RuntimeOrigin::signed(1), 2));

let weight = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(weight, PROCESS_WEIGHT * 2);
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn max_messages() {
new_test_ext().execute_with(|| {
LastProcessedNonce::<Runtime>::set(u64::MAX);

let _ = Queue::on_idle(0, TOTAL_WEIGHT);

event_exists(Event::<Runtime>::MaxNumberOfMessagesWasReached {
last_processed_nonce: u64::MAX,
})
});
}
}
5 changes: 2 additions & 3 deletions runtime/integration-tests/src/cases/lp/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use staging_xcm::{
use crate::{
cases::lp::{EVM_DOMAIN_CHAIN_ID, EVM_ROUTER_ID, POOL_A, POOL_B, POOL_C},
config::Runtime,
utils::{accounts::Keyring, evm::receipt_ok, last_event, pool::get_tranche_ids},
utils::{accounts::Keyring, last_event, pool::get_tranche_ids},
};

/// Returns the local representation of a remote ethereum account
Expand Down Expand Up @@ -86,15 +86,14 @@ pub fn pool_c_tranche_1_id<T: Runtime>() -> TrancheId {
}

pub fn verify_outbound_failure_on_lp<T: Runtime>(to: H160) {
let (_tx, status, receipt) = pallet_ethereum::Pending::<T>::get()
let (_tx, status, _receipt) = pallet_ethereum::Pending::<T>::get()
.last()
.expect("Queue triggered evm tx.")
.clone();

// The sender is the sender account on the gateway
assert_eq!(T::Sender::get().h160(), status.from);
assert_eq!(status.to.unwrap().0, to.0);
assert!(!receipt_ok(receipt));
assert!(matches!(
last_event::<T, pallet_liquidity_pools_gateway_queue::Event::<T>>(),
pallet_liquidity_pools_gateway_queue::Event::<T>::MessageExecutionFailure { .. }
Expand Down

0 comments on commit a437056

Please sign in to comment.