From 10b83c17bc094f8d7fb344254f777d004554f4ec Mon Sep 17 00:00:00 2001 From: Clint Byrum Date: Fri, 31 May 2024 17:06:21 +0900 Subject: [PATCH] Make CAN_DO more robust We don't need to do anything in the loop except clone the original packet. Also reviewing more unwraps and converting to expects as we review the code underneath. --- rustygear/src/client.rs | 14 +++++----- rustygear/src/clientdata.rs | 54 ++++++++++++++++++++++++++++++------- 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/rustygear/src/client.rs b/rustygear/src/client.rs index 039fb58..cb4a107 100644 --- a/rustygear/src/client.rs +++ b/rustygear/src/client.rs @@ -461,7 +461,7 @@ impl Client { } reader_conns .lock() - .unwrap() + .expect("Threads should not panic while holding lock") .get_mut(offset) .and_then(|conn| Some(conn.set_active(false))); if let Err(e) = reader_ctx.send(offset).await { @@ -479,7 +479,7 @@ impl Client { error!("Connection ({}) dropped", offset); writer_conns .lock() - .unwrap() + .expect("Threads should not panic while holding lock") .get_mut(offset) .and_then(|conn| { Some(conn.set_active(false)) @@ -715,10 +715,13 @@ impl Client { { let (tx, mut rx) = channel(CLIENT_CHANNEL_BOUND_SIZE); // Some day we'll use this param right { + let mut payload = BytesMut::with_capacity(function.len()); + payload.extend(function.bytes()); + let can_do = new_req(CAN_DO, payload.freeze()); for (i, conn) in self .conns .lock() - .unwrap() + .expect("Threads should not panic while holding lock.") .iter_mut() .filter_map(|c| c.to_owned()) .enumerate() @@ -729,10 +732,7 @@ impl Client { // Same tx for all jobs, the jobs themselves will have a response conn ref self.client_data.set_jobs_tx_by_func(k, tx.clone()); } - let mut payload = BytesMut::with_capacity(function.len()); - payload.extend(function.bytes()); - let can_do = new_req(CAN_DO, payload.freeze()); - conn.send_packet(can_do).await?; + conn.send_packet(can_do.clone()).await?; info!("Sent CAN_DO({}) to {}", function, self.servers[i]); } } diff --git a/rustygear/src/clientdata.rs b/rustygear/src/clientdata.rs index 304f392..80ceb44 100644 --- a/rustygear/src/clientdata.rs +++ b/rustygear/src/clientdata.rs @@ -115,31 +115,59 @@ impl ClientData { pub fn receivers(&self) -> MutexGuard { trace!("Locking receivers"); - self.receivers.lock().unwrap() + self.receivers + .lock() + .expect("Threads should not panic while holding lock.") } pub fn echo_tx(&self) -> Sender { - self.senders.read().unwrap().echo_tx.clone() + self.senders + .read() + .expect("Threads should not panic while holding lock.") + .echo_tx + .clone() } pub fn job_created_tx(&self) -> Sender { - self.senders.read().unwrap().job_created_tx.clone() + self.senders + .read() + .expect("Threads should not panic while holding lock.") + .job_created_tx + .clone() } pub fn error_tx(&self) -> Sender<(Bytes, Bytes)> { - self.senders.read().unwrap().error_tx.clone() + self.senders + .read() + .expect("Threads should not panic while holding lock.") + .error_tx + .clone() } pub fn status_res_tx(&self) -> Sender { - self.senders.read().unwrap().status_res_tx.clone() + self.senders + .read() + .expect("Threads should not panic while holding lock.") + .status_res_tx + .clone() } pub fn worker_job_tx(&self) -> Sender { - self.senders.read().unwrap().worker_job_tx.clone() + self.senders + .read() + .expect("Threads should not panic while holding lock.") + .worker_job_tx + .clone() } pub fn get_sender_by_handle(&self, handle: &ServerHandle) -> Option> { - match self.senders.read().unwrap().senders_by_handle.get(handle) { + match self + .senders + .read() + .expect("Threads should not panic while holding lock.") + .senders_by_handle + .get(handle) + { None => None, Some(sender) => Some(sender.clone()), } @@ -148,13 +176,19 @@ impl ClientData { pub fn set_sender_by_handle(&mut self, handle: ServerHandle, tx: Sender) { self.senders .write() - .unwrap() + .expect("Threads should not panic while holding lock.") .senders_by_handle .insert(handle, tx); } pub fn get_jobs_tx_by_func(&self, func: &Vec) -> Option> { - match self.senders.read().unwrap().jobs_tx_by_func.get(func) { + match self + .senders + .read() + .expect("Threads should not panic while holding lock.") + .jobs_tx_by_func + .get(func) + { None => None, Some(tx) => Some(tx.clone()), } @@ -163,7 +197,7 @@ impl ClientData { pub fn set_jobs_tx_by_func(&mut self, func: Vec, tx: Sender) { self.senders .write() - .unwrap() + .expect("Threads should not panic while holding lock.") .jobs_tx_by_func .insert(func, tx); }