Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger Geopoint ES Index on Geospatial Feature Flag Enable #35126

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4b32aec
extend task tracker to handle messages
zandre-eng Sep 12, 2024
6280c17
refactor index command functions
zandre-eng Sep 12, 2024
7f52f0e
helper func to get celery task tracker
zandre-eng Sep 12, 2024
2184e76
start celery task to index docs on enabling feature flag
zandre-eng Sep 12, 2024
d7abcee
send task message to front-end
zandre-eng Sep 12, 2024
5409b02
display task message
zandre-eng Sep 12, 2024
177b1b3
unit tests
zandre-eng Sep 13, 2024
9deab96
setting for max index doc limit
zandre-eng Sep 16, 2024
b647463
error status for task helper
zandre-eng Sep 16, 2024
196ffd3
simplify tracker keys
zandre-eng Sep 17, 2024
6906d82
use progress instead of messages
zandre-eng Sep 17, 2024
ca4ad45
update unit tests
zandre-eng Sep 17, 2024
128757f
fix incorrect unit test assert
zandre-eng Sep 17, 2024
edb3a61
use local references
zandre-eng Sep 20, 2024
a4c7bb5
remove unused reference
zandre-eng Sep 20, 2024
707a97d
increase timeouts
zandre-eng Sep 20, 2024
ded63ee
keep track of error slug to show different messages
zandre-eng Sep 23, 2024
a161b10
move function
zandre-eng Oct 1, 2024
a030555
Merge remote-tracking branch 'origin/ze/trigger-es-index-geospatial-e…
ajeety4 Oct 9, 2024
29ec236
Merge branch 'master' into ze/trigger-es-index-geospatial-enable
zandre-eng Oct 17, 2024
037e5f2
Merge branch 'master' into ze/trigger-es-index-geospatial-enable
zandre-eng Oct 21, 2024
0724d43
fix import
zandre-eng Oct 21, 2024
d96c1d4
add notify_exception for processing failure
zandre-eng Oct 24, 2024
e1d82cc
make progress output optional
zandre-eng Oct 30, 2024
5137009
use geospatial queue
zandre-eng Nov 6, 2024
dd28ee8
account for batches that are smaller than query limit
zandre-eng Nov 6, 2024
1581ca5
add offset for query
zandre-eng Nov 6, 2024
cbe08cf
Merge branch 'master' into ze/trigger-es-index-geospatial-enable
zandre-eng Nov 6, 2024
e1ef27a
remove refreshing index
zandre-eng Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions corehq/apps/geospatial/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,5 @@
}
}
}

