Skip to content

Commit

Permalink
Reimplement Interruptible using Thread#queue
Browse files Browse the repository at this point in the history
* Replaces a little Unix cleverness with a standard Ruby class.
  This pushes the responsibiity for meeting the SQ requirements
  from SQ to stock Ruby

* Delivers equivelent performance, identical API, and API behaviors
  with the original implementation (see note below on Futures)

* Mostly fixes a *platform / version dependent* issue with
  MySQL (see below)

* Meets 100% of SQ's functional requirements:

  * interruptible_sleep: a potentially blocking operation
    interruptible via either a "wake_event" (possibly requested
    prior to entering interruptible_sleep) or blocking until a
    timeout.

  * wake_up / interrupt: a Signal#trap and thread-safe method that
    does not require user-level synchronization (with the risk of
    not fully understanding all of the complexities required) code
    that either interrupts an inflight-interruptible_sleep or enqueues
    the event to processed in the invocation of interruptible_sleep

* Interruptible's API is trivially reproduceable via Thread::Queue
  * interruptible_sleep => Queue.pop(timeout:) where pushing anything
    into the queue acts as the interrupt event and timeout is reliable
    without any extra code or exception handling.

  * wake_up / interrupt => Queue.push(Anything) is thread, fiber, and
    Signal.trap safe (can be called from anywhere) and captures
    all wake_up events whenever requested, automaticall caching any
    "event" not processed by a currently executing interruptible_sleep
    matching existing functionality exactly.

Why the Future in #interruptible_sleep?

While Thread::Queue micro benchmarks as having the same performance on
the main thread Vs. any form of a sub-thread (or Fiber) and self-pipe,
when running the SQ test suite we see a 35% slow down Vs. the original
self-pipe implenentation.  One assumes this slowdown would manifest
in production. By moving the just the Queue#pop into a separate thread via
Concurrent::Promises.future we get +/- identical performance to the original
self-pipe implementation.

I'm assuming this root causes to Ruby main-thread only housekeeping and/or
possibly triggering a fast/slow path issue.

Why a Future Vs. Thread#new for each interruptible_sleep call?

Every other threaded operation in SQ is implemented using Concurrent
Ruby. Using a Future is for code and architectual consistency. There is
no difference in performance or functionality between the two.

MySQL *only* issues:

There seems to be a *platform specific* or *version specific* problem
with MySQL database connectivity and/or broken self-pipes leading to
randomly failing tests and a stream of distracting backtraces *even
with successful* tests.  Adding to the complexity sometimes, the lost
database connection can self-heal -- HOWEVER -- this takes time and given
how much of the test suite has time based assertions, leads to
additional random test failures.

These, or similar, issues have been observed in the past when changes to
the MySQL client library forced changes in the mysql2 gem.

With the Thread::Queue based implementation of the Interruptible concern,
the random failures and amount of spurious output are dramatically
improved (but not eliminated).
  • Loading branch information
hms committed Dec 2, 2024
1 parent 6cc550e commit a152f26
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 21 deletions.
6 changes: 3 additions & 3 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ GEM
concurrent-ruby (1.3.4)
connection_pool (2.4.1)
crass (1.0.6)
date (3.4.1)
debug (1.7.1)
irb (>= 1.5.0)
reline (>= 0.3.1)
Expand Down Expand Up @@ -98,7 +99,8 @@ GEM
ast (~> 2.4.1)
racc
pg (1.5.4)
psych (5.2.0)
psych (5.2.1)
date
stringio
puma (6.4.3)
nio4r (~> 2.0)
Expand Down Expand Up @@ -130,8 +132,6 @@ GEM
rake (13.2.1)
rdoc (6.8.1)
psych (>= 4.0.0)
rdoc (6.6.3.1)
psych (>= 4.0.0)
regexp_parser (2.9.2)
reline (0.5.12)
io-console (~> 0.5)
Expand Down
30 changes: 12 additions & 18 deletions lib/solid_queue/processes/interruptible.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,25 @@ def wake_up
end

private
SELF_PIPE_BLOCK_SIZE = 11

def interrupt
self_pipe[:writer].write_nonblock(".")
rescue Errno::EAGAIN, Errno::EINTR
# Ignore writes that would block and retry
# if another signal arrived while writing
retry
queue << true
end

def interruptible_sleep(time)
if time > 0 && self_pipe[:reader].wait_readable(time)
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
end
rescue Errno::EAGAIN, Errno::EINTR
# Since this is invoked on the main thread, using some form of Async
# avoids a 35% slowdown (at least when running the test suite).
#
# Using Futures for architectural consistency with all the other Async in SolidQueue.
Concurrent::Promises.future(time) do |timeout|
if timeout > 0 && queue.pop(timeout:)
queue.clear # exiting the poll wait guarantees testing for SHUTDOWN before next poll
end
end.value
end

# Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html)
def self_pipe
@self_pipe ||= create_self_pipe
end

def create_self_pipe
reader, writer = IO.pipe
{ reader: reader, writer: writer }
def queue
@queue ||= Queue.new
end
end
end

0 comments on commit a152f26

Please sign in to comment.