diff --git a/Cargo.toml b/Cargo.toml index 781eacf7..939b5004 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ tokio = { version = "1.32.0", features = ["macros", "net", "rt", "sync"] } [workspace] resolver = "2" members = [ + "flow-test", "proxy", "tag-generator", "tasks", diff --git a/flow-test/Cargo.toml b/flow-test/Cargo.toml new file mode 100644 index 00000000..da730ca5 --- /dev/null +++ b/flow-test/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "flow-test" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" + +[dependencies] +bstr = { version = "1.9.0", default-features = false } +bytes = "1.5.0" +imap-codec = "2.0.0" +imap-flow = { path = ".." } +imap-types = "2.0.0" +tokio = { version = "1.32.0", features = ["macros", "net", "rt", "time"] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } diff --git a/flow-test/README.md b/flow-test/README.md new file mode 100644 index 00000000..f4d96806 --- /dev/null +++ b/flow-test/README.md @@ -0,0 +1,3 @@ +# flow-test + +Test harness for writing lightweight unit tests for `imap-flow`. diff --git a/flow-test/src/client_tester.rs b/flow-test/src/client_tester.rs new file mode 100644 index 00000000..b9f4a2d7 --- /dev/null +++ b/flow-test/src/client_tester.rs @@ -0,0 +1,155 @@ +use std::net::SocketAddr; + +use bstr::ByteSlice; +use imap_flow::{ + client::{ClientFlow, ClientFlowError, ClientFlowEvent, ClientFlowOptions}, + stream::AnyStream, +}; +use imap_types::bounded_static::ToBoundedStatic; +use tokio::net::TcpStream; +use tracing::trace; + +use crate::codecs::Codecs; + +/// A wrapper for `ClientFlow` suitable for testing. +pub struct ClientTester { + codecs: Codecs, + client_flow_options: ClientFlowOptions, + connection_state: ConnectionState, +} + +impl ClientTester { + pub async fn new( + codecs: Codecs, + client_flow_options: ClientFlowOptions, + server_address: SocketAddr, + ) -> Self { + let stream = TcpStream::connect(server_address).await.unwrap(); + trace!(?server_address, "Client is connected"); + Self { + codecs, + client_flow_options, + connection_state: ConnectionState::Connected { stream }, + } + } + + pub async fn receive_greeting(&mut self, expected_bytes: &[u8]) { + let expected_greeting = self.codecs.decode_greeting(expected_bytes); + match self.connection_state.take() { + ConnectionState::Connected { stream } => { + let stream = AnyStream::new(stream); + let (client, greeting) = + ClientFlow::receive_greeting(stream, self.client_flow_options.clone()) + .await + .unwrap(); + assert_eq!(expected_greeting, greeting); + self.connection_state = ConnectionState::Greeted { client }; + } + ConnectionState::Greeted { .. } => { + panic!("Client is already greeted"); + } + ConnectionState::Disconnected => { + panic!("Client is already disconnected"); + } + } + } + + pub async fn send_command(&mut self, bytes: &[u8]) { + let enqueued_command = self.codecs.decode_command_normalized(bytes); + let client = self.connection_state.greeted(); + let enqueued_handle = client.enqueue_command(enqueued_command.to_static()); + let event = client.progress().await.unwrap(); + match event { + ClientFlowEvent::CommandSent { handle, command } => { + assert_eq!(enqueued_handle, handle); + assert_eq!(enqueued_command, command); + } + event => { + panic!("Client emitted unexpected event: {event:?}"); + } + } + } + + pub async fn receive_data(&mut self, expected_bytes: &[u8]) { + let expected_data = self.codecs.decode_data(expected_bytes); + let client = self.connection_state.greeted(); + match client.progress().await.unwrap() { + ClientFlowEvent::DataReceived { data } => { + assert_eq!(expected_data, data); + } + event => { + panic!("Client emitted unexpected event: {event:?}"); + } + } + } + + pub async fn receive_status(&mut self, expected_bytes: &[u8]) { + let expected_status = self.codecs.decode_status(expected_bytes); + let client = self.connection_state.greeted(); + match client.progress().await.unwrap() { + ClientFlowEvent::StatusReceived { status } => { + assert_eq!(expected_status, status); + } + event => { + panic!("Client emitted unexpected event: {event:?}"); + } + } + } + + pub async fn receive_error_because_malformed_message(&mut self, expected_bytes: &[u8]) { + let error = match self.connection_state.take() { + ConnectionState::Connected { stream } => { + let stream = AnyStream::new(stream); + ClientFlow::receive_greeting(stream, self.client_flow_options.clone()) + .await + .unwrap_err() + } + ConnectionState::Greeted { mut client } => { + let error = client.progress().await.unwrap_err(); + self.connection_state = ConnectionState::Greeted { client }; + error + } + ConnectionState::Disconnected => { + panic!("Client is already disconnected") + } + }; + match error { + ClientFlowError::MalformedMessage { discarded_bytes } => { + assert_eq!(expected_bytes.as_bstr(), discarded_bytes.as_bstr()); + } + error => { + panic!("Client emitted unexpected error: {error:?}"); + } + } + } +} + +/// The current state of the connection between client and server. +#[allow(clippy::large_enum_variant)] +enum ConnectionState { + /// The client has established a TCP connection to the server. + Connected { stream: TcpStream }, + /// The client was greeted by the server. + Greeted { client: ClientFlow }, + /// The TCP connection between client and server was dropped. + Disconnected, +} + +impl ConnectionState { + /// Assumes that the client was already greeted by the server and returns the `ClientFlow`. + fn greeted(&mut self) -> &mut ClientFlow { + match self { + ConnectionState::Connected { .. } => { + panic!("Client is not greeted yet"); + } + ConnectionState::Greeted { client } => client, + ConnectionState::Disconnected => { + panic!("Client is already disconnected"); + } + } + } + + fn take(&mut self) -> ConnectionState { + std::mem::replace(self, ConnectionState::Disconnected) + } +} diff --git a/flow-test/src/codecs.rs b/flow-test/src/codecs.rs new file mode 100644 index 00000000..b95b0bfd --- /dev/null +++ b/flow-test/src/codecs.rs @@ -0,0 +1,178 @@ +use bstr::ByteSlice; +use imap_codec::{ + decode::Decoder, encode::Encoder, AuthenticateDataCodec, CommandCodec, GreetingCodec, + IdleDoneCodec, ResponseCodec, +}; +use imap_types::{ + command::Command, + response::{Data, Greeting, Response, Status}, +}; + +/// Contains all codecs from `imap-codec`. +#[derive(Clone, Debug, Default, PartialEq)] +#[non_exhaustive] +pub struct Codecs { + pub greeting_codec: GreetingCodec, + pub command_codec: CommandCodec, + pub response_codec: ResponseCodec, + pub authenticate_data_codec: AuthenticateDataCodec, + pub idle_done_codec: IdleDoneCodec, +} + +impl Codecs { + pub fn encode_greeting(&self, greeting: &Greeting) -> Vec { + self.greeting_codec.encode(greeting).dump() + } + + pub fn encode_command(&self, command: &Command) -> Vec { + self.command_codec.encode(command).dump() + } + + pub fn encode_response(&self, response: &Response) -> Vec { + self.response_codec.encode(response).dump() + } + + pub fn encode_data(&self, data: &Data) -> Vec { + self.response_codec + .encode(&Response::Data(data.clone())) + .dump() + } + + pub fn encode_status(&self, status: &Status) -> Vec { + self.response_codec + .encode(&Response::Status(status.clone())) + .dump() + } + + pub fn decode_greeting<'a>(&self, bytes: &'a [u8]) -> Greeting<'a> { + match self.greeting_codec.decode(bytes) { + Ok((rem, greeting)) => { + if !rem.is_empty() { + panic!( + "Expected single greeting but there are remaining bytes {:?}", + rem.as_bstr() + ) + } + greeting + } + Err(err) => { + panic!( + "Got error {:?} when parsing greeting from bytes {:?}", + err, + bytes.as_bstr() + ) + } + } + } + + pub fn decode_command<'a>(&self, bytes: &'a [u8]) -> Command<'a> { + match self.command_codec.decode(bytes) { + Ok((rem, command)) => { + if !rem.is_empty() { + panic!( + "Expected single command but there are remaining bytes {:?}", + rem.as_bstr() + ) + } + command + } + Err(err) => { + panic!( + "Got error {:?} when parsing command from bytes {:?}", + err, + bytes.as_bstr() + ) + } + } + } + + pub fn decode_response<'a>(&self, bytes: &'a [u8]) -> Response<'a> { + match self.response_codec.decode(bytes) { + Ok((rem, response)) => { + if !rem.is_empty() { + panic!( + "Expected single response but there are remaining bytes {:?}", + rem.as_bstr() + ) + } + response + } + Err(err) => { + panic!( + "Got error {:?} when parsing response bytes {:?}", + err, + bytes.as_bstr() + ) + } + } + } + + pub fn decode_data<'a>(&self, bytes: &'a [u8]) -> Data<'a> { + let Response::Data(expected_data) = self.decode_response(bytes) else { + panic!("Got wrong response type when parsing data from {bytes:?}") + }; + expected_data + } + + pub fn decode_status<'a>(&self, bytes: &'a [u8]) -> Status<'a> { + let Response::Status(expected_status) = self.decode_response(bytes) else { + panic!("Got wrong response type when parsing status from {bytes:?}") + }; + expected_status + } + + pub fn decode_greeting_normalized<'a>(&self, bytes: &'a [u8]) -> Greeting<'a> { + let greeting = self.decode_greeting(bytes); + let normalized_bytes = self.encode_greeting(&greeting); + assert_eq!( + normalized_bytes.as_bstr(), + bytes.as_bstr(), + "Bytes must contain a normalized greeting" + ); + greeting + } + + pub fn decode_command_normalized<'a>(&self, bytes: &'a [u8]) -> Command<'a> { + let command = self.decode_command(bytes); + let normalized_bytes = self.encode_command(&command); + assert_eq!( + normalized_bytes.as_bstr(), + bytes.as_bstr(), + "Bytes must contain a normalized command" + ); + command + } + + pub fn decode_response_normalized<'a>(&self, bytes: &'a [u8]) -> Response<'a> { + let response = self.decode_response(bytes); + let normalized_bytes = self.encode_response(&response); + assert_eq!( + normalized_bytes.as_bstr(), + bytes.as_bstr(), + "Bytes must contain a normalized response" + ); + response + } + + pub fn decode_data_normalized<'a>(&self, bytes: &'a [u8]) -> Data<'a> { + let data = self.decode_data(bytes); + let normalized_bytes = self.encode_data(&data); + assert_eq!( + normalized_bytes.as_bstr(), + bytes.as_bstr(), + "Bytes must contain a normalized data" + ); + data + } + + pub fn decode_status_normalized<'a>(&self, bytes: &'a [u8]) -> Status<'a> { + let status = self.decode_status(bytes); + let normalized_bytes = self.encode_status(&status); + assert_eq!( + normalized_bytes.as_bstr(), + bytes.as_bstr(), + "Bytes must contain a normalized status" + ); + status + } +} diff --git a/flow-test/src/lib.rs b/flow-test/src/lib.rs new file mode 100644 index 00000000..7588b43f --- /dev/null +++ b/flow-test/src/lib.rs @@ -0,0 +1,6 @@ +pub mod client_tester; +pub mod codecs; +pub mod mock; +pub mod runtime; +pub mod server_tester; +pub mod test_setup; diff --git a/flow-test/src/mock.rs b/flow-test/src/mock.rs new file mode 100644 index 00000000..19a1c038 --- /dev/null +++ b/flow-test/src/mock.rs @@ -0,0 +1,84 @@ +use std::net::SocketAddr; + +use bstr::{BStr, ByteSlice}; +use bytes::{Buf, BytesMut}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, +}; +use tracing::trace; + +/// Mocks either the server or client. +/// +/// This mock doesn't know any IMAP semantics. Instead it provides direct access to the +/// TCP connection. Therefore the correctness of the test depends on the correctness +/// of the test data. +pub struct Mock { + role: Role, + stream: TcpStream, + read_buffer: BytesMut, +} + +impl Mock { + pub async fn server(server_listener: TcpListener) -> Self { + let role = Role::Server; + let (stream, client_address) = server_listener.accept().await.unwrap(); + trace!(?role, ?client_address, "Mock accepts connection"); + Self { + role, + stream, + read_buffer: BytesMut::default(), + } + } + + pub async fn client(server_address: SocketAddr) -> Self { + let role = Role::Client; + let stream = TcpStream::connect(server_address).await.unwrap(); + trace!(?role, ?server_address, "Mock is connected"); + Self { + role, + stream, + read_buffer: BytesMut::default(), + } + } + + pub async fn send(&mut self, bytes: &[u8]) { + trace!( + role = ?self.role, + bytes = ?BStr::new(bytes), + "Mock writes bytes" + ); + self.stream.write_all(bytes).await.unwrap(); + } + + pub async fn receive(&mut self, expected_bytes: &[u8]) { + loop { + let bytes = &self.read_buffer[..]; + trace!( + role = ?self.role, + read_bytes = ?BStr::new(bytes), + "Mock reads bytes" + ); + + if bytes.len() < expected_bytes.len() { + assert_eq!(expected_bytes[..bytes.len()].as_bstr(), bytes.as_bstr()); + + self.stream.read_buf(&mut self.read_buffer).await.unwrap(); + } else { + assert_eq!( + expected_bytes.as_bstr(), + bytes[..expected_bytes.len()].as_bstr() + ); + + self.read_buffer.advance(expected_bytes.len()); + break; + } + } + } +} + +#[derive(Debug)] +enum Role { + Server, + Client, +} diff --git a/flow-test/src/runtime.rs b/flow-test/src/runtime.rs new file mode 100644 index 00000000..537b6191 --- /dev/null +++ b/flow-test/src/runtime.rs @@ -0,0 +1,63 @@ +use std::{future::Future, time::Duration}; + +use tokio::{join, runtime, time::sleep}; + +/// Options for creating an instance of `Runtime`. +#[derive(Clone, Debug, PartialEq)] +#[non_exhaustive] +pub struct RuntimeOptions { + pub timeout: Option, +} + +impl Default for RuntimeOptions { + fn default() -> Self { + Self { + timeout: Some(Duration::from_secs(1)), + } + } +} + +/// Allows to execute one or more `Future`s by blocking the current thread. +/// +/// We prefer to have single-threaded unit tests because it makes debugging easier. +/// This runtime allows us to execute server and client tasks in parallel on the same +/// thread the test is executed on. +pub struct Runtime { + timeout: Option, + rt: runtime::Runtime, +} + +impl Runtime { + pub fn new(runtime_options: RuntimeOptions) -> Self { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .enable_io() + .build() + .unwrap(); + + Runtime { + timeout: runtime_options.timeout, + rt, + } + } + + pub fn run(&self, future: impl Future) -> T { + match self.timeout { + None => self.rt.block_on(future), + Some(timeout) => self.rt.block_on(async { + tokio::select! { + output = future => output, + () = sleep(timeout) => panic!("Timeout reached"), + } + }), + } + } + + pub fn run2( + &self, + future1: impl Future, + future2: impl Future, + ) -> (T1, T2) { + self.run(async { join!(future1, future2) }) + } +} diff --git a/flow-test/src/server_tester.rs b/flow-test/src/server_tester.rs new file mode 100644 index 00000000..a7aa69bf --- /dev/null +++ b/flow-test/src/server_tester.rs @@ -0,0 +1,145 @@ +use bstr::ByteSlice; +use imap_flow::{ + server::{ServerFlow, ServerFlowError, ServerFlowEvent, ServerFlowOptions}, + stream::AnyStream, +}; +use imap_types::{bounded_static::ToBoundedStatic, response::Response}; +use tokio::net::{TcpListener, TcpStream}; +use tracing::trace; + +use crate::codecs::Codecs; + +/// A wrapper for `ServerFlow` suitable for testing. +pub struct ServerTester { + codecs: Codecs, + server_flow_options: ServerFlowOptions, + connection_state: ConnectionState, +} + +impl ServerTester { + pub async fn new( + codecs: Codecs, + server_flow_options: ServerFlowOptions, + server_listener: TcpListener, + ) -> Self { + let (stream, client_address) = server_listener.accept().await.unwrap(); + trace!(?client_address, "Server accepts connection"); + Self { + codecs, + server_flow_options, + connection_state: ConnectionState::Connected { stream }, + } + } + + pub async fn send_greeting(&mut self, bytes: &[u8]) { + let enqueued_greeting = self.codecs.decode_greeting_normalized(bytes); + match self.connection_state.take() { + ConnectionState::Connected { stream } => { + let stream = AnyStream::new(stream); + let (server, greeting) = ServerFlow::send_greeting( + stream, + self.server_flow_options.clone(), + enqueued_greeting.to_static(), + ) + .await + .unwrap(); + assert_eq!(enqueued_greeting, greeting); + self.connection_state = ConnectionState::Greeted { server }; + } + ConnectionState::Greeted { .. } => { + panic!("Server has already greeted"); + } + ConnectionState::Disconnected => { + panic!("Server is already disconnected"); + } + } + } + + pub async fn receive_command(&mut self, expected_bytes: &[u8]) { + let expected_command = self.codecs.decode_command(expected_bytes); + let server = self.connection_state.greeted(); + match server.progress().await.unwrap() { + ServerFlowEvent::CommandReceived { command } => { + assert_eq!(expected_command, command); + } + event => { + panic!("Server emitted unexpected event: {event:?}"); + } + } + } + + pub async fn send_data(&mut self, bytes: &[u8]) { + let enqueued_data = self.codecs.decode_data_normalized(bytes); + let server = self.connection_state.greeted(); + let enqueued_handle = server.enqueue_data(enqueued_data.to_static()); + let event = server.progress().await.unwrap(); + match event { + ServerFlowEvent::ResponseSent { handle, response } => { + assert_eq!(enqueued_handle, handle); + assert_eq!(Response::Data(enqueued_data), response); + } + event => { + panic!("Server has unexpected event: {event:?}"); + } + } + } + + pub async fn send_status(&mut self, bytes: &[u8]) { + let enqueued_status = self.codecs.decode_status_normalized(bytes); + let server = self.connection_state.greeted(); + let enqueued_handle = server.enqueue_status(enqueued_status.to_static()); + let event = server.progress().await.unwrap(); + match event { + ServerFlowEvent::ResponseSent { handle, response } => { + assert_eq!(enqueued_handle, handle); + assert_eq!(Response::Status(enqueued_status), response); + } + event => { + panic!("Server has unexpected event: {event:?}"); + } + } + } + + pub async fn receive_error_because_malformed_message(&mut self, expected_bytes: &[u8]) { + let server = self.connection_state.greeted(); + let error = server.progress().await.unwrap_err(); + match error { + ServerFlowError::MalformedMessage { discarded_bytes } => { + assert_eq!(expected_bytes.as_bstr(), discarded_bytes.as_bstr()); + } + error => { + panic!("Server has unexpected error: {error:?}"); + } + } + } +} + +/// The current state of the connection between server and client. +#[allow(clippy::large_enum_variant)] +enum ConnectionState { + // The server has established a TCP connection to the client. + Connected { stream: TcpStream }, + // The server has greeted the client. + Greeted { server: ServerFlow }, + // The TCP connection between server and client was dropped. + Disconnected, +} + +impl ConnectionState { + /// Assumes that the server has already greeted the client and returns the `ServerFlow`. + fn greeted(&mut self) -> &mut ServerFlow { + match self { + ConnectionState::Connected { .. } => { + panic!("Server has not greeted yet"); + } + ConnectionState::Greeted { server } => server, + ConnectionState::Disconnected => { + panic!("Server is already disconnected"); + } + } + } + + fn take(&mut self) -> ConnectionState { + std::mem::replace(self, ConnectionState::Disconnected) + } +} diff --git a/flow-test/src/test_setup.rs b/flow-test/src/test_setup.rs new file mode 100644 index 00000000..95cff356 --- /dev/null +++ b/flow-test/src/test_setup.rs @@ -0,0 +1,119 @@ +use std::net::SocketAddr; + +use imap_flow::{client::ClientFlowOptions, server::ServerFlowOptions}; +use tokio::net::TcpListener; +use tracing::trace; +use tracing_subscriber::EnvFilter; + +use crate::{ + client_tester::ClientTester, + codecs::Codecs, + mock::Mock, + runtime::{Runtime, RuntimeOptions}, + server_tester::ServerTester, +}; + +/// Contains all parameters for creating a test setup for the server or client side +/// of `imap-flow`. +#[derive(Clone, Debug, PartialEq)] +#[non_exhaustive] +pub struct TestSetup { + pub codecs: Codecs, + pub server_flow_options: ServerFlowOptions, + pub client_flow_options: ClientFlowOptions, + pub runtime_options: RuntimeOptions, + pub init_logging: bool, +} + +impl Default for TestSetup { + fn default() -> Self { + Self { + codecs: Codecs::default(), + server_flow_options: ServerFlowOptions::default(), + client_flow_options: ClientFlowOptions::default(), + runtime_options: RuntimeOptions::default(), + init_logging: true, + } + } +} + +impl TestSetup { + /// Create a test setup to test the client side (mocking the server side). + pub fn setup_client(self) -> (Runtime, Mock, ClientTester) { + if self.init_logging { + init_logging(); + } + + let rt = Runtime::new(self.runtime_options); + + let (server_listener, server_address) = rt.run(bind_address()); + + let (server, client) = rt.run2( + Mock::server(server_listener), + ClientTester::new(self.codecs, self.client_flow_options, server_address), + ); + + (rt, server, client) + } + + /// Create a test setup to test the server side (mocking the client side). + pub fn setup_server(self) -> (Runtime, ServerTester, Mock) { + if self.init_logging { + init_logging(); + } + + let rt = Runtime::new(self.runtime_options); + + let (server_listener, server_address) = rt.run(bind_address()); + + let (server, client) = rt.run2( + ServerTester::new(self.codecs, self.server_flow_options, server_listener), + Mock::client(server_address), + ); + + (rt, server, client) + } + + /// Create a test setup to test the server side and the client side. + pub fn setup(self) -> (Runtime, ServerTester, ClientTester) { + if self.init_logging { + init_logging(); + } + + let rt = Runtime::new(self.runtime_options); + + let (server_listener, server_address) = rt.run(bind_address()); + + let (server, client) = rt.run2( + ServerTester::new( + self.codecs.clone(), + self.server_flow_options, + server_listener, + ), + ClientTester::new(self.codecs, self.client_flow_options, server_address), + ); + + (rt, server, client) + } +} + +fn init_logging() { + let builder = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .with_target(false) + .with_file(false) + .with_line_number(false) + .without_time(); + + // We use `try_init` because multiple tests might try to initialize the logging + let _result = builder.try_init(); +} + +async fn bind_address() -> (TcpListener, SocketAddr) { + // If we use port 0 the OS will assign us a free port. This is useful because + // we want to run many tests in parallel and two tests must not use the same port. + let server_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let server_address = server_listener.local_addr().unwrap(); + trace!(?server_address, "Bound to address"); + (server_listener, server_address) +} diff --git a/flow-test/tests/both.rs b/flow-test/tests/both.rs new file mode 100644 index 00000000..eb9c6036 --- /dev/null +++ b/flow-test/tests/both.rs @@ -0,0 +1,18 @@ +use flow_test::test_setup::TestSetup; + +#[test] +fn noop() { + let (rt, mut server, mut client) = TestSetup::default().setup(); + + let greeting = b"* OK ...\r\n"; + rt.run2( + server.send_greeting(greeting), + client.receive_greeting(greeting), + ); + + let noop = b"A1 NOOP\r\n"; + rt.run2(client.send_command(noop), server.receive_command(noop)); + + let status = b"A1 OK ...\r\n"; + rt.run2(server.send_status(status), client.receive_status(status)); +} diff --git a/flow-test/tests/client.rs b/flow-test/tests/client.rs new file mode 100644 index 00000000..bb5eddbe --- /dev/null +++ b/flow-test/tests/client.rs @@ -0,0 +1,71 @@ +use std::time::Duration; + +use flow_test::test_setup::TestSetup; + +#[test] +fn noop() { + let (rt, mut server, mut client) = TestSetup::default().setup_client(); + + let greeting = b"* OK ...\r\n"; + rt.run2(server.send(greeting), client.receive_greeting(greeting)); + + let noop = b"A1 NOOP\r\n"; + rt.run2(client.send_command(noop), server.receive(noop)); + + let status = b"A1 OK ...\r\n"; + rt.run2(server.send(status), client.receive_status(status)); +} + +#[test] +fn noop_with_large_lines() { + let mut setup = TestSetup::default(); + // Sending large messages takes some time, especially when running on a slow CI. + setup.runtime_options.timeout = Some(Duration::from_secs(10)); + + let (rt, mut server, mut client) = setup.setup_client(); + + // This number seems to be larger than the TCP buffer, so server/client must + // send/receive in parallel to prevent a dead lock. + const LARGE: usize = 10 * 1024 * 1024; + + let greeting = &mut b"* OK ".to_vec(); + greeting.extend(vec![b'.'; LARGE]); + greeting.extend(b"\r\n"); + rt.run2(server.send(&greeting), client.receive_greeting(&greeting)); + + let noop = b"A1 NOOP\r\n"; + rt.run2(client.send_command(noop), server.receive(noop)); + + let status = &mut b"A1 OK ".to_vec(); + status.extend(vec![b'.'; LARGE]); + status.extend(b"\r\n"); + rt.run2(server.send(status), client.receive_status(status)); +} + +#[test] +fn gibberish_instead_of_greeting() { + let (rt, mut server, mut client) = TestSetup::default().setup_client(); + + let gibberish = b"I like bananas\r\n"; + rt.run2( + server.send(gibberish), + client.receive_error_because_malformed_message(gibberish), + ); +} + +#[test] +fn gibberish_instead_of_response() { + let (rt, mut server, mut client) = TestSetup::default().setup_client(); + + let greeting = b"* OK ...\r\n"; + rt.run2(server.send(greeting), client.receive_greeting(greeting)); + + let noop = b"A1 NOOP\r\n"; + rt.run2(client.send_command(noop), server.receive(noop)); + + let gibberish = b"I like bananas\r\n"; + rt.run2( + server.send(gibberish), + client.receive_error_because_malformed_message(gibberish), + ); +} diff --git a/flow-test/tests/server.rs b/flow-test/tests/server.rs new file mode 100644 index 00000000..1b456dae --- /dev/null +++ b/flow-test/tests/server.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use flow_test::test_setup::TestSetup; + +#[test] +fn noop() { + let (rt, mut server, mut client) = TestSetup::default().setup_server(); + + let greeting = b"* OK ...\r\n"; + rt.run2(server.send_greeting(greeting), client.receive(greeting)); + + let noop = b"A1 NOOP\r\n"; + rt.run2(client.send(noop), server.receive_command(noop)); + + let status = b"A1 OK ...\r\n"; + rt.run2(server.send_status(status), client.receive(status)); +} + +#[test] +fn noop_with_large_lines() { + let mut setup = TestSetup::default(); + // Sending large messages takes some time, especially when running on a slow CI. + setup.runtime_options.timeout = Some(Duration::from_secs(10)); + + let (rt, mut server, mut client) = setup.setup_server(); + + // This number seems to be larger than the TCP buffer, so server/client must + // send/receive in parallel to prevent a dead lock. + const LARGE: usize = 10 * 1024 * 1024; + + let greeting = &mut b"* OK ".to_vec(); + greeting.extend(vec![b'.'; LARGE]); + greeting.extend(b"\r\n"); + rt.run2(server.send_greeting(&greeting), client.receive(&greeting)); + + let noop = b"A1 NOOP\r\n"; + rt.run2(client.send(noop), server.receive_command(noop)); + + let status = &mut b"A1 OK ".to_vec(); + status.extend(vec![b'.'; LARGE]); + status.extend(b"\r\n"); + rt.run2(server.send_status(status), client.receive(status)); +} + +#[test] +fn gibberish_instead_of_command() { + let (rt, mut server, mut client) = TestSetup::default().setup_server(); + + let greeting = b"* OK ...\r\n"; + rt.run2(server.send_greeting(greeting), client.receive(greeting)); + + let gibberish = b"I like bananas\r\n"; + rt.run2( + client.send(gibberish), + server.receive_error_because_malformed_message(gibberish), + ); +}