Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enqueue-multi compatibility fixes #1

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions .github/workflows/rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@ on:
pull_request:
branches: [master]

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
cancel-in-progress: true

jobs:
rubocop:
name: Rubocop
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
ruby: [
2.7
]
ruby-version:
- "3.3"
Copy link
Member Author

@onyxraven onyxraven Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to later released ruby


steps:
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
ruby-version: ${{ matrix.ruby-version }}
bundler-cache: true
- name: Ruby linter
run: bundle exec rubocop
62 changes: 24 additions & 38 deletions .github/workflows/ruby.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ on:
pull_request:
branches: [master]

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
cancel-in-progress: true

jobs:
test:
continue-on-error: true
Expand All @@ -15,56 +20,37 @@ jobs:
image: redis
ports:
- 6379:6379
# Set health checks to wait until redis has started
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
strategy:
fail-fast: false
matrix:
ruby-version:
- 2.3
- 2.4
- 2.5
- 2.6
- 2.7
- "3.0"
- 3.1
- 3.2
- "3.1"
- "3.2"
- "3.3"
resque-version:
- "master"
- "~> 2.4.0"
- "~> 2.4"
- "~> 1.27"
rufus-scheduler:
- "3.2"
- "3.4"
- "3.5"
- "3.6"
- "~> 3.6.0"
- "~> 3.7.0"
- "~> 3.8.0"
- "~> 3.9"
redis-version:
- "~> 4.x"
- "~> 5.x"
- "~> 3.3"
- "~> 4.8"
- "~> 5.2"
exclude:
- ruby-version: head
rufus-scheduler: 3.2
- ruby-version: 3.2
rufus-scheduler: 3.2

- ruby-version: 2.3
resque-version: "~> 1.27"
rufus-scheduler: 3.4
- ruby-version: 2.3
resque-version: "~> 1.27"
rufus-scheduler: 3.5
- ruby-version: 2.5
resque-version: "~> 2.4.0"
rufus-scheduler: 3.5
- ruby-version: 2.5
resque-version: master
rufus-scheduler: 3.2

- ruby-version: 2.3
redis-version: "~> 5.x"
- ruby-version: 2.4
redis-version: "~> 5.x"

# redis 5 / resque 1 don't get along
- resque-version: "~> 1.27"
redis-version: "~> 5.x"
redis-version: "~> 5.2"
env:
REDIS_VERSION: "${{ matrix.redis-version }}"
RESQUE: "${{ matrix.resque-version }}"
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ nbproject
.env
.env.*
/nul
vendor/
vendor/
.vscode
18 changes: 11 additions & 7 deletions .rubocop.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
inherit_from: .rubocop_todo.yml

AllCops:
SuggestExtensions: false
TargetRubyVersion: 3.0
NewCops: enable
Include:
- Gemfile
- '**/Rakefile'
- resque-scheduler.gemspec
- bin/resque-scheduler

Documentation:
Gemspec/DevelopmentDependencies:
Enabled: false
Metrics/ClassLength:
Max: 110
Metrics/PerceivedComplexity:
Enabled: false
Naming/HeredocDelimiterNaming:
Enabled: false

Style/DoubleNegation:
Enabled: false
Metrics/PerceivedComplexity:
Style/FrozenStringLiteralComment:
Enabled: false
Metrics/ClassLength:
Max: 110
71 changes: 0 additions & 71 deletions .rubocop_todo.yml

This file was deleted.

8 changes: 6 additions & 2 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
# vim:fileencoding=utf-8
source 'https://rubygems.org'

case resque_version = ENV.fetch('RESQUE', 'master')
case resque_version = ENV.fetch('RESQUE', 'latest')
when 'master'
gem 'resque', git: 'https://github.com/resque/resque'
when 'latest'
gem 'resque'
else
gem 'resque', resque_version
end

case rufus_scheduler_version = ENV.fetch('RUFUS_SCHEDULER', '3.6')
case rufus_scheduler_version = ENV.fetch('RUFUS_SCHEDULER', 'latest')
when 'master'
gem 'rufus-scheduler', git: 'https://github.com/jmettraux/rufus-scheduler'
when 'latest'
gem 'rufus-scheduler'
else
gem 'rufus-scheduler', rufus_scheduler_version
end
Expand Down
43 changes: 27 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,24 +129,18 @@ Both the Rake task and standalone executable support the following
environment variables:

