From b5544a7029109f5839eac5431b16a4af2f4b5a41 Mon Sep 17 00:00:00 2001 From: Enrico Seiler Date: Thu, 19 Oct 2023 17:20:45 +0200 Subject: [PATCH] [FEATURE] parallel display_layout general --- src/util/display_layout/general.cpp | 347 ++++++++++++++++++---------- 1 file changed, 222 insertions(+), 125 deletions(-) diff --git a/src/util/display_layout/general.cpp b/src/util/display_layout/general.cpp index a49937d3..dd54b2f4 100644 --- a/src/util/display_layout/general.cpp +++ b/src/util/display_layout/general.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -23,24 +24,63 @@ #include #include +#include +#include #include #include "shared.hpp" -static void print_progress(size_t const percentage) +struct progress_bar { - assert(percentage <= 100u); - std::cerr << '['; - for (size_t i{}; i < percentage; ++i) - std::cerr << '='; - if (percentage < 100u) + progress_bar(size_t const total) : total{total} { - std::cerr << '>'; - for (size_t i{1u}; i < 100u - percentage; ++i) - std::cerr << ' '; + print_progress(0); + } + + void report() + { + // Locks on construction, unlocks on deconstruction. RAII-way to handle mutex lock/unlock. + // Only the thread which acquired the lock can proceed. The other threads have to **wait** until + // the lock is released. + std::lock_guard report_guard{report_mutex}; + ++current; + + // Would also work fine without this special case. However, we only want to announce "100% finished" once we + // are actually finished. + if (current == total) + { + print_progress(100); + return; + } + + size_t const percentage = std::min(99, std::ceil(100 * current / static_cast(total))); + if (percentage > last_printed) + { + last_printed = percentage; + print_progress(percentage); + } + } + + void print_progress(size_t const percentage) const + { + assert(percentage <= 100u); + std::cerr << '['; + for (size_t i{}; i < percentage; ++i) + std::cerr << '='; + if (percentage < 100u) + { + std::cerr << '>'; + for (size_t i{1u}; i < 100u - percentage; ++i) + std::cerr << ' '; + } + std::cerr << "] " << percentage << " %\r" << std::flush; } - std::cerr << "] " << percentage << " %\r" << std::flush; -} + + std::mutex report_mutex{}; + size_t last_printed{}; + size_t current{}; + size_t total{}; +}; // Using two sets and erasing from shared is slower void keep_duplicates(robin_hood::unordered_set & shared, std::vector const & current) @@ -57,6 +97,86 @@ void keep_duplicates(robin_hood::unordered_set & shared, std::vector & records, std::ostream & stream) +{ + std::ranges::sort(records, + [](record const & lhs, record const & rhs) + { + return lhs.tb_index < rhs.tb_index; + }); + // Split bins offset the tb index. At this point of code, the tb index is just consecutive and does not + // count split bins: + // tb_index ub_count kind split_count + // 0 4 merged 1 + // 1 1 split 3 + // 2 3 merged 1 + // Without the correction, the output would be: + // 0 4 merged 1 + // 1 1 split 3 + // 2 1 split 0 + // 3 1 split 0 + // 2 3 merged 1 + // Actually, the split bin will occupy 3 technical bins, i.e. we want: + // 0 4 merged 1 + // 1 1 split 3 + // 2 1 split 0 + // 3 1 split 0 + // 4 3 merged 1 + size_t tb_offset{}; + for (auto & record : records) + { + assert(record.splits > 0u); + record.tb_index += tb_offset; + tb_offset += record.splits - 1u; + } + // Now we can print the results. + for (auto & record : records) + record.write_to(stream); + + stream << std::flush; +} + int execute(config const & cfg) { std::ifstream layout_file{cfg.input}; @@ -76,11 +196,14 @@ int execute(config const & cfg) #endif auto const & hibf_config = chopper_config.hibf_config; + layout_file.close(); std::ofstream output_stream{cfg.output}; if (!output_stream.good() || !output_stream.is_open()) throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"}; + record::write_header_to(output_stream, cfg.input.c_str()); + // Fetch all file sizes such that sorting by file size doesn't have to access the filesystem too often. // n = filenames.size() // Constructing this vector has `n` filesystem accesses. @@ -109,133 +232,107 @@ int execute(config const & cfg) return first_idx < second_idx || (first_idx == second_idx && filesizes[lhs.idx] < filesizes[rhs.idx]); }); - // Estimates the cardinality of one technical bin. For merged bins, user bins will be iteratively added. - seqan::hibf::sketch::hyperloglog sketch{hibf_config.sketch_bits}; - // Used to determine the exact cardinality for one technical bin. - robin_hood::unordered_set current_kmer_set{}; - // Stores shared k-mers across user bins of a merged technical bin. - robin_hood::unordered_set shared_kmers{}; - // We can't use `shared_kmers.size() == 0` instead of `shared_kmers_initialised`, because keep_duplicates - // will result in a size of 0 when there are no shared k-mers. - bool shared_kmers_initialised{false}; - std::vector current_kmers{}; - size_t ub_count{}; // How many user bins are stored in the current technical bin? Always 1 for split bins. - size_t split_count{}; // Into how many techincal bins is the user bin split? Always 1 for merged bins. - - std::vector bin_kinds( - hibf_config.tmax, - chopper::layout::hibf_statistics::bin_kind::split); - - for (auto const & max_bin : hibf_layout.max_bins) + size_t const total_ub_count = hibf_layout.user_bins.size(); + progress_bar progress{total_ub_count}; + + // Create chunks containing user bin indices for one technical bin. + // We need to convert the chunk_by_view to a std::vector because we need a random access range for the + // parallelisation. As a side effect, std::vector is also a sized range, which makes things even easier. + // E.g., `[0,1,2,3] [4] [5,6,7,8]`: + // TB UBs + // 0 0,1,2,3 + // 1 4 + // 2 5,6,7,8 + std::vector> const chunks = [&]() { - // max_bin.previous_TB_indices.size() == 1: true for merged bins, false for split bins - // max_bin.previous_TB_indices[0]: technical bin index on the top-level - if (max_bin.previous_TB_indices.size() == 1) + auto ub_indices = std::views::iota(size_t{}, total_ub_count); + // Two user bins belong to the same chunk if they are in the same technical bin. + auto predicate = [&](size_t const lhs, size_t const rhs) { - bin_kinds[max_bin.previous_TB_indices[0]] = chopper::layout::hibf_statistics::bin_kind::merged; - } - } - - size_t const total_ub{hibf_layout.user_bins.size()}; // For progress bar - size_t const ub_percentage{std::max(1u, total_ub / 100u)}; // For progress bar, 1 % of all user bins - - size_t current_idx{}; // The current top-level technical bin index - - // Stats file header - output_stream << "# Layout: " << cfg.input.c_str() << '\n' // - << "tb_index\t" - << "exact_size\t" - << "estimated_size\t" - << "shared_size\t" - << "ub_count\t" - << "kind\t" - << "splits\n"; - - auto print_result_line = [&]() - { - bool const is_merged{bin_kinds[current_idx] == chopper::layout::hibf_statistics::bin_kind::merged}; - size_t const avg_kmer_count = (current_kmer_set.size() + split_count - 1u) / split_count; - size_t const sketch_estimate = (sketch.estimate() + split_count - 1u) / split_count; - - for (size_t i{}, total{split_count}; i < total; ++i) - { - output_stream << current_idx + i << '\t' // - << avg_kmer_count << '\t' // - << sketch_estimate << '\t' // - << shared_kmers.size() << '\t' // - << ub_count << '\t' // - << (is_merged ? "merged" : "split") << '\t' // - << split_count << '\n'; - split_count = 0u; // Subsequent split bins display 0, the first split bin displays the actual split count. - } - }; - - // Iterate over all user bins. They are sorted by their technical bin index in the top-level. - // Hence, we can process one top-level technical bin, print the results, clear our stored data and continue with - // the next. - for (size_t ub_index{}; ub_index < hibf_layout.user_bins.size(); ++ub_index) + auto const & lhs_ub = hibf_layout.user_bins[lhs]; + auto const & rhs_ub = hibf_layout.user_bins[rhs]; + // The top-level technical bin index for the current user bin. + // user_bin.previous_TB_indices.size() == 0: true for split bins, false for merged bins + // user_bin.storage_TB_id: technical bin index on the lowest level + // user_bin.previous_TB_indices[0]: technical bin index on the top-level + size_t const lhs_idx = + (lhs_ub.previous_TB_indices.size() == 0) ? lhs_ub.storage_TB_id : lhs_ub.previous_TB_indices[0]; + size_t const rhs_idx = + (rhs_ub.previous_TB_indices.size() == 0) ? rhs_ub.storage_TB_id : rhs_ub.previous_TB_indices[0]; + + return lhs_idx == rhs_idx; + }; + auto chunked_by_tb = ub_indices | seqan::stl::views::chunk_by(std::move(predicate)); + + return seqan::stl::ranges::to>>(chunked_by_tb); + }(); + + std::vector records(chunks.size()); + +#pragma omp parallel for schedule(dynamic) num_threads(cfg.threads) + for (size_t tb_index = 0; tb_index < chunks.size(); ++tb_index) { - if (ub_index % ub_percentage == 0) - print_progress(ub_index / ub_percentage); - - auto const & user_bin = hibf_layout.user_bins[ub_index]; - current_kmers.clear(); - - // The top-level technical bin index for the current user bin. - // user_bin.previous_TB_indices.size() == 0: true for split bins, false for merged bins - // user_bin.storage_TB_id: technical bin index on the lowest level - // user_bin.previous_TB_indices[0]: technical bin index on the top-level - size_t const idx = - (user_bin.previous_TB_indices.size() == 0) ? user_bin.storage_TB_id : user_bin.previous_TB_indices[0]; - - // We processed all user bins that belong to the `current_idx`th top-level technical bin. - // Print results, advance the current index, and reset all user bin-specific data. - if (idx != current_idx) - { - print_result_line(); - sketch.reset(); - current_kmer_set.clear(); - shared_kmers.clear(); - shared_kmers_initialised = false; - ub_count = 0u; - split_count = 0u; - current_idx = idx; - } - - bool const is_merged = bin_kinds[idx] == chopper::layout::hibf_statistics::bin_kind::merged; - // For user bins in a merged bin, `user_bin.number_of_technical_bins` is the number of technical bins on - // the lowest level. A user bin could be part of a merged bin on the top-level, but still be split into - // `user_bin.number_of_technical_bins` many bins on the lowest level. - split_count = is_merged ? 1u : user_bin.number_of_technical_bins; - - // We don't need to keep the current_kmers if there are no shared k-mers to merge them with. - bool const fill_current_kmers = is_merged && !(shared_kmers_initialised && shared_kmers.empty()); - - for (auto const & filename : filenames[user_bin.idx]) + auto const & chunk = chunks[tb_index]; + // Estimates the cardinality of one technical bin. For merged bins, user bins will be iteratively added. + seqan::hibf::sketch::hyperloglog sketch{hibf_config.sketch_bits}; + // Used to determine the exact cardinality for one technical bin. + robin_hood::unordered_set current_kmer_set{}; + // Stores shared k-mers across user bins of a merged technical bin. + robin_hood::unordered_set shared_kmers{}; + // We can't use `shared_kmers.size() == 0` instead of `shared_kmers_initialised`, because keep_duplicates + // will result in a size of 0 when there are no shared k-mers. + bool shared_kmers_initialised{false}; + std::vector current_kmers{}; + // How many user bins are stored in the current technical bin? Always 1 for split bins. + size_t const ub_count{chunk.size()}; + bool const is_merged{ub_count > 1u}; + + for (size_t const ub_index : chunk) { - ++ub_count; // This assumes that each user bin has exactly one associated file. Currently the case. + auto const & user_bin = hibf_layout.user_bins[ub_index]; + current_kmers.clear(); - process_file(filename, current_kmer_set, current_kmers, sketch, fill_current_kmers, chopper_config.k); - } + // We don't need to keep the current_kmers if there are no shared k-mers to merge them with. + bool const fill_current_kmers = is_merged && !(shared_kmers_initialised && shared_kmers.empty()); - // Compute set intersection: shared_kmers = shared_kmers ∩ current_kmers - // This happens for each user bin that belongs to a merged bin. - if (fill_current_kmers) - { - if (!shared_kmers_initialised) + for (auto const & filename : filenames[user_bin.idx]) { - shared_kmers_initialised = true; - shared_kmers.insert(current_kmers.begin(), current_kmers.end()); + process_file(filename, current_kmer_set, current_kmers, sketch, fill_current_kmers, chopper_config.k); } - else + + // Compute set intersection: shared_kmers = shared_kmers ∩ current_kmers + // This happens for each user bin that belongs to a merged bin. + if (fill_current_kmers) { - keep_duplicates(shared_kmers, current_kmers); + if (!shared_kmers_initialised) + { + shared_kmers_initialised = true; + shared_kmers.insert(current_kmers.begin(), current_kmers.end()); + } + else + { + keep_duplicates(shared_kmers, current_kmers); + } } + + progress.report(); } + + // Into how many techincal bins is the user bin split? Always 1 for merged bins. + size_t const split_count{is_merged ? 1u : hibf_layout.user_bins[chunk[0]].number_of_technical_bins}; + size_t const avg_kmer_count = (current_kmer_set.size() + split_count - 1u) / split_count; + size_t const sketch_estimate = (sketch.estimate() + split_count - 1u) / split_count; + + records[tb_index] = record{.tb_index = tb_index, + .exact_size = avg_kmer_count, + .estimated_size = sketch_estimate, + .shared_size = shared_kmers.size(), + .ub_count = ub_count, + .kind = (is_merged ? "merged" : "split"), + .splits = split_count}; } - // Print results for the last top-level technical bin. - print_result_line(); + process_and_write_records_to(records, output_stream); // The progress bar uses a carriage return '\r' to only use a single line. std::cerr << '\n';