Skip to content

Commit

Permalink
BNCASB-2204: Initial changes to clean the project (removing group cli…
Browse files Browse the repository at this point in the history
…ent and unused code)

Initial changes to clean the project (removing group client and unused code)
  • Loading branch information
rohandev committed Jan 9, 2020
1 parent 98787b0 commit 15082db
Show file tree
Hide file tree
Showing 13 changed files with 24 additions and 469 deletions.
49 changes: 1 addition & 48 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,33 +67,6 @@ The retry call supports the `delay` and `execute_inline` arguments in order to d

**NOTE:** `retry()` throws a `MaxRetriesReachedException` exception if the maximum number of retries is reached.

#### Executing Tasks

The Elastic Beanstalk Worker Tier sends all tasks to a API endpoint. django-eb-sqs has already such an endpoint which can be used by specifying the url mapping in your `urls.py` file.

```python
urlpatterns = [
...
url(r'^worker/', include('eb_sqs.urls', namespace='eb_sqs'))
]
```

In that case the relative endpoint url would be: `worker/process`

Set this url in the Elastic Beanstalk Worker settings prior to deployment.

During development you can use the included Django command to execute a small script which retrieves messages from SQS and posts them to this endpoint.

```bash
python manage.py run_eb_sqs_worker --url <absoulte endpoint url> --queue <queue-name>
```

For example:

```bash
python manage.py run_eb_sqs_worker --url http://127.0.0.1:80/worker/process --queue default
```

#### Executing Tasks without Elastic Beanstalk

Another way of executing tasks is to use the Django command `process_queue`.
Expand All @@ -113,27 +86,7 @@ python manage.py process_queue --queues queue1,queue2 # process queue1 and queue
python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-'
```

Use the signals `MESSAGES_RECEIVED`, `MESSAGES_PROCESSED`, `MESSAGES_DELETED` of the `WorkerService` to get informed about the current SQS batch being processed by the management command.

#### Group Tasks
Multiple tasks can be grouped by specifying the `group_id` argument when calling `delay` on a task.
If all tasks of a specific group are executed then the group callback task specified by `EB_SQS_GROUP_CALLBACK_TASK` is executed.

Example calls:
```python
echo.delay(message='Hello World!', group_id='1')
echo.delay(message='Hallo Welt!', group_id='1')
echo.delay(message='Hola mundo!', group_id='1')
```

Example callback which is executed when all three tasks are finished:
```python
from eb_sqs.decorators import task

