Schema derive macro (reborn) #156
Conversation
/// allows the use of standard rust snake_case, and will convert to the correct delta schema
/// camelCase version).
#[proc_macro_derive(Schema)]
pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
comment above added to address https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1527171512
This will show up in the docs for the derive-macros crate as:
If you wanted, you could also omit the crate from docs with #[doc(hidden)].
I sort of consider the crate docs like "implementer" docs. Like someone writing a connector would read them. I think understanding what this macro does is useful from that perspective, so likely it should be left in.
let name = field.ident.as_ref().unwrap(); // we know these are named fields
let name = get_schema_name(name);
match field.ty {
    Type::Path(ref type_path) => {
rescuing https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1527170864:
Don't we need to emit the fully qualified type name, in case the user didn't use the (full) path to it?
(especially since, if I understand correctly, this is an unresolved token stream, so any qualifiers the user gave are probably needed for it to compile at all)
I've changed it here to join the full type path that was specified at the derive site. This means that if they have brought the type in with use, it will just be the name of the type, and if they have written out a::full::Path, that will happen here too.
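For illustration, a rough sketch of what that joining could look like inside the derive (assuming syn and quote; the helper name and the generated call are my own, not necessarily the PR's exact code):

fn field_type_tokens(field: &syn::Field) -> proc_macro2::TokenStream {
    match &field.ty {
        syn::Type::Path(type_path) => {
            // Emit the path exactly as written at the derive site: just `String`
            // if the user brought it into scope, or `a::full::Path` if they
            // spelled out the whole path.
            let path = &type_path.path;
            quote::quote! { <#path as ToDataType>::to_data_type() }
        }
        _ => panic!("#[derive(Schema)] can only handle path types"),
    }
}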
kernel/src/actions/mod.rs
Outdated
@@ -113,7 +145,7 @@ pub struct Add {
pub stats: Option<String>,

/// Map containing metadata about this logical file.
pub tags: HashMap<String, Option<String>>,
pub tags: Option<HashMap<String, Option<String>>>,
tags are optional in the spec, so this is a bug fix
Can we please just remove the Option<String> business? It complicates the schema handling and there's nothing in the Delta spec that allows for it -- add.tags is a string-string map.
(ah, it's being tracked in the other comment... but still I wonder why wait)
Indeed. Changed to <String,String> everywhere (seems to work fine)
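For reference, the resulting field then looks something like this (a sketch combining the two changes: optional per the spec, and a plain string-to-string map):

/// Map containing metadata about this logical file.
pub tags: Option<HashMap<String, String>>,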
kernel/src/actions/schemas.rs
Outdated
(f64, DataType::DOUBLE),
(bool, DataType::BOOLEAN),
(HashMap<String, String>, MapType::new(DataType::STRING, DataType::STRING, false)),
(HashMap<String, Option<String>>, MapType::new(DataType::STRING, DataType::STRING, true)),
quoted comment:
Do we have a ticket somewhere that tracks getting rid of this map-of-option-of-string thing?
(it keeps coming up in various PRs). Delta was originally written in Scala (Java variant), where there's no such thing as a nullable map entry: looking up a non-existent key returns null as a sentinel value. That makes it hard for me to imagine a case where map-of-option-of-string could ever be anything but a semantic overhead.
I guess we need to double check whether the json-serialized form of a string-string map somehow allows null entries. But even if it does somehow happen, I'd rather filter out such entries at parsing time rather than propagate them through the rest of the system?
Yes, #128. I'll get to that in not too long I hope
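If the JSON form did somehow carry null entries, filtering them out at parse time could look roughly like this (illustrative only, using serde_json; not code from this PR):

use std::collections::HashMap;

fn parse_tags(json: &str) -> Result<HashMap<String, String>, serde_json::Error> {
    // Accept null values during deserialization, then drop them so the rest of
    // the system only ever sees a plain string-to-string map.
    let raw: HashMap<String, Option<String>> = serde_json::from_str(json)?;
    Ok(raw.into_iter().filter_map(|(k, v)| v.map(|v| (k, v))).collect())
}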
@@ -211,20 +211,20 @@ impl RemoveVisitor {

let size: Option<i64> = getters[5].get_opt(row_index, "remove.size")?;

// TODO(nick) stats are skipped in getters[6] and tags are skipped in getters[7]
// TODO(nick) tags are skipped in getters[6]
Rescued comment: https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1513701837
Yes, will have a follow-up to derive visitors.
kernel/src/snapshot.rs
Outdated
crate::actions::schemas::METADATA_FIELD.clone(),
crate::actions::schemas::PROTOCOL_FIELD.clone(),
]));
let schema = get_log_schema().project_as_schema(&[METADATA_NAME, PROTOCOL_NAME])?;
rescuing https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1527196393
similar to other comment -- now this is only correct if metadata comes before protocol in the log schema.
I don't think that's true, we use get_index_of on the set of names, so it'll just find the correct items no matter the order given here. I've switched from &[METADATA_NAME, PROTOCOL_NAME] in the old PR to &[PROTOCOL_NAME, METADATA_NAME] to prove it works :)
You're right that this one works, because the try_new_from_data methods do another projection of their own, to extract the column they care about. The other #129 (comment) is more serious because it directly uses the projected schema. We can pick up the discussion there, though.
kernel/src/schema.rs
Outdated
.iter()
.map(|name| {
self.fields
.get_index_of(name.as_ref())
rescuing: https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1527174598
this will have O(n*m) cost, where n is the schema arity and m is the projection arity. Perhaps we could borrow spark's approach, which creates a set from the names (presumably m is much smaller than n) and then does a single filter-map pass over the fields, returning only those fields whose name is present in the set of names. Also avoids the other two passes (sort and index).
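A rough sketch of that set-based approach (the field accessors here are assumptions, not this crate's exact API):

use std::collections::HashSet;

fn project_fields(fields: &[StructField], names: &[&str]) -> Vec<StructField> {
    // Build a set from the (presumably few) requested names, then make a single
    // pass over the schema fields, keeping only those whose name is in the set.
    let wanted: HashSet<&str> = names.iter().copied().collect();
    fields
        .iter()
        .filter(|f| wanted.contains(f.name.as_str()))
        .cloned()
        .collect()
}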
I don't think that's quite correct. get_index_of is O(1), so this is O(m) in the number of requested fields, plus a sort. Given that the set of fields is quite small, I think that should be as fast, if not faster, than first hashing the input fields and probing while iterating all the schema fields. Especially if m is much smaller than n.
get_index_of is O(1), so this is O(m) in the number of requested fields, plus a sort.
You're right. I keep forgetting that.
Going by the other comment in file_stream.rs below, though -- maybe it's not even desirable to preserve schema order in the first place. If so, then I think this code simplifies to something like:
let fields = names
    .iter()
    .map(|name| {
        self.fields
            .get(name.as_ref())
            .ok_or_else(...)
    })
    .try_collect()?;
Ok(Self::new(fields))
(I think Rust can infer the type of fields thanks to a combination of try_collect and Self::new requiring a Vec.)
kernel/src/scan/file_stream.rs
Outdated
crate::actions::schemas::REMOVE_FIELD.clone(),
]
let schema_to_use = if is_log_batch {
    get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])?
Rescuing #129 (comment)
- This schema is directly used by the extract below.
- The AddRemoveVisitor hard-wires the assumption that add is the first column and remove (if present) is the second column (by slicing the exploded list of getters).
- In fact, the schema_to_use only has Add before Remove because that's how get_log_schema happens to define it (because project_as_schema preserves original schema order).
- If we reversed the ordering in get_log_schema this code would break badly. Even if we wanted to rework the AddRemoveVisitor to hard-wire Remove first, it would still break for checkpoints that don't pass the Remove column in the first place.
Overall, I worry that project actually needs the ability to reorder columns, or we'll get bitten repeatedly by gotchas like this.
Yep. Project now honors the order passed to project
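So something like this should now hold (illustrative):

// Requesting remove before add yields the projected fields in exactly that
// order, regardless of how get_log_schema() orders them internally.
let projected = get_log_schema().project_as_schema(&[REMOVE_NAME, ADD_NAME])?;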
kernel/src/schema.rs
Outdated
self.fields
.get_index(*index)
.expect("get_index_of returned non-existant index")
.1
aside (rust n00b question): What is .1? That's not legal syntax in most languages?
It gets the second element of the tuple returned by get_index, with .0 being the first.
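A tiny standalone illustration:

// Rust tuples are indexed with .0, .1, ... rather than [] syntax.
let pair = ("id", 42);
assert_eq!(pair.0, "id"); // first element
assert_eq!(pair.1, 42); // second element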
kernel/src/schema.rs
Outdated
/// Get a [`SchemaRef`] containing [`StructField`]s of the given names, preserving the original
/// order of fields. Returns an Err if a specified field doesn't exist
pub fn project_as_schema(&self, names: &[impl AsRef<str>]) -> DeltaResult<SchemaRef> {
    let struct_type = self.project(names)?;
This seems to be the only call site for project... should we consider calling this method project, and either rename the other to project_as_struct or just fold it into this method?
Nice!
posted some comments from our discussion earlier, otherwise lgtm
parquet_schema.index_of(field.name())
.enumerate()
.filter_map(|(parquet_position, field)| {
requested_schema.index_of(field.name()).map(|index| {
another comment from our discussion earlier: consider duplicate names? I'm actually not even sure if that's allowed in parquet/elsewhere?
Kernel might need to handle duplicate column names if an expression from the user projects them, but I don't think the parquet reader should have to? Parquet itself doesn't allow duplicates, and it would be wasteful to allow duplicates at parquet scan level.
We use an IndexMap to represent our schema. The field names are the keys, so there isn't a possibility of a duplicate here.
I've added #164 as potential follow-up work
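For example, with the indexmap crate directly (illustrative):

use indexmap::IndexMap;

let mut fields = IndexMap::new();
fields.insert("id", "long");
fields.insert("id", "string"); // same key: the value is replaced, still one entry
assert_eq!(fields.len(), 1);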
Belated review, but possibly worth cutting a new PR to incorporate the suggested simplifications?
.filter_map(|(parquet_position, field)| {
requested_schema.index_of(field.name()).map(|index| {
found_count += 1;
mask_indicies[index] = parquet_position;
How do we distinguish between reading parquet column 0 first vs. "no match"? e.g. if parquet schema is [a, b, c] and read schema is [a], wouldn't that produce mask_indices [0, 0, 0]?
Update: The docs for ProjectionMask.roots state that repeated or out of order indices will not impact the final mask, i.e. [0, 1, 2] will construct the same mask as [1, 0, 0, 2].
I think that means we can simplify things a lot by doing something like:
let (selected_indices, reorder_indices): (Vec<_>, Vec<_>) = parquet_schema
    .fields()
    .iter()
    .enumerate()
    .filter_map(|(parquet_index, field)| {
        // pair each selected parquet column with its position in the read schema
        requested_schema.index_of(field.name()).map(|index| (parquet_index, index))
    })
    .unzip();
require!(selected_indices.len() == requested_schema.fields().len(), Error::generic(...));
if let Some(mask) = generate_mask(builder.parquet_schema(), selected_indices) {
    ...
}
let mut rb = ...;
if !reorder_indices.windows(2).all(|w| w[0] < w[1]) {
    // requested columns are not in natural order
    let reordered_columns = reorder_indices
        .iter()
        .map(|index| rb.column(*index).clone()) // cheap Arc clone
        .collect();
    let schema = Arc::new(ArrowSchema::new(requested_schema));
    rb = RecordBatch::try_new(schema, reordered_columns)?;
}
Note: The generate_mask method only needs two args with the above code -- the physical schema and the indices of fields to select.
) -> DeltaResult<RecordBatch> {
if indicies.windows(2).all(|is| is[0] <= is[1]) {
if mask_indicies.windows(2).all(|is| is[0] <= is[1]) {
// indicies is already sorted, meaning we requested in the order that the columns were
// stored in the parquet
I think this might be too strict a check? If parquet schema were [a, b, c, d] and read schema were [b, d] then the mask indexes would appear unordered because of the gaps? Shouldn't we instead be checking whether the requested ordering is ordered?
This does just check that, I think? It just checks that for each pair of two elements the earlier one is <= the later one. So for your example it would be checking that 2 <= 4, which is true.
This can be changed to < probably, since we won't have duplicate indices.
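Concretely (illustrative):

// parquet schema [a, b, c, d], read schema [b, d] -> mask indices [1, 3];
// the only window is (1, 3) and 1 <= 3, so the order is accepted.
let mask_indicies = [1usize, 3];
assert!(mask_indicies.windows(2).all(|is| is[0] <= is[1]));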
Ah, good point. Not sure what I was thinking.
}

/// Create a mask that will only select the specified indicies from the parquet. Currently we only
/// handle "root" level columns, and hence use `ProjectionMask::roots`, but will support leaf
/// selection in the future. See issues #86 and #96 as well.
pub(crate) fn generate_mask(
requested_schema: &ArrowSchema,
requested_schema: &SchemaRef,
parquet_schema: &ArrowSchemaRef,
What purpose does the parquet_schema arg serve, as different from the parquet_physical_schema? Seems like we could just check the latter's length at L58 below, and then wouldn't need the extra arg any more?
Reopening #129.
Description from there:
Work to be able to do:
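(The original code example isn't reproduced here; roughly, the goal is something like the sketch below, where the struct is only illustrative.)

#[derive(Schema)]
pub struct Format {
    /// Name of the encoding for files in this table
    pub provider: String,
    /// A map containing configuration options for the format
    pub options: HashMap<String, String>,
}

// ...and then ask the type for its StructField / schema instead of writing the
// schema out by hand.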
This also then uses the generated schemas from our actions everywhere, and removes the old schema definitions. To use these we define a static LOG_SCHEMA which calls all the generated get_struct_field methods to build the total log schema. We also add a project method to StructType so we can then pick out the columns we want when doing something like parsing only metadata.
How it works
We introduce a trait, ToDataType: all primitive types, as well as vectors and maps, know how to produce a DataType. Then we can derive a struct implementation of it by annotating with this macro, which will generate an impl for the annotated struct.
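The trait and the generated impl look roughly like the sketch below (reconstructed from this description, since the original code blocks aren't included here; exact signatures and nullability handling may differ):

pub trait ToDataType {
    fn to_data_type() -> DataType;
}

// Approximately what #[derive(Schema)] would generate for a struct like Protocol,
// including the snake_case -> camelCase field-name conversion:
impl ToDataType for Protocol {
    fn to_data_type() -> DataType {
        StructType::new(vec![
            StructField::new("minReaderVersion", i32::to_data_type(), false),
            StructField::new("minWriterVersion", i32::to_data_type(), false),
            StructField::new("readerFeatures", Vec::<String>::to_data_type(), true),
            StructField::new("writerFeatures", Vec::<String>::to_data_type(), true),
        ])
        .into()
    }
}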
Finally, we add a GetStructField trait, which is auto-derived for anything that implements ToDataType. It simply allows you to pass a name and get a StructField back. A schema is just a Vec of StructFields, so with this implemented, it's trivial to construct the schema for any type annotated with #[derive(Schema)].
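Again only as a sketch (the real trait may differ, e.g. in how it decides nullability):

pub trait GetStructField {
    fn get_struct_field(name: impl Into<String>) -> StructField;
}

// Blanket impl: anything that knows its DataType can hand back a named field.
impl<T: ToDataType> GetStructField for T {
    fn get_struct_field(name: impl Into<String>) -> StructField {
        StructField::new(name, T::to_data_type(), false)
    }
}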
Right now all error handling is via panic, but that's generally okay as the compiler just fails with the panic message. If you want to inspect the generated code, you can use cargo expand. Install it, and then run something like:
$ cargo expand | grep -A50 "ToDataType for Protocol"
and replace Protocol with whatever struct you want to see the impl for.