From 88d451367a4a1b5fab6215916922a439725b2459 Mon Sep 17 00:00:00 2001 From: Theodore Kisner Date: Wed, 13 Mar 2024 22:08:17 -0700 Subject: [PATCH 1/4] Attempt to catch unit test failures more reliably --- pshmem/test.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pshmem/test.py b/pshmem/test.py index ffdf4e0..1f278b6 100644 --- a/pshmem/test.py +++ b/pshmem/test.py @@ -471,5 +471,17 @@ def run(): suite.addTest(unittest.makeSuite(LockTest)) suite.addTest(unittest.makeSuite(ShmemTest)) runner = unittest.TextTestRunner() - runner.run(suite) + + ret = 0 + _ret = runner.run(suite) + if not _ret.wasSuccessful(): + ret += 1 + + if self.comm is not None: + ret = self.comm.allreduce(ret, op=MPI.SUM) + + if ret > 0: + print(f"{ret} Processes had failures") + sys.exit(6) + return From 8e159bd5c7bfba0887b2c30b27eaf203a5911c5d Mon Sep 17 00:00:00 2001 From: Theodore Kisner Date: Wed, 13 Mar 2024 22:12:41 -0700 Subject: [PATCH 2/4] Fix typo --- pshmem/test.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pshmem/test.py b/pshmem/test.py index 1f278b6..1797376 100644 --- a/pshmem/test.py +++ b/pshmem/test.py @@ -467,6 +467,10 @@ def test_lock(self): def run(): + comm = None + if MPI is not None: + comm = MPI.COMM_WORLD + suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(LockTest)) suite.addTest(unittest.makeSuite(ShmemTest)) @@ -477,8 +481,8 @@ def run(): if not _ret.wasSuccessful(): ret += 1 - if self.comm is not None: - ret = self.comm.allreduce(ret, op=MPI.SUM) + if comm is not None: + ret = comm.allreduce(ret, op=MPI.SUM) if ret > 0: print(f"{ret} Processes had failures") From bd342836a42de52d216bee5670d9162255f8b8ac Mon Sep 17 00:00:00 2001 From: Theodore Kisner Date: Wed, 13 Mar 2024 22:38:10 -0700 Subject: [PATCH 3/4] Move pre-deletion of shared memory segment until after numpy wrapping --- pshmem/shmem.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/pshmem/shmem.py b/pshmem/shmem.py index 383da34..b473645 100644 --- a/pshmem/shmem.py +++ b/pshmem/shmem.py @@ -216,25 +216,13 @@ def __init__(self, shape, dtype, comm, comm_node=None, comm_node_rank=None): if self._nodecomm is not None: self._nodecomm.barrier() - # Now the rank zero process will call remove() to mark the shared - # memory segment for removal. However, this will not actually - # be removed until all processes detach. - if self._noderank == 0: - try: - self._shmem.remove() - except sysv_ipc.ExistentialError: - msg = "Process {}: {}".format(self._rank, self._name) - msg += " failed to remove shared memory" - msg += ": {}".format(e) - print(msg, flush=True) - raise - # Create a numpy array which acts as a view of the buffer. self._flat = np.ndarray( self._n, dtype=self._dtype, buffer=self._shmem, ) + # Initialize to zero. if self._noderank == 0: self._flat[:] = 0 @@ -242,6 +230,19 @@ def __init__(self, shape, dtype, comm, comm_node=None, comm_node_rank=None): # Wrap self.data = self._flat.reshape(self._shape) + # Now the rank zero process will call remove() to mark the shared + # memory segment for removal. However, this will not actually + # be removed until all processes detach. + if self._noderank == 0: + try: + self._shmem.remove() + except sysv_ipc.ExistentialError: + msg = "Process {}: {}".format(self._rank, self._name) + msg += " failed to remove shared memory" + msg += ": {}".format(e) + print(msg, flush=True) + raise + def __del__(self): self.close() From 7a265c38fda19c55f71c0b2d376ed1eba9b60b14 Mon Sep 17 00:00:00 2001 From: Theodore Kisner Date: Wed, 13 Mar 2024 22:42:20 -0700 Subject: [PATCH 4/4] Shift location of barrier until after numpy wrapping --- pshmem/shmem.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pshmem/shmem.py b/pshmem/shmem.py index b473645..41ffb57 100644 --- a/pshmem/shmem.py +++ b/pshmem/shmem.py @@ -212,10 +212,6 @@ def __init__(self, shape, dtype, comm, comm_node=None, comm_node_rank=None): print(msg, flush=True) raise - # Wait for other processes to attach - if self._nodecomm is not None: - self._nodecomm.barrier() - # Create a numpy array which acts as a view of the buffer. self._flat = np.ndarray( self._n, @@ -230,6 +226,10 @@ def __init__(self, shape, dtype, comm, comm_node=None, comm_node_rank=None): # Wrap self.data = self._flat.reshape(self._shape) + # Wait for other processes to attach and wrap + if self._nodecomm is not None: + self._nodecomm.barrier() + # Now the rank zero process will call remove() to mark the shared # memory segment for removal. However, this will not actually # be removed until all processes detach.