Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ejscreen semi-automatic #1184

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 150 additions & 115 deletions scripts/us_epa/ejscreen/ejscreen.py
Rohit231998 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,103 +1,77 @@
'''
Generates cleaned CSV for the EPA EJSCREEN data and TMCF.
Usage: python3 ejscreen.py
'''
# Copyright 2023 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import io
import zipfile
import requests
import pandas as pd
import json
from absl import logging, flags, app
import sys

_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(_MODULE_DIR, '../../../util/'))
import file_util

logging.set_verbosity(logging.INFO)
logger = logging
_FLAGS = flags.FLAGS

# Module-level defaults: the years iterated by the inline script loop.
# main() reads its own list of years from the config file instead.
YEARS = ['2015', '2016', '2017', '2018', '2019', '2020']

# Column names every year's frame is renamed to before concatenation.
NORM_CSV_COLUMNS = ['ID', 'DSLPM', 'CANCER', 'RESP', 'OZONE', 'PM25']

# 2015 has different csv column names
# Per-year list of columns passed to pd.read_csv(usecols=...); it is zipped
# positionally with NORM_CSV_COLUMNS when renaming, so order matters.
CSV_COLUMNS_BY_YEAR = {
    '2015': ['FIPS', 'dpm', 'cancer', 'resp', 'o3', 'pm'],
    '2016': NORM_CSV_COLUMNS,
    '2017': NORM_CSV_COLUMNS,
    '2018': NORM_CSV_COLUMNS,
    '2019': NORM_CSV_COLUMNS,
    '2020': NORM_CSV_COLUMNS
}

# Base name (without ".zip") of the archive requested for each year.
# None marks a year whose CSV is requested directly, i.e. not zipped.
ZIP_FILENAMES = {
    '2015': 'EJSCREEN_20150505.csv',
    '2016': 'EJSCREEN_V3_USPR_090216_CSV',
    '2017': None,
    '2018': 'EJSCREEN_2018_USPR_csv',
    '2019': 'EJSCREEN_2019_USPR.csv',
    '2020': 'EJSCREEN_2020_USPR.csv'
}

# Base name (without ".csv") of the CSV for each year: the member opened
# inside the archive, or the file requested directly for unzipped years.
# main() rebinds this global from the config file.
FILENAMES = {
    '2015': 'EJSCREEN_20150505',
    '2016': 'EJSCREEN_Full_V3_USPR_TSDFupdate',
    '2017': 'EJSCREEN_2017_USPR_Public',
    '2018': 'EJSCREEN_Full_USPR_2018',
    '2019': 'EJSCREEN_2019_USPR',
    '2020': 'EJSCREEN_2020_USPR'
}

TEMPLATE_MCF = '''
Node: E:ejscreen_airpollutants->E0
typeOf: dcs:StatVarObservation
variableMeasured: dcs:Mean_Concentration_AirPollutant_DieselPM
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->DSLPM
unit: dcs:MicrogramsPerCubicMeter
Node: E:ejscreen_airpollutants->E1
typeOf: dcs:StatVarObservation
variableMeasured: dcs:AirPollutant_Cancer_Risk
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->CANCER
Node: E:ejscreen_airpollutants->E2
typeOf: dcs:StatVarObservation
variableMeasured: dcs:AirPollutant_Respiratory_Hazard
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->RESP
Node: E:ejscreen_airpollutants->E3
typeOf: dcs:StatVarObservation
variableMeasured: dcs:Mean_Concentration_AirPollutant_Ozone
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->OZONE
unit: dcs:PartsPerBillion
Node: E:ejscreen_airpollutants->E4
typeOf: dcs:StatVarObservation
variableMeasured: dcs:Mean_Concentration_AirPollutant_PM2.5
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->PM25
unit: dcs:MicrogramsPerCubicMeter
'''


# data: dictionary of dataframes in the format {year: dataframe}
# outfilename: name of the csv that data will be written to
# write_csv concatenates the dataframe from each year together
# Location of the JSON config that main() opens through file_util.FileIO and
# parses with json.load; the default points at a gs:// path.
flags.DEFINE_string('config_path',
                    'gs://unresolved_mcf/epa/ejscreen/config.json',
                    'Path to config file')


