Skip to content

Commit

Permalink
[FEATURE] parallel display_layout general
Browse files Browse the repository at this point in the history
  • Loading branch information
eseiler committed Oct 19, 2023
1 parent 0bcf97a commit d70d1dc
Showing 1 changed file with 187 additions and 127 deletions.
314 changes: 187 additions & 127 deletions src/util/display_layout/general.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <filesystem>
Expand All @@ -23,24 +24,57 @@
#include <chopper/layout/input.hpp>

#include <hibf/contrib/robin_hood.hpp>
#include <hibf/contrib/std/chunk_by_view.hpp>
#include <hibf/contrib/std/to.hpp>
#include <hibf/sketch/hyperloglog.hpp>

#include "shared.hpp"

static void print_progress(size_t const percentage)
struct progress_bar
{
assert(percentage <= 100u);
std::cerr << '[';
for (size_t i{}; i < percentage; ++i)
std::cerr << '=';
if (percentage < 100u)
progress_bar(size_t const total) : total{total}
{
std::cerr << '>';
for (size_t i{1u}; i < 100u - percentage; ++i)
std::cerr << ' ';
print_progress(0);
}

void report()
{
std::lock_guard report_guard{report_mutex};
++current;
if (current == total)
{
print_progress(100);
return;
}

size_t const percentage = std::min<size_t>(99, std::ceil(100 * current / static_cast<double>(total)));
if (percentage > last_printed)
{
last_printed = percentage;
print_progress(percentage);
}
}

void print_progress(size_t const percentage) const
{
assert(percentage <= 100u);
std::cerr << '[';
for (size_t i{}; i < percentage; ++i)
std::cerr << '=';
if (percentage < 100u)
{
std::cerr << '>';
for (size_t i{1u}; i < 100u - percentage; ++i)
std::cerr << ' ';
}
std::cerr << "] " << percentage << " %\r" << std::flush;
}
std::cerr << "] " << percentage << " %\r" << std::flush;
}

std::mutex report_mutex{};
size_t last_printed{};
size_t current{};
size_t total{};
};

// Using two sets and erasing from shared is slower
void keep_duplicates(robin_hood::unordered_set<uint64_t> & shared, std::vector<uint64_t> const & current)
Expand All @@ -57,6 +91,68 @@ void keep_duplicates(robin_hood::unordered_set<uint64_t> & shared, std::vector<u
shared = std::move(result);
}

struct record
{
size_t tb_index{};
size_t exact_size{};
size_t estimated_size{};
size_t shared_size{};
size_t ub_count{};
std::string_view kind{};
size_t splits{};

void write_to(std::ostream & stream) const
{
size_t split_count{splits};
for (size_t i{}, total{split_count}; i < total; ++i)
{
stream << tb_index + i << '\t' //
<< exact_size << '\t' //
<< estimated_size << '\t' //
<< shared_size << '\t' //
<< ub_count << '\t' //
<< kind << '\t' //
<< split_count << '\n';
// Subsequent split bins display 0, the first split bin displays the actual split count.
split_count = 0u;
}
}

static void write_header_to(std::ostream & stream, std::string_view const layout_filename)
{
stream << "# Layout: " << layout_filename << '\n' //
<< "tb_index\t"
<< "exact_size\t"
<< "estimated_size\t"
<< "shared_size\t"
<< "ub_count\t"
<< "kind\t"
<< "splits\n";
stream << std::flush;
}
};

void process_and_write_records_to(std::vector<record> & records, std::ostream & stream)
{
std::ranges::sort(records,
[](record const & lhs, record const & rhs)
{
return lhs.tb_index < rhs.tb_index;
});
// Split bins offset the tb index.
size_t tb_offset{};
for (auto & record : records)
{
assert(record.splits > 0u);
record.tb_index += tb_offset;
tb_offset += record.splits - 1u;
}
for (auto & record : records)
record.write_to(stream);

stream << std::flush;
}

int execute(config const & cfg)
{
std::ifstream layout_file{cfg.input};
Expand All @@ -81,6 +177,8 @@ int execute(config const & cfg)
if (!output_stream.good() || !output_stream.is_open())
throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"};

record::write_header_to(output_stream, cfg.input.c_str());

// Fetch all file sizes such that sorting by file size doesn't have to access the filesystem too often.
// n = filenames.size()
// Constructing this vector has `n` filesystem accesses.
Expand Down Expand Up @@ -109,133 +207,95 @@ int execute(config const & cfg)
return first_idx < second_idx || (first_idx == second_idx && filesizes[lhs.idx] < filesizes[rhs.idx]);
});

