From 75fa40b2e7ede9911f1cb08932b38e1abd229a9e Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Wed, 31 Jan 2018 17:09:38 -0800 Subject: [PATCH 01/10] refactor logic out of command to support instrumentation --- eb_sqs/management/commands/process_queue.py | 81 +---------------- eb_sqs/worker/service.py | 97 +++++++++++++++++++++ 2 files changed, 99 insertions(+), 79 deletions(-) create mode 100644 eb_sqs/worker/service.py diff --git a/eb_sqs/management/commands/process_queue.py b/eb_sqs/management/commands/process_queue.py index e86ac1e..4b02bff 100644 --- a/eb_sqs/management/commands/process_queue.py +++ b/eb_sqs/management/commands/process_queue.py @@ -1,26 +1,13 @@ from __future__ import absolute_import, unicode_literals -from datetime import timedelta, datetime - -import boto3 -import logging - -from botocore.config import Config from django.core.management import BaseCommand, CommandError -from django.utils import timezone - -from eb_sqs import settings -from eb_sqs.worker.worker import Worker -from eb_sqs.worker.worker_factory import WorkerFactory -logger = logging.getLogger(__name__) +from eb_sqs.worker.service import WorkerService class Command(BaseCommand): help = 'Command to process tasks from one or more SQS queues' - PREFIX_STR = 'prefix:' - def add_arguments(self, parser): parser.add_argument('--queues', '-q', dest='queue_names', @@ -32,68 +19,4 @@ def handle(self, *args, **options): queue_names = [queue_name.rstrip() for queue_name in options['queue_names'].split(',')] - logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names))) - - sqs = boto3.resource( - 'sqs', - region_name=settings.AWS_REGION, - config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) - ) - - prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names)) - queues = self._get_queues_by_names(sqs, list(set(queue_names) - set(prefixes))) - - queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes] - static_queues = queues - last_update_time = timezone.make_aware(datetime.min) - - logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names))) - - worker = WorkerFactory.default().create() - - while True: - if len(queue_prefixes) > 0 and \ - timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time: - queues = static_queues + self._get_queues_by_prefixes(sqs, queue_prefixes) - last_update_time = timezone.now() - logger.info('[django-eb-sqs] Updated SQS queues: {}'.format( - ', '.join([queue.url for queue in queues]) - )) - - for queue in queues: - try: - messages = queue.receive_messages( - MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES, - WaitTimeSeconds=settings.WAIT_TIME_S, - ) - - for msg in messages: - self._process_message(msg, worker) - except Exception as exc: - logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1) - - @staticmethod - def _process_message(msg, worker): - # type: (Any, Worker) -> None - logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) - try: - worker.execute(msg.body) - logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) - except Exception as exc: - logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1) - finally: - msg.delete() - logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id)) - - @staticmethod - def _get_queues_by_names(sqs, queue_names): - return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names] - - @staticmethod - def _get_queues_by_prefixes(sqs, prefixes): - queues = [] - - for prefix in prefixes: - queues += sqs.queues.filter(QueueNamePrefix=prefix) - - return queues + WorkerService().process_queues(queue_names) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py new file mode 100644 index 0000000..2e59eaa --- /dev/null +++ b/eb_sqs/worker/service.py @@ -0,0 +1,97 @@ +from __future__ import absolute_import, unicode_literals + +from datetime import timedelta, datetime + +import boto3 +import logging + +from botocore.config import Config +from django.utils import timezone + +from eb_sqs import settings +from eb_sqs.worker.worker import Worker +from eb_sqs.worker.worker_factory import WorkerFactory + +logger = logging.getLogger(__name__) + + +class WorkerService(object): + PREFIX_STR = 'prefix:' + + def process_queues(self, queue_names): + logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names))) + + sqs = boto3.resource( + 'sqs', + region_name=settings.AWS_REGION, + config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES}) + ) + + prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names)) + queues = self.get_queues_by_names(sqs, list(set(queue_names) - set(prefixes))) + + queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes] + static_queues = queues + last_update_time = timezone.make_aware(datetime.min) + + logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names))) + + worker = WorkerFactory.default().create() + + logger.info('[django-eb-sqs] WAIT_TIME_S = {}'.format(settings.WAIT_TIME_S)) + logger.info('[django-eb-sqs] MAX_NUMBER_OF_MESSAGES = {}'.format(settings.MAX_NUMBER_OF_MESSAGES)) + logger.info('[django-eb-sqs] AUTO_ADD_QUEUE = {}'.format(settings.AUTO_ADD_QUEUE)) + logger.info('[django-eb-sqs] QUEUE_PREFIX = {}'.format(settings.QUEUE_PREFIX)) + logger.info('[django-eb-sqs] DEFAULT_QUEUE = {}'.format(settings.DEFAULT_QUEUE)) + logger.info('[django-eb-sqs] DEFAULT_MAX_RETRIES = {}'.format(settings.DEFAULT_MAX_RETRIES)) + logger.info('[django-eb-sqs] REFRESH_PREFIX_QUEUES_S = {}'.format(settings.REFRESH_PREFIX_QUEUES_S)) + + while True: + if len(queue_prefixes) > 0 and \ + timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time: + queues = static_queues + self.get_queues_by_prefixes(sqs, queue_prefixes) + last_update_time = timezone.now() + logger.info('[django-eb-sqs] Updated SQS queues: {}'.format( + ', '.join([queue.url for queue in queues]) + )) + + self.process_messages(queues, worker) + + def process_messages(self, queues, worker): + for queue in queues: + try: + messages = self.poll_messages(queue) + + for msg in messages: + self.process_message(msg, worker) + except Exception as exc: + logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1) + + def poll_messages(self, queue): + return queue.receive_messages( + MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES, + WaitTimeSeconds=settings.WAIT_TIME_S, + ) + + def process_message(self, msg, worker): + # type: (Any, Worker) -> None + logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) + try: + worker.execute(msg.body) + logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) + except Exception as exc: + logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1) + finally: + msg.delete() + logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id)) + + def get_queues_by_names(self, sqs, queue_names): + return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names] + + def get_queues_by_prefixes(self, sqs, prefixes): + queues = [] + + for prefix in prefixes: + queues += sqs.queues.filter(QueueNamePrefix=prefix) + + return queues From e637b7c2b3bb0ead097657feb4514970e8c73515 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Wed, 31 Jan 2018 22:53:34 -0800 Subject: [PATCH 02/10] added type definitions --- eb_sqs/worker/service.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 2e59eaa..45acc85 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -19,6 +19,7 @@ class WorkerService(object): PREFIX_STR = 'prefix:' def process_queues(self, queue_names): + # type: (list) -> None logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names))) sqs = boto3.resource( @@ -58,6 +59,7 @@ def process_queues(self, queue_names): self.process_messages(queues, worker) def process_messages(self, queues, worker): + # type: (list, Worker) -> None for queue in queues: try: messages = self.poll_messages(queue) @@ -68,13 +70,14 @@ def process_messages(self, queues, worker): logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1) def poll_messages(self, queue): + # type: (Queue) -> list return queue.receive_messages( MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES, WaitTimeSeconds=settings.WAIT_TIME_S, ) def process_message(self, msg, worker): - # type: (Any, Worker) -> None + # type: (Message, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: worker.execute(msg.body) @@ -86,9 +89,11 @@ def process_message(self, msg, worker): logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id)) def get_queues_by_names(self, sqs, queue_names): + # type: (ServiceResource, list) -> list return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names] def get_queues_by_prefixes(self, sqs, prefixes): + # type: (ServiceResource, list) -> list queues = [] for prefix in prefixes: From 922b10ee898846753eceb0d9c1c20f52ed66680f Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 1 Feb 2018 09:33:17 -0800 Subject: [PATCH 03/10] update version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 26001bf..4976fda 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='django-eb-sqs', - version='1.0', + version='1.01', package_dir={'eb_sqs': 'eb_sqs'}, include_package_data=True, packages=find_packages(), From 9fd95beaa6ccb1d44d35b749e83e3b3bbce9c751 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 1 Feb 2018 13:49:56 -0800 Subject: [PATCH 04/10] testing --- eb_sqs/worker/service.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 45acc85..2bb81a3 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -56,6 +56,7 @@ def process_queues(self, queue_names): ', '.join([queue.url for queue in queues]) )) + logger.debug('[django-eb-sqs] Processing {} queues'.format(len(queues))) self.process_messages(queues, worker) def process_messages(self, queues, worker): @@ -63,6 +64,7 @@ def process_messages(self, queues, worker): for queue in queues: try: messages = self.poll_messages(queue) + logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages))) for msg in messages: self.process_message(msg, worker) @@ -80,7 +82,7 @@ def process_message(self, msg, worker): # type: (Message, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: - worker.execute(msg.body) + # worker.execute(msg.body) logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) except Exception as exc: logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1) From 889f9f65424b9f1dca92a79535cd11f6a6d20f8f Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 1 Feb 2018 14:13:02 -0800 Subject: [PATCH 05/10] bulk delete --- eb_sqs/worker/service.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 2bb81a3..6a63534 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -66,8 +66,17 @@ def process_messages(self, queues, worker): messages = self.poll_messages(queue) logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages))) + msg_entries = [] + for msg in messages: self.process_message(msg, worker) + msg_entries.append({ + 'Id': msg.message_id, + 'ReceiptHandle': msg.receipt_handle + }) + + queue.delete_messages(Entries=msg_entries) + logger.debug('[django-eb-sqs] Deleted {} messages'.format(len(messages))) except Exception as exc: logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1) @@ -86,9 +95,6 @@ def process_message(self, msg, worker): logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) except Exception as exc: logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1) - finally: - msg.delete() - logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id)) def get_queues_by_names(self, sqs, queue_names): # type: (ServiceResource, list) -> list From a87b5620db51a1fc6528365e30b5af9d6a042f6f Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 1 Feb 2018 14:15:08 -0800 Subject: [PATCH 06/10] minor fix --- eb_sqs/worker/service.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 6a63534..321c06f 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -75,8 +75,9 @@ def process_messages(self, queues, worker): 'ReceiptHandle': msg.receipt_handle }) - queue.delete_messages(Entries=msg_entries) - logger.debug('[django-eb-sqs] Deleted {} messages'.format(len(messages))) + if len(messages) > 0: + queue.delete_messages(Entries=msg_entries) + logger.debug('[django-eb-sqs] Deleted {} messages'.format(len(messages))) except Exception as exc: logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1) From 3abaff76f95bd4b0f62aad4fd82099eaef92b59b Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 1 Feb 2018 14:27:23 -0800 Subject: [PATCH 07/10] detailed printout --- eb_sqs/worker/service.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 321c06f..b9651ed 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -76,8 +76,9 @@ def process_messages(self, queues, worker): }) if len(messages) > 0: - queue.delete_messages(Entries=msg_entries) - logger.debug('[django-eb-sqs] Deleted {} messages'.format(len(messages))) + response = queue.delete_messages(Entries=msg_entries) + logger.debug('[django-eb-sqs] Deleted {} messages successfully'.format(len(response['Successful']))) + logger.debug('[django-eb-sqs] Failed deleting {} messages'.format(len(response['Failed']))) except Exception as exc: logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1) From b284913b6a0cecf91aa193475b4376fbadc3ab40 Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 1 Feb 2018 14:31:09 -0800 Subject: [PATCH 08/10] fixed prints --- eb_sqs/worker/service.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index b9651ed..1c2dd6f 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -77,8 +77,12 @@ def process_messages(self, queues, worker): if len(messages) > 0: response = queue.delete_messages(Entries=msg_entries) - logger.debug('[django-eb-sqs] Deleted {} messages successfully'.format(len(response['Successful']))) - logger.debug('[django-eb-sqs] Failed deleting {} messages'.format(len(response['Failed']))) + logger.debug('[django-eb-sqs] Deleted {} messages successfully'.format( + len(response.get('Successful', [])) + )) + logger.debug('[django-eb-sqs] Failed deleting {} messages'.format( + len(response.get('Failed', [])) + )) except Exception as exc: logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1) From 8741a40b033d112d730fb9202cfda784c2a2391f Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Thu, 1 Feb 2018 14:33:51 -0800 Subject: [PATCH 09/10] revert comment out --- eb_sqs/worker/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index 1c2dd6f..e82d724 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -97,7 +97,7 @@ def process_message(self, msg, worker): # type: (Message, Worker) -> None logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id)) try: - # worker.execute(msg.body) + worker.execute(msg.body) logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id)) except Exception as exc: logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1) From 8dae7a3082ce4c7a20592226d9dc989735010a9d Mon Sep 17 00:00:00 2001 From: Alexey Tsitkin Date: Fri, 2 Feb 2018 23:00:03 -0800 Subject: [PATCH 10/10] minor refactoring --- eb_sqs/worker/service.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/eb_sqs/worker/service.py b/eb_sqs/worker/service.py index e82d724..bd13cd1 100644 --- a/eb_sqs/worker/service.py +++ b/eb_sqs/worker/service.py @@ -75,17 +75,21 @@ def process_messages(self, queues, worker): 'ReceiptHandle': msg.receipt_handle }) - if len(messages) > 0: - response = queue.delete_messages(Entries=msg_entries) - logger.debug('[django-eb-sqs] Deleted {} messages successfully'.format( - len(response.get('Successful', [])) - )) - logger.debug('[django-eb-sqs] Failed deleting {} messages'.format( - len(response.get('Failed', [])) - )) + self.delete_messages(queue, msg_entries) except Exception as exc: logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1) + def delete_messages(self, queue, msg_entries): + # type: (Queue, list) -> None + if len(msg_entries) > 0: + response = queue.delete_messages(Entries=msg_entries) + logger.debug('[django-eb-sqs] Deleted {} messages successfully'.format( + len(response.get('Successful', [])) + )) + logger.debug('[django-eb-sqs] Failed deleting {} messages'.format( + len(response.get('Failed', [])) + )) + def poll_messages(self, queue): # type: (Queue) -> list return queue.receive_messages(