Skip to content

Commit

Permalink
Make context field in Request a reference
Browse files Browse the repository at this point in the history
Co-Authored-By: Casper Beyer <[email protected]>
  • Loading branch information
n1ghtmare and caspervonb committed Mar 6, 2023
1 parent cc49c8a commit 0984e46
Showing 1 changed file with 32 additions and 29 deletions.
61 changes: 32 additions & 29 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::jetstream::publish::PublishAck;
use crate::jetstream::response::Response;
use crate::{header, Client, Command, Error, HeaderMap, HeaderValue};
use bytes::Bytes;
use futures::FutureExt;
use futures::{Future, StreamExt, TryFutureExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -697,7 +698,7 @@ impl Context {
T: Sized + Serialize,
V: DeserializeOwned,
{
Request::new(self.clone(), subject, payload)
Request::new(self, subject, payload)
}

/// Creates a new object store bucket.
Expand Down Expand Up @@ -1117,16 +1118,16 @@ impl<'a> IntoFuture for Publish<'a> {
}

#[derive(Debug)]
pub struct Request<T: Sized + Serialize, V: DeserializeOwned> {
context: Context,
pub struct Request<'a, T: Sized + Serialize, V: DeserializeOwned> {
context: &'a Context,
subject: String,
payload: T,
timeout: Option<Duration>,
response_type: PhantomData<V>,
}

impl<T: Sized + Serialize, V: DeserializeOwned> Request<T, V> {
pub fn new(context: Context, subject: String, payload: T) -> Self {
impl<'a, T: Sized + Serialize, V: DeserializeOwned> Request<'a, T, V> {
pub fn new(context: &'a Context, subject: String, payload: T) -> Self {
Self {
context,
subject,
Expand All @@ -1142,34 +1143,36 @@ impl<T: Sized + Serialize, V: DeserializeOwned> Request<T, V> {
}
}

impl<T: Sized + Serialize, V: DeserializeOwned> IntoFuture for Request<T, V> {
impl<'a, T: Sized + Serialize, V: DeserializeOwned + Send> IntoFuture for Request<'a, T, V> {
type Output = Result<Response<V>, Error>;

type IntoFuture = Pin<Box<dyn Future<Output = Result<Response<V>, Error>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
let payload_result = serde_json::to_vec(&self.payload).map(Bytes::from);

let prefix = self.context.prefix;
let client = self.context.client;
let subject = self.subject;
let timeout = self.timeout;

Box::pin(std::future::IntoFuture::into_future(async move {
let payload = payload_result?;
debug!("JetStream request sent: {:?}", payload);

let request = client.request(format!("{}.{}", prefix, subject), payload);
let request = request.timeout(timeout);
let message = request.await?;

debug!(
"JetStream request response: {:?}",
from_utf8(&message.payload)
);
let response = serde_json::from_slice(message.payload.as_ref())?;

Ok(response)
}))
serde_json::to_vec(&self.payload)
.map_err(|s| Box::new(s) as Error)
.map(Bytes::from)
.map(|payload| {
debug!("JetStream request sent: {:?}", payload);

self.context
.client
.request(format!("{}.{}", self.context.prefix, self.subject), payload)
.timeout(self.timeout)
.into_future()
.map(|result| {
result.and_then(|message| {
debug!(
"JetStream request response: {:?}",
from_utf8(&message.payload)
);

serde_json::from_slice(message.payload.as_ref())
.map_err(|s| Box::new(s) as Error)
})
})
.boxed()
})
.unwrap_or_else(|err| std::future::IntoFuture::into_future(async { Err(err) }).boxed())
}
}

0 comments on commit 0984e46

Please sign in to comment.