-
Notifications
You must be signed in to change notification settings - Fork 0
/
Search.py
414 lines (377 loc) · 18.1 KB
/
Search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# ARNAB KUMAR MISHRA ([email protected]) HW2_search.py
# Discussed high level concepts with VIKAS PALAKURTHI, PRADEEP RAVILLA and MANISH VUTTUNOORI
import time
from heapq import *
import functools
import copy
import math
import random
# import matplotlib.pyplot as plt
import os.path
import subprocess
import numpy as np
from matplotlib import colors
from mpl_toolkits.mplot3d import Axes3D
# Sentinel stored in graphsearch's result dict for searches that found no solution.
infinity = float("inf")
# Next heap key breadthFirst assigns to a pushed world (breadthFirst increments it).
priority_bfs = 1
# Next heap key depthFirst assigns to a pushed world (depthFirst decrements it).
priority_dfs = -1
# NOTE(review): declared `global` in graphsearch but never read or written in the visible code.
flag = 0
class RobotWorld:
    """One state of the block-moving robot world; the datatype that goes in the search queue.

    Attributes:
        hand: dict with keys 'location' (an (x, y, z) tuple), 'held' (name of
            the block being carried, or None) and 'closed?' (bool).
        width, length, height: grid extents. x is bounded by width, y by
            length and z by height.
        blocks: dict mapping block name -> current (x, y, z).
        goal: dict mapping block name -> target (x, y, z).
        cost: accumulated cost of the actions applied so far.
        title: label used in snapshot file names.
        colormap: dict mapping block name -> matplotlib color name.
    """

    def __init__(self, width, length, height, initial, goal):
        """Create a world with the hand open at (0, 0, 0).

        `initial` and `goal` are stored by reference (not copied) and must
        name exactly the same blocks.
        """
        self.hand = {'location': (0, 0, 0), 'held': None, 'closed?': False}
        self.width, self.length, self.height = width, length, height
        self.blocks, self.goal = initial, goal
        self.cost, self.title = 0, ''
        # There can't be blocks without a goal state, or goal states for a
        # block that is not initialized. Comparing sets keeps the check
        # independent of key order.
        assert set(initial) == set(goal), "initial and goal must name the same blocks"
        # For the sake of the visualization only; not important to the
        # function of RobotWorlds.
        self.colormap, possible_colors = {}, set(colors.cnames.keys())
        for blockname in self.blocks:
            self.colormap[blockname] = possible_colors.pop()

    def __lt__(self, other):
        "Not meaningful, but necessary for RobotWorld to interact with a heapq"
        return True

    def _move(self, axis, step, complaint):
        """Shift the hand (and any held block) by `step` (+1 or -1) along `axis`.

        `axis` is 0 for x, 1 for y, 2 for z. Returns the change in cost: 1.0
        when the move happened, 0 (after printing `complaint`) when the hand
        is already at that edge of the grid.
        """
        location = list(self.hand['location'])
        limit = (self.width, self.length, self.height)[axis]
        if step > 0:
            can_move = location[axis] < (limit - 1)
        else:
            can_move = location[axis] > 0
        if not can_move:
            print(complaint)
            return 0
        location[axis] += step
        target = tuple(location)
        self.hand['location'] = target
        held = self.hand['held']
        # `is not None` so that a block with a falsy name (e.g. 0) is still carried.
        if held is not None:
            self.blocks[held] = target
        self.cost += 1.0
        return 1.0

    # The actions return the change in cost
    def moveUp(self):
        """Move one cell in +z; returns 1.0, or 0 if already at the top."""
        return self._move(2, 1, "Why is the 'moveUp' action occuring? The hand is as far up as it can go.")

    def moveDown(self):
        """Move one cell in -z; returns 1.0, or 0 if already on the floor."""
        return self._move(2, -1, "Why is the 'moveDown' action occuring? The hand is on the floor.")

    def moveLeft(self):
        """Move one cell in -y; returns 1.0, or 0 if already at y == 0."""
        return self._move(1, -1, "Why is the 'moveLeft' action occuring? The hand is on the left edge.")

    def moveRight(self):
        """Move one cell in +y; returns 1.0, or 0 if already at y == length - 1."""
        return self._move(1, 1, "Why is the 'moveRight' action occuring? The hand is on the right edge.")

    def moveForward(self):
        """Move one cell in +x; returns 1.0, or 0 if already at x == width - 1."""
        return self._move(0, 1, "Why is the 'moveForward' action occuring? The hand is on the front edge.")

    def moveBackward(self):
        """Move one cell in -x; returns 1.0, or 0 if already at x == 0."""
        return self._move(0, -1, "Why is the 'moveBackward' action occuring? The hand is on the back edge.")

    # The reason for the handOpen and handClose actions to have non-zero costs
    # is to prevent the search from considering "I'll close my hand on this
    # block" and "I'll open and close my hand on this block 20 million times"
    # as equivalent.
    def handOpen(self):
        """Open the hand, releasing any held block; returns 0.1, or 0 if already open."""
        if not self.hand['closed?']:
            print("Why is the 'handOpen' action occuring? The hand is already open.")
            return 0
        self.hand['closed?'] = False
        self.hand['held'] = None
        self.cost += 0.1
        return 0.1

    def handClose(self):
        """Close the hand on a block at the hand's location.

        Returns 0.1 when a block was grabbed. Returns 0 (and leaves the hand
        open) when the hand is already closed or no block is at the hand.
        """
        if self.hand['closed?']:
            print("Why is the 'handClose' action occuring? The hand is already closed.")
            return 0
        # items() works on both Python 2 and 3; iteritems() is Python 2 only.
        for (name, location) in self.blocks.items():
            if location == self.hand['location']:
                self.hand['held'] = name
                self.hand['closed?'] = True
                self.cost += 0.1
                return 0.1
        print("Why did the 'handClose' action occur? There is no block here at {}".format(str(self.hand['location'])))
        return 0

    def isGoal(self):
        """Return True when every block sits at its goal coordinate."""
        return self.blocks == self.goal

    def allowedActions(self):
        """Return the action names (strings accepted by `do`) worth trying from this state.

        'open' is listed first when the hand is closed over a block that is
        at its goal, and last otherwise, so that moving around is tried
        before pointless opening and closing.
        """
        def alreadyThere(coord):
            # True when some block is at `coord` and `coord` is that block's goal.
            for (block_name, block_coord) in self.blocks.items():
                if (block_coord == coord) and (block_coord == self.goal[block_name]):
                    return True
            return False

        here = self.hand['location']
        closed = self.hand['closed?']
        at_goal = alreadyThere(here)
        possibilities = ['close', 'forward', 'backward', 'left', 'right', 'up', 'down']
        if closed and at_goal:  # try to open first if it's a good idea
            possibilities = ['open'] + possibilities
        # Removing the close action if 'alreadyThere' is True could lead to
        # problems in a 3D world, or even rare scenarios in a 2D world. This is
        # not relevant to the worlds here, but this should be removed if we
        # re-use this for planning.
        if closed or (here not in self.blocks.values()) or at_goal:
            possibilities.remove('close')
        (x, y, z) = here
        # Same bounds as the move* methods: x against width, y against length.
        if x == 0: possibilities.remove('backward')
        if x == (self.width - 1): possibilities.remove('forward')
        if y == 0: possibilities.remove('left')
        if y == (self.length - 1): possibilities.remove('right')
        if z == 0: possibilities.remove('down')
        if z == (self.height - 1): possibilities.remove('up')
        if closed and ('open' not in possibilities):
            possibilities = possibilities + ['open']
        return possibilities

    def do(self, action):
        '''action is a string indicating an action for the RobotWorld to do. These strings come from the 'allowedActions' method, and are part of the process of
        iterating over neighboring nodes in the graph.

        Returns the change in cost reported by the action, or 0 for an
        unrecognised action name.'''
        handlers = {
            'up': self.moveUp,
            'down': self.moveDown,
            'left': self.moveLeft,
            'right': self.moveRight,
            'forward': self.moveForward,
            'backward': self.moveBackward,
            'open': self.handOpen,
            'close': self.handClose,
        }
        handler = handlers.get(action)
        if handler is None:
            print("Unexpected action {}".format(action))
            return 0
        return handler()

    def visualize(self, serial_number):
        '''Save a 3D snapshot of this state to snapshots/<title>_<serial_number>.png.

        The blocks are colored according to their identity (exactly one color per block). These are persistent over time, so a block with a certain name in one frame of the animation will have the same color in all other frames.
        The triangles of the same color as a block indicate where the block's goal location is.
        The title 'cost=<number>' at the top indicates the cumulative cost of actions so far to bring the RobotWorld into its current state.
        The cross indicates the location of the robot hand.
        '''
        # Imported here because the module-level pyplot import is commented out.
        import matplotlib.pyplot as plt

        fig = plt.figure()
        axis = fig.add_subplot(111, projection='3d')
        axis.set_xlim(0, self.width)
        axis.set_ylim(0, self.length)
        axis.set_zlim(0, self.height)
        axis.set_xlabel('X Axis')
        axis.set_ylabel('Y Axis')
        axis.set_zlabel('Z Axis')
        for (blockname, coord) in self.blocks.items():
            axis.scatter(coord[0], coord[1], coord[2], s=500, marker=(4, 0), c=self.colormap[blockname])
            target = self.goal[blockname]
            axis.scatter(target[0], target[1], target[2], s=300, marker=(3, 0), c=self.colormap[blockname])
        robo_x, robo_y, robo_z = self.hand['location']
        axis.scatter(robo_x, robo_y, robo_z, s=500, marker='+', c='black')
        axis.set_title("cost={}".format(str(self.cost)))
        plt.savefig("snapshots/{}_{}.png".format(self.title, str(serial_number)))
        plt.close()
