From 2258217a8c4dc1133b1a2145f762c28668d142dc Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Wed, 21 Dec 2016 19:48:47 -0700 Subject: [PATCH] Switch to shared Lock (SerializableLock if possible) for reading and writing Fixes #1172 The serializable lock will be useful for dask.distributed or multi-processing (xref #798, #1173, among others). --- doc/whats-new.rst | 6 +++++- xarray/backends/api.py | 10 +++------- xarray/backends/common.py | 14 +++++++++++--- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 05828051b6d..edbc8ae599b 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -56,7 +56,7 @@ Breaking changes By `Guido Imperiale `_ and `Stephan Hoyer `_. - Pickling a ``Dataset`` or ``DataArray`` linked to a file on disk no longer - caches its values into memory before pickling :issue:`1128`. Instead, pickle + caches its values into memory before pickling (:issue:`1128`). Instead, pickle stores file paths and restores objects by reopening file references. This enables preliminary, experimental use of xarray for opening files with `dask.distributed `_. @@ -206,6 +206,10 @@ Bug fixes - Fixed a bug with facetgrid (the ``norm`` keyword was ignored, :issue:`1159`). By `Fabien Maussion `_. +- Resolved a concurrency bug that could cause Python to crash when + simultaneously reading and writing netCDF4 files with dask (:issue:`1172`). + By `Stephan Hoyer `_. + .. _whats-new.0.8.2: v0.8.2 (18 August 2016) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index bc2afa4b373..c69fc63acec 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -3,7 +3,6 @@ from __future__ import print_function import gzip import os.path -import threading from distutils.version import StrictVersion from glob import glob from io import BytesIO @@ -12,7 +11,7 @@ import numpy as np from .. import backends, conventions -from .common import ArrayWriter +from .common import ArrayWriter, GLOBAL_LOCK from ..core import indexing from ..core.combine import auto_combine from ..core.utils import close_on_error, is_remote_uri @@ -55,9 +54,6 @@ def _normalize_path(path): return os.path.abspath(os.path.expanduser(path)) -_global_lock = threading.Lock() - - def _default_lock(filename, engine): if filename.endswith('.gz'): lock = False @@ -71,9 +67,9 @@ def _default_lock(filename, engine): else: # TODO: identify netcdf3 files and don't use the global lock # for them - lock = _global_lock + lock = GLOBAL_LOCK elif engine in {'h5netcdf', 'pynio'}: - lock = _global_lock + lock = GLOBAL_LOCK else: lock = False return lock diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 27291e65e3a..a7cce03a33a 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -2,7 +2,6 @@ from __future__ import division from __future__ import print_function import numpy as np -import itertools import logging import time import traceback @@ -12,7 +11,12 @@ from ..conventions import cf_encoder from ..core.utils import FrozenOrderedDict -from ..core.pycompat import iteritems, dask_array_type, OrderedDict +from ..core.pycompat import iteritems, dask_array_type + +try: + from dask.utils import SerializableLock as Lock +except ImportError: + from threading import Lock # Create a logger object, but don't add any handlers. Leave that to user code. logger = logging.getLogger(__name__) @@ -21,6 +25,10 @@ NONE_VAR_NAME = '__values__' +# dask.utils.SerializableLock if available, otherwise just a threading.Lock +GLOBAL_LOCK = Lock() + + def _encode_variable_name(name): if name is None: name = NONE_VAR_NAME @@ -150,7 +158,7 @@ def sync(self): import dask.array as da import dask if StrictVersion(dask.__version__) > StrictVersion('0.8.1'): - da.store(self.sources, self.targets, lock=threading.Lock()) + da.store(self.sources, self.targets, lock=GLOBAL_LOCK) else: da.store(self.sources, self.targets) self.sources = []