From 49efffd9d2ea9b0c0bafe98dbd3f74f28e811a51 Mon Sep 17 00:00:00 2001 From: spacemeowx2 Date: Sun, 6 Feb 2022 04:38:24 +0800 Subject: [PATCH 1/3] feat: add shadowsocks udp server --- protocol/ss/src/server.rs | 19 +++++-- protocol/ss/src/server/source.rs | 93 ++++++++++++++++++++++++++++++++ rabbit-digger | 2 +- src/api_server/filters.rs | 1 - 4 files changed, 110 insertions(+), 5 deletions(-) create mode 100644 protocol/ss/src/server/source.rs diff --git a/protocol/ss/src/server.rs b/protocol/ss/src/server.rs index be22178e..b43e9247 100644 --- a/protocol/ss/src/server.rs +++ b/protocol/ss/src/server.rs @@ -1,12 +1,15 @@ use std::net::SocketAddr; +use self::source::UdpSource; use super::wrapper::{Cipher, CryptoStream}; use rd_interface::{async_trait, prelude::*, Address, Arc, IServer, Net, Result, TcpStream}; -use rd_std::util::connect_tcp; +use rd_std::util::{connect_tcp, forward_udp}; use shadowsocks::{config::ServerType, context::Context, ServerConfig}; use socks5_protocol::Address as S5Addr; use tokio::select; +mod source; + #[rd_config] #[derive(Debug, Clone)] pub struct SSServerConfig { @@ -47,11 +50,21 @@ impl SSServer { } async fn serve_udp(&self) -> Result<()> { std::future::pending::<()>().await; - let _listener = self + let udp_listener = self .listen .udp_bind(&mut rd_interface::Context::new(), &self.cfg.bind) .await?; - // TODO: add udp server + + forward_udp( + UdpSource::new( + self.cfg.cipher.into(), + self.cfg.password.as_bytes().into(), + udp_listener, + ), + self.net.clone(), + ) + .await?; + Ok(()) } async fn serve_tcp(&self) -> Result<()> { diff --git a/protocol/ss/src/server/source.rs b/protocol/ss/src/server/source.rs new file mode 100644 index 00000000..f49eaa5b --- /dev/null +++ b/protocol/ss/src/server/source.rs @@ -0,0 +1,93 @@ +use std::{ + io, + pin::Pin, + task::{self, Poll}, +}; + +use bytes::BytesMut; +use futures::{ready, Sink, SinkExt, Stream, StreamExt}; +use rd_interface::UdpSocket; +use rd_std::util::forward_udp::{RawUdpSource, UdpPacket}; +use shadowsocks::crypto::v1::CipherKind; +use socks5_protocol::Address as S5Addr; + +use crate::udp::{decrypt_payload, encrypt_payload}; + +pub struct UdpSource { + udp: UdpSocket, + + method: CipherKind, + key: Box<[u8]>, +} + +impl UdpSource { + pub fn new(method: CipherKind, key: Box<[u8]>, udp: UdpSocket) -> UdpSource { + UdpSource { udp, method, key } + } +} + +impl Stream for UdpSource { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + // TODO: support send_to domain + let packet = loop { + let (mut recv_buf, from) = match ready!(self.udp.poll_next_unpin(cx)) { + Some(r) => r?, + None => return Poll::Ready(None), + }; + let (n, addr) = decrypt_payload(self.method, &self.key, &mut recv_buf[..])?; + // drop the packet if it's sent to domain silently + let to = match addr { + S5Addr::Domain(_, _) => continue, + S5Addr::SocketAddr(s) => s, + }; + + break UdpPacket { + data: recv_buf.split_to(n).freeze(), + from, + to, + }; + }; + + Poll::Ready(Some(Ok(packet))) + } +} + +impl Sink for UdpSource { + type Error = io::Error; + + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { + self.udp.poll_ready_unpin(cx) + } + + fn start_send( + mut self: Pin<&mut Self>, + UdpPacket { from, to, data }: UdpPacket, + ) -> Result<(), Self::Error> { + let mut send_buf = BytesMut::new(); + + encrypt_payload(self.method, &self.key, &from.into(), &data, &mut send_buf)?; + + self.udp.start_send_unpin((send_buf.freeze(), to.into())) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { + self.udp.poll_flush_unpin(cx) + } + + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { + self.udp.poll_close_unpin(cx) + } +} + +impl RawUdpSource for UdpSource {} diff --git a/rabbit-digger b/rabbit-digger index fc5a66a3..dc19e2da 160000 --- a/rabbit-digger +++ b/rabbit-digger @@ -1 +1 @@ -Subproject commit fc5a66a308950e8acd47202fd24366e107a0911a +Subproject commit dc19e2da5eca4de4ad2a8ad4189987051d70c11f diff --git a/src/api_server/filters.rs b/src/api_server/filters.rs index becf6476..91202bfc 100644 --- a/src/api_server/filters.rs +++ b/src/api_server/filters.rs @@ -16,7 +16,6 @@ pub async fn api( ) -> Result + Clone> { let at = check_access_token(server.access_token); let prefix = warp::path!("api" / ..); - // TODO: read or write userdata by API let ctx = Ctx { rd: server.rabbit_digger.clone(), cfg_mgr: server.config_manager.clone(), From 75d18b4c034a27ce8f72d93135326513d64a91d4 Mon Sep 17 00:00:00 2001 From: spacemeowx2 Date: Sun, 6 Feb 2022 05:37:54 +0800 Subject: [PATCH 2/3] fix(ss): wrong key to udp --- protocol/ss/src/client.rs | 10 ++++----- protocol/ss/src/lib.rs | 2 ++ protocol/ss/src/server.rs | 33 ++++++++++++++-------------- protocol/ss/src/tests.rs | 46 +++++++++++++++++++++++++++++++++++++++ rabbit-digger | 2 +- 5 files changed, 71 insertions(+), 22 deletions(-) create mode 100644 protocol/ss/src/tests.rs diff --git a/protocol/ss/src/client.rs b/protocol/ss/src/client.rs index f125f2cd..9ba298d1 100644 --- a/protocol/ss/src/client.rs +++ b/protocol/ss/src/client.rs @@ -12,15 +12,15 @@ use shadowsocks::{ #[rd_config] #[derive(Debug, Clone)] pub struct SSNetConfig { - server: Address, - password: String, + pub(crate) server: Address, + pub(crate) password: String, #[serde(default)] - udp: bool, + pub(crate) udp: bool, - cipher: Cipher, + pub(crate) cipher: Cipher, #[serde(default)] - net: NetRef, + pub(crate) net: NetRef, } pub struct SSNet { diff --git a/protocol/ss/src/lib.rs b/protocol/ss/src/lib.rs index 13dda31f..3a4a80d6 100644 --- a/protocol/ss/src/lib.rs +++ b/protocol/ss/src/lib.rs @@ -7,6 +7,8 @@ use server::{SSServer, SSServerConfig}; mod client; mod server; +#[cfg(test)] +mod tests; mod udp; mod wrapper; diff --git a/protocol/ss/src/server.rs b/protocol/ss/src/server.rs index b43e9247..ae3c93a9 100644 --- a/protocol/ss/src/server.rs +++ b/protocol/ss/src/server.rs @@ -13,17 +13,18 @@ mod source; #[rd_config] #[derive(Debug, Clone)] pub struct SSServerConfig { - bind: Address, - password: String, + pub(crate) bind: Address, + pub(crate) password: String, #[serde(default)] - udp: bool, + pub(crate) udp: bool, - cipher: Cipher, + pub(crate) cipher: Cipher, } pub struct SSServer { + bind: Address, context: Arc, - cfg: Arc, + cfg: Arc, listen: Net, net: Net, } @@ -41,24 +42,27 @@ impl IServer for SSServer { impl SSServer { pub fn new(listen: Net, net: Net, cfg: SSServerConfig) -> SSServer { let context = Arc::new(Context::new(ServerType::Local)); + let svr_cfg = + ServerConfig::new(("example.com", 0), cfg.password.clone(), cfg.cipher.into()); + SSServer { + bind: cfg.bind, context, - cfg: Arc::new(cfg), + cfg: Arc::new(svr_cfg), listen, net, } } async fn serve_udp(&self) -> Result<()> { - std::future::pending::<()>().await; let udp_listener = self .listen - .udp_bind(&mut rd_interface::Context::new(), &self.cfg.bind) + .udp_bind(&mut rd_interface::Context::new(), &self.bind) .await?; forward_udp( UdpSource::new( - self.cfg.cipher.into(), - self.cfg.password.as_bytes().into(), + self.cfg.method(), + self.cfg.key().to_vec().into_boxed_slice(), udp_listener, ), self.net.clone(), @@ -70,7 +74,7 @@ impl SSServer { async fn serve_tcp(&self) -> Result<()> { let listener = self .listen - .tcp_bind(&mut rd_interface::Context::new(), &self.cfg.bind) + .tcp_bind(&mut rd_interface::Context::new(), &self.bind) .await?; loop { let (socket, addr) = listener.accept().await?; @@ -85,16 +89,13 @@ impl SSServer { } } async fn serve_connection( - cfg: Arc, + cfg: Arc, context: Arc, socket: TcpStream, net: Net, addr: SocketAddr, ) -> Result<()> { - let svr_cfg = - ServerConfig::new(("example.com", 0), cfg.password.clone(), cfg.cipher.into()); - let mut socket = - CryptoStream::from_stream(context, socket, cfg.cipher.into(), svr_cfg.key()); + let mut socket = CryptoStream::from_stream(context, socket, cfg.method(), cfg.key()); let target = S5Addr::read(&mut socket).await.map_err(|e| e.to_io_err())?; let ctx = &mut rd_interface::Context::from_socketaddr(addr); diff --git a/protocol/ss/src/tests.rs b/protocol/ss/src/tests.rs new file mode 100644 index 00000000..fcd521ee --- /dev/null +++ b/protocol/ss/src/tests.rs @@ -0,0 +1,46 @@ +use crate::wrapper::Cipher; + +use super::*; +use rd_interface::{config::NetRef, IServer, IntoAddress, IntoDyn}; +use rd_std::tests::{ + assert_echo, assert_echo_udp, get_registry, spawn_echo_server, spawn_echo_server_udp, TestNet, +}; +use std::time::Duration; +use tokio::time::sleep; + +#[test] +fn test_ss_smoke() { + let mut registry = get_registry(); + super::init(&mut registry).unwrap(); +} + +#[tokio::test] +async fn test_ss_server_client() { + let local = TestNet::new().into_dyn(); + spawn_echo_server(&local, "127.0.0.1:26666").await; + spawn_echo_server_udp(&local, "127.0.0.1:26666").await; + + let server_addr = "127.0.0.1:16666".into_address().unwrap(); + let server_cfg = server::SSServerConfig { + bind: server_addr.clone(), + password: "password".into(), + udp: true, + cipher: Cipher::AES_128_GCM, + }; + let server = server::SSServer::new(local.clone(), local.clone(), server_cfg); + tokio::spawn(async move { server.start().await }); + + sleep(Duration::from_secs(1)).await; + + let client_cfg = client::SSNetConfig { + server: server_addr, + password: "password".into(), + udp: true, + cipher: Cipher::AES_128_GCM, + net: NetRef::new_with_value("local".to_string(), local.clone()), + }; + let client = client::SSNet::new(client_cfg).into_dyn(); + + assert_echo(&client, "127.0.0.1:26666").await; + assert_echo_udp(&client, "127.0.0.1:26666").await; +} diff --git a/rabbit-digger b/rabbit-digger index dc19e2da..19343ff7 160000 --- a/rabbit-digger +++ b/rabbit-digger @@ -1 +1 @@ -Subproject commit dc19e2da5eca4de4ad2a8ad4189987051d70c11f +Subproject commit 19343ff7722db9945da21a26b5bd0fd6086460c4 From 120bd69bd8b5bba41d6e18cbd44fc80a4e37f97d Mon Sep 17 00:00:00 2001 From: spacemeowx2 Date: Sun, 6 Feb 2022 05:55:02 +0800 Subject: [PATCH 3/3] fix: dashmap v5.0.0 is yanked --- protocol/remote/Cargo.toml | 2 +- rabbit-digger | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/protocol/remote/Cargo.toml b/protocol/remote/Cargo.toml index df94f2b2..36fbdbd8 100644 --- a/protocol/remote/Cargo.toml +++ b/protocol/remote/Cargo.toml @@ -12,4 +12,4 @@ serde = "1.0" tracing = "0.1.26" tokio = { version = "1.5.0", features = ["rt"] } bincode = "1.3.3" -dashmap = "5.0.0" +dashmap = "4.0.2" diff --git a/rabbit-digger b/rabbit-digger index 19343ff7..2cfb00c7 160000 --- a/rabbit-digger +++ b/rabbit-digger @@ -1 +1 @@ -Subproject commit 19343ff7722db9945da21a26b5bd0fd6086460c4 +Subproject commit 2cfb00c7397416dd008484f52e622e41cec17b81