PERF : modify SMJ shuffle file reader to skip validation #15948

Open: wants to merge 10 commits into base: main

getChan
Contributor

@getChan getChan commented May 5, 2025

Which issue does this PR close?

Rationale for this change

#14078 and #15454 show that skipping validation is effective when reading shuffle files.

What changes are included in this PR?

When SortMergeJoinExec reads a shuffle file, it now skips validation.

Are these changes tested?

  • The existing cargo tests pass.
  • Verified that the shuffle file read back is identical to the one written.
  • No benchmark is included because it was difficult to limit one to the shuffle-read scope.

Are there any user-facing changes?

no

@getChan getChan force-pushed the smj-skip-validation branch 3 times, most recently from 9d16d52 to 1bc8ef3 Compare May 5, 2025 17:13
@getChan getChan force-pushed the smj-skip-validation branch from 1bc8ef3 to 11bf46e Compare May 5, 2025 17:34
@alamb
Contributor

alamb commented May 5, 2025

Benchmark test is not included because it was difficult to limit them to shuffle read scope.

Did you test it locally? Do you have any performance numbers you can share?

@2010YOUY01
Contributor

2010YOUY01 commented May 6, 2025

Benchmark test is not included because it was difficult to limit them to shuffle read scope.

Did you test it locally? Do you have any performance numbers you can share?

I believe the SMJ reader is the only remaining component that hasn't been migrated to SpillManager (the write path of SMJ has already been refactored to use SpillManager). I agree it would be great to also refactor it to use SpillManager.

@getChan
Contributor Author

getChan commented May 6, 2025

SMJExec already uses SpillManager on the spill write path:

Err(_) if self.runtime_env.disk_manager.tmp_files_enabled() => {
    // Spill buffered batch to disk
    if let Some(batch) = buffered_batch.batch {
        let spill_file = self
            .spill_manager
            .spill_record_batch_and_finish(
                &[batch],
                "sort_merge_join_buffered_spill",
            )?
            .unwrap(); // Operation only return None if no batches are spilled, here we ensure that at least one batch is spilled
        buffered_batch.spill_file = Some(spill_file);
        buffered_batch.batch = None;

but on the spill read path, it reads the spill files directly:

(Some(spill_file), None) => {
    let mut buffered_cols: Vec<ArrayRef> =
        Vec::with_capacity(buffered_indices.len());
    let file = BufReader::new(File::open(spill_file.path())?);
    let reader = unsafe {
        StreamReader::try_new(file, None)?.with_skip_validation(true)
    };

I will check whether SpillManager can also be used for the read path (as a separate issue).
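
To illustrate the trade-off behind the `unsafe` skip-validation read above, here is a minimal, std-only sketch. It is an analogy, not the Arrow IPC implementation: `write_spill` and `read_spill` are hypothetical helpers and the length-prefixed format is made up for the example. The validated path checks the UTF-8 of every record, while the trusted path skips that check on the assumption that the same process wrote the file.

```rust
use std::io::{self, Cursor, Read, Write};

/// Write each row as a 4-byte little-endian length followed by its bytes.
/// (Hypothetical format, used only for this illustration.)
fn write_spill<W: Write>(mut out: W, rows: &[&str]) -> io::Result<()> {
    for row in rows {
        out.write_all(&(row.len() as u32).to_le_bytes())?;
        out.write_all(row.as_bytes())?;
    }
    Ok(())
}

/// Read the rows back, optionally skipping the UTF-8 check.
///
/// # Safety
/// With `skip_validation == true` the input must come from `write_spill`;
/// otherwise the returned strings may hold invalid UTF-8.
unsafe fn read_spill<R: Read>(mut input: R, skip_validation: bool) -> io::Result<Vec<String>> {
    let mut rows = Vec::new();
    let mut len_buf = [0u8; 4];
    loop {
        // End of input (or a truncated length prefix) ends the loop.
        match input.read_exact(&mut len_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        let len = u32::from_le_bytes(len_buf) as usize;
        let mut bytes = vec![0u8; len];
        input.read_exact(&mut bytes)?;
        let row = if skip_validation {
            // Trusted path: no scan over the bytes.
            unsafe { String::from_utf8_unchecked(bytes) }
        } else {
            // Validated path: scans every byte and can fail.
            String::from_utf8(bytes)
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
        };
        rows.push(row);
    }
    Ok(rows)
}

fn main() -> io::Result<()> {
    let mut buf = Vec::new();
    write_spill(&mut buf, &["a1", "b1", "c1"])?;
    let checked = unsafe { read_spill(Cursor::new(&buf), false)? };
    let trusted = unsafe { read_spill(Cursor::new(&buf), true)? };
    // Both paths must agree on data the process wrote itself.
    assert_eq!(checked, trusted);
    println!("{} rows read back", trusted.len());
    Ok(())
}
```

The function is marked `unsafe` for the same reason the call in the excerpt is: the caller, not the reader, vouches for the contents of the file.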

@getChan
Contributor Author

getChan commented May 6, 2025

I just added a benchmark for SMJExec spill-read execution.
The difference is small, perhaps because the benchmark is not limited to the spill-read scope.

sort_merge_join_spill/SortMergeJoinExec_spill
                        time:   [321.88 µs 325.75 µs 330.51 µs]
                        change: [-2.8665% -1.3757% +0.1785%] (p = 0.09 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

@@ -92,5 +92,4 @@ pub mod udaf {
}

pub mod coalesce;
#[cfg(test)]
Contributor Author

This is so that the test utilities can be used in the benchmark as well. Is that okay?

Contributor

I think you could avoid doing this by using the actual operators -- see comment above


fn create_test_data() -> SortMergeJoinExec {
    let left_batch = build_table_i32(
        ("a1", &vec![0, 1, 2, 3, 4, 5]),
Contributor

I think a benchmark that has only 5 rows is likely going to measure only the overhead of plan setup rather than the actual performance of a large join that needs to spill.

Perhaps we can increase this size to 1M rows or something (it is important that b1 and b2 remain sorted).
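
One possible way to produce larger sorted key columns is sketched below, using only the standard library. The helper names `sorted_keys` and `is_sorted` and the duplication factors are assumptions made for illustration; the resulting vectors would still need to be wrapped into record batches by whatever helper the benchmark uses.

```rust
/// Build `n` sorted i32 join keys in which each key value repeats `dup` times.
/// (Hypothetical helper; `dup` controls how many rows match per key.)
fn sorted_keys(n: usize, dup: usize) -> Vec<i32> {
    assert!(dup > 0, "dup must be at least 1");
    (0..n).map(|i| (i / dup) as i32).collect()
}

/// Check that a column is in non-decreasing order, as a sort-merge join requires.
fn is_sorted(v: &[i32]) -> bool {
    v.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    const ROWS: usize = 1 << 20; // about 1M rows
    let b1 = sorted_keys(ROWS, 4);
    let b2 = sorted_keys(ROWS, 2);
    assert!(is_sorted(&b1) && is_sorted(&b2));
    println!("generated {} and {} sorted keys", b1.len(), b2.len());
}
```

Generating the keys by integer division keeps them sorted by construction, so no separate sort step is needed before building the join inputs.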

Contributor Author

@getChan getChan May 7, 2025

Thanks for the review!
I have now run the benchmark with 1_048_576 rows, with all rows spilled.
However, the result shows only a small performance improvement:

SortMergeJoinExec_spill time:   [79.761 s 79.858 s 79.974 s]
                        change: [-0.7912% -0.5805% -0.3386%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild

It seems that skipping validation has only a small impact on overall execution.
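
As a rough sanity check on the figures above, the reported relative change can be turned into an absolute saving. This is a back-of-the-envelope sketch that takes the middle estimates from the output (79.858 s and -0.5805%) at face value; `baseline_and_saving` is a hypothetical helper, and the calculation ignores measurement noise.

```rust
/// Estimate the baseline time and the absolute saving from the new time
/// and the relative change, where new = baseline * (1 + relative_change).
fn baseline_and_saving(new_secs: f64, relative_change: f64) -> (f64, f64) {
    let baseline = new_secs / (1.0 + relative_change);
    (baseline, baseline - new_secs)
}

fn main() {
    // Middle estimates copied from the benchmark output above.
    let (baseline, saved) = baseline_and_saving(79.858, -0.005805);
    println!("baseline ~ {baseline:.3} s, saved ~ {saved:.3} s");
}
```

Under these assumptions the saving is on the order of half a second out of roughly 80 s, which is consistent with validation being a small share of the total join time.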

use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::common::collect;
use datafusion_physical_plan::joins::SortMergeJoinExec;
use datafusion_physical_plan::test::{build_table_i32, TestMemoryExec};
Contributor

I worry that using test-only structures like this means the benchmark will not measure performance that maps directly to query performance. I think you could move the file from

  • datafusion/physical-plan/benches/sort_merge_join.rs

to

  • datafusion/core/benches/sort_merge_join.rs

And use a SessionContext and actual query to run to be closer.

Here is an example that does something similar: https://github.com/apache/datafusion/blob/main/datafusion/core/benches/filter_query_sql.rs

Contributor Author

  1. I moved the file to /core/benches/.
  2. I tried to use an actual query through SessionContext, but it is hard to simulate.
    I could get SMJExec to execute with spilling, but securing enough memory for operators like RepartitionExec at the same time was challenging.

@github-actions github-actions bot added the core Core DataFusion crate label May 7, 2025
@getChan getChan force-pushed the smj-skip-validation branch from f0779c9 to 28839e3 Compare May 7, 2025 15:47