Skip to content

Commit

Permalink
Fix/gateway message processing (#1991)
Browse files Browse the repository at this point in the history
* lp-gateway: Update message recovery comment

* lp-gateway: Use transactional in MessageProcessor implementation

* lp-gateway: Iterate over inbound sub-messages only during execution

* lp-gateway: Add session ID check for message entries

* lp-gateway: Use defensive weight during message recovery

* lp-gateway: Make MessageProcessor implementation transactional

* lp-gateway-queue: Use  when checking servicing weight

* lp-gateway: Move import

* lp-gateway-queue: Remove extra weight check

* lp-gateway-queue: Get ordered MessageQueue keys and iterate over them

* lp-gateway: Add session ID change tests

* lp-gateway: Drop support for inbound batch messages

* lp-gateway-queue: Ensure messages are processed in order (#1992)

* lp-gateway-queue: Ensure messages are processed in order

* lp-gateway-queue: Ensure processed messages are skipped

* integration-tests: Remove receipt check

* pallet: Rename event
  • Loading branch information
cdamian authored Oct 9, 2024
1 parent 927af06 commit 25e7ea5
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 172 deletions.
2 changes: 1 addition & 1 deletion libs/traits/src/liquidity_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pub trait MessageProcessor {
/// Process a message.
fn process(msg: Self::Message) -> (DispatchResult, Weight);

/// Max weight that processing a message can take
/// Max weight that processing a message can take.
fn max_processing_weight(msg: &Self::Message) -> Weight;
}

Expand Down
85 changes: 59 additions & 26 deletions pallets/liquidity-pools-gateway-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use parity_scale_codec::FullCodec;
use scale_info::TypeInfo;
use sp_arithmetic::traits::BaseArithmetic;
use sp_runtime::traits::{EnsureAddAssign, One};
use sp_std::vec::Vec;

#[cfg(test)]
mod mock;
Expand Down Expand Up @@ -61,6 +60,12 @@ pub mod pallet {
#[pallet::getter(fn message_nonce_store)]
pub type MessageNonceStore<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage that is used for keeping track of the last nonce that was
/// processed.
#[pallet::storage]
#[pallet::getter(fn last_processed_nonce)]
pub type LastProcessedNonce<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage for messages that will be processed during the `on_idle` hook.
#[pallet::storage]
#[pallet::getter(fn message_queue)]
Expand Down Expand Up @@ -93,6 +98,11 @@ pub mod pallet {
message: T::Message,
error: DispatchError,
},

/// Maximum number of messages was reached.
MaxNumberOfMessagesReached {
last_processed_nonce: T::MessageNonce,
},
}

#[pallet::error]
Expand Down Expand Up @@ -200,11 +210,43 @@ pub mod pallet {
}

fn service_message_queue(max_weight: Weight) -> Weight {
let mut weight_used = Weight::zero();
let mut last_processed_nonce = LastProcessedNonce::<T>::get();

// 1 read for the last processed nonce
let mut weight_used = T::DbWeight::get().reads(1);

loop {
if last_processed_nonce.ensure_add_assign(One::one()).is_err() {
Self::deposit_event(Event::<T>::MaxNumberOfMessagesReached {
last_processed_nonce,
});

break;
}

let mut processed_entries = Vec::new();
// 1 read for the nonce
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

if last_processed_nonce > MessageNonceStore::<T>::get() {
break;
}

// 1 read for the message
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

let message = match MessageQueue::<T>::get(last_processed_nonce) {
Some(msg) => msg,
// No message found at this nonce, we can skip it.
None => {
LastProcessedNonce::<T>::set(last_processed_nonce);

// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(1));

continue;
}
};

for (nonce, message) in MessageQueue::<T>::iter() {
let remaining_weight = max_weight.saturating_sub(weight_used);
let next_weight = T::MessageProcessor::max_processing_weight(&message);

Expand All @@ -213,37 +255,28 @@ pub mod pallet {
break;
}

let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) {
(Ok(()), weight) => {
// Extra weight breakdown:
//
// 1 read for the message
// 1 write for the message removal
weight.saturating_add(T::DbWeight::get().reads_writes(1, 1))
}
let processing_weight = match Self::process_message_and_deposit_event(
last_processed_nonce,
message.clone(),
) {
(Ok(()), weight) => weight,
(Err(e), weight) => {
FailedMessageQueue::<T>::insert(nonce, (message, e));
FailedMessageQueue::<T>::insert(last_processed_nonce, (message, e));

// Extra weight breakdown:
//
// 1 read for the message
// 1 write for the failed message
// 1 write for the message removal
weight.saturating_add(T::DbWeight::get().reads_writes(1, 2))
weight.saturating_add(T::DbWeight::get().writes(1))
}
};

processed_entries.push(nonce);
weight_used.saturating_accrue(processing_weight);

weight_used = weight_used.saturating_add(weight);
MessageQueue::<T>::remove(last_processed_nonce);

if weight_used.all_gte(max_weight) {
break;
}
}
LastProcessedNonce::<T>::set(last_processed_nonce);

for entry in processed_entries {
MessageQueue::<T>::remove(entry);
// 1 write for removing the message
// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(2));
}

weight_used
Expand Down
69 changes: 65 additions & 4 deletions pallets/liquidity-pools-gateway-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sp_runtime::{traits::BadOrigin, DispatchError};

use crate::{
mock::{new_test_ext, Processor, Queue, Runtime, RuntimeEvent as MockEvent, RuntimeOrigin},
Error, Event, FailedMessageQueue, MessageQueue,
Error, Event, FailedMessageQueue, LastProcessedNonce, MessageQueue,
};

mod utils {
Expand Down Expand Up @@ -181,7 +181,10 @@ mod process_failed_message {
}

mod message_queue_impl {
use sp_arithmetic::ArithmeticError::Overflow;

use super::*;
use crate::MessageNonceStore;

#[test]
fn success() {
Expand All @@ -197,6 +200,17 @@ mod message_queue_impl {
event_exists(Event::<Runtime>::MessageSubmitted { nonce, message })
});
}

#[test]
fn error_on_max_nonce() {
new_test_ext().execute_with(|| {
let message = 1;

MessageNonceStore::<Runtime>::set(u64::MAX);

assert_noop!(Queue::queue(message), Overflow);
});
}
}

mod on_idle {
Expand All @@ -209,7 +223,7 @@ mod on_idle {
#[test]
fn success_all() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -220,13 +234,14 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn not_all_messages_fit_in_the_block() {
new_test_ext().execute_with(|| {
(1..=5).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=5).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -245,13 +260,14 @@ mod on_idle {
assert_eq!(weight, PROCESS_WEIGHT);
assert_eq!(handle.times(), 5);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 5)
});
}

#[test]
fn with_failed_messages() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|msg| match msg {
Expand All @@ -265,6 +281,51 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 1);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn with_no_messages() {
new_test_ext().execute_with(|| {
let _ = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(LastProcessedNonce::<Runtime>::get(), 0)
});
}

#[test]
fn with_skipped_message_nonce() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));

// Manually process the 2nd nonce, the on_idle hook should skip it and process
// the remaining nonces.
assert_ok!(Queue::process_message(RuntimeOrigin::signed(1), 2));

let weight = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(weight, PROCESS_WEIGHT * 2);
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn max_messages() {
new_test_ext().execute_with(|| {
LastProcessedNonce::<Runtime>::set(u64::MAX);

let _ = Queue::on_idle(0, TOTAL_WEIGHT);

event_exists(Event::<Runtime>::MaxNumberOfMessagesReached {
last_processed_nonce: u64::MAX,
})
});
}
}
61 changes: 26 additions & 35 deletions pallets/liquidity-pools-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ use cfg_traits::liquidity_pools::{
OutboundMessageHandler, RouterProvider,
};
use cfg_types::domain_address::{Domain, DomainAddress};
use frame_support::{dispatch::DispatchResult, pallet_prelude::*};
use frame_support::{
dispatch::DispatchResult,
pallet_prelude::*,
storage::{with_transaction, TransactionOutcome},
};
use frame_system::pallet_prelude::{ensure_signed, OriginFor};
use message::GatewayMessage;
use orml_traits::GetByKey;
pub use pallet::*;
use parity_scale_codec::FullCodec;
use sp_arithmetic::traits::{BaseArithmetic, EnsureAddAssign, One};
use sp_runtime::SaturatedConversion;
use sp_std::convert::TryInto;

use crate::{
Expand Down Expand Up @@ -469,8 +472,8 @@ pub mod pallet {
router_ids.iter().any(|x| x == &router_id),
Error::<T>::UnknownRouter
);
// Message recovery shouldn't be supported for setups that have less than 1
// router since no proofs are required in that case.
// Message recovery shouldn't be supported for setups that have less than 2
// routers since no proofs are required in that case.
ensure!(router_ids.len() > 1, Error::<T>::NotEnoughRoutersForDomain);

let session_id = SessionIdStore::<T>::get();
Expand Down Expand Up @@ -621,45 +624,33 @@ pub mod pallet {
type Message = GatewayMessage<T::Message, T::RouterId>;

fn process(msg: Self::Message) -> (DispatchResult, Weight) {
match msg {
GatewayMessage::Inbound {
domain_address,
message,
router_id,
} => {
let mut counter = 0;

let res = Self::process_inbound_message(
// The #[transactional] macro only works for functions that return a
// `DispatchResult` therefore, we need to manually add this here.
let res = with_transaction(|| {
let res = match msg {
GatewayMessage::Inbound {
domain_address,
message,
router_id,
&mut counter,
);

let weight = match counter {
0 => LP_DEFENSIVE_WEIGHT / 10,
n => LP_DEFENSIVE_WEIGHT.saturating_mul(n),
};
} => Self::process_inbound_message(domain_address, message, router_id),
GatewayMessage::Outbound { message, router_id } => {
T::MessageSender::send(router_id, T::Sender::get(), message)
}
};

(res, weight)
if res.is_ok() {
TransactionOutcome::Commit(res)
} else {
TransactionOutcome::Rollback(res)
}
GatewayMessage::Outbound { message, router_id } => {
let res = T::MessageSender::send(router_id, T::Sender::get(), message);
});

(res, LP_DEFENSIVE_WEIGHT)
}
}
(res, LP_DEFENSIVE_WEIGHT)
}

/// Returns the max processing weight for a message, based on its
/// direction.
fn max_processing_weight(msg: &Self::Message) -> Weight {
match msg {
GatewayMessage::Inbound { message, .. } => {
LP_DEFENSIVE_WEIGHT.saturating_mul(message.submessages().len().saturated_into())
}
GatewayMessage::Outbound { .. } => LP_DEFENSIVE_WEIGHT,
}
/// Returns the maximum weight for processing one message.
fn max_processing_weight(_: &Self::Message) -> Weight {
LP_DEFENSIVE_WEIGHT
}
}

Expand Down
Loading

0 comments on commit 25e7ea5

Please sign in to comment.