Skip to content

Commit

Permalink
refactor: extract worker loop into function shared between first and …
Browse files Browse the repository at this point in the history
…extra workers
  • Loading branch information
max-niederman committed Apr 22, 2024
1 parent 4601117 commit 99b04ce
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 68 deletions.
4 changes: 0 additions & 4 deletions packages/centipede/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ pub struct Centipede {
/// Addresses on which the daemon should listen for incoming packets.
pub recv_addrs: Vec<SocketAddr>,

/// of workers to spawn.
#[serde(default = "num_cpus::get")]
pub workers: usize,

/// List of peers.
pub peers: Vec<Peer>,
}
Expand Down
117 changes: 53 additions & 64 deletions packages/centipede/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ fn main() -> Result<()> {
thread::scope(|s| {
{
let shutdown = shutdown.clone();
let mut worker = Worker::new(
let worker = Worker::new(
router.worker(),
control_message_sink.clone(),
tun_dev
Expand All @@ -111,53 +111,12 @@ fn main() -> Result<()> {
)
.wrap_err("failed to create worker 0")?;

s.spawn(move || {
let mut events = mio::Events::with_capacity(1024);
loop {
if shutdown.load(Ordering::Relaxed) {
break;
}

if let Ok(outgoing) = rx_outgoing_control.try_recv() {
let res = worker.send_control_message::<Vec<u8>>(
outgoing.from,
outgoing.to,
outgoing.message,
);

if let Err(e) = res {
println!(
"{:?}",
Report::new(InWorkerThread {
inner: e,
thread_number: 0
})
);

log::info!("shutting down due to error");
shutdown.store(true, Ordering::Relaxed);
}
}

if let Err(e) = worker.wait_and_handle(&mut events) {
println!(
"{:?}",
Report::new(InWorkerThread {
inner: e,
thread_number: 0
})
);

log::info!("shutting down due to error");
shutdown.store(true, Ordering::Relaxed);
}
}
});
s.spawn(move || worker_loop(worker, 0, shutdown, Some(rx_outgoing_control)));
}

for i in 1..opt.workers {
let shutdown = shutdown.clone();
let mut worker = Worker::new(
let worker = Worker::new(
router.worker(),
control_message_sink.clone(),
tun_dev
Expand All @@ -166,26 +125,8 @@ fn main() -> Result<()> {
.wrap_err_with(|| format!("failed to get TUN queue {}", i))?,
)
.wrap_err_with(|| format!("failed to create worker {}", i))?;
s.spawn(move || {
let mut events = mio::Events::with_capacity(1024);
loop {
if shutdown.load(Ordering::Relaxed) {
break;
}
if let Err(e) = worker.wait_and_handle(&mut events) {
println!(
"{:?}",
Report::new(InWorkerThread {
inner: e,
thread_number: i
})
);

log::info!("shutting down due to error");
shutdown.store(true, Ordering::Relaxed);
}
}
});

s.spawn(move || worker_loop(worker, i, shutdown, None));
}

let router_configurator = router.configurator();
Expand Down Expand Up @@ -230,6 +171,54 @@ fn main() -> Result<()> {
})
}

fn worker_loop(
mut worker: Worker,
thread_number: usize,
shutdown: Arc<AtomicBool>,
rx_outgoing_control: Option<mpsc::Receiver<centipede_control::OutgoingMessage>>,
) {
let mut events = mio::Events::with_capacity(1024);
loop {
if shutdown.load(Ordering::Relaxed) {
break;
}

if let Some(Ok(outgoing)) = rx_outgoing_control.as_ref().map(mpsc::Receiver::try_recv) {
let res = worker.send_control_message::<Vec<u8>>(
outgoing.from,
outgoing.to,
outgoing.message,
);

if let Err(e) = res {
println!(
"{:?}",
Report::new(InWorkerThread {
inner: e,
thread_number
})
);

log::info!("shutting down due to error");
shutdown.store(true, Ordering::Relaxed);
}
}

if let Err(e) = worker.wait_and_handle(&mut events) {
println!(
"{:?}",
Report::new(InWorkerThread {
inner: e,
thread_number
})
);

log::info!("shutting down due to error");
shutdown.store(true, Ordering::Relaxed);
}
}
}

#[derive(Debug, Error, Diagnostic)]
#[error("worker thread {thread_number} failed")]
struct InWorkerThread<E: Error + 'static> {
Expand Down

0 comments on commit 99b04ce

Please sign in to comment.