Parallelize Parquet Serialization #7562

Merged: 6 commits into apache:main on Sep 18, 2023

Conversation

@devinjdangelo (Contributor) commented Sep 15, 2023:

Which issue does this PR close?

Related to apache/arrow-rs#1718

Rationale for this change

#7452 sped up writing of CSV and JSON files (even when writing only a single large file) by serializing RecordBatches in parallel on multiple threads. This PR attempts to accomplish the same for Parquet files. This is more challenging than for CSV/JSON, since Parquet serialization is not embarrassingly parallelizable.

#7483 enabled writing different parquet files in parallel, but this PR attempts to allow writing a single parquet file in parallel.

What changes are included in this PR?

This PR implements the following strategy to parallelize writing a single parquet file:

  1. For each incoming RecordBatch stream, serialize a parquet file to an in-memory buffer in parallel (a sketch of this step follows this list).
  2. As files from step 1 finish, consume them in order on another task and stitch them together into one large parquet file.
  3. As step 2 progresses, flush bytes from a shared buffer to an ObjectStore in multiple parts.

Steps 2 and 3 are streaming/incremental, but step 1 is not. In general, all N mini parquet files are likely to be buffered in memory before they are consumed in steps 2 and 3.
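To make step 1 concrete, here is a minimal sketch (not the PR's actual code) of serializing each partition to its own in-memory parquet buffer on a separate Tokio task. The `Vec<Vec<RecordBatch>>` input and the `spawn_parallel_serializers` name are simplifying assumptions; the real implementation consumes RecordBatch streams.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_common::DataFusionError;
use parquet::arrow::ArrowWriter;
use tokio::task::JoinHandle;

/// Spawn one task per input partition; each task serializes its batches into
/// an in-memory parquet file and returns the file bytes plus the row count.
fn spawn_parallel_serializers(
    schema: SchemaRef,
    partitions: Vec<Vec<RecordBatch>>, // simplified stand-in for the input streams
) -> Vec<JoinHandle<Result<(Vec<u8>, usize), DataFusionError>>> {
    partitions
        .into_iter()
        .map(|batches| {
            let schema = Arc::clone(&schema);
            let handle: JoinHandle<Result<(Vec<u8>, usize), DataFusionError>> =
                tokio::spawn(async move {
                    let mut buffer = Vec::new();
                    let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)?;
                    let mut rows = 0;
                    for batch in &batches {
                        rows += batch.num_rows();
                        writer.write(batch)?;
                    }
                    // close() writes the footer, so `buffer` now holds a complete file
                    writer.close()?;
                    Ok((buffer, rows))
                });
            handle
        })
        .collect()
}
```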

Given the tradeoff between execution time and memory usage in this implementation, this PR also provides a session config option which allows turning off single-file parallelization (see the example below). This option is useful in memory-constrained environments.
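For example, here is a hedged sketch of how a user might turn the option off. The option name comes from this PR, but the full `datafusion.execution.parquet.allow_single_file_parallelism` key path and this exact usage are assumptions.

```rust
use datafusion::prelude::*;

fn memory_constrained_context() -> SessionContext {
    // Key path assumed; setting the option to false falls back to the
    // single-threaded (lower-memory) parquet writer.
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.parquet.allow_single_file_parallelism",
        false,
    );
    SessionContext::with_config(config)
}
```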

Further steps that could improve this PR:

  • Stream bytes from step 1 to step 2 to limit the increase in memory consumption as parallelism increases
  • Enable parallelizing the serialization of multiple large parquet files. E.g., a system with 32 cores that wishes to write out 4 1GB parquet files could write each parquet file with a parallelization of 8 (i.e. stitch together 8 smaller parquet files for each of the 4 large parquet files).

Benchmarking

The following script is used to write parquet file(s) and capture execution time and peak memory consumption.

use datafusion::{dataframe::DataFrameWriteOptions, prelude::*};
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use peak_alloc::PeakAlloc;
use std::{sync::Arc, time::Instant};
use url::Url;

#[global_allocator]
static PEAK_ALLOC: PeakAlloc = PeakAlloc;

const FILENAME: &str =
    "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let mut runtimes = Vec::new();
    let mut max_memory = Vec::new();
    for parallelism in [64, 32, 16, 8, 4, 1] {
        PEAK_ALLOC.reset_peak_usage();
        let _ctx = SessionContext::new();
        let local = Arc::new(LocalFileSystem::new());
        let local_url = Url::parse("file://local").unwrap();
        _ctx.runtime_env().register_object_store(&local_url, local);

    let _read_options = ParquetReadOptions::default();

        let _df = _ctx
            .read_parquet(FILENAME, _read_options.clone())
            .await
            .unwrap()
            // Optionally select a subset of 3 columns
            //.select_columns(&["l_orderkey", "l_partkey", "l_receiptdate"])?
            .repartition(Partitioning::Hash(vec![col("l_orderkey")], parallelism))? ;

        println!("Number of columns: {}", _df.schema().field_names().len());

        let out_path = format!(
            "file://local/home/dev/arrow-datafusion/test_out/bench{parallelism}.parquet"
        );

        let start3 = Instant::now();
        _df.clone()
            .write_parquet(
                out_path.as_str(),
                DataFrameWriteOptions::new().with_single_file_output(true),
                None,
            )
            .await?;
        let elapsed3 = Instant::now() - start3;
        println!("write as parquet with parallelism {parallelism} to disk took -> {elapsed3:?}");
        runtimes.push(elapsed3);

        let peak_mem = PEAK_ALLOC.peak_usage_as_mb();
        println!(
            "Peak memory usage with parallelism {parallelism} is: {}MB",
            peak_mem
        );
        max_memory.push(peak_mem);
    }

    println!("Runtimes: {:?}", runtimes);
    println!("Peak memory: {:?}", max_memory);

    Ok(())
}

Notes for test results:

Test 1, All 16 Columns, ~3.6GB Parquet File (release build)

Execution Time (s)

| Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB) |
|---|---|---|---|
| 1* | 22.48 | 22.53 | 22.53 |
| 4 | 12.24 | 16.39 | 14.40 |
| 8 | 10.79 | 13.83 | 12.37 |
| 16 | 10.52 | 14.31 | 12.67 |
| 32 | 10.91 | 14.10 | 12.07 |
| 64 | 10.21 | 14.93 | 12.97 |

Peak Memory Consumption (MB)

| Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB) |
|---|---|---|---|
| 1* | 1753.3 | 1758.0 | 1759.1 |
| 4 | 2445.4 | 7104.0 | 7094.3 |
| 8 | 3387.0 | 7623.1 | 7611.9 |
| 16 | 5047.6 | 8262.6 | 8258.0 |
| 32 | 7683.6 | 7672.6 | 7658.2 |
| 64 | 10961.1 | 10370.2 | 10323.4 |

Test 2, Subset of 3 Columns, ~895MB Parquet File (release build)

Execution Time (s)

| Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB) |
|---|---|---|---|
| 1* | 3.57 | 3.31 | 3.15 |
| 4 | 1.78 | 2.97 | 2.37 |
| 8 | 1.45 | 2.73 | 2.07 |
| 16 | 1.54 | 2.98 | 2.09 |
| 32 | 1.70 | 2.48 | 2.10 |
| 64 | 1.89 | 2.80 | 2.72 |

Peak Memory Consumption (MB)

| Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB) |
|---|---|---|---|
| 1* | 450.6 | 447.9 | 451.6 |
| 4 | 584.5 | 1788.1 | 1659.1 |
| 8 | 759.4 | 1869.1 | 1939.7 |
| 16 | 1045.8 | 2058.7 | 2051.2 |
| 32 | 1564.6 | 1899.7 | 1899.7 |
| 64 | 2318.8 | 1718.3 | 1726.1 |

Test 1, All 16 Columns, ~3.6GB Parquet File (dev build)

Execution Time (s)

| Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB) |
|---|---|---|---|
| 1* | 277.77 | 267.60 | 266.29 |
| 4 | 83.69 | 88.25 | 133.41 |
| 8 | 47.36 | 51.87 | 97.09 |
| 16 | 35.79 | 39.92 | 84.65 |
| 32 | 31.71 | 35.47 | 81.64 |
| 64 | 32.23 | 36.21 | 82.24 |

Peak Memory Consumption (MB)

| Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB) |
|---|---|---|---|
| 1* | 1753.9 | 1758.9 | 1759.7 |
| 4 | 2465.2 | 7082.4 | 7092.9 |
| 8 | 3402.5 | 7611.2 | 7604.8 |
| 16 | 5145.3 | 7041.2 | 7884.3 |
| 32 | 7795.9 | 7720.1 | 7720.7 |
| 64 | 10957.3 | 10480.9 | 10290.9 |

Test 2, Subset of 3 Columns, ~895MB Parquet File (dev build)

Execution Time (s)

| Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB) |
|---|---|---|---|
| 1* | 42.82 | 40.52 | 40.36 |
| 4 | 14.24 | 14.98 | 29.73 |
| 8 | 7.77 | 8.84 | 20.44 |
| 16 | 5.33 | 6.59 | 18.93 |
| 32 | 4.80 | 5.34 | 17.40 |
| 64 | 4.98 | 5.54 | 17.78 |

Peak Memory Consumption (MB)

| Parallelism | Output multiple files** | Output Single File (flush each subfile) | Output Single File (flush each 10MB) |
|---|---|---|---|
| 1* | 445.9 | 449.7 | 447.7 |
| 4 | 577.1 | 1789.3 | 1693.9 |
| 8 | 756.9 | 1923.4 | 1865.5 |
| 16 | 1038.2 | 1940.8 | 1933.0 |
| 32 | 1554.2 | 1899.7 | 1907.0 |
| 64 | 2337.5 | 1710.4 | 1726.8 |

Are these changes tested?

Yes, by existing tests.

Are there any user-facing changes?

Faster single parquet writes and a new config option to enable/disable (on by default).

Comment on lines +908 to +909
column_index: None,
offset_index: None,
Contributor:

These are really important for query performance, so we probably want to find a way to support these. This may require modifying some of the upstream APIs if they don't expose the necessary information

Contributor:

I think to get the bloom filter we would need to get a RowGroupReader and actually read the bloom filter https://docs.rs/parquet/latest/parquet/file/serialized_reader/struct.SerializedRowGroupReader.html#method.get_column_bloom_filter

Similarly, for the column index.

I filed apache/arrow-rs#4823 to potentially create such an API upstream. Maybe we can prototype it in DataFusion first

Contributor:

Filed #7589 and #7590 to track -- I'll also add a code level comment

@tustvold (Contributor):

This is really cool to see, and thank you for your empirical data gathering. I will try to review this over the next few days.

@devinjdangelo (Contributor, Author):

I realized that the benchmarking numbers I ran last night were not from a release build. I added additional results in the description for a release build. Single-threaded serialization is 10x faster in the release build, and the relative gains from this parallelization strategy are significantly lower. In the release build, #7563 is a decent bit faster than this PR as well.

@alamb (Contributor) commented Sep 15, 2023:

This looks amazing -- thank you @devinjdangelo -- I ran out of time to review this PR today but I plan to do so over the weekend

@alamb (Contributor) left a comment:

Thank you @devinjdangelo -- this is pretty amazing.

I tested this PR locally and I also definitely saw a performance improvement writing parquet files

This branch

(arrow_dev) alamb@MacBook-Pro-8:~/Downloads$ /Users/alamb/Software/target-df2/release/datafusion-cli
DataFusion CLI v31.0.0
❯ copy traces to '/tmp/traces.parquet';
+---------+
| count   |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 3.996 seconds.

Main

(arrow_dev) alamb@MacBook-Pro-8:~/Downloads$ datafusion-cli
DataFusion CLI v31.0.0
❯ copy traces to '/tmp/traces.parquet';
+---------+
| count   |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 6.637 seconds.

I think at the moment, this PR represents a regression in certain functionality (like memory consumption and parquet file indexes) so I would be wary of merging it in as is.

Instead, what I recommend is:

  1. Change the default value of allow_single_file_parallelism to false so there is no regression unless the feature is enabled
  2. Work out the other issues (like metadata and buffering) as follow-on tickets
  3. Once we have resolved those concerns, we can enable this feature by default

What are your thoughts?

Again, really great work and we appreciate you pushing this feature forward

row_count = output_multiple_parquet_files(writers, data).await?;
}
true => {
if !allow_single_file_parallelism || data.len() <= 1 {
Contributor:

This check for data.len() <= 1 means that the parallelism is controlled by the input partitioning of the plan, right? I think this is fine (and it is consistent with the rest of DataFusion's parallelism story) but I wanted to double check.

@devinjdangelo (Contributor, Author) commented Sep 16, 2023:

Yes, that is how it is implemented currently. FileSinkExec currently tells the optimizer that it does not benefit from partitioning, which is no longer true in this specific case.

I think that it would be better to not rely on the input partitioning to decide the parallelism (since the partitions are usually taken to mean the number of desired output files) but I'm not quite sure how to accomplish that yet.

Contributor:

I was thinking that we could eventually teach the planner that a data sink that can run in parallel would benefit from additional partitioning, and then let the existing planning infrastructure handle the actual work of parallelization.

@devinjdangelo (Contributor, Author):

I think that is a possibility. In the future, I'd like to enable parallelizing the serialization of each parquet file when outputting multiple. In that case, the number of output partitions is taken to be the number of output files. To parallelize each output file, the N incoming RecordBatch streams would need to be divided up within ParquetSink itself, similarly to how it is currently done in CsvSink and JsonSink.

Contributor:

@devinjdangelo do you think this particular idea needs a ticket? It isn't clear to me that there is a specific task here quite yet -- it is more like "better parallelization of the parquet file writing". I am inclined to hold off filing anything specific here until we have more experience with how this implementation works in practice

/// This is the return type when joining subtasks which are serializing parquet files
/// into memory buffers. The first part of the tuple is the parquet bytes and the
/// second is how many rows were written into the file.
type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
Contributor:

A refinement of this idea might be to extend the type of work @Ted-Jiang is doing in #7570 and use a configurable buffering strategy -- I could see how some users would prefer to use local disk to buffer the files rather than memory, depending on the resources available.

This would be a follow on PR, of course, not this one.

@devinjdangelo (Contributor, Author):

Writing to disk as an option would be a cool feature. It might be too slow to spill to disk versus just writing on a single thread, but it is certainly worth testing that empirically.

I think the best possible solution would consume the sub parquet files incrementally from memory as they are produced, rather than buffering the entire file.

Contributor:

I agree that incremental consumption of the files (and applying backpressure if the final stage can't keep up) is likely the best tradeoff.

Contributor:

filed #7591

let final_writer = writer.unwrap().into_inner()?;
let final_buff = final_writer.buffer.try_lock().unwrap();

object_store_writer.write_all(final_buff.as_slice()).await?;
Contributor:

It would be nice to avoid buffering the entire file into memory prior to writing (or at least make this configurable), but that can be done as a follow on project I think


let mut writer = None;
let shared_buff = SharedBuffer::new(1048576);
for handle in join_handles {
Contributor:

As written I think this will create all parquet files in parallel even if the consumer can't keep up.

If we could somehow make this work as a futures::Stream we could use buffered() to limit the concurrency. I am not sure how important this is in practice

@devinjdangelo (Contributor, Author):

Yes, that's correct. As written, there is no back-pressure to prevent buffering the entire parquet file in memory.
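
For reference, here is a minimal sketch (not code from this PR) of the buffered() idea suggested above. Modeling each sub-file serialization as a boxed future yielding its serialized bytes is an assumption made for illustration.

```rust
use futures::future::BoxFuture;
use futures::stream::{self, StreamExt};

/// Drive the sub-file serialization futures with bounded concurrency so the
/// stitching stage applies backpressure instead of buffering everything at once.
async fn consume_with_limit(tasks: Vec<BoxFuture<'static, Vec<u8>>>) {
    // Poll at most 4 serialization futures at a time, yielding results in order.
    let mut results = stream::iter(tasks).buffered(4);
    while let Some(file_bytes) = results.next().await {
        // ...stitch `file_bytes` into the final parquet file here...
        drop(file_bytes);
    }
}
```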

@devinjdangelo (Contributor, Author):

Thanks @alamb for the review. I agree with the plan of action you laid out!

I also merged in the changes from the alternative implementation PR, given that it is about 10% faster and uses slightly less memory in the release-build benchmarks. I set the config default to false and expanded the doc comment to flag the missing features if you enable it.

let reader = bytes::Bytes::from(out);
row_count += num_rows;
//let reader = File::open(buffer)?;
let metadata = parquet::file::footer::parse_metadata(&reader)?;
@devinjdangelo (Contributor, Author):

@tustvold @alamb how hard would it be for this call to work on a stream of bytes rather than a fully buffered parquet file? That way we could eliminate the need for the parallel tasks to fully buffer the sub parquet files.

I'm not sure if an api exists for this in arrow-rs already.

Contributor:

There are two lower-level methods for decoding the footer from raw bytes, if that is what you mean:

https://docs.rs/parquet/latest/parquet/file/footer/index.html
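
For illustration, a minimal sketch that uses those two lower-level functions (`decode_footer` and `decode_metadata`) on a fully buffered file; the helper name is hypothetical:

```rust
use parquet::errors::Result;
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;

/// Decode parquet metadata from a fully buffered file without constructing a reader.
/// Sketch only: assumes `file_bytes` is a complete, well-formed parquet file.
fn metadata_from_bytes(file_bytes: &[u8]) -> Result<ParquetMetaData> {
    // The last 8 bytes are the footer: a 4-byte little-endian metadata length
    // followed by the "PAR1" magic.
    let footer: [u8; 8] = file_bytes[file_bytes.len() - 8..].try_into().unwrap();
    let metadata_len = decode_footer(&footer)?;
    // The thrift-encoded FileMetaData sits immediately before the footer.
    let metadata_start = file_bytes.len() - 8 - metadata_len;
    decode_metadata(&file_bytes[metadata_start..file_bytes.len() - 8])
}
```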

@devinjdangelo (Contributor, Author):

Ultimately, I'd like to be able to call SerializedRowGroupWriter::append_column as soon as possible -- before any parquet file has been completely serialized in memory. I.e. as a parallel task finishes encoding a single column for a single row group, eagerly flush those bytes to the concatenation task, then flush to ObjectStore and discard from memory. If the concatenation task can keep up with all of the parallel serializing tasks, then we could prevent ever buffering an entire row group in memory.

The goal of course being lowering the memory overhead of this approach. Compared to streaming multiple files independently directly to an ObjectStore, the current approach in this PR consumes about double the memory.

@alamb (Contributor) left a comment:

I also wanted to commend you for your exemplary demonstration of how to break a large project up into smaller PRs. I think it makes the changes both easier to review and easier to discuss and work on.

[Screenshot: 2023-09-18 at 12:32 PM]

I think this PR is ready to merge -- I will file a few tickets to track the follow on work items we have identified.

cc @metesynnada

@alamb (Contributor) commented Sep 18, 2023:

Ok, I think all follow-on tasks are tracked and this PR is ready to go. Thank you again @devinjdangelo!

@alamb merged commit 5718a3f into apache:main on Sep 18, 2023
22 checks passed
@alamb (Contributor) commented Sep 18, 2023:

PR to add ticket references: #7592

Labels: core (Core DataFusion crate), enhancement (New feature or request), sqllogictest (SQL Logic Tests (.slt))