Skip to content

Commit

Permalink
try adding relay
Browse files Browse the repository at this point in the history
  • Loading branch information
ibigbug committed Jul 31, 2023
1 parent ec80c97 commit d2bbae3
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 70 deletions.
162 changes: 153 additions & 9 deletions clash_lib/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
use anyhow::Result;
use http::Uri;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tracing::debug;

use crate::config::internal::proxy::{PROXY_DIRECT, PROXY_REJECT};
use crate::proxy::{reject, shadowsocks, CommonOption};
use crate::app::proxy_manager::healthcheck::HealthCheck;
use crate::app::proxy_manager::providers::file_vehicle;
use crate::app::proxy_manager::providers::http_vehicle::{self, Vehicle};
use crate::app::proxy_manager::providers::plain_provider::PlainProvider;
use crate::app::proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider;
use crate::app::proxy_manager::providers::proxy_set_provider::ProxySetProvider;
use crate::app::proxy_manager::ProxyManager;
use crate::config::internal::proxy::{OutboundProxyProvider, PROXY_DIRECT, PROXY_REJECT};
use crate::proxy::{reject, relay};
use crate::{
app::ThreadSafeDNSResolver,
config::internal::proxy::{OutboundGroupProtocol, OutboundProxyProtocol},
Expand All @@ -15,21 +24,42 @@ use crate::{

pub struct OutboundManager {
handlers: HashMap<String, AnyOutboundHandler>,
proxy_manager: Arc<Mutex<ProxyManager>>,
}

pub type ThreadSafeOutboundManager = Arc<RwLock<OutboundManager>>;

impl OutboundManager {
pub fn new(
pub async fn new(
outbounds: Vec<OutboundProxyProtocol>,
outbound_groups: Vec<OutboundGroupProtocol>,
dns_client: ThreadSafeDNSResolver,
proxy_providers: HashMap<String, OutboundProxyProvider>,
dns_resolver: ThreadSafeDNSResolver,
) -> Result<Self, Error> {
let mut handlers = HashMap::new();
let mut provider_registry = HashMap::new();
let proxy_manager = Arc::new(Mutex::new(ProxyManager::new()));

OutboundManager::load_handlers(outbounds, outbound_groups, dns_client, &mut handlers)?;
Self::load_proxy_providers(
proxy_providers,
proxy_manager.clone(),
dns_resolver.clone(),
&mut provider_registry,
)
.await?;

Ok(Self { handlers })
Self::load_handlers(
outbounds,
outbound_groups,
proxy_manager.clone(),
provider_registry,
&mut handlers,
)?;

Ok(Self {
handlers,
proxy_manager,
})
}

pub fn get(&self, name: &str) -> Option<AnyOutboundHandler> {
Expand All @@ -39,7 +69,8 @@ impl OutboundManager {
fn load_handlers(
outbounds: Vec<OutboundProxyProtocol>,
outbound_groups: Vec<OutboundGroupProtocol>,
_dns_client: ThreadSafeDNSResolver,
proxy_manager: Arc<Mutex<ProxyManager>>,
provider_registry: HashMap<String, ThreadSafeProxyProvider>,
handlers: &mut HashMap<String, AnyOutboundHandler>,
) -> Result<(), Error> {
for outbound in outbounds.iter() {
Expand All @@ -65,7 +96,58 @@ impl OutboundManager {
for outbound_group in outbound_groups.iter() {
match outbound_group {
OutboundGroupProtocol::Relay(proto) => {
handlers.insert(proto.name.clone(), proto.try_into()?);
let mut providers: Vec<ThreadSafeProxyProvider> = vec![];

if let Some(proxies) = &proto.proxies {
let proxies = proxies
.into_iter()
.map(|x| {
handlers
.get(x)
.expect(format!("proxy {} not found", x).as_str())
.clone()
})
.collect::<Vec<_>>();
let hc = HealthCheck::new(
proxies.clone(),
proto.url.clone(),
proto.interval,
true,
proxy_manager.clone(),
)
.map_err(|x| Error::InvalidConfig(format!("invalid hc config: {}", x)))?;
let provider = PlainProvider::new(
proto.name.clone(),
proxies,
hc,
proxy_manager.clone(),
)
.map_err(|x| {
Error::InvalidConfig(format!("invalid provider config: {}", x))
})?;

providers.push(Arc::new(provider));
}

if let Some(provider_names) = &proto.use_provider {
for provider_name in provider_names {
let provider = provider_registry
.get(provider_name)
.expect(format!("provider {} not found", provider_name).as_str())
.clone();
providers.push(provider);
}
}

let relay = relay::Handler::new(
relay::HandlerOptions {
name: proto.name.clone(),
..Default::default()
},
providers,
);

handlers.insert(proto.name.clone(), relay);
}
OutboundGroupProtocol::UrlTest(_proto) => todo!(),
OutboundGroupProtocol::Fallback(_proto) => todo!(),
Expand All @@ -75,4 +157,66 @@ impl OutboundManager {
}
Ok(())
}

async fn load_proxy_providers(
proxy_providers: HashMap<String, OutboundProxyProvider>,
proxy_manager: Arc<Mutex<ProxyManager>>,
resolver: ThreadSafeDNSResolver,
provider_registry: &mut HashMap<String, ThreadSafeProxyProvider>,
) -> Result<(), Error> {
for (name, provider) in proxy_providers.into_iter() {
match provider {
OutboundProxyProvider::Http(http) => {
let vehicle = http_vehicle::Vehicle::new(
http.url
.parse::<Uri>()
.expect(format!("invalid provider url: {}", http.url).as_str()),
http.path,
resolver.clone(),
);
let hc = HealthCheck::new(
vec![],
http.health_check.url,
http.health_check.interval,
true,
proxy_manager.clone(),
)
.map_err(|e| Error::InvalidConfig(format!("invalid hc config {}", e)))?;
let provider = ProxySetProvider::new(
name.clone(),
Duration::from_secs(http.interval),
Arc::new(Mutex::new(vehicle)),
hc,
proxy_manager.clone(),
)
.map_err(|x| Error::InvalidConfig(format!("invalid provider config: {}", x)))?;

provider_registry.insert(name, Arc::new(provider));
}
OutboundProxyProvider::File(file) => {
let vehicle = file_vehicle::Vehicle::new(&file.path);
let hc = HealthCheck::new(
vec![],
file.health_check.url,
file.health_check.interval,
true,
proxy_manager.clone(),
)
.map_err(|e| Error::InvalidConfig(format!("invalid hc config {}", e)))?;

let provider = ProxySetProvider::new(
name.clone(),
Duration::from_secs(file.interval.unwrap_or_default()),
Arc::new(Mutex::new(vehicle)),
hc,
proxy_manager.clone(),
)
.map_err(|x| Error::InvalidConfig(format!("invalid provider config: {}", x)))?;

provider_registry.insert(name, Arc::new(provider));
}
}
}
Ok(())
}
}
10 changes: 6 additions & 4 deletions clash_lib/src/app/proxy_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@ use std::collections::{HashMap, VecDeque};

use crate::proxy::AnyOutboundHandler;

mod healthcheck;
use self::providers::proxy_provider::ThreadSafeProxyProvider;

pub mod healthcheck;
pub mod providers;

type Latency = VecDeque<u64>;

/// ProxyManager is only the latency registry.
/// TODO: move all proxies here, too, maybe.
#[derive(Default)]
pub struct ProxyManager {
latency_map: HashMap<String, Latency>,
proxy_provider: HashMap<String, ThreadSafeProxyProvider>,
}

impl ProxyManager {
pub fn new() -> Self {
Self {
latency_map: HashMap::new(),
}
Self::default()
}

pub async fn check(&mut self, _proxy: &Vec<AnyOutboundHandler>) {
Expand Down
2 changes: 1 addition & 1 deletion clash_lib/src/app/proxy_manager/providers/http_vehicle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{ProviderVehicle, ProviderVehicleType};
use crate::app::ThreadSafeDNSResolver;
use crate::common::errors::map_io_error;
use crate::common::http::{new_http_client, HttpClient, LocalConnector};
use crate::common::http::{new_http_client, HttpClient};

use async_trait::async_trait;

Expand Down
8 changes: 4 additions & 4 deletions clash_lib/src/app/proxy_manager/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::sync::Arc;
use tokio::sync::Mutex;

pub mod fether;
mod file_vehicle;
mod http_vehicle;
mod plain_provider;
pub mod file_vehicle;
pub mod http_vehicle;
pub mod plain_provider;
pub mod proxy_provider;
mod proxy_set_provider;
pub mod proxy_set_provider;
pub mod rule_provider;

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{

use super::{proxy_provider::ProxyProvider, Provider, ProviderType, ProviderVehicleType};

struct PlainProvider {
pub struct PlainProvider {
name: String,
proxies: Vec<AnyOutboundHandler>,
healthcheck: HealthCheck,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct FileProviderInner {
proxies: Vec<AnyOutboundHandler>,
}

struct ProxySetProvider {
pub struct ProxySetProvider {
fetcher: Fetcher<
Box<dyn Fn(Vec<AnyOutboundHandler>) + Send + Sync + 'static>,
Box<dyn Fn(&[u8]) -> anyhow::Result<Vec<AnyOutboundHandler>> + Send + Sync + 'static>,
Expand Down
8 changes: 8 additions & 0 deletions clash_lib/src/config/internal/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ pub struct OutboundGroupRelay {
pub proxies: Option<Vec<String>>,
#[serde(rename = "use")]
pub use_provider: Option<Vec<String>>,

// hc
pub url: String,
#[serde(deserialize_with = "utils::deserialize_u64")]
pub interval: u64,
pub tolerance: Option<i32>,
pub lazy: Option<bool>,
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
Expand Down Expand Up @@ -271,6 +278,7 @@ pub struct OutboundFileProvider {
#[serde(skip)]
pub name: String,
pub path: String,
pub interval: Option<u64>,
pub health_check: HealthCheck,
}

Expand Down
42 changes: 23 additions & 19 deletions clash_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,25 +106,29 @@ async fn start_async(opts: Options) -> Result<(), Error> {

let default_dns_resolver = Arc::new(dns::Resolver::new(config.dns).await);

let outbound_manager = Arc::new(RwLock::new(OutboundManager::new(
config
.proxies
.into_values()
.filter_map(|x| match x {
OutboundProxy::ProxyServer(s) => Some(s),
_ => None,
})
.collect(),
config
.proxy_groups
.into_values()
.filter_map(|x| match x {
OutboundProxy::ProxyGroup(g) => Some(g),
_ => None,
})
.collect(),
default_dns_resolver.clone(),
)?));
let outbound_manager = Arc::new(RwLock::new(
OutboundManager::new(
config
.proxies
.into_values()
.filter_map(|x| match x {
OutboundProxy::ProxyServer(s) => Some(s),
_ => None,
})
.collect(),
config
.proxy_groups
.into_values()
.filter_map(|x| match x {
OutboundProxy::ProxyGroup(g) => Some(g),
_ => None,
})
.collect(),
config.proxy_providers,
default_dns_resolver.clone(),
)
.await?,
));

let router = Arc::new(RwLock::new(
Router::new(
Expand Down
1 change: 0 additions & 1 deletion clash_lib/src/proxy/converters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod relay;
pub mod shadowsocks;
29 changes: 0 additions & 29 deletions clash_lib/src/proxy/converters/relay.rs

This file was deleted.

2 changes: 1 addition & 1 deletion clash_lib/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub mod utils;
pub mod converters;

// proxy groups
mod relay;
pub mod relay;

mod transport;

Expand Down
Loading

0 comments on commit d2bbae3

Please sign in to comment.