Commit

Refactor argovis pre-processor
- make it serializable
gmaze committed Sep 27, 2024
1 parent 5e5f22a commit 7ba07ca
Showing 2 changed files with 249 additions and 118 deletions.
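Why serializability matters here: open_mfjson hands its preprocess callable to parallel workers, and that hand-off may require pickling. A bound method such as self.json2dataframe drags the whole fetcher instance, including unpicklable state like locks or open sessions, into the pickle; a module-level function such as the new pre_process is pickled by qualified name alone. A minimal illustration of the difference (illustrative names, not argopy code):

    import pickle
    import threading

    def pre_process(profiles, key_map=None):
        # Module-level: pickle stores only the qualified name, no state.
        return profiles

    class Fetcher:
        def __init__(self):
            self.lock = threading.Lock()  # stand-in for unpicklable fetcher state

        def json2dataframe(self, profiles):
            # Bound method: pickling it must also pickle `self`.
            return profiles

    pickle.dumps(pre_process)  # works
    try:
        pickle.dumps(Fetcher().json2dataframe)
    except TypeError as err:
        print(err)  # cannot pickle '_thread.lock' object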
134 changes: 16 additions & 118 deletions argopy/data_fetchers/argovis_data.py
@@ -10,9 +10,10 @@
from ..options import OPTIONS, DEFAULT
from ..utils.format import format_oneline
from ..utils.chunking import Chunker
from ..utils.decorators import deprecated
from ..errors import DataNotFound
from .proto import ArgoDataFetcherProto

from .argovis_data_processors import pre_process, add_attributes

access_points = ["wmo", "box"]
exit_formats = ["xarray"]
@@ -97,11 +97,7 @@ def __init__(
if not isinstance(parallel, bool):
parallel_method = parallel
parallel = True
if parallel_method not in ["thread"]:
raise ValueError(
"argovis only support multi-threading, use 'thread' instead of '%s'"
% parallel_method
)

self.parallel = parallel
self.parallel_method = parallel_method
self.progress = progress
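The removed guard used to reject any parallelization method other than 'thread'. With the pre-processor now serializable, that restriction is presumably no longer needed: a module-level function also survives the pickling round-trip that process-based executors perform. For example (illustrative, not argopy's dispatch code):

    from concurrent.futures import ProcessPoolExecutor

    def pre_process(profiles, key_map=None):  # module-level, hence picklable
        return profiles

    if __name__ == "__main__":
        with ProcessPoolExecutor() as pool:
            # Each task pickles pre_process and its argument over to a worker:
            print(list(pool.map(pre_process, [{"a": 1}, {"b": 2}])))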
@@ -147,105 +144,6 @@ def _add_history(self, this, txt):
this.attrs["history"] = txt
return this

def _add_attributes(self, this): # noqa: C901
"""Add variables attributes not return by argovis requests
#todo: This is hard coded, but should be retrieved from an API somewhere
"""
for v in this.data_vars:
if "TEMP" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "SEA TEMPERATURE IN SITU ITS-90 SCALE",
"standard_name": "sea_water_temperature",
"units": "degree_Celsius",
"valid_min": -2.0,
"valid_max": 40.0,
"resolution": 0.001,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "PSAL" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "PRACTICAL SALINITY",
"standard_name": "sea_water_salinity",
"units": "psu",
"valid_min": 0.0,
"valid_max": 43.0,
"resolution": 0.001,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "PRES" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "Sea Pressure",
"standard_name": "sea_water_pressure",
"units": "decibar",
"valid_min": 0.0,
"valid_max": 12000.0,
"resolution": 0.1,
"axis": "Z",
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "DOXY" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "Dissolved oxygen",
"standard_name": "moles_of_oxygen_per_unit_mass_in_sea_water",
"units": "micromole/kg",
"valid_min": -5.0,
"valid_max": 600.0,
"resolution": 0.001,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "_QC" in v:
attrs = {
"long_name": "Global quality flag of %s profile" % v,
"convention": "Argo reference table 2a",
}
this[v].attrs = attrs

if "CYCLE_NUMBER" in this.data_vars:
this["CYCLE_NUMBER"].attrs = {
"long_name": "Float cycle number",
"convention": "0..N, 0 : launch cycle (if exists), 1 : first complete cycle",
}

if "DATA_MODE" in this.data_vars:
this["DATA_MODE"].attrs = {
"long_name": "Delayed mode or real time data",
"convention": "R : real time; D : delayed mode; A : real time with adjustment",
}

if "DIRECTION" in this.data_vars:
this["DIRECTION"].attrs = {
"long_name": "Direction of the station profiles",
"convention": "A: ascending profiles, D: descending profiles",
}

if "PLATFORM_NUMBER" in this.data_vars:
this["PLATFORM_NUMBER"].attrs = {
"long_name": "Float unique identifier",
"convention": "WMO float identifier : A9IIIII",
}

return this

@property
def cachepath(self):
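The deleted _add_attributes presumably reappears as the module-level add_attributes imported at the top of the file; its new home, argovis_data_processors.py, is the second changed file and is not displayed on this page. A plausible table-driven equivalent of the deleted per-variable blocks (same attribute values; scalar variables such as CYCLE_NUMBER omitted for brevity; a sketch, not the committed code):

    VAR_ATTRS = {
        "TEMP": {"long_name": "SEA TEMPERATURE IN SITU ITS-90 SCALE",
                 "standard_name": "sea_water_temperature",
                 "units": "degree_Celsius",
                 "valid_min": -2.0, "valid_max": 40.0, "resolution": 0.001},
        "PSAL": {"long_name": "PRACTICAL SALINITY",
                 "standard_name": "sea_water_salinity",
                 "units": "psu",
                 "valid_min": 0.0, "valid_max": 43.0, "resolution": 0.001},
        "PRES": {"long_name": "Sea Pressure",
                 "standard_name": "sea_water_pressure",
                 "units": "decibar",
                 "valid_min": 0.0, "valid_max": 12000.0, "resolution": 0.1,
                 "axis": "Z"},
        "DOXY": {"long_name": "Dissolved oxygen",
                 "standard_name": "moles_of_oxygen_per_unit_mass_in_sea_water",
                 "units": "micromole/kg",
                 "valid_min": -5.0, "valid_max": 600.0, "resolution": 0.001},
    }

    def add_attributes(this):
        # One loop over a table instead of four near-identical blocks:
        for v in this.data_vars:
            for key, attrs in VAR_ATTRS.items():
                if key in v and "_QC" not in v:
                    this[v].attrs = dict(attrs)
                    if "ERROR" in v:
                        this[v].attrs["long_name"] = "ERROR IN %s" % attrs["long_name"]
            if "_QC" in v:
                this[v].attrs = {
                    "long_name": "Global quality flag of %s profile" % v,
                    "convention": "Argo reference table 2a",
                }
        return this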
@@ -267,6 +165,7 @@ def safe_for_fsspec_cache(url):

return [safe_for_fsspec_cache(url) for url in urls]

@deprecated('Not serializable')
def json2dataframe(self, profiles):
"""convert json data to Pandas DataFrame"""
# Make sure we deal with a list
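json2dataframe is kept but flagged with argopy's deprecated decorator, imported above from ..utils.decorators. The decorator itself is outside this diff; a minimal stand-in with the same call shape would simply emit a DeprecationWarning carrying the given reason:

    import warnings
    from functools import wraps

    def deprecated(reason):
        # Sketch of a reason-taking deprecation decorator, not argopy's own:
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                warnings.warn("%s is deprecated: %s" % (func.__name__, reason),
                              category=DeprecationWarning, stacklevel=2)
                return func(*args, **kwargs)
            return wrapper
        return decorator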
@@ -310,38 +209,39 @@ def json2dataframe(self, profiles):
df = pd.DataFrame(rows)
return df

def to_dataframe(self, errors: str = "ignore"):
def to_dataframe(self, errors: str = "ignore") -> pd.DataFrame:
"""Load Argo data and return a Pandas dataframe"""

# Download data:
if not self.parallel:
method = "sequential"
else:
method = self.parallel_method

preprocess_opts = {'key_map': self.key_map}
df_list = self.fs.open_mfjson(
self.uri,
method=method,
preprocess=self.json2dataframe,
preprocess=pre_process,
preprocess_opts=preprocess_opts,
progress=self.progress,
errors=errors,
)

# Merge results (list of dataframe):
for i, df in enumerate(df_list):
df = df.reset_index()
df = df.rename(columns=self.key_map)
df = df[[value for value in self.key_map.values() if value in df.columns]]
df_list[i] = df
df = pd.concat(df_list, ignore_index=True)
if df.shape[0] == 0:
raise DataNotFound("No data found for: %s" % self.cname())

df.sort_values(by=["TIME", "PRES"], inplace=True)
df["N_POINTS"] = np.arange(0, len(df["N_POINTS"]))
df = df.set_index(["N_POINTS"])
return df

def to_xarray(self, errors: str = "ignore"):
def to_xarray(self, errors: str = "ignore") -> xr.Dataset:
"""Download and return data as xarray Datasets"""
ds = self.to_dataframe(errors=errors).to_xarray()
# ds["TIME"] = pd.to_datetime(ds["TIME"], utc=True)
ds = ds.sortby(
["TIME", "PRES"]
) # should already be sorted by date in descending order
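to_dataframe now passes pre_process straight to open_mfjson, along with a preprocess_opts dict carrying key_map, so each worker can build and rename its DataFrame independently instead of the fetcher doing it in a post-merge loop. The committed pre_process lives in argovis_data_processors.py (not displayed here); a sketch of a body compatible with this call site, reusing the rename-and-filter step the old merge loop performed:

    import pandas as pd

    def pre_process(profiles, key_map=None):
        # Hypothetical sketch only, not the committed implementation.
        if not isinstance(profiles, list):  # single-profile responses
            profiles = [profiles]
        df = pd.DataFrame(profiles)
        if key_map is not None:
            df = df.rename(columns=key_map)
            # Keep only the mapped Argo variables:
            df = df[[v for v in key_map.values() if v in df.columns]]
        return df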
@@ -350,7 +250,6 @@ def to_xarray(self, errors: str = "ignore"):
) # Re-index to avoid duplicate values

# Set coordinates:
# ds = ds.set_coords('N_POINTS')
coords = ("LATITUDE", "LONGITUDE", "TIME", "N_POINTS")
ds = ds.reset_coords()
ds["N_POINTS"] = ds["N_POINTS"]
@@ -359,12 +258,11 @@
ds = ds.rename({v: v.upper()})
ds = ds.set_coords(coords)

# Cast data types and add variable attributes (not available in the csv download):
ds["TIME"] = pd.to_datetime(ds["TIME"], utc=True)
ds = self._add_attributes(ds)
# Add variable attributes and cast data types:
ds = add_attributes(ds)
ds = ds.argo.cast_types()

# Remove argovis file attributes and replace them with argopy ones:
# Remove argovis dataset attributes and replace them with argopy ones:
ds.attrs = {}
if self.dataset_id == "phy":
ds.attrs["DATA_ID"] = "ARGO"