Skip to content

Commit

Permalink
propagate errors from write_with_retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Nastasia Emelianova authored and Nastasia Emelianova committed Jan 9, 2024
1 parent 32088ef commit 0958a31
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use rmp_serde::Serializer;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::broadcast::{error::RecvError, Receiver},
sync::broadcast::{error::{RecvError, self}, Receiver},
time::Duration,
};

Expand Down Expand Up @@ -116,9 +116,17 @@ where
match self.write_with_retry(&record).await {
Ok(_) => {}
Err(e) => {
error!("GOT ERROR FROM write_with_retry: {:#?}", e.to_string());
// TODO: attempt to reconnect the tcp stream we're writing to
continue;
match e {
Error::MaxRetriesExceeded => {
error!("Reached MaxRetriesExceeded");
break;
},
Error::ConnectionClosed => {
error!("Reached ConnectionClosed");
break;
},
_ => continue,
}
}
};
}
Expand Down

0 comments on commit 0958a31

Please sign in to comment.