Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First pass at writing the library #1

Merged
merged 8 commits into from
Feb 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,17 @@ edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["async-tungstenite", "cynic"]

[dependencies]
async_executors = { version = "0.4" }
futures = { version = "0.3"}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
uuid = { version = "0.8", features = ["v4"] }

async-tungstenite = { version = "0.13", optional = true }

cynic = { git = "https://github.com/obmarg/cynic.git", optional = true }
274 changes: 274 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
use std::{collections::HashMap, marker::PhantomData, sync::Arc};

use async_executors::{JoinHandle, SpawnHandle, SpawnHandleExt};
use futures::{
channel::{mpsc, oneshot},
lock::Mutex,
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
};
use uuid::Uuid;

use super::{
graphql::{self, GraphqlOperation},
protocol::{ConnectionAck, ConnectionInit, Event, Message},
websockets::WebsocketMessage,
};

const SUBSCRIPTION_BUFFER_SIZE: usize = 5;

/// A websocket client
pub struct AsyncWebsocketClient<GraphqlClient, WsMessage>
where
GraphqlClient: graphql::GraphqlClient,
{
inner: Arc<ClientInner<GraphqlClient>>,
sender_sink: mpsc::Sender<WsMessage>,
phantom: PhantomData<*const GraphqlClient>,
}

#[derive(thiserror::Error, Debug)]
#[error("Something went wrong")]
pub struct Error {}

impl<GraphqlClient, WsMessage> AsyncWebsocketClient<GraphqlClient, WsMessage>
where
WsMessage: WebsocketMessage + Send + 'static,
GraphqlClient: crate::graphql::GraphqlClient + 'static,
{
/// Constructs an AsyncWebsocketClient
///
/// Accepts a stream and a sink for the underlying websocket connection,
/// and an `async_executors::SpawnHandle` that tells the client which
/// async runtime to use.
pub async fn new(
mut websocket_stream: impl Stream<Item = Result<WsMessage, WsMessage::Error>>
+ Unpin
+ Send
+ 'static,
mut websocket_sink: impl Sink<WsMessage, Error = WsMessage::Error> + Unpin + Send + 'static,
runtime: impl SpawnHandle<()>,
) -> Result<Self, Error> {
websocket_sink
.send(json_message(ConnectionInit::new()).unwrap())
.await
.map_err(|_| Error {})?;

match websocket_stream.next().await {
None => todo!(),
Some(Err(_)) => todo!(),
Some(Ok(data)) => {
decode_message::<ConnectionAck<()>, _>(data).unwrap();
}
}

let (shutdown_sender, shutdown_receiver) = oneshot::channel();

let operations = Arc::new(Mutex::new(HashMap::new()));

let receiver_handle = runtime
.spawn_handle(receiver_loop::<_, _, GraphqlClient>(
websocket_stream,
Arc::clone(&operations),
shutdown_sender,
))
.unwrap();

let (sender_sink, sender_stream) = mpsc::channel(1);

let sender_handle = runtime
.spawn_handle(sender_loop(
sender_stream,
websocket_sink,
Arc::clone(&operations),
shutdown_receiver,
))
.unwrap();

Ok(AsyncWebsocketClient {
inner: Arc::new(ClientInner {
receiver_handle,
operations,
sender_handle,
}),
sender_sink,
phantom: PhantomData,
})
}

/*
pub async fn operation<'a, T: 'a>(&self, _op: Operation<'a, T>) -> Result<(), ()> {
todo!()
// Probably hook into streaming operation and do a take 1 -> into_future
}*/
obmarg marked this conversation as resolved.
Show resolved Hide resolved

/// Starts a streaming operation on this client.
///
/// Returns a `Stream` of responses.
pub async fn streaming_operation<Operation>(
&mut self,
op: Operation,
) -> impl Stream<Item = Operation::Response>
where
Operation: GraphqlOperation<GenericResponse = GraphqlClient::Response>,
{
let id = Uuid::new_v4();
let (sender, receiver) = mpsc::channel(SUBSCRIPTION_BUFFER_SIZE);

self.inner.operations.lock().await.insert(id, sender);

let msg = json_message(Message::Subscribe {
id: id.to_string(),
payload: &op,
})
.unwrap();

self.sender_sink.send(msg).await.unwrap();

receiver.map(move |response| op.decode(response).unwrap())
}
}

type OperationSender<GenericResponse> = mpsc::Sender<GenericResponse>;

type OperationMap<GenericResponse> = Arc<Mutex<HashMap<Uuid, OperationSender<GenericResponse>>>>;

async fn receiver_loop<S, WsMessage, GraphqlClient>(
mut receiver: S,
operations: OperationMap<GraphqlClient::Response>,
shutdown: oneshot::Sender<()>,
) where
S: Stream<Item = Result<WsMessage, WsMessage::Error>> + Unpin,
WsMessage: WebsocketMessage,
GraphqlClient: crate::graphql::GraphqlClient,
{
while let Some(msg) = receiver.next().await {
println!("Received message: {:?}", msg);
if handle_message::<WsMessage, GraphqlClient>(msg, &operations)
.await
.is_err()
{
println!("Error happened, killing things");
break;
}
}

shutdown.send(()).expect("Couldn't shutdown sender");
}

type BoxError = Box<dyn std::error::Error>;

