-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
129 lines (109 loc) · 4.13 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from fractions import Fraction
from pathlib import Path
from typing import Any, Dict, List, Optional
from unified_planning.engines.results import (
LogMessage,
PlanGenerationResultStatus,
PlanGenerationResult,
)
from unified_planning.io import PDDLWriter
from unified_planning.shortcuts import Problem
from tyr.planners.model.pddl_planner import TyrPDDLPlanner
class OpticPlanner(TyrPDDLPlanner):
"""
The Optic planner wrapped into local PDDL planner.
NOTE: In Anytime mode, the planner will only return the last plan found.
"""
def _get_cmd(
self,
domain_filename: str,
problem_filename: str,
plan_filename: str,
) -> List[str]:
binary = (Path(__file__).parent / "optic-cplex").resolve().as_posix()
return f"{binary} -N {domain_filename} {problem_filename}".split()
def _get_anytime_cmd(
self,
domain_filename: str,
problem_filename: str,
plan_filename: str,
) -> List[str]:
return (
" ".join(self._get_cmd(domain_filename, problem_filename, plan_filename))
.replace("-N", "-n")
.split()
)
def _parse_planner_output(self, writer, planner_output):
assert isinstance(self._writer, PDDLWriter)
for l in planner_output.splitlines():
if self._starting_plan_str() in l:
writer.storing = True
elif writer.storing and "" == l:
plan_str = "\n".join(writer.current_plan)
plan = self._plan_from_str(
writer.problem, plan_str, self._writer.get_item_named
)
res = PlanGenerationResult(
PlanGenerationResultStatus.INTERMEDIATE,
plan=plan,
engine_name=self.name,
)
writer.res_queue.put(res)
writer.current_plan = []
writer.storing = False
elif writer.storing and l:
writer.current_plan.append(self._parse_plan_line(l))
def _get_engine_epsilon(self) -> Optional[Fraction]:
return Fraction(1, 1000)
def _get_write_domain_options(self) -> Dict[str, bool]:
opts = super()._get_write_domain_options()
opts["force_predicate"] = True
return opts
def _get_computation_time(self, logs: List[LogMessage]) -> Optional[float]:
for log in logs:
for line in reversed(log.message.splitlines()):
if line.startswith("; Time"):
return float(line.split()[2])
return None
def _get_plan(self, proc_out: List[str]) -> str:
plan: List[str] = []
parsing = False
for line in proc_out:
if self._starting_plan_str() in line:
parsing = True
plan = [] # Clear the plan to keep only the last one
continue
if not parsing:
continue
if self._ending_plan_str() == line:
parsing = False
plan.append(self._parse_plan_line(line))
return "\n".join(plan)
def _starting_plan_str(self) -> str:
return "; Time"
def _ending_plan_str(self) -> str:
return "\n"
def _parse_plan_line(self, plan_line: str) -> str:
return plan_line.strip()
def _result_status(
self,
problem: Problem,
plan: Optional[Any],
retval: int,
log_messages: Optional[List[LogMessage]] = None,
) -> PlanGenerationResultStatus:
for log in log_messages:
for line in log.message.splitlines():
if line.startswith(";; Problem unsolvable!"):
return PlanGenerationResultStatus.UNSOLVABLE_PROVEN
if plan is not None:
splitted = str(plan).strip().split("\n")
has_plan = len(splitted) > 1
else:
has_plan = False
if has_plan:
return PlanGenerationResultStatus.SOLVED_SATISFICING
if retval == 0:
return PlanGenerationResultStatus.UNSOLVABLE_INCOMPLETELY
return PlanGenerationResultStatus.INTERNAL_ERROR
__all__ = ["OpticPlanner"]