Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: more explicit error confirmation for http query route error. #15621

Merged
merged 2 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/query/service/src/servers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use crate::sessions::SessionType;
const DEDUPLICATE_LABEL: &str = "X-DATABEND-DEDUPLICATE-LABEL";
const USER_AGENT: &str = "User-Agent";
const QUERY_ID: &str = "X-DATABEND-QUERY-ID";
const NODE_ID: &str = "X-DATABEND-NODE-ID";

const TRACE_PARENT: &str = "traceparent";

Expand Down Expand Up @@ -253,9 +254,6 @@ impl<E> HTTPSessionEndpoint<E> {

let session = session_manager.register_session(session)?;

let ctx = session.create_query_context().await?;
let node_id = ctx.get_cluster().local_id.clone();

let deduplicate_label = req
.headers()
.get(DEDUPLICATE_LABEL)
Expand All @@ -266,24 +264,34 @@ impl<E> HTTPSessionEndpoint<E> {
.get(USER_AGENT)
.map(|id| id.to_str().unwrap().to_string());

let expected_node_id = req
.headers()
.get(NODE_ID)
.map(|id| id.to_str().unwrap().to_string());

let trace_parent = req
.headers()
.get(TRACE_PARENT)
.map(|id| id.to_str().unwrap().to_string());
let baggage = extract_baggage_from_headers(req.headers());
let opentelemetry_baggage = extract_baggage_from_headers(req.headers());
let client_host = get_client_ip(req);
Ok(HttpQueryContext::new(

let ctx = session.create_query_context().await?;
let node_id = ctx.get_cluster().local_id.clone();

Ok(HttpQueryContext {
session,
query_id,
node_id,
expected_node_id,
deduplicate_label,
user_agent,
trace_parent,
baggage,
req.method().to_string(),
req.uri().to_string(),
opentelemetry_baggage,
http_method: req.method().to_string(),
uri: req.uri().to_string(),
client_host,
))
})
}
}

Expand Down
21 changes: 13 additions & 8 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,14 @@ async fn query_final_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
info!(
"{}: got /v1/query/{}/final request, this query is going to be finally completed.",
query_id, query_id
"{}: got {} request, this query is going to be finally completed.",
query_id,
make_final_uri(&query_id)
);
let http_query_manager = HttpQueryManager::instance();
match http_query_manager
Expand Down Expand Up @@ -262,13 +263,14 @@ async fn query_cancel_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
info!(
"{}: got /v1/query/{}/kill request, cancel the query",
query_id, query_id
"{}: got {} request, cancel the query",
query_id,
make_kill_uri(&query_id)
);
let http_query_manager = HttpQueryManager::instance();
match http_query_manager
Expand All @@ -292,6 +294,7 @@ async fn query_state_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);

