diff --git a/applications/predict_regressor_torch.py b/applications/predict_regressor_torch.py index 83ff3dd..2b6d8d3 100644 --- a/applications/predict_regressor_torch.py +++ b/applications/predict_regressor_torch.py @@ -17,7 +17,7 @@ from torch.utils.data.distributed import DistributedSampler from mlguess.torch.distributed import distributed_model_wrapper -from mlguess.torch.pbs import launch_script, launch_script_mpi +from mlguess.pbs import launch_pbs_jobs, launch_distributed_jobs from mlguess.torch.checkpoint import load_model_state from mlguess.torch.trainer_regression import Trainer from mlguess.torch.regression_losses import LipschitzMSELoss @@ -30,10 +30,6 @@ os.environ["OMP_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1" -os.environ['NCCL_SHM_DISABLE'] = '1' -os.environ['NCCL_IB_DISABLE'] = '1' - - # https://stackoverflow.com/questions/59129812/how-to-avoid-cuda-out-of-memory-in-pytorch os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" @@ -226,7 +222,7 @@ def main(rank, world_size, conf, trial=False): elif split == "valid": df = valid_loader.dataset.valid_data elif split == "test": - df = train_loader.dataset.test_data + df = test_loader.dataset.test_data df["mu"] = mu df["aleatoric"] = aleatoric @@ -320,10 +316,10 @@ def main(rank, world_size, conf, trial=False): script_path = Path(__file__).absolute() if conf['pbs']['queue'] == 'casper': logging.info("Launching to PBS on Casper") - launch_script(config, script_path) + launch_pbs_jobs(config, script_path) else: logging.info("Launching to PBS on Derecho") - launch_script_mpi(config, script_path) + launch_distributed_jobs(config, script_path) sys.exit() if use_wandb: # this needs updated diff --git a/applications/train_classifier_torch.py b/applications/train_classifier_torch.py index fd8a0fe..b372ea6 100644 --- a/applications/train_classifier_torch.py +++ b/applications/train_classifier_torch.py @@ -3,7 +3,6 @@ import sys import copy import yaml -import wandb import optuna import shutil import logging @@ -24,7 +23,7 @@ from mlguess.torch.distributed import distributed_model_wrapper from mlguess.torch.scheduler import load_scheduler -from mlguess.torch.pbs import launch_script, launch_script_mpi +from mlguess.pbs import launch_pbs_jobs, launch_distributed_jobs from mlguess.torch.checkpoint import ( FSDPOptimizerWrapper, TorchFSDPCheckpointIO @@ -42,9 +41,6 @@ os.environ["OMP_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1" -os.environ['NCCL_SHM_DISABLE'] = '1' -os.environ['NCCL_IB_DISABLE'] = '1' - # https://stackoverflow.com/questions/59129812/how-to-avoid-cuda-out-of-memory-in-pytorch os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" @@ -506,14 +502,6 @@ def train(self, trial, conf): default=0, help="Submit workers to PBS.", ) - parser.add_argument( - "-w", - "--wandb", - dest="wandb", - type=int, - default=0, - help="Use wandb. 
Default = False" - ) parser.add_argument( "-m", "--mode", @@ -525,7 +513,6 @@ def train(self, trial, conf): args_dict = vars(args) config = args_dict.pop("model_config") launch = int(args_dict.pop("launch")) - use_wandb = int(args_dict.pop("wandb")) mode = str(args_dict.pop("mode")) # Set up logger to print stuff @@ -561,21 +548,12 @@ def train(self, trial, conf): script_path = Path(__file__).absolute() if conf['pbs']['queue'] == 'casper': logging.info("Launching to PBS on Casper") - launch_script(config, script_path) + launch_pbs_jobs(config, script_path) else: logging.info("Launching to PBS on Derecho") - launch_script_mpi(config, script_path) + launch_distributed_jobs(config, script_path) sys.exit() - if use_wandb: # this needs updated - wandb.init( - # set the wandb project where this run will be logged - project="Derecho parallelism", - name=f"Worker {os.environ['RANK']} {os.environ['WORLD_SIZE']}", - # track hyperparameters and run metadata - config=conf - ) - seed = 1000 if "seed" not in conf else conf["seed"] seed_everything(seed) diff --git a/applications/train_regressor_torch.py b/applications/train_regressor_torch.py index 44945e6..b7f710e 100644 --- a/applications/train_regressor_torch.py +++ b/applications/train_regressor_torch.py @@ -2,14 +2,15 @@ import os import sys import yaml -import wandb import optuna import shutil import logging import importlib +import pandas as pd from pathlib import Path from argparse import ArgumentParser +from collections import defaultdict from echo.src.base_objective import BaseObjective import torch @@ -20,7 +21,7 @@ from mlguess.torch.distributed import distributed_model_wrapper from mlguess.torch.scheduler import load_scheduler -from mlguess.torch.pbs import launch_script, launch_script_mpi +from mlguess.pbs import launch_pbs_jobs, launch_distributed_jobs from mlguess.torch.checkpoint import ( FSDPOptimizerWrapper, TorchFSDPCheckpointIO @@ -36,10 +37,6 @@ os.environ["OMP_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1" -os.environ['NCCL_SHM_DISABLE'] = '1' -os.environ['NCCL_IB_DISABLE'] = '1' - - # https://stackoverflow.com/questions/59129812/how-to-avoid-cuda-out-of-memory-in-pytorch os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" @@ -79,7 +76,7 @@ def import_class_from_path(class_name, file_path): return getattr(module, class_name) -def load_dataset_and_sampler(conf, world_size, rank, is_train, seed=42): +def load_dataset_and_sampler(conf, world_size, rank, split, seed=42): """ Load a dataset and its corresponding distributed sampler based on the configuration. @@ -97,9 +94,9 @@ def load_dataset_and_sampler(conf, world_size, rank, is_train, seed=42): Information about the dataset and sampler loaded. 
""" - # Z-score + # Use the function to import your script torch_dataset = import_class_from_path(conf["data"]["dataset_name"], conf["data"]["dataset_path"]) - dataset = torch_dataset(conf, split='train' if is_train else 'valid') + dataset = torch_dataset(conf, split=split) # Pytorch sampler sampler = DistributedSampler( @@ -107,11 +104,10 @@ def load_dataset_and_sampler(conf, world_size, rank, is_train, seed=42): num_replicas=world_size, rank=rank, seed=seed, - shuffle=is_train, - drop_last=(not is_train) + shuffle=False, + drop_last=False ) - flag = 'training' if is_train else 'validation' - logging.info(f"Loaded a {flag} torch dataset, and a distributed sampler") + logging.info(f"Loaded a {split} torch dataset, and a distributed sampler") return dataset, sampler @@ -231,8 +227,9 @@ def main(rank, world_size, conf, trial=False): # load dataset and sampler - train_dataset, train_sampler = load_dataset_and_sampler(conf, world_size, rank, is_train=True) - valid_dataset, valid_sampler = load_dataset_and_sampler(conf, world_size, rank, is_train=False) + train_dataset, train_sampler = load_dataset_and_sampler(conf, world_size, rank, "train") + valid_dataset, valid_sampler = load_dataset_and_sampler(conf, world_size, rank, "valid") + test_dataset, test_sampler = load_dataset_and_sampler(conf, world_size, rank, "test") # setup the dataloder for this process @@ -257,6 +254,16 @@ def main(rank, world_size, conf, trial=False): drop_last=False ) + test_loader = torch.utils.data.DataLoader( + test_dataset, + batch_size=valid_batch_size, + shuffle=False, + sampler=test_sampler, + pin_memory=False, + num_workers=valid_thread_workers, + drop_last=False + ) + # model m = DNN(**conf["model"]) @@ -278,8 +285,6 @@ def main(rank, world_size, conf, trial=False): model, optimizer, scheduler, scaler = load_model_states_and_optimizer(conf, model, device) # Train and validation losses - # train_criterion = EvidentialRegressionLoss(coef=10.84134458514458) - # valid_criterion = EvidentialRegressionLoss(coef=10.84134458514458) train_criterion = LipschitzMSELoss(**conf["train_loss"]) valid_criterion = LipschitzMSELoss(**conf["valid_loss"]) @@ -304,6 +309,64 @@ def main(rank, world_size, conf, trial=False): trial=trial ) + # Predict with the model + + # Reload train data and do not drop the last batch + train_loader = torch.utils.data.DataLoader( + train_dataset, + batch_size=train_batch_size, + shuffle=False, + sampler=train_sampler, + pin_memory=True, + persistent_workers=True if thread_workers > 0 else False, + num_workers=thread_workers, + drop_last=False + ) + + splits = ["train", "valid", "test"] + loaders = [train_loader, valid_loader, test_loader] + all_results = defaultdict(dict) + + for split, dataloader in zip(splits, loaders): + + result = trainer.predict( + conf, + dataloader, + valid_criterion, + regression_metrics, + train_dataset.y_scaler, + split + ) + + metrics = result["metrics"] + mu = result["mu"] + aleatoric = result["aleatoric"] + epistemic = result["epistemic"] + total = result["total"] + + # Add predictions back to the DataFrame + if split == "train": + df = train_loader.dataset.train_data + elif split == "valid": + df = valid_loader.dataset.valid_data + elif split == "test": + df = test_loader.dataset.test_data + + df["mu"] = mu + df["aleatoric"] = aleatoric + df["epistemic"] = epistemic + df["total_uncertainty"] = total + + for key, values in metrics.items(): + if isinstance(values, list): + values = values[0] + all_results[key][split] = values + + df.to_csv(os.path.join(conf['save_loc'], 
f"{split}.csv")) + + mets = pd.DataFrame.from_dict(all_results, orient='index') + mets.to_csv(os.path.join(conf['save_loc'], "metrics.csv")) + return result @@ -384,14 +447,6 @@ def train(self, trial, conf): default=0, help="Submit workers to PBS.", ) - parser.add_argument( - "-w", - "--wandb", - dest="wandb", - type=int, - default=0, - help="Use wandb. Default = False" - ) parser.add_argument( "-m", "--mode", @@ -403,7 +458,6 @@ def train(self, trial, conf): args_dict = vars(args) config = args_dict.pop("model_config") launch = int(args_dict.pop("launch")) - use_wandb = int(args_dict.pop("wandb")) mode = str(args_dict.pop("mode")) # Set up logger to print stuff @@ -439,21 +493,12 @@ def train(self, trial, conf): script_path = Path(__file__).absolute() if conf['pbs']['queue'] == 'casper': logging.info("Launching to PBS on Casper") - launch_script(config, script_path) + launch_pbs_jobs(config, script_path) else: logging.info("Launching to PBS on Derecho") - launch_script_mpi(config, script_path) + launch_distributed_jobs(config, script_path) sys.exit() - if use_wandb: # this needs updated - wandb.init( - # set the wandb project where this run will be logged - project="Derecho parallelism", - name=f"Worker {os.environ['RANK']} {os.environ['WORLD_SIZE']}", - # track hyperparameters and run metadata - config=conf - ) - seed = 1000 if "seed" not in conf else conf["seed"] seed_everything(seed) diff --git a/config/evidential_classifier_torch.yml b/config/evidential_classifier_torch.yml index bcef086..50db3ab 100644 --- a/config/evidential_classifier_torch.yml +++ b/config/evidential_classifier_torch.yml @@ -1,6 +1,6 @@ # Where to save stuff -save_loc: '/glade/work/schreck/repos/miles-guess/miles-guess/results/torch_classifier' +save_loc: '/glade/work/schreck/repos/miles-guess/miles-guess/testing/classifier' # Random state seed: 1000 diff --git a/config/evidential_regression_torch.yml b/config/evidential_regression_torch.yml index ac1ee3c..cda74aa 100644 --- a/config/evidential_regression_torch.yml +++ b/config/evidential_regression_torch.yml @@ -1,4 +1,4 @@ -save_loc: "/glade/work/$USER/repos/miles-guess/miles-guess/results/" +save_loc: "/glade/work/$USER/repos/miles-guess/miles-guess/testing/regression/" seed: 1000 data: @@ -29,7 +29,7 @@ data: with_mean: true with_std: true type: normalize - batch_size: &batch_size 9163 # Example batch size + batch_size: &batch_size 5470 # Example batch size data_path: '/glade/p/cisl/aiml/ai2es/surfacelayer/cabauw_derived_data_20210720.csv' trainer: @@ -39,10 +39,10 @@ trainer: valid_batch_size: *batch_size batches_per_epoch: 500 # Set to 0 to use len(dataloader) valid_batches_per_epoch: 0 - learning_rate: 0.0015285262808755972 - weight_decay: 0.0009378550509012784 + learning_rate: 0.0061048983425573185 + weight_decay: 3.5931998006241314e-07 start_epoch: 0 - epochs: 100 + epochs: 1 amp: False grad_accum_every: 1 grad_max_norm: 1.0 @@ -67,34 +67,34 @@ model: train_loss: tol: 1e-8 # Tolerance parameter for loss calculations - coef: 0.1 # Factor for the evidence regularizer + coef: 5.806527626063491 # Factor for the evidence regularizer reduction: "mean" # Reduction method for the loss calculation valid_loss: tol: 1e-8 # Tolerance parameter for loss calculations - coef: 0.1 # Factor for the evidence regularizer + coef: 5.806527626063491 # Factor for the evidence regularizer reduction: "mean" # Reduction method for the loss calculation -pbs: #derecho - conda: "holodec" - project: "NAML0001" - job_name: "xformer" - walltime: "24:00:00" - nodes: 8 - ncpus: 64 - 
ngpus: 4 - mem: '480GB' - queue: 'preempt' +# pbs: #derecho +# conda: "holodec" +# project: "NAML0001" +# job_name: "xformer" +# walltime: "24:00:00" +# nodes: 8 +# ncpus: 64 +# ngpus: 4 +# mem: '480GB' +# queue: 'preempt' -# pbs: # casper -# conda: "/glade/work/schreck/miniconda3/envs/evidential" -# job_name: 'latlon' -# nodes: 1 -# ncpus: 8 -# ngpus: 1 -# mem: '128GB' -# walltime: '12:00:00' -# gpu_type: 'a100' -# cpu_type: 'milan' -# project: 'NAML0001' -# queue: 'casper' +pbs: # casper + conda: "/glade/work/schreck/miniconda3/envs/evidential" + job_name: 'latlon' + nodes: 1 + ncpus: 8 + ngpus: 1 + mem: '128GB' + walltime: '12:00:00' + gpu_type: 'a100' + cpu_type: 'milan' + project: 'NAML0001' + queue: 'casper' diff --git a/config/pbs_example.yaml b/config/pbs_example.yaml new file mode 100644 index 0000000..ec28dc6 --- /dev/null +++ b/config/pbs_example.yaml @@ -0,0 +1,25 @@ +# Option for casper +pbs: + account: your account + env_setup: "conda activate mlguess" + gpu_type: a100 + cpu_type: milan + mem: 128GB + name: ptype + ncpus: 8 + ngpus: 1 + queue: casper + select: 1 + walltime: 43200 + +# Option for derecho +pbs: + project: "your account" + conda: "conda activate mlguess" + job_name: "my-guess-job" + walltime: "12:00:00" + nodes: 8 # this example asks for 32 total GPUs + ncpus: 64 # take all CPUs on a node + ngpus: 4 # always 4 on derecho + mem: '480GB' # take all memory on a node + queue: 'main' \ No newline at end of file diff --git a/docs/source/torch.md b/docs/source/torch.md index 220f6bf..fca43aa 100644 --- a/docs/source/torch.md +++ b/docs/source/torch.md @@ -2,7 +2,7 @@ Welcome to the pyTorch users page. The instructions below outline how to compute ## Regression usage -There are two provided scripts which are mostly similar, one for training and one for loading a trained model and predicting. +There are two provided scripts which are mostly similar, one for training (and predicting), and one for loading a trained model and predicting. The second script serves as an example on how to load the model from a checkpoint as well as potentially scale inference across GPUs. You may only need to run the trainer script and not necessarily the predict script, as they will both save model predictions to file as well as metrics. Run the training script with: `python applications/train_regressor_torch.py -c [-l] [-m ]` @@ -12,13 +12,20 @@ Arguments: - `-m, --mode`: Set the training mode to 'none', 'ddp', or 'fsdp' (optional) Example: -`python trainer.py -c config.yml -m ddp -l 1` +`python trainer.py -c config.yml -m none -l 1` -The YAML configuration file should contain settings for the model, training, data, and pbs or slurm settings. For distributed training, set the `mode` in the config file or use the `-m` argument to specify 'ddp' or 'fsdp', and use the `-l` flag to submit jobs to PBS or manually set up your distributed environment. +Running distribued mode: + +`torchrun [options] trainer.py -c config.yml -m ddp` + +`torchrun [options] trainer.py -c config.yml -m fsdp` + +The YAML configuration file should contain settings for the model, training, data, and pbs or slurm settings. For distributed training, set the `mode` in the config file or use the `-m` argument to specify 'ddp' or 'fsdp'. Use the `-l` flag to submit jobs to PBS or manually set up your distributed environment. If you plan to use more than 1 node, you may need to customize the torchrun for your system. 
FSDP is relatively hard to set up automatically, you will need to choose the model/data sharding policy on your own. For more detailed information about configuration options and advanced usage, please refer to the code documentation and comments within the script. -Once a model is trained, then run +[Optional] +Once a model is trained, if you would like to load the model after-the-fact and predict on the training splits, run `python applications/predict_regressor_torch.py -c [-l] [-m ]` @@ -26,12 +33,15 @@ which will load the trained model from disk and predict on the training splits a ## Classifier usage -For the classifier models, training and evaluating an evidential model on a dataset is performed in the same script, with options for distributed training using either DDP or FSDP. +For the classifier models, training and evaluating an evidential model on a dataset is performed in the same script, with options for distributed training using either DDP or FSDP. See the regression prediction script for an example on model checkpoint reloading and prediction. -Run the combined script with: `python applications/train_classifier_torch.py -c [-l] [-m ]` +Run the combined script with: + +`python applications/train_classifier_torch.py -c [-l] [-m ]` Example: -`python applications/train_classifier_torch.py -c config.yml -m ddp -l 1` + +`python applications/train_classifier_torch.py -c config.yml -m none` As noted this script will doubly train a model and then predict on the supplied training splits. The predicted quanties include the task(s) predictions along with the Dempster-Shafer uncertainty, and aleatoric and epistemic quantities for a $K$-class problem. Please see the full documentation for more. diff --git a/mlguess/pbs.py b/mlguess/pbs.py index efd7276..cba2b3a 100644 --- a/mlguess/pbs.py +++ b/mlguess/pbs.py @@ -1,9 +1,28 @@ import os -import subprocess +import re import yaml +import shutil +import logging +import subprocess + + +def launch_pbs_jobs(config_file, trainer_path, args=''): + """ + Launches a PBS job using the specified configuration file and trainer script. + This function reads the configuration file to construct a PBS script, writes the + script to a file, submits the job using `qsub`, and then cleans up the script file. + + Args: + config_file (str): Path to the YAML configuration file containing PBS options. + trainer_path (str): Path to the Python training script to be executed. + args (str, optional): Additional command-line arguments to pass to the training script. Defaults to an empty string. + + Raises: + ValueError: If the 'pbs' section is not present in the configuration file. 
+ + """ -def launch_pbs_jobs(config_file, trainer_path, args = ''): # Load configuration file with open(config_file, "r") as f: config = yaml.safe_load(f) @@ -15,12 +34,20 @@ def launch_pbs_jobs(config_file, trainer_path, args = ''): # Build PBS script save_path = config["save_loc"] + script = f"""#!/bin/bash -l #PBS -N {config['pbs']['name']} #PBS -l select={config['pbs']['select']}:ncpus={config['pbs']['ncpus']}:ngpus={config['pbs']['ngpus']}:mem={config['pbs']['mem']} #PBS -l walltime={config['pbs']['walltime']} - #PBS -l gpu_type={config['pbs']['gpu_type']} - #PBS -A {config['pbs']['account']} + """ + + # Add optional fields if they exist + if 'gpu_type' in config['pbs']: + script += f"#PBS -l gpu_type={config['pbs']['gpu_type']}\n" + if 'cpu_type' in config['pbs']: + script += f"#PBS -l cpu_type={config['pbs']['cpu_type']}\n" + + script += f"""#PBS -A {config['pbs']['account']} #PBS -q {config['pbs']['queue']} #PBS -o {os.path.join(save_path, "out")} #PBS -e {os.path.join(save_path, "out")} @@ -40,7 +67,109 @@ def launch_pbs_jobs(config_file, trainer_path, args = ''): stderr=subprocess.PIPE, ).communicate()[0] jobid = jobid.decode("utf-8").strip("\n") - print(jobid) + logging.info(f"Launched job {jobid}") # Clean up PBS script os.remove("launcher.sh") + + +def launch_distributed_jobs(config_file, script_path, launch=True): + """ + Launches a distributed job across multiple nodes using PBS and MPI. + + This function generates a PBS script based on the provided configuration file, + copies the necessary files, and optionally submits the job to the queue. + + Args: + config_file (str): Path to the YAML configuration file containing PBS options. + script_path (str): Path to the Python script to be executed in the distributed environment. + launch (bool, optional): If True, submits the job using `qsub`. If False, only generates the script. Defaults to True. 
+ + """ + + with open(config_file) as cf: + config = yaml.load(cf, Loader=yaml.FullLoader) + + # Extract PBS options from the config + pbs_options = config.get('pbs', {}) + + user = os.environ.get('USER') + num_nodes = pbs_options.get('nodes', 1) + num_gpus = pbs_options.get('ngpus', 1) + total_gpus = num_nodes * num_gpus + + # Create the CUDA_VISIBLE_DEVICES string + cuda_devices = ",".join(str(i) for i in range(total_gpus)) + save_loc = os.path.expandvars(config["save_loc"]) + + config_save_path = os.path.join(save_loc, "model.yml") + + if os.path.exists(config_save_path): + os.remove(config_save_path) + logging.info('Remove the old model.yml at {}'.format(config_save_path)) + + shutil.copy(config_file, config_save_path) + logging.info('Copy the new {} to {}'.format(config_file, config_save_path)) + + # Generate the PBS script + script = f'''#!/bin/bash + #PBS -A {pbs_options.get('project', 'default_project')} + #PBS -N {pbs_options.get('job_name', 'default_job')} + #PBS -l walltime={pbs_options.get('walltime', '00:10:00')} + #PBS -l select={num_nodes}:ncpus={pbs_options.get('ncpus', 1)}:ngpus={num_gpus}:mem={pbs_options.get('mem', '4GB')} + #PBS -q {pbs_options.get('queue', 'default_queue')} + #PBS -j oe + #PBS -k eod + + # Load modules + module purge + module load nvhpc cuda cray-mpich conda + conda activate {pbs_options.get('conda', 'holodec')} + + # Get a list of allocated nodes + nodes=( $( cat $PBS_NODEFILE ) ) + head_node=${{nodes[0]}} + head_node_ip=$(ssh $head_node hostname -i | awk '{{print $1}}') + + # Export environment variables + export LSCRATCH=/glade/derecho/scratch/{user}/ + export LOGLEVEL=INFO + export NCCL_DEBUG=INFO + + # Print the results + echo "Number of nodes: {num_nodes}" + echo "Number of GPUs per node: {num_gpus}" + echo "Total number of GPUs: {total_gpus}" + + # Launch MPIs + CUDA_VISIBLE_DEVICES="{cuda_devices}" mpiexec -n {num_nodes} --ppn 1 --cpu-bind none torchrun --nnodes={num_nodes} --nproc-per-node={num_gpus} --rdzv-backend=c10d --rdzv-endpoint=$head_node_ip {script_path} -c {config_save_path} + ''' + + script = re.sub(r'^\s+', '', script, flags=re.MULTILINE) + + # Save the script to a file + with open('launch.sh', 'w') as script_file: + script_file.write(script) + + if launch: + jobid = subprocess.Popen( + "qsub launch.sh", + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ).communicate()[0] + jobid = jobid.decode("utf-8").strip("\n") + logging.info(jobid) + + # copy launch.sh to the design location + launch_path = os.path.join(save_loc, "launch.sh") + + if os.path.exists(launch_path): + os.remove(launch_path) + print('Remove the old launch.sh at {}'.format(launch_path)) + + shutil.copy("launch.sh", os.path.join(save_loc, "launch.sh")) + logging.info('Generating the new script at {}'.format(launch_path)) + + # remove the one from local space + os.remove("launch.sh") diff --git a/mlguess/torch/pbs.py b/mlguess/torch/pbs.py deleted file mode 100644 index b49471a..0000000 --- a/mlguess/torch/pbs.py +++ /dev/null @@ -1,155 +0,0 @@ -import re -import os -import yaml -import shutil -import subprocess - - -def launch_script(config_file, script_path, launch=True): - """ - This function launches a script using PBS (Portable Batch System) based on configurations in a YAML file. - - Args: - config_file (str): Path to the YAML configuration file. - script_path (str): Path to the Python script to be launched. - launch (bool, optional): Whether to submit the job to PBS (default: True). 
- """ - - # Load the configuration file - with open(config_file, 'r') as file: - config = yaml.safe_load(file) - - # Extract PBS options from the config - pbs_options = config['pbs'] - - config_save_path = os.path.expandvars(os.path.join(config["save_loc"], "model.yml")) - - # Generate the PBS script - script = f"""#!/bin/bash -l - #PBS -N {pbs_options['job_name']} - #PBS -l select=1:ncpus={pbs_options['ncpus']}:ngpus={pbs_options['ngpus']}:mem={pbs_options['mem']} - #PBS -l walltime={pbs_options['walltime']} - #PBS -l gpu_type={pbs_options['gpu_type']} - #PBS -A {pbs_options['project']} - #PBS -q {pbs_options['queue']} - #PBS -j oe - #PBS -k eod - - source ~/.bashrc - - conda activate {pbs_options['conda']} - - python {script_path} -c {config_save_path} - """ - - script = re.sub(r'^\s+', '', script, flags=re.MULTILINE) - - # Save the script to a file - with open('launch.sh', 'w') as script_file: - script_file.write(script) - - if launch: - jobid = subprocess.Popen( - "qsub launch.sh", - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ).communicate()[0] - jobid = jobid.decode("utf-8").strip("\n") - print(jobid) - save_loc = os.path.expandvars(config["save_loc"]) - if not os.path.exists(os.path.join(save_loc, "launch.sh")): - shutil.copy('launch.sh', os.path.join(save_loc, "launch.sh")) - os.remove("launch.sh") - - -def launch_script_mpi(config_file, script_path, launch=True): - """ - This function launches a script using PBS with MPI (Message Passing Interface) for distributed training on multiple GPUs. - - Args: - config_file (str): Path to the YAML configuration file. - script_path (str): Path to the Python script to be launched with MPI. - launch (bool, optional): Whether to submit the job to PBS (default: True). - """ - - with open(config_file) as cf: - config = yaml.load(cf, Loader=yaml.FullLoader) - - # Extract PBS options from the config - pbs_options = config.get('pbs', {}) - - user = os.environ.get('USER') - num_nodes = pbs_options.get('nodes', 1) - num_gpus = pbs_options.get('ngpus', 1) - total_gpus = num_nodes * num_gpus - - # Create the CUDA_VISIBLE_DEVICES string - cuda_devices = ",".join(str(i) for i in range(total_gpus)) - save_loc = os.path.expandvars(config["save_loc"]) - - config_save_path = os.path.join(save_loc, "model.yml") - - # Generate the PBS script - script = f'''#!/bin/bash - #PBS -A {pbs_options.get('project', 'default_project')} - #PBS -N {pbs_options.get('job_name', 'default_job')} - #PBS -l walltime={pbs_options.get('walltime', '00:10:00')} - #PBS -l select={num_nodes}:ncpus={pbs_options.get('ncpus', 1)}:ngpus={num_gpus}:mem={pbs_options.get('mem', '4GB')} - #PBS -q {pbs_options.get('queue', 'default_queue')} - #PBS -j oe - #PBS -k eod - - # Load modules - module purge - module load nvhpc cuda cray-mpich conda - conda activate {pbs_options.get('conda', 'holodec')} - - # Get a list of allocated nodes - nodes=( $( cat $PBS_NODEFILE ) ) - head_node=${{nodes[0]}} - head_node_ip=$(ssh $head_node hostname -i | awk '{{print $1}}') - - # Export environment variables - export LSCRATCH=/glade/derecho/scratch/{user}/ - export LOGLEVEL=INFO - export NCCL_DEBUG=INFO - - # Print the results - echo "Number of nodes: {num_nodes}" - echo "Number of GPUs per node: {num_gpus}" - echo "Total number of GPUs: {total_gpus}" - - # Log in to WandB if needed - # wandb login 02d2b1af00b5df901cb2bee071872de774781520 - - # Launch MPIs - CUDA_VISIBLE_DEVICES="{cuda_devices}" mpiexec -n {num_nodes} --ppn 1 --cpu-bind none torchrun --nnodes={num_nodes} 
--nproc-per-node={num_gpus} --rdzv-backend=c10d --rdzv-endpoint=$head_node_ip {script_path} -c {config_save_path} - ''' - - script = re.sub(r'^\s+', '', script, flags=re.MULTILINE) - - # Save the script to a file - with open('launch.sh', 'w') as script_file: - script_file.write(script) - - if launch: - jobid = subprocess.Popen( - "qsub launch.sh", - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ).communicate()[0] - jobid = jobid.decode("utf-8").strip("\n") - print(jobid) - if not os.path.exists(os.path.join(save_loc, "launch.sh")): - shutil.copy("launch.sh", os.path.join(save_loc, "launch.sh")) - os.remove("launch.sh") - - -if __name__ == "__main__": - config_file = "../config/vit2d.yml" - # Where does this script live? - script_path = "../applications/trainer_vit2d.py" - launch_script(config_file, script_path, launch=False) - #launch_script_mpi(config_file, script_path, launch = False)
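
For reference, the relocated launcher entry points introduced above can also be driven directly from Python rather than via the application scripts. This is a minimal sketch, not part of the patch, assuming a config YAML that defines `save_loc` plus a `pbs` block with Casper- or Derecho-style keys as in config/pbs_example.yaml; the file paths below are illustrative only:

import logging
from mlguess.pbs import launch_pbs_jobs, launch_distributed_jobs

logging.basicConfig(level=logging.INFO)

config = "config/model.yml"                        # illustrative path; must contain 'save_loc' and a 'pbs' block
script = "applications/train_regressor_torch.py"   # training script to submit

# Single-node (Casper-style) job: builds a PBS script and submits it with qsub.
# Raises ValueError if the config has no 'pbs' section.
launch_pbs_jobs(config, script)

# Multi-node (Derecho-style) job: with launch=False the PBS/torchrun script
# (launch.sh) is only generated and copied to save_loc, not submitted.
launch_distributed_jobs(config, script, launch=False)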