From cc49c8acf7204b9fc84268c0b1bb3d65f3a4130e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 24 Feb 2023 15:26:43 +0200 Subject: [PATCH 1/5] Make `context` field in `Publish` a reference Co-Authored-By: Casper Beyer --- async-nats/src/jetstream/context.rs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 432e7d7cf..fc42e42ef 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -117,7 +117,7 @@ impl Context { /// # } /// ``` pub fn publish(&self, subject: String, payload: Bytes) -> Publish { - Publish::new(self.clone(), subject, payload) + Publish::new(self, subject, payload) } /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from @@ -1011,16 +1011,16 @@ impl futures::Stream for Streams { } /// Used for building customized `publish` message. #[derive(Clone, Debug)] -pub struct Publish { - context: Context, +pub struct Publish<'a> { + context: &'a Context, subject: String, payload: Bytes, headers: Option, } -impl Publish { +impl<'a> Publish<'a> { /// Creates a new custom Publish struct to be used with. - pub(crate) fn new(context: Context, subject: String, payload: Bytes) -> Self { + pub(crate) fn new(context: &'a Context, subject: String, payload: Bytes) -> Self { Publish { context, subject, @@ -1085,26 +1085,23 @@ impl Publish { } } -impl IntoFuture for Publish { +impl<'a> IntoFuture for Publish<'a> { type Output = Result; type IntoFuture = Pin> + Send>>; fn into_future(self) -> Self::IntoFuture { + let client = self.context.client.clone(); + let timeout = self.context.timeout; + Box::pin(std::future::IntoFuture::into_future(async move { - let inbox = self.context.client.new_inbox(); - let subscription = self.context.client.subscribe(inbox.clone()).await?; - let mut publish = self - .context - .client - .publish(self.subject, self.payload) - .reply(inbox); + let inbox = client.new_inbox(); + let subscription = client.subscribe(inbox.clone()).await?; + let mut publish = client.publish(self.subject, self.payload).reply(inbox); if let Some(headers) = self.headers { publish = publish.headers(headers); } - let timeout = self.context.timeout; - tokio::time::timeout(timeout, publish.into_future()) .map_err(|_| { std::io::Error::new(ErrorKind::TimedOut, "JetStream publish request timed out") From 0984e468119a53f4809ba1cdca9d7eec2659f836 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 24 Feb 2023 21:30:47 +0200 Subject: [PATCH 2/5] Make `context` field in `Request` a reference Co-Authored-By: Casper Beyer --- async-nats/src/jetstream/context.rs | 61 +++++++++++++++-------------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index fc42e42ef..cd20f5108 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -19,6 +19,7 @@ use crate::jetstream::publish::PublishAck; use crate::jetstream::response::Response; use crate::{header, Client, Command, Error, HeaderMap, HeaderValue}; use bytes::Bytes; +use futures::FutureExt; use futures::{Future, StreamExt, TryFutureExt}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -697,7 +698,7 @@ impl Context { T: Sized + Serialize, V: DeserializeOwned, { - Request::new(self.clone(), subject, payload) + Request::new(self, subject, payload) } /// Creates a new object store bucket. @@ -1117,16 +1118,16 @@ impl<'a> IntoFuture for Publish<'a> { } #[derive(Debug)] -pub struct Request { - context: Context, +pub struct Request<'a, T: Sized + Serialize, V: DeserializeOwned> { + context: &'a Context, subject: String, payload: T, timeout: Option, response_type: PhantomData, } -impl Request { - pub fn new(context: Context, subject: String, payload: T) -> Self { +impl<'a, T: Sized + Serialize, V: DeserializeOwned> Request<'a, T, V> { + pub fn new(context: &'a Context, subject: String, payload: T) -> Self { Self { context, subject, @@ -1142,34 +1143,36 @@ impl Request { } } -impl IntoFuture for Request { +impl<'a, T: Sized + Serialize, V: DeserializeOwned + Send> IntoFuture for Request<'a, T, V> { type Output = Result, Error>; type IntoFuture = Pin, Error>> + Send>>; fn into_future(self) -> Self::IntoFuture { - let payload_result = serde_json::to_vec(&self.payload).map(Bytes::from); - - let prefix = self.context.prefix; - let client = self.context.client; - let subject = self.subject; - let timeout = self.timeout; - - Box::pin(std::future::IntoFuture::into_future(async move { - let payload = payload_result?; - debug!("JetStream request sent: {:?}", payload); - - let request = client.request(format!("{}.{}", prefix, subject), payload); - let request = request.timeout(timeout); - let message = request.await?; - - debug!( - "JetStream request response: {:?}", - from_utf8(&message.payload) - ); - let response = serde_json::from_slice(message.payload.as_ref())?; - - Ok(response) - })) + serde_json::to_vec(&self.payload) + .map_err(|s| Box::new(s) as Error) + .map(Bytes::from) + .map(|payload| { + debug!("JetStream request sent: {:?}", payload); + + self.context + .client + .request(format!("{}.{}", self.context.prefix, self.subject), payload) + .timeout(self.timeout) + .into_future() + .map(|result| { + result.and_then(|message| { + debug!( + "JetStream request response: {:?}", + from_utf8(&message.payload) + ); + + serde_json::from_slice(message.payload.as_ref()) + .map_err(|s| Box::new(s) as Error) + }) + }) + .boxed() + }) + .unwrap_or_else(|err| std::future::IntoFuture::into_future(async { Err(err) }).boxed()) } } From f342cf2198d68b3fd8b784ca57b71faae1d52bc2 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 Mar 2023 18:54:16 +0200 Subject: [PATCH 3/5] Make `client` field in `Publish` a reference Co-Authored-By: Casper Beyer --- async-nats/src/client.rs | 50 +++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index db67c85fb..9babd9367 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -18,6 +18,7 @@ use super::{header::HeaderMap, status::StatusCode, Command, Error, Message, Subs use bytes::Bytes; use futures::future::TryFutureExt; use futures::stream::StreamExt; +use futures::FutureExt; use lazy_static::lazy_static; use regex::Regex; use std::error; @@ -38,18 +39,18 @@ lazy_static! { /// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions. pub struct PublishError(mpsc::error::SendError); -pub struct Publish { - sender: mpsc::Sender, +pub struct Publish<'a> { + client: &'a Client, subject: String, payload: Bytes, headers: Option, respond: Option, } -impl Publish { - pub fn new(sender: mpsc::Sender, subject: String, payload: Bytes) -> Publish { +impl<'a> Publish<'a> { + pub fn new(client: &Client, subject: String, payload: Bytes) -> Publish { Publish { - sender, + client, subject, payload, headers: None, @@ -57,41 +58,32 @@ impl Publish { } } - pub fn headers(mut self, headers: HeaderMap) -> Publish { + pub fn headers(mut self, headers: HeaderMap) -> Publish<'a> { self.headers = Some(headers); self } - pub fn reply(mut self, subject: String) -> Publish { + pub fn reply(mut self, subject: String) -> Publish<'a> { self.respond = Some(subject); self } } -impl IntoFuture for Publish { +impl<'a> IntoFuture for Publish<'a> { type Output = Result<(), PublishError>; - type IntoFuture = Pin> + Send>>; + type IntoFuture = Pin> + Send + 'a>>; fn into_future(self) -> Self::IntoFuture { - let sender = self.sender.clone(); - let subject = self.subject; - let payload = self.payload; - let respond = self.respond; - let headers = self.headers; - - Box::pin(async move { - sender - .send(Command::Publish { - subject, - payload, - respond, - headers, - }) - .map_err(PublishError) - .await?; - - Ok(()) - }) + self.client + .sender + .send(Command::Publish { + subject: self.subject, + payload: self.payload, + respond: self.respond, + headers: self.headers, + }) + .map_err(PublishError) + .boxed() } } @@ -213,7 +205,7 @@ impl Client { /// # } /// ``` pub fn publish(&self, subject: String, payload: Bytes) -> Publish { - Publish::new(self.sender.clone(), subject, payload) + Publish::new(self, subject, payload) } /// Publish a [Message] with headers to a given subject. From 1a360fe50e3db4c911c016abe7a6386c5f39432e Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 Mar 2023 19:17:13 +0200 Subject: [PATCH 4/5] Make `Client` field in `Request` a reference Co-Authored-By: Casper Beyer --- async-nats/src/client.rs | 27 ++++++++++++++------------- async-nats/src/jetstream/context.rs | 2 +- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 9babd9367..775a7442b 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -302,7 +302,7 @@ impl Client { /// # } /// ``` pub fn request(&self, subject: String, payload: Bytes) -> Request { - Request::new(self.clone(), subject, payload) + Request::new(self, subject, payload) } /// Sends the request with headers. @@ -325,7 +325,7 @@ impl Client { headers: HeaderMap, payload: Bytes, ) -> Result { - let message = Request::new(self.clone(), subject, payload) + let message = Request::new(self, subject, payload) .headers(headers) .await?; @@ -455,8 +455,8 @@ impl Client { /// Used for building and sending requests. #[derive(Debug)] -pub struct Request { - client: Client, +pub struct Request<'a> { + client: &'a Client, subject: String, payload: Option, headers: Option, @@ -464,8 +464,8 @@ pub struct Request { inbox: Option, } -impl Request { - pub fn new(client: Client, subject: String, payload: Bytes) -> Request { +impl<'a> Request<'a> { + pub fn new(client: &'a Client, subject: String, payload: Bytes) -> Request<'a> { Request { client, subject, @@ -487,7 +487,7 @@ impl Request { /// # Ok(()) /// # } /// ``` - pub fn payload(mut self, payload: Bytes) -> Request { + pub fn payload(mut self, payload: Bytes) -> Request<'a> { self.payload = Some(payload); self } @@ -510,7 +510,7 @@ impl Request { /// # Ok(()) /// # } /// ``` - pub fn headers(mut self, headers: HeaderMap) -> Request { + pub fn headers(mut self, headers: HeaderMap) -> Request<'a> { self.headers = Some(headers); self } @@ -531,7 +531,7 @@ impl Request { /// # Ok(()) /// # } /// ``` - pub fn timeout(mut self, timeout: Option) -> Request { + pub fn timeout(mut self, timeout: Option) -> Request<'a> { self.timeout = Some(timeout); self } @@ -550,7 +550,7 @@ impl Request { /// # Ok(()) /// # } /// ``` - pub fn inbox(mut self, inbox: String) -> Request { + pub fn inbox(mut self, inbox: String) -> Request<'a> { self.inbox = Some(inbox); self } @@ -561,6 +561,7 @@ impl Request { let mut publish = self .client .publish(self.subject, self.payload.unwrap_or_else(Bytes::new)); + if let Some(headers) = self.headers { publish = publish.headers(headers); } @@ -598,11 +599,11 @@ impl Request { } } -impl IntoFuture for Request { +impl<'a> IntoFuture for Request<'a> { type Output = Result; - type IntoFuture = Pin> + Send>>; + type IntoFuture = Pin> + Send + 'a>>; fn into_future(self) -> Self::IntoFuture { - Box::pin(self.send()) + self.send().boxed() } } diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index cd20f5108..c54bcbf60 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -1146,7 +1146,7 @@ impl<'a, T: Sized + Serialize, V: DeserializeOwned> Request<'a, T, V> { impl<'a, T: Sized + Serialize, V: DeserializeOwned + Send> IntoFuture for Request<'a, T, V> { type Output = Result, Error>; - type IntoFuture = Pin, Error>> + Send>>; + type IntoFuture = Pin, Error>> + Send + 'a>>; fn into_future(self) -> Self::IntoFuture { serde_json::to_vec(&self.payload) From be16dac708860de6bb843f5283fa03f98fb1548d Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 10 Mar 2023 17:24:07 +0200 Subject: [PATCH 5/5] Test for moving `Publish` --- async-nats/tests/jetstream_tests.rs | 39 +++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index ec4ee696d..b5a4cc7a2 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -24,6 +24,7 @@ pub struct AccountInfo { mod jetstream { + use std::future::IntoFuture; use std::str::from_utf8; use std::time::{Duration, Instant}; @@ -2534,4 +2535,42 @@ mod jetstream { message.ack().await.unwrap(); } } + + #[tokio::test] + async fn publish_move() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + let context = async_nats::jetstream::new(client.clone()); + + context + .create_stream(async_nats::jetstream::stream::Config { + name: "test_stream".to_string(), + ..Default::default() + }) + .await + .unwrap(); + + let publish = context + .publish("test_stream".to_string(), "data".into()) + .into_future(); + + // Ensure that you get a result after moving the publish and awaiting it + let handle = tokio::task::spawn(async move { publish.await.unwrap().await.unwrap() }); + + let result = handle.await.unwrap(); + assert_eq!(result.stream, "test_stream"); + assert_eq!(result.sequence, 1); + + let publish_ack = context + .publish("test_stream".to_string(), "data".into()) + .await + .unwrap(); + + // Ensure that you get a result after moving the ack and awaiting it + let handle = tokio::task::spawn(async move { publish_ack.await.unwrap() }); + + let result = handle.await.unwrap(); + assert_eq!(result.stream, "test_stream"); + assert_eq!(result.sequence, 2); + } }