def graphsearch(queue,queue_modification,timeout,visualize=False):
    """Drive a search until a goal world is expanded, the budget runs out, or the queue empties.

    queue: heap of (priority, world) entries holding the start state.
    queue_modification: callable taking (queue, visited) and returning
        (queue, expanded), where `expanded` is the world it just expanded.
    timeout: number of non-goal expansions allowed before giving up.
    visualize: when true, every world in the expansion history is asked to
        draw itself once a goal is found.

    Returns a dict with keys 'cost', 'num_expanded' and 'walltime'; all three
    stay at `infinity` when no solution was found.
    """
    started = time.time()
    visited = []
    history = []
    solution_info = {'cost' : infinity, 'num_expanded' : infinity, 'walltime' : infinity}
    while len(queue):
        if timeout == 0:
            print("Ran out of time for thinking")
            print("The queue is of size: {}".format(len(queue)))
            return solution_info
        queue, expanded = queue_modification(queue, visited)
        # NOTE(review): the RobotWorld visible in this file defines no __eq__,
        # so this membership test appears to compare by identity.
        if expanded not in visited:
            visited.append(expanded)
        if not expanded.isGoal():
            timeout -= 1
            history.append(expanded)
            continue
        finished = time.time()
        history.append(expanded)
        elapsed = finished - started
        solution_info['cost'] = expanded.cost
        solution_info['num_expanded'] = len(history)
        solution_info['walltime'] = elapsed
        print("The solution's cost is: {}, and involved expanding {} verticies".format(str(expanded.cost), str(len(history))))
        print("Finding the solution involved {} seconds of walltime".format(str(elapsed)))
        print("----------------------------------------------------------------------------------")
        if visualize:
            for (frame, world) in enumerate(history):
                world.visualize(frame)
        return solution_info
    print("No possible actions left. And you may ask yourself, well, how did I get here? You may tell yourself, this is not my beautiful search algorithm.")
    return solution_info
