Should we just adopt xarray-beam as our internal data model? #256

Open
rabernat opened this issue Dec 18, 2021 · 20 comments

Comments

@rabernat
Contributor

rabernat commented Dec 18, 2021

TL;DR: what the title says

background

This team should feel very proud of the work we've done on this package over the past few months. We have shipped dozens of real recipes (albeit without any permanent infrastructure to deploy them on) and learned lots of important lessons in the process.

At the end of a long haul of work on this package, I'd like to zoom out and offer some reflections on our direction in terms of software architecture. I feel like we (specifically, I) have flailed around a bit in terms of the core internals of the Recipe model. Examples of big refactorings:

In the meantime, we have worked on many orthogonal but also important aspects, mostly around I/O. We have also waited months 😡 for Columbia to release our free credits from Prefect Cloud to actually deploy our primary bakery, which has given us plenty of time to contemplate our Prefect dependency.

In #245 (comment), based on my experience trying to generalize the input layer of pangeo-forge-recipes, I am basically suggesting we move to a much more general / flexible model for recipes, where we simply chain stages together. Something like

recipe = Recipe(source, transformation, destination)

or in beam-style:

recipe = source | transformation | destination

This, coupled with a one-way flow of data between these stages, would basically inevitably look a lot like xarray-beam.
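To make the `source | transformation | destination` idea concrete, here is a minimal pure-Python sketch of pipe-style chaining. The `Stage` class and the three example stages are hypothetical; they are not the pangeo-forge-recipes or Beam API, and only illustrate one-way data flow between chained stages.

```python
from typing import Callable, Iterable, Iterator


class Stage:
    """Hypothetical stage: wraps a function from one iterable to another."""

    def __init__(self, func: Callable[[Iterable], Iterable]):
        self.func = func

    def __or__(self, other: "Stage") -> "Stage":
        # `a | b` builds a new stage that feeds a's output into b.
        return Stage(lambda items: other.func(self.func(items)))

    def run(self, items: Iterable = ()) -> list:
        return list(self.func(items))


# Stand-ins for source, transformation, and destination.
source = Stage(lambda _: iter(["a.nc", "b.nc", "c.nc"]))
transformation = Stage(lambda paths: (p.upper() for p in paths))
written = []


def _write(items: Iterable) -> Iterator:
    for item in items:
        written.append(item)  # pretend to write to the target store
        yield item


destination = Stage(_write)

recipe = source | transformation | destination
result = recipe.run()
```

Because each stage only sees the output of the previous one, any stage can be swapped out without touching the others, which is the modularity this proposal is after.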

We have already danced around various integrations with beam, for example:

I'd like to discuss the pros and cons of incorporating xarray-beam more tightly into pangeo-forge-recipes.

What is xarray-beam?

Xarray-beam's data model defines a convention for "keys", which are more-or-less equivalent to our Indexes. These keys can be used in the context of general Beam Pipelines which pass around xarray datasets. In contrast to pangeo-forge-recipes, there are not a lot of other custom objects. The API has just a few functions for reading, writing, rechunking, aggregating, etc.

A high level difference in the evolution of the two packages is that we kind of went bottom-up, starting from a very specific use case (many netcdfs to zarr) and are gradually trying to generalize, while they started as a much more general framework. So maybe now is the time to align more closely.

Some possible ways we could do this are...

Option A: Recipes ➡️ Beam Pipelines

This is the most "all-in" approach. We simply say that our Recipes are Beam Pipelines and build them using xarray-beam. We keep our input / file-pattern stuff and basically throw away a good portion of our code. (We still keep everything related to opening files / datasets, which is transferable.)

Pros

  • We immediately gain more flexibility in terms of recipe structure. For example, Support for more than one ConcatDim #140 becomes trivial.
  • Less to maintain; we leverage the very mature Beam ecosystem for our executors (Runners as they call them)

Cons

  • By committing to the xarray-beam (or at least Beam) data model, we lose some flexibility. How would the HDFReference recipe look in this context? Are we certain the Beam data model can support all of our needs?
  • We have to rewrite lots of code
  • We may face serialization challenges
  • Storage (things like metadata cache) would likely have to be refactored / redesigned completely
  • We commit to only deploying on Beam, foregoing Prefect and Dask. This is the big one.
  • We have to learn about Beam
  • We have to refactor all the bakeries to use Beam instead of Prefect

Questions

  • Can each "offset" of an xarray_beam.Key point to a dataset of different length along that dimension? Like if there are a different number of timesteps in each file?
  • Can the key accommodate our concept of MergeDim (maybe not based on this code)?

Option B: Adopt parts of the xarray-beam data model but continue to use an adaptor

We continue to maintain a separate Pipeline object internally, but adopt more of the xarray-beam-like semantics, such as Keys for each stage. We use this to allow arbitrary stages to be chained together using beam-like operations such as map.

Pros

  • We get more flexible recipes
  • We get to keep our Prefect and Dask executors

Cons

  • Our Pipeline object gets more complicated and harder to maintain
  • We end up basically reimplementing big parts of Beam

Option C: Do nothing

We continue more-or-less ignoring xarray-beam and doing our own thing, possibly going down architectural rabbit holes that have presumably been solved already.


I have enumerated a lot more cons for switching to Beam, but I am still kind of thinking it's the right option.

Welcoming thoughts from everyone.

@TomAugspurger
Contributor

My main two questions would be

  1. Will this make developing / testing scripts harder? (probably not, since it's our internal data model, so we can hopefully retain the regular Python functions API). And will it make debugging a running workflow harder? (What's the logging story around beam)
  2. How easy is Beam to deploy outside of GCP?

I will say that having the option to run a recipe using Dask has been helpful, since I've been able to manually run recipes wherever I happen to have a dask cluster. But as bakeries get more attention that will be less important.

@alxmrs
Contributor

alxmrs commented Jan 4, 2022

I'll write a more detailed response soon, but I will answer this question now:

Can the key accommodate our concept of MergeDim

Yes, keys can accommodate MergeDims. An in progress solution for this can be found here: google/xarray-beam#39

@alxmrs
Contributor

alxmrs commented Jan 4, 2022

Will this make developing / testing scripts harder? (probably not, since it's our internal data model, so we can hopefully retain the regular Python functions API). And will it make debugging a running workflow harder? (What's the logging story around beam)

Probably not, also because Beam pipelines can be run locally with the direct runner (or in Jupyter notebooks) and can be run in unit tests. Beam's Python SDK uses the Python logging module. I use PyCharm as my IDE, and I've used its flavor of the Python debugger to step into Beam steps (transformations).

How easy is Beam to deploy outside of GCP?

I don't have direct experience with this, and will have to investigate further. However, Beam pipelines "compile" onto Apache Spark environments. Thus, anywhere Spark can run (AWS, Azure, GCP, Databricks' offering, HPC), so can Beam pipelines. From what I've read, Beam is described as a high level API for agnostic MapReduce environments. Again, I will do some research to find out how easy the dev ops are in practice.

@shoyer

shoyer commented Jan 4, 2022

  • Can each "offset" of an xarray_beam.Key point to a dataset of different length along that dimension? Like if there are a different number of timesteps in each file?

The model I picked for distributed data in xarray-beam is similar to dask.array. You can have datasets of different size, but offsets are integer offsets from the origin, rather than index labels like in dask.dataframe. If you don't know dataset sizes ahead of time, you would need to calculate the information before creating Key objects (which are mostly needed for writing to Zarr or rechunking). Alternative data models are definitely possible, but this seemed like the most straightforward to me.

Is there a particular example you are worried about?
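As a rough illustration of the integer-offset model described above, the sketch below computes offsets from per-file lengths that are assumed to be known ahead of time. The file names and lengths are made up, and plain dicts stand in for key objects; this is not the `xarray_beam.Key` API.

```python
from itertools import accumulate

# Hypothetical number of time steps found in each input file.
lengths = {"jan.nc": 31, "feb.nc": 28, "mar.nc": 31}

# Offsets are integer positions from the origin, so each file's offset
# is the total length of all files before it.
starts = [0] + list(accumulate(lengths.values()))[:-1]
offsets = dict(zip(lengths, starts))

# Plain dicts standing in for key objects; these are not xarray_beam.Key.
keys = [{"time": offset} for offset in offsets.values()]
```

The point of the sketch is that files of different lengths are fine, but the lengths have to be collected before any key can be created.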

  • Can the key accommodate our concept of MergeDim (maybe not based on this code)?

I think this would be straightforward to implement.

@alxmrs
Contributor

alxmrs commented Feb 1, 2022

Hey @rabernat @cisaacstern, @sharkinsspatial and @TomAugspurger: How about we schedule a meeting to discuss execution engines and their tradeoffs?

@rabernat
Contributor Author

rabernat commented Feb 1, 2022

@alxmrs - thanks for keeping alive this important issue. I didn't mean to shut down the discussion at today's meeting. I'm just hyper focused on getting actually launched atm. But it would actually be great if you (Alex) could keep moving this design discussion and prototyping forward in parallel to the deployment work.

Rather than scheduling a meeting right now, let's see if we can iterate a bit asynchronously, using this issue for discussion.

Let me kick things off by suggesting that we are convolving two separate but related questions:

  1. What should be the internal data model of pangeo_forge_recipes?

    Current status: our internal data model is that recipe classes must have a ._compiler ClassVar which knows how to translate the recipe into a Pipeline object. That is very flexible (every recipe class can implement whatever compiler it wants), at the expense of modularity. There is no way to mix or combine elements from different recipe classes. Therefore, a central goal of the refactor is to support more modularity and component reuse both internally and by end users, as in the pseudocode in my original post. There is also a desire to avoid so many side effects, instead passing data explicitly between stages, which would support the modularity goal.

  2. What executors should be supported?

    We don't want to actually implement our own distributed processing engine. Instead, we "compile" the recipes to other execution frameworks. We currently support vanilla Python, Dask, Prefect, and Beam. Ideally we would grow this to include as many suitable frameworks as possible. (Dagster looks pretty interesting.) The challenge is that each framework has different feature sets. So to some degree, element 1 (internal data model) is coupled to element 2. Selecting one single supported executor (e.g. Beam) would effectively collapse 1 and 2 to the same data model, eliminating the need for a compiler at all.
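One way to picture the "compile" step is a recipe that is only a description (a list of stages), plus one small compiler per execution style. The sketch below is a simplified illustration under that assumption; `Stage`, `compile_to_function`, and `compile_to_generator` are hypothetical names and do not reproduce the actual pangeo-forge-recipes internals.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List


@dataclass
class Stage:
    """Hypothetical stage description: a name and a per-item function."""
    name: str
    function: Callable[[int], int]


def compile_to_function(stages: List[Stage]) -> Callable[[List[int]], List[int]]:
    """Compile to a single function that runs every stage in order."""
    def run(items: List[int]) -> List[int]:
        for stage in stages:
            items = [stage.function(item) for item in items]
        return items
    return run


def compile_to_generator(stages: List[Stage], items: List[int]) -> Iterator[tuple]:
    """Compile to a generator of (stage name, result) steps, which an
    external scheduler could consume one step at a time."""
    for stage in stages:
        items = [stage.function(item) for item in items]
        yield stage.name, items


stages = [Stage("double", lambda x: 2 * x), Stage("increment", lambda x: x + 1)]
run = compile_to_function(stages)
steps = list(compile_to_generator(stages, [1, 2, 3]))
```

Each additional framework would need its own compiler of this kind, which is where differing feature sets start to constrain the shared description.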

As a specific example of these tradeoffs, let's look at how Prefect does mapping. A Prefect task can be either a regular task or a mapped task. Many mapped tasks can be chained together. However, chained maps must be the same length. There is no mechanism for aggregation or grouping of tasks, as there is in Beam (and also possibly Dask, via blockwise). If we wanted to move to a model where data is actually passed between tasks (rather than written to disk via side effects), that means that each stage of a Pangeo Forge Pipeline in Prefect would need to use the same number of tasks. For the typical NetCDF to Zarr workflow, assuming that the first layer contains one file per task, this would imply inputs_per_chunk=1 and subset_inputs=None; i.e. once the chunking is set by one stage, we cannot change it.


A specific proposal for investigation: what if we adopt xarray-beam's general concept where every stage can be thought of as a generator which yields a key, value pair? The flow could look something like this

FilePattern -> (key, str) -> Cache -> (key, fsspec.File) -> XarrayOpener -> (key, xr.Dataset) -> ZarrWriter
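A minimal sketch of this flow with plain Python generators is shown below. Everything in it is a stand-in: the URLs are made up, a tuple plays the role of the key, and a dict plays the role of an `xr.Dataset`. It only shows how each stage consumes and yields `(key, value)` pairs.

```python
from typing import Dict, Iterator, Tuple

Key = Tuple[int, ...]  # hypothetical stand-in for an Index / key object


def file_pattern() -> Iterator[Tuple[Key, str]]:
    for i in range(3):
        yield (i,), f"https://example.com/data_{i}.nc"  # made-up URLs


def cache(items: Iterator[Tuple[Key, str]]) -> Iterator[Tuple[Key, str]]:
    for key, url in items:
        yield key, url.rsplit("/", 1)[-1]  # pretend local copy of the file


def opener(items: Iterator[Tuple[Key, str]]) -> Iterator[Tuple[Key, dict]]:
    for key, path in items:
        # A dict stands in for an xr.Dataset in this sketch.
        yield key, {"source": path, "time": [key[0]]}


def writer(items: Iterator[Tuple[Key, dict]], store: Dict[Key, dict]) -> None:
    for key, dataset in items:
        store[key] = dataset  # the key says where this piece belongs


store: Dict[Key, dict] = {}
writer(opener(cache(file_pattern())), store)
```

Note that the key passes through every stage unchanged here, which only works while each stage is one-to-one.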

Some questions that this raises:

  1. What type of key would be generic enough to support pangeo forge's use cases? The first stage, for example, has no arrays yet. I think our current Index object is almost there already.
  2. How would we support stages that change the number of items in the iteration? Beam can do this via groupby or other fanciness that I don't fully understand.
  3. How would we handle reductions / side inputs? For example, ZarrWriter would need a "template" for the dataset as "side input". For unevenly sized arrays, the only way to get this is to do a reduction on all the input shapes. We are currently using cache_metadata for this. But we could instead imagine something like the following (leaving out cache for brevity)
FilePattern -> (key, str) -> XarrayOpener -> (key, xr.Dataset) -> ZarrWriter
                                                     |               ^
                                               reduce schema --------|

These side inputs would possibly be able to eliminate our reliance on a metadata cache.
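The sketch below illustrates the "reduce schema, then pass it as a side input" idea in plain Python, assuming unevenly sized inputs. The data, `combine_schema`, and `write` are hypothetical; lists stand in for datasets and for the Zarr target.

```python
from functools import reduce
from typing import Dict, List, Tuple

# Hypothetical (key, dataset) pairs; each "dataset" is just a list of
# time steps, and the inputs have uneven lengths.
opened: List[Tuple[int, List[float]]] = [
    (0, [0.0, 1.0, 2.0]),
    (1, [3.0, 4.0]),
    (2, [5.0, 6.0, 7.0, 8.0]),
]


def combine_schema(schema: Dict[int, int], item: Tuple[int, List[float]]) -> Dict[int, int]:
    """Reduction step: record the length contributed by each key."""
    key, data = item
    return {**schema, key: len(data)}


# The reduction runs over all inputs before any write happens.
schema = reduce(combine_schema, opened, {})


def write(item, target: List[float], schema: Dict[int, int]) -> None:
    """Write one item, using the schema (the side input) to find its offset."""
    key, data = item
    start = sum(length for k, length in schema.items() if k < key)
    target[start:start + len(data)] = data


# The schema also sizes the "template" for the full target array.
target = [float("nan")] * sum(schema.values())
for item in reversed(opened):  # write order does not matter
    write(item, target, schema)
```

In this arrangement the schema lives only in memory for the duration of the run, which is what would make a separate metadata cache unnecessary.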


The ideal outcome of this brainstorming would be a well-crafted design document for a new internal data model. Then we could get to work on actually performing this refactoring while maintaining backwards compatibility in terms of all of the real-world use cases we already support.

@alxmrs
Contributor

alxmrs commented Feb 3, 2022

Hey Ryan,

Thanks for reframing the problem in this way. This is a great write up! It certainly gives me a lot to think about... I will start thinking about the internal PGF data model. I'll take my time, since the project is focused on the first launch anyway.

@rabernat rabernat pinned this issue Mar 26, 2022
@rabernat
Contributor Author

rabernat commented Mar 26, 2022

I recently learned about lithops

Lithops is a Python multi-cloud serverless computing framework. It allows to run unmodified local python code at massive scale in the main serverless computing platforms.

Based on reading through the documentation, it looks like pangeo-forge today is already extremely compatible with the lithops execution model (does not support much inter-process communication).

Lithops supports nearly every cloud's serverless offering. There would be some big advantages in moving to a serverless execution framework. We should consider the potential pros and cons vs. the apache beam idea.

@alxmrs
Contributor

alxmrs commented Apr 12, 2022

The ideal outcome of this brainstorming would be a well-crafted design document for a new internal data model.

I’m more actively working on this specific design doc. For now, I’d like to respond to open concerns listed in this thread so far.

We commit to only deploying on Beam, foregoing Prefect and Dask.

In theory, we could implement Prefect and Dask as Beam Runners (e.g. have adapter layers between Beam and, say, Prefect).

In fact, the Beam Runner docs suggest that Beam’s internal data model is similar to Pangeo’s, at least in goal. Beam in essence includes a compiler from dataflow description graph onto execution primitives.

We don't want to actually implement our own distributed processing engine. Instead, we "compile" the recipes to other execution frameworks.

Switching to Beam could mean reusing their compiler architecture.

Ideally we would grow this to include as many suitable frameworks as possible. … The challenge is that each framework has different feature sets.

An early section of the Beam Runner docs mentions that the framework is flexible here: “You don’t have to do it all at once, and there may even be features that don’t make sense for your runner to ever support.” The standard practice is to throw an error if the runner receives a pipeline that it cannot execute. This suggests that one could build an MVP Beam runner and iterate.

It would be interesting to investigate how feasible it would be to implement a Prefect runner for a Beam pipeline. The mapping docs you link lead me to believe that this would be totally tractable.

[Responses concerning how this could be implemented with Prefect]

However, chained maps must be the same length.

There is no mechanism for aggregation or grouping of tasks, as there is in Beam

Sorry, can you point out where in the docs it mentions this? Maybe I’m misunderstanding their conception of a “Task”. Prefect seems to offer reduce and flatten operations. With flatten, iterables between maps can vary in length. With reduce, iterables can be aggregated (for example, reducing an iterable into another collection, like a dictionary, seems possible). Prefect does mention an optimization about invoking multiple maps in succession over the same iterable. However, I don’t think a map-reduce-map sequence is disallowed. Wouldn’t these be sufficient for aggregation, grouping, and working with data of variable length?
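To show the shape of computation being discussed (map, flatten, reduce into a dictionary, then map again), here is a pure-Python sketch. It deliberately does not use Prefect's API; the inputs and the `split` function are hypothetical, and the only claim is that these four primitives together allow stages of different lengths.

```python
from collections import defaultdict
from itertools import chain

files = ["a", "b", "c"]  # hypothetical inputs


def split(name):
    """Map step whose output length varies per input."""
    return [f"{name}{i}" for i in range(ord(name) - ord("a") + 1)]


mapped = [split(name) for name in files]          # 3 items, lengths 1, 2, 3
flattened = list(chain.from_iterable(mapped))     # 6 items

# Reduce: aggregate the flat iterable into a dictionary of groups.
groups = defaultdict(list)
for piece in flattened:
    groups[piece[-1]].append(piece)

# A second map over the groups, whose length differs from the first map.
summary = [(index, len(members)) for index, members in sorted(groups.items())]
```

Whether a given orchestrator can express each of these steps as separate tasks is the open question; the sketch only fixes what would need to be expressed.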

From reviewing the linked page, it sounds like Prefect includes all the same primitives that Beam offers – check out their guide on implementing primitives (it’s worth pointing out, we could implement this in Python only).

I’m not necessarily advocating for Pangeo Forge to maintain a Beam adapter to Prefect. I want to establish that a mapping is possible. As mentioned in the PGF biweekly meetings, it should also be possible to create a runner from Beam to Dask, which I think would be way more fruitful.

A core benefit to adopting the Beam model is that PGF would gain a lot of modularity, towards this point:

There is no way to mix or combine elements from different recipe classes. Therefore, a central goal of the refactor is to support more modularity and component reuse both internally and by end users, as in the pseudocode in my original post. There is also a desire to avoid so many side effects, instead passing data explicitly between stages, which would support the modularity goal.

One way this could be achieved is if Pangeo-Forge Recipes became a library of Composite Transforms that interacted with each other, likely via shared PCollection interfaces. XArray-Beam does provide one model for such an interface that’s oriented around XArray Datasets. However, Pangeo-Forge could provide other models for other recipe types, like the HDFReference recipe. Or, it’s likely for this case that the Beam primitives would be sufficient.

How would we handle reductions / side inputs?

Beam Transformations include the capacity for adding side inputs. You can extract the value from a reduction and pass it into a side input, like so.

Lithops vs Beam

I agree with you that Pangeo-Forge today is compatible with Lithops. Here are a few pros vs cons about doubling down on the adaptor approach vs switching to Beam:

  • A fundamental question that Pangeo-Forge needs to reckon with is: should it be oriented towards workflow orchestration, or towards dealing with streams of data?
  • Lithops + Pangeo-Forge today is oriented towards Task orchestration. Here, functions are executed in a dependency graph of functions that have no knowledge of data. This is orthogonal to how Beam operates.
    • Task orchestration will be less computationally efficient than a dataflow model of programming. Without knowledge of the data, it’s hard to autoscale up more compute resources to process tasks faster. There are workarounds to this, of course, but I doubt they are as succinct as adopting a data-oriented approach. Further, a task-based approach requires the production and management of side effects, and pre-requisite stages of computation. In data-aware pipelines, side effects are managed by the framework; developers program in terms of pure functions instead of dealing with IO + caches, and execution is often more performant since operations can be fused together.
    • Task orchestration frameworks generally struggle to operate on streams of data. Data-flow systems typically are better suited to handle these cases. The code change to transition from a batch to streaming pipeline in Beam is minimal – and can still deploy on any SDK that supports streaming operations.
  • Is the end goal of Pangeo-Forge to be deployed anywhere? As far as I am aware, every major cloud provider has the capability to host infrastructure to run Beam pipelines, if not with managed services [e.g. AWS Kinesis (Apache Flink), AWS Elastic Map Reduce (Apache Spark)].
  • What is the definition of serverless in this context? I don’t think Beam uses that terminology, however, the SDKs where Beam is deployed often make use of containerization and autoscaling to use compute resources as efficiently as possible.

alxmrs added a commit to alxmrs/pangeo-forge-recipes that referenced this issue Apr 12, 2022
This is a prototype for using Apache Beam for the internal (and external?) data model of Pangeo Forge Recipes. Here, I demo how HDFReferenceRecipe could be structured into modular components via composite Beam transforms.

xref: pangeo-forge#256
@rabernat
Contributor Author

FYI Beam now supports Python 3.9 on both Pip and Conda. 🎉

@rabernat
Contributor Author

Here is a little experiment I did tonight in preparation for tomorrow's sprint: https://gist.github.com/rabernat/15f77fb447e2cdbc73c4031c59768886

@cisaacstern
Member

I've just successfully run a Beam Pipeline as a Dataflow job, deployed from a Jupyter Notebook. The pipeline is the official wordcount example. Notes:

  • Running this yourself will require running gcloud auth login --cred-file=$CRED_FILE where CRED_FILE is a json credential for the pangeo-forge-4967 project with Dataflow scope.
  • As noted inline below, we need to specify use_public_ips=False because Columbia GKE VMs can't have external IPs. Use NAT instead? pangeo-forge-gcs-bakery#29. Currently, our only private network is set up in us-central1, so that is the only region we can run these jobs in at the moment.
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""
    def process(self, element):
        """Return the list of words in this element.

        Args:
          element: a line of text.
        Returns:
          A list of the words found in the line (empty for a blank line).
        """
        return re.findall(r'[\w\']+', element, re.UNICODE)
    

input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://beam-dataflow-test/counts.txt'

beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='pangeo-forge-4967',
    # Note: manually increment the `job_name` for each run.
    # In production, this will be handled programmatically.
    job_name='wordcount-example-0',
    temp_location='gs://beam-dataflow-test/temp',
    # Our institutional GCP policies forbid public IPs, xref:
    #   - https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking
    #   - https://github.com/pangeo-forge/pangeo-forge-gcs-bakery/issues/29
    use_public_ips=False,
    # The private network we've already set up:
    #   - https://github.com/pangeo-forge/pangeo-forge-gcs-bakery/pull/30
    # is in us-central1, so that's our only option unless/until we establish private networks elsewhere. xref:
    #   - https://console.cloud.google.com/networking/networks/details/default?project=pangeo-forge-4967&pageTab=SUBNETS
    region='us-central1',
)


with beam.Pipeline(options=beam_options) as p:

    # Read the text file[pattern] into a PCollection.
    lines = p | 'Read' >> ReadFromText(input_file)

    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings.
    def format_result(word, count):
        return '%s: %d' % (word, count)

    output = counts | 'Format' >> beam.MapTuple(format_result)

    # Write the output using a "Write" transform that has side effects.
    output | 'Write' >> WriteToText(output_path)

@sharkinsspatial
Contributor

I'm in the process of preparing a separate issue discussing the multi-cloud bakery story for Beam pipeline runners but I have one initial question here. Do we want to aim for consistency across all platforms for our pipeline runner (Flink on k8s) or will we want to leverage Dataflow on GCP? I would lean towards a consistent approach across cloud providers so we can leverage the Flink monitoring API for integration with pangeo-forge-orchestrator but I'm interested to hear thoughts from others.

@cisaacstern
Member

Very interesting question, @sharkinsspatial. Personally, I would favor an incremental approach: Dataflow on GCP to begin, which allows us to de-risk and debug the Beam Pipelines themselves, with presumably minimal concerns regarding infrastructure. And then a subsequent phase in which we migrate to the more generalized, platform-agnostic implementation. My concern with going all-in on Flink to start is that too many things will be changing all at once (data model and infrastructure, instead of data model then infrastructure). Though I can also see the counter-argument that this incremental approach means accruing technical debt by working towards an approach which we may likely abandon before too long.

@rabernat
Contributor Author

rabernat commented Jun 7, 2022

In order to kickstart the discussion of implementing a Dask Beam runner, I propose we meet during the week of June 13-17. I have created a When2Meet Poll here - Dask Beam Runner Discussion - When2meet . If you are interested in attending, please give your availability. Hope to see many people there! 🚀

@alxmrs
Contributor

alxmrs commented Jul 12, 2022

Re: @cisaacstern's incremental approach. That LGTM, and it would be prudent to ensure the core recipe transforms span the runners we want to target in the long-term, as described in Beam's capability matrix.

Also: I have a working prototype of a Dask Runner for Beam here alxmrs/beam#1. I would love to start stress testing it with Pangeo-Forge recipes, if at all possible. Ideally, PGF recipes should drive the capabilities of the Dask runner.

@cisaacstern
Member

cisaacstern commented Jul 12, 2022

Also: I have a working prototype of a Dask Runner for Beam here alxmrs/beam#1. I would love to start stress testing it with Pangeo-Forge recipes, if at all possible. Ideally, PGF recipes should drive the capabilities of the Dask runner.

Awesome, @alxmrs! Here's how you could start testing the Dask Runner via a PR to pangeo-forge-recipes:

  1. Install Beam from your feature branch via pip in ci/upstream-dev.yml

  2. Add an execute_recipe_beam_dask fixture in conftest.py, which can be modeled on

    @pytest.fixture(params=[pytest.param(0, marks=pytest.mark.executor_beam)])
    def execute_recipe_beam():
        beam = pytest.importorskip("apache_beam")

        def execute(recipe):
            pcoll = recipe.to_beam()
            with beam.Pipeline() as p:
                p | pcoll

        return execute

    except with, presumably, beam.Pipeline(runner="DaskRunner"), and changing the value passed to the marks argument of the decorator.

  3. Add the new execute_recipe_beam_dask as a lazy_fixture to the execute_recipe fixture at the bottom of conftest.py

  4. In .github/workflows/main.yaml:

           dependencies: ["releases-only", "upstream-dev"]
           pytest-mark: ["no_executor", "executor_function", "executor_generator",
                         "executor_dask", "executor_prefect", "executor_prefect_dask",
    -                    "executor_prefect_wrapper", "executor_beam"]
    +                    "executor_prefect_wrapper", "executor_beam", "executor_beam_dask"]
           exclude:
    +        - dependencies: "releases-only"
    +          pytest-mark: "executor_beam_dask"
             - dependencies: "upstream-dev"
               pytest-mark: "executor_function"
             - dependencies: "upstream-dev"
               pytest-mark: "executor_generator"

Opening a PR to pangeo-forge-recipes should then result in the upstream-dev test parametrization installing from your feature branch and running your DaskRunner. LMK if this makes sense or if I can help in any way!

@alxmrs
Contributor

alxmrs commented Jul 12, 2022

Thanks for the detailed instructions! I'll set this up later this week :)

@rabernat
Contributor Author

So excited about this! 🚀

I am making steady progress on my end...see #379.

@alxmrs
Contributor

alxmrs commented Nov 11, 2022

@rabernat wasn't sure the best place to give an update. I'm about to go on vacation for a week, and wanted to explain my status on Beam & Dask. From what I can tell, when apache/beam#23913 lands we should have enough implemented in the Dask Runner to support executing Pangeo Forge recipes. I've tried testing things locally, and PGF pipelines seem to run correctly (all exit codes are zero, but I haven't generated ARCO data e2e quite yet).

The outstanding work to do on this PR involves passing lint + formatting checks. @pabloem is a good point-of-contact for carrying this change out to completion in my absence.


6 participants