Skip to content

Commit

Permalink
Reformat for newer rubies
Browse files Browse the repository at this point in the history
  • Loading branch information
ukd1 committed Mar 21, 2024
1 parent 619c937 commit 2578974
Show file tree
Hide file tree
Showing 19 changed files with 314 additions and 347 deletions.
1 change: 0 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# frozen_string_literal: true
source 'https://rubygems.org'

source 'https://rubygems.org' do
gem 'rake'
Expand Down
12 changes: 6 additions & 6 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# frozen_string_literal: true

$:.unshift("lib")
$:.unshift('lib')

require "bundler/gem_tasks"
require "rake/testtask"
require "./lib/queue_classic"
require "./lib/queue_classic/tasks"
require 'bundler/gem_tasks'
require 'rake/testtask'
require './lib/queue_classic'
require './lib/queue_classic/tasks'

task :default => ['test']
task default: ['test']
Rake::TestTask.new do |t|
t.libs << 'test'
t.test_files = FileList['test/**/*_test.rb']
Expand Down
10 changes: 5 additions & 5 deletions lib/generators/queue_classic/install_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ module QC
class InstallGenerator < Rails::Generators::Base
include Rails::Generators::Migration

namespace "queue_classic:install"
self.source_paths << File.join(File.dirname(__FILE__), 'templates')
namespace 'queue_classic:install'
source_paths << File.join(File.dirname(__FILE__), 'templates')
desc 'Generates (but does not run) a migration to add a queue_classic table.'

def self.next_migration_number(dirname)
Expand All @@ -34,9 +34,9 @@ def create_migration_file
migration_template 'update_queue_classic_3_1_0.rb', 'db/migrate/update_queue_classic_3_1_0.rb'
end

if self.class.migration_exists?('db/migrate', 'update_queue_classic_4_0_0').nil?
migration_template 'update_queue_classic_4_0_0.rb', 'db/migrate/update_queue_classic_4_0_0.rb'
end
return unless self.class.migration_exists?('db/migrate', 'update_queue_classic_4_0_0').nil?

migration_template 'update_queue_classic_4_0_0.rb', 'db/migrate/update_queue_classic_4_0_0.rb'
end
end
end
61 changes: 31 additions & 30 deletions lib/queue_classic.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

require_relative "queue_classic/config"
require_relative 'queue_classic/config'

module QC
extend QC::Config
Expand All @@ -9,21 +9,21 @@ module QC
# They should no longer be used. Prefer the corresponding methods.
# See +QC::Config+ for more details.
DEPRECATED_CONSTANTS = {
:APP_NAME => :app_name,
:WAIT_TIME => :wait_time,
:TABLE_NAME => :table_name,
:QUEUE => :queue,
:QUEUES => :queues,
:TOP_BOUND => :top_bound,
:FORK_WORKER => :fork_worker?,
APP_NAME: :app_name,
WAIT_TIME: :wait_time,
TABLE_NAME: :table_name,
QUEUE: :queue,
QUEUES: :queues,
TOP_BOUND: :top_bound,
FORK_WORKER: :fork_worker?
}

def self.const_missing(const_name)
if DEPRECATED_CONSTANTS.key? const_name
config_method = DEPRECATED_CONSTANTS[const_name]
$stderr.puts <<-MSG
The constant QC::#{const_name} is deprecated and will be removed in the future.
Please use the method QC.#{config_method} instead.
warn <<~MSG
The constant QC::#{const_name} is deprecated and will be removed in the future.
Please use the method QC.#{config_method} instead.
MSG
QC.public_send config_method
else
Expand All @@ -42,7 +42,7 @@ def self.method_missing(sym, *args, &block)
end

# Ensure QC.respond_to?(:enqueue) equals true (ruby 1.9 only)
def self.respond_to_missing?(method_name, include_private = false)
def self.respond_to_missing?(method_name, _include_private = false)
default_queue.respond_to?(method_name)
end

Expand All @@ -62,33 +62,33 @@ def self.log_yield(data)
t0 = Time.now
begin
yield
rescue => e
log({:at => "error", :error => e.inspect}.merge(data))
rescue StandardError => e
log({ at: 'error', error: e.inspect }.merge(data))
raise
ensure
t = Integer((Time.now - t0)*1000)
log(data.merge(:elapsed => t)) unless e
t = Integer((Time.now - t0) * 1000)
log(data.merge(elapsed: t)) unless e
end
end

