diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 075ff1dd6..b1bc21e4e 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "e89acea9bd04dd56b7348acd3f61745f139fb8fa5ec97035a22ba430f0a0eefa", + "checksum": "ce7c31601716f140663a3d2a302a4fae2d302be9f11b2f84d42912131fdf5655", "crates": { "addr2line 0.20.0": { "name": "addr2line", @@ -532,6 +532,36 @@ }, "license": "BSD-3-Clause" }, + "antidote 1.0.0": { + "name": "antidote", + "version": "1.0.0", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/antidote/1.0.0/download", + "sha256": "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" + } + }, + "targets": [ + { + "Library": { + "crate_name": "antidote", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "antidote", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "edition": "2015", + "version": "1.0.0" + }, + "license": "MIT/Apache-2.0" + }, "anyhow 1.0.72": { "name": "anyhow", "version": "1.0.72", @@ -682,6 +712,53 @@ }, "license": "MIT OR Apache-2.0" }, + "async-recursion 1.0.4": { + "name": "async-recursion", + "version": "1.0.4", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/async-recursion/1.0.4/download", + "sha256": "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba" + } + }, + "targets": [ + { + "ProcMacro": { + "crate_name": "async_recursion", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "async_recursion", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "deps": { + "common": [ + { + "id": "proc-macro2 1.0.66", + "target": "proc_macro2" + }, + { + "id": "quote 1.0.31", + "target": "quote" + }, + { + "id": "syn 2.0.26", + "target": "syn" + } + ], + "selects": {} + }, + "edition": "2018", + "version": "1.0.4" + }, + "license": "MIT OR Apache-2.0" + }, "async-trait 0.1.71": { "name": "async-trait", "version": "0.1.71", @@ -2539,6 +2616,10 @@ "id": "hyper 0.14.27", "target": "hyper" }, + { + "id": "hyper-boring 2.1.2", + "target": "hyper_boring" + }, { "id": "ipnet 2.8.0", "target": "ipnet" @@ -2685,6 +2766,10 @@ "edition": "2021", "proc_macro_deps": { "common": [ + { + "id": "async-recursion 1.0.4", + "target": "async_recursion" + }, { "id": "async-trait 0.1.71", "target": "async_trait" @@ -6196,6 +6281,7 @@ "h2", "http1", "http2", + "runtime", "server", "socket2", "tcp" @@ -6276,6 +6362,87 @@ }, "license": "MIT" }, + "hyper-boring 2.1.2": { + "name": "hyper-boring", + "version": "2.1.2", + "repository": { + "Git": { + "remote": "https://github.com/Watfaq/boring.git", + "commitish": { + "Branch": "bazel" + }, + "strip_prefix": "hyper-boring" + } + }, + "targets": [ + { + "Library": { + "crate_name": "hyper_boring", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "hyper_boring", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "default", + "runtime" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "antidote 1.0.0", + "target": "antidote" + }, + { + "id": "boring 2.1.0", + "target": "boring" + }, + { + "id": "http 0.2.9", + "target": "http" + }, + { + "id": "hyper 0.14.27", + "target": "hyper" + }, + { + "id": "linked_hash_set 0.1.4", + "target": "linked_hash_set" + }, + { + "id": "once_cell 1.18.0", + "target": "once_cell" + }, + { + "id": "tokio 1.29.1", + "target": "tokio" + }, + { + "id": "tokio-boring 2.1.5", + "target": "tokio_boring" + }, + { + "id": "tower-layer 0.3.2", + "target": "tower_layer" + } + ], + "selects": {} + }, + "edition": "2018", + "version": "2.1.2" + }, + "license": "MIT/Apache-2.0" + }, "idna 0.2.3": { "name": "idna", "version": "0.2.3", @@ -7304,6 +7471,45 @@ }, "license": "MIT/Apache-2.0" }, + "linked_hash_set 0.1.4": { + "name": "linked_hash_set", + "version": "0.1.4", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/linked_hash_set/0.1.4/download", + "sha256": "47186c6da4d81ca383c7c47c1bfc80f4b95f4720514d860a5407aaf4233f9588" + } + }, + "targets": [ + { + "Library": { + "crate_name": "linked_hash_set", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "linked_hash_set", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "deps": { + "common": [ + { + "id": "linked-hash-map 0.5.6", + "target": "linked_hash_map" + } + ], + "selects": {} + }, + "edition": "2018", + "version": "0.1.4" + }, + "license": "Apache-2.0" + }, "linux-raw-sys 0.3.8": { "name": "linux-raw-sys", "version": "0.3.8", diff --git a/Cargo.lock b/Cargo.lock index 68dfd8726..e3d06f968 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,6 +112,12 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "antidote" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" + [[package]] name = "anyhow" version = "1.0.72" @@ -136,6 +142,17 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "async-recursion" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", +] + [[package]] name = "async-trait" version = "0.1.71" @@ -467,6 +484,7 @@ version = "0.1.0" dependencies = [ "aes-gcm 0.10.2", "anyhow", + "async-recursion", "async-trait", "atty", "base64 0.21.2", @@ -487,6 +505,7 @@ dependencies = [ "http", "httparse", "hyper", + "hyper-boring", "ipnet", "libc", "lru_time_cache", @@ -1202,6 +1221,22 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-boring" +version = "2.1.2" +source = "git+https://github.com/Watfaq/boring.git?branch=bazel#d58d4367cb21a44d301d295b9259f755eb302041" +dependencies = [ + "antidote", + "boring", + "http", + "hyper", + "linked_hash_set", + "once_cell", + "tokio", + "tokio-boring", + "tower-layer", +] + [[package]] name = "idna" version = "0.2.3" @@ -1397,6 +1432,15 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linked_hash_set" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47186c6da4d81ca383c7c47c1bfc80f4b95f4720514d860a5407aaf4233f9588" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "linux-raw-sys" version = "0.3.8" diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 21b341c98..9deff2de8 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -15,6 +15,7 @@ async-trait = "0.1" anyhow = "1.0" futures = "0.3" bytes = "1.1" +async-recursion = "1" ipnet = "2.5" url = "2.2" regex = "1" @@ -34,6 +35,7 @@ base64 = "0.21" uuid = { version = "1.2.1", features = ["v4", "fast-rng", "macro-diagnostics"] } boring = { git = "https://github.com/Watfaq/boring.git", branch = "bazel" } boring-sys = { git = "https://github.com/Watfaq/boring.git", branch = "bazel" } +hyper-boring = { git = "https://github.com/Watfaq/boring.git", branch = "bazel" } tokio-boring = { git = "https://github.com/Watfaq/boring.git", branch = "bazel" } crc32fast = "1.3.2" brotli = "3.3.4" diff --git a/clash_lib/src/app/dispatcher.rs b/clash_lib/src/app/dispatcher.rs index 996aedfef..a9d0a3586 100644 --- a/clash_lib/src/app/dispatcher.rs +++ b/clash_lib/src/app/dispatcher.rs @@ -66,7 +66,7 @@ impl Dispatcher { info!("remote connection established {}", sess); match copy_bidirectional(&mut lhs, &mut rhs).await { Ok((up, down)) => { - info!( + debug!( "connection {} closed with {} bytes up, {} bytes down", sess, up, down ); diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index f10f050e4..64a4d3cd3 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -64,7 +64,9 @@ impl OutboundManager { for outbound_group in outbound_groups.iter() { match outbound_group { - OutboundGroupProtocol::Relay(_proto) => {} + OutboundGroupProtocol::Relay(proto) => { + handlers.insert(proto.name.clone(), proto.try_into()?); + } OutboundGroupProtocol::UrlTest(_proto) => todo!(), OutboundGroupProtocol::Fallback(_proto) => todo!(), OutboundGroupProtocol::LoadBalance(_proto) => todo!(), diff --git a/clash_lib/src/app/proxy_manager/providers/http_vehicle.rs b/clash_lib/src/app/proxy_manager/providers/http_vehicle.rs index 7f1b03451..9d63a64fa 100644 --- a/clash_lib/src/app/proxy_manager/providers/http_vehicle.rs +++ b/clash_lib/src/app/proxy_manager/providers/http_vehicle.rs @@ -1,70 +1,15 @@ use super::{ProviderVehicle, ProviderVehicleType}; use crate::app::ThreadSafeDNSResolver; use crate::common::errors::map_io_error; -use crate::proxy::utils::new_tcp_stream; -use crate::proxy::AnyStream; -use async_trait::async_trait; +use crate::common::http::{new_http_client, HttpClient, LocalConnector}; -use hyper::client::connect::{Connected, Connection}; +use async_trait::async_trait; -use hyper::service::Service; use hyper::{body, Uri}; -use std::future::Future; use std::io; use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -#[derive(Clone)] -struct LocalConnector(pub ThreadSafeDNSResolver); - -impl Service for LocalConnector { - type Response = AnyStream; - type Error = io::Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, remote: Uri) -> Self::Future { - let host = remote - .host() - .expect(format!("invalid url: {}", remote.to_string()).as_str()) - .to_owned(); - - let dns = self.0.clone(); - - Box::pin(async move { - new_tcp_stream( - dns, - host.as_str(), - remote.port_u16().unwrap_or(match remote.scheme_str() { - None => 80, - Some(s) => match s { - s if s == "http" => 80, - s if s == "https" => 443, - _ => panic!("invalid url: {}", remote), - }, - }), - None, - #[cfg(any(target_os = "linux", target_os = "android"))] - None, - ) - .await - }) - } -} - -impl Connection for AnyStream { - fn connected(&self) -> Connected { - Connected::new() - } -} - -type HttpClient = hyper::Client; pub struct Vehicle { pub url: Uri, @@ -78,9 +23,7 @@ impl Vehicle { path: P, dns_resolver: ThreadSafeDNSResolver, ) -> Self { - let connector = LocalConnector(dns_resolver); - - let client = hyper::Client::builder().build::<_, hyper::Body>(connector); + let client = new_http_client(dns_resolver).expect("failed to create http client"); Self { url: url.into(), path: path.as_ref().to_path_buf(), diff --git a/clash_lib/src/app/router/mmdb.rs b/clash_lib/src/app/router/mmdb.rs index a83144830..15b3abaff 100644 --- a/clash_lib/src/app/router/mmdb.rs +++ b/clash_lib/src/app/router/mmdb.rs @@ -1,25 +1,125 @@ -use std::{net::IpAddr, path::Path}; +use std::{fs, io::Write, net::IpAddr, path::Path}; +use async_recursion::async_recursion; +use hyper::body::HttpBody; use maxminddb::geoip2; +use tracing::{debug, info, warn}; -use crate::{common::errors::map_io_error, Error}; +use crate::{ + common::{ + errors::{map_io_error, new_io_error}, + http::HttpClient, + }, + Error, +}; pub struct MMDB { reader: maxminddb::Reader>, } impl MMDB { - pub fn new>(path: P) -> anyhow::Result { + pub async fn new>( + path: P, + download_url: Option, + http_client: HttpClient, + ) -> anyhow::Result { + debug!("mmdb path: {}", path.as_ref().to_string_lossy()); + let cwd = std::env::current_dir().unwrap(); - let reader = maxminddb::Reader::open_readfile(&path).map_err(|x| { - Error::InvalidConfig(format!( - "cant open mmdb `{}/{}`: {}", - cwd.to_string_lossy(), - path.as_ref().to_string_lossy(), - x.to_string() - )) - })?; - Ok(MMDB { reader }) + let mmdb_file = Path::new(&cwd).join(&path); + + if !mmdb_file.exists() { + if let Some(url) = download_url.as_ref() { + info!("downloading mmdb from {}", url); + Self::download(url, &mmdb_file, &http_client).await?; + } else { + return Err(Error::InvalidConfig(format!( + "mmdb `{}/{}` not found and mmdb_download_url is not set", + cwd.to_string_lossy(), + path.as_ref().to_string_lossy() + )) + .into()); + } + } + + match maxminddb::Reader::open_readfile(&path) { + Ok(r) => Ok(MMDB { reader: r }), + Err(e) => match e { + maxminddb::MaxMindDBError::InvalidDatabaseError(_) + | maxminddb::MaxMindDBError::IoError(_) => { + warn!( + "invalid mmdb `{}/{}`: {}, trying to download again", + cwd.to_string_lossy(), + path.as_ref().to_string_lossy(), + e.to_string() + ); + + // try to download again + fs::remove_file(&mmdb_file)?; + if let Some(url) = download_url.as_ref() { + info!("downloading mmdb from {}", url); + Self::download(url, &mmdb_file, &http_client).await?; + Ok(MMDB { + reader: maxminddb::Reader::open_readfile(&path)?, + }) + } else { + return Err(Error::InvalidConfig(format!( + "mmdb `{}/{}` not found and mmdb_download_url is not set", + cwd.to_string_lossy(), + path.as_ref().to_string_lossy() + )) + .into()); + } + } + _ => Err(Error::InvalidConfig(format!( + "cant open mmdb `{}/{}`: {}", + cwd.to_string_lossy(), + path.as_ref().to_string_lossy(), + e.to_string() + )) + .into()), + }, + } + } + + #[async_recursion(?Send)] + async fn download>( + url: &str, + path: P, + http_client: &HttpClient, + ) -> anyhow::Result<()> { + let uri = url.parse::()?; + let mut out = std::fs::File::create(&path)?; + + let mut res = http_client.get(uri).await?; + + if res.status().is_redirection() { + return Self::download( + res.headers() + .get("Location") + .ok_or(new_io_error( + format!("failed to download from {}", url).as_str(), + ))? + .to_str()?, + path, + http_client, + ) + .await; + } + + if !res.status().is_success() { + return Err( + Error::InvalidConfig(format!("mmdb download failed: {}", res.status())).into(), + ); + } + + debug!("downloading mmdb to {}", path.as_ref().to_string_lossy()); + + while let Some(chunk) = res.body_mut().data().await { + out.write_all(&chunk?)?; + } + + Ok(()) } pub fn lookup(&self, ip: IpAddr) -> anyhow::Result { diff --git a/clash_lib/src/app/router/mod.rs b/clash_lib/src/app/router/mod.rs index ae9eb85e2..46032b44a 100644 --- a/clash_lib/src/app/router/mod.rs +++ b/clash_lib/src/app/router/mod.rs @@ -6,19 +6,21 @@ use crate::app::router::rules::ruleset::RuleSet; use crate::app::router::rules::RuleMatcher; use crate::app::ThreadSafeDNSResolver; +use crate::common::http::new_http_client; use crate::config::internal::rule::Rule; use crate::session::{Session, SocksAddr}; use crate::app::router::rules::final_::Final; use std::sync::Arc; use tokio::sync::RwLock; +use tracing::info; mod mmdb; mod rules; pub struct Router { rules: Vec>, - dns_client: ThreadSafeDNSResolver, + dns_resolver: ThreadSafeDNSResolver, } pub type ThreadSafeRouter = Arc>; @@ -26,8 +28,18 @@ pub type ThreadSafeRouter = Arc>; const MATCH: &str = "MATCH"; impl Router { - pub fn new(rules: Vec, dns_client: ThreadSafeDNSResolver, mmdb_path: String) -> Self { - let mmdb = Arc::new(mmdb::MMDB::new(mmdb_path).expect("failed to load mmdb")); + pub async fn new( + rules: Vec, + dns_resolver: ThreadSafeDNSResolver, + mmdb_path: String, + mmdb_download_url: Option, + ) -> Self { + let client = new_http_client(dns_resolver.clone()).expect("failed to create http client"); + let mmdb = Arc::new( + mmdb::MMDB::new(mmdb_path, mmdb_download_url, client) + .await + .expect("failed to load mmdb"), + ); Self { rules: rules @@ -97,7 +109,7 @@ impl Router { Rule::Match { target } => Box::new(Final { target }), }) .collect(), - dns_client, + dns_resolver, } } @@ -108,7 +120,7 @@ impl Router { for r in self.rules.iter() { if sess.destination.is_domain() && r.should_resolve_ip() && !sess_resolved { if let Ok(ip) = self - .dns_client + .dns_resolver .resolve(sess.destination.domain().unwrap()) .await { @@ -120,6 +132,7 @@ impl Router { } if r.apply(&sess_dup) { + info!("matched {} to target {}", &sess_dup, r.target()); return r.target(); } } diff --git a/clash_lib/src/common/http.rs b/clash_lib/src/common/http.rs new file mode 100644 index 000000000..84f705773 --- /dev/null +++ b/clash_lib/src/common/http.rs @@ -0,0 +1,78 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use boring::ssl::{SslConnector, SslMethod}; +use futures::Future; +use http::Uri; +use hyper::client::connect::{Connected, Connection}; +use hyper_boring::HttpsConnector; +use tower::Service; + +use crate::{ + app::ThreadSafeDNSResolver, + proxy::{utils::new_tcp_stream, AnyStream}, +}; + +use super::errors::map_io_error; + +#[derive(Clone)] +pub struct LocalConnector(pub ThreadSafeDNSResolver); + +impl Service for LocalConnector { + type Response = AnyStream; + type Error = std::io::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, remote: Uri) -> Self::Future { + let host = remote + .host() + .expect(format!("invalid url: {}", remote.to_string()).as_str()) + .to_owned(); + + let dns = self.0.clone(); + + Box::pin(async move { + new_tcp_stream( + dns, + host.as_str(), + remote.port_u16().unwrap_or(match remote.scheme_str() { + None => 80, + Some(s) => match s { + s if s == "http" => 80, + s if s == "https" => 443, + _ => panic!("invalid url: {}", remote), + }, + }), + None, + #[cfg(any(target_os = "linux", target_os = "android"))] + None, + ) + .await + }) + } +} + +impl Connection for AnyStream { + fn connected(&self) -> Connected { + Connected::new() + } +} + +pub type HttpClient = hyper::Client>; + +pub fn new_http_client(dns_resolver: ThreadSafeDNSResolver) -> std::io::Result { + let connector = LocalConnector(dns_resolver); + + let mut ssl = SslConnector::builder(SslMethod::tls()).map_err(map_io_error)?; + ssl.set_alpn_protos(b"\x02h2\x08http/1.1") + .map_err(map_io_error)?; + + let connector = HttpsConnector::with_connector(connector, ssl).map_err(map_io_error)?; + Ok(hyper::Client::builder().build::<_, hyper::Body>(connector)) +} diff --git a/clash_lib/src/common/mod.rs b/clash_lib/src/common/mod.rs index 35c6999cd..a7cb78a77 100644 --- a/clash_lib/src/common/mod.rs +++ b/clash_lib/src/common/mod.rs @@ -1,5 +1,6 @@ pub mod crypto; pub mod errors; +pub mod http; pub mod tls; pub mod trie; pub mod utils; diff --git a/clash_lib/src/config/def.rs b/clash_lib/src/config/def.rs index fff218f00..4073c0911 100644 --- a/clash_lib/src/config/def.rs +++ b/clash_lib/src/config/def.rs @@ -55,6 +55,7 @@ pub struct Config { pub rule: Vec, pub hosts: HashMap, pub mmdb: String, + pub mmdb_download_url: Option, /// these options has default vals, /// and needs extra processing @@ -130,6 +131,10 @@ impl Default for Config { proxy_group: Default::default(), rule: Default::default(), mmdb: "Country.mmdb".to_string(), + mmdb_download_url: Some( + "https://github.com/Loyalsoldier/geoip/releases/download/202307271745/Country.mmdb" + .to_owned(), + ), } } } diff --git a/clash_lib/src/config/internal/config.rs b/clash_lib/src/config/internal/config.rs index 5323f6e02..26a885776 100644 --- a/clash_lib/src/config/internal/config.rs +++ b/clash_lib/src/config/internal/config.rs @@ -69,6 +69,7 @@ impl TryFrom for Config { }), routing_mask: c.routing_mask, mmdb: c.mmdb.to_owned(), + mmdb_download_url: c.mmdb_download_url.to_owned(), }, dns: (&c).try_into()?, experimental: c.experimental, @@ -164,6 +165,7 @@ pub struct General { interface: Option, routing_mask: Option, pub mmdb: String, + pub mmdb_download_url: Option, } pub struct Profile { diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index 2d38642d4..afb6a2f3c 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -105,6 +105,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { let mut runners = Vec::new(); let default_dns_resolver = Arc::new(dns::Resolver::new(config.dns).await); + let outbound_manager = Arc::new(RwLock::new(OutboundManager::new( config .proxies @@ -125,11 +126,16 @@ async fn start_async(opts: Options) -> Result<(), Error> { default_dns_resolver.clone(), )?)); - let router = Arc::new(RwLock::new(Router::new( - config.rules, - default_dns_resolver.clone(), - config.general.mmdb, - ))); + let router = Arc::new(RwLock::new( + Router::new( + config.rules, + default_dns_resolver.clone(), + config.general.mmdb, + config.general.mmdb_download_url, + ) + .await, + )); + let dispatcher = Arc::new(Dispatcher::new( outbound_manager, router, diff --git a/clash_lib/src/proxy/converters/mod.rs b/clash_lib/src/proxy/converters/mod.rs new file mode 100644 index 000000000..481774dc8 --- /dev/null +++ b/clash_lib/src/proxy/converters/mod.rs @@ -0,0 +1,2 @@ +pub mod relay; +pub mod shadowsocks; diff --git a/clash_lib/src/proxy/converters/relay.rs b/clash_lib/src/proxy/converters/relay.rs new file mode 100644 index 000000000..63e2a4c9f --- /dev/null +++ b/clash_lib/src/proxy/converters/relay.rs @@ -0,0 +1,29 @@ +use crate::{ + config::internal::proxy::OutboundGroupRelay, + proxy::{ + relay::{Handler, HandlerOptions}, + AnyOutboundHandler, CommonOption, + }, +}; + +impl TryFrom for AnyOutboundHandler { + type Error = crate::Error; + + fn try_from(value: OutboundGroupRelay) -> Result { + (&value).try_into() + } +} + +impl TryFrom<&OutboundGroupRelay> for AnyOutboundHandler { + type Error = crate::Error; + + fn try_from(value: &OutboundGroupRelay) -> Result { + Ok(Handler::new( + HandlerOptions { + name: todo!(), + common_opts: todo!(), + }, + vec![], + )) + } +} diff --git a/clash_lib/src/proxy/converters/shadowsocks.rs b/clash_lib/src/proxy/converters/shadowsocks.rs new file mode 100644 index 000000000..203bd2c93 --- /dev/null +++ b/clash_lib/src/proxy/converters/shadowsocks.rs @@ -0,0 +1,61 @@ +use crate::{ + config::internal::proxy::OutboundShadowsocks, + proxy::{ + shadowsocks::{Handler, HandlerOptions, OBFSOption}, + AnyOutboundHandler, CommonOption, + }, + Error, +}; + +impl TryFrom for AnyOutboundHandler { + type Error = crate::Error; + + fn try_from(value: OutboundShadowsocks) -> Result { + (&value).try_into() + } +} + +impl TryFrom<&OutboundShadowsocks> for AnyOutboundHandler { + type Error = crate::Error; + + fn try_from(s: &OutboundShadowsocks) -> Result { + let h = Handler::new(HandlerOptions { + name: s.name.to_owned(), + common_opts: CommonOption::default(), + server: s.server.to_owned(), + port: s.port, + password: s.password.to_owned(), + cipher: s.cipher.to_owned(), + plugin_opts: match &s.plugin { + Some(plugin) => match plugin.as_str() { + "obfs" => s + .plugin_opts + .clone() + .ok_or(Error::InvalidConfig( + "plugin_opts is required for plugin obfs".to_owned(), + ))? + .try_into() + .map(|x| OBFSOption::Simple(x)) + .ok(), + "v2ray-plugin" => s + .plugin_opts + .clone() + .ok_or(Error::InvalidConfig( + "plugin_opts is required for plugin obfs".to_owned(), + ))? + .try_into() + .map(|x| OBFSOption::V2Ray(x)) + .ok(), + _ => { + return Err(Error::InvalidConfig(format!( + "unsupported plugin: {}", + plugin + ))); + } + }, + None => None, + }, + }); + Ok(h) + } +} diff --git a/clash_lib/src/proxy/mod.rs b/clash_lib/src/proxy/mod.rs index ca340fd0c..5c714e2a0 100644 --- a/clash_lib/src/proxy/mod.rs +++ b/clash_lib/src/proxy/mod.rs @@ -23,6 +23,8 @@ pub mod socks; pub mod utils; //pub mod vmess; +pub mod converters; + // proxy groups mod relay; @@ -40,8 +42,8 @@ pub enum ProxyError { Socks5(String), } -pub trait ProxyStream: AsyncRead + AsyncWrite + Send + Sync + Unpin {} -impl ProxyStream for T where T: AsyncRead + AsyncWrite + Send + Sync + Unpin {} +pub trait ProxyStream: AsyncRead + AsyncWrite + Send + Sync + Unpin + Debug {} +impl ProxyStream for T where T: AsyncRead + AsyncWrite + Send + Sync + Unpin + Debug {} pub type AnyStream = Box; pub trait InboundDatagram: diff --git a/clash_lib/src/proxy/relay/mod.rs b/clash_lib/src/proxy/relay/mod.rs index 014f46c22..f80ac3746 100644 --- a/clash_lib/src/proxy/relay/mod.rs +++ b/clash_lib/src/proxy/relay/mod.rs @@ -32,7 +32,11 @@ impl Handler { } async fn get_proxies(&self) -> Vec { - todo!("get proxies from providers") + futures::future::join_all(self.providers.iter().map(|x| x.proxies())) + .await + .into_iter() + .flatten() + .collect::>() } } diff --git a/clash_lib/src/proxy/shadowsocks/mod.rs b/clash_lib/src/proxy/shadowsocks/mod.rs index 75548e342..b95ad751b 100644 --- a/clash_lib/src/proxy/shadowsocks/mod.rs +++ b/clash_lib/src/proxy/shadowsocks/mod.rs @@ -1,5 +1,6 @@ mod datagram; mod obfs; +mod stream; mod v2ray; use async_trait::async_trait; @@ -18,7 +19,7 @@ use crate::{ }; use std::{collections::HashMap, io, sync::Arc}; -use self::datagram::OutboundDatagramShadowsocks; +use self::{datagram::OutboundDatagramShadowsocks, stream::ShadowSocksStream}; use super::{ utils::{new_tcp_stream, new_udp_socket}, @@ -148,59 +149,6 @@ impl Handler { } } -impl TryFrom for AnyOutboundHandler { - type Error = crate::Error; - - fn try_from(value: OutboundShadowsocks) -> Result { - (&value).try_into() - } -} - -impl TryFrom<&OutboundShadowsocks> for AnyOutboundHandler { - type Error = crate::Error; - - fn try_from(s: &OutboundShadowsocks) -> Result { - let h = Handler::new(HandlerOptions { - name: s.name.to_owned(), - common_opts: CommonOption::default(), - server: s.server.to_owned(), - port: s.port, - password: s.password.to_owned(), - cipher: s.cipher.to_owned(), - plugin_opts: match &s.plugin { - Some(plugin) => match plugin.as_str() { - "obfs" => s - .plugin_opts - .clone() - .ok_or(Error::InvalidConfig( - "plugin_opts is required for plugin obfs".to_owned(), - ))? - .try_into() - .map(|x| OBFSOption::Simple(x)) - .ok(), - "v2ray-plugin" => s - .plugin_opts - .clone() - .ok_or(Error::InvalidConfig( - "plugin_opts is required for plugin obfs".to_owned(), - ))? - .try_into() - .map(|x| OBFSOption::V2Ray(x)) - .ok(), - _ => { - return Err(Error::InvalidConfig(format!( - "unsupported plugin: {}", - plugin - ))); - } - }, - None => None, - }, - }); - Ok(h) - } -} - #[async_trait] impl OutboundHandler for Handler { fn name(&self) -> &str { @@ -279,7 +227,7 @@ impl OutboundHandler for Handler { (sess.destination.host(), sess.destination.port()), ); - Ok(Box::new(stream)) + Ok(Box::new(ShadowSocksStream(stream))) } async fn connect_datagram( diff --git a/clash_lib/src/proxy/shadowsocks/stream.rs b/clash_lib/src/proxy/shadowsocks/stream.rs new file mode 100644 index 000000000..91f81f3ca --- /dev/null +++ b/clash_lib/src/proxy/shadowsocks/stream.rs @@ -0,0 +1,47 @@ +use std::{fmt::Debug, pin::Pin}; + +use shadowsocks::ProxyClientStream; +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::proxy::AnyStream; + +pub struct ShadowSocksStream(pub ProxyClientStream); +impl Debug for ShadowSocksStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("ShadowSocksStream").finish() + } +} + +impl AsyncRead for ShadowSocksStream { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + } +} + +impl AsyncWrite for ShadowSocksStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().0).poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.get_mut().0).poll_shutdown(cx) + } +} diff --git a/clash_lib/src/proxy/transport/grpc.rs b/clash_lib/src/proxy/transport/grpc.rs index 54f37d85b..d7c9eca69 100644 --- a/clash_lib/src/proxy/transport/grpc.rs +++ b/clash_lib/src/proxy/transport/grpc.rs @@ -10,6 +10,7 @@ use prost::encoding::decode_varint; use prost::encoding::encode_varint; use tracing::log; +use std::fmt::Debug; use std::future::Future; use std::io; use std::io::{Error, ErrorKind}; @@ -66,6 +67,18 @@ pub struct GrpcStream { payload_len: u64, } +impl Debug for GrpcStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GrpcStream") + .field("resp_fut", &self.resp_fut) + .field("recv", &self.recv) + .field("send", &self.send) + .field("buffer", &self.buffer) + .field("payload_len", &self.payload_len) + .finish() + } +} + impl GrpcStream { pub fn new(resp_fut: h2::client::ResponseFuture, send: SendStream) -> Self { Self { diff --git a/clash_lib/src/proxy/transport/h2.rs b/clash_lib/src/proxy/transport/h2.rs index 0a7288ca9..753299a8b 100644 --- a/clash_lib/src/proxy/transport/h2.rs +++ b/clash_lib/src/proxy/transport/h2.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, fmt::Debug}; use bytes::{Bytes, BytesMut}; use futures::ready; @@ -64,6 +64,16 @@ pub struct Http2Stream { buffer: BytesMut, } +impl Debug for Http2Stream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Http2Stream") + .field("recv", &self.recv) + .field("send", &self.send) + .field("buffer", &self.buffer) + .finish() + } +} + impl Http2Stream { pub fn new(recv: RecvStream, send: SendStream) -> Self { Self { diff --git a/clash_lib/src/proxy/transport/websocket/websocket.rs b/clash_lib/src/proxy/transport/websocket/websocket.rs index 987e01cf0..eedc4ace0 100644 --- a/clash_lib/src/proxy/transport/websocket/websocket.rs +++ b/clash_lib/src/proxy/transport/websocket/websocket.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use std::{fmt::Debug, pin::Pin}; use bytes::{Buf, Bytes}; use futures::{ready, Sink, Stream}; @@ -15,6 +15,15 @@ pub struct WebsocketConn { read_buffer: Option, } +impl Debug for WebsocketConn { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WebsocketConn") + .field("inner", &self.inner) + .field("read_buffer", &self.read_buffer) + .finish() + } +} + impl WebsocketConn { pub fn from_websocket(stream: WebSocketStream) -> Self { Self { diff --git a/clash_lib/src/proxy/transport/websocket/websocket_early_data.rs b/clash_lib/src/proxy/transport/websocket/websocket_early_data.rs index 71e31c397..ff469b0c3 100644 --- a/clash_lib/src/proxy/transport/websocket/websocket_early_data.rs +++ b/clash_lib/src/proxy/transport/websocket/websocket_early_data.rs @@ -1,5 +1,6 @@ use std::{ cmp, + fmt::Debug, pin::Pin, task::{Poll, Waker}, }; @@ -32,6 +33,21 @@ pub struct WebsocketEarlyDataConn { early_data_flushed: bool, } +impl Debug for WebsocketEarlyDataConn { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WebsocketEarlyDataConn") + .field("stream", &self.stream) + .field("req", &self.req) + .field("early_waker", &self.early_waker) + .field("flush_waker", &self.flush_waker) + .field("ws_config", &self.ws_config) + .field("early_data_header_name", &self.early_data_header_name) + .field("early_data_len", &self.early_data_len) + .field("early_data_flushed", &self.early_data_flushed) + .finish() + } +} + impl WebsocketEarlyDataConn { pub fn new( stream: AnyStream,