Skip to content

Commit

Permalink
Task/refactor create notification status (#2296)
Browse files Browse the repository at this point in the history
* first

* Refactor

* fix

* fix
  • Loading branch information
jzbahrai authored Sep 23, 2024
1 parent 3535888 commit 96b39cd
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 16 deletions.
49 changes: 35 additions & 14 deletions app/celery/reporting_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from itertools import islice

from flask import current_app
from notifications_utils.statsd_decorators import statsd
Expand All @@ -12,6 +13,7 @@
fetch_notification_status_for_day,
update_fact_notification_status,
)
from app.models import Service


@notify_celery.task(name="create-nightly-billing")
Expand Down Expand Up @@ -72,19 +74,38 @@ def create_nightly_notification_status(day_start=None):
@notify_celery.task(name="create-nightly-notification-status-for-day")
@statsd(namespace="tasks")
def create_nightly_notification_status_for_day(process_day):
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()

start = datetime.utcnow()
transit_data = fetch_notification_status_for_day(process_day=process_day)
end = datetime.utcnow()
current_app.logger.info(
"create-nightly-notification-status-for-day {} fetched in {} seconds".format(process_day, (end - start).seconds)
)
"""
This function gets all the service ids and fetches the notification status for the given day.
It does it in chunks of 20 service ids at a time.
update_fact_notification_status(transit_data, process_day)
Args:
process_day (_type_): datetime object
"""
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
service_ids = [x.id for x in Service.query.all()]
chunk_size = 20
iter_service_ids = iter(service_ids)

while True:
chunk = list(islice(iter_service_ids, chunk_size))

if not chunk:
current_app.logger.info(
"create-nightly-notification-status-for-day job completed for process_day {} on {}".format(
process_day, datetime.now(timezone.utc).date()
)
)
break
start = datetime.now(timezone.utc)
transit_data = fetch_notification_status_for_day(process_day=process_day, service_ids=chunk)
end = datetime.now(timezone.utc)
current_app.logger.info(
"create-nightly-notification-status-for-day {} fetched in {} seconds".format(process_day, (end - start).seconds)
)
update_fact_notification_status(transit_data, process_day)

current_app.logger.info(
"create-nightly-notification-status-for-day task complete: {} rows updated for day: {}".format(
len(transit_data), process_day
current_app.logger.info(
"create-nightly-notification-status-for-day task complete: {} rows updated for day: {}, for service_ids: {}".format(
len(transit_data), process_day, service_ids
)
)
)
4 changes: 2 additions & 2 deletions app/dao/fact_notification_status_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@
)


def fetch_notification_status_for_day(process_day, service_id=None):
def fetch_notification_status_for_day(process_day, service_ids=None):
start_date = datetime.combine(process_day, time.min)
end_date = datetime.combine(process_day + timedelta(days=1), time.min)
# use notification_history if process day is older than 7 days
# this is useful if we need to rebuild the ft_billing table for a date older than 7 days ago.
current_app.logger.info("Fetch ft_notification_status for {} to {}".format(start_date, end_date))

all_data_for_process_day = []
service_ids = [x.id for x in Service.query.all()]
service_ids = service_ids if service_ids else [x.id for x in Service.query.all()]
# for each service
# for each notification type
# query notifications for day
Expand Down

0 comments on commit 96b39cd

Please sign in to comment.