diff --git a/megatron/arguments.py b/megatron/arguments.py
index de16fd5b3..46bfd5bb4 100644
--- a/megatron/arguments.py
+++ b/megatron/arguments.py
@@ -546,6 +546,15 @@ def _add_checkpointing_args(parser):
                        'or rng state from checkpoint and set iteration to 0. '
                        'Assumed when loading a release checkpoint.')
 
+    group.add_argument('--scr', action='store_true', default=None,
+                       help='Enable SCR (scalable checkpoint/restart) for checkpointing.')
+    group.add_argument('--scr-interval', type=int, default=None,
+                       help='Number of iterations between defensive checkpoints.')
+    group.add_argument('--scr-seconds', type=float, default=None,
+                       help='Number of seconds between defensive checkpoints.')
+    group.add_argument('--scr-overhead', type=float, default=None,
+                       help='Maximum runtime percentage for defensive checkpoints.')
+
     return parser
diff --git a/megatron/checkpointing.py b/megatron/checkpointing.py
index f7328dcbb..d32a8df7e 100644
--- a/megatron/checkpointing.py
+++ b/megatron/checkpointing.py
@@ -119,6 +119,10 @@ def save_checkpoint(iteration, model, optimizer, lr_scheduler):
     print_rank_0('saving checkpoint at iteration {:7d} to {}'.format(
         iteration, args.save))
 
+    # SCR: the call to start output lives in DeepSpeed right now
+    #if args.scr:
+    #    scr.start_output(checkpoint_name, scr.FLAG_CHECKPOINT)
+
     if not torch.distributed.is_initialized() or mpu.get_data_parallel_rank() == 0 \
             or args.deepspeed:
@@ -184,6 +188,10 @@ def save_checkpoint(iteration, model, optimizer, lr_scheduler):
     if torch.distributed.is_initialized():
         torch.distributed.barrier()
 
+    # SCR: the call to complete output lives in DeepSpeed right now
+    #if args.scr:
+    #    scr.complete_output(valid)
+
     # since the code can be exited or aborted in various places we use the checkpoint saving as
     # a save saving point for the codecarbon tracker. If the program doesn't run to its normal
     # end, then only the data since the last saved checkpoint will be lost.
diff --git a/megatron/initialize.py b/megatron/initialize.py
index a00502772..373f590e6 100644
--- a/megatron/initialize.py
+++ b/megatron/initialize.py
@@ -33,6 +33,9 @@
 import deepspeed
 
+from mpi4py import MPI
+import scr
+
 
 def initialize_megatron(extra_args_provider=None, args_defaults={},
                         ignore_unknown_args=False, allow_no_cuda=False):
@@ -66,6 +69,31 @@ def finish_mpu_init():
         _set_random_seed(args.seed)
 
     args = get_args()
+
+    # SCR: point SCR_PREFIX to the checkpoint path
+    if args.scr:
+        # SCR only supports a single directory for both reading previous checkpoints and writing new ones
+        if args.save != args.load:
+            raise ValueError(f"--save {args.save} must match --load {args.load} when using SCR")
+
+        # SCR defaults to the current working dir if args.save is not specified
+        if args.save is not None:
+            scr.config(f"SCR_PREFIX={args.save}")
+
+        # DeepSpeed expects files to be on the global file system.
+        # This flushes any cached checkpoint to the file system on restart.
+        scr.config("SCR_GLOBAL_RESTART=1")
+
+        # Configure seconds between checkpoints if the user provided a limit.
+        if args.scr_seconds is not None:
+            scr.config(f"SCR_CHECKPOINT_SECONDS={args.scr_seconds}")
+
+        # Configure max percentage of runtime for checkpointing if the user provided a limit.
+        if args.scr_overhead is not None:
+            scr.config(f"SCR_CHECKPOINT_OVERHEAD={args.scr_overhead}")
+
+        scr.init()
+
     if args.lazy_mpu_init:
         args.use_cpu_initialization=True
         # delayed initialization of DDP-related stuff
