Skip to content

Commit

Permalink
make stream optional
Browse files Browse the repository at this point in the history
  • Loading branch information
Nastasia Emelianova authored and Nastasia Emelianova committed Dec 26, 2023
1 parent b66a8da commit 32088ef
Showing 1 changed file with 47 additions and 37 deletions.
84 changes: 47 additions & 37 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::{Buf, BufMut};
use log::{warn, error};
use log::{error, warn};
use rmp_serde::Serializer;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tokio::{
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct RetryConfig {
}

pub struct Worker<StreamType> {
stream: StreamType,
stream: Option<StreamType>,
receiver: Receiver<Message>,
retry_config: RetryConfig,
}
Expand All @@ -95,7 +95,7 @@ where
{
pub fn new(stream: StreamType, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
Self {
stream,
stream: Some(stream),
receiver,
retry_config,
}
Expand All @@ -118,7 +118,8 @@ where
Err(e) => {
error!("GOT ERROR FROM write_with_retry: {:#?}", e.to_string());
// TODO: attempt to reconnect the tcp stream we're writing to
continue},
continue;
}
};
}
Err(RecvError::Closed) | Ok(Message::Terminate) => {
Expand All @@ -145,8 +146,7 @@ where

match self.write(record).await {
Ok(_) => return Ok(()),
Err(Error::ConnectionClosed) => {
return Err(Error::ConnectionClosed)},
Err(Error::ConnectionClosed) => return Err(Error::ConnectionClosed),
Err(e) => {
warn!("Received error when writing: {:?}", e.to_string());
}
Expand All @@ -164,33 +164,38 @@ where
}

async fn write(&mut self, record: &SerializedRecord) -> Result<(), Error> {
match self.stream.write_all(record.record.chunk()).await {
Ok(_) => {
let received_ack = self.read_ack().await?;

if received_ack.ack != record.chunk {
warn!(
"ack and chunk did not match. ack: {}, chunk: {}",
received_ack.ack,
record.chunk
);
Err(Error::AckUnmatched(received_ack.ack, record.chunk.clone()))
} else {
Ok(())
if let Some(st) = self.stream.as_mut() {
match st.write_all(record.record.chunk()).await {
Ok(_) => {
let received_ack = self.read_ack().await?;

if received_ack.ack != record.chunk {
warn!(
"ack and chunk did not match. ack: {}, chunk: {}",
received_ack.ack, record.chunk
);
Err(Error::AckUnmatched(received_ack.ack, record.chunk.clone()))
} else {
Ok(())
}
}
// lost connection could look like multpile kinds of errors,
// so we're attempting to catch all of them here
Err(e)
if e.kind() == std::io::ErrorKind::ConnectionReset
|| e.kind() == std::io::ErrorKind::ConnectionAborted
|| e.kind() == tokio::io::ErrorKind::BrokenPipe =>
{
Err(Error::ConnectionClosed)
}
Err(e) => Err(Error::WriteFailed(e.to_string())),
}
// lost connection could look like multpile kinds of errors,
// so we're attempting to catch all of them here
Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset
|| e.kind() == std::io::ErrorKind::ConnectionAborted
|| e.kind() == tokio::io::ErrorKind::BrokenPipe => {
Err(Error::ConnectionClosed)
}
Err(e) => {
Err(Error::WriteFailed(e.to_string()))},
} else {
Err(Error::WriteFailed(
"Connection stream is none, can't write anything".to_string(),
))
}
}


async fn read_ack(&mut self) -> Result<AckResponse, Error> {
let mut buf = bytes::BytesMut::with_capacity(64);
Expand All @@ -199,14 +204,19 @@ where
return Ok(ack);
}

if self
.stream
.read_buf(&mut buf)
.await
.map_err(|e| Error::ReadFailed(e.to_string()))?
== 0
{
return Err(Error::ConnectionClosed);
if let Some(st) = self.stream.as_mut() {
if st
.read_buf(&mut buf)
.await
.map_err(|e| Error::ReadFailed(e.to_string()))?
== 0
{
return Err(Error::ConnectionClosed);
}
} else {
return Err(Error::WriteFailed(
"Connection stream is none, can't acknowledge anything".to_string(),
));
}
}
}
Expand Down

0 comments on commit 32088ef

Please sign in to comment.