Skip to content

Commit

Permalink
Merge pull request #33 from cuda-networks/fix_requeued_log
Browse files Browse the repository at this point in the history
print only relevant fields to log in case of SQS re-queueing
  • Loading branch information
alexeyts authored Apr 29, 2019
2 parents 2c5dff0 + 456dfb9 commit 2540ad8
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
18 changes: 14 additions & 4 deletions eb_sqs/tests/worker/tests_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,54 @@
from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException
from eb_sqs.worker.worker_factory import WorkerFactory


class TestException(Exception):
pass


@task()
def dummy_task(msg):
return msg


@task(max_retries=100)
def retries_task(num_of_retries):
if retries_task.retry_num < num_of_retries:
retries_task.retry(execute_inline=True)


@task(max_retries=5)
def max_retries_task():
max_retries_task.retry(execute_inline=True)


@task(max_retries=100)
def repeating_group_task(num_of_retries):
if repeating_group_task.retry_num < num_of_retries:
repeating_group_task.retry(execute_inline=True)


@task()
def exception_group_task():
raise TestException()


@task(max_retries=100)
def exception_repeating_group_task(num_of_retries):
if exception_repeating_group_task.retry_num == num_of_retries:
raise TestException()
else:
exception_repeating_group_task.retry(execute_inline=True)


@task(max_retries=5)
def max_retries_group_task():
max_retries_group_task.retry(execute_inline=True)


global_group_mock = Mock()


class WorkerTest(TestCase):
def setUp(self):
settings.DEAD_LETTER_MODE = False
Expand All @@ -65,14 +75,14 @@ def setUp(self):

def setUpGroupsHandling(self):
self.group_set = set()
self.group_mock.add.side_effect = lambda task: self.group_set.add('{}-{}'.format(task.id, task.retry_id))
self.group_mock.remove.side_effect = lambda task: len(self.group_set) == 0 if self.group_set.discard(
'{}-{}'.format(task.id, task.retry_id)) is None else False
self.group_mock.add.side_effect = lambda tsk: self.group_set.add('{}-{}'.format(tsk.id, tsk.retry_id))
self.group_mock.remove.side_effect = lambda tsk: len(self.group_set) == 0 if self.group_set.discard(
'{}-{}'.format(tsk.id, tsk.retry_id)) is None else False

def test_worker_execution_no_group(self):
msg = '{"id": "id-1", "retry": 0, "queue": "default", "maxRetries": 5, "args": [], "func": "eb_sqs.tests.worker.tests_worker.dummy_task", "kwargs": {"msg": "Hello World!"}}'

result = self.worker.execute(msg)
result = self.worker.execute(msg, 2)

self.assertEqual(result, 'Hello World!')
self.group_mock.remove.assert_not_called()
Expand Down
6 changes: 1 addition & 5 deletions eb_sqs/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,9 @@ def process_message(self, msg, worker):
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
try:
receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE])
if receive_count > 1:
logger.warning('[django-eb-sqs] SQS re-queued message {} times: Msg Id: {} Body: {}'.format(
receive_count, msg.message_id, msg.body
))

with django_db_management():
worker.execute(msg.body)
worker.execute(msg.body, receive_count)

logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
except ExecutionFailedException as exc:
Expand Down
9 changes: 7 additions & 2 deletions eb_sqs/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ def __init__(self, queue_client, group_client):
self.queue_client = queue_client
self.group_client = group_client

def execute(self, msg):
# type: (unicode) -> Any
def execute(self, msg, receive_count=1):
# type: (unicode, int) -> Any
try:
worker_task = WorkerTask.deserialize(msg)

if receive_count > 1:
logger.warning('[django-eb-sqs] SQS re-queued message {} times - queue: {} func: {} retry: {}'.format(
receive_count, worker_task.queue, worker_task.func, worker_task.retry
))
except Exception as ex:
logger.exception(
'Message %s is not a valid worker task: %s',
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='django-eb-sqs',
version='1.14',
version='1.15',
package_dir={'eb_sqs': 'eb_sqs'},
include_package_data=True,
packages=find_packages(),
Expand Down

0 comments on commit 2540ad8

Please sign in to comment.