# Function to build the correct URL for each year
def build_url(year, zip_filename=None):
    """Return the download URL for one year's EJSCREEN file.

    Args:
        year: Year key; used as a path segment and to look up URL_SUFFIX and
            FILENAMES.
        zip_filename: Archive base name. When falsy, the URL points at the
            plain ``FILENAMES[year]`` CSV instead of a zip archive.

    Returns:
        The URL string assembled from BASE_URL and the pieces above.
    """
    # Years without an archive are served as a bare CSV.
    if not zip_filename:
        return f'{BASE_URL}/{year}/{FILENAMES[year]}.csv'

    # Some years keep their archive one directory deeper.
    if year in URL_SUFFIX:
        return f'{BASE_URL}/{year}/{URL_SUFFIX[year]}/{zip_filename}.zip'

    return f'{BASE_URL}/{year}/{zip_filename}.zip'


# Download the file and save it in the input folder
def download_file(url, year, zip_filename=None, timeout=120):
    """Download one year's EJSCREEN file into the local ``input`` folder.

    Args:
        url: Address to fetch.
        year: Year label; the payload is stored as ``<year>.zip`` or
            ``<year>.csv`` under ``<module dir>/input``.
        zip_filename: When truthy the payload is saved with a ``.zip``
            extension, otherwise with ``.csv``.
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled server raises instead of blocking the run forever.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout elapses.
    """
    # NOTE(review): verify=False disables TLS certificate verification, so the
    # downloaded data is not authenticated. Kept as-is to avoid breaking the
    # download; confirm whether the server's certificate really requires it.
    response = requests.get(url, verify=False, timeout=timeout)

    if response.status_code != 200:
        logger.fatal(
            f"Failed to download file for {year}. HTTP Status Code: {response.status_code}"
        )
        return

    input_folder = os.path.join(_MODULE_DIR, 'input')
    os.makedirs(input_folder,
                exist_ok=True)  # Create the folder if it doesn't exist

    file_path = os.path.join(
        input_folder, f'{year}.zip' if zip_filename else f'{year}.csv')
    with open(file_path, 'wb') as f:
        f.write(response.content)
    logger.info(f"File downloaded and saved as {file_path}")


# Data processing function
def write_csv(data, outfilename):
full_df = pd.DataFrame()
for curr_year, one_year_df in data.items():
one_year_df['year'] = curr_year # add year column
full_df = pd.concat(
[full_df, one_year_df],
ignore_index=True) # concatenate year onto larger dataframe
one_year_df['year'] = curr_year
full_df = pd.concat([full_df, one_year_df], ignore_index=True)

