Commit 7fa3c1f

Minor: move datasource statistics code into its own module (#7391)
* Minor: move datasource statistics code into its own module
* fixup
1 parent 32b2330 commit 7fa3c1f

File tree

2 files changed: +178 −158 lines


datafusion/core/src/datasource/mod.rs

Lines changed: 3 additions & 158 deletions
@@ -28,174 +28,19 @@ pub mod listing_table_factory;
 pub mod memory;
 pub mod physical_plan;
 pub mod provider;
+mod statistics;
 pub mod streaming;
 pub mod view;
 
 // backwards compatibility
 pub use datafusion_execution::object_store;
 
-use futures::Stream;
-
 pub use self::default_table_source::{
     provider_as_source, source_as_provider, DefaultTableSource,
 };
-use self::listing::PartitionedFile;
 pub use self::memory::MemTable;
 pub use self::provider::TableProvider;
 pub use self::view::ViewTable;
-use crate::arrow::datatypes::{Schema, SchemaRef};
-use crate::error::Result;
 pub use crate::logical_expr::TableType;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
-use futures::StreamExt;
-
-/// Get all files as well as the file level summary statistics (no statistic for partition columns).
-/// If the optional `limit` is provided, includes only sufficient files.
-/// Needed to read up to `limit` number of rows.
-pub async fn get_statistics_with_limit(
-    all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
-    file_schema: SchemaRef,
-    limit: Option<usize>,
-) -> Result<(Vec<PartitionedFile>, Statistics)> {
-    let mut result_files = vec![];
-
-    let mut null_counts = vec![0; file_schema.fields().len()];
-    let mut has_statistics = false;
-    let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
-
-    let mut is_exact = true;
-
-    // The number of rows and the total byte size can be calculated as long as
-    // at least one file has them. If none of the files provide them, then they
-    // will be omitted from the statistics. The missing values will be counted
-    // as zero.
-    let mut num_rows = None;
-    let mut total_byte_size = None;
-
-    // fusing the stream allows us to call next safely even once it is finished
-    let mut all_files = Box::pin(all_files.fuse());
-    while let Some(res) = all_files.next().await {
-        let (file, file_stats) = res?;
-        result_files.push(file);
-        is_exact &= file_stats.is_exact;
-        num_rows = if let Some(num_rows) = num_rows {
-            Some(num_rows + file_stats.num_rows.unwrap_or(0))
-        } else {
-            file_stats.num_rows
-        };
-        total_byte_size = if let Some(total_byte_size) = total_byte_size {
-            Some(total_byte_size + file_stats.total_byte_size.unwrap_or(0))
-        } else {
-            file_stats.total_byte_size
-        };
-        if let Some(vec) = &file_stats.column_statistics {
-            has_statistics = true;
-            for (i, cs) in vec.iter().enumerate() {
-                null_counts[i] += cs.null_count.unwrap_or(0);
-
-                if let Some(max_value) = &mut max_values[i] {
-                    if let Some(file_max) = cs.max_value.clone() {
-                        match max_value.update_batch(&[file_max.to_array()]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    } else {
-                        max_values[i] = None;
-                    }
-                }
-
-                if let Some(min_value) = &mut min_values[i] {
-                    if let Some(file_min) = cs.min_value.clone() {
-                        match min_value.update_batch(&[file_min.to_array()]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    } else {
-                        min_values[i] = None;
-                    }
-                }
-            }
-        }
-
-        // If the number of rows exceeds the limit, we can stop processing
-        // files. This only applies when we know the number of rows. It also
-        // currently ignores tables that have no statistics regarding the
-        // number of rows.
-        if num_rows.unwrap_or(usize::MIN) > limit.unwrap_or(usize::MAX) {
-            break;
-        }
-    }
-    // if we still have files in the stream, it means that the limit kicked
-    // in and that the statistic could have been different if we processed
-    // the files in a different order.
-    if all_files.next().await.is_some() {
-        is_exact = false;
-    }
-
-    let column_stats = if has_statistics {
-        Some(get_col_stats(
-            &file_schema,
-            null_counts,
-            &mut max_values,
-            &mut min_values,
-        ))
-    } else {
-        None
-    };
-
-    let statistics = Statistics {
-        num_rows,
-        total_byte_size,
-        column_statistics: column_stats,
-        is_exact,
-    };
-
-    Ok((result_files, statistics))
-}
-
-fn create_max_min_accs(
-    schema: &Schema,
-) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
-    let max_values: Vec<Option<MaxAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-        .collect::<Vec<_>>();
-    let min_values: Vec<Option<MinAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-        .collect::<Vec<_>>();
-    (max_values, min_values)
-}
-
-fn get_col_stats(
-    schema: &Schema,
-    null_counts: Vec<usize>,
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-) -> Vec<ColumnStatistics> {
-    (0..schema.fields().len())
-        .map(|i| {
-            let max_value = match &max_values[i] {
-                Some(max_value) => max_value.evaluate().ok(),
-                None => None,
-            };
-            let min_value = match &min_values[i] {
-                Some(min_value) => min_value.evaluate().ok(),
-                None => None,
-            };
-            ColumnStatistics {
-                null_count: Some(null_counts[i]),
-                max_value,
-                min_value,
-                distinct_count: None,
-            }
-        })
-        .collect()
-}
+pub use statistics::get_statistics_with_limit;
+pub(crate) use statistics::{create_max_min_accs, get_col_stats};
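
Note on compatibility: the two `pub use` lines at the end of the hunk re-export the moved items, so `get_statistics_with_limit` keeps its original public path while the two helpers stay crate-private. A minimal sketch of a downstream import that is unaffected by the move (the call site itself is hypothetical, not part of this commit):

// Hypothetical downstream code: this import resolves exactly as it did
// before the commit, because `statistics::get_statistics_with_limit` is
// re-exported from `datafusion::datasource`.
use datafusion::datasource::get_statistics_with_limit;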
datafusion/core/src/datasource/statistics.rs

Lines changed: 175 additions & 0 deletions
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::error::Result;
+use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
+use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use futures::Stream;
+use futures::StreamExt;
+
+use super::listing::PartitionedFile;
+
+/// Get all files as well as the file level summary statistics (no statistic for partition columns).
+/// If the optional `limit` is provided, includes only sufficient files.
+/// Needed to read up to `limit` number of rows.
+pub async fn get_statistics_with_limit(
+    all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
+    file_schema: SchemaRef,
+    limit: Option<usize>,
+) -> Result<(Vec<PartitionedFile>, Statistics)> {
+    let mut result_files = vec![];
+
+    let mut null_counts = vec![0; file_schema.fields().len()];
+    let mut has_statistics = false;
+    let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
+
+    let mut is_exact = true;
+
+    // The number of rows and the total byte size can be calculated as long as
+    // at least one file has them. If none of the files provide them, then they
+    // will be omitted from the statistics. The missing values will be counted
+    // as zero.
+    let mut num_rows = None;
+    let mut total_byte_size = None;
+
+    // fusing the stream allows us to call next safely even once it is finished
+    let mut all_files = Box::pin(all_files.fuse());
+    while let Some(res) = all_files.next().await {
+        let (file, file_stats) = res?;
+        result_files.push(file);
+        is_exact &= file_stats.is_exact;
+        num_rows = if let Some(num_rows) = num_rows {
+            Some(num_rows + file_stats.num_rows.unwrap_or(0))
+        } else {
+            file_stats.num_rows
+        };
+        total_byte_size = if let Some(total_byte_size) = total_byte_size {
+            Some(total_byte_size + file_stats.total_byte_size.unwrap_or(0))
+        } else {
+            file_stats.total_byte_size
+        };
+        if let Some(vec) = &file_stats.column_statistics {
+            has_statistics = true;
+            for (i, cs) in vec.iter().enumerate() {
+                null_counts[i] += cs.null_count.unwrap_or(0);
+
+                if let Some(max_value) = &mut max_values[i] {
+                    if let Some(file_max) = cs.max_value.clone() {
+                        match max_value.update_batch(&[file_max.to_array()]) {
+                            Ok(_) => {}
+                            Err(_) => {
+                                max_values[i] = None;
+                            }
+                        }
+                    } else {
+                        max_values[i] = None;
+                    }
+                }
+
+                if let Some(min_value) = &mut min_values[i] {
+                    if let Some(file_min) = cs.min_value.clone() {
+                        match min_value.update_batch(&[file_min.to_array()]) {
+                            Ok(_) => {}
+                            Err(_) => {
+                                min_values[i] = None;
+                            }
+                        }
+                    } else {
+                        min_values[i] = None;
+                    }
+                }
+            }
+        }
+
+        // If the number of rows exceeds the limit, we can stop processing
+        // files. This only applies when we know the number of rows. It also
+        // currently ignores tables that have no statistics regarding the
+        // number of rows.
+        if num_rows.unwrap_or(usize::MIN) > limit.unwrap_or(usize::MAX) {
+            break;
+        }
+    }
+    // if we still have files in the stream, it means that the limit kicked
+    // in and that the statistic could have been different if we processed
+    // the files in a different order.
+    if all_files.next().await.is_some() {
+        is_exact = false;
+    }
+
+    let column_stats = if has_statistics {
+        Some(get_col_stats(
+            &file_schema,
+            null_counts,
+            &mut max_values,
+            &mut min_values,
+        ))
+    } else {
+        None
+    };
+
+    let statistics = Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics: column_stats,
+        is_exact,
+    };
+
+    Ok((result_files, statistics))
+}
+
+pub(crate) fn create_max_min_accs(
+    schema: &Schema,
+) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
+    let max_values: Vec<Option<MaxAccumulator>> = schema
+        .fields()
+        .iter()
+        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+        .collect::<Vec<_>>();
+    let min_values: Vec<Option<MinAccumulator>> = schema
+        .fields()
+        .iter()
+        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
+        .collect::<Vec<_>>();
+    (max_values, min_values)
+}
+
+pub(crate) fn get_col_stats(
+    schema: &Schema,
+    null_counts: Vec<usize>,
+    max_values: &mut [Option<MaxAccumulator>],
+    min_values: &mut [Option<MinAccumulator>],
+) -> Vec<ColumnStatistics> {
+    (0..schema.fields().len())
+        .map(|i| {
+            let max_value = match &max_values[i] {
+                Some(max_value) => max_value.evaluate().ok(),
+                None => None,
+            };
+            let min_value = match &min_values[i] {
+                Some(min_value) => min_value.evaluate().ok(),
+                None => None,
+            };
+            ColumnStatistics {
+                null_count: Some(null_counts[i]),
+                max_value,
+                min_value,
+                distinct_count: None,
+            }
+        })
+        .collect()
}
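
For orientation, a minimal sketch of how the moved function can be driven (the `summarize` helper, the in-memory file list, and the limit of 100 are illustrative assumptions, not part of this commit): the function consumes a stream of per-file statistics and stops reading early once the accumulated row count can satisfy the limit.

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::get_statistics_with_limit;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::error::Result;
use datafusion::physical_plan::Statistics;
use futures::stream;

// Hypothetical helper: summarize a known list of (file, statistics) pairs,
// keeping only enough files to satisfy a LIMIT 100 scan.
async fn summarize(
    files: Vec<(PartitionedFile, Statistics)>,
    schema: SchemaRef,
) -> Result<()> {
    // The function takes a stream, so wrap the in-memory list in one; it
    // fuses the stream internally and breaks out of the loop once the
    // accumulated `num_rows` exceeds the limit.
    let file_stream = stream::iter(files.into_iter().map(Ok));
    let (kept_files, stats) =
        get_statistics_with_limit(file_stream, schema, Some(100)).await?;
    println!(
        "kept {} files; num_rows = {:?}, exact = {}",
        kept_files.len(),
        stats.num_rows,
        stats.is_exact
    );
    Ok(())
}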
