From 8082ebef18fcc2e442a6ec5925792e62ddf49147 Mon Sep 17 00:00:00 2001 From: Emil Tin Date: Thu, 20 Jan 2022 13:41:52 +0100 Subject: [PATCH] Refactor async task usage, use new Task module. Fixes to the TLC emulator. --- README.md | 4 +- bin/console | 2 +- documentation/collecting_message.md | 4 +- documentation/tasks.md | 164 +++++++++++++ lib/rsmp.rb | 1 + lib/rsmp/archive.rb | 6 +- lib/rsmp/cli.rb | 31 ++- lib/rsmp/collect/state_collector.rb | 2 +- lib/rsmp/components.rb | 6 +- lib/rsmp/convert/export/json_schema.rb | 8 +- lib/rsmp/convert/import/yaml.rb | 2 +- lib/rsmp/error.rb | 3 - lib/rsmp/inspect.rb | 2 +- lib/rsmp/logger.rb | 10 +- lib/rsmp/logging.rb | 2 +- lib/rsmp/message.rb | 2 +- lib/rsmp/node.rb | 54 +---- lib/rsmp/proxy.rb | 294 ++++++++++++++---------- lib/rsmp/rsmp.rb | 2 +- lib/rsmp/site.rb | 83 ++----- lib/rsmp/site_proxy.rb | 38 +-- lib/rsmp/site_proxy_wait.rb | 0 lib/rsmp/supervisor.rb | 46 ++-- lib/rsmp/supervisor_proxy.rb | 80 ++++--- lib/rsmp/task.rb | 78 +++++++ lib/rsmp/tlc/signal_group.rb | 8 +- lib/rsmp/tlc/signal_plan.rb | 4 +- lib/rsmp/tlc/traffic_controller.rb | 62 +++-- lib/rsmp/tlc/traffic_controller_site.rb | 79 ++++--- lib/rsmp/wait.rb | 20 +- spec/collector_spec.rb | 2 +- spec/message_spec.rb | 24 +- spec/proxy_spec.rb | 92 ++++++++ spec/spec_helper.rb | 15 ++ spec/status_collector_spec.rb | 8 +- spec/supervisor_spec.rb | 48 ++-- spec/task_spec.rb | 71 ++++++ test.rb | 27 --- 38 files changed, 931 insertions(+), 453 deletions(-) create mode 100644 documentation/tasks.md delete mode 100644 lib/rsmp/site_proxy_wait.rb create mode 100644 lib/rsmp/task.rb create mode 100644 spec/proxy_spec.rb create mode 100644 spec/task_spec.rb delete mode 100644 test.rb diff --git a/README.md b/README.md index ab607e57..2e6f7538 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ $ git submodule update # fetch submodules Alternatively, you can pass --recurse-submodules to the git clone command, and it will automatically initialize and update each submodule in the repository. -## Usage +## Usage ### Site and Supervisor The RSMP::Site and RSMP::Supervisor classes can be used to run a RSMP site. @@ -172,7 +172,7 @@ Use the ```tlc``` site type to run an emulation of a traffic light controller. T ### CLI help and options. Use ```--help ``` to get a list of available options. -Use ```--config ``` to point to a .yaml config file, controlling things like IP adresses, ports, and log output. Examples of config files can be found the folder ```config/```. +Use ```--config ``` to point to a .yaml config file, controlling things like IP adresses, ports, and log output. Examples of config files can be found the folder ```config/```. ## Tests ### RSpec diff --git a/bin/console b/bin/console index acc8ead0..ed01facd 100755 --- a/bin/console +++ b/bin/console @@ -1,7 +1,7 @@ #!/usr/bin/env ruby # Make IRB run inside Async, so async task -# will run the the background. +# will run the the background. require 'bundler/setup' require 'irb' diff --git a/documentation/collecting_message.md b/documentation/collecting_message.md index 7039270f..cd69200c 100644 --- a/documentation/collecting_message.md +++ b/documentation/collecting_message.md @@ -1,7 +1,7 @@ # Collection You often need to collect messages or responses. The collector classes are used to collect message asyncronously. Other tasks continue until the collection completes, time outs or is cancelled. -A collector can collect ingoing and/or outgoing messages. +A collector can collect ingoing and/or outgoing messages. An object that includes the Notifier module (or implements the same functionality) must be provided when you construct a Collected. The collector will attach itself to this notifier when it starts collecting, to receive messages. The SiteProxy and SupervisorProxy classes both include the Notifier module, and can therefore be used as message sources. @@ -32,7 +32,7 @@ outgoing: Whether to collect outgoing messages. Defaults to true component: An RSMP component id. ### Collecting -Use collect() to start collecting and wait for completion or timeout. The status will be returned. +Use collect() to start collecting and wait for completion or timeout. The status will be returned. ```ruby result = collector.collect # => :ok, :timeout or :cancelled diff --git a/documentation/tasks.md b/documentation/tasks.md new file mode 100644 index 00000000..406a2db3 --- /dev/null +++ b/documentation/tasks.md @@ -0,0 +1,164 @@ +# Tasks + +## Concurrency +The Async gem (which uses Rubys concurrent Fibers are the new Rubhy Fiber scheduler) is used to handle concurrency. + +When you use a site or a supervisor, it runs asyncronously so you can run several concurrently, or do other things concurrently, like sending messages and waiting for reponses. + +``` +Site - SupervisorProxy - Reader < < < Writer - SiteProxy - Supervisor + \ Writer > > > Reader / +``` + +Running asyncronously means that the site/supervisor network handling is run in an async task. + +Classes don't inherit for Async::Task. Instead they include task as instance variables. This means that the hierachy of objects and tasks can be different. + +Async task are use for handle the the following concurrently: + +- Running multiple sites or supervisors concurrently +- Running multiple connections concurrently +- Waiting for messages +- Waiting for connections or states + +## Proxies +A supervisor waits for sites to connect. Each time a site connects, a proxy is created and run to handle the connection. + +A site connects to one of more supervisors. A proxy is created and run to handle each connection. + +A site can connect to one or more supervisor. It creates proxy for each and runs them. + +When the proxy is run, it creates an async task to handle the communication. The proxy task will be a sub task of the site/supervisor task. + +A proxy can use sub tasks to handle watchdog timers, etc. + +## The run() cycle +The Task modules defines a life cycle for handling async tasks. + +You first call `start`. If `@atask` already exists, it will return immedatiately. +Otherwise an async task is created and stored in `@task`, and `run` is called inside this task, to handle any long-running processes. The call to `start` returns immediately. + +If you want to stop the task, call `stop`. If `@task`doesn't exist, it will return. Otherwise it wil call `shutdown`which will terminate the task stored in `@task` as well as any subtasks. + +## Proxies and run() +Proxies build on the Task functionality by handling RSMP communication via a TCP socket. The TCP socket can be open or closed. The RSMP communication first goes through a handshake sequence before being ready. This is encapsulated in the `status` attribute, which can be one of `:disconnected`, `:connected` or `:ready` + +Proxies implement `connect` and `close` for starting and stopping commununication, but supervisor and site proxies are a bit different. A supervisor proxies connects actively to a site proxy, whereas a site proxy waits for the supervisor proxy to connect. This means they are constructed a bit differently. + +A supervisor proxy is created at startup, and is responsible for creating the tcp socket and connecting to the supervisor. + +A site proxy is also created at startup, but the socket is created in the supervisor by `Aync::Endpoint#accept`when a site connects. + + +## Stopping tasks +Be aware that if a task stops itself, code after the call to stop() will not be run - unless you use an ensure block: + +```ruby +require 'async' + +Async do |task| + task.stop + puts "I just stopped" # this will not be reaced, because the task was stopped +end + +Async do |task| + task.stop +ensure + puts "I just stopped" # this will be reached +end +``` + +This is important to keep in mind, e.g. when a timer task finds an acknowledgement was not received in time, and then closes the connection by calling stop() in the Proxy, which will thne in turn stop the timer task. + + +Object hierarchy: + +``` +Supervisor 1 + site proxy 1 + site proxy 2 +Supervisor 2 + site proxy 1 + site proxy 2 + +Site 1 + supervisor proxy 1 + supervisor proxy 2 +``` + +Task hierachy: + +``` +supervisor parent + accepting connections + incoming connection 1 + incoming connection 2 + reader + timer + +site parent + tlc site + connected + tlc timer + +``` + +The task hierachy matters when you stop a task or iterate on subtasks. Note that calling `Task#wait` + does not wait for subtasks, whereas Task#stop stops all subtasks. + + + + + +Async block usage + +```ruby +# new design: + +# running a site or supervisor +# returns immedately. code will run inside an async task +site.run +supervisor.run + +# when a site connects to supervisors, +# async task are implicitely created +@socket = @endpoint.connect + +# when a supervisor accepts incoming connections from sites, +# async task are implicitely created +@endpoint.accept + +# when you wait for messages +... + + + +# current design: +Async do |task| # Node#start + +@endpoint.accept do |socket| # Supervisor#start_action,implicit task creation +@socket = @endpoint.connect # SupervisorProxy#connect, implicit task creation + +@task.async do |task| # Site#start_action + +@reader = @task.async do |task| # Proxy#start_reader +@timer = @task.async do |task| # Proxy#start_reader + +task = @task.async { |task| yield task } # Proxy#send_and_optionally_collect +@timer = @task.async do |task| # TrafficControllerSite#start_timer +``` + + +Task assignment + +```ruby +@task = options[:task] # Node#initialize +@task = task # Node#do_start + +@task = options[:task] # Collector#initialize +@task = task # Collector#use_task + +``` + + + diff --git a/lib/rsmp.rb b/lib/rsmp.rb index f2f9631f..33d1e095 100644 --- a/lib/rsmp.rb +++ b/lib/rsmp.rb @@ -10,6 +10,7 @@ require 'async/queue' require 'rsmp/rsmp' +require 'rsmp/task' require 'rsmp/deep_merge' require 'rsmp/inspect' require 'rsmp/logging' diff --git a/lib/rsmp/archive.rb b/lib/rsmp/archive.rb index a4868c72..9bf93b7b 100644 --- a/lib/rsmp/archive.rb +++ b/lib/rsmp/archive.rb @@ -20,11 +20,11 @@ def inspect def self.prepare_item item raise ArgumentError unless item.is_a? Hash - + cleaned = item.select { |k,v| [:author,:level,:ip,:port,:site_id,:component,:text,:message,:exception].include? k } cleaned[:timestamp] = Clock.now if item[:message] - cleaned[:direction] = item[:message].direction + cleaned[:direction] = item[:message].direction cleaned[:component] = item[:message].attributes['cId'] end @@ -54,7 +54,7 @@ def add item @items.shift end end - + private def find options, &block diff --git a/lib/rsmp/cli.rb b/lib/rsmp/cli.rb index 04ed54c4..61f2e16d 100644 --- a/lib/rsmp/cli.rb +++ b/lib/rsmp/cli.rb @@ -11,7 +11,7 @@ def version desc "site", "Run RSMP site" method_option :config, :type => :string, :aliases => "-c", banner: 'Path to .yaml config file' method_option :id, :type => :string, :aliases => "-i", banner: 'RSMP site id' - method_option :supervisors, :type => :string, :aliases => "-s", banner: 'ip:port,... list of supervisor to connect to' + method_option :supervisors, :type => :string, :aliases => "-s", banner: 'ip:port,... list of supervisor to connect to' method_option :log, :type => :string, :aliases => "-l", banner: 'Path to log file' method_option :json, :type => :boolean, :aliases => "-j", banner: 'Show JSON messages in log' method_option :type, :type => :string, :aliases => "-t", banner: 'Type of site: [tlc]' @@ -60,19 +60,35 @@ def site site_class = RSMP::Site end end - site_class.new(site_settings:settings, log_settings: log_settings).start + Async do |task| + task.annotate 'cli' + loop do + begin + site = site_class.new(site_settings:settings, log_settings: log_settings) + site.start + site.wait + rescue RSMP::Restart + site.stop + end + end + end + rescue Interrupt + # cntr-c rescue RSMP::Schemer::UnknownSchemaTypeError => e puts "Cannot start site: #{e}" rescue RSMP::Schemer::UnknownSchemaVersionError => e puts "Cannot start site: #{e}" rescue Psych::SyntaxError => e puts "Cannot read config file #{e}" + rescue Exception => e + puts "Uncaught error: #{e}" + puts caller.join("\n") end desc "supervisor", "Run RSMP supervisor" method_option :config, :type => :string, :aliases => "-c", banner: 'Path to .yaml config file' method_option :id, :type => :string, :aliases => "-i", banner: 'RSMP site id' - method_option :ip, :type => :numeric, banner: 'IP address to listen on' + method_option :ip, :type => :numeric, banner: 'IP address to listen on' method_option :port, :type => :string, :aliases => "-p", banner: 'Port to listen on' method_option :log, :type => :string, :aliases => "-l", banner: 'Path to log file' method_option :json, :type => :boolean, :aliases => "-j", banner: 'Show JSON messages in log' @@ -110,7 +126,14 @@ def supervisor log_settings['json'] = options[:json] end - RSMP::Supervisor.new(supervisor_settings:settings,log_settings:log_settings).start + Async do |task| + task.annotate 'cli' + supervisor = RSMP::Supervisor.new(supervisor_settings:settings,log_settings:log_settings) + supervisor.start + supervisor.wait + end + rescue Interrupt + # ctrl-c rescue RSMP::ConfigurationError => e puts "Cannot start supervisor: #{e}" end diff --git a/lib/rsmp/collect/state_collector.rb b/lib/rsmp/collect/state_collector.rb index 3b914013..dd0ab4c2 100644 --- a/lib/rsmp/collect/state_collector.rb +++ b/lib/rsmp/collect/state_collector.rb @@ -25,7 +25,7 @@ module RSMP # matches this input: # # {"cCI"=>"M0104", "n"=>"month", "v"=>"9", "age"=>"recent"} - # + # # And the result is stored as: # { # {"cCI"=>"M0104", "cO"=>"setDate", "n"=>"month", "v"=>/\d+/} => diff --git a/lib/rsmp/components.rb b/lib/rsmp/components.rb index e346b5a5..a0f573d0 100644 --- a/lib/rsmp/components.rb +++ b/lib/rsmp/components.rb @@ -3,7 +3,7 @@ module RSMP module Components attr_reader :components - + def initialize_components @components = {} end @@ -25,10 +25,10 @@ def setup_components settings def check_main_component settings unless settings['main'] && settings['main'].size >= 1 - raise ConfigurationError.new("main component must be defined") + raise ConfigurationError.new("main component must be defined") end if settings['main'].size > 1 - raise ConfigurationError.new("only one main component can be defined, found #{settings['main'].keys.join(', ')}") + raise ConfigurationError.new("only one main component can be defined, found #{settings['main'].keys.join(', ')}") end end diff --git a/lib/rsmp/convert/export/json_schema.rb b/lib/rsmp/convert/export/json_schema.rb index 205f8823..63e71fac 100644 --- a/lib/rsmp/convert/export/json_schema.rb +++ b/lib/rsmp/convert/export/json_schema.rb @@ -23,7 +23,7 @@ def self.output_json item def self.build_value item out = {} - + if item['description'] out["description"] = item['description'] end @@ -95,7 +95,7 @@ def self.output_alarms out, items } end json = { - "properties" => { + "properties" => { "aCId" => { "enum" => items.keys.sort }, "rvs" => { "items" => { "allOf" => list } } } @@ -175,7 +175,7 @@ def self.output_root out } ] } - out["sxl.json"] = output_json json + out["sxl.json"] = output_json json end def self.generate sxl @@ -192,7 +192,7 @@ def self.write sxl, folder out.each_pair do |relative_path,str| path = File.join(folder, relative_path) FileUtils.mkdir_p File.dirname(path) # create folders if needed - file = File.open(path, 'w+') # w+ means truncate or create new file + file = File.open(path, 'w+') # w+ means truncate or create new file file.puts str end end diff --git a/lib/rsmp/convert/import/yaml.rb b/lib/rsmp/convert/import/yaml.rb index fce7b1a1..8ce70c14 100644 --- a/lib/rsmp/convert/import/yaml.rb +++ b/lib/rsmp/convert/import/yaml.rb @@ -24,7 +24,7 @@ def self.convert yaml commands: {} } - yaml['objects'].each_pair do |type,object| + yaml['objects'].each_pair do |type,object| object["alarms"].each { |id,item| sxl[:alarms][id] = item } object["statuses"].each { |id,item| sxl[:statuses][id] = item } object["commands"].each { |id,item| sxl[:commands][id] = item } diff --git a/lib/rsmp/error.rb b/lib/rsmp/error.rb index 444b7eb0..8b03de3d 100644 --- a/lib/rsmp/error.rb +++ b/lib/rsmp/error.rb @@ -64,7 +64,4 @@ class RepeatedAlarmError < Error class RepeatedStatusError < Error end - - class TimestampError < Error - end end diff --git a/lib/rsmp/inspect.rb b/lib/rsmp/inspect.rb index cc1ee2a2..c698738f 100644 --- a/lib/rsmp/inspect.rb +++ b/lib/rsmp/inspect.rb @@ -1,5 +1,5 @@ # Costume inspect, to reduce noise -# +# # Instance variables of classes starting with Async or RSMP are shown # with only their class name and object id, which reduces output, # especially for deep object structures. diff --git a/lib/rsmp/logger.rb b/lib/rsmp/logger.rb index d223526b..a1032071 100644 --- a/lib/rsmp/logger.rb +++ b/lib/rsmp/logger.rb @@ -2,7 +2,7 @@ module RSMP class Logger attr_accessor :settings - + def initialize settings={} defaults = { 'active'=>true, @@ -115,7 +115,7 @@ def output? item, force=false end end end - return false if ack && @settings["acknowledgements"] == false && + return false if ack && @settings["acknowledgements"] == false && [:not_acknowledged,:warning,:error].include?(item[:level]) == false end true @@ -159,7 +159,7 @@ def colorize level, str def log item, force:false if output?(item, force) - output item[:level], build_output(item) + output item[:level], build_output(item) end end @@ -169,7 +169,7 @@ def self.shorten_message_id m_id, length=4 else ' '*length end - end + end def dump archive, force:false, num:nil num ||= archive.items.size @@ -183,7 +183,7 @@ def dump archive, force:false, num:nil def build_part parts, item, key, &block skey = key.to_s return unless @settings[skey] - + part = item[key] part = yield part if block part = part.to_s diff --git a/lib/rsmp/logging.rb b/lib/rsmp/logging.rb index c29762a7..e0758717 100644 --- a/lib/rsmp/logging.rb +++ b/lib/rsmp/logging.rb @@ -8,7 +8,7 @@ module Logging def initialize_logging options @archive = options[:archive] || RSMP::Archive.new - @logger = options[:logger] || RSMP::Logger.new(options[:log_settings]) + @logger = options[:logger] || RSMP::Logger.new(options[:log_settings]) end def author diff --git a/lib/rsmp/message.rb b/lib/rsmp/message.rb index 36caf323..8073f165 100644 --- a/lib/rsmp/message.rb +++ b/lib/rsmp/message.rb @@ -188,7 +188,7 @@ def versions class Unknown < Message end - class AggregatedStatus < Message + class AggregatedStatus < Message def initialize attributes = {} super({ "type" => "AggregatedStatus", diff --git a/lib/rsmp/node.rb b/lib/rsmp/node.rb index f2331f55..ea0c8e81 100644 --- a/lib/rsmp/node.rb +++ b/lib/rsmp/node.rb @@ -5,12 +5,13 @@ class Node include Logging include Wait include Inspect + include Task attr_reader :archive, :logger, :task, :deferred, :error_queue, :clock, :collector def initialize options initialize_logging options - @task = options[:task] + initialize_task @deferred = [] @clock = Clock.new @error_queue = Async::Queue.new @@ -18,6 +19,13 @@ def initialize options @collect = options[:collect] end + # stop proxies, then call super + def stop_subtasks + @proxies.each { |proxy| proxy.stop } + @proxies.clear + super + end + def ignore_errors classes, &block was, @ignore_errors = @ignore_errors, [classes].flatten yield @@ -50,55 +58,13 @@ def do_deferred item def clear_deferred @deferred.clear - end - - def do_start task - task.annotate self.class.to_s - @task = task - start_action - idle - end - - def start - starting - if @task - do_start @task - else - Async do |task| - do_start task - end - end - rescue Errno::EADDRINUSE => e - log "Cannot start: #{e.to_s}", level: :error - rescue SystemExit, SignalException, Interrupt - @logger.unmute_all - exiting - end - - def idle - loop do - @task.sleep 60 - end - end - - def stop - @task.stop if @task - end - - def restart - stop - start - end - - def exiting - log "Exiting", level: :info end def check_required_settings settings, required raise ArgumentError.new "Settings is empty" unless settings required.each do |setting| raise ArgumentError.new "Missing setting: #{setting}" unless settings.include? setting.to_s - end + end end def author diff --git a/lib/rsmp/proxy.rb b/lib/rsmp/proxy.rb index b7ebbb57..98674fe5 100644 --- a/lib/rsmp/proxy.rb +++ b/lib/rsmp/proxy.rb @@ -1,8 +1,10 @@ -# Logging class for a connection to a remote site or supervisor. +# A connection to a remote site or supervisor. +# Uses the Task module to handle asyncronous work, but adds +# the concept of a connection that can be connected or disconnected. require 'rubygems' -module RSMP +module RSMP class Proxy WRAPPING_DELIMITER = "\f" @@ -10,23 +12,94 @@ class Proxy include Wait include Notifier include Inspect + include Task - attr_reader :state, :archive, :connection_info, :sxl, :task, :collector, :ip, :port + attr_reader :state, :archive, :connection_info, :sxl, :collector, :ip, :port def initialize options initialize_logging options initialize_distributor + initialize_task setup options clear + @state = :disconnected end + def disconnect + end + + # wait for the reader task to complete, + # which is not expected to happen before the connection is closed + def wait_for_reader + @reader.wait if @reader + end + + # close connection, but keep our main task running so we can reconnect + def close + log "Closing connection", level: :warning + close_stream + close_socket + set_state :disconnected + notify_error DisconnectError.new("Connection was closed") + stop_timer + end + + def stop_subtasks + stop_timer + stop_reader + clear + super + end + + def stop_timer + return unless @timer + @timer.stop + @timer = nil + end + + def stop_reader + return unless @reader + @reader.stop + @reader = nil + end + + def close_stream + return unless @stream + @stream.close + @stream = nil + end + + def close_socket + return unless @socket + @socket.close + @socket = nil + end + + def terminate + close + super + end + + # change our state + def set_state state + return if state == @state + @state = state + state_changed + end + + # the state changed + # override to to things like notifications + def state_changed + @state_condition.signal @state + end + + # revive after a reconnect def revive options setup options end def setup options @settings = options[:settings] - @task = options[:task] @socket = options[:socket] @stream = options[:stream] @protocol = options[:protocol] @@ -35,7 +108,6 @@ def setup options @connection_info = options[:info] @sxl = nil @site_settings = nil # can't pick until we know the site id - @state = :stopped if options[:collect] @collector = RSMP::Collector.new self, options[:collect] @collector.start @@ -52,35 +124,16 @@ def clock node.clock end - def run - start - @reader.wait if @reader - ensure - stop unless [:stopped, :stopping].include? @state - end - def ready? @state == :ready end def connected? - @state == :starting || @state == :ready + @state == :connected || @state == :ready end - - def start - set_state :starting - end - - def stop - return if @state == :stopped - set_state :stopping - stop_tasks - notify_error DisconnectError.new("Connection was closed") - ensure - close_socket - clear - set_state :stopped + def disconnected? + @state == :disconnected end def clear @@ -97,56 +150,57 @@ def clear @acknowledgement_condition = Async::Notification.new end - def close_socket - if @stream - @stream.close - @stream = nil - end - - if @socket - @socket.close - @socket = nil + # run an async task that reads from @socket + def start_reader + @reader = @task.async do |task| + task.annotate "reader" + run_reader end end - def start_reader - @reader = @task.async do |task| - task.annotate "reader" - @stream ||= Async::IO::Stream.new(@socket) - @protocol ||= Async::IO::Protocol::Line.new(@stream,WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed - while json = @protocol.read_line - beginning = Time.now - message = process_packet json - duration = Time.now - beginning - ms = (duration*1000).round(4) - if duration > 0 - per_second = (1.0 / duration).round - else - per_second = Float::INFINITY - end - if message - type = message.type - m_id = Logger.shorten_message_id(message.m_id) - else - type = 'Unknown' - m_id = nil - end - str = [type,m_id,"processed in #{ms}ms, #{per_second}req/s"].compact.join(' ') - log str, level: :statistics - end - rescue Async::Wrapper::Cancelled - # ignore - rescue EOFError - log "Connection closed", level: :warning - rescue IOError => e - log "IOError: #{e}", level: :warning - rescue Errno::ECONNRESET - log "Connection reset by peer", level: :warning - rescue Errno::EPIPE - log "Broken pipe", level: :warning - rescue StandardError => e - notify_error e, level: :internal + def run_reader + @stream ||= Async::IO::Stream.new(@socket) + @protocol ||= Async::IO::Protocol::Line.new(@stream,WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed + loop do + read_line + end + rescue Restart + log "Closing connection", level: :warning + raise + rescue Async::Wrapper::Cancelled + # ignore exceptions raised when a wait is aborted because a task is stopped + rescue EOFError, Async::Stop + log "Connection closed", level: :warning + rescue IOError => e + log "IOError: #{e}", level: :warning + rescue Errno::ECONNRESET + log "Connection reset by peer", level: :warning + rescue Errno::EPIPE + log "Broken pipe", level: :warning + rescue StandardError => e + notify_error e, level: :internal + end + + def read_line + json = @protocol.read_line + beginning = Time.now + message = process_packet json + duration = Time.now - beginning + ms = (duration*1000).round(4) + if duration > 0 + per_second = (1.0 / duration).round + else + per_second = Float::INFINITY + end + if message + type = message.type + m_id = Logger.shorten_message_id(message.m_id) + else + type = 'Unknown' + m_id = nil end + str = [type,m_id,"processed in #{ms}ms, #{per_second}req/s"].compact.join(' ') + log str, level: :statistics end def notify_error e, options={} @@ -160,36 +214,40 @@ def start_watchdog end def start_timer + return if @timer name = "timer" interval = @site_settings['intervals']['timer'] || 1 log "Starting #{name} with interval #{interval} seconds", level: :debug @latest_watchdog_received = Clock.now - @timer = @task.async do |task| task.annotate "timer" - next_time = Time.now.to_f - loop do - begin - now = Clock.now - timer(now) - rescue RSMP::Schemer::Error => e - puts "Timer: Schema error: #{e}" - rescue EOFError => e - log "Timer: Connection closed: #{e}", level: :warning - rescue IOError => e - log "Timer: IOError", level: :warning - rescue Errno::ECONNRESET - log "Timer: Connection reset by peer", level: :warning - rescue Errno::EPIPE => e - log "Timer: Broken pipe", level: :warning - rescue StandardError => e - notify_error e, level: :internal - end - ensure - next_time += interval - duration = next_time - Time.now.to_f - task.sleep duration + run_timer task, interval + end + end + + def run_timer task, interval + next_time = Time.now.to_f + loop do + begin + now = Clock.now + timer(now) + rescue RSMP::Schemer::Error => e + log "Timer: Schema error: #{e}", level: :warning + rescue EOFError => e + log "Timer: Connection closed: #{e}", level: :warning + rescue IOError => e + log "Timer: IOError", level: :warning + rescue Errno::ECONNRESET + log "Timer: Connection reset by peer", level: :warning + rescue Errno::EPIPE => e + log "Timer: Broken pipe", level: :warning + rescue StandardError => e + notify_error e, level: :internal end + ensure + next_time += interval + duration = next_time - Time.now.to_f + task.sleep duration end end @@ -200,7 +258,7 @@ def timer now end def watchdog_send_timer now - return unless @watchdog_started + return unless @watchdog_started return if @site_settings['intervals']['watchdog'] == :never if @latest_watchdog_send_at == nil send_watchdog now @@ -226,9 +284,13 @@ def check_ack_timeout now @awaiting_acknowledgement.clone.each_pair do |m_id, message| latest = message.timestamp + timeout if now > latest - log "No acknowledgements for #{message.type} #{message.m_id_short} within #{timeout} seconds", level: :error - stop - notify_error MissingAcknowledgment.new('No ack') + str = "No acknowledgements for #{message.type} #{message.m_id_short} within #{timeout} seconds" + log str, level: :error + begin + close + ensure + notify_error MissingAcknowledgment.new(str) + end end end end @@ -238,16 +300,16 @@ def check_watchdog_timeout now latest = @latest_watchdog_received + timeout left = latest - now if left < 0 - log "No Watchdog within #{timeout} seconds", level: :error - stop + str = "No Watchdog within #{timeout} seconds" + log str, level: :error + begin + close # this will stop the current task (ourself) + ensure + notify_error MissingWatchdog.new(str) # but ensure block will still be reached + end end end - def stop_tasks - @timer.stop if @timer - @reader.stop if @reader - end - def log str, options={} super str, options.merge(ip: @ip, port: @port, site_id: @site_id) end @@ -355,7 +417,7 @@ def process_packet json str = "Rejected #{message.type}," notify_error e.exception(str), message: message dont_acknowledge message, str, reason - stop + close message ensure node.clear_deferred @@ -436,19 +498,14 @@ def dont_acknowledge original, prefix=nil, reason=nil send_message message, "for #{original.type} #{original.m_id_short}" end - def set_state state - @state = state - @state_condition.signal @state - end - - def wait_for_state state, timeout + def wait_for_state state, timeout: states = [state].flatten return if states.include?(@state) - wait_for(@state_condition,timeout) do + wait_for_condition(@state_condition,timeout: timeout) do states.include?(@state) end @state - rescue Async::TimeoutError + rescue RSMP::TimeoutError raise RSMP::TimeoutError.new "Did not reach state #{state} within #{timeout}s" end @@ -557,10 +614,10 @@ def expect_version_message message end end - def connection_complete + def handshake_complete set_state :ready end - + def version_acknowledged end @@ -576,6 +633,7 @@ def send_and_optionally_collect message, options, &block collect_options = options[:collect] || options[:collect!] if collect_options task = @task.async do |task| + task.annotate 'send_and_optionally_collect' collector = yield collect_options # call block to create collector collector.collect collector.ok! if options[:collect!] # raise any errors if the bang version was specified diff --git a/lib/rsmp/rsmp.rb b/lib/rsmp/rsmp.rb index fc337709..851e17e8 100644 --- a/lib/rsmp/rsmp.rb +++ b/lib/rsmp/rsmp.rb @@ -1,5 +1,5 @@ # Get the current time in UTC, with optional adjustment -# Convertion to string uses the RSMP format 2015-06-08T12:01:39.654Z +# Convertion to string uses the RSMP format 2015-06-08T12:01:39.654Z # Note that using to_s on a my_clock.to_s will not produce an RSMP formatted timestamp, # you need to use Clock.to_s my_clock diff --git a/lib/rsmp/site.rb b/lib/rsmp/site.rb index a4979f79..afa90e58 100644 --- a/lib/rsmp/site.rb +++ b/lib/rsmp/site.rb @@ -15,6 +15,8 @@ def initialize options={} @proxies = [] @sleep_condition = Async::Notification.new @proxies_condition = Async::Notification.new + + build_proxies end def site_id @@ -62,25 +64,29 @@ def check_sxl_version RSMP::Schemer::find_schema! sxl, version, lenient: true end - def reconnect - @sleep_condition.signal + def run + log "Starting site #{@site_settings["site_id"]}", + level: :info, + timestamp: @clock.now + @proxies.each { |proxy| proxy.start } + @proxies.each { |proxy| proxy.wait } end - def start_action + def build_proxies @site_settings["supervisors"].each do |supervisor_settings| - @task.async do |task| - task.annotate "site proxy" - connect_to_supervisor task, supervisor_settings - rescue StandardError => e - notify_error e, level: :internal - end + @proxies << SupervisorProxy.new({ + site: self, + task: @task, + settings: @site_settings, + ip: supervisor_settings['ip'], + port: supervisor_settings['port'], + logger: @logger, + archive: @archive, + collect: @collect + }) end end - def build_proxy settings - SupervisorProxy.new settings - end - def aggregated_status_changed component, options={} @proxies.each do |proxy| proxy.send_aggregated_status component, options if proxy.ready? @@ -91,7 +97,7 @@ def connect_to_supervisor task, supervisor_settings proxy = build_proxy({ site: self, task: @task, - settings: @site_settings, + settings: @site_settings, ip: supervisor_settings['ip'], port: supervisor_settings['port'], logger: @logger, @@ -99,63 +105,20 @@ def connect_to_supervisor task, supervisor_settings collect: @collect }) @proxies << proxy + proxy.start @proxies_condition.signal - run_site_proxy task, proxy - ensure - @proxies.delete proxy - @proxies_condition.signal - end - - def run_site_proxy task, proxy - loop do - proxy.run # run until disconnected - rescue IOError => e - log "Stream error: #{e}", level: :warning - rescue StandardError => e - notify_error e, level: :internal - ensure - begin - if @site_settings['intervals']['watchdog'] != :no - # sleep until waken by reconnect() or the reconnect interval passed - proxy.set_state :wait_for_reconnect - task.with_timeout(@site_settings['intervals']['watchdog']) do - @sleep_condition.wait - end - else - proxy.set_state :cannot_connect - break - end - rescue Async::TimeoutError - # ignore - end - end end + # stop def stop log "Stopping site #{@site_settings["site_id"]}", level: :info - @proxies.each do |proxy| - proxy.stop - end - @proxies.clear super end - - def starting - log "Starting site #{@site_settings["site_id"]}", - level: :info, - timestamp: @clock.now - end - - def alarm - @proxies.each do |proxy| - proxy.stop - end - end def wait_for_supervisor ip, timeout supervisor = find_supervisor ip return supervisor if supervisor - wait_for(@proxy_condition,timeout) { find_supervisor ip } + wait_for_condition(@proxy_condition,timeout:timeout) { find_supervisor ip } rescue Async::TimeoutError raise RSMP::TimeoutError.new "Supervisor '#{ip}' did not connect within #{timeout}s" end diff --git a/lib/rsmp/site_proxy.rb b/lib/rsmp/site_proxy.rb index 19b01b4e..0a9b0117 100644 --- a/lib/rsmp/site_proxy.rb +++ b/lib/rsmp/site_proxy.rb @@ -1,6 +1,6 @@ # Handles a supervisor connection to a remote client -module RSMP +module RSMP class SiteProxy < Proxy include Components @@ -14,33 +14,37 @@ def initialize options @site_id = options[:site_id] end + # handle communication + # when we're created, the socket is already open + def run + set_state :connected + start_reader + wait_for_reader # run until disconnected + rescue RSMP::ConnectionError => e + log e, level: :error + rescue StandardError => e + notify_error e, level: :internal + ensure + close + end + def revive options super options @supervisor = options[:supervisor] @settings = @supervisor.supervisor_settings.clone end - def inspect "#<#{self.class.name}:#{self.object_id}, #{inspector( :@acknowledgements,:@settings,:@site_settings,:@components )}>" end + def node supervisor end - def start - super - start_reader - end - - def stop - log "Closing connection to site", level: :info - super - end - - def connection_complete + def handshake_complete super sanitized_sxl_version = RSMP::Schemer.sanitize_version(@site_sxl_version) log "Connection to site #{@site_id} established, using core #{@rsmp_version}, #{@sxl} #{sanitized_sxl_version}", level: :info @@ -68,7 +72,7 @@ def process_message message else super message end - rescue RSMP::RepeatedAlarmError, RSMP::RepeatedStatusError, TimestampError => e + rescue RSMP::RepeatedAlarmError, RSMP::RepeatedStatusError str = "Rejected #{message.type} message," dont_acknowledge message, str, "#{e}" notify_error e.exception("#{str}#{e.message} #{message.json}") @@ -148,7 +152,7 @@ def process_alarm message end def version_acknowledged - connection_complete + handshake_complete end def process_watchdog message @@ -196,7 +200,7 @@ def process_status_response message def subscribe_to_status component_id, status_list, options={} validate_ready 'subscribe to status' m_id = options[:m_id] || RSMP::Message.make_m_id - + # additional items can be used when verifying the response, # but must to remove from the subscribe message subscribe_list = status_list.map { |item| item.slice('sCI','n','uRt') } @@ -324,7 +328,7 @@ def find_site_settings site_id log "Using site settings for guest", level: :debug return @settings['guest'] end - + nil end diff --git a/lib/rsmp/site_proxy_wait.rb b/lib/rsmp/site_proxy_wait.rb deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/rsmp/supervisor.rb b/lib/rsmp/supervisor.rb index 44c51697..f1e08442 100644 --- a/lib/rsmp/supervisor.rb +++ b/lib/rsmp/supervisor.rb @@ -58,26 +58,31 @@ def check_site_sxl_types end end - def start_action + # listen for connections + # Async::IO::Endpoint#accept createa an async task that we will wait for + def run + log "Starting supervisor on port #{@supervisor_settings["port"]}", + level: :info, + timestamp: @clock.now + @endpoint = Async::IO::Endpoint.tcp('0.0.0.0', @supervisor_settings["port"]) - @endpoint.accept do |socket| # creates async tasks + tasks = @endpoint.accept do |socket| # creates async tasks handle_connection(socket) rescue StandardError => e notify_error e, level: :internal end + tasks.each { |task| task.wait } rescue StandardError => e notify_error e, level: :internal end + # stop def stop log "Stopping supervisor #{@supervisor_settings["site_id"]}", level: :info - @proxies.each { |proxy| proxy.stop } - @proxies.clear super - @tcp_server.close if @tcp_server - @tcp_server = nil end + # handle an incoming connction by either accepting of rejecting it def handle_connection socket remote_port = socket.remote_address.ip_port remote_hostname = socket.remote_address.ip_address @@ -85,9 +90,9 @@ def handle_connection socket info = {ip:remote_ip, port:remote_port, hostname:remote_hostname, now:Clock.now} if accept? socket, info - connect socket, info + accept_connection socket, info else - reject socket, info + reject_connection socket, info end rescue ConnectionError => e log "Rejected connection from #{remote_ip}:#{remote_port}, #{e.to_s}", level: :warning @@ -99,12 +104,6 @@ def handle_connection socket close socket, info end - def starting - log "Starting supervisor on port #{@supervisor_settings["port"]}", - level: :info, - timestamp: @clock.now - end - def accept? socket, info true end @@ -143,7 +142,8 @@ def peek_version_message protocol message.attribute('siteId').first['sId'] end - def connect socket, info + # accept an incoming connecting by creating and starting a proxy + def accept_connection socket, info log "Site connected from #{format_ip_and_port(info)}", ip: info[:ip], port: info[:port], @@ -182,7 +182,8 @@ def connect socket, info proxy = build_proxy settings.merge(site_id:id) # keep the id learned by peeking above @proxies.push proxy end - proxy.run # will run until the site disconnects + proxy.start # will run until the site disconnects + proxy.wait ensure site_ids_changed stop if @supervisor_settings['one_shot'] @@ -192,7 +193,7 @@ def site_ids_changed @site_id_condition.signal end - def reject socket, info + def reject_connection socket, info log "Site rejected", ip: info[:ip], level: :info end @@ -224,10 +225,13 @@ def find_site site_id nil end - def wait_for_site site_id, timeout + def wait_for_site site_id, timeout: site = find_site site_id return site if site - wait_for(@site_id_condition,timeout) { find_site site_id } + wait_for_condition(@site_id_condition,timeout:timeout) do + find_site site_id + end + rescue Async::TimeoutError if site_id == :any str = "No site connected" @@ -237,8 +241,8 @@ def wait_for_site site_id, timeout raise RSMP::TimeoutError.new "#{str} within #{timeout}s" end - def wait_for_site_disconnect site_id, timeout - wait_for(@site_id_condition,timeout) { true unless find_site site_id } + def wait_for_site_disconnect site_id, timeout: + wait_for_condition(@site_id_condition,timeout:timeout) { true unless find_site site_id } rescue Async::TimeoutError raise RSMP::TimeoutError.new "Site '#{site_id}' did not disconnect within #{timeout}s" end diff --git a/lib/rsmp/supervisor_proxy.rb b/lib/rsmp/supervisor_proxy.rb index 6d1424a6..326449db 100644 --- a/lib/rsmp/supervisor_proxy.rb +++ b/lib/rsmp/supervisor_proxy.rb @@ -2,7 +2,7 @@ require 'digest' -module RSMP +module RSMP class SupervisorProxy < Proxy attr_reader :supervisor_id, :site @@ -22,44 +22,67 @@ def node site end - def start + # handle communication + # if disconnected, then try to reconnect + def run + loop do + connect + start_reader + start_handshake + wait_for_reader # run until disconnected + break if reconnect_delay == false + rescue Restart + @logger.mute @ip, @port + raise + rescue RSMP::ConnectionError => e + log e, level: :error + break if reconnect_delay == false + rescue StandardError => e + notify_error e, level: :internal + break if reconnect_delay == false + ensure + close + stop_subtasks + end + end + + def start_handshake + send_version @site_settings['site_id'], @site_settings["rsmp_versions"] + end + + # connect to the supervisor and initiate handshake supervisor + def connect log "Connecting to supervisor at #{@ip}:#{@port}", level: :info - super - connect + set_state :connecting + connect_tcp @logger.unmute @ip, @port log "Connected to supervisor at #{@ip}:#{@port}", level: :info - start_reader - send_version @site_settings['site_id'], @site_settings["rsmp_versions"] rescue SystemCallError => e - log "Could not connect to supervisor at #{@ip}:#{@port}: Errno #{e.errno} #{e}", level: :error - retry_notice + raise ConnectionError.new "Could not connect to supervisor at #{@ip}:#{@port}: Errno #{e.errno} #{e}" rescue StandardError => e - log "Error while connecting to supervisor at #{@ip}:#{@port}: #{e}", level: :error - retry_notice - end - - def retry_notice - unless @site.site_settings['intervals']['reconnect'] == :no - log "Will try to reconnect again every #{@site.site_settings['intervals']['reconnect']} seconds..", level: :info - @logger.mute @ip, @port - end + raise ConnectionError.new "Error while connecting to supervisor at #{@ip}:#{@port}: #{e}" end - def stop - log "Closing connection to supervisor", level: :info + def terminate super @last_status_sent = nil end - def connect - return if @socket + def connect_tcp @endpoint = Async::IO::Endpoint.tcp(@ip, @port) - @socket = @endpoint.connect + + # Async::IO::Endpoint#connect renames the current task. run in a subtask to avoid this + @task.async do |task| + task.annotate 'socket task' + @socket = @endpoint.connect + end.wait + @stream = Async::IO::Stream.new(@socket) @protocol = Async::IO::Protocol::Line.new(@stream,WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed + set_state :connected end - def connection_complete + def handshake_complete super sanitized_sxl_version = RSMP::Schemer.sanitize_version(sxl_version) log "Connection to supervisor established, using core #{@rsmp_version}, #{sxl} #{sanitized_sxl_version}", level: :info @@ -114,16 +137,19 @@ def send_all_aggregated_status end def reconnect_delay + return false if @site_settings['intervals']['reconnect'] == :no interval = @site_settings['intervals']['reconnect'] - log "Waiting #{interval} seconds before trying to reconnect", level: :info + log "Will try to reconnect again every #{interval} seconds...", level: :info + @logger.mute @ip, @port @task.sleep interval + true end def version_accepted message log "Received Version message, using RSMP #{@rsmp_version}", message: message, level: :log start_timer acknowledge message - connection_complete + handshake_complete @version_determined = true end @@ -234,7 +260,7 @@ def process_status_subcribe message update_list = {} component = message.attributes["cId"] @status_subscriptions[component] ||= {} - update_list[component] ||= {} + update_list[component] ||= {} now = Time.now # internal timestamp subs = @status_subscriptions[component] @@ -299,7 +325,7 @@ def status_update_timer now by_name.each_pair do |name,subscription| current = nil should_send = false - if subscription[:interval] == 0 + if subscription[:interval] == 0 # send as soon as the data changes if component_object current, age = *(component_object.get_status code, name) diff --git a/lib/rsmp/task.rb b/lib/rsmp/task.rb new file mode 100644 index 00000000..6cae0be9 --- /dev/null +++ b/lib/rsmp/task.rb @@ -0,0 +1,78 @@ +module RSMP + class Restart < StandardError + end + + module Task + attr_reader :task + + def initialize_task + @task = nil + end + + # start our async tasks and return immediately + # run() will be called inside the task to perform actual long-running work + def start + return if @task + Async do |task| + task.annotate "#{self.class.name} main task" + @task = task + run + stop_subtasks + @task = nil + end + self + end + + # initiate restart by raising a Restart exception + def restart + raise Restart.new "restart initiated by #{self.class.name}:#{object_id}" + end + + # get the status of our task, or nil of no task + def status + @task.status if @task + end + + # perform any long-running work + # the method will be called from an async task, and should not return + # if subtasks are needed, the method should call wait() on each of them + # once running, ready() must be called + def run + start_subtasks + end + + # wait for our task to complete + def wait + @task.wait if @task + end + + # stop our task + def stop + stop_subtasks + terminate if @task + end + + def stop_subtasks + end + + def self.print_hierarchy task=Async::Task.current.reactor, level=0 + if task.parent + status = task.status + puts "#{'. '*level}#{task.object_id} #{task.annotation.to_s}: #{status}" + else + puts "#{'. '*level}#{task.object_id} reactor" + end + task.children&.each do |child| + print_hierarchy child, level+1 + end + end + + + # stop our task and any subtask + def terminate + @task.stop + @task = nil + end + + end +end \ No newline at end of file diff --git a/lib/rsmp/tlc/signal_group.rb b/lib/rsmp/tlc/signal_group.rb index 4a37c022..a811552a 100644 --- a/lib/rsmp/tlc/signal_group.rb +++ b/lib/rsmp/tlc/signal_group.rb @@ -9,25 +9,27 @@ def initialize node:, id: end def timer - @state = get_state + @state = compute_state end - def get_state + def compute_state return 'a' if node.main.dark_mode return 'c' if node.main.yellow_flash cycle_counter = node.main.cycle_counter if node.main.startup_sequence_active - @state = node.main.startup_state || 'a' + return node.main.startup_state || 'a' end default = 'a' # phase a means disabled/dark plan = node.main.current_plan return default unless plan return default unless plan.states + states = plan.states[c_id] return default unless states + state = states[cycle_counter] return default unless state =~ /[a-hA-G0-9N-P]/ # valid signal group states state diff --git a/lib/rsmp/tlc/signal_plan.rb b/lib/rsmp/tlc/signal_plan.rb index 4ac6e68a..bb8aa138 100644 --- a/lib/rsmp/tlc/signal_plan.rb +++ b/lib/rsmp/tlc/signal_plan.rb @@ -1,8 +1,8 @@ module RSMP module TLC # A Traffic Light Controller Signal Plan. - # A signal plan is a description of how all signal groups should change - # state over time. + # A signal plan is a description of how all signal groups should change + # state over time. class SignalPlan attr_reader :nr, :states, :dynamic_bands def initialize nr:, states:, dynamic_bands: diff --git a/lib/rsmp/tlc/traffic_controller.rb b/lib/rsmp/tlc/traffic_controller.rb index d8d00d9c..2a595c91 100644 --- a/lib/rsmp/tlc/traffic_controller.rb +++ b/lib/rsmp/tlc/traffic_controller.rb @@ -23,24 +23,29 @@ def initialize node:, id:, cycle_time: 10, signal_plans:, reset end - def reset - @cycle_counter = 0 - @plan = 1 + def reset_modes @dark_mode = true @yellow_flash = false @booting = false - @control_mode = 'control' - @police_key = 0 - @intersection = 0 @is_starting = false - @emergency_route = false - @emergency_route_number = 0 - @traffic_situation = 0 + @control_mode = 'control' @manual_control = false @fixed_time_control = false @isolated_control = false @yellow_flash = false @all_red = false + @police_key = 0 + end + + def reset + reset_modes + + @cycle_counter = 0 + @plan = 1 + @intersection = 0 + @emergency_route = false + @emergency_route_number = 0 + @traffic_situation = 0 @inputs = '0'*@num_inputs @input_activations = '0'*@num_inputs @@ -76,12 +81,14 @@ def add_detector_logic logic def timer now # TODO use monotone timer, to avoid jumps in case the user sets the system time - @signal_groups.each { |group| group.timer } time = Time.now.to_i return if time == @time_int @time_int = time move_cycle_counter move_startup_sequence if @startup_sequence_active + + @signal_groups.each { |group| group.timer } + output_states end @@ -98,6 +105,8 @@ def startup_state def initiate_startup_sequence log "Initiating startup sequence", level: :info + reset_modes + @dark_mode = false @startup_sequence_active = true @startup_sequence_initiated_at = nil @startup_sequence_pos = nil @@ -107,7 +116,6 @@ def end_startup_sequence @startup_sequence_active = false @startup_sequence_initiated_at = nil @startup_sequence_pos = nil - @yellow_flash = false @dark_mode = false end @@ -127,26 +135,40 @@ def move_startup_sequence def output_states return unless @live_output + str = @signal_groups.map do |group| - s = "#{group.c_id}:#{group.state}" - if group.state =~ /^[1-9]$/ + state = group.state + s = "#{group.c_id}:#{state}" + if state =~ /^[1-9]$/ s.colorize(:green) - elsif group.state =~ /^[NOP]$/ + elsif state =~ /^[NOP]$/ s.colorize(:yellow) - elsif group.state =~ /^[ae]$/ - s.colorize(:black) - elsif group.state =~ /^[f]$/ + elsif state =~ /^[ae]$/ + s.colorize(:light_black) + elsif state =~ /^[f]$/ s.colorize(:yellow) - elsif group.state =~ /^[g]$/ + elsif state =~ /^[g]$/ s.colorize(:red) else s.colorize(:red) end end.join ' ' + + modes = '.'*9 + modes[0] = 'B' if @booting + modes[1] = 'S' if @startup_sequence_active + modes[2] = 'D' if @dark_mode + modes[3] = 'Y' if @yellow_flash + modes[4] = 'M' if @manual_control + modes[5] = 'F' if @fixed_time_control + modes[6] = 'R' if @all_red + modes[7] = 'I' if @isolated_control + modes[8] = 'P' if @police_key != 0 + plan = "P#{@plan}" File.open @live_output, 'w' do |file| - file.puts "#{plan.rjust(4)} #{pos.to_s.rjust(4)} #{str}\r" + file.puts "#{modes} #{plan.rjust(2)} #{@cycle_counter.to_s.rjust(3)} #{str}\r" end end @@ -316,7 +338,7 @@ def set_input i, value return unless i>=0 && i<@num_inputs @inputs[i] = (arg['value'] ? '1' : '0') end - + def set_fixed_time_control status @fixed_time_control = status end diff --git a/lib/rsmp/tlc/traffic_controller_site.rb b/lib/rsmp/tlc/traffic_controller_site.rb index 84f6caf4..3833af39 100644 --- a/lib/rsmp/tlc/traffic_controller_site.rb +++ b/lib/rsmp/tlc/traffic_controller_site.rb @@ -18,6 +18,18 @@ def initialize options={} unless @main raise ConfigurationError.new "TLC must have a main component" end + + end + + def start + super + start_tlc_timer + @main.initiate_startup_sequence + end + + def stop_subtasks + stop_tlc_timer + super end def build_plans signal_plans @@ -56,49 +68,45 @@ def build_component id:, type:, settings:{} end end - def start_action - super - start_timer - @main.initiate_startup_sequence - end - - def start_timer + def start_tlc_timer task_name = "tlc timer" log "Starting #{task_name} with interval #{@interval} seconds", level: :debug @timer = @task.async do |task| - task.annotate task_name - next_time = Time.now.to_f - loop do - begin - timer(@clock.now) - rescue EOFError => e - log "Connection closed: #{e}", level: :warning - rescue IOError => e - log "IOError", level: :warning - rescue Errno::ECONNRESET - log "Connection reset by peer", level: :warning - rescue Errno::EPIPE => e - log "Broken pipe", level: :warning - rescue StandardError => e - notify_error e, level: :internal - ensure - # adjust sleep duration to avoid drift. so wake up always happens on the - # same fractional second. - # note that Time.now is not monotonic. If the clock is changed, - # either manaully or via NTP, the sleep interval might jump. - # an alternative is to use ::Process.clock_gettime(::Process::CLOCK_MONOTONIC), - # to get the current time. this ensures a constant interval, but - # if the clock is changed, the wake up would then happen on a different - # fractional second - next_time += @interval - duration = next_time - Time.now.to_f - task.sleep duration - end + task.annotate task_name + run_tlc_timer task + end + end + + def run_tlc_timer task + next_time = Time.now.to_f + loop do + begin + timer(@clock.now) + rescue StandardError => e + notify_error e, level: :internal + ensure + # adjust sleep duration to avoid drift. so wake up always happens on the + # same fractional second. + # note that Time.now is not monotonic. If the clock is changed, + # either manaully or via NTP, the sleep interval might jump. + # an alternative is to use ::Process.clock_gettime(::Process::CLOCK_MONOTONIC), + # to get the current time. this ensures a constant interval, but + # if the clock is changed, the wake up would then happen on a different + # fractional second + next_time += @interval + duration = next_time - Time.now.to_f + task.sleep duration end end end + def stop_tlc_timer + return unless @timer + @timer.stop + @timer = nil + end + def timer now return unless @main @main.timer now @@ -142,7 +150,6 @@ def do_deferred item when :restart log "Restarting TLC", level: :info restart - initiate_startup_sequence end end end diff --git a/lib/rsmp/wait.rb b/lib/rsmp/wait.rb index 6f4d2400..dfc099f2 100644 --- a/lib/rsmp/wait.rb +++ b/lib/rsmp/wait.rb @@ -2,15 +2,21 @@ module RSMP module Wait # wait for an async condition to signal, then yield to block # if block returns true we're done. otherwise, wait again - def wait_for condition, timeout, &block - raise RuntimeError.new("Can't wait for condition because task is not running") unless @task.running? - @task.with_timeout(timeout) do - while @task.running? do + def wait_for_condition condition, timeout:, task:Async::Task.current, &block + unless task + raise RuntimeError.new("Can't wait without a task") + end + task.with_timeout(timeout) do + while task.running? value = condition.wait - result = yield value - return result if result # return result of check, if not nil + return value unless block + result = yield value + return result if result end + raise RuntimeError.new("Can't wait for condition because task #{task.object_id} #{task.annotation} is not running") end - end + rescue Async::TimeoutError + raise RSMP::TimeoutError.new + end end end \ No newline at end of file diff --git a/spec/collector_spec.rb b/spec/collector_spec.rb index d3300527..1ff1383c 100644 --- a/spec/collector_spec.rb +++ b/spec/collector_spec.rb @@ -328,7 +328,7 @@ end end end - + describe "#wait!" do it "returns messages if already complete" do RSMP::SiteProxyStub.async do |task,proxy| diff --git a/spec/message_spec.rb b/spec/message_spec.rb index d908dc36..393ff283 100644 --- a/spec/message_spec.rb +++ b/spec/message_spec.rb @@ -25,29 +25,29 @@ def build json context 'when parsing json packages' do it 'raises ArgumentError when parsing nil' do - expect { RSMP::Message.parse_attributes(nil) }.to raise_error(ArgumentError) + expect { RSMP::Message.parse_attributes(nil) }.to raise_error(ArgumentError) end it 'raises InvalidPacket when parsing empty string' do - expect { RSMP::Message.parse_attributes('') }.to raise_error(RSMP::InvalidPacket) + expect { RSMP::Message.parse_attributes('') }.to raise_error(RSMP::InvalidPacket) end it 'raises InvalidPacket when parsing whitespace' do - expect { RSMP::Message.parse_attributes(' ') }.to raise_error(RSMP::InvalidPacket) - expect { RSMP::Message.parse_attributes("\t") }.to raise_error(RSMP::InvalidPacket) - expect { RSMP::Message.parse_attributes("\n") }.to raise_error(RSMP::InvalidPacket) - expect { RSMP::Message.parse_attributes("\f") }.to raise_error(RSMP::InvalidPacket) - expect { RSMP::Message.parse_attributes("\r") }.to raise_error(RSMP::InvalidPacket) + expect { RSMP::Message.parse_attributes(' ') }.to raise_error(RSMP::InvalidPacket) + expect { RSMP::Message.parse_attributes("\t") }.to raise_error(RSMP::InvalidPacket) + expect { RSMP::Message.parse_attributes("\n") }.to raise_error(RSMP::InvalidPacket) + expect { RSMP::Message.parse_attributes("\f") }.to raise_error(RSMP::InvalidPacket) + expect { RSMP::Message.parse_attributes("\r") }.to raise_error(RSMP::InvalidPacket) end it 'raises InvalidPacket when parsing invalid JSON ' do - expect { RSMP::Message.parse_attributes('{"a":"1"') }.to raise_error(RSMP::InvalidPacket) - expect { RSMP::Message.parse_attributes('"a":"1"}') }.to raise_error(RSMP::InvalidPacket) - expect { RSMP::Message.parse_attributes('/') }.to raise_error(RSMP::InvalidPacket) + expect { RSMP::Message.parse_attributes('{"a":"1"') }.to raise_error(RSMP::InvalidPacket) + expect { RSMP::Message.parse_attributes('"a":"1"}') }.to raise_error(RSMP::InvalidPacket) + expect { RSMP::Message.parse_attributes('/') }.to raise_error(RSMP::InvalidPacket) end it 'parses valid JSON' do - expect(RSMP::Message.parse_attributes('"string"')).to eq("string") + expect(RSMP::Message.parse_attributes('"string"')).to eq("string") expect(RSMP::Message.parse_attributes('123')).to eq(123) expect(RSMP::Message.parse_attributes('3.14')).to eq(3.14) expect(RSMP::Message.parse_attributes('[1,2,3]')).to eq([1,2,3]) @@ -134,7 +134,7 @@ def build json it 'generates json' do message = RSMP::Version.new(json) - message.generate_json + message.generate_json str = '{"mType":"rSMsg","type":"Version","RSMP":[{"vers":"3.1.1"},{"vers":"3.1.2"},{"vers":"3.1.3"},{"vers":"3.1.4"}],"SXL":"1.1","mId":"8db00f0a-4124-406f-b3f9-ceb0dbe4aeb6","siteId":[{"sId":"RN+SI0001"}]}' expect(message.json).to eq(str) end diff --git a/spec/proxy_spec.rb b/spec/proxy_spec.rb new file mode 100644 index 00000000..af80802d --- /dev/null +++ b/spec/proxy_spec.rb @@ -0,0 +1,92 @@ +RSpec.describe RSMP::Proxy do + let(:options) { {} } + let(:proxy) { RSMP::Proxy.new options } + + describe '#wait_for_state' do + it 'wakes up' do + async_context(terminate:false) do |task| + subtask = task.async do |subtask| + proxy.wait_for_state :connected, timeout: 0.001 + end + proxy.set_state :connected + subtask.result + end + end + + it 'accepts array of states and returns current state' do + async_context(terminate:false) do |task| + subtask = task.async do |subtask| + state = proxy.wait_for_state [:ok,:ready], timeout: 0.001 + expect(state).to eq(:ready) + end + proxy.set_state :ready + subtask.result + end + end + + it 'times out' do + async_context(terminate:false) do |task| + expect { + proxy.wait_for_state :connected, timeout: 0.001 + }.to raise_error(RSMP::TimeoutError) + end + end + + it 'returns immediately if state is already correct' do + async_context(terminate:false) do |task| + proxy.set_state :disconnected + proxy.wait_for_state :disconnected, timeout: 0.001 + end + end + end + + describe '#wait_for_condition without block' do + it 'wakes up' do + async_context(terminate:false) do |task| + condition = Async::Notification.new + subtask = task.async do |subtask| + proxy.wait_for_condition condition, timeout: 0.001 + end + condition.signal + subtask.result + end + end + + it 'times out' do + async_context(terminate:false) do |task| + condition = Async::Notification.new + expect { + proxy.wait_for_condition condition, timeout: 0.001 + }.to raise_error(RSMP::TimeoutError) + end + end + end + + describe '#wait_for_condition with block' do + it 'wakes up' do + async_context(terminate:false) do |task| + condition = Async::Notification.new + result = nil + subtask = task.async do |subtask| + proxy.wait_for_condition condition, timeout: 1 do |state| + result = (state == :banana) + result + end + end + condition.signal :pear + task.yield + expect(result).to be(false) + + condition.signal :apple + task.yield + expect(result).to be(false) + + condition.signal :banana + task.yield + expect(result).to be(true) + + subtask.result + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 4a5c8266..e7033fa1 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -16,3 +16,18 @@ end include RSpec + + +def async_context &block + Async do |task| + yield task + task.reactor.stop + end +end + +def async_context terminate:true, &block + Async do |task| + yield task + task.reactor.stop if terminate + end.result +end \ No newline at end of file diff --git a/spec/status_collector_spec.rb b/spec/status_collector_spec.rb index 53a537ca..ebfe213d 100644 --- a/spec/status_collector_spec.rb +++ b/spec/status_collector_spec.rb @@ -34,15 +34,15 @@ def build_status_message status_list s11: {"sCI" => "S0011","n" => "status","s" => "True"}, } } - + describe "#collect" do - + it 'completes with a single status update' do RSMP::SiteProxyStub.async do |task,proxy| collector = StatusCollector.new(proxy, want.values, timeout: timeout) expect(collector.summary).to eq([false,false,false]) expect(collector.done?).to be(false) - + collector.start collector.notify build_status_message(ok.values) expect(collector.summary).to eq([true,true,true]) @@ -86,7 +86,7 @@ def build_status_message status_list expect(collector.summary).to eq([true,true,false]) expect(collector.done?).to be(false) - collector.notify build_status_message(reject[:s5]) # clear s5 + collector.notify build_status_message(reject[:s5]) # clear s5 expect(collector.summary).to eq([false,true,false]) expect(collector.done?).to be(false) diff --git a/spec/supervisor_spec.rb b/spec/supervisor_spec.rb index 19c0f120..7b98f7e6 100644 --- a/spec/supervisor_spec.rb +++ b/spec/supervisor_spec.rb @@ -20,6 +20,29 @@ } } + let(:supervisor) { + RSMP::Supervisor.new( + supervisor_settings: supervisor_settings, + log_settings: log_settings + ) + } + + let(:endpoint) { + Async::IO::Endpoint.tcp("127.0.0.1", supervisor.supervisor_settings['port']) + } + + let(:socket) { + endpoint.connect + } + + let(:stream) { + Async::IO::Stream.new(socket) + } + + let(:protocol) { + Async::IO::Protocol::Line.new(stream,RSMP::Proxy::WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed + } + it 'runs without options' do expect { RSMP::Supervisor.new({}) }.not_to raise_error end @@ -31,7 +54,7 @@ ) end - it 'starts' do + it 'accepts connections' do # mock SecureRandom.uui() so we get known message ids: allow(SecureRandom).to receive(:uuid).and_return( '1b206e56-31be-4739-9164-3a24d47b0aa2', @@ -44,21 +67,11 @@ '16ec49e4-6ac1-4da6-827c-2a6562b91731' ) - supervisor = RSMP::Supervisor.new( - supervisor_settings: supervisor_settings, - log_settings: log_settings - ) - Async do |task| + async_context do |task| task.async do supervisor.start end - # create stream - endpoint = Async::IO::Endpoint.tcp("127.0.0.1", supervisor.supervisor_settings['port']) - socket = endpoint.connect - stream = Async::IO::Stream.new(socket) - protocol = Async::IO::Protocol::Line.new(stream,RSMP::Proxy::WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed - # write version message protocol.write_lines '{"mType":"rSMsg","type":"Version","RSMP":[{"vers":"3.1.5"}],"siteId":[{"sId":"RN+SI0001"}],"SXL":"1.0.15","mId":"8db00f0a-4124-406f-b3f9-ceb0dbe4aeb6"}' @@ -77,16 +90,11 @@ protocol.write_lines JSON.generate("mType"=>"rSMsg","type"=>"MessageAck","oMId"=>version["mId"],"mId"=>SecureRandom.uuid()) # supervisor should see our tcp socket and create a proxy - proxy = supervisor.wait_for_site "RN+SI0001", 0.1 + proxy = supervisor.wait_for_site "RN+SI0001", timeout: 0.1 + proxy.wait_for_state(:ready, timeout: 0.1) expect(proxy).to be_an(RSMP::SiteProxy) expect(proxy.site_id).to eq("RN+SI0001") - - - expect { - proxy.wait_for_state(:ready, 0.1) - }.not_to raise_error - # verify log content got = supervisor.archive.by_level([:log, :info]).map { |item| item[:text] } expect( got ).to match_array([ @@ -98,8 +106,6 @@ "Received MessageAck for Version 1b20", "Connection to site RN+SI0001 established, using core 3.1.5, tlc 1.0.15" ]) - - supervisor.stop end end end diff --git a/spec/task_spec.rb b/spec/task_spec.rb new file mode 100644 index 00000000..b044ff9f --- /dev/null +++ b/spec/task_spec.rb @@ -0,0 +1,71 @@ +RSpec.describe RSMP::Task do + class TaskTest + include RSMP::Task + def run + loop do + @task.sleep 1 + end + end + end + + let(:obj) { TaskTest.new } + + describe 'initialize' do + it 'does not create task' do + expect(obj.task).to be_nil + end + end + + describe 'start' do + it 'creates task' do + async_context do |task| + obj.start + expect(obj.task).to be_a(Async::Task) + expect(obj.task.status).to eq(:running) + end + end + + it 'calls run' do + async_context do |task| + expect(obj).to receive(:run) + obj.start + end + end + + end + + describe 'start' do + it 'can be called several times' do + async_context do |task| + obj.start + obj.start + expect(obj.task).to be_a(Async::Task) + expect(obj.task.status).to eq(:running) + end + end + end + + describe 'stop' do + it 'stops the task' do + async_context do |task| + obj.start + obj.stop + expect(obj.task).to be_nil + expect(obj.status).to be_nil + end + end + end + + describe 'restart' do + it 'raises Restart' do + async_context do |task| + obj.start + expect(obj.task).to be_a(Async::Task) + expect(obj.task.status).to eq(:running) + + first_task = obj.task + expect { obj.restart }.to raise_error(RSMP::Restart) + end + end + end +end \ No newline at end of file diff --git a/test.rb b/test.rb deleted file mode 100644 index 6ab6fe14..00000000 --- a/test.rb +++ /dev/null @@ -1,27 +0,0 @@ -class A - def go &block - @block = block # block will be converted automatically to a Proc - indirect - end - - def call - @block.call - end - - def indirect - call - end - -end - -a = A.new - -a.go do - break # this is ok. break causes the block to exit, and the encasing method to return - go() will exit -end - -# this raises an error. the block we passed to go() will be called again, and it tries to break -# but we're not inside a method we can exit from - - -a.indirect