@@ -188,38 +188,56 @@ daily_to_weekly <- function(epi_df, agg_method = c("sum", "mean"), day_of_week =
188
188
select(- epiweek , - year )
189
189
}
190
190
191
+ # ' Aggregate a daily archive to a weekly archive.
192
+ # '
193
+ # ' @param epi_arch the archive to aggregate.
194
+ # ' @param agg_columns the columns to aggregate.
195
+ # ' @param agg_method the method to use to aggregate the data, one of "sum" or "mean".
196
+ # ' @param day_of_week the day of the week to use as the reference day.
197
+ # ' @param day_of_week_end the day of the week to use as the end of the week.
191
198
daily_to_weekly_archive <- function (epi_arch ,
192
199
agg_columns ,
193
200
agg_method = c(" sum" , " mean" ),
194
201
day_of_week = 4L ,
195
202
day_of_week_end = 7L ) {
203
+ # How to aggregate the windowed data.
196
204
agg_method <- arg_match(agg_method )
205
+ # The columns we will later group by when aggregating.
197
206
keys <- key_colnames(epi_arch , exclude = c(" time_value" , " version" ))
207
+ # The versions we will slide over.
198
208
ref_time_values <- epi_arch $ DT $ version %> %
199
209
unique() %> %
200
210
sort()
211
+ # Choose a fast function to use to slide and aggregate.
201
212
if (agg_method == " sum" ) {
202
213
slide_fun <- epi_slide_sum
203
214
} else if (agg_method == " mean" ) {
204
215
slide_fun <- epi_slide_mean
205
216
}
206
- too_many_tibbles <- epix_slide(
217
+ # Slide over the versions and aggregate.
218
+ epix_slide(
207
219
epi_arch ,
208
- .before = 99999999L ,
209
220
.versions = ref_time_values ,
210
- function (x , group , ref_time ) {
221
+ function (x , group_keys , ref_time ) {
222
+ # The last day of the week we will slide over.
211
223
ref_time_last_week_end <-
212
224
floor_date(ref_time , " week" , day_of_week_end - 1 ) # this is over by 1
225
+ # The last day of the week we will slide over.
213
226
max_time <- max(x $ time_value )
227
+ # The days we will slide over.
214
228
valid_slide_days <- seq.Date(
215
229
from = ceiling_date(min(x $ time_value ), " week" , week_start = day_of_week_end - 1 ),
216
230
to = floor_date(max(x $ time_value ), " week" , week_start = day_of_week_end - 1 ),
217
231
by = 7L
218
232
)
233
+ # If the last day of the week is not the end of the week, add it to the
234
+ # list of valid slide days (this will produce an incomplete slide, but
235
+ # that's fine for us, since it should only be 1 day, historically.)
219
236
if (wday(max_time ) != day_of_week_end ) {
220
237
valid_slide_days <- c(valid_slide_days , max_time )
221
238
}
222
- slid_result <- x %> %
239
+ # Slide over the days and aggregate.
240
+ x %> %
223
241
group_by(across(all_of(keys ))) %> %
224
242
slide_fun(
225
243
agg_columns ,
@@ -229,18 +247,13 @@ daily_to_weekly_archive <- function(epi_arch,
229
247
) %> %
230
248
select(- all_of(agg_columns )) %> %
231
249
rename_with(~ gsub(" slide_value_" , " " , .x )) %> %
232
- # only keep 1/week
233
- # group_by week, keep the largest in each week
234
- # alternatively
235
- # switch time_value to the designated day of the week
250
+ rename_with(~ gsub(" _7dsum" , " " , .x )) %> %
251
+ # Round all dates to reference day of the week. These will get
252
+ # de-duplicated by compactify in as_epi_archive below.
236
253
mutate(time_value = round_date(time_value , " week" , day_of_week - 1 )) %> %
237
254
as_tibble()
238
255
}
239
- )
240
- too_many_tibbles %> %
241
- pull(time_value ) %> %
242
- max()
243
- too_many_tibbles %> %
256
+ ) %> %
244
257
as_epi_archive(compactify = TRUE )
245
258
}
246
259
@@ -313,9 +326,8 @@ get_health_data <- function(as_of, disease = c("covid", "flu")) {
313
326
314
327
most_recent_row <- meta_data %> %
315
328
# update_date is actually a time, so we need to filter for the day after.
316
- filter(update_date < = as_of + 1 ) %> %
317
- arrange(desc(update_date )) %> %
318
- slice(1 )
329
+ filter(update_date < = as.Date(as_of ) + 1 ) %> %
330
+ slice_max(update_date )
319
331
320
332
if (nrow(most_recent_row ) == 0 ) {
321
333
cli :: cli_abort(" No data available for the given date." )
@@ -331,9 +343,7 @@ get_health_data <- function(as_of, disease = c("covid", "flu")) {
331
343
if (disease == " covid" ) {
332
344
data %<> % mutate(
333
345
hhs = previous_day_admission_adult_covid_confirmed +
334
- previous_day_admission_adult_covid_suspected +
335
- previous_day_admission_pediatric_covid_confirmed +
336
- previous_day_admission_pediatric_covid_suspected
346
+ previous_day_admission_pediatric_covid_confirmed
337
347
)
338
348
} else if (disease == " flu" ) {
339
349
data %<> % mutate(hhs = previous_day_admission_influenza_confirmed )
@@ -709,10 +719,12 @@ create_nhsn_data_archive <- function(disease_name) {
709
719
as_epi_archive(compactify = TRUE )
710
720
}
711
721
712
-
713
722
up_to_date_nssp_state_archive <- function (disease = c(" covid" , " influenza" )) {
714
723
disease <- arg_match(disease )
715
- nssp_state <- pub_covidcast(
724
+ nssp_state <- retry_fn(
725
+ max_attempts = 10 ,
726
+ wait_seconds = 1 ,
727
+ fn = pub_covidcast ,
716
728
source = " nssp" ,
717
729
signal = glue :: glue(" pct_ed_visits_{disease}" ),
718
730
time_type = " week" ,
@@ -728,3 +740,26 @@ up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
728
740
mutate(time_value = time_value + 3 ) %> %
729
741
as_epi_archive(compactify = TRUE )
730
742
}
743
+
744
+ # Get the last time the signal was updated.
745
+ get_covidcast_signal_last_update <- function (source , signal ) {
746
+ pub_covidcast_meta() %> %
747
+ filter(source == !! source , signal == !! signal ) %> %
748
+ pull(last_update ) %> %
749
+ as.POSIXct()
750
+ }
751
+
752
+ # Get the last time the Socrata dataset was updated.
753
+ get_socrata_updated_at <- function (dataset_url ) {
754
+ httr :: GET(dataset_url ) %> %
755
+ httr :: content() %> %
756
+ pluck(" rowsUpdatedAt" ) %> %
757
+ as.POSIXct()
758
+ }
759
+
760
+ get_s3_object_last_modified <- function (bucket , key ) {
761
+ # Format looks like "Fri, 31 Jan 2025 22:01:16 GMT"
762
+ attr(aws.s3 :: head_object(key , bucket = bucket ), " last-modified" ) %> %
763
+ str_replace_all(" GMT" , " " ) %> %
764
+ as.POSIXct(format = " %a, %d %b %Y %H:%M:%S" )
765
+ }
0 commit comments