Skip to content

Commit

Permalink
Merge pull request #104 from anenriquez/refactor/recovery-methods
Browse files Browse the repository at this point in the history
Refactor recovery methods
  • Loading branch information
anenriquez authored Apr 9, 2020
2 parents f72e6a2 + 565d975 commit 650419d
Show file tree
Hide file tree
Showing 19 changed files with 290 additions and 289 deletions.
31 changes: 16 additions & 15 deletions mrs/allocation/auctioneer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, timetable_manager, closure_window=5, **kwargs):
self.tasks_to_allocate = dict()
self.allocated_tasks = dict()
self.allocations = list()
self.rounds = list()
self.allocation_times = list()
self.winning_bid = None
self.changed_timetable = list()
self.waiting_for_user_confirmation = list()
Expand Down Expand Up @@ -104,28 +104,29 @@ def process_alternative_timeslot(self, exception):
self.send_task_contract(bid.task_id, bid.robot_id)

def process_allocation(self):
task = self.tasks_to_allocate.pop(self.winning_bid.task_id)
try:
task = self.tasks_to_allocate.pop(self.winning_bid.task_id)
self.allocated_tasks[task.task_id] = task

self.timetable_manager.update_timetable(self.winning_bid.robot_id,
self.winning_bid.get_allocation_info(),
task)

allocation = (self.winning_bid.task_id, [self.winning_bid.robot_id])
self.logger.debug("Allocation: %s", allocation)
self.logger.debug("Tasks to allocate %s", [task_id for task_id, task in self.tasks_to_allocate.items()])

self.logger.debug("Updating task status to ALLOCATED")

self.allocations.append(allocation)
self.rounds.append(self.round)
self.finish_round()

except InvalidAllocation as e:
self.logger.warning("The allocation of task %s to robot %s is inconsistent. Aborting allocation."
"Task %s will be included in next allocation round", e.task_id, e.robot_id, e.task_id)
self.undo_allocation(self.winning_bid.get_allocation_info())
self.tasks_to_allocate[task.task_id] = task
return

self.allocated_tasks[task.task_id] = task

allocation = (self.winning_bid.task_id, [self.winning_bid.robot_id])
self.logger.debug("Allocation: %s", allocation)
self.logger.debug("Tasks to allocate %s", [task_id for task_id, task in self.tasks_to_allocate.items()])

self.logger.debug("Updating task status to ALLOCATED")

self.allocations.append(allocation)
self.allocation_times.append(self.round.time_to_allocate)
self.finish_round()

def undo_allocation(self, allocation_info):
self.logger.warning("Undoing allocation of round %s", self.round.id)
Expand Down
2 changes: 2 additions & 0 deletions mrs/allocation/bidder.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ def insert_in(self, insertion_point):
return True
except TaskNotFound as e:
return True
except DoesNotExist:
return False

def get_previous_location(self, insertion_point):
if insertion_point == 1:
Expand Down
15 changes: 9 additions & 6 deletions mrs/ccu.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import argparse
import logging.config
import time

from fmlib.models.tasks import TransportationTask as Task
from fmlib.models.actions import GoTo
from fmlib.models.tasks import TaskPlan
from fmlib.models.tasks import TransportationTask as Task
from ropod.structs.status import TaskStatus as TaskStatusConst

from mrs.allocation.auctioneer import Auctioneer
from mrs.config.configurator import Configurator
from mrs.config.params import get_config_params
from fmlib.models.actions import GoTo
from mrs.db.models.performance.robot import RobotPerformance
from mrs.db.models.performance.task import TaskPerformance
from mrs.execution.delay_recovery import DelayRecovery
Expand Down Expand Up @@ -76,6 +77,7 @@ def add_task_plan(self, task):
task.update_duration(mean, variance)

action = GoTo.create_new(type="PICKUP-TO-DELIVERY", locations=path)
action.update_duration(mean, variance)
task_plan = TaskPlan(actions=[action])
task.update_plan(task_plan)
self.logger.debug('Task plan of task %s updated', task.task_id)
Expand All @@ -94,16 +96,16 @@ def process_allocation(self):
task_schedule = self.auctioneer.get_task_schedule(task_id, robot_ids[0])
task.update_schedule(task_schedule)

allocation_round = self.auctioneer.rounds.pop(0)
self.update_allocation_metrics(allocation_round)
allocation_time = self.auctioneer.allocation_times.pop(0)
self.update_allocation_metrics(allocation_time)

for robot_id in robot_ids:
self.dispatcher.send_d_graph_update(robot_id)

