Skip to content

Commit

Permalink
Fix formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
magnusuMET committed Oct 31, 2024
1 parent 88fb69b commit 86eb67d
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 82 deletions.
203 changes: 128 additions & 75 deletions src/pyaro_readers/eeareader/EEATimeseriesReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Data,
NpStructuredData,
Station,
Reader
Reader,
)
import pyaro.timeseries

Expand Down Expand Up @@ -355,6 +355,7 @@ def mapper(value: int) -> int:
return pyaro.timeseries.Flag.BELOW_THRESHOLD
else:
return pyaro.timeseries.Flag.INVALID

valid = self._data["Validity"].map_elements(mapper, return_dtype=int)
return np.array(valid)

Expand All @@ -375,17 +376,24 @@ def __init__(self, filename_or_obj_or_url, filters=None):
self._metadata = polars.read_csv(metadata_file)
pollutant_file = Path("pollutant.csv")
self._metadata_pollutant = polars.read_csv(pollutant_file).with_columns(
polars.col("URI").str.strip_prefix("http://dd.eionet.europa.eu/vocabulary/aq/pollutant/").cast(polars.Int32).alias("Id"),
polars.col("URI")
.str.strip_prefix("http://dd.eionet.europa.eu/vocabulary/aq/pollutant/")
.cast(polars.Int32)
.alias("Id"),
)
assert len(self._metadata_pollutant["Id"].unique()) == len(self._metadata_pollutant), "Pollutants are not unique"
assert len(self._metadata_pollutant["Id"].unique()) == len(
self._metadata_pollutant
), "Pollutants are not unique"

self._filters = []
if filters is not None:
for filter in filters:
if filter.name() in self.supported_filters():
self._filters.append(filter)
else:
raise NotImplementedError(f"This reader does not support filter {filter.name()}")
raise NotImplementedError(
f"This reader does not support filter {filter.name()}"
)
self._data_directory = data_directory

def supported_filters(self) -> list[str]:
Expand All @@ -410,9 +418,16 @@ def data(self, varname: str) -> Data:
data = self._read(varname, "GB", (datetime(2002, 1, 1), datetime(2004, 12, 31)))
return EEAData(data, varname)

def _read(self, variable: str, countrycode: str, timerange: Tuple[datetime, datetime] | None = None) -> polars.DataFrame:
def _read(
self,
variable: str,
countrycode: str,
timerange: Tuple[datetime, datetime] | None = None,
) -> polars.DataFrame:
# https://dd.eionet.europa.eu/vocabulary/aq/pollutant
pollutant_candidates = self._metadata_pollutant.filter(polars.col("Notation").eq(variable))
pollutant_candidates = self._metadata_pollutant.filter(
polars.col("Notation").eq(variable)
)
if len(pollutant_candidates) == 0:
raise Exception(f"No variable ID found for {variable}")

