Skip to content

Commit e607fbd

Browse files
shoyerXarray-Beam authors
authored and
Xarray-Beam authors
committed
[xarray-beam] breaking refactor: replace ChunkKey with Key
Fixes #9 This changes the Xarray-Beam data model to keep track of chunk keys in a different way: as a pair of (offsets, variables), rather than only offsets. The new key type is given a different name (Key rather than ChunkKey) because it also has a completely different API. This should allow for more efficient pipelines, e.g., by splitting datasets across variables for rechunking. It also includes the new transforms SplitVariables() and ConsolidateVariables(), and exposes a number of internal helper functions as public APIs. PiperOrigin-RevId: 388305128
1 parent ec078c1 commit e607fbd

18 files changed

+1071
-483
lines changed

Diff for: .github/workflows/ci-build.yml

+8-4
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,14 @@ jobs:
3737
with:
3838
path: ${{ steps.pip-cache.outputs.dir }}
3939
key: ${{ runner.os }}-pip-${{ hashFiles('**/setup.py') }}
40-
- name: Install dependencies
40+
- name: Install Xarray-Beam
4141
run: |
42-
pip install dask rechunker apache_beam zarr xarray absl-py pytest
43-
- name: Test with pytest
42+
pip install -e .[tests]
43+
- name: Run unit tests
4444
run: |
45-
pip install -e .
4645
pytest xarray_beam
46+
- name: Run example tests
47+
# The examples define some of the same flags, so we run pytest in separate processes.
48+
run: |
49+
pytest examples/era5_climatology_test.py
50+
pytest examples/era5_rechunk_test.py

Diff for: README.md

+38-38
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ Xarray-Beam is a Python library for building
77
The project aims to facilitate data transformations and analysis on large-scale
88
multi-dimensional labeled arrays, such as:
99