def self.log(data)
result = nil
data = {:lib => "queue-classic"}.merge(data)
data = { lib: 'queue-classic' }.merge(data)
if block_given?
result = yield
data.merge(:elapsed => Integer((Time.now - t0)*1000))
data.merge(elapsed: Integer((Time.now - t0) * 1000))
end
data.reduce(out=String.new) do |s, tup|
s << [tup.first, tup.last].join("=") << " "
data.reduce(out = String.new) do |s, tup|
s << [tup.first, tup.last].join('=') << ' '
end
puts(out) if ENV["DEBUG"]
return result
puts(out) if ENV['DEBUG']
result
end

def self.measure(data)
if ENV['QC_MEASURE']
$stdout.puts("measure#qc.#{data}")
end
return unless ENV['QC_MEASURE']

$stdout.puts("measure#qc.#{data}")
end

# This will unlock all jobs any postgres' PID that is not existing anymore
Expand All @@ -104,12 +104,13 @@ class << self
def rails_connection_sharing_enabled?
enabled = ENV.fetch('QC_RAILS_DATABASE', 'true') != 'false'
return false unless enabled
return Object.const_defined?("ActiveRecord") && ActiveRecord::Base.respond_to?("connection")

Object.const_defined?('ActiveRecord') && ActiveRecord::Base.respond_to?('connection')
end
end
end

require_relative "queue_classic/queue"
require_relative "queue_classic/worker"
require_relative "queue_classic/setup"
require_relative "queue_classic/railtie" if defined?(Rails)
require_relative 'queue_classic/queue'
require_relative 'queue_classic/worker'
require_relative 'queue_classic/setup'
require_relative 'queue_classic/railtie' if defined?(Rails)
19 changes: 9 additions & 10 deletions lib/queue_classic/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ module Config
# postgres related process information in the
# pg_stat_activity table.
def app_name
@app_name ||= ENV["QC_APP_NAME"] || "queue_classic"
@app_name ||= ENV['QC_APP_NAME'] || 'queue_classic'
end

# Number of seconds to block on the listen chanel for new jobs.
def wait_time
@wait_time ||= (ENV["QC_LISTEN_TIME"] || 5).to_i
@wait_time ||= (ENV['QC_LISTEN_TIME'] || 5).to_i
end

# Why do you want to change the table name?
Expand All @@ -20,11 +20,11 @@ def wait_time
# need to update the PL/pgSQL lock_head() function.
# Come on. Don't do it.... Just stick with the default.
def table_name
@table_name ||= "queue_classic_jobs"
@table_name ||= 'queue_classic_jobs'
end

def queue
@queue = ENV["QUEUE"] || "default"
@queue = ENV['QUEUE'] || 'default'
end

# The default queue used by `QC.enqueue`.
Expand All @@ -40,13 +40,13 @@ def default_queue=(queue)
# notes the queue. You can point your workers
# at different queues.
def queues
@queues ||= (ENV["QUEUES"] && ENV["QUEUES"].split(",").map(&:strip)) || []
@queues ||= (ENV['QUEUES'] && ENV['QUEUES'].split(',').map(&:strip)) || []
end

# Set this to 1 for strict FIFO.
# There is nothing special about 9....
def top_bound
@top_bound ||= (ENV["QC_TOP_BOUND"] || 9).to_i
@top_bound ||= (ENV['QC_TOP_BOUND'] || 9).to_i
end

# Set this variable if you wish for
Expand All @@ -55,14 +55,13 @@ def top_bound
# any database connections. See the worker
# for more details.
def fork_worker?
@fork_worker ||= (!ENV["QC_FORK_WORKER"].nil?)
@fork_worker ||= !ENV['QC_FORK_WORKER'].nil?
end

# The worker class instantiated by QC's rake tasks.
def default_worker_class
@worker_class ||= (ENV["QC_DEFAULT_WORKER_CLASS"] && Kernel.const_get(ENV["QC_DEFAULT_WORKER_CLASS"]) ||
QC::Worker)

