From 7ecf28bb7d3f62d0361893789a4109e9103d6ae3 Mon Sep 17 00:00:00 2001
From: Jayjeet Chakraborty
Date: Fri, 1 Sep 2023 13:25:51 -0700
Subject: [PATCH] Remove the test

---
 .../sorts/sort_preserving_merge.rs | 233 ++++--------------
 1 file changed, 50 insertions(+), 183 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index c5b85b68eedd2..0dad1d30dd184 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -30,10 +30,9 @@ use crate::physical_plan::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
     SendableRecordBatchStream, Statistics,
 };
-use datafusion_execution::memory_pool::MemoryConsumer;
 
 use arrow::datatypes::SchemaRef;
-use datafusion_common::{internal_err, DataFusionError, Result};
+use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{
     EquivalenceProperties, OrderingEquivalenceProperties, PhysicalSortRequirement,
@@ -147,13 +146,6 @@ impl ExecutionPlan for SortPreservingMergeExec {
         Partitioning::UnknownPartitioning(1)
     }
 
-    /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but its input(s) are
-    /// infinite, returns an error to indicate this.
-    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
-        Ok(children[0])
-    }
-
     fn required_input_distribution(&self) -> Vec<Distribution> {
         vec![Distribution::UnspecifiedDistribution]
     }
@@ -202,9 +194,9 @@ impl ExecutionPlan for SortPreservingMergeExec {
             partition
         );
         if 0 != partition {
-            return internal_err!(
+            return Err(DataFusionError::Internal(format!(
                 "SortPreservingMergeExec invalid partition {partition}"
-            );
+            )));
         }
 
         let input_partitions = self.input.output_partitioning().partition_count();
@@ -214,14 +206,11 @@ impl ExecutionPlan for SortPreservingMergeExec {
         );
 
         let schema = self.schema();
