Skip to content

Commit

Permalink
Refactor async task usage, use new Task module. Fixes to the TLC emul…
Browse files Browse the repository at this point in the history
…ator.
  • Loading branch information
emiltin committed Jan 31, 2022
1 parent 540368c commit 8082ebe
Show file tree
Hide file tree
Showing 38 changed files with 931 additions and 453 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ $ git submodule update # fetch submodules

Alternatively, you can pass --recurse-submodules to the git clone command, and it will automatically initialize and update each submodule in the repository.

## Usage
## Usage
### Site and Supervisor
The RSMP::Site and RSMP::Supervisor classes can be used to run a RSMP site.

Expand Down Expand Up @@ -172,7 +172,7 @@ Use the ```tlc``` site type to run an emulation of a traffic light controller. T
### CLI help and options.
Use ```--help <command>``` to get a list of available options.

Use ```--config <path>``` to point to a .yaml config file, controlling things like IP adresses, ports, and log output. Examples of config files can be found the folder ```config/```.
Use ```--config <path>``` to point to a .yaml config file, controlling things like IP adresses, ports, and log output. Examples of config files can be found the folder ```config/```.

## Tests
### RSpec
Expand Down
2 changes: 1 addition & 1 deletion bin/console
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env ruby

# Make IRB run inside Async, so async task
# will run the the background.
# will run the the background.

require 'bundler/setup'
require 'irb'
Expand Down
4 changes: 2 additions & 2 deletions documentation/collecting_message.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Collection
You often need to collect messages or responses. The collector classes are used to collect message asyncronously. Other tasks continue until the collection completes, time outs or is cancelled.

A collector can collect ingoing and/or outgoing messages.
A collector can collect ingoing and/or outgoing messages.

An object that includes the Notifier module (or implements the same functionality) must be provided when you construct a Collected. The collector will attach itself to this notifier when it starts collecting, to receive messages. The SiteProxy and SupervisorProxy classes both include the Notifier module, and can therefore be used as message sources.

Expand Down Expand Up @@ -32,7 +32,7 @@ outgoing: Whether to collect outgoing messages. Defaults to true
component: An RSMP component id.

### Collecting
Use collect() to start collecting and wait for completion or timeout. The status will be returned.
Use collect() to start collecting and wait for completion or timeout. The status will be returned.

```ruby
result = collector.collect # => :ok, :timeout or :cancelled
Expand Down
164 changes: 164 additions & 0 deletions documentation/tasks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Tasks

## Concurrency
The Async gem (which uses Rubys concurrent Fibers are the new Rubhy Fiber scheduler) is used to handle concurrency.

When you use a site or a supervisor, it runs asyncronously so you can run several concurrently, or do other things concurrently, like sending messages and waiting for reponses.

```
Site - SupervisorProxy - Reader < < < Writer - SiteProxy - Supervisor
\ Writer > > > Reader /
```

Running asyncronously means that the site/supervisor network handling is run in an async task.

Classes don't inherit for Async::Task. Instead they include task as instance variables. This means that the hierachy of objects and tasks can be different.

Async task are use for handle the the following concurrently:

- Running multiple sites or supervisors concurrently
- Running multiple connections concurrently
- Waiting for messages
- Waiting for connections or states

## Proxies
A supervisor waits for sites to connect. Each time a site connects, a proxy is created and run to handle the connection.

A site connects to one of more supervisors. A proxy is created and run to handle each connection.

A site can connect to one or more supervisor. It creates proxy for each and runs them.

When the proxy is run, it creates an async task to handle the communication. The proxy task will be a sub task of the site/supervisor task.

A proxy can use sub tasks to handle watchdog timers, etc.

## The run() cycle
The Task modules defines a life cycle for handling async tasks.

You first call `start`. If `@atask` already exists, it will return immedatiately.
Otherwise an async task is created and stored in `@task`, and `run` is called inside this task, to handle any long-running processes. The call to `start` returns immediately.

If you want to stop the task, call `stop`. If `@task`doesn't exist, it will return. Otherwise it wil call `shutdown`which will terminate the task stored in `@task` as well as any subtasks.

## Proxies and run()
Proxies build on the Task functionality by handling RSMP communication via a TCP socket. The TCP socket can be open or closed. The RSMP communication first goes through a handshake sequence before being ready. This is encapsulated in the `status` attribute, which can be one of `:disconnected`, `:connected` or `:ready`

Proxies implement `connect` and `close` for starting and stopping commununication, but supervisor and site proxies are a bit different. A supervisor proxies connects actively to a site proxy, whereas a site proxy waits for the supervisor proxy to connect. This means they are constructed a bit differently.

A supervisor proxy is created at startup, and is responsible for creating the tcp socket and connecting to the supervisor.

A site proxy is also created at startup, but the socket is created in the supervisor by `Aync::Endpoint#accept`when a site connects.


