diff --git a/app/celery/reporting_tasks.py b/app/celery/reporting_tasks.py
index c7a58ce7dd..ed49df01b8 100644
--- a/app/celery/reporting_tasks.py
+++ b/app/celery/reporting_tasks.py
@@ -1,4 +1,5 @@
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
+from itertools import islice
 
 from flask import current_app
 from notifications_utils.statsd_decorators import statsd
@@ -12,6 +13,7 @@
     fetch_notification_status_for_day,
     update_fact_notification_status,
 )
+from app.models import Service
 
 
 @notify_celery.task(name="create-nightly-billing")
@@ -72,19 +74,38 @@ def create_nightly_notification_status(day_start=None):
 @notify_celery.task(name="create-nightly-notification-status-for-day")
 @statsd(namespace="tasks")
 def create_nightly_notification_status_for_day(process_day):
-    process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
-
-    start = datetime.utcnow()
-    transit_data = fetch_notification_status_for_day(process_day=process_day)
-    end = datetime.utcnow()
-    current_app.logger.info(
-        "create-nightly-notification-status-for-day {} fetched in {} seconds".format(process_day, (end - start).seconds)
-    )
+    """
+    This function gets all the service ids and fetches the notification status for the given day.
+    It does it in chunks of 20 service ids at a time.
 
-    update_fact_notification_status(transit_data, process_day)
+    Args:
+        process_day (_type_): datetime object
+    """
+    process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
+    service_ids = [x.id for x in Service.query.all()]
+    chunk_size = 20
+    iter_service_ids = iter(service_ids)
+
+    while True:
+        chunk = list(islice(iter_service_ids, chunk_size))
+
+        if not chunk:
+            current_app.logger.info(
+                "create-nightly-notification-status-for-day job completed for process_day {} on {}".format(
+                    process_day, datetime.now(timezone.utc).date()
+                )
+            )
+            break
+        start = datetime.now(timezone.utc)
+        transit_data = fetch_notification_status_for_day(process_day=process_day, service_ids=chunk)
+        end = datetime.now(timezone.utc)
+        current_app.logger.info(
+            "create-nightly-notification-status-for-day {} fetched in {} seconds".format(process_day, (end - start).seconds)
+        )
+        update_fact_notification_status(transit_data, process_day)
 
-    current_app.logger.info(
-        "create-nightly-notification-status-for-day task complete: {} rows updated for day: {}".format(
-            len(transit_data), process_day
+        current_app.logger.info(
+            "create-nightly-notification-status-for-day task complete: {} rows updated for day: {}, for service_ids: {}".format(
+                len(transit_data), process_day, service_ids
+            )
         )
-    )
diff --git a/app/dao/fact_notification_status_dao.py b/app/dao/fact_notification_status_dao.py
index 0da5e1b143..47721ad6b3 100644
--- a/app/dao/fact_notification_status_dao.py
+++ b/app/dao/fact_notification_status_dao.py
@@ -37,7 +37,7 @@
 )
 
 
-def fetch_notification_status_for_day(process_day, service_id=None):
+def fetch_notification_status_for_day(process_day, service_ids=None):
     start_date = datetime.combine(process_day, time.min)
     end_date = datetime.combine(process_day + timedelta(days=1), time.min)
     # use notification_history if process day is older than 7 days
@@ -45,7 +45,7 @@
     current_app.logger.info("Fetch ft_notification_status for {} to {}".format(start_date, end_date))
 
     all_data_for_process_day = []
-    service_ids = [x.id for x in Service.query.all()]
+    service_ids = service_ids if service_ids else [x.id for x in Service.query.all()]
     # for each service
     #   for each notification type
     #       query notifications for day
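For reference, a minimal standalone sketch of the `islice`-based chunking pattern the task now uses. The `chunked` helper, `fake_service_ids`, and the `print` call are illustrative stand-ins for `Service.query.all()` and the real DAO call, not part of the change.

```python
from itertools import islice


def chunked(iterable, size):
    # Yield successive lists of at most `size` items, mirroring the while/islice loop in the task.
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            break
        yield chunk


# Stand-in for [x.id for x in Service.query.all()]; the real task reads ids from the database.
fake_service_ids = ["service-{}".format(n) for n in range(45)]

for chunk in chunked(fake_service_ids, 20):
    # Each chunk would be passed as fetch_notification_status_for_day(process_day=..., service_ids=chunk).
    print(len(chunk))  # prints 20, 20, 5
```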