Skip to content

Implement ingress client #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rust-version = "1.76.0"
default = ["http_server", "rand", "uuid"]
hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"]
http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"]
ingress_client = ["dep:reqwest", "restate-sdk-macros/ingress_client"]

[dependencies]
bytes = "1.6.1"
Expand All @@ -22,6 +23,7 @@ hyper-util = { version = "0.1", features = ["tokio", "server", "server-graceful"
pin-project-lite = "0.2"
rand = { version = "0.8.5", optional = true }
regress = "0.10"
reqwest = { version = "0.12", optional = true, features = ["json"] }
restate-sdk-macros = { version = "0.3.2", path = "macros" }
restate-sdk-shared-core = "0.1.0"
serde = "1.0"
Expand Down
3 changes: 3 additions & 0 deletions macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ repository = "https://github.com/restatedev/sdk-rust"
[lib]
proc-macro = true

[features]
ingress_client = []

[dependencies]
proc-macro2 = "1.0"
quote = "1.0"
Expand Down
283 changes: 282 additions & 1 deletion macros/src/gen.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use crate::ast::{Handler, Object, Service, ServiceInner, ServiceType, Workflow};
use proc_macro2::TokenStream as TokenStream2;
use proc_macro2::{Ident, Literal};
use quote::{format_ident, quote, ToTokens};
use quote::{format_ident, quote, quote_spanned, ToTokens};
use syn::{Attribute, PatType, Visibility};

pub(crate) struct ServiceGenerator<'a> {
pub(crate) service_ty: ServiceType,
pub(crate) restate_name: &'a str,
pub(crate) service_ident: &'a Ident,
pub(crate) client_ident: Ident,
#[cfg(feature = "ingress_client")]
pub(crate) ingress_ident: Ident,
#[cfg(feature = "ingress_client")]
pub(crate) handle_ident: Ident,
pub(crate) serve_ident: Ident,
pub(crate) vis: &'a Visibility,
pub(crate) attrs: &'a [Attribute],
Expand All @@ -22,6 +26,10 @@ impl<'a> ServiceGenerator<'a> {
restate_name: &s.restate_name,
service_ident: &s.ident,
client_ident: format_ident!("{}Client", s.ident),
#[cfg(feature = "ingress_client")]
ingress_ident: format_ident!("{}Ingress", s.ident),
#[cfg(feature = "ingress_client")]
handle_ident: format_ident!("{}Handle", s.ident),
serve_ident: format_ident!("Serve{}", s.ident),
vis: &s.vis,
attrs: &s.attrs,
Expand Down Expand Up @@ -336,6 +344,271 @@ impl<'a> ServiceGenerator<'a> {
}
}
}

#[cfg(feature = "ingress_client")]
fn struct_ingress(&self) -> TokenStream2 {
let &Self {
vis,
ref ingress_ident,
ref service_ty,
service_ident,
..
} = self;

let key_field = match service_ty {
ServiceType::Service => quote! {},
ServiceType::Object | ServiceType::Workflow => quote! {
key: String,
},
};

let into_client_impl = match service_ty {
ServiceType::Service => {
quote! {
impl<'client> ::restate_sdk::ingress_client::IntoServiceIngress<'client> for #ingress_ident<'client> {
fn create_ingress(client: &'client ::restate_sdk::ingress_client::IngressClient) -> Self {
Self { client }
}
}
}
}
ServiceType::Object => quote! {
impl<'client> ::restate_sdk::ingress_client::IntoObjectIngress<'client> for #ingress_ident<'client> {
fn create_ingress(client: &'client ::restate_sdk::ingress_client::IngressClient, key: String) -> Self {
Self { client, key }
}
}
},
ServiceType::Workflow => quote! {
impl<'client> ::restate_sdk::ingress_client::IntoWorkflowIngress<'client> for #ingress_ident<'client> {
fn create_ingress(client: &'client ::restate_sdk::ingress_client::IngressClient, key: String) -> Self {
Self { client, key }
}
}
},
};

