Python API for the oRatio solver.
Recursively clone this repository, including its submodules.
git clone --recurse-submodules https://github.com/ratioSolver/PyRatio
Compile and install PlExA.
cd extern/plexa
mkdir build
cd build
cmake -DCOMPUTE_NAMES=ON ..
sudo make install
Go back to the PyRatio root directory and build the wheel with python -m build. Note that the build module is required; it can be installed with pip install build.
Install the wheel by passing the generated .whl file to pip install.
The following example shows how to use the Python API for solving a problem instance.
from oRatio import Solver

s = Solver()
if s.read(["example.rddl"]) and s.solve():
    print('hurray!! we have found a solution..')
    tls = s.extract_timelines()
s.dispose()
To use the Python API, just import the Solver from the oRatio package.
Create a new instance of the solver, read a RiDDLe script or a list of RiDDLe files, and call the solve method.
Both the read and the solve methods return a bool. As the examples in this document use them, read returns False when a trivial inconsistency has been detected in the problem, and solve returns True when a solution has been found.
Once solved, it is possible to extract the generated timelines and to examine the solution.
Once you are done with the solver, remember to call its dispose method to release its resources.
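One way to make sure that dispose is always called, even when reading or solving raises an exception, is to wrap the solver in a small context manager. The following is only a sketch: disposing is a hypothetical helper that is not part of the oRatio package, and FakeSolver is a stand-in used solely to keep the example runnable without the native library. The only assumption about the real Solver is that it exposes the dispose method described above.

```python
from contextlib import contextmanager


@contextmanager
def disposing(resource):
    """Yield `resource` and call its dispose() method on exit, even on error.

    Hypothetical helper, not part of oRatio; it only assumes that the
    object exposes a dispose() method.
    """
    try:
        yield resource
    finally:
        resource.dispose()


class FakeSolver:
    """Stand-in for oRatio's Solver, used only to make this sketch runnable."""

    def __init__(self):
        self.disposed = False

    def dispose(self):
        self.disposed = True


with disposing(FakeSolver()) as s:
    pass  # with the real solver: s.read([...]) and s.solve()

print(s.disposed)  # True
```

With the real package, FakeSolver() would be replaced by Solver(), and the body of the with block would contain the calls to read and solve.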
The read and the solve methods populate the internal data structures of the solver.
from oRatio import Solver

s = Solver()
if s.read('real a; a >= 5.0;') and s.solve():
    print(s.exprs['a'])
s.dispose()
As an example, the above code reads a simple RiDDLe snippet which creates a real variable a and constrains it to be greater than or equal to 5.0.
Once solved, the a variable can be accessed through the exprs attribute of the solver instance.
The above code, in particular, prints 5 [5, +inf] on the console, indicating the value assigned to the a variable by the solver (i.e., 5) and an estimate of the bounds of its allowed values (i.e., [5, +inf]).
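When only the printed form is available (for instance in a log), the value and the bounds can be separated with a few lines of string handling. The sketch below assumes that the text always follows the layout shown above, a value followed by bracketed bounds, and that every number can be read as a float. parse_expr_repr is a hypothetical helper, not part of the oRatio package, which may offer more direct accessors.

```python
import math
from typing import Tuple


def parse_expr_repr(text: str) -> Tuple[float, float, float]:
    """Split a string such as '5 [5, +inf]' into (value, lower, upper).

    Hypothetical helper: it assumes the 'value [lower, upper]' layout
    and float-compatible numbers (including '+inf' and '-inf').
    """
    value_part, bounds_part = text.split(' ', 1)
    lower, upper = bounds_part.strip('[]').split(',')
    return float(value_part), float(lower), float(upper)


value, lower, upper = parse_expr_repr('5 [5, +inf]')
print(value, lower, math.isinf(upper))  # 5.0 5.0 True
```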
Once a problem has been solved, it is possible to execute the generated solution and, possibly, to dynamically update it along the execution.
To this end, an Executor has to be created and a listener must be registered with it.
Similar to a callback, the listener receives information from the executor about the tasks to be executed.
The following listener simply prints the starting/ending tasks on the console.
# Rational is used only as a type hint; the __future__ import keeps
# annotations from being evaluated at runtime.
from __future__ import annotations
from typing import Sequence
from oRatio import Executor, ExecutorListener, ExecutorState, Atom


class ExecListener(ExecutorListener):

    def __init__(self, executor: Executor):
        self.executor = executor

    def executor_state_changed(self, state: ExecutorState) -> None:
        print('executor state changed: ' + str(state))

    def tick(self, current_time: Rational) -> None:
        print('current time: ' + str(current_time))

    def starting_atoms(self, atoms: Sequence[Atom]) -> None:
        print('checking starting atoms ' + str(atoms))

    def start_atoms(self, atoms: Sequence[Atom]) -> None:
        print('starting atoms ' + str(atoms))

    def ending_atoms(self, atoms: Sequence[Atom]) -> None:
        print('checking ending atoms ' + str(atoms))

    def end_atoms(self, atoms: Sequence[Atom]) -> None:
        print('ending atoms ' + str(atoms))
Note that while the start_atoms (end_atoms) method is meant to start (stop) the execution of tasks, the starting_atoms (ending_atoms) method is meant to ask whether the tasks can be started (ended).
In particular, within these methods, it is possible to delay the start (termination) of tasks by invoking the executor's dont_start_yet (dont_end_yet) method, passing the atoms whose start (termination) is to be delayed as a parameter.
def starting_atoms(self, atoms: Sequence[Atom]) -> None:
    for atm in atoms:
        if not_ready(atm):
            self.executor.dont_start_yet(atm)
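The not_ready predicate is left undefined in the snippet above. The following sketch shows one possible way to provide it, by keeping a set of the atoms that the application has marked as ready. Everything here is an assumption made for illustration: ReadinessGate is a hypothetical class, FakeExecutor is a stand-in that merely records the requested delays, and plain strings are used in place of real Atom objects.

```python
from typing import List, Set


class FakeExecutor:
    """Stand-in for oRatio's Executor that only records delay requests."""

    def __init__(self):
        self.delayed: List[object] = []

    def dont_start_yet(self, atm) -> None:
        self.delayed.append(atm)


class ReadinessGate:
    """Hypothetical listener fragment: delays atoms not yet marked as ready."""

    def __init__(self, executor):
        self.executor = executor
        self.ready: Set[object] = set()

    def mark_ready(self, atm) -> None:
        # Called by the application when the task can actually begin.
        self.ready.add(atm)

    def not_ready(self, atm) -> bool:
        return atm not in self.ready

    def starting_atoms(self, atoms) -> None:
        # Same logic as the snippet above, with not_ready as a method.
        for atm in atoms:
            if self.not_ready(atm):
                self.executor.dont_start_yet(atm)


gate = ReadinessGate(FakeExecutor())
gate.mark_ready('load_truck')
gate.starting_atoms(['load_truck', 'drive'])
print(gate.executor.delayed)  # ['drive']
```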
Finally, the following code shows how to register the listener with the executor.
import sys
from oRatio import Solver, Executor

s = Solver()
e = Executor(s)
e.add_executor_listener(ExecListener(e))
if not s.read(['example.rddl']) or not s.solve():
    print('the problem is unsolvable.. :(')
    sys.exit(-1)
Notice that, since the executor has to detect changes in the solver's state, it must be created before reading and solving any problem.
Any adaptation of the solution is made within the tick method.
Once the start_execution method has been invoked, the tick method, after the adaptation, also advances the current time, requesting listeners to start (end) the scheduled tasks. The amount by which the time advances is defined by an optional parameter (defaulting to one) passed to the Executor's constructor.
The tick method can be invoked, for example, every second, by means of a scheduler.
import time

e.start_execution()
start_time = time.time()
while True:
    e.tick()
    time.sleep(1.0 - ((time.time() - start_time) % 1.0))
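The sleep expression in the loop above does not wait for a fixed second: it waits until the next whole-second boundary counted from start_time, so the time spent inside tick does not accumulate as drift. The sketch below isolates that computation in a function and drives a plain callable instead of a real executor. The names seconds_until_next_tick and run_ticks are hypothetical, and the period is shortened only to keep the example quick to run.

```python
import time


def seconds_until_next_tick(start_time: float, now: float, period: float = 1.0) -> float:
    """Time left until the next multiple of `period` counted from `start_time`."""
    return period - ((now - start_time) % period)


def run_ticks(tick, n_ticks: int, period: float = 1.0) -> None:
    """Call `tick` n_ticks times, keeping the calls aligned to `period`."""
    start_time = time.time()
    for _ in range(n_ticks):
        tick()
        time.sleep(seconds_until_next_tick(start_time, time.time(), period))


# If a tick ends 0.25 s after a boundary, the loop sleeps for the remaining 0.75 s.
print(seconds_until_next_tick(10.0, 10.25))  # 0.75

calls = []
run_ticks(lambda: calls.append(time.time()), n_ticks=3, period=0.05)
print(len(calls))  # 3
```

With the real executor, e.tick would be passed as the tick argument and the period would normally match the time unit used by the plan.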
To release the resources, remember to call, once done, the dispose methods of the executor and of the solver, in this specific order.
e.dispose()
s.dispose()
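The disposal order can also be enforced structurally rather than by convention. The sketch below uses contextlib.ExitStack from the standard library, whose callbacks run in reverse order of registration: registering the solver's dispose first and the executor's second guarantees that the executor is disposed first. FakeResource is a stand-in used only to record the order; with the real package, the Solver and the Executor would take its place.

```python
from contextlib import ExitStack


class FakeResource:
    """Stand-in for the Solver and the Executor; records the disposal order."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def dispose(self):
        self.log.append(self.name)


log = []
with ExitStack() as stack:
    s = FakeResource('solver', log)
    stack.callback(s.dispose)  # registered first, so it runs last
    e = FakeResource('executor', log)
    stack.callback(e.dispose)  # registered last, so it runs first
    # ... read, solve and execute here ...

print(log)  # ['executor', 'solver']
```

The callbacks also run when the body of the with block raises an exception, so both objects are released on failure as well.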