-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
WIP: Performance improvements for zarr backend #1800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 20 commits
a0bea98
afdb254
cc02150
86240cd
9c89ef2
2568d21
d459c66
c59ca57
2dd186a
8f71b31
07b9c21
67fcd92
b38e1a6
47ba8b6
9152b12
26b6bcb
b7681ae
e084e9e
a6aeb36
264b13f
9c03bfc
c92020a
18434f9
9f89c7c
3590d28
69cacee
8d744e0
a8dabdf
7858db7
48bf7ef
53260c9
7ed6bf8
3872da2
e6b7068
c31decf
189d262
07b92e2
96996ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ | |
import time | ||
import traceback | ||
import contextlib | ||
from collections import Mapping | ||
from collections import Mapping, OrderedDict | ||
import warnings | ||
|
||
from ..conventions import cf_encoder | ||
|
@@ -96,6 +96,9 @@ def __getitem__(self, key): | |
def __len__(self): | ||
return len(self.variables) | ||
|
||
def get_dimensions(self): # pragma: no cover | ||
raise NotImplementedError | ||
|
||
def get_attrs(self): # pragma: no cover | ||
raise NotImplementedError | ||
|
||
|
@@ -195,6 +198,19 @@ def __init__(self, writer=None): | |
writer = ArrayWriter() | ||
self.writer = writer | ||
|
||
def encode(self, variables, attributes): | ||
variables = OrderedDict([(k, self.encode_variable(v)) | ||
for k, v in variables.items()]) | ||
attributes = OrderedDict([(k, self.encode_attribute(v)) | ||
for k, v in attributes.items()]) | ||
return variables, attributes | ||
|
||
def encode_variable(self, v): | ||
return v | ||
|
||
def encode_attribute(self, a): | ||
return a | ||
|
||
def set_dimension(self, d, l): # pragma: no cover | ||
raise NotImplementedError | ||
|
||
|
@@ -216,7 +232,10 @@ def store_dataset(self, dataset): | |
|
||
def store(self, variables, attributes, check_encoding_set=frozenset(), | ||
unlimited_dims=None): | ||
variables, attributes = self.encode(variables, attributes) | ||
|
||
self.set_attributes(attributes) | ||
self.set_dimensions(variables, unlimited_dims=unlimited_dims) | ||
self.set_variables(variables, check_encoding_set, | ||
unlimited_dims=unlimited_dims) | ||
|
||
|
@@ -234,23 +253,42 @@ def set_variables(self, variables, check_encoding_set, | |
|
||
self.writer.add(source, target) | ||
|
||
def set_necessary_dimensions(self, variable, unlimited_dims=None): | ||
def set_dimensions(self, variables, unlimited_dims=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a docstring or comment explaining what this method does? This would help new developers (including myself) come onboard with backend development. |
||
if unlimited_dims is None: | ||
unlimited_dims = set() | ||
dims = self.get_dimensions() | ||
for d, l in zip(variable.dims, variable.shape): | ||
if d not in dims: | ||
|
||
existing_dims = self.get_dimensions() | ||
|
||
dims = OrderedDict() | ||
for v in unlimited_dims: # put unlimited_dims first | ||
dims[v] = None | ||
for v in variables.values(): | ||
dims.update(dict(zip(v.dims, v.shape))) | ||
|
||
for d, l in dims.items(): | ||
|
||
if d in existing_dims and l != existing_dims[d]: | ||
raise ValueError("Unable to update size for existing dimension" | ||
"%r (%d != %d)" % (d, l, existing_dims[d])) | ||
elif d not in existing_dims: | ||
is_unlimited = d in unlimited_dims | ||
self.set_dimension(d, l, is_unlimited) | ||
|
||
|
||
class WritableCFDataStore(AbstractWritableDataStore): | ||
|
||
def store(self, variables, attributes, *args, **kwargs): | ||
def encode(self, variables, attributes): | ||
# All NetCDF files get CF encoded by default, without this attempting | ||
# to write times, for example, would fail. | ||
cf_variables, cf_attrs = cf_encoder(variables, attributes) | ||
AbstractWritableDataStore.store(self, cf_variables, cf_attrs, | ||
variables, attributes = cf_encoder(variables, attributes) | ||
variables = OrderedDict([(k, self.encode_variable(v)) | ||
for k, v in variables.items()]) | ||
attributes = OrderedDict([(k, self.encode_attribute(v)) | ||
for k, v in attributes.items()]) | ||
return variables, attributes | ||
|
||
def store(self, variables, attributes, *args, **kwargs): | ||
AbstractWritableDataStore.store(self, variables, attributes, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can remove this method -- the implementation is exactly the parent class method. |
||
*args, **kwargs) | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,13 +43,8 @@ def _ensure_valid_fill_value(value, dtype): | |
return _encode_zarr_attr_value(valid) | ||
|
||
|
||
def _decode_zarr_attr_value(value): | ||
return value | ||
|
||
|
||
def _decode_zarr_attrs(attrs): | ||
return OrderedDict([(k, _decode_zarr_attr_value(v)) | ||
for k, v in attrs.items()]) | ||
return OrderedDict(attrs) | ||
|
||
|
||
def _replace_slices_with_arrays(key, shape): | ||
|
@@ -297,9 +292,6 @@ def __init__(self, zarr_group, writer=None): | |
raise KeyError("Zarr group can't be read by xarray because " | ||
"it is missing the `%s` attribute." % | ||
_DIMENSION_KEY) | ||
else: | ||
# initialize hidden dimension attribute | ||
self.ds.attrs[_DIMENSION_KEY] = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the removal of these lines is relevant for the dimension key test error. |
||
|
||
if writer is None: | ||
# by default, we should not need a lock for writing zarr because | ||
|
@@ -331,29 +323,37 @@ def get_variables(self): | |
for k, v in self.ds.arrays()) | ||
|
||
def get_attrs(self): | ||
_, attributes = _get_zarr_dims_and_attrs(self.ds, _DIMENSION_KEY) | ||
attributes = HiddenKeyDict(self.ds.attrs.asdict(), [_DIMENSION_KEY]) | ||
return _decode_zarr_attrs(attributes) | ||
|
||
def get_dimensions(self): | ||
dimensions, _ = _get_zarr_dims_and_attrs(self.ds, _DIMENSION_KEY) | ||
try: | ||
dimensions = self.ds.attrs[_DIMENSION_KEY].asdict() | ||
except KeyError: | ||
raise KeyError("Zarr object is missing the attribute `%s`, which " | ||
"is required for xarray to determine variable " | ||
"dimensions." % (_DIMENSION_KEY)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be raising when you delete the dimension key. |
||
return dimensions | ||
|
||
def set_dimension(self, name, length, is_unlimited=False): | ||
if is_unlimited: | ||
def set_dimensions(self, variables, unlimited_dims=None): | ||
if unlimited_dims is not None: | ||
raise NotImplementedError( | ||
"Zarr backend doesn't know how to handle unlimited dimensions") | ||
# consistency check | ||
if name in self.ds.attrs[_DIMENSION_KEY]: | ||
if self.ds.attrs[_DIMENSION_KEY][name] != length: | ||
raise ValueError("Pre-existing array dimensions %r " | ||
"encoded in Zarr attributes are incompatible " | ||
"with newly specified dimension `%s`: %g" % | ||
(self.ds.attrs[_DIMENSION_KEY], name, length)) | ||
self.ds.attrs[_DIMENSION_KEY][name] = length | ||
|
||
def set_attribute(self, key, value): | ||
_, attributes = _get_zarr_dims_and_attrs(self.ds, _DIMENSION_KEY) | ||
attributes[key] = _encode_zarr_attr_value(value) | ||
|
||
dims = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make this an |
||
for v in variables.values(): | ||
dims.update(dict(zip(v.dims, v.shape))) | ||
|
||
self.ds.attrs.update({_DIMENSION_KEY: dims}) | ||
|
||
def set_attributes(self, attributes): | ||
encoded_attrs = OrderedDict((k, _encode_zarr_attr_value(v)) | ||
for k, v in iteritems(attributes)) | ||
self.ds.attrs.put(encoded_attrs) | ||
|
||
def encode_variable(self, variable): | ||
variable = encode_zarr_variable(variable) | ||
return variable | ||
|
||
def prepare_variable(self, name, variable, check_encoding=False, | ||
unlimited_dims=None): | ||
|
@@ -363,72 +363,27 @@ def prepare_variable(self, name, variable, check_encoding=False, | |
dtype = variable.dtype | ||
shape = variable.shape | ||
|
||
# TODO: figure out how zarr should deal with unlimited dimensions | ||
self.set_necessary_dimensions(variable, unlimited_dims=unlimited_dims) | ||
|
||
fill_value = _ensure_valid_fill_value(attrs.pop('_FillValue', None), | ||
dtype) | ||
|
||
# TODO: figure out what encoding is needed for zarr | ||
encoding = _extract_zarr_variable_encoding( | ||
variable, raise_on_invalid=check_encoding) | ||
|
||
# arguments for zarr.create: | ||
# zarr.creation.create(shape, chunks=None, dtype=None, | ||
# compressor='default', fill_value=0, order='C', store=None, | ||
# synchronizer=None, overwrite=False, path=None, chunk_store=None, | ||
# filters=None, cache_metadata=True, **kwargs) | ||
if name in self.ds: | ||
zarr_array = self.ds[name] | ||
else: | ||
zarr_array = self.ds.create(name, shape=shape, dtype=dtype, | ||
fill_value=fill_value, **encoding) | ||
# decided not to explicity enumerate encoding options because we | ||
# risk overriding zarr's defaults (e.g. if we specificy | ||
# cache_metadata=None instead of True). Alternative is to have lots of | ||
# logic in _extract_zarr_variable encoding to duplicate zarr defaults. | ||
# chunks=encoding.get('chunks'), | ||
# compressor=encoding.get('compressor'), | ||
# filters=encodings.get('filters'), | ||
# cache_metadata=encoding.get('cache_metadata')) | ||
|
||
encoded_attrs = OrderedDict() | ||
# the magic for storing the hidden dimension data | ||
zarr_array.attrs[_DIMENSION_KEY] = dims | ||
_, attributes = _get_zarr_dims_and_attrs(zarr_array, _DIMENSION_KEY) | ||
|
||
encoded_attrs[_DIMENSION_KEY] = dims | ||
for k, v in iteritems(attrs): | ||
attributes[k] = _encode_zarr_attr_value(v) | ||
encoded_attrs[k] = _encode_zarr_attr_value(v) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe leave this as a #TODO? |
||
|
||
zarr_array = self.ds.create(name, shape=shape, dtype=dtype, | ||
fill_value=fill_value, **encoding) | ||
zarr_array.attrs.put(encoded_attrs) | ||
|
||
return zarr_array, variable.data | ||
|
||
def store(self, variables, attributes, *args, **kwargs): | ||
new_vars = OrderedDict((k, encode_zarr_variable(v, name=k)) | ||
for k, v in iteritems(variables)) | ||
AbstractWritableDataStore.store(self, new_vars, attributes, | ||
AbstractWritableDataStore.store(self, variables, attributes, | ||
*args, **kwargs) | ||
# sync() and close() methods should not be needed with zarr | ||
|
||
|
||
# from zarr docs | ||
|
||
# Zarr arrays can be used as either the source or sink for data in parallel | ||
# computations. Both multi-threaded and multi-process parallelism are | ||
# supported. The Python global interpreter lock (GIL) is released for both | ||
# compression and decompression operations, so Zarr will not block other Python | ||
# threads from running. | ||
# | ||
# A Zarr array can be read concurrently by multiple threads or processes. No | ||
# synchronization (i.e., locking) is required for concurrent reads. | ||
# | ||
# A Zarr array can also be written to concurrently by multiple threads or | ||
# processes. Some synchronization may be required, depending on the way the | ||
# data is being written. | ||
|
||
# If each worker in a parallel computation is writing to a separate region of | ||
# the array, and if region boundaries are perfectly aligned with chunk | ||
# boundaries, then no synchronization is required. However, if region and chunk | ||
# boundaries are not perfectly aligned, then synchronization is required to | ||
# avoid two workers attempting to modify the same chunk at the same time. | ||
|
||
|
||
def open_zarr(store, group=None, synchronizer=None, auto_chunk=True, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is definitely cleaner than what we had before, but I am reluctant to give the idea that this this is a new supported method for third-party DataStore classes. Maybe we can call this
_encode
for now, or add a warning about implementing it?Eventually, I would like to remove all implementations from the DataStore base classes, and leave them as purely abstract. This will make it clearer to new backend implementers what they actually should/can implement.
So instead of implementing an
encode()
method, data store classes could have a list of defaultencoders
(see xarray.coding) used when reading/writing data. But xarray.coding isn't quite ready for this yet...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clearly some refactoring will be needed to this once the overall backend refactoring moves forward. For now, however, this seems like a reasonable compromise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shoyer - I'm a bit confused here. As you'll see in the Zarr backend, the
encode_variable
method is applying a list of encoders. Where in theWritableDataStore
were you envisioning the application of the encoders?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shoyer - What would you say to merging this in its current state and leaving the encoders refactor to a separate PR? I'm happy to make more changes here but a) I'm not sure how to address your last comment, and b) I've already drifted a fair ways off track with this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with that. We may want to change it more in the future but this is a clear improvement for now.