From b7262c2d56c6254dcb07a227ac89f9181c4cf570 Mon Sep 17 00:00:00 2001
From: kamille
Date: Sun, 4 Aug 2024 15:24:12 +0800
Subject: [PATCH 01/12] reduce clone of `Statistics` by using arc.

---
 datafusion/core/src/datasource/listing/mod.rs |  3 ++-
 .../core/src/datasource/listing/table.rs      | 15 +++++++++------
 .../physical_plan/file_scan_config.rs         |  4 ++--
 datafusion/core/src/datasource/statistics.rs  | 18 +++++++++++-------
 4 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 44f92760908d..c5a742d50e53 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -78,10 +78,11 @@ pub struct PartitionedFile {
     ///
     /// DataFusion relies on these statistics for planning (in particular to sort file groups),
     /// so if they are incorrect, incorrect answers may result.
-    pub statistics: Option<Statistics>,
+    pub statistics: Option<Arc<Statistics>>,
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }
+
 impl PartitionedFile {
     /// Create a simple file without metadata or partition
     pub fn new(path: impl Into<String>, size: u64) -> Self {
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 80f49e4eb8e6..61960e185289 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -978,10 +978,12 @@ impl ListingTable {
                 let statistics =
                     self.do_collect_statistics(ctx, &store, &part_file).await?;
                 part_file.statistics = Some(statistics.clone());
-                Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
+                Ok((part_file, statistics))
             } else {
-                Ok((part_file, Statistics::new_unknown(&self.file_schema)))
-                    as Result<(PartitionedFile, Statistics)>
+                Ok((
+                    part_file,
+                    Arc::new(Statistics::new_unknown(&self.file_schema)),
+                ))
             }
         })
         .boxed()
@@ -1011,12 +1013,12 @@ impl ListingTable {
         ctx: &SessionState,
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
-    ) -> Result<Statistics> {
+    ) -> Result<Arc<Statistics>> {
         let statistics_cache = self.collected_statistics.clone();
         return match statistics_cache
             .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
         {
-            Some(statistics) => Ok(statistics.as_ref().clone()),
+            Some(statistics) => Ok(statistics.clone()),
             None => {
                 let statistics = self
                     .options
                     .format
                     .infer_stats(
                         ctx,
                         &store,
                         self.file_schema.clone(),
                         &part_file.object_meta,
                     )
                     .await?;
+                let statistics = Arc::new(statistics);
                 statistics_cache.put_with_extra(
                     &part_file.object_meta.location,
-                    statistics.clone().into(),
+                    statistics.clone(),
                     &part_file.object_meta,
                 );
                 Ok(statistics)
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 17850ea7585a..566464cdfab0 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -1193,7 +1193,7 @@ mod tests {
             },
             partition_values: vec![ScalarValue::from(file.date)],
             range: None,
-            statistics: Some(Statistics {
+            statistics: Some(Arc::new(Statistics {
                 num_rows: Precision::Absent,
                 total_byte_size: Precision::Absent,
                 column_statistics: file
@@ -1213,7 +1213,7 @@ mod tests {
                         .unwrap_or_default()
                 })
                 .collect::<Vec<_>>(),
-            }),
+            })),
             extensions: None,
         }
     }
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index 8c789e461b08..0191479551b1 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use super::listing::PartitionedFile;
 use crate::arrow::datatypes::{Schema, SchemaRef};
 use crate::error::Result;
@@ -35,7 +37,7 @@ use itertools::multiunzip;
 /// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
 /// call to `multiunzip` for constructing file level summary statistics.
 pub async fn get_statistics_with_limit(
-    all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
+    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
     file_schema: SchemaRef,
     limit: Option<usize>,
     collect_stats: bool,
@@ -62,9 +64,11 @@ pub async fn get_statistics_with_limit(
         result_files.push(file);
 
         // First file, we set them directly from the file statistics.
-        num_rows = file_stats.num_rows;
-        total_byte_size = file_stats.total_byte_size;
-        for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() {
+        num_rows = file_stats.num_rows.clone();
+        total_byte_size = file_stats.total_byte_size.clone();
+        for (index, file_column) in
+            file_stats.column_statistics.clone().into_iter().enumerate()
+        {
             null_counts[index] = file_column.null_count;
             max_values[index] = file_column.max_value;
             min_values[index] = file_column.min_value;
@@ -90,14 +94,14 @@ pub async fn get_statistics_with_limit(
             // counts across all the files in question. If any file does not
             // provide any information or provides an inexact value, we demote
             // the statistic precision to inexact.
-            num_rows = add_row_stats(file_stats.num_rows, num_rows);
+            num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows);
             total_byte_size =
-                add_row_stats(file_stats.total_byte_size, total_byte_size);
+                add_row_stats(file_stats.total_byte_size.clone(), total_byte_size);
 
             (null_counts, max_values, min_values) = multiunzip(
                 izip!(
-                    file_stats.column_statistics.into_iter(),
+                    file_stats.column_statistics.clone().into_iter(),
                     null_counts.into_iter(),
                     max_values.into_iter(),
                     min_values.into_iter()

From 9205343395b3ed52c568c5393732886b456c55f7 Mon Sep 17 00:00:00 2001
From: kamille
Date: Sun, 4 Aug 2024 20:53:36 +0800
Subject: [PATCH 02/12] optimize `get_statistics_with_limit` and `split_files`.

---
 .../core/src/datasource/listing/helpers.rs    |   5 +-
 datafusion/core/src/datasource/listing/mod.rs |  18 +++
 datafusion/core/src/datasource/statistics.rs  | 106 +++++++++---------
 3 files changed, 77 insertions(+), 52 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 29b593a70ca0..c0903710929a 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -18,6 +18,7 @@
 //! Helper functions for the table implementation
 
 use std::collections::HashMap;
+use std::mem;
 use std::sync::Arc;
 
 use super::PartitionedFile;
@@ -139,8 +140,8 @@ pub fn split_files(
     // effectively this is div with rounding up instead of truncating
     let chunk_size = (partitioned_files.len() + n - 1) / n;
     partitioned_files
-        .chunks(chunk_size)
-        .map(|c| c.to_vec())
+        .chunks_mut(chunk_size)
+        .map(|c| c.iter_mut().map(|p| mem::take(p)).collect())
         .collect()
 }
 
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index c5a742d50e53..1603b05fe490 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -160,6 +160,24 @@ impl From<ObjectMeta> for PartitionedFile {
     }
 }
 
+impl Default for PartitionedFile {
+    fn default() -> Self {
+        Self {
+            object_meta: ObjectMeta {
+                location: Path::default(),
+                last_modified: chrono::Utc.timestamp_nanos(0),
+                size: 0,
+                e_tag: None,
+                version: None,
+            },
+            partition_values: Vec::new(),
+            range: None,
+            statistics: None,
+            extensions: None,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::datasource::listing::ListingTableUrl;
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index 0191479551b1..3c9aa8dbe87c 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::mem;
 use std::sync::Arc;
 
 use super::listing::PartitionedFile;
@@ -29,7 +30,6 @@ use datafusion_common::ScalarValue;
 
 use futures::{Stream, StreamExt};
 use itertools::izip;
-use itertools::multiunzip;
 
 /// Get all files as well as the file level summary statistics (no statistic for partition columns).
@@ -99,33 +99,23 @@ pub async fn get_statistics_with_limit(
             total_byte_size =
                 add_row_stats(file_stats.total_byte_size.clone(), total_byte_size);
 
-            (null_counts, max_values, min_values) = multiunzip(
-                izip!(
-                    file_stats.column_statistics.clone().into_iter(),
-                    null_counts.into_iter(),
-                    max_values.into_iter(),
-                    min_values.into_iter()
-                )
-                .map(
-                    |(
-                        ColumnStatistics {
-                            null_count: file_nc,
-                            max_value: file_max,
-                            min_value: file_min,
-                            distinct_count: _,
-                        },
-                        null_count,
-                        max_value,
-                        min_value,
-                    )| {
-                        (
-                            add_row_stats(file_nc, null_count),
-                            set_max_if_greater(file_max, max_value),
-                            set_min_if_lesser(file_min, min_value),
-                        )
-                    },
-                ),
-            );
+            for (file_col_stats, null_count, max_value, min_value) in izip!(
+                file_stats.column_statistics.iter(),
+                null_counts.iter_mut(),
+                max_values.iter_mut(),
+                min_values.iter_mut(),
+            ) {
+                let ColumnStatistics {
+                    null_count: file_nc,
+                    max_value: file_max,
+                    min_value: file_min,
+                    distinct_count: _,
+                } = file_col_stats;
+
+                *null_count = add_row_stats(file_nc.clone(), null_count.clone());
+                set_max_if_greater(file_max, max_value);
+                set_min_if_lesser(file_min, min_value)
+            }
 
             // If the number of rows exceeds the limit, we can stop processing
             // files. This only applies when we know the number of rows. It also
@@ -242,45 +232,61 @@ fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
 
 /// If the given value is numerically greater than the original maximum value,
 /// return the new maximum value with appropriate exactness information.
 fn set_max_if_greater(
-    max_nominee: Precision<ScalarValue>,
-    max_values: Precision<ScalarValue>,
-) -> Precision<ScalarValue> {
-    match (&max_values, &max_nominee) {
-        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee,
+    max_nominee: &Precision<ScalarValue>,
+    max_value: &mut Precision<ScalarValue>,
+) {
+    match (&max_value, max_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
+            *max_value = max_nominee.clone();
+        }
         (Precision::Exact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Inexact(val2))
        | (Precision::Inexact(val1), Precision::Exact(val2))
             if val1 < val2 =>
         {
-            max_nominee.to_inexact()
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_max = mem::take(max_value);
+            *max_value = exact_max.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *max_value = max_nominee.clone().to_inexact();
         }
-        (Precision::Exact(_), Precision::Absent) => max_values.to_inexact(),
-        (Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(),
-        (Precision::Absent, Precision::Inexact(_)) => max_nominee,
-        (Precision::Absent, Precision::Absent) => Precision::Absent,
-        _ => max_values,
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *max_value = max_nominee.clone();
+        }
+        _ => {}
     }
 }
 
 /// If the given value is numerically lesser than the original minimum value,
 /// return the new minimum value with appropriate exactness information.
 fn set_min_if_lesser(
-    min_nominee: Precision<ScalarValue>,
-    min_values: Precision<ScalarValue>,
-) -> Precision<ScalarValue> {
-    match (&min_values, &min_nominee) {
-        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee,
+    min_nominee: &Precision<ScalarValue>,
+    min_value: &mut Precision<ScalarValue>,
+) {
+    match (&min_value, min_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
+            *min_value = min_nominee.clone();
+        }
         (Precision::Exact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Exact(val2))
             if val1 > val2 =>
         {
-            min_nominee.to_inexact()
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_min = mem::take(min_value);
+            *min_value = exact_min.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *min_value = min_nominee.clone();
         }
-        (Precision::Exact(_), Precision::Absent) => min_values.to_inexact(),
-        (Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(),
-        (Precision::Absent, Precision::Inexact(_)) => min_nominee,
-        (Precision::Absent, Precision::Absent) => Precision::Absent,
-        _ => min_values,
+        _ => {}
     }
 }

From d6361550d921eb606812cec3111d5f08cbe5677a Mon Sep 17 00:00:00 2001
From: kamille
Date: Sun, 4 Aug 2024 21:34:39 +0800
Subject: [PATCH 03/12] directly create the col stats set.

---
 datafusion/core/src/datasource/statistics.rs | 49 +++++++-------------
 1 file changed, 17 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index 3c9aa8dbe87c..83c66bb77bc6 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -29,7 +29,6 @@ use datafusion_common::stats::Precision;
 use datafusion_common::ScalarValue;
 
 use futures::{Stream, StreamExt};
-use itertools::izip;
 
 /// Get all files as well as the file level summary statistics (no statistic for partition columns).
 /// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
@@ -50,9 +49,10 @@ pub async fn get_statistics_with_limit(
     // - zero for summations, and
     // - neutral element for extreme points.
     let size = file_schema.fields().len();
-    let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
-    let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
-    let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
+    // let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
+    // let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
+    // let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
     let mut num_rows = Precision::<usize>::Absent;
     let mut total_byte_size = Precision::<usize>::Absent;
@@ -69,9 +69,9 @@ pub async fn get_statistics_with_limit(
         for (index, file_column) in
             file_stats.column_statistics.clone().into_iter().enumerate()
         {
-            null_counts[index] = file_column.null_count;
-            max_values[index] = file_column.max_value;
-            min_values[index] = file_column.min_value;
+            col_stats_set[index].null_count = file_column.null_count;
+            col_stats_set[index].max_value = file_column.max_value;
+            col_stats_set[index].min_value = file_column.min_value;
         }
 
         // If the number of rows exceeds the limit, we can stop processing
@@ -99,12 +99,11 @@ pub async fn get_statistics_with_limit(
             total_byte_size =
                 add_row_stats(file_stats.total_byte_size.clone(), total_byte_size);
 
-            for (file_col_stats, null_count, max_value, min_value) in izip!(
-                file_stats.column_statistics.iter(),
-                null_counts.iter_mut(),
-                max_values.iter_mut(),
-                min_values.iter_mut(),
-            ) {
+            for (file_col_stats, col_stats) in file_stats
+                .column_statistics
+                .iter()
+                .zip(col_stats_set.iter_mut())
+            {
                 let ColumnStatistics {
                     null_count: file_nc,
                     max_value: file_max,
@@ -112,9 +111,10 @@ pub async fn get_statistics_with_limit(
                     distinct_count: _,
                 } = file_col_stats;
 
-                *null_count = add_row_stats(file_nc.clone(), null_count.clone());
-                set_max_if_greater(file_max, max_value);
-                set_min_if_lesser(file_min, min_value)
+                col_stats.null_count =
+                    add_row_stats(file_nc.clone(), col_stats.null_count.clone());
+                set_max_if_greater(file_max, &mut col_stats.max_value);
+                set_min_if_lesser(file_min, &mut col_stats.min_value)
             }
 
             // If the number of rows exceeds the limit, we can stop processing
@@ -133,7 +133,7 @@ pub async fn get_statistics_with_limit(
     let mut statistics = Statistics {
         num_rows,
         total_byte_size,
-        column_statistics: get_col_stats_vec(null_counts, max_values, min_values),
+        column_statistics: col_stats_set,
     };
     if all_files.next().await.is_some() {
         // If we still have files in the stream, it means that the limit kicked
@@ -176,21 +176,6 @@ fn add_row_stats(
     }
 }
 
-pub(crate) fn get_col_stats_vec(
-    null_counts: Vec<Precision<usize>>,
-    max_values: Vec<Precision<ScalarValue>>,
-    min_values: Vec<Precision<ScalarValue>>,
-) -> Vec<ColumnStatistics> {
-    izip!(null_counts, max_values, min_values)
-        .map(|(null_count, max_value, min_value)| ColumnStatistics {
-            null_count,
-            max_value,
-            min_value,
-            distinct_count: Precision::Absent,
-        })
-        .collect()
-}
-
 pub(crate) fn get_col_stats(
     schema: &Schema,
     null_counts: Vec<Precision<usize>>,

From fb5fbd15a092be26d34c3a23ec573c2db081fb3d Mon Sep 17 00:00:00 2001
From: kamille
Date: Sun, 4 Aug 2024 21:48:37 +0800
Subject: [PATCH 04/12] fix pb.
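
Adapt the protobuf round-trip to the new `Option<Arc<Statistics>>` field:
decoding wraps the converted statistics in an `Arc`, and encoding derefs
through it. The load-bearing idiom is `Option::transpose`, which flips the
`Option<Result<_>>` produced by the fallible `map` into a `Result<Option<_>>`
so `?` can propagate the error. A minimal, self-contained sketch of that
idiom (`ProtoStats` and `Stats` are stand-in types for illustration, not the
real protobuf structs):

    use std::sync::Arc;

    /// Stand-in for a decoded protobuf statistics message.
    struct ProtoStats(i64);

    /// Stand-in for DataFusion's `Statistics`.
    struct Stats {
        num_rows: i64,
    }

    impl TryFrom<&ProtoStats> for Stats {
        type Error = String;
        fn try_from(p: &ProtoStats) -> Result<Self, Self::Error> {
            if p.0 < 0 {
                return Err("negative row count".to_string());
            }
            Ok(Stats { num_rows: p.0 })
        }
    }

    /// `map` yields `Option<Result<Arc<Stats>, _>>`; `transpose` flips it
    /// to `Result<Option<Arc<Stats>>, _>` so the caller can use `?`.
    fn decode(stats: Option<&ProtoStats>) -> Result<Option<Arc<Stats>>, String> {
        stats.map(|v| Stats::try_from(v).map(Arc::new)).transpose()
    }

    fn main() -> Result<(), String> {
        assert_eq!(decode(Some(&ProtoStats(42)))?.unwrap().num_rows, 42);
        assert!(decode(None)?.is_none());
        assert!(decode(Some(&ProtoStats(-1))).is_err());
        Ok(())
    }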
---
 datafusion/proto/src/physical_plan/from_proto.rs | 2 +-
 datafusion/proto/src/physical_plan/to_proto.rs   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index bc0a19336bae..79ce7ce62bfc 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -559,7 +559,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
-            statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?,
+            statistics: val.statistics.as_ref().map(|v| v.try_into().map(Arc::new)).transpose()?,
             extensions: None,
         })
     }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 57cd22a99ae1..ae330e7421d4 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -527,7 +527,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
-            statistics: pf.statistics.as_ref().map(|s| s.into()),
+            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
         })
     }
 }

From ba92c0088f862e4a43777e25d97de0bcd9a2ed4e Mon Sep 17 00:00:00 2001
From: kamille
Date: Sun, 4 Aug 2024 21:52:20 +0800
Subject: [PATCH 05/12] fix fmt.

---
 datafusion/proto/src/physical_plan/from_proto.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 79ce7ce62bfc..44fa8a9a717c 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -559,7 +559,11 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
-            statistics: val.statistics.as_ref().map(|v| v.try_into().map(Arc::new)).transpose()?,
+            statistics: val
+                .statistics
+                .as_ref()
+                .map(|v| v.try_into().map(Arc::new))
+                .transpose()?,
             extensions: None,
         })
     }

From 136eb5f30e4a5b845cc1236c8301c9c6c5da3e87 Mon Sep 17 00:00:00 2001
From: kamille
Date: Sun, 4 Aug 2024 22:23:18 +0800
Subject: [PATCH 06/12] fix clippy.
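
The two lints being fixed: `clippy::redundant_closure` on `|p| mem::take(p)`
in `split_files`, and `clippy::needless_return` on the `return match ...` in
`do_collect_statistics`. A small sketch of both lint shapes (hypothetical
helper functions, purely for illustration):

    use std::mem;

    /// `clippy::redundant_closure`: `|p| mem::take(p)` is just the fn item.
    fn take_all(items: &mut [String]) -> Vec<String> {
        items.iter_mut().map(mem::take).collect()
    }

    /// `clippy::needless_return`: a trailing `match` is already the
    /// function's value; `return ... ;` is noise.
    fn classify(n: i32) -> &'static str {
        match n {
            0 => "zero",
            _ => "nonzero",
        }
    }

    fn main() {
        let mut v = vec!["a".to_string(), "b".to_string()];
        assert_eq!(take_all(&mut v), ["a", "b"]);
        // `mem::take` leaves defaults behind in the source slice.
        assert!(v.iter().all(String::is_empty));
        assert_eq!(classify(0), "zero");
    }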
---
 datafusion/core/src/datasource/listing/helpers.rs | 2 +-
 datafusion/core/src/datasource/listing/table.rs   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index c0903710929a..edce56fff305 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -141,7 +141,7 @@ pub fn split_files(
     let chunk_size = (partitioned_files.len() + n - 1) / n;
     partitioned_files
         .chunks_mut(chunk_size)
-        .map(|c| c.iter_mut().map(|p| mem::take(p)).collect())
+        .map(|c| c.iter_mut().map(mem::take).collect())
         .collect()
 }
 
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 61960e185289..3863f6d5926e 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1015,7 +1015,7 @@ impl ListingTable {
         part_file: &PartitionedFile,
     ) -> Result<Arc<Statistics>> {
         let statistics_cache = self.collected_statistics.clone();
-        return match statistics_cache
+        match statistics_cache
             .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
         {
             Some(statistics) => Ok(statistics.clone()),

From cc33b8a832c14153b0671110b971952933aff4d8 Mon Sep 17 00:00:00 2001
From: kamille
Date: Sun, 4 Aug 2024 22:26:45 +0800
Subject: [PATCH 07/12] fix compile.

---
 datafusion/core/src/datasource/listing/table.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 3863f6d5926e..c971f357c7b8 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1038,7 +1038,7 @@ impl ListingTable {
                 );
                 Ok(statistics)
             }
-        };
+        }
     }
 }

From b28adeb82b7fa4bafd793b9f49be3beb41917a9e Mon Sep 17 00:00:00 2001
From: kamille
Date: Sun, 4 Aug 2024 23:04:11 +0800
Subject: [PATCH 08/12] remove stale codes.

---
 datafusion/core/src/datasource/statistics.rs | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index 83c66bb77bc6..d9cbe4f8ac29 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -50,9 +50,6 @@ pub async fn get_statistics_with_limit(
     // - neutral element for extreme points.
     let size = file_schema.fields().len();
     let mut col_stats_set = vec![ColumnStatistics::default(); size];
-    // let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
-    // let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
-    // let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
     let mut num_rows = Precision::<usize>::Absent;
     let mut total_byte_size = Precision::<usize>::Absent;

From 49ca5cb40459864302d8d68c0f4ac8d2720bca02 Mon Sep 17 00:00:00 2001
From: kamille
Date: Tue, 6 Aug 2024 15:57:12 +0800
Subject: [PATCH 09/12] optimize `split_files` by using drain.
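
`chunks_mut` + `mem::take` moved files out but left `Default` placeholders
behind in the source vector, which is the only reason `PartitionedFile` had
to implement `Default` at all; draining moves every element out exactly
once, so the `Default` impl can be dropped (next commit). A self-contained
sketch of the new chunking loop, assuming only that elements are movable
(no `Clone`, no `Default`):

    use std::mem;

    /// Split `files` into at most `n` chunks of roughly equal size,
    /// moving elements instead of cloning them.
    fn split<T>(mut files: Vec<T>, n: usize) -> Vec<Vec<T>> {
        if files.is_empty() {
            return vec![];
        }
        // effectively this is div with rounding up instead of truncating
        let chunk_size = (files.len() + n - 1) / n;
        let mut chunks = Vec::with_capacity(n);
        let mut current = Vec::with_capacity(chunk_size);
        for file in files.drain(..) {
            current.push(file);
            if current.len() == chunk_size {
                // hand off the full chunk, swap in a fresh pre-sized Vec
                chunks.push(mem::replace(
                    &mut current,
                    Vec::with_capacity(chunk_size),
                ));
            }
        }
        if !current.is_empty() {
            chunks.push(current);
        }
        chunks
    }

    fn main() {
        let files: Vec<String> = (0..5).map(|i| format!("f{i}")).collect();
        let chunks = split(files, 2);
        assert_eq!(chunks.iter().map(|c| c.len()).collect::<Vec<_>>(), [3, 2]);
    }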
---
 .../core/src/datasource/listing/helpers.rs    | 20 +++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index edce56fff305..67af8ef12c8b 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -139,10 +139,22 @@ pub fn split_files(
     // effectively this is div with rounding up instead of truncating
     let chunk_size = (partitioned_files.len() + n - 1) / n;
 
-    partitioned_files
-        .chunks_mut(chunk_size)
-        .map(|c| c.iter_mut().map(mem::take).collect())
-        .collect()
+    let mut chunks = Vec::with_capacity(n);
+    let mut current_chunk = Vec::with_capacity(chunk_size);
+    for file in partitioned_files.drain(..) {
+        current_chunk.push(file);
+        if current_chunk.len() == chunk_size {
+            let full_chunk =
+                mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
+            chunks.push(full_chunk);
+        }
+    }
+
+    if !current_chunk.is_empty() {
+        chunks.push(current_chunk)
+    }
+
+    chunks
 }
 
 struct Partition {

From 2b56774486d6c98e732c8ddc4abf0930f499e450 Mon Sep 17 00:00:00 2001
From: kamille
Date: Tue, 6 Aug 2024 16:01:15 +0800
Subject: [PATCH 10/12] remove default for PartitionedFile.

---
 datafusion/core/src/datasource/listing/mod.rs | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 1603b05fe490..c5a742d50e53 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -160,24 +160,6 @@ impl From<ObjectMeta> for PartitionedFile {
     }
 }
 
-impl Default for PartitionedFile {
-    fn default() -> Self {
-        Self {
-            object_meta: ObjectMeta {
-                location: Path::default(),
-                last_modified: chrono::Utc.timestamp_nanos(0),
-                size: 0,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: Vec::new(),
-            range: None,
-            statistics: None,
-            extensions: None,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use crate::datasource::listing::ListingTableUrl;

From 56cc8eae391828eabfbe9f447685aa6f65cf5fce Mon Sep 17 00:00:00 2001
From: kamille
Date: Wed, 7 Aug 2024 00:06:13 +0800
Subject: [PATCH 11/12] don't keep `Arc` in `PartitionedFile`.

---
 datafusion/core/src/datasource/listing/mod.rs   | 2 +-
 datafusion/core/src/datasource/listing/table.rs | 8 +++-----
 .../core/src/datasource/physical_plan/file_scan_config.rs | 4 ++--
 datafusion/core/src/datasource/statistics.rs    | 7 +++++--
 4 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index c5a742d50e53..21a60614cff2 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -78,7 +78,7 @@ pub struct PartitionedFile {
     ///
     /// DataFusion relies on these statistics for planning (in particular to sort file groups),
     /// so if they are incorrect, incorrect answers may result.
-    pub statistics: Option<Arc<Statistics>>,
+    pub statistics: Option<Statistics>,
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index c971f357c7b8..1b1fc9bc470e 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -973,11 +973,10 @@ impl ListingTable {
         // collect the statistics if required by the config
         let files = file_list
             .map(|part_file| async {
-                let mut part_file = part_file?;
+                let part_file = part_file?;
                 if self.options.collect_stat {
                     let statistics =
                         self.do_collect_statistics(ctx, &store, &part_file).await?;
-                    part_file.statistics = Some(statistics.clone());
                     Ok((part_file, statistics))
                 } else {
                     Ok((
@@ -1014,8 +1013,7 @@ impl ListingTable {
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
     ) -> Result<Arc<Statistics>> {
-        let statistics_cache = self.collected_statistics.clone();
-        match statistics_cache
+        match self.collected_statistics
             .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
         {
             Some(statistics) => Ok(statistics.clone()),
@@ -1031,7 +1029,7 @@ impl ListingTable {
                     )
                     .await?;
                 let statistics = Arc::new(statistics);
-                statistics_cache.put_with_extra(
+                self.collected_statistics.put_with_extra(
                     &part_file.object_meta.location,
                     statistics.clone(),
                     &part_file.object_meta,
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 566464cdfab0..17850ea7585a 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -1193,7 +1193,7 @@ mod tests {
             },
             partition_values: vec![ScalarValue::from(file.date)],
             range: None,
-            statistics: Some(Arc::new(Statistics {
+            statistics: Some(Statistics {
                 num_rows: Precision::Absent,
                 total_byte_size: Precision::Absent,
                 column_statistics: file
@@ -1213,7 +1213,7 @@ mod tests {
                         .unwrap_or_default()
                 })
                 .collect::<Vec<_>>(),
-            })),
+            }),
             extensions: None,
         }
     }
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index d9cbe4f8ac29..40872f0f8b4a 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -57,9 +57,11 @@ pub async fn get_statistics_with_limit(
     let mut all_files = Box::pin(all_files.fuse());
 
     if let Some(first_file) = all_files.next().await {
-        let (file, file_stats) = first_file?;
+        let (mut file, file_stats) = first_file?;
+        file.statistics = Some(file_stats.as_ref().clone());
         result_files.push(file);
+
         // First file, we set them directly from the file statistics.
         num_rows = file_stats.num_rows.clone();
         total_byte_size = file_stats.total_byte_size.clone();
@@ -83,7 +85,8 @@ pub async fn get_statistics_with_limit(
         };
         if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
             while let Some(current) = all_files.next().await {
-                let (file, file_stats) = current?;
+                let (mut file, file_stats) = current?;
+                file.statistics = Some(file_stats.as_ref().clone());
                 result_files.push(file);
                 if !collect_stats {
                     continue;

From 302590dfc9f88d865b96c95c9f504aaf5652139e Mon Sep 17 00:00:00 2001
From: kamille
Date: Wed, 7 Aug 2024 00:09:50 +0800
Subject: [PATCH 12/12] fix pb.
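
With `PartitionedFile` back to a plain `Option<Statistics>`, the proto
conversion reverts to its original form and the `Arc` stays confined to the
statistics-cache path: `do_collect_statistics` returns `Arc<Statistics>`,
the cache and its callers share that allocation, and
`get_statistics_with_limit` clones the payload once per file. A stand-in
sketch of the sharing pattern (simplified `Statistics`, plain `HashMap` in
place of DataFusion's cache type):

    use std::collections::HashMap;
    use std::sync::Arc;

    /// Simplified stand-in for DataFusion's `Statistics`.
    #[derive(Clone)]
    struct Statistics {
        num_rows: usize,
    }

    struct StatsCache {
        inner: HashMap<String, Arc<Statistics>>,
    }

    impl StatsCache {
        /// A hit costs a refcount bump, not a deep clone of the
        /// per-column statistics.
        fn get_or_compute(
            &mut self,
            path: &str,
            compute: impl FnOnce() -> Statistics,
        ) -> Arc<Statistics> {
            if let Some(stats) = self.inner.get(path) {
                return Arc::clone(stats);
            }
            let stats = Arc::new(compute());
            self.inner.insert(path.to_string(), Arc::clone(&stats));
            stats
        }
    }

    fn main() {
        let mut cache = StatsCache { inner: HashMap::new() };
        let a = cache.get_or_compute("f1.parquet", || Statistics { num_rows: 10 });
        let b = cache.get_or_compute("f1.parquet", || unreachable!("cache hit"));
        assert!(Arc::ptr_eq(&a, &b));
        // A `PartitionedFile` stores a plain copy: one clone per file.
        let owned: Statistics = a.as_ref().clone();
        assert_eq!(owned.num_rows, 10);
    }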
---
 datafusion/core/src/datasource/listing/table.rs  | 3 ++-
 datafusion/core/src/datasource/statistics.rs     | 1 -
 datafusion/proto/src/physical_plan/from_proto.rs | 6 +-----
 datafusion/proto/src/physical_plan/to_proto.rs   | 2 +-
 4 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 1b1fc9bc470e..bb86ac3ae416 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1013,7 +1013,8 @@ impl ListingTable {
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
     ) -> Result<Arc<Statistics>> {
-        match self.collected_statistics
+        match self
+            .collected_statistics
             .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
         {
             Some(statistics) => Ok(statistics.clone()),
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index 40872f0f8b4a..9d031a6bbc85 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -61,7 +61,6 @@ pub async fn get_statistics_with_limit(
         file.statistics = Some(file_stats.as_ref().clone());
         result_files.push(file);
 
-
         // First file, we set them directly from the file statistics.
         num_rows = file_stats.num_rows.clone();
         total_byte_size = file_stats.total_byte_size.clone();
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 44fa8a9a717c..bc0a19336bae 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -559,11 +559,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
-            statistics: val
-                .statistics
-                .as_ref()
-                .map(|v| v.try_into().map(Arc::new))
-                .transpose()?,
+            statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?,
             extensions: None,
         })
     }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index ae330e7421d4..57cd22a99ae1 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -527,7 +527,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
                 .map(|v| v.try_into())
                 .collect::<Result<Vec<_>, _>>()?,
             range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
-            statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
+            statistics: pf.statistics.as_ref().map(|s| s.into()),
         })
     }
 }
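
Note on the in-place fold from patch 02: `set_max_if_greater` and
`set_min_if_lesser` now mutate the accumulator through `&mut` instead of
returning a fresh `Precision`, which is what lets the per-column loop
replace the old `multiunzip` rebuild of three whole vectors. A condensed
sketch of that pattern with a simplified, non-generic `Precision` (the
real type in `datafusion_common` is generic over the value type):

    use std::mem;

    #[derive(Clone, Debug, Default, PartialEq)]
    enum Precision {
        Exact(i64),
        Inexact(i64),
        #[default]
        Absent,
    }

    impl Precision {
        fn to_inexact(self) -> Self {
            match self {
                Precision::Exact(v) => Precision::Inexact(v),
                other => other,
            }
        }
    }

    /// Mutate `max_value` in place instead of returning a new value, so
    /// the caller never rebuilds the whole per-column vector.
    fn set_max_if_greater(max_nominee: &Precision, max_value: &mut Precision) {
        use Precision::*;
        match (&max_value, max_nominee) {
            (Exact(v1), Exact(v2)) if v1 < v2 => {
                *max_value = max_nominee.clone();
            }
            (Exact(v1), Inexact(v2))
            | (Inexact(v1), Inexact(v2))
            | (Inexact(v1), Exact(v2))
                if v1 < v2 =>
            {
                *max_value = max_nominee.clone().to_inexact();
            }
            // an unknown nominee demotes an exact bound to inexact
            (Exact(_), Absent) => {
                let exact = mem::take(max_value);
                *max_value = exact.to_inexact();
            }
            (Absent, Exact(_)) => {
                *max_value = max_nominee.clone().to_inexact();
            }
            (Absent, Inexact(_)) => {
                *max_value = max_nominee.clone();
            }
            _ => {}
        }
    }

    fn main() {
        let mut acc = Precision::Exact(3);
        set_max_if_greater(&Precision::Exact(7), &mut acc);
        assert_eq!(acc, Precision::Exact(7));
        set_max_if_greater(&Precision::Absent, &mut acc);
        assert_eq!(acc, Precision::Inexact(7));
    }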