Skip to content

Commit d64eb97

Browse files
committed
Factor out the interactions handler in the worker code.
1 parent 06c3cdb commit d64eb97

File tree

2 files changed

+71
-62
lines changed

2 files changed

+71
-62
lines changed

lib/taverna_player/worker.rb

+70-59
Original file line numberDiff line numberDiff line change
@@ -109,65 +109,7 @@ def perform
109109
return
110110
end
111111

112-
run.notifications(:requests).each do |note|
113-
if @run.has_parent?
114-
next if note.has_reply? || note.is_notification?
115-
int = Interaction.find_by_run_id_and_serial(@run.parent_id, note.serial)
116-
new_int = Interaction.find_or_initialize_by_run_id_and_serial(@run.id, note.serial)
117-
if new_int.new_record?
118-
note.reply(int.feed_reply, int.data)
119-
new_int.displayed = true
120-
new_int.replied = true
121-
new_int.feed_reply = int.feed_reply
122-
new_int.data = int.data
123-
new_int.save
124-
end
125-
else
126-
int = Interaction.find_or_initialize_by_run_id_and_serial(@run.id, note.serial)
127-
128-
# Need to catch this here in case some other process has replied.
129-
if note.has_reply? && !int.replied?
130-
int.replied = true
131-
int.save
132-
end
133-
134-
unless int.replied?
135-
if int.page.blank?
136-
page = server.read(note.uri, "text/html", credentials)
137-
138-
INTERACTION_REGEX.match(page) do
139-
page_uri = $1
140-
141-
if page_uri.starts_with?(server.uri.to_s)
142-
page = server.read(URI.parse(page_uri), "text/html", credentials)
143-
int.page = page.gsub("#{run.interactions_uri.to_s}/pmrpc.js",
144-
"/assets/taverna_player/application.js")
145-
else
146-
int.page_uri = page_uri
147-
end
148-
end
149-
end
150-
151-
if note.is_notification? && !int.new_record?
152-
int.replied = true
153-
end
154-
155-
if int.data.blank?
156-
int.data = note.input_data.force_encoding("UTF-8")
157-
end
158-
159-
if !int.feed_reply.blank? && !int.data.blank?
160-
note.reply(int.feed_reply, int.data)
161-
162-
int.replied = true
163-
end
164-
165-
int.save
166-
end
167-
168-
waiting = true unless int.replied?
169-
end
170-
end
112+
waiting = interactions(run, credentials)
171113

172114
status_message(waiting ? "interact" : "running")
173115
end
@@ -241,6 +183,75 @@ def create_run(server, workflow, credentials)
241183
failed(ce)
242184
end
243185

186+
def interactions(run, credentials)
187+
wait = false
188+
189+
run.notifications(:requests).each do |note|
190+
if @run.has_parent?
191+
next if note.has_reply? || note.is_notification?
192+
193+
int = Interaction.find_by_run_id_and_serial(@run.parent_id, note.serial)
194+
new_int = Interaction.find_or_initialize_by_run_id_and_serial(@run.id, note.serial)
195+
196+
if new_int.new_record?
197+
note.reply(int.feed_reply, int.data)
198+
new_int.displayed = true
199+
new_int.replied = true
200+
new_int.feed_reply = int.feed_reply
201+
new_int.data = int.data
202+
new_int.save
203+
end
204+
else
205+
int = Interaction.find_or_initialize_by_run_id_and_serial(@run.id, note.serial)
206+
207+
# Need to catch this here in case some other process has replied.
208+
if note.has_reply? && !int.replied?
209+
int.replied = true
210+
int.save
211+
end
212+
213+
unless int.replied?
214+
if int.page.blank?
215+
server = run.server
216+
page = server.read(note.uri, "text/html", credentials)
217+
218+
INTERACTION_REGEX.match(page) do
219+
page_uri = $1
220+
221+
if page_uri.starts_with?(server.uri.to_s)
222+
page = server.read(URI.parse(page_uri), "text/html", credentials)
223+
int.page = page.gsub("#{run.interactions_uri.to_s}/pmrpc.js",
224+
"/assets/taverna_player/application.js")
225+
else
226+
int.page_uri = page_uri
227+
end
228+
end
229+
end
230+
231+
# If this is a pure notification that we've already seen then
232+
# set it as replied as we don't want it blocking a proper
233+
# interaction.
234+
int.replied = true if note.is_notification? && !int.new_record?
235+
236+
if int.data.blank?
237+
int.data = note.input_data.force_encoding("UTF-8")
238+
end
239+
240+
if !int.feed_reply.blank? && !int.data.blank?
241+
note.reply(int.feed_reply, int.data)
242+
int.replied = true
243+
end
244+
245+
int.save
246+
end
247+
248+
wait = true unless int.replied?
249+
end
250+
end
251+
252+
wait
253+
end
254+
244255
# Run the specified callback and return false on error so that we know to
245256
# return out of the worker code completely.
246257
def run_callback(cb, message)

test/unit/taverna_player/worker_test.rb

+1-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class WorkerTest < ActiveSupport::TestCase
3939

4040
# Stuff we can't test yet in TavernaPlayer::Worker.
4141
flexmock(TavernaPlayer::Worker).new_instances do |w|
42+
w.should_receive(:interactions).and_return(false)
4243
w.should_receive(:download_outputs).and_return_undefined
4344
w.should_receive(:process_outputs).and_return([])
4445
end
@@ -102,7 +103,6 @@ class WorkerTest < ActiveSupport::TestCase
102103
r.should_receive(:name=).once.and_return(true)
103104
r.should_receive(:start).twice.and_return(false, true)
104105
r.should_receive(:start_time).and_return(Time.now)
105-
r.should_receive(:notifications).and_return([])
106106
r.should_receive(:finish_time).and_return(Time.now)
107107
r.should_receive(:log).once.and_return(0)
108108
r.should_receive(:delete).and_return_undefined
@@ -296,7 +296,6 @@ class WorkerTest < ActiveSupport::TestCase
296296
r.should_receive(:name=).once.and_return(true)
297297
r.should_receive(:start).once.and_return(true)
298298
r.should_receive(:start_time).and_return(Time.now)
299-
r.should_receive(:notifications).and_return([])
300299
r.should_receive(:finish_time).and_return(Time.now)
301300
r.should_receive(:log).once.and_return(0)
302301
r.should_receive(:delete).and_return_undefined
@@ -341,7 +340,6 @@ class WorkerTest < ActiveSupport::TestCase
341340
r.should_receive(:name=).once.and_return(true)
342341
r.should_receive(:start).once.and_return(true)
343342
r.should_receive(:start_time).and_return(Time.now)
344-
r.should_receive(:notifications).and_return([])
345343
r.should_receive(:finish_time).and_return(Time.now)
346344
r.should_receive(:log).once.and_return(0)
347345
r.should_receive(:delete).and_return_undefined

0 commit comments

Comments
 (0)