diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs
index 760fc926fca6..0133216ac473 100644
--- a/arrow-flight/src/decode.rs
+++ b/arrow-flight/src/decode.rs
@@ -282,6 +282,7 @@ impl FlightDataDecoder {
                 self.state = Some(FlightStreamState {
                     schema: Arc::clone(&schema),
+                    schema_message: data.clone(),
                     dictionaries_by_field,
                 });
                 Ok(Some(DecodedFlightData::new_schema(data, schema)))
@@ -302,10 +303,15 @@ impl FlightDataDecoder {
                     )
                 })?;

+                let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
+                    .unwrap()
+                    .header_as_schema()
+                    .unwrap();
+
                 arrow_ipc::reader::read_dictionary(
                     &buffer,
                     dictionary_batch,
-                    &state.schema,
+                    ipc_schema,
                     &mut state.dictionaries_by_field,
                     &message.version(),
                 )
@@ -325,8 +331,14 @@ impl FlightDataDecoder {
                     ));
                 };

+                let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
+                    .unwrap()
+                    .header_as_schema()
+                    .unwrap();
+
                 let batch = flight_data_to_arrow_batch(
                     &data,
+                    ipc_schema,
                     Arc::clone(&state.schema),
                     &state.dictionaries_by_field,
                 )
@@ -382,6 +394,7 @@ impl futures::Stream for FlightDataDecoder {
 #[derive(Debug)]
 struct FlightStreamState {
     schema: SchemaRef,
+    schema_message: FlightData,
     dictionaries_by_field: HashMap<i64, ArrayRef>,
 }
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index 57ac9f3173fe..f89caef83941 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -535,15 +535,11 @@ fn prepare_field_for_flight(
         )
         .with_metadata(field.metadata().clone())
     } else {
-        #[allow(deprecated)]
-        let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
-
-        #[allow(deprecated)]
+        dictionary_tracker.next_dict_id();
         Field::new_dict(
             field.name(),
             field.data_type().clone(),
             field.is_nullable(),
-            dict_id,
             field.dict_is_ordered().unwrap_or_default(),
         )
         .with_metadata(field.metadata().clone())
@@ -585,14 +581,11 @@ fn prepare_schema_for_flight(
         )
         .with_metadata(field.metadata().clone())
     } else {
-        #[allow(deprecated)]
-        let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
-        #[allow(deprecated)]
+        dictionary_tracker.next_dict_id();
         Field::new_dict(
             field.name(),
             field.data_type().clone(),
             field.is_nullable(),
-            dict_id,
             field.dict_is_ordered().unwrap_or_default(),
         )
         .with_metadata(field.metadata().clone())
@@ -654,16 +647,10 @@ struct FlightIpcEncoder {
 impl FlightIpcEncoder {
     fn new(options: IpcWriteOptions, error_on_replacement: bool) -> Self {
-        #[allow(deprecated)]
-        let preserve_dict_id = options.preserve_dict_id();
         Self {
             options,
             data_gen: IpcDataGenerator::default(),
-            #[allow(deprecated)]
-            dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
-                error_on_replacement,
-                preserve_dict_id,
-            ),
+            dictionary_tracker: DictionaryTracker::new(error_on_replacement),
         }
     }
@@ -1547,9 +1534,8 @@ mod tests {
     async fn verify_flight_round_trip(mut batches: Vec<RecordBatch>) {
         let expected_schema = batches.first().unwrap().schema();

-        #[allow(deprecated)]
         let encoder = FlightDataEncoderBuilder::default()
-            .with_options(IpcWriteOptions::default().with_preserve_dict_id(false))
+            .with_options(IpcWriteOptions::default())
             .with_dictionary_handling(DictionaryHandling::Resend)
             .build(futures::stream::iter(batches.clone().into_iter().map(Ok)));
@@ -1575,8 +1561,7 @@ mod tests {
             HashMap::from([("some_key".to_owned(), "some_value".to_owned())]),
         );

-        #[allow(deprecated)]
-        let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
+        let mut dictionary_tracker = DictionaryTracker::new(false);

        let got = prepare_schema_for_flight(&schema, &mut dictionary_tracker,
            false);
        assert!(got.metadata().contains_key("some_key"));
@@ -1606,9 +1591,7 @@ mod tests {
         options: &IpcWriteOptions,
     ) -> (Vec<FlightData>, FlightData) {
         let data_gen = IpcDataGenerator::default();
-        #[allow(deprecated)]
-        let mut dictionary_tracker =
-            DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
+        let mut dictionary_tracker = DictionaryTracker::new(false);

         let (encoded_dictionaries, encoded_batch) = data_gen
             .encoded_batch(batch, &mut dictionary_tracker, options)
diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs
index 74ee21a05406..44f41a8bb3c3 100644
--- a/arrow-flight/src/lib.rs
+++ b/arrow-flight/src/lib.rs
@@ -149,9 +149,7 @@ pub struct IpcMessage(pub Bytes);
 fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
     let data_gen = writer::IpcDataGenerator::default();
-    #[allow(deprecated)]
-    let mut dict_tracker =
-        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
+    let mut dict_tracker = writer::DictionaryTracker::new(false);
     data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
 }
diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index 6791b68b757d..130f48795730 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -707,6 +707,7 @@ pub enum ArrowFlightData {
 pub fn arrow_data_from_flight_data(
     flight_data: FlightData,
     arrow_schema_ref: &SchemaRef,
+    ipc_schema: arrow_ipc::Schema,
 ) -> Result<ArrowFlightData, ArrowError> {
     let ipc_message = root_as_message(&flight_data.data_header[..])
         .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
@@ -723,6 +724,7 @@ pub fn arrow_data_from_flight_data(
             let record_batch = read_record_batch(
                 &Buffer::from(flight_data.data_body),
                 ipc_record_batch,
+                ipc_schema,
                 arrow_schema_ref.clone(),
                 &dictionaries_by_field,
                 None,
diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs
index 428dde73ca6c..f004739245d9 100644
--- a/arrow-flight/src/utils.rs
+++ b/arrow-flight/src/utils.rs
@@ -44,7 +44,8 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result Result, ) -> Result {
@@ -71,6 +73,7 @@ pub fn flight_data_to_arrow_batch(
     reader::read_record_batch(
         &Buffer::from(data.data_body.as_ref()),
         batch,
+        ipc_schema,
         schema,
         dictionaries_by_id,
         None,
@@ -90,9 +93,7 @@ pub fn batches_to_flight_data(
     let mut flight_data = vec![];
     let data_gen = writer::IpcDataGenerator::default();
-    #[allow(deprecated)]
-    let mut dictionary_tracker =
-        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
+    let mut dictionary_tracker = writer::DictionaryTracker::new(false);

     for batch in batches.iter() {
         let (encoded_dictionaries, encoded_batch) =
diff --git a/arrow-integration-test/Cargo.toml b/arrow-integration-test/Cargo.toml
index d560d4fd8363..f36a2ad4c621 100644
--- a/arrow-integration-test/Cargo.toml
+++ b/arrow-integration-test/Cargo.toml
@@ -38,6 +38,7 @@ all-features = true
 [dependencies]
 arrow = { workspace = true }
 arrow-buffer = { workspace = true }
+arrow-ipc = { workspace = true }
 hex = { version = "0.4", default-features = false, features = ["std"] }
 serde = { version = "1.0", default-features = false, features = ["rc", "derive"] }
 serde_json = { version = "1.0", default-features = false, features = ["std"] }
diff --git a/arrow-integration-test/src/field.rs b/arrow-integration-test/src/field.rs
index 4b896ed391be..007ccdd18725 100644
--- a/arrow-integration-test/src/field.rs
+++
b/arrow-integration-test/src/field.rs @@ -18,6 +18,7 @@ use crate::{data_type_from_json, data_type_to_json}; use arrow::datatypes::{DataType, Field}; use arrow::error::{ArrowError, Result}; +use arrow_ipc::writer::DictionaryTracker; use std::collections::HashMap; use std::sync::Arc; @@ -218,7 +219,6 @@ pub fn field_from_json(json: &serde_json::Value) -> Result { _ => data_type, }; - let mut dict_id = 0; let mut dict_is_ordered = false; let data_type = match map.get("dictionary") { @@ -231,14 +231,6 @@ pub fn field_from_json(json: &serde_json::Value) -> Result { )); } }; - dict_id = match dictionary.get("id") { - Some(Value::Number(n)) => n.as_i64().unwrap(), - _ => { - return Err(ArrowError::ParseError( - "Field missing 'id' attribute".to_string(), - )); - } - }; dict_is_ordered = match dictionary.get("isOrdered") { Some(&Value::Bool(n)) => n, _ => { @@ -252,8 +244,7 @@ pub fn field_from_json(json: &serde_json::Value) -> Result { _ => data_type, }; - #[allow(deprecated)] - let mut field = Field::new_dict(name, data_type, nullable, dict_id, dict_is_ordered); + let mut field = Field::new_dict(name, data_type, nullable, dict_is_ordered); field.set_metadata(metadata); Ok(field) } @@ -264,27 +255,28 @@ pub fn field_from_json(json: &serde_json::Value) -> Result { } /// Generate a JSON representation of the `Field`. -pub fn field_to_json(field: &Field) -> serde_json::Value { +pub fn field_to_json(dict_tracker: &mut DictionaryTracker, field: &Field) -> serde_json::Value { let children: Vec = match field.data_type() { - DataType::Struct(fields) => fields.iter().map(|x| field_to_json(x.as_ref())).collect(), + DataType::Struct(fields) => fields + .iter() + .map(|x| field_to_json(dict_tracker, x.as_ref())) + .collect(), DataType::List(field) | DataType::LargeList(field) | DataType::FixedSizeList(field, _) - | DataType::Map(field, _) => vec![field_to_json(field)], + | DataType::Map(field, _) => vec![field_to_json(dict_tracker, field)], _ => vec![], }; match field.data_type() { DataType::Dictionary(ref index_type, ref value_type) => { - #[allow(deprecated)] - let dict_id = field.dict_id().unwrap(); serde_json::json!({ "name": field.name(), "nullable": field.is_nullable(), "type": data_type_to_json(value_type), "children": children, "dictionary": { - "id": dict_id, + "id": dict_tracker.next_dict_id(), "indexType": data_type_to_json(index_type), "isOrdered": field.dict_is_ordered().unwrap(), } @@ -345,7 +337,8 @@ mod tests { }"#, ) .unwrap(); - assert_eq!(value, field_to_json(&f)); + let mut dictionary_tracker = DictionaryTracker::new(false); + assert_eq!(value, field_to_json(&mut dictionary_tracker, &f)); } #[test] @@ -398,7 +391,8 @@ mod tests { }"#, ) .unwrap(); - assert_eq!(value, field_to_json(&f)); + let mut dictionary_tracker = DictionaryTracker::new(false); + assert_eq!(value, field_to_json(&mut dictionary_tracker, &f)); } #[test] @@ -415,7 +409,8 @@ mod tests { }"#, ) .unwrap(); - assert_eq!(value, field_to_json(&f)); + let mut dictionary_tracker = DictionaryTracker::new(false); + assert_eq!(value, field_to_json(&mut dictionary_tracker, &f)); } #[test] fn parse_struct_from_json() { diff --git a/arrow-integration-test/src/lib.rs b/arrow-integration-test/src/lib.rs index baa76059f9c6..43f2d174a1cd 100644 --- a/arrow-integration-test/src/lib.rs +++ b/arrow-integration-test/src/lib.rs @@ -77,7 +77,7 @@ pub struct ArrowJsonSchema { } /// Fields are left as JSON `Value` as they vary by `DataType` -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Serialize, Debug, Clone)] pub struct 
ArrowJsonField { /// The name of the field pub name: String, @@ -134,7 +134,7 @@ impl From<&Field> for ArrowJsonField { } /// Represents a dictionary-encoded field in the Arrow JSON format -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Serialize, Debug, Clone)] pub struct ArrowJsonFieldDictionary { /// A unique identifier for the dictionary pub id: i64, @@ -147,7 +147,7 @@ pub struct ArrowJsonFieldDictionary { } /// Type of an index for a dictionary-encoded field in the Arrow JSON format -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Serialize, Debug, Clone)] pub struct DictionaryIndexType { /// The name of the dictionary index type pub name: String, @@ -238,7 +238,9 @@ impl ArrowJson { let batches: Result> = self .batches .iter() - .map(|col| record_batch_from_json(&schema, col.clone(), Some(&dictionaries))) + .map(|col| { + record_batch_from_json(&schema, &self.schema, col.clone(), Some(&dictionaries)) + }) .collect(); batches @@ -314,13 +316,19 @@ impl ArrowJsonField { /// Generates a [`RecordBatch`] from an Arrow JSON batch, given a schema pub fn record_batch_from_json( schema: &Schema, + json_schema: &ArrowJsonSchema, json_batch: ArrowJsonBatch, json_dictionaries: Option<&HashMap>, ) -> Result { let mut columns = vec![]; - for (field, json_col) in schema.fields().iter().zip(json_batch.columns) { - let col = array_from_json(field, json_col, json_dictionaries)?; + for ((field, json_field), json_col) in schema + .fields() + .iter() + .zip(json_schema.fields.clone().into_iter()) + .zip(json_batch.columns.into_iter()) + { + let col = array_from_json(field, &json_field, json_col, json_dictionaries)?; columns.push(col); } @@ -330,6 +338,7 @@ pub fn record_batch_from_json( /// Construct an Arrow array from a partially typed JSON column pub fn array_from_json( field: &Field, + json_field: &ArrowJsonField, json_col: ArrowJsonColumn, dictionaries: Option<&HashMap>, ) -> Result { @@ -723,7 +732,12 @@ pub fn array_from_json( DataType::List(child_field) => { let null_buf = create_null_buf(&json_col); let children = json_col.children.clone().unwrap(); - let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?; + let child_array = array_from_json( + child_field, + json_field.children.first().unwrap(), + children[0].clone(), + dictionaries, + )?; let offsets: Vec = json_col .offset .unwrap() @@ -743,7 +757,12 @@ pub fn array_from_json( DataType::LargeList(child_field) => { let null_buf = create_null_buf(&json_col); let children = json_col.children.clone().unwrap(); - let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?; + let child_array = array_from_json( + child_field, + json_field.children.first().unwrap(), + children[0].clone(), + dictionaries, + )?; let offsets: Vec = json_col .offset .unwrap() @@ -766,7 +785,12 @@ pub fn array_from_json( } DataType::FixedSizeList(child_field, _) => { let children = json_col.children.clone().unwrap(); - let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?; + let child_array = array_from_json( + child_field, + json_field.children.first().unwrap(), + children[0].clone(), + dictionaries, + )?; let null_buf = create_null_buf(&json_col); let list_data = ArrayData::builder(field.data_type().clone()) .len(json_col.count) @@ -783,8 +807,12 @@ pub fn array_from_json( .len(json_col.count) .null_bit_buffer(Some(null_buf)); - for (field, col) in fields.iter().zip(json_col.children.unwrap()) { - let array = array_from_json(field, col, dictionaries)?; + for 
((field, json_field), col) in fields + .iter() + .zip(json_field.children.clone()) + .zip(json_col.children.unwrap()) + { + let array = array_from_json(field, &json_field, col, dictionaries)?; array_data = array_data.add_child_data(array.into_data()); } @@ -792,10 +820,13 @@ pub fn array_from_json( Ok(Arc::new(array)) } DataType::Dictionary(key_type, value_type) => { - #[allow(deprecated)] - let dict_id = field.dict_id().ok_or_else(|| { - ArrowError::JsonError(format!("Unable to find dict_id for field {field:?}")) - })?; + let dict_id = json_field + .dictionary + .as_ref() + .ok_or_else(|| { + ArrowError::JsonError(format!("Unable to find dictionary for field {field:?}")) + })? + .id; // find dictionary let dictionary = dictionaries .ok_or_else(|| { @@ -807,6 +838,7 @@ pub fn array_from_json( match dictionary { Some(dictionary) => dictionary_array_from_json( field, + json_field, json_col, key_type, value_type, @@ -868,7 +900,12 @@ pub fn array_from_json( DataType::Map(child_field, _) => { let null_buf = create_null_buf(&json_col); let children = json_col.children.clone().unwrap(); - let child_array = array_from_json(child_field, children[0].clone(), dictionaries)?; + let child_array = array_from_json( + child_field, + json_field.children.first().unwrap(), + children[0].clone(), + dictionaries, + )?; let offsets: Vec = json_col .offset .unwrap() @@ -900,8 +937,12 @@ pub fn array_from_json( .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect()); let mut children = Vec::with_capacity(fields.len()); - for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) { - let array = array_from_json(field, col, dictionaries)?; + for (((_, field), col), json_field) in fields + .iter() + .zip(json_col.children.unwrap()) + .zip(json_field.children.clone()) + { + let array = array_from_json(field, &json_field, col, dictionaries)?; children.push(array); } @@ -918,6 +959,7 @@ pub fn array_from_json( /// Construct a [`DictionaryArray`] from a partially typed JSON column pub fn dictionary_array_from_json( field: &Field, + json_field: &ArrowJsonField, json_col: ArrowJsonColumn, dict_key: &DataType, dict_value: &DataType, @@ -936,24 +978,39 @@ pub fn dictionary_array_from_json( let null_buf = create_null_buf(&json_col); // build the key data into a buffer, then construct values separately - #[allow(deprecated)] let key_field = Field::new_dict( "key", dict_key.clone(), field.is_nullable(), - #[allow(deprecated)] - field - .dict_id() - .expect("Dictionary fields must have a dict_id value"), field .dict_is_ordered() .expect("Dictionary fields must have a dict_is_ordered value"), ); - let keys = array_from_json(&key_field, json_col, None)?; + let keys = array_from_json( + &key_field, + &ArrowJsonField { + name: "key".to_string(), + field_type: json_field.field_type.clone(), + nullable: false, + children: json_field.children[0].children.clone(), + dictionary: None, + metadata: json_field.metadata.clone(), + }, + json_col, + None, + )?; // note: not enough info on nullability of dictionary - let value_field = Field::new("value", dict_value.clone(), true); + let value_field = Field::new("value", dict_value.clone(), json_field.nullable); let values = array_from_json( &value_field, + &ArrowJsonField { + name: "value".to_string(), + field_type: json_field.field_type.clone(), + nullable: json_field.nullable, + children: json_field.children[1].children.clone(), + dictionary: None, + metadata: json_field.metadata.clone(), + }, dictionary.data.columns[0].clone(), dictionaries, )?; diff --git 
a/arrow-integration-test/src/schema.rs b/arrow-integration-test/src/schema.rs index 512f0aed8e54..62b69ab99f7a 100644 --- a/arrow-integration-test/src/schema.rs +++ b/arrow-integration-test/src/schema.rs @@ -18,12 +18,14 @@ use crate::{field_from_json, field_to_json}; use arrow::datatypes::{Fields, Schema}; use arrow::error::{ArrowError, Result}; +use arrow_ipc::writer::DictionaryTracker; use std::collections::HashMap; /// Generate a JSON representation of the `Schema`. pub fn schema_to_json(schema: &Schema) -> serde_json::Value { + let mut dictionary_tracker = DictionaryTracker::new(false); serde_json::json!({ - "fields": schema.fields().iter().map(|f| field_to_json(f.as_ref())).collect::>(), + "fields": schema.fields().iter().map(|f| field_to_json(&mut dictionary_tracker, f.as_ref())).collect::>(), "metadata": serde_json::to_value(schema.metadata()).unwrap() }) } @@ -189,12 +191,10 @@ mod tests { Field::new("c30", DataType::Duration(TimeUnit::Millisecond), false), Field::new("c31", DataType::Duration(TimeUnit::Microsecond), false), Field::new("c32", DataType::Duration(TimeUnit::Nanosecond), false), - #[allow(deprecated)] Field::new_dict( "c33", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true, - 123, true, ), Field::new("c34", DataType::LargeBinary, true), @@ -589,7 +589,7 @@ mod tests { "name": "utf8" }, "dictionary": { - "id": 123, + "id": 0, "indexType": { "name": "int", "bitWidth": 32, diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index 406419028d00..0981d71d50cd 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -34,7 +34,6 @@ use arrow_flight::{ use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; -use arrow::datatypes::Schema; use std::sync::Arc; type Error = Box; @@ -72,9 +71,7 @@ async fn upload_data( let (mut upload_tx, upload_rx) = mpsc::channel(10); let options = arrow::ipc::writer::IpcWriteOptions::default(); - #[allow(deprecated)] - let mut dict_tracker = - writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id()); + let mut dict_tracker = writer::DictionaryTracker::new(false); let data_gen = writer::IpcDataGenerator::default(); let data = IpcMessage( data_gen @@ -217,33 +214,40 @@ async fn consume_flight_location( let resp = client.do_get(ticket).await?; let mut resp = resp.into_inner(); - let flight_schema = receive_schema_flight_data(&mut resp) + let data = resp + .next() .await - .unwrap_or_else(|| panic!("Failed to receive flight schema")); - let actual_schema = Arc::new(flight_schema); + .ok_or_else(|| Error::from("No data received from Flight server"))??; + let message = + arrow::ipc::root_as_message(&data.data_header[..]).expect("Error parsing message"); + + // message header is a Schema, so read it + let ipc_schema: ipc::Schema = message + .header_as_schema() + .expect("Unable to read IPC message as schema"); + let schema = Arc::new(ipc::convert::fb_to_schema(ipc_schema)); let mut dictionaries_by_id = HashMap::new(); for (counter, expected_batch) in expected_data.iter().enumerate() { - let data = - receive_batch_flight_data(&mut resp, actual_schema.clone(), &mut dictionaries_by_id) - .await - .unwrap_or_else(|| { - panic!( - "Got fewer batches than expected, received so far: {} expected: {}", - counter, - expected_data.len(), - ) 
- }); + let data = receive_batch_flight_data(&mut resp, ipc_schema, &mut dictionaries_by_id) + .await + .unwrap_or_else(|| { + panic!( + "Got fewer batches than expected, received so far: {} expected: {}", + counter, + expected_data.len(), + ) + }); let metadata = counter.to_string().into_bytes(); assert_eq!(metadata, data.app_metadata); let actual_batch = - flight_data_to_arrow_batch(&data, actual_schema.clone(), &dictionaries_by_id) + flight_data_to_arrow_batch(&data, ipc_schema, schema.clone(), &dictionaries_by_id) .expect("Unable to convert flight data to Arrow batch"); - assert_eq!(actual_schema, actual_batch.schema()); + assert_eq!(schema, actual_batch.schema()); assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); assert_eq!(expected_batch.num_rows(), actual_batch.num_rows()); let schema = expected_batch.schema(); @@ -267,23 +271,9 @@ async fn consume_flight_location( Ok(()) } -async fn receive_schema_flight_data(resp: &mut Streaming) -> Option { - let data = resp.next().await?.ok()?; - let message = - arrow::ipc::root_as_message(&data.data_header[..]).expect("Error parsing message"); - - // message header is a Schema, so read it - let ipc_schema: ipc::Schema = message - .header_as_schema() - .expect("Unable to read IPC message as schema"); - let schema = ipc::convert::fb_to_schema(ipc_schema); - - Some(schema) -} - async fn receive_batch_flight_data( resp: &mut Streaming, - schema: SchemaRef, + ipc_schema: arrow::ipc::Schema<'_>, dictionaries_by_id: &mut HashMap, ) -> Option { let mut data = resp.next().await?.ok()?; @@ -296,7 +286,7 @@ async fn receive_batch_flight_data( message .header_as_dictionary_batch() .expect("Error parsing dictionary"), - &schema, + ipc_schema, dictionaries_by_id, &message.version(), ) diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs index 92989a20393e..27ec48532492 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs @@ -119,9 +119,7 @@ impl FlightService for FlightServiceImpl { .ok_or_else(|| Status::not_found(format!("Could not find flight. 
{key}")))?; let options = arrow::ipc::writer::IpcWriteOptions::default(); - #[allow(deprecated)] - let mut dictionary_tracker = - writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id()); + let mut dictionary_tracker = writer::DictionaryTracker::new(false); let data_gen = writer::IpcDataGenerator::default(); let data = IpcMessage( data_gen @@ -268,6 +266,7 @@ impl FlightService for FlightServiceImpl { if let Err(e) = save_uploaded_chunks( uploaded_chunks, schema_ref, + flight_data, input_stream, response_tx, schema, @@ -319,6 +318,7 @@ async fn record_batch_from_message( message: ipc::Message<'_>, data_body: &Buffer, schema_ref: SchemaRef, + ipc_schema: ipc::Schema<'_>, dictionaries_by_id: &HashMap, ) -> Result { let ipc_batch = message @@ -328,6 +328,7 @@ async fn record_batch_from_message( let arrow_batch_result = reader::read_record_batch( data_body, ipc_batch, + ipc_schema, schema_ref, dictionaries_by_id, None, @@ -341,7 +342,7 @@ async fn record_batch_from_message( async fn dictionary_from_message( message: ipc::Message<'_>, data_body: &Buffer, - schema_ref: SchemaRef, + ipc_schema: ipc::Schema<'_>, dictionaries_by_id: &mut HashMap, ) -> Result<(), Status> { let ipc_batch = message @@ -351,7 +352,7 @@ async fn dictionary_from_message( let dictionary_batch_result = reader::read_dictionary( data_body, ipc_batch, - &schema_ref, + ipc_schema, dictionaries_by_id, &message.version(), ); @@ -362,6 +363,7 @@ async fn dictionary_from_message( async fn save_uploaded_chunks( uploaded_chunks: Arc>>, schema_ref: Arc, + schema_flight_data: FlightData, mut input_stream: Streaming, mut response_tx: mpsc::Sender>, schema: Schema, @@ -372,6 +374,11 @@ async fn save_uploaded_chunks( let mut dictionaries_by_id = HashMap::new(); + let ipc_schema = arrow::ipc::root_as_message(&schema_flight_data.data_header[..]) + .map_err(|e| Status::invalid_argument(format!("Could not parse message: {e:?}")))? 
+ .header_as_schema() + .ok_or_else(|| Status::invalid_argument("Could not parse message header as schema"))?; + while let Some(Ok(data)) = input_stream.next().await { let message = arrow::ipc::root_as_message(&data.data_header[..]) .map_err(|e| Status::internal(format!("Could not parse message: {e:?}")))?; @@ -389,6 +396,7 @@ async fn save_uploaded_chunks( message, &Buffer::from(data.data_body.as_ref()), schema_ref.clone(), + ipc_schema, &dictionaries_by_id, ) .await?; @@ -399,7 +407,7 @@ async fn save_uploaded_chunks( dictionary_from_message( message, &Buffer::from(data.data_body.as_ref()), - schema_ref.clone(), + ipc_schema, &mut dictionaries_by_id, ) .await?; diff --git a/arrow-integration-testing/src/lib.rs b/arrow-integration-testing/src/lib.rs index e669690ef4f5..6e5ab8276711 100644 --- a/arrow-integration-testing/src/lib.rs +++ b/arrow-integration-testing/src/lib.rs @@ -53,6 +53,7 @@ pub struct ArrowFile { // this is temporarily not being read from dictionaries: HashMap, arrow_json: Value, + json_schema: ArrowJsonSchema, } impl ArrowFile { @@ -60,7 +61,12 @@ impl ArrowFile { pub fn read_batch(&self, batch_num: usize) -> Result { let b = self.arrow_json["batches"].get(batch_num).unwrap(); let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap(); - record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries)) + record_batch_from_json( + &self.schema, + &self.json_schema, + json_batch, + Some(&self.dictionaries), + ) } /// Read all [RecordBatch]es from the file @@ -71,7 +77,12 @@ impl ArrowFile { .iter() .map(|b| { let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap(); - record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries)) + record_batch_from_json( + &self.schema, + &self.json_schema, + json_batch, + Some(&self.dictionaries), + ) }) .collect() } @@ -120,6 +131,8 @@ pub fn open_json_file(json_name: &str) -> Result { let reader = BufReader::new(json_file); let arrow_json: Value = serde_json::from_reader(reader).unwrap(); let schema = schema_from_json(&arrow_json["schema"])?; + let json_schema: ArrowJsonSchema = + serde_json::from_value(arrow_json["schema"].clone()).unwrap(); // read dictionaries let mut dictionaries = HashMap::new(); if let Some(dicts) = arrow_json.get("dictionaries") { @@ -137,6 +150,7 @@ pub fn open_json_file(json_name: &str) -> Result { schema, dictionaries, arrow_json, + json_schema, }) } diff --git a/arrow-ipc/benches/ipc_reader.rs b/arrow-ipc/benches/ipc_reader.rs index ab77449eeb7d..5e41d908833a 100644 --- a/arrow-ipc/benches/ipc_reader.rs +++ b/arrow-ipc/benches/ipc_reader.rs @@ -18,7 +18,6 @@ use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; use arrow_array::{builder::StringBuilder, RecordBatch}; use arrow_buffer::Buffer; -use arrow_ipc::convert::fb_to_schema; use arrow_ipc::reader::{read_footer_length, FileDecoder, FileReader, StreamReader}; use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter}; use arrow_ipc::{root_as_footer, Block, CompressionType}; @@ -215,9 +214,11 @@ impl IPCBufferDecoder { let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap(); let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap(); - let schema = fb_to_schema(footer.schema().unwrap()); - - let mut decoder = FileDecoder::new(Arc::new(schema), footer.version()); + let mut decoder = FileDecoder::new( + buffer[trailer_start - footer_len..trailer_start].to_vec(), + Default::default(), + footer.version(), + ); 
// Read dictionaries for block in footer.dictionaries().iter().flatten() { diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index 79dd1726ed70..62b707e622fe 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -19,6 +19,7 @@ use arrow_buffer::Buffer; use arrow_schema::*; +use core::panic; use flatbuffers::{ FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier, VerifierOptions, WIPOffset, @@ -127,12 +128,6 @@ impl<'a> IpcSchemaEncoder<'a> { } } -/// Serialize a schema in IPC format -#[deprecated(since = "54.0.0", note = "Use `IpcSchemaConverter`.")] -pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> { - IpcSchemaEncoder::new().schema_to_fb(schema) -} - /// Push a key-value metadata into a FlatBufferBuilder and return [WIPOffset] pub fn metadata_to_fb<'a>( fbb: &mut FlatBufferBuilder<'a>, @@ -165,12 +160,10 @@ pub fn schema_to_fb_offset<'a>( impl From> for Field { fn from(field: crate::Field) -> Field { let arrow_field = if let Some(dictionary) = field.dictionary() { - #[allow(deprecated)] Field::new_dict( field.name().unwrap(), get_data_type(field, true), field.nullable(), - dictionary.id(), dictionary.isOrdered(), ) } else { @@ -525,24 +518,13 @@ pub(crate) fn build_field<'a>( match dictionary_tracker { Some(tracker) => Some(get_fb_dictionary( index_type, - #[allow(deprecated)] - tracker.set_dict_id(field), - field - .dict_is_ordered() - .expect("All Dictionary types have `dict_is_ordered`"), - fbb, - )), - None => Some(get_fb_dictionary( - index_type, - #[allow(deprecated)] - field - .dict_id() - .expect("Dictionary type must have a dictionary id"), + tracker.next_dict_id(), field .dict_is_ordered() .expect("All Dictionary types have `dict_is_ordered`"), fbb, )), + None => panic!("IPC must no longer be used without dictionary tracker"), } } else { None @@ -1151,20 +1133,16 @@ mod tests { ), true, ), - #[allow(deprecated)] Field::new_dict( "dictionary", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true, - 123, true, ), - #[allow(deprecated)] Field::new_dict( "dictionary", DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)), true, - 123, true, ), Field::new("decimal", DataType::Decimal128(10, 6), false), diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 83dc5702dc94..d00befcc6978 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -39,6 +39,9 @@ use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, Scalar use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag}; use arrow_schema::*; +use crate::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; +use crate::Field as IpcField; + use crate::compression::CompressionCodec; use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER}; use DataType::*; @@ -80,6 +83,7 @@ impl RecordBatchDecoder<'_> { /// - cast the 64-bit array to the appropriate data type fn create_array( &mut self, + ipc_field: IpcField, field: &Field, variadic_counts: &mut VecDeque, ) -> Result { @@ -115,13 +119,21 @@ impl RecordBatchDecoder<'_> { List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => { let list_node = self.next_node(field)?; let list_buffers = [self.next_buffer()?, self.next_buffer()?]; - let values = self.create_array(list_field, variadic_counts)?; + let values = self.create_array( + ipc_field.children().unwrap().get(0), + list_field, + variadic_counts, + )?; self.create_list_array(list_node, data_type, &list_buffers, values) 
} FixedSizeList(ref list_field, _) => { let list_node = self.next_node(field)?; let list_buffers = [self.next_buffer()?]; - let values = self.create_array(list_field, variadic_counts)?; + let values = self.create_array( + ipc_field.children().unwrap().get(0), + list_field, + variadic_counts, + )?; self.create_list_array(list_node, data_type, &list_buffers, values) } Struct(struct_fields) => { @@ -132,16 +144,22 @@ impl RecordBatchDecoder<'_> { let mut struct_arrays = vec![]; // TODO investigate whether just knowing the number of buffers could // still work - for struct_field in struct_fields { - let child = self.create_array(struct_field, variadic_counts)?; + for (idx, struct_field) in struct_fields.iter().enumerate() { + let child = self.create_array( + ipc_field.children().unwrap().get(idx), + struct_field, + variadic_counts, + )?; struct_arrays.push(child); } self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays) } RunEndEncoded(run_ends_field, values_field) => { let run_node = self.next_node(field)?; - let run_ends = self.create_array(run_ends_field, variadic_counts)?; - let values = self.create_array(values_field, variadic_counts)?; + let children = ipc_field.children().unwrap(); + let run_ends = + self.create_array(children.get(0), run_ends_field, variadic_counts)?; + let values = self.create_array(children.get(1), values_field, variadic_counts)?; let run_array_length = run_node.length() as usize; let builder = ArrayData::builder(data_type.clone()) @@ -156,11 +174,7 @@ impl RecordBatchDecoder<'_> { let index_node = self.next_node(field)?; let index_buffers = [self.next_buffer()?, self.next_buffer()?]; - #[allow(deprecated)] - let dict_id = field.dict_id().ok_or_else(|| { - ArrowError::ParseError(format!("Field {field} does not have dict id")) - })?; - + let dict_id = ipc_field.dictionary().unwrap().id(); let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| { ArrowError::ParseError(format!( "Cannot find a dictionary batch with dict id: {dict_id}" @@ -198,8 +212,11 @@ impl RecordBatchDecoder<'_> { let mut children = Vec::with_capacity(fields.len()); - for (_id, field) in fields.iter() { - let child = self.create_array(field, variadic_counts)?; + let ipc_children = ipc_field.children().unwrap(); + for i in 0..ipc_children.len() { + let ipc_field = ipc_children.get(i); + let field: Field = ipc_field.into(); + let child = self.create_array(ipc_field, &field, variadic_counts)?; children.push(child); } @@ -371,6 +388,8 @@ struct RecordBatchDecoder<'a> { batch: crate::RecordBatch<'a>, /// The output schema schema: SchemaRef, + /// The schema as it is encoded in the IPC source + ipc_schema: crate::Schema<'a>, /// Decoded dictionaries indexed by dictionary id dictionaries_by_id: &'a HashMap, /// Optional compression codec @@ -400,6 +419,7 @@ impl<'a> RecordBatchDecoder<'a> { fn try_new( buf: &'a Buffer, batch: crate::RecordBatch<'a>, + ipc_schema: crate::Schema<'a>, schema: SchemaRef, dictionaries_by_id: &'a HashMap, metadata: &'a MetadataVersion, @@ -418,6 +438,7 @@ impl<'a> RecordBatchDecoder<'a> { Ok(Self { batch, + ipc_schema, schema, dictionaries_by_id, compression, @@ -477,35 +498,46 @@ impl<'a> RecordBatchDecoder<'a> { let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize)); - let schema = Arc::clone(&self.schema); if let Some(projection) = self.projection { let mut arrays = vec![]; // project fields - for (idx, field) in schema.fields().iter().enumerate() { + let ipc_fields = 
self.ipc_schema.fields().unwrap(); + let ipc_fields_len = ipc_fields.len(); + for idx in 0..ipc_fields_len { // Create array for projected field if let Some(proj_idx) = projection.iter().position(|p| p == &idx) { - let child = self.create_array(field, &mut variadic_counts)?; + let child = self.create_array( + ipc_fields.get(idx), + self.schema.clone().field(idx), + &mut variadic_counts, + )?; arrays.push((proj_idx, child)); } else { - self.skip_field(field, &mut variadic_counts)?; + self.skip_field(self.schema.clone().field(idx), &mut variadic_counts)?; } } assert!(variadic_counts.is_empty()); arrays.sort_by_key(|t| t.0); RecordBatch::try_new_with_options( - Arc::new(schema.project(projection)?), + Arc::new(self.schema.project(projection)?), arrays.into_iter().map(|t| t.1).collect(), &options, ) } else { let mut children = vec![]; // keep track of index as lists require more than one node - for field in schema.fields() { - let child = self.create_array(field, &mut variadic_counts)?; + let ipc_fields = self.ipc_schema.fields().unwrap(); + let ipc_fields_len = ipc_fields.len(); + for idx in 0..ipc_fields_len { + let child = self.create_array( + ipc_fields.get(idx), + self.schema.clone().field(idx), + &mut variadic_counts, + )?; children.push(child); } assert!(variadic_counts.is_empty()); - RecordBatch::try_new_with_options(schema, children, &options) + RecordBatch::try_new_with_options(self.schema, children, &options) } } @@ -615,12 +647,13 @@ impl<'a> RecordBatchDecoder<'a> { pub fn read_record_batch( buf: &Buffer, batch: crate::RecordBatch, + ipc_schema: crate::Schema, schema: SchemaRef, dictionaries_by_id: &HashMap, projection: Option<&[usize]>, metadata: &MetadataVersion, ) -> Result { - RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? + RecordBatchDecoder::try_new(buf, batch, ipc_schema, schema, dictionaries_by_id, metadata)? 
.with_projection(projection) .with_require_alignment(false) .read_record_batch() @@ -631,14 +664,14 @@ pub fn read_record_batch( pub fn read_dictionary( buf: &Buffer, batch: crate::DictionaryBatch, - schema: &Schema, + ipc_schema: crate::Schema, dictionaries_by_id: &mut HashMap, metadata: &MetadataVersion, ) -> Result<(), ArrowError> { read_dictionary_impl( buf, batch, - schema, + ipc_schema, dictionaries_by_id, metadata, false, @@ -646,10 +679,41 @@ pub fn read_dictionary( ) } +fn first_field_with_dict_id_from_schema(schema: crate::Schema, id: i64) -> Option { + let c_fields = schema.fields().unwrap(); + let len = c_fields.len(); + for i in 0..len { + let c_field: crate::Field = c_fields.get(i); + if let Some(field) = first_field_with_dict_id_from_field(c_field, id) { + return Some(field); + } + } + + None +} + +fn first_field_with_dict_id_from_field(field: crate::Field, id: i64) -> Option { + if let Some(dictionary) = field.dictionary() { + if dictionary.id() == id { + return Some(field.into()); + } + } + + if let Some(children) = field.children() { + for child in children.iter() { + if let Some(field) = first_field_with_dict_id_from_field(child, id) { + return Some(field); + } + } + } + + None +} + fn read_dictionary_impl( buf: &Buffer, batch: crate::DictionaryBatch, - schema: &Schema, + ipc_schema: crate::Schema, dictionaries_by_id: &mut HashMap, metadata: &MetadataVersion, require_alignment: bool, @@ -662,9 +726,7 @@ fn read_dictionary_impl( } let id = batch.id(); - #[allow(deprecated)] - let fields_using_this_dictionary = schema.fields_with_dict_id(id); - let first_field = fields_using_this_dictionary.first().ok_or_else(|| { + let first_field = first_field_with_dict_id_from_schema(ipc_schema, id).ok_or_else(|| { ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema")) })?; @@ -676,10 +738,22 @@ fn read_dictionary_impl( // Make a fake schema for the dictionary batch. 
let value = value_type.as_ref().clone(); let schema = Schema::new(vec![Field::new("", value, true)]); + let gen = IpcDataGenerator::default(); + let mut dict_tracker = DictionaryTracker::new(false); + let data = gen.schema_to_bytes_with_dictionary_tracker( + &schema, + &mut dict_tracker, + &IpcWriteOptions::default(), + ); + let message = crate::root_as_message(&data.ipc_message).map_err(|err| { + ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) + })?; + let ipc_schema = message.header_as_schema().unwrap(); // Read a single column let record_batch = RecordBatchDecoder::try_new( buf, batch.data().unwrap(), + ipc_schema, Arc::new(schema), dictionaries_by_id, metadata, @@ -789,7 +863,7 @@ pub fn read_footer_length(buf: [u8; 10]) -> Result { /// let back = fb_to_schema(footer.schema().unwrap()); /// assert_eq!(&back, schema.as_ref()); /// -/// let mut decoder = FileDecoder::new(schema, footer.version()); +/// let mut decoder = FileDecoder::new(buffer[trailer_start - footer_len..trailer_start].to_vec(), Default::default(), footer.version()); /// /// // Read dictionaries /// for block in footer.dictionaries().iter().flatten() { @@ -812,6 +886,8 @@ pub fn read_footer_length(buf: [u8; 10]) -> Result { #[derive(Debug)] pub struct FileDecoder { schema: SchemaRef, + footer_buffer: Vec, + verifier_options: VerifierOptions, dictionaries: HashMap, version: MetadataVersion, projection: Option>, @@ -821,9 +897,19 @@ pub struct FileDecoder { impl FileDecoder { /// Create a new [`FileDecoder`] with the given schema and version - pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self { + pub fn new( + footer_buffer: Vec, + verifier_options: VerifierOptions, + version: MetadataVersion, + ) -> Self { + let footer = + crate::root_as_footer_with_opts(&verifier_options, &footer_buffer[..]).unwrap(); + let ipc_schema = footer.schema().unwrap(); + let schema = crate::convert::fb_to_schema(ipc_schema); Self { - schema, + schema: Arc::new(schema), + footer_buffer, + verifier_options, version, dictionaries: Default::default(), projection: None, @@ -888,10 +974,16 @@ impl FileDecoder { match message.header_type() { crate::MessageHeader::DictionaryBatch => { let batch = message.header_as_dictionary_batch().unwrap(); + let footer = crate::root_as_footer_with_opts( + &self.verifier_options, + &self.footer_buffer[..], + ) + .unwrap(); + let ipc_schema = footer.schema().unwrap(); read_dictionary_impl( &buf.slice(block.metaDataLength() as _), batch, - &self.schema, + ipc_schema, &mut self.dictionaries, &message.version(), self.require_alignment, @@ -919,10 +1011,17 @@ impl FileDecoder { let batch = message.header_as_record_batch().ok_or_else(|| { ArrowError::IpcError("Unable to read IPC message as record batch".to_string()) })?; + let footer = crate::root_as_footer_with_opts( + &self.verifier_options, + &self.footer_buffer[..], + ) + .unwrap(); + let ipc_schema = footer.schema().unwrap(); // read the block that makes up the record batch into a buffer RecordBatchDecoder::try_new( &buf.slice(block.metaDataLength() as _), batch, + ipc_schema, self.schema.clone(), &self.dictionaries, &message.version(), @@ -1047,8 +1146,6 @@ impl FileReaderBuilder { )); } - let schema = crate::convert::fb_to_schema(ipc_schema); - let mut custom_metadata = HashMap::new(); if let Some(fb_custom_metadata) = footer.custom_metadata() { for kv in fb_custom_metadata.into_iter() { @@ -1059,7 +1156,7 @@ impl FileReaderBuilder { } } - let mut decoder = FileDecoder::new(Arc::new(schema), footer.version()); + let mut 
decoder = FileDecoder::new(footer_data.clone(), verifier_options, footer.version()); if let Some(projection) = self.projection { decoder = decoder.with_projection(projection) } @@ -1311,6 +1408,10 @@ pub struct StreamReader { /// The schema that is read from the stream's first message schema: SchemaRef, + /// The buffer that the IPC schema flatbuffer is stored in. This is needed to satify the + /// lifetime requirements of the flatbuffer reader. + schema_message_buffer: Vec, + /// Optional dictionaries for each schema field. /// /// Dictionaries may be appended to in the streaming format. @@ -1404,6 +1505,7 @@ impl StreamReader { Ok(Self { reader, schema: Arc::new(schema), + schema_message_buffer: meta_buffer.to_vec(), finished: false, dictionaries_by_id, projection, @@ -1487,10 +1589,20 @@ impl StreamReader { let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); self.reader.read_exact(&mut buf)?; + let message = + crate::root_as_message(&self.schema_message_buffer).map_err(|err| { + ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) + })?; + // message header is a Schema, so read it + let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| { + ArrowError::ParseError("Unable to read IPC message as schema".to_string()) + })?; + RecordBatchDecoder::try_new( &buf.into(), batch, - self.schema(), + ipc_schema, + self.schema.clone(), &self.dictionaries_by_id, &message.version(), )? @@ -1510,10 +1622,19 @@ impl StreamReader { let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); self.reader.read_exact(&mut buf)?; + let message = + crate::root_as_message(&self.schema_message_buffer).map_err(|err| { + ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) + })?; + // message header is a Schema, so read it + let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| { + ArrowError::ParseError("Unable to read IPC message as schema".to_string()) + })?; + read_dictionary_impl( &buf.into(), batch, - &self.schema, + ipc_schema, &mut self.dictionaries_by_id, &message.version(), false, @@ -1869,11 +1990,13 @@ mod tests { let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]) .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?; - let schema = fb_to_schema(footer.schema().unwrap()); - let mut decoder = unsafe { - FileDecoder::new(Arc::new(schema), footer.version()) - .with_skip_validation(skip_validation) + FileDecoder::new( + buffer[trailer_start - footer_len..trailer_start].to_vec(), + Default::default(), + footer.version(), + ) + .with_skip_validation(skip_validation) }; // Read dictionaries for block in footer.dictionaries().iter().flatten() { @@ -1984,8 +2107,7 @@ mod tests { let mut writer = crate::writer::FileWriter::try_new_with_options( &mut buf, batch.schema_ref(), - #[allow(deprecated)] - IpcWriteOptions::default().with_preserve_dict_id(false), + IpcWriteOptions::default(), ) .unwrap(); writer.write(&batch).unwrap(); @@ -2129,20 +2251,16 @@ mod tests { let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]); let key_dict_array = DictionaryArray::new(key_dict_keys, values); - #[allow(deprecated)] let keys_field = Arc::new(Field::new_dict( "keys", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), true, // It is technically not legal for this field to be null. 
- 1, false, )); - #[allow(deprecated)] let values_field = Arc::new(Field::new_dict( "values", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), true, - 2, false, )); let entry_struct = StructArray::from(vec![ @@ -2218,24 +2336,20 @@ mod tests { #[test] fn test_roundtrip_stream_dict_of_list_of_dict() { // list - #[allow(deprecated)] let list_data_type = DataType::List(Arc::new(Field::new_dict( "item", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), true, - 1, false, ))); let offsets: &[i32; 5] = &[0, 2, 4, 4, 6]; test_roundtrip_stream_dict_of_list_of_dict_impl::(list_data_type, offsets); // large list - #[allow(deprecated)] let list_data_type = DataType::LargeList(Arc::new(Field::new_dict( "item", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), true, - 1, false, ))); let offsets: &[i64; 5] = &[0, 2, 4, 4, 7]; @@ -2249,13 +2363,11 @@ mod tests { let dict_array = DictionaryArray::new(keys, Arc::new(values)); let dict_data = dict_array.into_data(); - #[allow(deprecated)] let list_data_type = DataType::FixedSizeList( Arc::new(Field::new_dict( "item", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), true, - 1, false, )), 3, @@ -2340,23 +2452,19 @@ mod tests { let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]); let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone()); - #[allow(deprecated)] let keys_field = Arc::new(Field::new_dict( "keys", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)), true, - 1, false, )); let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]); let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array); - #[allow(deprecated)] let values_field = Arc::new(Field::new_dict( "values", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)), true, - 2, false, )); let entry_struct = StructArray::from(vec![ @@ -2417,8 +2525,16 @@ mod tests { .unwrap(); let gen = IpcDataGenerator {}; - #[allow(deprecated)] - let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true); + let mut dict_tracker = DictionaryTracker::new(false); + + let encoded_schema = gen.schema_to_bytes_with_dictionary_tracker( + &batch.schema(), + &mut dict_tracker, + &IpcWriteOptions::default(), + ); + let schema_message = root_as_message(&encoded_schema.ipc_message).unwrap(); + let ipc_schema = schema_message.header_as_schema().unwrap(); + let (_, encoded) = gen .encoded_batch(&batch, &mut dict_tracker, &Default::default()) .unwrap(); @@ -2436,6 +2552,7 @@ mod tests { let roundtrip = RecordBatchDecoder::try_new( &b, ipc_batch, + ipc_schema, batch.schema(), &Default::default(), &message.version(), @@ -2456,8 +2573,16 @@ mod tests { .unwrap(); let gen = IpcDataGenerator {}; - #[allow(deprecated)] - let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true); + let mut dict_tracker = DictionaryTracker::new(false); + + let encoded_schema = gen.schema_to_bytes_with_dictionary_tracker( + &batch.schema(), + &mut dict_tracker, + &IpcWriteOptions::default(), + ); + let schema_message = root_as_message(&encoded_schema.ipc_message).unwrap(); + let ipc_schema = schema_message.header_as_schema().unwrap(); + let (_, encoded) = gen .encoded_batch(&batch, &mut dict_tracker, &Default::default()) .unwrap(); @@ -2475,6 +2600,7 @@ mod tests { let result = RecordBatchDecoder::try_new( &b, ipc_batch, + ipc_schema, batch.schema(), &Default::default(), 
&message.version(), @@ -2633,7 +2759,6 @@ mod tests { ["a", "b"] .iter() .map(|name| { - #[allow(deprecated)] Field::new_dict( name.to_string(), DataType::Dictionary( @@ -2641,7 +2766,6 @@ mod tests { Box::new(DataType::Utf8), ), true, - 0, false, ) }) @@ -2668,8 +2792,7 @@ mod tests { let mut writer = crate::writer::StreamWriter::try_new_with_options( &mut buf, batch.schema().as_ref(), - #[allow(deprecated)] - crate::writer::IpcWriteOptions::default().with_preserve_dict_id(false), + crate::writer::IpcWriteOptions::default(), ) .expect("Failed to create StreamWriter"); writer.write(&batch).expect("Failed to write RecordBatch"); diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs index 5902cbe4e039..887011c09ea8 100644 --- a/arrow-ipc/src/reader/stream.rs +++ b/arrow-ipc/src/reader/stream.rs @@ -35,6 +35,8 @@ use crate::{MessageHeader, CONTINUATION_MARKER}; pub struct StreamDecoder { /// The schema of this decoder, if read schema: Option, + /// The ipc schema and its buffer + ipc_schema_message: Option>, /// Lookup table for dictionaries by ID dictionaries: HashMap, /// The decoder state @@ -181,8 +183,8 @@ impl StreamDecoder { } } DecoderState::Body { message } => { - let message = message.as_ref(); - let body_length = message.bodyLength() as usize; + let message_ref = message.as_ref(); + let body_length = message_ref.bodyLength() as usize; let body = if self.buf.is_empty() && buffer.len() >= body_length { let body = buffer.slice_with_length(0, body_length); @@ -199,8 +201,8 @@ impl StreamDecoder { std::mem::take(&mut self.buf).into() }; - let version = message.version(); - match message.header_type() { + let version = message_ref.version(); + match message_ref.header_type() { MessageHeader::Schema => { if self.schema.is_some() { return Err(ArrowError::IpcError( @@ -208,19 +210,34 @@ impl StreamDecoder { )); } - let ipc_schema = message.header_as_schema().unwrap(); + // Get a reference to the schema from the message + let ipc_schema = message_ref.header_as_schema().unwrap(); let schema = crate::convert::fb_to_schema(ipc_schema); + + // Store the schema and reset state + self.ipc_schema_message = Some(Arc::new(message.clone())); self.state = DecoderState::default(); self.schema = Some(Arc::new(schema)); } MessageHeader::RecordBatch => { - let batch = message.header_as_record_batch().unwrap(); + let batch = message_ref.header_as_record_batch().unwrap(); + let ipc_schema_message = + self.ipc_schema_message.clone().ok_or_else(|| { + ArrowError::IpcError("Missing IPC schema".to_string()) + })?; + let ipc_schema = ipc_schema_message + .as_ref() + .as_ref() + .header_as_schema() + .unwrap(); + let schema = self.schema.clone().ok_or_else(|| { ArrowError::IpcError("Missing schema".to_string()) })?; let batch = RecordBatchDecoder::try_new( &body, batch, + ipc_schema, schema, &self.dictionaries, &version, @@ -231,14 +248,21 @@ impl StreamDecoder { return Ok(Some(batch)); } MessageHeader::DictionaryBatch => { - let dictionary = message.header_as_dictionary_batch().unwrap(); - let schema = self.schema.as_deref().ok_or_else(|| { - ArrowError::IpcError("Missing schema".to_string()) - })?; + let dictionary = message_ref.header_as_dictionary_batch().unwrap(); + let ipc_schema_message = + self.ipc_schema_message.clone().ok_or_else(|| { + ArrowError::IpcError("Missing IPC schema".to_string()) + })?; + let ipc_schema = ipc_schema_message + .as_ref() + .as_ref() + .header_as_schema() + .unwrap(); + read_dictionary_impl( &body, dictionary, - schema, + ipc_schema, &mut self.dictionaries, 
&version, self.require_alignment, @@ -332,12 +356,10 @@ mod tests { "test1", DataType::RunEndEncoded( Arc::new(Field::new("run_ends".to_string(), DataType::Int32, false)), - #[allow(deprecated)] Arc::new(Field::new_dict( "values".to_string(), DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true, - 0, false, )), ), @@ -362,8 +384,7 @@ mod tests { let mut writer = StreamWriter::try_new_with_options( &mut buffer, &schema, - #[allow(deprecated)] - IpcWriteOptions::default().with_preserve_dict_id(false), + IpcWriteOptions::default(), ) .expect("Failed to create StreamWriter"); writer.write(&batch).expect("Failed to write RecordBatch"); diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 9bba420574c7..ee50c3b65aef 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -65,15 +65,6 @@ pub struct IpcWriteOptions { /// Compression, if desired. Will result in a runtime error /// if the corresponding feature is not enabled batch_compression_type: Option, - /// Flag indicating whether the writer should preserve the dictionary IDs defined in the - /// schema or generate unique dictionary IDs internally during encoding. - /// - /// Defaults to `false` - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it." - )] - preserve_dict_id: bool, } impl IpcWriteOptions { @@ -122,7 +113,6 @@ impl IpcWriteOptions { write_legacy_ipc_format, metadata_version, batch_compression_type: None, - preserve_dict_id: false, }), crate::MetadataVersion::V5 => { if write_legacy_ipc_format { @@ -130,13 +120,11 @@ impl IpcWriteOptions { "Legacy IPC format only supported on metadata version 4".to_string(), )) } else { - #[allow(deprecated)] Ok(Self { alignment, write_legacy_ipc_format, metadata_version, batch_compression_type: None, - preserve_dict_id: false, }) } } @@ -145,45 +133,15 @@ impl IpcWriteOptions { ))), } } - - /// Return whether the writer is configured to preserve the dictionary IDs - /// defined in the schema - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it." - )] - pub fn preserve_dict_id(&self) -> bool { - #[allow(deprecated)] - self.preserve_dict_id - } - - /// Set whether the IPC writer should preserve the dictionary IDs in the schema - /// or auto-assign unique dictionary IDs during encoding (defaults to true) - /// - /// If this option is true, the application must handle assigning ids - /// to the dictionary batches in order to encode them correctly - /// - /// The default will change to `false` in future releases - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it." 
- )] - #[allow(deprecated)] - pub fn with_preserve_dict_id(mut self, preserve_dict_id: bool) -> Self { - self.preserve_dict_id = preserve_dict_id; - self - } } impl Default for IpcWriteOptions { fn default() -> Self { - #[allow(deprecated)] Self { alignment: 64, write_legacy_ipc_format: false, metadata_version: crate::MetadataVersion::V5, batch_compression_type: None, - preserve_dict_id: false, } } } @@ -224,10 +182,7 @@ pub struct IpcDataGenerator {} impl IpcDataGenerator { /// Converts a schema to an IPC message along with `dictionary_tracker` - /// and returns it encoded inside [EncodedData] as a flatbuffer - /// - /// Preferred method over [IpcDataGenerator::schema_to_bytes] since it's - /// deprecated since Arrow v54.0.0 + /// and returns it encoded inside [EncodedData] as a flatbuffer. pub fn schema_to_bytes_with_dictionary_tracker( &self, schema: &Schema, @@ -258,36 +213,6 @@ impl IpcDataGenerator { } } - #[deprecated( - since = "54.0.0", - note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function signature of `schema_to_bytes_with_dictionary_tracker` in the next release." - )] - /// Converts a schema to an IPC message and returns it encoded inside [EncodedData] as a flatbuffer - pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData { - let mut fbb = FlatBufferBuilder::new(); - let schema = { - #[allow(deprecated)] - // This will be replaced with the IpcSchemaConverter in the next release. - let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema); - fb.as_union_value() - }; - - let mut message = crate::MessageBuilder::new(&mut fbb); - message.add_version(write_options.metadata_version); - message.add_header_type(crate::MessageHeader::Schema); - message.add_bodyLength(0); - message.add_header(schema); - // TODO: custom metadata - let data = message.finish(); - fbb.finish(data, None); - - let data = fbb.finished_data(); - EncodedData { - ipc_message: data.to_vec(), - arrow_data: vec![], - } - } - fn _encode_dictionaries>( &self, column: &ArrayRef, @@ -441,13 +366,9 @@ impl IpcDataGenerator { // It's importnat to only take the dict_id at this point, because the dict ID // sequence is assigned depth-first, so we need to first encode children and have // them take their assigned dict IDs before we take the dict ID for this field. - #[allow(deprecated)] - let dict_id = dict_id_seq - .next() - .or_else(|| field.dict_id()) - .ok_or_else(|| { - ArrowError::IpcError(format!("no dict id for field {}", field.name())) - })?; + let dict_id = dict_id_seq.next().ok_or_else(|| { + ArrowError::IpcError(format!("no dict id for field {}", field.name())) + })?; let emit = dictionary_tracker.insert(dict_id, column)?; @@ -789,11 +710,6 @@ pub struct DictionaryTracker { written: HashMap, dict_ids: Vec, error_on_replacement: bool, - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it." - )] - preserve_dict_id: bool, } impl DictionaryTracker { @@ -813,52 +729,17 @@ impl DictionaryTracker { written: HashMap::new(), dict_ids: Vec::new(), error_on_replacement, - preserve_dict_id: false, - } - } - - /// Create a new [`DictionaryTracker`]. - /// - /// If `error_on_replacement` - /// is true, an error will be generated if an update to an - /// existing dictionary is attempted. - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it." 
- )] - pub fn new_with_preserve_dict_id(error_on_replacement: bool, preserve_dict_id: bool) -> Self { - #[allow(deprecated)] - Self { - written: HashMap::new(), - dict_ids: Vec::new(), - error_on_replacement, - preserve_dict_id, } } - /// Set the dictionary ID for `field`. - /// - /// If `preserve_dict_id` is true, this will return the `dict_id` in `field` (or panic if `field` does - /// not have a `dict_id` defined). - /// - /// If `preserve_dict_id` is false, this will return the value of the last `dict_id` assigned incremented by 1 - /// or 0 in the case where no dictionary IDs have yet been assigned - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it." - )] - pub fn set_dict_id(&mut self, field: &Field) -> i64 { - #[allow(deprecated)] - let next = if self.preserve_dict_id { - #[allow(deprecated)] - field.dict_id().expect("no dict_id in field") - } else { - self.dict_ids - .last() - .copied() - .map(|i| i + 1) - .unwrap_or_default() - }; + /// Record and return the next dictionary ID. + pub fn next_dict_id(&mut self) -> i64 { + let next = self + .dict_ids + .last() + .copied() + .map(|i| i + 1) + .unwrap_or_default(); self.dict_ids.push(next); next @@ -995,11 +876,7 @@ impl FileWriter { writer.write_all(&super::ARROW_MAGIC)?; writer.write_all(&PADDING[..pad_len])?; // write the schema, set the written bytes to the schema + header - #[allow(deprecated)] - let preserve_dict_id = write_options.preserve_dict_id; - #[allow(deprecated)] - let mut dictionary_tracker = - DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id); + let mut dictionary_tracker = DictionaryTracker::new(true); let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker( schema, &mut dictionary_tracker, @@ -1074,11 +951,7 @@ impl FileWriter { let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); let record_batches = fbb.create_vector(&self.record_blocks); - #[allow(deprecated)] - let preserve_dict_id = self.write_options.preserve_dict_id; - #[allow(deprecated)] - let mut dictionary_tracker = - DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id); + let mut dictionary_tracker = DictionaryTracker::new(true); let schema = IpcSchemaEncoder::new() .with_dictionary_tracker(&mut dictionary_tracker) .schema_to_fb_offset(&mut fbb, &self.schema); @@ -1229,11 +1102,7 @@ impl StreamWriter { write_options: IpcWriteOptions, ) -> Result { let data_gen = IpcDataGenerator::default(); - #[allow(deprecated)] - let preserve_dict_id = write_options.preserve_dict_id; - #[allow(deprecated)] - let mut dictionary_tracker = - DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id); + let mut dictionary_tracker = DictionaryTracker::new(false); // write the schema, set the written bytes to the schema let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker( @@ -1867,7 +1736,6 @@ mod tests { use arrow_array::types::*; use arrow_buffer::ScalarBuffer; - use crate::convert::fb_to_schema; use crate::reader::*; use crate::root_as_footer; use crate::MetadataVersion; @@ -2139,8 +2007,7 @@ mod tests { let array = Arc::new(inner) as ArrayRef; // Dict field with id 2 - #[allow(deprecated)] - let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 2, false); + let dctfield = Field::new_dict("dict", array.data_type().clone(), false, false); let union_fields = [(0, Arc::new(dctfield))].into_iter().collect(); let types = [0, 0, 
0].into_iter().collect::>(); @@ -2154,17 +2021,22 @@ mod tests { false, )])); + let gen = IpcDataGenerator {}; + let mut dict_tracker = DictionaryTracker::new(false); + gen.schema_to_bytes_with_dictionary_tracker( + &schema, + &mut dict_tracker, + &IpcWriteOptions::default(), + ); + let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap(); - let gen = IpcDataGenerator {}; - #[allow(deprecated)] - let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true); gen.encoded_batch(&batch, &mut dict_tracker, &Default::default()) .unwrap(); // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema // so we expect the dict will be keyed to 0 - assert!(dict_tracker.written.contains_key(&2)); + assert!(dict_tracker.written.contains_key(&0)); } #[test] @@ -2173,13 +2045,10 @@ mod tests { let array = Arc::new(inner) as ArrayRef; - // Dict field with id 2 - #[allow(deprecated)] let dctfield = Arc::new(Field::new_dict( "dict", array.data_type().clone(), false, - 2, false, )); @@ -2192,15 +2061,20 @@ mod tests { false, )])); + let gen = IpcDataGenerator {}; + let mut dict_tracker = DictionaryTracker::new(false); + gen.schema_to_bytes_with_dictionary_tracker( + &schema, + &mut dict_tracker, + &IpcWriteOptions::default(), + ); + let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap(); - let gen = IpcDataGenerator {}; - #[allow(deprecated)] - let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true); gen.encoded_batch(&batch, &mut dict_tracker, &Default::default()) .unwrap(); - assert!(dict_tracker.written.contains_key(&2)); + assert!(dict_tracker.written.contains_key(&0)); } fn write_union_file(options: IpcWriteOptions) { @@ -2976,12 +2850,14 @@ mod tests { let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap(); - let schema = fb_to_schema(footer.schema().unwrap()); - // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient // for `read_record_batch` later on to read the data in a zero-copy manner. - let decoder = - FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true); + let decoder = FileDecoder::new( + buffer[trailer_start - footer_len..trailer_start].to_vec(), + Default::default(), + footer.version(), + ) + .with_require_alignment(true); let batches = footer.recordBatches().unwrap(); @@ -3029,12 +2905,14 @@ mod tests { let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap(); let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap(); - let schema = fb_to_schema(footer.schema().unwrap()); - // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying // to an aligned buffer in `ArrayDataBuilder.build_aligned`. 
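The test changes above illustrate the new ordering requirement: the schema must be passed through `schema_to_bytes_with_dictionary_tracker` before `encoded_batch`, so the tracker can assign dictionary IDs (depth first, starting at 0) that the dictionary batches are then keyed by. A hedged sketch of that flow, assuming the post-patch signatures shown in this diff (the dictionary array construction is illustrative):

use std::sync::Arc;

use arrow_array::types::Int32Type;
use arrow_array::{ArrayRef, DictionaryArray, RecordBatch};
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Field::new_dict no longer takes a dict_id argument.
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "values",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
        false,
    )]));

    let gen = IpcDataGenerator::default();
    let mut tracker = DictionaryTracker::new(false);

    // Encoding the schema first lets the tracker hand out the dictionary IDs
    // (0, 1, ...) that the dictionary batches below will be keyed by.
    let _schema_msg = gen.schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut tracker,
        &IpcWriteOptions::default(),
    );

    let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let batch = RecordBatch::try_new(schema, vec![Arc::new(dict) as ArrayRef])
        .expect("valid batch");
    let (_dictionaries, _encoded) = gen
        .encoded_batch(&batch, &mut tracker, &IpcWriteOptions::default())
        .expect("failed to encode batch");
}

Judging from the change to `_encode_dictionaries` above, skipping the schema step now produces a "no dict id for field ..." error instead of silently falling back to `field.dict_id()`.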
- let decoder = - FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true); + let decoder = FileDecoder::new( + buffer[trailer_start - footer_len..trailer_start].to_vec(), + Default::default(), + footer.version(), + ) + .with_require_alignment(true); let batches = footer.recordBatches().unwrap(); diff --git a/arrow-schema/src/ffi.rs b/arrow-schema/src/ffi.rs index d86fb66190b4..790329c7aa10 100644 --- a/arrow-schema/src/ffi.rs +++ b/arrow-schema/src/ffi.rs @@ -920,12 +920,10 @@ mod tests { #[test] fn test_dictionary_ordered() { - #[allow(deprecated)] let schema = Schema::new(vec![Field::new_dict( "dict", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false, - 0, true, )]); diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs index dbd671a62a3a..29ec98bdbc90 100644 --- a/arrow-schema/src/field.rs +++ b/arrow-schema/src/field.rs @@ -43,17 +43,12 @@ pub struct Field { name: String, data_type: DataType, nullable: bool, - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it." - )] - dict_id: i64, dict_is_ordered: bool, /// A map of key-value pairs containing additional custom meta data. metadata: HashMap, } -// Auto-derive `PartialEq` traits will pull `dict_id` and `dict_is_ordered` +// Auto-derive `PartialEq` traits will pull and `dict_is_ordered` // into comparison. However, these properties are only used in IPC context // for matching dictionary encoded data. They are not necessary to be same // to consider schema equality. For example, in C++ `Field` implementation, @@ -131,12 +126,10 @@ impl Field { /// Creates a new field with the given name, type, and nullability pub fn new(name: impl Into, data_type: DataType, nullable: bool) -> Self { - #[allow(deprecated)] Field { name: name.into(), data_type, nullable, - dict_id: 0, dict_is_ordered: false, metadata: HashMap::default(), } @@ -161,15 +154,10 @@ impl Field { } /// Creates a new field that has additional dictionary information - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With the dict_id field disappearing this function signature will change by removing the dict_id parameter." - )] pub fn new_dict( name: impl Into, data_type: DataType, nullable: bool, - dict_id: i64, dict_is_ordered: bool, ) -> Self { #[allow(deprecated)] @@ -177,7 +165,6 @@ impl Field { name: name.into(), data_type, nullable, - dict_id, dict_is_ordered, metadata: HashMap::default(), } @@ -559,38 +546,6 @@ impl Field { } } - /// Returns a vector containing all (potentially nested) `Field` instances selected by the - /// dictionary ID they use - #[inline] - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it." - )] - pub(crate) fn fields_with_dict_id(&self, id: i64) -> Vec<&Field> { - self.fields() - .into_iter() - .filter(|&field| { - #[allow(deprecated)] - let matching_dict_id = field.dict_id == id; - matches!(field.data_type(), DataType::Dictionary(_, _)) && matching_dict_id - }) - .collect() - } - - /// Returns the dictionary ID, if this is a dictionary type. - #[inline] - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it." 
- )] - pub const fn dict_id(&self) -> Option { - match self.data_type { - #[allow(deprecated)] - DataType::Dictionary(_, _) => Some(self.dict_id), - _ => None, - } - } - /// Returns whether this `Field`'s dictionary is ordered, if this is a dictionary type. /// /// # Example @@ -640,13 +595,6 @@ impl Field { /// assert!(field.is_nullable()); /// ``` pub fn try_merge(&mut self, from: &Field) -> Result<(), ArrowError> { - #[allow(deprecated)] - if from.dict_id != self.dict_id { - return Err(ArrowError::SchemaError(format!( - "Fail to merge schema field '{}' because from dict_id = {} does not match {}", - self.name, from.dict_id, self.dict_id - ))); - } if from.dict_is_ordered != self.dict_is_ordered { return Err(ArrowError::SchemaError(format!( "Fail to merge schema field '{}' because from dict_is_ordered = {} does not match {}", @@ -783,11 +731,8 @@ impl Field { /// * self.metadata is a superset of other.metadata /// * all other fields are equal pub fn contains(&self, other: &Field) -> bool { - #[allow(deprecated)] - let matching_dict_id = self.dict_id == other.dict_id; self.name == other.name && self.data_type.contains(&other.data_type) - && matching_dict_id && self.dict_is_ordered == other.dict_is_ordered // self need to be nullable or both of them are not nullable && (self.nullable || !other.nullable) @@ -836,8 +781,7 @@ mod test { fn test_new_dict_with_string() { // Fields should allow owned Strings to support reuse let s = "c1"; - #[allow(deprecated)] - Field::new_dict(s, DataType::Int64, false, 4, false); + Field::new_dict(s, DataType::Int64, false, false); } #[test] @@ -952,63 +896,6 @@ mod test { ); } - #[test] - fn test_fields_with_dict_id() { - #[allow(deprecated)] - let dict1 = Field::new_dict( - "dict1", - DataType::Dictionary(DataType::Utf8.into(), DataType::Int32.into()), - false, - 10, - false, - ); - #[allow(deprecated)] - let dict2 = Field::new_dict( - "dict2", - DataType::Dictionary(DataType::Int32.into(), DataType::Int8.into()), - false, - 20, - false, - ); - - let field = Field::new( - "struct]>", - DataType::Struct(Fields::from(vec![ - dict1.clone(), - Field::new( - "list[struct]>]", - DataType::List(Arc::new(Field::new( - "struct]>", - DataType::Struct(Fields::from(vec![ - dict1.clone(), - Field::new( - "list[struct]", - DataType::List(Arc::new(Field::new( - "struct", - DataType::Struct(vec![dict2.clone()].into()), - false, - ))), - false, - ), - ])), - false, - ))), - false, - ), - ])), - false, - ); - - #[allow(deprecated)] - for field in field.fields_with_dict_id(10) { - assert_eq!(dict1, *field); - } - #[allow(deprecated)] - for field in field.fields_with_dict_id(20) { - assert_eq!(dict2, *field); - } - } - fn get_field_hash(field: &Field) -> u64 { let mut s = DefaultHasher::new(); field.hash(&mut s); @@ -1018,32 +905,26 @@ mod test { #[test] fn test_field_comparison_case() { // dictionary-encoding properties not used for field comparison - #[allow(deprecated)] let dict1 = Field::new_dict( "dict1", DataType::Dictionary(DataType::Utf8.into(), DataType::Int32.into()), false, - 10, false, ); - #[allow(deprecated)] let dict2 = Field::new_dict( "dict1", DataType::Dictionary(DataType::Utf8.into(), DataType::Int32.into()), false, - 20, false, ); assert_eq!(dict1, dict2); assert_eq!(get_field_hash(&dict1), get_field_hash(&dict2)); - #[allow(deprecated)] let dict1 = Field::new_dict( "dict0", DataType::Dictionary(DataType::Utf8.into(), DataType::Int32.into()), false, - 10, false, ); diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index 
9affd4162995..ec772d2fb579 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -387,20 +387,6 @@ impl Schema { Ok(&self.fields[self.index_of(name)?]) } - /// Returns a vector of immutable references to all [`Field`] instances selected by - /// the dictionary ID they use. - #[deprecated( - since = "54.0.0", - note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it." - )] - pub fn fields_with_dict_id(&self, dict_id: i64) -> Vec<&Field> { - #[allow(deprecated)] - self.fields - .iter() - .flat_map(|f| f.fields_with_dict_id(dict_id)) - .collect() - } - /// Find the index of the column with the given name. pub fn index_of(&self, name: &str) -> Result { let (idx, _) = self.fields().find(name).ok_or_else(|| { @@ -705,13 +691,13 @@ mod tests { fn create_schema_string() { let schema = person_schema(); assert_eq!(schema.to_string(), - "Field { name: \"first_name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {\"k\": \"v\"} }, \ - Field { name: \"last_name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \ + "Field { name: \"first_name\", data_type: Utf8, nullable: false, dict_is_ordered: false, metadata: {\"k\": \"v\"} }, \ + Field { name: \"last_name\", data_type: Utf8, nullable: false, dict_is_ordered: false, metadata: {} }, \ Field { name: \"address\", data_type: Struct([\ - Field { name: \"street\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \ - Field { name: \"zip\", data_type: UInt16, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }\ - ]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \ - Field { name: \"interests\", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 123, dict_is_ordered: true, metadata: {} }") + Field { name: \"street\", data_type: Utf8, nullable: false, dict_is_ordered: false, metadata: {} }, \ + Field { name: \"zip\", data_type: UInt16, nullable: false, dict_is_ordered: false, metadata: {} }\ + ]), nullable: false, dict_is_ordered: false, metadata: {} }, \ + Field { name: \"interests\", data_type: Dictionary(Int32, Utf8), nullable: true, dict_is_ordered: true, metadata: {} }") } #[test] @@ -726,9 +712,6 @@ mod tests { assert_eq!(first_name.name(), "first_name"); assert_eq!(first_name.data_type(), &DataType::Utf8); assert!(!first_name.is_nullable()); - #[allow(deprecated)] - let dict_id = first_name.dict_id(); - assert_eq!(dict_id, None); assert_eq!(first_name.dict_is_ordered(), None); let metadata = first_name.metadata(); @@ -745,9 +728,6 @@ mod tests { interests.data_type(), &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)) ); - #[allow(deprecated)] - let dict_id = interests.dict_id(); - assert_eq!(dict_id, Some(123)); assert_eq!(interests.dict_is_ordered(), Some(true)); } @@ -1183,23 +1163,6 @@ mod tests { schema.field_with_name("nickname").unwrap(); } - #[test] - fn schema_field_with_dict_id() { - let schema = person_schema(); - - #[allow(deprecated)] - let fields_dict_123: Vec<_> = schema - .fields_with_dict_id(123) - .iter() - .map(|f| f.name()) - .collect(); - assert_eq!(fields_dict_123, vec!["interests"]); - - #[allow(deprecated)] - let is_empty = schema.fields_with_dict_id(456).is_empty(); - assert!(is_empty); - } - fn person_schema() -> Schema { let kv_array = [("k".to_string(), "v".to_string())]; let field_metadata: HashMap = kv_array.iter().cloned().collect(); @@ -1222,7 +1185,6 @@ mod tests { 
"interests", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true, - 123, true, ), ]) diff --git a/arrow/examples/zero_copy_ipc.rs b/arrow/examples/zero_copy_ipc.rs index 15fc477c59cf..e28a2bfc63b3 100644 --- a/arrow/examples/zero_copy_ipc.rs +++ b/arrow/examples/zero_copy_ipc.rs @@ -24,12 +24,10 @@ use arrow::array::{record_batch, RecordBatch}; use arrow::error::Result; use arrow_buffer::Buffer; use arrow_cast::pretty::pretty_format_batches; -use arrow_ipc::convert::fb_to_schema; use arrow_ipc::reader::{read_footer_length, FileDecoder}; use arrow_ipc::writer::FileWriter; use arrow_ipc::{root_as_footer, Block}; use std::path::PathBuf; -use std::sync::Arc; /// This example shows how to read data from an Arrow IPC file without copying /// using `mmap` and the [`FileDecoder`] API @@ -101,9 +99,11 @@ impl IPCBufferDecoder { let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap(); let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap(); - let schema = fb_to_schema(footer.schema().unwrap()); - - let mut decoder = FileDecoder::new(Arc::new(schema), footer.version()); + let mut decoder = FileDecoder::new( + buffer[trailer_start - footer_len..trailer_start].to_vec(), + Default::default(), + footer.version(), + ); // Read dictionaries for block in footer.dictionaries().iter().flatten() { diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 1e1054c9a063..ba2f86de9f96 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -2864,13 +2864,11 @@ mod tests { #[test] fn arrow_writer_string_dictionary() { - // define schema #[allow(deprecated)] let schema = Arc::new(Schema::new(vec![Field::new_dict( "dictionary", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true, - 42, true, )])); @@ -2887,12 +2885,10 @@ mod tests { #[test] fn arrow_writer_primitive_dictionary() { // define schema - #[allow(deprecated)] let schema = Arc::new(Schema::new(vec![Field::new_dict( "dictionary", DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)), true, - 42, true, )])); @@ -2956,12 +2952,10 @@ mod tests { #[test] fn arrow_writer_string_dictionary_unsigned_index() { // define schema - #[allow(deprecated)] let schema = Arc::new(Schema::new(vec![Field::new_dict( "dictionary", DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)), true, - 42, true, )])); diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs index 16d46bd852dc..8394563b9cda 100644 --- a/parquet/src/arrow/schema/complex.rs +++ b/parquet/src/arrow/schema/complex.rs @@ -552,12 +552,9 @@ fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<& match arrow_hint { Some(hint) => { // If the inferred type is a dictionary, preserve dictionary metadata - #[allow(deprecated)] - let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) { - (DataType::Dictionary(_, _), Some(id), Some(ordered)) => - { - #[allow(deprecated)] - Field::new_dict(name, data_type, nullable, id, ordered) + let field = match (&data_type, hint.dict_is_ordered()) { + (DataType::Dictionary(_, _), Some(ordered)) => { + Field::new_dict(name, data_type, nullable, ordered) } _ => Field::new(name, data_type, nullable), }; diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 89c42f5eaf92..c4f80eccc619 100644 --- a/parquet/src/arrow/schema/mod.rs +++ 
b/parquet/src/arrow/schema/mod.rs @@ -174,9 +174,7 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result { /// Encodes the Arrow schema into the IPC format, and base64 encodes it pub fn encode_arrow_schema(schema: &Schema) -> String { let options = writer::IpcWriteOptions::default(); - #[allow(deprecated)] - let mut dictionary_tracker = - writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id()); + let mut dictionary_tracker = writer::DictionaryTracker::new(true); let data_gen = writer::IpcDataGenerator::default(); let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options); @@ -1991,12 +1989,10 @@ mod tests { // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false), // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false), // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false), - #[allow(deprecated)] Field::new_dict( "c31", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true, - 123, true, ) .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
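The parquet-side change reduces to the same pattern. A condensed sketch of what `encode_arrow_schema` now does internally (the helper name below is made up for illustration; the real function additionally prepends the IPC length prefix and base64-encodes the result):

use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_schema::Schema;

// Serialize an Arrow schema to raw IPC message bytes, letting the tracker
// assign dictionary IDs instead of preserving IDs from the schema.
fn schema_to_ipc_bytes(schema: &Schema) -> Vec<u8> {
    let options = IpcWriteOptions::default();
    let mut dictionary_tracker = DictionaryTracker::new(true);
    let data_gen = IpcDataGenerator::default();
    let encoded =
        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
    encoded.ipc_message
}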