diff --git a/Cargo.lock b/Cargo.lock index 2730af59070d..048910db514d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7032,6 +7032,7 @@ dependencies = [ "common-catalog", "common-error", "common-macro", + "common-recordbatch", "common-telemetry", "datafusion", "datatypes", diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index 990197a34c4b..a1fe7f4510b7 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -12,6 +12,7 @@ catalog = { workspace = true } common-catalog = { workspace = true } common-error = { workspace = true } common-macro = { workspace = true } +common-recordbatch = { workspace = true } common-telemetry = { workspace = true } datafusion.workspace = true datatypes = { workspace = true } diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs index 1fb6c6b1ee3e..2bc1abaf3648 100644 --- a/src/promql/src/extension_plan.rs +++ b/src/promql/src/extension_plan.rs @@ -13,6 +13,7 @@ // limitations under the License. mod empty_metric; +mod histogram_fold; mod instant_manipulate; mod normalize; mod planner; diff --git a/src/promql/src/extension_plan/histogram_fold.rs b/src/promql/src/extension_plan/histogram_fold.rs new file mode 100644 index 000000000000..9d6d5340f33f --- /dev/null +++ b/src/promql/src/extension_plan/histogram_fold.rs @@ -0,0 +1,798 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use std::task::Poll; +use std::time::Instant; + +use common_recordbatch::RecordBatch as GtRecordBatch; +use common_telemetry::warn; +use datafusion::arrow::array::AsArray; +use datafusion::arrow::compute::{self, concat_batches, SortOptions}; +use datafusion::arrow::datatypes::{DataType, Field, Float64Type, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::{DFField, DFSchema, DFSchemaRef}; +use datafusion::error::{DataFusionError, Result as DataFusionResult}; +use datafusion::execution::TaskContext; +use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; +use datafusion::physical_plan::expressions::Column as PhyColumn; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr, + RecordBatchStream, SendableRecordBatchStream, Statistics, +}; +use datafusion::prelude::{Column, Expr}; +use datatypes::prelude::{ConcreteDataType, DataType as GtDataType}; +use datatypes::schema::Schema as GtSchema; +use datatypes::value::{ListValue, Value}; +use datatypes::vectors::MutableVector; +use futures::{ready, Stream, StreamExt}; + +/// `HistogramFold` will fold the conventional (non-native) histogram ([1]) for later +/// computing. Specifically, it will transform the `le` and `field` column into a complex +/// type, and samples on other tag columns: +/// - `le` will become a [ListArray] of [f64]. With each bucket bound parsed +/// - `field` will become a [ListArray] of [f64] +/// - other columns will be sampled every `bucket_num` element, but their types won't change. +/// +/// Due to the folding or sampling, the output rows number will become `input_rows` / `bucket_num`. +/// +/// # Requirement +/// - Input should be sorted on `, le ASC, ts`. +/// - The value set of `le` should be same. I.e., buckets of every series should be same. +/// +/// [1]: https://prometheus.io/docs/concepts/metric_types/#histogram +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct HistogramFold { + /// Name of the `le` column. It's a special column in prometheus + /// for implementing conventional histogram. It's a string column + /// with "literal" float value, like "+Inf", "0.001" etc. + le_column: String, + ts_column: String, + input: LogicalPlan, + field_column: String, + output_schema: DFSchemaRef, +} + +impl UserDefinedLogicalNodeCore for HistogramFold { + fn name(&self) -> &str { + Self::name() + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.input] + } + + fn schema(&self) -> &DFSchemaRef { + &self.output_schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "HistogramFold: le={}, field={}", + self.le_column, self.field_column + ) + } + + fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { + Self { + le_column: self.le_column.clone(), + ts_column: self.ts_column.clone(), + input: inputs[0].clone(), + field_column: self.field_column.clone(), + // This method cannot return error. Otherwise we should re-calculate + // the output schema + output_schema: self.output_schema.clone(), + } + } +} + +impl HistogramFold { + #[allow(dead_code)] + pub fn new( + le_column: String, + field_column: String, + ts_column: String, + input: LogicalPlan, + ) -> DataFusionResult { + let input_schema = input.schema(); + Self::check_schema(input_schema, &le_column, &field_column, &ts_column)?; + let output_schema = Self::convert_schema(input_schema, &le_column, &field_column)?; + Ok(Self { + le_column, + ts_column, + input, + field_column, + output_schema, + }) + } + + pub const fn name() -> &'static str { + "HistogramFold" + } + + fn check_schema( + input_schema: &DFSchemaRef, + le_column: &str, + field_column: &str, + ts_column: &str, + ) -> DataFusionResult<()> { + let check_column = |col| { + if !input_schema.has_column_with_unqualified_name(col) { + return Err(DataFusionError::SchemaError( + datafusion::common::SchemaError::FieldNotFound { + field: Box::new(Column::new(None::, col)), + valid_fields: input_schema + .fields() + .iter() + .map(|f| f.qualified_column()) + .collect(), + }, + )); + } else { + Ok(()) + } + }; + + check_column(le_column)?; + check_column(ts_column)?; + check_column(field_column) + } + + #[allow(dead_code)] + pub fn to_execution_plan(&self, exec_input: Arc) -> Arc { + let input_schema = self.input.schema(); + // safety: those fields are checked in `check_schema()` + let le_column_index = input_schema + .index_of_column_by_name(None, &self.le_column) + .unwrap() + .unwrap(); + let field_column_index = input_schema + .index_of_column_by_name(None, &self.field_column) + .unwrap() + .unwrap(); + let ts_column_index = input_schema + .index_of_column_by_name(None, &self.ts_column) + .unwrap() + .unwrap(); + + Arc::new(HistogramFoldExec { + le_column_index, + field_column_index, + ts_column_index, + input: exec_input, + output_schema: Arc::new(self.output_schema.as_ref().into()), + metric: ExecutionPlanMetricsSet::new(), + }) + } + + /// Transform the schema + /// + /// - `le` will become a [ListArray] of [f64]. With each bucket bound parsed + /// - `field` will become a [ListArray] of [f64] + fn convert_schema( + input_schema: &DFSchemaRef, + le_column: &str, + field_column: &str, + ) -> DataFusionResult { + let mut fields = input_schema.fields().clone(); + // safety: those fields are checked in `check_schema()` + let le_column_idx = input_schema + .index_of_column_by_name(None, le_column)? + .unwrap(); + let field_column_idx = input_schema + .index_of_column_by_name(None, field_column)? + .unwrap(); + + // transform `le` + let le_field: Field = fields[le_column_idx].field().as_ref().clone(); + let le_field = le_field.with_data_type(DataType::Float64); + let folded_le_datatype = DataType::List(Arc::new(le_field)); + let folded_le = DFField::new( + fields[le_column_idx].qualifier().cloned(), + fields[le_column_idx].name(), + folded_le_datatype, + false, + ); + + // transform `field` + // to avoid ambiguity, that field will be referenced as `the_field` below. + let the_field: Field = fields[field_column_idx].field().as_ref().clone(); + let folded_field_datatype = DataType::List(Arc::new(the_field)); + let folded_field = DFField::new( + fields[field_column_idx].qualifier().cloned(), + fields[field_column_idx].name(), + folded_field_datatype, + false, + ); + + fields[le_column_idx] = folded_le; + fields[field_column_idx] = folded_field; + + Ok(Arc::new(DFSchema::new_with_metadata( + fields, + HashMap::new(), + )?)) + } +} + +#[derive(Debug)] +pub struct HistogramFoldExec { + /// Index for `le` column in the schema of input. + le_column_index: usize, + input: Arc, + output_schema: SchemaRef, + /// Index for field column in the schema of input. + field_column_index: usize, + ts_column_index: usize, + metric: ExecutionPlanMetricsSet, +} + +impl ExecutionPlan for HistogramFoldExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + self.input.output_partitioning() + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + + fn required_input_ordering(&self) -> Vec>> { + let mut cols = self + .tag_col_exprs() + .into_iter() + .map(|expr| PhysicalSortRequirement { + expr, + options: None, + }) + .collect::>(); + // add le ASC + cols.push(PhysicalSortRequirement { + expr: Arc::new(PhyColumn::new( + self.output_schema.field(self.le_column_index).name(), + self.le_column_index, + )), + options: Some(SortOptions { + descending: false, // +INF in the last + nulls_first: false, // not nullable + }), + }); + // add ts + cols.push(PhysicalSortRequirement { + expr: Arc::new(PhyColumn::new( + self.output_schema.field(self.ts_column_index).name(), + self.ts_column_index, + )), + options: None, + }); + + vec![Some(cols)] + } + + fn required_input_distribution(&self) -> Vec { + // partition on all tag columns, i.e., non-le, non-ts and non-field columns + vec![Distribution::HashPartitioned(self.tag_col_exprs())] + } + + fn maintains_input_order(&self) -> Vec { + vec![true; self.children().len()] + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + // cannot change schema with this method + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DataFusionResult> { + assert!(!children.is_empty()); + Ok(Arc::new(Self { + input: children[0].clone(), + metric: self.metric.clone(), + le_column_index: self.le_column_index, + ts_column_index: self.ts_column_index, + output_schema: self.output_schema.clone(), + field_column_index: self.field_column_index, + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DataFusionResult { + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + + let batch_size = context.session_config().batch_size(); + let input = self.input.execute(partition, context)?; + let output_schema = self.output_schema.clone(); + + let mut normal_indices = (0..output_schema.fields().len()).collect::>(); + normal_indices.remove(&self.le_column_index); + normal_indices.remove(&self.field_column_index); + Ok(Box::pin(HistogramFoldStream { + le_column_index: self.le_column_index, + field_column_index: self.field_column_index, + normal_indices: normal_indices.into_iter().collect(), + bucket_size: None, + input_buffer: vec![], + input, + output_schema, + metric: baseline_metric, + batch_size, + input_buffered_rows: 0, + output_buffer: HistogramFoldStream::empty_output_buffer(&self.output_schema)?, + output_buffered_rows: 0, + })) + } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } + + fn statistics(&self) -> Statistics { + Statistics { + num_rows: None, + total_byte_size: None, + column_statistics: None, + is_exact: false, + } + } +} + +impl HistogramFoldExec { + /// Return all the [PhysicalExpr] of tag columns in order. + /// + /// Tag columns are all columns except `le`, `field` and `ts` columns. + pub fn tag_col_exprs(&self) -> Vec> { + self.input + .schema() + .fields() + .iter() + .enumerate() + .filter_map(|(idx, field)| { + if idx == self.le_column_index + || idx == self.field_column_index + || idx == self.ts_column_index + { + None + } else { + Some(Arc::new(PhyColumn::new(field.name(), idx)) as _) + } + }) + .collect() + } +} + +impl DisplayAs for HistogramFoldExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "HistogramFoldExec: le=@{}, field=@{}", + self.le_column_index, self.field_column_index + ) + } + } + } +} + +pub struct HistogramFoldStream { + // internal states + le_column_index: usize, + field_column_index: usize, + /// Columns need not folding + normal_indices: Vec, + bucket_size: Option, + /// Expected output batch size + batch_size: usize, + output_schema: SchemaRef, + + // buffers + input_buffer: Vec, + input_buffered_rows: usize, + output_buffer: Vec>, + output_buffered_rows: usize, + + // runtime things + input: SendableRecordBatchStream, + metric: BaselineMetrics, +} + +impl RecordBatchStream for HistogramFoldStream { + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } +} + +impl Stream for HistogramFoldStream { + type Item = DataFusionResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let poll = loop { + match ready!(self.input.poll_next_unpin(cx)) { + Some(batch) => { + let batch = batch?; + let timer = Instant::now(); + let Some(result) = self.fold_input(batch)? else { + self.metric.elapsed_compute().add_elapsed(timer); + continue; + }; + self.metric.elapsed_compute().add_elapsed(timer); + break Poll::Ready(Some(result)); + } + None => break Poll::Ready(self.take_output_buf()?.map(Ok)), + } + }; + self.metric.record_poll(poll) + } +} + +impl HistogramFoldStream { + /// The inner most `Result` is for `poll_next()` + pub fn fold_input( + &mut self, + input: RecordBatch, + ) -> DataFusionResult>> { + let Some(bucket_num) = self.calculate_bucket_num(&input)? else { + return Ok(None); + }; + + if self.input_buffered_rows + input.num_rows() < bucket_num { + // not enough rows to fold + self.push_input_buf(input); + return Ok(None); + } + + self.fold_buf(bucket_num, input)?; + if self.output_buffered_rows >= self.batch_size { + return Ok(self.take_output_buf()?.map(Ok)); + } + + Ok(None) + } + + pub fn empty_output_buffer( + schema: &SchemaRef, + ) -> DataFusionResult>> { + let mut builders = Vec::with_capacity(schema.fields().len()); + for field in schema.fields() { + let concrete_datatype = ConcreteDataType::try_from(field.data_type()).unwrap(); + let mutable_vector = concrete_datatype.create_mutable_vector(0); + builders.push(mutable_vector); + } + + Ok(builders) + } + + fn calculate_bucket_num(&mut self, batch: &RecordBatch) -> DataFusionResult> { + if let Some(size) = self.bucket_size { + return Ok(Some(size)); + } + + let inf_pos = self.find_positive_inf(batch)?; + if inf_pos == batch.num_rows() { + // no positive inf found, append to buffer and wait for next batch + self.push_input_buf(batch.clone()); + return Ok(None); + } + + // else we found the positive inf. + // calculate the bucket size + let bucket_size = inf_pos + self.input_buffered_rows + 1; + Ok(Some(bucket_size)) + } + + /// Fold record batches from input buffer and put to output buffer + fn fold_buf(&mut self, bucket_num: usize, input: RecordBatch) -> DataFusionResult<()> { + self.push_input_buf(input); + // TODO(ruihang): this concat is avoidable. + let batch = concat_batches(&self.input.schema(), self.input_buffer.drain(..).as_ref())?; + let mut remaining_rows = self.input_buffered_rows; + let mut cursor = 0; + + let gt_schema = GtSchema::try_from(self.input.schema()).unwrap(); + let batch = GtRecordBatch::try_from_df_record_batch(Arc::new(gt_schema), batch).unwrap(); + + while remaining_rows >= bucket_num { + // "sample" normal columns + for normal_index in &self.normal_indices { + let val = batch.column(*normal_index).get(cursor); + self.output_buffer[*normal_index].push_value_ref(val.as_value_ref()); + } + // "fold" `le` and field columns + let le_array = batch.column(self.le_column_index); + let field_array = batch.column(self.field_column_index); + let mut le_item = vec![]; + let mut field_item = vec![]; + for bias in 0..bucket_num { + let le_str_val = le_array.get(cursor + bias); + let le_str_val_ref = le_str_val.as_value_ref(); + let le_str = le_str_val_ref + .as_string() + .unwrap() + .expect("le column should not be nullable"); + let le = le_str.parse::().unwrap(); + let le_val = Value::from(le); + le_item.push(le_val); + + let field = field_array.get(cursor + bias); + field_item.push(field); + } + let le_list_val = Value::List(ListValue::new( + Some(Box::new(le_item)), + ConcreteDataType::float64_datatype(), + )); + let field_list_val = Value::List(ListValue::new( + Some(Box::new(field_item)), + ConcreteDataType::float64_datatype(), + )); + self.output_buffer[self.le_column_index].push_value_ref(le_list_val.as_value_ref()); + self.output_buffer[self.field_column_index] + .push_value_ref(field_list_val.as_value_ref()); + + cursor += bucket_num; + remaining_rows -= bucket_num; + self.output_buffered_rows += 1; + } + + let remaining_input_batch = batch.into_df_record_batch().slice(cursor, remaining_rows); + self.input_buffered_rows = remaining_input_batch.num_rows(); + self.input_buffer.push(remaining_input_batch); + + Ok(()) + } + + fn push_input_buf(&mut self, batch: RecordBatch) { + self.input_buffered_rows += batch.num_rows(); + self.input_buffer.push(batch); + } + + fn take_output_buf(&mut self) -> DataFusionResult> { + if self.output_buffered_rows == 0 { + if self.input_buffered_rows != 0 { + warn!( + "input buffer is not empty, {} rows remaining", + self.input_buffered_rows + ); + } + return Ok(None); + } + + let mut output_buf = Self::empty_output_buffer(&self.output_schema)?; + std::mem::swap(&mut self.output_buffer, &mut output_buf); + let mut columns = Vec::with_capacity(output_buf.len()); + for builder in output_buf.iter_mut() { + columns.push(builder.to_vector().to_arrow_array()); + } + + // overwrite default list datatype to change field name + columns[self.le_column_index] = compute::cast( + &columns[self.le_column_index], + self.output_schema.field(self.le_column_index).data_type(), + )?; + columns[self.field_column_index] = compute::cast( + &columns[self.field_column_index], + self.output_schema + .field(self.field_column_index) + .data_type(), + )?; + + self.output_buffered_rows = 0; + RecordBatch::try_new(self.output_schema.clone(), columns) + .map(Some) + .map_err(DataFusionError::ArrowError) + } + + /// Find the first `+Inf` which indicates the end of the bucket group + /// + /// If the return value equals to batch's num_rows means the it's not found + /// in this batch + fn find_positive_inf(&self, batch: &RecordBatch) -> DataFusionResult { + // fuse this function. It should not be called when the + // bucket size is already know. + if let Some(bucket_size) = self.bucket_size { + return Ok(bucket_size); + } + let string_le_array = batch.column(self.le_column_index); + let float_le_array = compute::cast(&string_le_array, &DataType::Float64).map_err(|e| { + DataFusionError::Execution(format!( + "cannot cast {} array to float64 array: {:?}", + string_le_array.data_type(), + e + )) + })?; + let le_as_f64_array = float_le_array + .as_primitive_opt::() + .ok_or_else(|| { + DataFusionError::Execution(format!( + "expect a float64 array, but found {}", + float_le_array.data_type() + )) + })?; + for (i, v) in le_as_f64_array.iter().enumerate() { + if let Some(v) = v && v == f64::INFINITY { + return Ok(i); + } + } + + Ok(batch.num_rows()) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use datafusion::arrow::array::Float64Array; + use datafusion::arrow::datatypes::Schema; + use datafusion::common::ToDFSchema; + use datafusion::physical_plan::memory::MemoryExec; + use datafusion::prelude::SessionContext; + use datatypes::arrow_array::StringArray; + + use super::*; + + fn prepare_test_data() -> MemoryExec { + let schema = Arc::new(Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new("le", DataType::Utf8, true), + Field::new("val", DataType::Float64, true), + ])); + + // 12 items + let host_column_1 = Arc::new(StringArray::from(vec![ + "host_1", "host_1", "host_1", "host_1", "host_1", "host_1", "host_1", "host_1", + "host_1", "host_1", "host_1", "host_1", + ])) as _; + let le_column_1 = Arc::new(StringArray::from(vec![ + "0.001", "0.1", "10", "1000", "+Inf", "0.001", "0.1", "10", "1000", "+inf", "0.001", + "0.1", + ])) as _; + let val_column_1 = Arc::new(Float64Array::from(vec![ + 0_0.0, 1.0, 1.0, 5.0, 5.0, 0_0.0, 20.0, 60.0, 70.0, 100.0, 0_1.0, 1.0, + ])) as _; + + // 2 items + let host_column_2 = Arc::new(StringArray::from(vec!["host_1", "host_1"])) as _; + let le_column_2 = Arc::new(StringArray::from(vec!["10", "1000"])) as _; + let val_column_2 = Arc::new(Float64Array::from(vec![1.0, 1.0])) as _; + + // 11 items + let host_column_3 = Arc::new(StringArray::from(vec![ + "host_1", "host_2", "host_2", "host_2", "host_2", "host_2", "host_2", "host_2", + "host_2", "host_2", "host_2", + ])) as _; + let le_column_3 = Arc::new(StringArray::from(vec![ + "+INF", "0.001", "0.1", "10", "1000", "+iNf", "0.001", "0.1", "10", "1000", "+Inf", + ])) as _; + let val_column_3 = Arc::new(Float64Array::from(vec![ + 1.0, 0_0.0, 0.0, 0.0, 0.0, 0.0, 0_0.0, 1.0, 2.0, 3.0, 4.0, + ])) as _; + + let data_1 = RecordBatch::try_new( + schema.clone(), + vec![host_column_1, le_column_1, val_column_1], + ) + .unwrap(); + let data_2 = RecordBatch::try_new( + schema.clone(), + vec![host_column_2, le_column_2, val_column_2], + ) + .unwrap(); + let data_3 = RecordBatch::try_new( + schema.clone(), + vec![host_column_3, le_column_3, val_column_3], + ) + .unwrap(); + + MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap() + } + + #[tokio::test] + async fn fold_overall() { + let memory_exec = Arc::new(prepare_test_data()); + let output_schema = Arc::new( + (*HistogramFold::convert_schema( + &Arc::new(memory_exec.schema().to_dfschema().unwrap()), + "le", + "val", + ) + .unwrap() + .as_ref()) + .clone() + .into(), + ); + let fold_exec = Arc::new(HistogramFoldExec { + le_column_index: 1, + field_column_index: 2, + ts_column_index: 9999, // not exist but doesn't matter + input: memory_exec, + output_schema, + metric: ExecutionPlanMetricsSet::new(), + }); + + let session_context = SessionContext::default(); + let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx()) + .await + .unwrap(); + let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result) + .unwrap() + .to_string(); + + let expected = String::from( + "+--------+---------------------------------+--------------------------------+ +| host | le | val | ++--------+---------------------------------+--------------------------------+ +| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 1.0, 1.0, 5.0, 5.0] | +| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 20.0, 60.0, 70.0, 100.0] | +| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [1.0, 1.0, 1.0, 1.0, 1.0] | +| host_2 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 0.0, 0.0, 0.0, 0.0] | +| host_2 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 1.0, 2.0, 3.0, 4.0] | ++--------+---------------------------------+--------------------------------+", + ); + assert_eq!(result_literal, expected); + } + + #[test] + fn confirm_schema() { + let input_schema = Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new("le", DataType::Utf8, true), + Field::new("val", DataType::Float64, true), + ]) + .to_dfschema_ref() + .unwrap(); + let expected_output_schema = Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new( + "le", + DataType::List(Arc::new(Field::new("le", DataType::Float64, true))), + false, + ), + Field::new( + "val", + DataType::List(Arc::new(Field::new("val", DataType::Float64, true))), + false, + ), + ]) + .to_dfschema_ref() + .unwrap(); + + let actual = HistogramFold::convert_schema(&input_schema, "le", "val").unwrap(); + assert_eq!(actual, expected_output_schema) + } +} diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index c7140659a878..c8d45033dc1b 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -61,6 +61,8 @@ use crate::functions::{ /// `time()` function in PromQL. const SPECIAL_TIME_FUNCTION: &str = "time"; +/// `histogram_quantile` function in PromQL +const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile"; const DEFAULT_TIME_INDEX_COLUMN: &str = "time"; @@ -440,6 +442,10 @@ impl PromPlanner { })); } + if func.name == SPECIAL_HISTOGRAM_QUANTILE { + todo!() + } + let args = self.create_function_args(&args.args)?; let input = self .prom_expr_to_plan(args.input.with_context(|| ExpectExprSnafu {