Skip to content

Commit

Permalink
Optimize async runtime to directly forward single consumer inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
twright committed Oct 18, 2024
1 parent 712b5ab commit 10dd723
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ async fn manage_var<V: Clone + Debug + Send + 'static>(
cancel: CancellationToken,
) {
let mut senders: Vec<mpsc::Sender<V>> = vec![];
let mut send_requests = vec![];
// Gathering senders
loop {
select! {
Expand All @@ -103,13 +104,7 @@ async fn manage_var<V: Clone + Debug + Send + 'static>(
}
channel_sender = channel_request_rx.recv() => {
if let Some(channel_sender) = channel_sender {
let (tx, rx) = mpsc::channel(10);
senders.push(tx);
let stream = ReceiverStream::new(rx);
// let typed_stream = SS::to_typed_stream(typ, Box::pin(stream));
if let Err(_) = channel_sender.send(Box::pin(stream)) {
panic!("Failed to send stream for {var} to requester");
}
send_requests.push(channel_sender);
}
// We don't care if we stop receiving requests
}
Expand All @@ -120,6 +115,27 @@ async fn manage_var<V: Clone + Debug + Send + 'static>(
}
}

// Sending subscriptions
if send_requests.len() == 1 {
// Special case handling for a single request; just send the input stream
let channel_sender = send_requests.pop().unwrap();
if let Err(_) = channel_sender.send(input_stream) {
panic!("Failed to send stream for {var} to requester");
}
// We directly re-forwarded the input stream, so we are done
return;
} else {
for channel_sender in send_requests {
let (tx, rx) = mpsc::channel(10);
senders.push(tx);
let stream = ReceiverStream::new(rx);
// let typed_stream = SS::to_typed_stream(typ, Box::pin(stream));
if let Err(_) = channel_sender.send(Box::pin(stream)) {
panic!("Failed to send stream for {var} to requester");
}
}
}

// Distributing data
loop {
select! {
Expand Down

0 comments on commit 10dd723

Please sign in to comment.