def duplicateWorld(world,worlds):
    """Return True when some member of `worlds` has the same hand and blocks as `world`.

    This is a looser notion of equality than comparing whole worlds: the
    accumulated cost (and everything else) is ignored.
    """
    return any(
        (seen.hand == world.hand) and (seen.blocks == world.blocks)
        for seen in worlds
    )
def dummyExample():
    """Demonstrate RobotWorld's 'do' and 'allowedActions' methods and how a queue is used.

    Builds a small world, pushes it on a heap, and produces one neighbor per
    allowed action without searching any further.
    """
    start_blocks = {'A' : (1,0,0), 'B' : (3,1,0)}
    goal_blocks = {'A' : (1,1,0), 'B' : (3,1,0)}
    easy = RobotWorld(5, 5, 1, start_blocks, goal_blocks)
    if easy.isGoal():
        print("You've found the solution")
    queue = []
    heappush(queue, (0, easy))  # this is a minqueue where '0' is the key.
    for action in easy.allowedActions():
        # Each neighbor is an independent copy with one action applied.
        neighbor = copy.deepcopy(easy)
        deltaCost = neighbor.do(action)
        print("You now have a neighbor of the start state. What should you do with it?")
def breadthFirst(queue,visited):
    """Expand the front of the queue in first-in-first-out order.

    Pops the lowest-keyed (priority, world) entry, generates one successor
    per allowed action, and pushes every successor that duplicateWorld does
    not find in `visited`. Returns (queue, expanded_world).
    """
    global priority_bfs
    entry = heappop(queue)
    parent = entry[1]
    for move in parent.allowedActions():
        child = copy.deepcopy(parent)
        child.do(move)
        if duplicateWorld(child, visited):
            continue
        # The shared counter only grows, so each new successor lands behind
        # everything already queued.
        heappush(queue, (priority_bfs, child))
        priority_bfs += 1
    return queue, parent
