Merge pull request #12 from cuda-networks/process-queue-command
Process queue Django Command
itaybleier authored Mar 23, 2017
2 parents 48c16a6 + 2238370 commit 9a6317b
Showing 5 changed files with 97 additions and 20 deletions.
16 changes: 15 additions & 1 deletion README.md
@@ -4,7 +4,7 @@ django-eb-sqs is a simple task manager for the Elastic Beanstalk Worker Tier. It

### Installation

-Install the module with `pip install django-eb-sqs` or add it to your `requirements.txt`.
+Install the module with `pip install git+git://github.com/cuda-networks/django-eb-sqs.git` or add it to your `requirements.txt`.

Don't forget to add django-eb-sqs app to your Django `INSTALLED_APPS` settings:
```python
@@ -94,6 +94,17 @@ For example:
python manage.py run_eb_sqs_worker --url http://127.0.0.1:80/worker/process --queue default
```

#### Executing Tasks without Elastic Beanstalk

Another way of executing tasks is to use the Django management command `process_queue`.
This command can work with one or more queues, reading from the queues continuously and executing tasks as they come in.

```bash
python manage.py process_queue --queues <comma-delimited list of queue names>
```

This is useful if you want to execute tasks without running an Elastic Beanstalk worker.
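
The loop can also be started from your own entry point via Django's `call_command`, for example inside a container or supervisor-managed process. A minimal sketch; the script name and queue names are placeholders, and `DJANGO_SETTINGS_MODULE` is assumed to point at your project settings:

```python
# run_worker.py -- illustrative only; assumes DJANGO_SETTINGS_MODULE is set
import django
from django.core.management import call_command

django.setup()

# Equivalent to: python manage.py process_queue --queues default,reports
# Runs until the process is terminated.
call_command('process_queue', '--queues', 'default,reports')
```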


#### Group Tasks
Multiple tasks can be grouped by specifying the `group_id` argument when calling `delay` on a task.
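
A minimal sketch of grouping; `send_report` stands in for any task defined with the library's task decorator, and the group id is a placeholder. It assumes the group-callback behaviour described further down in the README:

```python
# Illustrative only: enqueue two tasks under the same group id.
send_report.delay(1, group_id='nightly-reports')
send_report.delay(2, group_id='nightly-reports')
# When all tasks in the group have finished, the configured group callback
# (EB_SQS_GROUP_CALLBACK_TASK, e.g. group_finished below) is called with the group id.
```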
@@ -119,6 +130,9 @@ def group_finished(group_id):

The following settings can be used to fine-tune django-eb-sqs. Copy them into your Django `settings.py` file; a short example follows the list.

- EB_AWS_REGION (`us-east-1`): The AWS region to use when working with SQS.
- EB_SQS_MAX_NUMBER_OF_MESSAGES (`10`): The maximum number of messages to read in a single call from SQS (<= 10).
- EB_SQS_WAIT_TIME_S (`2`): The time to wait (seconds) when receiving messages from SQS.
- EB_SQS_AUTO_ADD_QUEUE (`True`): Whether queues should be created automatically on AWS if they don't exist.
- EB_SQS_DEAD_LETTER_MODE (`False`): Enable if this worker is handling the SQS dead letter queue. Tasks won't be executed, but the group callback is.
- EB_SQS_DEFAULT_DELAY (`0`): Default task delay time in seconds.
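
For example, a minimal `settings.py` excerpt overriding a few of these (the values are illustrative only):

```python
# settings.py -- illustrative values only
EB_AWS_REGION = 'us-west-2'            # region of your SQS queues
EB_SQS_MAX_NUMBER_OF_MESSAGES = 5      # read at most 5 messages per SQS call (limit 10)
EB_SQS_WAIT_TIME_S = 10                # long-poll SQS for up to 10 seconds
EB_SQS_AUTO_ADD_QUEUE = False          # don't create missing queues automatically
EB_SQS_DEFAULT_DELAY = 30              # delay new tasks by 30 seconds by default
```
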
4 changes: 2 additions & 2 deletions development.txt
@@ -1,5 +1,5 @@
-boto3==1.3.1
-Django==1.9.6
+boto3==1.4.4
+Django==1.10.6
mock==2.0.0
moto==0.4.24
redis==2.10.5
58 changes: 58 additions & 0 deletions eb_sqs/management/commands/process_queue.py
@@ -0,0 +1,58 @@
from __future__ import absolute_import, unicode_literals

import boto3
import logging

from django.core.management import BaseCommand, CommandError

from eb_sqs import settings
from eb_sqs.worker.worker import Worker
from eb_sqs.worker.worker_factory import WorkerFactory

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    help = 'Command to process tasks from one or more SQS queues'

    def add_arguments(self, parser):
        parser.add_argument('--queues', '-q',
                            dest='queue_names',
                            help='Name of queues to process, separated by commas')

    def handle(self, *args, **options):
        if not options['queue_names']:
            raise CommandError('Queue names (--queues) not specified')

        queue_names = [queue_name.rstrip() for queue_name in options['queue_names'].split(',')]

        logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))

        sqs = boto3.resource('sqs', region_name=settings.AWS_REGION)
        queues = [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]

        logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))

        worker = WorkerFactory.default().create()

        # Poll each queue in turn, forever. WaitTimeSeconds enables SQS long polling,
        # so the loop blocks briefly instead of hammering the API when queues are empty.
        while True:
            for queue in queues:
                messages = queue.receive_messages(
                    MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
                    WaitTimeSeconds=settings.WAIT_TIME_S,
                )

                for msg in messages:
                    self._process_message(msg, worker)

    def _process_message(self, msg, worker):
        # type: (Any, Worker) -> None
        logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
        try:
            worker.execute(msg.body)
            logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
        except Exception as exc:
            logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
        finally:
            # The message is deleted even when execution fails, so a failing task
            # is logged but not retried by this loop.
            msg.delete()
            logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id))
35 changes: 20 additions & 15 deletions eb_sqs/settings.py
@@ -2,26 +2,31 @@

from django.conf import settings

-AUTO_ADD_QUEUE = getattr(settings, 'EB_SQS_AUTO_ADD_QUEUE', True)  # type: bool
-QUEUE_PREFIX = getattr(settings, 'EB_SQS_QUEUE_PREFIX', 'eb-sqs-')  # type: unicode
-DEFAULT_QUEUE = getattr(settings, 'EB_SQS_DEFAULT_QUEUE', 'default')  # type: unicode
+AWS_REGION = getattr(settings, 'EB_AWS_REGION', 'us-east-1')  # type: unicode

-EXECUTE_INLINE = getattr(settings, 'EB_SQS_EXECUTE_INLINE', False)  # type: bool
-FORCE_SERIALIZATION = getattr(settings, 'EB_SQS_FORCE_SERIALIZATION', False)  # type: bool
+MAX_NUMBER_OF_MESSAGES = getattr(settings, 'EB_SQS_MAX_NUMBER_OF_MESSAGES', 10)  # type: int
+WAIT_TIME_S = getattr(settings, 'EB_SQS_WAIT_TIME_S', 2)  # type: int

-DEFAULT_DELAY = getattr(settings, 'EB_SQS_DEFAULT_DELAY', 0)  # type: int
-DEFAULT_MAX_RETRIES = getattr(settings, 'EB_SQS_DEFAULT_MAX_RETRIES', 0)  # type: int
-DEFAULT_COUNT_RETRIES = getattr(settings, 'EB_SQS_DEFAULT_COUNT_RETRIES', True)  # type: bool
+AUTO_ADD_QUEUE = getattr(settings, 'EB_SQS_AUTO_ADD_QUEUE', True)  # type: bool
+QUEUE_PREFIX = getattr(settings, 'EB_SQS_QUEUE_PREFIX', 'eb-sqs-')  # type: unicode
+DEFAULT_QUEUE = getattr(settings, 'EB_SQS_DEFAULT_QUEUE', 'default')  # type: unicode

-USE_PICKLE = getattr(settings, 'EB_SQS_USE_PICKLE', False)  # type: bool
+EXECUTE_INLINE = getattr(settings, 'EB_SQS_EXECUTE_INLINE', False)  # type: bool
+FORCE_SERIALIZATION = getattr(settings, 'EB_SQS_FORCE_SERIALIZATION', False)  # type: bool

-GROUP_CALLBACK_TASK = getattr(settings, 'EB_SQS_GROUP_CALLBACK_TASK', None)  # type: Any
+DEFAULT_DELAY = getattr(settings, 'EB_SQS_DEFAULT_DELAY', 0)  # type: int
+DEFAULT_MAX_RETRIES = getattr(settings, 'EB_SQS_DEFAULT_MAX_RETRIES', 0)  # type: int
+DEFAULT_COUNT_RETRIES = getattr(settings, 'EB_SQS_DEFAULT_COUNT_RETRIES', True)  # type: bool

-REDIS_CLIENT = getattr(settings, 'EB_SQS_REDIS_CLIENT', None)  # type: StrictRedis
+USE_PICKLE = getattr(settings, 'EB_SQS_USE_PICKLE', False)  # type: bool
+
+GROUP_CALLBACK_TASK = getattr(settings, 'EB_SQS_GROUP_CALLBACK_TASK', None)  # type: Any
+
+REDIS_CLIENT = getattr(settings, 'EB_SQS_REDIS_CLIENT', None)  # type: StrictRedis
 # default: 7 days
-REDIS_EXPIRY = getattr(settings, 'EB_SQS_REDIS_EXPIRY', 3600*24*7)  # type: int
-REDIS_KEY_PREFIX = getattr(settings, 'EB_SQS_REDIS_KEY_PREFIX', 'eb-sqs-')  # type: string
+REDIS_EXPIRY = getattr(settings, 'EB_SQS_REDIS_EXPIRY', 3600 * 24 * 7)  # type: int
+REDIS_KEY_PREFIX = getattr(settings, 'EB_SQS_REDIS_KEY_PREFIX', 'eb-sqs-')  # type: string

-WORKER_FACTORY = getattr(settings, 'EB_SQS_WORKER_FACTORY', None)  # type: WorkerFactory
+WORKER_FACTORY = getattr(settings, 'EB_SQS_WORKER_FACTORY', None)  # type: WorkerFactory

-DEAD_LETTER_MODE = getattr(settings, 'EB_SQS_DEAD_LETTER_MODE', False)  # type: bool
+DEAD_LETTER_MODE = getattr(settings, 'EB_SQS_DEAD_LETTER_MODE', False)  # type: bool
4 changes: 2 additions & 2 deletions setup.py
@@ -6,13 +6,13 @@

setup(
    name='django-eb-sqs',
-    version='0.96',
+    version='0.97',
    package_dir={'eb_sqs': 'eb_sqs'},
    include_package_data=True,
    packages=find_packages(),
    description='A SQS worker implementation for Elastic Beanstalk',
    long_description=README,
-    url='https://github.com/sookasa/django-eb-sqs',
+    url='https://github.com/cuda-networks/django-eb-sqs',
    install_requires=[
        'boto3>=1.3.1',
        'Django>=1.7',
