
DRILL-8507, DRILL-8508 Better handling of partially missing parquet columns #2937

Open
wants to merge 3 commits into base: master

Conversation


@ychernysh commented Aug 28, 2024

DRILL-8507: Missing parquet columns quoted with backticks conflict with existing ones

DRILL-8508: Choosing the best suitable major type for a partially missing parquet column

Description

This PR addresses two separate Drill JIRAs at once: DRILL-8507 and DRILL-8508. They were reported separately because they describe different issues, but DRILL-8508 depends on DRILL-8507 and requires it to be fixed first. Since the fixes touch the same code, I combined the commits into a single PR so that the code and the underlying motivations are easier to review. Please review the three commits separately and in order.

DRILL-8507 Missing parquet columns quoted with backticks conflict with existing ones
In this commit I replace the col.toExpr() call with col.getAsUnescapedPath() in ParquetSchema#createMissingColumn() to eliminate the confusion that arises when comparing quoted and unquoted field names. If the names themselves are identical, the comparison should return true, but the difference in quoting makes it return false. This is clearly a bug.
However, the decision to use col.toExpr() appears to have been made deliberately, judging by the comment above that line, but I don't understand the motivation. If anyone from the community can explain it, I would appreciate that.
This change causes a few regressions in unit tests. We should probably adjust the tests (since they relied on the bug), but I'm still not sure exactly how to do this properly. Here is how, and why, I've done it in this commit:

  1. TestCaseNullableTypes: removed the testCoalesceWithUntypedNullValues* test methods because we now always return Nullable Int for missing columns instead of Untyped Null. Another way to make these tests pass would be to CAST the missing column to VARCHAR, but that doesn't make much sense since, judging by the method names, they expect exactly the Untyped Null
  2. TestParquetFilterPushDown: simply use CONVERT_TO to work around a NumberFormatException (again because we now get Nullable Int instead of Untyped Null)
  3. TestUntypedNull: moved the testCoalesceOnNotExistentColumns* test methods to a separate TestParquetMissingColumns.java class and made them expect Nullable Int (for the same reason as above)
  4. DeltaQueriesTest (contrib/format-deltalake): some tests here fail after the fix because they use specific implicit columns not covered here. This PR offers no solution for these tests; we should probably handle them as a separate task (the problem likely lies in the Delta Lake plugin)

TestParquetPartiallyMissingColumns is a new test class with unit tests for this exact fix. Check them out to understand the problem with backticking a missing column. Also note that each of the following commits brings its own test cases to that class.
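For illustration, here is a minimal sketch of the naming conflict (a standalone example written for this description, not code from the PR; it assumes SchemaPath.getSimplePath is available):

import org.apache.drill.common.expression.SchemaPath;

// Sketch: toExpr() wraps the name in backticks, so the "missing" column no longer
// matches the real age column and a batch can end up carrying both `age` and ``age``,
// as in the External Sort error quoted later in this thread.
public class BacktickSketch {
  public static void main(String[] args) {
    SchemaPath col = SchemaPath.getSimplePath("age");
    System.out.println(col.toExpr());             // prints `age`  -- name used before this fix
    System.out.println(col.getAsUnescapedPath()); // prints age    -- name used by this fix
  }
}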

DRILL-8508 Choosing the best suitable major type for a partially missing parquet column (minor type solution)
This one is pretty simple: take the GroupScan table schema (merged from all of the parquet files to be read) and pick the minor type for the missing column from there.
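In code, the change amounts to the lookup shown in the reviewed hunk later in this thread; roughly (same names as in that hunk, comments added here for explanation):

// In ParquetSchema#createMissingColumn: prefer the minor type recorded in the merged
// table schema, and fall back to nullable INT only when no file contains the column.
String colName = col.getAsUnescapedPath();
MaterializedField tableField = tableSchema.column(colName);
TypeProtos.MinorType type = tableField == null
    ? TypeProtos.MinorType.INT
    : tableField.getType().getMinorType();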

DRILL-8508 Choosing the best suitable major type for a partially missing parquet column (data mode solution)
Here I added logic to enforce the OPTIONAL data mode on ALL readers if a column is found to be missing or OPTIONAL in any file. We use the same schema from the previous commit to catch such cases.
Note that I added synchronization in Metadata to properly resolve data modes (always choosing the least restrictive one). I tested performance after that change and didn't notice any significant impact.
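The mode-resolution rule can be pictured with a small sketch (a hypothetical helper written for this description, not the PR's actual code):

import org.apache.drill.common.types.TypeProtos.DataMode;

public class DataModeRuleSketch {
  // A column that is OPTIONAL in, or missing from, any file forces OPTIONAL in the
  // merged table schema; REQUIRED survives only if the column is REQUIRED everywhere.
  static DataMode resolve(DataMode mergedSoFar, DataMode fromFile, boolean missingInFile) {
    if (missingInFile || mergedSoFar == DataMode.OPTIONAL || fromFile == DataMode.OPTIONAL) {
      return DataMode.OPTIONAL;
    }
    return DataMode.REQUIRED;
  }
}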

Documentation

Since these changes only make Drill handle cases that it could not handle before, and require no user interaction, no documentation is needed.

Testing

Note that all three commits bring their own test cases to the TestParquetPartiallyMissingColumns file. I added parquet data files to the test resources for them.

…h existing ones

1. In ParquetSchema#createMissingColumn, replaced col.toExpr() with col.getAsUnescapedPath() so that the missing column name is not quoted with backticks
2. Fixed a typo in UnionAllRecordBatch ("counthas" -> "counts")
3. In TestParquetFilterPushDown, worked around a NumberFormatException with CONVERT_TO
4. Removed testCoalesceWithUntypedNullValues* test methods from TestCaseNullableTypes
5. Moved testCoalesceOnNotExistentColumns* test methods from TestUntypedNull to a separate TestParquetMissingColumns and made them expect Nullable Int instead of Untyped Null
6. Created new TestParquetPartiallyMissingColumns test class with test cases for "backticks problem"
…ing parquet column (minor type solution)

1. Passed an overall table schema from AbstractParquetRowGroupScan to ParquetSchema
2. In ParquetSchema#createMissingColumn used the minor type from that schema instead of hardcoding the INT
…ing parquet column (data mode solution)

1. Added TypeCastRules#getLeastRestrictiveMajorType method for convenience
2. In Metadata, added data mode resolution (always preferring the less restrictive mode) when collecting file schemas and merging them into a single table schema; synchronized the merging to accomplish that
3. In ParquetTableMetadataUtils, made any column that is OPTIONAL or missing in any of the files be OPTIONAL in the overall table schema
4. For such cases, added enforcement of the OPTIONAL data mode in ParquetSchema, ParquetColumnMetadata and ColumnReaderFactory. Now, even if the file stores the column as REQUIRED but we need it as OPTIONAL, a nullable column reader and a nullable value vector are created
5. Added "() -> 1" initialization for definitionLevels in PageReader so that a nullable column reader can read REQUIRED columns
6. Added testEnforcingOptional* test cases to TestParquetPartiallyMissingColumns
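Items 4 and 5 can be pictured with the following sketch (hypothetical names; the real wiring goes through ParquetSchema, ParquetColumnMetadata, ColumnReaderFactory and PageReader):

import java.util.function.IntSupplier;
import org.apache.drill.common.types.TypeProtos.DataMode;

public class EnforceOptionalSketch {
  // If the merged table schema wants OPTIONAL but this file stores the column as
  // REQUIRED, build the nullable reader and vector anyway and feed them constant
  // definition levels: level 1 means "value present", so nothing is marked as null.
  static IntSupplier definitionLevels(DataMode tableMode, DataMode fileMode, IntSupplier realLevels) {
    boolean enforceOptional = tableMode == DataMode.OPTIONAL && fileMode == DataMode.REQUIRED;
    return enforceOptional ? () -> 1 : realLevels;
  }
}
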
@cgivre added the json JSON Format label Aug 28, 2024
@rymarm requested review from cgivre, jnturton and rymarm and removed request for paul-rogers August 29, 2024 09:13
@rymarm added bug and removed json JSON Format labels Aug 29, 2024
@rymarm requested review from paul-rogers and removed request for cgivre August 29, 2024 09:15
@rymarm (Member) commented Aug 29, 2024

@jnturton @paul-rogers could you please take a look?

Drill has a hidden issue caused by the following change:
https://github.com/apache/drill/pull/909/files#diff-437f42bceabae9bccd3ff19064898dc0d03df3dd40fcdbe5051c9fcced433c94R245-R247

This change caused a subtle problem that makes ParquetSchema#createMissingColumn create a "dummy" column, but with backticks in the column name:

Error: UNSUPPORTED_OPERATION ERROR: Schema changes not supported in External Sort. Please enable Union type.
Previous schema: BatchSchema [fields=[[`age` (INT:OPTIONAL)]], selectionVector=NONE]
Incoming schema: BatchSchema [fields=[[`age` (INT:OPTIONAL)], [``age`` (INT:OPTIONAL)]], selectionVector=NONE]

This change was definitely made intentionally, but the purpose of the change is not clear to me. @ychernysh and I think this change needs to be reverted, but maybe we missed something.

At the moment some of the deltalake tests fail due to this change, but before we move on, we would like to know whether it's a good idea to keep using col.getAsUnescapedPath() instead of col.toExpr().

@paul-rogers (Contributor)

@rymarm, thanks for this fix. I'm a bit rusty on Drill, but let's see what I can sort out. This stuff is complex, so I'm going to throw a wall of text at you so we get on the same page.

First some background. If memory serves, the Parquet reader was written quickly by an intern early on: it is so complex that few have summoned the effort to understand or improve it. So, digging into it is a classic "legacy code" experience.

In the early days, every single format reader created its own way to write data to vectors. This led to all manner of bugs (getting the code right once is hard, doing it a dozen times is impossible). It also resulted in inconsistent behavior. To solve these (and to avoid Drill's memory fragmentation issues of the time), we created EVF (extended vector framework) to unify how we write to vectors and how we solve schema consistency issues for readers. EVF replaced the older column writer mechanisms. By now, all Drill readers except Parquet are based on EVF.

However, the Parquet reader is special: it is the only one that still uses the original, fragile mechanisms. As the story goes, Parquet was written very quickly by an intern, and the result was so complex that few people have been brave enough to try to sort out the code. Since Parquet still uses the old mechanisms, it has its own way to solve schema issues, its own way to handle unprojected columns, and still suffers from the bugs in the original, fragile mechanisms. It looks like your PR tries to fix some of these. In particular, it may be that you're trying to fix a bug in Parquet that EVF solves for other readers. It would be great if your fix is consistent with EVF. I'll try to check this when I review the code.

What we really need is for someone to take on a "weekend project" to rewrite the Parquet reader to use EVF so we have one complete schema mechanism rather than two inconsistent versions. (I can dream.)

Now let's look at the bug in question. You received a schema change error. This means that some operator in the DAG saw two different schemas for the same table. In particular, the SORT operator can't handle the case of, say, (a: INT, b: VARCHAR, c: DOUBLE) and (a: INT, b: VARCHAR) or (a: INT, b: INT), even if our query only has ...ORDER BY a. There was discussion about using UNION columns, or dynamically reformatting data. But, all of that was far too complex (and slow) to actually build.

Most DB and query tools impose a schema at read time. That is, if we have files that have undergone schema evolution, as above, we work out at plan time that the common schema is, say (a: INT, b: VARCHAR, c: DOUBLE), with c being OPTIONAL (NULLABLE in standard DB parlance) since it appears in only some of the files. For example, Presto and Impala avoid the issue by reading into the correct common schema in the first place, rather than cleaning up the mess later in each downstream operator.

Of course, Drill's claim to fame is that it is schema-free: it works out the schema on the fly. That's all cool, except that Drill also supports operators that only work for a single schema. (Joins have the same issue.) That is, we promote a really cool feature, but we can't actually make it work. This is great marketing, but causes frustration in reality.

And, as befits any bit of software that has a few miles on its tires, Drill has multiple ways to work around the schema issue. There is a "provided schema" feature that tells readers the common schema. The reader's job is then to map the actual file columns into that schema. The provided schema is supported in EVF, but not, alas, by the Parquet reader. Also, the planner never got around to supporting schemas, so provided schemas only work in rather special cases. There is also a Drill Metastore feature that emulates the Hive Metastore (HMS), but it does not appear to be in common use.

The original way to manage schemas, for Parquet only, is to scan all the files and build up a JSON file that contains the union of all the schemas. I can't recall if this mechanism was supposed to provide type consistency, but it certainly could: we just look at the c column in all schemas of all Parquet files in a directory tree and work out a common type. We do this at plan time and send that type along to Parquet in the physical plan. I didn't work in that area, so my knowledge here is a bit light. It's worth a check.

Now, one might say, hold on, isn't there an easy answer? If we read file A and get one schema, then read file B and get another schema, can't we blend them on the fly? Indeed, in some cases this is possible, and EVF implements those cases. However as I'm fond of saying, "Drill can't predict the future", so there are cases where we get it wrong. For example, the query asks for columns (a, b, c). The first file has (a, b) and creates a dummy column for c that will hold all NULLs. But, of what type? Drill traditionally chooses INT. Then, we read file 2 that has (a, b, c: VARCHAR). Now we realize that c should have been VARCHAR, but we've sent a batch of rows downstream already with the "wrong" INT type.

Plus, remember, Drill is distributed. Those two files might have been read by different fragments running on different machines. They can't communicate until the rows come together in the sort, after which it is too late to fix the problem.

One other tricky item to remember: Drill is fastest when columns are REQUIRED (i.e. non-nullable or NOT NULL). This avoids the NULL check on each column operation. BUT, Drill also allows schema to vary, and we fill in missing columns with NULL values. So, unless we have some guarantee that column c actually exists in all files read by a query, we are pretty much forced to make c be OPTIONAL (nullable), even when reading a file that contains c as a NOT NULL column. Again, Drill can't predict the future, so we can't know that some future file (in, perhaps, some other fragment on some other machine) will read a file that doesn't have c.

There is a reason that SQL DBs, for 50+ years, have required a schema. It isn't that all those folks were backward, it is that SQL doesn't work without a common schema. (Apache Druid has a similar problem, but it solves it by treating all values as, essentially, untyped: the values change type on the fly as needed.) When faced with bugs of the kind here, it is important to sort out which are just "bad code" bugs and which are the "this design just can't work" bugs.

Now, with that background, we can try to sort out the problem you are trying to solve. Does your test case have Parquet files with differing column sets or types? If so, let's identify exactly how the schemas differ, then we can discuss possible solutions.

Looking at the specific line you pointed out, I'll hazard a guess as to what it is doing: that case we discussed above. Suppose our SQL is SELECT a, b FROM 'my-dir'. Suppose my-dir is a directory that contains two files. file1.parquet contains column (a: VARCHAR) and file2.parquet contains (a: VARCHAR, b: VARCHAR). Both are read by the same reader (the scan is not distributed in this case.) File read order is random. Suppose we read them in the order (file2, file1). We can set up a schema of (a: VARCHAR, b: VARCHAR). When we read file1, we notice that 'b' is missing, but that, when it appeared previously, it was VARCHAR, so we keep that type and fill with nulls. (This is what EVF does, I'll speculate that Parquet's reader does something similar.) Great. What if we read the other order? We see we want (a, b), but we have only (a: VARCHAR), so we create that vector. What about b? Well, we helpfully create (b: OPTIONAL INT). That is exactly what the code that you pointed to does. When we read file2, we see that b has "changed schema" to VARCHAR so we throw away the (a: VARCHAR, b: INT) schema and start reading with (a: VARCHAR, b: VARCHAR). This then blows up the SORT.

Given this, I don't think the problem is with the column name (which is what that referenced code change handles). The code change in question allowed handling a column name of the form foo.a where foo is not a MAP, with a as a member, but just a (poorly chosen) name of a column. That is, the problem is probably not that the test Parquet file columns are actually named foo.a and foo.b (with dots). You can try changing the code and rerunning the test, but I suspect that the problem won't go away unless you happen to run a test that reverses the file read order.

Instead, the problem may be with the part of that code that did not change: Drill is trying to predict the future and predicting that when the missing column appears, it will be INT. It may be that Drill is predicting incorrectly. We need to use the "crystal ball" module to improve the prediction. Sadly, however, we never got around to writing that module. Hence the multiple attempts to manage schemas globally.

Where does this leave us? If you can pin things down to one very specific case, we can sort out if it is a "bad code" bug or a "that just won't work given Drill's design" bug. In particular, reading Parquet files with inconsistent schemas, projecting the inconsistent columns, and adding a SORT won't work unless the missing columns will be of type INT when they appear. You can get lucky with file read order and work distribution so that, sometimes, it does work. Relying on luck produces flaky tests, however.

On the other hand, you can have as much inconsistency as you want as long as the columns you project appear in all files and the type of those columns stays the same. Feel free to add as many other, inconsistent, columns as you like: just don't project them in queries with a SORT.

I'd suggest that, since Drill doesn't handle Parquet schema changes well (though that is Drill's compelling feature), we maybe should not test stuff that can't actually work. Test with files with a consistent schema instead. Or, if files have an inconsistent schema, test with the Parquet schema feature enabled and after doing a scan of all the files. (I forget the command: ANALYZE maybe?) It may be that the Parquet schema feature unifies column types, but then, given the haste with which it was written way back when, maybe it doesn't. Or, to be more modern, test with the Drill Metastore enabled.

This stuff is horribly complex, and I may have missed the mark on this particular bug. But, at least we're now on the same page.

String colName = col.getAsUnescapedPath();
MaterializedField tableField = tableSchema.column(colName);
TypeProtos.MinorType type = tableField == null ? TypeProtos.MinorType.INT : tableField.getType().getMinorType();
MaterializedField field = MaterializedField.create(colName,
Contributor

This is a good change: we are propagating the old column type. This is consistent with EVF.

However, this stuff is horribly complex. If we reuse the column, we must reuse the actual value vector. Otherwise, you'll get crashes in the downstream operators that are bound to that vector. The binding is redone only on a schema change. But, your fix avoids the schema change, and hence prevents the rebinding.

Also, note that this fix works ONLY in one direction (column appears, then disappears), and ONLY within a single thread: it can't solve the same problem if the two files are read in different threads and sent to the SORT to reconcile.

Further, we are changing the mode to OPTIONAL as required so we can fill the vector with NULL values. However, change of mode (i.e. nullability) is a schema change and will cause the SORT to fail. We have to have known, on the previous file, that the column will be missing in this file, so that we can create the original column as OPTIONAL.

Author

Please see this comment.

@ychernysh (Author)

Hi @paul-rogers ,
Thanks for your comments! Let me clarify some points here:

Given this, I don't think the problem is with the column name (which is what that referenced code change handles).

The thing is that regardless of whether we succeed in "guessing" the major type for the missing column, the solution won't work until we solve the backticks problem, because instead of creating the missing column (unquoted), we currently create a brand new column (quoted) which has nothing to do with the "real" missing one. Please see my ORDER BY example in DRILL-8507 and the error from there:

Error: UNSUPPORTED_OPERATION ERROR: Schema changes not supported in External Sort. Please enable Union type.
Previous schema: BatchSchema [fields=[[`age` (INT:OPTIONAL)]], selectionVector=NONE]
Incoming schema: BatchSchema [fields=[[`age` (INT:OPTIONAL)], [``age`` (INT:OPTIONAL)]], selectionVector=NONE]

Note that all the major types here are guessed correctly (INT:OPTIONAL), but we fail because of different field counts. This happens because in the incoming schema we have 2 fields: quoted and unquoted, while it is supposed to have only one (unquoted, based on previous schema).
This is the reason I reported the issue as 2 separate jiras and divided the PR into separate commits.
So before proceeding to any type guessing logic, we have to get rid of the quoting problem first.

Also, note that this fix works ONLY in one direction (column appears, then disappears), and ONLY within a single thread: it can't solve the same problem if the two files are read in different threads and sent to the SORT to reconcile.

Thanks to AbstractParquetRowGroupScan's schema, which this (DRILL-8508) solution is based on, all readers in all fragments on all machines are aware of the overall table schema (constructed from all parquet files to be read). The Foreman scans the footers of all the files, merges a single schema from them back in the planning phase, and then sends it to each fragment. This is exactly what lets us "see the future" at the reading phase, because all the metadata (even for files not yet read) is already available and merged into a single schema.
With that, we are not just propagating the old column type, but we are propagating the certainly correct column type. And we are sure the type is correct because it was resolved back in the planner.
So, in fact, the fix works in ALL directions and within ALL threads and machines.

Further, we are changing the mode to OPTIONAL as required so we can fill the vector with NULL values. However, change of mode (i.e. nullability) is a schema change and will cause the SORT to fail. We have to have known, on the previous file, that the column will be missing in this file, so that we can create the original column as OPTIONAL.

Exactly, and that is why I added the enforce-OPTIONAL logic. That is, even if a particular parquet reader is going to read a REQUIRED parquet column, we force it to put the data in a NULLABLE value vector so that the schema stays consistent with the files where the column is missing. Note that we are able to do so because we know at the reading phase that the column is partially missing, thanks to the aforementioned schema propagated to all readers.

@ychernysh (Author)

@paul-rogers
I have tested the fix in the following way. I have a Hadoop/Drill cluster with 2 nodes, each running a DataNode and a Drillbit. I have a dfs.tmp.people table consisting of 100 parquet files, each having 10 or more row groups, with the following schemas:

# case 1:
/tmp/people/{0..49}.parquet: id<INT(REQUIRED)> | name<VARCHAR(OPTIONAL)> | age<INT(OPTIONAL)>
/tmp/people/{50..99}.parquet: id<INT(REQUIRED)>
# case 2:
/tmp/people/{50..99}.parquet: id<INT(REQUIRED)>
/tmp/people/{100..149}.parquet: id<INT(REQUIRED)> | name<VARCHAR(OPTIONAL)> | age<INT(OPTIONAL)>

The files are spread evenly across the DataNodes, and when I run a query I can see (in each Drillbit's logs and in the query profile) that Drill reads in parallel on both Drillbits. I run the following queries:

SELECT age FROM dfs.tmp.people ORDER BY age;
SELECT name FROM dfs.tmp.people ORDER BY name;
SELECT age FROM dfs.tmp.people UNION ALL (VALUES(1));
SELECT age FROM dfs.tmp.people UNION (VALUES(1));
SELECT name FROM dfs.tmp.people UNION (VALUES ('Bob'));
SELECT name FROM dfs.tmp.people UNION ALL (VALUES ('Bob'));

They all succeeded. Without the fix, Drill would fail. Note that we need all three solutions provided in this PR to make them all pass:

  1. Solve the naming problem
  2. Set the correct minor type
  3. Set the correct data mode

The main idea of the DRILL-8508 solution is that since we scan the footers of all the parquet files to be read back at the planning phase in the Foreman, we already know which columns are (partially) missing and which are not. Knowing at the planning phase that file1.parquet contains (a: VARCHAR:REQUIRED) and file2.parquet has no a column, we can tell reader 1 to put the column in a nullable vector (rather than a REQUIRED one) and reader 2 to create a missing-column vector of type VARCHAR (rather than defaulting to INT). And since we have this information before any of the readers start to actually read, it doesn't matter which file is read first. So this solution is order-agnostic.

A note about tests: unit tests for the fix, however, require a specific file read order. This is due to operators such as UNION ALL, which build their output schema based on the first prefetched batches from the left and right inputs. Here it does matter which file is read first, since that defines the UNION ALL output schema. So the unit tests arrange an order such that without the fix there would be an error, while with the fix the query succeeds.

@paul-rogers (Contributor)

@ychernysh, thanks for the clarification of the column name issue. I agree that the name stored in the vector should not contain backticks.

I am unaware of any code in the Foreman that scans all the Parquet files. Is that something new? Doing that in the Foreman places extreme load on the Foreman, causes remote reads in Hadoop, and will be slow. Drill relies on statistical averages to balance load: each node does about the same amount of work. Doing a large prescan in the Foreman invalidates this assumption. The feature is fine for Drill run as a single process on a single machine for a single user. It will cause hotspots and resource issues on a busy, distributed cluster.

The old, original Parquet schema cache did, in fact, read all the files in a folder, but did so in a distributed fashion, and wrote the result to a JSON file. (But without locking, so that two updates at the same time led to corruption.) Are we talking about a new feature recently added in the last few years? Or, about the original Parquet schema cache?

If this feature does exist, then we now have six ways that Drill handles schema (Parquet cache, provided schema, Drill Metastore, HMS, on-the-fly, and this new prescan of Parquet files). If a schema is available, then I agree with your analysis. The planner should detect column type conflicts, and whether a column is missing. If a column is missing in any file, then the schema sent to the readers should be OPTIONAL for that column. Further, if some Parquet files have REQUIRED, and some OPTIONAL, then the mode should be OPTIONAL. You know the drill (pardon the pun).

Note, however, that deciding on a common schema is a planner task. With EVF, the planner would provide the schema (using the provided schema mechanism) to the reader and EVF would "do the right thing" to ensure that the reader's output schema matches that defined by the planner. There are many CSV file unit tests that show this in action.

Calcite does a fine job working out the required types assuming that Drill provides column type information in its metadata. Does the new Parquet prescan feed into Calcite, or is it something done on the side? If on the side, then we'll run into issues where Calcite guesses one column type (based on operations) but the Parquet schema prescan works out some other type (based on the Parquet files.) For example, if I have a query SELECT SQRT(foo)..., Calcite will guess that the column has to be numeric. But, if the Parquet prescan sees that foo is VARCHAR, then there will be a runtime conflict rather than the planner sorting out the mess. Again, I'm ignorant of the prescan feature, so I don't know what might have been done there.

Thus, the task for Parquet should be to recreate what EVF does (since EVF already fought these battles), but using the older column writers. That is:

  • The Parquet reader has to properly handle the missing column to avoid a schema change. In Drill, "schema change" means "rebind the vectors and regenerate any operator-specific code."
  • If the first file does not have the column, create a dummy column of the type specified by the planner.
  • If the first file does have a given column, create a vector with the type specified by the planner, not the type specified by Parquet.
  • If a subsequent file does not have the column, reuse the prior vector, which should be of OPTIONAL mode.

Note that Parquet must already have a way to reuse columns common to two files, else we'd get a schema change on each new file. (Sorry, my knowledge of the old column writers is rusty; it's been several years since I last looked at them.)

In fact, given that Parquet is trying to solve the same problems as EVF, one might want to review the EVF unit tests and recreate those many cases using Parquet. "If one is ignorant of history one is doomed to repeat it."

Thanks again for the explanations. Now that I better understand the problem (and am aware of that new Parquet prescan feature), I can do a better code review.

@cgivre (Contributor) commented Aug 30, 2024

@paul-rogers @ychernysh I'm wondering if it wouldn't be worth it to refactor the Parquet reader to use EVF2 rather than debug all this. I don't know what would be involved, but I do know that refactoring all the other plugins to use EVF2 wasn't all that difficult; Parquet, though, is another ball game.

@paul-rogers (Contributor) left a comment

A few more "starter comments" as I wrap my head around the code.

@@ -661,6 +663,12 @@ static Map<SchemaPath, TypeProtos.MajorType> resolveFields(MetadataBase.ParquetT
// row groups in the file have the same schema, so using the first one
Map<SchemaPath, TypeProtos.MajorType> fileColumns = getFileFields(parquetTableMetadata, file);
fileColumns.forEach((columnPath, type) -> putType(columns, columnPath, type));
// If at least 1 parquet file to read doesn't contain a column, enforce this column
// DataMode to OPTIONAL in the overall table schema
Contributor

The general rule has to be:

  • For all columns that exist, define a common type that can hold all of the associated column types.
  • If any column is optional (or missing), assign the OPTIONAL type -- but only if the other types are REQUIRED.
  • If all columns are REPEATED, then the missing column is also REPEATED. (In Drill, a zero-length array is the same as NULL: there is no such thing as a NULL array in Drill.)
  • If any column is REPEATED, and some column is OPTIONAL or REQUIRED, then choose REPEATED as the column type. Ensure that the runtime code handles the case of writing a single value into the array when we read the file with the OPTIONAL or REQUIRED column.

IIRC, EVF handles all the above for dynamic columns. If Drill had type logic in the Calcite planner, it should handle these same rules.

Again, this kind of logic requires extensive unit tests of all the cases above, plus any others you can think up.
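A hedged sketch of the mode rules above (an illustration only, not code from this PR; it does not implement the extra runtime work the REPEATED case needs):

import org.apache.drill.common.types.TypeProtos.DataMode;

public class CommonModeSketch {
  // Merge two observed modes for the same column across files (treat a missing column
  // as OPTIONAL before calling): any REPEATED wins, otherwise any OPTIONAL wins,
  // otherwise REQUIRED remains.
  static DataMode merge(DataMode a, DataMode b) {
    if (a == DataMode.REPEATED || b == DataMode.REPEATED) {
      return DataMode.REPEATED;
    }
    if (a == DataMode.OPTIONAL || b == DataMode.OPTIONAL) {
      return DataMode.OPTIONAL;
    }
    return DataMode.REQUIRED;
  }
}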

@ychernysh (Author) commented Sep 2, 2024

The first item is about resolving different data types even when there are no missing columns, which I didn't cover.
"but only if the other types are REQUIRED" - is this condition necessary?
Regarding REPEATED: I haven't covered it in any way.

In theory, implementing these should not be that hard...

@@ -56,10 +56,10 @@ public ParquetColumnMetadata(ColumnDescriptor column) {
this.column = column;
}

public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options, boolean isEnforcedOptional) {
Contributor

Again, if we are enforcing a planner-provided schema, the job is to map whatever the Parquet type is into the given, fixed Drill type. There is only one right answer when the schema is provided. Again, see EVF for how this works.

Author

Let me try it...

* appropriate data mode in this schema. Our mission here is to enforce that OPTIONAL mode in our
* output schema, even if the particular parquet file we're reading from has this field REQUIRED,
* to provide consistency across all scan batches.
*/
Contributor

Great start! See the other cases described above.

Also, I seem to remember creating code to handle evolving column types as part of EVF. Perhaps you can find that code. The code likely has a large number of unit tests (I'm a test-driven kinda guy) which you can reuse to test your parallel implementation.

/**
* Covers selecting completely missing columns from a parquet table. Should create Nullable Int
* ValueVector in that case since there is no chance to guess the correct data type here.
*/
Contributor

Note that NULLABLE INT is just a guess; there is nothing special about it. EVF allows the reader to define its own "default" column type. For example, for CSV, columns are NEVER INT, so it is better to guess VARCHAR, which is the only type CSV supports. Parquet can play the same trick if doing so is useful.

Author

NULLABLE INT is just what was already there when I came to this issue. I think it would be easy to make it user-configurable (if there is a function that can parse a type from a string). Should we add it?

Contributor

The "default type" for the other readers is part of the reader definition. We could create yet another system/session option for the default Parquet type.

After this change, the default type will only apply if the user requests a column that appears in none of the files. Maybe the column appeared in older files but not newer ones, or vice versa.

Suppose the user has a tool that sends the SQL. That tool expects the comments column to be VARCHAR, but that column was added only recently. When that query runs against old files, the column will suddenly become INT, perhaps breaking code that expected the VARCHAR type.

We cannot fix this case: only a full metastore that tracks all files could solve this case. Having an overall default column type can't fix per-column issues.

So, I guess we can set this issue aside for now.

* - parquet/partially_missing/o_m -- optional, then missing
* - parquet/partially_missing/m_o -- missing, then optional
* - parquet/partially_missing/r_m -- required, then missing
* - parquet/partially_missing/r_o -- required, then optional
Contributor

Thanks for explaining the schemas here in the test comments!

@paul-rogers (Contributor)

@paul-rogers @ychernysh I'm wondering if it wouldn't be worth it to refactor the Parquet reader to use EVF2 rather than debug all this. I don't know what would be involved, but I do know that refactoring all the other plugins to use EVF2 wasn't all that difficult; Parquet, though, is another ball game.

Doing so would be the ideal solution. The challenge has always been that the Parquet reader is horribly complex. I took a few cracks at refactoring it back in the day, but it is still pretty complex.

The most challenging issue is that Parquet is parallel: it reads each column in a separate thread. MapR did a large number of hacks to maximize parallelism, so that code grew quite complex with many nuances to saturate threads while not using too many resources overall: that is, to maximize parallelism within a single query, but also across the "thousands of concurrent queries" that were the rage back in the day.

All other readers are row-based since that is how most other data formats work. EVF is a row-based implementation. As a result, EVF would be difficult to reuse.

This raises the reason that EVF was created in the first place: limiting batch size to prevent memory fragmentation. Back in the day, all readers read 64K records per batch, even if that resulted in huge vectors. EVF imposes a batch size limit, and gracefully wraps up each batch, rolling over any "excess" data to the next one.

In Parquet, that logic does not exist. That is, if we have, say, 20 column writers all busy building their own vectors, there is nothing to say, "hold on, we're over our 16 MB batch size limit". Instead, the readers just read n rows, creating whatever size vectors are required. Read 1,000 columns of 1 MB each and you need 1 GB of value vectors.

The memory fragmentation issue arises because Drill's Netty-based memory manager handles allocations up to 32 MB (IIRC) from its binary-buddy free list. Beyond that, every request comes from the OS. Netty does not release memory back to the OS if even a single byte is in use from a 32 MB block. Eventually, all memory resides in the Netty free list, and we reach the OS allocation limit. As a result, we can have 100% of the Netty pool free, but no OS capacity to allocate another 64MB vector and we get an OOM. The only recourse is to restart Drill to return memory to the OS.

While we long ago fixed the fragmentation issues in the other readers (via EVF) and other operators (using the "temporary" "batch sizer" hack), it may be that Parquet still suffers from memory fragmentation issues because of its unique, parallel structure.

Still, perhaps there is some way to have EVF handle the schema and vector management stuff, but to disable the row-oriented batch size checks and let the Parquet readers write as much data as they want to each vector (fragmenting memory if it chooses to do so). Or, maybe work out some way to give each column reader a "lease" to read up to x MB. EVF can handle the work needed to copy "extra" data over to a new vector for the next batch. I'd have to try to swap EVF knowledge back into the ole' brain to sort this out.

All this said, I can certainly see the argument for hacking the existing code just to get done. I guess I'd just suggest that the hacks at least reuse the rules we already worked out for EVF, even if they can't reuse the code.

All of this is premised on the notion that someone did, recently, add a "Parquet prescan" to the planner, and that someone added column type propagation to the Calcite planner. Was this actually done? Or, are we somehow assuming it was done? Are we confusing this with the old Parquet schema cache? Again, I've been out of the loop so I'm just verifying I'm understanding the situation.

@cgivre (Contributor) commented Aug 30, 2024

All of this is premised on the notion that someone did, recently, add a "Parquet prescan" to the planner, and that someone added column type propagation to the Calcite planner. Was this actually done? Or, are we somehow assuming it was done? Are we confusing this with the old Parquet schema cache? Again, I've been out of the loop so I'm just verifying I'm understanding the situation.

@paul-rogers I'm not aware of any recent significant work on the Parquet reader. I know @jnturton did some work regarding adding new compression capabilities and there have been a few bug fixes here and there, but nothing major as I recall. So I don't think we've added any Parquet "prescan" that I am aware of.

@paul-rogers (Contributor)

@paul-rogers I'm not aware of any recent significant work on the Parquet reader. I know @jnturton did some work regarding adding new compression capabilities and there have been a few bug fixes here and there, but nothing major as I recall. So I don't think we've added any Parquet "prescan" that I am aware of.

Ah. I'm misunderstanding something. I outlined a number of the common problems we encounter when we try to figure out schema dynamically. A response suggested that this pull request solves this because it has access to the full Parquet schema in the planner. The only way to get that schema is either to use the old Parquet metadata cache or something that scans all the Parquet files in the planner. I thought I saw a statement that such a scan was being done.

To prevent further confusion, what is the source of the Parquet schema in this fix?

@ychernysh (Author) commented Sep 2, 2024

@paul-rogers @cgivre

I am unaware of any code in the Foreman that scans all the Parquet files. Is that something new?

Drill parallelizes scanning a parquet table at the file, row group, column chunk and page levels. A single ParquetRecordReader object (which is the key part of the issue, since it creates null-filled vectors for missing columns) reads one row group, so this is the level we're interested in. Each Minor Fragment is assigned a list of row groups (that is, a list of ParquetRecordReader objects) to read.
The assignment happens in the Foreman at the parallelization phase (Foreman#runPhysicalPlan:416, AbstractParquetGroupScan#applyAssignments) and requires the file metadata to be known at that point (we need to know what row groups exist in order to assign them to the minor fragments).
So the metadata is scanned back at the physical plan creation phase (Foreman#runSQL:594) using the ParquetTableMetadataProvider interface (see this code block), whose implementations can read either from the Drill Metastore or from the files in the filesystem themselves.
Furthermore, the schema from the metadata (which is used in this PR) is also needed at the ScanBatch initialization phase (minor fragment initialization) for row group pruning (see RowsMatch for the logic). See this and this line, where rowGroupScan.getSchema() is also used.
So the answer is: no, that's not something new. Rather, it's required for some basic Drill functionality (assigning row groups to minor fragments), as well as for some more specific functionality (row group pruning).

Doing that in the Foreman places extreme load on the Foreman, causes remote reads in Hadoop, and will be slow.

As I said above, we cannot avoid it. But, for your information, the process of reading metadata from all the parquet files is parallelized across 16 threads within the Foreman JVM.

The old, original Parquet schema cache did, in fact, read all the files in a folder, but did so in a distributed fashion, and wrote the result to a JSON file. (But without locking, so that two updates at the same time led to corruption.) Are we talking about a new feature recently added in the last few years? Or, about the original Parquet schema cache?

Based on the above, and on the fact that ParquetTableMetadataProvider can use metadata cache files, I assume we're talking about the original Parquet schema cache. But I'm not sure...

If this feature does exist, then we now have six ways that Drill handles schema (Parquet cache, provides schema, Drill Metastore, HMS, on-the-fly, and this new prescan of Parquet files).

So, I guess we still have 5 ways, and let me summarize my knowledge on each:

  • parquet cache: used in this PR
  • provided schema: only supports text files, not parquet
  • Drill Metastore: I have just checked; all the errors described in the issue are still reproducible this way. It seems this schema is not passed down to ParquetRecordReader (neither was the parquet cache schema before this PR). In theory we could use this schema, but: 1) it requires users to run the Drill Metastore, while this PR does not; 2) this PR already does the same thing, just with the parquet cache schema
  • HMS: ???
  • on-the-fly: not applicable to this solution, since we want to take advantage of the "neighbor" parquet files, while this is the schema for a single file being read
  • The Parquet reader has to properly handle the missing column to avoid a schema change. In Drill, "schema change" means "rebind the vectors and regenerate any operator-specific code."
  • If the first file does not have the column, create a dummy column of the type specified by the planner.
  • If the first file does have a given column, create a vector with the type specified by the planner, not the type specified by Parquet.
  • If a subsequent file does not have the column, reuse the prior vector, which should be of OPTIONAL mode.

Basically all of the above is met by this PR, but with slightly different wording (not using the terms first/subsequent file since, as said before, the fix is order-agnostic):

  1. If at least 1 parquet file contains a selected column, then the null-filled vectors should have its minor type
  2. If at least 1 parquet file does not have a selected column, or has it as OPTIONAL, then ALL of the readers are forced to return it as OPTIONAL

Apologies if I missed answering some of the questions. I am new to Drill and this is one of my first tasks on it, so I might not understand some things. Regarding refactoring to EVF: I've never seen it before and am probably not the best person to implement it. I will try to research it, though, to better understand @paul-rogers's reviews. But I think we should at least agree on how to treat the issue: refactor everything to EVF (which, as far as I understand, would erase everything done in this PR), make the current parquet reader simulate EVF as much as possible, or just solve the concrete problems described in the tickets. Note that this PR took the third way. So before applying any edits to this PR, it would be nice to understand whether it makes sense at all first :)
@rymarm , maybe you could fill in the holes in my answer with your knowledge/experience?

@paul-rogers (Contributor)

@ychernysh, thank you for your detailed explanation. Let's focus in on one point.

The assignment happens in Foreman at parallelization phase (Foreman#runPhysicalPlan:416, AbstractParquetGroupScan#applyAssignments) and requires the files metadata to be known at that phase (we need to know what row groups are there in order to assign them to the minor fragments).

It is surprising that none of the "second generation" Drill developers ever knew about, or mentioned, that Drill scans Parquet files at plan time. Of course, it could be that I just never understood what someone was saying. We used to wrestle with inconsistent schemas all the time, so it is surprising if the solution was available the whole time. That's why, if this code exists, I suspect it must have been added either very early (by a "first generation" developer who later left) or within the last few years.

Another reason it is surprising that we have such code is the big deal we make of being "schema free." Of course, "schema free" has problems. Why would we not have mentioned that "schema free" means "infer the schema at plan time" if doing so would solve the schema inconsistency issues? Amazing...

If such code exists, then it should have been integrated not just into parallelization planning, but also into Calcite type propagation, with the schema included in the physical plan sent to the Parquet readers. I suppose whoever added it could have just been focused on parallelization, and hoped Drill's "magic" would handle the schema. In fact, the "missing" type propagation code is the very code that you're now adding, though, it seems, without using Calcite for the type propagation.

The discussion we are having depends entirely on whether schema information is available at plan time. Before I comment further, you've given me some homework: I'll look at that code to determine if it really does scan all the file headers at plan time.

@ychernysh (Author)

@paul-rogers

though, it seems, without using Calcite for the type propagation

Sorry, I don't really understand the question regarding Calcite (I'm not familiar with it). I didn't consider Calcite in my work, so, probably, your statement is correct.

@rymarm (Member) commented Sep 3, 2024

@paul-rogers, there is no new feature. This behavior of reading all parquet file metadata during the planning phase has been present for a long time. Moreover, we even have a feature called "parquet metadata cache" aimed at mitigating the downside of this logic, namely when the planning phase takes significant time due to reading the metadata of many distinct parquet files:

Parquet metadata caching is a feature that enables Drill to read a single metadata cache file instead of retrieving metadata from multiple Parquet files during the query-planning phase
...
Metadata caching is useful when planning time is a significant percentage of the total elapsed time of the query

https://drill.apache.org/docs/optimizing-parquet-metadata-reading/

@rymarm (Member) commented Sep 3, 2024

@paul-rogers

Another reason it is surprising that we have such code is the big deal we make of being "schema free." Of course, "schema free" has problems. Why would we not have mentioned that "schema free" means "infer the schema at plan time" if doing so would solve the schema inconsistency issues? Amazing...

I was under the impression that Drill's key feature is that it takes care of the data structure and types, while the user only has to query the data as if it were a simple Postgres or MySQL table. That said, I think it's fine to read all the available metadata at any query execution phase if it helps us execute the query faster and with a lower chance of failure, and if no additional setup is required from the user.

@paul-rogers (Contributor)

Thanks again for the many pointers and explanations. Much to my surprise, Drill does, indeed, seem to read the Parquet metadata at plan time. Your explanation makes sense: clearly we need the information for parallelization planning. The Parquet metadata cache simply bypasses the physical file reads if the cache is present.

I traced the whole flow. It all looks good except at the last step, writing data to a vector. That will prevent any direct attempt to have the Parquet reader write to a vector other than the one that matches the Parquet type.

That may still be OK. As you can tell, my comments are based on the general schema change case: the problems we've encountered over the years. But, I suspect this PR can address one very specific case: missing columns when the type is consistent across files where it does appear.

Recap of the Parquet Planning Process

The code is hard to follow statically. Instead, I ran the unit tests in the PR to get the dynamic view. As described above, the ParquetGroupScan does read the files by way of a number of builders, providers and associated classes. The actual reading is about here: ParquetFileAndRowCountMetadata.getParquetFileMetadata_v4.

Here I will insert a grumble that if Drill scans the Parquet files per query to get schema, then it could certainly do so for all other file types and avoid a large number of problems. Files have a path and a timestamp: we could easily build up a per-Drillbit cache of metadata so we read it only once per session, and only if the file changes. Trying to distribute that data is the hard part. (Everyone wants to use ZK, but ZK is not a distributed DB.) End of grumble.

So, we do have schema per row group. We can thus work out the common schema as is done in this PR. In general, we have to handle the following:

  • Missing columns
  • Different types
  • Different cardinalities (nullable, non-nullable, repeated)

The code here handles missing columns and the nullable/non-nullable modes. We can worry about the scalar/array issue later as only a novice would change a column from one to the other. Because of the use of the common type code, I assumed that this PR might also handle the differing types issue, but maybe I misunderstood.

As explained above, the right way to handle columns is to run them through Calcite. That way, Calcite can check for something like VARCHAR * INT, which SQL doesn't support. I can't tell if the original author did that work. Without the Calcite change, the readers will know the type, but not the SQL planner. Not great, but we can live with it since that's the way its worked until now.

Thanks for adding tests that show that the type inference works as intended. It would be handy if DESCRIBE <table> were to work in this case. I didn't check if that is wired up for Parquet files.

There are three cases that work prior to this PR. Assume we have a column c:

  • c appears in no files. The type is untyped NULL. (Nullable INT in other readers originally, changed with EVF.)
  • c appears in all files with the same type and mode. Works for all operators. The user sees a consistent column type.
  • c changes type or mode, or is missing in some files. Works only if the query does not contain a SORT, JOIN or other type-sensitive operator. Presents the user with results with differing types.

This PR generalizes the third case. As it is, the "sensitive" operators will raise a schema change exception. To fix the SCEs we need to handle all the cases in which the schema can change so that all readers return the same schema. Cases:

  1. c appears in all files with the same type but differing modes.
  2. c appears in all files, but with differing types.
  3. c appears in all files, but with differing types and modes.
  4. c is missing in some files, but when it appears, it always has the same type. Mode must be set to nullable. Handled by this PR.
  5. c is missing in some files. In the files in which c does appear, it has differing types. Handled by this PR, maybe.

A review response comment suggests that this PR fixes case 4, which seems like the most reasonable form of schema change. Changing column types is just asking for trouble with tools such as Presto or Impala. Still, let's think through all the cases.

There is one additional case, just to make this more fun. The query can be of the form SELECT * FROM myParquet. In this case, the user might expect the union of all columns from all files and row groups, with the types reconciled for all of them. If we focus just on case 4, then the user would expect us to "fill in the blanks" if some of the columns are missing in some files.

Once we have the common schema, we need to ship it to the distributed readers. It looks like AbstractParquetRowGroupScan.schema provides the "physical plan" support. The AbstractParquetScanBatchCreator class creates a set of ParquetRecordReader instances from the AbstractParquetRowGroupScan and wraps them in the ScanBatch operator. The schema is placed in a ColumnExplorer.

As it turns out, Drill has not just one, but two different Parquet readers: DrillParquetReader and ParquetRecordReader. Both receive the list of columns, but only as a List<SchemaPath> which has no type information. ParquetRecordReader also receives the type information. It seems that the names are redundant: they appear in both the schema path and the TupleSchema. This may be the source of one of the bugs fixed in this PR.

So far so good.

The Problem

But, now all heck breaks loose. What we want is for each row group to honor the planner-provided schema. But, this won't work.

What we want is that the row group either has column c or not. If not, we make up a column of the type provided by the TupleSchema. There is much code to work out the Drill type for each Parquet type. That code should no longer be used by the reader, but rather by the planner. At read time, we just create the vector specified by the schema to ensure consistency.

But, the Parquet column readers assume that the type of the Drill vector is the same as that of the Parquet column. As it turns out, there are two sets of Parquet column writers: the original ones and a second set of "bulk" readers. I don't know if both are still used. Eventually, the Parquet column readers call down to the following line from FixedByteAlignedReader:

  protected void writeData() {
    // Raw byte-for-byte copy from the Parquet page buffer into the vector's data buffer;
    // there is no per-value conversion step anywhere on this path.
    vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
  }

That is, the Drill vectors have been carefully chosen so that the byte layout of the vector matches the byte layout of Parquet. We read, say, INT32 values into INT vectors where the values are also 32 bits long. We must also assume the same endian layout, IEEE floating point layout, etc. To write to a different Drill type requires a per-value conversion.

The class ColumnReaderFactory chooses the column reader. It has no code to handle type conversions. To do a type conversion, we'd need a new set of readers: one for each (from type, to type) pair. EVF provides a large set of these so that EVF can perform this type coercion automatically for the common cases. Example: read a CSV file (all VARCHAR) into an INT column, which requires a VARCHAR-to-INT conversion.
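
To show the kind of work a conversion would entail, here is a toy (from type, to type) pair sketched with plain arrays rather than Drill's vector classes; nothing like this exists in ColumnReaderFactory today:

  public class Int32ToBigIntReader {

    // Hypothetical per-value conversion: Parquet INT32 values widened into a
    // BIGINT-shaped buffer. Contrast with FixedByteAlignedReader above, which
    // just copies bytes because the layouts match exactly.
    public static long[] convert(int[] parquetInt32Values) {
      long[] out = new long[parquetInt32Values.length];
      for (int i = 0; i < parquetInt32Values.length; i++) {
        out[i] = parquetInt32Values[i]; // widen each value; no raw byte copy is possible here
      }
      return out;
    }
  }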

This code is complex, so I might be missing something. More testing would reveal the facts. It could be that writing a non-nullable column into a nullable vector works: I'm just not sure what sets the null bits to the correct values. Maybe they take on the correct values by default, if we zero out the null vector at allocation.

Handling Only Missing Columns

Since the Parquet reader is not set up to handle type changes, we can choose to restrict this fix to handle only missing columns. That is, if a column is missing, and it has the same type when it does appear, only then change the mode to nullable. Otherwise, let the schema change happen and the query will fail as today. Such a fix solves the issue for your case, but does not solve the general problem. This is still an improvement and worth doing.

We just need to ensure that if Parquet thinks it is writing to a non-nullable vector, but the vector is nullable, that the code "does the right thing." Nullable vectors are a composite: they have a null vector and a data vector. Parquet should use the data vector. We just need to ensure that we somehow set the null vector bits correctly. It may be that vector allocation zeros the vectors and so the null vector starts in the proper state. Since the tests pass, this may be the case.
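
A toy model of that composite, only to make explicit what "does the right thing" means; Drill's real nullable vectors are generated classes, so this is purely illustrative:

  public class ToyNullableIntVector {
    private final int[] values;
    private final byte[] isSet;   // 1 = value present, 0 = null

    public ToyNullableIntVector(int capacity) {
      values = new int[capacity];
      isSet = new byte[capacity]; // Java zero-fills, so every slot starts out "null"
    }

    // What the fix needs conceptually: a required (non-nullable) Parquet column writes
    // into the data vector and explicitly marks every written slot as set.
    public void setFromRequiredColumn(int index, int value) {
      values[index] = value;
      isSet[index] = 1;
    }

    public boolean isNull(int index) {
      return isSet[index] == 0;
    }
  }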

Alternative Solution for Type Changes

Suppose we wanted to handle type changes as well. From the above description, we can't solve the problem by forcing Parquet to choose a different type of vector than the one for the column type. (I mentioned that this is possible when using EVF and suggested we could do the same for Parquet. It turns out that Parquet will not allow this solution.)

There are two possible solutions, one very hard, the other only hard. (The third, of course, is to skip this task.)

The first option is to create a type conversion mechanism as described above. These are tedious to write (we'd use code generation) and just as tedious to test. This seems an excessive amount of work to solve a problem which occurs infrequently.

The second option is simply to alter the execution plan to insert a PROJECT operator on top of the SCAN operator. We could set a flag that says that the PROJECT is needed. This flag would indicate that we found a type inconsistency. The PROJECT operator (ProjectRecordBatch) will handle type conversions: it is what implements CAST() operations.

The trick is that adding the PROJECT is, itself, complex. Basically, there needs to be a Calcite rule that rewrites a scan batch with our flag set to a project with the needed CAST operations. The Parquet reader should then ignore the TupleSchema and create vectors based on the Parquet schema.
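
Sketched very roughly, and only to show the intent (the real implementation would be a Calcite rule operating on Drill's logical plan, not these hypothetical classes):

  import java.util.List;
  import java.util.Map;

  // Hypothetical plan node; stands in for whatever the real planner produces.
  record PlanNode(String operator, List<String> expressions, PlanNode input) {}

  public class CastProjectRewrite {

    // If the scan flagged a type inconsistency, wrap it in a Project whose expressions
    // cast each conflicting column to the reconciled type; otherwise leave it alone.
    public static PlanNode addCastsIfNeeded(PlanNode scan,
                                            Map<String, String> reconciledTypes,
                                            boolean typeConflictFound) {
      if (!typeConflictFound) {
        return scan;
      }
      List<String> casts = reconciledTypes.entrySet().stream()
          .map(e -> "CAST(" + e.getKey() + " AS " + e.getValue() + ")")
          .toList();
      return new PlanNode("Project", casts, scan);
    }
  }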

We would want to verify that the PROJECT does what it is supposed to do: emit the same vectors even when it receives a schema change from its upstream (SCAN) operator.

Suggestion

Given the above facts (which you should certainly verify), I suggest we choose the "Handling Only Missing Columns" solution and not attempt to handle column type changes. That is, don't attempt to fix the cases 3-5 (differing types) in this PR. Leave that as a future exercise.

If you found this bug due to an actual use case, ensure that the use case does not change column data types. If the type of a column is to change, just create a column with a new name instead. For example, if IP_addr is an INT32, don't try to make it an INT64 for IPv6 addresses. Instead, write those to a new IP6_addr column. With this fix, adding and removing columns should work as expected.

Does this seem a reasonable approach?

@paul-rogers

This PR fixes not just the type issues above, but also the name issue and the untyped null issue. Let's not forget those. As explained earlier, you are right that backticks should never appear at runtime. Backticks are a SQL-only feature.

From the comment you mentioned, it could be that the col.toExpr() code was an ill-fated attempt to fix a real problem.
Parquet (and Drill) support maps. We can thus have a map m with a column foo. The SQL names m.foo or 'm'.'foo' refer to foo within m. (Pardon the forward ticks: Markdown doesn't like nested backticks.) The bug is that m.foo is also a perfectly valid Parquet column name. So 'm.foo' (the entire name quoted) refers to a top-level column named m.foo.
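
A tiny illustration of the ambiguity, using plain strings rather than Drill's SchemaPath parsing (the splitter below is hypothetical):

  import java.util.ArrayList;
  import java.util.List;

  public class NamePathExamples {

    // Hypothetical splitter showing the distinction the parser has to make: an unquoted
    // dot separates path segments, while a backtick-quoted name keeps its dots, so
    // `m.foo` is a single top-level column literally named "m.foo".
    public static List<String> segments(String sqlName) {
      List<String> parts = new ArrayList<>();
      StringBuilder current = new StringBuilder();
      boolean quoted = false;
      for (char c : sqlName.toCharArray()) {
        if (c == '`') {
          quoted = !quoted;            // toggle quoting, drop the backtick itself
        } else if (c == '.' && !quoted) {
          parts.add(current.toString());
          current.setLength(0);
        } else {
          current.append(c);
        }
      }
      parts.add(current.toString());
      return parts;
    }

    public static void main(String[] args) {
      System.out.println(segments("m.foo"));    // [m, foo]  -> column foo inside map m
      System.out.println(segments("`m.foo`"));  // [m.foo]   -> one top-level column named m.foo
    }
  }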

We can change the code to fix the bug you discovered. We should, however, make sure we have a test for the name-with-dot/column-within-map case. We can perhaps look for unit tests associated with DRILL-4264 to understand what the original fix was trying to do.

@paul-rogers

The other fix is to change the type of a missing column from Untyped Null to Nullable INT.

First, let me make sure I understand. Prior to this PR, did Parquet use Untyped Null for the type of a missing column? And, after this fix, if a column is missing from all files, it will be Nullable INT? If so, I worry that this may be a step backwards.

The purpose of Untyped Null is to say, "hey, we don't know the type." This is a "good thing". Having Drill "helpfully" guess Nullable INT is a hack, and it can cause problems. If someone queries a set of files where some have foo and some don't, then with a Nullable INT default the column type may change from VARCHAR (say) to INT depending on which files the query touches. It seems slightly more helpful to change from VARCHAR to Untyped Null.

Is it possible to leave the type of a "completely missing" column as Untyped Null?

/*
Field name for the missing column MUST NOT be quoted with back-ticks, so we should have ONLY ONE
column for that field (unquoted)
*/

Nit: The preferred style for comments is the way the class header comment is formatted:

  /*
   * Field name for the missing column MUST NOT be quoted with back-ticks, so we should have ONLY ONE
   * column for that field (unquoted)
   */

/**
* Covers selecting completely missing columns from a parquet table. Should create Nullable Int
* ValueVector in that case since there is no chance to guess the correct data type here.
*/

The "default type" for the other readers is part of the reader definition. We could create yet another system/session option for the default Parquet type.

After this change, the default type will only apply if the user requests a column that appears in none of the files. Maybe the column appeared in older files, but not newer ones, or vice versa.

Suppose the user has a tool that sends the SQL. That tool expects a comments column to be VARCHAR, but that column was added only recently. When running that query against old files, the column will suddenly become INT, perhaps breaking code that expected the VARCHAR type.

We cannot fix this case: only a full metastore that tracks all files could solve it. An overall default column type can't fix per-column issues.

So, I guess we can set this issue aside for now.

@ychernysh

Hi @paul-rogers ,

Different types. This PR DOES NOT handle different minor types, indeed. The main focus is on missing columns and different cardinalities (though I missed the REPEATED case). I agree with all you say about the complexity of this task and support the suggestion to handle it as a future exercise outside of this PR.
And, just in case, in the example of IPv4 and IPv6 columns, you mean that the user should manually create a separate column for each address type, right?

Grumble. Isn't that what REFRESH TABLE METADATA does? A user has to run it explicitly though.

Star query case. The SELECT * FROM myParquet is not handled in any way here, because creating null-filled vectors is skipped for such cases. See block1 and block2.

DrillParquetReader. Indeed, Drill seems to have 2 parquet readers, but this one seems to be unsupported, see this config. That's why I also ignored it here.

Passing type information to readers. See how I passed type information to ParquetRecordReader through AbstractParquetRowGroupScan#getSchema.

Reading a non-nullable parquet column into a nullable vector. I actually had some problems with this, but managed to solve them with these changes:

  1. Choose the nullability of a column reader based on the value vector's nullability, not the parquet column's. Place1 and place2.
  2. For that case, simply always put 1 in the nullability bit. Place1 and place2.

Column name with dots. I will try to play around with such cases and maybe add some tests...

UntypedNull. Well, it's not really that I intentionally replaced UntypedNull with NullableInt... It is rather a "side effect" of the backticks solution. You see, there is this if condition block in FieldIdUtil#getFieldId that basically says "if the underlying field names in expectedPath and vector aren't the same, return null, otherwise proceed to getting the real field id". The value we return here later decides which if branch we fall into here: null leads to NullExpression.INSTANCE (which is basically an UntypedNull), while a real value leads to a real ValueVectorReadExpression. Note that if we have a problem with backticks, the very first equality check in FieldIdUtil#getFieldId results in false because of the quoting difference (say, my_field vs 'my_field'). This leads us to the UntypedNull case. If the equality check had returned true, we would fall into a ValueVectorReadExpression, which would later bring us to the missing-column vector creation logic, where we use NullableInt. So the tests that expect UntypedNull passed ONLY because "my_field".equals("'my_field'") returned false. When we fixed the backticks problem, we made it look like "my_field".equals("my_field"), which returns true, uses NullableInt and thus fails. Hope you got it...
That said, I understand your point that using UntypedNull for completely missing columns would be more helpful than defaulting to NullableInt. It doesn't confuse us with some nonexistent int value and clearly tells us that the type is unknown.
Currently, if we do something like SELECT my_column FROM left UNION ALL SELECT my_column FROM right; where left has my_column: VARCHAR, while right doesn't have it at all, it results in a NumberFormatException, because Drill (for some reason) prefers INT (from the missing column) over VARCHAR and tries to cast the left column to the right's type, thus failing. However, if the right column came into UNION ALL as UntypedNull, the operator would prefer the left type, thanks to these lines.
Maybe we should consider defaulting to UntypedNull instead of NullableInt. In theory, the only case where we should default is when the column is completely missing. I once tried doing it, but had some problems on the client side when reading the vector. Maybe I should try once more...

@ychernysh

ychernysh commented Sep 6, 2024

@paul-rogers
Regarding the default type and the comments example above: yes, as I said, unhardcoding NullableInt and making the default minor type configurable by the user might be a good idea, but here are some concerns:

  1. Wouldn't this conflict with defaulting to UntypedNull?
  2. Since the vectors for missing columns are null-filled, they can be cast/converted to any minor type. That is, the user can issue CAST or CONVERT_TO to help the query succeed. If we can effectively define a default minor type at the per-query level, does it make sense to configure it at the per-session or per-system level? Note that such a workaround also helps with the UNION ALL example from my previous comment.

@paul-rogers

@ychernysh, thank you for the detailed explanation. It is impressive how well you understand this complex code.

Scope: We're clear now that this PR is not trying to handle conflicting types. Yes, in my example, I was suggesting that the person who creates the Parquet files manage column types themselves. There is, of course, another workaround that I did not mention: the SQL user can (I believe) insert the needed casts. Suppose that we have a set of files where the types differ for three columns, but we know a common type. We can coerce the types manually:

SELECT a, b, c
FROM (SELECT CAST(a AS DOUBLE) AS a, CAST(b AS VARCHAR) AS b, CAST(c AS INT) AS c FROM myParquet)
ORDER BY a, b

The above should insert that PROJECT I mentioned. In an ideal world, Drill would figure this out from the Parquet metadata. As we said, this can be left as a future project for another time.

Wildcard Query: Ideally, if I have a Parquet file with columns a and b, and b is missing in some files, the following two queries should work identically:

SELECT a, b FROM (SELECT * FROM myParquet) ORDER BY a, b

SELECT a, b FROM myParquet ORDER BY a, b

That is, it should not matter how we learn we will scan column b: if it is missing, it should have a consistent type after this fix.

The code you mentioned reflects the original schema-on-read design: the wildcard is expanded for each row group at run time. This is one reason I was surprised that we gather the schema at plan time. Now that it is clear that Parquet does have the schema at plan time, we can work out the union of all columns from all files at plan time. We can sort out the types of missing columns. We can then tell readers that SELECT * expands to not just all the columns in that particular row group, but to the union of all the columns.

It is clear that we've got a mess. Drill started as schema-on-read. Then, it was found that, for Parquet (only) we need schema at plan time. But, the folks that added that code didn't fully think through the design. The result is a huge muddle that you are now starting to sort out.

Suggestion: let's leave proper wildcard expansion to another time. You are fixing this bug for a reason: for some use case. If your use case does not use wildcard queries, then it is safe to defer this issue until someone actually needs a fix.

Reading non-nullable parquet column into a nullable vector: Thanks for ensuring we set the null vector correctly. Sounds like this part is good.

Passing type information to readers: I saw your fix. That is why I mentioned that we now have two lists of columns given to the reader:

        rowGroupScan.getColumns(), // Columns as SchemaPath
       ...
        // each parquet SubScan shares the same table schema constructed by a GroupScan
        rowGroupScan.getSchema()); // Same list of columns as above, but as TupleMetadata?

As a reader, I have to try to understand: are the two column lists the same? Is the order the same? Is the TupleMetadata version in a 1:1 relationship with the getColumns() list? If not, what are the differences?

You are adding code to an existing implementation, and so you want to avoid changing things any more than necessary. Having redundant lists is messy, but probably the simplest fix.

Suggestion: Maybe just add a comment about the assumed relationship between the two lists.
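
For example, the comment might spell out the assumed invariant along these lines (wording is only a suggestion, and the invariant itself should be verified):

  /**
   * Note: rowGroupScan.getColumns() and rowGroupScan.getSchema() are intentionally redundant:
   * the TupleMetadata returned by getSchema() is the GroupScan-wide table schema merged from
   * all files, and it is expected to contain an entry (with type and mode) for every projected
   * SchemaPath in getColumns(). Readers match the two by column name, not by position.
   */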

UntypedNull (Part 1): Thanks for the detailed explanation. I appreciate the time you've put into fully understanding the convoluted logic.

When faced with complex legacy code, I find it helpful to ask, what is this trying to do? The code itself is the ultimate truth, and we have to start there. But, to sort out what should be happening, we have to work out the developer's intent, and figure out if they made a mistake or omitted some important condition.

You pointed out that we do two checks. In the first one:

  public static TypedFieldId getFieldId(ValueVector vector, int id, SchemaPath expectedPath, boolean hyper) {
    // Bail out (return null) unless the vector's MaterializedField name matches the root
    // of the expected path.
    if (!expectedPath.getRootSegment().getPath().equalsIgnoreCase(vector.getField().getName())) {
      return null;
    }

This code says, here is a ValueVector and an expected SchemaPath. Let's make sure that the vector actually is for the given schema path by checking the MaterializedField for the vector. In the case where the name was corrupted with backticks, this check failed: we have a vector with the name foo, but the expected path is 'foo'. So, no match.

The core question is: what use case is this meant to handle? I can speculate that there are two possible cases.

The first is top-level fields. For top-level fields, the names should always match. By the time we get here, we should have created any needed "dummy" top-level vectors. You correctly fixed a case where the top level did not match.

I speculate that this code is meant to handle a second case: a column within a MAP column. Suppose the Parquet file has a map field m that contains two columns, a and b. The query, however, is asking to project m.c which does not exist. Perhaps this bit of code handles that map case.

Suggestion: your fix is probably fine. Please check if we have a test for the map case above. If we don't, consider adding one, just so we verify that this PR doesn't break anything.

UntypedNull (Part 2): Next, let's understand what should happen if a column is missing. We have four cases:

  1. The column c exists in none of the Parquet files.
  2. The column c exists in some of the Parquet files.
  3. The column m.c (a column within a map) exists in some Parquet files.
  4. The column m.c exists in none of the Parquet files.

Your fix handles case 2: we will now correctly use the type of existing columns. So, we're good here.

Case 1 is where the Nullable INT/Untyped NULL question arises. I agree with you that we should default the column type to Untyped NULL. It is odd that we used Nullable INT in one case, and Untyped NULL in another case. Doing so makes no sense to a user. Can we modify the code so we always use Untyped NULL in this case? We would detect case 1 in the planner, and mark the corresponding column with the Untyped Null type. (I hope TupleSchema handles this type! I can't recall ever testing it.) The Parquet reader would then know to use the Untyped Null when creating the dummy vector. This is based on what should happen; the devil is in the details, so please check if this suggestion can actually work.
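
A sketch of what that case-1 detection could look like, again with hypothetical names and a string sentinel standing in for Drill's Untyped Null type:

  import java.util.Map;
  import java.util.Optional;

  public class MissingColumnTyper {

    // Hypothetical resolution of a projected column against the merged table schema:
    // case 2 gets the type seen in the files (forced nullable elsewhere), while case 1
    // (the column appears in no file) is marked so the reader creates an untyped-null
    // dummy vector instead of defaulting to nullable INT.
    public static String resolveType(String column, Map<String, String> mergedTableSchema) {
      return Optional.ofNullable(mergedTableSchema.get(column)).orElse("UNTYPED_NULL");
    }

    public static void main(String[] args) {
      Map<String, String> schema = Map.of("a", "VARCHAR", "b", "BIGINT");
      System.out.println(resolveType("a", schema)); // VARCHAR      -> case 2
      System.out.println(resolveType("c", schema)); // UNTYPED_NULL -> case 1
    }
  }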

Case 3 is rather special. EVF has some rather complex logic to handle missing map columns (to any depth). Your fix relies on code in the planner to work out the type for case 2 (top-level columns). Does that code handle nested columns? If so, we just do the same as we do for case 2. However, if the planner code treats all maps as a single column (does not look inside), then maybe we just leave the existing code to do whatever it already does in this case.

Case 4 depends on what we do in cases 1 and 3. The "right" answer is for the missing column to be of type Untyped NULL. But, this case is obscure, so we can leave it to do whatever it currently does.

Suggestion: handle case 1 as described above, and just test that cases 3 and 4 still work however they used to work.

@ychernysh

@paul-rogers when I have time, I'll do some experiments and investigations on the discussed topics and get back to you
