diff --git a/.github/actions/setup-deps/action.yaml b/.github/actions/setup-deps/action.yaml
index 57657d8131e..5c203dbeb30 100644
--- a/.github/actions/setup-deps/action.yaml
+++ b/.github/actions/setup-deps/action.yaml
@@ -20,6 +20,8 @@ inputs:
     default: 'codecov'
   cython:
     default: 'cython'
+  fasteners:
+    default: 'fasteners'
   griddataformats:
     default: 'griddataformats'
   gsd:
@@ -90,6 +92,7 @@ runs:
           ${{ inputs.biopython }}
           ${{ inputs.codecov }}
           ${{ inputs.cython }}
+          ${{ inputs.fasteners }}
           ${{ inputs.griddataformats }}
           ${{ inputs.gsd }}
           ${{ inputs.hypothesis }}
diff --git a/.gitignore b/.gitignore
index 70f1106a6a2..0760f8129e4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,3 +50,4 @@ benchmarks/results
 *~
 .idea
 .vscode
+*.lock
diff --git a/.travis.yml b/.travis.yml
index ca7e5a48cf1..cfa0f516beb 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -25,7 +25,7 @@ matrix:
         - conda activate base
         - conda update --yes conda
         - conda install --yes pip pytest==6.1.2 mmtf-python biopython networkx cython matplotlib-base "scipy>=1.6" griddataformats hypothesis gsd codecov -c conda-forge -c biobuilds
-        - pip install pytest-xdist packaging threadpoolctl
+        - pip install pytest-xdist packaging threadpoolctl fasteners
         - conda info
       install:
         - (cd package/ && python setup.py develop) && (cd testsuite/ && python setup.py install)
@@ -45,7 +45,7 @@ matrix:
       if: type = cron
       before_install:
         - python -m pip install cython "numpy>=1.19.2" scipy
-        - python -m pip install --no-build-isolation hypothesis matplotlib packaging pytest pytest-cov pytest-xdist tqdm threadpoolctl
+        - python -m pip install --no-build-isolation hypothesis matplotlib packaging pytest pytest-cov pytest-xdist tqdm threadpoolctl fasteners
       install:
         - cd package
         - python setup.py install
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index b57f545e18f..d28a8adf51b 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -95,6 +95,7 @@ jobs:
           h5py
           tqdm
           threadpoolctl
+          fasteners
       displayName: 'Install dependencies'
     # for wheel install testing, we pin to an
     # older NumPy, the oldest version we support and that
diff --git a/maintainer/conda/environment.yml b/maintainer/conda/environment.yml
index f9756fa77b6..ad3d10da4ed 100644
--- a/maintainer/conda/environment.yml
+++ b/maintainer/conda/environment.yml
@@ -6,6 +6,7 @@ dependencies:
   - chemfiles>=0.10
   - codecov
   - cython
+  - fasteners
   - griddataformats
   - gsd
   - h5py
diff --git a/package/CHANGELOG b/package/CHANGELOG
index 8a5a5d207a8..8c0581f3933 100644
--- a/package/CHANGELOG
+++ b/package/CHANGELOG
@@ -16,11 +16,15 @@ The rules for this file:
 
 ??/??/?? IAlibay, BFedder, inomag, Agorfa, aya9aladdin, shudipto-amin, cbouy,
          HenokB, umak1106, tamandeeps, Mrqeoqqt, megosato, AnirG, rishu235,
          mtiberti, manishsaini6421, Sukeerti1, robotjellyzone, markvrma, alescoulie,
-         mjtadema, PicoCentauri, Atharva7K, aditi2906, orbeckst
+         mjtadema, PicoCentauri, Atharva7K, aditi2906, orbeckst, yuxuanzhuang
 
 * 2.2.0
 
 Fixes
+  * Fix the race condition when generating offsets from multiple processes
+    by using an interprocess file lock (Issue #1988, PR #3375)
+  * Fix the error when trying to load a corrupted offset file (Issue #3230,
+    PR #3375)
   * Iterating a SingleFrameReaderBase trajectory now rewinds the reader,
     resetting any modified trajectory attributes to those read from file.
     Fixes behavior that was previously undefined (Issue #3423)
@@ -71,6 +75,7 @@ Enhancements
     keyword (Issue #3605)
 
 Changes
+  * `fasteners` package is now a core dependency (Issues #3230, #1988, PR #3375)
   * The minimum Python, NumPy, and SciPy versions (following NEP29) are now
     3.8, 1.19, and 1.5.0; GSD now also has a minimum version of 1.9.3 and
     networkx has minimum version 2.0 (Issue #3665)
diff --git a/package/MDAnalysis/coordinates/XDR.py b/package/MDAnalysis/coordinates/XDR.py
index da348b032c9..e3109e413ae 100644
--- a/package/MDAnalysis/coordinates/XDR.py
+++ b/package/MDAnalysis/coordinates/XDR.py
@@ -38,13 +38,15 @@
 import numpy as np
 from os.path import getctime, getsize, isfile, split, join
 import warnings
+import fasteners
 
 from . import base
 from ..lib.mdamath import triclinic_box
 
 
 def offsets_filename(filename, ending='npz'):
-    """Return offset filename for XDR files. For this the filename is appended
+    """Return the offset or lock filename for XDR files.
+    For this the filename is appended
     with `_offsets.{ending}`.
 
     Parameters
@@ -82,7 +84,9 @@ def read_numpy_offsets(filename):
     """
     try:
         return {k: v for k, v in np.load(filename).items()}
+
+    # `ValueError` is raised when the offset file is corrupted.
+    except (ValueError, IOError):
-    except IOError:
         warnings.warn("Failed to load offsets file {}\n".format(filename))
         return False
 
@@ -113,6 +117,8 @@ class XDRBaseReader(base.ReaderBase):
 
     .. versionchanged:: 1.0.0
        XDR offsets read from trajectory if offsets file read-in fails
+    .. versionchanged:: 2.0.0
+       Add an InterProcessLock when generating offsets
     """
     def __init__(self, filename, convert_units=True, sub=None,
@@ -179,53 +185,68 @@ def close(self):
 
     def _load_offsets(self):
         """load frame offsets from file, reread them from the trajectory if that
-        fails"""
+        fails. To prevent a race between multiple processes generating the
+        same offset file, an `InterProcessLock` is used."""
         fname = offsets_filename(self.filename)
+        lock_name = offsets_filename(self.filename,
+                                     ending='lock')
 
-        if not isfile(fname):
-            self._read_offsets(store=True)
-            return
-
-        # if offsets file read correctly, data will be a dictionary of offsets
-        # if not, data will be False
-        # if False, offsets should be read from the trajectory
-        # this warning can be avoided by loading Universe like:
-        # u = mda.Universe(data.TPR, data.XTC, refresh_offsets=True)
-        # refer to Issue #1893
-        data = read_numpy_offsets(fname)
-        if not data:
-            warnings.warn("Reading offsets from {} failed, "
-                          "reading offsets from trajectory instead\n"
-                          "Consider setting 'refresh_offsets=True' "
-                          "when loading your Universe".format(fname))
+        # check that the location of the lock is writable
+        try:
+            with fasteners.InterProcessLock(lock_name) as filelock:
+                pass
+        except PermissionError:
+            warnings.warn("Cannot write lock/offset file in same location as "
+                          f"{self.filename}. Using slow offset calculation.")
             self._read_offsets(store=True)
             return
 
-        ctime_ok = size_ok = n_atoms_ok = False
+        with fasteners.InterProcessLock(lock_name) as filelock:
+            if not isfile(fname):
+                self._read_offsets(store=True)
+                return
+
+            # if offsets file read correctly, data will be a dictionary of offsets
+            # if not, data will be False
+            # if False, offsets should be read from the trajectory
+            # this warning can be avoided by loading Universe like:
+            # u = mda.Universe(data.TPR, data.XTC, refresh_offsets=True)
+            # refer to Issue #1893
+            data = read_numpy_offsets(fname)
+            if not data:
+                warnings.warn("Reading offsets from {} failed, "
+                              "reading offsets from trajectory instead.\n"
+                              "Consider setting 'refresh_offsets=True' "
+                              "when loading your Universe.".format(fname))
+                self._read_offsets(store=True)
+                return
+
+            ctime_ok = size_ok = n_atoms_ok = False
 
-        try:
-            ctime_ok = getctime(self.filename) == data['ctime']
-            size_ok = getsize(self.filename) == data['size']
-            n_atoms_ok = self._xdr.n_atoms == data['n_atoms']
-        except KeyError:
-            # we tripped over some old offset formated file
-            pass
-
-        if not (ctime_ok and size_ok and n_atoms_ok):
-            warnings.warn("Reload offsets from trajectory\n "
-                          "ctime or size or n_atoms did not match")
-            self._read_offsets(store=True)
-        else:
-            self._xdr.set_offsets(data['offsets'])
+            try:
+                ctime_ok = getctime(self.filename) == data['ctime']
+                size_ok = getsize(self.filename) == data['size']
+                n_atoms_ok = self._xdr.n_atoms == data['n_atoms']
+            except KeyError:
+                # we tripped over some old offset-formatted file
+                pass
+
+            if not (ctime_ok and size_ok and n_atoms_ok):
+                warnings.warn("Reload offsets from trajectory\n "
+                              "ctime or size or n_atoms did not match")
+                self._read_offsets(store=True)
+            else:
+                self._xdr.set_offsets(data['offsets'])
 
     def _read_offsets(self, store=False):
         """read frame offsets from trajectory"""
+        fname = offsets_filename(self.filename)
         offsets = self._xdr.offsets
         if store:
             ctime = getctime(self.filename)
             size = getsize(self.filename)
             try:
-                np.savez(offsets_filename(self.filename),
+                np.savez(fname,
                          offsets=offsets, size=size, ctime=ctime,
                          n_atoms=self._xdr.n_atoms)
             except Exception as e:
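
For reviewers unfamiliar with `fasteners`: the pattern above is double-checked locking across processes. A minimal self-contained sketch of that pattern, assuming illustrative file names and a `compute_offsets` stand-in (neither is MDAnalysis API):

```python
import os
import fasteners


def compute_offsets(fname):
    # stand-in for the expensive trajectory scan done by _read_offsets
    with open(fname, "w") as f:
        f.write("offsets")


def load_or_generate(fname="traj_offsets.npz", lock_name="traj_offsets.lock"):
    # Only one process at a time holds the lock; latecomers block, then
    # find the file already present and skip the expensive work.
    with fasteners.InterProcessLock(lock_name):
        if not os.path.isfile(fname):  # re-check under the lock
            compute_offsets(fname)
    return fname
```

The throwaway lock acquisition inside the try/except in `_load_offsets` exists only to detect read-only locations up front; in that case the reader falls back to the slow, store-free offset calculation.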
diff --git a/package/requirements.txt b/package/requirements.txt
index 6134a5e6a01..214eb995dd8 100644
--- a/package/requirements.txt
+++ b/package/requirements.txt
@@ -1,6 +1,7 @@
 biopython
 codecov
 cython
+fasteners
 griddataformats
 gsd
 hypothesis
diff --git a/package/setup.py b/package/setup.py
index 0ce697e5076..79d2d626752 100755
--- a/package/setup.py
+++ b/package/setup.py
@@ -601,6 +601,7 @@ def long_description(readme):
               'tqdm>=4.43.0',
               'threadpoolctl',
               'packaging',
+              'fasteners',
               'gsd>=1.9.3',
           ]
 
diff --git a/testsuite/MDAnalysisTests/coordinates/test_xdr.py b/testsuite/MDAnalysisTests/coordinates/test_xdr.py
index 1ee505f415c..236db03cbda 100644
--- a/testsuite/MDAnalysisTests/coordinates/test_xdr.py
+++ b/testsuite/MDAnalysisTests/coordinates/test_xdr.py
@@ -26,6 +26,7 @@
 import errno
 import numpy as np
 import os
+from os.path import split
 import shutil
 import subprocess
 
@@ -741,15 +742,24 @@ def test_offsets(self, trajectory, traj):
     def test_reload_offsets(self, traj):
         self._reader(traj, refresh_offsets=True)
 
-    def test_nonexistant_offsets_file(self, traj):
-        # assert that a nonexistant file returns False during read-in
+    def test_nonexistent_offsets_file(self, traj):
+        # assert that a nonexistent file returns False during read-in
         outfile_offsets = XDR.offsets_filename(traj)
         with patch.object(np, "load") as np_load_mock:
             np_load_mock.side_effect = IOError
             saved_offsets = XDR.read_numpy_offsets(outfile_offsets)
             assert_equal(saved_offsets, False)
 
-    def test_reload_offsets_if_offsets_readin_fails(self, trajectory):
+    def test_corrupted_offsets_file(self, traj):
+        # assert that a corrupted file returns False during read-in
+        # Issue #3230
+        outfile_offsets = XDR.offsets_filename(traj)
+        with patch.object(np, "load") as np_load_mock:
+            np_load_mock.side_effect = ValueError
+            saved_offsets = XDR.read_numpy_offsets(outfile_offsets)
+            assert_equal(saved_offsets, False)
+
+    def test_reload_offsets_if_offsets_readin_io_fails(self, trajectory):
         # force the np.load call that is called in read_numpy_offsets
         # during _load_offsets to give an IOError
         # ensure that offsets are then read-in from the trajectory
@@ -761,6 +771,19 @@ def test_reload_offsets_if_offsets_readin_fails(self, trajectory):
             self.ref_offsets,
             err_msg="error loading frame offsets")
 
+    def test_reload_offsets_if_offsets_readin_value_fails(self, trajectory):
+        # force the np.load call that is called in read_numpy_offsets
+        # during _load_offsets to give a ValueError (Issue #3230)
+        # ensure that offsets are then read-in from the trajectory
+        with patch.object(np, "load") as np_load_mock:
+            np_load_mock.side_effect = ValueError
+            with pytest.warns(UserWarning, match="Failed to load offsets"):
+                trajectory._load_offsets()
+            assert_almost_equal(
+                trajectory._xdr.offsets,
+                self.ref_offsets,
+                err_msg="error loading frame offsets")
+
     def test_persistent_offsets_size_mismatch(self, traj):
         # check that stored offsets are not loaded when trajectory
         # size differs from stored size
@@ -850,8 +873,19 @@ def test_persistent_offsets_readonly(self, tmpdir):
         filename = str(tmpdir.join(os.path.basename(self.filename)))
 
         # try to write an offsets file; both warnings must be raised
-        self._reader(filename)
+        with pytest.warns(UserWarning) as record:
+            self._reader(filename)
+        messages = [str(w.message) for w in record]
+        assert any("Couldn't save offsets" in m for m in messages)
+        assert any("Cannot write" in m for m in messages)
         assert_equal(os.path.exists(XDR.offsets_filename(filename)), False)
+        # check that the lock file is not created either
+        assert_equal(os.path.exists(XDR.offsets_filename(filename,
+                                                         ending='lock')), False)
+
+    def test_offset_lock_created(self):
+        assert os.path.exists(XDR.offsets_filename(self.filename,
+                                                   ending='lock'))
 
 
 class TestXTCReader_offsets(_GromacsReader_offsets):
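
The comment block kept inside the lock (`refer to Issue #1893`) points at the user-facing workaround these tests exercise: if stored offsets are stale or unreadable, they can be rebuilt explicitly at load time. A usage sketch with placeholder file names:

```python
import MDAnalysis as mda

# force regeneration of the offsets file instead of trusting the stored one
u = mda.Universe("topol.tpr", "traj.xtc", refresh_offsets=True)
```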
diff --git a/testsuite/MDAnalysisTests/parallelism/test_multiprocessing.py b/testsuite/MDAnalysisTests/parallelism/test_multiprocessing.py
index 12e63ba31b5..4df5c8d0938 100644
--- a/testsuite/MDAnalysisTests/parallelism/test_multiprocessing.py
+++ b/testsuite/MDAnalysisTests/parallelism/test_multiprocessing.py
@@ -23,11 +23,14 @@
 import multiprocessing
 import numpy as np
+import os
+import shutil
 
 import pytest
 import pickle
 
 from numpy.testing import assert_equal
 
 import MDAnalysis as mda
+import MDAnalysis.coordinates
 from MDAnalysis.coordinates.core import get_reader_for
 from MDAnalysis.analysis.rms import RMSD
 
@@ -85,6 +88,19 @@ def u(request):
     top, trj = request.param
     return mda.Universe(top, trj)
 
+@pytest.fixture(scope="function")
+def temp_xtc(tmp_path):
+    fresh_xtc = tmp_path / "testing.xtc"
+    shutil.copy(XTC, fresh_xtc)
+    # In principle the offset file for fresh_xtc cannot already exist, since
+    # it is a fresh copy for every test; the clean-up below is kept to
+    # document the intent.
+    try:
+        os.remove(MDAnalysis.coordinates.XDR.offsets_filename(fresh_xtc))
+    except OSError:
+        pass
+    return fresh_xtc
+
 
 # Define target functions here
 # inside test functions doesn't work
@@ -125,6 +141,22 @@ def test_universe_unpickle_in_new_process():
     assert_equal(ref, res)
 
 
+def test_creating_multiple_universe_without_offset(temp_xtc, ncopies=3):
+    # test that multiple Universes can be created in parallel without
+    # simultaneously generating the same offset file.
+    # The tested XTC file is far too short to trigger a real race, but the
+    # test is kept as documentation of the scenario that used to cause
+    # problems (see PR #3375 and Issues #3230, #1988)
+
+    args = (GRO, str(temp_xtc))
+    with multiprocessing.Pool(2) as p:
+        universes = [p.apply_async(mda.Universe, args) for i in range(ncopies)]
+        universes = [universe.get() for universe in universes]
+
+    assert_equal(universes[0].trajectory._xdr.offsets,
+                 universes[1].trajectory._xdr.offsets)
+
+
 @pytest.fixture(params=[
     # formatname, filename
     ('CRD', CRD, dict()),
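
As a sanity check of the naming scheme used throughout the patch: `offsets_filename` derives both the offsets file and its lock from the trajectory name, so they land next to the trajectory, which is why the read-only tests above expect neither file to appear. The sketch below assumes only the documented `_offsets.{ending}` suffix; the exact prefix is an implementation detail:

```python
from MDAnalysis.coordinates.XDR import offsets_filename

print(offsets_filename("traj.xtc"))                 # ends with "_offsets.npz"
print(offsets_filename("traj.xtc", ending="lock"))  # ends with "_offsets.lock"
```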