diff --git a/io/src/net.rs b/io/src/net.rs index f160de01..d2043e82 100644 --- a/io/src/net.rs +++ b/io/src/net.rs @@ -106,16 +106,6 @@ macro_rules! default_aio_impl { use default_aio_impl; -/// A collection of listener types of different protocol. -#[derive(Debug)] -pub enum Listener { - Tcp(TcpListener), - #[cfg(feature = "quic")] - Udp(QuicListener), - #[cfg(unix)] - Unix(UnixListener), -} - type BoxFuture<'f, T> = Pin + Send + 'f>>; pub trait Listen { @@ -126,8 +116,6 @@ pub trait ListenDyn { fn accept(&self) -> BoxFuture>; } -pub type ListenObj = Box; - impl ListenDyn for S where S: Listen, @@ -148,28 +136,30 @@ where } } -impl Listen for Listener { +impl Listen for TcpListener { async fn accept(&self) -> io::Result { - match *self { - Self::Tcp(ref tcp) => { - let (stream, addr) = tcp.accept().await?; - let stream = stream.into_std()?; - Ok(Stream::Tcp(stream, addr)) - } - #[cfg(feature = "quic")] - Self::Udp(ref udp) => { - let stream = udp.accept().await?; - let addr = stream.peer_addr(); - Ok(Stream::Udp(stream, addr)) - } - #[cfg(unix)] - Self::Unix(ref unix) => { - let (stream, _) = unix.accept().await?; - let stream = stream.into_std()?; - let addr = stream.peer_addr()?; - Ok(Stream::Unix(stream, addr)) - } - } + let (stream, addr) = self.accept().await?; + let stream = stream.into_std()?; + Ok(Stream::Tcp(stream, addr)) + } +} + +#[cfg(feature = "quic")] +impl Listen for QuicListener { + async fn accept(&self) -> io::Result { + let stream = self.accept().await?; + let addr = stream.peer_addr(); + Ok(Stream::Udp(stream, addr)) + } +} + +#[cfg(unix)] +impl Listen for UnixListener { + async fn accept(&self) -> io::Result { + let (stream, _) = self.accept().await?; + let stream = stream.into_std()?; + let addr = stream.peer_addr()?; + Ok(Stream::Unix(stream, addr)) } } diff --git a/server/src/builder.rs b/server/src/builder.rs index 68f0a247..edb85984 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -3,10 +3,10 @@ use std::{collections::HashMap, future::Future, io, pin::Pin, time::Duration}; #[cfg(not(target_family = "wasm"))] use std::net; -use xitca_io::net::{ListenObj, Stream}; +use xitca_io::net::Stream; use crate::{ - net::IntoListener, + net::{IntoListener, ListenObj}, server::{IntoServiceObj, Server, ServerFuture, ServiceObj}, }; diff --git a/server/src/net/mod.rs b/server/src/net/mod.rs index f7ff7605..9b38f03d 100644 --- a/server/src/net/mod.rs +++ b/server/src/net/mod.rs @@ -1,13 +1,15 @@ -use std::io; +use std::{io, sync::Arc}; #[cfg(feature = "quic")] use xitca_io::net::QuicListenerBuilder; #[cfg(unix)] use xitca_io::net::UnixListener; -use xitca_io::net::{ListenObj, Listener, TcpListener}; +use xitca_io::net::{ListenDyn, TcpListener}; use tracing::info; +pub type ListenObj = Arc; + /// Helper trait for converting listener types and register them to xitca-server /// By delay the conversion and make the process happen in server thread(s) it avoid possible panic due to runtime locality. pub trait IntoListener: Send { @@ -19,7 +21,7 @@ impl IntoListener for std::net::TcpListener { self.set_nonblocking(true)?; let listener = TcpListener::from_std(self)?; info!("Started Tcp listening on: {:?}", listener.local_addr().ok()); - Ok(Box::new(Listener::Tcp(listener))) + Ok(Arc::new(listener)) } } @@ -29,7 +31,7 @@ impl IntoListener for std::os::unix::net::UnixListener { self.set_nonblocking(true)?; let listener = UnixListener::from_std(self)?; info!("Started Unix listening on: {:?}", listener.local_addr().ok()); - Ok(Box::new(Listener::Unix(listener))) + Ok(Arc::new(listener)) } } @@ -38,6 +40,6 @@ impl IntoListener for QuicListenerBuilder { fn into_listener(self) -> io::Result { let udp = self.build()?; info!("Started Udp listening on: {:?}", udp.endpoint().local_addr().ok()); - Ok(Box::new(Listener::Udp(udp))) + Ok(Arc::new(udp)) } } diff --git a/server/src/server/mod.rs b/server/src/server/mod.rs index 1dbff3ab..3cb56565 100644 --- a/server/src/server/mod.rs +++ b/server/src/server/mod.rs @@ -46,11 +46,7 @@ impl Server { let fut = async { listeners .into_iter() - .flat_map(|(name, listeners)| { - listeners - .into_iter() - .map(move |l| l().map(|l| (name.to_owned(), Arc::new(l)))) - }) + .flat_map(|(name, listeners)| listeners.into_iter().map(move |l| l().map(|l| (name.to_owned(), l)))) .collect::, io::Error>>() }; @@ -108,11 +104,7 @@ impl Server { let fut = async { listeners .into_iter() - .flat_map(|(name, listeners)| { - listeners - .into_iter() - .map(move |l| l().map(|l| (name.to_owned(), Arc::new(l)))) - }) + .flat_map(|(name, listeners)| listeners.into_iter().map(move |l| l().map(|l| (name.to_owned(), l)))) .collect::, io::Error>>() }; diff --git a/server/src/server/service.rs b/server/src/server/service.rs index 336622e7..fbaf4ab2 100644 --- a/server/src/server/service.rs +++ b/server/src/server/service.rs @@ -1,14 +1,17 @@ -use std::{marker::PhantomData, rc::Rc, sync::Arc}; +use std::{marker::PhantomData, rc::Rc}; use tokio::task::JoinHandle; -use xitca_io::net::{ListenObj, Stream}; +use xitca_io::net::Stream; use xitca_service::{Service, ready::ReadyService}; -use crate::worker::{self, ServiceAny}; +use crate::{ + net::ListenObj, + worker::{self, ServiceAny}, +}; pub type ServiceObj = Box< dyn for<'a> xitca_service::object::ServiceObject< - (&'a str, &'a [(String, Arc)]), + (&'a str, &'a [(String, ListenObj)]), Response = (Vec>, ServiceAny), Error = (), > + Send @@ -20,7 +23,7 @@ struct Container { _t: PhantomData, } -impl<'a, F, Req> Service<(&'a str, &'a [(String, Arc)])> for Container +impl<'a, F, Req> Service<(&'a str, &'a [(String, ListenObj)])> for Container where F: IntoServiceObj, Req: TryFrom + 'static, @@ -30,7 +33,7 @@ where async fn call( &self, - (name, listeners): (&'a str, &'a [(String, Arc)]), + (name, listeners): (&'a str, &'a [(String, ListenObj)]), ) -> Result { let service = self.inner.call(()).await.map_err(|_| ())?; let service = Rc::new(service); diff --git a/server/src/worker/mod.rs b/server/src/worker/mod.rs index 9735ec1d..ed1ea9d3 100644 --- a/server/src/worker/mod.rs +++ b/server/src/worker/mod.rs @@ -2,19 +2,21 @@ mod shutdown; use core::{any::Any, sync::atomic::AtomicBool, time::Duration}; -use std::{io, rc::Rc, sync::Arc, thread}; +use std::{io, rc::Rc, thread}; use tokio::{task::JoinHandle, time::sleep}; use tracing::{error, info}; -use xitca_io::net::{ListenDyn, ListenObj, Stream}; +use xitca_io::net::Stream; use xitca_service::{Service, ready::ReadyService}; +use crate::net::ListenObj; + use self::shutdown::ShutdownHandle; // erase Rc> type and only use it for counting the reference counter of Rc. pub(crate) type ServiceAny = Rc; -pub(crate) fn start(listener: &Arc, service: &Rc) -> JoinHandle<()> +pub(crate) fn start(listener: &ListenObj, service: &Rc) -> JoinHandle<()> where S: ReadyService + Service + 'static, S::Ready: 'static,