Skip to content

Commit

Permalink
feat: Introduce io/{read,write}/raw trace events in imap-flow
Browse files Browse the repository at this point in the history
  • Loading branch information
duesee committed Jan 14, 2024
1 parent 8dd7397 commit 379f41b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ license = "MIT OR Apache-2.0"
bounded-static = "0.5.0"
bytes = "1.5.0"
imap-codec = { version = "2.0.0", features = ["quirk_crlf_relaxed", "bounded-static"] }
imap-types = { version = "2.0.0" }
thiserror = "1.0.49"
tokio = { version = "1.32.0", features = ["io-util"] }
tracing = "0.1.40"

[dev-dependencies]
rand = "0.8.5"
Expand Down
17 changes: 15 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::{fmt::Debug, num::NonZeroUsize, pin::Pin};

use bytes::BytesMut;
use bytes::{Buf, BytesMut};
use imap_types::utils::escape_byte_string;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tracing::trace;

// TODO: Reconsider this. Do we really need Stream + AnyStream? What is the smallest API that we need to expose?

Expand All @@ -22,8 +24,14 @@ impl AnyStream {
///
/// Returns [`StreamError::Closed`] when no bytes could be read.
pub async fn read(&mut self, read_buffer: &mut BytesMut) -> Result<NonZeroUsize, StreamError> {
let old_len = read_buffer.len();
let byte_count = self.0.read_buf(read_buffer).await?;

trace!(
data = escape_byte_string(&read_buffer[old_len..]),
"io/read/raw"
);

match NonZeroUsize::new(byte_count) {
None => {
// The result is 0 if the stream reached "end of file" or the read buffer was
Expand All @@ -40,7 +48,12 @@ impl AnyStream {
/// Returns [`StreamError::Closed`] when not all bytes could be written.
pub async fn write_all(&mut self, write_buffer: &mut BytesMut) -> Result<(), StreamError> {
while !write_buffer.is_empty() {
let byte_count = self.0.write_buf(write_buffer).await?;
let byte_count = self.0.write(write_buffer).await?;
trace!(
data = escape_byte_string(&write_buffer[..byte_count]),
"io/write/raw"
);
write_buffer.advance(byte_count);

if byte_count == 0 {
// The result is 0 if the stream doesn't accept bytes anymore or the write buffer
Expand Down

0 comments on commit 379f41b

Please sign in to comment.