Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support loading from datasets where the hive columns are also stored in the file #17203

Merged
merged 11 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ impl DataFrame {
series: Series,
) -> PolarsResult<&mut Self> {
polars_ensure!(
series.len() == self.height(),
self.width() == 0 || series.len() == self.height(),
ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}",
series.len(), self.height(),
);
Expand Down
38 changes: 34 additions & 4 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,32 @@ pub(super) fn array_iter_to_series(
/// num_rows equals the height of the df when the df height is non-zero.
pub(crate) fn materialize_hive_partitions(
df: &mut DataFrame,
reader_schema: &ArrowSchema,
hive_partition_columns: Option<&[Series]>,
num_rows: usize,
) {
if let Some(hive_columns) = hive_partition_columns {
for s in hive_columns {
unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
let Some(first) = hive_columns.first() else {
return;
};

if reader_schema.index_of(first.name()).is_some() {
// Insert these hive columns in the order they are stored in the file.
for s in hive_columns {
let i = match df.get_columns().binary_search_by_key(
&reader_schema.index_of(s.name()).unwrap_or(usize::MAX),
|s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN),
) {
Ok(i) => i,
Err(i) => i,
};

df.insert_column(i, s.new_from_index(0, num_rows)).unwrap();
}
} else {
for s in hive_columns {
unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
}
}
}
}
Expand Down Expand Up @@ -294,7 +314,12 @@ fn rg_to_dfs_optionally_par_over_columns(
df.with_row_index_mut(&rc.name, Some(*previous_row_count + rc.offset));
}

materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
materialize_hive_partitions(
&mut df,
schema.as_ref(),
hive_partition_columns,
projection_height,
);
apply_predicate(&mut df, predicate, true)?;

*previous_row_count += current_row_count;
Expand Down Expand Up @@ -382,7 +407,12 @@ fn rg_to_dfs_par_over_rg(
df.with_row_index_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset));
}

materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
materialize_hive_partitions(
&mut df,
schema.as_ref(),
hive_partition_columns,
projection_height,
);
apply_predicate(&mut df, predicate, false)?;

Ok(Some(df))
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub fn materialize_empty_df(
.unwrap();
}

materialize_hive_partitions(&mut df, hive_partition_columns, 0);
materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns, 0);

df
}
63 changes: 59 additions & 4 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use arrow::datatypes::ArrowSchemaRef;
use either::Either;
use expr_expansion::{is_regex_projection, rewrite_projections};
use hive::hive_partitions_from_paths;
use hive::{hive_partitions_from_paths, HivePartitions};

