diff --git a/clash/src/main.rs b/clash/src/main.rs index 44c9f70a..c23295ee 100644 --- a/clash/src/main.rs +++ b/clash/src/main.rs @@ -59,6 +59,7 @@ fn main() { } } } + std::env::set_var("RUST_BACKTRACE", "1"); match clash::start(clash::Options { config: clash::Config::File(file), cwd: cli.directory.map(|x| x.to_string_lossy().to_string()), diff --git a/clash_lib/src/config/internal/config.rs b/clash_lib/src/config/internal/config.rs index 5e159682..1b63e8bb 100644 --- a/clash_lib/src/config/internal/config.rs +++ b/clash_lib/src/config/internal/config.rs @@ -254,6 +254,12 @@ pub struct TunConfig { /// default: 198.18.0.0/16 pub network: Option, pub gateway: Option, + /// auto manage route + pub auto_route: Option, + /// fwmark for preveting loop + pub mark: Option, + /// ip rule table name + pub table: Option } #[derive(Clone, Default)] diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 918e48cf..f0db2155 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -253,6 +253,8 @@ pub struct OutboundTuic { pub gc_lifetime: Option, pub send_window: Option, pub receive_window: Option, + /// fwmark + pub mark: Option, } #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index 3910e878..8f4acf92 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -247,7 +247,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { let inbound_runner = inbound_manager.lock().await.get_runner()?; let inbound_listener_handle = tokio::spawn(inbound_runner); - let tun_runner = get_tun_runner(config.tun, dispatcher.clone(), dns_resolver.clone())?; + let tun_runner = get_tun_runner(config.tun, dispatcher.clone(), dns_resolver.clone()).await?; let tun_runner_handle = tun_runner.map(tokio::spawn); debug!("initializing dns listener"); @@ -419,7 +419,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { .map(tokio::spawn)?; let tun_runner_handle = - get_tun_runner(config.tun, dispatcher.clone(), dns_resolver.clone())? + get_tun_runner(config.tun, dispatcher.clone(), dns_resolver.clone()).await? .map(tokio::spawn); debug!("reloading dns listener"); diff --git a/clash_lib/src/proxy/converters/tuic.rs b/clash_lib/src/proxy/converters/tuic.rs index 5dd43c54..037334cd 100644 --- a/clash_lib/src/proxy/converters/tuic.rs +++ b/clash_lib/src/proxy/converters/tuic.rs @@ -1,4 +1,7 @@ -use std::time::Duration; +use std::{ + sync::{atomic::AtomicU32, Arc}, + time::Duration, +}; use quinn::VarInt; @@ -54,6 +57,7 @@ impl TryFrom<&OutboundTuic> for AnyOutboundHandler { send_window: s.send_window.unwrap_or(8 * 1024 * 1024 * 2), receive_window: VarInt::from_u64(s.receive_window.unwrap_or(8 * 1024 * 1024)) .unwrap_or(VarInt::MAX), + mark: Arc::new(AtomicU32::new(s.mark.unwrap_or(0))), }) } } diff --git a/clash_lib/src/proxy/direct/mod.rs b/clash_lib/src/proxy/direct/mod.rs index 8abf99ff..d33e2039 100644 --- a/clash_lib/src/proxy/direct/mod.rs +++ b/clash_lib/src/proxy/direct/mod.rs @@ -53,8 +53,7 @@ impl OutboundHandler for Handler { sess.destination.host().as_str(), sess.destination.port(), None, - #[cfg(any(target_os = "linux", target_os = "android"))] - None, + sess.packet_mark, ) .await?; @@ -66,8 +65,8 @@ impl OutboundHandler for Handler { async fn proxy_stream( &self, s: AnyStream, - #[allow(unused_variables)] sess: &Session, - #[allow(unused_variables)] _resolver: ThreadSafeDNSResolver, + _sess: &Session, + _resolver: ThreadSafeDNSResolver, ) -> std::io::Result { Ok(s) } @@ -77,14 +76,9 @@ impl OutboundHandler for Handler { sess: &Session, resolver: ThreadSafeDNSResolver, ) -> std::io::Result { - let d = new_udp_socket( - None, - sess.iface.as_ref(), - #[cfg(any(target_os = "linux", target_os = "android"))] - None, - ) - .await - .map(|x| OutboundDatagramImpl::new(x, resolver))?; + let d = new_udp_socket(None, sess.iface.as_ref(), sess.packet_mark) + .await + .map(|x| OutboundDatagramImpl::new(x, resolver))?; let d = ChainedDatagramWrapper::new(d); d.append_to_chain(self.name()).await; diff --git a/clash_lib/src/proxy/tuic/mod.rs b/clash_lib/src/proxy/tuic/mod.rs index 23327c85..15842fdf 100644 --- a/clash_lib/src/proxy/tuic/mod.rs +++ b/clash_lib/src/proxy/tuic/mod.rs @@ -8,6 +8,7 @@ use anyhow::Result; use axum::async_trait; use quinn::{EndpointConfig, TokioRuntime}; use std::net::SocketAddr; +use std::sync::atomic::AtomicU32; use std::{ net::{Ipv4Addr, Ipv6Addr, UdpSocket}, sync::{ @@ -68,6 +69,7 @@ pub struct HandlerOptions { pub gc_lifetime: Duration, pub send_window: u64, pub receive_window: VarInt, + pub mark: Arc, /// not used pub ip: Option, @@ -168,6 +170,7 @@ impl Handler { socket, Arc::new(TokioRuntime), )?; + endpoint.set_default_client_config(quinn_config); let endpoint = TuicEndpoint { ep: endpoint, @@ -179,6 +182,7 @@ impl Handler { heartbeat: opts.heartbeat_interval, gc_interval: opts.gc_interval, gc_lifetime: opts.gc_lifetime, + mark: opts.mark.clone(), }; Ok(Arc::new(Self { opts, @@ -187,17 +191,27 @@ impl Handler { next_assoc_id: AtomicU16::new(0), })) } - async fn get_conn(&self) -> Result> { + async fn get_conn( + &self, + resolver: &ThreadSafeDNSResolver, + mark: Option, + ) -> Result> { + let mark = mark.unwrap_or(self.opts.mark.load(Ordering::Relaxed)); + let mut rebind = false; + // if mark not match the one current used, then rebind + if mark != self.opts.mark.swap(mark, Ordering::Relaxed) { + rebind = true; + } let fut = async { let mut guard = self.conn.lock().await; if guard.is_none() { // init - *guard = Some(self.ep.connect().await?); + *guard = Some(self.ep.connect(resolver, rebind).await?); } let conn = guard.take().unwrap(); - let conn = if conn.check_open().is_err() { + let conn = if conn.check_open().is_err() || rebind { // reconnect - self.ep.connect().await? + self.ep.connect(resolver, rebind).await? } else { conn }; @@ -210,9 +224,9 @@ impl Handler { async fn do_connect_stream( &self, sess: &Session, - _resolver: ThreadSafeDNSResolver, + resolver: ThreadSafeDNSResolver, ) -> Result { - let conn = self.get_conn().await?; + let conn = self.get_conn(&resolver, sess.packet_mark).await?; let dest = sess.destination.clone().into_tuic(); let tuic_tcp = conn.connect_tcp(dest).await?.compat(); @@ -224,9 +238,9 @@ impl Handler { async fn do_connect_datagram( &self, sess: &Session, - _resolver: ThreadSafeDNSResolver, + resolver: ThreadSafeDNSResolver, ) -> Result { - let conn = self.get_conn().await?; + let conn = self.get_conn(&resolver, sess.packet_mark).await?; let assos_id = self.next_assoc_id.fetch_add(1, Ordering::Relaxed); let quic_udp = TuicDatagramOutbound::new(assos_id, conn, sess.source.into()); diff --git a/clash_lib/src/proxy/tuic/types.rs b/clash_lib/src/proxy/tuic/types.rs index 84d8a569..ab51bca9 100644 --- a/clash_lib/src/proxy/tuic/types.rs +++ b/clash_lib/src/proxy/tuic/types.rs @@ -1,9 +1,12 @@ +use crate::app::dns::ThreadSafeDNSResolver; +use crate::proxy::utils::StdSocketExt; use crate::session::SocksAddr as ClashSocksAddr; use anyhow::Result; use quinn::Connection as QuinnConnection; use quinn::{Endpoint as QuinnEndpoint, ZeroRttAccepted}; use register_count::Counter; use std::collections::HashMap; +use std::sync::atomic::Ordering; use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, str::FromStr, @@ -26,63 +29,71 @@ pub struct TuicEndpoint { pub heartbeat: Duration, pub gc_interval: Duration, pub gc_lifetime: Duration, + pub mark: Arc, } impl TuicEndpoint { - pub async fn connect(&self) -> Result> { - let mut last_err = None; + pub async fn connect(&self, resolver: &ThreadSafeDNSResolver, rebind: bool) -> Result> { + let remote_addr = self.server.resolve(resolver).await?; + let connect_to = async { + let match_ipv4 = remote_addr.is_ipv4() + && self + .ep + .local_addr() + .map_or(false, |local_addr| local_addr.is_ipv4()); + let match_ipv6 = remote_addr.is_ipv6() + && self + .ep + .local_addr() + .map_or(false, |local_addr| local_addr.is_ipv6()); - for addr in self.server.resolve().await? { - let connect_to = async { - let match_ipv4 = - addr.is_ipv4() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv4()); - let match_ipv6 = - addr.is_ipv6() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv6()); - - if !match_ipv4 && !match_ipv6 { - let bind_addr = if addr.is_ipv4() { - SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)) - } else { - SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0)) - }; - - self.ep - .rebind(UdpSocket::bind(bind_addr).map_err(|err| { - anyhow!("failed to create endpoint UDP socket {}", err) - })?) - .map_err(|err| anyhow!("failed to rebind endpoint UDP socket {}", err))?; - } - - tracing::trace!("Connect to {} {}", addr, self.server.server_name()); - let conn = self.ep.connect(addr, self.server.server_name())?; - let (conn, zero_rtt_accepted) = if self.zero_rtt_handshake { - match conn.into_0rtt() { - Ok((conn, zero_rtt_accepted)) => (conn, Some(zero_rtt_accepted)), - Err(conn) => (conn.await?, None), - } + // if client and server don't match each other or forced to rebind, then rebind local socket + if (!match_ipv4 && !match_ipv6) || rebind { + let bind_addr = if remote_addr.is_ipv4() { + SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)) } else { - (conn.await?, None) + SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0)) }; + let socket = UdpSocket::bind(bind_addr) + .map_err(|err| anyhow!("failed to bind local socket: {}", err))?; + let mark = self.mark.load(Ordering::Relaxed); + // ignore mark == 0, just for convenient + if mark != 0 { + socket.set_mark(mark)?; + } + self.ep + .rebind(socket) + .map_err(|err| anyhow!("failed to rebind endpoint UDP socket {}", err))?; + } - Ok((conn, zero_rtt_accepted)) + tracing::trace!("Connect to {} {}", remote_addr, self.server.server_name()); + let conn = self.ep.connect(remote_addr, self.server.server_name())?; + let (conn, zero_rtt_accepted) = if self.zero_rtt_handshake { + match conn.into_0rtt() { + Ok((conn, zero_rtt_accepted)) => (conn, Some(zero_rtt_accepted)), + Err(conn) => (conn.await?, None), + } + } else { + (conn.await?, None) }; - match connect_to.await { - Ok((conn, zero_rtt_accepted)) => { - return Ok(TuicConnection::new( - conn, - zero_rtt_accepted, - self.udp_relay_mode, - self.uuid, - self.password.clone(), - self.heartbeat, - self.gc_interval, - self.gc_lifetime, - )); - } - Err(err) => last_err = Some(err), + Ok((conn, zero_rtt_accepted)) + }; + + match connect_to.await { + Ok((conn, zero_rtt_accepted)) => { + return Ok(TuicConnection::new( + conn, + zero_rtt_accepted, + self.udp_relay_mode, + self.uuid, + self.password.clone(), + self.heartbeat, + self.gc_interval, + self.gc_lifetime, + )); } + Err(err) => Err(err), } - Err(last_err.unwrap_or(anyhow!("dns resolve"))) } } @@ -194,15 +205,16 @@ impl ServerAddr { pub fn server_name(&self) -> &str { &self.domain } - // TODO change to clash dns? - pub async fn resolve(&self) -> Result> { + + pub async fn resolve(&self, resolver: &ThreadSafeDNSResolver) -> Result { if let Some(ip) = self.ip { - Ok(vec![SocketAddr::from((ip, self.port))].into_iter()) + Ok(SocketAddr::from((ip, self.port))) } else { - Ok(tokio::net::lookup_host((self.domain.as_str(), self.port)) + let ip = resolver + .resolve(self.domain.as_str(), false) .await? - .collect::>() - .into_iter()) + .ok_or(anyhow!("Resolve failed: unknown hostname"))?; + Ok(SocketAddr::from((ip, self.port))) } } } diff --git a/clash_lib/src/proxy/tun/auto_route.rs b/clash_lib/src/proxy/tun/auto_route.rs new file mode 100644 index 00000000..e979ac86 --- /dev/null +++ b/clash_lib/src/proxy/tun/auto_route.rs @@ -0,0 +1,20 @@ +use crate::config::internal::config::TunConfig; + +#[cfg(target_os = "linux")] +pub async fn setup(cfg: &mut TunConfig, tun_name: &str) -> anyhow::Result<()>{ + if !cfg.auto_route.unwrap_or(false) { + return Ok(()); + } + let mark = cfg.mark.unwrap_or(6969); + cfg.mark = Some(mark); + let table = cfg.table.take().unwrap_or("2233".into()); + cfg.table = Some(table); + // TODO + Ok(()) +} + +#[cfg(not(target_os = "linux"))] +pub fn setup(cfg: &mut TunConfig, tun_name: &str) -> anyhow::Result<()>{ + tracing::error!("Auto route not impl!"); + Ok(()) +} \ No newline at end of file diff --git a/clash_lib/src/proxy/tun/inbound.rs b/clash_lib/src/proxy/tun/inbound.rs index 7e1a3339..e8454565 100644 --- a/clash_lib/src/proxy/tun/inbound.rs +++ b/clash_lib/src/proxy/tun/inbound.rs @@ -10,7 +10,7 @@ use crate::{ app::{dispatcher::Dispatcher, dns::ThreadSafeDNSResolver}, common::errors::map_io_error, config::internal::config::TunConfig, - proxy::datagram::UdpPacket, + proxy::{datagram::UdpPacket, tun::auto_route}, session::{Network, Session, SocksAddr, Type}, Error, Runner, }; @@ -20,12 +20,14 @@ async fn handle_inbound_stream( local_addr: SocketAddr, remote_addr: SocketAddr, dispatcher: Arc, + packet_mark: Option, ) { let sess = Session { network: Network::Tcp, typ: Type::Tun, source: local_addr, destination: remote_addr.into(), + packet_mark, ..Default::default() }; @@ -36,6 +38,7 @@ async fn handle_inbound_datagram( socket: Box, dispatcher: Arc, resolver: ThreadSafeDNSResolver, + packet_mark: Option, ) { let local_addr = socket.local_addr(); // tun i/o @@ -56,6 +59,7 @@ async fn handle_inbound_datagram( let sess = Session { network: Network::Udp, typ: Type::Tun, + packet_mark, ..Default::default() }; @@ -117,8 +121,8 @@ async fn handle_inbound_datagram( let _ = futures::future::join(fut1, fut2).await; } -pub fn get_runner( - cfg: TunConfig, +pub async fn get_runner( + mut cfg: TunConfig, dispatcher: Arc, resolver: ThreadSafeDNSResolver, ) -> Result, Error> { @@ -127,10 +131,10 @@ pub fn get_runner( return Ok(None); } - let device_id = cfg.device_id; + let device_id = &cfg.device_id; let u = - Url::parse(&device_id).map_err(|x| Error::InvalidConfig(format!("tun device {}", x)))?; + Url::parse(device_id).map_err(|x| Error::InvalidConfig(format!("tun device {}", x)))?; let mut tun_cfg = tun::Configuration::default(); @@ -163,6 +167,11 @@ pub fn get_runner( let tun_name = tun.get_ref().name().map_err(map_io_error)?; info!("tun started at {}", tun_name); + // Configuare auto-route when tun is ready + if cfg.auto_route.unwrap_or(false) { + auto_route::setup(&mut cfg, &tun_name).await.map_err(|e| Error::Operation(e.to_string()))?; + } + let (stack, mut tcp_listener, udp_socket) = netstack::NetStack::with_buffer_size(512, 256).map_err(map_io_error)?; @@ -215,6 +224,14 @@ pub fn get_runner( })); let dsp = dispatcher.clone(); + + let mark = if cfg.auto_route.unwrap_or(false) + && cfg!(any(target_os = "android", target_os = "linux")) + { + cfg.mark + } else { + None + }; futs.push(Box::pin(async move { while let Some((stream, local_addr, remote_addr)) = tcp_listener.next().await { tokio::spawn(handle_inbound_stream( @@ -222,6 +239,7 @@ pub fn get_runner( local_addr, remote_addr, dsp.clone(), + mark, )); } @@ -229,7 +247,7 @@ pub fn get_runner( })); futs.push(Box::pin(async move { - handle_inbound_datagram(udp_socket, dispatcher, resolver).await; + handle_inbound_datagram(udp_socket, dispatcher, resolver, mark).await; Err(Error::Operation("tun stopped unexpectedly 3".to_string())) })); diff --git a/clash_lib/src/proxy/tun/mod.rs b/clash_lib/src/proxy/tun/mod.rs index cde10543..0e53d8fa 100644 --- a/clash_lib/src/proxy/tun/mod.rs +++ b/clash_lib/src/proxy/tun/mod.rs @@ -1,4 +1,5 @@ pub mod inbound; +pub mod auto_route; pub use netstack_lwip as netstack; mod datagram; pub use inbound::get_runner as get_tun_runner; diff --git a/clash_lib/src/proxy/utils/socket_helpers.rs b/clash_lib/src/proxy/utils/socket_helpers.rs index fe9ef4b8..141b7ced 100644 --- a/clash_lib/src/proxy/utils/socket_helpers.rs +++ b/clash_lib/src/proxy/utils/socket_helpers.rs @@ -72,7 +72,7 @@ pub async fn new_tcp_stream<'a>( address: &'a str, port: u16, iface: Option<&'a Interface>, - #[cfg(any(target_os = "linux", target_os = "android"))] packet_mark: Option, + packet_mark: Option, ) -> io::Result { let dial_addr = resolver .resolve(address, false) @@ -104,6 +104,8 @@ pub async fn new_tcp_stream<'a>( #[cfg(any(target_os = "linux", target_os = "android"))] if let Some(packet_mark) = packet_mark { + // TODO: on android, setting the fwmark need root or CAP_NET_ADMIN. + // We should allow users disable it in some way eg. configuration. socket.set_mark(packet_mark)?; } @@ -123,7 +125,7 @@ pub async fn new_tcp_stream<'a>( pub async fn new_udp_socket( src: Option<&SocketAddr>, iface: Option<&Interface>, - #[cfg(any(target_os = "linux", target_os = "android"))] packet_mark: Option, + packet_mark: Option, ) -> io::Result { let socket = match src { Some(src) => { @@ -154,6 +156,27 @@ pub async fn new_udp_socket( UdpSocket::from_std(socket.into()) } +/// An extension to std::net::{UdpSocket, TcpStream} +pub trait StdSocketExt { + fn set_mark(&self, mark: u32) -> io::Result<()>; +} +impl StdSocketExt for std::net::UdpSocket { + fn set_mark(&self, mark: u32) -> io::Result<()> { + set_mark(socket2::SockRef::from(self), mark) + } +} +impl StdSocketExt for std::net::TcpStream { + fn set_mark(&self, mark: u32) -> io::Result<()> { + set_mark(socket2::SockRef::from(self), mark) + } +} + +fn set_mark<'a>(socket: socket2::SockRef<'a>, mark: u32) -> io::Result<()> { + #[cfg(any(target_os = "android", target_os = "linux"))] + return socket.set_mark(mark); + #[cfg(not(any(target_os = "android", target_os = "linux")))] + return Ok(()); +} #[cfg(test)] mod tests {