## Stopping tasks
Be aware that if a task stops itself, code after the call to stop() will not be run - unless you use an ensure block:

```ruby
require 'async'

Async do |task|
task.stop
puts "I just stopped" # this will not be reaced, because the task was stopped
end

Async do |task|
task.stop
ensure
puts "I just stopped" # this will be reached
end
```

This is important to keep in mind, e.g. when a timer task finds an acknowledgement was not received in time, and then closes the connection by calling stop() in the Proxy, which will thne in turn stop the timer task.


Object hierarchy:

```
Supervisor 1
site proxy 1
site proxy 2
Supervisor 2
site proxy 1
site proxy 2
Site 1
supervisor proxy 1
supervisor proxy 2
```

Task hierachy:

```
supervisor parent
accepting connections
incoming connection 1
incoming connection 2
reader
timer
site parent
tlc site
connected
tlc timer
```

The task hierachy matters when you stop a task or iterate on subtasks. Note that calling `Task#wait`
does not wait for subtasks, whereas Task#stop stops all subtasks.





Async block usage

```ruby
# new design:

# running a site or supervisor
# returns immedately. code will run inside an async task
site.run
supervisor.run

# when a site connects to supervisors,
# async task are implicitely created
@socket = @endpoint.connect

# when a supervisor accepts incoming connections from sites,
# async task are implicitely created
@endpoint.accept

# when you wait for messages
...



# current design:
Async do |task| # Node#start

@endpoint.accept do |socket| # Supervisor#start_action,implicit task creation
@socket = @endpoint.connect # SupervisorProxy#connect, implicit task creation

@task.async do |task| # Site#start_action

@reader = @task.async do |task| # Proxy#start_reader
@timer = @task.async do |task| # Proxy#start_reader

task = @task.async { |task| yield task } # Proxy#send_and_optionally_collect
@timer = @task.async do |task| # TrafficControllerSite#start_timer
```


Task assignment

```ruby
@task = options[:task] # Node#initialize
@task = task # Node#do_start

@task = options[:task] # Collector#initialize
@task = task # Collector#use_task

```



1 change: 1 addition & 0 deletions lib/rsmp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
require 'async/queue'

require 'rsmp/rsmp'
require 'rsmp/task'
require 'rsmp/deep_merge'
require 'rsmp/inspect'
require 'rsmp/logging'
Expand Down
6 changes: 3 additions & 3 deletions lib/rsmp/archive.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ def inspect

def self.prepare_item item
raise ArgumentError unless item.is_a? Hash

cleaned = item.select { |k,v| [:author,:level,:ip,:port,:site_id,:component,:text,:message,:exception].include? k }
cleaned[:timestamp] = Clock.now
if item[:message]
cleaned[:direction] = item[:message].direction
cleaned[:direction] = item[:message].direction
cleaned[:component] = item[:message].attributes['cId']
end

Expand Down Expand Up @@ -54,7 +54,7 @@ def add item
@items.shift
end
end

private

def find options, &block
Expand Down
31 changes: 27 additions & 4 deletions lib/rsmp/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def version
desc "site", "Run RSMP site"
method_option :config, :type => :string, :aliases => "-c", banner: 'Path to .yaml config file'
method_option :id, :type => :string, :aliases => "-i", banner: 'RSMP site id'
method_option :supervisors, :type => :string, :aliases => "-s", banner: 'ip:port,... list of supervisor to connect to'
method_option :supervisors, :type => :string, :aliases => "-s", banner: 'ip:port,... list of supervisor to connect to'
method_option :log, :type => :string, :aliases => "-l", banner: 'Path to log file'
method_option :json, :type => :boolean, :aliases => "-j", banner: 'Show JSON messages in log'
method_option :type, :type => :string, :aliases => "-t", banner: 'Type of site: [tlc]'
Expand Down Expand Up @@ -60,19 +60,35 @@ def site
site_class = RSMP::Site
end
end
site_class.new(site_settings:settings, log_settings: log_settings).start
Async do |task|
task.annotate 'cli'
loop do
begin
site = site_class.new(site_settings:settings, log_settings: log_settings)
site.start
site.wait
rescue RSMP::Restart
site.stop
end
end
end
rescue Interrupt
# cntr-c
rescue RSMP::Schemer::UnknownSchemaTypeError => e
puts "Cannot start site: #{e}"
rescue RSMP::Schemer::UnknownSchemaVersionError => e
puts "Cannot start site: #{e}"
rescue Psych::SyntaxError => e
puts "Cannot read config file #{e}"
rescue Exception => e
puts "Uncaught error: #{e}"
puts caller.join("\n")
end

