Skip to content

Commit

Permalink
WIP: ozone reading with pyaerocom variable name working
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Griesfeller committed Sep 17, 2024
1 parent 42c905b commit 316171b
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 65 deletions.
126 changes: 78 additions & 48 deletions src/pyaro_readers/actrisebas/ActrisEbasReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# BASE_API_URL = "https://prod-actris-md.nilu.no/Vocabulary/categories"
BASE_API_URL = "https://prod-actris-md.nilu.no/"
# base URL to query for data for a certain variable
VAR_QUERY_URL = f"{BASE_API_URL}Metadata/content/"
VAR_QUERY_URL = f"{BASE_API_URL}metadata/content/"
# basename of definitions.toml which connects the pyaerocom variable names with the ACTRIS variable names
DEFINITION_FILE_BASENAME = "definitions.toml"

Expand Down Expand Up @@ -89,26 +89,25 @@ def __init__(
self._urls_to_dl = {}
self._data = {} # var -> {data-array}
self._set_filters(filters)
self._header = []
# self._header = []
self._metadata = {}
# used for variable matching in the EBAS data files
# gives a mapping between the EBAS or pyaerocom variable name
# and the CF standard name found in the EBAS data files
# Due to standard_names aliases, the values are a list
self._standard_names = {}
_laststatstr = ""
# _laststatstr = ""
self._revision = datetime.datetime.now()
self._metadata["revision"] = datetime.datetime.strftime(
self._revision, "%y%m%d%H%M%S"
)

if "variables" in filters:
if "include" in filters["variables"]:
self.vars_to_read = filters["variables"]["include"]
logger.info(f"applying variable include filter {vars_to_read}...")
# try:
# self.vars_to_read = filters["variables"]["include"]
# except KeyError:
# raise ValueError(
# f"As of now, you have to give the species you want to read in filter.variables.include"
# )
# if "variables" in filters:
# if "include" in filters["variables"]:
# self.vars_to_read = filters["variables"]["include"]
# logger.info(f"applying variable include filter {vars_to_read}...")

# read only stations according to the station filter
try:
self.sites_to_read = filters["stations"]["include"]
except KeyError:
Expand All @@ -121,34 +120,73 @@ def __init__(

# read config file
self._def_data = self._read_definitions(file=DEFINITION_FILE)
# Because the user might have given a pyaerocom name, build self._actris_vars_to_read, which maps
# each requested variable name to the list of ACTRIS variables to read
self._actris_vars_to_read = {}
for var in self.vars_to_read:
self._metadata[var] = {}
self._standard_names[var] = self.get_actris_standard_name(var)
# for testing since the API is error-prone and slow at the time of this writing
test_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
f"{var}.json",
)
if os.path.exists(test_file) and test_flag:
with open(test_file, "r") as f:
json_resp = json.load(f)
# handle pyaerocom variables here:
# if a given variable name is in the list of pyaerocom variable names in definitions.toml
self._actris_vars_to_read[var] = []
if var in self._def_data["variables"]:
# user gave a pyaerocom variable name
self._actris_vars_to_read[var] = self._def_data["variables"][var][
"actris_variable"
]
for _actris_var in self._actris_vars_to_read[var]:
try:
self._standard_names[var].extend(
self.get_actris_standard_name(_actris_var)
)
self._standard_names[_actris_var].extend(
self.get_actris_standard_name(_actris_var)
)
except KeyError:
self._standard_names[var] = [
self.get_actris_standard_name(_actris_var)
]
self._standard_names[_actris_var] = [
self.get_actris_standard_name(_actris_var)
]
else:
# search for variable metadata
query_url = f"{VAR_QUERY_URL}{quote(var)}"
retries = Retry(connect=5, read=2, redirect=5)
http = PoolManager(retries=retries)
response = http.request("GET", query_url)

json_resp = json.loads(response.data.decode("utf-8"))

self._metadata[var] = json_resp
self._urls_to_dl[var] = self.extract_urls(
json_resp,
sites_to_read=self.sites_to_read,
sites_to_exclude=self.sites_to_exclude,
)
self.read_data(var, self._urls_to_dl[var])
assert self._data[self.vars_to_read[0]]
# user gave ACTRIS name
self._actris_vars_to_read[var].append(var)
self._standard_names[var] = self.get_actris_standard_name(var)

for _pyaro_var in self._actris_vars_to_read:
self._metadata[_pyaro_var] = {}
for _actris_var in self._actris_vars_to_read[_pyaro_var]:
# for testing since the API is error-prone and slow at the time of this writing
test_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
f"{_actris_var}.json",
)
if os.path.exists(test_file) and test_flag:
with open(test_file, "r") as f:
json_resp = json.load(f)
else:
# search for variable metadata
query_url = (
f"{VAR_QUERY_URL}{quote(self._actris_vars_to_read[_pyaro_var])}"
)
retries = Retry(connect=5, read=2, redirect=5)
http = PoolManager(retries=retries)
response = http.request("GET", query_url)
json_resp = json.loads(response.data.decode("utf-8"))

