Skip to content

Commit

Permalink
Merge pull request #419 from eseiler/misc/openmp
Browse files Browse the repository at this point in the history
[MISC] Replace execution_handler with OpenMP
  • Loading branch information
eseiler authored Jul 4, 2024
2 parents 6e4929c + db5d744 commit 1d22ba7
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 30 deletions.
2 changes: 1 addition & 1 deletion include/raptor/build/index_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class index_factory
raptor_index<> index{*arguments};
arguments->index_allocation_timer.stop();

auto worker = [&](auto && zipped_view, auto &&)
auto worker = [&](auto && zipped_view)
{
seqan::hibf::serial_timer local_timer{};
auto & ibf = index.ibf();
Expand Down
29 changes: 22 additions & 7 deletions include/raptor/call_parallel_on_bins.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@

#pragma once

#include <seqan3/core/algorithm/detail/execution_handler_parallel.hpp>
#include <algorithm>
#include <bit>
#include <functional>
#include <omp.h>
#include <vector>

#include <hibf/contrib/std/chunk_view.hpp>
#include <hibf/contrib/std/zip_view.hpp>
#include <hibf/misc/divide_and_ceil.hpp>

namespace raptor
{
Expand All @@ -22,12 +27,22 @@ void call_parallel_on_bins(algorithm_t && worker,
std::vector<std::vector<std::string>> const & bin_paths,
uint8_t const threads)
{
// GCOVR_EXCL_START
size_t const chunk_size = std::clamp<size_t>(std::bit_ceil(bin_paths.size() / threads), 8u, 64u);
// GCOVR_EXCL_STOP
auto chunked_view = seqan::stl::views::zip(bin_paths, std::views::iota(0u)) | seqan::stl::views::chunk(chunk_size);
seqan3::detail::execution_handler_parallel executioner{threads};
executioner.bulk_execute(std::move(worker), std::move(chunked_view), []() {});
size_t const number_of_bins = bin_paths.size();
// clang-format off
size_t const chunk_size = std::clamp<size_t>(
std::bit_ceil(seqan::hibf::divide_and_ceil(number_of_bins, threads)),
8u,
64u);
auto chunked_view = seqan::stl::views::zip(bin_paths, std::views::iota(0u, number_of_bins))
| seqan::stl::views::chunk(chunk_size);
// clang-format on
size_t const number_of_chunks = std::ranges::size(chunked_view);

#pragma omp parallel for schedule(dynamic) num_threads(threads)
for (size_t i = 0; i < number_of_chunks; ++i)
{
std::invoke(worker, chunked_view[i]);
}
}

} // namespace raptor
23 changes: 12 additions & 11 deletions include/raptor/search/do_parallel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,29 @@

#pragma once

#include <chrono>
#include <future>
#include <algorithm>
#include <functional>
#include <omp.h>
#include <vector>

#include <hibf/misc/divide_and_ceil.hpp>

namespace raptor
{

template <typename algorithm_t>
//!\brief Runs `worker(start, extent)` over `num_records` records, split into chunks processed by an OpenMP parallel for.
//!\param worker      Callable invoked as `worker(start, extent)` for each chunk; must be safe to call concurrently.
//!\param num_records Total number of records to distribute.
//!\param threads     Number of OpenMP threads to use.
void do_parallel(algorithm_t && worker, size_t const num_records, size_t const threads)
{
    // Nothing to do for empty input. Without this guard, chunk_size would be 0 and the
    // second divide_and_ceil below would divide by zero.
    if (num_records == 0u)
        return;

    // Create roughly threads * threads chunks (more chunks than threads) so that
    // schedule(dynamic) can balance uneven per-record workloads across threads.
    size_t const chunk_size = seqan::hibf::divide_and_ceil(num_records, threads * threads);
    size_t const number_of_chunks = seqan::hibf::divide_and_ceil(num_records, chunk_size);

#pragma omp parallel for schedule(dynamic) num_threads(threads)
    for (size_t i = 0; i < number_of_chunks; ++i)
    {
        size_t const start = chunk_size * i;
        // The last chunk may be smaller than chunk_size; clamp its extent to the remainder.
        size_t const extent = i == (number_of_chunks - 1) ? num_records - i * chunk_size : chunk_size;
        std::invoke(worker, start, extent);
    }
}

} // namespace raptor
1 change: 1 addition & 0 deletions include/raptor/search/search_singular_ibf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#pragma once

#include <future>
#include <random>

#include <seqan3/search/views/minimiser_hash.hpp>
Expand Down
4 changes: 2 additions & 2 deletions src/argument_parsing/compute_bin_size.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ size_t kmer_count_from_minimiser_files(std::vector<std::vector<std::string>> con
}
};

