Skip to content

Commit

Permalink
feat: added unit tests and refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
AhtishamShahid committed Sep 3, 2024
1 parent b8a589f commit 4d10701
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 55 deletions.
42 changes: 42 additions & 0 deletions jira.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import requests
from jira import JIRA

def get_jira_issues(jira_url, api_token, project_key, sprint_id):
"""Fetches JIRA issues based on given criteria.
Args:
jira_url (str): The URL of your JIRA instance.
api_token (str): Your JIRA personal access token.
project_key (str): The project key.
sprint_id (str): The sprint ID.
Returns:
list: A list of JIRA issues that match the criteria.
"""

# Authenticate with JIRA
jira = JIRA(jira_url, basic_auth=(api_token, ""))

# JQL query to filter issues
jql = f"assignee is not EMPTY AND project = {project_key} AND sprint = {sprint_id} AND updatedDate < now() - 24h AND comments.created <= now() - 24h"

# Fetch issues
issues = jira.search_issues(jql, fields=["key", "summary", "assignee", "updatedDate", "comments"])

return issues

# Replace with your JIRA instance details
jira_url = "https://your-jira-instance.atlassian.net"
api_token = "your-api-token"
project_key = "your-project-key"
sprint_id = "your-sprint-id"

issues = get_jira_issues(jira_url, api_token, project_key, sprint_id)

for issue in issues:
print(f"Issue Key: {issue.key}")
print(f"Summary: {issue.fields.summary}")
print(f"Assignee: {issue.fields.assignee.key}")
print(f"Updated Date: {issue.fields.updated}")
print(f"Comments: {len(issue.fields.comments)}")
print()
4 changes: 2 additions & 2 deletions openedx/core/djangoapps/notifications/config/waffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
# .. toggle_tickets: INF-1259
ENABLE_EMAIL_NOTIFICATIONS = WaffleFlag(f'{WAFFLE_NAMESPACE}.enable_email_notifications', __name__)

# .. toggle_name: notifications.enable_group_notifications
# .. toggle_name: notifications.enable_notification_grouping
# .. toggle_implementation: CourseWaffleFlag
# .. toggle_default: False
# .. toggle_description: Waffle flag to enable the Notifications Grouping feature
Expand All @@ -38,4 +38,4 @@
# .. toggle_target_removal_date: 2025-06-01
# .. toggle_warning: When the flag is ON, Notifications Grouping feature is enabled.
# .. toggle_tickets: INF-1472
ENABLE_GROUP_NOTIFICATIONS = CourseWaffleFlag(f'{WAFFLE_NAMESPACE}.enable_group_notifications', __name__)
ENABLE_NOTIFICATION_GROUPING = CourseWaffleFlag(f'{WAFFLE_NAMESPACE}.enable_notification_grouping', __name__)
121 changes: 73 additions & 48 deletions openedx/core/djangoapps/notifications/grouping_notifications.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,90 @@
"""
Utils for grouping notifications
"""
import datetime
from typing import Dict, Type, Union

from pytz import utc

from abc import ABC, abstractmethod

from openedx.core.djangoapps.notifications.models import Notification


def get_notification_type_grouping_function(notification_type):
class BaseNotificationGrouper(ABC):
@abstractmethod
def group(self, new_notification, old_notification):
pass


class NotificationRegistry:
_groupers: Dict[str, Type[BaseNotificationGrouper]] = {}

@classmethod
def register(cls, notification_type: str):
def decorator(grouper_class: Type[BaseNotificationGrouper]) -> Type[BaseNotificationGrouper]:
cls._groupers[notification_type] = grouper_class
return grouper_class

return decorator

@classmethod
def get_grouper(cls, notification_type: str) -> Union[BaseNotificationGrouper, None]:
"""Retrieves the appropriate notification grouper based on the given notification type.
Args:
notification_type: The type of notification for which to retrieve the grouper.
Returns:
The corresponding BaseNotificationGrouper instance or None if no grouper is found.
"""

grouper_class = cls._groupers.get(notification_type)
if not grouper_class:
return None
return grouper_class()


@NotificationRegistry.register('new_comment')
class NewCommentGrouper(BaseNotificationGrouper):
@classmethod
def group(cls, new_notification, old_notification):
context = old_notification.content_context.copy()
user_key = 'replier_name'
group_key = f'{user_key}_grouped'
if not context.get('grouped'):
context['user_key'] = user_key
context['group_key'] = group_key
context[group_key] = [context[user_key]]
context['grouped_count'] = 1
context['grouped'] = True
context[group_key].append(new_notification.content_context['replier_name'])
context['grouped_count'] += 1
return context