self._metadata[_pyaro_var][_actris_var] = json_resp
self._urls_to_dl[_actris_var] = self.extract_urls(
json_resp,
sites_to_read=self.sites_to_read,
sites_to_exclude=self.sites_to_exclude,
)
# The following needs some refinement once we read pyaerocom variables that hold more than
# one EBAS variable:
# we then need to decide which EBAS variable to return for each station, and potentially for each time
self.read_data(
actris_variable=_pyaro_var, urls_to_dl=self._urls_to_dl[_actris_var]
)
assert self._data[_pyaro_var]

def metadata(self):
return self._metadata
Expand Down Expand Up @@ -185,16 +223,10 @@ def read_data(
tmp_data,
)
):
# the naming of the variable in the file does not reflect the vocabulary naming or pyaerocom's
# naming
# ret_data_var = _data_var.copy()
# if ret_data_var not in self.vars_to_read and :
# # we need

# look for a standard_name match and return only that variable
if (
self.get_ebas_data_standard_name(tmp_data, _data_var)
!= self._standard_names[actris_variable]
not in self._standard_names[actris_variable]
):
logger.info(
f"station {site_name}, file #{f_idx}: skipping variable {_data_var} due to wrong standard name"
Expand Down Expand Up @@ -362,9 +394,7 @@ def extract_urls(
return urls_to_dl

def _unfiltered_data(self, varname) -> Data:
ret_data = deepcopy(self._data[varname])
return ret_data
# return self._data[varname]
return self._data[varname]

def _unfiltered_stations(self) -> dict[str, Station]:
return self._stations
Expand Down
38 changes: 21 additions & 17 deletions tests/test_ActrisEbasReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TestActrisEbasTimeSeriesReader(unittest.TestCase):
"stations": {"include": ["Birkenes II", "Jungfraujoch"]},
}
vars_to_read = ["ozone mass concentration"]
pyaerocom_vars_to_read = ["conco3"]

def test_api_online(self, url=TEST_URL):
try:
Expand Down Expand Up @@ -72,23 +73,26 @@ def test_api_reading_small_data_set(self):

self.assertIn("revision", ts.metadata())

# def test_api_reading_pyaerocom_naming(self):
# # test access to the EBAS API
# filters = {
# "variables": {
# "include": [
# "vmro3",
# ]
# },
# "stations": {"include": ["Birkenes II", "Jungfraujoch"]},
# }
# engine = pyaro.list_timeseries_engines()[self.engine]
# #
# with engine.open(
# filters=filters,
# vars_to_read=["vmro3"],
# ) as ts:
# self.assertGreaterEqual(len(ts.variables()), 1)
def test_api_reading_pyaerocom_naming(self):
# test access to the EBAS API
filters = {
"stations": {"include": ["Birkenes II", "Jungfraujoch"]},
# "variables": {
# "include": self.vars_to_read,
# },
}
engine = pyaro.list_timeseries_engines()[self.engine]
#
with engine.open(
filters=filters,
vars_to_read=self.pyaerocom_vars_to_read,
test_flag=True,
) as ts:
self.assertGreaterEqual(len(ts.variables()), 1)
self.assertEqual(len(ts.stations()), 2)
self.assertGreaterEqual(len(ts._data[ts.variables()[0]]), 1000)
self.assertGreaterEqual(len(ts.data(ts.variables()[0])), 1000)

#
# #
# def test_wrappers(self):
Expand Down

0 comments on commit 316171b

Please sign in to comment.