
Add from_pickle option to FilePattern #205

Closed
wants to merge 6 commits

Conversation

@cisaacstern (Member) commented Sep 15, 2021

@jbusecke, @dgergel, and I are continuing to explore ways to support real-world CMIP6 workflows in Pangeo Forge.

https://github.com/jbusecke/cmip6_derived_cloud_datasets uses Prefect and Coiled to build derived CMIP6 Zarr datasets without Pangeo Forge. The first step of that process is discovery_and_preprocessing, which returns a dictionary of preprocessed xarray.Datasets.

Theoretically, any given dataset in this dictionary should be recreatable using a combination of FilePattern (perhaps with a lot of conditional logic in the format_function) and the process_input/process_chunk kwargs of XarrayZarrRecipe (again, likely with a lot of conditionality in the functions passed to those kwargs). The amount of conditional logic required to make this work may well represent too much friction for real-world CMIP6 use cases.
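
To illustrate the kind of friction involved, here is a rough sketch of what that per-model conditional logic might look like at the process_input level (the model name and the renames are hypothetical, not drawn from any actual recipe):

import xarray as xr

# Hypothetical per-model special-casing; "SOME-MODEL" and the renames are
# placeholders, not rules from any actual CMIP6 recipe.
def process_input(ds: xr.Dataset, filename: str) -> xr.Dataset:
    if ds.attrs.get("source_id") == "SOME-MODEL":
        ds = ds.rename({"latitude": "lat", "longitude": "lon"})
    if "areacello" in ds.data_vars:
        ds = ds.set_coords("areacello")
    return ds

# recipe = XarrayZarrRecipe(pattern, process_input=process_input)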

Julius thought of a possible shortcut for plugging his dictionary into Pangeo Forge: if the objects in the CMIP6 dataset dictionary were written out to local pickle files, then they could either be executed with his Prefect/Coiled infrastructure or instantiated as XarrayZarrRecipes and executed with Pangeo Forge infrastructure. This turns out to be true (at least for manual execution), as demonstrated in a notebook he screen-shared with me, and it requires surprisingly minimal changes to the codebase. This PR (again, h/t Julius for the concept) makes the necessary changes to pangeo-forge-recipes so that Julius can continue his experiments by installing pangeo-forge-recipes from

!pip install git+https://github.com/cisaacstern/pangeo-forge-recipes@from-pickle

This might be part of an eventual solution for #176 and the various CMIP6-related issues linked therein. Here is a pseudo-code example of how this feature would be used in combination with Julius's dataset dictionary:

import pickle
from cmip6_derived_cloud_datasets import discovery_and_preprocessing
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

dataset_identifiers = { ... }
ds_dict = discovery_and_preprocessing(**dataset_identifiers)

for k, ds in ds_dict.items():
    with open(f'{k}.pickle', 'wb') as f:
        pickle.dump(ds, f, protocol=-1)

patterns = {
    k: pattern_from_file_sequence(
        [f"{k}.pickle"],
        "time",
        nitems_per_file=len(ds_dict[k]["time"]),
        from_pickle=True,
    ) for k in ds_dict.keys()
}
recipes = {key: XarrayZarrRecipe(pattern) for key, pattern in patterns.items()}

Among other things, security is a consideration if we ever choose to support pickled inputs in production. The feature of pickle which does not appear to be easily replicated with other intermediate caching methods is its ability (in theory, at least) to encode an entire xarray.Dataset (including combinations of arbitrary numbers of remote sources and delayed Dask operations) in a small, portable file that doesn't require .load()ing. Currently, the tests I've included with this PR don't exercise that full range of capability, but we can certainly expand them to confirm it.
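
For reference, a minimal sketch of the round trip being described, assuming a lazy (dask-backed) dataset opened from a placeholder Zarr store:

import pickle
import xarray as xr

# Open lazily; with dask-backed variables, no array data is read here.
ds = xr.open_zarr("gs://some-bucket/some-store.zarr")  # placeholder URL

# Pickle the Dataset without calling .load(); in principle this captures the
# dask graph and references to the remote sources rather than the data itself.
with open("ds.pickle", "wb") as f:
    pickle.dump(ds, f, protocol=-1)

# Elsewhere, restore it; the result is still lazy and computes on demand.
with open("ds.pickle", "rb") as f:
    ds_restored = pickle.load(f)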

@rabernat (Contributor)

I appreciate everyone's work on this. I think that @jbusecke's CMIP6 preprocessing work is an interesting possibility for Pangeo Forge. However, I don't think this is the right approach. I don't think we want to be dumping dask graphs to pickles and then reading those as "inputs". The reason is that this loses the provenance chain between the true inputs (i.e. the CMIP6 files in the cloud) and the datasets that Pangeo Forge will produce. Ideally we would want those inputs exposed via a FilePattern object, as in all the other recipes.

In order to move forward with this, it would be very valuable if @jbusecke could explain what happens inside discovery_and_preprocessing. Yes, we could go read the code, but it would be more useful to list out, conceptually, all the steps that are happening. That will help us understand better how to integrate it with Pangeo Forge.

@jbusecke (Contributor)

I think that is a good idea. I will try to come up with a schematic that also illustrates our thinking here.

@rabernat (Contributor)

"a schematic"

Just text would also be fine. I'm thinking something like

  • Query intake-esm to find the desired datasets
  • Combine datasets along dimensions X and Y
  • Run preprocessing function FOO to fix issue BAR

etc.

@jbusecke (Contributor)

Ah ok that is indeed faster.

So my basic workflow has two stages:

1. Reading, preprocessing, and combining datasets

A very basic example would be globally averaged SST.

  • It all starts from a single intake-esm collection (which could be a single FilePattern entry to the recipe).
  • I would then query a certain model, a subset of models, or all of them (we can talk later about whether it would be beneficial to have e.g. one recipe per model) for a variable (tos) and possibly other facets.
  • Then, depending on the type of data I want to end up with, I use all the goodies in cmip6_preprocessing.postprocessing to combine datasets.
    • In this case this might involve attaching metrics like the cell area to the temperature datasets, but this step can get very involved, and crucially relies on having not just one dataset, but others for context!

      Just to provide a concrete example: in my OMZ work, some models only put out regridded fields for oxygen, while temperature is given on the native grid. So I need all of these variables available to use cmip6_pp to interpolate and combine them into a single dataset. This is NOT the case for all the models, and as such hardcoding this into a recipe via FilePatterns seems inflexible to me.

  • Finally, the user might want to e.g. concatenate members or experiments, and maybe filter some datasets based on certain criteria (e.g. a certain member of a model does not provide a complete ssp585 run).

Everything up until here incorporates all the heavy lifting with cmip6_pp, but at the end you will end up with a dict (or list) of clean and homogeneous datasets that can be processed independently.

  • Now apply something to each of the datasets (they are completely independent of each other at this point). Keeping with the example, this would be something extremely simple like the line below (a fuller sketch of this whole stage follows the list):
ds.weighted(ds.areacello).mean(['x','y'])
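
A minimal sketch of this first stage as a whole (the catalog URL and search facets are placeholders, combined_preprocessing is assumed to cover the models of interest, and the metric-matching step described above is omitted):

import intake
from cmip6_preprocessing.preprocessing import combined_preprocessing

# Query a placeholder intake-esm catalog for a single variable and experiment.
col = intake.open_esm_datastore(
    "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"  # placeholder catalog
)
cat = col.search(variable_id="tos", table_id="Omon", experiment_id="historical")

# Open everything lazily, applying combined_preprocessing to each dataset;
# attaching metrics (e.g. areacello) via cmip6_preprocessing.postprocessing is
# omitted here for brevity.
ds_dict = cat.to_dataset_dict(
    zarr_kwargs={"consolidated": True},
    preprocess=combined_preprocessing,
)

# Stage 2 then applies the per-dataset computation, e.g.
# ds.weighted(ds.areacello).mean(['x', 'y']), to each entry of ds_dict.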

2. Execute the built up computation for each dataset (in parallel?)

Of course we could execute this with a loop over the dictionary (that is in fact what I have done up until now), but that would be slow. Instead I was hoping that we can somehow write a recipe that does Step 1, and then is somehow able to 'dispatch' each of these datasets (with all their dask graphs and modifications applied) to a recipe similar to XarrayZarr, which can then process/save them in parallel.

@cisaacstern's and my initial idea was that this would all be written into a single recipe (which writes pickled representations to a temp space) and then return a bunch of XarrayZarr recipes, which operate on the temp pickled datasets. I thought this could be sufficient for provenance, because the pickles would correspond to a versioned recipe, but I see the point in your argument above.

I still think that, if possible, this CMIP6 2-Step Recipe would be incredibly useful and powerful. Here are some reasons why I am so keen on making it work:

  • Portability: Most users will start (like me) with a notebook and experiment around with the files. The ability to basically just copy-paste an everything_dict_wrapper and a single_dataset_wrapper into a recipe sounds really appealing to me (and would have little overhead).
  • Speed: I used to just loop through and process datasets because I would work on limited resources on an HPC. With the cloud we have the ability to massively scale this! How awesome would it be if I could create and dispatch a calculation on many hundreds of cores and wait only hours for it, not days.
  • Flexibility + Error Handling: One of the largest frustrations for me was that the above-mentioned loop_and_process workflow was extremely error-prone. There was always a dataset that would not work and basically stop the whole process unless I put in tons of error-handling statements in each notebook. I think the tools provided by pangeo-forge (e.g. converting to a Prefect flow) would handle this much more gracefully. Also, running models in parallel will let the user quickly see which models are failing and work on debugging them instead of waiting.

@cisaacstern (Member, Author)

@jbusecke thanks for this helpful context.

  "Execute the built up computation for each dataset (in parallel?)"

Pangeo Forge already supports definition of the recipe as a dictionary of recipes (see the definition of dict_object recipes here). IIUC, these dict_objects are already executed in parallel by Pangeo Forge cloud infrastructure, so this aspect of your use case should already be taken care of.

The real work will be to create an intermediate layer (possibly through a plugin) which allows you to experiment with dataset dictionaries in a notebook in a way that's familiar and comfortable to you, but which translates that process into FilePatterns and other Pangeo Forge types when you're ready to move to executing your recipe.

@rabernat (Contributor)

Thanks so much for taking the time to write that all up! I have a much better idea of this use case now.

I think it would be a useful exercise to first recode a simplified version of this workflow from scratch using existing pangeo-forge-recipes features. The existing CMIP6 recipe tutorial would be a good place to start. (It should be no problem to use Zarr rather than netCDF inputs.)

That will reveal a crux issue: Pangeo Forge as currently constituted simply does not allow arbitrary computations to be run on a dask distributed cluster. Instead, we enforce a fairly limited model of parallel execution. At its core, Pangeo Forge requires that datasets be written in parallel in "chunks" (which are not the same as dask chunks; see #182). For our purposes, a "chunk" is an individual Xarray dataset that can fit entirely into memory. A chunk can be a combination (concatenation / merge) of multiple inputs or, alternatively, a strided subset of one input. This requires rethinking the whole approach to producing a derived dataset.
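
As a rough illustration of that model (assuming the inputs_per_chunk and subset_inputs options of XarrayZarrRecipe; the file pattern here is a placeholder):

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

# Placeholder pattern: four hypothetical inputs concatenated along "time".
pattern = pattern_from_file_sequence(
    [f"https://example.org/input_{i}.nc" for i in range(4)],
    "time",
    nitems_per_file=12,
)

# A chunk may combine several inputs...
recipe_combined = XarrayZarrRecipe(pattern, inputs_per_chunk=2)

# ...or be a strided subset of a single input.
recipe_subset = XarrayZarrRecipe(pattern, subset_inputs={"time": 2})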

Fortunately, for this application of global mean SST, we could easily accomplish that via a process_chunk callback:

def global_mean(ds):
    return ds.weighted(ds.areacello).mean(['x', 'y'])

recipe = XarrayZarrRecipe(..., process_chunk=global_mean)

Then it basically comes down to the challenge of defining the appropriate FilePattern object for a particular model. Other preprocessing, such as augmenting the coordinates, could be accomplished either at the process_chunk or process_input level.

One problem that I can't immediately see how to resolve is how to bring in an extra coordinate file, separate from the FilePattern.

@jbusecke (Contributor)

We might be able to close this in favor of #242?

If that avenue is successful, it seems like a more general solution.