def depthFirst(queue,visited):
    """Expand the queue in last-in-first-out order.

    Pops the lowest-keyed (priority, world) entry, generates one successor
    per allowed action, and pushes every successor that duplicateWorld does
    not find in `visited`. Returns (queue, expanded_world).
    """
    global priority_dfs
    entry = heappop(queue)
    parent = entry[1]
    for move in parent.allowedActions():
        child = copy.deepcopy(parent)
        child.do(move)
        if duplicateWorld(child, visited):
            continue
        # The shared counter only shrinks, so the newest successor has the
        # smallest key and is popped first.
        heappush(queue, (priority_dfs, child))
        priority_dfs -= 1
    return queue, parent
def bestFirst(queue,visited,heuristic):
    """Expand the queued world with the smallest heuristic value.

    Successors that duplicateWorld does not find in `visited` are pushed
    keyed by `heuristic(successor)` alone. Returns (queue, expanded_world).
    """
    parent = heappop(queue)[1]
    for move in parent.allowedActions():
        child = copy.deepcopy(parent)
        child.do(move)
        if duplicateWorld(child, visited):
            continue
        heappush(queue, (heuristic(child), child))
    return queue, parent
def aStar(queue,visited,heuristic):
    """Expand the queued world with the smallest heuristic-plus-cost key.

    Successors that duplicateWorld does not find in `visited` are pushed
    keyed by `heuristic(successor) + successor.cost`. Returns
    (queue, expanded_world).
    """
    parent = heappop(queue)[1]
    for move in parent.allowedActions():
        child = copy.deepcopy(parent)
        child.do(move)
        if duplicateWorld(child, visited):
            continue
        estimate = heuristic(child)
        heappush(queue, (estimate + child.cost, child))
    return queue, parent
def run(world,title,heuristics,timeout=5000):
    """Run every search strategy on a fresh copy of `world` and collect the results.

    The order of the returned list is: breadth first, depth first, then for
    each heuristic in `heuristics` a best-first result followed by an A*
    result. Each element is the dict returned by graphsearch.
    """
    solutions = []

    def search(suffix, banner, strategy):
        # Every strategy starts from its own deep copy, so one search cannot
        # disturb the start state of the next.
        start = copy.deepcopy(world)
        start.title = (title + suffix)
        frontier = []
        heappush(frontier, (0, start))
        print(banner)
        solutions.append(graphsearch(frontier, strategy, timeout))

    search('_BFS', "Doing Breadth First Search on {}:".format(title), breadthFirst)
    search('_DFS', "Doing Depth First Search on {}:".format(title), depthFirst)
    for h in heuristics:
        # The second word of a function's str() is used as the heuristic's name.
        hname = str(h).split(' ')[1]
        search(hname + '_BestFirst',
               "Doing Best First with heuristic {} on {}:".format(hname, title),
               functools.partial(bestFirst, heuristic=h))
        search(hname + '_Astar',
               "Doing A* with heuristic {} on {}:".format(hname, title),
               functools.partial(aStar, heuristic=h))
    return solutions
def experiment(heuristics,described_worlds):
    """Run every strategy on every world and save a cost-vs-expansions plot to experiment.png.

    heuristics: re-iterable collection of heuristic functions; it is
        iterated once per world by `run` and once more here for the labels.
    described_worlds: sequence of (world, title) pairs.

    One subplot is drawn per world. Searches whose cost is `infinity` (no
    solution found) are not plotted.
    """
    # Imported here because the module-level pyplot import is commented out.
    import matplotlib.pyplot as plt

    data = [run(world, title, heuristics, timeout=10000) for (world, title) in described_worlds]
    fig, axes = plt.subplots(nrows=len(described_worlds))
    fig.set_size_inches(20, 30)
    if len(described_worlds) == 1:  # a hack to get consistent indexing behavior from matplotlib.
        axes = [axes]
    # Label order must match the order in which `run` appends its results:
    # breadth first, then depth first, then (best-first, A*) per heuristic.
    labels = ['BFS', 'DFS']
    for heuristic in heuristics:
        hname = str(heuristic).split(' ')[1]
        labels.append('Best_' + hname)
        labels.append('A*_' + hname)
    # random.sample needs a sequence, so the keys view is turned into a list;
    # one color per strategy is all that is ever indexed below.
    color_codes = random.sample(list(colors.cnames.keys()), len(labels))
    for (i, solutions) in enumerate(data):
        axes[i].set_title(described_worlds[i][1])
        axes[i].set_xlabel("verticies expanded")
        axes[i].set_ylabel("cost of solution")
        plotted = False
        for (j, solution) in enumerate(solutions):
            print("{} is {}".format(labels[j], color_codes[j]))
            if solution['cost'] == infinity:
                continue
            axes[i].scatter(solution['num_expanded'], solution['cost'], color=color_codes[j], label=labels[j])
            plotted = True
        # Show the color/label mapping on the figure; skipped when nothing
        # was drawn so matplotlib has labelled artists to list.
        if plotted:
            axes[i].legend()
    plt.savefig("experiment.png")
    plt.close()
