Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check timeout on acquire #52

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/perfectqueue/backend/rdb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module PerfectQueue::Backend
class RDBBackend
MAX_RETRY = ::PerfectQueue::Backend::RDBCompatBackend::MAX_RETRY
DELETE_OFFSET = ::PerfectQueue::Backend::RDBCompatBackend::DELETE_OFFSET
EVENT_HORIZON = ::PerfectQueue::Backend::RDBCompatBackend::EVENT_HORIZON
class Token < Struct.new(:key)
end

Expand Down Expand Up @@ -48,7 +49,7 @@ def submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second

def cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second))
connect {
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout-DELETE_OFFSET, id].update
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout;", now+delete_timeout-DELETE_OFFSET, id].update
return n > 0
}
end
Expand Down
20 changes: 14 additions & 6 deletions lib/perfectqueue/backend/rdb_compat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,15 @@ def finish(task_token, retention_time, options)
key = task_token.key

connect {
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL", delete_timeout, key].update
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout", delete_timeout, key].update
if n <= 0
raise IdempotentAlreadyFinishedError, "task key=#{key} does not exist or already finished."
end
}
nil
end

# => nil
# => next_timeout
def heartbeat(task_token, alive_time, options)
now = (options[:now] || Time.now).to_i
next_timeout = now + alive_time
Expand All @@ -305,8 +305,14 @@ def heartbeat(task_token, alive_time, options)
sql << ", data=?"
params << compress_data(data.to_json, options[:compression])
end
sql << " WHERE id=? AND created_at IS NOT NULL"
params << key
if last_heartbeat = options[:last_heartbeat]
sql << " WHERE id=? AND timeout=?"
params << key
params << last_heartbeat
else
sql << " WHERE id=? AND #{EVENT_HORIZON} < timeout"
params << key
end

connect {
n = @db[*params].update
Expand All @@ -315,13 +321,15 @@ def heartbeat(task_token, alive_time, options)
if row == nil
raise PreemptedError, "task key=#{key} does not exist or preempted."
elsif row[:created_at] == nil
raise PreemptedError, "task key=#{key} preempted."
raise PreemptedError, "task key=#{key} is finished or canceled"
elsif options[:last_heartbeat] && row[:timeout] != options[:last_heartbeat]
raise PreemptedError, "task key=#{key} is preempted by another worker."
else # row[:timeout] == next_timeout
# ok
end
end
}
nil
next_timeout
end

def release(task_token, alive_time, options)
Expand Down
7 changes: 4 additions & 3 deletions lib/perfectqueue/multiprocess/thread_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ def run_loop
@finish_flag.wait(@poll_interval)
else
begin
last_heartbeat = Time.now.to_i
while task = tasks.shift
process(task)
process(task, last_heartbeat)
end
ensure
# TODO do not call release! because rdb_compat backend
Expand All @@ -120,11 +121,11 @@ def run_loop
@tm.stop
end

def process(task)
def process(task, last_heartbeat=Time.now.to_i)
@log.info "acquired task task=#{task.key} id=#{@processor_id}: #{task.inspect}"
begin
r = @runner.new(task)
@tm.set_task(task, r)
@tm.set_task(task, r, last_heartbeat)
begin
r.run
ensure
Expand Down
2 changes: 1 addition & 1 deletion lib/perfectqueue/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def initialize(client, key, attributes, task_token)
end

def heartbeat!(options={})
@client.heartbeat(@task_token, options)
@attributes[:timeout] = @client.heartbeat(@task_token, options.merge(last_heartbeat: timeout))
end

def finish!(options={})
Expand Down
6 changes: 1 addition & 5 deletions lib/perfectqueue/task_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@ def created_at
end

def timeout
if t = @attributes[:timeout]
return Time.at(t)
else
return nil
end
@attributes[:timeout]
end

def finished?
Expand Down
13 changes: 8 additions & 5 deletions lib/perfectqueue/task_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,20 @@ def join
@thread.join
end

def set_task(task, runner)
def set_task(task, runner, last_heartbeat=Time.now.to_i)
task.extend(TaskMonitorHook)
task.log = @log
task.task_monitor = self
task.runner = runner
@mutex.synchronize {
@task = task
@last_task_heartbeat = Time.now.to_i
@last_task_heartbeat = last_heartbeat
@cond.broadcast
}
now = Time.now.to_i
while @task && @last_task_heartbeat + @task_heartbeat_interval < now
sleep 1
end
end

def stop_task(immediate)
Expand Down Expand Up @@ -101,7 +106,6 @@ def external_task_heartbeat(task, &block)
@mutex.synchronize {
if task == @task
ret = block.call if block
@last_task_heartbeat = Time.now.to_i
end
ret
}
Expand All @@ -118,15 +122,14 @@ def run
next_task_heartbeat = @last_task_heartbeat + @task_heartbeat_interval
next_time = [next_child_heartbeat, next_task_heartbeat].min
else
next_task_heartbeat = nil
next_time = next_child_heartbeat
end

next_wait = next_time - now
@cond.wait(next_wait) if next_wait > 0

now = Time.now.to_i
if @task && next_task_heartbeat && next_task_heartbeat <= now
if @task && @last_task_heartbeat + @task_heartbeat_interval <= now
task_heartbeat
@last_task_heartbeat = now
end
Expand Down
2 changes: 1 addition & 1 deletion spec/multiprocess/child_process_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
end

describe '#process' do
let (:task){ double('task', key: double) }
let (:task){ double('task', key: double, timeout: Time.now.to_i) }
before do
expect(runner_insntace).to receive(:run)
end
Expand Down
24 changes: 18 additions & 6 deletions spec/rdb_compat_backend_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,13 @@
expect(ary[3]).to be_an_instance_of(AcquiredTask)
expect(ary[4]).to be_an_instance_of(AcquiredTask)