// Estimates the cardinality of one technical bin. For merged bins, user bins will be iteratively added.
seqan::hibf::sketch::hyperloglog sketch{hibf_config.sketch_bits};
// Used to determine the exact cardinality for one technical bin.
robin_hood::unordered_set<uint64_t> current_kmer_set{};
// Stores shared k-mers across user bins of a merged technical bin.
robin_hood::unordered_set<uint64_t> shared_kmers{};
// We can't use `shared_kmers.size() == 0` instead of `shared_kmers_initialised`, because keep_duplicates
// will result in a size of 0 when there are no shared k-mers.
bool shared_kmers_initialised{false};
std::vector<uint64_t> current_kmers{};
size_t ub_count{}; // How many user bins are stored in the current technical bin? Always 1 for split bins.
size_t split_count{}; // Into how many techincal bins is the user bin split? Always 1 for merged bins.

std::vector<chopper::layout::hibf_statistics::bin_kind> bin_kinds(
hibf_config.tmax,
chopper::layout::hibf_statistics::bin_kind::split);

for (auto const & max_bin : hibf_layout.max_bins)
{
// max_bin.previous_TB_indices.size() == 1: true for merged bins, false for split bins
// max_bin.previous_TB_indices[0]: technical bin index on the top-level
if (max_bin.previous_TB_indices.size() == 1)
{
bin_kinds[max_bin.previous_TB_indices[0]] = chopper::layout::hibf_statistics::bin_kind::merged;
}
}

size_t const total_ub{hibf_layout.user_bins.size()}; // For progress bar
size_t const ub_percentage{std::max<size_t>(1u, total_ub / 100u)}; // For progress bar, 1 % of all user bins

size_t current_idx{}; // The current top-level technical bin index

// Stats file header
output_stream << "# Layout: " << cfg.input.c_str() << '\n' //
<< "tb_index\t"
<< "exact_size\t"
<< "estimated_size\t"
<< "shared_size\t"
<< "ub_count\t"
<< "kind\t"
<< "splits\n";

auto print_result_line = [&]()
{
bool const is_merged{bin_kinds[current_idx] == chopper::layout::hibf_statistics::bin_kind::merged};
size_t const avg_kmer_count = (current_kmer_set.size() + split_count - 1u) / split_count;
size_t const sketch_estimate = (sketch.estimate() + split_count - 1u) / split_count;

for (size_t i{}, total{split_count}; i < total; ++i)
{
output_stream << current_idx + i << '\t' //
<< avg_kmer_count << '\t' //
<< sketch_estimate << '\t' //
<< shared_kmers.size() << '\t' //
<< ub_count << '\t' //
<< (is_merged ? "merged" : "split") << '\t' //
<< split_count << '\n';
split_count = 0u; // Subsequent split bins display 0, the first split bin displays the actual split count.
}
};

// Iterate over all user bins. They are sorted by their technical bin index in the top-level.
// Hence, we can process one top-level technical bin, print the results, clear our stored data and continue with
// the next.
for (size_t ub_index{}; ub_index < hibf_layout.user_bins.size(); ++ub_index)
// Create chunks containing user bin indices for one technical bin.
// E.g., `[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16] [17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]`:
// The first technical bin contains user bins [0,...,16], the second technical bin contains user bins [17,...,32].
auto chunks = seqan::stl::ranges::to<std::vector<std::vector<size_t>>>(
std::views::iota(size_t{}, hibf_layout.user_bins.size())
| seqan::stl::views::chunk_by(
[&](size_t const lhs, size_t const rhs)
{
auto const & lhs_ub = hibf_layout.user_bins[lhs];
auto const & rhs_ub = hibf_layout.user_bins[rhs];
// The top-level technical bin index for the current user bin.
// user_bin.previous_TB_indices.size() == 0: true for split bins, false for merged bins
// user_bin.storage_TB_id: technical bin index on the lowest level
// user_bin.previous_TB_indices[0]: technical bin index on the top-level
size_t lhs_idx =
(lhs_ub.previous_TB_indices.size() == 0) ? lhs_ub.storage_TB_id : lhs_ub.previous_TB_indices[0];
size_t rhs_idx =
(rhs_ub.previous_TB_indices.size() == 0) ? rhs_ub.storage_TB_id : rhs_ub.previous_TB_indices[0];
// Two user bins belong to the same chunk if they are in the same technical bin.
return lhs_idx == rhs_idx;
}));

std::vector<record> records(chunks.size());
progress_bar progress{hibf_layout.user_bins.size()};

