diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs
index 0b82231ee8..f224aed54d 100644
--- a/crates/core/src/kernel/snapshot/log_segment.rs
+++ b/crates/core/src/kernel/snapshot/log_segment.rs
@@ -10,7 +10,7 @@ use lazy_static::lazy_static;
 use object_store::path::Path;
 use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore};
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
-use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
+use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use tracing::debug;
@@ -19,8 +19,8 @@ use super::parse;
 use crate::kernel::{arrow::json, ActionType, Metadata, Protocol, Schema, StructType};
 use crate::logstore::LogStore;
 use crate::operations::transaction::CommitData;
+use crate::reader::CloudParquetObjectReader;
 use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
-
 const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
 
 lazy_static! {
@@ -264,7 +264,7 @@ impl LogSegment {
             .map(move |meta| {
                 let store = store.clone();
                 async move {
-                    let reader = ParquetObjectReader::new(store, meta);
+                    let reader = CloudParquetObjectReader::new(store, meta);
                     let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index);
                     let builder =
                         ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?;
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 4ef9fc06fd..f452cb4b53 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -75,6 +75,7 @@ pub mod kernel;
 pub mod logstore;
 pub mod operations;
 pub mod protocol;
+mod reader;
 pub mod schema;
 pub mod storage;
 pub mod table;
diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs
index 351a596062..779962f0d7 100644
--- a/crates/core/src/operations/convert_to_delta.rs
+++ b/crates/core/src/operations/convert_to_delta.rs
@@ -1,6 +1,7 @@
 //! Command for converting a Parquet table to a Delta table in place
 // https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
 
+use crate::reader::CloudParquetObjectReader;
 use crate::{
     kernel::{Add, DataType, Schema, StructField},
     logstore::{LogStore, LogStoreRef},
@@ -15,10 +16,8 @@ use futures::{
     future::{self, BoxFuture},
     TryStreamExt,
 };
-use parquet::{
-    arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
-    errors::ParquetError,
-};
+use parquet::{arrow::async_reader::ParquetRecordBatchStreamBuilder, errors::ParquetError};
+
 use percent_encoding::percent_decode_str;
 use serde_json::{Map, Value};
 use std::{
@@ -352,10 +351,9 @@ impl ConvertToDeltaBuilder {
                 .into(),
             );
 
-            let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
-                object_store.clone(),
-                file,
-            ))
+            let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(
+                CloudParquetObjectReader::new(object_store.clone(), file),
+            )
             .await?
             .schema()
             .as_ref()
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index babe17a6a0..c021b2debf 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -33,7 +33,7 @@ use futures::{Future, StreamExt, TryStreamExt};
 use indexmap::IndexMap;
 use itertools::Itertools;
 use num_cpus;
-use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
+use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
 use parquet::basic::{Compression, ZstdLevel};
 use parquet::errors::ParquetError;
 use parquet::file::properties::WriterProperties;
@@ -47,11 +47,11 @@ use crate::kernel::{Action, PartitionsExt, Remove, Scalar};
 use crate::logstore::LogStoreRef;
 use crate::operations::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES};
 use crate::protocol::DeltaOperation;
+use crate::reader::CloudParquetObjectReader;
 use crate::storage::ObjectStoreRef;
 use crate::table::state::DeltaTableState;
 use crate::writer::utils::arrow_schema_without_partitions;
 use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter};
-
 /// Metrics from Optimize
 #[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -666,7 +666,8 @@ impl MergePlan {
                .then(move |file| {
                    let object_store_ref = object_store_ref.clone();
                    async move {
-                        let file_reader = ParquetObjectReader::new(object_store_ref, file);
+                        let file_reader =
+                            CloudParquetObjectReader::new(object_store_ref, file);
                        ParquetRecordBatchStreamBuilder::new(file_reader)
                            .await?
                            .build()
@@ -1159,7 +1160,7 @@ pub(super) mod zorder {
            .then(move |file| {
                let object_store = object_store.clone();
                async move {
-                    let file_reader = ParquetObjectReader::new(object_store.clone(), file);
+                    let file_reader = CloudParquetObjectReader::new(object_store.clone(), file);
                    ParquetRecordBatchStreamBuilder::new(file_reader)
                        .await?
                        .build()
diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs
index 8f21018364..e8c0c7f7d1 100644
--- a/crates/core/src/operations/transaction/state.rs
+++ b/crates/core/src/operations/transaction/state.rs
@@ -1,6 +1,7 @@
 use std::collections::HashSet;
 use std::sync::Arc;
 
+use crate::reader::CloudParquetObjectReader;
 use arrow::array::{ArrayRef, BooleanArray};
 use arrow::datatypes::{
     DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
@@ -12,7 +13,7 @@ use datafusion_expr::Expr;
 use itertools::Itertools;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
-use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
+use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
 
 use crate::delta_datafusion::{
     get_null_of_arrow_type, logical_expr_to_physical_expr, to_correct_scalar_value,
@@ -37,7 +38,7 @@ impl DeltaTableState {
             .max_by_key(|obj| obj.modification_time)
         {
             let file_meta = add.try_into()?;
-            let file_reader = ParquetObjectReader::new(object_store, file_meta);
+            let file_reader = CloudParquetObjectReader::new(object_store, file_meta);
             let file_schema = ParquetRecordBatchStreamBuilder::new_with_options(
                 file_reader,
                 ArrowReaderOptions::new().with_skip_arrow_metadata(true),
diff --git a/crates/core/src/reader/mod.rs b/crates/core/src/reader/mod.rs
new file mode 100644
index 0000000000..a312ca8afb
--- /dev/null
+++ b/crates/core/src/reader/mod.rs
@@ -0,0 +1,135 @@
+use std::future::ready;
+use std::{ops::Range, sync::Arc};
+
+use bytes::Bytes;
+use futures::executor::block_on;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
+use parquet::errors::ParquetError;
+use parquet::errors::Result as ParquetResult;
+use parquet::file::metadata::ParquetMetaData;
+
+/// Wrapper for ParquetObjectReader that does additional retries
+#[derive(Clone, Debug)]
+pub struct CloudParquetObjectReader {
+    store: Arc<dyn ObjectStore>,
+    meta: ObjectMeta,
+    metadata_size_hint: Option<usize>,
+    preload_column_index: bool,
+    preload_offset_index: bool,
+}
+#[allow(dead_code)]
+impl CloudParquetObjectReader {
+    /// Creates a new [`CloudParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`]
+    ///
+    /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
+    pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
+        Self {
+            store,
+            meta,
+            metadata_size_hint: None,
+            preload_column_index: false,
+            preload_offset_index: false,
+        }
+    }
+
+    /// Provide a hint as to the size of the parquet file's footer,
+    /// see [fetch_parquet_metadata](crate::arrow::async_reader::fetch_parquet_metadata)
+    pub fn with_footer_size_hint(self, hint: usize) -> Self {
+        Self {
+            metadata_size_hint: Some(hint),
+            ..self
+        }
+    }
+
+    /// Load the Column Index as part of [`Self::get_metadata`]
+    pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
+        Self {
+            preload_column_index,
+            ..self
+        }
+    }
+
+    /// Load the Offset Index as part of [`Self::get_metadata`]
+    pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
+        Self {
+            preload_offset_index,
+            ..self
+        }
+    }
+}
+
+impl AsyncFileReader for CloudParquetObjectReader {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
+        let mut retries = 5;
+        loop {
+            let future_result = self
+                .store
+                .get_range(&self.meta.location, range.clone())
+                .map_err(|e| {
+                    ParquetError::General(format!("AsyncChunkReader::get_bytes error: {e}"))
+                })
+                .boxed();
+
+            let result = block_on(future_result);
+            match result {
+                Ok(bytes) => return Box::pin(ready(Ok(bytes))),
+                Err(err) => {
+                    if retries == 0 {
+                        return Box::pin(ready(Err(err)));
+                    }
+                    retries -= 1;
+                }
+            }
+        }
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, ParquetResult<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        async move {
+            let mut retries = 5;
+            loop {
+                let future_result = self
+                    .store
+                    .get_ranges(&self.meta.location, &ranges)
+                    .await
+                    .map_err(|e| {
+                        ParquetError::General(format!(
+                            "ParquetObjectReader::get_byte_ranges error: {e}"
+                        ))
+                    });
+                match future_result {
+                    Ok(bytes) => return Ok(bytes),
+                    Err(err) => {
+                        if retries == 0 {
+                            return Err(err);
+                        }
+                        retries -= 1;
+                    }
+                }
+            }
+        }
+        .boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
+            let preload_column_index = self.preload_column_index;
+            let preload_offset_index = self.preload_offset_index;
+            let file_size = self.meta.size;
+            let prefetch = self.metadata_size_hint;
+            let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
+            loader
+                .load_page_index(preload_column_index, preload_offset_index)
+                .await?;
+            Ok(Arc::new(loader.finish()))
+        })
+    }
+}
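
Usage sketch (not part of the patch): a minimal illustration of how the new wrapper slots in at the call sites above, i.e. as a drop-in replacement for ParquetObjectReader. It assumes code living inside crates/core, where the private `reader` module is visible; the helper name `read_parquet_object` and its error handling are made up for this example.

use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

use crate::reader::CloudParquetObjectReader;
use crate::DeltaResult;

/// Hypothetical helper: read every record batch of a single Parquet object,
/// going through the retrying CloudParquetObjectReader.
async fn read_parquet_object(
    store: Arc<dyn ObjectStore>,
    path: &Path,
) -> DeltaResult<Vec<arrow::record_batch::RecordBatch>> {
    // ObjectMeta (location, size, ...) is required to construct the reader;
    // `head` fetches it without downloading the object itself.
    let meta = store.head(path).await?;
    // Same constructor shape as ParquetObjectReader::new, but failed range
    // requests are retried a few times before the error is surfaced.
    let reader = CloudParquetObjectReader::new(store, meta);
    let stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;
    Ok(stream.try_collect().await?)
}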