diff --git a/lib/zhong.rb b/lib/zhong.rb index 1c5ce8f..25e5ff9 100644 --- a/lib/zhong.rb +++ b/lib/zhong.rb @@ -42,15 +42,17 @@ def self.latest_heartbeat end def self.all_heartbeats - heartbeats = redis.hgetall(heartbeat_key) + heartbeats = redis.with { |r| r.hgetall(heartbeat_key) } now = redis_time old_beats, new_beats = heartbeats.partition do |_, v| Time.at(v.to_i) < (now - 15.minutes) end - redis.multi do - old_beats.each { |b| redis.hdel(heartbeat_key, b) } + redis.with do |r| + r.multi do |rr| + old_beats.each { |b| rr.hdel(heartbeat_key, b) } + end end new_beats.map do |k, v| diff --git a/lib/zhong/job.rb b/lib/zhong/job.rb index d9f7be7..39af2d6 100644 --- a/lib/zhong/job.rb +++ b/lib/zhong/job.rb @@ -91,24 +91,24 @@ def running? end def refresh_last_ran - last_ran_val = redis.get(last_ran_key) + last_ran_val = redis.with { |r| r.get(last_ran_key) } @last_ran = last_ran_val ? Time.at(last_ran_val.to_i) : nil end def disable fire_callbacks(:before_disable, self) - redis.set(disabled_key, "true") + redis.with { |r| r.set(disabled_key, "true") } fire_callbacks(:after_disable, self) end def enable fire_callbacks(:before_enable, self) - redis.del(disabled_key) + redis.with { |r| r.del(disabled_key) } fire_callbacks(:after_enable, self) end def disabled? - !redis.get(disabled_key).nil? + !redis.with { |r| r.get(disabled_key) }.nil? end def to_s @@ -122,7 +122,7 @@ def next_at end def clear - redis.del(last_ran_key) + redis.with { |r| r.del(last_ran_key) } end def last_ran_key @@ -152,7 +152,7 @@ def fire_callbacks(event, *args) # if the @at value is changed across runs, the last_run becomes invalid # so clear it def clear_last_ran_if_at_changed - previous_at_msgpack = redis.get(desired_at_key) + previous_at_msgpack = redis.with { |r| r.get(desired_at_key) } if previous_at_msgpack previous_at = At.deserialize(previous_at_msgpack) @@ -163,7 +163,7 @@ def clear_last_ran_if_at_changed end end - redis.set(desired_at_key, @at.serialize) + redis.with { |r| r.set(desired_at_key, @at.serialize) } end def run_every?(time) @@ -180,7 +180,7 @@ def run_if?(time) def ran!(time) @last_ran = time - redis.set(last_ran_key, @last_ran.to_i) + redis.with { |r| r.set(last_ran_key, @last_ran.to_i) } end def redis_lock diff --git a/lib/zhong/scheduler.rb b/lib/zhong/scheduler.rb index 203cd75..9b7ab86 100644 --- a/lib/zhong/scheduler.rb +++ b/lib/zhong/scheduler.rb @@ -113,7 +113,7 @@ def find_by_name(job_name) end def redis_time - s, ms = redis.time # returns [seconds since epoch, microseconds] + s, ms = redis.with(&:time) # returns [seconds since epoch, microseconds] now = Time.at(s + ms / (10**6)) tz ? now.in_time_zone(tz) : now end @@ -145,7 +145,7 @@ def run_job(job, time = redis_time) end def heartbeat(time) - redis.hset(heartbeat_key, heartbeat_field, time.to_i) + redis.with { |r| r.hset(heartbeat_key, heartbeat_field, time.to_i) } end def heartbeat_field diff --git a/lib/zhong/util.rb b/lib/zhong/util.rb index d3c277d..1a580bb 100644 --- a/lib/zhong/util.rb +++ b/lib/zhong/util.rb @@ -4,7 +4,7 @@ def safe_mget(keys) if keys.empty? {} else - Zhong.redis.mapped_mget(*keys) + Zhong.redis.with { |r| r.mapped_mget(*keys) } end end