Merge pull request #11 from OSU-Nowlab/gems
Support GEMS MASTER for AmoebaNet and ResNet
Quentin-Anthony authored Sep 25, 2023
2 parents e05797a + 5473854 commit 4c3eb63
Showing 8 changed files with 691 additions and 20 deletions.
67 changes: 67 additions & 0 deletions benchmarks/gems_master_model/README.md
@@ -0,0 +1,67 @@
# GEMS: <u>G</u>PU-<u>E</u>nabled <u>M</u>emory-Aware Model-Parallelism <u>S</u>ystem for Distributed DNN Training
Model parallelism is necessary for training out-of-core models, but it can leave resources underutilized. Pipeline parallelism addresses this by using a batch size greater than 1; however, for very high-resolution images, certain state-of-the-art models can only be trained with a unit batch size. GEMS is a memory-efficient model-parallelism design that enables training such models with any batch size while using the same resources. For more details, please refer to the original paper: [GEMS: <u>G</u>PU-<u>E</u>nabled <u>M</u>emory-Aware Model-Parallelism <u>S</u>ystem for Distributed DNN Training](https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=9355254).

## Run GEMS-MASTER:

#### Generic command:
```bash
$MV2_HOME/bin/mpirun_rsh --export-all -np $np --hostfile ${HOSTFILE} MV2_USE_GDRCOPY=0 MV2_ENABLE_AFFINITY=0 MV2_USE_CUDA=1 LD_PRELOAD=$MV2_HOME/lib/libmpi.so python ${gems_model_script} --split-size ${split_size} --image-size ${image_size} --batch-size ${batch_size} --times ${times}
```
#### Examples

- Example: run the AmoebaNet MASTER model with a 1024 × 1024 image size, a model split size of 4 (i.e. the number of partitions for MP), a model replication factor of η = 2, and a batch size of 1 per model replica (i.e. effective batch size (EBS) = η × BS = 2).

```bash
$MV2_HOME/bin/mpirun_rsh --export-all -np $np --hostfile ${HOSTFILE} MV2_USE_GDRCOPY=0 MV2_ENABLE_AFFINITY=0 MV2_USE_CUDA=1 LD_PRELOAD=$MV2_HOME/lib/libmpi.so python benchmarks/gems_master_model/benchmark_amoebanet_gems_master.py --split-size 4 --image-size 1024 --batch-size 1 --times 2
```
- Similarly, we can run the benchmark for the ResNet MASTER model.
Below is an example that runs the ResNet MASTER model with a 2048 × 2048 image size, a model split size of 4 (i.e. the number of partitions for MP), a model replication factor of η = 4, and a batch size of 1 per model replica (i.e. effective batch size (EBS) = η × BS = 4).
```bash
$MV2_HOME/bin/mpirun_rsh --export-all -np $np --hostfile ${HOSTFILE} MV2_USE_GDRCOPY=0 MV2_ENABLE_AFFINITY=0 MV2_USE_CUDA=1 LD_PRELOAD=$MV2_HOME/lib/libmpi.so python benchmarks/gems_master_model/benchmark_resnet_gems_master.py --split-size 4 --image-size 2048 --batch-size 1 --times 4 &>> $OUTFILE
```

Below are the available configuration options:

<pre>
usage: benchmark_amoebanet_sp.py [-h] [-v] [--batch-size BATCH_SIZE] [--parts PARTS] [--split-size SPLIT_SIZE] [--num-spatial-parts NUM_SPATIAL_PARTS]
                                 [--spatial-size SPATIAL_SIZE] [--times TIMES] [--image-size IMAGE_SIZE] [--num-epochs NUM_EPOCHS] [--num-layers NUM_LAYERS]
                                 [--num-filters NUM_FILTERS] [--balance BALANCE] [--halo-D2] [--fused-layers FUSED_LAYERS] [--local-DP LOCAL_DP] [--slice-method SLICE_METHOD]
                                 [--app APP] [--datapath DATAPATH]

SP-MP-DP Configuration Script

optional arguments:
  -h, --help            show this help message and exit
  -v, --verbose         Prints performance numbers or logs (default: False)
  --batch-size BATCH_SIZE
                        input batch size (default: 32)
  --parts PARTS         Number of parts for MP (default: 1)
  --split-size SPLIT_SIZE
                        Number of processes for MP (default: 2)
  --num-spatial-parts NUM_SPATIAL_PARTS
                        Number of partitions in spatial parallelism (default: 4)
  --spatial-size SPATIAL_SIZE
                        Number of splits for spatial parallelism (default: 1)
  --times TIMES         Number of times to repeat MASTER 1: 2 replications, 2: 4 replications (default: 1)
  --image-size IMAGE_SIZE
                        Image size for synthetic benchmark (default: 32)
  --num-epochs NUM_EPOCHS
                        Number of epochs (default: 1)
  --num-layers NUM_LAYERS
                        Number of layers in AmoebaNet (default: 18)
  --num-filters NUM_FILTERS
                        Number of filters in AmoebaNet (default: 416)
  --balance BALANCE     Length of the list equals the number of partitions, and the sum should equal the number of layers (default: None)
  --halo-D2             Enable design2 (do halo exchange on a few convs) for spatial conv (default: False)
  --fused-layers FUSED_LAYERS
                        When the D2 design is enabled for halo exchange, number of blocks to fuse in the ResNet model (default: 1)
  --local-DP LOCAL_DP   LBANN integration of SP with MP. MP can apply data parallelism. 1: only one GPU for a given split, 2: two GPUs for a given split (uses DP)
                        (default: 1)
  --slice-method SLICE_METHOD
                        Slice method (square, vertical, and horizontal) in spatial parallelism (default: square)
  --app APP             Application type (1: medical, 2: cifar, 3: synthetic) in spatial parallelism (default: 3)
  --datapath DATAPATH   Local dataset path (default: ./train)
</pre>

*Note: `--times` is a GEMS-specific parameter, and certain parameters such as `--num-spatial-parts`, `--slice-method`, and `--halo-D2` are not required by GEMS.*
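
For reference, the snippet below is a minimal, hypothetical sketch of how a subset of the options above could be declared with Python's `argparse`; the actual parser used by the benchmarks is provided by `torchgems.parser` and may differ in detail.

```python
import argparse


def get_parser():
    # Illustrative subset of the options listed above; defaults mirror the help text.
    parser = argparse.ArgumentParser(description="SP-MP-DP Configuration Script")
    parser.add_argument("-v", "--verbose", action="store_true", help="Prints performance numbers or logs")
    parser.add_argument("--batch-size", type=int, default=32, help="input batch size")
    parser.add_argument("--parts", type=int, default=1, help="Number of parts for MP")
    parser.add_argument("--split-size", type=int, default=2, help="Number of processes for MP")
    parser.add_argument("--times", type=int, default=1, help="Number of times to repeat MASTER")
    parser.add_argument("--image-size", type=int, default=32, help="Image size for synthetic benchmark")
    parser.add_argument("--num-epochs", type=int, default=1, help="Number of epochs")
    return parser


if __name__ == "__main__":
    args = get_parser().parse_args()
    # Effective batch size per step is times * batch_size (EBS = η × BS).
    print(args.split_size, args.batch_size, args.times, args.times * args.batch_size)
```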
293 changes: 293 additions & 0 deletions benchmarks/gems_master_model/benchmark_amoebanet_gems_master.py
@@ -0,0 +1,293 @@
import torch
import torch.distributed as dist
import torchvision.transforms as transforms
import torchvision
import numpy as np
import time
import sys
import math
import logging
from models import amoebanet
from torchgems import parser
from torchgems.mp_pipeline import model_generator
from torchgems.gems_master import train_model_master
import torchgems.comm as gems_comm

parser_obj = parser.get_parser()
args = parser_obj.parse_args()

if args.verbose:
logging.basicConfig(level=logging.DEBUG)

gems_comm.initialize_cuda()


class Unbuffered(object):
def __init__(self, stream):
self.stream = stream

def write(self, data):
self.stream.write(data)
self.stream.flush()

def writelines(self, datas):
self.stream.writelines(datas)
self.stream.flush()

def __getattr__(self, attr):
return getattr(self.stream, attr)


def init_processes(backend="mpi"):
"""Initialize the distributed environment."""
dist.init_process_group(backend)
size = dist.get_world_size()
rank = dist.get_rank()
return size, rank


sys.stdout = Unbuffered(sys.stdout)
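# The Unbuffered wrapper above flushes stdout on every write so that per-rank
# output shows up immediately when running under mpirun_rsh.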

np.random.seed(seed=1405)
parts = args.parts
batch_size = args.batch_size
epoch = args.num_epochs
# APP
# 1: Medical
# 2: Cifar
# 3: synthetic
APP = args.app
times = args.times
image_size = int(args.image_size)
num_layers = args.num_layers
num_filters = args.num_filters
balance = args.balance
mp_size = args.split_size
datapath = args.datapath
num_workers = args.num_workers
num_classes = args.num_classes

##################### AmoebaNet GEMS model specific parameters #####################

image_size_seq = 512
ENABLE_ASYNC = True

###############################################################################
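# ENABLE_MASTER=True configures the communicator for GEMS MASTER: the same
# mp_size GPUs host two model replicas (model_gen1 and model_gen2 below).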
mpi_comm = gems_comm.MPIComm(split_size=mp_size, ENABLE_MASTER=True)
rank = mpi_comm.rank
local_rank = rank % mp_size
if balance is not None:
balance = [int(i) for i in balance.split(",")]

# Initialize AmoebaNet model
model = amoebanet.amoebanetd(
num_classes=num_classes, num_layers=args.num_layers, num_filters=args.num_filters
)

mul_shape = int(args.image_size / image_size_seq)

# Initialize parameters for Model Parallelism
model_gen = model_generator(
model=model,
split_size=mp_size,
input_size=(int(batch_size / parts), 3, image_size_seq, image_size_seq),
balance=balance,
)

# Get the shape of the model on each split rank for image_size_seq and move it to the device.
# Note: we compute shapes w.r.t. image_size_seq because the model at the full
# image_size may not fit in memory.
model_gen.ready_model(split_rank=local_rank, GET_SHAPES_ON_CUDA=True)

# Get the shape of model on each split rank for image_size
image_size_times = int(image_size / image_size_seq)
amoebanet_shapes_list = []
for output_shape in model_gen.shape_list:
if isinstance(output_shape, list):
temp_shape = []
for shape_tuple in output_shape:
x = (
shape_tuple[0],
shape_tuple[1],
int(shape_tuple[2] * image_size_times),
int(shape_tuple[3] * image_size_times),
)
temp_shape.append(x)
amoebanet_shapes_list.append(temp_shape)
else:
if len(output_shape) == 2:
amoebanet_shapes_list.append(output_shape)
else:
x = (
output_shape[0],
output_shape[1],
int(output_shape[2] * image_size_times),
int(output_shape[3] * image_size_times),
)
amoebanet_shapes_list.append(x)

model_gen.shape_list = amoebanet_shapes_list

logging.info(f"Shape of model on local_rank {local_rank} : {model_gen.shape_list}")

del model_gen
del model
torch.cuda.ipc_collect()

model = amoebanet.amoebanetd(
num_classes=num_classes, num_layers=args.num_layers, num_filters=args.num_filters
)

# GEMS Model 1
model_gen1 = model_gen = model_generator(
model=model,
split_size=mp_size,
input_size=(int(batch_size / parts), 3, image_size, image_size),
balance=balance,
shape_list=amoebanet_shapes_list,
)
model_gen1.ready_model(split_rank=local_rank)

model = amoebanet.amoebanetd(
num_classes=num_classes, num_layers=args.num_layers, num_filters=args.num_filters
)

# GEMS Model 2
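# The second replica spans the same GPUs as Model 1 but in reverse order
# (split_rank = mp_size - local_rank - 1): the rank holding the last partition
# of Model 1 holds the first partition of Model 2, balancing memory across GPUs.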
model_gen2 = model_gen = model_generator(
model=model,
split_size=mp_size,
input_size=(int(batch_size / parts), 3, image_size, image_size),
balance=balance,
shape_list=amoebanet_shapes_list,
)
model_gen2.ready_model(split_rank=mp_size - local_rank - 1)

tm_master = train_model_master(
model_gen1,
model_gen2,
local_rank,
batch_size,
epoch,
criterion=None,
optimizer=None,
parts=parts,
ASYNC=ENABLE_ASYNC,
)

sync_allreduce = gems_comm.SyncAllreduce(mpi_comm)
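# Used below to synchronize the initial weights of the two replicas (sync_model)
# and to allreduce their gradients during training (apply_allreduce_master_and_update).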

############################## Dataset Definition ##############################
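# Note: the dataloaders below use batch_size = times * batch_size, i.e. each step
# draws one micro-batch of size batch_size for every one of the `times` model
# replicas (effective batch size = times * batch_size).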

transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
torch.manual_seed(0)

if APP == 1:
trainset = torchvision.datasets.ImageFolder(
datapath,
transform=transform,
target_transform=None,
)
my_dataloader = torch.utils.data.DataLoader(
trainset,
batch_size=times * batch_size,
shuffle=True,
num_workers=num_workers,
pin_memory=True,
)
size_dataset = len(my_dataloader.dataset)
elif APP == 2:
transform = transforms.Compose(
[
transforms.Resize((512, 512)),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]
)
torch.manual_seed(0)
trainset = torchvision.datasets.CIFAR10(
root=datapath, train=True, download=True, transform=transform
)
my_dataloader = torch.utils.data.DataLoader(
trainset,
batch_size=times * batch_size,
shuffle=False,
num_workers=num_workers,
pin_memory=True,
)
size_dataset = len(my_dataloader.dataset)
else:
my_dataset = torchvision.datasets.FakeData(
size=10 * batch_size,
image_size=(3, image_size, image_size),
num_classes=num_classes,
transform=transform,
target_transform=None,
random_offset=0,
)
my_dataloader = torch.utils.data.DataLoader(
my_dataset,
batch_size=batch_size * times,
shuffle=False,
num_workers=num_workers,
pin_memory=True,
)
size_dataset = 10 * batch_size


################################################################################

sync_allreduce.sync_model(model_gen1, model_gen2)
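# Both replicas now start from identical weights.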

perf = []


def run_epoch():
for i_e in range(epoch):
loss = 0
correct = 0
size = len(my_dataloader.dataset)
t = time.time()
for batch, data in enumerate(my_dataloader, 0):
start_event = torch.cuda.Event(enable_timing=True, blocking=True)
end_event = torch.cuda.Event(enable_timing=True, blocking=True)
start_event.record()

if batch > math.floor(size_dataset / (times * batch_size)) - 1:
break

inputs, labels = data
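# run_step below performs the forward and backward passes for both model
# replicas on this rank's partition(s).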

local_loss, local_correct = tm_master.run_step(inputs, labels)
loss += local_loss
correct += local_correct

sync_allreduce.apply_allreduce_master_and_update(
tm_master, model_gen1, model_gen2
)
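# The call above averages gradients across the two replicas and applies the optimizer update.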

end_event.record()
torch.cuda.synchronize()
t = start_event.elapsed_time(end_event) / 1000

if local_rank == mp_size - 1:
logging.info(
f"Step :{batch}, LOSS: {local_loss}, Global loss: {loss/(batch+1)} Acc: {local_correct} [{batch * len(inputs):>5d}/{size:>5d}]"
)

if local_rank == 0:
print(f"Epoch: {i_e} images per sec:{batch_size / t}")
perf.append(batch_size / t)

t = time.time()
if local_rank == mp_size - 1:
print(f"Epoch {i_e} Global loss: {loss / batch} Acc {correct / batch}")


run_epoch()

if local_rank == 0:
print(f"Mean {sum(perf) / len(perf)} Median {np.median(perf)}")

################################################################################