Skip to content

First commit to add video instance segmentation #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/labelformat/cli/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@
ObjectDetectionInput,
ObjectDetectionOutput,
)
from labelformat.model.video_instance_segmentation import (
VideoInstanceSegmentationInput,
VideoInstanceSegmentationOutput,
)


class Task(Enum):
    """Label-conversion task types; the string value is the task identifier used by the CLI registry."""

    INSTANCE_SEGMENTATION = "instance-segmentation"
    OBJECT_DETECTION = "object-detection"
    VIDEO_INSTANCE_SEGMENTATION = "video-instance-segmentation"


@dataclass
Expand All @@ -30,12 +35,16 @@ class Registry:

def cli_register(format: str, task: Task) -> Callable[[Type], Type]: # type: ignore[type-arg]
def decorator(cls: Type) -> Type: # type: ignore[type-arg]
if issubclass(cls, ObjectDetectionInput) or issubclass(
cls, InstanceSegmentationInput
if (
issubclass(cls, ObjectDetectionInput)
or issubclass(cls, InstanceSegmentationInput)
or issubclass(cls, VideoInstanceSegmentationInput)
):
_REGISTRY.input[task][format] = cls
elif issubclass(cls, ObjectDetectionOutput) or issubclass(
cls, InstanceSegmentationOutput
elif (
issubclass(cls, ObjectDetectionOutput)
or issubclass(cls, InstanceSegmentationOutput)
or issubclass(cls, VideoInstanceSegmentationOutput)
):
_REGISTRY.output[task][format] = cls
else:
Expand All @@ -44,7 +53,9 @@ def decorator(cls: Type) -> Type: # type: ignore[type-arg]
f"'{ObjectDetectionInput}', "
f"'{InstanceSegmentationInput}', "
f"'{ObjectDetectionOutput}', "
f"'{InstanceSegmentationOutput}'"
f"'{InstanceSegmentationOutput}', "
f"'{VideoInstanceSegmentationInput}', "
f"'{VideoInstanceSegmentationOutput}'"
)
return cls

Expand Down
1 change: 1 addition & 0 deletions src/labelformat/formats/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@
YOLOv11ObjectDetectionInput,
YOLOv11ObjectDetectionOutput,
)
from labelformat.formats.youtube_vis import YouTubeVISInput, YouTubeVISOutput
215 changes: 215 additions & 0 deletions src/labelformat/formats/youtube_vis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import json
from argparse import ArgumentParser
from pathlib import Path
from typing import Any, Dict, Iterable, List, Union

import cv2
import numpy as np
import pycocotools.mask as mask_utils
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the long run we would like to avoid using pycocotools. But for this PR it's fine. We can make a follow up PR to address this. Same goes for opencv.

For this PR you'd also have to add cv2 and pycocotools to the poetry.toml file.


from labelformat.cli.registry import Task, cli_register
from labelformat.model.category import Category
from labelformat.model.multipolygon import MultiPolygon
from labelformat.model.video import Video
from labelformat.model.video_instance_segmentation import (
SingleVideoInstanceSegmentation,
VideoInstanceSegmentation,
VideoInstanceSegmentationInput,
VideoInstanceSegmentationOutput,
)
from labelformat.types import JsonDict, ParseError


@cli_register(format="youtube_vis", task=Task.VIDEO_INSTANCE_SEGMENTATION)
class YouTubeVISInput(VideoInstanceSegmentationInput):
    """Reads video instance segmentation labels from a YouTube-VIS JSON file."""

    @staticmethod
    def add_cli_arguments(parser: ArgumentParser) -> None:
        """Register the --input-file CLI argument for this reader."""
        parser.add_argument(
            "--input-file",
            type=Path,
            required=True,
            help="Path to input YouTube-VIS JSON file",
        )

    def __init__(self, input_file: Path) -> None:
        # Load the whole JSON document eagerly; all getters read from it.
        with input_file.open() as file:
            self._data = json.load(file)

    def get_categories(self) -> Iterable[Category]:
        """Yield one Category per entry of the JSON "categories" list."""
        for category in self._data["categories"]:
            yield Category(
                id=category["id"],
                name=category["name"],
            )

    def get_videos(self) -> Iterable[Video]:
        """Yield one Video per entry of the JSON "videos" list."""
        for video in self._data["videos"]:
            yield Video(
                id=video["id"],
                filenames=video["file_names"],
                width=int(video["width"]),
                height=int(video["height"]),
                length=int(video["length"]),
            )

    def get_labels(self) -> Iterable[VideoInstanceSegmentation]:
        """Yield a VideoInstanceSegmentation per video.

        Videos without annotations yield an empty ``objects`` list.

        Raises:
            ParseError: If an annotation references an unknown video id or
                lacks a "segmentations" entry.
        """
        video_id_to_video = {video.id: video for video in self.get_videos()}
        category_id_to_category = {
            category.id: category for category in self.get_categories()
        }
        # Group annotations by video so that every known video gets an entry.
        video_id_to_annotations: Dict[int, List[JsonDict]] = {
            video_id: [] for video_id in video_id_to_video
        }
        for ann in self._data["annotations"]:
            video_id = ann["video_id"]
            if video_id not in video_id_to_annotations:
                # Previously a bare KeyError; surface a parse error instead.
                raise ParseError(
                    f"Annotation references unknown video id {video_id}"
                )
            video_id_to_annotations[video_id].append(ann)

        for video_id, annotations in video_id_to_annotations.items():
            objects = []
            for ann in annotations:
                if "segmentations" not in ann:
                    raise ParseError(
                        f"Segmentations missing for video id {video_id}"
                    )
                objects.append(
                    SingleVideoInstanceSegmentation(
                        category=category_id_to_category[ann["category_id"]],
                        segmentation=_youtube_vis_segmentation_to_multipolygon(
                            ann["segmentations"]
                        ),
                    )
                )
            yield VideoInstanceSegmentation(
                video=video_id_to_video[video_id],
                objects=objects,
            )


