From cdee0c12bfa592fd9f82cd08146f536a7f9a0492 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Fri, 14 Feb 2025 00:03:45 +0700 Subject: [PATCH 01/13] Implement ingress client --- Cargo.toml | 1 + src/ingress/internal.rs | 168 ++++++++++++++++++++++++++++++++++++++++ src/ingress/mod.rs | 130 +++++++++++++++++++++++++++++++ src/ingress/request.rs | 93 ++++++++++++++++++++++ src/ingress/result.rs | 167 +++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 6 files changed, 560 insertions(+) create mode 100644 src/ingress/internal.rs create mode 100644 src/ingress/mod.rs create mode 100644 src/ingress/request.rs create mode 100644 src/ingress/result.rs diff --git a/Cargo.toml b/Cargo.toml index ae96dcb..c95812c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful" pin-project-lite = "0.2" rand = { version = "0.8.5", optional = true } regress = "0.10" +reqwest = { version = "0.12", features = ["json"] } restate-sdk-macros = { version = "0.3.2", path = "macros" } restate-sdk-shared-core = "0.1.0" serde = "1.0" diff --git a/src/ingress/internal.rs b/src/ingress/internal.rs new file mode 100644 index 0000000..3d1d2c1 --- /dev/null +++ b/src/ingress/internal.rs @@ -0,0 +1,168 @@ +use std::time::Duration; + +use reqwest::{header::HeaderMap, Url}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +use super::{ + request::{IngressRequestOptions, SendResponse, SendStatus}, + result::{IngressResultOptions, ResultOp, ResultTarget}, +}; +use crate::{context::RequestTarget, errors::TerminalError}; + +const IDEMPOTENCY_KEY_HEADER: &str = "Idempotency-Key"; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SendResponseSchema { + invocation_id: String, + status: SendStatusSchema, +} + +#[derive(Deserialize)] +enum SendStatusSchema { + Accepted, + PreviouslyAccepted, +} + +impl From for SendStatus { + fn from(value: SendStatusSchema) -> Self { + match value { + SendStatusSchema::Accepted => SendStatus::Accepted, + SendStatusSchema::PreviouslyAccepted => SendStatus::PreviouslyAccepted, + } + } +} + +#[derive(Deserialize)] +struct TerminalErrorSchema { + code: Option, + message: String, +} + +pub(super) struct IngressInternal { + pub(super) client: reqwest::Client, + pub(super) url: Url, + pub(super) headers: HeaderMap, +} + +impl IngressInternal { + pub(super) async fn call( + &self, + target: RequestTarget, + req: Req, + opts: IngressRequestOptions, + ) -> Result, reqwest::Error> { + let mut headers = self.headers.clone(); + if let Some(key) = opts.idempotency_key { + headers.append(IDEMPOTENCY_KEY_HEADER, key); + } + + let url = format!("{}/{target}", self.url.as_str().trim_end_matches("/")); + + let mut builder = self.client.post(url).headers(headers).json(&req); + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Ok(Err(TerminalError::new_with_code( + e.code.unwrap_or(status), + e.message, + ))) + } else { + Err(e) + } + } else { + Ok(Ok(res.json::().await?)) + } + } + + pub(super) async fn send( + &self, + target: RequestTarget, + req: Req, + opts: IngressRequestOptions, + delay: Option, + ) -> Result, reqwest::Error> { + let mut headers = self.headers.clone(); + let attachable = if let Some(key) = opts.idempotency_key { + headers.append(IDEMPOTENCY_KEY_HEADER, key); + true + } else { + false + }; + + let url = if let Some(delay) = delay { + format!( + "{}/{target}/send?delay={}ms", + self.url.as_str().trim_end_matches("/"), + delay.as_millis() + ) + } else { + format!("{}/{target}/send", self.url.as_str().trim_end_matches("/")) + }; + + let mut builder = self.client.post(url).headers(headers).json(&req); + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Ok(Err(TerminalError::new_with_code( + e.code.unwrap_or(status), + e.message, + ))) + } else { + Err(e) + } + } else { + let res = res.json::().await?; + Ok(Ok(SendResponse { + invocation_id: res.invocation_id, + status: res.status.into(), + attachable, + })) + } + } + + pub(super) async fn result( + &self, + target: ResultTarget, + op: ResultOp, + opts: IngressResultOptions, + ) -> Result, reqwest::Error> { + let url = format!("{}/{target}/{op}", self.url.as_str().trim_end_matches("/")); + + let mut builder = self.client.get(url).headers(self.headers.clone()); + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Ok(Err(TerminalError::new_with_code( + e.code.unwrap_or(status), + e.message, + ))) + } else { + Err(e) + } + } else { + Ok(Ok(res.json::().await?)) + } + } +} diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs new file mode 100644 index 0000000..05213bb --- /dev/null +++ b/src/ingress/mod.rs @@ -0,0 +1,130 @@ +use reqwest::{header::HeaderMap, Url}; + +use self::{ + internal::IngressInternal, + request::IngressRequest, + result::{IngressResult, ResultTarget}, +}; +use crate::context::RequestTarget; + +pub mod internal; +pub mod request; +pub mod result; + +/// A client for invoking handlers via the ingress. +pub struct IngressClient { + inner: IngressInternal, +} + +impl IngressClient { + /// Create a new [`IngressClient`]. + pub fn new(url: Url) -> Self { + Self { + inner: IngressInternal { + client: reqwest::Client::new(), + url, + headers: Default::default(), + }, + } + } + + /// Create a new [`IngressClient`] with custom headers. + pub fn new_with_headers(url: Url, headers: HeaderMap) -> Self { + Self { + inner: IngressInternal { + client: reqwest::Client::new(), + url, + headers, + }, + } + } + + /// Create a new [`IngressRequest`]. + pub fn request(&self, target: RequestTarget, req: Req) -> IngressRequest { + IngressRequest::new(&self.inner, target, req) + } + + /// Create a new [`IngressResult`]. + pub fn result(&self, target: ResultTarget) -> IngressResult { + IngressResult::new(&self.inner, target) + } + + pub fn service_ingress<'a, I>(&'a self) -> I + where + I: IntoServiceIngress<'a>, + { + I::create_ingress(self) + } + + pub fn object_ingress<'a, I>(&'a self, key: impl Into) -> I + where + I: IntoObjectIngress<'a>, + { + I::create_ingress(self, key.into()) + } + + pub fn workflow_client<'a, I>(&'a self, id: impl Into) -> I + where + I: IntoWorkflowIngress<'a>, + { + I::create_ingress(self, id.into()) + } + + pub fn invocation_result<'a, Res>( + &'a self, + invocation_id: impl Into, + ) -> IngressResult<'a, Res> { + self.result(ResultTarget::invocation(invocation_id)) + } + + pub fn service_result<'a, R>(&'a self) -> R + where + R: IntoServiceResult<'a>, + { + R::create_result(self) + } + + pub fn object_result<'a, R>(&'a self, key: impl Into) -> R + where + R: IntoObjectResult<'a>, + { + R::create_result(self, key.into()) + } + + pub fn workflow_result<'a, R>(&'a self, id: impl Into) -> R + where + R: IntoWorkflowResult<'a>, + { + R::create_result(self, id.into()) + } +} + +/// Trait used by codegen to use the service ingress. +pub trait IntoServiceIngress<'a>: Sized { + fn create_ingress(client: &'a IngressClient) -> Self; +} + +/// Trait used by codegen to use the object ingress. +pub trait IntoObjectIngress<'a>: Sized { + fn create_ingress(client: &'a IngressClient, key: String) -> Self; +} + +/// Trait used by codegen to use the workflow ingress. +pub trait IntoWorkflowIngress<'a>: Sized { + fn create_ingress(client: &'a IngressClient, id: String) -> Self; +} + +/// Trait used by codegen to retrieve the service result. +pub trait IntoServiceResult<'a>: Sized { + fn create_result(client: &'a IngressClient) -> Self; +} + +/// Trait used by codegen to retrieve the object result. +pub trait IntoObjectResult<'a>: Sized { + fn create_result(client: &'a IngressClient, key: String) -> Self; +} + +/// Trait used by codegen to retrieve the workflow result. +pub trait IntoWorkflowResult<'a>: Sized { + fn create_result(client: &'a IngressClient, id: String) -> Self; +} diff --git a/src/ingress/request.rs b/src/ingress/request.rs new file mode 100644 index 0000000..f6f41b1 --- /dev/null +++ b/src/ingress/request.rs @@ -0,0 +1,93 @@ +use std::{marker::PhantomData, time::Duration}; + +use http::HeaderValue; +use serde::{de::DeserializeOwned, Serialize}; + +use super::internal::IngressInternal; +use crate::{context::RequestTarget, errors::TerminalError}; + +/// A send response. +#[derive(Debug, Clone)] +pub struct SendResponse { + pub invocation_id: String, + pub status: SendStatus, + pub attachable: bool, +} + +/// The status of the send. +#[derive(Debug, Clone, Copy)] +pub enum SendStatus { + Accepted, + PreviouslyAccepted, +} + +/// This struct encapsulates the parameters for a request to an ingress. +pub struct IngressRequest<'a, Req, Res = ()> { + inner: &'a IngressInternal, + target: RequestTarget, + req: Req, + res: PhantomData, + opts: IngressRequestOptions, +} + +#[derive(Default, Clone)] +pub(super) struct IngressRequestOptions { + pub(super) idempotency_key: Option, + pub(super) timeout: Option, +} + +impl<'a, Req, Res> IngressRequest<'a, Req, Res> { + pub(super) fn new(inner: &'a IngressInternal, target: RequestTarget, req: Req) -> Self { + Self { + inner, + target, + req, + res: PhantomData, + opts: Default::default(), + } + } + + /// Set the idempotency key for the request. + pub fn idempotency_key(mut self, value: HeaderValue) -> Self { + self.opts.idempotency_key = Some(value); + self + } + + /// Set the timeout for the request. + pub fn timeout(mut self, value: Duration) -> Self { + self.opts.timeout = Some(value); + self + } + + /// Call a service via the ingress. This returns a future encapsulating the response. + pub async fn call(self) -> Result, reqwest::Error> + where + Req: Serialize + 'static, + Res: DeserializeOwned + 'static, + { + self.inner.call(self.target, self.req, self.opts).await + } + + /// Send the request to the ingress, without waiting for the response. + pub async fn send(self) -> Result, reqwest::Error> + where + Req: Serialize + 'static, + { + self.inner + .send(self.target, self.req, self.opts, None) + .await + } + + /// Schedule the request to the ingress, without waiting for the response. + pub async fn send_with_delay( + self, + duration: Duration, + ) -> Result, reqwest::Error> + where + Req: Serialize + 'static, + { + self.inner + .send(self.target, self.req, self.opts, Some(duration)) + .await + } +} diff --git a/src/ingress/result.rs b/src/ingress/result.rs new file mode 100644 index 0000000..193d047 --- /dev/null +++ b/src/ingress/result.rs @@ -0,0 +1,167 @@ +use std::{ + fmt::{self, Display, Formatter}, + marker::PhantomData, + time::Duration, +}; + +use serde::de::DeserializeOwned; + +use super::internal::IngressInternal; +use crate::errors::TerminalError; + +/// The invocation or workflow target to retrieve the result from. +#[derive(Debug, Clone)] +pub enum ResultTarget { + Invocation { + id: String, + }, + Service { + name: String, + handler: String, + idempotency_key: String, + }, + Object { + name: String, + key: String, + handler: String, + idempotency_key: String, + }, + Workflow { + name: String, + id: String, + }, +} + +impl ResultTarget { + pub fn invocation(id: impl Into) -> Self { + Self::Invocation { id: id.into() } + } + + pub fn service( + name: impl Into, + handler: impl Into, + idempotency_key: impl Into, + ) -> Self { + Self::Service { + name: name.into(), + handler: handler.into(), + idempotency_key: idempotency_key.into(), + } + } + + pub fn object( + name: impl Into, + key: impl Into, + handler: impl Into, + idempotency_key: impl Into, + ) -> Self { + Self::Object { + name: name.into(), + key: key.into(), + handler: handler.into(), + idempotency_key: idempotency_key.into(), + } + } + + pub fn workflow(name: impl Into, id: impl Into) -> Self { + Self::Workflow { + name: name.into(), + id: id.into(), + } + } +} + +impl Display for ResultTarget { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ResultTarget::Invocation { id } => { + write!(f, "restate/invocation/{id}") + } + ResultTarget::Service { + name, + handler, + idempotency_key, + } => { + write!(f, "restate/invocation/{name}/{handler}/{idempotency_key}") + } + ResultTarget::Object { + name, + key, + handler, + idempotency_key, + } => write!( + f, + "restate/invocation/{name}/{key}/{handler}/{idempotency_key}" + ), + ResultTarget::Workflow { name, id } => { + write!(f, "restate/workflow/{name}/{id}") + } + } + } +} + +/// The mode of operation to use when retrieving the result of an invocation or workflow. +#[derive(Debug, Clone, Copy)] +pub enum ResultOp { + Attach, + Output, +} + +impl Display for ResultOp { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ResultOp::Attach => write!(f, "attach"), + ResultOp::Output => write!(f, "output"), + } + } +} + +/// This struct encapsulates the parameters for retrieving a result of an invocation or workflow. +pub struct IngressResult<'a, Res = ()> { + inner: &'a IngressInternal, + target: ResultTarget, + res: PhantomData, + opts: IngressResultOptions, +} + +#[derive(Default)] +pub(super) struct IngressResultOptions { + pub(super) timeout: Option, +} + +impl<'a, Res> IngressResult<'a, Res> { + pub(super) fn new(inner: &'a IngressInternal, target: ResultTarget) -> Self { + Self { + inner, + target, + res: PhantomData, + opts: Default::default(), + } + } + + /// Set the timeout for the request. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.opts.timeout = Some(timeout); + self + } + + /// Attach to an invocation or workflow and wait for it to finish. + pub async fn attach(self) -> Result, reqwest::Error> + where + Res: DeserializeOwned + 'static, + { + self.inner + .result(self.target, ResultOp::Attach, self.opts) + .await + } + + /// Peek at the output of an invocation or workflow. + pub async fn output(self) -> Result, reqwest::Error> + where + Res: DeserializeOwned + 'static, + { + self.inner + .result(self.target, ResultOp::Output, self.opts) + .await + } +} diff --git a/src/lib.rs b/src/lib.rs index 930e9d6..9f30cb1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -222,6 +222,7 @@ pub mod errors; pub mod http_server; #[cfg(feature = "hyper")] pub mod hyper; +pub mod ingress; pub mod serde; /// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1). From 0854d8ec4f03ca4ee688825babc62001ff3f7025 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Fri, 14 Feb 2025 17:55:19 +0700 Subject: [PATCH 02/13] fixup! Implement ingress client --- src/ingress/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ingress/mod.rs b/src/ingress/mod.rs index 05213bb..4ecc3ea 100644 --- a/src/ingress/mod.rs +++ b/src/ingress/mod.rs @@ -63,7 +63,7 @@ impl IngressClient { I::create_ingress(self, key.into()) } - pub fn workflow_client<'a, I>(&'a self, id: impl Into) -> I + pub fn workflow_ingress<'a, I>(&'a self, id: impl Into) -> I where I: IntoWorkflowIngress<'a>, { From ace42a6e865c5f01afbef826d0a9649ae1879750 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Tue, 18 Feb 2025 20:45:05 +0700 Subject: [PATCH 03/13] Gate ingress client behind a feature flag --- Cargo.toml | 3 ++- src/lib.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c95812c..ec4cad8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ rust-version = "1.76.0" default = ["http_server", "rand", "uuid"] hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"] http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"] +ingress_client = ["dep:reqwest"] [dependencies] bytes = "1.6.1" @@ -22,7 +23,7 @@ hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful" pin-project-lite = "0.2" rand = { version = "0.8.5", optional = true } regress = "0.10" -reqwest = { version = "0.12", features = ["json"] } +reqwest = { version = "0.12", optional = true, features = ["json"] } restate-sdk-macros = { version = "0.3.2", path = "macros" } restate-sdk-shared-core = "0.1.0" serde = "1.0" diff --git a/src/lib.rs b/src/lib.rs index 9f30cb1..6d2757b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -222,6 +222,7 @@ pub mod errors; pub mod http_server; #[cfg(feature = "hyper")] pub mod hyper; +#[cfg(feature = "ingress_client")] pub mod ingress; pub mod serde; From 4c9fe9f11ead23277cb36ca39f80cd44a4e1b015 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Tue, 18 Feb 2025 21:18:19 +0700 Subject: [PATCH 04/13] Use Restate's trait for serialization and deserialization --- src/ingress/internal.rs | 81 +++++++++++++++++++++++++---------------- src/ingress/request.rs | 17 +++++---- src/ingress/result.rs | 14 +++---- 3 files changed, 66 insertions(+), 46 deletions(-) diff --git a/src/ingress/internal.rs b/src/ingress/internal.rs index 3d1d2c1..918819b 100644 --- a/src/ingress/internal.rs +++ b/src/ingress/internal.rs @@ -1,24 +1,28 @@ use std::time::Duration; use reqwest::{header::HeaderMap, Url}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use thiserror::Error; use super::{ request::{IngressRequestOptions, SendResponse, SendStatus}, result::{IngressResultOptions, ResultOp, ResultTarget}, }; -use crate::{context::RequestTarget, errors::TerminalError}; +use crate::{ + context::RequestTarget, + errors::TerminalError, + serde::{Deserialize, Serialize}, +}; const IDEMPOTENCY_KEY_HEADER: &str = "Idempotency-Key"; -#[derive(Deserialize)] +#[derive(serde::Deserialize)] #[serde(rename_all = "camelCase")] struct SendResponseSchema { invocation_id: String, status: SendStatusSchema, } -#[derive(Deserialize)] +#[derive(serde::Deserialize)] enum SendStatusSchema { Accepted, PreviouslyAccepted, @@ -33,7 +37,7 @@ impl From for SendStatus { } } -#[derive(Deserialize)] +#[derive(serde::Deserialize)] struct TerminalErrorSchema { code: Option, message: String, @@ -45,13 +49,29 @@ pub(super) struct IngressInternal { pub(super) headers: HeaderMap, } +#[derive(Debug, Error)] +pub enum IngressClientError { + #[error(transparent)] + Http(#[from] reqwest::Error), + #[error("terminal error [{}]: {}", ._0.code(), ._0.message())] + Terminal(TerminalError), + #[error(transparent)] + Serde(Box), +} + +impl From for IngressClientError { + fn from(value: TerminalError) -> Self { + Self::Terminal(value) + } +} + impl IngressInternal { - pub(super) async fn call( + pub(super) async fn call( &self, target: RequestTarget, req: Req, opts: IngressRequestOptions, - ) -> Result, reqwest::Error> { + ) -> Result { let mut headers = self.headers.clone(); if let Some(key) = opts.idempotency_key { headers.append(IDEMPOTENCY_KEY_HEADER, key); @@ -59,7 +79,10 @@ impl IngressInternal { let url = format!("{}/{target}", self.url.as_str().trim_end_matches("/")); - let mut builder = self.client.post(url).headers(headers).json(&req); + let mut builder = self.client.post(url).headers(headers).body( + req.serialize() + .map_err(|e| IngressClientError::Serde(Box::new(e)))?, + ); if let Some(timeout) = opts.timeout { builder = builder.timeout(timeout); @@ -70,15 +93,13 @@ impl IngressInternal { if let Err(e) = res.error_for_status_ref() { let status = res.status().as_u16(); if let Ok(e) = res.json::().await { - Ok(Err(TerminalError::new_with_code( - e.code.unwrap_or(status), - e.message, - ))) + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) } else { - Err(e) + Err(e.into()) } } else { - Ok(Ok(res.json::().await?)) + Ok(Res::deserialize(&mut res.bytes().await?) + .map_err(|e| IngressClientError::Serde(Box::new(e)))?) } } @@ -88,7 +109,7 @@ impl IngressInternal { req: Req, opts: IngressRequestOptions, delay: Option, - ) -> Result, reqwest::Error> { + ) -> Result { let mut headers = self.headers.clone(); let attachable = if let Some(key) = opts.idempotency_key { headers.append(IDEMPOTENCY_KEY_HEADER, key); @@ -107,7 +128,10 @@ impl IngressInternal { format!("{}/{target}/send", self.url.as_str().trim_end_matches("/")) }; - let mut builder = self.client.post(url).headers(headers).json(&req); + let mut builder = self.client.post(url).headers(headers).body( + req.serialize() + .map_err(|e| IngressClientError::Serde(Box::new(e)))?, + ); if let Some(timeout) = opts.timeout { builder = builder.timeout(timeout); @@ -118,29 +142,26 @@ impl IngressInternal { if let Err(e) = res.error_for_status_ref() { let status = res.status().as_u16(); if let Ok(e) = res.json::().await { - Ok(Err(TerminalError::new_with_code( - e.code.unwrap_or(status), - e.message, - ))) + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) } else { - Err(e) + Err(e.into()) } } else { let res = res.json::().await?; - Ok(Ok(SendResponse { + Ok(SendResponse { invocation_id: res.invocation_id, status: res.status.into(), attachable, - })) + }) } } - pub(super) async fn result( + pub(super) async fn result( &self, target: ResultTarget, op: ResultOp, opts: IngressResultOptions, - ) -> Result, reqwest::Error> { + ) -> Result { let url = format!("{}/{target}/{op}", self.url.as_str().trim_end_matches("/")); let mut builder = self.client.get(url).headers(self.headers.clone()); @@ -154,15 +175,13 @@ impl IngressInternal { if let Err(e) = res.error_for_status_ref() { let status = res.status().as_u16(); if let Ok(e) = res.json::().await { - Ok(Err(TerminalError::new_with_code( - e.code.unwrap_or(status), - e.message, - ))) + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) } else { - Err(e) + Err(e.into()) } } else { - Ok(Ok(res.json::().await?)) + Ok(Res::deserialize(&mut res.bytes().await?) + .map_err(|e| IngressClientError::Serde(Box::new(e)))?) } } } diff --git a/src/ingress/request.rs b/src/ingress/request.rs index f6f41b1..76d53ed 100644 --- a/src/ingress/request.rs +++ b/src/ingress/request.rs @@ -1,10 +1,13 @@ use std::{marker::PhantomData, time::Duration}; use http::HeaderValue; -use serde::{de::DeserializeOwned, Serialize}; -use super::internal::IngressInternal; -use crate::{context::RequestTarget, errors::TerminalError}; +use super::internal::{IngressClientError, IngressInternal}; +use crate::{ + context::RequestTarget, + errors::TerminalError, + serde::{Deserialize, Serialize}, +}; /// A send response. #[derive(Debug, Clone)] @@ -60,16 +63,16 @@ impl<'a, Req, Res> IngressRequest<'a, Req, Res> { } /// Call a service via the ingress. This returns a future encapsulating the response. - pub async fn call(self) -> Result, reqwest::Error> + pub async fn call(self) -> Result where Req: Serialize + 'static, - Res: DeserializeOwned + 'static, + Res: Deserialize + 'static, { self.inner.call(self.target, self.req, self.opts).await } /// Send the request to the ingress, without waiting for the response. - pub async fn send(self) -> Result, reqwest::Error> + pub async fn send(self) -> Result where Req: Serialize + 'static, { @@ -82,7 +85,7 @@ impl<'a, Req, Res> IngressRequest<'a, Req, Res> { pub async fn send_with_delay( self, duration: Duration, - ) -> Result, reqwest::Error> + ) -> Result where Req: Serialize + 'static, { diff --git a/src/ingress/result.rs b/src/ingress/result.rs index 193d047..89b2ba6 100644 --- a/src/ingress/result.rs +++ b/src/ingress/result.rs @@ -4,10 +4,8 @@ use std::{ time::Duration, }; -use serde::de::DeserializeOwned; - -use super::internal::IngressInternal; -use crate::errors::TerminalError; +use super::internal::{IngressClientError, IngressInternal}; +use crate::serde::Deserialize; /// The invocation or workflow target to retrieve the result from. #[derive(Debug, Clone)] @@ -146,9 +144,9 @@ impl<'a, Res> IngressResult<'a, Res> { } /// Attach to an invocation or workflow and wait for it to finish. - pub async fn attach(self) -> Result, reqwest::Error> + pub async fn attach(self) -> Result where - Res: DeserializeOwned + 'static, + Res: Deserialize + 'static, { self.inner .result(self.target, ResultOp::Attach, self.opts) @@ -156,9 +154,9 @@ impl<'a, Res> IngressResult<'a, Res> { } /// Peek at the output of an invocation or workflow. - pub async fn output(self) -> Result, reqwest::Error> + pub async fn output(self) -> Result where - Res: DeserializeOwned + 'static, + Res: Deserialize + 'static, { self.inner .result(self.target, ResultOp::Output, self.opts) From 52c83f385bbc343afe57ed67b499a0d5c8eff162 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Tue, 18 Feb 2025 21:20:29 +0700 Subject: [PATCH 05/13] Rename ingress module to ingress_client --- src/{ingress => ingress_client}/internal.rs | 0 src/{ingress => ingress_client}/mod.rs | 0 src/{ingress => ingress_client}/request.rs | 0 src/{ingress => ingress_client}/result.rs | 0 src/lib.rs | 2 +- 5 files changed, 1 insertion(+), 1 deletion(-) rename src/{ingress => ingress_client}/internal.rs (100%) rename src/{ingress => ingress_client}/mod.rs (100%) rename src/{ingress => ingress_client}/request.rs (100%) rename src/{ingress => ingress_client}/result.rs (100%) diff --git a/src/ingress/internal.rs b/src/ingress_client/internal.rs similarity index 100% rename from src/ingress/internal.rs rename to src/ingress_client/internal.rs diff --git a/src/ingress/mod.rs b/src/ingress_client/mod.rs similarity index 100% rename from src/ingress/mod.rs rename to src/ingress_client/mod.rs diff --git a/src/ingress/request.rs b/src/ingress_client/request.rs similarity index 100% rename from src/ingress/request.rs rename to src/ingress_client/request.rs diff --git a/src/ingress/result.rs b/src/ingress_client/result.rs similarity index 100% rename from src/ingress/result.rs rename to src/ingress_client/result.rs diff --git a/src/lib.rs b/src/lib.rs index 6d2757b..454bf63 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -223,7 +223,7 @@ pub mod http_server; #[cfg(feature = "hyper")] pub mod hyper; #[cfg(feature = "ingress_client")] -pub mod ingress; +pub mod ingress_client; pub mod serde; /// Entry-point macro to define a Restate [Service](https://docs.restate.dev/concepts/services#services-1). From ca36456cf37041da4b9cbec7734a4e49a2e8a0a5 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Tue, 18 Feb 2025 23:22:17 +0700 Subject: [PATCH 06/13] Implement macros for ingress client --- Cargo.toml | 2 +- macros/Cargo.toml | 3 ++ macros/src/gen.rs | 121 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ec4cad8..7b6ce8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ rust-version = "1.76.0" default = ["http_server", "rand", "uuid"] hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"] http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"] -ingress_client = ["dep:reqwest"] +ingress_client = ["dep:reqwest", "restate-sdk-macros/ingress_client"] [dependencies] bytes = "1.6.1" diff --git a/macros/Cargo.toml b/macros/Cargo.toml index 5f125c8..3aa09d7 100644 --- a/macros/Cargo.toml +++ b/macros/Cargo.toml @@ -9,6 +9,9 @@ repository = "https://github.com/restatedev/sdk-rust" [lib] proc-macro = true +[features] +ingress_client = [] + [dependencies] proc-macro2 = "1.0" quote = "1.0" diff --git a/macros/src/gen.rs b/macros/src/gen.rs index a882b4d..742515c 100644 --- a/macros/src/gen.rs +++ b/macros/src/gen.rs @@ -9,6 +9,7 @@ pub(crate) struct ServiceGenerator<'a> { pub(crate) restate_name: &'a str, pub(crate) service_ident: &'a Ident, pub(crate) client_ident: Ident, + pub(crate) ingress_ident: Ident, pub(crate) serve_ident: Ident, pub(crate) vis: &'a Visibility, pub(crate) attrs: &'a [Attribute], @@ -22,6 +23,7 @@ impl<'a> ServiceGenerator<'a> { restate_name: &s.restate_name, service_ident: &s.ident, client_ident: format_ident!("{}Client", s.ident), + ingress_ident: format_ident!("{}Ingress", s.ident), serve_ident: format_ident!("Serve{}", s.ident), vis: &s.vis, attrs: &s.attrs, @@ -336,6 +338,121 @@ impl<'a> ServiceGenerator<'a> { } } } + + fn struct_ingress(&self) -> TokenStream2 { + let &Self { + vis, + ref ingress_ident, + ref service_ty, + service_ident, + .. + } = self; + + let key_field = match service_ty { + ServiceType::Service => quote! {}, + ServiceType::Object | ServiceType::Workflow => quote! { + key: String, + }, + }; + + let into_client_impl = match service_ty { + ServiceType::Service => { + quote! { + impl<'client> ::restate_sdk::ingress_client::IntoServiceIngress<'client> for #ingress_ident<'client> { + fn create_ingress(client: &'client ::restate_sdk::ingress_client::IngressClient) -> Self { + Self { client } + } + } + } + } + ServiceType::Object => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoObjectIngress<'client> for #ingress_ident<'client> { + fn create_ingress(client: &'client ::restate_sdk::ingress_client::IngressClient, key: String) -> Self { + Self { client, key } + } + } + }, + ServiceType::Workflow => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoWorkflowIngress<'client> for #ingress_ident<'client> { + fn create_ingress(client: &'client ::restate_sdk::ingress_client::IngressClient, key: String) -> Self { + Self { client, key } + } + } + }, + }; + + let doc_msg = format!( + "Struct exposing the ingress client to invoke [`{service_ident}`] without a context." + ); + quote! { + #[doc = #doc_msg] + #vis struct #ingress_ident<'client> { + client: &'client ::restate_sdk::ingress_client::IngressClient, + #key_field + } + + #into_client_impl + } + } + + fn impl_ingress(&self) -> TokenStream2 { + let &Self { + vis, + ref ingress_ident, + handlers, + restate_name, + service_ty, + .. + } = self; + + let service_literal = Literal::string(restate_name); + + let handlers_fns = handlers.iter().map(|handler| { + let handler_ident = &handler.ident; + let handler_literal = Literal::string(&handler.restate_name); + + let argument = match &handler.arg { + None => quote! {}, + Some(PatType { + ty, .. + }) => quote! { req: #ty } + }; + let argument_ty = match &handler.arg { + None => quote! { () }, + Some(PatType { + ty, .. + }) => quote! { #ty } + }; + let res_ty = &handler.output_ok; + let input = match &handler.arg { + None => quote! { () }, + Some(_) => quote! { req } + }; + let request_target = match service_ty { + ServiceType::Service => quote! { + ::restate_sdk::context::RequestTarget::service(#service_literal, #handler_literal) + }, + ServiceType::Object => quote! { + ::restate_sdk::context::RequestTarget::object(#service_literal, &self.key, #handler_literal) + }, + ServiceType::Workflow => quote! { + ::restate_sdk::context::RequestTarget::workflow(#service_literal, &self.key, #handler_literal) + } + }; + + quote! { + #vis fn #handler_ident(&self, #argument) -> ::restate_sdk::ingress_client::request::IngressRequest<'client, #argument_ty, #res_ty> { + self.client.request(#request_target, #input) + } + } + }); + + quote! { + impl<'client> #ingress_ident<'client> { + #( #handlers_fns )* + } + } + } } impl<'a> ToTokens for ServiceGenerator<'a> { @@ -347,6 +464,10 @@ impl<'a> ToTokens for ServiceGenerator<'a> { self.impl_discoverable(), self.struct_client(), self.impl_client(), + #[cfg(feature = "ingress_client")] + self.struct_ingress(), + #[cfg(feature = "ingress_client")] + self.impl_ingress(), ]); } } From 7f6c8523a25d432cbf03a8731ab1cd23073021aa Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Wed, 19 Feb 2025 00:46:29 +0700 Subject: [PATCH 07/13] fixup! Implement macros for ingress client --- macros/src/gen.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/macros/src/gen.rs b/macros/src/gen.rs index 742515c..a0021d5 100644 --- a/macros/src/gen.rs +++ b/macros/src/gen.rs @@ -9,6 +9,7 @@ pub(crate) struct ServiceGenerator<'a> { pub(crate) restate_name: &'a str, pub(crate) service_ident: &'a Ident, pub(crate) client_ident: Ident, + #[cfg(feature = "ingress_client")] pub(crate) ingress_ident: Ident, pub(crate) serve_ident: Ident, pub(crate) vis: &'a Visibility, @@ -23,6 +24,7 @@ impl<'a> ServiceGenerator<'a> { restate_name: &s.restate_name, service_ident: &s.ident, client_ident: format_ident!("{}Client", s.ident), + #[cfg(feature = "ingress_client")] ingress_ident: format_ident!("{}Ingress", s.ident), serve_ident: format_ident!("Serve{}", s.ident), vis: &s.vis, @@ -339,6 +341,7 @@ impl<'a> ServiceGenerator<'a> { } } + #[cfg(feature = "ingress_client")] fn struct_ingress(&self) -> TokenStream2 { let &Self { vis, @@ -395,6 +398,7 @@ impl<'a> ServiceGenerator<'a> { } } + #[cfg(feature = "ingress_client")] fn impl_ingress(&self) -> TokenStream2 { let &Self { vis, From 993547e07e394f637d3eea7720991b5559707c54 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Wed, 19 Feb 2025 17:06:46 +0700 Subject: [PATCH 08/13] Rename result to handle --- src/ingress_client/{result.rs => handle.rs} | 48 +++++++++---------- src/ingress_client/internal.rs | 10 ++-- src/ingress_client/mod.rs | 52 ++++++++++----------- src/ingress_client/request.rs | 1 - 4 files changed, 55 insertions(+), 56 deletions(-) rename src/ingress_client/{result.rs => handle.rs} (72%) diff --git a/src/ingress_client/result.rs b/src/ingress_client/handle.rs similarity index 72% rename from src/ingress_client/result.rs rename to src/ingress_client/handle.rs index 89b2ba6..5175002 100644 --- a/src/ingress_client/result.rs +++ b/src/ingress_client/handle.rs @@ -7,9 +7,9 @@ use std::{ use super::internal::{IngressClientError, IngressInternal}; use crate::serde::Deserialize; -/// The invocation or workflow target to retrieve the result from. +/// The target invocation or workflow to retrieve the handle of. #[derive(Debug, Clone)] -pub enum ResultTarget { +pub enum HandleTarget { Invocation { id: String, }, @@ -30,7 +30,7 @@ pub enum ResultTarget { }, } -impl ResultTarget { +impl HandleTarget { pub fn invocation(id: impl Into) -> Self { Self::Invocation { id: id.into() } } @@ -69,20 +69,20 @@ impl ResultTarget { } } -impl Display for ResultTarget { +impl Display for HandleTarget { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - ResultTarget::Invocation { id } => { + HandleTarget::Invocation { id } => { write!(f, "restate/invocation/{id}") } - ResultTarget::Service { + HandleTarget::Service { name, handler, idempotency_key, } => { write!(f, "restate/invocation/{name}/{handler}/{idempotency_key}") } - ResultTarget::Object { + HandleTarget::Object { name, key, handler, @@ -91,44 +91,44 @@ impl Display for ResultTarget { f, "restate/invocation/{name}/{key}/{handler}/{idempotency_key}" ), - ResultTarget::Workflow { name, id } => { + HandleTarget::Workflow { name, id } => { write!(f, "restate/workflow/{name}/{id}") } } } } -/// The mode of operation to use when retrieving the result of an invocation or workflow. +/// The mode of operation to use on the handle of the invocation or workflow. #[derive(Debug, Clone, Copy)] -pub enum ResultOp { +pub enum HandleOp { Attach, Output, } -impl Display for ResultOp { +impl Display for HandleOp { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { - ResultOp::Attach => write!(f, "attach"), - ResultOp::Output => write!(f, "output"), + HandleOp::Attach => write!(f, "attach"), + HandleOp::Output => write!(f, "output"), } } } -/// This struct encapsulates the parameters for retrieving a result of an invocation or workflow. -pub struct IngressResult<'a, Res = ()> { +/// This struct encapsulates the parameters for operating on the handle of an invocation or workflow. +pub struct IngressHandle<'a, Res = ()> { inner: &'a IngressInternal, - target: ResultTarget, + target: HandleTarget, res: PhantomData, - opts: IngressResultOptions, + opts: IngressHandleOptions, } #[derive(Default)] -pub(super) struct IngressResultOptions { +pub(super) struct IngressHandleOptions { pub(super) timeout: Option, } -impl<'a, Res> IngressResult<'a, Res> { - pub(super) fn new(inner: &'a IngressInternal, target: ResultTarget) -> Self { +impl<'a, Res> IngressHandle<'a, Res> { + pub(super) fn new(inner: &'a IngressInternal, target: HandleTarget) -> Self { Self { inner, target, @@ -143,23 +143,23 @@ impl<'a, Res> IngressResult<'a, Res> { self } - /// Attach to an invocation or workflow and wait for it to finish. + /// Attach to the invocation or workflow and wait for it to finish. pub async fn attach(self) -> Result where Res: Deserialize + 'static, { self.inner - .result(self.target, ResultOp::Attach, self.opts) + .handle(self.target, HandleOp::Attach, self.opts) .await } - /// Peek at the output of an invocation or workflow. + /// Peek at the output of the invocation or workflow. pub async fn output(self) -> Result where Res: Deserialize + 'static, { self.inner - .result(self.target, ResultOp::Output, self.opts) + .handle(self.target, HandleOp::Output, self.opts) .await } } diff --git a/src/ingress_client/internal.rs b/src/ingress_client/internal.rs index 918819b..b24e5ec 100644 --- a/src/ingress_client/internal.rs +++ b/src/ingress_client/internal.rs @@ -4,8 +4,8 @@ use reqwest::{header::HeaderMap, Url}; use thiserror::Error; use super::{ + handle::{HandleOp, HandleTarget, IngressHandleOptions}, request::{IngressRequestOptions, SendResponse, SendStatus}, - result::{IngressResultOptions, ResultOp, ResultTarget}, }; use crate::{ context::RequestTarget, @@ -156,11 +156,11 @@ impl IngressInternal { } } - pub(super) async fn result( + pub(super) async fn handle( &self, - target: ResultTarget, - op: ResultOp, - opts: IngressResultOptions, + target: HandleTarget, + op: HandleOp, + opts: IngressHandleOptions, ) -> Result { let url = format!("{}/{target}/{op}", self.url.as_str().trim_end_matches("/")); diff --git a/src/ingress_client/mod.rs b/src/ingress_client/mod.rs index 4ecc3ea..00df354 100644 --- a/src/ingress_client/mod.rs +++ b/src/ingress_client/mod.rs @@ -1,15 +1,15 @@ use reqwest::{header::HeaderMap, Url}; use self::{ + handle::{HandleTarget, IngressHandle}, internal::IngressInternal, request::IngressRequest, - result::{IngressResult, ResultTarget}, }; use crate::context::RequestTarget; +pub mod handle; pub mod internal; pub mod request; -pub mod result; /// A client for invoking handlers via the ingress. pub struct IngressClient { @@ -44,9 +44,9 @@ impl IngressClient { IngressRequest::new(&self.inner, target, req) } - /// Create a new [`IngressResult`]. - pub fn result(&self, target: ResultTarget) -> IngressResult { - IngressResult::new(&self.inner, target) + /// Create a new [`IngressHandle`]. + pub fn handle(&self, target: HandleTarget) -> IngressHandle { + IngressHandle::new(&self.inner, target) } pub fn service_ingress<'a, I>(&'a self) -> I @@ -70,32 +70,32 @@ impl IngressClient { I::create_ingress(self, id.into()) } - pub fn invocation_result<'a, Res>( + pub fn invocation_handle<'a, Res>( &'a self, invocation_id: impl Into, - ) -> IngressResult<'a, Res> { - self.result(ResultTarget::invocation(invocation_id)) + ) -> IngressHandle<'a, Res> { + self.handle(HandleTarget::invocation(invocation_id)) } - pub fn service_result<'a, R>(&'a self) -> R + pub fn service_handle<'a, H>(&'a self) -> H where - R: IntoServiceResult<'a>, + H: IntoServiceHandle<'a>, { - R::create_result(self) + H::create_handle(self) } - pub fn object_result<'a, R>(&'a self, key: impl Into) -> R + pub fn object_handle<'a, H>(&'a self, key: impl Into) -> H where - R: IntoObjectResult<'a>, + H: IntoObjectHandle<'a>, { - R::create_result(self, key.into()) + H::create_handle(self, key.into()) } - pub fn workflow_result<'a, R>(&'a self, id: impl Into) -> R + pub fn workflow_handle<'a, H>(&'a self, id: impl Into) -> H where - R: IntoWorkflowResult<'a>, + H: IntoWorkflowHandle<'a>, { - R::create_result(self, id.into()) + H::create_handle(self, id.into()) } } @@ -114,17 +114,17 @@ pub trait IntoWorkflowIngress<'a>: Sized { fn create_ingress(client: &'a IngressClient, id: String) -> Self; } -/// Trait used by codegen to retrieve the service result. -pub trait IntoServiceResult<'a>: Sized { - fn create_result(client: &'a IngressClient) -> Self; +/// Trait used by codegen to use the service handle. +pub trait IntoServiceHandle<'a>: Sized { + fn create_handle(client: &'a IngressClient) -> Self; } -/// Trait used by codegen to retrieve the object result. -pub trait IntoObjectResult<'a>: Sized { - fn create_result(client: &'a IngressClient, key: String) -> Self; +/// Trait used by codegen to use the object handle. +pub trait IntoObjectHandle<'a>: Sized { + fn create_handle(client: &'a IngressClient, key: String) -> Self; } -/// Trait used by codegen to retrieve the workflow result. -pub trait IntoWorkflowResult<'a>: Sized { - fn create_result(client: &'a IngressClient, id: String) -> Self; +/// Trait used by codegen to use the workflow handle. +pub trait IntoWorkflowHandle<'a>: Sized { + fn create_handle(client: &'a IngressClient, id: String) -> Self; } diff --git a/src/ingress_client/request.rs b/src/ingress_client/request.rs index 76d53ed..703ff3c 100644 --- a/src/ingress_client/request.rs +++ b/src/ingress_client/request.rs @@ -5,7 +5,6 @@ use http::HeaderValue; use super::internal::{IngressClientError, IngressInternal}; use crate::{ context::RequestTarget, - errors::TerminalError, serde::{Deserialize, Serialize}, }; From a91c42a1d74c5e2e606ddef086982fcf2709d584 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Wed, 19 Feb 2025 19:46:53 +0700 Subject: [PATCH 09/13] Add the ability to interact with awakeables --- src/ingress_client/awakeable.rs | 56 ++++++++++++++++++++++ src/ingress_client/internal.rs | 83 +++++++++++++++++++++++++++++++++ src/ingress_client/mod.rs | 7 +++ 3 files changed, 146 insertions(+) create mode 100644 src/ingress_client/awakeable.rs diff --git a/src/ingress_client/awakeable.rs b/src/ingress_client/awakeable.rs new file mode 100644 index 0000000..c2a6128 --- /dev/null +++ b/src/ingress_client/awakeable.rs @@ -0,0 +1,56 @@ +use std::time::Duration; + +use crate::serde::Serialize; + +use super::{internal::IngressClientError, IngressInternal}; + +pub struct IngressAwakeable<'a> { + inner: &'a IngressInternal, + key: String, + opts: IngressAwakeableOptions, +} + +#[derive(Default, Clone)] +pub(super) struct IngressAwakeableOptions { + pub(super) timeout: Option, +} + +impl<'a> IngressAwakeable<'a> { + pub(super) fn new(inner: &'a IngressInternal, key: impl Into) -> Self { + Self { + inner, + key: key.into(), + opts: Default::default(), + } + } + + /// Set the timeout for the request. + pub fn timeout(mut self, value: Duration) -> Self { + self.opts.timeout = Some(value); + self + } + + /// Resolve the awakeable + pub async fn resolve(self) -> Result<(), IngressClientError> { + self.inner + .resolve_awakeable(&self.key, None::<()>, self.opts) + .await + } + + /// Resolve the awakeable with a payload + pub async fn resolve_with_payload( + self, + payload: T, + ) -> Result<(), IngressClientError> { + self.inner + .resolve_awakeable(&self.key, Some(payload), self.opts) + .await + } + + /// Reject the awakeable with a failure message + pub async fn reject(self, message: impl Into) -> Result<(), IngressClientError> { + self.inner + .reject_awakeable(&self.key, &message.into(), self.opts) + .await + } +} diff --git a/src/ingress_client/internal.rs b/src/ingress_client/internal.rs index b24e5ec..ce0ce7f 100644 --- a/src/ingress_client/internal.rs +++ b/src/ingress_client/internal.rs @@ -1,9 +1,11 @@ use std::time::Duration; +use http::HeaderValue; use reqwest::{header::HeaderMap, Url}; use thiserror::Error; use super::{ + awakeable::IngressAwakeableOptions, handle::{HandleOp, HandleTarget, IngressHandleOptions}, request::{IngressRequestOptions, SendResponse, SendStatus}, }; @@ -14,6 +16,8 @@ use crate::{ }; const IDEMPOTENCY_KEY_HEADER: &str = "Idempotency-Key"; +const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json"); +const TEXT_PLAIN: HeaderValue = HeaderValue::from_static("text/plain"); #[derive(serde::Deserialize)] #[serde(rename_all = "camelCase")] @@ -184,4 +188,83 @@ impl IngressInternal { .map_err(|e| IngressClientError::Serde(Box::new(e)))?) } } + + pub(super) async fn resolve_awakeable( + &self, + key: &str, + payload: Option, + opts: IngressAwakeableOptions, + ) -> Result<(), IngressClientError> { + let url = format!( + "{}/restate/awakeables/{}/resolve", + self.url.as_str().trim_end_matches("/"), + key + ); + + let mut builder = self.client.post(url).headers(self.headers.clone()); + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + if let Some(payload) = payload { + builder = builder + .header(http::header::CONTENT_TYPE, APPLICATION_JSON) + .body( + payload + .serialize() + .map_err(|e| IngressClientError::Serde(Box::new(e)))?, + ); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) + } else { + Err(e.into()) + } + } else { + Ok(()) + } + } + + pub(super) async fn reject_awakeable( + &self, + key: &str, + message: &str, + opts: IngressAwakeableOptions, + ) -> Result<(), IngressClientError> { + let url = format!( + "{}/restate/awakeables/{}/reject", + self.url.as_str().trim_end_matches("/"), + key + ); + + let mut builder = self + .client + .post(url) + .headers(self.headers.clone()) + .header(http::header::CONTENT_TYPE, TEXT_PLAIN) + .body(message.to_string()); + + if let Some(timeout) = opts.timeout { + builder = builder.timeout(timeout); + } + + let res = builder.send().await?; + + if let Err(e) = res.error_for_status_ref() { + let status = res.status().as_u16(); + if let Ok(e) = res.json::().await { + Err(TerminalError::new_with_code(e.code.unwrap_or(status), e.message).into()) + } else { + Err(e.into()) + } + } else { + Ok(()) + } + } } diff --git a/src/ingress_client/mod.rs b/src/ingress_client/mod.rs index 00df354..61f7164 100644 --- a/src/ingress_client/mod.rs +++ b/src/ingress_client/mod.rs @@ -1,3 +1,4 @@ +use awakeable::IngressAwakeable; use reqwest::{header::HeaderMap, Url}; use self::{ @@ -7,6 +8,7 @@ use self::{ }; use crate::context::RequestTarget; +pub mod awakeable; pub mod handle; pub mod internal; pub mod request; @@ -97,6 +99,11 @@ impl IngressClient { { H::create_handle(self, id.into()) } + + /// Create a new [`IngressAwakeable`]. + pub fn awakeable(&self, key: impl Into) -> IngressAwakeable<'_> { + IngressAwakeable::new(&self.inner, key) + } } /// Trait used by codegen to use the service ingress. From e186ae1fefa510373a49531e5faf450e6eda5c5a Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Wed, 19 Feb 2025 20:33:51 +0700 Subject: [PATCH 10/13] Refactor client usage --- src/ingress_client/internal.rs | 58 ++++++++++++++++++---------------- src/ingress_client/mod.rs | 13 +++----- src/ingress_client/request.rs | 8 ++--- 3 files changed, 38 insertions(+), 41 deletions(-) diff --git a/src/ingress_client/internal.rs b/src/ingress_client/internal.rs index ce0ce7f..269d2c0 100644 --- a/src/ingress_client/internal.rs +++ b/src/ingress_client/internal.rs @@ -1,7 +1,7 @@ use std::time::Duration; use http::HeaderValue; -use reqwest::{header::HeaderMap, Url}; +use reqwest::Url; use thiserror::Error; use super::{ @@ -50,14 +50,13 @@ struct TerminalErrorSchema { pub(super) struct IngressInternal { pub(super) client: reqwest::Client, pub(super) url: Url, - pub(super) headers: HeaderMap, } #[derive(Debug, Error)] pub enum IngressClientError { #[error(transparent)] Http(#[from] reqwest::Error), - #[error("terminal error [{}]: {}", ._0.code(), ._0.message())] + #[error("{0}")] Terminal(TerminalError), #[error(transparent)] Serde(Box), @@ -76,17 +75,20 @@ impl IngressInternal { req: Req, opts: IngressRequestOptions, ) -> Result { - let mut headers = self.headers.clone(); - if let Some(key) = opts.idempotency_key { - headers.append(IDEMPOTENCY_KEY_HEADER, key); - } - let url = format!("{}/{target}", self.url.as_str().trim_end_matches("/")); - let mut builder = self.client.post(url).headers(headers).body( - req.serialize() - .map_err(|e| IngressClientError::Serde(Box::new(e)))?, - ); + let mut builder = self + .client + .post(url) + .header(http::header::CONTENT_TYPE, APPLICATION_JSON) + .body( + req.serialize() + .map_err(|e| IngressClientError::Serde(Box::new(e)))?, + ); + + if let Some(key) = opts.idempotency_key { + builder = builder.header(IDEMPOTENCY_KEY_HEADER, key); + } if let Some(timeout) = opts.timeout { builder = builder.timeout(timeout); @@ -114,14 +116,6 @@ impl IngressInternal { opts: IngressRequestOptions, delay: Option, ) -> Result { - let mut headers = self.headers.clone(); - let attachable = if let Some(key) = opts.idempotency_key { - headers.append(IDEMPOTENCY_KEY_HEADER, key); - true - } else { - false - }; - let url = if let Some(delay) = delay { format!( "{}/{target}/send?delay={}ms", @@ -132,10 +126,21 @@ impl IngressInternal { format!("{}/{target}/send", self.url.as_str().trim_end_matches("/")) }; - let mut builder = self.client.post(url).headers(headers).body( - req.serialize() - .map_err(|e| IngressClientError::Serde(Box::new(e)))?, - ); + let mut builder = self + .client + .post(url) + .header(http::header::CONTENT_TYPE, APPLICATION_JSON) + .body( + req.serialize() + .map_err(|e| IngressClientError::Serde(Box::new(e)))?, + ); + + let attachable = if let Some(key) = opts.idempotency_key { + builder = builder.header(IDEMPOTENCY_KEY_HEADER, key); + true + } else { + false + }; if let Some(timeout) = opts.timeout { builder = builder.timeout(timeout); @@ -168,7 +173,7 @@ impl IngressInternal { ) -> Result { let url = format!("{}/{target}/{op}", self.url.as_str().trim_end_matches("/")); - let mut builder = self.client.get(url).headers(self.headers.clone()); + let mut builder = self.client.get(url); if let Some(timeout) = opts.timeout { builder = builder.timeout(timeout); @@ -201,7 +206,7 @@ impl IngressInternal { key ); - let mut builder = self.client.post(url).headers(self.headers.clone()); + let mut builder = self.client.post(url); if let Some(timeout) = opts.timeout { builder = builder.timeout(timeout); @@ -246,7 +251,6 @@ impl IngressInternal { let mut builder = self .client .post(url) - .headers(self.headers.clone()) .header(http::header::CONTENT_TYPE, TEXT_PLAIN) .body(message.to_string()); diff --git a/src/ingress_client/mod.rs b/src/ingress_client/mod.rs index 61f7164..1da2944 100644 --- a/src/ingress_client/mod.rs +++ b/src/ingress_client/mod.rs @@ -1,5 +1,5 @@ use awakeable::IngressAwakeable; -use reqwest::{header::HeaderMap, Url}; +use reqwest::Url; use self::{ handle::{HandleTarget, IngressHandle}, @@ -25,19 +25,14 @@ impl IngressClient { inner: IngressInternal { client: reqwest::Client::new(), url, - headers: Default::default(), }, } } - /// Create a new [`IngressClient`] with custom headers. - pub fn new_with_headers(url: Url, headers: HeaderMap) -> Self { + /// Create a new [`IngressClient`] with a custom client. + pub fn new_with_client(url: Url, client: reqwest::Client) -> Self { Self { - inner: IngressInternal { - client: reqwest::Client::new(), - url, - headers, - }, + inner: IngressInternal { client, url }, } } diff --git a/src/ingress_client/request.rs b/src/ingress_client/request.rs index 703ff3c..9f93ae2 100644 --- a/src/ingress_client/request.rs +++ b/src/ingress_client/request.rs @@ -1,7 +1,5 @@ use std::{marker::PhantomData, time::Duration}; -use http::HeaderValue; - use super::internal::{IngressClientError, IngressInternal}; use crate::{ context::RequestTarget, @@ -34,7 +32,7 @@ pub struct IngressRequest<'a, Req, Res = ()> { #[derive(Default, Clone)] pub(super) struct IngressRequestOptions { - pub(super) idempotency_key: Option, + pub(super) idempotency_key: Option, pub(super) timeout: Option, } @@ -50,8 +48,8 @@ impl<'a, Req, Res> IngressRequest<'a, Req, Res> { } /// Set the idempotency key for the request. - pub fn idempotency_key(mut self, value: HeaderValue) -> Self { - self.opts.idempotency_key = Some(value); + pub fn idempotency_key(mut self, value: impl Into) -> Self { + self.opts.idempotency_key = Some(value.into()); self } From 7dbaeb542f7757f936a36c5277cfc8833564d6c3 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Wed, 19 Feb 2025 20:43:37 +0700 Subject: [PATCH 11/13] Remove unnecessary lifetimes --- src/ingress_client/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ingress_client/mod.rs b/src/ingress_client/mod.rs index 1da2944..c749f22 100644 --- a/src/ingress_client/mod.rs +++ b/src/ingress_client/mod.rs @@ -67,10 +67,10 @@ impl IngressClient { I::create_ingress(self, id.into()) } - pub fn invocation_handle<'a, Res>( - &'a self, + pub fn invocation_handle( + &self, invocation_id: impl Into, - ) -> IngressHandle<'a, Res> { + ) -> IngressHandle<'_, Res> { self.handle(HandleTarget::invocation(invocation_id)) } From 2b6bb359fc6a805f8011099143737dc878beb6f4 Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Wed, 19 Feb 2025 21:55:59 +0700 Subject: [PATCH 12/13] Implement macros for ingress handles --- macros/src/gen.rs | 158 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 157 insertions(+), 1 deletion(-) diff --git a/macros/src/gen.rs b/macros/src/gen.rs index a0021d5..5002f32 100644 --- a/macros/src/gen.rs +++ b/macros/src/gen.rs @@ -1,7 +1,7 @@ use crate::ast::{Handler, Object, Service, ServiceInner, ServiceType, Workflow}; use proc_macro2::TokenStream as TokenStream2; use proc_macro2::{Ident, Literal}; -use quote::{format_ident, quote, ToTokens}; +use quote::{format_ident, quote, quote_spanned, ToTokens}; use syn::{Attribute, PatType, Visibility}; pub(crate) struct ServiceGenerator<'a> { @@ -11,6 +11,8 @@ pub(crate) struct ServiceGenerator<'a> { pub(crate) client_ident: Ident, #[cfg(feature = "ingress_client")] pub(crate) ingress_ident: Ident, + #[cfg(feature = "ingress_client")] + pub(crate) handle_ident: Ident, pub(crate) serve_ident: Ident, pub(crate) vis: &'a Visibility, pub(crate) attrs: &'a [Attribute], @@ -26,6 +28,8 @@ impl<'a> ServiceGenerator<'a> { client_ident: format_ident!("{}Client", s.ident), #[cfg(feature = "ingress_client")] ingress_ident: format_ident!("{}Ingress", s.ident), + #[cfg(feature = "ingress_client")] + handle_ident: format_ident!("{}Handle", s.ident), serve_ident: format_ident!("Serve{}", s.ident), vis: &s.vis, attrs: &s.attrs, @@ -457,6 +461,154 @@ impl<'a> ServiceGenerator<'a> { } } } + + #[cfg(feature = "ingress_client")] + fn struct_handle(&self) -> TokenStream2 { + let &Self { + vis, + ref service_ty, + service_ident, + restate_name, + ref handle_ident, + handlers, + .. + } = self; + + let service_literal = Literal::string(restate_name); + + let key_field = match service_ty { + ServiceType::Service | ServiceType::Workflow => quote! {}, + ServiceType::Object => quote! { + key: String, + }, + }; + + let into_client_impl = match service_ty { + ServiceType::Service => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoServiceHandle<'client> for #handle_ident<'client> { + fn create_handle(client: &'client ::restate_sdk::ingress_client::IngressClient) -> Self { + Self { client } + } + } + }, + ServiceType::Object => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoObjectHandle<'client> for #handle_ident<'client> { + fn create_handle(client: &'client ::restate_sdk::ingress_client::IngressClient, key: String) -> Self { + Self { client, key } + } + } + }, + ServiceType::Workflow => quote! { + impl<'client> ::restate_sdk::ingress_client::IntoWorkflowHandle<'client> for #handle_ident<'client> { + fn create_handle(client: &'client ::restate_sdk::ingress_client::IngressClient, id: String) -> Self { + Self { result: client.handle(::restate_sdk::ingress_client::handle::HandleTarget::workflow(#service_literal, id)) } + } + } + }, + }; + + let doc_msg = + format!("Struct exposing the handle to retrieve the result of [`{service_ident}`]."); + match service_ty { + ServiceType::Service | ServiceType::Object => quote! { + #[doc = #doc_msg] + #vis struct #handle_ident<'client> { + client: &'client ::restate_sdk::ingress_client::IngressClient, + #key_field + } + + #into_client_impl + }, + ServiceType::Workflow => { + let Some(handler) = &handlers + .iter() + .find(|handler| handler.restate_name == "run") + else { + return quote_spanned! { + service_ident.span() => compile_error!("A workflow definition must contain a `run` handler"); + }; + }; + let res_ty = &handler.output_ok; + + quote! { + #[doc = #doc_msg] + #vis struct #handle_ident<'client> { + result: ::restate_sdk::ingress_client::handle::IngressHandle<'client, #res_ty>, + } + + #into_client_impl + } + } + } + } + + #[cfg(feature = "ingress_client")] + fn impl_handle(&self) -> TokenStream2 { + let &Self { + vis, + ref handle_ident, + handlers, + restate_name, + service_ty, + service_ident, + .. + } = self; + + let service_literal = Literal::string(restate_name); + + if let ServiceType::Service | ServiceType::Object = service_ty { + let handlers_fns = handlers.iter().map(|handler| { + let handler_ident = &handler.ident; + let handler_literal = Literal::string(&handler.restate_name); + let res_ty = &handler.output_ok; + + let handle_target = match service_ty { + ServiceType::Service => quote! { + ::restate_sdk::ingress_client::handle::HandleTarget::service(#service_literal, #handler_literal, idempotency_key) + }, + ServiceType::Object => quote! { + ::restate_sdk::ingress_client::handle::HandleTarget::object(#service_literal, &self.key, #handler_literal, idempotency_key) + }, + ServiceType::Workflow => quote! { + ::restate_sdk::ingress_client::handle::HandleTarget::workflow(#service_literal, &self.key, #handler_literal) + } + }; + quote! { + #vis fn #handler_ident(&self, idempotency_key: impl Into) -> ::restate_sdk::ingress_client::handle::IngressHandle<'client, #res_ty> { + self.client.handle(#handle_target) + } + } + }); + + quote! { + impl<'client> #handle_ident<'client> { + #( #handlers_fns )* + } + } + } else { + let Some(handler) = &handlers + .iter() + .find(|handler| handler.restate_name == "run") + else { + return quote_spanned! { + service_ident.span() => compile_error!("A workflow definition must contain a `run` handler"); + }; + }; + let res_ty = &handler.output_ok; + + quote! { + impl<'client> #handle_ident<'client> { + #vis async fn attach(self) -> Result<#res_ty, ::restate_sdk::ingress_client::internal::IngressClientError> { + self.result.attach().await + } + + #vis async fn output(self) -> Result<#res_ty, ::restate_sdk::ingress_client::internal::IngressClientError> { + self.result.output().await + } + } + } + } + } } impl<'a> ToTokens for ServiceGenerator<'a> { @@ -472,6 +624,10 @@ impl<'a> ToTokens for ServiceGenerator<'a> { self.struct_ingress(), #[cfg(feature = "ingress_client")] self.impl_ingress(), + #[cfg(feature = "ingress_client")] + self.struct_handle(), + #[cfg(feature = "ingress_client")] + self.impl_handle(), ]); } } From 84cfb1bfd6f079b3d66861cc4445c2410268deda Mon Sep 17 00:00:00 2001 From: patrickariel <161032380+patrickariel@users.noreply.github.com> Date: Wed, 19 Feb 2025 22:29:38 +0700 Subject: [PATCH 13/13] Remove explicit optional payload --- src/ingress_client/awakeable.rs | 11 ++--------- src/ingress_client/internal.rs | 22 ++++++++++------------ 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/src/ingress_client/awakeable.rs b/src/ingress_client/awakeable.rs index c2a6128..fa812ae 100644 --- a/src/ingress_client/awakeable.rs +++ b/src/ingress_client/awakeable.rs @@ -30,20 +30,13 @@ impl<'a> IngressAwakeable<'a> { self } - /// Resolve the awakeable - pub async fn resolve(self) -> Result<(), IngressClientError> { - self.inner - .resolve_awakeable(&self.key, None::<()>, self.opts) - .await - } - /// Resolve the awakeable with a payload - pub async fn resolve_with_payload( + pub async fn resolve( self, payload: T, ) -> Result<(), IngressClientError> { self.inner - .resolve_awakeable(&self.key, Some(payload), self.opts) + .resolve_awakeable(&self.key, payload, self.opts) .await } diff --git a/src/ingress_client/internal.rs b/src/ingress_client/internal.rs index 269d2c0..aa42948 100644 --- a/src/ingress_client/internal.rs +++ b/src/ingress_client/internal.rs @@ -197,7 +197,7 @@ impl IngressInternal { pub(super) async fn resolve_awakeable( &self, key: &str, - payload: Option, + payload: T, opts: IngressAwakeableOptions, ) -> Result<(), IngressClientError> { let url = format!( @@ -206,22 +206,20 @@ impl IngressInternal { key ); - let mut builder = self.client.post(url); + let mut builder = self + .client + .post(url) + .header(http::header::CONTENT_TYPE, APPLICATION_JSON) + .body( + payload + .serialize() + .map_err(|e| IngressClientError::Serde(Box::new(e)))?, + ); if let Some(timeout) = opts.timeout { builder = builder.timeout(timeout); } - if let Some(payload) = payload { - builder = builder - .header(http::header::CONTENT_TYPE, APPLICATION_JSON) - .body( - payload - .serialize() - .map_err(|e| IngressClientError::Serde(Box::new(e)))?, - ); - } - let res = builder.send().await?; if let Err(e) = res.error_for_status_ref() {