Skip to content

Commit

Permalink
add selector impl
Browse files Browse the repository at this point in the history
  • Loading branch information
ibigbug committed Aug 8, 2023
1 parent 9564963 commit fc6499d
Show file tree
Hide file tree
Showing 18 changed files with 231 additions and 103 deletions.
38 changes: 8 additions & 30 deletions WORKSPACE
Original file line number Diff line number Diff line change
@@ -1,38 +1,16 @@
workspace(name = "clash-rs")

load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
name = "bazel_skylib",
sha256 = "66ffd9315665bfaafc96b52278f57c7e2dd09f5ede279ea6d39b2be471e7e3aa",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/bazel-skylib/releases/download/1.4.2/bazel-skylib-1.4.2.tar.gz",
"https://github.com/bazelbuild/bazel-skylib/releases/download/1.4.2/bazel-skylib-1.4.2.tar.gz",
],
)

load("@bazel_skylib//:workspace.bzl", "bazel_skylib_workspace")

bazel_skylib_workspace()

http_archive(
name = "rules_python",
sha256 = "84aec9e21cc56fbc7f1335035a71c850d1b9b5cc6ff497306f84cced9a769841",
strip_prefix = "rules_python-0.23.1",
url = "https://github.com/bazelbuild/rules_python/releases/download/0.23.1/rules_python-0.23.1.tar.gz",
)

load("@rules_python//python:repositories.bzl", "py_repositories")

py_repositories()
#load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

# To find additional information on this release or newer ones visit:
# https://github.com/bazelbuild/rules_rust/releases
#http_archive(
#name = "rules_rust",
#sha256 = "0c2ff9f58bbd6f2a4fc4fbea3a34e85fe848e7e4317357095551a18b2405a01c",
#urls = ["https://github.com/bazelbuild/rules_rust/releases/download/0.25.0/rules_rust-v0.25.0.tar.gz"],
#)

# http_archive(
# name = "rules_rust",
# sha256 = "9d04e658878d23f4b00163a72da3db03ddb451273eb347df7d7c50838d698f49",
# urls = ["https://github.com/bazelbuild/rules_rust/releases/download/0.26.0/rules_rust-v0.26.0.tar.gz"],
# )

load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")

git_repository(
Expand Down
10 changes: 5 additions & 5 deletions clash_lib/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl OutboundManager {
Error::InvalidConfig(format!("invalid provider config: {}", x))
})?;

providers.push(Arc::new(provider));
providers.push(Arc::new(Mutex::new(provider)));
}

