Add CI tests for Dask client parallelization method
- httpstore._mfprocessor_json no longer raises DataNotFound
gmaze committed Oct 11, 2024
1 parent e7512c1 commit aac32bc
Showing 6 changed files with 158 additions and 5 deletions.
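
The feature exercised by the new tests is passing a dask.distributed Client as the "parallel" option of DataFetcher, so that the chunks of a large request are fetched as tasks on the cluster. A minimal sketch of that usage, mirroring the access point and chunking values used by the test module below (the values themselves are illustrative, not prescriptive):

    from dask.distributed import Client
    from argopy import DataFetcher

    client = Client(processes=True)  # local Dask cluster
    f = DataFetcher(src="erddap", ds="phy", mode="standard",
                    parallel=client, chunks_maxsize={"lon": 2.5, "lat": 2.5})
    f = f.region([-60, -55, 40.0, 45.0, 0.0, 20.0, "2007-08-01", "2007-09-01"])
    ds = f.to_xarray()  # chunked URIs are fetched in parallel through the client
    client.close()
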
3 changes: 3 additions & 0 deletions argopy/data_fetchers/argovis_data_processors.py
@@ -7,6 +7,9 @@
def pre_process(profiles: Any, key_map: dict = None) -> pd.DataFrame:
"""convert json data to Pandas DataFrame"""

if profiles is None:
return None

# Make sure we deal with a list
if isinstance(profiles, list):
data = profiles
3 changes: 3 additions & 0 deletions argopy/data_fetchers/gdac_data_processors.py
@@ -23,6 +23,9 @@ def pre_process_multiprof(
-------
:class:`xarray.Dataset`
"""
if ds is None:
return None

# Remove raw netcdf file attributes and replace them with argopy ones:
raw_attrs = ds.attrs
ds.attrs = {}
2 changes: 1 addition & 1 deletion argopy/errors.py
@@ -22,7 +22,7 @@ class DataNotFound(NoData):


class NoDataLeft(NoData):
"""Raise when a data post-processing returns an empty dataset or dataframe"""
"""Raise when data processing returns an empty dataset or dataframe"""
pass


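
Since both DataNotFound and NoDataLeft derive from NoData, callers that only care about the absence of data can catch the base class. A small sketch, assuming NoData is importable from argopy.errors as the class definitions above suggest (the fetch step is a placeholder, not part of the commit):

    from argopy.errors import NoData, DataNotFound

    def some_fetch_step():
        # placeholder for any argopy fetch or post-processing step
        raise DataNotFound("illustrative")

    try:
        ds = some_fetch_step()
    except NoData:  # catches DataNotFound and NoDataLeft alike
        ds = None
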
2 changes: 1 addition & 1 deletion argopy/stores/argo_index_pa.py
@@ -99,7 +99,7 @@ def read_csv(input_file, nrows=None):

def csv2index(obj):
index = read_csv(obj, nrows=nrows)
log.debug(index.column_names)
# log.debug(index.column_names)
check_index_cols(
index.column_names,
convention=self.convention,
14 changes: 11 additions & 3 deletions argopy/stores/filesystems.py
@@ -45,6 +45,7 @@
FileSystemHasNoCache,
CacheFileNotFound,
DataNotFound,
NoDataLeft,
InvalidMethod,
ErddapHTTPUnauthorized,
ErddapHTTPNotFound,
@@ -787,6 +788,8 @@ def open_dataset(self, url, errors: str = "raise", **kwargs) -> xr.Dataset:
------
:class:`TypeError` if data returned by ``url`` are not CDF or HDF5 binary data.
:class:`DataNotFound` if ``errors`` is set to ``raise`` and url returns no data.
See Also
--------
:class:`httpstore.open_mfdataset`
@@ -803,6 +806,13 @@ def open_dataset(self, url, errors: str = "raise", **kwargs) -> xr.Dataset:
log.error("DataNotFound: %s" % url)
return None

if b'Not Found: Your query produced no matching results' in data:
if errors == "raise":
raise DataNotFound(url)
elif errors == "ignore":
log.error("DataNotFound from [%s]: %s" % (url, data))
return None

if data[0:3] != b"CDF" and data[0:3] != b"\x89HD":
raise TypeError(
"We didn't get a CDF or HDF5 binary data as expected ! We get: %s"
@@ -1411,9 +1421,7 @@ def _mfprocessor_json(
data = self.open_json(url, **open_json_opts)

# Pre-process
if data is None:
raise DataNotFound(url)
elif isinstance(preprocess, types.FunctionType) or isinstance(
if isinstance(preprocess, types.FunctionType) or isinstance(
preprocess, types.MethodType
):
if url_follow:
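
As stated in the commit message, _mfprocessor_json no longer raises DataNotFound when open_json returns None; the None is handed to the preprocess callable instead, which is why the argovis and gdac pre-processors above now return None early. The NoDataLeft import added higher up suggests the aggregation step raises that error only when every chunk comes back empty, although that part of the diff is not expanded here. A schematic of the new no-data path, with stubs standing in for the actual argopy internals:

    def open_json(url, errors="ignore"):
        return None  # stand-in for a url that yields no data

    def pre_process(profiles, key_map=None):
        if profiles is None:
            return None  # mirrors the early return added to the pre-processors above
        ...

    data = open_json("https://example.org/no-data")  # None instead of an exception
    result = pre_process(data)                       # None propagates quietly
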
139 changes: 139 additions & 0 deletions argopy/tests/test_fetchers_Dask_cluster.py
@@ -0,0 +1,139 @@
import pytest
import logging

from dask.distributed import Client
from argopy import DataFetcher
from collections import ChainMap
import xarray as xr

from mocked_http import mocked_server_address, mocked_httpserver


log = logging.getLogger("argopy.tests.dask")
USE_MOCKED_SERVER = True

"""
List data sources to be tested
"""
SRC_LIST = ["erddap", "argovis", "gdac"]


"""
List access points to be tested for each dataset: phy.
For each access point, we list 1-to-2 scenarios to make sure all possibilities are tested
"""
PARALLEL_ACCESS_POINTS = [
{
"phy": [
{"region": [-60, -55, 40.0, 45.0, 0.0, 20.0, "2007-08-01", "2007-09-01"]},
]
},
]

"""
List user modes to be tested
"""
USER_MODES = ["standard"] # Because it's the only available with argovis

"""
Make a list of VALID dataset/access_points to be tested
"""
VALID_PARALLEL_ACCESS_POINTS, VALID_PARALLEL_ACCESS_POINTS_IDS = [], []
for entry in PARALLEL_ACCESS_POINTS:
for src in SRC_LIST:
for ds in entry:
for mode in USER_MODES:
for ap in entry[ds]:
VALID_PARALLEL_ACCESS_POINTS.append(
{"src": src, "ds": ds, "mode": mode, "access_point": ap}
)
VALID_PARALLEL_ACCESS_POINTS_IDS.append(
"src='%s', ds='%s', mode='%s', %s" % (src, ds, mode, ap)
)


def create_fetcher(fetcher_args, access_point):
"""Create a fetcher for a given set of facade options and access point"""

def core(fargs, apts):
try:
f = DataFetcher(**fargs)
if "float" in apts:
f = f.float(apts["float"])
elif "profile" in apts:
f = f.profile(*apts["profile"])
elif "region" in apts:
f = f.region(apts["region"])
except Exception:
raise
return f

fetcher = core(fetcher_args, access_point)
return fetcher


class Test_Backend:
"""Test Dask cluster parallelization"""

#############
# UTILITIES #
#############
def setup_class(self):
"""setup any state specific to the execution of the given class"""
# Create a Dask distributed client shared by all tests in this class
self.client = Client(processes=True)
log.debug(self.client.dashboard_link)

def _test2fetcherargs(self, this_request):
"""Helper method to set up options for a fetcher creation"""
defaults_args = {
"parallel": self.client,
"chunks_maxsize": {"lon": 2.5, "lat": 2.5},
}
if USE_MOCKED_SERVER:
defaults_args["server"] = mocked_server_address

src = this_request.param["src"]
dataset = this_request.param["ds"]
user_mode = this_request.param["mode"]
access_point = this_request.param["access_point"]

fetcher_args = ChainMap(
defaults_args,
{
"src": src,
"ds": dataset,
"mode": user_mode,
},
)

# log.debug("Setting up fetcher arguments:%s" % fetcher_args)
return fetcher_args, access_point

@pytest.fixture
def fetcher(self, request):
"""Fixture to create a data fetcher for a given dataset and access point"""
fetcher_args, access_point = self._test2fetcherargs(request)
yield create_fetcher(fetcher_args, access_point)

def teardown_class(self):
"""Cleanup once we are finished."""
self.client.close()

#########
# TESTS #
#########
@pytest.mark.parametrize(
"fetcher",
VALID_PARALLEL_ACCESS_POINTS,
indirect=True,
ids=VALID_PARALLEL_ACCESS_POINTS_IDS,
)
def test_fetching_erddap(self, mocked_httpserver, fetcher):
# log.debug(fetcher)
# log.debug(len(fetcher.uri))
# log.debug(fetcher.uri)
assert len(fetcher.uri) > 1

ds = fetcher.to_xarray()
assert isinstance(ds, xr.Dataset)
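
One way to run just this new module locally (standard pytest usage, not something added by the commit); with USE_MOCKED_SERVER = True all requests are served by the mocked_httpserver fixture:

    import pytest
    pytest.main(["-v", "argopy/tests/test_fetchers_Dask_cluster.py"])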
