@@ -50,23 +50,11 @@ def perform
50
50
51
51
begin
52
52
server = T2Server ::Server . new ( @server , conn_params )
53
- wkf = File . read ( @workflow )
53
+ workflow = File . read ( @workflow )
54
+ run = create_run ( server , workflow , credentials )
54
55
55
- # Try and create the run bearing in mind that the server might be at
56
- # the limit of runs that it can hold at once.
57
- begin
58
- run = server . create_run ( wkf , credentials )
59
- rescue T2Server ::ServerAtCapacityError
60
- status_message ( "full" )
61
-
62
- if cancelled?
63
- cancel
64
- return
65
- end
66
-
67
- sleep ( TavernaPlayer . server_retry_interval )
68
- retry
69
- end
56
+ # If run is nil here then we could have failed or have been cancelled.
57
+ return if run . nil?
70
58
71
59
status_message ( "initialized" )
72
60
@@ -121,65 +109,7 @@ def perform
121
109
return
122
110
end
123
111
124
- run . notifications ( :requests ) . each do |note |
125
- if @run . has_parent?
126
- next if note . has_reply? || note . is_notification?
127
- int = Interaction . find_by_run_id_and_serial ( @run . parent_id , note . serial )
128
- new_int = Interaction . find_or_initialize_by_run_id_and_serial ( @run . id , note . serial )
129
- if new_int . new_record?
130
- note . reply ( int . feed_reply , int . data )
131
- new_int . displayed = true
132
- new_int . replied = true
133
- new_int . feed_reply = int . feed_reply
134
- new_int . data = int . data
135
- new_int . save
136
- end
137
- else
138
- int = Interaction . find_or_initialize_by_run_id_and_serial ( @run . id , note . serial )
139
-
140
- # Need to catch this here in case some other process has replied.
141
- if note . has_reply? && !int . replied?
142
- int . replied = true
143
- int . save
144
- end
145
-
146
- unless int . replied?
147
- if int . page . blank?
148
- page = server . read ( note . uri , "text/html" , credentials )
149
-
150
- INTERACTION_REGEX . match ( page ) do
151
- page_uri = $1
152
-
153
- if page_uri . starts_with? ( server . uri . to_s )
154
- page = server . read ( URI . parse ( page_uri ) , "text/html" , credentials )
155
- int . page = page . gsub ( "#{ run . interactions_uri . to_s } /pmrpc.js" ,
156
- "/assets/taverna_player/application.js" )
157
- else
158
- int . page_uri = page_uri
159
- end
160
- end
161
- end
162
-
163
- if note . is_notification? && !int . new_record?
164
- int . replied = true
165
- end
166
-
167
- if int . data . blank?
168
- int . data = note . input_data . force_encoding ( "UTF-8" )
169
- end
170
-
171
- if !int . feed_reply . blank? && !int . data . blank?
172
- note . reply ( int . feed_reply , int . data )
173
-
174
- int . replied = true
175
- end
176
-
177
- int . save
178
- end
179
-
180
- waiting = true unless int . replied?
181
- end
182
- end
112
+ waiting = interactions ( run , credentials )
183
113
184
114
status_message ( waiting ? "interact" : "running" )
185
115
end
@@ -220,6 +150,108 @@ def server_credentials
220
150
T2Server ::HttpBasic . new ( user , pass )
221
151
end
222
152
153
+ # Try and create the run bearing in mind that the server might be at
154
+ # the limit of runs that it can hold at once.
155
+ def create_run ( server , workflow , credentials )
156
+ retries ||= TavernaPlayer . server_connection_error_retries
157
+ server . create_run ( workflow , credentials )
158
+ rescue T2Server ::ServerAtCapacityError
159
+ status_message ( "full" )
160
+
161
+ if cancelled?
162
+ cancel
163
+ return
164
+ end
165
+
166
+ sleep ( TavernaPlayer . server_retry_interval )
167
+ retry
168
+ rescue T2Server ::ConnectionError => ce
169
+ status_message ( "network-error" )
170
+
171
+ if cancelled?
172
+ cancel
173
+ return
174
+ end
175
+
176
+ sleep ( TavernaPlayer . server_retry_interval )
177
+ unless retries . zero?
178
+ retries -= 1
179
+ retry
180
+ end
181
+
182
+ # If we're out of retries, fail the run.
183
+ failed ( ce )
184
+ end
185
+
186
+ def interactions ( run , credentials )
187
+ wait = false
188
+
189
+ run . notifications ( :requests ) . each do |note |
190
+ if @run . has_parent?
191
+ next if note . has_reply? || note . is_notification?
192
+
193
+ int = Interaction . find_by_run_id_and_serial ( @run . parent_id , note . serial )
194
+ new_int = Interaction . find_or_initialize_by_run_id_and_serial ( @run . id , note . serial )
195
+
196
+ if new_int . new_record?
197
+ note . reply ( int . feed_reply , int . data )
198
+ new_int . displayed = true
199
+ new_int . replied = true
200
+ new_int . feed_reply = int . feed_reply
201
+ new_int . data = int . data
202
+ new_int . save
203
+ end
204
+ else
205
+ int = Interaction . find_or_initialize_by_run_id_and_serial ( @run . id , note . serial )
206
+
207
+ # Need to catch this here in case some other process has replied.
208
+ if note . has_reply? && !int . replied?
209
+ int . replied = true
210
+ int . save
211
+ end
212
+
213
+ unless int . replied?
214
+ if int . page . blank?
215
+ server = run . server
216
+ page = server . read ( note . uri , "text/html" , credentials )
217
+
218
+ INTERACTION_REGEX . match ( page ) do
219
+ page_uri = $1
220
+
221
+ if page_uri . starts_with? ( server . uri . to_s )
222
+ page = server . read ( URI . parse ( page_uri ) , "text/html" , credentials )
223
+ int . page = page . gsub ( "#{ run . interactions_uri . to_s } /pmrpc.js" ,
224
+ "/assets/taverna_player/application.js" )
225
+ else
226
+ int . page_uri = page_uri
227
+ end
228
+ end
229
+ end
230
+
231
+ # If this is a pure notification that we've already seen then
232
+ # set it as replied as we don't want it blocking a proper
233
+ # interaction.
234
+ int . replied = true if note . is_notification? && !int . new_record?
235
+
236
+ if int . data . blank?
237
+ int . data = note . input_data . force_encoding ( "UTF-8" )
238
+ end
239
+
240
+ if !int . feed_reply . blank? && !int . data . blank?
241
+ note . reply ( int . feed_reply , int . data )
242
+ int . replied = true
243
+ end
244
+
245
+ int . save
246
+ end
247
+
248
+ wait = true unless int . replied?
249
+ end
250
+ end
251
+
252
+ wait
253
+ end
254
+
223
255
# Run the specified callback and return false on error so that we know to
224
256
# return out of the worker code completely.
225
257
def run_callback ( cb , message )
0 commit comments