Skip to content

Commit

Permalink
feat: Give back enqueued value
Browse files Browse the repository at this point in the history
  • Loading branch information
jakoschiko authored Nov 15, 2023
1 parent 44f4259 commit b124c10
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 69 deletions.
8 changes: 4 additions & 4 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ async fn main() {
match client.progress().await.unwrap() {
ClientFlowEvent::CommandSent {
handle: got_handle,
tag,
command,
} => {
println!("command sent: {got_handle:?}, {tag:?}");
println!("command sent: {got_handle:?}, {command:?}");
assert_eq!(handle, got_handle);
}
ClientFlowEvent::CommandRejected {
handle: got_handle,
tag,
command,
status,
} => {
println!("command rejected: {got_handle:?}, {tag:?}, {status:?}");
println!("command rejected: {got_handle:?}, {command:?}, {status:?}");
assert_eq!(handle, got_handle);
}
ClientFlowEvent::DataReceived { data } => {
Expand Down
9 changes: 6 additions & 3 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() {
stream
};

let mut server = ServerFlow::send_greeting(
let (mut server, _) = ServerFlow::send_greeting(
AnyStream::new(stream),
ServerFlowOptions::default(),
Greeting::ok(None, "Hello, World!").unwrap(),
Expand All @@ -33,8 +33,11 @@ async fn main() {
server.enqueue_status(Status::no(Some(command.tag), None, "...").unwrap()),
);
}
ServerFlowEvent::ResponseSent { handle: got_handle } => {
println!("response sent: {handle:?}");
ServerFlowEvent::ResponseSent {
handle: got_handle,
response,
} => {
println!("response sent: {response:?}");
assert_eq!(handle, Some(got_handle));
}
}
Expand Down
29 changes: 19 additions & 10 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{net::SocketAddr, sync::Arc};

use colored::Colorize;
use imap_codec::imap_types::{
bounded_static::ToBoundedStatic,
core::Text,
response::{Code, Status},
};
Expand Down Expand Up @@ -204,7 +205,7 @@ impl Proxy<ConnectedState> {

util::filter_capabilities_in_greeting(&mut greeting);

let mut client_to_proxy = {
let (mut client_to_proxy, greeting) = {
// TODO: Read options from config
let options = ServerFlowOptions {
literal_accept_text: Text::try_from(LITERAL_ACCEPT_TEXT).unwrap(),
Expand All @@ -223,7 +224,7 @@ impl Proxy<ConnectedState> {
}
}
};
trace!(role = "p2c", "<--- Forwarded greeting");
trace!(role = "p2c", ?greeting, "<--- Forwarded greeting");

loop {
let control_flow = tokio::select! {
Expand Down Expand Up @@ -269,9 +270,12 @@ fn handle_client_event(
};

match event {
ServerFlowEvent::ResponseSent { handle: _handle } => {
ServerFlowEvent::ResponseSent {
handle: _handle,
response,
} => {
// TODO: log handle
trace!(role = "p2c", "<--- Forwarded response");
trace!(role = "p2c", ?response, "<--- Forwarded response");
}
ServerFlowEvent::CommandReceived { command } => {
trace!(command=%format!("{:?}", command).red(), role = "c2p", "|--> Received command");
Expand Down Expand Up @@ -309,26 +313,31 @@ fn handle_server_event(
match event {
ClientFlowEvent::CommandSent {
handle: _handle,
tag,
command,
} => {
// TODO: log handle
trace!(role = "p2s", ?tag, "---> Forwarded command");
trace!(role = "p2s", ?command, "---> Forwarded command");
}
ClientFlowEvent::CommandRejected {
handle: _handle,
tag,
command,
status,
} => {
// TODO: log handle
trace!(role = "p2s", ?tag, ?status, "---> Aborted command");
trace!(role = "p2s", ?command, ?status, "---> Aborted command");
let status = match status.code() {
Some(Code::Alert) => {
// Keep the alert message because it MUST be displayed to the user
Status::bad(Some(tag), Some(Code::Alert), status.text().clone()).unwrap()
Status::bad(
Some(command.tag),
Some(Code::Alert),
status.text().to_static(),
)
.unwrap()
}
_ => {
// Use generic message because the original code and text might be misleading
Status::bad(Some(tag), None, COMMAND_REJECTED_TEXT).unwrap()
Status::bad(Some(command.tag), None, COMMAND_REJECTED_TEXT).unwrap()
}
};
let _handle = client_to_proxy.enqueue_status(status);
Expand Down
29 changes: 13 additions & 16 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use bounded_static::ToBoundedStatic;
use bytes::BytesMut;
use imap_codec::{
decode::{GreetingDecodeError, ResponseDecodeError},
imap_types::{
command::Command,
core::Tag,
response::{Data, Greeting, Response, Status, StatusBody, StatusKind, Tagged},
},
CommandCodec, GreetingCodec, ResponseCodec,
Expand Down Expand Up @@ -36,7 +34,7 @@ pub struct ClientFlow {
stream: AnyStream,

next_command_handle: ClientFlowCommandHandle,
send_command_state: SendCommandState<(Tag<'static>, ClientFlowCommandHandle)>,
send_command_state: SendCommandState<ClientFlowCommandHandle>,
receive_response_state: ReceiveState<ResponseCodec>,
}

Expand Down Expand Up @@ -87,12 +85,11 @@ impl ClientFlow {
/// The [`Command`] is not sent immediately but during one of the next calls of
/// [`ClientFlow::progress`]. All [`Command`]s are sent in the same order they have been
/// enqueued.
pub fn enqueue_command(&mut self, command: Command<'_>) -> ClientFlowCommandHandle {
pub fn enqueue_command(&mut self, command: Command<'static>) -> ClientFlowCommandHandle {
// TODO(#53)
let handle = self.next_command_handle;
self.next_command_handle = ClientFlowCommandHandle(handle.0 + 1);
let tag = command.tag.to_static();
self.send_command_state.enqueue((tag, handle), command);
self.send_command_state.enqueue(handle, command);
handle
}

Expand All @@ -110,7 +107,7 @@ impl ClientFlow {

async fn progress_command(&mut self) -> Result<Option<ClientFlowEvent>, ClientFlowError> {
match self.send_command_state.progress(&mut self.stream).await? {
Some((tag, handle)) => Ok(Some(ClientFlowEvent::CommandSent { handle, tag })),
Some((handle, command)) => Ok(Some(ClientFlowEvent::CommandSent { handle, command })),
None => Ok(None),
}
}
Expand Down Expand Up @@ -145,10 +142,10 @@ impl ClientFlow {

match response {
Response::Status(status) => {
if let Some((tag, handle)) = self.maybe_abort_command(&status) {
if let Some((handle, command)) = self.maybe_abort_command(&status) {
break Some(ClientFlowEvent::CommandRejected {
handle,
tag,
command,
status,
});
} else {
Expand All @@ -169,15 +166,15 @@ impl ClientFlow {
fn maybe_abort_command(
&mut self,
status: &Status,
) -> Option<(Tag<'static>, ClientFlowCommandHandle)> {
let (command_tag, _) = self.send_command_state.command_in_progress()?;
) -> Option<(ClientFlowCommandHandle, Command<'static>)> {
let command = self.send_command_state.command_in_progress()?;

match status {
Status::Tagged(Tagged {
tag,
body: StatusBody { kind, .. },
..
}) if *kind == StatusKind::Bad && tag == command_tag => {
}) if *kind == StatusKind::Bad && tag == &command.tag => {
self.send_command_state.abort_command()
}
_ => None,
Expand All @@ -200,15 +197,15 @@ pub enum ClientFlowEvent {
CommandSent {
/// The handle of the enqueued [`Command`].
handle: ClientFlowCommandHandle,
/// The [`Tag`] of the enqueued [`Command`].
tag: Tag<'static>,
/// Formerly enqueued [`Command`] that was now sent.
command: Command<'static>,
},
/// The enqueued [`Command`] wasn't sent completely because the server rejected it.
CommandRejected {
/// The handle of the enqueued [`Command`].
handle: ClientFlowCommandHandle,
/// The [`Tag`] of the enqueued [`Command`].
tag: Tag<'static>,
/// Formerly enqueued [`Command`] that was now rejected.
command: Command<'static>,
/// The [`Status`] sent by the server in order to reject the [`Command`].
/// [`ClientFlow`] has already handled this [`Status`] but it might still have
/// useful information that could be logged or displayed to the user
Expand Down
Loading

0 comments on commit b124c10

Please sign in to comment.