use super::stack_opt::ConversionOptimizer;
use super::*;
Expand Down Expand Up @@ -86,7 +88,7 @@ pub fn to_alp_impl(
paths,
predicate,
mut scan_type,
file_options,
mut file_options,
} => {
let mut file_info = if let Some(file_info) = file_info {
file_info
Expand Down Expand Up @@ -136,20 +138,42 @@ pub fn to_alp_impl(

let hive_parts = if hive_parts.is_some() {
hive_parts
} else if file_options.hive_options.enabled.unwrap() {
} else if file_options.hive_options.enabled.unwrap()
&& file_info.reader_schema.is_some()
{
#[allow(unused_assignments)]
let mut owned = None;

hive_partitions_from_paths(
paths.as_ref(),
file_options.hive_options.hive_start_idx,
file_options.hive_options.schema.clone(),
match file_info.reader_schema.as_ref().unwrap() {
Either::Left(v) => {
owned = Some(Schema::from(v));
owned.as_ref().unwrap()
},
Either::Right(v) => v.as_ref(),
},
)?
} else {
None
};

if let Some(ref hive_parts) = hive_parts {
file_info.update_schema_with_hive_schema(hive_parts[0].schema().clone())?;
let hive_schema = hive_parts[0].schema();
file_info.update_schema_with_hive_schema(hive_schema.clone());
}

file_options.with_columns = if file_info.reader_schema.is_some() {
maybe_init_projection_excluding_hive(
file_info.reader_schema.as_ref().unwrap(),
hive_parts.as_ref().map(|x| &x[0]),
)
} else {
None
};

if let Some(row_index) = &file_options.row_index {
let schema = Arc::make_mut(&mut file_info.schema);
*schema = schema
Expand Down Expand Up @@ -802,3 +826,34 @@ where
})
.collect()
}

pub(crate) fn maybe_init_projection_excluding_hive(
reader_schema: &Either<ArrowSchemaRef, SchemaRef>,
hive_parts: Option<&HivePartitions>,
) -> Option<Arc<[String]>> {
// Update `with_columns` with a projection so that hive columns aren't loaded from the
// file
let hive_parts = hive_parts?;

let hive_schema = hive_parts.schema();

let (first_hive_name, _) = hive_schema.get_at_index(0)?;

let names = match reader_schema {
Either::Left(ref v) => {
let names = v.get_names();
names.contains(&first_hive_name.as_str()).then_some(names)
},
Either::Right(ref v) => v.contains(first_hive_name.as_str()).then(|| v.get_names()),
};

let names = names?;

Some(
names
.iter()
.filter(|x| !hive_schema.contains(x))
.map(ToString::to_string)
.collect::<Arc<[_]>>(),
)
}
26 changes: 21 additions & 5 deletions crates/polars-plan/src/plans/hive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ pub struct HivePartitions {
}

impl HivePartitions {
pub fn get_projection_schema_and_indices<T: AsRef<str>>(
pub fn get_projection_schema_and_indices(
&self,
names: &[T],
names: &PlHashSet<String>,
) -> (SchemaRef, Vec<usize>) {
let names = names.iter().map(T::as_ref).collect::<PlHashSet<&str>>();
let mut out_schema = Schema::with_capacity(self.stats.schema().len());
let mut out_indices = Vec::with_capacity(self.stats.column_stats().len());

Expand Down Expand Up @@ -66,6 +65,7 @@ pub fn hive_partitions_from_paths(
paths: &[PathBuf],
hive_start_idx: usize,
schema: Option<SchemaRef>,
reader_schema: &Schema,
) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
let Some(path) = paths.first() else {
return Ok(None);
Expand All @@ -88,14 +88,30 @@ pub fn hive_partitions_from_paths(
}};
}

let hive_schema = if let Some(v) = schema {
v
let hive_schema = if let Some(ref schema) = schema {
Arc::new(get_hive_parts_iter!(path_string).map(|(name, _)| {
let Some(dtype) = schema.get(name) else {
polars_bail!(
SchemaFieldNotFound:
"path contains column not present in the given Hive schema: {:?}, path = {:?}",
name,
path
)
};
Ok(Field::new(name, dtype.clone()))
}).collect::<PolarsResult<Schema>>()?)
} else {
let mut hive_schema = Schema::with_capacity(16);
let mut schema_inference_map: PlHashMap<&str, PlHashSet<DataType>> =
PlHashMap::with_capacity(16);

for (name, _) in get_hive_parts_iter!(path_string) {
// If the column is also in the file we can use the dtype stored there.
if let Some(dtype) = reader_schema.get(name) {
hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?;
continue;
}

hive_schema.insert_at_index(hive_schema.len(), name.into(), DataType::String)?;
schema_inference_map.insert(name, PlHashSet::with_capacity(4));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn init_set() -> PlHashSet<Arc<str>> {

/// utility function to get names of the columns needed in projection at scan level
fn get_scan_columns(
acc_projections: &mut Vec<ColumnNode>,
acc_projections: &Vec<ColumnNode>,
expr_arena: &Arena<AExpr>,
row_index: Option<&RowIndex>,
) -> Option<Arc<[String]>> {
Expand Down Expand Up @@ -378,7 +378,7 @@ impl ProjectionPushDown {
mut options,
predicate,
} => {
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None);
options.with_columns = get_scan_columns(&acc_projections, expr_arena, None);

options.output_schema = if options.with_columns.is_none() {
None
Expand Down Expand Up @@ -417,7 +417,7 @@ impl ProjectionPushDown {

if do_optimization {
file_options.with_columns = get_scan_columns(
&mut acc_projections,
&acc_projections,
expr_arena,
file_options.row_index.as_ref(),
);
Expand All @@ -432,7 +432,9 @@ impl ProjectionPushDown {

hive_parts = if let Some(hive_parts) = hive_parts {
let (new_schema, projected_indices) = hive_parts[0]
.get_projection_schema_and_indices(with_columns.as_ref());
.get_projection_schema_and_indices(
&with_columns.iter().cloned().collect::<PlHashSet<_>>(),
);

Some(
hive_parts
Expand All @@ -448,15 +450,22 @@ impl ProjectionPushDown {
.collect::<Arc<[_]>>(),
)
} else {
hive_parts
None
};

// Hive partitions are created AFTER the projection, so the output
// schema is incorrect. Here we ensure the columns that are projected and hive
// parts are added at the proper place in the schema, which is at the end.
if let Some(ref mut hive_parts) = hive_parts {
if let Some(ref hive_parts) = hive_parts {
let partition_schema = hive_parts.first().unwrap().schema();

file_options.with_columns = file_options.with_columns.map(|x| {
x.iter()
.filter(|x| !partition_schema.contains(x))
.cloned()
.collect::<Arc<[_]>>()
});

for (name, _) in partition_schema.iter() {
if let Some(dt) = schema.shift_remove(name) {
schema.with_column(name.clone(), dt);
Expand All @@ -465,6 +474,10 @@ impl ProjectionPushDown {
}
Some(Arc::new(schema))
} else {
file_options.with_columns = maybe_init_projection_excluding_hive(
file_info.reader_schema.as_ref().unwrap(),
hive_parts.as_ref().map(|x| &x[0]),
);
None
};
}
Expand Down
24 changes: 11 additions & 13 deletions crates/polars-plan/src/plans/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,18 @@ impl FileInfo {
}

/// Merge the [`Schema`] of a [`HivePartitions`] with the schema of this [`FileInfo`].
///
/// Returns an `Err` if any of the columns in either schema overlap.
pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) -> PolarsResult<()> {
let expected_len = self.schema.len() + hive_schema.len();
pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) {
let schema = Arc::make_mut(&mut self.schema);

let file_schema = Arc::make_mut(&mut self.schema);
file_schema.merge(Arc::unwrap_or_clone(hive_schema));

polars_ensure!(
file_schema.len() == expected_len,
Duplicate: "invalid Hive partition schema\n\n\
Extending the schema with the Hive partition schema would create duplicate fields."
);
Ok(())
for field in hive_schema.iter_fields() {
if let Ok(existing) = schema.try_get_mut(&field.name) {
*existing = field.data_type().clone();
} else {
schema
.insert_at_index(schema.len(), field.name, field.dtype.clone())
.unwrap();
}
}
}
}

Expand Down
Loading