Skip to content

Commit 250992a

Browse files
committed
Initial commit
0 parents  commit 250992a

15 files changed

+625
-0
lines changed

.gitignore

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
*.pyc
2+
*.db
3+
.coverage
4+
MANIFEST
5+
dist/
6+
build/
7+
env/
8+
html/
9+
htmlcov/
10+
*.egg-info/
11+
.tox/

django_dbq/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
__version__ = '0.0.1'

django_dbq/management/__init__.py

Whitespace-only changes.

django_dbq/management/commands/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from django.conf import settings
2+
from django.core.management.base import BaseCommand, CommandError
3+
from django_dbq.apps.core.models import Job
4+
from optparse import make_option
5+
import json
6+
import logging
7+
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class Command(BaseCommand):
13+
14+
help = "Create a job"
15+
args = '<job_name>'
16+
17+
option_list = BaseCommand.option_list + (
18+
make_option('--workspace',
19+
help='JSON-formatted initial command workspace'),)
20+
21+
def handle(self, *args, **options):
22+
if len(args) != 1:
23+
raise CommandError("Please supply a single job name")
24+
25+
name = args[0]
26+
if name not in settings.JOBS:
27+
raise CommandError('"%s" is not a valid job name' % name)
28+
29+
workspace = options['workspace']
30+
if workspace:
31+
workspace = json.loads(workspace)
32+
33+
job = Job.objects.create(name=name, workspace=workspace)
34+
self.stdout.write('Created job: "%s", id=%s' % (job.name, job.pk))
+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
from django.db import transaction
2+
from django.core.management.base import NoArgsCommand
3+
from django.utils.module_loading import import_by_path
4+
from django_dbq.apps.core.models import Job
5+
from simplesignals.process import WorkerProcessBase
6+
from time import sleep
7+
import logging
8+
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
def process_job():
14+
"""This function grabs the next available job, and runs its next task."""
15+
16+
with transaction.atomic():
17+
job = Job.objects.get_ready_or_none()
18+
if not job:
19+
return
20+
21+
logger.info('Processing job: name="%s" id=%s state=%s next_task=%s', job.name, job.pk, job.state, job.next_task)
22+
job.state = Job.STATES.PROCESSING
23+
job.save()
24+
25+
try:
26+
task_function = import_by_path(job.next_task)
27+
task_function(job)
28+
job.update_next_task()
29+
if not job.next_task:
30+
job.state = Job.STATES.COMPLETE
31+
else:
32+
job.state = Job.STATES.READY
33+
except Exception as exception:
34+
logger.exception("Job id=%s failed", job.pk)
35+
job.state = Job.STATES.FAILED
36+
37+
failure_hook_name = job.get_failure_hook_name()
38+
if failure_hook_name:
39+
logger.info("Running failure hook %s for job id=%s", failure_hook_name, job.pk)
40+
failure_hook_function = import_by_path(failure_hook_name)
41+
failure_hook_function(job, exception)
42+
else:
43+
logger.info("No failure hook for job id=%s", job.pk)
44+
45+
logger.info('Updating job: name="%s" id=%s state=%s next_task=%s', job.name, job.pk, job.state, job.next_task or 'none')
46+
47+
try:
48+
job.save()
49+
except:
50+
logger.error('Failed to save job: id=%s org=%s', job.pk, job.workspace.get('organisation_id'))
51+
raise
52+
53+
54+
class Worker(WorkerProcessBase):
55+
56+
process_title = "jobworker"
57+
58+
def do_work(self):
59+
sleep(1)
60+
process_job()
61+
62+
63+
class Command(NoArgsCommand):
64+
65+
help = "Run a queue worker process"
66+
67+
def handle_noargs(self, **options):
68+
logger.info("Starting job worker")
69+
worker = Worker()
70+
worker.run()

