Skip to content

Commit

Permalink
improve relayer disconnection, remove unneeded changes, track topic s…
Browse files Browse the repository at this point in the history
…ubscriptions
  • Loading branch information
borngraced committed Sep 23, 2024
1 parent edcb5a7 commit e606ca4
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 4,986 deletions.
4 changes: 2 additions & 2 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
JEMALLOC_SYS_WITH_MALLOC_CONF = "background_thread:true,narenas:1,tcache:false,dirty_decay_ms:0,muzzy_decay_ms:0,metadata_thp:auto"

[target.'cfg(all())']
rustflags = ["-Zshare-generics=y", '--cfg=curve25519_dalek_backend="fiat"']
rustflags = [ "-Zshare-generics=y", '--cfg=curve25519_dalek_backend="fiat"' ]

# # Install lld using package manager
# [target.x86_64-unknown-linux-gnu]
Expand All @@ -24,4 +24,4 @@ rustflags = ["-Zshare-generics=y", '--cfg=curve25519_dalek_backend="fiat"']

[target.wasm32-unknown-unknown]
runner = 'wasm-bindgen-test-runner'
rustflags = ["--cfg=web_sys_unstable_apis"]
rustflags = [ "--cfg=web_sys_unstable_apis" ]
24 changes: 11 additions & 13 deletions mm2src/coins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ edition = "2018"
zhtlc-native-tests = []
# TODO
enable-solana = [
"dep:bincode",
"dep:ed25519-dalek-bip32",
"dep:solana-client",
"dep:solana-sdk",
"dep:solana-transaction-status",
"dep:spl-token",
"dep:spl-associated-token-account",
"dep:satomic-swap"
"dep:bincode",
"dep:ed25519-dalek-bip32",
"dep:solana-client",
"dep:solana-sdk",
"dep:solana-transaction-status",
"dep:spl-token",
"dep:spl-associated-token-account",
"dep:satomic-swap"
]
enable-sia = [
"dep:reqwest",
"dep:blake2b_simd",
"dep:sia-rust"
"dep:reqwest",
"dep:blake2b_simd",
"dep:sia-rust"
]
default = []
run-docker-tests = []
Expand Down Expand Up @@ -188,5 +188,3 @@ wagyu-zcash-parameters = { version = "0.2" }
[build-dependencies]
prost-build = { version = "0.11", default-features = false }
tonic-build = { version = "0.9", default-features = false, features = ["prost"] }


10 changes: 0 additions & 10 deletions mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ use futures01::Future;
use hex::FromHexError;
use instant::Duration;
use itertools::Itertools;
use kdf_walletconnect::WcCoinOps;
use keys::{KeyPair, Public};
use mm2_core::mm_ctx::{MmArc, MmWeak};
use mm2_err_handle::prelude::*;
Expand Down Expand Up @@ -3315,15 +3314,6 @@ fn parse_expected_sequence_number(e: &str) -> MmResult<u64, TendermintCoinRpcErr
)))
}

impl WcCoinOps for TendermintCoin {
//"cosmoshub-4".to_owned()
fn chain_id(&self) -> Vec<String> { todo!() }

fn chain(&self) -> String { "cosmos".to_owned() }

fn use_walletconnect(&self) -> bool { true }
}

