diff --git a/src/query/service/src/servers/http/middleware.rs b/src/query/service/src/servers/http/middleware.rs index da2b6018a669..5192fd4472b6 100644 --- a/src/query/service/src/servers/http/middleware.rs +++ b/src/query/service/src/servers/http/middleware.rs @@ -61,6 +61,7 @@ use crate::sessions::SessionType; const DEDUPLICATE_LABEL: &str = "X-DATABEND-DEDUPLICATE-LABEL"; const USER_AGENT: &str = "User-Agent"; const QUERY_ID: &str = "X-DATABEND-QUERY-ID"; +const NODE_ID: &str = "X-DATABEND-NODE-ID"; const TRACE_PARENT: &str = "traceparent"; @@ -253,9 +254,6 @@ impl HTTPSessionEndpoint { let session = session_manager.register_session(session)?; - let ctx = session.create_query_context().await?; - let node_id = ctx.get_cluster().local_id.clone(); - let deduplicate_label = req .headers() .get(DEDUPLICATE_LABEL) @@ -266,24 +264,34 @@ impl HTTPSessionEndpoint { .get(USER_AGENT) .map(|id| id.to_str().unwrap().to_string()); + let expected_node_id = req + .headers() + .get(NODE_ID) + .map(|id| id.to_str().unwrap().to_string()); + let trace_parent = req .headers() .get(TRACE_PARENT) .map(|id| id.to_str().unwrap().to_string()); - let baggage = extract_baggage_from_headers(req.headers()); + let opentelemetry_baggage = extract_baggage_from_headers(req.headers()); let client_host = get_client_ip(req); - Ok(HttpQueryContext::new( + + let ctx = session.create_query_context().await?; + let node_id = ctx.get_cluster().local_id.clone(); + + Ok(HttpQueryContext { session, query_id, node_id, + expected_node_id, deduplicate_label, user_agent, trace_parent, - baggage, - req.method().to_string(), - req.uri().to_string(), + opentelemetry_baggage, + http_method: req.method().to_string(), + uri: req.uri().to_string(), client_host, - )) + }) } } diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index e888ab689801..490808aa29e4 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -225,13 +225,14 @@ async fn query_final_handler( ctx: &HttpQueryContext, Path(query_id): Path, ) -> PoemResult { + ctx.check_node_id(&query_id)?; let root = get_http_tracing_span(full_name!(), ctx, &query_id); let _t = SlowRequestLogTracker::new(ctx); - async { info!( - "{}: got /v1/query/{}/final request, this query is going to be finally completed.", - query_id, query_id + "{}: got {} request, this query is going to be finally completed.", + query_id, + make_final_uri(&query_id) ); let http_query_manager = HttpQueryManager::instance(); match http_query_manager @@ -262,13 +263,14 @@ async fn query_cancel_handler( ctx: &HttpQueryContext, Path(query_id): Path, ) -> PoemResult { + ctx.check_node_id(&query_id)?; let root = get_http_tracing_span(full_name!(), ctx, &query_id); let _t = SlowRequestLogTracker::new(ctx); - async { info!( - "{}: got /v1/query/{}/kill request, cancel the query", - query_id, query_id + "{}: got {} request, cancel the query", + query_id, + make_kill_uri(&query_id) ); let http_query_manager = HttpQueryManager::instance(); match http_query_manager @@ -292,6 +294,7 @@ async fn query_state_handler( ctx: &HttpQueryContext, Path(query_id): Path, ) -> PoemResult { + ctx.check_node_id(&query_id)?; let root = get_http_tracing_span(full_name!(), ctx, &query_id); async { @@ -317,6 +320,7 @@ async fn query_page_handler( ctx: &HttpQueryContext, Path((query_id, page_no)): Path<(String, usize)>, ) -> PoemResult { + ctx.check_node_id(&query_id)?; let root = get_http_tracing_span(full_name!(), ctx, &query_id); let _t = SlowRequestLogTracker::new(ctx); @@ -352,7 +356,8 @@ pub(crate) async fn query_handler( let _t = SlowRequestLogTracker::new(ctx); async { - info!("http query new request: {:}", mask_connection_info(&format!("{:?}", req))); + let agent = ctx.user_agent.as_ref().map(|s|(format!("(from {s})"))).unwrap_or("".to_string()); + info!("http query new request{}: {:}", agent, mask_connection_info(&format!("{:?}", req))); let http_query_manager = HttpQueryManager::instance(); let sql = req.sql.clone(); @@ -436,7 +441,7 @@ fn query_id_to_trace_id(query_id: &str) -> TraceId { } /// The HTTP query endpoints are expected to be responses within 60 seconds. -/// If it exceeds far of 60 seconds, there might be something wrong, we should +/// If it exceeds far from 60 seconds, there might be something wrong, we should /// log it. struct SlowRequestLogTracker { started_at: std::time::Instant, diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index f1a12748faab..056466f653b3 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -15,11 +15,14 @@ use std::collections::BTreeMap; use std::sync::Arc; +use http::StatusCode; +use log::warn; use poem::FromRequest; use poem::Request; use poem::RequestBody; -use poem::Result as PoemResult; +use time::Instant; +use crate::servers::http::v1::HttpQueryManager; use crate::sessions::Session; use crate::sessions::SessionManager; use crate::sessions::SessionType; @@ -29,6 +32,7 @@ pub struct HttpQueryContext { pub session: Arc, pub query_id: String, pub node_id: String, + pub expected_node_id: Option, pub deduplicate_label: Option, pub user_agent: Option, pub trace_parent: Option, @@ -39,32 +43,6 @@ pub struct HttpQueryContext { } impl HttpQueryContext { - pub fn new( - session: Arc, - query_id: String, - node_id: String, - deduplicate_label: Option, - user_agent: Option, - trace_parent: Option, - open_telemetry_baggage: Option>, - http_method: String, - uri: String, - client_host: Option, - ) -> Self { - HttpQueryContext { - session, - query_id, - node_id, - deduplicate_label, - user_agent, - trace_parent, - opentelemetry_baggage: open_telemetry_baggage, - http_method, - uri, - client_host, - } - } - pub fn upgrade_session(&self, session_type: SessionType) -> Result, poem::Error> { SessionManager::instance() .try_upgrade_session(self.session.clone(), session_type.clone()) @@ -101,11 +79,29 @@ impl HttpQueryContext { pub fn set_fail(&self) { self.session.txn_mgr().lock().set_fail(); } + + pub fn check_node_id(&self, query_id: &str) -> poem::Result<()> { + if let Some(expected_node_id) = self.expected_node_id.as_ref() { + if expected_node_id != &self.node_id { + let manager = HttpQueryManager::instance(); + let start_time = manager.server_info.start_time.clone(); + let uptime = (Instant::now() - manager.start_instant).as_seconds_f32(); + let msg = format!( + "route error: query {query_id} SHOULD be on server {expected_node_id}, but current server is {}, which started at {start_time}({uptime} secs ago)", + self.node_id + ); + warn!("{msg}"); + return Err(poem::Error::from_string(msg, StatusCode::NOT_FOUND)); + } + } + + Ok(()) + } } impl<'a> FromRequest<'a> for &'a HttpQueryContext { #[async_backtrace::framed] - async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult { + async fn from_request(req: &'a Request, _body: &mut RequestBody) -> poem::Result { Ok(req.extensions().get::().expect( "To use the `HttpQueryContext` extractor, the `HTTPSessionMiddleware` is required", )) diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs index 3afc2cec3113..296f74f67194 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs @@ -30,6 +30,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_storages_common_txn::TxnManagerRef; use parking_lot::Mutex; +use time::Instant; use tokio::task; use super::expiring_map::ExpiringMap; @@ -77,6 +78,7 @@ impl LimitedQueue { } pub struct HttpQueryManager { + pub(crate) start_instant: Instant, pub(crate) server_info: ServerInfo, #[allow(clippy::type_complexity)] pub(crate) queries: Arc>>, @@ -90,9 +92,10 @@ impl HttpQueryManager { #[async_backtrace::framed] pub async fn init(cfg: &InnerConfig) -> Result<()> { GlobalInstance::set(Arc::new(HttpQueryManager { + start_instant: Instant::now(), server_info: ServerInfo { id: cfg.query.node_id.clone(), - start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Nanos, false), + start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Millis, false), }, queries: Arc::new(DashMap::new()), sessions: Mutex::new(ExpiringMap::default()), diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs index f9a54bb2a2d5..7ffd1ba305a4 100644 --- a/src/query/service/tests/it/servers/http/http_query_handlers.rs +++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs @@ -31,9 +31,7 @@ use databend_query::auth::AuthMgr; use databend_query::servers::http::middleware::get_client_ip; use databend_query::servers::http::middleware::HTTPSessionEndpoint; use databend_query::servers::http::middleware::HTTPSessionMiddleware; -use databend_query::servers::http::v1::make_final_uri; use databend_query::servers::http::v1::make_page_uri; -use databend_query::servers::http::v1::make_state_uri; use databend_query::servers::http::v1::query_route; use databend_query::servers::http::v1::ExecuteStateKind; use databend_query::servers::http::v1::HttpSessionConf; @@ -276,7 +274,7 @@ async fn test_simple_sql() -> Result<()> { assert!(result.error.is_none(), "{:?}", result.error); let query_id = &result.id; - let final_uri = make_final_uri(query_id); + let final_uri = result.final_uri.clone().unwrap(); assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result); assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result); @@ -284,7 +282,7 @@ async fn test_simple_sql() -> Result<()> { assert_eq!(result.schema.len(), 19, "{:?}", result); // get state - let uri = make_state_uri(query_id); + let uri = result.stats_uri.unwrap(); let (status, result) = get_uri_checked(&ep, &uri).await?; assert_eq!(status, StatusCode::OK, "{:?}", result); assert!(result.error.is_none(), "{:?}", result); @@ -597,7 +595,7 @@ async fn test_pagination() -> Result<()> { assert!(!result.schema.is_empty(), "{:?}", result); if page == 5 { // get state - let uri = make_state_uri(query_id); + let uri = result.stats_uri.clone().unwrap(); let (status, _state_result) = get_uri_checked(&ep, &uri).await?; assert_eq!(status, StatusCode::OK); diff --git a/tests/suites/1_stateful/09_http_handler/09_0006_route_error.result b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.result new file mode 100644 index 000000000000..b6d7c05744ff --- /dev/null +++ b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.result @@ -0,0 +1,15 @@ +# error +## page +{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}} +## kill +{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}} +## final +{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}} + +# ok +## page +[["1"]] +## kill +200 +## final +null diff --git a/tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh new file mode 100755 index 000000000000..f55ffd5832ad --- /dev/null +++ b/tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +QID="my_query_for_route_${RANDOM}" +NODE=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}" -H 'Content-Type: application/json' -d '{"sql": "select 1;"}' | jq -r ".session.last_server_info.id") +echo "# error" +echo "## page" +curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/page/0" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../' +echo "## kill" +curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/kill" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../' +echo "## final" +curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/final" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../' + +echo "" + +echo "# ok" +echo "## page" +curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "\n" "http://localhost:8000/v1/query/${QID}/page/0" | jq -c ".data" +echo "## kill" +curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "%{http_code}\n" "http://localhost:8000/v1/query/${QID}/kill" +echo "## final" +curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "\n" "http://localhost:8000/v1/query/${QID}/final" | jq ".next_uri"