A Python framework for controlling and processing experiments built upon the CSI:Cobot Digital Twin.
- Generate twin configuration files from Python dataclasses
- Collect and process messages generated during a twin run
- Define temporal logic monitors to check for specific situations in a run
- Wrap experiments to support failure, retries, and archiving results
- Compute coverage of the observed situations across multiple runs
There is an experimental release of the library available on test.pypi.org
:
pip install py-csi-cobotics
The CSI cobotics framework aim to control an instance of the CSI:Cobot Digital Twin, and process the resulting outputs to identify occurrences of specific events or situations. Through the example of an industrial use case, we introduce the required steps, from exposing the configuration points of the twin instance, to collecting a trace of events in the system, and to identifying hazardous situations. Note that the process can be applied in parts for different kind of analyses.
This documentation will refer to the example available under the experiments/tcx_safety
, defining an experimental
setup built on top of an industrial case study. The case study comprises a human operator, an automated robotic arm, and
a spot welder. The operator and the arm interact at a shared bench, the arm then moves to a spot welder to process a
component before returning to the bench. A safety controller ensures that interferences from the operator, reaching at
the shared bench or entering the cell, do not result in hazardous situations.
The operator is set to follow a pre-established path through 5 different, ordered waypoints. The experiment exposes for
configuration the time spent by the operator at each waypoint, allowing for operator interferences at various stage of
the welding process. An example of randomised search linking all components of the framework and using this setup is
included in experiments/tcx_safety/search_ran.py
.
The first step in interacting with a CSI: Cobot Digital Twin instance is providing entry points to configure the twin
execution. The ConfigurationManager
aims to convert Python structures capturing the configuration of the system, and
the JSON-based configuration format of the twin. It supports a variety of structures (dataclasses.dataclass
types, csi.configuration.JsonSerializable
implementations, JSON objects) as inputs. Nesting and mixing structure types
is supported and the value of encoded fields will itself be encoded by the ConfigurationManager
. Additional meta-data
might be required to map input fields' names into their twin counterparts.
Consider a twin instance exposing two waypoints which duration property captures how long the operator should wait at
the waypoint. The scene configuration is declared as a hierarchy of dataclass
:
import dataclasses
# Configuration hierarchy declaration
@dataclasses.dataclass
class Waypoint:
duration: float = 1.0
@dataclasses.dataclass
class SceneConfiguration:
# twin meta-data
timestamp: datetime = dataclasses.field(default_factory=lambda: datetime.now())
version: str = "0.0.0.2"
# scene configuration
wp_start: Waypoint = dataclasses.field(default_factory=Waypoint)
wp_exit: Waypoint = dataclasses.field(default_factory=Waypoint)
# Declare mapping between Python field names and their configuration counterpart
_encoded_fieldnames = {
"timestamp": "$Generated",
"version": "$version",
"wp_start": "/Operator Controller/Waypoint Bench Entrance/Waypoint",
"wp_exit": "/Operator Controller/Waypoint Exit/Waypoint",
}
A configuration for the twin can be initialised and manipulated as a Python object:
configuration = SceneConfiguration()
configuration.wp_start.duration = 5.
configuration.wp_exit.duration = 10.
The ConfigurationManager
can then generate a twin-compliant configuration file for a specific configuration instance
of the configuration:
from csi import ConfigurationManager
ConfigurationManager().save(configuration, "/csi/build/configuration.json")
The ConfigurationManager
can further load serialised configuration into a specific configuration type declaration:
from csi import ConfigurationManager
configuration = ConfigurationManager(SceneConfiguration).load("/csi/build/configuration.json")
The resulting JSON file includes all the required meta-data for use by the twin, and the fields named as instructed. Values omitted from the JSON configuration file, such as the waypoint positions, will use the defaults built into the twin instance.
{
"$Generated": "12/12/2021 12:12:12",
"$version": "0.0.0.2",
"/Operator Controller/Waypoint Bench Entrance/Waypoint": {
"Waypoint.duration": 5.0
},
"/Operator Controller/Waypoint Exit/Waypoint": {
"Waypoint.duration": 10.0
}
}
The configuration file for a twin instance or build is located in Unity's streaming assets folder. Assuming the twin
instance is located under the /build
folder, the configuration file defaults to
/build/unity_Data/StreamingAssets/CSI/Configuration/configuration.json
. Runs of a same build copy will rely on the
same configuration file, and runs under different configurations should either be performed in sequence or by first
duplicating the build copy for each configuration file.
By default, the ConfigurationManager
uses the same name field name in Python and in the JSON configuration. The twin
configuration file format uses long hierarchical names to uniquely identify configurable items in a specific instance,
names which might not follow the Python naming conventions. The ConfigurationManager
thus looks for the
the _encoded_fieldnames: dict[str, str]
metadata dictionary, if it exists, to indicate how specific fields should be
mapped or renamed from Python to JSON. _encoded_fieldnames
is expected to be declared as a class attribute for
configuration-encoding classes.
As an example, consider the following Waypoint
declaration:
import dataclasses
@dataclasses.dataclass
class Vector3D:
x: float = 0.0
y: float = 0.0
z: float = 0.0
@dataclasses.dataclass
class Waypoint:
duration: float = 1.0
position: Vector3D = dataclasses.field(default_factory=Vector3D)
_encoded_fieldnames = {
"position": "Entity.position",
"duration": "Waypoint.duration",
}
# A Waypoint will be encoded as:
# {
# "Entity.position": {
# "x": 0.0,
# "y": 0.0,
# "z": 0.0
# },
# "Waypoint.duration": 1.0
# }
The ConfigurationManager
supports the generation of a JSON object from
a Data Class instance. Each field declared in the dataclass
will
be itself converted into an appropriate JSON value. The name of the encoded field defaults to the dataclass
field
name, unless specified otherwise through the supported metadata.
import dataclasses
@dataclasses.dataclass
class Vector3D:
x: float = 0.0
y: float = 0.0
z: float = 0.0
@dataclasses.dataclass
class Entity:
position: Vector3D = dataclasses.field(default_factory=Vector3D)
rotation: Vector3D = dataclasses.field(default_factory=Vector3D)
_encoded_fieldnames = {
"position": "Entity.position",
"rotation": "Entity.eulerAngles",
}
@dataclasses.dataclass
class Operator(Entity):
height: float = dataclasses.field(default=1.75)
_encoded_fieldnames = {
"height": "Operator.height",
"position": "Entity.position",
"rotation": "Entity.eulerAngles",
}
@dataclasses.dataclass
class WorldData:
operator: Operator = dataclasses.field(default_factory=Operator)
robot: Entity = dataclasses.field(default_factory=Entity)
_encoded_fieldnames = {
"operator": "/Operators/Tim/Operator",
"robot": "/ur10/UR10",
}
The ConfigurationManager
supports the generation of JSON value from JsonSerializable
objects, implementing
the to_json
and from_json
primitives. Those can further rely on the ConfigurationManager
methods to encode their
fields.
from csi import ConfigurationManager
from csi.configuration import JsonSerializable
class ComplexFloat(JsonSerializable):
_value: float
def to_json(self):
return self._value
@classmethod
def from_json(cls, obj):
c = ComplexFloat()
c._value = obj
return c
class ComplexWaypoint(JsonSerializable):
duration: ComplexFloat
def to_json(self):
return {
"d": ConfigurationManager().encode(self.duration),
}
@classmethod
def from_json(cls, obj):
w = ComplexWaypoint()
w.duration = ConfigurationManager(ComplexFloat).decode(obj["d"])
return w
Monitoring encapsulates all steps required to define the events and metrics exposed by a twin instance, and the
definition of situations to be monitored in the environment. This is the core of the csi.situation
module. Situations
capture states of interest in the system, and they can enforce requirements to be maintained or hazardous configurations
to be avoided over its lifetime. They are concretely defined as temporal logic predicates, through
the py-metric-temporal-logic library, combining components
using boolean or temporal operators. As an example the twin might expose the velocity of different autonomous agents
throughout the simulation, and their proximity to various obstacles. Specific monitors can be defined to ensure that
agents in proximity to each other do slow down to reduce the likelihood of accidents, and that they never exceed their
speed limit.
Components are the building blocks of a situation. They represent individual metrics which evolution over time might
satisfy a specific situation, a state of interest in the system. Related components are defined under the same context,
and contexts themselves may be nested in other contexts. Components of a system are thus defined as a type hierarchy
built on top of the Component
and Context
base types.
from csi.situation import Context, Component
class Position(Context):
in_cell = Component()
in_bench = Component()
in_tool = Component()
class Entity(Context):
position = Position()
is_moving = Component()
velocity = Component()
class Welder(Entity):
is_active = Component()
class Cell(Context):
welder = Welder()
robot = Entity()
class IndustrialContext(Context):
operator = Entity()
cell = Cell()
Situations capture states of interest in the system, requirements that should be maintained across its lifecycles,
occurrences which should be avoided, or combinations of events that need to be encountered during testing. Situations
are formalised as temporal logic predicates which condition components through boolean and temporal operators, using
the mtl
library. Component
can be combined into complex boolean or temporal operators using the syntax proposed in
py-metric-temporal-logic.
i = IndustrialContext()
# Hazard: the operator is in the cell while the welder is active
h = ((i.operator.position.in_cell | i.operator.position.in_tool) & i.cell.welder.is_active).eventually()
# Req: only an active robot at the tool can trigger the welder
r = i.cell.welder.is_active.implies(i.cell.robot.position.in_tool)
# Check: the welder never moves
c = (~i.cell.welder.is_moving).always()
In some instances, it might be useful to define a situation for any instance of a context, irrespective of how it can be
nested or reused in other contexts. The purpose of the Alias
primitive is to make such self-referencing definitions
more convenient.
from csi.situation import Alias
Position.reaches_tool = Alias((~Position.in_tool) & (Position.in_tool >> 1))
# Automatically defines the following:
# - i.operator.position.reaches_tool
# - i.cell.robot.reaches_tool
# - i.cell.welder.reaches_tool
Welder.check_setup = Alias(~Welder.is_moving & Welder.position.in_tool)
assert i.cell.welder.check_setup == (~i.cell.welder.is_moving & i.cell.position.in_tool)
A domain captures the possible values or ranges thereof for a component. By defining the domain of a component, one can assess the portion of said domain covered by a set of observations, and whether additional observations might be of interest. The notion of coverage extends to sets of components and their domains; covering a set of components requires covering all value combinations of their domains. A domain can be defined for a component upon declaration. A domain only requires to define a conversion operation, from an observed value to a domain one, capturing in which value/range thereof the observation fits, and a length if applicable. The framework offers a number of primitives to define common domains:
from csi.situation import domain_values, domain_identity, domain_threshold_range
from csi.situation import Context, Component
class Entity(Context):
# Domain values, exact set of values of interest
is_moving = Component(domain_values({True, False}))
# Identity domain, unbounded, all encountered values are recorded
status = Component(domain_identity())
# Range domain, the interval [0; 16) is divided into equal-length (0.25) intervals
velocity = Component(domain_threshold_range(0.0, 16.0, 0.25, upper=True))
Note that the domain is mostly used in the computation of coverage metrics for one or more event traces. Event traces are collected by processing the outputs of a twin instance and discussed in the following sections.
A run of a Digital Twin instance, allows the user to evaluate the response of the system under a given configuration. It
does require some care to ensure the appropriate files and configuration are generated to execute the run, and to
collect the data produced by the run for further processing. The Experiment
class aims to ease the process of running
multiple instances of the twin and collecting the resulting traces. Running an Experiment
generates a record of the
run, with the experiment configuration, status of the run, and a working directory with the generated data.
An experiment is defined by overriding the execute
method, introducing all required steps to produce the experimental
data. We recommend the use of relative paths for files generated by the experiment, such as event traces; each run of
the experiment will occur in its separate, dedicated working directory. The experiment configuration is expected to be
compliant with the csi.ConfigurationManager
as it is serialised into a JSON configuration file as part of the
experiment backup. Experiments stored in the same directory are part of the same Repository
. A repository provides
convenient access to all experiments, runs, or successful runs thereof.
from csi import Experiment
class SkeletonTwinRunner(Experiment):
def execute(self) -> None:
"""Run digital twin container with the specified configuration"""
# Generate configuration file
# ...
# Run twin container
# ...
# Generate event trace
# ...
# Monitor for hazard occurrences
# ...
e = SkeletonTwinRunner()
e.configuration = SceneConfiguration()
e.run(retries=5)
Due to the nature of the twin launcher, configuration and trace files are located in the build folder. Concurrent runs from the same build might thus result in conflicting experiments, and results being overwritten. It is advised to use a container to execute the Digital Twin to guarantee isolation, or copy the build folder for each concurrent thread.
Each run of the Digital Twin produces a record of the messages exchanged between the different entities modelled by the system. This record carries little information regarding the semantic of the messages, their contents, or their relations to the components required for monitoring situations of interests. It is up to the user to provide some meaning to said messages for a given twin instance, based on their type, contents, or communication channels.
The csi.twin
module provides primitives to ease the access to a message record. The DataBase
type covers the basic
interaction with the message database. from_table
iterates over all messages in the specified tables, returning each
as a separate object with the same fields as the original message. Note that the message types and contents may vary
based on your twin instance. It is recommended to assess the available messages on a trial run of each instance.
from csi.twin import DataBase, from_table
db = DataBase("/path/to/db")
for m in from_table(db, "movablestatus"):
print("At time {}, Entity {} is {}moving".format(m.timestamp, m.entity, "not " if not m.is_moving else ""))
The csi.situation
module defines the notion of an event trace to track the values of components' over time, and assess
the occurrence of specific situations. Entries are indexed by their component. Each new value should be recorded with
the instant, as a timestamp, at which the change occurs.
from csi.twin import DataBase, from_table
from csi.situation import Trace
db = DataBase("/path/to/db")
trace = Trace()
P = IndustrialContext()
# Define fixed-value constraint
trace[P.constraints.cobot.velocity.in_bench] = (0.0, 15.0)
# Record operator movement in cell
trace[P.operator.is_moving] = ( 0.0, False)
trace[P.operator.is_moving] = ( 1.0, True)
trace[P.operator.is_moving] = ( 4.0, True)
trace[P.operator.is_moving] = (10.0, False)
Situations can be evaluated against a trace to assess whether they occur or not. This is achieved through a monitor, which operators on a set of situations for evaluation, and extracting relevant properties such as the set of components used throughout said situations.
from csi.situation import Monitor
# An operator in movement always stops within 5.0 time units
c = (P.operator.is_moving.implies((~P.operator.is_moving).eventually(0, 5.0))).always()
# The operator eventually moves
d = P.operator.is_moving.eventually()
m = Monitor({c, d})
assert m.evaluate(trace)[c] == True
assert m.evaluate(trace)[d] == True
The experimental EventCombinationsRegistry
maintains a record of all combinations of values encountered for a given
set of components. It can process traces to extract such combinations and provide some primitives to compute the
combined components' domain coverage.
WARNING: This is an experimental feature which is neither pleasant to use nor reliable. Consider the example ad-hoc script provided by
experiments/tcx_safety/serialisation_dataset.py
instead.
from csi.situation import EventCombinationsRegistry
e = EventsCombinationsRegistry()
# Define the tracked components' domain (reuse in-situ domain definitions where possible)
e.domain[P.operator.is_moving] = P.operator.is_moving.domain
e.domain[P.operator.velocity] = P.operator.velocity.domain
# Register the events' combinations encountered by a trace
e.register(trace)
print(e.coverage)
@InProceedings{sassi,
author="Lesage, Benjamin and Alexander, Rob",
title="SASSI: Safety Analysis Using Simulation-Based Situation Coverage for Cobot Systems",
booktitle="Computer Safety, Reliability, and Security (Proceedings of SafeComp)",
year="2021",
publisher="Springer International Publishing",
pages="195--209",
isbn="978-3-030-83903-1"
}