Merge pull request #167 from cisaacstern/scrub-tokens
Make `fsspec_open_kwargs`, `query_string_secrets`, & `is_opendap` attributes of `FilePattern`
rabernat authored Sep 2, 2021
2 parents 690594a + 6946ca1 commit 0352ca0
Showing 17 changed files with 469 additions and 231 deletions.
33 changes: 31 additions & 2 deletions docs/recipe_user_guide/file_patterns.md
@@ -113,7 +113,36 @@ and type of combine dimensions they support.
``ConcatDim`` and allows at most one ``MergeDim``.


### Specifying `nitems_per_input` in a `ConcatDim`
### Extra keyword arguments for `FilePattern`

`FilePattern` objects carry all of the information needed to open source files. The following additional keyword
arguments may be passed to `FilePattern` instances as appropriate:

- **`fsspec_open_kwargs`**: A dictionary of kwargs to pass to `fsspec.open` to aid opening of source files. For example,
`{"block_size": 0}` may be passed if an HTTP source file server does not permit range requests. Authentication for
`fsspec`-compatible filesystems may be handled here as well. For HTTP username/password-based authentication, the exact
`fsspec_open_kwargs` depend on the configuration of the source file server, but they are likely to conform to one of the following
two formats:

```ipython3
fsspec_open_kwargs={"username": "<your-username>", "password": "<your-password>"}
fsspec_open_kwargs={"auth": aiohttp.BasicAuth("<your-username>", "<your-password>")}
```

- **`query_string_secrets`**: A dictionary of key:value pairs to append to the query string of each source file URL at runtime. Query
parameters which are not secrets should instead be included in the `format_function`.
- **`is_opendap`**: Boolean value to specify whether or not the source files are served via OPeNDAP. Incompatible with caching,
and mutually exclusive with `fsspec_open_kwargs`. Defaults to `False`.
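
To make the `query_string_secrets` behaviour concrete, here is a minimal sketch of appending key/value pairs to a URL's query string using only the standard library. The helper name `append_query_string_secrets`, the example URL, and the token value are hypothetical; this illustrates the idea and is not necessarily how `pangeo_forge_recipes` implements it.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def append_query_string_secrets(url: str, secrets: dict) -> str:
    """Return `url` with `secrets` merged into its query string (hypothetical helper)."""
    parts = urlsplit(url)
    # Keep any non-secret parameters already present in the URL.
    # Note: converting to a dict collapses repeated keys to their last value.
    query = dict(parse_qsl(parts.query, keep_blank_values=True))
    query.update(secrets)
    return urlunsplit(parts._replace(query=urlencode(query)))


url = append_query_string_secrets(
    "https://example.com/data/file_001.nc?version=2",
    {"token": "not-a-real-token"},
)
print(url)
```

The non-secret `version` parameter stays in the URL produced by the `format_function`, while the secret is only added when the URL is about to be opened.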

```{warning}
Secrets including login credentials and API tokens should never be committed to a public repository. As such,
we strongly suggest that you do **not** instantiate your `FilePattern` with these or any other secrets when
developing your recipe. If your source files require authentication via `fsspec_open_kwargs` and/or
`query_string_secrets`, it is advisable to update these attributes at execution time. Pangeo Forge will soon offer a
mechanism for securely handling such recipe secrets on GitHub.
```
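
One way to follow this advice is to read credentials from environment variables at execution time and only then attach them to the pattern. The sketch below assumes hypothetical variable names (`SOURCE_USERNAME`, `SOURCE_PASSWORD`, `SOURCE_API_TOKEN`) and a hypothetical query parameter name (`token`). It uses a plain `SimpleNamespace` as a stand-in for a `FilePattern` instance, so it runs without `pangeo_forge_recipes` installed.

```python
import os
from types import SimpleNamespace


def load_secrets(environ=os.environ) -> tuple:
    """Build (fsspec_open_kwargs, query_string_secrets) from environment variables.

    The variable names are assumptions made for this example.
    """
    fsspec_open_kwargs = {}
    query_string_secrets = {}
    if "SOURCE_USERNAME" in environ and "SOURCE_PASSWORD" in environ:
        fsspec_open_kwargs["username"] = environ["SOURCE_USERNAME"]
        fsspec_open_kwargs["password"] = environ["SOURCE_PASSWORD"]
    if "SOURCE_API_TOKEN" in environ:
        query_string_secrets["token"] = environ["SOURCE_API_TOKEN"]
    return fsspec_open_kwargs, query_string_secrets


# Stand-in for a FilePattern created without secrets during development.
pattern = SimpleNamespace(fsspec_open_kwargs={}, query_string_secrets={})

# At execution time, fill in the attributes from the environment.
pattern.fsspec_open_kwargs, pattern.query_string_secrets = load_secrets()
```

Because the secrets never appear in the recipe source, the file can be committed to a public repository as-is.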

### Specifying `nitems_per_file` in a `ConcatDim`

FilePatterns are deliberately very simple. However, there is one case where
we can annotate the FilePattern with a bit of extra information.
@@ -127,7 +156,7 @@ have one record of daily temperature? Ten?
In general, Pangeo Forge does not assume there is a constant, known number of
records in each file; instead it will discover this information by peeking into each file.
But _if we know a-priori that there is a fixed number of records per file_, we can
provide this as a hint, via `niterms_per_file` keyword in `ConcatDim`.
provide this as a hint, via `nitems_per_file` keyword in `ConcatDim`.
Providing this hint will allow Pangeo Forge to work more quickly because it
doesn't have to peek into the files.
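
To see why the hint helps, consider how the start position of each file along the concat dimension can be worked out. The sketch below is a simplified illustration rather than Pangeo Forge's actual logic; `concat_offsets` and `peek_nitems` are hypothetical names. With a fixed `nitems_per_file`, every offset follows from arithmetic alone; without it, each file has to be inspected.

```python
from itertools import accumulate
from typing import Callable, List, Optional


def concat_offsets(
    files: List[str],
    nitems_per_file: Optional[int] = None,
    peek_nitems: Optional[Callable[[str], int]] = None,
) -> List[int]:
    """Start index of each file along the concat dimension (illustrative only)."""
    if nitems_per_file is not None:
        # Hint provided: no file needs to be opened.
        return [i * nitems_per_file for i in range(len(files))]
    if peek_nitems is None:
        raise ValueError("need either nitems_per_file or a way to peek into files")
    # No hint: inspect every file to learn its length.
    lengths = [peek_nitems(f) for f in files]
    # Running total of lengths, shifted so the first file starts at 0.
    return [0, *accumulate(lengths)][:-1]
```

In the second branch the cost grows with the number of files, which is the work the hint avoids.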

17 changes: 9 additions & 8 deletions docs/tutorials/xarray_zarr/cmip6-recipe.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/tutorials/xarray_zarr/multi_variable_recipe.ipynb
@@ -2270,7 +2270,7 @@
{
"data": {
"text/plain": [
"XarrayZarrRecipe(file_pattern=<FilePattern {'variable': 2, 'time': 12}>, inputs_per_chunk=1, target_chunks={}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={'decode_times': False}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=<function fix_encoding_and_attrs at 0x7fafa00c1430>, process_chunk=None, lock_timeout=None, subset_inputs={})"
"XarrayZarrRecipe(file_pattern=<FilePattern {'variable': 2, 'time': 12}>, inputs_per_chunk=1, target_chunks={}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={'decode_times': False}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=<function fix_encoding_and_attrs at 0x7fafa00c1430>, process_chunk=None, lock_timeout=None, subset_inputs={})"
]
},
"execution_count": 12,
8 changes: 3 additions & 5 deletions docs/tutorials/xarray_zarr/netcdf_zarr_sequential.ipynb
@@ -813,7 +813,6 @@
"\u001b[0;34m\u001b[0m \u001b[0mxarray_open_kwargs\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mdict\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m<\u001b[0m\u001b[0mfactory\u001b[0m\u001b[0;34m>\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mxarray_concat_kwargs\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mdict\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m<\u001b[0m\u001b[0mfactory\u001b[0m\u001b[0;34m>\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mdelete_input_encoding\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mbool\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mfsspec_open_kwargs\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mdict\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m<\u001b[0m\u001b[0mfactory\u001b[0m\u001b[0;34m>\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mprocess_input\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mUnion\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mCallable\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mxarray\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcore\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdataset\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mDataset\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstr\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mxarray\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcore\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdataset\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mDataset\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mNoneType\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mprocess_chunk\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mUnion\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mCallable\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mxarray\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcore\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdataset\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mDataset\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mxarray\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcore\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdataset\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mDataset\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mNoneType\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mlock_timeout\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mUnion\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mint\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mNoneType\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
@@ -849,7 +848,6 @@
" the inputs to form a chunk.\n",
":param delete_input_encoding: Whether to remove Xarray encoding from variables\n",
" in the input dataset\n",
":param fsspec_open_kwargs: Extra options for opening the inputs with fsspec.\n",
":param process_input: Function to call on each opened input, with signature\n",
" `(ds: xr.Dataset, filename: str) -> ds: xr.Dataset`.\n",
":param process_chunk: Function to call on each concatenated chunk, with signature\n",
@@ -889,7 +887,7 @@
{
"data": {
"text/plain": [
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 14372}>, inputs_per_chunk=1, target_chunks={}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={})"
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 14372}>, inputs_per_chunk=1, target_chunks={}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={})"
]
},
"execution_count": 13,
@@ -923,7 +921,7 @@
{
"data": {
"text/plain": [
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 14372}>, inputs_per_chunk=10, target_chunks={}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={})"
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 14372}>, inputs_per_chunk=10, target_chunks={}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={})"
]
},
"execution_count": 14,
@@ -1852,7 +1850,7 @@
{
"data": {
"text/plain": [
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 14372}>, inputs_per_chunk=10, target_chunks={}, target=FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f83e8d479d0>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpuz91tfhl'), input_cache=CacheFSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f83e8d479d0>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpq3zo16e1'), metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={})"
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 14372}>, inputs_per_chunk=10, target_chunks={}, target=FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f83e8d479d0>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpuz91tfhl'), input_cache=CacheFSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f83e8d479d0>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpq3zo16e1'), metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={})"
]
},
"execution_count": 19,
4 changes: 2 additions & 2 deletions docs/tutorials/xarray_zarr/opendap_subset_recipe.ipynb
@@ -598,7 +598,7 @@
{
"data": {
"text/plain": [
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 1}>, inputs_per_chunk=1, target_chunks={'time': 1}, target=None, input_cache=None, metadata_cache=None, cache_inputs=False, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={'engine': 'netcdf4'}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={'time': 30}, is_opendap=True)"
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 1}>, inputs_per_chunk=1, target_chunks={'time': 1}, target=None, input_cache=None, metadata_cache=None, cache_inputs=False, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={'engine': 'netcdf4'}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={'time': 30}, is_opendap=True)"
]
},
"execution_count": 5,
@@ -636,7 +636,7 @@
{
"data": {
"text/plain": [
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 1}>, inputs_per_chunk=1, target_chunks={'time': 1}, target=FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f9470d26e80>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpt58fl_jv'), input_cache=None, metadata_cache=MetadataTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f9470d26e80>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpb9_y3bnl'), cache_inputs=False, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={'engine': 'netcdf4'}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={'time': 30}, is_opendap=True)"
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 1}>, inputs_per_chunk=1, target_chunks={'time': 1}, target=FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f9470d26e80>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpt58fl_jv'), input_cache=None, metadata_cache=MetadataTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f9470d26e80>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpb9_y3bnl'), cache_inputs=False, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={'engine': 'netcdf4'}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=None, process_chunk=None, lock_timeout=None, subset_inputs={'time': 30}, is_opendap=True)"
]
},
"execution_count": 6,
4 changes: 2 additions & 2 deletions docs/tutorials/xarray_zarr/terraclimate.ipynb
@@ -220,7 +220,7 @@
{
"data": {
"text/plain": [
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 2, 'variable': 14}>, inputs_per_chunk=1, target_chunks={'lat': 1024, 'lon': 1024, 'time': 12}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=None, process_chunk=<function preproc at 0x7fc048a84670>, lock_timeout=None, subset_inputs={})"
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 2, 'variable': 14}>, inputs_per_chunk=1, target_chunks={'lat': 1024, 'lon': 1024, 'time': 12}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=None, process_chunk=<function preproc at 0x7fc048a84670>, lock_timeout=None, subset_inputs={})"
]
},
"execution_count": 5,
@@ -257,7 +257,7 @@
{
"data": {
"text/plain": [
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 2, 'variable': 14}>, inputs_per_chunk=1, target_chunks={'lat': 1024, 'lon': 1024, 'time': 12}, target=FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fc048a8ef40>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpbo124muo'), input_cache=CacheFSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fc048a8ef40>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpf4qd_07g'), metadata_cache=MetadataTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fc048a8ef40>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmployas62r'), cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, fsspec_open_kwargs={}, process_input=None, process_chunk=<function preproc at 0x7fc048a84670>, lock_timeout=None, subset_inputs={})"
"XarrayZarrRecipe(file_pattern=<FilePattern {'time': 2, 'variable': 14}>, inputs_per_chunk=1, target_chunks={'lat': 1024, 'lon': 1024, 'time': 12}, target=FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fc048a8ef40>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpbo124muo'), input_cache=CacheFSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fc048a8ef40>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpf4qd_07g'), metadata_cache=MetadataTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7fc048a8ef40>, root_path='/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmployas62r'), cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, xarray_open_kwargs={}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=None, process_chunk=<function preproc at 0x7fc048a84670>, lock_timeout=None, subset_inputs={})"
]
},
"execution_count": 6,
27 changes: 24 additions & 3 deletions pangeo_forge_recipes/patterns.py
@@ -129,11 +129,32 @@ class FilePattern:
list.
:param combine_dims: A sequence of either concat or merge dimensions. The outer
product of the keys is used to generate the full list of file paths.
:param fsspec_open_kwargs: Extra options for opening the inputs with fsspec.
May include ``block_size``, ``username``, ``password``, etc.
:param query_string_secrets: If provided, these key/value pairs are appended to
the query string of each ``file_pattern`` url at runtime.
:param is_opendap: If True, assume all input fnames represent opendap endpoints.
Cannot be used with caching.
"""

def __init__(self, format_function: Callable, *combine_dims: CombineDim):
def __init__(
self,
format_function: Callable,
*combine_dims: CombineDim,
fsspec_open_kwargs: Optional[Dict[str, Any]] = None,
query_string_secrets: Optional[Dict[str, str]] = None,
is_opendap: bool = False,
):
self.format_function = format_function
self.combine_dims = combine_dims
self.fsspec_open_kwargs = fsspec_open_kwargs if fsspec_open_kwargs else {}
self.query_string_secrets = query_string_secrets if query_string_secrets else {}
self.is_opendap = is_opendap
if self.fsspec_open_kwargs and self.is_opendap:
raise ValueError(
"OPeNDAP inputs are not opened with `fsspec`. "
"`is_opendap` must be `False` when passing `fsspec_open_kwargs`."
)

def __repr__(self):
return f"<FilePattern {self.dims}>"
@@ -214,7 +235,7 @@ def items(self):
yield key, self[key]


def pattern_from_file_sequence(file_list, concat_dim, nitems_per_file=None):
def pattern_from_file_sequence(file_list, concat_dim, nitems_per_file=None, **kwargs):
"""Convenience function for creating a FilePattern from a list of files."""

keys = list(range(len(file_list)))
@@ -223,7 +244,7 @@ def pattern_from_file_sequence(file_list, concat_dim, nitems_per_file=None):
def format_function(**kwargs):
return file_list[kwargs[concat_dim]]

return FilePattern(format_function, concat)
return FilePattern(format_function, concat, **kwargs)
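
The effect of forwarding `**kwargs` can be sketched with a stand-in class that mirrors only the keyword handling shown in this diff. `FilePatternStub` and `pattern_from_file_sequence_stub` are hypothetical names used so the example runs without `pangeo_forge_recipes`, and a plain tuple replaces the `ConcatDim` object.

```python
from typing import Any, Callable, Dict, Optional


class FilePatternStub:
    """Stand-in mirroring only the keyword handling of FilePattern.__init__."""

    def __init__(
        self,
        format_function: Callable,
        *combine_dims: Any,
        fsspec_open_kwargs: Optional[Dict[str, Any]] = None,
        query_string_secrets: Optional[Dict[str, str]] = None,
        is_opendap: bool = False,
    ):
        self.format_function = format_function
        self.combine_dims = combine_dims
        self.fsspec_open_kwargs = fsspec_open_kwargs if fsspec_open_kwargs else {}
        self.query_string_secrets = query_string_secrets if query_string_secrets else {}
        self.is_opendap = is_opendap
        # OPeNDAP inputs are not opened with fsspec, so the two options conflict.
        if self.fsspec_open_kwargs and self.is_opendap:
            raise ValueError(
                "`is_opendap` must be `False` when passing `fsspec_open_kwargs`."
            )


def pattern_from_file_sequence_stub(file_list, concat_dim, **kwargs):
    # A tuple stands in for ConcatDim here.
    concat = (concat_dim, list(range(len(file_list))))

    # The inner parameter is named `keys` so it does not shadow the outer `kwargs`.
    def format_function(**keys):
        return file_list[keys[concat_dim]]

    return FilePatternStub(format_function, concat, **kwargs)


pattern = pattern_from_file_sequence_stub(
    ["a.nc", "b.nc"], "time", fsspec_open_kwargs={"block_size": 0}
)
print(pattern.fsspec_open_kwargs)
print(pattern.format_function(time=1))
```

Passing both `is_opendap=True` and a non-empty `fsspec_open_kwargs` through the convenience function raises the same `ValueError` as calling the constructor directly.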


def prune_pattern(fp: FilePattern, nkeep: int = 2) -> FilePattern: