Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out dynamic chunking func #152

Merged
merged 8 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 5 additions & 83 deletions feedstock/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
"""Modified transforms from Pangeo Forge"""

import apache_beam as beam
from typing import Dict
from dask.utils import parse_bytes
from pangeo_forge_esgf import setup_logging
from leap_data_management_utils import CMIPBQInterface, LogCMIPToBigQuery
from leap_data_management_utils.data_management_transforms import Copy, InjectAttrs
from leap_data_management_utils.cmip_transforms import TestDataset, Preprocessor
from leap_data_management_utils.cmip_transforms import (
TestDataset,
Preprocessor,
dynamic_chunking_func,
)
from pangeo_forge_esgf.client import ESGFClient
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
Expand All @@ -19,7 +21,6 @@
)
import logging
import os
import xarray as xr
import yaml
from tqdm.auto import tqdm

Expand Down Expand Up @@ -133,11 +134,9 @@
iids_filtered = list(set(iids) - iids_to_skip)
logger.info(f"Pruned {len(iids) - len(iids_filtered)}/{len(iids)} iids from input list")


if prune_iids:
iids_filtered = iids_filtered[0:20]


# Now that we have the iids that are not yet ingested, we can prune the full iid_info_dict and extract the 'id' field
iid_info_dict_filtered = {k: v for k, v in iid_info_dict.items() if k in iids_filtered}
dataset_ids_filtered = [v["id"] for v in iid_info_dict_filtered.values()]
Expand Down Expand Up @@ -177,83 +176,6 @@ def combine_dicts(dicts):
# Print the actual urls
logger.debug(f"{recipe_dict = }")


## Dynamic Chunking Wrapper
def dynamic_chunking_func(ds: xr.Dataset) -> Dict[str, int]:
import warnings

# trying to import inside the function
from dynamic_chunks.algorithms import (
even_divisor_algo,
iterative_ratio_increase_algo,
NoMatchingChunks,
)

logger.info(f"Input Dataset for dynamic chunking {ds =}")

target_chunk_size = "150MB"
target_chunks_aspect_ratio = {
"time": 10,
"x": 1,
"i": 1,
"ni": 1,
"xh": 1,
"nlon": 1,
"lon": 1, # TODO: Maybe import all the known spatial dimensions from xmip?
"y": 1,
"j": 1,
"nj": 1,
"yh": 1,
"nlat": 1,
"lat": 1,
}
size_tolerance = 0.5

# Some datasets are smaller than the target chunk size and should not be chunked at all
if ds.nbytes < parse_bytes(target_chunk_size):
target_chunks = dict(ds.dims)

else:
try:
target_chunks = even_divisor_algo(
ds,
target_chunk_size,
target_chunks_aspect_ratio,
size_tolerance,
allow_extra_dims=True,
)

except NoMatchingChunks:
warnings.warn(
"Primary algorithm using even divisors along each dimension failed "
"with. Trying secondary algorithm."
f"Input {ds=}"
)
try:
target_chunks = iterative_ratio_increase_algo(
ds,
target_chunk_size,
target_chunks_aspect_ratio,
size_tolerance,
allow_extra_dims=True,
)
except NoMatchingChunks:
raise ValueError(
(
"Could not find any chunk combinations satisfying "
"the size constraint with either algorithm."
f"Input {ds=}"
)
)
# If something fails
except Exception as e:
raise e
except Exception as e:
raise e
logger.info(f"Dynamic Chunking determined {target_chunks =}")
return target_chunks


## Create the recipes
recipes = {}

Expand Down
2 changes: 1 addition & 1 deletion feedstock/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
leap-data-management-utils==0.0.9
leap-data-management-utils==0.0.10
#pangeo-forge-esgf==0.2.0
git+https://github.com/jbusecke/pangeo-forge-esgf.git@new-request-scheme
dynamic-chunks==0.0.3
Expand Down