Skip to content

Commit

Permalink
Brendan idea (#231)
Browse files Browse the repository at this point in the history
* upgrade workspace

* Drop core crate

* Drop `Blob`

* Good luck, you'll need it

* use tauri async runtime

* RouterBuilder & Router

* router > executor

* fn execute() cleanup

* run_connection > ConnectionTask

* batcher stream

* batch stream + RequestFuture -> Task

* cleanup

* Move Tauri and httpz integration out of the core

* actually drive streams

* unify connection + task shutdowns

* cleanup httpz

* SinkAndStream

* more unfold

* Update Cargo.toml

* cleanup

* a

* add rspc-core crate

* use futures std instead of alloc

---------

Co-authored-by: Brendan Allan <[email protected]>
  • Loading branch information
oscartbeaumont and Brendonovich authored Oct 9, 2023
1 parent ec02628 commit 7632944
Show file tree
Hide file tree
Showing 79 changed files with 3,388 additions and 3,351 deletions.
43 changes: 14 additions & 29 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,64 +27,49 @@ harness = false

[features]
default = ["typescript"]
tauri = ["dep:tauri", "tokio", "tauri/wry", "tauri-specta"]
typescript = ["specta/typescript"]
tracing = ["dep:tracing"]
httpz = ["dep:httpz", "httpz/cookies", "tokio", "tokio/sync"] # TODO: Remove the requirement on tokio
# anyhow = ["dep:anyhow"]
tokio = ["dep:tokio", "specta/tokio"]
typescript = ["specta/typescript", "tauri-specta/typescript"] # TODO: Use this in the actual codebase

unstable = [] # APIs where one line of code can blow up your whole app

# Webservers
axum = ["httpz", "httpz/axum", "httpz/tokio-ws", "httpz/axum"]
# actix-web = ["httpz/actix-web"]
# poem = ["httpz/poem"]
# rocket = ["httpz/rocket"]
# warp = ["httpz/warp"]
# TODO: Following ones are exposed but not officially supported
lambda = ["httpz", "httpz/lambda", "httpz/ws", "httpz/axum"]
workers = ["httpz", "httpz/workers", "httpz/ws"]
vercel = ["httpz", "httpz/vercel", "httpz/ws", "axum"]

[dependencies]
specta = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
futures = { version = "0.3.28", default-features = false }
pin-project-lite = "0.2.10"
futures = { version = "0.3.28", default-features = false, features = ["std", "async-await"] } # TODO: Drop for `futures_core` if possible
pin-project-lite = "0.2.13"
streamunordered = "0.5.3"

#### TODO: Can all the following deps be moved out of the core crate ###

# Optional
serde_json = { version = "1", default-features = false }
# TODO: toml, yaml, formdata

httpz = { version = "0.0.5", default-features = false, optional = true }
tauri = { version = "1.4.1", default-features = false, optional = true }
tauri-specta = { git = "https://github.com/oscartbeaumont/tauri-specta", rev = "1073c8da4e4fccbeebfb389b2f093ac711cd35df", default-features = false, optional = true }
tracing = { version = "0.1.37", default-features = false, optional = true }
worker = { version = "0.0.17", default-features = false, optional = true }
# anyhow = { version = "1", default-features = false, optional = true } # TODO: Should we bring this back with the new typed error handling?
tokio = { version = "1", default-features = false, features = ["rt", "time"], optional = true }
streamunordered = "0.5.2"

# TODO: Does this negatively affect compile times? Should it be flipped?
# # Even though this `cfg` can never be enabled, it still forces cargo to keep `serde_derive` in lockstep with `serde`.
# [target.'cfg(any())'.dependencies]
# rspc_httpz = { version = "=1.0.185", path = "../serde_derive" }

[dev-dependencies]
# Tests
async-stream = "0.3.5"
tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
tauri = { version = "1.4.1", features = ["api-all"] }

# Benchmark
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
pprof = { version = "0.12.0", features = ["flamegraph", "criterion", "protobuf-codec", "frame-pointer"] }
# rspc_legacy = { package = "rspc", version = "0.1.3" }
pprof = { version = "0.13.0", features = ["flamegraph", "criterion", "protobuf-codec", "frame-pointer"] }

[workspace]
members = ["./crates/*", "./examples", "./examples/axum", "./examples/vercel", "./examples/tauri/src-tauri"]

[workspace.dependencies]
specta = { version = "=2.0.0-rc.1", default-features = false, features = ["serde", "serde_json"] }
serde = { version = "1", default-features = false, features = ["derive"] }
thiserror = { version = "1.0.43", default-features = false }
thiserror = { version = "1.0.49", default-features = false }

[patch.crates-io]
specta = { git = "https://github.com/oscartbeaumont/specta", rev = "5948d80f2551780eda2c7bf38450fc796c74cfbf" }
2 changes: 1 addition & 1 deletion benches/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn benchmark_main(e: &Executor<()>) {
},
&mut (None as Option<NoOpSubscriptionManager>),
) {
ExecutorResult::FutureResponse(fut) => fut.await,
ExecutorResult::Future(fut) => fut.await,
ExecutorResult::Response(resp) => resp,
ExecutorResult::None => unreachable!(),
};
Expand Down
10 changes: 0 additions & 10 deletions crates/axum/Cargo.toml