now1 = Time.at(now + alive_time)
expect(now1).to receive(:to_time).exactly(5).times.and_call_original
db.list({}){|task| expect(task.timeout).to eq now1.to_time }
now1 = now + alive_time
i = 0
db.list({}) do |task|
expect(task.timeout).to eq now1
i += 1
end
expect(i).to eq(5)
end
end
end
Expand Down Expand Up @@ -294,15 +298,18 @@
let (:delete_timeout){ now + retention_time }
let (:options){ {now: now} }
before{ allow(STDERR).to receive(:puts) }
context 'have a queueuled task' do
context 'have a queued task' do
before do
db.submit(key, 'test', nil, {})
end
it 'returns nil if next_run_time is not updated' do
expect(db.heartbeat(task_token, 0, {now: now})).to be_nil
expect(db.heartbeat(task_token, 0, {now: now})).to be_a(Integer)
end
it 'returns nil even if next_run_time is updated' do
expect(db.heartbeat(task_token, 1, {})).to be_nil
expect(db.heartbeat(task_token, 1, {})).to be_a(Integer)
end
it 'raises PreemptedError if last_heartbeat is not matched' do
expect{db.heartbeat(task_token, 1, {last_heartbeat: now-100})}.to raise_error(PreemptedError)
end
end
context 'no tasks' do
Expand All @@ -319,6 +326,11 @@
expect{db.heartbeat(task_token, 0, {})}.to raise_error(PreemptedError)
end
end
context 'stolen task' do
it 'raises PreemptedError if the task has unpexpected last_heartbeat' do
expect{db.heartbeat(task_token, 0, last_heartbeat: 0)}.to raise_error(PreemptedError)
end
end
end

context '#connect' do
Expand Down
4 changes: 2 additions & 2 deletions spec/task_metadata_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@
end

describe 'timeout' do
it 'returns a time of given timeout' do
it 'returns given timeout' do
epoch = 72
tm = TaskMetadata.new(double, double, timeout: epoch)
expect(tm.timeout).to eq(Time.at(epoch))
expect(tm.timeout).to eq(epoch)
end
end
end
95 changes: 85 additions & 10 deletions spec/task_monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
ret = double('ret')
tm.instance_variable_set(:@task, task)
expect(tm.external_task_heartbeat(task){ret}).to eq(ret)
expect(tm.instance_variable_get(:@last_task_heartbeat)).to eq(epoch)
end
end

Expand All @@ -39,23 +38,99 @@
end

describe '#task_heartbeat' do
let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object) }
let (:config){ {type: 'rdb_compat', url: 'mysql2://root:@localhost/perfectqueue_test', table: 'test_queues', alive_time: 11} }
let (:client){ Client.new(config) }
let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object, task_heartbeat_interval: 1) }
let (:err){ StandardError.new('heartbeat preempted') }
before do
task = double('task')
allow(task).to receive(:heartbeat!){ raise err }
tm.set_task(task, double('runner'))
let (:now){ Time.now.to_i }
let (:task){ double('task', timeout: now) }
let (:runner){ double('runner') }
context 'timeout' do
before do
tm.set_task(task, double('runner'))
end
it 'calls kill_task($!) on heartbeat error' do
allow(task).to receive(:heartbeat!){ raise err }
expect(tm).to receive(:kill_task).with(err).exactly(:once)
tm.__send__(:task_heartbeat)
end
end
context 'normal' do
before do
client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } }
client.init_database
client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'})
tm.start
end
after do
tm.stop
tm.join
end
it 'update timeout' do
tasks = client.acquire(now: now-80)
task = tasks[0]
expect(task.timeout).to eq(now-80+config[:alive_time])
allow(Time).to receive(:now).and_return(now-50)
tm.set_task(task, runner, now-80)
expect(task.timeout).to eq(now-50+config[:alive_time])
end
end
it 'calls kill_task($!) on heartbeat error' do
expect(tm).to receive(:kill_task).with(err).exactly(:once)
tm.__send__(:task_heartbeat)
context 'stolen' do
before do
client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } }
client.init_database
client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'})
tm.start
end
after do
tm.stop
tm.join
end
it 'raise error' do
now1 = now - 80
tasks = client.acquire(now: now1)
task1 = tasks[0]
expect(task1.timeout).to eq(now1+config[:alive_time])

now2 = now - 50
tasks = client.acquire(now: now2)
task2 = tasks[0]
expect(task2.timeout).to eq(now2+config[:alive_time])

now3 = now - 20
allow(Time).to receive(:now).and_return(now3)
expect(runner).to receive(:kill)
tm.set_task(task1, runner, now1)
end
end
context 'timeout but can acquire' do
before do
client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } }
client.init_database
client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'})
tm.start
end
after do
tm.stop
tm.join
end
it 'raise error' do
tasks = client.acquire(now: now-80)
task1 = tasks[0]
expect(task1.timeout).to eq(now-80+config[:alive_time])

allow(Time).to receive(:now).and_return(now-50)
tm.set_task(task1, runner, now-50)

expect(task1.runner).to eq(runner)
end
end
end
end

describe PerfectQueue::TaskMonitorHook do
let (:task) do
obj = AcquiredTask.new(double(:client).as_null_object, 'key', {}, double)
obj = AcquiredTask.new(double(:client).as_null_object, 'key', {timeout: Time.now.to_i}, double)
tm = TaskMonitor.new(logger: double('logger').as_null_object)
tm.set_task(obj, double('runner'))
obj
Expand Down