Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/flow: add array inference to protocol #1787

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/assemble/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub fn inference(shape: &Shape, exists: Exists) -> flow::Inference {
} else {
None
},
array: None,
}
}

Expand Down Expand Up @@ -120,7 +121,9 @@ pub fn partition_template(
let compression_codec = compression_codec(codec.unwrap_or(models::CompressionCodec::Gzip));

// If an explicit flush interval isn't provided, default to 24 hours
let flush_interval = flush_interval.unwrap_or(std::time::Duration::from_secs(24 * 3600)).into();
let flush_interval = flush_interval
.unwrap_or(std::time::Duration::from_secs(24 * 3600))
.into();

// If a fragment length isn't set, default and then map MB to bytes.
let length = (length.unwrap_or(512) as i64) << 20;
Expand Down
3 changes: 3 additions & 0 deletions crates/assemble/src/snapshots/assemble__test__inference.snap
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ expression: "&[out1, out2, out3]"
secret: true,
exists: Must,
numeric: None,
array: None,
},
Inference {
types: [
Expand All @@ -34,6 +35,7 @@ expression: "&[out1, out2, out3]"
secret: true,
exists: May,
numeric: None,
array: None,
},
Inference {
types: [
Expand Down Expand Up @@ -61,5 +63,6 @@ expression: "&[out1, out2, out3]"
maximum: 1000.0,
},
),
array: None,
},
]
24 changes: 24 additions & 0 deletions crates/proto-flow/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pub struct Inference {
pub exists: i32,
#[prost(message, optional, tag = "9")]
pub numeric: ::core::option::Option<inference::Numeric>,
#[prost(message, optional, tag = "10")]
pub array: ::core::option::Option<inference::Array>,
}
/// Nested message and enum types in `Inference`.
pub mod inference {
Expand Down Expand Up @@ -130,6 +132,28 @@ pub mod inference {
#[prost(double, tag = "4")]
pub maximum: f64,
}
/// Array type-specific inferences. Will be nil (`None` on the Rust side) if
/// types doesn't include "array", or if the specification was built prior to
/// array inference existing in the protocol.
// `PartialEq` is derived without `Eq`, so the corresponding clippy lint is silenced.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Array {
/// Minimum number of items the array must contain.
#[prost(uint32, tag = "1")]
pub min_items: u32,
/// True if there is an inferred maximum allowed number of items the array
/// may contain, otherwise false.
/// NOTE(review): presumably `max_items` should be ignored while this flag is
/// false — confirm against the producers of this message.
#[prost(bool, tag = "2")]
pub has_max_items: bool,
/// Maximum number of items the array may contain.
#[prost(uint32, tag = "3")]
pub max_items: u32,
/// The possible types of items contained in this array.
/// Subset of ["null", "boolean", "object", "array", "integer", "numeric",
/// "string"].
/// NOTE(review): JSON Schema spells the non-integer number type "number";
/// confirm that "numeric" is the spelling actually emitted here.
#[prost(string, repeated, tag = "4")]
pub item_types: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
/// Exists enumerates the possible states of existence for a location.
#[derive(
Clone,
Expand Down
167 changes: 167 additions & 0 deletions crates/proto-flow/src/flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2573,6 +2573,9 @@ impl serde::Serialize for Inference {
if self.numeric.is_some() {
len += 1;
}
if self.array.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.Inference", len)?;
if !self.types.is_empty() {
struct_ser.serialize_field("types", &self.types)?;
Expand Down Expand Up @@ -2600,6 +2603,9 @@ impl serde::Serialize for Inference {
if let Some(v) = self.numeric.as_ref() {
struct_ser.serialize_field("numeric", v)?;
}
if let Some(v) = self.array.as_ref() {
struct_ser.serialize_field("array", v)?;
}
struct_ser.end()
}
}
Expand All @@ -2619,6 +2625,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
"secret",
"exists",
"numeric",
"array",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -2631,6 +2638,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
Secret,
Exists,
Numeric,
Array,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -2660,6 +2668,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
"secret" => Ok(GeneratedField::Secret),
"exists" => Ok(GeneratedField::Exists),
"numeric" => Ok(GeneratedField::Numeric),
"array" => Ok(GeneratedField::Array),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand Down Expand Up @@ -2687,6 +2696,7 @@ impl<'de> serde::Deserialize<'de> for Inference {
let mut secret__ = None;
let mut exists__ = None;
let mut numeric__ = None;
let mut array__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Types => {
Expand Down Expand Up @@ -2737,6 +2747,12 @@ impl<'de> serde::Deserialize<'de> for Inference {
}
numeric__ = map_.next_value()?;
}
GeneratedField::Array => {
if array__.is_some() {
return Err(serde::de::Error::duplicate_field("array"));
}
array__ = map_.next_value()?;
}
}
}
Ok(Inference {
Expand All @@ -2748,12 +2764,163 @@ impl<'de> serde::Deserialize<'de> for Inference {
secret: secret__.unwrap_or_default(),
exists: exists__.unwrap_or_default(),
numeric: numeric__,
array: array__,
})
}
}
deserializer.deserialize_struct("flow.Inference", FIELDS, GeneratedVisitor)
}
}
/// Serializes `flow.Inference.Array` as a struct with camelCase keys.
///
/// A field is written only when it differs from its default value: a zero
/// `min_items` / `max_items`, a false `has_max_items`, and an empty
/// `item_types` are all omitted from the output.
impl serde::Serialize for inference::Array {
    #[allow(deprecated)]
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeStruct;

