Skip to content

Commit

Permalink
move listen traits from io to server (#1221)
Browse files Browse the repository at this point in the history
* move listen traits from io to server

* use a different method name to avoid confusion between Listen and ListenDyn

* ci fix

* use dedicate action for disk space free
  • Loading branch information
fakeshadow authored Mar 6, 2025
1 parent cab9658 commit a3eb126
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 81 deletions.
24 changes: 16 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,15 @@ jobs:

- name: Free up disk space
if: matrix.target.os == 'ubuntu-latest'
run: sudo rm -rf /usr/share/dotnet && sudo rm -rf /usr/local/lib/android && sudo rm -rf /opt/ghc && sudo rm -rf "/usr/local/share/boost" && sudo rm -rf "$AGENT_TOOLSDIRECTORY"
uses: jlumbroso/free-disk-space@main
with:
tool-cache: false
android: true
dotnet: true
haskell: true
large-packages: true
docker-images: true
swap-storage: true

- name: Cache Dependencies
uses: Swatinem/rust-cache@v2
Expand All @@ -47,7 +55,7 @@ jobs:
- name: check-linux
if: matrix.target.os == 'ubuntu-latest'
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-http"
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/home/runner/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-http"
check_web:
strategy:
Expand Down Expand Up @@ -89,7 +97,7 @@ jobs:
- name: check-linux
if: matrix.target.os == 'ubuntu-latest'
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-web"
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/home/runner/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-web"
check_client:
strategy:
Expand Down Expand Up @@ -131,7 +139,7 @@ jobs:
- name: check-linux
if: matrix.target.os == 'ubuntu-latest'
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-client"
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/home/runner/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-client"
check_other:
strategy:
Expand Down Expand Up @@ -173,7 +181,7 @@ jobs:
- name: check-linux
if: matrix.target.os == 'ubuntu-latest'
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-other"
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/home/runner/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-other"
test_linux:
strategy:
Expand Down Expand Up @@ -223,12 +231,12 @@ jobs:
- name: test-linux-other
if: matrix.target.os == 'ubuntu-latest'
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-other"
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/home/runner/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-other"
- name: test-linux-test
if: matrix.target.os == 'ubuntu-latest'
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-test"
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/home/runner/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-test"
check_examples:
strategy:
Expand Down Expand Up @@ -275,4 +283,4 @@ jobs:
- name: check-linux
if: matrix.target.os == 'ubuntu-latest'
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && cd examples && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-examples"
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/home/runner/.cargo/bin && cd examples && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-check-examples"
51 changes: 1 addition & 50 deletions io/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ pub use tcp::{TcpListener, TcpStream};
#[cfg(unix)]
pub use unix::{UnixListener, UnixStream};

use core::{future::Future, net::SocketAddr, pin::Pin};
use std::io;
use core::net::SocketAddr;

macro_rules! default_aio_impl {
($ty: ty) => {
Expand Down Expand Up @@ -106,54 +105,6 @@ macro_rules! default_aio_impl {

use default_aio_impl;

type BoxFuture<'f, T> = Pin<Box<dyn Future<Output = T> + Send + 'f>>;

pub trait Listen: Send + Sync {
fn accept(&self) -> impl Future<Output = io::Result<Stream>> + Send;
}

#[doc(hidden)]
pub trait ListenDyn: Send + Sync {
fn accept(&self) -> BoxFuture<io::Result<Stream>>;
}

impl<S> ListenDyn for S
where
S: Listen,
{
#[inline]
fn accept(&self) -> BoxFuture<io::Result<Stream>> {
Box::pin(Listen::accept(self))
}
}

impl Listen for TcpListener {
async fn accept(&self) -> io::Result<Stream> {
let (stream, addr) = self.accept().await?;
let stream = stream.into_std()?;
Ok(Stream::Tcp(stream, addr))
}
}

#[cfg(feature = "quic")]
impl Listen for QuicListener {
async fn accept(&self) -> io::Result<Stream> {
let stream = self.accept().await?;
let addr = stream.peer_addr();
Ok(Stream::Udp(stream, addr))
}
}

#[cfg(unix)]
impl Listen for UnixListener {
async fn accept(&self) -> io::Result<Stream> {
let (stream, _) = self.accept().await?;
let stream = stream.into_std()?;
let addr = stream.peer_addr()?;
Ok(Stream::Unix(stream, addr))
}
}

/// A collection of stream types of different protocol.
#[allow(clippy::large_enum_variant)]
pub enum Stream {
Expand Down
17 changes: 8 additions & 9 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use std::net;

use std::sync::Arc;

use xitca_io::net::{ListenDyn, Stream};
use xitca_io::net::Stream;

use crate::{
net::IntoListener,
net::{IntoListener, ListenerDyn},
server::{IntoServiceObj, Server, ServerFuture, ServiceObj},
};

type ListenerFn = Box<dyn FnOnce() -> io::Result<Arc<dyn ListenDyn>> + Send>;
type ListenerFn = Box<dyn FnOnce() -> io::Result<ListenerDyn> + Send>;

pub struct Builder {
pub(crate) server_threads: usize,
Expand Down Expand Up @@ -145,9 +145,7 @@ impl Builder {
self.listeners
.entry(name.as_ref().to_string())
.or_default()
.push(Box::new(|| {
listener.into_listener().map(|l| Arc::new(l) as Arc<dyn ListenDyn>)
}));
.push(Box::new(|| listener.into_listener().map(|l| Arc::new(l) as _)));

self.factories.insert(name.as_ref().to_string(), service.into_object());

Expand Down Expand Up @@ -245,9 +243,10 @@ impl Builder {

let builder = xitca_io::net::QuicListenerBuilder::new(addr, config).backlog(self.backlog);

self.listeners.get_mut(name.as_ref()).unwrap().push(Box::new(|| {
builder.into_listener().map(|l| Arc::new(l) as Arc<dyn ListenDyn>)
}));
self.listeners
.get_mut(name.as_ref())
.unwrap()
.push(Box::new(|| builder.into_listener().map(|l| Arc::new(l) as _)));

Ok(self)
}
Expand Down
112 changes: 108 additions & 4 deletions server/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,119 @@
use std::io;
use std::{io, sync::Arc};

use xitca_io::net::{Stream, TcpListener};

#[cfg(feature = "quic")]
use xitca_io::net::{QuicListener, QuicListenerBuilder};
#[cfg(unix)]
use xitca_io::net::UnixListener;
use xitca_io::net::{Listen, TcpListener};

use tracing::info;
#[cfg(feature = "quic")]
use xitca_io::net::{QuicListener, QuicListenerBuilder};

/// trait for defining how socket listener would accept remote connection and omit connection stream asynchronously
///
/// listener must be thread safe type for parallel accessing by multiple worker threads.
///
/// # Examples
/// ```rust
/// use std::io;
///
/// use xitca_io::net::Stream;
/// use xitca_server::net::{IntoListener, Listen};
/// use xitca_service::fn_service;
///
/// // arbitrary socket type
/// struct MySocket;
///
/// impl Listen for MySocket {
/// async fn accept(&self) -> io::Result<Stream> {
/// todo!("defining how my socket would accept remote connection in the type of Stream")
/// }
/// }
///
/// // arbitrary builder type for socket. allow for additional logic when constructing socket type
/// struct MySocketBuilder;
///
/// impl IntoListener for MySocketBuilder {
/// type Listener = MySocket;
///
/// fn into_listener(self) -> io::Result<Self::Listener> {
/// // transform socket builder to the socket runner type.
/// // this function is called from inside xitca-server and it's possible to tap into it's internal from here.
/// // e.g: accessing the thread local storage or the async runtime(tokio)'s context.
/// Ok(MySocket)
/// }
/// }
///
/// // service function receive connection stream from MySocket's Listen::accept method
/// let service = fn_service(async |stream: Stream| {
/// Ok::<_, io::Error>(())
/// });
///
/// // start a server with socket builder where My socket would be instantiated and it's accepting logic would start and
/// // run the service function when successfully accepted remote connection.
/// let _ = xitca_server::Builder::new().listen("my_socket_service", MySocketBuilder, service);
/// ```
pub trait Listen: Send + Sync {
fn accept(&self) -> impl Future<Output = io::Result<Stream>> + Send;
}

mod _seal {
use core::{future::Future, pin::Pin};

use super::*;

type BoxFuture<'f, T> = Pin<Box<dyn Future<Output = T> + Send + 'f>>;

#[doc(hidden)]
/// dynamic compat trait for [Listen]
pub trait ListenDyn: Send + Sync {
fn accept_dyn(&self) -> BoxFuture<io::Result<Stream>>;
}

impl<S> ListenDyn for S
where
S: Listen,
{
#[inline]
fn accept_dyn(&self) -> BoxFuture<io::Result<Stream>> {
Box::pin(Listen::accept(self))
}
}
}

pub(crate) type ListenerDyn = Arc<dyn _seal::ListenDyn>;

impl Listen for TcpListener {
async fn accept(&self) -> io::Result<Stream> {
let (stream, addr) = self.accept().await?;
let stream = stream.into_std()?;
Ok(Stream::Tcp(stream, addr))
}
}

#[cfg(unix)]
impl Listen for UnixListener {
async fn accept(&self) -> io::Result<Stream> {
let (stream, _) = self.accept().await?;
let stream = stream.into_std()?;
let addr = stream.peer_addr()?;
Ok(Stream::Unix(stream, addr))
}
}

#[cfg(feature = "quic")]
impl Listen for QuicListener {
async fn accept(&self) -> io::Result<Stream> {
let stream = self.accept().await?;
let addr = stream.peer_addr();
Ok(Stream::Udp(stream, addr))
}
}

/// Helper trait for converting listener types and register them to xitca-server
/// By delay the conversion and make the process happen in server thread(s) it avoid possible panic due to runtime locality.
///
/// This trait is often utilized together with [Listen] trait. Please reference it's doc for examples.
pub trait IntoListener: Send {
type Listener: Listen;

Expand Down
17 changes: 11 additions & 6 deletions server/src/server/service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::{marker::PhantomData, rc::Rc, sync::Arc};
use core::marker::PhantomData;

use std::rc::Rc;

use tokio::task::JoinHandle;
use xitca_io::net::{ListenDyn, Stream};
use xitca_io::net::Stream;
use xitca_service::{Service, ready::ReadyService};

use crate::worker::{self, ServiceAny};
use crate::{
net::ListenerDyn,
worker::{self, ServiceAny},
};

pub type ServiceObj = Box<
dyn for<'a> xitca_service::object::ServiceObject<
(&'a str, &'a [(String, Arc<dyn ListenDyn>)]),
(&'a str, &'a [(String, ListenerDyn)]),
Response = (Vec<JoinHandle<()>>, ServiceAny),
Error = (),
> + Send
Expand All @@ -20,7 +25,7 @@ struct Container<F, Req> {
_t: PhantomData<fn(Req)>,
}

impl<'a, F, Req> Service<(&'a str, &'a [(String, Arc<dyn ListenDyn>)])> for Container<F, Req>
impl<'a, F, Req> Service<(&'a str, &'a [(String, ListenerDyn)])> for Container<F, Req>
where
F: IntoServiceObj<Req>,
Req: TryFrom<Stream> + 'static,
Expand All @@ -30,7 +35,7 @@ where

async fn call(
&self,
(name, listeners): (&'a str, &'a [(String, Arc<dyn ListenDyn>)]),
(name, listeners): (&'a str, &'a [(String, ListenerDyn)]),
) -> Result<Self::Response, Self::Error> {
let service = self.inner.call(()).await.map_err(|_| ())?;
let service = Rc::new(service);
Expand Down
10 changes: 6 additions & 4 deletions server/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ mod shutdown;

use core::{any::Any, sync::atomic::AtomicBool, time::Duration};

use std::{io, rc::Rc, sync::Arc, thread};
use std::{io, rc::Rc, thread};

use tokio::{task::JoinHandle, time::sleep};
use tracing::{error, info};
use xitca_io::net::{ListenDyn, Stream};
use xitca_io::net::Stream;
use xitca_service::{Service, ready::ReadyService};

use crate::net::ListenerDyn;

use self::shutdown::ShutdownHandle;

// erase Rc<S: ReadyService<_>> type and only use it for counting the reference counter of Rc.
pub(crate) type ServiceAny = Rc<dyn Any>;

pub(crate) fn start<S, Req>(listener: &Arc<dyn ListenDyn>, service: &Rc<S>) -> JoinHandle<()>
pub(crate) fn start<S, Req>(listener: &ListenerDyn, service: &Rc<S>) -> JoinHandle<()>
where
S: ReadyService + Service<Req> + 'static,
S::Ready: 'static,
Expand All @@ -27,7 +29,7 @@ where
loop {
let ready = service.ready().await;

match listener.accept().await {
match listener.accept_dyn().await {
Ok(stream) => {
if let Ok(req) = TryFrom::try_from(stream) {
let service = service.clone();
Expand Down

0 comments on commit a3eb126

Please sign in to comment.