diff --git a/Cargo.lock b/Cargo.lock
index d60af01a9f91..a827be32fd1f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5070,8 +5070,12 @@ name = "promql"
 version = "0.1.0"
 dependencies = [
  "common-error",
+ "datafusion",
+ "datatypes",
+ "futures",
  "promql-parser",
  "snafu",
+ "tokio",
 ]
 
 [[package]]
diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs
index 4952e36cc0fd..cd738448f0c9 100644
--- a/src/datatypes/src/schema.rs
+++ b/src/datatypes/src/schema.rs
@@ -20,6 +20,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow::datatypes::{Field, Schema as ArrowSchema};
+pub use column_schema::TIME_INDEX_KEY;
 use datafusion_common::DFSchemaRef;
 use snafu::{ensure, ResultExt};
 
@@ -30,7 +31,7 @@ pub use crate::schema::constraint::ColumnDefaultConstraint;
 pub use crate::schema::raw::RawSchema;
 
 /// Key used to store version number of the schema in metadata.
-const VERSION_KEY: &str = "greptime:version";
+pub const VERSION_KEY: &str = "greptime:version";
 
 /// A common schema, should be immutable.
 #[derive(Debug, Clone, PartialEq, Eq)]
diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs
index 94fbba87eccb..7fc76fdda95a 100644
--- a/src/datatypes/src/schema/column_schema.rs
+++ b/src/datatypes/src/schema/column_schema.rs
@@ -26,7 +26,7 @@ use crate::vectors::VectorRef;
 pub type Metadata = HashMap<String, String>;
 
 /// Key used to store whether the column is time index in arrow field's metadata.
-const TIME_INDEX_KEY: &str = "greptime:time_index";
+pub const TIME_INDEX_KEY: &str = "greptime:time_index";
 
 /// Key used to store default constraint in arrow field's metadata.
 const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml
index 7bc49ea2138c..48b988f92357 100644
--- a/src/promql/Cargo.toml
+++ b/src/promql/Cargo.toml
@@ -6,5 +6,11 @@ license.workspace = true
 
 [dependencies]
 common-error = { path = "../common/error" }
+datafusion.workspace = true
+datatypes = { path = "../datatypes" }
+futures = "0.3"
 promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "71d8a90" }
 snafu = { version = "0.7", features = ["backtraces"] }
+
+[dev-dependencies]
+tokio = { version = "1.23", features = ["full"] }
diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs
new file mode 100644
index 000000000000..c314ce041cec
--- /dev/null
+++ b/src/promql/src/extension_plan.rs
@@ -0,0 +1,20 @@
+// Copyright 2022 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
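+
+//! Custom DataFusion plan nodes ("extension plans") that implement
+//! PromQL-specific semantics which plain SQL plans cannot express.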
+
+mod normalize;
+
+pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs
new file mode 100644
index 000000000000..ae08dc62c4e7
--- /dev/null
+++ b/src/promql/src/extension_plan/normalize.rs
@@ -0,0 +1,369 @@
+// Copyright 2022 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+use datafusion::arrow::compute;
+use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics};
+use datafusion::execution::context::TaskContext;
+use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
+use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
+use datatypes::arrow::array::{ArrowPrimitiveType, TimestampMillisecondArray};
+use datatypes::arrow::datatypes::{SchemaRef, TimestampMillisecondType};
+use datatypes::arrow::error::Result as ArrowResult;
+use datatypes::arrow::record_batch::RecordBatch;
+use futures::{Stream, StreamExt};
+
+type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
+
+/// Normalize the input record batch. Notice that for simplicity, this plan
+/// assumes the input batch only contains sample points from one time series.
+///
+/// Roughly speaking, this plan does two things:
+/// - bias each sample's timestamp by the offset
+/// - sort the record batch based on the timestamp column
+#[derive(Debug)]
+pub struct SeriesNormalize {
+    offset: Millisecond,
+    time_index_column_name: String,
+
+    input: LogicalPlan,
+}
+
+impl UserDefinedLogicalNode for SeriesNormalize {
+    fn as_any(&self) -> &dyn Any {
+        self as _
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.input]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        self.input.schema()
+    }
+
+    fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "PromSeriesNormalize: offset=[{}]", self.offset)
+    }
+
+    fn from_template(
+        &self,
+        _exprs: &[datafusion::logical_expr::Expr],
+        inputs: &[LogicalPlan],
+    ) -> Arc<dyn UserDefinedLogicalNode> {
+        assert!(!inputs.is_empty());
+
+        Arc::new(Self {
+            offset: self.offset,
+            time_index_column_name: self.time_index_column_name.clone(),
+            input: inputs[0].clone(),
+        })
+    }
+}
+
+impl SeriesNormalize {
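+    /// Build a new [SeriesNormalize] node. A minimal usage sketch follows;
+    /// the `input` plan and its "timestamp" time index column are assumptions
+    /// for illustration, not part of this API:
+    ///
+    /// ```ignore
+    /// use std::time::Duration;
+    ///
+    /// // normalize a series, biasing its timestamps back by a 1s offset
+    /// let normalize = SeriesNormalize::new(Duration::from_secs(1), "timestamp", input);
+    /// ```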
+    pub fn new<N: AsRef<str>>(
+        offset: Duration,
+        time_index_column_name: N,
+        input: LogicalPlan,
+    ) -> Self {
+        Self {
+            offset: offset.as_millis() as i64,
+            time_index_column_name: time_index_column_name.as_ref().to_string(),
+            input,
+        }
+    }
+
+    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(SeriesNormalizeExec {
+            offset: self.offset,
+            time_index_column_name: self.time_index_column_name.clone(),
+            input: exec_input,
+            metric: ExecutionPlanMetricsSet::new(),
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct SeriesNormalizeExec {
+    offset: Millisecond,
+    time_index_column_name: String,
+
+    input: Arc<dyn ExecutionPlan>,
+    metric: ExecutionPlanMetricsSet,
+}
+
+impl ExecutionPlan for SeriesNormalizeExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.input.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn maintains_input_order(&self) -> bool {
+        false
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        assert!(!children.is_empty());
+        Ok(Arc::new(Self {
+            offset: self.offset,
+            time_index_column_name: self.time_index_column_name.clone(),
+            input: children[0].clone(),
+            metric: self.metric.clone(),
+        }))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DataFusionResult<SendableRecordBatchStream> {
+        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+
+        let input = self.input.execute(partition, context)?;
+        let schema = input.schema();
+        let time_index = schema
+            .column_with_name(&self.time_index_column_name)
+            .expect("time index column not found")
+            .0;
+        Ok(Box::pin(SeriesNormalizeStream {
+            offset: self.offset,
+            time_index,
+            schema,
+            input,
+            metric: baseline_metric,
+        }))
+    }
+
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "PromSeriesNormalizeExec: offset=[{}]", self.offset)
+            }
+        }
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metric.clone_inner())
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.input.statistics()
+    }
+}
+
+pub struct SeriesNormalizeStream {
+    offset: Millisecond,
+    time_index: usize,
+
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    metric: BaselineMetrics,
+}
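+
+// The actual normalization happens batch by batch in the stream below: bias
+// every timestamp by `offset`, then re-sort the batch on the time index column.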
+impl SeriesNormalizeStream {
+    pub fn normalize(&self, input: RecordBatch) -> ArrowResult<RecordBatch> {
+        // TODO(ruihang): maybe the input is not timestamp millisecond array
+        let ts_column = input
+            .column(self.time_index)
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .unwrap();
+
+        // bias the timestamp column by offset
+        let ts_column_biased = if self.offset == 0 {
+            ts_column.clone()
+        } else {
+            TimestampMillisecondArray::from_iter(
+                ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
+            )
+        };
+        let mut columns = input.columns().to_vec();
+        columns[self.time_index] = Arc::new(ts_column_biased);
+
+        // sort the record batch
+        let ordered_indices = compute::sort_to_indices(&columns[self.time_index], None, None)?;
+        let ordered_columns = columns
+            .iter()
+            .map(|array| compute::take(array, &ordered_indices, None))
+            .collect::<ArrowResult<Vec<_>>>()?;
+        RecordBatch::try_new(input.schema(), ordered_columns)
+    }
+}
+
+impl RecordBatchStream for SeriesNormalizeStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for SeriesNormalizeStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let poll = match self.input.poll_next_unpin(cx) {
+            Poll::Ready(batch) => {
+                let _timer = self.metric.elapsed_compute().timer();
+                Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.normalize(batch))))
+            }
+            Poll::Pending => Poll::Pending,
+        };
+        self.metric.record_poll(poll)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use datafusion::arrow::array::Float64Array;
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::from_slice::FromSlice;
+    use datafusion::physical_plan::memory::MemoryExec;
+    use datafusion::prelude::SessionContext;
+    use datatypes::arrow::array::TimestampMillisecondArray;
+    use datatypes::arrow_array::StringArray;
+
+    use super::*;
+
+    const TIME_INDEX_COLUMN: &str = "timestamp";
+
+    fn prepare_test_data() -> MemoryExec {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
+            Field::new("value", DataType::Float64, true),
+            Field::new("path", DataType::Utf8, true),
+        ]));
+        let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([
+            60_000, 120_000, 0, 30_000, 90_000,
+        ])) as _;
+        let value_column = Arc::new(Float64Array::from_slice([0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
+        let path_column =
+            Arc::new(StringArray::from_slice(["foo", "foo", "foo", "foo", "foo"])) as _;
+        let data = RecordBatch::try_new(
+            schema.clone(),
+            vec![timestamp_column, value_column, path_column],
+        )
+        .unwrap();
+
+        MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
+    }
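+
+    // The timestamps above are deliberately unsorted (60s, 120s, 0s, 30s, 90s),
+    // so the tests below can check that output batches come back sorted.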
+    #[tokio::test]
+    async fn test_sort_record_batch() {
+        let memory_exec = Arc::new(prepare_test_data());
+        let normalize_exec = Arc::new(SeriesNormalizeExec {
+            offset: 0,
+            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
+            input: memory_exec,
+            metric: ExecutionPlanMetricsSet::new(),
+        });
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
+            .unwrap()
+            .to_string();
+
+        let expected = String::from(
+            "+---------------------+-------+------+\
+            \n| timestamp           | value | path |\
+            \n+---------------------+-------+------+\
+            \n| 1970-01-01T00:00:00 | 10    | foo  |\
+            \n| 1970-01-01T00:00:30 | 100   | foo  |\
+            \n| 1970-01-01T00:01:00 | 0     | foo  |\
+            \n| 1970-01-01T00:01:30 | 1000  | foo  |\
+            \n| 1970-01-01T00:02:00 | 1     | foo  |\
+            \n+---------------------+-------+------+",
+        );
+
+        assert_eq!(result_literal, expected);
+    }
+
+    #[tokio::test]
+    async fn test_offset_record_batch() {
+        let memory_exec = Arc::new(prepare_test_data());
+        let normalize_exec = Arc::new(SeriesNormalizeExec {
+            offset: 1_000, // offset 1s
+            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
+            input: memory_exec,
+            metric: ExecutionPlanMetricsSet::new(),
+        });
+        let session_context = SessionContext::default();
+        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
+            .await
+            .unwrap();
+        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
+            .unwrap()
+            .to_string();
+
+        let expected = String::from(
+            "+---------------------+-------+------+\
+            \n| timestamp           | value | path |\
+            \n+---------------------+-------+------+\
+            \n| 1969-12-31T23:59:59 | 10    | foo  |\
+            \n| 1970-01-01T00:00:29 | 100   | foo  |\
+            \n| 1970-01-01T00:00:59 | 0     | foo  |\
+            \n| 1970-01-01T00:01:29 | 1000  | foo  |\
+            \n| 1970-01-01T00:01:59 | 1     | foo  |\
+            \n+---------------------+-------+------+",
+        );
+
+        assert_eq!(result_literal, expected);
+    }
+}
diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs
index 11415d1838a2..552b4fd0ff60 100644
--- a/src/promql/src/lib.rs
+++ b/src/promql/src/lib.rs
@@ -14,3 +14,4 @@
 
 pub mod engine;
 pub mod error;
+pub mod extension_plan;