Skip to content

Commit

Permalink
Content Types (#200)
Browse files Browse the repository at this point in the history
* dumb

* fix

* Remove `OwnedStream`

* Refactoring of request types

* Replace `Stream` with internal trait

* a little cleanup

* wip

* Remove `RspcStream::Item`

* `RspcStream` -> `Body`

* rollback `Bytes` + make it work

* cleanup
  • Loading branch information
oscartbeaumont authored Aug 12, 2023
1 parent dff9616 commit 5840037
Show file tree
Hide file tree
Showing 33 changed files with 649 additions and 541 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@ stats.html

# Alpha bindings
demo.bindings.ts
demo2.bindings.ts
demo2.bindings.ts

# /_ - I use it for stashing WIP stuff
/_
24 changes: 8 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ harness = false
[features]
default = ["typescript"]
tauri = ["dep:tauri", "tokio", "tauri/wry"]
tracing = ["dep:tracing", "dep:tracing-futures"]
tracing = ["dep:tracing"]
httpz = ["dep:httpz", "httpz/cookies", "tokio", "tokio/sync"] # TODO: Remove the requirement on tokio
anyhow = ["dep:anyhow"]
tokio = ["dep:tokio", "specta/tokio"]
typescript = ["specta/typescript"] # TODO: Use this in the actual codebase
typescript = ["specta/typescript"] # TODO: Use this in the actual codebase

unstable = [] # APIs where one line of code can blow up your whole app

Expand All @@ -50,23 +50,21 @@ vercel = ["httpz", "httpz/vercel", "httpz/ws", "axum"]
[dependencies]
specta = { version = "=2.0.0-rc.1", default-features = false, features = ["serde"] }
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false }
thiserror = { version = "1.0.43", default-features = false } # TODO: Possibly remove and do Specta typesafe errors manully?
thiserror = { version = "1.0.43", default-features = false }
futures = { version = "0.3.28", default-features = false }
pin-project-lite = "0.2.10"

# Optional
serde_json = { version = "1", default-features = false }
# TODO: toml, yaml, formdata

httpz = { version = "0.0.5", default-features = false, optional = true }
tauri = { version = "1.4.1", default-features = false, optional = true }
tracing = { version = "0.1.37", default-features = false, optional = true }
tracing-futures = { version = "0.2.5", default-features = false, features = ["futures-03"], optional = true }
worker = { version = "0.0.17", default-features = false, optional = true }
anyhow = { version = "1", default-features = false, optional = true }
tokio = { version = "1", default-features = false, features = ["rt", "time"], optional = true }
streamunordered = "0.5.2"
http-body = "1.0.0-rc.2"
bytes = "1.4.0"
http-body-util = "0.1.0-rc.3" # TODO: Remove

[dev-dependencies]
# Tests
Expand All @@ -77,13 +75,7 @@ tauri = { version = "1.4.1", features = ["api-all"] }
# Benchmark
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
pprof = { version = "0.12.0", features = ["flamegraph", "criterion", "protobuf-codec", "frame-pointer"] }
rspc_legacy = { package = "rspc", version = "0.1.3"}
rspc_legacy = { package = "rspc", version = "0.1.3" }

[workspace]
members = [
"./crates/*",
"./examples",
"./examples/axum",
"./examples/vercel",
"./examples/tauri/src-tauri"
]
members = ["./crates/*", "./examples", "./examples/axum", "./examples/vercel", "./examples/tauri/src-tauri"]
1 change: 1 addition & 0 deletions crates/axum/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Axum integration for rspc
#![allow(warnings)] // TODO: Remove once stabilized

// TODO: Crate lints

Expand Down
4 changes: 2 additions & 2 deletions examples/axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ edition = "2021"
publish = false

