From c5f5e817dcd0007f2f14b571f4a772685d001b37 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Wed, 28 Feb 2024 10:54:59 -0800 Subject: [PATCH 01/35] initial derive macros --- Cargo.toml | 1 + derive-macros/Cargo.toml | 20 ++++ derive-macros/src/lib.rs | 160 ++++++++++++++++++++++++++ kernel/Cargo.toml | 3 + kernel/src/actions/deletion_vector.rs | 3 +- kernel/src/actions/mod.rs | 21 +++- 6 files changed, 201 insertions(+), 7 deletions(-) create mode 100644 derive-macros/Cargo.toml create mode 100644 derive-macros/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index d7ef7832c..9ece4d6bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "acceptance", + "derive-macros", "ffi", "kernel", "kernel/examples/*", diff --git a/derive-macros/Cargo.toml b/derive-macros/Cargo.toml new file mode 100644 index 000000000..3d82ddb7d --- /dev/null +++ b/derive-macros/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "derive-macros" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true +readme.workspace = true +version.workspace = true + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1" +syn = "2.0" +quote = "1.0" + + diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs new file mode 100644 index 000000000..c989df5b2 --- /dev/null +++ b/derive-macros/src/lib.rs @@ -0,0 +1,160 @@ +use proc_macro2::TokenStream; +use quote::{quote, quote_spanned}; +use syn::spanned::Spanned; +use syn::{ + parse_macro_input, Data, DataStruct, DeriveInput, Fields, GenericArgument, PathArguments, + PathSegment, Type, +}; + +#[proc_macro_derive(Schema)] +pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let input = parse_macro_input!(input as DeriveInput); + let struct_ident = input.ident; + let schema_fields = gen_schema_fields(&input.data); + let output = quote! 
{ + impl crate::actions::GetSchema for #struct_ident { + fn get_schema() -> crate::schema::StructField { + crate::schema::StructField::new( + // TODO: this is wrong, we probably need an attribute for this since it doesn't + // exactly follow the struct name + stringify!(#struct_ident), + crate::schema::StructType::new(vec![ + #schema_fields + ]), + true, // TODO: how to determine nullable + ) + } + } + }; + proc_macro::TokenStream::from(output) +} + +/// Get the generic types out of something like Option or HashMap +fn get_generic_types(path_args: &PathArguments) -> TokenStream { + if let PathArguments::AngleBracketed(gen_args) = path_args { + let types = gen_args.args.iter().map(|arg| { + if let GenericArgument::Type(typ) = arg { + match typ { + Type::Path(ref type_path) => { + if let Some(fin) = type_path.path.segments.iter().last() { + get_data_type(fin) + } else { + panic!("Path for generic type must have last") + } + } + _ => panic!("Generic needs type path"), + } + } else { + panic!("Only support schema for literal generic types") + } + }); + quote! { + #(#types),* + } + } else { + panic!("Can only handle angle bracketed (i.e. <>) generic args") + } +} + +fn get_data_type(path_segment: &PathSegment) -> Option { + let name = path_segment.ident.to_string(); + match name.as_str() { + "String" => Some(quote! { crate::schema::DataType::STRING }), + "i64" => Some(quote! { crate::schema::DataType::LONG }), + "i32" => Some(quote! { crate::schema::DataType::INTEGER }), + "i16" => Some(quote! { crate::schema::DataType::SHORT }), + "char" => Some(quote! { crate::schema::DataType::BYTE }), // TODO: Correct rust type + "f32" => Some(quote! { crate::schema::DataType::FLOAT }), + "f64" => Some(quote! { crate::schema::DataType::DOUBLE }), + "bool" => Some(quote! { crate::schema::DataType::BOOLEAN }), + // TODO: Binary, Date and Timestamp rust types + "HashMap" => { + let types = get_generic_types(&path_segment.arguments); + Some(quote! 
{ + crate::schema::MapType::new(#types, true) // TODO (how to determine if value contains null) + }) + } + "Option" => { + let option_type = get_generic_types(&path_segment.arguments); + if option_type.to_string() == "" { + // This indicates that the thing in the option didn't have a directly known data type + // TODO get_generic_types should probably return an Option + None + } else { + Some(quote! { #option_type }) + } + } + "Vec" => { + let vec_type = get_generic_types(&path_segment.arguments); + Some(quote! { + crate::schema::ArrayType::new(#vec_type, false) // TODO (how to determine if contains null) + }) + } + _ => { + // assume it's a struct type that implements get_schema, will be handled in called + None + } + } +} + +fn gen_schema_fields(data: &Data) -> TokenStream { + let fields = match data { + Data::Struct(DataStruct { + fields: Fields::Named(fields), + .. + }) => &fields.named, + _ => panic!("this derive macro only works on structs with named fields"), + }; + + let schema_fields = fields.iter().map(|field| { + let name = field.ident.as_ref().unwrap(); // we know these are named fields + match field.ty { + Type::Path(ref type_path) => { + if let Some(fin) = type_path.path.segments.iter().last() { + let optional = fin.ident == "Option"; + if let Some(schema_type) = get_data_type(fin) { + quote_spanned! 
{field.span()=> + crate::schema::StructField::new(stringify!(#name), #schema_type, #optional) + } + } else { + // Return of None means it's some type we don't support directly, just + // assume it implements GetSchema + let ident = if optional { + // if optional, use name of inside thing + // TODO: this is similar to get_generic_types, unify + if let PathArguments::AngleBracketed(gen_args) = &fin.arguments { + assert!(gen_args.args.iter().len() == 1); + match gen_args.args.iter().next().unwrap() { + GenericArgument::Type(typ) => { + match typ { + Type::Path(ref type_path) => { + if let Some(fin) = type_path.path.segments.iter().last() { + fin.ident.clone() + } else { + panic!("Path for generic type must have last") + } + } + _ => panic!("Generic needs type path") + } + } + _ => panic!("Option must be a generic arg"), + } + } else { + panic!("Need angle bracketed Option types"); + } + } else { + fin.ident.clone() + }; + quote_spanned! {field.span()=> + #ident::get_schema() + } + } + } else { + panic!("Cound't get type"); + } + } + _ => { panic!("Can't handle type: {:?}", field.ty); } + } + }); + quote! 
{ #(#schema_fields),* } +} diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index f8e4fe4a3..c63acc49d 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -29,6 +29,9 @@ url = "2" uuid = "1.3.0" z85 = "3.0.5" +# bring in our derive macros +derive-macros = { path = "../derive-macros" } + # used for developer-visibility visibility = "0.1.0" diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index 0238f8c05..a8d80376a 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -4,12 +4,13 @@ use std::io::{Cursor, Read}; use std::sync::Arc; use bytes::Bytes; +use derive_macros::Schema; use roaring::RoaringTreemap; use url::Url; use crate::{DeltaResult, Error, FileSystemClient}; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct DeletionVectorDescriptor { /// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p']. pub storage_type: String, diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index a2c73d772..df8c56fa0 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -3,14 +3,23 @@ pub(crate) mod deletion_vector; pub(crate) mod schemas; pub(crate) mod visitors; +use derive_macros::Schema; use std::{collections::HashMap, sync::Arc}; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; -use crate::{schema::StructType, DeltaResult, EngineData}; +use crate::{ + schema::{StructField, StructType}, + DeltaResult, EngineData, +}; use self::deletion_vector::DeletionVectorDescriptor; -#[derive(Debug, Clone, PartialEq, Eq)] +/// A trait that says you can ask for the [`Schema`] of the implementor +trait GetSchema { + fn get_schema() -> StructField; +} + +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { /// Name of the encoding for files in this table pub provider: String, @@ -27,7 +36,7 @@ impl Default for Format { } } -#[derive(Debug, Default, Clone, 
PartialEq, Eq)] +#[derive(Debug, Default, Clone, PartialEq, Eq, Schema)] pub struct Metadata { /// Unique identifier for this table pub id: String, @@ -60,7 +69,7 @@ impl Metadata { } } -#[derive(Default, Debug, Clone, PartialEq, Eq)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Schema)] pub struct Protocol { /// The minimum version of the Delta read protocol that a client must implement /// in order to correctly read this table @@ -85,7 +94,7 @@ impl Protocol { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Add { /// A relative path to a data file from the root of the table or an absolute path to a file /// that should be added to the table. The path is a URI as specified by @@ -144,7 +153,7 @@ impl Add { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub(crate) struct Remove { /// A relative path to a data file from the root of the table or an absolute path to a file /// that should be added to the table. 
The path is a URI as specified by From f2f6e34e53399a0320384cf06b81987de379f449 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 29 Feb 2024 18:08:02 -0800 Subject: [PATCH 02/35] simpler version --- derive-macros/src/lib.rs | 186 ++++++++++++-------------- kernel/src/actions/deletion_vector.rs | 1 + kernel/src/actions/mod.rs | 11 +- kernel/src/actions/schemas.rs | 46 +++++++ 4 files changed, 131 insertions(+), 113 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index c989df5b2..84a00bc9f 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -1,23 +1,72 @@ -use proc_macro2::TokenStream; +use proc_macro2::{Ident, Spacing, TokenStream, TokenTree}; use quote::{quote, quote_spanned}; use syn::spanned::Spanned; use syn::{ - parse_macro_input, Data, DataStruct, DeriveInput, Fields, GenericArgument, PathArguments, - PathSegment, Type, + parse_macro_input, Attribute, Data, DataStruct, DeriveInput, Fields, Meta, PathArguments, Type, }; -#[proc_macro_derive(Schema)] +// Return the ident to use as the schema name if it's been specified in the attributes of the struct +// TODO: can we simplify this? +fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> Option { + for attr in attrs { + if let Meta::List(list) = &attr.meta { + if let Some(attr_name) = list.path.segments.iter().last() { + if attr_name.ident == "schema" { + // We have some schema(...) attribute, see if we've specified a different name + let tokens: Vec = list.tokens.clone().into_iter().collect(); + if tokens.len() == 3 { + // we only support `name = name` style + if let TokenTree::Ident(ref ident) = tokens[0] { + assert!(ident == "name"); + } else { + panic!("schema(...) only supports schema(name = name)"); + } + // ensure a normal = sign in the specification + if let TokenTree::Punct(ref punct) = tokens[1] { + assert!(punct.as_char() == '='); + assert!(punct.spacing() == Spacing::Alone); + } else { + panic!("schema(...) 
only supports schema(name = name)"); + } + if let TokenTree::Ident(ref ident) = tokens[2] { + return Some(ident.clone()); + } else { + panic!("schema(...) only supports schema(name = name)"); + } + } + } else { + panic!("Schema only accepts `schema` as an extra attribute") + } + } + } + } + None +} + +#[proc_macro_derive(Schema, attributes(schema))] pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { let input = parse_macro_input!(input as DeriveInput); let struct_ident = input.ident; + let schema_name = + get_schema_name_from_attr(input.attrs.iter()).unwrap_or_else(|| { + // default to the struct name, but lowercased + Ident::new(&struct_ident.to_string().to_lowercase(), struct_ident.span()) + }); + let schema_fields = gen_schema_fields(&input.data); let output = quote! { - impl crate::actions::GetSchema for #struct_ident { + impl crate::actions::schemas::GetSchema for #struct_ident { fn get_schema() -> crate::schema::StructField { + use crate::actions::schemas::GetField; + Self::get_field(stringify!(#schema_name)) + } + } + + impl crate::actions::schemas::GetField for #struct_ident { + fn get_field(name: impl Into) -> crate::schema::StructField { + use crate::actions::schemas::GetField; crate::schema::StructField::new( - // TODO: this is wrong, we probably need an attribute for this since it doesn't - // exactly follow the struct name - stringify!(#struct_ident), + name, crate::schema::StructType::new(vec![ #schema_fields ]), @@ -29,72 +78,26 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream proc_macro::TokenStream::from(output) } -/// Get the generic types out of something like Option or HashMap -fn get_generic_types(path_args: &PathArguments) -> TokenStream { - if let PathArguments::AngleBracketed(gen_args) = path_args { - let types = gen_args.args.iter().map(|arg| { - if let GenericArgument::Type(typ) = arg { - match typ { - Type::Path(ref type_path) => { - if let Some(fin) = 
type_path.path.segments.iter().last() { - get_data_type(fin) - } else { - panic!("Path for generic type must have last") - } - } - _ => panic!("Generic needs type path"), - } - } else { - panic!("Only support schema for literal generic types") - } - }); - quote! { - #(#types),* - } - } else { - panic!("Can only handle angle bracketed (i.e. <>) generic args") - } -} -fn get_data_type(path_segment: &PathSegment) -> Option { - let name = path_segment.ident.to_string(); - match name.as_str() { - "String" => Some(quote! { crate::schema::DataType::STRING }), - "i64" => Some(quote! { crate::schema::DataType::LONG }), - "i32" => Some(quote! { crate::schema::DataType::INTEGER }), - "i16" => Some(quote! { crate::schema::DataType::SHORT }), - "char" => Some(quote! { crate::schema::DataType::BYTE }), // TODO: Correct rust type - "f32" => Some(quote! { crate::schema::DataType::FLOAT }), - "f64" => Some(quote! { crate::schema::DataType::DOUBLE }), - "bool" => Some(quote! { crate::schema::DataType::BOOLEAN }), - // TODO: Binary, Date and Timestamp rust types - "HashMap" => { - let types = get_generic_types(&path_segment.arguments); - Some(quote! { - crate::schema::MapType::new(#types, true) // TODO (how to determine if value contains null) - }) - } - "Option" => { - let option_type = get_generic_types(&path_segment.arguments); - if option_type.to_string() == "" { - // This indicates that the thing in the option didn't have a directly known data type - // TODO get_generic_types should probably return an Option - None +// turn our struct name into the schema name, goes from snake_case to camelCase +fn get_schema_name(name: &Ident) -> Ident { + let snake_name = name.to_string(); + let mut next_caps = false; + let ret: String = snake_name.chars().filter_map(|c| { + if c == '_' { + next_caps = true; + None + } else { + if next_caps { + next_caps = false; + // This assumes we're basically using ascii, should be okay + Some(c.to_uppercase().next().unwrap()) } else { - Some(quote! 
{ #option_type }) + Some(c) } } - "Vec" => { - let vec_type = get_generic_types(&path_segment.arguments); - Some(quote! { - crate::schema::ArrayType::new(#vec_type, false) // TODO (how to determine if contains null) - }) - } - _ => { - // assume it's a struct type that implements get_schema, will be handled in called - None - } - } + }).collect(); + Ident::new(&ret, name.span()) } fn gen_schema_fields(data: &Data) -> TokenStream { @@ -108,52 +111,27 @@ fn gen_schema_fields(data: &Data) -> TokenStream { let schema_fields = fields.iter().map(|field| { let name = field.ident.as_ref().unwrap(); // we know these are named fields + let name = get_schema_name(name); match field.ty { Type::Path(ref type_path) => { if let Some(fin) = type_path.path.segments.iter().last() { - let optional = fin.ident == "Option"; - if let Some(schema_type) = get_data_type(fin) { + let type_ident = &fin.ident; + if let PathArguments::AngleBracketed(angle_args) = &fin.arguments { quote_spanned! {field.span()=> - crate::schema::StructField::new(stringify!(#name), #schema_type, #optional) + #type_ident::#angle_args::get_field(stringify!(#name)) } } else { - // Return of None means it's some type we don't support directly, just - // assume it implements GetSchema - let ident = if optional { - // if optional, use name of inside thing - // TODO: this is similar to get_generic_types, unify - if let PathArguments::AngleBracketed(gen_args) = &fin.arguments { - assert!(gen_args.args.iter().len() == 1); - match gen_args.args.iter().next().unwrap() { - GenericArgument::Type(typ) => { - match typ { - Type::Path(ref type_path) => { - if let Some(fin) = type_path.path.segments.iter().last() { - fin.ident.clone() - } else { - panic!("Path for generic type must have last") - } - } - _ => panic!("Generic needs type path") - } - } - _ => panic!("Option must be a generic arg"), - } - } else { - panic!("Need angle bracketed Option types"); - } - } else { - fin.ident.clone() - }; quote_spanned! 
{field.span()=> - #ident::get_schema() + #type_ident::get_field(stringify!(#name)) } } } else { - panic!("Cound't get type"); + panic!("Couldn't get type"); } } - _ => { panic!("Can't handle type: {:?}", field.ty); } + _ => { + panic!("Can't handle type: {:?}", field.ty); + } } }); quote! { #(#schema_fields),* } diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index a8d80376a..b8bdb9b24 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -11,6 +11,7 @@ use url::Url; use crate::{DeltaResult, Error, FileSystemClient}; #[derive(Debug, Clone, PartialEq, Eq, Schema)] +#[schema(name = deletionVector)] pub struct DeletionVectorDescriptor { /// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p']. pub storage_type: String, diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index df8c56fa0..8c4edd5f8 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -7,18 +7,10 @@ use derive_macros::Schema; use std::{collections::HashMap, sync::Arc}; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; -use crate::{ - schema::{StructField, StructType}, - DeltaResult, EngineData, -}; +use crate::{schema::StructType, DeltaResult, EngineData}; use self::deletion_vector::DeletionVectorDescriptor; -/// A trait that says you can ask for the [`Schema`] of the implementor -trait GetSchema { - fn get_schema() -> StructField; -} - #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { /// Name of the encoding for files in this table @@ -37,6 +29,7 @@ impl Default for Format { } #[derive(Debug, Default, Clone, PartialEq, Eq, Schema)] +#[schema(name = metaData)] pub struct Metadata { /// Unique identifier for this table pub id: String, diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index a1cbf890c..d723d493f 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -1,9 
+1,55 @@ //! Schema definitions for action types +use std::collections::HashMap; + use lazy_static::lazy_static; use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; +/// A trait that says you can ask for the [`Schema`] of the implementor +pub(crate) trait GetSchema { + fn get_schema() -> StructField; +} + +/// A trait that allows getting a `StructField` based on the provided name and nullability +pub(crate) trait GetField { + fn get_field(name: impl Into) -> StructField; +} + +macro_rules! impl_get_field { + ( $(($rust_type: ty, $kernel_type: expr)), * ) => { + $( + impl GetField for $rust_type { + fn get_field(name: impl Into) -> StructField { + StructField::new(name, $kernel_type, false) + } + } + )* + }; +} + +impl_get_field!( + (String, DataType::STRING), + (i64, DataType::LONG), + (i32, DataType::INTEGER), + (i16, DataType::SHORT), + (char, DataType::BYTE), + (f32, DataType::FLOAT), + (f64, DataType::DOUBLE), + (bool, DataType::BOOLEAN), + (HashMap, MapType::new(DataType::STRING, DataType::STRING, false)), + (HashMap>, MapType::new(DataType::STRING, DataType::STRING, true)), + (Vec, ArrayType::new(DataType::STRING, false)) +); + +impl GetField for Option { + fn get_field(name: impl Into) -> StructField { + let mut inner = T::get_field(name); + inner.nullable = true; + inner + } +} + lazy_static! 
{ // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata pub(crate) static ref METADATA_FIELD: StructField = StructField::new( From 9abe8ea333cd9462eb74772d429945e061526a9b Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 09:47:19 -0800 Subject: [PATCH 03/35] Update derive-macros/src/lib.rs Co-authored-by: Ryan Johnson --- derive-macros/src/lib.rs | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 84a00bc9f..a7973de94 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -14,25 +14,14 @@ fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> if attr_name.ident == "schema" { // We have some schema(...) attribute, see if we've specified a different name let tokens: Vec = list.tokens.clone().into_iter().collect(); - if tokens.len() == 3 { + match tokens[..] { // we only support `name = name` style - if let TokenTree::Ident(ref ident) = tokens[0] { + [TokenTree::Ident(ref ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref ident)] => { assert!(ident == "name"); - } else { - panic!("schema(...) only supports schema(name = name)"); - } - // ensure a normal = sign in the specification - if let TokenTree::Punct(ref punct) = tokens[1] { assert!(punct.as_char() == '='); - assert!(punct.spacing() == Spacing::Alone); - } else { - panic!("schema(...) only supports schema(name = name)"); - } - if let TokenTree::Ident(ref ident) = tokens[2] { return Some(ident.clone()); - } else { - panic!("schema(...) only supports schema(name = name)"); } + _ => panic!("schema(...) 
only supports schema(name = name)"), } } else { panic!("Schema only accepts `schema` as an extra attribute") From addb7a33b7139e477b4ed063f9961feae756ebaa Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 09:50:26 -0800 Subject: [PATCH 04/35] cleanup get_schema_name_from_attr< --- derive-macros/src/lib.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index a7973de94..9f737b6e6 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -5,8 +5,9 @@ use syn::{ parse_macro_input, Attribute, Data, DataStruct, DeriveInput, Fields, Meta, PathArguments, Type, }; +static SCHEMA_ERR_STR: &str = "schema(...) only supports schema(name = name)"; + // Return the ident to use as the schema name if it's been specified in the attributes of the struct -// TODO: can we simplify this? fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> Option { for attr in attrs { if let Meta::List(list) = &attr.meta { @@ -16,12 +17,13 @@ fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> let tokens: Vec = list.tokens.clone().into_iter().collect(); match tokens[..] { // we only support `name = name` style - [TokenTree::Ident(ref ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref ident)] => { - assert!(ident == "name"); - assert!(punct.as_char() == '='); - return Some(ident.clone()); + [TokenTree::Ident(ref name_ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref schema_ident)] => { + assert!(name_ident == "name", "{}", SCHEMA_ERR_STR); + assert!(punct.as_char() == '=', "{}", SCHEMA_ERR_STR); + assert!(punct.spacing() == Spacing::Alone, "{}", SCHEMA_ERR_STR); + return Some(schema_ident.clone()); } - _ => panic!("schema(...) 
only supports schema(name = name)"), + _ => panic!("{}", SCHEMA_ERR_STR), } } else { panic!("Schema only accepts `schema` as an extra attribute") From 7f00ac12d22d439f9a1e18cf7bae9ca332992b43 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 09:51:25 -0800 Subject: [PATCH 05/35] to_ascii_uppercase makes more sense --- derive-macros/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 9f737b6e6..9f407707d 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -81,8 +81,8 @@ fn get_schema_name(name: &Ident) -> Ident { } else { if next_caps { next_caps = false; - // This assumes we're basically using ascii, should be okay - Some(c.to_uppercase().next().unwrap()) + // This assumes we're using ascii, should be okay + Some(c.to_ascii_uppercase()) } else { Some(c) } From 9dcc09621e9c86ba796a95426b352a681f539517 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 10:07:19 -0800 Subject: [PATCH 06/35] add test --- derive-macros/src/lib.rs | 4 ++- kernel/src/actions/mod.rs | 54 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 9f407707d..08b0c0e06 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -61,7 +61,9 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream crate::schema::StructType::new(vec![ #schema_fields ]), - true, // TODO: how to determine nullable + // TODO: Ensure correct. 
By default not nullable, only can be made nullable by + // being wrapped in an Option + false, ) } } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 8c4edd5f8..0658b3497 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -203,3 +203,57 @@ impl Remove { self.deletion_vector.as_ref().map(|dv| dv.unique_id()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{actions::schemas::GetSchema, schema::{StructField, DataType, ArrayType, MapType}}; + + #[test] + fn test_metadata_schema() { + let schema = Metadata::get_schema(); + + let expected = StructField::new( + "metaData", + StructType::new(vec![ + StructField::new("id", DataType::STRING, false), + StructField::new("name", DataType::STRING, true), + StructField::new("description", DataType::STRING, true), + StructField::new( + "format", + StructType::new(vec![ + StructField::new("provider", DataType::STRING, false), + StructField::new( + "options", + MapType::new( + DataType::STRING, + DataType::STRING, + false, + ), + false, + ), + ]), + false, + ), + StructField::new("schemaString", DataType::STRING, false), + StructField::new( + "partitionColumns", + ArrayType::new(DataType::STRING, false), + false, + ), + StructField::new("createdTime", DataType::LONG, true), + StructField::new( + "configuration", + MapType::new( + DataType::STRING, + DataType::STRING, + true, + ), + false, + ), + ]), + false, + ); + assert_eq!(schema, expected); + } +} From 6d00e4396efc1403ad16c47860cd2c9176b8fdd7 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 1 Mar 2024 10:07:40 -0800 Subject: [PATCH 07/35] fmt --- derive-macros/src/lib.rs | 43 ++++++++++++++++++++++----------------- kernel/src/actions/mod.rs | 17 ++++++---------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 08b0c0e06..dd720a6a0 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -17,7 +17,8 @@ fn 
get_schema_name_from_attr<'a>(attrs: impl Iterator) -> let tokens: Vec = list.tokens.clone().into_iter().collect(); match tokens[..] { // we only support `name = name` style - [TokenTree::Ident(ref name_ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref schema_ident)] => { + [TokenTree::Ident(ref name_ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref schema_ident)] => + { assert!(name_ident == "name", "{}", SCHEMA_ERR_STR); assert!(punct.as_char() == '=', "{}", SCHEMA_ERR_STR); assert!(punct.spacing() == Spacing::Alone, "{}", SCHEMA_ERR_STR); @@ -38,11 +39,13 @@ fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { let input = parse_macro_input!(input as DeriveInput); let struct_ident = input.ident; - let schema_name = - get_schema_name_from_attr(input.attrs.iter()).unwrap_or_else(|| { - // default to the struct name, but lowercased - Ident::new(&struct_ident.to_string().to_lowercase(), struct_ident.span()) - }); + let schema_name = get_schema_name_from_attr(input.attrs.iter()).unwrap_or_else(|| { + // default to the struct name, but lowercased + Ident::new( + &struct_ident.to_string().to_lowercase(), + struct_ident.span(), + ) + }); let schema_fields = gen_schema_fields(&input.data); let output = quote! 
{ @@ -71,25 +74,27 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream proc_macro::TokenStream::from(output) } - // turn our struct name into the schema name, goes from snake_case to camelCase fn get_schema_name(name: &Ident) -> Ident { let snake_name = name.to_string(); let mut next_caps = false; - let ret: String = snake_name.chars().filter_map(|c| { - if c == '_' { - next_caps = true; - None - } else { - if next_caps { - next_caps = false; - // This assumes we're using ascii, should be okay - Some(c.to_ascii_uppercase()) + let ret: String = snake_name + .chars() + .filter_map(|c| { + if c == '_' { + next_caps = true; + None } else { - Some(c) + if next_caps { + next_caps = false; + // This assumes we're using ascii, should be okay + Some(c.to_ascii_uppercase()) + } else { + Some(c) + } } - } - }).collect(); + }) + .collect(); Ident::new(&ret, name.span()) } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 0658b3497..b6bdd59be 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -207,7 +207,10 @@ impl Remove { #[cfg(test)] mod tests { use super::*; - use crate::{actions::schemas::GetSchema, schema::{StructField, DataType, ArrayType, MapType}}; + use crate::{ + actions::schemas::GetSchema, + schema::{ArrayType, DataType, MapType, StructField}, + }; #[test] fn test_metadata_schema() { @@ -225,11 +228,7 @@ mod tests { StructField::new("provider", DataType::STRING, false), StructField::new( "options", - MapType::new( - DataType::STRING, - DataType::STRING, - false, - ), + MapType::new(DataType::STRING, DataType::STRING, false), false, ), ]), @@ -244,11 +243,7 @@ mod tests { StructField::new("createdTime", DataType::LONG, true), StructField::new( "configuration", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), + MapType::new(DataType::STRING, DataType::STRING, true), false, ), ]), From 48219eef82613c36bd31f797b7d23f7332404967 Mon Sep 17 00:00:00 2001 From: Nick Lanham 
Date: Fri, 1 Mar 2024 12:22:07 -0800 Subject: [PATCH 08/35] clippy --- derive-macros/src/lib.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index dd720a6a0..cb94d7a87 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -84,14 +84,12 @@ fn get_schema_name(name: &Ident) -> Ident { if c == '_' { next_caps = true; None + } else if next_caps { + next_caps = false; + // This assumes we're using ascii, should be okay + Some(c.to_ascii_uppercase()) } else { - if next_caps { - next_caps = false; - // This assumes we're using ascii, should be okay - Some(c.to_ascii_uppercase()) - } else { - Some(c) - } + Some(c) } }) .collect(); From 5e621486b8929e59cd238639221cb3ade1cdcc20 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 14:52:09 -0800 Subject: [PATCH 09/35] make init static (at cost of returning a ref) --- derive-macros/src/lib.rs | 7 +++++-- kernel/src/actions/mod.rs | 6 +++--- kernel/src/actions/schemas.rs | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index cb94d7a87..b00a331b5 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -50,9 +50,12 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream let schema_fields = gen_schema_fields(&input.data); let output = quote! 
{ impl crate::actions::schemas::GetSchema for #struct_ident { - fn get_schema() -> crate::schema::StructField { + fn get_schema() -> &'static crate::schema::StructField { use crate::actions::schemas::GetField; - Self::get_field(stringify!(#schema_name)) + static SCHEMA_LOCK: std::sync::OnceLock = std::sync::OnceLock::new(); + SCHEMA_LOCK.get_or_init(|| { + Self::get_field(stringify!(#schema_name)) + }) } } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index b6bdd59be..d09125026 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -9,7 +9,7 @@ use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; use crate::{schema::StructType, DeltaResult, EngineData}; -use self::deletion_vector::DeletionVectorDescriptor; +use self::{deletion_vector::DeletionVectorDescriptor, schemas::GetSchema}; #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { @@ -51,8 +51,8 @@ pub struct Metadata { impl Metadata { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { - let schema = StructType::new(vec![crate::actions::schemas::METADATA_FIELD.clone()]); let mut visitor = MetadataVisitor::default(); + let schema = StructType::new(vec![Metadata::get_schema().clone()]); data.extract(Arc::new(schema), &mut visitor)?; Ok(visitor.metadata) } @@ -249,6 +249,6 @@ mod tests { ]), false, ); - assert_eq!(schema, expected); + assert_eq!(schema, &expected); } } diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index d723d493f..21dd9f853 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -8,7 +8,7 @@ use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; /// A trait that says you can ask for the [`Schema`] of the implementor pub(crate) trait GetSchema { - fn get_schema() -> StructField; + fn get_schema() -> &'static StructField; } /// A trait that allows getting a `StructField` based on the provided name and nullability From 
2cd6027284ba17d0778278ed6f400e6c3d65d06a Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 14:59:36 -0800 Subject: [PATCH 10/35] better static via returning SchemaRef --- derive-macros/src/lib.rs | 10 ++++++---- kernel/src/actions/mod.rs | 9 ++++----- kernel/src/actions/schemas.rs | 4 ++-- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index b00a331b5..9875f72e2 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -50,12 +50,14 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream let schema_fields = gen_schema_fields(&input.data); let output = quote! { impl crate::actions::schemas::GetSchema for #struct_ident { - fn get_schema() -> &'static crate::schema::StructField { + fn get_schema() -> crate::schema::SchemaRef { use crate::actions::schemas::GetField; - static SCHEMA_LOCK: std::sync::OnceLock = std::sync::OnceLock::new(); + static SCHEMA_LOCK: std::sync::OnceLock = std::sync::OnceLock::new(); SCHEMA_LOCK.get_or_init(|| { - Self::get_field(stringify!(#schema_name)) - }) + std::sync::Arc::new(crate::schema::StructType::new(vec![ + Self::get_field(stringify!(#schema_name)) + ])) + }).clone() // cheap clone, it's an Arc } } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index d09125026..a4568adff 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -52,8 +52,7 @@ pub struct Metadata { impl Metadata { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = MetadataVisitor::default(); - let schema = StructType::new(vec![Metadata::get_schema().clone()]); - data.extract(Arc::new(schema), &mut visitor)?; + data.extract(Metadata::get_schema(), &mut visitor)?; Ok(visitor.metadata) } @@ -216,7 +215,7 @@ mod tests { fn test_metadata_schema() { let schema = Metadata::get_schema(); - let expected = StructField::new( + let expected = 
Arc::new(StructType::new(vec![StructField::new( "metaData", StructType::new(vec![ StructField::new("id", DataType::STRING, false), @@ -248,7 +247,7 @@ mod tests { ), ]), false, - ); - assert_eq!(schema, &expected); + )])); + assert_eq!(schema, expected); } } diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index 21dd9f853..046f134b8 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -4,11 +4,11 @@ use std::collections::HashMap; use lazy_static::lazy_static; -use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; +use crate::schema::{ArrayType, DataType, MapType, SchemaRef, StructField, StructType}; /// A trait that says you can ask for the [`Schema`] of the implementor pub(crate) trait GetSchema { - fn get_schema() -> &'static StructField; + fn get_schema() -> SchemaRef; } /// A trait that allows getting a `StructField` based on the provided name and nullability From 746b6f3cf618291d5ed0c7408b7154fb7eb93ae8 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 16:03:47 -0800 Subject: [PATCH 11/35] use get_schema in actions/mod.rs --- kernel/src/actions/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index a4568adff..c5e71b627 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -4,7 +4,7 @@ pub(crate) mod schemas; pub(crate) mod visitors; use derive_macros::Schema; -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; use crate::{schema::StructType, DeltaResult, EngineData}; @@ -80,8 +80,7 @@ pub struct Protocol { impl Protocol { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = ProtocolVisitor::default(); - let schema = StructType::new(vec![crate::actions::schemas::PROTOCOL_FIELD.clone()]); - data.extract(Arc::new(schema), &mut visitor)?; + 
data.extract(Protocol::get_schema(), &mut visitor)?; Ok(visitor.protocol) } } @@ -135,8 +134,7 @@ impl Add { /// Since we always want to parse multiple adds from data, we return a `Vec` pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = AddVisitor::default(); - let schema = StructType::new(vec![crate::actions::schemas::ADD_FIELD.clone()]); - data.extract(Arc::new(schema), &mut visitor)?; + data.extract(Add::get_schema(), &mut visitor)?; Ok(visitor.adds) } @@ -205,6 +203,8 @@ impl Remove { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::{ actions::schemas::GetSchema, From 150b024d0ab81e566bf0209e93ab739c704657a0 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 17:23:18 -0800 Subject: [PATCH 12/35] use derived schemas, and fix remove schema --- kernel/src/actions/mod.rs | 60 ++++++++++++++++++++++++++++++++-- kernel/src/actions/visitors.rs | 16 ++++----- kernel/src/scan/file_stream.rs | 19 ++++++----- kernel/src/snapshot.rs | 5 +-- 4 files changed, 79 insertions(+), 21 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index c5e71b627..0103aaa10 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -152,13 +152,13 @@ pub(crate) struct Remove { /// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt pub(crate) path: String, + /// The time this logical file was created, as milliseconds since the epoch. + pub(crate) deletion_timestamp: Option, + /// When `false` the logical file must already be present in the table or the records /// in the added file must be contained in one or more remove actions in the same version. pub(crate) data_change: bool, - /// The time this logical file was created, as milliseconds since the epoch. 
- pub(crate) deletion_timestamp: Option, - /// When true the fields `partition_values`, `size`, and `tags` are present pub(crate) extended_file_metadata: Option, @@ -250,4 +250,58 @@ mod tests { )])); assert_eq!(schema, expected); } + + fn tags_field() -> StructField { + StructField::new( + "tags", + MapType::new(DataType::STRING, DataType::STRING, true), + true, + ) + } + + fn partition_values_field() -> StructField { + StructField::new( + "partitionValues", + MapType::new(DataType::STRING, DataType::STRING, true), + true, + ) + } + + fn deletion_vector_field() -> StructField { + StructField::new( + "deletionVector", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("storageType", DataType::STRING, false), + StructField::new("pathOrInlineDv", DataType::STRING, false), + StructField::new("offset", DataType::INTEGER, true), + StructField::new("sizeInBytes", DataType::INTEGER, false), + StructField::new("cardinality", DataType::LONG, false), + ]))), + true, + ) + } + + #[test] + fn test_remove_schema() { + let schema = Remove::get_schema(); + let expected = Arc::new(StructType::new(vec![ + StructField::new( + "remove", + StructType::new(vec![ + StructField::new("path", DataType::STRING, false), + StructField::new("deletionTimestamp", DataType::LONG, true), + StructField::new("dataChange", DataType::BOOLEAN, false), + StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), + partition_values_field(), + StructField::new("size", DataType::LONG, true), + tags_field(), + deletion_vector_field(), + StructField::new("baseRowId", DataType::LONG, true), + StructField::new("defaultRowCommitVersion", DataType::LONG, true), + ]), + false, + ) + ])); + assert_eq!(schema, expected); + } } diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 3fb0a1589..a12d7ee31 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -196,20 +196,20 @@ impl RemoveVisitor { let size: Option = 
getters[5].get_opt(row_index, "remove.size")?; - // TODO(nick) stats are skipped in getters[6] and tags are skipped in getters[7] + // TODO(nick) tags are skipped in getters[6] let deletion_vector = if let Some(storage_type) = - getters[8].get_opt(row_index, "remove.deletionVector.storageType")? + getters[7].get_opt(row_index, "remove.deletionVector.storageType")? { // there is a storageType, so the whole DV must be there let path_or_inline_dv: String = - getters[9].get(row_index, "remove.deletionVector.pathOrInlineDv")?; + getters[8].get(row_index, "remove.deletionVector.pathOrInlineDv")?; let offset: Option = - getters[10].get_opt(row_index, "remove.deletionVector.offset")?; + getters[9].get_opt(row_index, "remove.deletionVector.offset")?; let size_in_bytes: i32 = - getters[11].get(row_index, "remove.deletionVector.sizeInBytes")?; + getters[10].get(row_index, "remove.deletionVector.sizeInBytes")?; let cardinality: i64 = - getters[12].get(row_index, "remove.deletionVector.cardinality")?; + getters[11].get(row_index, "remove.deletionVector.cardinality")?; Some(DeletionVectorDescriptor { storage_type, path_or_inline_dv, @@ -221,9 +221,9 @@ impl RemoveVisitor { None }; - let base_row_id: Option = getters[13].get_opt(row_index, "remove.baseRowId")?; + let base_row_id: Option = getters[12].get_opt(row_index, "remove.baseRowId")?; let default_row_commit_version: Option = - getters[14].get_opt(row_index, "remove.defaultRowCommitVersion")?; + getters[13].get_opt(row_index, "remove.defaultRowCommitVersion")?; Ok(Remove { path, diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index 372140819..f7878febf 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -5,6 +5,7 @@ use either::Either; use tracing::debug; use super::data_skipping::DataSkippingFilter; +use crate::actions::schemas::{GetSchema, GetField}; use crate::actions::{visitors::AddVisitor, visitors::RemoveVisitor, Add, Remove}; use 
crate::engine_data::{GetData, TypedGetData}; use crate::expressions::Expression; @@ -80,18 +81,20 @@ impl LogReplayScanner { None => actions, }; - let schema_to_use = StructType::new(if is_log_batch { - vec![ - crate::actions::schemas::ADD_FIELD.clone(), - crate::actions::schemas::REMOVE_FIELD.clone(), - ] + let schema_to_use = if is_log_batch { + Arc::new(StructType::new( + vec![ + Option::::get_field("add"), + Option::::get_field("remove"), + ] + )) } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files // only serve as tombstones for vacuum jobs. So no need to load them here. - vec![crate::actions::schemas::ADD_FIELD.clone()] - }); + Add::get_schema() + }; let mut visitor = AddRemoveVisitor::default(); - actions.extract(Arc::new(schema_to_use), &mut visitor)?; + actions.extract(schema_to_use, &mut visitor)?; for remove in visitor.removes.into_iter() { self.seen diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index f7df065a6..c91886d56 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -9,6 +9,7 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use url::Url; +use crate::actions::schemas::GetField; use crate::actions::{Metadata, Protocol}; use crate::path::LogPath; use crate::schema::{Schema, SchemaRef, StructType}; @@ -67,8 +68,8 @@ impl LogSegment { engine_interface: &dyn EngineInterface, ) -> DeltaResult> { let schema = StructType::new(vec![ - crate::actions::schemas::METADATA_FIELD.clone(), - crate::actions::schemas::PROTOCOL_FIELD.clone(), + Option::::get_field("metaData"), + Option::::get_field("protocol"), ]); let data_batches = self.replay(engine_interface, Arc::new(schema), None)?; let mut metadata_opt: Option = None; From c4e037e6d6b9bd5a32109f587f83e9f41caddb61 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 17:34:26 -0800 Subject: [PATCH 13/35] tags is optional on Add, use get_schema everywhere --- kernel/src/actions/mod.rs | 48 
++++++++++++---------------------- kernel/src/actions/visitors.rs | 13 ++++----- kernel/src/scan/file_stream.rs | 12 ++++----- kernel/src/scan/mod.rs | 7 ++--- 4 files changed, 31 insertions(+), 49 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 0103aaa10..77684a569 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -113,7 +113,7 @@ pub struct Add { pub stats: Option, /// Map containing metadata about this logical file. - pub tags: HashMap>, + pub tags: Option>>, /// Information about deletion vector (DV) associated with this add action pub deletion_vector: Option, @@ -184,18 +184,6 @@ pub(crate) struct Remove { } impl Remove { - // _try_new_from_data for now, to avoid warning, probably will need at some point - // pub(crate) fn _try_new_from_data( - // data: &dyn EngineData, - // ) -> DeltaResult { - // let mut visitor = Visitor::new(visit_remove); - // let schema = StructType::new(vec![crate::actions::schemas::REMOVE_FIELD.clone()]); - // data.extract(Arc::new(schema), &mut visitor)?; - // visitor - // .extracted - // .unwrap_or_else(|| Err(Error::generic("Didn't get expected remove"))) - // } - pub(crate) fn dv_unique_id(&self) -> Option { self.deletion_vector.as_ref().map(|dv| dv.unique_id()) } @@ -284,24 +272,22 @@ mod tests { #[test] fn test_remove_schema() { let schema = Remove::get_schema(); - let expected = Arc::new(StructType::new(vec![ - StructField::new( - "remove", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), - partition_values_field(), - StructField::new("size", DataType::LONG, true), - tags_field(), - deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), - ]), - false, - ) - 
])); + let expected = Arc::new(StructType::new(vec![StructField::new( + "remove", + StructType::new(vec![ + StructField::new("path", DataType::STRING, false), + StructField::new("deletionTimestamp", DataType::LONG, true), + StructField::new("dataChange", DataType::BOOLEAN, false), + StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), + partition_values_field(), + StructField::new("size", DataType::LONG, true), + tags_field(), + deletion_vector_field(), + StructField::new("baseRowId", DataType::LONG, true), + StructField::new("defaultRowCommitVersion", DataType::LONG, true), + ]), + false, + )])); assert_eq!(schema, expected); } } diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index a12d7ee31..2ef01256b 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -154,7 +154,7 @@ impl AddVisitor { modification_time, data_change, stats, - tags: HashMap::new(), + tags: None, deletion_vector, base_row_id, default_row_commit_version, @@ -262,8 +262,7 @@ mod tests { use super::*; use crate::{ - actions::schemas::log_schema, - schema::StructType, + actions::schemas::{log_schema, GetSchema}, simple_client::{data::SimpleData, json::SimpleJsonHandler, SimpleClient}, EngineData, EngineInterface, JsonHandler, }; @@ -356,11 +355,9 @@ mod tests { let batch = json_handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); - let add_schema = StructType::new(vec![crate::actions::schemas::ADD_FIELD.clone()]); + let add_schema = Add::get_schema(); let mut add_visitor = AddVisitor::default(); - batch - .extract(Arc::new(add_schema), &mut add_visitor) - .unwrap(); + batch.extract(add_schema, &mut add_visitor).unwrap(); let add1 = Add { path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet".into(), partition_values: HashMap::from([ @@ -371,7 +368,7 @@ mod tests { modification_time: 1670892998135, data_change: true, stats: 
Some("{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}".into()), - tags: HashMap::new(), + tags: None, deletion_vector: None, base_row_id: None, default_row_commit_version: None, diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index f7878febf..5dbf8254d 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -5,7 +5,7 @@ use either::Either; use tracing::debug; use super::data_skipping::DataSkippingFilter; -use crate::actions::schemas::{GetSchema, GetField}; +use crate::actions::schemas::{GetField, GetSchema}; use crate::actions::{visitors::AddVisitor, visitors::RemoveVisitor, Add, Remove}; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::Expression; @@ -82,12 +82,10 @@ impl LogReplayScanner { }; let schema_to_use = if is_log_batch { - Arc::new(StructType::new( - vec![ - Option::::get_field("add"), - Option::::get_field("remove"), - ] - )) + Arc::new(StructType::new(vec![ + Option::::get_field("add"), + Option::::get_field("remove"), + ])) } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files // only serve as tombstones for vacuum jobs. So no need to load them here. 
diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 6dc94d41d..72d9414f8 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -3,7 +3,8 @@ use std::sync::Arc; use itertools::Itertools; use self::file_stream::log_replay_iter; -use crate::actions::Add; +use crate::actions::schemas::GetField; +use crate::actions::{Add, Remove}; use crate::expressions::{Expression, Scalar}; use crate::schema::{DataType, SchemaRef, StructType}; use crate::snapshot::Snapshot; @@ -129,8 +130,8 @@ impl Scan { engine_interface: &dyn EngineInterface, ) -> DeltaResult>> { let action_schema = Arc::new(StructType::new(vec![ - crate::actions::schemas::ADD_FIELD.clone(), - crate::actions::schemas::REMOVE_FIELD.clone(), + Option::::get_field("add"), + Option::::get_field("remove"), ])); let log_iter = self.snapshot.log_segment.replay( From c7785c2416ebca534591975258a7bcea006ea64e Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 5 Mar 2024 17:48:05 -0800 Subject: [PATCH 14/35] remove leftover code in schemas.rs --- kernel/src/actions/mod.rs | 24 +++ kernel/src/actions/schemas.rs | 253 +------------------------------ kernel/src/actions/visitors.rs | 7 +- kernel/src/client/json.rs | 6 +- kernel/src/simple_client/data.rs | 4 +- 5 files changed, 34 insertions(+), 260 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 77684a569..5d10149d6 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -189,6 +189,30 @@ impl Remove { } } +use crate::actions::schemas::GetField; +use lazy_static::lazy_static; + +lazy_static! 
{ + static ref LOG_SCHEMA: StructType = StructType::new( + vec![ + Option::::get_field("add"), + Option::::get_field("remove"), + Option::::get_field("metaData"), + Option::::get_field("protocol"), + // We don't support the following actions yet + //Option::get_field("cdc"), + //Option::get_field("commitInfo"), + //Option::get_field("domainMetadata"), + //Option::get_field("txn"), + ] + ); +} + +#[cfg(test)] +pub(crate) fn get_log_schema() -> &'static StructType { + &LOG_SCHEMA +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index 046f134b8..c104e52a1 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -2,9 +2,7 @@ use std::collections::HashMap; -use lazy_static::lazy_static; - -use crate::schema::{ArrayType, DataType, MapType, SchemaRef, StructField, StructType}; +use crate::schema::{ArrayType, DataType, MapType, SchemaRef, StructField}; /// A trait that says you can ask for the [`Schema`] of the implementor pub(crate) trait GetSchema { @@ -49,252 +47,3 @@ impl GetField for Option { inner } } - -lazy_static! 
{ - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata - pub(crate) static ref METADATA_FIELD: StructField = StructField::new( - "metaData", - StructType::new(vec![ - StructField::new("id", DataType::STRING, false), - StructField::new("name", DataType::STRING, true), - StructField::new("description", DataType::STRING, true), - StructField::new( - "format", - StructType::new(vec![ - StructField::new("provider", DataType::STRING, false), - StructField::new( - "options", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - true, - ), - ]), - false, - ), - StructField::new("schemaString", DataType::STRING, false), - StructField::new( - "partitionColumns", - ArrayType::new(DataType::STRING, false), - false, - ), - StructField::new("createdTime", DataType::LONG, true), - StructField::new( - "configuration", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - false, - ), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#protocol-evolution - pub(crate) static ref PROTOCOL_FIELD: StructField = StructField::new( - "protocol", - StructType::new(vec![ - StructField::new("minReaderVersion", DataType::INTEGER, false), - StructField::new("minWriterVersion", DataType::INTEGER, false), - StructField::new( - "readerFeatures", - ArrayType::new(DataType::STRING, false), - true, - ), - StructField::new( - "writerFeatures", - ArrayType::new(DataType::STRING, false), - true, - ), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#commit-provenance-information - static ref COMMIT_INFO_FIELD: StructField = StructField::new( - "commitInfo", - StructType::new(vec![ - StructField::new("timestamp", DataType::LONG, false), - StructField::new("operation", DataType::STRING, false), - StructField::new("isolationLevel", DataType::STRING, true), - StructField::new("isBlindAppend", DataType::BOOLEAN, true), - StructField::new("txnId", DataType::STRING, true), - 
StructField::new("readVersion", DataType::LONG, true), - StructField::new( - "operationParameters", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - true, - ), - StructField::new( - "operationMetrics", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - true, - ), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file - pub(crate) static ref ADD_FIELD: StructField = StructField::new( - "add", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - partition_values_field(), - StructField::new("size", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("stats", DataType::STRING, true), - tags_field(), - deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), - StructField::new("clusteringProvider", DataType::STRING, true), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file - pub(crate) static ref REMOVE_FIELD: StructField = StructField::new( - "remove", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), - partition_values_field(), - StructField::new("size", DataType::LONG, true), - StructField::new("stats", DataType::STRING, true), - tags_field(), - deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), - ]), - true, - ); - static ref REMOVE_FIELD_CHECKPOINT: StructField = StructField::new( - "remove", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - 
StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file - static ref CDC_FIELD: StructField = StructField::new( - "cdc", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - partition_values_field(), - StructField::new("size", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), - tags_field(), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers - static ref TXN_FIELD: StructField = StructField::new( - "txn", - StructType::new(vec![ - StructField::new("appId", DataType::STRING, false), - StructField::new("version", DataType::LONG, false), - StructField::new("lastUpdated", DataType::LONG, true), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata - static ref DOMAIN_METADATA_FIELD: StructField = StructField::new( - "domainMetadata", - StructType::new(vec![ - StructField::new("domain", DataType::STRING, false), - StructField::new( - "configuration", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - false, - ), - StructField::new("removed", DataType::BOOLEAN, false), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-metadata - static ref CHECKPOINT_METADATA_FIELD: StructField = StructField::new( - "checkpointMetadata", - StructType::new(vec![ - StructField::new("flavor", DataType::STRING, false), - tags_field(), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information - static ref SIDECAR_FIELD: StructField = StructField::new( - "sidecar", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("sizeInBytes", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - 
StructField::new("type", DataType::STRING, false), - tags_field(), - ]), - true, - ); - - static ref LOG_SCHEMA: StructType = StructType::new( - vec![ - ADD_FIELD.clone(), - CDC_FIELD.clone(), - COMMIT_INFO_FIELD.clone(), - DOMAIN_METADATA_FIELD.clone(), - METADATA_FIELD.clone(), - PROTOCOL_FIELD.clone(), - REMOVE_FIELD.clone(), - TXN_FIELD.clone(), - ] - ); -} - -fn tags_field() -> StructField { - StructField::new( - "tags", - MapType::new(DataType::STRING, DataType::STRING, true), - true, - ) -} - -fn partition_values_field() -> StructField { - StructField::new( - "partitionValues", - MapType::new(DataType::STRING, DataType::STRING, true), - false, - ) -} - -fn deletion_vector_field() -> StructField { - StructField::new( - "deletionVector", - DataType::Struct(Box::new(StructType::new(vec![ - StructField::new("storageType", DataType::STRING, false), - StructField::new("pathOrInlineDv", DataType::STRING, false), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, false), - StructField::new("cardinality", DataType::LONG, false), - ]))), - true, - ) -} - -#[cfg(test)] -pub(crate) fn log_schema() -> &'static StructType { - &LOG_SCHEMA -} diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 2ef01256b..1625b2e66 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -262,7 +262,8 @@ mod tests { use super::*; use crate::{ - actions::schemas::{log_schema, GetSchema}, + actions::get_log_schema, + actions::schemas::GetSchema, simple_client::{data::SimpleData, json::SimpleJsonHandler, SimpleClient}, EngineData, EngineInterface, JsonHandler, }; @@ -285,7 +286,7 @@ mod tests { 
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#, ] .into(); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let parsed = handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); @@ -351,7 +352,7 @@ mod tests { r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#, ] .into(); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let batch = json_handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); diff --git a/kernel/src/client/json.rs b/kernel/src/client/json.rs index 2a95a13df..87cb21f2f 100644 --- a/kernel/src/client/json.rs +++ b/kernel/src/client/json.rs @@ -234,7 +234,7 @@ mod tests { use object_store::{local::LocalFileSystem, ObjectStore}; use super::*; - use crate::{actions::schemas::log_schema, executor::tokio::TokioBackgroundExecutor}; + use crate::{actions::get_log_schema, executor::tokio::TokioBackgroundExecutor}; fn string_array_to_engine_data(string_array: StringArray) -> Box { let string_field = Arc::new(Field::new("a", DataType::Utf8, true)); @@ -255,7 +255,7 @@ mod tests { r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#, 
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#, ]); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let batch = handler .parse_json(string_array_to_engine_data(json_strings), output_schema) @@ -282,7 +282,7 @@ mod tests { }]; let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); - let physical_schema = Arc::new(ArrowSchema::try_from(log_schema()).unwrap()); + let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema()).unwrap()); let data: Vec = handler .read_json_files(files, Arc::new(physical_schema.try_into().unwrap()), None) .unwrap() diff --git a/kernel/src/simple_client/data.rs b/kernel/src/simple_client/data.rs index 941c30a48..72da526aa 100644 --- a/kernel/src/simple_client/data.rs +++ b/kernel/src/simple_client/data.rs @@ -317,7 +317,7 @@ mod tests { use crate::actions::Metadata; use crate::DeltaResult; use crate::{ - actions::schemas::log_schema, + actions::get_log_schema, simple_client::{data::SimpleData, SimpleClient}, EngineData, EngineInterface, }; @@ -338,7 +338,7 @@ mod tests { r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, ] .into(); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); 
let parsed = handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); From b51b7b1b65247b5e6ddf177203f55ac136570528 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:00:41 -0700 Subject: [PATCH 15/35] switch to using "project" to get schemas --- derive-macros/Cargo.toml | 2 +- derive-macros/src/lib.rs | 55 +------------------- kernel/src/actions/mod.rs | 87 ++++++++++++++++++-------------- kernel/src/actions/schemas.rs | 7 +-- kernel/src/actions/visitors.rs | 7 +-- kernel/src/scan/file_stream.rs | 12 ++--- kernel/src/scan/mod.rs | 8 +-- kernel/src/schema.rs | 35 +++++++++++++ kernel/src/simple_client/data.rs | 32 +++++++++--- 9 files changed, 125 insertions(+), 120 deletions(-) diff --git a/derive-macros/Cargo.toml b/derive-macros/Cargo.toml index 3d82ddb7d..e103333b6 100644 --- a/derive-macros/Cargo.toml +++ b/derive-macros/Cargo.toml @@ -14,7 +14,7 @@ proc-macro = true [dependencies] proc-macro2 = "1" -syn = "2.0" +syn = { version = "2.0", features = ["extra-traits"] } quote = "1.0" diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 9875f72e2..f9665def5 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -1,66 +1,15 @@ -use proc_macro2::{Ident, Spacing, TokenStream, TokenTree}; +use proc_macro2::{Ident, TokenStream}; use quote::{quote, quote_spanned}; use syn::spanned::Spanned; -use syn::{ - parse_macro_input, Attribute, Data, DataStruct, DeriveInput, Fields, Meta, PathArguments, Type, -}; - -static SCHEMA_ERR_STR: &str = "schema(...) only supports schema(name = name)"; - -// Return the ident to use as the schema name if it's been specified in the attributes of the struct -fn get_schema_name_from_attr<'a>(attrs: impl Iterator) -> Option { - for attr in attrs { - if let Meta::List(list) = &attr.meta { - if let Some(attr_name) = list.path.segments.iter().last() { - if attr_name.ident == "schema" { - // We have some schema(...) 
attribute, see if we've specified a different name - let tokens: Vec = list.tokens.clone().into_iter().collect(); - match tokens[..] { - // we only support `name = name` style - [TokenTree::Ident(ref name_ident), TokenTree::Punct(ref punct), TokenTree::Ident(ref schema_ident)] => - { - assert!(name_ident == "name", "{}", SCHEMA_ERR_STR); - assert!(punct.as_char() == '=', "{}", SCHEMA_ERR_STR); - assert!(punct.spacing() == Spacing::Alone, "{}", SCHEMA_ERR_STR); - return Some(schema_ident.clone()); - } - _ => panic!("{}", SCHEMA_ERR_STR), - } - } else { - panic!("Schema only accepts `schema` as an extra attribute") - } - } - } - } - None -} +use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, PathArguments, Type}; #[proc_macro_derive(Schema, attributes(schema))] pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { let input = parse_macro_input!(input as DeriveInput); let struct_ident = input.ident; - let schema_name = get_schema_name_from_attr(input.attrs.iter()).unwrap_or_else(|| { - // default to the struct name, but lowercased - Ident::new( - &struct_ident.to_string().to_lowercase(), - struct_ident.span(), - ) - }); let schema_fields = gen_schema_fields(&input.data); let output = quote! 
{ - impl crate::actions::schemas::GetSchema for #struct_ident { - fn get_schema() -> crate::schema::SchemaRef { - use crate::actions::schemas::GetField; - static SCHEMA_LOCK: std::sync::OnceLock = std::sync::OnceLock::new(); - SCHEMA_LOCK.get_or_init(|| { - std::sync::Arc::new(crate::schema::StructType::new(vec![ - Self::get_field(stringify!(#schema_name)) - ])) - }).clone() // cheap clone, it's an Arc - } - } - impl crate::actions::schemas::GetField for #struct_ident { fn get_field(name: impl Into) -> crate::schema::StructField { use crate::actions::schemas::GetField; diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 5d10149d6..a9af7a875 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -4,12 +4,39 @@ pub(crate) mod schemas; pub(crate) mod visitors; use derive_macros::Schema; -use std::collections::HashMap; +use lazy_static::lazy_static; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; +use self::deletion_vector::DeletionVectorDescriptor; +use crate::actions::schemas::GetField; use crate::{schema::StructType, DeltaResult, EngineData}; -use self::{deletion_vector::DeletionVectorDescriptor, schemas::GetSchema}; +use std::collections::HashMap; + +lazy_static! 
{ + static ref LOG_SCHEMA: StructType = StructType::new( + vec![ + Option::::get_field("add"), + Option::::get_field("remove"), + Option::::get_field("metaData"), + Option::::get_field("protocol"), + // We don't support the following actions yet + //Option::get_field("cdc"), + //Option::get_field("commitInfo"), + //Option::get_field("domainMetadata"), + //Option::get_field("txn"), + ] + ); +} + +pub(crate) static ADD_NAME: &str = "add"; +pub(crate) static REMOVE_NAME: &str = "remove"; +pub(crate) static METADATA_NAME: &str = "metaData"; +pub(crate) static PROTOCOL_NAME: &str = "protocol"; + +pub(crate) fn get_log_schema() -> &'static StructType { + &LOG_SCHEMA +} #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { @@ -52,7 +79,10 @@ pub struct Metadata { impl Metadata { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = MetadataVisitor::default(); - data.extract(Metadata::get_schema(), &mut visitor)?; + data.extract( + get_log_schema().project_as_schema(&[METADATA_NAME])?, + &mut visitor, + )?; Ok(visitor.metadata) } @@ -80,7 +110,10 @@ pub struct Protocol { impl Protocol { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = ProtocolVisitor::default(); - data.extract(Protocol::get_schema(), &mut visitor)?; + data.extract( + get_log_schema().project_as_schema(&[PROTOCOL_NAME])?, + &mut visitor, + )?; Ok(visitor.protocol) } } @@ -134,7 +167,10 @@ impl Add { /// Since we always want to parse multiple adds from data, we return a `Vec` pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = AddVisitor::default(); - data.extract(Add::get_schema(), &mut visitor)?; + data.extract( + get_log_schema().project_as_schema(&[ADD_NAME])?, + &mut visitor, + )?; Ok(visitor.adds) } @@ -189,43 +225,18 @@ impl Remove { } } -use crate::actions::schemas::GetField; -use lazy_static::lazy_static; - -lazy_static! 
{ - static ref LOG_SCHEMA: StructType = StructType::new( - vec![ - Option::::get_field("add"), - Option::::get_field("remove"), - Option::::get_field("metaData"), - Option::::get_field("protocol"), - // We don't support the following actions yet - //Option::get_field("cdc"), - //Option::get_field("commitInfo"), - //Option::get_field("domainMetadata"), - //Option::get_field("txn"), - ] - ); -} - -#[cfg(test)] -pub(crate) fn get_log_schema() -> &'static StructType { - &LOG_SCHEMA -} - #[cfg(test)] mod tests { use std::sync::Arc; use super::*; - use crate::{ - actions::schemas::GetSchema, - schema::{ArrayType, DataType, MapType, StructField}, - }; + use crate::schema::{ArrayType, DataType, MapType, StructField}; #[test] fn test_metadata_schema() { - let schema = Metadata::get_schema(); + let schema = get_log_schema() + .project_as_schema(&["metaData"]) + .expect("Couldn't get metaData field"); let expected = Arc::new(StructType::new(vec![StructField::new( "metaData", @@ -258,7 +269,7 @@ mod tests { false, ), ]), - false, + true, )])); assert_eq!(schema, expected); } @@ -295,7 +306,9 @@ mod tests { #[test] fn test_remove_schema() { - let schema = Remove::get_schema(); + let schema = get_log_schema() + .project_as_schema(&["remove"]) + .expect("Couldn't get remove field"); let expected = Arc::new(StructType::new(vec![StructField::new( "remove", StructType::new(vec![ @@ -310,7 +323,7 @@ mod tests { StructField::new("baseRowId", DataType::LONG, true), StructField::new("defaultRowCommitVersion", DataType::LONG, true), ]), - false, + true, )])); assert_eq!(schema, expected); } diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index c104e52a1..894bbf8fe 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -2,12 +2,7 @@ use std::collections::HashMap; -use crate::schema::{ArrayType, DataType, MapType, SchemaRef, StructField}; - -/// A trait that says you can ask for the [`Schema`] of the implementor -pub(crate) trait 
GetSchema { - fn get_schema() -> SchemaRef; -} +use crate::schema::{ArrayType, DataType, MapType, StructField}; /// A trait that allows getting a `StructField` based on the provided name and nullability pub(crate) trait GetField { diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 1625b2e66..1656e9163 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -262,8 +262,7 @@ mod tests { use super::*; use crate::{ - actions::get_log_schema, - actions::schemas::GetSchema, + actions::{get_log_schema, ADD_NAME}, simple_client::{data::SimpleData, json::SimpleJsonHandler, SimpleClient}, EngineData, EngineInterface, JsonHandler, }; @@ -356,7 +355,9 @@ mod tests { let batch = json_handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); - let add_schema = Add::get_schema(); + let add_schema = get_log_schema() + .project_as_schema(&[ADD_NAME]) + .expect("Can't get add schema"); let mut add_visitor = AddVisitor::default(); batch.extract(add_schema, &mut add_visitor).unwrap(); let add1 = Add { diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index 5dbf8254d..a2cf11245 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -1,15 +1,14 @@ use std::collections::HashSet; -use std::sync::Arc; use either::Either; use tracing::debug; use super::data_skipping::DataSkippingFilter; -use crate::actions::schemas::{GetField, GetSchema}; +use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME}; use crate::actions::{visitors::AddVisitor, visitors::RemoveVisitor, Add, Remove}; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::Expression; -use crate::schema::{SchemaRef, StructType}; +use crate::schema::SchemaRef; use crate::{DataVisitor, DeltaResult, EngineData, EngineInterface}; struct LogReplayScanner { @@ -82,14 +81,11 @@ impl LogReplayScanner { }; let schema_to_use = if is_log_batch { - Arc::new(StructType::new(vec![ 
- Option::::get_field("add"), - Option::::get_field("remove"), - ])) + get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])? } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files // only serve as tombstones for vacuum jobs. So no need to load them here. - Add::get_schema() + get_log_schema().project_as_schema(&[ADD_NAME])? }; let mut visitor = AddRemoveVisitor::default(); actions.extract(schema_to_use, &mut visitor)?; diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 72d9414f8..1806819c3 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -3,8 +3,7 @@ use std::sync::Arc; use itertools::Itertools; use self::file_stream::log_replay_iter; -use crate::actions::schemas::GetField; -use crate::actions::{Add, Remove}; +use crate::actions::{get_log_schema, Add, ADD_NAME, REMOVE_NAME}; use crate::expressions::{Expression, Scalar}; use crate::schema::{DataType, SchemaRef, StructType}; use crate::snapshot::Snapshot; @@ -129,10 +128,7 @@ impl Scan { &self, engine_interface: &dyn EngineInterface, ) -> DeltaResult>> { - let action_schema = Arc::new(StructType::new(vec![ - Option::::get_field("add"), - Option::::get_field("remove"), - ])); + let action_schema = get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])?; let log_iter = self.snapshot.log_segment.replay( engine_interface, diff --git a/kernel/src/schema.rs b/kernel/src/schema.rs index 7d3050af2..94b29834b 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema.rs @@ -3,8 +3,11 @@ use std::sync::Arc; use std::{collections::HashMap, fmt::Display}; use indexmap::IndexMap; +use itertools::Itertools; use serde::{Deserialize, Serialize}; +use crate::{DeltaResult, Error}; + pub type Schema = StructType; pub type SchemaRef = Arc; @@ -140,6 +143,38 @@ impl StructType { } } + /// Get a [`StructType`] containing [`StructField`]s of the given names, preserving the original + /// order of fields. 
Returns an Err if a specified field doesn't exist + pub fn project(&self, names: &[impl AsRef]) -> DeltaResult { + let mut indexes: Vec = names + .iter() + .map(|name| { + self.fields + .get_index_of(name.as_ref()) + .ok_or_else(|| Error::missing_column(name.as_ref())) + }) + .try_collect()?; + indexes.sort(); // keep schema order + let fields: Vec = indexes + .iter() + .map(|index| { + self.fields + .get_index(*index) + .expect("get_index_of returned non-existant index") + .1 + .clone() + }) + .collect(); + Ok(Self::new(fields)) + } + + /// Get a [`SchemaRef`] containing [`StructField`]s of the given names, preserving the original + /// order of fields. Returns an Err if a specified field doesn't exist + pub fn project_as_schema(&self, names: &[impl AsRef]) -> DeltaResult { + let struct_type = self.project(names)?; + Ok(Arc::new(struct_type)) + } + pub fn field(&self, name: impl AsRef) -> Option<&StructField> { self.fields.get(name.as_ref()) } diff --git a/kernel/src/simple_client/data.rs b/kernel/src/simple_client/data.rs index 72da526aa..5b0999708 100644 --- a/kernel/src/simple_client/data.rs +++ b/kernel/src/simple_client/data.rs @@ -214,13 +214,16 @@ impl SimpleData { .filter(|a| *a.data_type() != ArrowDataType::Null); // Note: if col is None we have either: // a) encountered a column that is all nulls or, - // b) recursed into a struct that was all null. - // So below if the field is allowed to be null, we push that, otherwise we error out. + // b) recursed into a optional struct that was null. In this case, array.is_none() is + // true and we don't need to check field nullability, because we assume all fields + // of a nullable struct can be null + // So below if the field is allowed to be null, OR array.is_none() we push that, + // otherwise we error out. 
if let Some(col) = col { Self::extract_column(out_col_array, field, col)?; - } else if field.is_nullable() { - if let DataType::Struct(_) = field.data_type() { - Self::extract_columns_from_array(out_col_array, schema, None)?; + } else if array.is_none() || field.is_nullable() { + if let DataType::Struct(inner_struct) = field.data_type() { + Self::extract_columns_from_array(out_col_array, inner_struct.as_ref(), None)?; } else { debug!("Pushing a null field for {}", field.name); out_col_array.push(&()); @@ -314,7 +317,7 @@ mod tests { use arrow_array::{RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; - use crate::actions::Metadata; + use crate::actions::{Metadata, Protocol}; use crate::DeltaResult; use crate::{ actions::get_log_schema, @@ -348,4 +351,21 @@ mod tests { assert_eq!(metadata.partition_columns, vec!("c1", "c2")); Ok(()) } + + #[test] + fn test_nullable_struct() -> DeltaResult<()> { + let client = SimpleClient::new(); + let handler = client.get_json_handler(); + let json_strings: StringArray = vec![ + r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, + ] + .into(); + let output_schema = get_log_schema().project_as_schema(&["metaData"])?; + let parsed = handler + .parse_json(string_array_to_engine_data(json_strings), output_schema) + .unwrap(); + let protocol = Protocol::try_new_from_data(parsed.as_ref())?; + assert!(protocol.is_none()); + Ok(()) + } } From 9bca3f562d6ea2483ca00768d9aca16345a7791c Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:05:40 -0700 Subject: [PATCH 16/35] remove unneeded 
annotations --- kernel/src/actions/deletion_vector.rs | 1 - kernel/src/actions/mod.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index b8bdb9b24..a8d80376a 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -11,7 +11,6 @@ use url::Url; use crate::{DeltaResult, Error, FileSystemClient}; #[derive(Debug, Clone, PartialEq, Eq, Schema)] -#[schema(name = deletionVector)] pub struct DeletionVectorDescriptor { /// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p']. pub storage_type: String, diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index a9af7a875..3eb325909 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -56,7 +56,6 @@ impl Default for Format { } #[derive(Debug, Default, Clone, PartialEq, Eq, Schema)] -#[schema(name = metaData)] pub struct Metadata { /// Unique identifier for this table pub id: String, From 0bf4c3a36e875afd33c3679141bd527909c1faf3 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:09:58 -0700 Subject: [PATCH 17/35] move snapshot to new way --- kernel/src/snapshot.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index c91886d56..84c7ae77d 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -9,10 +9,9 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use url::Url; -use crate::actions::schemas::GetField; -use crate::actions::{Metadata, Protocol}; +use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME}; use crate::path::LogPath; -use crate::schema::{Schema, SchemaRef, StructType}; +use crate::schema::{Schema, SchemaRef}; use crate::{DeltaResult, EngineInterface, Error, FileMeta, FileSystemClient, Version}; use crate::{EngineData, Expression}; @@ -67,11 +66,8 @@ impl LogSegment { 
&self, engine_interface: &dyn EngineInterface, ) -> DeltaResult> { - let schema = StructType::new(vec![ - Option::::get_field("metaData"), - Option::::get_field("protocol"), - ]); - let data_batches = self.replay(engine_interface, Arc::new(schema), None)?; + let schema = get_log_schema().project_as_schema(&[METADATA_NAME, PROTOCOL_NAME])?; + let data_batches = self.replay(engine_interface, schema, None)?; let mut metadata_opt: Option = None; let mut protocol_opt: Option = None; for batch in data_batches { From b273e3b8743d5a745439b250c93468fd85346b23 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:12:45 -0700 Subject: [PATCH 18/35] use static names --- kernel/src/actions/mod.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 3eb325909..a72f811e0 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -13,27 +13,27 @@ use crate::{schema::StructType, DeltaResult, EngineData}; use std::collections::HashMap; +pub(crate) static ADD_NAME: &str = "add"; +pub(crate) static REMOVE_NAME: &str = "remove"; +pub(crate) static METADATA_NAME: &str = "metaData"; +pub(crate) static PROTOCOL_NAME: &str = "protocol"; + lazy_static! 
{ static ref LOG_SCHEMA: StructType = StructType::new( vec![ - Option::::get_field("add"), - Option::::get_field("remove"), - Option::::get_field("metaData"), - Option::::get_field("protocol"), + Option::::get_field(ADD_NAME), + Option::::get_field(REMOVE_NAME), + Option::::get_field(METADATA_NAME), + Option::::get_field(PROTOCOL_NAME), // We don't support the following actions yet - //Option::get_field("cdc"), - //Option::get_field("commitInfo"), - //Option::get_field("domainMetadata"), - //Option::get_field("txn"), + //Option::get_field(CDC_NAME), + //Option::get_field(COMMIT_INFO_NAME), + //Option::get_field(DOMAIN_METADATA_NAME), + //Option::get_field(TRANSACTION_NAME), ] ); } -pub(crate) static ADD_NAME: &str = "add"; -pub(crate) static REMOVE_NAME: &str = "remove"; -pub(crate) static METADATA_NAME: &str = "metaData"; -pub(crate) static PROTOCOL_NAME: &str = "protocol"; - pub(crate) fn get_log_schema() -> &'static StructType { &LOG_SCHEMA } From 505983e2cd726ab0c8c1bdf9180c7c1f3364ae86 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 12 Mar 2024 12:13:47 -0700 Subject: [PATCH 19/35] fix comment --- derive-macros/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index f9665def5..dcb4a6b81 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -18,8 +18,7 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream crate::schema::StructType::new(vec![ #schema_fields ]), - // TODO: Ensure correct. By default not nullable, only can be made nullable by - // being wrapped in an Option + // By default not nullable. 
To make something nullable wrap it in an Option false, ) } From eeee0f3bf3f1808668e67c220d7c8b50f44d9435 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 28 Mar 2024 16:16:52 -0700 Subject: [PATCH 20/35] add docs, remove attribute --- derive-macros/src/lib.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index dcb4a6b81..40e5c739a 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -3,7 +3,13 @@ use quote::{quote, quote_spanned}; use syn::spanned::Spanned; use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, PathArguments, Type}; -#[proc_macro_derive(Schema, attributes(schema))] +/// Derive a `deltakernel::schemas::GetField` implementation for the annotated struct. The actual +/// field names in the schema (and therefore of the struct members) are all mandated by Delta spec, +/// and so the user of this macro is responsible for ensuring that e.g. `Metadata::schema_string is +/// the snake_case-ified version of `schemaString` from Delta's Change Metadata action (this macro +/// allows the use of standard rust snake_case, and will convert to the correct delta schema +/// camelCase version). 
+#[proc_macro_derive(Schema)] pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { let input = parse_macro_input!(input as DeriveInput); let struct_ident = input.ident; From 24674690fbc5d4858ede243322dac42655c90a2b Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 28 Mar 2024 16:43:20 -0700 Subject: [PATCH 21/35] use full type path --- derive-macros/src/lib.rs | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 40e5c739a..42186c08a 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -69,20 +69,15 @@ fn gen_schema_fields(data: &Data) -> TokenStream { let name = get_schema_name(name); match field.ty { Type::Path(ref type_path) => { - if let Some(fin) = type_path.path.segments.iter().last() { - let type_ident = &fin.ident; - if let PathArguments::AngleBracketed(angle_args) = &fin.arguments { - quote_spanned! {field.span()=> - #type_ident::#angle_args::get_field(stringify!(#name)) - } - } else { - quote_spanned! {field.span()=> - #type_ident::get_field(stringify!(#name)) - } + let type_path_quoted: Vec = type_path.path.segments.iter().map(|segment| { + let segment_ident = &segment.ident; + match &segment.arguments { + PathArguments::None => quote! { #segment_ident }, + PathArguments::AngleBracketed(angle_args) => quote! { #segment_ident::#angle_args }, + _ => panic!("Can only handle <> type path args"), } - } else { - panic!("Couldn't get type"); - } + }).collect(); + quote_spanned! 
{ field.span() => #(#type_path_quoted),* ::get_field(stringify!(#name))} } _ => { panic!("Can't handle type: {:?}", field.ty); From 2b9bccb53bfd566fc69df7134d98a7b086189d6f Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 28 Mar 2024 16:52:28 -0700 Subject: [PATCH 22/35] fix quoting joining with :: --- derive-macros/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 42186c08a..82cb556ae 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -72,12 +72,12 @@ fn gen_schema_fields(data: &Data) -> TokenStream { let type_path_quoted: Vec = type_path.path.segments.iter().map(|segment| { let segment_ident = &segment.ident; match &segment.arguments { - PathArguments::None => quote! { #segment_ident }, - PathArguments::AngleBracketed(angle_args) => quote! { #segment_ident::#angle_args }, + PathArguments::None => quote! { #segment_ident :: }, + PathArguments::AngleBracketed(angle_args) => quote! { #segment_ident::#angle_args :: }, _ => panic!("Can only handle <> type path args"), } }).collect(); - quote_spanned! { field.span() => #(#type_path_quoted),* ::get_field(stringify!(#name))} + quote_spanned! 
{ field.span() => #(#type_path_quoted),* get_field(stringify!(#name))} } _ => { panic!("Can't handle type: {:?}", field.ty); From effdd0935a3ec070d1cd67b0f40a80ef0cdc8643 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 28 Mar 2024 16:58:36 -0700 Subject: [PATCH 23/35] switch order to be sure it doesn't matter --- kernel/src/snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 34d9e4b0c..db6cb4337 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -69,7 +69,7 @@ impl LogSegment { &self, engine_interface: &dyn EngineInterface, ) -> DeltaResult> { - let schema = get_log_schema().project_as_schema(&[METADATA_NAME, PROTOCOL_NAME])?; + let schema = get_log_schema().project_as_schema(&[PROTOCOL_NAME, METADATA_NAME])?; // read the same protocol and metadata schema for both commits and checkpoints // TODO add metadata.table_id is not null and protocol.something_required is not null let data_batches = self.replay(engine_interface, schema.clone(), schema, None)?; From 5d87b4e96f8b6b0c9d8d4e41b2d8202b7c7f46c8 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 28 Mar 2024 17:07:33 -0700 Subject: [PATCH 24/35] fix doc comment --- derive-macros/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 82cb556ae..77f9ccbac 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -4,8 +4,8 @@ use syn::spanned::Spanned; use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, PathArguments, Type}; /// Derive a `deltakernel::schemas::GetField` implementation for the annotated struct. The actual -/// field names in the schema (and therefore of the struct members) are all mandated by Delta spec, -/// and so the user of this macro is responsible for ensuring that e.g. 
`Metadata::schema_string is +/// field names in the schema (and therefore of the struct members) are all mandated by the Delta spec, +/// and so the user of this macro is responsible for ensuring that e.g. `Metadata::schema_string` is /// the snake_case-ified version of `schemaString` from Delta's Change Metadata action (this macro /// allows the use of standard rust snake_case, and will convert to the correct delta schema /// camelCase version). From eb9ecce9edd0ded73a07b12023b04bba8892a08d Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 28 Mar 2024 17:13:17 -0700 Subject: [PATCH 25/35] don't need to collect --- derive-macros/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 77f9ccbac..647ce364c 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -69,14 +69,14 @@ fn gen_schema_fields(data: &Data) -> TokenStream { let name = get_schema_name(name); match field.ty { Type::Path(ref type_path) => { - let type_path_quoted: Vec = type_path.path.segments.iter().map(|segment| { + let type_path_quoted = type_path.path.segments.iter().map(|segment| { let segment_ident = &segment.ident; match &segment.arguments { PathArguments::None => quote! { #segment_ident :: }, PathArguments::AngleBracketed(angle_args) => quote! { #segment_ident::#angle_args :: }, _ => panic!("Can only handle <> type path args"), } - }).collect(); + }); quote_spanned! 
{ field.span() => #(#type_path_quoted),* get_field(stringify!(#name))} } _ => { From 78342ad17b7c68a88f0e5725ee059505fc04bf57 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 29 Mar 2024 09:58:22 -0700 Subject: [PATCH 26/35] make all hashmaps , not > --- kernel/src/actions/mod.rs | 16 ++++++++-------- kernel/src/actions/schemas.rs | 1 - kernel/src/actions/visitors.rs | 19 ++++++++----------- kernel/src/client/arrow_data.rs | 6 +++--- kernel/src/engine_data.rs | 10 +++++----- kernel/src/scan/mod.rs | 9 +++------ 6 files changed, 27 insertions(+), 34 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index a72f811e0..2deb354f9 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -72,7 +72,7 @@ pub struct Metadata { /// The time when this metadata action is created, in milliseconds since the Unix epoch pub created_time: Option, /// Configuration options for the metadata action - pub configuration: HashMap>, + pub configuration: HashMap, } impl Metadata { @@ -127,7 +127,7 @@ pub struct Add { pub path: String, /// A map from partition column to value for this logical file. - pub partition_values: HashMap>, + pub partition_values: HashMap, /// The size of this data file in bytes pub size: i64, @@ -145,7 +145,7 @@ pub struct Add { pub stats: Option, /// Map containing metadata about this logical file. - pub tags: Option>>, + pub tags: Option>, /// Information about deletion vector (DV) associated with this add action pub deletion_vector: Option, @@ -198,13 +198,13 @@ pub(crate) struct Remove { pub(crate) extended_file_metadata: Option, /// A map from partition column to value for this logical file. - pub(crate) partition_values: Option>>, + pub(crate) partition_values: Option>, /// The size of this data file in bytes pub(crate) size: Option, /// Map containing metadata about this logical file. 
- pub(crate) tags: Option>>, + pub(crate) tags: Option>, /// Information about deletion vector (DV) associated with this add action pub(crate) deletion_vector: Option, @@ -264,7 +264,7 @@ mod tests { StructField::new("createdTime", DataType::LONG, true), StructField::new( "configuration", - MapType::new(DataType::STRING, DataType::STRING, true), + MapType::new(DataType::STRING, DataType::STRING, false), false, ), ]), @@ -276,7 +276,7 @@ mod tests { fn tags_field() -> StructField { StructField::new( "tags", - MapType::new(DataType::STRING, DataType::STRING, true), + MapType::new(DataType::STRING, DataType::STRING, false), true, ) } @@ -284,7 +284,7 @@ mod tests { fn partition_values_field() -> StructField { StructField::new( "partitionValues", - MapType::new(DataType::STRING, DataType::STRING, true), + MapType::new(DataType::STRING, DataType::STRING, false), true, ) } diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index 894bbf8fe..c62ff8859 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -31,7 +31,6 @@ impl_get_field!( (f64, DataType::DOUBLE), (bool, DataType::BOOLEAN), (HashMap, MapType::new(DataType::STRING, DataType::STRING, false)), - (HashMap>, MapType::new(DataType::STRING, DataType::STRING, true)), (Vec, ArrayType::new(DataType::STRING, false)) ); diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index a75d69c3c..b4f2d2d0f 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -330,12 +330,9 @@ mod tests { let configuration = HashMap::from_iter([ ( "delta.enableDeletionVectors".to_string(), - Some("true".to_string()), - ), - ( - "delta.columnMapping.mode".to_string(), - Some("none".to_string()), + "true".to_string(), ), + ("delta.columnMapping.mode".to_string(), "none".to_string()), ]); let expected = Metadata { id: "testId".into(), @@ -379,8 +376,8 @@ mod tests { let add1 = Add { path: 
"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet".into(), partition_values: HashMap::from([ - ("c1".to_string(), Some("4".to_string())), - ("c2".to_string(), Some("c".to_string())), + ("c1".to_string(), "4".to_string()), + ("c2".to_string(), "c".to_string()), ]), size: 452, modification_time: 1670892998135, @@ -395,8 +392,8 @@ mod tests { let add2 = Add { path: "c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet".into(), partition_values: HashMap::from([ - ("c1".to_string(), Some("5".to_string())), - ("c2".to_string(), Some("b".to_string())), + ("c1".to_string(), "5".to_string()), + ("c2".to_string(), "b".to_string()), ]), modification_time: 1670892998136, stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}".into()), @@ -405,8 +402,8 @@ mod tests { let add3 = Add { path: "c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet".into(), partition_values: HashMap::from([ - ("c1".to_string(), Some("6".to_string())), - ("c2".to_string(), Some("a".to_string())), + ("c1".to_string(), "6".to_string()), + ("c2".to_string(), "a".to_string()), ]), modification_time: 1670892998137, stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}".into()), diff --git a/kernel/src/client/arrow_data.rs b/kernel/src/client/arrow_data.rs index 66870679f..e09ae4521 100644 --- a/kernel/src/client/arrow_data.rs +++ b/kernel/src/client/arrow_data.rs @@ -137,14 +137,14 @@ impl EngineMap for MapArray { None } - fn materialize(&self, row_index: usize) -> HashMap> { + fn materialize(&self, row_index: usize) -> HashMap { let mut ret = HashMap::new(); let map_val = self.value(row_index); let keys = map_val.column(0).as_string::(); let values = map_val.column(1).as_string::(); for (key, value) in keys.iter().zip(values.iter()) { - if let Some(key) = key { - ret.insert(key.into(), value.map(|v| v.into())); + if let 
(Some(key), Some(value)) = (key, value) { + ret.insert(key.into(), value.into()); } } ret diff --git a/kernel/src/engine_data.rs b/kernel/src/engine_data.rs index 8c330c494..9aefa2730 100644 --- a/kernel/src/engine_data.rs +++ b/kernel/src/engine_data.rs @@ -51,7 +51,7 @@ pub trait EngineMap { /// Get the item with the specified key from the map at `row_index` in the raw data, and return it as an `Option<&'a str>` fn get<'a>(&'a self, row_index: usize, key: &str) -> Option<&'a str>; /// Materialize the entire map at `row_index` in the raw data into a `HashMap` - fn materialize(&self, row_index: usize) -> HashMap>; + fn materialize(&self, row_index: usize) -> HashMap; } /// A map item is useful if the Engine needs to know what row of raw data it needs to access to @@ -70,7 +70,7 @@ impl<'a> MapItem<'a> { self.map.get(self.row, key) } - pub fn materialize(&self) -> HashMap> { + pub fn materialize(&self) -> HashMap { self.map.materialize(self.row) } } @@ -149,14 +149,14 @@ impl<'a> TypedGetData<'a, Vec> for dyn GetData<'a> + '_ { } } -/// Provide an impl to get a map field as a `HashMap>`. Note that this will +/// Provide an impl to get a map field as a `HashMap`. 
Note that this will /// allocate the map and allocate for each entry -impl<'a> TypedGetData<'a, HashMap>> for dyn GetData<'a> + '_ { +impl<'a> TypedGetData<'a, HashMap> for dyn GetData<'a> + '_ { fn get_opt( &'a self, row_index: usize, field_name: &str, - ) -> DeltaResult>>> { + ) -> DeltaResult>> { let map_opt: Option> = self.get_opt(row_index, field_name)?; Ok(map_opt.map(|map| map.materialize())) } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 25a6225d3..828a830df 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -280,12 +280,9 @@ impl Scan { } } -fn parse_partition_value( - raw: Option<&Option>, - data_type: &DataType, -) -> DeltaResult { +fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaResult { match raw { - Some(Some(v)) => match data_type { + Some(v) => match data_type { DataType::Primitive(primitive) => primitive.parse_scalar(v), _ => Err(Error::generic(format!( "Unexpected partition column type: {data_type:?}" @@ -381,7 +378,7 @@ mod tests { for (raw, data_type, expected) in &cases { let value = parse_partition_value( - Some(&Some(raw.to_string())), + Some(&raw.to_string()), &DataType::Primitive(data_type.clone()), ) .unwrap(); From ffa7d636aef599a8a787034f35d3c75f6f7bb367 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 29 Mar 2024 10:00:43 -0700 Subject: [PATCH 27/35] fix in acceptance --- acceptance/src/meta.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/acceptance/src/meta.rs b/acceptance/src/meta.rs index 43fec7fa1..b8d2404e9 100644 --- a/acceptance/src/meta.rs +++ b/acceptance/src/meta.rs @@ -87,7 +87,7 @@ impl TestCaseInfo { properties: metadata .configuration .iter() - .map(|(k, v)| (k.clone(), v.clone().unwrap())) + .map(|(k, v)| (k.clone(), v.clone())) .collect(), min_reader_version: protocol.min_reader_version as u32, min_writer_version: protocol.min_writer_version as u32, From 1152b8c9c808f85d6f2601f1c05b030009816fec Mon Sep 17 00:00:00 2001 From: 
Nick Lanham Date: Fri, 29 Mar 2024 13:59:31 -0700 Subject: [PATCH 28/35] switch to ToDataType --- derive-macros/src/lib.rs | 32 +++++++++------------ kernel/src/actions/mod.rs | 10 +++---- kernel/src/actions/schemas.rs | 53 ++++++++++++++++++++++++----------- 3 files changed, 56 insertions(+), 39 deletions(-) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 647ce364c..462e30840 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -3,12 +3,13 @@ use quote::{quote, quote_spanned}; use syn::spanned::Spanned; use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, PathArguments, Type}; -/// Derive a `deltakernel::schemas::GetField` implementation for the annotated struct. The actual -/// field names in the schema (and therefore of the struct members) are all mandated by the Delta spec, -/// and so the user of this macro is responsible for ensuring that e.g. `Metadata::schema_string` is -/// the snake_case-ified version of `schemaString` from Delta's Change Metadata action (this macro -/// allows the use of standard rust snake_case, and will convert to the correct delta schema -/// camelCase version). +/// Derive a `deltakernel::schemas::ToDataType` implementation for the annotated struct. The actual +/// field names in the schema (and therefore of the struct members) are all mandated by the Delta +/// spec, and so the user of this macro is responsible for ensuring that +/// e.g. `Metadata::schema_string` is the snake_case-ified version of `schemaString` from [Delta's +/// Change Metadata](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata) +/// action (this macro allows the use of standard rust snake_case, and will convert to the correct +/// delta schema camelCase version). 
#[proc_macro_derive(Schema)] pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { let input = parse_macro_input!(input as DeriveInput); @@ -16,17 +17,12 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream let schema_fields = gen_schema_fields(&input.data); let output = quote! { - impl crate::actions::schemas::GetField for #struct_ident { - fn get_field(name: impl Into) -> crate::schema::StructField { - use crate::actions::schemas::GetField; - crate::schema::StructField::new( - name, - crate::schema::StructType::new(vec![ - #schema_fields - ]), - // By default not nullable. To make something nullable wrap it in an Option - false, - ) + impl crate::actions::schemas::ToDataType for #struct_ident { + fn to_data_type() -> crate::schema::DataType { + use crate::actions::schemas::{ToDataType, GetStructField}; + crate::schema::StructType::new(vec![ + #schema_fields + ]).into() } } }; @@ -77,7 +73,7 @@ fn gen_schema_fields(data: &Data) -> TokenStream { _ => panic!("Can only handle <> type path args"), } }); - quote_spanned! { field.span() => #(#type_path_quoted),* get_field(stringify!(#name))} + quote_spanned! { field.span() => #(#type_path_quoted),* get_struct_field(stringify!(#name))} } _ => { panic!("Can't handle type: {:?}", field.ty); diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 2deb354f9..0aaad4f59 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -8,7 +8,7 @@ use lazy_static::lazy_static; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; use self::deletion_vector::DeletionVectorDescriptor; -use crate::actions::schemas::GetField; +use crate::actions::schemas::GetStructField; use crate::{schema::StructType, DeltaResult, EngineData}; use std::collections::HashMap; @@ -21,10 +21,10 @@ pub(crate) static PROTOCOL_NAME: &str = "protocol"; lazy_static! 
{ static ref LOG_SCHEMA: StructType = StructType::new( vec![ - Option::::get_field(ADD_NAME), - Option::::get_field(REMOVE_NAME), - Option::::get_field(METADATA_NAME), - Option::::get_field(PROTOCOL_NAME), + Option::::get_struct_field(ADD_NAME), + Option::::get_struct_field(REMOVE_NAME), + Option::::get_struct_field(METADATA_NAME), + Option::::get_struct_field(PROTOCOL_NAME), // We don't support the following actions yet //Option::get_field(CDC_NAME), //Option::get_field(COMMIT_INFO_NAME), diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index c62ff8859..18dd45caa 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -4,24 +4,23 @@ use std::collections::HashMap; use crate::schema::{ArrayType, DataType, MapType, StructField}; -/// A trait that allows getting a `StructField` based on the provided name and nullability -pub(crate) trait GetField { - fn get_field(name: impl Into) -> StructField; +pub(crate) trait ToDataType { + fn to_data_type() -> DataType; } -macro_rules! impl_get_field { +macro_rules! 
impl_to_data_type { ( $(($rust_type: ty, $kernel_type: expr)), * ) => { $( - impl GetField for $rust_type { - fn get_field(name: impl Into) -> StructField { - StructField::new(name, $kernel_type, false) + impl ToDataType for $rust_type { + fn to_data_type() -> DataType { + $kernel_type } } )* }; } -impl_get_field!( +impl_to_data_type!( (String, DataType::STRING), (i64, DataType::LONG), (i32, DataType::INTEGER), @@ -29,15 +28,37 @@ impl_get_field!( (char, DataType::BYTE), (f32, DataType::FLOAT), (f64, DataType::DOUBLE), - (bool, DataType::BOOLEAN), - (HashMap, MapType::new(DataType::STRING, DataType::STRING, false)), - (Vec, ArrayType::new(DataType::STRING, false)) + (bool, DataType::BOOLEAN) ); -impl GetField for Option { - fn get_field(name: impl Into) -> StructField { - let mut inner = T::get_field(name); - inner.nullable = true; - inner +// ToDataType impl for non-nullable array types +impl ToDataType for Vec { + fn to_data_type() -> DataType { + ArrayType::new(T::to_data_type(), false).into() + } +} + +// ToDataType impl for non-nullable map types +impl ToDataType for HashMap { + fn to_data_type() -> DataType { + MapType::new(K::to_data_type(), V::to_data_type(), false).into() + } +} + +pub(crate) trait GetStructField { + fn get_struct_field(name: impl Into) -> StructField; +} + +// Normal types produce non-nullable fields +impl GetStructField for T { + fn get_struct_field(name: impl Into) -> StructField { + StructField::new(name, T::to_data_type(), false) + } +} + +// Option types produce nullable fields +impl GetStructField for Option { + fn get_struct_field(name: impl Into) -> StructField { + StructField::new(name, T::to_data_type(), true) } } From 1e624b4998a3b1b96fcedeef4d2e6a758bb1dd41 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 29 Mar 2024 17:36:06 -0700 Subject: [PATCH 29/35] use passed order to `project` and fix column reordering bug --- kernel/src/actions/mod.rs | 19 +++--------- kernel/src/actions/visitors.rs | 2 +- 
kernel/src/client/arrow_data.rs | 2 +- kernel/src/client/arrow_utils.rs | 46 ++++++++++++++++++++-------- kernel/src/client/default/parquet.rs | 5 +-- kernel/src/client/sync/parquet.rs | 4 +-- kernel/src/scan/file_stream.rs | 4 +-- kernel/src/scan/mod.rs | 4 +-- kernel/src/schema.rs | 32 ++++++++----------- kernel/src/snapshot.rs | 2 +- 10 files changed, 63 insertions(+), 57 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 0aaad4f59..9493ff299 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -78,10 +78,7 @@ pub struct Metadata { impl Metadata { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = MetadataVisitor::default(); - data.extract( - get_log_schema().project_as_schema(&[METADATA_NAME])?, - &mut visitor, - )?; + data.extract(get_log_schema().project(&[METADATA_NAME])?, &mut visitor)?; Ok(visitor.metadata) } @@ -109,10 +106,7 @@ pub struct Protocol { impl Protocol { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = ProtocolVisitor::default(); - data.extract( - get_log_schema().project_as_schema(&[PROTOCOL_NAME])?, - &mut visitor, - )?; + data.extract(get_log_schema().project(&[PROTOCOL_NAME])?, &mut visitor)?; Ok(visitor.protocol) } } @@ -166,10 +160,7 @@ impl Add { /// Since we always want to parse multiple adds from data, we return a `Vec` pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = AddVisitor::default(); - data.extract( - get_log_schema().project_as_schema(&[ADD_NAME])?, - &mut visitor, - )?; + data.extract(get_log_schema().project(&[ADD_NAME])?, &mut visitor)?; Ok(visitor.adds) } @@ -234,7 +225,7 @@ mod tests { #[test] fn test_metadata_schema() { let schema = get_log_schema() - .project_as_schema(&["metaData"]) + .project(&["metaData"]) .expect("Couldn't get metaData field"); let expected = Arc::new(StructType::new(vec![StructField::new( @@ -306,7 +297,7 @@ mod tests { #[test] fn 
test_remove_schema() { let schema = get_log_schema() - .project_as_schema(&["remove"]) + .project(&["remove"]) .expect("Couldn't get remove field"); let expected = Arc::new(StructType::new(vec![StructField::new( "remove", diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index b4f2d2d0f..d845646eb 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -369,7 +369,7 @@ mod tests { .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); let add_schema = get_log_schema() - .project_as_schema(&[ADD_NAME]) + .project(&[ADD_NAME]) .expect("Can't get add schema"); let mut add_visitor = AddVisitor::default(); batch.extract(add_schema, &mut add_visitor).unwrap(); diff --git a/kernel/src/client/arrow_data.rs b/kernel/src/client/arrow_data.rs index e09ae4521..1dd5c5f27 100644 --- a/kernel/src/client/arrow_data.rs +++ b/kernel/src/client/arrow_data.rs @@ -380,7 +380,7 @@ mod tests { r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, ] .into(); - let output_schema = get_log_schema().project_as_schema(&["metaData"])?; + let output_schema = get_log_schema().project(&["metaData"])?; let parsed = handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); diff --git a/kernel/src/client/arrow_utils.rs b/kernel/src/client/arrow_utils.rs index 88ea3311f..dd719d409 100644 --- a/kernel/src/client/arrow_utils.rs +++ b/kernel/src/client/arrow_utils.rs @@ -9,20 +9,31 @@ use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use itertools::Itertools; use 
parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor}; -/// Get the indicies in `parquet_schema` of the specified columns in `requested_schema` +/// Get the indicies in `parquet_schema` of the specified columns in `requested_schema`. This +/// returns a tuples of (Vec, Vec). The +/// `parquet_schema_index` is used for generating the mask for reading from the parquet file, while +/// the requested_index is used for re-ordering. The requested_index vec will be -1 for any columns +/// that are not selected, and will contain at selected indexes the position that the column should +/// appear in the output pub(crate) fn get_requested_indices( requested_schema: &ArrowSchema, parquet_schema: &ArrowSchemaRef, -) -> DeltaResult> { +) -> DeltaResult<(Vec, Vec)> { + let mut reorder_indicies = vec![-1; parquet_schema.fields().len()]; let indicies = requested_schema .fields .iter() - .map(|field| { + .enumerate() + .map(|(position, field)| { // todo: handle nested (then use `leaves` not `roots` below in generate_mask) - parquet_schema.index_of(field.name()) + let index = parquet_schema.index_of(field.name()); + if let Ok(index) = index { + reorder_indicies[index] = position as i32; + } + index }) .try_collect()?; - Ok(indicies) + Ok((indicies, reorder_indicies)) } /// Create a mask that will only select the specified indicies from the parquet. Currently we only @@ -47,13 +58,14 @@ pub(crate) fn generate_mask( } } -/// Reorder a RecordBatch to match `requested_schema`. This method takes `indicies` as computed by +/// Reorder a RecordBatch to match `requested_ordering`. This method takes `indicies` as computed by /// [`get_requested_indicies`] as an optimization. If the indicies are in order, then we don't need -/// to do any re-ordering. +/// to do any re-ordering. 
Otherwise, for each non-zero value in `requested_ordering`, the column at +/// that index will be added in order to returned batch pub(crate) fn reorder_record_batch( - requested_schema: Arc, input_data: RecordBatch, indicies: &[usize], + requested_ordering: &[i32], ) -> DeltaResult { if indicies.windows(2).all(|is| is[0] <= is[1]) { // indicies is already sorted, meaning we requested in the order that the columns were @@ -61,12 +73,22 @@ pub(crate) fn reorder_record_batch( Ok(input_data) } else { // requested an order different from the parquet, reorder - let reordered_columns = indicies + let input_schema = input_data.schema(); + let mut fields = Vec::with_capacity(indicies.len()); + let reordered_columns = requested_ordering .iter() - .map(|index| { - input_data.column(*index).clone() // cheap clones of `Arc`s + .filter_map(|index| { + if *index >= 0 { + let idx = *index as usize; + // cheap clones of `Arc`s + fields.push(input_schema.field(idx).clone()); + Some(input_data.column(idx).clone()) + } else { + None + } }) .collect(); - Ok(RecordBatch::try_new(requested_schema, reordered_columns)?) + let schema = Arc::new(ArrowSchema::new(fields)); + Ok(RecordBatch::try_new(schema, reordered_columns)?) 
} } diff --git a/kernel/src/client/default/parquet.rs b/kernel/src/client/default/parquet.rs index 671967f91..ecb8ceb34 100644 --- a/kernel/src/client/default/parquet.rs +++ b/kernel/src/client/default/parquet.rs @@ -130,7 +130,8 @@ impl FileOpener for ParquetOpener { let mut reader = ParquetObjectReader::new(store, meta); let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?; let parquet_schema = metadata.schema(); - let indicies = get_requested_indices(&table_schema, parquet_schema)?; + let (indicies, requested_ordering) = + get_requested_indices(&table_schema, parquet_schema)?; let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index); let mut builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?; @@ -152,7 +153,7 @@ impl FileOpener for ParquetOpener { let stream = stream.map(move |rbr| { // re-order each batch if needed rbr.map_err(Error::Parquet) - .and_then(|rb| reorder_record_batch(table_schema.clone(), rb, &indicies)) + .and_then(|rb| reorder_record_batch(rb, &indicies, &requested_ordering)) }); Ok(stream.boxed()) })) diff --git a/kernel/src/client/sync/parquet.rs b/kernel/src/client/sync/parquet.rs index 6d8b09aba..2192d781e 100644 --- a/kernel/src/client/sync/parquet.rs +++ b/kernel/src/client/sync/parquet.rs @@ -22,7 +22,7 @@ fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult DeltaResult DeltaResult>> { - let commit_read_schema = get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])?; - let checkpoint_read_schema = get_log_schema().project_as_schema(&[ADD_NAME])?; + let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?; + let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?; let log_iter = self.snapshot.log_segment.replay( engine_interface, diff --git a/kernel/src/schema.rs b/kernel/src/schema.rs index 94b29834b..4407d03c0 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema.rs @@ -143,35 +143,27 
@@ impl StructType { } } - /// Get a [`StructType`] containing [`StructField`]s of the given names, preserving the original - /// order of fields. Returns an Err if a specified field doesn't exist - pub fn project(&self, names: &[impl AsRef]) -> DeltaResult { - let mut indexes: Vec = names + /// Get a [`StructType`] containing [`StructField`]s of the given names. The order of fields in + /// the returned schema will match the order passed to this function, which can be different + /// from this order in this schema. Returns an Err if a specified field doesn't exist. + pub fn project_as_struct(&self, names: &[impl AsRef]) -> DeltaResult { + let fields = names .iter() .map(|name| { self.fields - .get_index_of(name.as_ref()) + .get(name.as_ref()) + .cloned() .ok_or_else(|| Error::missing_column(name.as_ref())) }) .try_collect()?; - indexes.sort(); // keep schema order - let fields: Vec = indexes - .iter() - .map(|index| { - self.fields - .get_index(*index) - .expect("get_index_of returned non-existant index") - .1 - .clone() - }) - .collect(); Ok(Self::new(fields)) } - /// Get a [`SchemaRef`] containing [`StructField`]s of the given names, preserving the original - /// order of fields. Returns an Err if a specified field doesn't exist - pub fn project_as_schema(&self, names: &[impl AsRef]) -> DeltaResult { - let struct_type = self.project(names)?; + /// Get a [`SchemaRef`] containing [`StructField`]s of the given names. The order of fields in + /// the returned schema will match the order passed to this function, which can be different + /// from this order in this schema. Returns an Err if a specified field doesn't exist. 
+ pub fn project(&self, names: &[impl AsRef]) -> DeltaResult { + let struct_type = self.project_as_struct(names)?; Ok(Arc::new(struct_type)) } diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index db6cb4337..74bf61174 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -69,7 +69,7 @@ impl LogSegment { &self, engine_interface: &dyn EngineInterface, ) -> DeltaResult> { - let schema = get_log_schema().project_as_schema(&[PROTOCOL_NAME, METADATA_NAME])?; + let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?; // read the same protocol and metadata schema for both commits and checkpoints // TODO add metadata.table_id is not null and protocol.something_required is not null let data_batches = self.replay(engine_interface, schema.clone(), schema, None)?; From 2f419f1b788847805c5d35abe475c77fbaa5ab12 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Sat, 30 Mar 2024 13:03:02 -0700 Subject: [PATCH 30/35] remove expensive parquet schema search --- kernel/src/client/arrow_utils.rs | 42 +++++++++++++++++----------- kernel/src/client/default/parquet.rs | 12 ++++---- kernel/src/client/sync/parquet.rs | 12 ++------ kernel/src/schema.rs | 4 +++ 4 files changed, 38 insertions(+), 32 deletions(-) diff --git a/kernel/src/client/arrow_utils.rs b/kernel/src/client/arrow_utils.rs index dd719d409..567abd29b 100644 --- a/kernel/src/client/arrow_utils.rs +++ b/kernel/src/client/arrow_utils.rs @@ -2,11 +2,10 @@ use std::sync::Arc; -use crate::DeltaResult; +use crate::{schema::SchemaRef, DeltaResult, Error}; use arrow_array::RecordBatch; use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; -use itertools::Itertools; use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor}; /// Get the indicies in `parquet_schema` of the specified columns in `requested_schema`. 
This @@ -16,23 +15,32 @@ use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor}; /// that are not selected, and will contain at selected indexes the position that the column should /// appear in the output pub(crate) fn get_requested_indices( - requested_schema: &ArrowSchema, + requested_schema: &SchemaRef, parquet_schema: &ArrowSchemaRef, ) -> DeltaResult<(Vec, Vec)> { - let mut reorder_indicies = vec![-1; parquet_schema.fields().len()]; - let indicies = requested_schema - .fields + let requested_len = requested_schema.fields.len(); + let mut indicies = vec![0; requested_len]; + let mut found_count = 0; // verify that we found all requested fields + let reorder_indicies = parquet_schema + .fields() .iter() .enumerate() - .map(|(position, field)| { - // todo: handle nested (then use `leaves` not `roots` below in generate_mask) - let index = parquet_schema.index_of(field.name()); - if let Ok(index) = index { - reorder_indicies[index] = position as i32; - } - index - }) - .try_collect()?; + .map( + |(parquet_position, field)| match requested_schema.index_of(field.name()) { + Some(index) => { + found_count += 1; + indicies[index] = parquet_position; + index as i32 + } + None => -1, + }, + ) + .collect(); + if found_count != requested_len { + return Err(Error::generic( + "Didn't find all requested columns in parquet schema", + )); + } Ok((indicies, reorder_indicies)) } @@ -40,12 +48,12 @@ pub(crate) fn get_requested_indices( /// handle "root" level columns, and hence use `ProjectionMask::roots`, but will support leaf /// selection in the future. See issues #86 and #96 as well. 
pub(crate) fn generate_mask( - requested_schema: &ArrowSchema, + requested_schema: &SchemaRef, parquet_schema: &ArrowSchemaRef, parquet_physical_schema: &SchemaDescriptor, indicies: &[usize], ) -> Option { - if parquet_schema.fields.size() == requested_schema.fields.size() { + if parquet_schema.fields.size() == requested_schema.fields.len() { // we assume that in get_requested_indicies we will have caught any column name mismatches, // so here we can just say that if we request the same # of columns as the parquet file // actually has, we don't need to mask anything out diff --git a/kernel/src/client/default/parquet.rs b/kernel/src/client/default/parquet.rs index ecb8ceb34..51d25bed0 100644 --- a/kernel/src/client/default/parquet.rs +++ b/kernel/src/client/default/parquet.rs @@ -55,9 +55,9 @@ impl ParquetHandler for DefaultParquetHandler { return Ok(Box::new(std::iter::empty())); } - let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?); - let file_reader = ParquetOpener::new(1024, schema.clone(), self.store.clone()); - let mut stream = FileStream::new(files.to_vec(), schema, file_reader)?; + let arrow_schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?); + let file_reader = ParquetOpener::new(1024, physical_schema.clone(), self.store.clone()); + let mut stream = FileStream::new(files.to_vec(), arrow_schema, file_reader)?; // This channel will become the output iterator. // The stream will execute in the background and send results to this channel. 
@@ -95,19 +95,19 @@ struct ParquetOpener { // projection: Arc<[usize]>, batch_size: usize, limit: Option, - table_schema: ArrowSchemaRef, + table_schema: SchemaRef, store: Arc, } impl ParquetOpener { pub(crate) fn new( batch_size: usize, - schema: ArrowSchemaRef, + table_schema: SchemaRef, store: Arc, ) -> Self { Self { batch_size, - table_schema: schema, + table_schema, limit: None, store, } diff --git a/kernel/src/client/sync/parquet.rs b/kernel/src/client/sync/parquet.rs index 2192d781e..c279883a2 100644 --- a/kernel/src/client/sync/parquet.rs +++ b/kernel/src/client/sync/parquet.rs @@ -1,6 +1,5 @@ use std::fs::File; -use arrow_schema::Schema as ArrowSchema; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder}; use tracing::debug; use url::Url; @@ -20,15 +19,10 @@ fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult) -> Option { + self.fields.get_index_of(name.as_ref()) + } + pub fn fields(&self) -> impl Iterator { self.fields.values() } From c69f7ce1c06485069befcacf554878961fc2c8fe Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 2 Apr 2024 11:41:02 -0700 Subject: [PATCH 31/35] no need for -1s --- kernel/src/client/arrow_utils.rs | 41 +++++++++++++++----------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/kernel/src/client/arrow_utils.rs b/kernel/src/client/arrow_utils.rs index 567abd29b..bc4147c8b 100644 --- a/kernel/src/client/arrow_utils.rs +++ b/kernel/src/client/arrow_utils.rs @@ -11,9 +11,13 @@ use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor}; /// Get the indicies in `parquet_schema` of the specified columns in `requested_schema`. This /// returns a tuples of (Vec, Vec). The /// `parquet_schema_index` is used for generating the mask for reading from the parquet file, while -/// the requested_index is used for re-ordering. 
The requested_index vec will be -1 for any columns -/// that are not selected, and will contain at selected indexes the position that the column should -/// appear in the output +/// the requested_index is used for re-ordering. `requested_index` will be the same size as +/// requested_schema. Each index in `requested_index` represents a column that will be in the read +/// parquet data at that index. The value stored in `requested_index` is the position that the +/// column should appear in the final output. For example, if `requested_index` is `[2,0,1]`, then +/// the re-ordering code should take the third column in the raw-read parquet data, and move it to +/// the first column in the final output, the first column to the second, and the second to the +/// third. pub(crate) fn get_requested_indices( requested_schema: &SchemaRef, parquet_schema: &ArrowSchemaRef, @@ -25,16 +29,13 @@ pub(crate) fn get_requested_indices( .fields() .iter() .enumerate() - .map( - |(parquet_position, field)| match requested_schema.index_of(field.name()) { - Some(index) => { - found_count += 1; - indicies[index] = parquet_position; - index as i32 - } - None => -1, - }, - ) + .filter_map(|(parquet_position, field)| { + requested_schema.index_of(field.name()).map(|index| { + found_count += 1; + indicies[index] = parquet_position; + index as i32 + }) + }) .collect(); if found_count != requested_len { return Err(Error::generic( @@ -85,15 +86,11 @@ pub(crate) fn reorder_record_batch( let mut fields = Vec::with_capacity(indicies.len()); let reordered_columns = requested_ordering .iter() - .filter_map(|index| { - if *index >= 0 { - let idx = *index as usize; - // cheap clones of `Arc`s - fields.push(input_schema.field(idx).clone()); - Some(input_data.column(idx).clone()) - } else { - None - } + .map(|index| { + let idx = *index as usize; + // cheap clones of `Arc`s + fields.push(input_schema.field(idx).clone()); + input_data.column(idx).clone() }) .collect(); let schema = 
Arc::new(ArrowSchema::new(fields)); From 84f3d336933d61aebe642a19a8151bb78f55978a Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 2 Apr 2024 12:04:24 -0700 Subject: [PATCH 32/35] const over static --- kernel/src/actions/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 9493ff299..6d9a8349c 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -13,10 +13,10 @@ use crate::{schema::StructType, DeltaResult, EngineData}; use std::collections::HashMap; -pub(crate) static ADD_NAME: &str = "add"; -pub(crate) static REMOVE_NAME: &str = "remove"; -pub(crate) static METADATA_NAME: &str = "metaData"; -pub(crate) static PROTOCOL_NAME: &str = "protocol"; +pub(crate) const ADD_NAME: &str = "add"; +pub(crate) const REMOVE_NAME: &str = "remove"; +pub(crate) const METADATA_NAME: &str = "metaData"; +pub(crate) const PROTOCOL_NAME: &str = "protocol"; lazy_static! { static ref LOG_SCHEMA: StructType = StructType::new( From 376439dd0c52c729d11854973aa5e5e42af73111 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 2 Apr 2024 13:30:33 -0700 Subject: [PATCH 33/35] better comment, and don't need i32s anymore --- kernel/src/client/arrow_utils.rs | 50 ++++++++++++++++---------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/kernel/src/client/arrow_utils.rs b/kernel/src/client/arrow_utils.rs index bc4147c8b..6e7518364 100644 --- a/kernel/src/client/arrow_utils.rs +++ b/kernel/src/client/arrow_utils.rs @@ -9,21 +9,22 @@ use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor}; /// Get the indicies in `parquet_schema` of the specified columns in `requested_schema`. This -/// returns a tuples of (Vec, Vec). The -/// `parquet_schema_index` is used for generating the mask for reading from the parquet file, while -/// the requested_index is used for re-ordering. 
`requested_index` will be the same size as -/// requested_schema. Each index in `requested_index` represents a column that will be in the read -/// parquet data at that index. The value stored in `requested_index` is the position that the -/// column should appear in the final output. For example, if `requested_index` is `[2,0,1]`, then -/// the re-ordering code should take the third column in the raw-read parquet data, and move it to -/// the first column in the final output, the first column to the second, and the second to the -/// third. +/// returns a tuples of (mask_indicies: Vec, reorder_indicies: +/// Vec). `mask_indicies` is used for generating the mask for reading from the +/// parquet file, and simply contains an entry for each index we wish to select from the parquet +/// file set to the index of the requested column in the parquet. `reorder_indicies` is used for +/// re-ordering and will be the same size as `requested_schema`. Each index in `reorder_indicies` +/// represents a column that will be in the read parquet data at that index. The value stored in +/// `reorder_indicies` is the position that the column should appear in the final output. For +/// example, if `reorder_indicies` is `[2,0,1]`, then the re-ordering code should take the third +/// column in the raw-read parquet data, and move it to the first column in the final output, the +/// first column to the second, and the second to the third. 
pub(crate) fn get_requested_indices( requested_schema: &SchemaRef, parquet_schema: &ArrowSchemaRef, -) -> DeltaResult<(Vec, Vec)> { +) -> DeltaResult<(Vec, Vec)> { let requested_len = requested_schema.fields.len(); - let mut indicies = vec![0; requested_len]; + let mut mask_indicies = vec![0; requested_len]; let mut found_count = 0; // verify that we found all requested fields let reorder_indicies = parquet_schema .fields() @@ -32,8 +33,8 @@ pub(crate) fn get_requested_indices( .filter_map(|(parquet_position, field)| { requested_schema.index_of(field.name()).map(|index| { found_count += 1; - indicies[index] = parquet_position; - index as i32 + mask_indicies[index] = parquet_position; + index }) }) .collect(); @@ -42,7 +43,7 @@ pub(crate) fn get_requested_indices( "Didn't find all requested columns in parquet schema", )); } - Ok((indicies, reorder_indicies)) + Ok((mask_indicies, reorder_indicies)) } /// Create a mask that will only select the specified indicies from the parquet. Currently we only @@ -67,30 +68,29 @@ pub(crate) fn generate_mask( } } -/// Reorder a RecordBatch to match `requested_ordering`. This method takes `indicies` as computed by -/// [`get_requested_indicies`] as an optimization. If the indicies are in order, then we don't need -/// to do any re-ordering. Otherwise, for each non-zero value in `requested_ordering`, the column at -/// that index will be added in order to returned batch +/// Reorder a RecordBatch to match `requested_ordering`. This method takes `mask_indicies` as +/// computed by [`get_requested_indicies`] as an optimization. If the indicies are in order, then we +/// don't need to do any re-ordering. 
Otherwise, for each non-zero value in `requested_ordering`, +/// the column at that index will be added in order to returned batch pub(crate) fn reorder_record_batch( input_data: RecordBatch, - indicies: &[usize], - requested_ordering: &[i32], + mask_indicies: &[usize], + requested_ordering: &[usize], ) -> DeltaResult { - if indicies.windows(2).all(|is| is[0] <= is[1]) { + if mask_indicies.windows(2).all(|is| is[0] <= is[1]) { // indicies is already sorted, meaning we requested in the order that the columns were // stored in the parquet Ok(input_data) } else { // requested an order different from the parquet, reorder let input_schema = input_data.schema(); - let mut fields = Vec::with_capacity(indicies.len()); + let mut fields = Vec::with_capacity(requested_ordering.len()); let reordered_columns = requested_ordering .iter() .map(|index| { - let idx = *index as usize; // cheap clones of `Arc`s - fields.push(input_schema.field(idx).clone()); - input_data.column(idx).clone() + fields.push(input_schema.field(*index).clone()); + input_data.column(*index).clone() }) .collect(); let schema = Arc::new(ArrowSchema::new(fields)); From 58ef6e02e9fc3875b683b9f305561e76ae487416 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 2 Apr 2024 13:38:50 -0700 Subject: [PATCH 34/35] add automatically_derived attribute to derived impl --- derive-macros/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs index 462e30840..7c0fdebe7 100644 --- a/derive-macros/src/lib.rs +++ b/derive-macros/src/lib.rs @@ -17,6 +17,7 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream let schema_fields = gen_schema_fields(&input.data); let output = quote! 
{ + #[automatically_derived] impl crate::actions::schemas::ToDataType for #struct_ident { fn to_data_type() -> crate::schema::DataType { use crate::actions::schemas::{ToDataType, GetStructField}; From 12a52411116c4979260722053155940dabaa7011 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Wed, 3 Apr 2024 10:04:54 -0700 Subject: [PATCH 35/35] doc fixup and comment about order --- kernel/src/scan/file_stream.rs | 3 +++ kernel/src/scan/mod.rs | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index f24230315..3bdbef2a2 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -102,6 +102,9 @@ impl LogReplayScanner { .transpose()?; let schema_to_use = if is_log_batch { + // NB: We _must_ pass these in the order `ADD_NAME, REMOVE_NAME` as the visitor assumes + // the Add action comes first. The [`project`] method honors this order, so this works + // as long as we keep this order here. get_log_schema().project(&[ADD_NAME, REMOVE_NAME])? } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 9bb3dadff..f69aa3dba 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -51,8 +51,8 @@ impl ScanBuilder { self } - /// Optionally provide a [`Schema`] for columns to select from the [`Snapshot`]. See - /// [`with_schema`] for details. If schema_opt is `None` this is a no-op. + /// Optionally provide a [`SchemaRef`] for columns to select from the [`Snapshot`]. See + /// [`ScanBuilder::with_schema`] for details. If schema_opt is `None` this is a no-op. pub fn with_schema_opt(self, schema_opt: Option) -> Self { match schema_opt { Some(schema) => self.with_schema(schema),