From 34c873c5172da4d8c377fd40d35ee2a83d4c0c66 Mon Sep 17 00:00:00 2001 From: kf zheng <100595273+kev1n8@users.noreply.github.com> Date: Sun, 6 Oct 2024 03:07:16 +0800 Subject: [PATCH 1/5] feat: json output format for http --- src/servers/src/http.rs | 32 +++++++ src/servers/src/http/handler.rs | 2 + src/servers/src/http/json_result.rs | 135 ++++++++++++++++++++++++++++ 3 files changed, 169 insertions(+) create mode 100644 src/servers/src/http/json_result.rs diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index a2b72b548b1e..953ff9e73ae7 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -59,6 +59,7 @@ use crate::http::error_result::ErrorResponse; use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::http::influxdb_result_v1::InfluxdbV1Response; +use crate::http::json_result::JsonResponse; use crate::http::prometheus::{ build_info_query, format_query, instant_query, label_values_query, labels_query, range_query, series_query, @@ -97,6 +98,7 @@ pub mod error_result; pub mod greptime_manage_resp; pub mod greptime_result_v1; pub mod influxdb_result_v1; +pub mod json_result; pub mod table_result; #[cfg(any(test, feature = "testing"))] @@ -279,6 +281,7 @@ pub enum ResponseFormat { #[default] GreptimedbV1, InfluxdbV1, + Json, } impl ResponseFormat { @@ -289,6 +292,7 @@ impl ResponseFormat { "table" => Some(ResponseFormat::Table), "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1), "influxdb_v1" => Some(ResponseFormat::InfluxdbV1), + "json" => Some(ResponseFormat::Json), _ => None, } } @@ -300,6 +304,7 @@ impl ResponseFormat { ResponseFormat::Table => "table", ResponseFormat::GreptimedbV1 => "greptimedb_v1", ResponseFormat::InfluxdbV1 => "influxdb_v1", + ResponseFormat::Json => "json", } } } @@ -356,6 +361,7 @@ pub enum HttpResponse { Error(ErrorResponse), GreptimedbV1(GreptimedbV1Response), InfluxdbV1(InfluxdbV1Response), + Json(JsonResponse), } impl HttpResponse { @@ -366,6 +372,7 @@ impl HttpResponse { HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(), + HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(), } } @@ -375,6 +382,7 @@ impl HttpResponse { HttpResponse::Csv(resp) => resp.with_limit(limit).into(), HttpResponse::Table(resp) => resp.with_limit(limit).into(), HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(), + HttpResponse::Json(resp) => resp.with_limit(limit).into(), _ => self, } } @@ -407,6 +415,7 @@ impl IntoResponse for HttpResponse { HttpResponse::Table(resp) => resp.into_response(), HttpResponse::GreptimedbV1(resp) => resp.into_response(), HttpResponse::InfluxdbV1(resp) => resp.into_response(), + HttpResponse::Json(resp) => resp.into_response(), HttpResponse::Error(resp) => resp.into_response(), } } @@ -452,6 +461,12 @@ impl From for HttpResponse { } } +impl From for HttpResponse { + fn from(value: JsonResponse) -> Self { + HttpResponse::Json(value) + } +} + async fn serve_api(Extension(api): Extension) -> impl IntoApiResponse { Json(api) } @@ -1131,6 +1146,7 @@ mod test { ResponseFormat::Csv, ResponseFormat::Table, ResponseFormat::Arrow, + ResponseFormat::Json, ] { let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap(); @@ -1141,6 +1157,7 @@ mod test { ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await, + ResponseFormat::Json => JsonResponse::from_output(outputs).await, }; match json_resp { @@ -1210,6 +1227,21 @@ mod test { assert_eq!(rb.num_columns(), 2); assert_eq!(rb.num_rows(), 4); } + + HttpResponse::Json(resp) => { + let output = &resp.output()[0]; + if let GreptimeQueryOutput::Records(r) = output { + assert_eq!(r.num_rows(), 4); + assert_eq!(r.num_cols(), 2); + assert_eq!(r.schema.column_schemas[0].name, "numbers"); + assert_eq!(r.schema.column_schemas[0].data_type, "UInt32"); + assert_eq!(r.rows[0][0], serde_json::Value::from(1)); + assert_eq!(r.rows[0][1], serde_json::Value::Null); + } else { + panic!("invalid output type"); + } + } + HttpResponse::Error(err) => unreachable!("{err:?}"), } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 1befc2224014..4925c79639ce 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -39,6 +39,7 @@ use crate::http::csv_result::CsvResponse; use crate::http::error_result::ErrorResponse; use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::influxdb_result_v1::InfluxdbV1Response; +use crate::http::json_result::JsonResponse; use crate::http::table_result::TableResponse; use crate::http::{ ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput, @@ -138,6 +139,7 @@ pub async fn sql( ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, + ResponseFormat::Json => JsonResponse::from_output(outputs).await, }; if let Some(limit) = query_params.limit { diff --git a/src/servers/src/http/json_result.rs b/src/servers/src/http/json_result.rs new file mode 100644 index 000000000000..30c47aff4336 --- /dev/null +++ b/src/servers/src/http/json_result.rs @@ -0,0 +1,135 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use axum::http::{header, HeaderValue}; +use axum::response::{IntoResponse, Response}; +use common_error::status_code::StatusCode; +use common_query::Output; +use mime_guess::mime; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Map}; + +use super::process_with_limit; +use crate::http::error_result::ErrorResponse; +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; + +/// The json format here is different from the default json output of greptime_result. +/// `JsonResponse` is intended to make it easier for user to consume data. +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct JsonResponse { + output: Vec, + execution_time_ms: u64, +} + +impl JsonResponse { + pub async fn from_output(outputs: Vec>) -> HttpResponse { + match handler::from_output(outputs).await { + Err(err) => HttpResponse::Error(err), + Ok((output, _)) => { + if output.len() > 1 { + HttpResponse::Error(ErrorResponse::from_error_message( + StatusCode::InvalidArguments, + "cannot output multi-statements result in json format".to_string(), + )) + } else { + HttpResponse::Json(JsonResponse { + output, + execution_time_ms: 0, + }) + } + } + } + } + + pub fn output(&self) -> &[GreptimeQueryOutput] { + &self.output + } + + pub fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self + } + + pub fn execution_time_ms(&self) -> u64 { + self.execution_time_ms + } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.output = process_with_limit(self.output, limit); + self + } +} + +impl IntoResponse for JsonResponse { + fn into_response(mut self) -> Response { + debug_assert!( + self.output.len() <= 1, + "self.output has extra elements: {}", + self.output.len() + ); + + let execution_time = self.execution_time_ms; + let payload = match self.output.pop() { + None => String::default(), + Some(GreptimeQueryOutput::AffectedRows(n)) => json!({ + "data": [ + { + "affectedrows": n + }, + ], + "execution_time_ms": execution_time, + }) + .to_string(), + + Some(GreptimeQueryOutput::Records(records)) => { + let mut data = Vec::new(); + let schema = records.schema(); + + for row in records.rows.iter() { + let mut row_map = Map::new(); + for (i, col) in schema.column_schemas.iter().enumerate() { + row_map.insert(col.name.clone(), row[i].clone()); + } + data.push(row_map); + } + + json!({ + "data": data, + "execution_time_ms": execution_time, + }) + .to_string() + } + }; + + let mut resp = ( + [( + header::CONTENT_TYPE, + HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()), + )], + payload, + ) + .into_response(); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_FORMAT, + HeaderValue::from_static(ResponseFormat::Json.as_str()), + ); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(execution_time), + ); + resp + } +} From a5ddc5e32029109aa5689c7041390ecbd308161c Mon Sep 17 00:00:00 2001 From: kf zheng <100595273+kev1n8@users.noreply.github.com> Date: Sun, 6 Oct 2024 14:25:50 +0800 Subject: [PATCH 2/5] feat: add json result test case --- tests-integration/tests/http.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 20a0f4edd2ae..7551fd83cc06 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -181,6 +181,22 @@ pub async fn test_sql_api(store_type: StorageType) { })).unwrap() ); + // test json result format + let res = client + .get("/v1/sql?format=json&sql=select * from numbers limit 10") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + let body = res.json::().await; + let data = body.get("data").expect("Missing 'data' field in response"); + + let expected = json!([ + {"number": 0}, {"number": 1}, {"number": 2}, {"number": 3}, {"number": 4}, + {"number": 5}, {"number": 6}, {"number": 7}, {"number": 8}, {"number": 9} + ]); + assert_eq!(data, &expected); + // test insert and select let res = client .get("/v1/sql?sql=insert into demo values('host', 66.6, 1024, 0)") @@ -1236,7 +1252,7 @@ transform: .send() .await; assert_eq!(res.status(), StatusCode::OK); - let body: serde_json::Value = res.json().await; + let body: Value = res.json().await; let schema = &body["schema"]; let rows = &body["rows"]; assert_eq!( From c33c20e5b0d3b9a14dc94234c040335254a40c7a Mon Sep 17 00:00:00 2001 From: kf zheng <100595273+kev1n8@users.noreply.github.com> Date: Tue, 8 Oct 2024 21:24:14 +0800 Subject: [PATCH 3/5] fix: typo and refactor a piece of code --- src/servers/src/http/json_result.rs | 52 ++++++++++++++++------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/src/servers/src/http/json_result.rs b/src/servers/src/http/json_result.rs index 30c47aff4336..ca2d47064614 100644 --- a/src/servers/src/http/json_result.rs +++ b/src/servers/src/http/json_result.rs @@ -19,14 +19,14 @@ use common_query::Output; use mime_guess::mime; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use serde_json::{json, Map}; +use serde_json::{json, Map, Value}; use super::process_with_limit; use crate::http::error_result::ErrorResponse; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; -/// The json format here is different from the default json output of greptime_result. +/// The json format here is different from the default json output of `GreptimedbV1` result. /// `JsonResponse` is intended to make it easier for user to consume data. #[derive(Serialize, Deserialize, Debug, JsonSchema)] pub struct JsonResponse { @@ -87,7 +87,7 @@ impl IntoResponse for JsonResponse { Some(GreptimeQueryOutput::AffectedRows(n)) => json!({ "data": [ { - "affectedrows": n + "affected_rows": n }, ], "execution_time_ms": execution_time, @@ -95,16 +95,20 @@ impl IntoResponse for JsonResponse { .to_string(), Some(GreptimeQueryOutput::Records(records)) => { - let mut data = Vec::new(); let schema = records.schema(); - for row in records.rows.iter() { - let mut row_map = Map::new(); - for (i, col) in schema.column_schemas.iter().enumerate() { - row_map.insert(col.name.clone(), row[i].clone()); - } - data.push(row_map); - } + let data: Vec> = records + .rows + .iter() + .map(|row| { + schema + .column_schemas + .iter() + .enumerate() + .map(|(i, col)| (col.name.clone(), row[i].clone())) + .collect::>() + }) + .collect(); json!({ "data": data, @@ -115,21 +119,23 @@ impl IntoResponse for JsonResponse { }; let mut resp = ( - [( - header::CONTENT_TYPE, - HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()), - )], + [ + ( + header::CONTENT_TYPE, + HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()), + ), + ( + GREPTIME_DB_HEADER_FORMAT.clone(), + HeaderValue::from_static(ResponseFormat::Json.as_str()), + ), + ( + GREPTIME_DB_HEADER_EXECUTION_TIME.clone(), + HeaderValue::from(execution_time), + ), + ], payload, ) .into_response(); - resp.headers_mut().insert( - &GREPTIME_DB_HEADER_FORMAT, - HeaderValue::from_static(ResponseFormat::Json.as_str()), - ); - resp.headers_mut().insert( - &GREPTIME_DB_HEADER_EXECUTION_TIME, - HeaderValue::from(execution_time), - ); resp } } From d2a0ad551ce13a616de8bea17236bebc67182a25 Mon Sep 17 00:00:00 2001 From: kf zheng <100595273+kev1n8@users.noreply.github.com> Date: Tue, 8 Oct 2024 21:36:47 +0800 Subject: [PATCH 4/5] fix: cargo check --- src/servers/src/http/json_result.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/servers/src/http/json_result.rs b/src/servers/src/http/json_result.rs index ca2d47064614..c13ad37bce4c 100644 --- a/src/servers/src/http/json_result.rs +++ b/src/servers/src/http/json_result.rs @@ -118,7 +118,7 @@ impl IntoResponse for JsonResponse { } }; - let mut resp = ( + ( [ ( header::CONTENT_TYPE, @@ -135,7 +135,6 @@ impl IntoResponse for JsonResponse { ], payload, ) - .into_response(); - resp + .into_response() } } From 95842da166a86baf4b76c400329ce544e91012a5 Mon Sep 17 00:00:00 2001 From: kf zheng <100595273+kev1n8@users.noreply.github.com> Date: Wed, 9 Oct 2024 09:12:28 +0800 Subject: [PATCH 5/5] move affected_rows to top level --- src/servers/src/http/json_result.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/servers/src/http/json_result.rs b/src/servers/src/http/json_result.rs index c13ad37bce4c..bf4e4d77704c 100644 --- a/src/servers/src/http/json_result.rs +++ b/src/servers/src/http/json_result.rs @@ -85,11 +85,8 @@ impl IntoResponse for JsonResponse { let payload = match self.output.pop() { None => String::default(), Some(GreptimeQueryOutput::AffectedRows(n)) => json!({ - "data": [ - { - "affected_rows": n - }, - ], + "data": [], + "affected_rows": n, "execution_time_ms": execution_time, }) .to_string(),