Skip to content

Commit

Permalink
Shuffle things around to reduce number of pub items at crate root.
Browse files Browse the repository at this point in the history
  • Loading branch information
de-vri-es committed Nov 2, 2020
1 parent ae38da7 commit 80e1b52
Show file tree
Hide file tree
Showing 21 changed files with 140 additions and 106 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
- Added `Server::bind` function.

### Changed
- Use `Vec<u8>` for body data instead of `Box<[u8]>`.
- Changed body date to use `Vec<u8>` instead of `Box<[u8]>`.
- Moved transport traits and implementations to `transport` module.
- Moved some traits to `util` module.

### Removed
- Made `RequestTracker` a private implementation detail.

### Changed
- Renamed `into_transport_default()` to `into_default_transport()`.
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Usually, you will want to spawn a task for each accepted connection that handles

### Transports

Each peer internally uses a [`Transport`].
Each peer internally uses a [`Transport`][transport::Transport].
The transport is responsible for reading and writing raw messages.
By abstracting away the message transport,
the library can expose a single generic [`Peer`] and [`Server`] struct.
Expand Down Expand Up @@ -89,7 +89,7 @@ loop {
[`PeerWriteHandle`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/struct.PeerWriteHandle.html
[`Server`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/struct.Server.html

[`Transport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/trait.Transport.html
[`transport::Transport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/transport/trait.Transport.html
[`TcpTransport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/type.TcpTransport.html
[`UnixStreamTransport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/type.UnixStreamTransport.html
[`UnixSeqpacketTransport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/type.UnixSeqpacketTransport.html
Expand Down
2 changes: 1 addition & 1 deletion README.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
[`PeerWriteHandle`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/struct.PeerWriteHandle.html
[`Server`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/struct.Server.html

[`Transport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/trait.Transport.html
[`transport::Transport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/transport/trait.Transport.html
[`TcpTransport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/type.TcpTransport.html
[`UnixStreamTransport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/type.UnixStreamTransport.html
[`UnixSeqpacketTransport`]: https://docs.rs/fizyr-rpc/latest/fizyr_rpc/type.UnixSeqpacketTransport.html
Expand Down
36 changes: 10 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//!
//! ## Transports
//!
//! Each peer internally uses a [`Transport`].
//! Each peer internally uses a [`Transport`][transport::Transport].
//! The transport is responsible for reading and writing raw messages.
//! By abstracting away the message transport,
//! the library can expose a single generic [`Peer`] and [`Server`] struct.
Expand Down Expand Up @@ -91,14 +91,9 @@ mod peer_handle;
mod request;
mod request_tracker;
mod server;
mod transport;
mod util;

#[cfg(any(feature = "unix-stream", feature = "tcp"))]
mod stream;

#[cfg(feature = "unix-seqpacket")]
mod unix;
pub mod transport;
pub mod util;

pub use message::service_id;
pub use message::Body;
Expand All @@ -115,35 +110,24 @@ pub use request::Incoming;
pub use request::Outgoing;
pub use request::ReceivedRequest;
pub use request::SentRequest;
pub use request_tracker::RequestTracker;
pub use server::Server;
pub use server::ServerListener;
pub use transport::Connect;
pub use transport::IntoTransport;
pub use transport::Transport;
pub use transport::TransportReadHalf;
pub use transport::TransportWriteHalf;

#[cfg(any(feature = "unix-stream", feature = "tcp"))]
pub use stream::StreamBody;
#[cfg(any(feature = "unix-stream", feature = "tcp"))]
pub use stream::StreamConfig;
pub use transport::stream::StreamBody;

#[cfg(any(feature = "unix-stream", feature = "tcp"))]
pub use stream::StreamTransport;

#[cfg(feature = "unix-seqpacket")]
pub use unix::UnixBody;
pub use transport::stream::StreamConfig;

#[cfg(feature = "unix-seqpacket")]
pub use unix::UnixConfig;
pub use transport::unix::UnixBody;

#[cfg(feature = "unix-seqpacket")]
pub use unix::UnixTransport;
pub use transport::unix::UnixConfig;

/// Message transport for TCP.
#[cfg(feature = "tcp")]
pub type TcpTransport = StreamTransport<tokio::net::TcpStream>;
pub type TcpTransport = transport::StreamTransport<tokio::net::TcpStream>;

/// Peer using the TCP transport.
#[cfg(feature = "tcp")]
Expand All @@ -155,7 +139,7 @@ pub type TcpServer = Server<tokio::net::TcpListener>;

/// Message transport for Unix stream sockets.
#[cfg(feature = "unix-stream")]
pub type UnixStreamTransport = StreamTransport<tokio::net::UnixStream>;
pub type UnixStreamTransport = transport::StreamTransport<tokio::net::UnixStream>;

/// Peer using the Unix stream transport.
#[cfg(feature = "unix-stream")]
Expand All @@ -167,7 +151,7 @@ pub type UnixStreamServer = Server<tokio::net::UnixListener>;

/// Message transport for Unix seqpacket sockets.
#[cfg(feature = "unix-seqpacket")]
pub type UnixSeqpacketTransport = UnixTransport<tokio_seqpacket::UnixSeqpacket>;
pub type UnixSeqpacketTransport = transport::UnixTransport<tokio_seqpacket::UnixSeqpacket>;

/// Peer using the Unix seqpacket transport.
#[cfg(feature = "unix-seqpacket")]
Expand Down
6 changes: 5 additions & 1 deletion src/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::error;

/// The encoded length of a message header (excluding the frame size).
/// The encoded length of a message header.
///
/// This does not include the message framing that may be used by a transport.
/// For example, [`StreamTransport`][crate::transport::StreamTransport] preceeds each message
/// by a 32 bit message size.
pub const HEADER_LEN: u32 = 12;

/// The maximum length of a message body.
Expand Down
22 changes: 12 additions & 10 deletions src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::util::{select, Either};
use tokio::sync::mpsc;

use crate::error;
use crate::Incoming;
use crate::Message;
use crate::PeerHandle;
use crate::RequestTracker;
use crate::SentRequest;
use crate::error;
use crate::request_tracker::RequestTracker;
use tokio::sync::oneshot;
use crate::util;

/// Message for the internal peer command loop.
pub enum Command<Body> {
Expand All @@ -25,7 +26,7 @@ pub enum Command<Body> {
/// This struct is used to run the read/write loop of the peer.
/// To send or receive requests and stream messages,
/// you need to use the [`PeerHandle`] instead.
pub struct Peer<Transport: crate::Transport> {
pub struct Peer<Transport: crate::transport::Transport> {
/// The transport to use for sending/receiving messages.
transport: Transport,

Expand Down Expand Up @@ -54,7 +55,7 @@ pub struct Peer<Transport: crate::Transport> {
write_handles: usize,
}

impl<Transport: crate::Transport> Peer<Transport> {
impl<Transport: crate::transport::Transport> Peer<Transport> {
/// Create a new peer and a handle to it.
///
/// The [`Peer`] itself is used to run the read/write loop.
Expand Down Expand Up @@ -113,7 +114,7 @@ impl<Transport: crate::Transport> Peer<Transport> {
pub async fn connect<'a, Address>(address: Address, config: Transport::Config) -> std::io::Result<PeerHandle<Transport::Body>>
where
Address: 'a,
Transport: crate::transport::Connect<'a, Address>,
Transport: util::Connect<'a, Address>,
{
let transport = Transport::connect(address, config).await?;
Ok(Self::spawn(transport))
Expand Down Expand Up @@ -174,7 +175,7 @@ impl<Transport: crate::Transport> Peer<Transport> {
/// Implementation of the read loop of a peer.
struct ReadLoop<R>
where
R: crate::TransportReadHalf,
R: crate::transport::TransportReadHalf,
{
/// The read half of the message transport.
read_half: R,
Expand All @@ -185,7 +186,7 @@ where

impl<R> ReadLoop<R>
where
R: crate::TransportReadHalf,
R: crate::transport::TransportReadHalf,
{
/// Run the read loop.
async fn run(&mut self) {
Expand All @@ -210,7 +211,7 @@ where
/// Implementation of the command loop of a peer.
struct CommandLoop<'a, W>
where
W: crate::TransportWriteHalf,
W: crate::transport::TransportWriteHalf,
{
/// The write half of the message transport.
write_half: W,
Expand All @@ -233,7 +234,7 @@ where

impl<W> CommandLoop<'_, W>
where
W: crate::TransportWriteHalf,
W: crate::transport::TransportWriteHalf,
{
/// Run the command loop.
async fn run(&mut self) {
Expand Down Expand Up @@ -488,7 +489,8 @@ mod test {
use assert2::assert;
use assert2::let_assert;

use crate::{MessageHeader, StreamTransport};
use crate::MessageHeader;
use crate::transport::StreamTransport;
use tokio::net::UnixStream;

#[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions src/request_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl<Body> RequestTracker<Body> {
///
/// This should be called when a request is finished to make the ID available again.
/// Note that received requests are also removed internally when they would receive a message but the [`ReceivedRequest`] was dropped.
#[allow(unused)] // TODO: Evaluate if Peer should be calling this sometimes.
pub fn remove_received_request(&mut self, request_id: u32) -> Result<(), error::UnknownRequestId> {
self.received_requests.remove(&request_id).ok_or(error::UnknownRequestId { request_id })?;
Ok(())
Expand Down
19 changes: 10 additions & 9 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Peer;
use crate::IntoTransport;
use crate::Peer;
use crate::PeerHandle;
use crate::util;

/// Server that spawns peers for all accepted connections.
pub struct Server<Listener>
Expand All @@ -20,7 +20,7 @@ where
///
/// You *can* use it as trait bound for generic arguments,
/// but you should not rely on any of the items in this trait.
pub trait ServerListener: crate::util::Listener + Unpin {
pub trait ServerListener: util::Listener + Unpin {
#[doc(hidden)]
type Body: crate::Body;

Expand All @@ -36,14 +36,15 @@ pub trait ServerListener: crate::util::Listener + Unpin {

impl<Listener> ServerListener for Listener
where
Listener: crate::util::Listener + Unpin,
Listener::Connection: IntoTransport,
Listener: util::Listener + Unpin,
Listener::Connection: util::IntoTransport,
{
type Body = <Listener::Connection as IntoTransport>::Body;
type Config = <Listener::Connection as IntoTransport>::Config;
type Transport = <Listener::Connection as IntoTransport>::Transport;
type Body = <Listener::Connection as util::IntoTransport>::Body;
type Config = <Listener::Connection as util::IntoTransport>::Config;
type Transport = <Listener::Connection as util::IntoTransport>::Transport;

fn spawn(connection: Self::Connection, config: Self::Config) -> PeerHandle<Self::Body> {
use util::IntoTransport;
Peer::spawn(connection.into_transport(config))
}
}
Expand All @@ -65,7 +66,7 @@ impl<Listener: ServerListener> Server<Listener> {
/// This function is asynchronous because it may perform a DNS lookup for some address types.
pub async fn bind<'a, Address: 'a>(address: Address, config: Listener::Config) -> std::io::Result<Self>
where
Listener: crate::util::Bind<'a, Address>,
Listener: util::Bind<'a, Address>,
{
Ok(Self::new(Listener::bind(address).await?, config))
}
Expand Down
52 changes: 20 additions & 32 deletions src/transport.rs → src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
//! Transport traits and concrete implementations.
//!
//! Transports are responsible for passing raw messages to a remote peer.
//! They are used by the [`Peer`][crate::Peer] struct to implement higher level RPC communication.
//!
//! Specific transports must be enabled with individual feature flags.
//! None of the concrete transport implementations are enabled by default.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::error::{ReadMessageError, WriteMessageError};
use crate::{Message, MessageHeader};

#[cfg(any(feature = "unix-stream", feature = "tcp"))]
pub(crate) mod stream;

#[cfg(any(feature = "unix-stream", feature = "tcp"))]
pub use stream::StreamTransport;

#[cfg(feature = "unix-seqpacket")]
pub use unix::UnixTransport;

#[cfg(feature = "unix-seqpacket")]
pub(crate) mod unix;

/// Trait for types that represent a bi-direction message transport.
///
/// Note that you can not use the transport itself directly.
Expand Down Expand Up @@ -51,38 +71,6 @@ pub trait WriteHalfType<'a> {
type WriteHalf: TransportWriteHalf<Body = Self::Body>;
}

/// Trait to allow generic creation of transports from a socket.
pub trait IntoTransport: Sized + Send {
/// The body type for messages transferred over the transport.
type Body: crate::Body;

/// The configuration type of the transport.
type Config: Clone + Send + Sync + 'static;

/// The transport type.
type Transport: Transport<Body = Self::Body> + Send + 'static;

/// Create a transport from `self` and a configuration struct.
fn into_transport(self, config: Self::Config) -> Self::Transport;

/// Create a transport from `self` using the default configuration.
fn into_default_transport(self) -> Self::Transport
where
Self::Config: Default,
{
self.into_transport(Self::Config::default())
}
}

/// Trait for connecting transports to a remote address.
pub trait Connect<'a, Address: 'a>: Sized + Transport {
/// The type of the future returned by `Self::connect`.
type Future: Future<Output = std::io::Result<Self>>;

/// Create a new transport connected to a remote address.
fn connect(address: Address, config: Self::Config) -> Self::Future;
}

/// Trait for the read half of a transport type.
pub trait TransportReadHalf: Send + Unpin {
/// The body type for messages transferred over the transport.
Expand Down
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 80e1b52

Please sign in to comment.