Skip to content

Commit

Permalink
Fix inverted ID comparison
Browse files Browse the repository at this point in the history
  • Loading branch information
andchiind committed Jan 23, 2025
1 parent 249593f commit 730167e
Show file tree
Hide file tree
Showing 7 changed files with 392 additions and 236 deletions.
266 changes: 46 additions & 220 deletions src/isar/robot/robot.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,14 @@
import logging
import time
import queue
from threading import Event
from injector import inject
from typing import Optional, Union, Any
from isar.config.settings import settings
from typing import Optional
from isar.models.communication.queues.queues import Queues
from isar.models.communication.queues.status_queue import StatusQueue
from isar.state_machine.states_enum import States
from robot_interface.models.mission.status import RobotStatus
from isar.services.utilities.threaded_request import (
ThreadedRequest,
)
from robot_interface.models.exceptions.robot_exceptions import (
ErrorMessage,
RobotCommunicationException,
RobotCommunicationTimeoutException,
RobotException,
RobotTaskStatusException,
)
from robot_interface.models.mission.status import TaskStatus
from isar.models.communication.queues.queue_io import QueueIO
from isar.robot.robot_queue_utils import check_for_event
from isar.robot.robot_start_mission_or_task import RobotStartMissionOrTaskThread
from isar.robot.robot_status import RobotStatusThread
from isar.robot.robot_task_status import RobotTaskStatusThread
from robot_interface.robot_interface import RobotInterface

from robot_interface.models.mission.mission import Mission
from robot_interface.models.mission.status import RobotStatus, TaskStatus
from robot_interface.models.mission.task import Task
from robot_interface.models.exceptions.robot_exceptions import (
ErrorMessage,
RobotException,
RobotInfeasibleTaskException,
RobotInfeasibleMissionException,
)


class Robot(object):

Expand All @@ -40,187 +17,26 @@ def __init__(self, queues: Queues, robot: RobotInterface):
self.logger = logging.getLogger("robot")
self.queues: Queues = queues
self.robot: RobotInterface = robot
self.start_mission_thread: Optional[ThreadedRequest] = None
self.robot_status_thread: Optional[ThreadedRequest] = None
self.robot_task_status_thread: Optional[ThreadedRequest] = None
self.last_robot_status_poll_time: float = time.time()
self.current_status = None
self.request_status_failure_counter: int = 0
self.request_status_failure_counter_limit: int = (
settings.REQUEST_STATUS_FAILURE_COUNTER_LIMIT
)

def _trigger_event(self, queueio: QueueIO, data: any = None) -> Any:
queueio.input.put(data if data != None else True)

def _check_shared_state(self, queueio: StatusQueue):
try:
return queueio.check()
except queue.Empty:
return None

def _check_for_event(self, queueio: QueueIO) -> Any:
try:
return queueio.input.get(block=False)
except queue.Empty:
return None

def run_robot_status_thread(self):
while True:
robot_status = self.robot.robot_status()
if (
robot_status != self.current_status
and robot_status == RobotStatus.Offline
):
self._trigger_event(self.queues.robot_offline)
if (
robot_status != self.current_status
and robot_status != RobotStatus.Offline
):
self._trigger_event(self.queues.robot_online)
self.current_status = robot_status
time.sleep(settings.FSM_SLEEP_TIME)

def run_robot_task_status_thread(self):
while True:
current_state: Optional[States] = self._check_shared_state(self.queues.state)

if not current_state in [States.Monitor, States.Paused, States.Stop]:
# Here we exit the thread as the mission is done
break

current_task: Optional[Task] = self._check_shared_state(
self.queues.state_machine_current_task
)

failed_task_error: Optional[ErrorMessage] = None

if current_task == None:
continue

task_status: TaskStatus
try:
task_status = self.robot.task_status(current_task.id)
self.request_status_failure_counter = 0
except (
RobotCommunicationTimeoutException,
RobotCommunicationException,
) as e:
self.request_status_failure_counter += 1

if (
self.request_status_failure_counter
>= self.request_status_failure_counter_limit
):
self.logger.error(
f"Failed to get task status "
f"{self.request_status_failure_counter} times because: "
f"{e.error_description}"
)
self.logger.error(
f"Monitoring task failed because: {e.error_description}"
)
failed_task_error = ErrorMessage(
error_reason=e.error_reason,
error_description=e.error_description,
)

else:
time.sleep(settings.FSM_SLEEP_TIME)
continue

except RobotTaskStatusException as e:
failed_task_error = ErrorMessage(
error_reason=e.error_reason,
error_description=e.error_description,
)

except RobotException as e:
failed_task_error = ErrorMessage(
error_reason=e.error_reason,
error_description=e.error_description,
)

if failed_task_error != None:
self._trigger_event(
self.queues.robot_task_status_failed,
failed_task_error,
)
task_status = TaskStatus.Failed
else:
self._trigger_event(self.queues.robot_task_status, task_status)
self.current_status = task_status
time.sleep(settings.FSM_SLEEP_TIME)
self.start_mission_thread: Optional[RobotStartMissionOrTaskThread] = None
self.robot_status_thread: Optional[RobotStatusThread] = None
self.robot_task_status_thread: Optional[RobotTaskStatusThread] = None
self.signal_thread_quitting: Event = Event()

