Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Obfs4 features #41

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion crates/obfs4/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ simple_asn1 = { version="0.6.1", optional=true}
tracing-subscriber = "0.3.18"
hex-literal = "0.4.1"
tor-basic-utils = "0.20.0"
rand_distr = "0.4.3"

# benches
# criterion = "0.5"

# # o5 pqc test
# pqc_kyber = {version="0.7.1", features=["kyber1024", "std"]}
# ml-kem = "0.1.0"
Expand Down
30 changes: 25 additions & 5 deletions crates/obfs4/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,42 @@ tokio::spawn(async move {
Server example using [ptrs](../ptrs)

```rs
use ptrs::{ServerBuilder, ServerTransport};
...
use ptrs::{ServerBuilder as _, ServerTransport as _};
use obfs4::Obfs4PT;

let mut builder = Obfs4PT::server_builder();
let server = if params.is_some() {
builder.options(&params.unwrap())?.build()
} else {
builder.build()
};

let listener = tokio::net::TcpListener::bind(listen_addrs).await?;
loop {
let (conn, _) = listener.accept()?;
let pt_conn = server.reveal(conn).await?;

// pt_conn wraps conn and is usable as an `AsyncRead + AsyncWrite` object.
tokio::spawn( async move{
// use the connection (e.g. to echo)
let (mut r, mut w) = tokio::io::split(pt_conn);
if let Err(e) = tokio::io::copy(&mut r, &mut w).await {
warn!("echo closed with error: {e}")
}
});
}

// TODO fill out example
```

### Loose Ends:

- [X] server / client compatibility test go-to-rust and rust-to-go.
- [x] double check the bit randomization and clearing for high two bits in the `dalek` representative
- [ ] length distribution things
- [ ] iat mode handling
- [ ] double check the bit randomization and clearing for high two bits in the `dalek` representative

## Performance

- comparison to golang
- comparison when kyber is enabled
- NaCl encryption library(s)

8 changes: 4 additions & 4 deletions crates/obfs4/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ impl Client {

/// On a failed handshake the client will read for the remainder of the
/// handshake timeout and then close the connection.
pub async fn wrap<'a, T>(self, mut stream: T) -> Result<Obfs4Stream<T>>
pub async fn wrap<'a, T>(self, mut stream: T) -> Result<Obfs4Stream>
where
T: AsyncRead + AsyncWrite + Unpin + 'a,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let session = sessions::new_client_session(self.station_pubkey, self.iat_mode);

Expand All @@ -156,9 +156,9 @@ impl Client {
pub async fn establish<'a, T, E>(
self,
mut stream_fut: Pin<ptrs::FutureResult<T, E>>,
) -> Result<Obfs4Stream<T>>
) -> Result<Obfs4Stream>
where
T: AsyncRead + AsyncWrite + Unpin + 'a,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
E: std::error::Error + Send + Sync + 'static,
{
let stream = stream_fut.await.map_err(|e| Error::Other(Box::new(e)))?;
Expand Down
49 changes: 49 additions & 0 deletions crates/obfs4/src/common/delay/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Sink Delays

Adding Structured Delays to rust sinks on event.


Example test using a sampled normal distribution for the delay after each
send (`start_send()` if not using `SinkExt`).

```rs
#[cfg(test)]
mod testing {
use super::*;
use futures::sink::{self, SinkExt};
use std::time::Instant;
use rand_distr::{Normal, Distribution};

#[tokio::test]
async fn delay_sink() {
let start = Instant::now();

let unfold = sink::unfold(0, |mut sum, i: i32| async move {
sum += i;
eprintln!("{} - {:?}", i, Instant::now().duration_since(start));
Ok::<_, futures::never::Never>(sum)
});
futures::pin_mut!(unfold);

// let mut delayed_unfold = DelayedSink::new(unfold, || Duration::from_secs(1));
let mut delayed_unfold = DelayedSink::new(unfold, delay_distribution);
delayed_unfold.send(5).await.unwrap();
delayed_unfold.send(4).await.unwrap();
delayed_unfold.send(3).await.unwrap();
}

fn delay_distribution() -> Duration {
let distr = Normal::new(500.0, 100.0).unwrap();
let dur_ms = distr.sample(&mut rand::thread_rng());
Duration::from_millis(dur_ms as u64)
}
}
```

---

But I wanna go fast! Why would I ever want this???

-> This lets us control the delays (or leave them out) in between sink events.
As an example, we can control the delay between network writes, which helps when
reshaping the traffic fingerprint of a proxy connection.
107 changes: 107 additions & 0 deletions crates/obfs4/src/common/delay/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::{sink::Sink, Future};
use tokio::time::{Instant, Sleep};

use pin_project::pin_project;

type DurationFn = fn() -> Duration;

#[pin_project]
pub struct DelayedSink<Si, Item, E> {
// #[pin]
// sink: Si,
// #[pin]
// sleep: Sleep,
sink: Pin<Box<Si>>,
sleep: Pin<Box<Sleep>>,
delay_fn: DurationFn,
_item: PhantomData<Item>,
_error: PhantomData<E>,
}

impl<Item, E, Si: Sink<Item, Error = E>> DelayedSink<Si, Item, E> {
pub fn new(sink: Si, delay_fn: DurationFn) -> Self {
let delay = delay_fn();
let sleep = tokio::time::sleep(delay);
Self {
// sink,
// sleep,
sink: Box::pin(sink),
sleep: Box::pin(sleep),
delay_fn,
_item: PhantomData {},
_error: PhantomData {},
}
}
}

impl<I, J, E, Si: Sink<I, Error = E>> Sink<J> for DelayedSink<Si, I, E>
where
J: Into<I>,
{
type Error = Si::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let s = self.project();
match (s.sink.as_mut().poll_ready(cx), s.sleep.as_mut().poll(cx)) {
(Poll::Ready(k), Poll::Ready(_)) => Poll::Ready(k),
_ => Poll::Pending,
}
}

fn start_send(self: Pin<&mut Self>, item: J) -> Result<(), Self::Error> {
let s = self.project();
s.sink.as_mut().start_send(item.into())?;

let delay = (*s.delay_fn)();

if delay.is_zero() {
s.sleep.as_mut().reset(Instant::now() + delay);
}
Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().sink.as_mut().poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().sink.as_mut().poll_close(cx)
}
}

#[cfg(test)]
mod testing {
use super::*;
use futures::sink::{self, SinkExt};
use rand_distr::{Distribution, Normal};
use std::time::Instant;

#[tokio::test]
async fn delay_sink() {
let start = Instant::now();

let unfold = sink::unfold(0, |mut sum, i: i32| async move {
sum += i;
eprintln!("{} - {:?}", i, Instant::now().duration_since(start));
Ok::<_, futures::never::Never>(sum)
});
futures::pin_mut!(unfold);

// let mut delayed_unfold = DelayedSink::new(unfold, || Duration::from_secs(1));
let mut delayed_unfold = DelayedSink::new(unfold, delay_distribution);
delayed_unfold.send(5).await.unwrap();
delayed_unfold.send(4).await.unwrap();
delayed_unfold.send(3).await.unwrap();
}

fn delay_distribution() -> Duration {
let distr = Normal::new(500.0, 100.0).unwrap();
let dur_ms = distr.sample(&mut rand::thread_rng());
Duration::from_millis(dur_ms as u64)
}
}
3 changes: 2 additions & 1 deletion crates/obfs4/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ pub(crate) mod kdf;
mod skip;
pub use skip::discard;

pub(crate) mod delay;
pub mod drbg;
// pub mod ntor;

pub mod ntor_arti;
pub mod probdist;
pub mod replay_filter;
Expand Down
Loading
Loading