Skip to content

Commit

Permalink
Task is removed from group during retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Schweigi committed Jun 24, 2016
1 parent 8d7de7f commit 5dc88d0
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 31 deletions.
6 changes: 4 additions & 2 deletions eb_sqs/tests/worker/tests_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ def repeating_group_task(count):
@task(max_retries=5)
def max_retries_group_task():
repeating_group_task.delay(3, group_id='group-id', execute_inline=True)
max_retries_group_task.retry()
max_retries_group_task.retry(execute_inline=True)

global_group_mock = Mock()

class WorkerTest(TestCase):
def setUp(self):
settings.FORCE_SERIALIZATION = True
settings.DEAD_LETTER_MODE = False

self.queue_mock = Mock(autospec=QueueClient)
Expand Down Expand Up @@ -129,7 +130,8 @@ def test_group_match_retries_reached(self):
self.group_mock.remove.side_effect = lambda task: len(group_set) == 0 if group_set.discard(
task.id) is None else False

max_retries_group_task.delay(group_id='group-id', execute_inline=True)
with self.assertRaises(MaxRetriesReachedException):
max_retries_group_task.delay(group_id='group-id', execute_inline=True)

settings.GROUP_CALLBACK_TASK.delay.assert_called_once()

Expand Down
70 changes: 42 additions & 28 deletions eb_sqs/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def execute(self, msg):
worker_task.id,
)

self._group_callback(worker_task)
self._remove_from_group(worker_task)
else:
logger.info(
'Execute task %s (%s) with args: %s and kwargs: %s',
Expand Down Expand Up @@ -83,20 +83,15 @@ def retry(self, worker_task, delay, execute_inline, count_retries):
def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retries):
# type: (WorkerTask, int, bool, bool, bool) -> Any
try:
if is_retry and count_retries:
worker_task.retry += 1
if worker_task.retry > worker_task.max_retries:
self._group_callback(worker_task)
raise MaxRetriesReachedException(worker_task.retry)
if is_retry:
if count_retries:
worker_task.retry += 1
if worker_task.retry > worker_task.max_retries:
self._remove_from_group(worker_task)
raise MaxRetriesReachedException(worker_task.retry)
worker_task.retry_scheduled = True

if worker_task.group_id:
logger.info(
'Add task %s (%s) to group %s',
worker_task.abs_func_name,
worker_task.id,
worker_task.group_id,
)
self.group_client.add(worker_task)
self._add_to_group(worker_task)

logger.info('%s task %s (%s): %s, %s (%s%s)',
'Retrying' if is_retry else 'Delaying',
Expand All @@ -106,6 +101,7 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr
worker_task.kwargs,
worker_task.queue,
', inline' if execute_inline else '')

if execute_inline:
if settings.FORCE_SERIALIZATION:
return self._execute_task(WorkerTask.deserialize(worker_task.serialize()))
Expand All @@ -114,11 +110,13 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr
else:
self.queue_client.add_message(worker_task.queue, worker_task.serialize(), delay)
return None
except MaxRetriesReachedException:
raise
except QueueDoesNotExistException as ex:
self._group_callback(worker_task)
self._remove_from_group(worker_task)
raise InvalidQueueException(ex.queue_name)
except QueueClientException as ex:
self._group_callback(worker_task)
self._remove_from_group(worker_task)

logger.exception('Task %s (%s) failed to enqueue to %s: %s',
worker_task.abs_func_name,
Expand All @@ -128,28 +126,44 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr

raise QueueException()
except Exception:
self._group_callback(worker_task)
self._remove_from_group(worker_task)
raise

def _execute_task(self, worker_task):
# type: (WorkerTask) -> Any
worker_task.retry_scheduled = False
result = worker_task.execute()
self._group_callback(worker_task)
self._remove_from_group(worker_task)
return result

def _group_callback(self, worker_task):
def _add_to_group(self, worker_task):
# type: (WorkerTask) -> None
if not worker_task.group_id:
return
if worker_task.group_id and not worker_task.retry_scheduled:
logger.info(
'Add task %s (%s) to group %s',
worker_task.abs_func_name,
worker_task.id,
worker_task.group_id,
)

self.group_client.add(worker_task)

def _remove_from_group(self, worker_task):
# type: (WorkerTask) -> None
if worker_task.group_id and not worker_task.retry_scheduled:
logger.info(
'Remove task %s (%s) from group %s',
worker_task.abs_func_name,
worker_task.id,
worker_task.group_id,
)

logger.info(
'Remove task %s (%s) from group %s',
worker_task.abs_func_name,
worker_task.id,
worker_task.group_id,
)
if self.group_client.remove(worker_task):
self._execute_group_callback(worker_task)

if self.group_client.remove(worker_task) and settings.GROUP_CALLBACK_TASK:
def _execute_group_callback(self, worker_task):
# type: (WorkerTask) -> None
if settings.GROUP_CALLBACK_TASK:
callback = settings.GROUP_CALLBACK_TASK

if isinstance(callback, basestring):
Expand Down
1 change: 1 addition & 0 deletions eb_sqs/worker/worker_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(self, id, group_id, queue, func, args, kwargs, max_retries, retry,
self.use_pickle = use_pickle

self.abs_func_name = '{}.{}'.format(self.func.__module__, self.func.func_name)
self.retry_scheduled = False

def execute(self):
# type: () -> Any
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='django-eb-sqs',
version='0.91',
version='0.92',
package_dir={'eb_sqs': 'eb_sqs'},
include_package_data=True,
packages=find_packages(),
Expand Down

0 comments on commit 5dc88d0

Please sign in to comment.