Skip to content

Commit

Permalink
Use #with to support Redis connection pool.
Browse files Browse the repository at this point in the history
Also fixes deprecation warning on #multi
  • Loading branch information
mlarraz committed Nov 22, 2024
1 parent 09cb7ec commit 69eaef6
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 14 deletions.
8 changes: 5 additions & 3 deletions lib/zhong.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ def self.latest_heartbeat
end

def self.all_heartbeats
heartbeats = redis.hgetall(heartbeat_key)
heartbeats = redis.with { |r| r.hgetall(heartbeat_key) }
now = redis_time

old_beats, new_beats = heartbeats.partition do |_, v|
Time.at(v.to_i) < (now - 15.minutes)
end

redis.multi do
old_beats.each { |b| redis.hdel(heartbeat_key, b) }
redis.with do |r|
r.multi do |rr|
old_beats.each { |b| rr.hdel(heartbeat_key, b) }
end
end

new_beats.map do |k, v|
Expand Down
16 changes: 8 additions & 8 deletions lib/zhong/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,24 @@ def running?
end

def refresh_last_ran
last_ran_val = redis.get(last_ran_key)
last_ran_val = redis.with { |r| r.get(last_ran_key) }
@last_ran = last_ran_val ? Time.at(last_ran_val.to_i) : nil
end

def disable
fire_callbacks(:before_disable, self)
redis.set(disabled_key, "true")
redis.with { |r| r.set(disabled_key, "true") }
fire_callbacks(:after_disable, self)
end

def enable
fire_callbacks(:before_enable, self)
redis.del(disabled_key)
redis.with { |r| r.del(disabled_key) }
fire_callbacks(:after_enable, self)
end

def disabled?
!redis.get(disabled_key).nil?
!redis.with { |r| r.get(disabled_key) }.nil?
end

def to_s
Expand All @@ -122,7 +122,7 @@ def next_at
end

def clear
redis.del(last_ran_key)
redis.with { |r| r.del(last_ran_key) }
end

def last_ran_key
Expand Down Expand Up @@ -152,7 +152,7 @@ def fire_callbacks(event, *args)
# if the @at value is changed across runs, the last_run becomes invalid
# so clear it
def clear_last_ran_if_at_changed
previous_at_msgpack = redis.get(desired_at_key)
previous_at_msgpack = redis.with { |r| r.get(desired_at_key) }

if previous_at_msgpack
previous_at = At.deserialize(previous_at_msgpack)
Expand All @@ -163,7 +163,7 @@ def clear_last_ran_if_at_changed
end
end

redis.set(desired_at_key, @at.serialize)
redis.with { |r| r.set(desired_at_key, @at.serialize) }
end

def run_every?(time)
Expand All @@ -180,7 +180,7 @@ def run_if?(time)

def ran!(time)
@last_ran = time
redis.set(last_ran_key, @last_ran.to_i)
redis.with { |r| r.set(last_ran_key, @last_ran.to_i) }
end

def redis_lock
Expand Down
4 changes: 2 additions & 2 deletions lib/zhong/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def find_by_name(job_name)
end

def redis_time
s, ms = redis.time # returns [seconds since epoch, microseconds]
s, ms = redis.with(&:time) # returns [seconds since epoch, microseconds]
now = Time.at(s + ms / (10**6))
tz ? now.in_time_zone(tz) : now
end
Expand Down Expand Up @@ -145,7 +145,7 @@ def run_job(job, time = redis_time)
end

def heartbeat(time)
redis.hset(heartbeat_key, heartbeat_field, time.to_i)
redis.with { |r| r.hset(heartbeat_key, heartbeat_field, time.to_i) }
end

def heartbeat_field
Expand Down
2 changes: 1 addition & 1 deletion lib/zhong/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ def safe_mget(keys)
if keys.empty?
{}
else
Zhong.redis.mapped_mget(*keys)
Zhong.redis.with { |r| r.mapped_mget(*keys) }
end
end

Expand Down

0 comments on commit 69eaef6

Please sign in to comment.