@worker_class ||= ENV['QC_DEFAULT_WORKER_CLASS'] && Kernel.const_get(ENV['QC_DEFAULT_WORKER_CLASS']) ||
QC::Worker
end

def default_worker_class=(worker_class)
Expand Down
63 changes: 29 additions & 34 deletions lib/queue_classic/conn_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

module QC
class ConnAdapter

def initialize(args={})
def initialize(args = {})
@active_record_connection_share = args[:active_record_connection_share]
@_connection = args[:connection]
@mutex = Mutex.new
Expand All @@ -22,15 +21,15 @@ def connection

def execute(stmt, *params)
@mutex.synchronize do
QC.log(:at => "exec_sql", :sql => stmt.inspect)
QC.log(at: 'exec_sql', sql: stmt.inspect)
begin
params = nil if params.empty?
r = connection.exec(stmt, params)
result = []
r.each {|t| result << t}
r.each { |t| result << t }
result.length > 1 ? result : result.pop
rescue PG::Error => e
QC.log(:error => e.inspect)
QC.log(error: e.inspect)
connection.reset
raise
end
Expand All @@ -39,60 +38,55 @@ def execute(stmt, *params)

def wait(time, *channels)
@mutex.synchronize do
listen_cmds = channels.map {|c| 'LISTEN "' + c.to_s + '"'}
listen_cmds = channels.map { |c| 'LISTEN "' + c.to_s + '"' }
connection.exec(listen_cmds.join(';'))
wait_for_notify(time)
unlisten_cmds = channels.map {|c| 'UNLISTEN "' + c.to_s + '"'}
unlisten_cmds = channels.map { |c| 'UNLISTEN "' + c.to_s + '"' }
connection.exec(unlisten_cmds.join(';'))
drain_notify
end
end

def disconnect
@mutex.synchronize do
begin
connection.close
rescue => e
QC.log(:at => 'disconnect', :error => e.message)
end
connection.close
rescue StandardError => e
QC.log(at: 'disconnect', error: e.message)
end
end

def server_version
@server_version ||= begin
version = execute("SHOW server_version_num;")["server_version_num"]
version && version.to_i
end
version = execute('SHOW server_version_num;')['server_version_num']
version && version.to_i
end
end

private

def wait_for_notify(t)
Array.new.tap do |msgs|
connection.wait_for_notify(t) {|event, pid, msg| msgs << msg}
[].tap do |msgs|
connection.wait_for_notify(t) { |_event, _pid, msg| msgs << msg }
end
end

def drain_notify
until connection.notifies.nil?
QC.log(:at => "drain_notifications")
end
QC.log(at: 'drain_notifications') until connection.notifies.nil?
end

def validate!(c)
return c if c.is_a?(PG::Connection)

err = "connection must be an instance of PG::Connection, but was #{c.class}"
raise(ArgumentError, err)
end

def establish_new
QC.log(:at => "establish_conn")
QC.log(at: 'establish_conn')
conn = PG.connect(*normalize_db_url(db_url))
if conn.status != PG::CONNECTION_OK
QC.log(:error => conn.error)
end
QC.log(error: conn.error) if conn.status != PG::CONNECTION_OK

if conn.server_version < 90600
if conn.server_version < 90_600
raise "This version of Queue Classic does not support Postgres older than 9.6 (90600). This version is #{conn.server_version}. If you need that support, please use an older version."
end

Expand All @@ -105,20 +99,21 @@ def normalize_db_url(url)
host = host.gsub(/%2F/i, '/') if host

[
host, # host or percent-encoded socket path
url.port || 5432,
nil, nil, #opts, tty
url.path.gsub("/",""), # database name
url.user,
url.password
host, # host or percent-encoded socket path
url.port || 5432,
nil, nil, # opts, tty
url.path.gsub('/', ''), # database name
url.user,
url.password
]
end

def db_url
return @db_url if defined?(@db_url) && @db_url
url = ENV["QC_DATABASE_URL"] ||
ENV["DATABASE_URL"] ||
raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL")

url = ENV['QC_DATABASE_URL'] ||
ENV['DATABASE_URL'] ||
raise(ArgumentError, 'missing QC_DATABASE_URL or DATABASE_URL')
@db_url = URI.parse(url)
end
end
Expand Down
Loading

0 comments on commit 2578974

Please sign in to comment.