[dependencies]
rspc = { path = "../../", features = ["axum", "tracing"] }
rspc = { path = "../../", features = ["axum", "tracing", "unstable"] }
rspc-axum = { path = "../../crates/axum" }
tokio = { version = "1.29.1", features = ["full"] }
async-stream = "0.3.5"
axum = { version = "0.6.19", features = ["ws"] }
axum = { version = "0.6.19", features = ["ws", "http2"] }
tower-http = { version = "0.4.1", default-features = false, features = ["cors"] }
futures = "0.3.28"
tracing = "0.1.37"
Expand Down
12 changes: 10 additions & 2 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::{
use futures::Stream;
use async_stream::stream;
use axum::routing::get;
use rspc::{integrations::httpz::Request, ErrorCode, ExportConfig, Rspc};
use tokio::time::sleep;
use rspc::{integrations::httpz::Request, ErrorCode, ExportConfig, Rspc, Blob};
use tokio::{time::sleep, fs::File, io::BufReader};
use tower_http::cors::{Any, CorsLayer};
use tracing::info;

Expand Down Expand Up @@ -132,6 +132,14 @@ async fn main() {
}
}),
)
// TODO: This is an unstable feature and should be used with caution!
.procedure("serveFile", R.query(|_, _: ()| async move {
let file = File::open("./demo.json").await.unwrap();

// TODO: What if type which is `futures::Stream` + `tokio::AsyncRead`???

Blob(BufReader::new(file))
}))
.build()
.unwrap()
.arced(); // This function is a shortcut to wrap the router in an `Arc`.
Expand Down
1 change: 1 addition & 0 deletions examples/bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type Procedures = {
{ key: "X-Demo-Header", input: never, result: string } |
{ key: "echo", input: string, result: string } |
{ key: "error", input: never, result: string } |
{ key: "serveFile", input: never, result: null } |
{ key: "transformMe", input: never, result: string } |
{ key: "version", input: never, result: string },
mutations:
Expand Down
2 changes: 2 additions & 0 deletions src/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// TODO
pub struct Blob<T>(pub T);
2 changes: 1 addition & 1 deletion src/compiled_router.rs → src/built_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use specta::{
};

use crate::{
internal::{ProcedureStore, ProcedureTodo},
internal::procedure::{ProcedureStore, ProcedureTodo},
ExportError,
};

Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl From<ExecError> for ResponseError {
}

#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum ExportError {
#[error("IO error exporting bindings: {0}")]
IOErr(#[from] std::io::Error),
Expand Down Expand Up @@ -175,6 +176,7 @@ impl Error {

/// TODO
#[derive(Debug, Clone, Copy, Serialize, Type, PartialEq, Eq)]
#[non_exhaustive]
pub enum ErrorCode {
BadRequest,
Unauthorized,
Expand Down
73 changes: 73 additions & 0 deletions src/internal/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use futures::ready;
use pin_project_lite::pin_project;
use serde_json::Value;

use crate::ExecError;

/// The resulting body from an rspc operation.
///
/// This can mean different things in different contexts.
/// For a query or mutation each frame is a part of the resulting single "message". Eg. part of the json, or part of a file.
/// For a subscription each frame is a discrete websocket message. Eg. the json for a single procedure's result
///
#[must_use = "`Body` do nothing unless polled"]
pub trait Body {
// TODO: Return `bytes::Bytes` instead
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Value, ExecError>>>;

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

// This type was taken from futures_util so all credit to it's original authors!
pin_project! {
/// A stream which emits single element and then EOF.
#[must_use = "streams do nothing unless polled"]
pub(crate) struct Once<Fut> {
#[pin]
future: Option<Fut>
}
}

impl<Fut> Once<Fut> {
pub fn new(future: Fut) -> Self {
Self {
future: Some(future),
}
}
}

impl<Fut: Future<Output = Result<Value, ExecError>>> Body for Once<Fut> {
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Value, ExecError>>> {
let mut this = self.project();
let v = match this.future.as_mut().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Ready(None),
};

this.future.set(None);
Poll::Ready(Some(v))
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.future.is_some() {
(1, Some(1))
} else {
(0, Some(0))
}
}
}
18 changes: 9 additions & 9 deletions src/internal/exec/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use serde_json::Value;
use streamunordered::{StreamUnordered, StreamYield};

use super::{
AsyncRuntime, Executor, IncomingMessage, OwnedStream, Request, Response, StreamOrFut,
SubscriptionManager, SubscriptionSet,
AsyncRuntime, Executor, IncomingMessage, Request, Response, RspcTask, SubscriptionManager,
SubscriptionSet,
};
use crate::internal::{
exec::{self, ResponseInner},
Expand Down Expand Up @@ -42,15 +42,15 @@ enum PollResult {
struct ConnectionSubscriptionManager<'a, TCtx> {
pub map: &'a mut SubscriptionSet,
pub to_abort: Option<Vec<u32>>,
pub queued: Option<Vec<OwnedStream<TCtx>>>,
pub queued: Option<Vec<RspcTask<TCtx>>>,
}

impl<'a, TCtx: Clone + Send + 'static> SubscriptionManager<TCtx>
for ConnectionSubscriptionManager<'a, TCtx>
{
type Set<'m> = &'m mut SubscriptionSet where Self: 'm;

fn queue(&mut self, stream: OwnedStream<TCtx>) {
fn queue(&mut self, stream: RspcTask<TCtx>) {
match &mut self.queued {
Some(queued) => {
queued.push(stream);
Expand Down Expand Up @@ -104,7 +104,7 @@ pin_project! {
executor: Executor<TCtx>,
map: SubscriptionSet,
#[pin]
streams: StreamUnordered<StreamOrFut<TCtx>>,
streams: StreamUnordered<RspcTask<TCtx>>,

// TODO: Remove these cause disgusting messes
sub_id_to_stream: HashMap<u32, usize>,
Expand All @@ -125,8 +125,8 @@ where
let resps = self
.executor
.execute_batch(&self.ctx, reqs, &mut manager, |fut| {
let fut_id = fut.id;
let token = self.streams.insert(StreamOrFut::Future { fut });
let fut_id = fut.id();
let token = self.streams.insert(fut.into());
self.sub_id_to_stream.insert(fut_id, token);
});

Expand All @@ -142,8 +142,8 @@ where

if let Some(queued) = manager.queued {
for stream in queued {
let sub_id = stream.id;
let token = self.streams.insert(StreamOrFut::Stream { stream });
let sub_id = stream.id();
let token = self.streams.insert(stream);
self.sub_id_to_stream.insert(sub_id, token);
}
}
Expand Down
Loading

0 comments on commit 5840037

Please sign in to comment.