Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep context and client fields for Publish and Request by reference #866

Open
wants to merge 5 commits into
base: into-future-refactor
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 35 additions & 42 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use super::{header::HeaderMap, status::StatusCode, Command, Error, Message, Subs
use bytes::Bytes;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use futures::FutureExt;
use lazy_static::lazy_static;
use regex::Regex;
use std::error;
Expand All @@ -38,60 +39,51 @@ lazy_static! {
/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
pub struct PublishError(mpsc::error::SendError<Command>);

pub struct Publish {
sender: mpsc::Sender<Command>,
pub struct Publish<'a> {
client: &'a Client,
subject: String,
payload: Bytes,
headers: Option<HeaderMap>,
respond: Option<String>,
}

impl Publish {
pub fn new(sender: mpsc::Sender<Command>, subject: String, payload: Bytes) -> Publish {
impl<'a> Publish<'a> {
pub fn new(client: &Client, subject: String, payload: Bytes) -> Publish {
Publish {
sender,
client,
subject,
payload,
headers: None,
respond: None,
}
}

pub fn headers(mut self, headers: HeaderMap) -> Publish {
pub fn headers(mut self, headers: HeaderMap) -> Publish<'a> {
self.headers = Some(headers);
self
}

pub fn reply(mut self, subject: String) -> Publish {
pub fn reply(mut self, subject: String) -> Publish<'a> {
self.respond = Some(subject);
self
}
}

impl IntoFuture for Publish {
impl<'a> IntoFuture for Publish<'a> {
type Output = Result<(), PublishError>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send>>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + 'a>>;

fn into_future(self) -> Self::IntoFuture {
let sender = self.sender.clone();
let subject = self.subject;
let payload = self.payload;
let respond = self.respond;
let headers = self.headers;

Box::pin(async move {
sender
.send(Command::Publish {
subject,
payload,
respond,
headers,
})
.map_err(PublishError)
.await?;

Ok(())
})
self.client
.sender
.send(Command::Publish {
subject: self.subject,
payload: self.payload,
respond: self.respond,
headers: self.headers,
})
.map_err(PublishError)
.boxed()
}
}

Expand Down Expand Up @@ -213,7 +205,7 @@ impl Client {
/// # }
/// ```
pub fn publish(&self, subject: String, payload: Bytes) -> Publish {
Publish::new(self.sender.clone(), subject, payload)
Publish::new(self, subject, payload)
}

/// Publish a [Message] with headers to a given subject.
Expand Down Expand Up @@ -310,7 +302,7 @@ impl Client {
/// # }
/// ```
pub fn request(&self, subject: String, payload: Bytes) -> Request {
Request::new(self.clone(), subject, payload)
Request::new(self, subject, payload)
}

/// Sends the request with headers.
Expand All @@ -333,7 +325,7 @@ impl Client {
headers: HeaderMap,
payload: Bytes,
) -> Result<Message, Error> {
let message = Request::new(self.clone(), subject, payload)
let message = Request::new(self, subject, payload)
.headers(headers)
.await?;

Expand Down Expand Up @@ -463,17 +455,17 @@ impl Client {

/// Used for building and sending requests.
#[derive(Debug)]
pub struct Request {
client: Client,
pub struct Request<'a> {
client: &'a Client,
subject: String,
payload: Option<Bytes>,
headers: Option<HeaderMap>,
timeout: Option<Option<Duration>>,
inbox: Option<String>,
}

impl Request {
pub fn new(client: Client, subject: String, payload: Bytes) -> Request {
impl<'a> Request<'a> {
pub fn new(client: &'a Client, subject: String, payload: Bytes) -> Request<'a> {
Request {
client,
subject,
Expand All @@ -495,7 +487,7 @@ impl Request {
/// # Ok(())
/// # }
/// ```
pub fn payload(mut self, payload: Bytes) -> Request {
pub fn payload(mut self, payload: Bytes) -> Request<'a> {
self.payload = Some(payload);
self
}
Expand All @@ -518,7 +510,7 @@ impl Request {
/// # Ok(())
/// # }
/// ```
pub fn headers(mut self, headers: HeaderMap) -> Request {
pub fn headers(mut self, headers: HeaderMap) -> Request<'a> {
self.headers = Some(headers);
self
}
Expand All @@ -539,7 +531,7 @@ impl Request {
/// # Ok(())
/// # }
/// ```
pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
pub fn timeout(mut self, timeout: Option<Duration>) -> Request<'a> {
self.timeout = Some(timeout);
self
}
Expand All @@ -558,7 +550,7 @@ impl Request {
/// # Ok(())
/// # }
/// ```
pub fn inbox(mut self, inbox: String) -> Request {
pub fn inbox(mut self, inbox: String) -> Request<'a> {
self.inbox = Some(inbox);
self
}
Expand All @@ -569,6 +561,7 @@ impl Request {
let mut publish = self
.client
.publish(self.subject, self.payload.unwrap_or_else(Bytes::new));

if let Some(headers) = self.headers {
publish = publish.headers(headers);
}
Expand Down Expand Up @@ -606,11 +599,11 @@ impl Request {
}
}

impl IntoFuture for Request {
impl<'a> IntoFuture for Request<'a> {
type Output = Result<Message, Error>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<Message, Error>> + Send>>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<Message, Error>> + Send + 'a>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(self.send())
self.send().boxed()
}
}
90 changes: 45 additions & 45 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::jetstream::publish::PublishAck;
use crate::jetstream::response::Response;
use crate::{header, Client, Command, Error, HeaderMap, HeaderValue};
use bytes::Bytes;
use futures::FutureExt;
use futures::{Future, StreamExt, TryFutureExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -117,7 +118,7 @@ impl Context {
/// # }
/// ```
pub fn publish(&self, subject: String, payload: Bytes) -> Publish {
Publish::new(self.clone(), subject, payload)
Publish::new(self, subject, payload)
}

/// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
Expand Down Expand Up @@ -697,7 +698,7 @@ impl Context {
T: Sized + Serialize,
V: DeserializeOwned,
{
Request::new(self.clone(), subject, payload)
Request::new(self, subject, payload)
}

/// Creates a new object store bucket.
Expand Down Expand Up @@ -1011,16 +1012,16 @@ impl futures::Stream for Streams {
}
/// Used for building customized `publish` message.
#[derive(Clone, Debug)]
pub struct Publish {
context: Context,
pub struct Publish<'a> {
context: &'a Context,
subject: String,
payload: Bytes,
headers: Option<header::HeaderMap>,
}

impl Publish {
impl<'a> Publish<'a> {
/// Creates a new custom Publish struct to be used with.
pub(crate) fn new(context: Context, subject: String, payload: Bytes) -> Self {
pub(crate) fn new(context: &'a Context, subject: String, payload: Bytes) -> Self {
Publish {
context,
subject,
Expand Down Expand Up @@ -1085,26 +1086,23 @@ impl Publish {
}
}

impl IntoFuture for Publish {
impl<'a> IntoFuture for Publish<'a> {
type Output = Result<PublishAckFuture, Error>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAckFuture, Error>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
let client = self.context.client.clone();
let timeout = self.context.timeout;

Box::pin(std::future::IntoFuture::into_future(async move {
let inbox = self.context.client.new_inbox();
let subscription = self.context.client.subscribe(inbox.clone()).await?;
let mut publish = self
.context
.client
.publish(self.subject, self.payload)
.reply(inbox);
let inbox = client.new_inbox();
let subscription = client.subscribe(inbox.clone()).await?;
let mut publish = client.publish(self.subject, self.payload).reply(inbox);

if let Some(headers) = self.headers {
publish = publish.headers(headers);
}

let timeout = self.context.timeout;

tokio::time::timeout(timeout, publish.into_future())
.map_err(|_| {
std::io::Error::new(ErrorKind::TimedOut, "JetStream publish request timed out")
Expand All @@ -1120,16 +1118,16 @@ impl IntoFuture for Publish {
}

#[derive(Debug)]
pub struct Request<T: Sized + Serialize, V: DeserializeOwned> {
context: Context,
pub struct Request<'a, T: Sized + Serialize, V: DeserializeOwned> {
context: &'a Context,
subject: String,
payload: T,
timeout: Option<Duration>,
response_type: PhantomData<V>,
}

impl<T: Sized + Serialize, V: DeserializeOwned> Request<T, V> {
pub fn new(context: Context, subject: String, payload: T) -> Self {
impl<'a, T: Sized + Serialize, V: DeserializeOwned> Request<'a, T, V> {
pub fn new(context: &'a Context, subject: String, payload: T) -> Self {
Self {
context,
subject,
Expand All @@ -1145,34 +1143,36 @@ impl<T: Sized + Serialize, V: DeserializeOwned> Request<T, V> {
}
}

impl<T: Sized + Serialize, V: DeserializeOwned> IntoFuture for Request<T, V> {
impl<'a, T: Sized + Serialize, V: DeserializeOwned + Send> IntoFuture for Request<'a, T, V> {
type Output = Result<Response<V>, Error>;

type IntoFuture = Pin<Box<dyn Future<Output = Result<Response<V>, Error>> + Send>>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<Response<V>, Error>> + Send + 'a>>;

fn into_future(self) -> Self::IntoFuture {
let payload_result = serde_json::to_vec(&self.payload).map(Bytes::from);

let prefix = self.context.prefix;
let client = self.context.client;
let subject = self.subject;
let timeout = self.timeout;

Box::pin(std::future::IntoFuture::into_future(async move {
let payload = payload_result?;
debug!("JetStream request sent: {:?}", payload);

let request = client.request(format!("{}.{}", prefix, subject), payload);
let request = request.timeout(timeout);
let message = request.await?;

debug!(
"JetStream request response: {:?}",
from_utf8(&message.payload)
);
let response = serde_json::from_slice(message.payload.as_ref())?;

Ok(response)
}))
serde_json::to_vec(&self.payload)
.map_err(|s| Box::new(s) as Error)
.map(Bytes::from)
.map(|payload| {
debug!("JetStream request sent: {:?}", payload);

self.context
.client
.request(format!("{}.{}", self.context.prefix, self.subject), payload)
.timeout(self.timeout)
.into_future()
.map(|result| {
result.and_then(|message| {
debug!(
"JetStream request response: {:?}",
from_utf8(&message.payload)
);

serde_json::from_slice(message.payload.as_ref())
.map_err(|s| Box::new(s) as Error)
})
})
.boxed()
})
.unwrap_or_else(|err| std::future::IntoFuture::into_future(async { Err(err) }).boxed())
}
}
Loading