From bd077676b3acf0b5e0643ad9b1c19cbef08e450b Mon Sep 17 00:00:00 2001 From: Ryan Butler Date: Thu, 20 Jun 2024 00:04:54 -0400 Subject: [PATCH] replicate: make manager take &self --- .../client/examples/example-client.rs | 2 +- crates/replicate/client/src/manager.rs | 91 +++++++++++++------ .../replicate/common/src/messages/manager.rs | 4 +- 3 files changed, 66 insertions(+), 31 deletions(-) diff --git a/crates/replicate/client/examples/example-client.rs b/crates/replicate/client/examples/example-client.rs index 56ad508..86d2774 100644 --- a/crates/replicate/client/examples/example-client.rs +++ b/crates/replicate/client/examples/example-client.rs @@ -32,7 +32,7 @@ async fn main() -> Result<()> { let args = Args::parse(); - let mut manager = Manager::connect(args.url, args.token.as_deref()) + let manager = Manager::connect(args.url, args.token.as_deref()) .await .wrap_err("failed to connect to manager")?; info!("Connected to manager!"); diff --git a/crates/replicate/client/src/manager.rs b/crates/replicate/client/src/manager.rs index 91fe96e..735a11b 100644 --- a/crates/replicate/client/src/manager.rs +++ b/crates/replicate/client/src/manager.rs @@ -2,19 +2,23 @@ use std::fmt::Debug; -use eyre::{bail, ensure, eyre, Context}; +use eyre::Result; +use eyre::{bail, ensure, Context, OptionExt}; use futures::sink::SinkExt; use futures::stream::StreamExt; use replicate_common::{ messages::manager::{Clientbound as Cb, Serverbound as Sb}, InstanceId, }; +use tokio::sync::{mpsc, oneshot}; use url::Url; use crate::connect_to_url; use crate::Ascii; -type Result = eyre::Result; +/// The number of queued rpc calls allowed before we start erroring. +const RPC_CAPACITY: usize = 64; + type Framed = replicate_common::Framed; /// Manages instances on the instance server. Under the hood, this is all done @@ -27,7 +31,8 @@ type Framed = replicate_common::Framed; pub struct Manager { _conn: wtransport::Connection, _url: Url, - framed: Framed, + task: tokio::task::JoinHandle>, + request_tx: mpsc::Sender<(Sb, oneshot::Sender)>, } impl Manager { @@ -70,40 +75,70 @@ impl Manager { ); } + let (request_tx, request_rx) = mpsc::channel(RPC_CAPACITY); + let task = tokio::spawn(manager_task(framed, request_rx)); + Ok(Self { _conn: conn, _url: url, - framed, + task, + request_tx, }) } - pub async fn instance_create(&mut self) -> Result { - self.framed - .send(Sb::InstanceCreateRequest) + pub async fn instance_create(&self) -> Result { + let response = self.request(Sb::InstanceCreateRequest).await?; + let Cb::InstanceCreateResponse { id } = response else { + bail!("unexpected response: {response:?}"); + }; + Ok(id) + } + + pub async fn instance_url(&self, id: InstanceId) -> Result { + let response = self.request(Sb::InstanceUrlRequest { id }).await?; + let Cb::InstanceUrlResponse { url } = response else { + bail!("unexpected response: {response:?}"); + }; + Ok(url) + } + + /// Panics if the connection is already dead + async fn request(&self, request: Sb) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + self.request_tx + .send((request, response_tx)) .await - .wrap_err("failed to write message")?; - match self.framed.next().await { - None => Err(eyre!("server disconnected")), - Some(Err(err)) => { - Err(eyre::Report::new(err).wrap_err("failed to receive message")) - } - Some(Ok(Cb::InstanceCreateResponse { id })) => Ok(id), - Some(Ok(_)) => Err(eyre!("unexpected response")), - } + .wrap_err("failed to send to manager task")?; + response_rx + .await + .wrap_err("failed to receive from manager task") } - pub async fn instance_url(&mut self, id: InstanceId) -> Result { - self.framed - .send(Sb::InstanceUrlRequest { id }) + /// Destroys the manager and reaps any errors from its networking task + pub async fn join(self) -> Result<()> { + self.task .await - .wrap_err("failed to write message")?; - match self.framed.next().await { - None => Err(eyre!("server disconnected")), - Some(Err(err)) => { - Err(eyre::Report::new(err).wrap_err("failed to receive message")) - } - Some(Ok(Cb::InstanceUrlResponse { url })) => Ok(url), - Some(Ok(_)) => Err(eyre!("unexpected response")), - } + .wrap_err("panic in manager task, file a bug report on github uwu")? + .wrap_err("error in task") + } +} + +async fn manager_task( + mut framed: Framed, + mut request_rx: mpsc::Receiver<(Sb, oneshot::Sender)>, +) -> Result<()> { + while let Some((request, response_tx)) = request_rx.recv().await { + framed + .send(request) + .await + .wrap_err("error while sending request")?; + let response = framed + .next() + .await + .ok_or_eyre("expected a response from the server")? + .wrap_err("error while receiving response")?; + let _ = response_tx.send(response); } + // We only return ok when the manager struct was dropped + Ok(()) } diff --git a/crates/replicate/common/src/messages/manager.rs b/crates/replicate/common/src/messages/manager.rs index f3ede7f..7e623d7 100644 --- a/crates/replicate/common/src/messages/manager.rs +++ b/crates/replicate/common/src/messages/manager.rs @@ -3,14 +3,14 @@ use url::Url; use crate::InstanceId; -#[derive(Serialize, Deserialize, Eq, PartialEq)] +#[derive(Serialize, Deserialize, Eq, PartialEq, Debug)] pub enum Serverbound { InstanceUrlRequest { id: InstanceId }, InstanceCreateRequest, HandshakeRequest, } -#[derive(Serialize, Deserialize, Eq, PartialEq)] +#[derive(Serialize, Deserialize, Eq, PartialEq, Debug)] pub enum Clientbound { InstanceUrlResponse { url: Url }, InstanceCreateResponse { id: InstanceId },