dynamic params and facets #4
New file — flake8 configuration (+16 lines):

```ini
[flake8]
exclude = __init__.py,.eggs,doc
ignore =
    # whitespace before ':' - doesn't work well with black
    E203
    E402
    # line too long - let black worry about that
    E501
    # do not assign a lambda expression, use a def
    E731
    # line break before binary operator
    W503
    E265
    F811
    # Allows type hinting as Gridded[DataArray, "(X:center)"], where we did `from typing import Annotated as Gridded`
    F722
```
New file — GitHub Actions CI workflow (+34 lines):

```yaml
name: CI

on:
  push:
    branches:
      - "master"
  pull_request:
    branches:
      - "*"
  schedule:
    - cron: "0 13 * * 1"

jobs:
  build:
    defaults:
      run:
        shell: bash -l {0}
    strategy:
      fail-fast: false
      matrix:
        os: ["ubuntu-latest"]
        python-version: ["3.9", "3.10"]
    runs-on: ${{ matrix.os }}
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: python -m pip install -r requirements.txt
      - name: Install Package
        run: python -m pip install -e . --no-deps
      - name: Test with pytest
        run: pytest pangeo_forge_esgf/tests
```
New file — pre-commit configuration (+21 lines):

```yaml
default_language_version:
  python: python3
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.3.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
  - repo: https://github.com/PyCQA/isort
    rev: 5.10.1
    hooks:
      - id: isort
  - repo: https://github.com/psf/black
    rev: 22.10.0
    hooks:
      - id: black
  - repo: https://github.com/PyCQA/flake8
    rev: 5.0.4
    hooks:
      - id: flake8
```
Modified — README (two hunks). The first hunk (`@@ -3,9 +3,9 @@`) only strips trailing whitespace; its context explains that pangeo-forge recipes require exact `instance_id`s, that discovering these with the [web search](https://esgf-node.llnl.gov/search/cmip6/) becomes cumbersome for large numbers of members/models, and that `pangeo-forge-esgf` provides functions to query the ESGF API with wildcard `instance_id` values — for example, to find all zonal (`uo`) and meridional (`vo`) velocities available for the `lgm` experiment of PMIP.

The second hunk (`@@ -35,4 +35,36 @@`) appends a new section after the existing example output (which ends with `'CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.lgm.r1i1p1f1.Omon.vo.gn.v20190710'`) and after the whitespace-fixed sentence "Eventually I hope I can leverage this functionality to handle user requests in PRs that add wildcard instance_ids, but for now this might be helpful to manually construct lists of instance_ids to submit to a pangeo-forge feedstock.":

## Use different ESGF project

You can also use different ESGF projects and facets to search, e.g., for a search in the ESGF CORDEX datasets, use:

```python
parse_iids = [
    "CORDEX.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.*.r1i1p1.REMO2009.v1.mon.tas",
    "CORDEX.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.*.r1i1p1.REMO2009.v1.mon.tas",
]
iids = []
```

using a different url:

```python
url = "https://esgf-data.dkrz.de/esg-search/search"

for piid in parse_iids:
    iids.extend(parse_instance_ids(piid, url=url))
iids
```

results in

```
['cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.historical.r1i1p1.REMO2009.v1.mon.tas.v20160419',
 'cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp26.r1i1p1.REMO2009.v1.mon.tas.v20160525',
 'cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.REMO2009.v1.mon.tas.v20160525',
 'cordex.output.EUR-11.MPI-CSC.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.REMO2009.v1.mon.tas.v20160525',
 'cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.rcp26.r1i1p1.REMO2009.v1.mon.tas.v20150609',
 'cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.rcp85.r1i1p1.REMO2009.v1.mon.tas.v20150609',
 'cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.rcp45.r1i1p1.REMO2009.v1.mon.tas.v20150609',
 'cordex.output.EUR-44.MPI-CSC.MPI-M-MPI-ESM-LR.historical.r1i1p1.REMO2009.v1.mon.tas.v20150609']
```

Review exchange on the "using a different url" paragraph — reviewer: "Can you clarify if this is necessary or if this is just to show that the functionality works on all the ESGF search nodes?" Author: "Yes, that was just to show that you can choose a different url. CORDEX searches should actually also work for all ESGF nodes."
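For comparison with the CORDEX snippet above, here is a minimal sketch of the CMIP6 wildcard search the README refers to; the actual snippet sits outside this diff's context lines, so the import path and wildcard patterns are assumptions inferred from the printed results:

```python
from pangeo_forge_esgf import parse_instance_ids  # import path assumed

# Wildcards ('*') match any value of the corresponding facet.
parse_iids = [
    "CMIP6.PMIP.*.*.lgm.*.Omon.uo.gn.*",
    "CMIP6.PMIP.*.*.lgm.*.Omon.vo.gn.*",
]
iids = []
for piid in parse_iids:
    iids.extend(parse_instance_ids(piid))
print(iids)
# e.g. [..., 'CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.lgm.r1i1p1f1.Omon.vo.gn.v20190710']
```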
Modified — conda environment file:

```diff
@@ -1,6 +1,7 @@
-name: pangeo-forge-esgf
+name: pangeo-forge-esgf-dev
 channels:
   - conda-forge
 dependencies:
-  - python
-  - aiohttp
+  - pip
+  - pip:
+      - -r requirements.txt
```
Modified — Python module (the async ESGF request/processing logic; filename not shown in this extract):

```diff
@@ -1,6 +1,7 @@
-from typing import Dict, Union, List, Tuple
+import ssl
+from typing import Dict, List, Tuple

 import aiohttp
 from .utils import facets_from_iid

 # For certain table_ids it is preferable to have time chunks that are a multiple of e.g. 1 year for monthly data.
 monthly_divisors = sorted(
@@ -12,15 +13,16 @@
 )

 allowed_divisors = {
-    "Omon": monthly_divisors,
-    "SImon": monthly_divisors,
-    "Amon": monthly_divisors,
+    "mon": monthly_divisors,
+    # "Omon": monthly_divisors,
+    # "SImon": monthly_divisors,
+    # "Amon": monthly_divisors,
 }  # Add table_ids and allowed divisors as needed


-def get_timesteps_simple(dates, table_id):
+def get_timesteps_simple(dates, frequency):
     assert (
-        "mon" in table_id
+        "mon" in frequency
     )  # this needs some more careful treatment for other time frequencies.
     timesteps = [
         (int(d[1][0:4]) - int(d[0][0:4])) * 12 + (int(d[1][4:6]) - int(d[0][4:6]) + 1)
```
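To make the month-counting arithmetic in `get_timesteps_simple` concrete, here is a small worked example; the date pairs are hypothetical:

```python
# Each entry in `dates` is a (start, end) pair of YYYYMM strings parsed
# from file titles such as "vo_Omon_..._gn_185001-194912.nc".
dates = [("185001", "194912"), ("195001", "201412")]

timesteps = [
    # full-year difference in months, plus the inclusive month difference
    (int(d[1][0:4]) - int(d[0][0:4])) * 12 + (int(d[1][4:6]) - int(d[0][4:6]) + 1)
    for d in dates
]
print(timesteps)  # [1200, 780] -> 100 and 65 years of monthly data
```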
```diff
@@ -96,21 +98,29 @@ async def response_data_processing(
     session: aiohttp.ClientSession,
     response_data: List[Dict[str, str]],
     iid: str,
+    ssl: ssl.SSLContext = None,
 ) -> Tuple[List[str], Dict[str, Dict[str, str]]]:

-    table_id = facets_from_iid(iid).get("table_id")
+    # table_id = facets_from_iid(iid).get("table_id")
     urls = [r["url"] for r in response_data]
     sizes = [r["size"] for r in response_data]
     titles = [r["title"] for r in response_data]

+    # determine frequency
+    frequencies = [r["frequency"][0] for r in response_data]
+    # check if not all files have the same frequency, that should never happen.
+    if not all(f == frequencies[0] for f in frequencies):
+        raise ValueError("Determined frequencies are not all equal.")
+    frequency = frequencies[0]
+
     print(f"Found {len(urls)} urls")
     print(list(urls))

     # Check for netcdf version early so that we can fail quickly
-    # print(urls)
     print(f"{iid}: Check for netcdf 3 files")
     pattern_kwargs = {}
-    netcdf3_check = await is_netcdf3(session, urls[-1])
+    netcdf3_check = await is_netcdf3(session, urls[-1], ssl)
     # netcdf3_check = is_netcdf3(urls[-1]) #TODO This works, but this is the part that is slow as hell, so I should async this one...
     if netcdf3_check:
         pattern_kwargs["file_type"] = "netcdf3"
```

Review comments on this hunk:

- On the new `ssl: ssl.SSLContext = None` argument — reviewer: "For now this is great, but adding all these input arguments is a bit of a code smell to me. I think we should aim to set up a 'Connection' or 'Client' object once and then pass this around the different logic that needs to make a request?" Author: "Agreed!"
- On the added frequency check (lines +109 to +115) — author: "I added […]"
- On passing `ssl` through to `is_netcdf3` — reviewer: "Same here."
```diff
@@ -119,22 +129,22 @@
     # TODO: Is there a more robust way to do this?
     # otherwise maybe use `id` (harder to parse)
     dates = [t.replace(".nc", "").split("_")[-1].split("-") for t in titles]
-
-    timesteps = get_timesteps_simple(dates, table_id)
+    print(dates)
+    timesteps = get_timesteps_simple(dates, frequency)

     print(f"Dates for each file: {dates}")
     print(f"Size per file in MB: {[f/1e6 for f in sizes]}")
     print(f"Inferred timesteps per file: {timesteps}")
     element_sizes = [size / n_t for size, n_t in zip(sizes, timesteps)]

-    ### Determine kwargs
+    # Determine kwargs
     # MAX_SUBSET_SIZE=1e9 # This is an option if the revised subsetting still runs into errors.
     MAX_SUBSET_SIZE = 500e6
     DESIRED_CHUNKSIZE = 200e6
     # TODO: We need a completely new logic branch which checks if the total size (sum(filesizes)) is smaller than a desired chunk
     target_chunks = {
         "time": choose_chunksize(
-            allowed_divisors[table_id],
+            allowed_divisors[frequency],
             DESIRED_CHUNKSIZE,
             element_sizes,
             timesteps,
```
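`choose_chunksize` itself is outside this diff. Given the constraints stated around it, it presumably picks the largest allowed time-chunk length that keeps a chunk at or below the desired byte size — roughly along these lines (a sketch under those assumptions, not the actual implementation):

```python
from typing import List


def choose_chunksize(
    allowed: List[int],  # e.g. monthly_divisors for monthly data
    desired_chunksize: float,  # target bytes per chunk, e.g. 200e6
    element_sizes: List[float],  # bytes per timestep for each file
    timesteps: List[int],  # timesteps per file
) -> int:
    max_element_size = max(element_sizes)
    # Largest candidate whose chunk stays at or below the desired size...
    candidates = [c for c in allowed if c * max_element_size <= desired_chunksize]
    # ...falling back to the smallest allowed value for very large elements.
    return max(candidates) if candidates else min(allowed)
```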
```diff
@@ -146,7 +156,7 @@
     if max(sizes) <= MAX_SUBSET_SIZE:
         subset_input = 0
     else:
-        ## Determine subset_input parameters given the following constraints
+        # Determine subset_input parameters given the following constraints
         # - Needs to keep the subset size below MAX_SUBSET_SIZE
         # - (Not currently implemented) Resulting subsets should be evenly dividable by target_chunks (except for the last file, that can be odd). This might ultimately not be required once we figure out the locking issues. I cannot fulfill this right now with the dataset structure where often the first and last files have different number of timesteps than the 'middle' ones.
```
```diff
@@ -168,16 +178,18 @@
     return urls, kwargs


-async def is_netcdf3(session: aiohttp.ClientSession, url: str) -> bool:
+async def is_netcdf3(
+    session: aiohttp.ClientSession, url: str, ssl: ssl.SSLContext = None
+) -> bool:
     """Simple check to determine the netcdf file version behind a url.
     Requires the server to support range requests"""
     headers = {"Range": "bytes=0-2"}
     # TODO: how should i handle it if these are failing?
     # TODO: need to implement a retry here too
     # TODO: I believe these are independent of the search nodes? So we should not retry these with another node? I might need to look into what 'replicas' mean in this context.
-    async with session.get(url, headers=headers) as resp:
+    async with session.get(url, headers=headers, ssl=ssl) as resp:
         status_code = resp.status
         if not status_code == 206:
             raise RuntimeError(f"Range request failed with {status_code} for {url}")
         head = await resp.read()
-        return "CDF" in str(head)
+        return "CDF" in str(head)
```
New file — Python module with project parameters and dataset-id templates (+54 lines):

```python
# example input4MIPs: input4MIPs.CMIP6.AerChemMIP.UoM.UoM-AIM-ssp370-lowNTCF-1-2-1.atmos.mon.mole_fraction_of_carbon_dioxide_in_air.gr1-GMNHSH.v20181127
known_projects = [
    "CMIP6",
    "CMIP5",
    "obs4MIPs",
    "input4MIPs",
    "CORDEX",  # Usual Cordex datasets from CMIP5 downscaling
    "CORDEX-Reklies",  # This is Downscaling from Reklies project
    "CORDEX-Adjust",  # Bias adjusted output
    "CORDEX-ESD",  # Statistical downscaling
]


# dataset id templates
cmip6_template = "mip_era.activity_id.institution_id.source_id.experiment_id.variant_label.table_id.variable_id.grid_label.version"
cordex_template = "project.product.domain.institute.driving_model.experiment.ensemble.rcm_name.rcm_version.time_frequency.variable.version"
cordex_adjust_template = "project.product.domain.institute.driving_model.experiment.ensemble.rcm_name.bias_adjustment.time_frequency.variable.version"

# request params
base_params = {
    # "type": "File",
    "format": "application/solr+json",
    # "fields": "instance_id",
    "fields": "url,size,table_id,title,instance_id,replica,data_node,frequency,time_frequency",
    "latest": "true",
    "distrib": "true",
    "limit": 500,
}

cmip6_params = base_params | {"retracted": "false"}
cordex_params = base_params | {}

# this needs refactoring when you request the dataset_id template from ESGF servers
id_templates = {
    "CMIP6": cmip6_template,
    "CORDEX": cordex_template,
    "CORDEX-Reklies": cordex_template,
    "CORDEX-Adjust": cordex_adjust_template,
    "CORDEX-ESD": cordex_template,
}

request_params = {
    "CMIP6": cmip6_params,
    "CORDEX": cordex_params,
    "CORDEX-Reklies": cordex_params,
    "CORDEX-Adjust": cordex_params,
    "CORDEX-ESD": cordex_params,
}


# There is another problem with CORDEX-Reklies, e.g.
# "cordex-reklies.output.EUR-11.GERICS.MIROC-MIROC5.historical.r1i1p1.REMO2015.v1.mon.tas"
# The product="output" facet will give no result although the dataset_id clearly says it's "output".
# However the API result is empty list, so the output facet has to be removed when CORDEX-Reklies is searched, hmmm...
```
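For context, the `facets_from_iid` helper imported in the module above (defined in `.utils`, outside this diff) presumably pairs the dot-separated components of an instance id with the facet names in these templates. A rough sketch — the real signature and project detection may differ:

```python
def facets_from_iid(iid: str, project: str = "CMIP6") -> dict:
    """Hypothetical reconstruction: split an instance_id on '.' and zip it
    with the facet names from the project's dataset-id template."""
    facet_names = id_templates[project].split(".")
    return dict(zip(facet_names, iid.split(".")))


facets = facets_from_iid(
    "CMIP6.PMIP.MPI-M.MPI-ESM1-2-LR.lgm.r1i1p1f1.Omon.vo.gn.v20190710"
)
print(facets["table_id"], facets["variable_id"])  # Omon vo
```

These facets would then be merged with the per-project `request_params` when building a search query, which is what makes the per-project templates and parameter dicts above useful.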
Review comment on the PR — "Is there a way to get rid of these commits? These have been introduced by merging an earlier commit and are not part of this PR, right?"