Prepare for "v1.0.0 Unicorn Release 🦄" #400

Merged
merged 23 commits into from
Oct 16, 2024
Merged

Prepare for "v1.0.0 Unicorn Release 🦄" #400

merged 23 commits into from
Oct 16, 2024

Conversation

@gmaze (Member) commented Oct 15, 2024

Setup

Prepare code for release

Deprecation policy

  • Check the code for the deprecated decorator and enforce the deprecation policy (a minimal scanning sketch follows this list):
    • If code is marked as deprecated since version v1.0.0: do nothing (first version with the deprecation warning)
    • If code is marked as deprecated since version v1.0.0-1: do nothing (second and last version with the deprecation warning)
    • If code is marked as deprecated since version v1.0.0-2: delete the code (calling it will now raise an error)
  • Update the documentation according to new deprecations
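
A minimal, illustrative sketch of such a scan (assuming the decorator is applied as @deprecated(..., version="x.y.z"); the script name and output format are hypothetical, not part of argopy):

    # scan_deprecations.py -- hypothetical helper, not part of argopy
    import ast
    import pathlib

    for path in pathlib.Path("argopy").rglob("*.py"):
        tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        for node in ast.walk(tree):
            for deco in getattr(node, "decorator_list", []):
                # Match decorators written as @deprecated(..., version="x.y.z")
                if isinstance(deco, ast.Call) and getattr(deco.func, "id", None) == "deprecated":
                    for kw in deco.keywords:
                        if kw.arg == "version" and isinstance(kw.value, ast.Constant):
                            print(f"{path}:{node.lineno}  deprecated since {kw.value.value}")

Each hit can then be handled according to the policy above, depending on how many releases ago it was deprecated.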

Update static content

  • Update the CI test data used by the mocked FTP and HTTP servers, using the citests_httpdata_manager CLI:
    cd cli
    ./citests_httpdata_manager -a clear --force --refresh
    ./citests_httpdata_manager -a download
    ./citests_httpdata_manager -a check
  • Update the list of valid Reference tables from the NVS server
  • Update the static asset files
  • Update the cheatsheet PDF with the new release features

Code clean-up and update

  • Run codespell from the repo root and fix errors: codespell -q 2
  • Run flake8 from the repo root and fix errors

Software distribution readiness

  • Manually trigger upstream CI tests for the release branch and ensure they pass
  • Update the pinned dependency versions in the ./ci/requirements/py*-*-pinned.yml environment files using information from the upstream CI tests
  • If the oldest supported dependency versions were upgraded, update ./requirements.txt and ./docs/requirements.txt accordingly (see the comparison sketch after this list)
  • Make sure that all CI tests pass
  • Make sure the documentation for this release branch builds on RTD
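
As a rough aid for the requirements step above, here is a hypothetical comparison sketch (it assumes the conda environment files list dependencies as "- pkg=version" and requirements.txt uses "pkg>=version"; adjust the patterns to the actual file layout):

    # compare_pins.py -- hypothetical helper to eyeball requirements.txt against CI pins
    import glob
    import re

    pinned = {}
    for env_file in glob.glob("ci/requirements/py*-*-pinned.yml"):
        for line in open(env_file):
            m = re.match(r"\s*-\s*([A-Za-z0-9_.-]+)\s*=\s*([\d.]+)", line)
            if m:
                pinned.setdefault(m.group(1).lower(), set()).add(m.group(2))

    for line in open("requirements.txt"):
        m = re.match(r"\s*([A-Za-z0-9_.-]+)\s*>=\s*([\d.]+)", line)
        if m and m.group(1).lower() in pinned:
            name, minimum = m.group(1), m.group(2)
            print(f"{name}: requirements.txt >= {minimum}, CI pins {sorted(pinned[name.lower()])}")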

Preparation conclusion

  • Merge this PR to master
  • Update release date in ./docs/whats-new.rst
  • Make sure all CI tests pass and the RTD documentation builds on the master branch

Publish the release

  • Double-check the release version in the ./setup.py file and that the documentation is ready
  • "Create a new release" on GitHub.
    Choose the release tag v1.0.0, fill in the release title and click the Auto-generate release notes button. Once ready, publish the release. This triggers the publish GitHub action that pushes the release to PyPI.
  • Check on PyPI and Conda that the new release is distributed (a minimal sketch of a PyPI check follows this list).
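
One possible way to verify the PyPI side, using PyPI's public JSON API (the expected version string is filled in here for illustration; a similar check could query the anaconda.org API for the conda package):

    # check_pypi.py -- minimal sketch to check that the new release is visible on PyPI
    import json
    import urllib.request

    expected = "1.0.0"
    with urllib.request.urlopen("https://pypi.org/pypi/argopy/json") as resp:
        releases = json.load(resp)["releases"]
    print(f"argopy {expected}:", "available on PyPI" if expected in releases else "not on PyPI yet")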

Publish on PyPI

CI tests / RTD build results

Status badges: CI tests · CI tests Upstream · Documentation Status

@gmaze added the release label Oct 15, 2024
@gmaze added this to the Development roadmap milestone Oct 15, 2024
@gmaze self-assigned this Oct 15, 2024

codecov bot commented Oct 16, 2024

❌ 13 Tests Failed:

Tests completed: 1637 | Failed: 13 | Passed: 1624 | Skipped: 141
Top 3 failed tests, by shortest run time:
test_fetchers_data_erddap.py::Test_Backend::test_fetching_parallel_thread[ds='phy', mode='research', {'region': [-60, -55, 40.0, 45.0, 0.0, 20.0]}]
Stack Traces | 0.005s run time
self = <argopy.tests.test_fetchers_data_erddap.Test_Backend object at 0x000002299BD6BF10>
mocked_erddapserver = 'http://127.0.0.1:9898'
parallel_fetcher = <datafetcher.erddap>
⭐ Name: Ifremer erddap Argo data fetcher for a space/time region
🗺  Domain: [x=-60.00/-55.00; y=4...
🔗 API: http://127.0.0.1:9898
🚣 User mode: research
🟡+🔵 Dataset: phy
🌤  Performances: cache=False, parallel=True [thread]

    @pytest.mark.parametrize("parallel_fetcher", VALID_PARALLEL_ACCESS_POINTS,
                             indirect=True,
                             ids=VALID_PARALLEL_ACCESS_POINTS_IDS)
    def test_fetching_parallel_thread(self, mocked_erddapserver, parallel_fetcher):
>       assert_fetcher(mocked_erddapserver, parallel_fetcher, cacheable=False)

argopy\tests\test_fetchers_data_erddap.py:217: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
argopy\tests\test_fetchers_data_erddap.py:127: in assert_fetcher
    assert_all(this_fetcher, cacheable)
argopy\tests\test_fetchers_data_erddap.py:103: in assert_all
    ds = this_fetcher.to_xarray(errors='raise')
argopy\fetchers.py:616: in to_xarray
    xds = self.fetcher.to_xarray(**kwargs)
argopy\data_fetchers\erddap_data.py:801: in to_xarray
    results = self.fs.open_mfdataset(URI, **opts)
argopy\stores\filesystems.py:1141: in open_mfdataset
    return self._open_mfdataset_from_erddap(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <argopy.stores.filesystems.httpstore object at 0x00000229BEA0B940>
urls = ['http://127.0.0.1:9898/tabledap/ArgoFloats.nc?config_mission_number,cycle_number,data_mode,direction,latitude,longitu...sted!=NaN&temp_adjusted!=NaN&psal_adjusted!=NaN&latitude!=NaN&longitude!=NaN&distinct()&orderBy("time,pres_adjusted")']
concat_dim = 'N_POINTS', max_workers = 6
preprocess = <function pre_process at 0x000002299BC50AF0>
preprocess_opts = {'URI': ['http://127.0.0.1:9898/tabledap/ArgoFloats.nc?config_mission_number,cycle_number,data_mode,direction,latitude...ted!=NaN&latitude!=NaN&longitude!=NaN&distinct()&orderBy("time,pres_adjusted")'], 'add_dm': False, 'dataset_id': 'phy'}
concat = True, progress = False, compute_details = False, args = ()
kwargs = {'final_opts': {'data_vars': 'all'}}
task_fct = <functools._lru_cache_wrapper object at 0x00000229C3B1C7D0>
postprocessing_fct = <function httpstore._open_mfdataset_from_erddap.<locals>.postprocessing_fct at 0x00000229C046FEB0>
finalize = <function httpstore._open_mfdataset_from_erddap.<locals>.finalize at 0x00000229C046D2D0>
task_legend = {'c': 'Callback', 'f': 'Failed or No Data', 'p': 'Formatting xarray dataset', 'w': 'Downloading netcdf from the erddap'}
finalize_fct = <function httpstore._open_mfdataset_from_erddap.<locals>.finalize at 0x00000229C046D2D0>
run = <argopy.utils.monitored_threadpool.MyThreadPoolExecutor object at 0x00000229C3AD6DD0>
results = DataNotFound('All URLs returned DataNotFound !'), failed = []

    def _open_mfdataset_from_erddap(
        self,
        urls: list,
        concat_dim: str = "rows",
        max_workers: int = 6,
        preprocess: Callable = None,
        preprocess_opts: dict = None,
        concat: bool = True,
        progress: bool = True,
        compute_details: bool = False,
        *args,
        **kwargs,
    ):
        """
        Method used by :class:`httpstore.open_mfdataset` dedicated to handle the case where we need to
        create a dataset from multiples erddap urls download/preprocessing and need a visual feedback of the
        procedure up to the final merge.
    
        - httpstore.open_dataset is distributed is handle by a pool of threads
    
        """
        strUrl = lambda x: x.replace("https://", "").replace(  # noqa: E731
            "http://", ""
        )
    
        @lru_cache
        def task_fct(url):
            try:
                ds = self.open_dataset(url)
                ds.attrs["Fetched_url"] = url
                ds.attrs["Fetched_constraints"] = UriCName(url).cname
                return ds, True
            except FileNotFoundError:
                log.debug("task_fct: This url returned no data: %s" % strUrl(url))
                return DataNotFound(url), True
            except Exception as e:
                log.debug(
                    "task_fct: Unexpected error when opening the remote dataset '%s':\n'%s'"
                    % (strUrl(url), str(e))
                )
                return None, False
    
        def postprocessing_fct(obj, **kwargs):
            if isinstance(obj, xr.Dataset):
                try:
                    ds = preprocess(obj, **kwargs)
                    return ds, True
                except Exception as e:
                    log.debug(
                        "postprocessing_fct: Unexpected error when post-processing dataset: '%s'"
                        % str(e)
                    )
                    return None, False
    
            elif isinstance(obj, DataNotFound):
                return obj, True
    
            elif obj is None:
                # This is because some un-expected Exception was raised in task_fct(url)
                return None, False
    
            else:
                log.debug("postprocessing_fct: Unexpected object: '%s'" % type(obj))
                return None, False
    
        def finalize(obj_list, **kwargs):
            try:
                # Read list of datasets from the list of objects:
                ds_list = [v for v in dict(sorted(obj_list.items())).values()]
                # Only keep non-empty results:
                ds_list = [
                    r
                    for r in ds_list
                    if (r is not None and not isinstance(r, DataNotFound))
                ]
                # log.debug(ds_list)
                if len(ds_list) > 0:
                    if "data_vars" in kwargs and kwargs["data_vars"] == "all":
                        # log.info('fill_variables_not_in_all_datasets')
                        ds_list = fill_variables_not_in_all_datasets(
                            ds_list, concat_dim=concat_dim
                        )
                    else:
                        # log.info('drop_variables_not_in_all_datasets')
                        ds_list = drop_variables_not_in_all_datasets(ds_list)
    
                    log.info("Nb of dataset to concat: %i" % len(ds_list))
                    # log.debug(concat_dim)
                    # for ds in ds_list:
                    #     log.debug(ds[concat_dim])
                    log.info(
                        "Dataset sizes before concat: %s"
                        % [len(ds[concat_dim]) for ds in ds_list]
                    )
                    ds = xr.concat(
                        ds_list,
                        dim=concat_dim,
                        data_vars="minimal",
                        coords="minimal",
                        compat="override",
                    )
                    log.info("Dataset size after concat: %i" % len(ds[concat_dim]))
                    return ds, True
                else:
                    ds_list = [v for v in dict(sorted(obj_list.items())).values()]
                    # Is the ds_list full of None or DataNotFound ?
                    if len([r for r in ds_list if (r is None)]) == len(ds_list):
                        log.debug("finalize: An error occurred with all URLs !")
                        return (
                            ValueError(
                                "An un-expected error occurred with all URLs, check log file for more "
                                "information"
                            ),
                            True,
                        )
                    elif len(
                        [r for r in ds_list if isinstance(r, DataNotFound)]
                    ) == len(ds_list):
                        log.debug("finalize: All URLs returned DataNotFound !")
                        return DataNotFound("All URLs returned DataNotFound !"), True
            except Exception as e:
                log.debug(
                    "finalize: Unexpected error when finalize request: '%s'" % str(e)
                )
                return None, False
    
        if ".nc" in urls[0]:
            task_legend = {
                "w": "Downloading netcdf from the erddap",
                "p": "Formatting xarray dataset",
                "c": "Callback",
                "f": "Failed or No Data",
            }
        else:
            task_legend = {"w": "Working", "p": "Post-processing", "c": "Callback"}
    
        if concat:
            finalize_fct = finalize
        else:
            finalize_fct = None
    
        run = MyExecutor(
            max_workers=max_workers,
            task_fct=task_fct,
            postprocessing_fct=postprocessing_fct,
            postprocessing_fct_kwargs=preprocess_opts,
            finalize_fct=finalize_fct,
            finalize_fct_kwargs=kwargs["final_opts"] if "final_opts" in kwargs else {},
            task_legend=task_legend,
            final_legend={
                "task": "Processing data chunks",
                "final": "Merging chunks of xarray dataset",
            },
            show=progress,
        )
        results, failed = run.execute(urls, list_failed=True)
    
        if concat:
            # results = Union[xr.DataSet, DataNotFound, None]
            if isinstance(results, xr.Dataset):
                if not compute_details:
                    return results
                else:
                    return results, failed, len(results)
            elif results is None:
                raise DataNotFound("An error occurred while finalizing the dataset")
            else:
>               raise results
E               argopy.errors.DataNotFound: 'All URLs returned DataNotFound !'

argopy\stores\filesystems.py:1041: DataNotFound
test_fetchers_data_erddap.py::Test_Backend::test_fetching_parallel_thread[ds='phy', mode='standard', {'region': [-60, -55, 40.0, 45.0, 0.0, 20.0]}]
Stack Traces | 0.005s run time
self = <argopy.tests.test_fetchers_data_erddap.Test_Backend object at 0x000002299BBDCB20>
mocked_erddapserver = 'http://127.0.0.1:9898'
parallel_fetcher = <datafetcher.erddap>
⭐ Name: Ifremer erddap Argo data fetcher for a space/time region
🗺  Domain: [x=-60.00/-55.00; y=4...
🔗 API: http://127.0.0.1:9898
🏊 User mode: standard
🟡+🔵 Dataset: phy
🌤  Performances: cache=False, parallel=True [thread]

    (test body and traceback chain identical to the first failure above)

self = <argopy.stores.filesystems.httpstore object at 0x00000229C027DB10>
urls = ['http://127.0.0.1:9898/tabledap/ArgoFloats.nc?config_mission_number,cycle_number,data_mode,direction,latitude,longitu...<=-55&latitude>=40.0&latitude<=45.0&pres>=0.0&pres<=20.0&latitude!=NaN&longitude!=NaN&distinct()&orderBy("time,pres")']
concat_dim = 'N_POINTS', max_workers = 6
preprocess = <function pre_process at 0x000002299BC50AF0>
preprocess_opts = {'URI': ['http://127.0.0.1:9898/tabledap/ArgoFloats.nc?config_mission_number,cycle_number,data_mode,direction,latitude...s>=0.0&pres<=20.0&latitude!=NaN&longitude!=NaN&distinct()&orderBy("time,pres")'], 'add_dm': False, 'dataset_id': 'phy'}
concat = True, progress = False, compute_details = False, args = ()
kwargs = {'final_opts': {'data_vars': 'all'}}
task_fct = <functools._lru_cache_wrapper object at 0x00000229BE2CAA30>
postprocessing_fct = <function httpstore._open_mfdataset_from_erddap.<locals>.postprocessing_fct at 0x00000229C046CD30>
finalize = <function httpstore._open_mfdataset_from_erddap.<locals>.finalize at 0x00000229C046CDC0>
task_legend = {'c': 'Callback', 'f': 'Failed or No Data', 'p': 'Formatting xarray dataset', 'w': 'Downloading netcdf from the erddap'}
finalize_fct = <function httpstore._open_mfdataset_from_erddap.<locals>.finalize at 0x00000229C046CDC0>
run = <argopy.utils.monitored_threadpool.MyThreadPoolExecutor object at 0x00000229C3B011E0>
results = DataNotFound('All URLs returned DataNotFound !'), failed = []

    [... _open_mfdataset_from_erddap source listing identical to the first trace above; it fails at the same point ...]
>               raise results
E               argopy.errors.DataNotFound: 'All URLs returned DataNotFound !'

argopy\stores\filesystems.py:1041: DataNotFound
test_fetchers_data_erddap.py::Test_Backend::test_fetching_parallel_thread[ds='phy', mode='expert', {'region': [-60, -55, 40.0, 45.0, 0.0, 20.0]}]
Stack Traces | 0.006s run time
self = <argopy.tests.test_fetchers_data_erddap.Test_Backend object at 0x000002299BD69690>
mocked_erddapserver = 'http://127.0.0.1:9898'
parallel_fetcher = <datafetcher.erddap>
⭐ Name: Ifremer erddap Argo data fetcher for a space/time region
🗺  Domain: [x=-60.00/-55.00; y=4...
🔗 API: http://127.0.0.1:9898
🏄 User mode: expert
🟡+🔵 Dataset: phy
🌤  Performances: cache=False, parallel=True [thread]

    (test body and traceback chain identical to the first failure above)

self = <argopy.stores.filesystems.httpstore object at 0x00000229C1760220>
urls = ['http://127.0.0.1:9898/tabledap/ArgoFloats.nc?config_mission_number,cycle_number,data_mode,direction,latitude,longitu...<=-55&latitude>=40.0&latitude<=45.0&pres>=0.0&pres<=20.0&latitude!=NaN&longitude!=NaN&distinct()&orderBy("time,pres")']
concat_dim = 'N_POINTS', max_workers = 6
preprocess = <function pre_process at 0x000002299BC50AF0>
preprocess_opts = {'URI': ['http://127.0.0.1:9898/tabledap/ArgoFloats.nc?config_mission_number,cycle_number,data_mode,direction,latitude...s>=0.0&pres<=20.0&latitude!=NaN&longitude!=NaN&distinct()&orderBy("time,pres")'], 'add_dm': False, 'dataset_id': 'phy'}
concat = True, progress = False, compute_details = False, args = ()
kwargs = {'final_opts': {'data_vars': 'all'}}
task_fct = <functools._lru_cache_wrapper object at 0x00000229BE29F5E0>
postprocessing_fct = <function httpstore._open_mfdataset_from_erddap.<locals>.postprocessing_fct at 0x00000229C046D990>
finalize = <function httpstore._open_mfdataset_from_erddap.<locals>.finalize at 0x00000229C046F250>
task_legend = {'c': 'Callback', 'f': 'Failed or No Data', 'p': 'Formatting xarray dataset', 'w': 'Downloading netcdf from the erddap'}
finalize_fct = <function httpstore._open_mfdataset_from_erddap.<locals>.finalize at 0x00000229C046F250>
run = <argopy.utils.monitored_threadpool.MyThreadPoolExecutor object at 0x00000229C1760430>
results = DataNotFound('All URLs returned DataNotFound !'), failed = []

    [... _open_mfdataset_from_erddap source listing identical to the first trace above; it fails at the same point ...]
>               raise results
E               argopy.errors.DataNotFound: 'All URLs returned DataNotFound !'

argopy\stores\filesystems.py:1041: DataNotFound

To compare individual test run times against the main branch, see the Test Analytics Dashboard.

@gmaze merged commit b2fd76a into master Oct 16, 2024
37 checks passed
@gmaze deleted the releasev1.0.0 branch October 16, 2024 16:01