Skip to content

Remove dict_id from schema and make it an IPC concern only #7467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion arrow-flight/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ impl FlightDataDecoder {

self.state = Some(FlightStreamState {
schema: Arc::clone(&schema),
schema_message: data.clone(),
dictionaries_by_field,
});
Ok(Some(DecodedFlightData::new_schema(data, schema)))
Expand All @@ -302,10 +303,15 @@ impl FlightDataDecoder {
)
})?;

let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
.unwrap()
.header_as_schema()
.unwrap();

arrow_ipc::reader::read_dictionary(
&buffer,
dictionary_batch,
&state.schema,
ipc_schema,
&mut state.dictionaries_by_field,
&message.version(),
)
Expand All @@ -325,8 +331,14 @@ impl FlightDataDecoder {
));
};

let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
.unwrap()
.header_as_schema()
.unwrap();

let batch = flight_data_to_arrow_batch(
&data,
ipc_schema,
Arc::clone(&state.schema),
&state.dictionaries_by_field,
)
Expand Down Expand Up @@ -382,6 +394,7 @@ impl futures::Stream for FlightDataDecoder {
#[derive(Debug)]
struct FlightStreamState {
schema: SchemaRef,
schema_message: FlightData,
dictionaries_by_field: HashMap<i64, ArrayRef>,
}

Expand Down
29 changes: 6 additions & 23 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,15 +535,11 @@ fn prepare_field_for_flight(
)
.with_metadata(field.metadata().clone())
} else {
#[allow(deprecated)]
let dict_id = dictionary_tracker.set_dict_id(field.as_ref());

#[allow(deprecated)]
dictionary_tracker.next_dict_id();
Field::new_dict(
field.name(),
field.data_type().clone(),
field.is_nullable(),
dict_id,
field.dict_is_ordered().unwrap_or_default(),
)
.with_metadata(field.metadata().clone())
Expand Down Expand Up @@ -585,14 +581,11 @@ fn prepare_schema_for_flight(
)
.with_metadata(field.metadata().clone())
} else {
#[allow(deprecated)]
let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
#[allow(deprecated)]
dictionary_tracker.next_dict_id();
Field::new_dict(
field.name(),
field.data_type().clone(),
field.is_nullable(),
dict_id,
field.dict_is_ordered().unwrap_or_default(),
)
.with_metadata(field.metadata().clone())
Expand Down Expand Up @@ -654,16 +647,10 @@ struct FlightIpcEncoder {

impl FlightIpcEncoder {
fn new(options: IpcWriteOptions, error_on_replacement: bool) -> Self {
#[allow(deprecated)]
let preserve_dict_id = options.preserve_dict_id();
Self {
options,
data_gen: IpcDataGenerator::default(),
#[allow(deprecated)]
dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
error_on_replacement,
preserve_dict_id,
),
dictionary_tracker: DictionaryTracker::new(error_on_replacement),
}
}

Expand Down Expand Up @@ -1547,9 +1534,8 @@ mod tests {
async fn verify_flight_round_trip(mut batches: Vec<RecordBatch>) {
let expected_schema = batches.first().unwrap().schema();

#[allow(deprecated)]
let encoder = FlightDataEncoderBuilder::default()
.with_options(IpcWriteOptions::default().with_preserve_dict_id(false))
.with_options(IpcWriteOptions::default())
.with_dictionary_handling(DictionaryHandling::Resend)
.build(futures::stream::iter(batches.clone().into_iter().map(Ok)));

Expand All @@ -1575,8 +1561,7 @@ mod tests {
HashMap::from([("some_key".to_owned(), "some_value".to_owned())]),
);

#[allow(deprecated)]
let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
let mut dictionary_tracker = DictionaryTracker::new(false);

let got = prepare_schema_for_flight(&schema, &mut dictionary_tracker, false);
assert!(got.metadata().contains_key("some_key"));
Expand Down Expand Up @@ -1606,9 +1591,7 @@ mod tests {
options: &IpcWriteOptions,
) -> (Vec<FlightData>, FlightData) {
let data_gen = IpcDataGenerator::default();
#[allow(deprecated)]
let mut dictionary_tracker =
DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
let mut dictionary_tracker = DictionaryTracker::new(false);

let (encoded_dictionaries, encoded_batch) = data_gen
.encoded_batch(batch, &mut dictionary_tracker, options)
Expand Down
4 changes: 1 addition & 3 deletions arrow-flight/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ pub struct IpcMessage(pub Bytes);

fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
let data_gen = writer::IpcDataGenerator::default();
#[allow(deprecated)]
let mut dict_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
let mut dict_tracker = writer::DictionaryTracker::new(false);
data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
}

Expand Down
2 changes: 2 additions & 0 deletions arrow-flight/src/sql/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ pub enum ArrowFlightData {
pub fn arrow_data_from_flight_data(
flight_data: FlightData,
arrow_schema_ref: &SchemaRef,
ipc_schema: arrow_ipc::Schema,
) -> Result<ArrowFlightData, ArrowError> {
let ipc_message = root_as_message(&flight_data.data_header[..])
.map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
Expand All @@ -723,6 +724,7 @@ pub fn arrow_data_from_flight_data(
let record_batch = read_record_batch(
&Buffer::from(flight_data.data_body),
ipc_record_batch,
ipc_schema,
arrow_schema_ref.clone(),
&dictionaries_by_field,
None,
Expand Down
9 changes: 5 additions & 4 deletions arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result<Vec<RecordBa
let mut batches = vec![];
let dictionaries_by_id = HashMap::new();
for datum in flight_data[1..].iter() {
let batch = flight_data_to_arrow_batch(datum, schema.clone(), &dictionaries_by_id)?;
let batch =
flight_data_to_arrow_batch(datum, ipc_schema, schema.clone(), &dictionaries_by_id)?;
batches.push(batch);
}
Ok(batches)
Expand All @@ -53,6 +54,7 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result<Vec<RecordBa
/// Convert `FlightData` (with supplied schema and dictionaries) to an arrow `RecordBatch`.
pub fn flight_data_to_arrow_batch(
data: &FlightData,
ipc_schema: arrow_ipc::Schema,
schema: SchemaRef,
dictionaries_by_id: &HashMap<i64, ArrayRef>,
) -> Result<RecordBatch, ArrowError> {
Expand All @@ -71,6 +73,7 @@ pub fn flight_data_to_arrow_batch(
reader::read_record_batch(
&Buffer::from(data.data_body.as_ref()),
batch,
ipc_schema,
schema,
dictionaries_by_id,
None,
Expand All @@ -90,9 +93,7 @@ pub fn batches_to_flight_data(
let mut flight_data = vec![];

let data_gen = writer::IpcDataGenerator::default();
#[allow(deprecated)]
let mut dictionary_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
let mut dictionary_tracker = writer::DictionaryTracker::new(false);

for batch in batches.iter() {
let (encoded_dictionaries, encoded_batch) =
Expand Down
1 change: 1 addition & 0 deletions arrow-integration-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ all-features = true
[dependencies]
arrow = { workspace = true }
arrow-buffer = { workspace = true }
arrow-ipc = { workspace = true }
hex = { version = "0.4", default-features = false, features = ["std"] }
serde = { version = "1.0", default-features = false, features = ["rc", "derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
Expand Down
35 changes: 15 additions & 20 deletions arrow-integration-test/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::{data_type_from_json, data_type_to_json};
use arrow::datatypes::{DataType, Field};
use arrow::error::{ArrowError, Result};
use arrow_ipc::writer::DictionaryTracker;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -218,7 +219,6 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
_ => data_type,
};

let mut dict_id = 0;
let mut dict_is_ordered = false;

let data_type = match map.get("dictionary") {
Expand All @@ -231,14 +231,6 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
));
}
};
dict_id = match dictionary.get("id") {
Some(Value::Number(n)) => n.as_i64().unwrap(),
_ => {
return Err(ArrowError::ParseError(
"Field missing 'id' attribute".to_string(),
));
}
};
dict_is_ordered = match dictionary.get("isOrdered") {
Some(&Value::Bool(n)) => n,
_ => {
Expand All @@ -252,8 +244,7 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
_ => data_type,
};

#[allow(deprecated)]
let mut field = Field::new_dict(name, data_type, nullable, dict_id, dict_is_ordered);
let mut field = Field::new_dict(name, data_type, nullable, dict_is_ordered);
field.set_metadata(metadata);
Ok(field)
}
Expand All @@ -264,27 +255,28 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
}

/// Generate a JSON representation of the `Field`.
pub fn field_to_json(field: &Field) -> serde_json::Value {
pub fn field_to_json(dict_tracker: &mut DictionaryTracker, field: &Field) -> serde_json::Value {
let children: Vec<serde_json::Value> = match field.data_type() {
DataType::Struct(fields) => fields.iter().map(|x| field_to_json(x.as_ref())).collect(),
DataType::Struct(fields) => fields
.iter()
.map(|x| field_to_json(dict_tracker, x.as_ref()))
.collect(),
DataType::List(field)
| DataType::LargeList(field)
| DataType::FixedSizeList(field, _)
| DataType::Map(field, _) => vec![field_to_json(field)],
| DataType::Map(field, _) => vec![field_to_json(dict_tracker, field)],
_ => vec![],
};

match field.data_type() {
DataType::Dictionary(ref index_type, ref value_type) => {
#[allow(deprecated)]
let dict_id = field.dict_id().unwrap();
serde_json::json!({
"name": field.name(),
"nullable": field.is_nullable(),
"type": data_type_to_json(value_type),
"children": children,
"dictionary": {
"id": dict_id,
"id": dict_tracker.next_dict_id(),
"indexType": data_type_to_json(index_type),
"isOrdered": field.dict_is_ordered().unwrap(),
}
Expand Down Expand Up @@ -345,7 +337,8 @@ mod tests {
}"#,
)
.unwrap();
assert_eq!(value, field_to_json(&f));
let mut dictionary_tracker = DictionaryTracker::new(false);
assert_eq!(value, field_to_json(&mut dictionary_tracker, &f));
}

#[test]
Expand Down Expand Up @@ -398,7 +391,8 @@ mod tests {
}"#,
)
.unwrap();
assert_eq!(value, field_to_json(&f));
let mut dictionary_tracker = DictionaryTracker::new(false);
assert_eq!(value, field_to_json(&mut dictionary_tracker, &f));
}

#[test]
Expand All @@ -415,7 +409,8 @@ mod tests {
}"#,
)
.unwrap();
assert_eq!(value, field_to_json(&f));
let mut dictionary_tracker = DictionaryTracker::new(false);
assert_eq!(value, field_to_json(&mut dictionary_tracker, &f));
}
#[test]
fn parse_struct_from_json() {
Expand Down
Loading
Loading