
Scan Iceberg table sorted on partition key without sort order #966


Closed
BTheunissen opened this issue Jul 25, 2024 · 3 comments

Comments

@BTheunissen

BTheunissen commented Jul 25, 2024

Feature Request / Improvement

I am very excited by the recent addition of PyArrow RecordBatchReader support (#786), which makes it much easier to read very large Iceberg tables that do not fit in memory.

I currently have a use case where I want to read a large Iceberg table (1B+ rows) on which a sort order cannot be defined, because the upstream sink processor does not support it. The table is partitioned by modified_date_hour, which cannot be used as a strict ordering key.

Example table schema:

user(
  1: op: optional string,
  4: lsn: optional long,
  5: __deleted: optional string,
  6: usr_created_date: optional long,
  7: usr_modified_date: optional long,
  8: modified_date: optional timestamptz,
  9: usr_id: optional string,
  12: table: optional string,
  13: ts_ms: optional long
),
partition by: [modified_date_hour],
sort order: [],
snapshot: Operation.APPEND: id=4107982468622810123, parent_id=4969834436785268669, schema_id=0

Is it currently possible in 0.7.0rc1, or would it be possible with some changes, to read a PyArrow batch reader that fetches batches according to the partition key and then sorts those batches in memory by another table column provided by the client (in this case usr_modified_date), returning a stream of PyArrow batches? The idea is to be able to efficiently fetch and resume the stream via a connector so that records can be extracted incrementally.

@kevinjqliu
Contributor

Taking a stab at this:

support reading a PyArrow Batch Reader

Looking at the code for to_arrow_batch_reader

fetches batches according to the partition key

I believe this can be done via the plan_files function, by specifying a filter on the partition field as the row_filter in scan.
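
A minimal sketch of that idea, assuming modified_date_hour is an hourly partition over the modified_date column from the schema above; the catalog name, table identifier, filter bounds, and selected fields are all illustrative:

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import And, GreaterThanOrEqual, LessThan

# Hypothetical catalog / table identifiers.
catalog = load_catalog("default")
table = catalog.load_table("db.user")

# Filter on the partition's source column so PyIceberg can prune data files
# from partition metadata alone (a single hour bucket in this example).
scan = table.scan(
    row_filter=And(
        GreaterThanOrEqual("modified_date", "2024-07-01T00:00:00+00:00"),
        LessThan("modified_date", "2024-07-01T01:00:00+00:00"),
    ),
    selected_fields=("usr_id", "usr_modified_date", "modified_date"),
)

# plan_files() lists only the data files that survive partition pruning ...
for task in scan.plan_files():
    print(task.file.file_path)

# ... and to_arrow_batch_reader() streams those files as Arrow RecordBatches.
reader = scan.to_arrow_batch_reader()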

sorts those batches in-memory by another table column provided to the client

It's possible to sort the batches in memory. However, I think all the data needs to be read into memory in order to sort by another table column.
Sorting by a partition field can be done without reading all the data into memory, since we can work off the table metadata alone.
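
A rough sketch of how the two points combine, assuming the scan has already been restricted to a single hour bucket (as in the sketch above) and that one bucket fits comfortably in memory; the helper name and the idea of re-wrapping the sorted table as a RecordBatchReader are illustrative, not an existing PyIceberg API:

import pyarrow as pa

def sorted_batches(scan) -> "pa.RecordBatchReader":
    """Materialize one partition's worth of data and re-emit it sorted.

    `scan` is a PyIceberg table scan restricted to one hour bucket.
    """
    bucket = scan.to_arrow_batch_reader().read_all()               # pa.Table
    bucket = bucket.sort_by([("usr_modified_date", "ascending")])  # in-memory sort
    return pa.RecordBatchReader.from_batches(bucket.schema, bucket.to_batches())

The ordering guarantee only holds within a bucket; resuming the stream would mean remembering the last hour processed and issuing the next scan from there.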

@BTheunissen
Author

Sorting by a partition field can be done without reading all the data into memory, since we can work off the table metadata alone.

Awesome, this is just what I was looking for. I understand that the Arrow RecordBatchReader is a very new integration, but I should be able to accomplish what I'm looking for with your tips!

@kevinjqliu
Contributor

@BTheunissen happy to help!

When you get a working solution, would you mind posting back here for future reference? Even pseudo-code would be helpful!
