Skip to content

Commit 56cc8ea

Browse files
committed
don't keep Arc<Statistic> in PartitionedFile.
1 parent 2b56774 commit 56cc8ea

File tree

4 files changed

+11
-10
lines changed

4 files changed

+11
-10
lines changed

datafusion/core/src/datasource/listing/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub struct PartitionedFile {
7878
///
7979
/// DataFusion relies on these statistics for planning (in particular to sort file groups),
8080
/// so if they are incorrect, incorrect answers may result.
81-
pub statistics: Option<Arc<Statistics>>,
81+
pub statistics: Option<Statistics>,
8282
/// An optional field for user defined per object metadata
8383
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
8484
}

datafusion/core/src/datasource/listing/table.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -973,11 +973,10 @@ impl ListingTable {
973973
// collect the statistics if required by the config
974974
let files = file_list
975975
.map(|part_file| async {
976-
let mut part_file = part_file?;
976+
let part_file = part_file?;
977977
if self.options.collect_stat {
978978
let statistics =
979979
self.do_collect_statistics(ctx, &store, &part_file).await?;
980-
part_file.statistics = Some(statistics.clone());
981980
Ok((part_file, statistics))
982981
} else {
983982
Ok((
@@ -1014,8 +1013,7 @@ impl ListingTable {
10141013
store: &Arc<dyn ObjectStore>,
10151014
part_file: &PartitionedFile,
10161015
) -> Result<Arc<Statistics>> {
1017-
let statistics_cache = self.collected_statistics.clone();
1018-
match statistics_cache
1016+
match self.collected_statistics
10191017
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
10201018
{
10211019
Some(statistics) => Ok(statistics.clone()),
@@ -1031,7 +1029,7 @@ impl ListingTable {
10311029
)
10321030
.await?;
10331031
let statistics = Arc::new(statistics);
1034-
statistics_cache.put_with_extra(
1032+
self.collected_statistics.put_with_extra(
10351033
&part_file.object_meta.location,
10361034
statistics.clone(),
10371035
&part_file.object_meta,

datafusion/core/src/datasource/physical_plan/file_scan_config.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1193,7 +1193,7 @@ mod tests {
11931193
},
11941194
partition_values: vec![ScalarValue::from(file.date)],
11951195
range: None,
1196-
statistics: Some(Arc::new(Statistics {
1196+
statistics: Some(Statistics {
11971197
num_rows: Precision::Absent,
11981198
total_byte_size: Precision::Absent,
11991199
column_statistics: file
@@ -1213,7 +1213,7 @@ mod tests {
12131213
.unwrap_or_default()
12141214
})
12151215
.collect::<Vec<_>>(),
1216-
})),
1216+
}),
12171217
extensions: None,
12181218
}
12191219
}

datafusion/core/src/datasource/statistics.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,11 @@ pub async fn get_statistics_with_limit(
5757
let mut all_files = Box::pin(all_files.fuse());
5858

5959
if let Some(first_file) = all_files.next().await {
60-
let (file, file_stats) = first_file?;
60+
let (mut file, file_stats) = first_file?;
61+
file.statistics = Some(file_stats.as_ref().clone());
6162
result_files.push(file);
6263

64+
6365
// First file, we set them directly from the file statistics.
6466
num_rows = file_stats.num_rows.clone();
6567
total_byte_size = file_stats.total_byte_size.clone();
@@ -81,7 +83,8 @@ pub async fn get_statistics_with_limit(
8183
};
8284
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
8385
while let Some(current) = all_files.next().await {
84-
let (file, file_stats) = current?;
86+
let (mut file, file_stats) = current?;
87+
file.statistics = Some(file_stats.as_ref().clone());
8588
result_files.push(file);
8689
if !collect_stats {
8790
continue;

0 commit comments

Comments
 (0)