-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconvert_omni_seb.py
299 lines (271 loc) · 13.2 KB
/
convert_omni_seb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# -*- coding: utf-8 -*-
from appscript import *
import subprocess
import re
import settings as s #constants
class SebException(Exception):
    """Domain-specific exception.

    Raised by ParseQGM.run_seb when the solver output reports that the
    problem admits no solution.
    """
class ParseQGM():
    """Convert OmniGraffle QGM models to Seb SAT syntax and Techne ATMS input.

    Author: nernst

    Uses py-appscript to read the shapes and lines of one canvas of the top
    OmniGraffle document.  Judging from the methods below, the intended
    sequence is: set_node_ids() to number the nodes, generate_seb() and/or
    generate_atms() to build the output text, print_files() to write it and
    run_seb() to invoke the solver.
    """

    # Default location of the goalsolve binary used by run_seb().
    _SAT_DIR = '/Users/nernst/Dropbox/research/projects/goal-reasoning/backward_prop/GRTool-Src/Solvers/src/Goalreasoner/lin/Goalsolve/'

    def __init__(self, canvas_name=s.CANVAS_NAME):
        """Attach to OmniGraffle and cache the graphics of one canvas.

        canvas_name -- name of the canvas to read in the top document.
        """
        graffle = app('OmniGraffle Professional 5')
        doc = graffle.documents[1]  # the top document, note 1-indexed array
        self.__s = doc.canvases[canvas_name].get()
        self.__shapes = self.__s.shapes.get()
        self.__graphics = self.__s.graphics.get()
        self.__node_cnt = 0
        self.__opt_cnt = 0
        self.__rel_cnt = 0
        self.__true_rel_cnt = 0  # number of relations after AND/OR have been aggregated.
        self.__rel_types = ['AND', 'OR', '+', '-', '++', '--', 'p', 'conflict']
        # rel_type -> ids of the nodes that are sources of the sink currently
        # being processed (note: an em dash in the model would be Unicode).
        self.__src_map = dict((rel, []) for rel in self.__rel_types)
        self.__contribs = ['+', '-', '++', '--']
        self.__opt_list = []  # a list of nodes which are optional
        self.__decomp_str = ''
        self.__contr_str = ''
        self.__des_str = ''
        self.__just_str = ''
        self.__mand_str = ''
        self.__conflict_str = ''
        self.__id_sebid_map = {}  # seb id -> OmniGraffle shape id
        # no quality-constraints
        self.__atms_nodes_map = {'goal': [], 'task': [], 'domain-assumption': [], 'leaf': []}
        self.__atms_nodes = []

    def _reset_src_map(self):
        """Forget the sources collected for the previously processed sink."""
        for key in self.__src_map.keys():
            self.__src_map[key] = []

    def _collect_sources(self, element, prefix=''):
        """Record the source node of every line coming into `element`.

        Each source's seb id, rendered as text and preceded by `prefix`, is
        appended to the source-map entry for the line's 'rel_type'.  Lines
        that cannot be interpreted (no 'rel_type', unknown relation type,
        source without 'seb_id', or no user data at all) are reported and
        skipped.
        """
        for line in element.incoming_lines.get():
            line_data = line.user_data.get()
            try:
                rel_type = line_data['rel_type']  # one of rel_types
                seb_id = line.source.user_data.get()['seb_id']
                # TODO only one possible rel_type per node. Check that.
                self.__src_map[rel_type].append('%s%s' % (prefix, seb_id))
            except (KeyError, TypeError) as err:
                # Report the raw user data; indexing 'rel_type' again here
                # would just raise the error being reported.
                print('%s no rel_type on that line %s %s' % (err, line.id.get(), line_data))

    @staticmethod
    def _write_utf8(path, chunks):
        """Write the text `chunks` to `path`, UTF-8 encoded, closing the file."""
        with open(path, 'wb') as out:
            for chunk in chunks:
                out.write(chunk.encode('utf-8'))

    def set_node_ids(self):
        """pass 1 - set the node ids, zero indexed

        Every shape that has user data and is not flagged optional gets a
        consecutive 'seb_id' written back into its user data, and is
        registered under its 'node_type' for the ATMS output.
        """
        for element in self.__shapes:
            data = element.user_data.get()
            if data is None:
                continue
            if data['optional'] == 'T':
                # Optional nodes are not numbered.  (A second pass that
                # numbered them after the mandatory ones and filled the
                # optional list is disabled, so that list stays empty.)
                continue
            seb_id = self.__node_cnt
            data['seb_id'] = seb_id
            self.__id_sebid_map[seb_id] = element.id.get()
            element.user_data.set(data)
            self.__node_cnt += 1
            # set atms list, taking the node type from the graph
            # NOTE(review): the source's nesting was lost; this is kept in
            # the non-optional path because it relies on the seb_id set above.
            node_name = 'r-%s' % seb_id  # LISP does not allow ints as identifiers
            self.__atms_nodes_map[data['node_type']].append(node_name)
            self.__atms_nodes.append(node_name)

    def generate_atms(self):
        """Generate .techne files for use with Techne ATMS implementation

        Builds the justification, conflict and mandatory assertions as text;
        print_atms_files() writes them out.
        """
        for element in self.__graphics:
            self._reset_src_map()
            data = element.user_data.get()
            # NOTA BENE (original author): the data check appears to do
            # nothing yet is necessary to prevent errors.
            if element.class_.get() != k.shape or data is None:
                continue  # either a line, or a label
            self._collect_sources(element, prefix='r-')  # e.g. ['AND'][r-93, r-94...]

            # AND/OR - the only node types with multiple srcs
            and_srcs = ' '.join(self.__src_map['AND'])
            if and_srcs:
                self.__just_str += '(assert-formula r-%s (list %s) :DA "a just" *rekb* )\n' % (
                    data['seb_id'], and_srcs)
            for node in self.__src_map['OR']:
                # each OR source is a justification on its own
                self.__just_str += '(assert-formula r-%s (list %s ) :DA "a just" *rekb* )\n' % (
                    data['seb_id'], node)
            conflict_srcs = self.__src_map['conflict']
            if conflict_srcs:
                # we are assuming there is only binary conflict
                self.__conflict_str += '(assert-formula (contradiction *rekb*) (list %s r-%s) :DA "hurt4" *rekb*)\n' % (
                    conflict_srcs[0], data['seb_id'])
            # Nodes lacking either key are skipped, as before.
            if data.get('mandatory') == 'T' and 'seb_id' in data:
                self.__mand_str += '(assert-mandatory r-%s *rekb* T)\n' % data['seb_id']

    def generate_seb(self):
        """Build the Seb relation and desired-label text for non-optional nodes.

        print_seb_files() writes the accumulated text out.
        """
        for element in self.__graphics:
            self._reset_src_map()
            data = element.user_data.get()
            if element.class_.get() != k.shape or data is None:
                continue  # either a line, or a label
            if data['optional'] == 'T':
                continue
            self._collect_sources(element)  # e.g. ['+'][93, 94] ...

            # purge optional nodes from list of sources for this sink
            for rel_type in self.__rel_types:
                self.__src_map[rel_type] = [
                    src for src in self.__src_map[rel_type]
                    if src not in self.__opt_list]

            # AND/OR - the only node types with multiple srcs
            for rel_type in ('AND', 'OR'):
                srcs = ' '.join(self.__src_map[rel_type])
                if srcs:
                    self.__decomp_str += 'R %s %s %s\n' % (rel_type, data['seb_id'], srcs)
                    self.__true_rel_cnt += 1
            # contributions: one relation per source
            for contrib in self.__contribs:
                for src in self.__src_map[contrib]:
                    self.__contr_str += 'R %s %s %s\n' % (contrib, data['seb_id'], src)
                    self.__true_rel_cnt += 1
            # goal ids that must be "TS" and -PD go to the .des file;
            # a node without a 'mandatory' key is treated as not mandatory
            if data.get('mandatory') == 'T':
                m_node_id = data['seb_id']
                self.__des_str += 'TS %s ;\n-PD %s ;\n' % (m_node_id, m_node_id)

    def get_optional_ids(self):
        """ return a dict mapping the seb id of each node labeled as optional
        to its graffle id"""
        options_id_map = {}
        for option in self.__opt_list:
            options_id_map[option] = self.__id_sebid_map[int(option)]
        return options_id_map

    def set_node_status(self, nodeid, op='to_mandatory'):
        """ given a nodeid, set the user data

        op is one of 'to_mandatory', 'to_optional' or 'to_unknown'; any other
        value is reported and the node is left untouched.
        """
        # op -> (optional, mandatory)
        flags = {
            'to_mandatory': (u'F', u'T'),
            'to_optional': (u'T', u'F'),
            'to_unknown': (u'F', u'F'),
        }
        if op not in flags:
            print('unrecognized node status operation %s' % op)
            return
        shape = self.__s.shapes.ID(nodeid)
        node_data = shape.user_data.get()
        node_data['optional'], node_data['mandatory'] = flags[op]
        shape.user_data.set(node_data)

    def print_files(self):
        """ legacy support"""
        self.print_seb_files()
        self.print_atms_files()

    def find_node_type(self, node):
        """Return the ATMS sort ("GOAL" or "TASK") registered for `node`.

        Returns None for the empty string and for unregistered names.
        """
        if node == '':
            return None
        if node in self.__atms_nodes_map['goal']:
            return "GOAL"
        if node in self.__atms_nodes_map['task'] or node in self.__atms_nodes_map['leaf']:
            return "TASK"
        # NOTE(review): domain assumptions are declared as GOAL - confirm.
        if node in self.__atms_nodes_map['domain-assumption']:
            return "GOAL"
        return None

    def print_atms_files(self):
        """Write the ATMS declarations and assertions to OUT_DIR + ATMS_FILE."""
        header = '(load "/Users/nernst/Dropbox/research/src/tms/techne-atms.dx64fsl")\n(load "/Users/nernst/Dropbox/research/src/tms/techne-psm.lisp")\n(defvar *rekb* (create-rekb "pcidss"))\n(setq \n'
        node_str = ''.join(
            '\t%s (declare-atomic nil "%s" :%s *rekb*)\n' % (node, node, self.find_node_type(node))
            for node in self.__atms_nodes)
        self._write_utf8(s.OUT_DIR + s.ATMS_FILE, [
            header,
            node_str,
            ')\n\n;;Justifications\n',
            self.__just_str,
            u'\n;;Conflicts\n',
            self.__conflict_str,
            self.__mand_str,
        ])

    def print_seb_files(self):
        """Write the Seb goal file (OUT_DIR + GOAL_FILE) and the desired-label
        file (OUT_DIR + DES_FILE); the goal-file header is echoed to stdout."""
        header = 'D %s %s\n' % (self.__node_cnt, self.__true_rel_cnt)
        print(header)
        self._write_utf8(s.OUT_DIR + s.GOAL_FILE, [
            header,
            u'N NO NO\n' * self.__node_cnt,  # one line per numbered node
            self.__contr_str,
            self.__decomp_str,
        ])
        self._write_utf8(s.OUT_DIR + s.DES_FILE, ['\n', self.__des_str])

    def zero_counts(self):
        """Reset the node/relation counters (the generated text is kept)."""
        self.__node_cnt = 0
        self.__rel_cnt = 0
        self.__true_rel_cnt = 0
        self.__opt_cnt = 0

    def run_seb(self, sat_dir=None, goal_file='/tmp/re2010.goal', des_file='/tmp/re2010.des'):
        """ run the Sebastiani algorithm (via Chaff) and return admissibility

        sat_dir   -- directory holding the goalsolve binary, with trailing
                     separator (defaults to the author's install location).
        goal_file -- goal file passed to goalsolve.
        des_file  -- desired-label file passed to goalsolve.

        Returns True unless the solver's stderr contains the "admits no
        solution" banner, in which case SebException is raised.
        """
        if sat_dir is None:
            sat_dir = self._SAT_DIR
        p = subprocess.Popen(args=[sat_dir + 'goalsolve', '-n', goal_file, des_file],
                             stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
                             cwd=sat_dir)
        stdout, stderr = p.communicate()
        # Pipes yield bytes on Python 3; the pattern below is text.
        report = stderr if isinstance(stderr, str) else stderr.decode('utf-8', 'replace')
        # NOTE(review): other failures (e.g. "Wrong output from chaff") are
        # not detected here; only the explicit no-solution banner raises.
        if re.search(r'\+ The problem admits no solution \+', report) is not None:
            print('Error occured in SAT run or no admissible result')
            print(report)
            raise SebException(report)
        return True
def negate_repl(matchobj):
    """ replace 'literal' with '-literal' and vice-versa

    matchobj -- a match whose group 1 is the run of leading dashes and whose
    group 2 is the literal.  Only a single leading '-' is removed; any other
    prefix (none, or several dashes) yields '-' + literal.
    """
    neg, literal = matchobj.group(1, 2)
    if neg == '-':
        return literal
    return '-' + literal
def negate(sat_assign):
    """ given a satisfying assignment, return that assignment of the initial labels, negated

    Every integer literal in the string has its sign flipped by negate_repl;
    all other characters are left as they are.
    """
    # raw string so that \d reaches the regex engine unmodified
    return re.sub(r'(-*)(\d+)', negate_repl, sat_assign)
if __name__ == '__main__':
    # Script entry point: attach to the default canvas of the top document.
    q = ParseQGM()