Skip to content

Commit

Permalink
log entire message in case of re-queueing
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Tsitkin committed Jul 28, 2020
1 parent 7827083 commit 7383f09
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
2 changes: 1 addition & 1 deletion eb_sqs/tests/worker/tests_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def setUp(self):
def test_worker_execution(self):
msg = '{"id": "id-1", "retry": 0, "queue": "default", "maxRetries": 5, "args": [], "func": "eb_sqs.tests.worker.tests_worker.dummy_task", "kwargs": {"msg": "Hello World!"}}'

result = self.worker.execute(msg, 2)
result = self.worker.execute(msg)

self.assertEqual(result, 'Hello World!')

Expand Down
15 changes: 10 additions & 5 deletions eb_sqs/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ def process_messages(self, queues, worker, static_queues):
except ClientError as exc:
error_code = exc.response.get('Error', {}).get('Code', None)
if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues:
logger.debug('[django-eb-sqs] Queue was already deleted {}: {}'.format(queue.url, exc), exc_info=1)
logger.debug('[django-eb-sqs] Queue was already deleted {}: {}'.format(queue.url, exc), exc_info=True)
else:
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1)
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=True)
except Exception as exc:
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1)
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=True)

def delete_messages(self, queue, msg_entries):
# type: (Queue, list) -> None
Expand Down Expand Up @@ -140,7 +140,12 @@ def _process_message(self, msg, worker):
try:
receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE])

worker.execute(msg.body, receive_count)
if receive_count > 1:
logger.warning('[django-eb-sqs] SQS re-queued message {} times - msg: {}'.format(
receive_count, msg.body
))

worker.execute(msg.body)

logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
except ExecutionFailedException as exc:
Expand All @@ -153,7 +158,7 @@ def _execute_user_code(function):
with django_db_management():
function()
except Exception as exc:
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=True)

def get_queues_by_names(self, sqs, queue_names):
# type: (ServiceResource, list) -> list
Expand Down
12 changes: 4 additions & 8 deletions eb_sqs/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,10 @@ def __init__(self, queue_client):
super(Worker, self).__init__()
self.queue_client = queue_client

def execute(self, msg, receive_count=1):
# type: (unicode, int) -> Any
def execute(self, msg):
# type: (unicode) -> Any
try:
worker_task = WorkerTask.deserialize(msg)

if receive_count > 1:
logger.warning('[django-eb-sqs] SQS re-queued message {} times - queue: {} func: {} retry: {} args: {}'.format(
receive_count, worker_task.queue, worker_task.abs_func_name, worker_task.retry, worker_task.args
))
except Exception as ex:
logger.exception(
'Message %s is not a valid worker task: %s',
Expand Down Expand Up @@ -120,7 +115,8 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr

raise QueueException()

def _execute_task(self, worker_task):
@classmethod
def _execute_task(cls, worker_task):
# type: (WorkerTask) -> Any
result = worker_task.execute()
return result

0 comments on commit 7383f09

Please sign in to comment.