Skip to content

Commit

Permalink
Django Signals to expose the WorkerService progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Schweigi committed Jan 22, 2019
1 parent cad568f commit 54f8227
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ python manage.py process_queue --queues queue1,queue2 # process queue1 and queue
python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-'
```

Use the signals `MESSAGES_RECEIVED`, `MESSAGES_PROCESSED`, `MESSAGES_DELETED` of the `WorkerService` to get informed about the current SQS batch being processed by the management command.

#### Group Tasks
Multiple tasks can be grouped by specifying the `group_id` argument when calling `delay` on a task.
If all tasks of a specific group are executed then the group callback task specified by `EB_SQS_GROUP_CALLBACK_TASK` is executed.
Expand Down
13 changes: 11 additions & 2 deletions eb_sqs/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
import django.dispatch
from django.utils import timezone

from eb_sqs import settings
Expand All @@ -16,10 +17,13 @@

logger = logging.getLogger(__name__)

MESSAGES_RECEIVED = django.dispatch.Signal(providing_args=['messages'])
MESSAGES_PROCESSED = django.dispatch.Signal(providing_args=['messages'])
MESSAGES_DELETED = django.dispatch.Signal(providing_args=['messages'])


class WorkerService(object):
_PREFIX_STR = 'prefix:'

_RECEIVE_COUNT_ATTRIBUTE = 'ApproximateReceiveCount'

def process_queues(self, queue_names):
Expand Down Expand Up @@ -70,16 +74,21 @@ def process_messages(self, queues, worker, static_queues):
messages = self.poll_messages(queue)
logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages)))

msg_entries = []
MESSAGES_RECEIVED.send(sender=self.__class__, messages=messages)

msg_entries = []
for msg in messages:
self.process_message(msg, worker)
msg_entries.append({
'Id': msg.message_id,
'ReceiptHandle': msg.receipt_handle
})

MESSAGES_PROCESSED.send(sender=self.__class__, messages=messages)

self.delete_messages(queue, msg_entries)

MESSAGES_DELETED.send(sender=self.__class__, messages=messages)
except ClientError as exc:
error_code = exc.response.get('Error', {}).get('Code', None)
if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues:
Expand Down

0 comments on commit 54f8227

Please sign in to comment.