#[cfg(test)]
pub mod tendermint_coin_tests {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion mm2src/crypto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,3 @@ tokio = { version = "1.20", default-features = false }

[features]
trezor-udp = ["trezor/trezor-udp"]

Empty file removed mm2src/crypto/src/walletconnect.rs
Empty file.
2 changes: 0 additions & 2 deletions mm2src/kdf_walletconnect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ name = "kdf_walletconnect"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.52"
chrono = { version = "0.4.23", "features" = ["serde"] }
Expand Down
7 changes: 4 additions & 3 deletions mm2src/kdf_walletconnect/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ pub async fn cosmos_get_accounts_impl(
) -> MmResult<Vec<CosmosAccount>, WalletConnectCtxError> {
let account = ctx.get_account_for_chain_id(chain_id).await?;

let session = ctx.session.lock().await;
let session_topic = session.as_ref().map(|s| s.topic.clone());
drop(session);
let session_topic = {
let session = ctx.session.lock().await;
session.as_ref().map(|s| s.topic.clone())
};

if let Some(topic) = session_topic {
let request = SessionRequest {
Expand Down
4 changes: 2 additions & 2 deletions mm2src/kdf_walletconnect/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl ConnectionHandler for Handler {
info!("\n[{}] connection closed: frame={frame:?}", self.name);

if let Err(e) = self.conn_live_sender.start_send(()) {
info!("\n[{}] failed to send the to the receiver: {e}", self.name);
info!("\n[{}] failed to send to the receiver: {e}", self.name);
}
}

Expand All @@ -43,7 +43,7 @@ impl ConnectionHandler for Handler {
);

if let Err(e) = self.msg_sender.start_send(message) {
info!("\n[{}] failed to send the to the receiver: {e}", self.name);
info!("\n[{}] failed to send to the receiver: {e}", self.name);
}
}

Expand Down
118 changes: 76 additions & 42 deletions mm2src/kdf_walletconnect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use async_trait::async_trait;
use chain::{build_required_namespaces,
cosmos::{cosmos_get_accounts_impl, CosmosAccount},
SUPPORTED_CHAINS};
use common::{executor::Timer, log::info};
use common::{executor::Timer,
log::{error, info}};
use error::WalletConnectCtxError;
use futures::{channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
lock::Mutex,
Expand All @@ -35,7 +36,7 @@ use std::{sync::Arc, time::Duration};
use wc_common::{decode_and_decrypt_type0, encrypt_and_encode, EnvelopeType};

pub(crate) const SUPPORTED_PROTOCOL: &str = "irn";
const DEFAULT_CHAIN_ID: &str = "cosmoshub-4"; // cosmos.
const DEFAULT_CHAIN_ID: &str = "cosmoshub-4"; // tendermint e.g ATOM

type SessionEventMessage = (MessageId, Value);

Expand All @@ -45,9 +46,12 @@ pub struct WalletConnectCtx {
pub session: Arc<Mutex<Option<Session>>>,
pub active_chain_id: Arc<Mutex<String>>,
pub(crate) key_pair: SymKeyPair,

relay: Relay,
namespaces: ProposeNamespaces,
metadata: Metadata,
namespaces: ProposeNamespaces,
subscriptions: Arc<Mutex<Vec<Topic>>>,

inbound_message_handler: Arc<Mutex<UnboundedReceiver<PublishedMessage>>>,
connection_live_handler: Arc<Mutex<UnboundedReceiver<()>>>,

Expand Down Expand Up @@ -78,7 +82,7 @@ impl WalletConnectCtx {
Self {
client,
pairing,
session: Arc::new(Mutex::new(None)),
session: Default::default(),
active_chain_id: Arc::new(Mutex::new(DEFAULT_CHAIN_ID.to_string())),
relay,
namespaces: required,
Expand All @@ -88,9 +92,28 @@ impl WalletConnectCtx {
connection_live_handler: Arc::new(Mutex::new(conn_live_receiver)),
session_request_handler: Arc::new(Mutex::new(session_request_receiver)),
session_request_sender: Arc::new(Mutex::new(session_request_sender)),
subscriptions: Default::default(),
}
}

pub async fn connect_client(&self) -> MmResult<(), WalletConnectCtxError> {
let auth = {
let key = SigningKey::generate(&mut rand::thread_rng());
AuthToken::new(AUTH_TOKEN_SUB)
.aud(RELAY_ADDRESS)
.ttl(Duration::from_secs(60 * 60))
.as_jwt(&key)
.unwrap()
};
let opts = ConnectionOptions::new(PROJECT_ID, auth).with_address(RELAY_ADDRESS);

self.client.connect(&opts).await?;

info!("WC connected");

Ok(())
}

/// Create a WalletConnect pairing connection url.
pub async fn create_pairing(
&self,
Expand All @@ -99,10 +122,17 @@ impl WalletConnectCtx {
let (topic, url) = self.pairing.create(self.metadata.clone(), None).await?;

info!("Subscribing to topic: {topic:?}");

self.client.subscribe(topic.clone()).await?;

info!("Subscribed to topic: {topic:?}");

send_proposal(self, topic, required_namespaces).await?;
send_proposal(self, topic.clone(), required_namespaces).await?;

{
let mut subs = self.subscriptions.lock().await;
subs.push(topic);
};

Ok(url)
}
Expand All @@ -115,6 +145,11 @@ impl WalletConnectCtx {
self.client.subscribe(topic.clone()).await?;
info!("Subscribed to topic: {topic:?}");

{
let mut subs = self.subscriptions.lock().await;
subs.push(topic.clone());
};

Ok(topic)
}

Expand Down Expand Up @@ -185,24 +220,6 @@ impl WalletConnectCtx {
Ok(accounts[account_index as usize].clone())
}

pub async fn connect_client(&self) -> MmResult<(), WalletConnectCtxError> {
let auth = {
let key = SigningKey::generate(&mut rand::thread_rng());
AuthToken::new(AUTH_TOKEN_SUB)
.aud(RELAY_ADDRESS)
.ttl(Duration::from_secs(60 * 60))
.as_jwt(&key)
.unwrap()
};
let opts = ConnectionOptions::new(PROJECT_ID, auth).with_address(RELAY_ADDRESS);

self.client.connect(&opts).await?;

info!("WC connected");

Ok(())
}

async fn sym_key(&self, topic: &Topic) -> MmResult<Vec<u8>, WalletConnectCtxError> {
{
let session = self.session.lock().await;
Expand Down Expand Up @@ -327,28 +344,45 @@ impl WalletConnectCtx {
info!("Inbound message was handled successfully");
Ok(())
}

pub async fn spawn_connection_live_watcher(self: Arc<Self>) {
let mut recv = self.connection_live_handler.lock().await;
while let Some(_msg) = recv.next().await {
info!("connection disconnected, reconnecting");
if let Err(err) = self.connect_client().await {
common::log::error!("{err:?}");
Timer::sleep(5.).await;
continue;
};
info!("reconnecting success!");
}
}
}
let mut retry_count = 0;

#[async_trait]
pub trait WcCoinOps {
/// Returns the coin's namespace identifier (e.g., "eip155" for Ethereum).
fn chain(&self) -> String;
while let Some(_msg) = recv.next().await {
info!("Connection disconnected. Attempting to reconnect...");

// Try reconnecting
match self.connect_client().await {
Ok(_) => {
info!(
"Successfully reconnected to client after {} attempt(s).",
retry_count + 1
);
retry_count = 0;
},
Err(err) => {
retry_count += 1;
common::log::error!(
"Error while reconnecting to client (attempt {}): {:?}. Retrying in 5 seconds...",
retry_count,
err
);
Timer::sleep(5.).await;
continue;
},
}

/// Returns the list of supported chains for the coin.
fn chain_id(&self) -> Vec<String>;
// Subscribe to existing topics again after reconnecting
let subs = self.subscriptions.lock().await;
for topic in &*subs {
match self.client.subscribe(topic.clone()).await {
Ok(_) => info!("Successfully reconnected to topic: {:?}", topic),
Err(err) => error!("Failed to subscribe to topic: {:?}. Error: {:?}", topic, err),
}
}

/// Returns a boolean indicating whether WalletConnect should be used for this coin.
fn use_walletconnect(&self) -> bool;
info!("Reconnection process complete.");
}
}
}
6 changes: 5 additions & 1 deletion mm2src/kdf_walletconnect/src/session/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub async fn process_proposal_request(
{
let mut old_session = ctx.session.lock().await;
*old_session = Some(session.clone());
let mut subs = ctx.subscriptions.lock().await;
subs.push(session_topic.clone());
}

{
Expand Down Expand Up @@ -110,7 +112,7 @@ pub(crate) async fn process_session_propose_response(

let mut session = Session::new(
ctx,
session_topic,
session_topic.clone(),
subscription_id,
session_key,
pairing_topic.clone(),
Expand All @@ -124,6 +126,8 @@ pub(crate) async fn process_session_propose_response(
{
let mut old_session = ctx.session.lock().await;
*old_session = Some(session);
let mut subs = ctx.subscriptions.lock().await;
subs.push(session_topic.clone());
};

// Activate pairing_topic
Expand Down
4 changes: 3 additions & 1 deletion mm2src/kdf_walletconnect/src/session/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use mm2_err_handle::prelude::MmResult;
use relay_rpc::{domain::{MessageId, Topic},
rpc::params::{session_update::SessionUpdateRequest, ResponseParamsSuccess}};

// TODO: Handle properly when multi chain is supported.
// Hanlding for only cosmos support.
pub(crate) async fn process_session_update_request(
ctx: &WalletConnectCtx,
topic: &Topic,
Expand All @@ -14,7 +16,7 @@ pub(crate) async fn process_session_update_request(
let mut session = ctx.session.lock().await;
if let Some(session) = session.as_mut() {
session.namespaces = update.namespaces.0.clone();
}
};
}

let param = ResponseParamsSuccess::SessionUpdate(true);
Expand Down
Loading

0 comments on commit e606ca4

Please sign in to comment.