forked from enterstudio/ckanext-datagovau
-
Notifications
You must be signed in to change notification settings - Fork 0
/
threaded-datapusher.patch
113 lines (103 loc) · 4.46 KB
/
threaded-datapusher.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
diff --git a/ckanext/datapusher/logic/action.py b/ckanext/datapusher/logic/action.py
index 1af23c3..e9ab619 100644
--- a/ckanext/datapusher/logic/action.py
+++ b/ckanext/datapusher/logic/action.py
@@ -51,8 +51,11 @@ def datapusher_submit(context, data_dict):
site_url = pylons.config['ckan.site_url']
callback_url = site_url.rstrip('/') + '/api/3/action/datapusher_hook'
- user = p.toolkit.get_action('user_show')(context, {'id': context['user']})
-
+ try:
+ user = p.toolkit.get_action('user_show')(context, {'id': context['user']})
+ except:
+ user = p.toolkit.get_action('get_site_user')({'ignore_auth': True}, {})
+ context['user'] = user
task = {
'entity_id': res_id,
'entity_type': 'resource',
diff --git a/ckanext/datapusher/plugin.py b/ckanext/datapusher/plugin.py
index 25858f6..d0cab03 100644
--- a/ckanext/datapusher/plugin.py
+++ b/ckanext/datapusher/plugin.py
@@ -10,6 +10,8 @@ import ckan.logic as logic
import ckan.model as model
import ckan.plugins.toolkit as toolkit
+import Queue
+import threading
from ckan.common import _
log = logging.getLogger(__name__)
@@ -53,9 +55,9 @@ class ResourceDataController(base.BaseController):
None, {'id': resource_id}
)
except logic.NotFound:
- base.abort(404, _('Resource not found'))
+ base.abort(404, 'Resource not found')
except logic.NotAuthorized:
- base.abort(401, _('Unauthorized to edit this resource'))
+ base.abort(401, 'Unauthorized to edit this resource')
try:
datapusher_status = p.toolkit.get_action('datapusher_status')(
@@ -69,6 +71,30 @@ class ResourceDataController(base.BaseController):
return base.render('package/resource_data.html',
extra_vars={'status': datapusher_status})
+class SubmitThread(threading.Thread):
+ """Threaded Url POST"""
+ def __init__(self, queue):
+ threading.Thread.__init__(self)
+ self.queue = queue
+
+ def run(self):
+ while True:
+ # grabs host from queue
+ (context,entity_id) = self.queue.get()
+
+ log.debug("Queue Processing: Sending resource to datastore: " + entity_id)
+ try:
+ p.toolkit.get_action('datapusher_submit')(context, {
+ 'resource_id': entity_id
+ })
+ except p.toolkit.ValidationError, e:
+ # If datapusher is offline want to catch error instead
+ # of raising otherwise resource save will fail with 500
+ log.critical(e)
+ pass
+ # signals to queue job is done
+ self.queue.task_done()
+
class DatapusherPlugin(p.SingletonPlugin):
p.implements(p.IConfigurable, inherit=True)
@@ -81,12 +107,16 @@ class DatapusherPlugin(p.SingletonPlugin):
legacy_mode = False
resource_show_action = None
-
+ submit_queue = Queue.Queue()
def configure(self, config):
self.config = config
datapusher_formats = config.get('ckan.datapusher.formats', '').lower()
self.datapusher_formats = datapusher_formats.split() or DEFAULT_FORMATS
+ for i in range(2):
+ t = SubmitThread(self.submit_queue)
+ t.setDaemon(True)
+ t.start()
for config_option in ('ckan.site_url', 'ckan.datapusher.url',):
if not config.get(config_option):
@@ -105,16 +135,10 @@ class DatapusherPlugin(p.SingletonPlugin):
'defer_commit': True}
if (entity.format and
entity.format.lower() in self.datapusher_formats and
- entity.url_type != 'datapusher'):
- try:
- p.toolkit.get_action('datapusher_submit')(context, {
- 'resource_id': entity.id
- })
- except p.toolkit.ValidationError, e:
- # If datapusher is offline want to catch error instead
- # of raising otherwise resource save will fail with 500
- log.critical(e)
- pass
+ entity.url_type != 'datapusher' and
+ not hasattr(entity,'datapusher_exempt')):
+ log.debug('Added datapusher to queue for '+entity.id)
+ self.submit_queue.put((context,entity.id))
def before_map(self, m):
m.connect(