Skip to content

Commit

Permalink
Run cargo fmt (mostly).
Browse files Browse the repository at this point in the history
  • Loading branch information
de-vri-es committed Oct 13, 2020
1 parent 9f2857d commit 892f65d
Show file tree
Hide file tree
Showing 20 changed files with 186 additions and 146 deletions.
5 changes: 3 additions & 2 deletions examples/unix-seqpacket-client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fizyr_rpc::Peer;
use fizyr_rpc::IntoTransport;
use fizyr_rpc::Peer;

use std::path::PathBuf;
use structopt::StructOpt;
Expand Down Expand Up @@ -31,7 +31,8 @@ async fn do_main(options: &Options) -> Result<(), String> {
let mut peer = Peer::spawn(socket.into_transport_default());

// Send a request to the remote peer.
let mut request = peer.send_request(1, &b"Hello World!"[..])
let mut request = peer
.send_request(1, &b"Hello World!"[..])
.await
.map_err(|e| format!("failed to send request: {}", e))?;

Expand Down
21 changes: 11 additions & 10 deletions examples/unix-seqpacket-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ async fn main() {

async fn do_main(options: &Options) -> Result<(), String> {
// Create a listening socket for the server.
let socket = UnixSeqpacketListener::bind(&options.socket)
.map_err(|e| format!("failed to bind to {}: {}", options.socket.display(), e))?;
let socket = UnixSeqpacketListener::bind(&options.socket).map_err(|e| format!("failed to bind to {}: {}", options.socket.display(), e))?;

// Wrap the socket in an RPC server.
let mut server = Server::new(socket, Default::default());
Expand All @@ -49,13 +48,15 @@ async fn handle_peer(mut peer: fizyr_rpc::PeerHandle<fizyr_rpc::UnixBody>) -> Re
// Receive the next incoming message.
let incoming = match peer.next_message().await {
Ok(x) => x,
Err(e) => if e.is_connection_aborted() {
// Log aborted connections but return Ok(()).
eprintln!("connection closed by peer");
return Ok(());
} else {
// Pass other errors up to the caller.
return Err(format!("failed to receive message from peer: {}", e));
Err(e) => {
if e.is_connection_aborted() {
// Log aborted connections but return Ok(()).
eprintln!("connection closed by peer");
return Ok(());
} else {
// Pass other errors up to the caller.
return Err(format!("failed to receive message from peer: {}", e));
}
},
};

Expand All @@ -68,7 +69,7 @@ async fn handle_peer(mut peer: fizyr_rpc::PeerHandle<fizyr_rpc::UnixBody>) -> Re
.send_error_response(&format!("unknown service ID: {}", n))
.await
.map_err(|e| format!("failed to send error response message: {}", e))?,
}
},
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions examples/unix-stream-client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fizyr_rpc::Peer;
use fizyr_rpc::IntoTransport;
use fizyr_rpc::Peer;

use std::path::PathBuf;
use structopt::StructOpt;
Expand Down Expand Up @@ -31,7 +31,8 @@ async fn do_main(options: &Options) -> Result<(), String> {
let mut peer = Peer::spawn(socket.into_transport_default());

// Send a request to the remote peer.
let mut request = peer.send_request(1, &b"Hello World!"[..])
let mut request = peer
.send_request(1, &b"Hello World!"[..])
.await
.map_err(|e| format!("failed to send request: {}", e))?;

Expand Down
21 changes: 11 additions & 10 deletions examples/unix-stream-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ async fn main() {

async fn do_main(options: &Options) -> Result<(), String> {
// Create a listening socket for the server.
let socket = UnixListener::bind(&options.socket)
.map_err(|e| format!("failed to bind to {}: {}", options.socket.display(), e))?;
let socket = UnixListener::bind(&options.socket).map_err(|e| format!("failed to bind to {}: {}", options.socket.display(), e))?;

// Wrap the socket in an RPC server.
let mut server = Server::new(socket, Default::default());
Expand All @@ -49,13 +48,15 @@ async fn handle_peer(mut peer: fizyr_rpc::PeerHandle<fizyr_rpc::StreamBody>) ->
// Receive the next incoming message.
let incoming = match peer.next_message().await {
Ok(x) => x,
Err(e) => if e.is_connection_aborted() {
// Log aborted connections but return Ok(()).
eprintln!("connection closed by peer");
return Ok(());
} else {
// Pass other errors up to the caller.
return Err(format!("failed to receive message from peer: {}", e));
Err(e) => {
if e.is_connection_aborted() {
// Log aborted connections but return Ok(()).
eprintln!("connection closed by peer");
return Ok(());
} else {
// Pass other errors up to the caller.
return Err(format!("failed to receive message from peer: {}", e));
}
},
};

Expand All @@ -68,7 +69,7 @@ async fn handle_peer(mut peer: fizyr_rpc::PeerHandle<fizyr_rpc::StreamBody>) ->
.send_error_response(&format!("unknown service ID: {}", n))
.await
.map_err(|e| format!("failed to send error response message: {}", e))?,
}
},
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
hard_tabs = true
max_width = 140
tab_spaces = 4
max_width = 150
imports_layout = "HorizontalVertical"
match_block_trailing_comma = true
overflow_delimited_expr = true
reorder_impl_items = true
unstable_features = true
use_field_init_shorthand = true
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#[macro_use]
mod macros;

pub mod error;
mod message;
mod peer;
mod peer_handle;
Expand All @@ -67,21 +68,20 @@ mod request_tracker;
mod server;
mod transport;
mod util;
pub mod error;

#[cfg(any(feature = "unix-stream", feature = "tcp"))]
mod stream;

#[cfg(feature = "unix-seqpacket")]
mod unix;

pub use message::service_id;
pub use message::Body;
pub use message::HEADER_LEN;
pub use message::MAX_PAYLOAD_LEN;
pub use message::Message;
pub use message::MessageHeader;
pub use message::MessageType;
pub use message::service_id;
pub use message::HEADER_LEN;
pub use message::MAX_PAYLOAD_LEN;
pub use peer::Peer;
pub use peer_handle::PeerHandle;
pub use peer_handle::PeerReadHandle;
Expand Down
10 changes: 6 additions & 4 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,11 @@ impl MessageHeader {
let service_id = LE::read_i32(&buffer[8..]);

let message_type = MessageType::from_u32(message_type)?;
Ok(Self { message_type, request_id, service_id })
Ok(Self {
message_type,
request_id,
service_id,
})
}

/// Encode a message header into a byte slice.
Expand All @@ -236,8 +240,6 @@ impl MessageHeader {
impl<Body> std::fmt::Debug for Message<Body> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
// TODO: use finish_non_exhaustive when it hits stable.
f.debug_struct("Message")
.field("header", &self.header)
.finish()
f.debug_struct("Message").field("header", &self.header).finish()
}
}
31 changes: 15 additions & 16 deletions src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use tokio::sync::mpsc;
use crate::util::{select, Either};
use tokio::sync::mpsc;

use crate::error;
use crate::Incoming;
use crate::Message;
use crate::PeerHandle;
use crate::RequestTracker;
use crate::SentRequest;
use crate::error;
use tokio::sync::oneshot;

/// Message for the internal peer command loop.
Expand Down Expand Up @@ -138,7 +138,10 @@ impl<Transport: crate::Transport> Peer<Transport> {
match select(read_loop, command_loop).await {
Either::Left(((), command_loop)) => {
// If the read loop stopped we should still flush all queued incoming messages, then stop.
command_tx.send(Command::Stop).map_err(drop).expect("command loop did not stop yet but command channel is closed");
command_tx
.send(Command::Stop)
.map_err(drop)
.expect("command loop did not stop yet but command channel is closed");
command_loop.await;
},
Either::Right((read_loop, ())) => {
Expand Down Expand Up @@ -223,7 +226,9 @@ where
}

// Get the next command from the channel.
let command = self.command_rx.recv()
let command = self
.command_rx
.recv()
.await
.expect("all command channels closed, but we keep one open ourselves");

Expand Down Expand Up @@ -262,7 +267,7 @@ where
Err(e) => {
let _: Result<_, _> = command.result_tx.send(Err(e.into()));
return LoopFlow::Continue;
}
},
};

let request_id = request.request_id();
Expand Down Expand Up @@ -345,7 +350,7 @@ where
}
},
Incoming::Stream(_) => LoopFlow::Continue,
}
},
}
}

