Skip to content

Commit

Permalink
save dev state - handle connection close, session improvements, etc
Browse files Browse the repository at this point in the history
  • Loading branch information
borngraced committed Sep 12, 2024
1 parent 588316f commit 60a2fc5
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 40 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mm2src/coins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,5 @@ wagyu-zcash-parameters = { version = "0.2" }
[build-dependencies]
prost-build = { version = "0.11", default-features = false }
tonic-build = { version = "0.9", default-features = false, features = ["prost"] }


3 changes: 2 additions & 1 deletion mm2src/kdf_walletconnect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ relay_rpc = { path = "../../../kdf-wc/relay_rpc" }
sha2 = "0.10.8"
thiserror = "1.0.40"
wc_common = { path = "../../../kdf-wc/wc_common" }
x25519-dalek = { version = "2.0", features = ["static_secrets"] }
secp256k1 = { version = "0.20" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1", features = ["preserve_order", "raw_value"] }
x25519-dalek = { version = "2.0", features = ["static_secrets"] }
2 changes: 1 addition & 1 deletion mm2src/kdf_walletconnect/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub enum WalletConnectCtxError {
SubscriptionError(String),
InternalError(String),
SerdeError(String),
UnsuccessfulResponse(String)
UnsuccessfulResponse(String),
}

impl From<PairingClientError> for WalletConnectCtxError {
Expand Down
21 changes: 18 additions & 3 deletions mm2src/kdf_walletconnect/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,22 @@ use relay_client::{error::ClientError,

pub struct Handler {
name: &'static str,
sender: UnboundedSender<PublishedMessage>,
msg_sender: UnboundedSender<PublishedMessage>,
conn_live_sender: UnboundedSender<()>,
}

impl Handler {
pub fn new(name: &'static str, sender: UnboundedSender<PublishedMessage>) -> Self { Self { name, sender } }
pub fn new(
name: &'static str,
msg_sender: UnboundedSender<PublishedMessage>,
conn_live_sender: UnboundedSender<()>,
) -> Self {
Self {
name,
msg_sender,
conn_live_sender,
}
}
}

impl ConnectionHandler for Handler {
Expand All @@ -19,6 +30,10 @@ impl ConnectionHandler for Handler {

fn disconnected(&mut self, frame: Option<CloseFrame<'static>>) {
info!("\n[{}] connection closed: frame={frame:?}", self.name);

if let Err(e) = self.conn_live_sender.start_send(()) {
info!("\n[{}] failed to send the to the receiver: {e}", self.name);
}
}

fn message_received(&mut self, message: PublishedMessage) {
Expand All @@ -27,7 +42,7 @@ impl ConnectionHandler for Handler {
self.name, message.message_id, message.topic, message.tag, message.message,
);

if let Err(e) = self.sender.start_send(message) {
if let Err(e) = self.msg_sender.start_send(message) {
info!("\n[{}] failed to send the to the receiver: {e}", self.name);
}
}
Expand Down
103 changes: 86 additions & 17 deletions mm2src/kdf_walletconnect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ use mm2_err_handle::prelude::MmResult;
use mm2_err_handle::prelude::*;
use pairing::{process_pairing_delete_response, process_pairing_extend_response, process_pairing_ping_response};
use pairing_api::{Methods, PairingClient};
use rand::rngs::OsRng;
use relay_client::{websocket::{Client, PublishedMessage},
ConnectionOptions, MessageIdGenerator};
use relay_rpc::rpc::params::RelayProtocolMetadata;
use relay_rpc::{auth::{ed25519_dalek::SigningKey, AuthToken},
domain::{MessageId, Topic},
rpc::{params::{IrnMetadata, Metadata, ResponseParamsSuccess},
rpc::{params::{session_propose::SessionProposeRequest, IrnMetadata, Metadata, RequestParams,
ResponseParamsSuccess},
Params, Payload, Request, Response, SuccessfulResponse, JSON_RPC_VERSION_STR}};
use session::{Session, APP_DESCRIPTION, APP_NAME};
use session::{Session, SessionInfo, SessionUserType, APP_DESCRIPTION, APP_NAME};
use session_key::SessionKey;
use std::{sync::Arc, time::Duration};
use wc_common::{decode_and_decrypt_type0, encrypt_and_encode, EnvelopeType};

Expand All @@ -45,7 +49,8 @@ pub struct WalletConnectCtx {
pub client: Client,
pub pairing: PairingClient,
pub session: Session,
pub handler: Arc<Mutex<UnboundedReceiver<PublishedMessage>>>,
pub msg_handler: Arc<Mutex<UnboundedReceiver<PublishedMessage>>>,
pub connection_live_handler: Arc<Mutex<UnboundedReceiver<()>>>,
}

impl Default for WalletConnectCtx {
Expand All @@ -55,15 +60,17 @@ impl Default for WalletConnectCtx {
impl WalletConnectCtx {
pub fn new() -> Self {
let (msg_sender, msg_receiver) = unbounded();
let (conn_live_sender, conn_live_receiver) = unbounded();

let pairing = PairingClient::new();
let client = Client::new(Handler::new("Komodefi", msg_sender));
let client = Client::new(Handler::new("Komodefi", msg_sender, conn_live_sender));

Self {
client,
pairing,
session: Session::new(),
handler: Arc::new(Mutex::new(msg_receiver)),
msg_handler: Arc::new(Mutex::new(msg_receiver)),
connection_live_handler: Arc::new(Mutex::new(conn_live_receiver)),
}
}

Expand All @@ -79,14 +86,14 @@ impl WalletConnectCtx {
let opts = ConnectionOptions::new(PROJECT_ID, auth).with_address(RELAY_ADDRESS);
self.client.connect(&opts).await?;

// let is_connected = self.client.;
info!("WC connected");

Ok(())
}

// todo: return slice
async fn sym_key(&self, topic: &Topic) -> MmResult<Vec<u8>, WalletConnectCtxError> {
println!("sym topic: {topic:?}");
{
let sessions = self.session.lock().await;
if let Some(sesssion) = sessions.get(topic) {
Expand All @@ -103,10 +110,12 @@ impl WalletConnectCtx {
}
}

MmError::err(WalletConnectCtxError::PairingNotFound("Topic not found".to_owned()))
MmError::err(WalletConnectCtxError::PairingNotFound(format!(
"Topic not found:{topic}"
)))
}

pub async fn create_pairing(&self) -> MmResult<(Topic, String), WalletConnectCtxError> {
pub async fn create_pairing(&self) -> MmResult<String, WalletConnectCtxError> {
let metadata = Metadata {
description: APP_DESCRIPTION.to_owned(),
url: "127.0.0.1:3000".to_owned(),
Expand All @@ -117,8 +126,47 @@ impl WalletConnectCtx {
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>()]);
let (topic, url) = self
.pairing
.create(metadata.clone(), Some(methods), &self.client)
.await?;

let signing_key = SigningKey::generate(&mut OsRng);
let public_key = signing_key.verifying_key();
let session_key = SessionKey::from_osrng(public_key.as_bytes())
.map_to_mm(|err| WalletConnectCtxError::EncodeError(err.to_string()))?;
let session_topic: Topic = session_key.generate_topic().into();
let subscription_id = self
.client
.subscribe(session_topic.clone())
.await
.map_to_mm(|err| WalletConnectCtxError::SubscriptionError(err.to_string()))?;
let session = SessionInfo::new(
subscription_id,
session_key,
topic.clone(),
metadata,
SessionUserType::Proposer,
);

let session_proposal = RequestParams::SessionPropose(SessionProposeRequest {
relays: vec![session.relay.clone()],
proposer: session.proposer.clone(),
required_namespaces: session.namespaces.clone(),
});

// Store the session information in your session manager (self.sessions or similar)
{
let mut sessions = self.session.lock().await;
sessions.insert(session_topic.clone(), session);
}

let irn_metadata = session_proposal.irn_metadata();
self.publish_request(&topic, session_proposal.into(), irn_metadata)
.await?;

Ok(self.pairing.create(metadata, Some(methods), &self.client).await?)
let clean_url = url.replace("&amp;", "&");
Ok(clean_url)
}

pub async fn connect_to_pairing(&self, url: &str, activate: bool) -> MmResult<Topic, WalletConnectCtxError> {
Expand Down Expand Up @@ -194,7 +242,7 @@ impl WalletConnectCtx {
}

pub async fn published_message_event_loop(self: Arc<Self>) {
let mut recv = self.handler.lock().await;
let mut recv = self.msg_handler.lock().await;
while let Some(msg) = recv.next().await {
let message = {
let key = self.sym_key(&msg.topic).await.unwrap();
Expand All @@ -206,7 +254,7 @@ impl WalletConnectCtx {
let response = serde_json::from_str::<Payload>(&message).unwrap();
let result = match response {
Payload::Request(request) => process_inbound_request(self.clone(), request, &msg.topic).await,
Payload::Response(response) => process_inbound_response(self.clone(), response).await,
Payload::Response(response) => process_inbound_response(self.clone(), response, &msg.topic).await,
};

match result {
Expand All @@ -215,6 +263,18 @@ impl WalletConnectCtx {
};
}
}

pub async fn spawn_connection_live_watcher(self: Arc<Self>) {
let mut recv = self.connection_live_handler.lock().await;
while let Some(_msg) = recv.next().await {
info!("connection disconnected, reconnecting");
if let Err(err) = self.connect_client().await {
common::log::error!("{err:?}");
continue;
};
info!("reconnecting success!");
}
}
}

async fn process_inbound_request(
Expand All @@ -223,7 +283,11 @@ async fn process_inbound_request(
topic: &Topic,
) -> MmResult<(), WalletConnectCtxError> {
let response = match request.params {
Params::SessionPropose(proposal) => ctx.session.process_proposal_request(&ctx, proposal).await,
Params::SessionPropose(proposal) => {
ctx.session
.process_proposal_request(&ctx, proposal, topic.clone())
.await
},
Params::SessionExtend(param) => ctx.session.process_session_extend_request(topic, param).await,
Params::SessionDelete(param) => ctx.session.process_session_delete_request(param),
Params::SessionPing(()) => ctx.session.process_session_ping_request(),
Expand All @@ -248,18 +312,21 @@ async fn process_inbound_request(
}

async fn process_inbound_response(
_ctx: Arc<WalletConnectCtx>,
ctx: Arc<WalletConnectCtx>,
response: Response,
topic: &Topic,
) -> MmResult<(), WalletConnectCtxError> {
match response {
Response::Success(value) => {
let params = serde_json::from_value::<ResponseParamsSuccess>(value.result)?;
match params {
ResponseParamsSuccess::SessionPropose(param) => {
info!("Session Propose Response: {param:?}");
todo!()
ctx.session.handle_session_propose_response(topic, param).await;
Ok(())
},
ResponseParamsSuccess::SessionSettle(success) | ResponseParamsSuccess::SessionUpdate(success) |ResponseParamsSuccess::SessionExtend(success)
ResponseParamsSuccess::SessionSettle(success)
| ResponseParamsSuccess::SessionUpdate(success)
| ResponseParamsSuccess::SessionExtend(success)
| ResponseParamsSuccess::SessionRequest(success)
| ResponseParamsSuccess::SessionEvent(success)
| ResponseParamsSuccess::SessionDelete(success)
Expand All @@ -268,7 +335,9 @@ async fn process_inbound_response(
| ResponseParamsSuccess::PairingDelete(success)
| ResponseParamsSuccess::PairingPing(success) => {
if !success {
return MmError::err(WalletConnectCtxError::UnsuccessfulResponse(format!("Unsuccessful response={params:?}")));
return MmError::err(WalletConnectCtxError::UnsuccessfulResponse(format!(
"Unsuccessful response={params:?}"
)));
}

Ok(())
Expand Down
Loading

0 comments on commit 60a2fc5

Please sign in to comment.