diff --git a/argopy/data_fetchers/argovis_data_processors.py b/argopy/data_fetchers/argovis_data_processors.py
index a065c97d..30997c9b 100644
--- a/argopy/data_fetchers/argovis_data_processors.py
+++ b/argopy/data_fetchers/argovis_data_processors.py
@@ -7,6 +7,9 @@ def pre_process(profiles: Any, key_map: dict = None) -> pd.DataFrame:
     """convert json data to Pandas DataFrame"""
 
+    if profiles is None:
+        return None
+
     # Make sure we deal with a list
     if isinstance(profiles, list):
         data = profiles
diff --git a/argopy/data_fetchers/gdac_data_processors.py b/argopy/data_fetchers/gdac_data_processors.py
index 6995f35c..7e94d8b3 100644
--- a/argopy/data_fetchers/gdac_data_processors.py
+++ b/argopy/data_fetchers/gdac_data_processors.py
@@ -23,6 +23,9 @@ def pre_process_multiprof(
     -------
     :class:`xarray.Dataset`
     """
+    if ds is None:
+        return None
+
     # Remove raw netcdf file attributes and replace them with argopy ones:
     raw_attrs = ds.attrs
     ds.attrs = {}
diff --git a/argopy/errors.py b/argopy/errors.py
index 997f826c..f00fe900 100644
--- a/argopy/errors.py
+++ b/argopy/errors.py
@@ -22,7 +22,7 @@ class DataNotFound(NoData):
 
 
 class NoDataLeft(NoData):
-    """Raise when a data post-processing returns an empty dataset or dataframe"""
+    """Raise when data processing returns an empty dataset or dataframe"""
 
     pass
diff --git a/argopy/stores/argo_index_pa.py b/argopy/stores/argo_index_pa.py
index 441bfeae..47ad38ec 100644
--- a/argopy/stores/argo_index_pa.py
+++ b/argopy/stores/argo_index_pa.py
@@ -99,7 +99,7 @@ def read_csv(input_file, nrows=None):
 
         def csv2index(obj):
             index = read_csv(obj, nrows=nrows)
-            log.debug(index.column_names)
+            # log.debug(index.column_names)
             check_index_cols(
                 index.column_names,
                 convention=self.convention,
diff --git a/argopy/stores/filesystems.py b/argopy/stores/filesystems.py
index 0057151f..e399d324 100644
--- a/argopy/stores/filesystems.py
+++ b/argopy/stores/filesystems.py
@@ -45,6 +45,7 @@
     FileSystemHasNoCache,
     CacheFileNotFound,
     DataNotFound,
+    NoDataLeft,
     InvalidMethod,
     ErddapHTTPUnauthorized,
     ErddapHTTPNotFound,
@@ -787,6 +788,8 @@ def open_dataset(self, url, errors: str = "raise", **kwargs) -> xr.Dataset:
         ------
         :class:`TypeError` if data returned by ``url`` are not CDF or HDF5 binary data.
 
+        :class:`DataNotFound` if ``errors`` is set to ``raise`` and url returns no data.
+
         See Also
         --------
         :class:`httpstore.open_mfdataset`
@@ -803,6 +806,13 @@ def open_dataset(self, url, errors: str = "raise", **kwargs) -> xr.Dataset:
                 log.error("DataNotFound: %s" % url)
                 return None
 
+        if b'Not Found: Your query produced no matching results' in data:
+            if errors == "raise":
+                raise DataNotFound(url)
+            elif errors == "ignore":
+                log.error("DataNotFound from [%s]: %s" % (url, data))
+                return None
+
         if data[0:3] != b"CDF" and data[0:3] != b"\x89HD":
             raise TypeError(
                 "We didn't get a CDF or HDF5 binary data as expected ! We get: %s"
@@ -1411,9 +1421,7 @@ def _mfprocessor_json(
         data = self.open_json(url, **open_json_opts)
 
         # Pre-process
-        if data is None:
-            raise DataNotFound(url)
-        elif isinstance(preprocess, types.FunctionType) or isinstance(
+        if isinstance(preprocess, types.FunctionType) or isinstance(
             preprocess, types.MethodType
         ):
             if url_follow:
diff --git a/argopy/tests/test_fetchers_Dask_cluster.py b/argopy/tests/test_fetchers_Dask_cluster.py
new file mode 100644
index 00000000..e4005be2
--- /dev/null
+++ b/argopy/tests/test_fetchers_Dask_cluster.py
@@ -0,0 +1,139 @@
+import pytest
+import logging
+
+from dask.distributed import Client
+from argopy import DataFetcher
+from collections import ChainMap
+import xarray as xr
+
+from mocked_http import mocked_server_address, mocked_httpserver
+
+
+log = logging.getLogger("argopy.tests.dask")
+USE_MOCKED_SERVER = True
+
+"""
+List data sources to be tested
+"""
+SRC_LIST = ["erddap", "argovis", "gdac"]
+
+
+"""
+List access points to be tested for each dataset: phy.
+For each access point, we list 1-to-2 scenarios to make sure all possibilities are tested
+"""
+PARALLEL_ACCESS_POINTS = [
+    {
+        "phy": [
+            {"region": [-60, -55, 40.0, 45.0, 0.0, 20.0, "2007-08-01", "2007-09-01"]},
+        ]
+    },
+]
+
+"""
+List user modes to be tested
+"""
+USER_MODES = ["standard"]  # Because it's the only one available with argovis
+
+"""
+Make a list of VALID dataset/access_points to be tested
+"""
+VALID_PARALLEL_ACCESS_POINTS, VALID_PARALLEL_ACCESS_POINTS_IDS = [], []
+for entry in PARALLEL_ACCESS_POINTS:
+    for src in SRC_LIST:
+        for ds in entry:
+            for mode in USER_MODES:
+                for ap in entry[ds]:
+                    VALID_PARALLEL_ACCESS_POINTS.append(
+                        {"src": src, "ds": ds, "mode": mode, "access_point": ap}
+                    )
+                    VALID_PARALLEL_ACCESS_POINTS_IDS.append(
+                        "src='%s', ds='%s', mode='%s', %s" % (src, ds, mode, ap)
+                    )
+
+
+def create_fetcher(fetcher_args, access_point):
+    """Create a fetcher for a given set of facade options and access point"""
+
+    def core(fargs, apts):
+        try:
+            f = DataFetcher(**fargs)
+            if "float" in apts:
+                f = f.float(apts["float"])
+            elif "profile" in apts:
+                f = f.profile(*apts["profile"])
+            elif "region" in apts:
+                f = f.region(apts["region"])
+        except Exception:
+            raise
+        return f
+
+    fetcher = core(fetcher_args, access_point)
+    return fetcher
+
+
+class Test_Backend:
+    """Test Dask cluster parallelization"""
+
+    #############
+    # UTILITIES #
+    #############
+    def setup_class(self):
+        """setup any state specific to the execution of the given class"""
+        # Create a Dask distributed client shared by all tests in this class
+        self.client = Client(processes=True)
+        log.debug(self.client.dashboard_link)
+
+    def _test2fetcherargs(self, this_request):
+        """Helper method to set up options for a fetcher creation"""
+        defaults_args = {
+            "parallel": self.client,
+            "chunks_maxsize": {"lon": 2.5, "lat": 2.5},
+        }
+        if USE_MOCKED_SERVER:
+            defaults_args["server"] = mocked_server_address
+
+        src = this_request.param["src"]
+        dataset = this_request.param["ds"]
+        user_mode = this_request.param["mode"]
+        access_point = this_request.param["access_point"]
+
+        fetcher_args = ChainMap(
+            defaults_args,
+            {
+                "src": src,
+                "ds": dataset,
+                "mode": user_mode,
+            },
+        )
+
+        # log.debug("Setting up fetcher arguments:%s" % fetcher_args)
+        return fetcher_args, access_point
+
+    @pytest.fixture
+    def fetcher(self, request):
+        """Fixture to create a data fetcher for a given dataset and access point"""
+        fetcher_args, access_point = self._test2fetcherargs(request)
+        yield create_fetcher(fetcher_args, access_point)
+
+    def teardown_class(self):
+        """Cleanup once we are finished."""
+        self.client.close()
+
+    #########
+    # TESTS #
+    #########
+    @pytest.mark.parametrize(
+        "fetcher",
+        VALID_PARALLEL_ACCESS_POINTS,
+        indirect=True,
+        ids=VALID_PARALLEL_ACCESS_POINTS_IDS,
+    )
+    def test_fetching_erddap(self, mocked_httpserver, fetcher):
+        # log.debug(fetcher)
+        # log.debug(len(fetcher.uri))
+        # log.debug(fetcher.uri)
+        assert len(fetcher.uri) > 1
+
+        ds = fetcher.to_xarray()
+        assert isinstance(ds, xr.Dataset)
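
For reference, below is a minimal usage sketch (not part of the diff) of the behaviour the new test module exercises: a dask.distributed Client passed to DataFetcher through the "parallel" option, so that chunked requests are dispatched to cluster workers. The data source, region, and chunk sizes are copied from the test above and are purely illustrative; any of the listed sources ("erddap", "argovis", "gdac") could be used, and the test runs against a mocked HTTP server rather than the live services.

    from dask.distributed import Client
    from argopy import DataFetcher

    # Local Dask cluster, as created in Test_Backend.setup_class
    client = Client(processes=True)

    f = DataFetcher(
        src="erddap",                             # any entry of SRC_LIST
        mode="standard",
        parallel=client,                          # dispatch chunked requests to the Dask cluster
        chunks_maxsize={"lon": 2.5, "lat": 2.5},  # split the domain into small boxes
    ).region([-60, -55, 40.0, 45.0, 0.0, 20.0, "2007-08-01", "2007-09-01"])

    assert len(f.uri) > 1   # the access point is split into several chunked requests
    ds = f.to_xarray()      # chunks are fetched by the workers, then merged into one Dataset

    client.close()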