From de1aa24799b97d5f4ae44b970a57a4fed07b584f Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 17 Sep 2024 14:03:59 +0200 Subject: [PATCH 1/2] refactor: Remove extra hashmap construction in new-streaming parquet --- .../src/nodes/parquet_source/metadata_fetch.rs | 2 +- .../src/nodes/parquet_source/metadata_utils.rs | 18 +++++------------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs index 7f5aa8abbf3e..05d59e76f468 100644 --- a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs +++ b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs @@ -128,8 +128,8 @@ impl ParquetSourceNode { }; ensure_metadata_has_projected_fields( - projected_arrow_schema.as_ref(), &metadata, + projected_arrow_schema.as_ref(), )?; PolarsResult::Ok((path_index, byte_source, metadata)) diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs index 24184fd12b10..eab841737573 100644 --- a/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs +++ b/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs @@ -1,9 +1,8 @@ -use polars_core::prelude::{ArrowSchema, DataType, PlHashMap}; +use polars_core::prelude::{ArrowSchema, DataType}; use polars_error::{polars_bail, PolarsResult}; use polars_io::prelude::FileMetadata; use polars_io::utils::byte_source::{ByteSource, DynByteSource}; use polars_utils::mmap::MemSlice; -use polars_utils::pl_str::PlSmallStr; /// Read the metadata bytes of a parquet file, does not decode the bytes. If during metadata fetch /// the bytes of the entire file are loaded, it is returned in the second return value. @@ -124,25 +123,18 @@ pub(super) async fn read_parquet_metadata_bytes( /// Ensures that a parquet file has all the necessary columns for a projection with the correct /// dtype. There are no ordering requirements and extra columns are permitted. pub(super) fn ensure_metadata_has_projected_fields( - projected_fields: &ArrowSchema, metadata: &FileMetadata, + projected_fields: &ArrowSchema, ) -> PolarsResult<()> { let schema = polars_parquet::arrow::read::infer_schema(metadata)?; - // Note: We convert to Polars-native dtypes for timezone normalization. - let mut schema = schema - .into_iter_values() - .map(|x| { - let dtype = DataType::from_arrow(&x.dtype, true); - (x.name, dtype) - }) - .collect::>(); - for field in projected_fields.iter_values() { - let Some(dtype) = schema.remove(&field.name) else { + let Some(field) = schema.get(&field.name) else { polars_bail!(SchemaMismatch: "did not find column: {}", field.name) }; + // Note: We convert to Polars-native dtypes for timezone normalization. + let dtype = DataType::from_arrow(&field.dtype, true); let expected_dtype = DataType::from_arrow(&field.dtype, true); if dtype != expected_dtype { From c895a2e3b6e4d88dcfc58c69a095169d51700ff9 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 17 Sep 2024 14:31:14 +0200 Subject: [PATCH 2/2] c --- .../src/nodes/parquet_source/metadata_utils.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs index eab841737573..aa99742fb83e 100644 --- a/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs +++ b/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs @@ -129,13 +129,14 @@ pub(super) fn ensure_metadata_has_projected_fields( let schema = polars_parquet::arrow::read::infer_schema(metadata)?; for field in projected_fields.iter_values() { - let Some(field) = schema.get(&field.name) else { - polars_bail!(SchemaMismatch: "did not find column: {}", field.name) - }; - // Note: We convert to Polars-native dtypes for timezone normalization. - let dtype = DataType::from_arrow(&field.dtype, true); let expected_dtype = DataType::from_arrow(&field.dtype, true); + let dtype = { + let Some(field) = schema.get(&field.name) else { + polars_bail!(SchemaMismatch: "did not find column: {}", field.name) + }; + DataType::from_arrow(&field.dtype, true) + }; if dtype != expected_dtype { polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",