Skip to content

Commit

Permalink
fix(worker): register tun queue with mio
Browse files Browse the repository at this point in the history
  • Loading branch information
max-niederman committed Apr 4, 2024
1 parent c5261ad commit be1494c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
4 changes: 2 additions & 2 deletions packages/centipede/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn main() -> Result<()> {
.queue_nonblocking(0)
.into_diagnostic()
.wrap_err("failed to get TUN queue 0 (for the special first worker)")?,
);
)?;

s.spawn(move || {
let mut events = mio::Events::with_capacity(1024);
Expand Down Expand Up @@ -158,7 +158,7 @@ fn main() -> Result<()> {
.queue_nonblocking(0)
.into_diagnostic()
.wrap_err_with(|| format!("failed to get TUN queue {}", i))?,
);
)?;
s.spawn(move || {
let mut events = mio::Events::with_capacity(1024);
loop {
Expand Down
26 changes: 22 additions & 4 deletions packages/centipede_worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,22 @@ impl<'r> Worker<'r> {
dyn FnMut(SocketAddr, ControlMessage<Vec<u8>, auth::Unknown>) + Send + 'r,
>,
tun_queue: hypertube::Queue<'r, false>,
) -> Self {
Self {
) -> Result<Self, Error> {
let poll = mio::Poll::new()?;

poll.registry().register(
&mut SourceFd(&tun_queue.as_raw_fd()),
TUN_TOKEN,
mio::Interest::READABLE,
)?;

Ok(Self {
router_handle,
control_message_sink,
tun_queue,
sockets: Sockets::new(),
poll: mio::Poll::new().unwrap(),
}
poll,
})
}

/// Send a control message using the worker's set of sockets.
Expand Down Expand Up @@ -112,16 +120,22 @@ impl<'r> Worker<'r> {
.unwrap();
}

// FIXME: deregister closed sockets

Ok(())
}

/// Handle a readable event on the TUN device.
fn handle_tun_readable(&mut self) -> Result<(), Error> {
log::trace!("tun readable");

// TODO: optimize this
let mut read_buf = [0; PACKET_BUFFER_SIZE];
let mut write_buf = Vec::new();

while let Poll::Ready(n) = self.tun_queue.read(&mut read_buf).map_err(Error::ReadTun)? {
log::trace!("tun read {} byte packet", n);

let buf = &mut read_buf[..n];

let mut obligations = self.router_handle.handle_outgoing(buf);
Expand All @@ -147,6 +161,8 @@ impl<'r> Worker<'r> {

/// Handle a readable event on a socket.
fn handle_socket_readable(&mut self, idx: usize) -> Result<(), Error> {
log::trace!("socket {} readable", idx);

let socket = self
.sockets
.resolve_index(idx)
Expand All @@ -157,6 +173,8 @@ impl<'r> Worker<'r> {
loop {
match socket.recv_from(&mut buf) {
Ok((n, from)) => {
log::trace!("socket {} read {} byte packet", idx, n);

// SAFETY: we just read `n` bytes into the buffer.
let msg = unsafe { MaybeUninit::slice_assume_init_mut(&mut buf[..n]) };

Expand Down

0 comments on commit be1494c

Please sign in to comment.