-
Notifications
You must be signed in to change notification settings - Fork 0
/
celery_tasks.py
115 lines (96 loc) · 3.54 KB
/
celery_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#####################################################################################
# Defines Celery tasks for cyan-waterbody (reports and eventually image aggregation)
#####################################################################################
from time import sleep
import os
import logging
from celery import Celery
import uuid
from datetime import datetime
import json
from flaskr import report
import requests
redis_hostname = os.environ.get("REDIS_HOSTNAME", "localhost")
redis_port = os.environ.get("REDIS_PORT", 6379)
logging.info("REDIS_HOSTNAME: {}".format(redis_hostname))
logging.info("REDIS_PORT: {}".format(redis_port))
celery_instance = Celery(
"tasks",
broker="redis://{}:{}/0".format(redis_hostname, redis_port),
backend="redis://{}:{}/0".format(redis_hostname, redis_port),
)
# TODO: Update parameters for recent versions of Celery (throws warnings):
celery_instance.conf.update(
broker_url="redis://{}:{}/0".format(redis_hostname, redis_port),
result_backend="redis://{}:{}/0".format(redis_hostname, redis_port),
accept_content=["json"],
task_serializer="json",
result_serializer="json",
task_ignore_result=False,
task_track_started=True,
worker_max_tasks_per_child=50000000,
)
@celery_instance.task(bind=True)
def generate_report(self, request_obj):
token = request_obj.pop('token')
origin = request_obj.pop('origin')
app_name = request_obj.pop('app_name')
response = None
try:
report_response = report.generate_report(**request_obj) # returns report id
except Exception as e:
logging.warning("Exception generating report: {}".format(e))
@celery_instance.task(bind=True)
def test_celery(*args):
logging.warning("Testing celery: {}".format(args))
sleep(5)
logging.warning("Celery successfully called.")
return {"status": "celery task finished."}
class CeleryHandler:
def __init__(self):
self.states = [
"FAILURE",
"REVOKED",
"RETRY",
"PENDING",
"RECEIVED",
"STARTED",
"SUCCESS",
]
self.pending_states = ["RETRY", "PENDING", "RECEIVED", "STARTED"]
self.fail_states = ["FAILURE", "REVOKED"]
# self.cyano_request_timeout = 30 # seconds
def test_celery(self):
logging.warning("CALLING CELERY TASK")
celery_job = test_celery.apply_async(queue="celery")
return {"status": "test celery called"}
def start_task(self, request_obj):
"""
Starts celery task and saves job/task ID to job table.
"""
task_id = request_obj['report_id']
# Runs job on celery worker:
celery_job = generate_report.apply_async(
args=[request_obj], queue="celery", task_id=task_id
)
return celery_job
def check_celery_job_status(self, report_id):
"""
Checks the status of a celery job and returns
its status.
Celery States: FAILURE, PENDING, RECEIVED, RETRY,
REVOKED, STARTED, SUCCESS
"""
task = celery_instance.AsyncResult(report_id)
return task.status
def revoke_task(self, report_id):
"""
Revokes/cancels a celery task.
"""
try:
result = celery_instance.AsyncResult(report_id).revoke()
logging.warning("Task '{}' revoked: {}".format(report_id, result))
return {"status": "Job canceled"}
except Exception as e:
logging.error("revoke_task error: {}".format(e))
return {"status": "Failed to cancel job"}