Skip to content

Commit

Permalink
r58
Browse files Browse the repository at this point in the history
Containment multithread and binary search needed more tests,
reverting these back to r53 for now.
Will fix soon.
  • Loading branch information
xfengnefx committed Mar 10, 2022
1 parent 9ebb3a4 commit 2c1787f
Showing 1 changed file with 26 additions and 74 deletions.
100 changes: 26 additions & 74 deletions Overlaps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3101,10 +3101,7 @@ typedef struct {
long long n_read;
uint64_t *readLen;
ma_sub_t *coverage_cut;
int *counter; // per thread counter
uint32_t **buf_u32_q; // per thread linear buffers
uint32_t **buf_u32_t;
int* buf_u32_n; int* buf_u32_m;
int counter; // NOTE this might RACE during update, but it's an output and not used elsewhere
}hitcontain_aux_t;

static void hamt_hit_contained_drop_singleton_worker(void *data, long i_r, int tid){ // callback for kt_for()
Expand Down Expand Up @@ -3425,15 +3422,15 @@ static void hamt_hit_contained_drop_singleton_worker_v2(void *data, long i_r, in
__func__, (int)Get_NAME_LENGTH(R_INF, w), Get_NAME(R_INF, w));
}
need_to_protect = 0;
// note: do not go to finish yet, we still need to check other neighbors and see if any of them ask for protection.
}else{
if (verbose>1){
fprintf(stderr, "[debug::%s] ~ neighbor %.*s hinted more haps\n",
__func__, (int)Get_NAME_LENGTH(R_INF, w), Get_NAME(R_INF, w));
}
need_to_protect = 1;
goto finish;
}

if (verbose>1){
fprintf(stderr, "[debug::%s] ~ neighbor %.*s hinted more haps\n",
__func__, (int)Get_NAME_LENGTH(R_INF, w), Get_NAME(R_INF, w));
}
need_to_protect = 1;
goto finish;
}else{
if (verbose>1){
fprintf(stderr, "[debug::%s] ~ neighbor %.*s ok\n",
Expand All @@ -3457,17 +3454,10 @@ static void hamt_hit_contained_drop_singleton_worker_v2(void *data, long i_r, in
(int)Get_NAME_LENGTH(R_INF, hits[i]->tn), Get_NAME(R_INF, hits[i]->tn));
}
}
d->counter[tid]++;
for (i=0; i<buf_len; i++){
// delete_single_edge_both_dir(sources, coverage_cut, (uint32_t)i_r, hits[i]->tn);
if (d->buf_u32_n[tid]>=d->buf_u32_m[tid]){
d->buf_u32_m[tid] = d->buf_u32_m[tid] + (d->buf_u32_m[tid]>>1);
d->buf_u32_q[tid] = (uint32_t*)realloc(d->buf_u32_q[tid], sizeof(uint32_t)*d->buf_u32_m[tid]);
d->buf_u32_t[tid] = (uint32_t*)realloc(d->buf_u32_t[tid], sizeof(uint32_t)*d->buf_u32_m[tid]);
}
d->buf_u32_q[tid][d->buf_u32_n[tid]] = (uint32_t)i_r;
d->buf_u32_t[tid][d->buf_u32_n[tid]] = (uint32_t)(hits[i]->tn);
d->buf_u32_n[tid]++;
d->counter++;
for (i=0; i<buf_len; i++){
hits[i]->del = 1;
delete_single_edge_both_dir(sources, coverage_cut, (uint32_t)i_r, hits[i]->tn);

}
}else if (need_to_protect==0){
Expand Down Expand Up @@ -3677,7 +3667,7 @@ static void hamt_hit_contained_worker(void *data, long i_r, int tid){ // callba
sprintf(tmp_msg, "[debug::%s] (there were %d contained reads)\n", __func__, (int)IDs_contained.n);
hamt_dbgmsg_append(&msg, tmp_msg, strlen(tmp_msg));
}
d->counter[tid]++;
d->counter++;

finish:
kv_destroy(buf);
Expand Down Expand Up @@ -3713,72 +3703,34 @@ void hamt_hit_contained_multi(ma_hit_t_alloc* sources, ma_hit_t_alloc* reverse_s
aux.n_read = n_read;
aux.readLen = readLen;
aux.coverage_cut = coverage_cut;
aux.counter = (int*)calloc(asm_opt.thread_num, sizeof(int));
aux.counter = 0;

kt_for(asm_opt.thread_num, hamt_hit_contained_worker, &aux, (long)n_read);
int total = 0;
for (int i=0; i<asm_opt.thread_num; i++){
total+=aux.counter[i];
}
free(aux.counter);
// kt_for(1, hamt_hit_contained_worker, &aux, (long)n_read);

if (VERBOSE){
fprintf(stderr, "[M::%s] done, takes %0.2f s, treated %d spots; \n\n", __func__, Get_T()-startTime, total);
fflush(stderr);
fprintf(stderr, "[M::%s] done, takes %0.2f s, treated roughly %d spots; \n\n", __func__, Get_T()-startTime, aux.counter);
}
}

void hamt_hit_contained_drop_singleton_multi(ma_hit_t_alloc* sources, ma_hit_t_alloc* reverse_sources, long long n_read, uint64_t *readLen,
ma_sub_t *coverage_cut){
double startTime = Get_T();

int init_buffer_size = 128;
hitcontain_aux_t aux;
aux.sources = sources;
aux.reverse_sources = reverse_sources;
aux.n_read = n_read;
aux.readLen = readLen;
aux.coverage_cut = coverage_cut;
aux.counter = (int*)calloc(asm_opt.thread_num, sizeof(int));
// init per thread linear buffers
aux.buf_u32_n = (int*)calloc(asm_opt.thread_num, sizeof(int));
aux.buf_u32_m = (int*)calloc(asm_opt.thread_num, sizeof(int));
aux.buf_u32_q = (uint32_t**)calloc(asm_opt.thread_num, sizeof(uint32_t*));
aux.buf_u32_t = (uint32_t**)calloc(asm_opt.thread_num, sizeof(uint32_t*));
for (int i=0; i<asm_opt.thread_num; i++){
aux.buf_u32_q[i] = (uint32_t*)calloc(init_buffer_size, sizeof(uint32_t));
aux.buf_u32_t[i] = (uint32_t*)calloc(init_buffer_size, sizeof(uint32_t));
aux.buf_u32_m[i] = init_buffer_size;
}

// collect
kt_for(asm_opt.thread_num, hamt_hit_contained_drop_singleton_worker_v2, &aux, (long)n_read); // kt_for(asm_opt.thread_num, hamt_hit_contained_drop_singleton_worker, &aux, (long)n_read);
// delete
int total_del = 0;
for (int tid=0; tid<asm_opt.thread_num; tid++){
for (int j=0; j<aux.buf_u32_n[tid]; j++){
delete_single_edge_both_dir(sources, coverage_cut, aux.buf_u32_q[tid][j], aux.buf_u32_t[tid][j]);
total_del++;
}
}

int total_incident = 0;
for (int i=0; i<asm_opt.thread_num; i++){
total_incident+=aux.counter[i];
}
free(aux.counter);
for (int i=0; i<asm_opt.thread_num; i++){
free(aux.buf_u32_q[i]);
free(aux.buf_u32_t[i]);
}
free(aux.buf_u32_n);
free(aux.buf_u32_m);
free(aux.buf_u32_q);
free(aux.buf_u32_t);

aux.counter = 0;

// kt_for(asm_opt.thread_num, hamt_hit_contained_drop_singleton_worker, &aux, (long)n_read);
kt_for(asm_opt.thread_num, hamt_hit_contained_drop_singleton_worker_v2, &aux, (long)n_read);
// kt_for(1, hamt_hit_contained_drop_singleton_worker_v2, &aux, (long)n_read);

if (VERBOSE){
fprintf(stderr, "[M::%s] done, takes %0.2f s, treated %d spots (%d handles); \n\n", __func__, Get_T()-startTime, total_del, total_incident);
fflush(stderr);
fprintf(stderr, "[M::%s] done, takes %0.2f s, treated roughly %d spots; \n\n", __func__, Get_T()-startTime, aux.counter);
}

}
Expand Down Expand Up @@ -30253,7 +30205,7 @@ ma_sub_t **coverage_cut_ptr, int debug_g)

renew_graph_init(sources, reverse_sources, sg, coverage_cut, ruIndex, n_read);

asm_opt.get_specific_overlap_is_use_bf = 1; sort_paf_buffers_by_targetID(sources, n_read); sort_paf_buffers_by_targetID(reverse_sources, n_read);
asm_opt.get_specific_overlap_is_use_bf = 0; /////sort_paf_buffers_by_targetID(sources, n_read); sort_paf_buffers_by_targetID(reverse_sources, n_read);

// normalize_ma_hit_t_single_side_advance(sources, n_read);
// normalize_ma_hit_t_single_side_advance(reverse_sources, n_read);
Expand All @@ -30269,7 +30221,7 @@ ma_sub_t **coverage_cut_ptr, int debug_g)

