diff --git a/post_office/mail.py b/post_office/mail.py index 18a31ab8..3a9a95a1 100644 --- a/post_office/mail.py +++ b/post_office/mail.py @@ -7,7 +7,8 @@ from django.template import Context, Template from django.utils import timezone from email.utils import make_msgid -from multiprocessing import Pool +from multiprocessing import Pool, TimeoutError +from multiprocessing.context import TimeoutError as ContextTimeoutError from multiprocessing.dummy import Pool as ThreadPool from .connections import connections @@ -15,21 +16,47 @@ from .logutils import setup_loghandlers from .models import Email, EmailTemplate, Log, PRIORITY, STATUS from .settings import ( - get_available_backends, get_batch_size, get_log_level, get_max_retries, get_message_id_enabled, - get_message_id_fqdn, get_retry_timedelta, get_sending_order, get_threads_per_process, + get_available_backends, + get_batch_delivery_timeout, + get_batch_size, + get_log_level, + get_max_retries, + get_message_id_enabled, + get_message_id_fqdn, + get_retry_timedelta, + get_sending_order, + get_threads_per_process, ) from .signals import email_queued from .utils import ( - create_attachments, get_email_template, parse_emails, parse_priority, split_emails, + create_attachments, + get_email_template, + parse_emails, + parse_priority, + split_emails, ) logger = setup_loghandlers("INFO") -def create(sender, recipients=None, cc=None, bcc=None, subject='', message='', - html_message='', context=None, scheduled_time=None, expires_at=None, headers=None, - template=None, priority=None, render_on_delivery=False, commit=True, - backend=''): +def create( + sender, + recipients=None, + cc=None, + bcc=None, + subject="", + message="", + html_message="", + context=None, + scheduled_time=None, + expires_at=None, + headers=None, + template=None, + priority=None, + render_on_delivery=False, + commit=True, + backend="", +): """ Creates an email from supplied keyword arguments. If template is specified, email subject and content will be rendered during delivery. @@ -44,8 +71,10 @@ def create(sender, recipients=None, cc=None, bcc=None, subject='', message='', if bcc is None: bcc = [] if context is None: - context = '' - message_id = make_msgid(domain=get_message_id_fqdn()) if get_message_id_enabled() else None + context = "" + message_id = ( + make_msgid(domain=get_message_id_fqdn()) if get_message_id_enabled() else None + ) # If email is to be rendered during delivery, save all necessary # information @@ -58,12 +87,15 @@ def create(sender, recipients=None, cc=None, bcc=None, subject='', message='', scheduled_time=scheduled_time, expires_at=expires_at, message_id=message_id, - headers=headers, priority=priority, status=status, - context=context, template=template, backend_alias=backend + headers=headers, + priority=priority, + status=status, + context=context, + template=template, + backend_alias=backend, ) else: - if template: subject = template.subject message = template.content @@ -85,7 +117,9 @@ def create(sender, recipients=None, cc=None, bcc=None, subject='', message='', scheduled_time=scheduled_time, expires_at=expires_at, message_id=message_id, - headers=headers, priority=priority, status=status, + headers=headers, + priority=priority, + status=status, backend_alias=backend, template=template, ) @@ -96,25 +130,41 @@ def create(sender, recipients=None, cc=None, bcc=None, subject='', message='', return email -def send(recipients=None, sender=None, template=None, context=None, subject='', - message='', html_message='', scheduled_time=None, expires_at=None, headers=None, - priority=None, attachments=None, render_on_delivery=False, - log_level=None, commit=True, cc=None, bcc=None, language='', - backend=''): +def send( + recipients=None, + sender=None, + template=None, + context=None, + subject="", + message="", + html_message="", + scheduled_time=None, + expires_at=None, + headers=None, + priority=None, + attachments=None, + render_on_delivery=False, + log_level=None, + commit=True, + cc=None, + bcc=None, + language="", + backend="", +): try: recipients = parse_emails(recipients) except ValidationError as e: - raise ValidationError('recipients: %s' % e.message) + raise ValidationError("recipients: %s" % e.message) try: cc = parse_emails(cc) except ValidationError as e: - raise ValidationError('c: %s' % e.message) + raise ValidationError("c: %s" % e.message) try: bcc = parse_emails(bcc) except ValidationError as e: - raise ValidationError('bcc: %s' % e.message) + raise ValidationError("bcc: %s" % e.message) if sender is None: sender = settings.DEFAULT_FROM_EMAIL @@ -132,11 +182,17 @@ def send(recipients=None, sender=None, template=None, context=None, subject='', if template: if subject: - raise ValueError('You can\'t specify both "template" and "subject" arguments') + raise ValueError( + 'You can\'t specify both "template" and "subject" arguments' + ) if message: - raise ValueError('You can\'t specify both "template" and "message" arguments') + raise ValueError( + 'You can\'t specify both "template" and "message" arguments' + ) if html_message: - raise ValueError('You can\'t specify both "template" and "html_message" arguments') + raise ValueError( + 'You can\'t specify both "template" and "html_message" arguments' + ) # template can be an EmailTemplate instance or name if isinstance(template, EmailTemplate): @@ -148,11 +204,26 @@ def send(recipients=None, sender=None, template=None, context=None, subject='', template = get_email_template(template, language) if backend and backend not in get_available_backends().keys(): - raise ValueError('%s is not a valid backend alias' % backend) - - email = create(sender, recipients, cc, bcc, subject, message, html_message, - context, scheduled_time, expires_at, headers, template, priority, - render_on_delivery, commit=commit, backend=backend) + raise ValueError("%s is not a valid backend alias" % backend) + + email = create( + sender, + recipients, + cc, + bcc, + subject, + message, + html_message, + context, + scheduled_time, + expires_at, + headers, + template, + priority, + render_on_delivery, + commit=commit, + backend=backend, + ) if attachments: attachments = create_attachments(attachments) @@ -185,13 +256,15 @@ def get_queued(): - Has expires_at after the current time or is None """ now = timezone.now() - query = ( - (Q(scheduled_time__lte=now) | Q(scheduled_time=None)) & - (Q(expires_at__gt=now) | Q(expires_at=None)) + query = (Q(scheduled_time__lte=now) | Q(scheduled_time=None)) & ( + Q(expires_at__gt=now) | Q(expires_at=None) + ) + return ( + Email.objects.filter(query, status__in=[STATUS.queued, STATUS.requeued]) + .select_related("template") + .order_by(*get_sending_order()) + .prefetch_related("attachments")[: get_batch_size()] ) - return Email.objects.filter(query, status__in=[STATUS.queued, STATUS.requeued]) \ - .select_related('template') \ - .order_by(*get_sending_order()).prefetch_related('attachments')[:get_batch_size()] def send_queued(processes=1, log_level=None): @@ -202,8 +275,9 @@ def send_queued(processes=1, log_level=None): total_sent, total_failed, total_requeued = 0, 0, 0 total_email = len(queued_emails) - logger.info('Started sending %s emails with %s processes.' % - (total_email, processes)) + logger.info( + "Started sending %s emails with %s processes." % (total_email, processes) + ) if log_level is None: log_level = get_log_level() @@ -223,16 +297,40 @@ def send_queued(processes=1, log_level=None): email_lists = split_emails(queued_emails, processes) pool = Pool(processes) - results = pool.map(_send_bulk, email_lists) + + tasks = [] + for email_list in email_lists: + tasks.append(pool.apply_async(_send_bulk, args=(email_list,))) + + timeout = get_batch_delivery_timeout() + results = [] + + # Wait for all tasks to complete with a timeout + # The get method is used with a timeout to wait for each result + for task in tasks: + results.append(task.get(timeout=timeout)) + # for task in tasks: + # try: + # # Wait for all tasks to complete with a timeout + # # The get method is used with a timeout to wait for each result + # results.append(task.get(timeout=timeout)) + # except (TimeoutError, ContextTimeoutError): + # logger.exception("Process timed out after %d seconds" % timeout) + + # results = pool.map(_send_bulk, email_lists) pool.terminate() + pool.join() total_sent = sum(result[0] for result in results) total_failed = sum(result[1] for result in results) total_requeued = [result[2] for result in results] logger.info( - '%s emails attempted, %s sent, %s failed, %s requeued', - total_email, total_sent, total_failed, total_requeued, + "%s emails attempted, %s sent, %s failed, %s requeued", + total_email, + total_sent, + total_failed, + total_requeued, ) return total_sent, total_failed, total_requeued @@ -252,16 +350,17 @@ def _send_bulk(emails, uses_multiprocessing=True, log_level=None): failed_emails = [] # This is a list of two tuples (email, exception) email_count = len(emails) - logger.info('Process started, sending %s emails' % email_count) + logger.info("Process started, sending %s emails" % email_count) def send(email): try: - email.dispatch(log_level=log_level, commit=False, - disconnect_after_delivery=False) + email.dispatch( + log_level=log_level, commit=False, disconnect_after_delivery=False + ) sent_emails.append(email) - logger.debug('Successfully sent email #%d' % email.id) + logger.debug("Successfully sent email #%d" % email.id) except Exception as e: - logger.exception('Failed to send email #%d' % email.id) + logger.exception("Failed to send email #%d" % email.id) failed_emails.append((email, e)) # Prepare emails before we send these to threads for sending @@ -272,13 +371,30 @@ def send(email): try: email.prepare_email_message() except Exception as e: - logger.exception('Failed to prepare email #%d' % email.id) + logger.exception("Failed to prepare email #%d" % email.id) failed_emails.append((email, e)) number_of_threads = min(get_threads_per_process(), email_count) pool = ThreadPool(number_of_threads) - pool.map(send, emails) + results = [] + for email in emails: + results.append(pool.apply_async(send, args=(email,))) + + timeout = get_batch_delivery_timeout() + + # Wait for all tasks to complete with a timeout + # The get method is used with a timeout to wait for each result + for result in results: + result.get(timeout=timeout) + # for result in results: + # try: + # # Wait for all tasks to complete with a timeout + # # The get method is used with a timeout to wait for each result + # result.get(timeout=timeout) + # except TimeoutError: + # logger.exception("Process timed out after %d seconds" % timeout) + pool.close() pool.join() @@ -306,25 +422,28 @@ def send(email): email.status = STATUS.failed num_failed += 1 - Email.objects.bulk_update(emails_failed, ['status', 'scheduled_time', 'number_of_retries']) + Email.objects.bulk_update( + emails_failed, ["status", "scheduled_time", "number_of_retries"] + ) # If log level is 0, log nothing, 1 logs only sending failures # and 2 means log both successes and failures if log_level >= 1: - logs = [] - for (email, exception) in failed_emails: + for email, exception in failed_emails: logs.append( - Log(email=email, status=STATUS.failed, + Log( + email=email, + status=STATUS.failed, message=str(exception), - exception_type=type(exception).__name__) + exception_type=type(exception).__name__, + ) ) if logs: Log.objects.bulk_create(logs) if log_level == 2: - logs = [] for email in sent_emails: logs.append(Log(email=email, status=STATUS.sent)) @@ -333,8 +452,11 @@ def send(email): Log.objects.bulk_create(logs) logger.info( - 'Process finished, %s attempted, %s sent, %s failed, %s requeued', - email_count, len(sent_emails), num_failed, num_requeued, + "Process finished, %s attempted, %s sent, %s failed, %s requeued", + email_count, + len(sent_emails), + num_failed, + num_requeued, ) return len(sent_emails), num_failed, num_requeued @@ -346,12 +468,12 @@ def send_queued_mail_until_done(lockfile=default_lockfile, processes=1, log_leve """ try: with FileLock(lockfile): - logger.info('Acquired lock for sending queued emails at %s.lock', lockfile) + logger.info("Acquired lock for sending queued emails at %s.lock", lockfile) while True: try: send_queued(processes, log_level) except Exception as e: - logger.exception(e, extra={'status_code': 500}) + logger.exception(e, extra={"status_code": 500}) raise # Close DB connection to avoid multiprocessing errors @@ -360,4 +482,4 @@ def send_queued_mail_until_done(lockfile=default_lockfile, processes=1, log_leve if not get_queued().exists(): break except FileLocked: - logger.info('Failed to acquire lock, terminating now.') \ No newline at end of file + logger.info("Failed to acquire lock, terminating now.") diff --git a/post_office/settings.py b/post_office/settings.py index 98f30360..387d31db 100644 --- a/post_office/settings.py +++ b/post_office/settings.py @@ -124,6 +124,11 @@ def get_message_id_fqdn(): return get_config().get('MESSAGE_ID_FQDN', DNS_NAME) +# BATCH_DELIVERY_TIMEOUT defaults to 180 seconds (3 minutes) +def get_batch_delivery_timeout(): + return get_config().get('BATCH_DELIVERY_TIMEOUT', 180) + + CONTEXT_FIELD_CLASS = get_config().get('CONTEXT_FIELD_CLASS', 'django.db.models.JSONField') context_field_class = import_string(CONTEXT_FIELD_CLASS) diff --git a/post_office/test_settings.py b/post_office/test_settings.py index 3c193d11..d1da56c4 100644 --- a/post_office/test_settings.py +++ b/post_office/test_settings.py @@ -39,10 +39,12 @@ 'error': 'post_office.tests.test_backends.ErrorRaisingBackend', 'smtp': 'django.core.mail.backends.smtp.EmailBackend', 'connection_tester': 'post_office.tests.test_mail.ConnectionTestingBackend', + 'slow_backend': 'post_office.tests.test_mail.SlowTestBackend', }, 'CELERY_ENABLED': False, 'MAX_RETRIES': 2, 'MESSAGE_ID_ENABLED': True, + 'BATCH_DELIVERY_TIMEOUT': 2, 'MESSAGE_ID_FQDN': 'example.com', } diff --git a/post_office/tests/test_mail.py b/post_office/tests/test_mail.py index 40e70e06..37421e75 100644 --- a/post_office/tests/test_mail.py +++ b/post_office/tests/test_mail.py @@ -1,23 +1,22 @@ -from unittest.mock import patch, MagicMock +import re +import time from datetime import timedelta +from multiprocessing.context import TimeoutError +from unittest.mock import MagicMock, patch import pytz -import re - +from django.conf import settings from django.core import mail from django.core.exceptions import ValidationError from django.core.files.base import ContentFile -from django.conf import settings - from django.test import TestCase from django.test.utils import override_settings from django.utils import timezone -from ..settings import get_batch_size, get_log_level, get_threads_per_process, get_max_retries, get_retry_timedelta -from ..models import Email, EmailTemplate, Attachment, PRIORITY, STATUS -from ..mail import (create, get_queued, - send, send_many, send_queued, _send_bulk) - +from ..mail import _send_bulk, create, get_queued, send, send_many, send_queued +from ..models import PRIORITY, STATUS, Attachment, Email, EmailTemplate +from ..settings import (get_batch_size, get_log_level, get_max_retries, + get_retry_timedelta, get_threads_per_process) connection_counter = 0 @@ -35,6 +34,15 @@ def send_messages(self, email_messages): pass +class SlowTestBackend(mail.backends.base.BaseEmailBackend): + ''' + An EmailBackend that sleeps for 10 seconds when sending messages + ''' + + def send_messages(self, email_messages): + time.sleep(5) + + class MailTest(TestCase): @override_settings(EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend') @@ -481,6 +489,22 @@ def test_invalid_expired(self): scheduled_time=timezone.datetime(2020, 5, 18, 9, 0, 1), expires_at=timezone.datetime(2020, 5, 18, 9, 0, 0)) + def test_batch_delivery_timeout(self): + """ + Ensure that batch delivery timeout is respected. + """ + email = Email.objects.create(to=['to@example.com'], + from_email='bob@example.com', subject='', + message='', status=STATUS.queued, backend_alias='slow_backend') + start_time = timezone.now() + # slow backend sleeps for 5 seconds, so we should get a timeout error since we set + # BATCH_DELIVERY_TIMEOUT timeout to 2 seconds in test_settings.py + with self.assertRaises(TimeoutError): + send_queued() + end_time = timezone.now() + # Assert that running time is less than 3 seconds (2 seconds timeout + 1 second buffer) + self.assertTrue(end_time - start_time < timezone.timedelta(seconds=3)) + @patch('post_office.signals.email_queued.send') def test_backend_signal(self, mock): """ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..9bcd8a5a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.black] +line-length = 120 +target-version = ["py38"] +skip-string-normalization = true \ No newline at end of file