@@ -208,7 +236,8 @@ def _initialize_distributed():
             args.local_rank = device
         torch.cuda.set_device(device)
     # Call the init process
-    init_method = 'tcp://'
+    #init_method = 'tcp://'
+    init_method = 'env://'
     master_ip = os.getenv('MASTER_ADDR', 'localhost')
     master_port = os.getenv('MASTER_PORT', '6000')
     init_method += master_ip + ':' + master_port
diff --git a/megatron/training.py b/megatron/training.py
index f66544dff..669d85a00 100644
--- a/megatron/training.py
+++ b/megatron/training.py
@@ -55,6 +55,9 @@
 import deepspeed
 
+# SCR: import scalable checkpoint/restart library
+import scr
+
 
 def print_datetime(string):
     """Note that this call will sync across all ranks."""
@@ -167,6 +170,10 @@ def pretrain(train_valid_test_dataset_provider,
     codecarbon_tracker_stop()
 
+    # SCR: flush any cached checkpoint
+    if args.scr:
+        scr.finalize()
+
 
 def update_train_iters(args):
@@ -730,6 +737,31 @@ def train(forward_step_func, model, optimizer, lr_scheduler,
                                          lr_scheduler)
             saved_checkpoint = True
 
+        # SCR: Take a defensive checkpoint if it's time
+        if args.save and args.scr and args.scr_interval and \
+           iteration % args.scr_interval == 0:
+            if not saved_checkpoint:
+                save_checkpoint_and_time(iteration, model, optimizer,
+                                         lr_scheduler)
+                saved_checkpoint = True
+
+        # SCR: Take a defensive checkpoint if SCR recommends it
+        if args.save and args.scr and scr.need_checkpoint():
+            if not saved_checkpoint:
+                save_checkpoint_and_time(iteration, model, optimizer,
+                                         lr_scheduler)
+                saved_checkpoint = True
+
+        # SCR: Save a checkpoint and exit the run if SCR recommends it
+        #if args.save and args.scr and scr.should_exit():
+        #    if not saved_checkpoint:
+        #        save_checkpoint_and_time(iteration, model, optimizer,
+        #                                 lr_scheduler)
+        #    torch.distributed.barrier()
+        #    print_datetime('exiting program at iteration {}'.format(iteration))
+        #    scr.finalize()
+        #    sys.exit()
+
         # Exiting based on duration
         if args.exit_duration_in_mins:
             train_time = (time.time() - _TRAIN_START_TIME) / 60.0
@@ -743,6 +775,11 @@
                     save_checkpoint_and_time(iteration, model, optimizer,
                                              lr_scheduler)
                 print_datetime('exiting program after {} minutes'.format(train_time))
+
+                # SCR: finalize to flush any cached checkpoint
+                if args.scr:
+                    scr.finalize()
+
                 sys.exit()
 
         # Exiting based on iterations
@@ -752,6 +789,11 @@
                                          lr_scheduler)
             torch.distributed.barrier()
             print_datetime('exiting program at iteration {}'.format(iteration))
+
+            # SCR: finalize to flush any cached checkpoint
+            if args.scr:
+                scr.finalize()
+
             sys.exit()
