diff --git a/Cargo.lock b/Cargo.lock index ee1f1772d8..a934484167 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4798,10 +4798,11 @@ dependencies = [ [[package]] name = "pairing_api" version = "0.1.0" -source = "git+https://github.com/komodoplatform/walletconnectrust?branch=pairing-api#2372fa5f4a7b5f22c6489ecb4938970f6a37dea0" +source = "git+https://github.com/komodoplatform/walletconnectrust?branch=pairing-api#d208b549a0e12e6c232551269e7bc564a6cee004" dependencies = [ "anyhow", "chrono", + "dashmap", "getrandom 0.2.9", "hex", "lazy_static", @@ -5730,7 +5731,7 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "relay_client" version = "0.1.0" -source = "git+https://github.com/komodoplatform/walletconnectrust?branch=pairing-api#2372fa5f4a7b5f22c6489ecb4938970f6a37dea0" +source = "git+https://github.com/komodoplatform/walletconnectrust?branch=pairing-api#d208b549a0e12e6c232551269e7bc564a6cee004" dependencies = [ "chrono", "data-encoding", @@ -5758,7 +5759,7 @@ dependencies = [ [[package]] name = "relay_rpc" version = "0.1.0" -source = "git+https://github.com/komodoplatform/walletconnectrust?branch=pairing-api#2372fa5f4a7b5f22c6489ecb4938970f6a37dea0" +source = "git+https://github.com/komodoplatform/walletconnectrust?branch=pairing-api#d208b549a0e12e6c232551269e7bc564a6cee004" dependencies = [ "anyhow", "bs58 0.4.0", @@ -8133,7 +8134,7 @@ dependencies = [ [[package]] name = "wc_common" version = "0.1.0" -source = "git+https://github.com/komodoplatform/walletconnectrust?branch=pairing-api#2372fa5f4a7b5f22c6489ecb4938970f6a37dea0" +source = "git+https://github.com/komodoplatform/walletconnectrust?branch=pairing-api#d208b549a0e12e6c232551269e7bc564a6cee004" dependencies = [ "base64 0.21.7", "chacha20poly1305", diff --git a/mm2src/kdf_walletconnect/src/connection_handler.rs b/mm2src/kdf_walletconnect/src/connection_handler.rs index 70ef3e37e6..eb5cb2fd53 100644 --- a/mm2src/kdf_walletconnect/src/connection_handler.rs +++ b/mm2src/kdf_walletconnect/src/connection_handler.rs @@ -1,16 +1,13 @@ -use std::sync::Arc; - use crate::storage::WalletConnectStorageOps; -use crate::{WalletConnectCtx, WalletConnectError}; +use crate::WalletConnectCtx; use common::executor::Timer; -use common::log::{error, info}; +use common::log::{debug, error, info}; use futures::channel::mpsc::UnboundedSender; use futures::StreamExt; -use mm2_err_handle::prelude::*; use relay_client::error::ClientError; use relay_client::websocket::{CloseFrame, ConnectionHandler, PublishedMessage}; -use relay_rpc::domain::Topic; +use std::sync::Arc; const INITIAL_RETRY_SECS: f64 = 5.0; const MAX_BACKOFF: u64 = 60; @@ -38,39 +35,39 @@ impl Handler { impl ConnectionHandler for Handler { fn connected(&mut self) { - info!("\n[{}] connection to WalletConnect relay server successful", self.name); + debug!("[{}] connection to WalletConnect relay server successful", self.name); } fn disconnected(&mut self, frame: Option>) { - info!("\n[{}] connection closed: frame={frame:?}", self.name); + debug!("[{}] connection closed: frame={frame:?}", self.name); if let Err(e) = self.conn_live_sender.start_send(None) { - error!("\n[{}] failed to send to the receiver: {e}", self.name); + error!("[{}] failed to send to the receiver: {e}", self.name); } } fn message_received(&mut self, message: PublishedMessage) { - info!( - "\n[{}] inbound message: message_id={} topic={} tag={} message={}", + debug!( + "[{}] inbound message: message_id={} topic={} tag={} message={}", self.name, message.message_id, message.topic, message.tag, message.message, ); if let Err(e) = self.msg_sender.start_send(message) { - error!("\n[{}] failed to send to the receiver: {e}", self.name); + error!("[{}] failed to send to the receiver: {e}", self.name); } } fn inbound_error(&mut self, error: ClientError) { - info!("\n[{}] inbound error: {error}", self.name); + debug!("[{}] inbound error: {error}", self.name); if let Err(e) = self.conn_live_sender.start_send(Some(error.to_string())) { - error!("\n[{}] failed to send to the receiver: {e}", self.name); + error!("[{}] failed to send to the receiver: {e}", self.name); } } fn outbound_error(&mut self, error: ClientError) { - info!("\n[{}] outbound error: {error}", self.name); + debug!("[{}] outbound error: {error}", self.name); if let Err(e) = self.conn_live_sender.start_send(Some(error.to_string())) { - error!("\n[{}] failed to send to the receiver: {e}", self.name); + error!("[{}] failed to send to the receiver: {e}", self.name); } } } @@ -85,7 +82,7 @@ pub(crate) async fn initialize_connection(this: Arc) { while let Err(err) = this.connect_client().await { retry_count += 1; - error!( + info!( "Error during initial connection attempt {}: {:?}. Retrying in {retry_secs} seconds...", retry_count, err ); @@ -115,27 +112,17 @@ pub(crate) async fn handle_disconnections(this: &WalletConnectCtx) { let mut backoff = 1; while let Some(msg) = recv.next().await { - info!("Connection disconnected. Attempting to reconnect..."); - // In some rare occasion, WalletConnect websocket client returns - // `Websocket transport error: IO error: unexpected end of file` - // probably a problem with the relay server. - let msg = msg.map_or("".to_string(), |m| m); - if msg.contains("unexpected end of file") { - this.client.disconnect().await.ok(); - } + info!("WalletConnect disconnected with message: {msg:?}. Attempting to reconnect..."); loop { - match this.connect_client().await { + match this.reconnect_and_subscribe().await { Ok(_) => { - if let Err(e) = resubscribe_to_topics(this, None).await { - error!("Failed to resubscribe after reconnection: {:?}", e); - } info!("Reconnection process complete."); backoff = 1; break; }, Err(e) => { - error!("Reconnection attempt failed: {:?}. Retrying in {:?}...", e, backoff); + info!("Reconnection attempt failed: {:?}. Retrying in {:?}...", e, backoff); Timer::sleep(backoff as f64).await; backoff = std::cmp::min(backoff * 2, MAX_BACKOFF); }, @@ -143,24 +130,3 @@ pub(crate) async fn handle_disconnections(this: &WalletConnectCtx) { } } } - -/// Resubscribes to previously active topics after reconnection. -/// Called by handle_disconnections to restore subscription state. -async fn resubscribe_to_topics(this: &WalletConnectCtx, topic: Option<&Topic>) -> MmResult<(), WalletConnectError> { - if let Some(topic) = topic { - if let Some(session) = this.session.get_session(topic) { - this.client - .batch_subscribe(vec![session.topic.clone(), session.pairing_topic.clone()]) - .await?; - return Ok(()); - } - } - let sessions = this.session.get_sessions(); - for session in sessions { - this.client - .batch_subscribe(vec![session.topic.into(), session.pairing_topic.into()]) - .await?; - } - - Ok(()) -} diff --git a/mm2src/kdf_walletconnect/src/inbound_message.rs b/mm2src/kdf_walletconnect/src/inbound_message.rs index 828adc9a61..0fd3545cb4 100644 --- a/mm2src/kdf_walletconnect/src/inbound_message.rs +++ b/mm2src/kdf_walletconnect/src/inbound_message.rs @@ -59,8 +59,8 @@ pub(crate) async fn process_inbound_response(ctx: &WalletConnectCtx, response: R let result = match response { Response::Success(value) => match serde_json::from_value::(value.result) { Ok(data) => { - // Probably the best place to handle session propose response - // as we might not get a feedback for a long time or even at all + // TODO: move to session::proposal mod and spawn in a different thread to avoid + // blocking if let ResponseParamsSuccess::SessionPropose(propose) = &data { process_session_propose_response(ctx, topic, propose).await.error_log(); } diff --git a/mm2src/kdf_walletconnect/src/lib.rs b/mm2src/kdf_walletconnect/src/lib.rs index 4cc52c17bd..fb4d2e917a 100644 --- a/mm2src/kdf_walletconnect/src/lib.rs +++ b/mm2src/kdf_walletconnect/src/lib.rs @@ -12,7 +12,7 @@ use crate::session::rpc::propose::send_proposal_request; use chain::{WcChainId, WcRequestMethods, SUPPORTED_PROTOCOL}; use chrono::Utc; use common::custom_futures::timeout::FutureTimerExt; -use common::executor::{spawn_abortable, AbortOnDropHandle}; +use common::executor::{spawn_abortable, AbortOnDropHandle, Timer}; use common::log::{debug, info}; use common::{executor::SpawnFuture, log::error}; use connection_handler::{initialize_connection, Handler}; @@ -99,9 +99,10 @@ impl WalletConnectCtx { let storage = SessionStorageDb::init(ctx)?; - let handler = Handler::new("Komodefi", inbound_message_tx, conn_live_sender); - let (client, _abort_handle) = - Client::new_with_callback(handler, |r, h| spawn_abortable(client_event_loop(r, h))); + let (client, _abort_handle) = Client::new_with_callback( + Handler::new("Komodefi", inbound_message_tx, conn_live_sender), + |r, h| spawn_abortable(client_event_loop(r, h)), + ); Ok(Self { client, @@ -144,15 +145,28 @@ impl WalletConnectCtx { Ok(()) } + pub(crate) async fn reconnect_and_subscribe(&self) -> MmResult<(), WalletConnectError> { + self.connect_client().await?; + // Resubscribes to previously active session topics after reconnection. + let sessions = self.session.get_sessions(); + for session in sessions { + self.client + .batch_subscribe(vec![session.topic.into(), session.pairing_topic.into()]) + .await?; + } + + Ok(()) + } + /// Create a WalletConnect pairing connection url. pub async fn new_connection(&self, namespaces: Option) -> MmResult { let namespaces = match namespaces { Some(value) => Some(serde_json::from_value(value)?), None => None, }; - let (topic, url) = self.pairing.create(self.metadata.clone(), None).await?; + let (topic, url) = self.pairing.create(self.metadata.clone(), None)?; - info!("Subscribing to topic: {topic:?}"); + info!("[topic] Subscribing to topic"); self.client .subscribe(topic.clone()) @@ -160,7 +174,7 @@ impl WalletConnectCtx { .await .map_to_mm(|err| WalletConnectError::InternalError(err.to_string()))??; - info!("Subscribed to topic: {topic:?}"); + info!("[topic] Subscribed to topic"); send_proposal_request(self, &topic, namespaces).await?; @@ -168,12 +182,12 @@ impl WalletConnectCtx { } /// Retrieves the symmetric key associated with a given `topic`. - async fn sym_key(&self, topic: &Topic) -> MmResult { + fn sym_key(&self, topic: &Topic) -> MmResult { if let Some(key) = self.session.sym_key(topic) { return Ok(key); } - if let Ok(key) = self.pairing.sym_key(topic.as_ref()).await { + if let Ok(key) = self.pairing.sym_key(topic) { return Ok(key); } @@ -185,18 +199,18 @@ impl WalletConnectCtx { /// Handles an inbound published message by decrypting, decoding, and processing it. async fn handle_published_message(&self, msg: PublishedMessage) -> MmResult<(), WalletConnectError> { let message = { - let key = self.sym_key(&msg.topic).await?; + let key = self.sym_key(&msg.topic)?; decode_and_decrypt_type0(msg.message.as_bytes(), &key)? }; - info!("Inbound message payload={message}"); + info!("[{}] Inbound message payload={message}", msg.topic); match serde_json::from_str(&message)? { Payload::Request(request) => process_inbound_request(self, request, &msg.topic).await?, Payload::Response(response) => process_inbound_response(self, response, &msg.topic).await, } - info!("Inbound message was handled successfully"); + info!("[{}] Inbound message was handled successfully", msg.topic); Ok(()) } @@ -218,16 +232,16 @@ impl WalletConnectCtx { if now > session.expiry { debug!("Session {} expired, trying to delete from storage", session.topic); if let Err(err) = self.storage.delete_session(&session.topic).await { - error!("Unable to delete session: {:?} from storage", err); + error!("[{}] Unable to delete session from storage: {err:?}", session.topic); } continue; }; let topic = session.topic.clone(); let pairing_topic = session.pairing_topic.clone(); - debug!("Session found! activating :{}", topic); + debug!("[{topic}] Session found! activating"); self.session.add_session(session).await; - self.client.batch_subscribe(vec![topic.clone(), pairing_topic]).await?; + self.client.batch_subscribe(vec![topic, pairing_topic]).await?; } Ok(()) @@ -290,51 +304,63 @@ impl WalletConnectCtx { irn_metadata: IrnMetadata, payload: Payload, ) -> MmResult<(), WalletConnectError> { - debug!("Publishing message={:?} to topic: {topic}", payload); + const MAX_RETRIES: usize = 5; + const PUBLISH_TIMEOUT_SECS: f64 = 5.0; + + info!("[{topic}] Publishing message={payload:?}"); let message = { - let sym_key = self.sym_key(topic).await?; + let sym_key = self.sym_key(topic)?; let payload = serde_json::to_string(&payload)?; encrypt_and_encode(EnvelopeType::Type0, payload, &sym_key)? }; - // Attempt to publish, retrying once on NotConnected error - match self - .client - .publish( - topic.clone(), - message.clone(), - None, - irn_metadata.tag, - Duration::from_secs(irn_metadata.ttl), - irn_metadata.prompt, - ) - .await - { - Ok(_) => { - info!("Message published successfully to topic: {topic}"); - Ok(()) - }, - Err(err) => { - if err.to_string().contains("WebsocketClient") { - debug!("Reconnecting client after NotConnected error..."); - self.connect_client().await?; - self.client - .publish( - topic.clone(), - message, - None, - irn_metadata.tag, - Duration::from_secs(irn_metadata.ttl), - irn_metadata.prompt, - ) - .await?; - info!("Message published successfully after reconnect"); - Ok(()) - } else { - MmError::err(err.into()) - } - }, + for attempt in 0..MAX_RETRIES { + match self + .client + .publish( + topic.clone(), + message.clone(), + None, + irn_metadata.tag, + Duration::from_secs(irn_metadata.ttl), + irn_metadata.prompt, + ) + .timeout_secs(PUBLISH_TIMEOUT_SECS) + .await + { + Ok(Ok(_)) => { + info!("[{topic}] Message published successfully"); + return Ok(()); + }, + Ok(Err(err)) => return MmError::err(err.into()), + Err(timeout_err) => { + // This persistent reconnection and retry strategy keeps the WebSocket connection active, + // allowing the client to automatically resume operations after network interruptions or disconnections. + // Since TCP handles connection timeouts (which can be lengthy), we're using a shorter timeout here + // to detect issues quickly and reconnect as needed. + if attempt >= MAX_RETRIES - 1 { + return MmError::err(WalletConnectError::InternalError(timeout_err.to_string())); + } + debug!("Attempt {} failed due to timeout. Reconnecting...", attempt + 1); + loop { + match self.reconnect_and_subscribe().await { + Ok(_) => { + info!("Reconnected and subscribed successfully."); + break; + }, + Err(reconnect_err) => { + error!("Reconnection attempt failed: {reconnect_err:?}. Retrying..."); + Timer::sleep(1.5).await; + }, + } + } + }, + } } + + info!("[{topic}] Message published successfully"); + + Ok(()) } /// Checks if the current session is connected to a Ledger device. @@ -499,7 +525,7 @@ impl WalletConnectCtx { if let Some(session) = self.session.delete_session(topic).await { self.pairing - .disconnect(session.pairing_topic.as_ref(), &self.client) + .disconnect_rpc(&session.pairing_topic, &self.client) .await?; }; @@ -526,7 +552,7 @@ pub async fn initialize_walletconnect(ctx: &MmArc) -> MmResult<(), WalletConnect let mut recv = wallet_connect.inbound_message_rx.lock().await; while let Some(msg) = recv.next().await { if let Err(e) = wallet_connect.handle_published_message(msg).await { - info!("Error processing message: {:?}", e); + debug!("Error processing message: {:?}", e); } } }); diff --git a/mm2src/kdf_walletconnect/src/pairing.rs b/mm2src/kdf_walletconnect/src/pairing.rs index 9fc3973d57..3e4411305b 100644 --- a/mm2src/kdf_walletconnect/src/pairing.rs +++ b/mm2src/kdf_walletconnect/src/pairing.rs @@ -28,8 +28,7 @@ pub(crate) async fn reply_pairing_extend_response( extend: PairingExtendRequest, ) -> MmResult<(), WalletConnectError> { { - let mut pairings = ctx.pairing.pairings.lock().await; - if let Some(pairing) = pairings.get_mut(topic.as_ref()) { + if let Some(mut pairing) = ctx.pairing.pairings.get_mut(topic) { pairing.pairing.expiry = extend.expiry; pairing.pairing.active = true; }; @@ -48,7 +47,7 @@ pub(crate) async fn reply_pairing_delete_response( _delete: PairingDeleteRequest, ) -> MmResult<(), WalletConnectError> { { - ctx.pairing.disconnect(topic.as_ref(), &ctx.client).await?; + ctx.pairing.disconnect_rpc(topic, &ctx.client).await?; } let param = ResponseParamsSuccess::PairingDelete(true); diff --git a/mm2src/kdf_walletconnect/src/session/rpc/delete.rs b/mm2src/kdf_walletconnect/src/session/rpc/delete.rs index 75843e3dcc..46374cc30e 100644 --- a/mm2src/kdf_walletconnect/src/session/rpc/delete.rs +++ b/mm2src/kdf_walletconnect/src/session/rpc/delete.rs @@ -41,13 +41,13 @@ async fn session_delete_cleanup(ctx: &WalletConnectCtx, topic: &Topic) -> MmResu if let Some(session) = ctx.session.delete_session(topic).await { debug!( - "No active sessions left for pairing {}, disconnecting", + "[{}] No active sessions for pairing disconnecting", session.pairing_topic ); //Attempt to unsubscribe from topic ctx.client.unsubscribe(session.pairing_topic.clone()).await?; // Attempt to disconnect the pairing - ctx.pairing.delete(session.pairing_topic.as_ref()).await; + ctx.pairing.delete(&session.pairing_topic); // delete session from storage as well. ctx.storage diff --git a/mm2src/kdf_walletconnect/src/session/rpc/ping.rs b/mm2src/kdf_walletconnect/src/session/rpc/ping.rs index 640465bebf..ad0e8eda10 100644 --- a/mm2src/kdf_walletconnect/src/session/rpc/ping.rs +++ b/mm2src/kdf_walletconnect/src/session/rpc/ping.rs @@ -6,7 +6,7 @@ use common::custom_futures::timeout::FutureTimerExt; use futures::StreamExt; use mm2_err_handle::prelude::*; use relay_rpc::{domain::{MessageId, Topic}, - rpc::params::{RequestParams, ResponseParamsSuccess}}; + rpc::params::{RelayProtocolMetadata, RequestParams, ResponseParamsSuccess}}; pub(crate) async fn reply_session_ping_request( ctx: &WalletConnectCtx, @@ -21,9 +21,10 @@ pub(crate) async fn reply_session_ping_request( pub async fn send_session_ping_request(ctx: &WalletConnectCtx, topic: &Topic) -> MmResult<(), WalletConnectError> { let param = RequestParams::SessionPing(()); + let ttl = param.irn_metadata().ttl; ctx.publish_request(topic, param).await?; - let wait_duration = Duration::from_secs(30); + let wait_duration = Duration::from_secs(ttl); if let Ok(Some(resp)) = ctx.message_rx.lock().await.next().timeout(wait_duration).await { resp.mm_err(WalletConnectError::InternalError)?; return Ok(()); diff --git a/mm2src/kdf_walletconnect/src/session/rpc/propose.rs b/mm2src/kdf_walletconnect/src/session/rpc/propose.rs index eee8c90b8a..f941fb6dd3 100644 --- a/mm2src/kdf_walletconnect/src/session/rpc/propose.rs +++ b/mm2src/kdf_walletconnect/src/session/rpc/propose.rs @@ -145,7 +145,7 @@ pub(crate) async fn process_session_propose_response( }; // Activate pairing_topic - ctx.pairing.activate(pairing_topic.as_ref()).await?; + ctx.pairing.activate(pairing_topic)?; Ok(()) }