Experiment: How HDFReferenceRecipe would look as a Beam Pipeline.
This is a prototype of using Apache Beam as the internal (and external?) data model of Pangeo Forge Recipes. Here, I demonstrate how HDFReferenceRecipe could be structured into modular components via composite Beam transforms.

xref: pangeo-forge#256
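
At a glance, the recipe's stages become a chain of named composite transforms. Below is a minimal sketch of the resulting pipeline shape, assuming pattern and rec are built as in the bundled example further down:

import apache_beam as beam
from pangeo_forge_recipes.patterns import OpenPattern, FileNames
from pangeo_forge_recipes.recipes.reference_hdf_zarr import ScanFiles, WriteZarrReference

# Equivalent to `p | rec.to_beam()`: each stage is a composite PTransform.
with beam.Pipeline() as p:
    (
        p
        | OpenPattern(pattern)      # emit (chunk_key, file_name) pairs
        | FileNames()               # keep only the file names
        | ScanFiles(rec)            # one HDF5 reference dict per file
        | WriteZarrReference(rec)   # gather into a list, combine, and write
    )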
alxmrs committed Apr 12, 2022
1 parent f64a654 commit 910f740
Showing 3 changed files with 96 additions and 8 deletions.
31 changes: 31 additions & 0 deletions examples/cmip6_ref_beam.py
@@ -0,0 +1,31 @@
import typing as t
import s3fs
import sys
import apache_beam as beam

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes.reference_hdf_zarr import HDFReferenceRecipe

BASE_PATH = 's3://esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/'


def run(pipeline_args: t.List[str]) -> None:
    # Define pattern
    fs = s3fs.S3FileSystem(anon=True)
    all_paths = fs.ls(BASE_PATH)
    pattern = pattern_from_file_sequence(['s3://' + path for path in all_paths], 'time')

    # Create Recipe
    rec = HDFReferenceRecipe(
        pattern,
        xarray_open_kwargs={"decode_coords": "all"},
        netcdf_storage_options={"anon": True}
    )

    with beam.Pipeline(argv=pipeline_args) as p:
        p | rec.to_beam()


if __name__ == '__main__':
    run(sys.argv[1:])
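
For context, pattern_from_file_sequence builds a FilePattern that maps a one-dimensional 'time' index onto the listed files, and iterating the pattern yields the (chunk key, file name) pairs that OpenPattern later turns into a PCollection. A minimal sketch, with hypothetical file names standing in for the S3 listing above:

from pangeo_forge_recipes.patterns import pattern_from_file_sequence

files = ['s3://bucket/thetao_0001.nc', 's3://bucket/thetao_0002.nc']  # hypothetical
pattern = pattern_from_file_sequence(files, 'time')
for key, fname in pattern.items():
    print(key, fname)  # one (chunk key, file name) pair per input file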

24 changes: 24 additions & 0 deletions pangeo_forge_recipes/patterns.py
@@ -1,6 +1,9 @@
"""
Filename / URL patterns.
"""
import dataclasses

import apache_beam as beam
import inspect
from dataclasses import dataclass, field, replace
from enum import Enum
@@ -216,6 +219,27 @@ def items(self):
             yield key, self[key]
 
 
+@dataclasses.dataclass
+class OpenPattern(beam.PTransform):
+
+    file_pattern: FilePattern
+
+    def expand(self, pcoll):
+        return pcoll | beam.Create(self.file_pattern.items())
+
+
+@dataclasses.dataclass
+class ChunkKeys(beam.PTransform):
+    def expand(self, pcoll):
+        return pcoll | beam.MapTuple(lambda key, fname: key)
+
+
+@dataclasses.dataclass
+class FileNames(beam.PTransform):
+    def expand(self, pcoll):
+        return pcoll | beam.MapTuple(lambda key, fname: fname)
+
+
 def pattern_from_file_sequence(file_list, concat_dim, nitems_per_file=None, **kwargs):
     """Convenience function for creating a FilePattern from a list of files."""
 
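These three transforms factor pattern traversal into reusable Beam building blocks: OpenPattern materializes the pattern's (chunk key, file name) pairs as a PCollection, while ChunkKeys and FileNames project out one half of each pair. A minimal sketch, assuming a pattern built as in the example above:

import apache_beam as beam
from pangeo_forge_recipes.patterns import OpenPattern, ChunkKeys, FileNames

with beam.Pipeline() as p:
    pairs = p | OpenPattern(pattern)  # PCollection of (chunk_key, file_name)
    keys = pairs | ChunkKeys()        # just the chunk keys
    names = pairs | FileNames()       # just the file names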
49 changes: 41 additions & 8 deletions pangeo_forge_recipes/recipes/reference_hdf_zarr.py
@@ -3,17 +3,18 @@
 import json
 import os
 from dataclasses import dataclass, field
-from typing import Hashable, Iterable, Optional
+from typing import Hashable, Iterable, Optional, Dict, List
 
+import apache_beam as beam
 import fsspec
 import yaml
 from fsspec_reference_maker.combine import MultiZarrToZarr
 
-from .base import BaseRecipe, FilePatternMixin
 from ..executors.base import Pipeline, Stage
-from ..patterns import Index
+from ..patterns import Index, OpenPattern, FileNames
 from ..reference import create_hdf5_reference, unstrip_protocol
 from ..storage import FSSpecTarget, MetadataTarget, file_opener
+from .base import BaseRecipe, FilePatternMixin
 
 ChunkKey = Index
 
@@ -26,18 +27,30 @@ def no_op(*_, **__) -> None:
 def scan_file(chunk_key: ChunkKey, config: HDFReferenceRecipe):
     assert config.metadata_cache is not None, "metadata_cache is required"
     fname = config.file_pattern[chunk_key]
+    reference = scan_file_pure(fname, config)
     ref_fname = os.path.basename(fname + ".json")
+    config.metadata_cache[ref_fname] = reference
+
+
+def scan_file_pure(fname: str, config: HDFReferenceRecipe) -> Dict:
     with file_opener(fname, **config.netcdf_storage_options) as fp:
         protocol = getattr(getattr(fp, "fs", None), "protocol", None)  # make mypy happy
         if protocol is None:
             raise ValueError("Couldn't determine protocol")
         target_url = unstrip_protocol(fname, protocol)
-        config.metadata_cache[ref_fname] = create_hdf5_reference(fp, target_url, fname)
+        return create_hdf5_reference(fp, url=target_url, fname=fname)
 
 
 def finalize(config: HDFReferenceRecipe):
     assert config.target is not None, "target is required"
     assert config.metadata_cache is not None, "metadata_cache is required"
+    files = list(
+        config.metadata_cache.getitems(list(config.metadata_cache.get_mapper())).values()
+    )  # returns dicts from remote
+    finalize_pure(files, config)
+
+
+def finalize_pure(files: List[Dict], config: HDFReferenceRecipe) -> None:
+    assert config.target is not None, "target is required"
     remote_protocol = fsspec.utils.get_protocol(next(config.file_pattern.items())[1])
     concat_args = config.xarray_concat_args.copy()
     if "dim" in concat_args:
@@ -47,9 +60,6 @@ def finalize(config: HDFReferenceRecipe):
         )
         concat_args["dim"] = config.file_pattern.concat_dims[0]  # there should only be one
 
-    files = list(
-        config.metadata_cache.getitems(list(config.metadata_cache.get_mapper())).values()
-    )  # returns dicts from remote
     if len(files) == 1:
         out = files[0]
     else:
@@ -100,6 +110,26 @@ def hdf_reference_recipe_compiler(recipe: HDFReferenceRecipe) -> Pipeline:
     return Pipeline(stages=stages, config=recipe)
 
 
+@dataclass
+class ScanFiles(beam.PTransform):
+    config: BaseRecipe
+
+    def expand(self, pcoll):
+        return pcoll | beam.Map(scan_file_pure, config=self.config)
+
+
+@dataclass
+class WriteZarrReference(beam.PTransform):
+    config: BaseRecipe
+
+    def expand(self, pcoll):
+        return (
+            pcoll
+            | beam.combiners.ToList()
+            | beam.Map(finalize_pure, config=self.config)
+        )
+
+
 @dataclass
 class HDFReferenceRecipe(BaseRecipe, FilePatternMixin):
     """
@@ -168,3 +198,6 @@ def _validate_file_pattern(self):
 
     def iter_inputs(self) -> Iterable[Hashable]:
         yield from self.file_pattern
+
+    def to_beam(self):
+        return OpenPattern(self.file_pattern) | FileNames() | ScanFiles(self) | WriteZarrReference(self)
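
Because scan_file_pure and finalize_pure no longer touch the metadata cache, the same logic can be run serially outside Beam, e.g. in a unit test. A minimal sketch, assuming rec is a fully configured HDFReferenceRecipe with its target storage set:

# Serial equivalent of OpenPattern | FileNames | ScanFiles | WriteZarrReference.
refs = [scan_file_pure(fname, rec) for _, fname in rec.file_pattern.items()]
finalize_pure(refs, rec)  # combine the per-file references and write the target

Note that WriteZarrReference uses beam.combiners.ToList(), a global combine: every per-file reference dict is funneled onto a single worker so that finalize_pure runs exactly once over the complete list, mirroring the single finalize stage of the existing serial executor.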
