
refactor!: Reimplement stream using futures and without split #290

Draft · wants to merge 3 commits into main from jakoschiko_futures-stream-without-split
Conversation

@jakoschiko (Collaborator)

This is an alternative to #274.

It simplifies Stream drastically:

pub struct Stream<S> {
    stream: S,
    read_buffer: ReadBuffer,
    write_buffer: WriteBuffer,
}

impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
    // ...
}

Highlights:

  • Stream is now based on futures-io (or, more specifically, futures-util)
    • This removes the dependencies tokio, tokio-rustls, bytes
  • We no longer need to use a split function for the stream
  • The example and tests still use tokio
    • Compat from tokio-util allows us to stay compatible with futures-io
  • The new code ensures that the read buffer is always empty
  • The user can now choose a runtime and a TLS provider
  • Because the implementation no longer handles TLS itself, we don't need to test TLS here

Closes #275
Closes #212
Closes #169
Closes #245

@jakoschiko jakoschiko requested a review from duesee November 13, 2024 01:46
@soywod (Contributor) left a comment (marked as outdated)

Looks really nice! I would just like to propose some minor improvements (syntax and formatting):

diff --git a/src/stream.rs b/src/stream.rs
index 8f6e8f7..42f6cc5 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -1,13 +1,15 @@
 use std::{
     collections::VecDeque,
     convert::Infallible,
-    future::{poll_fn, Future},
+    future::poll_fn,
     io::IoSlice,
-    pin::pin,
-    task::{Context, Poll},
+    task::{ready, Context, Poll},
 };
 
-use futures_util::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use futures_util::{
+    io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
+    FutureExt,
+};
 #[cfg(debug_assertions)]
 use imap_codec::imap_types::utils::escape_byte_string;
 use thiserror::Error;
@@ -99,13 +101,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
                     self.read_buffer.poll_read(&mut self.stream, cx)?
                 };
 
-                if let Poll::Ready(bytes) = result {
-                    // Provide input bytes to the client/server and try again
-                    state.enqueue_input(bytes);
-                    Poll::Ready(Ok(()))
-                } else {
-                    Poll::Pending
-                }
+                let bytes = ready!(result);
+                // Provide input bytes to the client/server and try again
+                state.enqueue_input(bytes);
+                Poll::Ready(Ok(()))
             })
             .await?;
         };
