Make --no-mmap calls still use parallelism when filesizes are large #361

Open
wants to merge 4 commits into base: master
7 changes: 7 additions & 0 deletions b3sum/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions b3sum/Cargo.toml
@@ -20,6 +20,7 @@ clap = { version = "4.0.8", features = ["derive", "wrap_help"] }
hex = "0.4.0"
memmap2 = "0.7.0"
rayon = "1.2.1"
read_chunks = "0.2.0"
wild = "2.0.3"

[dev-dependencies]
60 changes: 59 additions & 1 deletion b3sum/src/main.rs
@@ -163,6 +163,62 @@ impl Args {
}
}

/// Hashes a reader into a hasher in parallel using rayon. Optionally takes a length, which is
/// used as a heuristic to decide whether we are better off hashing sequentially.
fn hash_reader_parallel(
hasher: &mut blake3::Hasher,
reader: &mut (impl Read + Send),
len: Option<u64>,
) -> Result<()> {
// We use read_chunks here because I (ultrabear) coded it and know it is as safe as the code here.
// TODO: make this just a function in main.rs instead of an extra dep, but that is only worth
// doing if we want to merge this PR; for now it is a proof of concept.
use read_chunks::ReadExt;

// 2MiB of total buffer is not an extreme amount of memory, and performance is a good bit
// better with that amount. Increasing this will probably yield more performance up to a
// certain point, and there might be a "magic" size to target (where blake3 multithreading can
// split evenly and reach maximum performance without using too much memory).
const BUF_SIZE: usize = 1024 * 1024;
// Anything under 1MiB is not worth putting through multithreading. This is probably an
// overshoot of where multithreading becomes effective (512KiB was also found to give a
// performance increase), but it is essential that we do not undershoot that point and risk
// worse performance than before this code change on small files.
const MIN_SIZE: u64 = BUF_SIZE as u64;

// fall back to update_reader if the length is too small
if len.is_some_and(|s| s < MIN_SIZE) {
hasher.update_reader(reader)?;
return Ok(());
}

// allocate the double buffers and their return memory locations
let mut hashing = vec![0; BUF_SIZE];
let mut hashing_res = reader.keep_reading(&mut *hashing)?;

let mut reading_to = vec![0; BUF_SIZE];
let mut reading_res = None::<io::Result<usize>>;

while hashing_res != 0 {
// by scoping we guarantee that all tasks complete, and can get our mutable references back
// to do error handling and buffer swapping
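// while one spawn hashes the current `hashing` buffer on the rayon pool, the other
// concurrently fills `reading_to` with the next chunk from the reader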
rayon::scope(|s| {
s.spawn(|_| {
reading_res = Some(reader.keep_reading(&mut *reading_to));
});

s.spawn(|_| {
hasher.update_rayon(&hashing[..hashing_res]);
});
});

hashing_res = reading_res.take().unwrap()?;
(hashing, reading_to) = (reading_to, hashing);
}

Ok(())
}

fn hash_path(args: &Args, path: &Path) -> Result<blake3::OutputReader> {
let mut hasher = args.base_hasher.clone();
if path == Path::new("-") {
@@ -171,7 +227,9 @@ fn hash_path(args: &Args, path: &Path) -> Result<blake3::OutputReader> {
}
hasher.update_reader(io::stdin().lock())?;
} else if args.no_mmap() {
hasher.update_reader(File::open(path)?)?;
let length = std::fs::metadata(path)?.len();

hash_reader_parallel(&mut hasher, &mut File::open(path)?, Some(length))?;
} else {
// The fast path: Try to mmap the file and hash it with multiple threads.
hasher.update_mmap_rayon(path)?;
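The read_chunks::ReadExt::keep_reading call above is relied on to read until the destination buffer is full or EOF is reached, returning how many bytes were read. If the TODO in hash_reader_parallel is acted on and the extra dependency is dropped, a minimal stand-in could look like the sketch below (an assumption about the crate's contract, not its actual implementation):

use std::io::{self, Read};

// Sketch of a local replacement for read_chunks::ReadExt::keep_reading, assuming its
// contract is "read until the buffer is full or EOF", retrying on Interrupted errors and
// returning the number of bytes read.
fn keep_reading(reader: &mut impl Read, buf: &mut [u8]) -> io::Result<usize> {
    let mut filled = 0;
    while filled < buf.len() {
        match reader.read(&mut buf[filled..]) {
            Ok(0) => break, // EOF: return a short (possibly zero) count
            Ok(n) => filled += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(filled)
}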
50 changes: 50 additions & 0 deletions b3sum/tests/cli_tests.rs
@@ -611,3 +611,53 @@ fn test_globbing() {
.unwrap();
assert_eq!(expected, output);
}

#[test]
// tests that hash_reader_parallel falls back correctly on small inputs and that multithreaded hashing gives correct results
fn test_hash_reader_parallel() {
let dir = tempfile::tempdir().unwrap();

let file1 = dir.path().join("file1");
fs::write(&file1, b"foobar").unwrap();

let expected = blake3::hash(b"foobar");

let output = cmd!(b3sum_exe(), "--no-mmap", &file1)
.stdout_capture()
.run()
.unwrap()
.stdout;

let expected = format!("{}  {}\n", expected.to_hex(), file1.display());

// fallback test
assert_eq!(output, expected.as_bytes());

// tests that multithreading gives correct results

let file2 = dir.path().join("file2");
let mut f = fs::File::create(&file2).unwrap();

let mut expected = blake3::Hasher::new();

// 20_000 * 62 bytes is about 1.2MiB, which passes the 1MiB threshold for multithreading

for _ in 0..20_000 {
// we use a big string here to avoid looping many times, which is bad for opt-level=0
const WRITE: &[u8] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
assert_eq!(WRITE.len(), 62);

f.write_all(WRITE).unwrap();
expected.update(WRITE);
}

let output = cmd!(b3sum_exe(), "--no-mmap", &file2)
.stdout_capture()
.run()
.unwrap()
.stdout;

let expected = format!("{}  {}\n", expected.finalize().to_hex(), file2.display());

assert_eq!(output, expected.as_bytes());
}
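hash_reader_parallel is private to main.rs, so the tests above exercise it through the b3sum CLI. If it were reachable from a unit test (it is not in this PR), a direct check of the property these tests verify might look like the following sketch, where check_matches_single_threaded is a hypothetical helper:

use std::io::Cursor;

// Hypothetical direct check: the parallel reader path must produce the same hash as a
// plain single-shot blake3::hash over the same bytes.
fn check_matches_single_threaded(data: &[u8]) {
    let mut hasher = blake3::Hasher::new();
    hash_reader_parallel(&mut hasher, &mut Cursor::new(data), Some(data.len() as u64)).unwrap();
    assert_eq!(hasher.finalize(), blake3::hash(data));
}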