Skip to content

Commit

Permalink
try_recv/recv/send (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
vnt-dev authored Dec 4, 2024
1 parent 04823b0 commit babdf47
Showing 1 changed file with 86 additions and 22 deletions.
108 changes: 86 additions & 22 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,28 +119,32 @@ impl Session {
}
}
}
//Wait on both the read handle and the shutdown handle so that we stop when requested
let handles = [self.get_read_wait_event()?.0, self.shutdown_event.0 .0];
let result = unsafe {
//SAFETY: We abide by the requirements of WaitForMultipleObjects, handles is a
//pointer to valid, aligned, stack memory
WaitForMultipleObjects(handles.len() as u32, &handles as _, FALSE, INFINITE)
};
const WAIT_OBJECT_1: WAIT_EVENT = WAIT_OBJECT_0 + 1;
match result {
WAIT_FAILED => return Err(util::get_last_error()?.into()),
WAIT_OBJECT_0 => {
//We have data!
continue;
}
WAIT_OBJECT_1 => {
//Shutdown event triggered
return Err(Error::ShuttingDown);
}
_ => {
//This should never happen
panic!("WaitForMultipleObjects returned unexpected value {:?}", result);
}
self.wait_read()?;
}
}

fn wait_read(&self) -> Result<(), Error> {
//Wait on both the read handle and the shutdown handle so that we stop when requested
let handles = [self.get_read_wait_event()?.0, self.shutdown_event.0 .0];
let result = unsafe {
//SAFETY: We abide by the requirements of WaitForMultipleObjects, handles is a
//pointer to valid, aligned, stack memory
WaitForMultipleObjects(handles.len() as u32, &handles as _, FALSE, INFINITE)
};
const WAIT_OBJECT_1: WAIT_EVENT = WAIT_OBJECT_0 + 1;
match result {
WAIT_FAILED => Err(util::get_last_error()?.into()),
WAIT_OBJECT_0 => {
//We have data!
Ok(())
}
WAIT_OBJECT_1 => {
//Shutdown event triggered
Err(Error::ShuttingDown)
}
_ => {
//This should never happen
panic!("WaitForMultipleObjects returned unexpected value {:?}", result);
}
}
}
Expand All @@ -152,6 +156,66 @@ impl Session {
}
}

impl Session {
pub fn try_recv(&self, buf: &mut [u8]) -> std::io::Result<usize> {
let mut size = 0u32;

let wintun = &self.adapter.wintun;
let ptr = unsafe { wintun.WintunReceivePacket(self.session.0, &mut size as *mut u32) };

debug_assert!(size <= u16::MAX as u32);
if ptr.is_null() {
// Wintun returns ERROR_NO_MORE_ITEMS instead of blocking if packets are not available
return match unsafe { GetLastError() } {
ERROR_NO_MORE_ITEMS => Err(std::io::Error::from(std::io::ErrorKind::WouldBlock)),
e => Err(std::io::Error::from_raw_os_error(e as i32)),
};
}
let size = size as usize;
if size > buf.len() {
use std::io::{Error, ErrorKind::InvalidInput};
return Err(Error::new(InvalidInput, "destination buffer too small"));
}
unsafe { ptr::copy_nonoverlapping(ptr, buf.as_mut_ptr(), size) };
Ok(size)
}

/// Blocks until a packet is available, returning the next packet in the receive queue once this happens.
/// If the session is closed via [`Session::shutdown`] all threads currently blocking inside this function
/// will return Err(())
pub fn recv(&self, buf: &mut [u8]) -> std::io::Result<usize> {
loop {
// Try 5 times to receive without blocking so we don't have to issue a syscall to wait
// for the event if packets are being received at a rapid rate
for _ in 0..5 {
return match self.try_recv(buf) {
Ok(len) => Ok(len),
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
// Try again
continue;
}
Err(e)
}
};
}
self.wait_read()?;
}
}

pub fn send(&self, buf: &[u8]) -> std::io::Result<usize> {
let wintun = &self.adapter.wintun;
let size = buf.len();
let ptr = unsafe { wintun.WintunAllocateSendPacket(self.session.0, size as u32) };
if ptr.is_null() {
util::get_last_error()?;
}
unsafe { ptr::copy_nonoverlapping(buf.as_ptr(), ptr, size) };
unsafe { wintun.WintunSendPacket(self.session.0, ptr) };
Ok(buf.len())
}
}

impl Drop for Session {
fn drop(&mut self) {
if let Err(e) = self.shutdown() {
Expand Down

0 comments on commit babdf47

Please sign in to comment.