Skip to content

Commit

Permalink
Merge pull request #36 from cuda-networks/auto_tasks
Browse files Browse the repository at this point in the history
Auto tasks
  • Loading branch information
alexeyts authored Jul 17, 2019
2 parents 77ab42c + a60ff8f commit 07f2940
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 1 deletion.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,32 @@ def group_finished(group_id):
pass
```

#### Auto Tasks

This is a helper tool for the case you wish to define one of your class method as a task, and make it seamless to all callers.
This makes the code much simpler, and allows using classes to invoke your method directly without considering whether it's invoked async or not.

This is how you would define your class:
```python
class MyService:
def __init__(self, p1=default1, ..., pN=defaultN, auto_task_service=None):
self._auto_task_service = auto_task_service or AutoTaskService()

self._auto_task_service.register_task(self.my_task_method)

def my_task_method(self, *args, **kwargs):
...

```

Notice the following:
1. Your class needs to have defaults for all parameters in the c'tor
2. The c'tor must have a parameter named `auto_task_service`
3. The method shouldn't have any return value (as it's invoked async)

In case you want your method to retry certain cases, you need to raise `RetryableTaskException`.
You can provide on optional `delay` time for the retry, set `count_retries=False` in case you don't want to limit retries, or use `max_retries_func` to specify a function which will be invoked when the defined maximum number of retries is exhausted.

#### Settings

The following settings can be used to fine tune django-eb-sqs. Copy them into your Django `settings.py` file.
Expand Down
Empty file added eb_sqs/auto_tasks/__init__.py
Empty file.
24 changes: 24 additions & 0 deletions eb_sqs/auto_tasks/base_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from abc import ABCMeta, abstractmethod


class BaseAutoTaskService:
__metaclass__ = ABCMeta

@abstractmethod
def register_task(self, method, queue_name=None, max_retries=None):
# type: (Any, str, int) -> None
pass


class NoopTaskService(BaseAutoTaskService):
def __init__(self):
# type: () -> None
self._registered_func_names = []

def register_task(self, method, queue_name=None, max_retries=None):
# type: (Any, str, int) -> None
self._registered_func_names.append(method.__name__)

def is_func_name_registered(self, func_name):
# type: (str) -> bool
return func_name in self._registered_func_names
12 changes: 12 additions & 0 deletions eb_sqs/auto_tasks/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class RetryableTaskException(Exception):
def __init__(self, inner, delay=None, count_retries=None, max_retries_func=None):
# type: (Exception, int, bool, Any) -> None
self._inner = inner

self.delay = delay
self.count_retries = count_retries
self.max_retries_func = max_retries_func

def __repr__(self):
# type: () -> str
return repr(self._inner)
81 changes: 81 additions & 0 deletions eb_sqs/auto_tasks/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import importlib
import logging

from eb_sqs.auto_tasks.base_service import BaseAutoTaskService, NoopTaskService
from eb_sqs.auto_tasks.exceptions import RetryableTaskException
from eb_sqs.decorators import task
from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException

logger = logging.getLogger(__name__)


@task()
def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs):
try:
logger.debug(
'Invoke _auto_task_wrapper with module: %s class: %s func: %s args: %s and kwargs: %s',
module_name,
class_name,
func_name,
args,
kwargs
)

module = importlib.import_module(module_name) # import module
class_ = getattr(module, class_name) # find class

noop_task_service = NoopTaskService()
instance = class_(auto_task_service=noop_task_service) # instantiate class using NoopTaskService

if noop_task_service.is_func_name_registered(func_name):
getattr(instance, func_name)(*args, **kwargs) # invoke method on instance
else:
logger.error(
'Trying to invoke _auto_task_wrapper for unregistered task with module: %s class: %s func: %s args: %s and kwargs: %s',
module_name,
class_name,
func_name,
args,
kwargs
)
except RetryableTaskException as exc:
try:
retry_kwargs = {}

if exc.delay is not None:
retry_kwargs['delay'] = exc.delay

if exc.count_retries is not None:
retry_kwargs['count_retries'] = exc.count_retries

_auto_task_wrapper.retry(**retry_kwargs)
except MaxRetriesReachedException:
if exc.max_retries_func:
exc.max_retries_func()
else:
# by default log an error
logger.error('Reached max retries in auto task {}.{}.{} with error: {}'.format(module_name, class_name, func_name, repr(exc)))


class AutoTaskService(BaseAutoTaskService):
def register_task(self, method, queue_name=None, max_retries=None):
# type: (Any, str, int) -> None
instance = method.__self__
class_ = instance.__class__
func_name = method.__name__

def _auto_task_wrapper_invoker(*args, **kwargs):
if queue_name is not None:
kwargs['queue_name'] = queue_name

if max_retries is not None:
kwargs['max_retries'] = max_retries

_auto_task_wrapper.delay(
class_.__module__,
class_.__name__,
func_name,
*args, **kwargs
)

setattr(instance, func_name, _auto_task_wrapper_invoker)
Empty file.
63 changes: 63 additions & 0 deletions eb_sqs/tests/auto_tasks/tests_auto_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from unittest import TestCase

from mock import Mock, call

from eb_sqs import settings
from eb_sqs.auto_tasks.exceptions import RetryableTaskException
from eb_sqs.auto_tasks.service import AutoTaskService, _auto_task_wrapper


class TestService:
_TEST_MOCK = Mock()
_MAX_RETRY_NUM = 5

def __init__(self, auto_task_service=None):
self._auto_task_service = auto_task_service or AutoTaskService()
self._auto_task_service.register_task(self.task_method)
self._auto_task_service.register_task(self.task_retry_method, max_retries=self._MAX_RETRY_NUM)

def task_method(self, *args, **kwargs):
self._TEST_MOCK.task_method(*args, **kwargs)

def task_retry_method(self, *args, **kwargs):
self._TEST_MOCK.task_retry_method(*args, **kwargs)

def max_retry_fun():
self._TEST_MOCK.task_max_retry_method(*args, **kwargs)

raise RetryableTaskException(Exception('Test'), max_retries_func=max_retry_fun)

def non_task_method(self):
self._TEST_MOCK.non_task_method()


class AutoTasksTest(TestCase):
def setUp(self):
self._test_service = TestService()

self._args = [5, '6']
self._kwargs = {'p1': 'bla', 'p2': 130}

settings.EXECUTE_INLINE = True

def test_task_method(self):
self._test_service.task_method(*self._args, **self._kwargs)

TestService._TEST_MOCK.task_method.assert_called_once_with(*self._args, **self._kwargs)

def test_task_retry_method(self):
self._test_service.task_retry_method(*self._args, **self._kwargs)

TestService._TEST_MOCK.task_retry_method.assert_has_calls([call(*self._args, **self._kwargs)] * TestService._MAX_RETRY_NUM)

TestService._TEST_MOCK.task_max_retry_method.assert_called_once_with(*self._args, **self._kwargs)

def test_non_task_method(self):
_auto_task_wrapper.delay(
self._test_service.__class__.__module__,
self._test_service.__class__.__name__,
TestService.non_task_method.__name__,
execute_inline=True
)

TestService._TEST_MOCK.non_task_method.assert_not_called()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='django-eb-sqs',
version='1.20',
version='1.30',
package_dir={'eb_sqs': 'eb_sqs'},
include_package_data=True,
packages=find_packages(),
Expand Down

0 comments on commit 07f2940

Please sign in to comment.