def stop(self) -> None:
if self.robot_status_thread:
self.robot_status_thread.wait_for_thread()
if self.robot_task_status_thread:
self.robot_task_status_thread.wait_for_thread()
if self.start_mission_thread:
self.start_mission_thread.wait_for_thread()
self.signal_thread_quitting.set()
if self.robot_status_thread != None and self.robot_status_thread.is_alive():
self.robot_status_thread.join()
if (
self.robot_task_status_thread != None
and self.robot_status_thread.is_alive()
):
self.robot_task_status_thread.join()
if self.start_mission_thread != None and self.robot_status_thread.is_alive():
self.start_mission_thread.join()
self.robot_status_thread = None
self.robot_task_status_thread = None
self.start_mission_thread = None

def run_start_mission_thread(self, mission_or_task: Mission | Task):
retries = 0
started_mission = False
try:
while not started_mission:
try:
if settings.RUN_MISSION_BY_TASK:
self.robot.initiate_task(
mission_or_task
)
else:
self.robot.initiate_mission(mission_or_task)
except RobotException as e:
retries += 1
self.logger.warning(
f"Initiating failed #: {str(retries)} "
f"because: {e.error_description}"
)

if retries >= settings.INITIATE_FAILURE_COUNTER_LIMIT:
error_description = (
f"Mission will be cancelled after failing to initiate "
f"{settings.INITIATE_FAILURE_COUNTER_LIMIT} times because: "
f"{e.error_description}"
)

self._trigger_event(
self.queues.robot_mission_failed,
ErrorMessage(
error_reason=e.error_reason,
error_description=error_description,
),
)
started_mission = True
except RobotInfeasibleTaskException as e:
error_description = (
f"Failed to initiate task "
f"{str(mission_or_task.id)[:8]} after retrying "
f"{retries} times because: "
f"{e.error_description}"
)
self._trigger_event(
self.queues.robot_task_status_failed,
ErrorMessage(
error_reason=e.error_reason, error_description=error_description
),
)
except RobotInfeasibleMissionException as e:
self._trigger_event(
self.queues.robot_mission_failed,
ErrorMessage(
error_reason=e.error_reason, error_description=e.error_description
),
)
self.robot_task_status_thread = ThreadedRequest(
request_func=self.run_robot_task_status_thread
)
self.robot_task_status_thread.start_thread(name="Robot task status thread")
self.signal_thread_suicide = False

def handle_stop_mission(self):
return NotImplementedError()
Expand All @@ -229,33 +45,43 @@ def handle_pause_mission(self):
return NotImplementedError()

def run(self) -> None:
self.robot_status_thread = ThreadedRequest(
request_func=self.run_robot_status_thread
self.robot_status_thread = RobotStatusThread(
self.queues, self.robot, self.signal_thread_quitting
)
self.robot_status_thread.start_thread(name="Robot status thread")
self.robot_status_thread.start()

self.robot_task_status_thread = RobotTaskStatusThread(
self.queues, self.robot, self.signal_thread_quitting
)
self.robot_task_status_thread.start()

while True:
start_mission_or_task = self._check_for_event(
if self.signal_thread_quitting.is_set():
break
start_mission_or_task = check_for_event(
self.queues.state_machine_start_mission
)
if start_mission_or_task != None:
if (
self.start_mission_thread != None
and self.start_mission_thread._is_thread_alive()
and self.start_mission_thread.is_alive()
):
self.start_mission_thread.wait_for_thread()
self.start_mission_thread = ThreadedRequest(
request_func=self.run_start_mission_thread,
)
self.start_mission_thread.start_thread(
start_mission_or_task, name="Robot start mission thread"
self.start_mission_thread.join()
self.start_mission_thread = RobotStartMissionOrTaskThread(
self.queues,
self.robot,
self.signal_thread_quitting,
start_mission_or_task,
)
self.start_mission_thread.start()
# TODO: consider what happens when we get another start_mission while the start_mission_thread is running.
if self._check_for_event(self.queues.state_machine_stop_mission):
if check_for_event(self.queues.state_machine_stop_mission):
if (
self.start_mission_thread != None
and self.start_mission_thread._is_thread_alive()
and self.start_mission_thread.is_alive()
):
self.start_mission_thread.wait_for_thread()
self.start_mission_thread.join()
self.handle_stop_mission()
if self._check_for_event(self.queues.state_machine_pause_mission):
if check_for_event(self.queues.state_machine_pause_mission):
self.handle_pause_mission()
self.logger.info("Exiting robot service main thread")
22 changes: 22 additions & 0 deletions src/isar/robot/robot_queue_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import queue
from typing import Any
from isar.models.communication.queues.status_queue import StatusQueue
from isar.models.communication.queues.queue_io import QueueIO


def trigger_event(queueio: QueueIO, data: any = None) -> Any:
queueio.input.put(data if data != None else True)


def check_shared_state(queueio: StatusQueue):
try:
return queueio.check()
except queue.Empty:
return None


def check_for_event(queueio: QueueIO) -> Any:
try:
return queueio.input.get(block=False)
except queue.Empty:
return None
Loading

0 comments on commit 730167e

Please sign in to comment.