Add progress tracker
sergey-misuk-valor committed Nov 5, 2024
1 parent 67aaaad commit 5b2bd20
Showing 9 changed files with 118 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/hope_dedup_engine/apps/api/admin/jobs.py
@@ -6,5 +6,5 @@


@admin.register(DedupJob)
class AsyncJobAdmin(CeleryTaskModelAdmin):
    pass
class DedupJobAdmin(CeleryTaskModelAdmin):
    list_display = ["deduplication_set_id", "progress"]
11 changes: 8 additions & 3 deletions src/hope_dedup_engine/apps/api/deduplication/adapters.py
@@ -1,4 +1,4 @@
from collections.abc import Generator
from collections.abc import Callable, Generator
from typing import Any

from hope_dedup_engine.apps.api.deduplication.registry import DuplicateKeyPair
@@ -12,9 +12,12 @@ class DuplicateFaceFinder:
    weight = 1

    def __init__(self, deduplication_set: DeduplicationSet):
        self.tracker = None
        self.deduplication_set = deduplication_set

    def run(self) -> Generator[DuplicateKeyPair, None, None]:
    def run(
        self, tracker: Callable[[int], None] | None = None
    ) -> Generator[DuplicateKeyPair, None, None]:
        filename_to_reference_pk = {
            filename: reference_pk
            for reference_pk, filename in self.deduplication_set.image_set.values_list(
@@ -28,7 +31,9 @@ def run(self) -> Generator[DuplicateKeyPair, None, None]:
        detector = DuplicationDetector(
            tuple[str](filename_to_reference_pk.keys()), ds_config
        )
        for first_filename, second_filename, distance in detector.find_duplicates():
        for first_filename, second_filename, distance in detector.find_duplicates(
            tracker
        ):
            yield filename_to_reference_pk[first_filename], filename_to_reference_pk[
                second_filename
            ], 1 - distance
19 changes: 16 additions & 3 deletions src/hope_dedup_engine/apps/api/deduplication/process.py
@@ -1,3 +1,6 @@
from collections.abc import Callable
from functools import partial

from celery import shared_task
from constance import config

@@ -8,6 +11,7 @@
    get_finders,
)
from hope_dedup_engine.apps.api.models import DedupJob, DeduplicationSet, Duplicate
from hope_dedup_engine.apps.api.utils.progress import track_progress_multi


def _sort_keys(pair: DuplicateKeyPair) -> DuplicateKeyPair:
@@ -20,6 +24,7 @@ def _save_duplicates(
    deduplication_set: DeduplicationSet,
    lock_enabled: bool,
    lock: DeduplicationSetLock,
    tracker: Callable[[int], None],
) -> None:
    reference_pk_to_filename_mapping = dict(
        deduplication_set.image_set.values_list("reference_pk", "filename")
@@ -40,7 +45,7 @@ def _save_duplicates(
        deduplication_set.ignoredreferencepkpair_set.values_list("first", "second")
    )

    for first, second, score in map(_sort_keys, finder.run()):
    for first, second, score in map(_sort_keys, finder.run(tracker)):
        first_filename, second_filename = sorted(
            (
                reference_pk_to_filename_mapping[first],
@@ -66,6 +71,11 @@ def _save_duplicates(
HOUR = 60 * 60


def update_job_progress(job: DedupJob, progress: int) -> None:
    job.progress = progress
    job.save()


@shared_task(soft_time_limit=0.5 * HOUR, time_limit=1 * HOUR)
def find_duplicates(dedup_job_id: int, version: int) -> None:
    dedup_job: DedupJob = DedupJob.objects.get(pk=dedup_job_id, version=version)
@@ -87,8 +97,11 @@ def find_duplicates(dedup_job_id: int, version: int) -> None:
        Duplicate.objects.filter(deduplication_set=deduplication_set).delete()

        weight_total = 0
        for finder in get_finders(deduplication_set):
            _save_duplicates(finder, deduplication_set, lock_enabled, lock)
        for finder, tracker in zip(
            get_finders(deduplication_set),
            track_progress_multi(partial(update_job_progress, dedup_job)),
        ):
            _save_duplicates(finder, deduplication_set, lock_enabled, lock, tracker)
            weight_total += finder.weight

        for duplicate in deduplication_set.duplicate_set.all():
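Not part of the commit: a minimal sketch of how the task wiring above behaves, using a stand-in job object instead of a real DedupJob (FakeJob and the local update function are illustrative; only track_progress_multi comes from this commit). Because zip stops at the shorter iterable, the infinite generator hands out exactly one tracker per finder.

```python
from functools import partial

from hope_dedup_engine.apps.api.utils.progress import track_progress_multi


class FakeJob:
    """Stand-in for DedupJob: only the field the tracker updates."""

    progress = 0


def update_progress(job: FakeJob, progress: int) -> None:
    # Mirrors update_job_progress, minus the database save.
    job.progress = progress


job = FakeJob()
finders = ["face finder"]  # stand-in for get_finders(deduplication_set)

for _finder, tracker in zip(finders, track_progress_multi(partial(update_progress, job))):
    tracker(100)  # the finder reports completion through its own callback

assert job.progress == 100
```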
6 changes: 4 additions & 2 deletions src/hope_dedup_engine/apps/api/deduplication/registry.py
@@ -1,4 +1,4 @@
from collections.abc import Generator, Iterable
from collections.abc import Callable, Generator, Iterable
from typing import Protocol

from hope_dedup_engine.apps.api.models import DeduplicationSet
@@ -9,7 +9,9 @@
class DuplicateFinder(Protocol):
    weight: int

    def run(self) -> Generator[DuplicateKeyPair, None, None]: ...
    def run(
        self, tracker: Callable[[int], None]
    ) -> Generator[DuplicateKeyPair, None, None]: ...


def get_finders(deduplication_set: DeduplicationSet) -> Iterable[DuplicateFinder]:
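For orientation (not from the commit): any object with a weight and a run method that accepts the progress callback satisfies the updated DuplicateFinder protocol. A hedged sketch of a conforming finder; the class, its placeholder data, and the plain-tuple return annotation are illustrative — the real run yields DuplicateKeyPair values.

```python
from collections.abc import Callable, Generator


class ExactMatchFinder:
    """Illustrative finder satisfying the updated DuplicateFinder protocol."""

    weight = 1

    def run(
        self, tracker: Callable[[int], None]
    ) -> Generator[tuple[str, str, float], None, None]:
        pairs = [("ref-1", "ref-2", 1.0)]  # placeholder duplicate key pairs
        for i, pair in enumerate(pairs, 1):
            yield pair
            tracker(100 * i // len(pairs))  # report percentage of pairs processed
```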
@@ -0,0 +1,18 @@
# Generated by Django 5.0.7 on 2024-11-04 22:42

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ("api", "0010_dedupjob"),
    ]

    operations = [
        migrations.AddField(
            model_name="dedupjob",
            name="progress",
            field=models.IntegerField(default=0),
        ),
    ]
1 change: 1 addition & 0 deletions src/hope_dedup_engine/apps/api/models/jobs.py
@@ -12,6 +12,7 @@ class DedupJob(CeleryTaskModel):
"DeduplicationSet", on_delete=models.CASCADE, related_name="jobs"
)
serialized_lock = models.CharField(max_length=128, null=True, editable=False)
progress = models.IntegerField(default=0)

    celery_task_name = (
        "hope_dedup_engine.apps.api.deduplication.process.find_duplicates"
50 changes: 50 additions & 0 deletions src/hope_dedup_engine/apps/api/utils/progress.py
@@ -0,0 +1,50 @@
from collections.abc import Callable, Generator
from functools import partial

STEP = 10


def callback_filter(
    callback: Callable[[int], None], step: int
) -> Callable[[int], None]:
    previous_callback_value = -1

    def update(progress: int) -> None:
        nonlocal previous_callback_value
        if (callback_value := progress // step * step) != previous_callback_value:
            callback(callback_value)
            previous_callback_value = callback_value

    return update


def track_progress(
    callback: Callable[[int], None], send_zero: bool = True
) -> Callable[[int], None]:
    update = callback_filter(callback, STEP)

    if send_zero:
        update(0)

    return update


def track_progress_multi(
    callback: Callable[[int], None]
) -> Generator[Callable[[int], None], None, None]:
    progress = []

    update = callback_filter(callback, STEP)

    def individual_callback(value: int, index: int) -> None:
        progress[index] = value
        update(sum(progress) // len(progress))

    send_zero = True
    while True:
        progress.append(0)
        yield track_progress(
            callback=partial(individual_callback, index=len(progress) - 1),
            send_zero=send_zero,
        )
        send_zero = False
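To make the composition above concrete (not part of the commit): callback_filter only forwards a value when it crosses into a new STEP-sized bucket, track_progress optionally seeds an initial 0, and track_progress_multi hands out one such callback per consumer while reporting the integer average of their latest values. A stand-alone sketch, assuming the package is importable:

```python
from hope_dedup_engine.apps.api.utils.progress import track_progress_multi

reports: list[int] = []
trackers = track_progress_multi(reports.append)

first = next(trackers)   # creates slot 0 and immediately reports 0
second = next(trackers)  # creates slot 1; the initial 0 is sent only once

first(100)   # (100 + 0) // 2 -> 50
second(100)  # (100 + 100) // 2 -> 100

assert reports == [0, 50, 100]
```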
@@ -1,5 +1,6 @@
import logging
import os
from collections.abc import Callable
from itertools import combinations
from typing import Any, Generator

@@ -107,7 +108,9 @@ def _existed_images_name(self) -> list[str]:
        )
        return filenames

    def find_duplicates(self) -> Generator[tuple[str, str, float], None, None]:
    def find_duplicates(
        self, tracker: Callable[[int], None] | None = None
    ) -> Generator[tuple[str, str, float], None, None]:
        """
        Finds duplicate images based on facial encodings and yields pairs of image paths with their minimum distance.
@@ -125,7 +128,8 @@ def find_duplicates(self) -> Generator[tuple[str, str, float], None, None]:
            existed_images_name = self._existed_images_name()
            encodings_all = self._load_encodings_all()

            for path1, path2 in combinations(existed_images_name, 2):
            total_pairs = (n := len(existed_images_name)) * (n - 1) // 2
            for i, (path1, path2) in enumerate(combinations(existed_images_name, 2), 1):
                encodings1 = encodings_all.get(path1)
                encodings2 = encodings_all.get(path2)
                if encodings1 is None or encodings2 is None:
@@ -144,6 +148,9 @@ def find_duplicates(self) -> Generator[tuple[str, str, float], None, None]:
                ):
                    yield (path1, path2, round(min_distance, 5))

                if tracker:
                    tracker(100 * i // total_pairs)

        except Exception as e:
            self.logger.exception(
                "Error finding duplicates for images %s", self.filenames
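Aside (not from the commit): the detector's percentage is driven by the number of unordered image pairs — n images give n * (n - 1) // 2 comparisons, and after the i-th pair the reported value is 100 * i // total_pairs. A quick stand-alone check of that arithmetic with illustrative filenames:

```python
from itertools import combinations

images = ["a.jpg", "b.jpg", "c.jpg", "d.jpg", "e.jpg"]

n = len(images)
total_pairs = n * (n - 1) // 2  # 5 * 4 // 2 == 10 comparisons

reported = [
    100 * i // total_pairs
    for i, _pair in enumerate(combinations(images, 2), 1)
]

# Progress climbs in integer steps and ends at exactly 100%.
assert reported == [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
```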
10 changes: 10 additions & 0 deletions tests/api/test_utils.py
@@ -8,6 +8,7 @@
    REQUEST_TIMEOUT,
    send_notification,
)
from hope_dedup_engine.apps.api.utils.progress import callback_filter


@fixture
@@ -44,3 +45,12 @@ def test_exception_is_sent_to_sentry(
    requests_get.side_effect = exception
    send_notification("https://example.com")
    sentry_sdk_capture_exception.assert_called_once_with(exception)


def test_callback_filter() -> None:
    step = 10
    values = []
    update = callback_filter(lambda x: values.append(x), step)
    for i in range(1, 101):
        update(i)
    assert values == list(range(0, 101, step))