@@ -148,10 +147,7 @@ impl ReadBuffer {
         cx: &mut Context<'_>,
     ) -> Poll<Result<&[u8], ReadWriteError>> {
         // Constructing this future is cheap
-        let read_future = pin!(stream.read(&mut self.bytes));
-        let Poll::Ready(byte_count) = read_future.poll(cx)? else {
-            return Poll::Pending;
-        };
+        let byte_count = ready!(stream.read(&mut self.bytes).poll_unpin(cx)?);
 
         #[cfg(debug_assertions)]
         trace!(
@@ -200,20 +196,16 @@ impl WriteBuffer {
         stream: &mut S,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), ReadWriteError>> {
-        while !self.bytes.is_empty() {
+        while self.needs_write() {
             let write_slices = &self.write_slices();
 
             // Constructing this future is cheap
-            let write_future = pin!(stream.write_vectored(write_slices));
-            let Poll::Ready(byte_count) = write_future.poll(cx)? else {
-                return Poll::Pending;
-            };
+            let byte_count = ready!(stream.write_vectored(write_slices).poll_unpin(cx)?);
 
             #[cfg(debug_assertions)]
             trace!(
                 data = escape_byte_string(
-                    self
-                        .bytes
+                    self.bytes
                         .iter()
                         .copied()
                         .take(byte_count)

@soywod (Contributor) commented Nov 13, 2024

  1. I like the direction this takes. As discussed with @duesee as well, having a generic stream that does not care about TLS is definitely the way to go. It makes the code more maintainable and keeps dependencies/features at their minimum.
  2. Now that we support futures' traits for the async side, it could be nice to support std's traits for the sync side. See below a proposed patch.
  3. Would it make sense to export this stream logic in a dedicated crate? Having a generic DuplexStream that can both read and write, working in both the sync and async worlds, is definitely appealing to me. It would also be useful for the next point:
  4. TLS: I did some experiments on my own. So far, a simple function prepare_imap_starttls that takes a mutable reference to a generic stream works well. It would be even more reliable with a duplex stream to avoid deadlocks. I'm still working on a nicer version, I will propose something ASAP.
  5. Is MaybeTlsStream still useful at this point?

For this patch, I got inspiration from this blog post. Using const bools internally allows us to offer same-named functions on the same structure but with different signatures. It can be made transparent for users with simple aliases.

diff --git a/src/stream.rs b/src/stream.rs
index 8f6e8f7..a7821ea 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -1,13 +1,15 @@
 use std::{
     collections::VecDeque,
     convert::Infallible,
-    future::{poll_fn, Future},
-    io::IoSlice,
-    pin::pin,
-    task::{Context, Poll},
+    future::poll_fn,
+    io::{IoSlice, Read, Write},
+    task::{ready, Context, Poll},
 };
 
-use futures_util::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use futures_util::{
+    io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
+    FutureExt,
+};
 #[cfg(debug_assertions)]
 use imap_codec::imap_types::utils::escape_byte_string;
 use thiserror::Error;
@@ -16,28 +18,32 @@ use tracing::trace;
 
 use crate::{Interrupt, Io, State};
 
-pub struct Stream<S> {
+pub const ASYNC: bool = true;
+pub const BLOCKING: bool = false;
+
+pub type Stream<S> = DuplexStream<S, ASYNC>;
+pub type BlockingStream<S> = DuplexStream<S, BLOCKING>;
+
+pub struct DuplexStream<S, const ASYNC: bool> {
     stream: S,
-    read_buffer: ReadBuffer,
-    write_buffer: WriteBuffer,
+    read_buffer: ReadBuffer<ASYNC>,
+    write_buffer: WriteBuffer<ASYNC>,
 }
 
-impl<S> Stream<S> {
+impl<S, const ASYNC: bool> DuplexStream<S, ASYNC> {
     pub fn new(stream: S) -> Self {
         Self {
             stream,
-            read_buffer: ReadBuffer::new(),
-            write_buffer: WriteBuffer::new(),
+            read_buffer: ReadBuffer::<ASYNC>::new(),
+            write_buffer: WriteBuffer::<ASYNC>::new(),
         }
     }
-}
 
-impl<S> Stream<S> {
-    #[cfg(feature = "expose_stream")]
     /// Return the underlying stream for debug purposes (or experiments).
     ///
     /// Note: Writing to or reading from the stream may introduce
     /// conflicts with `imap-next`.
+    #[cfg(feature = "expose_stream")]
     pub fn stream_mut(&mut self) -> &mut S {
         &mut self.stream
     }
@@ -99,13 +105,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
                     self.read_buffer.poll_read(&mut self.stream, cx)?
                 };
 
-                if let Poll::Ready(bytes) = result {
-                    // Provide input bytes to the client/server and try again
-                    state.enqueue_input(bytes);
-                    Poll::Ready(Ok(()))
-                } else {
-                    Poll::Pending
-                }
+                let bytes = ready!(result);
+                // Provide input bytes to the client/server and try again
+                state.enqueue_input(bytes);
+                Poll::Ready(Ok(()))
             })
             .await?;
         };
@@ -114,6 +117,42 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
     }
 }
 
+impl<S: Read + Write> BlockingStream<S> {
+    pub fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
+        let event = loop {
+            // Progress the client/server
+            let result = state.next();
+
+            // Return events immediately without doing IO
+            let interrupt = match result {
+                Err(interrupt) => interrupt,
+                Ok(event) => break event,
+            };
+
+            // Return errors immediately without doing IO
+            let io = match interrupt {
+                Interrupt::Io(io) => io,
+                Interrupt::Error(err) => return Err(Error::State(err)),
+            };
+
+            // Handle the output bytes from the client/server
+            if let Io::Output(bytes) = io {
+                self.write_buffer.push_bytes(bytes);
+            }
+
+            // Progress the stream
+            if self.write_buffer.needs_write() {
+                self.write_buffer.write(&mut self.stream)?;
+            };
+
+            let bytes = self.read_buffer.read(&mut self.stream)?;
+            state.enqueue_input(bytes);
+        };
+
+        Ok(event)
+    }
+}
+
 /// Error during reading into or writing from a stream.
 #[derive(Debug, Error)]
 pub enum Error<E> {
@@ -131,27 +170,26 @@ pub enum Error<E> {
     State(E),
 }
 
-struct ReadBuffer {
+struct ReadBuffer<const ASYNC: bool> {
     bytes: Box<[u8]>,
 }
 
-impl ReadBuffer {
+impl<const ASYNC: bool> ReadBuffer<ASYNC> {
     fn new() -> Self {
         Self {
             bytes: vec![0; 1024].into(),
         }
     }
+}
 
+impl ReadBuffer<ASYNC> {
     fn poll_read<S: AsyncRead + Unpin>(
         &mut self,
         stream: &mut S,
         cx: &mut Context<'_>,
     ) -> Poll<Result<&[u8], ReadWriteError>> {
         // Constructing this future is cheap
-        let read_future = pin!(stream.read(&mut self.bytes));
-        let Poll::Ready(byte_count) = read_future.poll(cx)? else {
-            return Poll::Pending;
-        };
+        let byte_count = ready!(stream.read(&mut self.bytes).poll_unpin(cx)?);
 
         #[cfg(debug_assertions)]
         trace!(
@@ -168,14 +206,34 @@ impl ReadBuffer {
     }
 }
 
-struct WriteBuffer {
+impl ReadBuffer<BLOCKING> {
+    fn read<S: Read>(&mut self, stream: &mut S) -> Result<&[u8], ReadWriteError> {
+        // Constructing this future is cheap
+        let byte_count = stream.read(&mut self.bytes)?;
+
+        #[cfg(debug_assertions)]
+        trace!(
+            data = escape_byte_string(&self.bytes[0..byte_count]),
+            "io/read/raw"
+        );
+
+        if byte_count == 0 {
+            // The result is 0 if the stream reached "end of file"
+            return Err(ReadWriteError::Closed);
+        }
+
+        Ok(&self.bytes[0..byte_count])
+    }
+}
+
+struct WriteBuffer<const ASYNC: bool> {
     /// Output bytes that needs to be written.
     ///
     /// Enqueue output bytes to the back, dequeue written bytes from the front.
     bytes: VecDeque<u8>,
 }
 
-impl WriteBuffer {
+impl<const ASYNC: bool> WriteBuffer<ASYNC> {
     fn new() -> Self {
         Self {
             bytes: VecDeque::new(),
@@ -194,26 +252,24 @@ impl WriteBuffer {
         let (init, tail) = self.bytes.as_slices();
         [IoSlice::new(init), IoSlice::new(tail)]
     }
+}
 
+impl WriteBuffer<ASYNC> {
     fn poll_write<S: AsyncWrite + Unpin>(
         &mut self,
         stream: &mut S,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), ReadWriteError>> {
-        while !self.bytes.is_empty() {
+        while self.needs_write() {
             let write_slices = &self.write_slices();
 
             // Constructing this future is cheap
-            let write_future = pin!(stream.write_vectored(write_slices));
-            let Poll::Ready(byte_count) = write_future.poll(cx)? else {
-                return Poll::Pending;
-            };
+            let byte_count = ready!(stream.write_vectored(write_slices).poll_unpin(cx)?);
 
             #[cfg(debug_assertions)]
             trace!(
                 data = escape_byte_string(
-                    self
-                        .bytes
+                    self.bytes
                         .iter()
                         .copied()
                         .take(byte_count)
@@ -237,6 +293,41 @@ impl WriteBuffer {
     }
 }
 
+impl WriteBuffer<BLOCKING> {
+    fn write<S: Write>(&mut self, stream: &mut S) -> Result<(), ReadWriteError> {
+        while self.needs_write() {
+            let write_slices = &self.write_slices();
+
+            // Constructing this future is cheap
+            let byte_count = stream.write_vectored(write_slices)?;
+
+            #[cfg(debug_assertions)]
+            trace!(
+                data = escape_byte_string(
+                    self.bytes
+                        .iter()
+                        .copied()
+                        .take(byte_count)
+                        .collect::<Vec<_>>()
+                ),
+                "io/write/raw"
+            );
+
+            // Drop written bytes
+            drop(self.bytes.drain(..byte_count));
+
+            if byte_count == 0 {
+                // The result is 0 if the stream doesn't accept bytes anymore or the write buffer
+                // was already empty before calling `write_buf`. Because we checked the buffer
+                // we know that the first case occurred.
+                return Err(ReadWriteError::Closed);
+            }
+        }
+
+        Ok(())
+    }
+}
+
 #[derive(Debug, Error)]
 enum ReadWriteError {
     #[error("Stream was closed")]

@jakoschiko (Collaborator, Author)

Your first patch looks good, thanks. I committed it.

Now that we support futures' traits for the async side, it could be nice to support std's traits for the sync side. See below a proposed patch.

I would prefer a separate issue or PR for that. But I'm not a big fan of this topic; see below.

Would it make sense to export this stream logic in a dedicated crate? Having a generic DuplexStream that can both read and write, working in both the sync and async worlds, is definitely appealing to me. It would also be useful for the next point:

Not sure how things get easier with a separate crate. The Stream type is very important; I can't even write good examples or integration tests without it. It's only feature-gated because a user might want to write something better/more complicated themselves.

TLS: I did some experiments on my own. So far, a simple function prepare_imap_starttls that takes a mutable reference to a generic stream works well. It would be even more reliable with a duplex stream to avoid deadlocks. I'm still working on a nicer version, I will propose something ASAP.

No hurry, STARTTLS is a complicated topic. We need to be slow and careful.

Is MaybeTlsStream still useful at this point?

Yes, e.g. imap-proxy wants to put either TcpStream or TlsStream into Stream, so we need something like Stream<TcpStream OR TlsStream>.

For this patch, I got inspiration from this blog post. Using const bools internally allows us to offer same-named functions on the same structure but with different signatures. It can be made transparent for users with simple aliases.

Hm, I don't like it. It looks too complicated. I think the type magic is unnecessary; we could achieve something similar by adding a simple impl block:

impl<S: std::io::Read + std::io::Write> Stream<S> {
    pub fn block_next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
        // ...
    }
}

However, the bigger problem is that the sync implementation can't write and read in parallel. If server and client both used this implementation, each would insist on writing first and they could block each other. The async implementation doesn't have this issue. Maybe we can solve or accept that problem, but I'm not motivated to dig into it.

@jakoschiko force-pushed the jakoschiko_futures-stream-without-split branch from f24f8ae to de9d490 on November 13, 2024 23:49
@soywod (Contributor) commented Nov 14, 2024

Would it make sense to export this stream logic in a dedicated crate? Having a generic DuplexStream that can both read and write, working in both the sync and async worlds, is definitely appealing to me. It would also be useful for the next point:

Not sure how things get easier with a separate crate. The Stream type is very important, I can't even write good examples or integration tests without it. It's only feature-gated because a user might want to write something better/more complicated themself.

I believe it would be really useful, and would have no impact on your actual usage in stream.rs. To illustrate my point, I refactored, simplified and extracted the core logic of the actual stream into a duplex_stream module (EDIT: module moved to its own crate, see comments below):

Similar to the actual Stream:

  • The Duplex Stream enqueues bytes via DuplexStream::push_bytes
  • The progress_read fn reads bytes from the stream and puts them into the internal buffer
  • The progress_write fn writes (and consumes) bytes from the internal buffer into the stream
  • The progress fn reads and writes at the same time

The main differences with Stream:

  • The duplex algorithm has been optimized. The progress fn just polls the write future without caring about the result (except in case of error, which is returned). Whether this future is pending or ready, it polls the read future just after. One call to progress can do either a write then a read, or just a read. It reduces the number of loops.
  • The duplex implements Async{Read,Write}; in this case it acts like a regular stream and simply bypasses the internal buffers.

It should integrate effortlessly with the actual Stream (EDIT: see last comment for example).

Such a duplex stream would be very useful outside of imap-next. I already see usage inside Pimalaya core libs and in the future STARTTLS integration.

TLS: I did some experiments on my own. So far, a simple function prepare_imap_starttls that takes a mutable reference to a generic stream works well. It would be even more reliable with a duplex stream to avoid deadlocks. I'm still working on a nicer version, I will propose something ASAP.

No hurry, STARTTLS is a complicated topic. We need to be slow and careful.

Sure, I wanted to say that I will be ready soon to propose something, so we can discuss it. I find it easier to iterate over something concrete.

Is MaybeTlsStream still useful at this point?

Yes, e.g. imap-proxy wants to put either TcpStream or TlsStream into Stream, so we need something like Stream<TcpStream OR TlsStream>.

The futures crate exposes an Either<Left, Right> type; wouldn't it be enough to just use Stream<Either<TcpStream, TlsStream<TcpStream>>>? I'm not sure anymore if maintaining an enum with providers is worth it.

I will be able to experiment inside Pimalaya libs anyway; it should give me great feedback about the usability of such an idea.

For this patch, I got inspiration from this blog post. Using const bools internally allows us to offer same-named functions on the same structure but with different signatures. It can be made transparent for users with simple aliases.

Hm, I don't like it. It looks too complicated. I think the type magic is unnecessary, we could achieve something similar by adding a simple impl block:

impl<S: std::io::Read + std::io::Write> Stream<S> {
    pub fn block_next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
        // ...
    }
}

This forces you to have multiple functions with different names, which I don't like either. The user experience is way better and far more intuitive when you have the same function names (even with different implementations: params, async, etc.).

impl<S: std::io::Read + std::io::Write> Stream<S, false> {
    pub fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
        // ...
    }
}

impl<S: futures::AsyncRead + futures::AsyncWrite> Stream<S, true> {
    pub async fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
        // ...
    }
}
  • As long as this const bool logic is hidden from users, I don't find it complicated or magical.
  • It prevents you from duplicating either function names like block_* (which I really hate) or entire structs like Block* (which I hate even more, since you duplicate more code).
let mut tcp_stream = std::net::TcpStream::connect();
let mut stream = Stream::from(tcp_stream);
stream.next()

let mut tcp_stream = async_std::net::TcpStream::connect();
let mut stream = Stream::from(tcp_stream);
stream.next().await

// versus
stream.block_next()
stream.next().await
BlockStream::from(tcp_stream)

However, the bigger problem is that the sync implementation can't write and read in parallel. If server and client both used this implementation, each would insist on writing first and they could block each other. The async implementation doesn't have this issue. Maybe we can solve or accept that problem, but I'm not motivated to dig into it.

I have some ideas that I want to experiment with. It's another good argument for exporting the duplex stream: this logic could go there and not impact the actual stream. I am ready to do the export and even to maintain it!

@soywod (Contributor) commented Nov 14, 2024

I just found something interesting for std::io::{Read,Write} / futures::io::Async{Read,Write} compatibility:

https://docs.rs/futures-util/latest/futures_util/io/struct.AllowStdIo.html

@soywod (Contributor) commented Nov 14, 2024

Here is what I had in mind: https://github.com/pimalaya/core/tree/master/duplex-stream (see examples)

EDIT: I renamed the project buf-stream as it fits better. I also removed native implementations of (Async)Read and (Async)Write, so that the purpose of BufStream is more explicit:

https://github.com/pimalaya/core/tree/master/buf-stream

@soywod (Contributor) commented Nov 14, 2024

This is how it would integrate with imap-next: soywod@1343b3b

@jakoschiko jakoschiko marked this pull request as draft November 16, 2024 23:32
@soywod (Contributor) commented Nov 17, 2024

As I mentioned here: #169 (comment), I ran into several issues with the previous implementation.

The current proposal is the following:

  • Two structs are available: std::BufStream (Read + Write) and futures::BufStream (AsyncRead + AsyncWrite + Unpin).
  • All the buffer logic that was implemented in your stream moved there, so it can be reused in any other project.
  • The integration with imap-next remains simple: pushing bytes to the buffered stream and feeding the state with its output should be enough. EDIT: integration would look like this.

@jakoschiko (Collaborator, Author)

I believe it would be really useful, and would have no impact on your actual usage in stream.rs. To illustrate my point, I refactored, simplified and extracted the core logic of the actual stream into a duplex_stream module.

I misunderstood your proposal. I thought you wanted to move the entire stream module into its own crate. But I think you are right: something like a DuplexStream would be very useful. If you want to handle send and receive in a single task, then DuplexStream would make things very easy.

However, this is only an implementation detail. I would focus on getting this PR merged. I'll refactor the current implementation and introduce a simple DuplexStream in a private module. Later we can discuss whether and how we replace the private module with a crate.

About naming: I like DuplexStream. But tokio::io already contains both DuplexStream and BufStream and they do different things. I will proceed with the name ProgressStream. We can discuss the name later.

The futures crate exposes a Either<Left, Right> type, would not it be enough to just use Stream<Either<TcpStream, TlsStream>>? I'm not sure anymore if maintaining an enum with providers is worth it.

You are absolutely right. If this PR succeeds and Either works, then I'll delete the tokio-maybe-tls repository if you don't mind. Thanks for the hint.

About sync/async. I think the most common approach is to use different traits like std::io::Read and futures_io::AsyncRead. However, I'm still not convinced that supporting sync is a good idea.

For now I need to tackle some other issues first. The CI is broken, and while refactoring this PR I noticed a fundamental flaw in Stream and the integration tests.

@jakoschiko force-pushed the jakoschiko_futures-stream-without-split branch from de9d490 to 49f6d84 on November 17, 2024 17:54
@soywod (Contributor) commented Nov 17, 2024

However, this is only an implementation detail. I would focus on getting this PR merged. I'll refactor the current implementation and introduce a simple DuplexStream in a private module. Later we can discuss whether and how we replace the private module with a crate.

Sounds great, do not hesitate to take stuff from my attempt in Pimalaya core.

About naming: I like DuplexStream. But tokio::io already contains both DuplexStream and BufStream and they do different things. I will proceed with the name ProgressStream. We can discuss the name later.

I initially liked DuplexStream, but it implies that you can read and write at the same time, which is not technically the case. Tokio's duplex stream is even more different: it acts like a channel (it gives you a pair of connected endpoints). Tokio's BufStream seems to be closer to what we have in the current stream implementation (or in my BufStream attempt). Their implementation is dead simple: BufStream<S> = BufReader<BufWriter<S>>. I will experiment around it tomorrow morning.

So far, BufStream fits the current usage better.

If this PR succeeds and Either works, then I'll delete the tokio-maybe-tls repository if you don't mind.

Sure!

About sync/async. I think the most common approach is to use different traits like std::io::Read and futures_io::AsyncRead. However, I'm still not convinced that supporting sync is a good idea.

My concern is that the lib would not be usable without a runtime. There will always be someone who cannot use async and would like to use the lib. I don't think it would be such a burden to maintain a blocking module, especially if we choose the right abstractions and properly share common code.

Supporting blocking with std and async with futures should be the bare minimum. Other runtimes are optional and could be supported on the user side with compatibility layers (like tokio-util::compat).

For now I need to tackle some other issues first. The CI is broken, and while refactoring this PR I noticed a fundamental flaw in Stream and the integration tests.

Sure, let me know if and how I could help.

@soywod (Contributor) commented Nov 18, 2024

If I understood correctly, it is technically not possible to read from and write into a stream at the same time:

  • Splitting into halves and then select!(read, write) is not a good option because splitting involves a mutex. It can definitely lead to unexpected states. For example, if there is something to write but the read half locks first, then the write half can neither lock nor write bytes.
  • After heavily testing the polled write-then-read approach (the actual proposal of this PR), it also leads to unexpected results: sometimes bytes are sent twice (write polled twice), sometimes it deadlocks. It is definitely not consistent.

I also dived into the buffered reader/writer topic:

BufWriter or BufReader can improve the speed of programs that make small and repeated write calls to the same file or network socket. It does not help when writing very large amounts at once, or writing just one or a few times.

If I'm not mistaken, imap-next does not fit this description. Why don't we just read from and write into the stream directly? It is stated in the code:

We read and write the stream simultaneously because otherwise a deadlock between client and server might occur if both sides would only read or only write. We achieve this by polling both operations.

As I said earlier, there is no efficient way to both read and write at the same time. I would rather optimize the usage of read/write inside Stream::next():

match io {
    Io::Output(bytes) => {
        self.stream.write(&bytes).await?;

        // We should always receive bytes, since we just wrote something.
        // Deadlock cannot happen here.
        let mut bytes = vec![0; 1024];
        let n = self.stream.read(&mut bytes).await?;
        state.enqueue_input(&bytes[..n]);
    }
    Io::NeedMoreInput => {
        // Is there a case where the state needs more input,
        // but nothing more is available on the stream?
        let mut bytes = vec![0; 1024];
        let n = self.stream.read(&mut bytes).await?;
        state.enqueue_input(&bytes[..n]);
    }
}

I will experiment with this idea in high-level apps to see how it behaves.

@soywod (Contributor) commented Nov 18, 2024

Very good news: this simple implementation seems to work as expected:

impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
    pub async fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
        let event = loop {
            // Progress the client/server
            let result = state.next();

            // Return events immediately without doing IO
            let interrupt = match result {
                Err(interrupt) => interrupt,
                Ok(event) => break event,
            };

            // Return errors immediately without doing IO
            let io = match interrupt {
                Interrupt::Io(io) => io,
                Interrupt::Error(err) => return Err(Error::State(err)),
            };

            // Handle the output bytes from the client/server
            if let Io::Output(ref bytes) = io {
                self.stream.write(bytes).await?;
            }

            // After a write, or if more input is need
            let n = self.stream.read(&mut self.buf).await?;
            state.enqueue_input(&self.buf[..n]);
        };

        Ok(event)
    }
}

I tried this implementation with a simple client and with a synchronizer (using a client pool) over 5k+ mails; I did not encounter a single deadlock.

My conclusion is that we need neither buffering nor "simultaneous" reads and writes: direct read and write calls are enough.

@soywod (Contributor) commented Nov 18, 2024

Side note regarding TLS: I also tried Either inside imap-client and it fits well (at least with a tokio-based TCP stream and a rustls-based TLS stream):

pub struct MaybeTlsStream(Either<Compat<TcpStream>, Compat<TlsStream<TcpStream>>>);

impl MaybeTlsStream {
    pub fn tcp(stream: TcpStream) -> Self {
        Self(Either::Left(stream.compat()))
    }

    pub fn tls(stream: TlsStream<TcpStream>) -> Self {
        Self(Either::Right(stream.compat()))
    }
}

impl AsyncRead for MaybeTlsStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        match &mut self.get_mut().0 {
            Either::Left(s) => Pin::new(s).poll_read(cx, buf),
            Either::Right(s) => Pin::new(s).poll_read(cx, buf),
        }
    }
}

impl AsyncWrite for MaybeTlsStream {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        match &mut self.get_mut().0 {
            Either::Left(s) => Pin::new(s).poll_write(cx, buf),
            Either::Right(s) => Pin::new(s).poll_write(cx, buf),
        }
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        match &mut self.get_mut().0 {
            Either::Left(s) => Pin::new(s).poll_flush(cx),
            Either::Right(s) => Pin::new(s).poll_flush(cx),
        }
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        match &mut self.get_mut().0 {
            Either::Left(s) => Pin::new(s).poll_close(cx),
            Either::Right(s) => Pin::new(s).poll_close(cx),
        }
    }
}

The client can just hold a Stream<MaybeTlsStream>:

pub struct Client {
    host: String,
    stream: Stream<MaybeTlsStream>,
    resolver: Resolver,
    capabilities: Vec1<Capability<'static>>,
    idle_timeout: Duration,
}
