diff --git a/.gitignore b/.gitignore index 5b5108ce..a9a43334 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,7 @@ reports/report.md cache/ data/ .vscode/ +.envrc +.nhsn_covid_cache.parquet +.nhsn_flu_cache.parquet +meta/ diff --git a/.renvignore b/.renvignore index de5b7c4a..9a19dca7 100644 --- a/.renvignore +++ b/.renvignore @@ -1,2 +1,3 @@ tmp.R node_modules/* +local_logs/ \ No newline at end of file diff --git a/Makefile b/Makefile index b48f60a7..cbd2cefa 100644 --- a/Makefile +++ b/Makefile @@ -63,9 +63,10 @@ submit: submit-covid submit-flu get_nwss: mkdir -p aux_data/nwss_covid_data; \ mkdir -p aux_data/nwss_flu_data; \ + . .venv/bin/activate; \ cd scripts/nwss_export_tool/; \ python nwss_covid_export.py; \ - python nwss_covid_export.py + python nwss_influenza_export.py run-nohup: nohup Rscript scripts/run.R & @@ -94,3 +95,9 @@ update_site: netlify: netlify deploy --dir=reports --prod + +get_flu_prod_errors: + Rscript -e "suppressPackageStartupMessages(source(here::here('R', 'load_all.R'))); get_targets_errors(project = 'flu_hosp_prod')" + +get_covid_prod_errors: + Rscript -e "suppressPackageStartupMessages(source(here::here('R', 'load_all.R'))); get_targets_errors(project = 'covid_hosp_prod')" diff --git a/R/aux_data_utils.R b/R/aux_data_utils.R index 837fa296..cfaa8310 100644 --- a/R/aux_data_utils.R +++ b/R/aux_data_utils.R @@ -188,38 +188,76 @@ daily_to_weekly <- function(epi_df, agg_method = c("sum", "mean"), day_of_week = select(-epiweek, -year) } +#' Aggregate a daily archive to a weekly archive. +#' +#' @param epi_arch the archive to aggregate. +#' @param agg_columns the columns to aggregate. +#' @param agg_method the method to use to aggregate the data, one of "sum" or "mean". +#' @param day_of_week the day of the week to use as the reference day. +#' @param day_of_week_end the day of the week to use as the end of the week. daily_to_weekly_archive <- function(epi_arch, agg_columns, agg_method = c("sum", "mean"), day_of_week = 4L, day_of_week_end = 7L) { + # How to aggregate the windowed data. agg_method <- arg_match(agg_method) + # The columns we will later group by when aggregating. keys <- key_colnames(epi_arch, exclude = c("time_value", "version")) + # The versions we will slide over. ref_time_values <- epi_arch$DT$version %>% unique() %>% sort() + # Choose a fast function to use to slide and aggregate. if (agg_method == "sum") { slide_fun <- epi_slide_sum } else if (agg_method == "mean") { slide_fun <- epi_slide_mean } - too_many_tibbles <- epix_slide( + # Slide over the versions and aggregate. + epix_slide( epi_arch, - .before = 99999999L, .versions = ref_time_values, - function(x, group, ref_time) { - ref_time_last_week_end <- - floor_date(ref_time, "week", day_of_week_end - 1) # this is over by 1 + function(x, group_keys, ref_time) { + # The last day of the week we will slide over. + ref_time_last_week_end <- floor_date(ref_time, "week", day_of_week_end - 1) + + # To find the days we will slide over, we need to find the first and last + # complete weeks of data. Get the max and min times, and then find the + # first and last complete weeks of data. + min_time <- min(x$time_value) max_time <- max(x$time_value) - valid_slide_days <- seq.Date( - from = ceiling_date(min(x$time_value), "week", week_start = day_of_week_end - 1), - to = floor_date(max(x$time_value), "week", week_start = day_of_week_end - 1), - by = 7L - ) + + # Let's determine if the min and max times are in the same week. 
+ ceil_min_time <- ceiling_date(min_time, "week", week_start = day_of_week_end - 1) + ceil_max_time <- ceiling_date(max_time, "week", week_start = day_of_week_end - 1) + + # If they're not in the same week, this means we have at least one + # complete week of data to slide over. + if (ceil_min_time < ceil_max_time) { + valid_slide_days <- seq.Date( + from = ceiling_date(min_time, "week", week_start = day_of_week_end - 1), + to = floor_date(max_time, "week", week_start = day_of_week_end - 1), + by = 7L + ) + } else { + # This is the degenerate case, where we have about 1 week or less of + # data. In this case, we opt to return nothing for two reasons: + # 1. in most cases here, the data is incomplete for a single week, + # 2. if the data is complete, a single week of data is not enough to + # reasonably perform any kind of aggregation. + return(tibble()) + } + + # If the last day of the week is not the end of the week, add it to the + # list of valid slide days (this will produce an incomplete slide, but + # that's fine for us, since it should only be 1 day, historically.) if (wday(max_time) != day_of_week_end) { valid_slide_days <- c(valid_slide_days, max_time) } - slid_result <- x %>% + + # Slide over the days and aggregate. + x %>% group_by(across(all_of(keys))) %>% slide_fun( agg_columns, @@ -229,18 +267,13 @@ daily_to_weekly_archive <- function(epi_arch, ) %>% select(-all_of(agg_columns)) %>% rename_with(~ gsub("slide_value_", "", .x)) %>% - # only keep 1/week - # group_by week, keep the largest in each week - # alternatively - # switch time_value to the designated day of the week + rename_with(~ gsub("_7dsum", "", .x)) %>% + # Round all dates to reference day of the week. These will get + # de-duplicated by compactify in as_epi_archive below. mutate(time_value = round_date(time_value, "week", day_of_week - 1)) %>% as_tibble() } - ) - too_many_tibbles %>% - pull(time_value) %>% - max() - too_many_tibbles %>% + ) %>% as_epi_archive(compactify = TRUE) } @@ -313,9 +346,8 @@ get_health_data <- function(as_of, disease = c("covid", "flu")) { most_recent_row <- meta_data %>% # update_date is actually a time, so we need to filter for the day after. - filter(update_date <= as_of + 1) %>% - arrange(desc(update_date)) %>% - slice(1) + filter(update_date <= as.Date(as_of) + 1) %>% + slice_max(update_date) if (nrow(most_recent_row) == 0) { cli::cli_abort("No data available for the given date.") @@ -331,9 +363,7 @@ get_health_data <- function(as_of, disease = c("covid", "flu")) { if (disease == "covid") { data %<>% mutate( hhs = previous_day_admission_adult_covid_confirmed + - previous_day_admission_adult_covid_suspected + - previous_day_admission_pediatric_covid_confirmed + - previous_day_admission_pediatric_covid_suspected + previous_day_admission_pediatric_covid_confirmed ) } else if (disease == "flu") { data %<>% mutate(hhs = previous_day_admission_influenza_confirmed) @@ -594,15 +624,16 @@ gen_ili_data <- function(default_day_of_week = 1) { as_epi_archive(compactify = TRUE) } +#' Process Raw NHSN Data +#' +#' Turns the raw NHSN data into a tidy format with the following columns: +#' - geo_value: the jurisdiction of the data +#' - disease: the disease of the data +#' - time_value: the date of the data +#' - version: the version of the data +#' - value: the value of the data +#' process_nhsn_data <- function(raw_nhsn_data) { - # These are exception dates when the data was available on a different day - # than usual. In these two cases, it was the Thursday after. 
But to keep - # the rest of the pipeline the same, we pretend it was available on Wednesday. - remap_exceptions <- list( - "2024-12-26" = "2024-12-25", - "2025-01-02" = "2025-01-01" - ) - fixed_version <- remap_exceptions[[as.character(Sys.Date())]] %||% Sys.Date() raw_nhsn_data %>% mutate( geo_value = tolower(jurisdiction), @@ -614,7 +645,7 @@ process_nhsn_data <- function(raw_nhsn_data) { select(-weekendingdate, -jurisdiction, -starts_with("totalconf")) %>% pivot_longer(cols = starts_with("nhsn"), names_to = "disease") %>% filter(!is.na(value)) %>% - mutate(version = fixed_version) %>% + mutate(version = Sys.Date()) %>% relocate(geo_value, disease, time_value, version) } @@ -622,7 +653,50 @@ process_nhsn_data <- function(raw_nhsn_data) { # for filenames of the form nhsn_data_2024-11-19_16-29-43.191649.rds get_version_timestamp <- function(filename) ymd_hms(str_match(filename, "[0-9]{4}-..-.._..-..-..\\.[^.^_]*")) -#' all in one function to get and cache a nhsn archive from raw files +#' Remove duplicate files from S3 +#' +#' Removes duplicate files from S3 by keeping only the earliest timestamp file for each ETag. +#' You can modify keep_df, if this doesn't suit your needs. +#' +#' @param bucket The name of the S3 bucket. +#' @param prefix The prefix of the files to remove duplicates from. +#' @param dry_run Whether to actually delete the files. +#' @param .progress Whether to show a progress bar. +delete_duplicates_from_s3_by_etag <- function(bucket, prefix, dry_run = TRUE, .progress = TRUE) { + # Get a list of all new dataset snapshots from S3 + files_df <- aws.s3::get_bucket_df(bucket = bucket, prefix = prefix) %>% as_tibble() + + # Create a list of all the files to keep by keeping the earliest timestamp file for each ETag + keep_df <- files_df %>% + group_by(ETag) %>% + slice_min(LastModified) %>% + ungroup() + delete_df <- files_df %>% + anti_join(keep_df, by = "Key") + if (nrow(delete_df) > 0) { + if (dry_run) { + cli::cli_alert_info("Would delete {nrow(delete_df)} files from {bucket} with prefix {prefix}") + print(delete_df) + return(invisible(delete_df)) + } else { + delete_files_from_s3(bucket = bucket, keys = delete_df$Key, .progress = .progress) + } + } +} + +#' Delete files from S3 +#' +#' Faster than aws.s3::delete_object, when there are many files to delete (thousands). +#' +#' @param bucket The name of the S3 bucket. +#' @param keys The keys of the files to delete, as a character vector. +#' @param batch_size The number of files to delete in each batch. +#' @param .progress Whether to show a progress bar. 
+delete_files_from_s3 <- function(bucket, keys, batch_size = 500, .progress = TRUE) { + split(keys, ceiling(seq_along(keys) / batch_size)) %>% + purrr::walk(~aws.s3::delete_object(bucket = bucket, object = .x), .progress = .progress) +} + #' @description #' This takes in all of the raw data files for the nhsn data, creates a #' quasi-archive (it keeps one example per version-day, rather than one per @@ -709,10 +783,12 @@ create_nhsn_data_archive <- function(disease_name) { as_epi_archive(compactify = TRUE) } - up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) { disease <- arg_match(disease) - nssp_state <- pub_covidcast( + nssp_state <- retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "nssp", signal = glue::glue("pct_ed_visits_{disease}"), time_type = "week", @@ -728,3 +804,26 @@ up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) { mutate(time_value = time_value + 3) %>% as_epi_archive(compactify = TRUE) } + +# Get the last time the signal was updated. +get_covidcast_signal_last_update <- function(source, signal, geo_type) { + pub_covidcast_meta() %>% + filter(source == !!source, signal == !!signal, geo_type == !!geo_type) %>% + pull(last_update) %>% + as.POSIXct() +} + +# Get the last time the Socrata dataset was updated. +get_socrata_updated_at <- function(dataset_url) { + httr::GET(dataset_url) %>% + httr::content() %>% + pluck("rowsUpdatedAt") %>% + as.POSIXct() +} + +get_s3_object_last_modified <- function(bucket, key) { + # Format looks like "Fri, 31 Jan 2025 22:01:16 GMT" + attr(aws.s3::head_object(key, bucket = bucket), "last-modified") %>% + str_replace_all(" GMT", "") %>% + as.POSIXct(format = "%a, %d %b %Y %H:%M:%S") +} diff --git a/R/forecasters/data_transforms.R b/R/forecasters/data_transforms.R index 903f8b03..83012e2b 100644 --- a/R/forecasters/data_transforms.R +++ b/R/forecasters/data_transforms.R @@ -26,7 +26,7 @@ get_nonkey_names <- function(epi_data) { #' Get a rolling average for the named columns #' -#' Add column(s) that are the rolling means of the specified columns, as +#' Add a column that is the rolling means of the specified column, as #' implemented by slider. Defaults to the previous 7 days. Currently only #' group_by's on the geo_value. Should probably extend to more keys if you have #' them. 
@@ -40,15 +40,10 @@ get_nonkey_names <- function(epi_data) { #' @export rolling_mean <- function(epi_data, width = 7L, cols_to_mean = NULL) { cols_to_mean <- get_trainable_names(epi_data, cols_to_mean) - epi_data %<>% group_by(across(key_colnames(epi_data, exclude = "time_value"))) - for (col in cols_to_mean) { - mean_name <- paste0("slide_", col, "_m", width) - epi_data %<>% - epi_slide_mean(all_of(col), .window_size = width) %>% - rename(!!mean_name := paste0("slide_value_", col)) - } - epi_data %<>% ungroup() - return(epi_data) + epi_data %>% + group_by(across(key_colnames(epi_data, exclude = "time_value"))) %>% + epi_slide_mean(all_of(cols_to_mean), .window_size = width, .new_col_names = paste0("slide_", cols_to_mean, "_m", width)) %>% + ungroup() } #' Get a rolling standard deviation for the named columns @@ -75,20 +70,18 @@ rolling_sd <- function(epi_data, sd_width = 29L, mean_width = NULL, cols_to_sd = cols_to_sd <- get_trainable_names(epi_data, cols_to_sd) result <- epi_data result %<>% group_by(across(key_colnames(epi_data, exclude = "time_value"))) - for (col in cols_to_sd) { - mean_name <- glue::glue("slide_{col}_m{mean_width}") - sd_name <- glue::glue("slide_{col}_sd{sd_width}") + for (col_name in cols_to_sd) { + mean_name <- glue::glue("slide_{col_name}_m{mean_width}") + sd_name <- glue::glue("slide_{col_name}_sd{sd_width}") result %<>% - epi_slide_mean(all_of(col), .window_size = mean_width) %>% - rename(!!mean_name := paste0("slide_value_", col)) + epi_slide_mean(all_of(col_name), .window_size = mean_width, .new_col_names = mean_name) result %<>% - mutate(.temp = (.data[[mean_name]] - .data[[col]])^2) %>% - epi_slide_mean(all_of(".temp"), .window_size = sd_width) %>% - select(-.temp) %>% - rename(!!sd_name := "slide_value_.temp") %>% - mutate(!!sd_name := sqrt(.data[[sd_name]])) + mutate(.temp = (.data[[mean_name]] - .data[[col_name]])^2) %>% + epi_slide_mean(all_of(".temp"), .window_size = sd_width, .new_col_names = sd_name) %>% + mutate(!!sd_name := sqrt(.data[[sd_name]])) %>% + select(-.temp) if (!keep_mean) { result %<>% select(-{{ mean_name }}) diff --git a/R/forecasters/ensemble_average.R b/R/forecasters/ensemble_average.R index 03449d32..81a79cd8 100644 --- a/R/forecasters/ensemble_average.R +++ b/R/forecasters/ensemble_average.R @@ -32,12 +32,12 @@ ensemble_average <- function(epi_data, # their names are separated for obscure target related reasons names(ensemble_args) <- ensemble_args_names %||% names(ensemble_args) average_type <- ensemble_args$average_type %||% median - join_columns <- ensemble_args$join_columns %||% setdiff(names(forecasts[[1]]), "prediction") + join_columns <- ensemble_args$join_columns %||% setdiff(names(forecasts[[1]]), "value") # begin actual analysis forecasts %>% bind_rows(.id = "id") %>% group_by(across(all_of(join_columns))) %>% - summarize(prediction = average_type(prediction, na.rm = TRUE), .groups = "drop") %>% + summarize(value = average_type(value, na.rm = TRUE), .groups = "drop") %>% ungroup() } diff --git a/R/forecasters/ensemble_linear_climate.R b/R/forecasters/ensemble_linear_climate.R index 8ce5c138..1113f12a 100644 --- a/R/forecasters/ensemble_linear_climate.R +++ b/R/forecasters/ensemble_linear_climate.R @@ -128,3 +128,40 @@ make_ahead_weights <- function(aheads, tibble(forecast_family = "linear", ahead = sort(aheads), weight = 1 - ahead_weight_values) ) } + +ensemble_weighted <- function(forecasts, other_weights) { + forecasters <- unique(forecasts$forecaster) + filtered_weights <- other_weights %>% + filter(forecaster %in% 
forecasters) %>% + inner_join( + forecasts %>% distinct(forecaster, geo_value), + by = c("forecaster", "geo_value"), + ) + full_weights <- filtered_weights %>% + left_join( + forecasts %>% mutate(ahead = target_end_date - forecast_date) %>% distinct(forecaster, ahead), + by = "forecaster", + relationship = "many-to-many" + ) + grouping_cols <- c("geo_value", "ahead") + renorm <- + full_weights %>% + group_by(across(all_of(grouping_cols))) %>% + summarize(mass = sum(weight), .groups = "drop") + full_weights <- full_weights %>% + left_join(renorm, by = grouping_cols) %>% + mutate(weight = weight / mass) %>% + select(-mass) + weighted_forecasts <- + forecasts %>% + mutate(ahead = target_end_date - forecast_date) %>% + left_join( + full_weights, + by = c("forecaster", "forecast_date", grouping_cols) + ) %>% + mutate(value = weight * value) %>% + group_by(geo_value, forecast_date, target_end_date, quantile) %>% + summarize(value = sum(value, na.rm = TRUE), .groups = "drop") %>% + sort_by_quantile() + return(weighted_forecasts) +} diff --git a/R/forecasters/epipredict_utilities.R b/R/forecasters/epipredict_utilities.R index a41539c0..8a075fbe 100644 --- a/R/forecasters/epipredict_utilities.R +++ b/R/forecasters/epipredict_utilities.R @@ -114,6 +114,11 @@ run_workflow_and_format <- function(preproc, if (is.null(as_of)) { as_of <- max(train_data$time_value) } + + # Look at the train data (uncomment for debuggin). + # df <- preproc %>% prep(train_data) %>% bake(train_data) + # browser() + workflow <- epi_workflow(preproc, trainer) %>% fit(train_data) %>% add_frosting(postproc) @@ -125,9 +130,7 @@ run_workflow_and_format <- function(preproc, # keeping only the last time_value for any given location/key pred %<>% group_by(across(all_of(key_colnames(train_data, exclude = "time_value")))) %>% - # TODO: slice_max(time_value)? - arrange(time_value) %>% - filter(row_number() == n()) %>% + slice_max(time_value) %>% ungroup() return(format_storage(pred, as_of)) } diff --git a/R/forecasters/forecaster_baseline_linear.R b/R/forecasters/forecaster_baseline_linear.R index a185d944..13faf802 100644 --- a/R/forecasters/forecaster_baseline_linear.R +++ b/R/forecasters/forecaster_baseline_linear.R @@ -1,5 +1,6 @@ #' epi_data is expected to have: geo_value, time_value, and value columns. forecaster_baseline_linear <- function(epi_data, ahead, log = FALSE, sort = FALSE, residual_tail = 0.85, residual_center = 0.085, no_intercept = FALSE) { + epi_data <- validate_epi_data(epi_data) forecast_date <- attributes(epi_data)$metadata$as_of population_data <- get_population_data() %>% rename(geo_value = state_id) %>% diff --git a/R/forecasters/forecaster_climatological.R b/R/forecasters/forecaster_climatological.R index 25b2f5fa..47f283c4 100644 --- a/R/forecasters/forecaster_climatological.R +++ b/R/forecasters/forecaster_climatological.R @@ -16,6 +16,9 @@ climate_linear_ensembled <- function(epi_data, scale_method <- arg_match(scale_method) center_method <- arg_match(center_method) nonlin_method <- arg_match(nonlin_method) + + epi_data <- validate_epi_data(epi_data) + args_list <- list(...) ahead <- as.integer(ahead / 7) epi_data %<>% filter_extraneous(filter_source, filter_agg_level) diff --git a/R/forecasters/forecaster_flatline.R b/R/forecasters/forecaster_flatline.R index 632b96d4..0ede389d 100644 --- a/R/forecasters/forecaster_flatline.R +++ b/R/forecasters/forecaster_flatline.R @@ -17,6 +17,7 @@ flatline_fc <- function(epi_data, filter_source = "", filter_agg_level = "", ...) 
{ + epi_data <- validate_epi_data(epi_data) # perform any preprocessing not supported by epipredict epi_data %<>% filter_extraneous(filter_source, filter_agg_level) # this is a temp fix until a real fix gets put into epipredict diff --git a/R/forecasters/forecaster_flusion.R b/R/forecasters/forecaster_flusion.R index 6e811d59..0954bdb1 100644 --- a/R/forecasters/forecaster_flusion.R +++ b/R/forecasters/forecaster_flusion.R @@ -22,6 +22,9 @@ flusion <- function(epi_data, center_method <- arg_match(center_method) nonlin_method <- arg_match(nonlin_method) derivative_estimator <- arg_match(derivative_estimator) + + epi_data <- validate_epi_data(epi_data) + # perform any preprocessing not supported by epipredict args_input <- list(...) # this next part is basically unavoidable boilerplate you'll want to copy diff --git a/R/forecasters/forecaster_no_recent_outcome.R b/R/forecasters/forecaster_no_recent_outcome.R index 40046363..048cbfa6 100644 --- a/R/forecasters/forecaster_no_recent_outcome.R +++ b/R/forecasters/forecaster_no_recent_outcome.R @@ -22,6 +22,9 @@ no_recent_outcome <- function(epi_data, center_method <- arg_match(center_method) nonlin_method <- arg_match(nonlin_method) week_method <- arg_match(week_method) + + epi_data <- validate_epi_data(epi_data) + # this is for the case where there are multiple sources in the same column epi_data %<>% filter_extraneous(filter_source, filter_agg_level) args_input <- list(...) diff --git a/R/forecasters/forecaster_scaled_pop.R b/R/forecasters/forecaster_scaled_pop.R index dd3a92d8..23636922 100644 --- a/R/forecasters/forecaster_scaled_pop.R +++ b/R/forecasters/forecaster_scaled_pop.R @@ -62,6 +62,9 @@ scaled_pop <- function(epi_data, scale_method <- arg_match(scale_method) center_method <- arg_match(center_method) nonlin_method <- arg_match(nonlin_method) + + epi_data <- validate_epi_data(epi_data) + # perform any preprocessing not supported by epipredict # # this is for the case where there are multiple sources in the same column diff --git a/R/forecasters/forecaster_scaled_pop_seasonal.R b/R/forecasters/forecaster_scaled_pop_seasonal.R index 77642e6e..5f858b1c 100644 --- a/R/forecasters/forecaster_scaled_pop_seasonal.R +++ b/R/forecasters/forecaster_scaled_pop_seasonal.R @@ -56,6 +56,9 @@ scaled_pop_seasonal <- function(epi_data, scale_method <- arg_match(scale_method) center_method <- arg_match(center_method) nonlin_method <- arg_match(nonlin_method) + + epi_data <- validate_epi_data(epi_data) + if (typeof(seasonal_method) == "list") { seasonal_method <- seasonal_method[[1]] } diff --git a/R/forecasters/forecaster_smoothed_scaled.R b/R/forecasters/forecaster_smoothed_scaled.R index 650fc809..538dea9e 100644 --- a/R/forecasters/forecaster_smoothed_scaled.R +++ b/R/forecasters/forecaster_smoothed_scaled.R @@ -71,6 +71,9 @@ smoothed_scaled <- function(epi_data, scale_method <- arg_match(scale_method) center_method <- arg_match(center_method) nonlin_method <- arg_match(nonlin_method) + + epi_data <- validate_epi_data(epi_data) + # perform any preprocessing not supported by epipredict # # this is for the case where there are multiple sources in the same column @@ -170,6 +173,7 @@ smoothed_scaled <- function(epi_data, keep_mean = keep_mean ) } + # need to make a version with the non seasonal and problematic flu seasons removed if (drop_non_seasons) { season_data <- epi_data %>% drop_non_seasons() diff --git a/R/forecasters/formatters.R b/R/forecasters/formatters.R index 16f6024f..41fa6e93 100644 --- a/R/forecasters/formatters.R +++ 
b/R/forecasters/formatters.R @@ -72,6 +72,26 @@ format_flusight <- function(pred, disease = c("flu", "covid")) { select(reference_date, target, horizon, target_end_date, location, output_type, output_type_id, value) } +format_scoring_utils <- function(forecasts_and_ensembles, disease = c("flu", "covid")) { + forecasts_and_ensembles %>% + filter(!grepl("region.*", geo_value)) %>% + mutate( + reference_date = get_forecast_reference_date(forecast_date), + target = glue::glue("wk inc {disease} hosp"), + horizon = as.integer(floor((target_end_date - reference_date) / 7)), + output_type = "quantile", + output_type_id = quantile, + value = value + ) %>% + left_join( + get_population_data() %>% + select(state_id, state_code), by = c("geo_value" = "state_id") + ) %>% + rename(location = state_code, model_id = forecaster) %>% + select(reference_date, target, horizon, target_end_date, location, output_type, output_type_id, value, model_id) %>% + drop_na() +} + #' The quantile levels used by the covidhub repository #' #' @param type either standard or inc_case, with inc_case being a small subset of the standard diff --git a/R/imports.R b/R/imports.R index 4f82b35d..94809ecd 100644 --- a/R/imports.R +++ b/R/imports.R @@ -29,6 +29,7 @@ library(parsnip) library(paws.storage) library(plotly) library(purrr) +library(qs2) library(quantreg) library(readr) library(recipes) diff --git a/R/scoring.R b/R/scoring.R index d0742b2b..1cd79aad 100644 --- a/R/scoring.R +++ b/R/scoring.R @@ -1,10 +1,10 @@ # Scoring and Evaluation Functions -evaluate_predictions <- function(predictions_cards, truth_data) { - checkmate::assert_data_frame(predictions_cards) +evaluate_predictions <- function(forecasts, truth_data) { + checkmate::assert_data_frame(forecasts) checkmate::assert_data_frame(truth_data) checkmate::assert_names( - names(predictions_cards), + names(forecasts), must.include = c("model", "geo_value", "forecast_date", "target_end_date", "quantile", "prediction") ) checkmate::assert_names( @@ -12,19 +12,23 @@ evaluate_predictions <- function(predictions_cards, truth_data) { must.include = c("geo_value", "target_end_date", "true_value") ) - left_join(predictions_cards, truth_data, by = c("geo_value", "target_end_date")) %>% - scoringutils::score(metrics = c("interval_score", "ae_median", "coverage")) %>% - scoringutils::add_coverage(by = c("model", "geo_value", "forecast_date", "target_end_date"), ranges = c(80)) %>% - scoringutils::summarize_scores(by = c("model", "geo_value", "forecast_date", "target_end_date")) %>% + forecast_obj <- left_join(forecasts, truth_data, by = c("geo_value", "target_end_date")) %>% + scoringutils::as_forecast_quantile( + quantile_level = "quantile", + observed = "true_value", + predicted = "prediction", + forecast_unit = c("model", "geo_value", "forecast_date", "target_end_date") + ) + + scores <- forecast_obj %>% + scoringutils::score(metrics = get_metrics(.)) %>% as_tibble() %>% select( - model, - geo_value, - forecast_date, - target_end_date, - wis = interval_score, + model, geo_value, forecast_date, target_end_date, + wis, ae = ae_median, - coverage_80 + coverage_50 = interval_coverage_50, + coverage_90 = interval_coverage_90 ) %>% mutate(ahead = as.numeric(target_end_date - forecast_date)) } diff --git a/R/targets/score_targets.R b/R/targets/score_targets.R new file mode 100644 index 00000000..3dcd385f --- /dev/null +++ b/R/targets/score_targets.R @@ -0,0 +1,65 @@ +get_external_forecasts <- function(disease) { + locations_crosswalk <- get_population_data() %>% + select(state_id, 
state_code) %>% + filter(state_id != "usa") + arrow::read_parquet(glue::glue("data/forecasts/{disease}_hosp_forecasts.parquet")) %>% + filter(output_type == "quantile") %>% + select(forecaster, geo_value = location, forecast_date, target_end_date, quantile = output_type_id, value) %>% + inner_join(locations_crosswalk, by = c("geo_value" = "state_code")) %>% + mutate(geo_value = state_id) %>% + select(forecaster, geo_value, forecast_date, target_end_date, quantile, value) +} + +score_forecasts <- function(nhsn_latest_data, joined_forecasts_and_ensembles) { + truth_data <- + nhsn_latest_data %>% + select(geo_value, target_end_date = time_value, oracle_value = value) %>% + left_join( + get_population_data() %>% + select(state_id, state_code), + by = c("geo_value" = "state_id") + ) %>% + drop_na() %>% + rename(location = state_code) %>% + select(-geo_value) + forecasts_formatted <- + joined_forecasts_and_ensembles %>% + format_scoring_utils(disease = "covid") + scores <- forecasts_formatted %>% + filter(location != "US") %>% + hubEvals::score_model_out( + truth_data, + metrics = c("wis", "ae_median", "interval_coverage_50", "interval_coverage_90"), + summarize = FALSE, + by = c("model_id") + ) + scores %>% + left_join( + get_population_data() %>% + select(state_id, state_code), + by = c("location" = "state_code") + ) %>% + rename( + forecaster = model_id, + forecast_date = reference_date, + ahead = horizon, + geo_value = state_id + ) %>% + select(-location) +} + + +render_score_plot <- function(score_report_rmd, scores, forecast_dates, disease) { + rmarkdown::render( + score_report_rmd, + params = list( + scores = scores, + forecast_dates = forecast_dates, + disease = disease + ), + output_file = here::here( + "reports", + glue::glue("{disease}_backtesting_2024_2025_on_{as.Date(Sys.Date())}") + ) + ) +} diff --git a/R/utils.R b/R/utils.R index 567d7b1c..8448754d 100644 --- a/R/utils.R +++ b/R/utils.R @@ -155,7 +155,7 @@ get_exclusions <- function( } data_substitutions <- function(dataset, disease, forecast_generation_date) { - disease <- "flu" + # Get the substitutions from the table, matched by forecast generation date substitutions <- readr::read_csv( glue::glue("{disease}_data_substitutions.csv"), comment = "#", @@ -163,42 +163,56 @@ data_substitutions <- function(dataset, disease, forecast_generation_date) { ) %>% filter(forecast_date == forecast_generation_date) %>% select(-forecast_date) %>% - rename(new_value = value) - dataset %>% - left_join(substitutions) %>% + rename(new_value = value) %>% + select(-time_value) + # Replace the most recent values in the appropriate keys with the substitutions + new_values <- dataset %>% + group_by(geo_value) %>% + slice_max(time_value) %>% + inner_join(substitutions) %>% mutate(value = ifelse(!is.na(new_value), new_value, value)) %>% select(-new_value) + # Remove keys from dataset that have been substituted + dataset %>% + anti_join(new_values, by = c("geo_value", "time_value")) %>% + bind_rows(new_values) } -parse_prod_weights <- function(filename = here::here("covid_geo_exclusions.csv"), - forecast_date_int, forecaster_fns) { - forecast_date <- as.Date(forecast_date_int) +parse_prod_weights <- function(filename, forecast_date_int, forecaster_fn_names) { + forecast_date_val <- as.Date(forecast_date_int) all_states <- c( unique(readr::read_csv("https://raw.githubusercontent.com/cmu-delphi/covidcast-indicators/refs/heads/main/_delphi_utils_python/delphi_utils/data/2020/state_pop.csv", show_col_types = FALSE)$state_id), "usa", "us" ) all_prod_weights 
<- readr::read_csv(filename, comment = "#", show_col_types = FALSE) # if we haven't set specific weights, use the overall defaults - useful_prod_weights <- filter(all_prod_weights, forecast_date == forecast_date) + useful_prod_weights <- filter(all_prod_weights, forecast_date == forecast_date_val) if (nrow(useful_prod_weights) == 0) { useful_prod_weights <- all_prod_weights %>% filter(forecast_date == min(forecast_date)) %>% - mutate(forecast_date = forecast_date) + mutate(forecast_date = forecast_date_val) } - useful_prod_weights %>% + # weights that apply to specific states + state_weights <- useful_prod_weights %>% + filter(geo_value != "all") %>% mutate( - geo_value = ifelse(geo_value == "all", list(all_states), geo_value), + forecaster = ifelse(forecaster == "all", list(forecaster_fn_names), forecaster), ) %>% - unnest_longer(geo_value) %>% + unnest_longer(forecaster) + forecaster_weights <- + useful_prod_weights %>% + filter(geo_value == "all") %>% mutate( - forecaster = ifelse(forecaster == "all", list(names(forecaster_fns)), forecaster), + geo_value = list(all_states) ) %>% - unnest_longer(forecaster) %>% + unnest_longer(geo_value) + # bind together and overwrite any generic weights with geo_specific ones + forecaster_weights %>% + bind_rows(state_weights) %>% group_by(forecast_date, forecaster, geo_value) %>% - summarize(weight = min(weight), .groups = "drop") %>% + filter(row_number() == n()) %>% mutate(forecast_date = as.Date(forecast_date_int)) %>% - group_by(forecast_date, geo_value) %>% - mutate(weight = ifelse(near(weight, 0), 0, weight / sum(weight))) + ungroup() } exclude_geos <- function(geo_forecasters_weights) { @@ -280,11 +294,18 @@ write_submission_file <- function(pred, forecast_reference_date, submission_dire #' Utility to get the reference date for a given date. This is the last day of #' the epiweek that the date falls in. get_forecast_reference_date <- function(date) { - date <- as.Date(date) + date <- as.Date(date) MMWRweek::MMWRweek2Date(lubridate::epiyear(date), lubridate::epiweek(date)) + 6 } -update_site <- function() { +#' Update the site with the latest reports. +#' +#' Looks at that `reports/` directory and updates `template.md` with new reports +#' that follow a naming convention. This is translated into `report.md` which is +#' then converted to `index.html` with pandoc. +#' +#' @param sync_to_s3 Whether to sync the reports to the S3 bucket. 
+update_site <- function(sync_to_s3 = TRUE) { library(fs) library(stringr) # Define the directories @@ -297,12 +318,15 @@ update_site <- function() { } # Sync the reports directory with the S3 bucket - aws.s3::s3sync(path = reports_dir, bucket = "forecasting-team-data", prefix = "reports-2024/", verbose = FALSE) + if (sync_to_s3) { + aws.s3::s3sync(path = reports_dir, bucket = "forecasting-team-data", prefix = "reports-2024/", verbose = FALSE) + } # Read the template file if (!file_exists(template_path)) { stop("Template file does not exist.") } + report_md_content <- readLines(template_path) # Get the list of files in the reports directory report_files <- dir_ls(reports_dir, regexp = ".*_prod_on_.*.html") @@ -322,8 +346,7 @@ update_site <- function() { # forecast date used_reports <- report_table %>% group_by(forecast_date, disease) %>% - arrange(generation_date) %>% - filter(generation_date == max(generation_date)) %>% + slice_max(generation_date) %>% ungroup() %>% arrange(forecast_date) @@ -333,13 +356,38 @@ update_site <- function() { file_parts <- str_split(fs::path_ext_remove(file_name), "_", simplify = TRUE) date <- file_parts[1] disease <- file_parts[2] + generation_date <- file_parts[5] - report_link <- sprintf("- [%s Forecasts %s](%s)", str_to_title(disease), date, file_name) + report_link <- sprintf("- [%s Forecasts %s, Rendered %s](%s)", str_to_title(disease), date, generation_date, file_name) # Insert into Production Reports section, skipping a line prod_reports_index <- which(grepl("## Production Reports", report_md_content)) + 1 report_md_content <- append(report_md_content, report_link, after = prod_reports_index) } + # add scoring notebooks if they exist + score_files <- dir_ls(reports_dir, regexp = ".*_backtesting_2024_2025_on_.*.html") + if (length(score_files) > 0) { + # a tibble of all score files, along with their generation date and disease + score_table <- tibble( + filename = score_files, + dates = str_match_all(filename, "[0-9]{4}-..-..") + ) %>% + unnest_wider(dates, names_sep = "_") %>% + rename(generation_date = dates_1) %>% + mutate( + generation_date = ymd(generation_date), + disease = str_match(filename, "flu|covid") + ) + used_files <- score_table %>% + group_by(disease) %>% + slice_max(generation_date) + # iterating over the diseases + for (row_num in seq_along(used_files$filename)) { + scoring_index <- which(grepl("### Scoring this season", report_md_content)) + 1 + score_link <- sprintf("- [%s Scoring, Rendered %s](%s)", str_to_title(used_files$disease[[row_num]]), used_files$generation_date[[row_num]], used_files$filename[[row_num]]) + report_md_content <- append(report_md_content, score_link, after = scoring_index) + } + } # Write the updated content to report.md report_md_path <- path(reports_dir, "report.md") @@ -349,6 +397,76 @@ update_site <- function() { system("pandoc reports/report.md -s -o reports/index.html --css=reports/style.css --mathjax='https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js' --metadata pagetitle='Delphi Reports'") } +#' Delete unused reports from the S3 bucket. +#' +#' @param dry_run List files that would be deleted if `dry_run` is `FALSE`. 
+delete_extra_s3_files <- function(dry_run = TRUE) { + local_path <- "reports" + bucket <- "forecasting-team-data" + prefix <- "reports-2024/" + # Get list of local files (relative paths) + local_files <- list.files(local_path, recursive = TRUE) + + # Get list of S3 files + s3_objects <- aws.s3::get_bucket(bucket, prefix = prefix) + s3_files <- sapply(s3_objects, function(x) x$Key) + + # Find files that exist in S3 but not locally + # Remove prefix from s3_files for comparison + s3_files_clean <- gsub(prefix, "", s3_files) + files_to_delete <- s3_files[!(s3_files_clean %in% local_files)] + + if (dry_run) { + message("Would delete ", length(files_to_delete), " files from S3") + message("Files: ", paste(files_to_delete, collapse = ", ")) + return(invisible(files_to_delete)) + } + + # Delete each extra file + if (length(files_to_delete) > 0) { + message("Deleting ", length(files_to_delete), " files from S3") + for (file in files_to_delete) { + message("Deleting: ", file) + aws.s3::delete_object(file, bucket) + } + } else { + message("No files to delete") + } +} + +#' Find unused report files in index.html. +find_unused_report_files <- function() { + library(rvest) + library(fs) + library(stringr) + + # Read all files in reports directory + all_files <- dir_ls("reports", recurse = TRUE) %>% + path_file() # just get filenames, not full paths + + # Read index.html and extract all href links + index_html <- read_html("reports/index.html") + used_files <- index_html %>% + html_elements("a") %>% + html_attr("href") %>% + # Add known required files like CSS + c("style.css", "template.md", "report.md", "index.html", .) %>% + # Remove links like "https://" from the list + keep(~ !grepl("^https?://", .)) + + # Find files that exist but aren't referenced + unused_files <- setdiff(all_files, used_files) + + if (length(unused_files) > 0) { + cat("The following files in 'reports' are not referenced in index.html:\n") + cat(paste("-", unused_files), sep = "\n") + } else { + cat("All files in 'reports' are referenced in index.html\n") + } + + return(invisible(unused_files)) +} + #' Ensure that forecast values are monotically increasing #' in quantile order. sort_by_quantile <- function(forecasts) { @@ -358,3 +476,91 @@ sort_by_quantile <- function(forecasts) { mutate(value = sort(value)) %>% ungroup() } + + +#' Print recent targets errors. +get_targets_errors <- function(project = tar_path_store(), top_n = 10) { + meta_df <- targets::tar_meta(store = project) + forecast_errors <- meta_df %>% + filter(!is.na(parent), !is.na(error)) %>% + distinct(parent, error, .keep_all = TRUE) %>% + mutate(parent = gsub("forecast_", "", parent)) %>% + slice_max(time, n = top_n) + + # Print each error message, along with the parent target. + if (nrow(forecast_errors) > 0) { + cat("Forecast errors:\n") + for (i in 1:nrow(forecast_errors)) { + cli::cli_inform(c( + "Parent target: {forecast_errors$parent[i]}", + "Time: {forecast_errors$time[i]}", + "Error: {forecast_errors$error[i]}" + )) + } + } + + other_errors <- meta_df %>% + filter(!is.na(error)) %>% + distinct(error, .keep_all = TRUE) %>% + slice_max(time, n = top_n) + + # Print each error message, along with the parent target. + if (nrow(other_errors) > 0) { + cat("Other errors:\n") + for (i in 1:nrow(other_errors)) { + cli::cli_inform(c( + "Target: {other_errors$name[i]}", + "Time: {other_errors$time[i]}", + "Error: {other_errors$error[i]}" + )) + } + } + + return(invisible(meta_df %>% slice_max(time, n = top_n))) +} + +#' Retry a function. 
+#' +#' @param max_attempts The maximum number of attempts. +#' @param wait_seconds The number of seconds to wait between attempts. +#' @param fn The function to retry. +#' @param ... Additional arguments to pass to the function. +#' +#' @examples +#' retry_fn( +#' max_attempts = 10, +#' wait_seconds = 1, +#' fn = pub_covidcast, +#' source = "nssp", +#' signal = "pct_ed_visits_covid", +#' geo_type = "state", +#' geo_values = "*", +#' time_type = "week" +#' ) +retry_fn <- function(max_attempts = 10, wait_seconds = 1, fn, ...) { + for (attempt in 1:max_attempts) { + tryCatch( + { + result <- fn(...) + return(result) # Return successful result + }, + error = function(e) { + if (attempt == max_attempts) { + stop("Maximum retry attempts reached. Last error: ", e$message) + } + message(sprintf("Attempt %d failed. Retrying in %d second(s)...", attempt, wait_seconds)) + Sys.sleep(wait_seconds) + } + ) + } +} + +validate_epi_data <- function(epi_data) { + if (!inherits(epi_data, "epi_df")) { + epi_data <- epi_data %>% as_epi_df(as_of = max(epi_data$time_value)) + } + if (is.null(attributes(epi_data)$metadata$as_of)) { + attributes(epi_data)$metadata$as_of <- max(epi_data$time_value) + } + return(epi_data) +} diff --git a/README.md b/README.md index 4b2217d2..103d161b 100644 --- a/README.md +++ b/README.md @@ -50,9 +50,9 @@ make prod-flu make prod-covid # The job output can be found in nohup.out -# If there are errors, you can view them with the following command (replace with appropriate project) -Sys.setenv(TAR_PROJECT = "covid_hosp_prod"); -targets::tar_meta(fields = error, complete_only = FALSE) +# If there are errors, you can view the top n with the following command (replace with appropriate project) +source("scripts/targets-common.R"); +get_targets_errors("covid_hosp_prod", top_n = 10) # Automatically append the new reports to the site index and host the site on netlify # (this requires the netlify CLI to be installed and configured, talk to Dmitry about this) diff --git a/_targets.yaml b/_targets.yaml index c198722e..0b0b0d5c 100644 --- a/_targets.yaml +++ b/_targets.yaml @@ -2,11 +2,12 @@ covid_hosp_explore: script: scripts/covid_hosp_explore.R store: covid_hosp_explore use_crew: no - error: null + reporter_make: timestamp flu_hosp_explore: script: scripts/flu_hosp_explore.R store: flu_hosp_explore use_crew: no + reporter_make: timestamp flu_hosp_tiny: script: scripts/flu_hosp_tiny.R store: flu_hosp_tiny @@ -15,7 +16,9 @@ flu_hosp_prod: script: scripts/flu_hosp_prod.R store: flu_hosp_prod use_crew: no + reporter_make: timestamp covid_hosp_prod: script: scripts/covid_hosp_prod.R store: covid_hosp_prod use_crew: no + reporter_make: timestamp diff --git a/covid_data_substitutions.csv b/covid_data_substitutions.csv index 86a2e79f..e168122f 100644 --- a/covid_data_substitutions.csv +++ b/covid_data_substitutions.csv @@ -1,2 +1,4 @@ geo_value, forecast_date, time_value, value nh, 2025-01-15, 2025-01-11, 150 +dc, 2025-02-19, 2025-02-15, 18 +ga, 2025-02-19, 2025-02-15, 210 diff --git a/covid_geo_exclusions.csv b/covid_geo_exclusions.csv index 6d2d787e..04c5983e 100644 --- a/covid_geo_exclusions.csv +++ b/covid_geo_exclusions.csv @@ -3,11 +3,42 @@ forecast_date,forecaster,geo_value,weight "2024-10-01", "all", "mp", 0 "2024-10-01", "windowed_seasonal", "all", 3 "2024-10-01", "windowed_seasonal_extra_sources", "all", 3 +"2024-10-01", "windowed_seasonal_extra_sources", "mo", 0 +"2024-10-01", "windowed_seasonal_extra_sources", "usa", 0 +"2024-10-01", "windowed_seasonal_extra_sources", "wy", 0 
"2024-10-01", "linear", "all", 3 "2024-10-01", "linearlog", "all", 0 "2024-10-01", "climate_base", "all", 2 "2024-10-01", "climate_geo_agged", "all", 0.5 "2024-10-01", "climate_quantile_extrapolated", "all", 0 +# feb 19 +"2025-02-19", "all", "mp", 0 +"2025-02-19", "windowed_seasonal", "all", 0 +"2025-02-19", "windowed_seasonal_extra_sources", "all", 3 +# this is weighing the linear climate ensemble +"2025-02-19", "linear_climate", "all", 0.0001 +"2025-02-19", "linear_climate", "ma", 3 +"2025-02-19", "linear_climate", "nh", 3 +"2025-02-19", "linear_climate", "nm", 3 +"2025-02-19", "windowed_seasonal", "pr", 3 +"2025-02-19", "windowed_seasonal", "mo", 3 +"2025-02-19", "windowed_seasonal", "wy", 3 +"2025-02-19", "windowed_seasonal", "usa", 3 +# creating linear climate +"2025-02-19", "linear", "all", 3 +"2025-02-19", "linearlog", "all", 0 +"2025-02-19", "climate_base", "all", 2 +"2025-02-19", "climate_geo_agged", "all", 0.5 +"2025-02-19", "climate_quantile_extrapolated", "all", 0 + +# feb 12 +2025-02-05, all, mp, 0 +2025-02-05, windowed_seasonal, all, 3 +2025-02-05, windowed_seasonal_extra_sources, all, 0.0 +2025-02-05, linear, all, 0.5 +2025-02-05, linearlog, all, 0 +2025-02-05, climate_base, all, 0 +2025-02-05, climate_geo_agged, all, 0.0 # feb 5 2025-02-05, all, mp, 0 2025-02-05, windowed_seasonal, all, 3 diff --git a/dashboard-proj/.gitignore b/dashboard-proj/.gitignore new file mode 100644 index 00000000..4f8b6a01 --- /dev/null +++ b/dashboard-proj/.gitignore @@ -0,0 +1,6 @@ +* +!.gitignore +!meta +!*.R +meta/* +# !meta/meta diff --git a/flu_data_substitutions.csv b/flu_data_substitutions.csv index ee6d70f2..f44143b9 100644 --- a/flu_data_substitutions.csv +++ b/flu_data_substitutions.csv @@ -8,3 +8,7 @@ nm, 2025-02-12, 2025-02-05, 200 ca, 2025-02-12, 2025-02-05, 3893 ms, 2025-02-12, 2025-02-05, 420 ok, 2025-02-12, 2025-02-05, 850 +dc, 2025-02-19, 2025-02-15, 130 +ga, 2025-02-19, 2025-02-15, 700 +nm, 2025-02-19, 2025-02-15, 170 +ok, 2025-02-19, 2025-02-15, 600 \ No newline at end of file diff --git a/flu_geo_exclusions.csv b/flu_geo_exclusions.csv index 2eac41d6..aca672bb 100644 --- a/flu_geo_exclusions.csv +++ b/flu_geo_exclusions.csv @@ -2,11 +2,31 @@ forecast_date,forecaster,geo_value,weight # default values 2024-10-01, all, mp, 0 2024-10-01, windowed_seasonal, all, 3 +2024-10-01, windowed_seasonal_extra_sources, all, 3 2024-10-01, linear, all, 0.5 2024-10-01, linearlog, all, 0 2024-10-01, climate_base, all, 0.5 2024-10-01, climate_geo_agged, all, 0.25 2024-10-01, climate_quantile_extrapolated, all, 0 +# feb 19 +2025-02-19, all, mp, 0 +2025-02-19, windowed_seasonal, all, 3 +2025-02-19, windowed_seasonal_extra_sources, all, 3 +2025-02-19, linear_climate, all, 3 +2025-02-19, linear, all, 0.5 +2025-02-19, linearlog, all, 0 +2025-02-19, climate_base, all, 0.5 +2025-02-19, climate_geo_agged, all, 0.25 +2025-02-19, climate_quantile_extrapolated, all, 0 +# state specific +2025-02-19, linear, in, 3 +2025-02-19, linear, ks, 3 +2025-02-19, linear, ky, 3 +2025-02-19, linear, mt, 3 +2025-02-19, windowed_seasonal_extra_sources, mo, 0 +2025-02-19, windowed_seasonal_extra_sources, pr, 0 +2025-02-19, windowed_seasonal_extra_sources, us, 0 +2025-02-19, windowed_seasonal_extra_sources, wy, 0 # feb 5 2025-02-05, all, mp, 0 2025-02-05, windowed_seasonal, all, 3 diff --git a/renv.lock b/renv.lock index b779ec2a..2653cf85 100644 --- a/renv.lock +++ b/renv.lock @@ -1,6 +1,6 @@ { "R": { - "Version": "4.4.2", + "Version": "4.4.1", "Repositories": [ { "Name": "RSPM", @@ -1702,7 +1702,8 @@ "URL": 
"https://gitlab.com/luke-tierney/codetools", "License": "GPL", "NeedsCompilation": "no", - "Repository": "CRAN" + "Repository": "RSPM", + "Encoding": "UTF-8" }, "collections": { "Package": "collections", @@ -2809,11 +2810,11 @@ "Author": "Logan Brooks [aut], Dmitry Shemetov [aut], Samuel Gratzl [aut], David Weber [ctb, cre], Nat DeFries [ctb], Alex Reinhart [ctb], Daniel J. McDonald [ctb], Kean Ming Tan [ctb], Will Townes [ctb], George Haff [ctb], Kathryn Mazaitis [ctb]", "Maintainer": "David Weber ", "RemoteType": "github", - "RemoteHost": "api.github.com", "RemoteUsername": "cmu-delphi", "RemoteRepo": "epidatr", "RemoteRef": "dev", "RemoteSha": "0026856af0f41f5caf11d53d244e68e31136418f", + "RemoteHost": "api.github.com", "Remotes": "cmu-delphi/delphidocs" }, "epipredict": { @@ -2952,11 +2953,11 @@ "Author": "Jacob Bien [ctb], Logan Brooks [aut, cre], Rafael Catoia [ctb], Nat DeFries [aut], Daniel McDonald [aut], Rachel Lobay [ctb], Ken Mawer [ctb], Chloe You [ctb], Quang Nguyen [ctb], Evan Ray [aut], Dmitry Shemetov [aut], Ryan Tibshirani [aut], David Weber [ctb], Lionel Henry [ctb] (Author of included rlang fragments), Hadley Wickham [ctb] (Author of included rlang fragments), Posit [cph] (Copyright holder of included rlang fragments), Johns Hopkins University Center for Systems Science and Engineering [dtc] (Owner of COVID-19 cases and deaths data from the COVID-19 Data Repository), Johns Hopkins University [cph] (Copyright holder of COVID-19 cases and deaths data from the COVID-19 Data Repository), Carnegie Mellon University Delphi Group [dtc] (Owner of claims-based CLI data from the Delphi Epidata API)", "Maintainer": "Logan Brooks ", "RemoteType": "github", + "RemoteHost": "api.github.com", "RemoteUsername": "cmu-delphi", "RemoteRepo": "epiprocess", "RemoteRef": "dev", "RemoteSha": "a699ae8f7c77aabd6a72cdb0209dd34960402047", - "RemoteHost": "api.github.com", "Remotes": "cmu-delphi/delphidocs, cmu-delphi/epidatasets, cmu-delphi/epidatr, cmu-delphi/epipredict, glmgen/trendfilter, reconverse/outbreaks" }, "evaluate": { @@ -5228,7 +5229,8 @@ "NeedsCompilation": "yes", "Author": "Deepayan Sarkar [aut, cre] (), Felix Andrews [ctb], Kevin Wright [ctb] (documentation), Neil Klepeis [ctb], Johan Larsson [ctb] (miscellaneous improvements), Zhijian (Jason) Wen [cph] (filled contour code), Paul Murrell [ctb], Stefan Eng [ctb] (violin plot improvements), Achim Zeileis [ctb] (modern colors), Alexandre Courtiol [ctb] (generics for larrows, lpolygon, lrect and lsegments)", "Maintainer": "Deepayan Sarkar ", - "Repository": "CRAN" + "Repository": "RSPM", + "Encoding": "UTF-8" }, "lava": { "Package": "lava", @@ -5686,7 +5688,8 @@ "NeedsCompilation": "no", "Author": "Joe Cheng [cre, aut], RStudio [cph]", "Maintainer": "Joe Cheng ", - "Repository": "CRAN" + "Repository": "RSPM", + "Encoding": "UTF-8" }, "mirai": { "Package": "mirai", @@ -6541,7 +6544,8 @@ ], "Collate": "'adjective.R' 'adverb.R' 'exclamation.R' 'verb.R' 'rpackage.R' 'package.R'", "NeedsCompilation": "no", - "Repository": "CRAN" + "Repository": "RSPM", + "Encoding": "UTF-8" }, "prettyunits": { "Package": "prettyunits", @@ -6955,6 +6959,49 @@ "Author": "Travers Ching [aut, cre, cph], Yann Collet [ctb, cph] (Yann Collet is the author of the bundled zstd, lz4 and xxHash code), Facebook, Inc. 
[cph] (Facebook is the copyright holder of the bundled zstd code), Reichardt Tino [ctb, cph] (Contributor/copyright holder of zstd bundled code), Skibinski Przemyslaw [ctb, cph] (Contributor/copyright holder of zstd bundled code), Mori Yuta [ctb, cph] (Contributor/copyright holder of zstd bundled code), Romain Francois [ctb, cph] (Derived example/tutorials for ALTREP structures), Francesc Alted [ctb, cph] (Shuffling routines derived from Blosc library), Bryce Chamberlain [ctb] (qsavem and qload functions), Salim Brüggemann [ctb] (Contributing to documentation (ORCID:0000-0002-5329-5987))", "Repository": "RSPM" }, + "qs2": { + "Package": "qs2", + "Version": "0.1.4", + "Source": "Repository", + "Type": "Package", + "Title": "Efficient Serialization of R Objects", + "Date": "2024-12-11", + "Authors@R": "c( person(\"Travers\", \"Ching\", email = \"traversc@gmail.com\", role = c(\"aut\", \"cre\", \"cph\")), person(\"Yann\", \"Collet\", role = c(\"ctb\", \"cph\"), comment = \"Yann Collet is the author of the bundled zstd\"), person(\"Facebook, Inc.\", role = \"cph\", comment = \"Facebook is the copyright holder of the bundled zstd code\"), person(\"Reichardt\", \"Tino\", role = c(\"ctb\", \"cph\"), comment = \"Contributor/copyright holder of zstd bundled code\"), person(\"Skibinski\", \"Przemyslaw\", role = c(\"ctb\", \"cph\"), comment = \"Contributor/copyright holder of zstd bundled code\"), person(\"Mori\", \"Yuta\", role = c(\"ctb\", \"cph\"), comment = \"Contributor/copyright holder of zstd bundled code\"), person(\"Francesc\", \"Alted\", role = c(\"ctb\", \"cph\"), comment = \"Shuffling routines derived from Blosc library\"))", + "Maintainer": "Travers Ching ", + "Description": "Streamlines and accelerates the process of saving and loading R objects, improving speed and compression compared to other methods. The package provides two compression formats: the 'qs2' format, which uses R serialization via the C API while optimizing compression and disk I/O, and the 'qdata' format, featuring custom serialization for slightly faster performance and better compression. Additionally, the 'qs2' format can be directly converted to the standard 'RDS' format, ensuring long-term compatibility with future versions of R.", + "License": "GPL-3", + "LazyData": "true", + "Biarch": "true", + "Depends": [ + "R (>= 3.5.0)" + ], + "Imports": [ + "Rcpp", + "stringfish (>= 0.15.1)" + ], + "LinkingTo": [ + "Rcpp", + "stringfish", + "RcppParallel" + ], + "Suggests": [ + "knitr", + "rmarkdown", + "dplyr", + "data.table", + "stringi" + ], + "SystemRequirements": "GNU make", + "Encoding": "UTF-8", + "RoxygenNote": "7.3.2", + "VignetteBuilder": "knitr", + "Copyright": "This package includes code from the 'zstd' library owned by Facebook, Inc. and created by Yann Collet; and code derived from the 'Blosc' library created and owned by Francesc Alted.", + "URL": "https://github.com/qsbase/qs2", + "BugReports": "https://github.com/qsbase/qs2/issues", + "NeedsCompilation": "yes", + "Author": "Travers Ching [aut, cre, cph], Yann Collet [ctb, cph] (Yann Collet is the author of the bundled zstd), Facebook, Inc. 
[cph] (Facebook is the copyright holder of the bundled zstd code), Reichardt Tino [ctb, cph] (Contributor/copyright holder of zstd bundled code), Skibinski Przemyslaw [ctb, cph] (Contributor/copyright holder of zstd bundled code), Mori Yuta [ctb, cph] (Contributor/copyright holder of zstd bundled code), Francesc Alted [ctb, cph] (Shuffling routines derived from Blosc library)", + "Repository": "RSPM" + }, "quadprog": { "Package": "quadprog", "Version": "1.5-8", @@ -10115,9 +10162,10 @@ "R (>= 2.10.0)" ], "License": "GPL (>= 2)", - "Repository": "CRAN", + "Repository": "RSPM", "NeedsCompilation": "no", - "Author": "David B. Dahl [aut], David Scott [aut, cre], Charles Roosen [aut], Arni Magnusson [aut], Jonathan Swinton [aut], Ajay Shah [ctb], Arne Henningsen [ctb], Benno Puetz [ctb], Bernhard Pfaff [ctb], Claudio Agostinelli [ctb], Claudius Loehnert [ctb], David Mitchell [ctb], David Whiting [ctb], Fernando da Rosa [ctb], Guido Gay [ctb], Guido Schulz [ctb], Ian Fellows [ctb], Jeff Laake [ctb], John Walker [ctb], Jun Yan [ctb], Liviu Andronic [ctb], Markus Loecher [ctb], Martin Gubri [ctb], Matthieu Stigler [ctb], Robert Castelo [ctb], Seth Falcon [ctb], Stefan Edwards [ctb], Sven Garbade [ctb], Uwe Ligges [ctb]" + "Author": "David B. Dahl [aut], David Scott [aut, cre], Charles Roosen [aut], Arni Magnusson [aut], Jonathan Swinton [aut], Ajay Shah [ctb], Arne Henningsen [ctb], Benno Puetz [ctb], Bernhard Pfaff [ctb], Claudio Agostinelli [ctb], Claudius Loehnert [ctb], David Mitchell [ctb], David Whiting [ctb], Fernando da Rosa [ctb], Guido Gay [ctb], Guido Schulz [ctb], Ian Fellows [ctb], Jeff Laake [ctb], John Walker [ctb], Jun Yan [ctb], Liviu Andronic [ctb], Markus Loecher [ctb], Martin Gubri [ctb], Matthieu Stigler [ctb], Robert Castelo [ctb], Seth Falcon [ctb], Stefan Edwards [ctb], Sven Garbade [ctb], Uwe Ligges [ctb]", + "Encoding": "UTF-8" }, "yaml": { "Package": "yaml", diff --git a/reports/template.md b/reports/template.md index 068b57c1..5b0f5106 100644 --- a/reports/template.md +++ b/reports/template.md @@ -5,6 +5,9 @@ ## Production Reports +### Scoring this season + + ## Exploration Reports - [NHSN 2024-2025 Data Analysis](new_data.html) diff --git a/scripts/covid_hosp_prod.R b/scripts/covid_hosp_prod.R index 4730aa19..77606a64 100644 --- a/scripts/covid_hosp_prod.R +++ b/scripts/covid_hosp_prod.R @@ -1,7 +1,6 @@ # The COVID Hospitalization Production Forecasting Pipeline. source("scripts/targets-common.R") -submit_climatological <- TRUE submission_directory <- Sys.getenv("COVID_SUBMISSION_DIRECTORY", "cache") insufficient_data_geos <- c("as", "mp", "vi", "gu") # date to cut the truth data off at, so we don't have too much of the past @@ -11,15 +10,30 @@ truth_data_date <- "2023-09-01" # today, which is a Wednesday. Sometimes, if we're doing a delayed forecast, # it's a Thursday. It's used for stamping the data and for determining the # appropriate as_of when creating the forecast. -forecast_generation_date <- Sys.Date() +forecast_generation_dates <- Sys.Date() # Usually, the forecast_date is the same as the generation date, but you can # override this. It should be a Wednesday. -forecast_date <- round_date(forecast_generation_date, "weeks", week_start = 3) +forecast_dates <- round_date(forecast_generation_dates, "weeks", week_start = 3) + + # forecast_generation_date needs to follow suit, but it's more complicated # because sometimes we forecast on Thursday. 
-# forecast_generation_date <- c(as.Date(c("2024-11-20", "2024-11-27", "2024-12-04", "2024-12-11", "2024-12-18", "2024-12-26", "2025-01-02")), seq.Date(as.Date("2025-01-08"), Sys.Date(), by = 7L)) +# forecast_generation_dates <- c(as.Date(c("2024-11-22", "2024-11-27", "2024-12-04", "2024-12-11", "2024-12-18", "2024-12-26", "2025-01-02")), seq.Date(as.Date("2025-01-08"), Sys.Date(), by = 7L)) # If doing backfill, you can set the forecast_date to a sequence of dates. -# forecast_date <- seq.Date(as.Date("2024-11-20"), Sys.Date(), by = 7L) +# forecast_dates <- seq.Date(as.Date("2024-11-20"), Sys.Date(), by = 7L) + +# Select first two for debugging +# forecast_generation_dates <- forecast_generation_dates[1:2] +# forecast_dates <- forecast_dates[1:2] + +# Whether we're running in backtest mode. +# If TRUE, we don't run the report notebook, which is (a) slow and (b) should be +# preserved as an ASOF snapshot of our production results for that week. +# If TRUE, we run a scoring notebook, which scores the historical forecasts +# against the truth data and compares them to the ensemble. +# If FALSE, we run the weekly report notebook. +backtest_mode <- length(forecast_dates) > 1 + forecaster_fns <- list2( linear = function(epi_data, ahead, extra_data, ...) { @@ -76,13 +90,31 @@ forecaster_fns <- list2( fcst } ) -indices <- seq_along(forecaster_fns) +forecaster_fn_names_ <- names(forecaster_fns) + -rlang::list2( +# ================================ PARAMETERS AND DATE TARGETS ================================ +parameters_and_date_targets <- rlang::list2( tar_target(aheads, command = -1:3), - tar_target(forecasters, command = indices), - tar_target( - name = nhsn_latest_data, + tar_file( + forecast_report_rmd, + command = "scripts/reports/forecast_report.Rmd" + ), + tar_file( + score_report_rmd, + command = "scripts/reports/score_report.Rmd" + ), + tar_file( + covid_geo_exclusions, + command = "covid_geo_exclusions.csv" + ), + tar_file( + covid_data_substitutions, + command = "covid_data_substitutions.csv" + ), + tar_change( + nhsn_latest_data, + change = get_socrata_updated_at("https://data.cdc.gov/api/views/mpgq-jmmr"), command = { if (wday(Sys.Date()) < 6 & wday(Sys.Date()) > 3) { # download from the preliminary data source from Wednesday to Friday @@ -97,229 +129,274 @@ rlang::list2( select(-disease) %>% filter(geo_value %nin% insufficient_data_geos) most_recent_result - }, - cue = tar_cue("always") + } ), - tar_target( + tar_change( name = nhsn_archive_data, + change = get_s3_object_last_modified("forecasting-team-data", "archive_timestamped.parquet"), command = { create_nhsn_data_archive(disease = "nhsn_covid") } ), - tar_target( + tar_change( current_nssp_archive, + change = get_covidcast_signal_last_update("nssp", "pct_ed_visits_covid", "state"), command = { up_to_date_nssp_state_archive("covid") - }, - cue = tar_cue(mode = "always") + } + ) +) + + +# ================================ FORECAST TARGETS ================================ +forecast_targets <- tar_map( + values = tidyr::expand_grid( + tibble(forecaster_fn_names = forecaster_fn_names_), + tibble( + forecast_date_int = forecast_dates, + forecast_generation_date_int = forecast_generation_dates, + forecast_date_chr = as.character(forecast_dates) + ) ), - tar_map( - values = tibble( - forecast_date_int = forecast_date, - forecast_generation_date_int = forecast_generation_date, - forecast_date_chr = as.character(forecast_date_int) - ), - names = "forecast_date_chr", - tar_target( - name = geo_forecasters_weights, - command = { - 
geo_forecasters_weights <- parse_prod_weights(here::here("covid_geo_exclusions.csv"), forecast_date_int, forecaster_fns) - if (nrow(geo_forecasters_weights %>% filter(forecast_date == as.Date(forecast_date_int))) == 0) { - cli_abort("there are no weights for the forecast date {forecast_date}") - } - geo_forecasters_weights - }, - ), - tar_target( - name = geo_exclusions, - command = { - exclude_geos(geo_forecasters_weights) - } - ), - tar_target( - name = forecast_res, - command = { - if (as.Date(forecast_generation_date_int) < Sys.Date()) { - train_data <- nhsn_archive_data %>% - epix_as_of(min(forecast_date, current_nssp_archive$versions_end)) %>% - add_season_info() %>% - mutate( - geo_value = ifelse(geo_value == "usa", "us", geo_value), - time_value = time_value - 3 - ) - } else { - train_data <- - nhsn_latest_data %>% - data_substitutions(disease = "covid", forecast_generation_date) %>% - as_epi_df(as_of = as.Date(forecast_date_int)) %>% - mutate(time_value = time_value - 3) - } - nssp <- current_nssp_archive %>% - epix_as_of(min(forecast_date, current_nssp_archive$versions_end)) %>% - mutate(time_value = time_value) - attributes(train_data)$metadata$as_of <- as.Date(forecast_date_int) - train_data %>% - forecaster_fns[[forecasters]](ahead = aheads, extra_data = nssp) %>% + names = c("forecaster_fn_names", "forecast_date_chr"), + tar_target( + name = forecast_res, + command = { + if (as.Date(forecast_generation_date_int) < Sys.Date()) { + train_data <- nhsn_archive_data %>% + epix_as_of(min(as.Date(forecast_date_int), nhsn_archive_data$versions_end)) %>% + add_season_info() %>% mutate( - forecaster = names(forecaster_fns[forecasters]), - geo_value = as.factor(geo_value) + geo_value = ifelse(geo_value == "usa", "us", geo_value), + time_value = time_value - 3 ) - }, - pattern = cross(aheads, forecasters) - ), - tar_target( - name = ensemble_res, - command = { - forecast_res %>% - ensemble_linear_climate( - aheads, - other_weights = geo_forecasters_weights, - max_climate_ahead_weight = 0.6, - max_climate_quantile_weight = 0.6 - ) %>% - filter(geo_value %nin% geo_exclusions) %>% - ungroup() %>% - sort_by_quantile() - }, - ), - tar_target( - name = ensemble_mixture_res, - command = { - forecast_res %>% - ensemble_linear_climate( - aheads, - other_weights = geo_forecasters_weights, - max_climate_ahead_weight = 0.6, - max_climate_quantile_weight = 0.6 - ) %>% - filter(geo_value %nin% geo_exclusions) %>% - ungroup() %>% - bind_rows(forecast_res %>% - filter(forecaster %in% c("windowed_seasonal_extra_sources")) %>% - filter(forecast_date < target_end_date)) %>% # don't use for neg aheads - group_by(geo_value, forecast_date, target_end_date, quantile) %>% - summarize(value = mean(value, na.rm = TRUE), .groups = "drop") %>% - sort_by_quantile() - }, - ), - tar_target( - name = forecasts_and_ensembles, - command = { - bind_rows( - forecast_res, - ensemble_res %>% mutate(forecaster = "ensemble"), - ensemble_mixture_res %>% mutate(forecaster = "ensemble_mix"), - # TODO: Maybe later, match with flu_hosp_prod - # ensemble_mixture_res_2 %>% mutate(forecaster = "ensemble_mix_2"), - # combo_ensemble_mixture_res %>% mutate(forecaster = "combo_ensemble_mix") + } else { + covid_data_substitutions + train_data <- + nhsn_latest_data %>% + data_substitutions(disease = "covid", as.Date(forecast_generation_date_int)) %>% + as_epi_df(as_of = as.Date(forecast_date_int)) %>% + mutate(time_value = time_value - 3) + } + nssp <- current_nssp_archive %>% + epix_as_of(min(as.Date(forecast_date_int), 
current_nssp_archive$versions_end)) %>% + mutate(time_value = time_value) + attributes(train_data)$metadata$as_of <- as.Date(forecast_date_int) + train_data %>% + forecaster_fns[[forecaster_fn_names]](ahead = aheads, extra_data = nssp) %>% + mutate( + forecaster = forecaster_fn_names, + geo_value = as.factor(geo_value) ) + }, + pattern = map(aheads) + ) +) +combined_forecasts <- tar_combine( + name = forecast_full, + forecast_targets[["forecast_res"]], + command = { + dplyr::bind_rows(!!!.x) + } +) + +# ================================ ENSEMBLE TARGETS ================================ +ensemble_targets <- tar_map( + values = tibble( + forecast_date_int = forecast_dates, + forecast_generation_date_int = forecast_generation_dates, + forecast_date_chr = as.character(forecast_dates) + ), + names = "forecast_date_chr", + tar_target( + name = forecast_full_filtered, + command = { + forecast_full %>% + filter(forecast_date == as.Date(forecast_date_int)) + } + ), + tar_target( + name = geo_forecasters_weights, + command = { + geo_forecasters_weights <- parse_prod_weights(covid_geo_exclusions, forecast_date_int, forecaster_fn_names_) + if (nrow(geo_forecasters_weights %>% filter(forecast_date == as.Date(forecast_date_int))) == 0) { + cli_abort("there are no weights for the forecast date {forecast_date}") } - ), - tar_target( - name = make_submission_csv, - command = { + geo_forecasters_weights + }, + ), + tar_target( + name = geo_exclusions, + command = { + exclude_geos(geo_forecasters_weights) + } + ), + tar_target( + name = ensemble_lin_clim, + command = { + forecast_full_filtered %>% + ensemble_linear_climate( + aheads, + other_weights = geo_forecasters_weights, + max_climate_ahead_weight = 0.6, + max_climate_quantile_weight = 0.6 + ) %>% + filter(geo_value %nin% geo_exclusions) %>% + ungroup() %>% + sort_by_quantile() + }, + ), + tar_target( + name = ens_ar_only, + command = { + forecast_full_filtered %>% + filter(forecaster %in% c("windowed_seasonal", "windowed_seasonal_extra_sources")) %>% + group_by(geo_value, forecast_date, target_end_date, quantile) %>% + summarize(value = mean(value, na.rm = TRUE), .groups = "drop") %>% + sort_by_quantile() + } + ), + tar_target( + name = ensemble_mixture_res, + command = { + all_ensembled <- + ensemble_lin_clim %>% + mutate(forecaster = "linear_climate") %>% + bind_rows( + forecast_full_filtered %>% + filter(forecaster %in% c("windowed_seasonal", "windowed_seasonal_extra_sources")) %>% + filter(forecast_date < target_end_date) # don't use for neg aheads + ) %>% + ensemble_weighted(geo_forecasters_weights) + }, + ), + tar_target( + name = forecasts_and_ensembles, + command = { + bind_rows( + forecast_full_filtered, + ensemble_lin_clim %>% mutate(forecaster = "linear_climate"), + ensemble_mixture_res %>% mutate(forecaster = "ensemble_mix"), + ens_ar_only %>% mutate(forecaster = "ens_ar_only") + ) + } + ), + tar_target( + name = make_submission_csv, + command = { + if (!backtest_mode && submission_directory != "cache") { forecast_reference_date <- get_forecast_reference_date(forecast_date_int) ensemble_mixture_res %>% format_flusight(disease = "covid") %>% write_submission_file(forecast_reference_date, file.path(submission_directory, "model-output/CMU-TimeSeries")) - }, - ), - tar_target( - name = make_climate_submission_csv, - command = { - if (submit_climatological) { - forecasts <- forecast_res - forecasts %>% - filter(forecaster %in% c("climate_base", "climate_geo_agged")) %>% - group_by(geo_value, target_end_date, quantile) %>% - 
summarize(forecast_date = first(forecast_date), value = mean(value, na.rm = TRUE), .groups = "drop") %>% - ungroup() %>% - format_flusight(disease = "covid") %>% - filter(location %nin% c("60", "66", "78")) %>% - write_submission_file( - get_forecast_reference_date(forecast_date_int), - submission_directory = file.path(submission_directory, "model-output/CMU-climate_baseline"), - file_name = "CMU-climate_baseline" - ) - } - }, - ), - tar_target( - name = validate_result, - command = { - make_submission_csv - # only validate if we're saving the result to a hub - if (submission_directory != "cache") { - validation <- validate_submission( - submission_directory, - file_path = sprintf("CMU-TimeSeries/%s-CMU-TimeSeries.csv", get_forecast_reference_date(forecast_date_int)) - ) - } else { - validation <- "not validating when there is no hub (set submission_directory)" - } - validation - }, - ), - tar_target( - name = validate_climate_result, - command = { - make_climate_submission_csv - # only validate if we're saving the result to a hub - if (submission_directory != "cache" && submit_climatological) { - validation <- validate_submission( - submission_directory, - file_path = sprintf("CMU-climate_baseline/%s-CMU-climate_baseline.csv", get_forecast_reference_date(forecast_date_int)) + } else { + cli_alert_info("Not making submission csv because we're in backtest mode or submission directory is cache") + } + }, + ), + tar_target( + name = make_climate_submission_csv, + command = { + if (!backtest_mode && submission_directory != "cache") { + forecast_full_filtered %>% + filter(forecaster %in% c("climate_base", "climate_geo_agged")) %>% + group_by(geo_value, target_end_date, quantile) %>% + summarize(forecast_date = as.Date(forecast_date_int), value = mean(value, na.rm = TRUE), .groups = "drop") %>% + ungroup() %>% + format_flusight(disease = "covid") %>% + filter(location %nin% c("60", "66", "78")) %>% + write_submission_file( + get_forecast_reference_date(forecast_date_int), + submission_directory = file.path(submission_directory, "model-output/CMU-climate_baseline"), + file_name = "CMU-climate_baseline" ) - } else { - validation <- "not validating when there is no hub (set submission_directory)" - } - validation - }, - ), - tar_target( - name = truth_data, - command = { - nssp_state <- pub_covidcast( - source = "nssp", - signal = "pct_ed_visits_covid", - time_type = "week", - geo_type = "state", - geo_values = "*", - fetch_args = epidatr::fetch_args_list(timeout_seconds = 400) - ) %>% - select(geo_value, source, target_end_date = time_value, value) %>% - filter(target_end_date > truth_data_date, geo_value %nin% insufficient_data_geos) %>% - mutate(target_end_date = target_end_date + 6) - truth_data <- nhsn_latest_data %>% - mutate(target_end_date = time_value) %>% - filter(time_value > truth_data_date) %>% - mutate(source = "nhsn") %>% - select(geo_value, target_end_date, source, value) - nssp_renormalized <- + } else { + cli_alert_info("Not making climate submission csv because we're in backtest mode or submission directory is cache") + } + }, + ), + tar_target( + name = validate_result, + command = { + make_submission_csv + # only validate if we're saving the result to a hub + if (!backtest_mode && submission_directory != "cache") { + validation <- validate_submission( + submission_directory, + file_path = sprintf("CMU-TimeSeries/%s-CMU-TimeSeries.csv", get_forecast_reference_date(forecast_date_int)) + ) + } else { + validation <- "not validating when there is no hub (set submission_directory)" 
+ } + validation + }, + ), + tar_target( + name = validate_climate_result, + command = { + make_climate_submission_csv + # only validate if we're saving the result to a hub + if (!backtest_mode && submission_directory != "cache") { + validation <- validate_submission( + submission_directory, + file_path = sprintf("CMU-climate_baseline/%s-CMU-climate_baseline.csv", get_forecast_reference_date(forecast_date_int)) + ) + } else { + validation <- "not validating when there is no hub (set submission_directory)" + } + validation + }, + ), + tar_target( + name = truth_data, + command = { + nssp_state <- retry_fn( + max_attempts = 20, + wait_seconds = 2, + fn = pub_covidcast, + source = "nssp", + signal = "pct_ed_visits_covid", + time_type = "week", + geo_type = "state", + geo_values = "*", + fetch_args = epidatr::fetch_args_list(timeout_seconds = 400) + ) %>% + select(geo_value, source, target_end_date = time_value, value) %>% + filter(target_end_date > truth_data_date, geo_value %nin% insufficient_data_geos) %>% + mutate(target_end_date = target_end_date + 6) + truth_data <- nhsn_latest_data %>% + mutate(target_end_date = time_value) %>% + filter(time_value > truth_data_date) %>% + mutate(source = "nhsn") %>% + select(geo_value, target_end_date, source, value) + nssp_renormalized <- + nssp_state %>% + left_join( nssp_state %>% - left_join( - nssp_state %>% - rename(nssp = value) %>% - full_join( - truth_data %>% - select(geo_value, target_end_date, value), - by = join_by(geo_value, target_end_date) - ) %>% - group_by(geo_value) %>% - summarise(rel_max_value = max(value, na.rm = TRUE) / max(nssp, na.rm = TRUE)), - by = join_by(geo_value) - ) %>% - mutate(value = value * rel_max_value) %>% - select(-rel_max_value) - truth_data %>% bind_rows(nssp_renormalized) - }, - ), - tar_target( - notebook, - command = { + rename(nssp = value) %>% + full_join( + truth_data %>% + select(geo_value, target_end_date, value), + by = join_by(geo_value, target_end_date) + ) %>% + group_by(geo_value) %>% + summarise(rel_max_value = max(value, na.rm = TRUE) / max(nssp, na.rm = TRUE)), + by = join_by(geo_value) + ) %>% + mutate(value = value * rel_max_value) %>% + select(-rel_max_value) + truth_data %>% bind_rows(nssp_renormalized) + }, + ), + tar_target( + notebook, + command = { + # Only render the report if there is only one forecast date + # i.e. 
we're running this in prod on schedule + if (!backtest_mode) { if (!dir.exists(here::here("reports"))) dir.create(here::here("reports")) rmarkdown::render( - "scripts/reports/forecast_report.Rmd", + forecast_report_rmd, output_file = here::here( "reports", sprintf("%s_covid_prod_on_%s.html", as.Date(forecast_date_int), as.Date(Sys.Date())) @@ -331,8 +408,50 @@ rlang::list2( truth_data = truth_data ) ) + } + } + ) +) + + +# ================================ SCORE TARGETS ================================ +if (backtest_mode) { + score_targets <- list2( + tar_target( + external_forecasts, + command = { + get_external_forecasts("covid") + } + ), + tar_combine( + name = joined_forecasts_and_ensembles, + ensemble_targets[["forecasts_and_ensembles"]], + command = { + dplyr::bind_rows(!!!.x, external_forecasts) + } + ), + tar_target( + name = scores, + command = { + score_forecasts(nhsn_latest_data, joined_forecasts_and_ensembles) + } + ), + tar_target( + name = score_plot, + command = { + render_score_plot(score_report_rmd, scores, forecast_dates, "covid") }, - cue = tar_cue(mode = "always") + cue = tar_cue("always") ) - ), + ) +} else { + score_targets <- list() +} + +list2( + parameters_and_date_targets, + forecast_targets, + ensemble_targets, + combined_forecasts, + score_targets ) diff --git a/scripts/flu_hosp_prod.R b/scripts/flu_hosp_prod.R index ff674bbd..133e88f0 100644 --- a/scripts/flu_hosp_prod.R +++ b/scripts/flu_hosp_prod.R @@ -2,7 +2,6 @@ source("scripts/targets-common.R") source("scripts/targets-exploration-common.R") -submit_climatological <- TRUE submission_directory <- Sys.getenv("FLU_SUBMISSION_DIRECTORY", "cache") insufficient_data_geos <- c("as", "mp", "vi", "gu") excluded_geos <- c("as", "gu", "mh") @@ -15,16 +14,29 @@ end_date <- Sys.Date() # today, which is a Wednesday. Sometimes, if we're doing a delayed forecast, # it's a Thursday. It's used for stamping the data and for determining the # appropriate as_of when creating the forecast. -forecast_generation_date <- Sys.Date() +forecast_generation_dates <- Sys.Date() # Usually, the forecast_date is the same as the generation date, but you can # override this. It should be a Wednesday. -forecast_date <- round_date(forecast_generation_date, "weeks", week_start = 3) +forecast_dates <- round_date(forecast_generation_dates, "weeks", week_start = 3) # If doing backfill, you can set the forecast_date to a sequence of dates. -# forecast_date <- seq.Date(as.Date("2024-11-20"), Sys.Date(), by = 7L) +# forecast_dates <- seq.Date(as.Date("2024-11-20"), Sys.Date(), by = 7L) # forecast_generation_date needs to follow suit, but it's more complicated # because sometimes we forecast on Thursday. -# forecast_generation_date <- c(as.Date(c("2024-11-21", "2024-11-27", "2024-12-04", "2024-12-11", "2024-12-18", "2024-12-26", "2025-01-02")), seq.Date(as.Date("2025-01-08"), Sys.Date(), by = 7L)) +# forecast_generation_dates <- c(as.Date(c("2024-11-22", "2024-11-27", "2024-12-04", "2024-12-11", "2024-12-18", "2024-12-26", "2025-01-02")), seq.Date(as.Date("2025-01-08"), Sys.Date(), by = 7L)) +# Whether we're running in backtest mode. +# If TRUE, we don't run the report notebook, which is (a) slow and (b) should be +# preserved as an ASOF snapshot of our production results for that week. +# If TRUE, we run a scoring notebook, which scores the historical forecasts +# against the truth data and compares them to the ensemble. +# If FALSE, we run the weekly report notebook. 
+backtest_mode <- length(forecast_dates) > 1 + +# Select first two for debugging +## forecast_generation_dates <- forecast_generation_dates[1:2] +## forecast_dates <- forecast_dates[1:2] + +# needed for windowed_seasonal very_latent_locations <- list(list( c("source"), c("flusurv", "ILI+") @@ -84,11 +96,12 @@ forecaster_fns <- list2( lags = list(c(0, 7), c(0, 7)), keys_to_ignore = very_latent_locations ) %>% + select(-source) %>% mutate(target_end_date = target_end_date + 3) fcst } ) -indices <- seq_along(forecaster_fns) +forecaster_fn_names_ <- names(forecaster_fns) # This is needed to build the data archive ref_time_values_ <- seq.Date(as.Date("2023-10-04"), as.Date("2024-04-24"), by = 7L) @@ -98,19 +111,37 @@ smooth_last_n <- function(x, n = 1, k = 2) { x } -rlang::list2( + +# ================================ PARAMETERS AND DATE TARGETS ================================ +parameters_and_date_targets <- rlang::list2( rlang::list2( tar_target(aheads, command = -1:3), tar_target(forecasters, command = indices), tar_target(name = ref_time_values, command = ref_time_values_), + tar_file( + forecast_report_rmd, + command = "scripts/reports/forecast_report.Rmd" + ), + tar_file( + score_report_rmd, + command = "scripts/reports/score_report.Rmd" + ), + tar_file( + flu_geo_exclusions, + command = "flu_geo_exclusions.csv" + ), + tar_file( + flu_data_substitutions, + command = "flu_data_substitutions.csv" + ) ), make_historical_flu_data_targets(), - tar_target( + tar_change( current_nssp_archive, + change = get_covidcast_signal_last_update("nssp", "pct_ed_visits_influenza", "state"), command = { up_to_date_nssp_state_archive("influenza") - }, - cue = tar_cue(mode = "always") + } ), tar_target( joined_latest_extra_data, @@ -124,8 +155,9 @@ rlang::list2( filter(source != "nhsn") } ), - tar_target( + tar_change( nhsn_latest_data, + change = get_socrata_updated_at("https://data.cdc.gov/api/views/mpgq-jmmr"), command = { if (wday(Sys.Date()) < 6 & wday(Sys.Date()) > 3) { # download from the preliminary data source from Wednesday to Friday @@ -133,6 +165,7 @@ rlang::list2( } else { most_recent_result <- readr::read_csv("https://data.cdc.gov/resource/ua7e-t2fy.csv?$limit=20000&$select=weekendingdate,jurisdiction,totalconfc19newadm,totalconfflunewadm") } + flu_data_substitutions most_recent_result <- most_recent_result %>% process_nhsn_data() %>% @@ -146,299 +179,278 @@ rlang::list2( ) %>% filter(version == max(version)) %>% select(-version) %>% - data_substitutions(disease = "flu", forecast_generation_date) %>% + data_substitutions(disease = "flu", last(forecast_generation_dates)) %>% as_epi_df(other_keys = "source", as_of = Sys.Date()) most_recent_result }, - description = "Download the result, and update the file only if it's actually different", - priority = 1, - cue = tar_cue(mode = "always") ), - tar_map( - # Because targets relies on R metaprogramming, it loses the Date class. 
- values = tibble( - forecast_date_int = forecast_date, - forecast_generation_date_int = forecast_generation_date, - forecast_date_chr = as.character(forecast_date_int) - ), - names = "forecast_date_chr", - tar_change( - name = geo_forecasters_weights, - command = { - geo_forecasters_weights <- parse_prod_weights(here::here("flu_geo_exclusions.csv"), forecast_date_int, forecaster_fns) - if (nrow(geo_forecasters_weights %>% filter(forecast_date == as.Date(forecast_date_int))) == 0) { - cli_abort("there are no weights for the forecast date {forecast_date}") - } - geo_forecasters_weights - }, - change = here::here("flu_geo_exclusions.csv") - ), - tar_target( - name = geo_exclusions, - command = exclude_geos(geo_forecasters_weights) - ), - tar_target( - full_data, - command = { - if (as.Date(forecast_generation_date_int) < Sys.Date()) { - train_data <- nhsn_archive_data %>% - epix_as_of(as.Date(forecast_generation_date_int)) %>% - add_season_info() %>% - mutate( - source = "nhsn", - geo_value = ifelse(geo_value == "usa", "us", geo_value), - time_value = time_value - 3 - ) - } else { - train_data <- nhsn_latest_data - } - full_data <- train_data %>% - bind_rows(joined_latest_extra_data) - attributes(full_data)$metadata$other_keys <- "source" - attributes(full_data)$metadata$as_of <- as.Date(forecast_date_int) - full_data - } - ), - tar_target( - forecast_res, - command = { - forecast_date <- as.Date(forecast_date_int) - nssp <- current_nssp_archive %>% epix_as_of(min(forecast_date, current_nssp_archive$versions_end)) - full_data %>% - forecaster_fns[[forecasters]](ahead = aheads, extra_data = nssp) %>% - mutate( - forecaster = names(forecaster_fns[forecasters]), - geo_value = as.factor(geo_value) - ) - }, - pattern = cross(aheads, forecasters) - ), - # A hack to model our uncertainty in the data. We smooth the last few points - # to make the forecast more stable. - tar_target( - forecast_res_modified, - command = { - as_of <- attributes(full_data)$metadata$as_of - other_keys <- attributes(full_data)$metadata$other_keys - forecast_date <- as.Date(forecast_date_int) - nssp <- current_nssp_archive %>% epix_as_of(min(forecast_date, current_nssp_archive$versions_end)) + tar_change( + name = nhsn_archive_data, + change = get_s3_object_last_modified("forecasting-team-data", "archive_timestamped.parquet"), + command = { + create_nhsn_data_archive(disease = "nhsn_flu") + } + ) +) - # Smooth last few points for every geo. - # TODO: This is a hack, we can try some more sophisticated - # smoothing/nowcasting here. - modified_full_data <- full_data %>% - filter(source == "nhsn") %>% - arrange(geo_value, time_value) %>% - group_by(geo_value) %>% - mutate(value = smooth_last_n(value)) %>% - ungroup() - # Add back in the non-nhsn data. 
- modified_full_data <- modified_full_data %>% - bind_rows(full_data %>% filter(source != "nhsn")) - attributes(modified_full_data)$metadata$as_of <- as_of - attributes(modified_full_data)$metadata$other_keys <- other_keys - modified_full_data %>% - forecaster_fns[[forecasters]](ahead = aheads, extra_data = nssp) %>% +# ================================ FORECAST TARGETS ================================ +forecast_targets <- tar_map( + values = tidyr::expand_grid( + tibble(forecaster_fn_names = forecaster_fn_names_), + tibble( + forecast_date_int = forecast_dates, + forecast_generation_date_int = forecast_generation_dates, + forecast_date_chr = as.character(forecast_dates) + ) + ), + names = c("forecaster_fn_names", "forecast_date_chr"), + tar_target( + full_data, + command = { + if (as.Date(forecast_generation_date_int) < Sys.Date()) { + train_data <- nhsn_archive_data %>% + epix_as_of(min(as.Date(forecast_generation_date_int), nhsn_archive_data$versions_end)) %>% + add_season_info() %>% mutate( - forecaster = names(forecaster_fns[forecasters]), - geo_value = as.factor(geo_value) + source = "nhsn", + geo_value = ifelse(geo_value == "usa", "us", geo_value), + time_value = time_value - 3 ) - }, - pattern = cross(aheads, forecasters), - cue = tar_cue(mode = "always") - ), - tar_target( - name = climate_linear, - command = { - forecast_res %>% - # Apply the ahead-by-quantile weighting scheme - ensemble_linear_climate(aheads, other_weights = geo_forecasters_weights) %>% - filter(geo_value %nin% geo_exclusions) %>% - ungroup() %>% - sort_by_quantile() - } - ), - tar_target( - name = ens_climate_linear_window_season, - command = { - climate_linear %>% - # Ensemble with windowed_seasonal and windowed_seasonal_extra_sources - bind_rows(forecast_res %>% filter(forecaster %in% c("windowed_seasonal", "windowed_seasonal_extra_sources"))) %>% - group_by(geo_value, forecast_date, target_end_date, quantile) %>% - summarize(value = mean(value, na.rm = TRUE), .groups = "drop") %>% - sort_by_quantile() - } - ), - tar_target( - name = ens_ar_only, - command = { - forecast_res %>% - filter(forecaster %in% c("windowed_seasonal", "windowed_seasonal_extra_sources")) %>% - group_by(geo_value, forecast_date, target_end_date, quantile) %>% - summarize(value = mean(value, na.rm = TRUE), .groups = "drop") %>% - sort_by_quantile() - } - ), - tar_target( - name = climate_linear_modified, - command = { - forecast_res_modified %>% - # Apply the ahead-by-quantile weighting scheme - ensemble_linear_climate(aheads, other_weights = geo_forecasters_weights) %>% - filter(geo_value %nin% geo_exclusions) %>% - ungroup() %>% - sort_by_quantile() - } - ), - tar_target( - name = ens_climate_linear_window_season_modified, - command = { - climate_linear_modified %>% - # Ensemble with windowed_seasonal - bind_rows(forecast_res_modified %>% filter(forecaster %in% c("windowed_seasonal", "windowed_seasonal_extra_sources"))) %>% - group_by(geo_value, forecast_date, target_end_date, quantile) %>% - summarize(value = mean(value, na.rm = TRUE), .groups = "drop") %>% - sort_by_quantile() - } - ), - tar_target( - name = combo_ens_climate_linear_window_season, - command = { - inner_join( - ens_climate_linear_window_season, ens_climate_linear_window_season_modified, - by = join_by(geo_value, forecast_date, target_end_date, quantile) - ) %>% - rowwise() %>% - mutate(value = ifelse(quantile > 0.5, max(value.x, value.y), NA)) %>% - mutate(value = ifelse(quantile < 0.5, min(value.x, value.y), value)) %>% - mutate(value = ifelse(quantile == 0.5, 
(value.x + value.y) / 2, value)) %>% - select(geo_value, forecast_date, target_end_date, quantile, value) %>% - ungroup() + } else { + train_data <- nhsn_latest_data } - ), - tar_target( - name = forecasts_and_ensembles, - command = { - bind_rows( - forecast_res, - climate_linear %>% mutate(forecaster = "climate_linear"), - ens_ar_only %>% mutate(forecaster = "ens_ar_only"), - ens_climate_linear_window_season %>% mutate(forecaster = "ensemble_linclim_windowed_seasonal"), - combo_ens_climate_linear_window_season %>% mutate(forecaster = "ensemble_combo") + full_data <- train_data %>% + bind_rows(joined_latest_extra_data) + attributes(full_data)$metadata$other_keys <- "source" + attributes(full_data)$metadata$as_of <- as.Date(forecast_date_int) + full_data + } + ), + tar_target( + name = forecast_res, + command = { + nssp <- current_nssp_archive %>% + epix_as_of(min(as.Date(forecast_date_int), current_nssp_archive$versions_end)) %>% + mutate(time_value = time_value) + full_data %>% + forecaster_fns[[forecaster_fn_names]](ahead = aheads, extra_data = nssp) %>% + mutate( + forecaster = forecaster_fn_names, + geo_value = as.factor(geo_value) ) + }, + pattern = map(aheads) + ) +) + +combined_forecasts <- tar_combine( + name = forecast_full, + forecast_targets[["forecast_res"]], + command = { + dplyr::bind_rows(!!!.x) + } +) +# ================================ ENSEMBLE TARGETS ================================ +ensemble_targets <- tar_map( + values = tibble( + forecast_date_int = forecast_dates, + forecast_generation_date_int = forecast_generation_dates, + forecast_date_chr = as.character(forecast_dates) + ), + names = "forecast_date_chr", + tar_target( + name = forecast_full_filtered, + command = { + forecast_full %>% + filter(forecast_date == as.Date(forecast_date_int)) + } + ), + tar_target( + name = geo_forecasters_weights, + command = { + geo_forecasters_weights <- parse_prod_weights(flu_geo_exclusions, forecast_date_int, forecaster_fn_names_) + if (nrow(geo_forecasters_weights %>% filter(forecast_date == as.Date(forecast_date_int))) == 0) { + cli_abort("there are no weights for the forecast date {forecast_date}") } - ), - tar_target( - name = make_submission_csv, - command = { - combo_ens_climate_linear_window_season %>% + geo_forecasters_weights + }, + ), + tar_target( + name = geo_exclusions, + command = { + exclude_geos(geo_forecasters_weights) + } + ), + tar_target( + name = climate_linear, + command = { + forecast_full_filtered %>% + # Apply the ahead-by-quantile weighting scheme + ensemble_linear_climate(aheads, other_weights = geo_forecasters_weights) %>% + filter(geo_value %nin% geo_exclusions) %>% + ungroup() %>% + sort_by_quantile() + } + ), + tar_target( + name = ens_climate_linear_window_season, + command = { + climate_linear %>% + mutate(forecaster = "climate_linear") %>% + # Ensemble with windowed_seasonal and windowed_seasonal_extra_sources + bind_rows( + forecast_full_filtered %>% + filter(forecaster %in% c("windowed_seasonal", "windowed_seasonal_extra_sources")) + ) %>% + ensemble_weighted(geo_forecasters_weights) + } + ), + tar_target( + name = ens_ar_only, + command = { + forecast_full_filtered %>% + filter(forecaster %in% c("windowed_seasonal", "windowed_seasonal_extra_sources")) %>% + group_by(geo_value, forecast_date, target_end_date, quantile) %>% + summarize(value = mean(value, na.rm = TRUE), .groups = "drop") %>% + sort_by_quantile() + } + ), + tar_target( + name = forecasts_and_ensembles, + command = { + bind_rows( + forecast_full_filtered, + climate_linear %>% 
mutate(forecaster = "climate_linear"), + ens_ar_only %>% mutate(forecaster = "ens_ar_only"), + ens_climate_linear_window_season %>% mutate(forecaster = "ensemble_linclim_windowed_seasonal") + ) + } + ), + tar_target( + name = make_submission_csv, + command = { + if (!backtest_mode && submission_directory != "cache") { + ens_climate_linear_window_season %>% format_flusight(disease = "flu") %>% write_submission_file( get_forecast_reference_date(forecast_date_int), file.path(submission_directory, "model-output/CMU-TimeSeries") ) + } else { + cli_alert_info("Not making submission csv because we're in backtest mode or submission directory is cache") } - ), - tar_target( - name = make_climate_submission_csv, - command = { - if (submit_climatological) { - forecasts <- forecast_res - forecasts %>% - filter(forecaster %in% c("climate_base", "climate_geo_agged")) %>% - group_by(geo_value, target_end_date, quantile) %>% - summarize(forecast_date = first(forecast_date), value = mean(value, na.rm = TRUE), .groups = "drop") %>% - ungroup() %>% - filter(!(geo_value %in% excluded_geos)) %>% - format_flusight(disease = "flu") %>% - filter(location %nin% c("60", "66", "78")) %>% - write_submission_file( - get_forecast_reference_date(forecast_date_int), - submission_directory = file.path(submission_directory, "model-output/CMU-climate_baseline"), - file_name = "CMU-climate_baseline" - ) - } - }, - priority = 0.99 - ), - tar_target( - name = validate_result, - command = { - make_submission_csv - # only validate if we're saving the result to a hub - if (submission_directory != "cache") { - validation <- validate_submission( - submission_directory, - file_path = sprintf("CMU-TimeSeries/%s-CMU-TimeSeries.csv", get_forecast_reference_date(forecast_date_int)) - ) - } else { - validation <- "not validating when there is no hub (set submission_directory)" - } - validation - }, - ), - tar_target( - name = validate_climate_result, - command = { - make_climate_submission_csv - # only validate if we're saving the result to a hub - if (submission_directory != "cache" && submit_climatological) { - validation <- validate_submission( - submission_directory, - file_path = sprintf("CMU-climate_baseline/%s-CMU-climate_baseline.csv", get_forecast_reference_date(forecast_date_int)) + } + ), + tar_target( + name = make_climate_submission_csv, + command = { + if (!backtest_mode && submission_directory != "cache") { + forecast_full_filtered %>% + filter(forecaster %in% c("climate_base", "climate_geo_agged")) %>% + group_by(geo_value, target_end_date, quantile) %>% + summarize(forecast_date = as.Date(forecast_date_int), value = mean(value, na.rm = TRUE), .groups = "drop") %>% + ungroup() %>% + filter(!(geo_value %in% excluded_geos)) %>% + format_flusight(disease = "flu") %>% + filter(location %nin% c("60", "66", "78")) %>% + write_submission_file( + get_forecast_reference_date(forecast_date_int), + submission_directory = file.path(submission_directory, "model-output/CMU-climate_baseline"), + file_name = "CMU-climate_baseline" ) - } else { - validation <- "not validating when there is no hub (set submission_directory)" - } - validation - }, - ), - tar_target( - name = truth_data, - command = { - date <- forecast_generation_date_int - nssp_state <- - current_nssp_archive %>% - epix_as_of(min(as.Date(date), current_nssp_archive$versions_end)) %>% - rename(target_end_date = time_value) %>% - filter(target_end_date > truth_data_date, geo_value %nin% insufficient_data_geos) %>% - mutate(target_end_date = target_end_date + 6) - if 
(as.Date(forecast_generation_date_int) < Sys.Date()) { - truth_dat <- nhsn_archive_data %>% epix_as_of(as.Date(forecast_generation_date_int)) - } else { - truth_dat <- nhsn_latest_data - } - truth_dat <- truth_dat %>% - mutate(target_end_date = time_value) %>% - filter(time_value > truth_data_date) %>% - mutate(source = "nhsn") %>% - select(geo_value, target_end_date, source, value) - nssp_renormalized <- + } else { + cli_alert_info("Not making climate submission csv because we're in backtest mode or submission directory is cache") + } + }, + priority = 0.99 + ), + tar_target( + name = validate_result, + command = { + make_submission_csv + # only validate if we're saving the result to a hub + if (!backtest_mode && submission_directory != "cache") { + validation <- validate_submission( + submission_directory, + file_path = sprintf("CMU-TimeSeries/%s-CMU-TimeSeries.csv", get_forecast_reference_date(forecast_date_int)) + ) + } else { + validation <- "not validating when there is no hub (set submission_directory)" + } + validation + }, + ), + tar_target( + name = validate_climate_result, + command = { + make_climate_submission_csv + # only validate if we're saving the result to a hub + if (!backtest_mode && submission_directory != "cache") { + validation <- validate_submission( + submission_directory, + file_path = sprintf("CMU-climate_baseline/%s-CMU-climate_baseline.csv", get_forecast_reference_date(forecast_date_int)) + ) + } else { + validation <- "not validating when there is no hub (set submission_directory)" + } + validation + }, + ), + tar_target( + name = truth_data, + command = { + date <- forecast_generation_date_int + nssp_state <- retry_fn( + max_attempts = 20, + wait_seconds = 2, + fn = pub_covidcast, + source = "nssp", + signal = "pct_ed_visits_influenza", + time_type = "week", + geo_type = "state", + geo_values = "*", + fetch_args = epidatr::fetch_args_list(timeout_seconds = 400) + ) %>% + select(geo_value, source, target_end_date = time_value, value) %>% + filter(target_end_date > truth_data_date, geo_value %nin% insufficient_data_geos) %>% + mutate(target_end_date = target_end_date + 6) + if (as.Date(forecast_generation_date_int) < Sys.Date()) { + truth_dat <- nhsn_archive_data %>% epix_as_of(min(as.Date(forecast_generation_date_int), nhsn_archive_data$versions_end)) + } else { + truth_dat <- nhsn_latest_data + } + truth_dat <- truth_dat %>% + mutate(target_end_date = time_value) %>% + filter(time_value > truth_data_date) %>% + mutate(source = "nhsn") %>% + select(geo_value, target_end_date, source, value) + nssp_renormalized <- + nssp_state %>% + left_join( nssp_state %>% - rename(value = nssp) %>% - left_join( - nssp_state %>% - full_join( - truth_dat %>% - select(geo_value, target_end_date, value), - by = c("geo_value", "target_end_date") - ) %>% - group_by(geo_value) %>% - summarise(rel_max_value = max(value, na.rm = TRUE) / max(nssp, na.rm = TRUE)), - by = "geo_value" - ) %>% - mutate(value = value * rel_max_value) %>% - select(-rel_max_value) - truth_dat %>% bind_rows(nssp_renormalized) - }, - ), - tar_target( - notebook, - command = { + rename(nssp = value) %>% + full_join( + truth_dat %>% + select(geo_value, target_end_date, value), + by = c("geo_value", "target_end_date") + ) %>% + group_by(geo_value) %>% + summarise(rel_max_value = max(value, na.rm = TRUE) / max(nssp, na.rm = TRUE)), + by = "geo_value" + ) %>% + mutate(value = value * rel_max_value) %>% + select(-rel_max_value) + truth_dat %>% bind_rows(nssp_renormalized) + }, + ), + tar_target( + notebook, + 
command = { + if (!backtest_mode) { if (!dir.exists(here::here("reports"))) dir.create(here::here("reports")) rmarkdown::render( - "scripts/reports/forecast_report.Rmd", + forecast_report_rmd, output_file = here::here( "reports", sprintf("%s_flu_prod_on_%s.html", as.Date(forecast_date_int), Sys.Date()) @@ -450,14 +462,55 @@ rlang::list2( truth_data = truth_data ) ) - }, - cue = tar_cue(mode = "always") - ) - ), - tar_target( - new_data_notebook, - command = { - rmarkdown::render("scripts/reports/new_data.Rmd", output_file = here::here("reports", "new_data.html")) + } } ) ) + + + +# ================================ SCORE TARGETS ================================ +if (backtest_mode) { + score_targets <- list2( + tar_target( + external_forecasts, + command = { + get_external_forecasts("flu") + } + ), + tar_combine( + name = joined_forecasts_and_ensembles, + ensemble_targets[["forecasts_and_ensembles"]], + command = { + dplyr::bind_rows(!!!.x, external_forecasts) + } + ), + tar_target( + name = scores, + command = { + nhsn_latest_end_of_week <- + nhsn_latest_data %>% + mutate( + time_value = ceiling_date(time_value, unit = "week", week_start = 6) + ) + score_forecasts(nhsn_latest_end_of_week, joined_forecasts_and_ensembles) + } + ), + tar_target( + name = score_plot, + command = { + render_score_plot(score_report_rmd, scores, forecast_dates, "flu") + } + ) + ) +} else { + score_targets <- list() +} + +list2( + parameters_and_date_targets, + forecast_targets, + ensemble_targets, + combined_forecasts, + score_targets +) diff --git a/scripts/one_offs/get_forecast_data.r b/scripts/one_offs/get_forecast_data.r new file mode 100644 index 00000000..242559a8 --- /dev/null +++ b/scripts/one_offs/get_forecast_data.r @@ -0,0 +1,131 @@ +# Get Forecast Data +# +# This is a basic script to download the forecast data from a forecasting hub. +# +# The goal is to: +# 1. Download some select forecasts from a forecasting hub +# 2. Either save locally to a parquet file or upload to an S3 bucket +# 3. Optionally, read the parquet file back in and use it in a target pipeline +# 4. TODO: Add a companion metadata file that describes the forecasts in the parquet file +# 5. TODO: Add the metadata file to the S3 bucket +# 6. 
TODO: Allow incremental updates to the forecasts, by downloading only the new files + +library(tidyverse) +library(httr) +library(lubridate) +library(progress) + +options(readr.show_progress = FALSE) +options(readr.show_col_types = FALSE) + + +# Configuration +## config <- list( +## base_url = "https://raw.githubusercontent.com/cdcgov/covid19-forecast-hub/main/model-output", +## forecasters = c("CMU-TimeSeries", "CovidHub-baseline", "CovidHub-ensemble"), +## s3_bucket = "forecasting-team-data", +## s3_key = "covid/covid_hosp_forecasts.parquet", +## disease = "covid" +## ) +# same but for flu +config <- list( + base_url = "https://raw.githubusercontent.com/cdcepi/FluSight-forecast-hub/main/model-output", + forecasters = c("FluSight-baseline", "FluSight-ensemble", "CMU-TimeSeries"), + s3_bucket = "forecasting-team-data", + s3_key = "flu/flu_hosp_forecasts.parquet", + disease = "flu" +) + + +# Function to check if file exists on GitHub +check_github_file <- function(forecaster, filename) { + url <- paste0(config$base_url, "/", forecaster, "/", filename) + response <- GET(url) + return(status_code(response) == 200) +} + +# Function to download and read a single file +download_forecast_file <- function(forecaster, filename) { + url <- paste0(config$base_url, "/", forecaster, "/", filename) + + tryCatch( + { + # Download directly to memory and parse + df <- readr::read_csv(url) %>% + mutate( + forecaster = forecaster, + forecast_date = as.Date(str_extract(filename, "\\d{4}-\\d{2}-\\d{2}")), + output_type_id = as.numeric(output_type_id) + ) %>% + filter(output_type == "quantile") + return(df) + }, + error = function(e) { + warning(sprintf("Failed to download %s: %s", filename, e$message)) + return(NULL) + } + ) +} + +# Main function to fetch and combine forecast files +fetch_forecast_files <- function(days_back = 7 * 4 * 5, sync_to_s3 = TRUE) { + # Generate date range + # First get the nearest Saturday + end_date <- round_date(Sys.Date(), "week", 6) + start_date <- end_date - days_back + + # Initialize list to store all forecasts + all_forecasts <- list() + + pb_forecasters <- progress_bar$new( + format = "Downloading forecasts from :forecaster [:bar] :percent :eta", + total = length(config$forecasters), + clear = FALSE, + width = 60 + ) + + for (forecaster in config$forecasters) { + pb_forecasters$tick(tokens = list(forecaster = forecaster)) + + # Generate filenames for date range + dates <- seq(start_date, end_date, by = "week") + filenames <- paste0(format(dates, "%Y-%m-%d"), "-", forecaster, ".csv") + + # Create nested progress bar for files + pb_files <- progress_bar$new( + format = " Downloading files [:bar] :current/:total :filename", + total = length(filenames) + ) + + for (filename in filenames) { + pb_files$tick(tokens = list(filename = filename)) + + if (check_github_file(forecaster, filename)) { + forecast_data <- download_forecast_file(forecaster, filename) + if (!is.null(forecast_data)) { + all_forecasts[[length(all_forecasts) + 1]] <- forecast_data + } + } + } + } + + # Combine all forecasts + combined_forecasts <- bind_rows(all_forecasts) + + if (sync_to_s3) { + # Write to Parquet and upload to S3 + temp_file <- tempfile(fileext = ".parquet") + write_parquet(combined_forecasts, temp_file) + put_object( + file = temp_file, + object = config$s3_key, + bucket = config$s3_bucket + ) + unlink(temp_file) + } + + return(combined_forecasts) +} + +df <- fetch_forecast_files(sync_to_s3 = FALSE) +arrow::write_parquet(df, 
here::here(glue::glue("data/forecasts/{config$disease}_hosp_forecasts.parquet"))) diff --git a/scripts/one_offs/get_forecasts_and_scores_covid.R b/scripts/one_offs/get_forecasts_and_scores_covid.R deleted file mode 100644 index 188e44d2..00000000 --- a/scripts/one_offs/get_forecasts_and_scores_covid.R +++ /dev/null @@ -1,97 +0,0 @@ -# Bundle the forecasts from targets and put it in the S3 bucket. -# This script assumes that you have used `make pull` to download the needed -# forecasts and data from S3. Make sure that your project is set appropriately -# in .Renviron, for instance: TAR_PROJECT=flu_hosp_tiny. -source(here::here("R", "load_all.R")) - - - -# Get the truth data used for scoring -covid_2023_truth_data <- tar_read(hhs_evaluation_data) %>% - # This is a correction for the fact that we date the forecasts for the - # Wednesday they were made, but FluSight dates the for the upcoming Saturday - mutate(target_end_date = target_end_date - 2) %>% - # The data has unserializable ALTREP objects that bork on S3 load without - # this. - as.data.table() %>% - as_tibble() -s3save(covid_2023_truth_data, object = "covid_2023_truth_data.rds", bucket = "forecasting-team-data") - - -# Get the local forecasts -state_geo_values <- covid_2023_truth_data %>% - pull(geo_value) %>% - unique() -df <- tar_manifest() -forecasts <- df$name %>% - keep(~ stringr::str_detect(., "forecast_")) %>% - map(function(name) { - tryCatch( - { - tibble(forecaster = str_remove(name, "forecast_"), tar_read_raw(name)) - }, - error = function(e) { - print(name) - tibble() - } - ) - }) %>% - bind_rows() %>% - filter(geo_value %in% state_geo_values) -cmu_covid_2023_forecasts <- forecasts %>% - # left_join( - # covid_2023_truth_data %>% - # select(geo_value, population) %>% - # distinct(), - # by = "geo_value" - # ) %>% - # # mutate(prediction = prediction * population / 10L**5) %>% - # select(-source, -population) %>% - # This is a correction for the fact that we date the forecasts for the - # Wednesday they were made, but FluSight dates the for the upcoming Saturday - mutate(forecast_date = forecast_date - 2, target_end_date = target_end_date - 2) - - -# Get the forecasts from Covidhub 2023 -s3load("covidhub_forecasts_2023.rds", bucket = "forecasting-team-data") -cmu_forecast_dates <- cmu_covid_2023_forecasts %>% - pull(forecast_date) %>% - unique() - -# Join and score -covid_2023_joined_forecasts <- bind_rows( - cmu_covid_2023_forecasts, - covidhub_forecasts_2023 %>% - filter(forecast_date %in% cmu_forecast_dates) %>% - mutate(prediction = prediction * 7) -) -s3save(covid_2023_joined_forecasts, object = "covid_2023_joined_forecasts.rds", bucket = "forecasting-team-data") - -covid_2023_truth_data %>% - pull(target_end_date) %>% - unique() %>% - sort() - -covid_2023_joined_scores <- covid_2023_joined_forecasts %>% - left_join( - covid_2023_truth_data, - by = c("geo_value", "target_end_date") - ) %>% - as.data.table() %>% - rename(model = forecaster) %>% - scoringutils::score(metrics = c("interval_score", "ae_median", "coverage")) %>% - scoringutils::add_coverage(by = c("model", "geo_value", "forecast_date", "target_end_date"), ranges = c(80)) %>% - scoringutils::summarize_scores(by = c("model", "geo_value", "forecast_date", "target_end_date")) %>% - as_tibble() %>% - select( - forecaster = model, - geo_value, - forecast_date, - target_end_date, - wis = interval_score, - ae = ae_median, - coverage_80 - ) %>% - mutate(ahead = as.numeric(target_end_date - forecast_date)) - -s3save(covid_2023_joined_scores, object = 
"covid_2023_joined_scores.rds", bucket = "forecasting-team-data") diff --git a/scripts/one_offs/read_covid_forecast_hub_data.jl b/scripts/one_offs/read_covid_forecast_hub_data.jl index ffaef48c..327bd2a8 100644 --- a/scripts/one_offs/read_covid_forecast_hub_data.jl +++ b/scripts/one_offs/read_covid_forecast_hub_data.jl @@ -3,22 +3,23 @@ # to get the rds, run # # full_results <- readr::read_csv("../covid19-forecast-hub/data-processed/covid19-2023season-results.csv") -# aws.s3::s3save(full_results, object = "covid19_forecast_hub_2023.rds", bucket = "forecasting-team-data") +# aws.s3::s3save(full_results, object = "covid19_forecast_hub_2023_full_summed.rds", bucket = "forecasting-team-data") # +using Base: floatrange using CSV using DataFrames using DataFramesMeta using Dates using RData +import Base.lowercase pwd() -res = CSV.read("COVIDhub-ensemble/2023-10-02-COVIDhub-ensemble.csv", DataFrame) -pathname = "COVIDhub-ensemble/" -filename = "2023-10-02-COVIDhub-ensemble.csv" +res = CSV.read("COVIDhub_CDC-ensemble/2023-10-02-COVIDhub_CDC-ensemble.csv", DataFrame) +pathname = "COVIDhub_CDC-ensemble/" +filename = "2023-10-02-COVIDhub_CDC-ensemble.csv" state_names = CSV.read("../data-locations/locations.csv", DataFrame) lowercase(m::Missing) = m @rtransform! state_names @passmissing :abbreviation = lowercase(:abbreviation) @select! state_names :abbreviation :location - function format_file(pathname, filename, state_names) if length(filename) < 10 || match(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", filename[1:10]) == nothing || @@ -26,9 +27,7 @@ function format_file(pathname, filename, state_names) return DataFrame() end println(joinpath(pathname, filename)) - - res = CSV.read(joinpath(pathname, filename), DataFrame, missingstring="NA") - + res = CSV.read(joinpath(pathname, filename), DataFrame, missingstring="NA", types=Dict("value" => Float64)) if !("forecast_date" in names(res)) || res[!, :forecast_date] |> minimum < Date(2023, 1, 1) return DataFrame() @@ -36,12 +35,16 @@ function format_file(pathname, filename, state_names) @transform(res, :target = (:target)) res = @chain res begin @rtransform :target = parse(Int64, match(r"[0-9]*", :target).match) - @transform :forecaster = pathname + @transform :forecaster = pathname[3:end] @rsubset :type == "quantile" end res = leftjoin(res, state_names, on=:location) @select! res :forecaster :geo_value = :abbreviation :forecast_date :target_end_date :ahead = :target :quantile :value - res + @chain res begin + @rtransform :week_ahead = div(:ahead, 7) + @groupby :forecaster :geo_value :forecast_date :week_ahead :quantile + @combine :value = sum(:value) + end end results = DataFrame[] for (root, dirs, files) in walkdir(".") @@ -51,10 +54,3 @@ for (root, dirs, files) in walkdir(".") end full_results = vcat(results...) CSV.write("covid19-2023season-results.csv", full_results) -full_results[!, :forecaster] |> unique -@rsubset! full_results :ahead % 7 == 0 -@rtransform! 
full_results :forecaster = :forecaster[3:end] -"./fqfae"[3:end] -3 % 7 -@rsubset full_results !ismissing(:geo_value) :forecast_date == Date(2023,11,13) -@rsubset res :forecast_date == Date(2023,11,0) diff --git a/scripts/reports/forecast_report.Rmd b/scripts/reports/forecast_report.Rmd index 5d04c624..592e4466 100644 --- a/scripts/reports/forecast_report.Rmd +++ b/scripts/reports/forecast_report.Rmd @@ -4,6 +4,7 @@ author: Delphi Forecast Team date: "Rendered: `r format(Sys.time(), '%Y-%m-%d %H:%M:%S')`, Forecast date: `r params$forecast_date`, Last day of data: `r max(params$truth_data$target_end_date)`" output: html_document: + code_folding: hide toc: True # self_contained: False # lib_dir: libs diff --git a/scripts/reports/new_data.Rmd b/scripts/reports/new_data.Rmd index 1d05923f..82c7add4 100644 --- a/scripts/reports/new_data.Rmd +++ b/scripts/reports/new_data.Rmd @@ -4,6 +4,9 @@ date: "Rendered: `r format(Sys.time(), '%Y-%m-%d %H:%M:%S')`" output: html_document: code_folding: hide + toc: True + # self_contained: False + # lib_dir: libs editor_options: chunk_output_type: console --- @@ -40,7 +43,7 @@ if (wday(Sys.Date()) < 6 & wday(Sys.Date()) > 3) { most_recent_result %<>% process_nhsn_data() df <- most_recent_result %>% filter(disease == "nhsn_flu") -df %>% +p <- df %>% filter(geo_value %in% c("ca", "fl", "tx", "ny", "pa", "mn", "nm")) %>% ggplot(aes(x = time_value, y = value, color = geo_value)) + geom_line() + @@ -51,6 +54,7 @@ df %>% y = "Total Confirmed Flu Admissions" ) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) +ggplotly(p, tooltip = "text", height = 800, width = 1000) ``` # State Season Comparison {.tabset} @@ -58,7 +62,7 @@ df %>% ## California ```{r} -df %>% +p <- df %>% filter(geo_value == "ca") %>% ggplot(aes(x = season_week, y = value, color = season)) + geom_line() + @@ -69,12 +73,13 @@ df %>% y = "Total Confirmed Flu Admissions" ) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) +ggplotly(p, tooltip = "text", height = 800, width = 1000) ``` ## Florida ```{r} -df %>% +p <- df %>% filter(geo_value == "fl") %>% ggplot(aes(x = season_week, y = value, color = season)) + geom_line() + @@ -85,12 +90,13 @@ df %>% y = "Total Confirmed Flu Admissions" ) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) +ggplotly(p, tooltip = "text", height = 800, width = 1000) ``` ## Texas ```{r} -df %>% +p <- df %>% filter(geo_value == "tx") %>% ggplot(aes(x = season_week, y = value, color = season)) + geom_line() + @@ -101,12 +107,13 @@ df %>% y = "Total Confirmed Flu Admissions" ) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) +ggplotly(p, tooltip = "text", height = 800, width = 1000) ``` ## New York ```{r} -df %>% +p <- df %>% filter(geo_value == "ny") %>% ggplot(aes(x = season_week, y = value, color = season)) + geom_line() + @@ -117,12 +124,13 @@ df %>% y = "Total Confirmed Flu Admissions" ) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) +ggplotly(p, tooltip = "text", height = 800, width = 1000) ``` ## Pennsylvania ```{r} -df %>% +p <- df %>% filter(geo_value == "pa") %>% ggplot(aes(x = season_week, y = value, color = season)) + geom_line() + @@ -133,12 +141,13 @@ df %>% y = "Total Confirmed Flu Admissions" ) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) +ggplotly(p, tooltip = "text", height = 800, width = 1000) ``` ## Minnesota ```{r} -df %>% +p <- df %>% filter(geo_value == "mn") %>% ggplot(aes(x = season_week, y = value, color = season)) + geom_line() + @@ -149,12 +158,13 @@ df %>% y = "Total Confirmed Flu 
Admissions" ) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) +ggplotly(p, tooltip = "text", height = 800, width = 1000) ``` ## New Mexico ```{r} -df %>% +p <- df %>% filter(geo_value == "nm") %>% ggplot(aes(x = season_week, y = value, color = season)) + geom_line() + @@ -165,6 +175,7 @@ df %>% y = "Total Confirmed Flu Admissions" ) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) +ggplotly(p, tooltip = "text", height = 800, width = 1000) ``` # Comparing with Old NHSN Data diff --git a/scripts/reports/score_report.Rmd b/scripts/reports/score_report.Rmd new file mode 100644 index 00000000..26b4d9b0 --- /dev/null +++ b/scripts/reports/score_report.Rmd @@ -0,0 +1,116 @@ +--- +title: "`r `r params$disease` score report" +author: Delphi Forecast Team +date: "Rendered: `r format(Sys.time(), '%Y-%m-%d %H:%M:%S')`" +output: + html_document: + code_folding: hide + toc: True + # self_contained: False + # lib_dir: libs +params: + disease: "covid" + scores: "" + forecast_dates: "" +--- + +```{css, echo=FALSE} +body { + display: block; + max-width: 1280px !important; + margin-left: auto; + margin-right: auto; +} + +body .main-container { + max-width: 1280px !important; + width: 1280px !important; +} +``` + +```{r echo=FALSE} +knitr::opts_chunk$set( + fig.align = "center", + message = FALSE, + warning = FALSE, + cache = FALSE +) +ggplot2::theme_set(ggplot2::theme_bw()) +``` + +```{r setup, include=FALSE} +suppressPackageStartupMessages(source(here::here("R", "load_all.R"))) +library(DT) + +# Define aggregation functions +Mean <- function(x) mean(x, na.rm = TRUE) +GeoMean <- function(x, offset = 0) exp(Mean(log(x + offset))) +scores <- params$scores +``` + +# Forecaster Predictions for `r params$disease`: {.tabset} + +Forecast dates: `r params$forecast_dates` + +## Scores Aggregated By Forecaster + +```{r, fig.height = 60, fig.width = 12, echo=FALSE} +scores %>% + group_by(forecaster) %>% + summarize( + mean_wis = round(Mean(wis), 2), + geomean_wis = round(GeoMean(wis), 2), + mean_ae = round(Mean(ae_median), 2), + geomean_ae = round(GeoMean(ae_median), 2), + mean_coverage_90 = round(Mean(interval_coverage_90), 2), + n = n() + ) %>% + rename(id = forecaster) %>% + datatable() +``` + +## Scores Aggregated By Forecast Date + +```{r, fig.height = 8, fig.width = 12, echo=FALSE} +agg <- scores %>% + group_by(forecaster, forecast_date) %>% + summarize( + mean_wis = round(Mean(wis), 2), + geomean_wis = round(GeoMean(wis), 2), + mean_ae = round(Mean(ae_median), 2), + geomean_ae = round(GeoMean(ae_median), 2), + mean_interval_coverage_90 = round(Mean(interval_coverage_90), 2), + ) + +# Plot the scores as lines across forecast_date +p <- ggplot(agg, aes(x = forecast_date, y = mean_wis, color = forecaster)) + + geom_line() + + theme_bw() + + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + + labs(x = "Forecast Date", y = "Mean WIS") + +ggplotly(p) +``` + +## Scores Aggregated By Ahead + +```{r, fig.height = 8, fig.width = 12, echo=FALSE} +agg <- scores %>% + group_by(forecaster, ahead) %>% + summarize( + mean_wis = round(Mean(wis), 2), + geomean_wis = round(GeoMean(wis), 2), + mean_ae = round(Mean(ae_median), 2), + geomean_ae = round(GeoMean(ae_median), 2), + mean_interval_coverage_90 = round(Mean(interval_coverage_90), 2), + ) + +# Plot the scores as lines across forecast_date +p <- ggplot(agg, aes(x = ahead, y = mean_wis, color = forecaster)) + + geom_line() + + theme_bw() + + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + + labs(x = "Forecast Date", y = "Mean WIS") + 
+ggplotly(p) +``` diff --git a/scripts/run_prod.R b/scripts/run_prod.R index 1f2ab0bd..7e694dd6 100644 --- a/scripts/run_prod.R +++ b/scripts/run_prod.R @@ -26,7 +26,9 @@ suppressPackageStartupMessages(source(here::here("R", "load_all.R"))) # ) # # Save to disk # saveRDS(scorecards, "exploration-scorecards-2023-10-04.RDS") -print(glue::glue("starting a run at {Sys.time()}")) +print("########################################################") +print("########################################################") +print(glue::glue("Starting a run at {Sys.time()}")) tar_project <- Sys.getenv("TAR_RUN_PROJECT", "flu_hosp_prod") external_scores_path <- Sys.getenv("EXTERNAL_SCORES_PATH", "") debug_mode <- as.logical(Sys.getenv("DEBUG_MODE", FALSE)) @@ -47,24 +49,17 @@ cli::cli_inform( ) ) -suppressPackageStartupMessages({ - library(targets) - library(shiny) -}) - # targets needs the output dir to already exist. store_dir <- tar_path_store() if (!dir.exists(store_dir)) dir.create(store_dir) -tar_manifest() -print(Sys.time()) tar_make( store = tar_config_get("store", project = tar_project), script = tar_config_get("script", project = tar_project), use_crew = TRUE ) -print(Sys.time()) +print(glue::glue("Finished at {Sys.time()}")) print("########################################################") print("########################################################") diff --git a/scripts/targets-common.R b/scripts/targets-common.R index 3cb3c194..55258ca7 100644 --- a/scripts/targets-common.R +++ b/scripts/targets-common.R @@ -4,25 +4,19 @@ suppressPackageStartupMessages({ # On tanka, we have 64 cores, but we leave some free to try to reduce thrashing # and to allow for other users. -if (parallel::detectCores() < 30) { - num_workers <- parallel::detectCores() - 1L +if (parallel::detectCores() == 64) { + num_workers <- 30L } else { - num_workers <- parallel::detectCores() - 20L + num_workers <- parallel::detectCores() - 1L } main_controller <- crew_controller_local( name = "main_controller", workers = num_workers, - # These settings were cobbled together from various discussion threads on the - # targets Github. There's been an ongoing issue in a dependency of {crew} - # called {mirai}, where workers mysteriously stop working. The settings below - # are an attempt to mitigate that. seconds_idle = 60L, - seconds_timeout = 7 * 24 * 60 * 60L, # 7 days is probably enough + seconds_timeout = 24 * 60 * 60L, garbage_collection = TRUE, - options_local = crew_options_local(log_directory = "local_logs"), - tasks_max = 1L, - launch_max = 10000L + options_local = crew_options_local(log_directory = "local_logs") ) # The external scores processing causes the pipeline to exit with an error, # apparently due to running out of memory. Set up a non-parallel `crew` @@ -33,10 +27,8 @@ serial_controller <- crew_controller_local( workers = 1L, options_local = crew_options_local(log_directory = "local_logs"), seconds_idle = 60L, - seconds_timeout = 7 * 24 * 60 * 60L, - garbage_collection = TRUE, - tasks_max = 1L, - launch_max = 10000L + seconds_timeout = 24 * 60 * 60L, + garbage_collection = TRUE ) # Serial mode is better for debugging. 
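The hunks that follow wrap the epidatr pub_covidcast calls in retry_fn. Its definition is not shown in this diff; a minimal sketch consistent with the call sites (the argument names come from the calls above and below, the body itself is an assumption):

retry_fn <- function(fn, ..., max_attempts = 10, wait_seconds = 1) {
  for (attempt in seq_len(max_attempts)) {
    # Capture either the successful result or the error condition.
    result <- tryCatch(fn(...), error = function(e) e)
    if (!inherits(result, "error")) {
      return(result)
    }
    if (attempt == max_attempts) {
      # Out of attempts: re-signal the last error.
      stop(result)
    }
    Sys.sleep(wait_seconds)
  }
}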
diff --git a/scripts/targets-exploration-common.R b/scripts/targets-exploration-common.R index 27cdff27..89457c1d 100644 --- a/scripts/targets-exploration-common.R +++ b/scripts/targets-exploration-common.R @@ -11,7 +11,10 @@ make_data_targets <- function() { tar_target( name = hhs_latest_data, command = { - epidatr::pub_covidcast( + retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "hhs", signals = hhs_signal, geo_type = "state", @@ -25,7 +28,10 @@ make_data_targets <- function() { tar_target( name = chng_latest_data, command = { - epidatr::pub_covidcast( + retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "chng", signals = chng_signal, geo_type = "state", @@ -67,7 +73,10 @@ make_data_targets <- function() { tar_target( name = hhs_archive_data, command = { - epidatr::pub_covidcast( + retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "hhs", signals = hhs_signal, geo_type = "state", @@ -85,7 +94,10 @@ make_data_targets <- function() { start_time <- as.Date(training_time$from, format = "%Y%m%d") stop_time <- Sys.Date() half <- floor((stop_time - start_time) / 2) - first_half <- epidatr::pub_covidcast( + first_half <- retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "chng", signals = chng_signal, geo_type = "state", @@ -95,7 +107,10 @@ make_data_targets <- function() { issues = epidatr::epirange(from = start_time, to = start_time + half), fetch_args = fetch_args ) - second_half <- epidatr::pub_covidcast( + second_half <- retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "chng", signals = chng_signal, geo_type = "state", @@ -338,7 +353,7 @@ make_historical_flu_data_targets <- function() { add_season_info() %>% mutate(agg_level = ifelse(grepl("[0-9]{2}", geo_value), "hhs_region", ifelse("us" == geo_value, "nation", "state"))) %>% add_pop_and_density() %>% - mutate(hhs = hhs_7dsum / population * 10L^5) %>% + mutate(hhs = hhs / population * 10L^5) %>% mutate(source = "nhsn") %>% mutate(agg_level = ifelse(geo_value == "us", "nation", "state")) %>% as_epi_archive(other_keys = "source", compactify = TRUE) %>% @@ -402,14 +417,20 @@ make_historical_flu_data_targets <- function() { tar_target( name = nssp_archive, command = { - nssp_state <- pub_covidcast( + nssp_state <- retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "nssp", signal = "pct_ed_visits_influenza", time_type = "week", geo_type = "state", geo_values = "*" ) - nssp_hhs <- pub_covidcast( + nssp_hhs <- retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "nssp", signal = "pct_ed_visits_influenza", time_type = "week", @@ -439,14 +460,20 @@ make_historical_flu_data_targets <- function() { # source going down completely, which means we're actually just comparing # with the version without this source all_of_them <- lapply(used_searches, \(search_name) { - google_symptoms_state_archive <- pub_covidcast( + google_symptoms_state_archive <- retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "google-symptoms", signal = glue::glue("s0{search_name}_smoothed_search"), time_type = "day", geo_type = "state", geo_values = "*" ) - google_symptoms_hhs_archive <- pub_covidcast( + google_symptoms_hhs_archive <- retry_fn( + max_attempts = 10, + wait_seconds = 1, + fn = pub_covidcast, source = "google-symptoms", signal = glue::glue("s0{search_name}_smoothed_search"), time_type = "day", diff --git 
a/tests/testthat/_snaps/forecasters-basics.md b/tests/testthat/_snaps/forecasters-basics.md new file mode 100644 index 00000000..113509a4 --- /dev/null +++ b/tests/testthat/_snaps/forecasters-basics.md @@ -0,0 +1,39 @@ +# scaled_pop deals with no as_of + + Code + res <- forecaster[[2]](jhu, "case_rate", extra_sources = "death_rate", ahead = 2L) + +# flatline_fc deals with no as_of + + Code + res <- forecaster[[2]](jhu, "case_rate", extra_sources = "death_rate", ahead = 2L) + +# smoothed_scaled deals with no as_of + + Code + res <- forecaster[[2]](jhu, "case_rate", extra_sources = "death_rate", ahead = 2L) + Condition + Error in `rename()`: + ! Can't rename columns that don't exist. + x Column `slide_value_case_rate` doesn't exist. + +# flusion deals with no as_of + + Code + res <- forecaster[[2]](jhu, "case_rate", extra_sources = "death_rate", ahead = 2L) + Condition + Warning: + No columns were selected in `add_role()`. + Error in `dplyr::transmute()`: + i In argument: `across(...)`. + i In group 1: `geo_value = ak` and `source = nhsn`. + Caused by error in `across()`: + ! Can't compute column `gr_21_rel_change_case_rate`. + Caused by error in `epiprocess::growth_rate()`: + ! `x` contains duplicate values. (If being run on a column in an `epi_df`, did you group by relevant key variables?) + +# no_recent_outcome deals with no as_of + + Code + res <- forecaster[[2]](jhu, "case_rate", extra_sources = "death_rate", ahead = 2L) + diff --git a/tests/testthat/test-data-whitening.R b/tests/testthat/test-data-whitening.R index 02aed452..c22e5290 100644 --- a/tests/testthat/test-data-whitening.R +++ b/tests/testthat/test-data-whitening.R @@ -1,5 +1,5 @@ source(here::here("R", "load_all.R")) -real_ex <- epipredict::case_death_rate_subset %>% +real_ex <- epidatasets::covid_case_death_rates %>% as_tibble() %>% mutate(source = "same") %>% as_epi_df(other_keys = "source") diff --git a/tests/testthat/test-forecaster-utils.R b/tests/testthat/test-forecaster-utils.R index f6ba8261..737f6349 100644 --- a/tests/testthat/test-forecaster-utils.R +++ b/tests/testthat/test-forecaster-utils.R @@ -1,7 +1,7 @@ source(here::here("R", "load_all.R")) test_that("sanitize_args_predictors_trainer", { - epi_data <- epipredict::case_death_rate_subset + epi_data <- epidatasets::covid_case_death_rates # don't need to test validate_forecaster_inputs as that's inherited # testing args_list inheritance ex_args <- default_args_list() diff --git a/tests/testthat/test-forecasters-basics.R b/tests/testthat/test-forecasters-basics.R index 09dff3c8..66e20dd9 100644 --- a/tests/testthat/test-forecasters-basics.R +++ b/tests/testthat/test-forecasters-basics.R @@ -10,7 +10,7 @@ forecasters <- list( ) for (forecaster in forecasters) { test_that(paste(forecaster[[1]], "gets the date and columns right"), { - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) # the as_of for this is wildly far in the future attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 @@ -28,7 +28,7 @@ for (forecaster in forecasters) { }) test_that(paste(forecaster[[1]], "handles only using 1 column correctly"), { - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) # the as_of for this is wildly far in the future attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 @@ -40,21 +40,17 @@ for (forecaster in forecasters) { }) test_that(paste(forecaster[[1]], "deals with no 
as_of"), { - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) # what if we have no as_of date? assume they mean the last available data attributes(jhu)$metadata$as_of <- NULL - if (forecaster[[1]] != "flatline_fc") { - expect_snapshot(error = TRUE, res <- forecaster[[2]](jhu, "case_rate", c("death_rate"), 2L)) - } else { - expect_snapshot(error = FALSE, res <- forecaster[[2]](jhu, "case_rate", c("death_rate"), 2L)) - } + expect_snapshot(error = FALSE, res <- forecaster[[2]](jhu, "case_rate", extra_sources = "death_rate", ahead = 2L)) }) test_that(paste(forecaster[[1]], "handles last second NA's"), { # if the last entries are NA, we should still predict # TODO: currently this checks that we DON'T predict - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) geo_values <- jhu$geo_value %>% unique() one_day_nas <- tibble( @@ -84,7 +80,7 @@ for (forecaster in forecasters) { test_that(paste(forecaster[[1]], "handles unused extra sources with NAs"), { # if there is an extra source we aren't using, we should ignore any NA's it has - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) jhu_nad <- jhu %>% as_tibble() %>% @@ -103,7 +99,7 @@ for (forecaster in forecasters) { # any forecaster specific tests if (forecaster[[1]] == "scaled_pop" || forecaster[[1]] == "smoothed_scaled") { test_that(paste(forecaster[[1]], "scaled and unscaled don't make the same predictions"), { - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) # the as_of for this is wildly far in the future attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 @@ -125,7 +121,7 @@ for (forecaster in forecasters) { }) } else if (forecaster[[1]] == "smoothed_scaled") { testthat("smoothed_scaled handles variable lags correctly", { - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) # the as_of for this is wildly far in the future attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 @@ -136,7 +132,7 @@ for (forecaster in forecasters) { # test case where extra_sources is "empty" # test case where the epi_df is empty test_that(paste(forecaster[[1]], "empty epi_df predicts nothing"), { - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) # the as_of for this is wildly far in the future attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 @@ -153,7 +149,7 @@ for (forecaster in forecasters) { # unique tests test_that("flatline_fc same across aheads", { - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 resM2 <- flatline_fc(jhu, "case_rate", c("death_rate"), -2L) %>% @@ -170,7 +166,7 @@ test_that("flatline_fc same across aheads", { }) test_that("ensemble_average", { - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-11-01")) # the as_of for this is wildly far in the future attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3 diff 
--git a/tests/testthat/test-latency_adjusting.R b/tests/testthat/test-latency_adjusting.R index 8b65afcf..10daa2ce 100644 --- a/tests/testthat/test-latency_adjusting.R +++ b/tests/testthat/test-latency_adjusting.R @@ -2,14 +2,14 @@ source(here::here("R", "load_all.R")) test_that("extend_ahead", { # testing that POSIXct converts correctly (as well as basic types) - expect_no_error(epidataAhead <- extend_ahead(epipredict::case_death_rate_subset, 1)) + expect_no_error(epidataAhead <- extend_ahead(epidatasets::covid_case_death_rates, 1)) epi_data <- epidataAhead[[1]] effective_ahead <- epidataAhead[[2]] - expect_identical(epi_data, epipredict::case_death_rate_subset) + expect_identical(epi_data, epidatasets::covid_case_death_rates) expect_type(effective_ahead, "integer") # testing the date math works correctly - jhu <- epipredict::case_death_rate_subset %>% + jhu <- epidatasets::covid_case_death_rates %>% dplyr::filter(time_value >= as.Date("2021-12-01")) # the as_of for this is wildly far in the future attributes(jhu)$metadata$as_of <- max(jhu$time_value) + 3
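The updated latency tests pin down the contract of `extend_ahead()`: it returns the (possibly modified) `epi_df` together with an integer `effective_ahead`, and when the `as_of` metadata is missing it falls back to treating the last available `time_value` as the version date. The actual implementation lives in `R/`; the snippet below is only a sketch of the date math these tests exercise, using an illustrative (non-package) function name and assuming the effective ahead grows by the data's latency.

```r
# Illustrative sketch, not the package's implementation: derive the effective
# ahead from the gap between the data's as_of and its last observation.
effective_ahead_from_latency <- function(epi_data, ahead) {
  as_of <- attr(epi_data, "metadata")$as_of
  if (is.null(as_of)) {
    # No as_of recorded: assume the data is current as of its last time_value.
    as_of <- max(epi_data$time_value)
  }
  latency <- as.integer(as.Date(as_of) - as.Date(max(epi_data$time_value)))
  as.integer(ahead + latency)
}
```

For the test data above, where `as_of` is set three days past the last `time_value`, this would turn a nominal ahead of 1 into an effective ahead of 4, which is the kind of conversion the "date math" assertions are checking.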