Skip to content

Commit

Permalink
import refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
LimpidCrypto committed Aug 8, 2023
1 parent 34b7f09 commit 4ef657d
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 49 deletions.
68 changes: 45 additions & 23 deletions src/client/websocket/async_websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,40 @@ use core::{
pin::Pin,
task::Poll,
};
use embedded_websocket::{framer_async::Framer, Client, WebSocketClient};
use embedded_websocket::{
framer_async::Framer as EmbeddedWebsocketFramer, Client as EmbeddedWebsocketClient,
WebSocket as EmbeddedWebsocket,
};
use futures::{Sink, Stream};
use rand_core::RngCore;
use url::Url;

#[cfg(feature = "std")]
use tokio::net::TcpStream;
#[cfg(feature = "std")]
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::{
connect_async as tungstenite_connect_async, MaybeTlsStream as TungsteniteMaybeTlsStream,
WebSocketStream as TungsteniteWebsocketStream,
};

// Exports
pub use embedded_websocket::{
framer_async::ReadResult, WebSocketCloseStatusCode as WebsocketCloseStatusCode,
WebSocketOptions as WebsocketOptions, WebSocketSendMessageType as WebsocketSendMessageType,
WebSocketState as WebsocketState,
framer_async::{
FramerError as EmbeddedWebsocketFramerError, ReadResult as EmbeddedWebsocketReadMessageType,
},
Error as EmbeddedWebsocketError, WebSocketCloseStatusCode as EmbeddedWebsocketCloseStatusCode,
WebSocketOptions as EmbeddedWebsocketOptions,
WebSocketSendMessageType as EmbeddedWebsocketSendMessageType,
WebSocketState as EmbeddedWebsocketState,
};

#[cfg(feature = "std")]
pub type AsyncWebsocketClientTungstenite<Status> =
AsyncWebsocketClient<WebSocketStream<MaybeTlsStream<TcpStream>>, Status>;
AsyncWebsocketClient<TungsteniteWebsocketStream<TungsteniteMaybeTlsStream<TcpStream>>, Status>;
pub type AsyncWebsocketClientEmbeddedWebsocketTokio<Rng, Status> =
AsyncWebsocketClient<Framer<Rng, Client>, Status>;
AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, Status>;
#[cfg(feature = "std")]
pub use tokio_tungstenite::tungstenite::Message;
pub use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;

pub struct WebsocketOpen;
pub struct WebsocketClosed;
Expand Down Expand Up @@ -117,12 +127,21 @@ where
}