-        let reservation =
-            MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]"))
-                .register(&context.runtime_env().memory_pool);
-
         match input_partitions {
-            0 => internal_err!(
+            0 => Err(DataFusionError::Internal(
                 "SortPreservingMergeExec requires at least one input partition"
-            ),
+                    .to_owned(),
+            )),
             1 => {
                 // bypass if there is only one partition to merge (no metrics in this case either)
                 let result = self.input.execute(0, context);
@@ -245,7 +234,6 @@ impl ExecutionPlan for SortPreservingMergeExec {
                     BaselineMetrics::new(&self.metrics, partition),
                     context.session_config().batch_size(),
                     self.fetch,
-                    reservation,
                 )?;
 
                 debug!("Got stream result from SortPreservingMergeStream::new_from_receivers");
@@ -267,19 +255,13 @@ impl ExecutionPlan for SortPreservingMergeExec {
 
 #[cfg(test)]
 mod tests {
     use std::iter::FromIterator;
-    use arrow_array::UInt32Array;
-    use rand::Rng;
-    use uuid::Uuid;
     use arrow::array::ArrayRef;
     use arrow::compute::SortOptions;
-    use arrow::datatypes::{DataType, Field, Schema, Int32Type};
+    use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
-    use datafusion_execution::config::SessionConfig;
-    use tempfile::TempDir;
-    use futures::{FutureExt, StreamExt, stream::BoxStream};
+    use futures::{FutureExt, StreamExt};
 
-    use crate::execution::context::SessionContext;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::memory::MemoryExec;
@@ -287,124 +269,18 @@ mod tests {
     use crate::physical_plan::sorts::sort::SortExec;
     use crate::physical_plan::stream::RecordBatchReceiverStream;
     use crate::physical_plan::{collect, common};
+    use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};
-
-    use crate::physical_plan::streaming::PartitionStream;
-    use crate::physical_plan::stream::RecordBatchStreamAdapter;
-    use crate::datasource::{streaming::StreamingTable, TableProvider};
+    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
 
     use super::*;
 
-    fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {
-        let col_b_init_clone = col_b_init.clone();
-        futures::stream::unfold((0, col_b_init_clone), move |(mut counter, mut col_b_ascii)| async move {
-            if counter >= 12000 {
-                return None;
-            }
-
-            if counter % 5 == 0 {
-                col_b_ascii = col_b_ascii + 2;
-            }
-
-            counter = counter + 1;
-
-            // building col `a`
-            let mut values_vector: Vec<String> = Vec::new();
-            for _i in 1..=8192 {
-                values_vector.push(String::from(Uuid::new_v4().to_string()));
-            }
-            let values = StringArray::from(values_vector);
-
-            let mut keys_vector: Vec<i32> = Vec::new();
-            for _i in 1..=8192 {
-                keys_vector.push(rand::thread_rng().gen_range(0..8192));
-            }
-            let keys = Int32Array::from(keys_vector);
-            let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
-
-            // building col `b`
-            let mut values: Vec<u32> = Vec::new();
-            for _i in 1..=8192 {
-                values.push(col_b_ascii);
-            }
-            let col_b: ArrayRef = Arc::new(UInt32Array::from(values));
-
-            // build a record batch out of col `a` and col `b`
-            let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
-            Some((batch, (counter, col_b_ascii)))
-        }).boxed()
-    }
-
-    struct InfiniteStream {
-        schema: SchemaRef,
-        col_b_init: u32
-    }
-
-    impl PartitionStream for InfiniteStream {
-        fn schema(&self) -> &SchemaRef {
-            &self.schema
-        }
-
-        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-            // We create an iterator from the record batches and map them into Ok values,
-            // converting the iterator into a futures::stream::Stream
-            Box::pin(RecordBatchStreamAdapter::new(
-                self.schema.clone(),
-                make_infinite_sorted_stream(&self.col_b_init).map(Ok)
-            ))
-        }
-    }
-
-    #[tokio::test]
-    #[ignore]
-    async fn test_dict_merge_infinite() {
-        let session_ctx = SessionContext::new();
-        let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();
-
-        let schema = SchemaRef::new(Schema::new(vec![
-            Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false),
-            Field::new("b", DataType::UInt32, false),
-        ]));
-
-        let stream_1 = Arc::new(InfiniteStream {
-            schema: schema.clone(), col_b_init: 1
-        });
-
-        let stream_2 = Arc::new(InfiniteStream {
-            schema: schema.clone(), col_b_init: 2
-        });
-
-        println!("SortPreservingMergeExec result: ");
-
-        let sort = vec![
-            PhysicalSortExpr {
-                expr: col("b", &schema).unwrap(),
-                options: Default::default(),
-            },
-            PhysicalSortExpr {
-                expr: col("a", &schema).unwrap(),
-                options: Default::default(),
-            },
-        ];
-
-        let provider = StreamingTable::try_new(schema, vec![stream_1, stream_2]).unwrap();
-
-        let plan = provider.scan(&session_ctx.state(), None, &[], None).await.unwrap();
-        let exec = Arc::new(SortPreservingMergeExec::new(sort, plan));
-        let mut stream = exec.execute(0, task_ctx).unwrap();
-        while let Some(batch) = stream.next().await {
-            println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.unwrap().clone()])
-                .unwrap()
-                .to_string());
-        }
-    }
-
     #[tokio::test]
     async fn test_merge_interleave() {
-        let task_ctx = Arc::new(TaskContext::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             Some("a"),
@@ -452,7 +328,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_merge_some_overlap() {
-        let task_ctx = Arc::new(TaskContext::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             Some("a"),
@@ -500,7 +377,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_merge_no_overlap() {
-        let task_ctx = Arc::new(TaskContext::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             Some("a"),
@@ -548,7 +426,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_merge_three_partitions() {
-        let task_ctx = Arc::new(TaskContext::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             Some("a"),
@@ -668,11 +547,11 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_partition_sort() -> Result<()> {
-        let task_ctx = Arc::new(TaskContext::default());
+    async fn test_partition_sort() {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let partitions = 4;
-        let tmp_dir = TempDir::new()?;
-        let csv = test::scan_partitioned_csv(partitions, tmp_dir.path()).unwrap();
+        let csv = test::scan_partitioned_csv(partitions).unwrap();
 
         let schema = csv.schema();
         let sort = vec![
@@ -711,8 +590,6 @@ mod tests {
             basic, partition,
             "basic:\n\n{basic}\n\npartition:\n\n{partition}\n\n"
         );
-
-        Ok(())
     }
 
     // Split the provided record batch into multiple batch_size record batches
@@ -742,22 +619,20 @@ mod tests {
         sort: Vec<PhysicalSortExpr>,
         sizes: &[usize],
         context: Arc<TaskContext>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
+    ) -> Arc<dyn ExecutionPlan> {
         let partitions = 4;
-        let tmp_dir = TempDir::new()?;
-        let csv = test::scan_partitioned_csv(partitions, tmp_dir.path()).unwrap();
+        let csv = test::scan_partitioned_csv(partitions).unwrap();
 
         let sorted = basic_sort(csv, sort, context).await;
         let split: Vec<_> = sizes.iter().map(|x| split_batch(&sorted, *x)).collect();
 
-        Ok(Arc::new(
-            MemoryExec::try_new(&split, sorted.schema(), None).unwrap(),
-        ))
+        Arc::new(MemoryExec::try_new(&split, sorted.schema(), None).unwrap())
     }
 
     #[tokio::test]
-    async fn test_partition_sort_streaming_input() -> Result<()> {
-        let task_ctx = Arc::new(TaskContext::default());
+    async fn test_partition_sort_streaming_input() {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let schema = test_util::aggr_test_schema();
         let sort = vec![
             // uint8
@@ -783,8 +658,7 @@ mod tests {
         ];
 
         let input =
-            sorted_partitioned_input(sort.clone(), &[10, 3, 11], task_ctx.clone())
-                .await?;
+            sorted_partitioned_input(sort.clone(), &[10, 3, 11], task_ctx.clone()).await;
 
         let basic = basic_sort(input.clone(), sort.clone(), task_ctx.clone()).await;
         let partition = sorted_merge(input, sort, task_ctx.clone()).await;
@@ -799,12 +673,10 @@ mod tests {
             .to_string();
 
         assert_eq!(basic, partition);
-
-        Ok(())
     }
 
     #[tokio::test]
-    async fn test_partition_sort_streaming_input_output() -> Result<()> {
+    async fn test_partition_sort_streaming_input_output() {
         let schema = test_util::aggr_test_schema();
 
         let sort = vec![
@@ -820,19 +692,17 @@ mod tests {
             },
         ];
 
-        // Test streaming with default batch size
-        let task_ctx = Arc::new(TaskContext::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let input =
-            sorted_partitioned_input(sort.clone(), &[10, 5, 13], task_ctx.clone())
-                .await?;
+            sorted_partitioned_input(sort.clone(), &[10, 5, 13], task_ctx.clone()).await;
         let basic = basic_sort(input.clone(), sort.clone(), task_ctx).await;
 
-        // batch size of 23
-        let task_ctx = TaskContext::default()
-            .with_session_config(SessionConfig::new().with_batch_size(23));
-        let task_ctx = Arc::new(task_ctx);
+        let session_ctx_bs_23 =
+            SessionContext::with_config(SessionConfig::new().with_batch_size(23));
         let merge = Arc::new(SortPreservingMergeExec::new(sort, input));
+        let task_ctx = session_ctx_bs_23.task_ctx();
         let merged = collect(merge, task_ctx).await.unwrap();
 
         assert_eq!(merged.len(), 14);
@@ -848,13 +718,12 @@ mod tests {
             .to_string();
 
         assert_eq!(basic, partition);
-
-        Ok(())
     }
 
     #[tokio::test]
     async fn test_nulls() {
-        let task_ctx = Arc::new(TaskContext::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             None,
@@ -934,8 +803,9 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_async() -> Result<()> {
-        let task_ctx = Arc::new(TaskContext::default());
+    async fn test_async() {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let schema = test_util::aggr_test_schema();
         let sort = vec![PhysicalSortExpr {
             expr: col("c12", &schema).unwrap(),
@@ -943,7 +813,7 @@ mod tests {
         }];
 
         let batches =
-            sorted_partitioned_input(sort.clone(), &[5, 7, 3], task_ctx.clone()).await?;
+            sorted_partitioned_input(sort.clone(), &[5, 7, 3], task_ctx.clone()).await;
 
         let partition_count = batches.output_partitioning().partition_count();
         let mut streams = Vec::with_capacity(partition_count);
@@ -966,18 +836,14 @@ mod tests {
         }
 
         let metrics = ExecutionPlanMetricsSet::new();
-        let reservation =
-            MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool);
-        let fetch = None;
         let merge_stream = streaming_merge(
             streams,
             batches.schema(),
             sort.as_slice(),
             BaselineMetrics::new(&metrics, 0),
             task_ctx.session_config().batch_size(),
-            fetch,
-            reservation,
+            None,
         )
         .unwrap();
 
@@ -998,13 +864,12 @@ mod tests {
             basic, partition,
             "basic:\n\n{basic}\n\npartition:\n\n{partition}\n\n"
        );
-
-        Ok(())
     }
 
     #[tokio::test]
     async fn test_merge_metrics() {
-        let task_ctx = Arc::new(TaskContext::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"), Some("c")]));
         let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
@@ -1022,7 +887,7 @@ mod tests {
         let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
 
         let collected = collect(merge.clone(), task_ctx).await.unwrap();
-        let expected = [
+        let expected = vec![
            "+----+---+",
            "| a  | b |",
            "+----+---+",
@@ -1060,7 +925,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel() -> Result<()> {
-        let task_ctx = Arc::new(TaskContext::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
 
@@ -1086,7 +952,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_stable_sort() {
-        let task_ctx = Arc::new(TaskContext::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
 
         // Create record batches like:
         // batch_number |value