From 26d0918dd70cf3147b3bd8f8f4789733eba22701 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 1 Feb 2023 15:03:47 +0800 Subject: [PATCH 01/10] chore: some tiny typo/style fix Signed-off-by: Ruihang Xia --- src/servers/src/grpc.rs | 2 +- src/servers/src/http.rs | 4 ++-- src/servers/src/query_handler.rs | 20 ++++++++++---------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 4eee7617ca3f..afb111629fe9 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -81,7 +81,7 @@ impl Server for GrpcServer { .await .context(TcpBindSnafu { addr })?; let addr = listener.local_addr().context(TcpBindSnafu { addr })?; - info!("GRPC server is bound to {}", addr); + info!("gRPC server is bound to {}", addr); *shutdown_tx = Some(tx); diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 6ac52c55e9a1..3df7cbc2c235 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -406,8 +406,8 @@ impl HttpServer { pub fn make_app(&self) -> Router { let mut api = OpenApi { info: Info { - title: "Greptime DB HTTP API".to_string(), - description: Some("HTTP APIs to interact with Greptime DB".to_string()), + title: "GreptimeDB HTTP API".to_string(), + description: Some("HTTP APIs to interact with GreptimeDB".to_string()), version: HTTP_API_VERSION.to_string(), ..Info::default() }, diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index c678d7885ca7..eaf76884ae06 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -12,6 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! All query handler traits for various request protocols, like SQL or GRPC. +//! Instance that wishes to support certain request protocol, just implement the corresponding +//! trait, the Server will handle codec for you. +//! +//! Note: +//! Query handlers are not confined to only handle read requests, they are expecting to handle +//! write requests too. So the "query" here not might seem ambiguity. However, "query" has been +//! used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the +//! word "query". + pub mod grpc; pub mod sql; @@ -26,16 +36,6 @@ use crate::influxdb::InfluxdbRequest; use crate::opentsdb::codec::DataPoint; use crate::prometheus::Metrics; -/// All query handler traits for various request protocols, like SQL or GRPC. -/// Instance that wishes to support certain request protocol, just implement the corresponding -/// trait, the Server will handle codec for you. -/// -/// Note: -/// Query handlers are not confined to only handle read requests, they are expecting to handle -/// write requests too. So the "query" here not might seem ambiguity. However, "query" has been -/// used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the -/// word "query". - pub type OpentsdbProtocolHandlerRef = Arc; pub type InfluxdbLineProtocolHandlerRef = Arc; pub type PrometheusProtocolHandlerRef = Arc; From 792e0e7eebee3dd21180993d36cb76bd1e3df1c6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 1 Feb 2023 15:03:58 +0800 Subject: [PATCH 02/10] feat: add promql server Signed-off-by: Ruihang Xia --- src/cmd/src/standalone.rs | 4 + src/datanode/src/instance/sql.rs | 16 ++- src/datanode/src/metric.rs | 1 + src/frontend/src/frontend.rs | 3 + src/frontend/src/instance.rs | 20 +++ src/frontend/src/lib.rs | 1 + src/frontend/src/promql.rs | 42 ++++++ src/frontend/src/server.rs | 19 ++- src/servers/src/lib.rs | 4 +- src/servers/src/promql.rs | 220 +++++++++++++++++++++++++++++++ 10 files changed, 325 insertions(+), 5 deletions(-) create mode 100644 src/frontend/src/promql.rs create mode 100644 src/servers/src/promql.rs diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 26d30cbbf222..cb4efc072d1d 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -26,6 +26,7 @@ use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; use frontend::prometheus::PrometheusOptions; +use frontend::promql::PromqlOptions; use frontend::Plugins; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; @@ -72,6 +73,7 @@ pub struct StandaloneOptions { pub opentsdb_options: Option, pub influxdb_options: Option, pub prometheus_options: Option, + pub promql_options: Option, pub mode: Mode, pub wal: WalConfig, pub storage: ObjectStoreConfig, @@ -88,6 +90,7 @@ impl Default for StandaloneOptions { opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), + promql_options: Some(PromqlOptions::default()), mode: Mode::Standalone, wal: WalConfig::default(), storage: ObjectStoreConfig::default(), @@ -106,6 +109,7 @@ impl StandaloneOptions { opentsdb_options: self.opentsdb_options, influxdb_options: self.influxdb_options, prometheus_options: self.prometheus_options, + promql_options: self.promql_options, mode: self.mode, meta_client_opts: None, } diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index c9abf05ca3f5..8818c7c50574 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -14,13 +14,16 @@ use async_trait::async_trait; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::prelude::BoxedError; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging::info; use common_telemetry::timer; use query::parser::{QueryLanguageParser, QueryStatement}; +use servers::error as server_error; +use servers::promql::PromqlHandler; use servers::query_handler::sql::SqlQueryHandler; -use session::context::QueryContextRef; +use session::context::{QueryContext, QueryContextRef}; use snafu::prelude::*; use sql::ast::ObjectName; use sql::statements::statement::Statement; @@ -225,6 +228,17 @@ impl SqlQueryHandler for Instance { } } +#[async_trait] +impl PromqlHandler for Instance { + async fn do_query(&self, query: &str) -> server_error::Result { + let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED); + self.execute_promql(query, QueryContext::arc()) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { query }) + } +} + #[cfg(test)] mod test { use std::sync::Arc; diff --git a/src/datanode/src/metric.rs b/src/datanode/src/metric.rs index 22cb05b2c5ff..88f50b5ed4e6 100644 --- a/src/datanode/src/metric.rs +++ b/src/datanode/src/metric.rs @@ -17,3 +17,4 @@ pub const METRIC_HANDLE_SQL_ELAPSED: &str = "datanode.handle_sql_elapsed"; pub const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "datanode.handle_scripts_elapsed"; pub const METRIC_RUN_SCRIPT_ELAPSED: &str = "datanode.run_script_elapsed"; +pub const METRIC_HANDLE_PROMQL_ELAPSED: &str = "datanode.handle_promql_elapsed"; diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index ad92b65dff78..899c5929bc1a 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -28,6 +28,7 @@ use crate::mysql::MysqlOptions; use crate::opentsdb::OpentsdbOptions; use crate::postgres::PostgresOptions; use crate::prometheus::PrometheusOptions; +use crate::promql::PromqlOptions; use crate::server::Services; use crate::Plugins; @@ -41,6 +42,7 @@ pub struct FrontendOptions { pub opentsdb_options: Option, pub influxdb_options: Option, pub prometheus_options: Option, + pub promql_options: Option, pub mode: Mode, pub meta_client_opts: Option, } @@ -55,6 +57,7 @@ impl Default for FrontendOptions { opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), + promql_options: Some(PromqlOptions::default()), mode: Mode::Standalone, meta_client_opts: None, } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 812573e23edc..9ff659f998b5 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -42,6 +42,7 @@ use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; use servers::error as server_error; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; +use servers::promql::{PromqlHandler, PromqlHandlerRef}; use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef}; use servers::query_handler::{ @@ -71,6 +72,7 @@ pub trait FrontendInstance: + InfluxdbLineProtocolHandler + PrometheusProtocolHandler + ScriptHandler + + PromqlHandler + Send + Sync + 'static @@ -88,6 +90,7 @@ pub struct Instance { script_handler: Option, sql_handler: SqlQueryHandlerRef, grpc_query_handler: GrpcQueryHandlerRef, + promql_handler: Option, create_expr_factory: CreateExprFactoryRef, @@ -121,6 +124,7 @@ impl Instance { create_expr_factory: Arc::new(DefaultCreateExprFactory), sql_handler: dist_instance.clone(), grpc_query_handler: dist_instance, + promql_handler: None, plugins: Default::default(), }) } @@ -162,6 +166,7 @@ impl Instance { create_expr_factory: Arc::new(DefaultCreateExprFactory), sql_handler: StandaloneSqlQueryHandler::arc(dn_instance.clone()), grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()), + promql_handler: Some(dn_instance.clone()), plugins: Default::default(), } } @@ -174,6 +179,7 @@ impl Instance { create_expr_factory: Arc::new(DefaultCreateExprFactory), sql_handler: dist_instance.clone(), grpc_query_handler: dist_instance, + promql_handler: None, plugins: Default::default(), } } @@ -506,6 +512,20 @@ impl ScriptHandler for Instance { } } +#[async_trait] +impl PromqlHandler for Instance { + async fn do_query(&self, query: &str) -> server_error::Result { + if let Some(promql_handler) = &self.promql_handler { + promql_handler.do_query(query).await + } else { + server_error::NotSupportedSnafu { + feat: "PromQL query in Frontend", + } + .fail() + } + } +} + #[cfg(test)] mod tests { use std::borrow::Cow; diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 14dd7328b32e..ff8d03e3005d 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -29,6 +29,7 @@ pub mod opentsdb; pub mod partitioning; pub mod postgres; pub mod prometheus; +pub mod promql; mod server; pub mod spliter; mod sql; diff --git a/src/frontend/src/promql.rs b/src/frontend/src/promql.rs new file mode 100644 index 000000000000..5f2024cd30c2 --- /dev/null +++ b/src/frontend/src/promql.rs @@ -0,0 +1,42 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PromqlOptions { + pub enable: bool, + pub addr: String, +} + +impl Default for PromqlOptions { + fn default() -> Self { + Self { + enable: true, + addr: "127.0.0.1:4004".to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use super::PromqlOptions; + + #[test] + fn test_prometheus_options() { + let default = PromqlOptions::default(); + assert!(default.enable); + assert_eq!(default.addr, "127.0.0.1:4004".to_string()); + } +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index e438d0657b9c..daff47ed26fb 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -24,6 +24,7 @@ use servers::http::HttpServer; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; +use servers::promql::PromqlServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; @@ -154,7 +155,7 @@ impl Services { ServerSqlQueryHandlerAdaptor::arc(instance.clone()), http_options.clone(), ); - if let Some(user_provider) = user_provider { + if let Some(user_provider) = user_provider.clone() { http_server.set_user_provider(user_provider); } @@ -181,12 +182,26 @@ impl Services { None }; + let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options { + let promql_addr = parse_addr(&promql_options.addr)?; + + let mut promql_server = PromqlServer::create_server(instance.clone()); + if let Some(user_provider) = user_provider { + promql_server.set_user_provider(user_provider); + } + + Some((promql_server as _, promql_addr)) + } else { + None + }; + try_join!( start_server(http_server_and_addr), start_server(grpc_server_and_addr), start_server(mysql_server_and_addr), start_server(postgres_server_and_addr), - start_server(opentsdb_server_and_addr) + start_server(opentsdb_server_and_addr), + start_server(promql_server_and_addr), ) .context(error::StartServerSnafu)?; Ok(()) diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 6233ef1e5c7a..55fb29a3112b 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -28,11 +28,11 @@ pub mod mysql; pub mod opentsdb; pub mod postgres; pub mod prometheus; +pub mod promql; pub mod query_handler; pub mod server; -pub mod tls; - mod shutdown; +pub mod tls; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] #[serde(rename_all = "lowercase")] diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs new file mode 100644 index 000000000000..e8e24448893d --- /dev/null +++ b/src/servers/src/promql.rs @@ -0,0 +1,220 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; + +use aide::openapi::{Info, OpenApi, Server as OpenAPIServer}; +use async_trait::async_trait; +use axum::body::BoxBody; +use axum::error_handling::HandleErrorLayer; +use axum::extract::{Query, State}; +use axum::{routing, BoxError, Json, Router}; +use common_query::Output; +use common_runtime::Runtime; +use common_telemetry::info; +use futures::FutureExt; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use tokio::sync::oneshot::Sender; +use tokio::sync::{oneshot, Mutex}; +use tower::timeout::TimeoutLayer; +use tower::ServiceBuilder; +use tower_http::auth::AsyncRequireAuthorizationLayer; +use tower_http::trace::TraceLayer; + +use crate::auth::UserProviderRef; +use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; +use crate::http::authorize::HttpAuth; +use crate::http::JsonResponse; +use crate::server::{BaseTcpServer, Server}; + +pub const PROMQL_API_VERSION: &str = "v1"; + +pub type PromqlHandlerRef = Arc; + +#[async_trait] +pub trait PromqlHandler { + async fn do_query(&self, query: &str) -> Result; +} + +pub struct PromqlServer { + query_handler: PromqlHandlerRef, + shutdown_tx: Mutex>>, + user_provider: Option, +} + +impl PromqlServer { + pub fn create_server(query_handler: PromqlHandlerRef) -> Box { + Box::new(PromqlServer { + query_handler, + shutdown_tx: Mutex::new(None), + user_provider: None, + }) + } + + pub fn set_user_provider(&mut self, user_provider: UserProviderRef) { + debug_assert!(self.user_provider.is_none()); + self.user_provider = Some(user_provider); + } + + fn make_app(&self) -> Router { + let api = OpenApi { + info: Info { + title: "GreptimeDB PromQL API".to_string(), + description: Some("PromQL HTTP Api in GreptimeDB".to_string()), + version: PROMQL_API_VERSION.to_string(), + ..Info::default() + }, + servers: vec![OpenAPIServer { + url: format!("/{PROMQL_API_VERSION}"), + ..OpenAPIServer::default() + }], + ..OpenApi::default() + }; + + // TODO(ruihang): implement format_query, series, labels, values, query_examplars and targets methods + let router = Router::new() + .route( + &format!("/{PROMQL_API_VERSION}/query"), + routing::post(instant_query).get(instant_query), + ) + .route( + &format!("/{PROMQL_API_VERSION}/range_query"), + routing::post(range_query).get(range_query), + ) + .with_state(self.query_handler.clone()); + + router + // middlewares + .layer( + ServiceBuilder::new() + // .layer(HandleErrorLayer::new(handle_error)) + .layer(TraceLayer::new_for_http()) + // .layer(TimeoutLayer::new(self.options.timeout)) + // custom layer + .layer(AsyncRequireAuthorizationLayer::new( + HttpAuth::::new(self.user_provider.clone()), + )), + ) + } +} + +#[async_trait] +impl Server for PromqlServer { + async fn shutdown(&self) -> Result<()> { + let mut shutdown_tx = self.shutdown_tx.lock().await; + if let Some(tx) = shutdown_tx.take() { + if tx.send(()).is_err() { + info!("Receiver dropped, the PromQl server has already existed"); + } + } + info!("Shutdown PromQL server"); + + Ok(()) + } + + async fn start(&self, listening: SocketAddr) -> Result { + let (tx, rx) = oneshot::channel(); + let server = { + let mut shutdown_tx = self.shutdown_tx.lock().await; + ensure!( + shutdown_tx.is_none(), + AlreadyStartedSnafu { server: "PromQL" } + ); + + let app = self.make_app(); + let server = axum::Server::bind(&listening).serve(app.into_make_service()); + + *shutdown_tx = Some(tx); + + server + }; + let listening = server.local_addr(); + info!("PromQL server is bound to {}", listening); + + let graceful = server.with_graceful_shutdown(rx.map(drop)); + graceful.await.context(StartHttpSnafu)?; + + Ok(listening) + } +} + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct PromqlJsonResponse { + status: String, + data: HashMap<(), ()>, + error: Option, + error_type: Option, + warnings: Option>, +} + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct InstantQuery { + query: String, + time: Option, + timeout: Option, +} + +#[axum_macros::debug_handler] +pub async fn instant_query( + State(handler): State, + Query(params): Query, +) -> Json { + Json(PromqlJsonResponse { + status: "error".to_string(), + data: HashMap::new(), + error: Some("not implemented".to_string()), + error_type: Some("not implemented".to_string()), + warnings: None, + }) +} + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct RangeQuery { + query: String, + start: String, + end: String, + step: String, + timeout: Option, +} + +#[axum_macros::debug_handler] +pub async fn range_query( + State(handler): State, + Query(params): Query, +) -> Json { + let result = handler.do_query(¶ms.query).await; + + // let request = decode_remote_read_request(body).await?; + + // handler + // .read(params.db.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), request) + // .await + + todo!() +} + +/// handle error middleware +async fn handle_error(err: BoxError) -> Json { + Json(PromqlJsonResponse { + status: "error".to_string(), + data: HashMap::new(), + error: Some(format!("Unhandled internal error: {err}")), + error_type: Some("internal".to_string()), + warnings: None, + }) +} From fcfcd55f7b7a88996f0c7b62484ae42458595fa5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 1 Feb 2023 17:07:23 +0800 Subject: [PATCH 03/10] works for mocked query Signed-off-by: Ruihang Xia --- src/servers/src/promql.rs | 117 ++++++++++++++++++++++++++++++-------- 1 file changed, 92 insertions(+), 25 deletions(-) diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index e8e24448893d..1f4e17479ac6 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -22,13 +22,14 @@ use axum::body::BoxBody; use axum::error_handling::HandleErrorLayer; use axum::extract::{Query, State}; use axum::{routing, BoxError, Json, Router}; +use common_error::prelude::ErrorExt; use common_query::Output; -use common_runtime::Runtime; +use common_recordbatch::RecordBatches; use common_telemetry::info; use futures::FutureExt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Mutex}; use tower::timeout::TimeoutLayer; @@ -37,10 +38,9 @@ use tower_http::auth::AsyncRequireAuthorizationLayer; use tower_http::trace::TraceLayer; use crate::auth::UserProviderRef; -use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; +use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartHttpSnafu}; use crate::http::authorize::HttpAuth; -use crate::http::JsonResponse; -use crate::server::{BaseTcpServer, Server}; +use crate::server::Server; pub const PROMQL_API_VERSION: &str = "v1"; @@ -153,15 +153,92 @@ impl Server for PromqlServer { } } +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct PromqlSeries { + metric: HashMap, + values: Vec<(i64, String)>, +} + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct PromqlData { + result_type: String, + data: Vec, +} + #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct PromqlJsonResponse { status: String, - data: HashMap<(), ()>, + data: PromqlData, error: Option, error_type: Option, warnings: Option>, } +impl PromqlJsonResponse { + pub fn error(error_type: S1, reason: S2) -> Json + where + S1: Into, + S2: Into, + { + Json(PromqlJsonResponse { + status: "error".to_string(), + data: PromqlData::default(), + error: Some(reason.into()), + error_type: Some(error_type.into()), + warnings: None, + }) + } + + pub fn success(data: PromqlData) -> Json { + Json(PromqlJsonResponse { + status: "success".to_string(), + data, + error: None, + error_type: None, + warnings: None, + }) + } + + /// Convert from `Result` + pub async fn from_query_result(result: Result) -> Json { + match result { + Ok(Output::RecordBatches(batches)) => { + Self::success(Self::record_batches_to_data(batches).unwrap()) + } + Ok(Output::Stream(stream)) => { + let record_batches = RecordBatches::try_collect(stream).await.unwrap(); + Self::success(Self::record_batches_to_data(record_batches).unwrap()) + } + Ok(Output::AffectedRows(_)) => Self::error( + "unexpected result", + "expected data result, but got affected rows", + ), + Err(err) => Self::error(err.status_code().to_string(), err.to_string()), + } + } + + fn record_batches_to_data(batches: RecordBatches) -> Result { + info!("schema: {:?}", batches.schema()); + for batch in batches.iter() { + for row in batch.rows() { + info!("row: {row:?}",); + } + } + + let data = PromqlData { + result_type: "matrix".to_string(), + data: vec![PromqlSeries { + metric: vec![("__name__".to_string(), "foo".to_string())] + .into_iter() + .collect(), + values: vec![(1, "123.45".to_string())], + }], + }; + + Ok(data) + } +} + #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct InstantQuery { query: String, @@ -171,16 +248,13 @@ pub struct InstantQuery { #[axum_macros::debug_handler] pub async fn instant_query( - State(handler): State, - Query(params): Query, + State(_handler): State, + Query(_params): Query, ) -> Json { - Json(PromqlJsonResponse { - status: "error".to_string(), - data: HashMap::new(), - error: Some("not implemented".to_string()), - error_type: Some("not implemented".to_string()), - warnings: None, - }) + PromqlJsonResponse::error( + "not implemented", + "instant query api `/query` is not implemented. Use `/range_query` instead.", + ) } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] @@ -196,23 +270,16 @@ pub struct RangeQuery { pub async fn range_query( State(handler): State, Query(params): Query, -) -> Json { +) -> Json { let result = handler.do_query(¶ms.query).await; - - // let request = decode_remote_read_request(body).await?; - - // handler - // .read(params.db.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), request) - // .await - - todo!() + PromqlJsonResponse::from_query_result(result).await } /// handle error middleware async fn handle_error(err: BoxError) -> Json { Json(PromqlJsonResponse { status: "error".to_string(), - data: HashMap::new(), + data: PromqlData::default(), error: Some(format!("Unhandled internal error: {err}")), error_type: Some("internal".to_string()), warnings: None, From c23a30a10416ee212594dcd165a294359b2aa396 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 1 Feb 2023 17:15:01 +0800 Subject: [PATCH 04/10] clean up Signed-off-by: Ruihang Xia --- src/servers/src/promql.rs | 41 ++++++--------------------------------- 1 file changed, 6 insertions(+), 35 deletions(-) diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index 1f4e17479ac6..6c2e68de0661 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -16,12 +16,10 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use aide::openapi::{Info, OpenApi, Server as OpenAPIServer}; use async_trait::async_trait; use axum::body::BoxBody; -use axum::error_handling::HandleErrorLayer; use axum::extract::{Query, State}; -use axum::{routing, BoxError, Json, Router}; +use axum::{routing, Json, Router}; use common_error::prelude::ErrorExt; use common_query::Output; use common_recordbatch::RecordBatches; @@ -29,16 +27,15 @@ use common_telemetry::info; use futures::FutureExt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Mutex}; -use tower::timeout::TimeoutLayer; use tower::ServiceBuilder; use tower_http::auth::AsyncRequireAuthorizationLayer; use tower_http::trace::TraceLayer; use crate::auth::UserProviderRef; -use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartHttpSnafu}; +use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; use crate::http::authorize::HttpAuth; use crate::server::Server; @@ -72,20 +69,6 @@ impl PromqlServer { } fn make_app(&self) -> Router { - let api = OpenApi { - info: Info { - title: "GreptimeDB PromQL API".to_string(), - description: Some("PromQL HTTP Api in GreptimeDB".to_string()), - version: PROMQL_API_VERSION.to_string(), - ..Info::default() - }, - servers: vec![OpenAPIServer { - url: format!("/{PROMQL_API_VERSION}"), - ..OpenAPIServer::default() - }], - ..OpenApi::default() - }; - // TODO(ruihang): implement format_query, series, labels, values, query_examplars and targets methods let router = Router::new() .route( @@ -102,9 +85,7 @@ impl PromqlServer { // middlewares .layer( ServiceBuilder::new() - // .layer(HandleErrorLayer::new(handle_error)) .layer(TraceLayer::new_for_http()) - // .layer(TimeoutLayer::new(self.options.timeout)) // custom layer .layer(AsyncRequireAuthorizationLayer::new( HttpAuth::::new(self.user_provider.clone()), @@ -161,8 +142,9 @@ pub struct PromqlSeries { #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct PromqlData { + #[serde(rename = "resultType")] result_type: String, - data: Vec, + result: Vec, } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] @@ -227,7 +209,7 @@ impl PromqlJsonResponse { let data = PromqlData { result_type: "matrix".to_string(), - data: vec![PromqlSeries { + result: vec![PromqlSeries { metric: vec![("__name__".to_string(), "foo".to_string())] .into_iter() .collect(), @@ -274,14 +256,3 @@ pub async fn range_query( let result = handler.do_query(¶ms.query).await; PromqlJsonResponse::from_query_result(result).await } - -/// handle error middleware -async fn handle_error(err: BoxError) -> Json { - Json(PromqlJsonResponse { - status: "error".to_string(), - data: PromqlData::default(), - error: Some(format!("Unhandled internal error: {err}")), - error_type: Some("internal".to_string()), - warnings: None, - }) -} From 4ae87e33a372467842b669531a71489c445fa5a6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 1 Feb 2023 18:47:16 +0800 Subject: [PATCH 05/10] integration test case Signed-off-by: Ruihang Xia --- src/servers/src/promql.rs | 2 +- tests-integration/src/test_util.rs | 25 +++++++++++++++++-- tests-integration/tests/http.rs | 39 ++++++++++++++++++++++++++---- 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index 6c2e68de0661..5445ee0758da 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -68,7 +68,7 @@ impl PromqlServer { self.user_provider = Some(user_provider); } - fn make_app(&self) -> Router { + pub fn make_app(&self) -> Router { // TODO(ruihang): implement format_query, series, labels, values, query_examplars and targets methods let router = Router::new() .route( diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 978495cec796..3232b846289f 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -40,6 +40,7 @@ use once_cell::sync::OnceCell; use rand::Rng; use servers::grpc::GrpcServer; use servers::http::{HttpOptions, HttpServer}; +use servers::promql::PromqlServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; @@ -265,7 +266,7 @@ async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance { frontend_instance } -pub async fn setup_test_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { +pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); instance.start().await.unwrap(); @@ -283,7 +284,7 @@ pub async fn setup_test_app(store_type: StorageType, name: &str) -> (Router, Tes (http_server.make_app(), guard) } -pub async fn setup_test_app_with_frontend( +pub async fn setup_test_http_app_with_frontend( store_type: StorageType, name: &str, ) -> (Router, TestGuard) { @@ -307,6 +308,26 @@ pub async fn setup_test_app_with_frontend( (app, guard) } +pub async fn setup_test_promql_app_with_frontend( + store_type: StorageType, + name: &str, +) -> (Router, TestGuard) { + let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); + let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let frontend = build_frontend_instance(instance.clone()).await; + instance.start().await.unwrap(); + create_test_table( + frontend.catalog_manager(), + instance.sql_handler(), + ConcreteDataType::timestamp_millisecond_datatype(), + ) + .await + .unwrap(); + let promql_server = PromqlServer::create_server(Arc::new(frontend) as _); + let app = promql_server.make_app(); + (app, guard) +} + pub async fn setup_grpc_server( store_type: StorageType, name: &str, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7634f9b9286d..2daa73649dc1 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -18,7 +18,10 @@ use common_error::status_code::StatusCode as ErrorCode; use serde_json::json; use servers::http::handler::HealthResponse; use servers::http::{JsonOutput, JsonResponse}; -use tests_integration::test_util::{setup_test_app, setup_test_app_with_frontend, StorageType}; +use tests_integration::test_util::{ + setup_test_http_app, setup_test_http_app_with_frontend, setup_test_promql_app_with_frontend, + StorageType, +}; #[macro_export] macro_rules! http_test { @@ -50,6 +53,7 @@ macro_rules! http_tests { $service, test_sql_api, + test_promql_api, test_metrics_api, test_scripts_api, test_health_api, @@ -60,7 +64,7 @@ macro_rules! http_tests { pub async fn test_sql_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, mut guard) = setup_test_app_with_frontend(store_type, "sql_api").await; + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await; let client = TestClient::new(app); let res = client.get("/v1/sql").send().await; assert_eq!(res.status(), StatusCode::OK); @@ -261,10 +265,35 @@ pub async fn test_sql_api(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_promql_api(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_promql_app_with_frontend(store_type, "promql_api").await; + let client = TestClient::new(app); + + // instant query + let res = client.get("/v1/query?query=up").send().await; + assert_eq!(res.status(), StatusCode::OK); + let res = client.post("/v1/query?query=up").send().await; + assert_eq!(res.status(), StatusCode::OK); + + let res = client + .get("/v1/range_query?query=up&start=1&end=100&step=5") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let res = client + .post("/v1/range_query?query=up&start=1&end=100&step=5") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + guard.remove_all().await; +} + pub async fn test_metrics_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); common_telemetry::init_default_metrics_recorder(); - let (app, mut guard) = setup_test_app(store_type, "metrics_api").await; + let (app, mut guard) = setup_test_http_app(store_type, "metrics_api").await; let client = TestClient::new(app); // Send a sql @@ -284,7 +313,7 @@ pub async fn test_metrics_api(store_type: StorageType) { pub async fn test_scripts_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, mut guard) = setup_test_app_with_frontend(store_type, "script_api").await; + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "script_api").await; let client = TestClient::new(app); let res = client @@ -325,7 +354,7 @@ def test(n): pub async fn test_health_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, _guard) = setup_test_app_with_frontend(store_type, "health_api").await; + let (app, _guard) = setup_test_http_app_with_frontend(store_type, "health_api").await; let client = TestClient::new(app); // we can call health api with both `GET` and `POST` method. From f1f709eb2f3002882ddf2293ff09cff87f4d07cf Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 2 Feb 2023 16:38:34 +0800 Subject: [PATCH 06/10] resolve CR comments Signed-off-by: Ruihang Xia --- src/frontend/src/lib.rs | 1 + src/frontend/src/server.rs | 2 +- src/servers/src/lib.rs | 1 + src/servers/src/promql.rs | 44 ++++++++++++++++++++++---------------- 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 36fa94ec1fd0..58440cb10910 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(assert_matches)] +#![feature(let_chains)] pub type Plugins = anymap::Map; diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index daff47ed26fb..8302d53daaeb 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -182,7 +182,7 @@ impl Services { None }; - let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options { + let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options && promql_options.enable { let promql_addr = parse_addr(&promql_options.addr)?; let mut promql_server = PromqlServer::create_server(instance.clone()); diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 55fb29a3112b..d759a57bb6a8 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(assert_matches)] +#![feature(try_blocks)] use common_catalog::consts::DEFAULT_CATALOG_NAME; use serde::{Deserialize, Serialize}; diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index 5445ee0758da..4042cb388990 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -23,7 +23,7 @@ use axum::{routing, Json, Router}; use common_error::prelude::ErrorExt; use common_query::Output; use common_recordbatch::RecordBatches; -use common_telemetry::info; +use common_telemetry::{debug, info}; use futures::FutureExt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -35,7 +35,7 @@ use tower_http::auth::AsyncRequireAuthorizationLayer; use tower_http::trace::TraceLayer; use crate::auth::UserProviderRef; -use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; +use crate::error::{AlreadyStartedSnafu, CollectRecordbatchSnafu, Result, StartHttpSnafu}; use crate::http::authorize::HttpAuth; use crate::server::Server; @@ -183,27 +183,35 @@ impl PromqlJsonResponse { /// Convert from `Result` pub async fn from_query_result(result: Result) -> Json { - match result { - Ok(Output::RecordBatches(batches)) => { - Self::success(Self::record_batches_to_data(batches).unwrap()) - } - Ok(Output::Stream(stream)) => { - let record_batches = RecordBatches::try_collect(stream).await.unwrap(); - Self::success(Self::record_batches_to_data(record_batches).unwrap()) - } - Ok(Output::AffectedRows(_)) => Self::error( - "unexpected result", - "expected data result, but got affected rows", - ), - Err(err) => Self::error(err.status_code().to_string(), err.to_string()), - } + let response: Result> = try { + let json = match result? { + Output::RecordBatches(batches) => { + Self::success(Self::record_batches_to_data(batches)?) + } + Output::Stream(stream) => { + let record_batches = RecordBatches::try_collect(stream) + .await + .context(CollectRecordbatchSnafu)?; + Self::success(Self::record_batches_to_data(record_batches)?) + } + Output::AffectedRows(_) => Self::error( + "unexpected result", + "expected data result, but got affected rows", + ), + }; + + json + }; + + response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string())) } + /// TODO(ruihang): implement this conversion method fn record_batches_to_data(batches: RecordBatches) -> Result { - info!("schema: {:?}", batches.schema()); + debug!("schema: {:?}", batches.schema()); for batch in batches.iter() { for row in batch.rows() { - info!("row: {row:?}",); + debug!("row: {row:?}",); } } From d27b8e825a8e10b0f799576298f7d01ae1d52329 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 2 Feb 2023 19:02:14 +0800 Subject: [PATCH 07/10] expose promql api to our http server Signed-off-by: Ruihang Xia --- src/datanode/src/instance/sql.rs | 10 +++++++++ src/frontend/src/error.rs | 9 ++++++++ src/frontend/src/instance.rs | 14 +++++++++++- src/frontend/src/instance/distributed.rs | 8 +++++++ src/frontend/src/instance/standalone.rs | 8 +++++++ src/servers/src/http.rs | 13 ++++++++++++ src/servers/src/http/handler.rs | 26 +++++++++++++++++++++++ src/servers/src/query_handler/sql.rs | 22 +++++++++++++++++++ src/servers/tests/http/influxdb_test.rs | 8 +++++++ src/servers/tests/http/opentsdb_test.rs | 8 +++++++ src/servers/tests/http/prometheus_test.rs | 8 +++++++ src/servers/tests/mod.rs | 8 +++++++ 12 files changed, 141 insertions(+), 1 deletion(-) diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 8818c7c50574..9f236db2e277 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -210,6 +210,16 @@ impl SqlQueryHandler for Instance { vec![result] } + async fn do_promql_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> Vec> { + let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED); + let result = self.execute_promql(query, query_ctx).await; + vec![result] + } + async fn do_statement_query( &self, stmt: Statement, diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 4498ba580cdd..3bfb94bc983e 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -330,6 +330,14 @@ pub enum Error { #[snafu(backtrace)] source: partition::error::Error, }, + + // TODO(ruihang): merge all query execution error kinds + #[snafu(display("failed to execute PromQL query {}, source: {}", query, source))] + ExecutePromql { + query: String, + #[snafu(backtrace)] + source: servers::error::Error, + }, } pub type Result = std::result::Result; @@ -345,6 +353,7 @@ impl ErrorExt for Error { Error::NotSupported { .. } => StatusCode::Unsupported, Error::RuntimeResource { source, .. } => source.status_code(), + Error::ExecutePromql { source, .. } => source.status_code(), Error::SqlExecIntercepted { source, .. } => source.status_code(), Error::StartServer { source, .. } => source.status_code(), diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 38924c6a8810..e8964935e22f 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -59,7 +59,7 @@ use sql::statements::statement::Statement; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; -use crate::error::{self, Error, MissingMetasrvOptsSnafu, Result}; +use crate::error::{self, Error, ExecutePromqlSnafu, MissingMetasrvOptsSnafu, Result}; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler}; @@ -466,6 +466,18 @@ impl SqlQueryHandler for Instance { } } + async fn do_promql_query(&self, query: &str, _: QueryContextRef) -> Vec> { + if let Some(handler) = &self.promql_handler { + let result = handler + .do_query(query) + .await + .context(ExecutePromqlSnafu { query }); + vec![result] + } else { + vec![] + } + } + async fn do_statement_query( &self, stmt: Statement, diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 897d48cd86bd..900a3c1cf0bf 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -376,6 +376,14 @@ impl SqlQueryHandler for DistInstance { self.handle_sql(query, query_ctx).await } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, stmt: Statement, diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index f08859577702..88d8a865c127 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -47,6 +47,14 @@ impl SqlQueryHandler for StandaloneSqlQueryHandler { .collect() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, stmt: Statement, diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 3df7cbc2c235..5f2522fd1b3a 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -477,6 +477,11 @@ impl HttpServer { apirouting::get_with(handler::sql, handler::sql_docs) .post_with(handler::sql, handler::sql_docs), ) + .api_route( + "/promql", + apirouting::get_with(handler::promql, handler::sql_docs) + .post_with(handler::promql, handler::sql_docs), + ) .api_route("/scripts", apirouting::post(script::scripts)) .api_route("/run-script", apirouting::post(script::run_script)) .route("/private/api.json", apirouting::get(serve_api)) @@ -584,6 +589,14 @@ mod test { unimplemented!() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 0c7daac6b87d..0598fe03935e 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -59,6 +59,32 @@ pub async fn sql( Json(resp.with_execution_time(start.elapsed().as_millis())) } +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct PromqlQuery { + pub query: String, +} + +/// Handler to execute promql +#[axum_macros::debug_handler] +pub async fn promql( + State(state): State, + Query(params): Query, + // TODO(fys): pass _user_info into query context + _user_info: Extension, +) -> Json { + let sql_handler = &state.sql_handler; + let start = Instant::now(); + let resp = match super::query_context_from_db(sql_handler.clone(), None) { + Ok(query_ctx) => { + JsonResponse::from_output(sql_handler.do_promql_query(¶ms.query, query_ctx).await) + .await + } + Err(resp) => resp, + }; + + Json(resp.with_execution_time(start.elapsed().as_millis())) +} + pub(crate) fn sql_docs(op: TransformOperation) -> TransformOperation { op.response::<200, Json>() } diff --git a/src/servers/src/query_handler/sql.rs b/src/servers/src/query_handler/sql.rs index 0b82ae134f7d..d394e84d645f 100644 --- a/src/servers/src/query_handler/sql.rs +++ b/src/servers/src/query_handler/sql.rs @@ -35,6 +35,12 @@ pub trait SqlQueryHandler { query_ctx: QueryContextRef, ) -> Vec>; + async fn do_promql_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> Vec>; + async fn do_statement_query( &self, stmt: Statement, @@ -75,6 +81,22 @@ where .collect() } + async fn do_promql_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> Vec> { + self.0 + .do_promql_query(query, query_ctx) + .await + .into_iter() + .map(|x| { + x.map_err(BoxedError::new) + .context(error::ExecuteQuerySnafu { query }) + }) + .collect() + } + async fn do_statement_query( &self, stmt: Statement, diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 979b6b31eebe..cb158d5852e6 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -54,6 +54,14 @@ impl SqlQueryHandler for DummyInstance { unimplemented!() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 32954def9d3e..70f8c3e07046 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -52,6 +52,14 @@ impl SqlQueryHandler for DummyInstance { unimplemented!() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index a57b70136d9c..85e1fdc1e8bb 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -77,6 +77,14 @@ impl SqlQueryHandler for DummyInstance { unimplemented!() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 6e8e833640b0..15af6e9706eb 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -68,6 +68,14 @@ impl SqlQueryHandler for DummyInstance { vec![Ok(output)] } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, From 2fc0fd93405a97dcb4fe998ded01527811ade7df Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 3 Feb 2023 14:40:03 +0800 Subject: [PATCH 08/10] resolve CR comments Signed-off-by: Ruihang Xia --- src/frontend/src/instance.rs | 9 +++++++-- src/frontend/src/lib.rs | 1 - src/frontend/src/promql.rs | 3 --- src/frontend/src/server.rs | 2 +- src/servers/src/promql.rs | 35 ++++++++++++++++++----------------- 5 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 852f4534f451..6a6010d15326 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -58,7 +58,9 @@ use sql::statements::statement::Statement; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; -use crate::error::{self, Error, ExecutePromqlSnafu, MissingMetasrvOptsSnafu, Result}; +use crate::error::{ + self, Error, ExecutePromqlSnafu, MissingMetasrvOptsSnafu, NotSupportedSnafu, Result, +}; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler}; @@ -461,7 +463,10 @@ impl SqlQueryHandler for Instance { .context(ExecutePromqlSnafu { query }); vec![result] } else { - vec![] + vec![Err(NotSupportedSnafu { + feat: "PromQL Query", + } + .build())] } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 58440cb10910..36fa94ec1fd0 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -13,7 +13,6 @@ // limitations under the License. #![feature(assert_matches)] -#![feature(let_chains)] pub type Plugins = anymap::Map; diff --git a/src/frontend/src/promql.rs b/src/frontend/src/promql.rs index 5f2024cd30c2..a2e18a492220 100644 --- a/src/frontend/src/promql.rs +++ b/src/frontend/src/promql.rs @@ -16,14 +16,12 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PromqlOptions { - pub enable: bool, pub addr: String, } impl Default for PromqlOptions { fn default() -> Self { Self { - enable: true, addr: "127.0.0.1:4004".to_string(), } } @@ -36,7 +34,6 @@ mod tests { #[test] fn test_prometheus_options() { let default = PromqlOptions::default(); - assert!(default.enable); assert_eq!(default.addr, "127.0.0.1:4004".to_string()); } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 8302d53daaeb..daff47ed26fb 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -182,7 +182,7 @@ impl Services { None }; - let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options && promql_options.enable { + let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options { let promql_addr = parse_addr(&promql_options.addr)?; let mut promql_server = PromqlServer::create_server(instance.clone()); diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index 4042cb388990..6e0a3e3cf1b1 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -23,7 +23,7 @@ use axum::{routing, Json, Router}; use common_error::prelude::ErrorExt; use common_query::Output; use common_recordbatch::RecordBatches; -use common_telemetry::{debug, info}; +use common_telemetry::info; use futures::FutureExt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -71,15 +71,11 @@ impl PromqlServer { pub fn make_app(&self) -> Router { // TODO(ruihang): implement format_query, series, labels, values, query_examplars and targets methods let router = Router::new() - .route( - &format!("/{PROMQL_API_VERSION}/query"), - routing::post(instant_query).get(instant_query), - ) - .route( + .nest(&format!("/{PROMQL_API_VERSION}/query"), self.route_query()) + .nest( &format!("/{PROMQL_API_VERSION}/range_query"), - routing::post(range_query).get(range_query), - ) - .with_state(self.query_handler.clone()); + self.route_range_query(), + ); router // middlewares @@ -92,6 +88,18 @@ impl PromqlServer { )), ) } + + fn route_query(&self) -> Router { + Router::new() + .route("/", routing::post(instant_query).get(instant_query)) + .with_state(self.query_handler.clone()) + } + + fn route_range_query(&self) -> Router { + Router::new() + .route("/", routing::post(range_query).get(range_query)) + .with_state(self.query_handler.clone()) + } } #[async_trait] @@ -207,14 +215,7 @@ impl PromqlJsonResponse { } /// TODO(ruihang): implement this conversion method - fn record_batches_to_data(batches: RecordBatches) -> Result { - debug!("schema: {:?}", batches.schema()); - for batch in batches.iter() { - for row in batch.rows() { - debug!("row: {row:?}",); - } - } - + fn record_batches_to_data(_: RecordBatches) -> Result { let data = PromqlData { result_type: "matrix".to_string(), result: vec![PromqlSeries { From e7ebef8ef93df12c4101062928b7318d2218755f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 3 Feb 2023 15:39:35 +0800 Subject: [PATCH 09/10] adjust router structure Signed-off-by: Ruihang Xia --- src/servers/src/promql.rs | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index 6e0a3e3cf1b1..341875d4a1a7 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -70,14 +70,14 @@ impl PromqlServer { pub fn make_app(&self) -> Router { // TODO(ruihang): implement format_query, series, labels, values, query_examplars and targets methods + let router = Router::new() - .nest(&format!("/{PROMQL_API_VERSION}/query"), self.route_query()) - .nest( - &format!("/{PROMQL_API_VERSION}/range_query"), - self.route_range_query(), - ); + .route("/query", routing::post(instant_query).get(instant_query)) + .route("/query_range", routing::post(range_query).get(range_query)) + .with_state(self.query_handler.clone()); - router + Router::new() + .nest(&format!("/{PROMQL_API_VERSION}"), router) // middlewares .layer( ServiceBuilder::new() @@ -88,18 +88,6 @@ impl PromqlServer { )), ) } - - fn route_query(&self) -> Router { - Router::new() - .route("/", routing::post(instant_query).get(instant_query)) - .with_state(self.query_handler.clone()) - } - - fn route_range_query(&self) -> Router { - Router::new() - .route("/", routing::post(range_query).get(range_query)) - .with_state(self.query_handler.clone()) - } } #[async_trait] From c67d85aecc3ed2439ae771a448114e2faed6fb03 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 3 Feb 2023 16:10:24 +0800 Subject: [PATCH 10/10] fix api path Signed-off-by: Ruihang Xia --- src/servers/src/promql.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs index 341875d4a1a7..45902d5c1887 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/promql.rs @@ -73,7 +73,7 @@ impl PromqlServer { let router = Router::new() .route("/query", routing::post(instant_query).get(instant_query)) - .route("/query_range", routing::post(range_query).get(range_query)) + .route("/range_query", routing::post(range_query).get(range_query)) .with_state(self.query_handler.clone()); Router::new()