|
| 1 | +use lambda_extension::Extension; |
| 2 | +use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; |
| 3 | +use serde::{Deserialize, Serialize}; |
| 4 | +use tokio::signal::unix::{signal, SignalKind}; |
| 5 | + |
| 6 | +/// This is also a made-up example. Requests come into the runtime as unicode |
| 7 | +/// strings in json format, which can map to any structure that implements `serde::Deserialize` |
| 8 | +/// The runtime pays no attention to the contents of the request payload. |
| 9 | +#[derive(Deserialize)] |
| 10 | +struct Request { |
| 11 | + command: String, |
| 12 | +} |
| 13 | + |
| 14 | +/// This is a made-up example of what a response structure may look like. |
| 15 | +/// There is no restriction on what it can be. The runtime requires responses |
| 16 | +/// to be serialized into json. The runtime pays no attention |
| 17 | +/// to the contents of the response payload. |
| 18 | +#[derive(Serialize)] |
| 19 | +struct Response { |
| 20 | + req_id: String, |
| 21 | + msg: String, |
| 22 | +} |
| 23 | + |
| 24 | +pub(crate) async fn handler(event: LambdaEvent<Request>) -> Result<Response, Error> { |
| 25 | + // extract some useful info from the request |
| 26 | + let command = event.payload.command; |
| 27 | + |
| 28 | + tracing::info!("executing {command:#}"); |
| 29 | + |
| 30 | + // prepare the response |
| 31 | + let resp = Response { |
| 32 | + req_id: event.context.request_id, |
| 33 | + msg: format!("Command {} executed.", command), |
| 34 | + }; |
| 35 | + |
| 36 | + // return `Response` (it will be serialized to JSON automatically by the runtime) |
| 37 | + Ok(resp) |
| 38 | +} |
| 39 | + |
| 40 | +#[tokio::main] |
| 41 | +async fn main() -> Result<(), Error> { |
| 42 | + // Let's pretend this Lambda function is: |
| 43 | + // - sensitive to the tokio runtime being blocked due to heavy async concurrency in its handler |
| 44 | + // - stays fairly warm |
| 45 | + // - is somewhat long-running |
| 46 | + // In that case, we might prefer to avoid brief runtime blocks from flushing to stdout in the local context. |
| 47 | + // ref: https://github.com/tokio-rs/tracing/issues/2653 |
| 48 | + // |
| 49 | + // So, we can use a dedicated logging thread with tracing_appender. The downside is that we need |
| 50 | + // to flush any outstanding entries before shutdown - hence the need for graceful shutdown. |
| 51 | + let (writer, log_guard) = tracing_appender::non_blocking(std::io::stdout()); |
| 52 | + tracing::init_default_subscriber_with_writer(writer); |
| 53 | + |
| 54 | + // You need an extension registered with the Lambda orchestrator in order for your process |
| 55 | + // to receive a SIGTERM for graceful shutdown. |
| 56 | + // |
| 57 | + // We accomplish this here by registering a no-op internal extension, which does not subscribe to any events. |
| 58 | + // |
| 59 | + // You could also run a useful internal extension, such as in: |
| 60 | + // https://github.com/awslabs/aws-lambda-rust-runtime/blob/main/examples/extension-internal-flush |
| 61 | + // |
| 62 | + // If an external extension (for example CloudWatch Lambda Insights) is running, it would register itself from outside |
| 63 | + // your Rust process and also get you SIGTERM. In that case, you don't need an internal extension at all. |
| 64 | + let extension = Extension::new() |
| 65 | + // Don't subscribe to any event types |
| 66 | + .with_events(&[]) |
| 67 | + // Internal extension names MUST be unique within a given Lambda function. |
| 68 | + .with_extension_name("no-op") |
| 69 | + // Extensions MUST be registered before calling lambda_runtime::run(), which ends the Init |
| 70 | + // phase and begins the Invoke phase. |
| 71 | + .register() |
| 72 | + .await |
| 73 | + .expect("could not register extension"); |
| 74 | + |
| 75 | + // Handle SIGTERM signal. |
| 76 | + // |
| 77 | + // In this graceful shutdown, we flush our background logging thread by dropping its guard. |
| 78 | + // |
| 79 | + // More on signal handling: |
| 80 | + // https://tokio.rs/tokio/topics/shutdown |
| 81 | + // https://rust-cli.github.io/book/in-depth/signals.html |
| 82 | + tokio::spawn(async move { |
| 83 | + let mut sigint = signal(SignalKind::interrupt()).unwrap(); |
| 84 | + let mut sigterm = signal(SignalKind::terminate()).unwrap(); |
| 85 | + tokio::select! { |
| 86 | + _sigint = sigint.recv() => { |
| 87 | + tracing::info!("[runtime] SIGINT received"); |
| 88 | + tracing::info!("[runtime] Graceful shutdown in progress ..."); |
| 89 | + std::mem::drop(log_guard); |
| 90 | + |
| 91 | + eprintln!("[runtime] Graceful shutdown completed"); |
| 92 | + std::process::exit(0); |
| 93 | + }, |
| 94 | + _sigterm = sigterm.recv()=> { |
| 95 | + tracing::info!("[runtime] SIGTERM received"); |
| 96 | + tracing::info!("[runtime] Graceful shutdown in progress ..."); |
| 97 | + std::mem::drop(log_guard); |
| 98 | + |
| 99 | + eprintln!("[runtime] Graceful shutdown completed"); |
| 100 | + std::process::exit(0); |
| 101 | + }, |
| 102 | + } |
| 103 | + }); |
| 104 | + |
| 105 | + // TODO: add biased! to always poll the handler future first, once supported: |
| 106 | + // https://github.com/tokio-rs/tokio/issues/7304 |
| 107 | + tokio::try_join!(lambda_runtime::run(service_fn(handler)), extension.run(),)?; |
| 108 | + |
| 109 | + Ok(()) |
| 110 | +} |
0 commit comments