DRILL-8507, DRILL-8508 Better handling of partially missing parquet columns #2937
base: master
Conversation
…h existing ones
1. In ParquetSchema#createMissingColumn, replaced col.toExpr() with col.getAsUnescapedPath() so that the missing column name wouldn't be quoted with backticks
2. Fixed a typo in UnionAllRecordBatch ("counthas" -> "counts")
3. In TestParquetFilterPushDown, worked around a NumberFormatException with CONVERT_TO
4. Removed the testCoalesceWithUntypedNullValues* test methods from TestCaseNullableTypes
5. Moved the testCoalesceOnNotExistentColumns* test methods from TestUntypedNull to a separate TestParquetMissingColumns and made them expect Nullable Int instead of Untyped Null
6. Created a new TestParquetPartiallyMissingColumns test class with test cases for the "backticks problem"
…ing parquet column (minor type solution)
1. Passed the overall table schema from AbstractParquetRowGroupScan to ParquetSchema
2. In ParquetSchema#createMissingColumn, used the minor type from that schema instead of hardcoding INT
…ing parquet column (data mode solution)
1. Added a TypeCastRules#getLeastRestrictiveMajorType method for convenience
2. In Metadata, added data mode resolution (always preferring the less restrictive mode) when collecting file schemas and merging them into a single table schema, and synchronized the merging to accomplish that
3. In ParquetTableMetadataUtils, made any column that is either OPTIONAL in, or missing from, any of the files OPTIONAL in the overall table schema
4. For such cases, added enforcement of the OPTIONAL data mode in ParquetSchema, ParquetColumnMetadata and ColumnReaderFactory, so that even if a file has the column as REQUIRED but we need it as OPTIONAL, a nullable column reader and a nullable value vector are created
5. Added "() -> 1" initialization for definitionLevels in PageReader so that a nullable column reader can read REQUIRED columns
6. Added testEnforcingOptional* test cases in TestParquetPartiallyMissingColumns
@jnturton @paul-rogers could you please take a look? Drill has a hidden issue caused by the following change: This change caused a barely noticeable problem that makes
This change was definitely made intentionally, but the purpose of the change is not clear to me. @ychernysh and I think this change needs to be reverted, but maybe we missed something. At the moment some of the Delta Lake tests fail due to this change, but before we move on, we would like to know whether it's a good idea to keep using
@rymarm, thanks for this fix. I'm a bit rusty on Drill, but let's see what I can sort out. This stuff is complex, so I'm going to throw a wall of text at you so we get on the same page.

First some background. If memory serves, the Parquet reader was written quickly by an intern early on: it is so complex that few have summoned the effort to understand or improve it. So, digging into it is a classic "legacy code" experience. In the early days, every single format reader created its own way to write data to vectors. This led to all manner of bugs (getting the code right once is hard, doing it a dozen times is impossible). It also resulted in inconsistent behavior. To solve these (and to avoid Drill's memory fragmentation issues of the time), we created EVF (the extended vector framework) to unify how we write to vectors and how we solve schema consistency issues for readers. EVF replaced the older column writer mechanisms. By now, all Drill readers except Parquet are based on EVF. However, the Parquet reader is special: it is the only one that still uses the original, fragile mechanisms. As the story goes, Parquet was written very quickly by an intern, and the result was so complex that few people have been brave enough to try to sort out the code. Since Parquet still uses the old mechanisms, it has its own way to solve schema issues, its own way to handle unprojected columns, and still suffers from the bugs in the original, fragile mechanisms.

It looks like your PR tries to fix some of these. In particular, it may be that you're trying to fix a bug in Parquet that EVF solves for other readers. It would be great if your fix is consistent with EVF. I'll try to check this when I review the code. What we really need is for someone to take on a "weekend project" to rewrite the Parquet reader to use EVF so we have one complete schema mechanism rather than two inconsistent versions. (I can dream.)

Now let's look at the bug in question. You received a schema change error. This means that some operator in the DAG saw two different schemas for the same table. In particular, the SORT operator can't handle the case of, say,

Most DB and query tools impose a schema at read time. That is, if we have files that have undergone schema evolution, as above, we work out at plan time that the common schema is, say

Of course, Drill's claim to fame is that it is schema-free: it works out the schema on the fly. That's all cool, except that Drill also supports operators that only work for a single schema. (Joins have the same issue.) That is, we promote a really cool feature, but we can't actually make it work. This is great marketing, but causes frustration in reality.

And, as befits any bit of software that has a few miles on its tires, Drill has multiple ways to work around the schema issue. There is a "provided schema" feature that tells readers the common schema. The reader's job is then to map the actual file columns into that schema. The provided schema is supported in EVF, but not, alas, by the Parquet reader. Also, the planner never got around to supporting schemas, so provided schemas only work in rather special cases. There is also a Drill Metastore feature that emulates the Hive Metastore (HMS), but be in common use. The original way to manage schemas, for Parquet only, is to scan all the files and build up a JSON file that contains the union of all the schemas.
I can't recall if this mechanism was supposed to provide type consistency, but it certainly could: we just look at the Now, one might say, hold on, isn't there an easy answer? If we read file A and get one schema, then read file B and get another schema, can't we blend them on the fly? Indeed, in some cases this is possible, and EVF implements those cases. However as I'm fond of saying, "Drill can't predict the future", so there are cases where we get it wrong. For example, the query asks for columns Plus, remember, Drill is distributed. Those two files might have been read by different fragments running on different machines. They can't communicate until the rows come together in the sort, after which it is too late to fix the problem. One other tricky item to remember: Drill is fastest when columns are There is a reason that SQL DBs, for 50+ years, have required a schema. It isn't that all those folks were backward, it is that SQL doesn't work without a common schema. (Apache Druid has a similar problem, but it solves it by treating all values as, essentially, untyped: the values change type on the fly as needed.) When faced with bugs of the kind here, it is important to sort out which are just "bad code" bugs and which are the "this design just can't work" bugs. Now, with that background, we can try to sort out the problem you are trying to solve. Does your test case have Parquet files with differing column sets or types? If so, let's identify exactly how the schemas differ, then we can discuss possible solutions. Looking at the specific line you pointed out, I'll hazard a guess as to what it is doing: that case we discussed above. Suppose our SQL is Given this, I don't think the problem is with the column name (which is what that referenced code change handles). The code change in question allowed handling a column name of the form Instead, the problem may be with the part of that code that did not change: Drill is trying to predict the future and predicting that when the missing column appears, it will be Where does this leave us? If you can pin things down to one very specific case, we can sort out if it is a "bad code" bug or a "that just won't work given Drill's design" bug. In particular, reading Parquet files with inconsistent schemas, projecting the inconsistent columns, and adding a SORT won't work unless the missing columns will be of type On the other hand, you can have as much inconsistency as you want as long as the columns you project appear in all files and the type of those columns stays the same. Feel free to add as many other, inconsistent, columns as you like: just don't project them in queries with a SORT. I'd suggest that, since Drill doesn't handle Parquet schema changes well (though that is Drill's compelling feature), we maybe should not test stuff that can't actually work. Test with files with a consistent schema instead. Or, if files have an inconsistent schema, test with the Parquet schema feature enabled and after doing a scan of all the files. (I forget the command: This stuff is horribly complex, and I may have missed the mark on this particular bug. But, at least we're now on the same page. |
String colName = col.getAsUnescapedPath();
MaterializedField tableField = tableSchema.column(colName);
TypeProtos.MinorType type = tableField == null ? TypeProtos.MinorType.INT : tableField.getType().getMinorType();
MaterializedField field = MaterializedField.create(colName,
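To see the quoted change in context, here is a minimal sketch of the missing-column resolution it implements — assuming tableSchema is the merged table schema handed down from the group scan, and simplifying the real ParquetSchema code:

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.TupleMetadata;

// Sketch only: build the MaterializedField for a column that is absent from
// this particular file. Reuse the minor type from the merged table schema if
// it is known there; otherwise fall back to INT. The mode is OPTIONAL so the
// vector can be filled with NULLs.
private MaterializedField createMissingColumn(SchemaPath col, TupleMetadata tableSchema) {
  String colName = col.getAsUnescapedPath();                 // unquoted, no backticks
  MaterializedField tableField = tableSchema.column(colName);
  TypeProtos.MinorType type = tableField == null
      ? TypeProtos.MinorType.INT
      : tableField.getType().getMinorType();
  return MaterializedField.create(colName,
      TypeProtos.MajorType.newBuilder()
          .setMinorType(type)
          .setMode(TypeProtos.DataMode.OPTIONAL)
          .build());
}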
This is a good change: we are propagating the old column type. This is consistent with EVF.
However, this stuff is horribly complex. If we reuse the column, we must reuse the actual value vector. Otherwise, you'll get crashes in the downstream operators that are bound to that vector. The binding is redone only on a schema change. But, your fix avoids the schema change, and hence prevents the rebinding.
Also, note that this fix works ONLY in one direction (column appears, then disappears), and ONLY within a single thread: it can't solve the same problem if the two files are read in different threads and sent to the SORT to reconcile.
Further, we are changing the mode to OPTIONAL as required so we can fill the vector with NULL values. However, a change of mode (i.e. nullability) is a schema change and will cause the SORT to fail. We would have to have known, on the previous file, that the column will be missing in this file, so that we could create the original column as OPTIONAL.
Please see this comment.
Hi @paul-rogers ,
The thing is that, regardless of whether we succeed in "guessing" the major type for the missing column, the solution won't work until we solve the backticks problem, because essentially, instead of creating the missing column (unquoted), we currently create a brand-new column (quoted) which has nothing to do with the "real" missing one. Please see my
Note that all the major types here are guessed correctly (
Thanks to AbstractParquetRowGroupScan's schema, which this (DRILL-8508) solution is based on, all readers in all fragments on all machines are aware of the overall table schema (constructed from all the parquet files to read). The Foreman scans the footers of all the files and merges them into a single schema back in the planning phase, and then sends it to each fragment. This is exactly what gives us the opportunity to "see the future" at the reading phase, because all the metadata (even for future files) is already available and merged into a single schema.
Exactly, and that is why I added this enforcing-OPTIONAL logic. That is, even if a particular parquet reader is going to read a REQUIRED parquet column, we force it to put the data into a NULLABLE value vector, to get a schema consistent with the files where the column is missing. Note that we are able to do so because we know at the reading phase that the column is partially missing, thanks to the aforementioned schema propagated to all readers.
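To make the enforcing-OPTIONAL idea concrete, here is a hedged sketch (the helper name is illustrative, not the PR's exact code) of the per-column decision each row-group reader makes:

import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;

// Sketch: fileType is what this parquet file declares for the column;
// tableMode is the mode in the table schema merged from all footers.
// If the merged schema says OPTIONAL, every reader emits a nullable vector,
// even when its own file declares the column REQUIRED, so all scan batches
// carry the same schema.
static MajorType enforceOptional(MajorType fileType, DataMode tableMode) {
  if (tableMode == DataMode.OPTIONAL && fileType.getMode() == DataMode.REQUIRED) {
    return fileType.toBuilder().setMode(DataMode.OPTIONAL).build();
  }
  return fileType;
}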
@paul-rogers
The files are spread evenly across all DataNodes, and when I run a query I see (in each Drillbit's logs and in the query profile) that Drill reads in parallel on 2 Drillbits. I ran queries such as:
They all succeeded. Without the fix, Drill would fail. And note that we need all of the 3 solutions provided in this PR to make all of them pass:
The main idea of the DRILL-8508 solution is that since we scan the footers of all the parquet files to read back at the planning phase in the Foreman, we already know which columns are (partially) missing and which are not. Knowing that Note about tests: Unit tests for the fix, however, require a specific file read order. This is due to some operators such
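A sketch of the plan-time merge described here — not the real Metadata/ParquetTableMetadataUtils code, just the shape of the rule, with String keys standing in for SchemaPath:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;

// Sketch: merge per-file column types (read from parquet footers in the
// Foreman) into one table schema. A column that is OPTIONAL in, or missing
// from, at least one file becomes OPTIONAL in the merged schema.
static Map<String, MajorType> mergeFileSchemas(List<Map<String, MajorType>> fileSchemas) {
  Map<String, MajorType> table = new HashMap<>();
  for (Map<String, MajorType> file : fileSchemas) {
    file.forEach((name, type) -> table.merge(name, type, (prev, next) ->
        prev.getMode() == DataMode.OPTIONAL || next.getMode() == DataMode.OPTIONAL
            ? prev.toBuilder().setMode(DataMode.OPTIONAL).build()
            : prev));
  }
  for (Map<String, MajorType> file : fileSchemas) {
    table.replaceAll((name, type) -> file.containsKey(name)
        ? type
        : type.toBuilder().setMode(DataMode.OPTIONAL).build());   // partially missing
  }
  return table;
}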
@ychernysh, thanks for the clarification of the column name issue. I agree that the name stored in the vector should not contain backticks.

I am unaware of any code in the Foreman that scans all the Parquet files. Is that something new? Doing that in the Foreman places extreme load on the Foreman, causes remote reads in Hadoop, and will be slow. Drill relies on statistical averages to balance load: each node does about the same amount of work. Doing a large prescan in the Foreman invalidates this assumption. The feature is fine for Drill run as a single process on a single machine for a single user. It will cause hotspots and resource issues on a busy, distributed cluster. The old, original Parquet schema cache did, in fact, read all the files in a folder, but did so in a distributed fashion, and wrote the result to a JSON file. (But without locking, so that two updates at the same time led to corruption.) Are we talking about a new feature recently added in the last few years? Or about the original Parquet schema cache? If this feature does exist, then we now have six ways that Drill handles schema (the Parquet cache, provided schema, the Drill Metastore, HMS, on-the-fly, and this new prescan of Parquet files).

If a schema is available, then I agree with your analysis. The planner should detect column type conflicts, and whether a column is missing. If a column is missing in any file, then the schema sent to the readers should be OPTIONAL for that column. Further, if some Parquet files have REQUIRED, and some OPTIONAL, then the mode should be OPTIONAL. You know the drill (pardon the pun).

Note, however, that deciding on a common schema is a planner task. With EVF, the planner would provide the schema (using the provided schema mechanism) to the reader and EVF would "do the right thing" to ensure that the reader's output schema matches that defined by the planner. There are many CSV file unit tests that show this in action. Calcite does a fine job working out the required types assuming that Drill provides column type information in its metadata. Does the new Parquet prescan feed into Calcite, or is it something done on the side? If on the side, then we'll run into issues where Calcite guesses one column type (based on operations) but the Parquet schema prescan works out some other type (based on the Parquet files.) For example, if I have a query

Thus, the task for Parquet should be to recreate what EVF does (since EVF already fought these battles), but using the older column writers. That is:
Note that Parquet must already have a way to reuse columns common to two files, else we'd get a schema change on each new file. (Sorry, my knowledge of the old column writers is rusty; it's been several years since I last looked at them.) In fact, given that Parquet is trying to solve the same problems as EVF, one might want to review the EVF unit tests and recreate those many cases using Parquet. "If one is ignorant of history, one is doomed to repeat it." Thanks again for the explanations. Now that I better understand the problem (and am aware of that new Parquet prescan feature), I can do a better code review.
@paul-rogers @ychernysh I'm wondering if it wouldn't be worth it to refactor the Parquet reader to use EVF2 rather than debug all this. I don't know what would be involved; I do know that refactoring all the other plugins to use EVF2 wasn't all that difficult, but Parquet is another ball game.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more "starter comments" as I wrap my head around the code.
exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
@@ -661,6 +663,12 @@ static Map<SchemaPath, TypeProtos.MajorType> resolveFields(MetadataBase.ParquetT
// row groups in the file have the same schema, so using the first one
Map<SchemaPath, TypeProtos.MajorType> fileColumns = getFileFields(parquetTableMetadata, file);
fileColumns.forEach((columnPath, type) -> putType(columns, columnPath, type));
// If at least 1 parquet file to read doesn't contain a column, enforce this column
// DataMode to OPTIONAL in the overall table schema
The general rule has to be:
- For all columns that exist, define a common type that can hold all of the associated column types.
- If any column is optional (or missing), assign the OPTIONAL type -- but only if the other types are REQUIRED.
- If all columns are REPEATED, then the missing column is also REPEATED. (In Drill, a zero-length array is the same as NULL: there is no such thing as a NULL array in Drill.)
- If any column is REPEATED, and some column is OPTIONAL or REQUIRED, then choose REPEATED as the column type. Ensure that the runtime code handles the case of writing a single value into the array when we read the file with the OPTIONAL or REQUIRED column.
IIRC, EVF handles all the above for dynamic columns. If Drill had type logic in the Calcite planner, it should handle these same rules.
Again, this kind of logic requires extensive unit tests of all the cases above, plus any others you can think up.
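For reference, these rules collapse into a small mode-resolution function — a sketch following the bullets above, not EVF's or the PR's actual code:

import java.util.List;
import org.apache.drill.common.types.TypeProtos.DataMode;

// Sketch: resolve the output data mode of one column across all files.
// 'observed' holds the modes from files where the column exists;
// 'missingSomewhere' is true when at least one file lacks the column.
static DataMode resolveMode(List<DataMode> observed, boolean missingSomewhere) {
  if (observed.contains(DataMode.REPEATED)) {
    // All REPEATED, or a mix of REPEATED with scalars: the column is REPEATED;
    // a missing or scalar occurrence becomes an empty or single-element array.
    return DataMode.REPEATED;
  }
  if (missingSomewhere || observed.contains(DataMode.OPTIONAL)) {
    return DataMode.OPTIONAL;   // any gap or nullable occurrence forces OPTIONAL
  }
  return DataMode.REQUIRED;     // present and non-nullable everywhere
}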
The first item is about resolving different data types even if there are no missing columns, which I didn't cover.
but only if the other types are REQUIRED
- is this condition necessary?
Regarding REPEATED - I haven't covered it in any way.
In theory, implementing these should not be that hard...
...xec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -56,10 +56,10 @@ public ParquetColumnMetadata(ColumnDescriptor column) {
this.column = column;
}

- public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
+ public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options, boolean isEnforcedOptional) {
Again, if we are enforcing a planner-provided schema, the job is to map whatever the Parquet type is into the given, fixed Drill type. There is only one right answer when the schema is provided. Again, see EVF for how this works.
Let me try it...
...xec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
* appropriate data mode in this schema. Our mission here is to enforce that OPTIONAL mode in our
* output schema, even if the particular parquet file we're reading from has this field REQUIRED,
* to provide consistency across all scan batches.
*/
Great start! See the other cases described above.
Also, I seem to remember creating code to handle evolving column types as part of EVF. Perhaps you can find that code. The code likely has a large number of unit tests (I'm a test-driven kinda guy) which you can reuse to test your parallel implementation.
/**
* Covers selecting completely missing columns from a parquet table. Should create Nullable Int
* ValueVector in that case since there is no chance to guess the correct data type here.
*/
Note that NULLABLE INT is just a guess; there is nothing special about it. EVF allows the reader to define its own "default" column type. For example, for CSV, columns are NEVER INT, so it is better to guess VARCHAR, which is the only type CSV supports. Parquet can play the same trick if doing so is useful.
NULLABLE INT is just what was there when I came to this issue. I think it would be easy to make it configurable by the user (if there is any function able to parse a type from a string). Should we add it?
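If such an option were added, the parsing side would be simple — a sketch, with the option name purely hypothetical:

import org.apache.drill.common.types.TypeProtos.MinorType;

// Sketch: turn a user-supplied type name (say, from a hypothetical
// "store.parquet.missing_column_type" session option) into a Drill minor
// type. MinorType is a protobuf enum, so valueOf() does the parsing.
static MinorType parseDefaultType(String typeName) {
  try {
    return MinorType.valueOf(typeName.trim().toUpperCase());
  } catch (IllegalArgumentException e) {
    return MinorType.INT;   // keep today's fallback
  }
}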
The "default type" for the other readers is part of the reader definition. We could create yet another system/session option for the default Parquet type.
After this change, the default type will only apply if the user requests a column that appears in none of the files. Maybe the column appeared in older files, but not newer ones, or vice versa.
Suppose the user has a tool that sends the SQL. That tool expects comments to be VARCHAR, but this was something added recently. When using that query against old files, the column will suddenly become INT, perhaps breaking code that expected the VARCHAR type.
We cannot fix this case: only a full metastore that tracks all files could solve this case. Having an overall default column type can't fix per-column issues.
So, I guess we can set this issue aside for now.
* - parquet/partially_missing/o_m -- optional, then missing
* - parquet/partially_missing/m_o -- missing, then optional
* - parquet/partially_missing/r_m -- required, then missing
* - parquet/partially_missing/r_o -- required, then optional
Thanks for explaining the schemas here in the test comments!
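For illustration, a test over one of these directories might look like the sketch below, written in the style of Drill's testBuilder() framework (base class and resource path assumed; the column name and baseline values are made up for the example, not taken from the actual test files):

import org.apache.drill.test.BaseTestQuery;
import org.junit.Test;

public class TestPartiallyMissingSketch extends BaseTestQuery {

  @Test  // Sketch: required-then-optional files should still sort without a schema change error
  public void testRequiredThenOptionalOrderBy() throws Exception {
    testBuilder()
        .sqlQuery("SELECT id FROM dfs.`parquet/partially_missing/r_o` ORDER BY id")
        .ordered()
        .baselineColumns("id")   // hypothetical column
        .baselineValues(1)       // hypothetical values
        .baselineValues(2)
        .go();
  }
}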
Doing so would be the ideal solution. The challenge has always been that the Parquet reader is horribly complex. I took a few cracks a refactoring it back in the day, but it is still pretty complex. The most challenging issue is that Parquet is parallel: it reads each column in a separate thread. MapR did a large number of hacks to maximize parallelism, so that code grew quite complex with many nuances to saturate threads while not using too many resources overall: that is, to maximize parallelism within a single query, but also across the "thousands of concurrent queries" that were the rage back in the day. All other readers are row-based since that is how most other data formats work. EVF is a row-based implementation. As a result, EVF would be difficult to reuse. This raises the reason that EVF was created in the first place: limiting batch size to prevent memory fragmentation. Back in the day, all readers read 64K records per batch, even if that resulted in huge vectors. EVF imposes a batch size limit, and gracefully wraps up each batch, rolling over any "excess" data to the next one. In Parquet, that logic does not exist. That is, if we have, say, 20 column writers all busy building their own vectors, there is nothing to say, "hold, on, we're over our 16 MB batch size limit". Instead, the readers just read n rows, creating whatever size vectors are required. Read 1000 columns of 1 MB each and you need a 1GB value vector. The memory fragmentation issue arises because Drill's Netty-based memory manager handles allocations up to 32 MB (IIRC) from its binary-buddy free list. Beyond that, every request comes from the OS. Netty does not release memory back to the OS if even a single byte is in use from a 32 MB block. Eventually, all memory resides in the Netty free list, and we reach the OS allocation limit. As a result, we can have 100% of the Netty pool free, but no OS capacity to allocate another 64MB vector and we get an OOM. The only recourse is to restart Drill to return memory to the OS. While we long ago fixed the fragmentation issues in the other readers (via EVF) and other operators (using the "temporary" "batch sizer" hack), it may be that Parquet still suffers from memory fragmentation issues because of its unique, parallel structure. Still, perhaps there is some way to have EVF handle the schema and vector management stuff, but to disable the row-oriented batch size checks and let the Parquet readers write as much data as they want to each vector (fragmenting memory if it chooses to do so). Or, maybe work out some way to give each column reader a "lease" to read up to x MB. EVF can handle the work needed to copy "extra" data over to a new vector for the next batch. I'd have to try to swap EVF knowledge back into the ole' brain to sort this out. All this said, I can certainly see the argument for hacking the existing code just to get done. I guess I'd just suggest that the hacks at least reuse the rules we already worked out for EVF, even if they can't reuse the code. All of this is premised on the notion that someone did, recently, add a "Parquet prescan" to the planner, and that someone added column type propagation to the Calcite planner. Was this actually done? Or, are we somehow assuming it was done? Are we confusing this with the old Parquet schema cache? Again, I've been out of the loop so I'm just verifying I'm understanding the situation. |
@paul-rogers I'm not aware of any recent significant work on the Parquet reader. I know @jnturton did some work on adding new compression capabilities and there have been a few bug fixes here and there, but nothing major as I recall. So I don't think we've added any Parquet "prescan" that I am aware of.
Ah. I'm misunderstanding something. I outlined a number of the common problems we encounter when we try to figure out schema dynamically. A response suggested that this pull request solves this because it has access to the full Parquet schema in the planner. The only way to get that schema is either to use the old Parquet metadata cache, or something that scans all the Parquet files in the planner. I thought I saw a statement that such a scan was being done. To prevent further confusion, what is the source of the Parquet schema in this fix?
Drill parallelizes scanning a parquet table at file, row group, column chunk and page levels. A single ParquetRecordReader object (which is the key part of the issue, since it creates null-filled vectors for missing columns) reads 1 row group, so this is the level we're interested in. Each Minor Fragment is assigned a list of row groups (that is, a list of ParquetRecordReader objects) to read.
As I said above, we cannot avoid it. But, for your information, the process of reading metadata from all the parquet files is parallelized into 16 threads within a Foreman JVM.
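That parallel footer read is essentially a fixed pool fanning out over the file list — a generic sketch, not Drill's Metadata class:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Sketch: read all parquet footers with a bounded pool, the way the planner
// does conceptually. 'readFooter' stands in for the real footer-reading call.
static <T> List<T> readAllFooters(List<String> files, Function<String, T> readFooter)
    throws Exception {
  ExecutorService pool = Executors.newFixedThreadPool(16);
  try {
    List<Future<T>> futures = new ArrayList<>();
    for (String file : files) {
      futures.add(pool.submit(() -> readFooter.apply(file)));
    }
    List<T> footers = new ArrayList<>();
    for (Future<T> future : futures) {
      footers.add(future.get());   // propagate any read failure
    }
    return footers;
  } finally {
    pool.shutdown();
  }
}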
Based on the above, and on that ParquetTableMetadataProvider can use metadata cache files, I assume that we're talking about the original Parquet schema cache. But I'm not sure...
So, I guess we still have 5 ways, and let me summarize my knowledge on each:
Basically all of the above is met by this PR, but with slightly different wording (not using the terms first/subsequent file since, as said before, the fix is order-agnostic):
I'm sorry if I missed answering some of the questions. I am new to Drill and this is one of my first tasks on it, so I might not understand some things. Regarding refactoring to EVF: I've never seen it before and probably I'm not the best person to implement it. I will try to research it, though, to understand @paul-rogers's reviews better. But I think we should at least agree on how we would treat the issue: refactoring everything to EVF (which, as far as I understand, would erase everything made in this PR), making the current parquet reader simulate EVF as much as possible, or just solving the concrete problems described in the tickets. Note that this PR went the 3rd way. So before applying any edits to this PR, it would be nice to understand whether it would make any sense at all first :)
@ychernysh, thank you for your detailed explanation. Let's focus in on one point.
It is surprising that none of the "second generation" Drill developers ever knew about, or mentioned, that Drill scans Parquet files at plan time. Of course, it could be that I just never understood what someone was saying. We used to wrestle with inconsistent schemas all the time, so it is surprising if the solution was available the whole time. That's why, if this code exists, I suspect it must have been added either very early (by a "first generation" developer who later left) or within the last few years. Another reason it is surprising that we have such code is the big deal we make of being "schema free." Of course, "schema free" has problems. Why would we not have mentioned that "schema free" means "infer the schema at plan time" if doing so would solve the schema inconsistency issues? Amazing... If such code exists, then it should have been integrated not just into parallelization planning, but also into Calcite type propagation, and the schema included in the physical plan sent to the Parquet readers. I suppose whoever added it could have just been focused on parallelization, and hoped Drill's "magic" would handle the schema. In fact, the "missing" type propagation code is the very code that you're now adding, though, it seems, without using Calcite for the type propagation. The discussion we are having depends entirely on whether schema information is available at plan time. Before I comment further, you've given me some homework: I'll look at that code to determine if it really does scan all the file headers at plan time.
Sorry, I don't really understand the question regarding Calcite (I'm not familiar with it). I didn't consider Calcite in my work, so, probably, your statement is correct. |
@paul-rogers, there is no new feature. This behavior of reading all parquet file metadata during the planning phase has been present for a long time. Moreover, we even have a feature called the "parquet metadata cache" aimed at mitigating the downside of this logic:
https://drill.apache.org/docs/optimizing-parquet-metadata-reading/
I was supposing that Drill's key feature is that Drill takes care of the data structure and types, while the user is left only to query the data as if it were a simple Postgres or MySQL table. That said, I think it's fine to read all the available metadata at any query execution phase, if it helps us execute the query faster and with a lower chance of failure, and if no additional setup is required from the user.
Thanks again for the many pointers and explanations. Much to my surprise, Drill does, indeed, seem to read the Parquet metadata at plan time. Your explanation makes sense: clearly we need the information for parallelization planning. The Parquet metadata cache simply bypasses the physical file reads if the cache is present. I traced the whole flow. It all looks good except at the last step, writing data to a vector. That will prevent any direct attempt to have the Parquet reader write to a vector other than the one that matches the Parquet type. That may still be OK. As you can tell, my comments are based on the general schema change case: the problems we've encountered over the years. But, I suspect this PR can address one very specific case: missing columns when the type is consistent across files where it does appear. Recap of the Parquet Planning ProcessThe code is hard to follow statically. Instead, I ran the unit tests in the PR to get the dynamic view. As described above, the Here I will insert a grumble that if Drill scans the Parquet files per query to get schema, then it could certainly do so for all other file types and avoid a large number of problems. Files have a path and a timestamp: we could easily build up a per-Drillbit cache of metadata so we read it only once per session, and only if the file changes. Trying to distribute that data is the hard part. (Everyone wants to use ZK, but ZK is not a distributed DB.) End of grumble. So, we do have schema per row group. We can thus work out the common schema as is done in this PR. In general, we have to handle the following:
The code here handles missing columns and the nullable/non-nullable modes. We can worry about the scalar/array issue later as only a novice would change a column from one to the other. Because of the use of the common type code, I assumed that this PR might also handle the differing types issue, but maybe I misunderstood. As explained above, the right way to handle columns is to run them through Calcite. That way, Calcite can check for something like Thanks for adding tests that show that the type inference works as intended. It would be handy if There are three cases that work prior to this PR. Assume we have a column
This PR generalizes the third case. As it is, the "sensitive" operators will raise a schema change exception. To fix the SCEs we need to handle all the cases in which the schema can change so that all readers return the same schema. Cases:
A review response comment suggests that this PR fixes case 4, which seems like the most reasonable form of schema change. Changing column types is just asking for trouble with tools such as Presto or Impala. Still, let's think through all the cases. There is one additional case, just to make this more fun. The query can be of the form

Once we have the common schema, we need to ship it to the distributed readers. It looks like As it turns out, Drill has not just one, but two different Parquet readers: So far so good.

The Problem

But, now all heck breaks loose. What we want is for each row group to honor the planner-provided schema. But, this won't work. What we want is that the row group either has column But, the Parquet column readers assume that the type of the Drill vector is the same as that of the Parquet columns. As it turns out, there are two sets of Parquet column writers: the original ones and a second set of "bulk" readers. I don't know if both are still used. Eventually, the Parquet column readers call down to the following line from

protected void writeData() {
vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
}

That is, the Drill vectors have been carefully chosen so that the byte layout of the vector matches the byte layout of Parquet. We read, say, The class This code is complex, so I might be missing something. More testing would reveal the facts. It could be that writing a non-nullable column into a nullable vector works: I'm just not sure what sets the null bits to the correct values. Maybe they take on the correct values by default, if we zero out the null vector at allocation.

Handling Only Missing Columns

Since the Parquet reader is not set up to handle type changes, we can choose to restrict this fix to handle only missing columns. That is, if a column is missing, and it has the same type when it does appear, only then change the mode to nullable. Otherwise, let the schema change happen and the query will fail as it does today. Such a fix solves the issue for your case, but does not solve the general problem. This is still an improvement and worth doing. We just need to ensure that if Parquet thinks it is writing to a non-nullable vector, but the vector is nullable, the code "does the right thing." Nullable vectors are a composite: they have a null vector and a data vector. Parquet should use the data vector. We just need to ensure that we somehow set the null vector bits correctly. It may be that vector allocation zeros the vectors and so the null vector starts in the proper state. Since the tests pass, this may be the case.

Alternative Solution for Type Changes

Suppose we wanted to handle type changes as well. From the above description, we can't solve the problem by forcing Parquet to choose a different type of vector than the one for the column type. (I mentioned that this is possible when using EVF and suggested we could do the same for Parquet. It turns out that Parquet will not allow this solution.) There are two possible solutions, one very hard, the other only hard. (The third, of course, is to skip this task.) The first option is to create a type conversion mechanism as described above. These are tedious to write (we'd use code generation) and just as tedious to test. This seems an excessive amount of work to solve a problem which occurs infrequently. The second option is simply to alter the execution plan to insert a PROJECT operator on top of the SCAN operator. We could set a flag that says that the PROJECT is needed. This flag would indicate that we found a type inconsistency. The PROJECT operator ( The trick is that adding the PROJECT is, itself, complex. Basically, there needs to be a Calcite rule that rewrites a scan batch with our flag set to a project with the needed We would want to verify that the PROJECT does what it is supposed to do: emit the same vectors even when it receives a schema change from its upstream (SCAN) operator.

Suggestion

Given the above facts (which you should certainly verify), I suggest we choose the "Handling Only Missing Columns" solution and not attempt to handle column type changes. That is, don't attempt to fix cases 3-5 (differing types) in this PR. Leave that as a future exercise. If you found this bug due to an actual use case, ensure that the use case does not change column data types. If the type of a column is to change, just create a new column name instead. For example, if Does this seem a reasonable approach?
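The "handle only missing columns" restriction above can be stated as a single guard — a sketch under the assumption that the planner has already collected every file's MajorType for the column:

import java.util.List;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;

// Sketch: only widen the mode to OPTIONAL when the minor type is identical in
// every file where the column appears; a real type conflict is left alone so
// the query fails as it does today.
static MajorType resolvePartiallyMissing(List<MajorType> observed, boolean missingSomewhere) {
  MajorType first = observed.get(0);
  boolean sameMinorType = observed.stream()
      .allMatch(t -> t.getMinorType() == first.getMinorType());
  if (!sameMinorType) {
    return first;                      // differing types: out of scope for this PR
  }
  boolean needsOptional = missingSomewhere
      || observed.stream().anyMatch(t -> t.getMode() == DataMode.OPTIONAL);
  return needsOptional
      ? first.toBuilder().setMode(DataMode.OPTIONAL).build()
      : first;
}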
This PR fixes not just the type issues above, but also the name issue and the untyped null issue. Let's not forget those. As explained earlier, you are right that backticks should never appear at runtime. Backticks are a SQL-only feature. From the comment you mentioned, it could be that the We can change the code to fix the bug you discovered. We should, however, make sure we have a test for the name-with-dot/column-within-map case. We can perhaps look for unit tests associated with DRILL-4264 to understand what the original fix was trying to do.
The other fix is to change the type of a missing column from Untyped Null to Nullable First, let me make sure I understand. Prior to this PR, did Parquet use Untyped Null for the type of a missing column? And, after this fix, if a column is missing from all files, it will be Nullable The purpose of Untyped Null is to say, "hey, we don't know the type." This is a "good thing". It is a hack that Drill will "helpfully" guess nullable Is it possible to leave the type of a "completely missing" column as Untyped Null?
/*
Field name for the missing column MUST NOT be quoted with back-ticks, so we should have ONLY ONE
column for that field (unquoted)
*/
Nit: The preferred style for comments is the way the class header comment is formatted:
/*
* Field name for the missing column MUST NOT be quoted with back-ticks, so we should have ONLY ONE
* column for that field (unquoted)
*/
Hi @paul-rogers , Different types. This PR DOES NOT handle different minor types, indeed. The main focus is on missing columns and different cardinalities (though I missed the REPEATED case). I agree with all you say about the complexity of this task and support the suggestion to handle it as a future exercise outside of this PR. Grumble. Isn't that what Star query case. The DrillParquetReader. Indeed, Drill seems to have 2 parquet readers, but this one seems to be unsupported, see this config. That's why I also ignored it here. Passing type information to readers. See how I passed type information to ParquetRecordReader through Reading non-nullable parquet column into a nullable vector. With this I actually had some problems, but managed to solve them with these changes:
Column name with dots. I will try to play around with such cases and maybe add some tests... UntypedNull. Well, it's not really that I intentionally replaced
@paul-rogers
@ychernysh, thank you for the detailed explanation. It is impressive how well you understand this complex code. Scope: We're clear now that this PR is not trying to handle conflicting types. Yes, in my example, I was suggesting that the person who creates the Parquet files manage column types themselves. There is, of course, another workaround that I did not mention: the SQL user can (I believe) insert the needed casts. Suppose that we have a set of files where the types differ for three columns, but we know a common type. We can coerce the types manually: SELECT a, b, c
FROM (SELECT CAST(a AS DOUBLE), CAST(b AS VARCHAR), CAST(c AS INT) FROM myParquet)
ORDER BY a, b The above should insert that PROJECT I mentioned. In an ideal world, Drill would figure this out from the Parquet metadata. As we said, this can be left as a future project for another time. Wildcard Query: Ideally, if I have a Parquet file with columns SELECT a, b FROM (SELECT * FROM myParquet) ORDER BY a, b
SELECT a, b FROM myParquet ORDER BY a, b That is, it should not matter how we learn we will scan column The code you mentioned reflects the original schema-on-read design: the wildcard is expanded for each row group at run time. This is one reason I was surprised that we gather the schema at plan time. Now that it is clear that Parquet does have the schema at plan time, we can work out the union of all columns from all files at plan time. We can sort out the types of missing columns. We can then tell readers that It is clear that we've got a mess. Drill started as schema-on-read. Then, it was found that, for Parquet (only) we need schema at plan time. But, the folks that added that code didn't fully think through the design. The result is a huge muddle that you are now starting to sort out. Suggestion: let's leave proper wildcard expansion to another time. You are fixing this bug for a reason: for some use case. If your use case does not use wildcard queries, then it is safe to defer this issue until someone actually needs a fix. Reading non-nullable parquet column into a nullable vector: Thanks for ensuring we set the null vector correctly. Sounds like this part is good. Passing type information to readers: I saw your fix. That is why I mentioned that we now have two lists of columns given to the reader: rowGroupScan.getColumns(), // Columns as SchemaPath
...
// each parquet SubScan shares the same table schema constructed by a GroupScan
rowGroupScan.getSchema()); // Same list of columns as above, but as TupleMetadata? As a reader, I have to try to understand: are the two column lists the same? Is the order the same? Is the You are adding code to an existing implementation, and so you want to avoid changing things any more than necessary. Having redundant lists is messy, but probably the simplest fix. Suggestion: Maybe just add a comment about the assumed relationship between the two lists. UntypedNull (Part 1): Thanks for the detailed explanation. I appreciate the time you've put into fully understanding the convoluted logic. When faced with complex legacy code, I find it helpful to ask, what is this trying to do? The code itself is the ultimate truth, and we have to start there. But, to sort out what should be happening, we have to work out the developer's intent, and figure out if they made a mistake or omitted some important condition. You pointed out that we do two checks. In the first one : public static TypedFieldId getFieldId(ValueVector vector, int id, SchemaPath expectedPath, boolean hyper) {
if (!expectedPath.getRootSegment().getPath().equalsIgnoreCase(vector.getField().getName())) {
return null;
} This code says, here is a The core question is: what use case is this meant to handle? I can speculate that there are two possible cases. First is the top-level fields. For top level fields, the names should always match. By the time we get here, we should have created any needed "dummy" top-level vectors. You correctly fixed a case where the top level did not match. I speculate that this code is meant to handle a second case: a column within a Suggestion: your fix is probably fine. Please check if we have a test for the map case above. If we don't, consider adding one, just so we verify that this PR doesn't break anything. UntypedNull (Part 2): Next, let's understand what should happen if a column is missing. We have four cases:
Your fix handles case 2: we will now correctly use the type of existing columns. So, we're good here. Case 1 is where the Nullable Case 3 is rather special. EVF has some rather complex logic to handle missing map columns (to any depth). Your fix relies on code in the planner to work out the type for case 2 (top-level columns). Does that code handle nested columns? If so, we just do the same as we do for case 2. However, if the planner code treats all maps as a single column (does not look inside), then maybe we just leave the existing code to do whatever it already does in this case. Case 4 depends on what we do in case 1 and 3. The "right" answer is for the missing column to be of type Untyped NULL. But, this case is obscure, so we can leave it to do whatever it currently does. Suggestion: handle case 1 as described above, and just test that cases 3 and 4 still work however they used to work. |
@paul-rogers when I have time, I'll do some experiments and investigations on the discussed topics and get back to you |
DRILL-8507: Missing parquet columns quoted with backticks conflict with existing ones
DRILL-8508: Choosing the best suitable major type for a partially missing parquet column
Description
This PR aims to solve 2 separate Drill jiras at once: DRILL-8507 and DRILL-8508. They were reported separately because they solve different issues, but the coupling here is that DRILL-8508 depends on DRILL-8507 and requires it to be fixed first. But since in terms of code they share the same place, I decided to bring the commits in a single PR so it would be easier to review the code and understand the underlying motivations. Please review the 3 commits separately in order.
DRILL-8507 Missing parquet columns quoted with backticks conflict with existing ones
In this commit I replace
col.toExpr()
call with col.getAsUnescapedPath()
in ParquetSchema#createMissingColumn()
to get rid of some confusion when comparing quoted and unquoted field names. If the names themselves are identical, the comparison should return true, but the difference in quotes makes it return false. Obviously it is a bug. However, the decision to use
col.Expr()
was made consciously, based on the comment above that line. But I didn't really understand the motivation to do so... If anyone from the community can explain it, I would appreciate that. Such a change leads to a few regressions seen in unit tests. We should probably adjust the tests (because they relied on a bug), but I'm still not sure how exactly to do this properly. Here is how and why I've done it in the commit:
testCoalesceWithUntypedNullValues*
test methods because now we will always return Nullable Int for missing columns instead of Untyped Null. Another way to make these tests pass is to use CAST to cast the missing column to varchar, but it doesn't really make a lot of sense, since based on the method name it expects exactly the Untyped Null.
testCoalesceOnNotExistentColumns*
test methods to a separate TestParquetMissingColumns.java class and made them expect Nullable Int (for that same reason as above)TestParquetPartiallyMissingColumns is a new test class that brings unit tests for this exact fix. Check them out to understand what is the problem with backticking missing column. Also note that each of the following commits bring its own test cases in that class.
DRILL-8508 Choosing the best suitable major type for a partially missing parquet column (minor type solution)
This one is pretty simple: take that GroupScan table schema (that is merged from all of the parquet files to read) and pick the minor type for missing column from there
DRILL-8508 Choosing the best suitable major type for a partially missing parquet column (data mode solution)
Here I added the logic to enforce OPTIONAL data mode on ALL readers if we found a column missing or OPTIONAL. We use the same schema from the previous commit to catch such cases.
Note that I added synchronization in
Metadata
to properly resolve data modes (always choose least restrictive). I have tested a performance after that and didn't notice any significant impact.Documentation
Since these changes only make Drill handle the cases that it was not able to handle before without any user interaction, there is no documentation needed.
Testing
Note that all of the 3 commits bring their own test cases in TestParquetPartiallyMissingColumns file. I added parquet data files in test resources for them.