Skip to content

Commit

Permalink
multithread kmer extraction from ColoredCDBG
Browse files Browse the repository at this point in the history
  • Loading branch information
Yenaled committed May 10, 2023
1 parent 93a1583 commit d0a2771
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 60 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ ext/bifrost/src/bifrost-stamp/
ext/zlib-ng/src/
ext/zlib-ng/build/
ext/zlib-ng/tmp/
src/.Rhistory
148 changes: 88 additions & 60 deletions src/KmerIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,70 +407,98 @@ void KmerIndex::BuildDistinguishingGraph(const ProgramOptions& opt, std::ofstrea
std::cerr << "[build] Extracting k-mers from graph" << std::endl;
std::string tmp_file2 = opt.distinguishFile.empty() ? generate_tmp_file("--" + opt.index) : opt.distinguishFile;
std::ofstream of(tmp_file2); // Write color contigs into another (possibly temporary) file
size_t max_threads_read = opt.threads;
std::vector<std::vector<std::pair<const UnitigColors*, const UnitigMap<DataAccessor<void>, DataStorage<void>, false>* > > > unitigs_v(max_threads_read);
size_t n = 0;
const size_t thresh_size = 50000; // Max number of unitigs across all threads
std::mutex mutex_unitigs; // Lock for multithreading writing output FASTA file
std::vector<std::thread> workers; // Worker threads
for (const auto& unitig : ccdbg) {
const UnitigColors* uc = unitig.getData()->getUnitigColors(unitig);
UnitigColors::const_iterator it_uc = uc->begin(unitig);
UnitigColors::const_iterator it_uc_end = uc->end();
std::map<int, std::set<int>> k_map;
for (; it_uc != it_uc_end; ++it_uc) {
int color = it_uc.getColorID();
k_map[color].insert(it_uc.getKmerPosition());
// DEBUG:
// std::cout << color << " " << unitig.getUnitigKmer(it_uc.getKmerPosition()).rep().toString() << " " << unitig.getUnitigKmer(it_uc.getKmerPosition()).toString() << " " << it_uc.getKmerPosition() << " " << unitig.strand << std::endl;
}
std::set<int> positions_to_remove;
if (!opt.distinguish_all_but_one_color && !opt.distinguish_union) {
int i_ = 0;
for (const auto& k_elem : k_map) {
int j_ = 0;
for (const auto& k_elem2 : k_map) {
if (j_ > i_ && k_elem.first != k_elem2.first) {
std::set<int> intersect;
std::set<int> set_result;
std::set_intersection(k_elem.second.begin(), k_elem.second.end(), k_elem2.second.begin(), k_elem2.second.end(), std::inserter(intersect, intersect.begin())); //if (k_elem2.second.count(k_elem1.second)) // check if set intersection with k_elem2
std::set_union(positions_to_remove.begin(), positions_to_remove.end(), intersect.begin(), intersect.end(), std::inserter(set_result,set_result.begin()));
positions_to_remove = std::move(set_result);
}
j_++;
}
i_++;
}
} else if (!opt.distinguish_union) {
int i_ = 0;
if (k_map.size() == tmp_files.size()) {
for (const auto& k_elem : k_map) {
i_++;
if (positions_to_remove.size() == 0) {
positions_to_remove = k_elem.second;
} else {
std::set<int> set_result;
std::set_intersection(positions_to_remove.begin(), positions_to_remove.end(), k_elem.second.begin(), k_elem.second.end(), std::inserter(set_result,set_result.begin()));
positions_to_remove = std::move(set_result);
}
}
}
}

for (const auto& k_elem : k_map) {
int curr_pos = -1;
std::string colored_contig = "";
auto color = k_elem.first;
for (const auto &pos : k_elem.second) {
if (!positions_to_remove.count(pos)) {
std::string km = unitig.getUnitigKmer(pos).toString();
if (curr_pos == -1) { // How to correspond color?
colored_contig = km;
} else if (pos == curr_pos+1) {
colored_contig += km[km.length()-1];
} else {
of << ">" << std::to_string(color) << "\n" << colored_contig << "\n";
colored_contig = km;
const UnitigMap<DataAccessor<void>, DataStorage<void>, false>* unitig_ = &unitig;
unitigs_v[n % unitigs_v.size()].push_back(std::make_pair(uc, unitig_));
n++;
if (unitigs_v[unitigs_v.size()-1].size() >= thresh_size || n >= ccdbg.size()) {
for (size_t u_i = 0; u_i < unitigs_v.size(); u_i++) {
workers.emplace_back(
[&, u_i] {
std::ostringstream oss;
for (auto unitig_x : unitigs_v[u_i]) {
auto uc = unitig_x.first;
auto& unitig = *(unitig_x.second);
UnitigColors::const_iterator it_uc = uc->begin(unitig);
UnitigColors::const_iterator it_uc_end = uc->end();
std::map<int, std::set<int>> k_map;
for (; it_uc != it_uc_end; ++it_uc) {
int color = it_uc.getColorID();
k_map[color].insert(it_uc.getKmerPosition());
// DEBUG:
// std::cout << color << " " << unitig.getUnitigKmer(it_uc.getKmerPosition()).rep().toString() << " " << unitig.getUnitigKmer(it_uc.getKmerPosition()).toString() << " " << it_uc.getKmerPosition() << " " << unitig.strand << std::endl;
}
std::set<int> positions_to_remove;
if (!opt.distinguish_all_but_one_color && !opt.distinguish_union) {
int i_ = 0;
for (const auto& k_elem : k_map) {
int j_ = 0;
for (const auto& k_elem2 : k_map) {
if (j_ > i_ && k_elem.first != k_elem2.first) {
std::set<int> intersect;
std::set<int> set_result;
std::set_intersection(k_elem.second.begin(), k_elem.second.end(), k_elem2.second.begin(), k_elem2.second.end(), std::inserter(intersect, intersect.begin())); //if (k_elem2.second.count(k_elem1.second)) // check if set intersection with k_elem2
std::set_union(positions_to_remove.begin(), positions_to_remove.end(), intersect.begin(), intersect.end(), std::inserter(set_result,set_result.begin()));
positions_to_remove = std::move(set_result);
}
j_++;
}
i_++;
}
} else if (!opt.distinguish_union) {
int i_ = 0;
if (k_map.size() == tmp_files.size()) {
for (const auto& k_elem : k_map) {
i_++;
if (positions_to_remove.size() == 0) {
positions_to_remove = k_elem.second;
} else {
std::set<int> set_result;
std::set_intersection(positions_to_remove.begin(), positions_to_remove.end(), k_elem.second.begin(), k_elem.second.end(), std::inserter(set_result,set_result.begin()));
positions_to_remove = std::move(set_result);
}
}
}
}
for (const auto& k_elem : k_map) {
int curr_pos = -1;
std::string colored_contig = "";
auto color = k_elem.first;
for (const auto &pos : k_elem.second) {
if (!positions_to_remove.count(pos)) {
std::string km = unitig.getUnitigKmer(pos).toString();
if (curr_pos == -1) { // How to correspond color?
colored_contig = km;
} else if (pos == curr_pos+1) {
colored_contig += km[km.length()-1];
} else {
oss << ">" << std::to_string(color) << "\n" << colored_contig << "\n";
colored_contig = km;
}
curr_pos = pos;
}
}
if (colored_contig != "") {
oss << ">" << std::to_string(color) << "\n" << colored_contig << "\n";
}
}
}
std::unique_lock<std::mutex> lock(mutex_unitigs);
of << oss.str();
}
curr_pos = pos;
}
);
}
if (colored_contig != "") {
of << ">" << std::to_string(color) << "\n" << colored_contig << "\n";
for (auto& t : workers) t.join();
workers.clear();
for (size_t u_i = 0; u_i < unitigs_v.size(); u_i++) {
unitigs_v[u_i].clear();
}
}
}
Expand Down

0 comments on commit d0a2771

Please sign in to comment.