From c04ca93193aec888e594d1df9e616c7c3eab877d Mon Sep 17 00:00:00 2001
From: Claudia Comito <39374113+ClaudiaComito@users.noreply.github.com>
Date: Wed, 19 Jun 2024 17:25:46 +0200
Subject: [PATCH 1/4] debug test_skew

---
 heat/core/tests/test_statistics.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/heat/core/tests/test_statistics.py b/heat/core/tests/test_statistics.py
index dc01cf9ff4..f47fcf710d 100644
--- a/heat/core/tests/test_statistics.py
+++ b/heat/core/tests/test_statistics.py
@@ -1263,7 +1263,7 @@ def __split_calc(ht_split, axis):
         # 1 dim
         ht_data = ht.random.rand(50)
         np_data = ht_data.copy().numpy()
-        np_skew32 = ht.array((ss.skew(np_data, bias=False)), dtype=ht_data.dtype)
+        np_skew32 = ht.array((ss.skew(np_data, bias=False)).astype(np.float32), dtype=ht_data.dtype)
         self.assertAlmostEqual(ht.skew(ht_data), np_skew32.item(), places=5)
         ht_data = ht.resplit(ht_data, 0)
         self.assertAlmostEqual(ht.skew(ht_data), np_skew32.item(), places=5)
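Patch 1 works around a precision mismatch: scipy.stats.skew returns a float64 scalar, while ht.skew on Heat's default float32 data comes back at float32 precision, so the float64 reference must be cast down before an almost-equal comparison to 5 places can hold. A minimal sketch of that mismatch, not part of the patch and assuming the default float32 dtype:

    import heat as ht
    import numpy as np
    import scipy.stats as ss

    ht_data = ht.random.rand(50)        # float32 by default
    np_data = ht_data.copy().numpy()

    ref = ss.skew(np_data, bias=False)  # scipy computes in float64
    ref32 = np.float32(ref)             # cast the reference to the data's precision

    # mirrors the test's assertAlmostEqual(..., places=5)
    assert abs(float(ht.skew(ht_data)) - ref32) < 1e-5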
From ee099bddb0a81d29a0f4cc8270b847e2436f6c9e Mon Sep 17 00:00:00 2001
From: Claudia Comito <39374113+ClaudiaComito@users.noreply.github.com>
Date: Thu, 20 Jun 2024 04:54:45 +0200
Subject: [PATCH 2/4] import netcdf4 before h5py

---
 heat/core/io.py | 435 ++++++++++++++++++++++++------------------------
 1 file changed, 217 insertions(+), 218 deletions(-)

diff --git a/heat/core/io.py b/heat/core/io.py
index 739e9ad0f9..b74829ce94 100644
--- a/heat/core/io.py
+++ b/heat/core/io.py
@@ -29,224 +29,6 @@
 __all__ = ["load", "load_csv", "save_csv", "save", "supports_hdf5", "supports_netcdf"]

-try:
-    import h5py
-except ImportError:
-    # HDF5 support is optional
-    def supports_hdf5() -> bool:
-        """
-        Returns ``True`` if Heat supports reading from and writing to HDF5 files, ``False`` otherwise.
-        """
-        return False
-
-else:
-    # add functions to exports
-    __all__.extend(["load_hdf5", "save_hdf5"])
-
-    # warn the user about serial hdf5
-    if not h5py.get_config().mpi and MPI_WORLD.rank == 0:
-        warnings.warn(
-            "h5py does not support parallel I/O, falling back to slower serial I/O", ImportWarning
-        )
-
-    def supports_hdf5() -> bool:
-        """
-        Returns ``True`` if Heat supports reading from and writing to HDF5 files, ``False`` otherwise.
-        """
-        return True
-
-    def load_hdf5(
-        path: str,
-        dataset: str,
-        dtype: datatype = types.float32,
-        load_fraction: float = 1.0,
-        split: Optional[int] = None,
-        device: Optional[str] = None,
-        comm: Optional[Communication] = None,
-    ) -> DNDarray:
-        """
-        Loads data from an HDF5 file. The data may be distributed among multiple processing nodes via the split flag.
-
-        Parameters
-        ----------
-        path : str
-            Path to the HDF5 file to be read.
-        dataset : str
-            Name of the dataset to be read.
-        dtype : datatype, optional
-            Data type of the resulting array.
-        load_fraction : float between 0. (excluded) and 1. (included), default is 1.
-            if 1. (default), the whole dataset is loaded from the file specified in path
-            else, the dataset is loaded partially, with the fraction of the dataset (along the split axis) specified by load_fraction
-            If split is None, load_fraction is automatically set to 1., i.e. the whole dataset is loaded.
-        split : int or None, optional
-            The axis along which the data is distributed among the processing cores.
-        device : str, optional
-            The device id on which to place the data, defaults to globally set default device.
-        comm : Communication, optional
-            The communication to use for the data distribution.
-
-        Raises
-        -------
-        TypeError
-            If any of the input parameters are not of correct type
-
-        Examples
-        --------
-        >>> a = ht.load_hdf5('data.h5', dataset='DATA')
-        >>> a.shape
-        [0/2] (5,)
-        [1/2] (5,)
-        >>> a.lshape
-        [0/2] (5,)
-        [1/2] (5,)
-        >>> b = ht.load_hdf5('data.h5', dataset='DATA', split=0)
-        >>> b.shape
-        [0/2] (5,)
-        [1/2] (5,)
-        >>> b.lshape
-        [0/2] (3,)
-        [1/2] (2,)
-        """
-        if not isinstance(path, str):
-            raise TypeError(f"path must be str, not {type(path)}")
-        elif not isinstance(dataset, str):
-            raise TypeError(f"dataset must be str, not {type(dataset)}")
-        elif split is not None and not isinstance(split, int):
-            raise TypeError(f"split must be None or int, not {type(split)}")
-
-        if not isinstance(load_fraction, float):
-            raise TypeError(f"load_fraction must be float, but is {type(load_fraction)}")
-        else:
-            if split is not None and (load_fraction <= 0.0 or load_fraction > 1.0):
-                raise ValueError(
-                    f"load_fraction must be between 0. (excluded) and 1. (included), but is {load_fraction}."
-                )
-
-        # infer the type and communicator for the loaded array
-        dtype = types.canonical_heat_type(dtype)
-        # determine the comm and device the data will be placed on
-        device = devices.sanitize_device(device)
-        comm = sanitize_comm(comm)
-
-        # actually load the data from the HDF5 file
-        with h5py.File(path, "r") as handle:
-            data = handle[dataset]
-            gshape = data.shape
-            if split is not None:
-                gshape = list(gshape)
-                gshape[split] = int(gshape[split] * load_fraction)
-                gshape = tuple(gshape)
-            dims = len(gshape)
-            split = sanitize_axis(gshape, split)
-            _, _, indices = comm.chunk(gshape, split)
-            balanced = True
-            if split is None:
-                data = torch.tensor(
-                    data[indices], dtype=dtype.torch_type(), device=device.torch_device
-                )
-            elif indices[split].stop > indices[split].start:
-                data = torch.tensor(
-                    data[indices], dtype=dtype.torch_type(), device=device.torch_device
-                )
-            else:
-                warnings.warn("More MPI ranks are used then the length of splitting dimension!")
-                slice1 = tuple(
-                    slice(0, gshape[i]) if i != split else slice(0, 1) for i in range(dims)
-                )
-                slice2 = tuple(
-                    slice(0, gshape[i]) if i != split else slice(0, 0) for i in range(dims)
-                )
-                data = torch.tensor(
-                    data[slice1], dtype=dtype.torch_type(), device=device.torch_device
-                )
-                data = data[slice2]
-
-        return DNDarray(data, gshape, dtype, split, device, comm, balanced)
-
-    def save_hdf5(
-        data: DNDarray, path: str, dataset: str, mode: str = "w", **kwargs: Dict[str, object]
-    ):
-        """
-        Saves ``data`` to an HDF5 file. Attempts to utilize parallel I/O if possible.
-
-        Parameters
-        ----------
-        data : DNDarray
-            The data to be saved on disk.
-        path : str
-            Path to the HDF5 file to be written.
-        dataset : str
-            Name of the dataset the data is saved to.
-        mode : str, optional
-            File access mode, one of ``'w', 'a', 'r+'``
-        kwargs : dict, optional
-            Additional arguments passed to the created dataset.
-
-        Raises
-        -------
-        TypeError
-            If any of the input parameters are not of correct type.
-        ValueError
-            If the access mode is not understood.
-
-        Examples
-        --------
-        >>> x = ht.arange(100, split=0)
-        >>> ht.save_hdf5(x, 'data.h5', dataset='DATA')
-        """
-        if not isinstance(data, DNDarray):
-            raise TypeError(f"data must be heat tensor, not {type(data)}")
-        if not isinstance(path, str):
-            raise TypeError(f"path must be str, not {type(path)}")
-        if not isinstance(dataset, str):
-            raise TypeError(f"dataset must be str, not {type(path)}")
-
-        # we only support a subset of possible modes
-        if mode not in __VALID_WRITE_MODES:
-            raise ValueError(f"mode was {mode}, not in possible modes {__VALID_WRITE_MODES}")
-
-        # chunk the data, if no split is set maximize parallel I/O and chunk first axis
-        is_split = data.split is not None
-        _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0)
-
-        # attempt to perform parallel I/O if possible
-        if h5py.get_config().mpi:
-            with h5py.File(path, mode, driver="mpio", comm=data.comm.handle) as handle:
-                dset = handle.create_dataset(dataset, data.shape, **kwargs)
-                dset[slices] = data.larray.cpu() if is_split else data.larray[slices].cpu()
-
-        # otherwise a single rank only write is performed in case of local data (i.e. no split)
-        elif data.comm.rank == 0:
-            with h5py.File(path, mode) as handle:
-                dset = handle.create_dataset(dataset, data.shape, **kwargs)
-                if is_split:
-                    dset[slices] = data.larray.cpu()
-                else:
-                    dset[...] = data.larray.cpu()
-
-            # ping next rank if it exists
-            if is_split and data.comm.size > 1:
-                data.comm.Isend([None, 0, MPI.INT], dest=1)
-                data.comm.Recv([None, 0, MPI.INT], source=data.comm.size - 1)
-
-        # no MPI, but split data is more tricky, we have to serialize the writes
-        elif is_split:
-            # wait for the previous rank to finish writing its chunk, then write own part
-            data.comm.Recv([None, 0, MPI.INT], source=data.comm.rank - 1)
-            with h5py.File(path, "r+") as handle:
-                handle[dataset][slices] = data.larray.cpu()
-
-            # ping the next node in the communicator, wrap around to 0 to complete barrier behavior
-            next_rank = (data.comm.rank + 1) % data.comm.size
-            data.comm.Isend([None, 0, MPI.INT], dest=next_rank)
-
-    DNDarray.save_hdf5 = lambda self, path, dataset, mode="w", **kwargs: save_hdf5(
-        self, path, dataset, mode, **kwargs
-    )
-    DNDarray.save_hdf5.__doc__ = save_hdf5.__doc__
-
-
 try:
     import netCDF4 as nc
 except ImportError:
@@ -668,6 +450,223 @@ def __merge_slices(
     )
     DNDarray.save_netcdf.__doc__ = save_netcdf.__doc__

+try:
+    import h5py
+except ImportError:
+    # HDF5 support is optional
+    def supports_hdf5() -> bool:
+        """
+        Returns ``True`` if Heat supports reading from and writing to HDF5 files, ``False`` otherwise.
+        """
+        return False
+
+else:
+    # add functions to exports
+    __all__.extend(["load_hdf5", "save_hdf5"])
+
+    # warn the user about serial hdf5
+    if not h5py.get_config().mpi and MPI_WORLD.rank == 0:
+        warnings.warn(
+            "h5py does not support parallel I/O, falling back to slower serial I/O", ImportWarning
+        )
+
+    def supports_hdf5() -> bool:
+        """
+        Returns ``True`` if Heat supports reading from and writing to HDF5 files, ``False`` otherwise.
+        """
+        return True
+
+    def load_hdf5(
+        path: str,
+        dataset: str,
+        dtype: datatype = types.float32,
+        load_fraction: float = 1.0,
+        split: Optional[int] = None,
+        device: Optional[str] = None,
+        comm: Optional[Communication] = None,
+    ) -> DNDarray:
+        """
+        Loads data from an HDF5 file. The data may be distributed among multiple processing nodes via the split flag.
+
+        Parameters
+        ----------
+        path : str
+            Path to the HDF5 file to be read.
+        dataset : str
+            Name of the dataset to be read.
+        dtype : datatype, optional
+            Data type of the resulting array.
+        load_fraction : float between 0. (excluded) and 1. (included), default is 1.
+            if 1. (default), the whole dataset is loaded from the file specified in path
+            else, the dataset is loaded partially, with the fraction of the dataset (along the split axis) specified by load_fraction
+            If split is None, load_fraction is automatically set to 1., i.e. the whole dataset is loaded.
+        split : int or None, optional
+            The axis along which the data is distributed among the processing cores.
+        device : str, optional
+            The device id on which to place the data, defaults to globally set default device.
+        comm : Communication, optional
+            The communication to use for the data distribution.
+
+        Raises
+        -------
+        TypeError
+            If any of the input parameters are not of correct type
+
+        Examples
+        --------
+        >>> a = ht.load_hdf5('data.h5', dataset='DATA')
+        >>> a.shape
+        [0/2] (5,)
+        [1/2] (5,)
+        >>> a.lshape
+        [0/2] (5,)
+        [1/2] (5,)
+        >>> b = ht.load_hdf5('data.h5', dataset='DATA', split=0)
+        >>> b.shape
+        [0/2] (5,)
+        [1/2] (5,)
+        >>> b.lshape
+        [0/2] (3,)
+        [1/2] (2,)
+        """
+        if not isinstance(path, str):
+            raise TypeError(f"path must be str, not {type(path)}")
+        elif not isinstance(dataset, str):
+            raise TypeError(f"dataset must be str, not {type(dataset)}")
+        elif split is not None and not isinstance(split, int):
+            raise TypeError(f"split must be None or int, not {type(split)}")
+
+        if not isinstance(load_fraction, float):
+            raise TypeError(f"load_fraction must be float, but is {type(load_fraction)}")
+        else:
+            if split is not None and (load_fraction <= 0.0 or load_fraction > 1.0):
+                raise ValueError(
+                    f"load_fraction must be between 0. (excluded) and 1. (included), but is {load_fraction}."
+                )
+
+        # infer the type and communicator for the loaded array
+        dtype = types.canonical_heat_type(dtype)
+        # determine the comm and device the data will be placed on
+        device = devices.sanitize_device(device)
+        comm = sanitize_comm(comm)
+
+        # actually load the data from the HDF5 file
+        with h5py.File(path, "r") as handle:
+            data = handle[dataset]
+            gshape = data.shape
+            if split is not None:
+                gshape = list(gshape)
+                gshape[split] = int(gshape[split] * load_fraction)
+                gshape = tuple(gshape)
+            dims = len(gshape)
+            split = sanitize_axis(gshape, split)
+            _, _, indices = comm.chunk(gshape, split)
+            balanced = True
+            if split is None:
+                data = torch.tensor(
+                    data[indices], dtype=dtype.torch_type(), device=device.torch_device
+                )
+            elif indices[split].stop > indices[split].start:
+                data = torch.tensor(
+                    data[indices], dtype=dtype.torch_type(), device=device.torch_device
+                )
+            else:
+                warnings.warn("More MPI ranks are used then the length of splitting dimension!")
+                slice1 = tuple(
+                    slice(0, gshape[i]) if i != split else slice(0, 1) for i in range(dims)
+                )
+                slice2 = tuple(
+                    slice(0, gshape[i]) if i != split else slice(0, 0) for i in range(dims)
+                )
+                data = torch.tensor(
+                    data[slice1], dtype=dtype.torch_type(), device=device.torch_device
+                )
+                data = data[slice2]
+
+        return DNDarray(data, gshape, dtype, split, device, comm, balanced)
+
+    def save_hdf5(
+        data: DNDarray, path: str, dataset: str, mode: str = "w", **kwargs: Dict[str, object]
+    ):
+        """
+        Saves ``data`` to an HDF5 file. Attempts to utilize parallel I/O if possible.
+
+        Parameters
+        ----------
+        data : DNDarray
+            The data to be saved on disk.
+        path : str
+            Path to the HDF5 file to be written.
+        dataset : str
+            Name of the dataset the data is saved to.
+        mode : str, optional
+            File access mode, one of ``'w', 'a', 'r+'``
+        kwargs : dict, optional
+            Additional arguments passed to the created dataset.
+
+        Raises
+        -------
+        TypeError
+            If any of the input parameters are not of correct type.
+        ValueError
+            If the access mode is not understood.
+
+        Examples
+        --------
+        >>> x = ht.arange(100, split=0)
+        >>> ht.save_hdf5(x, 'data.h5', dataset='DATA')
+        """
+        if not isinstance(data, DNDarray):
+            raise TypeError(f"data must be heat tensor, not {type(data)}")
+        if not isinstance(path, str):
+            raise TypeError(f"path must be str, not {type(path)}")
+        if not isinstance(dataset, str):
+            raise TypeError(f"dataset must be str, not {type(path)}")
+
+        # we only support a subset of possible modes
+        if mode not in __VALID_WRITE_MODES:
+            raise ValueError(f"mode was {mode}, not in possible modes {__VALID_WRITE_MODES}")
+
+        # chunk the data, if no split is set maximize parallel I/O and chunk first axis
+        is_split = data.split is not None
+        _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0)
+
+        # attempt to perform parallel I/O if possible
+        if h5py.get_config().mpi:
+            with h5py.File(path, mode, driver="mpio", comm=data.comm.handle) as handle:
+                dset = handle.create_dataset(dataset, data.shape, **kwargs)
+                dset[slices] = data.larray.cpu() if is_split else data.larray[slices].cpu()
+
+        # otherwise a single rank only write is performed in case of local data (i.e. no split)
+        elif data.comm.rank == 0:
+            with h5py.File(path, mode) as handle:
+                dset = handle.create_dataset(dataset, data.shape, **kwargs)
+                if is_split:
+                    dset[slices] = data.larray.cpu()
+                else:
+                    dset[...] = data.larray.cpu()
+
+            # ping next rank if it exists
+            if is_split and data.comm.size > 1:
+                data.comm.Isend([None, 0, MPI.INT], dest=1)
+                data.comm.Recv([None, 0, MPI.INT], source=data.comm.size - 1)
+
+        # no MPI, but split data is more tricky, we have to serialize the writes
+        elif is_split:
+            # wait for the previous rank to finish writing its chunk, then write own part
+            data.comm.Recv([None, 0, MPI.INT], source=data.comm.rank - 1)
+            with h5py.File(path, "r+") as handle:
+                handle[dataset][slices] = data.larray.cpu()
+
+            # ping the next node in the communicator, wrap around to 0 to complete barrier behavior
+            next_rank = (data.comm.rank + 1) % data.comm.size
+            data.comm.Isend([None, 0, MPI.INT], dest=next_rank)
+
+    DNDarray.save_hdf5 = lambda self, path, dataset, mode="w", **kwargs: save_hdf5(
+        self, path, dataset, mode, **kwargs
+    )
+    DNDarray.save_hdf5.__doc__ = save_hdf5.__doc__
+

 def load(
     path: str, *args: Optional[List[object]], **kwargs: Optional[Dict[str, object]]
 )
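Patch 2 relocates the entire optional-HDF5 block below the netCDF4 block, so netCDF4 is imported before h5py; the moved code itself is unchanged. Both bindings link against the HDF5 C library, and import order can decide which copy of that library ends up loaded, which is presumably what the reordering addresses. A hedged usage sketch of the relocated API; data.h5 and its DATA dataset are placeholders, not files shipped with Heat:

    import heat as ht

    if ht.supports_hdf5():
        # read only the first half of the dataset along axis 0,
        # distributed across the MPI processes
        a = ht.load_hdf5("data.h5", dataset="DATA", split=0, load_fraction=0.5)
        # write it back out; parallel I/O is used when h5py was built with MPI
        ht.save_hdf5(a, "copy.h5", dataset="DATA")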
From 0b6fa7328f519bf9b0f586a981780db4911d1eb6 Mon Sep 17 00:00:00 2001
From: Claudia Comito <39374113+ClaudiaComito@users.noreply.github.com>
Date: Thu, 20 Jun 2024 05:10:41 +0200
Subject: [PATCH 3/4] fix array building with dtype casting

---
 heat/core/tests/test_statistics.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/heat/core/tests/test_statistics.py b/heat/core/tests/test_statistics.py
index f47fcf710d..83c036fe47 100644
--- a/heat/core/tests/test_statistics.py
+++ b/heat/core/tests/test_statistics.py
@@ -1263,7 +1263,7 @@ def __split_calc(ht_split, axis):
         # 1 dim
         ht_data = ht.random.rand(50)
         np_data = ht_data.copy().numpy()
-        np_skew32 = ht.array((ss.skew(np_data, bias=False)).astype(np.float32), dtype=ht_data.dtype)
+        np_skew32 = ht.array(ss.skew(np_data, bias=False)).astype(ht_data.dtype)
         self.assertAlmostEqual(ht.skew(ht_data), np_skew32.item(), places=5)
         ht_data = ht.resplit(ht_data, 0)
         self.assertAlmostEqual(ht.skew(ht_data), np_skew32.item(), places=5)
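Patch 3 reworks the cast from patch 1: rather than casting on the NumPy side and passing an explicit dtype when building the array, the scipy result is wrapped as-is and then cast with DNDarray.astype. A sketch contrasting the two variants, not part of the patch; both round the same float64 value to float32, so they should agree exactly:

    import heat as ht
    import numpy as np
    import scipy.stats as ss

    ht_data = ht.random.rand(50)
    np_data = ht_data.copy().numpy()

    # patch 1: cast in NumPy, then build the Heat array with an explicit dtype
    ref_a = ht.array(ss.skew(np_data, bias=False).astype(np.float32), dtype=ht_data.dtype)

    # patch 3: build the Heat array first, then cast it to the data's dtype
    ref_b = ht.array(ss.skew(np_data, bias=False)).astype(ht_data.dtype)

    assert ref_a.item() == ref_b.item()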
From ea33e9da8e318d887ae2894a3e2b25c0690d88c0 Mon Sep 17 00:00:00 2001
From: Claudia Comito <39374113+ClaudiaComito@users.noreply.github.com>
Date: Thu, 20 Jun 2024 08:52:20 +0200
Subject: [PATCH 4/4] pin version numpy <2

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 6aca05e38c..6682618917 100644
--- a/setup.py
+++ b/setup.py
@@ -34,7 +34,7 @@
     ],
     install_requires=[
         "mpi4py>=3.0.0",
-        "numpy>=1.22.0",
+        "numpy>=1.22.0, <2",
        "torch>=1.12.0, <2.3.2",
         "scipy>=1.10.0",
         "pillow>=6.0.0",
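Patch 4 caps numpy below the 2.0 release, which came out only days before these commits and is not yet supported here. A small sketch of what the new specifier accepts at install time, assuming the third-party packaging library:

    from packaging.specifiers import SpecifierSet

    spec = SpecifierSet(">=1.22.0, <2")
    assert "1.26.4" in spec      # the last numpy 1.x line still satisfies the pin
    assert "2.0.0" not in spec   # numpy 2 is excluded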