From 3c191bb9cf982800e52893c5cd92ae4c3c71b25e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Onur=20=C3=96zkan?= Date: Mon, 7 Oct 2024 12:02:11 +0300 Subject: [PATCH] check status of peers in kdf network (#25) * require kdf connection string in the config file Signed-off-by: onur-ozkan * add {serializer, deserializer} wrappers for `RpcClient` Signed-off-by: onur-ozkan * implement expirable hashmap Signed-off-by: onur-ozkan * implement peer status check logic into the middleware Signed-off-by: onur-ozkan * check if KDF is available on app initialization Signed-off-by: onur-ozkan * update kdf rpc module Signed-off-by: onur-ozkan * allow dead-code for various `expirable_map` functions Signed-off-by: onur-ozkan * move `peer_connection_healthcheck` priority Signed-off-by: onur-ozkan * update execution flow docs Signed-off-by: onur-ozkan * update drawio document file Signed-off-by: onur-ozkan * Update README.md * keep `RpcSocketPayload` private Signed-off-by: onur-ozkan * update README Signed-off-by: onur-ozkan * Update README.md * extend configuration interface with `peer_healthcheck_caching_secs` Signed-off-by: onur-ozkan * update README Signed-off-by: onur-ozkan * sync the upstream changes Signed-off-by: onur-ozkan * update proxy_signature Signed-off-by: onur-ozkan * sync upstream expirable map impl Signed-off-by: onur-ozkan * exclude nightly pipeline from runner Signed-off-by: onur-ozkan --------- Signed-off-by: onur-ozkan --- .github/workflows/pipelines.yml | 10 +- Cargo.lock | 9 +- Cargo.toml | 3 +- README.md | 54 +++++----- assets/.config_test | 2 + docs/arch.drawio | 111 ++++++++++++-------- src/ctx.rs | 46 ++++++++- src/expirable_map.rs | 174 ++++++++++++++++++++++++++++++++ src/main.rs | 6 ++ src/net/kdf_rpc_interface.rs | 26 +++++ src/net/rpc.rs | 22 ++-- src/proxy/http/get.rs | 4 +- src/proxy/http/mod.rs | 83 ++++++++++++++- 13 files changed, 450 insertions(+), 100 deletions(-) create mode 100644 src/expirable_map.rs create mode 100644 src/net/kdf_rpc_interface.rs diff --git a/.github/workflows/pipelines.yml b/.github/workflows/pipelines.yml index de336c7..f82f011 100644 --- a/.github/workflows/pipelines.yml +++ b/.github/workflows/pipelines.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - rust: [nightly, beta, stable] + rust: [beta, stable] steps: - uses: actions/checkout@v2 @@ -35,7 +35,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - rust: [nightly, beta, stable] + rust: [beta, stable] steps: - uses: actions/checkout@v2 @@ -58,7 +58,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - rust: [nightly, beta, stable] + rust: [beta, stable] steps: - uses: actions/checkout@v2 @@ -82,7 +82,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest] - rust: [nightly, beta, stable] + rust: [beta, stable] steps: - uses: actions/checkout@v2 @@ -103,7 +103,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest] - rust: [nightly, beta, stable] + rust: [beta, stable] steps: - uses: actions/checkout@v2 diff --git a/Cargo.lock b/Cargo.lock index 9861d54..7130fb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1067,6 +1067,7 @@ dependencies = [ "once_cell", "proxy_signature", "redis", + "rustc-hash", "serde", "serde_json", "sha3", @@ -1404,7 +1405,7 @@ dependencies = [ [[package]] name = "proxy_signature" version = "0.1.0" -source = "git+https://github.com/KomodoPlatform/komodo-defi-framework?rev=9ebc006#9ebc0065350c735d5ba209f634f48a9e24063243" +source = "git+https://github.com/KomodoPlatform/komodo-defi-framework?branch=dev#e65fefe5adfdff6e3d1fb0d31b65e5ef78b2e437" dependencies = [ "chrono", "http", @@ -1571,6 +1572,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hex" version = "2.1.0" diff --git a/Cargo.toml b/Cargo.toml index 24dfac3..8703128 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ log = "0.4.17" once_cell = "1.12.0" url = { version = "2.2.2", features = ["serde"] } redis = { version = "0.21.5", default-features = false, features = ["tokio-comp"] } +rustc-hash = "1.1.0" serde = "1.0.137" serde_json = { version = "1.0.81", features = ["preserve_order", "raw_value"] } sha3 = "0.9" @@ -28,7 +29,7 @@ tokio = { version = "1.12.0", default-features = false, features = ["macros", "r tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] } # From our sources libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.4", default-features = false, features = ["identify"] } -proxy_signature = { git = "https://github.com/KomodoPlatform/komodo-defi-framework", rev = "9ebc006" } +proxy_signature = { git = "https://github.com/KomodoPlatform/komodo-defi-framework", branch = "dev" } [target.x86_64-unknown-linux-gnu.dependencies] jemallocator = "0.5.0" diff --git a/README.md b/README.md index ecfda32..7ef2697 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,7 @@ +# Komodo Defi Proxy + +Decentralized P2P applications have some limitations by their nature and one of them is the use application/API keys. If an API key is used in the application, any user could retrieve it by simply debugging the app. Some of the blockchain services we use in [komodo-defi-framework](https://github.com/KomodoPlatform/komodo-defi-framework) are paid services and we want to prevent abuse, such as users copying the API key for personal use. To address this problem, we created this project, komodo-defi-proxy. It takes the request, handles the API key, forwards the request to the actual service, and returns the result without modifying the original request. This keeps our secret application keys secure and hidden from end users. + ### Dev Requirements Creating rsa key pairs @@ -16,12 +20,14 @@ Create the configuration file for app runtime. "pubkey_path": "/path_to_publick_key.pem", "privkey_path": "/path_to_private_key.pem", "redis_connection_string": "redis://localhost", + "kdf_rpc_client": "http://127.0.0.1:7783", + "kdf_rpc_password": "testpass", "token_expiration_time": 300, "proxy_routes": [ { "inbound_route": "/dev", "outbound_route": "http://localhost:8000", - "proxy_type": "quicknode", + "proxy_type": "quicknode", # available types are: "quicknode", "moralis", "block_pi" "authorized": false, "allowed_rpc_methods": [ "eth_blockNumber", @@ -36,20 +42,18 @@ Create the configuration file for app runtime. "rp_15_min": 200, "rp_30_min": 350, "rp_60_min": 575 - } + }, + "peer_healthcheck_caching_secs": 10 } ``` Expose configuration file's path as an environment variable in `AUTH_APP_CONFIG_PATH`. -***Important Note:*** The environment where the application will be deployed, the timezone MUST be as UTC. Also, make sure redis is version `6.*` - -### Architecture (TODO: OUTDATED) - -![arch2](https://github.com/KomodoPlatform/komodo-defi-proxy/assets/39852038/be7fe7ae-2f2a-4f68-afa8-ce4938c570a7) +***Important Note:*** Make sure redis is version `7.*` +### Architecture -**Execution flow (TODO: OUTDATED):** +![2024-09-09_14-09](https://github.com/user-attachments/assets/2775d73e-8003-4bfe-89e1-2c64da9e3004) 1) Client sends the request. @@ -58,29 +62,17 @@ Expose configuration file's path as an environment variable in `AUTH_APP_CONFIG_ 3) If the incoming request comes from the same network, step 4 will be by-passed. 4) Request Handling in the Middleware: + - **Status Checker**: + - **Blocked**: Return `403 Forbidden`. + - **Allowed**: Process continues with the rate limiter. + - **Trusted**: Bypass rate limiter and proof of funding. - **For Quicknode:** - - **Status Checker**: - - **Blocked**: Return `403 Forbidden` immediately. - - **Allowed**: Process continues with the rate limiter. - - **Trusted**: Bypass rate limiter and proof of funding. + - **Peer Status Checker**: + - The requesting peer must be active in the KDF network. Validate this by executing the `peer_connection_healthcheck` KDF RPC. If the peer is not connected to the network, return `401 Unauthorized`. - - **Rate Limiter**: - - First, verify the signed message. If not valid, return `401 Unauthorized` immediately. - - If valid, calculate the request count with the time interval specified in the application configuration. If the wallet address has sent too many requests than the expected amount, process continues with the proof of funding. If not, bypass the proof of funding. - - - **Proof of Funding**: - - Return `406 Not Acceptable` if the wallet has a 0 balance. Otherwise, assume the request is valid and process it as usual. - - **For Moralis:** - - **Status Checker**: - - **Blocked**: Return `403 Forbidden` immediately. - - **Allowed**: Process continues with the rate limiter. - - **Trusted**: Bypass the rate limiter. - - - **Rate Limiter**: - - First, verify the signed message. If not valid, return `401 Unauthorized` immediately. - - If valid, calculate the request count with the time interval specified in the application configuration. If the wallet address has sent too many requests, return an error `406 Not Acceptable` indicating that the wallet address must wait for some time before making more requests. + - **Rate Limiter**: + - First, verify the signed message. If not valid, return `401 Unauthorized`. + - If valid, calculate the request count with the time interval specified in the application configuration. If the wallet address has sent too many requests than the expected amount, process continues with the proof of funding. If not, bypass the proof of funding. 5) Find target route by requested endpoint. @@ -102,7 +94,7 @@ curl -v --url "'$mm2_address'" -s --data '{ "params": { "ticker": "ETH", "nodes": [ - {"url": "'$atomicdex_gui_auth_address'", "gui_auth": true } + {"url": "'$atomicdex_gui_auth_address'", "komodo_proxy": true } ], "swap_contract_address": "0x24ABE4c71FC658C91313b6552cd40cD808b3Ea80", "erc20_tokens_requests": [ @@ -140,4 +132,4 @@ If you want to test features locally, you can run Docker containers using Docker 4. **Stop the Containers**: ```sh docker compose down - ``` \ No newline at end of file + ``` diff --git a/assets/.config_test b/assets/.config_test index c8b140f..c289c2f 100644 --- a/assets/.config_test +++ b/assets/.config_test @@ -1,6 +1,8 @@ { "port": 6150, "redis_connection_string": "redis://redis:6379", + "kdf_rpc_client": "http://127.0.0.1:7783", + "kdf_rpc_password": "testpass", "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", "token_expiration_time": 300, diff --git a/docs/arch.drawio b/docs/arch.drawio index 0a2e96e..e40c5df 100644 --- a/docs/arch.drawio +++ b/docs/arch.drawio @@ -1,126 +1,155 @@ - + - + - + - + - + - + - + - + - + - - + + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + - + - - + + - + - - + + + + + + + + - + + + + + + + + + + + + + - + - - + - - + + - + + + + + + + diff --git a/src/ctx.rs b/src/ctx.rs index 7bece5f..2a48dd2 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -1,13 +1,19 @@ use hyper::Uri; use once_cell::sync::OnceCell; use proxy::ProxyType; -use serde::{Deserialize, Serialize}; +use rpc::RpcClient; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::env; pub(crate) use super::*; const DEFAULT_TOKEN_EXPIRATION_TIME: i64 = 3600; pub(crate) const DEFAULT_PORT: u16 = 5000; + +const fn default_peer_caching_secs() -> u64 { + 10 +} + static CONFIG: OnceCell = OnceCell::new(); pub(crate) fn get_app_config() -> &'static AppConfig { @@ -16,6 +22,21 @@ pub(crate) fn get_app_config() -> &'static AppConfig { }) } +fn deserialize_rpc_client<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let connection_string = String::deserialize(deserializer)?; + Ok(RpcClient::new(connection_string)) +} + +fn serialize_rpc_client(v: &RpcClient, serializer: S) -> Result +where + S: Serializer, +{ + serializer.serialize_str(&v.url) +} + /// Configuration settings for the application, loaded typically from a JSON configuration file. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub(crate) struct AppConfig { @@ -23,6 +44,14 @@ pub(crate) struct AppConfig { pub(crate) port: Option, /// Redis database connection string. pub(crate) redis_connection_string: String, + /// RPC client for komodo-defi-framework. + #[serde( + serialize_with = "serialize_rpc_client", + deserialize_with = "deserialize_rpc_client" + )] + pub(crate) kdf_rpc_client: RpcClient, + /// `rpc_userpass` which is required for kdf RPCs. + pub(crate) kdf_rpc_password: String, /// File path to the public key used for user verification and authentication. pub(crate) pubkey_path: String, /// File path to the private key used for user verification and authentication. @@ -34,6 +63,13 @@ pub(crate) struct AppConfig { pub(crate) proxy_routes: Vec, /// The default rate limiting rules for maintaining the frequency of incoming traffic for per client. pub(crate) rate_limiter: RateLimiter, + /// The number of seconds to cache a known peer. + /// + /// When a peer is identified as connected with `peer_connection_healthcheck` RPC, + /// this value determines how long to cache that peer as known-peer to avoid + /// sending repeated `peer_connection_healthcheck` requests for every proxy request. + #[serde(default = "default_peer_caching_secs")] + pub(crate) peer_healthcheck_caching_secs: u64, } /// Defines a routing rule for proxying requests from an inbound route to an outbound URL @@ -107,6 +143,8 @@ pub(crate) fn get_app_config_test_instance() -> AppConfig { AppConfig { port: Some(6150), redis_connection_string: String::from("redis://redis:6379"), + kdf_rpc_client: RpcClient::new("http://127.0.0.1:7783".into()), + kdf_rpc_password: String::from("testpass"), pubkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.pubkey_test"), privkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.privkey_test"), token_expiration_time: Some(300), @@ -177,6 +215,7 @@ pub(crate) fn get_app_config_test_instance() -> AppConfig { rp_30_min: 555, rp_60_min: 555, }, + peer_healthcheck_caching_secs: 10, } } @@ -185,6 +224,8 @@ fn test_app_config_serialzation_and_deserialization() { let json_config = serde_json::json!({ "port": 6150, "redis_connection_string": "redis://redis:6379", + "kdf_rpc_client": "http://127.0.0.1:7783", + "kdf_rpc_password": "testpass", "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", "token_expiration_time": 300, @@ -254,7 +295,8 @@ fn test_app_config_serialzation_and_deserialization() { "rp_15_min": 555, "rp_30_min": 555, "rp_60_min": 555 - } + }, + "peer_healthcheck_caching_secs": 10, }); let actual_config: AppConfig = serde_json::from_str(&json_config.to_string()).unwrap(); diff --git a/src/expirable_map.rs b/src/expirable_map.rs new file mode 100644 index 0000000..35ef3da --- /dev/null +++ b/src/expirable_map.rs @@ -0,0 +1,174 @@ +//! This module provides a cross-compatible map that associates values with keys and supports expiring entries. +//! +//! Designed for performance-oriented use-cases utilizing `FxHashMap` under the hood, +//! and is not suitable for cryptographic purposes. + +#![allow(dead_code)] + +use rustc_hash::FxHashMap; +use std::{ + collections::BTreeMap, + hash::Hash, + time::{Duration, Instant}, +}; + +#[derive(Clone, Debug)] +pub struct ExpirableEntry { + pub(crate) value: V, + pub(crate) expires_at: Instant, +} + +impl ExpirableEntry { + #[inline(always)] + pub fn new(v: V, exp: Duration) -> Self { + Self { + expires_at: Instant::now() + exp, + value: v, + } + } + + #[inline(always)] + pub fn get_element(&self) -> &V { + &self.value + } + + #[inline(always)] + pub fn update_value(&mut self, v: V) { + self.value = v + } + + #[inline(always)] + pub fn update_expiration(&mut self, expires_at: Instant) { + self.expires_at = expires_at + } + + /// Checks whether entry has longer ttl than the given one. + #[inline(always)] + pub fn has_longer_life_than(&self, min_ttl: Duration) -> bool { + self.expires_at > Instant::now() + min_ttl + } +} + +impl Default for ExpirableMap { + fn default() -> Self { + Self::new() + } +} + +/// A map that allows associating values with keys and expiring entries. +/// It is important to note that this implementation does not have a background worker to +/// automatically clear expired entries. Outdated entries are only removed when the control flow +/// is handed back to the map mutably (i.e. some mutable method of the map is invoked). +/// +/// WARNING: This is designed for performance-oriented use-cases utilizing `FxHashMap` +/// under the hood and is not suitable for cryptographic purposes. +#[derive(Clone, Debug)] +pub struct ExpirableMap { + map: FxHashMap>, + /// A sorted inverse map from expiration times to keys to speed up expired entries clearing. + expiries: BTreeMap, +} + +impl ExpirableMap { + /// Creates a new empty `ExpirableMap` + #[inline] + pub fn new() -> Self { + Self { + map: FxHashMap::default(), + expiries: BTreeMap::new(), + } + } + + /// Returns the associated value if present and not expired. + #[inline] + pub fn get(&self, k: &K) -> Option<&V> { + self.map + .get(k) + .filter(|v| v.expires_at > Instant::now()) + .map(|v| &v.value) + } + + /// Removes a key-value pair from the map and returns the associated value if present and not expired. + #[inline] + pub fn remove(&mut self, k: &K) -> Option { + self.map + .remove(k) + .filter(|v| v.expires_at > Instant::now()) + .map(|v| { + self.expiries.remove(&v.expires_at); + v.value + }) + } + + /// Inserts a key-value pair with an expiration duration. + /// + /// If a value already exists for the given key, it will be updated and then + /// the old one will be returned. + pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option { + self.clear_expired_entries(); + let entry = ExpirableEntry::new(v, exp); + self.expiries.insert(entry.expires_at, k); + self.map.insert(k, entry).map(|v| v.value) + } + + /// Removes expired entries from the map. + /// + /// Iterates through the `expiries` in order, removing entries that have expired. + /// Stops at the first non-expired entry, leveraging the sorted nature of `BTreeMap`. + fn clear_expired_entries(&mut self) { + let now = Instant::now(); + + // `pop_first()` is used here as it efficiently removes expired entries. + // `first_key_value()` was considered as it wouldn't need re-insertion for + // non-expired entries, but it would require an extra remove operation for + // each expired entry. `pop_first()` needs only one re-insertion per call, + // which is an acceptable trade-off compared to multiple remove operations. + while let Some((exp, key)) = self.expiries.pop_first() { + if exp > now { + self.expiries.insert(exp, key); + break; + } + self.map.remove(&key); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + async fn test_clear_expired_entries() { + let mut expirable_map = ExpirableMap::new(); + let value = "test_value"; + let exp = Duration::from_secs(1); + + // Insert 2 entries with 1 sec expiration time + expirable_map.insert("key1", value, exp); + expirable_map.insert("key2", value, exp); + + // Wait for entries to expire + tokio::time::sleep(Duration::from_secs(2)).await; + + // Clear expired entries + expirable_map.clear_expired_entries(); + + // We waited for 2 seconds, so we shouldn't have any entry accessible + assert_eq!(expirable_map.map.len(), 0); + + // Insert 5 entries + expirable_map.insert("key1", value, Duration::from_secs(5)); + expirable_map.insert("key2", value, Duration::from_secs(4)); + expirable_map.insert("key3", value, Duration::from_secs(7)); + expirable_map.insert("key4", value, Duration::from_secs(2)); + expirable_map.insert("key5", value, Duration::from_millis(3750)); + + // Wait 2 seconds to expire some entries + tokio::time::sleep(Duration::from_secs(2)).await; + + // Clear expired entries + expirable_map.clear_expired_entries(); + + // We waited for 2 seconds, only one entry should expire + assert_eq!(expirable_map.map.len(), 4); + } +} diff --git a/src/main.rs b/src/main.rs index 6902fc5..1c859af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,17 @@ use ctx::get_app_config; use db::get_redis_connection; +use kdf_rpc_interface::version_rpc; use server::serve; #[path = "security/address_status.rs"] mod address_status; mod ctx; mod db; +mod expirable_map; #[path = "security/jwt.rs"] mod jwt; +#[path = "net/kdf_rpc_interface.rs"] +mod kdf_rpc_interface; mod logger; mod proxy; #[path = "security/rate_limiter.rs"] @@ -37,5 +41,7 @@ async fn main() -> GenericResult<()> { // to panic if redis is not available get_redis_connection(cfg).await; + version_rpc(cfg).await.expect("KDF is not available."); + serve(cfg).await } diff --git a/src/net/kdf_rpc_interface.rs b/src/net/kdf_rpc_interface.rs new file mode 100644 index 0000000..fa09dcb --- /dev/null +++ b/src/net/kdf_rpc_interface.rs @@ -0,0 +1,26 @@ +use crate::{ctx::AppConfig, GenericResult}; + +pub(crate) async fn version_rpc(cfg: &AppConfig) -> GenericResult { + let payload = serde_json::json!({ + "userpass": cfg.kdf_rpc_password, + "method": "version", + }); + + cfg.kdf_rpc_client.send(cfg, payload, false).await +} + +pub(crate) async fn peer_connection_healthcheck_rpc( + cfg: &AppConfig, + peer_address: &str, +) -> GenericResult { + let payload = serde_json::json!({ + "userpass": cfg.kdf_rpc_password, + "method": "peer_connection_healthcheck", + "mmrpc": "2.0", + "params": { + "peer_address": peer_address + } + }); + + cfg.kdf_rpc_client.send(cfg, payload, false).await +} diff --git a/src/net/rpc.rs b/src/net/rpc.rs index 6897d82..3194ba6 100644 --- a/src/net/rpc.rs +++ b/src/net/rpc.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] // TODO: remove this - use bytes::Buf; use ctx::AppConfig; use hyper::{body::aggregate, header, Body, Request}; @@ -12,9 +10,7 @@ use crate::proxy::{insert_jwt_to_http_header, APPLICATION_JSON}; use super::*; -pub(crate) type Json = serde_json::Value; - -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub(crate) struct RpcClient { pub(crate) url: String, } @@ -30,7 +26,7 @@ pub(crate) enum Id { #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub(crate) struct RpcPayload { pub(crate) method: String, - pub(crate) params: serde_json::value::Value, + pub(crate) params: serde_json::Value, pub(crate) id: Id, pub(crate) jsonrpc: String, } @@ -40,11 +36,11 @@ pub(crate) struct RpcPayload { /// for authentication and validation, facilitating secure and validated interactions with the Quicknode service. #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub(crate) struct RpcSocketPayload { - pub(crate) method: String, - pub(crate) params: serde_json::value::Value, - pub(crate) id: Id, - pub(crate) jsonrpc: String, - pub(crate) proxy_sign: ProxySign, + method: String, + params: serde_json::Value, + id: Id, + jsonrpc: String, + proxy_sign: ProxySign, } impl RpcSocketPayload { @@ -68,9 +64,9 @@ impl RpcClient { pub(crate) async fn send( &self, cfg: &AppConfig, - payload: Json, + payload: serde_json::Value, is_authorized: bool, - ) -> GenericResult { + ) -> GenericResult { let mut req = Request::post(&self.url).body(Body::from(payload.to_string()))?; req.headers_mut() .append(header::CONTENT_TYPE, APPLICATION_JSON.parse()?); diff --git a/src/proxy/http/get.rs b/src/proxy/http/get.rs index 3cae11d..c0db5a9 100644 --- a/src/proxy/http/get.rs +++ b/src/proxy/http/get.rs @@ -149,7 +149,9 @@ mod tests { ); assert_eq!(deserialized_proxy_sign, proxy_sign); - assert!(deserialized_proxy_sign.is_valid_message()); + assert!( + deserialized_proxy_sign.is_valid_message(crate::proxy::http::MAX_SIGNATURE_EXP_SECS) + ); let additional_headers = &[ header::CONTENT_LENGTH, diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index 5c46c45..dd63a59 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -1,12 +1,15 @@ -use std::net::SocketAddr; - use hyper::{StatusCode, Uri}; +use libp2p::PeerId; use proxy_signature::ProxySign; +use std::{net::SocketAddr, str::FromStr, sync::LazyLock, time::Duration}; +use tokio::sync::Mutex; use crate::{ address_status::{AddressStatus, AddressStatusOperations}, ctx::{AppConfig, ProxyRoute}, db::Db, + expirable_map::ExpirableMap, + kdf_rpc_interface::peer_connection_healthcheck_rpc, logger::tracked_log, rate_limiter::RateLimitOperations, }; @@ -14,7 +17,8 @@ use crate::{ pub(crate) mod get; pub(crate) mod post; -// TODO: Query peers on KDF seeds +const MAX_SIGNATURE_EXP_SECS: u64 = 15; + pub(crate) async fn validation_middleware( cfg: &AppConfig, signed_message: &ProxySign, @@ -28,7 +32,9 @@ pub(crate) async fn validation_middleware( AddressStatus::Trusted => Ok(()), AddressStatus::Blocked => Err(StatusCode::FORBIDDEN), AddressStatus::None => { - if !signed_message.is_valid_message() { + peer_connection_healthcheck(cfg, signed_message, req_uri, remote_addr).await?; + + if !signed_message.is_valid_message(MAX_SIGNATURE_EXP_SECS) { tracked_log( log::Level::Warn, remote_addr.ip(), @@ -93,6 +99,73 @@ pub(crate) async fn validation_middleware( } } +async fn peer_connection_healthcheck( + cfg: &AppConfig, + signed_message: &ProxySign, + req_uri: &Uri, + remote_addr: &SocketAddr, +) -> Result<(), StatusCode> { + // Once we know a peer is connected to the KDF network, we can assume they are connected + // for 10 seconds without asking again. + let know_peer_expiration = Duration::from_secs(cfg.peer_healthcheck_caching_secs); + + static KNOWN_PEERS: LazyLock>> = + LazyLock::new(|| Mutex::new(ExpirableMap::new())); + + let mut know_peers = KNOWN_PEERS.lock().await; + + let Ok(peer_id) = PeerId::from_str(&signed_message.address) else { + tracked_log( + log::Level::Warn, + remote_addr.ip(), + &signed_message.address, + req_uri, + format!( + "Peer id '{}' isn't valid, returning 401", + signed_message.address + ), + ); + return Err(StatusCode::UNAUTHORIZED); + }; + + let is_known = know_peers.get(&peer_id).is_some(); + + if !is_known { + match peer_connection_healthcheck_rpc(cfg, &signed_message.address).await { + Ok(response) => { + if response["result"] == serde_json::json!(true) { + know_peers.insert(peer_id, (), know_peer_expiration); + } else { + tracked_log( + log::Level::Warn, + remote_addr.ip(), + &signed_message.address, + req_uri, + "Peer isn't connected to KDF network, returning 401", + ); + + return Err(StatusCode::UNAUTHORIZED); + } + } + Err(error) => { + tracked_log( + log::Level::Error, + remote_addr.ip(), + &signed_message.address, + req_uri, + format!( + "`peer_connection_healthcheck` RPC failed, returning 500. Error: {}", + error + ), + ); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use hyper::{header, Body, Request, StatusCode}; @@ -201,7 +274,7 @@ mod tests { ); assert_eq!(deserialized_proxy_sign, proxy_sign); - assert!(deserialized_proxy_sign.is_valid_message()); + assert!(deserialized_proxy_sign.is_valid_message(MAX_SIGNATURE_EXP_SECS)); let additional_headers = &[ header::CONTENT_LENGTH,