forked from danieldugas/pymap2d
-
Notifications
You must be signed in to change notification settings - Fork 0
/
map2d_ros_tools.py
87 lines (77 loc) · 3.47 KB
/
map2d_ros_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import rospy
from threading import Lock
from CMap2D import CMap2D
# if this is after remove_python2..., somehow python2 and python3 mismatch leads to non-connecting topics
from nav_msgs.msg import OccupancyGrid
def remove_python2_entries_from_sys_path():
""" sourcing ros means adding its python2 paths to sys.path. This creates incompatibilities when
importing tf, so we remove them """
import sys
if sys.version[0] == str(3):
new_path = []
for p in sys.path:
if "python2" in p:
print("REMOVING python2 entry from sys.path: {}".format(p))
continue
new_path.append(p)
sys.path = new_path
remove_python2_entries_from_sys_path()
import tf
def default_refmap_update_callback(self_):
pass
""" so many nodes end up implementing this functionality, it made sense to create
a class for it.
This class loads and stores the reference map (for global planning, for example),
and keeps track of the robot's pose in the map
with refmap_update_callback, the user can specify a task to perform each time the map is updated
"""
class ReferenceMapAndLocalizationManager(object):
""" If a reference map is provided and a tf exists,
keeps track of the tf for given frame in the reference map frame """
def __init__(self, map_folder, map_filename, reference_map_frame, frame_to_track, refmap_update_callback=default_refmap_update_callback):
self.tf_frame_in_refmap = None
self.map_ = None
self.lock = Lock() # for avoiding race conditions
self.refmap_is_dynamic = False
# update callback
self.refmap_update_callback = refmap_update_callback
# get frame info
self.kRefMapFrame = reference_map_frame
self.kFrame = frame_to_track
# Load map
if map_folder == "rostopic":
self.refmap_is_dynamic = True
rospy.logwarn("Getting reference map from topic '{}'".format(map_filename))
rospy.Subscriber(map_filename, OccupancyGrid, self.map_callback, queue_size=1)
else:
# loads map based on ros params
folder = map_folder
filename = map_filename
try:
self.map_ = CMap2D(folder, filename)
except IOError as e:
rospy.logwarn(rospy.get_namespace())
rospy.logwarn("Failed to load reference map. Make sure {}.yaml and {}.pgm"
" are in the {} folder.".format(filename, filename, folder))
rospy.logwarn("Disabling. No global localization or reference map will be available.")
return
self.refmap_update_callback(self)
# launch callbacks
self.tf_listener = tf.TransformListener()
rospy.Timer(rospy.Duration(0.01), self.tf_callback)
self.has_last_failed = False
def tf_callback(self, event=None):
try:
self.tf_frame_in_refmap = self.tf_listener.lookupTransform(self.kRefMapFrame, self.kFrame, rospy.Time(0))
if self.has_last_failed:
rospy.logwarn("refmap tf found.")
self.has_last_failed = False
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e:
rospy.logwarn_throttle(10., e)
self.has_last_failed = True
return
def map_callback(self, msg):
with self.lock:
self.map_ = CMap2D()
self.map_.from_msg(msg)
self.refmap_update_callback(self)