Expand Down Expand Up @@ -448,23 +463,27 @@ def _read(self, variable: str, countrycode: str, timerange: Tuple[datetime, date
else:
raise NotImplementedError(f"Filter {filter.name()} not supported")

dataset = polars.DataFrame(schema={
"Samplingpoint": str,
"Pollutant": polars.Int32,
"Start": polars.Datetime("ns"),
"End": polars.Datetime("ns"),
"Value": polars.Float32,
"Unit": str,
# "AggType": str,
"Validity": polars.Int32,
# "Verification": polars.Int32,
# "ResultTime": datetime,
# "DataCapture": datetime,
# "FkObservationLog": str,
})
dataset = polars.DataFrame(
schema={
"Samplingpoint": str,
"Pollutant": polars.Int32,
"Start": polars.Datetime("ns"),
"End": polars.Datetime("ns"),
"Value": polars.Float32,
"Unit": str,
# "AggType": str,
"Validity": polars.Int32,
# "Verification": polars.Int32,
# "ResultTime": datetime,
# "DataCapture": datetime,
# "FkObservationLog": str,
}
)
countries = self._country_code_mappings_eea.values()

assert set(i.name for i in unverified_path.iterdir()).issubset(countries), "Some directories has an unknown country code"
assert set(i.name for i in unverified_path.iterdir()).issubset(
countries
), "Some directories has an unknown country code"

paths = []
for countrycode in countries:
Expand All @@ -489,7 +508,20 @@ def _read(self, variable: str, countrycode: str, timerange: Tuple[datetime, date
pbar = tqdm(paths)
for file in pbar:
pbar.set_description(f"Processing {file.name:>34}")
ds = polars.read_parquet(file, use_pyarrow=True, pyarrow_options={"filters": pyarrow_filters}, columns=["Samplingpoint", "Pollutant", "Start", "End", "Value", "Unit", "Validity"])
ds = polars.read_parquet(
file,
use_pyarrow=True,
pyarrow_options={"filters": pyarrow_filters},
columns=[
"Samplingpoint",
"Pollutant",
"Start",
"End",
"Value",
"Unit",
"Validity",
],
)
if ds.shape[0] == 0:
continue
dataset.vstack(ds.cast({"Value": polars.Float32}), in_place=True)
Expand All @@ -499,14 +531,28 @@ def _read(self, variable: str, countrycode: str, timerange: Tuple[datetime, date

# Join with metadata table to get latitude, longitude and altitude
md = self._metadata.with_columns(
(polars.col("Country").map_elements(self._country_code_eea, return_dtype=str)
+ "/" + polars.col("Sampling Point Id")).alias("selector")
).select([
"selector", "Altitude", "Longitude", "Latitude",
])
(
polars.col("Country").map_elements(
self._country_code_eea, return_dtype=str
)
+ "/"
+ polars.col("Sampling Point Id")
).alias("selector")
).select(
[
"selector",
"Altitude",
"Longitude",
"Latitude",
]
)

joined = dataset.join(md, left_on="Samplingpoint", right_on="selector", how="left")
assert joined.filter(polars.col("Longitude").is_null()).shape[0] == 0, "Some stations does not have a suitable left join"
joined = dataset.join(
md, left_on="Samplingpoint", right_on="selector", how="left"
)
assert (
joined.filter(polars.col("Longitude").is_null()).shape[0] == 0
), "Some stations does not have a suitable left join"

return joined

Expand All @@ -517,56 +563,61 @@ def variables(self) -> list[str]:

def stations(self) -> list[str]:
stations = self._metadata.with_columns(
(polars.col("Country").map_elements(self._country_code_eea, return_dtype=str)
+ "/" + polars.col("Sampling Point Id")).alias("selector")
(
polars.col("Country").map_elements(
self._country_code_eea, return_dtype=str
)
+ "/"
+ polars.col("Sampling Point Id")
).alias("selector")
)["selector"]
return list(stations)

# ISO 3166-1 alpha-2 for countries in EEA
@functools.cached_property
def _country_code_mappings(self) -> dict[str, str]:
return {
'Albania': "AL",
'Andorra': "AD",
'Austria': "AT",
'Belgium': "BE",
'Bosnia and Herzegovina': "BA",
'Bulgaria': "BG",
'Croatia': "HR",
'Cyprus': "CY",
'Czechia': "CZ",
'Denmark': "DK",
'Estonia': "EE",
'Finland': "FI",
'France': "FR",
'Georgia': "GE",
'Germany': "DE",
'Greece': "GR",
'Hungary': "HU",
'Iceland': "IS",
'Ireland': "IE",
'Italy': "IT",
'Kosovo under UNSCR 1244/99': "XK",
'Latvia': "LV",
'Lithuania': "LT",
'Luxembourg': "LU",
'Malta': "MT",
'Montenegro': "ME",
'Netherlands': "NL",
'North Macedonia': "MK",
'Norway': "NO",
'Poland': "PL",
'Portugal': "PT",
'Romania': "RO",
'Serbia': "RS",
'Slovakia': "SK",
'Slovenia': "SI",
'Spain': "ES",
'Sweden': "SE",
'Switzerland': "CH",
'Türkiye': "TR",
'Ukraine': "UA",
'United Kingdom': "UK",
"Albania": "AL",
"Andorra": "AD",
"Austria": "AT",
"Belgium": "BE",
"Bosnia and Herzegovina": "BA",
"Bulgaria": "BG",
"Croatia": "HR",
"Cyprus": "CY",
"Czechia": "CZ",
"Denmark": "DK",
"Estonia": "EE",
"Finland": "FI",
"France": "FR",
"Georgia": "GE",
"Germany": "DE",
"Greece": "GR",
"Hungary": "HU",
"Iceland": "IS",
"Ireland": "IE",
"Italy": "IT",
"Kosovo under UNSCR 1244/99": "XK",
"Latvia": "LV",
"Lithuania": "LT",
"Luxembourg": "LU",
"Malta": "MT",
"Montenegro": "ME",
"Netherlands": "NL",
"North Macedonia": "MK",
"Norway": "NO",
"Poland": "PL",
"Portugal": "PT",
"Romania": "RO",
"Serbia": "RS",
"Slovakia": "SK",
"Slovenia": "SI",
"Spain": "ES",
"Sweden": "SE",
"Switzerland": "CH",
"Türkiye": "TR",
"Ukraine": "UA",
"United Kingdom": "UK",
}

# ISO 3166-1 alpha-2 for countries in EEA
Expand All @@ -577,9 +628,11 @@ def _country_code(self, country: str) -> str | None:
@functools.cached_property
def _country_code_mappings_eea(self) -> dict[str, str]:
mappings = self._country_code_mappings.copy()
mappings.update({
"United Kingdom": "GB",
})
mappings.update(
{
"United Kingdom": "GB",
}
)
return mappings

# Country codes used in "Samplingpoint" provided by each country
Expand Down
6 changes: 5 additions & 1 deletion src/pyaro_readers/eeareader/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
from .EEATimeseriesReader import EEATimeseriesReader, EEATimeseriesEngine, EEATimeSeriesReader2
from .EEATimeseriesReader import (
EEATimeseriesReader,
EEATimeseriesEngine,
EEATimeSeriesReader2,
)
19 changes: 13 additions & 6 deletions tests/test_EEAReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@ def test_eea_reader2():
from pyaro_readers.eeareader import EEATimeSeriesReader2
import pyaro.timeseries

filters = pyaro.timeseries.FilterCollection({
# "time_bounds": {"start_include": [("2023-01-01 00:00:00", "2023-12-24 00:00:00")]},
"stations": {"exclude": ["GB/GB_SamplingPoint_61718", "GB/GB_SamplingPoint_99"]},
"countries": {"include": ["UK"]},
})
filters = pyaro.timeseries.FilterCollection(
{
# "time_bounds": {"start_include": [("2023-01-01 00:00:00", "2023-12-24 00:00:00")]},
"stations": {
"exclude": ["GB/GB_SamplingPoint_61718", "GB/GB_SamplingPoint_99"]
},
"countries": {"include": ["UK"]},
}
)

reader = EEATimeSeriesReader2("/lustre/storeB/project/aerocom/aerocom1/AEROCOM_OBSDATA/EEA-AQDS/download", filters=filters)
reader = EEATimeSeriesReader2(
"/lustre/storeB/project/aerocom/aerocom1/AEROCOM_OBSDATA/EEA-AQDS/download",
filters=filters,
)

_ = reader.stations()
_ = reader.variables()
Expand Down

0 comments on commit 86eb67d

Please sign in to comment.