Skip to content

Commit

Permalink
feat(server): allow custom implementations of listener (#1217)
Browse files Browse the repository at this point in the history
* feat(server): allow custom implementations of listener

* feat(listen): directly use Arc<dyn, remove enum for listener

* feat(listen): hide listen dyn requirement at most at possible

* feat(listener): remove listen implement for box<listendyn>
  • Loading branch information
joelwurtz authored Mar 4, 2025
1 parent 915b14d commit cab9658
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 64 deletions.
76 changes: 44 additions & 32 deletions io/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ pub use tcp::{TcpListener, TcpStream};
#[cfg(unix)]
pub use unix::{UnixListener, UnixStream};

use core::net::SocketAddr;

use core::{future::Future, net::SocketAddr, pin::Pin};
use std::io;

macro_rules! default_aio_impl {
Expand Down Expand Up @@ -107,38 +106,51 @@ macro_rules! default_aio_impl {

use default_aio_impl;

/// A collection of listener types of different protocol.
#[derive(Debug)]
pub enum Listener {
Tcp(TcpListener),
#[cfg(feature = "quic")]
Udp(QuicListener),
#[cfg(unix)]
Unix(UnixListener),
type BoxFuture<'f, T> = Pin<Box<dyn Future<Output = T> + Send + 'f>>;

pub trait Listen: Send + Sync {
fn accept(&self) -> impl Future<Output = io::Result<Stream>> + Send;
}

impl Listener {
pub async fn accept(&self) -> io::Result<Stream> {
match *self {
Self::Tcp(ref tcp) => {
let (stream, addr) = tcp.accept().await?;
let stream = stream.into_std()?;
Ok(Stream::Tcp(stream, addr))
}
#[cfg(feature = "quic")]
Self::Udp(ref udp) => {
let stream = udp.accept().await?;
let addr = stream.peer_addr();
Ok(Stream::Udp(stream, addr))
}
#[cfg(unix)]
Self::Unix(ref unix) => {
let (stream, _) = unix.accept().await?;
let stream = stream.into_std()?;
let addr = stream.peer_addr()?;
Ok(Stream::Unix(stream, addr))
}
}
#[doc(hidden)]
pub trait ListenDyn: Send + Sync {
fn accept(&self) -> BoxFuture<io::Result<Stream>>;
}

impl<S> ListenDyn for S
where
S: Listen,
{
#[inline]
fn accept(&self) -> BoxFuture<io::Result<Stream>> {
Box::pin(Listen::accept(self))
}
}

impl Listen for TcpListener {
async fn accept(&self) -> io::Result<Stream> {
let (stream, addr) = self.accept().await?;
let stream = stream.into_std()?;
Ok(Stream::Tcp(stream, addr))
}
}

#[cfg(feature = "quic")]
impl Listen for QuicListener {
async fn accept(&self) -> io::Result<Stream> {
let stream = self.accept().await?;
let addr = stream.peer_addr();
Ok(Stream::Udp(stream, addr))
}
}

#[cfg(unix)]
impl Listen for UnixListener {
async fn accept(&self) -> io::Result<Stream> {
let (stream, _) = self.accept().await?;
let stream = stream.into_std()?;
let addr = stream.peer_addr()?;
Ok(Stream::Unix(stream, addr))
}
}

Expand Down
17 changes: 10 additions & 7 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ use std::{collections::HashMap, future::Future, io, pin::Pin, time::Duration};
#[cfg(not(target_family = "wasm"))]
use std::net;

use xitca_io::net::{Listener, Stream};
use std::sync::Arc;

use xitca_io::net::{ListenDyn, Stream};

use crate::{
net::IntoListener,
server::{IntoServiceObj, Server, ServerFuture, ServiceObj},
};

type ListenerFn = Box<dyn FnOnce() -> io::Result<Listener> + Send>;
type ListenerFn = Box<dyn FnOnce() -> io::Result<Arc<dyn ListenDyn>> + Send>;

pub struct Builder {
pub(crate) server_threads: usize,
Expand Down Expand Up @@ -143,7 +145,9 @@ impl Builder {
self.listeners
.entry(name.as_ref().to_string())
.or_default()
.push(Box::new(|| listener.into_listener()));
.push(Box::new(|| {
listener.into_listener().map(|l| Arc::new(l) as Arc<dyn ListenDyn>)
}));

self.factories.insert(name.as_ref().to_string(), service.into_object());

Expand Down Expand Up @@ -241,10 +245,9 @@ impl Builder {

let builder = xitca_io::net::QuicListenerBuilder::new(addr, config).backlog(self.backlog);

self.listeners
.get_mut(name.as_ref())
.unwrap()
.push(Box::new(|| builder.into_listener()));
self.listeners.get_mut(name.as_ref()).unwrap().push(Box::new(|| {
builder.into_listener().map(|l| Arc::new(l) as Arc<dyn ListenDyn>)
}));

Ok(self)
}
Expand Down
26 changes: 17 additions & 9 deletions server/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,51 @@
use std::io;

#[cfg(feature = "quic")]
use xitca_io::net::QuicListenerBuilder;
use xitca_io::net::{QuicListener, QuicListenerBuilder};
#[cfg(unix)]
use xitca_io::net::UnixListener;
use xitca_io::net::{Listener, TcpListener};
use xitca_io::net::{Listen, TcpListener};

use tracing::info;

/// Helper trait for converting listener types and register them to xitca-server
/// By delay the conversion and make the process happen in server thread(s) it avoid possible panic due to runtime locality.
pub trait IntoListener: Send {
fn into_listener(self) -> io::Result<Listener>;
type Listener: Listen;

fn into_listener(self) -> io::Result<Self::Listener>;
}

impl IntoListener for std::net::TcpListener {
fn into_listener(self) -> io::Result<Listener> {
type Listener = TcpListener;

fn into_listener(self) -> io::Result<Self::Listener> {
self.set_nonblocking(true)?;
let listener = TcpListener::from_std(self)?;
info!("Started Tcp listening on: {:?}", listener.local_addr().ok());
Ok(Listener::Tcp(listener))
Ok(listener)
}
}

#[cfg(unix)]
impl IntoListener for std::os::unix::net::UnixListener {
fn into_listener(self) -> io::Result<Listener> {
type Listener = UnixListener;

fn into_listener(self) -> io::Result<Self::Listener> {
self.set_nonblocking(true)?;
let listener = UnixListener::from_std(self)?;
info!("Started Unix listening on: {:?}", listener.local_addr().ok());
Ok(Listener::Unix(listener))
Ok(listener)
}
}

#[cfg(feature = "quic")]
impl IntoListener for QuicListenerBuilder {
fn into_listener(self) -> io::Result<Listener> {
type Listener = QuicListener;

fn into_listener(self) -> io::Result<Self::Listener> {
let udp = self.build()?;
info!("Started Udp listening on: {:?}", udp.endpoint().local_addr().ok());
Ok(Listener::Udp(udp))
Ok(udp)
}
}
12 changes: 2 additions & 10 deletions server/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ impl Server {
let fut = async {
listeners
.into_iter()
.flat_map(|(name, listeners)| {
listeners
.into_iter()
.map(move |l| l().map(|l| (name.to_owned(), Arc::new(l))))
})
.flat_map(|(name, listeners)| listeners.into_iter().map(move |l| l().map(|l| (name.to_owned(), l))))
.collect::<Result<Vec<_>, io::Error>>()
};

Expand Down Expand Up @@ -108,11 +104,7 @@ impl Server {
let fut = async {
listeners
.into_iter()
.flat_map(|(name, listeners)| {
listeners
.into_iter()
.map(move |l| l().map(|l| (name.to_owned(), Arc::new(l))))
})
.flat_map(|(name, listeners)| listeners.into_iter().map(move |l| l().map(|l| (name.to_owned(), l))))
.collect::<Result<Vec<_>, io::Error>>()
};

Expand Down
8 changes: 4 additions & 4 deletions server/src/server/service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::{marker::PhantomData, rc::Rc, sync::Arc};

use tokio::task::JoinHandle;
use xitca_io::net::{Listener, Stream};
use xitca_io::net::{ListenDyn, Stream};
use xitca_service::{Service, ready::ReadyService};

use crate::worker::{self, ServiceAny};

pub type ServiceObj = Box<
dyn for<'a> xitca_service::object::ServiceObject<
(&'a str, &'a [(String, Arc<Listener>)]),
(&'a str, &'a [(String, Arc<dyn ListenDyn>)]),
Response = (Vec<JoinHandle<()>>, ServiceAny),
Error = (),
> + Send
Expand All @@ -20,7 +20,7 @@ struct Container<F, Req> {
_t: PhantomData<fn(Req)>,
}

impl<'a, F, Req> Service<(&'a str, &'a [(String, Arc<Listener>)])> for Container<F, Req>
impl<'a, F, Req> Service<(&'a str, &'a [(String, Arc<dyn ListenDyn>)])> for Container<F, Req>
where
F: IntoServiceObj<Req>,
Req: TryFrom<Stream> + 'static,
Expand All @@ -30,7 +30,7 @@ where

async fn call(
&self,
(name, listeners): (&'a str, &'a [(String, Arc<Listener>)]),
(name, listeners): (&'a str, &'a [(String, Arc<dyn ListenDyn>)]),
) -> Result<Self::Response, Self::Error> {
let service = self.inner.call(()).await.map_err(|_| ())?;
let service = Rc::new(service);
Expand Down
4 changes: 2 additions & 2 deletions server/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::{io, rc::Rc, sync::Arc, thread};

use tokio::{task::JoinHandle, time::sleep};
use tracing::{error, info};
use xitca_io::net::{Listener, Stream};
use xitca_io::net::{ListenDyn, Stream};
use xitca_service::{Service, ready::ReadyService};

use self::shutdown::ShutdownHandle;

// erase Rc<S: ReadyService<_>> type and only use it for counting the reference counter of Rc.
pub(crate) type ServiceAny = Rc<dyn Any>;

pub(crate) fn start<S, Req>(listener: &Arc<Listener>, service: &Rc<S>) -> JoinHandle<()>
pub(crate) fn start<S, Req>(listener: &Arc<dyn ListenDyn>, service: &Rc<S>) -> JoinHandle<()>
where
S: ReadyService + Service<Req> + 'static,
S::Ready: 'static,
Expand Down

0 comments on commit cab9658

Please sign in to comment.