#[cfg(feature = "std")]
impl AsyncWebsocketClient<WebSocketStream<MaybeTlsStream<TcpStream>>, WebsocketClosed> {
impl
AsyncWebsocketClient<
TungsteniteWebsocketStream<TungsteniteMaybeTlsStream<TcpStream>>,
WebsocketClosed,
>
{
pub async fn open(
uri: Url,
) -> Result<AsyncWebsocketClient<WebSocketStream<MaybeTlsStream<TcpStream>>, WebsocketOpen>>
{
let (websocket_stream, _) = connect_async(uri).await.unwrap();
) -> Result<
AsyncWebsocketClient<
TungsteniteWebsocketStream<TungsteniteMaybeTlsStream<TcpStream>>,
WebsocketOpen,
>,
> {
let (websocket_stream, _) = tungstenite_connect_async(uri).await.unwrap();

Ok(AsyncWebsocketClient {
inner: websocket_stream,
Expand All @@ -131,22 +150,25 @@ impl AsyncWebsocketClient<WebSocketStream<MaybeTlsStream<TcpStream>>, WebsocketC
}
}

impl<Rng> AsyncWebsocketClient<Framer<Rng, Client>, WebsocketClosed>
impl<Rng>
AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, WebsocketClosed>
where
Rng: RngCore,
{
pub async fn open<B, E>(
stream: &mut (impl Stream<Item = Result<B, E>> + for<'a> Sink<&'a [u8], Error = E> + Unpin),
buffer: &mut [u8],
rng: Rng,
websocket_options: &WebsocketOptions<'_>,
) -> Result<AsyncWebsocketClient<Framer<Rng, Client>, WebsocketOpen>>
websocket_options: &EmbeddedWebsocketOptions<'_>,
) -> Result<
AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, WebsocketOpen>,
>
where
B: AsRef<[u8]>,
E: Debug,
{
let websocket = WebSocketClient::new_client(rng);
let mut framer = Framer::new(websocket);
let websocket = EmbeddedWebsocket::<Rng, EmbeddedWebsocketClient>::new_client(rng);
let mut framer = EmbeddedWebsocketFramer::new(websocket);
framer
.connect(stream, buffer, websocket_options)
.await
Expand All @@ -159,13 +181,13 @@ where
}
}

impl<Rng> AsyncWebsocketClient<Framer<Rng, Client>, WebsocketOpen>
impl<Rng> AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, WebsocketOpen>
where
Rng: RngCore,
{
pub fn encode<E>(
&mut self,
message_type: WebsocketSendMessageType,
message_type: EmbeddedWebsocketSendMessageType,
end_of_message: bool,
from: &[u8],
to: &mut [u8],
Expand All @@ -185,7 +207,7 @@ where
&mut self,
stream: &mut (impl Sink<&'b [u8], Error = E> + Unpin),
stream_buf: &'b mut [u8],
message_type: WebsocketSendMessageType,
message_type: EmbeddedWebsocketSendMessageType,
end_of_message: bool,
frame_buf: &'b [u8],
) -> Result<()>
Expand All @@ -204,7 +226,7 @@ where
&mut self,
stream: &mut (impl Sink<&'b [u8], Error = E> + Unpin),
stream_buf: &'b mut [u8],
close_status: WebsocketCloseStatusCode,
close_status: EmbeddedWebsocketCloseStatusCode,
status_description: Option<&str>,
) -> Result<()>
where
Expand All @@ -222,7 +244,7 @@ where
&'a mut self,
stream: &mut (impl Stream<Item = Result<B, E>> + Sink<&'a [u8], Error = E> + Unpin),
buffer: &'a mut [u8],
) -> Option<Result<ReadResult<'_>>>
) -> Option<Result<EmbeddedWebsocketReadMessageType<'_>>>
where
E: Debug,
{
Expand All @@ -237,7 +259,7 @@ where
&'a mut self,
stream: &mut (impl Stream<Item = Result<B, E>> + Sink<&'a [u8], Error = E> + Unpin),
buffer: &'a mut [u8],
) -> Result<Option<ReadResult<'_>>>
) -> Result<Option<EmbeddedWebsocketReadMessageType<'_>>>
where
E: Debug,
{
Expand Down
23 changes: 12 additions & 11 deletions src/client/websocket/errors.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use super::async_websocket_client::EmbeddedWebsocketFramerError;
use core::fmt::Debug;
use core::str::Utf8Error;
use embedded_websocket::framer_async::FramerError;
use thiserror_no_std::Error;

#[derive(Debug, PartialEq, Eq, Error)]
pub enum WebsocketError<E: Debug> {
#[error("Stream is not connected.")]
NotConnected,

// FramerError
#[error("I/O error: {0:?}")]
Io(E),
Expand All @@ -25,16 +24,18 @@ pub enum WebsocketError<E: Debug> {
RxBufferTooSmall(usize),
}

impl<E: Debug> From<FramerError<E>> for WebsocketError<E> {
fn from(value: FramerError<E>) -> Self {
impl<E: Debug> From<EmbeddedWebsocketFramerError<E>> for WebsocketError<E> {
fn from(value: EmbeddedWebsocketFramerError<E>) -> Self {
match value {
FramerError::Io(e) => WebsocketError::Io(e),
FramerError::FrameTooLarge(e) => WebsocketError::FrameTooLarge(e),
FramerError::Utf8(e) => WebsocketError::Utf8(e),
FramerError::HttpHeader(_) => WebsocketError::HttpHeader,
FramerError::WebSocket(e) => WebsocketError::WebSocket(e),
FramerError::Disconnected => WebsocketError::Disconnected,
FramerError::RxBufferTooSmall(e) => WebsocketError::RxBufferTooSmall(e),
EmbeddedWebsocketFramerError::Io(e) => WebsocketError::Io(e),
EmbeddedWebsocketFramerError::FrameTooLarge(e) => WebsocketError::FrameTooLarge(e),
EmbeddedWebsocketFramerError::Utf8(e) => WebsocketError::Utf8(e),
EmbeddedWebsocketFramerError::HttpHeader(_) => WebsocketError::HttpHeader,
EmbeddedWebsocketFramerError::WebSocket(e) => WebsocketError::WebSocket(e),
EmbeddedWebsocketFramerError::Disconnected => WebsocketError::Disconnected,
EmbeddedWebsocketFramerError::RxBufferTooSmall(e) => {
WebsocketError::RxBufferTooSmall(e)
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ mod constants;

pub use constants::*;
use em_as_net::client::websocket::{
AsyncWebsocketClientEmbeddedWebsocketTokio, AsyncWebsocketClientTungstenite, WebsocketOpen,
WebsocketOptions,
AsyncWebsocketClientEmbeddedWebsocketTokio, AsyncWebsocketClientTungstenite,
EmbeddedWebsocketOptions, WebsocketOpen,
};
use rand::{rngs::ThreadRng, thread_rng};
use tokio::net::TcpStream;
Expand Down Expand Up @@ -33,7 +33,7 @@ pub async fn connect_to_tungstenite_wss_echo<'a>() -> AsyncWebsocketClientTungst
pub async fn connect_to_embedded_websocket_tokio_ws_echo<'a>(
stream: &'a mut Framed<TcpStream, codec::Codec>,
buffer: &'a mut [u8],
websocket_options: &'a WebsocketOptions<'a>,
websocket_options: &'a EmbeddedWebsocketOptions<'a>,
) -> AsyncWebsocketClientEmbeddedWebsocketTokio<ThreadRng, WebsocketOpen> {
let rng = thread_rng();

Expand Down
27 changes: 15 additions & 12 deletions tests/integration/clients/async_websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use crate::common::{
ECHO_WS_AS_IP_SERVER,
};

use em_as_net::client::websocket::{Message, ReadResult, WebsocketOptions};
use em_as_net::client::websocket::{
EmbeddedWebsocketOptions, EmbeddedWebsocketReadMessageType, EmbeddedWebsocketSendMessageType,
TungsteniteMessage,
};
use futures::{SinkExt, TryStreamExt};
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
Expand All @@ -13,14 +16,14 @@ use tokio_util::codec::Framed;
async fn test_websocket_non_tls() {
let mut websocket = connect_to_ws_tungstenite_echo().await;
websocket
.send(Message::Text("Hello World".to_string()))
.send(TungsteniteMessage::Text("Hello World".to_string()))
.await
.unwrap();

loop {
let message = websocket.try_next().await.unwrap().unwrap();
match message {
Message::Text(text) => {
TungsteniteMessage::Text(text) => {
assert_eq!("Hello World".to_string(), text)
}
_ => panic!("Expected 'Hello World' as text message."),
Expand All @@ -33,14 +36,14 @@ async fn test_websocket_non_tls() {
async fn test_websocket_tls() {
let mut websocket = connect_to_tungstenite_wss_echo().await;
websocket
.send(Message::Text("Hello World".to_string()))
.send(TungsteniteMessage::Text("Hello World".to_string()))
.await
.unwrap();

loop {
let message = websocket.try_next().await.unwrap().unwrap();
match message {
Message::Text(text) => {
TungsteniteMessage::Text(text) => {
assert_eq!("Hello World".to_string(), text)
}
_ => panic!("Expected 'Hello World' as text message."),
Expand All @@ -54,7 +57,7 @@ async fn test_websocket_embedded_ws_tokio() {
let stream = TcpStream::connect(ECHO_WS_AS_IP_SERVER).await.unwrap();
let mut framed = Framed::new(stream, Codec::new());
let mut buffer = [0u8; 4096];
let websocket_options = WebsocketOptions {
let websocket_options = EmbeddedWebsocketOptions {
path: "/mirror",
host: "ws.vi-server.org",
origin: "http://ws.vi-server.org:80",
Expand All @@ -68,7 +71,7 @@ async fn test_websocket_embedded_ws_tokio() {
.send(
&mut framed,
&mut buffer,
embedded_websocket::WebSocketSendMessageType::Binary,
EmbeddedWebsocketSendMessageType::Binary,
false,
b"Hello World",
)
Expand All @@ -82,16 +85,16 @@ async fn test_websocket_embedded_ws_tokio() {
.unwrap()
.unwrap();
match message {
ReadResult::Text(text) => {
EmbeddedWebsocketReadMessageType::Text(text) => {
println!("Text: {:?}", text)
}
ReadResult::Binary(msg) => {
EmbeddedWebsocketReadMessageType::Binary(msg) => {
assert_eq!(b"Hello World", msg);
break;
}
ReadResult::Pong(t) => println!("Pong: {:?}", t),
ReadResult::Ping(t) => println!("Ping: {:?}", t),
ReadResult::Close(_) => println!("Close:"),
EmbeddedWebsocketReadMessageType::Pong(t) => println!("Pong: {:?}", t),
EmbeddedWebsocketReadMessageType::Ping(t) => println!("Ping: {:?}", t),
EmbeddedWebsocketReadMessageType::Close(_) => println!("Close:"),
}
}
}

0 comments on commit 4ef657d

Please sign in to comment.