diff --git a/examples/client.rs b/examples/client.rs index 2143c19c..2b6a4419 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -27,17 +27,17 @@ async fn main() { match client.progress().await.unwrap() { ClientFlowEvent::CommandSent { handle: got_handle, - tag, + command, } => { - println!("command sent: {got_handle:?}, {tag:?}"); + println!("command sent: {got_handle:?}, {command:?}"); assert_eq!(handle, got_handle); } ClientFlowEvent::CommandRejected { handle: got_handle, - tag, + command, status, } => { - println!("command rejected: {got_handle:?}, {tag:?}, {status:?}"); + println!("command rejected: {got_handle:?}, {command:?}, {status:?}"); assert_eq!(handle, got_handle); } ClientFlowEvent::DataReceived { data } => { diff --git a/examples/server.rs b/examples/server.rs index cae93bab..93d31b6a 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -15,7 +15,7 @@ async fn main() { stream }; - let mut server = ServerFlow::send_greeting( + let (mut server, _) = ServerFlow::send_greeting( AnyStream::new(stream), ServerFlowOptions::default(), Greeting::ok(None, "Hello, World!").unwrap(), @@ -33,8 +33,11 @@ async fn main() { server.enqueue_status(Status::no(Some(command.tag), None, "...").unwrap()), ); } - ServerFlowEvent::ResponseSent { handle: got_handle } => { - println!("response sent: {handle:?}"); + ServerFlowEvent::ResponseSent { + handle: got_handle, + response, + } => { + println!("response sent: {response:?}"); assert_eq!(handle, Some(got_handle)); } } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index da2cd214..afdfd429 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -2,6 +2,7 @@ use std::{net::SocketAddr, sync::Arc}; use colored::Colorize; use imap_codec::imap_types::{ + bounded_static::ToBoundedStatic, core::Text, response::{Code, Status}, }; @@ -204,7 +205,7 @@ impl Proxy { util::filter_capabilities_in_greeting(&mut greeting); - let mut client_to_proxy = { + let (mut client_to_proxy, greeting) = { // TODO: Read options from config let options = ServerFlowOptions { literal_accept_text: Text::try_from(LITERAL_ACCEPT_TEXT).unwrap(), @@ -223,7 +224,7 @@ impl Proxy { } } }; - trace!(role = "p2c", "<--- Forwarded greeting"); + trace!(role = "p2c", ?greeting, "<--- Forwarded greeting"); loop { let control_flow = tokio::select! { @@ -269,9 +270,12 @@ fn handle_client_event( }; match event { - ServerFlowEvent::ResponseSent { handle: _handle } => { + ServerFlowEvent::ResponseSent { + handle: _handle, + response, + } => { // TODO: log handle - trace!(role = "p2c", "<--- Forwarded response"); + trace!(role = "p2c", ?response, "<--- Forwarded response"); } ServerFlowEvent::CommandReceived { command } => { trace!(command=%format!("{:?}", command).red(), role = "c2p", "|--> Received command"); @@ -309,26 +313,31 @@ fn handle_server_event( match event { ClientFlowEvent::CommandSent { handle: _handle, - tag, + command, } => { // TODO: log handle - trace!(role = "p2s", ?tag, "---> Forwarded command"); + trace!(role = "p2s", ?command, "---> Forwarded command"); } ClientFlowEvent::CommandRejected { handle: _handle, - tag, + command, status, } => { // TODO: log handle - trace!(role = "p2s", ?tag, ?status, "---> Aborted command"); + trace!(role = "p2s", ?command, ?status, "---> Aborted command"); let status = match status.code() { Some(Code::Alert) => { // Keep the alert message because it MUST be displayed to the user - Status::bad(Some(tag), Some(Code::Alert), status.text().clone()).unwrap() + Status::bad( + Some(command.tag), + Some(Code::Alert), + status.text().to_static(), + ) + .unwrap() } _ => { // Use generic message because the original code and text might be misleading - Status::bad(Some(tag), None, COMMAND_REJECTED_TEXT).unwrap() + Status::bad(Some(command.tag), None, COMMAND_REJECTED_TEXT).unwrap() } }; let _handle = client_to_proxy.enqueue_status(status); diff --git a/src/client.rs b/src/client.rs index e1a246df..471d2fe5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,8 @@ -use bounded_static::ToBoundedStatic; use bytes::BytesMut; use imap_codec::{ decode::{GreetingDecodeError, ResponseDecodeError}, imap_types::{ command::Command, - core::Tag, response::{Data, Greeting, Response, Status, StatusBody, StatusKind, Tagged}, }, CommandCodec, GreetingCodec, ResponseCodec, @@ -36,7 +34,7 @@ pub struct ClientFlow { stream: AnyStream, next_command_handle: ClientFlowCommandHandle, - send_command_state: SendCommandState<(Tag<'static>, ClientFlowCommandHandle)>, + send_command_state: SendCommandState, receive_response_state: ReceiveState, } @@ -87,12 +85,11 @@ impl ClientFlow { /// The [`Command`] is not sent immediately but during one of the next calls of /// [`ClientFlow::progress`]. All [`Command`]s are sent in the same order they have been /// enqueued. - pub fn enqueue_command(&mut self, command: Command<'_>) -> ClientFlowCommandHandle { + pub fn enqueue_command(&mut self, command: Command<'static>) -> ClientFlowCommandHandle { // TODO(#53) let handle = self.next_command_handle; self.next_command_handle = ClientFlowCommandHandle(handle.0 + 1); - let tag = command.tag.to_static(); - self.send_command_state.enqueue((tag, handle), command); + self.send_command_state.enqueue(handle, command); handle } @@ -110,7 +107,7 @@ impl ClientFlow { async fn progress_command(&mut self) -> Result, ClientFlowError> { match self.send_command_state.progress(&mut self.stream).await? { - Some((tag, handle)) => Ok(Some(ClientFlowEvent::CommandSent { handle, tag })), + Some((handle, command)) => Ok(Some(ClientFlowEvent::CommandSent { handle, command })), None => Ok(None), } } @@ -145,10 +142,10 @@ impl ClientFlow { match response { Response::Status(status) => { - if let Some((tag, handle)) = self.maybe_abort_command(&status) { + if let Some((handle, command)) = self.maybe_abort_command(&status) { break Some(ClientFlowEvent::CommandRejected { handle, - tag, + command, status, }); } else { @@ -169,15 +166,15 @@ impl ClientFlow { fn maybe_abort_command( &mut self, status: &Status, - ) -> Option<(Tag<'static>, ClientFlowCommandHandle)> { - let (command_tag, _) = self.send_command_state.command_in_progress()?; + ) -> Option<(ClientFlowCommandHandle, Command<'static>)> { + let command = self.send_command_state.command_in_progress()?; match status { Status::Tagged(Tagged { tag, body: StatusBody { kind, .. }, .. - }) if *kind == StatusKind::Bad && tag == command_tag => { + }) if *kind == StatusKind::Bad && tag == &command.tag => { self.send_command_state.abort_command() } _ => None, @@ -200,15 +197,15 @@ pub enum ClientFlowEvent { CommandSent { /// The handle of the enqueued [`Command`]. handle: ClientFlowCommandHandle, - /// The [`Tag`] of the enqueued [`Command`]. - tag: Tag<'static>, + /// Formerly enqueued [`Command`] that was now sent. + command: Command<'static>, }, /// The enqueued [`Command`] wasn't sent completely because the server rejected it. CommandRejected { /// The handle of the enqueued [`Command`]. handle: ClientFlowCommandHandle, - /// The [`Tag`] of the enqueued [`Command`]. - tag: Tag<'static>, + /// Formerly enqueued [`Command`] that was now rejected. + command: Command<'static>, /// The [`Status`] sent by the server in order to reject the [`Command`]. /// [`ClientFlow`] has already handled this [`Status`] but it might still have /// useful information that could be logged or displayed to the user diff --git a/src/send.rs b/src/send.rs index 413165a8..ad9f6e4a 100644 --- a/src/send.rs +++ b/src/send.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::{collections::VecDeque, fmt::Debug}; use bytes::BytesMut; use imap_codec::{ @@ -32,19 +32,25 @@ impl SendCommandState { } } - pub fn enqueue(&mut self, key: K, command: Command<'_>) { + pub fn enqueue(&mut self, key: K, command: Command<'static>) { let fragments = self.codec.encode(&command).collect(); - let entry = SendCommandQueueEntry { key, fragments }; + let entry = SendCommandQueueEntry { + key, + command, + fragments, + }; self.send_queue.push_back(entry); } - pub fn command_in_progress(&self) -> Option<&K> { - self.send_progress.as_ref().map(|x| &x.key) + pub fn command_in_progress(&self) -> Option<&Command<'static>> { + self.send_progress.as_ref().map(|x| &x.command) } - pub fn abort_command(&mut self) -> Option { + pub fn abort_command(&mut self) -> Option<(K, Command<'static>)> { self.write_buffer.clear(); - self.send_progress.take().map(|x| x.key) + self.send_progress + .take() + .map(|progress| (progress.key, progress.command)) } pub fn continue_command(&mut self) { @@ -65,7 +71,7 @@ impl SendCommandState { pub async fn progress( &mut self, stream: &mut AnyStream, - ) -> Result, tokio::io::Error> { + ) -> Result)>, tokio::io::Error> { let progress = match self.send_progress.take() { Some(progress) => { // We are currently sending a command to the server. This sending process was @@ -82,6 +88,7 @@ impl SendCommandState { // Start sending the next command SendCommandProgress { key: entry.key, + command: entry.command, next_literal: None, next_fragments: entry.fragments, } @@ -138,7 +145,10 @@ impl SendCommandState { Ok(None) } else { // Command was sent completely - Ok(self.send_progress.take().map(|progress| progress.key)) + Ok(self + .send_progress + .take() + .map(|progress| (progress.key, progress.command))) } } } @@ -146,12 +156,14 @@ impl SendCommandState { #[derive(Debug)] struct SendCommandQueueEntry { key: K, + command: Command<'static>, fragments: VecDeque, } #[derive(Debug)] struct SendCommandProgress { key: K, + command: Command<'static>, // If defined this literal need to be sent before `next_fragments`. next_literal: Option, // The fragments that need to be sent. @@ -167,30 +179,40 @@ struct SendCommandLiteralProgress { } #[derive(Debug)] -pub struct SendResponseState { +pub struct SendResponseState +where + C::Message<'static>: Debug, +{ codec: C, // The responses that should be sent. - send_queue: VecDeque>, - // Key of the response that is currently being sent. - send_in_progress_key: Option, + send_queue: VecDeque>, + // State of the response that is currently being sent. + send_progress: Option>, // Used for writing the current response to the stream. // Should be empty if `send_in_progress_key` is `None`. write_buffer: BytesMut, } -impl SendResponseState { +impl SendResponseState +where + C::Message<'static>: Debug, +{ pub fn new(codec: C, write_buffer: BytesMut) -> Self { Self { codec, send_queue: VecDeque::new(), - send_in_progress_key: None, + send_progress: None, write_buffer, } } - pub fn enqueue(&mut self, key: K, response: C::Message<'_>) { + pub fn enqueue(&mut self, key: K, response: C::Message<'static>) { let fragments = self.codec.encode(&response).collect(); - let entry = SendResponseQueueEntry { key, fragments }; + let entry = SendResponseQueueEntry { + key, + response, + fragments, + }; self.send_queue.push_back(entry); } @@ -202,12 +224,12 @@ impl SendResponseState { pub async fn progress( &mut self, stream: &mut AnyStream, - ) -> Result, tokio::io::Error> { - let send_in_progress_key = match self.send_in_progress_key.take() { - Some(key) => { + ) -> Result)>, tokio::io::Error> { + let progress = match self.send_progress.take() { + Some(progress) => { // We are currently sending a response. This sending process was // previously aborted because the `Future` was dropped while sending. - key + progress } None => { let Some(entry) = self.send_queue.pop_front() else { @@ -226,21 +248,40 @@ impl SendResponseState { self.write_buffer.extend(data); } - entry.key + SendResponseProgress { + key: entry.key, + response: entry.response, + } } }; - self.send_in_progress_key = Some(send_in_progress_key); + self.send_progress = Some(progress); // Send all bytes of current response stream.0.write_all_buf(&mut self.write_buffer).await?; - // response was sent completely - Ok(self.send_in_progress_key.take()) + // Response was sent completely + Ok(self + .send_progress + .take() + .map(|progress| (progress.key, progress.response))) } } #[derive(Debug)] -struct SendResponseQueueEntry { +struct SendResponseQueueEntry +where + C::Message<'static>: Debug, +{ key: K, + response: C::Message<'static>, fragments: Vec, } + +#[derive(Debug)] +struct SendResponseProgress +where + C::Message<'static>: Debug, +{ + key: K, + response: C::Message<'static>, +} diff --git a/src/server.rs b/src/server.rs index fe4b544c..eede8129 100644 --- a/src/server.rs +++ b/src/server.rs @@ -56,14 +56,18 @@ impl ServerFlow { pub async fn send_greeting( mut stream: AnyStream, options: ServerFlowOptions, - greeting: Greeting<'_>, - ) -> Result { + greeting: Greeting<'static>, + ) -> Result<(Self, Greeting<'static>), ServerFlowError> { // Send greeting let write_buffer = BytesMut::new(); let mut send_greeting_state = SendResponseState::new(GreetingCodec::default(), write_buffer); send_greeting_state.enqueue((), greeting); - while let Some(()) = send_greeting_state.progress(&mut stream).await? {} + let greeting = loop { + if let Some(((), greeting)) = send_greeting_state.progress(&mut stream).await? { + break greeting; + } + }; // Successfully sent greeting, construct instance let write_buffer = send_greeting_state.finish(); @@ -81,7 +85,7 @@ impl ServerFlow { literal_reject_text: options.literal_reject_text, }; - Ok(server_flow) + Ok((server_flow, greeting)) } /// Enqueues the [`Data`] response for being sent to the client. @@ -89,7 +93,7 @@ impl ServerFlow { /// The response is not sent immediately but during one of the next calls of /// [`ServerFlow::progress`]. All responses are sent in the same order they have been /// enqueued. - pub fn enqueue_data(&mut self, data: Data<'_>) -> ServerFlowResponseHandle { + pub fn enqueue_data(&mut self, data: Data<'static>) -> ServerFlowResponseHandle { let handle = self.next_response_handle(); self.send_response_state .enqueue(Some(handle), Response::Data(data)); @@ -101,7 +105,7 @@ impl ServerFlow { /// The response is not sent immediately but during one of the next calls of /// [`ServerFlow::progress`]. All responses are sent in the same order they have been /// enqueued. - pub fn enqueue_status(&mut self, status: Status<'_>) -> ServerFlowResponseHandle { + pub fn enqueue_status(&mut self, status: Status<'static>) -> ServerFlowResponseHandle { let handle = self.next_response_handle(); self.send_response_state .enqueue(Some(handle), Response::Status(status)); @@ -128,8 +132,18 @@ impl ServerFlow { async fn progress_response(&mut self) -> Result, ServerFlowError> { match self.send_response_state.progress(&mut self.stream).await? { - Some(Some(handle)) => Ok(Some(ServerFlowEvent::ResponseSent { handle })), - _ => Ok(None), + Some((Some(handle), response)) => { + // A response was sucessfully sent, inform the caller + Ok(Some(ServerFlowEvent::ResponseSent { handle, response })) + } + Some((None, _)) => { + // An internally created response was sent, don't inform the caller + Ok(None) + } + _ => { + // No progress yet + Ok(None) + } } } @@ -202,6 +216,8 @@ pub enum ServerFlowEvent { ResponseSent { /// The handle of the enqueued [`Response`]. handle: ServerFlowResponseHandle, + /// Formerly enqueued [`Response`] that was now sent. + response: Response<'static>, }, CommandReceived { command: Command<'static>, diff --git a/tasks/src/lib.rs b/tasks/src/lib.rs index 7b4f0931..4733b0d1 100644 --- a/tasks/src/lib.rs +++ b/tasks/src/lib.rs @@ -105,10 +105,10 @@ impl Scheduler { let event = self.flow.progress().await?; match event { - ClientFlowEvent::CommandSent { tag, handle } => { + ClientFlowEvent::CommandSent { handle, command } => { // This `unwrap` can't fail because `waiting_tasks` contains all unsent `Commands`. let entry = self.waiting_tasks.remove(&handle).unwrap(); - self.active_tasks.insert(handle, (tag, entry)); + self.active_tasks.insert(handle, (command.tag, entry)); } ClientFlowEvent::CommandRejected { handle, status, .. } => { let body = match status {