diff --git a/fabric_cf/actor/core/kernel/kernel.py b/fabric_cf/actor/core/kernel/kernel.py index d853cafe..81b67f79 100644 --- a/fabric_cf/actor/core/kernel/kernel.py +++ b/fabric_cf/actor/core/kernel/kernel.py @@ -341,7 +341,8 @@ def modify_lease(self, *, reservation: ABCReservationMixin): finally: reservation.unlock() - def extend_reservation(self, *, rid: ID, resources: ResourceSet, term: Term, dependencies: List[ABCReservationMixin]) -> int: + def extend_reservation(self, *, rid: ID, resources: ResourceSet, term: Term, + dependencies: List[ABCReservationMixin]) -> int: """ Extends the reservation with the given resources and term. @param rid reservation identifier of reservation to extend @@ -365,7 +366,9 @@ def extend_reservation(self, *, rid: ID, resources: ResourceSet, term: Term, dep raise KernelException(Constants.PENDING_OPERATION_ERROR) if isinstance(real, ReservationClient) and dependencies is not None: - real.redeem_predecessors.clear() + # Modify case - clear old dependencies + if resources.get_sliver() is not None: + real.redeem_predecessors.clear() for d in dependencies: real.add_redeem_predecessor(reservation=d) diff --git a/fabric_cf/actor/core/kernel/reservation_client.py b/fabric_cf/actor/core/kernel/reservation_client.py index 01360a32..cfac6a81 100644 --- a/fabric_cf/actor/core/kernel/reservation_client.py +++ b/fabric_cf/actor/core/kernel/reservation_client.py @@ -511,7 +511,7 @@ def prepare_ticket(self, extend: bool = False): parent_res = pred_state.get_reservation() if Constants.PEERED in value1: - self.logger.debug(f"KOMAL --- Node MAP:{ifs.get_node_map()} Result: {result}") + self.logger.debug(f"Node MAP:{ifs.get_node_map()} Result: {result}") if parent_res is not None: ns_sliver = parent_res.get_resources().get_sliver() # component_name contains => Peered:: @@ -553,6 +553,56 @@ def prepare_ticket(self, extend: bool = False): self.logger.trace(f"Updated Network Res# {self.get_reservation_id()} {sliver}") + def approve_extend_ticket(self) -> Tuple[bool, bool]: + """ + ExtendTicket predicate: invoked internally to determine if the reservation + should be extended. This gives subclasses an opportunity sequence actions at the orchestrator side. + + If false, the reservation enters a "BlockedTicket" sub-state until a subsequent approve_ticket returns true. + When true, the reservation can manipulate the current reservation's attributes to + facilitate ticketing from the broker. Note that approve_ticket may be polled multiple + times, and should be idempotent. + + @return tuple (true if approved; false otherwise, list of failed predecessors in case of false) + """ + approved = True + rollback = False + failed_preds = [] + for pred_state in self.redeem_predecessors.values(): + pred_reservation = pred_state.get_reservation() + if pred_reservation is None: + self.logger.error(f"redeem predecessor reservation is null, ignoring it: {pred_reservation}") + continue + + pred_reservation_term = pred_reservation.get_term() + if pred_reservation_term is None: + self.logger.error(f"redeem predecessor reservation term is null, ignoring it: {pred_reservation}") + continue + + if pred_reservation.is_failed() or pred_reservation.is_closed() or pred_reservation.is_closing(): + msg = f"redeem predecessor reservation# {pred_reservation.get_reservation_id()}"\ + f" is in a terminal state, failing the reservation# {self.get_reservation_id()}" + self.logger.error(msg) + + # In case of modify, roll back to the previous state and do not fail the reservation + if self.is_active() or self.is_active_joined() or self.is_active_ticketed(): + failed_preds.append(str(pred_reservation.get_reservation_id())) + approved = False + break + else: + self.fail(message=msg) + + if not (pred_reservation.is_ticketed() or pred_reservation.is_active()) or \ + pred_reservation.is_extending_ticket(): + approved = False + break + + if self.get_approved_term().ends_after(date=pred_reservation_term.get_end_time()): + rollback = True + break + + return approved, rollback + def approve_ticket(self, extend: bool = False) -> Tuple[bool, List[str]]: """ Ticket predicate: invoked internally to determine if the reservation @@ -594,6 +644,17 @@ def approve_ticket(self, extend: bool = False) -> Tuple[bool, List[str]]: return approved, failed_preds + def can_extend(self) -> bool: + ret_val = False + if self.get_type() is not None: + resource_type_str = str(self.get_type()) + if resource_type_str in Constants.SUPPORTED_SERVICES_STR: + ret_val, rollback = self.approve_extend_ticket() + else: + ret_val = True + + return ret_val + def can_ticket(self, extend: bool = False) -> bool: ret_val = False if self.get_type() is not None: @@ -754,12 +815,19 @@ def extend_ticket(self, *, actor: ABCActorMixin): self.error(err="Wrong state to initiate extend ticket: {}".format(ReservationStates(self.state).name)) # Extend Ticket is invoked by Probe; Check dependencies only in case of modify + if self.requested_resources.sliver is not None: + if not self.can_ticket(extend=True): + self.transition_with_join(prefix="Extend ticket blocked", state=self.state, + pending=self.pending_state, join_state=JoinState.BlockedExtendTicket) + self.logger.info("Reservation has to wait for the dependencies to be extended!") + return # No new sliver is passed for renew and does not require dependency check - if self.requested_resources.sliver is not None and not self.can_ticket(extend=True): - self.transition_with_join(prefix="Extend ticket blocked", state=self.state, - pending=self.pending_state, join_state=JoinState.BlockedExtendTicket) - self.logger.info("Reservation has to wait for the dependencies to be extended!") - return + else: + if not self.can_extend(): + self.transition_with_join(prefix="Extend ticket blocked", state=self.state, + pending=self.pending_state, join_state=JoinState.BlockedExtendTicket) + self.logger.info("Reservation has to wait for the dependencies to be extended!") + return self.sequence_ticket_out += 1 RPCManagerSingleton.get().extend_ticket(reservation=self) @@ -951,18 +1019,6 @@ def prepare_probe(self): def prepare_redeem(self): assert self.resources is not None assert self.resources.sliver is not None - sliver = self.resources.sliver - - ''' - self.logger.info(f"Redeem prepared for Sliver: {sliver}") - if isinstance(sliver, NetworkServiceSliver) and sliver.interface_info is not None: - for ifs in sliver.interface_info.interfaces.values(): - self.logger.info(f"Interface Sliver: {ifs}") - - if isinstance(sliver, NodeSliver) and sliver.attached_components_info is not None: - for c in sliver.attached_components_info.devices.values(): - self.logger.info(f"Component: {c}") - ''' def probe_join_state(self): """ @@ -996,17 +1052,27 @@ def probe_join_state(self): # this is existing reservation, and the extend ticket is # blocked for a predecessor: see if we can get it going now. assert self.state == ReservationStates.Active + rollback = False + failed_preds = [] + + if self.requested_resources.sliver is not None: + status, failed_preds = self.approve_ticket(extend=True) + else: + status, rollback = self.approve_extend_ticket() - status, failed_preds = self.approve_ticket(extend=True) if status: - self.transition_with_join(prefix="unblock ticket", state=self.state, - pending=self.pending_state, join_state=JoinState.NoJoin) + if not rollback: + self.transition_with_join(prefix="unblock ticket", state=self.state, + pending=self.pending_state, join_state=JoinState.NoJoin) - # This is a regular request for modifying network resources to an upstream broker. - self.sequence_ticket_out += 1 - self.logger.debug(f"Issuing an extend ticket " - f"{sliver_to_str(sliver=self.get_requested_resources().get_sliver())}") - RPCManagerSingleton.get().extend_ticket(reservation=self) + # This is a regular request for modifying network resources to an upstream broker. + self.sequence_ticket_out += 1 + self.logger.debug(f"Issuing an extend ticket " + f"{sliver_to_str(sliver=self.get_requested_resources().get_sliver())}") + RPCManagerSingleton.get().extend_ticket(reservation=self) + else: + self.transition_with_join(prefix="failed ticket update", state=self.state, + pending=ReservationPendingStates.None_, join_state=JoinState.NoJoin) # Update ASM with Reservation Info self.update_slice_graph(sliver=self.resources.sliver) @@ -1555,7 +1621,7 @@ def validate_redeem(self): if self.term is None: self.error(err=Constants.NOT_SPECIFIED_PREFIX.format("term")) - def add_redeem_predecessor(self, *, reservation: ABCReservationMixin, filters: dict = None): + def add_redeem_predecessor(self, *, reservation: ABCReservationMixin, filters: dict = None, extend: bool = False): if reservation.get_reservation_id() not in self.redeem_predecessors: state = PredecessorState(reservation=reservation, filters=filters) self.redeem_predecessors[reservation.get_reservation_id()] = state @@ -1565,7 +1631,7 @@ def remove_redeem_predecessor(self, *, rid: ID): self.redeem_predecessors.pop(rid) def add_join_predecessor(self, *, predecessor: ABCReservationMixin, filters: dict = None): - if predecessor.get_reservation_id() not in self.redeem_predecessors: + if predecessor.get_reservation_id() not in self.join_predecessors: state = PredecessorState(reservation=predecessor, filters=filters) self.join_predecessors[predecessor.get_reservation_id()] = state diff --git a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py index e157a055..523831c3 100644 --- a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py +++ b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py @@ -1221,12 +1221,13 @@ def issue_ticket(self, *, reservation: ABCBrokerReservation, units: int, rtype: return reservation def release(self, *, reservation): - duration = reservation.get_term().get_remaining_length() - if duration > 0: - from fabric_cf.actor.core.container.globals import GlobalsSingleton - if GlobalsSingleton.get().get_quota_mgr(): - GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation, - duration=duration) + if reservation.get_term(): + duration = reservation.get_term().get_remaining_length() + if duration > 0: + from fabric_cf.actor.core.container.globals import GlobalsSingleton + if GlobalsSingleton.get().get_quota_mgr(): + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation, + duration=duration) if isinstance(reservation, ABCBrokerReservation): self.logger.debug("Broker reservation") diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index da2982d4..75cf5c3b 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -24,12 +24,14 @@ # # Author: Komal Thareja (kthare10@renci.org) import logging +import re import threading from typing import Any from fabrictestbed.external_api.core_api import CoreApi from fabrictestbed.slice_editor import InstanceCatalog from fim.slivers.network_node import NodeSliver +from fim.user import NodeType from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin from fabric_cf.actor.core.policy.inventory_for_type import InventoryForType @@ -63,7 +65,7 @@ def list_quotas(self, project_uuid: str, offset: int = 0, limit: int = 200) -> d quota_list = self.core_api.list_quotas(project_uuid=project_uuid, offset=offset, limit=limit) quotas = {} for q in quota_list: - quotas[(q.get("resource_type").lower(), q.get("resource_unit").lower())] = q + quotas[(q.get("resource_type").get("name").lower(), q.get("resource_unit").lower())] = q return quotas def update_quota(self, reservation: ABCReservationMixin, duration: float): @@ -116,6 +118,15 @@ def update_quota(self, reservation: ABCReservationMixin, duration: float): finally: self.logger.debug("Released lock for quota update.") + @staticmethod + def __massage_name(name: str) -> str: + """ + Massage to make it python friendly + :param name: + :return: + """ + return re.sub(r'[ -]', '_', name) + @staticmethod def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str, str], float]: """ @@ -142,20 +153,26 @@ def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str, else: allocations = sliver.get_capacities() - # Extract Core, Ram, Disk Hours - requested_resources[("core", unit)] = requested_resources.get(("core", unit), 0) + \ - (duration * allocations.core) - requested_resources[("ram", unit)] = requested_resources.get(("ram", unit), 0) +\ - (duration * allocations.ram) - requested_resources[("disk", unit)] = requested_resources.get(("disk", unit), 0) + \ - (duration * allocations.disk) + if allocations: + # Extract Core, Ram, Disk Hours + requested_resources[("core", unit)] = requested_resources.get(("core", unit), 0) + \ + (duration * allocations.core) + requested_resources[("ram", unit)] = requested_resources.get(("ram", unit), 0) +\ + (duration * allocations.ram) + requested_resources[("disk", unit)] = requested_resources.get(("disk", unit), 0) + \ + (duration * allocations.disk) + + if sliver.get_type() == NodeType.Switch and allocations: + requested_resources["p4", unit] = requested_resources.get(("p4", unit), 0) + \ + (duration * allocations.unit) # Extract component hours (e.g., GPU, FPGA, SmartNIC) if sliver.attached_components_info: for c in sliver.attached_components_info.devices.values(): - component_type = str(c.get_type()).lower() - requested_resources[(component_type, unit)] = ( - requested_resources.get((component_type, unit), 0) + duration + type_model_name = '_'.join([QuotaMgr.__massage_name(str(c.get_type())), + QuotaMgr.__massage_name(str(c.get_model()))]) + requested_resources[(type_model_name.lower(), unit)] = ( + requested_resources.get((type_model_name.lower(), unit), 0) + duration ) return requested_resources @@ -207,3 +224,4 @@ def enforce_quota_limits(self, reservation: ABCReservationMixin, duration: float return True, None except Exception as e: self.logger.error(f"Error while checking reservation: {str(e)}") + return False, None diff --git a/tools/db_cli.py b/tools/db_cli.py index f6a46373..eb7dd812 100644 --- a/tools/db_cli.py +++ b/tools/db_cli.py @@ -29,6 +29,7 @@ from logging.handlers import RotatingFileHandler from fim.user import ServiceType +from fim.slivers.network_service import NetworkServiceSliver from fabric_cf.actor.core.kernel.reservation_states import ReservationStates from fim.graph.neo4j_property_graph import Neo4jGraphImporter, Neo4jPropertyGraph @@ -160,8 +161,12 @@ def get_reservations(self, slice_id: str = None, res_id: str = None, email: str print(f"REQ RES Sliver: {r.get_requested_resources()} {r.get_requested_resources().get_sliver()}") print(f"APPR RES Sliver: {r.get_approved_resources()} {r.get_approved_resources().get_sliver()}") from fabric_cf.actor.core.kernel.reservation_client import ReservationClient - if isinstance(r, ReservationClient): + if isinstance(r, ReservationClient) and r.get_leased_resources(): print(r.get_leased_resources().get_sliver()) + sliver = r.get_resources().get_sliver() + if isinstance(sliver, NetworkServiceSliver): + from fabric_cf.actor.core.util.utils import sliver_to_str + print(sliver_to_str(sliver=sliver)) print() else: print(f"No reservations found: {res_list}")