Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger Geopoint ES Index on Geospatial Feature Flag Enable #35126

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4b32aec
extend task tracker to handle messages
zandre-eng Sep 12, 2024
6280c17
refactor index command functions
zandre-eng Sep 12, 2024
7f52f0e
helper func to get celery task tracker
zandre-eng Sep 12, 2024
2184e76
start celery task to index docs on enabling feature flag
zandre-eng Sep 12, 2024
d7abcee
send task message to front-end
zandre-eng Sep 12, 2024
5409b02
display task message
zandre-eng Sep 12, 2024
177b1b3
unit tests
zandre-eng Sep 13, 2024
9deab96
setting for max index doc limit
zandre-eng Sep 16, 2024
b647463
error status for task helper
zandre-eng Sep 16, 2024
196ffd3
simplify tracker keys
zandre-eng Sep 17, 2024
6906d82
use progress instead of messages
zandre-eng Sep 17, 2024
ca4ad45
update unit tests
zandre-eng Sep 17, 2024
128757f
fix incorrect unit test assert
zandre-eng Sep 17, 2024
edb3a61
use local references
zandre-eng Sep 20, 2024
a4c7bb5
remove unused reference
zandre-eng Sep 20, 2024
707a97d
increase timeouts
zandre-eng Sep 20, 2024
ded63ee
keep track of error slug to show different messages
zandre-eng Sep 23, 2024
a161b10
move function
zandre-eng Oct 1, 2024
a030555
Merge remote-tracking branch 'origin/ze/trigger-es-index-geospatial-e…
ajeety4 Oct 9, 2024
29ec236
Merge branch 'master' into ze/trigger-es-index-geospatial-enable
zandre-eng Oct 17, 2024
037e5f2
Merge branch 'master' into ze/trigger-es-index-geospatial-enable
zandre-eng Oct 21, 2024
0724d43
fix import
zandre-eng Oct 21, 2024
d96c1d4
add notify_exception for processing failure
zandre-eng Oct 24, 2024
e1d82cc
make progress output optional
zandre-eng Oct 30, 2024
5137009
use geospatial queue
zandre-eng Nov 6, 2024
dd28ee8
account for batches that are smaller than query limit
zandre-eng Nov 6, 2024
1581ca5
add offset for query
zandre-eng Nov 6, 2024
cbe08cf
Merge branch 'master' into ze/trigger-es-index-geospatial-enable
zandre-eng Nov 6, 2024
e1ef27a
remove refreshing index
zandre-eng Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions corehq/apps/geospatial/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,4 @@

# Case property to identify cases assigned through disbursement on the Case Management Page
ASSIGNED_VIA_DISBURSEMENT_CASE_PROPERTY = 'commcare_assigned_via_disbursement'
INDEX_ES_TASK_HELPER_BASE_KEY = 'geo_cases_index_cases'
40 changes: 37 additions & 3 deletions corehq/apps/geospatial/es.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from math import ceil

from corehq.apps.case_search.const import CASE_PROPERTIES_PATH
from corehq.apps.es import filters
from corehq.apps.es import filters, queries, CaseSearchES
from corehq.apps.es.aggregations import (
FilterAggregation,
GeoBoundsAggregation,
GeohashGridAggregation,
NestedAggregation,
)
from corehq.apps.es.case_search import PROPERTY_GEOPOINT_VALUE, PROPERTY_KEY
from corehq.apps.es.case_search import (
CASE_PROPERTIES_PATH,
PROPERTY_GEOPOINT_VALUE,
PROPERTY_KEY,
)
from corehq.apps.geospatial.const import MAX_GEOHASH_DOC_COUNT

