
Comparing changes

base repository: cuda-networks/django-eb-sqs
base: v1.35
head repository: cuda-networks/django-eb-sqs
compare: master

Commits on Jan 6, 2020

  1. 17ba8b3

Commits on Jan 7, 2020

  1. 1922685
  2. 901c2f4
  3. 07aed48
  4. 5e1c811
  5. BNCASB-2204: Remove caching in circleci job.

    Pipenv fails to resolve a few sub-dependencies
    rohandev committed Jan 7, 2020
    fa0a5c6
  6. d1ed43b

Commits on Jan 8, 2020

  1. BNCASB-2204: Changes for pushing the project to PyPI

    Initial changes to make project ready to push to PyPI
    rohandev committed Jan 8, 2020
    9b18d8f
  2. 3ae55a8
  3. Merge pull request #43 from cuda-networks/BNCASB-2204-add-support-circleci

    BNCASB: 2204 add support circleci
    rohandev authored Jan 8, 2020
    0d58f25
  4. 3b57d74
  5. Merge pull request #44 from cuda-networks/BNCASB-2204-changes-for-PyPI-package

    BNCASB-2204: Changes for pushing the project to PyPI
    rohandev authored Jan 8, 2020
    44f592b

Commits on Jan 9, 2020

  1. 87fcd7a
  2. Merge pull request #45 from cuda-networks/BNCASB-2204-add-support-circleci

    BNCASB-2204: Update python image versions
    rohandev authored Jan 9, 2020
    98787b0
  3. BNCASB-2204: Initial changes to clean the project (removing group client and unused code)

    Initial changes to clean the project (removing group client and unused code)
    rohandev committed Jan 9, 2020
    15082db
  4. 0ef48a2
  5. 0a84c9f

Commits on Jan 10, 2020

  1. 912a8df
  2. BNCASB-2204: Updating the project version number

    Also resolving PR comments.
    rohandev committed Jan 10, 2020
    2212d14

Commits on Jan 13, 2020

  1. Merge pull request #47 from cuda-networks/BNCASB-2204-cleanup-eb-sqs

    BNCASB-2204: Initial changes to clean the project (removing group client and unused code)
    rohandev authored Jan 13, 2020
    1911786

Commits on Jan 14, 2020

  1. 609ed34
  2. d6210ad
  3. Bump the version to 1.36

    rohandev committed Jan 14, 2020
    873433f
  4. 750259f

Commits on Jan 15, 2020

  1. BNCASB-2204: Update the README.md file to include badges

    Include badges
    rohandev committed Jan 15, 2020
    ab6ffc8
  2. 5547ff1
  3. 142501d
  4. 0016da2

Commits on Jan 16, 2020

  1. d4908e6

Commits on Jan 17, 2020

  1. 1759c1f

Commits on Jan 18, 2020

  1. BNCASB-2204: Split the update script into two separate helper scripts.

    update-version script to update the package version
    publish-pypi script to publish the package to PyPI
    rohandev committed Jan 18, 2020
    e859b08
  2. 7ebed70

Commits on Jan 20, 2020

  1. Merge pull request #48 from cuda-networks/BNCASB-2204-build-PyPI-package

    Bncasb 2204 build py pi package
    rohandev authored Jan 20, 2020
    fd0c493

Commits on Jan 21, 2020

  1. Bump the version to 1.37

    rohandev committed Jan 21, 2020
    1a99416

Commits on Jan 29, 2020

  1. 80894cf
  2. Merge pull request #49 from cuda-networks/BNCASB-2204-add-PyPI-badge

    BNCASB-2204: Adding PyPI version badge to README
    rohandev authored Jan 29, 2020
    8e60a29
  3. Bump the version to 1.38

    rohandev committed Jan 29, 2020
    6a507fc

Commits on Feb 24, 2020

  1. fix PyPI link

    alexeyts authored Feb 24, 2020
    5546d59

Commits on Jun 16, 2020

  1. 71fec89

Commits on Jun 29, 2020

  1. make default prefix empty, and minor cleanups (#52)

    make default prefix empty, and minor readme & test cleanups
    alexeyts authored Jun 29, 2020
    d131eb6
  2. Bump the version to 1.39

    Alexey Tsitkin committed Jun 29, 2020
    5bf8b2b
  3. adding git ignore stuff

    Alexey Tsitkin committed Jun 29, 2020
    7827083

Commits on Jul 28, 2020

  1. log entire message in case of re-queueing

    Alexey Tsitkin committed Jul 28, 2020
    7383f09
  2. Merge pull request #53 from cuda-networks/log_requeue_message

    log entire message in case of re-queueing
    alexeyts authored Jul 28, 2020
    559224c
  3. Bump the version to 1.40

    Alexey Tsitkin committed Jul 28, 2020
    e9e3295

Commits on Aug 20, 2020

  1. Fix the healthcheck

    Previously the health check file was updated only when message processing
    ended; now it is updated after each queue is processed.
    rohandev committed Aug 20, 2020
    838d348
  2. 2dcce5a
  3. PR review comments

    rohandev committed Aug 20, 2020
    3ef04ec
  4. Merge pull request #54 from cuda-networks/fix-health-check

    Fix the healthcheck
    rohandev authored Aug 20, 2020
    f268442
  5. Bump the version to 1.41

    rohandev committed Aug 20, 2020
    baa9da5
24 changes: 24 additions & 0 deletions .circleci/config.yml
@@ -0,0 +1,24 @@
version: 2.1
workflows:
  version: 2
  eb-sqs-jobs:
    jobs:
      - test-python-3-9
jobs:
  test-python-3-9:
    docker:
      - image: cimg/python:3.9.16
    steps:
      - add_ssh_keys
      - checkout
      - run:
          name: Install pip packages
          command: |
            python3 -m venv venv
            . venv/bin/activate
            pip install -r development.txt
      - run:
          name: Run tests
          command: |
            . venv/bin/activate
            python -m django test --settings=eb_sqs.test_settings
4 changes: 4 additions & 0 deletions .gitignore
@@ -10,3 +10,7 @@
# Ignore EGG files
*.egg-info
*.egg

# Ignore built files for PyPi
build/
dist/
File renamed without changes.
72 changes: 13 additions & 59 deletions README.md
@@ -1,6 +1,11 @@
## Django EB SQS - Background Tasks for Elastic Beanstalk and Amazon SQS

django-eb-sqs is a simple task manager for the Elastic Beanstalk Worker Tier. It uses SQS and the [boto3](https://github.com/boto/boto3) library.
[![PyPI version](https://img.shields.io/pypi/v/django-eb-sqs)](https://pypi.org/project/django-eb-sqs/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![CircleCI](https://img.shields.io/circleci/build/github/cuda-networks/django-eb-sqs/master)](https://circleci.com/gh/cuda-networks/django-eb-sqs/tree/master)

# Django EB SQS - Background Tasks for Amazon SQS

django-eb-sqs is a simple task manager for AWS SQS. It uses SQS and the [boto3](https://github.com/boto/boto3) library.

### Installation

@@ -31,7 +36,7 @@ echo.delay(message='Hello World!')
```
**NOTE:** This assumes that you have your AWS keys in the appropriate environment variables, or are using IAM roles. Consult the `boto3` [documentation](https://boto3.readthedocs.org/en/latest/) for further info.

If you don't pass a queue name, the `EB_SQS_DEFAULT_QUEUE` setting is used. If not set, the queue name is `default`.
If you don't pass a queue name, the `EB_SQS_DEFAULT_QUEUE` setting is used. If not set, the queue name is `eb-sqs-default`.

Additionally, the task decorator supports `max_retries` (default `0`) and `use_pickle` (default `False`) attributes for advanced control of task execution.
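
A minimal sketch of how these attributes can be used (the `echo` task mirrors the example above; the specific values are illustrative):

```python
from eb_sqs.decorators import task

@task(queue_name='test', max_retries=5, use_pickle=True)
def echo(message):
    print(message)

# Queues the task on SQS. The attributes can also be overridden per call,
# e.g. echo.delay(message='Hello World!', queue_name='another-queue').
echo.delay(message='Hello World!')
```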

@@ -69,42 +74,13 @@ The retry call supports the `delay` and `execute_inline` arguments in order to d

#### Executing Tasks

The Elastic Beanstalk Worker Tier sends all tasks to an API endpoint. django-eb-sqs already provides such an endpoint, which can be used by specifying the url mapping in your `urls.py` file.

```python
urlpatterns = [
    ...
    url(r'^worker/', include('eb_sqs.urls', namespace='eb_sqs'))
]
```

In that case the relative endpoint url would be: `worker/process`

Set this url in the Elastic Beanstalk Worker settings prior to deployment.

During development you can use the included Django command to execute a small script which retrieves messages from SQS and posts them to this endpoint.

```bash
python manage.py run_eb_sqs_worker --url <absolute endpoint url> --queue <queue-name>
```

For example:

```bash
python manage.py run_eb_sqs_worker --url http://127.0.0.1:80/worker/process --queue default
```

#### Executing Tasks without Elastic Beanstalk

Another way of executing tasks is to use the Django command `process_queue`.
In order to execute tasks, use the Django command `process_queue`.
This command can work with one or more queues, reading from the queues indefinitely and executing tasks as they come in.

```bash
python manage.py process_queue --queues <comma-delimited list of queue names>
```

This is a good idea for someone who wants to execute tasks without an Elastic Beanstalk worker.

You can either use full queue names, or a queue prefix using the `prefix:*my_example_prefix*` notation.

Examples:
@@ -115,33 +91,15 @@ python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queu

Use the signals `MESSAGES_RECEIVED`, `MESSAGES_PROCESSED`, `MESSAGES_DELETED` of the `WorkerService` to get informed about the current SQS batch being processed by the management command.
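
For example, a receiver can be attached roughly like this (a sketch: the signal names and the `WorkerService` import path come from this repository, but the keyword arguments carried by each signal are an assumption here):

```python
from django.dispatch import receiver

from eb_sqs.worker.service import WorkerService

@receiver(WorkerService.MESSAGES_RECEIVED)
def on_messages_received(sender, **kwargs):
    # The signal payload (batch details, messages, etc.) arrives via kwargs;
    # inspect it as needed for metrics or logging.
    print('received an SQS batch: {}'.format(kwargs))
```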

#### Group Tasks
Multiple tasks can be grouped by specifying the `group_id` argument when calling `delay` on a task.
If all tasks of a specific group are executed then the group callback task specified by `EB_SQS_GROUP_CALLBACK_TASK` is executed.

Example calls:
```python
echo.delay(message='Hello World!', group_id='1')
echo.delay(message='Hallo Welt!', group_id='1')
echo.delay(message='Hola mundo!', group_id='1')
```

Example callback which is executed when all three tasks are finished:
```python
from eb_sqs.decorators import task

@task(queue_name='test', max_retries=5)
def group_finished(group_id):
    pass
```

#### Auto Tasks

This is a helper tool for the case where you wish to define one of your class methods as a task and make that seamless to all callers.
This makes the code much simpler and allows callers to invoke your method directly without considering whether it's invoked asynchronously or not.

This is how you would define your class:
```python
from eb_sqs.auto_tasks.service import AutoTaskService

class MyService:
    def __init__(self, p1=default1, ..., pN=defaultN, auto_task_service=None):
        self._auto_task_service = auto_task_service or AutoTaskService()
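        # Hypothetical continuation (not part of the original README excerpt):
        # register one of the instance's methods as an auto task. The
        # register_task(method, queue_name=None, max_retries=None) signature
        # matches eb_sqs/auto_tasks/service.py below; 'send_notification' is
        # a made-up method name used only for illustration.
        self._auto_task_service.register_task(self.send_notification, queue_name='test', max_retries=5)

    def send_notification(self, message):
        # Runs normally when invoked by the worker; otherwise the registered
        # wrapper queues the call to SQS instead of executing it directly.
        pass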
@@ -176,14 +134,10 @@ The following settings can be used to fine tune django-eb-sqs. Copy them into yo
- EB_SQS_DEFAULT_DELAY (`0`): Default task delay time in seconds.
- EB_SQS_DEFAULT_MAX_RETRIES (`0`): Default retry limit for all tasks.
- EB_SQS_DEFAULT_COUNT_RETRIES (`True`): Count retry calls. Needed if max retries check shall be executed.
- EB_SQS_DEFAULT_QUEUE (`default`): Default queue name if none is specified when creating a task.
- EB_SQS_DEFAULT_QUEUE (`eb-sqs-default`): Default queue name if none is specified when creating a task.
- EB_SQS_EXECUTE_INLINE (`False`): Execute tasks immediately without using SQS. Useful during development. Global setting `True` will override setting it on a task level.
- EB_SQS_FORCE_SERIALIZATION (`False`): Forces serialization of tasks when executed `inline`. This setting is helpful during development to see if all arguments are serialized and deserialized properly.
- EB_SQS_GROUP_CALLBACK_TASK (`None`): Group callback (String or Function). Must be a valid task.
- EB_SQS_QUEUE_PREFIX (`eb-sqs-`): Prefix to use for the queues. The prefix is added to the queue name.
- EB_SQS_REDIS_CLIENT (`None`): Set the Redis connection client (`StrictRedis`)
- EB_SQS_REDIS_EXPIRY (`604800`): Default expiry time in seconds until a group is removed
- EB_SQS_REDIS_KEY_PREFIX (`eb-sqs-`): Prefix used for all Redis keys
- EB_SQS_QUEUE_PREFIX (``): Prefix to use for the queues. The prefix is added to the queue name.
- EB_SQS_USE_PICKLE (`False`): Enable to use `pickle` to serialize task parameters. Uses `json` as default.
- EB_SQS_AWS_MAX_RETRIES (`30`): Default retry limit on a boto3 call to AWS SQS.
- EB_SQS_REFRESH_PREFIX_QUEUES_S (`10`): Minimum number of seconds to wait between refreshes of the queue list when a prefix is used.
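
As an illustration, a Django `settings.py` excerpt overriding a few of these keys could look as follows (the values shown simply restate the documented defaults):

```python
# settings.py -- illustrative excerpt; values restate the defaults listed above
EB_SQS_DEFAULT_QUEUE = 'eb-sqs-default'
EB_SQS_QUEUE_PREFIX = ''
EB_SQS_DEFAULT_MAX_RETRIES = 0
EB_SQS_EXECUTE_INLINE = False   # set to True locally to bypass SQS during development
EB_SQS_USE_PICKLE = False       # json serialization is used by default
EB_SQS_AWS_MAX_RETRIES = 30
```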
1 change: 1 addition & 0 deletions VERSION
@@ -0,0 +1 @@
1.44
11 changes: 5 additions & 6 deletions development.txt
@@ -1,6 +1,5 @@
boto3==1.9.86
Django==1.10.6
mock==2.0.0
moto==1.3.13
redis==2.10.5
requests==2.10.0
boto3==1.26.99
Django==4.1.7
mock==5.0.1
moto==4.1.6
requests==2.28.2
9 changes: 5 additions & 4 deletions eb_sqs/auto_tasks/exceptions.py
@@ -1,12 +1,13 @@
from typing import Any


class RetryableTaskException(Exception):
    def __init__(self, inner, delay=None, count_retries=None, max_retries_func=None):
        # type: (Exception, int, bool, Any) -> None
    def __init__(self, inner: Exception, delay: int = None, count_retries: bool = None, max_retries_func: Any = None):
        self._inner = inner

        self.delay = delay
        self.count_retries = count_retries
        self.max_retries_func = max_retries_func

    def __repr__(self):
        # type: () -> str
    def __repr__(self) -> str:
        return repr(self._inner)
18 changes: 8 additions & 10 deletions eb_sqs/auto_tasks/service.py
@@ -1,5 +1,6 @@
import importlib
import logging
from typing import Any

from eb_sqs.auto_tasks.exceptions import RetryableTaskException
from eb_sqs.decorators import task
@@ -24,7 +25,7 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs):
        class_ = getattr(module, class_name)  # find class

        auto_task_executor_service = _AutoTaskExecutorService(func_name)
        instance = class_(auto_task_service=auto_task_executor_service)  # instantiate class using _AutoTaskExecutorService
        instance = class_(auto_task_service=auto_task_executor_service)

        executor_func_name = auto_task_executor_service.get_executor_func_name()
        if executor_func_name:
@@ -54,12 +55,12 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs):
exc.max_retries_func()
else:
# by default log an error
logger.error('Reached max retries in auto task {}.{}.{} with error: {}'.format(module_name, class_name, func_name, repr(exc)))
logger.error('Reached max retries in auto task {}.{}.{} with error: {}'.format(module_name, class_name,
func_name, repr(exc)))


class AutoTaskService(object):
    def register_task(self, method, queue_name=None, max_retries=None):
        # type: (Any, str, int) -> None
    def register_task(self, method: Any, queue_name: str = None, max_retries: int = None):
        instance = method.__self__
        class_ = instance.__class__
        func_name = method.__name__
@@ -82,14 +83,12 @@ def _auto_task_wrapper_invoker(*args, **kwargs):


class _AutoTaskExecutorService(AutoTaskService):
    def __init__(self, func_name):
        # type: (str) -> None
    def __init__(self, func_name: str):
        self._func_name = func_name

        self._executor_func_name = None

    def register_task(self, method, queue_name=None, max_retries=None):
        # type: (Any, str, int) -> None
    def register_task(self, method: Any, queue_name: str = None, max_retries: int = None):
        if self._func_name == method.__name__:
            # circuit breaker to allow actually executing the method once
            instance = method.__self__
@@ -99,6 +98,5 @@ def register_task(self, method, queue_name=None, max_retries=None):

        super(_AutoTaskExecutorService, self).register_task(method, queue_name, max_retries)

    def get_executor_func_name(self):
        # type: () -> str
    def get_executor_func_name(self) -> str:
        return self._executor_func_name
15 changes: 5 additions & 10 deletions eb_sqs/aws/sqs_queue_client.py
@@ -1,4 +1,4 @@
from __future__ import absolute_import, unicode_literals
from typing import Any

import boto3
from botocore.config import Config
@@ -10,15 +10,13 @@

class SqsQueueClient(QueueClient):
    def __init__(self):
        # type: () -> None
        self.sqs = boto3.resource('sqs',
                                  region_name=settings.AWS_REGION,
                                  config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
                                  )
        self.queue_cache = {}

    def _get_queue(self, queue_name, use_cache=True):
        # type: (unicode, bool) -> Any
    def _get_queue(self, queue_name: str, use_cache: bool = True) -> Any:
        full_queue_name = '{}{}'.format(settings.QUEUE_PREFIX, queue_name)

        queue = self._get_sqs_queue(full_queue_name, use_cache)
@@ -27,8 +25,7 @@ def _get_queue(self, queue_name, use_cache=True):

        return queue

    def _get_sqs_queue(self, queue_name, use_cache):
        # type: (unicode, bool) -> Any
    def _get_sqs_queue(self, queue_name: str, use_cache: bool) -> Any:
        if use_cache and self.queue_cache.get(queue_name):
            return self.queue_cache[queue_name]

@@ -43,8 +40,7 @@ def _get_sqs_queue(self, queue_name, use_cache):
            else:
                raise ex

    def _add_sqs_queue(self, queue_name):
        # type: (unicode) -> Any
    def _add_sqs_queue(self, queue_name: str) -> Any:
        if settings.AUTO_ADD_QUEUE:
            queue = self.sqs.create_queue(
                QueueName=queue_name,
@@ -58,8 +54,7 @@ def _add_sqs_queue(self, queue_name):
        else:
            raise QueueDoesNotExistException(queue_name)

    def add_message(self, queue_name, msg, delay):
        # type: (unicode, unicode, int) -> None
    def add_message(self, queue_name: str, msg: str, delay: int):
        try:
            queue = self._get_queue(queue_name)
            try:
23 changes: 8 additions & 15 deletions eb_sqs/decorators.py
@@ -1,19 +1,16 @@
from __future__ import absolute_import, unicode_literals
from typing import Any

from eb_sqs import settings
from eb_sqs.worker.worker_factory import WorkerFactory
from eb_sqs.worker.worker_task import WorkerTask


def _get_kwarg_val(kwargs, key, default):
    # type: (dict, str, Any) -> Any
def _get_kwarg_val(kwargs: dict, key: str, default: Any) -> Any:
    return kwargs.pop(key, default) if kwargs else default


def func_delay_decorator(func, queue_name, max_retries_count, use_pickle):
    # type: (Any, str, int, bool) -> (tuple, dict)
    def wrapper(*args, **kwargs):
        # type: (tuple, dict) -> Any
def func_delay_decorator(func: Any, queue_name: str, max_retries_count: int, use_pickle: bool) -> (tuple, dict):
    def wrapper(*args: tuple, **kwargs: dict) -> Any:
        queue = _get_kwarg_val(kwargs, 'queue_name', queue_name if queue_name else settings.DEFAULT_QUEUE)
        max_retries = _get_kwarg_val(kwargs, 'max_retries', max_retries_count if max_retries_count else settings.DEFAULT_MAX_RETRIES)
        pickle = _get_kwarg_val(kwargs, 'use_pickle', use_pickle if use_pickle else settings.USE_PICKLE)
@@ -28,10 +25,8 @@ def wrapper(*args, **kwargs):
    return wrapper


def func_retry_decorator(worker_task):
    # type: (WorkerTask) -> (tuple, dict)
    def wrapper(*args, **kwargs):
        # type: (tuple, dict) -> Any
def func_retry_decorator(worker_task: WorkerTask) -> (tuple, dict):
    def wrapper(*args: tuple, **kwargs: dict) -> Any:
        execute_inline = _get_kwarg_val(kwargs, 'execute_inline', False) or settings.EXECUTE_INLINE
        delay = _get_kwarg_val(kwargs, 'delay', settings.DEFAULT_DELAY)
        count_retries = _get_kwarg_val(kwargs, 'count_retries', settings.DEFAULT_COUNT_RETRIES)
@@ -42,14 +37,12 @@ def wrapper(*args, **kwargs):


class task(object):
    def __init__(self, queue_name=None, max_retries=None, use_pickle=None):
        # type: (str, int, bool) -> None
    def __init__(self, queue_name: str = None, max_retries: int = None, use_pickle: bool = None):
        self.queue_name = queue_name
        self.max_retries = max_retries
        self.use_pickle = use_pickle

    def __call__(self, *args, **kwargs):
        # type: (tuple, dict) -> Any
    def __call__(self, *args: tuple, **kwargs: dict) -> Any:
        func = args[0]
        func.retry_num = 0
        func.delay = func_delay_decorator(func, self.queue_name, self.max_retries, self.use_pickle)
2 changes: 0 additions & 2 deletions eb_sqs/management/commands/process_queue.py
@@ -1,5 +1,3 @@
from __future__ import absolute_import, unicode_literals

from django.core.management import BaseCommand, CommandError

from eb_sqs.worker.service import WorkerService