diff --git a/scripts/preprocess.py b/scripts/preprocess.py
index ddf0664..3092767 100644
--- a/scripts/preprocess.py
+++ b/scripts/preprocess.py
@@ -395,10 +395,16 @@ def parse_coco_json(
             img.save(os.path.join(path, img_info["file_name"]))
         elif "detection" in path_splits:
             # Normalize bbox (x_center, y_center, width, height)
-            x = (ann["bbox"][0] + ann["bbox"][2] / 2) / img_info["width"]
-            y = (ann["bbox"][1] + ann["bbox"][3] / 2) / img_info["height"]
-            w = ann["bbox"][2] / img_info["width"]
-            h = ann["bbox"][3] / img_info["height"]
+            # x = (ann["bbox"][0] + ann["bbox"][2] / 2) / img_info["width"]
+            # y = (ann["bbox"][1] + ann["bbox"][3] / 2) / img_info["height"]
+            # w = ann["bbox"][2] / img_info["width"]
+            # h = ann["bbox"][3] / img_info["height"]
+
+            # Convert to pascal_voc format (with resized bbox)
+            x1 = int(ann["bbox"][0] * size[0] / img_info["width"])
+            y1 = int(ann["bbox"][1] * size[1] / img_info["height"])
+            x2 = x1 + int(ann["bbox"][2] * size[0] / img_info["width"])
+            y2 = y1 + int(ann["bbox"][3] * size[1] / img_info["height"])
 
             # Copy the image and create .txt annotation filename
             img.save(os.path.join(path, "images", img_info["file_name"]))
@@ -406,7 +412,7 @@
 
             with open(os.path.join(path, "annotations", txt), "w") as f:
                 # Write the bounding box
-                f.write(f"{x} {y} {w} {h}")
+                f.write(f"{x1} {y1} {x2} {y2}")
         elif "segmentation" in path_splits:
             # Update the mask for the current class
             mask = masks[class_name]
diff --git a/scripts/run.py b/scripts/run.py
index f6deb97..b96280f 100644
--- a/scripts/run.py
+++ b/scripts/run.py
@@ -17,8 +17,12 @@
 torch.set_float32_matmul_precision("medium")
 
 from glasses_detector import GlassesClassifier, GlassesDetector, GlassesSegmenter
-from glasses_detector._data import ImageClassificationDataset, ImageSegmentationDataset
-from glasses_detector._wrappers import BinaryClassifier, BinarySegmenter
+from glasses_detector._data import (
+    ImageClassificationDataset,
+    ImageDetectionDataset,
+    ImageSegmentationDataset,
+)
+from glasses_detector._wrappers import BinaryClassifier, BinaryDetector, BinarySegmenter
 
 
 class RunCLI(LightningCLI):
@@ -152,7 +156,7 @@ def create_wrapper_callback(
     # Get model and dataset classes
     model_cls, data_cls = {
         "classification": (GlassesClassifier, ImageClassificationDataset),
-        "detection": (GlassesDetector, None),
+        "detection": (GlassesDetector, ImageDetectionDataset),
         "segmentation": (GlassesSegmenter, ImageSegmentationDataset),
     }[task]
 
@@ -168,10 +172,8 @@
         kwargs["label_type"] = {kind: 1, "no_" + kind: 0}
         wrapper_cls = BinaryClassifier
     elif task == "detection":
-        raise NotImplementedError("Detection is not implemented yet!")
+        wrapper_cls = BinaryDetector
     elif task == "segmentation":
-        kwargs["img_dirname"] = "images"
-        kwargs["name_map_fn"] = {"masks": lambda x: f"{int(x[:5])}.jpg"}
         wrapper_cls = BinarySegmenter
 
     # Initialize model architecture and load weights if needed
diff --git a/src/glasses_detector/_data/__init__.py b/src/glasses_detector/_data/__init__.py
index 0adcd70..4e14f42 100644
--- a/src/glasses_detector/_data/__init__.py
+++ b/src/glasses_detector/_data/__init__.py
@@ -1,3 +1,4 @@
-from .mixins import ImageLoaderMixin, DataLoaderMixin
 from .classification_dataset import ImageClassificationDataset
+from .detection_dataset import ImageDetectionDataset
+from .mixins import DataLoaderMixin, ImageLoaderMixin
 from .segmentation_dataset import ImageSegmentationDataset
diff --git a/src/glasses_detector/_data/detection_dataset.py b/src/glasses_detector/_data/detection_dataset.py
new file mode 100644
index 0000000..c70192b
--- /dev/null
+++ b/src/glasses_detector/_data/detection_dataset.py
@@ -0,0 +1,107 @@
+import os
+import random
+from collections import defaultdict
+from typing import Callable
+
+import albumentations as A
+import torch
+from torch.utils.data import Dataset
+
+from .mixins import DataLoaderMixin, ImageLoaderMixin
+
+
+class ImageDetectionDataset(Dataset, ImageLoaderMixin, DataLoaderMixin):
+    # It's more efficient to implement a specific dataset for each task
+    # And it is very unlikely that multiple tasks will be considered at
+    # once, meaning a generic dataset is not needed
+    def __init__(
+        self,
+        root: str = ".",
+        split_type: str = "train",
+        img_folder: str = "images",
+        ann2img_fn: dict[str, Callable[[str], str]] = {},
+        # For each annotation folder name, a function that maps an annotation file name to the image file name it belongs to
+        seed: int = 0,
+    ):
+        super().__init__()
+
+        self.data = []
+        cat2paths = defaultdict(lambda: {"names": [], "paths": []})
+
+        for dataset in os.listdir(root):
+            if not os.path.isdir(p := os.path.join(root, dataset, split_type)):
+                continue
+
+            for cat in os.scandir(p):
+                # Read the list of names and paths to images/masks
+                name_fn = ann2img_fn.get(cat.name, lambda x: x.replace(".txt", ".jpg"))
+                names = list(map(name_fn, os.listdir(cat.path)))
+                paths = [f.path for f in os.scandir(cat.path)]
+
+                # Extend the lists of image/annot names + paths
+                cat2paths[cat.name]["names"].extend(names)
+                cat2paths[cat.name]["paths"].extend(paths)
+
+        # Pop the non-category folder (get image names and paths)
+        img_names, img_paths = cat2paths.pop(img_folder).values()
+
+        for img_name, img_path in zip(img_names, img_paths):
+            # Add the default image entry
+            self.data.append({"image": img_path})
+
+            for cat_dirname, names_and_paths in cat2paths.items():
+                if img_name in names_and_paths["names"]:
+                    # Get the index of corresponding annotation
+                    i = names_and_paths["names"].index(img_name)
+                    annotation_path = names_and_paths["paths"][i]
+                    self.data[-1][cat_dirname] = annotation_path
+                else:
+                    # No annotation but add for equally sized batches
+                    self.data[-1][cat_dirname] = None
+
+        # Sort & shuffle
+        self.data.sort(key=lambda x: x["image"])
+        random.seed(seed)
+        random.shuffle(self.data)
+
+        # Create image augmentation pipeline based on split type
+        p = A.BboxParams(format="pascal_voc", label_fields=["classes"])
+        self.transform = self.create_transform(split_type == "train", bbox_params=p)
+
+    @property
+    def name2idx(self):
+        return dict(zip(self.data[0].keys(), range(len(self.data[0]))))
+
+    @property
+    def idx2name(self):
+        return dict(zip(range(len(self.data[0])), self.data[0].keys()))
+
+    def __getitem__(self, index):
+        # Load the image, bboxes and classes
+        image = self.data[index]["image"]
+        bboxes = list(self.data[index].values())[1:]
+        labels = [1] * len(bboxes)
+        # labels = [self.cat2label(k) for k in list(self.data[index].keys())[1:]]
+
+        (image, bboxes, labels) = self.load_image(
+            image=image,
+            bboxes=bboxes,
+            classes=labels,
+            transform=self.transform,
+        )
+
+        # TODO: create cat2label map and map class names to labels
+        # TODO: there may be more bboxes read than classes after loading
+        # the transformed image so consider adding either a max_bbox
+        # argument or implement a custom collate function for dataloader
+
+        if len(bboxes) == 0:
+            bboxes = torch.tensor([[0, 0, 1, 1]], dtype=torch.float32)
+            labels = torch.tensor([0], dtype=torch.int64)
+
+        annotations = {"boxes": bboxes, "labels": labels}
{"boxes": bboxes, "labels": labels} + + return image, annotations + + def __len__(self): + return len(self.data) diff --git a/src/glasses_detector/_data/mixins.py b/src/glasses_detector/_data/mixins.py index f0f5171..45dc324 100644 --- a/src/glasses_detector/_data/mixins.py +++ b/src/glasses_detector/_data/mixins.py @@ -1,6 +1,9 @@ +from typing import Any + import albumentations as A import numpy import PIL.Image as Image +import skimage.transform as st import torch from albumentations.pytorch import ToTensorV2 from torch.utils.data import DataLoader @@ -8,7 +11,7 @@ class ImageLoaderMixin: @staticmethod - def create_transform(is_train: bool = False) -> A.Compose: + def create_transform(is_train: bool = False, **kwargs) -> A.Compose: # Default augmentation transform = [ A.VerticalFlip(), @@ -18,10 +21,10 @@ def create_transform(is_train: bool = False) -> A.Compose: A.OneOf( [ A.RandomResizedCrop(256, 256, p=0.5), - A.GridDistortion(), A.OpticalDistortion(distort_limit=0.1, shift_limit=0.1), A.PiecewiseAffine(), A.Perspective(), + A.GridDistortion(), ] ), A.OneOf( @@ -45,21 +48,26 @@ def create_transform(is_train: bool = False) -> A.Compose: A.GaussNoise(), ] ), - A.CoarseDropout(max_holes=5, p=0.3), A.Normalize(), ToTensorV2(), ] + if "bbox_params" not in kwargs: + transform.insert(-2, A.CoarseDropout(max_holes=5, p=0.3)) + if not is_train: # Only keep the last two transform = transform[-2:] - return A.Compose(transform) + return A.Compose(transform, **kwargs) @staticmethod def load_image( image: str | Image.Image | numpy.ndarray, masks: list[str | Image.Image | numpy.ndarray] = [], + bboxes: list[str | list[int | float | str]] = [], # x_min, y_min, x_max, y_max + classes: list[Any] = [], # one for each bbox + resize: tuple[int, int] | None = None, transform: A.Compose | bool = False, ) -> torch.Tensor: def open_image_file(image_file, is_mask=False): @@ -81,24 +89,109 @@ def open_image_file(image_file, is_mask=False): # Image is not a mask, so convert it to RGB image_file = numpy.stack([image_file] * 3, axis=-1) + if resize is not None: + # Resize image to new (w, h) + size = resize[1], resize[0] + image_file = st.resize(image_file, size) + return image_file + def open_bbox_files(bbox_files, classes): + # Init new + _bboxes, _classes = [], [] + + for i, bbox_file in enumerate(bbox_files): + if isinstance(bbox_file, str): + with open(bbox_file, "r") as f: + # Each line is bbox: "x_min y_min x_max y_max" + batch = [xyxy.strip().split() for xyxy in f.readlines()] + else: + # bbox_file is a single bbox (list[str | int | float]) + batch = [bbox_file] + + batch = [list(map(float, xyxy)) for xyxy in batch] + + for i, xyxy in enumerate(batch): + if xyxy[2] <= xyxy[0]: + batch[i][0] = min(xyxy[0], image.shape[1] - 1) + batch[i][2] = batch[i][0] + 1 + + if xyxy[3] <= xyxy[1]: + batch[i][1] = min(xyxy[1], image.shape[0] - 1) + batch[i][3] = batch[i][1] + 1 + + if resize is not None: + # Get old and new width, height + old_h, old_w = image.shape[:2] + new_w, new_h = resize + + # Convert bboxes to new (w, h) + batch = [ + [ + xyxy[0] * new_w / old_w, + xyxy[1] * new_h / old_h, + xyxy[2] * new_w / old_w, + xyxy[3] * new_h / old_h, + ] + for xyxy in batch + ] + + # Add to list + _bboxes.extend(batch) + + if classes != []: + # If classes are provided, add them + _classes.extend([classes[i]] * len(batch)) + + return _bboxes, _classes + + kwargs = {} + if isinstance(transform, bool): + if bboxes != []: + kwargs.update( + { + "bbox_params": A.BboxParams( + format="pascal_voc", + label_fields=["classes"] if 
diff --git a/src/glasses_detector/_data/mixins.py b/src/glasses_detector/_data/mixins.py
index f0f5171..45dc324 100644
--- a/src/glasses_detector/_data/mixins.py
+++ b/src/glasses_detector/_data/mixins.py
@@ -1,6 +1,9 @@
+from typing import Any
+
 import albumentations as A
 import numpy
 import PIL.Image as Image
+import skimage.transform as st
 import torch
 from albumentations.pytorch import ToTensorV2
 from torch.utils.data import DataLoader
@@ -8,7 +11,7 @@
 
 class ImageLoaderMixin:
     @staticmethod
-    def create_transform(is_train: bool = False) -> A.Compose:
+    def create_transform(is_train: bool = False, **kwargs) -> A.Compose:
         # Default augmentation
         transform = [
             A.VerticalFlip(),
@@ -18,10 +21,10 @@ def create_transform(is_train: bool = False) -> A.Compose:
             A.OneOf(
                 [
                     A.RandomResizedCrop(256, 256, p=0.5),
-                    A.GridDistortion(),
                     A.OpticalDistortion(distort_limit=0.1, shift_limit=0.1),
                     A.PiecewiseAffine(),
                     A.Perspective(),
+                    A.GridDistortion(),
                 ]
             ),
             A.OneOf(
                 [
@@ -45,21 +48,26 @@ def create_transform(is_train: bool = False) -> A.Compose:
                     A.GaussNoise(),
                 ]
             ),
-            A.CoarseDropout(max_holes=5, p=0.3),
             A.Normalize(),
             ToTensorV2(),
         ]
 
+        if "bbox_params" not in kwargs:
+            transform.insert(-2, A.CoarseDropout(max_holes=5, p=0.3))
+
         if not is_train:
             # Only keep the last two
             transform = transform[-2:]
 
-        return A.Compose(transform)
+        return A.Compose(transform, **kwargs)
 
     @staticmethod
     def load_image(
         image: str | Image.Image | numpy.ndarray,
         masks: list[str | Image.Image | numpy.ndarray] = [],
+        bboxes: list[str | list[int | float | str]] = [],  # x_min, y_min, x_max, y_max
+        classes: list[Any] = [],  # one for each bbox
+        resize: tuple[int, int] | None = None,
         transform: A.Compose | bool = False,
     ) -> torch.Tensor:
         def open_image_file(image_file, is_mask=False):
@@ -81,24 +89,109 @@ def open_image_file(image_file, is_mask=False):
                 # Image is not a mask, so convert it to RGB
                 image_file = numpy.stack([image_file] * 3, axis=-1)
 
+            if resize is not None:
+                # Resize image to new (w, h)
+                size = resize[1], resize[0]
+                image_file = st.resize(image_file, size)
+
             return image_file
 
+        def open_bbox_files(bbox_files, classes):
+            # Init new
+            _bboxes, _classes = [], []
+
+            for i, bbox_file in enumerate(bbox_files):
+                if isinstance(bbox_file, str):
+                    with open(bbox_file, "r") as f:
+                        # Each line is bbox: "x_min y_min x_max y_max"
+                        batch = [xyxy.strip().split() for xyxy in f.readlines()]
+                else:
+                    # bbox_file is a single bbox (list[str | int | float])
+                    batch = [bbox_file]
+
+                batch = [list(map(float, xyxy)) for xyxy in batch]
+
+                for j, xyxy in enumerate(batch):
+                    if xyxy[2] <= xyxy[0]:
+                        batch[j][0] = min(xyxy[0], image.shape[1] - 1)
+                        batch[j][2] = batch[j][0] + 1
+
+                    if xyxy[3] <= xyxy[1]:
+                        batch[j][1] = min(xyxy[1], image.shape[0] - 1)
+                        batch[j][3] = batch[j][1] + 1
+
+                if resize is not None:
+                    # Get old and new width, height
+                    old_h, old_w = image.shape[:2]
+                    new_w, new_h = resize
+
+                    # Convert bboxes to new (w, h)
+                    batch = [
+                        [
+                            xyxy[0] * new_w / old_w,
+                            xyxy[1] * new_h / old_h,
+                            xyxy[2] * new_w / old_w,
+                            xyxy[3] * new_h / old_h,
+                        ]
+                        for xyxy in batch
+                    ]
+
+                # Add to list
+                _bboxes.extend(batch)
+
+                if classes != []:
+                    # If classes are provided, add them (one per bbox file)
+                    _classes.extend([classes[i]] * len(batch))
+
+            return _bboxes, _classes
+
+        kwargs = {}
+        if isinstance(transform, bool):
+            if bboxes != []:
+                kwargs.update(
+                    {
+                        "bbox_params": A.BboxParams(
+                            format="pascal_voc",
+                            label_fields=["classes"] if classes != [] else None,
+                        )
+                    }
+                )
+
         # Load transform (train/test is based on bool)
-        transform = ImageLoaderMixin.create_transform(transform)
+        transform = ImageLoaderMixin.create_transform(transform, **kwargs)
 
-        # Load image and mask files
+        # Load image, mask, bbox files
         image = open_image_file(image)
         masks = [open_image_file(m, True) for m in masks]
+        bboxes, classes = open_bbox_files(bboxes, classes)
+
+        # Create transform kwargs
+        kwargs["image"] = image
+        kwargs.update({"masks": masks} if masks != [] else {})
+        kwargs.update({"bboxes": bboxes} if bboxes != [] else {})
+        kwargs.update({"classes": classes} if classes != [] else {})
+
+        # Transform everything, init returns
+        transformed = transform(**kwargs)
+        return_list = [transformed["image"]]
+
+        if masks != []:
+            # TODO: check if transformation is converted to a tensor
+            return_list.append(transformed["masks"])
+
+        if bboxes != []:
+            bboxes = torch.tensor(transformed["bboxes"], dtype=torch.float32)
+            return_list.append(bboxes)
 
-        if masks == []:
-            return transform(image=image)["image"]
+        if classes != []:
+            classes = torch.tensor(transformed["classes"], dtype=torch.int64)
+            return_list.append(classes)
 
-        # Transform the image and masks
-        transformed = transform(image=image, masks=masks)
-        image, masks = transformed["image"], transformed["masks"]
+        if len(return_list) == 1:
+            return return_list[0]
 
-        return image, masks
+        return tuple(return_list)
 
 
 class DataLoaderMixin:
diff --git a/src/glasses_detector/_wrappers/__init__.py b/src/glasses_detector/_wrappers/__init__.py
index 1710c0f..f4762ff 100644
--- a/src/glasses_detector/_wrappers/__init__.py
+++ b/src/glasses_detector/_wrappers/__init__.py
@@ -1,2 +1,3 @@
 from .binary_classifier import BinaryClassifier
-from .binary_segmenter import BinarySegmenter
\ No newline at end of file
+from .binary_detector import BinaryDetector
+from .binary_segmenter import BinarySegmenter
diff --git a/src/glasses_detector/_wrappers/binary_detector.py b/src/glasses_detector/_wrappers/binary_detector.py
index e69de29..e50d075 100644
--- a/src/glasses_detector/_wrappers/binary_detector.py
+++ b/src/glasses_detector/_wrappers/binary_detector.py
@@ -0,0 +1,104 @@
+import pytorch_lightning as pl
+import torch
+import torchmetrics
+from torch.optim import AdamW
+from torch.optim.lr_scheduler import ReduceLROnPlateau
+from torchvision.ops import box_iou
+
+
+class BinaryDetector(pl.LightningModule):
+    def __init__(self, model, train_loader=None, val_loader=None, test_loader=None):
+        super().__init__()
+
+        # Assign attributes
+        self.model = model
+        self.train_loader = train_loader
+        self.val_loader = val_loader
+        self.test_loader = test_loader
+
+        # Initialize some metrics to monitor the performance
+        self.label_metric = torchmetrics.F1Score(task="binary")
+        self.boxes_metric = torchmetrics.R2Score(num_outputs=4)
+
+    def forward(self, *args):
+        return self.model(*args)
+
+    def training_step(self, batch, batch_idx):
+        # Forward propagate and compute loss
+        # imgs = [img for img in batch[0]]
+        # annotations = [{k: v for k, v in t.items()} for t in batch[1]]
+        imgs = [*batch[0]]
+        annotations = [
+            {"boxes": b, "labels": l}
+            for b, l in zip(batch[1]["boxes"], batch[1]["labels"])
+        ]
+        loss_dict = self(imgs, annotations)
+        loss = sum(loss for loss in loss_dict.values())
+        self.log("train_loss", loss, prog_bar=True)
+        return loss
+
+    def eval_step(self, batch, prefix=""):
+        # Forward pass and compute loss
+        # imgs = [img for img in batch[0]]
+        # annotations = [{k: v for k, v in t.items()} for t in batch[1]]
+
+        imgs = [*batch[0]]
+        annotations = [
+            {"boxes": b, "labels": l}
+            for b, l in zip(batch[1]["boxes"], batch[1]["labels"])
+        ]
+
+        with torch.inference_mode():
+            self.train()
+            loss_dict = self(imgs, annotations)
+            self.eval()
+
+        loss = sum(loss for loss in loss_dict.values())
+
+        # Get actual labels and predictions
+        y_labels = torch.stack([ann["labels"] for ann in annotations])
+        y_boxes = torch.stack([ann["boxes"] for ann in annotations])
+        y_hat = self(imgs)
+        y_hat_labels = torch.stack([pred["labels"] for pred in y_hat])
+        y_hat_boxes = torch.stack([pred["boxes"] for pred in y_hat])
+
+        # Compute metrics
+        f1_score = self.label_metric(y_hat_labels, y_labels)
+        r2_score = self.boxes_metric(y_hat_boxes.squeeze(), y_boxes.squeeze())
+        ious = [
+            box_iou(pred_box, target_box)
+            for pred_box, target_box in zip(y_hat_boxes, y_boxes)
+        ]
+        mean_iou = torch.stack([iou.mean() for iou in ious]).mean()
+
+        # Log the loss and the metrics
+        self.log(f"{prefix}_loss", loss, prog_bar=True)
+        self.log(f"{prefix}_f1", f1_score, prog_bar=True)
+        self.log(f"{prefix}_r2", r2_score, prog_bar=True)
+        self.log(f"{prefix}_iou", mean_iou, prog_bar=True)
+
+    def validation_step(self, batch, batch_idx):
+        self.eval_step(batch, prefix="val")
+
+    def test_step(self, batch, batch_idx):
+        self.eval_step(batch, prefix="test")
+
+    def train_dataloader(self):
+        return self.train_loader
+
+    def val_dataloader(self):
+        return self.val_loader
+
+    def test_dataloader(self):
+        return self.test_loader
+
+    def configure_optimizers(self):
+        optimizer = AdamW(self.parameters(), lr=1e-3, weight_decay=1e-2)
+        # scheduler = CosineAnnealingWarmRestarts(optimizer, 10, 2, 1e-6)
+        scheduler = ReduceLROnPlateau(optimizer, factor=0.1, patience=10)
+
+        return {
+            "optimizer": optimizer,
+            "lr_scheduler": scheduler,
+            "monitor": "val_loss",
+        }
diff --git a/src/glasses_detector/detector.py b/src/glasses_detector/detector.py
index fb77963..69442f7 100644
--- a/src/glasses_detector/detector.py
+++ b/src/glasses_detector/detector.py
@@ -6,6 +6,8 @@
     fasterrcnn_resnet50_fpn_v2,
     ssdlite320_mobilenet_v3_large,
 )
+from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
+from torchvision.models.detection.ssd import SSDHead
 
 from .components.base_model import BaseGlassesModel
 from .models import TinyBinaryDetector
@@ -37,11 +39,17 @@ def create_model(model_name: str) -> nn.Module:
            case "tinydetnet_v1":
                m = TinyBinaryDetector()
            case "ssdlite320_mobilenet_v3_large":
-                m = ssdlite320_mobilenet_v3_large()
-                # m.classifier = SSDHead(40, 960, 1, 128)
+                m = ssdlite320_mobilenet_v3_large(
+                    num_classes=2,
+                    detections_per_img=1,
+                    topk_candidates=10,
+                )
+                # num_in = m.backbone.out_channels
+                # m.head = SSDHead(num_in, m.head.num_anchors, 2)
            case "fasterrcnn_resnet50_fpn_v2":
-                m = fasterrcnn_resnet50_fpn_v2()
-                # m.roi_heads.box_predictor = FastRCNNPredictor(512, 4)
+                m = fasterrcnn_resnet50_fpn_v2(num_classes=2)
+                # num_in = m.roi_heads.box_predictor.cls_score.in_features
+                # m.roi_heads.box_predictor = FastRCNNPredictor(num_in, 2)
            case _:
                raise ValueError(f"{model_name} is not a valid choice!")
 
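The commented-out lines in create_model above point at an alternative to rebuilding the detectors from scratch with num_classes=2: keep the COCO-pretrained weights and only swap the prediction head. A minimal sketch of that variant for the Faster R-CNN case, using standard torchvision calls (the weights argument is illustrative; the patch itself builds the model without pretrained weights):

import torch.nn as nn
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def create_pretrained_frcnn() -> nn.Module:
    # Start from the COCO-pretrained detector, then replace its box predictor
    # so it outputs 2 classes (torchvision reserves class 0 for background)
    m = fasterrcnn_resnet50_fpn_v2(weights="DEFAULT")
    num_in = m.roi_heads.box_predictor.cls_score.in_features
    m.roi_heads.box_predictor = FastRCNNPredictor(num_in, 2)
    return m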
diff --git a/src/glasses_detector/models/tiny_binary_detector.py b/src/glasses_detector/models/tiny_binary_detector.py
index 96c568d..bf07d86 100644
--- a/src/glasses_detector/models/tiny_binary_detector.py
+++ b/src/glasses_detector/models/tiny_binary_detector.py
@@ -37,21 +37,65 @@ def _create_block(self, num_in, num_out, filter_size):
             nn.MaxPool2d(2, 2),
         )
 
-    def forward(self, x: torch.Tensor) -> torch.Tensor:
-        """Performs forward pass.
+    def forward(
+        self,
+        imgs: list[torch.Tensor],
+        targets: list[dict[str, torch.Tensor]] | None = None,
+    ) -> dict[str, torch.Tensor] | list[dict[str, torch.Tensor]]:
+        """Forward pass through the network.
 
-        Predicts the bounding box for the given batch of inputs.
+        Args:
+            imgs (list[torch.Tensor]): A list of images.
+            targets (list[dict[str, torch.Tensor]], optional): A
+                list of annotations for each image. Each annotation is a
+                dictionary that contains the bounding boxes and labels
+                for all objects in the image. If ``None``, the network
+                is in inference mode.
+
+        Returns:
+            dict[str, torch.Tensor] | list[dict[str, torch.Tensor]]:
+                During training, returns a dictionary containing the
+                classification and regression losses for each image in the
+                batch. During inference, returns a list of dictionaries, one
+                for each input image. Each dictionary contains the predicted
+                boxes, labels, and scores for all detections in the image.
+        """
+        preds = [self.fc(self.features(img)) for img in imgs]
+
+        if targets is not None:
+            return self.compute_loss(preds, targets)
+        else:
+            return [
+                {
+                    "boxes": pred,
+                    "labels": torch.ones(pred.size(0), dtype=torch.int64),
+                    "scores": torch.ones(pred.size(0)),
+                }
+                for pred in preds
+            ]
+
+    def compute_loss(
+        self,
+        preds: list[torch.Tensor],
+        targets: list[dict[str, torch.Tensor]],
+    ) -> dict[str, torch.Tensor]:
+        """Compute the loss for the predicted bounding boxes.
 
         Args:
-            x (torch.Tensor): Image batch of shape (N, C, H, W). Note
-                that pixel values are normalized and squeezed between
-                0 and 1.
+            preds (list[torch.Tensor]): A list of predicted bounding
+                boxes for each image.
+            targets (list[dict[str, torch.Tensor]]): A list of targets
+                for each image.
 
         Returns:
-            torch.Tensor: An output tensor of shape (N, 4) indicating
-                the bounding box coordinates for each nth image. The
-                coordinates are in the format (x_min, y_min, x_max, y_max),
-                where (x_min, y_min) is the top-left corner of the bounding
-                box and (x_max, y_max) is the bottom-right corner.
+            dict[str, torch.Tensor]: A dictionary of losses for each
+                image in the batch.
         """
-        return self.fc(self.features(x))
+        criterion = nn.MSELoss()
+        loss_dict = {}
+
+        for i, pred in enumerate(preds):
+            loss = criterion(pred, targets[i]["boxes"])
+            loss_dict[i] = loss
+
+        return loss_dict
diff --git a/test.py b/test.py
deleted file mode 100644
index 86899d4..0000000
--- a/test.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from src.glasses_detector import GlassesDetector
-
-if __name__ == "__main__":
-    detector = GlassesDetector(size="medium", pretrained=False)
-    print(detector.model)
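For reference, the pieces added in this patch can be wired together by hand roughly as follows. This is a sketch rather than code from the patch: the data root and batch size are made up, scripts/run.py builds the equivalent objects through its LightningCLI instead, and the default collate_fn only works while every sample carries the same number of boxes (one here), as the TODO in ImageDetectionDataset notes.

import pytorch_lightning as pl
from torch.utils.data import DataLoader

from glasses_detector import GlassesDetector
from glasses_detector._data import ImageDetectionDataset
from glasses_detector._wrappers import BinaryDetector

# Datasets laid out as <root>/<dataset>/<split>/{images,annotations}
train_set = ImageDetectionDataset(root="data/detection", split_type="train")
val_set = ImageDetectionDataset(root="data/detection", split_type="val")

# Default collation stacks the per-sample {"boxes", "labels"} dicts, which
# assumes a fixed (single) box per image
train_loader = DataLoader(train_set, batch_size=8, shuffle=True)
val_loader = DataLoader(val_set, batch_size=8)

# Wrap the underlying torch module (cf. detector.model in the deleted test.py)
detector = GlassesDetector(size="medium", pretrained=False)
wrapper = BinaryDetector(detector.model, train_loader=train_loader, val_loader=val_loader)

pl.Trainer(max_epochs=1).fit(wrapper)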