From 21ceff8c95d90192657a2a589b6748f0c9ae8a18 Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Tue, 13 Sep 2016 18:02:53 +0900 Subject: [PATCH 1/9] Update timeout on heartbeat From v0.9, AcquiredTask#timeout stores the time when it is acquired. This commit changes the semantics to the last heartbeated time. --- lib/perfectqueue/backend/rdb_compat.rb | 4 ++-- lib/perfectqueue/task_monitor.rb | 6 +++-- spec/multiprocess/child_process_spec.rb | 2 +- spec/rdb_compat_backend_spec.rb | 14 +++++++++--- spec/task_monitor_spec.rb | 29 ++++++++++++++++++++++--- 5 files changed, 44 insertions(+), 11 deletions(-) diff --git a/lib/perfectqueue/backend/rdb_compat.rb b/lib/perfectqueue/backend/rdb_compat.rb index 9b46f94..c28d921 100644 --- a/lib/perfectqueue/backend/rdb_compat.rb +++ b/lib/perfectqueue/backend/rdb_compat.rb @@ -292,7 +292,7 @@ def finish(task_token, retention_time, options) nil end - # => nil + # => next_timeout def heartbeat(task_token, alive_time, options) now = (options[:now] || Time.now).to_i next_timeout = now + alive_time @@ -321,7 +321,7 @@ def heartbeat(task_token, alive_time, options) end end } - nil + next_timeout end def release(task_token, alive_time, options) diff --git a/lib/perfectqueue/task_monitor.rb b/lib/perfectqueue/task_monitor.rb index dced0ef..16a6ab6 100644 --- a/lib/perfectqueue/task_monitor.rb +++ b/lib/perfectqueue/task_monitor.rb @@ -59,7 +59,7 @@ def set_task(task, runner) task.runner = runner @mutex.synchronize { @task = task - @last_task_heartbeat = Time.now.to_i + @last_task_heartbeat = @task.timeout.to_i } end @@ -145,7 +145,9 @@ def run private def task_heartbeat - @task.heartbeat! + v = @task.heartbeat!(last_heartbeat: @last_task_heartbeat) + @task.attributes[:timeout] = v + v rescue # finished, preempted, etc. kill_task($!) 
diff --git a/spec/multiprocess/child_process_spec.rb b/spec/multiprocess/child_process_spec.rb index f87499b..6537f2e 100644 --- a/spec/multiprocess/child_process_spec.rb +++ b/spec/multiprocess/child_process_spec.rb @@ -73,7 +73,7 @@ end describe '#process' do - let (:task){ double('task', key: double) } + let (:task){ double('task', key: double, timeout: Time.now.to_i) } before do expect(runner_insntace).to receive(:run) end diff --git a/spec/rdb_compat_backend_spec.rb b/spec/rdb_compat_backend_spec.rb index 214496a..f3f1975 100644 --- a/spec/rdb_compat_backend_spec.rb +++ b/spec/rdb_compat_backend_spec.rb @@ -294,15 +294,18 @@ let (:delete_timeout){ now + retention_time } let (:options){ {now: now} } before{ allow(STDERR).to receive(:puts) } - context 'have a queueuled task' do + context 'have a queued task' do before do db.submit(key, 'test', nil, {}) end it 'returns nil if next_run_time is not updated' do - expect(db.heartbeat(task_token, 0, {now: now})).to be_nil + expect(db.heartbeat(task_token, 0, {now: now})).to be_a(Integer) end it 'returns nil even if next_run_time is updated' do - expect(db.heartbeat(task_token, 1, {})).to be_nil + expect(db.heartbeat(task_token, 1, {})).to be_a(Integer) + end + it 'raises PreemptedError if last_timeout is not matched' do + expect{db.heartbeat(task_token, 1, {last_timeout: now-100})}.to raise_error(PreemptedError) end end context 'no tasks' do @@ -319,6 +322,11 @@ expect{db.heartbeat(task_token, 0, {})}.to raise_error(PreemptedError) end end + context 'stolen task' do + it 'raises PreemptedError if the task has unpexpected last_timeout' do + expect{db.heartbeat(task_token, 0, last_timeout: 0)}.to raise_error(PreemptedError) + end + end end context '#connect' do diff --git a/spec/task_monitor_spec.rb b/spec/task_monitor_spec.rb index 975cec0..b9140f1 100644 --- a/spec/task_monitor_spec.rb +++ b/spec/task_monitor_spec.rb @@ -39,17 +39,40 @@ end describe '#task_heartbeat' do - let (:tm){ 
PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object) } + let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object, task_heartbeat_interval: 1) } let (:err){ StandardError.new('heartbeat preempted') } + let (:now){ Time.now.to_i } + let (:task){ double('task', timeout: now) } before do - task = double('task') - allow(task).to receive(:heartbeat!){ raise err } tm.set_task(task, double('runner')) end it 'calls kill_task($!) on heartbeat error' do + allow(task).to receive(:heartbeat!){ raise err } expect(tm).to receive(:kill_task).with(err).exactly(:once) tm.__send__(:task_heartbeat) end + context 'normal' do + let (:config){ {type: 'rdb_compat', url: 'mysql2://root:@localhost/perfectqueue_test', table: 'test_queues', alive_time: 11} } + let (:client){ Client.new(config) } + before do + client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } } + client.init_database + client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'}) + tm.start + end + after do + tm.stop + end + it 'update timeout' do + tasks = client.acquire(now: now-80) + task = tasks[0] + expect(task.timeout.to_i).to eq(now-80+config[:alive_time]) + tm.set_task(task, double('runner')) + allow(Time).to receive(:now).and_return(now-50) + sleep 1 until tm.instance_variable_get(:@last_task_heartbeat) == now-50 + expect(task.timeout.to_i).to eq(now-50+config[:alive_time]) + end + end end end From 8990c220171fff28b8fd78b71001ab73df7ea72d Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Tue, 13 Sep 2016 18:05:37 +0900 Subject: [PATCH 2/9] Check timeout on acquire * Avoid touching created_at because it is misused and may be used in different meanings in the future. From v0.8.48, PQ can use timeout to find living tasks. * Check timeout on heartbeat. v0.9's AcquiredTask#timeout saves the last heartbeat time. PQ can check whether the heartbeating task is still acquired by itself or not. 
--- lib/perfectqueue/backend/rdb.rb | 3 ++- lib/perfectqueue/backend/rdb_compat.rb | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/lib/perfectqueue/backend/rdb.rb b/lib/perfectqueue/backend/rdb.rb index 0a16ed8..b256df6 100644 --- a/lib/perfectqueue/backend/rdb.rb +++ b/lib/perfectqueue/backend/rdb.rb @@ -6,6 +6,7 @@ module PerfectQueue::Backend class RDBBackend MAX_RETRY = ::PerfectQueue::Backend::RDBCompatBackend::MAX_RETRY DELETE_OFFSET = ::PerfectQueue::Backend::RDBCompatBackend::DELETE_OFFSET + EVENT_HORIZON = ::PerfectQueue::Backend::RDBCompatBackend::EVENT_HORIZON class Token < Struct.new(:key) end @@ -48,7 +49,7 @@ def submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second def cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second)) connect { - n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout-DELETE_OFFSET, id].update + n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout;", now+delete_timeout-DELETE_OFFSET, id].update return n > 0 } end diff --git a/lib/perfectqueue/backend/rdb_compat.rb b/lib/perfectqueue/backend/rdb_compat.rb index c28d921..f716f63 100644 --- a/lib/perfectqueue/backend/rdb_compat.rb +++ b/lib/perfectqueue/backend/rdb_compat.rb @@ -284,7 +284,7 @@ def finish(task_token, retention_time, options) key = task_token.key connect { - n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL", delete_timeout, key].update + n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout", delete_timeout, key].update if n <= 0 raise IdempotentAlreadyFinishedError, "task key=#{key} does not exist or already finished." end @@ -305,8 +305,14 @@ def heartbeat(task_token, alive_time, options) sql << ", data=?" 
params << compress_data(data.to_json, options[:compression]) end - sql << " WHERE id=? AND created_at IS NOT NULL" - params << key + if last_timeout = options[:last_timeout] + sql << " WHERE id=? AND timeout=?" + params << key + params << last_timeout + else + sql << " WHERE id=? AND #{EVENT_HORIZON} < timeout" + params << key + end connect { n = @db[*params].update @@ -315,7 +321,9 @@ def heartbeat(task_token, alive_time, options) if row == nil raise PreemptedError, "task key=#{key} does not exist or preempted." elsif row[:created_at] == nil - raise PreemptedError, "task key=#{key} preempted." + raise PreemptedError, "task key=#{key} is finished or canceled" + elsif options[:last_timeout] && row[:timeout] != options[:last_timeout] + raise PreemptedError, "task key=#{key} is preempted by another worker." else # row[:timeout] == next_timeout # ok end From fbdc3bbf5f3b418acdbf4d06e3814f62ad7eee12 Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Tue, 13 Sep 2016 20:53:42 +0900 Subject: [PATCH 3/9] Sleep until monitor thread sleeps Pass thread execution until the monitor thread sends a heartbeat (or waits while the current alive time is available) and sleeps until the next heartbeat. --- lib/perfectqueue/task_monitor.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/perfectqueue/task_monitor.rb b/lib/perfectqueue/task_monitor.rb index 16a6ab6..545d703 100644 --- a/lib/perfectqueue/task_monitor.rb +++ b/lib/perfectqueue/task_monitor.rb @@ -60,6 +60,7 @@ def set_task(task, runner) @mutex.synchronize { @task = task @last_task_heartbeat = @task.timeout.to_i + Thread.pass until @thread.stop? 
if @thread } end From e2f251aa1f11233785b0bcb3083f72b1627fed81 Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Tue, 13 Sep 2016 20:56:22 +0900 Subject: [PATCH 4/9] fix typo and add specs for stolen tasks --- lib/perfectqueue/backend/rdb_compat.rb | 6 ++--- spec/rdb_compat_backend_spec.rb | 8 +++--- spec/task_monitor_spec.rb | 35 +++++++++++++++++++++++--- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/lib/perfectqueue/backend/rdb_compat.rb b/lib/perfectqueue/backend/rdb_compat.rb index f716f63..9b02d25 100644 --- a/lib/perfectqueue/backend/rdb_compat.rb +++ b/lib/perfectqueue/backend/rdb_compat.rb @@ -305,10 +305,10 @@ def heartbeat(task_token, alive_time, options) sql << ", data=?" params << compress_data(data.to_json, options[:compression]) end - if last_timeout = options[:last_timeout] + if last_heartbeat = options[:last_heartbeat] sql << " WHERE id=? AND timeout=?" params << key - params << last_timeout + params << last_heartbeat else sql << " WHERE id=? AND #{EVENT_HORIZON} < timeout" params << key @@ -322,7 +322,7 @@ def heartbeat(task_token, alive_time, options) raise PreemptedError, "task key=#{key} does not exist or preempted." elsif row[:created_at] == nil raise PreemptedError, "task key=#{key} is finished or canceled" - elsif options[:last_timeout] && row[:timeout] != options[:last_timeout] + elsif options[:last_heartbeat] && row[:timeout] != options[:last_heartbeat] raise PreemptedError, "task key=#{key} is preempted by another worker." 
else # row[:timeout] == next_timeout # ok diff --git a/spec/rdb_compat_backend_spec.rb b/spec/rdb_compat_backend_spec.rb index f3f1975..fb48745 100644 --- a/spec/rdb_compat_backend_spec.rb +++ b/spec/rdb_compat_backend_spec.rb @@ -304,8 +304,8 @@ it 'returns nil even if next_run_time is updated' do expect(db.heartbeat(task_token, 1, {})).to be_a(Integer) end - it 'raises PreemptedError if last_timeout is not matched' do - expect{db.heartbeat(task_token, 1, {last_timeout: now-100})}.to raise_error(PreemptedError) + it 'raises PreemptedError if last_heartbeat is not matched' do + expect{db.heartbeat(task_token, 1, {last_heartbeat: now-100})}.to raise_error(PreemptedError) end end context 'no tasks' do @@ -323,8 +323,8 @@ end end context 'stolen task' do - it 'raises PreemptedError if the task has unpexpected last_timeout' do - expect{db.heartbeat(task_token, 0, last_timeout: 0)}.to raise_error(PreemptedError) + it 'raises PreemptedError if the task has unpexpected last_heartbeat' do + expect{db.heartbeat(task_token, 0, last_heartbeat: 0)}.to raise_error(PreemptedError) end end end diff --git a/spec/task_monitor_spec.rb b/spec/task_monitor_spec.rb index b9140f1..2156ff1 100644 --- a/spec/task_monitor_spec.rb +++ b/spec/task_monitor_spec.rb @@ -39,6 +39,8 @@ end describe '#task_heartbeat' do + let (:config){ {type: 'rdb_compat', url: 'mysql2://root:@localhost/perfectqueue_test', table: 'test_queues', alive_time: 11} } + let (:client){ Client.new(config) } let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object, task_heartbeat_interval: 1) } let (:err){ StandardError.new('heartbeat preempted') } let (:now){ Time.now.to_i } @@ -52,8 +54,6 @@ tm.__send__(:task_heartbeat) end context 'normal' do - let (:config){ {type: 'rdb_compat', url: 'mysql2://root:@localhost/perfectqueue_test', table: 'test_queues', alive_time: 11} } - let (:client){ Client.new(config) } before do client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } } 
client.init_database @@ -69,10 +69,39 @@ expect(task.timeout.to_i).to eq(now-80+config[:alive_time]) tm.set_task(task, double('runner')) allow(Time).to receive(:now).and_return(now-50) - sleep 1 until tm.instance_variable_get(:@last_task_heartbeat) == now-50 + Timeout.timeout(5) do + sleep 0.5 until tm.instance_variable_get(:@last_task_heartbeat) == now-50 + end expect(task.timeout.to_i).to eq(now-50+config[:alive_time]) end end + context 'stolen' do + before do + client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } } + client.init_database + client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'}) + tm.start + end + after do + tm.stop + end + it 'raise error' do + tasks = client.acquire(now: now-80) + task1 = tasks[0] + expect(task1.timeout.to_i).to eq(now-80+config[:alive_time]) + + tasks = client.acquire(now: now-60) + task2 = tasks[0] + expect(task2.timeout.to_i).to eq(now-60+config[:alive_time]) + + tm.set_task(task1, double('runner')) + allow(Time).to receive(:now).and_return(now-50) + + flag = false + expect(task1.runner).to receive(:kill){flag = true} + Timeout.timeout(5){ sleep 0.5 until flag } + end + end end end From fb0864f2660ad898121926467f27907ff94d1a7d Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Wed, 14 Sep 2016 16:01:46 +0900 Subject: [PATCH 5/9] Wait while the task is alive To confirm task is alive, wait task_heartbeat in set_task. --- lib/perfectqueue/task_metadata.rb | 4 +++ lib/perfectqueue/task_monitor.rb | 20 ++++++------ spec/multiprocess/child_process_spec.rb | 2 +- spec/task_monitor_spec.rb | 41 +++++++++++++++++-------- 4 files changed, 42 insertions(+), 25 deletions(-) diff --git a/lib/perfectqueue/task_metadata.rb b/lib/perfectqueue/task_metadata.rb index 2caad60..105f2de 100644 --- a/lib/perfectqueue/task_metadata.rb +++ b/lib/perfectqueue/task_metadata.rb @@ -57,6 +57,10 @@ def timeout end end + def last_heartbeat + @attributes[:timeout] || 0 + end + def finished? 
status == TaskStatus::FINISHED end diff --git a/lib/perfectqueue/task_monitor.rb b/lib/perfectqueue/task_monitor.rb index 545d703..29a21ba 100644 --- a/lib/perfectqueue/task_monitor.rb +++ b/lib/perfectqueue/task_monitor.rb @@ -28,7 +28,6 @@ def initialize(config, child_heartbeat=nil, force_stop=nil) @child_heartbeat_interval = (@config[:child_heartbeat_interval] || 2).to_i @task_heartbeat_interval = (@config[:task_heartbeat_interval] || 2).to_i @last_child_heartbeat = Time.now.to_i - @last_task_heartbeat = Time.now.to_i @task = nil @@ -59,9 +58,11 @@ def set_task(task, runner) task.runner = runner @mutex.synchronize { @task = task - @last_task_heartbeat = @task.timeout.to_i - Thread.pass until @thread.stop? if @thread } + now = Time.now.to_i + while @task && @task.last_heartbeat + @task_heartbeat_interval < now + sleep 1 + end end def stop_task(immediate) @@ -102,7 +103,6 @@ def external_task_heartbeat(task, &block) @mutex.synchronize { if task == @task ret = block.call if block - @last_task_heartbeat = Time.now.to_i end ret } @@ -116,10 +116,9 @@ def run next_child_heartbeat = @last_child_heartbeat + @child_heartbeat_interval if @task - next_task_heartbeat = @last_task_heartbeat + @task_heartbeat_interval + next_task_heartbeat = @task.last_heartbeat + @task_heartbeat_interval next_time = [next_child_heartbeat, next_task_heartbeat].min else - next_task_heartbeat = nil next_time = next_child_heartbeat end @@ -127,9 +126,8 @@ def run @cond.wait(next_wait) if next_wait > 0 now = Time.now.to_i - if @task && next_task_heartbeat && next_task_heartbeat <= now + if @task && @task.last_heartbeat + @task_heartbeat_interval <= now task_heartbeat - @last_task_heartbeat = now end if next_child_heartbeat <= now @@ -146,12 +144,12 @@ def run private def task_heartbeat - v = @task.heartbeat!(last_heartbeat: @last_task_heartbeat) - @task.attributes[:timeout] = v - v + task = @task + task.attributes[:timeout] = task.heartbeat!(last_heartbeat: task.last_heartbeat) rescue # finished, 
preempted, etc. kill_task($!) + @task = nil end end diff --git a/spec/multiprocess/child_process_spec.rb b/spec/multiprocess/child_process_spec.rb index 6537f2e..97a65f6 100644 --- a/spec/multiprocess/child_process_spec.rb +++ b/spec/multiprocess/child_process_spec.rb @@ -73,7 +73,7 @@ end describe '#process' do - let (:task){ double('task', key: double, timeout: Time.now.to_i) } + let (:task){ double('task', key: double, last_heartbeat: Time.now.to_i) } before do expect(runner_insntace).to receive(:run) end diff --git a/spec/task_monitor_spec.rb b/spec/task_monitor_spec.rb index 2156ff1..c0f3c89 100644 --- a/spec/task_monitor_spec.rb +++ b/spec/task_monitor_spec.rb @@ -23,7 +23,6 @@ ret = double('ret') tm.instance_variable_set(:@task, task) expect(tm.external_task_heartbeat(task){ret}).to eq(ret) - expect(tm.instance_variable_get(:@last_task_heartbeat)).to eq(epoch) end end @@ -44,7 +43,8 @@ let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object, task_heartbeat_interval: 1) } let (:err){ StandardError.new('heartbeat preempted') } let (:now){ Time.now.to_i } - let (:task){ double('task', timeout: now) } + let (:task){ double('task', attributes: {}, last_heartbeat: now) } + let (:runner){ double('runner') } before do tm.set_task(task, double('runner')) end @@ -66,13 +66,10 @@ it 'update timeout' do tasks = client.acquire(now: now-80) task = tasks[0] - expect(task.timeout.to_i).to eq(now-80+config[:alive_time]) - tm.set_task(task, double('runner')) + expect(task.last_heartbeat).to eq(now-80+config[:alive_time]) allow(Time).to receive(:now).and_return(now-50) - Timeout.timeout(5) do - sleep 0.5 until tm.instance_variable_get(:@last_task_heartbeat) == now-50 - end - expect(task.timeout.to_i).to eq(now-50+config[:alive_time]) + tm.set_task(task, runner) + expect(task.last_heartbeat).to eq(now-50+config[:alive_time]) end end context 'stolen' do @@ -94,12 +91,30 @@ task2 = tasks[0] expect(task2.timeout.to_i).to eq(now-60+config[:alive_time]) - 
tm.set_task(task1, double('runner')) allow(Time).to receive(:now).and_return(now-50) + expect(runner).to receive(:kill) + tm.set_task(task1, runner) + end + end + context 'timeout but can acquire' do + before do + client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } } + client.init_database + client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'}) + tm.start + end + after do + tm.stop + end + it 'raise error' do + tasks = client.acquire(now: now-80) + task1 = tasks[0] + expect(task1.timeout.to_i).to eq(now-80+config[:alive_time]) + + allow(Time).to receive(:now).and_return(now-50) + tm.set_task(task1, runner) - flag = false - expect(task1.runner).to receive(:kill){flag = true} - Timeout.timeout(5){ sleep 0.5 until flag } + expect(task1.runner).to eq(runner) end end end @@ -107,7 +122,7 @@ describe PerfectQueue::TaskMonitorHook do let (:task) do - obj = AcquiredTask.new(double(:client).as_null_object, 'key', {}, double) + obj = AcquiredTask.new(double(:client).as_null_object, 'key', {timeout: Time.now.to_i}, double) tm = TaskMonitor.new(logger: double('logger').as_null_object) tm.set_task(obj, double('runner')) obj From 8fb4174fb764b1d7a2cacfae97ad4cf56f9c63f3 Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Wed, 14 Sep 2016 17:43:29 +0900 Subject: [PATCH 6/9] Change TaskMetadata#timeout to Unix epoch integer I thought timeout's meaning is changed in v0.9, now I can change its type. 
--- lib/perfectqueue/task.rb | 2 +- lib/perfectqueue/task_metadata.rb | 10 +++------ lib/perfectqueue/task_monitor.rb | 8 +++---- spec/multiprocess/child_process_spec.rb | 2 +- spec/rdb_compat_backend_spec.rb | 10 ++++++--- spec/task_metadata_spec.rb | 13 ++++++++++-- spec/task_monitor_spec.rb | 28 +++++++++++++------------ 7 files changed, 42 insertions(+), 31 deletions(-) diff --git a/lib/perfectqueue/task.rb b/lib/perfectqueue/task.rb index 1723fae..c94db80 100644 --- a/lib/perfectqueue/task.rb +++ b/lib/perfectqueue/task.rb @@ -72,7 +72,7 @@ def initialize(client, key, attributes, task_token) end def heartbeat!(options={}) - @client.heartbeat(@task_token, options) + self.timeout = @client.heartbeat(@task_token, options.merge(last_heartbeat: timeout)) end def finish!(options={}) diff --git a/lib/perfectqueue/task_metadata.rb b/lib/perfectqueue/task_metadata.rb index 105f2de..f662478 100644 --- a/lib/perfectqueue/task_metadata.rb +++ b/lib/perfectqueue/task_metadata.rb @@ -50,15 +50,11 @@ def created_at end def timeout - if t = @attributes[:timeout] - return Time.at(t) - else - return nil - end + @attributes[:timeout] end - def last_heartbeat - @attributes[:timeout] || 0 + def timeout=(v) + @attributes[:timeout] = v end def finished? 
diff --git a/lib/perfectqueue/task_monitor.rb b/lib/perfectqueue/task_monitor.rb index 29a21ba..23e81e9 100644 --- a/lib/perfectqueue/task_monitor.rb +++ b/lib/perfectqueue/task_monitor.rb @@ -60,7 +60,7 @@ def set_task(task, runner) @task = task } now = Time.now.to_i - while @task && @task.last_heartbeat + @task_heartbeat_interval < now + while @task && @task.timeout + @task_heartbeat_interval < now sleep 1 end end @@ -116,7 +116,7 @@ def run next_child_heartbeat = @last_child_heartbeat + @child_heartbeat_interval if @task - next_task_heartbeat = @task.last_heartbeat + @task_heartbeat_interval + next_task_heartbeat = @task.timeout + @task_heartbeat_interval next_time = [next_child_heartbeat, next_task_heartbeat].min else next_time = next_child_heartbeat @@ -126,7 +126,7 @@ def run @cond.wait(next_wait) if next_wait > 0 now = Time.now.to_i - if @task && @task.last_heartbeat + @task_heartbeat_interval <= now + if @task && @task.timeout + @task_heartbeat_interval <= now task_heartbeat end @@ -145,7 +145,7 @@ def run private def task_heartbeat task = @task - task.attributes[:timeout] = task.heartbeat!(last_heartbeat: task.last_heartbeat) + task.heartbeat! rescue # finished, preempted, etc. kill_task($!) 
diff --git a/spec/multiprocess/child_process_spec.rb b/spec/multiprocess/child_process_spec.rb index 97a65f6..6537f2e 100644 --- a/spec/multiprocess/child_process_spec.rb +++ b/spec/multiprocess/child_process_spec.rb @@ -73,7 +73,7 @@ end describe '#process' do - let (:task){ double('task', key: double, last_heartbeat: Time.now.to_i) } + let (:task){ double('task', key: double, timeout: Time.now.to_i) } before do expect(runner_insntace).to receive(:run) end diff --git a/spec/rdb_compat_backend_spec.rb b/spec/rdb_compat_backend_spec.rb index fb48745..27f5ebb 100644 --- a/spec/rdb_compat_backend_spec.rb +++ b/spec/rdb_compat_backend_spec.rb @@ -242,9 +242,13 @@ expect(ary[3]).to be_an_instance_of(AcquiredTask) expect(ary[4]).to be_an_instance_of(AcquiredTask) - now1 = Time.at(now + alive_time) - expect(now1).to receive(:to_time).exactly(5).times.and_call_original - db.list({}){|task| expect(task.timeout).to eq now1.to_time } + now1 = now + alive_time + i = 0 + db.list({}) do |task| + expect(task.timeout).to eq now1 + i += 1 + end + expect(i).to eq(5) end end end diff --git a/spec/task_metadata_spec.rb b/spec/task_metadata_spec.rb index 1619a3d..a4342f7 100644 --- a/spec/task_metadata_spec.rb +++ b/spec/task_metadata_spec.rb @@ -60,10 +60,19 @@ end describe 'timeout' do - it 'returns a time of given timeout' do + it 'returns given timeout' do epoch = 72 tm = TaskMetadata.new(double, double, timeout: epoch) - expect(tm.timeout).to eq(Time.at(epoch)) + expect(tm.timeout).to eq(epoch) + end + end + + describe 'timeout=' do + it 'sets timeout' do + epoch = 72 + tm = TaskMetadata.new(double, double, timeout: 1) + tm.timeout = epoch + expect(tm.timeout).to eq(epoch) end end end diff --git a/spec/task_monitor_spec.rb b/spec/task_monitor_spec.rb index c0f3c89..707c6b0 100644 --- a/spec/task_monitor_spec.rb +++ b/spec/task_monitor_spec.rb @@ -43,15 +43,17 @@ let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object, task_heartbeat_interval: 1) } let 
(:err){ StandardError.new('heartbeat preempted') } let (:now){ Time.now.to_i } - let (:task){ double('task', attributes: {}, last_heartbeat: now) } + let (:task){ double('task', timeout: now) } let (:runner){ double('runner') } - before do - tm.set_task(task, double('runner')) - end - it 'calls kill_task($!) on heartbeat error' do - allow(task).to receive(:heartbeat!){ raise err } - expect(tm).to receive(:kill_task).with(err).exactly(:once) - tm.__send__(:task_heartbeat) + context 'timeout' do + before do + tm.set_task(task, double('runner')) + end + it 'calls kill_task($!) on heartbeat error' do + allow(task).to receive(:heartbeat!){ raise err } + expect(tm).to receive(:kill_task).with(err).exactly(:once) + tm.__send__(:task_heartbeat) + end end context 'normal' do before do @@ -66,10 +68,10 @@ it 'update timeout' do tasks = client.acquire(now: now-80) task = tasks[0] - expect(task.last_heartbeat).to eq(now-80+config[:alive_time]) + expect(task.timeout).to eq(now-80+config[:alive_time]) allow(Time).to receive(:now).and_return(now-50) tm.set_task(task, runner) - expect(task.last_heartbeat).to eq(now-50+config[:alive_time]) + expect(task.timeout).to eq(now-50+config[:alive_time]) end end context 'stolen' do @@ -85,11 +87,11 @@ it 'raise error' do tasks = client.acquire(now: now-80) task1 = tasks[0] - expect(task1.timeout.to_i).to eq(now-80+config[:alive_time]) + expect(task1.timeout).to eq(now-80+config[:alive_time]) tasks = client.acquire(now: now-60) task2 = tasks[0] - expect(task2.timeout.to_i).to eq(now-60+config[:alive_time]) + expect(task2.timeout).to eq(now-60+config[:alive_time]) allow(Time).to receive(:now).and_return(now-50) expect(runner).to receive(:kill) @@ -109,7 +111,7 @@ it 'raise error' do tasks = client.acquire(now: now-80) task1 = tasks[0] - expect(task1.timeout.to_i).to eq(now-80+config[:alive_time]) + expect(task1.timeout).to eq(now-80+config[:alive_time]) allow(Time).to receive(:now).and_return(now-50) tm.set_task(task1, runner) From 
b6fd7ff491d0a63a1a027f5dc073cff591afae38 Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Thu, 15 Sep 2016 05:37:01 +0900 Subject: [PATCH 7/9] Set last_heartbeat on set_task On prefetch, Processor keeps when it acquires the task. --- .../multiprocess/thread_processor.rb | 7 +++--- lib/perfectqueue/task_monitor.rb | 22 +++++++++++-------- spec/task_monitor_spec.rb | 22 ++++++++++++------- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/lib/perfectqueue/multiprocess/thread_processor.rb b/lib/perfectqueue/multiprocess/thread_processor.rb index eb2cde7..718f55b 100644 --- a/lib/perfectqueue/multiprocess/thread_processor.rb +++ b/lib/perfectqueue/multiprocess/thread_processor.rb @@ -96,8 +96,9 @@ def run_loop @finish_flag.wait(@poll_interval) else begin + last_heartbeat = Time.now.to_i while task = tasks.shift - process(task) + process(task, last_heartbeat) end ensure # TODO do not call release! because rdb_compat backend @@ -120,11 +121,11 @@ def run_loop @tm.stop end - def process(task) + def process(task, last_heartbeat=Time.now.to_i) @log.info "acquired task task=#{task.key} id=#{@processor_id}: #{task.inspect}" begin r = @runner.new(task) - @tm.set_task(task, r) + @tm.set_task(task, r, last_heartbeat) begin r.run ensure diff --git a/lib/perfectqueue/task_monitor.rb b/lib/perfectqueue/task_monitor.rb index 23e81e9..0cbcc6f 100644 --- a/lib/perfectqueue/task_monitor.rb +++ b/lib/perfectqueue/task_monitor.rb @@ -28,6 +28,7 @@ def initialize(config, child_heartbeat=nil, force_stop=nil) @child_heartbeat_interval = (@config[:child_heartbeat_interval] || 2).to_i @task_heartbeat_interval = (@config[:task_heartbeat_interval] || 2).to_i @last_child_heartbeat = Time.now.to_i + @last_task_heartbeat = Time.now.to_i @task = nil @@ -51,18 +52,22 @@ def join @thread.join end - def set_task(task, runner) + def set_task(task, runner, last_heartbeat=Time.now.to_i) task.extend(TaskMonitorHook) task.log = @log task.task_monitor = self task.runner = runner 
@mutex.synchronize { @task = task + @last_task_heartbeat = last_heartbeat + @cond.broadcast } now = Time.now.to_i - while @task && @task.timeout + @task_heartbeat_interval < now - sleep 1 - end + Timeout.timeout(60) do + while @task && @last_task_heartbeat + @task_heartbeat_interval < now + sleep 1 + end + end rescue nil end def stop_task(immediate) @@ -116,7 +121,7 @@ def run next_child_heartbeat = @last_child_heartbeat + @child_heartbeat_interval if @task - next_task_heartbeat = @task.timeout + @task_heartbeat_interval + next_task_heartbeat = @last_task_heartbeat + @task_heartbeat_interval next_time = [next_child_heartbeat, next_task_heartbeat].min else next_time = next_child_heartbeat @@ -126,8 +131,9 @@ def run @cond.wait(next_wait) if next_wait > 0 now = Time.now.to_i - if @task && @task.timeout + @task_heartbeat_interval <= now + if @task && @last_task_heartbeat + @task_heartbeat_interval <= now task_heartbeat + @last_task_heartbeat = now end if next_child_heartbeat <= now @@ -144,12 +150,10 @@ def run private def task_heartbeat - task = @task - task.heartbeat! + @task.heartbeat! rescue # finished, preempted, etc. kill_task($!) 
- @task = nil end end diff --git a/spec/task_monitor_spec.rb b/spec/task_monitor_spec.rb index 707c6b0..7423afa 100644 --- a/spec/task_monitor_spec.rb +++ b/spec/task_monitor_spec.rb @@ -64,13 +64,14 @@ end after do tm.stop + tm.join end it 'update timeout' do tasks = client.acquire(now: now-80) task = tasks[0] expect(task.timeout).to eq(now-80+config[:alive_time]) allow(Time).to receive(:now).and_return(now-50) - tm.set_task(task, runner) + tm.set_task(task, runner, now-80) expect(task.timeout).to eq(now-50+config[:alive_time]) end end @@ -83,19 +84,23 @@ end after do tm.stop + tm.join end it 'raise error' do - tasks = client.acquire(now: now-80) + now1 = now - 80 + tasks = client.acquire(now: now1) task1 = tasks[0] - expect(task1.timeout).to eq(now-80+config[:alive_time]) + expect(task1.timeout).to eq(now1+config[:alive_time]) - tasks = client.acquire(now: now-60) + now2 = now - 50 + tasks = client.acquire(now: now2) task2 = tasks[0] - expect(task2.timeout).to eq(now-60+config[:alive_time]) + expect(task2.timeout).to eq(now2+config[:alive_time]) - allow(Time).to receive(:now).and_return(now-50) + now3 = now - 20 + allow(Time).to receive(:now).and_return(now3) expect(runner).to receive(:kill) - tm.set_task(task1, runner) + tm.set_task(task1, runner, now1) end end context 'timeout but can acquire' do @@ -107,6 +112,7 @@ end after do tm.stop + tm.join end it 'raise error' do tasks = client.acquire(now: now-80) @@ -114,7 +120,7 @@ expect(task1.timeout).to eq(now-80+config[:alive_time]) allow(Time).to receive(:now).and_return(now-50) - tm.set_task(task1, runner) + tm.set_task(task1, runner, now-50) expect(task1.runner).to eq(runner) end From 884dfc4770d79a299c18ff5ba5fe4bc06720c264 Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Thu, 15 Sep 2016 17:58:41 +0900 Subject: [PATCH 8/9] simplify around timeout --- lib/perfectqueue/task.rb | 2 +- lib/perfectqueue/task_metadata.rb | 4 ---- spec/task_metadata_spec.rb | 9 --------- 3 files changed, 1 insertion(+), 14 
deletions(-) diff --git a/lib/perfectqueue/task.rb b/lib/perfectqueue/task.rb index c94db80..84e28f7 100644 --- a/lib/perfectqueue/task.rb +++ b/lib/perfectqueue/task.rb @@ -72,7 +72,7 @@ def initialize(client, key, attributes, task_token) end def heartbeat!(options={}) - self.timeout = @client.heartbeat(@task_token, options.merge(last_heartbeat: timeout)) + @attributes[:timeout] = @client.heartbeat(@task_token, options.merge(last_heartbeat: timeout)) end def finish!(options={}) diff --git a/lib/perfectqueue/task_metadata.rb b/lib/perfectqueue/task_metadata.rb index f662478..fad3032 100644 --- a/lib/perfectqueue/task_metadata.rb +++ b/lib/perfectqueue/task_metadata.rb @@ -53,10 +53,6 @@ def timeout @attributes[:timeout] end - def timeout=(v) - @attributes[:timeout] = v - end - def finished? status == TaskStatus::FINISHED end diff --git a/spec/task_metadata_spec.rb b/spec/task_metadata_spec.rb index a4342f7..3d5a6db 100644 --- a/spec/task_metadata_spec.rb +++ b/spec/task_metadata_spec.rb @@ -66,13 +66,4 @@ expect(tm.timeout).to eq(epoch) end end - - describe 'timeout=' do - it 'sets timeout' do - epoch = 72 - tm = TaskMetadata.new(double, double, timeout: 1) - tm.timeout = epoch - expect(tm.timeout).to eq(epoch) - end - end end From 93bbb208754d1d4dca394826087662725fe60b65 Mon Sep 17 00:00:00 2001 From: "NARUSE, Yui" Date: Fri, 16 Sep 2016 14:18:40 +0900 Subject: [PATCH 9/9] remove Timeout.timeout --- lib/perfectqueue/task_monitor.rb | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/perfectqueue/task_monitor.rb b/lib/perfectqueue/task_monitor.rb index 0cbcc6f..d34f682 100644 --- a/lib/perfectqueue/task_monitor.rb +++ b/lib/perfectqueue/task_monitor.rb @@ -63,11 +63,9 @@ def set_task(task, runner, last_heartbeat=Time.now.to_i) @cond.broadcast } now = Time.now.to_i - Timeout.timeout(60) do - while @task && @last_task_heartbeat + @task_heartbeat_interval < now - sleep 1 - end - end rescue nil + while @task && @last_task_heartbeat + 
@task_heartbeat_interval < now + sleep 1 + end end def stop_task(immediate)