10-
- Ad-hoc computation on Xarray data, by dividing a `xarray.Dataset` into many
11-
smaller pieces ("chunks").
12-
- Adjusting array chunks, using the
13-
[Rechunker algorithm](https://rechunker.readthedocs.io/en/latest/algorithm.html)
14-
- Ingesting large multi-dimensional array datasets into an analysis-ready,
15-
cloud-optimized format, namely [Zarr](https://zarr.readthedocs.io/) (see also
16-
[Pangeo Forge](https://github.com/pangeo-forge/pangeo-forge-recipes))
17-
- Calculating statistics (e.g., "climatology") across distributed datasets with
18-
arbitrary groups.
10+
- Ad-hoc computation on Xarray data, by dividing a `xarray.Dataset` into many
11+
smaller pieces ("chunks").
12+
- Adjusting array chunks, using the
13+
[Rechunker algorithm](https://rechunker.readthedocs.io/en/latest/algorithm.html)
14+
- Ingesting large multi-dimensional array datasets into an analysis-ready,
15+
cloud-optimized format, namely [Zarr](https://zarr.readthedocs.io/) (see
16+
also [Pangeo Forge](https://github.com/pangeo-forge/pangeo-forge-recipes))
17+
- Calculating statistics (e.g., "climatology") across distributed datasets
18+
with arbitrary groups.
1919

2020
Xarray-Beam is implemented as a _thin layer_ on top of existing libraries for
2121
working with large-scale Xarray datasets. For example, it leverages
@@ -35,29 +35,29 @@ from early adopters, and hope to have it ready for wider audience soon.
3535
We love Dask! Xarray-Beam explores a different part of the design space for
3636
distributed data pipelines than Xarray's built-in Dask integration:
3737

38-
- Xarray-Beam is built around explicit manipulation of
39-
`(ChunkKey, xarray.Dataset)` pairs to perform operations on distributed
40-
datasets, where `ChunkKey` is an immutable dict keeping track of the offsets
41-
from the origin for a small contiguous "chunk" of a larger distributed
42-
dataset. This requires more boilerplate but is also more robust than
43-
generating distributed computation graphs in Dask using Xarray's built-in API.
44-
The user is expected to have a mental model for how their data pipeline is
45-
distributed across many machines.
46-
- Xarray-Beam distributes datasets by splitting them into many `xarray.Dataset`
47-
chunks, rather than the chunks of NumPy arrays typically used by Xarray with
48-
Dask (unless using
49-
[xarray.map_blocks](http://xarray.pydata.org/en/stable/user-guide/dask.html#automatic-parallelization-with-apply-ufunc-and-map-blocks)).
50-
Chunks of datasets is a more convenient data-model for writing ad-hoc
51-
whole dataset transformations, but is potentially a bit less efficient.
52-
- Beam ([like Spark](https://docs.dask.org/en/latest/spark.html)) was designed
53-
around a higher-level model for distributed computation than Dask (although
54-
Dask has been making
55-
[progress in this direction](https://coiled.io/blog/dask-under-the-hood-scheduler-refactor/)).
56-
Roughly speaking, this trade-off favors scalability over flexibility.
57-
- Beam allows for executing distributed computation using multiple runners,
58-
notably including Google Cloud Dataflow and Apache Spark. These runners are
59-
more mature than Dask, and in many cases are supported as a service by major
60-
commercial cloud providers.
38+
- Xarray-Beam is built around explicit manipulation of `(xarray_beam.Key,
39+
xarray.Dataset)` pairs to perform operations on distributed datasets, where
40+
`Key` is an immutable dict keeping track of the offsets from the origin for
41+
a small contiguous "chunk" of a larger distributed dataset. This requires
42+
more boilerplate but is also more robust than generating distributed
43+
computation graphs in Dask using Xarray's built-in API. The user is expected
44+
to have a mental model for how their data pipeline is distributed across
45+
many machines.
46+
- Xarray-Beam distributes datasets by splitting them into many
47+
`xarray.Dataset` chunks, rather than the chunks of NumPy arrays typically
48+
used by Xarray with Dask (unless using
49+
[xarray.map_blocks](http://xarray.pydata.org/en/stable/user-guide/dask.html#automatic-parallelization-with-apply-ufunc-and-map-blocks)).
50+
Chunks of datasets is a more convenient data-model for writing ad-hoc whole
51+
dataset transformations, but is potentially a bit less efficient.
52+
- Beam ([like Spark](https://docs.dask.org/en/latest/spark.html)) was designed
53+
around a higher-level model for distributed computation than Dask (although
54+
Dask has been making
55+
[progress in this direction](https://coiled.io/blog/dask-under-the-hood-scheduler-refactor/)).
56+
Roughly speaking, this trade-off favors scalability over flexibility.
57+
- Beam allows for executing distributed computation using multiple runners,
58+
notably including Google Cloud Dataflow and Apache Spark. These runners are
59+
more mature than Dask, and in many cases are supported as a service by major
60+
commercial cloud providers.
6161

6262
![Xarray-Beam datamodel vs Xarray-Dask](./static/xarray-beam-vs-xarray-dask.png)
6363

@@ -70,9 +70,9 @@ representation similar to that used by dask.array.
7070

7171
## Getting started
7272

73-
Xarray-Beam requires recent versions of xarray, dask, rechunker and zarr. It
74-
needs the latest release of Apache Beam (2.31.0 or later). For good performance
75-
when writing Zarr files, we strongly recommend patching Xarray with
73+
Xarray-Beam requires recent versions of immutabledict, xarray, dask, rechunker
74+
and zarr. It needs the latest release of Apache Beam (2.31.0 or later). For good
75+
performance when writing Zarr files, we strongly recommend patching Xarray with
7676
[this pull request](https://github.com/pydata/xarray/pull/5252).
7777

7878
TODO(shoyer): write a tutorial here! For now, see the test suite for examples.
@@ -90,6 +90,6 @@ See the "Contribution guidelines" for more.
9090

9191
Contributors:
9292

93-
- Stephan Hoyer
94-
- Jason Hickey
95-
- Cenk Gazen
93+
- Stephan Hoyer
94+
- Jason Hickey
95+
- Cenk Gazen

Diff for: examples/__init__.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.

Diff for: examples/era5_climatology.py

+14-9
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
"""Calculate climatology for the Pangeo ERA5 surface dataset."""
15+
from typing import Tuple
16+
1517
from absl import app
1618
from absl import flags
1719
import apache_beam as beam
1820
import numpy as np
1921
import xarray
20-
import xarray_beam
22+
import xarray_beam as xbeam
2123

2224

2325
INPUT_PATH = flags.DEFINE_string('input_path', None, help='Input Zarr path')
@@ -29,10 +31,13 @@
2931
# pylint: disable=expression-not-assigned
3032

3133

32-
def rekey_chunk_on_month_hour(key, dataset):
34+
def rekey_chunk_on_month_hour(
35+
key: xbeam.Key, dataset: xarray.Dataset,
36+
) -> Tuple[xbeam.Key, xarray.Dataset]:
37+
"""Replace the 'time' dimension with 'month'/'hour'."""
3338
month = dataset.time.dt.month.item()
3439
hour = dataset.time.dt.hour.item()
35-
new_key = key - {'time'} | {'month': month - 1, 'hour': hour}
40+
new_key = key.with_offsets(time=None, month=month - 1, hour=hour)
3641
new_dataset = (
3742
dataset
3843
.squeeze('time', drop=True)
@@ -52,24 +57,24 @@ def main(argv):
5257
# pipeline. We don't really need to supply a template here because the outputs
5358
# are small (the template argument in ChunksToZarr is optional), but it makes
5459
# the pipeline slightly more efficient.
60+
max_month = source_dataset.time.dt.month.max().item() # normally 12
5561
template = (
5662
source_dataset
5763
.isel(time=0, drop=True)
5864
.pipe(xarray.zeros_like) # don't load even time=0 into memory
59-
.expand_dims(month=np.arange(12)+1, hour=np.arange(24))
65+
.expand_dims(month=np.arange(1, max_month + 1), hour=np.arange(24))
6066
.chunk({'hour': 1, 'month': 1}) # make lazy with dask
6167
.pipe(xarray.zeros_like) # compress the dask graph
6268
)
6369

6470
with beam.Pipeline(runner=RUNNER.value, argv=argv) as root:
6571
(
6672
root
67-
| xarray_beam.DatasetToChunks(source_dataset, {'time': 31})
68-
| xarray_beam.SplitChunks({'time': 1})
73+
| xbeam.DatasetToChunks(source_dataset, {'time': 31})
74+
| xbeam.SplitChunks({'time': 1})
6975
| beam.MapTuple(rekey_chunk_on_month_hour)
70-
| xarray_beam.Mean.PerKey(dtype=np.float64) # avoid overflow
71-
| beam.MapTuple(lambda k, v: (k, v.astype(np.float32)))
72-
| xarray_beam.ChunksToZarr(OUTPUT_PATH.value, template)
76+
| xbeam.Mean.PerKey()
77+
| xbeam.ChunksToZarr(OUTPUT_PATH.value, template)
7378
)
7479

7580

Diff for: examples/era5_climatology_test.py

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Tests for era5_climatology."""
15+
16+
from absl.testing import absltest
17+
from absl.testing import flagsaver
18+
import numpy as np
19+
import pandas as pd
20+
import xarray
21+
22+
from . import era5_climatology
23+
from xarray_beam._src import test_util
24+
25+
26+
class Era5ClimatologyTest(test_util.TestCase):
27+
28+
def test(self):
29+
input_path = self.create_tempdir('source').full_path
30+
output_path = self.create_tempdir('destination').full_path
31+
32+
input_ds = test_util.dummy_era5_surface_dataset(times=90*24, freq='1H')
33+
input_ds.chunk({'time': 31}).to_zarr(input_path)
34+
35+
expected = input_ds.groupby('time.month').apply(
36+
lambda x: x.groupby('time.hour').mean('time')
37+
)
38+
39+
with flagsaver.flagsaver(
40+
input_path=input_path,
41+
output_path=output_path,
42+
):
43+
era5_climatology.main([])
44+
45+
actual = xarray.open_zarr(output_path)
46+
xarray.testing.assert_allclose(actual, expected)
47+
48+
49+
if __name__ == '__main__':
50+
absltest.main()

Diff for: examples/era5_rechunk.py

+6-15
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from absl import flags
1717
import apache_beam as beam
1818
import xarray
19-
import xarray_beam
19+
import xarray_beam as xbeam
2020

2121

2222
INPUT_PATH = flags.DEFINE_string('input_path', None, help='Input Zarr path')
@@ -34,28 +34,19 @@ def main(argv):
3434
)
3535
template = xarray.zeros_like(source_dataset.chunk())
3636
source_chunks = {'latitude': -1, 'longitude': -1, 'time': 31}
37-
split_chunks = {'latitude': 1440//8, 'longitude': -1, 'time': 31}
3837
target_chunks = {'latitude': 5, 'longitude': 5, 'time': -1}
3938

4039
with beam.Pipeline(runner=RUNNER.value, argv=argv) as root:
4140
(
4241
root
43-
| xarray_beam.DatasetToChunks(source_dataset, source_chunks)
44-
# add an intermediate splitting, because rechunker complains about
45-
# source chunks too big to fit into memory.
46-
| xarray_beam.SplitChunks(split_chunks)
47-
# TODO(shoyer): split this rechunk per data variable; it currently ends
48-
# up producing tiny intermediate chunks (50 KB), which adds significant
49-
# overhead.
50-
| xarray_beam.Rechunk(
42+
| xbeam.DatasetToChunks(source_dataset, source_chunks, split_vars=True)
43+
| xbeam.Rechunk(
5144
source_dataset.sizes,
52-
split_chunks,
45+
source_chunks,
5346
target_chunks,
54-
itemsize=len(source_dataset.data_vars) * 4,
55-
)
56-
| xarray_beam.ChunksToZarr(
57-
OUTPUT_PATH.value, template, target_chunks,
47+
itemsize=4,
5848
)
49+
| xbeam.ChunksToZarr(OUTPUT_PATH.value, template, target_chunks)
5950
)
6051

6152

Diff for: examples/era5_rechunk_test.py

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Tests for era5_rechunk."""
15+
16+
from absl.testing import absltest
17+
from absl.testing import flagsaver
18+
import numpy as np
19+
import pandas as pd
20+
import xarray
21+
22+
from . import era5_rechunk
23+
from xarray_beam._src import test_util
24+
25+
26+
class Era5RechunkTest(test_util.TestCase):
27+
28+
def test(self):
29+
input_path = self.create_tempdir('source').full_path
30+
output_path = self.create_tempdir('destination').full_path
31+
32+
input_ds = test_util.dummy_era5_surface_dataset(times=365)
33+
input_ds.chunk({'time': 31}).to_zarr(input_path)
34+
35+
with flagsaver.flagsaver(
36+
input_path=input_path,
37+
output_path=output_path,
38+
):
39+
era5_rechunk.main([])
40+
41+
output_ds = xarray.open_zarr(output_path)
42+
self.assertEqual(
43+
{k: v[0] for k, v in output_ds.chunks.items()},
44+
{'latitude': 5, 'longitude': 5, 'time': 365}
45+
)
46+
xarray.testing.assert_identical(input_ds, output_ds)
47+
48+
49+
if __name__ == '__main__':
50+
absltest.main()

Diff for: setup.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,23 @@
1616
import setuptools
1717

1818

19-
base_requires = ['apache_beam>=2.31.0', 'dask', 'rechunker', 'zarr', 'xarray']
20-
tests_requires = ['absl-py', 'pytest']
19+
base_requires = [
20+
'apache_beam>=2.31.0',
21+
'dask',
22+
'immutabledict',
23+
'rechunker',
24+
'zarr',
25+
'xarray',
26+
]
27+
tests_requires = [
28+
'absl-py',
29+
'pandas',
30+
'pytest',
31+
]
2132

2233
setuptools.setup(
2334
name='xarray-beam',
24-
version='0.0.1',
35+
version='0.2.0 ',
2536
license='Apache 2.0',
2637
author='Google LLC',
2738
author_email='[email protected]',

0 commit comments

Comments
 (0)