Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hive partitioning tracking issue #15441

Open
10 of 13 tasks
stinodego opened this issue Apr 2, 2024 · 28 comments
Open
10 of 13 tasks

Hive partitioning tracking issue #15441

stinodego opened this issue Apr 2, 2024 · 28 comments
Labels
A-io-partitioning Area: reading/writing (Hive) partitioned files accepted Ready for implementation enhancement New feature or an improvement of an existing feature

Comments

@stinodego
Copy link
Member

stinodego commented Apr 2, 2024

@stinodego stinodego added enhancement New feature or an improvement of an existing feature A-io-partitioning Area: reading/writing (Hive) partitioned files labels Apr 2, 2024
@stinodego stinodego self-assigned this Apr 2, 2024
@stinodego stinodego changed the title Hive partitioning to-do list Hive partitioning tracking issue Apr 2, 2024
@stinodego stinodego added P-goal Priority: aligns with long-term Polars goals accepted Ready for implementation labels Apr 2, 2024
@github-project-automation github-project-automation bot moved this to Ready in Backlog Apr 2, 2024
@stinodego stinodego moved this from Ready to In progress in Backlog Apr 2, 2024
@deanm0000
Copy link
Collaborator

If I may, here's another one #14936

@kszlim
Copy link
Contributor

kszlim commented Apr 3, 2024

I see you just merged the ability to specify a hive partition schema manually, does it allow for partial inference? Ie. if you have multiple keys that are partitioned against but you specify only a subset of them, will it infer the rest?

@ion-elgreco
Copy link
Contributor

@stinodego why not extend the schema to the full table instead of just the partition columns?

@stinodego
Copy link
Member Author

I see you just merged the ability to specify a hive partition schema manually, does it allow for partial inference?

At this point it does not. You have to specify the full schema of the Hive partitions. Similar to other schema arguments in the API. I can see how a schema_overrides type of parameter would be useful though. Not sure if they should be combined, will have to think about it.

@stinodego why not extend the schema to the full table instead of just the partition columns?

At least in the case of Parquet, that part of the schema is already available from the data. Not sure a full schema/schema_overrides would provide much benefit over simply casting after scanning.

@ion-elgreco
Copy link
Contributor

ion-elgreco commented Apr 3, 2024

@stinodego it is part of the parquet, but in situations with schema evolution, Polars would not be able to handle those situations. Also if I know the schema ahead, you can esentially skip reading the parquet metadata

@stinodego
Copy link
Member Author

stinodego commented Apr 3, 2024

in situations with schema evolution, Polars would not be able to handle those situations

Can you give an example?

you can esentially skip reading the parquet metadata

I don't know, there's other stuff in the metadata besides the schema. Not sure yet exactly what we're actually using.

@ion-elgreco
Copy link
Contributor

in situations with schema evolution, Polars would not be able to handle those situations

Can you give an example?
Sure, take these two parquet files that we have written:

df = pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test1.parquet")

df = pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"]
}).write_parquet("polars_parquet/test2.parquet")

When you read with Polars, it incorrectly assumes that the first parquet is the schema for all parquets in the table. So when you read you get only foo, bar:

pl.read_parquet("polars_parquet/*.parquet")
shape: (2, 2)
┌─────┬─────┐
│ foobar │
│ ------ │
│ i64i64 │
╞═════╪═════╡
│ 12   │
│ 23   │
└─────┴─────┘

Now let's write in the other order, and polars will panick because it cannot handle that a column is missing in a parquet file. See this issue I made a while ago #14980:

df = pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"]
}).write_parquet("polars_parquet/test1.parquet")

df = pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test2.parquet")

pl.read_parquet("polars_parquet/*.parquet")

thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-parquet/src/arrow/read/deserialize/mod.rs:144:31:
called `Option::unwrap()` on a `None` value
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

It's a common use case to evolve parquet tables without having to rewrite all the older files to conform to this new schema

@ion-elgreco
Copy link
Contributor

Having something akin to Pyarrow datasets: #13086, would make lot's of sense

@stinodego
Copy link
Member Author

Ok, I see what you mean. We should support this.

@kszlim
Copy link
Contributor

kszlim commented Apr 6, 2024

This might be interesting inspiration/source of ideas for a dataset abstraction in polars:
https://padawan.readthedocs.io/en/latest/

@jrothbaum
Copy link

jrothbaum commented Apr 9, 2024

Any chance you would reconsider this as part of the reworking of hive partition handling? #12041

@deanm0000
Copy link
Collaborator

Here's another one #15586. It's to change the default for write_statistics to True, nothing complicated.

@deanm0000
Copy link
Collaborator

Here are a couple more.

Can't forget to document at the end.

This one might be a bit of a tangent but it's to incorporate the pageindex spec of parquet files #12752

@Smotrov
Copy link

Smotrov commented May 1, 2024

As I understand adding partitioned fields to the schema supposed to enable hive partitions support.
However in my case it shows an error instead