if let Some(provider_names) = &proto.use_provider {
Expand Down Expand Up @@ -181,13 +181,13 @@ impl OutboundManager {
let provider = ProxySetProvider::new(
name.clone(),
Duration::from_secs(http.interval),
Arc::new(Mutex::new(vehicle)),
Arc::new(vehicle),
hc,
proxy_manager.clone(),
)
.map_err(|x| Error::InvalidConfig(format!("invalid provider config: {}", x)))?;

provider_registry.insert(name, Arc::new(provider));
provider_registry.insert(name, Arc::new(Mutex::new(provider)));
}
OutboundProxyProvider::File(file) => {
let vehicle = file_vehicle::Vehicle::new(&file.path);
Expand All @@ -203,13 +203,13 @@ impl OutboundManager {
let provider = ProxySetProvider::new(
name.clone(),
Duration::from_secs(file.interval.unwrap_or_default()),
Arc::new(Mutex::new(vehicle)),
Arc::new(vehicle),
hc,
proxy_manager.clone(),
)
.map_err(|x| Error::InvalidConfig(format!("invalid provider config: {}", x)))?;

provider_registry.insert(name, Arc::new(provider));
provider_registry.insert(name, Arc::new(Mutex::new(provider)));
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions clash_lib/src/app/proxy_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl ProxyManager {
};

let result = tester.await;
self.report_alive(&name, result.is_ok());
self.report_alive(&name, result.is_ok()).await;
let ins = DelayHistory {
time: SystemTime::now(),
delay: result.as_ref().map(|x| x.0).unwrap_or(0),
Expand Down Expand Up @@ -192,7 +192,7 @@ mod tests {
};

#[tokio::test]
async fn test_proxy_manager() {
async fn test_proxy_manager_alive() {
let mut mock_resolver = MockClashResolver::new();
mock_resolver
.expect_resolve()
Expand Down Expand Up @@ -227,7 +227,7 @@ mod tests {
assert!(manager.last_delay(PROXY_DIRECT).await > 0);
assert!(manager.delay_history(PROXY_DIRECT).await.len() > 0);

manager.report_alive(PROXY_DIRECT, false);
manager.report_alive(PROXY_DIRECT, false).await;
assert!(!manager.alive(PROXY_DIRECT).await);

for _ in 0..10 {
Expand Down
33 changes: 15 additions & 18 deletions clash_lib/src/app/proxy_manager/providers/fether.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,15 @@ where
self.name.as_str()
}

pub async fn vehicle_type(&self) -> super::ProviderVehicleType {
self.vehicle.lock().await.typ()
pub fn vehicle_type(&self) -> super::ProviderVehicleType {
self.vehicle.typ()
}

pub async fn initial(&mut self) -> anyhow::Result<T> {
let mut is_local = false;
let mut immediately_update = false;

let vehicle_path = {
let l = self.vehicle.lock().await;
l.path().to_owned()
};
let vehicle_path = self.vehicle.path().to_owned();

let mut inner = self.inner.lock().await;

Expand All @@ -97,7 +94,7 @@ where
> self.interval;
content
}
Err(_) => self.vehicle.lock().await.read().await?,
Err(_) => self.vehicle.read().await?,
};

let proxies = match (self.parser.lock().await)(&content) {
Expand All @@ -106,19 +103,19 @@ where
if !is_local {
return Err(e);
}
let content = self.vehicle.lock().await.read().await?;
let content = self.vehicle.read().await?;
(self.parser.lock().await)(&content)?
}
};

if self.vehicle_type().await != ProviderVehicleType::File && !is_local {
let p = self.vehicle.lock().await.path().to_owned();
if self.vehicle_type() != ProviderVehicleType::File && !is_local {
let p = self.vehicle.path().to_owned();
let path = Path::new(p.as_str());
let prefix = path.parent().unwrap();
if !prefix.exists() {
fs::create_dir_all(prefix)?;
}
fs::write(self.vehicle.lock().await.path(), &content)?;
fs::write(self.vehicle.path(), &content)?;
}

inner.hash = utils::md5(&content)[..16]
Expand Down Expand Up @@ -149,7 +146,7 @@ where
parser: Arc<Mutex<P>>,
) -> anyhow::Result<(T, bool)> {
let mut this = inner.lock().await;
let content = vehicle.lock().await.read().await?;
let content = vehicle.read().await?;
let proxies = (parser.lock().await)(&content)?;

let now = SystemTime::now();
Expand All @@ -159,21 +156,21 @@ where

if hash == this.hash {
this.updated_at = now;
filetime::set_file_times(vehicle.lock().await.path(), now.into(), now.into())?;
filetime::set_file_times(vehicle.path(), now.into(), now.into())?;
return Ok((proxies, false));
}

let proxies = (parser.lock().await)(&content)?;

if vehicle.lock().await.typ() != ProviderVehicleType::File {
let p = vehicle.lock().await.path().to_owned();
if vehicle.typ() != ProviderVehicleType::File {
let p = vehicle.path().to_owned();
let path = Path::new(p.as_str());
let prefix = path.parent().unwrap();
if !prefix.exists() {
fs::create_dir_all(prefix)?;
}

fs::write(vehicle.lock().await.path(), &content)?;
fs::write(vehicle.path(), &content)?;
return Ok((proxies, false));
}

Expand Down Expand Up @@ -244,7 +241,7 @@ where
mod tests {
use std::{path::Path, sync::Arc, time::Duration};

use tokio::{sync::Mutex, time::sleep};
use tokio::time::sleep;

use crate::app::proxy_manager::providers::{MockProviderVehicle, ProviderVehicleType};

Expand Down Expand Up @@ -283,7 +280,7 @@ mod tests {
let mut f = Fetcher::new(
"test_fetcher".to_string(),
Duration::from_secs(1),
Arc::new(Mutex::new(mock_vehicle)),
Arc::new(mock_vehicle),
parser,
Some(updater),
);
Expand Down
9 changes: 4 additions & 5 deletions clash_lib/src/app/proxy_manager/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use async_trait::async_trait;
use std::fmt::{Display, Formatter};
use std::io;
use std::sync::Arc;
use tokio::sync::Mutex;

pub mod fether;
pub mod file_vehicle;
Expand Down Expand Up @@ -32,7 +31,7 @@ impl Display for ProviderVehicleType {
}
}

pub type ThreadSafeProviderVehicle = Arc<Mutex<dyn ProviderVehicle + Send + Sync>>;
pub type ThreadSafeProviderVehicle = Arc<dyn ProviderVehicle + Send + Sync>;

#[cfg_attr(test, automock)]
#[async_trait]
Expand All @@ -58,9 +57,9 @@ impl Display for ProviderType {
/// either Proxy or Rule provider
#[async_trait]
pub trait Provider {
async fn name(&self) -> &str;
async fn vehicle_type(&self) -> ProviderVehicleType;
async fn typ(&self) -> ProviderType;
fn name(&self) -> &str;
fn vehicle_type(&self) -> ProviderVehicleType;
fn typ(&self) -> ProviderType;
async fn initialize(&mut self) -> io::Result<()>;
async fn update(&self) -> io::Result<()>;
}
12 changes: 4 additions & 8 deletions clash_lib/src/app/proxy_manager/providers/plain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::{
app::proxy_manager::{healthcheck::HealthCheck, ProxyManager},
proxy::AnyOutboundHandler,
Error,
};
use crate::{app::proxy_manager::ProxyManager, proxy::AnyOutboundHandler, Error};

use super::{proxy_provider::ProxyProvider, Provider, ProviderType, ProviderVehicleType};

Expand Down Expand Up @@ -40,13 +36,13 @@ impl PlainProvider {

#[async_trait]
impl Provider for PlainProvider {
async fn name(&self) -> &str {
fn name(&self) -> &str {
&self.name
}
async fn vehicle_type(&self) -> ProviderVehicleType {
fn vehicle_type(&self) -> ProviderVehicleType {
ProviderVehicleType::Compatible
}
async fn typ(&self) -> ProviderType {
fn typ(&self) -> ProviderType {
ProviderType::Proxy
}
async fn initialize(&mut self) -> std::io::Result<()> {
Expand Down
3 changes: 2 additions & 1 deletion clash_lib/src/app/proxy_manager/providers/proxy_provider.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::proxy::AnyOutboundHandler;

use super::Provider;

pub type ThreadSafeProxyProvider = Arc<dyn ProxyProvider + Send + Sync>;
pub type ThreadSafeProxyProvider = Arc<Mutex<dyn ProxyProvider + Send + Sync>>;

#[async_trait]
pub trait ProxyProvider: Provider {
Expand Down
10 changes: 5 additions & 5 deletions clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ impl ProxySetProvider {

#[async_trait]
impl Provider for ProxySetProvider {
async fn name(&self) -> &str {
fn name(&self) -> &str {
self.fetcher.name()
}

async fn vehicle_type(&self) -> ProviderVehicleType {
self.fetcher.vehicle_type().await
fn vehicle_type(&self) -> ProviderVehicleType {
self.fetcher.vehicle_type()
}

async fn typ(&self) -> ProviderType {
fn typ(&self) -> ProviderType {
ProviderType::Proxy
}

Expand Down Expand Up @@ -202,7 +202,7 @@ proxies:
.expect_typ()
.return_const(ProviderVehicleType::File);

let vehicle = Arc::new(Mutex::new(mock_vehicle));
let vehicle = Arc::new(mock_vehicle);

let mock_resolver = MockClashResolver::new();

Expand Down
2 changes: 1 addition & 1 deletion clash_lib/src/config/internal/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pub enum LoadBalanceStrategy {
RoundRobin,
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
#[derive(serde::Serialize, serde::Deserialize, Debug, Default)]
pub struct OutboundGroupSelect {
pub name: String,

Expand Down
2 changes: 2 additions & 0 deletions clash_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub enum Error {
DNSError(String),
#[error("crypto error: {0}")]
Crypto(String),
#[error("operation error: {0}")]
Operation(String),
}

pub type Runner = futures::future::BoxFuture<'static, ()>;
Expand Down
4 changes: 2 additions & 2 deletions clash_lib/src/proxy/direct/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ impl OutboundHandler for Handler {
OutboundProxy::ProxyServer(OutboundProxyProtocol::Direct)
}

fn remote_addr(&self) -> Option<SocksAddr> {
async fn remote_addr(&self) -> Option<SocksAddr> {
None
}

fn support_udp(&self) -> bool {
async fn support_udp(&self) -> bool {
true
}

Expand Down
5 changes: 3 additions & 2 deletions clash_lib/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod converters;

// proxy groups
pub mod relay;
pub mod selector;

mod transport;

Expand Down Expand Up @@ -113,10 +114,10 @@ pub trait OutboundHandler: Sync + Send + Unpin {
fn proto(&self) -> OutboundProxy;

/// The proxy remote address
fn remote_addr(&self) -> Option<SocksAddr>;
async fn remote_addr(&self) -> Option<SocksAddr>;

/// whether the outbound handler support UDP
fn support_udp(&self) -> bool;
async fn support_udp(&self) -> bool;

/// connect to remote target via TCP
async fn connect_stream(
Expand Down
4 changes: 2 additions & 2 deletions clash_lib/src/proxy/reject/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ impl OutboundHandler for Handler {
OutboundProxy::ProxyServer(OutboundProxyProtocol::Reject)
}

fn remote_addr(&self) -> Option<SocksAddr> {
async fn remote_addr(&self) -> Option<SocksAddr> {
None
}

fn support_udp(&self) -> bool {
async fn support_udp(&self) -> bool {
false
}

Expand Down
Loading

0 comments on commit fc6499d

Please sign in to comment.