diff --git a/pretrain_gpt_scr.py b/pretrain_gpt_scr.py
new file mode 100644
index 000000000..d2f23a4a2
--- /dev/null
+++ b/pretrain_gpt_scr.py
@@ -0,0 +1,233 @@
+# coding=utf-8
+# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Pretrain GPT"""
+
+import torch
+from functools import partial
+from megatron import get_args
+from megatron import print_rank_0
+from megatron import get_timers
+from megatron import get_tokenizer
+from megatron import mpu
+from megatron.data.gpt_dataset import build_train_valid_test_datasets
+from megatron.model import GPTModel, GPTModelPipe
+from megatron.training import pretrain
+from megatron.utils import get_ltor_masks_and_position_ids
+from megatron.utils import average_losses_across_data_parallel_group
+
+import deepspeed
+from deepspeed.runtime.utils import see_memory_usage
+import os
+import subprocess
+
+
+def model_provider(pre_process=True, post_process=True):
+    """Build the model."""
+
+    print_rank_0('building GPT model ...')
+    see_memory_usage(f"Before Building Model", force=True)
+
+    args = get_args()
+    with deepspeed.zero.Init(data_parallel_group=mpu.get_data_parallel_group(),
+                             remote_device=None if args.remote_device == 'none' else args.remote_device,
+                             config_dict_or_path=args.deepspeed_config,
+                             enabled=args.zero_stage == 3,
+                             mpu=mpu):
+        if args.deepspeed:
+            model = GPTModelPipe(
+                num_tokentypes=0,
+                parallel_output=True
+            )
+            # This is a hack to give us a reference to get_batch_pipe from within training.py
+            # We need to call model.set_batch_fn after deepspeed.initialize
+            model._megatron_batch_fn = get_batch_pipe
+
+            # Precompute the attention mask and store it in args. This avoids having to
+            # pipeline it as an activation during training. The mask is constant, and thus
+            # we can reuse it.
+            attention_mask = torch.tril(torch.ones(
+                (1, args.seq_length, args.seq_length), device=torch.cuda.current_device())).view(
+                    1, 1, args.seq_length, args.seq_length)
+
+            # Convert attention mask to binary:
+            attention_mask = (attention_mask < 0.5)
+            if args.fp16:
+                attention_mask = attention_mask.half()
+            elif args.bf16:
+                attention_mask = attention_mask.bfloat16()
+
+            # must be bool or the training crashes expecting bool, but getting Half
+            args.attn_mask = attention_mask.to(torch.bool)
+
+        else:
+            model = GPTModel(
+                num_tokentypes=0,
+                parallel_output=True,
+                pre_process=pre_process,
+                post_process=post_process
+            )
+    see_memory_usage(f"After Building Model", force=True)
+    return model
+
+
+def get_batch(data_iterator):
+    """Generate a batch"""
+    args = get_args()
+    tokenizer = get_tokenizer()
+
+    # Items and their type.
+    keys = ['text']
+    datatype = torch.int64
+
+    # Broadcast data.
+    if data_iterator is not None:
+        data = next(data_iterator)
+    else:
+        data = None
+    data_b = mpu.broadcast_data(keys, data, datatype)
+
+    # Unpack.
+    tokens_ = data_b['text'].long()
+    labels = tokens_[:, 1:].contiguous()
+    tokens = tokens_[:, :-1].contiguous()
+
+    # Get the masks and position ids.
+    attention_mask, loss_mask, position_ids = get_ltor_masks_and_position_ids(
+        tokens,
+        tokenizer.eod,
+        args.reset_position_ids,
+        args.reset_attention_mask,
+        args.eod_mask_loss)
+
+    return tokens, labels, loss_mask, attention_mask, position_ids
+
+
+def get_batch_pipe(data):
+    """Modification of `get_batch` to work on `next(data_iterator)` instead of `data_iterator`"""
+    args = get_args()
+    tokenizer = get_tokenizer()
+
+    # Items and their type.
+    keys = ['text']
+    datatype = torch.int64
+
+    # Broadcast data.
+    data_b = mpu.broadcast_data(keys, data, datatype)
+
+    # Unpack.
+    tokens_ = data_b['text'].long()
+    labels = tokens_[:, 1:].contiguous()
+    tokens = tokens_[:, :-1].contiguous()
+
+    # Get the masks and position ids.
+    attention_mask, loss_mask, position_ids = get_ltor_masks_and_position_ids(
+        tokens,
+        tokenizer.eod,
+        args.reset_position_ids,
+        args.reset_attention_mask,
+        args.eod_mask_loss)
+
+    return (tokens, position_ids, attention_mask), (labels, loss_mask)
+
+
+def loss_func(loss_mask, output_tensor):
+    losses = output_tensor.float()
+    loss_mask = loss_mask.view(-1).float()
+    loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
+
+    # Reduce loss for logging.
+    averaged_loss = average_losses_across_data_parallel_group([loss])
+
+    return loss, {'lm loss': averaged_loss[0]}
+
+
+def forward_step(data_iterator, model):
+    """Forward step."""
+    args = get_args()
+    timers = get_timers()
+
+    # Get the batch.
+    timers('batch-generator').start()
+    tokens, labels, loss_mask, attention_mask, position_ids = get_batch(
+        data_iterator)
+    timers('batch-generator').stop()
+
+    output_tensor = model(tokens, position_ids, attention_mask,
+                          labels=labels)
+
+    return output_tensor, partial(loss_func, loss_mask)
+
+
+def train_valid_test_datasets_provider(train_val_test_num_samples):
+    """Build train, valid, and test datasets."""
+    args = get_args()
+
+    print_rank_0('> building train, validation, and test datasets '
+                 'for GPT ...')
+    train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
+        data_prefix=args.data_path,
+        data_impl=args.data_impl,
+        splits_string=args.split,
+        train_valid_test_num_samples=train_val_test_num_samples,
+        seq_length=args.seq_length,
+        seed=args.seed,
+        skip_warmup=(not args.mmap_warmup))
+    print_rank_0("> finished creating GPT datasets ...")
+
+    return train_ds, valid_ds, test_ds
+
+
+def command_exists(cmd):
+    result = subprocess.Popen(f'type {cmd}', stdout=subprocess.PIPE, shell=True)
+    return result.wait() == 0
+
+
+def git_ds_info():
+    from deepspeed.env_report import main as ds_report
+    ds_report()
+
+    # Write out version/git info
+    git_hash_cmd = "git rev-parse --short HEAD"
+    git_branch_cmd = "git rev-parse --abbrev-ref HEAD"
+    if command_exists('git'):
+        try:
+            result = subprocess.check_output(git_hash_cmd, shell=True)
+            git_hash = result.decode('utf-8').strip()
+            result = subprocess.check_output(git_branch_cmd, shell=True)
+            git_branch = result.decode('utf-8').strip()
+        except subprocess.CalledProcessError:
+            git_hash = "unknown"
+            git_branch = "unknown"
+    else:
+        git_hash = "unknown"
+        git_branch = "unknown"
+    print(f'**** Git info for Megatron: git_hash={git_hash} git_branch={git_branch} ****')
+
+
+if __name__ == "__main__":
+    if 'OMPI_COMM_WORLD_RANK' in os.environ:
+        os.environ["RANK"] = os.environ['OMPI_COMM_WORLD_RANK']
+    if 'OMPI_COMM_WORLD_SIZE' in os.environ:
+        os.environ["WORLD_SIZE"] = os.environ['OMPI_COMM_WORLD_SIZE']
+    if 'OMPI_COMM_WORLD_LOCAL_RANK' in os.environ:
+        os.environ["LOCAL_RANK"] = os.environ['OMPI_COMM_WORLD_LOCAL_RANK']
+
+    rank = int(os.environ["RANK"])
+    if rank == 0:
+        git_ds_info()
+
+    pretrain(train_valid_test_datasets_provider, model_provider, forward_step,
+             args_defaults={'tokenizer_type': 'GPT2BPETokenizer'})
diff --git a/tr1-3layer-scr.jsrun b/tr1-3layer-scr.jsrun
new file mode 100755
index 000000000..f6e3c8caf
--- /dev/null
+++ b/tr1-3layer-scr.jsrun
@@ -0,0 +1,202 @@
+#!/bin/bash
+
+source /usr/WS2/moody20/projects/Megatron-DeepSpeed.git/examples/llnlenv.sh
+
+# Install SCR into python environment
+#pushd /g/g0/moody20/packages/scr/dist/scr-lassen/scr-top-develop/install/share/scr/python
+#python setup.py install
+#popd
+#exit 0
+
+export SCR_DEBUG=1
+export SCR_CACHE_BYPASS=0
+export SCR_COPY_TYPE=SINGLE
+
+#conda install psutil
+#exit 0
+
+set -x -e
+
+#source $six_ALL_CCFRWORK/code/tr1-13B/bigscience/train/tr1-13B-base/start-tr1-13B
+
+echo "START TIME: $(date)"
+
+DATA_OUTPUT_PATH=${CKPTDIR}/tr1-3layer
+CHECKPOINT_PATH=$DATA_OUTPUT_PATH/checkpoints
+TENSORBOARD_PATH=$DATA_OUTPUT_PATH/tensorboard
+CODECARBON_PATH=$DATA_OUTPUT_PATH/codecarbon
+LOGS_PATH=$DATA_OUTPUT_PATH/logs
+
+MEGATRON_DEEPSPEED_REPO=$NFSDIR
+
+VOCAB_FILE=${DATADIR}/openwebtext/gpt2-vocab.json
+MERGE_FILE=${DATADIR}/openwebtext/gpt2-merges.txt
+DATA_PATH=${DATADIR}/openwebtext/oscar-shuf-eod-gpt2bpe_text_document
+
+cd $MEGATRON_DEEPSPEED_REPO
+
+GPUS_PER_NODE=4
+NNODES=64    # switch to 64
+TP_SIZE=4    # always fixed to the size of a single node
+PP_SIZE=4    # NLAYERS must be a multiple of PP_SIZE here
+#DP_SIZE=$NNODES*$GPUS_PER_NODE/($PP_SIZE*$TP_SIZE) # will get derived automatically by trainer
+
+# GLOBAL_BATCH_SIZE has to be divisible by MICRO_BATCH_SIZE*DP_size
+# GLOBAL_BATCH_SIZE=$(($MICRO_BATCH_SIZE*$GAS*$DP_SIZE)) - GAS is auto-derived by deepspeed
+MICRO_BATCH_SIZE=1
+GLOBAL_BATCH_SIZE=1536
+
+NLAYERS=4
+NHIDDEN=10752
+#NHIDDEN=5120
+NHEADS=84
+#NHEADS=40
+FFN_HIDDEN_SIZE=$(($NHIDDEN * 4))
+SEQ_LEN=2048
+VOCAB_SIZE=50257
+
+SAVE_INTERVAL=20
+
+OPTIMIZER_ARGS=" \
+    --optimizer adam \
+    --adam-beta1 0.9 \
+    --adam-beta2 0.95 \
+    --adam-eps 1e-8 \
+    --lr 6.0e-5 \
+    --min-lr 6.0e-6 \
+    --lr-decay-style cosine \
+    --lr-decay-samples 126_953_125 \
+    --lr-warmup-samples 183_105 \
+    --clip-grad 1.0 \
+    --weight-decay 1e-1 \
+    --init-method-std 0.006 \
+    "
+
+EXIT_OPTS=" \
+    --exit-duration-in-mins 10 \
+    "
+
+GPT_ARGS=" \
+    --num-layers $NLAYERS \
+    --hidden-size $NHIDDEN \
+    --ffn-hidden-size $FFN_HIDDEN_SIZE \
+    --num-attention-heads $NHEADS \
+    --seq-length $SEQ_LEN \
+    --max-position-embeddings $SEQ_LEN \
+    --micro-batch-size $MICRO_BATCH_SIZE \
+    --rampup-batch-size 16 16 5_859_375 \
+    --global-batch-size $GLOBAL_BATCH_SIZE \
+    --train-samples 146_484_375 \
+    --vocab-file $VOCAB_FILE \
+    --merge-file $MERGE_FILE \
+    --loss-scale 12 \
+    --fp16 \
+    --checkpoint-activations \
+    --seed 42
+    $OPTIMIZER_ARGS \
+    $EXIT_OPTS \
+    "
+
+# codecarbon isn't ready, will enable once it's working
+#    --codecarbon-dir $CODECARBON_PATH \
+OUTPUT_ARGS=" \
+    --log-interval 10 \
+    --save-interval $SAVE_INTERVAL \
+    --eval-interval 1000 \
+    --eval-iters 40 \
+    --tensorboard-dir $TENSORBOARD_PATH \
+    --tensorboard-queue-size 5 \
+    --log-timers-to-tensorboard \
+    --log-batch-size-to-tensorboard \
+    --log-validation-ppl-to-tensorboard \
+    "
+
+ZERO_STAGE=1
+#ZERO_STAGE=3
+
+#config_json="./ds_config.$SLURM_JOBID.json"
+config_json="./ds_config.$LSB_JOBID.json"
+
+#    "zero_optimization": {
+#        "stage": $ZERO_STAGE
+#    },
+#    "zero_optimization": {
+#        "stage": $ZERO_STAGE,
+#        "offload_optimizer": {
+#            "device": "cpu"
+#        },
+#        "offload_param": {
+#            "device": "cpu"
+#        }
+#    },
+# Deepspeed figures out GAS dynamically from dynamic GBS via set_train_batch_size()
+cat <<EOT > $config_json
+{
+  "train_micro_batch_size_per_gpu": $MICRO_BATCH_SIZE,
+  "train_batch_size": $GLOBAL_BATCH_SIZE,
+  "gradient_clipping": 1.0,
+  "zero_optimization": {
+    "stage": $ZERO_STAGE
+  },
+  "fp16": {
+    "enabled": true,
+    "loss_scale": 0,
+    "loss_scale_window": 500,
+    "hysteresis": 2,
+    "min_loss_scale": 1,
+    "initial_scale_power": 12
+  },
+  "steps_per_print": 2000,
"wall_clock_breakdown": false +} +EOT + + +# to use offload, add the following to deepspeed config json +# "zero_optimization": { +# "stage": $ZERO_STAGE, +# "offload_optimizer": { +# "device": "cpu" +# }, +# "offload_param": { +# "device": "cpu" +# } +# }, +# and set --cpu-optimizer in deepspeed args +# however, it cannot currently be used with megatron: +# https://github.com/microsoft/Megatron-DeepSpeed/blob/1a74a0513441cf4cc6444ada028fee2bc335b236/megatron/optimizer/__init__.py#L54 + +DEEPSPEED_ARGS=" \ + --deepspeed \ + --deepspeed_config ${config_json} \ + --zero-stage ${ZERO_STAGE} \ + --deepspeed-activation-checkpointing \ + " + +export CMD=" \ + `pwd`/pretrain_gpt_scr.py \ + --tensor-model-parallel-size $TP_SIZE \ + --pipeline-model-parallel-size $PP_SIZE \ + $GPT_ARGS \ + $OUTPUT_ARGS \ + --save $CHECKPOINT_PATH \ + --load $CHECKPOINT_PATH \ + --data-path $DATA_PATH \ + --data-impl mmap \ + --split 949,50,1 \ + --distributed-backend nccl \ + $DEEPSPEED_ARGS \ + " + +echo $CMD + +#If the trainer hangs in compiling and loading fused kernels it means it dropped a lock file, delete it and restart: +rm -f ./megatron/fused_kernels/build/lock +sleep 2 + +#export NCCL_DEBUG=INFO +jsrun --stdio_mode prepended --bind=none -r 4 -c 10 python3 $CMD + +echo "END TIME: $(date)" + +#