From 1bd7b3cd3a5deb0338cec526155ffbdbe2e3431c Mon Sep 17 00:00:00 2001 From: shamardy Date: Mon, 4 Nov 2024 18:03:45 +0200 Subject: [PATCH 1/4] unify taker fee validation retry logic between maker and watcher --- mm2src/mm2_main/src/lp_swap.rs | 5 ++- mm2src/mm2_main/src/lp_swap/maker_swap.rs | 7 ++-- mm2src/mm2_main/src/lp_swap/swap_watcher.rs | 44 ++++++++++++--------- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 7692503c18..09e4de99b4 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -151,8 +151,11 @@ pub const TX_HELPER_PREFIX: TopicPrefix = "txhlp"; pub(crate) const LEGACY_SWAP_TYPE: u8 = 0; pub(crate) const MAKER_SWAP_V2_TYPE: u8 = 1; pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2; -const MAX_STARTED_AT_DIFF: u64 = 60; +pub(crate) const TAKER_FEE_VALIDATION_ATTEMPTS: usize = 6; +pub(crate) const TAKER_FEE_VALIDATION_RETRY_DELAY: f64 = 10.; + +const MAX_STARTED_AT_DIFF: u64 = 60; const NEGOTIATE_SEND_INTERVAL: f64 = 30.; /// If a certain P2P message is not received, swap will be aborted after this time expires. diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index 84c1bbc6aa..ace9a4c8a9 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -9,7 +9,8 @@ use super::{broadcast_my_swap_status, broadcast_p2p_tx_msg, broadcast_swap_msg_e wait_for_maker_payment_conf_until, AtomicSwap, LockedAmount, MySwapInfo, NegotiationDataMsg, NegotiationDataV2, NegotiationDataV3, RecoveredSwap, RecoveredSwapAction, SavedSwap, SavedSwapIo, SavedTradeFee, SecretHashAlgo, SwapConfirmationsSettings, SwapError, SwapMsg, SwapPubkeys, SwapTxDataMsg, - SwapsContext, TransactionIdentifier, INCLUDE_REFUND_FEE, NO_REFUND_FEE, WAIT_CONFIRM_INTERVAL_SEC}; + SwapsContext, TransactionIdentifier, INCLUDE_REFUND_FEE, NO_REFUND_FEE, TAKER_FEE_VALIDATION_ATTEMPTS, + TAKER_FEE_VALIDATION_RETRY_DELAY, WAIT_CONFIRM_INTERVAL_SEC}; use crate::lp_dispatcher::{DispatcherContext, LpEvents}; use crate::lp_network::subscribe_to_topic; use crate::lp_ordermatch::MakerOrderBuilder; @@ -771,13 +772,13 @@ impl MakerSwap { { Ok(_) => break, Err(err) => { - if attempts >= 6 { + if attempts >= TAKER_FEE_VALIDATION_ATTEMPTS { return Ok((Some(MakerSwapCommand::Finish), vec![ MakerSwapEvent::TakerFeeValidateFailed(ERRL!("{}", err).into()), ])); } else { attempts += 1; - Timer::sleep(10.).await; + Timer::sleep(TAKER_FEE_VALIDATION_RETRY_DELAY).await; } }, }; diff --git a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs index 16e3712f16..017fd0bf3b 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs @@ -1,5 +1,6 @@ use super::{broadcast_p2p_tx_msg, get_payment_locktime, lp_coinfind, taker_payment_spend_deadline, tx_helper_topic, - H256Json, SwapsContext, WAIT_CONFIRM_INTERVAL_SEC}; + H256Json, SwapsContext, TAKER_FEE_VALIDATION_ATTEMPTS, TAKER_FEE_VALIDATION_RETRY_DELAY, + WAIT_CONFIRM_INTERVAL_SEC}; use crate::lp_network::{P2PRequestError, P2PRequestResult}; use crate::MmError; @@ -181,24 +182,31 @@ impl State for ValidateTakerFee { async fn on_changed(self: Box, watcher_ctx: &mut WatcherStateMachine) -> StateResult { debug!("Watcher validate taker fee"); - let validated_f = watcher_ctx - .taker_coin - .watcher_validate_taker_fee(WatcherValidateTakerFeeInput { - taker_fee_hash: watcher_ctx.data.taker_fee_hash.clone(), - sender_pubkey: watcher_ctx.verified_pub.clone(), - min_block_number: watcher_ctx.data.taker_coin_start_block, - fee_addr: DEX_FEE_ADDR_RAW_PUBKEY.clone(), - lock_duration: watcher_ctx.data.lock_duration, - }) - .compat(); - - if let Err(err) = validated_f.await { - return Self::change_state(Stopped::from_reason(StopReason::Error( - WatcherError::InvalidTakerFee(format!("{:?}", err)).into(), - ))); - }; - Self::change_state(ValidateTakerPayment {}) + let validation_result = retry_on_err!(async { + watcher_ctx + .taker_coin + .watcher_validate_taker_fee(WatcherValidateTakerFeeInput { + taker_fee_hash: watcher_ctx.data.taker_fee_hash.clone(), + sender_pubkey: watcher_ctx.verified_pub.clone(), + min_block_number: watcher_ctx.data.taker_coin_start_block, + fee_addr: DEX_FEE_ADDR_RAW_PUBKEY.clone(), + lock_duration: watcher_ctx.data.lock_duration, + }) + .compat() + .await + }) + .repeat_every_secs(TAKER_FEE_VALIDATION_RETRY_DELAY) + .attempts(TAKER_FEE_VALIDATION_ATTEMPTS) + .inspect_err(|e| error!("Error validating taker fee: {}", e)) + .await; + + match validation_result { + Ok(_) => Self::change_state(ValidateTakerPayment {}), + Err(repeat_err) => Self::change_state(Stopped::from_reason(StopReason::Error( + WatcherError::InvalidTakerFee(repeat_err.to_string()).into(), + ))), + } } } From b7781b3e053722bc653a963ca443656e46e7e1c7 Mon Sep 17 00:00:00 2001 From: shamardy Date: Fri, 8 Nov 2024 22:14:00 +0200 Subject: [PATCH 2/4] fix `TX_HELPER` message processing and propagation --- mm2src/mm2_main/src/lp_network.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 08ae5f8b3e..d1587b3596 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -190,15 +190,16 @@ async fn process_p2p_message( to_propagate = true; }, Some(lp_swap::TX_HELPER_PREFIX) => { - if let Some(pair) = split.next() { - if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await { + if let Some(ticker) = split.next() { + if let Ok(Some(coin)) = lp_coinfind(&ctx, ticker).await { if let Err(e) = coin.tx_enum_from_bytes(&message.data) { log::error!("Message cannot continue the process due to: {:?}", e); return; }; - let fut = coin.send_raw_tx_bytes(&message.data); - ctx.spawner().spawn(async { + if coin.is_utxo_in_native_mode() { + let fut = coin.send_raw_tx_bytes(&message.data); + ctx.spawner().spawn(async { match fut.compat().await { Ok(id) => log::debug!("Transaction broadcasted successfully: {:?} ", id), // TODO (After https://github.com/KomodoPlatform/atomicDEX-API/pull/1433) @@ -207,7 +208,10 @@ async fn process_p2p_message( Err(e) => log::error!("Broadcast transaction failed (ignore this error if the transaction already sent by another seednode). {}", e), }; }) + } } + + to_propagate = true; } }, Some(lp_healthcheck::PEER_HEALTHCHECK_PREFIX) => { From 515f6ae44fee19651083d71bde06363a8a98925e Mon Sep 17 00:00:00 2001 From: shamardy Date: Wed, 13 Nov 2024 18:41:58 +0200 Subject: [PATCH 3/4] review fix: rename const to `TAKER_FEE_VALIDATION_RETRY_DELAY_SECS` --- mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs | 8 ++++++-- mm2src/mm2_main/src/lp_swap.rs | 2 +- mm2src/mm2_main/src/lp_swap/maker_swap.rs | 4 ++-- mm2src/mm2_main/src/lp_swap/swap_watcher.rs | 4 ++-- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs index 2b9a3ada48..ca7a4bac60 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection.rs @@ -386,11 +386,15 @@ impl ElectrumConnection { }; let Some(dns_name) = uri.host().map(String::from) else { - return Err(ElectrumConnectionErr::Irrecoverable("Couldn't retrieve host from address".to_string())); + return Err(ElectrumConnectionErr::Irrecoverable( + "Couldn't retrieve host from address".to_string(), + )); }; let Ok(dns) = server_name_from_domain(dns_name.as_str()) else { - return Err(ElectrumConnectionErr::Irrecoverable("Address isn't a valid domain name".to_string())); + return Err(ElectrumConnectionErr::Irrecoverable( + "Address isn't a valid domain name".to_string(), + )); }; let tls_connector = if connection.settings.disable_cert_verification { diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 09e4de99b4..0acb7fc443 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -153,7 +153,7 @@ pub(crate) const MAKER_SWAP_V2_TYPE: u8 = 1; pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2; pub(crate) const TAKER_FEE_VALIDATION_ATTEMPTS: usize = 6; -pub(crate) const TAKER_FEE_VALIDATION_RETRY_DELAY: f64 = 10.; +pub(crate) const TAKER_FEE_VALIDATION_RETRY_DELAY_SECS: f64 = 10.; const MAX_STARTED_AT_DIFF: u64 = 60; const NEGOTIATE_SEND_INTERVAL: f64 = 30.; diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index ace9a4c8a9..0eb72b8a71 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -10,7 +10,7 @@ use super::{broadcast_my_swap_status, broadcast_p2p_tx_msg, broadcast_swap_msg_e NegotiationDataV2, NegotiationDataV3, RecoveredSwap, RecoveredSwapAction, SavedSwap, SavedSwapIo, SavedTradeFee, SecretHashAlgo, SwapConfirmationsSettings, SwapError, SwapMsg, SwapPubkeys, SwapTxDataMsg, SwapsContext, TransactionIdentifier, INCLUDE_REFUND_FEE, NO_REFUND_FEE, TAKER_FEE_VALIDATION_ATTEMPTS, - TAKER_FEE_VALIDATION_RETRY_DELAY, WAIT_CONFIRM_INTERVAL_SEC}; + TAKER_FEE_VALIDATION_RETRY_DELAY_SECS, WAIT_CONFIRM_INTERVAL_SEC}; use crate::lp_dispatcher::{DispatcherContext, LpEvents}; use crate::lp_network::subscribe_to_topic; use crate::lp_ordermatch::MakerOrderBuilder; @@ -778,7 +778,7 @@ impl MakerSwap { ])); } else { attempts += 1; - Timer::sleep(TAKER_FEE_VALIDATION_RETRY_DELAY).await; + Timer::sleep(TAKER_FEE_VALIDATION_RETRY_DELAY_SECS).await; } }, }; diff --git a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs index 017fd0bf3b..74440f4d5d 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs @@ -1,5 +1,5 @@ use super::{broadcast_p2p_tx_msg, get_payment_locktime, lp_coinfind, taker_payment_spend_deadline, tx_helper_topic, - H256Json, SwapsContext, TAKER_FEE_VALIDATION_ATTEMPTS, TAKER_FEE_VALIDATION_RETRY_DELAY, + H256Json, SwapsContext, TAKER_FEE_VALIDATION_ATTEMPTS, TAKER_FEE_VALIDATION_RETRY_DELAY_SECS, WAIT_CONFIRM_INTERVAL_SEC}; use crate::lp_network::{P2PRequestError, P2PRequestResult}; @@ -196,7 +196,7 @@ impl State for ValidateTakerFee { .compat() .await }) - .repeat_every_secs(TAKER_FEE_VALIDATION_RETRY_DELAY) + .repeat_every_secs(TAKER_FEE_VALIDATION_RETRY_DELAY_SECS) .attempts(TAKER_FEE_VALIDATION_ATTEMPTS) .inspect_err(|e| error!("Error validating taker fee: {}", e)) .await; From b81bfc6381da0621de3ca81bb7d9788dd9e27c8c Mon Sep 17 00:00:00 2001 From: shamardy Date: Wed, 13 Nov 2024 18:53:50 +0200 Subject: [PATCH 4/4] review fix: propagate healthcheck message after validation --- mm2src/mm2_main/src/lp_healthcheck.rs | 27 +++++++++++++-------------- mm2src/mm2_main/src/lp_network.rs | 8 +++++++- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/mm2src/mm2_main/src/lp_healthcheck.rs b/mm2src/mm2_main/src/lp_healthcheck.rs index 5e9db51111..4f35c5b5c1 100644 --- a/mm2src/mm2_main/src/lp_healthcheck.rs +++ b/mm2src/mm2_main/src/lp_healthcheck.rs @@ -9,6 +9,7 @@ use instant::{Duration, Instant}; use lazy_static::lazy_static; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::MmError; +use mm2_err_handle::prelude::*; use mm2_libp2p::p2p_ctx::P2PContext; use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix}; use ser_error_derive::SerializeErrorType; @@ -16,7 +17,7 @@ use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::sync::Mutex; -use crate::lp_network::broadcast_p2p_msg; +use crate::lp_network::{broadcast_p2p_msg, P2PRequestError, P2PRequestResult}; pub(crate) const PEER_HEALTHCHECK_PREFIX: TopicPrefix = "hcheck"; @@ -279,7 +280,10 @@ pub async fn peer_connection_healthcheck_rpc( Ok(rx.timeout(timeout_duration).await == Ok(Ok(()))) } -pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_libp2p::GossipsubMessage) { +pub(crate) async fn process_p2p_healthcheck_message( + ctx: &MmArc, + message: mm2_libp2p::GossipsubMessage, +) -> P2PRequestResult<()> { macro_rules! try_or_return { ($exp:expr, $msg: expr) => { match $exp { @@ -292,24 +296,17 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li }; } - let data = try_or_return!( - HealthcheckMessage::decode(&message.data), - "Couldn't decode healthcheck message" - ); + let data = HealthcheckMessage::decode(&message.data) + .map_to_mm(|e| P2PRequestError::DecodeError(format!("Couldn't decode healthcheck message: {}", e)))?; + let sender_peer = data.is_received_message_valid().map_to_mm(|e| { + P2PRequestError::ValidationFailed(format!("Received an invalid healthcheck message. Error: {}", e)) + })?; let ctx = ctx.clone(); // Pass the remaining work to another thread to free up this one as soon as possible, // so KDF can handle a high amount of healthcheck messages more efficiently. ctx.spawner().spawn(async move { - let sender_peer = match data.is_received_message_valid() { - Ok(t) => t, - Err(e) => { - log::error!("Received an invalid healthcheck message. Error: {e}"); - return; - }, - }; - if data.should_reply() { // Reply the message so they know we are healthy. @@ -337,6 +334,8 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li }; } }); + + Ok(()) } #[cfg(any(test, target_arch = "wasm32"))] diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index d1587b3596..b2ef53f3fb 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -62,6 +62,7 @@ pub enum P2PRequestError { ResponseError(String), #[display(fmt = "Expected 1 response, found {}", _0)] ExpectedSingleResponseError(usize), + ValidationFailed(String), } /// Enum covering error cases that can happen during P2P message processing. @@ -215,7 +216,12 @@ async fn process_p2p_message( } }, Some(lp_healthcheck::PEER_HEALTHCHECK_PREFIX) => { - lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await + if let Err(e) = lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await { + log::error!("{}", e); + return; + } + + to_propagate = true; }, None | Some(_) => (), }