# sort by FIPS and make into dcid
full_df = full_df.rename(columns={'ID': 'FIPS'})
full_df = full_df.sort_values(by=['FIPS'], ignore_index=True)
full_df['FIPS'] = 'dcid:geoId/' + (
Expand All @@ -108,32 +82,93 @@ def write_csv(data, outfilename):


def write_tmcf(outfilename):
    """Write the module-level TEMPLATE_MCF to ``outfilename`` as TMCF text.

    TEMPLATE_MCF may be a ready-made string (written verbatim) or a list.
    List entries that are dicts are rendered as one TMCF node each, i.e.
    ``key: value`` lines, instead of their Python ``repr``; other entries are
    written via ``str`` exactly as before. The template is written once.

    Args:
        outfilename: Path of the file to create or overwrite.
    """
    if isinstance(TEMPLATE_MCF, list):
        template_content = "\n".join(
            _render_tmcf_node(item) for item in TEMPLATE_MCF)
    else:
        template_content = _render_tmcf_node(TEMPLATE_MCF)

    with open(outfilename, 'w') as f_out:
        f_out.write(template_content)


def _render_tmcf_node(item):
    """Render one template entry: dicts become ``key: value`` lines."""
    if isinstance(item, dict):
        # The trailing newline, combined with the "\n" join in write_tmcf,
        # leaves a blank line between consecutive nodes.
        return "\n".join(f'{key}: {value}' for key, value in item.items()) + "\n"
    return str(item)


def main(_):
    """Load the config, fetch and parse every configured year, write outputs.

    The JSON config named by the ``config_path`` flag supplies the years,
    per-year column lists, file names, URL parts and the TMCF template.
    URL_SUFFIX, BASE_URL, TEMPLATE_MCF and FILENAMES are rebound as module
    globals because build_url() and write_tmcf() read them; the remaining
    settings stay local to this function.

    Args:
        _: Ignored positional argument.
    """
    global URL_SUFFIX, BASE_URL, TEMPLATE_MCF, FILENAMES

    try:
        # Load configuration from config.json
        with file_util.FileIO(_FLAGS.config_path, 'r') as config_file:
            settings = json.load(config_file)

        years = settings["YEARS"]
        default_names = settings["NORM_CSV_COLUMNS"]
        alternate_names = settings["NORM_CSV_COLUMNS1"]
        columns_by_year = settings["CSV_COLUMNS_BY_YEAR"]
        zip_names = settings["ZIP_FILENAMES"]
        FILENAMES = settings["FILENAMES"]
        TEMPLATE_MCF = settings["TEMPLATE_MCF"]
        BASE_URL = settings["BASE_URL"]
        URL_SUFFIX = settings["URL_SUFFIX"]
        renamed_years = settings["RENAME_COLUMNS_YEARS"]

        frames = {}
        input_dir = os.path.join(_MODULE_DIR, 'input')

        for year in years:
            try:
                logger.info(f"Processing year: {year}")
                wanted_columns = columns_by_year[year]
                zip_name = zip_names.get(year)

                # Local cache path: archives and bare CSVs differ only in
                # their extension.
                if zip_name:
                    local_name = f'{year}.zip'
                else:
                    local_name = f'{year}.csv'
                local_path = os.path.join(input_dir, local_name)

                # Only hit the network when no cached copy exists.
                if not os.path.exists(local_path):
                    logger.info(f"File for {year} not found. Downloading...")
                    download_file(build_url(year, zip_name), year, zip_name)

                # Parse either the CSV member of the archive or the bare CSV.
                if zip_name:
                    with zipfile.ZipFile(local_path, 'r') as archive:
                        with archive.open(f'{FILENAMES[year]}.csv',
                                          'r') as member:
                            frames[year] = pd.read_csv(member,
                                                       engine='python',
                                                       encoding='latin1',
                                                       usecols=wanted_columns)
                else:
                    frames[year] = pd.read_csv(local_path,
                                               sep=',',
                                               usecols=wanted_columns)

                logger.info(f"File processed for {year} successfully")

                # Source columns map positionally onto the normalized names;
                # the years listed in the config use the alternate name list.
                if year in renamed_years:
                    target_names = alternate_names
                else:
                    target_names = default_names
                frames[year] = frames[year].rename(
                    columns=dict(zip(wanted_columns, target_names)))
                logger.info(f"Columns renamed for {year} successfully")

            except Exception as e:
                # NOTE(review): absl's logging.fatal is documented to end the
                # process, which would make this `continue` unreachable;
                # confirm whether skipping the year was the intent.
                logger.fatal(f"Error processing data for year {year}: {e}")
                continue

        # Write the combined data and template
        logger.info("Writing data to CSV")
        write_csv(frames, 'ejscreen_airpollutants.csv')

        logger.info("Writing template to TMCF")
        write_tmcf('ejscreen.tmcf')

        logger.info("Process completed successfully")

    except Exception as e:
        logger.fatal(f"Unexpected error in the main process: {e}")
        sys.exit(1)

Rohit231998 marked this conversation as resolved.
Show resolved Hide resolved

if __name__ == '__main__':
    # main() is the single entry point. The former inline download loop
    # duplicated its work and called `response.content()` (a property, not a
    # method), so it is removed. app.run hands control to absl before
    # invoking main, which reads the config_path flag.
    app.run(main)
47 changes: 5 additions & 42 deletions scripts/us_epa/ejscreen/ejscreen.tmcf
Original file line number Diff line number Diff line change
@@ -1,42 +1,5 @@
Node: E:ejscreen_airpollutants->E0
typeOf: dcs:StatVarObservation
variableMeasured: dcs:Mean_Concentration_AirPollutant_DieselPM
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->DSLPM
unit: dcs:MicrogramsPerCubicMeter

Node: E:ejscreen_airpollutants->E1
typeOf: dcs:StatVarObservation
variableMeasured: dcs:AirPollutant_Cancer_Risk
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->CANCER

Node: E:ejscreen_airpollutants->E2
typeOf: dcs:StatVarObservation
variableMeasured: dcs:AirPollutant_Respiratory_Hazard
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->RESP

Node: E:ejscreen_airpollutants->E3
typeOf: dcs:StatVarObservation
variableMeasured: dcs:Mean_Concentration_AirPollutant_Ozone
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->OZONE
unit: dcs:PartsPerBillion

Node: E:ejscreen_airpollutants->E4
typeOf: dcs:StatVarObservation
variableMeasured: dcs:Mean_Concentration_AirPollutant_PM2.5
observationDate: C:ejscreen_airpollutants->year
observationAbout: C:ejscreen_airpollutants->FIPS
observationPeriod: dcs:P1Y
value: C:ejscreen_airpollutants->PM25
unit: dcs:MicrogramsPerCubicMeter
{'Node': 'E:ejscreen_airpollutants->E0', 'typeOf': 'dcs:StatVarObservation', 'variableMeasured': 'dcs:Mean_Concentration_AirPollutant_DieselPM', 'observationDate': 'C:ejscreen_airpollutants->year', 'observationAbout': 'C:ejscreen_airpollutants->FIPS', 'observationPeriod': 'dcs:P1Y', 'value': 'C:ejscreen_airpollutants->DSLPM', 'unit': 'dcs:MicrogramsPerCubicMeter'}
{'Node': 'E:ejscreen_airpollutants->E1', 'typeOf': 'dcs:StatVarObservation', 'variableMeasured': 'dcs:AirPollutant_Cancer_Risk', 'observationDate': 'C:ejscreen_airpollutants->year', 'observationAbout': 'C:ejscreen_airpollutants->FIPS', 'observationPeriod': 'dcs:P1Y', 'value': 'C:ejscreen_airpollutants->CANCER', 'unit': 'dcs:PerMillionPerson'}
{'Node': 'E:ejscreen_airpollutants->E2', 'typeOf': 'dcs:StatVarObservation', 'variableMeasured': 'dcs:AirPollutant_Respiratory_Hazard', 'observationDate': 'C:ejscreen_airpollutants->year', 'observationAbout': 'C:ejscreen_airpollutants->FIPS', 'observationPeriod': 'dcs:P1Y', 'value': 'C:ejscreen_airpollutants->RESP'}
{'Node': 'E:ejscreen_airpollutants->E3', 'typeOf': 'dcs:StatVarObservation', 'variableMeasured': 'dcs:Mean_Concentration_AirPollutant_Ozone', 'observationDate': 'C:ejscreen_airpollutants->year', 'observationAbout': 'C:ejscreen_airpollutants->FIPS', 'observationPeriod': 'dcs:P1Y', 'value': 'C:ejscreen_airpollutants->OZONE', 'unit': 'dcs:PartsPerBillion'}
{'Node': 'E:ejscreen_airpollutants->E4', 'typeOf': 'dcs:StatVarObservation', 'variableMeasured': 'dcs:Mean_Concentration_AirPollutant_PM2.5', 'observationDate': 'C:ejscreen_airpollutants->year', 'observationAbout': 'C:ejscreen_airpollutants->FIPS', 'observationPeriod': 'dcs:P1Y', 'value': 'C:ejscreen_airpollutants->PM25', 'unit': 'dcs:MicrogramsPerCubicMeter'}
41 changes: 32 additions & 9 deletions scripts/us_epa/ejscreen/ejscreen_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Unit tests for ejscreen.py
Usage: python3 -m unittest discover -v -s ../ -p "ejscreen_test.py"
'''

import unittest
import os
import tempfile
Expand All @@ -16,19 +28,30 @@ class TestEjscreen(unittest.TestCase):

def test_write_csv(self):
with tempfile.TemporaryDirectory() as tmp_dir:
# Ensure test data file exists in the expected directory
test_data_file = os.path.join(module_dir_,
'test_data/test_data.csv')
expected_data_file = os.path.join(
module_dir_, 'test_data/test_data_expected.csv')

if not os.path.exists(test_data_file) or not os.path.exists(
expected_data_file):
raise FileNotFoundError(
f"Test data files are missing: {test_data_file}, {expected_data_file}"
)

dfs = {}
dfs['2020'] = pd.read_csv(os.path.join(module_dir_,
'test_data/test_data.csv'),
float_precision='high')
dfs['2020'] = pd.read_csv(test_data_file, float_precision='high')
test_csv = os.path.join(tmp_dir, 'test_csv.csv')
write_csv(dfs, test_csv)
expected_csv = os.path.join(module_dir_,
'test_data/test_data_expected.csv')

with open(test_csv, 'r') as test:
test_str: str = test.read()
with open(expected_csv, 'r') as expected:
expected_str: str = expected.read()
test_str = test.read()
with open(expected_data_file, 'r') as expected:
expected_str = expected.read()
self.assertEqual(test_str, expected_str)

# Remove temporary test file after assertion
os.remove(test_csv)


Expand Down
Loading
Loading