-
Notifications
You must be signed in to change notification settings - Fork 6
/
autonomy.py
96 lines (71 loc) · 2.57 KB
/
autonomy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#
# Mars Rover Design Team
# run.py
#
# Created on Oct 23, 2016
# Updated on Aug 21, 2022
#
import core
import logging
import asyncio
import interfaces.nav_board
import utm
logger = logging.getLogger(__name__)
def custom_exception_handler(loop, context):
"""
Autonomy Exception Handler
:return: None
"""
print(context)
loop.stop()
def main() -> None:
"""
Main Autonomy Loop
:return: None
"""
logger.info("Entering main autonomy loop")
# Setting up the asyncio loop
loop = asyncio.get_event_loop()
loop.set_exception_handler(custom_exception_handler)
# Setup feeds for AR Tag and obstacle avoidance
core.vision.feed_handler.add_feed(2, "artag", stream_video=core.vision.STREAM_FLAG)
core.vision.feed_handler.add_feed(3, "obstacle", stream_video=core.vision.STREAM_FLAG)
# Create our two detection tasks
loop.create_task(core.vision.ar_tag_detector.async_ar_tag_detector())
# Only start obstacle detection if flag was enabled.
if core.vision.AVOIDANCE_FLAG:
loop.create_task(core.vision.obstacle_avoidance.async_obstacle_detector())
# Run core autonomy state machine loop
loop.run_until_complete(autonomy_state_loop())
async def autonomy_state_loop():
"""
Asynchronous autonomy state machine loop
:return: None
"""
while True:
# Run the current state in the state machine (and handle enable/disable)
await core.states.state_machine.run()
logger.info(f"Current State: {core.states.state_machine.state}")
# Transmit the current state to Base Station
core.rovecomm_node.write(
core.RoveCommPacket(
core.manifest["Autonomy"]["Telemetry"]["CurrentState"]["dataId"],
"B",
(core.states.state_machine.get_state_str(),),
port=core.UDP_OUTGOING_PORT,
),
False,
)
# Print debug
# relpos = interfaces.nav_board.location()
# abspos = interfaces.nav_board.location(force_absolute=True)
# print("RELPOS:", utm.from_latlon(relpos[0], relpos[1])[:2])
# print("ABSPOS:", utm.from_latlon(abspos[0], abspos[1])[:2])
# print("RELHEAD:", interfaces.nav_board.heading())
# print("ABSHEAD:", interfaces.nav_board._heading)
# Core state machine runs every X ms, to prevent unnecessarily fast computation.
# Sensor data is processed separately, as that is the bulk of processing time
await asyncio.sleep(core.EVENT_LOOP_DELAY)
if __name__ == "__main__":
# Run main()
main()