This file was deleted.

21 changes: 0 additions & 21 deletions crates/axum/src/lib.rs

This file was deleted.

3 changes: 1 addition & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
[package]
name = "rspc-core"
version = "0.0.1"
version = "1.0.0-rc.5"
edition = "2021"
publish = false

[dependencies]
3 changes: 3 additions & 0 deletions crates/core/src/internal/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! rspc core internals
//!
//! Anything in the module does *NOT* follow semantic versioning and may change at any time.
6 changes: 5 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
//! TODO
//! Core traits and types for [rspc](https://docs.rs/rspc)
// TODO: move over lints

#[doc(hidden)]
pub mod internal;
12 changes: 6 additions & 6 deletions crates/create-rspc-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ requestty = "0.5.0"
strum = { version = "0.25.0", features = ["derive"] }
rustc_version = "0.4.0"
ureq = { version = "2.7.1", features = ["json"] }
serde_json = "1.0.103"
ctrlc = "3.4.0"
thiserror = "1.0.43"
serde_json = "1.0.107"
ctrlc = "3.4.1"
thiserror = "1.0.49"
walkdir = "2"

[dev-dependencies]
tempfile = "3.6.0"
cargo = "0.71.0"
tokio = { version = "1.29.1", features = ["full", "process"] }
tempfile = "3.8.0"
cargo = "0.73.1"
tokio = { version = "1.32.0", features = ["full", "process"] }
futures = "0.3.28"
26 changes: 26 additions & 0 deletions crates/httpz/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "rspc-httpz"
version = "1.0.0-rc.5"
edition = "2021"

[features]
# Webservers
axum = ["httpz/axum", "httpz/tokio-ws", "httpz/axum"]
# actix-web = ["httpz/actix-web"]
# poem = ["httpz/poem"]
# rocket = ["httpz/rocket"]
# warp = ["httpz/warp"]
# TODO: Following ones are exposed but not officially supported
lambda = ["httpz/lambda", "httpz/ws", "httpz/axum"]
workers = ["httpz/workers", "httpz/ws"]
vercel = ["httpz/vercel", "httpz/ws", "axum"] # TODO: Shouldn't rely on Axum

[dependencies]
rspc = { version = "1.0.0-rc.5", path = "../../", default-features = false, features = ["tokio"] } # TODO: Shouldn't rely on Tokio
httpz = { version = "0.0.5", default-features = false, features = ["cookies"] }
tokio = { version = "1", default-features = false, features = ["sync"] }
serde_json = "1.0.107"
futures = "0.3.28"

# TODO: Remove following
worker = { version = "0.0.17", default-features = false, optional = true } # TODO: update this package once httpz is updated
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The future of this code in unsure. It will probs be removed or refactored once we support more than just Axum because all of the feature gating is bad.

use super::{CookieJar, Request};
use crate::ExecError;
use rspc::ExecError;

use std::marker::PhantomData;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use std::{
sync::{Arc, Mutex},
};

use crate::{
internal::exec::{self, Executor, ExecutorResult, NoOpSubscriptionManager},
BuiltRouter,
use rspc::{
internal::exec::{self, Connection, ExecutorResult},
Router,
};

use super::{handle_websocket, CookieJar, TCtxFunc};
Expand All @@ -21,55 +21,45 @@ use super::{handle_websocket, CookieJar, TCtxFunc};
// TODO: Remove all panics lol
// TODO: Cleanup the code and use more chaining

impl<TCtx> BuiltRouter<TCtx>
pub fn endpoint<TCtx, TCtxFnMarker: Send + Sync + 'static, TCtxFn: TCtxFunc<TCtx, TCtxFnMarker>>(
router: Arc<Router<TCtx>>,
ctx_fn: TCtxFn,
) -> Endpoint<impl HttpEndpoint>
where
TCtx: Clone + Send + Sync + 'static,
{
pub fn endpoint<TCtxFnMarker: Send + Sync + 'static, TCtxFn: TCtxFunc<TCtx, TCtxFnMarker>>(
self: Arc<Self>,
ctx_fn: TCtxFn,
) -> Endpoint<impl HttpEndpoint> {
let executor = Executor::new(self);

// TODO: This should be able to call `ctn_fn` prior to the async boundary to avoid cloning it!
// TODO: Basically httpz would need to be able to return `Response | Future<Response>` basically how rspc executor works.

GenericEndpoint::new(
"/:id", // TODO: I think this is Axum specific. Fix in `httpz`!
[Method::GET, Method::POST],
move |req: httpz::Request| {
// TODO: It would be nice if these clones weren't per request.
// TODO: Maybe httpz can `Box::leak` a ref to a context type and allow it to be shared.
let executor = executor.clone();
let ctx_fn = ctx_fn.clone();

async move {
match (req.method(), &req.uri().path()[1..]) {
(&Method::GET, "ws") => {
handle_websocket(executor, ctx_fn, req).into_response()
}
(&Method::GET, _) => {
handle_http(executor, ctx_fn, req).await.into_response()
}
(&Method::POST, "_batch") => handle_http_batch(executor, ctx_fn, req)
.await
.into_response(),
(&Method::POST, _) => {
handle_http(executor, ctx_fn, req).await.into_response()
}
_ => Ok(Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(vec![])?),
// TODO: This should be able to call `ctn_fn` prior to the async boundary to avoid cloning it!
// TODO: Basically httpz would need to be able to return `Response | Future<Response>` basically how rspc executor works.

GenericEndpoint::new(
"/:id", // TODO: I think this is Axum specific. Fix in `httpz`!
[Method::GET, Method::POST],
move |req: httpz::Request| {
// TODO: It would be nice if these clones weren't per request.
// TODO: Maybe httpz can `Box::leak` a ref to a context type and allow it to be shared.
let router = router.clone();
let ctx_fn = ctx_fn.clone();

async move {
match (req.method(), &req.uri().path()[1..]) {
(&Method::GET, "ws") => handle_websocket(router, ctx_fn, req).into_response(),
(&Method::GET, _) => handle_http(router, ctx_fn, req).await.into_response(),
(&Method::POST, "_batch") => {
handle_http_batch(router, ctx_fn, req).await.into_response()
}
(&Method::POST, _) => handle_http(router, ctx_fn, req).await.into_response(),
_ => Ok(Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(vec![])?),
}
},
)
}
}
},
)
}

#[allow(clippy::unwrap_used)] // TODO: Remove all panics lol
async fn handle_http<TCtx, TCtxFn, TCtxFnMarker>(
executor: Executor<TCtx>,
router: Arc<Router<TCtx>>,
ctx_fn: TCtxFn,
req: httpz::Request,
) -> impl HttpResponse
Expand Down Expand Up @@ -99,15 +89,15 @@ where
.unwrap_or(Ok(None as Option<Value>))
.unwrap();

exec::Request::Query { id: 0, path, input }
exec::Request::Query(exec::RequestData { id: 0, path, input })
}
Method::POST => {
let input = (!req.body().is_empty())
.then(|| serde_json::from_slice(req.body()))
.unwrap_or(Ok(None))
.unwrap();

exec::Request::Mutation { id: 0, path, input }
exec::Request::Mutation(exec::RequestData { id: 0, path, input })
}
_ => unreachable!(),
};
Expand All @@ -133,10 +123,13 @@ where
};

let response =
match executor.execute(ctx, request, &mut (None as Option<NoOpSubscriptionManager>)) {
ExecutorResult::FutureResponse(fut) => fut.await,
ExecutorResult::Response(response) => response,
ExecutorResult::None => unreachable!(
match router.execute(ctx, request, None) {
Some(res) => match res {
ExecutorResult::Future(fut) => fut.await,
ExecutorResult::Response(response) => response,
ExecutorResult::Task(task) => todo!(),
},
None => unreachable!(
"Executor will only return none for a 'stopSubscription' event which is impossible here"
),
}.inner;
Expand Down Expand Up @@ -176,7 +169,7 @@ where

#[allow(clippy::unwrap_used)] // TODO: Remove this
async fn handle_http_batch<TCtx, TCtxFn, TCtxFnMarker>(
executor: Executor<TCtx>,
router: Arc<Router<TCtx>>,
ctx_fn: TCtxFn,
req: httpz::Request,
) -> impl HttpResponse
Expand Down Expand Up @@ -208,12 +201,23 @@ where
};

let fut_responses = FuturesUnordered::new();
let mut responses = executor.execute_batch(
&ctx,
requests,
&mut (None as Option<NoOpSubscriptionManager>),
|fut| fut_responses.push(fut),
);

let mut responses = Vec::with_capacity(requests.len());
for req in requests {
let Some(res) = router.clone().execute(ctx.clone(), req, None) else {
continue;
};

match res {
ExecutorResult::Future(fut) => {
fut_responses.push(fut);
}
ExecutorResult::Response(resp) => {
responses.push(resp);
}
ExecutorResult::Task(task) => todo!(),
}
}

let cookies = {
match Arc::try_unwrap(cookie_jar) {
Expand Down
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 7632944

Please sign in to comment.