Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow asynchronous annotation upload #509

Merged
merged 8 commits into from
Nov 21, 2024
10 changes: 7 additions & 3 deletions geti_sdk/geti.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,10 +713,12 @@ def create_single_task_project_from_dataset(
workspace_id=self.workspace_id,
annotation_reader=annotation_reader,
)
annotation_client.upload_annotations_for_images(images)
annotation_client.upload_annotations_for_images(images, max_threads=max_threads)

if len(videos) > 0:
annotation_client.upload_annotations_for_videos(videos)
annotation_client.upload_annotations_for_videos(
videos, max_threads=max_threads
)

configuration_client.set_project_auto_train(auto_train=enable_auto_train)
return project
Expand Down Expand Up @@ -854,7 +856,9 @@ def create_task_chain_project_from_dataset(
annotation_reader=reader,
)
annotation_client.upload_annotations_for_images(
images=images, append_annotations=append_annotations
images=images,
append_annotations=append_annotations,
max_threads=max_threads,
)
append_annotations = True
previous_task_type = task_type
Expand Down
3 changes: 2 additions & 1 deletion geti_sdk/import_export/import_export_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ def upload_project_data(
if len(images) > 0:
annotation_client.upload_annotations_for_images(
images=images,
max_threads=max_threads,
)
if len(videos) > 0:
are_videos_processed = False
Expand All @@ -253,7 +254,7 @@ def upload_project_data(
are_videos_processed = uploaded_ids.issubset(project_video_ids)
time.sleep(1)
annotation_client.upload_annotations_for_videos(
videos=videos,
videos=videos, max_threads=max_threads
)

configuration_file = os.path.join(target_folder, "configuration.json")
Expand Down
56 changes: 44 additions & 12 deletions geti_sdk/rest_clients/annotation_clients/annotation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,19 @@ def get_latest_annotations_for_video(self, video: Video) -> List[AnnotationScene
return annotation_scenes

def upload_annotations_for_video(
self, video: Video, append_annotations: bool = False
self, video: Video, append_annotations: bool = False, max_threads: int = 5
):
"""
Upload annotations for a video. If append_annotations is set to True,
annotations will be appended to the existing annotations for the video in the
project. If set to False, existing annotations will be overwritten.

:param video: Video to upload annotations for
:param append_annotations:
:param append_annotations: True to append annotations from the local disk to
the existing annotations on the server, False to overwrite the server
annotations by those on the local disk.
:param max_threads: Maximum number of threads to use for uploading. Defaults to 5.
Set to -1 to use all available threads.
:return:
"""
annotation_filenames = self.annotation_reader.get_data_filenames()
Expand All @@ -83,27 +87,38 @@ def upload_annotations_for_video(
]
)
upload_count = self._upload_annotations_for_2d_media_list(
media_list=video_frames, append_annotations=append_annotations
media_list=video_frames,
append_annotations=append_annotations,
max_threads=max_threads,
)
return upload_count

def upload_annotations_for_videos(
self, videos: Sequence[Video], append_annotations: bool = False
self,
videos: Sequence[Video],
append_annotations: bool = False,
max_threads: int = 5,
):
"""
Upload annotations for a list of videos. If append_annotations is set to True,
annotations will be appended to the existing annotations for the video in the
project. If set to False, existing annotations will be overwritten.

:param videos: List of videos to upload annotations for
:param append_annotations:
:param append_annotations: True to append annotations from the local disk to
the existing annotations on the server, False to overwrite the server
annotations by those on the local disk.
:param max_threads: Maximum number of threads to use for uploading. Defaults to 5.
Set to -1 to use all available threads.
:return:
"""
logging.info("Starting video annotation upload...")
upload_count = 0
for video in videos:
upload_count += self.upload_annotations_for_video(
video=video, append_annotations=append_annotations
video=video,
append_annotations=append_annotations,
max_threads=max_threads,
)
if upload_count > 0:
logging.info(
Expand All @@ -113,20 +128,29 @@ def upload_annotations_for_videos(
logging.info("No new video frame annotations were found.")

def upload_annotations_for_images(
self, images: Sequence[Image], append_annotations: bool = False
self,
images: Sequence[Image],
append_annotations: bool = False,
max_threads: int = 5,
):
"""
Upload annotations for a list of images. If append_annotations is set to True,
annotations will be appended to the existing annotations for the image in the
project. If set to False, existing annotations will be overwritten.

:param images: List of images to upload annotations for
:param append_annotations:
:param append_annotations: True to append annotations from the local disk to
the existing annotations on the server, False to overwrite the server
annotations by those on the local disk.
:param max_threads: Maximum number of threads to use for uploading. Defaults to 5.
Set to -1 to use all available threads.
:return:
"""
logging.info("Starting image annotation upload...")
upload_count = self._upload_annotations_for_2d_media_list(
media_list=images, append_annotations=append_annotations
media_list=images,
append_annotations=append_annotations,
max_threads=max_threads,
)
if upload_count > 0:
logging.info(
Expand Down Expand Up @@ -271,7 +295,9 @@ def download_all_annotations(
max_threads=max_threads,
)

def upload_annotations_for_all_media(self, append_annotations: bool = False):
def upload_annotations_for_all_media(
self, append_annotations: bool = False, max_threads: int = 5
):
"""
Upload annotations for all media in the project. If append_annotations is set
to True, annotations will be appended to the existing annotations for the
Expand All @@ -280,16 +306,22 @@ def upload_annotations_for_all_media(self, append_annotations: bool = False):
:param append_annotations: True to append annotations from the local disk to
the existing annotations on the server, False to overwrite the server
annotations by those on the local disk. Defaults to False.
:param max_threads: Maximum number of threads to use for uploading. Defaults to 5.
Set to -1 to use all available threads.
"""
image_list = self._get_all_media_by_type(media_type=Image)
video_list = self._get_all_media_by_type(media_type=Video)
if len(image_list) > 0:
self.upload_annotations_for_images(
images=image_list, append_annotations=append_annotations
images=image_list,
append_annotations=append_annotations,
max_threads=max_threads,
)
if len(video_list) > 0:
self.upload_annotations_for_videos(
videos=video_list, append_annotations=append_annotations
videos=video_list,
append_annotations=append_annotations,
max_threads=max_threads,
)

def upload_annotation(
Expand Down
54 changes: 48 additions & 6 deletions geti_sdk/rest_clients/annotation_clients/base_annotation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from geti_sdk.data_models.containers.media_list import MediaList
from geti_sdk.data_models.label import Label
from geti_sdk.data_models.media import MediaInformation, MediaItem
from geti_sdk.http_session import GetiSession
from geti_sdk.http_session import GetiRequestException, GetiSession
from geti_sdk.rest_clients.dataset_client import DatasetClient
from geti_sdk.rest_converters import AnnotationRESTConverter

Expand Down Expand Up @@ -300,7 +300,10 @@ def _append_annotation_for_2d_media_item(
return annotation_scene

def _upload_annotations_for_2d_media_list(
self, media_list: Sequence[MediaItem], append_annotations: bool
self,
media_list: Sequence[MediaItem],
append_annotations: bool,
max_threads: int = 5,
) -> int:
"""
Upload annotations to the server.
Expand All @@ -310,12 +313,21 @@ def _upload_annotations_for_2d_media_list(
:param append_annotations: True to append annotations from the local disk to
the existing annotations on the server, False to overwrite the server
annotations by those on the local disk.
:param max_threads: Maximum number of threads to use for uploading. Defaults to 5.
Set to -1 (or any value <= 0) to let the thread pool choose its default
number of worker threads.
:return: Returns the number of uploaded annotations.
"""
if max_threads <= 0:
# With max_workers=None, ThreadPoolExecutor picks its own default pool size:
# 5 threads on a single-core CPU, and never more than 32 threads.
max_threads = None
upload_count = 0
skip_count = 0
tqdm_prefix = "Uploading media annotations"
with logging_redirect_tqdm(tqdm_class=tqdm):
for media_item in tqdm(media_list, desc=tqdm_prefix):

def upload_annotation(media_item: MediaItem) -> None:
nonlocal upload_count, skip_count
try:
if not append_annotations:
response = self._upload_annotation_for_2d_media_item(
media_item=media_item
Expand All @@ -324,8 +336,38 @@ def _upload_annotations_for_2d_media_list(
response = self._append_annotation_for_2d_media_item(
media_item=media_item
)
if response.annotations:
upload_count += 1
except GetiRequestException as error:
skip_count += 1
if error.status_code == 500:
logging.error(
f"Failed to upload annotation for {media_item.name}. "
)
return
else:
raise error
if response is not None:
upload_count += 1

t_start = time.time()
with ThreadPoolExecutor(max_workers=max_threads) as executor:
with logging_redirect_tqdm(tqdm_class=tqdm):
list(
tqdm(
executor.map(upload_annotation, media_list),
total=len(media_list),
desc=tqdm_prefix,
)
)

t_elapsed = time.time() - t_start
if upload_count > 0:
logging.info(
f"Uploaded {upload_count} annotations in {t_elapsed:.1f} seconds."
)
if skip_count > 0:
logging.info(
f"Skipped {skip_count} media items, unable to upload annotations."
)
return upload_count

def annotation_scene_from_rest_response(
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers/project_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def add_annotated_media(
]
# Upload annotations
self.annotation_client.upload_annotations_for_images(
images=images, append_annotations=task_index > 0
images=images, append_annotations=task_index > 0, max_threads=1
)

def set_auto_train(self, auto_train: bool = True) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_upload_and_retrieve_annotations_for_video(
if fxt_test_mode != SdkTestMode.OFFLINE:
time.sleep(1)

annotation_client.upload_annotations_for_video(video=video)
annotation_client.upload_annotations_for_video(video=video, max_threads=1)

if fxt_test_mode != SdkTestMode.OFFLINE:
time.sleep(1)
Expand Down Expand Up @@ -198,7 +198,9 @@ def test_upload_and_retrieve_annotations_for_videos(
annotation_client = fxt_project_service.annotation_client
annotation_client.annotation_reader = annotation_reader

annotation_client.upload_annotations_for_videos(videos=[video_1, video_2])
annotation_client.upload_annotations_for_videos(
videos=[video_1, video_2], max_threads=1
)

if fxt_test_mode != SdkTestMode.OFFLINE:
time.sleep(10)
Expand Down
Loading