-
Notifications
You must be signed in to change notification settings - Fork 359
Handlers can be async #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2f7eb74
a5072a0
4301ffd
5adedef
695333c
e96a013
c65d0e5
2f6e599
3aa5a85
f2ae75f
0f253c6
5d1de6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,14 +70,14 @@ use crate::{request::LambdaRequest, response::LambdaResponse}; | |
pub type Request = http::Request<Body>; | ||
|
||
/// Functions serving as ALB and API Gateway handlers must conform to this type. | ||
pub trait Handler<R> { | ||
pub trait Handler<R>: Send { | ||
/// Run the handler. | ||
fn run(&mut self, event: Request, ctx: Context) -> Result<R, HandlerError>; | ||
} | ||
|
||
impl<F, R> Handler<R> for F | ||
where | ||
F: FnMut(Request, Context) -> Result<R, HandlerError>, | ||
F: FnMut(Request, Context) -> Result<R, HandlerError> + Send, | ||
{ | ||
fn run(&mut self, event: Request, ctx: Context) -> Result<R, HandlerError> { | ||
(*self)(event, ctx) | ||
|
@@ -92,14 +92,14 @@ where | |
/// | ||
/// # Panics | ||
/// The function panics if the Lambda environment variables are not set. | ||
pub fn start<R>(f: impl Handler<R>, runtime: Option<TokioRuntime>) | ||
pub fn start<R>(f: impl Handler<R> + 'static, runtime: Option<TokioRuntime>) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Two things:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll hold off on looking at this until #63 merges, if that's ok |
||
where | ||
R: IntoResponse, | ||
{ | ||
// handler requires a mutable ref | ||
let mut func = f; | ||
lambda::start( | ||
|req: LambdaRequest<'_>, ctx: Context| { | ||
move |req: LambdaRequest<'_>, ctx: Context| { | ||
let is_alb = req.request_context.is_alb(); | ||
func.run(req.into(), ctx) | ||
.map(|resp| LambdaResponse::from_response(is_alb, resp.into_response())) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,7 @@ use hyper::{ | |
}; | ||
use serde_derive::Deserialize; | ||
use serde_json; | ||
use tokio::runtime::Runtime; | ||
use tokio::{prelude::future::IntoFuture, runtime::TaskExecutor}; | ||
|
||
use crate::error::{ApiError, ErrorResponse, RuntimeApiError}; | ||
|
||
|
@@ -120,49 +120,44 @@ pub struct EventContext { | |
} | ||
|
||
/// Used by the Runtime to communicate with the internal endpoint. | ||
#[derive(Clone)] | ||
pub struct RuntimeClient { | ||
_runtime: Runtime, | ||
http_client: Client<HttpConnector, Body>, | ||
endpoint: String, | ||
} | ||
|
||
impl RuntimeClient { | ||
/// Creates a new instance of the Runtime APIclient SDK. The http client has timeouts disabled and | ||
/// will always send a `Connection: keep-alive` header. | ||
pub fn new(endpoint: String, runtime: Option<Runtime>) -> Result<Self, ApiError> { | ||
pub fn new(endpoint: String, task_executor: TaskExecutor) -> Self { | ||
debug!("Starting new HttpRuntimeClient for {}", endpoint); | ||
// start a tokio core main event loop for hyper | ||
let runtime = match runtime { | ||
Some(r) => r, | ||
None => Runtime::new()?, | ||
}; | ||
|
||
let http_client = Client::builder().executor(runtime.executor()).build_http(); | ||
let http_client = Client::builder().executor(task_executor).build_http(); | ||
|
||
Ok(RuntimeClient { | ||
_runtime: runtime, | ||
http_client, | ||
endpoint, | ||
}) | ||
RuntimeClient { http_client, endpoint } | ||
} | ||
} | ||
|
||
impl RuntimeClient { | ||
/// Polls for new events to the Runtime APIs. | ||
pub fn next_event(&self) -> Result<(Vec<u8>, EventContext), ApiError> { | ||
pub fn next_event(&self) -> impl Future<Item = (Vec<u8>, EventContext), Error = ApiError> { | ||
let uri = format!( | ||
"http://{}/{}/runtime/invocation/next", | ||
self.endpoint, RUNTIME_API_VERSION | ||
) | ||
.parse()?; | ||
.parse(); | ||
trace!("Polling for next event"); | ||
|
||
// We wait instead of processing the future asynchronously because AWS Lambda | ||
// itself enforces only one event per container at a time. No point in taking on | ||
// the additional complexity. | ||
let out = self.http_client.get(uri).wait(); | ||
match out { | ||
Ok(resp) => { | ||
let http_client = self.http_client.clone(); | ||
uri.into_future() | ||
.map_err(ApiError::from) | ||
.and_then(move |uri| { | ||
http_client.get(uri).map_err(|e| { | ||
error!("Error when fetching next event from Runtime API: {}", e); | ||
ApiError::from(e) | ||
}) | ||
}) | ||
.and_then(|resp| { | ||
if resp.status().is_client_error() { | ||
error!( | ||
"Runtime API returned client error when polling for new events: {}", | ||
|
@@ -182,22 +177,23 @@ impl RuntimeClient { | |
.unrecoverable() | ||
.clone()); | ||
} | ||
let ctx = self.get_event_context(&resp.headers())?; | ||
let out = resp.into_body().concat2().wait()?; | ||
let buf: Vec<u8> = out.into_bytes().to_vec(); | ||
return Ok((Self::get_event_context(&resp.headers())?, resp)); | ||
}) | ||
.and_then(|(ctx, resp)| { | ||
Ok(ctx) | ||
.into_future() | ||
.join(resp.into_body().concat2().map_err(Into::into)) | ||
}) | ||
.map(|(ctx, body)| { | ||
let buf = body.into_bytes().to_vec(); | ||
|
||
trace!( | ||
"Received new event for request id {}. Event length {} bytes", | ||
ctx.aws_request_id, | ||
buf.len() | ||
); | ||
Ok((buf, ctx)) | ||
} | ||
Err(e) => { | ||
error!("Error when fetching next event from Runtime API: {}", e); | ||
Err(ApiError::from(e)) | ||
} | ||
} | ||
(buf, ctx) | ||
}) | ||
} | ||
|
||
/// Calls the Lambda Runtime APIs to submit a response to an event. In this function we treat | ||
|
@@ -211,41 +207,44 @@ impl RuntimeClient { | |
/// * `output` The object be sent back to the Runtime APIs as a response. | ||
/// | ||
/// # Returns | ||
/// A `Result` object containing a bool return value for the call or an `error::ApiError` instance. | ||
pub fn event_response(&self, request_id: &str, output: Vec<u8>) -> Result<(), ApiError> { | ||
let uri: Uri = format!( | ||
/// A `Future` object containing a either resolving () for success or an `error::ApiError` instance. | ||
pub fn event_response(&self, request_id: String, output: Vec<u8>) -> impl Future<Item = (), Error = ApiError> { | ||
let uri = format!( | ||
"http://{}/{}/runtime/invocation/{}/response", | ||
self.endpoint, RUNTIME_API_VERSION, request_id | ||
) | ||
.parse()?; | ||
.parse(); | ||
trace!( | ||
"Posting response for request {} to Runtime API. Response length {} bytes", | ||
request_id, | ||
output.len() | ||
); | ||
let req = self.get_runtime_post_request(&uri, output); | ||
|
||
match self.http_client.request(req).wait() { | ||
Ok(resp) => { | ||
if !resp.status().is_success() { | ||
error!( | ||
"Error from Runtime API when posting response for request {}: {}", | ||
request_id, | ||
resp.status() | ||
); | ||
return Err(ApiError::new(&format!( | ||
"Error {} while sending response", | ||
resp.status() | ||
))); | ||
let http_client = self.http_client.clone(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now cloning a |
||
uri.into_future() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't know that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think uri here is a Result and all Result types get a free There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah; I could change this to |
||
.map_err(ApiError::from) | ||
.map(move |uri| Self::get_runtime_post_request(&uri, output)) | ||
.and_then(move |req| http_client.request(req).map_err(ApiError::from)) | ||
.then(move |result| match result { | ||
Ok(resp) => { | ||
if !resp.status().is_success() { | ||
error!( | ||
"Error from Runtime API when posting response for request {}: {}", | ||
request_id, | ||
resp.status() | ||
); | ||
return Err(ApiError::new(&format!( | ||
"Error {} while sending response", | ||
resp.status() | ||
))); | ||
} | ||
trace!("Posted response to Runtime API for request {}", request_id); | ||
Ok(()) | ||
} | ||
trace!("Posted response to Runtime API for request {}", request_id); | ||
Ok(()) | ||
} | ||
Err(e) => { | ||
error!("Error when calling runtime API for request {}: {}", request_id, e); | ||
Err(ApiError::from(e)) | ||
} | ||
} | ||
Err(e) => { | ||
error!("Error when calling runtime API for request {}: {}", request_id, e); | ||
Err(ApiError::from(e)) | ||
} | ||
}) | ||
} | ||
|
||
/// Calls Lambda's Runtime APIs to send an error generated by the `Handler`. Because it's rust, | ||
|
@@ -259,41 +258,48 @@ impl RuntimeClient { | |
/// object. | ||
/// | ||
/// # Returns | ||
/// A `Result` object containing a bool return value for the call or an `error::ApiError` instance. | ||
pub fn event_error(&self, request_id: &str, e: &dyn RuntimeApiError) -> Result<(), ApiError> { | ||
let uri: Uri = format!( | ||
/// A `Future` object containing a either resolving () for success or an `error::ApiError` instance. | ||
pub fn event_error(&self, request_id: String, e: &dyn RuntimeApiError) -> impl Future<Item = (), Error = ApiError> { | ||
let uri = format!( | ||
"http://{}/{}/runtime/invocation/{}/error", | ||
self.endpoint, RUNTIME_API_VERSION, request_id | ||
) | ||
.parse()?; | ||
trace!( | ||
"Posting error to runtime API for request {}: {}", | ||
request_id, | ||
e.to_response().error_message | ||
); | ||
let req = self.get_runtime_error_request(&uri, &e.to_response()); | ||
|
||
match self.http_client.request(req).wait() { | ||
Ok(resp) => { | ||
if !resp.status().is_success() { | ||
error!( | ||
"Error from Runtime API when posting error response for request {}: {}", | ||
request_id, | ||
resp.status() | ||
); | ||
return Err(ApiError::new(&format!( | ||
"Error {} while sending response", | ||
resp.status() | ||
))); | ||
.parse(); | ||
let http_client = self.http_client.clone(); | ||
let response = e.to_response(); | ||
let request_id2 = request_id.clone(); | ||
uri.into_future() | ||
.map_err(ApiError::from) | ||
.map(move |uri| (Self::get_runtime_error_request(&uri, &response), response)) | ||
.and_then(move |(req, error_response)| { | ||
trace!( | ||
"Posting error to runtime API for request {}: {}", | ||
request_id, | ||
error_response.error_message | ||
); | ||
http_client.request(req).map_err(ApiError::from) | ||
}) | ||
.then(move |result| match result { | ||
Ok(resp) => { | ||
if !resp.status().is_success() { | ||
error!( | ||
"Error from Runtime API when posting error response for request {}: {}", | ||
request_id2, | ||
resp.status() | ||
); | ||
return Err(ApiError::new(&format!( | ||
"Error {} while sending response", | ||
resp.status() | ||
))); | ||
} | ||
trace!("Posted error response for request id {}", request_id2); | ||
Ok(()) | ||
} | ||
trace!("Posted error response for request id {}", request_id); | ||
Ok(()) | ||
} | ||
Err(e) => { | ||
error!("Error when calling runtime API for request {}: {}", request_id, e); | ||
Err(ApiError::from(e)) | ||
} | ||
} | ||
Err(e) => { | ||
error!("Error when calling runtime API for request {}: {}", request_id2, e); | ||
Err(ApiError::from(e)) | ||
} | ||
}) | ||
} | ||
|
||
/// Calls the Runtime APIs to report a failure during the init process. | ||
|
@@ -312,7 +318,7 @@ impl RuntimeClient { | |
.parse() | ||
.expect("Could not generate Runtime URI"); | ||
error!("Calling fail_init Runtime API: {}", e.to_response().error_message); | ||
let req = self.get_runtime_error_request(&uri, &e.to_response()); | ||
let req = Self::get_runtime_error_request(&uri, &e.to_response()); | ||
|
||
self.http_client | ||
.request(req) | ||
|
@@ -344,7 +350,7 @@ impl RuntimeClient { | |
/// | ||
/// # Returns | ||
/// A Populated Hyper `Request` object. | ||
fn get_runtime_post_request(&self, uri: &Uri, body: Vec<u8>) -> Request<Body> { | ||
fn get_runtime_post_request(uri: &Uri, body: Vec<u8>) -> Request<Body> { | ||
Request::builder() | ||
.method(Method::POST) | ||
.uri(uri.clone()) | ||
|
@@ -353,7 +359,7 @@ impl RuntimeClient { | |
.unwrap() | ||
} | ||
|
||
fn get_runtime_error_request(&self, uri: &Uri, e: &ErrorResponse) -> Request<Body> { | ||
fn get_runtime_error_request(uri: &Uri, e: &ErrorResponse) -> Request<Body> { | ||
let body = serde_json::to_vec(e).expect("Could not turn error object into response JSON"); | ||
Request::builder() | ||
.method(Method::POST) | ||
|
@@ -378,7 +384,7 @@ impl RuntimeClient { | |
/// A `Result` containing the populated `EventContext` or an `ApiError` if the required headers | ||
/// were not present or the client context and cognito identity could not be parsed from the | ||
/// JSON string. | ||
fn get_event_context(&self, headers: &HeaderMap<HeaderValue>) -> Result<EventContext, ApiError> { | ||
fn get_event_context(headers: &HeaderMap<HeaderValue>) -> Result<EventContext, ApiError> { | ||
// let headers = resp.headers(); | ||
|
||
let aws_request_id = match headers.get(LambdaHeaders::RequestId.as_str()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is
Send
not inferred on Handler?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because
Handler
is atrait
, there's no way for the type system to know that an implementation of it doesn't have some non-Send
fields. We can either add the bound here to require that implementors areSend
, or we can add a+ Send
bound in each of the places that requiresHandler
beSend
, but doing the latter would require adding the bound to several core functions, and would lead to more confusing error messages if you had a non-Send
trait.