CASE_PROPERTIES_AGG = 'case_properties'
Expand Down Expand Up @@ -131,3 +134,34 @@ def mid(lower, upper):
"""
assert lower <= upper
return ceil(lower + (upper - lower) / 2)


def case_query_for_missing_geopoint_val(domain, geo_case_property, case_type=None, size=None, offset=0):
    """
    Build a case search query for cases in ``domain`` whose
    ``geo_case_property`` has no 'geopoint_value'.

    :param domain: domain the query is restricted to
    :param geo_case_property: name of the case property to check
    :param case_type: optional case type to restrict the query to
    :param size: optional maximum number of results; not applied when falsy
    :param offset: starting position passed to the query's ``start()``
    :returns: the query, sorted on 'opened_on'
    """
    query = (
        CaseSearchES()
        .domain(domain)
        .filter(_geopoint_value_missing_for_property(geo_case_property))
    )
    if case_type:
        query = query.case_type(case_type)
    # As with case_type() and size(), the builder calls hand back the updated
    # query, so the result has to be kept; otherwise the sort order and the
    # offset never reach the query that is returned.
    query = query.sort('opened_on').start(offset)
    if size:
        query = query.size(size)
    return query


def _geopoint_value_missing_for_property(geo_case_property_name):
    """
    Nested query matching docs in which the case property named
    ``geo_case_property_name`` has no 'geopoint_value'.
    """
    everything = queries.match_all()
    property_without_geopoint = filters.AND(
        filters.term(PROPERTY_KEY, geo_case_property_name),
        filters.missing(PROPERTY_GEOPOINT_VALUE),
    )
    inner_query = queries.filtered(everything, property_without_geopoint)
    return queries.nested(CASE_PROPERTIES_PATH, inner_query)
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@

from dimagi.utils.chunked import chunked

from corehq.apps.es import CaseSearchES, case_search_adapter, filters, queries
from corehq.apps.es.case_search import (
CASE_PROPERTIES_PATH,
PROPERTY_GEOPOINT_VALUE,
PROPERTY_KEY,
)
from corehq.apps.es.client import manager
from corehq.apps.es import case_search_adapter
from corehq.apps.geospatial.es import case_query_for_missing_geopoint_val
from corehq.apps.geospatial.utils import get_geo_case_property
from corehq.form_processor.models import CommCareCase
from corehq.util.log import with_progress_bar
Expand Down Expand Up @@ -38,51 +33,35 @@ def handle(self, *args, **options):

def index_case_docs(domain, query_limit=DEFAULT_QUERY_LIMIT, chunk_size=DEFAULT_CHUNK_SIZE, case_type=None):
geo_case_property = get_geo_case_property(domain)
query = _es_case_query(domain, geo_case_property, case_type)
query = case_query_for_missing_geopoint_val(domain, geo_case_property, case_type)
count = query.count()
print(f'{count} case(s) to process')
batch_count = 1
if query_limit:
batch_count = math.ceil(count / query_limit)
batch_count = get_batch_count(count, query_limit)
print(f"Cases will be processed in {batch_count} batches")
for i in range(batch_count):
print(f'Processing {i+1}/{batch_count}')
query = _es_case_query(domain, geo_case_property, case_type, size=query_limit)
case_ids = query.get_ids()
_index_case_ids(domain, case_ids, chunk_size)
process_batch(domain, geo_case_property, case_type, query_limit, chunk_size, with_progress=True)


def _index_case_ids(domain, case_ids, chunk_size):
for case_id_chunk in chunked(with_progress_bar(case_ids), chunk_size):
case_chunk = CommCareCase.objects.get_cases(list(case_id_chunk), domain)
case_search_adapter.bulk_index(case_chunk)
manager.index_refresh(case_search_adapter.index_name)
def get_batch_count(doc_count, query_limit):
    """
    Return how many batches are needed to cover ``doc_count`` docs when each
    batch holds at most ``query_limit`` docs.

    A falsy ``query_limit`` means everything is handled in a single batch.
    """
    return math.ceil(doc_count / query_limit) if query_limit else 1


def _es_case_query(domain, geo_case_property, case_type=None, size=None):
query = (
CaseSearchES()
.domain(domain)
.filter(_geopoint_value_missing_for_property(geo_case_property))
)
if case_type:
query = query.case_type(case_type)
if size:
query = query.size(size)
return query


def _geopoint_value_missing_for_property(geo_case_property_name):
"""
Query to find docs with missing 'geopoint_value' for the given case property.
"""
return queries.nested(
CASE_PROPERTIES_PATH,
queries.filtered(
queries.match_all(),
filters.AND(
filters.term(PROPERTY_KEY, geo_case_property_name),
filters.missing(PROPERTY_GEOPOINT_VALUE)
)
)
def process_batch(domain, geo_case_property, case_type, query_limit, chunk_size, with_progress=False, offset=0):
    """
    Fetch the IDs of one batch of cases that are missing a geopoint value for
    ``geo_case_property`` and index those cases.

    ``query_limit`` and ``offset`` are forwarded to the query as its size and
    offset; ``chunk_size`` and ``with_progress`` are forwarded to the indexing
    helper.
    """
    batch_case_ids = case_query_for_missing_geopoint_val(
        domain,
        geo_case_property,
        case_type,
        size=query_limit,
        offset=offset,
    ).get_ids()
    _index_case_ids(domain, batch_case_ids, chunk_size, with_progress)


def _index_case_ids(domain, case_ids, chunk_size, with_progress):
    """
    Load the cases for ``case_ids`` in chunks of ``chunk_size`` and bulk index
    each chunk through the case search adapter.

    When ``with_progress`` is truthy the IDs are wrapped in a progress bar.
    """
    id_source = with_progress_bar(case_ids) if with_progress else case_ids
    for id_chunk in chunked(id_source, chunk_size):
        cases = CommCareCase.objects.get_cases(list(id_chunk), domain)
        case_search_adapter.bulk_index(cases)
4 changes: 4 additions & 0 deletions corehq/apps/geospatial/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from corehq.util.quickcache import quickcache

from .dispatchers import CaseManagementMapDispatcher
from corehq.apps.geospatial.const import INDEX_ES_TASK_HELPER_BASE_KEY
from .es import (
BUCKET_CASES_AGG,
CASE_PROPERTIES_AGG,
Expand All @@ -38,6 +39,7 @@
geojson_to_es_geoshape,
get_geo_case_property,
validate_geometry,
get_celery_task_tracker,
)


Expand All @@ -59,12 +61,14 @@ class BaseCaseMapReport(ProjectReport, CaseListMixin, XpathCaseSearchFilterMixin
def template_context(self):
# Whatever is specified here can be accessed through initial_page_data
context = super(BaseCaseMapReport, self).template_context
celery_task_tracker = get_celery_task_tracker(self.domain, task_slug=INDEX_ES_TASK_HELPER_BASE_KEY)
context.update({
'mapbox_access_token': settings.MAPBOX_ACCESS_TOKEN,
'saved_polygons': [
{'id': p.id, 'name': p.name, 'geo_json': p.geo_json}
for p in GeoPolygon.objects.filter(domain=self.domain).all()
],
'task_status': celery_task_tracker.get_status(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥇

})
return context

Expand Down
5 changes: 2 additions & 3 deletions corehq/apps/geospatial/static/geospatial/js/models.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
'use strict';

Check warning on line 1 in corehq/apps/geospatial/static/geospatial/js/models.js

View workflow job for this annotation

GitHub Actions / Lint Javascript

'use strict' is unnecessary inside of modules
hqDefine('geospatial/js/models', [
'jquery',
'knockout',
Expand All @@ -19,8 +19,6 @@
const SELECTED_FEATURE_ID_QUERY_PARAM = 'selected_feature_id';
const DEFAULT_CENTER_COORD = [-20.0, -0.0];
const DISBURSEMENT_LAYER_PREFIX = 'route-';
const saveGeoPolygonUrl = initialPageData.reverse('geo_polygons');
const reassignCasesUrl = initialPageData.reverse('reassign_cases');
const unexpectedErrorMessage = gettext(
"Oops! Something went wrong!" +
" Please report an issue if the problem persists."
Expand Down Expand Up @@ -808,6 +806,7 @@
if (!validateSavedPolygonName(name)) {
return;
}
const saveGeoPolygonUrl = initialPageData.reverse('geo_polygons');

if (!clearDisbursementBeforeProceeding()) {
return;
Expand Down Expand Up @@ -1062,7 +1061,7 @@
'case_id_to_owner_id': caseIdToOwnerId,
'include_related_cases': self.includeRelatedCases(),
};

const reassignCasesUrl = initialPageData.reverse('reassign_cases');
self.assignmentAjaxInProgress(true);
$.ajax({
type: 'post',
Expand Down
55 changes: 54 additions & 1 deletion corehq/apps/geospatial/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
from dimagi.utils.logging import notify_exception

from corehq.apps.celery import task
from corehq.apps.geospatial.const import INDEX_ES_TASK_HELPER_BASE_KEY
from corehq.apps.geospatial.es import case_query_for_missing_geopoint_val
from corehq.apps.geospatial.utils import (
CeleryTaskTracker,
get_celery_task_tracker,
get_flag_assigned_cases_config,
CeleryTaskTracker,
update_cases_owner,
get_geo_case_property,
)
from corehq.apps.geospatial.management.commands.index_geolocation_case_properties import (
Copy link
Contributor

@kaapstorm kaapstorm Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It feels awkward for the tasks module to be importing from a management command, instead of the other way round (or both importing from somewhere else, but I'd prefer not to throw everything in the utils module, like that second drawer in the kitchen that somehow ends up not only with tongs and skewers, but also clothes pegs, and screws, and one rusty paper clip).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a good alternative would be to move the es_case_query function to the es.py file, as I feel it would fit well there. It could then be renamed to something like es_case_query_for_missing_geopoint_val.

Alternatively, I can also simply create a file in a new "utils" directory to contain just the above helper function. I do feel the first option makes sense enough and is a bit more straightforward, but happy to go in this direction as well if you think it sounds more suitable @kaapstorm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The es module sounds ideal for es_case_query! Yeah, I agree, the function is worth renaming too, and you could drop the "es_" prefix because we would know from its module that it would be an Elasticsearch query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in a161b10.

get_batch_count,
process_batch,
DEFAULT_QUERY_LIMIT,
DEFAULT_CHUNK_SIZE,
)

from settings import MAX_GEOSPATIAL_INDEX_DOC_LIMIT


@task(queue="background_queue", ignore_result=True)
Expand All @@ -14,3 +28,42 @@ def geo_cases_reassignment_update_owners(domain, case_owner_updates_dict, task_k
finally:
celery_task_tracker = CeleryTaskTracker(task_key)
celery_task_tracker.mark_completed()


@task(queue='geospatial_queue', ignore_result=True)
def index_es_docs_with_location_props(domain):
celery_task_tracker = get_celery_task_tracker(domain, INDEX_ES_TASK_HELPER_BASE_KEY)
if celery_task_tracker.is_active():
return

geo_case_prop = get_geo_case_property(domain)
query = case_query_for_missing_geopoint_val(domain, geo_case_prop)
doc_count = query.count()
if doc_count > MAX_GEOSPATIAL_INDEX_DOC_LIMIT:
celery_task_tracker.mark_as_error(error_slug='TOO_MANY_CASES')
return

celery_task_tracker.mark_requested()
batch_count = get_batch_count(doc_count, DEFAULT_QUERY_LIMIT)
try:
for i in range(batch_count):
docs_left = doc_count - (DEFAULT_QUERY_LIMIT * i)
limit = min(DEFAULT_QUERY_LIMIT, docs_left)
process_batch(
domain,
geo_case_prop,
case_type=None,
query_limit=limit,
chunk_size=DEFAULT_CHUNK_SIZE,
offset=i * DEFAULT_QUERY_LIMIT,
)
celery_task_tracker.update_progress(current=i + 1, total=batch_count)
except Exception as e:
celery_task_tracker.mark_as_error(error_slug='CELERY')
notify_exception(
None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: how do we ensure this is followed up on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The domain will receive an error alert if the Celery task fails, asking them to reach out to support. Sending this off to Sentry will allow us to have a bit more insight other than a generic CELERY_ERROR message to understand what could have gone wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, that works 👍

'Something went wrong with index_es_docs_with_location_props()',
details={'error': str(e)}
)
else:
celery_task_tracker.mark_completed()
1 change: 1 addition & 0 deletions corehq/apps/geospatial/templates/case_grouping_map.html
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{% load hq_shared_tags %}

{% block reportcontent %}
{% include 'geospatial/partials/index_alert.html' %}
<div class="row panel">
<div class="col col-md-2">
<span id="lock-groups-controls">
Expand Down
1 change: 1 addition & 0 deletions corehq/apps/geospatial/templates/case_management.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{% load i18n %}

{% block reportcontent %}
{% include 'geospatial/partials/index_alert.html' %}
<div class="panel panel-default" id="user-filters-panel">
<div class="panel-body collapse in" aria-expanded="true">
<legend>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{% load i18n %}

{% if task_status.status == 'ERROR' %}
<div class="alert alert-danger">
<p>
{% if task_status.error_slug == 'TOO_MANY_CASES' %}
{% blocktrans %}
This domain contains too many cases and so they will not be made available
for use by this feature. Please reach out to support.
{% endblocktrans %}
{% elif task_status.error_slug == 'CELERY' %}
{% blocktrans %}
Oops! Something went wrong while attempting to make cases ready for use
by this feature. Please reach out to support.
{% endblocktrans %}
{% endif %}
</p>
</div>
{% elif task_status.status == 'ACTIVE' %}
<div class="alert alert-info">
<p>
{% blocktrans %}
Cases are being made ready for use by this feature. Please be patient.
{% endblocktrans %}
({{ task_status.progress}}%)
</p>
</div>
{% endif %}
1 change: 1 addition & 0 deletions corehq/apps/geospatial/templates/geospatial/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
{% initial_page_data 'road_network_algorithm_slug' road_network_algorithm_slug %}

<form id="geospatial-config-form" class="form-horizontal disable-on-submit ko-template" method="post">
{% include 'geospatial/partials/index_alert.html' %}
{% crispy form %}
</form>
{% endblock %}
1 change: 1 addition & 0 deletions corehq/apps/geospatial/templates/gps_capture_view.html
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
{% initial_page_data 'case_types_with_gps' case_types_with_gps %}
{% initial_page_data 'couch_user_username' couch_user_username %}

{% include 'geospatial/partials/index_alert.html' %}
<ul id="tabs-list" class="nav nav-tabs">
<li data-bind="click: onclickAction" class="active"><a data-toggle="tab" href="#tabs-cases">{% trans 'Update Case Data' %}</a></li>
<li data-bind="click: onclickAction"><a data-toggle="tab" href="#tabs-users">{% trans 'Update Mobile Worker Data' %}</a></li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

from corehq.apps.es import CaseSearchES
from corehq.apps.es.case_search import case_search_adapter
from corehq.apps.es.client import manager
from corehq.apps.es.tests.utils import es_test
from corehq.apps.geospatial.management.commands.index_geolocation_case_properties import (
_es_case_query,
index_case_docs,
)
from corehq.apps.geospatial.es import case_query_for_missing_geopoint_val
from corehq.apps.geospatial.management.commands.index_geolocation_case_properties import index_case_docs
from corehq.apps.geospatial.models import GeoConfig


Expand Down Expand Up @@ -50,13 +49,14 @@ def setUpClass(cls):
cls.addClassCleanup(cls.geo_config.delete)

def test_has_cases_to_index(self):
query = _es_case_query(self.domain, self.gps_prop_name, self.primary_case_type)
query = case_query_for_missing_geopoint_val(self.domain, self.gps_prop_name, self.primary_case_type)
case_count = query.count()
self.assertEqual(case_count, 2)

def test_cases_correctly_indexed(self):
index_case_docs(self.domain, case_type=self.secondary_case_type)
query = _es_case_query(self.domain, self.gps_prop_name, self.secondary_case_type)
manager.index_refresh(case_search_adapter.index_name)
query = case_query_for_missing_geopoint_val(self.domain, self.gps_prop_name, self.secondary_case_type)
case_count = query.count()
self.assertEqual(case_count, 0)
doc = (
Expand Down
Loading
Loading