Skip to content

Commit

Permalink
Merge pull request #17 from cuda-networks/refactor_command
Browse files Browse the repository at this point in the history
refactor logic out of command to support instrumentation
alexeyts authored Feb 6, 2018
2 parents 80853d2 + 8dae7a3 commit e19fa45
Showing 3 changed files with 123 additions and 80 deletions.
81 changes: 2 additions & 79 deletions eb_sqs/management/commands/process_queue.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,13 @@
from __future__ import absolute_import, unicode_literals

from datetime import timedelta, datetime

import boto3
import logging

from botocore.config import Config
from django.core.management import BaseCommand, CommandError
from django.utils import timezone

from eb_sqs import settings
from eb_sqs.worker.worker import Worker
from eb_sqs.worker.worker_factory import WorkerFactory

logger = logging.getLogger(__name__)
from eb_sqs.worker.service import WorkerService


class Command(BaseCommand):
help = 'Command to process tasks from one or more SQS queues'

PREFIX_STR = 'prefix:'

def add_arguments(self, parser):
parser.add_argument('--queues', '-q',
dest='queue_names',
@@ -32,68 +19,4 @@ def handle(self, *args, **options):

queue_names = [queue_name.rstrip() for queue_name in options['queue_names'].split(',')]

logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))

sqs = boto3.resource(
'sqs',
region_name=settings.AWS_REGION,
config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
)

prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names))
queues = self._get_queues_by_names(sqs, list(set(queue_names) - set(prefixes)))

queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes]
static_queues = queues
last_update_time = timezone.make_aware(datetime.min)

logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))

worker = WorkerFactory.default().create()

while True:
if len(queue_prefixes) > 0 and \
timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time:
queues = static_queues + self._get_queues_by_prefixes(sqs, queue_prefixes)
last_update_time = timezone.now()
logger.info('[django-eb-sqs] Updated SQS queues: {}'.format(
', '.join([queue.url for queue in queues])
))

for queue in queues:
try:
messages = queue.receive_messages(
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
WaitTimeSeconds=settings.WAIT_TIME_S,
)

for msg in messages:
self._process_message(msg, worker)
except Exception as exc:
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1)

@staticmethod
def _process_message(msg, worker):
# type: (Any, Worker) -> None
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
try:
worker.execute(msg.body)
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
except Exception as exc:
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
finally:
msg.delete()
logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id))

@staticmethod
def _get_queues_by_names(sqs, queue_names):
return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]

@staticmethod
def _get_queues_by_prefixes(sqs, prefixes):
queues = []

for prefix in prefixes:
queues += sqs.queues.filter(QueueNamePrefix=prefix)

return queues
WorkerService().process_queues(queue_names)
120 changes: 120 additions & 0 deletions eb_sqs/worker/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from __future__ import absolute_import, unicode_literals

from datetime import timedelta, datetime

import boto3
import logging

from botocore.config import Config
from django.utils import timezone

from eb_sqs import settings
from eb_sqs.worker.worker import Worker
from eb_sqs.worker.worker_factory import WorkerFactory

logger = logging.getLogger(__name__)


