Skip to content

Commit

Permalink
Merge pull request #266 from waveygang/boundary-fx
Browse files Browse the repository at this point in the history
axis-weighted chaining
ekg authored Aug 26, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents c95b6a3 + d2069e7 commit 064fafa
Showing 7 changed files with 262 additions and 455 deletions.
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -23,9 +23,6 @@ wfmash
src/common/wflign/build
build

# include directory
include

#Others
*.cache
*~
1 change: 1 addition & 0 deletions src/align/include/align_parameters.hpp
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@ struct Parameters {
bool emit_md_tag; //Output the MD tag
bool sam_format; //Emit the output in SAM format (PAF default)
bool no_seq_in_sam; //Do not fill the SEQ field in SAM format
bool multithread_fasta_input; //Multithreaded fasta input

#ifdef WFA_PNG_TSV_TIMING
// plotting
18 changes: 13 additions & 5 deletions src/align/include/computeAlignments.hpp
Original file line number Diff line number Diff line change
@@ -370,7 +370,7 @@ void processor_manager(seq_atomic_queue_t& seq_queue,
std::vector<std::atomic<bool>> thread_should_exit(max_processors);

const size_t queue_capacity = seq_queue.capacity();
const size_t low_threshold = queue_capacity * 0.2;
const size_t low_threshold = 1;
const size_t high_threshold = queue_capacity * 0.8;

auto spawn_processor = [&](size_t id) {
@@ -383,14 +383,22 @@ void processor_manager(seq_atomic_queue_t& seq_queue,
// Start with one processor
spawn_processor(0);
size_t current_processors = 1;
uint64_t exhausted = 0;

while (!reader_done.load() || !line_queue.was_empty() || !seq_queue.was_empty()) {
size_t queue_size = seq_queue.was_size();

if (queue_size < low_threshold && current_processors < max_processors) {
spawn_processor(current_processors++);
} else if (queue_size > high_threshold && current_processors > 1) {
thread_should_exit[--current_processors].store(true);
if (param.multithread_fasta_input) {
if (queue_size < low_threshold && current_processors < max_processors) {
++exhausted;
} else if (queue_size > high_threshold && current_processors > 1) {
thread_should_exit[--current_processors].store(true);
}

if (exhausted > 20 && queue_size < low_threshold) {
spawn_processor(current_processors++);
exhausted = 0;
}
}

std::this_thread::sleep_for(std::chrono::milliseconds(100));
628 changes: 200 additions & 428 deletions src/common/wflign/src/wflign_patch.cpp

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/common/wflign/src/wflign_patch.hpp
Original file line number Diff line number Diff line change
@@ -59,7 +59,8 @@ namespace wflign {
alignment_t& rev_aln,
const int64_t& chain_gap,
const int& max_patching_score,
const uint64_t& min_inversion_length);
const uint64_t& min_inversion_length,
bool ends_free);
void trim_alignment(alignment_t& aln);
std::vector<alignment_t> do_progressive_wfa_patch_alignment(
const char* query,
8 changes: 6 additions & 2 deletions src/interface/parse_args.hpp
Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ void parse_args(int argc,
args::Flag approx_mapping(mapping_opts, "approx-map", "skip base-level alignment, producing an approximate mapping in PAF", {'m',"approx-map"});
args::Flag no_split(mapping_opts, "no-split", "disable splitting of input sequences during mapping [default: enabled]", {'N',"no-split"});
args::ValueFlag<std::string> chain_gap(mapping_opts, "N", "chain mappings closer than this distance in query and target, sets approximate maximum variant length detectable in alignment [default: 6*segment_length, up to 30k]", {'c', "chain-gap"});
args::ValueFlag<std::string> max_mapping_length(mapping_opts, "N", "maximum length of a single mapping before breaking [default: 1M]", {'P', "max-mapping-length"});
args::ValueFlag<std::string> max_mapping_length(mapping_opts, "N", "maximum length of a single mapping before breaking [default: inf]", {'P', "max-mapping-length"});
args::Flag drop_low_map_pct_identity(mapping_opts, "K", "drop mappings with estimated identity below --map-pct-id=%", {'K', "drop-low-map-id"});
args::ValueFlag<double> overlap_threshold(mapping_opts, "F", "drop mappings overlapping more than fraction F with a higher scoring mapping [default: 0.5]", {'O', "overlap-threshold"});
args::Flag no_filter(mapping_opts, "MODE", "disable mapping filtering", {'f', "no-filter"});
@@ -439,7 +439,7 @@ void parse_args(int argc,
}
map_parameters.max_mapping_length = l;
} else {
map_parameters.max_mapping_length = 1000000; // 1 Mbp default
map_parameters.max_mapping_length = std::numeric_limits<int64_t>::max();
}

if (drop_low_map_pct_identity) {
@@ -585,6 +585,10 @@ void parse_args(int argc,
map_parameters.threads = 1;
align_parameters.threads = 1;
}
// disable multi-fasta processing due to the memory inefficiency of samtools faidx readers
// which require us to duplicate the in-memory indexes of large files for each thread
// if aligner exhaustion is a problem, we could enable this
align_parameters.multithread_fasta_input = false;

// Compute optimal window size for sketching
{
56 changes: 40 additions & 16 deletions src/map/include/computeMap.hpp
Original file line number Diff line number Diff line change
@@ -726,6 +726,9 @@ namespace skch

// remove short chains that didn't exceed block length
filterWeakMappings(unfilteredMappings, std::floor(param.block_length / param.segLength));

// now that filtering has happened, set back the individual mapping coordinates and block length
setBlockCoordsToMappingCoords(unfilteredMappings);
} else {
// set block coordinates
setBlockCoordsToMappingCoords(unfilteredMappings);
@@ -1558,8 +1561,8 @@ namespace skch
int query_gap = curr.queryStartPos - prev.queryEndPos;
int ref_gap = curr.refStartPos - prev.refEndPos;

// Check if both gaps are within the threshold
if (std::abs(query_gap) <= threshold && std::abs(ref_gap) <= threshold) {
// Check if both gaps are >0 and within the threshold
if (query_gap > 0 && ref_gap > 0 && query_gap <= threshold && ref_gap <= threshold) {
// Calculate midpoints
int query_mid = (prev.queryEndPos + curr.queryStartPos) / 2;
int ref_mid = (prev.refEndPos + curr.refStartPos) / 2;
@@ -1582,6 +1585,12 @@ namespace skch
}
}
}

/**
 * @brief Euclidean distance between two points, penalised when the
 *        displacement runs along a single axis.
 *
 * The plain Euclidean length of (dx, dy) is scaled by
 * (1 + w * axis_factor), where
 *   axis_factor = 1 - 2 * min(|dx|, |dy|) / (|dx| + |dy|).
 * axis_factor is 0 for a perfectly diagonal displacement (|dx| == |dy|)
 * and 1 when one of the components is zero, so the returned value ranges
 * from the Euclidean distance up to (1 + w) times it.
 *
 * @param dx displacement along the first axis (may be negative)
 * @param dy displacement along the second axis (may be negative)
 * @param w  weight of the axis penalty; 0 yields the plain Euclidean distance
 * @return   the weighted distance; 0 when dx and dy are both 0
 */
double axis_weighted_euclidean_distance(int64_t dx, int64_t dy, double w = 0.5) {
    // Do all arithmetic in double: squaring in int64_t overflows (UB) for
    // large gaps, and std::abs on INT64_MIN is undefined as well.
    const double abs_dx = std::abs(static_cast<double>(dx));
    const double abs_dy = std::abs(static_cast<double>(dy));
    const double manhattan = abs_dx + abs_dy;

    // A zero displacement would make axis_factor 0/0 == NaN, and NaN fails
    // every "< max_dist" comparison; two coincident points are at distance 0.
    if (manhattan == 0.0) {
        return 0.0;
    }

    const double euclidean = std::sqrt(abs_dx * abs_dx + abs_dy * abs_dy);
    const double axis_factor = 1.0 - (2.0 * std::min(abs_dx, abs_dy)) / manhattan;
    return euclidean * (1.0 + w * axis_factor);
}

/**
* @brief Merge fragment mappings by convolution of a 2D range over the alignment matrix
@@ -1616,7 +1625,7 @@ namespace skch

//Start the procedure to identify the chains
for (auto it = readMappings.begin(); it != readMappings.end(); it++) {
std::vector<std::pair<double, uint64_t>> distances;
std::vector<std::tuple<double, double, int64_t>> distances;
for (auto it2 = std::next(it); it2 != readMappings.end(); it2++) {
//If this mapping is for the same segment, ignore
if (it2->refSeqId == it->refSeqId && it2->queryStartPos == it->queryStartPos) {
@@ -1628,29 +1637,27 @@ namespace skch
}
//If the next mapping is on the same strand, measure its distance and keep it as a chaining candidate when within range
if (it2->strand == it->strand) {
int ref_dist = it2->refStartPos - it->refEndPos;
int query_dist = 0;
int64_t ref_dist = it2->refStartPos - it->refEndPos;
int64_t query_dist = std::numeric_limits<int64_t>::max();
auto dist = std::numeric_limits<double>::max();
auto score = std::numeric_limits<double>::max();
auto awed = std::numeric_limits<double>::max();
if (it->strand == strnd::FWD && it->queryStartPos <= it2->queryStartPos) {
query_dist = it2->queryStartPos - it->queryEndPos;
dist = std::sqrt(std::pow(query_dist,2) + std::pow(ref_dist,2));
score = std::pow(query_dist - ref_dist, 2);
awed = axis_weighted_euclidean_distance(query_dist, ref_dist, 0.9);
} else if (it->strand != strnd::FWD && it->queryEndPos >= it2->queryEndPos) {
query_dist = it->queryStartPos - it2->queryEndPos;
dist = std::sqrt(std::pow(query_dist,2) + std::pow(ref_dist,2));
score = std::pow(query_dist - ref_dist, 2);
awed = axis_weighted_euclidean_distance(query_dist, ref_dist, 0.9);
}
int query_mapping_len = std::min((it->queryEndPos - it->queryStartPos),
(it2->queryEndPos - it2->queryStartPos));
if (dist < max_dist) {
distances.push_back(std::make_pair(dist + score, it2->splitMappingId));
if (awed < max_dist) {
distances.push_back(std::make_tuple(awed, dist, it2->splitMappingId));
}
}
}
if (distances.size()) {
std::sort(distances.begin(), distances.end());
disjoint_sets.unite(it->splitMappingId, distances.front().second);
disjoint_sets.unite(it->splitMappingId, std::get<2>(distances.front()));
}
}

@@ -1680,8 +1687,18 @@ namespace skch
return std::tie(a.queryStartPos, a.refStartPos) < std::tie(b.queryStartPos, b.refStartPos);
});

// if we have an infinite max mapping length, we should just emit the chain here
if (param.max_mapping_length == std::numeric_limits<int64_t>::max()) {
// Process any remaining fragment
processMappingFragment(it, it_end);
it = it_end;
continue;
}

// tweak start and end positions of consecutive mappings
adjustConsecutiveMappings(it, it_end, param.segLength);
// TODO: XXX double check that the consecutive mappings are not overlapping!!!
// extra: force global alignment by patching head and tail to mapping start and end coordinates
//adjustConsecutiveMappings(it, it_end, param.segLength);

// First pass: Mark cuttable positions
const int consecutive_mappings_window = 4; // Configurable parameter
@@ -1699,13 +1716,15 @@ namespace skch
if (current == it || current == std::prev(it_end)) {
continue;
}
if (current->queryStartPos - std::prev(current)->queryEndPos > param.segLength
|| current->refStartPos - std::prev(current)->refEndPos > param.segLength) {
if (current->queryStartPos - std::prev(current)->queryEndPos > param.segLength / 5
|| current->refStartPos - std::prev(current)->refEndPos > param.segLength / 5) {
is_cuttable[std::distance(it, current) - 1] = false;
is_cuttable[std::distance(it, current)] = false;
}
}

adjustConsecutiveMappings(it, it_end, param.segLength);

auto fragment_start = it;
auto current = it;
offset_t accumulate_length = 0;
@@ -1716,6 +1735,9 @@ namespace skch
accumulate_length += current->queryEndPos - current->queryStartPos;
if (accumulate_length >= param.max_mapping_length
&& is_cuttable[std::distance(it, current)]) {
if (current != fragment_start) {
adjustConsecutiveMappings(std::prev(current), current, param.chain_gap);
}
processMappingFragment(fragment_start, current);
fragment_start = current;
accumulate_length = 0;
@@ -1853,6 +1875,8 @@ namespace skch
if (!param.mergeMappings)
{
outstrm << sep << "jc:f:" << float(e.conservedSketches) / e.sketchSize;
} else {
outstrm << sep << "chain:i:" << e.splitMappingId;
}
} else
{

0 comments on commit 064fafa

Please sign in to comment.