# Base redis key for tracking the geopoint ES indexing celery task; combined
# with the domain name to build per-domain task/message keys (see
# get_celery_task_tracker in utils.py).
INDEX_ES_TASK_HELPER_BASE_KEY = 'geo_cases_index_cases'
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,23 @@ def index_case_docs(domain, query_limit=DEFAULT_QUERY_LIMIT, chunk_size=DEFAULT_
query = _es_case_query(domain, geo_case_property, case_type)
count = query.count()
print(f'{count} case(s) to process')
batch_count = 1
if query_limit:
batch_count = math.ceil(count / query_limit)
batch_count = get_batch_count(count, query_limit)
print(f"Cases will be processed in {batch_count} batches")
for i in range(batch_count):
print(f'Processing {i+1}/{batch_count}')
query = _es_case_query(domain, geo_case_property, case_type, size=query_limit)
case_ids = query.get_ids()
_index_case_ids(domain, case_ids, chunk_size)
process_batch(domain, geo_case_property, case_type, query_limit, chunk_size)


def get_batch_count(doc_count, query_limit):
    """Return how many batches are needed to process ``doc_count`` docs,
    ``query_limit`` docs at a time.

    A falsy ``query_limit`` (0 or None) means "no limit": everything is
    processed as a single batch.
    """
    return math.ceil(doc_count / query_limit) if query_limit else 1


def process_batch(domain, geo_case_property, case_type, query_limit, chunk_size):
    """Fetch one batch of case ids (cases missing the geopoint value) from
    Elasticsearch and index them in chunks of ``chunk_size``."""
    batch_query = _es_case_query(domain, geo_case_property, case_type, size=query_limit)
    _index_case_ids(domain, batch_query.get_ids(), chunk_size)


def _index_case_ids(domain, case_ids, chunk_size):
Expand Down
56 changes: 55 additions & 1 deletion corehq/apps/geospatial/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
from django.utils.translation import gettext as _

from corehq.util.decorators import serial_task

from corehq.apps.celery import task
from corehq.apps.geospatial.utils import CeleryTaskTracker, update_cases_owner
from corehq.apps.geospatial.const import INDEX_ES_TASK_HELPER_BASE_KEY
from corehq.apps.geospatial.utils import (
get_celery_task_tracker,
CeleryTaskTracker,
update_cases_owner,
get_geo_case_property,
)
from corehq.apps.geospatial.management.commands.index_geolocation_case_properties import (
Copy link
Contributor

@kaapstorm kaapstorm Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It feels awkward for the tasks module to be importing from a management command, instead of the other way round (or both importing from somewhere else, but I'd prefer not to throw everything in the utils module, like that second drawer in the kitchen that somehow ends up not only with tongs and skewers, but also clothes pegs, and screws, and one rusty paper clip).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a good alternative would be to move the es_case_query function off to the es.py file as I feel it would fit well there. This can then be renamed to something like es_case_query_for_missing_geopoint_val or something to that effect.

Alternatively, I can also simply create a file in a new "utils" directory to contain just the above helper function. I do feel the first option makes sense enough and is a bit more straightforward, but happy to go in this direction as well if you think it sounds more suitable @kaapstorm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The es module sounds ideal for es_case_query! Yeah, I agree, the function is worth renaming too, and you could drop the "es_" prefix because we would know from its module that it would be an Elasticsearch query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in a161b10.

_es_case_query,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're importing a protected function somewhere other than a test, then it's possibly no longer being treated as protected and you should drop the leading underscore. -- This is a guideline, not a rule, so I'm not requesting a change, and I'll leave it to your discretion ... but interesting that the linter didn't flag this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I have always been a little unsure as to how strict we should be when it comes to marking functions in Python as private. Is having simply a single external reference enough justification, or do we need to start referencing it a few times before it becomes obvious that this is clearly a public function (I have seen plenty of examples in HQ that use both examples)? I suppose this is the potential downside of having it more as a guideline than a rule, it can be difficult to gauge where the line lies at times.

Thinking this through however, the former does make more sense, since even a single external reference clearly means it's not private/self-contained anymore. Given this, I agree it would make sense to drop the _ here.

get_batch_count,
process_batch,
DEFAULT_QUERY_LIMIT,
DEFAULT_CHUNK_SIZE,
)

from settings import MAX_GEOSPATIAL_INDEX_DOC_LIMIT


@task(queue="background_queue", ignore_result=True)
Expand All @@ -9,3 +28,38 @@ def geo_cases_reassignment_update_owners(domain, case_owner_updates_dict, task_k
finally:
celery_task_tracker = CeleryTaskTracker(task_key)
celery_task_tracker.mark_completed()


@serial_task('async-index-es-docs', timeout=30 * 60, queue='background_queue', ignore_result=True)
def index_es_docs_with_location_props(domain):
    """Index ES case docs for the domain's geo case property, in batches.

    Exits early if an indexing task is already active for the domain, or if
    the domain exceeds ``MAX_GEOSPATIAL_INDEX_DOC_LIMIT`` (in which case a
    message is stored via the task tracker for the front-end to display).
    Progress is reported through the tracker as each batch is processed.
    """
    celery_task_tracker = get_celery_task_tracker(domain, INDEX_ES_TASK_HELPER_BASE_KEY)
    if celery_task_tracker.is_active():
        return

    geo_case_prop = get_geo_case_property(domain)
    doc_count = _es_case_query(domain, geo_case_prop).count()
    if doc_count > MAX_GEOSPATIAL_INDEX_DOC_LIMIT:
        celery_task_tracker.set_message(
            _('This domain contains too many cases and so they will not be made available '
              'for use by this feature. Please reach out to support.')
        )
        return

    celery_task_tracker.mark_requested()
    batch_count = get_batch_count(doc_count, DEFAULT_QUERY_LIMIT)
    try:
        for i in range(batch_count):
            # Bug fix: the original used _(f'... ({progress}%)'). An f-string
            # is interpolated *before* the gettext call, so makemessages can
            # never extract the literal and no translation catalog entry can
            # ever match it. Translate a constant template and interpolate
            # afterwards instead. int() keeps the displayed percentage tidy.
            progress = int(i / batch_count * 100)
            celery_task_tracker.set_message(
                _('Cases are being made ready for use by this feature. '
                  'Please be patient. ({progress}%)').format(progress=progress)
            )
            process_batch(
                domain,
                geo_case_prop,
                case_type=None,
                query_limit=DEFAULT_QUERY_LIMIT,
                chunk_size=DEFAULT_CHUNK_SIZE,
            )
    finally:
        # Always clear the tracker so a failed run cannot leave the task
        # marked active and block retries. NOTE(review): this also marks
        # errored runs as completed — see PR discussion about surfacing an
        # error status instead.
        celery_task_tracker.mark_completed()
finally:
Copy link
Contributor

@ajeety4 ajeety4 Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking of a situation when an exception occurs while processing the cases. This would mark the task as completed in the tracker.
I think it would be good to handle this scenario.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the same logic as the task to async update owners. If we want to be a bit more cautious here, we could do a generic Exception catch, and then mark the tracker as having an error. I would need to slightly think through the tracker's current usage though, as we have no way of determining what error message to show when.

Copy link
Contributor

@ajeety4 ajeety4 Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is more of a safeguard thing. Good point about the task to async update owners. I think it would have been good to have the error marked and stored for that as well. I'm not sure what norm is followed in HQ, but a quick search shows that exceptions are handled on a case-by-case basis.

I would recommend it for this indexing task, considering that if it throws an exception, the pending cases would never be processed and would not be available for use by the feature.
Agreed, that the tracker would need to be updated to store error message as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, handling an exception here more gracefully would be a good idea. Thinking through this however, I'm a little unsure about trying to store a message in the redis cache again. We would need to store the raw string in redis and then translate it when rendering it on the front-end, and I'm not entirely confident or sure whether pulling a string from the redis cache into a variable and then trying to translate it would work correctly.

@ajeety4 What do you think of extending the status system to have the ability for custom error statuses instead? For the mark_as_error function we could pass in an optional slug to append to the end of the "ERROR" string (e.g. "ERROR_CELERY"). Having different error statuses would allow us to know which message to show on the front-end.

Copy link
Contributor

@ajeety4 ajeety4 Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely confident or sure whether pulling a string from the redis cache into a variable and then trying to translate it would work correctly.

Great catch. You are right on this, As per django docs,
The caveat with using variables or computed values, as in the previous two examples, is that Django’s translation-string-detecting utility, django-admin makemessages, won’t be able to find these strings

What do you think of extending the status system to have the ability for custom error statuses instead? For the mark_as_error function we could pass in an optional slug to append to the end of the "ERROR" string (e.g. "ERROR_CELERY")

This is a good idea. I feel like a cleaner approach would be to use a separate key error_slug instead of using the task_key while marking it is an error. This way it keeps the choices for the task_key predictable while giving the flexibility to the consumer to set error_slug of their choice.
That being said, I am good with initial approach if this makes things complicated.

Copy link
Contributor Author

@zandre-eng zandre-eng Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find! Doing message strings is definitely out then.

use a separate key error_slug instead of using the task_key while marking it is an error

Do you mean that the task_key would then only have "ACTIVE" or "ERROR" as its states and then we would keep the error_slug as a separate field, combining the two if the task_key is an error? Something like:

def get_status(self):
    status = self._client.get(self.task_key)
    if status == 'ERROR':
        status += f'_{self._client.get(self.error_slug)}'
    return {
        'status': status,
        'progress': self.get_progress(),
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, correct. What do you think of returning the error_slug as well, as shown below? This would then not require appending or stripping a prefix.

def get_status(self):
    status = self._client.get(self.task_key)
    return {
        'status': status,
        'error_slug': self._client.get(self.error_slug) if status == 'ERROR' else None,
        'progress': self.get_progress(),
    }


def mark_as_error(self, error_slug=None, timeout=ONE_DAY * 3):
    if error_slug:
        self._client.set(self.error_slug_key, error_slug)
    return self._client.set(self.task_key, 'ERROR', timeout=timeout)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea, it would then keep it completely clean from the task_key. This looks like quite a small lift, so I'll implement as such.

Copy link
Contributor Author

@zandre-eng zandre-eng Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in ded63ee.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3 Great discussion.

celery_task_tracker.mark_completed()
19 changes: 18 additions & 1 deletion corehq/apps/geospatial/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ def get_geo_user_property(domain):
return config.user_location_property_name


def get_celery_task_tracker(domain, base_key):
    """Build a CeleryTaskTracker whose redis keys are scoped to *domain*."""
    return CeleryTaskTracker(
        task_key=f'{base_key}_{domain}',
        message_key=f'{base_key}_message_{domain}',
    )


def _format_coordinates(lat, lon):
return f"{lat} {lon} 0.0 0.0"

Expand Down Expand Up @@ -221,8 +227,9 @@ class CeleryTaskTracker(object):
Simple Helper class using redis to track if a celery task was requested and is not completed yet.
"""

def __init__(self, task_key):
def __init__(self, task_key, message_key=None):
    # Redis client backing all task/message state for this tracker.
    self._client = get_redis_client()
    self.task_key = task_key
    # Optional; message methods are only meaningful when a key is supplied.
    self.message_key = message_key

def mark_requested(self, timeout=ONE_DAY):
Expand All @@ -234,4 +241,14 @@ def is_active(self):
return self._client.has_key(self.task_key)

def mark_completed(self):
    # Drop any stored progress/status message first so stale text is not
    # shown after the task flag is removed.
    self.clear_message()
    return self._client.delete(self.task_key)

def get_message(self):
    # Fetch the currently stored progress/status message for this task,
    # if any has been set.
    return self._client.get(self.message_key)

def set_message(self, message, timeout=ONE_DAY * 3):
    # Store a message for the front-end to display; expires after three
    # days by default unless cleared sooner.
    return self._client.set(self.message_key, message, timeout=timeout)

def clear_message(self):
    # Remove the stored message key from redis.
    return self._client.delete(self.message_key)
11 changes: 10 additions & 1 deletion corehq/toggles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2555,13 +2555,22 @@ def _handle_attendance_tracking_role(domain, is_enabled):
save_fn=_handle_attendance_tracking_role,
)


def _handle_geospatial_es_index(domain, is_enabled):
    """Toggle save_fn: kick off async geopoint indexing when the GEOSPATIAL
    flag is enabled for a domain. Disabling the flag triggers nothing.
    """
    # Bug fix: the task is defined in the tasks module, not es — importing it
    # from corehq.apps.geospatial.es would raise ImportError when the toggle
    # is saved. Imported lazily to avoid a circular import at module load.
    from corehq.apps.geospatial.tasks import index_es_docs_with_location_props

    if is_enabled:
        index_es_docs_with_location_props.delay(domain)


GEOSPATIAL = StaticToggle(
'geospatial',
'Allows access to GIS functionality',
TAG_SOLUTIONS_LIMITED,
namespaces=[NAMESPACE_DOMAIN],
description='Additional views will be added allowing for visually viewing '
'and assigning cases on a map.'
'and assigning cases on a map.',
save_fn=_handle_geospatial_es_index,

)

Expand Down