diff --git a/applications/physics/cosmology/cosmoflow/cosmoflow_model.py b/applications/physics/cosmology/cosmoflow/cosmoflow_model.py index 14522639f5b..e329eade709 100644 --- a/applications/physics/cosmology/cosmoflow/cosmoflow_model.py +++ b/applications/physics/cosmology/cosmoflow/cosmoflow_model.py @@ -12,7 +12,8 @@ def construct_cosmoflow_model(parallel_strategy, min_distconv_width, mlperf, transform_input, - dropout_keep_prob=0.5): + dropout_keep_prob=0.5, + cosine_schedule=None): # Construct layer graph universes = lbann.Input(data_field='samples') @@ -66,15 +67,20 @@ def construct_cosmoflow_model(parallel_strategy, # lbann.CallbackLinearGrowthLearningRate(target=learning_rate, num_epochs=5), # lbann.CallbackSetLearningRate(step=32, val=0.25 * learning_rate), # lbann.CallbackSetLearningRate(step=64, val=0.125 * learning_rate), - # lbann.CallbackCosineDecayLearningRate( - # lr_max=1e-3, - # lr_min=1e-5, - # decay_steps=10000, - # initial_warmup_learning_rate=0, - # warmup_steps=100 - # ), lbann.CallbackProgressBar(newline_interval=1, print_mem_usage=True) ] + + if cosine_schedule: + callbacks.append( + lbann.CallbackCosineDecayLearningRate( + lr_max=learning_rate, + lr_min=cosine_schedule['lr_min'], + decay_steps=cosine_schedule['decay_steps'], + initial_warmup_learning_rate=cosine_schedule['init_warmup_lr'], + warmup_steps=cosine_schedule['warmup_steps'] + ) + ) + return lbann.Model( epochs=num_epochs, layers=layers, diff --git a/applications/physics/cosmology/cosmoflow/train_cosmoflow.py b/applications/physics/cosmology/cosmoflow/train_cosmoflow.py index 904bc53ab2e..9eab637a335 100644 --- a/applications/physics/cosmology/cosmoflow/train_cosmoflow.py +++ b/applications/physics/cosmology/cosmoflow/train_cosmoflow.py @@ -165,6 +165,21 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any: parser.add_argument( '--dropout-keep-prob', action='store', type=float, default=0.5, help='Probability of keeping activations in dropout layers (default: 0.5). 
Set to 1 to disable dropout')
+    parser.add_argument(
+        '--cosine-schedule', action='store_true',
+        help='Use cosine learning rate scheduler')
+    parser.add_argument(
+        '--lr-min', action='store', type=float, default=0.,
+        help='Minimum learning rate for cosine scheduler')
+    parser.add_argument(
+        '--decay-steps', action='store', type=int, default=50000,
+        help='Steps to decay learning rate over for cosine scheduler')
+    parser.add_argument(
+        '--init-warmup-lr', action='store', type=float, default=0.,
+        help='Initial warmup learning rate for cosine scheduler')
+    parser.add_argument(
+        '--warmup-steps', action='store', type=int, default=1000,
+        help='Number of steps to warm up learning rate over with cosine scheduler')

     # Parallelism arguments
     parser.add_argument(
@@ -226,6 +241,15 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
                                                    channel_groups=args.channel_groups,
                                                    filter_groups=args.filter_groups,
                                                    depth_groups=args.depth_groups)
+
+    cosine_scheduler_args = None
+    if args.cosine_schedule:
+        cosine_scheduler_args = {
+            'lr_min': args.lr_min,
+            'decay_steps': args.decay_steps,
+            'init_warmup_lr': args.init_warmup_lr,
+            'warmup_steps': args.warmup_steps
+        }

     model = cosmoflow_model.construct_cosmoflow_model(parallel_strategy=parallel_strategy,
                                                       local_batchnorm=args.local_batchnorm,
@@ -237,14 +261,14 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
                                                       min_distconv_width=args.min_distconv_width,
                                                       mlperf=args.mlperf,
                                                       transform_input=args.transform_input,
-                                                      dropout_keep_prob=args.dropout_keep_prob)
+                                                      dropout_keep_prob=args.dropout_keep_prob,
+                                                      cosine_schedule=cosine_scheduler_args)

     # Add profiling callbacks if needed.
     model.callbacks.extend(lbann.contrib.args.create_profile_callbacks(args))

     # Setup optimizer
     optimizer = lbann.contrib.args.create_optimizer(args)
-    # optimizer.learn_rate *= 1e-2

     # Setup data reader
     serialize_io = False
diff --git a/include/lbann/data_ingestion/readers/data_reader_python_dataset.hpp b/include/lbann/data_ingestion/readers/data_reader_python_dataset.hpp
index 6d51efc1cad..b4e1bb46b4a 100644
--- a/include/lbann/data_ingestion/readers/data_reader_python_dataset.hpp
+++ b/include/lbann/data_ingestion/readers/data_reader_python_dataset.hpp
@@ -127,6 +127,12 @@ class python_dataset_reader : public generic_data_reader
 #ifdef LBANN_HAS_DISTCONV
   /** @brief Whether or not tensor needs shuffling for distconv. */
   bool m_tensor_shuffle_required = true;
+  /** @brief The number of minibatches in the current epoch that have been
+   *  fetched and returned by fetch_data_block. This is not the same as
+   *  m_dataset_minibatch_offset, which tracks the number of minibatches that
+   *  have been queued and can therefore be several minibatches ahead of the
+   *  one currently being returned. */
+  uint64_t m_fetched_minibatch_count = 0;
 #endif // LBANN_HAS_DISTCONV
 };

diff --git a/src/data_ingestion/readers/data_reader_python_dataset.cpp b/src/data_ingestion/readers/data_reader_python_dataset.cpp
index 07b7ea404d5..2277a3b4129 100644
--- a/src/data_ingestion/readers/data_reader_python_dataset.cpp
+++ b/src/data_ingestion/readers/data_reader_python_dataset.cpp
@@ -179,6 +179,14 @@ void python_dataset_reader::shuffle_responses(DataType* responses_ptr)
   // in a batch that can't be split evenly will be split evenly across the
   // first n ranks (or subsets of ranks in the distconv case).
+#ifndef LBANN_BUILT_WITH_SPECTRUM
+  auto syncInfo = El::SyncInfo<El::Device::CPU>{};
+  El::mpi::EnsureComm(m_comm->get_world_comm(),
+                      syncInfo);
+  El::mpi::EnsureComm(m_comm->get_world_comm(),
+                      syncInfo);
+#endif
+
   uint64_t rank = m_comm->get_rank_in_trainer();
   uint64_t nprocs = m_comm->get_procs_per_trainer();
   uint64_t trainer_rank = m_comm->get_trainer_rank();
@@ -187,13 +195,14 @@
   execution_mode mode = exec_mode_from_string(get_role());
   dataset& ds = get_trainer().get_data_coordinator().get_dataset(mode);
   uint64_t global_mb_size{};
-  if (m_dataset_minibatch_offset < (ds.get_num_iterations_per_epoch() - 1)) {
+  if (m_fetched_minibatch_count < (ds.get_num_iterations_per_epoch() - 1)) {
     global_mb_size = ds.get_mini_batch_size();
   }
-  else if (m_dataset_minibatch_offset ==
+  else if (m_fetched_minibatch_count ==
           (ds.get_num_iterations_per_epoch() - 1)) {
     global_mb_size = ds.get_last_mini_batch_size();
   }
+  m_fetched_minibatch_count++;

   uint64_t local_mb_size = global_mb_size / nprocs;
   uint64_t extra_samples = global_mb_size % nprocs;
@@ -234,10 +243,21 @@
       }
     }
    else if (rank == recv_rank) {
+#ifdef LBANN_BUILT_WITH_SPECTRUM
+      EL_CHECK_MPI_CALL(
+        MPI_Recv(&responses_ptr[recv_rank_count * m_num_responses],
+                 m_num_responses * sizeof(DataType),
+                 MPI_BYTE,
+                 m_comm->get_world_rank(trainer_rank, send_rank),
+                 0,
+                 m_comm->get_world_comm().GetMPIComm(),
+                 MPI_STATUS_IGNORE));
+#else
       m_comm->recv(&responses_ptr[recv_rank_count * m_num_responses],
                    m_num_responses,
                    trainer_rank,
                    send_rank);
+#endif
     }

     send_rank_count += 1;
@@ -344,6 +364,9 @@ void python_dataset_reader::queue_epoch()
   m_dataset_minibatch_offset = 0;
   m_dataset_sample_offset = 0;
   m_queued_samples = 0;
+#ifdef LBANN_HAS_DISTCONV
+  m_fetched_minibatch_count = 0;
+#endif // LBANN_HAS_DISTCONV

   // Prefetch the first set of samples (if less than minibatch size, the first
   // minibatch read will take care of the rest)
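
Usage sketch (reviewer note, not part of the patch): the scheduler is opt-in via --cosine-schedule, and lr_max is taken from the learning_rate argument that construct_cosmoflow_model already receives, so only the minimum LR, decay length, and warmup are new knobs. Below is a minimal Python sketch of how the pieces are expected to fit together; the launch command is an assumed invocation of train_cosmoflow.py, and the numeric values are illustrative only (borrowed from the commented-out callback this diff removes).

    # Assumed launch; flag names match the argparse additions in this diff.
    #   python3 train_cosmoflow.py --cosine-schedule \
    #       --lr-min 1e-5 --decay-steps 10000 --init-warmup-lr 0.0 --warmup-steps 100

    # Dictionary built in train_cosmoflow.py; the keys must match the lookups
    # done in construct_cosmoflow_model (cosmoflow_model.py).
    cosine_scheduler_args = {
        'lr_min': 1e-5,         # from --lr-min
        'decay_steps': 10000,   # from --decay-steps
        'init_warmup_lr': 0.0,  # from --init-warmup-lr
        'warmup_steps': 100,    # from --warmup-steps
    }

    # construct_cosmoflow_model(..., cosine_schedule=cosine_scheduler_args) then
    # appends lbann.CallbackCosineDecayLearningRate(lr_max=learning_rate,
    # lr_min=..., decay_steps=..., initial_warmup_learning_rate=...,
    # warmup_steps=...) to the model callbacks; with --cosine-schedule omitted,
    # cosine_scheduler_args stays None and no callback is added.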