@task(queue_name='test', max_retries=5)
def group_finished(group_id):
pass
```
Use the signals `MESSAGES_RECEIVED`, `MESSAGES_PROCESSED`, `MESSAGES_DELETED` of

#### Auto Tasks

Expand Down
79 changes: 0 additions & 79 deletions eb_sqs/management/commands/run_eb_sqs_worker.py

This file was deleted.

1 change: 0 additions & 1 deletion eb_sqs/redis/__init__.py

This file was deleted.

53 changes: 0 additions & 53 deletions eb_sqs/redis/redis_group_client.py

This file was deleted.

45 changes: 0 additions & 45 deletions eb_sqs/tests/tests_views.py

This file was deleted.

99 changes: 1 addition & 98 deletions eb_sqs/tests/worker/tests_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from eb_sqs import settings
from eb_sqs.decorators import task
from eb_sqs.worker.group_client import GroupClient
from eb_sqs.worker.queue_client import QueueClient
from eb_sqs.worker.worker import Worker
from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException
Expand Down Expand Up @@ -65,35 +64,18 @@ def setUp(self):
settings.DEAD_LETTER_MODE = False

self.queue_mock = Mock(autospec=QueueClient)
self.group_mock = Mock(autospec=GroupClient)
self.group_mock.remove.return_value = True
self.worker = Worker(self.queue_mock, self.group_mock)
self.worker = Worker(self.queue_mock)

factory_mock = Mock(autospec=WorkerFactory)
factory_mock.create.return_value = self.worker
settings.WORKER_FACTORY = factory_mock

def setUpGroupsHandling(self):
self.group_set = set()
self.group_mock.add.side_effect = lambda tsk: self.group_set.add('{}-{}'.format(tsk.id, tsk.retry_id))
self.group_mock.remove.side_effect = lambda tsk: len(self.group_set) == 0 if self.group_set.discard(
'{}-{}'.format(tsk.id, tsk.retry_id)) is None else False

def test_worker_execution_no_group(self):
msg = '{"id": "id-1", "retry": 0, "queue": "default", "maxRetries": 5, "args": [], "func": "eb_sqs.tests.worker.tests_worker.dummy_task", "kwargs": {"msg": "Hello World!"}}'

result = self.worker.execute(msg, 2)

self.assertEqual(result, 'Hello World!')
self.group_mock.remove.assert_not_called()

def test_worker_execution_with_group(self):
msg = '{"id": "id-1", "groupId": "group-5", "retry": 0, "queue": "default", "maxRetries": 5, "args": [], "func": "eb_sqs.tests.worker.tests_worker.dummy_task", "kwargs": {"msg": "Hello World!"}}'

result = self.worker.execute(msg)

self.assertEqual(result, 'Hello World!')
self.group_mock.remove.assert_called_once()

def test_worker_execution_dead_letter_queue(self):
settings.DEAD_LETTER_MODE = True
Expand All @@ -103,28 +85,20 @@ def test_worker_execution_dead_letter_queue(self):
result = self.worker.execute(msg)

self.assertIsNone(result)
self.group_mock.remove.assert_called_once()

def test_delay(self):
self.worker.delay(None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 3, False)

self.group_mock.add.assert_not_called()
self.queue_mock.add_message.assert_called_once()
queue_delay = self.queue_mock.add_message.call_args[0][2]
self.assertEqual(queue_delay, 3)

def test_delay_inline(self):
result = self.worker.delay(None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True)

self.group_mock.add.assert_not_called()
self.queue_mock.add_message.assert_not_called()
self.assertEqual(result, 'Hello World!')

def test_delay_with_group(self):
self.worker.delay('group-id', 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 3, False)

self.group_mock.add.assert_called_once()

def test_retry_max_reached_execution(self):
with self.assertRaises(MaxRetriesReachedException):
max_retries_task.delay(execute_inline=True)
Expand All @@ -133,74 +107,3 @@ def test_retry_no_limit(self):
retries_task.delay(10, execute_inline=True)

self.assertEqual(retries_task.retry_num, 10)

def test_group(self):
settings.GROUP_CALLBACK_TASK = Mock()

self.worker.delay('group-id', 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True)

settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
settings.GROUP_CALLBACK_TASK = None

def test_group_with_exception(self):
settings.GROUP_CALLBACK_TASK = Mock()
self.setUpGroupsHandling()

with self.assertRaises(TestException):
exception_group_task.delay(group_id='group-id', execute_inline=True)

self.assertEqual(len(self.group_set), 0)
self.assertEqual(self.group_mock.add.call_count, 1)
self.assertEqual(self.group_mock.remove.call_count, 1)

settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
settings.GROUP_CALLBACK_TASK = None

def test_group_retries(self):
settings.GROUP_CALLBACK_TASK = Mock()
self.setUpGroupsHandling()

repeating_group_task.delay(3, group_id='group-id', execute_inline=True)

self.assertEqual(len(self.group_set), 0)
self.assertEqual(self.group_mock.add.call_count, 4)
self.assertEqual(self.group_mock.remove.call_count, 4)

settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
settings.GROUP_CALLBACK_TASK = None

def test_group_exception_in_retries(self):
settings.GROUP_CALLBACK_TASK = Mock()
self.setUpGroupsHandling()

with self.assertRaises(TestException):
exception_repeating_group_task.delay(2, group_id='group-id', execute_inline=True)

self.assertEqual(len(self.group_set), 0)
self.assertEqual(self.group_mock.add.call_count, 3)
self.assertEqual(self.group_mock.remove.call_count, 3)

settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
settings.GROUP_CALLBACK_TASK = None

def test_group_match_retries_reached(self):
settings.GROUP_CALLBACK_TASK = Mock()
self.setUpGroupsHandling()

with self.assertRaises(MaxRetriesReachedException):
max_retries_group_task.delay(group_id='group-id', execute_inline=True)

self.assertEqual(len(self.group_set), 0)
self.assertEqual(self.group_mock.add.call_count, 5)
self.assertEqual(self.group_mock.remove.call_count, 5)

settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
settings.GROUP_CALLBACK_TASK = None

def test_group_callback_string(self):
settings.GROUP_CALLBACK_TASK = 'eb_sqs.tests.worker.tests_worker.global_group_mock'

self.worker.delay('group-id', 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True)

global_group_mock.delay.assert_called_once()
settings.GROUP_CALLBACK_TASK = None
Loading

0 comments on commit 15082db

Please sign in to comment.