Skip to content

Commit edbccba

Browse files
committed
Fix request id span
1 parent 20ed82d commit edbccba

File tree

7 files changed

+40
-75
lines changed

7 files changed

+40
-75
lines changed

distribution/lambda/cdk/cli.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def from_lambda_response(lambda_resp: dict) -> "LambdaResult":
7272
log_tail=base64.b64decode(lambda_resp["LogResult"]).decode(),
7373
payload=payload,
7474
raw_size_bytes=len(payload),
75-
status_code=200,
75+
status_code=0,
7676
)
7777

7878
@staticmethod
@@ -119,8 +119,6 @@ def _format_lambda_output(
119119
if lambda_result.function_error != "":
120120
print("\n## FUNCTION ERROR:")
121121
print(lambda_result.function_error)
122-
print("\n## LOG TAIL:")
123-
print(lambda_result.log_tail)
124122
print("\n## RAW RESPONSE SIZE (BYTES):")
125123
if len(lambda_result.payload) == 0:
126124
ratio = "empty payload"
@@ -213,12 +211,14 @@ def _invoke_searcher(
213211
LogType="Tail",
214212
Payload=json.dumps(
215213
{
216-
"path": f"/api/v1/{index_id}/search",
217214
"resource": f"/api/v1/{index_id}/search",
215+
"path": f"/api/v1/{index_id}/search",
218216
"httpMethod": "POST",
219217
"headers": {
220218
"Content-Type": "application/json",
221-
"Content-Length": f"{len(payload)}",
219+
},
220+
"requestContext": {
221+
"httpMethod": "POST",
222222
},
223223
"body": payload,
224224
"isBase64Encoded": False,
@@ -268,7 +268,6 @@ def get_logs(
268268
last_event_id = event["eventId"]
269269
yield event["message"]
270270
if event["message"].startswith("REPORT"):
271-
print(event["message"])
272271
lower_time_bound = int(event["timestamp"])
273272
last_event_id = "REPORT"
274273
break
@@ -296,6 +295,7 @@ def download_logs_to_file(request_id: str, function_name: str, invoke_start: flo
296295
int(invoke_start * 1000),
297296
):
298297
f.write(log)
298+
print(f"Logs written to lambda.{request_id}.log")
299299
except Exception as e:
300300
print(f"Failed to download logs: {e}")
301301

quickwit/Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-lambda/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ rand = { workspace = true }
3939
reqwest = { workspace = true }
4040
serde = { workspace = true }
4141
serde_json = { workspace = true }
42-
thiserror = { workspace = true }
4342
time = { workspace = true }
4443
tokio = { workspace = true }
4544
tracing = { workspace = true }

quickwit/quickwit-lambda/src/bin/searcher.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

2020
use quickwit_lambda::logger;
21-
use quickwit_lambda::searcher::{searcher_api, warp_lambda};
21+
use quickwit_lambda::searcher::{setup_searcher_api, warp_lambda};
2222

2323
#[tokio::main]
2424
async fn main() -> anyhow::Result<()> {
2525
logger::setup_lambda_tracer(tracing::Level::INFO)?;
26-
let routes = searcher_api().await?;
26+
let routes = setup_searcher_api().await?;
2727
let warp_service = warp::service(routes);
2828
warp_lambda::run(warp_service)
2929
.await

quickwit/quickwit-lambda/src/searcher/api.rs

+26-29
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,15 @@ use quickwit_search::{
3232
use quickwit_serve::lambda_search_api::*;
3333
use quickwit_storage::StorageResolver;
3434
use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent};
35-
use tracing::{error, info, info_span};
35+
use tracing::{error, info};
3636
use warp::filters::path::FullPath;
3737
use warp::reject::Rejection;
3838
use warp::Filter;
3939

40-
use super::LAMBDA_REQUEST_ID_HEADER;
4140
use crate::searcher::environment::CONFIGURATION_TEMPLATE;
4241
use crate::utils::load_node_config;
4342

44-
async fn get_search_service(
43+
async fn create_local_search_service(
4544
searcher_config: SearcherConfig,
4645
metastore: MetastoreServiceClient,
4746
storage_resolver: StorageResolver,
@@ -88,18 +87,31 @@ fn es_compat_api(
8887
.or(es_compat_cat_indices_handler(metastore.clone()))
8988
}
9089

91-
pub async fn searcher_api(
90+
fn v1_searcher_api(
91+
search_service: Arc<dyn SearchService>,
92+
metastore: MetastoreServiceClient,
93+
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
94+
warp::path!("api" / "v1" / ..)
95+
.and(native_api(search_service.clone()).or(es_compat_api(search_service, metastore)))
96+
.with(warp::filters::compression::gzip())
97+
.recover(|rejection| {
98+
error!(?rejection, "request rejected");
99+
recover_fn(rejection)
100+
})
101+
}
102+
103+
pub async fn setup_searcher_api(
92104
) -> anyhow::Result<impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone> {
93105
let (node_config, storage_resolver, metastore) =
94106
load_node_config(CONFIGURATION_TEMPLATE).await?;
95107

96-
let services: HashSet<String> =
97-
HashSet::from_iter([QuickwitService::Searcher.as_str().to_string()]);
98-
let telemetry_info =
99-
QuickwitTelemetryInfo::new(services, HashSet::from_iter([QuickwitFeature::AwsLambda]));
108+
let telemetry_info = QuickwitTelemetryInfo::new(
109+
HashSet::from_iter([QuickwitService::Searcher.as_str().to_string()]),
110+
HashSet::from_iter([QuickwitFeature::AwsLambda]),
111+
);
100112
let _telemetry_handle_opt = quickwit_telemetry::start_telemetry_loop(telemetry_info);
101113

102-
let search_service = get_search_service(
114+
let search_service = create_local_search_service(
103115
node_config.searcher_config,
104116
metastore.clone(),
105117
storage_resolver,
@@ -122,26 +134,11 @@ pub async fn searcher_api(
122134
let after_hook = warp::log::custom(|info| {
123135
info!(status = info.status().as_str(), "request completed");
124136
});
125-
let api = warp::any().and(before_hook).and(
126-
warp::path!("api" / "v1" / ..)
127-
.and(native_api(search_service.clone()).or(es_compat_api(search_service, metastore)))
128-
.with(warp::filters::compression::gzip())
129-
.recover(|rejection| {
130-
error!(?rejection, "request rejected");
131-
recover_fn(rejection)
132-
})
133-
.with(after_hook)
134-
.with(warp::trace(|info| {
135-
info_span!(
136-
"request",
137-
request_id = info
138-
.request_headers()
139-
.get(LAMBDA_REQUEST_ID_HEADER)
140-
.and_then(|v| v.to_str().ok())
141-
.unwrap_or(&""),
142-
)
143-
})),
144-
);
137+
138+
let api = warp::any()
139+
.and(before_hook)
140+
.and(v1_searcher_api(search_service, metastore))
141+
.with(after_hook);
145142

146143
Ok(api)
147144
}

quickwit/quickwit-lambda/src/searcher/mod.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,4 @@ mod api;
2121
mod environment;
2222
pub mod warp_lambda;
2323

24-
pub use api::searcher_api;
25-
26-
const LAMBDA_REQUEST_ID_HEADER: &str = "lambda-request-id";
24+
pub use api::setup_searcher_api;

quickwit/quickwit-lambda/src/searcher/warp_lambda.rs

+5-33
Original file line numberDiff line numberDiff line change
@@ -31,31 +31,19 @@ use anyhow::anyhow;
3131
use http::header::Entry;
3232
use lambda_http::http::response::Parts;
3333
use lambda_http::http::HeaderValue;
34-
use lambda_http::request::RequestContext;
3534
use lambda_http::{
3635
lambda_runtime, Adapter, Body as LambdaBody, Error as LambdaError, Request, RequestExt,
3736
Response, Service,
3837
};
3938
use mime_guess::{mime, Mime};
4039
use once_cell::sync::Lazy;
40+
use tracing::{info_span, Instrument};
4141
use warp::hyper::Body as WarpBody;
4242
pub use {lambda_http, warp};
4343

44-
use super::LAMBDA_REQUEST_ID_HEADER;
45-
4644
pub type WarpRequest = warp::http::Request<warp::hyper::Body>;
4745
pub type WarpResponse = warp::http::Response<warp::hyper::Body>;
4846

49-
#[derive(thiserror::Error, Debug)]
50-
pub enum WarpAdapterError {
51-
#[error("This may never occur, it's infallible!!")]
52-
Infallible(#[from] std::convert::Infallible),
53-
#[error("Warp error: `{0:#?}`")]
54-
HyperError(#[from] warp::hyper::Error),
55-
#[error("Unexpected error: `{0:#?}`")]
56-
Unexpected(#[from] LambdaError),
57-
}
58-
5947
pub async fn run<'a, S>(service: S) -> Result<(), LambdaError>
6048
where
6149
S: Service<WarpRequest, Response = WarpResponse, Error = Infallible> + Send + 'a,
@@ -141,18 +129,7 @@ where
141129

142130
fn call(&mut self, req: Request) -> Self::Future {
143131
let query_params = req.query_string_parameters();
144-
let request_id_opt = match req.request_context() {
145-
RequestContext::ApiGatewayV2(ctx) => ctx.request_id.clone(),
146-
RequestContext::ApiGatewayV1(ctx) => ctx.request_id.clone(),
147-
RequestContext::Alb(_) => None,
148-
RequestContext::WebSocket(ctx) => ctx.request_id.clone(),
149-
};
150-
let request_id_header_opt = match request_id_opt.as_ref().map(|a| HeaderValue::from_str(a))
151-
{
152-
Some(Ok(rid)) => Some(rid),
153-
Some(Err(_)) => None,
154-
None => None,
155-
};
132+
let request_id = req.lambda_context().request_id.clone();
156133
let (mut parts, body) = req.into_parts();
157134
let (content_len, body) = match body {
158135
LambdaBody::Empty => (0, WarpBody::empty()),
@@ -175,9 +152,6 @@ where
175152
if let Entry::Vacant(v) = parts.headers.entry("Content-Length") {
176153
v.insert(content_len.into());
177154
}
178-
if let Some(rid) = &request_id_header_opt {
179-
parts.headers.insert(LAMBDA_REQUEST_ID_HEADER, rid.clone());
180-
}
181155

182156
parts.uri = warp::hyper::Uri::from_str(uri.as_str())
183157
.map_err(|e| e)
@@ -190,14 +164,12 @@ where
190164
// Create lambda future
191165
let fut = async move {
192166
let warp_response = warp_fut.await?;
193-
let (mut parts, res_body): (_, _) = warp_response.into_parts();
167+
let (parts, res_body): (_, _) = warp_response.into_parts();
194168
let body = warp_body_as_lambda_body(res_body, &parts).await?;
195-
if let Some(rid) = request_id_header_opt {
196-
parts.headers.insert(LAMBDA_REQUEST_ID_HEADER, rid);
197-
}
198169
let lambda_response = Response::from_parts(parts, body);
199170
Ok::<Self::Response, Self::Error>(lambda_response)
200-
};
171+
}
172+
.instrument(info_span!("searcher request", request_id));
201173
Box::pin(fut)
202174
}
203175
}

0 commit comments

Comments
 (0)