diff --git a/src/pyaro_readers/actrisebas/ActrisEbasReader.py b/src/pyaro_readers/actrisebas/ActrisEbasReader.py
index 00d5e8c..ebf3ce7 100644
--- a/src/pyaro_readers/actrisebas/ActrisEbasReader.py
+++ b/src/pyaro_readers/actrisebas/ActrisEbasReader.py
@@ -1,13 +1,15 @@
 import datetime
 import json
 import logging
-import numpy as np
 import os
-import polars
 import tomllib
+from pathlib import Path
+from urllib.parse import urlparse, quote
+
+import numpy as np
+import polars
 import xarray as xr
 from tqdm import tqdm
-from urllib.parse import urlparse, quote
 from urllib3.poolmanager import PoolManager
 from urllib3.util.retry import Retry
 
@@ -47,14 +49,13 @@
 # name of the ebas section in the DEFINITION_FILE
 EBAS_VAR_SECTION_NAME = "variables"
 
-#
+# Only variables having these cell methods are considered data variables
 CELL_METHODS_TO_COPY = [
     "time: mean",
     "time: median",
 ]
 
-
 # number of times an api request is tried before we consider it failed
 MAX_RETRIES = 2
@@ -80,6 +81,8 @@
 # should be "time" as of CF convention, but other names can be added here
 TIME_VAR_NAME = ["time"]
 
+CACHE_ENVIRONMENT_VAR_NAME = "PYARO_CACHE_DIR_EBAS_ACTRIS"
+
 
 class ActrisEbasRetryException(Exception):
     pass
@@ -96,18 +99,20 @@ class ActrisEbasQcVariableNotFoundException(Exception):
     pass
 
 
 class ActrisEbasTestDataNotFoundException(Exception):
     pass
 
+
 class ActrisEbasWrongCellMethodOrUnitException(Exception):
     pass
 
 
 class ActrisEbasTimeSeriesReader(AutoFilterReaderEngine.AutoFilterReader):
     def __init__(
-        self,
-        filename_or_obj_or_url=BASE_API_URL,
-        filters=[],
-        # tqdm_desc: str | None = None,
-        # ts_type: str = "daily",
-        test_flag: bool = False,
+            self,
+            filename_or_obj_or_url=BASE_API_URL,
+            filters=[],
+            # tqdm_desc: str | None = None,
+            # ts_type: str = "daily",
+            test_flag: bool = False,
+            cache_flag: bool = True,
     ):
         """ """
         self._filename = None
