
Commit d5841bb

Merge branch 'main' into convert-variance-sample-to-udaf
2 parents e4e9e57 + 2796e01 commit d5841bb

(Large commits have some content hidden by default; only a subset of the 81 changed files is shown below.)

81 files changed (+2340 additions, -607 deletions)

benchmarks/bench.sh

Lines changed: 4 additions & 6 deletions
@@ -69,7 +69,7 @@ all(default): Data/Run/Compare for all benchmarks
 tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table
 tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory
 tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table
-tpch10_mem: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
+tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
 parquet: Benchmark of parquet reader's filtering speed
 sort: Benchmark of sorting speed
 clickbench_1: ClickBench queries against a single parquet file
@@ -243,9 +243,7 @@ main() {
             echo "Done"
             ;;
         compare)
-            BRANCH1=$1
-            BRANCH2=$2
-            compare_benchmarks
+            compare_benchmarks "$ARG2" "$ARG3"
             ;;
         "")
             usage
@@ -446,8 +444,8 @@ run_clickbench_extended() {
 
 compare_benchmarks() {
     BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
-    BRANCH1="${ARG2}"
-    BRANCH2="${ARG3}"
+    BRANCH1="$1"
+    BRANCH2="$2"
     if [ -z "$BRANCH1" ] ; then
        echo "<branch1> not specified. Available branches:"
        ls -1 "${BASE_RESULTS_DIR}"
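
With this change the `compare` subcommand simply forwards its positional arguments to `compare_benchmarks`, so a typical invocation (assuming result directories for both branches already exist under `benchmarks/results/`) is `./bench.sh compare main convert-variance-sample-to-udaf`.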

datafusion/common/src/scalar/mod.rs

Lines changed: 8 additions & 4 deletions
@@ -45,8 +45,8 @@ use arrow::{
     compute::kernels::cast::{cast_with_options, CastOptions},
     datatypes::{
         i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType,
-        Date32Type, Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
-        IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
+        Date32Type, Date64Type, Field, Float32Type, Int16Type, Int32Type, Int64Type,
+        Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
         IntervalYearMonthType, TimeUnit, TimestampMicrosecondType,
         TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
         UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION,
@@ -3179,8 +3179,12 @@ impl fmt::Display for ScalarValue {
             ScalarValue::List(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
             ScalarValue::LargeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
             ScalarValue::FixedSizeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
-            ScalarValue::Date32(e) => format_option!(f, e)?,
-            ScalarValue::Date64(e) => format_option!(f, e)?,
+            ScalarValue::Date32(e) => {
+                format_option!(f, e.map(|v| Date32Type::to_naive_date(v).to_string()))?
+            }
+            ScalarValue::Date64(e) => {
+                format_option!(f, e.map(|v| Date64Type::to_naive_date(v).to_string()))?
+            }
             ScalarValue::Time32Second(e) => format_option!(f, e)?,
             ScalarValue::Time32Millisecond(e) => format_option!(f, e)?,
             ScalarValue::Time64Microsecond(e) => format_option!(f, e)?,
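
The practical effect of this change is that date scalars now display as calendar dates rather than raw day counts. A minimal sketch of the new behaviour (assuming the `arrow` and `datafusion-common` crates as used in this repository):

    use arrow::datatypes::Date32Type;
    use datafusion_common::ScalarValue;

    fn main() {
        // 18636 days after the 1970-01-01 epoch is 2021-01-09, so the scalar now
        // renders as a calendar date instead of the raw day count "18636".
        let days: i32 = 18636;
        assert_eq!(Date32Type::to_naive_date(days).to_string(), "2021-01-09");
        assert_eq!(ScalarValue::Date32(Some(days)).to_string(), "2021-01-09");
    }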

datafusion/core/src/dataframe/mod.rs

Lines changed: 6 additions & 1 deletion
@@ -1035,7 +1035,9 @@ impl DataFrame {
     }
 
     /// Return a reference to the unoptimized [`LogicalPlan`] that comprises
-    /// this DataFrame. See [`Self::into_unoptimized_plan`] for more details.
+    /// this DataFrame.
+    ///
+    /// See [`Self::into_unoptimized_plan`] for more details.
     pub fn logical_plan(&self) -> &LogicalPlan {
         &self.plan
     }
@@ -1052,6 +1054,9 @@ impl DataFrame {
     /// snapshot of the [`SessionState`] attached to this [`DataFrame`] and
     /// consequently subsequent operations may take place against a different
     /// state (e.g. a different value of `now()`)
+    ///
+    /// See [`Self::into_parts`] to retrieve the owned [`LogicalPlan`] and
+    /// corresponding [`SessionState`].
     pub fn into_unoptimized_plan(self) -> LogicalPlan {
         self.plan
     }
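
For context, a short sketch of how these accessors differ in practice. This is a sketch only; it assumes `DataFrame::into_parts` returns the `(SessionState, LogicalPlan)` pair referenced by the new doc link, and that a Tokio runtime is available as in the DataFusion examples.

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        let df = ctx.sql("SELECT 1 AS x").await?;

        // Borrow the unoptimized plan without consuming the DataFrame.
        println!("{}", df.logical_plan().display_indent());

        // Or take ownership of both the plan and the SessionState snapshot.
        let (_state, plan) = df.into_parts();
        println!("{}", plan.display_indent());
        Ok(())
    }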

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 114 additions & 13 deletions
@@ -76,7 +76,84 @@ use crate::datasource::schema_adapter::{
 pub use metrics::ParquetFileMetrics;
 pub use statistics::{RequestedStatistics, StatisticsConverter};
 
-/// Execution plan for scanning one or more Parquet partitions
+/// Execution plan for reading one or more Parquet files.
+///
+/// ```text
+///             ▲
+///             │
+///             │  Produce a stream of
+///             │  RecordBatches
+///             │
+/// ┌───────────────────────┐
+/// │                       │
+/// │      ParquetExec      │
+/// │                       │
+/// └───────────────────────┘
+///             ▲
+///             │  Asynchronously read from one
+///             │  or more parquet files via
+///             │  ObjectStore interface
+///             │
+///             │
+///   .───────────────────.
+///  │                     )
+///  │`───────────────────'│
+///  │        ObjectStore  │
+///  │.───────────────────.│
+///  │                     )
+///   `───────────────────'
+///
+/// ```
+/// # Features
+///
+/// Supports the following optimizations:
+///
+/// * Concurrent reads: Can read from one or more files in parallel as multiple
+/// partitions, including concurrently reading multiple row groups from a single
+/// file.
+///
+/// * Predicate push down: skips row groups and pages based on
+/// min/max/null_counts in the row group metadata, the page index and bloom
+/// filters.
+///
+/// * Projection pushdown: reads and decodes only the columns required.
+///
+/// * Limit pushdown: stop execution early after some number of rows are read.
+///
+/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
+/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
+/// details.
+///
+/// * Schema adapters: read parquet files with different schemas into a unified
+/// table schema. This can be used to implement "schema evolution". See
+/// [`SchemaAdapterFactory`] for more details.
+///
+/// * metadata_size_hint: controls the number of bytes read from the end of the
+/// file in the initial I/O when the default [`ParquetFileReaderFactory`] is used.
+/// If a custom reader is used, it supplies the metadata directly and this
+/// parameter is ignored. See [`Self::with_parquet_file_reader_factory`] for more details.
+///
+/// # Execution Overview
+///
+/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
+/// configured to open parquet files with a [`ParquetOpener`].
+///
+/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
+/// the file.
+///
+/// * Step 3: The `ParquetOpener` gets the file metadata via
/// [`ParquetFileReaderFactory`] and applies any predicates
+/// and projections to determine what pages must be read.
+///
+/// * Step 4: The stream begins reading data, fetching the required pages
+/// and incrementally decoding them.
+///
+/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
+/// [`SchemaAdapter`] to match the table schema. By default missing columns are
+/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
+///
+/// [`RecordBatch`]: arrow::record_batch::RecordBatch
+/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan
@@ -86,9 +163,9 @@ pub struct ParquetExec {
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
     predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Optional predicate for pruning row groups
+    /// Optional predicate for pruning row groups (derived from `predicate`)
     pruning_predicate: Option<Arc<PruningPredicate>>,
-    /// Optional predicate for pruning pages
+    /// Optional predicate for pruning pages (derived from `predicate`)
     page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
@@ -190,11 +267,13 @@ impl ParquetExec {
 
     /// Optional user defined parquet file reader factory.
     ///
-    /// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom
-    /// implementation for data access operations.
+    /// You can use [`ParquetFileReaderFactory`] to more precisely control how
+    /// data is read from parquet files (e.g. skip re-reading metadata, coalesce
+    /// I/O operations, etc).
     ///
-    /// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
-    /// to this factory instead of `ObjectStore`.
+    /// The default reader factory reads directly from an [`ObjectStore`]
+    /// instance using individual I/O operations for the footer and then for
+    /// each page.
     pub fn with_parquet_file_reader_factory(
         mut self,
         parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -643,11 +722,21 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
-/// Factory of parquet file readers.
+/// Interface for reading parquet files.
 ///
-/// Provides means to implement custom data access interface.
+/// The combined implementations of [`ParquetFileReaderFactory`] and
+/// [`AsyncFileReader`] can be used to provide custom data access operations
+/// such as pre-cached data, I/O coalescing, etc.
+///
+/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
 pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
-    /// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
+    /// Provides an `AsyncFileReader` for reading data from the parquet file specified by `file_meta`
+    ///
+    /// # Arguments
+    /// * partition_index - Index of the partition (for reporting metrics)
+    /// * file_meta - The file to be read
+    /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
+    /// * metrics - Execution metrics
     fn create_reader(
         &self,
         partition_index: usize,
@@ -657,20 +746,32 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
     ) -> Result<Box<dyn AsyncFileReader + Send>>;
 }
 
-/// Default parquet reader factory.
+/// Default implementation of [`ParquetFileReaderFactory`]
+///
+/// This implementation:
+/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
+/// 2. Reads the footer and page metadata on demand.
+/// 3. Does not cache metadata or coalesce I/O operations.
 #[derive(Debug)]
 pub struct DefaultParquetFileReaderFactory {
     store: Arc<dyn ObjectStore>,
 }
 
 impl DefaultParquetFileReaderFactory {
-    /// Create a factory.
+    /// Create a new `DefaultParquetFileReaderFactory`.
     pub fn new(store: Arc<dyn ObjectStore>) -> Self {
         Self { store }
     }
 }
 
-/// Implements [`AsyncFileReader`] for a parquet file in object storage
+/// Implements [`AsyncFileReader`] for a parquet file in object storage.
+///
+/// This implementation uses the [`ParquetObjectReader`] to read data from the
+/// object store on demand, as required, tracking the number of bytes read.
+///
+/// This implementation does not coalesce I/O operations or cache bytes. Such
+/// optimizations can be done either at the object store level or by providing a
+/// custom implementation of [`ParquetFileReaderFactory`].
 pub(crate) struct ParquetFileReader {
     file_metrics: ParquetFileMetrics,
     inner: ParquetObjectReader,
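
To make the customization point concrete, here is a hedged sketch (not part of this commit) of a factory that counts reader creations and then delegates to `DefaultParquetFileReaderFactory`. The import paths and the exact `create_reader` parameter types are assumed from the surrounding diff and may need adjusting against the actual crate layout.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use datafusion::datasource::physical_plan::parquet::{
        DefaultParquetFileReaderFactory, ParquetFileReaderFactory,
    };
    use datafusion::datasource::physical_plan::FileMeta;
    use datafusion::error::Result;
    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
    use object_store::ObjectStore;
    use parquet::arrow::async_reader::AsyncFileReader;

    /// Counts how many readers are created, then delegates to the default factory.
    #[derive(Debug)]
    struct CountingReaderFactory {
        inner: DefaultParquetFileReaderFactory,
        readers_created: AtomicUsize,
    }

    impl CountingReaderFactory {
        fn new(store: Arc<dyn ObjectStore>) -> Self {
            Self {
                inner: DefaultParquetFileReaderFactory::new(store),
                readers_created: AtomicUsize::new(0),
            }
        }
    }

    impl ParquetFileReaderFactory for CountingReaderFactory {
        fn create_reader(
            &self,
            partition_index: usize,
            file_meta: FileMeta,
            metadata_size_hint: Option<usize>,
            metrics: &ExecutionPlanMetricsSet,
        ) -> Result<Box<dyn AsyncFileReader + Send>> {
            // Record the call, then let the default factory perform the actual I/O.
            self.readers_created.fetch_add(1, Ordering::Relaxed);
            self.inner
                .create_reader(partition_index, file_meta, metadata_size_hint, metrics)
        }
    }

Such a factory would be attached to a `ParquetExec` via `with_parquet_file_reader_factory`, as described in the doc comments above.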

datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

Lines changed: 12 additions & 0 deletions
@@ -81,6 +81,15 @@ macro_rules! get_statistic {
                 Some(DataType::Int16) => {
                     Some(ScalarValue::Int16(Some((*s.$func()).try_into().unwrap())))
                 }
+                Some(DataType::UInt8) => {
+                    Some(ScalarValue::UInt8(Some((*s.$func()).try_into().unwrap())))
+                }
+                Some(DataType::UInt16) => {
+                    Some(ScalarValue::UInt16(Some((*s.$func()).try_into().unwrap())))
+                }
+                Some(DataType::UInt32) => {
+                    Some(ScalarValue::UInt32(Some((*s.$func()) as u32)))
+                }
                 Some(DataType::Date32) => {
                     Some(ScalarValue::Date32(Some(*s.$func())))
                 }
@@ -100,6 +109,9 @@ macro_rules! get_statistic {
                         *scale,
                     ))
                 }
+                Some(DataType::UInt64) => {
+                    Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
+                }
                 _ => Some(ScalarValue::Int64(Some(*s.$func()))),
             }
         }
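
For background on why the UInt32/UInt64 arms use a plain `as` cast rather than `try_into`: parquet stores unsigned integer columns using the signed INT32/INT64 physical types, so a large unsigned value appears as a negative signed statistic, and reinterpreting the bits recovers it. A small, self-contained illustration (not from this commit):

    fn main() {
        let original: u32 = 4_000_000_000;          // does not fit in i32
        let stored_as_stat: i32 = original as i32;  // how a parquet INT32 statistic holds it
        assert!(stored_as_stat < 0);                // looks negative when read as a signed value
        assert_eq!(stored_as_stat as u32, original); // `as u32` restores the original bits
    }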

datafusion/core/src/datasource/schema_adapter.rs

Lines changed: 17 additions & 11 deletions
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Schema Adapter provides a method of translating the RecordBatches that come out of the
+//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
+//!
+//! An adapter provides a method of translating the RecordBatches that come out of the
 //! physical format into how they should be used by DataFusion. For instance, a schema
 //! can be stored external to a parquet file that maps parquet logical types to arrow types.
@@ -26,35 +28,38 @@ use datafusion_common::plan_err;
 use std::fmt::Debug;
 use std::sync::Arc;
 
-/// Factory of schema adapters.
+/// Factory for creating [`SchemaAdapter`]
 ///
-/// Provides means to implement custom schema adaptation.
+/// This interface provides a way to implement custom schema adaptation logic
+/// for ParquetExec (for example, to fill missing columns with a default value
+/// other than null)
 pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
     /// Provides `SchemaAdapter`.
     fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
 }
 
-/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
 /// obtained from merging multiple file-level schemas.
 ///
 /// This is useful for enabling schema evolution in partitioned datasets.
 ///
 /// This has to be done in two stages.
 ///
-/// 1. Before reading the file, we have to map projected column indexes from the table schema to
-/// the file schema.
+/// 1. Before reading the file, we have to map projected column indexes from the
+/// table schema to the file schema.
 ///
-/// 2. After reading a record batch we need to map the read columns back to the expected columns
-/// indexes and insert null-valued columns wherever the file schema was missing a colum present
-/// in the table schema.
+/// 2. After reading a record batch, map the read columns back to the expected
+/// column indexes and insert null-valued columns wherever the file schema was
+/// missing a column present in the table schema.
 pub trait SchemaAdapter: Send + Sync {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
     ///
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
 
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns
+    /// from the file schema to the table schema.
     ///
     /// If the provided `file_schema` contains columns of a different type to the expected
     /// `table_schema`, the method will attempt to cast the array data from the file schema
@@ -68,7 +73,8 @@ pub trait SchemaAdapter: Send + Sync {
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
 }
 
-/// Transforms a RecordBatch from the physical layer to a RecordBatch that meets the table schema.
+/// Transforms a `RecordBatch` read from a file (the physical layer) into a
+/// `RecordBatch` that matches the table schema.
 pub trait SchemaMapper: Send + Sync {
     /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
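
To illustrate the kind of adaptation these docs describe, here is a self-contained sketch using plain arrow APIs rather than the `SchemaAdapter` traits themselves; the helper name `adapt_to_table_schema` is hypothetical. Columns are matched by name, cast to the table type where needed, and missing columns are filled with nulls.

    use std::sync::Arc;

    use arrow::array::{new_null_array, ArrayRef, Int32Array};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    // Hypothetical helper, not part of DataFusion: adapt a file-level batch to the table schema.
    fn adapt_to_table_schema(
        batch: &RecordBatch,
        table_schema: &Arc<Schema>,
    ) -> arrow::error::Result<RecordBatch> {
        let columns: Vec<ArrayRef> = table_schema
            .fields()
            .iter()
            .map(|field| match batch.schema().index_of(field.name()) {
                // The column exists in the file: cast it to the table's type if needed.
                Ok(idx) => cast(batch.column(idx), field.data_type()),
                // The column is missing from the file: fill it with nulls.
                Err(_) => Ok(new_null_array(field.data_type(), batch.num_rows())),
            })
            .collect::<arrow::error::Result<_>>()?;
        RecordBatch::try_new(table_schema.clone(), columns)
    }

    fn main() -> arrow::error::Result<()> {
        // The file only has "a" as Int32; the table also expects "b" as a nullable Int64.
        let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int64, true),
        ]));
        let batch = RecordBatch::try_new(
            file_schema,
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;

        let adapted = adapt_to_table_schema(&batch, &table_schema)?;
        assert_eq!(adapted.num_columns(), 2);
        assert_eq!(adapted.column(1).null_count(), 3);
        Ok(())
    }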

datafusion/core/tests/expr_api/simplification.rs

Lines changed: 1 addition & 1 deletion
@@ -289,7 +289,7 @@ fn select_date_plus_interval() -> Result<()> {
 
     // Note that constant folder runs and folds the entire
     // expression down to a single constant (true)
-    let expected = r#"Projection: Date32("18636") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
+    let expected = r#"Projection: Date32("2021-01-09") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
 TableScan: test"#;
     let actual = get_optimized_plan_formatted(plan, &time);
