diff --git a/mm2src/kdf_walletconnect/src/inbound_message.rs b/mm2src/kdf_walletconnect/src/inbound_message.rs new file mode 100644 index 0000000000..1c783f9cc5 --- /dev/null +++ b/mm2src/kdf_walletconnect/src/inbound_message.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use mm2_err_handle::prelude::{MmError, MmResult}; +use relay_rpc::{domain::Topic, + rpc::{params::ResponseParamsSuccess, Params, Request, Response}}; + +use crate::{error::WalletConnectCtxError, + pairing::{process_pairing_delete_response, process_pairing_extend_response, process_pairing_ping_response}, + WalletConnectCtx}; + +pub(crate) async fn process_inbound_request( + ctx: Arc, + request: Request, + topic: &Topic, +) -> MmResult<(), WalletConnectCtxError> { + let response = match request.params { + Params::SessionPropose(proposal) => { + ctx.session + .process_proposal_request(&ctx, proposal, topic.clone()) + .await + }, + Params::SessionExtend(param) => ctx.session.process_session_extend_request(topic, param).await, + Params::SessionDelete(param) => ctx.session.process_session_delete_request(param), + Params::SessionPing(()) => ctx.session.process_session_ping_request(), + Params::SessionSettle(param) => ctx.session.process_session_settle_request(topic, param).await, + Params::SessionUpdate(param) => ctx.session.process_session_update_request(topic, param).await, + Params::SessionRequest(_) => todo!(), + Params::SessionEvent(_) => todo!(), + + Params::PairingPing(_param) => process_pairing_ping_response().await, + Params::PairingDelete(param) => process_pairing_delete_response(&ctx, topic, param).await, + Params::PairingExtend(param) => process_pairing_extend_response(&ctx, topic, param).await, + _ => todo!(), + }?; + + ctx.publish_response(topic, response.0, response.1, request.id).await?; + + // ctx.session.session_delete_cleanup(ctx.clone(), topic).await? + + Ok(()) +} + +pub(crate) async fn process_inbound_response( + ctx: Arc, + response: Response, + topic: &Topic, +) -> MmResult<(), WalletConnectCtxError> { + match response { + Response::Success(value) => { + let params = serde_json::from_value::(value.result)?; + match params { + ResponseParamsSuccess::SessionPropose(param) => { + ctx.session.handle_session_propose_response(topic, param).await; + Ok(()) + }, + ResponseParamsSuccess::SessionSettle(success) + | ResponseParamsSuccess::SessionUpdate(success) + | ResponseParamsSuccess::SessionExtend(success) + | ResponseParamsSuccess::SessionRequest(success) + | ResponseParamsSuccess::SessionEvent(success) + | ResponseParamsSuccess::SessionDelete(success) + | ResponseParamsSuccess::SessionPing(success) + | ResponseParamsSuccess::PairingExtend(success) + | ResponseParamsSuccess::PairingDelete(success) + | ResponseParamsSuccess::PairingPing(success) => { + if !success { + return MmError::err(WalletConnectCtxError::UnsuccessfulResponse(format!( + "Unsuccessful response={params:?}" + ))); + } + + Ok(()) + }, + } + }, + Response::Error(err) => { + println!("Error: {err:?}"); + todo!() + }, + } +} diff --git a/mm2src/kdf_walletconnect/src/lib.rs b/mm2src/kdf_walletconnect/src/lib.rs index d082098572..00139e3e82 100644 --- a/mm2src/kdf_walletconnect/src/lib.rs +++ b/mm2src/kdf_walletconnect/src/lib.rs @@ -1,18 +1,19 @@ mod error; mod handler; +mod inbound_message; mod pairing; mod session; mod session_key; -use common::log::info; +use common::{executor::Timer, log::info}; use error::WalletConnectCtxError; use futures::{channel::mpsc::{unbounded, UnboundedReceiver}, lock::Mutex, StreamExt}; use handler::Handler; +use inbound_message::{process_inbound_request, process_inbound_response}; use mm2_err_handle::prelude::MmResult; use mm2_err_handle::prelude::*; -use pairing::{process_pairing_delete_response, process_pairing_extend_response, process_pairing_ping_response}; use pairing_api::{Methods, PairingClient}; use rand::rngs::OsRng; use relay_client::{websocket::{Client, PublishedMessage}, @@ -20,8 +21,7 @@ use relay_client::{websocket::{Client, PublishedMessage}, use relay_rpc::rpc::params::RelayProtocolMetadata; use relay_rpc::{auth::{ed25519_dalek::SigningKey, AuthToken}, domain::{MessageId, Topic}, - rpc::{params::{session_propose::SessionProposeRequest, IrnMetadata, Metadata, RequestParams, - ResponseParamsSuccess}, + rpc::{params::{session_propose::SessionProposeRequest, IrnMetadata, Metadata, RequestParams}, Params, Payload, Request, Response, SuccessfulResponse, JSON_RPC_VERSION_STR}}; use session::{Session, SessionInfo, SessionUserType, APP_DESCRIPTION, APP_NAME}; use session_key::SessionKey; @@ -86,13 +86,11 @@ impl WalletConnectCtx { let opts = ConnectionOptions::new(PROJECT_ID, auth).with_address(RELAY_ADDRESS); self.client.connect(&opts).await?; - // let is_connected = self.client.; info!("WC connected"); Ok(()) } - // todo: return slice async fn sym_key(&self, topic: &Topic) -> MmResult, WalletConnectCtxError> { { let sessions = self.session.lock().await; @@ -126,6 +124,7 @@ impl WalletConnectCtx { .iter() .map(|m| m.to_string()) .collect::>()]); + let (topic, url) = self .pairing .create(metadata.clone(), Some(methods), &self.client) @@ -155,7 +154,6 @@ impl WalletConnectCtx { required_namespaces: session.namespaces.clone(), }); - // Store the session information in your session manager (self.sessions or similar) { let mut sessions = self.session.lock().await; sessions.insert(session_topic.clone(), session); @@ -249,7 +247,7 @@ impl WalletConnectCtx { decode_and_decrypt_type0(msg.message.as_bytes(), &key).unwrap() }; - println!("\nInbound message payload={message}"); + info!("\nInbound message payload={message}"); let response = serde_json::from_str::(&message).unwrap(); let result = match response { @@ -270,83 +268,10 @@ impl WalletConnectCtx { info!("connection disconnected, reconnecting"); if let Err(err) = self.connect_client().await { common::log::error!("{err:?}"); + Timer::sleep(5.).await; continue; }; info!("reconnecting success!"); } } } - -async fn process_inbound_request( - ctx: Arc, - request: Request, - topic: &Topic, -) -> MmResult<(), WalletConnectCtxError> { - let response = match request.params { - Params::SessionPropose(proposal) => { - ctx.session - .process_proposal_request(&ctx, proposal, topic.clone()) - .await - }, - Params::SessionExtend(param) => ctx.session.process_session_extend_request(topic, param).await, - Params::SessionDelete(param) => ctx.session.process_session_delete_request(param), - Params::SessionPing(()) => ctx.session.process_session_ping_request(), - Params::SessionSettle(param) => ctx.session.process_session_settle_request(topic, param).await, - Params::SessionUpdate(param) => ctx.session.process_session_update_request(topic, param).await, - Params::SessionRequest(_) => todo!(), - Params::SessionEvent(_) => todo!(), - - Params::PairingPing(_param) => process_pairing_ping_response().await, - Params::PairingDelete(param) => process_pairing_delete_response(&ctx, topic, param).await, - Params::PairingExtend(param) => process_pairing_extend_response(&ctx, topic, param).await, - _ => todo!(), - }?; - - info!("Publishing reponse"); - ctx.publish_response(topic, response.0, response.1, request.id).await?; - - // todo - // ctx.session.session_delete_cleanup(ctx.clone(), topic).await? - - Ok(()) -} - -async fn process_inbound_response( - ctx: Arc, - response: Response, - topic: &Topic, -) -> MmResult<(), WalletConnectCtxError> { - match response { - Response::Success(value) => { - let params = serde_json::from_value::(value.result)?; - match params { - ResponseParamsSuccess::SessionPropose(param) => { - ctx.session.handle_session_propose_response(topic, param).await; - Ok(()) - }, - ResponseParamsSuccess::SessionSettle(success) - | ResponseParamsSuccess::SessionUpdate(success) - | ResponseParamsSuccess::SessionExtend(success) - | ResponseParamsSuccess::SessionRequest(success) - | ResponseParamsSuccess::SessionEvent(success) - | ResponseParamsSuccess::SessionDelete(success) - | ResponseParamsSuccess::SessionPing(success) - | ResponseParamsSuccess::PairingExtend(success) - | ResponseParamsSuccess::PairingDelete(success) - | ResponseParamsSuccess::PairingPing(success) => { - if !success { - return MmError::err(WalletConnectCtxError::UnsuccessfulResponse(format!( - "Unsuccessful response={params:?}" - ))); - } - - Ok(()) - }, - } - }, - Response::Error(err) => { - println!("Error: {err:?}"); - todo!() - }, - } -} diff --git a/mm2src/kdf_walletconnect/src/session.rs b/mm2src/kdf_walletconnect/src/session.rs index fb96c65acc..174e858b9b 100644 --- a/mm2src/kdf_walletconnect/src/session.rs +++ b/mm2src/kdf_walletconnect/src/session.rs @@ -21,6 +21,8 @@ use std::{collections::BTreeMap, sync::Arc}; pub(crate) const APP_NAME: &str = "Komodefi Framework"; pub(crate) const APP_DESCRIPTION: &str = "WallectConnect Komodefi Framework Playground"; +const FIVE_MINUTES: u64 = 300; +const THIRTY_DAYS: u64 = 60 * 60 * 30; pub(crate) type WcRequestResult = MmResult<(Value, IrnMetadata), WalletConnectCtxError>; @@ -69,13 +71,12 @@ impl SessionInfo { events: SUPPORTED_EVENTS.iter().map(|e| e.to_string()).collect(), }); - // Initialize relay let relay = Relay { protocol: SUPPORTED_PROTOCOL.to_string(), data: None, }; - // Conditional logic to handle proposer or controller + // handle proposer or controller let (proposer, controller) = match session_type { SessionUserType::Proposer => ( Proposer { @@ -98,7 +99,7 @@ impl SessionInfo { namespaces: ProposeNamespaces(namespaces), settled_namespaces: SettleNamespaces(settled_namespaces), relay, - expiry: Utc::now().timestamp() as u64 + 300, + expiry: Utc::now().timestamp() as u64 + FIVE_MINUTES, pairing_topic, session_type, } @@ -111,7 +112,7 @@ impl SessionInfo { relay: self.relay.clone(), controller: self.controller.clone(), namespaces: self.supported_settle_namespaces().clone(), - expiry: Utc::now().timestamp() as u64 + 300, // 5 min TTL + expiry: Utc::now().timestamp() as u64 + FIVE_MINUTES, }) } fn create_proposal_response(&self) -> Result<(Value, IrnMetadata), WalletConnectCtxError> { @@ -164,6 +165,7 @@ impl Session { info!("session found!"); session.proposer.public_key = response.responder_public_key; session.relay = response.relay; + session.expiry = Utc::now().timestamp() + THIRTY_DAYS; }; } diff --git a/mm2src/mm2_test_helpers/src/for_tests.rs b/mm2src/mm2_test_helpers/src/for_tests.rs index 57084a72e2..56fa519ccc 100644 --- a/mm2src/mm2_test_helpers/src/for_tests.rs +++ b/mm2src/mm2_test_helpers/src/for_tests.rs @@ -853,9 +853,7 @@ pub fn nft_dev_conf() -> Json { }) } -fn set_chain_id(conf: &mut Json, chain_id: u64) { - conf["chain_id"] = json!(chain_id); -} +fn set_chain_id(conf: &mut Json, chain_id: u64) { conf["chain_id"] = json!(chain_id); } pub fn eth_sepolia_conf() -> Json { json!({ @@ -2903,7 +2901,10 @@ pub async fn enable_tendermint( tx_history: bool, ) -> Json { let ibc_requests: Vec<_> = ibc_assets.iter().map(|ticker| json!({ "ticker": ticker })).collect(); - let nodes: Vec = rpc_urls.iter().map(|u| json!({"url": u, "komodo_proxy": false })).collect(); + let nodes: Vec = rpc_urls + .iter() + .map(|u| json!({"url": u, "komodo_proxy": false })) + .collect(); let request = json!({ "userpass": mm.userpass, @@ -2940,7 +2941,10 @@ pub async fn enable_tendermint_without_balance( tx_history: bool, ) -> Json { let ibc_requests: Vec<_> = ibc_assets.iter().map(|ticker| json!({ "ticker": ticker })).collect(); - let nodes: Vec = rpc_urls.iter().map(|u| json!({"url": u, "komodo_proxy": false })).collect(); + let nodes: Vec = rpc_urls + .iter() + .map(|u| json!({"url": u, "komodo_proxy": false })) + .collect(); let request = json!({ "userpass": mm.userpass,