From 239ffb9d5763aee6889fc4a81c49348ab50eba2d Mon Sep 17 00:00:00 2001 From: Jan Berdajs Date: Tue, 24 Nov 2015 18:35:25 +0100 Subject: [PATCH] JRuby support: pooled application manager (instead of fork) - used if fork if not supported --- lib/spring/application.rb | 59 ++--- lib/spring/application/boot.rb | 9 +- lib/spring/application/fork_strategy.rb | 50 ++++ lib/spring/application/pool_strategy.rb | 30 +++ lib/spring/application_manager.rb | 138 +---------- .../application_manager/fork_strategy.rb | 139 +++++++++++ .../application_manager/pool_strategy.rb | 226 ++++++++++++++++++ lib/spring/binstub.rb | 2 +- lib/spring/client/run.rb | 6 +- lib/spring/configuration.rb | 8 + lib/spring/env.rb | 2 +- lib/spring/platform.rb | 25 ++ lib/spring/server.rb | 11 +- 13 files changed, 521 insertions(+), 184 deletions(-) create mode 100644 lib/spring/application/fork_strategy.rb create mode 100644 lib/spring/application/pool_strategy.rb create mode 100644 lib/spring/application_manager/fork_strategy.rb create mode 100644 lib/spring/application_manager/pool_strategy.rb create mode 100644 lib/spring/platform.rb diff --git a/lib/spring/application.rb b/lib/spring/application.rb index 992c5125..2e332ddc 100644 --- a/lib/spring/application.rb +++ b/lib/spring/application.rb @@ -1,9 +1,17 @@ require "spring/boot" require "set" require "pty" +require "spring/platform" module Spring class Application + if Spring.fork? + require 'spring/application/fork_strategy' + include ForkStrategy + else + require 'spring/application/pool_strategy' + include PoolStrategy + end attr_reader :manager, :watcher, :spring_env, :original_env def initialize(manager, original_env) @@ -114,13 +122,9 @@ def preload end end - def eager_preload - with_pty { preload } - end - def run state :running - manager.puts + manager.puts Process.pid loop do IO.select [manager, @interrupt.first] @@ -134,6 +138,7 @@ def run end def serve(client) + child_started = [false] log "got client" manager.puts @@ -153,7 +158,7 @@ def serve(client) ActionDispatch::Reloader.prepare! end - pid = fork { + fork_child(client, streams, child_started) { IGNORE_SIGNALS.each { |sig| trap(sig, "DEFAULT") } trap("TERM", "DEFAULT") @@ -182,24 +187,18 @@ def serve(client) command.call } - - disconnect_database - reset_streams - - log "forked #{pid}" - manager.puts pid - - wait pid, streams, client rescue Exception => e + Kernel.exit if exiting? && e.is_a?(SystemExit) + log "exception: #{e}" - manager.puts unless pid + manager.puts unless child_started[0] if streams && !e.is_a?(SystemExit) print_exception(stderr, e) streams.each(&:close) end - client.puts(1) if pid + client.puts(1) if child_started[0] client.close end @@ -280,39 +279,11 @@ def print_exception(stream, error) rest.each { |line| stream.puts("\tfrom #{line}") } end - def with_pty - PTY.open do |master, slave| - [STDOUT, STDERR, STDIN].each { |s| s.reopen slave } - Thread.new { master.read } - yield - reset_streams - end - end - def reset_streams [STDOUT, STDERR].each { |stream| stream.reopen(spring_env.log_file) } STDIN.reopen("/dev/null") end - def wait(pid, streams, client) - @mutex.synchronize { @waiting << pid } - - # Wait in a separate thread so we can run multiple commands at once - Thread.new { - begin - _, status = Process.wait2 pid - log "#{pid} exited with #{status.exitstatus}" - - streams.each(&:close) - client.puts(status.exitstatus) - client.close - ensure - @mutex.synchronize { @waiting.delete pid } - exit_if_finished - end - } - end - private def active_record_configured? diff --git a/lib/spring/application/boot.rb b/lib/spring/application/boot.rb index 6804b646..21981dc0 100644 --- a/lib/spring/application/boot.rb +++ b/lib/spring/application/boot.rb @@ -3,8 +3,15 @@ require "spring/application" +remote_socket = + if ENV["SPRING_SOCKET"] + UNIXSocket.open(ENV.delete("SPRING_SOCKET")) + else + UNIXSocket.for_fd(3) + end + app = Spring::Application.new( - UNIXSocket.for_fd(3), + remote_socket, Spring::JSON.load(ENV.delete("SPRING_ORIGINAL_ENV").dup) ) diff --git a/lib/spring/application/fork_strategy.rb b/lib/spring/application/fork_strategy.rb new file mode 100644 index 00000000..2c371c4f --- /dev/null +++ b/lib/spring/application/fork_strategy.rb @@ -0,0 +1,50 @@ +module Spring + class Application + module ForkStrategy + def eager_preload + with_pty { preload } + end + + def with_pty + PTY.open do |master, slave| + [STDOUT, STDERR, STDIN].each { |s| s.reopen slave } + Thread.new { master.read } + yield + reset_streams + end + end + + def wait(pid, streams, client) + @mutex.synchronize { @waiting << pid } + + # Wait in a separate thread so we can run multiple commands at once + Thread.new { + begin + _, status = Process.wait2 pid + log "#{pid} exited with #{status.exitstatus}" + + streams.each(&:close) + client.puts(status.exitstatus) + client.close + ensure + @mutex.synchronize { @waiting.delete pid } + exit_if_finished + end + } + end + + def fork_child(client, streams, child_started) + pid = fork { yield } + child_started[0] = true + + disconnect_database + reset_streams + + log "forked #{pid}" + manager.puts pid + + wait pid, streams, client + end + end + end +end diff --git a/lib/spring/application/pool_strategy.rb b/lib/spring/application/pool_strategy.rb new file mode 100644 index 00000000..d313b2d3 --- /dev/null +++ b/lib/spring/application/pool_strategy.rb @@ -0,0 +1,30 @@ +module Spring + class Application + module PoolStrategy + def eager_preload + reset_streams + preload + end + + def fork_child(client, streams, child_started) + child_started[0] = true + exitstatus = 0 + manager.puts Process.pid + begin + log "started #{Process.pid}" + yield + rescue SystemExit => ex + exitstatus = ex.status + end + + log "#{Process.pid} exited with #{exitstatus}" + + streams.each(&:close) + client.puts(exitstatus) + client.close + + exit + end + end + end +end diff --git a/lib/spring/application_manager.rb b/lib/spring/application_manager.rb index 8b34b9f6..0bd9a560 100644 --- a/lib/spring/application_manager.rb +++ b/lib/spring/application_manager.rb @@ -1,137 +1,7 @@ module Spring - class ApplicationManager - attr_reader :pid, :child, :app_env, :spring_env, :status - - def initialize(app_env) - @app_env = app_env - @spring_env = Env.new - @mutex = Mutex.new - @state = :running - end - - def log(message) - spring_env.log "[application_manager:#{app_env}] #{message}" - end - - # We're not using @mutex.synchronize to avoid the weird ":10" - # line which messes with backtraces in e.g. rspec - def synchronize - @mutex.lock - yield - ensure - @mutex.unlock - end - - def start - start_child - end - - def restart - return if @state == :stopping - start_child(true) - end - - def alive? - @pid - end - - def with_child - synchronize do - if alive? - begin - yield - rescue Errno::ECONNRESET, Errno::EPIPE - # The child has died but has not been collected by the wait thread yet, - # so start a new child and try again. - log "child dead; starting" - start - yield - end - else - log "child not running; starting" - start - yield - end - end - end - - # Returns the pid of the process running the command, or nil if the application process died. - def run(client) - with_child do - child.send_io client - child.gets or raise Errno::EPIPE - end - - pid = child.gets.to_i - - unless pid.zero? - log "got worker pid #{pid}" - pid - end - rescue Errno::ECONNRESET, Errno::EPIPE => e - log "#{e} while reading from child; returning no pid" - nil - ensure - client.close - end - - def stop - log "stopping" - @state = :stopping - - if pid - Process.kill('TERM', pid) - Process.wait(pid) - end - rescue Errno::ESRCH, Errno::ECHILD - # Don't care - end - - private - - def start_child(preload = false) - @child, child_socket = UNIXSocket.pair - - Bundler.with_clean_env do - @pid = Process.spawn( - { - "RAILS_ENV" => app_env, - "RACK_ENV" => app_env, - "SPRING_ORIGINAL_ENV" => JSON.dump(Spring::ORIGINAL_ENV), - "SPRING_PRELOAD" => preload ? "1" : "0" - }, - "ruby", - "-I", File.expand_path("../..", __FILE__), - "-e", "require 'spring/application/boot'", - 3 => child_socket - ) - end - - start_wait_thread(pid, child) if child.gets - child_socket.close - end - - def start_wait_thread(pid, child) - Process.detach(pid) - - Thread.new { - # The recv can raise an ECONNRESET, killing the thread, but that's ok - # as if it does we're no longer interested in the child - loop do - IO.select([child]) - break if child.recv(1, Socket::MSG_PEEK).empty? - sleep 0.01 - end - - log "child #{pid} shutdown" - - synchronize { - if @pid == pid - @pid = nil - restart - end - } - } - end + module ApplicationManager end end + +require 'spring/application_manager/fork_strategy' +require 'spring/application_manager/pool_strategy' diff --git a/lib/spring/application_manager/fork_strategy.rb b/lib/spring/application_manager/fork_strategy.rb new file mode 100644 index 00000000..ca8d4aab --- /dev/null +++ b/lib/spring/application_manager/fork_strategy.rb @@ -0,0 +1,139 @@ +module Spring + module ApplicationManager + class ForkStrategy + attr_reader :pid, :child, :app_env, :spring_env, :status + + def initialize(app_env) + @app_env = app_env + @spring_env = Env.new + @mutex = Mutex.new + @state = :running + end + + def log(message) + spring_env.log "[application_manager:#{app_env}] #{message}" + end + + # We're not using @mutex.synchronize to avoid the weird ":10" + # line which messes with backtraces in e.g. rspec + def synchronize + @mutex.lock + yield + ensure + @mutex.unlock + end + + def start + start_child + end + + def restart + return if @state == :stopping + start_child(true) + end + + def alive? + @pid + end + + def with_child + synchronize do + if alive? + begin + yield + rescue Errno::ECONNRESET, Errno::EPIPE + # The child has died but has not been collected by the wait thread yet, + # so start a new child and try again. + log "child dead; starting" + start + yield + end + else + log "child not running; starting" + start + yield + end + end + end + + # Returns the pid of the process running the command, or nil if the application process died. + def run(client) + with_child do + child.send_io client + child.gets or raise Errno::EPIPE + end + + pid = child.gets.to_i + + unless pid.zero? + log "got worker pid #{pid}" + pid + end + rescue Errno::ECONNRESET, Errno::EPIPE => e + log "#{e} while reading from child; returning no pid" + nil + ensure + client.close + end + + def stop + log "stopping" + @state = :stopping + + if pid + Process.kill('TERM', pid) + Process.wait(pid) + end + rescue Errno::ESRCH, Errno::ECHILD + # Don't care + end + + private + + def start_child(preload = false) + @child, child_socket = UNIXSocket.pair + + Bundler.with_clean_env do + @pid = Process.spawn( + { + "RAILS_ENV" => app_env, + "RACK_ENV" => app_env, + "SPRING_ORIGINAL_ENV" => JSON.dump(Spring::ORIGINAL_ENV), + "SPRING_PRELOAD" => preload ? "1" : "0" + }, + "ruby", + "-I", File.expand_path("../..", __FILE__), + "-e", "require 'spring/application/boot'", + 3 => child_socket + ) + end + + start_wait_thread(pid, child) if child.gets + child_socket.close + end + + def start_wait_thread(pid, child) + Process.detach(pid) + + Thread.new { + # The recv can raise an ECONNRESET, killing the thread, but that's ok + # as if it does we're no longer interested in the child + loop do + IO.select([child]) + break if child.recv(1, Socket::MSG_PEEK).empty? + sleep 0.01 + end + + log "child #{pid} shutdown" + + synchronize { + if @pid == pid + @pid = nil + restart + end + } + } + end + end + end +end diff --git a/lib/spring/application_manager/pool_strategy.rb b/lib/spring/application_manager/pool_strategy.rb new file mode 100644 index 00000000..7b3d0440 --- /dev/null +++ b/lib/spring/application_manager/pool_strategy.rb @@ -0,0 +1,226 @@ +require 'securerandom' + +module Spring + module ApplicationManager + class PoolStrategy + class Worker + attr_reader :pid, :uuid, :socket + attr_accessor :on_done + + def initialize(env, args) + @spring_env = Env.new + @uuid = SecureRandom.uuid + path = @spring_env.tmp_path.join("#{@uuid}.sock").to_s + @server = UNIXServer.open(path) + + Bundler.with_clean_env do + spawn_app( + env.merge("SPRING_SOCKET" => path), + args + ) + end + + @socket = @server.accept + end + + def spawn_app(env, args) + @pid = + Process.spawn( + env, + *args + ) + + log "(spawn #{@pid})" + end + + def await_boot + @pid = socket.gets.to_i + start_wait_thread(pid, socket) unless pid.zero? + end + + def start_wait_thread(pid, child) + Process.detach(pid) + + Thread.new { + begin + Process.kill(0, pid) while sleep(1) + rescue Errno::ESRCH + end + + log "child #{pid} shutdown" + + # socket.close + # @server.close + on_done.call(self) if on_done + } + end + + def log(message) + @spring_env.log "[worker:#{uuid}] #{message}" + end + end + + class WorkerPool + def initialize(app_env, *app_args) + @app_env = app_env + @app_args = app_args + @spring_env = Env.new + + @workers = [] + @workers_in_use = [] + @spawning_workers = [] + + @check_mutex = Mutex.new + @workers_mutex = Mutex.new + + run + end + + def add_worker + worker = Worker.new(@app_env, @app_args) + worker.on_done = method(:worker_done) + @workers_mutex.synchronize { @spawning_workers << worker } + Thread.new do + worker.await_boot + log "+ worker #{worker.pid} (#{worker.uuid})" + @workers_mutex.synchronize do + @spawning_workers.delete(worker) + @workers << worker + end + end + end + + def worker_done(worker) + log "- worker #{worker.pid} (#{worker.uuid})" + @workers_mutex.synchronize do + @workers_in_use.delete(worker) + end + end + + def get_worker(spawn_new = true) + add_worker if spawn_new && all_size == 0 + + worker = nil + while worker.nil? && all_size > 0 + @workers_mutex.synchronize do + worker = @workers.shift + @workers_in_use << worker if worker + end + break if worker + sleep 1 + end + + Thread.new { check_min_free_workers } if spawn_new + + worker + end + + def check_min_free_workers + if @check_mutex.try_lock + while all_size < Spring.pool_min_free_workers + unless Spring.pool_spawn_parallel + sleep 0.1 until @workers_mutex.synchronize { @spawning_workers.empty? } + end + add_worker + end + @check_mutex.unlock + end + end + + def all_size + @workers_mutex.synchronize { @workers.size + @spawning_workers.size } + end + + def stop! + if spawning_worker_pids.include?(nil) + log "Waiting for workers to quit..." + sleep 0.1 while spawning_worker_pids.include?(nil) + end + + @workers_mutex.synchronize do + (@spawning_workers + @workers_in_use + @workers).each do |worker| + kill_worker(worker) + end + end + end + private + def kill_worker(worker) + log "- worker #{worker.pid} (#{worker.uuid})." + system("kill -9 #{worker.pid} > /dev/null 2>&1") + rescue + end + + def spawning_worker_pids + @spawning_workers.map { |worker| worker.pid } + end + + def run + check_min_free_workers + end + + def log(message) + @spring_env.log "[worker:pool] #{message}" + end + end + + def initialize(app_env) + @app_env = app_env + @spring_env = Env.new + @pool = + WorkerPool.new( + { + "RAILS_ENV" => app_env, + "RACK_ENV" => app_env, + "SPRING_ORIGINAL_ENV" => JSON.dump(Spring::ORIGINAL_ENV), + "SPRING_PRELOAD" => "1", + }, + Spring.ruby_bin, + "-I", File.expand_path("../..", __FILE__), + "-e", "require 'spring/application/boot'" + ) + end + + # Returns the name of the screen running the command, or nil if the application process died. + def run(client) + pid = nil + with_child do |child| + child.socket.send_io(client) + IO.select([child.socket]) + child.socket.gets or raise Errno::EPIPE + IO.select([child.socket]) + pid = child.socket.gets.to_i + end + + unless pid.zero? + log "got worker pid #{pid}" + pid + end + rescue Errno::ECONNRESET, Errno::EPIPE => e + log "#{e} while reading from child; returning no pid" + nil + ensure + client.close + end + + def stop + log "stopping" + + @pool.stop! + rescue Errno::ESRCH, Errno::ECHILD + # Don't care + end + + protected + + attr_reader :app_env, :spring_env + + def log(message) + spring_env.log "[application_manager:#{app_env}] #{message}" + end + + def with_child + yield(@pool.get_worker) + end + end + end +end diff --git a/lib/spring/binstub.rb b/lib/spring/binstub.rb index 75f92fb4..ec40aa25 100644 --- a/lib/spring/binstub.rb +++ b/lib/spring/binstub.rb @@ -6,7 +6,7 @@ else disable = ENV["DISABLE_SPRING"] - if Process.respond_to?(:fork) && (disable.nil? || disable.empty? || disable == "0") + if disable.nil? || disable.empty? || disable == "0" ARGV.unshift(command) load bin_path end diff --git a/lib/spring/client/run.rb b/lib/spring/client/run.rb index 4a7c213f..7431b618 100644 --- a/lib/spring/client/run.rb +++ b/lib/spring/client/run.rb @@ -1,12 +1,12 @@ require "rbconfig" require "socket" require "bundler" +require "spring/platform" module Spring module Client class Run < Command - FORWARDED_SIGNALS = %w(INT QUIT USR1 USR2 INFO) & Signal.list.keys - TIMEOUT = 1 + TIMEOUT = Spring.fork? ? 1 : 60 def initialize(args) super @@ -127,6 +127,7 @@ def run_command(client, application) send_json application, "args" => args, "env" => ENV.to_hash + IO.select([server]) pid = server.gets pid = pid.chomp if pid @@ -139,6 +140,7 @@ def run_command(client, application) log "got pid: #{pid}" forward_signals(pid.to_i) + IO.select([application]) status = application.read.to_i log "got exit status #{status}" diff --git a/lib/spring/configuration.rb b/lib/spring/configuration.rb index e25e079e..c22df16d 100644 --- a/lib/spring/configuration.rb +++ b/lib/spring/configuration.rb @@ -37,6 +37,14 @@ def project_root_path @project_root_path ||= find_project_root(Pathname.new(File.expand_path(Dir.pwd))) end + def pool_min_free_workers + 2 + end + + def pool_spawn_parallel + true + end + private def find_project_root(current_dir) diff --git a/lib/spring/env.rb b/lib/spring/env.rb index e1e9656f..87e8f10a 100644 --- a/lib/spring/env.rb +++ b/lib/spring/env.rb @@ -6,9 +6,9 @@ require "spring/version" require "spring/sid" require "spring/configuration" +require "spring/platform" module Spring - IGNORE_SIGNALS = %w(INT QUIT) STOP_TIMEOUT = 2 # seconds class Env diff --git a/lib/spring/platform.rb b/lib/spring/platform.rb new file mode 100644 index 00000000..f875734c --- /dev/null +++ b/lib/spring/platform.rb @@ -0,0 +1,25 @@ +module Spring + def self.fork? + Process.respond_to?(:fork) + end + + def self.jruby? + RUBY_PLATFORM == "java" + end + + def self.ruby_bin + if RUBY_PLATFORM == "java" + "jruby" + else + "ruby" + end + end + + if jruby? + IGNORE_SIGNALS = %w(INT) + FORWARDED_SIGNALS = %w(INT USR2 INFO) & Signal.list.keys + else + IGNORE_SIGNALS = %w(INT QUIT) + FORWARDED_SIGNALS = %w(INT QUIT USR1 USR2 INFO) & Signal.list.keys + end +end diff --git a/lib/spring/server.rb b/lib/spring/server.rb index b922199f..8d17f30f 100644 --- a/lib/spring/server.rb +++ b/lib/spring/server.rb @@ -3,6 +3,7 @@ module Spring end require "spring/boot" +require "spring/platform" require "spring/application_manager" # Must be last, as it requires bundler/setup, which alters the load path @@ -18,7 +19,7 @@ def self.boot def initialize(env = Env.new) @env = env - @applications = Hash.new { |h, k| h[k] = ApplicationManager.new(k) } + @applications = Hash.new { |h, k| h[k] = new_application_manager(k) } @pidfile = env.pidfile_path.open('a') @mutex = Mutex.new end @@ -126,5 +127,13 @@ def set_process_title "spring server | #{env.app_name} | started #{distance} ago" } end + + def new_application_manager(app_env) + if Spring.fork? + ApplicationManager::ForkStrategy.new(app_env) + else + ApplicationManager::PoolStrategy.new(app_env) + end + end end end