@cli_register(format="youtube_vis", task=Task.VIDEO_INSTANCE_SEGMENTATION)
class YouTubeVISOutput(VideoInstanceSegmentationOutput):
    """Writes video instance segmentation labels to a YouTube-VIS JSON file."""

    @staticmethod  # was an instance method; the ABC declares this static
    def add_cli_arguments(parser: ArgumentParser) -> None:
        """Register the --output-file CLI argument for this writer."""
        parser.add_argument(
            "--output-file",
            type=Path,
            required=True,
            help="Path to output YouTube-VIS JSON file",
        )

    def __init__(self, output_file: Path) -> None:
        self.output_file = output_file

    def save(self, label_input: VideoInstanceSegmentationInput) -> None:
        """Serialize all videos, categories and labels of ``label_input`` as
        YouTube-VIS JSON, creating parent directories as needed."""
        data: Dict[str, Any] = {
            "videos": _get_output_videos_dict(videos=label_input.get_videos()),
            "categories": _get_output_categories_dict(
                categories=label_input.get_categories()
            ),
            "annotations": [],
        }
        unique_id = 1  # Annotation ids are assigned sequentially starting at 1.
        for label in label_input.get_labels():
            for obj in label.objects:
                data["annotations"].append(
                    {
                        "video_id": label.video.id,
                        "category_id": obj.category.id,
                        "segmentations": _multipolygon_to_youtube_vis_segmentation(
                            obj.segmentation,
                            label.video.height,
                            label.video.width,
                        ),
                        "id": unique_id,
                        "width": label.video.width,
                        "height": label.video.height,
                        "iscrowd": 0,
                        # One flag per frame; occlusion is not tracked by the
                        # internal format, so every frame is marked unoccluded.
                        "occlusion": ["no_occlusion" for _ in range(label.video.length)],
                    }
                )
                unique_id += 1

        self.output_file.parent.mkdir(parents=True, exist_ok=True)
        with self.output_file.open("w") as file:
            json.dump(data, file, indent=2)



def _youtube_vis_segmentation_to_multipolygon(
    youtube_vis_segmentation: List[Union[List[float], Dict[str, Any]]],
) -> MultiPolygon:
    """Convert YouTube-VIS segmentation to MultiPolygon.

    Each entry is either a flat ``[x0, y0, x1, y1, ...]`` coordinate list or a
    COCO-style RLE dict with "counts" and "size" keys.

    Raises:
        ParseError: If a flat polygon has an odd number of coordinates.
    """
    # TODO(review): move to src/labelformat/utils/segmentation.py.
    polygons = []
    for polygon in youtube_vis_segmentation:
        if isinstance(polygon, dict) and "counts" in polygon and "size" in polygon:
            # RLE format: decode to a binary mask, then trace its contours and
            # treat each contour like a regular polygon.
            binary_mask = mask_utils.decode(polygon)
            for contour in _mask_to_polygons(binary_mask):
                polygons.append(_xy_pairs(contour[:, 0], contour[:, 1]))
        else:
            # Flat [x0, y0, x1, y1, ...] polygon format.
            if len(polygon) % 2 != 0:
                raise ParseError(
                    f"Invalid polygon with {len(polygon)} points: {polygon}"
                )
            polygons.append(_xy_pairs(polygon[0::2], polygon[1::2]))
    return MultiPolygon(polygons=polygons)


def _xy_pairs(xs: Any, ys: Any) -> List[Any]:
    """Pair up x and y coordinate sequences as a list of (x, y) float tuples."""
    return list(zip([float(x) for x in xs], [float(y) for y in ys]))


