diff --git a/pretrain.py b/pretrain.py index 7208172..f273dca 100644 --- a/pretrain.py +++ b/pretrain.py @@ -115,11 +115,11 @@ def main(): elif args.world_size == 1 and ranks_num == 1: # Single GPU mode. assert torch.cuda.is_available(), "No available GPUs." - args.gpu_id = args.gpu_ranks[0] - assert args.gpu_id < torch.cuda.device_count(), "Invalid specified GPU device." + args.local_rank = args.gpu_ranks[0] + assert args.local_rank < torch.cuda.device_count(), "Invalid specified GPU device." args.dist_train = False args.single_gpu = True - print("Using GPU %d for training." % args.gpu_id) + print("Using GPU %d for training." % args.local_rank) else: # CPU mode. assert ranks_num == 0, "GPUs are specified, please check the arguments." diff --git a/tencentpretrain/trainer.py b/tencentpretrain/trainer.py index a959b3b..6fdb92c 100755 --- a/tencentpretrain/trainer.py +++ b/tencentpretrain/trainer.py @@ -82,7 +82,7 @@ def train_and_validate(args): mp.spawn(worker, nprocs=args.ranks_num, args=(args.gpu_ranks, args, model_for_training, model_for_dataloader), daemon=False) elif args.single_gpu: # Single GPU mode. - worker(args.gpu_id, None, args, model_for_training, model_for_dataloader) + worker(args.local_rank, None, args, model_for_training, model_for_dataloader) else: # CPU mode. worker(None, None, args, model_for_training, model_for_dataloader) @@ -114,7 +114,7 @@ def report_and_reset_stats(self): raise NotImplementedError - def train(self, args, gpu_id, rank, loader, model, optimizer, scheduler): + def train(self, args, local_rank, global_rank, loader, model, optimizer, scheduler): model.train() loader_iter = iter(loader) while True: @@ -122,10 +122,10 @@ def train(self, args, gpu_id, rank, loader, model, optimizer, scheduler): break batch = list(next(loader_iter)) self.seq_length = batch[0].size(1) - if gpu_id is not None: + if local_rank is not None: for i in range(len(batch)): if torch.is_tensor(batch[i]): - batch[i] = batch[i].cuda(gpu_id) + batch[i] = batch[i].cuda(local_rank) loss = self.forward_propagation(batch, model) @@ -143,21 +143,21 @@ def train(self, args, gpu_id, rank, loader, model, optimizer, scheduler): model.zero_grad() if self.current_step % self.report_steps == 0 and \ - (not self.dist_train or (self.dist_train and rank == 0)): + (not self.dist_train or (self.dist_train and global_rank == 0)): self.report_and_reset_stats() self.start_time = time.time() if args.deepspeed: if self.current_step % self.save_checkpoint_steps == 0: if args.use_lora: - if rank == 0: + if global_rank == 0: save_model(model, self.output_model_path + "-" + str(self.current_step), args.use_lora) else: model.save_checkpoint(self.output_model_path, str(self.current_step)) else: if self.current_step % self.save_checkpoint_steps == 0 and \ - (not self.dist_train or (self.dist_train and rank == 0)): + (not self.dist_train or (self.dist_train and global_rank == 0)): save_model(model, self.output_model_path + "-" + str(self.current_step), args.use_lora) self.current_step += 1 @@ -559,11 +559,11 @@ class AlpacaTrainer(LmTrainer): "beit": BeitTrainer, "dalle": DalleTrainer, "alpaca": AlpacaTrainer} -def worker(proc_id, gpu_ranks, args, model_for_training, model_for_dataloader=None): +def worker(local_rank, gpu_ranks, args, model_for_training, model_for_dataloader=None): """ Args: - proc_id: The id of GPU for single GPU mode; - The id of process (and GPU) for multiprocessing distributed mode. + local_rank: The id of GPU for single GPU mode; + The id of process (and GPU) for multiprocessing distributed mode. gpu_ranks: List of ranks of each process. """ set_seed(args.seed) @@ -574,17 +574,13 @@ def worker(proc_id, gpu_ranks, args, model_for_training, model_for_dataloader=No if args.deepspeed: import deepspeed deepspeed.init_distributed(dist_backend=args.backend) - rank = dist.get_rank() - gpu_id = proc_id + global_rank = dist.get_rank() elif args.dist_train: - rank = gpu_ranks[proc_id] - gpu_id = proc_id + global_rank = gpu_ranks[local_rank] elif args.single_gpu: - rank = None - gpu_id = proc_id + global_rank = None else: - rank = None - gpu_id = None + global_rank = None # Build optimizer. param_optimizer = list(model_for_training.named_parameters()) @@ -634,10 +630,10 @@ def worker(proc_id, gpu_ranks, args, model_for_training, model_for_dataloader=No if load_path is None: raise ValueError(f"[deepspeed] failed to resume from checkpoint {args.resume_from_checkpoint}") else: - if gpu_id is not None: - model_for_training.cuda(gpu_id) + if local_rank is not None: + model_for_training.cuda(local_rank) if model_for_dataloader is not None: - model_for_dataloader.cuda(gpu_id) + model_for_dataloader.cuda(local_rank) optimizer = custom_optimizer scheduler = custom_scheduler @@ -646,21 +642,21 @@ def worker(proc_id, gpu_ranks, args, model_for_training, model_for_dataloader=No dist.init_process_group(backend=args.backend, init_method=args.master_ip, world_size=args.world_size, - rank=rank) - model_for_training = DistributedDataParallel(model_for_training, device_ids=[gpu_id], find_unused_parameters=True) + rank=global_rank) + model_for_training = DistributedDataParallel(model_for_training, device_ids=[local_rank], find_unused_parameters=True) if model_for_dataloader is not None: - model_for_dataloader = DistributedDataParallel(model_for_dataloader, device_ids=[gpu_id], find_unused_parameters=False) - args.logger.info("Worker %d is training ... " % rank) + model_for_dataloader = DistributedDataParallel(model_for_dataloader, device_ids=[local_rank], find_unused_parameters=False) + args.logger.info("Worker %d is training ... " % global_rank) else: args.logger.info("Worker is training ...") if args.dist_train: if model_for_dataloader is not None: model_for_dataloader = model_for_dataloader.module - train_loader = str2dataloader[args.data_processor](args, args.dataset_path, args.batch_size, rank, args.world_size, gpu_id, True, model_for_dataloader) + train_loader = str2dataloader[args.data_processor](args, args.dataset_path, args.batch_size, global_rank, args.world_size, local_rank, True, model_for_dataloader) else: - train_loader = str2dataloader[args.data_processor](args, args.dataset_path, args.batch_size, 0, 1, gpu_id, True, model_for_dataloader) + train_loader = str2dataloader[args.data_processor](args, args.dataset_path, args.batch_size, 0, 1, local_rank, True, model_for_dataloader) trainer = str2trainer[args.data_processor](args) - trainer.train(args, gpu_id, rank, train_loader, model_for_training, optimizer, scheduler) + trainer.train(args, local_rank, global_rank, train_loader, model_for_training, optimizer, scheduler) diff --git a/tencentpretrain/utils/dataloader.py b/tencentpretrain/utils/dataloader.py index d57c979..2f30126 100755 --- a/tencentpretrain/utils/dataloader.py +++ b/tencentpretrain/utils/dataloader.py @@ -9,13 +9,13 @@ class Dataloader(object): - def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None): + def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None): self.tokenizer = args.tokenizer self.batch_size = batch_size self.instances_buffer_size = args.instances_buffer_size - self.rank = rank + self.global_rank = global_rank self.world_size = world_size - self.gpu_id = gpu_id + self.local_rank = local_rank self.shuffle = shuffle self.model_for_dataloader = model_for_dataloader self.dataset_reader = open(dataset_path, "rb") @@ -35,7 +35,7 @@ def _fill_buf(self): while True: instance = pickle.load(self.dataset_reader) self.read_count += 1 - if (self.read_count - 1) % self.world_size == self.rank: + if (self.read_count - 1) % self.world_size == self.global_rank: self.buffer.append(instance) if len(self.buffer) >= self.instances_buffer_size: break @@ -541,8 +541,8 @@ def __iter__(self): class VisionDataloader(Dataloader): - def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None): - super(VisionDataloader, self).__init__(args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle, model_for_dataloader) + def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None): + super(VisionDataloader, self).__init__(args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle, model_for_dataloader) self.patch_size = args.patch_size self.image_height = args.image_height self.image_width = args.image_width @@ -593,7 +593,7 @@ def __iter__(self): for ins in instances: image = read_image(ins[1], ImageReadMode.RGB) - image = image.cuda(self.gpu_id) + image = image.cuda(self.local_rank) src.append(self.transform(image)) tgt.append(ins[0]) seg.append([1] * ((self.image_height // self.patch_size) * (self.image_width // self.patch_size) + 1)) @@ -658,7 +658,7 @@ def __iter__(self): seg_image = [2] * ((self.image_height // self.patch_size) * (self.image_width // self.patch_size) + 1) tgt_mlm[-1].extend([0] * len(seg_image)) - image = image.cuda(self.gpu_id) + image = image.cuda(self.local_rank) src_image_single = self.transform(image) src_image.append(src_image_single) seg.append([1] * ins[1][0] + [0] * pad_num + seg_image) @@ -712,7 +712,7 @@ def __iter__(self): src_text.append(src_text_single) seg_text.append([1] * ins[1][0] + [0] * pad_num) image = read_image(ins[2], ImageReadMode.RGB) - image = image.cuda(self.gpu_id) + image = image.cuda(self.local_rank) src_image.append(self.transform(image)) seg_image.append([1] * ((self.image_height // self.patch_size) * (self.image_width // self.patch_size) + 1)) @@ -723,8 +723,8 @@ def __iter__(self): class AudioDataloader(Dataloader): - def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None): - super(AudioDataloader, self).__init__(args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle, model_for_dataloader) + def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None): + super(AudioDataloader, self).__init__(args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle, model_for_dataloader) self.dataset_folder = os.path.dirname(dataset_path) self.sampling_rate = args.sampling_rate self.normalize_means, self.normalize_vars, self.ceptral_normalize = True, True, True @@ -743,7 +743,7 @@ def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shu if "sepcaugment" in args: self.specaugment = SpecAugment(args) -def utterance_cmvn(x, normalize_means=True, normalize_vars=True, gpu_id=None): +def utterance_cmvn(x, normalize_means=True, normalize_vars=True, local_rank=None): mean = x.mean(axis=0) square_sums = (x ** 2).sum(axis=0) @@ -751,8 +751,8 @@ def utterance_cmvn(x, normalize_means=True, normalize_vars=True, gpu_id=None): x = torch.sub(x, mean) if normalize_vars: var = square_sums / x.size(0) - mean ** 2 - if gpu_id is not None: - std = torch.sqrt(torch.maximum(var, torch.full(var.size(), 1e-10).cuda(gpu_id))) + if local_rank is not None: + std = torch.sqrt(torch.maximum(var, torch.full(var.size(), 1e-10).cuda(local_rank))) else: std = torch.sqrt(torch.maximum(var, torch.full(var.size(), 1e-10))) x = torch.div(x, std) @@ -766,7 +766,7 @@ def __iter__(self): import torchaudio import torchaudio.compliance.kaldi as ta_kaldi - padding_vector = torch.FloatTensor(self.audio_feature_size * [self.padding_value] if self.audio_feature_size > 1 else self.padding_value).unsqueeze(0).cuda(self.gpu_id) + padding_vector = torch.FloatTensor(self.audio_feature_size * [self.padding_value] if self.audio_feature_size > 1 else self.padding_value).unsqueeze(0).cuda(self.local_rank) while True: while self._empty(): self._fill_buf() @@ -790,11 +790,11 @@ def __iter__(self): waveform, _ = torchaudio.load(ins[2]) # waveform, sample_rate waveform = waveform * (2 ** 15) # Kaldi compliance: 16-bit signed integers - waveform = waveform.cuda(self.gpu_id) + waveform = waveform.cuda(self.local_rank) feature = ta_kaldi.fbank(waveform, num_mel_bins=self.audio_feature_size, sample_frequency=self.sampling_rate) if self.ceptral_normalize: - feature = utterance_cmvn(feature, self.normalize_means, self.normalize_vars, self.gpu_id) + feature = utterance_cmvn(feature, self.normalize_means, self.normalize_vars, self.local_rank) difference = self.max_audio_frames - feature.size(0) if difference < 0: continue @@ -824,8 +824,8 @@ def __iter__(self): class BeitDataloader(VisionDataloader): - def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None): - super(BeitDataloader, self).__init__(args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle, model_for_dataloader) + def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None): + super(BeitDataloader, self).__init__(args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle, model_for_dataloader) from tencentpretrain.utils.image_tokenizer import build_vqgan_model self.vqgan = self.model_for_dataloader @@ -871,7 +871,7 @@ def __iter__(self): for ins in instances: image = read_image(ins, ImageReadMode.RGB) - image = image.cuda(self.gpu_id) + image = image.cuda(self.local_rank) image = self.transform(image) src.append(image) image_tokens = [0] + image_tokenize(self.vqgan, image) @@ -888,8 +888,8 @@ def __iter__(self): class DalleDataloader(VisionDataloader): - def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None): - super(DalleDataloader, self).__init__(args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle, model_for_dataloader) + def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None): + super(DalleDataloader, self).__init__(args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle, model_for_dataloader) from tencentpretrain.utils.image_tokenizer import build_vqgan_model self.vqgan = self.model_for_dataloader self.vocab_bias = args.tokenizer.vocab_bias @@ -917,7 +917,7 @@ def __iter__(self): src_single, pad_num = ins[0] image = read_image(ins[2], ImageReadMode.RGB) - image = image.cuda(self.gpu_id) + image = image.cuda(self.local_rank) image = self.transform(image) image_tokens = [i + self.vocab_bias for i in image_tokenize(self.vqgan, image)] src_single.extend(image_tokens)