1
1
#!/usr/bin/env python3
2
+
3
+ # Import necessary libraries
2
4
import rclpy
3
5
from rclpy .node import Node
4
6
from pylx16a .lx16a import LX16A
5
7
from ros2_lx16a_driver .srv import GetLX16AInfo , SetLX16AParams , SetLX16ATorque
6
8
from std_msgs .msg import Float32MultiArray , Int32MultiArray
7
- from rclpy .logging import LoggingSeverity
8
- import threading
9
- import sys
10
- import time
11
9
12
10
class LX16AController (Node ):
11
+ """
12
+ ROS 2 Node for controlling LX-16A servos.
13
+
14
+ This node provides services to get servo information, set parameters, and control torque.
15
+ It also subscribes to position and velocity commands to move the servos in real-time.
16
+ """
17
+
13
18
def __init__ (self ):
14
19
super ().__init__ ('lx16a_controller' )
15
20
16
- # Définir les paramètres avec des valeurs par défaut
21
+
22
+ # Declare ROS 2 parameters with default values
17
23
self .declare_parameter ('default_angle_offset' , 0 )
18
24
self .declare_parameter ('default_angle_min' , 0 )
19
25
self .declare_parameter ('default_angle_max' , 240 )
@@ -24,7 +30,8 @@ def __init__(self):
24
30
self .declare_parameter ('port' , '/dev/ttyUSB0' )
25
31
self .declare_parameter ('scan_connected_servos' , False )
26
32
27
- # Charger les paramètres
33
+
34
+ # Store default parameter values
28
35
self .default_values = {
29
36
'angle_offset' : self .get_parameter ('default_angle_offset' ).value ,
30
37
'angle_min' : self .get_parameter ('default_angle_min' ).value ,
@@ -34,10 +41,12 @@ def __init__(self):
34
41
'temperature_limit' : self .get_parameter ('default_temperature_limit' ).value ,
35
42
}
36
43
37
- # Manage log verbosity
44
+
45
+ # Set logger verbosity level
38
46
self .get_logger ().set_level (self .get_parameter ('log_level' ).value )
39
47
40
- # Initialize the LX-16A bus
48
+
49
+ # Initialize communication with LX-16A servos
41
50
try :
42
51
LX16A .initialize (self .get_parameter ('port' ).value )
43
52
self .servos = {}
@@ -48,31 +57,35 @@ def __init__(self):
48
57
self .destroy_node ()
49
58
return
50
59
51
- # Create services
60
+
61
+ # Create ROS 2 services
52
62
self .srv_get = self .create_service (GetLX16AInfo , '/lx16a_get_info' , self .handle_get_info )
53
63
self .srv_set_params = self .create_service (SetLX16AParams , '/lx16a_set_params' , self .handle_set_params )
54
64
self .srv_set_torque = self .create_service (SetLX16ATorque , '/lx16a_set_torque' , self .handle_set_torque )
55
65
56
- # Create subscriber for position control
66
+
67
+ # Create ROS 2 subscribers
57
68
self .position_subscriber = self .create_subscription (
58
- Float32MultiArray , # Message type
59
- '/cmd_pose_lx16a' , # Topic name
60
- self .handle_position_command , # Callback
61
- 10 # Queue size
69
+ Float32MultiArray , '/cmd_pose_lx16a' , self .handle_position_command , 10
62
70
)
63
-
64
- # Create subscriber for velocity control
65
71
self .velocity_subscriber = self .create_subscription (
66
- Int32MultiArray , # Message type
67
- '/cmd_vel_lx16a' , # Topic name
68
- self .handle_velocity_command , # Callback
69
- 10 # Queue size
72
+ Int32MultiArray , '/cmd_vel_lx16a' , self .handle_velocity_command , 10
70
73
)
71
74
75
+
72
76
self .get_logger ().info ('LX-16A driver ready' )
73
77
78
+
74
79
def get_servo (self , servo_id ):
75
- """Returns an instance of a given servo, creates it if necessary."""
80
+ """
81
+ Retrieve and create an instance of a servo with a given ID.
82
+
83
+ Parameters:
84
+ servo_id (int): ID of the servo.
85
+
86
+ Returns:
87
+ LX16A: Servo instance or None if an error occurs.
88
+ """
76
89
if servo_id not in self .servos :
77
90
try :
78
91
self .servos [servo_id ] = LX16A (servo_id )
@@ -82,17 +95,29 @@ def get_servo(self, servo_id):
82
95
return None
83
96
return self .servos [servo_id ]
84
97
98
+
85
99
def handle_get_info (self , request , response ):
86
- """Service to get detailed information of a servo."""
100
+ """
101
+ Handle the GetLX16AInfo service to retrieve servo information.
102
+
103
+ Parameters:
104
+ request (GetLX16AInfo.Request): Service request.
105
+ response (GetLX16AInfo.Response): Service response.
106
+
107
+ Returns:
108
+ GetLX16AInfo.Response: Populated response with servo details.
109
+ """
87
110
servo_id = int (request .id )
88
111
servo = self .get_servo (servo_id )
89
112
113
+
90
114
if servo is None :
91
115
self .get_logger ().error (f"Servo { servo_id } not available." )
92
- return response # Leave response empty if the servo is not found
116
+ return response
117
+
93
118
94
119
try :
95
- # Retrieve servo information
120
+ # Retrieve detailed information about the servo
96
121
response .angle_offset = int (servo .get_angle_offset ())
97
122
response .angle_min = int (servo .get_angle_limits ()[0 ])
98
123
response .angle_max = int (servo .get_angle_limits ()[1 ])
@@ -114,16 +139,19 @@ def handle_get_info(self, request, response):
114
139
response .physical_angle = servo .get_physical_angle ()
115
140
response .commanded_angle = servo .get_commanded_angle ()
116
141
142
+
117
143
self .get_logger ().info (f"Information for servo { servo_id } retrieved." )
118
144
except Exception as e :
119
145
self .get_logger ().error (f"Error retrieving info for servo { servo_id } : { e } " )
120
- # Empty response if an error occurs
121
- return response
146
+
122
147
123
148
return response
124
149
150
+
125
151
def handle_set_params (self , request , response ):
126
- """Service to set the parameters of a servo."""
152
+ """
153
+ Handle the SetLX16AParams service to update servo parameters.
154
+ """
127
155
servo_id = int (request .id )
128
156
servo = self .get_servo (servo_id )
129
157
@@ -159,8 +187,11 @@ def handle_set_params(self, request, response):
159
187
160
188
return response
161
189
190
+
162
191
def handle_set_torque (self , request , response ):
163
- """Service to set torque of a servo."""
192
+ """
193
+ Handle the SetLX16ATorque service to enable or disable servo torque.
194
+ """
164
195
servo_id = int (request .id )
165
196
servo = self .get_servo (servo_id )
166
197
@@ -185,8 +216,11 @@ def handle_set_torque(self, request, response):
185
216
186
217
return response
187
218
219
+
188
220
def handle_position_command (self , msg ):
189
- """Callback to handle position command for servos."""
221
+ """
222
+ Handle position commands received on the '/cmd_pose_lx16a' topic.
223
+ """
190
224
if len (msg .data ) == 0 :
191
225
self .get_logger ().warn ("No position command received." )
192
226
return
@@ -202,8 +236,11 @@ def handle_position_command(self, msg):
202
236
except Exception as e :
203
237
self .get_logger ().error (f"Error moving servo { i } : { e } " )
204
238
239
+
205
240
def handle_velocity_command (self , msg ):
206
- """Callback to handle velocity command for servos."""
241
+ """
242
+ Handle velocity commands received on the '/cmd_vel_lx16a' topic.
243
+ """
207
244
if len (msg .data ) == 0 :
208
245
self .get_logger ().warn ("No velocity command received." )
209
246
return
@@ -218,7 +255,11 @@ def handle_velocity_command(self, msg):
218
255
except Exception as e :
219
256
self .get_logger ().error (f"Error commanding servo { i } : { e } " )
220
257
258
+
221
259
def scan_connected_servos (self ):
260
+ """
261
+ Scan and log all connected servos on the LX-16A bus.
262
+ """
222
263
connected_ids = []
223
264
for servo_id in range (254 ):
224
265
try :
@@ -233,7 +274,11 @@ def scan_connected_servos(self):
233
274
self .get_logger ().warn ("\033 [31mNo servos detected.\033 [0m" )
234
275
return connected_ids
235
276
277
+
236
278
def main (args = None ):
279
+ """
280
+ Main function to initialize and spin the ROS 2 node.
281
+ """
237
282
rclpy .init (args = args )
238
283
node = LX16AController ()
239
284
@@ -245,8 +290,8 @@ def main(args=None):
245
290
rclpy .shutdown () # Shutdown if the context is still valid
246
291
247
292
248
-
249
293
if __name__ == '__main__' :
250
294
main ()
251
295
252
296
297
+
0 commit comments