
Commit cc54d1b

Merge pull request #203 from cmu-delphi/nsspForecast
initial nssp covid forecast
2 parents 965fce9 + e351fee

8 files changed: +807 additions, -94 deletions

.lintr

Lines changed: 1 addition & 0 deletions

@@ -2,6 +2,7 @@ linters: linters_with_defaults(
   line_length_linter(120),
   cyclocomp_linter = NULL,
   object_length_linter(length = 40L),
+  object_usage_linter = NULL,
   commented_code_linter = NULL
 )
 exclusions: list(
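
A side note on the linter change (not part of the commit, and the motivation is an assumption): `object_usage_linter` commonly reports false positives for variables that only appear inside tidy-evaluation pipelines, so disabling it quiets those warnings. To re-lint with the updated config:

```r
# Re-run lintr over the repository with the updated .lintr; any previous
# object_usage_linter findings should no longer be reported.
lintr::lint_dir(".")
```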

R/aux_data_utils.R

Lines changed: 14 additions & 2 deletions

@@ -687,10 +687,22 @@ up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
     issues = "*"
   )
   nssp_state %>%
-    select(geo_value, time_value, issue, nssp = value) %>%
+    select(geo_value, time_value, version = issue, nssp = value) %>%
+    bind_rows(get_nssp_github()) %>%
     as_epi_archive(compactify = TRUE) %>%
     extract2("DT") %>%
     # End of week to midweek correction.
-    mutate(time_value = time_value + 3) %>%
+    mutate(time_value = floor_date(time_value, "week", week_start = 7) + 3) %>%
     as_epi_archive(compactify = TRUE)
 }
+
+get_nssp_github <- function() {
+  raw_file <- read_csv("https://raw.githubusercontent.com/CDCgov/covid19-forecast-hub/refs/heads/main/auxiliary-data/nssp-raw-data/latest.csv")
+  state_map <- get_population_data() %>% filter(state_id != "usa")
+  raw_file %>%
+    filter(county == "All") %>%
+    left_join(state_map, by = join_by(geography == state_name)) %>%
+    select(geo_value = state_id, time_value = week_end, nssp = percent_visits_covid) %>%
+    mutate(time_value = floor_date(time_value, "week", week_start = 7) + 3) %>%
+    mutate(version = Sys.Date())
+}
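
A usage sketch, not part of the diff, of how the new GitHub fallback feeds the archive. It assumes the packages this file already relies on (dplyr, readr, lubridate, epiprocess, magrittr) are attached and that `get_population_data()` is available elsewhere in the repo:

```r
# Build the versioned NSSP archive: Socrata issues plus the CDC GitHub
# snapshot, which get_nssp_github() stamps with version = Sys.Date().
nssp_archive <- up_to_date_nssp_state_archive("covid")

# The archive's data.table holds one row per (geo_value, time_value, version).
nssp_archive$DT %>%
  dplyr::filter(geo_value == "ca") %>%
  head()
```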

R/utils.R

Lines changed: 104 additions & 7 deletions

@@ -626,7 +626,6 @@ delete_files_from_s3 <- function(keys, bucket, batch_size = 500, .progress = TRUE) {
 
 
 MIN_TIMESTAMP <- as.POSIXct("2000-01-01 00:00:00S", tz = "UTC")
-MAX_TIMESTAMP <- as.POSIXct("2040-01-01 00:00:00S", tz = "UTC")
 
 #' Get the last time a covidcast signal was updated.
 #'

@@ -674,37 +673,40 @@ get_s3_object_last_modified <- function(key, bucket, missing_value = MIN_TIMESTAMP) {
 #' @param dataset_url The URL of the Socrata dataset.
 #'
 #' @return The last updated date of the Socrata dataset in POSIXct format.
-get_socrata_updated_at <- function(dataset_url, missing_value = MAX_TIMESTAMP) {
+get_socrata_updated_at <- function(dataset_url, missing_value) {
   tryCatch(
     {
-      httr::with_config(
+      rowsUpdatedAt <- httr::with_config(
         httr::config(timeout = 5),
         httr::RETRY("GET", dataset_url, times = 5, pause_min = 5, pause_cap = 5)
       ) %>%
         httr::content() %>%
         # This field comes in as integer seconds since epoch, so we need to convert it.
-        pluck("rowsUpdatedAt") %>%
-        as.POSIXct(origin = "1970-01-01", tz = "UTC")
+        pluck("rowsUpdatedAt")
+      if (is.null(rowsUpdatedAt)) {
+        return(missing_value)
+      }
+      rowsUpdatedAt %>% as.POSIXct(origin = "1970-01-01", tz = "UTC")
     },
     error = function(cond) {
       return(missing_value)
     }
   )
 }
 
-
 #' get the unique shared (geo_value, forecast_date, target_end_date) tuples present for each forecaster in `forecasts`
 get_unique <- function(forecasts) {
   forecasters <- forecasts %>%
     pull(forecaster) %>%
     unique()
   distinct <- map(
     forecasters,
-    \(x)
+    \(x) {
       forecasts %>%
         filter(forecaster == x) %>%
         select(geo_value, forecast_date, target_end_date) %>%
         distinct()
+    }
   )
   distinct_dates <- reduce(
     distinct,
@@ -746,3 +748,98 @@ filter_shared_geo_dates <- function(
     inner_join(viable_dates, by = c("geo_value", "forecast_date", "target_end_date"))
   )
 }
+
+
+#' Calculate MD5 hash of a file
+#'
+#' This function reads a file into memory, calculates an MD5 hash of the
+#' binary data, and returns the hash as a character string.
+#'
+#' @param file The path to the file to hash
+#' @param algorithm The hash algorithm to use. Defaults to "md5".
+get_file_hash <- function(file, algorithm = "md5") {
+  readBin(file, what = "raw", n = file.size(file)) %>%
+    digest::digest(algo = algorithm, serialize = FALSE)
+}
+
+#' Calculate MD5 hash of a tibble as Parquet data
+#'
+#' This function takes a tibble, writes it to a Parquet file in memory,
+#' and calculates an MD5 hash of the resulting binary data. This is useful
+#' for creating content-based hashes of data that can be used for caching
+#' or detecting changes in data.
+#'
+#' @param df A tibble or data frame to hash
+#' @param algorithm The hash algorithm to use. Defaults to "md5".
+#'   Other options include "sha1", "sha256", "crc32", etc.
+#'
+#' @return A character string containing the MD5 hash
+#'
+#' @examples
+#' \dontrun{
+#' library(dplyr)
+#' data <- tibble(x = 1:5, y = letters[1:5])
+#' hash <- get_tibble_hash(data)
+#' print(hash)
+#' }
+#'
+#' @export
+get_tibble_hash <- function(df, algorithm = "md5") {
+  temp_file <- tempfile(fileext = ".parquet")
+  on.exit(unlink(temp_file), add = TRUE)
+  nanoparquet::write_parquet(df, temp_file)
+  get_file_hash(temp_file, algorithm = algorithm)
+}
+
+#' Compare an S3 ETag with local hashes
+#'
+#' This function downloads a file from S3, calculates various hashes of the
+#' binary data, and compares them to the ETag of the S3 object. A test to verify
+#' that I understand how S3 ETags are computed.
+#'
+#' @param bucket The name of the S3 bucket.
+#' @param key The key of the S3 object.
+compare_s3_etag <- function(bucket, key, region = "us-east-1") {
+  # Download file to temp location
+  temp_file <- tempfile()
+  on.exit(unlink(temp_file), add = TRUE)
+
+  # Download from S3
+  aws.s3::save_object(object = key, bucket = bucket, file = temp_file, region = region)
+
+  # Get S3 metadata to extract ETag
+  s3_meta <- aws.s3::head_object(object = key, bucket = bucket, region = region)
+
+  # Extract ETag (remove quotes if present)
+  s3_etag <- gsub('"', '', attr(s3_meta, "etag"))
+
+  # Calculate various hashes of the local file
+  raw_data <- readBin(temp_file, "raw", file.info(temp_file)$size)
+
+  hashes <- list(
+    md5 = digest::digest(raw_data, algo = "md5", serialize = FALSE),
+    sha1 = digest::digest(raw_data, algo = "sha1", serialize = FALSE),
+    sha256 = digest::digest(raw_data, algo = "sha256", serialize = FALSE),
+    crc32 = digest::digest(raw_data, algo = "crc32", serialize = FALSE)
+  )
+
+  # Compare results
+  cat("S3 ETag:", s3_etag, "\n")
+  cat("Local hashes:\n")
+  for (name in names(hashes)) {
+    match_indicator <- if (hashes[[name]] == s3_etag) " ✓ MATCH" else ""
+    cat(sprintf(" %s: %s%s\n", name, hashes[[name]], match_indicator))
+  }
+
+  # Check if it's a multipart upload (contains hyphen)
+  if (grepl("-", s3_etag)) {
+    cat("\nNote: ETag contains hyphen - this was likely a multipart upload\n")
+    cat("Multipart ETags are MD5 of concatenated part MD5s, plus part count\n")
+  }
+
+  invisible(list(
+    s3_etag = s3_etag,
+    local_hashes = hashes,
+    file_size = file.info(temp_file)$size
+  ))
+}
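
A small, illustrative sketch of the new hashing helpers (not part of the commit); it assumes the digest and nanoparquet packages are installed, as the helpers require:

```r
# Two writes of the same tibble should yield identical parquet bytes and
# therefore identical hashes (assuming nanoparquet writes deterministically),
# which is what makes hash-based duplicate detection work downstream.
df <- tibble::tibble(geo_value = c("ca", "ny"), nssp = c(1.2, 3.4))
identical(get_tibble_hash(df), get_tibble_hash(df))

# get_file_hash() works on any file and accepts other digest algorithms.
path <- tempfile(fileext = ".csv")
write.csv(df, path, row.names = FALSE)
get_file_hash(path, algorithm = "sha256")
```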

scripts/build_nhsn_archive.R

Lines changed: 66 additions & 26 deletions

@@ -47,7 +47,9 @@ config <- list(
   prelim_metadata_url = "https://data.cdc.gov/api/views/mpgq-jmmr",
   raw_file_name_prefix = "nhsn_data_raw",
   s3_bucket = "forecasting-team-data",
-  archive_s3_key = "nhsn_data_archive.parquet"
+  archive_s3_key = "nhsn_data_archive.parquet",
+  local_raw_cache_path = "cache/nhsn_raw_cache",
+  hash_archive_file = "nhsn_hash_archive.parquet"
 )
 
 

@@ -79,6 +81,7 @@ get_last_raw_update_at <- function(type = c("raw", "prelim"), missing_value = MIN_TIMESTAMP) {
   )
 }
 
+
 #' Download the latest NHSN data from Socrata
 #'
 #' This function downloads the latest NHSN data from Socrata, if it has been

@@ -87,44 +90,81 @@ get_last_raw_update_at <- function(type = c("raw", "prelim"), missing_value = MIN_TIMESTAMP) {
 #'
 #' @param verbose Whether to print verbose output.
 update_nhsn_data_raw <- function() {
-  # If this request fails (which occurs surprisingly often, eyeroll), we
-  # will just return a future date (2040-01-01) and download anyway.
-  raw_update_at <- get_socrata_updated_at(config$raw_metadata_url)
-  # Same here.
-  prelim_update_at <- get_socrata_updated_at(config$prelim_metadata_url)
+  current_time <- with_tz(Sys.time(), tzone = "UTC")
+  # WARNING: These Socrata metadata fields have been unreliable. If they fail, they
+  # default to current time, which will trigger a download and then we compare
+  # with hash archive.
+  raw_update_at <- get_socrata_updated_at(config$raw_metadata_url, missing_value = current_time)
+  prelim_update_at <- get_socrata_updated_at(config$prelim_metadata_url, missing_value = current_time)
+  # Get the last time the raw data was updated from S3.
   last_raw_file_update_at <- get_last_raw_update_at("raw")
   last_prelim_file_update_at <- get_last_raw_update_at("prelim")
 
+  # Some derived values for logging and file naming.
+  raw_update_at_local <- with_tz(raw_update_at)
+  raw_update_at_formatted <- format(raw_update_at, "%Y-%m-%d_%H-%M-%OS5")
+  raw_file <- glue("{config$raw_file_name_prefix}_{raw_update_at_formatted}.parquet")
+  local_file_path <- here::here(config$local_raw_cache_path, raw_file)
+  prelim_update_at_local <- with_tz(prelim_update_at)
+  prelim_update_at_formatted <- format(prelim_update_at, "%Y-%m-%d_%H-%M-%OS5")
+  prelim_file <- glue("{config$raw_file_name_prefix}_{prelim_update_at_formatted}_prelim.parquet")
+  local_prelim_file_path <- here::here(config$local_raw_cache_path, prelim_file)
+  hash_archive_path <- here::here(config$local_raw_cache_path, config$hash_archive_file)
+
+  # Open the hash archive file.
+  hash_archive <- nanoparquet::read_parquet(hash_archive_path)
+
+  # If the raw data has been updated or there was a failure getting metadata,
+  # download it.
   if (raw_update_at > last_raw_file_update_at) {
-    raw_update_at_local <- with_tz(raw_update_at)
     cli_inform("The raw data has been updated at {raw_update_at_local} (UTC: {raw_update_at}).")
-    raw_update_at_formatted <- format(raw_update_at, "%Y-%m-%d_%H-%M-%OS5")
-    raw_file <- glue("{config$raw_file_name_prefix}_{raw_update_at_formatted}.parquet")
     cli_inform("Downloading the raw data... {raw_file}")
-    read_csv(config$raw_query_url) %>% s3write_using(write_parquet, object = raw_file, bucket = config$s3_bucket)
+    read_csv(config$raw_query_url) %>% write_parquet(local_file_path)
+
+    # Get the hash of the raw file.
+    raw_file_hash <- get_file_hash(local_file_path)
+
+    # If the raw file hash is not in the archive, add it to S3 and local file.
+    if (!raw_file_hash %in% hash_archive$hash) {
+      hash_archive <- bind_rows(hash_archive, tibble(file = raw_file, hash = raw_file_hash))
+      cli_inform("Adding raw file to S3 and local cache.")
+
+      # Back up the raw file to S3.
+      # s3write_using(write_parquet, object = raw_file, bucket = config$s3_bucket)
+
+      # Write the hash archive back to the file.
+      write_parquet(hash_archive, hash_archive_path)
+    } else {
+      cli_inform("New raw file is a duplicate, removing from local cache.")
+      unlink(local_file_path)
+    }
   }
 
+  # If the prelim data has been updated or there was a failure getting metadata,
+  # download it.
   if (prelim_update_at > last_prelim_file_update_at) {
-    prelim_update_at_local <- with_tz(prelim_update_at)
     cli_inform("The prelim data has been updated at {prelim_update_at_local} (UTC: {prelim_update_at}).")
-    prelim_update_at_formatted <- format(prelim_update_at, "%Y-%m-%d_%H-%M-%OS5")
-    prelim_file <- glue("{config$raw_file_name_prefix}_{prelim_update_at_formatted}_prelim.parquet")
     cli_inform("Downloading the prelim data... {prelim_file}")
-    read_csv(config$prelim_query_url) %>% s3write_using(write_parquet, object = prelim_file, bucket = config$s3_bucket)
-  }
+    read_csv(config$prelim_query_url) %>% write_parquet(local_prelim_file_path)
 
-  # Since we may have downloaded a duplicate file above, filter out the ones
-  # that have the same ETag. (I don't feel like rederiving AWS S3's ETag field
-  # and computing ahead of time.)
-  delete_df <- delete_duplicates_from_s3_by_etag(config$s3_bucket, config$raw_file_name_prefix, dry_run = FALSE)
-  if (nrow(delete_df) > 0) {
-    cli_inform("Deleted {nrow(delete_df)} duplicate files from S3.")
-    cli_inform("Deleted files:")
-    cli_inform(paste0(" - ", delete_df$Key))
-  } else {
-    cli_inform("No duplicate files to delete.")
+    # Get the hash of the prelim file.
+    prelim_file_hash <- get_file_hash(local_prelim_file_path)
+
+    # If the prelim file hash is not in the archive, add it to S3 and local file.
+    if (!prelim_file_hash %in% hash_archive$hash) {
+      hash_archive <- bind_rows(hash_archive, tibble(file = prelim_file, hash = prelim_file_hash))
+      cli_inform("Adding prelim file to S3 and local cache.")
+
+      # Back up the prelim file to S3.
+      # s3write_using(write_parquet, object = prelim_file, bucket = config$s3_bucket)
+
+      # Write the hash archive back to the file.
+      write_parquet(hash_archive, hash_archive_path)
+    } else {
+      cli_inform("New prelim file is a duplicate, removing from local cache.")
+      unlink(local_prelim_file_path)
+    }
   }
-  cli_inform("Finished fetching NHSN data.")
 }
 
 #' Process Raw NHSN Data File
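
One operational assumption worth noting (not something this diff creates): `update_nhsn_data_raw()` now reads the hash archive unconditionally, so the local cache needs to exist with a seed file in the expected `file`/`hash` schema before the first run. A hypothetical one-time bootstrap:

```r
# Hypothetical bootstrap of the local cache directory and an empty hash archive
# matching the columns the script appends to (file, hash).
cache_dir <- here::here("cache/nhsn_raw_cache")
dir.create(cache_dir, recursive = TRUE, showWarnings = FALSE)
nanoparquet::write_parquet(
  tibble::tibble(file = character(), hash = character()),
  file.path(cache_dir, "nhsn_hash_archive.parquet")
)
```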

scripts/covid_geo_exclusions.csv

Lines changed: 15 additions & 0 deletions

@@ -12,6 +12,21 @@ forecast_date,forecaster,geo_value,weight
 2024-10-01, climate_geo_agged, all, 0
 2024-10-01, climate_quantile_extrapolated, all, 0
 ##################
+# Jun 25
+##################
+2024-10-01, all, mp, 0
+2024-10-01, windowed_seasonal, all, 0.0001
+2024-10-01, windowed_seasonal_extra_sources, all, 3
+2024-10-01, climate_linear, all, 0.0001
+2024-10-01, linear, all, 3
+2024-10-01, linearlog, all, 0
+2024-10-01, climate_base, all, 2
+2024-10-01, climate_geo_agged, all, 0
+2024-10-01, climate_quantile_extrapolated, all, 0.001
+2024-10-01, windowed_seasonal, ak, 10
+2024-10-01, windowed_seasonal_extra_sources, ak, 0.001
+2024-10-01, climate_linear, ak, 0.001
+##################
 # April 30th
 ##################
 2025-04-30, all, mp, 0
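
For reference, an assumption about how this weights file is consumed rather than anything shown in the diff: the `#` banner rows mean any reader has to skip comment lines, e.g.:

```r
# Read the per-forecaster geo weights; comment = "#" drops the banner rows and
# read_csv trims the space after each comma by default.
weights <- readr::read_csv(
  "scripts/covid_geo_exclusions.csv",
  comment = "#",
  show_col_types = FALSE
)
```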
