Skip to content

Commit

Permalink
Merge pull request #21 from rabbit-digger/feature-ss-udp
Browse files Browse the repository at this point in the history
Feature ss udp
  • Loading branch information
spacemeowx2 authored Feb 5, 2022
2 parents 4fe0121 + 120bd69 commit 0e75fbf
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 25 deletions.
2 changes: 1 addition & 1 deletion protocol/remote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ serde = "1.0"
tracing = "0.1.26"
tokio = { version = "1.5.0", features = ["rt"] }
bincode = "1.3.3"
dashmap = "5.0.0"
dashmap = "4.0.2"
10 changes: 5 additions & 5 deletions protocol/ss/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use shadowsocks::{
#[rd_config]
#[derive(Debug, Clone)]
pub struct SSNetConfig {
server: Address,
password: String,
pub(crate) server: Address,
pub(crate) password: String,
#[serde(default)]
udp: bool,
pub(crate) udp: bool,

cipher: Cipher,
pub(crate) cipher: Cipher,

#[serde(default)]
net: NetRef,
pub(crate) net: NetRef,
}

pub struct SSNet {
Expand Down
2 changes: 2 additions & 0 deletions protocol/ss/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use server::{SSServer, SSServerConfig};

mod client;
mod server;
#[cfg(test)]
mod tests;
mod udp;
mod wrapper;

Expand Down
48 changes: 31 additions & 17 deletions protocol/ss/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
use std::net::SocketAddr;

use self::source::UdpSource;
use super::wrapper::{Cipher, CryptoStream};
use rd_interface::{async_trait, prelude::*, Address, Arc, IServer, Net, Result, TcpStream};
use rd_std::util::connect_tcp;
use rd_std::util::{connect_tcp, forward_udp};
use shadowsocks::{config::ServerType, context::Context, ServerConfig};
use socks5_protocol::Address as S5Addr;
use tokio::select;

mod source;

#[rd_config]
#[derive(Debug, Clone)]
pub struct SSServerConfig {
bind: Address,
password: String,
pub(crate) bind: Address,
pub(crate) password: String,
#[serde(default)]
udp: bool,
pub(crate) udp: bool,

cipher: Cipher,
pub(crate) cipher: Cipher,
}

pub struct SSServer {
bind: Address,
context: Arc<Context>,
cfg: Arc<SSServerConfig>,
cfg: Arc<ServerConfig>,
listen: Net,
net: Net,
}
Expand All @@ -38,26 +42,39 @@ impl IServer for SSServer {
impl SSServer {
pub fn new(listen: Net, net: Net, cfg: SSServerConfig) -> SSServer {
let context = Arc::new(Context::new(ServerType::Local));
let svr_cfg =
ServerConfig::new(("example.com", 0), cfg.password.clone(), cfg.cipher.into());

SSServer {
bind: cfg.bind,
context,
cfg: Arc::new(cfg),
cfg: Arc::new(svr_cfg),
listen,
net,
}
}
async fn serve_udp(&self) -> Result<()> {
std::future::pending::<()>().await;
let _listener = self
let udp_listener = self
.listen
.udp_bind(&mut rd_interface::Context::new(), &self.cfg.bind)
.udp_bind(&mut rd_interface::Context::new(), &self.bind)
.await?;
// TODO: add udp server

forward_udp(
UdpSource::new(
self.cfg.method(),
self.cfg.key().to_vec().into_boxed_slice(),
udp_listener,
),
self.net.clone(),
)
.await?;

Ok(())
}
async fn serve_tcp(&self) -> Result<()> {
let listener = self
.listen
.tcp_bind(&mut rd_interface::Context::new(), &self.cfg.bind)
.tcp_bind(&mut rd_interface::Context::new(), &self.bind)
.await?;
loop {
let (socket, addr) = listener.accept().await?;
Expand All @@ -72,16 +89,13 @@ impl SSServer {
}
}
async fn serve_connection(
cfg: Arc<SSServerConfig>,
cfg: Arc<ServerConfig>,
context: Arc<Context>,
socket: TcpStream,
net: Net,
addr: SocketAddr,
) -> Result<()> {
let svr_cfg =
ServerConfig::new(("example.com", 0), cfg.password.clone(), cfg.cipher.into());
let mut socket =
CryptoStream::from_stream(context, socket, cfg.cipher.into(), svr_cfg.key());
let mut socket = CryptoStream::from_stream(context, socket, cfg.method(), cfg.key());
let target = S5Addr::read(&mut socket).await.map_err(|e| e.to_io_err())?;

let ctx = &mut rd_interface::Context::from_socketaddr(addr);
Expand Down
93 changes: 93 additions & 0 deletions protocol/ss/src/server/source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::{
io,
pin::Pin,
task::{self, Poll},
};

use bytes::BytesMut;
use futures::{ready, Sink, SinkExt, Stream, StreamExt};
use rd_interface::UdpSocket;
use rd_std::util::forward_udp::{RawUdpSource, UdpPacket};
use shadowsocks::crypto::v1::CipherKind;
use socks5_protocol::Address as S5Addr;

use crate::udp::{decrypt_payload, encrypt_payload};

pub struct UdpSource {
udp: UdpSocket,

method: CipherKind,
key: Box<[u8]>,
}

impl UdpSource {
pub fn new(method: CipherKind, key: Box<[u8]>, udp: UdpSocket) -> UdpSource {
UdpSource { udp, method, key }
}
}

impl Stream for UdpSource {
type Item = io::Result<UdpPacket>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
// TODO: support send_to domain
let packet = loop {
let (mut recv_buf, from) = match ready!(self.udp.poll_next_unpin(cx)) {
Some(r) => r?,
None => return Poll::Ready(None),
};
let (n, addr) = decrypt_payload(self.method, &self.key, &mut recv_buf[..])?;
// drop the packet if it's sent to domain silently
let to = match addr {
S5Addr::Domain(_, _) => continue,
S5Addr::SocketAddr(s) => s,
};

break UdpPacket {
data: recv_buf.split_to(n).freeze(),
from,
to,
};
};

Poll::Ready(Some(Ok(packet)))
}
}

impl Sink<UdpPacket> for UdpSource {
type Error = io::Error;

fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.udp.poll_ready_unpin(cx)
}

fn start_send(
mut self: Pin<&mut Self>,
UdpPacket { from, to, data }: UdpPacket,
) -> Result<(), Self::Error> {
let mut send_buf = BytesMut::new();

encrypt_payload(self.method, &self.key, &from.into(), &data, &mut send_buf)?;

self.udp.start_send_unpin((send_buf.freeze(), to.into()))
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.udp.poll_flush_unpin(cx)
}

fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.udp.poll_close_unpin(cx)
}
}

impl RawUdpSource for UdpSource {}
46 changes: 46 additions & 0 deletions protocol/ss/src/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::wrapper::Cipher;

use super::*;
use rd_interface::{config::NetRef, IServer, IntoAddress, IntoDyn};
use rd_std::tests::{
assert_echo, assert_echo_udp, get_registry, spawn_echo_server, spawn_echo_server_udp, TestNet,
};
use std::time::Duration;
use tokio::time::sleep;

#[test]
fn test_ss_smoke() {
let mut registry = get_registry();
super::init(&mut registry).unwrap();
}

#[tokio::test]
async fn test_ss_server_client() {
let local = TestNet::new().into_dyn();
spawn_echo_server(&local, "127.0.0.1:26666").await;
spawn_echo_server_udp(&local, "127.0.0.1:26666").await;

let server_addr = "127.0.0.1:16666".into_address().unwrap();
let server_cfg = server::SSServerConfig {
bind: server_addr.clone(),
password: "password".into(),
udp: true,
cipher: Cipher::AES_128_GCM,
};
let server = server::SSServer::new(local.clone(), local.clone(), server_cfg);
tokio::spawn(async move { server.start().await });

sleep(Duration::from_secs(1)).await;

let client_cfg = client::SSNetConfig {
server: server_addr,
password: "password".into(),
udp: true,
cipher: Cipher::AES_128_GCM,
net: NetRef::new_with_value("local".to_string(), local.clone()),
};
let client = client::SSNet::new(client_cfg).into_dyn();

assert_echo(&client, "127.0.0.1:26666").await;
assert_echo_udp(&client, "127.0.0.1:26666").await;
}
1 change: 0 additions & 1 deletion src/api_server/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub async fn api(
) -> Result<impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone> {
let at = check_access_token(server.access_token);
let prefix = warp::path!("api" / ..);
// TODO: read or write userdata by API
let ctx = Ctx {
rd: server.rabbit_digger.clone(),
cfg_mgr: server.config_manager.clone(),
Expand Down

0 comments on commit 0e75fbf

Please sign in to comment.