
Commit 81381d6

optimized write_csv
1 parent 58b51a6 commit 81381d6

File tree

4 files changed: +150 -36 lines changed

doctor_visits/delphi_doctor_visits/process_data.py (+52 -24)
@@ -6,19 +6,39 @@
 
 from .config import Config
 
-def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
+def format_outname(prefix: str, se: bool, weekday:bool):
     '''
 
     Parameters
     ----------
-    df
-    geo_id
+    prefix
     se
-    logger
+    weekday
 
     Returns
     -------
 
+    '''
+    # write out results
+    out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
+    if se:
+        assert prefix is not None, "template has no obfuscated prefix"
+        out_name = prefix + "_" + out_name
+    return out_name
+
+
+def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
+    '''
+    format dataframe and checks for anomalies to write results
+    Parameters
+    ----------
+    df: dataframe from output from update_sensor
+    geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
+    se: boolean to write out standard errors, if true, use an obfuscated name
+    logger
+
+    Returns
+    -------
+    filtered and formatted dataframe
     '''
     # report in percentage
     df['val'] = df['val'] * 100
@@ -28,53 +48,61 @@ def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
     df_val_null = df[val_isnull]
     if not df_val_null.empty:
         logger.info("sensor value is nan, check pipeline")
-    filtered_df = df[~val_isnull]
+    df = df[~val_isnull]
 
-    se_too_high = filtered_df['se'] >= 5
-    df_se_too_high = filtered_df[se_too_high]
-    if len(df_se_too_high.empty) > 0:
+    se_too_high = df['se'] >= 5
+    df_se_too_high = df[se_too_high]
+    if len(df_se_too_high) > 0:
         logger.info(f"standard error suspiciously high! investigate {geo_id}")
-        filtered_df = filtered_df[~se_too_high]
+        df = df[~se_too_high]
 
-    sensor_too_high = filtered_df['val'] >= 90
-    df_sensor_too_high = filtered_df[sensor_too_high]
+    sensor_too_high = df['val'] >= 90
+    df_sensor_too_high = df[sensor_too_high]
     if len(df_sensor_too_high) > 0:
         logger.info(f"standard error suspiciously high! investigate {geo_id}")
-        filtered_df = filtered_df[~sensor_too_high]
+        df = df[~sensor_too_high]
 
     if se:
-        valid_cond = filtered_df['se'] > 0 & filtered_df['val'] > 0
-        invalid_df = filtered_df[~valid_cond]
+        valid_cond = (df['se'] > 0) & (df['val'] > 0)
+        invalid_df = df[~valid_cond]
         if len(invalid_df) > 0:
             logger.info(f"p=0, std_err=0 invalid")
-            filtered_df = filtered_df[valid_cond]
+            df = df[valid_cond]
     else:
-        filtered_df.drop(columns=['se'], inplace=True)
-
+        df["se"] = np.NAN
 
+    df["direction"] = np.NAN
+    df["sample_size"] = np.NAN
+    return df
 
-def write_to_csv(output_df: pd.DataFrame, geo_level: str, se:bool, out_name: str, logger, output_path="."):
+def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: bool, se:bool, logger, output_path="."):
     """Write sensor values to csv.
 
     Args:
       output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
-      geo_level: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
+      geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
      se: boolean to write out standard errors, if true, use an obfuscated name
      out_name: name of the output file
      output_path: outfile path to write the csv (default is current directory)
    """
+    out_name = format_outname(prefix, se, weekday)
+    filtered_df = format_df(output_df, geo_id, se, logger)
+
    if se:
        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
 
-    out_n = 0
-    for d in set(output_df["date"]):
+    dates = set(list(output_df['date']))
+    grouped = filtered_df.groupby('date')
+    for d in dates:
        filename = "%s/%s_%s_%s.csv" % (output_path,
                                        (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
-                                       geo_level,
+                                       geo_id,
                                        out_name)
-        single_date_df = output_df[output_df["date"] == d]
+        single_date_df = grouped.get_group(d)
+        single_date_df = single_date_df.drop(columns=['date'])
+        single_date_df.to_csv(filename, index=False, na_rep="NA")
 
-    logger.debug(f"wrote {out_n} rows for {geo_level}")
+    logger.debug(f"wrote {len(single_date_df)} rows for {geo_id}")
 
 
 def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
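
Note on the optimization: the old write_to_csv rescanned the full output frame once per date with a boolean mask (output_df[output_df["date"] == d]), while the new version builds the date groups once and pulls each slice with get_group, with the anomaly checks moved into format_df. A minimal, self-contained sketch of that grouping pattern on toy data (the values here are made up; column names follow the diff above):

import pandas as pd

# Toy stand-in for the sensor output frame.
df = pd.DataFrame({
    "date": pd.to_datetime(["2020-02-04", "2020-02-04", "2020-02-05"]),
    "geo_id": ["ak", "al", "ak"],
    "val": [0.57, 0.33, 0.58],
})

# Old pattern: one full scan of df for every distinct date.
for d in set(df["date"]):
    single_date_df = df[df["date"] == d]

# New pattern: hash the dates once, then each per-date slice is a cheap lookup.
grouped = df.groupby("date")
for d in set(df["date"]):
    single_date_df = grouped.get_group(d).drop(columns=["date"])
    print(d.date(), len(single_date_df))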

doctor_visits/delphi_doctor_visits/run.py (+1 -6)
@@ -118,13 +118,8 @@ def run_module(params):  # pylint: disable=too-many-statements
             if sensor is None:
                 logger.error("No sensors calculated, no output will be produced")
                 continue
-            # write out results
-            out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
-            if params["indicator"]["se"]:
-                assert prefix is not None, "template has no obfuscated prefix"
-                out_name = prefix + "_" + out_name
 
-            write_to_csv(sensor, geo, se, out_name, logger, export_dir)
+            write_to_csv(sensor, prefix, geo, weekday, se, logger, export_dir)
             max_dates.append(sensor.date.max())
             n_csv_export.append(sensor.date.unique().shape[0])
     logger.debug(f"wrote files to {export_dir}")

New file (+52 -0)

@@ -0,0 +1,52 @@
+geo_id,val,se,direction,sample_size
+ak,0.569287,NA,NA,NA
+al,0.328228,NA,NA,NA
+ar,0.370763,NA,NA,NA
+az,0.530073,NA,NA,NA
+ca,0.351530,NA,NA,NA
+co,0.401868,NA,NA,NA
+ct,0.601417,NA,NA,NA
+dc,0.878253,NA,NA,NA
+de,0.324467,NA,NA,NA
+fl,0.479217,NA,NA,NA
+ga,0.475930,NA,NA,NA
+hi,0.393773,NA,NA,NA
+ia,0.481491,NA,NA,NA
+id,0.445713,NA,NA,NA
+il,0.380958,NA,NA,NA
+in,0.357658,NA,NA,NA
+ks,0.365005,NA,NA,NA
+ky,0.368104,NA,NA,NA
+la,0.405224,NA,NA,NA
+ma,0.347109,NA,NA,NA
+md,0.478480,NA,NA,NA
+me,0.292373,NA,NA,NA
+mi,0.432469,NA,NA,NA
+mn,0.436532,NA,NA,NA
+mo,0.354799,NA,NA,NA
+ms,0.385404,NA,NA,NA
+mt,0.363729,NA,NA,NA
+nc,0.502467,NA,NA,NA
+nd,0.384162,NA,NA,NA
+ne,0.504449,NA,NA,NA
+nh,0.406304,NA,NA,NA
+nj,0.350642,NA,NA,NA
+nm,0.336862,NA,NA,NA
+nv,0.590539,NA,NA,NA
+ny,0.369274,NA,NA,NA
+oh,0.402905,NA,NA,NA
+ok,0.339027,NA,NA,NA
+or,0.421793,NA,NA,NA
+pa,0.342980,NA,NA,NA
+ri,0.353920,NA,NA,NA
+sc,0.321687,NA,NA,NA
+sd,0.508804,NA,NA,NA
+tn,0.454150,NA,NA,NA
+tx,0.358389,NA,NA,NA
+ut,0.488488,NA,NA,NA
+va,0.371326,NA,NA,NA
+vt,0.307760,NA,NA,NA
+wa,0.440772,NA,NA,NA
+wi,0.373994,NA,NA,NA
+wv,0.317498,NA,NA,NA
+wy,0.346961,NA,NA,NA

Tests (+45 -6)

@@ -1,21 +1,60 @@
 """Tests for update_sensor.py."""
 from datetime import datetime
 import logging
+import os
+from pathlib import Path
 import pandas as pd
 
-from delphi_doctor_visits.process_data import csv_to_df
+from delphi_doctor_visits.process_data import csv_to_df, write_to_csv, format_outname
 
 TEST_LOGGER = logging.getLogger()
 
 class TestProcessData:
+    geo = "state",
+    startdate = datetime(2020, 2, 4)
+    enddate = datetime(2020, 2, 5)
+    dropdate = datetime(2020, 2,6)
+    geo = "state"
+    se = False
+    weekday = False
+    prefix = "wip_XXXXX"
+    filepath = "./test_data"
+    compare_path = "./comparison"
+
     def test_csv_to_df(self):
         actual = csv_to_df(
-            filepath="./test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz",
-            startdate=datetime(2020, 2, 4),
-            enddate=datetime(2020, 2, 5),
-            dropdate=datetime(2020, 2,6),
+            filepath=f"{self.filepath}/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz",
+            startdate=self.startdate,
+            enddate=self.enddate,
+            dropdate=self.dropdate,
             logger=TEST_LOGGER,
         )
 
-        comparison = pd.read_pickle("./comparison/process_data/main_after_date_SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.pkl")
+        comparison = pd.read_pickle(f"{self.compare_path}/process_data/main_after_date_SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.pkl")
         pd.testing.assert_frame_equal(actual.reset_index(drop=True), comparison)
+
+
+    def test_write_to_csv(self):
+        output_df = pd.read_csv(f"{self.compare_path}/update_sensor/all.csv", parse_dates=["date"])
+
+        write_to_csv(
+            output_df=output_df,
+            prefix=self.prefix,
+            geo_id=self.geo,
+            se=self.se,
+            weekday=self.weekday,
+            logger=TEST_LOGGER,
+            output_path=self.filepath
+        )
+
+        outname = format_outname(self.prefix, self.se, self.weekday)
+
+        files = list(Path(self.filepath).glob(f"*{outname}.csv"))
+
+        for f in files:
+            filename = f.name
+            actual = pd.read_csv(f)
+            comparison = pd.read_csv(f"{self.compare_path}/write_csv/{filename}")
+            pd.testing.assert_frame_equal(actual, comparison)
+            os.remove(f)
+
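
Beyond the comparison-based test above, a quick way to smoke-test the new signature is to call write_to_csv on a tiny frame and look at what lands on disk. This is only a sketch: it assumes the delphi_doctor_visits package is importable and that the input carries at least date, geo_id, val, and se columns (matching the comparison CSV's layout); the values are arbitrary.

import logging
import tempfile
from pathlib import Path

import pandas as pd

from delphi_doctor_visits.process_data import write_to_csv

logger = logging.getLogger()

# Tiny stand-in for update_sensor output; format_df rescales val by 100 before writing.
toy = pd.DataFrame({
    "date": pd.to_datetime(["2020-02-04", "2020-02-04", "2020-02-05"]),
    "geo_id": ["ak", "al", "ak"],
    "val": [0.0057, 0.0033, 0.0058],
    "se": [0.001, 0.001, 0.001],
})

with tempfile.TemporaryDirectory() as tmp:
    write_to_csv(toy, prefix="wip_XXXXX", geo_id="state", weekday=False,
                 se=False, logger=logger, output_path=tmp)
    # One CSV per date, named <date + Config.DAY_SHIFT>_state_smoothed_cli.csv.
    print([p.name for p in Path(tmp).glob("*.csv")])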

0 commit comments