async {
Expand All @@ -317,6 +320,7 @@ async fn query_page_handler(
ctx: &HttpQueryContext,
Path((query_id, page_no)): Path<(String, usize)>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

Expand Down Expand Up @@ -352,7 +356,8 @@ pub(crate) async fn query_handler(
let _t = SlowRequestLogTracker::new(ctx);

async {
info!("http query new request: {:}", mask_connection_info(&format!("{:?}", req)));
let agent = ctx.user_agent.as_ref().map(|s|(format!("(from {s})"))).unwrap_or("".to_string());
info!("http query new request{}: {:}", agent, mask_connection_info(&format!("{:?}", req)));
let http_query_manager = HttpQueryManager::instance();
let sql = req.sql.clone();

Expand Down Expand Up @@ -436,7 +441,7 @@ fn query_id_to_trace_id(query_id: &str) -> TraceId {
}

/// The HTTP query endpoints are expected to respond within 60 seconds.
/// If it exceeds far of 60 seconds, there might be something wrong, we should
/// If it takes far longer than 60 seconds, there might be something wrong; we should
/// log it.
struct SlowRequestLogTracker {
started_at: std::time::Instant,
Expand Down
52 changes: 24 additions & 28 deletions src/query/service/src/servers/http/v1/query/http_query_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use http::StatusCode;
use log::warn;
use poem::FromRequest;
use poem::Request;
use poem::RequestBody;
use poem::Result as PoemResult;
use time::Instant;

use crate::servers::http::v1::HttpQueryManager;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
Expand All @@ -29,6 +32,7 @@ pub struct HttpQueryContext {
pub session: Arc<Session>,
pub query_id: String,
pub node_id: String,
pub expected_node_id: Option<String>,
pub deduplicate_label: Option<String>,
pub user_agent: Option<String>,
pub trace_parent: Option<String>,
Expand All @@ -39,32 +43,6 @@ pub struct HttpQueryContext {
}

impl HttpQueryContext {
pub fn new(
session: Arc<Session>,
query_id: String,
node_id: String,
deduplicate_label: Option<String>,
user_agent: Option<String>,
trace_parent: Option<String>,
open_telemetry_baggage: Option<Vec<(String, String)>>,
http_method: String,
uri: String,
client_host: Option<String>,
) -> Self {
HttpQueryContext {
session,
query_id,
node_id,
deduplicate_label,
user_agent,
trace_parent,
opentelemetry_baggage: open_telemetry_baggage,
http_method,
uri,
client_host,
}
}

pub fn upgrade_session(&self, session_type: SessionType) -> Result<Arc<Session>, poem::Error> {
SessionManager::instance()
.try_upgrade_session(self.session.clone(), session_type.clone())
Expand Down Expand Up @@ -101,11 +79,29 @@ impl HttpQueryContext {
/// Flag the session's transaction manager state as failed
/// (delegates to `TxnMgr::set_fail` under its lock).
pub fn set_fail(&self) {
self.session.txn_mgr().lock().set_fail();
}

/// Verify that this server is the node the client expected to reach.
///
/// Clients may pin follow-up requests for a query to a node via the
/// `X-DATABEND-NODE-ID` header (carried here as `expected_node_id`).
/// If that header is present but does not match this server's `node_id`,
/// the request was mis-routed — presumably by a proxy/load balancer after
/// a restart, which is why the error includes this server's start time —
/// and we answer 404 with an explicit "route error" message.
///
/// # Errors
/// Returns a `poem::Error` with `StatusCode::NOT_FOUND` on a node-id mismatch.
pub fn check_node_id(&self, query_id: &str) -> poem::Result<()> {
    if let Some(expected_node_id) = self.expected_node_id.as_ref() {
        if expected_node_id != &self.node_id {
            let manager = HttpQueryManager::instance();
            // Borrow instead of clone: the value is only formatted below.
            let start_time = &manager.server_info.start_time;
            let uptime = (Instant::now() - manager.start_instant).as_seconds_f32();
            let msg = format!(
                "route error: query {query_id} SHOULD be on server {expected_node_id}, but current server is {}, which started at {start_time}({uptime} secs ago)",
                self.node_id
            );
            warn!("{msg}");
            return Err(poem::Error::from_string(msg, StatusCode::NOT_FOUND));
        }
    }

    Ok(())
}
}

impl<'a> FromRequest<'a> for &'a HttpQueryContext {
#[async_backtrace::framed]
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult<Self> {
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> poem::Result<Self> {
Ok(req.extensions().get::<HttpQueryContext>().expect(
"To use the `HttpQueryContext` extractor, the `HTTPSessionMiddleware` is required",
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_storages_common_txn::TxnManagerRef;
use parking_lot::Mutex;
use time::Instant;
use tokio::task;

use super::expiring_map::ExpiringMap;
Expand Down Expand Up @@ -77,6 +78,7 @@ impl<T> LimitedQueue<T> {
}

pub struct HttpQueryManager {
pub(crate) start_instant: Instant,
pub(crate) server_info: ServerInfo,
#[allow(clippy::type_complexity)]
pub(crate) queries: Arc<DashMap<String, Arc<HttpQuery>>>,
Expand All @@ -90,9 +92,10 @@ impl HttpQueryManager {
#[async_backtrace::framed]
pub async fn init(cfg: &InnerConfig) -> Result<()> {
GlobalInstance::set(Arc::new(HttpQueryManager {
start_instant: Instant::now(),
server_info: ServerInfo {
id: cfg.query.node_id.clone(),
start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Nanos, false),
start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Millis, false),
},
queries: Arc::new(DashMap::new()),
sessions: Mutex::new(ExpiringMap::default()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ use databend_query::auth::AuthMgr;
use databend_query::servers::http::middleware::get_client_ip;
use databend_query::servers::http::middleware::HTTPSessionEndpoint;
use databend_query::servers::http::middleware::HTTPSessionMiddleware;
use databend_query::servers::http::v1::make_final_uri;
use databend_query::servers::http::v1::make_page_uri;
use databend_query::servers::http::v1::make_state_uri;
use databend_query::servers::http::v1::query_route;
use databend_query::servers::http::v1::ExecuteStateKind;
use databend_query::servers::http::v1::HttpSessionConf;
Expand Down Expand Up @@ -276,15 +274,15 @@ async fn test_simple_sql() -> Result<()> {
assert!(result.error.is_none(), "{:?}", result.error);

let query_id = &result.id;
let final_uri = make_final_uri(query_id);
let final_uri = result.final_uri.clone().unwrap();

assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result);
assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result);
assert_eq!(result.data.len(), 10, "{:?}", result);
assert_eq!(result.schema.len(), 19, "{:?}", result);

// get state
let uri = make_state_uri(query_id);
let uri = result.stats_uri.unwrap();
let (status, result) = get_uri_checked(&ep, &uri).await?;
assert_eq!(status, StatusCode::OK, "{:?}", result);
assert!(result.error.is_none(), "{:?}", result);
Expand Down Expand Up @@ -597,7 +595,7 @@ async fn test_pagination() -> Result<()> {
assert!(!result.schema.is_empty(), "{:?}", result);
if page == 5 {
// get state
let uri = make_state_uri(query_id);
let uri = result.stats_uri.clone().unwrap();
let (status, _state_result) = get_uri_checked(&ep, &uri).await?;
assert_eq!(status, StatusCode::OK);

Expand Down
15 changes: 15 additions & 0 deletions tests/suites/1_stateful/09_http_handler/09_0006_route_error.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# error
## page
{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}}
## kill
{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}}
## final
{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}}
youngsofun marked this conversation as resolved.
Show resolved Hide resolved

# ok
## page
[["1"]]
## kill
200
## final
null
24 changes: 24 additions & 0 deletions tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash
# Regression test for HTTP query route-error reporting: follow-up requests
# carrying an x-databend-node-id header that does NOT match the serving node
# must receive an explicit 404 "route error" response, while a matching
# header must behave normally.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh

# Start a query with a randomized query id and record which node served it
# (returned in session.last_server_info.id).
QID="my_query_for_route_${RANDOM}"
NODE=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}" -H 'Content-Type: application/json' -d '{"sql": "select 1;"}' | jq -r ".session.last_server_info.id")
echo "# error"
echo "## page"
# Wrong node id ("XXX"): expect the 404 route-error JSON. The sed filters
# normalize the query id, node id, and the variable timing suffix so the
# output matches the stable .result file.
curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/page/0" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'
echo "## kill"
curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/kill" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'
echo "## final"
curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/final" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'

echo ""

echo "# ok"
echo "## page"
# Correct node id: the same endpoints should succeed (page data, kill 200,
# final returns null next_uri).
curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "\n" "http://localhost:8000/v1/query/${QID}/page/0" | jq -c ".data"
echo "## kill"
curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "%{http_code}\n" "http://localhost:8000/v1/query/${QID}/kill"
echo "## final"
curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "\n" "http://localhost:8000/v1/query/${QID}/final" | jq ".next_uri"
Loading