async fn handle_message<WsMessage, GraphqlClient>(
msg: Result<WsMessage, WsMessage::Error>,
operations: &OperationMap<GraphqlClient::Response>,
) -> Result<(), BoxError>
where
WsMessage: WebsocketMessage,
GraphqlClient: crate::graphql::GraphqlClient,
{
let event = decode_message::<Event<GraphqlClient::Response>, WsMessage>(msg?)?;

if event.is_none() {
return Ok(());
}
let event = event.unwrap();

let id = &Uuid::parse_str(event.id())?;
match event {
Event::Next { payload, .. } => {
let mut sink = operations
.lock()
.await
.get(&id)
.ok_or("Received message for unknown subscription")?
.clone();

sink.send(payload).await?
}
Event::Complete { .. } => {
println!("Stream complete");
operations.lock().await.remove(&id);
}
Event::Error { payload, .. } => {
let mut sink = operations
.lock()
.await
.remove(&id)
.ok_or("Received error for unknown subscription")?;

sink.send(GraphqlClient::error_response(payload)?).await?;
}
}

Ok(())
}

async fn sender_loop<M, S, E, GenericResponse>(
message_stream: mpsc::Receiver<M>,
mut ws_sender: S,
operations: OperationMap<GenericResponse>,
shutdown: oneshot::Receiver<()>,
) where
M: WebsocketMessage,
S: Sink<M, Error = E> + Unpin,
E: std::error::Error,
{
use futures::{future::FutureExt, select};

let mut message_stream = message_stream.fuse();
let mut shutdown = shutdown.fuse();

loop {
select! {
msg = message_stream.next() => {
if let Some(msg) = msg {
println!("Sending message: {:?}", msg);
ws_sender.send(msg).await.unwrap();
} else {
return;
}
}
_ = shutdown => {
// Shutdown the incoming message stream
let mut message_stream = message_stream.into_inner();
message_stream.close();
while message_stream.next().await.is_some() {}

// Clear out any operations
operations.lock().await.clear();

return;
}
}
}
}

struct ClientInner<GraphqlClient>
where
GraphqlClient: crate::graphql::GraphqlClient,
{
#[allow(dead_code)]
receiver_handle: JoinHandle<()>,
#[allow(dead_code)]
sender_handle: JoinHandle<()>,
operations: OperationMap<GraphqlClient::Response>,
}

fn json_message<M: WebsocketMessage>(payload: impl serde::Serialize) -> Result<M, BoxError> {
Ok(M::new(serde_json::to_string(&payload)?))
}

fn decode_message<T: serde::de::DeserializeOwned, WsMessage: WebsocketMessage>(
message: WsMessage,
) -> Result<Option<T>, BoxError> {
if message.is_ping() || message.is_pong() {
Ok(None)
} else if message.is_close() {
todo!()
} else if let Some(s) = message.text() {
println!("Received {}", s);
Ok(Some(serde_json::from_str::<T>(&s)?))
} else {
Ok(None)
}
}
88 changes: 88 additions & 0 deletions src/graphql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//! This module contains traits that abstract over GraphQL implementations,
//! allowing this library to be used with different GraphQL client libraries.
//!
//! Support is provided for [`cynic`][cynic], but other client libraries can
//! be added by implementing these traits for those libraries.
//!
//! [cynic]: https://cynic-rs.dev

/// An abstraction over GraphQL operations.
pub trait GraphqlOperation: serde::Serialize {
/// The "generic" response type. A graphql-ws-client supports running multiple
/// operations at a time. This GenericResponse is what will intially be decoded -
/// with the `decode` function converting this into the actual operation response.
///
/// This type needs to match up with `GraphqlClient::Response` below.
type GenericResponse;

/// The actual response & error type of this operation.
type Response;

/// The error that will be returned from failed attempts to decode a `Response`.
type Error: std::error::Error;

/// Decodes a `GenericResponse` into the actual response that will be returned
/// to users for this operation.
fn decode(&self, data: Self::GenericResponse) -> Result<Self::Response, Self::Error>;
}

/// A trait for GraphQL clients.
pub trait GraphqlClient {
/// The generic response type for this GraphqlClient implementation
///
/// Our client will decode this, then pass it to a `GraphqlOperation` for decoding
/// to the specific response type of the GraphqlOperation.
type Response: serde::de::DeserializeOwned + Send;

/// The error that will be returned from failed attempts to decode a `Response`.
type DecodeError: std::error::Error + Send + 'static;

/// Decodes some error JSON into a `Response`
fn error_response(errors: Vec<serde_json::Value>) -> Result<Self::Response, Self::DecodeError>;
}

#[cfg(feature = "cynic")]
pub use self::cynic::Cynic;

#[cfg(feature = "cynic")]
mod cynic {
use super::*;

/// Provides an implementation of [GraphqlClient] for the cynic GraphQL crate
pub struct Cynic {}

impl GraphqlClient for Cynic {
type Response = ::cynic::GraphQLResponse<serde_json::Value>;

type DecodeError = serde_json::Error;

fn error_response(
errors: Vec<serde_json::Value>,
) -> Result<Self::Response, Self::DecodeError> {
Ok(::cynic::GraphQLResponse {
data: None,
errors: Some(
errors
.into_iter()
.map(serde_json::from_value)
.collect::<Result<Vec<_>, _>>()?,
),
})
}
}

impl<'a, ResponseData> GraphqlOperation for ::cynic::StreamingOperation<'a, ResponseData>
where
ResponseData: 'a,
{
type GenericResponse = ::cynic::GraphQLResponse<serde_json::Value>;

type Response = ::cynic::GraphQLResponse<ResponseData>;

type Error = ::cynic::DecodeError;

fn decode(&self, data: Self::GenericResponse) -> Result<Self::Response, Self::Error> {
self.decode_response(data)
}
}
}
Loading