Skip to content
This repository was archived by the owner on Feb 3, 2025. It is now read-only.

Commit cbc78f5

Browse files
Merge pull request #41 from BitcoinDevShop/connection
Connection to another node
2 parents e2e2636 + f8ed3db commit cbc78f5

File tree

9 files changed

+101
-53
lines changed

9 files changed

+101
-53
lines changed

.github/workflows/preview.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
3636
- name: Build wasm
3737
working-directory: ./node-manager
38-
run: wasm-pack build --target web
38+
run: wasm-pack build --release --target web
3939

4040
- name: Use Node.js 18.x
4141
uses: actions/setup-node@v3

.github/workflows/production.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
3636
- name: Build wasm
3737
working-directory: ./node-manager
38-
run: wasm-pack build --target web
38+
run: wasm-pack build --release --target web
3939

4040
- name: Use Node.js 18.x
4141
uses: actions/setup-node@v3

frontend/src/routes/App.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ function App() {
4444
<header className='p-8'>
4545
<img src={logo} className="App-logo" alt="logo" />
4646
<h2>You're probably looking for <a href="/tests">the tests</a></h2>
47+
<p>View the <a href="https://github.com/BitcoinDevShop/mutiny-web-poc">source</a></p>
4748
{wasmSupported ? <p>WASM works!</p> :
4849
<p>
4950
WASM does not seem supported in your browser, this might not work for you!

frontend/src/routes/KitchenSink.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ function App() {
196196
{newPubkey &&
197197
<form onSubmit={connect_peer} className="flex flex-col items-start gap-4 my-4">
198198
<h2>Connect Peer:</h2>
199+
<p>You may want to use "wss://websocket-tcp-proxy-fywbx.ondigitalocean.app" as the example websocket proxy</p>
199200
<input type="text" placeholder='Websocket Proxy Address' onChange={handleProxyAddressChange}></input>
200201
<input type="text" placeholder='Peer Connection String' onChange={handleConnectPeerChange}></input>
201202
<input type="submit" value="Connect" />

justfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
dev:
22
cd ./frontend && npm run start-ssl
33
pack:
4-
wasm-pack build ./node-manager --target web
4+
wasm-pack build ./node-manager --dev --target web
55

66
pack-mac:
7-
AR=/opt/homebrew/opt/llvm/bin/llvm-ar CC=/opt/homebrew/opt/llvm/bin/clang wasm-pack build ./node-manager --target web
7+
AR=/opt/homebrew/opt/llvm/bin/llvm-ar CC=/opt/homebrew/opt/llvm/bin/clang wasm-pack build ./node-manager --dev --target web
88

99
test:
1010
cargo test --package websocket-tcp-proxy

node-manager/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ wasm-bindgen-test = "0.3.13"
5757
web-sys = { version = "0.3.60", features = ["console"] }
5858

5959
[features]
60-
# default = ["console_error_panic_hook" ,"wee_alloc"]
61-
default = ["console_error_panic_hook"]
60+
default = ["console_error_panic_hook" ,"wee_alloc"]
6261

6362
[package.metadata.wasm-pack.profile.release]
64-
wasm-opt = false
63+
wasm-opt = true

node-manager/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use lightning::ln::peer_handler::PeerHandleError;
12
use thiserror::Error;
23
use wasm_bindgen::JsValue;
34

@@ -91,6 +92,13 @@ impl From<bdk::Error> for MutinyError {
9192
}
9293
}
9394

95+
impl From<PeerHandleError> for MutinyError {
96+
fn from(_e: PeerHandleError) -> Self {
97+
// TODO handle the case where `no_connection_possible`
98+
Self::ConnectionFailed
99+
}
100+
}
101+
94102
impl From<MutinyStorageError> for bdk::Error {
95103
fn from(e: MutinyStorageError) -> Self {
96104
match e {

node-manager/src/node.rs

Lines changed: 74 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@ use crate::localstorage::MutinyBrowserStorage;
33
use crate::wallet::{esplora_from_network, MutinyWallet};
44
use bdk::blockchain::EsploraBlockchain;
55
use bitcoin::Network;
6+
use futures::StreamExt;
7+
use gloo_net::websocket::Message;
68
use lightning::chain::{chainmonitor, Filter};
9+
use lightning::ln::msgs::NetAddress;
710
use std::net::{SocketAddr, ToSocketAddrs};
811
use std::str::FromStr;
912
use std::sync::Arc;
13+
use wasm_bindgen_futures::spawn_local;
1014

1115
use crate::tcpproxy::{SocketDescriptor, TcpProxy};
1216
use crate::{
@@ -21,10 +25,10 @@ use bitcoin::secp256k1::PublicKey;
2125
use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Recipient};
2226
use lightning::ln::peer_handler::{
2327
ErroringMessageHandler, IgnoringMessageHandler, MessageHandler as LdkMessageHandler,
24-
PeerManager as LdkPeerManager,
28+
PeerManager as LdkPeerManager, SocketDescriptor as LdkSocketDescriptor,
2529
};
2630
use lightning::routing::gossip;
27-
use log::info;
31+
use log::{debug, error, info, warn};
2832

2933
pub(crate) type NetworkGraph = gossip::NetworkGraph<Arc<MutinyLogger>>;
3034

@@ -138,9 +142,14 @@ impl Node {
138142
}
139143
};
140144

141-
if connect_peer_if_necessary(websocket_proxy_addr, pubkey, peer_addr)
142-
.await
143-
.is_err()
145+
if connect_peer_if_necessary(
146+
websocket_proxy_addr,
147+
pubkey,
148+
peer_addr,
149+
self.peer_manager.clone(),
150+
)
151+
.await
152+
.is_err()
144153
{
145154
Err(MutinyError::PeerInfoParseFailed)
146155
.with_context(|| format!("could not connect to peer: {pubkey}"))?
@@ -152,10 +161,10 @@ impl Node {
152161

153162
pub(crate) async fn connect_peer_if_necessary(
154163
websocket_proxy_addr: String,
155-
_pubkey: PublicKey,
164+
pubkey: PublicKey,
156165
peer_addr: SocketAddr,
157-
// peer_manager: Arc<PeerManager>,
158-
) -> Result<(), ()> {
166+
peer_manager: Arc<PeerManager>,
167+
) -> Result<(), MutinyError> {
159168
// TODO add this when the peer manager is ready
160169
/*
161170
for node_pubkey in peer_manager.get_peer_node_ids() {
@@ -166,18 +175,68 @@ pub(crate) async fn connect_peer_if_necessary(
166175
*/
167176

168177
// first make a connection to the node
169-
let tcp_proxy = TcpProxy::new(websocket_proxy_addr, peer_addr).await;
170-
171-
// TODO remove the test send
172-
tcp_proxy.send(String::from("test\n").into_bytes().to_vec());
173-
174-
// TODO then give that connection to the peer manager
178+
let tcp_proxy = Arc::new(TcpProxy::new(websocket_proxy_addr, peer_addr).await);
179+
let mut descriptor = SocketDescriptor::new(tcp_proxy);
180+
181+
// then give that connection to the peer manager
182+
let initial_bytes = peer_manager.new_outbound_connection(
183+
pubkey,
184+
descriptor.clone(),
185+
Some(get_net_addr_from_socket(peer_addr)),
186+
)?;
187+
188+
let sent_bytes = descriptor.send_data(&initial_bytes, true);
189+
debug!("sent {sent_bytes} to node: {pubkey}");
190+
191+
// schedule a reader on the connection
192+
let mut new_descriptor = descriptor.clone();
193+
spawn_local(async move {
194+
while let Some(msg) = descriptor.conn.read.lock().await.next().await {
195+
if let Ok(msg_contents) = msg {
196+
match msg_contents {
197+
Message::Text(t) => {
198+
warn!(
199+
"received text from websocket when we should only receive binary: {}",
200+
t
201+
)
202+
}
203+
Message::Bytes(b) => {
204+
debug!("received binary data from websocket");
205+
206+
let read_res = peer_manager.read_event(&mut new_descriptor, &b);
207+
match read_res {
208+
// TODO handle read boolean event
209+
Ok(_read_bool) => {
210+
debug!("read event from the node");
211+
peer_manager.process_events();
212+
}
213+
Err(e) => error!("got an error reading event: {}", e),
214+
}
215+
}
216+
};
217+
}
218+
}
175219

176-
// TODO then schedule a reader on the connection
220+
// TODO when we detect an error, lock the writes and close connection.
221+
debug!("WebSocket Closed")
222+
});
177223

178224
Ok(())
179225
}
180226

227+
fn get_net_addr_from_socket(socket_addr: SocketAddr) -> NetAddress {
228+
match socket_addr {
229+
SocketAddr::V4(sockaddr) => NetAddress::IPv4 {
230+
addr: sockaddr.ip().octets(),
231+
port: sockaddr.port(),
232+
},
233+
SocketAddr::V6(sockaddr) => NetAddress::IPv6 {
234+
addr: sockaddr.ip().octets(),
235+
port: sockaddr.port(),
236+
},
237+
}
238+
}
239+
181240
pub(crate) fn create_peer_manager(
182241
km: Arc<KeysManager>,
183242
lightning_msg_handler: MessageHandler,

node-manager/src/tcpproxy.rs

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,28 @@ use std::hash::Hash;
22
use std::sync::atomic::{AtomicU64, Ordering};
33
use std::{net::SocketAddr, sync::Arc};
44

5+
use futures::stream::SplitStream;
56
use futures::{lock::Mutex, stream::SplitSink, SinkExt, StreamExt};
67
use gloo_net::websocket::{futures::WebSocket, Message};
78
use lightning::ln::peer_handler;
89
use log::{debug, info};
910
use wasm_bindgen_futures::spawn_local;
1011

1112
pub struct TcpProxy {
12-
connection: WsSplit,
13+
pub write: WsSplit,
14+
pub read: ReadSplit,
1315
}
1416

1517
type WsSplit = Arc<Mutex<SplitSink<WebSocket, Message>>>;
18+
type ReadSplit = Arc<Mutex<SplitStream<WebSocket>>>;
1619

1720
impl TcpProxy {
1821
pub async fn new(proxy_url: String, peer_addr: SocketAddr) -> Self {
1922
let ws = WebSocket::open(String::as_str(&proxy_to_url(proxy_url, peer_addr))).unwrap();
20-
let (write, mut read) = ws.split();
21-
22-
spawn_local(async move {
23-
// TODO callback or pass bytes over to some stream reader.
24-
// need to figure out how LDK wants this incoming data.
25-
while let Some(msg) = read.next().await {
26-
if let Ok(msg_contents) = msg {
27-
match msg_contents {
28-
Message::Text(t) => {
29-
info!("receive text from websocket {}", t)
30-
}
31-
Message::Bytes(b) => {
32-
info!(
33-
"receive binary from websocket {}",
34-
String::from_utf8_lossy(&b)
35-
)
36-
}
37-
};
38-
}
39-
}
40-
41-
// TODO when we detect an error, lock the writes and close connection.
42-
debug!("WebSocket Closed")
43-
});
44-
23+
let (write, read) = ws.split();
4524
TcpProxy {
46-
connection: Arc::new(Mutex::new(write)),
25+
write: Arc::new(Mutex::new(write)),
26+
read: Arc::new(Mutex::new(read)),
4727
}
4828
}
4929

@@ -53,7 +33,7 @@ impl TcpProxy {
5333
// There can only be one sender at a time
5434
// Cannot send and write at the same time either
5535
// TODO check if the connection is closed before trying to send.
56-
let cloned_conn = self.connection.clone();
36+
let cloned_conn = self.write.clone();
5737
spawn_local(async move {
5838
let mut write = cloned_conn.lock().await;
5939
write.send(Message::Bytes(data)).await.unwrap();
@@ -73,11 +53,11 @@ fn proxy_to_url(proxy_url: String, peer_addr: SocketAddr) -> String {
7353
static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
7454

7555
pub struct SocketDescriptor {
76-
conn: Arc<TcpProxy>,
56+
pub conn: Arc<TcpProxy>,
7757
id: u64,
7858
}
7959
impl SocketDescriptor {
80-
fn new(conn: Arc<TcpProxy>) -> Self {
60+
pub fn new(conn: Arc<TcpProxy>) -> Self {
8161
let id = ID_COUNTER.fetch_add(1, Ordering::AcqRel);
8262
Self { conn, id }
8363
}
@@ -90,7 +70,7 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
9070
}
9171

9272
fn disconnect_socket(&mut self) {
93-
let cloned = self.conn.connection.clone();
73+
let cloned = self.conn.write.clone();
9474
spawn_local(async move {
9575
let mut conn = cloned.lock().await;
9676
let _ = conn.close();

0 commit comments

Comments
 (0)