diff --git a/include/raptor/search/search_partitioned_hibf.hpp b/include/raptor/search/search_partitioned_hibf.hpp index fc3c6963..6806a3a1 100644 --- a/include/raptor/search/search_partitioned_hibf.hpp +++ b/include/raptor/search/search_partitioned_hibf.hpp @@ -6,8 +6,8 @@ // -------------------------------------------------------------------------------------------------- /*!\file - * \brief Provides raptor::search_singular_ibf. - * \author Enrico Seiler + * \brief Provides raptor::search_partitioned_hibf. + * \author Svenja Mehringer */ #pragma once @@ -26,42 +26,26 @@ namespace raptor { template -void search_singular_ibf(search_arguments const & arguments, index_t && index) +void search_partitioned_hibf(search_arguments const & arguments, index_t && index) { - constexpr bool is_ibf = std::same_as>; - - auto cereal_future = std::async(std::launch::async, - [&]() - { - load_index(index, arguments); - }); - seqan3::sequence_file_input> fin{ arguments.query_file}; using record_type = typename decltype(fin)::record_type; std::vector records{}; - sync_out synced_out{arguments}; + robin_hood::unordered_flat_map results; // cache results when searching multiple hibfs raptor::threshold::threshold const thresholder{arguments.make_threshold_parameters()}; + // searching with storing all results in results map auto worker = [&](size_t const start, size_t const extent) { seqan::hibf::serial_timer local_compute_minimiser_timer{}; seqan::hibf::serial_timer local_query_ibf_timer{}; seqan::hibf::serial_timer local_generate_results_timer{}; -#if defined(__clang__) - auto counter = [&index]() -#else - auto counter = [&index, is_ibf]() -#endif - { - if constexpr (is_ibf) - return index.ibf().template counting_agent(); - else - return index.ibf().membership_agent(); - }(); + auto counter = index.ibf().membership_agent(); + std::string result_string{}; std::vector minimiser; @@ -71,9 +55,7 @@ void search_singular_ibf(search_arguments const & arguments, index_t && index) for (auto && [id, seq] : std::span{records.data() + start, extent}) { - result_string.clear(); - result_string += id; - result_string += '\t'; + std::string & result_string = results[id]; // TODO concurrent access?? auto minimiser_view = seq | hash_adaptor | std::views::common; local_compute_minimiser_timer.start(); @@ -83,34 +65,58 @@ void search_singular_ibf(search_arguments const & arguments, index_t && index) size_t const minimiser_count{minimiser.size()}; size_t const threshold = thresholder.get(minimiser_count); - if constexpr (is_ibf) + local_query_ibf_timer.start(); + auto & result = counter.membership_for(minimiser, threshold); // Results contains user bin IDs + local_query_ibf_timer.stop(); + local_generate_results_timer.start(); + for (auto && count : result) { - local_query_ibf_timer.start(); - auto & result = counter.bulk_count(minimiser); - local_query_ibf_timer.stop(); - size_t current_bin{0}; - local_generate_results_timer.start(); - for (auto && count : result) - { - if (count >= threshold) - { - result_string += std::to_string(current_bin); - result_string += ','; - } - ++current_bin; - } + result_string += std::to_string(count); + result_string += ','; } - else + + local_generate_results_timer.stop(); + } + + arguments.compute_minimiser_timer += local_compute_minimiser_timer; + arguments.query_ibf_timer += local_query_ibf_timer; + arguments.generate_results_timer += local_generate_results_timer; + }; + + // searching and writing results to file + auto output_worker = [&](size_t const start, size_t const extent) + { + seqan::hibf::serial_timer local_compute_minimiser_timer{}; + seqan::hibf::serial_timer local_query_ibf_timer{}; + seqan::hibf::serial_timer local_generate_results_timer{}; + + auto counter = return index.ibf().membership_agent(); + std::vector minimiser; + + auto hash_adaptor = seqan3::views::minimiser_hash(arguments.shape, + seqan3::window_size{arguments.window_size}, + seqan3::seed{adjust_seed(arguments.shape_weight)}); + + for (auto && [id, seq] : std::span{records.data() + start, extent}) + { + std::string & result_string = results[id]; + + auto minimiser_view = seq | hash_adaptor | std::views::common; + local_compute_minimiser_timer.start(); + minimiser.assign(minimiser_view.begin(), minimiser_view.end()); + local_compute_minimiser_timer.stop(); + + size_t const minimiser_count{minimiser.size()}; + size_t const threshold = thresholder.get(minimiser_count); + + local_query_ibf_timer.start(); + auto & result = counter.membership_for(minimiser, threshold); // Results contains user bin IDs + local_query_ibf_timer.stop(); + local_generate_results_timer.start(); + for (auto && count : result) { - local_query_ibf_timer.start(); - auto & result = counter.membership_for(minimiser, threshold); // Results contains user bin IDs - local_query_ibf_timer.stop(); - local_generate_results_timer.start(); - for (auto && count : result) - { - result_string += std::to_string(count); - result_string += ','; - } + result_string += std::to_string(count); + result_string += ','; } if (auto & last_char = result_string.back(); last_char == ',') @@ -118,6 +124,8 @@ void search_singular_ibf(search_arguments const & arguments, index_t && index) else result_string += '\n'; + result_string.insert(result_string.begin(), '\t'); + result_string.insert(result_string.begin(), id.begin(), id.end()); synced_out.write(result_string); local_generate_results_timer.stop(); } @@ -127,25 +135,31 @@ void search_singular_ibf(search_arguments const & arguments, index_t && index) arguments.generate_results_timer += local_generate_results_timer; }; - auto write_header = [&]() - { - if constexpr (is_ibf) - return synced_out.write_header(arguments, index.ibf().hash_function_count()); - else - return synced_out.write_header(arguments, index.ibf().ibf_vector[0].hash_function_count()); - }; - for (auto && chunked_records : fin | seqan::stl::views::chunk((1ULL << 20) * 10)) { + assert(arguments.parts > 1); // a partitioned HIBF should have at lease 2 partitions + // prefetch the first partition while query IO is done + auto cereal_future = std::async(std::launch::async, + [&]() + { + load_index(index, arguments, 0); + }); + records.clear(); arguments.query_file_io_timer.start(); std::ranges::move(chunked_records, std::back_inserter(records)); arguments.query_file_io_timer.stop(); cereal_future.get(); - [[maybe_unused]] static bool header_written = write_header(); // called exactly once + synced_out.write_header(arguments, index.ibf().ibf_vector[0].hash_function_count()); + + for (size_t part{}; part < arguments.parts; ++part) + { + do_parallel(worker, records.size(), arguments.threads); + load_index(index, arguments, part); + } - do_parallel(worker, records.size(), arguments.threads); + do_parallel(output_worker, records.size(), arguments.threads); // when last part also write result } } diff --git a/src/build/build_hibf.cpp b/src/build/build_hibf.cpp index abf9e1e2..095a2c78 100644 --- a/src/build/build_hibf.cpp +++ b/src/build/build_hibf.cpp @@ -19,6 +19,32 @@ namespace raptor { +void build_hibf(build_arguments const & arguments, + seqan::hibf::config const & config, + seqan::hibf::layout::layout const & layout) +{ + // Call ctor + seqan::hibf::hierarchical_interleaved_bloom_filter hibf{config, layout}; + + arguments.index_allocation_timer = std::move(hibf.index_allocation_timer); + arguments.user_bin_io_timer = std::move(hibf.user_bin_io_timer); + arguments.merge_kmers_timer = std::move(hibf.merge_kmers_timer); + arguments.fill_ibf_timer = std::move(hibf.fill_ibf_timer); + + arguments.index_allocation_timer.start(); + raptor_index index{window{arguments.window_size}, + arguments.shape, + arguments.parts, + arguments.bin_path, + arguments.fpr, + std::move(hibf)}; + arguments.index_allocation_timer.stop(); + + arguments.store_index_timer.start(); + store_index(arguments.out_path, std::move(index)); + arguments.store_index_timer.stop(); +} + void build_hibf(build_arguments const & arguments) { std::variant, file_reader> reader; @@ -51,47 +77,45 @@ void build_hibf(build_arguments const & arguments) config.input_fn = input_lambda; config.threads = arguments.threads; - // Call ctor - seqan::hibf::hierarchical_interleaved_bloom_filter hibf{config, layout}; - - arguments.index_allocation_timer = std::move(hibf.index_allocation_timer); - arguments.user_bin_io_timer = std::move(hibf.user_bin_io_timer); - arguments.merge_kmers_timer = std::move(hibf.merge_kmers_timer); - arguments.fill_ibf_timer = std::move(hibf.fill_ibf_timer); - - arguments.index_allocation_timer.start(); - raptor_index index{window{arguments.window_size}, - arguments.shape, - arguments.parts, - arguments.bin_path, - arguments.fpr, - std::move(hibf)}; - arguments.index_allocation_timer.stop(); - - arguments.store_index_timer.start(); - store_index(arguments.out_path, std::move(index)); - arguments.store_index_timer.stop(); + build_hibf(arguments, config, layout); } void build_partitioned_hibf(build_arguments const & arguments) { + std::variant, file_reader> reader; + if (arguments.input_is_minimiser) + reader = file_reader{}; + else + reader = file_reader{arguments.shape, arguments.window_size}; - partition_config const cfg{arguments.parts}; - index_factory factory{arguments, cfg}; - std::vector const kmers_per_partition = max_count_per_partition(cfg, arguments); - - for (size_t part = 0; part < arguments.parts; ++part) - { - arguments.bits = seqan::hibf::build::bin_size_in_bits( - {.fpr = arguments.fpr, .hash_count = arguments.hash, .elements = kmers_per_partition[part]}); - auto index = factory(part); - std::filesystem::path out_path{arguments.out_path}; - out_path += "_" + std::to_string(part); - arguments.store_index_timer.start(); - store_index(out_path, std::move(index)); - arguments.store_index_timer.stop(); - } + auto input_lambda = [&arguments, &reader](size_t const user_bin_id, seqan::hibf::insert_iterator it) + { + std::visit( + [&](auto const & reader) + { + reader.hash_into(arguments.bin_path[user_bin_id], it); + }, + reader); + }; + + seqan::hibf::config config{}; + seqan::hibf::layout::layout layout{}; + + std::ifstream layout_stream{arguments.bin_file}; + config.read_from(layout_stream); + config.threads = arguments.threads; + config.input_fn = input_lambda; + for (size_t part = 0; part < arguments.parts; ++part) + { + layout.read_from(layout_stream); // read current layout + + // replace out_path by appending `_[part]` for each index partition. + build_arguments local_arguments = arguments; + local_arguments.out_path = arguments.out_path + "_" + std::to_string(part); + + build_hibf(local_arguments, config, layout); + } } } // namespace raptor diff --git a/src/search/search_hibf.cpp b/src/search/search_hibf.cpp index 2944e72d..67f1cc0e 100644 --- a/src/search/search_hibf.cpp +++ b/src/search/search_hibf.cpp @@ -11,6 +11,7 @@ */ #include +#include #include namespace raptor @@ -19,7 +20,11 @@ namespace raptor void search_hibf(search_arguments const & arguments) { auto index = raptor_index{}; - search_singular_ibf(arguments, std::move(index)); + + if (arguments.parts == 1) + search_singular_ibf(arguments, std::move(index)); + else + search_partitioned_hibf(arguments, std::move(index)); } } // namespace raptor