Skip to content

Commit

Permalink
Make CAN_DO more robust
Browse files Browse the repository at this point in the history
We don't need to do anything in the loop except clone the original
packet.

Also reviewing more unwraps and converting to expects as we review the
code underneath.
  • Loading branch information
SpamapS committed May 31, 2024
1 parent b57f3ca commit 10b83c1
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
14 changes: 7 additions & 7 deletions rustygear/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ impl Client {
}
reader_conns
.lock()
.unwrap()
.expect("Threads should not panic while holding lock")
.get_mut(offset)
.and_then(|conn| Some(conn.set_active(false)));
if let Err(e) = reader_ctx.send(offset).await {
Expand All @@ -479,7 +479,7 @@ impl Client {
error!("Connection ({}) dropped", offset);
writer_conns
.lock()
.unwrap()
.expect("Threads should not panic while holding lock")
.get_mut(offset)
.and_then(|conn| {
Some(conn.set_active(false))
Expand Down Expand Up @@ -715,10 +715,13 @@ impl Client {
{
let (tx, mut rx) = channel(CLIENT_CHANNEL_BOUND_SIZE); // Some day we'll use this param right
{
let mut payload = BytesMut::with_capacity(function.len());
payload.extend(function.bytes());
let can_do = new_req(CAN_DO, payload.freeze());
for (i, conn) in self
.conns
.lock()
.unwrap()
.expect("Threads should not panic while holding lock.")
.iter_mut()
.filter_map(|c| c.to_owned())
.enumerate()
Expand All @@ -729,10 +732,7 @@ impl Client {
// Same tx for all jobs, the jobs themselves will have a response conn ref
self.client_data.set_jobs_tx_by_func(k, tx.clone());
}
let mut payload = BytesMut::with_capacity(function.len());
payload.extend(function.bytes());
let can_do = new_req(CAN_DO, payload.freeze());
conn.send_packet(can_do).await?;
conn.send_packet(can_do.clone()).await?;
info!("Sent CAN_DO({}) to {}", function, self.servers[i]);
}
}
Expand Down
54 changes: 44 additions & 10 deletions rustygear/src/clientdata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,31 +115,59 @@ impl ClientData {

pub fn receivers(&self) -> MutexGuard<ClientReceivers> {
trace!("Locking receivers");
self.receivers.lock().unwrap()
self.receivers
.lock()
.expect("Threads should not panic while holding lock.")
}

pub fn echo_tx(&self) -> Sender<Bytes> {
self.senders.read().unwrap().echo_tx.clone()
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.echo_tx
.clone()
}

pub fn job_created_tx(&self) -> Sender<JobCreated> {
self.senders.read().unwrap().job_created_tx.clone()
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.job_created_tx
.clone()
}

pub fn error_tx(&self) -> Sender<(Bytes, Bytes)> {
self.senders.read().unwrap().error_tx.clone()
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.error_tx
.clone()
}

pub fn status_res_tx(&self) -> Sender<JobStatus> {
self.senders.read().unwrap().status_res_tx.clone()
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.status_res_tx
.clone()
}

pub fn worker_job_tx(&self) -> Sender<WorkerJob> {
self.senders.read().unwrap().worker_job_tx.clone()
self.senders
.read()
.expect("Threads should not panic while holding lock.")
.worker_job_tx
.clone()
}

pub fn get_sender_by_handle(&self, handle: &ServerHandle) -> Option<Sender<WorkUpdate>> {
match self.senders.read().unwrap().senders_by_handle.get(handle) {
match self
.senders
.read()
.expect("Threads should not panic while holding lock.")
.senders_by_handle
.get(handle)
{
None => None,
Some(sender) => Some(sender.clone()),
}
Expand All @@ -148,13 +176,19 @@ impl ClientData {
pub fn set_sender_by_handle(&mut self, handle: ServerHandle, tx: Sender<WorkUpdate>) {
self.senders
.write()
.unwrap()
.expect("Threads should not panic while holding lock.")
.senders_by_handle
.insert(handle, tx);
}

pub fn get_jobs_tx_by_func(&self, func: &Vec<u8>) -> Option<Sender<WorkerJob>> {
match self.senders.read().unwrap().jobs_tx_by_func.get(func) {
match self
.senders
.read()
.expect("Threads should not panic while holding lock.")
.jobs_tx_by_func
.get(func)
{
None => None,
Some(tx) => Some(tx.clone()),
}
Expand All @@ -163,7 +197,7 @@ impl ClientData {
pub fn set_jobs_tx_by_func(&mut self, func: Vec<u8>, tx: Sender<WorkerJob>) {
self.senders
.write()
.unwrap()
.expect("Threads should not panic while holding lock.")
.jobs_tx_by_func
.insert(func, tx);
}
Expand Down

0 comments on commit 10b83c1

Please sign in to comment.