From 61570f0c5d68212c60c5518f76005462abe3d444 Mon Sep 17 00:00:00 2001 From: Louis Aslett Date: Mon, 23 Sep 2024 16:51:56 +0100 Subject: [PATCH 01/43] Start work on v2 --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index c71448f..3a696fd 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: kalis Type: Package Title: High Performance Li & Stephens Local Ancestry Inference -Version: 1.0.0 +Version: 2.0.0 Authors@R: c(person("Louis", "Aslett", role = c("aut", "cre"), email = "louis.aslett@durham.ac.uk"), person("Ryan", "Christ", role = "aut", From 55c3672d33b8a066ec3cd74dd14a4a23e5591f5d Mon Sep 17 00:00:00 2001 From: ryanchrist Date: Mon, 23 Sep 2024 11:43:39 -0500 Subject: [PATCH 02/43] Introducing O(n) clustering algorithm (blobby) and C-core for clade calling and clade matrix construction --- src/R_CladeMat.c | 834 +++++++++++++++++++++++++++++++++++++++++++++++ src/R_CladeMat.h | 20 ++ 2 files changed, 854 insertions(+) create mode 100644 src/R_CladeMat.c create mode 100644 src/R_CladeMat.h diff --git a/src/R_CladeMat.c b/src/R_CladeMat.c new file mode 100644 index 0000000..6c26f30 --- /dev/null +++ b/src/R_CladeMat.c @@ -0,0 +1,834 @@ +#include "R_CladeMat.h" + +#define _GNU_SOURCE +#include +#include "R_Kalis.h" +#include "Cache.h" + +#define min(X, Y) ((X) < (Y) ? 
(X) : (Y)) + +typedef struct blob { + struct blob* blob; + size_t num_in_blob; + double lower; + double upper; + struct blob* next; + struct blob* prev; + struct blob* morepop; + struct blob* lesspop; + double c3; +} blob; + + +void printblob(blob* bobtheblob) { + Rprintf("Blob %p (%p): # = %d, range = (%lf, %lf), prv = %p, nxt = %p, lp = %p, mp = %p \n", + bobtheblob, + (bobtheblob==bobtheblob->blob?NULL:bobtheblob->blob), + bobtheblob->num_in_blob, + bobtheblob->lower, + bobtheblob->upper, + bobtheblob->prev, + bobtheblob->next, + bobtheblob->lesspop, + bobtheblob->morepop); +} + +void blobby_pop_contest(blob* cur, blob** headpop, blob** tailpop) { + if(cur->morepop != NULL && cur->morepop->num_in_blob < cur->num_in_blob) { + blob *other; + other = cur->morepop; + + if(other->morepop != NULL) { + other->morepop->lesspop = cur; + } + cur->morepop = other->morepop; + other->morepop = cur; + if(cur->morepop == NULL) { + *headpop = cur; + } + + if(cur->lesspop != NULL) { + cur->lesspop->morepop = other; + } + other->lesspop = cur->lesspop; + cur->lesspop = other; + if(other->lesspop == NULL) { + *tailpop = other; + } + + blobby_pop_contest(cur, headpop, tailpop); + } else if(cur->lesspop != NULL && cur->lesspop->num_in_blob > cur->num_in_blob) { + blob *other; + other = cur->lesspop; + + if(cur->morepop != NULL) { + cur->morepop->lesspop = other; + } + other->morepop = cur->morepop; + cur->morepop = other; + if(other->morepop == NULL) { + *headpop = other; + } + + if(other->lesspop != NULL) { + other->lesspop->morepop = cur; + } + cur->lesspop = other->lesspop; + other->lesspop = cur; + if(cur->lesspop == NULL) { + *tailpop = cur; + } + + blobby_pop_contest(cur, headpop, tailpop); + } +} + + +blob* hunttheblob(blob* blob) { + if(blob->blob == blob) + return(blob); + return(hunttheblob(blob->blob)); +} + +double alphabetascaling(double x, double z0) { + x = x * z0; + if(x == 0.0) { + return(744.4400719213812180897); + } else { + return(-log(x)); + } +} + +blob* 
blobby_BB(const double* alpha, const double* beta, const size_t recipient, size_t n, blob* blobs, blob*** x_in_blob, double* n_clade, const double thres, const double maxd, const double unitdist, const int max1var) { + double z0 = 0.0; + for(size_t i = 0; inext; + // } + // Rprintf("Adding obs: %d = %lf\n", i, x); + + //Rprintf("Adding obs %lf\n", x); + + while(cur != NULL) { + + + // this observation is between blobs + if(cur->prev != NULL && x < cur->lower && x >= cur->prev->upper) { // do we want to check to the right here or just leave to that pointer iteration? + blobs[next_new_blob].blob = blobs + next_new_blob; + blobs[next_new_blob].num_in_blob = 1; + blobs[next_new_blob].lower = x - thres; + blobs[next_new_blob].upper = x + thres; + blobs[next_new_blob].next = cur; + blobs[next_new_blob].prev = cur->prev; + blobs[next_new_blob].morepop = tailpop; + blobs[next_new_blob].lesspop = NULL; + cur->prev->next = blobs + next_new_blob; + cur->prev = blobs + next_new_blob; + tailpop->lesspop = blobs + next_new_blob; + tailpop = blobs + next_new_blob; + x_in_blob[i] = &(blobs[next_new_blob].blob); + next_new_blob++; + a_ptr++; + b_ptr++; + break; + } + + // this observation is between and causes blobs to merge (to the right) + if(cur->next != NULL && x >= cur->next->lower && x < cur->upper) { + cur->next->blob = cur->blob; + cur->num_in_blob += cur->next->num_in_blob + 1; + if(tail == cur->next) { + tail = cur->blob; + } else { + cur->next->next->prev = cur; + } + if(cur->next == headpop) { + headpop = cur->next->lesspop; + } else { + cur->next->morepop->lesspop = cur->next->lesspop; + } + if(cur->next == tailpop) { + tailpop = cur->next->morepop; + } else { + cur->next->lesspop->morepop = cur->next->morepop; + } + cur->upper = cur->next->upper; + cur->next = cur->next->next; + blobby_pop_contest(cur, &headpop, &tailpop); + x_in_blob[i] = &(cur->blob); + a_ptr++; + b_ptr++; + break; + } + + // this observation is between and causes blobs to merge (to the left) + 
if(cur->prev != NULL && x >= cur->lower && x < cur->prev->upper) { + cur->prev->blob = cur->blob; + cur->num_in_blob += cur->prev->num_in_blob + 1; + if(head == cur->prev) { + head = cur->blob; + } else { + cur->prev->prev->next = cur; + } + if(cur->prev == headpop) { + headpop = cur->prev->lesspop; + } else { + cur->prev->morepop->lesspop = cur->prev->lesspop; + } + if(cur->prev == tailpop) { + tailpop = cur->prev->morepop; + } else { + cur->prev->lesspop->morepop = cur->prev->morepop; + } + cur->lower = cur->prev->lower; + cur->prev = cur->prev->prev; + blobby_pop_contest(cur, &headpop, &tailpop); + x_in_blob[i] = &(cur->blob); + a_ptr++; + b_ptr++; + break; + } + + // this observation is in this blob + if(x >= cur->lower && x < cur->upper) { + (cur->num_in_blob)++; + if(x - thres < cur->lower) { + cur->lower = x - thres; + } else if(x + thres > cur->upper) { + cur->upper = x + thres; + } + blobby_pop_contest(cur, &headpop, &tailpop); + x_in_blob[i] = &(cur->blob); + a_ptr++; + b_ptr++; + break; + } + + cur = cur->lesspop; + } + // this observation is smaller than any previously seen blob + if(cur == NULL && x < head->lower) { + blobs[next_new_blob].blob = blobs + next_new_blob; + blobs[next_new_blob].num_in_blob = 1; + blobs[next_new_blob].lower = x - thres; + blobs[next_new_blob].upper = x + thres; + blobs[next_new_blob].next = head; + blobs[next_new_blob].prev = NULL; + blobs[next_new_blob].morepop = tailpop; + blobs[next_new_blob].lesspop = NULL; + head->prev = blobs + next_new_blob; + head = blobs + next_new_blob; + tailpop->lesspop = blobs + next_new_blob; + tailpop = blobs + next_new_blob; + x_in_blob[i] = &(blobs[next_new_blob].blob); + next_new_blob++; + a_ptr++; + b_ptr++; + continue; + } + // this observation is larger than any previously seen blob + if(cur == NULL && x >= tail->upper) { + // this observation is beyond the end of all blobs + blobs[next_new_blob].blob = blobs + next_new_blob; + blobs[next_new_blob].num_in_blob = 1; + 
blobs[next_new_blob].lower = x - thres; + blobs[next_new_blob].upper = x + thres; + blobs[next_new_blob].next = NULL; + blobs[next_new_blob].prev = tail; + blobs[next_new_blob].morepop = tailpop; + blobs[next_new_blob].lesspop = NULL; + tail->next = blobs + next_new_blob; + tail = blobs + next_new_blob; + tailpop->lesspop = blobs + next_new_blob; + tailpop = blobs + next_new_blob; + x_in_blob[i] = &(blobs[next_new_blob].blob); + next_new_blob++; + a_ptr++; + b_ptr++; + continue; + } + } + + // Chase down pointers + // DEBUG + // Rprintf("ALL USED BLOBS (pre-parse):\n\n"); + // for(size_t i = 0; iblob == blobs[i].blob) + continue; + + blobs[i].blob = hunttheblob(blobs[i].blob); + } + + // DEBUG + // Rprintf("ALL USED BLOBS:\n\n"); + // for(size_t i = 0; ic3 = 0.0; + int num_surv_blob = 0; + *n_clade = 0.0; + while(cur->prev != NULL) { + num_surv_blob++; + j -= cur->num_in_blob; + diff = cur->lower - cur->prev->upper + 2*thres; + temp_n_mut = diff/unitdist; + if(max1var){ + cur->prev->c3 = *n_clade += min(temp_n_mut,1.0)/j; + } else { + cur->prev->c3 = *n_clade += temp_n_mut/j; + } + cur = cur->prev; + } + // DEBUG + // cur = head; + // while(cur != NULL) { + // Rprintf("%lf\n", cur->c3); + // cur = cur->next; + // } + + //DEBUG + // Rprintf("\nCOMPUTED BLOBS:\n\n"); + // cur = head; + // int i=0; + // while(cur != NULL) { + // i++; + // printblob(cur); + // cur = cur->next; + // } + // Rprintf("%d blobs used in total, %d survive.\n", next_new_blob+1, num_surv_blob+1); + + return(head); +} + +void blobby_B1(double* alpha1, double* beta1, size_t recipient, size_t n, double thres, const double unitdist, const int max1var) { + thres *= unitdist; + double maxd = 744.4400719213812180897; + blob blobs[(int) (maxd/thres+2)]; + blob** x_in_blob[n]; + double* n_clade; + blobby_BB(alpha1, beta1, recipient, n, blobs, x_in_blob, n_clade, thres, maxd, unitdist, max1var); +} + + +SEXP blobbyB1(SEXP ALPHA, SEXP BETA, SEXP FROMRECIPIENT, SEXP THRES, SEXP UNITDIST, SEXP MAX1VAR) { + 
double* alpha = REAL(ALPHA); + double* beta = REAL(BETA); + double* thres = REAL(THRES); + double* unitdist = REAL(UNITDIST); + int* max1var = INTEGER(MAX1VAR); + size_t cur_left_recipient = (size_t) *INTEGER(FROMRECIPIENT) - 1; + + blobby_B1(alpha, beta, cur_left_recipient, (size_t) Rf_length(ALPHA), *thres, *unitdist, *max1var); + + return(R_NilValue); +} + + + +void blobby_B2(const double* alpha1, const double* beta1, const double* alpha2, const double* beta2, size_t cur_left_recipient, + size_t n, double thres, const double unitdist, const int max1var, + double* res, + int** neigh1, int** neigh2, int* n_neigh1, int* n_neigh2, + double* n_clade1, double* n_clade2, + double* similarities1, double* similarities2, + blob*** x_in_blob1, blob*** x_in_blob2, + const double maxd, blob* blobs1, blob* blobs2) { + thres *= unitdist; + blob* head1 = blobby_BB(alpha1, beta1, cur_left_recipient, n, blobs1, x_in_blob1, n_clade1, thres, maxd, unitdist, max1var); + blob* head2 = blobby_BB(alpha2, beta2, cur_left_recipient + 1, n, blobs2, x_in_blob2, n_clade2, thres, maxd, unitdist, max1var); + + int both1, both2; + if(head1->num_in_blob > 1) { + *n_neigh1 = head1->num_in_blob; + both1 = 0; + similarities1[0] = head1->c3; + similarities1[1] = head1->c3; + if(head1->next != NULL) { + similarities1[2] = head1->next->c3; + } else { + similarities1[2] = 0.0; + } + } else { + *n_neigh1 = 1 + head1->next->num_in_blob; + both1 = 1; + similarities1[0] = head1->c3; + similarities1[1] = head1->next->c3; + if(head1->next->next != NULL) { + similarities1[2] = head1->next->next->c3; + } else { + similarities1[2] = 0.0; + } + } + if(head2->num_in_blob > 1) { + *n_neigh2 = head2->num_in_blob; + both2 = 0; + similarities2[0] = head2->c3; + similarities2[1] = head2->c3; + if(head2->next != NULL) { + similarities2[2] = head2->next->c3; + } else { + similarities2[2] = 0.0; + } + } else { + *n_neigh2 = 1 + head2->next->num_in_blob; + both2 = 1; + similarities2[0] = head2->c3; + similarities2[1] = 
head2->next->c3; + if(head2->next->next != NULL) { + similarities2[2] = head2->next->next->c3; + } else { + similarities2[2] = 0.0; + } + } + + *neigh1 = malloc(sizeof(int)* *n_neigh1); + *neigh2 = malloc(sizeof(int)* *n_neigh2); + + // Neighbours and dedip + size_t j = 0, ni1 = 0, ni2 = 0; + for(size_t i = 0; iblob == head1 || (both1 && (*(x_in_blob1[j]))->blob == head1->next)) { + (*neigh1)[ni1++] = j+1; + } + if((*(x_in_blob1[j+1]))->blob == head1 || (both1 && (*(x_in_blob1[j+1]))->blob == head1->next)) { + (*neigh1)[ni1++] = j+2; + } + if((*(x_in_blob2[j]))->blob == head2 || (both2 && (*(x_in_blob2[j]))->blob == head2->next)) { + (*neigh2)[ni2++] = j+1; + } + if((*(x_in_blob2[j+1]))->blob == head2 || (both2 && (*(x_in_blob2[j+1]))->blob == head2->next)) { + (*neigh2)[ni2++] = j+2; + } + res[i] = (*(x_in_blob1[j]))->blob->c3 + (*(x_in_blob1[j+1]))->blob->c3 + (*(x_in_blob2[j]))->blob->c3 + (*(x_in_blob2[j+1]))->blob->c3; + j += 2; + } +} + +SEXP blobbyB2(SEXP ALPHA, SEXP BETA, SEXP FROMRECIPIENT, SEXP THRES, SEXP UNITDIST, SEXP MAX1VAR, SEXP DEDIP) { + // ALPHA and BETA must each be a R matrix with two columns + + double* alpha = REAL(ALPHA); + double* beta = REAL(BETA); + size_t from_recipient = (size_t) Rf_asInteger(FROMRECIPIENT) - 1; + double thres = Rf_asReal(THRES); + double unitdist = Rf_asReal(UNITDIST); + int max1var = Rf_asInteger(MAX1VAR); + double* dedip = REAL(DEDIP); + size_t n = Rf_nrows(ALPHA); + + if(Rf_ncols(ALPHA) != Rf_ncols(BETA)) { + Rf_error("All alphas/betas must have same number of columns"); + } + if(Rf_nrows(ALPHA) != Rf_nrows(BETA)) { + Rf_error("All alphas/betas must have same number of rows"); + } + if(Rf_ncols(ALPHA) != 2 || Rf_ncols(BETA) != 2) { + Rf_error("alpha and beta must both have two columns"); + } + if(Rf_nrows(ALPHA)/2 != Rf_length(DEDIP)) { + Rf_error("Length of DEDIP must equal nrows(alpha)/2."); + } + + + int* neigh1; + int* neigh2; + int n_neigh[2]; + + double simi1[3], simi2[3]; + + blob*** x_in_blob1 = 
malloc(sizeof(blob**)*n); + blob*** x_in_blob2 = malloc(sizeof(blob**)*n); + + const double maxd = 744.4400719213812180897; + blob* blobs1 = malloc(sizeof(blob)*((int) (maxd/thres+2))); + blob* blobs2 = malloc(sizeof(blob)*((int) (maxd/thres+2))); + + double* n_clade1; + double* n_clade2; + + blobby_B2(alpha, beta, alpha+n, beta+n, from_recipient, n, thres, unitdist, max1var, dedip, + &neigh1, &neigh2, &n_neigh[0], &n_neigh[1], n_clade1, n_clade2, + simi1, simi2, + x_in_blob1, x_in_blob2, + maxd, blobs1, blobs2); + + free(x_in_blob1); free(x_in_blob2); + free(blobs1); free(blobs2); + + return(R_NilValue); +} + + + +struct blobby_core_args { + const double* const restrict alpha; + const double* const restrict beta; + double* const restrict dedip; + const size_t from_recipient; + const size_t n; + const double thres; + const double unitdist; + const int max1var; + int** const neigh; + int* const n_neigh; + double* const n_clade; + double* const similarities1; + double* const similarities2; + double* const similarities3; +}; + +struct blobby_args { + struct blobby_core_args *core_args; + size_t from; + size_t N; +}; + +void* blobby_B(void *args) { + struct blobby_args *b_args; + b_args = (struct blobby_args *) args; + const double* restrict alpha = b_args->core_args->alpha; + const double* restrict beta = b_args->core_args->beta; + double* restrict dedip = b_args->core_args->dedip; + size_t from_recipient = b_args->core_args->from_recipient; + const size_t n = b_args->core_args->n; + const double thres = b_args->core_args->thres; + const double unitdist = b_args->core_args->unitdist; + const int max1var = b_args->core_args->max1var; + int** neigh = b_args->core_args->neigh; + int* n_neigh = b_args->core_args->n_neigh; + double* n_clade = b_args->core_args->n_clade; + double* similarities1 = b_args->core_args->similarities1; + double* similarities2 = b_args->core_args->similarities2; + double* similarities3 = b_args->core_args->similarities3; + size_t from = 
b_args->from; + size_t N = b_args->N; + + alpha += n*from; + beta += n*from; + dedip += (n/2)*(from/2); + from_recipient += from; + neigh += from; + n_neigh += from; + n_clade += from; + similarities1 += from; + similarities2 += from; + similarities3 += from; + + double simi1[3], simi2[3]; + + blob*** x_in_blob1 = malloc(sizeof(blob**)*n); + blob*** x_in_blob2 = malloc(sizeof(blob**)*n); + + const double maxd = 744.4400719213812180897; + blob* blobs1 = malloc(sizeof(blob)*((int) (maxd/thres+2))); + blob* blobs2 = malloc(sizeof(blob)*((int) (maxd/thres+2))); + + for(size_t i = 0; i < N; i+=2) { + blobby_B2(alpha, beta, alpha+n, beta+n, from_recipient, n, thres, unitdist, max1var, dedip, + neigh, neigh+1, n_neigh, n_neigh+1, n_clade, n_clade+1, + simi1, simi2, + x_in_blob1, x_in_blob2, + maxd, blobs1, blobs2); + + similarities1[i] = simi1[0]; + similarities2[i] = simi1[1]; + similarities3[i] = simi1[2]; + similarities1[i+1] = simi2[0]; + similarities2[i+1] = simi2[1]; + similarities3[i+1] = simi2[2]; + + alpha += 2*n; + beta += 2*n; + dedip += n/2; + from_recipient += 2; + neigh += 2; + n_neigh += 2; + n_clade += 2; + } + + free(x_in_blob1); free(x_in_blob2); + free(blobs1); free(blobs2); + + return(NULL); +} + +void blobby_A(const double* const restrict alpha, + const double* const restrict beta, + double* const restrict dedip, + size_t from_recipient, + size_t n, + size_t p, + double thres, + double unitdist, + int max1var, + int** const neigh, + int* const n_neigh, + double* const n_clade, + double* const similarities1, + double* const similarities2, + double* const similarities3, + size_t nthreads) { + + + struct blobby_core_args core_args = { + .alpha = alpha, + .beta = beta, + .dedip = dedip, + .from_recipient = from_recipient, + .n = n, + .thres = thres, + .unitdist = unitdist, + .max1var = max1var, + .neigh = neigh, + .n_neigh = n_neigh, + .n_clade = n_clade, + .similarities1 = similarities1, + .similarities2 = similarities2, + .similarities3 = similarities3 
+ }; + + if(nthreads > 1) { + + pthread_t threads[nthreads]; + pthread_attr_t attr; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + size_t num_perth = (p/2)/nthreads; + size_t rag_end = (p/2)%nthreads; + + struct blobby_args args[nthreads+1]; + for(size_t i=0; i +#include + +SEXP CladeMat(SEXP Rfwd, + SEXP Rbck, + SEXP RM, + SEXP Runit_dist, + SEXP Rthresh, + SEXP Rmax1var, + SEXP Rnthreads); + +SEXP UpdateRealInPlace(SEXP RM, + SEXP Ridx, + SEXP Rvec); + +#endif From e0df97fff10f4e424db746d871ea5bcf7add34e1 Mon Sep 17 00:00:00 2001 From: ryanchrist Date: Mon, 23 Sep 2024 12:08:40 -0500 Subject: [PATCH 03/43] Introducing R interface for Clade/Sprig calling and Clade Matrix (generalized eGRM) construction --- R/CladeMat.R | 45 ++++ R/Clades.R | 627 +++++++++++++++++++++++++++++++++++++++++++ R/GoldMasterClades.R | 58 ++++ 3 files changed, 730 insertions(+) create mode 100644 R/CladeMat.R create mode 100644 R/Clades.R create mode 100644 R/GoldMasterClades.R diff --git a/R/CladeMat.R b/R/CladeMat.R new file mode 100644 index 0000000..2e1a601 --- /dev/null +++ b/R/CladeMat.R @@ -0,0 +1,45 @@ +#' Fast Clade Matrix Construction +#' @export CladeMat +CladeMat <- function(fwd, bck, M, unit.dist, thresh = 0.2, max1var = FALSE, + nthreads = min(parallel::detectCores(logical = FALSE), fwd$to_recipient-fwd$from_recipient+1)){ + + # input checks + ######################### + input_checks_for_probs_and_dist_mat(fwd,bck) + + if(nrow(fwd$alpha)%%2 !=0 || ncol(fwd$alpha)%%2 !=0 || nrow(bck$beta)%%2 !=0 || ncol(bck$beta)%%2 !=0 ){ + stop("fwd and bck must both have an even number of recipient haplotypes and an even number of donor haplotypes") + } + + if(!is.matrix(M) || !is.double(M) || nrow(M) != nrow(fwd$alpha)/2 || ncol(M) != ncol(fwd$alpha)/2){ + stop("M must be a matrix of doubles with nrow(fwd$alpha)/2 rows and ncol(fwd$alpha)/2 columns")} + + if(!is.atomic(unit.dist) || length(unit.dist)!=1L || !is.finite(unit.dist) || 
unit.dist <= 0){ + stop("unit.dist must be a number greater than 0")} + + if(is.integer(unit.dist)){ + unit.dist <- as.double(unit.dist) + } else { + if(!is.double(unit.dist)){stop("unit.dist must be a number greater than 0")}} + + if(!is.atomic(thresh) || length(thresh)!=1L || !is.finite(thresh) || thresh < 0 || thresh > 1){ + stop("thresh must be a number in [0,1]")} + + if(is.integer(thresh)){ + thresh <- as.double(thresh) + } else { + if(!is.double(thresh)){stop("thresh must be a number in [0,1]")}} + + if(!is.logical(max1var) || length(max1var) > 1){ + stop("max1var must be TRUE or FALSE")} + + nthreads <- as.integer(nthreads) + if(!is.integer(nthreads) || length(nthreads)!=1L || !is.finite(nthreads) || nthreads <= 0){ + stop("nthreads must be a positive integer")} + + if(nthreads > ncol(fwd$alpha)/2){ + stop("nthreads cannot be greater than the number of recipient haplotypes divided by 2.") + } + + invisible(.Call(CCall_CladeMat, fwd, bck, M, unit.dist, thresh, max1var, nthreads)) +} diff --git a/R/Clades.R b/R/Clades.R new file mode 100644 index 0000000..398c49a --- /dev/null +++ b/R/Clades.R @@ -0,0 +1,627 @@ + + + +get_neigh <- function(x,i){ + idx <- x[[1]][c(i,i+1L)] + x[[2]][seq.int(idx[1],idx[2]-1L)] +} + +get_neigh_seq <- function(x, i, return.lengths = FALSE){ + from <- x[[1]][i] + nvec <- x[[1]][i+1] - from + if(return.lengths){ + list("seq" = x[[2]][sequence(nvec,from)], + "lengths" = nvec) + } else { + x[[2]][sequence(nvec,from)] + } +} + +#' Sprigs +#' @export Sprigs +Sprigs <- function(x, old.sprigs = FALSE){ + + N <- length(x[[1]])-1L + roster <- rep(NA_integer_,N) + label <- 0L + done <- neighborhood.is.sprig.ind <- rep(FALSE,N) + + if(old.sprigs){ + + xx <- as.list(1:N) + for(i in 1:N){xx[[i]] <- get_neigh(x,i)} + + roster <- Sprigs_old(xx, use.forking = FALSE, nthreads = 1L, add.self = FALSE) + + label <- attr(roster,"n.sprigs") + + attributes(roster) <- NULL + + neighborhood.is.sprig.ind <- !is.na(roster) + + + # for(i in sample.int(N)){ + 
# + # if(done[i]){next} + # + # C = get_neigh(x,i) + # lC <- length(C) + # + # neigh.list <- get_neigh_seq(x, C, return.lengths = TRUE) + # + # temp.table <- table(neigh.list[[1]]) + # proposed.set <- as.integer(names(temp.table)[which(temp.table == lC)]) + # + # # in case the neighborhood of i overshoots into previously established cliques, it has a BIG effect in real data + # proposed.set <- proposed.set[!done[proposed.set]] + # + # if(length(proposed.set) > 1 && i %in% proposed.set){ + # + # label <- label + 1L + # # this repeated intersection step has truly a small effect but + # # helps guard us against the case where i might have erroneously added some candidates in its neighborhood that + # # do not belong in the clade and do not include some clade members. This steps helps us recover those clade members + # + # neigh.list <- get_neigh_seq(x, proposed.set, return.lengths = TRUE) + # temp.table <- table(neigh.list[[1]]) + # proposed.set <- as.integer(names(temp.table)[which(temp.table == lC)]) + # proposed.set <- proposed.set[!done[proposed.set]] + # + # roster[proposed.set] <- label + # done[proposed.set] <- TRUE + # neighborhood.is.sprig.ind[proposed.set] <- neigh.list[[2]] == lC # indicator that a haplotype's neighborhood is exactly the proposed sprig + # } + # } + + + } else { + + # the randomness in indices here is not really essential but + for(i in seq_len(N)){ #sample.int(N)){ + + if(done[i]){next} + + C = get_neigh(x,i) # keep simple get_neigh here because about 2x faster than get_neigh_seq when only querying one index + C <- C[!done[C]] # we know this will at least include i because of the if(!done[i]) check above and every neighborhood includes self + + lC <- length(C) + + if(lC == 1){ # C is an orphan that could never be a part of another clade because all of it's neighbors are already assigned + done[i] <- TRUE + next} + + neigh.list <- get_neigh_seq(x, C, return.lengths = TRUE) + + if(all(tabulate(factor(neigh.list[[1]],C),nbins = lC)==lC)){ + 
label <- label + 1L + roster[C] <- label + done[C] <- TRUE + neighborhood.is.sprig.ind[C] <- neigh.list[[2]] == lC # indicator that a haplotype's neighborhood is exactly the proposed sprig + } + } + + } + + list("assignments" = roster, + "to.prune" = neighborhood.is.sprig.ind, + "num.sprigs" = label) +} + + + +UpdateMatrixInPlace <- function(M,row.idx,col.idx,x){ + invisible(.Call(getFromNamespace("CCall_UpdateRealInPlace","kalis"),M, + as.integer(row.idx + (col.idx-1L)*nrow(M)),x)) +} + +# test <- matrix(as.double(1:144),12,12) +# UpdateMatrixInPlace(test,c(5,12,12),c(1,3,5),as.double(c(100,200,300))) + +#' PruneCladeMat +#' @export PruneCladeMat +PruneCladeMat <- function(M, neigh, sprigs, prune = "singleton.info", from.recipient = 1L){ + + if(!from.recipient%%2){stop("from.recipient must be odd and encode the index of the first recipient haplotype")} + + N.recipients <- 2 * ncol(M) + + if(prune=="singleton.info"){ + + v <- neigh[[2]][[2]] - neigh[[2]][[1]] + v <- v[seq.int(1,N.recipients,2)] + v[seq.int(2,N.recipients,2)] + UpdateMatrixInPlace(M, + seq.int(from = (from.recipient+1L)/2L,length.out = ncol(M)), + seq.int(from = 1, to = ncol(M)), + v) + + } else if(prune=="sprigs"){ + + neigh.list <- get_neigh_seq(neigh[[1]], which(sprigs$to.prune), return.lengths = TRUE) + + hap.idx <- cbind(neigh.list[[1]], rep(which(sprigs$to.prune), times = neigh.list[[2]])) + + key <- rep(0L,nrow(hap.idx)) + + hap.idx.odd <- hap.idx%%2 + temp.hap.idx.odd <- hap.idx.odd[,1] + hap.idx.odd[,2] + key[temp.hap.idx.odd==2] <- 1L + key[temp.hap.idx.odd==0] <- 4L + temp.hap.idx.odd <- hap.idx.odd[,1] - hap.idx.odd[,2] + key[temp.hap.idx.odd==1] <- 3L + key[temp.hap.idx.odd==-1] <- 2L + + sim.updates <- rep((neigh[[2]][[3]]-neigh[[2]][[2]])[sprigs$to.prune], times = neigh.list[[2]]) + + # if M was not already dediploided: + # M[hap.idx] <- M[hap.idx] - sim.updates + + # since M is dediploided, we run + + if(!is.na(match(1L,key))){ + to.fetch <- key==1L + UpdateMatrixInPlace(M, + 
(hap.idx[to.fetch,1L]+1L)/2L, + (hap.idx[to.fetch,2L]+1L)/2L, + sim.updates[to.fetch])} + + if(!is.na(match(2L,key))){ + to.fetch <- key==2L + UpdateMatrixInPlace(M, + hap.idx[to.fetch,1L]/2L, + (hap.idx[to.fetch,2L]+1L)/2L, + sim.updates[to.fetch])} + + if(!is.na(match(3L,key))){ + to.fetch <- key==3L + UpdateMatrixInPlace(M, + (hap.idx[to.fetch,1L]+1L)/2L, + (hap.idx[to.fetch,2L])/2L, + sim.updates[to.fetch])} + + if(!is.na(match(4L,key))){ + to.fetch <- key==4L + UpdateMatrixInPlace(M, + hap.idx[to.fetch,1L]/2L, + hap.idx[to.fetch,2L]/2L, + sim.updates[to.fetch])} + + } else { + stop("unrecognized option for prune") + } + invisible(NULL) +} + + + + + + +#' Probabilistic Clades +#' +#' Utility for calling probabilistic clades at, in between, or excluding variants. +#' @param fwd a forward table as returned by \code{\link{MakeForwardTable}} +#' @param bck a backward table as returned by \code{\link{MakeBackwardTable}} +#' @param pars a \code{kalisParameters} object, as returned by +#' \code{\link{Parameters}}. +#' @param beta.theta.opts a list; see Details for \code{\link{DistMat}}. +#' @param safety.checks a logical, should safety checks be applied to the distances? See \code{\link{DistMat}}. +#' @param neighbors a logical, should nearest neighbors be pre-calculated? See \code{\link{Neighbors}}. +#' @param use.forking a logical, should forked processes be used? +#' @param nthreads the number of CPU cores to use. Currently, no parallelism is used. 
+#' @return +#' a \code{kalisClades} object encoding probabilistic clade calls +#' +#' @export Clades +Clades <- function(fwd, bck, pars, beta.theta.opts = NULL, + safety.checks = FALSE, neighbors = FALSE, + #use.bettermc = FALSE, + use.forking = FALSE, + forking.chunk.size = 100L, + mc.preschedule = FALSE, # FALSE is more conservative of memory but means many new forked processes need to be launched so it's slower than TRUE + nthreads = 1L){ + # currently only outputs a list but should eventually also output a matrix of integers and an attribute list of clades + + unit.mut.dist <- -log(pars$pars$mu) + + M <- DistMat(fwd, bck, beta.theta.opts = beta.theta.opts, nthreads = nthreads) + + if(safety.checks){ + M[!is.finite(M)] <- 0 + diag(M) <- NA_real_ + } + + rank_donors_func <- function(x, type="linear_20", neighbors = FALSE, mac.range = c(NA,NA)){ + rank_donors_func_res <- as.list(1:length(x)) + for(j in 1:length(x)){ + d.ranks <- data.table::frank(M[,x[j]], na.last = FALSE, ties.method = "first") + phi <- c(diff(M[order(d.ranks),x[j]]),0) + phi[1] <- 0 + phi <- phi / unit.mut.dist # an N-long vector + if(type == "linear_20"){ + phi[phi > 1] <- 1 + phi[phi < 0.2] <- 0 + } else if(type == "step_80"){ + phi[phi < 0.8] <- 0 + phi[phi > 0] <- 1 + } + + if(!is.na(mac.range[1])){phi[1:mac.range[1]] <- 0} + if(!is.na(mac.range[2])){phi[mac.range[2]:nrow(fwd$alpha)] <- 0} + + i <- which(phi!=0) + + # compress phi + clades <- cbind(i,phi[i]) # if i = integer(0) (no clades called), clades will be a 0 x 2 matrix. 
+ attr(d.ranks,"clades") <- clades + + if(neighbors){ + attr(d.ranks,"neighbors") <- if(nrow(clades)){ + match(2:clades[1,1],d.ranks) + } else { + NA_integer_ + } + } + + rank_donors_func_res[[j]] <- d.ranks + } + rank_donors_func_res + } + + + chunks <- chunk_int(ncol(M), chunk.size = forking.chunk.size) + + if(use.forking){ + # if(use.bettermc){ + # rank.list <- bettermc::mclapply(chunks, rank_donors_func, neighbors = neighbors, mc.preschedule = mc.preschedule, mc.cores=nthreads, mc.share.copy = FALSE) + # } else { + rank.list <- parallel::mclapply(chunks, rank_donors_func, neighbors = neighbors, mc.preschedule = mc.preschedule, mc.cores=nthreads) + #} + } else { + rank.list <- lapply(chunks, rank_donors_func, neighbors = neighbors) # this matrix is ranked in each column, not scaled by Ne or Mu + } + + rank.list <- unlist(rank.list,recursive = FALSE) + + attr(rank.list,"from_recipient") <- fwd$from_recipient + attr(rank.list,"to_recipient") <- fwd$to_recipient + + class(rank.list) <- c("kalisClades","list") # rank.list is a list where each element is a vector of ranks with attributes clades + + rank.list +} + + +#' Neighbors +#' +#' Utility for calling tied nearest neighbors for each recipient haplotype +#' @param x a \code{kalisClades} object returned by \code{\link{kalisClades}} +#' @param use.forking a logical, should forked processes be used? +#' @param nthreads the number of CPU cores to use. Currently, no parallelism is used. 
+#' @return
+#' a \code{kalisNeighbors} encoding the nearest neighbors for each recipient haplotype
+#'
+#' @export Neighbors
+Neighbors <- function(x,
+                      #use.bettermc = FALSE,
+                      use.forking = FALSE, nthreads = 1L){
+  # currently only supports list x but should support matrix x as well
+
+  if(!is.null(attr(x[[1]],"neighbors"))){
+
+    # neighbors were pre-computed and stored on each element: just collect them
+    neighbors <- lapply(x,function(z){attr(z,"neighbors")})
+
+  } else {
+
+    call_neighbors <- function(z){
+      # z should be a vector of ranks with attribute "clades"
+      clades <- attr(z,"clades")
+      if(nrow(clades)){
+        # positions in z holding ranks 2..clades[1,1]
+        match(2:clades[1,1],z)
+      } else {
+        NA_integer_
+      }
+    }
+
+    if(use.forking){
+      # if(use.bettermc){
+      #   neighbors <- bettermc::mclapply(x, call_neighbors, mc.cores = nthreads, mc.share.copy = FALSE)
+      # } else {
+      neighbors <- parallel::mclapply(x, call_neighbors, mc.cores = nthreads)
+      #}
+    } else {
+      neighbors <- lapply(x,call_neighbors)
+    }
+  }
+
+  attr(neighbors,"from_recipient") <- attr(x,"from_recipient")
+  attr(neighbors,"to_recipient") <- attr(x,"to_recipient")
+  class(neighbors) <- c("kalisNeighbors","list")
+
+  neighbors
+}
+
+
+#' Sprigs
+#'
+#' Utility for calling sprigs from probabilistic clades
+#' @param x a \code{kalisNeighbors} object returned by \code{\link{kalisNeighbors}}, a \code{kalisClades} object returned by \code{\link{kalisClades}} with \code{neighbors = TRUE}, or a list
+#' @param use.forking a logical, should forked processes be used?
+#' @param nthreads the number of CPU cores to use. Currently, no parallelism is used.
+#' @param add.self a logical, if TRUE (the default) each haplotype is appended to its own neighborhood before cliques are called
+#' @return
+#' a \code{kalisSprigs} object assigning each haplotype to a sprig
+#'
+#' @export Sprigs_old
+Sprigs_old <- function(x, use.forking = FALSE, nthreads = 1L, add.self = TRUE){
+
+  # this version of Sprigs still has a bit of randomness in its sprig building between runs on the same input
+  # which can be seen by running table(is.na(s),is.na(s1)) where s and s1 are the output of Sprigs
+  # for the same data run twice. it's relatively minor
+
+  # Capture the recipient range BEFORE x is replaced below: lapply()/Map() return
+  # plain lists, so reading these attributes from x at the end would always give NULL.
+  from_recipient <- attr(x,"from_recipient")
+  to_recipient <- attr(x,"to_recipient")
+
+  if(inherits(x,"kalisClades")){
+    if(!is.null(attr(x[[1]],"neighbors"))){
+      x <- lapply(x,function(z){attr(z,"neighbors")})
+    } else {
+      stop("The kalisClades provided do not have the Neighbors pre-calculated, use kalis::Neighbors to obtain them and then pass them to Sprigs")
+    }
+  }
+
+  # x here is a list that's N long st x[[i]] gives the indices of the (tied) nearest neighbors of i
+  roster <- rep(NA_integer_,length(x))
+
+  label <- 0L
+  # add self to own neighborhood
+  # Map() always returns a list; mapply() with its default SIMPLIFY = TRUE would
+  # collapse x to a matrix whenever all neighborhoods have the same size.
+  if(add.self){x <- Map(c,x,seq_along(x))}
+
+  done <- rep(FALSE,length(x))
+
+  # the randomness in indices here is not really essential but
+  for(i in sample.int(length(x))){
+    if(!done[i]){
+
+      # pulling out cliques in the graph that are fully connected bi-directionally:
+      # if i is in a clique, rather trivially, this will return the full clique
+      # Note, we require i %in% proposed.set to prevent called cliques from being broken up later in the for loop:
+      # if i is not in a clique, then it's still possible for a partial clique to be returned that doesn't include i if i projects onto
+      # a superset or subset of a clique. If i supersedes a clique member and projects
+      # onto a subset, this clique subset will be overwritten later by the larger clique. However, it would still be possible for a i that comes
+      # after all of the clade members in our for loop to break up the clique by projecting onto a subset of them.
+      # Enforcing i %in% proposed.set avoids that possibility.
+
+      # we also require that length(proposed.set) > 1 so that we don't end up with solo cliques being called that are just i by itself.
+
+      #missing_sprig_6 <- c(6103,1804, 6015, 4726, 4752, 807,3118,3991,6466,6068, 10,1250, 3669, 3658, 1997, 1399, 1116, 3738, 5015)
+      proposed.set <- Reduce(intersect,x[x[[i]]])
+      # we can really speed up this Reduce by using table and looking for entries that are present in all groups
+
+      # in case the neighborhood of i overshoots into previously established cliques, it has a BIG effect in real data
+      proposed.set <- proposed.set[!done[proposed.set]]
+
+      if(length(proposed.set) > 1 && i %in% proposed.set){
+
+        label <- label + 1L
+        # this repeated intersection step has truly a small effect but
+        # helps guard us against the case where i might have erroneously added some candidates in its neighborhood that
+        # do not belong in the clade and do not include some clade members. This step helps us recover those clade members
+        proposed.set <- Reduce(intersect,x[proposed.set])
+        proposed.set <- proposed.set[!done[proposed.set]]
+
+        # if(!all(is.na(roster[missing_sprig_6])) && !all(roster[missing_sprig_6]==6L)){
+        #   print(i)
+        #   print(label)
+        #   print(roster[missing_sprig_6])
+        #   browser()
+        # }
+        roster[proposed.set] <- label
+        done[proposed.set] <- TRUE
+      }
+    }
+
+    # individuals that are not part of a fully connected clique are left with NA_integer_ on the roster
+  }
+
+  # Size frequency spectrum: table(table(roster))
+
+  attr(roster,"n.sprigs") <- label
+  attr(roster,"from_recipient") <- from_recipient
+  attr(roster,"to_recipient") <- to_recipient
+  class(roster) <- c("kalisSprigs","integer")
+
+  roster
+}
+
+#Testing Sprigs
+# kalis::Sprigs(list(
+#   1:2,
+#   1:2,
+#   3:7,
+#   1:10,
+#   1:10,
+#   1:10,
+#   5:11
+# ))
+
+#' CladeMat OLD
+#'
+#' Utility for constructing a probabilistic clade matrix
+#' @param x a \code{kalisClades} object returned by \code{\link{kalisClades}}
+#' @param ploidy an integer, the ploidy of the organism
+#' @param sprigs.to.prune a \code{kalisSprigs} object returned by \code{\link{kalisSprigs}} encoding sprigs that should be excluded from the matrix returned
+#' @param assemble a logical, if FALSE return the clade matrix as a list of columns rather than as a symmetrized matrix
+#' @param use.forking a logical, should forked processes be used?
+#' @param forking.chunk.size an integer, the maximum number of recipient samples handed to each call of the worker function
+#' @param mc.preschedule a logical passed to \code{parallel::mclapply} when \code{use.forking = TRUE}
+#' @param nthreads the number of CPU cores to use. Currently, no parallelism is used.
+#' @return
+#' a matrix representation of the probabilistic clades provided
+#'
+#' @export CladeMat_old
+CladeMat_old <- function(x, ploidy = 2L, sprigs.to.prune = NULL, assemble = TRUE,
+                         #use.bettermc = FALSE,
+                         use.forking = FALSE, forking.chunk.size = 100L, mc.preschedule = FALSE, nthreads = 1L){
+
+  # prepare sprigs
+  if(is.null(sprigs.to.prune)){sprigs.to.prune <- integer()}
+  sl <- length(sprigs.to.prune)
+  if(sl){sprig.sizes <- tabulate(sprigs.to.prune)}
+
+  n.recipient.samples <- as.integer(length(x)/ploidy)
+
+  chunks <- chunk_int(n.recipient.samples, chunk.size = forking.chunk.size)
+
+  # Per-haplotype weight vector for haplotype h over N donors.
+  # Column 1 of the "clades" attribute (idx) and column 2 (phi) are expanded
+  # into a step function via inverse.rle() and then indexed by the ranks in x[[h]].
+  hap_weights <- function(h, N){
+    clades <- attr(x[[h]],"clades")
+    idx <- clades[,1]
+    phi <- clades[,2]
+
+    # prune sprig: drop the first clade when it coincides with the sprig h belongs to
+    if(sl && !is.na(sprigs.to.prune[h]) && length(idx) && sprig.sizes[sprigs.to.prune[h]] == idx[1]){
+      idx <- idx[-1]
+      phi <- phi[-1]
+    }
+
+    # we know that phi[N] = 0, so there must always be a 0 appended
+    inverse.rle(list("values" = c(rev(cumsum(rev(phi/idx))),0),
+                     "lengths" = diff(c(0,idx,N))))[x[[h]]]
+  }
+
+  if(ploidy == 1){
+
+    omega_func <- function(s){
+      omega_func_res <- vector("list", length(s))
+      for(j in seq_along(s)){
+        N <- length(x[[s[j]]])
+        omega_func_res[[j]] <- hap_weights(s[j], N)
+      }
+      omega_func_res
+    }
+
+  } else if(ploidy == 2){
+
+    omega_func <- function(s){
+      omega_func_res <- vector("list", length(s))
+      for(j in seq_along(s)){
+        N <- length(x[[s[j]]])
+
+        # sum the two haplotypes of recipient sample s[j] ...
+        w <- hap_weights(s[j]*2-1, N) + hap_weights(s[j]*2, N)
+
+        # ... then fold consecutive donor haplotypes into donor samples
+        omega_func_res[[j]] <- w[seq(1,N,by=2)] + w[seq(2,N,by=2)]
+      }
+      omega_func_res
+    }
+
+  } else {
+    stop("Relatedness currently only supports ploidy = 1 or 2")
+  }
+
+  # we don't simplify this list to a matrix at this stage to help preserve memory.
+  if(use.forking){
+    # if(use.bettermc){
+    #   res <- bettermc::mclapply(chunks, omega_func, mc.preschedule = mc.preschedule, mc.cores=nthreads, mc.share.copy = FALSE)
+    # } else {
+    res <- parallel::mclapply(chunks, omega_func, mc.preschedule = mc.preschedule, mc.cores=nthreads)
+    #}
+  } else {
+    res <- lapply(chunks, omega_func)
+  }
+
+  res <- unlist(res, recursive = FALSE)
+
+  if(assemble){
+    res <- do.call(cbind,res)
+    res <- 0.5 * (res + t(res))
+  }
+
+  res
+}
+
+chunk_int <- function(n, chunk.size = 100){
+  # subdivide 1:n into chunks of size at most chunk.size;
+  # returns a list of consecutive integer ranges that together cover 1:n
+  if(n < 1){stop("n must be an integer >= 1")}
+  interval.starts <- seq(1,n,by=chunk.size)
+  interval.ends <- c(interval.starts[-1]-1,n)
+  lapply(seq_along(interval.starts), function(i){
+    seq.int(interval.starts[i],interval.ends[i])
+  })
+}
+#
+# use.forking <- FALSE
+# use.forking <- TRUE
+# nthreads <- 8L
+#
+# require(kalis)
+# haps.file <-"~/Dropbox/Benchmarking_StatGen/kalis_benchmarking_tests/benchmark_on_msprime_simulations/msprime_sim_N_100000_L_10000.hdf5"
+# CacheHaplotypes(haps = haps.file,loci.idx = 1:1000,hap.idx = 1:24000)#SmallHaps)
+# #CacheHaplotypes(SmallHaps)
+# pars <- Parameters(rep(1e-2, L() - 1))
+# fwd <- MakeForwardTable(pars)
+# bck <-
MakeBackwardTable(pars) +# Forward(fwd,pars,floor(L()/2),1) +# Backward(bck,pars,floor(L()/2),1) +# # +# start <- proc.time() +# rl2 <- Clades(fwd, bck, pars, neighbors = TRUE, safety.checks = FALSE, use.forking = use.forking, nthreads = nthreads) +# finish <- proc.time() - start +# print(finish) +# # +# sprigs <- Sprigs(rl2) +# start <- proc.time() +# M <- CladeMat(rl2, sprigs.to.prune = sprigs, use.forking = use.forking, nthreads=nthreads) +# finish <- proc.time() - start +# print(finish) +# + +# rl<- readRDS("~/Downloads/clades_test.rds") +# all.equal(rl,rl2) + +# sprigs <- CallSprigs(rl, use.forking = use.forking, nthreads = nthreads) +# +# #hist(sapply(rl,function(x){nrow(attr(x,"clades"))}),breaks=20) +# #mean(unlist(lapply(rl,function(x){attr(x,"clades")[,2]}))>0.5) +# +# rl <- CladeMat(rl, ploidy = 2L, sprigs.to.prune = sprigs, assemble = FALSE, use.forking = use.forking, nthreads = nthreads) +# rl <- do.call(cbind,rl) +# rl <- 0.5 * (rl + t(rl)) +# +# r2 <- -r2 +# class(r2) <- c("kalisDistanceMatrix","matrix") +# plot(r2) +# +# M <- DistMat(fwd,bck) +# M <- M + t(M) +# +# perm <- fastcluster::hclust(stats::as.dist(M),method="average")$order +# +# layout(matrix(1:3,1)) +# print(lattice::levelplot(M[perm,][,rev(perm)], +# useRaster = TRUE, +# col.regions = grDevices::colorRampPalette(RColorBrewer::brewer.pal(9,name = "BuPu"))(100), +# yaxt = "n", xaxt = "n", xlab = "", ylab = "", xaxt = "n")) +# print(lattice::levelplot(r1[perm,][,rev(perm)], +# useRaster = TRUE, +# col.regions = grDevices::colorRampPalette(RColorBrewer::brewer.pal(9,name = "BuPu"))(100), +# yaxt = "n", xaxt = "n", xlab = "", ylab = "", xaxt = "n")) +# print(lattice::levelplot(r2[perm,][,rev(perm)], +# useRaster = TRUE, +# col.regions = grDevices::colorRampPalette(RColorBrewer::brewer.pal(9,name = "BuPu"))(100), +# yaxt = "n", xaxt = "n", xlab = "", ylab = "", xaxt = "n")) +# + diff --git a/R/GoldMasterClades.R b/R/GoldMasterClades.R new file mode 100644 index 0000000..96e6e09 --- /dev/null 
+++ b/R/GoldMasterClades.R @@ -0,0 +1,58 @@ +##### Gold Master ##### + +goldmaster.blobby <- function(alpha,beta,recipient_hap, unit.dist = 1, thresh = 0.2){ + f <- function(x,c){ifelse(x nrow(alpha)-1 || as.integer(left_recipient_hap)!=left_recipient_hap){ + stop("left_recipient_hap must be an integer in [1,nrow(alpha)-1]") + } + if(c < 0 || c > 1){stop("c must be in [0,1]")} + + v <- goldmaster.blobby(alpha[,1],beta[,1],recipient_hap=left_recipient_hap,unit.dist,thresh) + v <- v + goldmaster.blobby(alpha[,2],beta[,2],recipient_hap=left_recipient_hap+1L,unit.dist,thresh) + v[seq.int(1,length(v),2)] + v[seq.int(2,length(v),2)] +} + +goldmaster.blobby.full <- function(alpha,beta,left_recipient_hap, unit.dist, thresh){ + if(ncol(alpha) != ncol(beta)){stop("alpha and beta must have the same number of columns")} + if(nrow(alpha) != nrow(beta)){stop("alpha and beta must have the same number of rows")} + + if( nrow(alpha)%%2 || ncol(alpha)%%2 ){stop("alpha must be a matrix with an even number of rows and columns")} + + if(left_recipient_hap <= 0 || left_recipient_hap > nrow(alpha)-1 || as.integer(left_recipient_hap)!=left_recipient_hap){ + stop("left_recipient_hap must be an integer in [1,nrow(alpha)-1]") + } + + if(thresh < 0 || thresh > 1){stop("thresh must be in [0,1]")} + + n.samps <- ncol(alpha)/2 + + M <- matrix(0,n.samps,n.samps) + + for(i in 1:n.samps){ + v <- goldmaster.blobby(alpha[,2*i-1],beta[,2*i-1],recipient_hap=left_recipient_hap+2*i-2L,unit.dist,thresh) + v <- v + goldmaster.blobby(alpha[,2*i],beta[,2*i],recipient_hap=left_recipient_hap+2*i-1L,unit.dist,thresh) + M[,i] <- v[seq.int(1,length(v),2)] + v[seq.int(2,length(v),2)] + } + M +} + + +CladeMat.GM <- function(fwd,bck,unit.dist,thresh = 0.2){ + M <- goldmaster.blobby.full(fwd$alpha,bck$beta,left_recipient_hap = bck$from_recipient,unit.dist,thresh) + M +} \ No newline at end of file From 2805f3110909b8484e23722fec700dda36a610c7 Mon Sep 17 00:00:00 2001 From: ryanchrist Date: Mon, 23 Sep 2024 12:11:06 
-0500 Subject: [PATCH 04/43] Introducing optimal checkpointing routines and iterator interface for using checkpointing to have kalis sequentially visit (infer ancestries at) consecutive target variants --- R/Iterator.R | 694 ++++++++++++++++++++++++++++++++++++++++++++++++ R/TableCache.R | 427 +++++++++++++++++++++++++++++ src/R_OptCkpt.c | 58 ++++ src/R_OptCkpt.h | 10 + 4 files changed, 1189 insertions(+) create mode 100644 R/Iterator.R create mode 100644 R/TableCache.R create mode 100644 src/R_OptCkpt.c create mode 100644 src/R_OptCkpt.h diff --git a/R/Iterator.R b/R/Iterator.R new file mode 100644 index 0000000..e28a095 --- /dev/null +++ b/R/Iterator.R @@ -0,0 +1,694 @@ +#' Build an efficient iterator over loci +#' +#' Create a \code{kalisForwardIterator} for propagating a forward table iteratively over target loci using a table cache and optimal checkpointing. +#' +#' See example. +#' +#' +#' @param ram.ckpts an integer specifying the number of checkpoints to store in RAM +#' @param targets a vector of loci to iterate over (starting with the most downstream target) +#' @param base.fwd.table a \code{kalisForwardTable} either at the most upstream target, or if the targets are evenly spaced, one interval upstream of the most upstream target. +#' NULL (the default) is interpretted as the prior \code{Pi}, see \code{\link{Parameters}} +#' @param disk.ckpts an integer specifying the number of checkpoints to store on disk +#' @param disk.dir a path to a directory where a temporary folder may be made to store checkpoints on disk +#' @param force.unif a logical, if TRUE iterate over targets as if they were uniformly spaced. WARNING: DO NOT use this in conjunction with the targets method, still experimental. With force.unif = TRUE, the resulting iterator appear to be targeting the first length(targets) variants with all methods, but in fact will be silently iterating over the original targets. 
+#' @param pars a \code{kalisParameters} object; used to create (or re-validate) the table cache
+#' @param from_recipient first recipient haplotype covered by the cached tables
+#' @param to_recipient last recipient haplotype covered by the cached tables
+#' @param lookup.tables a list with elements \code{cost} and \code{index} as returned by \code{calc_tables}; computed when NULL (only used for uniformly spaced targets)
+#' @param cache an existing list of forward tables to recycle; every table must have the same \code{from_recipient} and \code{to_recipient} as requested here
+#' @param save.cache a logical, if FALSE (the default) the cache is released once the last target has been served
+#' @param exact a logical controlling the tolerance used when optimizing checkpoint locations for non-uniformly spaced targets
+#' @return a function of class \code{kalisIterator} with arguments \code{(fwd, pars, t, nthreads = 1)} that moves \code{fwd} to target \code{t}
+#' @seealso \code{\link{MakeForwardTable}} to create a \code{kalisForwardTable}.
+#'
+#' @examples
+#' \dontrun{
+#' data("SmallHaps")
+#' CacheHaplotypes(SmallHaps)
+#' pars <- Parameters()
+#' fwd <- MakeForwardTable(pars)
+#' bck <- MakeBackwardTable(pars)
+#' Iter <- ForwardIterator(pars, 2)
+#' for(t in targets(Iter)){
+#'   Iter(fwd,pars,t)
+#'   Backward(bck,pars,t)
+#'   print(paste("Mean Distance at locus",t,"is",mean(DistMat(fwd,bck))))
+#' }
+#' }
+#'
+#' @export
+ForwardIterator <- function(pars,
+                            ram.ckpts = 1L,
+                            targets = 1:kalis:::pkgVars$L,
+                            base.fwd.table = NULL,
+                            disk.ckpts = 0,
+                            disk.dir = NULL,
+                            from_recipient = 1,
+                            to_recipient = Inf,
+                            lookup.tables = NULL,
+                            cache = NULL,
+                            save.cache = FALSE,
+                            exact = TRUE,
+                            force.unif = FALSE){
+
+  force(force.unif)
+
+  # Sanity checks
+  ####################
+  ram.ckpts <- as.integer(ram.ckpts)
+  if(ram.ckpts <= 0){stop("ram.ckpts must be a positive integer")}
+
+
+  # Check to ensure that the cache provided can actually be recycled for this problem
+
+  if(!is.null(cache)){
+    for(i in seq_along(cache)){
+      if(cache[[i]]$from_recipient!=from_recipient){stop("The provided cache must have the same from_recipient as currently requested.")}
+      if(cache[[i]]$to_recipient!=min(to_recipient,kalis:::pkgVars$N)){stop("The provided cache must have the same to_recipient as currently requested.")}
+    }
+  }
+
+
+  # Only RAM checkpoints for now
+  ##################
+  # for now we ignore disk checkpoints:
+  num.available.ckpts <- ram.ckpts
+
+  if(disk.ckpts != 0 || !is.null(disk.dir)){
+    warning("disk checkpoints not yet implemnted, proceeding ignoring disk.ckpts and disk.dir")
+  }
+
+
+  # Cover case when base.fwd.table provided
+  ##################
+
+  if(is.null(base.fwd.table)){
+    use.pi <- TRUE
+  }else{
+    if( !("kalisForwardTable" %in% class(base.fwd.table)) ){stop("base.fwd.table is not a kalisForwardTable")}
+    if(any(targets < base.fwd.table$l)){stop("no targets may be less than base.fwd.table$l")}
+    use.pi <- FALSE
+  }
+
+
+  if(force.unif){
+
+    if(!use.pi){
+      if(targets[1] != base.fwd.table$l){stop("When using force.unif, for now, base.fwd.table$l must be AT the first target")}
+    }
+
+    # pretend the targets are 1..length(targets); the real loci are kept in targets.idx
+    targets.idx <- targets
+    targets <- 1:length(targets)
+    base.fwd.table.l <- 1
+
+  } else {
+    targets.idx <- NULL
+    base.fwd.table.l <- base.fwd.table$l
+  }
+
+
+  # Figure out whether using uniform or general checkpointing
+  ####################
+  # by default
+  uniform.ckpts <- FALSE
+  first.target.given <- FALSE
+
+  intervals <- unique(diff(targets))
+
+  if(length(intervals) == 1){
+    if(use.pi){
+      if(targets[1] == intervals){ uniform.ckpts <- TRUE }
+    }else{
+      if(targets[1] == base.fwd.table.l){ uniform.ckpts <- TRUE; first.target.given <- TRUE}
+      if( (targets[1] - intervals) == base.fwd.table.l){ uniform.ckpts <- TRUE; first.target.given <- FALSE}
+    }
+  }
+  rm(intervals)
+
+
+  # Perform Table Benchmarking
+  ####################
+  # bench <- BenchmarkTables()
+
+  propagation.cost <- 1:length(targets)
+
+
+  # Solve Optimal Checkpointing
+  ###############################
+
+  if(uniform.ckpts){
+
+    if(is.null(lookup.tables)){
+      message("Calculating Optimal Checkpoint Schedule")
+      lookup.tables <- calc_tables(propagation.cost,num.available.ckpts)
+    }
+
+    cost.table <- lookup.tables$cost
+    index.table <- lookup.tables$index
+
+    if(!first.target.given){
+
+      uniform_SolveSchedule <- uniform_MakeSolveSchedule(targets,cost.table,index.table)
+      # make the solver visible to itself so that it can recurse
+      assign("uniform_SolveSchedule",uniform_SolveSchedule,envir = environment(uniform_SolveSchedule))
+
+      uniform_SolveSchedule(1,length(targets),num.available.ckpts)
+
+      cost <- uniform_LookupCost(length(targets),num.available.ckpts,cost.table)
+
+    }else{
+
+      uniform_SolveSchedule <- uniform_MakeSolveSchedule(targets[-1],cost.table,index.table)
+      assign("uniform_SolveSchedule",uniform_SolveSchedule,envir = environment(uniform_SolveSchedule))
+
+      uniform_SolveSchedule(1,length(targets)-1,num.available.ckpts)
+      cost <- uniform_LookupCost(length(targets)-1,num.available.ckpts,cost.table)
+
+    }
+
+    sch <- uniform_trim.sch(uniform_SolveSchedule)
+
+    # I don't believe we need to modify this schedule in order to still request the baseline locus as our last target
+
+  }else{
+
+    # Solve general problem
+    # (MakeSolveSchedule binds the solver and its objective function into one
+    #  shared environment itself, so nothing needs to be patched up here.)
+    SolveSchedule <- MakeSolveSchedule(exact = exact)
+
+    if(!first.target.given){
+      cost <- SolveSchedule(d = c(targets[1],diff(targets)),targets, num.available.ckpts)
+    }else{
+      cost <- SolveSchedule(d = diff(targets),targets[-1], num.available.ckpts)
+    }
+    sch <- trim.sch(SolveSchedule)
+
+  }
+
+
+  # Construct Table Cache
+  ########################
+
+  max.tables <- max(sch$k)
+
+  if(is.null(cache) || length(cache) < max.tables){
+    cache <- CreateForwardTableCache(pars = pars,size = Inf, from_recipient = from_recipient, to_recipient = to_recipient, max.tables = max.tables)
+
+  }else{
+
+    # Drop surplus tables from the END of the list: deleting element i inside an
+    # ascending loop shifts the remaining elements and skips every other one.
+    while(length(cache) > max.tables){
+      cache[[length(cache)]] <- NULL
+    }
+
+    for(i in seq_along(cache)){
+      # check if parameters match, if not, overwrite with warning
+      if(cache[[i]]$pars.sha256!=pars$sha256){
+        warning("The provided cache was initialized with parameters that are different from those currently in pars. Overwritting the pars in the provided cache...")
+        cache[[i]]$pars.sha256 <- pars$sha256
+      }
+      kalis:::ResetTable(cache[[i]])
+    }
+
+  }
+
+  rm(pars); gc()
+
+
+  # Construct Iterator
+  ######################
+
+  UpdateCache <- MakeUpdateCache(sch, use.pi, targets.idx = targets.idx)
+
+  current.sch <- data.frame("i" = Inf)
+
+  # targets are served from the last (most downstream) to the first
+  current.target.index <- length(targets)
+
+  iter.internal <- function(fwd, pars, t, nthreads = 1){
+
+    if(force.unif){t <- match(t,targets.idx)}
+
+    if(current.target.index == 0){warning("This iterator has been exhausted."); return()}
+
+    if(t == targets[current.target.index]){
+      current.target.index <<- current.target.index - 1
+    }else{
+      stop(paste("The next target locus for this iterator is",targets[current.target.index],"not",t))
+    }
+
+    if(identical(lobstr::obj_addr(fwd),lobstr::obj_addr(base.fwd.table))){
+      stop("base.fwd.table cannot point to the same table as fwd: they must be created independently.")
+    }
+
+    #print(c(current.sch$i, t))
+    if(current.sch$i > t){ current.sch <<- UpdateCache(cache, pars, nthreads, base.fwd.table) }
+
+    # start fwd from the nearest checkpoint (k != 0) or from the baseline (k == 0)
+    if(current.sch$k != 0){
+      CopyTable(to = fwd, from = cache[[current.sch$k]])
+    }else{
+      if(use.pi){
+        kalis:::ResetTable(fwd)
+      }else{
+        CopyTable(to = fwd, from = base.fwd.table)
+      }
+    }
+
+    # Clean Up cache unless we're instructed to save it
+    if(current.target.index == 0){
+      if(!save.cache){
+        cache <<- NULL
+        gc()
+      }
+    }
+
+    if(force.unif){
+      Forward(fwd = fwd, pars = pars, t = targets.idx[t], nthreads = nthreads)
+    } else {
+      Forward(fwd = fwd, pars = pars, t = t, nthreads = nthreads)
+    }
+  }
+
+  class(iter.internal) <- c("kalisIterator",class(iter.internal))
+
+  iter.internal
+}
+
+
+targets <- function(x) { # put this declaration above and below because it seems that order determines whether targets is recognized
+  UseMethod("targets")
+}
+
+# Targets of an iterator in the order in which they must be requested (last locus first).
+targets.kalisIterator <- function(iter){
+  if(!"kalisIterator"%in%class(iter)){stop("argument must be a kalisIterator")}
+  rev(get("targets", envir = environment(iter)))
+}
+
+targets <- function(x) {
+  UseMethod("targets")
+}
+
+# Print a short summary of the iterator; returns its argument invisibly.
+print.kalisIterator <- function(iter, ...){
+  if(!"kalisIterator"%in%class(iter)){stop("argument must be a kalisIterator")}
+
+  if(get("current.target.index", envir=environment(iter)) == 0){
+    # emit the notice (previously the string was returned and never shown)
+    message("This is an exhausted kalisIterator.",appendLF = TRUE)
+  }else{
+    target.range <- range(get("targets", envir = environment(iter)))
+    message(paste("A kalisIterator for",length(get("targets", envir = environment(iter))),"targets ranging from",target.range[1],"to",target.range[2]),appendLF = TRUE)
+    message(paste("Contains",get("max.tables", envir = environment(iter)),"checkpoints using ~",utils::object.size(get("cache", envir = environment(iter)))/1e9,"Gb of RAM"),appendLF = TRUE)
+    message(paste("Next target locus:",get("targets", envir = environment(iter))[get("current.target.index", envir = environment(iter))]),appendLF = TRUE)
+    message("",appendLF = TRUE)
+  }
+  invisible(iter)
+}
+
+# Plot the checkpoint schedule: checkpoint slot (K) against locus.
+plot.kalisIterator <- function(iter){
+  if(!"kalisIterator"%in%class(iter)){stop("argument must be a kalisIterator")}
+  sch <- get("sch",envir = environment(iter))
+  loci <- get("targets",envir = environment(iter))
+  plot(sch$i[-c(1,nrow(sch))],sch$k[-c(1,nrow(sch))],type="h",lwd=1,bty="n",ylab="K",xlab="locus",las=1,ylim=c(0,max(sch$k)),xlim=range(loci),xaxt="n",yaxt="n")
+  p.loci <- pretty(loci)
+  axis(1,at = p.loci ,pos=0)
+  axis(2,at = pretty(0:max(sch$k)),pos=p.loci[1],las=2)
+}
+
+
+#' Checkpoint lookup tables
+#'
+#' Dynamic-programming tables for the uniform checkpointing problem.
+#' @param propagation.cost numeric vector, one entry per target
+#' @param max.num.checkpoints the largest number of checkpoints to solve for
+#' @param use.R a logical, if TRUE fill the tables in R rather than via the compiled routine
+#' @return a list with a \code{cost} matrix (rows: problem size, columns: 0..max.num.checkpoints checkpoints) and an \code{index} matrix of checkpoint positions
+#' @export
+calc_tables <- function(propagation.cost,max.num.checkpoints, use.R = FALSE){
+  start <- proc.time()
+
+  propagation.cost <- as.numeric(propagation.cost)
+
+  max.n <- length(propagation.cost)
+
+  # the first row corresponds to solving a 0 locus problem
+  cost.table <- matrix(0,nrow=max.n + 1,ncol= max.num.checkpoints + 1)
+  index.table <- matrix(0L,nrow=max.n,ncol= max.num.checkpoints)
+
+  cost.table[,1] <- c(0,cumsum(as.numeric(propagation.cost)))
+
+  if(use.R){
+
+    for(k in 1:max.num.checkpoints){
+      for(n in 1:max.n){
+        # now solving a n long problem with k checkpoints
+        v <- cost.table[1:n,k + 1] + propagation.cost[1:n] + cost.table[n:1,k]
+        x <- which.min(v)
+        index.table[n, k] <- x
+        cost.table[n + 1,k + 1] <- v[x]
+      }
+      print(paste(k,"done at",c(proc.time() - start)[3]/3600,"hours from start."))
+    }
+  }else{
+    invisible(.Call(CCall_OptCkpt, cost.table, index.table, propagation.cost))
+  }
+
+  # drop = FALSE keeps a matrix even when there is a single target
+  cost.table <- cost.table[-1,,drop = FALSE]
+  return(list("cost" = cost.table,"index" = index.table))
+}
+
+
+# Build the closure that walks the checkpoint schedule `sch` and keeps the table
+# cache in step with it. The returned function yields the schedule row (k, i) of
+# the checkpoint from which the next target should be reached.
+# NOTE(review): the cost-tracking path (cost.list != NULL) references `d`, which is
+# not defined in this function, and transfer.cost() returns NULL for some
+# combinations; ForwardIterator never passes cost.list - confirm before enabling.
+MakeUpdateCache <- function(sch, use.pi, cost.list = NULL, targets.idx = NULL){
+
+  force(targets.idx)
+
+  track.cost <- FALSE
+
+  if(!is.null(cost.list)){
+
+    mem.copy.cost = cost.list$mem.copy.cost
+    disk.read.cost = cost.list$disk.read.cost
+    disk.write.cost = cost.list$disk.write.cost
+    num.ram.ckpts = cost.list$num.ram.ckpts
+    num.disk.ckpts = cost.list$num.disk.ckpts
+    K <- num.ram.ckpts + num.disk.ckpts
+    track.cost <- TRUE
+    cost <- 0
+
+    transfer.cost <- function(to_k,from_k){
+      from_ram <- from_k <= num.ram.ckpts
+      to_ram <- to_k <= num.ram.ckpts
+
+      if(from_ram & to_ram){
+        return(mem.copy.cost)
+      }
+
+      if(!from_ram & to_ram){
+        return(disk.read.cost)
+      }
+    }
+
+    write.cost <- function(k){
+      if(k <= num.ram.ckpts ){
+        return(0)
+      }else{
+        return(disk.write.cost)
+      }
+    }
+
+  }
+
+
+  exhausted <- FALSE
+  current.ins <- leading.ins <- 1
+  ancestor <- 1
+  cost <- 0
+
+  function(cache, pars, nthreads, base.fwd.table){
+
+    if(exhausted){
+      warning("This iterator has been exhausted.")
+      return(data.frame("k" = 0L,"i" = 0L))
+    }
+
+    repeat{
+
+      # ancestor: the closest earlier instruction at or to the left of the current one
+      candidates <- which(sch$i[1:(current.ins-1)] <= sch$i[current.ins])
+      ancestor <<- candidates[which.max(sch$i[candidates])]
+
+      if(sch$i[leading.ins + 1] < sch$i[ancestor]){ # if the next checkpoint destination is on the left side of the current ancestor
+        current.ins <<- ancestor
+        return(sch[current.ins,])
+      } else {
+        leading.ins <<- leading.ins + 1
+        current.ins <<- leading.ins
+      }
+
+      candidates <- which(sch$i[1:(current.ins-1)] <= sch$i[current.ins])
+      ancestor <<- candidates[which.max(sch$i[candidates])]
+
+
+      if(sch$i[current.ins] != 0){ # we're not at the end yet
+
+        # Update Cache
+        kk <- sch$k[current.ins]
+        akk <- sch$k[ancestor]
+
+        if(akk != 0){
+          if(track.cost){ cost <<- cost + transfer.cost(kk,akk) }
+          CopyTable(cache[[ kk ]],cache[[ akk ]])
+        }else{
+          if(track.cost){ cost <<- cost + transfer.cost(kk,0) }
+          if(use.pi){
+            kalis:::ResetTable(cache[[kk]]) # Pi could also be the baseline table here for the entire interval
+          }else{
+            CopyTable(to = cache[[kk]],base.fwd.table)
+          }
+        }
+
+        # advance cache table from ancestor to current checkpoint destination
+        if(track.cost){ cost <<- cost + sum( d[ (sch$i[ancestor] + 1) : sch$i[current.ins] ])}
+        if(!is.null(targets.idx)){
+          Forward(cache[[kk]],pars,targets.idx[sch$i[current.ins]],nthreads)
+        } else {
+          Forward(cache[[kk]],pars,sch$i[current.ins],nthreads)
+        }
+        if(sch$i[current.ins + 1] > sch$i[current.ins]){
+          next
+        }else{
+          return(sch[current.ins,])
+        }
+
+      }else{ # we are at the end
+        exhausted <<- TRUE
+        rm(cache)
+        rm(sch, envir = parent.env(environment())) # remove large objects from memory
+        gc()
+        return(data.frame("k" = 0L,"i" = 0L))
+      }
+    }
+  }
+}
+
+
+# Build the recursive schedule solver for uniformly spaced targets.
+# The returned closure records instructions in sch.k / sch.i (read back with
+# uniform_trim.sch). The caller must assign the returned function to the name
+# `uniform_SolveSchedule` inside its own environment so that the recursion below
+# resolves to it instead of to the placeholder.
+uniform_MakeSolveSchedule <- function(loci,cost.table,index.table){
+
+  uniform_SolveSchedule <- function(){NULL}
+
+  sch.k <- 0L
+  sch.i <- 0L
+  nrow.sch <- 1
+
+  function(i,j,num.available.ckpts){ # i is the index of the first locus and j is the index of the last locus in the problem to solve (from indicies[i] to indicies[j])
+
+    l.d <- j-i+1
+
+    k <- as.integer(min(l.d-1,num.available.ckpts))
+    if(k==0){return(cost.table[l.d,1])}
+
+    # at this point, we know that num.available.ckpts is at least 1
+    # and l.d is at least 2
+
+    # If neither of the above cases, create a new instruction
+    ins <- which.max(sch.k < 0) # this is the first empty slot for an instruction
+    if(ins == nrow.sch){ # then we're about to assign to the last schedule entry and need to add on space for instructions before we can call obj.func
+      sch.k <<- c(sch.k, rep(-1L,50))
+      sch.i <<- c(sch.i, rep(-1L,50))
+      nrow.sch <<- length(sch.k)
+      ins <- which.max(sch.k < 0)
+    }
+
+    sch.k[ins] <<- k
+    ckpt.location <- index.table[l.d,k]
+    sch.i[ins] <<- loci[i-1+ckpt.location]
+
+    # solve right problem if the interval to the right contains at least one target locus
+    if(l.d > ckpt.location){ uniform_SolveSchedule(i+ckpt.location, j, num.available.ckpts - 1) }
+    #if(l.d > ckpt.location){ get("uniform_SolveSchedule", envir = parent.frame())(i+ckpt.location, j, num.available.ckpts - 1) }
+
+
+    # solve left problem if the interval to the left contains at least one target locus
+    if(ckpt.location > 1){ uniform_SolveSchedule(i, i-2+ckpt.location,num.available.ckpts) }
+    #if(ckpt.location > 1){ get("uniform_SolveSchedule", envir = parent.frame())(i, i-2+ckpt.location,num.available.ckpts) }
+
+
+    return()
+  }
+}
+
+# Optimal cost of an L-target uniform problem given the available checkpoints.
+uniform_LookupCost<- function(L,num.available.ckpts,cost.table){cost.table[L,as.integer(min(L-1,num.available.ckpts)) + 1]}
+
+# Read the schedule recorded by a uniform solver closure and return it as a
+# data.frame with columns k and i, terminated by a (0, 0) row.
+uniform_trim.sch <- function(f){
+  sch.k <- get("sch.k",envir = environment(f))
+  sch.i <- get("sch.i",envir = environment(f))
+  # prune
+  if(any(sch.k == -1)){
+    upper.limit <- which.max(sch.k == -1) - 1
+    sch.k <- sch.k[1:upper.limit]
+    sch.i <- sch.i[1:upper.limit]
+  }
+
+  # create dataframe schedule
+  data.frame("k" = c(sch.k,0L), "i" = c(sch.i,0L))
+}
+
+
+# Objective function minimized by the general solver: total cost of placing the
+# next checkpoint at locus i. It reads and writes sch.k, sch.i and nrow.sch and
+# calls SolveSchedule, none of which exist at top level: MakeSolveSchedule rebinds
+# a copy of this function into its own environment before use.
+obj_func_for_SolveSchedule <- function(i,ins,d,indicies,num.available.ckpts){
+
+  i <- floor(i)
+
+  # clear out schedule for all entries below this instruction
+  sch.k[(ins+1):nrow.sch] <<- -1L
+  sch.i[(ins+1):nrow.sch] <<- -1L
+
+  proposed.location <- which(indicies==i)
+
+  # solve right problem
+  if(length(d) > proposed.location){ # if the interval to the right contains at least one target locus
+    right.cost <- SolveSchedule(d[(proposed.location+1):length(d)],
+                                indicies[(proposed.location+1):length(d)],
+                                num.available.ckpts - 1)
+  }else{
+    right.cost <- 0
+  }
+
+
+  # solve left problem
+  if(proposed.location > 1){ # if the interval to the left contains at least one target locus
+    left.cost <- SolveSchedule(d[1:(proposed.location-1)],
+                               indicies[1:(proposed.location-1)],
+                               num.available.ckpts)
+  }else{
+    left.cost <- 0
+  }
+
+  # total up the cost
+  sum(d[1:proposed.location]) + left.cost + right.cost # Cost to initialize and place and record the checkpoint is first
+}
+
+
+# Build the recursive schedule solver for arbitrarily spaced targets.
+# Returns a function (d, indicies, num.available.ckpts) giving the cost of the
+# sub-problem and recording its instructions in sch.k / sch.i (read back with
+# trim.sch).
+MakeSolveSchedule <- function(exact = TRUE){
+
+  sch.k <- 0L
+  sch.i <- 0L
+  nrow.sch <- 1
+
+  # Private copy of the objective function whose free variables (sch.k, sch.i,
+  # nrow.sch, SolveSchedule) resolve in THIS environment.
+  obj_func <- obj_func_for_SolveSchedule
+  environment(obj_func) <- environment()
+
+  # The solver is given a name inside this environment so that both its own
+  # recursion and obj_func can find it.
+  SolveSchedule <- function(d, indicies, num.available.ckpts){
+
+    l.d <- length(d)
+    if(l.d==0){return(0)} # nothing to solve
+
+    k <- as.integer(min(l.d-1,num.available.ckpts))
+    if(k==0){return(sum(d*(l.d:1)))}
+
+
+    # at this point, we know that num.available.ckpts is at least 1
+    # and l.d is at least 2
+
+
+    # If neither of the above cases, create a new instruction
+    ins <- which.max(sch.k < 0) # this is the first empty slot for an instruction
+    if(ins == nrow.sch){ # then we're about to assign to the last schedule entry and need to add on space for instructions before we can call obj.func
+      sch.k <<- c(sch.k, rep(-1L,50))
+      sch.i <<- c(sch.i, rep(-1L,50))
+      nrow.sch <<- length(sch.k)
+      ins <- which.max(sch.k < 0)
+    }
+
+    sch.k[ins] <<- k
+
+    if( (l.d-1) <= num.available.ckpts ){ # We know what the solution is to this problem
+      cost <- d[1] + SolveSchedule(d[-1],
+                                   indicies[-1],
+                                   num.available.ckpts - 1)
+      sch.i[ins] <<- indicies[1]
+
+    }else{
+
+      if(exact){
+        tol <- 1e-4
+      }else{
+        tol <- l.d / 20 * 0.4
+      }
+
+      ans <- optimize(obj_func,
+                      lower = indicies[1], # NOTE THIS WILL NEED TO BE CHANGED BACK TO LOWER IF ABOVE UNCOMMENTED
+                      upper=tail(indicies,1),
+                      ins = ins,
+                      d = d,
+                      indicies = indicies,
+                      num.available.ckpts = num.available.ckpts,
+                      tol = tol)
+      # choose tol so that we will get the exact solution when we have 20 or fewer locations to choose from
+      # slight approximation for larger sequences, but those being a bit off shouldn't be as critical to having the low level
+      # solutions all correct.
+
+      cost <- ans$objective
+      sch.i[ins] <<- as.integer(floor(ans$minimum))
+    }
+
+    cost
+  }
+
+  SolveSchedule
+}
+
+# Read the schedule recorded by a general solver closure and return it as a
+# data.frame with columns k and i, terminated by a (0, 0) row.
+trim.sch <- function(f){
+  sch.k <- get("sch.k",envir = environment(f))
+  sch.i <- get("sch.i",envir = environment(f))
+
+  # prune
+  if(any(sch.k == -1)){
+    upper.limit <- which.max(sch.k == -1) - 1
+    sch.k <- sch.k[1:upper.limit]
+    sch.i <- sch.i[1:upper.limit]
+  }
+
+  # create dataframe schedule
+  data.frame("k" = c(sch.k,0L), "i" = c(sch.i,0L))
+}
+
+
+# Some code for file backed checkpointing:
+
+# X2 <- list(x=X)
+# system.time(fst:::fststore(normalizePath("~/test.fst", mustWork = FALSE), X2,
+#                            as.integer(50), TRUE))
+#
+# library(fst)
+# X.res <- matrix(0, 10000, 40000)
+#
+# # from and to index rows from massive 1 column data.frame
+# # split it up to read in 10 to 20 blocks of columns
+# # check if assignment to column is triggering a copy.
+# +# Read to a block and then use a C function like the substitution one I have +# to write in C from a vector to fwd$alpha +# +# system.time({ +# for(i in 1:100) { +# X.res[,i] <- read_fst("~/test.fst", from=(i-1)*10000+1, to=i*10000) +# } +# }) +# + + + diff --git a/R/TableCache.R b/R/TableCache.R new file mode 100644 index 0000000..e6a3428 --- /dev/null +++ b/R/TableCache.R @@ -0,0 +1,427 @@ +#' Create cache for forward tables +#' +#' Create an in-memory cache for forward tables to improve efficiency when +#' iterating in reverse along the haplotype sequences. +#' +#' If the objective is to run the Li and Stephens hidden Markov model both +#' forwards and backwards to the same locus (and to do so for every possible +#' locus), then considerable efficiency can be achieved by first performing a +#' full scan forwards, filling a geometrically spaced cache whilst doing so. +#' Then, by working backwards, the backward propagation moves one locus at a +#' time and the forward propagation can move backwards by moving forward from a +#' recently cached local table. +#' +#' Memory for a cache can be allocated using this function and should then be +#' filled with \code{\link{FillTableCache}}. +#' To use the cache, then instead of using the \code{\link{Forward}} function, +#' use \code{\link{ForwardUsingTableCache}}. +#' +#' @param pars a \code{kalisParameters} object, as returned by +#' \code{Parameters}. +#' @param size the maximum amount of RAM (in GB) to devote to this cache. +#' @param from_recipient first recipient haplotype if creating a partial forward +#' table cache. By default includes from the first recipient haplotype. +#' @param to_recipient last recipient haplotype if creating a partial forward +#' table cache. By default includes to the last recipient haplotype. +#' +#' @return +#' A list of forward tables representing a cache and ready to be filled is +#' returned. 
+#' +#' @seealso +#' \code{\link{MakeForwardTable}} to make a forward table; +#' \code{\link{FillTableCache}} to fill a cache; +#' \code{\link{ForwardUsingTableCache}} to use a cache; +#' \code{\link{Forward}} for forward function without using a cache. +#' +#' @examples +#' \dontrun{ +#' # This code assumes you have already: +#' # i) cached the haplotypes using CacheHaplotypes function +#' # ii) setup parameters in a variable called pars +#' # iii) set the number of loci in a variable called L +#' +#' # Allocate up to 10GB to a cache, with parameters already setup in pars ... +#' cache <- CreateForwardTableCache(pars, 10) +#' # ... and fill it +#' FillTableCache(cache, pars, nthreads = 8) +#' +#' # Create forward and backward tables +#' fwd <- MakeForwardTable(pars) +#' bck <- MakeBackwardTable(pars) +#' +#' # Then reach every locus faster by iterating backwards, using the cache to +#' # move the forward table into position faster +#' for(l in L:1) { +#' Backward(bck, pars, l, nthreads = 8) +#' ForwardUsingTableCache(fwd, pars, cache, l, nthreads = 8) +#' # Do whatever work is required at +#' # every locus here using fwd and bck +#' } +#' } +#' +#' @export +CreateForwardTableCache <- function(pars, size = 1, from_recipient = 1, to_recipient = Inf, max.tables = 0) { + if(!("kalisParameters" %in% class(pars))) { + stop("The pars argument is not a valid parameters object.") + } + + N <- get("N", envir = pkgVars) + L <- get("L", envir = pkgVars) + + if(anyNA(N)) { + stop("No haplotypes cached ... 
cannot determine table size until cache is loaded with CacheAllHaplotypes().") + } + + if(from_recipient>to_recipient) { + stop("from_recipient must be smaller than to_recipient.") + } + if(from_recipient < 1) { + from_recipient <- 1 + } + if(to_recipient > N) { + to_recipient <- N + } + delN <- to_recipient-from_recipient+1 + if(!is.vector(max.tables) || !is.numeric(max.tables) || length(max.tables) != 1 || max.tables < 0) { + stop("max.tables must be a positive scalar.") + } + + cat("Found", N, "haplotypes in the cache.") + if((delN*N+2*delN+1)*8/1e9 > size) { + stop(size, "GB is not big enough for even 1 table.") + } + cat(" Constructing table cache of appropriate size ...\n") + + if(max.tables == 0) { + max.tables <- floor(log2(L)) + } + cache <- list() + i <- 1 + while((length(cache) == 0 || ((utils::object.size(cache)*(length(cache)+1))/length(cache))/1e9 < size) && length(cache)