Skip to content

Commit

Permalink
refactor: more explicit error confirmation for http query route error.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed May 29, 2024
1 parent 0b2f70a commit cd7cd07
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 51 deletions.
26 changes: 17 additions & 9 deletions src/query/service/src/servers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use crate::sessions::SessionType;
const DEDUPLICATE_LABEL: &str = "X-DATABEND-DEDUPLICATE-LABEL";
const USER_AGENT: &str = "User-Agent";
const QUERY_ID: &str = "X-DATABEND-QUERY-ID";
const NODE_ID: &str = "X-DATABEND-NODE-ID";

const TRACE_PARENT: &str = "traceparent";

Expand Down Expand Up @@ -253,9 +254,6 @@ impl<E> HTTPSessionEndpoint<E> {

let session = session_manager.register_session(session)?;

let ctx = session.create_query_context().await?;
let node_id = ctx.get_cluster().local_id.clone();

let deduplicate_label = req
.headers()
.get(DEDUPLICATE_LABEL)
Expand All @@ -266,24 +264,34 @@ impl<E> HTTPSessionEndpoint<E> {
.get(USER_AGENT)
.map(|id| id.to_str().unwrap().to_string());

let expected_node_id = req
.headers()
.get(NODE_ID)
.map(|id| id.to_str().unwrap().to_string());

let trace_parent = req
.headers()
.get(TRACE_PARENT)
.map(|id| id.to_str().unwrap().to_string());
let baggage = extract_baggage_from_headers(req.headers());
let opentelemetry_baggage = extract_baggage_from_headers(req.headers());
let client_host = get_client_ip(req);
Ok(HttpQueryContext::new(

let ctx = session.create_query_context().await?;
let node_id = ctx.get_cluster().local_id.clone();

Ok(HttpQueryContext {
session,
query_id,
node_id,
expected_node_id,
deduplicate_label,
user_agent,
trace_parent,
baggage,
req.method().to_string(),
req.uri().to_string(),
opentelemetry_baggage,
http_method: req.method().to_string(),
uri: req.uri().to_string(),
client_host,
))
})
}
}

Expand Down
21 changes: 13 additions & 8 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,14 @@ async fn query_final_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
info!(
"{}: got /v1/query/{}/final request, this query is going to be finally completed.",
query_id, query_id
"{}: got {} request, this query is going to be finally completed.",
query_id,
make_final_uri(&query_id)
);
let http_query_manager = HttpQueryManager::instance();
match http_query_manager
Expand Down Expand Up @@ -262,13 +263,14 @@ async fn query_cancel_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
info!(
"{}: got /v1/query/{}/kill request, cancel the query",
query_id, query_id
"{}: got {} request, cancel the query",
query_id,
make_kill_uri(&query_id)
);
let http_query_manager = HttpQueryManager::instance();
match http_query_manager
Expand All @@ -292,6 +294,7 @@ async fn query_state_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);

