diff --git a/src/labelformat/cli/registry.py b/src/labelformat/cli/registry.py index 0346831..5e32962 100644 --- a/src/labelformat/cli/registry.py +++ b/src/labelformat/cli/registry.py @@ -10,11 +10,16 @@ ObjectDetectionInput, ObjectDetectionOutput, ) +from labelformat.model.video_instance_segmentation import ( + VideoInstanceSegmentationInput, + VideoInstanceSegmentationOutput, +) class Task(Enum): INSTANCE_SEGMENTATION = "instance-segmentation" OBJECT_DETECTION = "object-detection" + VIDEO_INSTANCE_SEGMENTATION = "video-instance-segmentation" @dataclass @@ -30,12 +35,16 @@ class Registry: def cli_register(format: str, task: Task) -> Callable[[Type], Type]: # type: ignore[type-arg] def decorator(cls: Type) -> Type: # type: ignore[type-arg] - if issubclass(cls, ObjectDetectionInput) or issubclass( - cls, InstanceSegmentationInput + if ( + issubclass(cls, ObjectDetectionInput) + or issubclass(cls, InstanceSegmentationInput) + or issubclass(cls, VideoInstanceSegmentationInput) ): _REGISTRY.input[task][format] = cls - elif issubclass(cls, ObjectDetectionOutput) or issubclass( - cls, InstanceSegmentationOutput + elif ( + issubclass(cls, ObjectDetectionOutput) + or issubclass(cls, InstanceSegmentationOutput) + or issubclass(cls, VideoInstanceSegmentationOutput) ): _REGISTRY.output[task][format] = cls else: @@ -44,7 +53,9 @@ def decorator(cls: Type) -> Type: # type: ignore[type-arg] f"'{ObjectDetectionInput}', " f"'{InstanceSegmentationInput}', " f"'{ObjectDetectionOutput}', " - f"'{InstanceSegmentationOutput}'" + f"'{InstanceSegmentationOutput}', " + f"'{VideoInstanceSegmentationInput}', " + f"'{VideoInstanceSegmentationOutput}'" ) return cls diff --git a/src/labelformat/formats/__init__.py b/src/labelformat/formats/__init__.py index f904994..4fef2e5 100644 --- a/src/labelformat/formats/__init__.py +++ b/src/labelformat/formats/__init__.py @@ -47,3 +47,4 @@ YOLOv11ObjectDetectionInput, YOLOv11ObjectDetectionOutput, ) +from labelformat.formats.youtube_vis import 
"""Input and output classes for YouTube-VIS style video instance segmentation JSON."""
import json
from argparse import ArgumentParser
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

import cv2
import numpy as np
import pycocotools.mask as mask_utils

from labelformat.cli.registry import Task, cli_register
from labelformat.model.category import Category
from labelformat.model.multipolygon import MultiPolygon
from labelformat.model.video import Video
from labelformat.model.video_instance_segmentation import (
    SingleVideoInstanceSegmentation,
    VideoInstanceSegmentation,
    VideoInstanceSegmentationInput,
    VideoInstanceSegmentationOutput,
)
from labelformat.types import JsonDict, ParseError

# A polygon as a list of (x, y) points.
_Polygon = List[Tuple[float, float]]


