
Commit c87c486

Updated the readme for torch, updates to scripts related to that. Updates to PBS
jsschreck committed Aug 21, 2024
1 parent bc6752b commit c87c486
Showing 9 changed files with 295 additions and 267 deletions.
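The change repeated across the application scripts below moves the PBS launch helpers out of mlguess.torch.pbs into a shared mlguess.pbs module and renames them. A minimal before/after sketch of the import change (both lines appear verbatim in the diffs that follow):

    # before this commit
    from mlguess.torch.pbs import launch_script, launch_script_mpi
    # after this commit
    from mlguess.pbs import launch_pbs_jobs, launch_distributed_jobs

Judging from the queue check in each script, launch_pbs_jobs submits a single PBS job on Casper while launch_distributed_jobs handles the MPI-style distributed launch used on Derecho.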
12 changes: 4 additions & 8 deletions applications/predict_regressor_torch.py
@@ -17,7 +17,7 @@
from torch.utils.data.distributed import DistributedSampler

from mlguess.torch.distributed import distributed_model_wrapper
-from mlguess.torch.pbs import launch_script, launch_script_mpi
+from mlguess.pbs import launch_pbs_jobs, launch_distributed_jobs
from mlguess.torch.checkpoint import load_model_state
from mlguess.torch.trainer_regression import Trainer
from mlguess.torch.regression_losses import LipschitzMSELoss
@@ -30,10 +30,6 @@
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

-os.environ['NCCL_SHM_DISABLE'] = '1'
-os.environ['NCCL_IB_DISABLE'] = '1'


# https://stackoverflow.com/questions/59129812/how-to-avoid-cuda-out-of-memory-in-pytorch
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

@@ -226,7 +222,7 @@ def main(rank, world_size, conf, trial=False):
elif split == "valid":
df = valid_loader.dataset.valid_data
elif split == "test":
-df = train_loader.dataset.test_data
+df = test_loader.dataset.test_data

df["mu"] = mu
df["aleatoric"] = aleatoric
@@ -320,10 +316,10 @@ def main(rank, world_size, conf, trial=False):
script_path = Path(__file__).absolute()
if conf['pbs']['queue'] == 'casper':
logging.info("Launching to PBS on Casper")
-launch_script(config, script_path)
+launch_pbs_jobs(config, script_path)
else:
logging.info("Launching to PBS on Derecho")
-launch_script_mpi(config, script_path)
+launch_distributed_jobs(config, script_path)
sys.exit()

if use_wandb: # this needs updated
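Note the one-line fix above: test-split predictions were previously written into the DataFrame held by the train loader. A hypothetical, slightly tidier variant that sidesteps the if/elif chain entirely (a sketch only; it assumes each dataset exposes the per-split attributes seen in this diff):

    # Map each split name to its backing DataFrame (attribute names from the diff)
    split_frames = {
        "train": train_loader.dataset.train_data,
        "valid": valid_loader.dataset.valid_data,
        "test": test_loader.dataset.test_data,
    }
    df = split_frames[split]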
28 changes: 3 additions & 25 deletions applications/train_classifier_torch.py
@@ -3,7 +3,6 @@
import sys
import copy
import yaml
-import wandb
import optuna
import shutil
import logging
@@ -24,7 +23,7 @@

from mlguess.torch.distributed import distributed_model_wrapper
from mlguess.torch.scheduler import load_scheduler
-from mlguess.torch.pbs import launch_script, launch_script_mpi
+from mlguess.pbs import launch_pbs_jobs, launch_distributed_jobs
from mlguess.torch.checkpoint import (
FSDPOptimizerWrapper,
TorchFSDPCheckpointIO
@@ -42,9 +41,6 @@
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

-os.environ['NCCL_SHM_DISABLE'] = '1'
-os.environ['NCCL_IB_DISABLE'] = '1'


# https://stackoverflow.com/questions/59129812/how-to-avoid-cuda-out-of-memory-in-pytorch
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
@@ -506,14 +502,6 @@ def train(self, trial, conf):
default=0,
help="Submit workers to PBS.",
)
-parser.add_argument(
-"-w",
-"--wandb",
-dest="wandb",
-type=int,
-default=0,
-help="Use wandb. Default = False"
-)
parser.add_argument(
"-m",
"--mode",
@@ -525,7 +513,6 @@ def train(self, trial, conf):
args_dict = vars(args)
config = args_dict.pop("model_config")
launch = int(args_dict.pop("launch"))
-use_wandb = int(args_dict.pop("wandb"))
mode = str(args_dict.pop("mode"))

# Set up logger to print stuff
@@ -561,21 +548,12 @@ def train(self, trial, conf):
script_path = Path(__file__).absolute()
if conf['pbs']['queue'] == 'casper':
logging.info("Launching to PBS on Casper")
-launch_script(config, script_path)
+launch_pbs_jobs(config, script_path)
else:
logging.info("Launching to PBS on Derecho")
-launch_script_mpi(config, script_path)
+launch_distributed_jobs(config, script_path)
sys.exit()

-if use_wandb: # this needs updated
-wandb.init(
-# set the wandb project where this run will be logged
-project="Derecho parallelism",
-name=f"Worker {os.environ['RANK']} {os.environ['WORLD_SIZE']}",
-# track hyperparameters and run metadata
-config=conf
-)

seed = 1000 if "seed" not in conf else conf["seed"]
seed_everything(seed)

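As an aside, the seed fallback kept above can be written more compactly with dict.get; a minimal equivalent, assuming conf behaves like a plain dict:

    seed = conf.get("seed", 1000)  # same default as the ternary form
    seed_everything(seed)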
119 changes: 82 additions & 37 deletions applications/train_regressor_torch.py
@@ -2,14 +2,15 @@
import os
import sys
import yaml
-import wandb
import optuna
import shutil
import logging
import importlib
+import pandas as pd

from pathlib import Path
from argparse import ArgumentParser
+from collections import defaultdict
from echo.src.base_objective import BaseObjective

import torch
@@ -20,7 +21,7 @@

from mlguess.torch.distributed import distributed_model_wrapper
from mlguess.torch.scheduler import load_scheduler
-from mlguess.torch.pbs import launch_script, launch_script_mpi
+from mlguess.pbs import launch_pbs_jobs, launch_distributed_jobs
from mlguess.torch.checkpoint import (
FSDPOptimizerWrapper,
TorchFSDPCheckpointIO
@@ -36,10 +37,6 @@
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

-os.environ['NCCL_SHM_DISABLE'] = '1'
-os.environ['NCCL_IB_DISABLE'] = '1'


# https://stackoverflow.com/questions/59129812/how-to-avoid-cuda-out-of-memory-in-pytorch
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

@@ -79,7 +76,7 @@ def import_class_from_path(class_name, file_path):
return getattr(module, class_name)

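Only the final line of import_class_from_path survives in the context above. For reference, a minimal sketch of how such a helper is typically written with the stdlib importlib.util machinery (the body in the repository may differ):

    import importlib.util

    def import_class_from_path(class_name, file_path):
        # Build a module spec from an arbitrary .py file path, execute the
        # module, and pull the requested class out of it.
        spec = importlib.util.spec_from_file_location(class_name, file_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return getattr(module, class_name)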

-def load_dataset_and_sampler(conf, world_size, rank, is_train, seed=42):
+def load_dataset_and_sampler(conf, world_size, rank, split, seed=42):
"""
Load a dataset and its corresponding distributed sampler based on the configuration.
@@ -97,21 +94,20 @@ def load_dataset_and_sampler(conf, world_size, rank, is_train, seed=42):
Information about the dataset and sampler loaded.
"""

# Z-score
# Use the function to import your script
torch_dataset = import_class_from_path(conf["data"]["dataset_name"], conf["data"]["dataset_path"])
-dataset = torch_dataset(conf, split='train' if is_train else 'valid')
+dataset = torch_dataset(conf, split=split)

# Pytorch sampler
sampler = DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank,
seed=seed,
-shuffle=is_train,
-drop_last=(not is_train)
+shuffle=False,
+drop_last=False
)
-flag = 'training' if is_train else 'validation'
-logging.info(f"Loaded a {flag} torch dataset, and a distributed sampler")
+logging.info(f"Loaded a {split} torch dataset, and a distributed sampler")

return dataset, sampler

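One helper now serves every split, and the sampler is always built with shuffle=False and drop_last=False: each rank receives a deterministic shard, and when the dataset does not divide evenly DistributedSampler pads rather than drops, so no row goes unpredicted. A minimal sketch of the new call pattern (mirroring the three calls in main below):

    for split in ("train", "valid", "test"):
        dataset, sampler = load_dataset_and_sampler(conf, world_size, rank, split)

If shuffled training batches were still wanted, shuffle would need to be re-enabled for the train sampler (with sampler.set_epoch(epoch) each epoch); this version trades that for prediction-friendly determinism.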
@@ -231,8 +227,9 @@ def main(rank, world_size, conf, trial=False):

# load dataset and sampler

-train_dataset, train_sampler = load_dataset_and_sampler(conf, world_size, rank, is_train=True)
-valid_dataset, valid_sampler = load_dataset_and_sampler(conf, world_size, rank, is_train=False)
+train_dataset, train_sampler = load_dataset_and_sampler(conf, world_size, rank, "train")
+valid_dataset, valid_sampler = load_dataset_and_sampler(conf, world_size, rank, "valid")
+test_dataset, test_sampler = load_dataset_and_sampler(conf, world_size, rank, "test")

# setup the dataloder for this process

@@ -257,6 +254,16 @@
drop_last=False
)

+test_loader = torch.utils.data.DataLoader(
+test_dataset,
+batch_size=valid_batch_size,
+shuffle=False,
+sampler=test_sampler,
+pin_memory=False,
+num_workers=valid_thread_workers,
+drop_last=False
+)

# model

m = DNN(**conf["model"])
@@ -278,8 +285,6 @@
model, optimizer, scheduler, scaler = load_model_states_and_optimizer(conf, model, device)

# Train and validation losses
-# train_criterion = EvidentialRegressionLoss(coef=10.84134458514458)
-# valid_criterion = EvidentialRegressionLoss(coef=10.84134458514458)

train_criterion = LipschitzMSELoss(**conf["train_loss"])
valid_criterion = LipschitzMSELoss(**conf["valid_loss"])
@@ -304,6 +309,64 @@
trial=trial
)

+# Predict with the model
+
+# Reload train data and do not drop the last batch
+train_loader = torch.utils.data.DataLoader(
+train_dataset,
+batch_size=train_batch_size,
+shuffle=False,
+sampler=train_sampler,
+pin_memory=True,
+persistent_workers=True if thread_workers > 0 else False,
+num_workers=thread_workers,
+drop_last=False
+)
+
+splits = ["train", "valid", "test"]
+loaders = [train_loader, valid_loader, test_loader]
+all_results = defaultdict(dict)
+
+for split, dataloader in zip(splits, loaders):
+
+result = trainer.predict(
+conf,
+dataloader,
+valid_criterion,
+regression_metrics,
+train_dataset.y_scaler,
+split
+)
+
+metrics = result["metrics"]
+mu = result["mu"]
+aleatoric = result["aleatoric"]
+epistemic = result["epistemic"]
+total = result["total"]
+
+# Add predictions back to the DataFrame
+if split == "train":
+df = train_loader.dataset.train_data
+elif split == "valid":
+df = valid_loader.dataset.valid_data
+elif split == "test":
+df = test_loader.dataset.test_data
+
+df["mu"] = mu
+df["aleatoric"] = aleatoric
+df["epistemic"] = epistemic
+df["total_uncertainty"] = total
+
+for key, values in metrics.items():
+if isinstance(values, list):
+values = values[0]
+all_results[key][split] = values
+
+df.to_csv(os.path.join(conf['save_loc'], f"{split}.csv"))
+
+mets = pd.DataFrame.from_dict(all_results, orient='index')
+mets.to_csv(os.path.join(conf['save_loc'], "metrics.csv"))

return result

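The new prediction block writes one CSV per split plus a combined metrics table. A short sketch of reading those artifacts back, assuming the same conf dict is in scope (only the file names and layout come from the diff):

    import os
    import pandas as pd

    save_loc = conf["save_loc"]
    # metrics.csv was written from a dict-of-dicts with orient='index':
    # rows are metric names, columns are the train/valid/test splits.
    metrics = pd.read_csv(os.path.join(save_loc, "metrics.csv"), index_col=0)
    # Each per-split file carries mu, aleatoric, epistemic, and
    # total_uncertainty columns appended to the original data.
    test_preds = pd.read_csv(os.path.join(save_loc, "test.csv"))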

@@ -384,14 +447,6 @@ def train(self, trial, conf):
default=0,
help="Submit workers to PBS.",
)
-parser.add_argument(
-"-w",
-"--wandb",
-dest="wandb",
-type=int,
-default=0,
-help="Use wandb. Default = False"
-)
parser.add_argument(
"-m",
"--mode",
@@ -403,7 +458,6 @@ def train(self, trial, conf):
args_dict = vars(args)
config = args_dict.pop("model_config")
launch = int(args_dict.pop("launch"))
-use_wandb = int(args_dict.pop("wandb"))
mode = str(args_dict.pop("mode"))

# Set up logger to print stuff
@@ -439,21 +493,12 @@ def train(self, trial, conf):
script_path = Path(__file__).absolute()
if conf['pbs']['queue'] == 'casper':
logging.info("Launching to PBS on Casper")
-launch_script(config, script_path)
+launch_pbs_jobs(config, script_path)
else:
logging.info("Launching to PBS on Derecho")
-launch_script_mpi(config, script_path)
+launch_distributed_jobs(config, script_path)
sys.exit()

-if use_wandb: # this needs updated
-wandb.init(
-# set the wandb project where this run will be logged
-project="Derecho parallelism",
-name=f"Worker {os.environ['RANK']} {os.environ['WORLD_SIZE']}",
-# track hyperparameters and run metadata
-config=conf
-)

seed = 1000 if "seed" not in conf else conf["seed"]
seed_everything(seed)

2 changes: 1 addition & 1 deletion config/evidential_classifier_torch.yml
@@ -1,6 +1,6 @@

# Where to save stuff
-save_loc: '/glade/work/schreck/repos/miles-guess/miles-guess/results/torch_classifier'
+save_loc: '/glade/work/schreck/repos/miles-guess/miles-guess/testing/classifier'

# Random state
seed: 1000
(Diffs for the remaining 5 changed files not shown.)