
covid: evaluate current season #168

Merged · 20 commits · Feb 21, 2025
4 changes: 4 additions & 0 deletions .gitignore
@@ -18,3 +18,7 @@ reports/report.md
cache/
data/
.vscode/
.envrc
.nhsn_covid_cache.parquet
.nhsn_flu_cache.parquet
meta/
1 change: 1 addition & 0 deletions .renvignore
@@ -1,2 +1,3 @@
tmp.R
node_modules/*
local_logs/
9 changes: 8 additions & 1 deletion Makefile
@@ -63,9 +63,10 @@ submit: submit-covid submit-flu
get_nwss:
mkdir -p aux_data/nwss_covid_data; \
mkdir -p aux_data/nwss_flu_data; \
. .venv/bin/activate; \
cd scripts/nwss_export_tool/; \
python nwss_covid_export.py; \
python nwss_covid_export.py
python nwss_influenza_export.py

run-nohup:
nohup Rscript scripts/run.R &
@@ -94,3 +95,9 @@ update_site:

netlify:
netlify deploy --dir=reports --prod

get_flu_prod_errors:
Rscript -e "suppressPackageStartupMessages(source(here::here('R', 'load_all.R'))); get_targets_errors(project = 'flu_hosp_prod')"

get_covid_prod_errors:
Rscript -e "suppressPackageStartupMessages(source(here::here('R', 'load_all.R'))); get_targets_errors(project = 'covid_hosp_prod')"
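
For reference, `get_targets_errors()` is defined in the repo's R/ sources and is not shown in this diff; a minimal sketch of a helper with that interface, assuming each production pipeline is a separate {targets} project selected via the `TAR_PROJECT` environment variable, might look like:

```r
# Hypothetical sketch, not the repo's actual implementation: collect the error
# and warning messages that {targets} recorded for a given project's targets.
get_targets_errors <- function(project = "covid_hosp_prod") {
  Sys.setenv(TAR_PROJECT = project)
  targets::tar_meta() |>
    dplyr::filter(!is.na(error) | !is.na(warnings)) |>
    dplyr::select(name, error, warnings)
}
```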
175 changes: 137 additions & 38 deletions R/aux_data_utils.R
@@ -188,38 +188,76 @@ daily_to_weekly <- function(epi_df, agg_method = c("sum", "mean"), day_of_week =
select(-epiweek, -year)
}

#' Aggregate a daily archive to a weekly archive.
#'
#' @param epi_arch the archive to aggregate.
#' @param agg_columns the columns to aggregate.
#' @param agg_method the method to use to aggregate the data, one of "sum" or "mean".
#' @param day_of_week the day of the week to use as the reference day.
#' @param day_of_week_end the day of the week to use as the end of the week.
daily_to_weekly_archive <- function(epi_arch,
agg_columns,
agg_method = c("sum", "mean"),
day_of_week = 4L,
day_of_week_end = 7L) {
# How to aggregate the windowed data.
agg_method <- arg_match(agg_method)
# The columns we will later group by when aggregating.
keys <- key_colnames(epi_arch, exclude = c("time_value", "version"))
# The versions we will slide over.
ref_time_values <- epi_arch$DT$version %>%
unique() %>%
sort()
# Choose a fast function to use to slide and aggregate.
if (agg_method == "sum") {
slide_fun <- epi_slide_sum
} else if (agg_method == "mean") {
slide_fun <- epi_slide_mean
}
too_many_tibbles <- epix_slide(
# Slide over the versions and aggregate.
epix_slide(
epi_arch,
.before = 99999999L,
.versions = ref_time_values,
function(x, group, ref_time) {
ref_time_last_week_end <-
floor_date(ref_time, "week", day_of_week_end - 1) # this is over by 1
function(x, group_keys, ref_time) {
# The last day of the week we will slide over.
ref_time_last_week_end <- floor_date(ref_time, "week", day_of_week_end - 1)

# To find the days we will slide over, we need the first and last complete
# weeks of data, so start from the min and max observed times.
min_time <- min(x$time_value)
max_time <- max(x$time_value)
valid_slide_days <- seq.Date(
from = ceiling_date(min(x$time_value), "week", week_start = day_of_week_end - 1),
to = floor_date(max(x$time_value), "week", week_start = day_of_week_end - 1),
by = 7L
)

# Let's determine if the min and max times are in the same week.
ceil_min_time <- ceiling_date(min_time, "week", week_start = day_of_week_end - 1)
ceil_max_time <- ceiling_date(max_time, "week", week_start = day_of_week_end - 1)

# If they're not in the same week, this means we have at least one
# complete week of data to slide over.
if (ceil_min_time < ceil_max_time) {
valid_slide_days <- seq.Date(
from = ceiling_date(min_time, "week", week_start = day_of_week_end - 1),
to = floor_date(max_time, "week", week_start = day_of_week_end - 1),
by = 7L
)
} else {
# This is the degenerate case, where we have about 1 week or less of
# data. In this case, we opt to return nothing for two reasons:
# 1. in most cases here, the data is incomplete for a single week,
# 2. if the data is complete, a single week of data is not enough to
# reasonably perform any kind of aggregation.
return(tibble())
}

# If the most recent day of data does not fall on the end of a week, add it
# to the list of valid slide days (this produces an incomplete final week,
# but that's fine, since historically it has only been 1 day).
if (wday(max_time) != day_of_week_end) {
valid_slide_days <- c(valid_slide_days, max_time)
}
slid_result <- x %>%

# Slide over the days and aggregate.
x %>%
group_by(across(all_of(keys))) %>%
slide_fun(
agg_columns,
@@ -229,18 +267,13 @@ daily_to_weekly_archive <- function(epi_arch,
) %>%
select(-all_of(agg_columns)) %>%
rename_with(~ gsub("slide_value_", "", .x)) %>%
# only keep 1/week
# group_by week, keep the largest in each week
# alternatively
# switch time_value to the designated day of the week
rename_with(~ gsub("_7dsum", "", .x)) %>%
# Round all dates to reference day of the week. These will get
# de-duplicated by compactify in as_epi_archive below.
mutate(time_value = round_date(time_value, "week", day_of_week - 1)) %>%
as_tibble()
}
)
too_many_tibbles %>%
pull(time_value) %>%
max()
too_many_tibbles %>%
) %>%
as_epi_archive(compactify = TRUE)
}
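
A minimal usage sketch of the rewritten `daily_to_weekly_archive()` (the archive and column names below are illustrative, not part of this PR):

```r
# Illustrative only: `daily_archive` is assumed to be an epiprocess::epi_archive
# with daily time_values and an "hhs" value column.
weekly_archive <- daily_to_weekly_archive(
  daily_archive,
  agg_columns = "hhs",   # column(s) to aggregate within each week
  agg_method = "sum",    # weekly totals; "mean" gives weekly averages
  day_of_week = 4L,      # weekday used as the weekly reference time_value
  day_of_week_end = 7L   # weekday treated as the end of the aggregation week
)
```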

@@ -313,9 +346,8 @@ get_health_data <- function(as_of, disease = c("covid", "flu")) {

most_recent_row <- meta_data %>%
# update_date is actually a time, so we need to filter for the day after.
filter(update_date <= as_of + 1) %>%
arrange(desc(update_date)) %>%
slice(1)
filter(update_date <= as.Date(as_of) + 1) %>%
slice_max(update_date)

if (nrow(most_recent_row) == 0) {
cli::cli_abort("No data available for the given date.")
@@ -331,9 +363,7 @@
if (disease == "covid") {
data %<>% mutate(
hhs = previous_day_admission_adult_covid_confirmed +
previous_day_admission_adult_covid_suspected +
previous_day_admission_pediatric_covid_confirmed +
previous_day_admission_pediatric_covid_suspected
previous_day_admission_pediatric_covid_confirmed
)
} else if (disease == "flu") {
data %<>% mutate(hhs = previous_day_admission_influenza_confirmed)
@@ -594,15 +624,16 @@ gen_ili_data <- function(default_day_of_week = 1) {
as_epi_archive(compactify = TRUE)
}

#' Process Raw NHSN Data
#'
#' Turns the raw NHSN data into a tidy format with the following columns:
#' - geo_value: the jurisdiction of the data
#' - disease: the disease of the data
#' - time_value: the date of the data
#' - version: the version of the data
#' - value: the value of the data
#'
process_nhsn_data <- function(raw_nhsn_data) {
# These are exception dates when the data was available on a different day
# than usual. In these two cases, it was the Thursday after. But to keep
# the rest of the pipeline the same, we pretend it was available on Wednesday.
remap_exceptions <- list(
"2024-12-26" = "2024-12-25",
"2025-01-02" = "2025-01-01"
)
fixed_version <- remap_exceptions[[as.character(Sys.Date())]] %||% Sys.Date()
raw_nhsn_data %>%
mutate(
geo_value = tolower(jurisdiction),
Expand All @@ -614,15 +645,58 @@ process_nhsn_data <- function(raw_nhsn_data) {
select(-weekendingdate, -jurisdiction, -starts_with("totalconf")) %>%
pivot_longer(cols = starts_with("nhsn"), names_to = "disease") %>%
filter(!is.na(value)) %>%
mutate(version = fixed_version) %>%
mutate(version = Sys.Date()) %>%
relocate(geo_value, disease, time_value, version)
}
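
As a usage sketch (the input name is a placeholder for a raw Socrata pull), the tidied long-format output can be stored directly as a versioned archive, with `disease` as an extra key:

```r
# Hypothetical sketch: tidy one raw NHSN pull and keep "disease" as a key column.
nhsn_tidy <- process_nhsn_data(raw_nhsn_data)
nhsn_archive <- epiprocess::as_epi_archive(
  nhsn_tidy,
  other_keys = "disease",
  compactify = TRUE
)
```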


# for filenames of the form nhsn_data_2024-11-19_16-29-43.191649.rds
get_version_timestamp <- function(filename) ymd_hms(str_match(filename, "[0-9]{4}-..-.._..-..-..\\.[^.^_]*"))

#' all in one function to get and cache a nhsn archive from raw files
#' Remove duplicate files from S3
#'
#' Removes duplicate files from S3 by keeping only the earliest timestamp file for each ETag.
#' Modify keep_df if this behavior doesn't suit your needs.
#'
#' @param bucket The name of the S3 bucket.
#' @param prefix The prefix of the files to remove duplicates from.
#' @param dry_run Whether to actually delete the files.
#' @param .progress Whether to show a progress bar.
delete_duplicates_from_s3_by_etag <- function(bucket, prefix, dry_run = TRUE, .progress = TRUE) {
# Get a list of all new dataset snapshots from S3
files_df <- aws.s3::get_bucket_df(bucket = bucket, prefix = prefix) %>% as_tibble()

# Create a list of all the files to keep by keeping the earliest timestamp file for each ETag
keep_df <- files_df %>%
group_by(ETag) %>%
slice_min(LastModified) %>%
ungroup()
delete_df <- files_df %>%
anti_join(keep_df, by = "Key")
if (nrow(delete_df) > 0) {
if (dry_run) {
cli::cli_alert_info("Would delete {nrow(delete_df)} files from {bucket} with prefix {prefix}")
print(delete_df)
return(invisible(delete_df))
} else {
delete_files_from_s3(bucket = bucket, keys = delete_df$Key, .progress = .progress)
}
}
}

#' Delete files from S3
#'
#' Faster than aws.s3::delete_object, when there are many files to delete (thousands).
#'
#' @param bucket The name of the S3 bucket.
#' @param keys The keys of the files to delete, as a character vector.
#' @param batch_size The number of files to delete in each batch.
#' @param .progress Whether to show a progress bar.
delete_files_from_s3 <- function(bucket, keys, batch_size = 500, .progress = TRUE) {
split(keys, ceiling(seq_along(keys) / batch_size)) %>%
purrr::walk(~aws.s3::delete_object(bucket = bucket, object = .x), .progress = .progress)
}
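
A usage sketch for the new S3 helpers, with a placeholder bucket and prefix; starting with `dry_run = TRUE` shows what would be removed without deleting anything:

```r
# Placeholder bucket/prefix: list the duplicate snapshots first, then re-run
# with dry_run = FALSE to actually delete them.
dupes <- delete_duplicates_from_s3_by_etag(
  bucket = "example-forecasting-bucket",
  prefix = "nhsn_data_",
  dry_run = TRUE
)
```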

#' @description
#' This takes in all of the raw data files for the nhsn data, creates a
#' quasi-archive (it keeps one example per version-day, rather than one per
@@ -709,10 +783,12 @@ create_nhsn_data_archive <- function(disease_name) {
as_epi_archive(compactify = TRUE)
}


up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
disease <- arg_match(disease)
nssp_state <- pub_covidcast(
nssp_state <- retry_fn(
max_attempts = 10,
wait_seconds = 1,
fn = pub_covidcast,
source = "nssp",
signal = glue::glue("pct_ed_visits_{disease}"),
time_type = "week",
@@ -728,3 +804,26 @@ up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
mutate(time_value = time_value + 3) %>%
as_epi_archive(compactify = TRUE)
}
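
`retry_fn()` is a repo helper that is not part of this hunk; a minimal sketch of a generic wrapper matching the call above (hypothetical implementation, not the repo's) could be:

```r
# Retry fn(...) up to max_attempts times, waiting wait_seconds between attempts;
# rethrow the last error if every attempt fails.
retry_fn <- function(fn, ..., max_attempts = 10, wait_seconds = 1) {
  for (attempt in seq_len(max_attempts)) {
    result <- tryCatch(fn(...), error = function(e) e)
    if (!inherits(result, "error")) {
      return(result)
    }
    if (attempt == max_attempts) {
      stop(result)
    }
    Sys.sleep(wait_seconds)
  }
}
```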

# Get the last time the signal was updated.
get_covidcast_signal_last_update <- function(source, signal, geo_type) {
pub_covidcast_meta() %>%
filter(source == !!source, signal == !!signal, geo_type == !!geo_type) %>%
pull(last_update) %>%
as.POSIXct()
}

# Get the last time the Socrata dataset was updated.
get_socrata_updated_at <- function(dataset_url) {
httr::GET(dataset_url) %>%
httr::content() %>%
pluck("rowsUpdatedAt") %>%
as.POSIXct()
}

get_s3_object_last_modified <- function(bucket, key) {
# Format looks like "Fri, 31 Jan 2025 22:01:16 GMT"
attr(aws.s3::head_object(key, bucket = bucket), "last-modified") %>%
str_replace_all(" GMT", "") %>%
as.POSIXct(format = "%a, %d %b %Y %H:%M:%S")
}
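
These three helpers return POSIXct timestamps from different upstreams (covidcast metadata, a Socrata dataset's metadata endpoint, and an S3 object's `last-modified` header). A hedged example of combining them to decide whether a cached snapshot is stale (the URL, bucket, and key are placeholders):

```r
# Placeholders throughout: re-pull only when the upstream dataset is newer than
# the cached copy in S3.
upstream_updated <- get_socrata_updated_at("https://data.cdc.gov/api/views/xxxx-xxxx")
cache_updated <- get_s3_object_last_modified(
  bucket = "example-forecasting-bucket",
  key = "nhsn_data_cache.parquet"
)
if (upstream_updated > cache_updated) {
  message("Upstream data is newer than the cache; refreshing.")
}
```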
33 changes: 13 additions & 20 deletions R/forecasters/data_transforms.R
@@ -26,7 +26,7 @@ get_nonkey_names <- function(epi_data) {

#' Get a rolling average for the named columns
#'
#' Add column(s) that are the rolling means of the specified columns, as
#' Add a column that is the rolling mean of the specified column, as
#' implemented by slider. Defaults to the previous 7 days. Currently only
#' group_by's on the geo_value. Should probably extend to more keys if you have
#' them.
@@ -40,15 +40,10 @@ get_nonkey_names <- function(epi_data) {
#' @export
rolling_mean <- function(epi_data, width = 7L, cols_to_mean = NULL) {
cols_to_mean <- get_trainable_names(epi_data, cols_to_mean)
epi_data %<>% group_by(across(key_colnames(epi_data, exclude = "time_value")))
for (col in cols_to_mean) {
mean_name <- paste0("slide_", col, "_m", width)
epi_data %<>%
epi_slide_mean(all_of(col), .window_size = width) %>%
rename(!!mean_name := paste0("slide_value_", col))
}
epi_data %<>% ungroup()
return(epi_data)
epi_data %>%
group_by(across(key_colnames(epi_data, exclude = "time_value"))) %>%
epi_slide_mean(all_of(cols_to_mean), .window_size = width, .new_col_names = paste0("slide_", cols_to_mean, "_m", width)) %>%
ungroup()
}
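
A quick usage sketch of the simplified `rolling_mean()` (data and column name illustrative); the refactor replaces the per-column loop with a single grouped `epi_slide_mean()` call, which also avoids the repeated renaming:

```r
# Illustrative: add a trailing 7-day mean of the "hhs" column to an epi_df,
# producing a new "slide_hhs_m7" column.
smoothed <- rolling_mean(epi_data, width = 7L, cols_to_mean = "hhs")
```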

#' Get a rolling standard deviation for the named columns
Expand All @@ -75,20 +70,18 @@ rolling_sd <- function(epi_data, sd_width = 29L, mean_width = NULL, cols_to_sd =
cols_to_sd <- get_trainable_names(epi_data, cols_to_sd)
result <- epi_data
result %<>% group_by(across(key_colnames(epi_data, exclude = "time_value")))
for (col in cols_to_sd) {
mean_name <- glue::glue("slide_{col}_m{mean_width}")
sd_name <- glue::glue("slide_{col}_sd{sd_width}")
for (col_name in cols_to_sd) {
mean_name <- glue::glue("slide_{col_name}_m{mean_width}")
sd_name <- glue::glue("slide_{col_name}_sd{sd_width}")

result %<>%
epi_slide_mean(all_of(col), .window_size = mean_width) %>%
rename(!!mean_name := paste0("slide_value_", col))
epi_slide_mean(all_of(col_name), .window_size = mean_width, .new_col_names = mean_name)

result %<>%
mutate(.temp = (.data[[mean_name]] - .data[[col]])^2) %>%
epi_slide_mean(all_of(".temp"), .window_size = sd_width) %>%
select(-.temp) %>%
rename(!!sd_name := "slide_value_.temp") %>%
mutate(!!sd_name := sqrt(.data[[sd_name]]))
mutate(.temp = (.data[[mean_name]] - .data[[col_name]])^2) %>%
epi_slide_mean(all_of(".temp"), .window_size = sd_width, .new_col_names = sd_name) %>%
mutate(!!sd_name := sqrt(.data[[sd_name]])) %>%
select(-.temp)

if (!keep_mean) {
result %<>% select(-{{ mean_name }})
4 changes: 2 additions & 2 deletions R/forecasters/ensemble_average.R
@@ -32,12 +32,12 @@ ensemble_average <- function(epi_data,
# their names are separated for obscure target related reasons
names(ensemble_args) <- ensemble_args_names %||% names(ensemble_args)
average_type <- ensemble_args$average_type %||% median
join_columns <- ensemble_args$join_columns %||% setdiff(names(forecasts[[1]]), "prediction")
join_columns <- ensemble_args$join_columns %||% setdiff(names(forecasts[[1]]), "value")

# begin actual analysis
forecasts %>%
bind_rows(.id = "id") %>%
group_by(across(all_of(join_columns))) %>%
summarize(prediction = average_type(prediction, na.rm = TRUE), .groups = "drop") %>%
summarize(value = average_type(value, na.rm = TRUE), .groups = "drop") %>%
ungroup()
}
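
The substantive change here is that forecast data frames now carry a `value` column rather than `prediction`, so the combination step is a grouped median over every other shared column. A standalone sketch of that operation on toy tibbles (not the repo's data):

```r
library(dplyr)

# Two toy forecasts with the post-rename "value" column.
forecasts <- list(
  climate = tibble(geo_value = "ma", quantile = 0.5, value = 120),
  ar      = tibble(geo_value = "ma", quantile = 0.5, value = 100)
)
join_columns <- setdiff(names(forecasts[[1]]), "value")

forecasts %>%
  bind_rows(.id = "id") %>%
  group_by(across(all_of(join_columns))) %>%
  summarize(value = median(value, na.rm = TRUE), .groups = "drop")
#> one row: geo_value "ma", quantile 0.5, value = median(120, 100) = 110
```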