auto worker = [&callback](auto && zipped_view, auto &&)
auto worker = [&callback](auto && zipped_view)
{
std::filesystem::path minimiser_file{};
std::filesystem::path biggest_file{};
Expand Down Expand Up @@ -94,7 +94,7 @@ size_t kmer_count_from_sequence_files(std::vector<std::vector<std::string>> cons
}
};

auto worker = [&callback, &reader](auto && zipped_view, auto &&)
auto worker = [&callback, &reader](auto && zipped_view)
{
for (auto && [file_names, bin_number] : zipped_view)
reader.on_hash(file_names, callback);
Expand Down
2 changes: 1 addition & 1 deletion src/build/max_count_per_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ std::vector<size_t> max_count_per_partition(partition_config const & cfg,
kmers_per_partition[i] = std::max<size_t>(kmers_per_partition[i], kmer_counts[i]);
};

auto worker = [&callback, &reader, &cfg](auto && zipped_view, auto &&)
auto worker = [&callback, &reader, &cfg](auto && zipped_view)
{
std::vector<size_t> max_kmer_counts(cfg.partitions);
std::vector<size_t> kmer_counts(cfg.partitions);
Expand Down
23 changes: 15 additions & 8 deletions src/prepare/compute_minimiser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
* \author Enrico Seiler <enrico.seiler AT fu-berlin.de>
*/

#include <seqan3/core/algorithm/detail/execution_handler_parallel.hpp>
#include <omp.h>

#include <seqan3/io/sequence_file/input.hpp>
#include <seqan3/search/views/minimiser_hash.hpp>

#include <hibf/contrib/robin_hood.hpp>
#include <hibf/contrib/std/chunk_view.hpp>
#include <hibf/contrib/std/zip_view.hpp>
#include <hibf/misc/divide_and_ceil.hpp>

#include <raptor/adjust_seed.hpp>
#include <raptor/dna4_traits.hpp>
Expand Down Expand Up @@ -52,7 +54,7 @@ void compute_minimiser(prepare_arguments const & arguments)
file_reader<file_types::sequence> const reader{arguments.shape, arguments.window_size};
raptor::cutoff const cutoffs{arguments};

auto worker = [&](auto && zipped_view, auto &&)
auto worker = [&](auto && zipped_view)
{
seqan::hibf::serial_timer local_compute_minimiser_timer{};
seqan::hibf::serial_timer local_write_minimiser_timer{};
Expand Down Expand Up @@ -128,12 +130,17 @@ void compute_minimiser(prepare_arguments const & arguments)
arguments.write_header_timer += local_write_header_timer;
};

size_t const chunk_size =
std::max<size_t>(1, std::floor(arguments.bin_path.size() / static_cast<double>(arguments.threads)));
auto chunked_view =
seqan::stl::views::zip(arguments.bin_path, std::views::iota(0u)) | seqan::stl::views::chunk(chunk_size);
seqan3::detail::execution_handler_parallel executioner{arguments.threads};
executioner.bulk_execute(std::move(worker), std::move(chunked_view), []() {});
size_t const number_of_bins = arguments.bin_path.size();
size_t const chunk_size = seqan::hibf::divide_and_ceil(number_of_bins, arguments.threads);
auto chunked_view = seqan::stl::views::zip(arguments.bin_path, std::views::iota(0u, number_of_bins))
| seqan::stl::views::chunk(chunk_size);
size_t const number_of_chunks = std::ranges::size(chunked_view);

#pragma omp parallel for schedule(dynamic) num_threads(arguments.threads)
for (size_t i = 0; i < number_of_chunks; ++i)
{
std::invoke(worker, chunked_view[i]);
}

write_list_file(arguments);
}
Expand Down
1 change: 1 addition & 0 deletions src/search/search_partitioned_ibf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* \author Enrico Seiler <enrico.seiler AT fu-berlin.de>
*/

#include <future>
#include <random>

#include <seqan3/search/views/minimiser_hash.hpp>
Expand Down

0 comments on commit 1d22ba7

Please sign in to comment.