#pragma omp parallel for schedule(dynamic) num_threads(cfg.threads)
for (size_t tb_index = 0; tb_index < chunks.size(); ++tb_index)
{
if (ub_index % ub_percentage == 0)
print_progress(ub_index / ub_percentage);

auto const & user_bin = hibf_layout.user_bins[ub_index];
current_kmers.clear();

// The top-level technical bin index for the current user bin.
// user_bin.previous_TB_indices.size() == 0: true for split bins, false for merged bins
// user_bin.storage_TB_id: technical bin index on the lowest level
// user_bin.previous_TB_indices[0]: technical bin index on the top-level
size_t const idx =
(user_bin.previous_TB_indices.size() == 0) ? user_bin.storage_TB_id : user_bin.previous_TB_indices[0];

// We processed all user bins that belong to the `current_idx`th top-level technical bin.
// Print results, advance the current index, and reset all user bin-specific data.
if (idx != current_idx)
{
print_result_line();
sketch.reset();
current_kmer_set.clear();
shared_kmers.clear();
shared_kmers_initialised = false;
ub_count = 0u;
split_count = 0u;
current_idx = idx;
}

bool const is_merged = bin_kinds[idx] == chopper::layout::hibf_statistics::bin_kind::merged;
// For user bins in a merged bin, `user_bin.number_of_technical_bins` is the number of technical bins on
// the lowest level. A user bin could be part of a merged bin on the top-level, but still be split into
// `user_bin.number_of_technical_bins` many bins on the lowest level.
split_count = is_merged ? 1u : user_bin.number_of_technical_bins;

// We don't need to keep the current_kmers if there are no shared k-mers to merge them with.
bool const fill_current_kmers = is_merged && !(shared_kmers_initialised && shared_kmers.empty());

for (auto const & filename : filenames[user_bin.idx])
auto const & chunk = chunks[tb_index];
// Estimates the cardinality of one technical bin. For merged bins, user bins will be iteratively added.
seqan::hibf::sketch::hyperloglog sketch{hibf_config.sketch_bits};
// Used to determine the exact cardinality for one technical bin.
robin_hood::unordered_set<uint64_t> current_kmer_set{};
// Stores shared k-mers across user bins of a merged technical bin.
robin_hood::unordered_set<uint64_t> shared_kmers{};
// We can't use `shared_kmers.size() == 0` instead of `shared_kmers_initialised`, because keep_duplicates
// will result in a size of 0 when there are no shared k-mers.
bool shared_kmers_initialised{false};
std::vector<uint64_t> current_kmers{};
// How many user bins are stored in the current technical bin? Always 1 for split bins.
size_t const ub_count{chunk.size()};
bool const is_merged{ub_count > 1u};

for (size_t const ub_index : chunk)
{
++ub_count; // This assumes that each user bin has exactly one associated file. Currently the case.
auto const & user_bin = hibf_layout.user_bins[ub_index];
current_kmers.clear();

process_file(filename, current_kmer_set, current_kmers, sketch, fill_current_kmers, chopper_config.k);
}
// We don't need to keep the current_kmers if there are no shared k-mers to merge them with.
bool const fill_current_kmers = is_merged && !(shared_kmers_initialised && shared_kmers.empty());

// Compute set intersection: shared_kmers = shared_kmers ∩ current_kmers
// This happens for each user bin that belongs to a merged bin.
if (fill_current_kmers)
{
if (!shared_kmers_initialised)
for (auto const & filename : filenames[user_bin.idx])
{
shared_kmers_initialised = true;
shared_kmers.insert(current_kmers.begin(), current_kmers.end());
process_file(filename, current_kmer_set, current_kmers, sketch, fill_current_kmers, chopper_config.k);
}
else

// Compute set intersection: shared_kmers = shared_kmers ∩ current_kmers
// This happens for each user bin that belongs to a merged bin.
if (fill_current_kmers)
{
keep_duplicates(shared_kmers, current_kmers);
if (!shared_kmers_initialised)
{
shared_kmers_initialised = true;
shared_kmers.insert(current_kmers.begin(), current_kmers.end());
}
else
{
keep_duplicates(shared_kmers, current_kmers);
}
}

progress.report();
}

// Into how many techincal bins is the user bin split? Always 1 for merged bins.
size_t split_count{is_merged ? 1u : hibf_layout.user_bins[chunk[0]].number_of_technical_bins};
size_t const avg_kmer_count = (current_kmer_set.size() + split_count - 1u) / split_count;
size_t const sketch_estimate = (sketch.estimate() + split_count - 1u) / split_count;

records[tb_index] = record{.tb_index = tb_index,
.exact_size = avg_kmer_count,
.estimated_size = sketch_estimate,
.shared_size = shared_kmers.size(),
.ub_count = ub_count,
.kind = (is_merged ? "merged" : "split"),
.splits = split_count};
}

// Print results for the last top-level technical bin.
print_result_line();
process_and_write_records_to(records, output_stream);

// The progress bar uses a carriage return '\r' to only use a single line.
std::cerr << '\n';
Expand Down

0 comments on commit d70d1dc

Please sign in to comment.