diff --git a/NAMESPACE b/NAMESPACE index 1d8affef..65a749df 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -41,6 +41,7 @@ export(epi_cor) export(epi_slide) export(epix_as_of) export(epix_merge) +export(epix_rbind) export(epix_slide) export(epix_truncate_versions_after) export(filter) @@ -70,6 +71,7 @@ importFrom(data.table,set) importFrom(data.table,setkeyv) importFrom(dplyr,arrange) importFrom(dplyr,bind_rows) +importFrom(dplyr,distinct) importFrom(dplyr,dplyr_col_modify) importFrom(dplyr,dplyr_reconstruct) importFrom(dplyr,dplyr_row_slice) @@ -79,16 +81,22 @@ importFrom(dplyr,group_by_drop_default) importFrom(dplyr,group_modify) importFrom(dplyr,group_vars) importFrom(dplyr,groups) +importFrom(dplyr,intersect) importFrom(dplyr,mutate) importFrom(dplyr,relocate) importFrom(dplyr,rename) importFrom(dplyr,select) +importFrom(dplyr,setdiff) importFrom(dplyr,slice) importFrom(dplyr,ungroup) +importFrom(dplyr,union) importFrom(lubridate,days) importFrom(lubridate,weeks) importFrom(magrittr,"%>%") +importFrom(purrr,map) importFrom(purrr,map_lgl) +importFrom(purrr,map_vec) +importFrom(purrr,reduce) importFrom(rlang,"!!!") importFrom(rlang,"!!") importFrom(rlang,.data) @@ -117,6 +125,7 @@ importFrom(rlang,syms) importFrom(stats,cor) importFrom(stats,median) importFrom(tibble,as_tibble) +importFrom(tidyr,fill) importFrom(tidyr,unnest) importFrom(tidyselect,eval_select) importFrom(tidyselect,starts_with) diff --git a/R/methods-epi_archive.R b/R/methods-epi_archive.R index c110555c..77db1117 100644 --- a/R/methods-epi_archive.R +++ b/R/methods-epi_archive.R @@ -123,19 +123,11 @@ epix_fill_through_version = function(x, fill_versions_end, #' @param x,y Two `epi_archive` objects to join together. #' @param sync Optional; `"forbid"`, `"na"`, `"locf"`, or `"truncate"`; in the #' case that `x$versions_end` doesn't match `y$versions_end`, what do we do?: -#' `"forbid"`: emit an error; "na": use `max(x$versions_end, y$versions_end)` -#' as the result's `versions_end`, but ensure that, if we request a snapshot -#' as of a version after `min(x$versions_end, y$versions_end)`, the -#' observation columns from the less up-to-date archive will be all NAs (i.e., -#' imagine there was an update immediately after its `versions_end` which -#' revised all observations to be `NA`); `"locf"`: use `max(x$versions_end, -#' y$versions_end)` as the result's `versions_end`, allowing the last version -#' of each observation to be carried forward to extrapolate unavailable -#' versions for the less up-to-date input archive (i.e., imagining that in the -#' less up-to-date archive's data set remained unchanged between its actual -#' `versions_end` and the other archive's `versions_end`); or `"truncate"`: -#' use `min(x$versions_end, y$versions_end)` as the result's `versions_end`, -#' and discard any rows containing update rows for later versions. +#' +#' - `"forbid"`: emit an error; +#' - "na": use `max(x$versions_end, y$versions_end)` as the result's `versions_end`, but ensure that, if we request a snapshot as of a version after `min(x$versions_end, y$versions_end)`, the observation columns from the less up-to-date archive will be all NAs (i.e., imagine there was an update immediately after its `versions_end` which revised all observations to be `NA`); +#' - `"locf"`: use `max(x$versions_end, y$versions_end)` as the result's `versions_end`, allowing the last version of each observation to be carried forward to extrapolate unavailable versions for the less up-to-date input archive (i.e., imagining that in the less up-to-date archive's data set remained unchanged between its actual `versions_end` and the other archive's `versions_end`); or +#' - `"truncate"`: use `min(x$versions_end, y$versions_end)` as the result's `versions_end`, and discard any rows containing update rows for later versions. #' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be #' compactified? See [`as_epi_archive`] for an explanation of what this means. #' Default here is `TRUE`. @@ -360,6 +352,214 @@ epix_merge = function(x, y, )) } +#' combine epi_archives by rows +#' +#' Take a sequence of archives and combine by rows. Complications arise if +#' there are `time_value`s shared between the lists. `sync` determines how +#' any later `NA`'s are treated, with the default `"forbid"` throwing an error, +#' `"na"` treating them as intentional data (no modification), and `"locf"` +#' filling forward across versions. +#' Shared keys are another problem; by default, `force_distinct=FALSE`, meaning +#' the entry in the earlier archive overwrites later archives. Otherwise there +#' is an error on shared keys +#' this function is still under active development, so there may be remaining +#' edge cases +#' +#' @param ... list of `epi_archive` objects to append in order. +#' @param sync Optional; `"forbid"`, `"na"`, or `"locf"`; in the case that later +#' versions contain `NA`'s, what do we do? +#' - `"forbid"`: emit an error if there are any shared time values between +#' different archives; +#' - `"na"`: All `NA` values are treated as actual data, and thus are +#' maintained (up to archival compression). +#' - `"locf"`: for every shared time value, use earlier versions of +#' earlier archives to overwrite any `NA`'s found in later +#' versions of later archives. +#' @param force_distinct Optional; `TRUE`, `FALSE`, or `NULL`; should the keys +#' be forced to be distinct? +#' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be +#' compactified? See [`as_epi_archive`] for an explanation of what this means. +#' Default here is `TRUE`. +#' @return the resulting `epi_archive` +#' +#' @details In all cases, `additional_metadata` will be an empty list, and +#' `clobberable_versions_start` will be set to the latest version that could +#' be clobbered in either input archive. +#' +#' @examples +#' # create two example epi_archive datasets where using rbind alone would +#' # work incorrectly +#' x1 <- archive_cases_dv_subset$DT %>% +#' dplyr::select(geo_value,time_value,version,case_rate_7d_av) %>% +#' filter(time_value < "2021-06-01") %>% +#' as_epi_archive(compactify = TRUE) +#' x2 <- archive_cases_dv_subset$DT %>% +#' dplyr::select(geo_value,time_value,version,case_rate_7d_av) %>% +#' filter(time_value >= "2021-06-01") %>% +#' as_epi_archive(compactify = TRUE) +#' y1 <- archive_cases_dv_subset$DT %>% +#' dplyr::select(geo_value, time_value, version, percent_cli) %>% +#' filter(time_value < "2021-06-01") %>% +#' as_epi_archive(compactify = TRUE) +#' y2 <- archive_cases_dv_subset$DT %>% +#' dplyr::select(geo_value, time_value, version, percent_cli) %>% +#' filter(time_value >= "2021-06-01") %>% +#' as_epi_archive(compactify = TRUE) +#' # the problematic examples +#' first_merge <- epix_merge(x1, y1) +#' second_merge <- epix_merge(x2, y2) +#' # rebind the results together +#' epix_rbind(first_merge, second_merge) +#' +#' @importFrom data.table key set setkeyv +#' @importFrom purrr map map_vec reduce +#' @importFrom dplyr setdiff union intersect group_by ungroup distinct arrange +#' @importFrom tidyr fill +#' @export +epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = FALSE, compactify = TRUE) { + # things not currently supported that may be warranted: + # 1. extra keys beyond the default ones + # 2. treating different columns differently (leaving some na's, locfing others) + # 3. there are various parameters for rbind that we haven't defined here; some of them may actually be applicable + archives <- list(...) + if (any(map_vec(archives, function(x) { + !inherits(x, "epi_archive") + }))) { + Abort("all must be of class `epi_archive`.") + } + + sync <- rlang::arg_match(sync) + + geo_type <- archives[[1]]$geo_type + if (any(map_vec(archives, function(x) { + !identical(x$geo_type, geo_type) + }))) { + Abort("all must have the same `$geo_type`") + } + + time_type <- archives[[1]]$time_type + if (any(map_vec(archives, function(x) { + !identical(x$time_type, time_type) + }))) { + Abort("all must have the same `$time_type`") + } + + for (x in archives) { + if (length(x$additional_metadata) != 0L) { + Warn("x$additional_metadata won't appear in merge result", + class = "epiprocess__epix_rbind_ignores_additional_metadata" + ) + } + } + result_additional_metadata <- list() + + clobberable_versions_start <- map_vec(archives, function(x) { + (x$clobberable_versions_start) + }) + + versions_end <- max((map_vec(archives, "versions_end"))) + + result_clobberable_versions_start <- if (all(is.na(clobberable_versions_start))) { + NA # (any type of NA is fine here) + } else { + max(clobberable_versions_start) # unlike the case of merging, we want the later date + } + # plans: + # 1. isolate the shared from the non-shared + # 2. throw everything together, do a group_by and ffill + + DTs <- map(archives, "DT") + + # check the keys are correct as done in epix_merge + keys <- map(DTs, key) + new_key <- keys[[1]] + for (ii in seq_along(DTs)) { + if (!identical(keys[[ii]], new_key)) { + # Without some sort of annotations of what various columns represent, we can't + # do something that makes sense when rbinding archives with mismatched keys. + # E.g., even if we assume extra keys represent demographic breakdowns, a + # sensible default treatment of count-type and rate-type value columns would + # differ. + if (!identical(sort(key(DTs[[ii]])), sort(new_key))) { + Abort(" + The archives must have the same set of key column names; if the + key columns represent the same things, just with different + names, please retry after manually renaming to match; if they + represent different things (e.g., x has an age breakdown + but y does not), please retry after processing them to share + the same key (e.g., by summarizing x to remove the age breakdown, + or by applying a static age breakdown to y). + ", class = "epiprocess__epix_rbind_input_must_have_same_key_set") + } + } + } + + non_by_colnames <- reduce(map(DTs, function(DT) setdiff(names(DT), new_key)), union) + # find the shared (geo_values, time_values) which requires: + # first define a function to get the unique pairs in a given archive + unique_geo_times <- function(x) { + x %>% + select(geo_value, time_value) %>% + distinct() + } + + other_keys <- dplyr::setdiff(new_key, c("geo_value", "time_value", "version")) + if (length(other_keys) != 0) { + Abort("epix_rbind does not currently support additional keys", + class = "epiprocess__epix_rbind_unsupported" + ) + } + + shared_geo_time_values <- reduce(map(DTs, unique_geo_times), intersect) + # there are no difference between the methods if there's no overlap + if (nrow(shared_geo_time_values) == 0) { + DT <- reduce(DTs, rbind) + if (force_distinct) { + DT <- distinct(DT, geo_value, time_value, version, .keep_all = TRUE) + } + } else if (sync == "forbid") { + Abort(paste( + "There are shared time values with different versions;", + "either deal with those separately, or specify how to", + "handle `NA` values (either `NA` or `locf`)." + ), "epiprocess__epix_rbind_unresolved_sync") + } else if (sync == "na") { + # doesn't really care if there are repeated time_values, simply: + # binds the results together + DT <- reduce(DTs, rbind) + # remove any redundant keys + if (force_distinct) { + DT <- distinct(DT, geo_value, time_value, version, .keep_all = TRUE) + } + # and return an archive (which sorts) + } else if (sync == "locf") { + # filter, creating shared and non shared or + # just do forward fill on all times + DT <- reduce(DTs, rbind) + + if (force_distinct) { + DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE) + } + DT <- DT %>% + group_by(geo_value, time_value) %>% + arrange(geo_value, time_value, version) %>% + fill(!!!non_by_colnames, .direction = "downup") %>% # everything not in the keys + ungroup() + } + + return(as_epi_archive(DT[], + geo_type = geo_type, + time_type = time_type, + other_keys = other_keys, + additional_metadata = list(), + compactify = compactify, + clobberable_versions_start = result_clobberable_versions_start, + versions_end = versions_end + )) +} + + + # Helpers for `group_by`: #' Make non-testing mock to get [`dplyr::dplyr_col_modify`] input diff --git a/man/epix_merge.Rd b/man/epix_merge.Rd index 09f67fa2..566aed09 100644 --- a/man/epix_merge.Rd +++ b/man/epix_merge.Rd @@ -16,18 +16,12 @@ epix_merge( \item{sync}{Optional; \code{"forbid"}, \code{"na"}, \code{"locf"}, or \code{"truncate"}; in the case that \code{x$versions_end} doesn't match \code{y$versions_end}, what do we do?: -\code{"forbid"}: emit an error; "na": use \code{max(x$versions_end, y$versions_end)} -as the result's \code{versions_end}, but ensure that, if we request a snapshot -as of a version after \code{min(x$versions_end, y$versions_end)}, the -observation columns from the less up-to-date archive will be all NAs (i.e., -imagine there was an update immediately after its \code{versions_end} which -revised all observations to be \code{NA}); \code{"locf"}: use \code{max(x$versions_end, y$versions_end)} as the result's \code{versions_end}, allowing the last version -of each observation to be carried forward to extrapolate unavailable -versions for the less up-to-date input archive (i.e., imagining that in the -less up-to-date archive's data set remained unchanged between its actual -\code{versions_end} and the other archive's \code{versions_end}); or \code{"truncate"}: -use \code{min(x$versions_end, y$versions_end)} as the result's \code{versions_end}, -and discard any rows containing update rows for later versions.} +\itemize{ +\item \code{"forbid"}: emit an error; +\item "na": use \code{max(x$versions_end, y$versions_end)} as the result's \code{versions_end}, but ensure that, if we request a snapshot as of a version after \code{min(x$versions_end, y$versions_end)}, the observation columns from the less up-to-date archive will be all NAs (i.e., imagine there was an update immediately after its \code{versions_end} which revised all observations to be \code{NA}); +\item \code{"locf"}: use \code{max(x$versions_end, y$versions_end)} as the result's \code{versions_end}, allowing the last version of each observation to be carried forward to extrapolate unavailable versions for the less up-to-date input archive (i.e., imagining that in the less up-to-date archive's data set remained unchanged between its actual \code{versions_end} and the other archive's \code{versions_end}); or +\item \code{"truncate"}: use \code{min(x$versions_end, y$versions_end)} as the result's \code{versions_end}, and discard any rows containing update rows for later versions. +}} \item{compactify}{Optional; \code{TRUE}, \code{FALSE}, or \code{NULL}; should the result be compactified? See \code{\link{as_epi_archive}} for an explanation of what this means. diff --git a/man/epix_rbind.Rd b/man/epix_rbind.Rd new file mode 100644 index 00000000..422d5792 --- /dev/null +++ b/man/epix_rbind.Rd @@ -0,0 +1,81 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/methods-epi_archive.R +\name{epix_rbind} +\alias{epix_rbind} +\title{combine epi_archives by rows} +\usage{ +epix_rbind( + ..., + sync = c("forbid", "na", "locf"), + force_distinct = FALSE, + compactify = TRUE +) +} +\arguments{ +\item{...}{list of \code{epi_archive} objects to append in order.} + +\item{sync}{Optional; \code{"forbid"}, \code{"na"}, or \code{"locf"}; in the case that later +versions contain \code{NA}'s, what do we do? +\itemize{ +\item \code{"forbid"}: emit an error if there are any shared time values between +different archives; +\item \code{"na"}: All \code{NA} values are treated as actual data, and thus are +maintained (up to archival compression). +\item \code{"locf"}: for every shared time value, use earlier versions of +earlier archives to overwrite any \code{NA}'s found in later +versions of later archives. +}} + +\item{force_distinct}{Optional; \code{TRUE}, \code{FALSE}, or \code{NULL}; should the keys +be forced to be distinct?} + +\item{compactify}{Optional; \code{TRUE}, \code{FALSE}, or \code{NULL}; should the result be +compactified? See \code{\link{as_epi_archive}} for an explanation of what this means. +Default here is \code{TRUE}.} +} +\value{ +the resulting \code{epi_archive} +} +\description{ +Take a sequence of archives and combine by rows. Complications arise if +there are \code{time_value}s shared between the lists. \code{sync} determines how +any later \code{NA}'s are treated, with the default \code{"forbid"} throwing an error, +\code{"na"} treating them as intentional data (no modification), and \code{"locf"} +filling forward across versions. +Shared keys are another problem; by default, \code{force_distinct=FALSE}, meaning +the entry in the earlier archive overwrites later archives. Otherwise there +is an error on shared keys +this function is still under active development, so there may be remaining +edge cases +} +\details{ +In all cases, \code{additional_metadata} will be an empty list, and +\code{clobberable_versions_start} will be set to the latest version that could +be clobbered in either input archive. +} +\examples{ +# create two example epi_archive datasets where using rbind alone would +# work incorrectly +x1 <- archive_cases_dv_subset$DT \%>\% + dplyr::select(geo_value,time_value,version,case_rate_7d_av) \%>\% + filter(time_value < "2021-06-01") \%>\% + as_epi_archive(compactify = TRUE) +x2 <- archive_cases_dv_subset$DT \%>\% + dplyr::select(geo_value,time_value,version,case_rate_7d_av) \%>\% + filter(time_value >= "2021-06-01") \%>\% + as_epi_archive(compactify = TRUE) +y1 <- archive_cases_dv_subset$DT \%>\% + dplyr::select(geo_value, time_value, version, percent_cli) \%>\% + filter(time_value < "2021-06-01") \%>\% + as_epi_archive(compactify = TRUE) +y2 <- archive_cases_dv_subset$DT \%>\% + dplyr::select(geo_value, time_value, version, percent_cli) \%>\% + filter(time_value >= "2021-06-01") \%>\% + as_epi_archive(compactify = TRUE) +# the problematic examples +first_merge <- epix_merge(x1, y1) +second_merge <- epix_merge(x2, y2) +# rebind the results together +epix_rbind(first_merge, second_merge) + +} diff --git a/tests/testthat/test-epix_rbind.R b/tests/testthat/test-epix_rbind.R new file mode 100644 index 00000000..1a745e90 --- /dev/null +++ b/tests/testthat/test-epix_rbind.R @@ -0,0 +1,83 @@ +test_that("epix_rbind merges and carries forward updates properly", { + x1 <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~x_value, + # X has initial versions defined + "g1", 1L, 1:3, paste0("XA", 1:3), + ) %>% + tidyr::unchop(c(version, x_value)) %>% + dplyr::mutate(dplyr::across(c(x_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + x2 <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~x_value, + # X has later versions defined as well + "g1", 1L, 4:6, paste0("XA", 4:6), + "g1", 2L, 4:6, paste0("XB", 4:6), + ) %>% + tidyr::unchop(c(version, x_value)) %>% + dplyr::mutate(dplyr::across(c(x_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + y1 <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~y_value, + # y also has earlier version defined + "g1", 1L, 1:3, paste0("YA", 1:3), + ) %>% + tidyr::unchop(c(version, y_value)) %>% + dplyr::mutate(dplyr::across(c(y_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + y2 <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~y_value, + # but y's data is "correct", and gets no more updates + "g1", 2L, 4:6, paste0("YB", 4:6), + ) %>% + tidyr::unchop(c(version, y_value)) %>% + dplyr::mutate(dplyr::across(c(y_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + first_merge <- epix_merge(x1, y1) + second_merge <- epix_merge(x2, y2) + # We rely on testthat edition 3 expect_identical using waldo, not identical. See + # test-epix_fill_through_version.R comments for details. + testthat::local_edition(3) + # throw an error without a setting chosen when there's conflicts + expect_error(epix_rbind(first_merge, second_merge), class = "epiprocess__epix_rbind_unresolved_sync") + + # the sync = "na" case + canonical_no_overwrite <- as_epi_archive( + data.table::as.data.table( + tibble::tribble( + ~geo_value, ~time_value, ~version, ~x_value, ~y_value, + # X has initial versions defined + "g1", 1L, 1:3, paste0("XA", 1:3), paste0("YA", 1:3), + "g1", 1L, 4:6, paste0("XA", 4:6), NA, + "g1", 2L, 4:6, paste0("XB", 4:6), paste0("YB", 4:6), + ) %>% + tidyr::unchop(c(version, x_value, y_value)) %>% + dplyr::mutate(dplyr::across(c(x_value), ~ dplyr::if_else(grepl("NA", .x), NA_character_, .x))) + ) + ) + # produce exactly the above result when NA's are unmodified + rbinded_no_overwrite <- epix_rbind(first_merge, second_merge, sync = "na") + expect_identical(rbinded_no_overwrite, canonical_no_overwrite) + + # filling forward is equivalent to doing columns rbinds first, and then merging + x <- as_epi_archive(rbind(x1$DT, x2$DT)) + y <- as_epi_archive(rbind(y1$DT, y2$DT)) + canonical_locf <- epix_merge(x, y, sync = "locf") + rbinded_locf <- epix_rbind(second_merge, first_merge, sync = "locf") + expect_identical(canonical_locf$DT, rbinded_locf$DT) + # y should be the same as epix_rbind(y1,y2), since each has distinct dates + expect_identical(epix_rbind(y1,y2, sync = "forbid"), y) + expect_identical(epix_rbind(y1,y2, sync = "na"), y) + expect_identical(epix_rbind(y1,y2, sync = "locf"), y) +})