import os
import sys
import unittest

import numpy as np
import torch
import torch_xla
import torch_xla.runtime as xr
from torch_xla.experimental.assume_pure import assume_pure
from torch_xla.distributed.spmd import mark_sharding, set_global_mesh, get_1d_mesh, Mesh


class AssumePureSpmdTest(unittest.TestCase):

  @classmethod
  def setUpClass(cls):
    # Activate SPMD
    xr.use_spmd()

  def setUp(self):
    # Set up a simple SPMD mesh for these tests.
    self.spmd_mesh = get_1d_mesh(axis_name="model")
    set_global_mesh(self.spmd_mesh)

  @unittest.skipUnless(xr.global_runtime_device_count() > 1,
                       "Multiple devices required")
  @unittest.skipIf(
      torch.cuda.is_available() or os.environ.get('PJRT_DEVICE') == 'CUDA',
      "TODO(https://github.com/pytorch/xla/issues/9017): Get these tests working on GPU"
  )
  def test_assume_pure_works_with_mark_sharding(self):
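    """mark_sharding wrapped in assume_pure should still shard the input across devices."""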
    x = torch.randn((8, 4, 5, 128), device='xla')
    result = assume_pure(mark_sharding)(x, self.spmd_mesh,
                                        ("model", None, None, None))
    torch_xla.sync(wait=True)
    N = xr.global_runtime_device_count()
    self.assertIn(f'devices=[{N}',
                  torch_xla._XLAC._get_xla_sharding_spec(result))

  @unittest.skipUnless(xr.global_runtime_device_count() > 1,
                       "Multiple devices required")
  @unittest.skipIf(
      torch.cuda.is_available() or os.environ.get('PJRT_DEVICE') == 'CUDA',
      "TODO(https://github.com/pytorch/xla/issues/9017): Get these tests working on GPU"
  )
  def test_convert_to_jax_mesh(self):
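    """get_jax_mesh should preserve the PyTorch/XLA mesh shape and device IDs."""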
    jax_mesh = self.spmd_mesh.get_jax_mesh()
    self.assertEqual(jax_mesh.devices.shape, self.spmd_mesh.mesh_shape)
    np.testing.assert_equal(
        np.array([dev.id for dev in jax_mesh.devices.flatten()]),
        self.spmd_mesh.device_ids)

  @unittest.skipUnless(xr.global_runtime_device_count() > 1,
                       "Multiple devices required")
  @unittest.skipUnless(os.environ.get('PJRT_DEVICE') == 'TPU', "TPU only test")
  def test_convert_to_jax_mesh_shuffled(self):
    """Test get_jax_mesh when the PyTorch/XLA mesh has a custom order."""

    # Arrange
    num_devices = xr.global_runtime_device_count()
    device_ids = np.arange(num_devices)
    device_ids = np.random.permutation(device_ids)
    self.spmd_mesh = Mesh(
        device_ids, mesh_shape=(num_devices,), axis_names=('model',))

    # Act
    jax_mesh = self.spmd_mesh.get_jax_mesh()

    # Assert
    torch_xla_devices = np.array(
        [xr.global_runtime_device_attributes()[i] for i in device_ids])
    self.assertEqual(jax_mesh.devices.shape, self.spmd_mesh.mesh_shape)
    np.testing.assert_equal(
        np.array([dev.coords for dev in jax_mesh.devices.flatten()]),
        np.array([dev['coords'] for dev in torch_xla_devices.flatten()]),
    )


if __name__ == '__main__':
  # Run without unittest's implicit sys.exit so the result can be reported below.
  test = unittest.main(exit=False)
  sys.exit(0 if test.result.wasSuccessful() else 1)