Rename variables (#100)
* Update

* Update

* Update
hhou435 authored Oct 11, 2023
1 parent 0ccdcf5 commit 554c078
Showing 3 changed files with 50 additions and 54 deletions.
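For orientation, the commit applies one consistent renaming across all three files; a minimal sketch of the mapping, inferred from the diffs below:

# Name mapping applied throughout (inferred from the diffs below).
renames = {
    "args.gpu_id": "args.local_rank",
    "proc_id":     "local_rank",
    "gpu_id":      "local_rank",
    "rank":        "global_rank",
}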
6 changes: 3 additions & 3 deletions pretrain.py
@@ -115,11 +115,11 @@ def main():
elif args.world_size == 1 and ranks_num == 1:
# Single GPU mode.
assert torch.cuda.is_available(), "No available GPUs."
-args.gpu_id = args.gpu_ranks[0]
-assert args.gpu_id < torch.cuda.device_count(), "Invalid specified GPU device."
+args.local_rank = args.gpu_ranks[0]
+assert args.local_rank < torch.cuda.device_count(), "Invalid specified GPU device."
args.dist_train = False
args.single_gpu = True
print("Using GPU %d for training." % args.gpu_id)
print("Using GPU %d for training." % args.local_rank)
else:
# CPU mode.
assert ranks_num == 0, "GPUs are specified, please check the arguments."
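As a quick illustration, a minimal, self-contained sketch (hypothetical argument values, not the project's code) of what the renamed single-GPU branch above does:

import argparse
import torch

# Hypothetical invocation: --world_size 1 --gpu_ranks 0
args = argparse.Namespace(world_size=1, gpu_ranks=[0])
args.local_rank = args.gpu_ranks[0]          # formerly args.gpu_id
assert args.local_rank < torch.cuda.device_count(), "Invalid specified GPU device."
print("Using GPU %d for training." % args.local_rank)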
52 changes: 24 additions & 28 deletions tencentpretrain/trainer.py
@@ -82,7 +82,7 @@ def train_and_validate(args):
mp.spawn(worker, nprocs=args.ranks_num, args=(args.gpu_ranks, args, model_for_training, model_for_dataloader), daemon=False)
elif args.single_gpu:
# Single GPU mode.
-worker(args.gpu_id, None, args, model_for_training, model_for_dataloader)
+worker(args.local_rank, None, args, model_for_training, model_for_dataloader)
else:
# CPU mode.
worker(None, None, args, model_for_training, model_for_dataloader)
@@ -114,18 +114,18 @@ def report_and_reset_stats(self):

raise NotImplementedError

-def train(self, args, gpu_id, rank, loader, model, optimizer, scheduler):
+def train(self, args, local_rank, global_rank, loader, model, optimizer, scheduler):
model.train()
loader_iter = iter(loader)
while True:
if self.current_step == self.total_steps + 1:
break
batch = list(next(loader_iter))
self.seq_length = batch[0].size(1)
-if gpu_id is not None:
+if local_rank is not None:
for i in range(len(batch)):
if torch.is_tensor(batch[i]):
-batch[i] = batch[i].cuda(gpu_id)
+batch[i] = batch[i].cuda(local_rank)

loss = self.forward_propagation(batch, model)

@@ -143,21 +143,21 @@ def train(self, args, gpu_id, rank, loader, model, optimizer, scheduler):
model.zero_grad()

if self.current_step % self.report_steps == 0 and \
-(not self.dist_train or (self.dist_train and rank == 0)):
+(not self.dist_train or (self.dist_train and global_rank == 0)):
self.report_and_reset_stats()
self.start_time = time.time()

if args.deepspeed:
if self.current_step % self.save_checkpoint_steps == 0:
if args.use_lora:
-if rank == 0:
+if global_rank == 0:
save_model(model, self.output_model_path + "-" + str(self.current_step), args.use_lora)
else:
model.save_checkpoint(self.output_model_path, str(self.current_step))

else:
if self.current_step % self.save_checkpoint_steps == 0 and \
-(not self.dist_train or (self.dist_train and rank == 0)):
+(not self.dist_train or (self.dist_train and global_rank == 0)):
save_model(model, self.output_model_path + "-" + str(self.current_step), args.use_lora)

self.current_step += 1
@@ -559,11 +559,11 @@ class AlpacaTrainer(LmTrainer):
"beit": BeitTrainer, "dalle": DalleTrainer, "alpaca": AlpacaTrainer}


-def worker(proc_id, gpu_ranks, args, model_for_training, model_for_dataloader=None):
+def worker(local_rank, gpu_ranks, args, model_for_training, model_for_dataloader=None):
"""
Args:
-proc_id: The id of GPU for single GPU mode;
-The id of process (and GPU) for multiprocessing distributed mode.
+local_rank: The id of GPU for single GPU mode;
+The id of process (and GPU) for multiprocessing distributed mode.
gpu_ranks: List of ranks of each process.
"""
set_seed(args.seed)
@@ -574,17 +574,13 @@ def worker(proc_id, gpu_ranks, args, model_for_training, model_for_dataloader=No
if args.deepspeed:
import deepspeed
deepspeed.init_distributed(dist_backend=args.backend)
-rank = dist.get_rank()
-gpu_id = proc_id
+global_rank = dist.get_rank()
elif args.dist_train:
-rank = gpu_ranks[proc_id]
-gpu_id = proc_id
+global_rank = gpu_ranks[local_rank]
elif args.single_gpu:
-rank = None
-gpu_id = proc_id
+global_rank = None
else:
-rank = None
-gpu_id = None
+global_rank = None

# Build optimizer.
param_optimizer = list(model_for_training.named_parameters())
@@ -634,10 +630,10 @@ def worker(proc_id, gpu_ranks, args, model_for_training, model_for_dataloader=No
if load_path is None:
raise ValueError(f"[deepspeed] failed to resume from checkpoint {args.resume_from_checkpoint}")
else:
-if gpu_id is not None:
-model_for_training.cuda(gpu_id)
+if local_rank is not None:
+model_for_training.cuda(local_rank)
if model_for_dataloader is not None:
-model_for_dataloader.cuda(gpu_id)
+model_for_dataloader.cuda(local_rank)
optimizer = custom_optimizer
scheduler = custom_scheduler

@@ -646,21 +642,21 @@ def worker(proc_id, gpu_ranks, args, model_for_training, model_for_dataloader=No
dist.init_process_group(backend=args.backend,
init_method=args.master_ip,
world_size=args.world_size,
-rank=rank)
-model_for_training = DistributedDataParallel(model_for_training, device_ids=[gpu_id], find_unused_parameters=True)
+rank=global_rank)
+model_for_training = DistributedDataParallel(model_for_training, device_ids=[local_rank], find_unused_parameters=True)
if model_for_dataloader is not None:
-model_for_dataloader = DistributedDataParallel(model_for_dataloader, device_ids=[gpu_id], find_unused_parameters=False)
-args.logger.info("Worker %d is training ... " % rank)
+model_for_dataloader = DistributedDataParallel(model_for_dataloader, device_ids=[local_rank], find_unused_parameters=False)
+args.logger.info("Worker %d is training ... " % global_rank)
else:
args.logger.info("Worker is training ...")

if args.dist_train:
if model_for_dataloader is not None:
model_for_dataloader = model_for_dataloader.module
-train_loader = str2dataloader[args.data_processor](args, args.dataset_path, args.batch_size, rank, args.world_size, gpu_id, True, model_for_dataloader)
+train_loader = str2dataloader[args.data_processor](args, args.dataset_path, args.batch_size, global_rank, args.world_size, local_rank, True, model_for_dataloader)
else:
-train_loader = str2dataloader[args.data_processor](args, args.dataset_path, args.batch_size, 0, 1, gpu_id, True, model_for_dataloader)
+train_loader = str2dataloader[args.data_processor](args, args.dataset_path, args.batch_size, 0, 1, local_rank, True, model_for_dataloader)


trainer = str2trainer[args.data_processor](args)
-trainer.train(args, gpu_id, rank, train_loader, model_for_training, optimizer, scheduler)
+trainer.train(args, local_rank, global_rank, train_loader, model_for_training, optimizer, scheduler)
46 changes: 23 additions & 23 deletions tencentpretrain/utils/dataloader.py
@@ -9,13 +9,13 @@


class Dataloader(object):
-def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None):
+def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None):
self.tokenizer = args.tokenizer
self.batch_size = batch_size
self.instances_buffer_size = args.instances_buffer_size
-self.rank = rank
+self.global_rank = global_rank
self.world_size = world_size
-self.gpu_id = gpu_id
+self.local_rank = local_rank
self.shuffle = shuffle
self.model_for_dataloader = model_for_dataloader
self.dataset_reader = open(dataset_path, "rb")
@@ -35,7 +35,7 @@ def _fill_buf(self):
while True:
instance = pickle.load(self.dataset_reader)
self.read_count += 1
-if (self.read_count - 1) % self.world_size == self.rank:
+if (self.read_count - 1) % self.world_size == self.global_rank:
self.buffer.append(instance)
if len(self.buffer) >= self.instances_buffer_size:
break
@@ -541,8 +541,8 @@ def __iter__(self):


class VisionDataloader(Dataloader):
-def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None):
-super(VisionDataloader, self).__init__(args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle, model_for_dataloader)
+def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None):
+super(VisionDataloader, self).__init__(args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle, model_for_dataloader)
self.patch_size = args.patch_size
self.image_height = args.image_height
self.image_width = args.image_width
@@ -593,7 +593,7 @@ def __iter__(self):
for ins in instances:

image = read_image(ins[1], ImageReadMode.RGB)
-image = image.cuda(self.gpu_id)
+image = image.cuda(self.local_rank)
src.append(self.transform(image))
tgt.append(ins[0])
seg.append([1] * ((self.image_height // self.patch_size) * (self.image_width // self.patch_size) + 1))
@@ -658,7 +658,7 @@ def __iter__(self):

seg_image = [2] * ((self.image_height // self.patch_size) * (self.image_width // self.patch_size) + 1)
tgt_mlm[-1].extend([0] * len(seg_image))
-image = image.cuda(self.gpu_id)
+image = image.cuda(self.local_rank)
src_image_single = self.transform(image)
src_image.append(src_image_single)
seg.append([1] * ins[1][0] + [0] * pad_num + seg_image)
@@ -712,7 +712,7 @@ def __iter__(self):
src_text.append(src_text_single)
seg_text.append([1] * ins[1][0] + [0] * pad_num)
image = read_image(ins[2], ImageReadMode.RGB)
-image = image.cuda(self.gpu_id)
+image = image.cuda(self.local_rank)
src_image.append(self.transform(image))
seg_image.append([1] * ((self.image_height // self.patch_size) * (self.image_width // self.patch_size) + 1))

@@ -723,8 +723,8 @@ def __iter__(self):


class AudioDataloader(Dataloader):
-def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None):
-super(AudioDataloader, self).__init__(args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle, model_for_dataloader)
+def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None):
+super(AudioDataloader, self).__init__(args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle, model_for_dataloader)
self.dataset_folder = os.path.dirname(dataset_path)
self.sampling_rate = args.sampling_rate
self.normalize_means, self.normalize_vars, self.ceptral_normalize = True, True, True
@@ -743,16 +743,16 @@ def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shu
if "sepcaugment" in args:
self.specaugment = SpecAugment(args)

-def utterance_cmvn(x, normalize_means=True, normalize_vars=True, gpu_id=None):
+def utterance_cmvn(x, normalize_means=True, normalize_vars=True, local_rank=None):
mean = x.mean(axis=0)
square_sums = (x ** 2).sum(axis=0)

if normalize_means:
x = torch.sub(x, mean)
if normalize_vars:
var = square_sums / x.size(0) - mean ** 2
-if gpu_id is not None:
-std = torch.sqrt(torch.maximum(var, torch.full(var.size(), 1e-10).cuda(gpu_id)))
+if local_rank is not None:
+std = torch.sqrt(torch.maximum(var, torch.full(var.size(), 1e-10).cuda(local_rank)))
else:
std = torch.sqrt(torch.maximum(var, torch.full(var.size(), 1e-10)))
x = torch.div(x, std)
@@ -766,7 +766,7 @@ def __iter__(self):
import torchaudio
import torchaudio.compliance.kaldi as ta_kaldi

-padding_vector = torch.FloatTensor(self.audio_feature_size * [self.padding_value] if self.audio_feature_size > 1 else self.padding_value).unsqueeze(0).cuda(self.gpu_id)
+padding_vector = torch.FloatTensor(self.audio_feature_size * [self.padding_value] if self.audio_feature_size > 1 else self.padding_value).unsqueeze(0).cuda(self.local_rank)
while True:
while self._empty():
self._fill_buf()
@@ -790,11 +790,11 @@ def __iter__(self):

waveform, _ = torchaudio.load(ins[2]) # waveform, sample_rate
waveform = waveform * (2 ** 15) # Kaldi compliance: 16-bit signed integers
-waveform = waveform.cuda(self.gpu_id)
+waveform = waveform.cuda(self.local_rank)
feature = ta_kaldi.fbank(waveform, num_mel_bins=self.audio_feature_size,
sample_frequency=self.sampling_rate)
if self.ceptral_normalize:
-feature = utterance_cmvn(feature, self.normalize_means, self.normalize_vars, self.gpu_id)
+feature = utterance_cmvn(feature, self.normalize_means, self.normalize_vars, self.local_rank)
difference = self.max_audio_frames - feature.size(0)
if difference < 0:
continue
@@ -824,8 +824,8 @@ def __iter__(self):

class BeitDataloader(VisionDataloader):

-def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None):
-super(BeitDataloader, self).__init__(args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle, model_for_dataloader)
+def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None):
+super(BeitDataloader, self).__init__(args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle, model_for_dataloader)
from tencentpretrain.utils.image_tokenizer import build_vqgan_model
self.vqgan = self.model_for_dataloader

@@ -871,7 +871,7 @@ def __iter__(self):
for ins in instances:

image = read_image(ins, ImageReadMode.RGB)
-image = image.cuda(self.gpu_id)
+image = image.cuda(self.local_rank)
image = self.transform(image)
src.append(image)
image_tokens = [0] + image_tokenize(self.vqgan, image)
@@ -888,8 +888,8 @@ def __iter__(self):

class DalleDataloader(VisionDataloader):

-def __init__(self, args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle=False, model_for_dataloader=None):
-super(DalleDataloader, self).__init__(args, dataset_path, batch_size, rank, world_size, gpu_id, shuffle, model_for_dataloader)
+def __init__(self, args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle=False, model_for_dataloader=None):
+super(DalleDataloader, self).__init__(args, dataset_path, batch_size, global_rank, world_size, local_rank, shuffle, model_for_dataloader)
from tencentpretrain.utils.image_tokenizer import build_vqgan_model
self.vqgan = self.model_for_dataloader
self.vocab_bias = args.tokenizer.vocab_bias
@@ -917,7 +917,7 @@ def __iter__(self):
src_single, pad_num = ins[0]

image = read_image(ins[2], ImageReadMode.RGB)
-image = image.cuda(self.gpu_id)
+image = image.cuda(self.local_rank)
image = self.transform(image)
image_tokens = [i + self.vocab_bias for i in image_tokenize(self.vqgan, image)]
src_single.extend(image_tokens)
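As a closing aside, a minimal toy sketch (not the project's code) of the sharding rule the renamed _fill_buf check enforces: each process keeps only the instances whose index maps to its global_rank.

# Toy illustration of `(read_count - 1) % world_size == global_rank`.
world_size, global_rank = 4, 1
instances = list(range(10))                          # stand-in for pickled instances
buffer = [ins for i, ins in enumerate(instances) if i % world_size == global_rank]
print(buffer)                                        # [1, 5, 9]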
