Skip to content

Commit

Permalink
feat(listen): directly use Arc<dyn, remove enum for listener
Browse files Browse the repository at this point in the history
  • Loading branch information
joelwurtz committed Mar 3, 2025
1 parent 9325527 commit e055a6d
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 59 deletions.
56 changes: 23 additions & 33 deletions io/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,6 @@ macro_rules! default_aio_impl {

use default_aio_impl;

/// A collection of listener types of different protocol.
#[derive(Debug)]
pub enum Listener {
Tcp(TcpListener),
#[cfg(feature = "quic")]
Udp(QuicListener),
#[cfg(unix)]
Unix(UnixListener),
}

type BoxFuture<'f, T> = Pin<Box<dyn Future<Output = T> + Send + 'f>>;

pub trait Listen {
Expand All @@ -126,8 +116,6 @@ pub trait ListenDyn {
fn accept(&self) -> BoxFuture<io::Result<Stream>>;
}

pub type ListenObj = Box<dyn ListenDyn + Send + Sync>;

impl<S> ListenDyn for S
where
S: Listen,
Expand All @@ -148,28 +136,30 @@ where
}
}

impl Listen for Listener {
impl Listen for TcpListener {
async fn accept(&self) -> io::Result<Stream> {
match *self {
Self::Tcp(ref tcp) => {
let (stream, addr) = tcp.accept().await?;
let stream = stream.into_std()?;
Ok(Stream::Tcp(stream, addr))
}
#[cfg(feature = "quic")]
Self::Udp(ref udp) => {
let stream = udp.accept().await?;
let addr = stream.peer_addr();
Ok(Stream::Udp(stream, addr))
}
#[cfg(unix)]
Self::Unix(ref unix) => {
let (stream, _) = unix.accept().await?;
let stream = stream.into_std()?;
let addr = stream.peer_addr()?;
Ok(Stream::Unix(stream, addr))
}
}
let (stream, addr) = self.accept().await?;
let stream = stream.into_std()?;
Ok(Stream::Tcp(stream, addr))
}
}

#[cfg(feature = "quic")]
impl Listen for QuicListener {
async fn accept(&self) -> io::Result<Stream> {
let stream = self.accept().await?;
let addr = stream.peer_addr();
Ok(Stream::Udp(stream, addr))
}
}

#[cfg(unix)]
impl Listen for UnixListener {
async fn accept(&self) -> io::Result<Stream> {
let (stream, _) = self.accept().await?;
let stream = stream.into_std()?;
let addr = stream.peer_addr()?;
Ok(Stream::Unix(stream, addr))
}
}

Expand Down
4 changes: 2 additions & 2 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::{collections::HashMap, future::Future, io, pin::Pin, time::Duration};
#[cfg(not(target_family = "wasm"))]
use std::net;

use xitca_io::net::{ListenObj, Stream};
use xitca_io::net::Stream;

use crate::{
net::IntoListener,
net::{IntoListener, ListenObj},
server::{IntoServiceObj, Server, ServerFuture, ServiceObj},
};

Expand Down
12 changes: 7 additions & 5 deletions server/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::io;
use std::{io, sync::Arc};

#[cfg(feature = "quic")]
use xitca_io::net::QuicListenerBuilder;
#[cfg(unix)]
use xitca_io::net::UnixListener;
use xitca_io::net::{ListenObj, Listener, TcpListener};
use xitca_io::net::{ListenDyn, TcpListener};

use tracing::info;

pub type ListenObj = Arc<dyn ListenDyn + Send + Sync>;

/// Helper trait for converting listener types and register them to xitca-server
/// By delay the conversion and make the process happen in server thread(s) it avoid possible panic due to runtime locality.
pub trait IntoListener: Send {
Expand All @@ -19,7 +21,7 @@ impl IntoListener for std::net::TcpListener {
self.set_nonblocking(true)?;
let listener = TcpListener::from_std(self)?;
info!("Started Tcp listening on: {:?}", listener.local_addr().ok());
Ok(Box::new(Listener::Tcp(listener)))
Ok(Arc::new(listener))
}
}

Expand All @@ -29,7 +31,7 @@ impl IntoListener for std::os::unix::net::UnixListener {
self.set_nonblocking(true)?;
let listener = UnixListener::from_std(self)?;
info!("Started Unix listening on: {:?}", listener.local_addr().ok());
Ok(Box::new(Listener::Unix(listener)))
Ok(Arc::new(listener))
}
}