ma_hit_sub(min_dp, sources, n_read, readLen, mini_overlap_length, &coverage_cut);

asm_opt.get_specific_overlap_is_use_bf = 1; sort_paf_buffers_by_targetID(sources, n_read); sort_paf_buffers_by_targetID(reverse_sources, n_read);
asm_opt.get_specific_overlap_is_use_bf = 0; //////sort_paf_buffers_by_targetID(sources, n_read); sort_paf_buffers_by_targetID(reverse_sources, n_read);
detect_chimeric_reads_conservative(sources, n_read, readLen, coverage_cut, asm_opt.max_ov_diff_final * 2.0); // detect_chimeric_reads(sources, n_read, readLen, coverage_cut, asm_opt.max_ov_diff_final * 2.0);
ma_hit_cut(sources, n_read, readLen, mini_overlap_length, &coverage_cut);
ma_hit_flt(sources, n_read, coverage_cut, max_hang_length, mini_overlap_length);
Expand All @@ -30278,7 +30230,7 @@ ma_sub_t **coverage_cut_ptr, int debug_g)
hamt_hit_contained_multi(sources, reverse_sources, n_read, readLen, coverage_cut);
hamt_hit_contained_drop_singleton_multi(sources, reverse_sources, n_read, readLen, coverage_cut);

asm_opt.get_specific_overlap_is_use_bf = 1; sort_paf_buffers_by_targetID(sources, n_read); sort_paf_buffers_by_targetID(reverse_sources, n_read);
asm_opt.get_specific_overlap_is_use_bf = 0; //////sort_paf_buffers_by_targetID(sources, n_read); sort_paf_buffers_by_targetID(reverse_sources, n_read);
ma_hit_contained_advance(sources, n_read, coverage_cut, ruIndex, max_hang_length, mini_overlap_length); // hamt_threaded_ma_hit_contained_advance(sources, n_read, coverage_cut, ruIndex); // TODO
asm_opt.get_specific_overlap_is_use_bf = 0;

Expand Down

0 comments on commit 2c1787f

Please sign in to comment.