const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/*";

 let mut schema = Schema::new();
    schema.with_column("year".into(), DataType::Int8);
    schema.with_column("month".into(), DataType::Int8);

    let schema = Arc::new(schema);

    let cloud_options = cloud::CloudOptions::default().with_aws([
        (Key::AccessKeyId, &cred.access_key.unwrap()),
        (Key::SecretAccessKey, &cred.secret_key.unwrap()),
        (Key::Region, &"eu-west-1".into()),
    ]);

    let mut args = ScanArgsParquet::default();
    args.hive_options.enabled = true;
    args.hive_options.schema = Some(schema);
    args.cloud_options = Some(cloud_options);

    // Check time required to read the data.
    let start = std::time::Instant::now();

    let df = LazyFrame::scan_parquet(TEST_S3, args)?
        .with_streaming(true)
        .collect()?;

the result is Error: Context { error: ComputeError(ErrString("Object at location data_lake/some_dir/partitioned_table_root_dir not found: Client error with status 404 Not Found: No Body")), msg: ErrString("'parquet scan' failed") }

@lmocsi
Copy link

lmocsi commented May 4, 2024

Enhancement request for "Support directory input": #14342

@Smotrov
Copy link

Smotrov commented May 7, 2024

Enhancement request for "Support directory input": #14342

Thank you. To be honest I'm quite surprised. How anyone can use this tool in any serious work without ability to load data from a directory. All tables are partitioned multi file. 👀

@stinodego
Copy link
Member Author

stinodego commented May 7, 2024

You can already achieve this by appending **/*.parquet to your directory, which will read all parquet files in that directory.

Directory support will function slightly differently, as it will do some additional validation, but it's mostly the same.

@lmocsi
Copy link

lmocsi commented May 7, 2024

Yes, it is described in the referenced enhancement request (the /**/*.parquet part).

@Smotrov
Copy link

Smotrov commented May 8, 2024

You can already achieve this by appending **/*.parquet to your directory, which will read all parquet files in that directory.

Thank you but my parquet tiles do not have any extensions. And adding /**/* does not help. It shows following error

const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/**/*
Error: Context { error: ComputeError(ErrString("Object at location partitioned_table_root_dir/year=2024/month=5 not found: Client error with status 404 Not Found: No Body")), msg: ErrString("'parquet scan' failed") }

Meanwhile if I manually set some specific combination of my partition values it works.

const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/year=2024/month=5/*";

But is believe manually adding values it is not how HIVE partitioning supposed to work? Or I'm doing something wrong?

If I'm adding extensions to all files the **/*.parquet trick works well.

@Smotrov
Copy link

Smotrov commented May 8, 2024

Would be grate to add Support Hive partitioning logic in other readers besides Parquet to JSON

@stinodego
Copy link
Member Author

Would be grate to add Support Hive partitioning logic in other readers besides Parquet to JSON

It's on the list!

@talawahtech
Copy link

Tossing in a suggestion to also support reading/writing Pyarrow/Spark compatible parquet _metadata files. See #7707

@stinodego stinodego moved this from In progress to Next in Backlog May 21, 2024
@stinodego stinodego moved this from Next to Candidate in Backlog May 26, 2024
@stinodego stinodego removed their assignment May 26, 2024
@deanm0000
Copy link
Collaborator

#15823 probably belongs here.

@stinodego stinodego moved this from Candidate to In progress in Backlog Jun 17, 2024
@nameexhaustion nameexhaustion self-assigned this Jun 24, 2024
@couling
Copy link

couling commented Jun 29, 2024

@stinodego, Regarding this comment

Ok, I see what you mean. We should support this.

Is there a github issue tracking this? It's not noted in the issue checklist here and, as far as I can see, the trail goes cold with the comment and #15508.

For us, the lack of ability to explicitly set schemas for the table has prevented us using scan_parquet. We are forced to go via scan_pyarrow_dataset instead, which is suboptimal and messy code.

@danielgafni
Copy link

danielgafni commented Jul 8, 2024

Not sure if this is the correct place to write this, but...

For the native partitioned Parquet reader, would it be possible to support loading unions of columns from different partitions when they contain different sets of columns? This would correspond to "diagonal" concat.

For example, when working with limit order book data, daily partitions of orderbook levels have varying amount of columns.

The pyarrow reader silently drops colums which are not present in all partitions at the same time.

I wonder if it would be possible to surface concatenation option to the top-level API in the native polars reader?

@lmocsi
Copy link

lmocsi commented Sep 3, 2024

Some addition to ion-elgreco's comment on schema evolution:
Let's not forget about hive-partitioned parquet files. It seems, that polars is working in a different way, when it comes to hive-partitioned files - it does not chop all files to the schema of the first file, but throws an error:

import polars as pl
import os

# create directories:
path1 = './a/month_code=M202406' 
if not os.path.exists(path1):
    os.makedirs(path1)
path2 = './a/month_code=M202407' 
if not os.path.exists(path2):
    os.makedirs(path2)

# create different partitions:
df = pl.DataFrame({'a': [1,2,3], 'b': ['a','b','c']})
df.write_parquet('./a/month_code=M202406/part_0.parquet')

df2 = pl.DataFrame({'a': [1,2,3], 'b': ['a','a','b'], 'c': [22,33,44]})
df2.write_parquet('./a/month_code=M202407/part_0.parquet')

# try to read data:
df3 = pl.scan_parquet('./a', hive_partitioning=True)
df3.collect()

And I get the error:
SchemaError: schemas contained differing number of columns: 2 != 3

This should be handled, as well (ideally with an option to fill those columns with null values, that do not exist in the current partition, but exists in some other partitions).

@Veiasai
Copy link
Contributor

Veiasai commented Sep 7, 2024

#12041 (comment)

@c-peters c-peters removed the P-goal Priority: aligns with long-term Polars goals label Sep 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-io-partitioning Area: reading/writing (Hive) partitioned files accepted Ready for implementation enhancement New feature or an improvement of an existing feature
Projects
Status: In progress
Development

No branches or pull requests