Skip to content

Commit

Permalink
Make analytics report update job scheduling more efficient (#7576)
Browse files Browse the repository at this point in the history
Currently, `schedule_analytics_report_autoupdate_job` attempts to
debounce job scheduling by examining existing jobs before scheduling a
new one. Unfortunately, the `scheduler.get_jobs` function, which it uses
for this purpose, scales poorly. Not only does it fetch IDs of all
scheduled jobs (and not just ones related to the current object), but it
then fetches information about every job, one by one. The current logic
doesn't even need this information, but RQ Scheduler provides no method
to get just the IDs.

Replace the current logic with a new lightweight approach that uses a
custom Redis key to block scheduling of additional jobs until the
already scheduled job is due to start.
  • Loading branch information
SpecLad authored Mar 11, 2024
1 parent cd5cd44 commit 009f9f8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Fixed

- Made analytics report update job scheduling more efficient
(<https://github.com/opencv/cvat/pull/7576>)
68 changes: 22 additions & 46 deletions cvat/apps/analytics_report/report/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,20 @@ def _get_scheduler(self):
def _get_queue(self):
return django_rq.get_queue(settings.CVAT_QUEUES.ANALYTICS_REPORTS.value)

def _make_queue_job_prefix(self, obj) -> str:
def _make_queue_job_id_base(self, obj) -> str:
if isinstance(obj, Task):
return f"{self._QUEUE_JOB_PREFIX_TASK}{obj.id}-"
return f"{self._QUEUE_JOB_PREFIX_TASK}{obj.id}"
else:
return f"{self._QUEUE_JOB_PREFIX_PROJECT}{obj.id}-"
return f"{self._QUEUE_JOB_PREFIX_PROJECT}{obj.id}"

def _make_custom_analytics_check_job_id(self) -> str:
return uuid4().hex

def _make_initial_queue_job_id(self, obj) -> str:
return f"{self._make_queue_job_prefix(obj)}initial"
def _make_queue_job_id(self, obj, start_time: timezone.datetime) -> str:
return f"{self._make_queue_job_id_base(obj)}-{start_time.timestamp()}"

def _make_regular_queue_job_id(self, obj, start_time: timezone.datetime) -> str:
return f"{self._make_queue_job_prefix(obj)}{start_time.timestamp()}"
def _make_autoupdate_blocker_key(self, obj) -> str:
return f"cvat:analytics:autoupdate-blocker:{self._make_queue_job_id_base(obj)}"

@classmethod
def _get_last_report_time(cls, obj):
Expand All @@ -90,39 +90,6 @@ def _get_last_report_time(cls, obj):
except ObjectDoesNotExist:
return None

def _find_next_job_id(self, existing_job_ids, obj, *, now) -> str:
job_id_prefix = self._make_queue_job_prefix(obj)

def _get_timestamp(job_id: str) -> timezone.datetime:
job_timestamp = job_id.split(job_id_prefix, maxsplit=1)[-1]
if job_timestamp == "initial":
return timezone.datetime.min.replace(tzinfo=timezone.utc)
else:
return timezone.datetime.fromtimestamp(float(job_timestamp), tz=timezone.utc)

max_job_id = max(
(j for j in existing_job_ids if j.startswith(job_id_prefix)),
key=_get_timestamp,
default=None,
)
max_timestamp = _get_timestamp(max_job_id) if max_job_id else None

last_update_time = self._get_last_report_time(obj)
if last_update_time is None:
# Report has never been computed, is queued, or is being computed
queue_job_id = self._make_initial_queue_job_id(obj)
elif max_timestamp is not None and now < max_timestamp:
# Reuse the existing next job
queue_job_id = max_job_id
else:
# Add an updating job in the queue in the next time frame
delay = self._get_analytics_check_job_delay()
intervals = max(1, 1 + (now - last_update_time) // delay)
next_update_time = last_update_time + delay * intervals
queue_job_id = self._make_regular_queue_job_id(obj, next_update_time)

return queue_job_id

class AnalyticsReportsNotAvailable(Exception):
pass

Expand All @@ -133,9 +100,6 @@ def schedule_analytics_report_autoupdate_job(self, *, job=None, task=None, proje
delay = self._get_analytics_check_job_delay()
next_job_time = now.utcnow() + delay

scheduler = self._get_scheduler()
existing_job_ids = set(j.id for j in scheduler.get_jobs(until=next_job_time))

target_obj = None
cvat_project_id = None
cvat_task_id = None
Expand All @@ -157,16 +121,28 @@ def schedule_analytics_report_autoupdate_job(self, *, job=None, task=None, proje
target_obj = project
cvat_project_id = project.id

queue_job_id = self._find_next_job_id(existing_job_ids, target_obj, now=now)
if queue_job_id not in existing_job_ids:
scheduler.enqueue_at(
with django_rq.get_connection(settings.CVAT_QUEUES.ANALYTICS_REPORTS.value) as connection:
# The blocker key is used to avoid scheduling a report update job
# for every single change. The first time this method is called
# for a given object, we schedule the job and create a blocker
# that expires at the same time as the job is supposed to start.
# Until the blocker expires, we don't schedule any more jobs.
blocker_key = self._make_autoupdate_blocker_key(target_obj)
if connection.exists(blocker_key):
return

queue_job_id = self._make_queue_job_id(target_obj, next_job_time)

self._get_scheduler().enqueue_at(
next_job_time,
self._check_analytics_report,
cvat_task_id=cvat_task_id,
cvat_project_id=cvat_project_id,
job_id=queue_job_id,
)

connection.set(blocker_key, queue_job_id, exat=next_job_time)

def schedule_analytics_check_job(self, *, job=None, task=None, project=None, user_id):
rq_id = self._make_custom_analytics_check_job_id()

Expand Down

0 comments on commit 009f9f8

Please sign in to comment.