let doc_msg = format!(
"Struct exposing the ingress client to invoke [`{service_ident}`] without a context."
);
quote! {
#[doc = #doc_msg]
#vis struct #ingress_ident<'client> {
client: &'client ::restate_sdk::ingress_client::IngressClient,
#key_field
}

#into_client_impl
}
}

#[cfg(feature = "ingress_client")]
fn impl_ingress(&self) -> TokenStream2 {
let &Self {
vis,
ref ingress_ident,
handlers,
restate_name,
service_ty,
..
} = self;

let service_literal = Literal::string(restate_name);

let handlers_fns = handlers.iter().map(|handler| {
let handler_ident = &handler.ident;
let handler_literal = Literal::string(&handler.restate_name);

let argument = match &handler.arg {
None => quote! {},
Some(PatType {
ty, ..
}) => quote! { req: #ty }
};
let argument_ty = match &handler.arg {
None => quote! { () },
Some(PatType {
ty, ..
}) => quote! { #ty }
};
let res_ty = &handler.output_ok;
let input = match &handler.arg {
None => quote! { () },
Some(_) => quote! { req }
};
let request_target = match service_ty {
ServiceType::Service => quote! {
::restate_sdk::context::RequestTarget::service(#service_literal, #handler_literal)
},
ServiceType::Object => quote! {
::restate_sdk::context::RequestTarget::object(#service_literal, &self.key, #handler_literal)
},
ServiceType::Workflow => quote! {
::restate_sdk::context::RequestTarget::workflow(#service_literal, &self.key, #handler_literal)
}
};

quote! {
#vis fn #handler_ident(&self, #argument) -> ::restate_sdk::ingress_client::request::IngressRequest<'client, #argument_ty, #res_ty> {
self.client.request(#request_target, #input)
}
}
});

quote! {
impl<'client> #ingress_ident<'client> {
#( #handlers_fns )*
}
}
}

#[cfg(feature = "ingress_client")]
fn struct_handle(&self) -> TokenStream2 {
let &Self {
vis,
ref service_ty,
service_ident,
restate_name,
ref handle_ident,
handlers,
..
} = self;

let service_literal = Literal::string(restate_name);

let key_field = match service_ty {
ServiceType::Service | ServiceType::Workflow => quote! {},
ServiceType::Object => quote! {
key: String,
},
};

let into_client_impl = match service_ty {
ServiceType::Service => quote! {
impl<'client> ::restate_sdk::ingress_client::IntoServiceHandle<'client> for #handle_ident<'client> {
fn create_handle(client: &'client ::restate_sdk::ingress_client::IngressClient) -> Self {
Self { client }
}
}
},
ServiceType::Object => quote! {
impl<'client> ::restate_sdk::ingress_client::IntoObjectHandle<'client> for #handle_ident<'client> {
fn create_handle(client: &'client ::restate_sdk::ingress_client::IngressClient, key: String) -> Self {
Self { client, key }
}
}
},
ServiceType::Workflow => quote! {
impl<'client> ::restate_sdk::ingress_client::IntoWorkflowHandle<'client> for #handle_ident<'client> {
fn create_handle(client: &'client ::restate_sdk::ingress_client::IngressClient, id: String) -> Self {
Self { result: client.handle(::restate_sdk::ingress_client::handle::HandleTarget::workflow(#service_literal, id)) }
}
}
},
};

let doc_msg =
format!("Struct exposing the handle to retrieve the result of [`{service_ident}`].");
match service_ty {
ServiceType::Service | ServiceType::Object => quote! {
#[doc = #doc_msg]
#vis struct #handle_ident<'client> {
client: &'client ::restate_sdk::ingress_client::IngressClient,
#key_field
}

#into_client_impl
},
ServiceType::Workflow => {
let Some(handler) = &handlers
.iter()
.find(|handler| handler.restate_name == "run")
else {
return quote_spanned! {
service_ident.span() => compile_error!("A workflow definition must contain a `run` handler");
};
};
let res_ty = &handler.output_ok;

quote! {
#[doc = #doc_msg]
#vis struct #handle_ident<'client> {
result: ::restate_sdk::ingress_client::handle::IngressHandle<'client, #res_ty>,
}

#into_client_impl
}
}
}
}

#[cfg(feature = "ingress_client")]
fn impl_handle(&self) -> TokenStream2 {
let &Self {
vis,
ref handle_ident,
handlers,
restate_name,
service_ty,
service_ident,
..
} = self;

let service_literal = Literal::string(restate_name);

if let ServiceType::Service | ServiceType::Object = service_ty {
let handlers_fns = handlers.iter().map(|handler| {
let handler_ident = &handler.ident;
let handler_literal = Literal::string(&handler.restate_name);
let res_ty = &handler.output_ok;

let handle_target = match service_ty {
ServiceType::Service => quote! {
::restate_sdk::ingress_client::handle::HandleTarget::service(#service_literal, #handler_literal, idempotency_key)
},
ServiceType::Object => quote! {
::restate_sdk::ingress_client::handle::HandleTarget::object(#service_literal, &self.key, #handler_literal, idempotency_key)
},
ServiceType::Workflow => quote! {
::restate_sdk::ingress_client::handle::HandleTarget::workflow(#service_literal, &self.key, #handler_literal)
}
};
quote! {
#vis fn #handler_ident(&self, idempotency_key: impl Into<String>) -> ::restate_sdk::ingress_client::handle::IngressHandle<'client, #res_ty> {
self.client.handle(#handle_target)
}
}
});

quote! {
impl<'client> #handle_ident<'client> {
#( #handlers_fns )*
}
}
} else {
let Some(handler) = &handlers
.iter()
.find(|handler| handler.restate_name == "run")
else {
return quote_spanned! {
service_ident.span() => compile_error!("A workflow definition must contain a `run` handler");
};
};
let res_ty = &handler.output_ok;

quote! {
impl<'client> #handle_ident<'client> {
#vis async fn attach(self) -> Result<#res_ty, ::restate_sdk::ingress_client::internal::IngressClientError> {
self.result.attach().await
}

#vis async fn output(self) -> Result<#res_ty, ::restate_sdk::ingress_client::internal::IngressClientError> {
self.result.output().await
}
}
}
}
}
}

impl<'a> ToTokens for ServiceGenerator<'a> {
Expand All @@ -347,6 +620,14 @@ impl<'a> ToTokens for ServiceGenerator<'a> {
self.impl_discoverable(),
self.struct_client(),
self.impl_client(),
#[cfg(feature = "ingress_client")]
self.struct_ingress(),
#[cfg(feature = "ingress_client")]
self.impl_ingress(),
#[cfg(feature = "ingress_client")]
self.struct_handle(),
#[cfg(feature = "ingress_client")]
self.impl_handle(),
]);
}
}
49 changes: 49 additions & 0 deletions src/ingress_client/awakeable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::time::Duration;

use crate::serde::Serialize;

use super::{internal::IngressClientError, IngressInternal};

pub struct IngressAwakeable<'a> {
inner: &'a IngressInternal,
key: String,
opts: IngressAwakeableOptions,
}

#[derive(Default, Clone)]
pub(super) struct IngressAwakeableOptions {
pub(super) timeout: Option<Duration>,
}

impl<'a> IngressAwakeable<'a> {
pub(super) fn new(inner: &'a IngressInternal, key: impl Into<String>) -> Self {
Self {
inner,
key: key.into(),
opts: Default::default(),
}
}

/// Set the timeout for the request.
pub fn timeout(mut self, value: Duration) -> Self {
self.opts.timeout = Some(value);
self
}

/// Resolve the awakeable with a payload
pub async fn resolve<T: Serialize + 'static>(
self,
payload: T,
) -> Result<(), IngressClientError> {
self.inner
.resolve_awakeable(&self.key, payload, self.opts)
.await
}

/// Reject the awakeable with a failure message
pub async fn reject(self, message: impl Into<String>) -> Result<(), IngressClientError> {
self.inner
.reject_awakeable(&self.key, &message.into(), self.opts)
.await
}
}
Loading