diff --git a/src/hope_dedup_engine/apps/api/admin/jobs.py b/src/hope_dedup_engine/apps/api/admin/jobs.py index d14c495..360331d 100644 --- a/src/hope_dedup_engine/apps/api/admin/jobs.py +++ b/src/hope_dedup_engine/apps/api/admin/jobs.py @@ -6,5 +6,5 @@ @admin.register(DedupJob) -class AsyncJobAdmin(CeleryTaskModelAdmin): - pass +class DedupJobAdmin(CeleryTaskModelAdmin): + list_display = ["deduplication_set_id", "progress"] diff --git a/src/hope_dedup_engine/apps/api/deduplication/adapters.py b/src/hope_dedup_engine/apps/api/deduplication/adapters.py index 7c72c64..a6345bc 100644 --- a/src/hope_dedup_engine/apps/api/deduplication/adapters.py +++ b/src/hope_dedup_engine/apps/api/deduplication/adapters.py @@ -1,4 +1,4 @@ -from collections.abc import Generator +from collections.abc import Callable, Generator from typing import Any from hope_dedup_engine.apps.api.deduplication.registry import DuplicateKeyPair @@ -12,9 +12,12 @@ class DuplicateFaceFinder: weight = 1 def __init__(self, deduplication_set: DeduplicationSet): + self.tracker = None self.deduplication_set = deduplication_set - def run(self) -> Generator[DuplicateKeyPair, None, None]: + def run( + self, tracker: Callable[[int], None] | None = None + ) -> Generator[DuplicateKeyPair, None, None]: filename_to_reference_pk = { filename: reference_pk for reference_pk, filename in self.deduplication_set.image_set.values_list( @@ -28,7 +31,9 @@ def run(self) -> Generator[DuplicateKeyPair, None, None]: detector = DuplicationDetector( tuple[str](filename_to_reference_pk.keys()), ds_config ) - for first_filename, second_filename, distance in detector.find_duplicates(): + for first_filename, second_filename, distance in detector.find_duplicates( + tracker + ): yield filename_to_reference_pk[first_filename], filename_to_reference_pk[ second_filename ], 1 - distance diff --git a/src/hope_dedup_engine/apps/api/deduplication/process.py b/src/hope_dedup_engine/apps/api/deduplication/process.py index 5c187ed..bc64822 100644 --- a/src/hope_dedup_engine/apps/api/deduplication/process.py +++ b/src/hope_dedup_engine/apps/api/deduplication/process.py @@ -1,3 +1,6 @@ +from collections.abc import Callable +from functools import partial + from celery import shared_task from constance import config @@ -8,6 +11,7 @@ get_finders, ) from hope_dedup_engine.apps.api.models import DedupJob, DeduplicationSet, Duplicate +from hope_dedup_engine.apps.api.utils.progress import track_progress_multi def _sort_keys(pair: DuplicateKeyPair) -> DuplicateKeyPair: @@ -20,6 +24,7 @@ def _save_duplicates( deduplication_set: DeduplicationSet, lock_enabled: bool, lock: DeduplicationSetLock, + tracker: Callable[[int], None], ) -> None: reference_pk_to_filename_mapping = dict( deduplication_set.image_set.values_list("reference_pk", "filename") @@ -40,7 +45,7 @@ def _save_duplicates( deduplication_set.ignoredreferencepkpair_set.values_list("first", "second") ) - for first, second, score in map(_sort_keys, finder.run()): + for first, second, score in map(_sort_keys, finder.run(tracker)): first_filename, second_filename = sorted( ( reference_pk_to_filename_mapping[first], @@ -66,6 +71,11 @@ def _save_duplicates( HOUR = 60 * 60 +def update_job_progress(job: DedupJob, progress: int) -> None: + job.progress = progress + job.save() + + @shared_task(soft_time_limit=0.5 * HOUR, time_limit=1 * HOUR) def find_duplicates(dedup_job_id: int, version: int) -> None: dedup_job: DedupJob = DedupJob.objects.get(pk=dedup_job_id, version=version) @@ -87,8 +97,11 @@ def find_duplicates(dedup_job_id: int, version: int) -> None: Duplicate.objects.filter(deduplication_set=deduplication_set).delete() weight_total = 0 - for finder in get_finders(deduplication_set): - _save_duplicates(finder, deduplication_set, lock_enabled, lock) + for finder, tracker in zip( + get_finders(deduplication_set), + track_progress_multi(partial(update_job_progress, dedup_job)), + ): + _save_duplicates(finder, deduplication_set, lock_enabled, lock, tracker) weight_total += finder.weight for duplicate in deduplication_set.duplicate_set.all(): diff --git a/src/hope_dedup_engine/apps/api/deduplication/registry.py b/src/hope_dedup_engine/apps/api/deduplication/registry.py index 9886ab4..603261c 100644 --- a/src/hope_dedup_engine/apps/api/deduplication/registry.py +++ b/src/hope_dedup_engine/apps/api/deduplication/registry.py @@ -1,4 +1,4 @@ -from collections.abc import Generator, Iterable +from collections.abc import Callable, Generator, Iterable from typing import Protocol from hope_dedup_engine.apps.api.models import DeduplicationSet @@ -9,7 +9,9 @@ class DuplicateFinder(Protocol): weight: int - def run(self) -> Generator[DuplicateKeyPair, None, None]: ... + def run( + self, tracker: Callable[[int], None] + ) -> Generator[DuplicateKeyPair, None, None]: ... def get_finders(deduplication_set: DeduplicationSet) -> Iterable[DuplicateFinder]: diff --git a/src/hope_dedup_engine/apps/api/migrations/0011_dedupjob_progress.py b/src/hope_dedup_engine/apps/api/migrations/0011_dedupjob_progress.py new file mode 100644 index 0000000..577e926 --- /dev/null +++ b/src/hope_dedup_engine/apps/api/migrations/0011_dedupjob_progress.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0.7 on 2024-11-04 22:42 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0010_dedupjob"), + ] + + operations = [ + migrations.AddField( + model_name="dedupjob", + name="progress", + field=models.IntegerField(default=0), + ), + ] diff --git a/src/hope_dedup_engine/apps/api/models/jobs.py b/src/hope_dedup_engine/apps/api/models/jobs.py index 606dd23..58549c1 100644 --- a/src/hope_dedup_engine/apps/api/models/jobs.py +++ b/src/hope_dedup_engine/apps/api/models/jobs.py @@ -12,6 +12,7 @@ class DedupJob(CeleryTaskModel): "DeduplicationSet", on_delete=models.CASCADE, related_name="jobs" ) serialized_lock = models.CharField(max_length=128, null=True, editable=False) + progress = models.IntegerField(default=0) celery_task_name = ( "hope_dedup_engine.apps.api.deduplication.process.find_duplicates" diff --git a/src/hope_dedup_engine/apps/api/utils/progress.py b/src/hope_dedup_engine/apps/api/utils/progress.py new file mode 100644 index 0000000..74179f9 --- /dev/null +++ b/src/hope_dedup_engine/apps/api/utils/progress.py @@ -0,0 +1,50 @@ +from collections.abc import Callable, Generator +from functools import partial + +STEP = 10 + + +def callback_filter( + callback: Callable[[int], None], step: int +) -> Callable[[int], None]: + previous_callback_value = -1 + + def update(progress: int) -> None: + nonlocal previous_callback_value + if (callback_value := progress // step * step) != previous_callback_value: + callback(callback_value) + previous_callback_value = callback_value + + return update + + +def track_progress( + callback: Callable[[int], None], send_zero: bool = True +) -> Callable[[int], None]: + update = callback_filter(callback, STEP) + + if send_zero: + update(0) + + return update + + +def track_progress_multi( + callback: Callable[[int], None] +) -> Generator[Callable[[int], None], None, None]: + progress = [] + + update = callback_filter(callback, STEP) + + def individual_callback(value: int, index: int) -> None: + progress[index] = value + update(sum(progress) // len(progress)) + + send_zero = True + while True: + progress.append(0) + yield track_progress( + callback=partial(individual_callback, index=len(progress) - 1), + send_zero=send_zero, + ) + send_zero = False diff --git a/src/hope_dedup_engine/apps/faces/services/duplication_detector.py b/src/hope_dedup_engine/apps/faces/services/duplication_detector.py index e9cef4e..38191cb 100644 --- a/src/hope_dedup_engine/apps/faces/services/duplication_detector.py +++ b/src/hope_dedup_engine/apps/faces/services/duplication_detector.py @@ -1,5 +1,6 @@ import logging import os +from collections.abc import Callable from itertools import combinations from typing import Any, Generator @@ -107,7 +108,9 @@ def _existed_images_name(self) -> list[str]: ) return filenames - def find_duplicates(self) -> Generator[tuple[str, str, float], None, None]: + def find_duplicates( + self, tracker: Callable[[int], None] | None = None + ) -> Generator[tuple[str, str, float], None, None]: """ Finds duplicate images based on facial encodings and yields pairs of image paths with their minimum distance. @@ -125,7 +128,8 @@ def find_duplicates(self) -> Generator[tuple[str, str, float], None, None]: existed_images_name = self._existed_images_name() encodings_all = self._load_encodings_all() - for path1, path2 in combinations(existed_images_name, 2): + total_pairs = (n := len(existed_images_name)) * (n - 1) // 2 + for i, (path1, path2) in enumerate(combinations(existed_images_name, 2), 1): encodings1 = encodings_all.get(path1) encodings2 = encodings_all.get(path2) if encodings1 is None or encodings2 is None: @@ -144,6 +148,9 @@ def find_duplicates(self) -> Generator[tuple[str, str, float], None, None]: ): yield (path1, path2, round(min_distance, 5)) + if tracker: + tracker(100 * i // total_pairs) + except Exception as e: self.logger.exception( "Error finding duplicates for images %s", self.filenames diff --git a/tests/api/test_utils.py b/tests/api/test_utils.py index 2b809da..234929d 100644 --- a/tests/api/test_utils.py +++ b/tests/api/test_utils.py @@ -8,6 +8,7 @@ REQUEST_TIMEOUT, send_notification, ) +from hope_dedup_engine.apps.api.utils.progress import callback_filter @fixture @@ -44,3 +45,12 @@ def test_exception_is_sent_to_sentry( requests_get.side_effect = exception send_notification("https://example.com") sentry_sdk_capture_exception.assert_called_once_with(exception) + + +def test_callback_filter() -> None: + step = 10 + values = [] + update = callback_filter(lambda x: values.append(x), step) + for i in range(1, 101): + update(i) + assert values == list(range(0, 101, step))