Experiment: How HDFReferenceRecipe could look as a Beam Pipeline. #337

Closed
wants to merge 2 commits into from
Binary file removed docs/images/.DS_Store
Binary file not shown.
31 changes: 31 additions & 0 deletions examples/cmip6_ref_beam.py
@@ -0,0 +1,31 @@
import sys
import typing as t

import apache_beam as beam
import s3fs

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes.reference_hdf_zarr import HDFReferenceRecipe

BASE_PATH = 's3://esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/'


def run(pipeline_args: t.List[str]) -> None:
# Define pattern
fs = s3fs.S3FileSystem(anon=True)
all_paths = fs.ls(BASE_PATH)
pattern = pattern_from_file_sequence(['s3://' + path for path in all_paths], 'time')

# Create Recipe
rec = HDFReferenceRecipe(
pattern,
xarray_open_kwargs={"decode_coords": "all"},
netcdf_storage_options={"anon": True}
)

with beam.Pipeline(argv=pipeline_args) as p:
p | rec.to_beam()


if __name__ == '__main__':
run(sys.argv[1:])
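A note on invocation: run() passes sys.argv[1:] straight through to beam.Pipeline(argv=...), so any standard Beam pipeline options apply. A minimal sketch of a local run, assuming the DirectRunner (the PR itself does not pin a runner):

# Hypothetical local invocation, equivalent to running:
#   python examples/cmip6_ref_beam.py --runner=DirectRunner
# `--runner` is a standard Beam pipeline option, not something this PR defines.
from examples.cmip6_ref_beam import run  # assumes examples/ is importable

run(["--runner=DirectRunner"])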

24 changes: 24 additions & 0 deletions pangeo_forge_recipes/patterns.py
@@ -1,6 +1,9 @@
"""
Filename / URL patterns.
"""
import dataclasses

import apache_beam as beam
import inspect
import warnings
from dataclasses import dataclass, field, replace
@@ -250,6 +253,27 @@ def items(self):
yield key, self[key]


@dataclasses.dataclass
class OpenPattern(beam.PTransform):
    """Create a PCollection of (chunk_key, filename) pairs from a FilePattern."""

    file_pattern: FilePattern

    def expand(self, pcoll):
        return pcoll | beam.Create(self.file_pattern.items())


@dataclasses.dataclass
class ChunkKeys(beam.PTransform):
    """Reduce (chunk_key, filename) pairs to just the chunk keys."""

    def expand(self, pcoll):
        return pcoll | beam.MapTuple(lambda key, fname: key)


@dataclasses.dataclass
class FileNames(beam.PTransform):
    """Reduce (chunk_key, filename) pairs to just the filenames."""

    def expand(self, pcoll):
        return pcoll | beam.MapTuple(lambda key, fname: fname)


def pattern_from_file_sequence(file_list, concat_dim, nitems_per_file=None, **kwargs):
"""Convenience function for creating a FilePattern from a list of files."""

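To illustrate the new pattern transforms in isolation, here is a minimal sketch (the S3 paths and the two-file pattern are hypothetical, chosen only for illustration) of how OpenPattern fans a FilePattern out into keyed pairs that ChunkKeys and FileNames then project:

import apache_beam as beam

from pangeo_forge_recipes.patterns import (
    ChunkKeys,
    FileNames,
    OpenPattern,
    pattern_from_file_sequence,
)

# Hypothetical two-file pattern, concatenated along "time".
pattern = pattern_from_file_sequence(
    ["s3://bucket/a.nc", "s3://bucket/b.nc"], "time"
)

with beam.Pipeline() as p:
    pairs = p | OpenPattern(pattern)  # PCollection of (chunk_key, filename)
    keys = pairs | ChunkKeys()        # just the Index keys
    names = pairs | FileNames()       # just the filename strings
    names | beam.Map(print)           # e.g. prints "s3://bucket/a.nc"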
47 changes: 40 additions & 7 deletions pangeo_forge_recipes/recipes/reference_hdf_zarr.py
@@ -3,17 +3,18 @@
import json
import os
from dataclasses import dataclass, field
from typing import Callable, Hashable, Iterable, Optional
from typing import Callable, Hashable, Iterable, Optional, Dict, List

import apache_beam as beam
import fsspec
import yaml
from kerchunk.combine import MultiZarrToZarr

from .base import BaseRecipe, FilePatternMixin, StorageMixin
from ..executors.base import Pipeline, Stage
from ..patterns import FileType, Index
from ..patterns import FileType, Index, OpenPattern, FileNames
from ..reference import create_hdf5_reference, unstrip_protocol
from ..storage import file_opener
from .base import BaseRecipe, FilePatternMixin, StorageMixin

ChunkKey = Index

@@ -26,25 +27,34 @@ def no_op(*_, **__) -> None:
def scan_file(chunk_key: ChunkKey, config: HDFReferenceRecipe):
assert config.storage_config.metadata is not None, "metadata_cache is required"
fname = config.file_pattern[chunk_key]
reference = scan_file_pure(fname, config)
ref_fname = os.path.basename(fname + ".json")
config.storage_config.metadata[ref_fname] = reference


def scan_file_pure(fname: str, config: HDFReferenceRecipe) -> Dict:
with file_opener(fname, **config.netcdf_storage_options) as fp:
protocol = getattr(getattr(fp, "fs", None), "protocol", None) # make mypy happy
if protocol is None:
raise ValueError("Couldn't determine protocol")
target_url = unstrip_protocol(fname, protocol)
config.storage_config.metadata[ref_fname] = create_hdf5_reference(fp, target_url, fname)
return create_hdf5_reference(fp, target_url, fname)


def finalize(config: HDFReferenceRecipe):
assert config.storage_config.target is not None, "target is required"
assert config.storage_config.metadata is not None, "metadata_cache is required"
remote_protocol = fsspec.utils.get_protocol(next(config.file_pattern.items())[1])

files = list(
config.storage_config.metadata.getitems(
list(config.storage_config.metadata.get_mapper())
).values()
) # returns dicts from remote
finalize_pure(files, config)


def finalize_pure(files: List[Dict], config: HDFReferenceRecipe) -> None:
assert config.storage_config.target is not None, "target is required"
remote_protocol = fsspec.utils.get_protocol(next(config.file_pattern.items())[1])

if len(files) == 1:
out = files[0]
else:
@@ -99,6 +109,26 @@ def hdf_reference_recipe_compiler(recipe: HDFReferenceRecipe) -> Pipeline:
return Pipeline(stages=stages, config=recipe)


@dataclass
class ScanFiles(beam.PTransform):
    """Scan each input file and emit a kerchunk reference dict."""

    config: BaseRecipe

    def expand(self, pcoll):
        return pcoll | beam.Map(scan_file_pure, config=self.config)


@dataclass
class WriteZarrReference(beam.PTransform):
    """Collect all per-file references and write the combined Zarr reference."""

    config: BaseRecipe

    def expand(self, pcoll):
        return (
            pcoll
            | beam.combiners.ToList()
            | beam.Map(finalize_pure, config=self.config)
        )


@dataclass
class HDFReferenceRecipe(BaseRecipe, StorageMixin, FilePatternMixin):
"""
@@ -179,3 +209,6 @@ def _validate_file_pattern(self):

def iter_inputs(self) -> Iterable[Hashable]:
yield from self.file_pattern

def to_beam(self):
return OpenPattern(self.file_pattern) | FileNames() | ScanFiles(self) | WriteZarrReference(self)
Comment on lines +213 to +214
Contributor
@martindurant - this is the part that is easy to read! 🤩
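As a closing illustration of that readability: the one-line to_beam() above is equivalent to chaining the four new transforms by hand via PTransform.__or__. A minimal sketch, with hypothetical input paths and imports assumed from the modules shown in this diff:

import apache_beam as beam

from pangeo_forge_recipes.patterns import (
    FileNames,
    OpenPattern,
    pattern_from_file_sequence,
)
from pangeo_forge_recipes.recipes.reference_hdf_zarr import (
    HDFReferenceRecipe,
    ScanFiles,
    WriteZarrReference,
)

# Hypothetical inputs; a real run also needs storage configured for the
# recipe's target, as in the CMIP6 example above.
pattern = pattern_from_file_sequence(
    ["s3://bucket/a.nc", "s3://bucket/b.nc"], "time"
)
rec = HDFReferenceRecipe(pattern, netcdf_storage_options={"anon": True})

# What `rec.to_beam()` composes, written out step by step:
transform = (
    OpenPattern(rec.file_pattern)
    | FileNames()              # drop chunk keys, keep filenames
    | ScanFiles(rec)           # one kerchunk reference dict per file
    | WriteZarrReference(rec)  # gather, combine, and write the reference
)

with beam.Pipeline() as p:
    p | transform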