diff --git a/crates/core/src/body/map.rs b/crates/core/src/body/map.rs deleted file mode 100644 index 6ab80bee..00000000 --- a/crates/core/src/body/map.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::ready; -use pin_project_lite::pin_project; - -pin_project! { - /// A function for mapping a body into a future. - #[must_use = "streams do nothing unless polled"] - pub(crate) struct Map { - #[pin] - future: Option, - func: fn(Fut::Output) -> Result, - } -} - -impl Map { - pub fn new(future: Fut, func: fn(Fut::Output) -> Result) -> Self { - Self { - future: Some(future), - func, - } - } -} - -impl Future for Map { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - let v = match this.future.as_mut().as_pin_mut() { - Some(fut) => ready!(fut.poll(cx)), - None => panic!("`Map` polled after completion"), - }; - - this.future.set(None); - Poll::Ready((this.func)(v)) - } -} diff --git a/crates/core/src/body/mod.rs b/crates/core/src/body/mod.rs index b96d07f7..87a1ddb8 100644 --- a/crates/core/src/body/mod.rs +++ b/crates/core/src/body/mod.rs @@ -1,7 +1,5 @@ mod body; -mod map; mod once; pub use body::*; -pub(crate) use map::*; pub(crate) use once::*; diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index fedfb3ed..71b5034d 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -15,7 +15,8 @@ pub trait IntoResolverError: Serialize + Type + std::error::Error { Self: Sized, { ResolverError { - value: serde_json::to_value(&self).unwrap_or_default(), + // TODO: Error handling + value: serde_json::to_value(&self).unwrap(), message: self.to_string(), } } @@ -123,8 +124,18 @@ impl From for ProcedureError { cause: None, }, ExecError::Resolver(err) => return err.into(), - ExecError::ErrSubscriptionNotFound => todo!(), - ExecError::ErrSubscriptionAlreadyClosed => todo!(), + // ExecError::ErrSubscriptionNotFound => Error { + // code: ErrorCode::InternalServerError, + // message: "error a procedure returned an empty stream".into(), + // cause: None, + // }, + // ExecError::ErrSubscriptionAlreadyClosed => Error { + // code: ErrorCode::InternalServerError, + // message: "error subscription was already closed".into(), + // cause: None, + // }, + // TODO: Sort out this panic + _ => todo!(), }) } } diff --git a/crates/core/src/exec/arc_ref.rs b/crates/core/src/exec/arc_ref.rs index 03dba7db..dd02e54f 100644 --- a/crates/core/src/exec/arc_ref.rs +++ b/crates/core/src/exec/arc_ref.rs @@ -16,7 +16,6 @@ use serde_json::Value; use crate::{ body::Body, middleware::{ProcedureKind, RequestContext}, - procedure_store::ProcedureTodo, router_builder::ProcedureMap, Router, }; diff --git a/crates/core/src/exec/connection.rs b/crates/core/src/exec/connection.rs index 04b63f3e..6964da9c 100644 --- a/crates/core/src/exec/connection.rs +++ b/crates/core/src/exec/connection.rs @@ -1,10 +1,7 @@ use std::{ collections::HashMap, - future::Future, - marker::PhantomData, pin::{pin, Pin}, sync::Arc, - task::{Context, Poll}, time::{Duration, Instant}, }; @@ -13,27 +10,22 @@ use futures::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, oneshot, }, - future::{Either, OptionFuture}, - pin_mut, ready, - stream::{self, Fuse, FusedStream, FuturesUnordered}, + future::OptionFuture, + pin_mut, + stream::{self, FusedStream, FuturesUnordered}, FutureExt, Sink, SinkExt, Stream, StreamExt, }; -use pin_project_lite::pin_project; -use serde_json::Value; use streamunordered::{StreamUnordered, StreamYield}; -use super::{ExecutorResult, IncomingMessage, Request, Requests, Response, Task}; -use crate::{ - exec, - util::{PinnedOption, PinnedOptionProj}, - AsyncRuntime, Router, -}; +use super::{ExecutorResult, IncomingMessage, Request, Response, Task}; +use crate::{exec, AsyncRuntime, Router}; // Time to wait for more messages before sending them over the websocket connection. // This batch is mostly designed to reduce the impact of duplicate subscriptions a bit // as sending them together should help us utilise transport layer compression. const BATCH_TIMEOUT: Duration = Duration::from_millis(5); +// TODO: I don't like this pub(crate) struct TaskShutdown { stream_id: usize, tx: oneshot::Sender, @@ -106,6 +98,7 @@ fn batch_unbounded( 'batch: loop { let timer = R::sleep_util(Instant::now() + BATCH_TIMEOUT).fuse(); + #[allow(clippy::never_loop)] 'timer: loop { pin_mut!(timer); diff --git a/crates/core/src/exec/execute.rs b/crates/core/src/exec/execute.rs index a4794b54..4561144d 100644 --- a/crates/core/src/exec/execute.rs +++ b/crates/core/src/exec/execute.rs @@ -1,19 +1,4 @@ -use std::{ - borrow::Cow, - collections::{HashMap, HashSet}, - convert::Infallible, - fmt, - future::{Future, Ready}, - marker::PhantomData, - ops::{Deref, DerefMut}, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll, Waker}, -}; - -use futures::{channel::oneshot, stream::FuturesUnordered, Stream, StreamExt}; - -use serde_json::Value; +use std::{pin::Pin, sync::Arc}; use crate::{ body::Body, @@ -23,14 +8,10 @@ use crate::{ request_future::RequestFuture, Request, Response, ResponseInner, Task, }, - layer::FutureValueOrStream, - middleware::{ProcedureKind, RequestContext}, - procedure_store::ProcedureTodo, - router_builder::ProcedureMap, Router, }; -use super::{task, Connection, RequestData}; +use super::{task, Connection}; /// TODO /// @@ -56,6 +37,7 @@ impl Router { self: Arc, ctx: TCtx, req: Request, + // TODO: Can the executor be decoupled from the connection??? conn: Option<&mut Connection>, ) -> Option { // TODO diff --git a/crates/core/src/exec/request_future.rs b/crates/core/src/exec/request_future.rs index 44939478..a72e1622 100644 --- a/crates/core/src/exec/request_future.rs +++ b/crates/core/src/exec/request_future.rs @@ -2,21 +2,13 @@ use std::{ fmt, future::Future, pin::Pin, - sync::Arc, task::{Context, Poll}, }; -use futures::{ready, Stream}; -use pin_project_lite::pin_project; -use serde_json::Value; - use crate::{ body::Body, error::ExecError, - exec::{self, Response, ResponseInner}, - middleware::RequestContext, - util::{PinnedOption, PinnedOptionProj}, - Router, + exec::{Response, ResponseInner}, }; use super::arc_ref::ArcRef; diff --git a/crates/core/src/exec/task.rs b/crates/core/src/exec/task.rs index 6f7c2067..cc81e1f8 100644 --- a/crates/core/src/exec/task.rs +++ b/crates/core/src/exec/task.rs @@ -41,6 +41,7 @@ impl Stream for Task { cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { match &self.status { + #[allow(clippy::panic)] Status::DoNotPoll => { #[cfg(debug_assertions)] panic!("`StreamWrapper` polled after completion") diff --git a/crates/core/src/middleware/mw_ctx.rs b/crates/core/src/middleware/mw_ctx.rs index 35f8779d..0994d0f7 100644 --- a/crates/core/src/middleware/mw_ctx.rs +++ b/crates/core/src/middleware/mw_ctx.rs @@ -5,18 +5,13 @@ use serde_json::Value; use super::{Executable2Placeholder, MwResultWithCtx}; pub fn new_mw_ctx(input: serde_json::Value, req: RequestContext) -> MiddlewareContext { - MiddlewareContext { - input, - req, - _priv: (), - } + MiddlewareContext { input, req } } +#[non_exhaustive] pub struct MiddlewareContext { pub input: Value, pub req: RequestContext, - // Prevents downstream user constructing type - _priv: (), } impl MiddlewareContext { diff --git a/crates/core/src/procedure_store.rs b/crates/core/src/procedure_store.rs index dec4f957..5f304b15 100644 --- a/crates/core/src/procedure_store.rs +++ b/crates/core/src/procedure_store.rs @@ -66,7 +66,7 @@ fn never() -> DataType { }, &[], ) - .unwrap() + .expect("rspc: error exporting `never`") } impl ProcedureDef { @@ -88,7 +88,7 @@ impl ProcedureDef { }, &[], )? { - DataType::Tuple(TupleType::Named { fields, .. }) if fields.len() == 0 => never(), + DataType::Tuple(TupleType::Named { fields, .. }) if fields.is_empty() => never(), t => t, }, result: TResult::reference( diff --git a/crates/core/src/router.rs b/crates/core/src/router.rs index 9d25109a..b2dc3262 100644 --- a/crates/core/src/router.rs +++ b/crates/core/src/router.rs @@ -154,13 +154,13 @@ where Some(dt) => { if let Some(ext) = dt.ext() { if let Some((existing_sid, existing_impl_location)) = - map.insert(dt.name(), (sid, ext.impl_location().clone())) + map.insert(dt.name(), (sid, *ext.impl_location())) { if existing_sid != sid { return Err(ExportError::TsExportErr( TsExportError::DuplicateTypeName( dt.name().clone(), - ext.impl_location().clone(), + *ext.impl_location(), existing_impl_location, ), )); diff --git a/crates/core/src/util.rs b/crates/core/src/util.rs index 2a5a8404..7318522a 100644 --- a/crates/core/src/util.rs +++ b/crates/core/src/util.rs @@ -15,11 +15,12 @@ impl From for PinnedOption { } } +// TODO: Avoid need for manually expanding the `pin_project` macro /* The `cargo expand` output for `pin_project!` so that we can make `PinnedOptionProj` public */ #[doc(hidden)] #[allow(dead_code)] #[allow(single_use_lifetimes)] -#[allow(clippy::unknown_clippy_lints)] +#[allow(clippy::unknown_lints)] #[allow(clippy::mut_mut)] #[allow(clippy::redundant_pub_crate)] #[allow(clippy::ref_option_ref)] @@ -34,7 +35,7 @@ where None, } #[allow(single_use_lifetimes)] -#[allow(clippy::unknown_clippy_lints)] +#[allow(clippy::unknown_lints)] #[allow(clippy::used_underscore_binding)] #[allow(unsafe_code)] // <- Custom #[allow(warnings)] // <- Custom diff --git a/crates/httpz/src/httpz_endpoint.rs b/crates/httpz/src/httpz_endpoint.rs index e2faaf50..68eb55f8 100644 --- a/crates/httpz/src/httpz_endpoint.rs +++ b/crates/httpz/src/httpz_endpoint.rs @@ -11,7 +11,7 @@ use std::{ }; use rspc_core::{ - exec::{self, Connection, ExecutorResult}, + exec::{self, ExecutorResult}, Router, }; @@ -127,7 +127,11 @@ where Some(res) => match res { ExecutorResult::Future(fut) => fut.await, ExecutorResult::Response(response) => response, - ExecutorResult::Task(task) => todo!(), + #[allow(clippy::panic)] + ExecutorResult::Task(_) => { + #[cfg(debug_assertions)] + panic!("rspc: unexpected HTTP endpoint returned 'Task'"); + } }, None => unreachable!( "Executor will only return none for a 'stopSubscription' event which is impossible here" @@ -215,7 +219,11 @@ where ExecutorResult::Response(resp) => { responses.push(resp); } - ExecutorResult::Task(task) => todo!(), + #[allow(clippy::panic)] + ExecutorResult::Task(_) => { + #[cfg(debug_assertions)] + panic!("rspc: unexpected HTTP endpoint returned 'Task'"); + } } } diff --git a/crates/httpz/src/lib.rs b/crates/httpz/src/lib.rs index b70a2d25..b5763ff0 100644 --- a/crates/httpz/src/lib.rs +++ b/crates/httpz/src/lib.rs @@ -1,6 +1,18 @@ //! Integrate rspc with a http server so it can be accessed from your frontend. //! //! This is done through [httpz](https://github.com/oscartbeaumont/httpz). +#![warn( + clippy::all, + clippy::cargo, + clippy::unwrap_used, + clippy::panic, + clippy::todo, + clippy::panic_in_result_fn, + // missing_docs +)] +#![forbid(unsafe_code)] +#![allow(clippy::module_inception)] +#![cfg_attr(docsrs, feature(doc_cfg))] mod cookie_jar; mod extractors; diff --git a/crates/tauri/src/lib.rs b/crates/tauri/src/lib.rs index 4c56ba20..e3c0ce4e 100644 --- a/crates/tauri/src/lib.rs +++ b/crates/tauri/src/lib.rs @@ -1,4 +1,16 @@ //! Access rspc via the Tauri IPC bridge. +#![warn( + clippy::all, + clippy::cargo, + clippy::unwrap_used, + clippy::panic, + clippy::todo, + clippy::panic_in_result_fn, + // missing_docs +)] +#![forbid(unsafe_code)] +#![allow(clippy::module_inception)] +#![cfg_attr(docsrs, feature(doc_cfg))] use std::{ collections::{hash_map::DefaultHasher, HashMap}, diff --git a/examples/axum/src/main.rs b/examples/axum/src/main.rs index 0a46df18..cf80f62f 100644 --- a/examples/axum/src/main.rs +++ b/examples/axum/src/main.rs @@ -61,11 +61,11 @@ async fn main() { .procedure("echo", R.query(|_, v: String| Ok(v))) .procedure( "error", - R.query(|_, _: ()| Err(Error("Something went wrong".into())) as Result), + R.query(|_, _: ()| Err(Error("Something went wrong")) as Result), ) .procedure( "error", - R.mutation(|_, _: ()| Err(Error("Something went wrong".into())) as Result), + R.mutation(|_, _: ()| Err(Error("Something went wrong")) as Result), ) .procedure( "transformMe", @@ -100,7 +100,7 @@ async fn main() { yield Ok("ping".to_string()); sleep(Duration::from_secs(1)).await; } - yield Err(Error("Something went wrong".into())); + yield Err(Error("Something went wrong")); } }), ) diff --git a/src/internal/middleware/middleware_layer.rs b/src/internal/middleware/middleware_layer.rs index 11fb7e14..e651f79e 100644 --- a/src/internal/middleware/middleware_layer.rs +++ b/src/internal/middleware/middleware_layer.rs @@ -161,7 +161,7 @@ mod private { Some(resp_fn) => match result { Ok(result) => { resp_fut.set(PinnedOption::Some { - v: (&*resp_fn).call(result), + v: (*resp_fn).call(result), }); continue; } @@ -192,6 +192,7 @@ mod private { self.as_mut().set(Self::Done); return Poll::Ready(None); } + #[allow(clippy::panic)] MiddlewareLayerFutureProj::Done => { #[cfg(debug_assertions)] panic!("`MiddlewareLayerFuture` polled after completion"); diff --git a/src/internal/procedure.rs b/src/internal/procedure.rs index 73ed980b..30829c97 100644 --- a/src/internal/procedure.rs +++ b/src/internal/procedure.rs @@ -10,7 +10,7 @@ use crate::internal::{ StreamMarkerType, }, }; -use rspc_core::internal::{router::Router, ProcedureDef, ProcedureKind}; +use rspc_core::internal::{router::Router, ProcedureKind}; /// TODO: Explain pub struct MissingResolver(PhantomData); diff --git a/src/internal/procedure_store.rs b/src/internal/procedure_store.rs index 7c684081..80dbaef1 100644 --- a/src/internal/procedure_store.rs +++ b/src/internal/procedure_store.rs @@ -1,4 +1,4 @@ -use rspc_core::internal::{BuildError, BuildErrorCause, Layer, ProcedureDef, ProcedureMap}; +use rspc_core::internal::BuildErrorCause; pub(crate) fn is_valid_name(name: &str) -> Option { if name.is_empty() || name.len() > 255 { diff --git a/src/internal/resolver/result.rs b/src/internal/resolver/result.rs index bdd729ea..43757c90 100644 --- a/src/internal/resolver/result.rs +++ b/src/internal/resolver/result.rs @@ -83,7 +83,7 @@ mod private { stream: once(ready( self.map_err(|e| e.into_resolver_error().into()) .and_then(|v| { - Ok(serde_json::to_value(v).map_err(ExecError::SerializingResultErr)?) + serde_json::to_value(v).map_err(ExecError::SerializingResultErr) }), )), } @@ -339,60 +339,6 @@ mod private { } } } - - #[cfg(feature = "tokio")] - pin_project! { - #[project = FutureBlobStreamProj] - pub struct FutureBlobStream - where - F: Future - { - #[pin] - fut: F, - map: fn(F::Output) -> S, - phantom: PhantomData - } - } - - #[cfg(feature = "tokio")] - impl Body for FutureBlobStream { - fn poll_next( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll>> { - todo!("blob unimplemented"); - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - (0, None) - } - } - - #[cfg(feature = "tokio")] - pin_project! { - #[project = BlobStreamProj] - pub struct BlobStream { - #[pin] - stream: S, - // buf: Vec, - } - } - - #[cfg(feature = "tokio")] - impl Body for BlobStream { - fn poll_next( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll>> { - todo!("blob unimplemented"); - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - (0, None) - } - } } pub(crate) use private::{FutureMarkerType, StreamMarkerType};