Skip to content

Commit

Permalink
Remove unused features
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Jan 20, 2023
1 parent 5b25624 commit 74b28bc
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 451 deletions.
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ edition = "2021"
[features]
default = ["flume"]
crossbeam = ["crossbeam-channel"]
fair = []
profiler = []
async-tokio = ["tokio", "futures", "tokio/net", "tokio/io-util", "tokio/time", "tokio/rt-multi-thread", "tokio/macros"]
deque-flatten = []
deque-start = []

[dependencies]
# for logging to the console
Expand Down
211 changes: 3 additions & 208 deletions src/network/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,51 +42,11 @@ const MUX_CHANNEL_CAPACITY: usize = 10;
/// The `ReceiverEndpoint` is sent alongside the actual message in order to demultiplex it.
#[derive(Debug)]
pub struct MultiplexingSender<Out: ExchangeData> {
/// The internal sender that points to the actual multiplexed channel.
// sender: Sender<(ReceiverEndpoint, NetworkMessage<Out>)>,
#[cfg(feature = "fair")]
tx: UnboundedSender<(ReceiverEndpoint, Receiver<NetworkMessage<Out>>)>,
#[cfg(not(feature = "fair"))]
tx: Option<Sender<(ReceiverEndpoint, NetworkMessage<Out>)>>,
}

