Skip to content

Commit

Permalink
remove optional stream from this branch
Browse files Browse the repository at this point in the history
  • Loading branch information
Nastasia Emelianova authored and Nastasia Emelianova committed Jan 9, 2024
1 parent 0958a31 commit e31bf29
Showing 1 changed file with 46 additions and 57 deletions.
103 changes: 46 additions & 57 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use rmp_serde::Serializer;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::broadcast::{error::{RecvError, self}, Receiver},
sync::broadcast::{
error::{self, RecvError},
Receiver,
},
time::Duration,
};

Expand Down Expand Up @@ -84,7 +87,7 @@ pub struct RetryConfig {
}

pub struct Worker<StreamType> {
stream: Option<StreamType>,
stream: StreamType,
receiver: Receiver<Message>,
retry_config: RetryConfig,
}
Expand All @@ -95,7 +98,7 @@ where
{
pub fn new(stream: StreamType, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
Self {
stream: Some(stream),
stream,
receiver,
retry_config,
}
Expand All @@ -115,19 +118,17 @@ where

match self.write_with_retry(&record).await {
Ok(_) => {}
Err(e) => {
match e {
Error::MaxRetriesExceeded => {
error!("Reached MaxRetriesExceeded");
break;
},
Error::ConnectionClosed => {
error!("Reached ConnectionClosed");
break;
},
_ => continue,
Err(e) => match e {
Error::MaxRetriesExceeded => {
error!("Reached MaxRetriesExceeded");
break;
}
}
Error::ConnectionClosed => {
error!("Reached ConnectionClosed");
break;
}
_ => continue,
},
};
}
Err(RecvError::Closed) | Ok(Message::Terminate) => {
Expand Down Expand Up @@ -172,36 +173,30 @@ where
}

async fn write(&mut self, record: &SerializedRecord) -> Result<(), Error> {
if let Some(st) = self.stream.as_mut() {
match st.write_all(record.record.chunk()).await {
Ok(_) => {
let received_ack = self.read_ack().await?;

if received_ack.ack != record.chunk {
warn!(
"ack and chunk did not match. ack: {}, chunk: {}",
received_ack.ack, record.chunk
);
Err(Error::AckUnmatched(received_ack.ack, record.chunk.clone()))
} else {
Ok(())
}
}
// lost connection could look like multpile kinds of errors,
// so we're attempting to catch all of them here
Err(e)
if e.kind() == std::io::ErrorKind::ConnectionReset
|| e.kind() == std::io::ErrorKind::ConnectionAborted
|| e.kind() == tokio::io::ErrorKind::BrokenPipe =>
{
Err(Error::ConnectionClosed)
match self.stream.write_all(record.record.chunk()).await {
Ok(_) => {
let received_ack = self.read_ack().await?;

if received_ack.ack != record.chunk {
warn!(
"ack and chunk did not match. ack: {}, chunk: {}",
received_ack.ack, record.chunk
);
Err(Error::AckUnmatched(received_ack.ack, record.chunk.clone()))
} else {
Ok(())
}
Err(e) => Err(Error::WriteFailed(e.to_string())),
}
} else {
Err(Error::WriteFailed(
"Connection stream is none, can't write anything".to_string(),
))
// lost connection could look like multpile kinds of errors,
// so we're attempting to catch all of them here
Err(e)
if e.kind() == std::io::ErrorKind::ConnectionReset
|| e.kind() == std::io::ErrorKind::ConnectionAborted
|| e.kind() == tokio::io::ErrorKind::BrokenPipe =>
{
Err(Error::ConnectionClosed)
}
Err(e) => Err(Error::WriteFailed(e.to_string())),
}
}

Expand All @@ -211,20 +206,14 @@ where
if let Ok(ack) = rmp_serde::from_slice::<AckResponse>(&buf) {
return Ok(ack);
}

if let Some(st) = self.stream.as_mut() {
if st
.read_buf(&mut buf)
.await
.map_err(|e| Error::ReadFailed(e.to_string()))?
== 0
{
return Err(Error::ConnectionClosed);
}
} else {
return Err(Error::WriteFailed(
"Connection stream is none, can't acknowledge anything".to_string(),
));
if self
.stream
.read_buf(&mut buf)
.await
.map_err(|e| Error::ReadFailed(e.to_string()))?
== 0
{
return Err(Error::ConnectionClosed);
}
}
}
Expand Down

0 comments on commit e31bf29

Please sign in to comment.