@cli_register(format="youtube_vis", task=Task.VIDEO_INSTANCE_SEGMENTATION)
class YouTubeVISInput(VideoInstanceSegmentationInput):
    """Read categories, videos and per-video labels from a YouTube-VIS JSON file."""

    @staticmethod
    def add_cli_arguments(parser: ArgumentParser) -> None:
        """Register the ``--input-file`` CLI argument."""
        parser.add_argument(
            "--input-file",
            type=Path,
            required=True,
            help="Path to input YouTube-VIS JSON file",
        )

    def __init__(self, input_file: Path) -> None:
        """Load the whole annotation file into memory."""
        with input_file.open() as file:
            self._data = json.load(file)

    def get_categories(self) -> Iterable[Category]:
        """Yield one Category per entry of the "categories" list."""
        for category in self._data["categories"]:
            yield Category(
                id=category["id"],
                name=category["name"],
            )

    def get_videos(self) -> Iterable[Video]:
        """Yield one Video per entry of the "videos" list."""
        for video in self._data["videos"]:
            yield Video(
                id=video["id"],
                filenames=video["file_names"],
                width=int(video["width"]),
                height=int(video["height"]),
                length=int(video["length"]),
            )

    def get_labels(self) -> Iterable[VideoInstanceSegmentation]:
        """Yield the labels of every video, including videos without annotations.

        Each object carries one MultiPolygon per entry of the annotation's
        "segmentations" list; an entry that is ``null`` becomes an empty
        MultiPolygon so that list positions stay aligned with the frames.

        Raises:
            ParseError: If an annotation has no "segmentations" field or
                references a video or category id that is not declared.
        """
        video_id_to_video = {video.id: video for video in self.get_videos()}
        category_id_to_category = {
            category.id: category for category in self.get_categories()
        }

        # Pre-seed with every video so that unannotated videos are yielded too.
        video_id_to_annotations: Dict[int, List[JsonDict]] = {
            video_id: [] for video_id in video_id_to_video
        }
        for ann in self._data["annotations"]:
            video_id = ann["video_id"]
            if video_id not in video_id_to_annotations:
                raise ParseError(
                    f"Annotation references unknown video id {video_id}"
                )
            video_id_to_annotations[video_id].append(ann)

        for video_id, annotations in video_id_to_annotations.items():
            objects = []
            for ann in annotations:
                if "segmentations" not in ann:
                    raise ParseError(f"Segmentations missing for video id {video_id}")
                category_id = ann["category_id"]
                if category_id not in category_id_to_category:
                    raise ParseError(
                        f"Annotation references unknown category id {category_id}"
                    )
                objects.append(
                    SingleVideoInstanceSegmentation(
                        category=category_id_to_category[category_id],
                        segmentation=[
                            _youtube_vis_segmentation_to_multipolygon(frame)
                            for frame in ann["segmentations"]
                        ],
                    )
                )
            yield VideoInstanceSegmentation(
                video=video_id_to_video[video_id],
                objects=objects,
            )


@cli_register(format="youtube_vis", task=Task.VIDEO_INSTANCE_SEGMENTATION)
class YouTubeVISOutput(VideoInstanceSegmentationOutput):
    """Write video instance segmentation labels as a YouTube-VIS JSON file."""

    @staticmethod
    def add_cli_arguments(parser: ArgumentParser) -> None:
        """Register the ``--output-file`` CLI argument."""
        parser.add_argument(
            "--output-file",
            type=Path,
            required=True,
            help="Path to output YouTube-VIS JSON file",
        )

    def __init__(self, output_file: Path) -> None:
        self.output_file = output_file

    def save(self, label_input: VideoInstanceSegmentationInput) -> None:
        """Convert ``label_input`` and write it to ``self.output_file``.

        Annotation ids are assigned sequentially starting at 1. Every
        per-frame MultiPolygon is rasterized to one RLE mask; an empty
        MultiPolygon is written as ``null``. Parent directories of the output
        file are created if needed.
        """
        annotations: List[JsonDict] = []
        for label in label_input.get_labels():
            video = label.video
            for obj in label.objects:
                annotations.append(
                    {
                        "video_id": video.id,
                        "category_id": obj.category.id,
                        "segmentations": [
                            _multipolygon_to_youtube_vis_rle(
                                multipolygon, video.height, video.width
                            )
                            for multipolygon in obj.segmentation
                        ],
                        "id": len(annotations) + 1,
                        "width": video.width,
                        "height": video.height,
                        "iscrowd": 0,
                        # The label model carries no occlusion information, so
                        # a constant value is written for every frame.
                        "occlusion": ["no_occlusion"] * video.length,
                    }
                )

        data: JsonDict = {
            "videos": _get_output_videos_dict(videos=label_input.get_videos()),
            "categories": _get_output_categories_dict(
                categories=label_input.get_categories()
            ),
            "annotations": annotations,
        }

        self.output_file.parent.mkdir(parents=True, exist_ok=True)
        with self.output_file.open("w") as file:
            json.dump(data, file, indent=2)


def _youtube_vis_segmentation_to_multipolygon(
    youtube_vis_segmentation: Any,
) -> MultiPolygon:
    """Convert one YouTube-VIS segmentation entry to a MultiPolygon.

    Accepted inputs are ``None`` (no object, gives an empty MultiPolygon), an
    RLE dict with "counts" and "size", a flat ``[x0, y0, x1, y1, ...]``
    polygon, or a list of any of these, whose polygons are merged.

    Raises:
        ParseError: If the entry has an unsupported shape or a flat polygon
            has an odd number of coordinates.
    """
    return MultiPolygon(polygons=_segmentation_to_polygons(youtube_vis_segmentation))


