Skip to content

Handlers can be async #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,18 +174,21 @@ For error reporting to the runtime APIs the library defines the `RuntimeApiError
This library makes it easy to create Rust executables for AWS lambda. The library defines a `lambda!()` macro. Call the `lambda!()` macro from your main method with an implementation the `Handler` type:

```rust
pub trait Handler<E, O> {
pub trait Handler<E, O>: Send {
/// Future of return value returned by handler.
type Future: Future<Item=O, Error=HandlerError> + Send;
/// IntoFuture of return value returned by handler.
type IntoFuture: IntoFuture<Future=Self::Future, Item=O, Error=HandlerError> + Send;

/// Run the handler.
fn run(
&mut self,
event: E,
ctx: Context
) -> Result<O, HandlerError>;
fn run(&mut self, event: E, ctx: Context) -> Self::IntoFuture;
}
```

`Handler` provides a default implementation that enables you to provide a Rust closure or function pointer to the `lambda!()` macro.

Your `Handler` needs to return something which implements `IntoFuture`; `Result` implements `IntoIterator`, so most synchronous `Handler`s will return a `Result`.

Optionally, you can pass your own instance of Tokio runtime to the `lambda!()` macro. See our [`with_custom_runtime.rs` example](https://github.com/awslabs/aws-lambda-rust-runtime/tree/master/lambda-runtime/examples/with_custom_runtime.rs)

## AWS event objects
Expand Down
8 changes: 4 additions & 4 deletions lambda-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ use crate::{request::LambdaRequest, response::LambdaResponse};
pub type Request = http::Request<Body>;

/// Functions serving as ALB and API Gateway handlers must conform to this type.
pub trait Handler<R> {
pub trait Handler<R>: Send {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Send not inferred on Handler?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Handler is a trait, there's no way for the type system to know that an implementation of it doesn't have some non-Send fields. We can either add the bound here to require that implementors are Send, or we can add a + Send bound in each of the places that requires Handler be Send, but doing the latter would require adding the bound to several core functions, and would lead to more confusing error messages if you had a non-Send trait.

/// Run the handler.
fn run(&mut self, event: Request, ctx: Context) -> Result<R, HandlerError>;
}

impl<F, R> Handler<R> for F
where
F: FnMut(Request, Context) -> Result<R, HandlerError>,
F: FnMut(Request, Context) -> Result<R, HandlerError> + Send,
{
fn run(&mut self, event: Request, ctx: Context) -> Result<R, HandlerError> {
(*self)(event, ctx)
Expand All @@ -92,14 +92,14 @@ where
///
/// # Panics
/// The function panics if the Lambda environment variables are not set.
pub fn start<R>(f: impl Handler<R>, runtime: Option<TokioRuntime>)
pub fn start<R>(f: impl Handler<R> + 'static, runtime: Option<TokioRuntime>)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things:

  • I'm not completely sure that impl Handler needs to be &'static. I wonder if start<R> can be refactored to take an impl Future instead of an impl Handler<R>.
  • If we're able to get rid of all .waits() inside the client, we can remove the optional TokioRuntime parameter.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll hold off on looking at this until #63 merges, if that's ok

where
R: IntoResponse,
{
// handler requires a mutable ref
let mut func = f;
lambda::start(
|req: LambdaRequest<'_>, ctx: Context| {
move |req: LambdaRequest<'_>, ctx: Context| {
let is_alb = req.request_context.is_alb();
func.run(req.into(), ctx)
.map(|resp| LambdaResponse::from_response(is_alb, resp.into_response()))
Expand Down
192 changes: 99 additions & 93 deletions lambda-runtime-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use hyper::{
};
use serde_derive::Deserialize;
use serde_json;
use tokio::runtime::Runtime;
use tokio::{prelude::future::IntoFuture, runtime::TaskExecutor};

use crate::error::{ApiError, ErrorResponse, RuntimeApiError};

Expand Down Expand Up @@ -120,49 +120,44 @@ pub struct EventContext {
}

/// Used by the Runtime to communicate with the internal endpoint.
#[derive(Clone)]
pub struct RuntimeClient {
_runtime: Runtime,
http_client: Client<HttpConnector, Body>,
endpoint: String,
}

impl RuntimeClient {
/// Creates a new instance of the Runtime APIclient SDK. The http client has timeouts disabled and
/// will always send a `Connection: keep-alive` header.
pub fn new(endpoint: String, runtime: Option<Runtime>) -> Result<Self, ApiError> {
pub fn new(endpoint: String, task_executor: TaskExecutor) -> Self {
debug!("Starting new HttpRuntimeClient for {}", endpoint);
// start a tokio core main event loop for hyper
let runtime = match runtime {
Some(r) => r,
None => Runtime::new()?,
};

let http_client = Client::builder().executor(runtime.executor()).build_http();
let http_client = Client::builder().executor(task_executor).build_http();

Ok(RuntimeClient {
_runtime: runtime,
http_client,
endpoint,
})
RuntimeClient { http_client, endpoint }
}
}

impl RuntimeClient {
/// Polls for new events to the Runtime APIs.
pub fn next_event(&self) -> Result<(Vec<u8>, EventContext), ApiError> {
pub fn next_event(&self) -> impl Future<Item = (Vec<u8>, EventContext), Error = ApiError> {
let uri = format!(
"http://{}/{}/runtime/invocation/next",
self.endpoint, RUNTIME_API_VERSION
)
.parse()?;
.parse();
trace!("Polling for next event");

// We wait instead of processing the future asynchronously because AWS Lambda
// itself enforces only one event per container at a time. No point in taking on
// the additional complexity.
let out = self.http_client.get(uri).wait();
match out {
Ok(resp) => {
let http_client = self.http_client.clone();
uri.into_future()
.map_err(ApiError::from)
.and_then(move |uri| {
http_client.get(uri).map_err(|e| {
error!("Error when fetching next event from Runtime API: {}", e);
ApiError::from(e)
})
})
.and_then(|resp| {
if resp.status().is_client_error() {
error!(
"Runtime API returned client error when polling for new events: {}",
Expand All @@ -182,22 +177,23 @@ impl RuntimeClient {
.unrecoverable()
.clone());
}
let ctx = self.get_event_context(&resp.headers())?;
let out = resp.into_body().concat2().wait()?;
let buf: Vec<u8> = out.into_bytes().to_vec();
return Ok((Self::get_event_context(&resp.headers())?, resp));
})
.and_then(|(ctx, resp)| {
Ok(ctx)
.into_future()
.join(resp.into_body().concat2().map_err(Into::into))
})
.map(|(ctx, body)| {
let buf = body.into_bytes().to_vec();

trace!(
"Received new event for request id {}. Event length {} bytes",
ctx.aws_request_id,
buf.len()
);
Ok((buf, ctx))
}
Err(e) => {
error!("Error when fetching next event from Runtime API: {}", e);
Err(ApiError::from(e))
}
}
(buf, ctx)
})
}

/// Calls the Lambda Runtime APIs to submit a response to an event. In this function we treat
Expand All @@ -211,41 +207,44 @@ impl RuntimeClient {
/// * `output` The object be sent back to the Runtime APIs as a response.
///
/// # Returns
/// A `Result` object containing a bool return value for the call or an `error::ApiError` instance.
pub fn event_response(&self, request_id: &str, output: Vec<u8>) -> Result<(), ApiError> {
let uri: Uri = format!(
/// A `Future` object containing a either resolving () for success or an `error::ApiError` instance.
pub fn event_response(&self, request_id: String, output: Vec<u8>) -> impl Future<Item = (), Error = ApiError> {
let uri = format!(
"http://{}/{}/runtime/invocation/{}/response",
self.endpoint, RUNTIME_API_VERSION, request_id
)
.parse()?;
.parse();
trace!(
"Posting response for request {} to Runtime API. Response length {} bytes",
request_id,
output.len()
);
let req = self.get_runtime_post_request(&uri, output);

match self.http_client.request(req).wait() {
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error from Runtime API when posting response for request {}: {}",
request_id,
resp.status()
);
return Err(ApiError::new(&format!(
"Error {} while sending response",
resp.status()
)));
let http_client = self.http_client.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

http_client might be able to be wrapped in an Arc, which would make the clones cheaper, I believe. It might be an unnecessary micro-optimization.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now cloning a Client is 3 Arc clones and ~6 word copies, so yeah, probably a bit micro for now, but I can do it if you'd like :)

uri.into_future()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know that Uri had an IntoFuture implementation. Can I ask why you opted for this?

Copy link
Contributor

@softprops softprops Jan 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think uri here is a Result and all Result types get a free IntoFuture impl. Since the return type changed, the ? needed to be removed above

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah; I could change this to future::done(uri) if that's more clear?

.map_err(ApiError::from)
.map(move |uri| Self::get_runtime_post_request(&uri, output))
.and_then(move |req| http_client.request(req).map_err(ApiError::from))
.then(move |result| match result {
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error from Runtime API when posting response for request {}: {}",
request_id,
resp.status()
);
return Err(ApiError::new(&format!(
"Error {} while sending response",
resp.status()
)));
}
trace!("Posted response to Runtime API for request {}", request_id);
Ok(())
}
trace!("Posted response to Runtime API for request {}", request_id);
Ok(())
}
Err(e) => {
error!("Error when calling runtime API for request {}: {}", request_id, e);
Err(ApiError::from(e))
}
}
Err(e) => {
error!("Error when calling runtime API for request {}: {}", request_id, e);
Err(ApiError::from(e))
}
})
}

/// Calls Lambda's Runtime APIs to send an error generated by the `Handler`. Because it's rust,
Expand All @@ -259,41 +258,48 @@ impl RuntimeClient {
/// object.
///
/// # Returns
/// A `Result` object containing a bool return value for the call or an `error::ApiError` instance.
pub fn event_error(&self, request_id: &str, e: &dyn RuntimeApiError) -> Result<(), ApiError> {
let uri: Uri = format!(
/// A `Future` object containing a either resolving () for success or an `error::ApiError` instance.
pub fn event_error(&self, request_id: String, e: &dyn RuntimeApiError) -> impl Future<Item = (), Error = ApiError> {
let uri = format!(
"http://{}/{}/runtime/invocation/{}/error",
self.endpoint, RUNTIME_API_VERSION, request_id
)
.parse()?;
trace!(
"Posting error to runtime API for request {}: {}",
request_id,
e.to_response().error_message
);
let req = self.get_runtime_error_request(&uri, &e.to_response());

match self.http_client.request(req).wait() {
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error from Runtime API when posting error response for request {}: {}",
request_id,
resp.status()
);
return Err(ApiError::new(&format!(
"Error {} while sending response",
resp.status()
)));
.parse();
let http_client = self.http_client.clone();
let response = e.to_response();
let request_id2 = request_id.clone();
uri.into_future()
.map_err(ApiError::from)
.map(move |uri| (Self::get_runtime_error_request(&uri, &response), response))
.and_then(move |(req, error_response)| {
trace!(
"Posting error to runtime API for request {}: {}",
request_id,
error_response.error_message
);
http_client.request(req).map_err(ApiError::from)
})
.then(move |result| match result {
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error from Runtime API when posting error response for request {}: {}",
request_id2,
resp.status()
);
return Err(ApiError::new(&format!(
"Error {} while sending response",
resp.status()
)));
}
trace!("Posted error response for request id {}", request_id2);
Ok(())
}
trace!("Posted error response for request id {}", request_id);
Ok(())
}
Err(e) => {
error!("Error when calling runtime API for request {}: {}", request_id, e);
Err(ApiError::from(e))
}
}
Err(e) => {
error!("Error when calling runtime API for request {}: {}", request_id2, e);
Err(ApiError::from(e))
}
})
}

/// Calls the Runtime APIs to report a failure during the init process.
Expand All @@ -312,7 +318,7 @@ impl RuntimeClient {
.parse()
.expect("Could not generate Runtime URI");
error!("Calling fail_init Runtime API: {}", e.to_response().error_message);
let req = self.get_runtime_error_request(&uri, &e.to_response());
let req = Self::get_runtime_error_request(&uri, &e.to_response());

self.http_client
.request(req)
Expand Down Expand Up @@ -344,7 +350,7 @@ impl RuntimeClient {
///
/// # Returns
/// A Populated Hyper `Request` object.
fn get_runtime_post_request(&self, uri: &Uri, body: Vec<u8>) -> Request<Body> {
fn get_runtime_post_request(uri: &Uri, body: Vec<u8>) -> Request<Body> {
Request::builder()
.method(Method::POST)
.uri(uri.clone())
Expand All @@ -353,7 +359,7 @@ impl RuntimeClient {
.unwrap()
}

fn get_runtime_error_request(&self, uri: &Uri, e: &ErrorResponse) -> Request<Body> {
fn get_runtime_error_request(uri: &Uri, e: &ErrorResponse) -> Request<Body> {
let body = serde_json::to_vec(e).expect("Could not turn error object into response JSON");
Request::builder()
.method(Method::POST)
Expand All @@ -378,7 +384,7 @@ impl RuntimeClient {
/// A `Result` containing the populated `EventContext` or an `ApiError` if the required headers
/// were not present or the client context and cognito identity could not be parsed from the
/// JSON string.
fn get_event_context(&self, headers: &HeaderMap<HeaderValue>) -> Result<EventContext, ApiError> {
fn get_event_context(headers: &HeaderMap<HeaderValue>) -> Result<EventContext, ApiError> {
// let headers = resp.headers();

let aws_request_id = match headers.get(LambdaHeaders::RequestId.as_str()) {
Expand Down
11 changes: 7 additions & 4 deletions lambda-runtime-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
//!
//! ```rust,no_run
//! extern crate lambda_runtime_client;
//! extern crate tokio;
//! #[macro_use]
//! extern crate serde_derive;
//! extern crate serde_json;
//!
//! use lambda_runtime_client::{RuntimeClient, EventContext};
//! use tokio::prelude::future::Future;
//! use tokio::runtime::Runtime as TokioRuntime;
//!
//! #[derive(Serialize, Deserialize, Debug)]
//! struct CustomEvent {
Expand All @@ -31,11 +34,11 @@
//! }
//!
//! fn main() {
//! let tokio_runtime = TokioRuntime::new().expect("Could not make tokio runtime");
//! let runtime_endpoint = String::from("http://localhost:8080");
//! let client = RuntimeClient::new(runtime_endpoint, None)
//! .expect("Could not initialize client");
//! let client = RuntimeClient::new(runtime_endpoint, tokio_runtime.executor());
//!
//! let (event_data, event_context) = client.next_event()
//! let (event_data, event_context) = client.next_event().wait()
//! .expect("Could not retrieve next event");
//! let custom_event: CustomEvent = serde_json::from_slice(&event_data)
//! .expect("Could not turn Vec<u8> into CustomEvent object");
Expand All @@ -45,7 +48,7 @@
//! let resp_object = CustomResponse{ surname: String::from("Doe")};
//! let resp_vec = serde_json::to_vec(&resp_object)
//! .expect("Could not serialize CustomResponse to Vec<u8>");
//! client.event_response(&event_context.aws_request_id, resp_vec)
//! client.event_response(event_context.aws_request_id.clone(), resp_vec).wait()
//! .expect("Response sent successfully");
//! } else {
//! // return a custom error by implementing the RuntimeApiError trait.
Expand Down
Loading