django_dbq/migrations/0001_initial.py

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# -*- coding: utf-8 -*-
2+
from south.utils import datetime_utils as datetime
3+
from south.db import db
4+
from south.v2 import SchemaMigration
5+
from django.db import models
6+
7+
8+
class Migration(SchemaMigration):
9+
10+
def forwards(self, orm):
11+
# Adding model 'Job'
12+
db.create_table(u'core_job', (
13+
('id', self.gf('uuidfield.fields.UUIDField')(unique=True, max_length=32, primary_key=True, db_index=True)),
14+
('created', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, db_index=True, blank=True)),
15+
('modified', self.gf('django.db.models.fields.DateTimeField')(auto_now=True, blank=True)),
16+
('name', self.gf('django.db.models.fields.CharField')(max_length=100)),
17+
('state', self.gf('django.db.models.fields.CharField')(default='READY', max_length=20, db_index=True)),
18+
('next_task', self.gf('django.db.models.fields.CharField')(max_length=100, blank=True)),
19+
('workspace', self.gf('jsonfield.fields.JSONField')(null=True)),
20+
))
21+
db.send_create_signal(u'core', ['Job'])
22+
23+
24+
def backwards(self, orm):
25+
# Deleting model 'Job'
26+
db.delete_table(u'core_job')
27+
28+
29+
models = {
30+
u'core.job': {
31+
'Meta': {'ordering': "['-created']", 'object_name': 'Job'},
32+
'created': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'db_index': 'True', 'blank': 'True'}),
33+
'id': ('uuidfield.fields.UUIDField', [], {'unique': 'True', 'max_length': '32', 'primary_key': 'True', 'db_index': 'True'}),
34+
'modified': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'}),
35+
'name': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
36+
'next_task': ('django.db.models.fields.CharField', [], {'max_length': '100', 'blank': 'True'}),
37+
'state': ('django.db.models.fields.CharField', [], {'default': "'READY'", 'max_length': '20', 'db_index': 'True'}),
38+
'workspace': ('jsonfield.fields.JSONField', [], {'null': 'True'})
39+
}
40+
}
41+
42+
complete_apps = ['core']

django_dbq/migrations/__init__.py

Whitespace-only changes.

django_dbq/models.py

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
from django.db import models
2+
from django.utils.module_loading import import_by_path
3+
from django_dbq.apps.core.tasks import get_next_task_name, get_failure_hook_name, get_creation_hook_name
4+
from jsonfield import JSONField
5+
from model_utils import Choices
6+
import logging
7+
import uuidfield
8+
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class JobManager(models.Manager):
14+
15+
def get_ready_or_none(self, max_retries=3):
16+
"""
17+
Get a job in state READY. Supports retrying in case of database deadlock
18+
19+
See https://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
20+
21+
"Always be prepared to re-issue a transaction if it fails due to
22+
deadlock. Deadlocks are not dangerous. Just try again."
23+
24+
In the `except` clause, it's difficult to be more specific on the
25+
exception type, because it's different on different backends. MySQL,
26+
for example, raises a django.db.utils.InternalError for all manner of
27+
database-related problems. This code is more-or-less cribbed from
28+
django-celery, which uses a very similar approach.
29+
30+
"""
31+
retries_left = max_retries
32+
while True:
33+
try:
34+
return self.select_for_update().filter(state=Job.STATES.READY).first()
35+
except Exception as e:
36+
if retries_left == 0:
37+
raise
38+
retries_left -= 1
39+
logger.warn("Caught %s when looking for a READY job, retrying %s more times", str(e), retries_left)
40+
41+
42+
class Job(models.Model):
43+
44+
STATES = Choices("READY", "PROCESSING", "FAILED", "COMPLETE")
45+
46+
id = uuidfield.UUIDField(primary_key=True, auto=True, db_index=True)
47+
created = models.DateTimeField(auto_now_add=True, db_index=True)
48+
modified = models.DateTimeField(auto_now=True)
49+
name = models.CharField(max_length=100)
50+
state = models.CharField(max_length=20, choices=STATES, default=STATES.READY, db_index=True)
51+
next_task = models.CharField(max_length=100, blank=True)
52+
workspace = JSONField(null=True)
53+
54+
class Meta:
55+
ordering = ['-created']
56+
57+
objects = JobManager()
58+
59+
def save(self, *args, **kwargs):
60+
if not self.pk:
61+
self.next_task = get_next_task_name(self.name)
62+
self.workspace = self.workspace or {}
63+
64+
try:
65+
self.run_creation_hook()
66+
except Exception as exception: # noqa
67+
logger.exception("Failed to create new job, creation hook raised an exception")
68+
return # cancel the save
69+
70+
return super(Job, self).save(*args, **kwargs)
71+
72+
def update_next_task(self):
73+
self.next_task = get_next_task_name(self.name, self.next_task) or ''
74+
75+
def get_failure_hook_name(self):
76+
return get_failure_hook_name(self.name)
77+
78+
def get_creation_hook_name(self):
79+
return get_creation_hook_name(self.name)
80+
81+
def run_creation_hook(self):
82+
creation_hook_name = self.get_creation_hook_name()
83+
if creation_hook_name:
84+
logger.info("Running creation hook %s for new job", creation_hook_name)
85+
creation_hook_function = import_by_path(creation_hook_name)
86+
creation_hook_function(self)

