feat: Prom SeriesNormalize plan (#787)
* feat: impl SeriesNormalize plan

Signed-off-by: Ruihang Xia <[email protected]>

* some tests

Signed-off-by: Ruihang Xia <[email protected]>

* feat: add metrics

Signed-off-by: Ruihang Xia <[email protected]>

* add license header

Signed-off-by: Ruihang Xia <[email protected]>

* resolve CR comments

Signed-off-by: Ruihang Xia <[email protected]>

* update tests

Signed-off-by: Ruihang Xia <[email protected]>

* make time index column a parameter

Signed-off-by: Ruihang Xia <[email protected]>

* precompute time index column index

Signed-off-by: Ruihang Xia <[email protected]>

* sign the TODO

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored Dec 27, 2022
1 parent a14ec94 commit 9099058
Showing 7 changed files with 386 additions and 2 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion src/datatypes/src/schema.rs
@@ -20,6 +20,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{Field, Schema as ArrowSchema};
pub use column_schema::TIME_INDEX_KEY;
use datafusion_common::DFSchemaRef;
use snafu::{ensure, ResultExt};

@@ -30,7 +31,7 @@ pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;

/// Key used to store version number of the schema in metadata.
const VERSION_KEY: &str = "greptime:version";
pub const VERSION_KEY: &str = "greptime:version";

/// A common schema, should be immutable.
#[derive(Debug, Clone, PartialEq, Eq)]
2 changes: 1 addition & 1 deletion src/datatypes/src/schema/column_schema.rs
@@ -26,7 +26,7 @@ use crate::vectors::VectorRef;
pub type Metadata = HashMap<String, String>;

/// Key used to store whether the column is time index in arrow field's metadata.
const TIME_INDEX_KEY: &str = "greptime:time_index";
pub const TIME_INDEX_KEY: &str = "greptime:time_index";
/// Key used to store default constraint in arrow field's metadata.
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";

6 changes: 6 additions & 0 deletions src/promql/Cargo.toml
@@ -6,5 +6,11 @@ license.workspace = true

[dependencies]
common-error = { path = "../common/error" }
datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "71d8a90" }
snafu = { version = "0.7", features = ["backtraces"] }

[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }
17 changes: 17 additions & 0 deletions src/promql/src/extension_plan.rs
@@ -0,0 +1,17 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod normalize;

pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
355 changes: 355 additions & 0 deletions src/promql/src/extension_plan/normalize.rs
@@ -0,0 +1,355 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use datafusion::arrow::compute;
use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
};
use datatypes::arrow::array::{ArrowPrimitiveType, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{SchemaRef, TimestampMillisecondType};
use datatypes::arrow::error::Result as ArrowResult;
use datatypes::arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt};

type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;

/// Normalize the input record batch. Notice that for simplicity, this method assumes
/// the input batch only contains sample points from one time series.
///
/// Roughly speaking, this method does these things:
/// - bias sample's timestamp by offset
/// - sort the record batch based on timestamp column
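///
/// As an illustration of the two steps (example values only): with `offset` = 1s,
/// input timestamps `[60_000, 0, 30_000]` (ms) are biased to `[59_000, -1_000, 29_000]`,
/// and the batch is then re-sorted so the time index column is ascending:
/// `[-1_000, 29_000, 59_000]`.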
#[derive(Debug)]
pub struct SeriesNormalize {
    offset: Millisecond,
    time_index_column_name: String,

    input: LogicalPlan,
}

impl UserDefinedLogicalNode for SeriesNormalize {
    fn as_any(&self) -> &dyn Any {
        self as _
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "PromSeriesNormalize: offset=[{}]", self.offset)
    }

    fn from_template(
        &self,
        _exprs: &[datafusion::logical_expr::Expr],
        inputs: &[LogicalPlan],
    ) -> Arc<dyn UserDefinedLogicalNode> {
        assert!(!inputs.is_empty());

        Arc::new(Self {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            input: inputs[0].clone(),
        })
    }
}

impl SeriesNormalize {
    pub fn new<N: AsRef<str>>(
        offset: Duration,
        time_index_column_name: N,
        input: LogicalPlan,
    ) -> Self {
        Self {
            offset: offset.as_millis() as i64,
            time_index_column_name: time_index_column_name.as_ref().to_string(),
            input,
        }
    }

    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesNormalizeExec {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            input: exec_input,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }
}

#[derive(Debug)]
pub struct SeriesNormalizeExec {
    offset: Millisecond,
    time_index_column_name: String,

    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesNormalizeExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn output_partitioning(&self) -> Partitioning {
        self.input.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        self.input.output_ordering()
    }

    fn maintains_input_order(&self) -> bool {
        false
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![self.input.clone()]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let time_index = schema
            .column_with_name(&self.time_index_column_name)
            .expect("time index column not found")
            .0;
        Ok(Box::pin(SeriesNormalizeStream {
            offset: self.offset,
            time_index,
            schema,
            input,
            metric: baseline_metric,
        }))
    }

    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                write!(f, "PromSeriesNormalizeExec: offset=[{}]", self.offset)
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn statistics(&self) -> Statistics {
        self.input.statistics()
    }
}

pub struct SeriesNormalizeStream {
    offset: Millisecond,
    time_index: usize,

    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
}

impl SeriesNormalizeStream {
    pub fn normalize(&self, input: RecordBatch) -> ArrowResult<RecordBatch> {
        // TODO(ruihang): maybe the input is not timestamp millisecond array
        let ts_column = input
            .column(self.time_index)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();

        // bias the timestamp column by offset
        let ts_column_biased = if self.offset == 0 {
            ts_column.clone()
        } else {
            TimestampMillisecondArray::from_iter(
                ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
            )
        };
        let mut columns = input.columns().to_vec();
        columns[self.time_index] = Arc::new(ts_column_biased);

        // sort the record batch
        let ordered_indices = compute::sort_to_indices(&columns[self.time_index], None, None)?;
        let ordered_columns = columns
            .iter()
            .map(|array| compute::take(array, &ordered_indices, None))
            .collect::<ArrowResult<Vec<_>>>()?;
        RecordBatch::try_new(input.schema(), ordered_columns)
    }
}

impl RecordBatchStream for SeriesNormalizeStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesNormalizeStream {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = match self.input.poll_next_unpin(cx) {
            Poll::Ready(batch) => {
                let _timer = self.metric.elapsed_compute().timer();
                Poll::Ready(batch.map(|batch| batch.and_then(|batch| self.normalize(batch))))
            }
            Poll::Pending => Poll::Pending,
        };
        self.metric.record_poll(poll)
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::array::Float64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::from_slice::FromSlice;
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;
    use datatypes::arrow_array::StringArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    fn prepare_test_data() -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([
            60_000, 120_000, 0, 30_000, 90_000,
        ])) as _;
        let value_column = Arc::new(Float64Array::from_slice([0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
        let path_column =
            Arc::new(StringArray::from_slice(["foo", "foo", "foo", "foo", "foo"])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![timestamp_column, value_column, path_column],
        )
        .unwrap();

        MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
    }

    #[tokio::test]
    async fn test_sort_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 0,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 10    | foo  |\
            \n| 1970-01-01T00:00:30 | 100   | foo  |\
            \n| 1970-01-01T00:01:00 | 0     | foo  |\
            \n| 1970-01-01T00:01:30 | 1000  | foo  |\
            \n| 1970-01-01T00:02:00 | 1     | foo  |\
            \n+---------------------+-------+------+",
        );

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn test_offset_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 1_000, // offset 1s
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1969-12-31T23:59:59 | 10    | foo  |\
            \n| 1970-01-01T00:00:29 | 100   | foo  |\
            \n| 1970-01-01T00:00:59 | 0     | foo  |\
            \n| 1970-01-01T00:01:29 | 1000  | foo  |\
            \n| 1970-01-01T00:01:59 | 1     | foo  |\
            \n+---------------------+-------+------+",
        );

        assert_eq!(result_literal, expected);
    }
}
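For orientation, a minimal usage sketch (not part of this commit) of how the new node is meant to be wired up: the logical `SeriesNormalize` wraps an input `LogicalPlan`, and `to_execution_plan` lowers it to the `SeriesNormalizeExec` operator over whatever physical plan produces the raw batches. The empty relation standing in for a real table scan and the reuse of the `prepare_test_data` fixture from the tests above are assumptions for illustration only.

// Hedged sketch, assuming the `prepare_test_data` fixture above is in scope.
use std::sync::Arc;
use std::time::Duration;

use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;

async fn run_normalize_sketch() -> datafusion::error::Result<()> {
    // Logical node: bias timestamps by a 1s offset over the "timestamp" time index column.
    // An empty relation stands in for a real table scan here.
    let logical_input = LogicalPlanBuilder::empty(false).build()?;
    let normalize = SeriesNormalize::new(Duration::from_secs(1), "timestamp", logical_input);

    // Physical node: wraps the ExecutionPlan that yields the raw sample batches.
    let exec_input = Arc::new(prepare_test_data());
    let normalize_exec: Arc<dyn ExecutionPlan> = normalize.to_execution_plan(exec_input);

    // Collect the normalized (offset-biased, timestamp-sorted) batches.
    let ctx = SessionContext::default();
    let batches = datafusion::physical_plan::collect(normalize_exec, ctx.task_ctx()).await?;
    assert!(!batches.is_empty());
    Ok(())
}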
1 change: 1 addition & 0 deletions src/promql/src/lib.rs
@@ -14,3 +14,4 @@

pub mod engine;
pub mod error;
pub mod extension_plan;
