Use a proc macro for being able to derive schemas for our action structs #129

Closed
nicklan wants to merge 19 commits from the schema_derive_macro branch

Conversation

@nicklan nicklan commented Feb 28, 2024

This adds the ability to write:

#[derive(Schema)]
pub struct Protocol {
    pub min_reader_version: i32,
    pub min_writer_version: i32,
    pub reader_features: Option<Vec<String>>,
    pub writer_features: Option<Vec<String>>,
}

This PR then uses the generated schemas for our actions everywhere, and removes the old hand-written schema definitions. To use these, we define a static LOG_SCHEMA which calls all the generated get_field methods to build the total log schema. We also add a project method to StructType so we can pick out just the columns we want when doing something like parsing only metadata.
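Roughly, that looks like the following (a sketch, assuming lazy_static; names and signatures may differ from the actual code):

lazy_static! {
    // Build the total log schema from the derived get_field impls. Each
    // top-level column is nullable, since any given log row contains only
    // one kind of action.
    pub static ref LOG_SCHEMA: StructType = StructType::new(vec![
        Option::<Add>::get_field("add"),
        Option::<Remove>::get_field("remove"),
        Option::<Metadata>::get_field("metaData"),
        Option::<Protocol>::get_field("protocol"),
    ]);
}

// Pick out just the columns we need, e.g. when parsing only metadata:
let metadata_schema = LOG_SCHEMA.project(&["metaData"]);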

How it works

We introduce a trait, GetField which is:

trait GetField {
    fn get_field(name: impl Into<String>) -> StructField;
}

And then adding the derive will generate an impl that looks like:

    impl crate::actions::schemas::GetField for Protocol {
        fn get_field(name: impl Into<String>) -> crate::schema::StructField {
            use crate::actions::schemas::GetField;
            crate::schema::StructField::new(
                name,
                crate::schema::StructType::new(
                    <[_]>::into_vec(
                        #[rustc_box]
                        ::alloc::boxed::Box::new([
                            i32::get_field("minReaderVersion"),
                            i32::get_field("minWriterVersion"),
                            Option::<Vec<String>>::get_field("readerFeatures"),
                            Option::<Vec<String>>::get_field("writerFeatures"),
                        ]),
                    ),
                ),
                false,
            )
        }
    }
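The recursion bottoms out at leaf types, whose GetField impls are written by hand. A sketch of one such impl (assuming a DataType::INTEGER constant alongside the crate's other primitive types):

impl GetField for i32 {
    fn get_field(name: impl Into<String>) -> StructField {
        // Leaf fields are non-nullable by default; wrapping a field's type
        // in Option is what marks the generated field nullable.
        StructField::new(name, DataType::INTEGER, false)
    }
}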

Right now all error handling is via panic, but that's generally okay as the compiler just fails with the panic message.

If you want to inspect the generated code, you can use cargo expand. Install it, and then run something like:
$ cargo expand | grep -A50 "impl crate::actions::GetField for Protocol"
and replace Protocol with whatever struct you want to see the impl for.

@nicklan nicklan force-pushed the schema_derive_macro branch from 1fd9ff9 to a52592f on February 28, 2024 18:55
Contributor

@ryan-johnson-databricks ryan-johnson-databricks left a comment

Thanks for taking a stab at this! Having something to iterate on is so much better than guessing what it might be like to implement it.

Once we get this sorted out, we should also be able to derive the various visitors such as MetadataVisitor?

}
}

fn get_data_type(path_segment: &PathSegment) -> Option<TokenStream> {
Contributor

Some related issues here...

The name of the field is wrong. Unfortunately, the struct name isn't exactly what we want (e.g. the struct is named Metadata but the schema name is metaData; also deletionVector vs. DeletionVectorDescriptor). We can likely use an attribute or a transformation rule to fix this.

I think the problem is that we try to return a StructField instead of DataType, which also makes life harder for everyone.

For example, the schema of any type doesn't have a "name" -- the type only becomes associated with a field name if/when it becomes a field of some struct. In Delta spark, for example, the Metadata action's name is metaData because that's the name given to it by the SingleAction type that unions all other action types, and the latter provides the schema we use to parse commit .json files.

Using StructField instead of DataType also leads to this get_data_type function. Once we define a schema as DataType instead, we can just impl GetSchema for all the basic types we want to support. See playground example.

The key to handling nullability is to recognize that it's not a property of the data type that might be null -- it's a property of the owning complex type. Thus, we would not impl<T: GetSchema> GetSchema for Option<T>. Instead, e.g. impl<T: GetSchema> GetSchema for Vec<T> covers non-nullable array elements, and impl<T: GetSchema> GetSchema for Vec<Option<T>> covers nullable array elements.

If we do all that, this macro's job gets simpler: Figure out the name, type, and nullability of each field (with name literally being the struct field name, type being the "base" type of that field, and nullability decided by whether the base type is wrapped in Option or not). The base types are handled recursively by appeal to the GetSchema trait, and any nested object that fails to implement the trait will trigger a compilation failure because the type bound is not satisfied.
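A sketch of that shape (hypothetical; loosely follows the playground example, and assumes an ArrayType::new(element_type, contains_null) constructor analogous to MapType::new, plus a From<ArrayType> for DataType impl):

trait GetSchema {
    fn get_schema() -> DataType;
}

impl GetSchema for String {
    fn get_schema() -> DataType {
        DataType::STRING
    }
}

// Arrays with non-nullable elements:
impl<T: GetSchema> GetSchema for Vec<T> {
    fn get_schema() -> DataType {
        ArrayType::new(T::get_schema(), false).into()
    }
}

// Caveat: a second blanket impl for Vec<Option<T>> (nullable elements) would
// likely trip rustc's overlap check (E0119) next to the impl above, so the
// nullable-element case may need to be recognized by the derive macro itself
// rather than living in a trait impl.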

Collaborator Author

Yeah, that makes sense. Perhaps once #109 merges, we can look at changing the way we represent schemas and then make this macro less complex.

if let Some(fin) = type_path.path.segments.iter().last() {
get_data_type(fin)
} else {
panic!("Path for generic type must have last")
Contributor

Right now all error handling is via panic

Is that a problem for macros that are "running" at compile-time?
Intuitively, it should just result in a compilation error?

Collaborator Author

@nicklan nicklan Feb 28, 2024

Yeah, it's not really an issue, but doing something more complex like this will give nicer errors.

Not something we need to do up front, hence all the panics here
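For reference, the usual shape of the nicer-errors approach (a sketch using syn's error type; not necessarily what the linked example does):

use proc_macro2::TokenStream;
use quote::quote;
use syn::{Data, DeriveInput, Error, Result};

// Return a spanned error instead of panicking, so the compiler points at
// the offending item rather than just failing with a panic message.
fn expand(input: DeriveInput) -> Result<TokenStream> {
    match &input.data {
        Data::Struct(_) => Ok(quote! { /* ...generate the impl... */ }),
        _ => Err(Error::new_spanned(
            &input.ident,
            "Schema can only be derived for structs",
        )),
    }
}

// At the macro entry point:
// expand(input).unwrap_or_else(Error::into_compile_error).into()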

@nicklan nicklan force-pushed the schema_derive_macro branch 2 times, most recently from 8da02b1 to 9389f06 on March 5, 2024 00:02
@@ -196,20 +196,20 @@ impl RemoveVisitor {

let size: Option<i64> = getters[5].get_opt(row_index, "remove.size")?;

// TODO(nick) stats are skipped in getters[6] and tags are skipped in getters[7]
// TODO(nick) tags are skipped in getters[6]
Collaborator Author

The stats field appears to have been mistakenly copied into this schema from Add. Remove does not actually have a stats field, so the code below was incorrect (we just hadn't tested it properly before).

Contributor

Next PR: derive macro for basic visitors? That would eliminate the possibility that they get out of sync.

For fields that are themselves structs, we could use that struct's own visitor (after verifying that the first non-nullable sub-field is non-null, in case the field was also nullable).

The one annoyance is all those e.g. "remove.xxx" error message helpers -- we'd have to either derive remove from the struct name, or else tell the macro the field name to use.

@nicklan nicklan changed the title from "POC: using a proc macro for being able to derive schemas for our action structs" to "Use a proc macro for being able to derive schemas for our action structs" on Mar 6, 2024
@@ -113,7 +113,7 @@ pub struct Add {
pub stats: Option<String>,

/// Map containing metadata about this logical file.
pub tags: HashMap<String, Option<String>>,
pub tags: Option<HashMap<String, Option<String>>>,
Collaborator Author

tags are optional in the spec, so this is a bug fix

Comment on lines 17 to 18
let tokens: Vec<TokenTree> = list.tokens.clone().into_iter().collect();
match tokens[..] {
Contributor

Out of curiosity -- do we actually need to clone tokens? Or can we just slice and match it directly?

Contributor

Suggested change
let tokens: Vec<TokenTree> = list.tokens.clone().into_iter().collect();
match tokens[..] {
match list.tokens[..] {

(might not need the ref matching any more after that?)

Collaborator Author

[TokenStream](https://doc.rust-lang.org/proc_macro/struct.TokenStream.html) doesn't support being used as a slice. It's really only an iterator, which is why we collect it and then use it as a slice.

I think I could probably re-write this to avoid the clone, but it would look more like the previous code, where we'd have to match one token at a time, which was much uglier.

Contributor

ack

let mut visitor = MetadataVisitor::default();
data.extract(Arc::new(schema), &mut visitor)?;
data.extract(Metadata::get_schema(), &mut visitor)?;
Contributor

I'm not convinced this get_schema method is helpful. Every read we perform is ultimately some projection of fields from actions::get_log_schema(), and the latter already names all the fields (metaData in this case).

Instead of "creating" a metadata schema, and needing to worry about "magically" getting the right field name, can we just filter get_log_schema()? Spark's StructType has specific methods for extracting one or several fields -- you pass 1+ fields to be projected, and they are returned in schema order. The rust analogue would be something like:

Suggested change
data.extract(Metadata::get_schema(), &mut visitor)?;
data.extract(get_log_schema().project_one("metaData"), &mut visitor)?;

or, for the add+remove case in scan/mod.rs below,

        let action_schema = Arc::new(StructType::new(vec![
            Option::<Add>::get_field("add"),
            Option::<Remove>::get_field("remove"),
        ]));

becomes

        let action_schema = Arc::new(get_log_schema().project(&["add", "remove"]));

The one bummer is, I don't see any way to actually get away from the field names. Even if we use e.g. Metadata::get_schema() to "hide" the name, the selection logic still needs to know it.

Contributor

Alternatively, we could observe that the above is equivalent to:

Suggested change
data.extract(Metadata::get_schema(), &mut visitor)?;
data.extract(Arc::new(StructType::new(vec![Metadata::get_field("metaData")])), &mut visitor)?;

... which is annoying because of arc/struct/vec wrappings. But we can fix that once for everyone by defining something like:

trait GetSchema: GetField {
    fn get_schema(name: impl Into<String>) -> SchemaRef {
        Arc::new(StructType::new(vec![Self::get_field(name)]))
    }
}

... which produces here:

Suggested change
data.extract(Metadata::get_schema(), &mut visitor)?;
data.extract(Metadata::get_schema("metaData"), &mut visitor)?;

Collaborator Author

I actually quite like the project option. It has a few advantages:

  1. It means only LOG_SCHEMA has to be in a lazy_static, so we can get rid of the somewhat tricky OnceLock construct currently used to make the generated schemas static
  2. We no longer need the annotations that allow field renames, so we can remove the most complex macro parsing code

Contributor

Sounds good to me!

We still need to solve the problem that the magic constant column name passed to project might not be correct, and trigger a runtime error. But that problem existed before, and at least now we can define a constant (in LogSchema perhaps?) for each top-level column name if we want?
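e.g. something like the following (values inferred from the field names used elsewhere in this thread):

pub const ADD_NAME: &str = "add";
pub const REMOVE_NAME: &str = "remove";
pub const METADATA_NAME: &str = "metaData";
pub const PROTOCOL_NAME: &str = "protocol";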

@@ -79,13 +80,12 @@ pub struct Protocol {
impl Protocol {
pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Protocol>> {
let mut visitor = ProtocolVisitor::default();
let schema = StructType::new(vec![crate::actions::schemas::PROTOCOL_FIELD.clone()]);
data.extract(Arc::new(schema), &mut visitor)?;
data.extract(Protocol::get_schema(), &mut visitor)?;
Contributor

How does this work? The protocol column of the EngineData should be nullable, since most rows will contain some other type. But this is non-nullable?

Collaborator Author

It works because even if we didn't read any protocol objects, we still read using a schema with a protocol column in it. That means that at the top level, when you do column_by_name("protocol") in the arrow data, you do get a StructArray with children that match the schema; it's just that those columns are all null.

This would give: Error::MissingData("Found required field protocol, but it's null") if called on an EngineData that had been read with the incorrect schema. That feels maybe correct, but we could also have it just not error out and return None in that case.

It does get a bit tricky to model though. What are the semantics of a struct that is nullable, with fields that are not? I guess it's reasonable to say that if the struct is null, everything can be null. I'd have to modify the existing extract code though; I don't think it would handle that case properly (i.e. it would complain that your schema says a Protocol must have a minReaderVersion even if the data had no protocol column at all and we had marked protocol as nullable)

Contributor

What are the semantics of a struct that is nullable, with fields that are not?

IIRC, parquet handles this case very badly in practice (corrupt file). Spark compensates by trying to force all children of a nullable field to themselves be nullable. See StructType.asNullable, for example, tho the latter only goes one layer deep instead of being fully transitive.

Given that spark, parquet, and arrow all seem to treat null-struct vs struct-of-null as ~equivalent, maybe we should just formalize the idea in kernel? An exploded field is nullable if it or any parent is nullable, and null if it or any parent is null?

Collaborator Author

An exploded field is nullable if it or any parent is nullable, and null if it or any parent is null?

yes, I think this is the most logical way to represent this. I can update our extraction code to do this, and we'll need to be careful to document it for connectors so their extraction code can do the same.
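Concretely, the rule might look like this when exploding a schema (a sketch, assuming StructField accessors like name(), is_nullable(), and data_type(); not code from this PR):

// A leaf is nullable if it or any ancestor is nullable.
fn explode(field: &StructField, parent_nullable: bool, out: &mut Vec<(String, bool)>) {
    let nullable = parent_nullable || field.is_nullable();
    match field.data_type() {
        DataType::Struct(inner) => {
            for child in inner.fields() {
                explode(child, nullable, out);
            }
        }
        _ => out.push((field.name().to_string(), nullable)),
    }
}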

@nicklan nicklan force-pushed the schema_derive_macro branch from 44b7e07 to b51b7b1 on March 12, 2024 19:02
Self::extract_columns_from_array(out_col_array, schema, None)?;
} else if array.is_none() || field.is_nullable() {
if let DataType::Struct(inner_struct) = field.data_type() {
Self::extract_columns_from_array(out_col_array, inner_struct.as_ref(), None)?;
Collaborator Author

this was a bug before where we were passing the parent schema instead of the child one

}
} else {
quote_spanned! {field.span()=>
#type_ident::get_field(stringify!(#name))
Contributor

Don't we need to emit the fully qualified type name, in case the user didn't use the (full) path to it?
(especially since, if I understand correctly, this is an unresolved token stream, so any qualifiers the user gave are probably needed for it to compile at all)

proc_macro::TokenStream::from(output)
}

// turn our struct name into the schema name, goes from snake_case to camelCase
Contributor

I don't know where to put the doc comment, but somewhere we should be careful to explain that the actual field names are all mandated by Delta spec, and so the user of this macro is responsible to ensure that e.g. Metadata::schema_string is the snake-case-ified version of schemaString from Delta's Change Metadata action, in order to keep rust happy. This macro is written with the assumption that it merely undoes that (previously correctly performed) transformation.

The same explains why it's ok to use to_ascii_uppercase below -- all Delta field names are plain ASCII.
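For reference, the field-name transformation the macro undoes is small (a sketch; ASCII-only, per the above):

fn snake_to_camel(name: &str) -> String {
    let mut out = String::with_capacity(name.len());
    let mut capitalize_next = false;
    for c in name.chars() {
        if c == '_' {
            capitalize_next = true;
        } else if capitalize_next {
            out.push(c.to_ascii_uppercase());
            capitalize_next = false;
        } else {
            out.push(c);
        }
    }
    out
}

// e.g. snake_to_camel("schema_string") == "schemaString"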

(f64, DataType::DOUBLE),
(bool, DataType::BOOLEAN),
(HashMap<String, String>, MapType::new(DataType::STRING, DataType::STRING, false)),
(HashMap<String, Option<String>>, MapType::new(DataType::STRING, DataType::STRING, true)),
Contributor

Do we have a ticket somewhere that tracks getting rid of this map-of-option-of-string thing?
(it keeps coming up in various PRs)

Delta was originally written in Scala (Java variant), where there's no such thing as a nullable map entry: looking up a non-existent key returns null as a sentinel value. That makes it hard for me to imagine a case where map-of-option-of-string could ever be anything but a semantic overhead.

I guess we need to double check whether the json-serialized form of a string-string map somehow allows null entries. But even if it does somehow happen, I'd rather filter out such entries at parsing time rather than propagate them through the rest of the system?
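If null entries can show up in the serialized form, filtering them out at parse time might look like this (a sketch using serde_json; not code from this PR):

use std::collections::HashMap;

// Keep only non-null entries, so the rest of the system sees a plain
// HashMap<String, String> instead of HashMap<String, Option<String>>.
fn parse_string_map(v: &serde_json::Value) -> HashMap<String, String> {
    v.as_object()
        .into_iter()
        .flatten()
        .filter_map(|(k, v)| Some((k.clone(), v.as_str()?.to_owned())))
        .collect()
}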

.iter()
.map(|name| {
self.fields
.get_index_of(name.as_ref())
Contributor

This will have O(n*m) cost, where n is the schema arity and m is the projection arity. Perhaps we could borrow spark's approach, which creates a set from the names (presumably m is much smaller than n) and then does a single filter-map pass over the fields, returning only those fields whose name is present in the set of names. Also avoids the other two passes (sort and index).
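i.e., something like the following (a sketch; assumes StructField is Clone, has a name() accessor, and that field iteration preserves schema order):

use std::collections::HashSet;

// One pass over the schema's fields: keep a field iff its name was requested.
// Output order is schema order by construction, so no sort or index pass.
fn project_fields<'a>(
    fields: impl Iterator<Item = &'a StructField>,
    names: &[&str],
) -> Vec<StructField> {
    let wanted: HashSet<&str> = names.iter().copied().collect();
    fields
        .filter(|f| wanted.contains(f.name()))
        .cloned()
        .collect()
}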

crate::actions::schemas::REMOVE_FIELD.clone(),
]
let schema_to_use = if is_log_batch {
get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])?
Contributor

project_as_schema preserves schema order, so this only works because add comes before remove in the log schema. Could be a surprising pitfall worth a comment?

crate::actions::schemas::PROTOCOL_FIELD.clone(),
]);
let data_batches = self.replay(engine_interface, Arc::new(schema), None)?;
let schema = get_log_schema().project_as_schema(&[METADATA_NAME, PROTOCOL_NAME])?;
Contributor

Similar to the other comment -- this is now only correct if metadata comes before protocol in the log schema.

@nicklan nicklan closed this Mar 19, 2024
@nicklan nicklan reopened this Mar 19, 2024
@nicklan nicklan closed this Mar 19, 2024