
Commit

WIP
Jan Griesfeller committed Feb 20, 2025
1 parent 8dab15f commit 3052ea0
Showing 4 changed files with 125 additions and 51 deletions.
161 changes: 117 additions & 44 deletions src/pyaro_readers/actrisebas/ActrisEbasReader.py
@@ -1,13 +1,15 @@
import datetime
import json
import logging
import numpy as np
import os
import polars
import tomllib
from pathlib import Path
from urllib.parse import urlparse, quote

import numpy as np
import polars
import xarray as xr
from tqdm import tqdm
from urllib.parse import urlparse, quote
from urllib3.poolmanager import PoolManager
from urllib3.util.retry import Retry

@@ -47,14 +49,13 @@
# name of the ebas section in the DEFINITION_FILE
EBAS_VAR_SECTION_NAME = "variables"

#
# Only variables having these cell methods are considered data variables
CELL_METHODS_TO_COPY = [
"time: mean",
"time: median",

]


# number of times an api request is tried before we consider it failed
MAX_RETRIES = 2

Expand All @@ -80,6 +81,8 @@
# should be "time" as of CF convention, but other names can be added here
TIME_VAR_NAME = ["time"]

CACHE_ENVIRONMENT_VAR_NAME = "PYARO_CACHE_DIR_EBAS_ACTRIS"


class ActrisEbasRetryException(Exception):
pass
@@ -96,18 +99,20 @@ class ActrisEbasQcVariableNotFoundException(Exception):
class ActrisEbasTestDataNotFoundException(Exception):
pass


class ActrisEbasWrongCellMethodOrUnitException(Exception):
pass


class ActrisEbasTimeSeriesReader(AutoFilterReaderEngine.AutoFilterReader):
def __init__(
self,
filename_or_obj_or_url=BASE_API_URL,
filters=[],
# tqdm_desc: str | None = None,
# ts_type: str = "daily",
test_flag: bool = False,
self,
filename_or_obj_or_url=BASE_API_URL,
filters=[],
# tqdm_desc: str | None = None,
# ts_type: str = "daily",
test_flag: bool = False,
cache_flag: bool = True,
):
""" """
self._filename = None
@@ -128,16 +133,24 @@ def __init__(
self.standard_names = {}
# _laststatstr = ""
self._revision = datetime.datetime.now()
self._metadata["revision"] = datetime.datetime.strftime(
self._revision, "%y%m%d%H%M%S"
)
self._metadata["revision"] = datetime.datetime.strftime(self._revision, "%y%m%d%H%M%S")
self.ebas_valid_flags = self.get_ebas_valid_flags()
self.sites_to_read = None
self.vars_to_read = None
self.times_to_read = (np.datetime64(1, "Y"), np.datetime64(120, "Y"))

self.cache_dir = None
try:
_cache_dir = Path(os.environ[CACHE_ENVIRONMENT_VAR_NAME])
if _cache_dir.exists():
self.cache_flag = cache_flag
self.cache_dir = _cache_dir
except KeyError:
self.cache_flag = False

# set filters
for filter in filters:
# pyaro filters...
if isinstance(filter, Filter.StationFilter):
self.sites_to_read = filter.init_kwargs()["include"]
self.sites_to_exclude = filter.init_kwargs()["exclude"]
@@ -154,6 +167,32 @@ def __init__(
np.max(filter._start_include),
)
logger.info(f"applying time include filter {self.times_to_read}...")
elif isinstance(filter, str):
if filter == 'stations':
assert filters[filter]
try:
self.sites_to_read = filters[filter]['include']
except KeyError:
pass
try:
self.sites_to_exclude = filters[filter]['exclude']
except KeyError:
pass
elif filter == 'variables':
assert filters[filter]
try:
self.vars_to_read = filters[filter]["include"]
logger.info(f"applying variable include filter {self.vars_to_read}...")
except KeyError:
pass
elif filter == 'time':
assert filters[filter]
try:
self.times_to_read = filters[filter]["start_include"]
except KeyError:
pass
else:
pass
else:
# pass on not reader supported filters
pass
@@ -188,7 +227,8 @@ def __init__(
try:
self.standard_names[_actris_var] = self.get_ebas_standard_name(var)
except KeyError:
logger.info(f"No ebas standard names found for {var}. Trying those of the actris variable {self.actris_vars_to_read[var][0]} instead...")
logger.info(
f"No ebas standard names found for {var}. Trying those of the actris variable {self.actris_vars_to_read[var][0]} instead...")
self.standard_names[_actris_var] = self.get_actris_standard_name(_actris_var)
# for _actris_var in self.actris_vars_to_read[var]:
# try:
@@ -266,8 +306,8 @@ def metadata(self):
return self._metadata

def _read(
self,
tqdm_desc="reading stations",
self,
tqdm_desc="reading stations",
):
"""
read the data from EBAS thredds server
@@ -284,7 +324,17 @@ def _read(
urls_to_dl = self.urls_to_dl[actris_variable]
bar = tqdm(desc=tqdm_desc, total=len(urls_to_dl), disable=None)
for s_idx, site_name in enumerate(urls_to_dl):
for f_idx, url in enumerate(urls_to_dl[site_name]):
for f_idx, thredds_url in enumerate(urls_to_dl[site_name]):
_local_file_flag = False
if self.cache_flag:
_local_file = self.cache_dir / Path(thredds_url).name
if _local_file.exists():
url = _local_file
_local_file_flag = True
else:
url = thredds_url
else:
url = thredds_url
logger.info(f"reading file {url}")
try:
tmp_data = xr.open_dataset(url)
@@ -296,28 +346,51 @@ def _read(
# np.datetime64(tmp_data.attrs["time_coverage_start"].split()[0])
# and
# np.datetime64(tmp_data.attrs["time_coverage_end"].split()[0])
# We could also have a look at the time variable, but the above saves some time calculations
# (applying the time bounds to the middle points in the time variable)
file_start_time = np.datetime64(
tmp_data.attrs["time_coverage_start"].split()[0]
)
file_end_time = np.datetime64(
tmp_data.attrs["time_coverage_end"].split()[0]
)
# fall back to the time variable
try:
file_start_time = np.datetime64(
tmp_data.attrs["time_coverage_start"].split()[0]
)
except Exception as e:
logger.error(f"URL {url} is missing the global attribute 'time_coverage_start'; Error: {e}")
logger.error(f"reading time values instead")
file_start_time = np.min(np.asarray(tmp_data["time_bnds"][:, 0]))

try:
file_end_time = np.datetime64(
tmp_data.attrs["time_coverage_end"].split()[0]
)
except Exception as e:
logger.error(f"URL {url} is missing the global attribute 'time_coverage_end'; Error: {e}")
logger.error(f"reading time values instead")
file_end_time = np.max(np.asarray(tmp_data["time_bnds"][:, 1]))

# if (file_start_time >= self.times_to_read[0] and file_start_time <= self.times_to_read[1]) \
# or (file_end_time >= self.times_to_read[0] and file_end_time <= self.times_to_read[1]):
if (
file_end_time < self.times_to_read[0]
or file_start_time > self.times_to_read[1]
file_end_time < self.times_to_read[0]
or file_start_time > self.times_to_read[1]
):
logger.info(f"url {url} not read. Outside of time bounds.")
continue

# put all data variables in the data struct for the moment
# write cache file if needed
if self.cache_flag and not _local_file_flag:
# some of the data files can't be read by xarray due to errors. So we
# can't cache them (all data is only realized here)
try:
tmp_data.to_netcdf(_local_file)
logger.info(f"saved cache file {_local_file}")
except Exception as e:
logger.error(f"failed to save cache file {_local_file} with error {e}")
logger.error(f"URL: {url}")
_local_file.unlink()

# read needed data
for d_idx, _data_var in enumerate(
self._get_ebas_data_vars(
tmp_data,
)
self._get_ebas_data_vars(
tmp_data,
)
):
stat_code = None
# look for a standard_name match and return only that variable
@@ -354,16 +427,16 @@ def _read(
ebas_flags = self.get_ebas_var_flags(tmp_data, _data_var)
# quick test if we need to apply flags at all
if (
np.nansum(ebas_flags)
== ebas_flags.size * EBAS_FLAG_NAN_NUMBER
np.nansum(ebas_flags)
== ebas_flags.size * EBAS_FLAG_NAN_NUMBER
):
flags = np.full(ts_no, Flag.VALID, dtype="i2")
else:
flags = np.full(ts_no, Flag.INVALID, dtype="i2")
for _ebas_flag in ebas_flags:
for f_idx, flag in enumerate(_ebas_flag):
if (flag == 0) or (
flag in self.ebas_valid_flags
flag in self.ebas_valid_flags
):
flags[f_idx] = Flag.VALID

@@ -542,8 +615,8 @@ def _get_ebas_data_vars(self, tmp_data, actris_var: str = None, units: str = Non
# if cell_methods in CELL_METHODS_TO_COPY and units == self.def_data["actris_std_units"][data_var]:
# data_vars.append(data_var)
elif cell_methods is not None:
if cell_methods in CELL_METHODS_TO_COPY:
data_vars.append(data_var)
if cell_methods in CELL_METHODS_TO_COPY:
data_vars.append(data_var)
# elif units is not None:
# if units == self.def_data["actris_std_units"][data_var]:
# data_vars.append(data_var)
@@ -553,10 +626,10 @@ def _get_ebas_data_vars(self, tmp_data, actris_var: str = None, units: str = Non
return data_vars

def extract_urls(
self,
json_resp: dict,
sites_to_read: list[str] = [],
sites_to_exclude: list[str] = [],
self,
json_resp: dict,
sites_to_read: list[str] = [],
sites_to_exclude: list[str] = [],
) -> dict:
"""
small helper method to extract URLs to download from the JSON response of the EBAS API
@@ -577,11 +650,11 @@ def extract_urls(
# site_data[DISTRIBUTION_ROOT_KEY] is also a list
# search for protocol DISTRIBUTION_PROTOCOL_NAME
for url_idx, distribution_data in enumerate(
site_data[DISTRIBUTION_ROOT_KEY]
site_data[DISTRIBUTION_ROOT_KEY]
):
if (
distribution_data[DISTRIBUTION_PROTOCOL_KEY]
!= DISTRIBUTION_PROTOCOL_NAME
distribution_data[DISTRIBUTION_PROTOCOL_KEY]
!= DISTRIBUTION_PROTOCOL_NAME
):
logger.info(
f"skipping site: {site_name} / proto: {distribution_data[DISTRIBUTION_PROTOCOL_KEY]}"
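For context, a minimal usage sketch of what this commit adds to the reader: optional on-disk caching controlled by the PYARO_CACHE_DIR_EBAS_ACTRIS environment variable (only active when the directory already exists), and dict-style filters handled by the new isinstance(filter, str) branch. The cache path, variable and station names below are placeholders, not values taken from the commit:

import os

from pyaro_readers.actrisebas.ActrisEbasReader import ActrisEbasTimeSeriesReader

# Hypothetical cache directory; it must already exist, otherwise __init__
# falls back to cache_flag = False and nothing is written to disk.
os.environ["PYARO_CACHE_DIR_EBAS_ACTRIS"] = "/tmp/ebas_cache"

# Dict-style filters as interpreted by the new string-filter branch;
# "concpm10" and "Birkenes II" are placeholder values.
reader = ActrisEbasTimeSeriesReader(
    filters={
        "variables": {"include": ["concpm10"]},
        "stations": {"include": ["Birkenes II"], "exclude": []},
    },
)

With caching enabled, each THREDDS file that xarray can open is re-saved locally via to_netcdf() after the first download, and read from the cache directory on subsequent runs.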
3 changes: 2 additions & 1 deletion src/pyaro_readers/actrisebas/definitions.toml
@@ -50,7 +50,7 @@ ebas_matrix = ["pm25", "pm1"]
standard_names = ["mass_concentration_of_elemental_carbon_in_pm2p5_in_air",
"mass_concentration_of_elemental_carbon_in_pm1_in_air", ]

[variables.concCsopm25]
[variables.concCocpm25]
actris_variable = ["aerosol particle organic carbon mass concentration"]
actris_matrix = ["aerosol particle phase"]
ebas_component = ["organic_carbon"]
@@ -112,6 +112,7 @@ ebas_component = ["pm25_mass"]
ebas_matrix = ["pm25",]
standard_names = ["mass_concentration_of_pm2p5_ambient_aerosol_particles_in_air", ]


#[variables.]
#actris_variable = [""]
#actris_matrix = [""]
4 changes: 2 additions & 2 deletions tests/test_ActrisEbasReader.py
@@ -24,9 +24,9 @@ class TestActrisEbasTimeSeriesReader(unittest.TestCase):
# pyaerocom_vars_to_read = ["vmrso2"]

# pyaerocom_vars_to_read = ["concso4t"]
# pyaerocom_vars_to_read = ["concpm10"]
pyaerocom_vars_to_read = ["concpm10"]
# pyaerocom_vars_to_read = ["concpm25"]
pyaerocom_vars_to_read = ["concpm1"]
# pyaerocom_vars_to_read = ["concpm1"]

station_filter = pyaro.timeseries.Filter.StationFilter(
["Birkenes II", "Jungfraujoch", "Ispra", "Melpitz", "Westerland"], []
8 changes: 4 additions & 4 deletions tests/test_PyerocomBinding.py
@@ -27,7 +27,7 @@ def test_pyaerocom_aeronet(self):
url = "https://pyaerocom.met.no/pyaro-suppl/testdata/aeronetsun_testdata.csv"
obsconfig = PyaroConfig(
name=data_name,
data_id=data_id,
reader_id=data_id,
filename_or_obj_or_url=url,
filters={"variables": {"include": ["AOD_440nm"]}},
name_map={"AOD_440nm": self.AERONETVAR},
@@ -63,7 +63,7 @@ def test_pyaerocom_actrisebas_single_var(self):
url = self.ACTRISEBASVAR
obsconfig = PyaroConfig(
name=data_name,
data_id=data_id,
reader_id=data_id,
filename_or_obj_or_url=url,
filters=station_filter,
)
@@ -102,12 +102,12 @@ def test_pyaerocom_actrisebas_many_var(self):
url = self.ACTRISEBASVARLIST
obsconfig = PyaroConfig(
name=data_name,
data_id=data_id,
reader_id=data_id,
filename_or_obj_or_url=url,
filters=station_filter,
)
reader = ReadUngridded(f"{data_name}")
data = reader._read(vars_to_retrieve=self.ACTRISEBASVAR, configs=obsconfig)
data = reader.read(vars_to_retrieve=self.ACTRISEBASVAR, configs=obsconfig)
self.assertGreaterEqual(len(data.unique_station_names), 4)
self.assertIn("Ispra", data.unique_station_names)
self.assertIn(url[0], data.contains_vars)
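For reference, a minimal sketch of the pyaerocom binding these tests now exercise after the data_id → reader_id rename and the switch from the private _read() to the public read(). The import paths, reader_id, dataset name and variable below are assumptions rather than values taken from the tests:

from pyaerocom import PyaroConfig          # import path is an assumption
from pyaerocom.io import ReadUngridded     # import path is an assumption

obsconfig = PyaroConfig(
    name="actrisebas_example",                    # hypothetical dataset name
    reader_id="actrisebasreader",                 # hypothetical pyaro reader id
    filename_or_obj_or_url=["concpm10"],          # hypothetical variable list
    filters={"stations": {"include": ["Ispra"], "exclude": []}},
)

reader = ReadUngridded("actrisebas_example")
data = reader.read(vars_to_retrieve=["concpm10"], configs=obsconfig)
print(data.unique_station_names)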
