diff --git a/README.md b/README.md index 028f9da7..6a9393cb 100644 --- a/README.md +++ b/README.md @@ -174,18 +174,21 @@ For error reporting to the runtime APIs the library defines the `RuntimeApiError This library makes it easy to create Rust executables for AWS lambda. The library defines a `lambda!()` macro. Call the `lambda!()` macro from your main method with an implementation the `Handler` type: ```rust -pub trait Handler { +pub trait Handler: Send { + /// Future of return value returned by handler. + type Future: Future + Send; + /// IntoFuture of return value returned by handler. + type IntoFuture: IntoFuture + Send; + /// Run the handler. - fn run( - &mut self, - event: E, - ctx: Context - ) -> Result; + fn run(&mut self, event: E, ctx: Context) -> Self::IntoFuture; } ``` `Handler` provides a default implementation that enables you to provide a Rust closure or function pointer to the `lambda!()` macro. +Your `Handler` needs to return something which implements `IntoFuture`; `Result` implements `IntoIterator`, so most synchronous `Handler`s will return a `Result`. + Optionally, you can pass your own instance of Tokio runtime to the `lambda!()` macro. See our [`with_custom_runtime.rs` example](https://github.com/awslabs/aws-lambda-rust-runtime/tree/master/lambda-runtime/examples/with_custom_runtime.rs) ## AWS event objects diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 56402a1a..9b6e66f4 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -70,14 +70,14 @@ use crate::{request::LambdaRequest, response::LambdaResponse}; pub type Request = http::Request; /// Functions serving as ALB and API Gateway handlers must conform to this type. -pub trait Handler { +pub trait Handler: Send { /// Run the handler. fn run(&mut self, event: Request, ctx: Context) -> Result; } impl Handler for F where - F: FnMut(Request, Context) -> Result, + F: FnMut(Request, Context) -> Result + Send, { fn run(&mut self, event: Request, ctx: Context) -> Result { (*self)(event, ctx) @@ -92,14 +92,14 @@ where /// /// # Panics /// The function panics if the Lambda environment variables are not set. -pub fn start(f: impl Handler, runtime: Option) +pub fn start(f: impl Handler + 'static, runtime: Option) where R: IntoResponse, { // handler requires a mutable ref let mut func = f; lambda::start( - |req: LambdaRequest<'_>, ctx: Context| { + move |req: LambdaRequest<'_>, ctx: Context| { let is_alb = req.request_context.is_alb(); func.run(req.into(), ctx) .map(|resp| LambdaResponse::from_response(is_alb, resp.into_response())) diff --git a/lambda-runtime-client/src/client.rs b/lambda-runtime-client/src/client.rs index f87a2d2c..f9268318 100644 --- a/lambda-runtime-client/src/client.rs +++ b/lambda-runtime-client/src/client.rs @@ -8,7 +8,7 @@ use hyper::{ }; use serde_derive::Deserialize; use serde_json; -use tokio::runtime::Runtime; +use tokio::{prelude::future::IntoFuture, runtime::TaskExecutor}; use crate::error::{ApiError, ErrorResponse, RuntimeApiError}; @@ -120,8 +120,8 @@ pub struct EventContext { } /// Used by the Runtime to communicate with the internal endpoint. +#[derive(Clone)] pub struct RuntimeClient { - _runtime: Runtime, http_client: Client, endpoint: String, } @@ -129,40 +129,35 @@ pub struct RuntimeClient { impl RuntimeClient { /// Creates a new instance of the Runtime APIclient SDK. The http client has timeouts disabled and /// will always send a `Connection: keep-alive` header. - pub fn new(endpoint: String, runtime: Option) -> Result { + pub fn new(endpoint: String, task_executor: TaskExecutor) -> Self { debug!("Starting new HttpRuntimeClient for {}", endpoint); // start a tokio core main event loop for hyper - let runtime = match runtime { - Some(r) => r, - None => Runtime::new()?, - }; - let http_client = Client::builder().executor(runtime.executor()).build_http(); + let http_client = Client::builder().executor(task_executor).build_http(); - Ok(RuntimeClient { - _runtime: runtime, - http_client, - endpoint, - }) + RuntimeClient { http_client, endpoint } } } impl RuntimeClient { /// Polls for new events to the Runtime APIs. - pub fn next_event(&self) -> Result<(Vec, EventContext), ApiError> { + pub fn next_event(&self) -> impl Future, EventContext), Error = ApiError> { let uri = format!( "http://{}/{}/runtime/invocation/next", self.endpoint, RUNTIME_API_VERSION ) - .parse()?; + .parse(); trace!("Polling for next event"); - - // We wait instead of processing the future asynchronously because AWS Lambda - // itself enforces only one event per container at a time. No point in taking on - // the additional complexity. - let out = self.http_client.get(uri).wait(); - match out { - Ok(resp) => { + let http_client = self.http_client.clone(); + uri.into_future() + .map_err(ApiError::from) + .and_then(move |uri| { + http_client.get(uri).map_err(|e| { + error!("Error when fetching next event from Runtime API: {}", e); + ApiError::from(e) + }) + }) + .and_then(|resp| { if resp.status().is_client_error() { error!( "Runtime API returned client error when polling for new events: {}", @@ -182,22 +177,23 @@ impl RuntimeClient { .unrecoverable() .clone()); } - let ctx = self.get_event_context(&resp.headers())?; - let out = resp.into_body().concat2().wait()?; - let buf: Vec = out.into_bytes().to_vec(); + return Ok((Self::get_event_context(&resp.headers())?, resp)); + }) + .and_then(|(ctx, resp)| { + Ok(ctx) + .into_future() + .join(resp.into_body().concat2().map_err(Into::into)) + }) + .map(|(ctx, body)| { + let buf = body.into_bytes().to_vec(); trace!( "Received new event for request id {}. Event length {} bytes", ctx.aws_request_id, buf.len() ); - Ok((buf, ctx)) - } - Err(e) => { - error!("Error when fetching next event from Runtime API: {}", e); - Err(ApiError::from(e)) - } - } + (buf, ctx) + }) } /// Calls the Lambda Runtime APIs to submit a response to an event. In this function we treat @@ -211,41 +207,44 @@ impl RuntimeClient { /// * `output` The object be sent back to the Runtime APIs as a response. /// /// # Returns - /// A `Result` object containing a bool return value for the call or an `error::ApiError` instance. - pub fn event_response(&self, request_id: &str, output: Vec) -> Result<(), ApiError> { - let uri: Uri = format!( + /// A `Future` object containing a either resolving () for success or an `error::ApiError` instance. + pub fn event_response(&self, request_id: String, output: Vec) -> impl Future { + let uri = format!( "http://{}/{}/runtime/invocation/{}/response", self.endpoint, RUNTIME_API_VERSION, request_id ) - .parse()?; + .parse(); trace!( "Posting response for request {} to Runtime API. Response length {} bytes", request_id, output.len() ); - let req = self.get_runtime_post_request(&uri, output); - - match self.http_client.request(req).wait() { - Ok(resp) => { - if !resp.status().is_success() { - error!( - "Error from Runtime API when posting response for request {}: {}", - request_id, - resp.status() - ); - return Err(ApiError::new(&format!( - "Error {} while sending response", - resp.status() - ))); + let http_client = self.http_client.clone(); + uri.into_future() + .map_err(ApiError::from) + .map(move |uri| Self::get_runtime_post_request(&uri, output)) + .and_then(move |req| http_client.request(req).map_err(ApiError::from)) + .then(move |result| match result { + Ok(resp) => { + if !resp.status().is_success() { + error!( + "Error from Runtime API when posting response for request {}: {}", + request_id, + resp.status() + ); + return Err(ApiError::new(&format!( + "Error {} while sending response", + resp.status() + ))); + } + trace!("Posted response to Runtime API for request {}", request_id); + Ok(()) } - trace!("Posted response to Runtime API for request {}", request_id); - Ok(()) - } - Err(e) => { - error!("Error when calling runtime API for request {}: {}", request_id, e); - Err(ApiError::from(e)) - } - } + Err(e) => { + error!("Error when calling runtime API for request {}: {}", request_id, e); + Err(ApiError::from(e)) + } + }) } /// Calls Lambda's Runtime APIs to send an error generated by the `Handler`. Because it's rust, @@ -259,41 +258,48 @@ impl RuntimeClient { /// object. /// /// # Returns - /// A `Result` object containing a bool return value for the call or an `error::ApiError` instance. - pub fn event_error(&self, request_id: &str, e: &dyn RuntimeApiError) -> Result<(), ApiError> { - let uri: Uri = format!( + /// A `Future` object containing a either resolving () for success or an `error::ApiError` instance. + pub fn event_error(&self, request_id: String, e: &dyn RuntimeApiError) -> impl Future { + let uri = format!( "http://{}/{}/runtime/invocation/{}/error", self.endpoint, RUNTIME_API_VERSION, request_id ) - .parse()?; - trace!( - "Posting error to runtime API for request {}: {}", - request_id, - e.to_response().error_message - ); - let req = self.get_runtime_error_request(&uri, &e.to_response()); - - match self.http_client.request(req).wait() { - Ok(resp) => { - if !resp.status().is_success() { - error!( - "Error from Runtime API when posting error response for request {}: {}", - request_id, - resp.status() - ); - return Err(ApiError::new(&format!( - "Error {} while sending response", - resp.status() - ))); + .parse(); + let http_client = self.http_client.clone(); + let response = e.to_response(); + let request_id2 = request_id.clone(); + uri.into_future() + .map_err(ApiError::from) + .map(move |uri| (Self::get_runtime_error_request(&uri, &response), response)) + .and_then(move |(req, error_response)| { + trace!( + "Posting error to runtime API for request {}: {}", + request_id, + error_response.error_message + ); + http_client.request(req).map_err(ApiError::from) + }) + .then(move |result| match result { + Ok(resp) => { + if !resp.status().is_success() { + error!( + "Error from Runtime API when posting error response for request {}: {}", + request_id2, + resp.status() + ); + return Err(ApiError::new(&format!( + "Error {} while sending response", + resp.status() + ))); + } + trace!("Posted error response for request id {}", request_id2); + Ok(()) } - trace!("Posted error response for request id {}", request_id); - Ok(()) - } - Err(e) => { - error!("Error when calling runtime API for request {}: {}", request_id, e); - Err(ApiError::from(e)) - } - } + Err(e) => { + error!("Error when calling runtime API for request {}: {}", request_id2, e); + Err(ApiError::from(e)) + } + }) } /// Calls the Runtime APIs to report a failure during the init process. @@ -312,7 +318,7 @@ impl RuntimeClient { .parse() .expect("Could not generate Runtime URI"); error!("Calling fail_init Runtime API: {}", e.to_response().error_message); - let req = self.get_runtime_error_request(&uri, &e.to_response()); + let req = Self::get_runtime_error_request(&uri, &e.to_response()); self.http_client .request(req) @@ -344,7 +350,7 @@ impl RuntimeClient { /// /// # Returns /// A Populated Hyper `Request` object. - fn get_runtime_post_request(&self, uri: &Uri, body: Vec) -> Request { + fn get_runtime_post_request(uri: &Uri, body: Vec) -> Request { Request::builder() .method(Method::POST) .uri(uri.clone()) @@ -353,7 +359,7 @@ impl RuntimeClient { .unwrap() } - fn get_runtime_error_request(&self, uri: &Uri, e: &ErrorResponse) -> Request { + fn get_runtime_error_request(uri: &Uri, e: &ErrorResponse) -> Request { let body = serde_json::to_vec(e).expect("Could not turn error object into response JSON"); Request::builder() .method(Method::POST) @@ -378,7 +384,7 @@ impl RuntimeClient { /// A `Result` containing the populated `EventContext` or an `ApiError` if the required headers /// were not present or the client context and cognito identity could not be parsed from the /// JSON string. - fn get_event_context(&self, headers: &HeaderMap) -> Result { + fn get_event_context(headers: &HeaderMap) -> Result { // let headers = resp.headers(); let aws_request_id = match headers.get(LambdaHeaders::RequestId.as_str()) { diff --git a/lambda-runtime-client/src/lib.rs b/lambda-runtime-client/src/lib.rs index efd336bd..f64ce6b1 100644 --- a/lambda-runtime-client/src/lib.rs +++ b/lambda-runtime-client/src/lib.rs @@ -14,11 +14,14 @@ //! //! ```rust,no_run //! extern crate lambda_runtime_client; +//! extern crate tokio; //! #[macro_use] //! extern crate serde_derive; //! extern crate serde_json; //! //! use lambda_runtime_client::{RuntimeClient, EventContext}; +//! use tokio::prelude::future::Future; +//! use tokio::runtime::Runtime as TokioRuntime; //! //! #[derive(Serialize, Deserialize, Debug)] //! struct CustomEvent { @@ -31,11 +34,11 @@ //! } //! //! fn main() { +//! let tokio_runtime = TokioRuntime::new().expect("Could not make tokio runtime"); //! let runtime_endpoint = String::from("http://localhost:8080"); -//! let client = RuntimeClient::new(runtime_endpoint, None) -//! .expect("Could not initialize client"); +//! let client = RuntimeClient::new(runtime_endpoint, tokio_runtime.executor()); //! -//! let (event_data, event_context) = client.next_event() +//! let (event_data, event_context) = client.next_event().wait() //! .expect("Could not retrieve next event"); //! let custom_event: CustomEvent = serde_json::from_slice(&event_data) //! .expect("Could not turn Vec into CustomEvent object"); @@ -45,7 +48,7 @@ //! let resp_object = CustomResponse{ surname: String::from("Doe")}; //! let resp_vec = serde_json::to_vec(&resp_object) //! .expect("Could not serialize CustomResponse to Vec"); -//! client.event_response(&event_context.aws_request_id, resp_vec) +//! client.event_response(event_context.aws_request_id.clone(), resp_vec).wait() //! .expect("Response sent successfully"); //! } else { //! // return a custom error by implementing the RuntimeApiError trait. diff --git a/lambda-runtime/examples/async.rs b/lambda-runtime/examples/async.rs new file mode 100644 index 00000000..56451ad3 --- /dev/null +++ b/lambda-runtime/examples/async.rs @@ -0,0 +1,30 @@ +use std::error::Error; + +use lambda_runtime::{error::HandlerError, lambda, Context}; +use serde_derive::{Deserialize, Serialize}; +use simple_logger; +use tokio::prelude::future::{ok, Future}; + +#[derive(Deserialize)] +struct CustomEvent { + #[serde(rename = "firstName")] + first_name: String, +} + +#[derive(Serialize)] +struct CustomOutput { + message: String, +} + +fn main() -> Result<(), Box> { + simple_logger::init_with_level(log::Level::Debug).unwrap(); + lambda!(my_handler); + + Ok(()) +} + +fn my_handler(e: CustomEvent, _c: Context) -> impl Future { + ok(format!("Hello, {}!", e.first_name)) + .map(|message| format!("{} (modified in a Future)", message)) + .map(|message| CustomOutput { message }) +} diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index caac0983..7bcbfc0a 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -3,27 +3,46 @@ use std::{error::Error, marker::PhantomData, result}; use lambda_runtime_client::RuntimeClient; use serde; use serde_json; -use tokio::runtime::Runtime as TokioRuntime; +use tokio::{ + prelude::future::{loop_fn, Future, IntoFuture, Loop}, + runtime::Runtime as TokioRuntime, +}; use crate::{ context::Context, env::{ConfigProvider, EnvConfigProvider, FunctionSettings}, error::{HandlerError, RuntimeError}, }; +use std::sync::{Arc, Mutex}; +use tokio::runtime::TaskExecutor; const MAX_RETRIES: i8 = 3; /// Functions acting as a handler must conform to this type. -pub trait Handler { +pub trait Handler: Send { + /// Future of return value returned by handler. + type Future: Future + Send; + /// IntoFuture of return value returned by handler. + type IntoFuture: IntoFuture + Send; + /// Run the handler. - fn run(&mut self, event: E, ctx: Context) -> Result; + fn run(&mut self, event: E, ctx: Context) -> Self::IntoFuture; } -impl Handler for F +impl< + F, + E, + O: Send, + Fut: Future + Send, + IntoFut: IntoFuture + Send, + > Handler for F where - F: FnMut(E, Context) -> Result, + F: FnMut(E, Context) -> IntoFut + Send, { - fn run(&mut self, event: E, ctx: Context) -> Result { + type Future = Fut; + type IntoFuture = IntoFut; + + fn run(&mut self, event: E, ctx: Context) -> IntoFut { (*self)(event, ctx) } } @@ -36,12 +55,16 @@ where /// /// # Panics /// The function panics if the Lambda environment variables are not set. -pub fn start(f: impl Handler, runtime: Option) +pub fn start(f: impl Handler + 'static, runtime: Option) where - E: serde::de::DeserializeOwned, - O: serde::Serialize, + E: serde::de::DeserializeOwned + Send + 'static, + O: serde::Serialize + Send + 'static, { - start_with_config(f, &EnvConfigProvider::new(), runtime) + let mut runtime = runtime.unwrap_or_else(|| TokioRuntime::new().expect("Failed to start tokio runtime")); + let task_executor = runtime.executor(); + runtime + .block_on(start_with_config(f, &EnvConfigProvider::new(), task_executor)) + .unwrap(); } #[macro_export] @@ -73,10 +96,14 @@ macro_rules! lambda { /// The function panics if the `ConfigProvider` returns an error from the `get_runtime_api_endpoint()` /// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment /// and spin up a new one for the next invocation. -pub(crate) fn start_with_config(f: impl Handler, config: &C, runtime: Option) +pub(crate) fn start_with_config( + f: impl Handler, + config: &C, + task_executor: TaskExecutor, +) -> impl Future + Send where - E: serde::de::DeserializeOwned, - O: serde::Serialize, + E: serde::de::DeserializeOwned + Send + 'static, + O: serde::Serialize + Send, C: ConfigProvider, { // if we cannot find the endpoint we panic, nothing else we can do. @@ -99,14 +126,7 @@ where } } - match RuntimeClient::new(endpoint, runtime) { - Ok(client) => { - start_with_runtime_client(f, function_config, client); - } - Err(e) => { - panic!("Could not create runtime client SDK: {}", e); - } - } + start_with_runtime_client(f, function_config, RuntimeClient::new(endpoint, task_executor)) } /// Starts the rust runtime with the given Runtime API client. @@ -124,11 +144,12 @@ pub(crate) fn start_with_runtime_client( f: impl Handler, func_settings: FunctionSettings, client: RuntimeClient, -) where - E: serde::de::DeserializeOwned, - O: serde::Serialize, +) -> impl Future + Send +where + E: serde::de::DeserializeOwned + Send + 'static, + O: serde::Serialize + Send, { - let mut lambda_runtime: Runtime<_, E, O>; + let lambda_runtime: Runtime<_, E, O>; match Runtime::new(f, func_settings, MAX_RETRIES, client) { Ok(r) => lambda_runtime = r, Err(e) => { @@ -137,14 +158,14 @@ pub(crate) fn start_with_runtime_client( } // start the infinite loop - lambda_runtime.start(); + lambda_runtime.start() } /// Internal representation of the runtime object that polls for events and communicates /// with the Runtime APIs pub(super) struct Runtime { runtime_client: RuntimeClient, - handler: F, + handler: Arc>, max_retries: i8, settings: FunctionSettings, _phan: PhantomData<(E, O)>, @@ -179,7 +200,7 @@ impl Runtime { Ok(Runtime { runtime_client: client, settings: config, - handler: f, + handler: Arc::new(Mutex::new(f)), max_retries: retries, _phan: PhantomData, }) @@ -191,134 +212,169 @@ impl Runtime { impl Runtime where F: Handler, - E: serde::de::DeserializeOwned, - O: serde::Serialize, + E: serde::de::DeserializeOwned + Send + 'static, + O: serde::Serialize + Send, { /// Starts the main event loop and begin polling or new events. If one of the /// Runtime APIs returns an unrecoverable error this method calls the init failed /// API and then panics. - fn start(&mut self) { + fn start(&self) -> impl Future + Send { debug!("Beginning main event loop"); - loop { - let (event, ctx) = self.get_next_event(0, None); - let request_id = ctx.aws_request_id.clone(); - info!("Received new event with AWS request id: {}", request_id); - let function_outcome = self.invoke(event, ctx); - match function_outcome { - Ok(response) => { - debug!( - "Function executed succesfully for {}, pushing response to Runtime API", - request_id - ); - match serde_json::to_vec(&response) { - Ok(response_bytes) => { - match self.runtime_client.event_response(&request_id, response_bytes) { - Ok(_) => info!("Response for {} accepted by Runtime API", request_id), - // unrecoverable error while trying to communicate with the endpoint. - // we let the Lambda Runtime API know that we have died + + let max_retries = self.max_retries; + let runtime_client = self.runtime_client.clone(); + let settings = self.settings.clone(); + let handler = self.handler.clone(); + + loop_fn((), move |()| { + let runtime_client = runtime_client.clone(); + let handler = handler.clone(); + Self::get_next_event(max_retries, runtime_client.clone(), settings.clone()).and_then(move |(event, ctx)| { + let runtime_client = runtime_client.clone(); + let request_id = ctx.aws_request_id.clone(); + info!("Received new event with AWS request id: {}", request_id); + let handler = handler.clone(); + let mut handler_function = handler.lock().unwrap(); + handler_function.run(event, ctx).into_future().then(|function_outcome| { + match function_outcome { + Ok(response) => { + debug!( + "Function executed succesfully for {}, pushing response to Runtime API", + request_id + ); + match serde_json::to_vec(&response) { + Ok(response_bytes) => { + Box::new(runtime_client.event_response(request_id.clone(), response_bytes).then(move |result| match result { + Ok(_) => { + info!("Response for {} accepted by Runtime API", request_id); + Ok(()) + }, + // unrecoverable error while trying to communicate with the endpoint. + // we let the Lambda Runtime API know that we have died + Err(e) => { + error!("Could not send response for {} to Runtime API: {}", request_id, e); + if !e.recoverable { + error!( + "Error for {} is not recoverable, sending fail_init signal and panicking.", + request_id + ); + runtime_client.fail_init(&e); + panic!("Could not send response"); + } + Ok(()) + } + }).map(|()| Loop::Continue(()))) as Box + Send> + } + Err(e) => { + error!( + "Could not marshal output object to Vec JSON represnetation for request {}: {}", + request_id, e + ); + runtime_client + .fail_init(&RuntimeError::unrecoverable(e.description())); + Box::new(Err("Failed to marshal handler output, panic".to_owned()).into_future()) as Box + Send> + } + } + } + Err(e) => { + debug!("Handler returned an error for {}: {}", request_id, e); + debug!("Attempting to send error response to Runtime API for {}", request_id); + Box::new(runtime_client.event_error(request_id.clone(), &e).then(move |result| match result { + Ok(_) => { + info!("Error response for {} accepted by Runtime API", request_id); + Ok(()) + }, Err(e) => { - error!("Could not send response for {} to Runtime API: {}", request_id, e); + error!("Unable to send error response for {} to Runtime API: {}", request_id, e); if !e.recoverable { error!( - "Error for {} is not recoverable, sending fail_init signal and panicking.", + "Error for {} is not recoverable, sending fail_init signal and panicking", request_id ); - self.runtime_client.fail_init(&e); - panic!("Could not send response"); + runtime_client.fail_init(&e); + panic!("Could not send error response"); } + Ok(()) } - } - } - Err(e) => { - error!( - "Could not marshal output object to Vec JSON represnetation for request {}: {}", - request_id, e - ); - self.runtime_client - .fail_init(&RuntimeError::unrecoverable(e.description())); - panic!("Failed to marshal handler output, panic"); + }).map(|()| Loop::Continue(()))) as Box + Send> } } - } - Err(e) => { - debug!("Handler returned an error for {}: {}", request_id, e); - debug!("Attempting to send error response to Runtime API for {}", request_id); - match self.runtime_client.event_error(&request_id, &e) { - Ok(_) => info!("Error response for {} accepted by Runtime API", request_id), - Err(e) => { - error!("Unable to send error response for {} to Runtime API: {}", request_id, e); - if !e.recoverable { - error!( - "Error for {} is not recoverable, sending fail_init signal and panicking", - request_id - ); - self.runtime_client.fail_init(&e); - panic!("Could not send error response"); - } - } - } - } - } - } + }) + }) + }) } /// Invoke the handler function. This method is split out of the main loop to /// make it testable. - pub(super) fn invoke(&mut self, e: E, ctx: Context) -> Result { - (&mut self.handler).run(e, ctx) + #[cfg(test)] + pub(super) fn invoke(&mut self, e: E, ctx: Context) -> F::IntoFuture { + let mut handler = self.handler.lock().unwrap(); + (&mut handler).run(e, ctx) } /// Attempts to get the next event from the Runtime APIs and keeps retrying /// unless the error throws is not recoverable. /// /// # Return - /// The next `Event` object to be processed. - pub(super) fn get_next_event(&self, retries: i8, e: Option) -> (E, Context) { - if let Some(err) = e { - if retries > self.max_retries { - error!("Unrecoverable error while fetching next event: {}", err); - match err.request_id.clone() { - Some(req_id) => { - self.runtime_client - .event_error(&req_id, &err) - .expect("Could not send event error response"); - } - None => { - self.runtime_client.fail_init(&err); + /// A `Future` resolving to the next `Event` object to be processed. + pub(super) fn get_next_event( + max_retries: i8, + runtime_client: RuntimeClient, + settings: FunctionSettings, + ) -> impl Future { + loop_fn( + (0, None), + move |(iteration, maybe_error): (i8, Option)| { + if let Some(err) = maybe_error { + if iteration > max_retries { + error!("Unrecoverable error while fetching next event: {}", err); + match err.request_id.clone() { + Some(req_id) => { + return Box::new( + runtime_client + .event_error(req_id, &err) + .map_err(|e| format!("Could not send event error response: {}", e)) + // these errors are not recoverable. Either we can't communicate with the runtime APIs + // or we cannot parse the event. panic to restart the environment. + .then(|_| Err("Could not retrieve next event".to_owned())), + ) as Box + Send>; + } + None => { + runtime_client.fail_init(&err); + unreachable!(); + } + } } } - // these errors are not recoverable. Either we can't communicate with the runtie APIs - // or we cannot parse the event. panic to restart the environment. - panic!("Could not retrieve next event"); - } - } - - match self.runtime_client.next_event() { - Ok((ev_data, invocation_ctx)) => { - let parse_result = serde_json::from_slice(&ev_data); - match parse_result { - Ok(ev) => { - let mut handler_ctx = Context::new(self.settings.clone()); - handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn; - handler_ctx.aws_request_id = invocation_ctx.aws_request_id; - handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id; - handler_ctx.client_context = invocation_ctx.client_context; - handler_ctx.identity = invocation_ctx.identity; - handler_ctx.deadline = invocation_ctx.deadline; + let settings = settings.clone(); + Box::new(runtime_client.next_event().then(move |result| match result { + Ok((ev_data, invocation_ctx)) => { + let parse_result = serde_json::from_slice(&ev_data); + match parse_result { + Ok(ev) => { + let mut handler_ctx = Context::new(settings.clone()); + handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn; + handler_ctx.aws_request_id = invocation_ctx.aws_request_id; + handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id; + handler_ctx.client_context = invocation_ctx.client_context; + handler_ctx.identity = invocation_ctx.identity; + handler_ctx.deadline = invocation_ctx.deadline; - (ev, handler_ctx) - } - Err(e) => { - error!("Could not parse event to type: {}", e); - let mut runtime_err = RuntimeError::from(e); - runtime_err.request_id = Option::from(invocation_ctx.aws_request_id); - self.get_next_event(retries + 1, Option::from(runtime_err)) + Ok(Loop::Break((ev, handler_ctx))) + } + Err(e) => { + error!("Could not parse event to type: {}", e); + let mut runtime_err = RuntimeError::from(e); + runtime_err.request_id = Some(invocation_ctx.aws_request_id); + Ok(Loop::Continue((iteration + 1, Some(runtime_err)))) + } + } } - } - } - Err(e) => self.get_next_event(retries + 1, Option::from(RuntimeError::from(e))), - } + Err(e) => Ok(Loop::Continue((iteration + 1, Some(RuntimeError::from(e))))), + })) as Box + Send> + }, + ) } } @@ -331,13 +387,13 @@ pub(crate) mod tests { #[test] fn runtime_invokes_handler() { let config: &dyn env::ConfigProvider = &env::tests::MockConfigProvider { error: false }; + let runtime = TokioRuntime::new().expect("Could not create tokio runtime"); let client = RuntimeClient::new( config .get_runtime_api_endpoint() .expect("Could not get runtime endpoint"), - None, - ) - .expect("Could not initialize client"); + runtime.executor(), + ); let handler = |_e: String, _c: context::Context| -> Result { Ok("hello".to_string()) }; let retries: i8 = 3; let runtime = Runtime::new(