Skip to content

Commit

Permalink
Unwrap all unwrap() and expect() to actual errors. (#20)
Browse files Browse the repository at this point in the history
Fixes #4 
Also playing a little after merge this with into #19 (in my branch `all_changes`).
  • Loading branch information
BratSinot authored May 30, 2021
1 parent eb6e04c commit 8ffb424
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 25 deletions.
2 changes: 1 addition & 1 deletion examples/examples/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn main() {
.await
.unwrap();

let mut stream = client.streaming_operation(build_query()).await;
let mut stream = client.streaming_operation(build_query()).await.unwrap();
println!("Running subscription apparently?");
while let Some(item) = stream.next().await {
println!("{:?}", item);
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn main() {
.await
.unwrap();

let mut stream = client.streaming_operation(build_query()).await;
let mut stream = client.streaming_operation(build_query()).await.unwrap();
println!("Running subscription apparently?");
while let Some(item) = stream.next().await {
println!("{:?}", item);
Expand Down
65 changes: 42 additions & 23 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ pub enum Error {
/// Sending error
#[error("message sending error, reason: {0}")]
Send(String),
/// Futures spawn error
#[error("futures spawn error, reason: {0}")]
SpawnHandle(String),
/// Sender shutdown error
#[error("sender shutdown error, reason: {0}")]
SenderShutdown(String),
}

#[derive(Serialize)]
Expand Down Expand Up @@ -102,7 +108,7 @@ where
+ Send
+ 'static,
mut websocket_sink: impl Sink<WsMessage, Error = WsMessage::Error> + Unpin + Send + 'static,
runtime: impl SpawnHandle<()>,
runtime: impl SpawnHandle<Result<(), Error>>,
) -> Result<AsyncWebsocketClient<GraphqlClient, WsMessage>, Error>
where
GraphqlClient: crate::graphql::GraphqlClient + Send + 'static,
Expand Down Expand Up @@ -131,7 +137,7 @@ where
Arc::clone(&operations),
shutdown_sender,
))
.unwrap();
.map_err(|err| Error::SpawnHandle(err.to_string()))?;

let (sender_sink, sender_stream) = mpsc::channel(1);

Expand All @@ -142,7 +148,7 @@ where
Arc::clone(&operations),
shutdown_receiver,
))
.unwrap();
.map_err(|err| Error::SpawnHandle(err.to_string()))?;

Ok(AsyncWebsocketClient {
inner: Arc::new(ClientInner {
Expand Down Expand Up @@ -173,7 +179,7 @@ where
pub async fn streaming_operation<'a, Operation>(
&mut self,
op: Operation,
) -> SubscriptionStream<GraphqlClient, Operation>
) -> Result<SubscriptionStream<GraphqlClient, Operation>, Error>
where
Operation:
GraphqlOperation<GenericResponse = GraphqlClient::Response> + Unpin + Send + 'static,
Expand All @@ -187,16 +193,22 @@ where
id: id.to_string(),
payload: &op,
})
.unwrap();
.map_err(|err| Error::Decode(err.to_string()))?;

self.sender_sink.send(msg).await.unwrap();
self.sender_sink
.send(msg)
.await
.map_err(|err| Error::Send(err.to_string()))?;

let mut sender_clone = self.sender_sink.clone();
let id_clone = id.to_string();

SubscriptionStream::<GraphqlClient, Operation> {
Ok(SubscriptionStream::<GraphqlClient, Operation> {
id: id.to_string(),
stream: Box::pin(receiver.map(move |response| op.decode(response).unwrap())),
stream: Box::pin(receiver.map(move |response| {
op.decode(response)
.map_err(|err| Error::Decode(err.to_string()))
})),
cancel_func: Box::new(move || {
Box::pin(async move {
let msg: Message<()> = Message::Complete { id: id_clone };
Expand All @@ -210,7 +222,7 @@ where
})
}),
phantom: PhantomData,
}
})
}
}

Expand All @@ -221,7 +233,7 @@ where
Operation: GraphqlOperation<GenericResponse = GraphqlClient::Response>,
{
id: String,
stream: Pin<Box<dyn Stream<Item = Operation::Response> + Send>>,
stream: Pin<Box<dyn Stream<Item = Result<Operation::Response, Error>> + Send>>,
cancel_func: Box<dyn FnOnce() -> futures::future::BoxFuture<'static, Result<(), Error>> + Send>,
phantom: PhantomData<GraphqlClient>,
}
Expand All @@ -241,7 +253,7 @@ where
GraphqlClient: graphql::GraphqlClient,
Operation: GraphqlOperation<GenericResponse = GraphqlClient::Response> + Unpin,
{
type Item = Operation::Response;
type Item = Result<Operation::Response, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.as_mut().poll_next(cx)
Expand All @@ -256,7 +268,8 @@ async fn receiver_loop<S, WsMessage, GraphqlClient>(
mut receiver: S,
operations: OperationMap<GraphqlClient::Response>,
shutdown: oneshot::Sender<()>,
) where
) -> Result<(), Error>
where
S: Stream<Item = Result<WsMessage, WsMessage::Error>> + Unpin,
WsMessage: WebsocketMessage,
GraphqlClient: crate::graphql::GraphqlClient,
Expand All @@ -272,7 +285,9 @@ async fn receiver_loop<S, WsMessage, GraphqlClient>(
}
}

shutdown.send(()).expect("Couldn't shutdown sender");
shutdown
.send(())
.map_err(|_| Error::SenderShutdown("Couldn't shutdown sender".to_owned()))
}

async fn handle_message<WsMessage, GraphqlClient>(
Expand All @@ -288,10 +303,10 @@ where
)
.map_err(|err| Error::Decode(err.to_string()))?;

if event.is_none() {
return Ok(());
}
let event = event.unwrap();
let event = match event {
Some(event) => event,
None => return Ok(()),
};

let id = &Uuid::parse_str(event.id()).map_err(|err| Error::Decode(err.to_string()))?;
match event {
Expand Down Expand Up @@ -335,7 +350,8 @@ async fn sender_loop<M, S, E, GenericResponse>(
mut ws_sender: S,
operations: OperationMap<GenericResponse>,
shutdown: oneshot::Receiver<()>,
) where
) -> Result<(), Error>
where
M: WebsocketMessage,
S: Sink<M, Error = E> + Unpin,
E: std::error::Error,
Expand All @@ -350,9 +366,12 @@ async fn sender_loop<M, S, E, GenericResponse>(
msg = message_stream.next() => {
if let Some(msg) = msg {
println!("Sending message: {:?}", msg);
ws_sender.send(msg).await.unwrap();
ws_sender
.send(msg)
.await
.map_err(|err| Error::Send(err.to_string()))?;
} else {
return;
return Ok(());
}
}
_ = shutdown => {
Expand All @@ -364,7 +383,7 @@ async fn sender_loop<M, S, E, GenericResponse>(
// Clear out any operations
operations.lock().await.clear();

return;
return Ok(());
}
}
}
Expand All @@ -375,9 +394,9 @@ where
GraphqlClient: crate::graphql::GraphqlClient,
{
#[allow(dead_code)]
receiver_handle: JoinHandle<()>,
receiver_handle: JoinHandle<Result<(), Error>>,
#[allow(dead_code)]
sender_handle: JoinHandle<()>,
sender_handle: JoinHandle<Result<(), Error>>,
operations: OperationMap<GraphqlClient::Response>,
}

Expand Down

0 comments on commit 8ffb424

Please sign in to comment.