Expand All @@ -38,6 +40,6 @@ impl IntoListener for QuicListenerBuilder {
fn into_listener(self) -> io::Result<ListenObj> {
let udp = self.build()?;
info!("Started Udp listening on: {:?}", udp.endpoint().local_addr().ok());
Ok(Box::new(Listener::Udp(udp)))
Ok(Arc::new(udp))
}
}
12 changes: 2 additions & 10 deletions server/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ impl Server {
let fut = async {
listeners
.into_iter()
.flat_map(|(name, listeners)| {
listeners
.into_iter()
.map(move |l| l().map(|l| (name.to_owned(), Arc::new(l))))
})
.flat_map(|(name, listeners)| listeners.into_iter().map(move |l| l().map(|l| (name.to_owned(), l))))
.collect::<Result<Vec<_>, io::Error>>()
};

Expand Down Expand Up @@ -108,11 +104,7 @@ impl Server {
let fut = async {
listeners
.into_iter()
.flat_map(|(name, listeners)| {
listeners
.into_iter()
.map(move |l| l().map(|l| (name.to_owned(), Arc::new(l))))
})
.flat_map(|(name, listeners)| listeners.into_iter().map(move |l| l().map(|l| (name.to_owned(), l))))
.collect::<Result<Vec<_>, io::Error>>()
};

Expand Down
15 changes: 9 additions & 6 deletions server/src/server/service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::{marker::PhantomData, rc::Rc, sync::Arc};
use std::{marker::PhantomData, rc::Rc};

use tokio::task::JoinHandle;
use xitca_io::net::{ListenObj, Stream};
use xitca_io::net::Stream;
use xitca_service::{Service, ready::ReadyService};

use crate::worker::{self, ServiceAny};
use crate::{
net::ListenObj,
worker::{self, ServiceAny},
};

pub type ServiceObj = Box<
dyn for<'a> xitca_service::object::ServiceObject<
(&'a str, &'a [(String, Arc<ListenObj>)]),
(&'a str, &'a [(String, ListenObj)]),
Response = (Vec<JoinHandle<()>>, ServiceAny),
Error = (),
> + Send
Expand All @@ -20,7 +23,7 @@ struct Container<F, Req> {
_t: PhantomData<fn(Req)>,
}

impl<'a, F, Req> Service<(&'a str, &'a [(String, Arc<ListenObj>)])> for Container<F, Req>
impl<'a, F, Req> Service<(&'a str, &'a [(String, ListenObj)])> for Container<F, Req>
where
F: IntoServiceObj<Req>,
Req: TryFrom<Stream> + 'static,
Expand All @@ -30,7 +33,7 @@ where

async fn call(
&self,
(name, listeners): (&'a str, &'a [(String, Arc<ListenObj>)]),
(name, listeners): (&'a str, &'a [(String, ListenObj)]),
) -> Result<Self::Response, Self::Error> {
let service = self.inner.call(()).await.map_err(|_| ())?;
let service = Rc::new(service);
Expand Down
8 changes: 5 additions & 3 deletions server/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ mod shutdown;

use core::{any::Any, sync::atomic::AtomicBool, time::Duration};

use std::{io, rc::Rc, sync::Arc, thread};
use std::{io, rc::Rc, thread};

use tokio::{task::JoinHandle, time::sleep};
use tracing::{error, info};
use xitca_io::net::{ListenDyn, ListenObj, Stream};
use xitca_io::net::Stream;
use xitca_service::{Service, ready::ReadyService};

use crate::net::ListenObj;

use self::shutdown::ShutdownHandle;

// erase Rc<S: ReadyService<_>> type and only use it for counting the reference counter of Rc.
pub(crate) type ServiceAny = Rc<dyn Any>;

pub(crate) fn start<S, Req>(listener: &Arc<ListenObj>, service: &Rc<S>) -> JoinHandle<()>
pub(crate) fn start<S, Req>(listener: &ListenObj, service: &Rc<S>) -> JoinHandle<()>
where
S: ReadyService + Service<Req> + 'static,
S::Ready: 'static,
Expand Down

0 comments on commit e055a6d

Please sign in to comment.