-
Notifications
You must be signed in to change notification settings - Fork 7
/
jetson_vehicle.py
158 lines (134 loc) · 5.22 KB
/
jetson_vehicle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jun 25 10:44:24 2017
@author: wroscoe
"""
import time
from threading import Thread
from ROAR_Jetson.jetson_cmd_sender import ArduinoCommandSender
import logging
from ROAR_Jetson.camera import RS_D435i
from typing import Optional, List
import numpy as np
from ROAR_Jetson.receiver import Receiver
from ROAR_Jetson.ar_marker_localization import Localization as ARMarkerLocalization
class Vehicle:
    """
    Container for the parts of the Jetson vehicle and its drive loop.

    Parts are registered with ``add``. ``start`` runs a fixed-rate loop that
    calls ``update_parts`` on every tick, and ``stop`` shuts the parts down.
    """

    def __init__(self):
        # Each entry is a dict with the keys 'part', 'inputs', 'outputs' and,
        # for parts added with threaded=True, 'thread'.
        self.parts = []
        self.on = True
        self.threads = []
        self.logger = logging.getLogger("Jetson Vehicle")
        # Most recent frames returned by the RS_D435i part in update_parts;
        # both stay None until that part has produced a result.
        self.front_rgb_img: Optional[np.ndarray] = None
        self.front_depth_img: Optional[np.ndarray] = None

    def add(self, part, inputs: Optional[List] = None, outputs: Optional[List] = None,
            threaded=False):
        """
        Method to add a part to the vehicle drive loop.

        Args:
            part: Generic object that has a run_threaded and update method
            inputs: input to the parts
            outputs: Channel names to save to memory.
            threaded: If a part should be run in a separate thread.

        Returns:
            None
        """
        if outputs is None:
            outputs = []
        if inputs is None:
            inputs = []

        assert isinstance(inputs, list), "inputs is not a list: %r" % inputs
        assert isinstance(outputs, list), "outputs is not a list: %r" % outputs
        assert isinstance(threaded, bool), "threaded is not a boolean: %r" % threaded

        part_name = part.__class__.__name__
        self.logger.info('Adding part %s.', part_name)

        entry = {'part': part, 'inputs': inputs, 'outputs': outputs}
        if threaded:
            # The thread is only created here; start() is what launches it.
            # Daemon, so a blocked part cannot keep the process alive on exit.
            worker = Thread(target=part.update, args=())
            worker.daemon = True
            entry['thread'] = worker

        self.parts.append(entry)
        self.logger.debug("%s successfully added", part_name)

    def remove(self, part):
        """
        Remove a part from the vehicle.

        Args:
            part: The part object that was passed to ``add``. For backward
                compatibility the entry dict stored in ``self.parts`` is
                accepted as well.

        Raises:
            ValueError: if ``part`` is not registered.

        Notes: a thread created for the part is not stopped by this method.
        """
        # self.parts stores entry dicts rather than the parts themselves, so
        # the part has to be looked up inside each entry.
        for index, entry in enumerate(self.parts):
            if entry['part'] is part:
                del self.parts[index]
                return
        # Original behaviour: remove an entry dict, or raise ValueError.
        self.parts.remove(part)

    def start(self, rate_hz=10):
        """
        Start vehicle's main drive loop.

        This is the main thread of the vehicle. It starts all the new
        threads for the threaded parts then starts an infinite loop
        that runs each part.

        Notes: This function is NOT used during integration with ROAR_Sim

        Args:
            rate_hz : int, The max frequency that the drive loop should run. The actual
                frequency may be less than this if there are many blocking parts.
        """
        try:
            # Computed inside the try so that an invalid rate (e.g. 0) is
            # logged and followed by stop(), like any other failure here.
            period = 1.0 / rate_hz

            for entry in self.parts:
                worker = entry.get('thread')
                if worker:
                    # start the update thread
                    worker.start()

            self.logger.info('Starting vehicle...')
            while True:
                # monotonic clock: unaffected by wall-clock adjustments
                tick_started = time.monotonic()
                self.update_parts()
                sleep_time = period - (time.monotonic() - tick_started)
                if sleep_time > 0.0:
                    time.sleep(sleep_time)
        except KeyboardInterrupt:
            # Ctrl-C is the expected way out of the loop. Cleanup happens in
            # the finally clause only, so parts are shut down exactly once.
            pass
        except Exception as e:
            self.logger.error(f"Something bad happened: [{e}]")
        finally:
            self.stop()

    def update_parts(self, new_throttle: float = 0, new_steering: float = 0):
        """
        Fail-safe method for loop over all parts and call update on each one

        Notes: This function IS USED during integration with ROAR Sim

        Args:
            new_throttle: new throttle value, guaranteed between -1 and 1
            new_steering: new steering value, guaranteed between -1 and 1

        Returns:
            None
        """
        for entry in self.parts:
            try:
                p = entry["part"]
                has_thread = entry.get('thread')
                if has_thread and isinstance(p, ArduinoCommandSender):
                    # send the throttle and steering to Arduino
                    p.run_threaded(throttle=new_throttle, steering=new_steering)
                elif has_thread and isinstance(p, RS_D435i):
                    self.front_rgb_img, self.front_depth_img = p.run_threaded()
                elif isinstance(p, (Receiver, ARMarkerLocalization)):
                    p.run_threaded()
                else:
                    self.logger.warning(f"Unknown part [{p}]")
            except KeyboardInterrupt:
                exit(0)  # this is a hack for exiting the program. DON'T CHANGE!
            except Exception as e:
                # One failing part must not prevent the others from updating.
                self.logger.error(f"Something bad happened during update for part [{entry}]: {e}")

    def stop(self):
        """
        Stop all parts attached to the Jetson vehicle

        A part without a ``shutdown`` method is skipped. An error raised by
        one part's ``shutdown`` is logged and the remaining parts are still
        shut down.

        Returns:
            None
        """
        self.logger.info('Shutting down vehicle and its parts...')
        for entry in self.parts:
            try:
                # shutdown is optional; looking it up explicitly keeps an
                # AttributeError raised inside shutdown() from being hidden.
                shutdown = getattr(entry['part'], 'shutdown', None)
                if shutdown is not None:
                    shutdown()
            except Exception as e:
                self.logger.error(e)