Skip to content

Commit

Permalink
On loading the same XTC offset in parallel (#3375)
Browse files Browse the repository at this point in the history
Fixes #3230, #1988

## Work done in this PR
* Add a fasteners.InterProcessLock for loading the offset for XDR files
* Add a ValueError exception for corrupted npz file (fix Error loading XDR files with corrupted offset files #3230)
* Adds fastener as a new core dependency
  • Loading branch information
yuxuanzhuang authored May 16, 2022
1 parent 82d1827 commit 174080c
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 42 deletions.
3 changes: 3 additions & 0 deletions .github/actions/setup-deps/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ inputs:
default: 'codecov'
cython:
default: 'cython'
fasteners:
default: 'fasteners'
griddataformats:
default: 'griddataformats'
gsd:
Expand Down Expand Up @@ -90,6 +92,7 @@ runs:
${{ inputs.biopython }}
${{ inputs.codecov }}
${{ inputs.cython }}
${{ inputs.fasteners }}
${{ inputs.griddataformats }}
${{ inputs.gsd }}
${{ inputs.hypothesis }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ benchmarks/results
*~
.idea
.vscode
*.lock
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ matrix:
- conda activate base
- conda update --yes conda
- conda install --yes pip pytest==6.1.2 mmtf-python biopython networkx cython matplotlib-base "scipy>=1.6" griddataformats hypothesis gsd codecov -c conda-forge -c biobuilds
- pip install pytest-xdist packaging threadpoolctl
- pip install pytest-xdist packaging threadpoolctl fasteners
- conda info
install:
- (cd package/ && python setup.py develop) && (cd testsuite/ && python setup.py install)
Expand All @@ -45,7 +45,7 @@ matrix:
if: type = cron
before_install:
- python -m pip install cython "numpy>=1.19.2" scipy
- python -m pip install --no-build-isolation hypothesis matplotlib packaging pytest pytest-cov pytest-xdist tqdm threadpoolctl
- python -m pip install --no-build-isolation hypothesis matplotlib packaging pytest pytest-cov pytest-xdist tqdm threadpoolctl fasteners
install:
- cd package
- python setup.py install
Expand Down
1 change: 1 addition & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ jobs:
h5py
tqdm
threadpoolctl
fasteners
displayName: 'Install dependencies'
# for wheel install testing, we pin to an
# older NumPy, the oldest version we support and that
Expand Down
1 change: 1 addition & 0 deletions maintainer/conda/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dependencies:
- chemfiles>=0.10
- codecov
- cython
- fasteners
- griddataformats
- gsd
- h5py
Expand Down
7 changes: 6 additions & 1 deletion package/CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ The rules for this file:
??/??/?? IAlibay, BFedder, inomag, Agorfa, aya9aladdin, shudipto-amin, cbouy,
HenokB, umak1106, tamandeeps, Mrqeoqqt, megosato, AnirG, rishu235,
mtiberti, manishsaini6421, Sukeerti1, robotjellyzone, markvrma, alescoulie,
mjtadema, PicoCentauri, Atharva7K, aditi2906, orbeckst
mjtadema, PicoCentauri, Atharva7K, aditi2906, orbeckst, yuxuanzhuang

* 2.2.0

Fixes
* Fix the race condition when generating offsets from multiple processes by
    using an interprocess file lock. (Issue #1988, PR #3375)
* Fix the error when trying to load a corrupted offset file (Issue #3230,
    PR #3375)
* Iterating a SingleFrameReaderBase trajectory now rewinds the
reader, resetting any modified trajectory attributes to those
read from file. Fixes behavior that was previously undefined (Issue #3423)
Expand Down Expand Up @@ -71,6 +75,7 @@ Enhancements
keyword (Issue #3605)

Changes
* `fasteners` package is now a core dependency (Issues #3230, #1988, PR #3375)
* The minimum Python, NumPy, and SciPy versions (following NEP29) are now 3.8,
1.19, and 1.5.0; GSD now also has a minimum version of 1.9.3 and networkx
has minimum version 2.0 (Issue #3665)
Expand Down
91 changes: 56 additions & 35 deletions package/MDAnalysis/coordinates/XDR.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
import numpy as np
from os.path import getctime, getsize, isfile, split, join
import warnings
import fasteners

from . import base
from ..lib.mdamath import triclinic_box


def offsets_filename(filename, ending='npz'):
"""Return offset filename for XDR files. For this the filename is appended
"""Return offset or its lock filename for XDR files.
For this the filename is appended
with `_offsets.{ending}`.
Parameters
Expand Down Expand Up @@ -82,7 +84,9 @@ def read_numpy_offsets(filename):
"""
try:
return {k: v for k, v in np.load(filename).items()}
except IOError:

# `ValueError` is encountered when the offset file is corrupted.
except (ValueError, IOError):
warnings.warn("Failed to load offsets file {}\n".format(filename))
return False

Expand Down Expand Up @@ -113,6 +117,8 @@ class XDRBaseReader(base.ReaderBase):
.. versionchanged:: 1.0.0
XDR offsets read from trajectory if offsets file read-in fails
.. versionchanged:: 2.0.0
Add an InterProcessLock when generating offsets
"""
def __init__(self, filename, convert_units=True, sub=None,
Expand Down Expand Up @@ -179,53 +185,68 @@ def close(self):

def _load_offsets(self):
"""load frame offsets from file, reread them from the trajectory if that
fails"""
fails. To prevent a race where multiple processes generate the same offset
file, an `InterProcessLock` is used."""
fname = offsets_filename(self.filename)
lock_name = offsets_filename(self.filename,
ending='lock')

if not isfile(fname):
self._read_offsets(store=True)
return

# if offsets file read correctly, data will be a dictionary of offsets
# if not, data will be False
# if False, offsets should be read from the trajectory
# this warning can be avoided by loading Universe like:
# u = mda.Universe(data.TPR, data.XTC, refresh_offsets=True)
# refer to Issue #1893
data = read_numpy_offsets(fname)
if not data:
warnings.warn("Reading offsets from {} failed, "
"reading offsets from trajectory instead\n"
"Consider setting 'refresh_offsets=True' "
"when loading your Universe".format(fname))
# check if the location of the lock is writable.
try:
with fasteners.InterProcessLock(lock_name) as filelock:
pass
except PermissionError:
warnings.warn(f"Cannot write lock/offset file in same location as "
"{self.filename}. Using slow offset calculation.")
self._read_offsets(store=True)
return

ctime_ok = size_ok = n_atoms_ok = False
with fasteners.InterProcessLock(lock_name) as filelock:
if not isfile(fname):
self._read_offsets(store=True)
return

# if offsets file read correctly, data will be a dictionary of offsets
# if not, data will be False
# if False, offsets should be read from the trajectory
# this warning can be avoided by loading Universe like:
# u = mda.Universe(data.TPR, data.XTC, refresh_offsets=True)
# refer to Issue #1893
data = read_numpy_offsets(fname)
if not data:
warnings.warn("Reading offsets from {} failed, "
"reading offsets from trajectory instead.\n"
"Consider setting 'refresh_offsets=True' "
"when loading your Universe.".format(fname))
self._read_offsets(store=True)
return

ctime_ok = size_ok = n_atoms_ok = False

try:
ctime_ok = getctime(self.filename) == data['ctime']
size_ok = getsize(self.filename) == data['size']
n_atoms_ok = self._xdr.n_atoms == data['n_atoms']
except KeyError:
# we tripped over some old offset formated file
pass

if not (ctime_ok and size_ok and n_atoms_ok):
warnings.warn("Reload offsets from trajectory\n "
"ctime or size or n_atoms did not match")
self._read_offsets(store=True)
else:
self._xdr.set_offsets(data['offsets'])
try:
ctime_ok = getctime(self.filename) == data['ctime']
size_ok = getsize(self.filename) == data['size']
n_atoms_ok = self._xdr.n_atoms == data['n_atoms']
except KeyError:
# we tripped over some old offset formated file
pass

if not (ctime_ok and size_ok and n_atoms_ok):
warnings.warn("Reload offsets from trajectory\n "
"ctime or size or n_atoms did not match")
self._read_offsets(store=True)
else:
self._xdr.set_offsets(data['offsets'])

def _read_offsets(self, store=False):
"""read frame offsets from trajectory"""
fname = offsets_filename(self.filename)
offsets = self._xdr.offsets
if store:
ctime = getctime(self.filename)
size = getsize(self.filename)
try:
np.savez(offsets_filename(self.filename),
np.savez(fname,
offsets=offsets, size=size, ctime=ctime,
n_atoms=self._xdr.n_atoms)
except Exception as e:
Expand Down
1 change: 1 addition & 0 deletions package/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
biopython
codecov
cython
fasteners
griddataformats
gsd
hypothesis
Expand Down
1 change: 1 addition & 0 deletions package/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ def long_description(readme):
'tqdm>=4.43.0',
'threadpoolctl',
'packaging',
'fasteners',
'gsd>=1.9.3',
]

Expand Down
40 changes: 36 additions & 4 deletions testsuite/MDAnalysisTests/coordinates/test_xdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import errno
import numpy as np
import os
from os.path import split
import shutil
import subprocess

Expand Down Expand Up @@ -741,15 +742,24 @@ def test_offsets(self, trajectory, traj):
def test_reload_offsets(self, traj):
self._reader(traj, refresh_offsets=True)

def test_nonexistant_offsets_file(self, traj):
# assert that a nonexistant file returns False during read-in
def test_nonexistent_offsets_file(self, traj):
    # read_numpy_offsets must report failure (False) when the offsets
    # file cannot be opened at all.
    offsets_path = XDR.offsets_filename(traj)
    with patch.object(np, "load") as mock_load:
        mock_load.side_effect = IOError
        result = XDR.read_numpy_offsets(offsets_path)
        assert_equal(result, False)

def test_reload_offsets_if_offsets_readin_fails(self, trajectory):
def test_corrupted_offsets_file(self, traj):
    # assert that a corrupted file returns False during read-in
    # (Issue #3230).
    # Renamed from the duplicate `test_nonexistent_offsets_file`, which
    # shadowed the IOError test of the same name and silently prevented
    # it from ever running.
    outfile_offsets = XDR.offsets_filename(traj)
    with patch.object(np, "load") as np_load_mock:
        # a corrupted npz file raises ValueError inside np.load
        np_load_mock.side_effect = ValueError
        saved_offsets = XDR.read_numpy_offsets(outfile_offsets)
        assert_equal(saved_offsets, False)

def test_reload_offsets_if_offsets_readin_io_fails(self, trajectory):
# force the np.load call that is called in read_numpy_offsets
# during _load_offsets to give an IOError
# ensure that offsets are then read-in from the trajectory
Expand All @@ -761,6 +771,19 @@ def test_reload_offsets_if_offsets_readin_fails(self, trajectory):
self.ref_offsets,
err_msg="error loading frame offsets")

def test_reload_offsets_if_offsets_readin_value_fails(self, trajectory):
    # force the np.load call that is called in read_numpy_offsets
    # during _load_offsets to give a ValueError (Issue #3230),
    # simulating a corrupted offsets file;
    # ensure that offsets are then read-in from the trajectory
    with patch.object(np, "load") as np_load_mock:
        np_load_mock.side_effect = ValueError
        with pytest.warns(UserWarning, match="Failed to load offsets"):
            trajectory._load_offsets()
        # offsets regenerated from the trajectory must match the reference
        assert_almost_equal(
            trajectory._xdr.offsets,
            self.ref_offsets,
            err_msg="error loading frame offsets")

def test_persistent_offsets_size_mismatch(self, traj):
# check that stored offsets are not loaded when trajectory
# size differs from stored size
Expand Down Expand Up @@ -850,8 +873,17 @@ def test_persistent_offsets_readonly(self, tmpdir):

filename = str(tmpdir.join(os.path.basename(self.filename)))
# try to write a offsets file
self._reader(filename)
with (pytest.warns(UserWarning, match="Couldn't save offsets") and
pytest.warns(UserWarning, match="Cannot write")):
self._reader(filename)
assert_equal(os.path.exists(XDR.offsets_filename(filename)), False)
# check the lock file is not created as well.
assert_equal(os.path.exists(XDR.offsets_filename(filename,
ending='.lock')), False)

def test_offset_lock_created(self):
    # the InterProcessLock file should exist next to the offsets file
    # after the reader has generated offsets (PR #3375)
    assert os.path.exists(XDR.offsets_filename(self.filename,
                                               ending='lock'))


class TestXTCReader_offsets(_GromacsReader_offsets):
Expand Down
33 changes: 33 additions & 0 deletions testsuite/MDAnalysisTests/parallelism/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import multiprocessing

import numpy as np
import os
import shutil
import pytest
import pickle
from numpy.testing import assert_equal

import MDAnalysis as mda
import MDAnalysis.coordinates
from MDAnalysis.coordinates.core import get_reader_for
from MDAnalysis.analysis.rms import RMSD

Expand Down Expand Up @@ -85,6 +88,19 @@ def u(request):
top, trj = request.param
return mda.Universe(top, trj)

@pytest.fixture(scope="function")
def temp_xtc(tmp_path):
    """Return a fresh copy of the XTC test trajectory with no offsets file.

    Function scope guarantees every test works on its own copy, so offset
    generation is never shared between tests.
    """
    fresh_xtc = tmp_path / "testing.xtc"
    shutil.copy(XTC, fresh_xtc)
    # In principle there is no possibility that the offset for
    # fresh_xtc exists as it is a fresh copy for every test. However,
    # the code is left for documentation.
    try:
        os.remove(MDAnalysis.coordinates.XDR.offsets_filename(fresh_xtc))
    except OSError:
        # no stale offsets file present -- nothing to clean up
        pass
    return fresh_xtc


# Define target functions here
# inside test functions doesn't work
Expand Down Expand Up @@ -125,6 +141,23 @@ def test_universe_unpickle_in_new_process():
assert_equal(ref, res)


def test_creating_multiple_universe_without_offset(temp_xtc, ncopies=3):
    # test if multiple Universes can be created in parallel without
    # racing on offset-file generation.
    # The tested XTC file is way too short to induce a race scenario but the
    # test is included as documentation for the scenario that used to create
    # a problem (see PR #3375 and issues #3230, #1988)
    args = (GRO, str(temp_xtc))
    with multiprocessing.Pool(2) as p:
        universes = [p.apply_async(mda.Universe, args)
                     for i in range(ncopies)]
        universes = [universe.get() for universe in universes]

    # compare ALL copies against the first, not just the first two,
    # so the test actually covers every worker it spawned
    for u in universes[1:]:
        assert_equal(universes[0].trajectory._xdr.offsets,
                     u.trajectory._xdr.offsets)


@pytest.fixture(params=[
# formatname, filename
('CRD', CRD, dict()),
Expand Down

0 comments on commit 174080c

Please sign in to comment.