@@ -128,16 +133,24 @@ def __init__(
         self.standard_names = {}
         # _laststatstr = ""
         self._revision = datetime.datetime.now()
-        self._metadata["revision"] = datetime.datetime.strftime(
-            self._revision, "%y%m%d%H%M%S"
-        )
+        self._metadata["revision"] = datetime.datetime.strftime(self._revision, "%y%m%d%H%M%S")
         self.ebas_valid_flags = self.get_ebas_valid_flags()
         self.sites_to_read = None
         self.vars_to_read = None
         self.times_to_read = (np.datetime64(1, "Y"), np.datetime64(120, "Y"))
+        self.cache_dir = None
+        try:
+            _cache_dir = Path(os.environ[CACHE_ENVIRONMENT_VAR_NAME])
+            if _cache_dir.exists():
+                self.cache_flag = cache_flag
+                self.cache_dir = _cache_dir
+        except KeyError:
+            self.cache_flag = False
+
 
         # set filters
         for filter in filters:
+            # pyaro filters...
             if isinstance(filter, Filter.StationFilter):
                 self.sites_to_read = filter.init_kwargs()["include"]
                 self.sites_to_exclude = filter.init_kwargs()["exclude"]
@@ -154,6 +167,32 @@ def __init__(
                     np.max(filter._start_include),
                 )
                 logger.info(f"applying time include filter {self.times_to_read}...")
+            elif isinstance(filter, str):
+                if filter == 'stations':
+                    assert filters[filter]
+                    try:
+                        self.sites_to_read = filters[filter]['include']
+                    except KeyError:
+                        pass
+                    try:
+                        self.sites_to_exclude = filters[filter]['exclude']
+                    except KeyError:
+                        pass
+                elif filter == 'variables':
+                    assert filters[filter]
+                    try:
+                        self.vars_to_read = filters[filter]["include"]
+                        logger.info(f"applying variable include filter {self.vars_to_read}...")
+                    except KeyError:
+                        pass
+                elif filter == 'time':
+                    try:
+                        self.times_to_read = filters[filter]["start_include"]
+                    except Exception as e:
+                        pass
+                    assert filters[filter]
+                else:
+                    pass
             else:
                 # pass on not reader supported filters
                 pass
@@ -188,7 +227,8 @@ def __init__(
             try:
                 self.standard_names[_actris_var] = self.get_ebas_standard_name(var)
             except KeyError:
-                logger.info(f"No ebas standard names found for {var}. Trying those of the actris variable {self.actris_vars_to_read[var][0]} instead...")
+                logger.info(
+                    f"No ebas standard names found for {var}. Trying those of the actris variable {self.actris_vars_to_read[var][0]} instead...")
                 self.standard_names[_actris_var] = self.get_actris_standard_name(_actris_var)
             # for _actris_var in self.actris_vars_to_read[var]:
             #     try:
@@ -266,8 +306,8 @@ def metadata(self):
         return self._metadata
 
     def _read(
-        self,
-        tqdm_desc="reading stations",
+            self,
+            tqdm_desc="reading stations",
     ):
         """
        read the data from EBAS thredds server
@@ -284,7 +324,17 @@ def _read(
             urls_to_dl = self.urls_to_dl[actris_variable]
             bar = tqdm(desc=tqdm_desc, total=len(urls_to_dl), disable=None)
             for s_idx, site_name in enumerate(urls_to_dl):
-                for f_idx, url in enumerate(urls_to_dl[site_name]):
+                for f_idx, thredds_url in enumerate(urls_to_dl[site_name]):
+                    _local_file_flag = False
+                    if self.cache_flag:
+                        _local_file = self.cache_dir / Path(thredds_url).name
+                        if _local_file.exists():
+                            url = _local_file
+                            _local_file_flag = True
+                        else:
+                            url = thredds_url
+                    else:
+                        url = thredds_url
                     logger.info(f"reading file {url}")
                     try:
                         tmp_data = xr.open_dataset(url)
@@ -296,28 +346,51 @@ def _read(
                     # np.datetime64(tmp_data.attrs["time_coverage_start"].split()[0])
                     # and
                     # np.datetime64(tmp_data.attrs["time_coverage_end"].split()[0])
-                    # We also could have a look at the time variable, but the obe saves some time calulations
-                    # (applying the time bounds to the middle points in the time variable)
-                    file_start_time = np.datetime64(
-                        tmp_data.attrs["time_coverage_start"].split()[0]
-                    )
-                    file_end_time = np.datetime64(
-                        tmp_data.attrs["time_coverage_end"].split()[0]
-                    )
+                    # fall back to the time variable
+                    try:
+                        file_start_time = np.datetime64(
+                            tmp_data.attrs["time_coverage_start"].split()[0]
+                        )
+                    except Exception as e:
+                        logger.error(f"URL {url} is missing the global attribute 'time_coverage_start'; Error: {e}")
+                        logger.error(f"reading time values instead")
+                        file_start_time = np.min(np.asarray(tmp_data["time_bnds"][:, 0]))
+
+                    try:
+                        file_end_time = np.datetime64(
+                            tmp_data.attrs["time_coverage_end"].split()[0]
+                        )
+                    except Exception as e:
+                        logger.error(f"URL {url} is missing the global attribute 'time_coverage_end'; Error: {e}")
+                        logger.error(f"reading time values instead")
+                        file_end_time = np.max(np.asarray(tmp_data["time_bnds"][:, 1]))
+
                     # if (file_start_time >= self.times_to_read[0] and file_start_time <= self.times_to_read[1]) \
                     #     or (file_end_time >= self.times_to_read[0] and file_end_time <= self.times_to_read[1]):
                     if (
-                        file_end_time < self.times_to_read[0]
-                        or file_start_time > self.times_to_read[1]
+                            file_end_time < self.times_to_read[0]
+                            or file_start_time > self.times_to_read[1]
                     ):
                         logger.info(f"url {url} not read. Outside of time bounds.")
                         continue
-                    # put all data variables in the data struct for the moment
+                    # write cache file if needed
+                    if self.cache_flag and not _local_file_flag:
+                        # some of the data files can't be read by xarray due to errors. So we
+                        # can't cache them (all data is only realized here)
+                        try:
+                            tmp_data.to_netcdf(_local_file)
+                            logger.info(f"saved cache file {_local_file}")
+                        except Exception as e:
+                            logger.error(f"failed to save cache file {_local_file} with error {e}")
+                            logger.error(f"URL: {url}")
+                            _local_file.unlink()
+
+                    # read needed data
                     for d_idx, _data_var in enumerate(
-                        self._get_ebas_data_vars(
-                            tmp_data,
-                        )
+                            self._get_ebas_data_vars(
+                                tmp_data,
+                            )
                     ):
                         stat_code = None
                         # look for a standard_name match and return only that variable
@@ -354,8 +427,8 @@ def _read(
                         ebas_flags = self.get_ebas_var_flags(tmp_data, _data_var)
                         # quick test if we need to apply flags at all
                         if (
-                            np.nansum(ebas_flags)
-                            == ebas_flags.size * EBAS_FLAG_NAN_NUMBER
+                                np.nansum(ebas_flags)
+                                == ebas_flags.size * EBAS_FLAG_NAN_NUMBER
                         ):
                             flags = np.full(ts_no, Flag.VALID, dtype="i2")
                         else:
@@ -363,7 +436,7 @@ def _read(
                             for _ebas_flag in ebas_flags:
                                 for f_idx, flag in enumerate(_ebas_flag):
                                     if (flag == 0) or (
-                                        flag in self.ebas_valid_flags
+                                            flag in self.ebas_valid_flags
                                     ):
                                         flags[f_idx] = Flag.VALID
 
@@ -542,8 +615,8 @@ def _get_ebas_data_vars(self, tmp_data, actris_var: str = None, units: str = Non
             # if cell_methods in CELL_METHODS_TO_COPY and units == self.def_data["actris_std_units"][data_var]:
             #     data_vars.append(data_var)
             elif cell_methods is not None:
-                if cell_methods in CELL_METHODS_TO_COPY:
-                    data_vars.append(data_var)
+                    if cell_methods in CELL_METHODS_TO_COPY:
+                        data_vars.append(data_var)
             # elif units is not None:
             #     if units == self.def_data["actris_std_units"][data_var]:
             #         data_vars.append(data_var)
@@ -553,10 +626,10 @@ def _get_ebas_data_vars(self, tmp_data, actris_var: str = None, units: str = Non
         return data_vars
 
     def extract_urls(
-        self,
-        json_resp: dict,
-        sites_to_read: list[str] = [],
-        sites_to_exclude: list[str] = [],
+            self,
+            json_resp: dict,
+            sites_to_read: list[str] = [],
+            sites_to_exclude: list[str] = [],
     ) -> dict:
         """
         small helper method to extract URLs to download from json reponse from the EBAS API
@@ -577,11 +650,11 @@ def extract_urls(
             # site_data[DISTRIBUTION_ROOT_KEY] is also a list
             # search for protocol DISTRIBUTION_PROTOCOL_NAME
             for url_idx, distribution_data in enumerate(
-                site_data[DISTRIBUTION_ROOT_KEY]
+                    site_data[DISTRIBUTION_ROOT_KEY]
             ):
                 if (
-                    distribution_data[DISTRIBUTION_PROTOCOL_KEY]
-                    != DISTRIBUTION_PROTOCOL_NAME
+                        distribution_data[DISTRIBUTION_PROTOCOL_KEY]
+                        != DISTRIBUTION_PROTOCOL_NAME
                 ):
                     logger.info(
                         f"skipping site: {site_name} / proto: {distribution_data[DISTRIBUTION_PROTOCOL_KEY]}"
diff --git a/src/pyaro_readers/actrisebas/definitions.toml b/src/pyaro_readers/actrisebas/definitions.toml
index 7f39d99..7609a7a 100644
--- a/src/pyaro_readers/actrisebas/definitions.toml
+++ b/src/pyaro_readers/actrisebas/definitions.toml
@@ -50,7 +50,7 @@ ebas_matrix = ["pm25", "pm1"]
 standard_names = ["mass_concentration_of_elemental_carbon_in_pm2p5_in_air",
     "mass_concentration_of_elemental_carbon_in_pm1_in_air", ]
 
-[variables.concCsopm25]
+[variables.concCocpm25]
 actris_variable = ["aerosol particle organic carbon mass concentration"]
 actris_matrix = ["aerosol particle phase"]
 ebas_component = ["organic_carbon"]
@@ -112,6 +112,7 @@ ebas_component = ["pm25_mass"]
 ebas_matrix = ["pm25",]
 standard_names = ["mass_concentration_of_pm2p5_ambient_aerosol_particles_in_air", ]
 
+
 #[variables.]
 #actris_variable = [""]
 #actris_matrix = [""]
diff --git a/tests/test_ActrisEbasReader.py b/tests/test_ActrisEbasReader.py
index c6e340d..1defd54 100644
--- a/tests/test_ActrisEbasReader.py
+++ b/tests/test_ActrisEbasReader.py
@@ -24,9 +24,9 @@ class TestActrisEbasTimeSeriesReader(unittest.TestCase):
     # pyaerocom_vars_to_read = ["vmrso2"]
     # pyaerocom_vars_to_read = ["concso4t"]
-    # pyaerocom_vars_to_read = ["concpm10"]
+    pyaerocom_vars_to_read = ["concpm10"]
     # pyaerocom_vars_to_read = ["concpm25"]
-    pyaerocom_vars_to_read = ["concpm1"]
+    # pyaerocom_vars_to_read = ["concpm1"]
 
     station_filter = pyaro.timeseries.Filter.StationFilter(
         ["Birkenes II", "Jungfraujoch", "Ispra", "Melpitz", "Westerland"], []
diff --git a/tests/test_PyerocomBinding.py b/tests/test_PyerocomBinding.py
index 5c44d01..c77e11b 100644
--- a/tests/test_PyerocomBinding.py
+++ b/tests/test_PyerocomBinding.py
@@ -27,7 +27,7 @@ def test_pyaerocom_aeronet(self):
         url = "https://pyaerocom.met.no/pyaro-suppl/testdata/aeronetsun_testdata.csv"
         obsconfig = PyaroConfig(
             name=data_name,
-            data_id=data_id,
+            reader_id=data_id,
             filename_or_obj_or_url=url,
             filters={"variables": {"include": ["AOD_440nm"]}},
             name_map={"AOD_440nm": self.AERONETVAR},
@@ -63,7 +63,7 @@ def test_pyaerocom_actrisebas_single_var(self):
         url = self.ACTRISEBASVAR
         obsconfig = PyaroConfig(
             name=data_name,
-            data_id=data_id,
+            reader_id=data_id,
             filename_or_obj_or_url=url,
             filters=station_filter,
         )
@@ -102,12 +102,12 @@ def test_pyaerocom_actrisebas_many_var(self):
         url = self.ACTRISEBASVARLIST
         obsconfig = PyaroConfig(
             name=data_name,
-            data_id=data_id,
+            reader_id=data_id,
             filename_or_obj_or_url=url,
             filters=station_filter,
         )
         reader = ReadUngridded(f"{data_name}")
-        data = reader._read(vars_to_retrieve=self.ACTRISEBASVAR, configs=obsconfig)
+        data = reader.read(vars_to_retrieve=self.ACTRISEBASVAR, configs=obsconfig)
         self.assertGreaterEqual(len(data.unique_station_names), 4)
         self.assertIn("Ispra", data.unique_station_names)
         self.assertIn(url[0], data.contains_vars)