Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: export promql service in server #924

Merged
merged 12 commits into from
Feb 3, 2023
4 changes: 4 additions & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prometheus::PrometheusOptions;
use frontend::promql::PromqlOptions;
use frontend::Plugins;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct StandaloneOptions {
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub promql_options: Option<PromqlOptions>,
pub mode: Mode,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
Expand All @@ -88,6 +90,7 @@ impl Default for StandaloneOptions {
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
promql_options: Some(PromqlOptions::default()),
mode: Mode::Standalone,
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
Expand All @@ -106,6 +109,7 @@ impl StandaloneOptions {
opentsdb_options: self.opentsdb_options,
influxdb_options: self.influxdb_options,
prometheus_options: self.prometheus_options,
promql_options: self.promql_options,
mode: self.mode,
meta_client_opts: None,
}
Expand Down
16 changes: 15 additions & 1 deletion src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging::info;
use common_telemetry::timer;
use query::parser::{QueryLanguageParser, QueryStatement};
use servers::error as server_error;
use servers::promql::PromqlHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::statement::Statement;
Expand Down Expand Up @@ -225,6 +228,17 @@ impl SqlQueryHandler for Instance {
}
}

#[async_trait]
impl PromqlHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
self.execute_promql(query, QueryContext::arc())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu { query })
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
pub const METRIC_HANDLE_SQL_ELAPSED: &str = "datanode.handle_sql_elapsed";
pub const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "datanode.handle_scripts_elapsed";
pub const METRIC_RUN_SCRIPT_ELAPSED: &str = "datanode.run_script_elapsed";
pub const METRIC_HANDLE_PROMQL_ELAPSED: &str = "datanode.handle_promql_elapsed";
3 changes: 3 additions & 0 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
use crate::prometheus::PrometheusOptions;
use crate::promql::PromqlOptions;
use crate::server::Services;
use crate::Plugins;

Expand All @@ -41,6 +42,7 @@ pub struct FrontendOptions {
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub promql_options: Option<PromqlOptions>,
pub mode: Mode,
pub meta_client_opts: Option<MetaClientOpts>,
}
Expand All @@ -55,6 +57,7 @@ impl Default for FrontendOptions {
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
promql_options: Some(PromqlOptions::default()),
mode: Mode::Standalone,
meta_client_opts: None,
}
Expand Down
20 changes: 20 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use servers::error as server_error;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::promql::{PromqlHandler, PromqlHandlerRef};
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef};
use servers::query_handler::{
Expand Down Expand Up @@ -71,6 +72,7 @@ pub trait FrontendInstance:
+ InfluxdbLineProtocolHandler
+ PrometheusProtocolHandler
+ ScriptHandler
+ PromqlHandler
+ Send
+ Sync
+ 'static
Expand All @@ -88,6 +90,7 @@ pub struct Instance {
script_handler: Option<ScriptHandlerRef>,
sql_handler: SqlQueryHandlerRef<Error>,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
promql_handler: Option<PromqlHandlerRef>,

create_expr_factory: CreateExprFactoryRef,

Expand Down Expand Up @@ -121,6 +124,7 @@ impl Instance {
create_expr_factory: Arc::new(DefaultCreateExprFactory),
sql_handler: dist_instance.clone(),
grpc_query_handler: dist_instance,
promql_handler: None,
plugins: Default::default(),
})
}
Expand Down Expand Up @@ -162,6 +166,7 @@ impl Instance {
create_expr_factory: Arc::new(DefaultCreateExprFactory),
sql_handler: StandaloneSqlQueryHandler::arc(dn_instance.clone()),
grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()),
promql_handler: Some(dn_instance.clone()),
plugins: Default::default(),
}
}
Expand All @@ -174,6 +179,7 @@ impl Instance {
create_expr_factory: Arc::new(DefaultCreateExprFactory),
sql_handler: dist_instance.clone(),
grpc_query_handler: dist_instance,
promql_handler: None,
plugins: Default::default(),
}
}
Expand Down Expand Up @@ -506,6 +512,20 @@ impl ScriptHandler for Instance {
}
}

#[async_trait]
impl PromqlHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
if let Some(promql_handler) = &self.promql_handler {
promql_handler.do_query(query).await
} else {
server_error::NotSupportedSnafu {
feat: "PromQL query in Frontend",
}
.fail()
}
}
}

#[cfg(test)]
mod tests {
use std::borrow::Cow;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod opentsdb;
pub mod partitioning;
pub mod postgres;
pub mod prometheus;
pub mod promql;
mod server;
pub mod spliter;
mod sql;
Expand Down
42 changes: 42 additions & 0 deletions src/frontend/src/promql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromqlOptions {
pub enable: bool,
waynexia marked this conversation as resolved.
Show resolved Hide resolved
pub addr: String,
}

impl Default for PromqlOptions {
fn default() -> Self {
Self {
enable: true,
addr: "127.0.0.1:4004".to_string(),
}
}
}

#[cfg(test)]
mod tests {
use super::PromqlOptions;

#[test]
fn test_prometheus_options() {
let default = PromqlOptions::default();
assert!(default.enable);
assert_eq!(default.addr, "127.0.0.1:4004".to_string());
}
}
19 changes: 17 additions & 2 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use servers::http::HttpServer;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::promql::PromqlServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
Expand Down Expand Up @@ -154,7 +155,7 @@ impl Services {
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
http_options.clone(),
);
if let Some(user_provider) = user_provider {
if let Some(user_provider) = user_provider.clone() {
http_server.set_user_provider(user_provider);
}

Expand All @@ -181,12 +182,26 @@ impl Services {
None
};

let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options {
let promql_addr = parse_addr(&promql_options.addr)?;
waynexia marked this conversation as resolved.
Show resolved Hide resolved

let mut promql_server = PromqlServer::create_server(instance.clone());
if let Some(user_provider) = user_provider {
promql_server.set_user_provider(user_provider);
}

Some((promql_server as _, promql_addr))
} else {
None
};

try_join!(
start_server(http_server_and_addr),
start_server(grpc_server_and_addr),
start_server(mysql_server_and_addr),
start_server(postgres_server_and_addr),
start_server(opentsdb_server_and_addr)
start_server(opentsdb_server_and_addr),
start_server(promql_server_and_addr),
)
.context(error::StartServerSnafu)?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl Server for GrpcServer {
.await
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
info!("GRPC server is bound to {}", addr);
info!("gRPC server is bound to {}", addr);

*shutdown_tx = Some(tx);

Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,8 @@ impl HttpServer {
pub fn make_app(&self) -> Router {
let mut api = OpenApi {
info: Info {
title: "Greptime DB HTTP API".to_string(),
description: Some("HTTP APIs to interact with Greptime DB".to_string()),
title: "GreptimeDB HTTP API".to_string(),
description: Some("HTTP APIs to interact with GreptimeDB".to_string()),
version: HTTP_API_VERSION.to_string(),
..Info::default()
},
Expand Down
4 changes: 2 additions & 2 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prometheus;
pub mod promql;
pub mod query_handler;
pub mod server;
pub mod tls;

mod shutdown;
pub mod tls;

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
Expand Down
Loading