Skip to content

Commit

Permalink
Refactor udp interface (#19)
Browse files Browse the repository at this point in the history
* fix: ss udp

* feat: impl raw udp

* refactor: add raw udp

* build: use shadowsocks v1.11.1

* refactor(ss): remove context from udp

* refactor(ss): remove oncecell on context

* fix: tproxy examples

* build: update rabbit-digger dep
  • Loading branch information
spacemeowx2 authored Jan 10, 2022
1 parent 4ffd73e commit 665c34f
Show file tree
Hide file tree
Showing 13 changed files with 434 additions and 337 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ Cargo.lock
.DS_Store
/*.yaml
/generated
*.rd.yml
13 changes: 9 additions & 4 deletions examples/tproxy/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ RD_DISABLE_IPV6="${RD_DISABLE_IPV6:=0}"
RD_ENABLE_SELF="${RD_ENABLE_SELF:=0}"
RD_EXCLUDE_IP="$RD_EXCLUDE_IP"
RD_EXCLUDE_MAC="$RD_EXCLUDE_MAC"
RD_HIJACK_DNS="${RD_HIJACK_DNS:=1}"

if [ "$(id -u)" != "0" ]; then
echo "This script must be run as root" 1>&2
Expand All @@ -27,7 +28,7 @@ if [ "$RD_DISABLE_IPV6" != "1" ]; then
ip -6 rule add fwmark $RD_FW_MARK table $RD_TABLE

ip6tables -t mangle -N RD_OUTPUT
if [ "$RD_ENABLE_SELF" == "1" ]; then
if [ "$RD_ENABLE_SELF" = "1" ]; then
ip6tables -t mangle -A RD_OUTPUT -d ::1/128 -j RETURN
ip6tables -t mangle -A RD_OUTPUT -d fc00::/7 -j RETURN
ip6tables -t mangle -A RD_OUTPUT -d fe80::/10 -j RETURN
Expand All @@ -49,7 +50,9 @@ if [ "$RD_DISABLE_IPV6" != "1" ]; then
ip6tables -t mangle -A RD_PREROUTING -m mac --mac-source $i -j RETURN 2>/dev/null || true
done
ip6tables -t mangle -A RD_PREROUTING -m mark --mark $RD_MARK -j RETURN
ip6tables -t mangle -A RD_PREROUTING -p udp --dport 53 -j TPROXY --on-port $RD_PORT6 --tproxy-mark $RD_FW_MARK
if [ "$RD_HIJACK_DNS" = "1" ]; then
ip6tables -t mangle -A RD_PREROUTING -p udp --dport 53 -j TPROXY --on-port $RD_PORT6 --tproxy-mark $RD_FW_MARK
fi
ip6tables -t mangle -A RD_PREROUTING -d ::1/128 -j RETURN
ip6tables -t mangle -A RD_PREROUTING -d fc00::/7 -j RETURN
ip6tables -t mangle -A RD_PREROUTING -d fe80::/10 -j RETURN
Expand All @@ -61,7 +64,7 @@ if [ "$RD_DISABLE_IPV6" != "1" ]; then
fi

iptables -t mangle -N RD_OUTPUT
if [ "$RD_ENABLE_SELF" == "1" ]; then
if [ "$RD_ENABLE_SELF" = "1" ]; then
iptables -t mangle -A RD_OUTPUT -d 0/8 -j RETURN
iptables -t mangle -A RD_OUTPUT -d 127/8 -j RETURN
iptables -t mangle -A RD_OUTPUT -d 10/8 -j RETURN
Expand All @@ -88,7 +91,9 @@ for i in $(echo $RD_EXCLUDE_MAC | tr "," "\n"); do
iptables -t mangle -A RD_PREROUTING -m mac --mac-source $i -j RETURN 2>/dev/null || true
done
iptables -t mangle -A RD_PREROUTING -m mark --mark $RD_MARK -j RETURN
iptables -t mangle -A RD_PREROUTING -p udp --dport 53 -j TPROXY --on-port $RD_PORT --tproxy-mark $RD_FW_MARK
if [ "$RD_HIJACK_DNS" = "1" ]; then
iptables -t mangle -A RD_PREROUTING -p udp --dport 53 -j TPROXY --on-port $RD_PORT --tproxy-mark $RD_FW_MARK
fi
iptables -t mangle -A RD_PREROUTING -d 0/8 -j RETURN
iptables -t mangle -A RD_PREROUTING -d 127/8 -j RETURN
iptables -t mangle -A RD_PREROUTING -d 10/8 -j RETURN
Expand Down
2 changes: 1 addition & 1 deletion protocol/raw/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl INet for RawNet {
_ctx: &mut Context,
addr: &Address,
) -> Result<rd_interface::UdpSocket> {
let udp = UdpSocketWrap(self.net.udp_bind(addr.to_socket_addr()?).await?);
let udp = UdpSocketWrap::new(self.net.udp_bind(addr.to_socket_addr()?).await?);
Ok(udp.into_dyn())
}
}
228 changes: 108 additions & 120 deletions protocol/raw/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,37 @@
use std::{
io,
net::{SocketAddr, SocketAddrV4},
pin::Pin,
str::FromStr,
time::Duration,
task,
};

use futures::{future::ready, StreamExt};
use lru_time_cache::LruCache;
use crate::{
device,
gateway::{GatewayInterface, MapTable},
};
use futures::{ready, Sink, Stream, StreamExt};
use rd_interface::{
async_trait, constant::UDP_BUFFER_SIZE, error::map_other, prelude::*, registry::ServerFactory,
Address, Context, Error, IServer, IntoAddress, Net, Result,
async_trait, error::map_other, prelude::*, registry::ServerFactory, Bytes, Context, Error,
IServer, IntoAddress, Net, Result,
};
use rd_std::util::{
connect_tcp,
forward_udp::{self, RawUdpSource},
};
use rd_std::util::connect_tcp;
use smoltcp::{
phy::{Checksum, ChecksumCapabilities, Medium},
wire::{
EthernetAddress, EthernetFrame, EthernetProtocol, IpCidr, IpProtocol, IpVersion,
Ipv4Address, Ipv4Packet, Ipv4Repr, UdpPacket, UdpRepr,
},
};
use tokio::{
select, spawn,
sync::mpsc::{unbounded_channel, UnboundedSender as Sender},
time::timeout,
};
use tokio::{select, spawn};
use tokio_smoltcp::{
device::{FutureDevice, Interface, Packet},
BufferSize, NetConfig, RawSocket, TcpListener,
};

use crate::{
device,
gateway::{GatewayInterface, MapTable},
};

#[rd_config]
#[derive(Clone, Copy)]
pub enum Layer {
Expand Down Expand Up @@ -145,7 +144,9 @@ impl RawServer {
};

let device = GatewayInterface::new(
dev.filter(move |p: &Packet| ready(filter_packet(p, ethernet_addr, ip_addr, layer))),
dev.filter(move |p: &Packet| {
std::future::ready(filter_packet(p, ethernet_addr, ip_addr, layer))
}),
lru_size,
SocketAddrV4::new(addr.into(), 20000),
layer,
Expand Down Expand Up @@ -216,61 +217,107 @@ impl RawServer {
}
}
async fn serve_udp(&self, raw: RawSocket) -> Result<()> {
let (send_raw, mut send_rx) = unbounded_channel::<(SocketAddr, SocketAddr, Vec<u8>)>();
let source = Source::new(raw);

let mut buf = [0u8; UDP_BUFFER_SIZE];
let mut nat = LruCache::<SocketAddr, UdpTunnel>::with_expiry_duration_and_capacity(
Duration::from_secs(30),
128,
);
let net = self.net.clone();

let recv = async {
loop {
let size = raw.recv(&mut buf).await?;
let (src, dst, payload) = match parse_udp(&buf[..size]) {
Ok(v) => v,
_ => break,
};

let udp = nat
.entry(src)
.or_insert_with(|| UdpTunnel::new(net.clone(), src, send_raw.clone()));
if let Err(e) = udp.send_to(payload, dst).await {
tracing::error!("Udp send_to {:?}", e);
nat.remove(&src);
}
}
forward_udp::forward_udp(source, self.net.clone()).await?;

Ok(()) as Result<()>
};
Ok(())
}
}

let send = async {
while let Some((src, dst, payload)) = send_rx.recv().await {
if let Some(ip_packet) = pack_udp(src, dst, &payload) {
if let Err(e) = raw.send(&ip_packet).await {
tracing::error!(
"Raw send error: {:?}, dropping udp size: {}",
e,
ip_packet.len()
);
}
} else {
tracing::debug!("Unsupported src/dst");
}
}
Ok(()) as Result<()>
struct Source {
raw: RawSocket,
recv_buf: Box<[u8]>,
send_buf: Option<Vec<u8>>,
}

impl Source {
pub fn new(raw: RawSocket) -> Source {
Source {
raw,
recv_buf: Box::new([0u8; 65536]),
send_buf: None,
}
}
}

impl Stream for Source {
type Item = io::Result<forward_udp::UdpPacket>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
let Source { raw, recv_buf, .. } = &mut *self;

let (from, to, data) = loop {
let size = ready!(raw.poll_recv(cx, recv_buf))?;

match parse_udp(&recv_buf[..size]) {
Ok(v) => break v,
_ => {}
};
};

select! {
r = send => r?,
r = recv => r?,
let data = Bytes::copy_from_slice(data);

Some(Ok(forward_udp::UdpPacket { from, to, data })).into()
}
}

impl Sink<forward_udp::UdpPacket> for Source {
type Error = io::Error;

fn poll_ready(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Result<(), Self::Error>> {
if self.send_buf.is_some() {
return self.poll_flush(cx);
}

Ok(()).into()
}

fn start_send(
mut self: Pin<&mut Self>,
forward_udp::UdpPacket { from, to, data }: forward_udp::UdpPacket,
) -> Result<(), Self::Error> {
if let Some(ip_packet) = pack_udp(from, to, &data) {
self.send_buf = Some(ip_packet);
} else {
tracing::debug!("Unsupported src/dst");
}
Ok(())
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Result<(), Self::Error>> {
let Source { raw, send_buf, .. } = &mut *self;

match send_buf {
Some(buf) => {
ready!(raw.poll_send(cx, buf))?;
*send_buf = None;
}
None => {}
}

Ok(()).into()
}

fn poll_close(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
}

impl RawUdpSource for Source {}

/// buf is a ip packet
fn parse_udp(buf: &[u8]) -> smoltcp::Result<(SocketAddr, SocketAddr, &[u8])> {
let ipv4 = Ipv4Packet::new_checked(buf)?;
Expand Down Expand Up @@ -354,62 +401,3 @@ impl ServerFactory for RawServer {
RawServer::new(net, config)
}
}

struct UdpTunnel {
tx: Sender<(SocketAddr, Vec<u8>)>,
}

impl UdpTunnel {
fn new(
net: Net,
src: SocketAddr,
send_raw: Sender<(SocketAddr, SocketAddr, Vec<u8>)>,
) -> UdpTunnel {
let (tx, mut rx) = unbounded_channel::<(SocketAddr, Vec<u8>)>();
tokio::spawn(async move {
let udp = timeout(
Duration::from_secs(5),
net.udp_bind(
&mut Context::from_socketaddr(src),
&Address::any_addr_port(&src),
),
)
.await
.map_err(map_other)??;

let send = async {
while let Some((addr, packet)) = rx.recv().await {
udp.send_to(&packet, addr.into()).await?;
}
Ok(())
};
let recv = async {
let mut buf = [0u8; UDP_BUFFER_SIZE];
loop {
let (size, addr) = udp.recv_from(&mut buf).await?;

if send_raw.send((addr, src, buf[..size].to_vec())).is_err() {
break;
}
}
tracing::trace!("send_raw return error");
Ok(())
};

let r: Result<()> = select! {
r = send => r,
r = recv => r,
};

r
});
UdpTunnel { tx }
}
/// return false if the send queue is full
async fn send_to(&self, buf: &[u8], addr: SocketAddr) -> Result<()> {
match self.tx.send((addr, buf.to_vec())) {
Ok(_) => Ok(()),
Err(_) => Err(Error::Other("Other side closed".into())),
}
}
}
Loading

0 comments on commit 665c34f

Please sign in to comment.