-
Notifications
You must be signed in to change notification settings - Fork 2
/
data_collection.py
59 lines (48 loc) · 2.08 KB
/
data_collection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# Import necessary libraries
import rosbag
import message_filters
from sensor_msgs.msg import Joy
from sensor_msgs.msg import LaserScan
from ackermann_msgs.msg import AckermannDriveStamped
# Global variables
drive = '/vesc/low_level/ackermann_cmd_mux/input/teleop' # Topic for drive commands
joy = '/vesc/joy' # Topic for joystick commands
lid = '/scan_filtered' # Lidar topic (filtered)
pressed = False # Button press flag
bag = None # ROS Bag to store collected data
bag_name = 'Dataset/out.bag' # path and name of the bag where the data needs to be stored
# ROS initialization
rospy.init_node('receive_position')
# Callback for receiving Ackermann and Lidar messages
def callback(ack_msg, ldr_msg):
if pressed and bag is not None:
print(f'Ackermann: {ack_msg}')
print(f'Lidar: {ldr_msg}')
bag.write('Ackermann', ack_msg) # Write Ackermann messages to bag
bag.write('Lidar', ldr_msg) # Write Lidar messages to bag
# Callback for receiving button press from joystick
def button_callback(j):
global pressed
global bag
# Check if button 1 is pressed which is mapped to button A of Logistic Joystick
if j.buttons[1] == 1 and not pressed:
pressed = True
bag = rosbag.Bag(bag_name, 'w') # Create a new bag for recording
print('Recording Started')
elif j.buttons[1] == 0 and pressed:
pressed = False
bag.close() # Close the bag when recording stops
print('Recording Stopped')
# Create subscribers for drive and lidar messages
drive_sub = message_filters.Subscriber(drive, AckermannDriveStamped)
lid_sub = message_filters.Subscriber(lid, LaserScan)
# Use time synchronizer to combine messages from both topics
ts = message_filters.ApproximateTimeSynchronizer([drive_sub, lid_sub], queue_size=10, slop=0.01, allow_headerless=True)
# Subscribe to joystick topic to receive button press
rospy.Subscriber(joy, Joy, button_callback)
# Register the callback function
ts.registerCallback(callback)
# Keep the program running
rospy.spin()
# End of data collection
print('\n-----------Recording Completed-----------')