def _segmentation_to_polygons(segmentation: Any) -> List[_Polygon]:
    """Return the polygons described by a (possibly nested) segmentation entry."""
    if segmentation is None:
        return []
    if isinstance(segmentation, dict):
        if "counts" not in segmentation or "size" not in segmentation:
            raise ParseError(f"Invalid RLE segmentation: {segmentation}")
        return _rle_to_polygons(segmentation)
    if not isinstance(segmentation, list):
        raise ParseError(f"Unsupported segmentation: {segmentation}")
    if not segmentation:
        return []

    if all(isinstance(value, (int, float)) for value in segmentation):
        # Flat list of alternating x and y coordinates.
        if len(segmentation) % 2 != 0:
            raise ParseError(
                f"Invalid polygon with {len(segmentation)} points: {segmentation}"
            )
        return [
            list(
                zip(
                    [float(x) for x in segmentation[0::2]],
                    [float(y) for y in segmentation[1::2]],
                )
            )
        ]

    polygons: List[_Polygon] = []
    for part in segmentation:
        polygons.extend(_segmentation_to_polygons(part))
    return polygons


def _rle_to_polygons(rle: Dict[str, Any]) -> List[_Polygon]:
    """Decode an RLE mask and trace its outer contours as polygons."""
    if isinstance(rle["counts"], list):
        # Uncompressed RLE must be converted before it can be decoded.
        height, width = rle["size"]
        rle = mask_utils.frPyObjects(rle, height, width)
    binary_mask = mask_utils.decode(rle)
    return [
        [(float(x), float(y)) for x, y in contour]
        for contour in _mask_to_polygons(binary_mask)
    ]


