diff --git a/Cargo.lock b/Cargo.lock index 30a01aa784a..f273ee7fbd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2196,10 +2196,15 @@ dependencies = [ "bincode", "bytecodec", "bytes", + "clap 4.5.20", "dashmap", "dirs", + "futures", + "nym-bin-common", + "nym-crypto", "nym-sdk", "serde", + "tempfile", "tokio", "tokio-stream", "tokio-util", @@ -4061,6 +4066,30 @@ dependencies = [ "wasm-utils", ] +[[package]] +name = "mixnet-check-all-gateways" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "clap 4.5.20", + "dirs", + "echo-server", + "futures", + "nym-bin-common", + "nym-crypto", + "nym-sdk", + "reqwest 0.12.4", + "serde", + "serde_json", + "tempfile", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "mixnet-connectivity-check" version = "0.1.0" @@ -5310,7 +5339,7 @@ dependencies = [ [[package]] name = "nym-ffi-shared" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", "bs58", @@ -5471,7 +5500,7 @@ dependencies = [ [[package]] name = "nym-go-ffi" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", "lazy_static", @@ -6149,6 +6178,7 @@ dependencies = [ "reqwest 0.12.4", "serde", "tap", + "tempfile", "thiserror", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 122784eb5a8..a0b0fe957d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,7 +128,7 @@ members = [ "nym-node-status-api/nym-node-status-client", "nym-outfox", "nym-validator-rewarder", - "tools/echo-server", + # "tools/echo-server", "tools/internal/ssl-inject", # "tools/internal/sdk-version-bump", "tools/internal/testnet-manager", @@ -149,7 +149,7 @@ members = [ "tools/internal/testnet-manager", "tools/internal/testnet-manager/dkg-bypass-contract", "common/verloc", - "tools/internal/mixnet-connectivity-check", + "tools/internal/mixnet-connectivity-check", "tools/internal/mixnet-check-all-gateways", ] default-members = [ diff --git a/documentation/docs/pages/developers/rust/_meta.json b/documentation/docs/pages/developers/rust/_meta.json index 3fca1da31fc..d193319720a 100644 --- a/documentation/docs/pages/developers/rust/_meta.json +++ b/documentation/docs/pages/developers/rust/_meta.json @@ -3,6 +3,7 @@ "development-status": "Development Status", "mixnet": "Mixnet Module", "tcpproxy": "TcpProxy Module", + "client-pool": "Client Pool", "ffi": "FFI", "tutorials": "Tutorials (Coming Soon)" } diff --git a/documentation/docs/pages/developers/rust/client-pool.mdx b/documentation/docs/pages/developers/rust/client-pool.mdx new file mode 100644 index 00000000000..584b642f719 --- /dev/null +++ b/documentation/docs/pages/developers/rust/client-pool.mdx @@ -0,0 +1,7 @@ +# Client Pool + +We have a configurable-size Client Pool for processes that require multiple clients in quick succession (this is used by default by the [`TcpProxyClient`](./tcpproxy), for instance). + +This will be useful for developers looking to build connection logic, or who are just using raw SDK clients in a situation where there are multiple connections with a lot of churn.
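To make the pattern concrete, here is a minimal sketch of the pool lifecycle, condensed from the `client_pool.rs` example added later in this diff. It assumes the `nym-sdk` and `tokio` dependencies used there, and omits the ephemeral-client fallback for brevity:

```rust
use anyhow::Result;
use nym_sdk::client_pool::ClientPool;

#[tokio::main]
async fn main() -> Result<()> {
    // Keep 2 clients in reserve; start() runs the replenishing loop until cancelled
    let pool = ClientPool::new(2);
    let pool_handle = pool.clone();
    tokio::spawn(async move { pool_handle.start().await });

    // Give the background loop a moment to fill the pool
    tokio::time::sleep(std::time::Duration::from_secs(10)).await;

    // Pop a ready-made client off the pool, use it, then disconnect it
    if let Some(client) = pool.get_mixnet_client().await {
        println!("Grabbed client {}", client.nym_address());
        client.disconnect().await;
    }

    // Stop the loop and disconnect any clients still held in reserve
    pool.disconnect_pool().await;
    Ok(())
}
```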
+ +> You can find this code [here](https://github.com/nymtech/nym/blob/develop/sdk/rust/nym-sdk/examples/client_pool.rs) diff --git a/documentation/docs/pages/developers/rust/client-pool/_meta.json b/documentation/docs/pages/developers/rust/client-pool/_meta.json new file mode 100644 index 00000000000..3d6dc8b87b3 --- /dev/null +++ b/documentation/docs/pages/developers/rust/client-pool/_meta.json @@ -0,0 +1,4 @@ +{ + "architecture": "Architecture", + "example": "Example" +} diff --git a/documentation/docs/pages/developers/rust/client-pool/architecture.mdx b/documentation/docs/pages/developers/rust/client-pool/architecture.mdx new file mode 100644 index 00000000000..4159796c1e3 --- /dev/null +++ b/documentation/docs/pages/developers/rust/client-pool/architecture.mdx @@ -0,0 +1,19 @@ +# Client Pool Architecture + +## Motivations +In situations where multiple connections are expected, and the number of connections can vary greatly, the Client Pool removes the time spent waiting for a Mixnet Client to be created, which would otherwise block your code from sending traffic through the Mixnet. Instead, a configurable number of Clients can be generated and run in the background, ready to be very quickly grabbed, used, and disconnected. + +The Pool can simply be run as a background process for the runtime of your program. + +## Clients & Lifetimes +The Client Pool creates **ephemeral Mixnet Clients** which are used and then disconnected. Using the [`TcpProxy`](../tcpproxy) as an example, Clients are used for the lifetime of a single incoming TCP connection; after the TCP connection is closed, the Mixnet Client is disconnected. + +Clients are popped from the pool when in use, and new Clients are created to take their place. If connections are coming in faster than Clients are replenished, you can instead generate an ephemeral Client on the fly, or wait; this is up to the developer to decide. You can see an example of this logic in the example on the next page. + +## Runtime Loop +Aside from a few helper / getter functions and a graceful `disconnect_pool()`, the Client Pool is mostly made up of a very simple loop around some conditional logic making up `start()`: +- if the number of Clients in the pool is `< client_pool_reserve_number` (set on `new()`) then create more, +- if the number of Clients in the pool `== client_pool_reserve_number` (set on `new()`) then `sleep`, +- if `client_pool_reserve_number == 0` just `sleep`. + +`disconnect_pool()` will cause this loop to `break` via a cancellation token. diff --git a/documentation/docs/pages/developers/rust/client-pool/example.md b/documentation/docs/pages/developers/rust/client-pool/example.md new file mode 100644 index 00000000000..11258658950 --- /dev/null +++ b/documentation/docs/pages/developers/rust/client-pool/example.md @@ -0,0 +1,100 @@ +# Client Pool Example + +> You can find this code [here](https://github.com/nymtech/nym/blob/develop/sdk/rust/nym-sdk/examples/client_pool.rs) + +```rust +use anyhow::Result; +use nym_network_defaults::setup_env; +use nym_sdk::client_pool::ClientPool; +use nym_sdk::mixnet::{MixnetClientBuilder, NymNetworkDetails}; +use tokio::signal::ctrl_c; + +// This client pool is used internally by the TcpProxyClient but can also be used by the Mixnet module, in case you're quickly swapping clients in and out but don't want to use the TcpProxy module.
+// +// Run with: cargo run --example client_pool -- ../../../envs/.env +#[tokio::main] +async fn main() -> Result<()> { + nym_bin_common::logging::setup_logging(); + setup_env(std::env::args().nth(1)); + + let conn_pool = ClientPool::new(2); // Start the Client Pool with 2 Clients always being kept in reserve + let client_maker = conn_pool.clone(); + tokio::spawn(async move { + client_maker.start().await?; + Ok::<(), anyhow::Error>(()) + }); + + println!("\n\nWaiting a few seconds to fill pool\n\n"); + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + + let pool_clone_one = conn_pool.clone(); + let pool_clone_two = conn_pool.clone(); + + tokio::spawn(async move { + let client_one = match pool_clone_one.get_mixnet_client().await { + Some(client) => { + println!("Grabbed client {} from pool", client.nym_address()); + client + } + None => { + println!("Not enough clients in pool, creating ephemeral client"); + let net = NymNetworkDetails::new_from_env(); + let client = MixnetClientBuilder::new_ephemeral() + .network_details(net) + .build()? + .connect_to_mixnet() + .await?; + println!( + "Using {} for the moment, created outside of the connection pool", + client.nym_address() + ); + client + } + }; + let our_address = client_one.nym_address(); + println!("\n\nClient 1: {our_address}\n\n"); + client_one.disconnect().await; + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something + return Ok::<(), anyhow::Error>(()); + }); + + tokio::spawn(async move { + let client_two = match pool_clone_two.get_mixnet_client().await { + Some(client) => { + println!("Grabbed client {} from pool", client.nym_address()); + client + } + None => { + println!("Not enough clients in pool, creating ephemeral client"); + let net = NymNetworkDetails::new_from_env(); + let client = MixnetClientBuilder::new_ephemeral() + .network_details(net) + .build()? + .connect_to_mixnet() + .await?; + println!( + "Using {} for the moment, created outside of the connection pool", + client.nym_address() + ); + client + } + }; + let our_address = *client_two.nym_address(); + println!("\n\nClient 2: {our_address}\n\n"); + client_two.disconnect().await; + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something + return Ok::<(), anyhow::Error>(()); + }); + + wait_for_ctrl_c(conn_pool).await?; + Ok(()) +} + +async fn wait_for_ctrl_c(pool: ClientPool) -> Result<()> { + println!("\n\nPress CTRL_C to disconnect pool\n\n"); + ctrl_c().await?; + println!("CTRL_C received. 
Killing client pool"); + pool.disconnect_pool().await; + Ok(()) +} +``` diff --git a/documentation/docs/pages/developers/rust/development-status.md b/documentation/docs/pages/developers/rust/development-status.md index 71ec96057c1..cde8d04d720 100644 --- a/documentation/docs/pages/developers/rust/development-status.md +++ b/documentation/docs/pages/developers/rust/development-status.md @@ -7,9 +7,12 @@ In the future the SDK will be made up of several modules, each of which will all |-----------|---------------------------------------------------------------------------------------|----------| | Mixnet | Create / load clients & keypairs, subscribe to Mixnet events, send & receive messages | ✔️ | | TcpProxy | Utilise the TcpProxyClient and TcpProxyServer abstractions for streaming | ✔️ | +| ClientPool| Create a pool of quickly useable Mixnet clients | ✔️ | | Ecash | Create & verify Ecash credentials | ❌ | | Validator | Sign & broadcast Nyx blockchain transactions, query the blockchain | ❌ | The `Mixnet` module currently exposes the logic of two clients: the [websocket client](../clients/websocket), and the [socks client](../clients/socks5). The `TcpProxy` module exposes functionality to set up client/server instances that expose a localhost TcpSocket to read/write to. + +The `ClientPool` is a configurable pool of ephemeral clients which can be created as a background process and quickly grabbed. diff --git a/documentation/docs/pages/developers/rust/ffi.mdx b/documentation/docs/pages/developers/rust/ffi.mdx index 0f4aba8fa01..848ca1755e3 100644 --- a/documentation/docs/pages/developers/rust/ffi.mdx +++ b/documentation/docs/pages/developers/rust/ffi.mdx @@ -22,7 +22,7 @@ The main functionality of exposed functions will be imported from `sdk/ffi/share Furthermore, the `shared/` code makes sure that client access is thread-safe, and that client actions happen in blocking threads on the Rust side of the FFI boundary. -### Mixnet Module +## Mixnet Module This is the basic mixnet component of the SDK, exposing client functionality with which people can build custom interfaces with the Mixnet. These functions are exposed to both Go and C/C++ via the `sdk/ffi/shared/` crate. | `shared/lib.rs` function | Rust Function | @@ -36,13 +36,13 @@ This is the basic mixnet component of the SDK, exposing client functionality wit > We have also implemented `listen_for_incoming_internal()` which is a wrapper around the Mixnet client's `wait_for_messages()`. This is a helper method for listening out for and handling incoming messages. -#### Currently Unsupported Functionality +### Currently Unsupported Functionality At the time of writing the following functionality is not exposed to the shared FFI library: - `split_sender()`: the ability to [split a client into sender and receiver](./mixnet/examples/split-send) for concurrent send/receive. - The use of [custom network topologies](./mixnet/examples/custom-topology). - `Socks5::new()`: creation and use of the [socks5/4a/4 proxy client](./mixnet/examples/socks). -### TcpProxy Module +## TcpProxy Module A connection abstraction which exposes a local TCP socket which developers are able to interact with basically as expected, being able to read/write to/from a bytestream, without really having to take into account the workings of the Mixnet/Sphinx/the [message-based](../concepts/messages) format of the underlying client. 
@@ -58,3 +58,6 @@ A connection abstraction which exposes a local TCP socket which developers are a | `proxy_server_new_internal(upstream_address: &str, config_dir: &str, env: Option<String>)` | `NymProxyServer::new(upstream_address, config_dir, env)` | | `proxy_server_run_internal()` | `NymProxyServer.run_with_shutdown()` | | `proxy_server_address_internal()` | `NymProxyServer.nym_address()` | + +## Client Pool +There are currently no FFI bindings for the Client Pool; these will be added in the future. The bindings for the TcpProxy have been updated to use the Client Pool under the hood, but the standalone Pool is not yet exposed to FFI. diff --git a/documentation/docs/pages/developers/rust/mixnet/troubleshooting.md b/documentation/docs/pages/developers/rust/mixnet/troubleshooting.md index 06bba32f6d7..ff7b10577aa 100644 --- a/documentation/docs/pages/developers/rust/mixnet/troubleshooting.md +++ b/documentation/docs/pages/developers/rust/mixnet/troubleshooting.md @@ -112,3 +112,6 @@ Whether the `data` of a SURB request being empty is a feature or a bug is to be You can find a few helper functions [here](./message-helpers.md) to help deal with this issue in the meantime. > If you can think of a more succinct or different way of handling this do reach out - we're happy to hear other opinions + +## Lots of `duplicate fragment received` messages +You might see a lot of `WARN` level logs about duplicate fragments in your logs, depending on the log level you're using. This occurs when a packet is retransmitted somewhere in the Mixnet, but the original makes it to the destination client as well. This is not caused by your client logic, but by the state of the Mixnet. diff --git a/documentation/docs/pages/developers/rust/tcpproxy/_meta.json b/documentation/docs/pages/developers/rust/tcpproxy/_meta.json index 4ee4d522ebc..7b7e6bb8217 100644 --- a/documentation/docs/pages/developers/rust/tcpproxy/_meta.json +++ b/documentation/docs/pages/developers/rust/tcpproxy/_meta.json @@ -1,4 +1,5 @@ { "architecture": "Architecture", - "examples": "Examples" + "examples": "Examples", + "troubleshooting": "Troubleshooting" } diff --git a/documentation/docs/pages/developers/rust/tcpproxy/architecture.mdx b/documentation/docs/pages/developers/rust/tcpproxy/architecture.mdx index 862d00c90fc..006cd1ce6ae 100644 --- a/documentation/docs/pages/developers/rust/tcpproxy/architecture.mdx +++ b/documentation/docs/pages/developers/rust/tcpproxy/architecture.mdx @@ -13,7 +13,7 @@ The motivation behind the creation of the `TcpProxy` module is to allow develope ## Clients Each of the sub-modules exposed by the `TcpProxy` deals with Nym clients in a different way. -- the `NymProxyClient` creates an ephemeral client per new TCP connection, which is closed according to the configurable timeout: we map one ephemeral client per TCP connection. This is to deal with multiple simultaneous streams. In the future, this will be superceded by a connection pool in order to speed up new connections. +- the `NymProxyClient` relies on the [`Client Pool`](../client-pool) to create clients and keep a certain number of them in reserve. If the number of incoming TCP connections rises faster than the Client Pool can create clients, or the pool size is set to `0`, the `TcpProxyClient` creates an ephemeral client per new TCP connection, which is closed according to the configurable timeout: we map one ephemeral client per TCP connection. This is to deal with multiple simultaneous streams.
- the `NymProxyServer` has a single Nym client with a persistent identity. ## Framing diff --git a/documentation/docs/pages/developers/rust/tcpproxy/examples/multiconn.mdx b/documentation/docs/pages/developers/rust/tcpproxy/examples/multiconn.mdx index 3a97858119d..a003b1bee8d 100644 --- a/documentation/docs/pages/developers/rust/tcpproxy/examples/multiconn.mdx +++ b/documentation/docs/pages/developers/rust/tcpproxy/examples/multiconn.mdx @@ -18,6 +18,8 @@ use tokio::net::TcpStream; use tokio::signal; use tokio_stream::StreamExt; use tokio_util::codec; +use tokio_util::sync::CancellationToken; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[derive(Serialize, Deserialize, Debug)] struct ExampleMessage { @@ -26,6 +28,8 @@ struct ExampleMessage { tcp_conn: i8, } +// This example just starts off a bunch of Tcp connections on a loop to a remote endpoint: in this case the TcpListener behind the NymProxyServer instance on the echo server found in `nym/tools/echo-server/`. It pipes a few messages to it, logs the replies, and keeps track of the number of replies received per connection. +// // To run: // - run the echo server with `cargo run` // - run this example with `cargo run --example tcp_proxy_multistream -- ` e.g. @@ -40,8 +44,13 @@ async fn main() -> anyhow::Result<()> { // Nym client logging is very informative but quite verbose. // The Message Decay related logging gives you an ideas of the internals of the proxy message ordering: you need to switch // to DEBUG to see the contents of the msg buffer, sphinx packet chunking, etc. - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) + tracing_subscriber::registry() + .with(fmt::layer()) + .with( + EnvFilter::new("info") + .add_directive("nym_sdk::client_pool=info".parse().unwrap()) + .add_directive("nym_sdk::tcp_proxy_client=debug".parse().unwrap()), + ) .init(); let env_path = env::args().nth(2).expect("Env file not specified"); @@ -49,23 +58,42 @@ async fn main() -> anyhow::Result<()> { let listen_port = env::args().nth(3).expect("Port not specified"); - // Within the TcpProxyClient, individual client shutdown is triggered by the timeout. + // Within the TcpProxyClient, individual client shutdown is triggered by the timeout. The final argument is how many clients to keep in reserve in the client pool when running the TcpProxy. let proxy_client = - tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 45, Some(env)).await?; + tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 45, Some(env), 2).await?; + + // For our disconnect() logic below + let proxy_clone = proxy_client.clone(); tokio::spawn(async move { proxy_client.run().await?; Ok::<(), anyhow::Error>(()) }); + let example_cancel_token = CancellationToken::new(); + let client_cancel_token = example_cancel_token.clone(); + let watcher_cancel_token = example_cancel_token.clone(); + + // Cancel listener thread + tokio::spawn(async move { + signal::ctrl_c().await?; + println!(":: CTRL_C received, shutting down + cleanup up proxy server config files"); + watcher_cancel_token.cancel(); + proxy_clone.disconnect().await; + Ok::<(), anyhow::Error>(()) + }); + println!("waiting for everything to be set up.."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("done. sending bytes"); // In the info traces you will see the different session IDs being set up, one for each TcpStream. 
- for i in 0..4 { + for i in 0..8 { + let client_cancel_inner_token = client_cancel_token.clone(); + if client_cancel_token.is_cancelled() { + break; + } let conn_id = i; - println!("Starting TCP connection {}", conn_id); let local_tcp_addr = format!("127.0.0.1:{}", listen_port.clone()); tokio::spawn(async move { // Now the client and server proxies are running we can create and pipe traffic to/from @@ -81,7 +109,10 @@ async fn main() -> anyhow::Result<()> { // Lets just send a bunch of messages to the server with variable delays between them, with a message and tcp connection ids to keep track of ordering on the server side (for illustrative purposes **only**; keeping track of anonymous replies is handled by the proxy under the hood with Single Use Reply Blocks (SURBs); for this illustration we want some kind of app-level message id, but irl most of the time you'll probably be parsing on e.g. the incoming response type instead) tokio::spawn(async move { - for i in 0..4 { + for i in 0..8 { + if client_cancel_inner_token.is_cancelled() { + break; + } let mut rng = SmallRng::from_entropy(); let delay: f64 = rng.gen_range(2.5..5.0); tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await; @@ -96,12 +127,7 @@ async fn main() -> anyhow::Result<()> { .write_all(&serialised) .await .expect("couldn't write to stream"); - println!( - ">> client sent {}: {} bytes on conn {}", - &i, - msg.message_bytes.len(), - &conn_id - ); + println!(">> client sent msg {} on conn {}", &i, &conn_id); } Ok::<(), anyhow::Error>(()) }); @@ -113,17 +139,8 @@ async fn main() -> anyhow::Result<()> { while let Some(Ok(bytes)) = framed_read.next().await { match bincode::deserialize::(&bytes) { Ok(msg) => { - println!( - "<< client received {}: {} bytes on conn {}", - msg.message_id, - msg.message_bytes.len(), - msg.tcp_conn - ); reply_counter += 1; - println!( - "tcp connection {} replies received {}/4", - msg.tcp_conn, reply_counter - ); + println!("<< conn {} received {}/8", msg.tcp_conn, reply_counter); } Err(e) => { println!("<< client received something that wasn't an example message of {} bytes. error: {}", bytes.len(), e); @@ -138,15 +155,12 @@ async fn main() -> anyhow::Result<()> { tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await; } - // Once timeout is passed, you can either wait for graceful shutdown or just hard stop it. 
- signal::ctrl_c().await?; - println!("CTRL+C received, shutting down"); Ok(()) } // emulate a series of small messages followed by a closing larger one fn gen_bytes_fixed(i: usize) -> Vec { - let amounts = [10, 15, 50, 1000]; + let amounts = [10, 15, 50, 1000, 10, 15, 500, 2000]; let len = amounts[i]; let mut rng = rand::thread_rng(); (0..len).map(|_| rng.gen::()).collect() diff --git a/documentation/docs/pages/developers/rust/tcpproxy/examples/singleconn.mdx b/documentation/docs/pages/developers/rust/tcpproxy/examples/singleconn.mdx index 1ee11968684..f5f1ce8c7ff 100644 --- a/documentation/docs/pages/developers/rust/tcpproxy/examples/singleconn.mdx +++ b/documentation/docs/pages/developers/rust/tcpproxy/examples/singleconn.mdx @@ -21,6 +21,8 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::signal; use tokio_stream::StreamExt; use tokio_util::codec; +use tokio_util::sync::CancellationToken; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[derive(Serialize, Deserialize, Debug)] struct ExampleMessage { @@ -28,7 +30,14 @@ struct ExampleMessage { message_bytes: Vec, } - +// This is a basic example which opens a single TCP connection and writes a bunch of messages between a client and an echo +// server, so only uses a single session under the hood and doesn't really show off the message ordering capabilities; this is mainly +// just a quick introductory illustration on how: +// - the mixnet does message ordering +// - the NymProxyClient and NymProxyServer can be hooked into and used to communicate between two otherwise pretty vanilla TcpStreams +// +// For a more irl example checkout tcp_proxy_multistream.rs +// // Run this with: // `cargo run --example tcp_proxy_single_connection ` e.g. // `cargo run --example tcp_proxy_single_connection 8081 ../../../envs/canary.env 8080 ` @@ -39,8 +48,9 @@ async fn main() -> anyhow::Result<()> { // Comment this out to just see println! statements from this example, as Nym client logging is very informative but quite verbose. // The Message Decay related logging gives you an ideas of the internals of the proxy message ordering. To see the contents of the msg buffer, sphinx packet chunking, etc change the tracing::Level to DEBUG. - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::new("nym_sdk::tcp_proxy=info")) .init(); let server_port = env::args() @@ -63,10 +73,14 @@ async fn main() -> anyhow::Result<()> { // We'll run the instance with a long timeout since we're sending everything down the same Tcp connection, so should be using a single session. // Within the TcpProxyClient, individual client shutdown is triggered by the timeout. + // The final argument is how many clients to keep in reserve in the client pool when running the TcpProxy. 
let proxy_client = - tcp_proxy::NymProxyClient::new(*proxy_nym_addr, "127.0.0.1", &client_port, 60, Some(env)) + tcp_proxy::NymProxyClient::new(*proxy_nym_addr, "127.0.0.1", &client_port, 5, Some(env), 1) .await?; + // For our disconnect() logic below + let proxy_clone = proxy_client.clone(); + tokio::spawn(async move { proxy_server.run_with_shutdown().await?; Ok::<(), anyhow::Error>(()) @@ -77,10 +91,28 @@ async fn main() -> anyhow::Result<()> { Ok::<(), anyhow::Error>(()) }); + let example_cancel_token = CancellationToken::new(); + let server_cancel_token = example_cancel_token.clone(); + let client_cancel_token = example_cancel_token.clone(); + let watcher_cancel_token = example_cancel_token.clone(); + + // Cancel listener thread + tokio::spawn(async move { + signal::ctrl_c().await?; + println!(":: CTRL_C received, shutting down + cleanup up proxy server config files"); + fs::remove_dir_all(conf_path)?; + watcher_cancel_token.cancel(); + proxy_clone.disconnect().await; + Ok::<(), anyhow::Error>(()) + }); + // 'Server side' thread: echo back incoming as response to the messages sent in the 'client side' thread below tokio::spawn(async move { let listener = TcpListener::bind(upstream_tcp_addr).await?; loop { + if server_cancel_token.is_cancelled() { + break; + } let (socket, _) = listener.accept().await.unwrap(); let (read, mut write) = socket.into_split(); let codec = codec::BytesCodec::new(); @@ -118,9 +150,9 @@ async fn main() -> anyhow::Result<()> { Ok::<(), anyhow::Error>(()) }); - // Just wait for Nym clients to connect, TCP clients to bind, etc. + // Just wait for Nym clients to connect, TCP clients to bind, etc. If there isn't a client in the pool (or you started it with 0) already then the TcpProxyClient just spins up an ephemeral client itself. println!("waiting for everything to be set up.."); - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; println!("done. sending bytes"); // Now the client and server proxies are running we can create and pipe traffic to/from @@ -140,6 +172,9 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(async move { let mut rng = SmallRng::from_entropy(); for i in 0..10 { + if client_cancel_token.is_cancelled() { + break; + } let random_bytes = gen_bytes_fixed(i as usize); let msg = ExampleMessage { message_id: i, @@ -179,15 +214,12 @@ async fn main() -> anyhow::Result<()> { } } - // Once timeout is passed, you can either wait for graceful shutdown or just hard stop it. - signal::ctrl_c().await?; - println!(":: CTRL+C received, shutting down + cleanup up proxy server config files"); - fs::remove_dir_all(conf_path)?; Ok(()) } fn gen_bytes_fixed(i: usize) -> Vec { - let amounts = vec![1, 10, 50, 100, 150, 200, 350, 500, 750, 1000]; + // let amounts = vec![1, 10, 50, 100, 150, 200, 350, 500, 750, 1000]; + let amounts = [158, 1088, 505, 1001, 150, 200, 3500, 500, 750, 100]; let len = amounts[i]; let mut rng = rand::thread_rng(); (0..len).map(|_| rng.gen::()).collect() diff --git a/documentation/docs/pages/developers/rust/tcpproxy/troubleshooting.md b/documentation/docs/pages/developers/rust/tcpproxy/troubleshooting.md new file mode 100644 index 00000000000..8a3d6d5c233 --- /dev/null +++ b/documentation/docs/pages/developers/rust/tcpproxy/troubleshooting.md @@ -0,0 +1,4 @@ +# Troubleshooting + +## Lots of `duplicate fragment received` messages +You might see a lot of `WARN` level logs about duplicate fragments in your logs, depending on the log level you're using. 
This occurs when a packet is retransmitted somewhere in the Mixnet, but then the original makes it to the destination client as well. This is not something to do with your client logic, but instead the state of the Mixnet. diff --git a/documentation/docs/pages/developers/tools/standalone-tcpproxy.mdx b/documentation/docs/pages/developers/tools/standalone-tcpproxy.mdx index 177a080a5cb..659b1c70de5 100644 --- a/documentation/docs/pages/developers/tools/standalone-tcpproxy.mdx +++ b/documentation/docs/pages/developers/tools/standalone-tcpproxy.mdx @@ -39,6 +39,8 @@ Options: Listen port [default: 8080] -e, --env-path Optional env filepath - if none is supplied then the proxy defaults to using mainnet else just use a path to one of the supplied files in envs/ e.g. ./envs/sandbox.env + --client-pool-reserve + How many clients to have running in reserve for quick access by incoming connections [default: 2] -h, --help Print help ``` diff --git a/sdk/ffi/go/Cargo.toml b/sdk/ffi/go/Cargo.toml index e1aff9facac..3f20dec3cb4 100644 --- a/sdk/ffi/go/Cargo.toml +++ b/sdk/ffi/go/Cargo.toml @@ -1,12 +1,12 @@ [package] -name = "nym-go-ffi" #"goffitest" -version = "0.2.0" +name = "nym-go-ffi" +version = "0.2.1" edition = "2021" license.workspace = true [lib] crate-type = ["cdylib"] -name = "nym_go_ffi" #"go_ffi" +name = "nym_go_ffi" [dependencies] # Bindgen diff --git a/sdk/ffi/go/go-nym/bindings/bindings.go b/sdk/ffi/go/go-nym/bindings/bindings.go index 100e4a5bf22..00ad0e3a826 100644 --- a/sdk/ffi/go/go-nym/bindings/bindings.go +++ b/sdk/ffi/go/go-nym/bindings/bindings.go @@ -383,7 +383,7 @@ func uniffiCheckChecksums() { checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t { return C.uniffi_nym_go_ffi_checksum_func_new_proxy_client(uniffiStatus) }) - if checksum != 14386 { + if checksum != 38844 { // If this happens try cleaning and rebuilding your project panic("bindings: uniffi_nym_go_ffi_checksum_func_new_proxy_client: UniFFI API checksum mismatch") } @@ -453,6 +453,30 @@ func uniffiCheckChecksums() { } } +type FfiConverterUint8 struct{} + +var FfiConverterUint8INSTANCE = FfiConverterUint8{} + +func (FfiConverterUint8) Lower(value uint8) C.uint8_t { + return C.uint8_t(value) +} + +func (FfiConverterUint8) Write(writer io.Writer, value uint8) { + writeUint8(writer, value) +} + +func (FfiConverterUint8) Lift(value C.uint8_t) uint8 { + return uint8(value) +} + +func (FfiConverterUint8) Read(reader io.Reader) uint8 { + return readUint8(reader) +} + +type FfiDestroyerUint8 struct{} + +func (FfiDestroyerUint8) Destroy(_ uint8) {} + type FfiConverterUint64 struct{} var FfiConverterUint64INSTANCE = FfiConverterUint64{} @@ -963,9 +987,9 @@ func ListenForIncoming() (IncomingMessage, error) { } } -func NewProxyClient(serverAddress string, listenAddress string, listenPort string, closeTimeout uint64, env *string) error { +func NewProxyClient(serverAddress string, listenAddress string, listenPort string, closeTimeout uint64, env *string, poolSize uint8) error { _, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) bool { - C.uniffi_nym_go_ffi_fn_func_new_proxy_client(FfiConverterStringINSTANCE.Lower(serverAddress), FfiConverterStringINSTANCE.Lower(listenAddress), FfiConverterStringINSTANCE.Lower(listenPort), FfiConverterUint64INSTANCE.Lower(closeTimeout), FfiConverterOptionalStringINSTANCE.Lower(env), _uniffiStatus) + C.uniffi_nym_go_ffi_fn_func_new_proxy_client(FfiConverterStringINSTANCE.Lower(serverAddress), 
FfiConverterStringINSTANCE.Lower(listenAddress), FfiConverterStringINSTANCE.Lower(listenPort), FfiConverterUint64INSTANCE.Lower(closeTimeout), FfiConverterOptionalStringINSTANCE.Lower(env), FfiConverterUint8INSTANCE.Lower(poolSize), _uniffiStatus) return false }) return _uniffiErr diff --git a/sdk/ffi/go/go-nym/bindings/bindings.h b/sdk/ffi/go/go-nym/bindings/bindings.h index 9e586da85e8..1a28b13dc47 100644 --- a/sdk/ffi/go/go-nym/bindings/bindings.h +++ b/sdk/ffi/go/go-nym/bindings/bindings.h @@ -90,6 +90,7 @@ void uniffi_nym_go_ffi_fn_func_new_proxy_client( RustBuffer listen_port, uint64_t close_timeout, RustBuffer env, + uint8_t pool_size, RustCallStatus* out_status ); diff --git a/sdk/ffi/go/proxy_example.go b/sdk/ffi/go/proxy_example.go index c9e0667562b..b979410f20c 100644 --- a/sdk/ffi/go/proxy_example.go +++ b/sdk/ffi/go/proxy_example.go @@ -118,7 +118,7 @@ func main() { go runProxyServer() // initialise a proxy client - build_err := bindings.NewProxyClient(proxyAddr, "127.0.0.1", clientPort, clientTimeout, &env_path) + build_err := bindings.NewProxyClient(proxyAddr, "127.0.0.1", clientPort, clientTimeout, &env_path, 2) if build_err != nil { fmt.Println(build_err) return diff --git a/sdk/ffi/go/src/bindings.udl b/sdk/ffi/go/src/bindings.udl index 030e75a9a68..ec429404c0f 100644 --- a/sdk/ffi/go/src/bindings.udl +++ b/sdk/ffi/go/src/bindings.udl @@ -30,7 +30,7 @@ namespace bindings { [Throws=GoWrapError] IncomingMessage listen_for_incoming(); [Throws=GoWrapError] - void new_proxy_client(string server_address, string listen_address, string listen_port, u64 close_timeout, string? env); + void new_proxy_client(string server_address, string listen_address, string listen_port, u64 close_timeout, string? env, u8 pool_size); [Throws=GoWrapError] void new_proxy_client_default(string server_address, string? 
env); [Throws=GoWrapError] diff --git a/sdk/ffi/go/src/lib.rs b/sdk/ffi/go/src/lib.rs index a0a690c6913..83656c1b76c 100644 --- a/sdk/ffi/go/src/lib.rs +++ b/sdk/ffi/go/src/lib.rs @@ -100,6 +100,7 @@ fn new_proxy_client( listen_port: String, close_timeout: u64, env: Option, + pool_size: u8, ) -> Result<(), GoWrapError> { let server_nym_addr = Recipient::try_from_base58_string(server_address).expect("couldn't create Recipient"); @@ -109,6 +110,7 @@ fn new_proxy_client( &listen_port, close_timeout, env, + pool_size as usize, ) { Ok(_) => Ok(()), Err(_) => Err(GoWrapError::ProxyInitError {}), diff --git a/sdk/ffi/shared/Cargo.toml b/sdk/ffi/shared/Cargo.toml index 9f66e095eff..964be022174 100644 --- a/sdk/ffi/shared/Cargo.toml +++ b/sdk/ffi/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nym-ffi-shared" -version = "0.2.0" +version = "0.2.1" edition = "2021" license.workspace = true diff --git a/sdk/ffi/shared/src/lib.rs b/sdk/ffi/shared/src/lib.rs index de25f90443a..e7a770f2699 100644 --- a/sdk/ffi/shared/src/lib.rs +++ b/sdk/ffi/shared/src/lib.rs @@ -1,7 +1,7 @@ // Copyright 2023-2024 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use anyhow::{anyhow, bail}; +use anyhow::{anyhow, bail, Error, Result}; use lazy_static::lazy_static; use nym_sdk::mixnet::{ MixnetClient, MixnetClientBuilder, MixnetMessageSender, Recipient, ReconstructedMessage, @@ -13,11 +13,8 @@ use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; -// NYM_CLIENT/PROXIES: Static reference (only init-ed once) to: -// - Arc: share ownership -// - Mutex: thread-safe way to share data between threads -// - Option: init-ed or not -// RUNTIME: Tokio runtime: no need to pass back to C and deal with raw pointers as it was previously +// NYM_CLIENT/PROXIES: Static thread-safe reference (init once) to Options. +// RUNTIME: Tokio runtime: no need to pass across FFI boundary and deal with raw pointers. lazy_static! { static ref NYM_PROXY_CLIENT: Arc>> = Arc::new(Mutex::new(None)); static ref NYM_PROXY_SERVER: Arc>> = Arc::new(Mutex::new(None)); @@ -25,7 +22,7 @@ lazy_static! 
{ static ref RUNTIME: Runtime = Runtime::new().unwrap(); } -pub fn init_ephemeral_internal() -> anyhow::Result<(), anyhow::Error> { +pub fn init_ephemeral_internal() -> Result<(), Error> { if NYM_CLIENT.lock().unwrap().as_ref().is_some() { bail!("client already exists"); } else { @@ -37,13 +34,13 @@ pub fn init_ephemeral_internal() -> anyhow::Result<(), anyhow::Error> { } else { return Err(anyhow!("couldnt lock ephemeral NYM_CLIENT")); } - Ok::<(), anyhow::Error>(()) + Ok::<(), Error>(()) })?; } Ok(()) } -pub fn init_default_storage_internal(config_dir: PathBuf) -> anyhow::Result<(), anyhow::Error> { +pub fn init_default_storage_internal(config_dir: PathBuf) -> Result<(), Error> { if NYM_CLIENT.lock().unwrap().as_ref().is_some() { bail!("client already exists"); } else { @@ -60,13 +57,13 @@ pub fn init_default_storage_internal(config_dir: PathBuf) -> anyhow::Result<(), } else { return Err(anyhow!("couldnt lock NYM_CLIENT")); } - Ok::<(), anyhow::Error>(()) + Ok::<(), Error>(()) })?; } Ok(()) } -pub fn get_self_address_internal() -> anyhow::Result { +pub fn get_self_address_internal() -> Result { let client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); if client.is_none() { bail!("Client is not yet initialised"); @@ -83,7 +80,8 @@ pub fn get_self_address_internal() -> anyhow::Result { pub fn send_message_internal( recipient: Recipient, message: &str, -) -> anyhow::Result<(), anyhow::Error> { + // TODO add Option, if Some(surb_amount) call send_message() instead with specified #, else send_plain_message as this uses the default +) -> Result<(), Error> { let client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); if client.is_none() { bail!("Client is not yet initialised"); @@ -94,17 +92,14 @@ pub fn send_message_internal( RUNTIME.block_on(async move { nym_client.send_plain_message(recipient, message).await?; - Ok::<(), anyhow::Error>(()) + Ok::<(), Error>(()) })?; Ok(()) } // TODO send_raw_message_internal -pub fn reply_internal( - recipient: AnonymousSenderTag, - message: &str, -) -> anyhow::Result<(), anyhow::Error> { +pub fn reply_internal(recipient: AnonymousSenderTag, message: &str) -> Result<(), Error> { let client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); if client.is_none() { bail!("Client is not yet initialised"); @@ -115,12 +110,12 @@ pub fn reply_internal( RUNTIME.block_on(async move { nym_client.send_reply(recipient, message).await?; - Ok::<(), anyhow::Error>(()) + Ok::<(), Error>(()) })?; Ok(()) } -pub fn listen_for_incoming_internal() -> anyhow::Result { +pub fn listen_for_incoming_internal() -> Result { let mut binding = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); if binding.is_none() { bail!("recipient is null"); @@ -131,7 +126,7 @@ pub fn listen_for_incoming_internal() -> anyhow::Result(ReconstructedMessage { + Ok::(ReconstructedMessage { message: received.message, sender_tag: received.sender_tag, }) @@ -140,9 +135,7 @@ pub fn listen_for_incoming_internal() -> anyhow::Result anyhow::Result { +pub async fn wait_for_non_empty_message(client: &mut MixnetClient) -> Result { while let Some(mut new_message) = client.wait_for_messages().await { if !new_message.is_empty() { return new_message @@ -159,7 +152,8 @@ pub fn proxy_client_new_internal( listen_port: &str, close_timeout: u64, env: Option, -) -> anyhow::Result<(), anyhow::Error> { + pool_size: usize, +) -> Result<(), Error> { if NYM_PROXY_CLIENT.lock().unwrap().as_ref().is_some() { bail!("proxy client already exists"); } else { @@ -170,6 +164,7 @@ pub fn proxy_client_new_internal( 
listen_port, close_timeout, env, + pool_size, ) .await?; let mut client = NYM_PROXY_CLIENT.try_lock(); @@ -178,7 +173,7 @@ pub fn proxy_client_new_internal( } else { return Err(anyhow!("couldnt lock NYM_PROXY_CLIENT")); } - Ok::<(), anyhow::Error>(()) + Ok::<(), Error>(()) })?; } Ok(()) @@ -187,7 +182,7 @@ pub fn proxy_client_new_internal( pub fn proxy_client_new_defaults_internal( server_address: Recipient, env: Option, -) -> anyhow::Result<(), anyhow::Error> { +) -> Result<(), Error> { if NYM_PROXY_CLIENT.lock().unwrap().as_ref().is_some() { bail!("proxy client already exists"); } else { @@ -199,13 +194,13 @@ pub fn proxy_client_new_defaults_internal( } else { return Err(anyhow!("couldn't lock PROXY_CLIENT")); } - Ok::<(), anyhow::Error>(()) + Ok::<(), Error>(()) })?; } Ok(()) } -pub fn proxy_client_run_internal() -> anyhow::Result<(), anyhow::Error> { +pub fn proxy_client_run_internal() -> Result<(), Error> { let proxy_client = NYM_PROXY_CLIENT .lock() .expect("could not lock NYM_PROXY_CLIENT"); @@ -217,7 +212,7 @@ pub fn proxy_client_run_internal() -> anyhow::Result<(), anyhow::Error> { .ok_or_else(|| anyhow!("could not get proxy_client as_ref()"))?; RUNTIME.block_on(async move { proxy.run().await?; - Ok::<(), anyhow::Error>(()) + Ok::<(), Error>(()) })?; Ok(()) } @@ -226,7 +221,7 @@ pub fn proxy_server_new_internal( upstream_address: &str, config_dir: &str, env: Option, -) -> anyhow::Result<(), anyhow::Error> { +) -> Result<(), Error> { if NYM_PROXY_SERVER.lock().unwrap().as_ref().is_some() { bail!("proxy client already exists"); } else { @@ -238,13 +233,13 @@ pub fn proxy_server_new_internal( } else { return Err(anyhow!("couldn't lock PROXY_SERVER")); } - Ok::<(), anyhow::Error>(()) + Ok::<(), Error>(()) })?; } Ok(()) } -pub fn proxy_server_run_internal() -> anyhow::Result<(), anyhow::Error> { +pub fn proxy_server_run_internal() -> Result<(), Error> { let mut proxy_server = NYM_PROXY_SERVER .lock() .expect("could not lock NYM_PROXY_CLIENT"); @@ -256,12 +251,12 @@ pub fn proxy_server_run_internal() -> anyhow::Result<(), anyhow::Error> { .ok_or_else(|| anyhow!("could not get proxy_client as_ref()"))?; RUNTIME.block_on(async move { proxy.run_with_shutdown().await?; - Ok::<(), anyhow::Error>(()) + Ok::<(), Error>(()) })?; Ok(()) } -pub fn proxy_server_address_internal() -> anyhow::Result { +pub fn proxy_server_address_internal() -> Result { let mut proxy_server = NYM_PROXY_SERVER .lock() .expect("could not lock NYM_PROXY_CLIENT"); diff --git a/sdk/ffi/shared/uniffi-bindgen.rs b/sdk/ffi/shared/uniffi-bindgen.rs.bak similarity index 100% rename from sdk/ffi/shared/uniffi-bindgen.rs rename to sdk/ffi/shared/uniffi-bindgen.rs.bak diff --git a/sdk/rust/nym-sdk/Cargo.toml b/sdk/rust/nym-sdk/Cargo.toml index 6d8f360effa..4c089be972e 100644 --- a/sdk/rust/nym-sdk/Cargo.toml +++ b/sdk/rust/nym-sdk/Cargo.toml @@ -57,8 +57,9 @@ uuid = { version = "1", features = ["v4", "serde"] } bincode = "1.0" serde = { version = "1", features = ["derive"] } tracing.workspace = true -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } dirs.workspace = true +tempfile.workspace = true [dev-dependencies] anyhow = { workspace = true } diff --git a/sdk/rust/nym-sdk/examples/client_pool.rs b/sdk/rust/nym-sdk/examples/client_pool.rs new file mode 100644 index 00000000000..27c452ec194 --- /dev/null +++ b/sdk/rust/nym-sdk/examples/client_pool.rs @@ -0,0 +1,101 @@ +// Copyright 2023 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use 
nym_network_defaults::setup_env; +use nym_sdk::client_pool::ClientPool; +use nym_sdk::mixnet::{MixnetClientBuilder, NymNetworkDetails}; +use tokio::signal::ctrl_c; + +// This client pool is used internally by the TcpProxyClient but can also be used by the Mixnet module, in case you're quickly swapping clients in and out but won't want to use the TcpProxy module for whatever reason. +// +// Run with: cargo run --example client_pool -- ../../../envs/.env +#[tokio::main] +async fn main() -> Result<()> { + nym_bin_common::logging::setup_logging(); + setup_env(std::env::args().nth(1)); + + let conn_pool = ClientPool::new(2); // Start the Client Pool with 2 Clients always being kept in reserve + let client_maker = conn_pool.clone(); + tokio::spawn(async move { + client_maker.start().await?; + Ok::<(), anyhow::Error>(()) + }); + + println!("\n\nWaiting a few seconds to fill pool\n\n"); + tokio::time::sleep(tokio::time::Duration::from_secs(15)).await; + + let pool_clone_one = conn_pool.clone(); + let pool_clone_two = conn_pool.clone(); + + tokio::spawn(async move { + let client_one = match pool_clone_one.get_mixnet_client().await { + Some(client) => { + println!("Grabbed client {} from pool", client.nym_address()); + client + } + None => { + println!("Not enough clients in pool, creating ephemeral client"); + let net = NymNetworkDetails::new_from_env(); + let client = MixnetClientBuilder::new_ephemeral() + .network_details(net) + .build() + .unwrap() + .connect_to_mixnet() + .await + .unwrap(); + println!( + "Using {} for the moment, created outside of the connection pool", + client.nym_address() + ); + client + } + }; + let our_address = client_one.nym_address(); + println!("\n\nClient 1: {our_address}\n\n"); + client_one.disconnect().await; + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something + Ok::<(), anyhow::Error>(()) + }); + + tokio::spawn(async move { + let client_two = match pool_clone_two.get_mixnet_client().await { + Some(client) => { + println!("Grabbed client {} from pool", client.nym_address()); + client + } + None => { + println!("Not enough clients in pool, creating ephemeral client"); + let net = NymNetworkDetails::new_from_env(); + let client = MixnetClientBuilder::new_ephemeral() + .network_details(net) + .build() + .unwrap() + .connect_to_mixnet() + .await + .unwrap(); + println!( + "Using {} for the moment, created outside of the connection pool", + client.nym_address() + ); + client + } + }; + let our_address = *client_two.nym_address(); + println!("\n\nClient 2: {our_address}\n\n"); + client_two.disconnect().await; + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something + Ok::<(), anyhow::Error>(()) + }); + + wait_for_ctrl_c(conn_pool).await?; + Ok(()) +} + +async fn wait_for_ctrl_c(pool: ClientPool) -> Result<()> { + println!("\n\nPress CTRL_C to disconnect pool\n\n"); + ctrl_c().await?; + println!("CTRL_C received. 
Killing client pool"); + pool.disconnect_pool().await; + Ok(()) +} diff --git a/sdk/rust/nym-sdk/examples/tcp_proxy_multistream.rs b/sdk/rust/nym-sdk/examples/tcp_proxy_multistream.rs index ef3a6654a93..caf5d74020a 100644 --- a/sdk/rust/nym-sdk/examples/tcp_proxy_multistream.rs +++ b/sdk/rust/nym-sdk/examples/tcp_proxy_multistream.rs @@ -10,6 +10,8 @@ use tokio::net::TcpStream; use tokio::signal; use tokio_stream::StreamExt; use tokio_util::codec; +use tokio_util::sync::CancellationToken; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[derive(Serialize, Deserialize, Debug)] struct ExampleMessage { @@ -34,8 +36,13 @@ async fn main() -> anyhow::Result<()> { // Nym client logging is very informative but quite verbose. // The Message Decay related logging gives you an ideas of the internals of the proxy message ordering: you need to switch // to DEBUG to see the contents of the msg buffer, sphinx packet chunking, etc. - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) + tracing_subscriber::registry() + .with(fmt::layer()) + .with( + EnvFilter::new("info") + .add_directive("nym_sdk::client_pool=info".parse().unwrap()) + .add_directive("nym_sdk::tcp_proxy_client=debug".parse().unwrap()), + ) .init(); let env_path = env::args().nth(2).expect("Env file not specified"); @@ -43,23 +50,42 @@ async fn main() -> anyhow::Result<()> { let listen_port = env::args().nth(3).expect("Port not specified"); - // Within the TcpProxyClient, individual client shutdown is triggered by the timeout. + // Within the TcpProxyClient, individual client shutdown is triggered by the timeout. The final argument is how many clients to keep in reserve in the client pool when running the TcpProxy. let proxy_client = - tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 45, Some(env)).await?; + tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 80, Some(env), 3).await?; + + // For our disconnect() logic below + let proxy_clone = proxy_client.clone(); tokio::spawn(async move { proxy_client.run().await?; Ok::<(), anyhow::Error>(()) }); + let example_cancel_token = CancellationToken::new(); + let client_cancel_token = example_cancel_token.clone(); + let watcher_cancel_token = example_cancel_token.clone(); + + // Cancel listener thread + tokio::spawn(async move { + signal::ctrl_c().await?; + println!(":: CTRL_C received, shutting down + cleanup up proxy server config files"); + watcher_cancel_token.cancel(); + proxy_clone.disconnect().await; + Ok::<(), anyhow::Error>(()) + }); + println!("waiting for everything to be set up.."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("done. sending bytes"); // In the info traces you will see the different session IDs being set up, one for each TcpStream. 
for i in 0..4 { + let client_cancel_inner_token = client_cancel_token.clone(); + if client_cancel_token.is_cancelled() { + break; + } let conn_id = i; - println!("Starting TCP connection {}", conn_id); let local_tcp_addr = format!("127.0.0.1:{}", listen_port.clone()); tokio::spawn(async move { // Now the client and server proxies are running we can create and pipe traffic to/from @@ -76,6 +102,9 @@ async fn main() -> anyhow::Result<()> { // Lets just send a bunch of messages to the server with variable delays between them, with a message and tcp connection ids to keep track of ordering on the server side (for illustrative purposes **only**; keeping track of anonymous replies is handled by the proxy under the hood with Single Use Reply Blocks (SURBs); for this illustration we want some kind of app-level message id, but irl most of the time you'll probably be parsing on e.g. the incoming response type instead) tokio::spawn(async move { for i in 0..4 { + if client_cancel_inner_token.is_cancelled() { + break; + } let mut rng = SmallRng::from_entropy(); let delay: f64 = rng.gen_range(2.5..5.0); tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await; @@ -90,12 +119,7 @@ async fn main() -> anyhow::Result<()> { .write_all(&serialised) .await .expect("couldn't write to stream"); - println!( - ">> client sent {}: {} bytes on conn {}", - &i, - msg.message_bytes.len(), - &conn_id - ); + println!(">> client sent msg {} on conn {}", &i, &conn_id); } Ok::<(), anyhow::Error>(()) }); @@ -107,17 +131,8 @@ async fn main() -> anyhow::Result<()> { while let Some(Ok(bytes)) = framed_read.next().await { match bincode::deserialize::(&bytes) { Ok(msg) => { - println!( - "<< client received {}: {} bytes on conn {}", - msg.message_id, - msg.message_bytes.len(), - msg.tcp_conn - ); reply_counter += 1; - println!( - "tcp connection {} replies received {}/4", - msg.tcp_conn, reply_counter - ); + println!("<< conn {} received {}/4", msg.tcp_conn, reply_counter); } Err(e) => { println!("<< client received something that wasn't an example message of {} bytes. error: {}", bytes.len(), e); @@ -132,15 +147,18 @@ async fn main() -> anyhow::Result<()> { tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await; } - // Once timeout is passed, you can either wait for graceful shutdown or just hard stop it. 
- signal::ctrl_c().await?; - println!("CTRL+C received, shutting down"); + loop { + if example_cancel_token.is_cancelled() { + break; + } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } Ok(()) } // emulate a series of small messages followed by a closing larger one fn gen_bytes_fixed(i: usize) -> Vec { - let amounts = [10, 15, 50, 1000]; + let amounts = [10, 15, 50, 1000, 2000]; let len = amounts[i]; let mut rng = rand::thread_rng(); (0..len).map(|_| rng.gen::()).collect() diff --git a/sdk/rust/nym-sdk/examples/tcp_proxy_single_connection.rs b/sdk/rust/nym-sdk/examples/tcp_proxy_single_connection.rs index 5fcea31267b..f2ca78c9311 100644 --- a/sdk/rust/nym-sdk/examples/tcp_proxy_single_connection.rs +++ b/sdk/rust/nym-sdk/examples/tcp_proxy_single_connection.rs @@ -10,6 +10,8 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::signal; use tokio_stream::StreamExt; use tokio_util::codec; +use tokio_util::sync::CancellationToken; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[derive(Serialize, Deserialize, Debug)] struct ExampleMessage { @@ -31,13 +33,13 @@ struct ExampleMessage { #[tokio::main] async fn main() -> anyhow::Result<()> { // Keep track of sent/received messages - // let counter = Arc::new(Mutex::new(0)); let counter = AtomicU8::new(0); // Comment this out to just see println! statements from this example, as Nym client logging is very informative but quite verbose. // The Message Decay related logging gives you an ideas of the internals of the proxy message ordering. To see the contents of the msg buffer, sphinx packet chunking, etc change the tracing::Level to DEBUG. - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::new("nym_sdk::tcp_proxy=info")) .init(); let server_port = env::args() @@ -60,10 +62,14 @@ async fn main() -> anyhow::Result<()> { // We'll run the instance with a long timeout since we're sending everything down the same Tcp connection, so should be using a single session. // Within the TcpProxyClient, individual client shutdown is triggered by the timeout. + // The final argument is how many clients to keep in reserve in the client pool when running the TcpProxy. 
let proxy_client = - tcp_proxy::NymProxyClient::new(*proxy_nym_addr, "127.0.0.1", &client_port, 60, Some(env)) + tcp_proxy::NymProxyClient::new(*proxy_nym_addr, "127.0.0.1", &client_port, 5, Some(env), 1) .await?; + // For our disconnect() logic below + let proxy_clone = proxy_client.clone(); + tokio::spawn(async move { proxy_server.run_with_shutdown().await?; Ok::<(), anyhow::Error>(()) @@ -74,10 +80,28 @@ async fn main() -> anyhow::Result<()> { Ok::<(), anyhow::Error>(()) }); + let example_cancel_token = CancellationToken::new(); + let server_cancel_token = example_cancel_token.clone(); + let client_cancel_token = example_cancel_token.clone(); + let watcher_cancel_token = example_cancel_token.clone(); + + // Cancel listener thread + tokio::spawn(async move { + signal::ctrl_c().await?; + println!(":: CTRL_C received, shutting down + cleanup up proxy server config files"); + fs::remove_dir_all(conf_path)?; + watcher_cancel_token.cancel(); + proxy_clone.disconnect().await; + Ok::<(), anyhow::Error>(()) + }); + // 'Server side' thread: echo back incoming as response to the messages sent in the 'client side' thread below tokio::spawn(async move { let listener = TcpListener::bind(upstream_tcp_addr).await?; loop { + if server_cancel_token.is_cancelled() { + break; + } let (socket, _) = listener.accept().await.unwrap(); let (read, mut write) = socket.into_split(); let codec = codec::BytesCodec::new(); @@ -115,9 +139,9 @@ async fn main() -> anyhow::Result<()> { Ok::<(), anyhow::Error>(()) }); - // Just wait for Nym clients to connect, TCP clients to bind, etc. + // Just wait for Nym clients to connect, TCP clients to bind, etc. If there isn't a client in the pool (or you started it with 0) already then the TcpProxyClient just spins up an ephemeral client itself. println!("waiting for everything to be set up.."); - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; println!("done. sending bytes"); // Now the client and server proxies are running we can create and pipe traffic to/from @@ -137,6 +161,9 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(async move { let mut rng = SmallRng::from_entropy(); for i in 0..10 { + if client_cancel_token.is_cancelled() { + break; + } let random_bytes = gen_bytes_fixed(i as usize); let msg = ExampleMessage { message_id: i, @@ -176,15 +203,10 @@ async fn main() -> anyhow::Result<()> { } } - // Once timeout is passed, you can either wait for graceful shutdown or just hard stop it. - signal::ctrl_c().await?; - println!(":: CTRL+C received, shutting down + cleanup up proxy server config files"); - fs::remove_dir_all(conf_path)?; Ok(()) } fn gen_bytes_fixed(i: usize) -> Vec { - // let amounts = vec![1, 10, 50, 100, 150, 200, 350, 500, 750, 1000]; let amounts = [158, 1088, 505, 1001, 150, 200, 3500, 500, 750, 100]; let len = amounts[i]; let mut rng = rand::thread_rng(); diff --git a/sdk/rust/nym-sdk/src/client_pool.rs b/sdk/rust/nym-sdk/src/client_pool.rs new file mode 100644 index 00000000000..0a2471275b3 --- /dev/null +++ b/sdk/rust/nym-sdk/src/client_pool.rs @@ -0,0 +1,144 @@ +//! use crate::mixnet::{MixnetClient, MixnetClientBuilder, NymNetworkDetails}; +//! use anyhow::Result; +//! use std::fmt; +//! use std::sync::Arc; +//! use tokio::sync::RwLock; +//! use tokio_util::sync::CancellationToken; +//! use tracing::{debug, info, warn}; + +//! pub struct ClientPool { +//! clients: Arc>>>, // Collection of clients waiting to be used which are popped off in get_mixnet_client() +//! 
client_pool_reserve_number: usize, // Default # of clients to have available in pool in reserve waiting for incoming connections +//! cancel_token: CancellationToken, +//! } + +//! // This is only necessary for when you're wanting to check the addresses of the clients that are currently in the //! pool. +//! impl fmt::Debug for ClientPool { +//! fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +//! let clients_debug = match self.clients.try_read() { +//! Ok(clients) => { +//! if f.alternate() { +//! // pretty +//! clients +//! .iter() +//! .enumerate() +//! .map(|(i, client)| format!("\n {}: {}", i, client.nym_address())) +//! .collect::>() +//! .join(",") +//! } else { +//! // compact +//! clients +//! .iter() +//! .map(|client| client.nym_address().to_string()) +//! .collect::>() +//! .join(", ") +//! } +//! } +//! Err(_) => "".to_string(), +//! }; + +//! let mut debug_struct = f.debug_struct("Pool"); +//! debug_struct +//! .field( +//! "client_pool_reserve_number", +//! &self.client_pool_reserve_number, +//! ) +//! .field("clients", &format_args!("[{}]", clients_debug)); +//! debug_struct.finish() +//! } +//! } + +//! impl ClientPool { +//! pub fn new(client_pool_reserve_number: usize) -> Self { +//! ClientPool { +//! clients: Arc::new(RwLock::new(Vec::new())), +//! client_pool_reserve_number, +//! cancel_token: CancellationToken::new(), +//! } +//! } + +//! // The loop here is simple: if there aren't enough clients, create more. If you set clients to 0, repeatedly //! just sleep. +//! // disconnect_pool() will kill this loop via the cancellation token. +//! pub async fn start(&self) -> Result<()> { +//! loop { +//! let spawned_clients = self.clients.read().await.len(); +//! let addresses = self; +//! debug!( +//! "Currently spawned clients: {}: {:?}", +//! spawned_clients, addresses +//! ); +//! if self.cancel_token.is_cancelled() { +//! break Ok(()); +//! } +//! if spawned_clients >= self.client_pool_reserve_number { +//! debug!("Got enough clients already: sleeping"); +//! } else { +//! info!( +//! "Clients in reserve = {}, reserve amount = {}, spawning new client", +//! spawned_clients, self.client_pool_reserve_number +//! ); +//! let client = loop { +//! let net = NymNetworkDetails::new_from_env(); +//! match MixnetClientBuilder::new_ephemeral() +//! .network_details(net) +//! .build()? +//! .connect_to_mixnet() +//! .await +//! { +//! Ok(client) => break client, +//! Err(err) => { +//! warn!("Error creating client: {:?}, will retry in 100ms", err); +//! tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; +//! } +//! } +//! }; +//! self.clients.write().await.push(Arc::new(client)); +//! } +//! tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; +//! } +//! } + +//! pub async fn disconnect_pool(&self) { +//! info!("Triggering Client Pool disconnect"); +//! self.cancel_token.cancel(); +//! info!( +//! "Client pool cancellation token cancelled: {}", +//! self.cancel_token.is_cancelled() +//! ); +//! let mut clients = self.clients.write().await; +//! while let Some(arc_client) = clients.pop() { +//! if let Ok(client) = Arc::try_unwrap(arc_client) { +//! info!("Killing reserve client {}", client.nym_address()); +//! client.disconnect().await; +//! } +//! } +//! } + +//! pub async fn get_mixnet_client(&self) -> Option { +//! debug!("Grabbing client from pool"); +//! let mut clients = self.clients.write().await; +//! clients +//! .pop() +//! .and_then(|arc_client| Arc::try_unwrap(arc_client).ok()) +//! } + +//! 
+//!     pub async fn get_client_count(&self) -> usize {
+//!         self.clients.read().await.len()
+//!     }
+//!
+//!     pub async fn get_pool_reserve(&self) -> usize {
+//!         self.client_pool_reserve_number
+//!     }
+//!
+//!     pub fn clone(&self) -> Self {
+//!         Self {
+//!             clients: Arc::clone(&self.clients),
+//!             client_pool_reserve_number: self.client_pool_reserve_number,
+//!             cancel_token: self.cancel_token.clone(),
+//!         }
+//!     }
+//! }
+
+mod mixnet_client_pool;
+
+pub use mixnet_client_pool::ClientPool;
diff --git a/sdk/rust/nym-sdk/src/client_pool/mixnet_client_pool.rs b/sdk/rust/nym-sdk/src/client_pool/mixnet_client_pool.rs
new file mode 100644
index 00000000000..ff7b8b81981
--- /dev/null
+++ b/sdk/rust/nym-sdk/src/client_pool/mixnet_client_pool.rs
@@ -0,0 +1,189 @@
+// Copyright 2024 - Nym Technologies SA
+// SPDX-License-Identifier: GPL-3.0-only
+
+use crate::mixnet::{MixnetClient, MixnetClientBuilder, NymNetworkDetails};
+use anyhow::Result;
+use nym_crypto::asymmetric::ed25519;
+use std::fmt;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, info, warn};
+
+pub struct ClientPool {
+    clients: Arc<RwLock<Vec<Arc<MixnetClient>>>>, // Collection of clients waiting to be used which are popped off in get_mixnet_client()
+    client_pool_reserve_number: usize, // Default # of clients to have available in pool in reserve waiting for incoming connections
+    cancel_token: CancellationToken,
+}
+
+// This is only necessary for when you're wanting to check the addresses of the clients that are currently in the pool via logging.
+impl fmt::Debug for ClientPool {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let clients_debug = match self.clients.try_read() {
+            Ok(clients) => {
+                if f.alternate() {
+                    // pretty
+                    clients
+                        .iter()
+                        .enumerate()
+                        .map(|(i, client)| format!("\n {}: {}", i, client.nym_address()))
+                        .collect::<Vec<String>>()
+                        .join(",")
+                } else {
+                    // compact
+                    clients
+                        .iter()
+                        .map(|client| client.nym_address().to_string())
+                        .collect::<Vec<String>>()
+                        .join(", ")
+                }
+            }
+            Err(_) => "".to_string(),
+        };
+
+        let mut debug_struct = f.debug_struct("Pool");
+        debug_struct
+            .field(
+                "client_pool_reserve_number",
+                &self.client_pool_reserve_number,
+            )
+            .field("clients", &format_args!("[{}]", clients_debug));
+        debug_struct.finish()
+    }
+}
+
+impl Clone for ClientPool {
+    fn clone(&self) -> Self {
+        Self {
+            clients: Arc::clone(&self.clients),
+            client_pool_reserve_number: self.client_pool_reserve_number,
+            cancel_token: self.cancel_token.clone(),
+        }
+    }
+}
+
+impl ClientPool {
+    pub fn new(client_pool_reserve_number: usize) -> Self {
+        ClientPool {
+            clients: Arc::new(RwLock::new(Vec::new())),
+            client_pool_reserve_number,
+            cancel_token: CancellationToken::new(),
+        }
+    }
+
+    // The loop here is simple: if there aren't enough clients, create more. If you set clients to 0, repeatedly just sleep.
+    // disconnect_pool() will kill this loop via the cancellation token.
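+    //
+    // A minimal consumer-side sketch of the above (illustrative only, not part of this module;
+    // error handling elided):
+    //
+    //     let pool = ClientPool::new(2);
+    //     let pool_handle = pool.clone();
+    //     tokio::spawn(async move { pool_handle.start().await });
+    //     // ... later, per incoming connection:
+    //     let client = match pool.get_mixnet_client().await {
+    //         Some(client) => client, // fast path: a pre-connected client from the reserve
+    //         None => todo!("build an ephemeral client on the fly, or wait for the pool to refill"),
+    //     };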
+ pub async fn start(&self) -> Result<()> { + loop { + let spawned_clients = self.clients.read().await.len(); + let addresses = self; + debug!( + "Currently spawned clients: {}: {:?}", + spawned_clients, addresses + ); + if self.cancel_token.is_cancelled() { + break Ok(()); + } + if spawned_clients >= self.client_pool_reserve_number { + debug!("Got enough clients already: sleeping"); + } else { + info!( + "Clients in reserve = {}, reserve amount = {}, spawning new client", + spawned_clients, self.client_pool_reserve_number + ); + let client = loop { + let net = NymNetworkDetails::new_from_env(); + match MixnetClientBuilder::new_ephemeral() + .network_details(net) + .build()? + .connect_to_mixnet() + .await + { + Ok(client) => break client, + Err(err) => { + warn!("Error creating client: {:?}, will retry in 100ms", err); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + }; + self.clients.write().await.push(Arc::new(client)); + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + + // Even though this is basically start() with an extra param since I think this + // will only be used for testing scenarios, and I didn't want to unnecessarily add + // another param to the function that will be used elsewhere, hence this is its own fn + pub async fn start_with_specified_gateway(&self, gateway: ed25519::PublicKey) -> Result<()> { + loop { + let spawned_clients = self.clients.read().await.len(); + let addresses = self; + debug!( + "Currently spawned clients: {}: {:?}", + spawned_clients, addresses + ); + if self.cancel_token.is_cancelled() { + break Ok(()); + } + if spawned_clients >= self.client_pool_reserve_number { + debug!("Got enough clients already: sleeping"); + } else { + info!( + "Clients in reserve = {}, reserve amount = {}, spawning new client", + spawned_clients, self.client_pool_reserve_number + ); + let client = loop { + let net = NymNetworkDetails::new_from_env(); + match MixnetClientBuilder::new_ephemeral() + .network_details(net) + .request_gateway(gateway.to_string()) + .build()? + .connect_to_mixnet() + .await + { + Ok(client) => break client, + Err(err) => { + warn!("Error creating client: {:?}, will retry in 100ms", err); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + }; + self.clients.write().await.push(Arc::new(client)); + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + + pub async fn disconnect_pool(&self) { + info!("Triggering Client Pool disconnect"); + self.cancel_token.cancel(); + info!( + "Client pool cancellation token cancelled: {}", + self.cancel_token.is_cancelled() + ); + let mut clients = self.clients.write().await; + while let Some(arc_client) = clients.pop() { + if let Ok(client) = Arc::try_unwrap(arc_client) { + info!("Killing reserve client {}", client.nym_address()); + client.disconnect().await; + } + } + } + + pub async fn get_mixnet_client(&self) -> Option { + debug!("Grabbing client from pool"); + let mut clients = self.clients.write().await; + clients + .pop() + .and_then(|arc_client| Arc::try_unwrap(arc_client).ok()) + } + + pub async fn get_client_count(&self) -> usize { + self.clients.read().await.len() + } + + pub async fn get_pool_reserve(&self) -> usize { + self.client_pool_reserve_number + } +} diff --git a/sdk/rust/nym-sdk/src/lib.rs b/sdk/rust/nym-sdk/src/lib.rs index 22e954e926f..8615645cce4 100644 --- a/sdk/rust/nym-sdk/src/lib.rs +++ b/sdk/rust/nym-sdk/src/lib.rs @@ -2,10 +2,12 @@ //! //! 
//! The main component currently is [`mixnet`].
//! [`tcp_proxy`] is probably a good place to start for anyone wanting to integrate with existing app code and read/write from a socket.
+//! [`client_pool`] is a configurable-size pool of pre-connected clients, used internally by [`tcp_proxy`] and available for standalone use.
 
 mod error;
 
 pub mod bandwidth;
+pub mod client_pool;
 pub mod mixnet;
 pub mod tcp_proxy;
diff --git a/sdk/rust/nym-sdk/src/tcp_proxy.rs b/sdk/rust/nym-sdk/src/tcp_proxy.rs
index 86aafcd8462..beb078ff4ab 100644
--- a/sdk/rust/nym-sdk/src/tcp_proxy.rs
+++ b/sdk/rust/nym-sdk/src/tcp_proxy.rs
@@ -1,11 +1,3 @@
-//! Proxy abstractions for interacting with the mixnet like a tcp socket
-//!
-//!
-//! # Basic example
-//!
-//! ```no_run
-//! use bincode;
-//! use dirs;
 //! use nym_sdk::tcp_proxy;
 //! use rand::rngs::SmallRng;
 //! use rand::{Rng, SeedableRng};
@@ -18,19 +10,20 @@
 //! use tokio::signal;
 //! use tokio_stream::StreamExt;
 //! use tokio_util::codec;
-//! use tracing_subscriber;
-//!
+//! use tokio_util::sync::CancellationToken;
+//! use tracing_subscriber::{fmt, prelude::*, EnvFilter};
+
 //! #[derive(Serialize, Deserialize, Debug)]
 //! struct ExampleMessage {
 //!     message_id: i8,
 //!     message_bytes: Vec<u8>,
 //! }
-//!
-//! // This is a basic example which opens a single TCP connection and writes a bunch of messages between a client and an echo
-//! // server, so only uses a single session under the hood and doesn't really show off the message ordering capabilities; this is mainly
+
+//! // This is a basic example which opens a single TCP connection and writes a bunch of messages between a client and an echo
+//! // server, so only uses a single session under the hood and doesn't really show off the message ordering capabilities; this is mainly
 //! // just a quick introductory illustration on how:
 //! // - the mixnet does message ordering
-//! // - the NymProxyClient and NymProxyServer can be hooked into and used to communicate between two otherwise pretty vanilla TcpStreams
+//! // - the NymProxyClient and NymProxyServer can be hooked into and used to communicate between two otherwise pretty vanilla TcpStreams
 //! //
 //! // For a more irl example checkout tcp_proxy_multistream.rs
 //! //
@@ -40,59 +33,81 @@
 //! #[tokio::main]
 //! async fn main() -> anyhow::Result<()> {
 //!     // Keep track of sent/received messages
-//!     // let counter = Arc::new(Mutex::new(0));
 //!     let counter = AtomicU8::new(0);
-//!
+
 //!     // Comment this out to just see println! statements from this example, as Nym client logging is very informative but quite verbose.
 //!     // The Message Decay related logging gives you an idea of the internals of the proxy message ordering. To see the contents of the msg buffer, sphinx packet chunking, etc change the tracing::Level to DEBUG.
-//!     tracing_subscriber::fmt()
-//!         .with_max_level(tracing::Level::INFO)
+//!     tracing_subscriber::registry()
+//!         .with(fmt::layer())
+//!         .with(EnvFilter::new("nym_sdk::tcp_proxy=info"))
 //!         .init();
-//!
+
 //!     let server_port = env::args()
 //!         .nth(1)
 //!         .expect("Server listen port not specified");
 //!     let upstream_tcp_addr = format!("127.0.0.1:{}", server_port);
-//!
+
 //!     // This dir gets cleaned up at the end: NOTE if you switch env between tests without letting the file do the automatic cleanup, make sure to manually remove this directory before running again, otherwise your client will attempt to use these keys for the new env
 //!     let home_dir = dirs::home_dir().expect("Unable to get home directory");
 //!     let conf_path = format!("{}/tmp/nym-proxy-server-config", home_dir.display());
-//!
+
 //!     let env_path = env::args().nth(2).expect("Env file not specified");
 //!     let env = env_path.to_string();
 //!     let client_port = env::args().nth(3).expect("Port not specified");
-//!
+
 //!     let mut proxy_server =
 //!         tcp_proxy::NymProxyServer::new(&upstream_tcp_addr, &conf_path, Some(env_path.clone()))
 //!             .await?;
 //!     let proxy_nym_addr = proxy_server.nym_address();
-//!
+
 //!     // We'll run the instance with a long timeout since we're sending everything down the same Tcp connection, so should be using a single session.
 //!     // Within the TcpProxyClient, individual client shutdown is triggered by the timeout.
+//!     // The final argument is how many clients to keep in reserve in the client pool when running the TcpProxy.
 //!     let proxy_client =
-//!         tcp_proxy::NymProxyClient::new(*proxy_nym_addr, "127.0.0.1", &client_port, 60, Some(env))
+//!         tcp_proxy::NymProxyClient::new(*proxy_nym_addr, "127.0.0.1", &client_port, 5, Some(env), 1)
 //!             .await?;
-//!
+
+//!     // For our disconnect() logic below
+//!     let proxy_clone = proxy_client.clone();
+
 //!     tokio::spawn(async move {
-//!         let _ = proxy_server.run_with_shutdown().await?;
+//!         proxy_server.run_with_shutdown().await?;
 //!         Ok::<(), anyhow::Error>(())
 //!     });
-//!
+
 //!     tokio::spawn(async move {
-//!         let _ = proxy_client.run().await?;
+//!         proxy_client.run().await?;
 //!         Ok::<(), anyhow::Error>(())
 //!     });
-//!
+
+//!     let example_cancel_token = CancellationToken::new();
+//!     let server_cancel_token = example_cancel_token.clone();
+//!     let client_cancel_token = example_cancel_token.clone();
+//!     let watcher_cancel_token = example_cancel_token.clone();
+
+//!     // Cancel listener thread
+//!     tokio::spawn(async move {
+//!         signal::ctrl_c().await?;
+//!         println!(":: CTRL_C received, shutting down + cleaning up proxy server config files");
+//!         fs::remove_dir_all(conf_path)?;
+//!         watcher_cancel_token.cancel();
+//!         proxy_clone.disconnect().await;
+//!         Ok::<(), anyhow::Error>(())
+//!     });
+
 //!     // 'Server side' thread: echo back incoming as response to the messages sent in the 'client side' thread below
 //!     tokio::spawn(async move {
 //!         let listener = TcpListener::bind(upstream_tcp_addr).await?;
 //!         loop {
+//!             if server_cancel_token.is_cancelled() {
+//!                 break;
+//!             }
 //!             let (socket, _) = listener.accept().await.unwrap();
 //!             let (read, mut write) = socket.into_split();
 //!             let codec = codec::BytesCodec::new();
 //!             let mut framed_read = codec::FramedRead::new(read, codec);
 //!             while let Some(Ok(bytes)) = framed_read.next().await {
-//!                 match bincode::deserialize::<ExampleMessage> (&bytes) {
+//!                 match bincode::deserialize::<ExampleMessage>(&bytes) {
 //!                     Ok(msg) => {
 //!                         println!(
 //!                             "<< server received {}: {} bytes",
@@ -123,12 +138,12 @@
 //!         #[allow(unreachable_code)]
 //!         Ok::<(), anyhow::Error>(())
 //!     });
-//!
-//!     // Just wait for Nym clients to connect, TCP clients to bind, etc.
+
+//!     // Just wait for Nym clients to connect, TCP clients to bind, etc. If there isn't already a client in the pool (or you started it with 0), the TcpProxyClient just spins up an ephemeral client itself.
 //!     println!("waiting for everything to be set up..");
-//!     tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
+//!     tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
 //!     println!("done. sending bytes");
-//!
+
 //!     // Now the client and server proxies are running we can create and pipe traffic to/from
 //!     // a socket on the same port as our ProxyClient instance as if we were just communicating
 //!     // between a client and host via a normal TcpStream - albeit with a decent amount of additional latency.
@@ -139,13 +154,16 @@
 //!     let local_tcp_addr = format!("127.0.0.1:{}", client_port);
 //!     let stream = TcpStream::connect(local_tcp_addr).await?;
 //!     let (read, mut write) = stream.into_split();
-//!
+
 //!     // 'Client side' thread; let's just send a bunch of messages to the server with variable delays between them, with an id to keep track of ordering in the printlns; the mixnet only guarantees message delivery, not ordering. You might not necessarily be streaming traffic in this manner IRL, but this example is a good illustration of how messages travel through the mixnet.
 //!     // - On the level of individual messages broken into multiple packets, the Proxy abstraction deals with making sure that everything is sent between the sockets in the correct order.
 //!     // - On the level of different messages, this is not enforced: you might see in the logs that message 1 arrives at the server and is reconstructed after message 2.
 //!     tokio::spawn(async move {
 //!         let mut rng = SmallRng::from_entropy();
 //!         for i in 0..10 {
+//!             if client_cancel_token.is_cancelled() {
+//!                 break;
+//!             }
 //!             let random_bytes = gen_bytes_fixed(i as usize);
 //!             let msg = ExampleMessage {
 //!                 message_id: i,
@@ -158,11 +176,11 @@
 //!                 .expect("couldn't write to stream");
 //!             println!(">> client sent {}: {} bytes", &i, msg.message_bytes.len());
 //!             let delay = rng.gen_range(3.0..7.0);
-//!             tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay.clone())).await;
+//!             tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
 //!         }
 //!         Ok::<(), anyhow::Error>(())
 //!     });
-//!
+
 //!     let codec = codec::BytesCodec::new();
 //!     let mut framed_read = codec::FramedRead::new(read, codec);
 //!     while let Some(Ok(bytes)) = framed_read.next().await {
@@ -184,22 +202,16 @@
 //!             }
 //!         }
 //!     }
-//!
-//!     // Once timeout is passed, you can either wait for graceful shutdown or just hard stop it.
-//!     signal::ctrl_c().await?;
-//!     println!(":: CTRL+C received, shutting down + cleanup up proxy server config files");
-//!     fs::remove_dir_all(conf_path)?;
+
 //!     Ok(())
 //! }
-//!
+
 //! fn gen_bytes_fixed(i: usize) -> Vec<u8> {
-//!     // let amounts = vec![1, 10, 50, 100, 150, 200, 350, 500, 750, 1000];
-//!     let amounts = vec![158, 1088, 505, 1001, 150, 200, 3500, 500, 750, 100];
+//!     let amounts = [158, 1088, 505, 1001, 150, 200, 3500, 500, 750, 100];
 //!     let len = amounts[i];
 //!     let mut rng = rand::thread_rng();
 //!     (0..len).map(|_| rng.gen::<u8>()).collect()
 //! }
-//!
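+//!
+//! // A sketch of how this example might be invoked (the example file name here is an
+//! // assumption, not taken from this changeset); args are: upstream TCP port, env file,
+//! // local port for the ProxyClient:
+//! // `cargo run --example tcp_proxy_single_connection 9067 ../../../envs/mainnet.env 8080`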
``` mod tcp_proxy_client; mod tcp_proxy_server; diff --git a/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_client.rs b/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_client.rs index a0200d4c217..74f1c822667 100644 --- a/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_client.rs +++ b/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_client.rs @@ -1,3 +1,7 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::client_pool::ClientPool; use crate::mixnet::{IncludedSurbs, MixnetClientBuilder, MixnetMessageSender, NymNetworkDetails}; use std::sync::Arc; #[path = "utils.rs"] @@ -12,18 +16,23 @@ use tokio::{ }; use tokio_stream::StreamExt; use tokio_util::codec::{BytesCodec, FramedRead}; -use tracing::{debug, info, instrument, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, instrument}; use utils::{MessageBuffer, Payload, ProxiedMessage}; -const DEFAULT_CLOSE_TIMEOUT: u64 = 60; +const DEFAULT_CLOSE_TIMEOUT: u64 = 60; // seconds const DEFAULT_LISTEN_HOST: &str = "127.0.0.1"; const DEFAULT_LISTEN_PORT: &str = "8080"; +const DEFAULT_CLIENT_POOL_SIZE: usize = 2; +#[derive(Clone)] pub struct NymProxyClient { server_address: Recipient, listen_address: String, listen_port: String, close_timeout: u64, + conn_pool: ClientPool, + cancel_token: CancellationToken, } impl NymProxyClient { @@ -33,45 +42,72 @@ impl NymProxyClient { listen_port: &str, close_timeout: u64, env: Option, + default_client_amount: usize, ) -> Result { - debug!("loading env file: {:?}", env); - setup_env(env); + debug!("Loading env file: {:?}", env); + setup_env(env); // Defaults to mainnet if empty Ok(NymProxyClient { server_address, listen_address: listen_address.to_string(), listen_port: listen_port.to_string(), close_timeout, + conn_pool: ClientPool::new(default_client_amount), + cancel_token: CancellationToken::new(), }) } // server_address is the Nym address of the NymProxyServer to communicate with. pub async fn new_with_defaults(server_address: Recipient, env: Option) -> Result { - debug!("loading env file: {:?}", env); - setup_env(env); // Defaults to mainnet if empty - Ok(NymProxyClient { + NymProxyClient::new( server_address, - listen_address: DEFAULT_LISTEN_HOST.to_string(), - listen_port: DEFAULT_LISTEN_PORT.to_string(), - close_timeout: DEFAULT_CLOSE_TIMEOUT, - }) + DEFAULT_LISTEN_HOST, + DEFAULT_LISTEN_PORT, + DEFAULT_CLOSE_TIMEOUT, + env, + DEFAULT_CLIENT_POOL_SIZE, + ) + .await } pub async fn run(&self) -> Result<()> { - info!("Connecting to mixnet server at {}", self.server_address); + info!( + "Outgoing Mixnet traffic will be sent to {}", + self.server_address + ); let listener = TcpListener::bind(format!("{}:{}", self.listen_address, self.listen_port)).await?; + let client_maker = self.conn_pool.clone(); + tokio::spawn(async move { + client_maker.start().await?; + Ok::<(), anyhow::Error>(()) + }); + loop { - let (stream, _) = listener.accept().await?; - tokio::spawn(NymProxyClient::handle_incoming( - stream, - self.server_address, - self.close_timeout, - )); + tokio::select! { + stream = listener.accept() => { + let (stream, _) = stream?; + tokio::spawn(NymProxyClient::handle_incoming( + stream, + self.server_address, + self.close_timeout, + self.conn_pool.clone(), + self.cancel_token.clone(), + )); + } + _ = self.cancel_token.cancelled() => { + break Ok(()); + } + } } } + pub async fn disconnect(&self) { + self.cancel_token.cancel(); + self.conn_pool.disconnect_pool().await; + } + // The main body of our logic, triggered on each accepted incoming tcp connection. 
To deal with assumptions about // streaming we have to implement an abstract session for each set of outgoing messages atop each connection, with message // IDs to deal with the fact that the mixnet does not enforce message ordering. @@ -84,11 +120,13 @@ impl NymProxyClient { // Then we spawn 2 tasks: // - 'Outgoing' thread => frames incoming bytes from OwnedReadHalf and pipe through the mixnet & trigger session close. // - 'Incoming' thread => orders incoming messages from the Mixnet via placing them in a MessageBuffer and using tick(), as well as manage session closing. - #[instrument] + #[instrument(skip(stream, server_address, close_timeout, conn_pool, cancel_token))] async fn handle_incoming( stream: TcpStream, server_address: Recipient, close_timeout: u64, + conn_pool: ClientPool, + cancel_token: CancellationToken, ) -> Result<()> { // ID for creation of session abstraction; new session ID per new connection accepted by our tcp listener above. let session_id = uuid::Uuid::new_v4(); @@ -96,29 +134,28 @@ impl NymProxyClient { // Used to communicate end of session between 'Outgoing' and 'Incoming' tasks let (tx, mut rx) = oneshot::channel(); - // Client creation can fail for multiple reasons like bad network connection: this loop just allows us to - // retry in a loop until we can successfully connect without having to restart the entire function - info!(":: Starting session: {}", session_id); - info!(":: creating client..."); - let mut client = loop { - let net = NymNetworkDetails::new_from_env(); - // TODO change to builder but ephemeral - // match MixnetClient::connect_new().await { - match MixnetClientBuilder::new_ephemeral() - .network_details(net) - .build()? - .connect_to_mixnet() - .await - { - Ok(client) => break client, - Err(err) => { - warn!(":: Error creating client: {:?}, will retry in 100ms", err); - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } + info!("Starting session: {}", session_id); + + let mut client = match conn_pool.get_mixnet_client().await { + Some(client) => { + info!("Grabbed client {} from pool", client.nym_address()); + client + } + None => { + info!("Not enough clients in pool, creating ephemeral client"); + let net = NymNetworkDetails::new_from_env(); + let client = MixnetClientBuilder::new_ephemeral() + .network_details(net) + .build()? + .connect_to_mixnet() + .await?; + info!( + "Using {} for the moment, created outside of the connection pool", + client.nym_address() + ); + client } }; - let client_addr = &client.nym_address(); - info!(":: client created: {}", &client_addr); // Split our tcpstream into OwnedRead and OwnedWrite halves for concurrent read/writing let (read, mut write) = stream.into_split(); @@ -172,7 +209,7 @@ impl NymProxyClient { .send_message(server_addr, &coded_message, IncludedSurbs::Amount(100)) .await?; - info!(":: Closing read end of session: {}", session_id); + info!("Closing read end of session: {}", session_id); tx.send(true) .map_err(|_| anyhow::anyhow!("Could not send close signal"))?; Ok::<(), anyhow::Error>(()) @@ -186,11 +223,12 @@ impl NymProxyClient { // Select!-ing one of following options: // - rx is triggered by tx to log the session will end in ARGS.close_timeout time, break from this loop to pass to loop below // - Deserialise incoming mixnet message, push to msg buffer and tick() to order and write to OwnedWriteHalf. - // - call tick() once per 100ms if neither of the above have occurred. + // - If the cancel_token is in cancelled state, break and kick down to the loop below. 
+ // - Call tick() once per 100ms if neither of the above have occurred. loop { tokio::select! { _ = &mut rx => { - info!(":: Closing write end of session: {} in {} seconds", session_id, close_timeout); + info!("Closing write end of session: {} in {} seconds", session_id, close_timeout); break } Some(message) = client.next() => { @@ -198,6 +236,10 @@ impl NymProxyClient { msg_buffer.push(message); msg_buffer.tick(&mut write).await?; }, + _ = cancel_token.cancelled() => { + info!("Triggering loop shutdown"); + break + }, _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => { msg_buffer.tick(&mut write).await?; } @@ -205,6 +247,7 @@ impl NymProxyClient { } // Select!-ing one of following options: // - Deserialise incoming mixnet message, push to msg buffer and tick() to order and write next messageID in line to OwnedWriteHalf. + // - If the cancel_token is in cancelled state, shutdown client for this thread. // - Sleep for session timeout and return, kills thread with Ok(()). loop { tokio::select! { @@ -213,16 +256,20 @@ impl NymProxyClient { msg_buffer.push(message); msg_buffer.tick(&mut write).await?; }, + _ = cancel_token.cancelled() => { + info!("Triggering client shutdown"); + client.disconnect().await; + return Ok::<(), anyhow::Error>(()) + }, _ = tokio::time::sleep(tokio::time::Duration::from_secs(close_timeout)) => { - info!(":: Closing write end of session: {}", session_id); - info!(":: Triggering client shutdown"); + info!("Closing write end of session: {}", session_id); + info!("Triggering client shutdown"); client.disconnect().await; return Ok::<(), anyhow::Error>(()) - } + }, } } }); - tokio::signal::ctrl_c().await?; Ok(()) } } diff --git a/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_server.rs b/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_server.rs index 3d5917efebe..06411cc61a0 100644 --- a/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_server.rs +++ b/sdk/rust/nym-sdk/src/tcp_proxy/tcp_proxy_server.rs @@ -1,9 +1,13 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + use crate::mixnet::{ AnonymousSenderTag, MixnetClient, MixnetClientBuilder, MixnetClientSender, MixnetMessageSender, NymNetworkDetails, StoragePaths, }; use anyhow::Result; use dashmap::DashSet; +use nym_crypto::asymmetric::ed25519; use nym_network_defaults::setup_env; use nym_sphinx::addressing::Recipient; use std::path::PathBuf; @@ -12,6 +16,7 @@ use tokio::net::TcpStream; use tokio::sync::watch::Receiver; use tokio::sync::RwLock; use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; #[allow(clippy::duplicate_mod)] #[path = "utils.rs"] @@ -26,6 +31,9 @@ pub struct NymProxyServer { mixnet_client_sender: Arc>, tx: tokio::sync::watch::Sender>, rx: tokio::sync::watch::Receiver>, + cancel_token: CancellationToken, + shutdown_tx: tokio::sync::mpsc::Sender<()>, + shutdown_rx: tokio::sync::mpsc::Receiver<()>, } impl NymProxyServer { @@ -33,20 +41,31 @@ impl NymProxyServer { upstream_address: &str, config_dir: &str, env: Option, + gateway: Option, ) -> Result { - info!(":: creating client..."); + info!("Creating client"); // We're wanting to build a client with a constant address, vs the ephemeral in-memory data storage of the NymProxyClient clients. // Following a builder pattern, having to manually connect to the mixnet below. + // Optional selectable Gateway to use. 
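+        // e.g. (sketch only; the key below is a placeholder, not a real gateway):
+        //   let gw = ed25519::PublicKey::from_base58_string("<gateway identity key>")?;
+        //   NymProxyServer::new("127.0.0.1:9067", "/tmp/nym-proxy-server", None, Some(gw)).await?;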
let config_dir = PathBuf::from(config_dir); - debug!("loading env file: {:?}", env); + debug!("Loading env file: {:?}", env); setup_env(env); // Defaults to mainnet if empty let net = NymNetworkDetails::new_from_env(); let storage_paths = StoragePaths::new_from_dir(&config_dir)?; - let client = MixnetClientBuilder::new_with_default_storage(storage_paths) - .await? - .network_details(net) - .build()?; + + let client = if let Some(gateway) = gateway { + MixnetClientBuilder::new_with_default_storage(storage_paths) + .await? + .network_details(net) + .request_gateway(gateway.to_string()) + .build()? + } else { + MixnetClientBuilder::new_with_default_storage(storage_paths) + .await? + .network_details(net) + .build()? + }; let client = client.connect_to_mixnet().await?; @@ -57,7 +76,10 @@ impl NymProxyServer { let (tx, rx) = tokio::sync::watch::channel::>(None); - info!(":: client created: {}", client.nym_address()); + // Our shutdown signal channel + let (shutdown_tx, shutdown_rx) = tokio::sync::mpsc::channel(1); + + info!("Client created: {}", client.nym_address()); Ok(NymProxyServer { upstream_address: upstream_address.to_string(), @@ -66,31 +88,80 @@ impl NymProxyServer { mixnet_client_sender: sender, tx, rx, + cancel_token: CancellationToken::new(), + shutdown_tx, + shutdown_rx, }) } - pub fn nym_address(&self) -> &Recipient { - self.mixnet_client.nym_address() - } + pub async fn run_with_shutdown(&mut self) -> Result<()> { + let handle_token = self.cancel_token.child_token(); + let upstream_address = self.upstream_address.clone(); + let rx = self.rx(); + let mixnet_sender = self.mixnet_client_sender(); + let tx = self.tx.clone(); + let session_map = self.session_map().clone(); - pub fn mixnet_client_mut(&mut self) -> &mut MixnetClient { - &mut self.mixnet_client - } + let mut shutdown_rx = + std::mem::replace(&mut self.shutdown_rx, tokio::sync::mpsc::channel(1).1); - pub fn session_map(&self) -> &DashSet { - &self.session_map - } + // Then get the message stream: poll this for incoming messages + let message_stream = self.mixnet_client_mut(); - pub fn mixnet_client_sender(&self) -> Arc> { - Arc::clone(&self.mixnet_client_sender) - } + loop { + tokio::select! { + Some(()) = shutdown_rx.recv() => { + debug!("Received shutdown signal, stopping TcpProxyServer"); + handle_token.cancel(); + break; + } + // On our Mixnet client getting a new message: + // - Try deserialise into ProxiedMessage TODO also add check for ReconstructedMessage message innards if != ProxiedMessage + // - Check if the attached sessionID exists. + // - If !sessionID, spawn a new session_handler() task. + // - Send the message down tx => rx in our handler. 
+ message = message_stream.next() => { + if let Some(new_message) = message { + let message: ProxiedMessage = match bincode::deserialize(&new_message.message) { + Ok(msg) => msg, + Err(e) => { + error!("Failed to deserialize ProxiedMessage: {}", e); + continue; + } + }; - pub fn tx(&self) -> tokio::sync::watch::Sender> { - self.tx.clone() - } + let session_id = message.session_id(); - pub fn rx(&self) -> tokio::sync::watch::Receiver> { - self.rx.clone() + if session_map.insert(session_id) { + debug!("Got message for a new session"); + + tokio::spawn(Self::session_handler( + upstream_address.clone(), + session_id, + rx.clone(), + mixnet_sender.clone(), + handle_token.clone() + )); + + info!("Spawned a new session handler: {}", session_id); + } + + debug!("Sending message for session {}", session_id); + + if let Some(sender_tag) = new_message.sender_tag { + if let Err(e) = tx.send(Some((message, sender_tag))) { + error!("Failed to send ProxiedMessage: {}", e); + } + } else { + error!("No sender tag found, we can't send a reply without it!"); + } + } + } + } + } + + self.shutdown_rx = shutdown_rx; + Ok(()) } // The main body of our logic, triggered on each received new sessionID. To deal with assumptions about @@ -98,9 +169,9 @@ impl NymProxyServer { // IDs to deal with the fact that the mixnet does not enforce message ordering. // // There is an initial thread which does a bunch of setup logic: - // - Create a TcpStream connecting to our upstream server process. - // - Split incoming TcpStream into OwnedReadHalf and OwnedWriteHalf for concurrent read/write. - // - Create an Arc to store our session SURB - used for anonymous replies. + // - Create a TcpStream connecting to our upstream server process. + // - Split incoming TcpStream into OwnedReadHalf and OwnedWriteHalf for concurrent read/write. + // - Create an Arc to store our session SURB - used for anonymous replies. // // Then we spawn 2 tasks: // - 'Incoming' thread => deals with parsing and storing the SURB (used in Mixnet replies), deserialising and passing the incoming data from the Mixnet to the upstream server. @@ -110,6 +181,7 @@ impl NymProxyServer { session_id: Uuid, mut rx: Receiver>, sender: Arc>, + cancel_token: CancellationToken, ) -> Result<()> { let global_surb = Arc::new(RwLock::new(None)); let stream = TcpStream::connect(upstream_address).await?; @@ -134,7 +206,7 @@ impl NymProxyServer { // - Send serialised reply => Mixnet via SURB. // - If tick() returns true, close session. while let Some(Ok(bytes)) = framed_read.next().await { - info!("server received {} bytes", bytes.len()); + info!("Server received {} bytes", bytes.len()); let reply = ProxiedMessage::new(Payload::Data(bytes.to_vec()), session_id, message_id); message_id += 1; @@ -176,11 +248,14 @@ impl NymProxyServer { let should_close = msg_buffer.tick(&mut write).await?; if should_close { - info!(":: Closing write end of session: {}", session_id); + info!("Closing write end of session: {}", session_id); break; } } } + _ = cancel_token.cancelled() => { + break; + } _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => { msg_buffer.tick(&mut write).await?; } @@ -191,39 +266,65 @@ impl NymProxyServer { Ok(()) } - pub async fn run_with_shutdown(&mut self) -> Result<()> { - // On our Mixnet client getting a new message: - // - Check if the attached sessionID exists. - // - If !sessionID, spawn a new session_handler() task. - // - Send the message down tx => rx in our handler. 
- while let Some(new_message) = &self.mixnet_client_mut().next().await { - let message: ProxiedMessage = bincode::deserialize(&new_message.message)?; - let session_id = message.session_id(); - // If we've already got message from an existing session, continue, else add it to the session mapping and spawn a new handler(). - if self.session_map().contains(&message.session_id()) { - debug!("Got message for an existing session"); - } else { - self.session_map().insert(message.session_id()); - debug!("Got message for a new session"); - tokio::spawn(Self::session_handler( - self.upstream_address.clone(), - session_id, - self.rx(), - self.mixnet_client_sender(), - )); - info!("Spawned a new session handler: {}", message.session_id()); - } + pub fn disconnect_signal(&self) -> tokio::sync::mpsc::Sender<()> { + self.shutdown_tx.clone() + } - debug!("Sending message for session {}", message.session_id()); + pub fn nym_address(&self) -> &Recipient { + self.mixnet_client.nym_address() + } - if let Some(sender_tag) = new_message.sender_tag { - self.tx.send(Some((message, sender_tag)))? - } else { - error!("No sender tag found, we can't send a reply without it!") - } - } + pub fn mixnet_client_mut(&mut self) -> &mut MixnetClient { + &mut self.mixnet_client + } + + pub fn session_map(&self) -> &DashSet { + &self.session_map + } + + pub fn mixnet_client_sender(&self) -> Arc> { + Arc::clone(&self.mixnet_client_sender) + } + + pub fn tx(&self) -> tokio::sync::watch::Sender> { + self.tx.clone() + } + + pub fn rx(&self) -> tokio::sync::watch::Receiver> { + self.rx.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn shutdown_works() -> Result<()> { + let config_dir = TempDir::new()?; + let mut server = NymProxyServer::new( + "127.0.0.1:8000", + config_dir.path().to_str().unwrap(), + None, // Mainnet + None, // Random gateway + ) + .await?; + + // Getter for shutdown signal tx + let shutdown_tx = server.disconnect_signal(); + + let server_handle = tokio::spawn(async move { server.run_with_shutdown().await }); + + // Let it start up + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + + // Kill server + shutdown_tx.send(()).await?; + + // Wait for shutdown in handle + check Result != err + server_handle.await??; - tokio::signal::ctrl_c().await?; Ok(()) } } diff --git a/sdk/rust/nym-sdk/src/tcp_proxy/utils.rs b/sdk/rust/nym-sdk/src/tcp_proxy/utils.rs index b4bdad247e5..2753ef28cf2 100644 --- a/sdk/rust/nym-sdk/src/tcp_proxy/utils.rs +++ b/sdk/rust/nym-sdk/src/tcp_proxy/utils.rs @@ -1,11 +1,15 @@ -use std::{collections::HashSet, fmt, ops::Deref, time::Instant}; +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only use anyhow::Result; use serde::{Deserialize, Serialize}; +use std::{collections::HashSet, fmt, ops::Deref, time::Instant}; use tokio::{io::AsyncWriteExt as _, net::tcp::OwnedWriteHalf}; use tracing::{debug, info}; use uuid::Uuid; +const DEFAULT_DECAY: u64 = 6; // decay time in seconds + // Keeps track of // - incoming and unsorted messages wrapped in DecayWrapper for keeping track of when they were received // - the expected next message ID (reset on .tick()) @@ -60,7 +64,7 @@ impl MessageBuffer { debug!("{}", msg.inner()); } - // Iterate over self, filtering messages where msg.decayed() = true (aka message is older than 2 seconds), or where msg.message_id is less than next_msg_id. Then collect and order according to message_id. 
+ // Iterate over self, filtering messages where msg.decayed() = true (aka message is older than DEFAULT_DECAY seconds), or where msg.message_id is less than next_msg_id. Then collect and order according to message_id. let mut send_buffer = self .iter() .filter(|msg| msg.decayed() || msg.message_id() <= self.next_msg_id) @@ -122,7 +126,7 @@ impl DecayWrapper { DecayWrapper { value, start: Instant::now(), - decay: 6, + decay: DEFAULT_DECAY, } } diff --git a/tools/echo-server/Cargo.toml b/tools/echo-server/Cargo.toml index dbdf44c056e..279f4c8fa7a 100644 --- a/tools/echo-server/Cargo.toml +++ b/tools/echo-server/Cargo.toml @@ -9,6 +9,11 @@ edition.workspace = true license.workspace = true rust-version.workspace = true + +[[bin]] +name = "echo-server" +path = "src/echo-server.rs" + [dependencies] anyhow.workspace = true dashmap.workspace = true @@ -24,3 +29,11 @@ bytecodec = { workspace = true } nym-sdk = { path = "../../sdk/rust/nym-sdk/" } bytes.workspace = true dirs.workspace = true +clap.workspace = true +nym-bin-common = { path = "../../common/bin-common", features = [ + "basic_tracing", + "output_format", +] } +nym-crypto = { path = "../../common/crypto", features = ["asymmetric"] } +futures = { workspace = true } +tempfile.workspace = true diff --git a/tools/echo-server/README.md b/tools/echo-server/README.md index 71d9f263c92..e1fb8841367 100644 --- a/tools/echo-server/README.md +++ b/tools/echo-server/README.md @@ -2,8 +2,9 @@ This is an initial minimal implementation of an echo server built using the `NymProxyServer` Rust SDK abstraction. -## Usage -``` -cargo build --release -../../target/release/echo-server e.g. ../../target/release/echo-server 9000 ../../envs/canary.env -``` +TODO +- [ ] proper readme +- [ ] workspace +- [ ] clippy +- [ ] make utils exportable + import properly +- [ ] child tokens diff --git a/tools/echo-server/src/echo-server.rs b/tools/echo-server/src/echo-server.rs new file mode 100644 index 00000000000..c192860b3df --- /dev/null +++ b/tools/echo-server/src/echo-server.rs @@ -0,0 +1,47 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use anyhow::Result; +use clap::Parser; +use echo_server::NymEchoServer; +use nym_crypto::asymmetric::ed25519; +use tracing::info; + +#[derive(Parser, Debug)] +struct Args { + /// Optional gateway to use + #[clap(short, long)] + gateway: Option, + + /// Optional config path to specify + #[clap(short, long)] + config_path: Option, + + /// Env file + #[clap(short, long)] + env: String, + + /// Listen port + #[clap(short, long, default_value = "8080")] + listen_port: String, +} + +#[tokio::main] +async fn main() -> Result<()> { + nym_bin_common::logging::setup_tracing_logger(); + let args = Args::parse(); + let mut echo_server = NymEchoServer::new( + args.gateway, + args.config_path.as_deref(), + args.env, + args.listen_port.as_str(), + ) + .await?; + + let echo_addr = echo_server.nym_address().await; + info!("listening on {echo_addr}"); + + echo_server.run().await?; + + Ok(()) +} diff --git a/tools/echo-server/src/lib.rs b/tools/echo-server/src/lib.rs new file mode 100644 index 00000000000..b5dbdef5de1 --- /dev/null +++ b/tools/echo-server/src/lib.rs @@ -0,0 +1,285 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use anyhow::Result; +use nym_crypto::asymmetric::ed25519; +use nym_sdk::mixnet::Recipient; +use nym_sdk::tcp_proxy; +use nym_sdk::tcp_proxy::NymProxyServer; +use std::fmt::Debug; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; 
+use tokio::io::AsyncWriteExt; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::Mutex; +use tokio::task; +use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info}; + +const METRICS_TICK: u8 = 6; // Tempo of metrics logging in seconds + +#[derive(Debug)] +pub struct Metrics { + total_conn: AtomicU64, + bytes_recv: AtomicU64, + bytes_sent: AtomicU64, +} + +impl Metrics { + fn new() -> Self { + Self { + total_conn: AtomicU64::new(0), + bytes_recv: AtomicU64::new(0), + bytes_sent: AtomicU64::new(0), + } + } +} + +pub struct NymEchoServer { + client: Arc>, + listen_addr: String, + metrics: Arc, + cancel_token: CancellationToken, + client_shutdown_tx: tokio::sync::mpsc::Sender<()>, // This is the shutdown signal for the TcpProxyServer instance + shutdown_tx: tokio::sync::mpsc::Sender<()>, // These are the shutdown signals for the EchoServer + shutdown_rx: tokio::sync::mpsc::Receiver<()>, +} + +impl NymEchoServer { + pub async fn new( + gateway: Option, + config_path: Option<&str>, + env: String, // TODO make Option + listen_port: &str, + ) -> Result { + let home_dir = dirs::home_dir().expect("Unable to get home directory"); + let default_path = format!("{}/tmp/nym-proxy-server-config", home_dir.display()); + let config_path = config_path.unwrap_or(&default_path); + let listen_addr = format!("127.0.0.1:{}", listen_port); + + let client = Arc::new(Mutex::new( + tcp_proxy::NymProxyServer::new(&listen_addr, &config_path, Some(env.clone()), gateway) + .await?, + )); + + let client_shutdown_tx = client.lock().await.disconnect_signal(); + + let (shutdown_tx, shutdown_rx) = tokio::sync::mpsc::channel(1); + + Ok(NymEchoServer { + client, + listen_addr, + metrics: Arc::new(Metrics::new()), + cancel_token: CancellationToken::new(), + client_shutdown_tx, + shutdown_tx, + shutdown_rx, + }) + } + + pub async fn run(&mut self) -> Result<()> { + let cancel_token = self.cancel_token.clone(); + + let mut interval = + tokio::time::interval(tokio::time::Duration::from_secs(METRICS_TICK as u64)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + let client = Arc::clone(&self.client); + task::spawn(async move { + client.lock().await.run_with_shutdown().await?; + Ok::<(), anyhow::Error>(()) + }); + + let all_metrics = Arc::clone(&self.metrics); + + let listener = TcpListener::bind(self.listen_addr.clone()).await?; + debug!("{listener:?}"); + + let mut shutdown_rx = + std::mem::replace(&mut self.shutdown_rx, tokio::sync::mpsc::channel(1).1); + + loop { + tokio::select! 
{ + Some(()) = shutdown_rx.recv() => { + info!("Disconnect signal received"); + self.cancel_token.cancel(); + info!("Cancel token cancelled: killing handle_incoming loops"); + self.client_shutdown_tx.send(()).await?; + info!("Sent shutdown signal to ProxyServer instance"); + break; + } + stream = listener.accept() => { + let (stream, _) = stream?; + info!("Handling new stream"); + let connection_metrics = Arc::clone(&self.metrics); + connection_metrics.total_conn.fetch_add(1, Ordering::Relaxed); + + tokio::spawn(NymEchoServer::handle_incoming( + stream, connection_metrics, cancel_token.clone() + )); + } + _ = interval.tick() => { + info!("Metrics: total_connections_since_start={}, bytes_received={}, bytes_sent={}", + all_metrics.total_conn.load(Ordering::Relaxed), + all_metrics.bytes_recv.load(Ordering::Relaxed), + all_metrics.bytes_sent.load(Ordering::Relaxed), + ); + } + } + } + self.shutdown_rx = shutdown_rx; + Ok(()) + } + + async fn handle_incoming( + socket: TcpStream, + metrics: Arc, + cancel_token: CancellationToken, + ) { + let (read, mut write) = socket.into_split(); + let codec = tokio_util::codec::BytesCodec::new(); + let mut framed_read = tokio_util::codec::FramedRead::new(read, codec); + + loop { + tokio::select! { + Some(result) = framed_read.next() => { + match result { + Ok(bytes) => { + let len = bytes.len(); + metrics.bytes_recv.fetch_add(len as u64, Ordering::Relaxed); + if let Err(e) = write.write_all(&bytes).await { + error!("Failed to write to stream with err: {}", e); + break; + } + metrics.bytes_sent.fetch_add(len as u64, Ordering::Relaxed); + } + Err(e) => { + error!("Failed to read from stream with err: {}", e); + break; + } + } + } + _ = cancel_token.cancelled() => { + info!("Shutdown signal received, closing connection"); + break; + } + } + } + + info!("Connection closed"); + } + + pub fn disconnect_signal(&self) -> tokio::sync::mpsc::Sender<()> { + self.shutdown_tx.clone() + } + + pub async fn nym_address(&self) -> Recipient { + self.client.lock().await.nym_address().clone() + } + + pub fn listen_addr(&self) -> String { + self.listen_addr.clone() + } + + pub fn metrics(&self) -> Arc { + self.metrics.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use nym_sdk::mixnet::{IncludedSurbs, MixnetClient, MixnetMessageSender}; + #[path = "utils.rs"] + mod utils; + use tempfile::TempDir; + use utils::{Payload, ProxiedMessage}; + + #[tokio::test] + async fn shutdown_works() -> Result<()> { + let config_dir = TempDir::new()?; + let mut echo_server = NymEchoServer::new( + None, + Some(config_dir.path().to_str().unwrap()), + "../../envs/mainnet.env".to_string(), + "9000", + ) + .await + .unwrap(); + + // Getter for shutdown signal + let shutdown_tx = echo_server.disconnect_signal(); + + let server_handle = tokio::spawn(async move { echo_server.run().await.unwrap() }); + + // Let it start up + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + + // Kill server + shutdown_tx.send(()).await?; + + // Wait for shutdown in handle + server_handle.await?; + + Ok(()) + } + + #[tokio::test] + async fn echoes_bytes() -> Result<()> { + let config_dir = TempDir::new()?; + let mut echo_server = NymEchoServer::new( + None, + Some(config_dir.path().to_str().unwrap()), + "../../envs/mainnet.env".to_string(), + "9000", + ) + .await + .unwrap(); + + let echo_addr = echo_server.nym_address().await; + + tokio::task::spawn(async move { + echo_server.run().await.unwrap(); + }); + + let session_id = uuid::Uuid::new_v4(); + let message_id = 0; 
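+        // The ProxyServer only deserialises ProxiedMessage-framed traffic (payload +
+        // session UUID + per-session message id), so wrap the test bytes accordingly.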
+ let outgoing = ProxiedMessage::new( + Payload::Data("test".as_bytes().to_vec()), + session_id, + message_id, + ); + let coded_message = bincode::serialize(&outgoing).unwrap(); + + let mut client = MixnetClient::connect_new().await.unwrap(); + let sender = client.split_sender(); + let sending_task_handle = tokio::spawn(async move { + sender + .send_message(echo_addr, &coded_message, IncludedSurbs::Amount(10)) + .await + .unwrap(); + }); + + let receiving_task_handle = tokio::spawn(async move { + if let Some(received) = client.next().await { + let incoming: ProxiedMessage = bincode::deserialize(&received.message).unwrap(); + assert_eq!(outgoing.message, incoming.message); + } + client.disconnect().await; + }); + + sending_task_handle.await.unwrap(); + receiving_task_handle.await.unwrap(); + + Ok(()) + } + + // #[tokio::test] + // async fn creates_a_valid_nym_addr_with_specified_gw() { + // todo!() + // // check valid + // // parse end + // } +} diff --git a/tools/echo-server/src/main.rs b/tools/echo-server/src/main.rs deleted file mode 100644 index 28b0a7b96d8..00000000000 --- a/tools/echo-server/src/main.rs +++ /dev/null @@ -1,161 +0,0 @@ -use anyhow::Result; -use bytes::Bytes; -use nym_sdk::tcp_proxy; -use std::env; -use std::fs; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; -use tokio::io::AsyncWriteExt; -use tokio::net::{TcpListener, TcpStream}; -use tokio::signal; -use tokio::sync::broadcast; -use tokio::task; -use tokio_stream::StreamExt; -use tracing::{error, info, warn}; - -struct Metrics { - total_conn: AtomicU64, - active_conn: AtomicU64, - bytes_recv: AtomicU64, - bytes_sent: AtomicU64, -} - -impl Metrics { - fn new() -> Self { - Self { - total_conn: AtomicU64::new(0), - active_conn: AtomicU64::new(0), - bytes_recv: AtomicU64::new(0), - bytes_sent: AtomicU64::new(0), - } - } -} - -#[tokio::main] -async fn main() -> Result<()> { - // if you run this with DEBUG you see the msg buffer on the ProxyServer, but its quite chatty - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) - .init(); - - let server_port = env::args() - .nth(1) - .expect("Server listen port not specified"); - let tcp_addr = format!("127.0.0.1:{}", server_port); - - // This dir gets cleaned up at the end: NOTE if you switch env between tests without letting the file do the automatic cleanup, make sure to manually remove this directory up before running again, otherwise your client will attempt to use these keys for the new env - let home_dir = dirs::home_dir().expect("Unable to get home directory"); - let conf_path = format!("{}/tmp/nym-proxy-server-config", home_dir.display()); - - let env_path = env::args().nth(2).expect("Env file not specified"); - let env = env_path.to_string(); - - let mut proxy_server = tcp_proxy::NymProxyServer::new(&tcp_addr, &conf_path, Some(env.clone())) - .await - .unwrap(); - let proxy_nym_addr = *proxy_server.nym_address(); - info!("ProxyServer listening out on {}", proxy_nym_addr); - - task::spawn(async move { - proxy_server.run_with_shutdown().await?; - Ok::<(), anyhow::Error>(()) - }); - - let (shutdown_sender, _) = broadcast::channel(1); - let metrics = Arc::new(Metrics::new()); - let all_metrics = Arc::clone(&metrics); - - tokio::spawn(async move { - loop { - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - info!( - "Metrics: total_connections={}, active_connections={}, bytes_received={}, bytes_sent={}", - all_metrics.total_conn.load(Ordering::Relaxed), - all_metrics.active_conn.load(Ordering::Relaxed), - 
all_metrics.bytes_recv.load(Ordering::Relaxed), - all_metrics.bytes_sent.load(Ordering::Relaxed), - ); - } - }); - - let listener = TcpListener::bind(tcp_addr).await?; - - loop { - tokio::select! { - _ = signal::ctrl_c() => { - info!("Shutdown signal received, closing server..."); - let _ = shutdown_sender.send(()); - // TODO we need something like this for the ProxyServer client - break; - } - Ok((socket, _)) = listener.accept() => { - let connection_metrics = Arc::clone(&metrics); - let shutdown_rx = shutdown_sender.subscribe(); - connection_metrics.total_conn.fetch_add(1, Ordering::Relaxed); - connection_metrics.active_conn.fetch_add(1, Ordering::Relaxed); - tokio::spawn(async move { - handle_incoming(socket, connection_metrics, shutdown_rx).await; - }); - } - } - } - - signal::ctrl_c().await?; - info!("Received CTRL+C"); - fs::remove_dir_all(conf_path)?; - while metrics.active_conn.load(Ordering::Relaxed) > 0 { - info!("Waiting on active connections to close: sleeping 100ms"); - // TODO some kind of hard kill here for the ProxyServer - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } - Ok(()) -} - -async fn handle_incoming( - socket: TcpStream, - metrics: Arc, - mut shutdown_rx: broadcast::Receiver<()>, -) { - let (read, mut write) = socket.into_split(); - let codec = tokio_util::codec::BytesCodec::new(); - let mut framed_read = tokio_util::codec::FramedRead::new(read, codec); - - loop { - tokio::select! { - Some(result) = framed_read.next() => { - match result { - Ok(bytes) => { - let len = bytes.len(); - metrics.bytes_recv.fetch_add(len as u64, Ordering::Relaxed); - if let Err(e) = write.write_all(&bytes).await { - error!("Failed to write to stream with err: {}", e); - break; - } - metrics.bytes_sent.fetch_add(len as u64, Ordering::Relaxed); - } - Err(e) => { - error!("Failed to read from stream with err: {}", e); - break; - } - } - } - _ = shutdown_rx.recv() => { - warn!("Shutdown signal received, closing connection"); - break; - } - // TODO need to work out a way that if this timesout and breaks but you dont hang up the conn on the client end you can reconnect..maybe. If we just use this as a ping echo server I dont think this is a problem - // EDIT I'm not actually sure we want this functionality? 
Measuring active connections might be useful though - _ = tokio::time::sleep(tokio::time::Duration::from_secs(120)) => { - info!("Timeout reached, assuming we wont get more messages on this conn, closing"); - let close_message = "Closing conn, reconnect if you want to ping again"; - let bytes: Bytes = close_message.into(); - write.write_all(&bytes).await.expect("Couldn't write to socket"); - break; - } - } - } - metrics - .active_conn - .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); - info!("Connection closed"); -} diff --git a/tools/echo-server/src/tests/utils.rs b/tools/echo-server/src/tests/utils.rs new file mode 100644 index 00000000000..508a0462fd2 --- /dev/null +++ b/tools/echo-server/src/tests/utils.rs @@ -0,0 +1,57 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use serde::{Deserialize, Serialize}; +use std::fmt; +use uuid::Uuid; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct ProxiedMessage { + pub message: Payload, + session_id: Uuid, + message_id: u16, +} + +impl ProxiedMessage { + pub fn new(message: Payload, session_id: Uuid, message_id: u16) -> Self { + ProxiedMessage { + message, + session_id, + message_id, + } + } + + pub fn message(&self) -> &Payload { + &self.message + } + + pub fn session_id(&self) -> Uuid { + self.session_id + } + + pub fn message_id(&self) -> u16 { + self.message_id + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub enum Payload { + Data(Vec), + Close, +} + +impl fmt::Display for ProxiedMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let message = match self.message() { + Payload::Data(ref data) => format!("Data({})", data.len()), + Payload::Close => "Close".to_string(), + }; + write!( + f, + "ProxiedMessage {{ message: {}, session_id: {}, message_id: {} }}", + message, + self.session_id(), + self.message_id() + ) + } +} diff --git a/tools/internal/mixnet-check-all-gateways/.gitignore b/tools/internal/mixnet-check-all-gateways/.gitignore new file mode 100644 index 00000000000..3a63d5fc45f --- /dev/null +++ b/tools/internal/mixnet-check-all-gateways/.gitignore @@ -0,0 +1,2 @@ +src/results/* +scratch diff --git a/tools/internal/mixnet-check-all-gateways/Cargo.toml b/tools/internal/mixnet-check-all-gateways/Cargo.toml new file mode 100644 index 00000000000..7c6ffc5f546 --- /dev/null +++ b/tools/internal/mixnet-check-all-gateways/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "mixnet-check-all-gateways" +version = "0.1.0" +authors.workspace = true +repository.workspace = true +homepage.workspace = true +documentation.workspace = true +edition.workspace = true +license.workspace = true +rust-version.workspace = true +readme.workspace = true + +[dependencies] +nym-bin-common = { path = "../../../common/bin-common", features = [ + "basic_tracing", + "output_format", +] } +nym-crypto = { path = "../../../common/crypto", features = ["asymmetric"] } +clap.workspace = true +anyhow.workspace = true +tracing.workspace = true +tracing-subscriber = "0.3" +nym-sdk = { path = "../../../sdk/rust/nym-sdk/" } +echo-server = { path = "../../echo-server" } +serde = { version = "1", features = ["derive"] } +tokio = { workspace = true, features = ["full"] } +reqwest = { workspace = true, features = ["json"] } +serde_json.workspace = true +futures = { workspace = true } +uuid = { version = "1", features = ["v4", "serde"] } +bincode = "1.0" +dirs.workspace = true +tokio-util = { workspace = true } +tempfile.workspace = true diff --git 
a/tools/internal/mixnet-check-all-gateways/src/main.rs b/tools/internal/mixnet-check-all-gateways/src/main.rs new file mode 100644 index 00000000000..b87090bcbb9 --- /dev/null +++ b/tools/internal/mixnet-check-all-gateways/src/main.rs @@ -0,0 +1,316 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use anyhow::{anyhow, Result}; +use echo_server::NymEchoServer; +use futures::stream::StreamExt; +use nym_bin_common::logging::setup_logging; +use nym_crypto::asymmetric::ed25519; +use nym_sdk::mixnet; +use nym_sdk::mixnet::{IncludedSurbs, MixnetMessageSender}; +use reqwest::{self, Url}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::fs; +use std::fs::OpenOptions; +use std::io::Write; +use std::time::Duration; +use tokio::signal; +use tokio::time; +use tokio::time::timeout; +#[path = "utils.rs"] +// TODO make these exportable from tcp_proxy module and then import from there, ditto with echo server lib +mod utils; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; +use utils::{Payload, ProxiedMessage}; + +const TIMEOUT: u64 = 10; // message ping timeout +const MESSAGE: &str = "echo test"; + +#[derive(Serialize, Deserialize, Debug)] +struct TestResult { + entry_gw: String, + exit_gw: String, + error: TestError, +} + +#[derive(Serialize, Deserialize, Debug)] +enum TestError { + Timeout, + NoMessage, + None, + CouldNotCreateEchoServer(String), + Other(String), +} + +#[tokio::main] +async fn main() -> Result<()> { + setup_logging(); // TODO think about parsing and noise here if make it concurrent. Could just parse on errors from all libs and then have info from here? echo server metrics + info logging from this code + error logs from elsewhere should be ok + let entry_gw_keys = reqwest_and_parse( + Url::parse("https://validator.nymtech.net/api/v1/unstable/nym-nodes/skimmed/entry-gateways/all?no_legacy=true").unwrap(), + "EntryGateway", + ) + .await?; + debug!( + "got {} entry gws: \n{:?}", + entry_gw_keys.len(), + entry_gw_keys + ); + info!("got {} entry gws", entry_gw_keys.len(),); + + let exit_gw_keys = reqwest_and_parse( + Url::parse( + "https://validator.nymtech.net/api/v1/unstable/nym-nodes/skimmed/exit-gateways/all?no_legacy=true", + ) + .unwrap(), + "ExitGateway", + ) + .await?; + debug!( + "got {} exit gws: \n{:?}\n", + exit_gw_keys.len(), + exit_gw_keys + ); + info!("got {} exit gws", exit_gw_keys.len(),); + + let mut port_range: u64 = 9000; // Port that we start iterating upwards from, will go from port_range to (port_range + exit_gws.len()) by the end of the run. This was made configurable presuming at some point we'd try make this run concurrently for speedup. 
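+    // Overall flow: for each exit gateway, pin a NymEchoServer instance to it; then for each
+    // entry gateway, connect an ephemeral client through that gateway, ping the echo server,
+    // and record a TestResult (None / Timeout / NoMessage / Other / CouldNotCreateEchoServer)
+    // per (entry, exit) pair to ./src/results/<timestamp>/<exit_gw>.json.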
+ + let cancel_token = CancellationToken::new(); + let watcher_token = cancel_token.clone(); + tokio::spawn(async move { + signal::ctrl_c().await?; + println!("CTRL_C received"); + watcher_token.cancel(); + Ok::<(), anyhow::Error>(()) + }); + + let start = SystemTime::now(); + let time_now = start.duration_since(UNIX_EPOCH).unwrap().as_secs(); + + for exit_gw in exit_gw_keys.clone() { + let loop_token = cancel_token.child_token(); + let inner_loop_token = cancel_token.child_token(); + if loop_token.is_cancelled() { + break; + } + let thread_token = cancel_token.child_token(); + let last_check_token = thread_token.clone(); + + if !fs::metadata(format!("./src/results/{}", time_now)) + .map(|metadata| metadata.is_dir()) + .unwrap_or(false) + { + fs::create_dir_all(format!("./src/results/{}", time_now))?; + } + + info!("creating echo server connecting to {}", exit_gw); + + let filepath = format!("./src/results/{}/{}.json", time_now, exit_gw.clone()); + let mut results = OpenOptions::new() + .read(true) + .write(true) // .append(true) + .create(true) + .open(filepath)?; + + let mut results_vec: Vec<Value> = Vec::new(); + let home_dir = dirs::home_dir().expect("Unable to get home directory"); + let mut echo_server = match NymEchoServer::new( + Some(ed25519::PublicKey::from_base58_string(&exit_gw)?), + Some( + format!( + "{}/tmp/nym-proxy-server-config-{}", + home_dir.display(), + &exit_gw + ) + .as_str(), + ), + "../../../envs/mainnet.env".to_string(), // TODO replace with None + port_range.to_string().as_str(), + ) + .await + { + Ok(echo_server) => echo_server, + Err(err) => { + let res = TestResult { + entry_gw: "".to_string(), + exit_gw: exit_gw.clone(), + error: TestError::CouldNotCreateEchoServer(err.to_string()), + }; + results_vec.push(json!(res)); + let json_array = json!(results_vec); + info!("{json_array}"); + results.write_all(json_array.to_string().as_bytes())?; + continue; + } + }; + port_range += 1; + + let echo_disconnect_signal = echo_server.disconnect_signal(); + let echo_addr = echo_server.nym_address().await; + debug!("echo addr: {echo_addr}"); + + tokio::task::spawn(async move { + loop { + tokio::select!
{ + _ = thread_token.cancelled() => { + info!("loop over; disconnecting echo server {}", echo_addr.clone()); + echo_disconnect_signal.send(()).await?; + break; + } + _ = echo_server.run() => {} + } + } + Ok::<(), anyhow::Error>(()) + }); + + // dumb sleep to let it start up + time::sleep(Duration::from_secs(5)).await; + + for entry_gw in entry_gw_keys.clone() { + if inner_loop_token.is_cancelled() { + info!("Inner loop cancelled"); + break; + } + let builder = mixnet::MixnetClientBuilder::new_ephemeral() + .request_gateway(entry_gw.clone()) + .build()?; + + let mut client = match builder.connect_to_mixnet().await { + Ok(client) => client, + Err(err) => { + let res = TestResult { + entry_gw: entry_gw.clone(), + exit_gw: exit_gw.clone(), + error: TestError::Other(err.to_string()), + }; + info!("{res:#?}"); + results_vec.push(json!(res)); + continue; + } + }; + + let test_address = client.nym_address(); + info!("currently testing entry gateway: {test_address}"); + + // Has to be ProxiedMessage for the moment, which is slightly annoying until I + // modify the ProxyServer to just stupidly echo back whatever it gets in a + // ReconstructedMessage format if it can't deserialise incoming traffic to a ProxiedMessage + let session_id = uuid::Uuid::new_v4(); + let message_id = 0; + let outgoing = ProxiedMessage::new( + Payload::Data(MESSAGE.as_bytes().to_vec()), + session_id, + message_id, + ); + let coded_message = bincode::serialize(&outgoing).unwrap(); + + match client + .send_message(echo_addr, &coded_message, IncludedSurbs::Amount(30)) + .await + { + Ok(_) => { + debug!("Message sent"); + } + Err(err) => { + let res = TestResult { + entry_gw: entry_gw.clone(), + exit_gw: exit_gw.clone(), + error: TestError::Other(err.to_string()), + }; + info!("{res:#?}"); + results_vec.push(json!(res)); + continue; + } + }; + + let res = match timeout(Duration::from_secs(TIMEOUT), client.next()).await { + Err(_timeout) => { + warn!("timed out"); + TestResult { + entry_gw: entry_gw.clone(), + exit_gw: exit_gw.clone(), + error: TestError::Timeout, + } + } + Ok(Some(received)) => { + let incoming: ProxiedMessage = bincode::deserialize(&received.message).unwrap(); + info!("got echo: {:?}", incoming); + debug!( + "sent message as lazy ref until I properly sort the utils for comparison: {:?}", + MESSAGE.as_bytes() + ); + debug!("incoming message: {:?}", incoming.message); + TestResult { + entry_gw: entry_gw.clone(), + exit_gw: exit_gw.clone(), + error: TestError::None, + } + } + Ok(None) => { + info!("failed to receive any message back..."); + TestResult { + entry_gw: entry_gw.clone(), + exit_gw: exit_gw.clone(), + error: TestError::NoMessage, + } + } + }; + debug!("{res:#?}"); + results_vec.push(json!(res)); + debug!("disconnecting the client before shutting down..."); + client.disconnect().await; + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + info!("{}", &entry_gw); + + if Some(&entry_gw) == entry_gw_keys.last() { + last_check_token.cancel(); + } + } + let json_array = json!(results_vec); + debug!("{json_array}"); + results.write_all(json_array.to_string().as_bytes())?; + } + Ok(()) +} + +async fn reqwest_and_parse(endpoint: Url, key: &str) -> Result<Vec<String>> { + let response = reqwest::get(endpoint).await?; + let json: Value = response.json().await?; + let filtered_keys = filter_gateway_keys(&json, key)?; + Ok(filtered_keys) +} + +fn filter_gateway_keys(json: &Value, key: &str) -> Result<Vec<String>> { + let mut filtered_keys = Vec::new(); + + if let Some(nodes) = json["nodes"]["data"].as_array() { + for node
in nodes { + if let Some(performance) = node.get("performance").and_then(|v| v.as_str()) { + let performance_value: f64 = performance.parse().unwrap_or(0.0); // TODO make this configurable? + + let inactive = node.get("role").and_then(|v| v.as_str()) == Some("Inactive"); + + if let Some(role) = node.get("role").and_then(|v| v.as_str()) { + let is_correct_gateway = role == key; + debug!("node addr: {:?}", node); + debug!("perf score: {}", performance_value); + debug!("inactive: {}", inactive); + if performance_value > 0.0 && !inactive && is_correct_gateway { + if let Some(gateway_identity_key) = + node.get("ed25519_identity_pubkey").and_then(|v| v.as_str()) + { + filtered_keys.push(gateway_identity_key.to_string()); + } + } + } + } + } + } else { + // TODO make error and return / break + return Err(anyhow!("Could not parse any gateways")); + } + Ok(filtered_keys) +} diff --git a/tools/internal/mixnet-check-all-gateways/src/utils.rs b/tools/internal/mixnet-check-all-gateways/src/utils.rs new file mode 100644 index 00000000000..0735b009db6 --- /dev/null +++ b/tools/internal/mixnet-check-all-gateways/src/utils.rs @@ -0,0 +1,54 @@ +use serde::{Deserialize, Serialize}; +use std::fmt; +use uuid::Uuid; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct ProxiedMessage { + pub message: Payload, + session_id: Uuid, + message_id: u16, +} + +impl ProxiedMessage { + pub fn new(message: Payload, session_id: Uuid, message_id: u16) -> Self { + ProxiedMessage { + message, + session_id, + message_id, + } + } + + pub fn message(&self) -> &Payload { + &self.message + } + + pub fn session_id(&self) -> Uuid { + self.session_id + } + + pub fn message_id(&self) -> u16 { + self.message_id + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub enum Payload { + Data(Vec<u8>), + Close, +} + +impl fmt::Display for ProxiedMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let message = match self.message() { + Payload::Data(ref data) => format!("Data({})", data.len()), + Payload::Close => "Close".to_string(), + }; + write!( + f, + "ProxiedMessage {{ message: {}, session_id: {}, message_id: {} }}", + message, + self.session_id(), + self.message_id() + ) + } +}
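A note on the wire format used throughout `main.rs`: the ping is just a `ProxiedMessage` wrapping `Payload::Data`, serialised with `bincode`, and the echo server is expected to send the same envelope back. A minimal round-trip sketch of that encode/decode step in isolation (no mixnet traffic involved; it assumes the `ProxiedMessage`/`Payload` definitions from the `utils.rs` above are in scope):

```rust
use uuid::Uuid;

// Assumes ProxiedMessage and Payload from utils.rs are in scope.
fn main() -> anyhow::Result<()> {
    // Same construction as main.rs: payload bytes, fresh session id, message id 0.
    let outgoing = ProxiedMessage::new(
        Payload::Data("echo test".as_bytes().to_vec()),
        Uuid::new_v4(),
        0,
    );
    // Serialise exactly as main.rs does before send_message().
    let bytes = bincode::serialize(&outgoing)?;

    // The echo server returns the same envelope; deserialise and compare payloads,
    // which is the comparison the "lazy ref" debug logging gestures at.
    let incoming: ProxiedMessage = bincode::deserialize(&bytes)?;
    assert_eq!(incoming.message(), outgoing.message());
    println!("round trip ok: {incoming}");
    Ok(())
}
```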
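For clarity on what `filter_gateway_keys` actually keeps: a node survives only if its `role` matches the requested key, the role is not `"Inactive"` (redundant with the role match, but checked explicitly), and its `performance` string parses above 0.0. A self-contained sketch of those same checks against a hypothetical response fragment (field names are taken from the code above; the sample values are invented):

```rust
use serde_json::json;

fn main() {
    // Shape mirrors the nym-api "skimmed" response parsed in main.rs; values are made up.
    let sample = json!({
        "nodes": { "data": [
            { "role": "ExitGateway", "performance": "0.98", "ed25519_identity_pubkey": "ABC123" },
            { "role": "Inactive",    "performance": "0.99", "ed25519_identity_pubkey": "DEF456" },
            { "role": "ExitGateway", "performance": "0.0",  "ed25519_identity_pubkey": "GHI789" }
        ]}
    });

    let mut keys: Vec<String> = Vec::new();
    for node in sample["nodes"]["data"].as_array().unwrap() {
        let role = node["role"].as_str().unwrap_or_default();
        // Performance arrives as a string; unparseable values are treated as 0.0, as in main.rs.
        let perf: f64 = node["performance"].as_str().unwrap_or("0").parse().unwrap_or(0.0);
        if role == "ExitGateway" && perf > 0.0 {
            keys.push(node["ed25519_identity_pubkey"].as_str().unwrap().to_string());
        }
    }
    // Only the healthy, active exit gateway passes the filter.
    assert_eq!(keys, vec!["ABC123".to_string()]);
}
```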
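The shutdown plumbing in `main.rs` leans on `tokio_util::sync::CancellationToken` child tokens: cancelling the root token, whether from the ctrl-c watcher task or via `last_check_token` once the final entry gateway has been tested, is observed by every child, which is what breaks the outer and inner loops and tears down each echo server. A stripped-down sketch of that pattern, independent of the mixnet code (the sleep stands in for real work):

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let root = CancellationToken::new();
    let watcher = root.clone();

    // Stands in for the ctrl_c watcher task in main.rs: something cancels the root.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        watcher.cancel();
    });

    // Child tokens (like thread_token / inner_loop_token above) see the cancellation.
    let child = root.child_token();
    tokio::select! {
        _ = child.cancelled() => println!("child observed root cancellation"),
        _ = tokio::time::sleep(Duration::from_secs(5)) => println!("timed out waiting"),
    }
}
```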