django_dbq/serializers.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from django.conf import settings
2+
from django_dbq.apps.core.models import Job
3+
from rest_framework import serializers
4+
import json
5+
6+
7+
class JobSerializer(serializers.Serializer):
8+
name = serializers.ChoiceField()
9+
created = serializers.DateTimeField(read_only=True)
10+
modified = serializers.DateTimeField(read_only=True)
11+
state = serializers.CharField(read_only=True)
12+
workspace = serializers.WritableField(required=False)
13+
url = serializers.HyperlinkedIdentityField(view_name='job_detail')
14+
15+
def __init__(self, *args, **kwargs):
16+
super(JobSerializer, self).__init__(*args, **kwargs)
17+
self.fields['name'].choices = ((key, key) for key in settings.JOBS)
18+
19+
def validate_workspace(self, attrs, source):
20+
workspace = attrs.get('workspace')
21+
if workspace and isinstance(workspace, basestring):
22+
try:
23+
attrs['workspace'] = json.loads(workspace)
24+
except ValueError:
25+
raise serializers.ValidationError("Invalid JSON")
26+
return attrs
27+
28+
def restore_object(self, attrs, instance=None):
29+
return Job(**attrs)

django_dbq/tasks.py

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from django.conf import settings
2+
3+
4+
TASK_LIST_KEY = 'tasks'
5+
FAILURE_HOOK_KEY = 'failure_hook'
6+
CREATION_HOOK_KEY = 'creation_hook'
7+
8+
9+
def get_next_task_name(job_name, current_task=None):
10+
"""Given a job name and (optionally) a task name, return the
11+
next task in the list. If the current_task is None, return the
12+
first task. If current_task is the last task in the list, return None"""
13+
14+
task_list = settings.JOBS[job_name][TASK_LIST_KEY]
15+
16+
if current_task is None:
17+
return task_list[0]
18+
19+
next_task_index = task_list.index(current_task) + 1
20+
21+
try:
22+
return task_list[next_task_index]
23+
except IndexError:
24+
return None
25+
26+
27+
def get_failure_hook_name(job_name):
28+
"""Return the name of the failure hook for the given job (as a string) or None"""
29+
return settings.JOBS[job_name].get(FAILURE_HOOK_KEY)
30+
31+
32+
def get_creation_hook_name(job_name):
33+
"""Return the name of the creation hook for the given job (as a string) or None"""
34+
return settings.JOBS[job_name].get(CREATION_HOOK_KEY)

0 commit comments

Comments
 (0)