
Commit ddcda30

Merge branch 'main' into release-gil

2 parents d6b3365 + fae1406

File tree

9 files changed: +207 -105 lines

crates/aws/src/storage.rs

Lines changed: 47 additions & 23 deletions

@@ -1,5 +1,6 @@
 //! AWS S3 storage backend.

+use aws_config::meta::region::ProvideRegion;
 use aws_config::provider_config::ProviderConfig;
 use aws_config::{Region, SdkConfig};
 use bytes::Bytes;
@@ -20,6 +21,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::io::AsyncWrite;
+use tracing::warn;
 use url::Url;

 use crate::errors::DynamoDbConfigError;
@@ -67,9 +69,9 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
     fn parse_url_opts(
         &self,
         url: &Url,
-        options: &StorageOptions,
+        storage_options: &StorageOptions,
     ) -> DeltaResult<(ObjectStoreRef, Path)> {
-        let options = self.with_env_s3(options);
+        let options = self.with_env_s3(storage_options);
         let (inner, prefix) = parse_url_opts(
             url,
             options.0.iter().filter_map(|(key, value)| {
@@ -87,7 +89,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
         {
             Ok((store, prefix))
         } else {
-            let s3_options = S3StorageOptions::from_map(&options.0)?;
+            let s3_options = S3StorageOptions::from_map(&storage_options.0)?;

             let store = S3StorageBackend::try_new(
                 store,
@@ -140,7 +142,6 @@ impl S3StorageOptions {
             .filter(|(k, _)| !s3_constants::S3_OPTS.contains(&k.as_str()))
             .map(|(k, v)| (k.to_owned(), v.to_owned()))
             .collect();
-
         // Copy web identity values provided in options but not the environment into the environment
         // to get picked up by the `from_k8s_env` call in `get_web_identity_provider`.
         Self::ensure_env_var(options, s3_constants::AWS_REGION);
@@ -175,41 +176,47 @@ impl S3StorageOptions {
             .unwrap_or(true);
         let imds_timeout =
             Self::u64_or_default(options, s3_constants::AWS_EC2_METADATA_TIMEOUT, 100);
+        let (loader, provider_config) =
+            if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) {
+                let (region_provider, provider_config) = Self::create_provider_config(
+                    str_option(options, s3_constants::AWS_REGION)
+                        .or_else(|| std::env::var("AWS_DEFAULT_REGION").ok())
+                        .map_or(Region::from_static("custom"), Region::new),
+                )?;
+                let loader = aws_config::from_env()
+                    .endpoint_url(endpoint_url)
+                    .region(region_provider);
+                (loader, provider_config)
+            } else {
+                let (region_provider, provider_config) = Self::create_provider_config(
+                    crate::credentials::new_region_provider(disable_imds, imds_timeout),
+                )?;
+                (
+                    aws_config::from_env().region(region_provider),
+                    provider_config,
+                )
+            };

-        let region_provider = crate::credentials::new_region_provider(disable_imds, imds_timeout);
-        let region = execute_sdk_future(region_provider.region())?;
-        let provider_config = ProviderConfig::default().with_region(region);
         let credentials_provider = crate::credentials::ConfiguredCredentialChain::new(
             disable_imds,
             imds_timeout,
             &provider_config,
         );
         #[cfg(feature = "native-tls")]
         let sdk_config = execute_sdk_future(
-            aws_config::from_env()
+            loader
                 .http_client(native::use_native_tls_client(
                     str_option(options, s3_constants::AWS_ALLOW_HTTP)
                         .map(|val| str_is_truthy(&val))
                         .unwrap_or(false),
                 ))
                 .credentials_provider(credentials_provider)
-                .region(region_provider)
                 .load(),
         )?;
         #[cfg(feature = "rustls")]
-        let sdk_config = execute_sdk_future(
-            aws_config::from_env()
-                .credentials_provider(credentials_provider)
-                .region(region_provider)
-                .load(),
-        )?;
-
         let sdk_config =
-            if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) {
-                sdk_config.to_builder().endpoint_url(endpoint_url).build()
-            } else {
-                sdk_config
-            };
+            execute_sdk_future(loader.credentials_provider(credentials_provider).load())?;
+
         Ok(Self {
             virtual_hosted_style_request,
             locking_provider: str_option(options, s3_constants::AWS_S3_LOCKING_PROVIDER),
@@ -230,6 +237,16 @@ impl S3StorageOptions {
         self.sdk_config.region()
     }

+    fn create_provider_config<T: ProvideRegion>(
+        region_provider: T,
+    ) -> DeltaResult<(T, ProviderConfig)> {
+        let region = execute_sdk_future(region_provider.region())?;
+        Ok((
+            region_provider,
+            ProviderConfig::default().with_region(region),
+        ))
+    }
+
     fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
         str_option(map, key)
             .and_then(|v| v.parse().ok())
@@ -494,8 +511,15 @@ pub mod s3_constants {
 }

 pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String> {
-    map.get(key)
-        .map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned()))
+    if let Some(s) = map.get(key) {
+        return Some(s.to_owned());
+    }
+
+    if let Some(s) = map.get(&key.to_ascii_lowercase()) {
+        return Some(s.to_owned());
+    }
+
+    std::env::var(key).ok()
 }

 #[cfg(test)]
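
Two behavioral notes on this file. First, the SDK loader is now built once, with any custom `AWS_ENDPOINT_URL` applied while the config is constructed rather than patched into the finished `SdkConfig`; when an endpoint override is present, the region falls back to a static "custom" placeholder instead of probing IMDS. A minimal sketch of that flow, assuming aws-config 1.x and simplified names (not the crate's actual helper):

```rust
use aws_config::Region;

// Sketch only: mirrors the branch added to S3StorageOptions::from_map.
// With an explicit endpoint, a placeholder region avoids the IMDS probe.
async fn load_sdk_config(endpoint_url: Option<String>) -> aws_config::SdkConfig {
    let loader = match endpoint_url {
        Some(url) => aws_config::from_env()
            .endpoint_url(url)
            .region(Region::from_static("custom")),
        // No endpoint override: defer to the default region chain.
        None => aws_config::from_env(),
    };
    loader.load().await
}
```

Second, `str_option` now checks the options map under both the exact and the lowercased key before consulting the environment, so explicitly passed options win over ambient environment variables. A standalone copy of the new lookup with a usage check:

```rust
use std::collections::HashMap;

// Same lookup order as the new str_option: exact key, lowercased key,
// then the process environment.
fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String> {
    if let Some(s) = map.get(key) {
        return Some(s.to_owned());
    }
    if let Some(s) = map.get(&key.to_ascii_lowercase()) {
        return Some(s.to_owned());
    }
    std::env::var(key).ok()
}

fn main() {
    let mut opts = HashMap::new();
    opts.insert("aws_region".to_string(), "us-east-2".to_string());
    // A lowercase option key now satisfies an uppercase constant lookup.
    assert_eq!(str_option(&opts, "AWS_REGION").as_deref(), Some("us-east-2"));
}
```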

crates/core/src/operations/convert_to_delta.rs

Lines changed: 36 additions & 8 deletions

@@ -1,20 +1,23 @@
 //! Command for converting a Parquet table to a Delta table in place
 // https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala

+use crate::operations::get_num_idx_cols_and_stats_columns;
 use crate::{
     kernel::{Add, DataType, Schema, StructField},
     logstore::{LogStore, LogStoreRef},
     operations::create::CreateBuilder,
     protocol::SaveMode,
     table::builder::ensure_table_uri,
     table::config::DeltaConfigKey,
+    writer::stats::stats_from_parquet_metadata,
     DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, NULL_PARTITION_VALUE_DATA_PATH,
 };
 use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
 use futures::{
     future::{self, BoxFuture},
     TryStreamExt,
 };
+use indexmap::IndexMap;
 use parquet::{
     arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
     errors::ParquetError,
@@ -284,6 +287,10 @@ impl ConvertToDeltaBuilder {
         // A vector of StructField of all unique partition columns in a Parquet table
         let mut partition_schema_fields = HashMap::new();

+        // Obtain settings on which columns to skip collecting stats on if any
+        let (num_indexed_cols, stats_columns) =
+            get_num_idx_cols_and_stats_columns(None, self.configuration.clone());
+
         for file in files {
             // A HashMap from partition column to value for this parquet file only
             let mut partition_values = HashMap::new();
@@ -328,6 +335,24 @@ impl ConvertToDeltaBuilder {
                 subpath = iter.next();
             }

+            let batch_builder = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
+                object_store.clone(),
+                file.clone(),
+            ))
+            .await?;
+
+            // Fetch the stats
+            let parquet_metadata = batch_builder.metadata();
+            let stats = stats_from_parquet_metadata(
+                &IndexMap::from_iter(partition_values.clone().into_iter()),
+                parquet_metadata.as_ref(),
+                num_indexed_cols,
+                &stats_columns,
+            )
+            .map_err(|e| Error::DeltaTable(e.into()))?;
+            let stats_string =
+                serde_json::to_string(&stats).map_err(|e| Error::DeltaTable(e.into()))?;
+
             actions.push(
                 Add {
                     path: percent_decode_str(file.location.as_ref())
@@ -349,19 +374,13 @@ impl ConvertToDeltaBuilder {
                         .collect(),
                     modification_time: file.last_modified.timestamp_millis(),
                     data_change: true,
+                    stats: Some(stats_string),
                     ..Default::default()
                 }
                 .into(),
             );

-            let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
-                object_store.clone(),
-                file,
-            ))
-            .await?
-            .schema()
-            .as_ref()
-            .clone();
+            let mut arrow_schema = batch_builder.schema().as_ref().clone();

             // Arrow schema of Parquet files may have conflicting metatdata
             // Since Arrow schema metadata is not used to generate Delta table schema, we set the metadata field to an empty HashMap
@@ -584,6 +603,15 @@ mod tests {
             "part-00000-d22c627d-9655-4153-9527-f8995620fa42-c000.snappy.parquet"
         );

+        let Some(Scalar::Struct(min_values, _)) = action.min_values() else {
+            panic!("Missing min values");
+        };
+        assert_eq!(min_values, vec![Scalar::Date(18628), Scalar::Integer(1)]);
+        let Some(Scalar::Struct(max_values, _)) = action.max_values() else {
+            panic!("Missing max values");
+        };
+        assert_eq!(max_values, vec![Scalar::Date(18632), Scalar::Integer(5)]);
+
         assert_delta_table(
             table,
             path,
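
For reference, the `stats_string` stored on each `Add` action is a JSON document in the Delta protocol's per-file statistics shape (`numRecords`, `minValues`, `maxValues`, `nullCount`). A hedged illustration of that shape; the values below are invented to echo the updated test, where `Scalar::Date(18628)` and `Scalar::Date(18632)` are days since the epoch, i.e. 2021-01-01 and 2021-01-05:

```rust
use serde_json::json;

// Illustrative only: the real values come from stats_from_parquet_metadata.
fn main() -> serde_json::Result<()> {
    let stats = json!({
        "numRecords": 5,
        "minValues": { "date": "2021-01-01", "id": 1 },
        "maxValues": { "date": "2021-01-05", "id": 5 },
        "nullCount": { "date": 0, "id": 0 }
    });
    let stats_string = serde_json::to_string(&stats)?;
    println!("{stats_string}"); // written verbatim into Add::stats
    Ok(())
}
```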

crates/core/src/operations/mod.rs

Lines changed: 27 additions & 0 deletions

@@ -239,6 +239,33 @@ impl AsRef<DeltaTable> for DeltaOps {
     }
 }

+/// Get the num_idx_columns and stats_columns from the table configuration in the state
+/// If table_config does not exist (only can occur in the first write action) it takes
+/// the configuration that was passed to the writerBuilder.
+pub fn get_num_idx_cols_and_stats_columns(
+    config: Option<crate::table::config::TableConfig<'_>>,
+    configuration: HashMap<String, Option<String>>,
+) -> (i32, Option<Vec<String>>) {
+    let (num_index_cols, stats_columns) = match &config {
+        Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
+        _ => (
+            configuration
+                .get("delta.dataSkippingNumIndexedCols")
+                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
+                .unwrap_or(crate::table::config::DEFAULT_NUM_INDEX_COLS),
+            configuration
+                .get("delta.dataSkippingStatsColumns")
+                .and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())),
+        ),
+    };
+    (
+        num_index_cols,
+        stats_columns
+            .clone()
+            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
+    )
+}
+
 #[cfg(feature = "datafusion")]
 mod datafusion_utils {
     use datafusion::execution::context::SessionState;
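
With the helper now public at the `operations` module root, the write path and convert-to-delta share one implementation. A usage sketch of the first-write fallback (no `TableConfig` yet), assuming the crate is consumed as `deltalake_core`:

```rust
use std::collections::HashMap;

use deltalake_core::operations::get_num_idx_cols_and_stats_columns;

fn main() {
    // No table config exists yet, so the raw configuration map is used.
    let mut configuration: HashMap<String, Option<String>> = HashMap::new();
    configuration.insert(
        "delta.dataSkippingNumIndexedCols".to_string(),
        Some("4".to_string()),
    );
    configuration.insert(
        "delta.dataSkippingStatsColumns".to_string(),
        Some("id,date".to_string()),
    );

    let (num_indexed_cols, stats_columns) =
        get_num_idx_cols_and_stats_columns(None, configuration);
    assert_eq!(num_indexed_cols, 4);
    assert_eq!(
        stats_columns,
        Some(vec!["id".to_string(), "date".to_string()])
    );
}
```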

crates/core/src/operations/transaction/conflict_checker.rs

Lines changed: 1 addition & 8 deletions

@@ -194,7 +194,7 @@ impl<'a> TransactionInfo<'a> {
     #[cfg(not(feature = "datafusion"))]
     /// Files read by the transaction
     pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
-        Ok(self.read_snapshot.file_actions().unwrap().into_iter())
+        Ok(self.read_snapshot.file_actions().unwrap())
     }

     /// Whether the whole table was read during the transaction
@@ -311,13 +311,6 @@ impl WinningCommitSummary {
         }
     }

-    // pub fn only_add_files(&self) -> bool {
-    //     !self
-    //         .actions
-    //         .iter()
-    //         .any(|action| matches!(action, Action::remove(_)))
-    // }
-
     pub fn is_blind_append(&self) -> Option<bool> {
         self.commit_info
             .as_ref()

crates/core/src/operations/transaction/protocol.rs

Lines changed: 2 additions & 2 deletions

@@ -80,8 +80,8 @@ impl ProtocolChecker {
     }

     /// checks if table contains timestamp_ntz in any field including nested fields.
-    pub fn contains_timestampntz(&self, fields: &Vec<StructField>) -> bool {
-        fn check_vec_fields(fields: &Vec<StructField>) -> bool {
+    pub fn contains_timestampntz(&self, fields: &[StructField]) -> bool {
+        fn check_vec_fields(fields: &[StructField]) -> bool {
             fields.iter().any(|f| _check_type(f.data_type()))
         }
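
The `&Vec<StructField>` to `&[StructField]` change is the standard Clippy `ptr_arg` fix: a slice parameter accepts a `Vec`, an array, or a sub-slice with no extra indirection, while `&Vec<T>` accepts only a `Vec`. A minimal illustration of the general idiom (not the crate's types):

```rust
// Slice parameters are strictly more general than &Vec<T>.
fn any_empty(fields: &[String]) -> bool {
    fields.iter().any(|f| f.is_empty())
}

fn main() {
    let from_vec = vec!["a".to_string()];
    let from_array = ["b".to_string()];
    assert!(!any_empty(&from_vec)); // &Vec<String> auto-derefs to &[String]
    assert!(!any_empty(&from_array)); // arrays coerce to slices too
}
```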

crates/core/src/operations/write.rs

Lines changed: 1 addition & 29 deletions

@@ -58,7 +58,6 @@ use crate::logstore::LogStoreRef;
 use crate::operations::cast::{cast_record_batch, merge_schema};
 use crate::protocol::{DeltaOperation, SaveMode};
 use crate::storage::ObjectStoreRef;
-use crate::table::config::DEFAULT_NUM_INDEX_COLS;
 use crate::table::state::DeltaTableState;
 use crate::table::Constraint as DeltaConstraint;
 use crate::writer::record_batch::divide_by_partition_values;
@@ -759,7 +758,7 @@ impl std::future::IntoFuture for WriteBuilder {
             .map(|snapshot| snapshot.table_config());

         let (num_indexed_cols, stats_columns) =
-            get_num_idx_cols_and_stats_columns(config, this.configuration);
+            super::get_num_idx_cols_and_stats_columns(config, this.configuration);

         let writer_stats_config = WriterStatsConfig {
             num_indexed_cols,
@@ -922,33 +921,6 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowE
     Ok(())
 }

-/// Get the num_idx_columns and stats_columns from the table configuration in the state
-/// If table_config does not exist (only can occur in the first write action) it takes
-/// the configuration that was passed to the writerBuilder.
-pub fn get_num_idx_cols_and_stats_columns(
-    config: Option<crate::table::config::TableConfig<'_>>,
-    configuration: HashMap<String, Option<String>>,
-) -> (i32, Option<Vec<String>>) {
-    let (num_index_cols, stats_columns) = match &config {
-        Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
-        _ => (
-            configuration
-                .get("delta.dataSkippingNumIndexedCols")
-                .and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
-                .unwrap_or(DEFAULT_NUM_INDEX_COLS),
-            configuration
-                .get("delta.dataSkippingStatsColumns")
-                .and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())),
-        ),
-    };
-    (
-        num_index_cols,
-        stats_columns
-            .clone()
-            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
-    )
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;

crates/core/src/writer/record_batch.rs

Lines changed: 1 addition & 1 deletion

@@ -520,7 +520,7 @@ mod tests {
         use crate::DeltaOps;

         let table = crate::writer::test_utils::create_bare_table();
-        let partition_cols = vec!["modified".to_string()];
+        let partition_cols = ["modified".to_string()];
         let delta_schema = r#"
         {"type" : "struct",
         "fields" : [
