From 8279b4605cb1a8f8ed51ffc6fc8d23c4ccc258db Mon Sep 17 00:00:00 2001 From: jingpengw Date: Mon, 29 Aug 2022 11:25:05 -0400 Subject: [PATCH] semantic training --- | 1 + environment.yml | 286 ++++++++++++++++++ neutorch/dataset/ | 13 +- neutorch/dataset/ | 104 +++++++ neutorch/dataset/ | 20 +- neutorch/{cli => train}/ | 0 neutorch/train/ | 212 +++++++++++++ .../} | 0 .../} | 1 + neutorch/train/ | 39 +++ neutorch/{cli => train}/ | 0 | 7 +- 12 files changed, 663 insertions(+), 20 deletions(-) create mode 100644 environment.yml create mode 100644 neutorch/dataset/ rename neutorch/{cli => train}/ (100%) create mode 100644 neutorch/train/ rename neutorch/{cli/ => train/} (100%) rename neutorch/{cli/ => train/} (98%) create mode 100644 neutorch/train/ rename neutorch/{cli => train}/ (100%) diff --git a/ b/ index 5fb495d..d5083f0 100644 --- a/ +++ b/ @@ -11,6 +11,7 @@ Neuron segmentation and synapse detection using PyTorch # Features - [x] Training using whole terabyte or even petabyte of min(dataset.start + per_worker, overall_end) +def path_to_dataset_name(path: str, dataset_names: list): + for dataset_name in dataset_names: + if dataset_name in path: + return dataset_name + + class DatasetBase( def __init__(self, @@ -48,8 +55,10 @@ def __init__(self, self.transform.shrink_size[:3] + \ self.transform.shrink_size[-3:] - # inherite this class and build the samples - self.samples = None + @cached_property + @abstractproperty + def samples(self): + pass @cached_property def sample_num(self): diff --git a/neutorch/dataset/ b/neutorch/dataset/ new file mode 100644 index 0000000..8073e67 --- /dev/null +++ b/neutorch/dataset/ @@ -0,0 +1,104 @@ +import os +from functools import cached_property + +from tqdm import tqdm + +from chunkflow.chunk import Chunk +from chunkflow.lib.cartesian_coordinate import Cartesian +from chunkflow.volume import Volume + +from neutorch.dataset.base import DatasetBase, path_to_dataset_name +from neutorch.dataset.ground_truth_sample import GroundTruthSample +from neutorch.dataset.transform import * + + +class SemanticDataset(DatasetBase): + def __init__(self, path_list: list, + sample_name_to_image_versions: dict, + patch_size: Cartesian = Cartesian(128, 128, 128)): + super().__init__(patch_size=patch_size) + + self.path_list = path_list + self.sample_name_to_image_versions = sample_name_to_image_versions + + self.vols = {} + for dataset_name, dir_list in sample_name_to_image_versions.items(): + vol_list = [] + for dir_path in dir_list: + vol = Volume.from_cloudvolume_path( + 'file://' + dir_path, + bounded = True, + fill_missing = False, + parallel = True, + green_threads = False, + ) + vol_list.append(vol) + self.vols[dataset_name] = vol_list + + self.compute_sample_weights() + self.setup_iteration_range() + + @cached_property + def samples(self): + samples = [] + for sem_path in tqdm(self.path_list): + assert os.path.exists(sem_path) + sem = Chunk.from_h5(sem_path) + + images = [] + dataset_name = path_to_dataset_name( + sem_path, + self.sample_name_to_image_versions.keys() + ) + for vol in self.vols[dataset_name]: + image = vol.cutout(sem.bbox) + images.append(image) + + target = (sem.array>0) + target = target.astype(np.float32) + sample = GroundTruthSample( + images, + target=target, + patch_size=self.patch_size_before_transform + ) + samples.append(sample) + + return samples + + def _prepare_transform(self): + self.transform = Compose([ + NormalizeTo01(probability=1.), + AdjustBrightness(), + AdjustContrast(), + Gamma(), + OneOf([ + Noise(), + GaussianBlur2D(), + ]), + BlackBox(), + Perspective2D(), + # RotateScale(probability=1.), + #DropSection(), + Flip(), + Transpose(), + MissAlignment(), + ]) + + +if __name__ == '__main__': + + from yacs.config import CfgNode + + cfg_file = '/mnt/home/jwu/wasp/jwu/15_rna_granule_net/11/config.yaml' + with open(cfg_file) as file: + cfg = CfgNode.load_cfg(file) + cfg.freeze() + + sd = SemanticDataset( + path_list=['/mnt/ceph/users/neuro/wasp_em/jwu/40_gt/13_wasp_sample3/vol_01700/rna_v1.h5'], + sample_name_to_image_versions=cfg.dataset.sample_name_to_image_versions, + patch_size=Cartesian(128, 128, 128), + ) + + # print(sd.samples) + diff --git a/neutorch/dataset/ b/neutorch/dataset/ index 81e138d..047f86e 100644 --- a/neutorch/dataset/ +++ b/neutorch/dataset/ @@ -1,11 +1,8 @@ -import os from time import time, sleep -from collections import OrderedDict from functools import cached_property from typing import Union, List import numpy as np -from scipy.stats import describe from chunkflow.lib.cartesian_coordinate import Cartesian, BoundingBox from chunkflow.lib.synapses import Synapses @@ -13,16 +10,9 @@ import torch -from neutorch.dataset.ground_truth_sample import PostSynapseGroundTruth -from neutorch.dataset.transform import * -from .base import DatasetBase -from .ground_truth_sample import GroundTruthSampleWithPointAnnotation - - -def syns_path_to_dataset_name(syns_path: str, dataset_names: list): - for dataset_name in dataset_names: - if dataset_name in syns_path: - return dataset_name +from .transform import * +from .base import DatasetBase, path_to_dataset_name +from .ground_truth_sample import GroundTruthSampleWithPointAnnotation, PostSynapseGroundTruth class SynapsesDatasetBase(DatasetBase): @@ -40,7 +30,7 @@ def __init__(self, vol = Volume.from_cloudvolume_path( 'file://' + dir_path, bounded = True, - fill_missing = True, + fill_missing = False, parallel=True, ) vol_list.append(vol) @@ -50,7 +40,7 @@ def __init__(self, def syns_path_to_images(self, syns_path: str, bbox: BoundingBox): images = [] - dataset_name = syns_path_to_dataset_name( + dataset_name = path_to_dataset_name( syns_path, self.sample_name_to_image_versions.keys() ) diff --git a/neutorch/cli/ b/neutorch/train/ similarity index 100% rename from neutorch/cli/ rename to neutorch/train/ diff --git a/neutorch/train/ b/neutorch/train/ new file mode 100644 index 0000000..c629816 --- /dev/null +++ b/neutorch/train/ @@ -0,0 +1,212 @@ +from abc import ABC, abstractproperty +from functools import cached_property +from glob import glob + +import random +import os +from time import time + +from yacs.config import CfgNode +import numpy as np + +from chunkflow.lib.cartesian_coordinate import Cartesian + +import torch +from torch.utils.tensorboard import SummaryWriter +from import DataLoader +from neutorch.dataset.patch import collate_batch + +from neutorch.model.IsoRSUNet import Model +from import save_chkpt, load_chkpt, log_tensor +from neutorch.loss import BinomialCrossEntropyWithLogits +from neutorch.dataset.base import worker_init_fn + + +class TrainerBase(ABC): + def __init__(self, cfg: CfgNode, + batch_size: int = 1) -> None: + if isinstance(cfg, str) and os.path.exists(cfg): + with open(cfg) as file: + cfg = CfgNode.load_cfg(file) + cfg.freeze() + + if cfg.system.seed is not None: + random.seed(cfg.system.seed) + + self.cfg = cfg + self.batch_size = batch_size + self.patch_size=Cartesian.from_collection(cfg.train.patch_size) + + self._split_path_list() + + @cached_property + def path_list(self): + glob_path = os.path.expanduser(self.cfg.dataset.glob_path) + path_list = glob(glob_path, recursive=True) + path_list = sorted(path_list) + print(f'path_list \n: {path_list}') + assert len(path_list) > 1 + assert len(path_list) % 2 == 0, \ + "the image and synapses should be paired." + return path_list + + def _split_path_list(self): + training_path_list = [] + validation_path_list = [] + for path in self.path_list: + assignment_flag = False + for validation_name in self.cfg.dataset.validation_names: + if validation_name in path: + validation_path_list.append(path) + assignment_flag = True + + for test_name in self.cfg.dataset.test_names: + if test_name in path: + assignment_flag = True + + if not assignment_flag: + training_path_list.append(path) + + print(f'split {len(self.path_list)} ground truth samples to {len(training_path_list)} training samples, {len(validation_path_list)} validation samples, and {len(self.path_list)-len(training_path_list)-len(validation_path_list)} test samples.') + self.training_path_list = training_path_list + self.validation_path_list = validation_path_list + + @cached_property + def model(self): + model = Model(self.cfg.model.in_channels, self.cfg.model.out_channels) + if torch.cuda.is_available(): + device = torch.device("cuda") + gpu_num = torch.cuda.device_count() + print("Let's use ", gpu_num, " GPUs!") + model = torch.nn.DataParallel( + model, + device_ids=list(range(gpu_num)), + dim=0, + ) + # we normally use one batch for each GPU + self.batch_size *= gpu_num + else: + device = torch.device("cpu") + + # note that we have to wrap the nn.DataParallel(model) before + # loading the model since the dictionary is changed after the wrapping + model = load_chkpt( + model, + self.cfg.train.output_dir, + self.cfg.train.iter_start) + print('send model to device: ', device) + model = + return model + + @cached_property + def optimizer(self): + return torch.optim.Adam( + self.model.parameters(), + lr=self.cfg.train.learning_rate + ) + + + @cached_property + def loss_module(self): + return BinomialCrossEntropyWithLogits() + + @cached_property + @abstractproperty + def training_dataset(self): + pass + + @cached_property + @abstractproperty + def validation_dataset(self): + pass + + @cached_property + def training_data_loader(self): + training_data_loader = DataLoader( + self.training_dataset, + #num_workers=self.cfg.system.cpus, + num_workers=1, + prefetch_factor=1, + drop_last=False, + multiprocessing_context='spawn', + collate_fn=collate_batch, + worker_init_fn=worker_init_fn, + batch_size=self.batch_size, + ) + return training_data_loader + + @cached_property + def validation_data_loader(self): + validation_data_loader = DataLoader( + self.validation_dataset, + num_workers=1, + prefetch_factor=2, + drop_last=False, + multiprocessing_context='spawn', + collate_fn=collate_batch, + batch_size=self.batch_size, + ) + return validation_data_loader + + @cached_property + def validation_data_iter(self): + validation_data_iter = iter(self.validation_data_loader) + return validation_data_iter + + @cached_property + def voxel_num(self): + return np.product(self.patch_size) * self.batch_size + + def __call__(self) -> None: + writer = SummaryWriter(log_dir=self.cfg.train.output_dir) + accumulated_loss = 0. + iter_idx = self.cfg.train.iter_start + for image, target in self.training_data_loader: + iter_idx += 1 + if iter_idx> self.cfg.train.iter_stop: + print('exceeds the maximum iteration: ', self.cfg.train.iter_stop) + return + + ping = time() + # print(f'preparing patch takes {round(time()-ping, 3)} seconds') + logits = self.model(image) + loss = self.loss_module(logits, target) + self.optimizer.zero_grad() + loss.backward() + self.optimizer.step() + accumulated_loss += loss.tolist() + print(f'iteration {iter_idx} takes {round(time()-ping, 3)} seconds.') + + if iter_idx % self.cfg.train.training_interval == 0 and iter_idx > 0: + per_voxel_loss = accumulated_loss / \ + self.cfg.train.training_interval / \ + self.voxel_num + + print(f'training loss {round(per_voxel_loss, 3)}') + accumulated_loss = 0. + predict = torch.sigmoid(logits) + writer.add_scalar('Loss/train', per_voxel_loss, iter_idx) + log_tensor(writer, 'train/image', image, iter_idx) + log_tensor(writer, 'train/prediction', predict, iter_idx) + log_tensor(writer, 'train/target', target, iter_idx) + + if iter_idx % self.cfg.train.validation_interval == 0 and iter_idx > 0: + fname = os.path.join(self.cfg.train.output_dir, f'model_{iter_idx}.chkpt') + print(f'save model to {fname}') + save_chkpt(self.model, self.cfg.train.output_dir, iter_idx, self.optimizer) + + print('evaluate prediction: ') + validation_image, validation_target = next(self.validation_data_iter) + + with torch.no_grad(): + validation_logits = self.model(validation_image) + validation_predict = torch.sigmoid(validation_logits) + validation_loss = self.loss_module(validation_logits, validation_target) + per_voxel_loss = validation_loss.tolist() / self.voxel_num + print(f'iter {iter_idx}: validation loss: {round(per_voxel_loss, 3)}') + writer.add_scalar('Loss/validation', per_voxel_loss, iter_idx) + log_tensor(writer, 'evaluate/image', validation_image, iter_idx) + log_tensor(writer, 'evaluate/prediction', validation_predict, iter_idx) + log_tensor(writer, 'evaluate/target', validation_target, iter_idx) + + writer.close() diff --git a/neutorch/cli/ b/neutorch/train/ similarity index 100% rename from neutorch/cli/ rename to neutorch/train/ diff --git a/neutorch/cli/ b/neutorch/train/ similarity index 98% rename from neutorch/cli/ rename to neutorch/train/ index b0e836e..a7a4195 100644 --- a/neutorch/cli/ +++ b/neutorch/train/ @@ -82,6 +82,7 @@ def main(config_file: str): else: device = torch.device("cpu") + # since we trained this model using DataParallel, we have to wrap it with DataParallel as well in the inference stage. # note that we have to wrap the nn.DataParallel(model) before # loading the model since the dictionary is changed after the wrapping model = load_chkpt(model, cfg.train.output_dir, cfg.train.iter_start) diff --git a/neutorch/train/ b/neutorch/train/ new file mode 100644 index 0000000..825f0cc --- /dev/null +++ b/neutorch/train/ @@ -0,0 +1,39 @@ +from functools import cached_property + +import click +from yacs.config import CfgNode + +from .base import TrainerBase +from neutorch.dataset.semantic import SemanticDataset + + +class SemanticTrainer(TrainerBase): + def __init__(self, cfg: CfgNode, batch_size: int = 1) -> None: + super().__init__(cfg, batch_size) + + @cached_property + def training_dataset(self): + return SemanticDataset( + self.training_path_list, + self.cfg.dataset.sample_name_to_image_versions, + patch_size=self.patch_size, + ) + + @cached_property + def validation_dataset(self): + return SemanticDataset( + self.validation_path_list, + self.cfg.dataset.sample_name_to_image_versions, + patch_size=self.patch_size, + ) + + +@click.command() +@click.option('--config-file', '-c', + type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True, resolve_path=True), + default='./config.yaml', + help = 'configuration file containing all the parameters.' +) +def main(config_file: str): + trainer = SemanticTrainer(config_file) + trainer() \ No newline at end of file diff --git a/neutorch/cli/ b/neutorch/train/ similarity index 100% rename from neutorch/cli/ rename to neutorch/train/ diff --git a/ b/ index d556900..e06bedc 100755 --- a/ +++ b/ @@ -12,9 +12,10 @@ packages=find_packages(exclude=['bin']), entry_points=''' [console_scripts] - neutrain-pre=neutorch.cli.train_pre_synapses:main - neutrain-denoise=neutorch.cli.train_denoise:main - neutrain-post=neutorch.cli.train_post_synapses:main + neutrain-sem=neutorch.train.semantic:main + neutrain-pre=neutorch.train.pre_synapses:main + neutrain-denoise=neutorch.train.denoise:main + neutrain-post=neutorch.train.post_synapses:main ''', classifiers=[ 'Development Status :: 4 - Beta',