Skip to content

Commit

Permalink
fix websocket connection drop
Browse files Browse the repository at this point in the history
  • Loading branch information
borngraced committed Nov 12, 2024
1 parent 045625b commit 65c7f90
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 121 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 17 additions & 51 deletions mm2src/kdf_walletconnect/src/connection_handler.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use std::sync::Arc;

use crate::storage::WalletConnectStorageOps;
use crate::{WalletConnectCtx, WalletConnectError};
use crate::WalletConnectCtx;

use common::executor::Timer;
use common::log::{error, info};
use common::log::{debug, error, info};
use futures::channel::mpsc::UnboundedSender;
use futures::StreamExt;
use mm2_err_handle::prelude::*;
use relay_client::error::ClientError;
use relay_client::websocket::{CloseFrame, ConnectionHandler, PublishedMessage};
use relay_rpc::domain::Topic;
use std::sync::Arc;

const INITIAL_RETRY_SECS: f64 = 5.0;
const MAX_BACKOFF: u64 = 60;
Expand Down Expand Up @@ -38,39 +35,39 @@ impl Handler {

impl ConnectionHandler for Handler {
fn connected(&mut self) {
info!("\n[{}] connection to WalletConnect relay server successful", self.name);
debug!("[{}] connection to WalletConnect relay server successful", self.name);
}

fn disconnected(&mut self, frame: Option<CloseFrame<'static>>) {
info!("\n[{}] connection closed: frame={frame:?}", self.name);
debug!("[{}] connection closed: frame={frame:?}", self.name);

if let Err(e) = self.conn_live_sender.start_send(None) {
error!("\n[{}] failed to send to the receiver: {e}", self.name);
error!("[{}] failed to send to the receiver: {e}", self.name);
}
}

fn message_received(&mut self, message: PublishedMessage) {
info!(
"\n[{}] inbound message: message_id={} topic={} tag={} message={}",
debug!(
"[{}] inbound message: message_id={} topic={} tag={} message={}",
self.name, message.message_id, message.topic, message.tag, message.message,
);

if let Err(e) = self.msg_sender.start_send(message) {
error!("\n[{}] failed to send to the receiver: {e}", self.name);
error!("[{}] failed to send to the receiver: {e}", self.name);
}
}

fn inbound_error(&mut self, error: ClientError) {
info!("\n[{}] inbound error: {error}", self.name);
debug!("[{}] inbound error: {error}", self.name);
if let Err(e) = self.conn_live_sender.start_send(Some(error.to_string())) {
error!("\n[{}] failed to send to the receiver: {e}", self.name);
error!("[{}] failed to send to the receiver: {e}", self.name);
}
}

fn outbound_error(&mut self, error: ClientError) {
info!("\n[{}] outbound error: {error}", self.name);
debug!("[{}] outbound error: {error}", self.name);
if let Err(e) = self.conn_live_sender.start_send(Some(error.to_string())) {
error!("\n[{}] failed to send to the receiver: {e}", self.name);
error!("[{}] failed to send to the receiver: {e}", self.name);
}
}
}
Expand All @@ -85,7 +82,7 @@ pub(crate) async fn initialize_connection(this: Arc<WalletConnectCtx>) {

while let Err(err) = this.connect_client().await {
retry_count += 1;
error!(
info!(
"Error during initial connection attempt {}: {:?}. Retrying in {retry_secs} seconds...",
retry_count, err
);
Expand Down Expand Up @@ -115,52 +112,21 @@ pub(crate) async fn handle_disconnections(this: &WalletConnectCtx) {
let mut backoff = 1;

while let Some(msg) = recv.next().await {
info!("Connection disconnected. Attempting to reconnect...");
// In some rare occasion, WalletConnect websocket client returns
// `Websocket transport error: IO error: unexpected end of file`
// probably a problem with the relay server.
let msg = msg.map_or("".to_string(), |m| m);
if msg.contains("unexpected end of file") {
this.client.disconnect().await.ok();
}
info!("WalletConnect disconnected with message: {msg:?}. Attempting to reconnect...");

loop {
match this.connect_client().await {
match this.reconnect_and_subscribe().await {
Ok(_) => {
if let Err(e) = resubscribe_to_topics(this, None).await {
error!("Failed to resubscribe after reconnection: {:?}", e);
}
info!("Reconnection process complete.");
backoff = 1;
break;
},
Err(e) => {
error!("Reconnection attempt failed: {:?}. Retrying in {:?}...", e, backoff);
info!("Reconnection attempt failed: {:?}. Retrying in {:?}...", e, backoff);
Timer::sleep(backoff as f64).await;
backoff = std::cmp::min(backoff * 2, MAX_BACKOFF);
},
}
}
}
}

/// Resubscribes to previously active topics after reconnection.
/// Called by handle_disconnections to restore subscription state.
async fn resubscribe_to_topics(this: &WalletConnectCtx, topic: Option<&Topic>) -> MmResult<(), WalletConnectError> {
if let Some(topic) = topic {
if let Some(session) = this.session.get_session(topic) {
this.client
.batch_subscribe(vec![session.topic.clone(), session.pairing_topic.clone()])
.await?;
return Ok(());
}
}
let sessions = this.session.get_sessions();
for session in sessions {
this.client
.batch_subscribe(vec![session.topic.into(), session.pairing_topic.into()])
.await?;
}

Ok(())
}
4 changes: 2 additions & 2 deletions mm2src/kdf_walletconnect/src/inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ pub(crate) async fn process_inbound_response(ctx: &WalletConnectCtx, response: R
let result = match response {
Response::Success(value) => match serde_json::from_value::<ResponseParamsSuccess>(value.result) {
Ok(data) => {
// Probably the best place to handle session propose response
// as we might not get a feedback for a long time or even at all
// TODO: move to session::proposal mod and spawn in a different thread to avoid
// blocking
if let ResponseParamsSuccess::SessionPropose(propose) = &data {
process_session_propose_response(ctx, topic, propose).await.error_log();
}
Expand Down
Loading

0 comments on commit 65c7f90

Please sign in to comment.