        // Decide once which fields carry a non-default value; the same
        // decisions drive both the announced field count and the writes below.
        let emit_min_items = self.min_items != 0;
        let emit_has_max_items = self.has_max_items;
        let emit_max_items = self.max_items != 0;
        let emit_item_types = !self.item_types.is_empty();

        let field_count = [
            emit_min_items,
            emit_has_max_items,
            emit_max_items,
            emit_item_types,
        ]
        .iter()
        .filter(|&&emit| emit)
        .count();

        let mut out = serializer.serialize_struct("flow.Inference.Array", field_count)?;
        if emit_min_items {
            out.serialize_field("minItems", &self.min_items)?;
        }
        if emit_has_max_items {
            out.serialize_field("hasMaxItems", &self.has_max_items)?;
        }
        if emit_max_items {
            out.serialize_field("maxItems", &self.max_items)?;
        }
        if emit_item_types {
            out.serialize_field("itemTypes", &self.item_types)?;
        }
        out.end()
    }
}
impl<'de> serde::Deserialize<'de> for inference::Array {
#[allow(deprecated)]
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
const FIELDS: &[&str] = &[
"min_items",
"minItems",
"has_max_items",
"hasMaxItems",
"max_items",
"maxItems",
"item_types",
"itemTypes",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
MinItems,
HasMaxItems,
MaxItems,
ItemTypes,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
where
D: serde::Deserializer<'de>,
{
struct GeneratedVisitor;

impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = GeneratedField;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(formatter, "expected one of: {:?}", &FIELDS)
}

#[allow(unused_variables)]
fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
where
E: serde::de::Error,
{
match value {
"minItems" | "min_items" => Ok(GeneratedField::MinItems),
"hasMaxItems" | "has_max_items" => Ok(GeneratedField::HasMaxItems),
"maxItems" | "max_items" => Ok(GeneratedField::MaxItems),
"itemTypes" | "item_types" => Ok(GeneratedField::ItemTypes),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
}
deserializer.deserialize_identifier(GeneratedVisitor)
}
}
struct GeneratedVisitor;
impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = inference::Array;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("struct flow.Inference.Array")
}

fn visit_map<V>(self, mut map_: V) -> std::result::Result<inference::Array, V::Error>
where
V: serde::de::MapAccess<'de>,
{
let mut min_items__ = None;
let mut has_max_items__ = None;
let mut max_items__ = None;
let mut item_types__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::MinItems => {
if min_items__.is_some() {
return Err(serde::de::Error::duplicate_field("minItems"));
}
min_items__ =
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::HasMaxItems => {
if has_max_items__.is_some() {
return Err(serde::de::Error::duplicate_field("hasMaxItems"));
}
has_max_items__ = Some(map_.next_value()?);
}
GeneratedField::MaxItems => {
if max_items__.is_some() {
return Err(serde::de::Error::duplicate_field("maxItems"));
}
max_items__ =
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::ItemTypes => {
if item_types__.is_some() {
return Err(serde::de::Error::duplicate_field("itemTypes"));
}
item_types__ = Some(map_.next_value()?);
}
}
}
Ok(inference::Array {
min_items: min_items__.unwrap_or_default(),
has_max_items: has_max_items__.unwrap_or_default(),
max_items: max_items__.unwrap_or_default(),
item_types: item_types__.unwrap_or_default(),
})
}
}
deserializer.deserialize_struct("flow.Inference.Array", FIELDS, GeneratedVisitor)
}
}
impl serde::Serialize for inference::Exists {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
Expand Down
6 changes: 6 additions & 0 deletions crates/proto-flow/tests/regression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ fn ex_projections() -> Vec<flow::Projection> {
has_maximum: false,
maximum: 0.0,
}),
array: Some(inference::Array {
min_items: 10,
has_max_items: true,
max_items: 20,
item_types: vec!["null".to_string(), "integer".to_string()],
}),
}),
}]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ expression: json_test(msg)
"numeric": {
"hasMinimum": true,
"minimum": -1000.0
},
"array": {
"minItems": 10,
"hasMaxItems": true,
"maxItems": 20,
"itemTypes": [
"null",
"integer"
]
}
}
}
Expand Down Expand Up @@ -141,6 +150,15 @@ expression: json_test(msg)
"numeric": {
"hasMinimum": true,
"minimum": -1000.0
},
"array": {
"minItems": 10,
"hasMaxItems": true,
"maxItems": 20,
"itemTypes": [
"null",
"integer"
]
}
}
}
Expand Down Expand Up @@ -276,6 +294,15 @@ expression: json_test(msg)
"numeric": {
"hasMinimum": true,
"minimum": -1000.0
},
"array": {
"minItems": 10,
"hasMaxItems": true,
"maxItems": 20,
"itemTypes": [
"null",
"integer"
]
}
}
}
Expand Down
Loading
Loading