def _mask_to_polygons(mask: np.ndarray) -> List[np.ndarray]:
    """Convert binary mask to list of contours."""
    # TODO(review): move to src/labelformat/utils/segmentation.py.
    found, _hierarchy = cv2.findContours(
        mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    # Drop degenerate contours (fewer than 3 points cannot form a polygon) and
    # squeeze away OpenCV's redundant middle axis: (N, 1, 2) -> (N, 2).
    result = []
    for contour in found:
        if len(contour) >= 3:
            result.append(contour.squeeze())
    return result


def _multipolygon_to_youtube_vis_segmentation(
    multipolygon: MultiPolygon,
    height: int,
    width: int,
) -> List[Union[List[float], Dict[str, Any]]]:
    """Convert MultiPolygon to YouTube-VIS segmentation."""
    # TODO(review): move to src/labelformat/utils/segmentation.py.
    youtube_vis_segmentation: List[Union[List[float], Dict[str, Any]]] = []
    for polygon in multipolygon.polygons:
        # Rasterize the polygon into a binary mask of the frame size...
        canvas = np.zeros((height, width), dtype=np.uint8)
        cv2.fillPoly(canvas, [np.array(polygon, dtype=np.int32)], 1)
        # ...then run-length encode it (pycocotools expects Fortran order).
        rle = mask_utils.encode(np.asfortranarray(canvas))
        # JSON cannot serialize bytes, so store counts as a UTF-8 string.
        rle["counts"] = rle["counts"].decode("utf-8")
        youtube_vis_segmentation.append(rle)
    return youtube_vis_segmentation

def _get_output_videos_dict(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also move this to utils/segmentation.py

videos: Iterable[Video],
) -> List[JsonDict]:
"""Get the "videos" dict for YouTube-VIS JSON."""
return [
{
"id": video.id,
"file_names": video.filenames,
"length": video.length,
"width": video.width,
"height": video.height,
}
for video in videos
]


def _get_output_categories_dict(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also move this to utils/segmentation.py

categories: Iterable[Category],
) -> List[JsonDict]:
"""Get the "categories" dict for YouTube-VIS JSON."""
return [
{
"id": category.id,
"name": category.name,
}
for category in categories
]
11 changes: 11 additions & 0 deletions src/labelformat/model/video.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Video:
    """Immutable metadata describing one video in a label dataset."""

    id: int
    # Frame image filenames; presumably one per frame so that
    # len(filenames) == length — TODO confirm against the loaders.
    filenames: List[str]
    width: int
    height: int
    # Number of frames in the video.
    length: int
49 changes: 49 additions & 0 deletions src/labelformat/model/video_instance_segmentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from abc import ABC, abstractmethod
from argparse import ArgumentParser
from dataclasses import dataclass
from typing import Iterable, List

from labelformat.model.category import Category
from labelformat.model.multipolygon import MultiPolygon
from labelformat.model.video import Video


@dataclass(frozen=True)
class SingleVideoInstanceSegmentation:
    """Segmentation masks of one object instance across a video.

    Follows the YouTube-VIS convention: instead of an explicit track id,
    each object carries one segmentation entry per frame of the video;
    frames where the object is absent are stored as null in the source
    data.  NOTE(review): a track-id based internal representation with
    explicit per-frame segmentations (as used by e.g. KITTI-MOTS) was
    discussed as an alternative and may be revisited once more formats
    are implemented.
    """

    category: Category
    # Per-frame segmentations — TODO confirm the list length matches the
    # owning video's length, and that loaders actually build a list here.
    segmentation: List[MultiPolygon]


@dataclass(frozen=True)
class VideoInstanceSegmentation:
    """All instance segmentations belonging to a single video."""
    video: Video
    objects: List[SingleVideoInstanceSegmentation]


class VideoInstanceSegmentationInput(ABC):
    """Interface for readers of video instance segmentation labels."""

    @staticmethod
    @abstractmethod
    def add_cli_arguments(parser: ArgumentParser) -> None:
        """Register the reader's CLI arguments on the given parser."""
        raise NotImplementedError()

    @abstractmethod
    def get_categories(self) -> Iterable[Category]:
        """Yield all object categories in the dataset."""
        raise NotImplementedError()

    @abstractmethod
    def get_videos(self) -> Iterable[Video]:
        """Yield all videos in the dataset."""
        # Annotation corrected from Iterable[str]: implementations yield Video objects.
        raise NotImplementedError()

    @abstractmethod
    def get_labels(self) -> Iterable[VideoInstanceSegmentation]:
        """Yield the instance segmentation labels for each video."""
        raise NotImplementedError()


class VideoInstanceSegmentationOutput(ABC):
    """Interface for writers of video instance segmentation labels."""

    @staticmethod
    @abstractmethod
    def add_cli_arguments(parser: ArgumentParser) -> None:
        """Register the writer's CLI arguments on the given parser."""
        raise NotImplementedError()

    @abstractmethod  # was missing: matches the Input ABC's abstract-method pattern
    def save(self, label_input: VideoInstanceSegmentationInput) -> None:
        """Write all labels provided by ``label_input`` to the output target."""
        raise NotImplementedError()
Loading