From db5d74438efbda2e2993c28afb860fd1f053b9fd Mon Sep 17 00:00:00 2001 From: Enrico Seiler Date: Thu, 4 Jul 2024 14:17:07 +0200 Subject: [PATCH] [MISC] Replace execution_handler with OpenMP --- include/raptor/build/index_factory.hpp | 2 +- include/raptor/call_parallel_on_bins.hpp | 29 ++++++++++++++----- include/raptor/search/do_parallel.hpp | 23 ++++++++------- include/raptor/search/search_singular_ibf.hpp | 1 + src/argument_parsing/compute_bin_size.cpp | 4 +-- src/build/max_count_per_partition.cpp | 2 +- src/prepare/compute_minimiser.cpp | 23 ++++++++++----- src/search/search_partitioned_ibf.cpp | 1 + 8 files changed, 55 insertions(+), 30 deletions(-) diff --git a/include/raptor/build/index_factory.hpp b/include/raptor/build/index_factory.hpp index 79c3ba1a..e7cf1023 100644 --- a/include/raptor/build/index_factory.hpp +++ b/include/raptor/build/index_factory.hpp @@ -68,7 +68,7 @@ class index_factory raptor_index<> index{*arguments}; arguments->index_allocation_timer.stop(); - auto worker = [&](auto && zipped_view, auto &&) + auto worker = [&](auto && zipped_view) { seqan::hibf::serial_timer local_timer{}; auto & ibf = index.ibf(); diff --git a/include/raptor/call_parallel_on_bins.hpp b/include/raptor/call_parallel_on_bins.hpp index ca26a5aa..8c8f7354 100644 --- a/include/raptor/call_parallel_on_bins.hpp +++ b/include/raptor/call_parallel_on_bins.hpp @@ -9,10 +9,15 @@ #pragma once -#include +#include +#include +#include +#include +#include #include #include +#include namespace raptor { @@ -22,12 +27,22 @@ void call_parallel_on_bins(algorithm_t && worker, std::vector> const & bin_paths, uint8_t const threads) { - // GCOVR_EXCL_START - size_t const chunk_size = std::clamp(std::bit_ceil(bin_paths.size() / threads), 8u, 64u); - // GCOVR_EXCL_STOP - auto chunked_view = seqan::stl::views::zip(bin_paths, std::views::iota(0u)) | seqan::stl::views::chunk(chunk_size); - seqan3::detail::execution_handler_parallel executioner{threads}; - executioner.bulk_execute(std::move(worker), std::move(chunked_view), []() {}); + size_t const number_of_bins = bin_paths.size(); + // clang-format off + size_t const chunk_size = std::clamp( + std::bit_ceil(seqan::hibf::divide_and_ceil(number_of_bins, threads)), + 8u, + 64u); + auto chunked_view = seqan::stl::views::zip(bin_paths, std::views::iota(0u, number_of_bins)) + | seqan::stl::views::chunk(chunk_size); + // clang-format on + size_t const number_of_chunks = std::ranges::size(chunked_view); + +#pragma omp parallel for schedule(dynamic) num_threads(threads) + for (size_t i = 0; i < number_of_chunks; ++i) + { + std::invoke(worker, chunked_view[i]); + } } } // namespace raptor diff --git a/include/raptor/search/do_parallel.hpp b/include/raptor/search/do_parallel.hpp index e1dafc26..b7d0d545 100644 --- a/include/raptor/search/do_parallel.hpp +++ b/include/raptor/search/do_parallel.hpp @@ -9,28 +9,29 @@ #pragma once -#include -#include +#include +#include +#include #include +#include + namespace raptor { template void do_parallel(algorithm_t && worker, size_t const num_records, size_t const threads) { - std::vector tasks; - size_t const records_per_thread = num_records / threads; + size_t const chunk_size = seqan::hibf::divide_and_ceil(num_records, threads * threads); + size_t const number_of_chunks = seqan::hibf::divide_and_ceil(num_records, chunk_size); - for (size_t i = 0; i < threads; ++i) +#pragma omp parallel for schedule(dynamic) num_threads(threads) + for (size_t i = 0; i < number_of_chunks; ++i) { - size_t const start = records_per_thread * i; - size_t const extent = i == (threads - 1) ? num_records - i * records_per_thread : records_per_thread; - tasks.emplace_back(std::async(std::launch::async, worker, start, extent)); + size_t const start = chunk_size * i; + size_t const extent = i == (number_of_chunks - 1) ? num_records - i * chunk_size : chunk_size; + std::invoke(worker, start, extent); } - - for (auto && task : tasks) - task.get(); } } // namespace raptor diff --git a/include/raptor/search/search_singular_ibf.hpp b/include/raptor/search/search_singular_ibf.hpp index 0bc02b12..d817f4f7 100644 --- a/include/raptor/search/search_singular_ibf.hpp +++ b/include/raptor/search/search_singular_ibf.hpp @@ -9,6 +9,7 @@ #pragma once +#include #include #include diff --git a/src/argument_parsing/compute_bin_size.cpp b/src/argument_parsing/compute_bin_size.cpp index 5b6ff423..73d1e7a7 100644 --- a/src/argument_parsing/compute_bin_size.cpp +++ b/src/argument_parsing/compute_bin_size.cpp @@ -39,7 +39,7 @@ size_t kmer_count_from_minimiser_files(std::vector> con } }; - auto worker = [&callback](auto && zipped_view, auto &&) + auto worker = [&callback](auto && zipped_view) { std::filesystem::path minimiser_file{}; std::filesystem::path biggest_file{}; @@ -94,7 +94,7 @@ size_t kmer_count_from_sequence_files(std::vector> cons } }; - auto worker = [&callback, &reader](auto && zipped_view, auto &&) + auto worker = [&callback, &reader](auto && zipped_view) { for (auto && [file_names, bin_number] : zipped_view) reader.on_hash(file_names, callback); diff --git a/src/build/max_count_per_partition.cpp b/src/build/max_count_per_partition.cpp index c47e04cd..d69360b1 100644 --- a/src/build/max_count_per_partition.cpp +++ b/src/build/max_count_per_partition.cpp @@ -38,7 +38,7 @@ std::vector max_count_per_partition(partition_config const & cfg, kmers_per_partition[i] = std::max(kmers_per_partition[i], kmer_counts[i]); }; - auto worker = [&callback, &reader, &cfg](auto && zipped_view, auto &&) + auto worker = [&callback, &reader, &cfg](auto && zipped_view) { std::vector max_kmer_counts(cfg.partitions); std::vector kmer_counts(cfg.partitions); diff --git a/src/prepare/compute_minimiser.cpp b/src/prepare/compute_minimiser.cpp index abbc379f..5e9d36d2 100644 --- a/src/prepare/compute_minimiser.cpp +++ b/src/prepare/compute_minimiser.cpp @@ -7,13 +7,15 @@ * \author Enrico Seiler */ -#include +#include + #include #include #include #include #include +#include #include #include @@ -52,7 +54,7 @@ void compute_minimiser(prepare_arguments const & arguments) file_reader const reader{arguments.shape, arguments.window_size}; raptor::cutoff const cutoffs{arguments}; - auto worker = [&](auto && zipped_view, auto &&) + auto worker = [&](auto && zipped_view) { seqan::hibf::serial_timer local_compute_minimiser_timer{}; seqan::hibf::serial_timer local_write_minimiser_timer{}; @@ -128,12 +130,17 @@ void compute_minimiser(prepare_arguments const & arguments) arguments.write_header_timer += local_write_header_timer; }; - size_t const chunk_size = - std::max(1, std::floor(arguments.bin_path.size() / static_cast(arguments.threads))); - auto chunked_view = - seqan::stl::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan::stl::views::chunk(chunk_size); - seqan3::detail::execution_handler_parallel executioner{arguments.threads}; - executioner.bulk_execute(std::move(worker), std::move(chunked_view), []() {}); + size_t const number_of_bins = arguments.bin_path.size(); + size_t const chunk_size = seqan::hibf::divide_and_ceil(number_of_bins, arguments.threads); + auto chunked_view = seqan::stl::views::zip(arguments.bin_path, std::views::iota(0u, number_of_bins)) + | seqan::stl::views::chunk(chunk_size); + size_t const number_of_chunks = std::ranges::size(chunked_view); + +#pragma omp parallel for schedule(dynamic) num_threads(arguments.threads) + for (size_t i = 0; i < number_of_chunks; ++i) + { + std::invoke(worker, chunked_view[i]); + } write_list_file(arguments); } diff --git a/src/search/search_partitioned_ibf.cpp b/src/search/search_partitioned_ibf.cpp index 0d506f41..26c83595 100644 --- a/src/search/search_partitioned_ibf.cpp +++ b/src/search/search_partitioned_ibf.cpp @@ -7,6 +7,7 @@ * \author Enrico Seiler */ +#include #include #include