* `APP_NAME` - Application name used in procline (`$0`) (default empty)
* `BACKGROUND` - [Run in the background](#running-in-the-background) if
non-empty (via `Process.daemon`, if supported) (default `false`)
* `DYNAMIC_SCHEDULE` - Enables [dynamic scheduling](#dynamic-schedules)
if non-empty (default `false`)
* `RAILS_ENV` - Environment to use in procline (`$0`) (default empty)
* `INITIALIZER_PATH` - Path to a Ruby file that will be loaded *before*
requiring `resque` and `resque/scheduler` (default empty).
* `RESQUE_SCHEDULER_INTERVAL` - Interval in seconds for checking if a
scheduled job must run (coerced with `Kernel#Float()`) (default `5`)
* `BACKGROUND` - [Run in the background](#running-in-the-background) if non-empty (via `Process.daemon`, if supported) (default `false`)
* `DELAYED_REQUEUE_BATCH_SIZE` - Set the delayed job batch size if enabled (default `100`). If `<= 1`, this disables batching.
* `DISABLE_DELAYED_REQUEUE_BATCH` - Disable batched delayed job queuing (default `false`) - [See section below on consequences](#batched-delayed-job-and-resque-enqueue-hooks)
* `DYNAMIC_SCHEDULE` - Enables [dynamic scheduling](#dynamic-schedules) if non-empty (default `false`)
* `INITIALIZER_PATH` - Path to a Ruby file that will be loaded *before* requiring `resque` and `resque/scheduler` (default empty).
* `LOGFILE` - Log file name (default empty, meaning `$stdout`)
* `LOGFORMAT` - Log output format to use (either `'text'`, `'json'` or `'logfmt'`,
default `'text'`)
* `LOGFORMAT` - Log output format to use (either `'text'`, `'json'` or `'logfmt'`, default `'text'`)
* `PIDFILE` - If non-empty, write process PID to file (default empty)
* `QUIET` - Silence most output if non-empty (equivalent to a level of
`MonoLogger::FATAL`, default `false`)
* `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level
of `MonoLogger::DEBUG`, default `false`)

* `QUIET` - Silence most output if non-empty (equivalent to a level of `MonoLogger::FATAL`, default `false`)
* `RAILS_ENV` - Environment to use in procline (`$0`) (default empty)
* `RESQUE_SCHEDULER_INTERVAL` - Interval in seconds for checking if a scheduled job must run (coerced with `Kernel#Float()`) (default `5`)
* `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level of `MonoLogger::DEBUG`, default `false`)

### Resque Pool integration

Expand Down Expand Up @@ -755,6 +749,23 @@ This table explains the version requirements for rufus-scheduler
| `~> 4.0` | `~> 3.0` |
| `< 4.0` | `~> 2.0` |

##### Batched delayed job and resque enqueue hooks

Batching delayed job queuing can speed up when per-second job counts grows,
avoiding situations that may cause delayed enqueues to fall behind. This
batching wraps enqueues in a `multi` pipeline, making far fewer roundtrips to
the server.

However, in `redis` gem `>= 4.0`, any operations to redis within the `multi`
block must use the multi handle so that the actions are captured. Resque's hooks
do not currently have a way to pass this around, and so compatibility with other
resque plugins or hooks which access redis at enqueue time is impacted with
batch mode.

Detecting when this occurs can be tricky, you must watch for logs
emitted by your `resque-scheduler` process such as `Redis::CommandError: ERR
MULTI calls can not be nested` or `NoMethodError: undefined method nil? for
<Redis::Future`, and delayed jobs you expect would not be enqueued.

### Contributing

Expand Down
25 changes: 18 additions & 7 deletions lib/resque/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,25 +204,36 @@ def enqueue_next_item(timestamp)
item
end

def batch_delayed_items?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This starts the refactor that revives the 'non batched' mode by deciding if batching is appropriate

!disable_delayed_requeue_batches && delayed_requeue_batch_size > 1
end

# Enqueues all delayed jobs for a timestamp
def enqueue_delayed_items_for_timestamp(timestamp)
count = 0
batch_size = delayed_requeue_batch_size
actual_batch_size = nil
batch_size = batch_delayed_items? ? delayed_requeue_batch_size : 1

log "Processing delayed items for timestamp #{timestamp}, in batches of #{batch_size}"
message = "Processing delayed items for timestamp #{timestamp}"
message += ", in batches of #{batch_size}" if batch_delayed_items?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log messages are updated to reflect batching

log message

loop do
actual_batch_size = 0

handle_shutdown do
# Continually check that it is still the master
if am_master
actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp,
batch_size)
if batch_delayed_items?
actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp, batch_size)
log "queued batch of #{actual_batch_size} jobs" if actual_batch_size != -1
else
item = enqueue_next_item(timestamp)
actual_batch_size = item.nil? ? 0 : 1
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non batched version

end
end
end

count += actual_batch_size
log "queued #{count} jobs" if actual_batch_size != -1

# continue processing until there are no more items in this
# timestamp. If we don't have a full batch, this is the last one.
Expand All @@ -231,7 +242,7 @@ def enqueue_delayed_items_for_timestamp(timestamp)
break if actual_batch_size < batch_size
end

log "finished queueing #{count} total jobs for timestamp #{timestamp}" if count != -1
log "finished queueing #{count} total jobs for timestamp #{timestamp}"
end

def timestamp_key(timestamp)
Expand Down
8 changes: 8 additions & 0 deletions lib/resque/scheduler/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,20 @@ def app_name
@app_name ||= environment['APP_NAME']
end

attr_writer :delayed_requeue_batch_size

def delayed_requeue_batch_size
@delayed_requeue_batch_size ||= \
ENV['DELAYED_REQUEUE_BATCH_SIZE'].to_i if environment['DELAYED_REQUEUE_BATCH_SIZE']
@delayed_requeue_batch_size ||= 100
end

attr_writer :disable_delayed_requeue_batches

def disable_delayed_requeue_batches
@enable_delayed_requeue_batches ||= to_bool(environment['DISABLE_DELAYED_REQUEUE_BATCH'])
end

# Amount of time in seconds to sleep between polls of the delayed
# queue. Defaults to 5
attr_writer :poll_sleep_amount
Expand Down
2 changes: 1 addition & 1 deletion lib/resque/scheduler/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Resque
module Scheduler
VERSION = '4.10.2'.freeze
VERSION = '5.11.0'.freeze
end
end
Loading