diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 013cc6e72..48c952b64 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -38,9 +38,19 @@ transport = [ "channel", "dep:h2", "dep:hyper", + "hyper/full", "tokio/net", "tokio/time", "dep:tower", + "tower/balance", + "dep:hyper-timeout", +] +client = [ + "channel", + "dep:h2", + "hyper/client", + "hyper/http2", + "dep:tower", "dep:hyper-timeout", ] channel = [] @@ -55,7 +65,7 @@ bytes = "1.0" http = "0.2" tracing = "0.1" -tokio = "1.0.1" +tokio = { version = "1.0.1" } http-body = "0.4.4" percent-encoding = "2.1" pin-project = "1.0.11" @@ -70,7 +80,7 @@ async-trait = {version = "0.1.13", optional = true} # transport h2 = {version = "0.3.17", optional = true} -hyper = {version = "0.14.26", features = ["full"], optional = true} +hyper = {version = "0.14.26", default-features = false, optional = true} hyper-timeout = {version = "0.4", optional = true} tokio-stream = "0.1" tower = {version = "0.4.7", default-features = false, features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true} @@ -88,6 +98,9 @@ webpki-roots = { version = "0.25.0", optional = true } flate2 = {version = "1.0", optional = true} zstd = { version = "0.12.3", optional = true } +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen-futures = "0.4.38" + [dev-dependencies] bencher = "0.1.5" quickcheck = "1.0" diff --git a/tonic/src/lib.rs b/tonic/src/lib.rs index 8aa80a121..719f967ab 100644 --- a/tonic/src/lib.rs +++ b/tonic/src/lib.rs @@ -101,7 +101,7 @@ pub mod metadata; pub mod server; pub mod service; -#[cfg(feature = "transport")] +#[cfg(any(feature = "transport", feature = "client"))] #[cfg_attr(docsrs, doc(cfg(feature = "transport")))] pub mod transport; diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index 6aacb57a5..6922d6903 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -27,10 +27,15 @@ pub struct Endpoint { pub(crate) buffer_size: Option, pub(crate) init_stream_window_size: Option, pub(crate) init_connection_window_size: Option, + #[cfg(feature = "transport")] pub(crate) tcp_keepalive: Option, + #[cfg(feature = "transport")] pub(crate) tcp_nodelay: bool, + #[cfg(feature = "transport")] pub(crate) http2_keep_alive_interval: Option, + #[cfg(feature = "transport")] pub(crate) http2_keep_alive_timeout: Option, + #[cfg(feature = "transport")] pub(crate) http2_keep_alive_while_idle: Option, pub(crate) connect_timeout: Option, pub(crate) http2_adaptive_window: Option, @@ -167,6 +172,7 @@ impl Endpoint { /// /// Default is no keepalive (`None`) /// + #[cfg(feature = "transport")] pub fn tcp_keepalive(self, tcp_keepalive: Option) -> Self { Endpoint { tcp_keepalive, @@ -251,6 +257,7 @@ impl Endpoint { } /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default. + #[cfg(feature = "transport")] pub fn tcp_nodelay(self, enabled: bool) -> Self { Endpoint { tcp_nodelay: enabled, @@ -259,6 +266,7 @@ impl Endpoint { } /// Set http2 KEEP_ALIVE_INTERVAL. Uses `hyper`'s default otherwise. + #[cfg(feature = "transport")] pub fn http2_keep_alive_interval(self, interval: Duration) -> Self { Endpoint { http2_keep_alive_interval: Some(interval), @@ -267,6 +275,7 @@ impl Endpoint { } /// Set http2 KEEP_ALIVE_TIMEOUT. Uses `hyper`'s default otherwise. + #[cfg(feature = "transport")] pub fn keep_alive_timeout(self, duration: Duration) -> Self { Endpoint { http2_keep_alive_timeout: Some(duration), @@ -275,6 +284,7 @@ impl Endpoint { } /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses `hyper`'s default otherwise. + #[cfg(feature = "transport")] pub fn keep_alive_while_idle(self, enabled: bool) -> Self { Endpoint { http2_keep_alive_while_idle: Some(enabled), @@ -312,6 +322,7 @@ impl Endpoint { } /// Create a channel from this config. + #[cfg(feature = "transport")] pub async fn connect(&self) -> Result { let mut http = hyper::client::connect::HttpConnector::new(); http.enforce_http(false); @@ -333,6 +344,7 @@ impl Endpoint { /// /// The channel returned by this method does not attempt to connect to the endpoint until first /// use. + #[cfg(feature = "transport")] pub fn connect_lazy(&self) -> Channel { let mut http = hyper::client::connect::HttpConnector::new(); http.enforce_http(false); @@ -421,14 +433,19 @@ impl From for Endpoint { buffer_size: None, init_stream_window_size: None, init_connection_window_size: None, + #[cfg(feature = "transport")] tcp_keepalive: None, + #[cfg(feature = "transport")] tcp_nodelay: true, + #[cfg(feature = "transport")] http2_keep_alive_interval: None, + #[cfg(feature = "transport")] http2_keep_alive_timeout: None, + #[cfg(feature = "transport")] http2_keep_alive_while_idle: None, connect_timeout: None, http2_adaptive_window: None, - executor: SharedExec::tokio(), + executor: SharedExec::default_exec(), } } } diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index b510a6980..e85f21710 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -9,7 +9,9 @@ pub use endpoint::Endpoint; #[cfg(feature = "tls")] pub use tls::ClientTlsConfig; -use super::service::{Connection, DynamicServiceStream, SharedExec}; +use super::service::Connection; +#[cfg(feature = "transport")] +use super::service::{DynamicServiceStream, SharedExec}; use crate::body::BoxBody; use crate::transport::Executor; use bytes::Bytes; @@ -18,22 +20,26 @@ use http::{ Request, Response, }; use hyper::client::connect::Connection as HyperConnection; +#[cfg(feature = "transport")] +use std::hash::Hash; use std::{ fmt, future::Future, - hash::Hash, pin::Pin, task::{ready, Context, Poll}, }; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - sync::mpsc::{channel, Sender}, +use tokio::io::{AsyncRead, AsyncWrite}; + +#[cfg(feature = "transport")] +use tokio::sync::mpsc::{channel, Sender}; +#[cfg(feature = "transport")] +use tower::{ + balance::p2c::Balance, + discover::{Change, Discover}, }; -use tower::balance::p2c::Balance; use tower::{ buffer::{self, Buffer}, - discover::{Change, Discover}, util::{BoxService, Either}, Service, }; @@ -109,6 +115,7 @@ impl Channel { /// /// This creates a [`Channel`] that will load balance across all the /// provided endpoints. + #[cfg(feature = "transport")] pub fn balance_list(list: impl Iterator) -> Self { let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); list.for_each(|endpoint| { @@ -122,11 +129,12 @@ impl Channel { /// Balance a list of [`Endpoint`]'s. /// /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. + #[cfg(feature = "transport")] pub fn balance_channel(capacity: usize) -> (Self, Sender>) where K: Hash + Eq + Send + Clone + 'static, { - Self::balance_channel_with_executor(capacity, SharedExec::tokio()) + Self::balance_channel_with_executor(capacity, SharedExec::default_exec()) } /// Balance a list of [`Endpoint`]'s. @@ -134,6 +142,7 @@ impl Channel { /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. /// /// The [`Channel`] will use the given executor to spawn async tasks. + #[cfg(feature = "transport")] pub fn balance_channel_with_executor( capacity: usize, executor: E, @@ -183,6 +192,7 @@ impl Channel { Ok(Channel { svc }) } + #[cfg(feature = "transport")] pub(crate) fn balance(discover: D, buffer_size: usize, executor: E) -> Self where D: Discover + Unpin + Send + 'static, diff --git a/tonic/src/transport/mod.rs b/tonic/src/transport/mod.rs index a0435c797..ff349cac2 100644 --- a/tonic/src/transport/mod.rs +++ b/tonic/src/transport/mod.rs @@ -88,6 +88,7 @@ //! [rustls]: https://docs.rs/rustls/0.16.0/rustls/ pub mod channel; +#[cfg(feature = "transport")] pub mod server; mod error; @@ -100,6 +101,7 @@ mod tls; #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] pub use self::channel::{Channel, Endpoint}; pub use self::error::Error; +#[cfg(feature = "transport")] #[doc(inline)] pub use self::server::Server; #[doc(inline)] @@ -107,6 +109,7 @@ pub use self::service::grpc_timeout::TimeoutExpired; #[cfg(feature = "tls")] #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] pub use self::tls::Certificate; +#[cfg(feature = "transport")] pub use axum::{body::BoxBody as AxumBoxBody, Router as AxumRouter}; pub use hyper::{Body, Uri}; diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 46a88dda5..d70f52992 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -1,16 +1,12 @@ -use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent}; -use crate::{ - body::BoxBody, - transport::{BoxFuture, Endpoint}, +use std::{ + fmt, + task::{Context, Poll}, }; + use http::Uri; use hyper::client::conn::Builder; use hyper::client::connect::Connection as HyperConnection; use hyper::client::service::Connect as HyperConnect; -use std::{ - fmt, - task::{Context, Poll}, -}; use tokio::io::{AsyncRead, AsyncWrite}; use tower::load::Load; use tower::{ @@ -21,6 +17,13 @@ use tower::{ }; use tower_service::Service; +use crate::{ + body::BoxBody, + transport::{BoxFuture, Endpoint}, +}; + +use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent}; + pub(crate) type Request = http::Request; pub(crate) type Response = http::Response; @@ -40,20 +43,32 @@ impl Connection { .http2_initial_stream_window_size(endpoint.init_stream_window_size) .http2_initial_connection_window_size(endpoint.init_connection_window_size) .http2_only(true) - .http2_keep_alive_interval(endpoint.http2_keep_alive_interval) .executor(endpoint.executor.clone()) .clone(); - if let Some(val) = endpoint.http2_keep_alive_timeout { - settings.http2_keep_alive_timeout(val); + if let Some(val) = endpoint.http2_adaptive_window { + settings.http2_adaptive_window(val); } - if let Some(val) = endpoint.http2_keep_alive_while_idle { - settings.http2_keep_alive_while_idle(val); + #[cfg(feature = "transport")] + { + settings.http2_keep_alive_interval(endpoint.http2_keep_alive_interval); + + if let Some(val) = endpoint.http2_keep_alive_timeout { + settings.http2_keep_alive_timeout(val); + } + + if let Some(val) = endpoint.http2_keep_alive_while_idle { + settings.http2_keep_alive_while_idle(val); + } } - if let Some(val) = endpoint.http2_adaptive_window { - settings.http2_adaptive_window(val); + #[cfg(target_arch = "wasm32")] + { + settings + .executor(wasm::Executor) + // reset streams require `Instant::now` which is not available on wasm + .http2_max_concurrent_reset_streams(0); } let stack = ServiceBuilder::new() @@ -126,3 +141,19 @@ impl fmt::Debug for Connection { f.debug_struct("Connection").finish() } } + +#[cfg(target_arch = "wasm32")] +mod wasm { + use std::future::Future; + use std::pin::Pin; + + type BoxSendFuture = Pin + Send>>; + + pub(crate) struct Executor; + + impl hyper::rt::Executor for Executor { + fn execute(&self, fut: BoxSendFuture) { + wasm_bindgen_futures::spawn_local(fut) + } + } +} diff --git a/tonic/src/transport/service/executor.rs b/tonic/src/transport/service/executor.rs index de3cfbe6e..6847b1c00 100644 --- a/tonic/src/transport/service/executor.rs +++ b/tonic/src/transport/service/executor.rs @@ -3,9 +3,11 @@ use std::{future::Future, sync::Arc}; pub(crate) use hyper::rt::Executor; +#[cfg(not(target_arch = "wasm32"))] #[derive(Copy, Clone)] struct TokioExec; +#[cfg(not(target_arch = "wasm32"))] impl Executor for TokioExec where F: Future + Send + 'static, @@ -16,6 +18,23 @@ where } } +#[cfg(target_arch = "wasm32")] +#[derive(Copy, Clone)] +struct WasmBindgenExec; + +#[cfg(target_arch = "wasm32")] +impl Executor for WasmBindgenExec +where + F: Future + 'static, + F::Output: 'static, +{ + fn execute(&self, fut: F) { + wasm_bindgen_futures::spawn_local(async move { + fut.await; + }); + } +} + #[derive(Clone)] pub(crate) struct SharedExec { inner: Arc> + Send + Sync + 'static>, @@ -31,8 +50,11 @@ impl SharedExec { } } - pub(crate) fn tokio() -> Self { - Self::new(TokioExec) + pub(crate) fn default_exec() -> Self { + #[cfg(not(target_arch = "wasm32"))] + return Self::new(TokioExec); + #[cfg(target_arch = "wasm32")] + Self::new(WasmBindgenExec) } } diff --git a/tonic/src/transport/service/io.rs b/tonic/src/transport/service/io.rs index 2230b9b2e..d3c064c88 100644 --- a/tonic/src/transport/service/io.rs +++ b/tonic/src/transport/service/io.rs @@ -1,12 +1,13 @@ -use crate::transport::server::Connected; use hyper::client::connect::{Connected as HyperConnected, Connection}; use std::io; use std::io::IoSlice; use std::pin::Pin; use std::task::{Context, Poll}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -#[cfg(feature = "tls")] -use tokio_rustls::server::TlsStream; + +#[cfg(feature = "transport")] +pub(crate) use server::ServerIo; pub(in crate::transport) trait Io: AsyncRead + AsyncWrite + Send + 'static @@ -29,7 +30,8 @@ impl Connection for BoxedIo { } } -impl Connected for BoxedIo { +#[cfg(feature = "transport")] +impl crate::transport::server::Connected for BoxedIo { type ConnectInfo = NoneConnectInfo; fn connect_info(&self) -> Self::ConnectInfo { @@ -80,120 +82,133 @@ impl AsyncWrite for BoxedIo { } } -pub(crate) enum ServerIo { - Io(IO), - #[cfg(feature = "tls")] - TlsIo(Box>), -} +#[cfg(feature = "transport")] +mod server { + use crate::transport::server::Connected; + use std::io; + use std::io::IoSlice; + use std::pin::Pin; + use std::task::{Context, Poll}; + use tower::util::Either; -use tower::util::Either; - -#[cfg(feature = "tls")] -type ServerIoConnectInfo = - Either<::ConnectInfo, as Connected>::ConnectInfo>; + #[cfg(feature = "tls")] + use tokio_rustls::server::TlsStream; -#[cfg(not(feature = "tls"))] -type ServerIoConnectInfo = Either<::ConnectInfo, ()>; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -impl ServerIo { - pub(in crate::transport) fn new_io(io: IO) -> Self { - Self::Io(io) + pub(crate) enum ServerIo { + Io(IO), + #[cfg(feature = "tls")] + TlsIo(Box>), } #[cfg(feature = "tls")] - pub(in crate::transport) fn new_tls_io(io: TlsStream) -> Self { - Self::TlsIo(Box::new(io)) - } + type ServerIoConnectInfo = + Either<::ConnectInfo, as Connected>::ConnectInfo>; - #[cfg(feature = "tls")] - pub(in crate::transport) fn connect_info(&self) -> ServerIoConnectInfo - where - IO: Connected, - TlsStream: Connected, - { - match self { - Self::Io(io) => Either::A(io.connect_info()), - Self::TlsIo(io) => Either::B(io.connect_info()), + #[cfg(not(feature = "tls"))] + type ServerIoConnectInfo = Either<::ConnectInfo, ()>; + + impl ServerIo { + pub(in crate::transport) fn new_io(io: IO) -> Self { + Self::Io(io) } - } - #[cfg(not(feature = "tls"))] - pub(in crate::transport) fn connect_info(&self) -> ServerIoConnectInfo - where - IO: Connected, - { - match self { - Self::Io(io) => Either::A(io.connect_info()), + #[cfg(feature = "tls")] + pub(in crate::transport) fn new_tls_io(io: TlsStream) -> Self { + Self::TlsIo(Box::new(io)) } - } -} -impl AsyncRead for ServerIo -where - IO: AsyncWrite + AsyncRead + Unpin, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - match &mut *self { - Self::Io(io) => Pin::new(io).poll_read(cx, buf), - #[cfg(feature = "tls")] - Self::TlsIo(io) => Pin::new(io).poll_read(cx, buf), + #[cfg(feature = "tls")] + pub(in crate::transport) fn connect_info(&self) -> ServerIoConnectInfo + where + IO: Connected, + TlsStream: Connected, + { + match self { + Self::Io(io) => Either::A(io.connect_info()), + Self::TlsIo(io) => Either::B(io.connect_info()), + } } - } -} -impl AsyncWrite for ServerIo -where - IO: AsyncWrite + AsyncRead + Unpin, -{ - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - match &mut *self { - Self::Io(io) => Pin::new(io).poll_write(cx, buf), - #[cfg(feature = "tls")] - Self::TlsIo(io) => Pin::new(io).poll_write(cx, buf), + #[cfg(not(feature = "tls"))] + pub(in crate::transport) fn connect_info(&self) -> ServerIoConnectInfo + where + IO: Connected, + { + match self { + Self::Io(io) => Either::A(io.connect_info()), + } } } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut *self { - Self::Io(io) => Pin::new(io).poll_flush(cx), - #[cfg(feature = "tls")] - Self::TlsIo(io) => Pin::new(io).poll_flush(cx), + impl AsyncRead for ServerIo + where + IO: AsyncWrite + AsyncRead + Unpin, + { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match &mut *self { + Self::Io(io) => Pin::new(io).poll_read(cx, buf), + #[cfg(feature = "tls")] + Self::TlsIo(io) => Pin::new(io).poll_read(cx, buf), + } } } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut *self { - Self::Io(io) => Pin::new(io).poll_shutdown(cx), - #[cfg(feature = "tls")] - Self::TlsIo(io) => Pin::new(io).poll_shutdown(cx), + impl AsyncWrite for ServerIo + where + IO: AsyncWrite + AsyncRead + Unpin, + { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match &mut *self { + Self::Io(io) => Pin::new(io).poll_write(cx, buf), + #[cfg(feature = "tls")] + Self::TlsIo(io) => Pin::new(io).poll_write(cx, buf), + } } - } - fn poll_write_vectored( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[IoSlice<'_>], - ) -> Poll> { - match &mut *self { - Self::Io(io) => Pin::new(io).poll_write_vectored(cx, bufs), - #[cfg(feature = "tls")] - Self::TlsIo(io) => Pin::new(io).poll_write_vectored(cx, bufs), + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut *self { + Self::Io(io) => Pin::new(io).poll_flush(cx), + #[cfg(feature = "tls")] + Self::TlsIo(io) => Pin::new(io).poll_flush(cx), + } } - } - fn is_write_vectored(&self) -> bool { - match self { - Self::Io(io) => io.is_write_vectored(), - #[cfg(feature = "tls")] - Self::TlsIo(io) => io.is_write_vectored(), + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut *self { + Self::Io(io) => Pin::new(io).poll_shutdown(cx), + #[cfg(feature = "tls")] + Self::TlsIo(io) => Pin::new(io).poll_shutdown(cx), + } + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + match &mut *self { + Self::Io(io) => Pin::new(io).poll_write_vectored(cx, bufs), + #[cfg(feature = "tls")] + Self::TlsIo(io) => Pin::new(io).poll_write_vectored(cx, bufs), + } + } + + fn is_write_vectored(&self) -> bool { + match self { + Self::Io(io) => io.is_write_vectored(), + #[cfg(feature = "tls")] + Self::TlsIo(io) => io.is_write_vectored(), + } } } } diff --git a/tonic/src/transport/service/mod.rs b/tonic/src/transport/service/mod.rs index 69d850f10..922223c78 100644 --- a/tonic/src/transport/service/mod.rs +++ b/tonic/src/transport/service/mod.rs @@ -1,11 +1,13 @@ mod add_origin; mod connection; mod connector; +#[cfg(feature = "transport")] mod discover; pub(crate) mod executor; pub(crate) mod grpc_timeout; mod io; mod reconnect; +#[cfg(feature = "transport")] mod router; #[cfg(feature = "tls")] mod tls; @@ -14,13 +16,18 @@ mod user_agent; pub(crate) use self::add_origin::AddOrigin; pub(crate) use self::connection::Connection; pub(crate) use self::connector::Connector; +#[cfg(feature = "transport")] pub(crate) use self::discover::DynamicServiceStream; pub(crate) use self::executor::SharedExec; +#[cfg(feature = "transport")] pub(crate) use self::grpc_timeout::GrpcTimeout; +#[cfg(feature = "transport")] pub(crate) use self::io::ServerIo; #[cfg(feature = "tls")] pub(crate) use self::tls::{TlsAcceptor, TlsConnector}; pub(crate) use self::user_agent::UserAgent; +#[cfg(feature = "transport")] pub use self::router::Routes; +#[cfg(feature = "transport")] pub use self::router::RoutesBuilder;