From ec4ca5d56e55480dd3970181c20c3a5e6e9f2303 Mon Sep 17 00:00:00 2001 From: Martin Grigorov Date: Wed, 25 Oct 2023 10:19:03 +0300 Subject: [PATCH] AVRO-3894: [Rust] Record field aliases are not taken into account when serializing (#2566) * AVRO-3894: [Rust] Record field aliases are not taken into account when serializing Also don't treat field's aliases as custom attributes. Signed-off-by: Martin Tzvetanov Grigorov * AVRO-3894: [Rust] Add a unit test for schema_compatibility Provided-by: Josua Stingelin Signed-off-by: Martin Tzvetanov Grigorov --------- Signed-off-by: Martin Tzvetanov Grigorov --- lang/rust/avro/src/encode.rs | 37 ++++++++++-------- lang/rust/avro/src/error.rs | 2 +- lang/rust/avro/src/schema.rs | 2 +- lang/rust/avro/src/schema_compatibility.rs | 45 ++++++++++++++++++++++ lang/rust/avro/src/writer.rs | 33 ++++++++++++++++ 5 files changed, 101 insertions(+), 18 deletions(-) diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs index f347767f3cc..4593779ac1a 100644 --- a/lang/rust/avro/src/encode.rs +++ b/lang/rust/avro/src/encode.rs @@ -219,23 +219,28 @@ pub(crate) fn encode_internal>( for schema_field in schema_fields.iter() { let name = &schema_field.name; - let value = match lookup.get(name) { - Some(value) => value, - None => { - return Err(Error::NoEntryInLookupTable( - name.clone(), - format!("{lookup:?}"), - )); + let value_opt = lookup.get(name).or_else(|| { + if let Some(aliases) = &schema_field.aliases { + aliases.iter().find_map(|alias| lookup.get(alias)) + } else { + None } - }; - - encode_internal( - value, - &schema_field.schema, - names, - &record_namespace, - buffer, - )?; + }); + + if let Some(value) = value_opt { + encode_internal( + value, + &schema_field.schema, + names, + &record_namespace, + buffer, + )?; + } else { + return Err(Error::NoEntryInLookupTable( + name.clone(), + format!("{lookup:?}"), + )); + } } } else { error!("invalid schema type for Record: {:?}", schema); diff --git 
a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 7b27ad57b55..00a04c93130 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -460,7 +460,7 @@ pub enum Error { #[error("Signed decimal bytes length {0} not equal to fixed schema size {1}.")] EncodeDecimalAsFixedError(usize, usize), - #[error("There is no entry for {0} in the lookup table: {1}.")] + #[error("There is no entry for '{0}' in the lookup table: {1}.")] NoEntryInLookupTable(String, String), #[error("Can only encode value type {value_kind:?} as one of {supported_schema:?}")] diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index 914efe10e23..d2858722bd6 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -748,7 +748,7 @@ impl RecordField { let mut custom_attributes: BTreeMap = BTreeMap::new(); for (key, value) in field { match key.as_str() { - "type" | "name" | "doc" | "default" | "order" | "position" => continue, + "type" | "name" | "doc" | "default" | "order" | "position" | "aliases" => continue, _ => custom_attributes.insert(key.clone(), value.clone()), }; } diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs index a15c18407ef..cafd250f7e0 100644 --- a/lang/rust/avro/src/schema_compatibility.rs +++ b/lang/rust/avro/src/schema_compatibility.rs @@ -1038,4 +1038,49 @@ mod tests { Ok(()) } + + #[test] + fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult + { + use serde::{Deserialize, Serialize}; + + const RAW_SCHEMA_V1: &str = r#" + { + "type": "record", + "name": "Conference", + "namespace": "advdaba", + "fields": [ + {"type": "string", "name": "name"}, + {"type": "long", "name": "date"} + ] + }"#; + const RAW_SCHEMA_V2: &str = r#" + { + "type": "record", + "name": "Conference", + "namespace": "advdaba", + "fields": [ + {"type": "string", "name": "name"}, + {"type": "long", "name": "date", "aliases" : [ "time" ]} + ] + 
}"#; + + #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] + pub struct Conference { + pub name: String, + pub date: i64, + } + #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] + pub struct ConferenceV2 { + pub name: String, + pub time: i64, + } + + let schema_v1 = Schema::parse_str(RAW_SCHEMA_V1)?; + let schema_v2 = Schema::parse_str(RAW_SCHEMA_V2)?; + + assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2)); + + Ok(()) + } } diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs index 8802ce604a9..d968d28e053 100644 --- a/lang/rust/avro/src/writer.rs +++ b/lang/rust/avro/src/writer.rs @@ -1343,4 +1343,37 @@ mod tests { Ok(()) } + + #[test] + fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult { + const SCHEMA: &str = r#" + { + "type": "record", + "name": "Conference", + "fields": [ + {"type": "string", "name": "name"}, + {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]} + ] + }"#; + + #[derive(Debug, PartialEq, Eq, Clone, Serialize)] + pub struct Conference { + pub name: String, + pub time: Option<i64>, + } + + let conf = Conference { + name: "RustConf".to_string(), + time: Some(1234567890), + }; + + let schema = Schema::parse_str(SCHEMA)?; + let mut writer = Writer::new(&schema, Vec::new()); + + let bytes = writer.append_ser(conf)?; + + assert_eq!(198, bytes); + + Ok(()) + } }