def update_allocation_metrics(self, allocation_round):
def update_allocation_metrics(self, allocation_time):
allocation_info = self.auctioneer.winning_bid.get_allocation_info()
task = Task.get_task(allocation_info.new_task.task_id)
self.performance_tracker.update_allocation_metrics(task, allocation_round)
self.performance_tracker.update_allocation_metrics(task, allocation_time)
if allocation_info.next_task:
task = Task.get_task(allocation_info.next_task.task_id)
self.performance_tracker.update_allocation_metrics(task, only_constraints=True)
Expand All @@ -118,6 +120,7 @@ def run(self):
self.process_allocation()
self.performance_tracker.run()
self.api.run()
time.sleep(0.5)
except (KeyboardInterrupt, SystemExit):
self.api.shutdown()
self.simulator_interface.stop()
Expand Down
7 changes: 4 additions & 3 deletions mrs/config/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,20 @@ def get_stp_solver(self):
raise ValueError(allocation_method)
return STP(solver_name)

def configure_component(self, component_name, config):
def configure_component(self, component_name, config, **kwargs):
self.logger.debug("Creating %s", component_name)
component = self._component_modules.get(component_name)

if component and isinstance(config, dict):
self.register_component(component_name, component)
_component = component(**config, **self._components)
_component = component(**config, **self._components, **kwargs)
return _component

def __call__(self, **kwargs):
d_graph_watchdog = kwargs.get("d_graph_watchdog", False)
for component_name in self.config_order:
if component_name in self._component_modules:
component_config = kwargs.get(component_name, dict())
component = self.configure_component(component_name, component_config)
component = self.configure_component(component_name, component_config, d_graph_watchdog=d_graph_watchdog)
self._components[component_name] = component
return self._components
23 changes: 8 additions & 15 deletions mrs/config/default/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ fleet:
- robot_005

allocation_method: tessi-srea
approach: tessi-srea-corrective-re-allocate

planner:
map_name: brsu
Expand All @@ -33,6 +32,8 @@ delay_recovery:
type_: corrective
method: re-allocate

d_graph_watchdog: False

auctioneer:
closure_window: 1 # minutes
alternative_timeslots: False
Expand Down Expand Up @@ -62,15 +63,13 @@ ccu_api:
interface: null
groups:
- TASK-ALLOCATION
- ROPOD
message_types: # Types of messages the node will listen to. Messages not listed will be ignored
- START-TEST
- ROBOT-POSE
- BID
- NO-BID
- TASK-CONTRACT-ACKNOWLEDGEMENT
- TASK-STATUS
- RECOVER-TASK
acknowledge: false
debug_messages:
- 'TASK-REQUEST'
Expand All @@ -95,8 +94,8 @@ ccu_api:
msg_type: 'D-GRAPH-UPDATE'
groups: ['TASK-ALLOCATION']
method: whisper
remove-task:
msg_type: 'REMOVE-TASK'
remove-task-from-schedule:
msg_type: 'REMOVE-TASK-FROM-SCHEDULE'
groups: ['TASK-ALLOCATION']
method: whisper
callbacks:
Expand All @@ -110,8 +109,6 @@ ccu_api:
component: 'auctioneer.task_contract_acknowledgement_cb'
- msg_type: 'TASK-STATUS'
component: 'timetable_monitor.task_status_cb'
- msg_type: 'RECOVER-TASK'
component: 'timetable_monitor.recover_task_cb'
- msg_type: 'ROBOT-POSE'
component: 'fleet_monitor.robot_pose_cb'

Expand All @@ -125,16 +122,14 @@ robot_proxy_api:
interface: null
groups:
- TASK-ALLOCATION
- ROPOD
message_types: # Types of messages the node will listen to. Messages not listed will be ignored
- TASK
- TASK-ANNOUNCEMENT
- TASK-CONTRACT
- TASK-CONTRACT-CANCELLATION
- ROBOT-POSE
- TASK-STATUS
- RECOVER-TASK
- REMOVE-TASK
- REMOVE-TASK-FROM-SCHEDULE
debug_msgs: false
acknowledge: false
publish:
Expand Down Expand Up @@ -163,14 +158,12 @@ robot_proxy_api:
component: 'bidder.task_contract_cancellation_cb'
- msg_type: 'ROBOT-POSE'
component: '.robot_pose_cb'
- msg_type: 'REMOVE-TASK'
- msg_type: 'REMOVE-TASK-FROM-SCHEDULE'
component: '.remove_task_cb'
- msg_type: 'TASK'
component: '.task_cb'
- msg_type: 'TASK-STATUS'
component: '.task_status_cb'
- msg_type: 'RECOVER-TASK'
component: '.recover_task_cb'


robot_api:
Expand All @@ -194,9 +187,9 @@ robot_api:
task:
msg_type: 'TASK'
method: whisper
recover-task:
task-status:
groups: ['TASK-ALLOCATION']
msg_type: 'RECOVER-TASK'
msg_type: 'TASK-STATUS'
method: shout
callbacks:
- msg_type: 'D-GRAPH-UPDATE'
Expand Down
96 changes: 66 additions & 30 deletions mrs/execution/delay_recovery.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from ropod.structs.status import ActionStatus as ActionStatusConst


