From 4f688a8114c7cf8bb1242cf3e5fe0afb3af8d18a Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Thu, 26 Sep 2024 17:13:15 +0200 Subject: [PATCH] serializable gdac pre-processor --- argopy/data_fetchers/gdac_data.py | 149 ++++++++++--------- argopy/data_fetchers/gdac_data_processors.py | 120 +++++++++++++++ 2 files changed, 195 insertions(+), 74 deletions(-) create mode 100644 argopy/data_fetchers/gdac_data_processors.py diff --git a/argopy/data_fetchers/gdac_data.py b/argopy/data_fetchers/gdac_data.py index ad7af97e..d589ad39 100644 --- a/argopy/data_fetchers/gdac_data.py +++ b/argopy/data_fetchers/gdac_data.py @@ -4,6 +4,7 @@ This is not intended to be used directly, only by the facade at fetchers.py """ + import numpy as np import pandas as pd import xarray as xr @@ -13,10 +14,13 @@ import logging from ..utils.format import format_oneline, argo_split_path +from ..utils.decorators import deprecated from ..options import OPTIONS, check_gdac_path from ..errors import DataNotFound from ..stores import ArgoIndex from .proto import ArgoDataFetcherProto +from .gdac_data_processors import pre_process_multiprof, filter_points + log = logging.getLogger("argopy.gdac.data") access_points = ["wmo", "box"] @@ -42,27 +46,27 @@ class GDACArgoDataFetcher(ArgoDataFetcherProto): ### @abstractmethod def init(self, *args, **kwargs): - """ Initialisation for a specific fetcher """ + """Initialisation for a specific fetcher""" raise NotImplementedError("Not implemented") ### # Methods that must not change ### def __init__( - self, - gdac: str = "", - ds: str = "", - cache: bool = False, - cachedir: str = "", - dimension: str = "point", - errors: str = "raise", - parallel: bool = False, - parallel_method: str = "thread", - progress: bool = False, - api_timeout: int = 0, - **kwargs + self, + gdac: str = "", + ds: str = "", + cache: bool = False, + cachedir: str = "", + dimension: str = "point", + errors: str = "raise", + parallel: bool = False, + parallel_method: str = "thread", + progress: bool = False, + api_timeout: int = 0, + **kwargs ): - """ Init fetcher + """Init fetcher Parameters ---------- @@ -117,9 +121,7 @@ def __init__( if "N_RECORDS" in kwargs: nrows = kwargs["N_RECORDS"] # Number of records in the index, this will force to load the index file: - self.N_RECORDS = self.indexfs.load( - nrows=nrows - ).N_RECORDS + self.N_RECORDS = self.indexfs.load(nrows=nrows).N_RECORDS self._post_filter_points = False # Set method to download data: @@ -160,13 +162,13 @@ def __repr__(self): return "\n".join(summary) def cname(self): - """ Return a unique string defining the constraints """ + """Return a unique string defining the constraints""" return self._cname() @property @abstractmethod def uri(self): - """ Return the list of files to load + """Return the list of files to load Returns ------- @@ -175,7 +177,7 @@ def uri(self): raise NotImplementedError("Not implemented") def uri_mono2multi(self, URIs: list): - """ Convert mono-profile URI files to multi-profile files + """Convert mono-profile URI files to multi-profile files Multi-profile file name is based on the dataset requested ('phy', 'bgc'/'bgc-s') @@ -225,7 +227,7 @@ def mono2multi(mono_path): @property def cachepath(self): - """ Return path to cache file(s) for this request + """Return path to cache file(s) for this request Returns ------- @@ -234,13 +236,14 @@ def cachepath(self): return [self.fs.cachepath(url) for url in self.uri] def clear_cache(self): - """ Remove cached files and entries from resources opened with this fetcher """ + """Remove cached files and entries from resources opened with this fetcher""" self.indexfs.clear_cache() self.fs.clear_cache() return self + @deprecated("Not serializable") def _preprocess_multiprof(self, ds): - """ Pre-process one Argo multi-profile file as a collection of points + """Pre-process one Argo multi-profile file as a collection of points Parameters ---------- @@ -255,7 +258,7 @@ def _preprocess_multiprof(self, ds): # Remove raw netcdf file attributes and replace them with argopy ones: raw_attrs = ds.attrs ds.attrs = {} - ds.attrs.update({'raw_attrs': raw_attrs}) + ds.attrs.update({"raw_attrs": raw_attrs}) # Rename JULD and JULD_QC to TIME and TIME_QC ds = ds.rename( @@ -293,7 +296,7 @@ def _preprocess_multiprof(self, ds): try: ds.attrs["Fetched_by"] = getpass.getuser() except: # noqa: E722 - ds.attrs["Fetched_by"] = 'anonymous' + ds.attrs["Fetched_by"] = "anonymous" ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime("%Y/%m/%d") ds.attrs["Fetched_constraints"] = self.cname() ds.attrs["Fetched_uri"] = ds.encoding["source"] @@ -304,8 +307,11 @@ def _preprocess_multiprof(self, ds): return ds + def pre_process(self, ds, *args, **kwargs): + return pre_process_multiprof(ds, *args, **kwargs) + def to_xarray(self, errors: str = "ignore"): - """ Load Argo data and return a :class:`xarray.Dataset` + """Load Argo data and return a :class:`xarray.Dataset` Parameters ---------- @@ -332,19 +338,36 @@ def to_xarray(self, errors: str = "ignore"): elif len(self.uri) == 0: raise DataNotFound("No data found for: %s" % self.indexfs.cname) - # Download data: + if hasattr(self, "BOX"): + access_point = "BOX" + access_point_opts = {'BOX': self.BOX} + elif hasattr(self, "CYC"): + access_point = "CYC" + access_point_opts = {'CYC': self.CYC} + elif hasattr(self, "WMO"): + access_point = "WMO" + access_point_opts = {'WMO': self.WMO} + + # Download and pre-process data: ds = self.fs.open_mfdataset( self.uri, method=self.method, concat_dim="N_POINTS", concat=True, - preprocess=self._preprocess_multiprof, + preprocess=pre_process_multiprof, + preprocess_opts={ + "access_point": access_point, + "access_point_opts": access_point_opts, + "pre_filter_points": self._post_filter_points, + }, progress=self.progress, errors=errors, - open_dataset_opts={'xr_opts': {'decode_cf': 1, 'use_cftime': 0, 'mask_and_scale': 1}}, + open_dataset_opts={ + "xr_opts": {"decode_cf": 1, "use_cftime": 0, "mask_and_scale": 1} + }, ) - # Data post-processing: + # Meta-data processing: ds["N_POINTS"] = np.arange( 0, len(ds["N_POINTS"]) ) # Re-index to avoid duplicate values @@ -355,18 +378,20 @@ def to_xarray(self, errors: str = "ignore"): if "Fetched_from" not in ds.attrs: raw_attrs = ds.attrs ds.attrs = {} - ds.attrs.update({'raw_attrs': raw_attrs}) + ds.attrs.update({"raw_attrs": raw_attrs}) if self.dataset_id == "phy": ds.attrs["DATA_ID"] = "ARGO" - if self.dataset_id == "bgc": + if self.dataset_id in ["bgc", "bgc-s"]: ds.attrs["DATA_ID"] = "ARGO-BGC" ds.attrs["DOI"] = "http://doi.org/10.17882/42182" ds.attrs["Fetched_from"] = self.server try: ds.attrs["Fetched_by"] = getpass.getuser() except: - ds.attrs["Fetched_by"] = 'anonymous' - ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime("%Y/%m/%d") + ds.attrs["Fetched_by"] = "anonymous" + ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime( + "%Y/%m/%d" + ) ds.attrs["Fetched_constraints"] = self.cname() if len(self.uri) == 1: @@ -376,42 +401,18 @@ def to_xarray(self, errors: str = "ignore"): return ds + @deprecated("Refactored to GDAC pre-processor submodule") def filter_points(self, ds): - """ Enforce request criteria - - This may be necessary if for download performance improvement we had to work with multi instead of mono profile - files: we loaded and merged multi-profile files, and then we need to make sure to retain only profiles requested. - """ if hasattr(self, "BOX"): - # - box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max] - # - box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max] - ds = ( - ds.where(ds["LONGITUDE"] >= self.BOX[0], drop=True) - .where(ds["LONGITUDE"] < self.BOX[1], drop=True) - .where(ds["LATITUDE"] >= self.BOX[2], drop=True) - .where(ds["LATITUDE"] < self.BOX[3], drop=True) - .where(ds["PRES"] >= self.BOX[4], drop=True) # todo what about PRES_ADJUSTED ? - .where(ds["PRES"] < self.BOX[5], drop=True) - ) - if len(self.BOX) == 8: - ds = ds.where( - ds["TIME"] >= np.datetime64(self.BOX[6]), drop=True - ).where(ds["TIME"] < np.datetime64(self.BOX[7]), drop=True) - - if hasattr(self, "CYC"): - this_mask = xr.DataArray( - np.zeros_like(ds["N_POINTS"]), - dims=["N_POINTS"], - coords={"N_POINTS": ds["N_POINTS"]}, - ) - for cyc in self.CYC: - this_mask += ds["CYCLE_NUMBER"] == cyc - this_mask = this_mask >= 1 # any - ds = ds.where(this_mask, drop=True) - - ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"])) - - return ds + access_point = "BOX" + access_point_opts = {'BOX': self.BOX} + elif hasattr(self, "CYC"): + access_point = "CYC" + access_point_opts = {'CYC': self.CYC} + elif hasattr(self, "WMO"): + access_point = "WMO" + access_point_opts = {'WMO': self.WMO} + return filter_points(ds, access_point=access_point, **access_point_opts) def transform_data_mode(self, ds: xr.Dataset, **kwargs): """Apply xarray argo accessor transform_data_mode method""" @@ -436,9 +437,9 @@ def filter_qc(self, ds: xr.Dataset, **kwargs): def filter_researchmode(self, ds: xr.Dataset, *args, **kwargs) -> xr.Dataset: """Filter dataset for research user mode - This filter will select only QC=1 delayed mode data with pressure errors smaller than 20db + This filter will select only QC=1 delayed mode data with pressure errors smaller than 20db - Use this filter instead of transform_data_mode and filter_qc + Use this filter instead of transform_data_mode and filter_qc """ ds = ds.argo.filter_researchmode(**kwargs) if ds.argo._type == "point": @@ -457,7 +458,7 @@ class Fetch_wmo(GDACArgoDataFetcher): """ def init(self, WMO: list = [], CYC=None, **kwargs): - """ Create Argo data loader for WMOs + """Create Argo data loader for WMOs Parameters ---------- @@ -483,7 +484,7 @@ def init(self, WMO: list = [], CYC=None, **kwargs): @property def uri(self): - """ List of files to load for a request + """List of files to load for a request Returns ------- @@ -513,7 +514,7 @@ class Fetch_box(GDACArgoDataFetcher): """ def init(self, box: list, nrows=None, **kwargs): - """ Create Argo data loader + """Create Argo data loader Parameters ---------- @@ -540,7 +541,7 @@ def init(self, box: list, nrows=None, **kwargs): @property def uri(self): - """ List of files to load for a request + """List of files to load for a request Returns ------- diff --git a/argopy/data_fetchers/gdac_data_processors.py b/argopy/data_fetchers/gdac_data_processors.py new file mode 100644 index 00000000..6995f35c --- /dev/null +++ b/argopy/data_fetchers/gdac_data_processors.py @@ -0,0 +1,120 @@ +import numpy as np +import pandas as pd +import xarray as xr +import getpass + + +def pre_process_multiprof( + ds: xr.Dataset, + access_point: str, + access_point_opts: {}, + pre_filter_points: bool = False, + # dataset_id: str = "phy", + # cname: str = '?', +) -> xr.Dataset: + """Pre-process one Argo multi-profile file as a collection of points + + Parameters + ---------- + ds: :class:`xarray.Dataset` + Dataset to process + + Returns + ------- + :class:`xarray.Dataset` + """ + # Remove raw netcdf file attributes and replace them with argopy ones: + raw_attrs = ds.attrs + ds.attrs = {} + ds.attrs.update({"raw_attrs": raw_attrs}) + + # Rename JULD and JULD_QC to TIME and TIME_QC + ds = ds.rename( + {"JULD": "TIME", "JULD_QC": "TIME_QC", "JULD_LOCATION": "TIME_LOCATION"} + ) + ds["TIME"].attrs = { + "long_name": "Datetime (UTC) of the station", + "standard_name": "time", + } + + # Cast data types: + ds = ds.argo.cast_types() + + # Enforce real pressure resolution : 0.1 db + for vname in ds.data_vars: + if "PRES" in vname and "QC" not in vname: + ds[vname].values = np.round(ds[vname].values, 1) + + # Remove variables without dimensions: + # todo: We should be able to find a way to keep them somewhere in the data structure + for v in ds.data_vars: + if len(list(ds[v].dims)) == 0: + ds = ds.drop_vars(v) + + ds = ( + ds.argo.profile2point() + ) # Default output is a collection of points, along N_POINTS + + + # Attributes are added by the caller + + # if dataset_id == "phy": + # ds.attrs["DATA_ID"] = "ARGO" + # if dataset_id in ["bgc", "bgc-s"]: + # ds.attrs["DATA_ID"] = "ARGO-BGC" + # + # ds.attrs["DOI"] = "http://doi.org/10.17882/42182" + # + # # ds.attrs["Fetched_from"] = server + # ds.attrs["Fetched_constraints"] = cname + # try: + # ds.attrs["Fetched_by"] = getpass.getuser() + # except: # noqa: E722 + # ds.attrs["Fetched_by"] = "anonymous" + # ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime("%Y/%m/%d") + # ds.attrs["Fetched_uri"] = ds.encoding["source"] + ds = ds[np.sort(ds.data_vars)] + + if pre_filter_points: + ds = filter_points(ds, access_point=access_point, **access_point_opts) + + return ds + + +def filter_points(ds: xr.Dataset, access_point: str = None, **kwargs) -> xr.Dataset: + """Enforce request criteria + + This may be necessary if for download performance improvement we had to work with multi instead of mono profile + files: we loaded and merged multi-profile files, and then we need to make sure to retain only profiles requested. + """ + if access_point == "BOX": + BOX = kwargs["BOX"] + # - box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max] + # - box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max] + ds = ( + ds.where(ds["LONGITUDE"] >= BOX[0], drop=True) + .where(ds["LONGITUDE"] < BOX[1], drop=True) + .where(ds["LATITUDE"] >= BOX[2], drop=True) + .where(ds["LATITUDE"] < BOX[3], drop=True) + .where(ds["PRES"] >= BOX[4], drop=True) # todo what about PRES_ADJUSTED ? + .where(ds["PRES"] < BOX[5], drop=True) + ) + if len(BOX) == 8: + ds = ds.where(ds["TIME"] >= np.datetime64(BOX[6]), drop=True).where( + ds["TIME"] < np.datetime64(BOX[7]), drop=True + ) + + if access_point == "CYC": + this_mask = xr.DataArray( + np.zeros_like(ds["N_POINTS"]), + dims=["N_POINTS"], + coords={"N_POINTS": ds["N_POINTS"]}, + ) + for cyc in kwargs["CYC"]: + this_mask += ds["CYCLE_NUMBER"] == cyc + this_mask = this_mask >= 1 # any + ds = ds.where(this_mask, drop=True) + + ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"])) + + return ds