From bdbf7f186610e70ed181738724c63c01d26e87fb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 11 Apr 2025 09:26:49 +0100 Subject: [PATCH 1/6] Make with_schema_unchecked unsafe --- arrow-array/src/record_batch.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index cf20f9a059c..2b844f89333 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -359,11 +359,12 @@ impl RecordBatch { }) } - /// Overrides the schema of this [`RecordBatch`] - /// without additional schema checks. Note, however, that this pushes all the schema compatibility responsibilities - /// to the caller site. In particular, the caller guarantees that `schema` is a superset - /// of the current schema as determined by [`Schema::contains`]. - pub fn with_schema_unchecked(self, schema: SchemaRef) -> Result { + /// Overrides the schema of this [`RecordBatch`] without additional schema checks. + /// + /// # Safety + /// + /// `schema` must be a superset of the current schema as determined by [`Schema::contains`] + pub unsafe fn with_schema_unchecked(self, schema: SchemaRef) -> Result { Ok(Self { schema, columns: self.columns, From fd36a01926b5a07ec60fc90cf9f58beaf3a266a1 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 11 Apr 2025 09:32:32 +0100 Subject: [PATCH 2/6] Fix tests --- arrow-array/src/record_batch.rs | 40 ++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 2b844f89333..347c73a6f96 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -1655,7 +1655,7 @@ mod tests { #[test] fn test_batch_with_unchecked_schema() { - fn apply_schema_unchecked( + unsafe fn apply_schema_unchecked( record_batch: &RecordBatch, schema_ref: SchemaRef, idx: usize, @@ -1676,7 +1676,7 @@ mod tests { // Test empty schema for non-empty schema batch let invalid_schema_empty = Schema::empty(); assert_eq!( - apply_schema_unchecked(&record_batch, invalid_schema_empty.into(), 0) + unsafe { apply_schema_unchecked(&record_batch, invalid_schema_empty.into(), 0) } .unwrap() .to_string(), "Schema error: project index 0 out of bounds, max field 0" @@ -1688,13 +1688,13 @@ mod tests { Field::new("b", DataType::Int32, false), ]); - assert!( + assert!(unsafe { apply_schema_unchecked(&record_batch, invalid_schema_more_cols.clone().into(), 0) - .is_none() - ); + } + .is_none()); assert_eq!( - apply_schema_unchecked(&record_batch, invalid_schema_more_cols.into(), 1) + unsafe { apply_schema_unchecked(&record_batch, invalid_schema_more_cols.into(), 1) } .unwrap() .to_string(), "Schema error: project index 1 out of bounds, max field 1" @@ -1703,28 +1703,32 @@ mod tests { // Wrong datatype let invalid_schema_wrong_datatype = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - assert_eq!(apply_schema_unchecked(&record_batch, invalid_schema_wrong_datatype.into(), 0).unwrap().to_string(), "Invalid argument error: column types must match schema types, expected Int32 but found Utf8 at column index 0"); + assert_eq!(unsafe { apply_schema_unchecked(&record_batch, invalid_schema_wrong_datatype.into(), 0)}.unwrap().to_string(), "Invalid argument error: column types must match schema types, expected Int32 but found Utf8 at column index 0"); // Wrong column name. A instead C let invalid_schema_wrong_col_name = Schema::new(vec![Field::new("a", DataType::Utf8, false)]); - assert!(record_batch - .clone() - .with_schema_unchecked(invalid_schema_wrong_col_name.into()) - .unwrap() - .column_by_name("c") - .is_none()); + assert!(unsafe { + record_batch + .clone() + .with_schema_unchecked(invalid_schema_wrong_col_name.into()) + } + .unwrap() + .column_by_name("c") + .is_none()); // Valid schema let valid_schema = Schema::new(vec![Field::new("c", DataType::Utf8, false)]); assert_eq!( - record_batch - .clone() - .with_schema_unchecked(valid_schema.into()) - .unwrap() - .column_by_name("c"), + unsafe { + record_batch + .clone() + .with_schema_unchecked(valid_schema.into()) + } + .unwrap() + .column_by_name("c"), record_batch.column_by_name("c") ); } From 8cd9dc5ca6ac928d61ed2ad52b19dd3f7fed8fdb Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 11 Apr 2025 15:15:52 +0100 Subject: [PATCH 3/6] Revert with_schema_unchecked --- arrow-array/src/record_batch.rs | 99 +-------------------------------- 1 file changed, 2 insertions(+), 97 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 347c73a6f96..a6c2aee7cbc 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -359,19 +359,6 @@ impl RecordBatch { }) } - /// Overrides the schema of this [`RecordBatch`] without additional schema checks. - /// - /// # Safety - /// - /// `schema` must be a superset of the current schema as determined by [`Schema::contains`] - pub unsafe fn with_schema_unchecked(self, schema: SchemaRef) -> Result { - Ok(Self { - schema, - columns: self.columns, - row_count: self.row_count, - }) - } - /// Returns the [`Schema`] of the record batch. pub fn schema(&self) -> SchemaRef { self.schema.clone() @@ -757,14 +744,12 @@ impl RecordBatchOptions { row_count: None, } } - - /// Sets the `row_count` of `RecordBatchOptions` and returns this [`RecordBatch`] + /// Sets the row_count of RecordBatchOptions and returns self pub fn with_row_count(mut self, row_count: Option) -> Self { self.row_count = row_count; self } - - /// Sets the `match_field_names` of `RecordBatchOptions` and returns this [`RecordBatch`] + /// Sets the match_field_names of RecordBatchOptions and returns self pub fn with_match_field_names(mut self, match_field_names: bool) -> Self { self.match_field_names = match_field_names; self @@ -1652,84 +1637,4 @@ mod tests { "bar" ); } - - #[test] - fn test_batch_with_unchecked_schema() { - unsafe fn apply_schema_unchecked( - record_batch: &RecordBatch, - schema_ref: SchemaRef, - idx: usize, - ) -> Option { - record_batch - .clone() - .with_schema_unchecked(schema_ref) - .unwrap() - .project(&[idx]) - .err() - } - - let c: ArrayRef = Arc::new(StringArray::from(vec!["d", "e", "f"])); - - let record_batch = - RecordBatch::try_from_iter(vec![("c", c.clone())]).expect("valid conversion"); - - // Test empty schema for non-empty schema batch - let invalid_schema_empty = Schema::empty(); - assert_eq!( - unsafe { apply_schema_unchecked(&record_batch, invalid_schema_empty.into(), 0) } - .unwrap() - .to_string(), - "Schema error: project index 0 out of bounds, max field 0" - ); - - // Wrong number of columns - let invalid_schema_more_cols = Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Int32, false), - ]); - - assert!(unsafe { - apply_schema_unchecked(&record_batch, invalid_schema_more_cols.clone().into(), 0) - } - .is_none()); - - assert_eq!( - unsafe { apply_schema_unchecked(&record_batch, invalid_schema_more_cols.into(), 1) } - .unwrap() - .to_string(), - "Schema error: project index 1 out of bounds, max field 1" - ); - - // Wrong datatype - let invalid_schema_wrong_datatype = - Schema::new(vec![Field::new("a", DataType::Int32, false)]); - assert_eq!(unsafe { apply_schema_unchecked(&record_batch, invalid_schema_wrong_datatype.into(), 0)}.unwrap().to_string(), "Invalid argument error: column types must match schema types, expected Int32 but found Utf8 at column index 0"); - - // Wrong column name. A instead C - let invalid_schema_wrong_col_name = - Schema::new(vec![Field::new("a", DataType::Utf8, false)]); - - assert!(unsafe { - record_batch - .clone() - .with_schema_unchecked(invalid_schema_wrong_col_name.into()) - } - .unwrap() - .column_by_name("c") - .is_none()); - - // Valid schema - let valid_schema = Schema::new(vec![Field::new("c", DataType::Utf8, false)]); - - assert_eq!( - unsafe { - record_batch - .clone() - .with_schema_unchecked(valid_schema.into()) - } - .unwrap() - .column_by_name("c"), - record_batch.column_by_name("c") - ); - } } From 7feaf371822f33239e07105b770671d54f55ef7b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 11 Apr 2025 15:22:30 +0100 Subject: [PATCH 4/6] Add new_unchecked --- arrow-array/src/record_batch.rs | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index a6c2aee7cbc..064c996dd82 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -211,10 +211,11 @@ impl RecordBatch { /// Creates a `RecordBatch` from a schema and columns. /// /// Expects the following: - /// * the vec of columns to not be empty - /// * the schema and column data types to have equal lengths - /// and match - /// * each array in columns to have the same length + /// + /// * `!columns.is_empty()` + /// * `schema.fields.len() == columns.len()` + /// * `schema.fields[i].data_type() == columns[i].data_type()` + /// * `columns[i].len() == columns[j].len()` /// /// If the conditions are not met, an error is returned. /// @@ -240,6 +241,27 @@ impl RecordBatch { Self::try_new_impl(schema, columns, &options) } + /// Creates a `RecordBatch` from a schema and columns + /// + /// # Safety + /// + /// Expects the following: + /// + /// * `schema.fields.len() == columns.len()` + /// * `schema.fields[i].data_type() == columns[i].data_type()` + /// * `columns[i].len() == row_count` + pub unsafe fn new_unchecked( + schema: SchemaRef, + columns: Vec>, + row_count: usize, + ) -> Self { + Self { + schema, + columns, + row_count, + } + } + /// Creates a `RecordBatch` from a schema and columns, with additional options, /// such as whether to strictly validate field names. /// From e2f5d8f12d0bc9fdc3a9fc546854a35648e967ed Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 11 Apr 2025 15:39:21 +0100 Subject: [PATCH 5/6] Add into_parts --- arrow-array/src/record_batch.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 064c996dd82..34a21ba3f11 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -362,6 +362,11 @@ impl RecordBatch { }) } + /// Return the schema, columns and row count of this [`RecordBatch`] + pub fn into_parts(self) -> (SchemaRef, Vec, usize) { + (self.schema, self.columns, self.row_count) + } + /// Override the schema of this [`RecordBatch`] /// /// Returns an error if `schema` is not a superset of the current schema From 4da5095d4304cd5596a084b1b02b6f91f7dfc358 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 11 Apr 2025 15:40:53 +0100 Subject: [PATCH 6/6] Review feedback --- arrow-array/src/record_batch.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 34a21ba3f11..7bf7dc3a394 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -241,7 +241,9 @@ impl RecordBatch { Self::try_new_impl(schema, columns, &options) } - /// Creates a `RecordBatch` from a schema and columns + /// Creates a `RecordBatch` from a schema and columns, without validation. + /// + /// See [`Self::try_new`] for the checked version. /// /// # Safety /// @@ -250,6 +252,10 @@ impl RecordBatch { /// * `schema.fields.len() == columns.len()` /// * `schema.fields[i].data_type() == columns[i].data_type()` /// * `columns[i].len() == row_count` + /// + /// Note: if the schema does not match the underlying data exactly, it can lead to undefined + /// behavior, for example, via conversion to a `StructArray`, which in turn could lead + /// to incorrect access. pub unsafe fn new_unchecked( schema: SchemaRef, columns: Vec>,