Parallelize Parquet Serialization #7562
Conversation
column_index: None,
offset_index: None,
These are really important for query performance, so we probably want to find a way to support these. This may require modifying some of the upstream APIs if they don't expose the necessary information
I think to get the bloom filter we would need to get a RowGroupReader and actually read the bloom filter https://docs.rs/parquet/latest/parquet/file/serialized_reader/struct.SerializedRowGroupReader.html#method.get_column_bloom_filter
Similarly, for the column index.
I filed apache/arrow-rs#4823 to potentially create such an API upstream. Maybe we can prototype it in DataFusion first
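For illustration, a minimal sketch (not code from this PR) of reading a bloom filter back out of already-serialized bytes; exact signatures should be checked against the parquet crate version in use:

```rust
// Hedged sketch: recovering a bloom filter from an already-serialized
// parquet file via the SerializedRowGroupReader API linked above.
use bytes::Bytes;
use parquet::file::reader::{FileReader, RowGroupReader};
use parquet::file::serialized_reader::SerializedFileReader;

fn read_first_bloom_filter(parquet_bytes: Bytes) -> parquet::errors::Result<()> {
    let reader = SerializedFileReader::new(parquet_bytes)?;
    let row_group = reader.get_row_group(0)?;
    // None unless a bloom filter was actually written for column 0
    if let Some(_sbbf) = row_group.get_column_bloom_filter(0) {
        // ...attach it to the final concatenated file's metadata...
    }
    Ok(())
}
```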
This is really cool to see, and thanks for your empirical data gathering. I will try to review this over the next few days.
I realized that the benchmark numbers I ran last night were not from a release build. I added additional results for a release build to the description. Single-threaded serialization is 10x faster in the release build, and the relative gains from this parallelization strategy are significantly lower. In the release build, #7563 is a decent bit faster than this PR as well.
This looks amazing -- thank you @devinjdangelo -- I ran out of time to review this PR today but I plan to do so over the weekend
Thank you @devinjdangelo -- this is pretty amazing.
I tested this PR locally and I also definitely saw a performance improvement writing parquet files:
This branch
(arrow_dev) alamb@MacBook-Pro-8:~/Downloads$ /Users/alamb/Software/target-df2/release/datafusion-cli
DataFusion CLI v31.0.0
❯ copy traces to '/tmp/traces.parquet';
+---------+
| count |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 3.996 seconds.
Main
(arrow_dev) alamb@MacBook-Pro-8:~/Downloads$ datafusion-cli
DataFusion CLI v31.0.0
❯ copy traces to '/tmp/traces.parquet';
+---------+
| count |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 6.637 seconds.
I think at the moment, this PR represents a regression in certain functionality (like memory consumption and parquet file indexes) so I would be wary of merging it in as is.
Instead, what I recommend is:
- Change the default value of allow_single_file_parallelism to false so there is no regression unless the feature is enabled
- Work out the other issues (like metadata and buffering) as follow-on tickets
- Once we have resolved those concerns, we can enable this feature by default
What are your thoughts?
Again, really great work and we appreciate you pushing this feature forward
row_count = output_multiple_parquet_files(writers, data).await?;
}
true => {
    if !allow_single_file_parallelism || data.len() <= 1 {
This check for data.len() <= 1 means that the parallelism is controlled by the input partitioning of the plan, right? I think this is fine (and it is consistent with the rest of DataFusion's parallelism story) but I wanted to double check
Yes, that is how it is implemented currently. FileSinkExec currently tells the optimizer that it does not benefit from partitioning, which is now not true in this specific case.
I think that it would be better to not rely on the input partitioning to decide the parallelism (since the partitions are usually taken to mean the number of desired output files) but I'm not quite sure how to accomplish that yet.
I was thinking that we eventually teach the planner that a data sink that can run in parallel would benefit from additional partitioning and then we let the existing planning infrastructure handle the actual work of parallelization
I think that is a possibility. In the future, I'd like to enable parallelizing the serialization of each parquet file when outputting multiple. In that case, the number of output partitions is taken to be the number of output files. To parallelize each output file, the N incoming RecordBatch streams would need to be divided up within ParquetSink itself, similarly to how it is currently done in CsvSink and JsonSink.
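As a rough illustration of that division of work, a hedged sketch (the demux helper and channel setup are hypothetical, not the actual CsvSink/JsonSink code):

```rust
// Hedged sketch of round-robin'ing RecordBatches from a single input stream
// across N serialization workers inside a sink.
use datafusion::arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc::Sender;

async fn demux<S>(mut input: S, workers: Vec<Sender<RecordBatch>>)
where
    S: Stream<Item = RecordBatch> + Unpin,
{
    let mut next = 0;
    while let Some(batch) = input.next().await {
        // send() awaits when the worker's bounded channel is full,
        // which applies backpressure to the input stream
        if workers[next].send(batch).await.is_err() {
            break; // receiver dropped; stop feeding work
        }
        next = (next + 1) % workers.len();
    }
}
```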
@devinjdangelo do you think this particular idea needs a ticket? It isn't clear to me that there is a specific task here quite yet -- it is more like "better parallelization of the parquet file writing". I am inclined to hold off filing anything specific here until we have more experience with how this implementation works in practice
/// This is the return type when joining subtasks which are serializing parquet files
/// into memory buffers. The first part of the tuple is the parquet bytes and the
/// second is how many rows were written into the file.
type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
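For context, a hedged sketch of how a subtask yielding this type might be spawned and joined (serialize_one_file is a hypothetical placeholder for the real work):

```rust
// Hedged sketch of spawning/joining a subtask that yields
// ParquetFileSerializedResult; `serialize_one_file` is hypothetical.
use datafusion::error::DataFusionError;

type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;

async fn serialize_one_file() -> ParquetFileSerializedResult {
    Ok((Vec::new(), 0)) // placeholder for real serialization work
}

async fn join_one() -> Result<(), DataFusionError> {
    let handle = tokio::spawn(serialize_one_file());
    // the first ? surfaces a panic/cancellation as an error,
    // the second ? surfaces the serialization error itself
    let (_bytes, _row_count) = handle
        .await
        .map_err(|e| DataFusionError::External(Box::new(e)))??;
    Ok(())
}
```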
A refinement of this idea might be to extend the type of work @Ted-Jiang is doing in #7570 and use a configurable buffering strategy -- I could see how some users would prefer to use local disk to buffer the files rather than memory, depending on the resources available.
This would be a follow-on PR, of course, not this one.
Writing to disk as an option would be a cool feature. It might be too slow to spill to disk vs just writing on a single thread, but certainly worth testing that empirically.
I think the best possible solution would consume the sub parquet files incrementally from memory as they are produced, rather than buffering the entire file.
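One hedged sketch of that incremental idea, using a bounded channel so producers pause when the consumer lags (all names hypothetical):

```rust
// Hedged sketch: incremental consumption with backpressure via a bounded
// tokio mpsc channel; the capacity limits how many finished sub-files can
// be buffered at once.
use tokio::sync::mpsc;

async fn incremental_demo() {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(2); // at most 2 buffered files
    let producer = tokio::spawn(async move {
        for i in 0..8u8 {
            // send() awaits when the channel is full, pausing serialization
            if tx.send(vec![i; 1024]).await.is_err() {
                break;
            }
        }
    });
    while let Some(bytes) = rx.recv().await {
        // hand each finished sub-file to the concatenation/upload step here
        let _ = bytes.len();
    }
    let _ = producer.await;
}
```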
I agree that incremental consumption of the files (and applying backpressure if the final stage can't keep up) is likely the best tradeoff.
filed #7591
let final_writer = writer.unwrap().into_inner()?;
let final_buff = final_writer.buffer.try_lock().unwrap();
object_store_writer.write_all(final_buff.as_slice()).await?;
It would be nice to avoid buffering the entire file into memory prior to writing (or at least make this configurable), but that can be done as a follow-on project I think
let mut writer = None;
let shared_buff = SharedBuffer::new(1048576);
for handle in join_handles {
As written I think this will create all parquet files in parallel even if the consumer can't keep up. If we could somehow make this work as a futures::Stream we could use buffered() to limit the concurrency. I am not sure how important this is in practice.
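For illustration, a hedged sketch of the buffered() idea (serialize_one_file is a hypothetical placeholder for the real serialization task):

```rust
// Hedged sketch of bounding concurrency with futures::StreamExt::buffered:
// a stream of serialization futures where at most 4 are polled at once, so
// the consumer's pace applies backpressure upstream.
use futures::stream::{self, StreamExt};

async fn serialize_one_file(id: usize) -> Result<(Vec<u8>, usize), String> {
    Ok((vec![0u8; 16], id)) // placeholder: (file bytes, row count)
}

async fn consume_with_limit(num_files: usize) -> Result<u64, String> {
    let mut files = stream::iter(0..num_files)
        .map(serialize_one_file) // a stream of futures...
        .buffered(4); // ...at most 4 in flight, yielded in order
    let mut row_count = 0u64;
    while let Some(result) = files.next().await {
        let (_bytes, rows) = result?;
        // slow consumption here delays polling of the pending futures
        row_count += rows as u64;
    }
    Ok(row_count)
}
```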
Yes, that's correct. As written, there is no back-pressure to prevent buffering the entire parquet file in memory.
Thanks @alamb for the review. I agree with the plan of action you lay out! I also merged in the changes from the alternative implementation PR, given that it is about 10% faster and has slightly lower memory usage in the release build benchmarks. I set the config default to false and expanded the doc comment to flag the missing features if you enable it.
let reader = bytes::Bytes::from(out);
row_count += num_rows;
//let reader = File::open(buffer)?;
let metadata = parquet::file::footer::parse_metadata(&reader)?;
There are two lower-level methods for decoding the footer from raw bytes, if that is what you mean?
https://docs.rs/parquet/latest/parquet/file/footer/index.html
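For illustration, a hedged sketch of those lower-level calls; worth double-checking the signatures against the parquet crate version in use:

```rust
// Hedged sketch of decoding a parquet footer directly from in-memory bytes
// using the lower-level functions in parquet::file::footer.
use parquet::errors::Result;
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;

fn metadata_from_bytes(file: &[u8]) -> Result<ParquetMetaData> {
    // last 8 bytes: 4-byte little-endian metadata length + the magic "PAR1"
    let footer: [u8; 8] = file[file.len() - 8..].try_into().expect("8-byte footer");
    let metadata_len = decode_footer(&footer)?;
    let metadata_start = file.len() - 8 - metadata_len;
    decode_metadata(&file[metadata_start..file.len() - 8])
}
```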
Ultimately, I'd like to be able to call SerializedRowGroupWriter.append_column as soon as possible -- before any parquet file has been completely serialized in memory. I.e., as a parallel task finishes encoding a single column for a single row group, eagerly flush those bytes to the concatenation task, then flush to the ObjectStore and discard from memory. If the concatenation task can keep up with all of the parallel serializing tasks, then we could prevent ever buffering an entire row group in memory.
The goal of course being lowering the memory overhead of this approach. Compared to streaming multiple files independently directly to an ObjectStore, the current approach in this PR consumes about double the memory.
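To make the shape of that concrete, a hedged sketch (the append_column signature should be verified against the parquet crate; producing the ColumnCloseResult from a closed ColumnWriter is elided):

```rust
// Hedged sketch of appending an already-encoded column chunk to a row group
// writer without re-encoding it, per the append_column API mentioned above.
use bytes::Bytes;
use parquet::column::writer::ColumnCloseResult;
use parquet::errors::Result;
use parquet::file::writer::SerializedRowGroupWriter;

fn append_encoded_column(
    row_group: &mut SerializedRowGroupWriter<'_, Vec<u8>>,
    encoded_bytes: Bytes,     // column chunk bytes from a parallel task
    close: ColumnCloseResult, // produced when the task closed its ColumnWriter
) -> Result<()> {
    // copies the encoded pages into the target file and records metadata
    row_group.append_column(&encoded_bytes, close)
}
```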
I also wanted to commend you for your exemplary demonstration of how to break a large project up into smaller PRs. I think it makes the changes both easier to review and easier to discuss and work on.
I think this PR is ready to merge -- I will file a few tickets to track the follow on work items we have identified.
cc @metesynnada
Ok, I think all follow-on tasks are tracked and this PR is ready to go. Thank you again @devinjdangelo!
PR to add ticket references: #7592
Which issue does this PR close?
Related to apache/arrow-rs#1718
Rationale for this change
#7452 sped up writing of CSV and JSON files (even when writing only a single large file) by serializing RecordBatches in parallel on multiple threads. This PR attempts to accomplish the same for Parquet files. This is more challenging than for CSV/JSON, since Parquet serialization is not embarrassingly parallelizable.
#7483 enabled writing different parquet files in parallel, but this PR attempts to allow writing a single parquet file in parallel.
What changes are included in this PR?
This PR implements the following strategy to parallelize writing a single parquet file:
1. Divide the incoming RecordBatches across N parallel tasks, each of which serializes an independent parquet file into an in-memory buffer.
2. Join the subtasks and, for each completed buffer, parse its footer metadata and append its row groups to a single final parquet writer.
3. Write the final concatenated file bytes to the ObjectStore.
Steps 2 and 3 are streaming/incremental, but step 1 is not. In general, all N mini parquet files are likely to be buffered in memory before they are consumed in steps 2 and 3.
Given the tradeoff between execution time and memory usage in this implementation, this PR also provides a session config option that allows turning off single-file parallelization. This option is useful in memory-constrained environments.
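For example (hedged: the fully-qualified config key is an assumption based on the option name and may differ by version):

```rust
// Hedged sketch of toggling the new option from Rust; the config key below
// is assumed from the option name discussed in this PR.
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn configure() -> Result<()> {
    let ctx = SessionContext::new();
    // opt out of single-file parallelism in memory-constrained environments
    ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = false")
        .await?;
    Ok(())
}
```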
Further steps that could improve this PR:
- Support writing the column/offset indexes and bloom filters that single-threaded serialization produces (see apache/arrow-rs#4823)
- Consume the serialized sub-files incrementally as they are produced, rather than buffering each entirely in memory (#7591)
- Avoid buffering the entire final file in memory before writing it to the ObjectStore
Benchmarking
The following script is used to write parquet file(s) and capture execution time and peak memory consumption.
Notes for test results:
Test 1, All 16 Columns, ~3.6GB Parquet File (release build)
Execution Time (s)
Peak Memory Consumption (MB)
Test 2, Subset of 3 Columns, ~895MB Parquet File (release build)
Execution Time (s)
Peak Memory Consumption (MB)
Test 1, All 16 Columns, ~3.6GB Parquet File (dev build)
Execution Time (s)
Peak Memory Consumption (MB)
Test 2, Subset of 3 Columns, ~895MB Parquet File (dev build)
Execution Time (s)
Peak Memory Consumption (MB)
Are these changes tested?
Yes, by existing tests.
Are there any user-facing changes?
Faster single parquet file writes when enabled, and a new config option, allow_single_file_parallelism, to turn the feature on or off (off by default).