#[cfg(not(feature = "async-tokio"))]
impl<Out: ExchangeData> MultiplexingSender<Out> {
/// Construct a new `MultiplexingSender` for a block.
///
/// All the replicas of this block should point to this multiplexer (or one of its clones).
#[cfg(feature = "fair")]
pub fn new(coord: DemuxCoord, address: (String, u16)) -> (Self, JoinHandle<()>) {
let (tx, rx) = channel::unbounded();
let join_handle = std::thread::Builder::new()
.name(format!("noir-mux-{}", coord))
.spawn(move || {
log::debug!(
"mux connecting to {}",
address.to_socket_addrs().unwrap().nth(0).unwrap()
);
let stream = connect_remote(coord, address);
log::debug!("mux connected, waiting for receivers");
let mut receivers = Vec::new();
while let Ok(t) = rx.recv() {
receivers.push(t);
}
log::debug!(
"mux for {} got receivers: [{}]",
coord,
receivers
.iter()
.map(|(e, _)| format!("{e} "))
.collect::<String>()
.trim()
);
mux_thread::<Out>(coord, receivers, stream);
})
.unwrap();
(Self { tx }, join_handle)
}

#[cfg(not(feature = "fair"))]
pub fn new(coord: DemuxCoord, address: (String, u16)) -> (Self, JoinHandle<()>) {
let (tx, rx) = channel::bounded(MUX_CHANNEL_CAPACITY);

Expand All @@ -104,27 +64,7 @@ impl<Out: ExchangeData> MultiplexingSender<Out> {
.unwrap();
(Self { tx: Some(tx) }, join_handle)
}
/// Send a message to the channel.
///
/// Unlikely the normal channels, the destination is required since the channel is multiplexed.
// pub fn send(
// &self,
// destination: ReceiverEndpoint,
// message: NetworkMessage<Out>,
// ) -> Result<(), SendError<(ReceiverEndpoint, NetworkMessage<Out>)>> {
// self.sender.send((destination, message))
// }
#[cfg(feature = "fair")]
pub(crate) fn get_sender(&mut self, receiver_endpoint: ReceiverEndpoint) -> NetworkSender<Out> {
let (sender, receiver) = channel::bounded(CHANNEL_CAPACITY);
self.tx.send((receiver_endpoint, receiver)).unwrap();
NetworkSender {
receiver_endpoint,
sender,
}
}

#[cfg(not(feature = "fair"))]

pub(crate) fn get_sender(&mut self, receiver_endpoint: ReceiverEndpoint) -> NetworkSender<Out> {
use super::mux_sender;

Expand Down Expand Up @@ -189,43 +129,7 @@ fn connect_remote(coord: DemuxCoord, address: (String, u16)) -> TcpStream {
);
}

/// Handle the connection to the remote replica.
///
/// Waits messages from the local receiver, then serialize the message and send it to the remote
/// replica.
///
/// # Upgrade path
///
/// Instead of using a single mpsc channel, use multiple channels, one per block
/// Use a fair (round robin?) selection from each channel when sending
///
/// Before popping from a channel, check that a Yield request was not received for that block
/// In that case, do not pop from the channel and only select from others
/// (this handles backpressure since the sender will block when the channel is full)
/// If a Resume request was received then allow popping from the channel
#[cfg(not(feature = "async-tokio"))]
#[cfg(feature = "fair")]
fn mux_thread<Out: ExchangeData>(
coord: DemuxCoord,
receivers: Vec<(ReceiverEndpoint, Receiver<NetworkMessage<Out>>)>,
mut stream: TcpStream,
) {
let address = stream
.peer_addr()
.map(|a| a.to_string())
.unwrap_or_else(|_| "unknown".to_string());
log::debug!("Connection to {} at {} established", coord, address);

let mut selector = Selector::new(receivers);

while let Ok((dest, message)) = selector.recv() {
remote_send(message, dest, &mut stream);
}
let _ = stream.shutdown(Shutdown::Both);
log::debug!("Remote sender for {} exited", coord);
}

#[cfg(all(not(feature = "async-tokio"), not(feature = "fair")))]
fn mux_thread<Out: ExchangeData>(
coord: DemuxCoord,
rx: Receiver<(ReceiverEndpoint, NetworkMessage<Out>)>,
Expand All @@ -251,60 +155,7 @@ fn mux_thread<Out: ExchangeData>(
log::debug!("Remote sender for {} exited", coord);
}

#[cfg(all(feature = "async-tokio", feature = "fair"))]
impl<Out: ExchangeData> MultiplexingSender<Out> {
/// Construct a new `MultiplexingSender` for a block.
///
/// All the replicas of this block should point to this multiplexer (or one of its clones).
pub fn new(coord: DemuxCoord, address: (String, u16)) -> (Self, JoinHandle<()>) {
let (tx, rx) = channel::unbounded();
let join_handle = tokio::spawn(async move {
log::debug!(
"mux connecting to {}",
address.to_socket_addrs().unwrap().nth(0).unwrap()
);
let stream = connect_remote(coord, address).await;
log::debug!("mux connected, waiting for receivers");
let mut receivers = Vec::new();
while let Ok(t) = rx.recv() {
receivers.push(t);
}
log::debug!(
"mux for {} got receivers: [{}]",
coord,
receivers
.iter()
.map(|(e, _)| format!("{e} "))
.collect::<String>()
.trim()
);
mux_thread::<Out>(coord, receivers, stream).await;
});
(Self { tx }, join_handle)
}

/// Send a message to the channel.
///
/// Unlikely the normal channels, the destination is required since the channel is multiplexed.
// pub fn send(
// &self,
// destination: ReceiverEndpoint,
// message: NetworkMessage<Out>,
// ) -> Result<(), SendError<(ReceiverEndpoint, NetworkMessage<Out>)>> {
// self.sender.send((destination, message))
// }

pub(crate) fn get_sender(&mut self, receiver_endpoint: ReceiverEndpoint) -> NetworkSender<Out> {
let (sender, receiver) = channel::bounded(CHANNEL_CAPACITY);
self.tx.send((receiver_endpoint, receiver)).unwrap();
NetworkSender {
receiver_endpoint,
sender,
}
}
}

#[cfg(all(feature = "async-tokio", not(feature = "fair")))]
#[cfg(feature = "async-tokio")]
impl<Out: ExchangeData> MultiplexingSender<Out> {
/// Construct a new `MultiplexingSender` for a block.
///
Expand Down Expand Up @@ -393,63 +244,7 @@ async fn connect_remote(coord: DemuxCoord, address: (String, u16)) -> TcpStream
);
}

/// Handle the connection to the remote replica.
///
/// Waits messages from the local receiver, then serialize the message and send it to the remote
/// replica.
///
/// # Upgrade path
///
/// Instead of using a single mpsc channel, use multiple channels, one per block
/// Use a fair (round robin?) selection from each channel when sending
///
/// Before popping from a channel, check that a Yield request was not received for that block
/// In that case, do not pop from the channel and only select from others
/// (this handles backpressure since the sender will block when the channel is full)
/// If a Resume request was received then allow popping from the channel
#[cfg(all(feature = "async-tokio", feature = "fair"))]
async fn mux_thread<Out: ExchangeData>(
coord: DemuxCoord,
receivers: Vec<(ReceiverEndpoint, Receiver<NetworkMessage<Out>>)>,
mut stream: TcpStream,
) {
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::io::AsyncWriteExt;

use crate::channel::RecvError;

let address = stream
.peer_addr()
.map(|a| a.to_string())
.unwrap_or_else(|_| "unknown".to_string());
log::debug!("Connection to {} at {} established", coord, address);

async fn make_fut<T: ExchangeData>(
(i, r): (usize, &(ReceiverEndpoint, Receiver<NetworkMessage<T>>)),
) -> (usize, Result<NetworkMessage<T>, RecvError>) {
(i, r.1.recv_async().await)
}

// let make_fut = |(i, r) : (usize, &(ReceiverEndpoint, Receiver<NetworkMessage<Out>>))| async { (i, r.1.recv_async().await) };

let mut selector: FuturesUnordered<_> = receivers.iter().enumerate().map(make_fut).collect();

while let Some(next) = selector.next().await {
match next {
(i, Ok(message)) => {
let r = &receivers[i];
remote_send(message, r.0, &mut stream).await;
selector.push(make_fut((i, r)))
}
(_, Err(RecvError::Disconnected)) => {}
}
}

stream.shutdown().await.unwrap();
log::debug!("Remote sender for {} exited", coord);
}

#[cfg(all(feature = "async-tokio", not(feature = "fair")))]
#[cfg(feature = "async-tokio")]
async fn mux_thread<Out: ExchangeData>(
coord: DemuxCoord,
rx: Receiver<(ReceiverEndpoint, NetworkMessage<Out>)>,
Expand Down
42 changes: 0 additions & 42 deletions src/network/network_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,6 @@ use crate::profiler::{get_profiler, Profiler};
/// The capacity of the in-buffer.
const CHANNEL_CAPACITY: usize = 10;

#[cfg(feature = "fair")]
pub(crate) fn local_channel<T: ExchangeData>(
receiver_endpoint: ReceiverEndpoint,
) -> (NetworkSender<T>, NetworkReceiver<T>) {
let (sender, receiver) = channel::bounded(CHANNEL_CAPACITY);
(
NetworkSender {
receiver_endpoint,
sender,
},
NetworkReceiver {
receiver_endpoint,
receiver,
},
)
}

#[cfg(not(feature = "fair"))]
pub(crate) fn local_channel<T: ExchangeData>(
receiver_endpoint: ReceiverEndpoint,
) -> (NetworkSender<T>, NetworkReceiver<T>) {
Expand All @@ -47,7 +29,6 @@ pub(crate) fn local_channel<T: ExchangeData>(
)
}

#[cfg(not(feature = "fair"))]
pub(crate) fn mux_sender<T: ExchangeData>(
receiver_endpoint: ReceiverEndpoint,
tx: Sender<(ReceiverEndpoint, NetworkMessage<T>)>,
Expand Down Expand Up @@ -142,35 +123,16 @@ pub(crate) struct NetworkSender<Out: ExchangeData> {
pub receiver_endpoint: ReceiverEndpoint,
/// The generic sender that will send the message either locally or remotely.
#[derivative(Debug = "ignore")]
#[cfg(feature = "fair")]
pub(super) sender: Sender<NetworkMessage<Out>>,
#[derivative(Debug = "ignore")]
#[cfg(not(feature = "fair"))]
sender: SenderInner<Out>,
}

#[derive(Clone)]
#[cfg(not(feature = "fair"))]
enum SenderInner<Out: ExchangeData> {
Mux(Sender<(ReceiverEndpoint, NetworkMessage<Out>)>),
Local(Sender<NetworkMessage<Out>>),
}

impl<Out: ExchangeData> NetworkSender<Out> {
/// Send a message to a replica.
#[cfg(feature = "fair")]
pub fn send(&self, message: NetworkMessage<Out>) -> Result<(), NetworkSendError> {
get_profiler().items_out(
message.sender,
self.receiver_endpoint.coord,
message.num_items(),
);
self.sender
.send(message)
.map_err(|_| NetworkSendError::Disconnected(self.receiver_endpoint))
}

#[cfg(not(feature = "fair"))]
pub fn send(&self, message: NetworkMessage<Out>) -> Result<(), NetworkSendError> {
get_profiler().items_out(
message.sender,
Expand All @@ -189,10 +151,6 @@ impl<Out: ExchangeData> NetworkSender<Out> {
}

pub fn clone_inner(&self) -> Sender<NetworkMessage<Out>> {
#[cfg(feature = "fair")]
return self.sender.clone();

#[cfg(not(feature = "fair"))]
match &self.sender {
SenderInner::Mux(_) => panic!("Trying to clone mux channel. Not supported"),
SenderInner::Local(tx) => tx.clone(),
Expand Down
Loading

0 comments on commit 74b28bc

Please sign in to comment.