
feat: make output file name of write task consistent with java api #1720


Open
sharkdtu wants to merge 2 commits into main
Conversation

sharkdtu

@sharkdtu sharkdtu commented Feb 25, 2025

Resolves: #1719

The output file name produced by the Java API is "{partitionId}-{taskId}-{operationId}-{counterId}.{extension}".

java api ref: https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101

However, the output file name of pyiceberg is "00000-{task_id}-{write_uuid}.{extension}", where task_id is effectively assigned as a counter ID for a target file. That is fine for non-distributed writing, but for distributed writing it's better to be consistent with the Java API.
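For comparison, a rough sketch of the two conventions in Python (the helper names and zero-padding below are illustrative assumptions, not the actual pyiceberg or Java implementation):

```python
import uuid

# Current pyiceberg-style name: "00000-{task_id}-{write_uuid}.{extension}",
# where task_id effectively plays the role of a per-file counter.
def pyiceberg_style_name(task_id: int, write_uuid: uuid.UUID, extension: str) -> str:
    return f"00000-{task_id}-{write_uuid}.{extension}"

# Java-API-style name: "{partitionId}-{taskId}-{operationId}-{counterId}.{extension}",
# where operationId is shared by all files of one write operation and counterId
# distinguishes multiple files written by the same task.
def java_style_name(partition_id: int, task_id: int, operation_id: uuid.UUID,
                    counter_id: int, extension: str) -> str:
    return f"{partition_id:05d}-{task_id}-{operation_id}-{counter_id}.{extension}"
```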

Background
I am implementing distributed writing to Iceberg tables based on a Ray Datasink, where each Ray task is responsible for writing data files (by calling _dataframe_to_data_files), and the driver collects the data file information and commits the transaction. To make the files easier to attribute, it would be ideal to have a naming convention for the files written by each Ray task under the same transaction (a sketch follows the list below):

  • All file names written by Ray tasks share a common UUID.
  • Each file name written by a Ray task includes the Ray task ID.
  • When a single Ray task writes multiple files, the file names should include a counter ID.
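A minimal sketch of the naming this implies on the Ray side (illustrative only; the helper below is not part of pyiceberg or Ray):

```python
import uuid

# Illustrative: the driver generates one UUID for the whole write and passes it
# to every Ray task, so all files of one transaction can be grouped together.
write_uuid = uuid.uuid4()

def file_names_for_task(ray_task_id: int, num_files: int, extension: str = "parquet") -> list[str]:
    """Names of the files written by one Ray task: shared UUID, the task's own
    ID, and a counter for each file the task produces."""
    return [
        f"00000-{ray_task_id}-{write_uuid}-{counter_id}.{extension}"
        for counter_id in range(num_files)
    ]

# e.g. task 7 writing 3 files:
#   00000-7-<write_uuid>-0.parquet
#   00000-7-<write_uuid>-1.parquet
#   00000-7-<write_uuid>-2.parquet
```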

@sharkdtu
Author

@Fokko Could you please take a look at this PR? Thanks!

@Fokko
Contributor

Fokko commented Feb 27, 2025

Hey @sharkdtu Thanks for raising this. What would be the benefit of adding this counter to the output? I think it is unique without the counter.

@sharkdtu
Author

sharkdtu commented Feb 28, 2025

> Hey @sharkdtu Thanks for raising this. What would be the benefit of adding this counter to the output? I think it is unique without the counter.

@Fokko Sorry for not providing detailed background information. I have updated the PR description, please take a look.
The 'counter id' you mentioned is used to distinguish the scenario where a single task writes multiple files during a distributed write, just like Spark does.

@Fokko
Contributor

Fokko commented Mar 4, 2025

@sharkdtu Thanks for the added context. Still, I don't think this is the right place to add this.

Would each of the Ray workers call _dataframe_to_data_files? In the worst case, this might lead to partitions * workers number of data files. Instead, the idea behind the notion of Tasks is that they can be fed into a distributed system. The current _dataframe_to_data_files does both the generation of Tasks and writes the Parquet files. How about splitting this into _dataframe_to_write_tasks and _write_tasks_to_parquet, where Ray would implement a distributed variant of the latter. Thoughts?
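A rough sketch of what that split could look like (the two helper names come from the comment above, but the signatures, bodies, and Ray-side flow are assumptions, not existing pyiceberg APIs):

```python
# Illustrative sketch of the proposed split; the helper names come from the
# suggestion above, but the signatures and bodies are assumptions.
from typing import Iterable, Iterator

def _dataframe_to_write_tasks(df, table_metadata, write_uuid) -> Iterator["WriteTask"]:
    """Planning only: bin the dataframe into per-partition chunks and yield
    WriteTasks; no files are written here."""
    ...

def _write_tasks_to_parquet(tasks: Iterable["WriteTask"], io, table_metadata) -> Iterator["DataFile"]:
    """Execution: write each task out as a Parquet file and yield the
    resulting DataFile entries."""
    ...

# A distributed engine such as Ray would call _dataframe_to_write_tasks on the
# driver, ship the tasks to workers, run _write_tasks_to_parquet on each worker,
# and collect the DataFiles back on the driver to commit a single transaction.
```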

@@ -1874,7 +1875,7 @@ class WriteTask:
def generate_data_file_filename(self, extension: str) -> str:
Contributor

@Fokko Fokko Mar 4, 2025


How about:

Suggested change
-    def generate_data_file_filename(self, extension: str) -> str:
+    def generate_data_file_filename(self, extension: str, task_id: Optional[int] = None) -> str:

To only include the task ID for distributed engines?
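For illustration, a sketch of how such an optional parameter might be used (an assumption about the intent of the suggestion, not the code in this PR):

```python
import uuid
from typing import Optional

class WriteTask:  # sketch only; the real WriteTask in pyiceberg has more fields
    write_uuid: uuid.UUID
    task_id: int

    def generate_data_file_filename(self, extension: str, task_id: Optional[int] = None) -> str:
        # Keep the existing name for local writes and only add the caller-supplied
        # task ID for distributed engines, letting self.task_id act as the counter.
        if task_id is None:
            return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
        return f"00000-{task_id}-{self.write_uuid}-{self.task_id}.{extension}"
```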

@sharkdtu
Author

sharkdtu commented Mar 5, 2025

> @sharkdtu Thanks for the added context. Still, I don't think this is the right place to add this.
>
> Would each of the Ray workers call _dataframe_to_data_files? In the worst case, this might lead to partitions * workers number of data files. Instead, the idea behind the notion of Tasks is that they can be fed into a distributed system. The current _dataframe_to_data_files does both the generation of Tasks and writes the Parquet files. How about splitting this into _dataframe_to_write_tasks and _write_tasks_to_parquet, where Ray would implement a distributed variant of the latter. Thoughts?

@Fokko Thanks for the comments. I think a WriteTask is not a task of a distributed system; it's more like a writer within a task. One task may have a single writer or multiple writers. The number of data files can be controlled by repartitioning before writing, as in Spark.
