 import time
 from delphi_epidata import Epidata
-from delphi_utils.export import create_export_csv
-from delphi_utils.geomap import GeoMapper
-from delphi_utils import get_structured_logger
+from delphi_utils import create_export_csv, get_structured_logger, Nans, GeoMapper
 import numpy as np
 import pandas as pd
 
 from .constants import SIGNALS, GEOS, SMOOTHERS, CONFIRMED, SUM_CONF_SUSP, CONFIRMED_FLU
 
+
 def _date_to_int(d):
     """Return a date object as a yyyymmdd int."""
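     # e.g. date(2021, 3, 4) -> 20210304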
     return int(d.strftime("%Y%m%d"))
@@ -64,6 +63,19 @@ def generate_date_ranges(start, end):
     return output
 
 
+def add_nancodes(df):
+    """Add nancodes to a signal dataframe."""
+    # Default missingness codes
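+    # (Nans is delphi_utils' missingness coding: this source reports values
+    # but no standard errors or sample sizes, so those are NOT_APPLICABLE
+    # rather than simply missing.)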
+    df["missing_val"] = Nans.NOT_MISSING
+    df["missing_se"] = Nans.NOT_APPLICABLE
+    df["missing_sample_size"] = Nans.NOT_APPLICABLE
+
+    # Mark any remaining nans with unknown
+    remaining_nans_mask = df["val"].isnull()
+    df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER
+    return df
+
+
 def run_module(params):
     """
     Generate ground truth HHS hospitalization data.
@@ -79,16 +91,16 @@ def run_module(params):
     """
     start_time = time.time()
     logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+        __name__,
+        filename=params["common"].get("log_filename"),
+        log_exceptions=params["common"].get("log_exceptions", True),
+    )
     mapper = GeoMapper()
     request_all_states = ",".join(mapper.get_geo_values("state_id"))
     end_day = date.today()
-    if "epidata" in params["common"] and \
-       "as_of" in params["common"]["epidata"]:
+    if "epidata" in params["common"] and "as_of" in params["common"]["epidata"]:
         end_day = min(
-            end_day,
-            datetime.strptime(str(params["common"]["epidata"]["as_of"]), "%Y%m%d").date()
+            end_day, datetime.strptime(str(params["common"]["epidata"]["as_of"]), "%Y%m%d").date()
         )
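     # (an epidata "as_of" setting caps the query window at that yyyymmdd
     # date instead of today)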
     past_reference_day = date(year=2020, month=1, day=1)  # first available date in DB
     date_range = generate_date_ranges(past_reference_day, end_day)
@@ -100,33 +112,32 @@ def run_module(params):
             raise Exception(f"Bad result from Epidata for {r}: {response['message']}")
         if response["result"] == -2 and r == date_range[-1]:  # -2 code means no results
             continue
-        dfs.append(pd.DataFrame(response['epidata']))
+        dfs.append(pd.DataFrame(response["epidata"]))
     all_columns = pd.concat(dfs)
     geo_mapper = GeoMapper()
     stats = []
     for sensor, smoother, geo in product(SIGNALS, SMOOTHERS, GEOS):
-        logger.info("Generating signal and exporting to CSV",
-                    geo_res=geo,
-                    sensor=sensor,
-                    smoother=smoother)
-        df = geo_mapper.add_geocode(make_signal(all_columns, sensor),
-                                    "state_id",
-                                    "state_code",
-                                    from_col="state")
+        logger.info(
+            "Generating signal and exporting to CSV", geo_res=geo, sensor=sensor, smoother=smoother
+        )
+        df = geo_mapper.add_geocode(
+            make_signal(all_columns, sensor), "state_id", "state_code", from_col="state"
+        )
         if sensor.endswith("_prop"):
-            df = pop_proportion(df, geo_mapper)
+            df = pop_proportion(df, geo_mapper)
         df = make_geo(df, geo, geo_mapper)
+        df["se"] = np.nan
+        df["sample_size"] = np.nan
         df = smooth_values(df, smoother[0])
+        df = add_nancodes(df)
         if df.empty:
             continue
         sensor_name = sensor + smoother[1]
         # don't export first 6 days for smoothed signals since they'll be nan.
         start_date = min(df.timestamp) + timedelta(6) if smoother[1] else min(df.timestamp)
-        dates = create_export_csv(df,
-                                  params["common"]["export_dir"],
-                                  geo,
-                                  sensor_name,
-                                  start_date=start_date)
+        dates = create_export_csv(
+            df, params["common"]["export_dir"], geo, sensor_name, start_date=start_date
+        )
         if len(dates) > 0:
             stats.append((max(dates), len(dates)))
@@ -135,71 +146,75 @@ def run_module(params):
     csv_export_count = sum(s[-1] for s in stats)
     max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
     formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
-    logger.info("Completed indicator run",
-                elapsed_time_in_seconds=elapsed_time_in_seconds,
-                csv_export_count=csv_export_count,
-                max_lag_in_days=max_lag_in_days,
-                oldest_final_export_date=formatted_min_max_date)
+    logger.info(
+        "Completed indicator run",
+        elapsed_time_in_seconds=elapsed_time_in_seconds,
+        csv_export_count=csv_export_count,
+        max_lag_in_days=max_lag_in_days,
+        oldest_final_export_date=formatted_min_max_date,
+    )
 
 
 def smooth_values(df, smoother):
     """Smooth the value column in the dataframe."""
     df["val"] = df["val"].astype(float)
-    df["val"] = df[["geo_id", "val"]].groupby("geo_id")["val"].transform(
-        smoother.smooth
-    )
+    df["val"] = df[["geo_id", "val"]].groupby("geo_id")["val"].transform(smoother.smooth)
     return df
 
-def pop_proportion(df,geo_mapper):
+
+def pop_proportion(df, geo_mapper):
     """Get the population-proportionate variants as the dataframe val."""
-    pop_val = geo_mapper.add_population_column(df, "state_code")
-    df["val"]=round(df["val"]/pop_val["population"]*100000, 7)
+    pop_val = geo_mapper.add_population_column(df, "state_code")
+    df["val"] = round(df["val"] / pop_val["population"] * 100000, 7)
     pop_val.drop("population", axis=1, inplace=True)
     return df
 
+
 def make_geo(state, geo, geo_mapper):
     """Transform incoming geo (state) to another geo."""
     if geo == "state":
         exported = state.rename(columns={"state": "geo_id"})
     else:
-        exported = geo_mapper.replace_geocode(state, "state_code", geo, new_col="geo_id")
-    exported["se"] = np.nan
-    exported["sample_size"] = np.nan
+        exported = geo_mapper.replace_geocode(
+            state, "state_code", geo, new_col="geo_id", date_col="timestamp"
+        )
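+        # (date_col is passed because this dataframe's date column is named
+        # "timestamp")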
     return exported
 
 
 def make_signal(all_columns, sig):
     """Generate column sums according to signal name."""
-    assert sig in SIGNALS, f"Unexpected signal name '{sig}';" + \
-        " familiar names are '{', '.join(SIGNALS)}'"
+    assert sig in SIGNALS, (
+        f"Unexpected signal name '{sig}';" + f" familiar names are '{', '.join(SIGNALS)}'"
+    )
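+    # (HHS reports "previous day" admissions, so
+    # int_date_to_previous_day_datetime shifts each report date back a day
+    # to the actual admission date)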
     if sig.startswith(CONFIRMED):
-        df = pd.DataFrame({
-            "state": all_columns.state.apply(str.lower),
-            "timestamp":int_date_to_previous_day_datetime(all_columns.date),
-            "val": \
-                all_columns.previous_day_admission_adult_covid_confirmed + \
-                all_columns.previous_day_admission_pediatric_covid_confirmed
-        })
+        df = pd.DataFrame(
+            {
+                "state": all_columns.state.apply(str.lower),
+                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
+                "val": all_columns.previous_day_admission_adult_covid_confirmed
+                + all_columns.previous_day_admission_pediatric_covid_confirmed,
+            }
+        )
     elif sig.startswith(SUM_CONF_SUSP):
-        df = pd.DataFrame({
-            "state": all_columns.state.apply(str.lower),
-            "timestamp":int_date_to_previous_day_datetime(all_columns.date),
-            "val": \
-                all_columns.previous_day_admission_adult_covid_confirmed + \
-                all_columns.previous_day_admission_adult_covid_suspected + \
-                all_columns.previous_day_admission_pediatric_covid_confirmed + \
-                all_columns.previous_day_admission_pediatric_covid_suspected,
-        })
+        df = pd.DataFrame(
+            {
+                "state": all_columns.state.apply(str.lower),
+                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
+                "val": all_columns.previous_day_admission_adult_covid_confirmed
+                + all_columns.previous_day_admission_adult_covid_suspected
+                + all_columns.previous_day_admission_pediatric_covid_confirmed
+                + all_columns.previous_day_admission_pediatric_covid_suspected,
+            }
+        )
     elif sig.startswith(CONFIRMED_FLU):
-        df = pd.DataFrame({
-            "state": all_columns.state.apply(str.lower),
-            "timestamp":int_date_to_previous_day_datetime(all_columns.date),
-            "val": \
-                all_columns.previous_day_admission_influenza_confirmed
-        })
-    else:
-        raise Exception(
-            "Bad programmer: signal '{sig}' in SIGNALS but not handled in make_signal"
+        df = pd.DataFrame(
+            {
+                "state": all_columns.state.apply(str.lower),
+                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
+                "val": all_columns.previous_day_admission_influenza_confirmed,
+            }
         )
+    else:
+        raise Exception(f"Bad programmer: signal '{sig}' in SIGNALS but not handled in make_signal")
     df["val"] = df.val.astype(float)
     return df