From d0a2771f09e98ebb69889e5f91407bbd62bb068a Mon Sep 17 00:00:00 2001 From: Yenaled Date: Wed, 10 May 2023 02:10:13 -0700 Subject: [PATCH] multithread kmer extraction from ColoredCDBG --- .gitignore | 1 + src/KmerIndex.cpp | 148 +++++++++++++++++++++++++++------------------- 2 files changed, 89 insertions(+), 60 deletions(-) diff --git a/.gitignore b/.gitignore index 86272df7..fac0b9a5 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,4 @@ ext/bifrost/src/bifrost-stamp/ ext/zlib-ng/src/ ext/zlib-ng/build/ ext/zlib-ng/tmp/ +src/.Rhistory diff --git a/src/KmerIndex.cpp b/src/KmerIndex.cpp index 0d5163b4..31c2aa5f 100644 --- a/src/KmerIndex.cpp +++ b/src/KmerIndex.cpp @@ -407,70 +407,98 @@ void KmerIndex::BuildDistinguishingGraph(const ProgramOptions& opt, std::ofstrea std::cerr << "[build] Extracting k-mers from graph" << std::endl; std::string tmp_file2 = opt.distinguishFile.empty() ? generate_tmp_file("--" + opt.index) : opt.distinguishFile; std::ofstream of(tmp_file2); // Write color contigs into another (possibly temporary) file + size_t max_threads_read = opt.threads; + std::vector, DataStorage, false>* > > > unitigs_v(max_threads_read); + size_t n = 0; + const size_t thresh_size = 50000; // Max number of unitigs across all threads + std::mutex mutex_unitigs; // Lock for multithreading writing output FASTA file + std::vector workers; // Worker threads for (const auto& unitig : ccdbg) { const UnitigColors* uc = unitig.getData()->getUnitigColors(unitig); - UnitigColors::const_iterator it_uc = uc->begin(unitig); - UnitigColors::const_iterator it_uc_end = uc->end(); - std::map> k_map; - for (; it_uc != it_uc_end; ++it_uc) { - int color = it_uc.getColorID(); - k_map[color].insert(it_uc.getKmerPosition()); - // DEBUG: - // std::cout << color << " " << unitig.getUnitigKmer(it_uc.getKmerPosition()).rep().toString() << " " << unitig.getUnitigKmer(it_uc.getKmerPosition()).toString() << " " << it_uc.getKmerPosition() << " " << unitig.strand << std::endl; - } - std::set positions_to_remove; - if (!opt.distinguish_all_but_one_color && !opt.distinguish_union) { - int i_ = 0; - for (const auto& k_elem : k_map) { - int j_ = 0; - for (const auto& k_elem2 : k_map) { - if (j_ > i_ && k_elem.first != k_elem2.first) { - std::set intersect; - std::set set_result; - std::set_intersection(k_elem.second.begin(), k_elem.second.end(), k_elem2.second.begin(), k_elem2.second.end(), std::inserter(intersect, intersect.begin())); //if (k_elem2.second.count(k_elem1.second)) // check if set intersection with k_elem2 - std::set_union(positions_to_remove.begin(), positions_to_remove.end(), intersect.begin(), intersect.end(), std::inserter(set_result,set_result.begin())); - positions_to_remove = std::move(set_result); - } - j_++; - } - i_++; - } - } else if (!opt.distinguish_union) { - int i_ = 0; - if (k_map.size() == tmp_files.size()) { - for (const auto& k_elem : k_map) { - i_++; - if (positions_to_remove.size() == 0) { - positions_to_remove = k_elem.second; - } else { - std::set set_result; - std::set_intersection(positions_to_remove.begin(), positions_to_remove.end(), k_elem.second.begin(), k_elem.second.end(), std::inserter(set_result,set_result.begin())); - positions_to_remove = std::move(set_result); - } - } - } - } - - for (const auto& k_elem : k_map) { - int curr_pos = -1; - std::string colored_contig = ""; - auto color = k_elem.first; - for (const auto &pos : k_elem.second) { - if (!positions_to_remove.count(pos)) { - std::string km = unitig.getUnitigKmer(pos).toString(); - if (curr_pos == -1) { // How to correspond color? - colored_contig = km; - } else if (pos == curr_pos+1) { - colored_contig += km[km.length()-1]; - } else { - of << ">" << std::to_string(color) << "\n" << colored_contig << "\n"; - colored_contig = km; + const UnitigMap, DataStorage, false>* unitig_ = &unitig; + unitigs_v[n % unitigs_v.size()].push_back(std::make_pair(uc, unitig_)); + n++; + if (unitigs_v[unitigs_v.size()-1].size() >= thresh_size || n >= ccdbg.size()) { + for (size_t u_i = 0; u_i < unitigs_v.size(); u_i++) { + workers.emplace_back( + [&, u_i] { + std::ostringstream oss; + for (auto unitig_x : unitigs_v[u_i]) { + auto uc = unitig_x.first; + auto& unitig = *(unitig_x.second); + UnitigColors::const_iterator it_uc = uc->begin(unitig); + UnitigColors::const_iterator it_uc_end = uc->end(); + std::map> k_map; + for (; it_uc != it_uc_end; ++it_uc) { + int color = it_uc.getColorID(); + k_map[color].insert(it_uc.getKmerPosition()); + // DEBUG: + // std::cout << color << " " << unitig.getUnitigKmer(it_uc.getKmerPosition()).rep().toString() << " " << unitig.getUnitigKmer(it_uc.getKmerPosition()).toString() << " " << it_uc.getKmerPosition() << " " << unitig.strand << std::endl; + } + std::set positions_to_remove; + if (!opt.distinguish_all_but_one_color && !opt.distinguish_union) { + int i_ = 0; + for (const auto& k_elem : k_map) { + int j_ = 0; + for (const auto& k_elem2 : k_map) { + if (j_ > i_ && k_elem.first != k_elem2.first) { + std::set intersect; + std::set set_result; + std::set_intersection(k_elem.second.begin(), k_elem.second.end(), k_elem2.second.begin(), k_elem2.second.end(), std::inserter(intersect, intersect.begin())); //if (k_elem2.second.count(k_elem1.second)) // check if set intersection with k_elem2 + std::set_union(positions_to_remove.begin(), positions_to_remove.end(), intersect.begin(), intersect.end(), std::inserter(set_result,set_result.begin())); + positions_to_remove = std::move(set_result); + } + j_++; + } + i_++; + } + } else if (!opt.distinguish_union) { + int i_ = 0; + if (k_map.size() == tmp_files.size()) { + for (const auto& k_elem : k_map) { + i_++; + if (positions_to_remove.size() == 0) { + positions_to_remove = k_elem.second; + } else { + std::set set_result; + std::set_intersection(positions_to_remove.begin(), positions_to_remove.end(), k_elem.second.begin(), k_elem.second.end(), std::inserter(set_result,set_result.begin())); + positions_to_remove = std::move(set_result); + } + } + } + } + for (const auto& k_elem : k_map) { + int curr_pos = -1; + std::string colored_contig = ""; + auto color = k_elem.first; + for (const auto &pos : k_elem.second) { + if (!positions_to_remove.count(pos)) { + std::string km = unitig.getUnitigKmer(pos).toString(); + if (curr_pos == -1) { // How to correspond color? + colored_contig = km; + } else if (pos == curr_pos+1) { + colored_contig += km[km.length()-1]; + } else { + oss << ">" << std::to_string(color) << "\n" << colored_contig << "\n"; + colored_contig = km; + } + curr_pos = pos; + } + } + if (colored_contig != "") { + oss << ">" << std::to_string(color) << "\n" << colored_contig << "\n"; + } + } + } + std::unique_lock lock(mutex_unitigs); + of << oss.str(); } - curr_pos = pos; - } + ); } - if (colored_contig != "") { - of << ">" << std::to_string(color) << "\n" << colored_contig << "\n"; + for (auto& t : workers) t.join(); + workers.clear(); + for (size_t u_i = 0; u_i < unitigs_v.size(); u_i++) { + unitigs_v[u_i].clear(); } } }