add retries
ion-elgreco committed Apr 7, 2024
1 parent 5eade5e commit d9191fa
Showing 6 changed files with 153 additions and 17 deletions.
6 changes: 3 additions & 3 deletions crates/core/src/kernel/snapshot/log_segment.rs
@@ -10,7 +10,7 @@ use lazy_static::lazy_static;
use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tracing::debug;
@@ -19,8 +19,8 @@ use super::parse;
use crate::kernel::{arrow::json, ActionType, Metadata, Protocol, Schema, StructType};
use crate::logstore::LogStore;
use crate::operations::transaction::CommitData;
use crate::reader::CloudParquetObjectReader;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";

lazy_static! {
@@ -264,7 +264,7 @@ impl LogSegment {
.map(move |meta| {
let store = store.clone();
async move {
let reader = ParquetObjectReader::new(store, meta);
let reader = CloudParquetObjectReader::new(store, meta);
let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index);
let builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?;
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
@@ -75,6 +75,7 @@ pub mod kernel;
pub mod logstore;
pub mod operations;
pub mod protocol;
mod reader;
pub mod schema;
pub mod storage;
pub mod table;
14 changes: 6 additions & 8 deletions crates/core/src/operations/convert_to_delta.rs
@@ -1,6 +1,7 @@
//! Command for converting a Parquet table to a Delta table in place
// https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala

use crate::reader::CloudParquetObjectReader;
use crate::{
kernel::{Add, DataType, Schema, StructField},
logstore::{LogStore, LogStoreRef},
@@ -15,10 +16,8 @@ use futures::{
future::{self, BoxFuture},
TryStreamExt,
};
use parquet::{
arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
errors::ParquetError,
};
use parquet::{arrow::async_reader::ParquetRecordBatchStreamBuilder, errors::ParquetError};

use percent_encoding::percent_decode_str;
use serde_json::{Map, Value};
use std::{
@@ -352,10 +351,9 @@ impl ConvertToDeltaBuilder {
.into(),
);

let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
object_store.clone(),
file,
))
let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(
CloudParquetObjectReader::new(object_store.clone(), file),
)
.await?
.schema()
.as_ref()
9 changes: 5 additions & 4 deletions crates/core/src/operations/optimize.rs
@@ -33,7 +33,7 @@ use futures::{Future, StreamExt, TryStreamExt};
use indexmap::IndexMap;
use itertools::Itertools;
use num_cpus;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use parquet::basic::{Compression, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
@@ -47,11 +47,11 @@ use crate::kernel::{Action, PartitionsExt, Remove, Scalar};
use crate::logstore::LogStoreRef;
use crate::operations::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES};
use crate::protocol::DeltaOperation;
use crate::reader::CloudParquetObjectReader;
use crate::storage::ObjectStoreRef;
use crate::table::state::DeltaTableState;
use crate::writer::utils::arrow_schema_without_partitions;
use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter};

/// Metrics from Optimize
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -666,7 +666,7 @@ impl MergePlan {
.then(move |file| {
let object_store_ref = object_store_ref.clone();
async move {
let file_reader = ParquetObjectReader::new(object_store_ref, file);
let file_reader =
CloudParquetObjectReader::new(object_store_ref, file);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()
@@ -1159,7 +1160,7 @@ pub(super) mod zorder {
.then(move |file| {
let object_store = object_store.clone();
async move {
let file_reader = ParquetObjectReader::new(object_store.clone(), file);
let file_reader = CloudParquetObjectReader::new(object_store.clone(), file);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()
5 changes: 3 additions & 2 deletions crates/core/src/operations/transaction/state.rs
@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::Arc;

use crate::reader::CloudParquetObjectReader;
use arrow::array::{ArrayRef, BooleanArray};
use arrow::datatypes::{
DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
@@ -12,7 +13,7 @@ use datafusion_expr::Expr;
use itertools::Itertools;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

use crate::delta_datafusion::{
get_null_of_arrow_type, logical_expr_to_physical_expr, to_correct_scalar_value,
@@ -37,7 +38,7 @@ impl DeltaTableState {
.max_by_key(|obj| obj.modification_time)
{
let file_meta = add.try_into()?;
let file_reader = ParquetObjectReader::new(object_store, file_meta);
let file_reader = CloudParquetObjectReader::new(object_store, file_meta);
let file_schema = ParquetRecordBatchStreamBuilder::new_with_options(
file_reader,
ArrowReaderOptions::new().with_skip_arrow_metadata(true),
135 changes: 135 additions & 0 deletions crates/core/src/reader/mod.rs
@@ -0,0 +1,135 @@
use std::future::ready;
use std::{ops::Range, sync::Arc};

use bytes::Bytes;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::errors::ParquetError;
use parquet::errors::Result as ParquetResult;
use parquet::file::metadata::ParquetMetaData;

/// Wrapper for ParquetObjectReader that does additional retries
#[derive(Clone, Debug)]
pub struct CloudParquetObjectReader {
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
metadata_size_hint: Option<usize>,
preload_column_index: bool,
preload_offset_index: bool,
}
#[allow(dead_code)]
impl CloudParquetObjectReader {
/// Creates a new [`CloudParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`]
///
/// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
Self {
store,
meta,
metadata_size_hint: None,
preload_column_index: false,
preload_offset_index: false,
}
}

/// Provide a hint as to the size of the parquet file's footer,
/// see [fetch_parquet_metadata](crate::arrow::async_reader::fetch_parquet_metadata)
pub fn with_footer_size_hint(self, hint: usize) -> Self {
Self {
metadata_size_hint: Some(hint),
..self
}
}

/// Load the Column Index as part of [`Self::get_metadata`]
pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
Self {
preload_column_index,
..self
}
}

/// Load the Offset Index as part of [`Self::get_metadata`]
pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
Self {
preload_offset_index,
..self
}
}
}

impl AsyncFileReader for CloudParquetObjectReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
let mut retries = 5;
loop {
let future_result = self
.store
.get_range(&self.meta.location, range.clone())
.map_err(|e| {
ParquetError::General(format!("AsyncChunkReader::get_bytes error: {e}"))
})
.boxed();

let result = block_on(future_result);
match result {
Ok(bytes) => return Box::pin(ready(Ok(bytes))),
Err(err) => {
if retries == 0 {
return Box::pin(ready(Err(err)));
}
retries -= 1;
}
}
}
}

fn get_byte_ranges(
&mut self,
ranges: Vec<Range<usize>>,
) -> BoxFuture<'_, ParquetResult<Vec<Bytes>>>
where
Self: Send,
{
async move {
let mut retries = 5;
loop {
let future_result = self
.store
.get_ranges(&self.meta.location, &ranges)
.await
.map_err(|e| {
ParquetError::General(format!(
"ParquetObjectReader::get_byte_ranges error: {e}"
))
});
match future_result {
Ok(bytes) => return Ok(bytes),
Err(err) => {
if retries == 0 {
return Err(err);
}
retries -= 1;
}
}
}
}
.boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
Box::pin(async move {
let preload_column_index = self.preload_column_index;
let preload_offset_index = self.preload_offset_index;
let file_size = self.meta.size;
let prefetch = self.metadata_size_hint;
let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
loader
.load_page_index(preload_column_index, preload_offset_index)
.await?;
Ok(Arc::new(loader.finish()))
})
}
}
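
For reference, a minimal, hypothetical usage sketch (not part of this commit) of how the retrying reader replaces ParquetObjectReader at a call site, mirroring the changes in log_segment.rs and optimize.rs above. The read_parquet_file helper, its parameters, and the footer-size value are illustrative assumptions; CloudParquetObjectReader::new and with_footer_size_hint come from the new file.

// Hypothetical usage sketch (assumptions noted in the comments).
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

use crate::reader::CloudParquetObjectReader;

async fn read_parquet_file(
    store: Arc<dyn ObjectStore>,
    location: Path,
) -> Result<(), Box<dyn std::error::Error>> {
    // ObjectMeta can be obtained via ObjectStore::head (or ObjectStore::list),
    // as the doc comment on CloudParquetObjectReader::new notes.
    let meta = store.head(&location).await?;

    // Wrap the store/meta pair in the retrying reader; the call site otherwise
    // looks exactly as it did with ParquetObjectReader.
    let reader = CloudParquetObjectReader::new(store, meta)
        // Optional footer prefetch hint; the 64 KiB value is an assumption.
        .with_footer_size_hint(64 * 1024);

    // ParquetRecordBatchStreamBuilder accepts any AsyncFileReader, so the
    // retries are transparent to the reading code.
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;
    while let Some(batch) = stream.try_next().await? {
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}

Within the reader itself, get_bytes resolves the retried request eagerly via futures::executor::block_on and hands back an already-completed future, while get_byte_ranges retries inside the returned async block; get_metadata loads the footer through the same reader via MetadataLoader, so those fetches go through the retrying paths as well.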