Expand Down Expand Up @@ -425,25 +430,19 @@ impl<Body> std::fmt::Debug for Command<Body> {

impl<Body> std::fmt::Debug for SendRequest<Body> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("SendRequest")
.field("service_id", &self.service_id)
.finish()
f.debug_struct("SendRequest").field("service_id", &self.service_id).finish()
}
}

impl<Body> std::fmt::Debug for SendRawMessage<Body> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("SendRawMessage")
.field("message", &self.message)
.finish()
f.debug_struct("SendRawMessage").field("message", &self.message).finish()
}
}

impl<Body> std::fmt::Debug for ProcessIncomingMessage<Body> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("ProcessIncomingMessage")
.field("message", &self.message)
.finish()
f.debug_struct("ProcessIncomingMessage").field("message", &self.message).finish()
}
}

Expand Down Expand Up @@ -471,8 +470,8 @@ mod test {
use assert2::assert;
use assert2::let_assert;

use tokio::net::UnixStream;
use crate::{MessageHeader, StreamTransport};
use tokio::net::UnixStream;

#[tokio::test]
async fn test_peer() {
Expand Down
15 changes: 10 additions & 5 deletions src/peer_handle.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use tokio::sync::mpsc;
use tokio::sync::oneshot;

use crate::error;
use crate::peer::{Command, SendRawMessage, SendRequest};
use crate::Incoming;
use crate::Message;
use crate::SentRequest;
use crate::error;
use crate::peer::{Command, SendRequest, SendRawMessage};

/// Handle to a peer.
///
Expand Down Expand Up @@ -73,7 +73,10 @@ impl<Body> PeerHandle<Body> {
incoming_rx: mpsc::UnboundedReceiver<Result<Incoming<Body>, error::NextMessageError>>,
command_tx: mpsc::UnboundedSender<Command<Body>>,
) -> Self {
let read_handle = PeerReadHandle { incoming_rx, command_tx: command_tx.clone() };
let read_handle = PeerReadHandle {
incoming_rx,
command_tx: command_tx.clone(),
};
let write_handle = PeerWriteHandle { command_tx };
Self { read_handle, write_handle }
}
Expand Down Expand Up @@ -155,7 +158,8 @@ impl<Body> PeerWriteHandle<Body> {
pub async fn send_request(&mut self, service_id: i32, body: impl Into<Body>) -> Result<SentRequest<Body>, error::SendRequestError> {
let body = body.into();
let (result_tx, result_rx) = oneshot::channel();
self.command_tx.send(SendRequest { service_id, body, result_tx }.into())
self.command_tx
.send(SendRequest { service_id, body, result_tx }.into())
.map_err(|_| error::connection_aborted())?;

result_rx.await.map_err(|_| error::connection_aborted())?
Expand All @@ -166,7 +170,8 @@ impl<Body> PeerWriteHandle<Body> {
let body = body.into();
let (result_tx, result_rx) = oneshot::channel();
let message = Message::stream(0, service_id, body);
self.command_tx.send(SendRawMessage { message, result_tx }.into())
self.command_tx
.send(SendRawMessage { message, result_tx }.into())
.map_err(|_| error::connection_aborted())?;

result_rx.await.map_err(|_| error::connection_aborted())?
Expand Down
Loading

0 comments on commit 892f65d

Please sign in to comment.