class WorkerService(object):
    """Long-running service that polls one or more SQS queues and hands
    each received message to the eb-sqs worker for execution.
    """

    # Queue-name entries starting with this marker are treated as name
    # prefixes and expanded to all matching queues at runtime.
    PREFIX_STR = 'prefix:'

    def process_queues(self, queue_names):
        # type: (list) -> None
        """Resolve queue names/prefixes and poll the queues forever.

        :param queue_names: list of SQS queue names; entries beginning
            with ``'prefix:'`` are expanded periodically (every
            ``REFRESH_PREFIX_QUEUES_S`` seconds) to every queue whose
            name matches the prefix.

        Never returns; runs an endless polling loop.
        """
        logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))

        sqs = boto3.resource(
            'sqs',
            region_name=settings.AWS_REGION,
            config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
        )

        prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names))
        queues = self.get_queues_by_names(sqs, list(set(queue_names) - set(prefixes)))

        # Strip the marker once from the front. str.split(PREFIX_STR)[1]
        # would truncate any prefix that itself contains 'prefix:'.
        queue_prefixes = [prefix[len(self.PREFIX_STR):] for prefix in prefixes]
        static_queues = queues
        # datetime.min forces an immediate prefix refresh on first loop pass.
        last_update_time = timezone.make_aware(datetime.min)

        logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))

        worker = WorkerFactory.default().create()

        # Log the effective configuration once at startup for operability.
        logger.info('[django-eb-sqs] WAIT_TIME_S = {}'.format(settings.WAIT_TIME_S))
        logger.info('[django-eb-sqs] MAX_NUMBER_OF_MESSAGES = {}'.format(settings.MAX_NUMBER_OF_MESSAGES))
        logger.info('[django-eb-sqs] AUTO_ADD_QUEUE = {}'.format(settings.AUTO_ADD_QUEUE))
        logger.info('[django-eb-sqs] QUEUE_PREFIX = {}'.format(settings.QUEUE_PREFIX))
        logger.info('[django-eb-sqs] DEFAULT_QUEUE = {}'.format(settings.DEFAULT_QUEUE))
        logger.info('[django-eb-sqs] DEFAULT_MAX_RETRIES = {}'.format(settings.DEFAULT_MAX_RETRIES))
        logger.info('[django-eb-sqs] REFRESH_PREFIX_QUEUES_S = {}'.format(settings.REFRESH_PREFIX_QUEUES_S))

        while True:
            if queue_prefixes and \
                    timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time:
                queues = static_queues + self.get_queues_by_prefixes(sqs, queue_prefixes)
                last_update_time = timezone.now()
                logger.info('[django-eb-sqs] Updated SQS queues: {}'.format(
                    ', '.join([queue.url for queue in queues])
                ))

            logger.debug('[django-eb-sqs] Processing {} queues'.format(len(queues)))
            self.process_messages(queues, worker)

    def process_messages(self, queues, worker):
        # type: (list, Worker) -> None
        """Poll each queue once, execute its messages, then batch-delete them.

        Messages are deleted even when the worker raised while handling
        them (mirrors the previous per-message ``finally: msg.delete()``
        behavior); a failure while polling one queue is logged and does
        not stop processing of the remaining queues.
        """
        for queue in queues:
            try:
                messages = self.poll_messages(queue)
                logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages)))

                msg_entries = []

                for msg in messages:
                    self.process_message(msg, worker)
                    msg_entries.append({
                        'Id': msg.message_id,
                        'ReceiptHandle': msg.receipt_handle
                    })

                self.delete_messages(queue, msg_entries)
            except Exception as exc:
                logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=True)

    def delete_messages(self, queue, msg_entries):
        # type: (Queue, list) -> None
        """Batch-delete the given messages from *queue*.

        :param msg_entries: SQS ``DeleteMessageBatch`` entries
            (``Id`` / ``ReceiptHandle`` dicts); no-op when empty.
        """
        if msg_entries:
            response = queue.delete_messages(Entries=msg_entries)
            logger.debug('[django-eb-sqs] Deleted {} messages successfully'.format(
                len(response.get('Successful', []))
            ))
            logger.debug('[django-eb-sqs] Failed deleting {} messages'.format(
                len(response.get('Failed', []))
            ))

    def poll_messages(self, queue):
        # type: (Queue) -> list
        """Receive up to ``MAX_NUMBER_OF_MESSAGES`` messages from *queue*,
        long-polling for at most ``WAIT_TIME_S`` seconds.
        """
        return queue.receive_messages(
            MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
            WaitTimeSeconds=settings.WAIT_TIME_S,
        )

    def process_message(self, msg, worker):
        # type: (Message, Worker) -> None
        """Execute a single message body via *worker*.

        Any exception from the worker is logged and swallowed so the
        caller can still delete the message afterwards.
        """
        logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
        try:
            worker.execute(msg.body)
            logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
        except Exception as exc:
            # logger.exception == logger.error(..., exc_info=True)
            logger.exception('[django-eb-sqs] Unhandled error: {}'.format(exc))

    def get_queues_by_names(self, sqs, queue_names):
        # type: (ServiceResource, list) -> list
        """Return the Queue resource for each exact queue name."""
        return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]

    def get_queues_by_prefixes(self, sqs, prefixes):
        # type: (ServiceResource, list) -> list
        """Return all Queue resources whose names match any of *prefixes*."""
        queues = []

        for prefix in prefixes:
            queues += sqs.queues.filter(QueueNamePrefix=prefix)

        return queues
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@

setup(
name='django-eb-sqs',
version='1.0',
version='1.01',
package_dir={'eb_sqs': 'eb_sqs'},
include_package_data=True,
packages=find_packages(),

0 comments on commit e19fa45

Please sign in to comment.