thread through statistics analysis
alamb committed May 16, 2024
1 parent 266b932 commit 55f92ed
Showing 1 changed file with 126 additions and 58 deletions.
184 changes: 126 additions & 58 deletions datafusion-examples/examples/parquet_index.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Array, ArrayRef, AsArray, Int32Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{
@@ -33,8 +33,7 @@ use datafusion::prelude::*;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{
    internal_datafusion_err, DFSchema, DataFusionError, Result, Statistics,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::TableType;
@@ -44,6 +43,7 @@ use std::fs::{DirEntry, File};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use arrow::datatypes::{Int32Type, UInt64Type};
use tempfile::TempDir;
use url::Url;

@@ -116,8 +116,6 @@ pub struct IndexTableProvider {
files: Vec<Vec<PartitionedFile>>,
}



impl IndexTableProvider {
/// Create a new IndexTableProvider
pub fn try_new(dir: impl Into<PathBuf>) -> Result<Self> {
@@ -133,7 +131,6 @@ impl IndexTableProvider {

let index = index_builder.build()?;


// todo make a nicer API for this ("partitioned files" from directory)
let files = files
.iter()
@@ -151,21 +148,6 @@
}
}
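
// Hypothetical usage sketch (not part of this commit): register the provider
// with a `SessionContext` and query it with SQL. The table and column names
// (`index_table`, `value`) are assumptions based on the demonstration data.
#[allow(dead_code)]
async fn query_with_index(dir: PathBuf) -> Result<()> {
    let provider = Arc::new(IndexTableProvider::try_new(dir)?);
    let ctx = SessionContext::new();
    ctx.register_table("index_table", provider)?;
    // DataFusion pushes the filter down to `scan`, where the index can be used
    // to skip files whose min/max range cannot contain the requested value
    ctx.sql("SELECT * FROM index_table WHERE value = 150")
        .await?
        .show()
        .await?;
    Ok(())
}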


#[async_trait]
impl TableProvider for IndexTableProvider {
fn as_any(&self) -> &dyn Any {
@@ -232,19 +214,20 @@ impl TableProvider for IndexTableProvider {
}
}

/// Simple in-memory index for a set of parquet files
///
/// The index is represented as an arrow `RecordBatch` that can be used
/// directly by the DataFusion [`PruningPredicate`] API
///
/// The index looks like
/// | file_name     | file_size | row_count | value_column_min | value_column_max |
/// |---------------|-----------|-----------|------------------|------------------|
/// | file1.parquet | 1000      | 100       | 0                | 100              |
/// | file2.parquet | 1000      | 100       | 100              | 200              |
/// | file3.parquet | 1000      | 100       | 200              | 300              |
///
/// Note a more advanced index would store this information for each row group
/// within a file

struct ParquetMetadataIndex {
file_schema: SchemaRef,
@@ -264,62 +247,151 @@ struct ParquetMetadataIndexBuilder {
filenames: Vec<String>,
file_sizes: Vec<u64>,
row_counts: Vec<u64>,
/// Holds the min/max value of the value column for each file
value_column_mins: Vec<i32>,
value_column_maxs: Vec<i32>,
}


impl ParquetMetadataIndexBuilder {
fn new() -> Self {
Self::default()
}

    /// Add a file to the index
    fn add_file(&mut self, file: &Path) -> Result<()> {
        let file_name = file
            .file_name()
            .ok_or_else(|| internal_datafusion_err!("No filename"))?
            .to_str()
            .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?;
        let file_size = file.metadata()?.len();

        println!("Adding file {file_name}");

        let file = File::open(file).map_err(|e| {
            DataFusionError::from(e).context(format!("Error opening file {file:?}"))
        })?;

        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;

        // Get the schema of the file. A real system might have to handle the
        // case where the schema of the file is not the same as the schema of
        // the other files e.g. using SchemaAdapter.
        if self.file_schema.is_none() {
            self.file_schema = Some(reader.schema().clone());
        }

        // extract the statistics from the file's metadata
        let metadata = reader.metadata();

        // TODO: extract the min/max values for each row group
        let stats = parquet_stats_to_arrow("value", &reader)?;
        // our example has no nulls, so this is a sanity check
        assert_eq!(stats.row_count.null_count(), 0);
        assert_eq!(stats.min.null_count(), 0);
        assert_eq!(stats.max.null_count(), 0);

        let row_count = stats
            .row_count
            .as_primitive::<UInt64Type>()
            .iter()
            .flatten()
            .sum::<u64>();

        let value_column_min = stats
            .min
            .as_primitive::<Int32Type>()
            .iter()
            .flatten()
            .min()
            .unwrap_or_default();
        let value_column_max = stats
            .max
            .as_primitive::<Int32Type>()
            .iter()
            .flatten()
            .max()
            .unwrap_or_default();

        // sanity check the statistics against the file metadata
        assert_eq!(row_count, metadata.file_metadata().num_rows() as u64);

        self.add_row(
            file_name,
            file_size,
            row_count,
            value_column_min,
            value_column_max,
        );
        Ok(())
    }

    /// Add a single row of values to the in-progress index
fn add_row(
&mut self,
file_name: impl Into<String>,
file_size: u64,
row_count: u64,
value_column_min: i32,
value_column_max: i32,
) {
self.filenames.push(file_name.into());
self.file_sizes.push(file_size);
self.row_counts.push(row_count);
self.value_column_mins.push(value_column_min);
self.value_column_maxs.push(value_column_max);
}

    /// Build the index from the files added
    fn build(self) -> Result<ParquetMetadataIndex> {
        let Some(file_schema) = self.file_schema else {
            return Err(internal_datafusion_err!("No files added to index"));
        };

        let index = RecordBatch::try_from_iter(vec![
            (
                "file_name",
                Arc::new(StringArray::from(self.filenames)) as ArrayRef,
            ),
            (
                "file_size",
                Arc::new(UInt64Array::from(self.file_sizes)) as ArrayRef,
            ),
            (
                "row_count",
                Arc::new(UInt64Array::from(self.row_counts)) as ArrayRef,
            ),
            (
                "value_column_min",
                Arc::new(Int32Array::from(self.value_column_mins)) as ArrayRef,
            ),
            (
                "value_column_max",
                Arc::new(Int32Array::from(self.value_column_maxs)) as ArrayRef,
            ),
        ])?;

        Ok(ParquetMetadataIndex { file_schema, index })
    }
}
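
// A minimal usage sketch of the builder (an illustration, not part of this
// commit): `read_dir` is the helper defined later in this file, and every
// entry in the directory is assumed to be a parquet file.
#[allow(dead_code)]
fn build_index_for_dir(dir: &Path) -> Result<ParquetMetadataIndex> {
    let mut builder = ParquetMetadataIndexBuilder::new();
    for entry in read_dir(dir)? {
        builder.add_file(&entry.path())?;
    }
    builder.build()
}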

/// Statistics extracted from the parquet file, as Arrow arrays
///
/// TODO: use the new API from <https://github.com/apache/datafusion/issues/10453>
pub struct ArrowStatistics {
/// min values
min: ArrayRef,
/// max values
max: ArrayRef,
/// Row counts (UInt64Array)
row_count: ArrayRef,
/// Null Counts (UInt64Array)
null_count: ArrayRef,
}

/// Extract Arrow statistics (min, max, row counts and null counts) for the
/// given column from the parquet metadata, if any
pub fn parquet_stats_to_arrow(
    column_name: &str,
    // todo only take the fields of this we need
    parquet_record_batch_reader_builder: &ParquetRecordBatchReaderBuilder<File>,
) -> Result<ArrowStatistics> {
    todo!();
}
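
// Below is a minimal sketch of how `parquet_stats_to_arrow` could be filled in
// by walking the row group statistics in the parquet metadata. It is an
// illustration only, not part of this commit: it assumes an Int32 value column
// and the pre-53 parquet crate statistics accessors (`has_min_max_set`,
// `min`, `max`, `null_count`); newer parquet releases expose
// `min_opt`/`max_opt`/`null_count_opt` instead.
#[allow(dead_code)]
fn parquet_stats_to_arrow_sketch(
    column_name: &str,
    builder: &ParquetRecordBatchReaderBuilder<File>,
) -> Result<ArrowStatistics> {
    use parquet::file::statistics::Statistics as ParquetStats;

    let metadata = builder.metadata();

    // find the index of the requested column in the parquet schema
    let column_index = metadata
        .file_metadata()
        .schema_descr()
        .columns()
        .iter()
        .position(|c| c.name() == column_name)
        .ok_or_else(|| internal_datafusion_err!("column {column_name} not found"))?;

    // one entry per row group
    let mut mins: Vec<Option<i32>> = vec![];
    let mut maxes: Vec<Option<i32>> = vec![];
    let mut row_counts: Vec<u64> = vec![];
    let mut null_counts: Vec<u64> = vec![];

    for row_group in metadata.row_groups() {
        row_counts.push(row_group.num_rows() as u64);
        match row_group.column(column_index).statistics() {
            Some(ParquetStats::Int32(s)) if s.has_min_max_set() => {
                mins.push(Some(*s.min()));
                maxes.push(Some(*s.max()));
                null_counts.push(s.null_count());
            }
            _ => {
                // no usable statistics for this row group
                mins.push(None);
                maxes.push(None);
                null_counts.push(0);
            }
        }
    }

    Ok(ArrowStatistics {
        min: Arc::new(Int32Array::from(mins)) as ArrayRef,
        max: Arc::new(Int32Array::from(maxes)) as ArrayRef,
        row_count: Arc::new(UInt64Array::from(row_counts)) as ArrayRef,
        null_count: Arc::new(UInt64Array::from(null_counts)) as ArrayRef,
    })
}
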
/// Return a list of the directory entries in the given directory, sorted by name
fn read_dir(dir: &Path) -> Result<Vec<DirEntry>> {
    let mut files = dir
        .read_dir()
        .map_err(|e| {
            DataFusionError::from(e).context(format!("Error reading directory {dir:?}"))
        })?
        .map(|entry| {
            entry.map_err(|e| {
@@ -332,11 +404,7 @@ fn read_dir(dir: &Path) -> Result<Vec<DirEntry>> {
Ok(files)
}

/// Demonstration Data
///
