Skip to content

Commit

Permalink
feature(proxy): Log fragments
Browse files Browse the repository at this point in the history
  • Loading branch information
jakoschiko committed Jul 12, 2024
1 parent 829617a commit b8e4ca6
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 17 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
anyhow = "1.0.86"
argh = "0.1.12"
byte_string = "1.0.0"
colored = "2.1.0"
imap-codec = { version = "2.0.0-alpha.2", features = ["quirk_crlf_relaxed", "ext_id"] }
imap-next = { path = ".." }
Expand Down
177 changes: 160 additions & 17 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{net::SocketAddr, sync::Arc};

use colored::Colorize;
use byte_string::ByteStr;
use colored::{Color, ColoredString, Colorize};
use imap_codec::fragmentizer::{FragmentInfo, Fragmentizer, MaxMessageSize};
use imap_next::{
client::{self, Client},
imap_types::{
Expand All @@ -11,6 +13,7 @@ use imap_next::{
},
server::{self, Server},
stream::{self, Stream},
Interrupt, Io,
};
use once_cell::sync::Lazy;
use thiserror::Error;
Expand All @@ -19,7 +22,7 @@ use tokio_rustls::{
rustls::{pki_types::ServerName, ClientConfig, RootCertStore, ServerConfig},
TlsAcceptor, TlsConnector,
};
use tracing::{error, info, trace};
use tracing::{error, info, instrument, trace};

use crate::{
config::{Bind, Connect, Identity, Service},
Expand All @@ -40,6 +43,10 @@ const LITERAL_ACCEPT_TEXT: &str = "proxy: Literal accepted by proxy";
const LITERAL_REJECT_TEXT: &str = "proxy: Literal rejected by proxy";
const COMMAND_REJECTED_TEXT: &str = "proxy: Command rejected by server";

const LAYER_TRANSPORT: &str = "transport";
const LAYER_FRAGMENT: &str = "fragment";
const LAYER_MESSAGE: &str = "message";

#[derive(Debug, Error)]
pub enum ProxyError {
#[error(transparent)]
Expand All @@ -64,6 +71,7 @@ pub struct BoundState {
impl State for BoundState {}

impl Proxy<BoundState> {
#[instrument(name = "event", fields(layer = LAYER_TRANSPORT), skip_all)]
pub async fn bind(service: Service) -> Result<Self, ProxyError> {
// Accept arbitrary number of connections.
let bind_addr_port = service.bind.addr_port();
Expand All @@ -76,6 +84,7 @@ impl Proxy<BoundState> {
})
}

#[instrument(name = "event", fields(layer = LAYER_TRANSPORT), skip_all)]
pub async fn accept_client(&self) -> Result<Proxy<ClientAcceptedState>, ProxyError> {
let (client_to_proxy, client_addr) = self.state.listener.accept().await?;
info!(?client_addr, "Accepted client");
Expand Down Expand Up @@ -138,6 +147,7 @@ impl Proxy<ClientAcceptedState> {
self.state.client_addr
}

#[instrument(name = "event", fields(layer = LAYER_TRANSPORT), skip_all)]
pub async fn connect_to_server(self) -> Result<Proxy<ConnectedState>, ProxyError> {
let server_addr_port = self.service.connect.addr_port();
info!(?server_addr_port, "Connecting to server");
Expand Down Expand Up @@ -188,7 +198,8 @@ impl Proxy<ConnectedState> {
let mut proxy_to_server = {
// TODO(#144): Read options from config
let options = client::Options::default();
Client::new(options)
let client = Client::new(options);
ImapState::client(client)
};
let mut proxy_to_server_stream = self.state.proxy_to_server;
let stream_event = proxy_to_server_stream.next(&mut proxy_to_server).await;
Expand All @@ -210,7 +221,8 @@ impl Proxy<ConnectedState> {
options
.set_literal_reject_text(LITERAL_REJECT_TEXT.to_string())
.unwrap();
Server::new(options, greeting)
let server = Server::new(options, greeting);
ImapState::server(server)
};
let mut client_to_proxy_stream = self.state.client_to_proxy;

Expand All @@ -233,6 +245,7 @@ impl Proxy<ConnectedState> {
}
}

#[instrument(name = "event", fields(layer = LAYER_TRANSPORT), skip_all)]
fn handle_stream_event<T, E>(
role: &'static str,
stream_event: Result<T, stream::Error<E>>,
Expand All @@ -255,6 +268,7 @@ fn handle_stream_event<T, E>(
}
}

#[instrument(name = "event", fields(layer = LAYER_MESSAGE), skip_all)]
fn handle_initial_server_event(
server_event: Result<client::Event, client::Error>,
) -> Option<Greeting<'static>> {
Expand All @@ -274,9 +288,10 @@ fn handle_initial_server_event(
}
}

#[instrument(name = "event", fields(layer = LAYER_MESSAGE), skip_all)]
fn handle_client_event(
client_event: Result<server::Event, server::Error>,
proxy_to_server: &mut Client,
proxy_to_server: &mut ImapState<Client>,
) {
let event = match client_event {
Ok(event) => event,
Expand Down Expand Up @@ -309,7 +324,7 @@ fn handle_client_event(
server::Event::CommandReceived { command } => {
trace!(role = "c2p", command=%format!("{:?}", command).red(), "|-->");

let handle = proxy_to_server.enqueue_command(command);
let handle = proxy_to_server.state.enqueue_command(command);
trace!(role = "p2s", ?handle, "enqueue_command");
}
server::Event::CommandAuthenticateReceived {
Expand All @@ -323,7 +338,7 @@ fn handle_client_event(
"|-->"
);

let handle = proxy_to_server.enqueue_command(command_authenticate);
let handle = proxy_to_server.state.enqueue_command(command_authenticate);
trace!(role = "p2s", ?handle, "enqueue_command");
}
server::Event::AuthenticateDataReceived { authenticate_data } => {
Expand All @@ -335,6 +350,7 @@ fn handle_client_event(

// TODO(#145): Fix unwrap
let handle = proxy_to_server
.state
.set_authenticate_data(authenticate_data)
.unwrap();
trace!(role = "p2s", ?handle, "set_authenticate_data");
Expand All @@ -347,21 +363,22 @@ fn handle_client_event(

trace!(role = "c2p", idle=%format!("{:?}", idle).red(), "|-->");

let handle = proxy_to_server.enqueue_command(idle);
let handle = proxy_to_server.state.enqueue_command(idle);
trace!(role = "p2s", ?handle, "enqueue_command");
}
server::Event::IdleDoneReceived => {
trace!(role = "c2p", done=%format!("{:?}", IdleDone).red(), "|-->");

let handle = proxy_to_server.set_idle_done();
let handle = proxy_to_server.state.set_idle_done();
trace!(role = "p2s", ?handle, "set_idle_done");
}
}
}

#[instrument(name = "event", fields(layer = LAYER_MESSAGE), skip_all)]
fn handle_server_event(
server_event: Result<client::Event, client::Error>,
client_to_proxy: &mut Server,
client_to_proxy: &mut ImapState<Server>,
) {
let event = match server_event {
Ok(event) => event,
Expand Down Expand Up @@ -409,7 +426,9 @@ fn handle_server_event(
Status::bad(Some(command.tag), None, COMMAND_REJECTED_TEXT).unwrap()
}
};
let handle = client_to_proxy.enqueue_status(modified_status.clone());
let handle = client_to_proxy
.state
.enqueue_status(modified_status.clone());
trace!(
role = "p2c",
?handle,
Expand All @@ -431,6 +450,7 @@ fn handle_server_event(
);

let handle = client_to_proxy
.state
.authenticate_continue(continuation_request)
.unwrap();
trace!(role = "p2c", ?handle, "authenticate_continue");
Expand All @@ -439,23 +459,23 @@ fn handle_server_event(
trace!(role = "s2p", authenticate_status=%format!("{:?}", status).blue(), "<--|");

// TODO(#145): Fix unwrap
let handle = client_to_proxy.authenticate_finish(status).unwrap();
let handle = client_to_proxy.state.authenticate_finish(status).unwrap();
trace!(role = "p2c", ?handle, "authenticate_finish");
}
client::Event::DataReceived { mut data } => {
trace!(role = "s2p", data=%format!("{:?}", data).blue(), "<--|");

util::filter_capabilities_in_data(&mut data);

let handle = client_to_proxy.enqueue_data(data);
let handle = client_to_proxy.state.enqueue_data(data);
trace!(role = "p2c", ?handle, "enqueue_data");
}
client::Event::StatusReceived { mut status } => {
trace!(role = "s2p", status=%format!("{:?}", status).blue(), "<--|");

util::filter_capabilities_in_status(&mut status);

let handle = client_to_proxy.enqueue_status(status);
let handle = client_to_proxy.state.enqueue_status(status);
trace!(role = "p2c", ?handle, "enqueue_status");
}
client::Event::ContinuationRequestReceived {
Expand All @@ -469,7 +489,9 @@ fn handle_server_event(

util::filter_capabilities_in_continuation(&mut continuation_request);

let handle = client_to_proxy.enqueue_continuation_request(continuation_request);
let handle = client_to_proxy
.state
.enqueue_continuation_request(continuation_request);
trace!(role = "p2c", ?handle, "enqueue_continuation_request");
}
client::Event::IdleCommandSent { handle } => {
Expand All @@ -487,7 +509,10 @@ fn handle_server_event(
);

// TODO(#145): Fix unwrap
let handle = client_to_proxy.idle_accept(continuation_request).unwrap();
let handle = client_to_proxy
.state
.idle_accept(continuation_request)
.unwrap();
trace!(role = "p2c", ?handle, "idle_accept");
}
client::Event::IdleRejected { handle, status } => {
Expand All @@ -499,11 +524,129 @@ fn handle_server_event(
);

// TODO(#145): Fix unwrap
let handle = client_to_proxy.idle_reject(status).unwrap();
let handle = client_to_proxy.state.idle_reject(status).unwrap();
trace!(role = "p2c", ?handle, "idle_reject");
}
client::Event::IdleDoneSent { handle } => {
trace!(role = "p2s", ?handle, "--->");
}
}
}

struct ImapState<S> {
input_role: &'static str,
input_direction: &'static str,
input_color: Option<Color>,
input_fragmentizer: Fragmentizer,
output_role: &'static str,
output_direction: &'static str,
output_color: Option<Color>,
output_fragmentizer: Fragmentizer,
state: S,
}

impl ImapState<Client> {
pub fn client(state: Client) -> Self {
Self {
input_role: "s2p",
input_direction: "<--|",
input_color: Some(Color::Blue),
input_fragmentizer: Fragmentizer::new(MaxMessageSize::Unlimited),
output_role: "p2s",
output_direction: "--->",
output_color: None,
output_fragmentizer: Fragmentizer::new(MaxMessageSize::Unlimited),
state,
}
}
}

impl ImapState<Server> {
pub fn server(state: Server) -> Self {
Self {
input_role: "c2p",
input_direction: "|-->",
input_color: Some(Color::Red),
input_fragmentizer: Fragmentizer::new(MaxMessageSize::Unlimited),
output_role: "p2c",
output_direction: "<---",
output_color: None,
output_fragmentizer: Fragmentizer::new(MaxMessageSize::Unlimited),
state,
}
}
}

impl<S: imap_next::State> imap_next::State for ImapState<S> {
type Event = S::Event;
type Error = S::Error;

fn enqueue_input(&mut self, bytes: &[u8]) {
self.input_fragmentizer.enqueue_bytes(bytes);
while let Some(fragment_info) = self.input_fragmentizer.progress() {
handle_fragment_event(
self.input_role,
self.input_direction,
self.input_color,
&self.input_fragmentizer,
fragment_info,
);
}

self.state.enqueue_input(bytes);
}

fn next(&mut self) -> Result<Self::Event, Interrupt<Self::Error>> {
let event = self.state.next();

if let Err(Interrupt::Io(Io::Output(ref output))) = event {
self.output_fragmentizer.enqueue_bytes(output);
while let Some(fragment_info) = self.output_fragmentizer.progress() {
handle_fragment_event(
self.output_role,
self.output_direction,
self.output_color,
&self.output_fragmentizer,
fragment_info,
);
}
};

event
}
}

#[instrument(name = "event", fields(layer = LAYER_FRAGMENT), skip_all)]
fn handle_fragment_event(
role: &'static str,
direction: &'static str,
color: Option<Color>,
fragmentizer: &Fragmentizer,
fragment_info: FragmentInfo,
) {
let fragment_bytes = fragmentizer.fragment_bytes(fragment_info);

match fragment_info {
FragmentInfo::Line { .. } => {
trace!(
role,
line=%maybe_color(format!("{:?}", ByteStr::new(fragment_bytes)), color),
"{direction}"
);
}
FragmentInfo::Literal { .. } => {
trace!(
role,
literal=%maybe_color(format!("{:?}", ByteStr::new(fragment_bytes)), color),
"{direction}"
);
}
};
}

fn maybe_color(string: String, color: Option<Color>) -> ColoredString {
match color {
None => string.normal(),
Some(c) => string.color(c),
}
}

0 comments on commit b8e4ca6

Please sign in to comment.