From c5f5e817dcd0007f2f14b571f4a772685d001b37 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Wed, 28 Feb 2024 10:54:59 -0800 Subject: [PATCH 01/19] initial derive macros --- Cargo.toml | 1 + derive-macros/Cargo.toml | 20 ++++ derive-macros/src/lib.rs | 160 ++++++++++++++++++++++++++ kernel/Cargo.toml | 3 + kernel/src/actions/deletion_vector.rs | 3 +- kernel/src/actions/mod.rs | 21 +++- 6 files changed, 201 insertions(+), 7 deletions(-) create mode 100644 derive-macros/Cargo.toml create mode 100644 derive-macros/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index d7ef7832c..9ece4d6bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "acceptance", + "derive-macros", "ffi", "kernel", "kernel/examples/*", diff --git a/derive-macros/Cargo.toml b/derive-macros/Cargo.toml new file mode 100644 index 000000000..3d82ddb7d --- /dev/null +++ b/derive-macros/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "derive-macros" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true +readme.workspace = true +version.workspace = true + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1" +syn = "2.0" +quote = "1.0" + + diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs new file mode 100644 index 000000000..c989df5b2 --- /dev/null +++ b/derive-macros/src/lib.rs @@ -0,0 +1,160 @@ +use proc_macro2::TokenStream; +use quote::{quote, quote_spanned}; +use syn::spanned::Spanned; +use syn::{ + parse_macro_input, Data, DataStruct, DeriveInput, Fields, GenericArgument, PathArguments, + PathSegment, Type, +}; + +#[proc_macro_derive(Schema)] +pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let input = parse_macro_input!(input as DeriveInput); + let struct_ident = input.ident; + let schema_fields = gen_schema_fields(&input.data); + let output = quote! { + impl crate::actions::GetSchema for #struct_ident { + fn get_schema() -> crate::schema::StructField { + crate::schema::StructField::new( + // TODO: this is wrong, we probably need an attribute for this since it doesn't + // exactly follow the struct name + stringify!(#struct_ident), + crate::schema::StructType::new(vec![ + #schema_fields + ]), + true, // TODO: how to determine nullable + ) + } + } + }; + proc_macro::TokenStream::from(output) +} + +/// Get the generic types out of something like Option or HashMap +fn get_generic_types(path_args: &PathArguments) -> TokenStream { + if let PathArguments::AngleBracketed(gen_args) = path_args { + let types = gen_args.args.iter().map(|arg| { + if let GenericArgument::Type(typ) = arg { + match typ { + Type::Path(ref type_path) => { + if let Some(fin) = type_path.path.segments.iter().last() { + get_data_type(fin) + } else { + panic!("Path for generic type must have last") + } + } + _ => panic!("Generic needs type path"), + } + } else { + panic!("Only support schema for literal generic types") + } + }); + quote! { + #(#types),* + } + } else { + panic!("Can only handle angle bracketed (i.e. <>) generic args") + } +} + +fn get_data_type(path_segment: &PathSegment) -> Option { + let name = path_segment.ident.to_string(); + match name.as_str() { + "String" => Some(quote! { crate::schema::DataType::STRING }), + "i64" => Some(quote! { crate::schema::DataType::LONG }), + "i32" => Some(quote! { crate::schema::DataType::INTEGER }), + "i16" => Some(quote! { crate::schema::DataType::SHORT }), + "char" => Some(quote! { crate::schema::DataType::BYTE }), // TODO: Correct rust type + "f32" => Some(quote! { crate::schema::DataType::FLOAT }), + "f64" => Some(quote! { crate::schema::DataType::DOUBLE }), + "bool" => Some(quote! { crate::schema::DataType::BOOLEAN }), + // TODO: Binary, Date and Timestamp rust types + "HashMap" => { + let types = get_generic_types(&path_segment.arguments); + Some(quote! { + crate::schema::MapType::new(#types, true) // TODO (how to determine if value contains null) + }) + } + "Option" => { + let option_type = get_generic_types(&path_segment.arguments); + if option_type.to_string() == "" { + // This indicates that the thing in the option didn't have a directly known data type + // TODO get_generic_types should probably return an Option + None + } else { + Some(quote! { #option_type }) + } + } + "Vec" => { + let vec_type = get_generic_types(&path_segment.arguments); + Some(quote! { + crate::schema::ArrayType::new(#vec_type, false) // TODO (how to determine if contains null) + }) + } + _ => { + // assume it's a struct type that implements get_schema, will be handled in called + None + } + } +} + +fn gen_schema_fields(data: &Data) -> TokenStream { + let fields = match data { + Data::Struct(DataStruct { + fields: Fields::Named(fields), + .. + }) => &fields.named, + _ => panic!("this derive macro only works on structs with named fields"), + }; + + let schema_fields = fields.iter().map(|field| { + let name = field.ident.as_ref().unwrap(); // we know these are named fields + match field.ty { + Type::Path(ref type_path) => { + if let Some(fin) = type_path.path.segments.iter().last() { + let optional = fin.ident == "Option"; + if let Some(schema_type) = get_data_type(fin) { + quote_spanned! {field.span()=> + crate::schema::StructField::new(stringify!(#name), #schema_type, #optional) + } + } else { + // Return of None means it's some type we don't support directly, just + // assume it implements GetSchema + let ident = if optional { + // if optional, use name of inside thing + // TODO: this is similar to get_generic_types, unify + if let PathArguments::AngleBracketed(gen_args) = &fin.arguments { + assert!(gen_args.args.iter().len() == 1); + match gen_args.args.iter().next().unwrap() { + GenericArgument::Type(typ) => { + match typ { + Type::Path(ref type_path) => { + if let Some(fin) = type_path.path.segments.iter().last() { + fin.ident.clone() + } else { + panic!("Path for generic type must have last") + } + } + _ => panic!("Generic needs type path") + } + } + _ => panic!("Option must be a generic arg"), + } + } else { + panic!("Need angle bracketed Option types"); + } + } else { + fin.ident.clone() + }; + quote_spanned! {field.span()=> + #ident::get_schema() + } + } + } else { + panic!("Cound't get type"); + } + } + _ => { panic!("Can't handle type: {:?}", field.ty); } + } + }); + quote! { #(#schema_fields),* } +} diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index f8e4fe4a3..c63acc49d 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -29,6 +29,9 @@ url = "2" uuid = "1.3.0" z85 = "3.0.5" +# bring in our derive macros +derive-macros = { path = "../derive-macros" } + # used for developer-visibility visibility = "0.1.0" diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index 0238f8c05..a8d80376a 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -4,12 +4,13 @@ use std::io::{Cursor, Read}; use std::sync::Arc; use bytes::Bytes; +use derive_macros::Schema; use roaring::RoaringTreemap; use url::Url; use crate::{DeltaResult, Error, FileSystemClient}; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct DeletionVectorDescriptor { /// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p']. pub storage_type: String, diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index a2c73d772..df8c56fa0 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -3,14 +3,23 @@ pub(crate) mod deletion_vector; pub(crate) mod schemas; pub(crate) mod visitors; +use derive_macros::Schema; use std::{collections::HashMap, sync::Arc}; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; -use crate::{schema::StructType, DeltaResult, EngineData}; +use crate::{ + schema::{StructField, StructType}, + DeltaResult, EngineData, +}; use self::deletion_vector::DeletionVectorDescriptor; -#[derive(Debug, Clone, PartialEq, Eq)] +/// A trait that says you can ask for the [`Schema`] of the implementor +trait GetSchema { + fn get_schema() -> StructField; +} + +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { /// Name of the encoding for files in this table pub provider: String, @@ -27,7 +36,7 @@ impl Default for Format { } } -#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Clone, PartialEq, Eq, Schema)] pub struct Metadata { /// Unique identifier for this table pub id: String, @@ -60,7 +69,7 @@ impl Metadata { } } -#[derive(Default, Debug, Clone, PartialEq, Eq)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Schema)] pub struct Protocol { /// The minimum version of the Delta read protocol that a client must implement /// in order to correctly read this table @@ -85,7 +94,7 @@ impl Protocol { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Add { /// A relative path to a data file from the root of the table or an absolute path to a file /// that should be added to the table. The path is a URI as specified by @@ -144,7 +153,7 @@ impl Add { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub(crate) struct Remove { /// A relative path to a data file from the root of the table or an absolute path to a file /// that should be added to the table. The path is a URI as specified by From f2f6e34e53399a0320384cf06b81987de379f449 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 29 Feb 2024 18:08:02 -0800 Subject: [PATCH 02/19] simpler version --- derive-macros/src/lib.rs | 186 ++++++++++++-------------- kernel/src/actions/deletion_vector.rs | 1 + kernel/src/actions/mod.rs | 11 +- kernel/src/actions/schemas.rs | 46 +++++++ 4 files changed, 131 insertions(+), 113 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index c989df5b2..84a00bc9f 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -1,23 +1,72 @@ -use proc_macro2::TokenStream; +use proc_macro2::{Ident, Spacing, TokenStream, TokenTree}; use quote::{quote, quote_spanned}; use syn::spanned::Spanned; use syn::{ - parse_macro_input, Data, DataStruct, DeriveInput, Fields, GenericArgument, PathArguments, - PathSegment, Type, + parse_macro_input, Attribute, Data, DataStruct, DeriveInput, Fields, Meta, PathArguments, Type, }; -#[proc_macro_derive(Schema)] +// Return the ident to use as the schema name if it's been specified in the attributes of the struct +// TODO: can we simplify this? +fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> Option { + for attr in attrs { + if let Meta::List(list) = &attr.meta { + if let Some(attr_name) = list.path.segments.iter().last() { + if attr_name.ident == "schema" { + // We have some schema(...) attribute, see if we've specified a different name + let tokens: Vec = list.tokens.clone().into_iter().collect(); + if tokens.len() == 3 { + // we only support `name = name` style + if let TokenTree::Ident(ref ident) = tokens[0] { + assert!(ident == "name"); + } else { + panic!("schema(...) only supports schema(name = name)"); + } + // ensure a normal = sign in the specification + if let TokenTree::Punct(ref punct) = tokens[1] { + assert!(punct.as_char() == '='); + assert!(punct.spacing() == Spacing::Alone); + } else { + panic!("schema(...) only supports schema(name = name)"); + } + if let TokenTree::Ident(ref ident) = tokens[2] { + return Some(ident.clone()); + } else { + panic!("schema(...) only supports schema(name = name)"); + } + } + } else { + panic!("Schema only accepts `schema` as an extra attribute") + } + } + } + } + None +} + +#[proc_macro_derive(Schema, attributes(schema))] pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { let input = parse_macro_input!(input as DeriveInput); let struct_ident = input.ident; + let schema_name = + get_schema_name_from_attr(input.attrs.iter()).unwrap_or_else(|| { + // default to the struct name, but lowercased + Ident::new(&struct_ident.to_string().to_lowercase(), struct_ident.span()) + }); + let schema_fields = gen_schema_fields(&input.data); let output = quote! { - impl crate::actions::GetSchema for #struct_ident { + impl crate::actions::schemas::GetSchema for #struct_ident { fn get_schema() -> crate::schema::StructField { + use crate::actions::schemas::GetField; + Self::get_field(stringify!(#schema_name)) + } + } + + impl crate::actions::schemas::GetField for #struct_ident { + fn get_field(name: impl Into) -> crate::schema::StructField { + use crate::actions::schemas::GetField; crate::schema::StructField::new( - // TODO: this is wrong, we probably need an attribute for this since it doesn't - // exactly follow the struct name - stringify!(#struct_ident), + name, crate::schema::StructType::new(vec![ #schema_fields ]), @@ -29,72 +78,26 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream proc_macro::TokenStream::from(output) } -/// Get the generic types out of something like Option or HashMap -fn get_generic_types(path_args: &PathArguments) -> TokenStream { - if let PathArguments::AngleBracketed(gen_args) = path_args { - let types = gen_args.args.iter().map(|arg| { - if let GenericArgument::Type(typ) = arg { - match typ { - Type::Path(ref type_path) => { - if let Some(fin) = type_path.path.segments.iter().last() { - get_data_type(fin) - } else { - panic!("Path for generic type must have last") - } - } - _ => panic!("Generic needs type path"), - } - } else { - panic!("Only support schema for literal generic types") - } - }); - quote! { - #(#types),* - } - } else { - panic!("Can only handle angle bracketed (i.e. <>) generic args") - } -} -fn get_data_type(path_segment: &PathSegment) -> Option { - let name = path_segment.ident.to_string(); - match name.as_str() { - "String" => Some(quote! { crate::schema::DataType::STRING }), - "i64" => Some(quote! { crate::schema::DataType::LONG }), - "i32" => Some(quote! { crate::schema::DataType::INTEGER }), - "i16" => Some(quote! { crate::schema::DataType::SHORT }), - "char" => Some(quote! { crate::schema::DataType::BYTE }), // TODO: Correct rust type - "f32" => Some(quote! { crate::schema::DataType::FLOAT }), - "f64" => Some(quote! { crate::schema::DataType::DOUBLE }), - "bool" => Some(quote! { crate::schema::DataType::BOOLEAN }), - // TODO: Binary, Date and Timestamp rust types - "HashMap" => { - let types = get_generic_types(&path_segment.arguments); - Some(quote! { - crate::schema::MapType::new(#types, true) // TODO (how to determine if value contains null) - }) - } - "Option" => { - let option_type = get_generic_types(&path_segment.arguments); - if option_type.to_string() == "" { - // This indicates that the thing in the option didn't have a directly known data type - // TODO get_generic_types should probably return an Option - None +// turn our struct name into the schema name, goes from snake_case to camelCase +fn get_schema_name(name: &Ident) -> Ident { + let snake_name = name.to_string(); + let mut next_caps = false; + let ret: String = snake_name.chars().filter_map(|c| { + if c == '_' { + next_caps = true; + None + } else { + if next_caps { + next_caps = false; + // This assumes we're basically using ascii, should be okay + Some(c.to_uppercase().next().unwrap()) } else { - Some(quote! { #option_type }) + Some(c) } } - "Vec" => { - let vec_type = get_generic_types(&path_segment.arguments); - Some(quote! { - crate::schema::ArrayType::new(#vec_type, false) // TODO (how to determine if contains null) - }) - } - _ => { - // assume it's a struct type that implements get_schema, will be handled in called - None - } - } + }).collect(); + Ident::new(&ret, name.span()) } fn gen_schema_fields(data: &Data) -> TokenStream { @@ -108,52 +111,27 @@ fn gen_schema_fields(data: &Data) -> TokenStream { let schema_fields = fields.iter().map(|field| { let name = field.ident.as_ref().unwrap(); // we know these are named fields + let name = get_schema_name(name); match field.ty { Type::Path(ref type_path) => { if let Some(fin) = type_path.path.segments.iter().last() { - let optional = fin.ident == "Option"; - if let Some(schema_type) = get_data_type(fin) { + let type_ident = &fin.ident; + if let PathArguments::AngleBracketed(angle_args) = &fin.arguments { quote_spanned! {field.span()=> - crate::schema::StructField::new(stringify!(#name), #schema_type, #optional) + #type_ident::#angle_args::get_field(stringify!(#name)) } } else { - // Return of None means it's some type we don't support directly, just - // assume it implements GetSchema - let ident = if optional { - // if optional, use name of inside thing - // TODO: this is similar to get_generic_types, unify - if let PathArguments::AngleBracketed(gen_args) = &fin.arguments { - assert!(gen_args.args.iter().len() == 1); - match gen_args.args.iter().next().unwrap() { - GenericArgument::Type(typ) => { - match typ { - Type::Path(ref type_path) => { - if let Some(fin) = type_path.path.segments.iter().last() { - fin.ident.clone() - } else { - panic!("Path for generic type must have last") - } - } - _ => panic!("Generic needs type path") - } - } - _ => panic!("Option must be a generic arg"), - } - } else { - panic!("Need angle bracketed Option types"); - } - } else { - fin.ident.clone() - }; quote_spanned! {field.span()=> - #ident::get_schema() + #type_ident::get_field(stringify!(#name)) } } } else { - panic!("Cound't get type"); + panic!("Couldn't get type"); } } - _ => { panic!("Can't handle type: {:?}", field.ty); } + _ => { + panic!("Can't handle type: {:?}", field.ty); + } } }); quote! { #(#schema_fields),* } diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index a8d80376a..b8bdb9b24 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -11,6 +11,7 @@ use url::Url; use crate::{DeltaResult, Error, FileSystemClient}; #[derive(Debug, Clone, PartialEq, Eq, Schema)] +#[schema(name = deletionVector)] pub struct DeletionVectorDescriptor { /// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p']. pub storage_type: String, diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index df8c56fa0..8c4edd5f8 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -7,18 +7,10 @@ use derive_macros::Schema; use std::{collections::HashMap, sync::Arc}; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; -use crate::{ - schema::{StructField, StructType}, - DeltaResult, EngineData, -}; +use crate::{schema::StructType, DeltaResult, EngineData}; use self::deletion_vector::DeletionVectorDescriptor; -/// A trait that says you can ask for the [`Schema`] of the implementor -trait GetSchema { - fn get_schema() -> StructField; -} - #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { /// Name of the encoding for files in this table @@ -37,6 +29,7 @@ impl Default for Format { } #[derive(Debug, Default, Clone, PartialEq, Eq, Schema)] +#[schema(name = metaData)] pub struct Metadata { /// Unique identifier for this table pub id: String, diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index a1cbf890c..d723d493f 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -1,9 +1,55 @@ //! Schema definitions for action types +use std::collections::HashMap; + use lazy_static::lazy_static; use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; +/// A trait that says you can ask for the [`Schema`] of the implementor +pub(crate) trait GetSchema { + fn get_schema() -> StructField; +} + +/// A trait that allows getting a `StructField` based on the provided name and nullability +pub(crate) trait GetField { + fn get_field(name: impl Into) -> StructField; +} + +macro_rules! impl_get_field { + ( $(($rust_type: ty, $kernel_type: expr)), * ) => { + $( + impl GetField for $rust_type { + fn get_field(name: impl Into) -> StructField { + StructField::new(name, $kernel_type, false) + } + } + )* + }; +} + +impl_get_field!( + (String, DataType::STRING), + (i64, DataType::LONG), + (i32, DataType::INTEGER), + (i16, DataType::SHORT), + (char, DataType::BYTE), + (f32, DataType::FLOAT), + (f64, DataType::DOUBLE), + (bool, DataType::BOOLEAN), + (HashMap, MapType::new(DataType::STRING, DataType::STRING, false)), + (HashMap>, MapType::new(DataType::STRING, DataType::STRING, true)), + (Vec, ArrayType::new(DataType::STRING, false)) +); + +impl GetField for Option { + fn get_field(name: impl Into) -> StructField { + let mut inner = T::get_field(name); + inner.nullable = true; + inner + } +} + lazy_static! { // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata pub(crate) static ref METADATA_FIELD: StructField = StructField::new( From 9abe8ea333cd9462eb74772d429945e061526a9b Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 09:47:19 -0800 Subject: [PATCH 03/19] Update derive-macros/src/lib.rs Co-authored-by: Ryan Johnson --- derive-macros/src/lib.rs | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 84a00bc9f..a7973de94 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -14,25 +14,14 @@ fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> if attr_name.ident == "schema" { // We have some schema(...) attribute, see if we've specified a different name let tokens: Vec = list.tokens.clone().into_iter().collect(); - if tokens.len() == 3 { + match tokens[..] { // we only support `name = name` style - if let TokenTree::Ident(ref ident) = tokens[0] { + [TokenTree::Ident(ref ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref ident)] => { assert!(ident == "name"); - } else { - panic!("schema(...) only supports schema(name = name)"); - } - // ensure a normal = sign in the specification - if let TokenTree::Punct(ref punct) = tokens[1] { assert!(punct.as_char() == '='); - assert!(punct.spacing() == Spacing::Alone); - } else { - panic!("schema(...) only supports schema(name = name)"); - } - if let TokenTree::Ident(ref ident) = tokens[2] { return Some(ident.clone()); - } else { - panic!("schema(...) only supports schema(name = name)"); } + _ => panic!("schema(...) only supports schema(name = name)"), } } else { panic!("Schema only accepts `schema` as an extra attribute") From addb7a33b7139e477b4ed063f9961feae756ebaa Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 09:50:26 -0800 Subject: [PATCH 04/19] cleanup get_schema_name_from_attr< --- derive-macros/src/lib.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index a7973de94..9f737b6e6 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -5,8 +5,9 @@ use syn::{ parse_macro_input, Attribute, Data, DataStruct, DeriveInput, Fields, Meta, PathArguments, Type, }; +static SCHEMA_ERR_STR: &str = "schema(...) only supports schema(name = name)"; + // Return the ident to use as the schema name if it's been specified in the attributes of the struct -// TODO: can we simplify this? fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> Option { for attr in attrs { if let Meta::List(list) = &attr.meta { @@ -16,12 +17,13 @@ fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> let tokens: Vec = list.tokens.clone().into_iter().collect(); match tokens[..] { // we only support `name = name` style - [TokenTree::Ident(ref ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref ident)] => { - assert!(ident == "name"); - assert!(punct.as_char() == '='); - return Some(ident.clone()); + [TokenTree::Ident(ref name_ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref schema_ident)] => { + assert!(name_ident == "name", "{}", SCHEMA_ERR_STR); + assert!(punct.as_char() == '=', "{}", SCHEMA_ERR_STR); + assert!(punct.spacing() == Spacing::Alone, "{}", SCHEMA_ERR_STR); + return Some(schema_ident.clone()); } - _ => panic!("schema(...) only supports schema(name = name)"), + _ => panic!("{}", SCHEMA_ERR_STR), } } else { panic!("Schema only accepts `schema` as an extra attribute") From 7f00ac12d22d439f9a1e18cf7bae9ca332992b43 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 09:51:25 -0800 Subject: [PATCH 05/19] to_ascii_uppercase makes more sense --- derive-macros/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 9f737b6e6..9f407707d 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -81,8 +81,8 @@ fn get_schema_name(name: &Ident) -> Ident { } else { if next_caps { next_caps = false; - // This assumes we're basically using ascii, should be okay - Some(c.to_uppercase().next().unwrap()) + // This assumes we're using ascii, should be okay + Some(c.to_ascii_uppercase()) } else { Some(c) } From 9dcc09621e9c86ba796a95426b352a681f539517 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 10:07:19 -0800 Subject: [PATCH 06/19] add test --- derive-macros/src/lib.rs | 4 ++- kernel/src/actions/mod.rs | 54 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 9f407707d..08b0c0e06 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -61,7 +61,9 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream crate::schema::StructType::new(vec![ #schema_fields ]), - true, // TODO: how to determine nullable + // TODO: Ensure correct. By default not nullable, only can be made nullable by + // being wrapped in an Option + false, ) } } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 8c4edd5f8..0658b3497 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -203,3 +203,57 @@ impl Remove { self.deletion_vector.as_ref().map(|dv| dv.unique_id()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{actions::schemas::GetSchema, schema::{StructField, DataType, ArrayType, MapType}}; + + #[test] + fn test_metadata_schema() { + let schema = Metadata::get_schema(); + + let expected = StructField::new( + "metaData", + StructType::new(vec![ + StructField::new("id", DataType::STRING, false), + StructField::new("name", DataType::STRING, true), + StructField::new("description", DataType::STRING, true), + StructField::new( + "format", + StructType::new(vec![ + StructField::new("provider", DataType::STRING, false), + StructField::new( + "options", + MapType::new( + DataType::STRING, + DataType::STRING, + false, + ), + false, + ), + ]), + false, + ), + StructField::new("schemaString", DataType::STRING, false), + StructField::new( + "partitionColumns", + ArrayType::new(DataType::STRING, false), + false, + ), + StructField::new("createdTime", DataType::LONG, true), + StructField::new( + "configuration", + MapType::new( + DataType::STRING, + DataType::STRING, + true, + ), + false, + ), + ]), + false, + ); + assert_eq!(schema, expected); + } +} From 6d00e4396efc1403ad16c47860cd2c9176b8fdd7 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 10:07:40 -0800 Subject: [PATCH 07/19] fmt --- derive-macros/src/lib.rs | 43 ++++++++++++++++++++++----------------- kernel/src/actions/mod.rs | 17 ++++++---------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 08b0c0e06..dd720a6a0 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -17,7 +17,8 @@ fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> let tokens: Vec = list.tokens.clone().into_iter().collect(); match tokens[..] { // we only support `name = name` style - [TokenTree::Ident(ref name_ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref schema_ident)] => { + [TokenTree::Ident(ref name_ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref schema_ident)] => + { assert!(name_ident == "name", "{}", SCHEMA_ERR_STR); assert!(punct.as_char() == '=', "{}", SCHEMA_ERR_STR); assert!(punct.spacing() == Spacing::Alone, "{}", SCHEMA_ERR_STR); @@ -38,11 +39,13 @@ fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { let input = parse_macro_input!(input as DeriveInput); let struct_ident = input.ident; - let schema_name = - get_schema_name_from_attr(input.attrs.iter()).unwrap_or_else(|| { - // default to the struct name, but lowercased - Ident::new(&struct_ident.to_string().to_lowercase(), struct_ident.span()) - }); + let schema_name = get_schema_name_from_attr(input.attrs.iter()).unwrap_or_else(|| { + // default to the struct name, but lowercased + Ident::new( + &struct_ident.to_string().to_lowercase(), + struct_ident.span(), + ) + }); let schema_fields = gen_schema_fields(&input.data); let output = quote! { @@ -71,25 +74,27 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream proc_macro::TokenStream::from(output) } - // turn our struct name into the schema name, goes from snake_case to camelCase fn get_schema_name(name: &Ident) -> Ident { let snake_name = name.to_string(); let mut next_caps = false; - let ret: String = snake_name.chars().filter_map(|c| { - if c == '_' { - next_caps = true; - None - } else { - if next_caps { - next_caps = false; - // This assumes we're using ascii, should be okay - Some(c.to_ascii_uppercase()) + let ret: String = snake_name + .chars() + .filter_map(|c| { + if c == '_' { + next_caps = true; + None } else { - Some(c) + if next_caps { + next_caps = false; + // This assumes we're using ascii, should be okay + Some(c.to_ascii_uppercase()) + } else { + Some(c) + } } - } - }).collect(); + }) + .collect(); Ident::new(&ret, name.span()) } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 0658b3497..b6bdd59be 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -207,7 +207,10 @@ impl Remove { #[cfg(test)] mod tests { use super::*; - use crate::{actions::schemas::GetSchema, schema::{StructField, DataType, ArrayType, MapType}}; + use crate::{ + actions::schemas::GetSchema, + schema::{ArrayType, DataType, MapType, StructField}, + }; #[test] fn test_metadata_schema() { @@ -225,11 +228,7 @@ mod tests { StructField::new("provider", DataType::STRING, false), StructField::new( "options", - MapType::new( - DataType::STRING, - DataType::STRING, - false, - ), + MapType::new(DataType::STRING, DataType::STRING, false), false, ), ]), @@ -244,11 +243,7 @@ mod tests { StructField::new("createdTime", DataType::LONG, true), StructField::new( "configuration", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), + MapType::new(DataType::STRING, DataType::STRING, true), false, ), ]), From 48219eef82613c36bd31f797b7d23f7332404967 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 12:22:07 -0800 Subject: [PATCH 08/19] clippy --- derive-macros/src/lib.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index dd720a6a0..cb94d7a87 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -84,14 +84,12 @@ fn get_schema_name(name: &Ident) -> Ident { if c == '_' { next_caps = true; None + } else if next_caps { + next_caps = false; + // This assumes we're using ascii, should be okay + Some(c.to_ascii_uppercase()) } else { - if next_caps { - next_caps = false; - // This assumes we're using ascii, should be okay - Some(c.to_ascii_uppercase()) - } else { - Some(c) - } + Some(c) } }) .collect(); From 5e621486b8929e59cd238639221cb3ade1cdcc20 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 14:52:09 -0800 Subject: [PATCH 09/19] make init static (at cost of returning a ref) --- derive-macros/src/lib.rs | 7 +++++-- kernel/src/actions/mod.rs | 6 +++--- kernel/src/actions/schemas.rs | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index cb94d7a87..b00a331b5 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -50,9 +50,12 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream let schema_fields = gen_schema_fields(&input.data); let output = quote! { impl crate::actions::schemas::GetSchema for #struct_ident { - fn get_schema() -> crate::schema::StructField { + fn get_schema() -> &'static crate::schema::StructField { use crate::actions::schemas::GetField; - Self::get_field(stringify!(#schema_name)) + static SCHEMA_LOCK: std::sync::OnceLock = std::sync::OnceLock::new(); + SCHEMA_LOCK.get_or_init(|| { + Self::get_field(stringify!(#schema_name)) + }) } } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index b6bdd59be..d09125026 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -9,7 +9,7 @@ use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; use crate::{schema::StructType, DeltaResult, EngineData}; -use self::deletion_vector::DeletionVectorDescriptor; +use self::{deletion_vector::DeletionVectorDescriptor, schemas::GetSchema}; #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { @@ -51,8 +51,8 @@ pub struct Metadata { impl Metadata { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { - let schema = StructType::new(vec![crate::actions::schemas::METADATA_FIELD.clone()]); let mut visitor = MetadataVisitor::default(); + let schema = StructType::new(vec![Metadata::get_schema().clone()]); data.extract(Arc::new(schema), &mut visitor)?; Ok(visitor.metadata) } @@ -249,6 +249,6 @@ mod tests { ]), false, ); - assert_eq!(schema, expected); + assert_eq!(schema, &expected); } } diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index d723d493f..21dd9f853 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -8,7 +8,7 @@ use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; /// A trait that says you can ask for the [`Schema`] of the implementor pub(crate) trait GetSchema { - fn get_schema() -> StructField; + fn get_schema() -> &'static StructField; } /// A trait that allows getting a `StructField` based on the provided name and nullability From 2cd6027284ba17d0778278ed6f400e6c3d65d06a Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 14:59:36 -0800 Subject: [PATCH 10/19] better static via returning SchemaRef --- derive-macros/src/lib.rs | 10 ++++++---- kernel/src/actions/mod.rs | 9 ++++----- kernel/src/actions/schemas.rs | 4 ++-- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index b00a331b5..9875f72e2 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -50,12 +50,14 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream let schema_fields = gen_schema_fields(&input.data); let output = quote! { impl crate::actions::schemas::GetSchema for #struct_ident { - fn get_schema() -> &'static crate::schema::StructField { + fn get_schema() -> crate::schema::SchemaRef { use crate::actions::schemas::GetField; - static SCHEMA_LOCK: std::sync::OnceLock = std::sync::OnceLock::new(); + static SCHEMA_LOCK: std::sync::OnceLock = std::sync::OnceLock::new(); SCHEMA_LOCK.get_or_init(|| { - Self::get_field(stringify!(#schema_name)) - }) + std::sync::Arc::new(crate::schema::StructType::new(vec![ + Self::get_field(stringify!(#schema_name)) + ])) + }).clone() // cheap clone, it's an Arc } } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index d09125026..a4568adff 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -52,8 +52,7 @@ pub struct Metadata { impl Metadata { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = MetadataVisitor::default(); - let schema = StructType::new(vec![Metadata::get_schema().clone()]); - data.extract(Arc::new(schema), &mut visitor)?; + data.extract(Metadata::get_schema(), &mut visitor)?; Ok(visitor.metadata) } @@ -216,7 +215,7 @@ mod tests { fn test_metadata_schema() { let schema = Metadata::get_schema(); - let expected = StructField::new( + let expected = Arc::new(StructType::new(vec![StructField::new( "metaData", StructType::new(vec![ StructField::new("id", DataType::STRING, false), @@ -248,7 +247,7 @@ mod tests { ), ]), false, - ); - assert_eq!(schema, &expected); + )])); + assert_eq!(schema, expected); } } diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index 21dd9f853..046f134b8 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -4,11 +4,11 @@ use std::collections::HashMap; use lazy_static::lazy_static; -use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; +use crate::schema::{ArrayType, DataType, MapType, SchemaRef, StructField, StructType}; /// A trait that says you can ask for the [`Schema`] of the implementor pub(crate) trait GetSchema { - fn get_schema() -> &'static StructField; + fn get_schema() -> SchemaRef; } /// A trait that allows getting a `StructField` based on the provided name and nullability From 746b6f3cf618291d5ed0c7408b7154fb7eb93ae8 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 16:03:47 -0800 Subject: [PATCH 11/19] use get_schema in actions/mod.rs --- kernel/src/actions/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index a4568adff..c5e71b627 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -4,7 +4,7 @@ pub(crate) mod schemas; pub(crate) mod visitors; use derive_macros::Schema; -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; use crate::{schema::StructType, DeltaResult, EngineData}; @@ -80,8 +80,7 @@ pub struct Protocol { impl Protocol { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = ProtocolVisitor::default(); - let schema = StructType::new(vec![crate::actions::schemas::PROTOCOL_FIELD.clone()]); - data.extract(Arc::new(schema), &mut visitor)?; + data.extract(Protocol::get_schema(), &mut visitor)?; Ok(visitor.protocol) } } @@ -135,8 +134,7 @@ impl Add { /// Since we always want to parse multiple adds from data, we return a `Vec` pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = AddVisitor::default(); - let schema = StructType::new(vec![crate::actions::schemas::ADD_FIELD.clone()]); - data.extract(Arc::new(schema), &mut visitor)?; + data.extract(Add::get_schema(), &mut visitor)?; Ok(visitor.adds) } @@ -205,6 +203,8 @@ impl Remove { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::{ actions::schemas::GetSchema, From 150b024d0ab81e566bf0209e93ab739c704657a0 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 17:23:18 -0800 Subject: [PATCH 12/19] use dervied schemas, and fix remove schema --- kernel/src/actions/mod.rs | 60 ++++++++++++++++++++++++++++++++-- kernel/src/actions/visitors.rs | 16 ++++----- kernel/src/scan/file_stream.rs | 19 ++++++----- kernel/src/snapshot.rs | 5 +-- 4 files changed, 79 insertions(+), 21 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index c5e71b627..0103aaa10 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -152,13 +152,13 @@ pub(crate) struct Remove { /// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt pub(crate) path: String, + /// The time this logical file was created, as milliseconds since the epoch. + pub(crate) deletion_timestamp: Option, + /// When `false` the logical file must already be present in the table or the records /// in the added file must be contained in one or more remove actions in the same version. pub(crate) data_change: bool, - /// The time this logical file was created, as milliseconds since the epoch. - pub(crate) deletion_timestamp: Option, - /// When true the fields `partition_values`, `size`, and `tags` are present pub(crate) extended_file_metadata: Option, @@ -250,4 +250,58 @@ mod tests { )])); assert_eq!(schema, expected); } + + fn tags_field() -> StructField { + StructField::new( + "tags", + MapType::new(DataType::STRING, DataType::STRING, true), + true, + ) + } + + fn partition_values_field() -> StructField { + StructField::new( + "partitionValues", + MapType::new(DataType::STRING, DataType::STRING, true), + true, + ) + } + + fn deletion_vector_field() -> StructField { + StructField::new( + "deletionVector", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("storageType", DataType::STRING, false), + StructField::new("pathOrInlineDv", DataType::STRING, false), + StructField::new("offset", DataType::INTEGER, true), + StructField::new("sizeInBytes", DataType::INTEGER, false), + StructField::new("cardinality", DataType::LONG, false), + ]))), + true, + ) + } + + #[test] + fn test_remove_schema() { + let schema = Remove::get_schema(); + let expected = Arc::new(StructType::new(vec![ + StructField::new( + "remove", + StructType::new(vec![ + StructField::new("path", DataType::STRING, false), + StructField::new("deletionTimestamp", DataType::LONG, true), + StructField::new("dataChange", DataType::BOOLEAN, false), + StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), + partition_values_field(), + StructField::new("size", DataType::LONG, true), + tags_field(), + deletion_vector_field(), + StructField::new("baseRowId", DataType::LONG, true), + StructField::new("defaultRowCommitVersion", DataType::LONG, true), + ]), + false, + ) + ])); + assert_eq!(schema, expected); + } } diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 3fb0a1589..a12d7ee31 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -196,20 +196,20 @@ impl RemoveVisitor { let size: Option = getters[5].get_opt(row_index, "remove.size")?; - // TODO(nick) stats are skipped in getters[6] and tags are skipped in getters[7] + // TODO(nick) tags are skipped in getters[6] let deletion_vector = if let Some(storage_type) = - getters[8].get_opt(row_index, "remove.deletionVector.storageType")? + getters[7].get_opt(row_index, "remove.deletionVector.storageType")? { // there is a storageType, so the whole DV must be there let path_or_inline_dv: String = - getters[9].get(row_index, "remove.deletionVector.pathOrInlineDv")?; + getters[8].get(row_index, "remove.deletionVector.pathOrInlineDv")?; let offset: Option = - getters[10].get_opt(row_index, "remove.deletionVector.offset")?; + getters[9].get_opt(row_index, "remove.deletionVector.offset")?; let size_in_bytes: i32 = - getters[11].get(row_index, "remove.deletionVector.sizeInBytes")?; + getters[10].get(row_index, "remove.deletionVector.sizeInBytes")?; let cardinality: i64 = - getters[12].get(row_index, "remove.deletionVector.cardinality")?; + getters[11].get(row_index, "remove.deletionVector.cardinality")?; Some(DeletionVectorDescriptor { storage_type, path_or_inline_dv, @@ -221,9 +221,9 @@ impl RemoveVisitor { None }; - let base_row_id: Option = getters[13].get_opt(row_index, "remove.baseRowId")?; + let base_row_id: Option = getters[12].get_opt(row_index, "remove.baseRowId")?; let default_row_commit_version: Option = - getters[14].get_opt(row_index, "remove.defaultRowCommitVersion")?; + getters[13].get_opt(row_index, "remove.defaultRowCommitVersion")?; Ok(Remove { path, diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index 372140819..f7878febf 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -5,6 +5,7 @@ use either::Either; use tracing::debug; use super::data_skipping::DataSkippingFilter; +use crate::actions::schemas::{GetSchema, GetField}; use crate::actions::{visitors::AddVisitor, visitors::RemoveVisitor, Add, Remove}; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::Expression; @@ -80,18 +81,20 @@ impl LogReplayScanner { None => actions, }; - let schema_to_use = StructType::new(if is_log_batch { - vec![ - crate::actions::schemas::ADD_FIELD.clone(), - crate::actions::schemas::REMOVE_FIELD.clone(), - ] + let schema_to_use = if is_log_batch { + Arc::new(StructType::new( + vec![ + Option::::get_field("add"), + Option::::get_field("remove"), + ] + )) } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files // only serve as tombstones for vacuum jobs. So no need to load them here. - vec![crate::actions::schemas::ADD_FIELD.clone()] - }); + Add::get_schema() + }; let mut visitor = AddRemoveVisitor::default(); - actions.extract(Arc::new(schema_to_use), &mut visitor)?; + actions.extract(schema_to_use, &mut visitor)?; for remove in visitor.removes.into_iter() { self.seen diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index f7df065a6..c91886d56 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -9,6 +9,7 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use url::Url; +use crate::actions::schemas::GetField; use crate::actions::{Metadata, Protocol}; use crate::path::LogPath; use crate::schema::{Schema, SchemaRef, StructType}; @@ -67,8 +68,8 @@ impl LogSegment { engine_interface: &dyn EngineInterface, ) -> DeltaResult> { let schema = StructType::new(vec![ - crate::actions::schemas::METADATA_FIELD.clone(), - crate::actions::schemas::PROTOCOL_FIELD.clone(), + Option::::get_field("metaData"), + Option::::get_field("protocol"), ]); let data_batches = self.replay(engine_interface, Arc::new(schema), None)?; let mut metadata_opt: Option = None; From c4e037e6d6b9bd5a32109f587f83e9f41caddb61 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 17:34:26 -0800 Subject: [PATCH 13/19] tags is optional on Add, use get_schema everywhere --- kernel/src/actions/mod.rs | 48 ++++++++++++---------------------- kernel/src/actions/visitors.rs | 13 ++++----- kernel/src/scan/file_stream.rs | 12 ++++----- kernel/src/scan/mod.rs | 7 ++--- 4 files changed, 31 insertions(+), 49 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 0103aaa10..77684a569 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -113,7 +113,7 @@ pub struct Add { pub stats: Option, /// Map containing metadata about this logical file. - pub tags: HashMap>, + pub tags: Option>>, /// Information about deletion vector (DV) associated with this add action pub deletion_vector: Option, @@ -184,18 +184,6 @@ pub(crate) struct Remove { } impl Remove { - // _try_new_from_data for now, to avoid warning, probably will need at some point - // pub(crate) fn _try_new_from_data( - // data: &dyn EngineData, - // ) -> DeltaResult { - // let mut visitor = Visitor::new(visit_remove); - // let schema = StructType::new(vec![crate::actions::schemas::REMOVE_FIELD.clone()]); - // data.extract(Arc::new(schema), &mut visitor)?; - // visitor - // .extracted - // .unwrap_or_else(|| Err(Error::generic("Didn't get expected remove"))) - // } - pub(crate) fn dv_unique_id(&self) -> Option { self.deletion_vector.as_ref().map(|dv| dv.unique_id()) } @@ -284,24 +272,22 @@ mod tests { #[test] fn test_remove_schema() { let schema = Remove::get_schema(); - let expected = Arc::new(StructType::new(vec![ - StructField::new( - "remove", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), - partition_values_field(), - StructField::new("size", DataType::LONG, true), - tags_field(), - deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), - ]), - false, - ) - ])); + let expected = Arc::new(StructType::new(vec![StructField::new( + "remove", + StructType::new(vec![ + StructField::new("path", DataType::STRING, false), + StructField::new("deletionTimestamp", DataType::LONG, true), + StructField::new("dataChange", DataType::BOOLEAN, false), + StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), + partition_values_field(), + StructField::new("size", DataType::LONG, true), + tags_field(), + deletion_vector_field(), + StructField::new("baseRowId", DataType::LONG, true), + StructField::new("defaultRowCommitVersion", DataType::LONG, true), + ]), + false, + )])); assert_eq!(schema, expected); } } diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index a12d7ee31..2ef01256b 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -154,7 +154,7 @@ impl AddVisitor { modification_time, data_change, stats, - tags: HashMap::new(), + tags: None, deletion_vector, base_row_id, default_row_commit_version, @@ -262,8 +262,7 @@ mod tests { use super::*; use crate::{ - actions::schemas::log_schema, - schema::StructType, + actions::schemas::{log_schema, GetSchema}, simple_client::{data::SimpleData, json::SimpleJsonHandler, SimpleClient}, EngineData, EngineInterface, JsonHandler, }; @@ -356,11 +355,9 @@ mod tests { let batch = json_handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); - let add_schema = StructType::new(vec![crate::actions::schemas::ADD_FIELD.clone()]); + let add_schema = Add::get_schema(); let mut add_visitor = AddVisitor::default(); - batch - .extract(Arc::new(add_schema), &mut add_visitor) - .unwrap(); + batch.extract(add_schema, &mut add_visitor).unwrap(); let add1 = Add { path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet".into(), partition_values: HashMap::from([ @@ -371,7 +368,7 @@ mod tests { modification_time: 1670892998135, data_change: true, stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}".into()), - tags: HashMap::new(), + tags: None, deletion_vector: None, base_row_id: None, default_row_commit_version: None, diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index f7878febf..5dbf8254d 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -5,7 +5,7 @@ use either::Either; use tracing::debug; use super::data_skipping::DataSkippingFilter; -use crate::actions::schemas::{GetSchema, GetField}; +use crate::actions::schemas::{GetField, GetSchema}; use crate::actions::{visitors::AddVisitor, visitors::RemoveVisitor, Add, Remove}; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::Expression; @@ -82,12 +82,10 @@ impl LogReplayScanner { }; let schema_to_use = if is_log_batch { - Arc::new(StructType::new( - vec![ - Option::::get_field("add"), - Option::::get_field("remove"), - ] - )) + Arc::new(StructType::new(vec![ + Option::::get_field("add"), + Option::::get_field("remove"), + ])) } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files // only serve as tombstones for vacuum jobs. So no need to load them here. diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 6dc94d41d..72d9414f8 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -3,7 +3,8 @@ use std::sync::Arc; use itertools::Itertools; use self::file_stream::log_replay_iter; -use crate::actions::Add; +use crate::actions::schemas::GetField; +use crate::actions::{Add, Remove}; use crate::expressions::{Expression, Scalar}; use crate::schema::{DataType, SchemaRef, StructType}; use crate::snapshot::Snapshot; @@ -129,8 +130,8 @@ impl Scan { engine_interface: &dyn EngineInterface, ) -> DeltaResult>> { let action_schema = Arc::new(StructType::new(vec![ - crate::actions::schemas::ADD_FIELD.clone(), - crate::actions::schemas::REMOVE_FIELD.clone(), + Option::::get_field("add"), + Option::::get_field("remove"), ])); let log_iter = self.snapshot.log_segment.replay( From c7785c2416ebca534591975258a7bcea006ea64e Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 17:48:05 -0800 Subject: [PATCH 14/19] remove leftover code in schemas.rs --- kernel/src/actions/mod.rs | 24 +++ kernel/src/actions/schemas.rs | 253 +------------------------------ kernel/src/actions/visitors.rs | 7 +- kernel/src/client/json.rs | 6 +- kernel/src/simple_client/data.rs | 4 +- 5 files changed, 34 insertions(+), 260 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 77684a569..5d10149d6 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -189,6 +189,30 @@ impl Remove { } } +use crate::actions::schemas::GetField; +use lazy_static::lazy_static; + +lazy_static! { + static ref LOG_SCHEMA: StructType = StructType::new( + vec![ + Option::::get_field("add"), + Option::::get_field("remove"), + Option::::get_field("metaData"), + Option::::get_field("protocol"), + // We don't support the following actions yet + //Option::get_field("cdc"), + //Option::get_field("commitInfo"), + //Option::get_field("domainMetadata"), + //Option::get_field("txn"), + ] + ); +} + +#[cfg(test)] +pub(crate) fn get_log_schema() -> &'static StructType { + &LOG_SCHEMA +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index 046f134b8..c104e52a1 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -2,9 +2,7 @@ use std::collections::HashMap; -use lazy_static::lazy_static; - -use crate::schema::{ArrayType, DataType, MapType, SchemaRef, StructField, StructType}; +use crate::schema::{ArrayType, DataType, MapType, SchemaRef, StructField}; /// A trait that says you can ask for the [`Schema`] of the implementor pub(crate) trait GetSchema { @@ -49,252 +47,3 @@ impl GetField for Option { inner } } - -lazy_static! { - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata - pub(crate) static ref METADATA_FIELD: StructField = StructField::new( - "metaData", - StructType::new(vec![ - StructField::new("id", DataType::STRING, false), - StructField::new("name", DataType::STRING, true), - StructField::new("description", DataType::STRING, true), - StructField::new( - "format", - StructType::new(vec![ - StructField::new("provider", DataType::STRING, false), - StructField::new( - "options", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - true, - ), - ]), - false, - ), - StructField::new("schemaString", DataType::STRING, false), - StructField::new( - "partitionColumns", - ArrayType::new(DataType::STRING, false), - false, - ), - StructField::new("createdTime", DataType::LONG, true), - StructField::new( - "configuration", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - false, - ), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#protocol-evolution - pub(crate) static ref PROTOCOL_FIELD: StructField = StructField::new( - "protocol", - StructType::new(vec![ - StructField::new("minReaderVersion", DataType::INTEGER, false), - StructField::new("minWriterVersion", DataType::INTEGER, false), - StructField::new( - "readerFeatures", - ArrayType::new(DataType::STRING, false), - true, - ), - StructField::new( - "writerFeatures", - ArrayType::new(DataType::STRING, false), - true, - ), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#commit-provenance-information - static ref COMMIT_INFO_FIELD: StructField = StructField::new( - "commitInfo", - StructType::new(vec![ - StructField::new("timestamp", DataType::LONG, false), - StructField::new("operation", DataType::STRING, false), - StructField::new("isolationLevel", DataType::STRING, true), - StructField::new("isBlindAppend", DataType::BOOLEAN, true), - StructField::new("txnId", DataType::STRING, true), - StructField::new("readVersion", DataType::LONG, true), - StructField::new( - "operationParameters", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - true, - ), - StructField::new( - "operationMetrics", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - true, - ), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file - pub(crate) static ref ADD_FIELD: StructField = StructField::new( - "add", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - partition_values_field(), - StructField::new("size", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("stats", DataType::STRING, true), - tags_field(), - deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), - StructField::new("clusteringProvider", DataType::STRING, true), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file - pub(crate) static ref REMOVE_FIELD: StructField = StructField::new( - "remove", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), - partition_values_field(), - StructField::new("size", DataType::LONG, true), - StructField::new("stats", DataType::STRING, true), - tags_field(), - deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), - ]), - true, - ); - static ref REMOVE_FIELD_CHECKPOINT: StructField = StructField::new( - "remove", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file - static ref CDC_FIELD: StructField = StructField::new( - "cdc", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - partition_values_field(), - StructField::new("size", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), - tags_field(), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers - static ref TXN_FIELD: StructField = StructField::new( - "txn", - StructType::new(vec![ - StructField::new("appId", DataType::STRING, false), - StructField::new("version", DataType::LONG, false), - StructField::new("lastUpdated", DataType::LONG, true), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata - static ref DOMAIN_METADATA_FIELD: StructField = StructField::new( - "domainMetadata", - StructType::new(vec![ - StructField::new("domain", DataType::STRING, false), - StructField::new( - "configuration", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - false, - ), - StructField::new("removed", DataType::BOOLEAN, false), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-metadata - static ref CHECKPOINT_METADATA_FIELD: StructField = StructField::new( - "checkpointMetadata", - StructType::new(vec![ - StructField::new("flavor", DataType::STRING, false), - tags_field(), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information - static ref SIDECAR_FIELD: StructField = StructField::new( - "sidecar", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("sizeInBytes", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("type", DataType::STRING, false), - tags_field(), - ]), - true, - ); - - static ref LOG_SCHEMA: StructType = StructType::new( - vec![ - ADD_FIELD.clone(), - CDC_FIELD.clone(), - COMMIT_INFO_FIELD.clone(), - DOMAIN_METADATA_FIELD.clone(), - METADATA_FIELD.clone(), - PROTOCOL_FIELD.clone(), - REMOVE_FIELD.clone(), - TXN_FIELD.clone(), - ] - ); -} - -fn tags_field() -> StructField { - StructField::new( - "tags", - MapType::new(DataType::STRING, DataType::STRING, true), - true, - ) -} - -fn partition_values_field() -> StructField { - StructField::new( - "partitionValues", - MapType::new(DataType::STRING, DataType::STRING, true), - false, - ) -} - -fn deletion_vector_field() -> StructField { - StructField::new( - "deletionVector", - DataType::Struct(Box::new(StructType::new(vec![ - StructField::new("storageType", DataType::STRING, false), - StructField::new("pathOrInlineDv", DataType::STRING, false), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, false), - StructField::new("cardinality", DataType::LONG, false), - ]))), - true, - ) -} - -#[cfg(test)] -pub(crate) fn log_schema() -> &'static StructType { - &LOG_SCHEMA -} diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 2ef01256b..1625b2e66 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -262,7 +262,8 @@ mod tests { use super::*; use crate::{ - actions::schemas::{log_schema, GetSchema}, + actions::get_log_schema, + actions::schemas::GetSchema, simple_client::{data::SimpleData, json::SimpleJsonHandler, SimpleClient}, EngineData, EngineInterface, JsonHandler, }; @@ -285,7 +286,7 @@ mod tests { r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#, ] .into(); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let parsed = handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); @@ -351,7 +352,7 @@ mod tests { r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#, ] .into(); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let batch = json_handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); diff --git a/kernel/src/client/json.rs b/kernel/src/client/json.rs index 2a95a13df..87cb21f2f 100644 --- a/kernel/src/client/json.rs +++ b/kernel/src/client/json.rs @@ -234,7 +234,7 @@ mod tests { use object_store::{local::LocalFileSystem, ObjectStore}; use super::*; - use crate::{actions::schemas::log_schema, executor::tokio::TokioBackgroundExecutor}; + use crate::{actions::get_log_schema, executor::tokio::TokioBackgroundExecutor}; fn string_array_to_engine_data(string_array: StringArray) -> Box { let string_field = Arc::new(Field::new("a", DataType::Utf8, true)); @@ -255,7 +255,7 @@ mod tests { r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#, r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#, ]); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let batch = handler .parse_json(string_array_to_engine_data(json_strings), output_schema) @@ -282,7 +282,7 @@ mod tests { }]; let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); - let physical_schema = Arc::new(ArrowSchema::try_from(log_schema()).unwrap()); + let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema()).unwrap()); let data: Vec = handler .read_json_files(files, Arc::new(physical_schema.try_into().unwrap()), None) .unwrap() diff --git a/kernel/src/simple_client/data.rs b/kernel/src/simple_client/data.rs index 941c30a48..72da526aa 100644 --- a/kernel/src/simple_client/data.rs +++ b/kernel/src/simple_client/data.rs @@ -317,7 +317,7 @@ mod tests { use crate::actions::Metadata; use crate::DeltaResult; use crate::{ - actions::schemas::log_schema, + actions::get_log_schema, simple_client::{data::SimpleData, SimpleClient}, EngineData, EngineInterface, }; @@ -338,7 +338,7 @@ mod tests { r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, ] .into(); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let parsed = handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); From b51b7b1b65247b5e6ddf177203f55ac136570528 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:00:41 -0700 Subject: [PATCH 15/19] switch to using "project" to get schemas --- derive-macros/Cargo.toml | 2 +- derive-macros/src/lib.rs | 55 +------------------- kernel/src/actions/mod.rs | 87 ++++++++++++++++++-------------- kernel/src/actions/schemas.rs | 7 +-- kernel/src/actions/visitors.rs | 7 +-- kernel/src/scan/file_stream.rs | 12 ++--- kernel/src/scan/mod.rs | 8 +-- kernel/src/schema.rs | 35 +++++++++++++ kernel/src/simple_client/data.rs | 32 +++++++++--- 9 files changed, 125 insertions(+), 120 deletions(-) diff --git a/derive-macros/Cargo.toml b/derive-macros/Cargo.toml index 3d82ddb7d..e103333b6 100644 --- a/derive-macros/Cargo.toml +++ b/derive-macros/Cargo.toml @@ -14,7 +14,7 @@ proc-macro = true [dependencies] proc-macro2 = "1" -syn = "2.0" +syn = { version = "2.0", features = ["extra-traits"] } quote = "1.0" diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 9875f72e2..f9665def5 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -1,66 +1,15 @@ -use proc_macro2::{Ident, Spacing, TokenStream, TokenTree}; +use proc_macro2::{Ident, TokenStream}; use quote::{quote, quote_spanned}; use syn::spanned::Spanned; -use syn::{ - parse_macro_input, Attribute, Data, DataStruct, DeriveInput, Fields, Meta, PathArguments, Type, -}; - -static SCHEMA_ERR_STR: &str = "schema(...) only supports schema(name = name)"; - -// Return the ident to use as the schema name if it's been specified in the attributes of the struct -fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> Option { - for attr in attrs { - if let Meta::List(list) = &attr.meta { - if let Some(attr_name) = list.path.segments.iter().last() { - if attr_name.ident == "schema" { - // We have some schema(...) attribute, see if we've specified a different name - let tokens: Vec = list.tokens.clone().into_iter().collect(); - match tokens[..] { - // we only support `name = name` style - [TokenTree::Ident(ref name_ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref schema_ident)] => - { - assert!(name_ident == "name", "{}", SCHEMA_ERR_STR); - assert!(punct.as_char() == '=', "{}", SCHEMA_ERR_STR); - assert!(punct.spacing() == Spacing::Alone, "{}", SCHEMA_ERR_STR); - return Some(schema_ident.clone()); - } - _ => panic!("{}", SCHEMA_ERR_STR), - } - } else { - panic!("Schema only accepts `schema` as an extra attribute") - } - } - } - } - None -} +use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, PathArguments, Type}; #[proc_macro_derive(Schema, attributes(schema))] pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { let input = parse_macro_input!(input as DeriveInput); let struct_ident = input.ident; - let schema_name = get_schema_name_from_attr(input.attrs.iter()).unwrap_or_else(|| { - // default to the struct name, but lowercased - Ident::new( - &struct_ident.to_string().to_lowercase(), - struct_ident.span(), - ) - }); let schema_fields = gen_schema_fields(&input.data); let output = quote! { - impl crate::actions::schemas::GetSchema for #struct_ident { - fn get_schema() -> crate::schema::SchemaRef { - use crate::actions::schemas::GetField; - static SCHEMA_LOCK: std::sync::OnceLock = std::sync::OnceLock::new(); - SCHEMA_LOCK.get_or_init(|| { - std::sync::Arc::new(crate::schema::StructType::new(vec![ - Self::get_field(stringify!(#schema_name)) - ])) - }).clone() // cheap clone, it's an Arc - } - } - impl crate::actions::schemas::GetField for #struct_ident { fn get_field(name: impl Into) -> crate::schema::StructField { use crate::actions::schemas::GetField; diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 5d10149d6..a9af7a875 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -4,12 +4,39 @@ pub(crate) mod schemas; pub(crate) mod visitors; use derive_macros::Schema; -use std::collections::HashMap; +use lazy_static::lazy_static; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; +use self::deletion_vector::DeletionVectorDescriptor; +use crate::actions::schemas::GetField; use crate::{schema::StructType, DeltaResult, EngineData}; -use self::{deletion_vector::DeletionVectorDescriptor, schemas::GetSchema}; +use std::collections::HashMap; + +lazy_static! { + static ref LOG_SCHEMA: StructType = StructType::new( + vec![ + Option::::get_field("add"), + Option::::get_field("remove"), + Option::::get_field("metaData"), + Option::::get_field("protocol"), + // We don't support the following actions yet + //Option::get_field("cdc"), + //Option::get_field("commitInfo"), + //Option::get_field("domainMetadata"), + //Option::get_field("txn"), + ] + ); +} + +pub(crate) static ADD_NAME: &str = "add"; +pub(crate) static REMOVE_NAME: &str = "remove"; +pub(crate) static METADATA_NAME: &str = "metaData"; +pub(crate) static PROTOCOL_NAME: &str = "protocol"; + +pub(crate) fn get_log_schema() -> &'static StructType { + &LOG_SCHEMA +} #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { @@ -52,7 +79,10 @@ pub struct Metadata { impl Metadata { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = MetadataVisitor::default(); - data.extract(Metadata::get_schema(), &mut visitor)?; + data.extract( + get_log_schema().project_as_schema(&[METADATA_NAME])?, + &mut visitor, + )?; Ok(visitor.metadata) } @@ -80,7 +110,10 @@ pub struct Protocol { impl Protocol { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = ProtocolVisitor::default(); - data.extract(Protocol::get_schema(), &mut visitor)?; + data.extract( + get_log_schema().project_as_schema(&[PROTOCOL_NAME])?, + &mut visitor, + )?; Ok(visitor.protocol) } } @@ -134,7 +167,10 @@ impl Add { /// Since we always want to parse multiple adds from data, we return a `Vec` pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = AddVisitor::default(); - data.extract(Add::get_schema(), &mut visitor)?; + data.extract( + get_log_schema().project_as_schema(&[ADD_NAME])?, + &mut visitor, + )?; Ok(visitor.adds) } @@ -189,43 +225,18 @@ impl Remove { } } -use crate::actions::schemas::GetField; -use lazy_static::lazy_static; - -lazy_static! { - static ref LOG_SCHEMA: StructType = StructType::new( - vec![ - Option::::get_field("add"), - Option::::get_field("remove"), - Option::::get_field("metaData"), - Option::::get_field("protocol"), - // We don't support the following actions yet - //Option::get_field("cdc"), - //Option::get_field("commitInfo"), - //Option::get_field("domainMetadata"), - //Option::get_field("txn"), - ] - ); -} - -#[cfg(test)] -pub(crate) fn get_log_schema() -> &'static StructType { - &LOG_SCHEMA -} - #[cfg(test)] mod tests { use std::sync::Arc; use super::*; - use crate::{ - actions::schemas::GetSchema, - schema::{ArrayType, DataType, MapType, StructField}, - }; + use crate::schema::{ArrayType, DataType, MapType, StructField}; #[test] fn test_metadata_schema() { - let schema = Metadata::get_schema(); + let schema = get_log_schema() + .project_as_schema(&["metaData"]) + .expect("Couldn't get metaData field"); let expected = Arc::new(StructType::new(vec![StructField::new( "metaData", @@ -258,7 +269,7 @@ mod tests { false, ), ]), - false, + true, )])); assert_eq!(schema, expected); } @@ -295,7 +306,9 @@ mod tests { #[test] fn test_remove_schema() { - let schema = Remove::get_schema(); + let schema = get_log_schema() + .project_as_schema(&["remove"]) + .expect("Couldn't get remove field"); let expected = Arc::new(StructType::new(vec![StructField::new( "remove", StructType::new(vec![ @@ -310,7 +323,7 @@ mod tests { StructField::new("baseRowId", DataType::LONG, true), StructField::new("defaultRowCommitVersion", DataType::LONG, true), ]), - false, + true, )])); assert_eq!(schema, expected); } diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index c104e52a1..894bbf8fe 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -2,12 +2,7 @@ use std::collections::HashMap; -use crate::schema::{ArrayType, DataType, MapType, SchemaRef, StructField}; - -/// A trait that says you can ask for the [`Schema`] of the implementor -pub(crate) trait GetSchema { - fn get_schema() -> SchemaRef; -} +use crate::schema::{ArrayType, DataType, MapType, StructField}; /// A trait that allows getting a `StructField` based on the provided name and nullability pub(crate) trait GetField { diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 1625b2e66..1656e9163 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -262,8 +262,7 @@ mod tests { use super::*; use crate::{ - actions::get_log_schema, - actions::schemas::GetSchema, + actions::{get_log_schema, ADD_NAME}, simple_client::{data::SimpleData, json::SimpleJsonHandler, SimpleClient}, EngineData, EngineInterface, JsonHandler, }; @@ -356,7 +355,9 @@ mod tests { let batch = json_handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); - let add_schema = Add::get_schema(); + let add_schema = get_log_schema() + .project_as_schema(&[ADD_NAME]) + .expect("Can't get add schema"); let mut add_visitor = AddVisitor::default(); batch.extract(add_schema, &mut add_visitor).unwrap(); let add1 = Add { diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index 5dbf8254d..a2cf11245 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -1,15 +1,14 @@ use std::collections::HashSet; -use std::sync::Arc; use either::Either; use tracing::debug; use super::data_skipping::DataSkippingFilter; -use crate::actions::schemas::{GetField, GetSchema}; +use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME}; use crate::actions::{visitors::AddVisitor, visitors::RemoveVisitor, Add, Remove}; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::Expression; -use crate::schema::{SchemaRef, StructType}; +use crate::schema::SchemaRef; use crate::{DataVisitor, DeltaResult, EngineData, EngineInterface}; struct LogReplayScanner { @@ -82,14 +81,11 @@ impl LogReplayScanner { }; let schema_to_use = if is_log_batch { - Arc::new(StructType::new(vec![ - Option::::get_field("add"), - Option::::get_field("remove"), - ])) + get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])? } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files // only serve as tombstones for vacuum jobs. So no need to load them here. - Add::get_schema() + get_log_schema().project_as_schema(&[ADD_NAME])? }; let mut visitor = AddRemoveVisitor::default(); actions.extract(schema_to_use, &mut visitor)?; diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 72d9414f8..1806819c3 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -3,8 +3,7 @@ use std::sync::Arc; use itertools::Itertools; use self::file_stream::log_replay_iter; -use crate::actions::schemas::GetField; -use crate::actions::{Add, Remove}; +use crate::actions::{get_log_schema, Add, ADD_NAME, REMOVE_NAME}; use crate::expressions::{Expression, Scalar}; use crate::schema::{DataType, SchemaRef, StructType}; use crate::snapshot::Snapshot; @@ -129,10 +128,7 @@ impl Scan { &self, engine_interface: &dyn EngineInterface, ) -> DeltaResult>> { - let action_schema = Arc::new(StructType::new(vec![ - Option::::get_field("add"), - Option::::get_field("remove"), - ])); + let action_schema = get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])?; let log_iter = self.snapshot.log_segment.replay( engine_interface, diff --git a/kernel/src/schema.rs b/kernel/src/schema.rs index 7d3050af2..94b29834b 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema.rs @@ -3,8 +3,11 @@ use std::sync::Arc; use std::{collections::HashMap, fmt::Display}; use indexmap::IndexMap; +use itertools::Itertools; use serde::{Deserialize, Serialize}; +use crate::{DeltaResult, Error}; + pub type Schema = StructType; pub type SchemaRef = Arc; @@ -140,6 +143,38 @@ impl StructType { } } + /// Get a [`StructType`] containing [`StructField`]s of the given names, preserving the original + /// order of fields. Returns an Err if a specified field doesn't exist + pub fn project(&self, names: &[impl AsRef]) -> DeltaResult { + let mut indexes: Vec = names + .iter() + .map(|name| { + self.fields + .get_index_of(name.as_ref()) + .ok_or_else(|| Error::missing_column(name.as_ref())) + }) + .try_collect()?; + indexes.sort(); // keep schema order + let fields: Vec = indexes + .iter() + .map(|index| { + self.fields + .get_index(*index) + .expect("get_index_of returned non-existant index") + .1 + .clone() + }) + .collect(); + Ok(Self::new(fields)) + } + + /// Get a [`SchemaRef`] containing [`StructField`]s of the given names, preserving the original + /// order of fields. Returns an Err if a specified field doesn't exist + pub fn project_as_schema(&self, names: &[impl AsRef]) -> DeltaResult { + let struct_type = self.project(names)?; + Ok(Arc::new(struct_type)) + } + pub fn field(&self, name: impl AsRef) -> Option<&StructField> { self.fields.get(name.as_ref()) } diff --git a/kernel/src/simple_client/data.rs b/kernel/src/simple_client/data.rs index 72da526aa..5b0999708 100644 --- a/kernel/src/simple_client/data.rs +++ b/kernel/src/simple_client/data.rs @@ -214,13 +214,16 @@ impl SimpleData { .filter(|a| *a.data_type() != ArrowDataType::Null); // Note: if col is None we have either: // a) encountered a column that is all nulls or, - // b) recursed into a struct that was all null. - // So below if the field is allowed to be null, we push that, otherwise we error out. + // b) recursed into a optional struct that was null. In this case, array.is_none() is + // true and we don't need to check field nullability, because we assume all fields + // of a nullable struct can be null + // So below if the field is allowed to be null, OR array.is_none() we push that, + // otherwise we error out. if let Some(col) = col { Self::extract_column(out_col_array, field, col)?; - } else if field.is_nullable() { - if let DataType::Struct(_) = field.data_type() { - Self::extract_columns_from_array(out_col_array, schema, None)?; + } else if array.is_none() || field.is_nullable() { + if let DataType::Struct(inner_struct) = field.data_type() { + Self::extract_columns_from_array(out_col_array, inner_struct.as_ref(), None)?; } else { debug!("Pushing a null field for {}", field.name); out_col_array.push(&()); @@ -314,7 +317,7 @@ mod tests { use arrow_array::{RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; - use crate::actions::Metadata; + use crate::actions::{Metadata, Protocol}; use crate::DeltaResult; use crate::{ actions::get_log_schema, @@ -348,4 +351,21 @@ mod tests { assert_eq!(metadata.partition_columns, vec!("c1", "c2")); Ok(()) } + + #[test] + fn test_nullable_struct() -> DeltaResult<()> { + let client = SimpleClient::new(); + let handler = client.get_json_handler(); + let json_strings: StringArray = vec![ + r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, + ] + .into(); + let output_schema = get_log_schema().project_as_schema(&["metaData"])?; + let parsed = handler + .parse_json(string_array_to_engine_data(json_strings), output_schema) + .unwrap(); + let protocol = Protocol::try_new_from_data(parsed.as_ref())?; + assert!(protocol.is_none()); + Ok(()) + } } From 9bca3f562d6ea2483ca00768d9aca16345a7791c Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:05:40 -0700 Subject: [PATCH 16/19] remove unneeded annotations --- kernel/src/actions/deletion_vector.rs | 1 - kernel/src/actions/mod.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index b8bdb9b24..a8d80376a 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -11,7 +11,6 @@ use url::Url; use crate::{DeltaResult, Error, FileSystemClient}; #[derive(Debug, Clone, PartialEq, Eq, Schema)] -#[schema(name = deletionVector)] pub struct DeletionVectorDescriptor { /// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p']. pub storage_type: String, diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index a9af7a875..3eb325909 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -56,7 +56,6 @@ impl Default for Format { } #[derive(Debug, Default, Clone, PartialEq, Eq, Schema)] -#[schema(name = metaData)] pub struct Metadata { /// Unique identifier for this table pub id: String, From 0bf4c3a36e875afd33c3679141bd527909c1faf3 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:09:58 -0700 Subject: [PATCH 17/19] move snapshot to new way --- kernel/src/snapshot.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index c91886d56..84c7ae77d 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -9,10 +9,9 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use url::Url; -use crate::actions::schemas::GetField; -use crate::actions::{Metadata, Protocol}; +use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME}; use crate::path::LogPath; -use crate::schema::{Schema, SchemaRef, StructType}; +use crate::schema::{Schema, SchemaRef}; use crate::{DeltaResult, EngineInterface, Error, FileMeta, FileSystemClient, Version}; use crate::{EngineData, Expression}; @@ -67,11 +66,8 @@ impl LogSegment { &self, engine_interface: &dyn EngineInterface, ) -> DeltaResult> { - let schema = StructType::new(vec![ - Option::::get_field("metaData"), - Option::::get_field("protocol"), - ]); - let data_batches = self.replay(engine_interface, Arc::new(schema), None)?; + let schema = get_log_schema().project_as_schema(&[METADATA_NAME, PROTOCOL_NAME])?; + let data_batches = self.replay(engine_interface, schema, None)?; let mut metadata_opt: Option = None; let mut protocol_opt: Option = None; for batch in data_batches { From b273e3b8743d5a745439b250c93468fd85346b23 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:12:45 -0700 Subject: [PATCH 18/19] use static names --- kernel/src/actions/mod.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 3eb325909..a72f811e0 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -13,27 +13,27 @@ use crate::{schema::StructType, DeltaResult, EngineData}; use std::collections::HashMap; +pub(crate) static ADD_NAME: &str = "add"; +pub(crate) static REMOVE_NAME: &str = "remove"; +pub(crate) static METADATA_NAME: &str = "metaData"; +pub(crate) static PROTOCOL_NAME: &str = "protocol"; + lazy_static! { static ref LOG_SCHEMA: StructType = StructType::new( vec![ - Option::::get_field("add"), - Option::::get_field("remove"), - Option::::get_field("metaData"), - Option::::get_field("protocol"), + Option::::get_field(ADD_NAME), + Option::::get_field(REMOVE_NAME), + Option::::get_field(METADATA_NAME), + Option::::get_field(PROTOCOL_NAME), // We don't support the following actions yet - //Option::get_field("cdc"), - //Option::get_field("commitInfo"), - //Option::get_field("domainMetadata"), - //Option::get_field("txn"), + //Option::get_field(CDC_NAME), + //Option::get_field(COMMIT_INFO_NAME), + //Option::get_field(DOMAIN_METADATA_NAME), + //Option::get_field(TRANSACTION_NAME), ] ); } -pub(crate) static ADD_NAME: &str = "add"; -pub(crate) static REMOVE_NAME: &str = "remove"; -pub(crate) static METADATA_NAME: &str = "metaData"; -pub(crate) static PROTOCOL_NAME: &str = "protocol"; - pub(crate) fn get_log_schema() -> &'static StructType { &LOG_SCHEMA } From 505983e2cd726ab0c8c1bdf9180c7c1f3364ae86 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:13:47 -0700 Subject: [PATCH 19/19] fix comment --- derive-macros/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index f9665def5..dcb4a6b81 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -18,8 +18,7 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream crate::schema::StructType::new(vec![ #schema_fields ]), - // TODO: Ensure correct. By default not nullable, only can be made nullable by - // being wrapped in an Option + // By default not nullable. To make something nullable wrap it in an Option false, ) }