desc "supervisor", "Run RSMP supervisor"
method_option :config, :type => :string, :aliases => "-c", banner: 'Path to .yaml config file'
method_option :id, :type => :string, :aliases => "-i", banner: 'RSMP site id'
method_option :ip, :type => :numeric, banner: 'IP address to listen on'
method_option :ip, :type => :numeric, banner: 'IP address to listen on'
method_option :port, :type => :string, :aliases => "-p", banner: 'Port to listen on'
method_option :log, :type => :string, :aliases => "-l", banner: 'Path to log file'
method_option :json, :type => :boolean, :aliases => "-j", banner: 'Show JSON messages in log'
Expand Down Expand Up @@ -110,7 +126,14 @@ def supervisor
log_settings['json'] = options[:json]
end

RSMP::Supervisor.new(supervisor_settings:settings,log_settings:log_settings).start
Async do |task|
task.annotate 'cli'
supervisor = RSMP::Supervisor.new(supervisor_settings:settings,log_settings:log_settings)
supervisor.start
supervisor.wait
end
rescue Interrupt
# ctrl-c
rescue RSMP::ConfigurationError => e
puts "Cannot start supervisor: #{e}"
end
Expand Down
2 changes: 1 addition & 1 deletion lib/rsmp/collect/state_collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module RSMP
# matches this input:
#
# {"cCI"=>"M0104", "n"=>"month", "v"=>"9", "age"=>"recent"}
#
#
# And the result is stored as:
# {
# {"cCI"=>"M0104", "cO"=>"setDate", "n"=>"month", "v"=>/\d+/} =>
Expand Down
6 changes: 3 additions & 3 deletions lib/rsmp/components.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module RSMP
module Components
attr_reader :components

def initialize_components
@components = {}
end
Expand All @@ -25,10 +25,10 @@ def setup_components settings

def check_main_component settings
unless settings['main'] && settings['main'].size >= 1
raise ConfigurationError.new("main component must be defined")
raise ConfigurationError.new("main component must be defined")
end
if settings['main'].size > 1
raise ConfigurationError.new("only one main component can be defined, found #{settings['main'].keys.join(', ')}")
raise ConfigurationError.new("only one main component can be defined, found #{settings['main'].keys.join(', ')}")
end
end

Expand Down
8 changes: 4 additions & 4 deletions lib/rsmp/convert/export/json_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def self.output_json item

def self.build_value item
out = {}

if item['description']
out["description"] = item['description']
end
Expand Down Expand Up @@ -95,7 +95,7 @@ def self.output_alarms out, items
}
end
json = {
"properties" => {
"properties" => {
"aCId" => { "enum" => items.keys.sort },
"rvs" => { "items" => { "allOf" => list } }
}
Expand Down Expand Up @@ -175,7 +175,7 @@ def self.output_root out
}
]
}
out["sxl.json"] = output_json json
out["sxl.json"] = output_json json
end

def self.generate sxl
Expand All @@ -192,7 +192,7 @@ def self.write sxl, folder
out.each_pair do |relative_path,str|
path = File.join(folder, relative_path)
FileUtils.mkdir_p File.dirname(path) # create folders if needed
file = File.open(path, 'w+') # w+ means truncate or create new file
file = File.open(path, 'w+') # w+ means truncate or create new file
file.puts str
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/rsmp/convert/import/yaml.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def self.convert yaml
commands: {}
}

yaml['objects'].each_pair do |type,object|
yaml['objects'].each_pair do |type,object|
object["alarms"].each { |id,item| sxl[:alarms][id] = item }
object["statuses"].each { |id,item| sxl[:statuses][id] = item }
object["commands"].each { |id,item| sxl[:commands][id] = item }
Expand Down
3 changes: 0 additions & 3 deletions lib/rsmp/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,4 @@ class RepeatedAlarmError < Error

class RepeatedStatusError < Error
end

class TimestampError < Error
end
end
2 changes: 1 addition & 1 deletion lib/rsmp/inspect.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Costume inspect, to reduce noise
#
#
# Instance variables of classes starting with Async or RSMP are shown
# with only their class name and object id, which reduces output,
# especially for deep object structures.
Expand Down
Loading

0 comments on commit 8082ebe

Please sign in to comment.