Skip to content

Commit 084c862

Browse files
authored
Merge pull request AtsushiSakai#313 from Guilyx/bidir-astar
Bidirectional A*
2 parents 50b5b96 + 38519b5 commit 084c862

File tree

2 files changed

+343
-0
lines changed

2 files changed

+343
-0
lines changed

PathPlanning/AStar/a_star.py

+1
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ def main():
271271
if show_animation: # pragma: no cover
272272
plt.plot(rx, ry, "-r")
273273
plt.show()
274+
plt.pause(0.001)
274275

275276

276277
if __name__ == '__main__':
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
"""
2+
3+
Bidirectional A* grid planning
4+
5+
author: Erwin Lejeune (@spida_rwin)
6+
7+
See Wikipedia article (https://en.wikipedia.org/wiki/Bidirectional_search)
8+
9+
"""
10+
11+
import math
12+
13+
import matplotlib.pyplot as plt
14+
15+
show_animation = True
16+
17+
18+
class BidirectionalAStarPlanner:
19+
20+
def __init__(self, ox, oy, reso, rr):
21+
"""
22+
Initialize grid map for a star planning
23+
24+
ox: x position list of Obstacles [m]
25+
oy: y position list of Obstacles [m]
26+
reso: grid resolution [m]
27+
rr: robot radius[m]
28+
"""
29+
30+
self.reso = reso
31+
self.rr = rr
32+
self.calc_obstacle_map(ox, oy)
33+
self.motion = self.get_motion_model()
34+
35+
class Node:
36+
def __init__(self, x, y, cost, pind):
37+
self.x = x # index of grid
38+
self.y = y # index of grid
39+
self.cost = cost
40+
self.pind = pind
41+
42+
def __str__(self):
43+
return str(self.x) + "," + str(self.y) + "," + str(
44+
self.cost) + "," + str(self.pind)
45+
46+
def planning(self, sx, sy, gx, gy):
47+
"""
48+
Bidirectional A star path search
49+
50+
input:
51+
sx: start x position [m]
52+
sy: start y position [m]
53+
gx: goal x position [m]
54+
gy: goal y position [m]
55+
56+
output:
57+
rx: x position list of the final path
58+
ry: y position list of the final path
59+
"""
60+
61+
nstart = self.Node(self.calc_xyindex(sx, self.minx),
62+
self.calc_xyindex(sy, self.miny), 0.0, -1)
63+
ngoal = self.Node(self.calc_xyindex(gx, self.minx),
64+
self.calc_xyindex(gy, self.miny), 0.0, -1)
65+
66+
open_set_A, closed_set_A = dict(), dict()
67+
open_set_B, closed_set_B = dict(), dict()
68+
open_set_A[self.calc_grid_index(nstart)] = nstart
69+
open_set_B[self.calc_grid_index(ngoal)] = ngoal
70+
71+
current_A = nstart
72+
current_B = ngoal
73+
74+
while 1:
75+
if len(open_set_A) == 0:
76+
print("Open set A is empty..")
77+
break
78+
79+
if len(open_set_B) == 0:
80+
print("Open set B is empty..")
81+
break
82+
83+
c_id_A = min(
84+
open_set_A,
85+
key=lambda o: self.find_total_cost(open_set_A, o, current_B))
86+
87+
current_A = open_set_A[c_id_A]
88+
89+
c_id_B = min(
90+
open_set_B,
91+
key=lambda o: self.find_total_cost(open_set_B, o, current_A))
92+
93+
current_B = open_set_B[c_id_B]
94+
95+
# show graph
96+
if show_animation: # pragma: no cover
97+
plt.plot(self.calc_grid_position(current_A.x, self.minx),
98+
self.calc_grid_position(current_A.y, self.miny), "xc")
99+
plt.plot(self.calc_grid_position(current_B.x, self.minx),
100+
self.calc_grid_position(current_B.y, self.miny), "xc")
101+
# for stopping simulation with the esc key.
102+
plt.gcf().canvas.mpl_connect('key_release_event',
103+
lambda event: [exit(
104+
0) if event.key == 'escape' else None])
105+
if len(closed_set_A.keys()) % 10 == 0:
106+
plt.pause(0.001)
107+
108+
if current_A.x == current_B.x and current_A.y == current_B.y:
109+
print("Found goal")
110+
meetpointA = current_A
111+
meetpointB = current_B
112+
break
113+
114+
# Remove the item from the open set
115+
del open_set_A[c_id_A]
116+
del open_set_B[c_id_B]
117+
118+
# Add it to the closed set
119+
closed_set_A[c_id_A] = current_A
120+
closed_set_B[c_id_B] = current_B
121+
122+
# expand_grid search grid based on motion model
123+
for i, _ in enumerate(self.motion):
124+
continue_A = False
125+
continue_B = False
126+
127+
child_node_A = self.Node(current_A.x + self.motion[i][0],
128+
current_A.y + self.motion[i][1],
129+
current_A.cost + self.motion[i][2],
130+
c_id_A)
131+
132+
child_node_B = self.Node(current_B.x + self.motion[i][0],
133+
current_B.y + self.motion[i][1],
134+
current_B.cost + self.motion[i][2],
135+
c_id_B)
136+
137+
n_id_A = self.calc_grid_index(child_node_A)
138+
n_id_B = self.calc_grid_index(child_node_B)
139+
140+
# If the node is not safe, do nothing
141+
if not self.verify_node(child_node_A):
142+
continue_A = True
143+
144+
if not self.verify_node(child_node_B):
145+
continue_B = True
146+
147+
if n_id_A in closed_set_A:
148+
continue_A = True
149+
150+
if n_id_B in closed_set_B:
151+
continue_B = True
152+
153+
if not continue_A:
154+
if n_id_A not in open_set_A:
155+
# discovered a new node
156+
open_set_A[n_id_A] = child_node_A
157+
else:
158+
if open_set_A[n_id_A].cost > child_node_A.cost:
159+
# This path is the best until now. record it
160+
open_set_A[n_id_A] = child_node_A
161+
162+
if not continue_B:
163+
if n_id_B not in open_set_B:
164+
# discovered a new node
165+
open_set_B[n_id_B] = child_node_B
166+
else:
167+
if open_set_B[n_id_B].cost > child_node_B.cost:
168+
# This path is the best until now. record it
169+
open_set_B[n_id_B] = child_node_B
170+
171+
rx, ry = self.calc_final_bidirectional_path(
172+
meetpointA, meetpointB, closed_set_A, closed_set_B)
173+
174+
return rx, ry
175+
176+
def calc_final_bidirectional_path(self, meetnode_A, meetnode_B, closed_set_A, closed_set_B):
177+
rx_A, ry_A = self.calc_final_path(meetnode_A, closed_set_A)
178+
rx_B, ry_B = self.calc_final_path(meetnode_B, closed_set_B)
179+
180+
rx_A.reverse()
181+
ry_A.reverse()
182+
183+
rx = rx_A + rx_B
184+
ry = ry_A + ry_B
185+
186+
return rx, ry
187+
188+
def calc_final_path(self, ngoal, closedset):
189+
# generate final course
190+
rx, ry = [self.calc_grid_position(ngoal.x, self.minx)], [
191+
self.calc_grid_position(ngoal.y, self.miny)]
192+
pind = ngoal.pind
193+
while pind != -1:
194+
n = closedset[pind]
195+
rx.append(self.calc_grid_position(n.x, self.minx))
196+
ry.append(self.calc_grid_position(n.y, self.miny))
197+
pind = n.pind
198+
199+
return rx, ry
200+
201+
@staticmethod
202+
def calc_heuristic(n1, n2):
203+
w = 1.0 # weight of heuristic
204+
d = w * math.hypot(n1.x - n2.x, n1.y - n2.y)
205+
return d
206+
207+
def find_total_cost(self, open_set, lambda_, n1):
208+
g_cost = open_set[lambda_].cost
209+
h_cost = self.calc_heuristic(n1, open_set[lambda_])
210+
f_cost = g_cost + h_cost
211+
return f_cost
212+
213+
def calc_grid_position(self, index, minp):
214+
"""
215+
calc grid position
216+
217+
:param index:
218+
:param minp:
219+
:return:
220+
"""
221+
pos = index * self.reso + minp
222+
return pos
223+
224+
def calc_xyindex(self, position, min_pos):
225+
return round((position - min_pos) / self.reso)
226+
227+
def calc_grid_index(self, node):
228+
return (node.y - self.miny) * self.xwidth + (node.x - self.minx)
229+
230+
def verify_node(self, node):
231+
px = self.calc_grid_position(node.x, self.minx)
232+
py = self.calc_grid_position(node.y, self.miny)
233+
234+
if px < self.minx:
235+
return False
236+
elif py < self.miny:
237+
return False
238+
elif px >= self.maxx:
239+
return False
240+
elif py >= self.maxy:
241+
return False
242+
243+
# collision check
244+
if self.obmap[node.x][node.y]:
245+
return False
246+
247+
return True
248+
249+
def calc_obstacle_map(self, ox, oy):
250+
251+
self.minx = round(min(ox))
252+
self.miny = round(min(oy))
253+
self.maxx = round(max(ox))
254+
self.maxy = round(max(oy))
255+
print("minx:", self.minx)
256+
print("miny:", self.miny)
257+
print("maxx:", self.maxx)
258+
print("maxy:", self.maxy)
259+
260+
self.xwidth = round((self.maxx - self.minx) / self.reso)
261+
self.ywidth = round((self.maxy - self.miny) / self.reso)
262+
print("xwidth:", self.xwidth)
263+
print("ywidth:", self.ywidth)
264+
265+
# obstacle map generation
266+
self.obmap = [[False for _ in range(self.ywidth)]
267+
for _ in range(self.xwidth)]
268+
for ix in range(self.xwidth):
269+
x = self.calc_grid_position(ix, self.minx)
270+
for iy in range(self.ywidth):
271+
y = self.calc_grid_position(iy, self.miny)
272+
for iox, ioy in zip(ox, oy):
273+
d = math.hypot(iox - x, ioy - y)
274+
if d <= self.rr:
275+
self.obmap[ix][iy] = True
276+
break
277+
278+
@staticmethod
279+
def get_motion_model():
280+
# dx, dy, cost
281+
motion = [[1, 0, 1],
282+
[0, 1, 1],
283+
[-1, 0, 1],
284+
[0, -1, 1],
285+
[-1, -1, math.sqrt(2)],
286+
[-1, 1, math.sqrt(2)],
287+
[1, -1, math.sqrt(2)],
288+
[1, 1, math.sqrt(2)]]
289+
290+
return motion
291+
292+
293+
def main():
294+
print(__file__ + " start!!")
295+
296+
# start and goal position
297+
sx = 10.0 # [m]
298+
sy = 10.0 # [m]
299+
gx = 50.0 # [m]
300+
gy = 50.0 # [m]
301+
grid_size = 2.0 # [m]
302+
robot_radius = 1.0 # [m]
303+
304+
# set obstacle positions
305+
ox, oy = [], []
306+
for i in range(-10, 60):
307+
ox.append(i)
308+
oy.append(-10.0)
309+
for i in range(-10, 60):
310+
ox.append(60.0)
311+
oy.append(i)
312+
for i in range(-10, 61):
313+
ox.append(i)
314+
oy.append(60.0)
315+
for i in range(-10, 61):
316+
ox.append(-10.0)
317+
oy.append(i)
318+
for i in range(-10, 40):
319+
ox.append(20.0)
320+
oy.append(i)
321+
for i in range(0, 40):
322+
ox.append(40.0)
323+
oy.append(60.0 - i)
324+
325+
if show_animation: # pragma: no cover
326+
plt.plot(ox, oy, ".k")
327+
plt.plot(sx, sy, "og")
328+
plt.plot(gx, gy, "ob")
329+
plt.grid(True)
330+
plt.axis("equal")
331+
332+
bidir_a_star = BidirectionalAStarPlanner(ox, oy, grid_size, robot_radius)
333+
rx, ry = bidir_a_star.planning(sx, sy, gx, gy)
334+
335+
if show_animation: # pragma: no cover
336+
plt.plot(rx, ry, "-r")
337+
plt.pause(.0001)
338+
plt.show()
339+
340+
341+
if __name__ == '__main__':
342+
main()

0 commit comments

Comments
 (0)