class RecoveryMethod:

options = ["re-allocate", "re-schedule-re-allocate", "re-schedule-abort", "abort"]
options = ["re-allocate", "abort"]

def __init__(self, name):
self.logger = logging.getLogger('mrs.recovery.method')
Expand All @@ -15,57 +16,85 @@ def validate_name(self, name):
raise ValueError(name)
return name

def recover(self, timetable, task, is_consistent):
def recover(self, timetable, task, task_progress, r_assigned_time, is_consistent):
next_task = timetable.get_next_task(task)

if next_task and (self.name == "re-allocate" and timetable.is_next_task_late(task, next_task)) \
or self.name.startswith("re-schedule") \
or next_task and self.name == "abort" and timetable.is_next_task_late(task, next_task):
if next_task and self.is_next_task_late(timetable, task, next_task, task_progress, r_assigned_time):
return next_task

def is_next_task_late(self, timetable, task, next_task, task_progress, r_assigned_time):
self.logger.debug("Checking if task %s is at risk", next_task.task_id)
mean = 0
variance = 0

action_idx = None
for i, action in enumerate(task.plan[0].actions):
if action.action_id == task_progress.action_id:
action_idx = i

if task_progress.action_status.status == ActionStatusConst.COMPLETED:
# The remaining actions do not include the current action
try:
remaining_actions = task.plan[0].actions[action_idx + 1:]
except IndentationError:
# No remaining actions left
remaining_actions = []
else:
# The remaining actions include the current action
remaining_actions = task.plan[0].actions[action_idx:]

print("Remaining actions")
for action in remaining_actions:
if action.duration:
mean += action.duration.mean
variance += action.duration.variance

estimated_duration = mean + 2 * round(variance ** 0.5, 3)
self.logger.debug("Remaining estimated task duration: %s ", estimated_duration)

node_id, node = timetable.dispatchable_graph.get_node_by_type(next_task.task_id, 'start')
latest_start_time_next_task = timetable.dispatchable_graph.get_node_latest_time(node_id)
self.logger.debug("Latest permitted start time of next task: %s ", latest_start_time_next_task)

estimated_start_time_of_next_task = r_assigned_time + estimated_duration
self.logger.debug("Estimated start time of next task: %s ", estimated_start_time_of_next_task)

if estimated_start_time_of_next_task > latest_start_time_next_task:
self.logger.warning("Task %s is at risk", next_task.task_id)
return True
return False
else:
self.logger.debug("Task %s is not at risk", next_task.task_id)
return False


class Corrective(RecoveryMethod):

""" Maps allocation methods with their available corrective measures """

reactions = {'tessi': ["re-allocate", "abort"],
'tessi-srea': ["re-allocate", "abort"],
'tessi-dsc': ["re-allocate", "abort"],
}

def __init__(self, name, allocation_method):
def __init__(self, name):
super().__init__(name)
if self.name not in self.reactions.get(allocation_method):
raise ValueError(name)

def recover(self, timetable, task, is_consistent):
def recover(self, timetable, task, task_progress, r_assigned_time, is_consistent):
""" React only if the last assignment was inconsistent
"""
if is_consistent:
return False
elif not is_consistent and super().recover(timetable, task, is_consistent):
return True
return None
elif not is_consistent:
return super().recover(timetable, task, task_progress, r_assigned_time, is_consistent)


class Preventive(RecoveryMethod):

""" Maps allocation methods with their available preventive measures """

reactions = {'tessi': ["re-allocate", "abort"],
'tessi-srea': ["re-allocate", "re-schedule-re-allocate", "re-schedule-abort", "abort"],
'tessi-dsc': ["re-allocate", "abort"],
}

def __init__(self, name, allocation_method):
def __init__(self, name):
super().__init__(name)
if self.name not in self.reactions.get(allocation_method):
raise ValueError(name)

def recover(self, timetable, task, is_consistent):
def recover(self, timetable, task, task_progress, r_assigned_time, is_consistent):
""" React both, when the last assignment was consistent and when it was inconsistent
"""
return super().recover(timetable, task, is_consistent)

return super().recover(timetable, task, task_progress, r_assigned_time, is_consistent)


class RecoveryMethodFactory:
Expand All @@ -89,6 +118,13 @@ def get_recovery_method(self, recovery_type):


class DelayRecovery:
def __init__(self, allocation_method, type_, method, **kwargs):
def __init__(self, type_, method, **kwargs):
cls_ = recovery_method_factory.get_recovery_method(type_)
self.method = cls_(method, allocation_method)
self.method = cls_(method)

@property
def name(self):
return self.method.name

def recover(self, timetable, task, task_progress, r_assigned_time, is_consistent):
return self.method.recover(timetable, task, task_progress, r_assigned_time, is_consistent)
Loading

0 comments on commit 650419d

Please sign in to comment.