def group_user_notifications(new_notification: Notification, old_notification: Notification):
"""
Returns a method that groups notification of same notification type
If method doesn't exist, it returns None
Groups user notification based on notification type and group_id
"""
try:
return globals()[f"group_{notification_type}_notification"]
except KeyError:
return None
notification_type = new_notification.notification_type
grouper_class = NotificationRegistry.get_grouper(notification_type)

if grouper_class:
old_notification.content_context = grouper_class.group(new_notification, old_notification)
old_notification.content_context['grouped'] = True
old_notification.web = old_notification.web or new_notification.web
old_notification.email = old_notification.email or new_notification.email
old_notification.last_read = None
old_notification.last_seen = None
old_notification.created = utc.localize(datetime.datetime.now())
old_notification.save()


def get_user_existing_notifications(user_ids, notification_type, group_by_id, course_id):
"""
Returns user last groupable notification
Returns user last group able notification
"""
notifications = Notification.objects.filter(
user__in=user_ids, notification_type=notification_type, group_by_id=group_by_id,
user__in=user_ids,
notification_type=notification_type,
group_by_id=group_by_id,
course_id=course_id
)
notifications_mapping = {user_id: [] for user_id in user_ids}
Expand All @@ -34,39 +95,3 @@ def get_user_existing_notifications(user_ids, notification_type, group_by_id, co
notifications.sort(key=lambda elem: elem.created)
notifications_mapping[user_id] = notifications[0] if notifications else None
return notifications_mapping


def group_user_notifications(new_notification, old_notification):
"""
Groups user notification based on notification type and group_id
Params:
new_notification: [Notification] (Donot used object that has already been saved to DB)
existing_notifications: List[Notification]. Latest created notification will be updated to grouped
"""
notification_type = new_notification.notification_type
func = get_notification_type_grouping_function(notification_type)
func(new_notification, old_notification)
old_notification.content_context['grouped'] = True
old_notification.web = old_notification.web or new_notification.web
old_notification.email = old_notification.email or new_notification.email
old_notification.last_read = None
old_notification.last_seen = None
old_notification.created = utc.localize(datetime.datetime.now())
old_notification.save()


def group_new_comment_notification(new_notification, old_notification):
"""
Groups new_comment notification
"""
context = old_notification.content_context
user_key = 'replier_name'
group_key = f'{user_key}_grouped'
if not context.get('grouped'):
context['user_key'] = user_key
context['group_key'] = group_key
context[group_key] = [context[user_key]]
context['grouped_count'] = 1
context['grouped'] = True
context[group_key].append(new_notification.content_context['replier_name'])
context['grouped_count'] += 1
9 changes: 4 additions & 5 deletions openedx/core/djangoapps/notifications/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
get_default_values_of_preference,
get_notification_content
)
from openedx.core.djangoapps.notifications.config.waffle import ENABLE_GROUP_NOTIFICATIONS, ENABLE_NOTIFICATIONS
from openedx.core.djangoapps.notifications.config.waffle import ENABLE_NOTIFICATION_GROUPING, ENABLE_NOTIFICATIONS
from openedx.core.djangoapps.notifications.events import notification_generated_event
from openedx.core.djangoapps.notifications.filters import NotificationFilter
from openedx.core.djangoapps.notifications.grouping_notifications import (
get_notification_type_grouping_function,
get_user_existing_notifications,
group_user_notifications,
group_user_notifications, NotificationRegistry,
)
from openedx.core.djangoapps.notifications.models import (
CourseNotificationPreference,
Expand Down Expand Up @@ -132,8 +131,8 @@ def send_notifications(user_ids, course_key: str, app_name, notification_type, c
batch_size = settings.NOTIFICATION_CREATION_BATCH_SIZE

group_by_id = context.pop('group_by_id', '')
grouping_function = get_notification_type_grouping_function(notification_type)
waffle_flag_enabled = ENABLE_GROUP_NOTIFICATIONS.is_enabled(course_key)
grouping_function = NotificationRegistry.get_grouper(notification_type)
waffle_flag_enabled = ENABLE_NOTIFICATION_GROUPING.is_enabled(course_key)
grouping_enabled = waffle_flag_enabled and group_by_id and grouping_function is not None
notifications_generated = False
notification_content = ''
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""
Tests for notification grouping module
"""

import unittest
from unittest.mock import MagicMock, patch
from datetime import datetime
from pytz import utc

from openedx.core.djangoapps.notifications.grouping_notifications import (
BaseNotificationGrouper,
NotificationRegistry,
NewCommentGrouper,
group_user_notifications,
get_user_existing_notifications
)
from openedx.core.djangoapps.notifications.models import Notification


class TestNotificationRegistry(unittest.TestCase):
"""
Tests for the NotificationRegistry class
"""

def test_register_and_get_grouper(self):
"""
Test that the register and get_grouper methods work as expected
"""

class TestGrouper(BaseNotificationGrouper):
def group(self, new_notification, old_notification):
pass

NotificationRegistry.register('test_notification')(TestGrouper)
grouper = NotificationRegistry.get_grouper('test_notification')
self.assertIsInstance(grouper, TestGrouper)

def test_get_grouper_returns_none_for_unregistered_type(self):
"""
Test that get_grouper returns None for an unregistered notification type
"""
grouper = NotificationRegistry.get_grouper('non_existent')
self.assertIsNone(grouper)


class TestNewCommentGrouper(unittest.TestCase):

def setUp(self):
self.new_notification = MagicMock(spec=Notification)
self.old_notification = MagicMock(spec=Notification)
self.old_notification.content_context = {
'replier_name': 'User1'
}

def test_group_creates_grouping_keys(self):
"""
Test that the function creates the grouping keys
"""
updated_context = NewCommentGrouper.group(self.new_notification, self.old_notification)

self.assertIn('replier_name_grouped', updated_context)
self.assertIn('grouped_count', updated_context)
self.assertEqual(updated_context['grouped_count'], 2)
self.assertTrue(updated_context['grouped'])

def test_group_appends_to_existing_grouping(self):
"""
Test that the function appends to the existing grouping
"""
# Mock a pre-grouped notification
self.old_notification.content_context = {
'replier_name': 'User1',
'replier_name_grouped': ['User1', 'User2'],
'grouped': True,
'grouped_count': 2
}
self.new_notification.content_context = {'replier_name': 'User3'}

updated_context = NewCommentGrouper.group(self.new_notification, self.old_notification)

self.assertIn('replier_name_grouped', updated_context)
self.assertEqual(len(updated_context['replier_name_grouped']), 3)
self.assertEqual(updated_context['grouped_count'], 3)


class TestGroupUserNotifications(unittest.TestCase):

@patch('openedx.core.djangoapps.notifications.grouping_notifications.NotificationRegistry.get_grouper')
def test_group_user_notifications(self, mock_get_grouper):
"""
Test that the function groups notifications using the appropriate grou
"""
# Mock the grouper
mock_grouper = MagicMock(spec=NewCommentGrouper)
mock_get_grouper.return_value = mock_grouper

new_notification = MagicMock(spec=Notification)
old_notification = MagicMock(spec=Notification)

group_user_notifications(new_notification, old_notification)

mock_grouper.group.assert_called_once_with(new_notification, old_notification)
self.assertTrue(old_notification.save.called)
self.assertIsNone(old_notification.last_read)
self.assertIsNone(old_notification.last_seen)
self.assertIsNotNone(old_notification.created)

def test_group_user_notifications_no_grouper(self):
"""
Test that the function does nothing if no grouper is found
"""
new_notification = MagicMock(spec=Notification)
old_notification = MagicMock(spec=Notification)

group_user_notifications(new_notification, old_notification)

self.assertFalse(old_notification.save.called)


class TestGetUserExistingNotifications(unittest.TestCase):
"""
Tests for the get_user_existing_notifications function
"""

@patch('openedx.core.djangoapps.notifications.models.Notification.objects.filter')
def test_get_user_existing_notifications(self, mock_filter):
"""
Test that the function returns the last notification for each user
"""
# Mock the notification objects returned by the filter
mock_notification1 = MagicMock(spec=Notification)
mock_notification1.user_id = 1
mock_notification1.created = datetime(2023, 9, 1, tzinfo=utc)

mock_notification2 = MagicMock(spec=Notification)
mock_notification2.user_id = 1
mock_notification2.created = datetime(2023, 9, 2, tzinfo=utc)

mock_filter.return_value = [mock_notification1, mock_notification2]

user_ids = [1, 2]
notification_type = 'new_comment'
group_by_id = 'group_id_1'
course_id = 'course_1'

result = get_user_existing_notifications(user_ids, notification_type, group_by_id, course_id)

# Verify the results
self.assertEqual(result[1], mock_notification1)
self.assertIsNone(result[2]) # user 2 has no notifications

0 comments on commit 4d10701

Please sign in to comment.