async {
Expand All @@ -317,6 +320,7 @@ async fn query_page_handler(
ctx: &HttpQueryContext,
Path((query_id, page_no)): Path<(String, usize)>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

Expand Down Expand Up @@ -352,7 +356,8 @@ pub(crate) async fn query_handler(
let _t = SlowRequestLogTracker::new(ctx);

async {
info!("http query new request: {:}", mask_connection_info(&format!("{:?}", req)));
let agent = ctx.user_agent.as_ref().map(|s|(format!("(from {s})"))).unwrap_or("".to_string());
info!("http query new request{}: {:}", agent, mask_connection_info(&format!("{:?}", req)));
let http_query_manager = HttpQueryManager::instance();
let sql = req.sql.clone();

Expand Down Expand Up @@ -436,7 +441,7 @@ fn query_id_to_trace_id(query_id: &str) -> TraceId {
}

/// The HTTP query endpoints are expected to be responses within 60 seconds.
/// If it exceeds far of 60 seconds, there might be something wrong, we should
/// If it exceeds far from 60 seconds, there might be something wrong, we should
/// log it.
struct SlowRequestLogTracker {
started_at: std::time::Instant,
Expand Down
52 changes: 24 additions & 28 deletions src/query/service/src/servers/http/v1/query/http_query_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use http::StatusCode;
use log::warn;
use poem::FromRequest;
use poem::Request;
use poem::RequestBody;
use poem::Result as PoemResult;
use time::Instant;

use crate::servers::http::v1::HttpQueryManager;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
Expand All @@ -29,6 +32,7 @@ pub struct HttpQueryContext {
pub session: Arc<Session>,
pub query_id: String,
pub node_id: String,
pub expected_node_id: Option<String>,
pub deduplicate_label: Option<String>,
pub user_agent: Option<String>,
pub trace_parent: Option<String>,
Expand All @@ -39,32 +43,6 @@ pub struct HttpQueryContext {
}

impl HttpQueryContext {
pub fn new(
session: Arc<Session>,
query_id: String,
node_id: String,
deduplicate_label: Option<String>,
user_agent: Option<String>,
trace_parent: Option<String>,
open_telemetry_baggage: Option<Vec<(String, String)>>,
http_method: String,
uri: String,
client_host: Option<String>,
) -> Self {
HttpQueryContext {
session,
query_id,
node_id,
deduplicate_label,
user_agent,
trace_parent,
opentelemetry_baggage: open_telemetry_baggage,
http_method,
uri,
client_host,
}
}

pub fn upgrade_session(&self, session_type: SessionType) -> Result<Arc<Session>, poem::Error> {
SessionManager::instance()
.try_upgrade_session(self.session.clone(), session_type.clone())
Expand Down Expand Up @@ -101,11 +79,29 @@ impl HttpQueryContext {
pub fn set_fail(&self) {
self.session.txn_mgr().lock().set_fail();
}

pub fn check_node_id(&self, query_id: &str) -> poem::Result<()> {
if let Some(expected_node_id) = self.expected_node_id.as_ref() {
if expected_node_id != &self.node_id {
let manager = HttpQueryManager::instance();
let start_time = manager.server_info.start_time.clone();
let uptime = (Instant::now() - manager.start_instant).as_seconds_f32();
let msg = format!(
"route error: query {query_id} SHOULD be on server {expected_node_id}, but current server is {}, which started at {start_time}({uptime} secs ago)",
self.node_id
);
warn!("{msg}");
return Err(poem::Error::from_string(msg, StatusCode::NOT_FOUND));
}
}

Ok(())
}
}

impl<'a> FromRequest<'a> for &'a HttpQueryContext {
#[async_backtrace::framed]
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult<Self> {
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> poem::Result<Self> {
Ok(req.extensions().get::<HttpQueryContext>().expect(
"To use the `HttpQueryContext` extractor, the `HTTPSessionMiddleware` is required",
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_storages_common_txn::TxnManagerRef;
use parking_lot::Mutex;
use time::Instant;
use tokio::task;

use super::expiring_map::ExpiringMap;
Expand Down Expand Up @@ -77,6 +78,7 @@ impl<T> LimitedQueue<T> {
}

pub struct HttpQueryManager {
pub(crate) start_instant: Instant,
pub(crate) server_info: ServerInfo,
#[allow(clippy::type_complexity)]
pub(crate) queries: Arc<DashMap<String, Arc<HttpQuery>>>,
Expand All @@ -90,9 +92,10 @@ impl HttpQueryManager {
#[async_backtrace::framed]
pub async fn init(cfg: &InnerConfig) -> Result<()> {
GlobalInstance::set(Arc::new(HttpQueryManager {
start_instant: Instant::now(),
server_info: ServerInfo {
id: cfg.query.node_id.clone(),
start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Nanos, false),
start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Millis, false),
},
queries: Arc::new(DashMap::new()),
sessions: Mutex::new(ExpiringMap::default()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ use databend_query::auth::AuthMgr;
use databend_query::servers::http::middleware::get_client_ip;
use databend_query::servers::http::middleware::HTTPSessionEndpoint;
use databend_query::servers::http::middleware::HTTPSessionMiddleware;
use databend_query::servers::http::v1::make_final_uri;
use databend_query::servers::http::v1::make_page_uri;
use databend_query::servers::http::v1::make_state_uri;
use databend_query::servers::http::v1::query_route;
use databend_query::servers::http::v1::ExecuteStateKind;
use databend_query::servers::http::v1::HttpSessionConf;
Expand Down Expand Up @@ -276,15 +274,15 @@ async fn test_simple_sql() -> Result<()> {
assert!(result.error.is_none(), "{:?}", result.error);

let query_id = &result.id;
let final_uri = make_final_uri(query_id);
let final_uri = result.final_uri.clone().unwrap();

assert_eq!(result.state, ExecuteStateKind::Succeeded, "{:?}", result);
assert_eq!(result.next_uri, Some(final_uri.clone()), "{:?}", result);
assert_eq!(result.data.len(), 10, "{:?}", result);
assert_eq!(result.schema.len(), 19, "{:?}", result);

// get state
let uri = make_state_uri(query_id);
let uri = result.stats_uri.unwrap();
let (status, result) = get_uri_checked(&ep, &uri).await?;
assert_eq!(status, StatusCode::OK, "{:?}", result);
assert!(result.error.is_none(), "{:?}", result);
Expand Down Expand Up @@ -597,7 +595,7 @@ async fn test_pagination() -> Result<()> {
assert!(!result.schema.is_empty(), "{:?}", result);
if page == 5 {
// get state
let uri = make_state_uri(query_id);
let uri = result.stats_uri.clone().unwrap();
let (status, _state_result) = get_uri_checked(&ep, &uri).await?;
assert_eq!(status, StatusCode::OK);

Expand Down
15 changes: 15 additions & 0 deletions tests/suites/1_stateful/09_http_handler/09_0006_route_error.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# error
## page
{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}}
## kill
{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}}
## final
{"error":{"code":"404","message":"route error: query QID SHOULD be on server XXX, but current server is NODE, which started ... ago)"}}

# ok
## page
[["1"]]
## kill
200
## final
null
24 changes: 24 additions & 0 deletions tests/suites/1_stateful/09_http_handler/09_0006_route_error.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh

QID="my_query_for_route_${RANDOM}"
NODE=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}" -H 'Content-Type: application/json' -d '{"sql": "select 1;"}' | jq -r ".session.last_server_info.id")
echo "# error"
echo "## page"
curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/page/0" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'
echo "## kill"
curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/kill" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'
echo "## final"
curl -s -u root: -XGET -H "x-databend-node-id:XXX" -w "\n" "http://localhost:8000/v1/query/${QID}/final" | sed "s/${QID}/QID/g" | sed "s/${NODE}/NODE/g" | sed 's/at.*secs/.../'

echo ""

echo "# ok"
echo "## page"
curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "\n" "http://localhost:8000/v1/query/${QID}/page/0" | jq -c ".data"
echo "## kill"
curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "%{http_code}\n" "http://localhost:8000/v1/query/${QID}/kill"
echo "## final"
curl -s -u root: -XGET -H "x-databend-node-id:${NODE}" -w "\n" "http://localhost:8000/v1/query/${QID}/final" | jq ".next_uri"

0 comments on commit cd7cd07

Please sign in to comment.