# Make sure the directory that RobotWorld.visualize saves its frames into
# exists. os.makedirs is used instead of spawning an external `mkdir`.
if not os.path.exists("snapshots"):
    try:
        os.makedirs("snapshots")
    except OSError:
        # Only complain if the directory really is missing (something else
        # may have created it between the check and the call).
        if not os.path.isdir("snapshots"):
            print("Failed to make a directory to store images in.")
def h1(robo):
    """Heuristic: sum over all blocks of hand-to-block plus block-to-goal distance.

    Both distances are Manhattan distances in the x/y plane only; the z
    coordinate is ignored.
    """
    hand_x = robo.hand['location'][0]
    hand_y = robo.hand['location'][1]

    def block_estimate(name):
        bx, by = robo.blocks[name][0], robo.blocks[name][1]
        gx, gy = robo.goal[name][0], robo.goal[name][1]
        to_block = abs(bx - hand_x) + abs(by - hand_y)
        to_goal = abs(bx - gx) + abs(by - gy)
        return to_block + to_goal

    return sum(block_estimate(name) for name in robo.blocks.keys())
def h2(robo):
    """Heuristic like h1, but counting only blocks that are not yet at their goal.

    For every block whose x/y Manhattan distance to its goal is non-zero,
    adds the hand-to-block distance and the block-to-goal distance (z is
    ignored). Returns -1 when no block is misplaced, which includes a world
    with no blocks at all.
    """
    hand_x = robo.hand['location'][0]
    hand_y = robo.hand['location'][1]
    total_sum = 0
    any_misplaced = False
    for name in robo.blocks.keys():
        block_x, block_y = robo.blocks[name][0], robo.blocks[name][1]
        goal_x, goal_y = robo.goal[name][0], robo.goal[name][1]
        to_goal = abs(block_x - goal_x) + abs(block_y - goal_y)
        if to_goal == 0:
            # Blocks already in place contribute nothing.
            continue
        any_misplaced = True
        to_block = abs(block_x - hand_x) + abs(block_y - hand_y)
        total_sum += to_block + to_goal
    # A flag rather than max() over the distances, so an empty world does
    # not raise ValueError.
    if not any_misplaced:
        return -1
    return total_sum
# Heuristic functions handed to every informed search below (a set).
hs = {h1, h2}

# Benchmark worlds, from already solved to three blocks on a 10x10 floor.
presolved = RobotWorld(4, 4, 1,
                       {'A' : (3,3,0), 'B' : (3,1,0)},
                       {'A' : (3,3,0), 'B' : (3,1,0)})
easy = RobotWorld(5, 5, 1,
                  {'A' : (1,0,0), 'B' : (3,1,0)},
                  {'A' : (1,1,0), 'B' : (3,1,0)})
medium = RobotWorld(6, 6, 1,
                    {'A' : (1,1,0), 'B' : (3,1,0)},
                    {'A' : (4,4,0), 'B' : (4,5,0)})
hard = RobotWorld(10, 10, 1,
                  {'A' : (1,0,0), 'B' : (9,9,0), 'C' : (4,4,0)},
                  {'A' : (4,4,0), 'B' : (1,0,0), 'C' : (9,9,0)})

# Run all strategies on each world in turn; `run` works on deep copies, so
# the worlds defined above are left untouched.
for _world, _title in ((presolved, 'presolved'), (easy, 'easy'), (medium, 'medium'), (hard, 'hard')):
    run(_world, _title, hs)
#experiment(hs, [(presolved, 'presolved'), (easy,'easy'), (medium, 'medium'), (hard, 'hard')])