diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..cbfc089
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,16 @@
+[flake8]
+exclude = __init__.py,.eggs,doc
+ignore =
+    # whitespace before ':' - doesn't work well with black
+    E203
+    E402
+    # line too long - let black worry about that
+    E501
+    # do not assign a lambda expression, use a def
+    E731
+    # line break before binary operator
+    W503
+    E265
+    F811
+    # Allows type hinting as Gridded[DataArray, "(X:center)"], where we did `from typing import Annotated as Gridded`
+    F722
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
new file mode 100644
index 0000000..58b1e28
--- /dev/null
+++ b/.github/workflows/ci.yaml
@@ -0,0 +1,34 @@
+name: CI
+
+on:
+  push:
+    branches:
+      - "master"
+  pull_request:
+    branches:
+      - "*"
+  schedule:
+    - cron: "0 13 * * 1"
+
+jobs:
+  build:
+    defaults:
+      run:
+        shell: bash -l {0}
+    strategy:
+      fail-fast: false
+      matrix:
+        os: ["ubuntu-latest"]
+        python-version: ["3.9", "3.10"]
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v2
+      - uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Install dependencies
+        run: python -m pip install -r requirements.txt
+      - name: Install Package
+        run: python -m pip install -e . --no-deps
+      - name: Test with pytest
+        run: pytest pangeo_forge_esgf/tests
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..570fc20
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,21 @@
+default_language_version:
+  python: python3
+repos:
+  - repo: https://github.com/pre-commit/pre-commit-hooks
+    rev: v4.3.0
+    hooks:
+      - id: trailing-whitespace
+      - id: end-of-file-fixer
+      - id: check-yaml
+  - repo: https://github.com/PyCQA/isort
+    rev: 5.10.1
+    hooks:
+      - id: isort
+  - repo: https://github.com/psf/black
+    rev: 22.10.0
+    hooks:
+      - id: black
+  - repo: https://github.com/PyCQA/flake8
+    rev: 5.0.4
+    hooks:
+      - id: flake8
diff --git a/README.md b/README.md
index d96f845..6ff38a3 100644
--- a/README.md
+++ b/README.md
@@ -3,9 +3,9 @@ Using queries to the ESGF API to generate urls and keyword arguments for recipes
 
 ## Parsing a list of instance ids using wildcards
 
-Pangeo forge recipes require the user to provide exact instance_id's for the datasets they want to be processed. Discovering these with the [web search](https://esgf-node.llnl.gov/search/cmip6/) can become cumbersome, especially when dealing with a large number of members/models etc. 
+Pangeo forge recipes require the user to provide exact instance_ids for the datasets they want processed. Discovering these with the [web search](https://esgf-node.llnl.gov/search/cmip6/) can become cumbersome, especially when dealing with a large number of members, models, etc.
 
-`pangeo-forge-esgf` provides some functions to query the ESGF API based on instance_id values with wildcards. 
+`pangeo-forge-esgf` provides some functions to query the ESGF API based on instance_id values with wildcards.
 
 For example, if you want to find all the zonal (`uo`) and meridional (`vo`) velocities available for the `lgm` experiment of PMIP, you can do:
@@ -35,4 +35,36 @@ and you will get:
  'CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.lgm.r1i1p1f1.Omon.vo.gn.v20190710']
 ```
 
-Eventually I hope I can leverage this functionality to handle user requests in PRs that add wildcard instance_ids, but for now this might be helpful to manually construct lists of instance_ids to submit to a pangeo-forge feedstock. 
\ No newline at end of file
+Eventually I hope I can leverage this functionality to handle user requests in PRs that add wildcard instance_ids, but for now this might be helpful to manually construct lists of instance_ids to submit to a pangeo-forge feedstock.
+
+## Using a different ESGF project
+
+You can also search other ESGF projects and their facets. For example, to search the ESGF CORDEX datasets, use:
+```python
+parse_iids = [
+    "CORDEX.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.*.r1i1p1.REMO2009.v1.mon.tas",
+    "CORDEX.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.*.r1i1p1.REMO2009.v1.mon.tas"
+]
+iids = []
+```
+
+Using a different search node URL:
+```python
+url = "https://esgf-data.dkrz.de/esg-search/search"
+
+for piid in parse_iids:
+    iids.extend(parse_instance_ids(piid, url=url))
+iids
+```
+
+This results in:
+```
+['cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.historical.r1i1p1.REMO2009.v1.mon.tas.v20160419',
+ 'cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp26.r1i1p1.REMO2009.v1.mon.tas.v20160525',
+ 'cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.REMO2009.v1.mon.tas.v20160525',
+ 'cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.REMO2009.v1.mon.tas.v20160525',
+ 'cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.rcp26.r1i1p1.REMO2009.v1.mon.tas.v20150609',
+ 'cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.REMO2009.v1.mon.tas.v20150609',
+ 'cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.REMO2009.v1.mon.tas.v20150609',
+ 'cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.historical.r1i1p1.REMO2009.v1.mon.tas.v20150609']
+```
diff --git a/environment.yaml b/environment.yaml
index 9770197..555961d 100644
--- a/environment.yaml
+++ b/environment.yaml
@@ -1,6 +1,7 @@
-name: pangeo-forge-esgf
+name: pangeo-forge-esgf-dev
 channels:
   - conda-forge
-dependencies:
-  - python
-  - aiohttp
\ No newline at end of file
+dependencies:
+- pip
+- pip:
+  - -r requirements.txt
diff --git a/pangeo_forge_esgf/dynamic_kwargs.py b/pangeo_forge_esgf/dynamic_kwargs.py
index f0f9a0a..ced6993 100644
--- a/pangeo_forge_esgf/dynamic_kwargs.py
+++ b/pangeo_forge_esgf/dynamic_kwargs.py
@@ -1,6 +1,7 @@
-from typing import Dict, Union, List, Tuple
+import ssl
+from typing import Dict, List, Tuple
+
 import aiohttp
-from .utils import facets_from_iid
 
 # For certain table_ids it is preferable to have time chunks that are a multiple of e.g. 1 year for monthly data.
 monthly_divisors = sorted(
@@ -12,15 +13,16 @@
 )
 
 allowed_divisors = {
-    "Omon": monthly_divisors,
-    "SImon": monthly_divisors,
-    "Amon": monthly_divisors,
+    "mon": monthly_divisors,
+    # "Omon": monthly_divisors,
+    # "SImon": monthly_divisors,
+    # "Amon": monthly_divisors,
 }  # Add table_ids and allowed divisors as needed
 
 
-def get_timesteps_simple(dates, table_id):
+def get_timesteps_simple(dates, frequency):
     assert (
-        "mon" in table_id
+        "mon" in frequency
     )  # this needs some more careful treatment for other time frequencies.
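+    # e.g. a file covering 1850-01 through 1949-12 gives d = ["185001", "194912"],
+    # so the formula below yields (1949 - 1850) * 12 + (12 - 1 + 1) = 1200 monthly steps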
     timesteps = [
         (int(d[1][0:4]) - int(d[0][0:4])) * 12
         + (int(d[1][4:6]) - int(d[0][4:6]) + 1)
@@ -96,13 +98,21 @@
 async def response_data_processing(
     session: aiohttp.ClientSession,
     response_data: List[Dict[str, str]],
     iid: str,
+    ssl: ssl.SSLContext = None,
 ) -> Tuple[List[str], Dict[str, Dict[str, str]]]:
-    table_id = facets_from_iid(iid).get("table_id")
+    # table_id = facets_from_iid(iid).get("table_id")
 
     urls = [r["url"] for r in response_data]
     sizes = [r["size"] for r in response_data]
     titles = [r["title"] for r in response_data]
 
+    # determine the frequency
+    frequencies = [r["frequency"][0] for r in response_data]
+    # check that all files have the same frequency; anything else should never happen
+    if not all(f == frequencies[0] for f in frequencies):
+        raise ValueError("Determined frequencies are not all equal.")
+    frequency = frequencies[0]
+
     print(f"Found {len(urls)} urls")
     print(list(urls))
@@ -110,7 +120,7 @@
     # print(urls)
     print(f"{iid}: Check for netcdf 3 files")
     pattern_kwargs = {}
-    netcdf3_check = await is_netcdf3(session, urls[-1])
+    netcdf3_check = await is_netcdf3(session, urls[-1], ssl)
     # netcdf3_check = is_netcdf3(urls[-1]) #TODO This works, but this is the part that is slow as hell, so I should async this one...
     if netcdf3_check:
         pattern_kwargs["file_type"] = "netcdf3"
@@ -119,22 +129,22 @@
     # TODO: Is there a more robust way to do this?
     # otherwise maybe use `id` (harder to parse)
     dates = [t.replace(".nc", "").split("_")[-1].split("-") for t in titles]
-
-    timesteps = get_timesteps_simple(dates, table_id)
+    print(dates)
+    timesteps = get_timesteps_simple(dates, frequency)
     print(f"Dates for each file: {dates}")
     print(f"Size per file in MB: {[f/1e6 for f in sizes]}")
     print(f"Inferred timesteps per file: {timesteps}")
 
     element_sizes = [size / n_t for size, n_t in zip(sizes, timesteps)]
 
-    ### Determine kwargs
+    # Determine kwargs
     # MAX_SUBSET_SIZE=1e9 # This is an option if the revised subsetting still runs into errors.
     MAX_SUBSET_SIZE = 500e6
    DESIRED_CHUNKSIZE = 200e6
     # TODO: We need a completely new logic branch which checks if the total size (sum(filesizes)) is smaller than a desired chunk
     target_chunks = {
         "time": choose_chunksize(
-            allowed_divisors[table_id],
+            allowed_divisors[frequency],
             DESIRED_CHUNKSIZE,
             element_sizes,
             timesteps,
@@ -146,7 +156,7 @@
     if max(sizes) <= MAX_SUBSET_SIZE:
         subset_input = 0
     else:
-        ## Determine subset_input parameters given the following constraints
+        # Determine subset_input parameters given the following constraints
         # - Needs to keep the subset size below MAX_SUBSET_SIZE
         # - (Not currently implemented) Resulting subsets should be evenly divisible by target_chunks (except for the last file, which can be odd). This might ultimately not be required once we figure out the locking issues. I cannot fulfill this right now with the dataset structure, where often the first and last files have a different number of timesteps than the 'middle' ones.
@@ -168,16 +178,18 @@
     return urls, kwargs
 
 
-async def is_netcdf3(session: aiohttp.ClientSession, url: str) -> bool:
+async def is_netcdf3(
+    session: aiohttp.ClientSession, url: str, ssl: ssl.SSLContext = None
+) -> bool:
    """Simple check to determine the netcdf file version behind a url. Requires the server to support range requests"""
    headers = {"Range": "bytes=0-2"}
    # TODO: how should I handle it if these are failing?
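+    # Classic netCDF files start with the magic bytes b"CDF" (followed by \x01 or \x02),
+    # while netCDF-4/HDF5 files start with b"\x89HDF", so three bytes suffice to tell them apart.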
     # TODO: need to implement a retry here too
     # TODO: I believe these are independent of the search nodes? So we should not retry these with another node? I might need to look into what 'replicas' mean in this context.
-    async with session.get(url, headers=headers) as resp:
+    async with session.get(url, headers=headers, ssl=ssl) as resp:
         status_code = resp.status
         if not status_code == 206:
             raise RuntimeError(f"Range request failed with {status_code} for {url}")
         head = await resp.read()
-        return "CDF" in str(head)
\ No newline at end of file
+        return "CDF" in str(head)
diff --git a/pangeo_forge_esgf/params.py b/pangeo_forge_esgf/params.py
new file mode 100644
index 0000000..68dc176
--- /dev/null
+++ b/pangeo_forge_esgf/params.py
@@ -0,0 +1,54 @@
+# example input4MIPs iid: input4MIPs.CMIP6.AerChemMIP.UoM.UoM-AIM-ssp370-lowNTCF-1-2-1.atmos.mon.mole_fraction_of_carbon_dioxide_in_air.gr1-GMNHSH.v20181127
+known_projects = [
+    "CMIP6",
+    "CMIP5",
+    "obs4MIPs",
+    "input4MIPs",
+    "CORDEX",  # Usual CORDEX datasets from CMIP5 downscaling
+    "CORDEX-Reklies",  # Downscaling from the Reklies project
+    "CORDEX-Adjust",  # Bias-adjusted output
+    "CORDEX-ESD",  # Statistical downscaling
+]
+
+
+# dataset id templates
+cmip6_template = "mip_era.activity_id.institution_id.source_id.experiment_id.variant_label.table_id.variable_id.grid_label.version"
+cordex_template = "project.product.domain.institute.driving_model.experiment.ensemble.rcm_name.rcm_version.time_frequency.variable.version"
+cordex_adjust_template = "project.product.domain.institute.driving_model.experiment.ensemble.rcm_name.bias_adjustment.time_frequency.variable.version"
+
+# request params
+base_params = {
+    # "type": "File",
+    "format": "application/solr+json",
+    # "fields": "instance_id",
+    "fields": "url,size,table_id,title,instance_id,replica,data_node,frequency,time_frequency",
+    "latest": "true",
+    "distrib": "true",
+    "limit": 500,
+}
+
+cmip6_params = base_params | {"retracted": "false"}
+cordex_params = base_params | {}
+
+# This needs refactoring once the dataset_id template is requested from the ESGF servers.
+id_templates = {
+    "CMIP6": cmip6_template,
+    "CORDEX": cordex_template,
+    "CORDEX-Reklies": cordex_template,
+    "CORDEX-Adjust": cordex_adjust_template,
+    "CORDEX-ESD": cordex_template,
+}
+
+request_params = {
+    "CMIP6": cmip6_params,
+    "CORDEX": cordex_params,
+    "CORDEX-Reklies": cordex_params,
+    "CORDEX-Adjust": cordex_params,
+    "CORDEX-ESD": cordex_params,
+}
+
+
+# There is another problem with CORDEX-Reklies, e.g.
+# "cordex-reklies.output.EUR-11.GERICS.MIROC-MIROC5.historical.r1i1p1.REMO2015.v1.mon.tas"
+# Searching with the product="output" facet returns an empty result, although the dataset_id
+# clearly says the product is "output". As a workaround, the product facet has to be removed
+# when CORDEX-Reklies is searched (see facets_from_iid in utils.py).
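+# Illustrative sketch of how these templates are consumed (see facets_from_iid
+# in utils.py): facet names come from splitting the template on ".", and an
+# instance_id is zipped against them, e.g.
+#
+#   iid = "CMIP6.PMIP.MIROC.MIROC-ES2L.lgm.r1i1p1f2.Omon.uo.gn.v20191002"
+#   facets = dict(zip(id_templates["CMIP6"].split("."), iid.split(".")))
+#   facets["source_id"]  # -> "MIROC-ES2L"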
diff --git a/pangeo_forge_esgf/parsing.py b/pangeo_forge_esgf/parsing.py
index ce095cb..92ac833 100644
--- a/pangeo_forge_esgf/parsing.py
+++ b/pangeo_forge_esgf/parsing.py
@@ -1,18 +1,14 @@
 import requests
-import json
-from .utils import facets_from_iid
 
-def request_from_facets(url, **facets):
-    params = {
-        "type": "Dataset",
-        "retracted": "false",
-        "format": "application/solr+json",
-        "fields": "instance_id",
-        "latest": "true",
-        "distrib": "true",
-        "limit": 500,
-    }
+from .params import request_params
+from .utils import ensure_project_str, facets_from_iid
+
+
+def request_from_facets(url, project, **facets):
+    params = request_params[project].copy()
     params.update(facets)
+    params["project"] = project
+    params["type"] = "Dataset"
     return requests.get(url=url, params=params)
 
 
@@ -22,24 +18,34 @@ def instance_ids_from_request(json_dict):
     return uniqe_iids
 
 
-def parse_instance_ids(iid: str) -> list[str]:
+def parse_instance_ids(iid: str, url: str = None, project: str = None) -> list[str]:
     """Parse an instance id with wildcards"""
-    facets = facets_from_iid(iid)
-    #convert string to list if square brackets are found
-    for k,v in facets.items():
-        if '[' in v:
-            v = v.replace('[', '').replace(']', '').replace('\'', '').replace(' ','').split(',')
+    # The search node url and project are keyword arguments now; default to the LLNL node.
+    if url is None:
+        url = "https://esgf-node.llnl.gov/esg-search/search"
+        # url = "https://esgf-data.dkrz.de/esg-search/search"
+    if project is None:
+        # take project id from first iid entry by default
+        project = ensure_project_str(iid.split(".")[0])
+    facets = facets_from_iid(iid, project)
+    # convert string to list if square brackets are found
+    for k, v in facets.items():
+        if "[" in v:
+            v = (
+                v.replace("[", "")
+                .replace("]", "")
+                .replace("'", "")
+                .replace(" ", "")
+                .split(",")
+            )
         facets[k] = v
-    facets_filtered = {k: v for k, v in facets.items() if v != "*"}
-
-    #TODO: I should make the node url a keyword argument. For now this works well enough
-    url="https://esgf-node.llnl.gov/esg-search/search"
-    # url = "https://esgf-data.dkrz.de/esg-search/search"
+    facets_filtered = {k: v for k, v in facets.items() if v != "*" and k != "project"}
+    # print(facets_filtered)
     # TODO: how do I iterate over this more efficiently? Maybe we do not want to allow more than x files parsed?
-    resp = request_from_facets(url, **facets_filtered)
+    resp = request_from_facets(url, project, **facets_filtered)
     if resp.status_code != 200:
         print(f"Request [{resp.url}] failed with {resp.status_code}")
         return resp
     else:
         json_dict = resp.json()
-        return instance_ids_from_request(json_dict)
\ No newline at end of file
+        return instance_ids_from_request(json_dict)
diff --git a/pangeo_forge_esgf/recipe_inputs.py b/pangeo_forge_esgf/recipe_inputs.py
index ed7a8c0..ebb1576 100644
--- a/pangeo_forge_esgf/recipe_inputs.py
+++ b/pangeo_forge_esgf/recipe_inputs.py
@@ -1,12 +1,14 @@
-from typing import Dict, Union, List, Tuple
-import aiohttp
 import asyncio
-import time
-from .dynamic_kwargs import response_data_processing
-from .utils import facets_from_iid
+import ssl
+from typing import Dict, List, Union
+
+import aiohttp
+
+from .dynamic_kwargs import response_data_processing
+from .params import request_params
+from .utils import facets_from_iid, project_from_iid
 
-## global variables
+# global variables
 search_node_list = [
     "https://esgf-node.llnl.gov/esg-search/search",
     "https://esgf-data.dkrz.de/esg-search/search",
@@ -18,7 +20,7 @@
 # For now just use llnl
 search_node = search_node_list[0]
 
-## Data nodes in preferred order (from naomis code here: https://github.com/naomi-henderson/cmip6collect2/blob/main/myconfig.py)
+# Data nodes in preferred order (from Naomi's code here: https://github.com/naomi-henderson/cmip6collect2/blob/main/myconfig.py)
 data_nodes = [
     "esgf-data1.llnl.gov",
     "esgf-data2.llnl.gov",
@@ -48,6 +50,8 @@
     "esgf.ichec.ie",
     "esgf.nci.org.au",
     "esgf.rcec.sinica.edu.tw",
+    "esgf1.dkrz.de",
+    "esgf2.dkrz.de",
     "esgf3.dkrz.de",
     "noresg.nird.sigma2.no",
     "polaris.pknu.ac.kr",
@@ -56,7 +60,7 @@
 async def generate_recipe_inputs_from_iids(
-    iid_list: List[str],
+    iid_list: List[str], ssl: ssl.SSLContext = None
 ) -> Dict[str, Union[List[str], Dict[str, str]]]:
     """_summary_
@@ -80,8 +84,12 @@
         tasks = []
         for iid in iid_list:
+            project = project_from_iid(iid)
+            params = request_params[project]
             tasks.append(
-                asyncio.ensure_future(iid_request(session, iid, search_node))
+                asyncio.ensure_future(
+                    iid_request(session, iid, search_node, params, ssl)
+                )
             )
         raw_input = await asyncio.gather(*tasks)
@@ -99,20 +107,23 @@
 async def iid_request(
-    session: aiohttp.ClientSession, iid: str, node: List[str], params: Dict = {}
+    session: aiohttp.ClientSession,
+    iid: str,
+    node: List[str],
+    params: Dict = {},
+    ssl: ssl.SSLContext = None,
 ):
     urls = None
     kwargs = None
     print(f"Requesting data for Node: {node} and {iid}...")
     response_data = await _esgf_api_request(session, node, iid, params)
-
     print(f"Filtering response data for {iid}...")
     filtered_response_data = await sort_and_filter_response(response_data, session)
 
     print(f"Determining dynamic kwargs for {iid}...")
     urls, kwargs = await response_data_processing(
-        session, filtered_response_data, iid
+        session, filtered_response_data, iid, ssl
     )
 
     return urls, kwargs
@@ -122,18 +133,8 @@
 async def _esgf_api_request(
     session: aiohttp.ClientSession, node: str, iid: str, params: Dict[str, str]
 ) -> Dict[str, str]:
-    # set default search parameters
-    default_params = {
-        "type": "File",
-        "retracted": "false",
-        "format": "application/solr+json",
-        "fields": "url,size,table_id,title,instance_id,replica,data_node",
-        "latest": "true",
-        "distrib": "true",
-        "limit": 500,  # This determines the number of urls/files that are returned. I dont expect this to be ever more than 500?
-    }
+    params["type"] = "File"
 
-    params = default_params | params
     facets = facets_from_iid(iid)
     # if we use latest in the params we cannot use version
     # TODO: We might want to be specific about the version here and use latest in the 'parsing' logic only. Needs discussion.
@@ -152,17 +153,25 @@
             content_type="text/json"
         )  # https://stackoverflow.com/questions/48840378/python-attempt-to-decode-json-with-unexpected-mimetype
         resp_data = resp_data["response"]["docs"]
+
         if len(resp_data) == 0:
             raise ValueError(f"No Files were found for {iid}")
+
+        # Since we hacked in the CORDEX special case above, I'll do it here again:
+        # rename to the common CMIP vocabulary if necessary
+        resp_data = [
+            {"frequency" if k == "time_frequency" else k: v for k, v in r.items()}
+            for r in resp_data
+        ]
         return resp_data
 
 
 async def check_url(url, session):
-    try: 
+    try:
         async with session.head(url, timeout=5) as resp:
             return resp.status
     except asyncio.exceptions.TimeoutError:
-        return 503 #TODO: Is this best practice?
+        return 503  # TODO: Is this best practice?
 
 
 async def sort_and_filter_response(
@@ -225,24 +234,25 @@
 ) -> Dict[str, Dict[str, str]]:
     """Filters out non-responsive data nodes, and then selects the preferred data node from available ones"""
     test_response_list = response_groups.get(list(response_groups.keys())[0])
-
-    ## Determine preferred data node
+    # Determine preferred data node
     for data_node in data_nodes:
-        print(f'DEBUG: Testing data node: {data_node}')
-        matching_data_nodes = [r for r in test_response_list if r['data_node']==data_node]
-        if len(matching_data_nodes)==1:
-            matching_data_node = matching_data_nodes[0] # TODO: this is kinda clunky
-            status = await check_url(matching_data_node['url'], session)
-            if status in [200, 308]:
+        print(f"DEBUG: Testing data node: {data_node}")
+        matching_data_nodes = [
+            r for r in test_response_list if r["data_node"] == data_node
+        ]
+        if len(matching_data_nodes) == 1:
+            matching_data_node = matching_data_nodes[0]  # TODO: this is kinda clunky
+            status = await check_url(matching_data_node["url"], session)
+            if status in [200, 302, 308]:
                 picked_data_node = data_node
                 print(f"DEBUG: Picking preferred data_node: {picked_data_node}")
                 break
             else:
                 print(f"Got status {status} for {matching_data_node['instance_id']}")
         elif len(matching_data_nodes) == 0:
-            print(f'DEBUG: Data node: {data_node} not available')
+            print(f"DEBUG: Data node: {data_node} not available")
         else:
-            raise # this should never happen
+            raise  # this should never happen
 
     # loop through all groups and filter for the picked data node
     modified_response_groups = {}
diff --git a/pangeo_forge_esgf/tests/test_parsing.py b/pangeo_forge_esgf/tests/test_parsing.py
new file mode 100644
index 0000000..c5fd2c2
--- /dev/null
+++ b/pangeo_forge_esgf/tests/test_parsing.py
@@ -0,0 +1,69 @@
+from pangeo_forge_esgf.parsing import parse_instance_ids
+
+
+def test_readme_example():
+    # This is possibly flaky (due to the dependence on the ESGF API)
+    parse_iids = [
+        "CMIP6.PMIP.*.*.lgm.*.*.uo.*.*",
+        "CMIP6.PMIP.*.*.lgm.*.*.vo.*.*",
+    ]
+    iids = []
+    for piid in parse_iids:
+        iids.extend(parse_instance_ids(piid))
+    iids
+
+    # I expect at least these iids to be parsed
+    # (there might be new ones that are added at a later point)
+    expected_iids = [
+        "CMIP6.PMIP.MIROC.MIROC-ES2L.lgm.r1i1p1f2.Omon.uo.gn.v20191002",
+        "CMIP6.PMIP.AWI.AWI-ESM-1-1-LR.lgm.r1i1p1f1.Odec.uo.gn.v20200212",
"CMIP6.PMIP.AWI.AWI-ESM-1-1-LR.lgm.r1i1p1f1.Omon.uo.gn.v20200212", + "CMIP6.PMIP.MIROC.MIROC-ES2L.lgm.r1i1p1f2.Omon.uo.gr1.v20200911", + "CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.lgm.r1i1p1f1.Omon.uo.gn.v20200909", + "CMIP6.PMIP.AWI.AWI-ESM-1-1-LR.lgm.r1i1p1f1.Omon.vo.gn.v20200212", + "CMIP6.PMIP.MIROC.MIROC-ES2L.lgm.r1i1p1f2.Omon.vo.gn.v20191002", + "CMIP6.PMIP.AWI.AWI-ESM-1-1-LR.lgm.r1i1p1f1.Odec.vo.gn.v20200212", + "CMIP6.PMIP.MIROC.MIROC-ES2L.lgm.r1i1p1f2.Omon.vo.gr1.v20200911", + "CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.lgm.r1i1p1f1.Omon.vo.gn.v20190710", + ] + + for iid in expected_iids: + assert iid in iids + + +def test_cordex_projects(): + parse_iids = [ + "cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.*.r1i1p1.REMO2009.v1.mon.tas", + "cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.*.r1i1p1.REMO2009.v1.mon.tas", + "cordex-reklies.*.EUR-11.GERICS.*.historical.r1i1p1.REMO2015.v1.mon.tas", + "cordex-adjust.*.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.*.r1i1p1.REMO2009.*.mon.tasAdjust", + "cordex-esd.*.EUR-11.*.MPI-M-MPI-ESM-LR.*.r1i1p1.*.*.mon.tas", + ] + iids = [] + for piid in parse_iids: + iids.extend(parse_instance_ids(piid)) + + expected_iids = [ + "cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp26.r1i1p1.REMO2009.v1.mon.tas.v20160525", + "cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.historical.r1i1p1.REMO2009.v1.mon.tas.v20160419", + "cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.REMO2009.v1.mon.tas.v20160525", + "cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.REMO2009.v1.mon.tas.v20160525", + "cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.rcp26.r1i1p1.REMO2009.v1.mon.tas.v20150609", + "cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.REMO2009.v1.mon.tas.v20150609", + "cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.REMO2009.v1.mon.tas.v20150609", + "cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.historical.r1i1p1.REMO2009.v1.mon.tas.v20150609", + "cordex-reklies.output.EUR-11.GERICS.MOHC-HadGEM2-ES.historical.r1i1p1.REMO2015.v1.mon.tas.v20170412", + "cordex-reklies.output.EUR-11.GERICS.MIROC-MIROC5.historical.r1i1p1.REMO2015.v1.mon.tas.v20170329", + "cordex-reklies.output.EUR-11.GERICS.CCCma-CanESM2.historical.r1i1p1.REMO2015.v1.mon.tas.v20170329", + "cordex-reklies.output.EUR-11.GERICS.CNRM-CERFACS-CNRM-CM5.historical.r1i1p1.REMO2015.v1.mon.tas.v20170208", + "cordex-adjust.bias-adjusted-output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.REMO2009.v1-SMHI-DBS45-MESAN-1989-2010.mon.tasAdjust.v20160919", + "cordex-adjust.bias-adjusted-output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp26.r1i1p1.REMO2009.v1-SMHI-DBS45-MESAN-1989-2010.mon.tasAdjust.v20160919", + "cordex-adjust.bias-adjusted-output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.REMO2009.v1-SMHI-DBS45-MESAN-1989-2010.mon.tasAdjust.v20160919", + "cordex-esd.output.EUR-11.DWD.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.EPISODES2018.v1-r1.mon.tas.v20180409", + "cordex-esd.output.EUR-11.DWD.MPI-M-MPI-ESM-LR.historical.r1i1p1.EPISODES2018.v1-r1.mon.tas.v20180409", + "cordex-esd.output.EUR-11.DWD.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.EPISODES2018.v1-r1.mon.tas.v20180409", + "cordex-esd.output.EUR-11.DWD.MPI-M-MPI-ESM-LR.rcp26.r1i1p1.EPISODES2018.v1-r1.mon.tas.v20180409", + ] + + for iid in expected_iids: + assert iid in iids diff --git a/pangeo_forge_esgf/tests/test_utils.py b/pangeo_forge_esgf/tests/test_utils.py new file mode 100644 index 0000000..4f8b2ef --- /dev/null +++ b/pangeo_forge_esgf/tests/test_utils.py @@ -0,0 +1,18 @@ +from pangeo_forge_esgf.utils import ensure_project_str + + +def test_ensure_project_str(): + # There is a 
+    assert ensure_project_str("cordex") == "CORDEX"
+
+    unformatted = ["cordex-reklies", "CORDEX-REKLIES"]
+    for p in unformatted:
+        assert ensure_project_str(p) == "CORDEX-Reklies"
+
+    unformatted = ["cordex-esd", "CORDEX-esd"]
+    for p in unformatted:
+        assert ensure_project_str(p) == "CORDEX-ESD"
+
+    unformatted = ["CORDEX-ADJUST", "cordex-adjust"]
+    for p in unformatted:
+        assert ensure_project_str(p) == "CORDEX-Adjust"
diff --git a/pangeo_forge_esgf/utils.py b/pangeo_forge_esgf/utils.py
index 7f0a5d3..414700d 100644
--- a/pangeo_forge_esgf/utils.py
+++ b/pangeo_forge_esgf/utils.py
@@ -1,9 +1,69 @@
+import re
 from typing import Dict
 
-def facets_from_iid(iid: str) -> Dict[str, str]:
+import requests
+
+from .params import id_templates, known_projects
+
+
+def ensure_project_str(project: str) -> str:
+    """Ensure that the project string has the right format
+
+    This is mainly necessary for CORDEX projects because the
+    project facet in the dataset_id is lowercase, while in the API
+    search we have to use uppercase or a mixture of upper and lowercase.
+
+    """
+    for p in known_projects:
+        if project.upper() == p.upper():
+            return p
+    return project
+
+
+def project_from_iid(iid: str) -> str:
+    """Get project information from the first iid entry"""
+    return ensure_project_str(iid.split(".")[0])
+
+
+def facets_from_iid(iid: str, project: str = None) -> Dict[str, str]:
     """Translates iid string to facet dict according to CMIP6 naming scheme"""
-    iid_name_template = "mip_era.activity_id.institution_id.source_id.experiment_id.variant_label.table_id.variable_id.grid_label.version"
+    if project is None:
+        # take project id from first iid entry by default
+        project = project_from_iid(iid)
+        iid = f"{project}." + ".".join(iid.split(".")[1:])
+    iid_name_template = id_templates[project]
+    # this does not work yet with the CORDEX projects
+    # template = get_dataset_id_template(project)
+    # facet_names = facets_from_template(template)
     facets = {}
     for name, value in zip(iid_name_template.split("."), iid.split(".")):
         facets[name] = value
-    return facets
\ No newline at end of file
+    if project == "CORDEX-Reklies":
+        # see comment in the params module
+        del facets["product"]
+    return facets
+
+
+def get_dataset_id_template(project: str, url: str = None):
+    """Requests the dataset_id string template for an ESGF project"""
+    if url is None:
+        url = "https://esgf-node.llnl.gov/esg-search/search"
+    params = {
+        "project": project,
+        "fields": "project,dataset_id_template_",
+        "limit": 1,
+        "format": "application/solr+json",
+    }
+    r = requests.get(url, params)
+    return r.json()["response"]["docs"][0]["dataset_id_template_"][0]
+
+
+def facets_from_template(template: str):
+    """Parse the (dataset_id) string template into a list of (facet) keys"""
+    regex = r"\((.*?)\)"
+    return re.findall(regex, template)
+
+
+def request_project_facets(project: str, url: str = None):
+    template = get_dataset_id_template(project, url)
+    return facets_from_template(template)
diff --git a/pyproject.toml b/pyproject.toml
index 007f40a..f31c205 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,3 +6,11 @@ build-backend = "setuptools.build_meta"
 [tool.setuptools_scm]
 write_to = "pangeo_forge_esgf/_version.py"
 write_to_template = "__version__ = '{version}'"
+
+[tool.isort]
+profile = "black"
+skip_gitignore = true
+force_to_top = true
+default_section = "THIRDPARTY"
+known_first_party = "pangeo_forge_esgf"
+skip = ["doc/conf.py"]
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..5bdc0d9
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+aiohttp
+requests
+pytest
diff --git a/setup.cfg b/setup.cfg
index 9832028..fe220ec 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -12,7 +12,6 @@ classifiers =
     Intended Audience :: Science/Research
     Programming Language :: Python
     Programming Language :: Python :: 3
-    Programming Language :: Python :: 3.8
     Programming Language :: Python :: 3.9
     Programming Language :: Python :: 3.10
     Topic :: Scientific/Engineering
@@ -23,8 +22,9 @@ keywords = pangeo, data, esgf
 [options]
 zip_safe = False
-python_requires = >=3.8
+python_requires = >=3.9
 packages = find:
 include_package_data = True
 install_requires =
     aiohttp
+    requests
diff --git a/setup.py b/setup.py
index fc1f76c..6068493 100644
--- a/setup.py
+++ b/setup.py
@@ -1,3 +1,3 @@
 from setuptools import setup
 
-setup()
\ No newline at end of file
+setup()
diff --git a/test_cordex.py b/test_cordex.py
new file mode 100644
index 0000000..d72a2b1
--- /dev/null
+++ b/test_cordex.py
@@ -0,0 +1,27 @@
+import asyncio
+import ssl
+
+from pyesgf.logon import LogonManager
+
+from pangeo_forge_esgf import generate_recipe_inputs_from_iids
+
+# logon
+manager = LogonManager()
+if not manager.is_logged_on():
+    myproxy_host = "esgf-data.dkrz.de"
+    manager.logon(hostname=myproxy_host, interactive=True, bootstrap=True)
+
+# create SSL context
+sslcontext = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
+sslcontext.load_verify_locations(capath=manager.esgf_certs_dir)
+sslcontext.load_cert_chain(manager.esgf_credentials)
+
+iids = [
+    "cordex.output.EUR-11.GERICS.ECMWF-ERAINT.evaluation.r1i1p1.REMO2015.v1.mon.tas.v20180813",
+    "cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.historical.r1i1p1.REMO2009.v1.mon.tas.v20150609",
+    "cordex-reklies.output.EUR-11.GERICS.MIROC-MIROC5.historical.r1i1p1.REMO2015.v1.mon.tas.v20170329",
+]
+
+recipe_inputs = asyncio.run(generate_recipe_inputs_from_iids(iids, ssl=sslcontext))
+print("DONE")
+print(recipe_inputs)
diff --git a/test_recipe.py b/test_recipe.py
new file mode 100644
index 0000000..bd6b8ec
--- /dev/null
+++ b/test_recipe.py
@@ -0,0 +1,78 @@
+# Test recipe creation and recipe run locally for CORDEX datasets
+#
+# This only works with pangeo_forge_recipes <= 0.9.1 due to
+# https://github.com/pangeo-forge/pangeo-forge-recipes/issues/418
+#
+
+import asyncio
+import ssl
+
+import xarray as xr
+from pangeo_forge_recipes.patterns import pattern_from_file_sequence
+from pangeo_forge_recipes.recipes import XarrayZarrRecipe, setup_logging
+from pyesgf.logon import LogonManager
+
+from pangeo_forge_esgf import generate_recipe_inputs_from_iids
+
+
+def create_recipes(iids, ssl=None):
+
+    recipe_inputs = asyncio.run(generate_recipe_inputs_from_iids(iids, ssl=ssl))
+
+    recipes = {}
+
+    for iid, recipe_input in recipe_inputs.items():
+        urls = recipe_input.get("urls", None)
+        pattern_kwargs = recipe_input.get("pattern_kwargs", {})
+        # add the ssl context to the fsspec open keywords
+        pattern_kwargs["fsspec_open_kwargs"] = {"ssl": ssl}
+        recipe_kwargs = recipe_input.get("recipe_kwargs", {})
+        if urls is not None:
+            pattern = pattern_from_file_sequence(urls, "time", **pattern_kwargs)
+            recipes[iid] = XarrayZarrRecipe(
+                pattern, xarray_concat_kwargs={"join": "exact"}, **recipe_kwargs
+            )
+    print("+++Failed iids+++")
+    print(list(set(iids) - set(recipes.keys())))
+    print("+++Successful iids+++")
+    print(list(recipes.keys()))
+
+    return recipes
+
+
+def logon():
+    manager = LogonManager()
+    if not manager.is_logged_on():
+        myproxy_host = "esgf-data.dkrz.de"
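+        # interactive=True prompts for the ESGF OpenID password;
+        # bootstrap=True fetches the myproxy trust roots on first use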
+        manager.logon(hostname=myproxy_host, interactive=True, bootstrap=True)
+
+    # create SSL context
+    sslcontext = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
+    sslcontext.load_verify_locations(capath=manager.esgf_certs_dir)
+    sslcontext.load_cert_chain(manager.esgf_credentials)
+    return sslcontext
+
+
+iids = [
+    "cordex-reklies.output.EUR-11.GERICS.MIROC-MIROC5.historical.r1i1p1.REMO2015.v1.mon.tas",
+    "cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.historical.r1i1p1.REMO2009.v1.mon.tas",
+    "cordex-adjust.bias-adjusted-output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.REMO2009.v1-SMHI-DBS45-MESAN-1989-2010.mon.tasAdjust.v20160919",
+    "cordex-esd.output.EUR-11.DWD.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.EPISODES2018.v1-r1.mon.tas.v20180409",
+]
+
+sslcontext = logon()
+
+recipes = create_recipes(iids, sslcontext)
+
+# now do the tutorial
+setup_logging()
+
+# Prune the recipe
+recipe_pruned = recipes[iids[3]].copy_pruned()
+
+# Run the recipe
+run_function = recipe_pruned.to_function()
+run_function()
+
+ds = xr.open_zarr(recipe_pruned.target_mapper, consolidated=True)
+print(ds)
diff --git a/test_script.py b/test_script.py
index 1e2c8ee..5bfdd68 100644
--- a/test_script.py
+++ b/test_script.py
@@ -1,13 +1,54 @@
+# %%
 import asyncio
+
 from pangeo_forge_esgf import generate_recipe_inputs_from_iids
 
 iids = [
-    #PMIP runs requested by @CommonClimate
-    'CMIP6.PMIP.MIROC.MIROC-ES2L.past1000.r1i1p1f2.Amon.tas.gn.v20200318',
-    'CMIP6.PMIP.MRI.MRI-ESM2-0.past1000.r1i1p1f1.Amon.tas.gn.v20200120',
-    'CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.past2k.r1i1p1f1.Amon.tas.gn.v20210714',
+    # PMIP runs requested by @CommonClimate
+    "CMIP6.PMIP.MIROC.MIROC-ES2L.past1000.r1i1p1f2.Amon.tas.gn.v20200318",
+    "CMIP6.PMIP.MRI.MRI-ESM2-0.past1000.r1i1p1f1.Amon.tas.gn.v20200120",
+    "CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.past2k.r1i1p1f1.Amon.tas.gn.v20210714",
+    "CMIP6.CMIP.NCC.NorESM2-LM.historical.r1i1p1f1.Omon.vmo.gr.v20190815",
+    "CMIP6.PMIP.MIROC.MIROC-ES2L.past1000.r1i1p1f2.Amon.tas.gn.v20200318",
+    "CMIP6.PMIP.MRI.MRI-ESM2-0.past1000.r1i1p1f1.Amon.tas.gn.v20200120",
+    "CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.past2k.r1i1p1f1.Amon.tas.gn.v20210714",
+    "CMIP6.CMIP.FIO-QLNM.FIO-ESM-2-0.piControl.r1i1p1f1.Omon.vsf.gn",  # this one should not be available. This changes daily. Check the data nodes which are down to find examples.
 ]
-
+#
 recipe_inputs = asyncio.run(generate_recipe_inputs_from_iids(iids))
-print('DONE')
-print(recipe_inputs)
\ No newline at end of file
+print("DONE")
+print(recipe_inputs)
+
+# %%
+# How can I check if a file is available?
+# url = 'http://esg1.umr-cnrm.fr/thredds/fileServer/CMIP6_CNRM/PMIP/CNRM-CERFACS/CNRM-CM6-1/lig127k/r1i1p1f2/Amon/tauv/gr/v20200212/tauv_Amon_CNRM-CM6-1_lig127k_r1i1p1f2_gr_185001-209912.nc'
+# url = 'http://esgf-nimscmip6.apcc21.org/thredds/fileServer/my_cmip6_dataroot/Historical/R2/aa008p-SImon/CMIP6/CMIP/NIMS-KMA/KACE-1-0-G/historical/r2i1p1f1/SImon/siflsensupbot/gr/v20200130/siflsensupbot_SImon_KACE-1-0-G_historical_r2i1p1f1_gr_185001-201412.nc'
+# url = 'http://esgf.ichec.ie'
+url = "http://cmip.dess.tsinghua.edu.cn"
+# import requests
+# from requests import ReadTimeout
+# try:
+#     test = requests.head(url, timeout=5)
+#     print(test.status_code)
+# except ReadTimeout:
+#     print('Caught timeout. Assuming this file is not available right now')
+
+# async version
+# import asyncio
+# import aiohttp
+# # async def main():
+# #     async with aiohttp.ClientSession() as session:
+# #         async with session.head(url, timeout=2) as resp:
+# #             print(resp.status)
+
+
+# async def _check_url(url, session):
+#     async with session.head(url) as resp:
+#         return resp.status
+
+# session = aiohttp.ClientSession()
+# status2 = await _check_url(url, session)
+# print(status2)
+
+# 302 is ok?
+# %%
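+
+# %%
+# A minimal sketch tying the exploration above together: HEAD the url with a
+# timeout and treat a timeout like an unavailable node (status 503), mirroring
+# check_url in recipe_inputs.py; 200 and redirects (302/308) count as available.
+# (head_status is just an illustrative name, not part of the package API.)
+# import aiohttp
+#
+# async def head_status(url, timeout=5):
+#     async with aiohttp.ClientSession() as session:
+#         try:
+#             async with session.head(url, timeout=timeout) as resp:
+#                 return resp.status
+#         except asyncio.exceptions.TimeoutError:
+#             return 503
+#
+# print(asyncio.run(head_status(url)))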