def _mask_to_polygons(mask: np.ndarray) -> List[np.ndarray]:
    """Convert a binary mask to a list of external contours of shape (N, 2).

    Contours with fewer than three points are dropped.
    """
    contours, _ = cv2.findContours(
        mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    return [contour.reshape(-1, 2) for contour in contours if len(contour) >= 3]


def _multipolygon_to_youtube_vis_rle(
    multipolygon: MultiPolygon,
    height: int,
    width: int,
) -> Optional[JsonDict]:
    """Rasterize a MultiPolygon into one RLE dict, or None if it has no polygons.

    Point coordinates are truncated to integers before filling.
    """
    polygons = [polygon for polygon in multipolygon.polygons if len(polygon) > 0]
    if not polygons:
        return None

    mask = np.zeros((height, width), dtype=np.uint8)
    # Fill polygons one by one so that overlapping polygons form a union.
    for polygon in polygons:
        cv2.fillPoly(mask, [np.array(polygon, dtype=np.int32)], 1)
    rle: JsonDict = mask_utils.encode(np.asfortranarray(mask))
    # encode() returns bytes; JSON needs a string.
    rle["counts"] = rle["counts"].decode("utf-8")
    return rle


def _get_output_videos_dict(
    videos: Iterable[Video],
) -> List[JsonDict]:
    """Get the "videos" list for YouTube-VIS JSON."""
    return [
        {
            "id": video.id,
            "file_names": video.filenames,
            "length": video.length,
            "width": video.width,
            "height": video.height,
        }
        for video in videos
    ]


def _get_output_categories_dict(
    categories: Iterable[Category],
) -> List[JsonDict]:
    """Get the "categories" list for YouTube-VIS JSON."""
    return [
        {
            "id": category.id,
            "name": category.name,
        }
        for category in categories
    ]
get_categories(self) -> Iterable[Category]: + raise NotImplementedError() + + @abstractmethod + def get_videos(self) -> Iterable[str]: + raise NotImplementedError() + + @abstractmethod + def get_labels(self) -> Iterable[VideoInstanceSegmentation]: + raise NotImplementedError() + + +class VideoInstanceSegmentationOutput(ABC): + @staticmethod + @abstractmethod + def add_cli_arguments(parser: ArgumentParser) -> None: + raise NotImplementedError() + + def save(self, label_input: VideoInstanceSegmentationInput) -> None: + raise NotImplementedError() diff --git a/tests/fixtures/video_instance_segmentation/OVIS/train/annotations_train.json b/tests/fixtures/video_instance_segmentation/OVIS/train/annotations_train.json new file mode 100644 index 0000000..87864a9 --- /dev/null +++ b/tests/fixtures/video_instance_segmentation/OVIS/train/annotations_train.json @@ -0,0 +1,773 @@ +{ + "info": { + "description": "OVIS", + "url": "http://songbai.site/ovis/", + "version": "1.0", + "year": 2021, + "contributor": "youku", + "date_created": "2021-01-01" + }, + "videos": [ + { + "width": 1920, + "length": 3, + "license": 1, + "file_names": [ + "85aa3b0e/img_0000001.jpg", + "85aa3b0e/img_0000002.jpg", + "85aa3b0e/img_0000003.jpg" + ], + "id": 1, + "height": 886 + } + ], + "annotations": [ + { + "length": 1, + "category_id": 19, + "video_id": 1, + "iscrowd": 0, + "id": 1, + "height": 886, + "width": 1920, + "segmentations": [ + { + "size": [ + 886, + 1920 + ], + "counts": "YTW71dk04cN0nVO3nh02mVO1FOZh04mWO0ZO?bh0EPXOW1ig0TOlWOS1mg0VOiWOR1Ph0b1H7I6J4L5K4L4L4L3M2M2O000000001O001O\\KhYOk3Wf0oKXZOi3he0oKfZOk3ef0M4M3M2M4L3N1N2N2N2O1N2N2N2M3N2N2N3M3L3N3M3M2M4M3L3M2M2N3M2N2N1K6O1L4H8BWUOHPk0OknlZ1" + }, + { + "size": [ + 886, + 1920 + ], + "counts": "PcZ72ck05[UO8Ui0J_VOl0Ui0WO_VOV1]i0f0fNoMfXOe2df0jMnXOY2nf0VNeXOk1Yg0[1M4M3N3L3N1N10000O0100O10O100001O1O2N1O2N3M6J7H9G6J3M4M2M3M3M3M3M3L4M3M3M3M2M3N1O2M2M4M1N2O0O2M2L5eNPVOk0Tj0UOnUOg0Uj0WOnUOh0Rj0WOQVOf0oi0YOTVOb0Pj0]OYVO5mi0IUXkZ1" + }, + { + 
"size": [ + 886, + 1920 + ], + "counts": "PcZ73o05\\i01XVO?`i0ETVOj0ei0i0iNWNVXOm1bg0`NVXOc1bg0iNVXOl1Sg0_NdXOc1Xg0b1L5K3M3M2O1O1O00O2O000O10000O4M4L=B4M4K4M2M3N3L3N3L3M4M2M2N1O1N2O1O1O3M4K4M4L4L4K3L4M00J6N12O8G;E5K5K5J6@kimZ1" + } + ], + "bboxes": [ + [ + 267, + 73, + 66, + 172 + ], + [ + 271, + 80, + 64, + 171 + ], + [ + 271, + 78, + 61, + 173 + ] + ], + "areas": [ + 11352, + 10944, + 10553 + ], + "occlusion": [ + "no_occlusion", + "no_occlusion", + "no_occlusion" + ] + }, + { + "length": 1, + "category_id": 19, + "video_id": 1, + "iscrowd": 0, + "id": 2, + "height": 886, + "width": 1920, + "segmentations": [ + { + "size": [ + 886, + 1920 + ], + "counts": "jYk8i0hj07I6J7I6K5K5K5K5J6K5K5K5K4L4M3L4L4M3L3J6H9G8H8F9G9G:J5O1O2N1O1O1O0101O1M3ZOUZO]KPf0\\4XZO^Kme0Y2]\\OeMdc0T2d\\OlM[c0m1m\\OSNRc0f1W]OXNib0g1Y]OYNfb0g1[]OXNeb0g1]]OYNbb0g1_]OYN`b0f1c]OXN]b0h1h300O100O101N100O1O1000000L5N1O1N3J5K5O1D=N1O1O3JboRY1" + }, + { + "size": [ + 886, + 1920 + ], + "counts": "jQm88Sk0`0@>Bdj02nWOQOVe0U1aZOlNae0\\1TZOfNme0c1gYO^N[f0k1ZYOUNhf0S2mXOnMUg0Z2`XOhMag0`2UXO`Mmg0T32003M6J5K4L2N1ON2N2N2N2O1N3M3M2O1N1O1O1O1O1N2O1N3L3N2N3M2O1O2L3N2M4M2M3N3L2OO00YN^VO[1bi0eN`VOY1`i0gNcVOV1]i0kNdVOS1\\i0mNfVOQ1Yi0POjVOk0Xi0UOjVOf0Yi0ZOPWO;Ri0FVWO0jh01YWOJih06ZWODih0 None: if isinstance(obj1, dict): - assert isinstance(obj2, dict) - assert sorted(obj1.keys()) == sorted(obj2.keys()) - for key in obj1.keys(): - assert_almost_equal_recursive( - obj1[key], obj2[key], rel=rel, abs=abs, nan_ok=nan_ok - ) + if 'counts' in obj1: #For RLE encodded segmentations + import pycocotools.mask as mask_utils + mask1 = mask_utils.decode(obj1) + mask2 = mask_utils.decode(obj2) + assert mask1.shape == mask2.shape, "RLE masks have different shapes" + # Allow for subtle differences by using a tolerance + difference = np.abs(mask1 - mask2).sum() + tolerance = 5 # Adjust tolerance as needed + assert (difference <= tolerance), "RLE masks differ beyond tolerance" + else: + assert isinstance(obj2, dict) + assert 
"""Integration tests for the YouTube-VIS input and output classes."""
import json
from pathlib import Path

import pytest

from labelformat.formats.youtube_vis import YouTubeVISInput, YouTubeVISOutput
from labelformat.model.category import Category
from labelformat.model.video import Video
from tests.integration.integration_utils import (
    VIDEO_INSTANCE_SEGMENTATION_FIXTURES_DIR,
    assert_almost_equal_recursive,
)

REAL_DATA_FILE = (
    VIDEO_INSTANCE_SEGMENTATION_FIXTURES_DIR
    / "OVIS"
    / "train"
    / "annotations_train.json"
)


def test_youtube_vis_input_with_real_data() -> None:
    """The fixture parses into non-empty categories, videos and labels."""
    label_input = YouTubeVISInput(input_file=REAL_DATA_FILE)

    categories = list(label_input.get_categories())
    assert categories

    # The fixture declares exactly one three-frame video.
    videos = list(label_input.get_videos())
    assert videos == [
        Video(
            id=1,
            filenames=[
                "85aa3b0e/img_0000001.jpg",
                "85aa3b0e/img_0000002.jpg",
                "85aa3b0e/img_0000003.jpg",
            ],
            width=1920,
            height=886,
            length=3,
        )
    ]

    # One label entry is produced per video.
    labels = list(label_input.get_labels())
    assert len(labels) == len(videos)
    assert labels[0].video == videos[0]
    assert labels[0].objects


def test_youtube_vis_output_with_real_data(tmp_path: Path) -> None:
    """Saving writes all three sections and one annotation per input annotation."""
    label_input = YouTubeVISInput(input_file=REAL_DATA_FILE)
    output_file = tmp_path / "output.json"
    label_output = YouTubeVISOutput(output_file=output_file)

    label_output.save(label_input=label_input)

    output_data = json.loads(output_file.read_text())
    expected_data = json.loads(REAL_DATA_FILE.read_text())

    assert output_data["videos"]
    assert output_data["categories"]
    assert output_data["annotations"]

    annotation_count = len(expected_data["annotations"])
    assert len(output_data["annotations"]) == annotation_count
    # Annotation ids are assigned sequentially starting at 1.
    assert [annotation["id"] for annotation in output_data["annotations"]] == list(
        range(1, annotation_count + 1)
    )


def test_youtube_vis_to_youtube_vis(tmp_path: Path) -> None:
    """Round-tripping the fixture reproduces the converted fields."""
    label_input = YouTubeVISInput(input_file=REAL_DATA_FILE)
    YouTubeVISOutput(output_file=tmp_path / "annotations_train.json").save(
        label_input=label_input
    )

    # Compare jsons.
    output_json = json.loads((tmp_path / "annotations_train.json").read_text())
    expected_json = json.loads(REAL_DATA_FILE.read_text())

    # Remove fields that are not converted or are expected to differ. pop() is
    # used so that a fixture lacking an optional field does not raise KeyError.
    expected_json.pop("info", None)
    expected_json.pop("licenses", None)

    for category in expected_json["categories"]:
        category.pop("supercategory", None)

    for video in expected_json["videos"]:
        video.pop("license", None)

    for annotation in expected_json["annotations"]:
        for key in ("areas", "bboxes", "length", "occlusion"):
            annotation.pop(key, None)

    for annotation in output_json["annotations"]:
        annotation.pop("occlusion", None)

    assert_almost_equal_recursive(output_json, expected_json)