diff --git a/.gitignore b/.gitignore
index ed2c6c8f..d8e639b1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -138,10 +138,7 @@ logs
 *_creds
 *.lock
 *.log
-neo4j1/
-neo4j2/
-neo4j3/
-neo4j4/
+neo4j*/
 pdp/
 pg_data/
 schema/
diff --git a/Dockerfile-auth b/Dockerfile-auth
index 302680c0..bd6c6596 100644
--- a/Dockerfile-auth
+++ b/Dockerfile-auth
@@ -1,7 +1,7 @@
 FROM python:3.11.0
 MAINTAINER Komal Thareja

-ARG HANDLERS_VER=1.7.1
+ARG HANDLERS_VER=1.8.0

 RUN mkdir -p /usr/src/app
 WORKDIR /usr/src/app
diff --git a/fabric_cf/__init__.py b/fabric_cf/__init__.py
index 1f6dc025..c3de882b 100644
--- a/fabric_cf/__init__.py
+++ b/fabric_cf/__init__.py
@@ -1,2 +1,2 @@
-__version__ = "1.7.0"
+__version__ = "1.8.0"
 __VERSION__ = __version__
diff --git a/fabric_cf/actor/boot/configuration.py b/fabric_cf/actor/boot/configuration.py
index 35ecd564..5171e928 100644
--- a/fabric_cf/actor/boot/configuration.py
+++ b/fabric_cf/actor/boot/configuration.py
@@ -45,6 +45,10 @@ def __init__(self, *, config: dict):
         if Constants.CONFIG_SECTION_O_AUTH in config:
             self.oauth = config.get(Constants.CONFIG_SECTION_O_AUTH)

+        self.smtp = {}
+        if Constants.CONFIG_SECTION_SMTP in config:
+            self.smtp = config.get(Constants.CONFIG_SECTION_SMTP)
+
         self.database = {}
         if Constants.CONFIG_SECTION_DATABASE in config:
             self.database = config.get(Constants.CONFIG_SECTION_DATABASE)
@@ -87,6 +91,12 @@ def get_oauth(self) -> dict:
         """
         return self.oauth

+    def get_smtp(self) -> dict:
+        """
+        Return smtp config
+        """
+        return self.smtp
+
     def get_database(self) -> dict:
         """
         Return database config
@@ -425,6 +435,10 @@ def get_oauth_config(self) -> dict:
             return self.global_config.get_oauth()
         return None

+    def get_smtp_config(self) -> dict:
+        if self.global_config:
+            return self.global_config.get_smtp()
+
     def get_actor_config(self) -> ActorConfig:
         """
         Return Actor Config
diff --git a/fabric_cf/actor/core/apis/abc_actor_management_object.py b/fabric_cf/actor/core/apis/abc_actor_management_object.py
index f993d081..da4da253 100644
--- a/fabric_cf/actor/core/apis/abc_actor_management_object.py
+++ b/fabric_cf/actor/core/apis/abc_actor_management_object.py
@@ -238,8 +238,8 @@ def get_sites(self, *, caller: AuthToken, site: str) -> ResultSitesAvro:
     def get_reservations(self, *, caller: AuthToken, states: List[int] = None,
                          slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None,
                          email: str = None, rid_list: List[str] = None, type: str = None,
-                         site: str = None, node_id: str = None,
-                         host: str = None, ip_subnet: str = None, full: bool = False) -> ResultReservationAvro:
+                         site: str = None, node_id: str = None, host: str = None, ip_subnet: str = None,
+                         full: bool = False, start: datetime = None, end: datetime = None) -> ResultReservationAvro:
         """
         Get Reservations
         @param states states
@@ -256,10 +256,30 @@ def get_reservations(self, *, caller: AuthToken, states: List[int] = None,
         @param site site
         @param node_id node id
         @param host host
         @param ip_subnet ip subnet
         @param full
+        @param start: start time
+        @param end: end time
         @return returns list of the reservations
         """

+    def get_components(self, *, node_id: str, rsv_type: list[str], states: list[int],
+                       component: str = None, bdf: str = None, start: datetime = None,
+                       end: datetime = None, excludes: List[str] = None) -> Dict[str, List[str]]:
+        """
+        Returns components matching the search criteria
+        @param node_id: Worker Node ID to which components belong
+        @param states: list of states used to find reservations
+        @param rsv_type: type of reservations
+        @param component: component name
+        @param bdf: Component's PCI address
+        @param start: start time
+        @param end: end time
+        @param excludes: list of reservation ids to exclude from the search
+        NOTE# For P4 switches; node_id=node+renc-p4-sw component=ip+192.168.11.8 bdf=p1
+
+        @return Dictionary with component name as the key and value as list of associated PCI addresses in use.
+        """
+
     def get_slices(self, *, slice_id: ID, caller: AuthToken, slice_name: str = None, email: str = None,
                    states: List[int] = None, project: str = None, limit: int = None,
                    offset: int = None, user_id: str = None, search: str = None,
diff --git a/fabric_cf/actor/core/apis/abc_mgmt_actor.py b/fabric_cf/actor/core/apis/abc_mgmt_actor.py
index c4a675c9..27662195 100644
--- a/fabric_cf/actor/core/apis/abc_mgmt_actor.py
+++ b/fabric_cf/actor/core/apis/abc_mgmt_actor.py
@@ -26,6 +26,7 @@
 from __future__ import annotations

 from abc import abstractmethod
+from datetime import datetime
 from typing import TYPE_CHECKING, List, Tuple, Dict

 from fabric_mb.message_bus.messages.delegation_avro import DelegationAvro
@@ -151,7 +152,8 @@ def accept_update_slice(self, *, slice_id: ID) -> bool:
     def get_reservations(self, *, states: List[int] = None, slice_id: ID = None,
                          rid: ID = None, oidc_claim_sub: str = None, email: str = None,
                          rid_list: List[str] = None, type: str = None, site: str = None, node_id: str = None,
-                         host: str = None, ip_subnet: str = None, full: bool = False) -> List[ReservationMng]:
+                         host: str = None, ip_subnet: str = None, full: bool = False,
+                         start: datetime = None, end: datetime = None) -> List[ReservationMng]:
         """
         Get Reservations
         @param states states
@@ -166,10 +168,31 @@ def get_reservations(self, *, states: List[int] = None, slice_id: ID = None,
         @param ip_subnet ip subnet
         @param host host
         @param full
+        @param start: start time
+        @param end: end time
         Obtains all reservations
         @return returns list of the reservations
         """

+    def get_components(self, *, node_id: str, rsv_type: list[str], states: list[int],
+                       component: str = None, bdf: str = None, start: datetime = None,
+                       end: datetime = None, excludes: List[str] = None) -> Dict[str, List[str]]:
+        """
+        Returns components matching the search criteria
+        @param node_id: Worker Node ID to which components belong
+        @param states: list of states used to find reservations
+        @param rsv_type: type of reservations
+        @param component: component name
+        @param bdf: Component's PCI address
+        @param start: start time
+        @param end: end time
+        @param excludes: list of reservation ids to exclude from the search
+        NOTE# For P4 switches; node_id=node+renc-p4-sw component=ip+192.168.11.8 bdf=p1
+
+        @return Dictionary with component name as the key and value as list of associated PCI addresses in use.
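+
+        Example (illustrative only; node/component names, states and addresses are hypothetical):
+            actor.get_components(node_id="node+site1-w1", rsv_type=["node"],
+                                 states=[ReservationStates.Active.value])
+            -> {"nic1": ["0000:41:00.0", "0000:41:00.1"]}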
+ """ + raise NotImplementedError + @abstractmethod def get_sites(self, *, site: str) -> List[SiteAvro] or None: """ diff --git a/fabric_cf/actor/core/common/constants.py b/fabric_cf/actor/core/common/constants.py index e6bb9dad..c4b4cf3b 100644 --- a/fabric_cf/actor/core/common/constants.py +++ b/fabric_cf/actor/core/common/constants.py @@ -173,6 +173,8 @@ class Constants: PROPERTY_CONF_O_AUTH_TRL_REFRESH = "trl-refresh" PROPERTY_CONF_O_AUTH_VERIFY_EXP = "verify-exp" + CONFIG_SECTION_SMTP = "smtp" + CONFIG_SECTION_DATABASE = "database" PROPERTY_CONF_DB_USER = "db-user" PROPERTY_CONF_DB_PASSWORD = "db-password" @@ -302,10 +304,11 @@ class Constants: USER_SSH_KEY = "user.ssh.key" ALGORITHM = 'algorithm' + CORE_CAPACITY_THRESHOLD = "core_capacity_threshold" # Orchestrator Lease params TWO_WEEKS = timedelta(days=15) - DEFAULT_MAX_DURATION = TWO_WEEKS + DEFAULT_MAX_DURATION_IN_WEEKS = TWO_WEEKS LEASE_TIME_FORMAT = "%Y-%m-%d %H:%M:%S %z" DEFAULT_LEASE_IN_HOURS = 24 LONG_LIVED_SLICE_TIME_WEEKS = timedelta(weeks=26) diff --git a/fabric_cf/actor/core/core/policy.py b/fabric_cf/actor/core/core/policy.py index a7f8e563..b7a34fc9 100644 --- a/fabric_cf/actor/core/core/policy.py +++ b/fabric_cf/actor/core/core/policy.py @@ -23,6 +23,9 @@ # # # Author: Komal Thareja (kthare10@renci.org) +import enum +from enum import Enum + from fabric_cf.actor.boot.configuration import ActorConfig from fabric_cf.actor.core.apis.abc_actor_mixin import ABCActorMixin from fabric_cf.actor.core.apis.abc_delegation import ABCDelegation @@ -36,6 +39,19 @@ from fabric_cf.actor.core.kernel.resource_set import ResourceSet +class AllocationAlgorithm(Enum): + FirstFit = enum.auto() + BestFit = enum.auto() + WorstFit = enum.auto() + Random = enum.auto() + + def __repr__(self): + return self.name + + def __str__(self): + return self.name + + class Policy(ABCPolicy): """ Base class for all policy implementations. diff --git a/fabric_cf/actor/core/kernel/reservation_client.py b/fabric_cf/actor/core/kernel/reservation_client.py index 43c3dda8..8be75693 100644 --- a/fabric_cf/actor/core/kernel/reservation_client.py +++ b/fabric_cf/actor/core/kernel/reservation_client.py @@ -1004,7 +1004,8 @@ def probe_join_state(self): # This is a regular request for modifying network resources to an upstream broker. 
self.sequence_ticket_out += 1 - print(f"Issuing an extend ticket {sliver_to_str(sliver=self.get_requested_resources().get_sliver())}") + self.logger.debug(f"Issuing an extend ticket " + f"{sliver_to_str(sliver=self.get_requested_resources().get_sliver())}") RPCManagerSingleton.get().extend_ticket(reservation=self) # Update ASM with Reservation Info diff --git a/fabric_cf/actor/core/manage/actor_management_object.py b/fabric_cf/actor/core/manage/actor_management_object.py index 45d9e4e6..efc4f09c 100644 --- a/fabric_cf/actor/core/manage/actor_management_object.py +++ b/fabric_cf/actor/core/manage/actor_management_object.py @@ -428,11 +428,17 @@ def get_sites(self, *, caller: AuthToken, site: str) -> ResultSitesAvro: return result + def get_components(self, *, node_id: str, rsv_type: list[str], states: list[int], + component: str = None, bdf: str = None, start: datetime = None, + end: datetime = None, excludes: List[str] = None) -> Dict[str, List[str]]: + return self.db.get_components(node_id=node_id, rsv_type=rsv_type, states=states, + component=component, bdf=bdf, start=start, end=end, excludes=excludes) + def get_reservations(self, *, caller: AuthToken, states: List[int] = None, slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None, type: str = None, - site: str = None, node_id: str = None, host: str = None, - ip_subnet: str = None, full: bool = False) -> ResultReservationAvro: + site: str = None, node_id: str = None, host: str = None, ip_subnet: str = None, + full: bool = False, start: datetime = None, end: datetime = None) -> ResultReservationAvro: result = ResultReservationAvro() result.status = ResultAvro() @@ -452,7 +458,8 @@ def get_reservations(self, *, caller: AuthToken, states: List[int] = None, else: res_list = self.db.get_reservations(slice_id=slice_id, rid=rid, email=email, states=states, rsv_type=rsv_type, site=site, - graph_node_id=node_id, host=host, ip_subnet=ip_subnet) + graph_node_id=node_id, host=host, ip_subnet=ip_subnet, + start=start, end=end) except Exception as e: self.logger.error("getReservations:db access {}".format(e)) result.status.set_code(ErrorCodes.ErrorDatabaseError.value) diff --git a/fabric_cf/actor/core/manage/kafka/kafka_actor.py b/fabric_cf/actor/core/manage/kafka/kafka_actor.py index 91603113..92376a0a 100644 --- a/fabric_cf/actor/core/manage/kafka/kafka_actor.py +++ b/fabric_cf/actor/core/manage/kafka/kafka_actor.py @@ -25,6 +25,7 @@ # Author: Komal Thareja (kthare10@renci.org) from __future__ import annotations +from datetime import datetime from typing import List from fabric_mb.message_bus.messages.close_delegations_avro import CloseDelegationsAvro @@ -132,7 +133,8 @@ def delete_slice(self, *, slice_id: ID) -> bool: def get_reservations(self, *, states: List[int] = None, slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None, type: str = None, site: str = None, node_id: str = None, - host: str = None, ip_subnet: str = None, full: bool = False) -> List[ReservationMng]: + host: str = None, ip_subnet: str = None, full: bool = False, + start: datetime = None, end: datetime = None) -> List[ReservationMng]: request = GetReservationsRequestAvro() request = self.fill_request_by_id_message(request=request, slice_id=slice_id, states=states, email=email, rid=rid, diff --git a/fabric_cf/actor/core/manage/local/local_actor.py b/fabric_cf/actor/core/manage/local/local_actor.py index d8ef51c7..3fb4ff83 100644 --- 
a/fabric_cf/actor/core/manage/local/local_actor.py +++ b/fabric_cf/actor/core/manage/local/local_actor.py @@ -26,6 +26,7 @@ from __future__ import annotations import traceback +from datetime import datetime from typing import TYPE_CHECKING, List, Tuple, Dict from fabric_mb.message_bus.messages.delegation_avro import DelegationAvro @@ -44,12 +45,11 @@ from fabric_mb.message_bus.messages.reservation_mng import ReservationMng from fabric_mb.message_bus.messages.reservation_state_avro import ReservationStateAvro from fabric_mb.message_bus.messages.slice_avro import SliceAvro - from fabric_cf.actor.core.manage.management_object import ManagementObject from fabric_cf.actor.security.auth_token import AuthToken class LocalActor(LocalProxy, ABCMgmtActor): - def __init__(self, *, manager: ManagementObject, auth: AuthToken): + def __init__(self, *, manager: ActorManagementObject, auth: AuthToken): super().__init__(manager=manager, auth=auth) if not isinstance(manager, ActorManagementObject): @@ -111,13 +111,14 @@ def remove_slice(self, *, slice_id: ID) -> bool: def get_reservations(self, *, states: List[int] = None, slice_id: ID = None, rid: ID = None, oidc_claim_sub: str = None, email: str = None, rid_list: List[str] = None, type: str = None, site: str = None, node_id: str = None, - host: str = None, ip_subnet: str = None, full: bool = False) -> List[ReservationMng]: + host: str = None, ip_subnet: str = None, full: bool = False, + start: datetime = None, end: datetime = None) -> List[ReservationMng]: self.clear_last() try: result = self.manager.get_reservations(caller=self.auth, states=states, slice_id=slice_id, rid=rid, oidc_claim_sub=oidc_claim_sub, email=email, rid_list=rid_list, type=type, site=site, node_id=node_id, host=host, - ip_subnet=ip_subnet, full=full) + ip_subnet=ip_subnet, full=full, start=start, end=end) self.last_status = result.status if result.status.get_code() == 0: @@ -126,6 +127,16 @@ def get_reservations(self, *, states: List[int] = None, slice_id: ID = None, except Exception as e: self.on_exception(e=e, traceback_str=traceback.format_exc()) + def get_components(self, *, node_id: str, rsv_type: list[str], states: list[int], + component: str = None, bdf: str = None, start: datetime = None, + end: datetime = None, excludes: List[str] = None) -> Dict[str, List[str]]: + try: + return self.manager.get_components(node_id=node_id, rsv_type=rsv_type, states=states, + component=component, bdf=bdf, start=start, + end=end, excludes=excludes) + except Exception as e: + self.on_exception(e=e, traceback_str=traceback.format_exc()) + def get_sites(self, *, site: str) -> List[SiteAvro] or None: self.clear_last() try: diff --git a/fabric_cf/actor/core/manage/management_utils.py b/fabric_cf/actor/core/manage/management_utils.py index 2b2e8aad..1977b1fc 100644 --- a/fabric_cf/actor/core/manage/management_utils.py +++ b/fabric_cf/actor/core/manage/management_utils.py @@ -67,6 +67,7 @@ class ManagementUtils: @staticmethod def update_slice(*, slice_obj: ABCSlice, slice_mng: SliceAvro) -> ABCSlice: slice_obj.set_graph_id(graph_id=slice_mng.get_graph_id()) + slice_obj.set_lease_start(lease_start=slice_mng.get_lease_start()) slice_obj.set_lease_end(lease_end=slice_mng.get_lease_end()) slice_obj.set_config_properties(value=slice_mng.get_config_properties()) return slice_obj diff --git a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py index cac0a409..8ad4e5b8 100644 --- a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py 
+++ b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py @@ -38,8 +38,8 @@ from fim.pluggable import PluggableRegistry, PluggableType from fim.slivers.attached_components import ComponentSliver, ComponentType from fim.slivers.base_sliver import BaseSliver -from fim.slivers.capacities_labels import Labels -from fim.slivers.interface_info import InterfaceSliver, InterfaceType +from fim.slivers.capacities_labels import Labels, Capacities +from fim.slivers.interface_info import InterfaceType from fim.slivers.network_node import NodeSliver, NodeType from fim.slivers.network_service import NetworkServiceSliver, ServiceType, NSLayer from fim.slivers.path_info import Path @@ -50,6 +50,7 @@ from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin from fabric_cf.actor.core.common.constants import Constants from fabric_cf.actor.core.container.maintenance import Maintenance +from fabric_cf.actor.core.core.policy import AllocationAlgorithm from fabric_cf.actor.core.delegation.resource_ticket import ResourceTicketFactory from fabric_cf.actor.core.common.exceptions import BrokerException, ExceptionErrorCode from fabric_cf.actor.core.kernel.reservation_states import ReservationStates, ReservationOperation @@ -75,19 +76,6 @@ from fabric_cf.actor.core.apis.abc_broker_mixin import ABCBrokerMixin -class BrokerAllocationAlgorithm(Enum): - FirstFit = enum.auto() - BestFit = enum.auto() - WorstFit = enum.auto() - Random = enum.auto() - - def __repr__(self): - return self.name - - def __str__(self): - return self.name - - class BrokerSimplerUnitsPolicy(BrokerCalendarPolicy): """ BrokerSimplerUnitsPolicy is a simple implementation of the broker policy interface. @@ -516,7 +504,6 @@ def __candidate_nodes(self, *, sliver: NodeSliver) -> List[str]: props=node_props, comps=sliver.attached_components_info) - # Skip nodes without any delegations which would be data-switch in this case if sliver.get_type() == NodeType.Switch: exclude = [] for n in result: @@ -558,6 +545,45 @@ def __prune_nodes_in_maintenance(self, node_id_list: List[str], site: str, reser return node_id_list + def __reshuffle_nodes(self, node_id_list: List[str], node_id_to_reservations: dict, + term: Term) -> List[str]: + """ + Reshuffles nodes based on their usage compared to a given threshold. + + @param: node_id_list (list): List of node_ids + + @return: list: Reshuffled list of nodes, with nodes exceeding the threshold shuffled separately. 
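+
+        Example (illustrative): with a threshold of 75, candidates at 60%, 90% and
+        40% core usage are reordered to [60%, 40%, 90%], so lightly loaded
+        workers are offered first.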
+ """ + if len(node_id_list) == 1: + return node_id_list + + enabled, threshold = self.get_core_capacity_threshold() + if not enabled: + return node_id_list + + # Separate nodes based on whether their usage exceeds the threshold + above_threshold = [] + below_threshold = [] + + for node_id in node_id_list: + node, total, allocated = self.get_node_capacities(node_id=node_id, + node_id_to_reservations=node_id_to_reservations, + term=term) + if total and allocated: + self.logger.debug(f"Allocated: {allocated} Total: {total}") + cpu_usage_percent = int(((allocated.core * 100)/ total.core)) + self.logger.debug(f"CPU Usage for {node.get_name()}: {cpu_usage_percent}; " + f"threshold: {threshold}") + if cpu_usage_percent < threshold: + below_threshold.append(node_id) + else: + above_threshold.append(node_id) + + # Combine both shuffled lists (you can choose the order of combining) + reshuffled_nodes = below_threshold + above_threshold + + return reshuffled_nodes + def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dict, inv: NetworkNodeInventory, reservation: ABCBrokerReservation, term: Term, sliver: NodeSliver, operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver, Any]: @@ -631,9 +657,17 @@ def __allocate_nodes(self, *, reservation: ABCBrokerReservation, inv: NetworkNod @return tuple containing delegation id, sliver, error message if any """ delegation_id = None - node_id_list = self.__candidate_nodes(sliver=sliver) - if self.get_algorithm_type(site=sliver.site) == BrokerAllocationAlgorithm.Random: + node_id_list = FimHelper.candidate_nodes(combined_broker_model=self.combined_broker_model, + sliver=sliver) + if self.get_algorithm_type(site=sliver.site) == AllocationAlgorithm.Random: random.shuffle(node_id_list) + else: + # Reshuffle Nodes based on CPU Threshold only for VMs when no specific host is specified + if sliver.get_type() == NodeType.VM and (sliver.labels is None or + (sliver.labels and sliver.labels.instance_parent is None)): + node_id_list = self.__reshuffle_nodes(node_id_list=node_id_list, + node_id_to_reservations=node_id_to_reservations, + term=term) if len(node_id_list) == 0 and sliver.site not in self.combined_broker_model.get_sites(): error_msg = f'Unknown site {sliver.site} requested for {reservation}' @@ -812,8 +846,8 @@ def __allocate_services(self, *, rid: ID, inv: NetworkServiceInventory, sliver: device_name = owner_switch.get_name() if device_name == Constants.AL2S: - delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations= - net_cp.get_label_delegations()) + delegation_id, delegated_label = FimHelper.get_delegations(delegations= + net_cp.get_label_delegations()) device_name = delegated_label.device_name local_name = delegated_label.local_name @@ -893,11 +927,11 @@ def __allocate_services(self, *, rid: ID, inv: NetworkServiceInventory, sliver: owner_mpls_ns = ns break if owner_ns and ServiceType.MPLS == owner_ns.get_type(): - delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations= - owner_switch.get_label_delegations()) + delegation_id, delegated_label = FimHelper.get_delegations(delegations= + owner_switch.get_label_delegations()) else: - delegation_id, delegated_label = InventoryForType.get_delegations(lab_cap_delegations= - owner_ns.get_label_delegations()) + delegation_id, delegated_label = FimHelper.get_delegations(delegations= + owner_ns.get_label_delegations()) # Set the Subnet and gateway from the Owner Switch (a) existing_reservations = 
self.get_existing_reservations(node_id=owner_ns_id, @@ -1673,18 +1707,64 @@ def unmerge_adm(self, *, graph_id: str): self.combined_broker_model.rollback(graph_id=snapshot_graph_id) raise e - def get_algorithm_type(self, site: str) -> BrokerAllocationAlgorithm: + def get_algorithm_type(self, site: str) -> AllocationAlgorithm: if self.properties is not None: algorithms = self.properties.get(Constants.ALGORITHM, None) - random_algo = algorithms.get(str(BrokerAllocationAlgorithm.Random)) + random_algo = algorithms.get(str(AllocationAlgorithm.Random)) if random_algo and random_algo.get('enabled') and random_algo.get('sites') and \ site in random_algo.get('sites'): - return BrokerAllocationAlgorithm.Random - first_fit_algo = algorithms.get(BrokerAllocationAlgorithm.Random.name) + return AllocationAlgorithm.Random + first_fit_algo = algorithms.get(AllocationAlgorithm.Random.name) if first_fit_algo and first_fit_algo.get('enabled'): - return BrokerAllocationAlgorithm.FirstFit - return BrokerAllocationAlgorithm.FirstFit + return AllocationAlgorithm.FirstFit + return AllocationAlgorithm.FirstFit + def get_core_capacity_threshold(self) -> Tuple[bool, int]: + if self.properties is not None: + core_capacity_threshold = self.properties.get(Constants.CORE_CAPACITY_THRESHOLD, None) + if core_capacity_threshold and core_capacity_threshold.get('enabled'): + core_usage_threshold_percent = core_capacity_threshold.get('core_usage_threshold_percent', 75) + return True, core_usage_threshold_percent + return False, 0 + + def get_node_capacities(self, node_id: str, node_id_to_reservations: dict, + term: Term) -> Tuple[NodeSliver, Capacities, Capacities]: + """ + Get Node capacities - total as well as allocated capacities + @param node_id: Node Id + @param node_id_to_reservations: Reservations assigned as part of this bid + @param term: Term + @return: Tuple containing node, total and allocated capacity + """ + try: + graph_node = self.get_network_node_from_graph(node_id=node_id) + existing_reservations = self.get_existing_reservations(node_id=node_id, + node_id_to_reservations=node_id_to_reservations, + start=term.get_start_time(), + end=term.get_end_time()) + + delegation_id, delegated_capacity = FimHelper.get_delegations( + delegations=graph_node.get_capacity_delegations()) + + allocated_capacity = Capacities() + + if existing_reservations: + for reservation in existing_reservations: + # For Active or Ticketed or Ticketing reservations; reduce the counts from available + resource_sliver = None + if reservation.is_ticketing() and reservation.get_approved_resources() is not None: + resource_sliver = reservation.get_approved_resources().get_sliver() + + if (reservation.is_active() or reservation.is_ticketed()) and \ + reservation.get_resources() is not None: + resource_sliver = reservation.get_resources().get_sliver() + + if resource_sliver is not None and isinstance(resource_sliver, NodeSliver): + allocated_capacity += resource_sliver.get_capacity_allocations() + + return graph_node, delegated_capacity, allocated_capacity + except Exception as e: + self.logger.error(f"Failed to determine node capacities: {node_id}, error: {e}") if __name__ == '__main__': policy = BrokerSimplerUnitsPolicy() diff --git a/fabric_cf/actor/core/policy/inventory_for_type.py b/fabric_cf/actor/core/policy/inventory_for_type.py index 719d3199..a8ec6b5c 100644 --- a/fabric_cf/actor/core/policy/inventory_for_type.py +++ b/fabric_cf/actor/core/policy/inventory_for_type.py @@ -26,11 +26,10 @@ from __future__ import annotations from abc import 
abstractmethod -from ctypes import Union -from typing import Tuple -from fim.slivers.capacities_labels import Labels, Capacities -from fim.slivers.delegations import DelegationFormat, Delegations +from fim.slivers.base_sliver import BaseSliver + +from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin class InventoryForType: @@ -49,18 +48,6 @@ def __setstate__(self, state): self.__dict__.update(state) self.logger = None - @staticmethod - def get_delegations(*, lab_cap_delegations: Delegations) -> Tuple[str or None, Union[Labels, Capacities] or None]: - # Grab Label Delegations - delegation_id, deleg = lab_cap_delegations.get_sole_delegation() - #self.logger.debug(f"Available label/capacity delegations: {deleg} format {deleg.get_format()}") - # ignore pool definitions and references for now - if deleg.get_format() != DelegationFormat.SinglePool: - return None, None - # get the Labels/Capacities object - delegated_label_capacity = deleg.get_details() - return delegation_id, delegated_label_capacity - @abstractmethod def free(self, *, count: int, request: dict = None, resource: dict = None) -> dict: """ @@ -70,3 +57,16 @@ def free(self, *, count: int, request: dict = None, resource: dict = None) -> di @param resource resource properties @return new resource properties """ + + @staticmethod + def _get_allocated_sliver(reservation: ABCReservationMixin) -> BaseSliver: + """ + Retrieve the allocated sliver from the reservation. + + :param reservation: An instance of ABCReservationMixin representing the reservation to retrieve the sliver from. + :return: The allocated NetworkServiceSliver if available, otherwise None. + """ + if reservation.is_ticketing() and reservation.get_approved_resources() is not None: + return reservation.get_approved_resources().get_sliver() + if (reservation.is_active() or reservation.is_ticketed()) and reservation.get_resources() is not None: + return reservation.get_resources().get_sliver() diff --git a/fabric_cf/actor/core/policy/network_node_inventory.py b/fabric_cf/actor/core/policy/network_node_inventory.py index 15fc33d0..df3e4895 100644 --- a/fabric_cf/actor/core/policy/network_node_inventory.py +++ b/fabric_cf/actor/core/policy/network_node_inventory.py @@ -23,8 +23,10 @@ # # # Author: Komal Thareja (kthare10@renci.org) +import logging from typing import Tuple, List, Dict +from fabric_cf.actor.fim.fim_helper import FimHelper from fim.slivers.attached_components import AttachedComponentsInfo, ComponentSliver, ComponentType from fim.slivers.base_sliver import BaseSliver from fim.slivers.capacities_labels import Capacities, Labels @@ -43,20 +45,23 @@ class NetworkNodeInventory(InventoryForType): - def __check_capacities(self, *, rid: ID, requested_capacities: Capacities, delegated_capacities: Delegations, - existing_reservations: List[ABCReservationMixin]) -> str or None: + @staticmethod + def check_capacities(*, rid: ID, requested_capacities: Capacities, delegated: Delegations, + existing_reservations: List[ABCReservationMixin], + logger: logging.Logger) -> str: """ Check if the requested capacities can be satisfied with the available capacities :param rid: reservation id of the reservation being served :param requested_capacities: Requested Capacities - :param delegated_capacities: Delegated Capacities + :param delegated: Delegated Capacities :param existing_reservations: Existing Reservations served by the same BQM node + :param logger: logger :return: Delegation Id of the delegation which satisfies the request :raises: BrokerException in 
case the request cannot be satisfied """ - self.logger.debug(f"requested_capacities: {requested_capacities} for reservation# {rid}") + logger.debug(f"requested_capacities: {requested_capacities} for reservation# {rid}") - delegation_id, delegated_capacity = self.get_delegations(lab_cap_delegations=delegated_capacities) + delegation_id, delegated_capacity = FimHelper.get_delegations(delegations=delegated) # Remove allocated capacities to the reservations if existing_reservations is not None: @@ -64,16 +69,10 @@ def __check_capacities(self, *, rid: ID, requested_capacities: Capacities, deleg if rid == reservation.get_reservation_id(): continue # For Active or Ticketed or Ticketing reservations; reduce the counts from available - resource_sliver = None - if reservation.is_ticketing() and reservation.get_approved_resources() is not None: - resource_sliver = reservation.get_approved_resources().get_sliver() - - if (reservation.is_active() or reservation.is_ticketed()) and \ - reservation.get_resources() is not None: - resource_sliver = reservation.get_resources().get_sliver() + resource_sliver = InventoryForType._get_allocated_sliver(reservation=reservation) if resource_sliver is not None and isinstance(resource_sliver, NodeSliver): - self.logger.debug( + logger.debug( f"Excluding already assigned resources {resource_sliver.get_capacity_allocations()} to " f"reservation# {reservation.get_reservation_id()}") delegated_capacity = delegated_capacity - resource_sliver.get_capacity_allocations() @@ -84,10 +83,10 @@ def __check_capacities(self, *, rid: ID, requested_capacities: Capacities, deleg if len(negative_fields) > 0: raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, msg=f"{negative_fields}") - return delegation_id - def __set_ips(self, *, req_ifs: InterfaceSliver, lab: Labels): + @staticmethod + def __set_ips(*, req_ifs: InterfaceSliver, lab: Labels): if req_ifs.labels is not None and req_ifs.labels.ipv4 is not None: lab.ipv4 = req_ifs.labels.ipv4 if req_ifs.labels.ipv4_subnet is not None: @@ -98,57 +97,59 @@ def __set_ips(self, *, req_ifs: InterfaceSliver, lab: Labels): lab.ipv6_subnet = req_ifs.labels.ipv6_subnet return lab - def __update_shared_nic_labels_and_capacities(self, *, available_component: ComponentSliver, - requested_component: ComponentSliver) -> ComponentSliver: + @staticmethod + def __update_shared_nic_labels_and_capacities(*, available: ComponentSliver, + requested: ComponentSliver, + logger: logging.Logger) -> ComponentSliver: """ Update the shared NIC Labels and Capacities. 
Assign the 1st available PCI address/bdf to the requested component Traverse the available component's labels to find the index for bdf assigned Using the found labels, assign BDF, MAC and VLAN address to the IFS on the Requested component In case of L2 service, also copy the requested IP address so it can be used by the AMHandler to configure the interface post VM creation - :param available_component: Available Component - :param requested_component: Requested Component + :param available: Available Component + :param requested: Requested Component :return updated requested component with VLAN, MAC and IP information """ # Check labels - delegation_id, delegated_label = self.get_delegations( - lab_cap_delegations=available_component.get_label_delegations()) + delegation_id, delegated_label = FimHelper.get_delegations( + delegations=available.get_label_delegations()) if delegated_label.bdf is None or len(delegated_label.bdf) < 1: message = "No PCI devices available in the delegation" - self.logger.error(message) + logger.error(message) raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, msg=f"{message}") # Find the VLAN from the BQM Component - if available_component.network_service_info is None or \ - len(available_component.network_service_info.network_services) != 1: + if available.network_service_info is None or \ + len(available.network_service_info.network_services) != 1: message = "Shared NIC Card must have one Network Service" - self.logger.error(message) + logger.error(message) raise BrokerException(error_code=ExceptionErrorCode.FAILURE, msg=f"{message}") - ns_name = next(iter(available_component.network_service_info.network_services)) - ns = available_component.network_service_info.network_services[ns_name] + ns_name = next(iter(available.network_service_info.network_services)) + ns = available.network_service_info.network_services[ns_name] if ns.interface_info is None or len(ns.interface_info.interfaces) != 1: message = "Shared NIC Card must have one Connection Point" - self.logger.error(message) + logger.error(message) raise BrokerException(error_code=ExceptionErrorCode.FAILURE, msg=f"{message}") ifs_name = next(iter(ns.interface_info.interfaces)) ifs = ns.interface_info.interfaces[ifs_name] - delegation_id, ifs_delegated_labels = self.get_delegations(lab_cap_delegations=ifs.get_label_delegations()) + delegation_id, ifs_delegated_labels = FimHelper.get_delegations(delegations=ifs.get_label_delegations()) assigned_bdf = delegated_label.bdf[0] assigned_numa = delegated_label.numa[0] # Check if the requested component's VLAN exists in the delegated labels - if requested_component.labels and requested_component.labels.vlan and \ - requested_component.labels.vlan in ifs_delegated_labels.vlan: - vlan_index = ifs_delegated_labels.vlan.index(requested_component.labels.vlan) + if requested.labels and requested.labels.vlan and \ + requested.labels.vlan in ifs_delegated_labels.vlan: + vlan_index = ifs_delegated_labels.vlan.index(requested.labels.vlan) bdf_for_requested_vlan = ifs_delegated_labels.bdf[vlan_index] if bdf_for_requested_vlan in delegated_label.bdf: @@ -157,19 +158,19 @@ def __update_shared_nic_labels_and_capacities(self, *, available_component: Comp assigned_numa = delegated_label.numa[bdf_index] # Assign the first PCI Id from the list of available PCI slots - requested_component.label_allocations = Labels(bdf=assigned_bdf, numa=assigned_numa) + requested.label_allocations = Labels(bdf=assigned_bdf, numa=assigned_numa) # Find index of assigned BDF in 
the interface delegated labels assigned_index = ifs_delegated_labels.bdf.index(assigned_bdf) # Updated the Requested component with VLAN, BDF, MAC - req_ns_name = next(iter(requested_component.network_service_info.network_services)) - req_ns = requested_component.network_service_info.network_services[req_ns_name] + req_ns_name = next(iter(requested.network_service_info.network_services)) + req_ns = requested.network_service_info.network_services[req_ns_name] req_ifs_name = next(iter(req_ns.interface_info.interfaces)) req_ifs = req_ns.interface_info.interfaces[req_ifs_name] # Do not copy VLAN for OpenStack-vNIC - if requested_component.get_model() == Constants.OPENSTACK_VNIC_MODEL: + if requested.get_model() == Constants.OPENSTACK_VNIC_MODEL: lab = Labels(bdf=ifs_delegated_labels.bdf[assigned_index], mac=ifs_delegated_labels.mac[assigned_index], local_name=ifs_delegated_labels.local_name[assigned_index]) else: @@ -180,45 +181,48 @@ def __update_shared_nic_labels_and_capacities(self, *, available_component: Comp # For the Layer 2 copying the IP address to the label allocations # This is to be used by AM Handler to configure Network Interface if req_ns.layer == NSLayer.L2: - lab = self.__set_ips(req_ifs=req_ifs, lab=lab) + lab = NetworkNodeInventory.__set_ips(req_ifs=req_ifs, lab=lab) req_ifs.set_label_allocations(lab=lab) - self.logger.info(f"Assigned Interface Sliver: {req_ifs}") - return requested_component + logger.info(f"Assigned Interface Sliver: {req_ifs}") + return requested - def __update_smart_nic_labels_and_capacities(self, *, available_component: ComponentSliver, - requested_component: ComponentSliver) -> ComponentSliver: + @staticmethod + def __update_smart_nic_labels_and_capacities(*, available: ComponentSliver, + requested: ComponentSliver, + logger: logging.Logger) -> ComponentSliver: """ Update the IFS for the Smart NIC with VLAN, MAC and IP Address information This is to enable AM handler to configure network interfaces at VM creation. 
This is only done for Layer 2 services - :param available_component: Available Component - :param requested_component: Requested Component + :param available: Available Component + :param requested: Requested Component :return updated requested component with VLAN, MAC and IP information """ # Find the VLAN from the BQM Component - if available_component.network_service_info is None or \ - len(available_component.network_service_info.network_services) != 1: + if available.network_service_info is None or \ + len(available.network_service_info.network_services) != 1: message = "Smart NIC Card must have at one Network Service" - self.logger.error(message) + logger.error(message) raise BrokerException(error_code=ExceptionErrorCode.FAILURE, msg=f"{message}") - ns_name = next(iter(available_component.network_service_info.network_services)) - ns = available_component.network_service_info.network_services[ns_name] + ns_name = next(iter(available.network_service_info.network_services)) + ns = available.network_service_info.network_services[ns_name] if ns.interface_info is None or len(ns.interface_info.interfaces) < 0: message = "Smart NIC Card must have at least one Connection Point" - self.logger.error(message) + logger.error(message) raise BrokerException(error_code=ExceptionErrorCode.FAILURE, msg=f"{message}") for ifs in ns.interface_info.interfaces.values(): - delegation_id, ifs_delegated_labels = self.get_delegations(lab_cap_delegations=ifs.get_label_delegations()) + delegation_id, ifs_delegated_labels = FimHelper.get_delegations( + delegations=ifs.get_label_delegations()) - for requested_ns in requested_component.network_service_info.network_services.values(): + for requested_ns in requested.network_service_info.network_services.values(): if requested_ns.interface_info is not None and requested_ns.interface_info.interfaces is not None: for requested_ifs in requested_ns.interface_info.interfaces.values(): if requested_ifs.labels.local_name == ifs_delegated_labels.local_name: @@ -232,85 +236,92 @@ def __update_smart_nic_labels_and_capacities(self, *, available_component: Compo if requested_ifs.labels is not None and requested_ifs.labels.vlan is not None: lab.vlan = requested_ifs.labels.vlan - lab = self.__set_ips(req_ifs=requested_ifs, lab=lab) + lab = NetworkNodeInventory.__set_ips(req_ifs=requested_ifs, lab=lab) requested_ifs.set_label_allocations(lab=lab) - self.logger.info(f"Assigned Interface Sliver: {requested_ifs}") - return requested_component + logger.info(f"Assigned Interface Sliver: {requested_ifs}") + return requested - def __check_component_labels_and_capacities(self, *, available_component: ComponentSliver, graph_id: str, - requested_component: ComponentSliver, + @staticmethod + def __check_component_labels_and_capacities(*, available: ComponentSliver, graph_id: str, + requested: ComponentSliver, logger: logging.Logger, operation: ReservationOperation = ReservationOperation.Create) -> ComponentSliver: """ Check if available component capacities, labels to match requested component - :param available_component: available component + :param available: available component :param graph_id: BQM graph id - :param requested_component: requested component + :param requested: requested component :param operation: operation :return: requested component annotated with properties in case of success, None otherwise """ - if requested_component.get_model() is not None and \ - requested_component.get_model() != available_component.get_model(): - return requested_component + if 
requested.get_model() is not None and \ + requested.get_model() != available.get_model(): + return requested # Checking capacity for component - delegation_id, delegated_capacity = self.get_delegations( - lab_cap_delegations=available_component.get_capacity_delegations()) + delegation_id, delegated_capacity = FimHelper.get_delegations( + delegations=available.get_capacity_delegations()) # Delegated capacity would have been decremented already to exclude allocated shared NICs if delegated_capacity.unit < 1: - message = f"Insufficient Capacities for component: {requested_component}" - self.logger.error(message) + message = f"Insufficient Capacities for component: {requested}" + logger.error(message) raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, msg=f"{message}") - requested_component.capacity_allocations = Capacities(unit=1) + requested.capacity_allocations = Capacities(unit=1) # Check labels - delegation_id, delegated_label = self.get_delegations( - lab_cap_delegations=available_component.get_label_delegations()) - - if requested_component.get_type() == ComponentType.SharedNIC: - requested_component = self.__update_shared_nic_labels_and_capacities(available_component=available_component, - requested_component=requested_component) + delegation_id, delegated_label = FimHelper.get_delegations( + delegations=available.get_label_delegations()) + + if requested.get_type() == ComponentType.SharedNIC: + requested = NetworkNodeInventory.__update_shared_nic_labels_and_capacities( + available=available, + requested=requested, + logger=logger) else: - requested_component.label_allocations = delegated_label - if requested_component.get_type() == ComponentType.SmartNIC: - requested_component = self.__update_smart_nic_labels_and_capacities(available_component=available_component, - requested_component=requested_component) - - node_map = tuple([graph_id, available_component.node_id]) - requested_component.set_node_map(node_map=node_map) - if requested_component.labels is None or operation == ReservationOperation.Create: - requested_component.labels = Labels.update(lab=requested_component.get_label_allocations()) - - return requested_component - - def __exclude_allocated_pci_device_from_shared_nic(self, shared_nic: ComponentSliver, - allocated_nic: ComponentSliver) -> Tuple[ComponentSliver, bool]: + requested.label_allocations = delegated_label + if requested.get_type() == ComponentType.SmartNIC: + requested = NetworkNodeInventory.__update_smart_nic_labels_and_capacities( + available=available, + requested=requested, + logger=logger) + + node_map = tuple([graph_id, available.node_id]) + requested.set_node_map(node_map=node_map) + if requested.labels is None or operation == ReservationOperation.Create: + requested.labels = Labels.update(lab=requested.get_label_allocations()) + + return requested + + @staticmethod + def __exclude_allocated_pci_device_from_shared_nic(*, shared: ComponentSliver, logger: logging.Logger, + allocated: ComponentSliver) -> Tuple[ComponentSliver, bool]: """ For Shared NIC cards, exclude the already assigned PCI addresses from the available PCI addresses in BQM Component Sliver for the NIC Card - @param shared_nic: Available Shared NIC - @param allocated_nic: Allocated NIC + @param shared: Available Shared NIC + @param allocated: Allocated NIC @return Available NIC updated to exclude the Allocated PCI addresses and True/False indicating if Available Shared NIC has any available PCI addresses """ - if shared_nic.get_type() != ComponentType.SharedNIC and 
allocated_nic.get_type() != ComponentType.SharedNIC: + if shared.get_type() != ComponentType.SharedNIC and allocated.get_type() != ComponentType.SharedNIC: raise BrokerException(error_code=ExceptionErrorCode.INVALID_ARGUMENT, - msg=f"shared_nic: {shared_nic} allocated_nic: {allocated_nic}") + msg=f"shared_nic: {shared} allocated_nic: {allocated}") # Reduce capacity for component - delegation_id, delegated_capacity = self.get_delegations( - lab_cap_delegations=shared_nic.get_capacity_delegations()) + delegation_id, delegated_capacity = FimHelper.get_delegations( + delegations=shared.get_capacity_delegations()) - self.logger.debug(f"Allocated NIC: {allocated_nic} labels: {allocated_nic.get_labels()}") + logger.debug(f"Allocated NIC: {allocated} labels: {allocated.get_labels()}") # Get the Allocated PCI address - allocated_labels = allocated_nic.get_labels() + allocated_labels = allocated.get_labels() - delegation_id, delegated_label = self.get_delegations(lab_cap_delegations=shared_nic.get_label_delegations()) + delegation_id, delegated_label = FimHelper.get_delegations( + delegations=shared.get_label_delegations()) # Remove allocated PCI address from delegations excluded_labels = [] @@ -323,35 +334,37 @@ def __exclude_allocated_pci_device_from_shared_nic(self, shared_nic: ComponentSl exists = False for e in excluded_labels: if e in delegated_label.bdf: - self.logger.debug(f"Excluding PCI device {e}") + logger.debug(f"Excluding PCI device {e}") delegated_label.bdf.remove(e) exists = True # Exclude already allocated Shared NIC cards if exists: - delegated_capacity -= allocated_nic.get_capacity_allocations() + delegated_capacity -= allocated.get_capacity_allocations() - return shared_nic, (delegated_capacity.unit < 1) + return shared, (delegated_capacity.unit < 1) - def __exclude_allocated_component(self, *, graph_node: NodeSliver, available_component: ComponentSliver, - allocated_component: ComponentSliver): + @staticmethod + def __exclude_allocated_component(*, graph_node: NodeSliver, available: ComponentSliver, + allocated: ComponentSliver, logger: logging.Logger): """ Remove the allocated component from the candidate Node. For dedicated components, the whole component is removed, for Shared NIC, only the allocated PCI address is removed and the number of units is reduced by 1. 
If all the PCIs are allocated for a Shared NIC, the complete Shared NIC is removed @param graph_node candidate node identified to satisfy the reservation - @param available_component available component - @param allocated_component allocated component + @param available available component + @param allocated allocated component """ exclude = True - if allocated_component.get_type() == ComponentType.SharedNIC: - available_component, exclude = self.__exclude_allocated_pci_device_from_shared_nic( - shared_nic=available_component, allocated_nic=allocated_component) + if allocated.get_type() == ComponentType.SharedNIC: + available, exclude = NetworkNodeInventory.__exclude_allocated_pci_device_from_shared_nic( + shared=available, allocated=allocated, logger=logger) if exclude: - graph_node.attached_components_info.remove_device(name=available_component.get_name()) + graph_node.attached_components_info.remove_device(name=available.get_name()) - def __exclude_components_for_existing_reservations(self, *, rid: ID, graph_node: NodeSliver, + @staticmethod + def __exclude_components_for_existing_reservations(*, rid: ID, graph_node: NodeSliver, logger: logging.Logger, existing_reservations: List[ABCReservationMixin], operation: ReservationOperation = ReservationOperation.Create) -> NodeSliver: """ @@ -367,12 +380,7 @@ def __exclude_components_for_existing_reservations(self, *, rid: ID, graph_node: (operation == ReservationOperation.Extend or not reservation.is_ticketed()): continue # For Active or Ticketed or Ticketing reservations; reduce the counts from available - allocated_sliver = None - if reservation.is_ticketing() and reservation.get_approved_resources() is not None: - allocated_sliver = reservation.get_approved_resources().get_sliver() - - if (reservation.is_active() or reservation.is_ticketed()) and reservation.get_resources() is not None: - allocated_sliver = reservation.get_resources().get_sliver() + allocated_sliver = InventoryForType._get_allocated_sliver(reservation=reservation) if reservation.is_extending_ticket() and reservation.get_requested_resources() is not None and \ reservation.get_requested_resources().get_sliver() is not None: @@ -390,20 +398,23 @@ def __exclude_components_for_existing_reservations(self, *, rid: ID, graph_node: resource_type = allocated.get_type() - self.logger.debug(f"Already allocated components {allocated} of resource_type " - f"{resource_type} to reservation# {reservation.get_reservation_id()}") + logger.debug(f"Already allocated components {allocated} of resource_type " + f"{resource_type} to reservation# {reservation.get_reservation_id()}") for av in graph_node.attached_components_info.devices.values(): if av.node_id == allocated_node_map[1]: - self.__exclude_allocated_component(graph_node=graph_node, available_component=av, - allocated_component=allocated) + NetworkNodeInventory.__exclude_allocated_component(graph_node=graph_node, + available=av, + allocated=allocated, + logger=logger) break return graph_node - def __check_components(self, *, rid: ID, requested_components: AttachedComponentsInfo, graph_id: str, - graph_node: NodeSliver, existing_reservations: List[ABCReservationMixin], - existing_components: Dict[str, List[str]], - operation: ReservationOperation = ReservationOperation.Create) -> AttachedComponentsInfo: + @staticmethod + def check_components(*, rid: ID, requested_components: AttachedComponentsInfo, graph_id: str, + graph_node: NodeSliver, existing_reservations: List[ABCReservationMixin], + existing_components: Dict[str, List[str]], 
logger: logging.Logger, + operation: ReservationOperation = ReservationOperation.Create) -> AttachedComponentsInfo: """ Check if the requested capacities can be satisfied with the available capacities :param rid: reservation id of the reservation being served @@ -411,17 +422,19 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent :param graph_id: BQM graph id :param graph_node: BQM graph node identified to serve the reservation :param existing_reservations: Existing Reservations served by the same BQM node + :param existing_components: Existing components :param operation: Flag indicating if this is create or modify + :param logger: logger :return: Components updated with the corresponding BQM node ids :raises: BrokerException in case the request cannot be satisfied """ - self.logger.debug(f"Available on {graph_node.node_id} components: {graph_node.attached_components_info.devices.keys()}") + logger.debug(f"Available on {graph_node.node_id} components: {graph_node.attached_components_info.devices.keys()}") - self.__exclude_components_for_existing_reservations(rid=rid, graph_node=graph_node, - existing_reservations=existing_reservations, - operation=operation) + NetworkNodeInventory.__exclude_components_for_existing_reservations(rid=rid, graph_node=graph_node, + existing_reservations=existing_reservations, + operation=operation, logger=logger) - self.logger.debug(f"Excluding components connected to Network Services: {existing_components}") + logger.debug(f"Excluding components connected to Network Services: {existing_components}") if existing_components and len(existing_components): comps_to_remove = [] @@ -437,21 +450,21 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent allocated_component.set_name(resource_name=av.get_name()) allocated_component.set_capacity_allocations(cap=Capacities(unit=len(bdfs))) allocated_component.set_labels(Labels(bdf=bdfs)) - self.logger.debug(f"Excluding Shared NICs connected to Network Services: {allocated_component}") - av, exclude = self.__exclude_allocated_pci_device_from_shared_nic(shared_nic=av, - allocated_nic=allocated_component) + logger.debug(f"Excluding Shared NICs connected to Network Services: {allocated_component}") + av, exclude = NetworkNodeInventory.__exclude_allocated_pci_device_from_shared_nic(shared=av, + allocated=allocated_component, + logger=logger) if exclude: comps_to_remove.append(av) for c in comps_to_remove: - self.logger.debug(f"Excluding component: {c.get_name()}") - print(f"Excluding component: {c.get_name()}") + logger.debug(f"Excluding component: {c.get_name()}") graph_node.attached_components_info.remove_device(name=c.get_name()) - self.logger.debug(f"requested_components: {requested_components.devices.values()} for reservation# {rid}") + logger.debug(f"requested_components: {requested_components.devices.values()} for reservation# {rid}") for name, requested_component in requested_components.devices.items(): if operation == ReservationOperation.Modify and requested_component.get_node_map() is not None: - self.logger.debug(f"Modify: Ignoring Allocated component: {requested_component}") + logger.debug(f"Modify: Ignoring Allocated component: {requested_component}") continue if operation == ReservationOperation.Extend and requested_component.get_node_map() is not None: @@ -464,7 +477,7 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent if isinstance(requested_component.labels.bdf, str): bdfs = [requested_component.labels.bdf] - 
self.logger.debug(f"Allocated BDFs: {allocated_bdfs}") + logger.debug(f"Allocated BDFs: {allocated_bdfs}") for x in bdfs: if x in allocated_bdfs: raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, @@ -476,10 +489,10 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent msg=f"Renew failed: Component of type: {requested_component.get_model()} " f"already in use by another reservation for node: {graph_node.node_id}") - self.logger.debug(f"Renew: Component {requested_component} still available") + logger.debug(f"Renew: Component {requested_component} still available") continue - self.logger.debug(f"Create: Allocating component: {requested_component}") + logger.debug(f"Create: Allocating component: {requested_component}") resource_type = requested_component.get_type() resource_model = requested_component.get_model() if resource_type == ComponentType.Storage: @@ -488,7 +501,7 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent requested_component.label_allocations = Labels.update(lab=requested_component.get_labels()) continue available_components = graph_node.attached_components_info.get_devices_by_type(resource_type=resource_type) - self.logger.debug(f"Available components of type: {resource_type} after excluding " + logger.debug(f"Available components of type: {resource_type} after excluding " f"allocated components: {available_components}") if available_components is None or len(available_components) == 0: @@ -498,18 +511,18 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent for component in available_components: # check model matches the requested model - requested_component = self.__check_component_labels_and_capacities( - available_component=component, graph_id=graph_id, - requested_component=requested_component, - operation=operation) + requested_component = NetworkNodeInventory.__check_component_labels_and_capacities( + available=component, graph_id=graph_id, + requested=requested_component, + operation=operation, logger=logger) if requested_component.get_node_map() is not None: - self.logger.info(f"Assigning {component.node_id} to component# " + logger.info(f"Assigning {component.node_id} to component# " f"{requested_component} in reservation# {rid} ") # Remove the component from available components as it is assigned - self.__exclude_allocated_component(graph_node=graph_node, available_component=component, - allocated_component=requested_component) + NetworkNodeInventory.__exclude_allocated_component(graph_node=graph_node, available=component, + allocated=requested_component, logger=logger) break if requested_component.get_node_map() is None: @@ -554,10 +567,9 @@ def __allocate_p4_switch(self, *, rid: ID, requested_sliver: NodeSliver, graph_i requested_capacities = requested_sliver.get_capacities() # Check if Capacities can be satisfied - delegation_id = self.__check_capacities(rid=rid, - requested_capacities=requested_capacities, - delegated_capacities=graph_node.get_capacity_delegations(), - existing_reservations=existing_reservations) + delegation_id = self.check_capacities(rid=rid, requested_capacities=requested_capacities, + delegated=graph_node.get_capacity_delegations(), + existing_reservations=existing_reservations, logger=self.logger) requested_sliver.capacity_allocations = Capacities() requested_sliver.capacity_allocations = Capacities.update(lab=requested_capacities) requested_sliver.label_allocations = Labels(local_name=graph_node.get_name()) @@ -621,21 
+633,21 @@ def allocate(self, *, rid: ID, requested_sliver: BaseSliver, graph_id: str, grap delegation_id = next(iter(graph_node.get_capacity_delegations().get_delegation_ids())) # Check if Capacities can be satisfied - delegation_id = self.__check_capacities(rid=rid, - requested_capacities=requested_capacities, - delegated_capacities=graph_node.get_capacity_delegations(), - existing_reservations=existing_reservations) + delegation_id = self.check_capacities(rid=rid, requested_capacities=requested_capacities, + delegated=graph_node.get_capacity_delegations(), + existing_reservations=existing_reservations, logger=self.logger) # Check if Components can be allocated if requested_sliver.attached_components_info is not None: - requested_sliver.attached_components_info = self.__check_components( + requested_sliver.attached_components_info = self.check_components( rid=rid, requested_components=requested_sliver.attached_components_info, graph_id=graph_id, graph_node=graph_node, existing_reservations=existing_reservations, existing_components=existing_components, - operation=operation) + operation=operation, + logger=self.logger) # Do this only for create if operation == ReservationOperation.Create: diff --git a/fabric_cf/actor/core/policy/network_service_inventory.py b/fabric_cf/actor/core/policy/network_service_inventory.py index ac912b74..86612fe3 100644 --- a/fabric_cf/actor/core/policy/network_service_inventory.py +++ b/fabric_cf/actor/core/policy/network_service_inventory.py @@ -29,6 +29,7 @@ from ipaddress import IPv6Network, IPv4Network from typing import List, Tuple, Union +from fabric_cf.actor.fim.fim_helper import FimHelper from fim.slivers.capacities_labels import Labels from fim.slivers.gateway import Gateway from fim.slivers.interface_info import InterfaceSliver, InterfaceType @@ -75,13 +76,7 @@ def __exclude_allocated_vlans(self, *, rid: ID, available_vlan_range: List[int], continue # For Active or Ticketed or Ticketing reservations; reduce the counts from available - allocated_sliver = None - if reservation.is_ticketing() and reservation.get_approved_resources() is not None: - allocated_sliver = reservation.get_approved_resources().get_sliver() - - if (reservation.is_active() or reservation.is_ticketed()) and \ - reservation.get_resources() is not None: - allocated_sliver = reservation.get_resources().get_sliver() + allocated_sliver = self._get_allocated_sliver(reservation=reservation) self.logger.debug( f"Existing res# {reservation.get_reservation_id()} state:{reservation.get_state()} " @@ -154,7 +149,7 @@ def allocate_ifs(self, *, rid: ID, requested_ns: NetworkServiceSliver, requested ) return requested_ifs - delegation_id, delegated_label = self.get_delegations(lab_cap_delegations=owner_ns.get_label_delegations()) + delegation_id, delegated_label = FimHelper.get_delegations(delegations=owner_ns.get_label_delegations()) vlan_range = self.__extract_vlan_range(labels=delegated_label) if vlan_range and requested_vlan not in vlan_range: @@ -188,8 +183,8 @@ def allocate_ifs(self, *, rid: ID, requested_ns: NetworkServiceSliver, requested ) else: if bqm_ifs.get_type() != InterfaceType.FacilityPort: - delegation_id, delegated_label = self.get_delegations( - lab_cap_delegations=owner_ns.get_label_delegations()) + delegation_id, delegated_label = FimHelper.get_delegations( + delegations=owner_ns.get_label_delegations()) vlan_range = self.__extract_vlan_range(labels=delegated_label) else: vlan_range = self.__extract_vlan_range(labels=bqm_ifs.labels) @@ -283,13 +278,7 @@ def 
allocate_vnic(self, *, rid: ID, requested_ns: NetworkServiceSliver, owner_ns continue # For Active or Ticketed or Ticketing reservations; reduce the counts from available - allocated_sliver = None - if reservation.is_ticketing() and reservation.get_approved_resources() is not None: - allocated_sliver = reservation.get_approved_resources().get_sliver() - - if (reservation.is_active() or reservation.is_ticketed()) and \ - reservation.get_resources() is not None: - allocated_sliver = reservation.get_resources().get_sliver() + allocated_sliver = self._get_allocated_sliver(reservation=reservation) self.logger.debug(f"Existing res# {reservation.get_reservation_id()} " f"allocated: {allocated_sliver}") @@ -333,7 +322,7 @@ def allocate(self, *, rid: ID, requested_ns: NetworkServiceSliver, owner_ns: Net return requested_ns # Grab Label Delegations - delegation_id, delegated_label = self.get_delegations(lab_cap_delegations=owner_ns.get_label_delegations()) + delegation_id, delegated_label = FimHelper.get_delegations(delegations=owner_ns.get_label_delegations()) # HACK to use FabNetv6 for FabNetv6Ext as both have the same range requested_ns_type = requested_ns.get_type() @@ -472,20 +461,6 @@ def _exclude_allocated_subnets(self, *, subnet_list: List, requested_ns_type: st return subnet_list - def _get_allocated_sliver(self, reservation: ABCReservationMixin) -> NetworkServiceSliver: - """ - Retrieve the allocated sliver from the reservation. - - :param reservation: An instance of ABCReservationMixin representing the reservation to retrieve the sliver from. - :return: The allocated NetworkServiceSliver if available, otherwise None. - """ - if reservation.is_ticketing() and reservation.get_approved_resources() is not None: - return reservation.get_approved_resources().get_sliver() - if (reservation.is_active() or reservation.is_ticketed()) and reservation.get_resources() is not None: - return reservation.get_resources().get_sliver() - - self.logger.error("Could not find the allocated Sliver - should not reach here!") - def _assign_gateway_labels(self, *, ip_network: Union[IPv4Network, IPv6Network], subnet_list: List, requested_ns: NetworkServiceSliver) -> Labels: """ @@ -496,6 +471,9 @@ def _assign_gateway_labels(self, *, ip_network: Union[IPv4Network, IPv6Network], :param requested_ns: Network Service sliver. :return: Gateway labels populated with the appropriate subnet and IP address. 
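
For intuition about the subnet-to-gateway step behind _assign_gateway_labels, here is a minimal, stdlib-only sketch of the idea: walk the candidate subnets for a site, skip the ones already allocated, and derive the gateway from the first free subnet. The helper name pick_free_subnet and the first-usable-host-as-gateway convention are illustrative assumptions, not the repo's API.

    import ipaddress
    from typing import List, Optional, Set, Tuple

    def pick_free_subnet(candidates: List[str], allocated: Set[str]) -> Optional[Tuple[str, str]]:
        # Return (subnet, gateway_ip) for the first candidate not yet allocated;
        # None mirrors the "no subnets available" error path in the method body.
        for cidr in candidates:
            if cidr in allocated:
                continue
            net = ipaddress.ip_network(cidr)
            gateway = next(net.hosts())  # assumed convention: first usable host is the gateway
            return cidr, str(gateway)
        return None

    # Example: two /24s at a site, one already in use.
    print(pick_free_subnet(["10.128.0.0/24", "10.128.1.0/24"], {"10.128.0.0/24"}))
    # -> ('10.128.1.0/24', '10.128.1.1')
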
""" + if len(subnet_list) == 0: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"No subnets available for {requested_ns.get_site()}") gateway_labels = Labels() if requested_ns.get_type() == ServiceType.FABNetv4: # Allocate the requested network if available else allocate new network @@ -564,7 +542,7 @@ def allocate_peered_ifs(self, *, rid: ID, owner_switch: NodeSliver, ifs_labels = Labels() if owner_switch.get_name() == Constants.AL2S: - delegation_id, delegated_label = self.get_delegations(lab_cap_delegations=bqm_interface.get_label_delegations()) + delegation_id, delegated_label = FimHelper.get_delegations(delegations=bqm_interface.get_label_delegations()) local_name = delegated_label.local_name device_name = delegated_label.device_name else: diff --git a/fabric_cf/actor/core/util/smtp.py b/fabric_cf/actor/core/util/smtp.py new file mode 100644 index 00000000..106d75f7 --- /dev/null +++ b/fabric_cf/actor/core/util/smtp.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +# MIT License +# +# Copyright (c) 2024 FABRIC Testbed +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+#
+#
+# Author: Komal Thareja (kthare10@renci.org)
+import smtplib
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+
+# Function to load the email template from a file and replace placeholders
+from typing import Tuple
+
+
+def load_and_update_template(*, template_path: str, user: str, slice_name: str, hours_left: float) -> Tuple[str, str]:
+    """
+    Load the Email body from the template
+    @param template_path: location of the template
+    @param user: user name
+    @param slice_name: slice name
+    @param hours_left: hours left
+    @return: Subject and Body
+    """
+    with open(template_path, 'r') as template_file:
+        email_template = template_file.read()
+
+    # Replace placeholders with actual values
+    # (the <user>, <slice_name> and <hours_left> tokens are assumed placeholder
+    # names; the literal tokens were stripped in rendering)
+    email_content = email_template.replace('<user>', user) \
+        .replace('<slice_name>', slice_name) \
+        .replace('<hours_left>', str(hours_left))
+
+    # Extract the subject and the body from the email content
+    subject_line = email_content.split('\n')[0].replace("Subject: ", "")
+    email_body = "\n".join(email_content.split('\n')[1:])
+
+    return subject_line, email_body
+
+
+def send_email(*, smtp_config: dict, to_email: str, subject: str, body: str):
+    """
+    Send Email to a user
+    :param smtp_config: SMTP config parameters
+    :param to_email: User's email
+    :param subject: Email subject
+    :param body: Email body
+
+    :raises: Exception in case of error
+    """
+    # Create the message container
+    msg = MIMEMultipart()
+    msg['From'] = smtp_config.get("from_email")
+    msg['To'] = to_email
+    msg['Subject'] = subject
+    msg.add_header('Reply-To', smtp_config.get("reply_to_email"))
+
+    # Attach the message body
+    msg.attach(MIMEText(body, 'plain'))
+
+    server = None
+    try:
+        # Establish an SMTP connection and send the email
+        server = smtplib.SMTP(smtp_config.get("smtp_server"), smtp_config.get("smtp_port"))
+        server.starttls()  # Upgrade to TLS
+        server.login(smtp_config.get("smtp_user"), smtp_config.get("smtp_password"))
+        server.sendmail(smtp_config.get("from_email"), to_email, msg.as_string())
+        # print(f"Email successfully sent to {to_email}")
+    finally:
+        # Guard against the connection attempt itself failing, which would
+        # otherwise leave server unbound here
+        if server is not None:
+            server.quit()
diff --git a/fabric_cf/actor/fim/fim_helper.py b/fabric_cf/actor/fim/fim_helper.py
index d2a809db..608455c8 100644
--- a/fabric_cf/actor/fim/fim_helper.py
+++ b/fabric_cf/actor/fim/fim_helper.py
@@ -26,7 +26,15 @@
 from __future__ import annotations
 from collections import defaultdict
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any, Dict
+
+from fabric_cf.actor.core.container.maintenance import Maintenance
+
+from fabric_cf.actor.core.core.policy import AllocationAlgorithm
+from fabric_cf.actor.core.time.term import Term
+
+
+from fim.graph.abc_property_graph_constants import ABCPropertyGraphConstants
 if TYPE_CHECKING:
     from fabric_cf.actor.core.apis.abc_database import ABCDatabase
@@ -49,20 +57,21 @@
 from fim.graph.slices.abc_asm import ABCASMPropertyGraph
 from fim.graph.slices.neo4j_asm import Neo4jASMFactory
 from fim.graph.slices.networkx_asm import NetworkxASM, NetworkXASMFactory
-from fim.slivers.attached_components import ComponentSliver
+from fim.slivers.attached_components import ComponentSliver, AttachedComponentsInfo
 from fim.slivers.base_sliver import BaseSliver
 from fim.slivers.capacities_labels import Capacities
-from fim.slivers.delegations import Delegations
+from fim.slivers.delegations import Delegations, DelegationFormat
 from fim.slivers.interface_info import InterfaceSliver, InterfaceType
 from fim.slivers.network_node import NodeSliver
 from fim.slivers.network_service import NetworkServiceSliver, ServiceType
-from 
fim.user import ExperimentTopology, NodeType, Component, ReservationInfo, Node, GraphFormat, Labels +from fim.user import ExperimentTopology, NodeType, Component, ReservationInfo, Node, GraphFormat, Labels, ComponentType, \ + InstanceCatalog, CapacityHints from fim.user.composite_node import CompositeNode from fim.user.interface import Interface from fim.user.topology import AdvertizedTopology from fabric_cf.actor.core.common.constants import Constants -from fabric_cf.actor.core.kernel.reservation_states import ReservationStates +from fabric_cf.actor.core.kernel.reservation_states import ReservationStates, ReservationOperation class InterfaceSliverMapping: @@ -798,3 +807,163 @@ def build_broker_query_model(db: ABCDatabase, level_0_broker_query_model: str, l interface.set_property(pname="label_allocations", pval=label_allocations) return topology.serialize(fmt=graph_format) + + @staticmethod + def candidate_nodes(*, combined_broker_model: Neo4jCBMGraph, sliver: NodeSliver, + use_capacities: bool = False) -> List[str]: + """ + Identify candidate worker nodes in the specified site that have the required number + of components as defined in the sliver. If `use_capacities` is True, the function will + additionally check that each component's capacities meet the required thresholds. + + :param combined_broker_model: The Neo4jCBMGraph instance that provides access + to the Neo4j graph model for querying nodes. + :type combined_broker_model: Neo4jCBMGraph + + :param sliver: The NodeSliver object that specifies the component requirements, + including type, model, and optionally, capacity constraints. + :type sliver: NodeSliver + + :param use_capacities: A boolean flag indicating whether to check component + capacities in addition to component types and models. + If True, the function will validate that each component’s + capacities meet or exceed the values specified in the sliver. + Defaults to False. + :type use_capacities: bool + + :return: A list of candidate node IDs that meet the component requirements + specified in the sliver. 
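
The Cypher query assembled further below counts required components per (type, model) pair and matches nodes whose inventory covers every count. The same matching rule, sketched with plain dictionaries and no Neo4j (matching_nodes and the inventory shape are hypothetical, for illustration only):

    from collections import Counter
    from typing import Dict, List, Tuple

    # (type, model) -> units; shapes assumed for illustration.
    Requirement = Dict[Tuple[str, str], int]

    def matching_nodes(nodes: Dict[str, Requirement], required: Requirement) -> List[str]:
        # A node qualifies only if its inventory covers every requested (type, model) count.
        result = []
        for node_id, inventory in nodes.items():
            if all(inventory.get(key, 0) >= units for key, units in required.items()):
                result.append(node_id)
        return result

    nodes = {
        "worker-1": {("GPU", "Tesla T4"): 2, ("NVME", "P4510"): 4},
        "worker-2": {("NVME", "P4510"): 1},
    }
    required = Counter({("GPU", "Tesla T4"): 1, ("NVME", "P4510"): 2})
    print(matching_nodes(nodes, required))  # -> ['worker-1']
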
+ :rtype: List[str] + """ + # modify; return existing node map + if sliver.get_node_map() is not None: + graph_id, node_id = sliver.get_node_map() + return [node_id] + + node_props = {ABCPropertyGraphConstants.PROP_SITE: sliver.site, + ABCPropertyGraphConstants.PROP_TYPE: str(NodeType.Server)} + if sliver.get_type() == NodeType.Switch: + node_props[ABCPropertyGraphConstants.PROP_TYPE] = str(NodeType.Switch) + + storage_components = [] + # remove storage components before the check + if sliver.attached_components_info is not None: + for name, c in sliver.attached_components_info.devices.items(): + if c.get_type() == ComponentType.Storage: + storage_components.append(c) + for c in storage_components: + sliver.attached_components_info.remove_device(name=c.get_name()) + + if not use_capacities: + result = combined_broker_model.get_matching_nodes_with_components( + label=ABCPropertyGraphConstants.CLASS_NetworkNode, + props=node_props, + comps=sliver.attached_components_info) + else: + result = FimHelper.get_matching_nodes_with_components(combined_broker_model=combined_broker_model, + label=ABCPropertyGraphConstants.CLASS_NetworkNode, + props=node_props, + comps=sliver.attached_components_info) + + # Skip nodes without any delegations which would be data-switch in this case + if sliver.get_type() == NodeType.Switch: + exclude = [] + for n in result: + if "p4" not in n: + exclude.append(n) + for e in exclude: + result.remove(e) + + # re-add storage components + if len(storage_components) > 0: + for c in storage_components: + sliver.attached_components_info.add_device(device_info=c) + + return result + + @staticmethod + def compute_capacities(*, sliver: NodeSliver) -> NodeSliver: + """ + Map requested sliver capacities or capacity hints to a flavor supported by Sites + @param sliver: + @return: + """ + # Compute Requested Capacities from Capacity Hints + requested_capacities = sliver.get_capacities() + requested_capacity_hints = sliver.get_capacity_hints() + catalog = InstanceCatalog() + if requested_capacities is None and requested_capacity_hints is not None: + requested_capacities = catalog.get_instance_capacities( + instance_type=requested_capacity_hints.instance_type) + sliver.set_capacities(cap=requested_capacities) + + # Compute Capacity Hints from Requested Capacities + if requested_capacity_hints is None and requested_capacities is not None: + instance_type = catalog.map_capacities_to_instance(cap=requested_capacities) + requested_capacity_hints = CapacityHints(instance_type=instance_type) + sliver.set_capacity_hints(caphint=requested_capacity_hints) + return sliver + + @staticmethod + def get_delegations(*, delegations: Delegations) -> Tuple[str or None, Union[Labels, Capacities] or None]: + # Grab Label Delegations + delegation_id, delegation = delegations.get_sole_delegation() + # ignore pool definitions and references for now + if delegation.get_format() != DelegationFormat.SinglePool: + return None, None + # get the Labels/Capacities object + delegated_label_capacity = delegation.get_details() + return delegation_id, delegated_label_capacity + + @staticmethod + def get_matching_nodes_with_components(*, combined_broker_model: Neo4jCBMGraph, label: str, props: Dict, + comps: AttachedComponentsInfo = None) -> List[str]: + assert label is not None + assert props is not None + + # collect unique types, models and count them + component_counts = defaultdict(int) + if comps is not None: + for comp in comps.list_devices(): + assert(comp.resource_model is not None or comp.resource_type is not 
None) + # shared nic count should always be 1 + if comp.resource_type != ComponentType.SharedNIC: + component_counts[(comp.resource_type, comp.resource_model)] = \ + component_counts[(comp.resource_type, comp.resource_model)] + 1 + else: + component_counts[(comp.resource_type, comp.resource_model)] = 1 + # unroll properties + node_props = ", ".join([x + ": " + '"' + props[x] + '"' for x in props.keys()]) + + if len(component_counts.values()) == 0: + # simple query on the properties of the node (no components) + query = f"MATCH(n:GraphNode:{label} {{GraphID: $graphId, {node_props} }}) RETURN collect(n.NodeID) as candidate_ids" + else: + # build a query list + node_query = f"MATCH(n:GraphNode:{label} {{GraphID: $graphId, {node_props} }})" + component_clauses = list() + # add a clause for every tuple + idx = 0 + for k, v in component_counts.items(): + comp_props_list = list() + if k[0] is not None: + comp_props_list.append('Type: ' + '"' + str(k[0]) + '"' + ' ') + if k[1] is not None: + comp_props_list.append('Model: ' + '"' + k[1] + '"' + ' ') + comp_props = ", ".join(comp_props_list) + + # uses pattern comprehension rather than pattern matching as per Neo4j v4+ + component_clauses.append(f" MATCH (n) -[:has]- (c{idx}:Component {{GraphID: $graphId, " + f"{comp_props}}}) WHERE c{idx}.Capacities IS NOT NULL AND " + f"apoc.convert.fromJsonMap(c{idx}.Capacities).unit >={str(v)}") + idx += 1 + query = node_query + " ".join(component_clauses) + " RETURN collect(n.NodeID) as candidate_ids" + + print(f'**** Resulting query {query=}') + + with combined_broker_model.driver.session() as session: + + val = session.run(query, graphId=combined_broker_model.graph_id).single() + if val is None: + return list() + return val.data()['candidate_ids'] \ No newline at end of file diff --git a/fabric_cf/authority/docker-compose.yml b/fabric_cf/authority/docker-compose.yml index eae01309..64872434 100644 --- a/fabric_cf/authority/docker-compose.yml +++ b/fabric_cf/authority/docker-compose.yml @@ -52,7 +52,7 @@ services: network: host context: ../../../ dockerfile: Dockerfile-auth - image: authority:1.7.0 + image: authority:1.8.0 container_name: site1-am restart: always depends_on: diff --git a/fabric_cf/authority/setup.sh b/fabric_cf/authority/setup.sh index 4f7c5434..371c5cf7 100755 --- a/fabric_cf/authority/setup.sh +++ b/fabric_cf/authority/setup.sh @@ -51,6 +51,7 @@ cp env.template $name/.env cp $config $name/config.yaml cp $arm $name/arm.graphml cp $handler1 $name/$handler1 +cp switch_handler_config.yml $name/switch_handler_config.yml if [ -z $6 ]; then cp docker-compose.yml $name/ diff --git a/fabric_cf/authority/switch_handler_config.yml b/fabric_cf/authority/switch_handler_config.yml index c5a3fbe4..9f67dd2b 100644 --- a/fabric_cf/authority/switch_handler_config.yml +++ b/fabric_cf/authority/switch_handler_config.yml @@ -22,6 +22,8 @@ # # # Author: Komal Thareja (kthare10@renci.org) +runtime: + P4: /etc/fabric/actor/playbooks/p4/bf-sde-9.7.1.tgz playbooks: location: /etc/fabric/actor/playbooks inventory_location: /etc/fabric/actor/playbooks/inventory diff --git a/fabric_cf/authority/vm_handler_config.yml b/fabric_cf/authority/vm_handler_config.yml index c77b5995..262e20dc 100644 --- a/fabric_cf/authority/vm_handler_config.yml +++ b/fabric_cf/authority/vm_handler_config.yml @@ -35,12 +35,14 @@ runtime: images: default_centos8_stream: centos default_centos9_stream: cloud-user + default_centos10_stream: cloud-user default_debian_11: debian default_debian_12: debian default_fedora_39: fedora default_fedora_40: 
fedora default_freebsd_13_zfs: freebsd default_freebsd_14_zfs: freebsd + default_fedora_41: fedora default_kali: kali default_openbsd_7: openbsd default_rocky_8: rocky @@ -52,6 +54,8 @@ runtime: docker_rocky_9: rocky docker_ubuntu_20: ubuntu docker_ubuntu_22: ubuntu + docker_ubuntu_24: ubuntu + attestable_bmv2_v1_ubuntu_20: ubuntu attestable_bmv2_v2_ubuntu_20: ubuntu playbooks: location: /etc/fabric/actor/playbooks diff --git a/fabric_cf/broker/config.broker.yaml b/fabric_cf/broker/config.broker.yaml index 0a8e9f2d..468169ee 100644 --- a/fabric_cf/broker/config.broker.yaml +++ b/fabric_cf/broker/config.broker.yaml @@ -139,6 +139,9 @@ actor: module: fabric_cf.actor.core.policy.broker_simpler_units_policy class: BrokerSimplerUnitsPolicy properties: + core_capacity_threshold: + enabled: true + core_usage_threshold_percent: 75 algorithm: FirstFit: # Default policy for all sites enabled: true diff --git a/fabric_cf/broker/docker-compose.yml b/fabric_cf/broker/docker-compose.yml index 7df2cb2a..61743a82 100644 --- a/fabric_cf/broker/docker-compose.yml +++ b/fabric_cf/broker/docker-compose.yml @@ -54,7 +54,7 @@ services: build: context: ../../../ dockerfile: Dockerfile-broker - image: broker:1.7.0 + image: broker:1.8.0 container_name: broker restart: always networks: diff --git a/fabric_cf/orchestrator/config.orchestrator.yaml b/fabric_cf/orchestrator/config.orchestrator.yaml index 40317563..4d1f6a10 100644 --- a/fabric_cf/orchestrator/config.orchestrator.yaml +++ b/fabric_cf/orchestrator/config.orchestrator.yaml @@ -86,6 +86,15 @@ oauth: key-refresh: 00:10:00 verify-exp: True +smtp: + smtp_server: mail.fabric-testbed.net + smtp_port: 587 + smtp_user: fabric_cf@fabric-testbed.net + smtp_password: + from_email: fabric_cf@fabric-testbed.net + reply_to_email: no-reply@fabric-testbed.net + template_path: /etc/fabric/actor/config/slice_expiration_template.txt + database: db-user: fabric db-password: fabric diff --git a/fabric_cf/orchestrator/core/advance_scheduling_thread.py b/fabric_cf/orchestrator/core/advance_scheduling_thread.py new file mode 100644 index 00000000..8663a7dc --- /dev/null +++ b/fabric_cf/orchestrator/core/advance_scheduling_thread.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +# MIT License +# +# Copyright (c) 2020 FABRIC Testbed +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+# +# +# Author: Komal Thareja (kthare10@renci.org) +import queue +import threading +import time +import traceback + +from fabric_cf.actor.core.time.actor_clock import ActorClock + +from fabric_cf.actor.core.kernel.reservation_states import ReservationStates +from fabric_cf.actor.core.util.id import ID +from fabric_cf.actor.core.util.iterable_queue import IterableQueue +from fabric_cf.orchestrator.core.exceptions import OrchestratorException +from fabric_cf.orchestrator.core.orchestrator_slice_wrapper import OrchestratorSliceWrapper + + +class AdvanceSchedulingThread: + """ + This runs as a standalone thread started by Orchestrator and deals with determining the + nearest start time for slices requested in future. + The purpose of this thread is to help orchestrator respond back to the create + without identifying the start time and waiting for the slivers to be demanded + """ + + def __init__(self, *, kernel): + self.slice_queue = queue.Queue() + self.slice_avail_condition = threading.Condition() + self.thread_lock = threading.Lock() + self.thread = None + self.stopped = False + from fabric_cf.actor.core.container.globals import GlobalsSingleton + self.logger = GlobalsSingleton.get().get_logger() + self.mgmt_actor = kernel.get_management_actor() + self.kernel = kernel + + def queue_slice(self, *, controller_slice: OrchestratorSliceWrapper): + """ + Queue a slice + :param controller_slice: + :return: + """ + try: + self.slice_queue.put_nowait(controller_slice) + self.logger.debug(f"Added slice to slices queue {controller_slice.get_slice_id()}") + except Exception as e: + self.logger.error(f"Failed to queue slice: {controller_slice.get_slice_id()} e: {e}") + + def start(self): + """ + Start thread + :return: + """ + try: + self.thread_lock.acquire() + if self.thread is not None: + raise OrchestratorException(f"This {self.__class__.__name__} has already been started") + + self.thread = threading.Thread(target=self.run) + self.thread.setName(self.__class__.__name__) + self.thread.setDaemon(True) + self.thread.start() + + finally: + self.thread_lock.release() + + def stop(self): + """ + Stop thread + :return: + """ + self.stopped = True + try: + self.thread_lock.acquire() + temp = self.thread + self.thread = None + if temp is not None: + self.logger.warning(f"It seems that the {self.__class__.__name__} is running. " + f"Interrupting it") + try: + # TODO find equivalent of interrupt + with self.slice_avail_condition: + self.slice_avail_condition.notify_all() + temp.join() + except Exception as e: + self.logger.error(f"Could not join {self.__class__.__name__} thread {e}") + finally: + self.thread_lock.release() + finally: + if self.thread_lock is not None and self.thread_lock.locked(): + self.thread_lock.release() + + def run(self): + """ + Thread main loop + :return: + """ + self.logger.debug(f"{self.__class__.__name__} started") + while True: + slices = [] + if not self.stopped and self.slice_queue.empty(): + time.sleep(0.001) + + if self.stopped: + self.logger.info(f"{self.__class__.__name__} exiting") + return + + if not self.slice_queue.empty(): + try: + for s in IterableQueue(source_queue=self.slice_queue): + slices.append(s) + except Exception as e: + self.logger.error(f"Error while adding slice to slice queue! e: {e}") + self.logger.error(traceback.format_exc()) + + if len(slices) > 0: + self.logger.debug(f"Processing {len(slices)} slices") + for s in slices: + try: + # Process the Slice i.e. 
Determine the nearest start time + # If start time found, queue the slice on SliceDeferredThread + # Else move the slice to Close State with failure + self.process_slice(controller_slice=s) + except Exception as e: + self.logger.error(f"Error while processing slice {type(s)}, {e}") + self.logger.error(traceback.format_exc()) + + def process_slice(self, *, controller_slice: OrchestratorSliceWrapper): + """ + Determine nearest start time for a slice requested in future and + add to deferred slice thread for further processing + :param controller_slice: + """ + computed_reservations = controller_slice.get_computed_reservations() + + try: + controller_slice.lock() + + # Determine nearest start time in the time range requested + # If not found, start time specified in the request is used as start resulting in slice failing + # with insufficient resources error + future_start, future_end = self.kernel.determine_future_lease_time( + computed_reservations=computed_reservations, + start=controller_slice.start, end=controller_slice.end, + duration=controller_slice.lifetime) + + self.logger.debug(f"Slice: {controller_slice.slice_obj.slice_name}/{controller_slice.slice_obj.get_slice_id()}" + f" Start Time: {future_start} End: {future_end}") + + # Update slice start/end time + controller_slice.slice_obj.set_lease_start(lease_start=future_start) + controller_slice.slice_obj.set_lease_end(lease_end=future_end) + self.logger.debug(f"Update Slice {controller_slice.slice_obj.slice_name}") + self.mgmt_actor.update_slice(slice_obj=controller_slice.slice_obj) + + # Update the reservations start/end time + for r in computed_reservations: + r.set_start(value=ActorClock.to_milliseconds(when=future_start)) + r.set_end(value=ActorClock.to_milliseconds(when=future_end)) + + self.add_and_demand_reservations(controller_slice=controller_slice) + + except Exception as e: + self.logger.error(traceback.format_exc()) + self.logger.error("Unable to schedule a future slice: {}".format(e)) + self.add_and_demand_reservations(controller_slice=controller_slice) + finally: + controller_slice.unlock() + + def add_and_demand_reservations(self, controller_slice: OrchestratorSliceWrapper): + try: + # Add Reservations to relational database; + controller_slice.add_reservations() + + # Queue the slice to be demanded on Slice Defer Thread + self.kernel.get_defer_thread().queue_slice(controller_slice=controller_slice) + except Exception as e: + self.logger.error(f"SHOULD_NOT_HAPPEN: Queueing slice on Deferred Queue Failed: {controller_slice.slice_obj}") + self.logger.error(f"Exception: {e}") + self.logger.error(traceback.format_exc()) diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index d0471c69..ef50ac66 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -134,6 +134,7 @@ def discover_broker_query_model(self, *, controller: ABCMgmtControllerMixin, tok :param end: end time :param includes: comma separated lists of sites to include :param excludes: comma separated lists of sites to exclude + :param email: Email of the user on whose behalf the request is initiated :return str or None """ broker_query_model = None @@ -146,16 +147,6 @@ def discover_broker_query_model(self, *, controller: ABCMgmtControllerMixin, tok else: saved_bqm.start_refresh() - ''' - if broker_query_model is None: - if self.local_bqm and level == 2 and not force_refresh: - saved_bqm = 
self.controller_state.get_saved_bqm(graph_format=GraphFormat.GRAPHML, level=0) - if saved_bqm and saved_bqm.get_bqm() and len(saved_bqm.get_bqm()): - broker_query_model = controller.build_broker_query_model(level_0_broker_query_model=saved_bqm.get_bqm(), - level=level, graph_format=graph_format, - start=start, end=end, includes=includes, - excludes=excludes) - ''' # Request the model from Broker as a fallback if not broker_query_model: broker = self.get_broker(controller=controller) @@ -221,7 +212,8 @@ def list_resources(self, *, level: int, force_refresh: bool = False, start: date raise e def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key: str, - lease_start_time: datetime = None, lease_end_time: datetime = None) -> List[dict]: + lease_start_time: datetime = None, lease_end_time: datetime = None, + lifetime: int = 24) -> List[dict]: """ Create a slice :param token Fabric Identity Token @@ -230,6 +222,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key :param ssh_key: User ssh key :param lease_start_time: Lease Start Time (UTC) :param lease_end_time: Lease End Time (UTC) + :param lifetime: Lifetime of the slice in hours :raises Raises an exception in case of failure :returns List of reservations created for the Slice on success """ @@ -244,12 +237,10 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key fabric_token = AccessChecker.validate_and_decode_token(token=token) project, tags, project_name = fabric_token.first_project allow_long_lived = True if Constants.SLICE_NO_LIMIT_LIFETIME in tags else False - start_time, end_time = self.__compute_lease_end_time(lease_end_time=lease_end_time, - allow_long_lived=allow_long_lived, - project_id=project, lease_start_time=lease_start_time) + start_time, end_time = self.__compute_lease_end_time(lease_end_time=lease_end_time, lifetime=lifetime, + allow_long_lived=allow_long_lived, project_id=project) controller = self.controller_state.get_management_actor() - self.logger.debug(f"create_slice invoked for Controller: {controller}") # Validate the slice graph create_ts = time.time() @@ -326,23 +317,48 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key new_slice_object.lock() # Create Slivers from Slice Graph; Compute Reservations from Slivers; - computed_reservations = new_slice_object.create(slice_graph=asm_graph) + computed_reservations = new_slice_object.create(slice_graph=asm_graph, + lease_start_time=lease_start_time, + lease_end_time=lease_end_time, + lifetime=lifetime) new_slice_object.update_topology(topology=topology) # Check if Testbed in Maintenance or Site in Maintenance self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations) - # Add Reservations to relational database; - new_slice_object.add_reservations() - - self.logger.info(f"OC wrapper: TIME= {time.time() - create_ts:.0f}") + # TODO Future Slice + ''' + if lease_start_time and lease_end_time and lifetime: + future_start, future_end = self.controller_state.determine_future_lease_time(computed_reservations=computed_reservations, + start=lease_start_time, end=lease_end_time, + duration=lifetime) + self.logger.debug(f"Advanced Scheduling: Slice: {slice_name}({slice_id}) lifetime: {future_start} to {future_end}") + slice_obj.set_lease_start(lease_start=future_start) + slice_obj.set_lease_end(lease_end=future_end) + self.logger.debug(f"Update Slice {slice_name}") + slice_id = controller.update_slice(slice_obj=slice_obj) + for r in 
computed_reservations:
+                    r.set_start(value=ActorClock.to_milliseconds(when=future_start))
+                    r.set_end(value=ActorClock.to_milliseconds(when=future_end))
+            '''
-            # Enqueue the slice on the demand thread
-            # Demand thread is responsible for demanding the reservations
-            # Helps improve the create response time
             create_ts = time.time()
-            self.controller_state.get_defer_thread().queue_slice(controller_slice=new_slice_object)
-            self.logger.info(f"QU queue: TIME= {time.time() - create_ts:.0f}")
+            if lease_start_time and lease_end_time and lifetime:
+                # Enqueue future slices on Advanced Scheduling Thread to determine possible start time
+                # Determining start time may take time so this is done asynchronously to avoid increasing response time
+                # of create slice API
+                self.controller_state.get_advance_scheduling_thread().queue_slice(controller_slice=new_slice_object)
+            else:
+                # Enqueue the slice on the demand thread
+                # Demand thread is responsible for demanding the reservations
+                # Helps improve the create response time
+
+                # Add Reservations to relational database;
+                new_slice_object.add_reservations()
+                self.logger.info(f"OC wrapper: TIME= {time.time() - create_ts:.0f}")
+                self.controller_state.get_defer_thread().queue_slice(controller_slice=new_slice_object)
+                self.logger.info(f"QU queue: TIME= {time.time() - create_ts:.0f}")
+
         EventLoggerSingleton.get().log_slice_event(slice_object=slice_obj, action=ActionId.create, topology=topology)
@@ -752,9 +768,9 @@
     def renew_slice(self, *, token: str, slice_id: str, new_lease_end_time: datetime
         fabric_token = AccessChecker.validate_and_decode_token(token=token)
         project, tags, project_name = fabric_token.first_project
         allow_long_lived = True if Constants.SLICE_NO_LIMIT_LIFETIME in tags else False
-        start_time, new_end_time = self.__compute_lease_end_time(lease_end_time=new_lease_end_time,
-                                                                 allow_long_lived=allow_long_lived,
-                                                                 project_id=project)
+        _, new_end_time = self.__compute_lease_end_time(lease_end_time=new_lease_end_time,
+                                                        allow_long_lived=allow_long_lived,
+                                                        project_id=project)
         reservations = controller.get_reservations(slice_id=slice_id)
         if reservations is None or len(reservations) < 1:
@@ -836,35 +852,48 @@
     def validate_lease_time(lease_time: str) -> Union[datetime, None]:
         return new_time
-    def __compute_lease_end_time(self, lease_end_time: datetime, allow_long_lived: bool = False,
-                                 project_id: str = None, lease_start_time: datetime = None) -> Tuple[datetime, datetime]:
+    def __compute_lease_end_time(self, lease_end_time: datetime = None, allow_long_lived: bool = False,
+                                 project_id: str = None,
+                                 lifetime: int = Constants.DEFAULT_LEASE_IN_HOURS) -> Tuple[datetime, datetime]:
         """
-        Validate Lease End Time
-        :param lease_end_time: New End Time
-        :param allow_long_lived: Allow long lived tokens
-        :param project_id: Project Id
-        :param lease_start_time: New Start Time
-        :return End Time
-        :raises Exception if new end time is in past
+        Validate and compute Lease End Time.
+
+        :param lease_end_time: The requested end time for the lease.
+        :param allow_long_lived: If True, allows extended duration for leases.
+        :param project_id: Project ID to check for special duration limits.
+        :param lifetime: Requested lease duration in hours. Defaults to the system-defined lease duration.
+        :return: A tuple containing the start time (current time) and the computed end time.
+        :raises ValueError: If the lease end time is in the past.
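
The method body that follows boils down to a little timedelta arithmetic: reject end times in the past, derive a lifetime when only an end time is given, clamp it to the maximum, and add it to now. A standalone sketch under assumed constants (the real limits live in Constants, and the per-project exemption is omitted here for brevity):

    from datetime import datetime, timedelta, timezone
    from typing import Optional, Tuple

    DEFAULT_LEASE_IN_HOURS = 24          # illustrative values, not the repo's
    DEFAULT_MAX_DURATION_HOURS = 14 * 24

    def compute_lease_window(lease_end_time: Optional[datetime] = None,
                             lifetime: Optional[int] = None) -> Tuple[datetime, datetime]:
        # Return (start, end); an explicit lifetime wins over lease_end_time,
        # and either is clamped to the maximum allowed duration.
        base = datetime.now(timezone.utc)
        if lease_end_time and lease_end_time < base:
            raise ValueError("Requested lease end time is in the past.")
        if lifetime is None:
            if lease_end_time:
                lifetime = (lease_end_time - base).total_seconds() / 3600
            else:
                lifetime = DEFAULT_LEASE_IN_HOURS
        lifetime = min(lifetime, DEFAULT_MAX_DURATION_HOURS)
        return base, base + timedelta(hours=lifetime)
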
""" base_time = datetime.now(timezone.utc) - if lease_start_time and lease_start_time > base_time: - base_time = lease_start_time - if lease_end_time is None: - new_end_time = base_time + timedelta(hours=Constants.DEFAULT_LEASE_IN_HOURS) - return base_time, new_end_time - new_end_time = lease_end_time + # Raise an error if lease_end_time is in the past + if lease_end_time and lease_end_time < base_time: + raise ValueError("Requested lease end time is in the past.") - if allow_long_lived: - default_long_lived_duration = Constants.LONG_LIVED_SLICE_TIME_WEEKS - else: - default_long_lived_duration = Constants.DEFAULT_MAX_DURATION - if project_id not in self.infrastructure_project_id and (new_end_time - base_time) > default_long_lived_duration: - self.logger.info(f"New term end time {new_end_time} exceeds system default " - f"{default_long_lived_duration}, setting to system default: ") + default_max_duration = (Constants.LONG_LIVED_SLICE_TIME_WEEKS + if allow_long_lived else Constants.DEFAULT_MAX_DURATION_IN_WEEKS).total_seconds() + # Convert weeks to hours + default_max_duration /= 3600 + + # Calculate lifetime if not directly provided + if lifetime is None: + if lease_end_time: + lifetime = (lease_end_time - base_time).total_seconds() / 3600 + else: + lifetime = Constants.DEFAULT_LEASE_IN_HOURS + + # Ensure the requested lifetime does not exceed allowed max duration for the project + if project_id not in self.infrastructure_project_id and lifetime > default_max_duration: + self.logger.info(f"Requested lifetime ({lifetime} hours) exceeds the allowed duration " + f"({default_max_duration} hours). Setting to maximum allowable.") + lifetime = default_max_duration - new_end_time = base_time + default_long_lived_duration + # Calculate the new end time + new_end_time = base_time + timedelta(hours=lifetime) return base_time, new_end_time diff --git a/fabric_cf/orchestrator/core/orchestrator_kernel.py b/fabric_cf/orchestrator/core/orchestrator_kernel.py index 5fbaac68..5ebfe93d 100644 --- a/fabric_cf/orchestrator/core/orchestrator_kernel.py +++ b/fabric_cf/orchestrator/core/orchestrator_kernel.py @@ -24,7 +24,19 @@ # # Author: Komal Thareja (kthare10@renci.org) import threading +from datetime import datetime, timedelta +from heapq import heappush, heappop +from http.client import BAD_REQUEST +from typing import List, Iterator, Tuple, Optional +from fabric_cf.actor.core.time.actor_clock import ActorClock + +from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro +from fim.slivers.network_node import NodeSliver + +from fabric_cf.actor.core.kernel.reservation_states import ReservationStates + +from fabric_cf.actor.fim.fim_helper import FimHelper from fim.user import GraphFormat from fabric_cf.actor.core.apis.abc_actor_event import ABCActorEvent @@ -34,9 +46,11 @@ from fabric_cf.actor.core.core.event_processor import EventProcessor from fabric_cf.actor.core.manage.management_utils import ManagementUtils from fabric_cf.actor.core.util.id import ID +from fabric_cf.orchestrator.core.advance_scheduling_thread import AdvanceSchedulingThread from fabric_cf.orchestrator.core.bqm_wrapper import BqmWrapper from fabric_cf.orchestrator.core.exceptions import OrchestratorException from fabric_cf.orchestrator.core.reservation_status_update_thread import ReservationStatusUpdateThread +from fabric_cf.orchestrator.core.resource_tracker import ResourceTracker from fabric_cf.orchestrator.core.slice_defer_thread import SliceDeferThread @@ -49,7 +63,8 @@ def process(self): oh = 
OrchestratorHandler() for graph_format, level in self.model_level_list: oh.discover_broker_query_model(controller=oh.controller_state.controller, - graph_format=graph_format, force_refresh=True, level=level) + graph_format=graph_format, force_refresh=True, + level=level) class OrchestratorKernel(ABCTick): @@ -59,6 +74,7 @@ class OrchestratorKernel(ABCTick): def __init__(self): self.defer_thread = None + self.adv_sch_thread = None self.sut = None self.broker = None self.logger = None @@ -66,6 +82,8 @@ def __init__(self): self.lock = threading.Lock() self.bqm_cache = {} self.event_processor = None + self.combined_broker_model_graph_id = None + self.combined_broker_model = None def get_saved_bqm(self, *, graph_format: GraphFormat, level: int) -> BqmWrapper: """ @@ -93,6 +111,8 @@ def save_bqm(self, *, bqm: str, graph_format: GraphFormat, level: int): saved_bqm.save(bqm=bqm, graph_format=graph_format, level=level) self.bqm_cache[key] = saved_bqm + if level == 0: + self.load_model(model=bqm) finally: self.lock.release() @@ -114,6 +134,9 @@ def get_broker(self) -> ID: def get_defer_thread(self) -> SliceDeferThread: return self.defer_thread + def get_advance_scheduling_thread(self) -> AdvanceSchedulingThread: + return self.adv_sch_thread + def get_sut(self) -> ReservationStatusUpdateThread: """ Get SUT thread @@ -145,6 +168,8 @@ def stop_threads(self): Stop threads :return: """ + if self.adv_sch_thread: + self.adv_sch_thread.stop() if self.defer_thread is not None: self.defer_thread.stop() if self.event_processor is not None: @@ -152,26 +177,39 @@ def stop_threads(self): #if self.sut is not None: # self.sut.stop() + def load_model(self, model: str): + if self.combined_broker_model_graph_id: + FimHelper.delete_graph(graph_id=self.combined_broker_model_graph_id) + + self.logger.debug(f"Loading an existing Combined Broker Model Graph") + self.combined_broker_model = FimHelper.get_neo4j_cbm_graph_from_string_direct( + graph_str=model, ignore_validation=True) + self.combined_broker_model_graph_id = self.combined_broker_model.get_graph_id() + self.logger.debug( + f"Successfully loaded an Combined Broker Model Graph: {self.combined_broker_model_graph_id}") + def start_threads(self): """ Start threads :return: """ - ''' - if not len(self.bqm_cache): - self.save_bqm(bqm="", graph_format=GraphFormat.GRAPHML, level=0) - saved_bqm = self.get_saved_bqm(graph_format=GraphFormat.GRAPHML, level=0) - saved_bqm.last_query_time = None - ''' - from fabric_cf.actor.core.container.globals import GlobalsSingleton GlobalsSingleton.get().get_container().register(tickable=self) + self.logger = GlobalsSingleton.get().get_logger() + from fabric_cf.orchestrator.core.orchestrator_handler import OrchestratorHandler + oh = OrchestratorHandler() + model = oh.discover_broker_query_model(controller=self.get_management_actor(), + graph_format=GraphFormat.GRAPHML, + force_refresh=True, level=0) + self.load_model(model=model) self.get_logger().debug("Starting SliceDeferThread") self.defer_thread = SliceDeferThread(kernel=self) self.defer_thread.start() self.event_processor = EventProcessor(name="PeriodicProcessor", logger=self.logger) self.event_processor.start() + self.adv_sch_thread = AdvanceSchedulingThread(kernel=self) + self.adv_sch_thread.start() #self.get_logger().debug("Starting ReservationStatusUpdateThread") #self.sut = ReservationStatusUpdateThread() #self.sut.start() @@ -197,6 +235,125 @@ def external_tick(self, *, cycle: int): def get_name(self) -> str: return self.__class__.__name__ + def find_common_start_time(self, 
reservation_start_times: list[list[datetime]]) -> datetime: + """ + Find the earliest common start time for a group of reservations. + + :param reservation_start_times: A list of lists, where each sublist contains possible start times for a reservation. + :type reservation_start_times: List[List[datetime]] + :return: The earliest common start time, or None if no common start time is found. + :rtype: datetime + """ + if not reservation_start_times: + return None + + # Convert the first list to a set of datetimes + common_times = set(reservation_start_times[0]) + + # Find the intersection with other reservation start times + for start_times in reservation_start_times[1:]: + common_times.intersection_update(start_times) + + # If there are no common times, return None + if not common_times: + return None + + # Return the earliest common start time + return min(common_times) + + def determine_future_lease_time(self, computed_reservations: list[LeaseReservationAvro], start: datetime, + end: datetime, duration: int) -> tuple[datetime, datetime]: + """ + Given a set of reservations, check if the requested resources are available for all reservations + to start simultaneously. If resources are not available, find the nearest start time when all + reservations can begin together. + + :param computed_reservations: List of LeaseReservationAvro objects representing computed reservations. + :type computed_reservations: list[LeaseReservationAvro] + :param start: Requested start datetime. + :type start: datetime + :param end: Requested end datetime. + :type end: datetime + :param duration: Requested duration in hours. + :type duration: int + :return: The nearest available start time and corresponding end time when all reservations can start together. + : Given start, start + duration is returned if no future reservation time can be found resulting in + : slice closure. 
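
A quick worked example of the intersection performed by find_common_start_time above, with illustrative hour-granularity values:

    from datetime import datetime

    t = lambda h: datetime(2024, 11, 1, h)   # hour-granularity helper, illustrative
    reservation_start_times = [
        [t(9), t(10), t(11)],   # reservation A can start at 9, 10 or 11
        [t(10), t(11)],         # reservation B only at 10 or 11
        [t(10), t(12)],         # reservation C at 10 or 12
    ]
    # Intersect the candidate sets, then take the earliest common hour.
    common = set(reservation_start_times[0])
    for times in reservation_start_times[1:]:
        common.intersection_update(times)
    print(min(common) if common else None)   # -> 2024-11-01 10:00:00
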
+ :rtype: tuple of datetime, datetime + """ + states = [ReservationStates.Active.value, + ReservationStates.ActiveTicketed.value, + ReservationStates.Ticketed.value] + + # Dictionary to hold the future start times for each reservation's candidate nodes + future_start_times = [] + resource_trackers = {} + + for r in computed_reservations: + requested_sliver = r.get_sliver() + if not isinstance(requested_sliver, NodeSliver): + continue + + candidate_nodes = FimHelper.candidate_nodes(combined_broker_model=self.combined_broker_model, + sliver=requested_sliver, use_capacities=True) + + if not candidate_nodes: + self.logger.error(f'Insufficient resources: No hosts available to provision the {r.get_sliver()}') + # Reservation will fail at the Broker with Insufficient resources + # Triggering Slice closure + return start, start + timedelta(hours=duration) + + # Gather the nearest available start time per candidate node for this reservation + reservation_times = set() + for c in candidate_nodes: + cbm_node = self.combined_broker_model.build_deep_node_sliver(node_id=c) + # Skip if CBM node is not the specific host that is requested + if requested_sliver.get_labels() and requested_sliver.get_labels().instance_parent and \ + requested_sliver.get_labels().instance_parent != cbm_node.get_name(): + continue + existing = self.get_management_actor().get_reservations(node_id=c, states=states, + start=start, end=end, full=True) + if c not in resource_trackers: + resource_trackers[c] = ResourceTracker(cbm_node=cbm_node) + tracker = resource_trackers[c] + # Add slivers from reservations to the tracker + for e in existing: + tracker.update(reservation=e, start=start, end=end) + + start_times = tracker.find_next_available(requested_sliver=requested_sliver, start=start, + end=end, duration=duration) + if start_times: + start_times = sorted(start_times) + reservation_times.update(start_times) + + if not len(reservation_times): + self.logger.error(f"Sliver {requested_sliver} request cannot be satisfied in the requested duration!") + # Reservation will fail at the Broker with Insufficient resources + # Triggering Slice closure + return start, start + timedelta(hours=duration) + + # Add the earliest start time for the reservation to future_start_times + future_start_times.append(list(reservation_times)) + + # Find the nearest start time across all reservations where they can start together + simultaneous_start_time = self.find_common_start_time(reservation_start_times=future_start_times) + if not simultaneous_start_time: + self.logger.error("Slice cannot be satisfied in the requested duration!") + # Reservation will fail at the Broker with Insufficient resources + # Triggering Slice closure + return start, start + timedelta(hours=duration) + + # Verify that the simultaneous start time allows all reservations to run + # for the full requested duration + final_time = simultaneous_start_time + timedelta(hours=duration) + if final_time > end: + self.logger.error("No common start time available for the requested duration.") + # Reservation will fail at the Broker with Insufficient resources + # Triggering Slice closure + return start, start + timedelta(hours=duration) + + return simultaneous_start_time, final_time + class OrchestratorKernelSingleton: __instance = None diff --git a/fabric_cf/orchestrator/core/orchestrator_slice_wrapper.py b/fabric_cf/orchestrator/core/orchestrator_slice_wrapper.py index e2d0b412..79450f3d 100644 --- a/fabric_cf/orchestrator/core/orchestrator_slice_wrapper.py +++ 
b/fabric_cf/orchestrator/core/orchestrator_slice_wrapper.py @@ -26,6 +26,7 @@ import ipaddress import threading import time +from datetime import datetime from ipaddress import IPv4Network from typing import List, Tuple, Dict from http.client import BAD_REQUEST, NOT_FOUND @@ -37,8 +38,7 @@ from fabric_mb.message_bus.messages.slice_avro import SliceAvro from fim.graph.slices.abc_asm import ABCASMPropertyGraph from fim.slivers.base_sliver import BaseSliver -from fim.slivers.capacities_labels import CapacityHints, Labels -from fim.slivers.instance_catalog import InstanceCatalog +from fim.slivers.capacities_labels import Labels from fim.slivers.network_node import NodeSliver, NodeType from fim.slivers.network_service import NetworkServiceSliver from fim.slivers.topology_diff import WhatsModifiedFlag, TopologyDiff @@ -80,6 +80,9 @@ def __init__(self, *, controller: ABCMgmtControllerMixin, broker: ID, slice_obj: # Reservations trigger ModifyLease (AM) self.computed_modify_properties_reservations = [] self.thread_lock = threading.Lock() + self.start = None + self.end = None + self.lifetime = None def lock(self): """ @@ -148,13 +151,20 @@ def add_reservations(self): self.controller.add_reservation(reservation=r) self.logger.info(f"ADD TIME: {time.time() - start:.0f}") - def create(self, *, slice_graph: ABCASMPropertyGraph) -> List[LeaseReservationAvro]: + def create(self, *, slice_graph: ABCASMPropertyGraph, lease_start_time: datetime = None, + lease_end_time: datetime = None, lifetime: int = None) -> List[LeaseReservationAvro]: """ Create a slice :param slice_graph: Slice Graph + :param lease_start_time: Lease Start Time (UTC) + :param lease_end_time: Lease End Time (UTC) + :param lifetime: Lifetime of the slice in hours :return: List of computed reservations """ try: + self.start = lease_start_time + self.end = lease_end_time + self.lifetime = lifetime # Build Network Node reservations start = time.time() network_node_reservations, node_res_mapping = self.__build_network_node_reservations(slice_graph=slice_graph) @@ -417,19 +427,7 @@ def __build_node_sliver_reservation(self, *, slice_graph: ABCASMPropertyGraph, if sliver.get_type() == NodeType.VM: # Compute Requested Capacities from Capacity Hints - requested_capacities = sliver.get_capacities() - requested_capacity_hints = sliver.get_capacity_hints() - catalog = InstanceCatalog() - if requested_capacities is None and requested_capacity_hints is not None: - requested_capacities = catalog.get_instance_capacities( - instance_type=requested_capacity_hints.instance_type) - sliver.set_capacities(cap=requested_capacities) - - # Compute Capacity Hints from Requested Capacities - if requested_capacity_hints is None and requested_capacities is not None: - instance_type = catalog.map_capacities_to_instance(cap=requested_capacities) - requested_capacity_hints = CapacityHints(instance_type=instance_type) - sliver.set_capacity_hints(caphint=requested_capacity_hints) + FimHelper.compute_capacities(sliver=sliver) # Generate reservation for the sliver reservation = self.reservation_converter.generate_reservation(sliver=sliver, diff --git a/fabric_cf/orchestrator/core/resource_tracker.py b/fabric_cf/orchestrator/core/resource_tracker.py new file mode 100644 index 00000000..6f6330c4 --- /dev/null +++ b/fabric_cf/orchestrator/core/resource_tracker.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +# MIT License +# +# Copyright (c) 2020 FABRIC Testbed +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated 
documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# +# Author: Komal Thareja (kthare10@renci.org) +from collections import defaultdict +from datetime import datetime, timedelta + +from fabric_cf.actor.core.time.actor_clock import ActorClock +from fabric_mb.message_bus.messages.reservation_mng import ReservationMng +from fim.slivers.base_sliver import BaseSliver +from fim.slivers.attached_components import ComponentSliver, ComponentType +from fim.slivers.capacities_labels import Capacities, FreeCapacity +from fim.slivers.network_node import NodeSliver + + +class TimeSlot: + """Represents a time slot for resources, tracking capacities and components.""" + + def __init__(self, end: datetime): + """ + Initialize a TimeSlot instance with end time and empty resource capacities and components. + + :param end: The end datetime for this time slot. + :type end: datetime + """ + self.end = end + self.capacities = Capacities() + self.components = {} + + def __update_capacities(self, capacities: Capacities): + """ + Update the capacities in this time slot. + + :param capacities: The capacities to add to this time slot. + :type capacities: Capacities + """ + self.capacities += capacities + + def __update_components(self, by_type: dict[ComponentType, list[ComponentSliver]]): + """ + Update the components by type in this time slot. + + :param by_type: Dictionary with component types as keys and lists of ComponentSliver as values. + :type by_type: dict[ComponentType, list[ComponentSliver]] + """ + for comp_type, comps in by_type.items(): + if comp_type not in self.components: + self.components[comp_type] = 0 + for c in comps: + if c.get_capacities(): + units = c.get_capacities().unit + else: + units = c.get_capacity_allocations().unit + self.components[comp_type] += units + + def add_sliver(self, sliver: BaseSliver): + """ + Add sliver capacities and components to the current time slot. + + :param sliver: The sliver containing resource capacities and components to add. + :type sliver: BaseSliver + """ + if isinstance(sliver, NodeSliver): + if sliver.capacity_allocations: + self.__update_capacities(capacities=sliver.capacity_allocations) + else: + self.__update_capacities(capacities=sliver.capacities) + + if sliver.attached_components_info: + self.__update_components(by_type=sliver.attached_components_info.by_type) + + def __str__(self): + """ + Return a string representation of the capacities and components in this time slot. + + :return: String representation of capacities and components. 
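
To see the hourly bucketing that TimeSlot enables, here is a toy version with plain dicts standing in for fim's Capacities objects (the core/ram field names are assumptions for illustration):

    from collections import defaultdict
    from datetime import datetime, timedelta

    usage = defaultdict(lambda: {"core": 0, "ram": 0})  # hour -> accumulated allocations

    def add_reservation(start: datetime, end: datetime, core: int, ram: int):
        # Spread a reservation's usage across every hourly slot it overlaps.
        cur = start.replace(minute=0, second=0, microsecond=0)
        while cur < end:
            usage[cur]["core"] += core
            usage[cur]["ram"] += ram
            cur += timedelta(hours=1)

    add_reservation(datetime(2024, 11, 1, 9), datetime(2024, 11, 1, 12), core=4, ram=16)
    print(usage[datetime(2024, 11, 1, 10)])  # -> {'core': 4, 'ram': 16}
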
+ :rtype: str + """ + return f"Capacities: {self.capacities}, Components: {self.components}" + + +class ResourceTracker: + """Tracks resource over time slots and checks availability of resources.""" + + def __init__(self, cbm_node: NodeSliver): + """ + Initialize ResourceTracker with total capacities and components from a CBM node. + + :param cbm_node: The CBM node from which to initialize capacities and components. + :type cbm_node: NodeSliver + """ + self.total_capacities = cbm_node.get_capacities() + self.total_components = {} + + if cbm_node.attached_components_info: + for comp_type, comps in cbm_node.attached_components_info.by_type.items(): + if comp_type not in self.total_components: + self.total_components[comp_type] = 0 + for c in comps: + self.total_components[comp_type] += c.get_capacities().unit + + self.time_slots = defaultdict(TimeSlot) + self.reservation_ids = set() + + def update(self, reservation: ReservationMng, start: datetime, end: datetime): + """ + Update allocated resource information. + + :param reservation: Existing Reservation. + :type reservation: ReservationMng + :param start: Requested start datetime. + :type start: datetime + :param end: Requested end datetime. + :type end: datetime + """ + # Check if reservation has already been captured, if so skip it + if reservation.get_reservation_id() in self.reservation_ids: + return + self.reservation_ids.add(reservation.get_reservation_id()) + + start = start.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) + end = end.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) + sliver_end = ActorClock.from_milliseconds(milli_seconds=reservation.get_end()) + sliver_end = sliver_end.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) + + current_time = start + while current_time < sliver_end and current_time < end: + if current_time not in self.time_slots: + self.time_slots[current_time] = TimeSlot(current_time) + + # Update the specific time slot with the sliver's resource usage + self.time_slots[current_time].add_sliver(sliver=reservation.get_sliver()) + + current_time += timedelta(hours=1) + + def __check_components(self, requested_sliver: NodeSliver, + allocated: dict[ComponentType, int]) -> bool: + """ + Check if requested components can be fulfilled by available components. + + :param requested_sliver: The sliver with requested components. + :type requested_sliver: NodeSliver + :param allocated: Dictionary of available components by type. + :type allocated: dict[ComponentType, int] + :return: True if components can be fulfilled, False otherwise. + :rtype: bool + """ + if not requested_sliver.attached_components_info: + return True + for comp_type, comps in requested_sliver.attached_components_info.by_type.items(): + if comp_type not in self.total_components: + return False + elif len(comps) > (self.total_components[comp_type] - allocated.get(comp_type, 0)): + return False + return True + + def find_next_available(self, requested_sliver: NodeSliver, start: datetime, end: datetime, + duration: int) -> list[datetime]: + """ + Find the next available time slot that can fulfill the requested sliver's capacities and components. + + :param requested_sliver: The sliver with requested capacities and components. + :type requested_sliver: NodeSliver + :param start: The start datetime to begin searching for availability. + :type start: datetime + :param end: The end datetime to stop searching. + :type end: datetime + :param duration: The duration in hours for which the resources are needed. 
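
The search implemented below is a sliding window over hourly slots; the same shape in miniature, with free capacity reduced to a single integer (next_available and its parameters are illustrative, not the class API):

    from datetime import datetime, timedelta
    from typing import Dict, List

    def next_available(used: Dict[datetime, int], total: int, need: int,
                       start: datetime, end: datetime, duration: int) -> List[datetime]:
        # Return every hour at which `need` units fit for `duration` consecutive hours.
        out = []
        cur = start
        while cur + timedelta(hours=duration) <= end:
            if all(total - used.get(cur + timedelta(hours=i), 0) >= need
                   for i in range(duration)):
                out.append(cur)
            cur += timedelta(hours=1)
        return out

    used = {datetime(2024, 11, 1, 10): 8}                 # one fully booked hour
    print(next_available(used, total=8, need=4,
                         start=datetime(2024, 11, 1, 9),
                         end=datetime(2024, 11, 1, 14), duration=2))
    # -> starts at 11:00 and 12:00 qualify; 9:00 and 10:00 collide with the busy slot
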
+        :type duration: int
+        :return: List of all hour-aligned start times at which the request can be satisfied,
+                 or an empty list if none is found.
+        :rtype: list[datetime]
+        """
+        current_time = start.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
+        duration_timedelta = timedelta(hours=duration)
+        ret_val = []
+
+        while current_time + duration_timedelta <= end:
+            available = True
+            for i in range(duration):
+                time_slot_time = current_time + timedelta(hours=i)
+                if time_slot_time not in self.time_slots:
+                    # If there's no entry for this time slot, assume full capacity available
+                    continue
+
+                time_slot = self.time_slots[time_slot_time]
+                free_capacity = FreeCapacity(total=self.total_capacities, allocated=time_slot.capacities)
+
+                # Negative free fields mean this slot is already over-committed
+                if free_capacity.free.negative_fields():
+                    available = False
+                    break
+
+                diff = free_capacity.free - requested_sliver.capacities
+                if diff.negative_fields():
+                    available = False
+                    break
+
+                if not self.__check_components(requested_sliver=requested_sliver, allocated=time_slot.components):
+                    available = False
+                    break
+
+            if available:
+                ret_val.append(current_time)
+
+            current_time += timedelta(hours=1)
+
+        return ret_val
diff --git a/fabric_cf/orchestrator/docker-compose.yml b/fabric_cf/orchestrator/docker-compose.yml
index 904eebbf..67f4d07e 100644
--- a/fabric_cf/orchestrator/docker-compose.yml
+++ b/fabric_cf/orchestrator/docker-compose.yml
@@ -14,6 +14,7 @@ services:
       - ./nginx/default.conf:/etc/nginx/conf.d/default.conf
       - ./certs/fullchain.pem:/etc/ssl/public.pem
       - ./certs/privkey.pem:/etc/ssl/private.pem
+      - /opt/data/production/logs/nginx/orchestrator:/var/log/nginx
     restart: always
   neo4j:
     image: fabrictestbed/neo4j-apoc:5.3.0
@@ -68,7 +69,7 @@ services:
     build:
       context: ../../../
      dockerfile: Dockerfile-orchestrator
-    image: orchestrator:1.7.0
+    image: orchestrator:1.8.0
     container_name: orchestrator
     restart: always
     depends_on:
@@ -84,6 +85,7 @@ services:
       - ../../../secrets/snakeoil-ca-1.crt:/etc/fabric/message_bus/ssl/cacert.pem
       - ../../../secrets/kafkacat1.client.key:/etc/fabric/message_bus/ssl/client.key
       - ../../../secrets/kafkacat1-ca1-signed.pem:/etc/fabric/message_bus/ssl/client.pem
+      - ./slice_expiration_template.txt:/etc/fabric/actor/config/slice_expiration_template.txt
       #- ./state_recovery.lock:/usr/src/app/state_recovery.lock
     networks:
       frontend:
diff --git a/fabric_cf/orchestrator/fabricTags.OrchestratorTags.xml b/fabric_cf/orchestrator/fabricTags.OrchestratorTags.xml
index 12b2f090..868028be 100644
--- a/fabric_cf/orchestrator/fabricTags.OrchestratorTags.xml
+++ b/fabric_cf/orchestrator/fabricTags.OrchestratorTags.xml
@@ -1 +1 @@
-http://www.w3.org/TR/1999/REC-xpath-19991116http://www.w3.org/TR/1999/REC-xpath-19991116createhttp://www.w3.org/TR/1999/REC-xpath-19991116sliverhttp://www.w3.org/TR/1999/REC-xpath-19991116sliverSwitch.P4Policy Violation: Your project is lacking Switch.P4 tag to provision a P4 switch.Policy Violation: Policy returned deny for an unknown reason.
This is an internal error.http://www.w3.org/TR/1999/REC-xpath-19991116sliverhttp://www.w3.org/TR/1999/REC-xpath-19991116sliver2VM.NoLimitCPUPolicy Violation: Your project is lacking VM.NoLimitCPU or VM.NoLimit tag to provision VM with more than 2 cores.10VM.NoLimitDiskPolicy Violation: Your project is lacking VM.NoLimitDisk or VM.NoLimit tag to provision VM with disk over 10GB.10VM.NoLimitRAMPolicy Violation: Your project is lacking VM.NoLimitRAM or VM.NoLimit tag to provision VM with more than 10GB of RAM.http://www.w3.org/TR/1999/REC-xpath-19991116sliver21010VM.NoLimithttp://www.w3.org/TR/1999/REC-xpath-19991116sliverhttp://www.w3.org/TR/1999/REC-xpath-19991116sliverGPUComponent.GPUPolicy Violation: Your project is lacking Component.GPU tag to provision a VM with GPU.SmartNICComponent.SmartNICPolicy Violation: Your project is lacking Component.SmartNIC tag to provision a VM with SmartNIC.StorageComponent.StoragePolicy Violation: Your project is lacking Component.Storage tag to provision a VM with attached storage.FPGAComponent.FPGAPolicy Violation: Your project is lacking Component.FPGA tag to provision a VM with FPGA.NVMEComponent.NVMEPolicy Violation: Your project is lacking Component.NVME tag to provision a VM with NVME.http://www.w3.org/TR/1999/REC-xpath-19991116sliverhttp://www.w3.org/TR/1999/REC-xpath-19991116sliver0Net.FABNetv4ExtPolicy Violation: Your project is lacking Net.FABNetv4Ext tag to request a slice with external IPv4 connectivity.0Net.FABNetv6ExtPolicy Violation: Your project is lacking Net.FABNetv6Ext tag to request a slice with external IPv6 connectivity.0Net.PortMirroringPolicy Violation: Your project is lacking Net.PortMirroring tag to request a slice that uses port mirroring.1Slice.MultisitePolicy Violation: Your project is lacking Slice.Multisite tag to request a slice spanning multiple sites.EDUKYSlice.OnlyEDUKYPolicy Violation: Your project is tagged as OnlyEDUKY and cannot use resources on sites other than EDUKY.10Net.NoLimitBWPolicy Violation: Your project is lacking Net.NoLimitBW tag to request links with bandwidth over 10Gbps.0Net.FacilityPort.Policy Violation: Your project is lacking Net.FacilityPort.<facility-port-name> tag to request a connection to one or more of the facilities.http://www.w3.org/TR/1999/REC-xpath-19991116sliverhttp://www.w3.org/TR/1999/REC-xpath-19991116slivertrueSlice.MeasurementsPolicy Violation: Your project is lacking Slice.Measurements tag to request measurement resources.P14DT5MSlice.NoLimitLifetimePolicy Violation: Your project is lacking Slice.NoLimitLifetime tag so you cannot request resource lifetime longer than two weeks.http://www.w3.org/TR/1999/REC-xpath-19991116modifyhttp://www.w3.org/TR/1999/REC-xpath-19991116sliverPolicy Violation: You are not the creator of this resource and not the member of the same project so you cannot modify it.http://www.w3.org/TR/1999/REC-xpath-19991116deletehttp://www.w3.org/TR/1999/REC-xpath-19991116sliverPolicy Violation: You are not the creator of this resource and cannot delete it.http://www.w3.org/TR/1999/REC-xpath-19991116renewhttp://www.w3.org/TR/1999/REC-xpath-19991116sliverPolicy Violation: You are not the creator of this resource and not the member of the same project so you cannot renew it.http://www.w3.org/TR/1999/REC-xpath-19991116sliverP14DT5MSlice.NoLimitLifetimePolicy Violation: Your project is lacking Slice.NoLimitLifetime tag so you cannot renew resource lifetime by longer than two 
weeks.http://www.w3.org/TR/1999/REC-xpath-19991116querystatusredeemPOAdemandupdatecloseclaimreclaimticketextendrelinquishhttp://www.w3.org/TR/1999/REC-xpath-19991116querystatusredeemPOAdemandupdatecloseclaimreclaimticketextendrelinquish \ No newline at end of file +http://www.w3.org/TR/1999/REC-xpath-19991116http://www.w3.org/TR/1999/REC-xpath-19991116createhttp://www.w3.org/TR/1999/REC-xpath-19991116sliverhttp://www.w3.org/TR/1999/REC-xpath-19991116sliver2VM.NoLimitCPUPolicy Violation: Your project is lacking VM.NoLimitCPU or VM.NoLimit tag to provision VM with more than 2 cores.Policy Violation: Policy returned deny for an unknown reason. This is an internal error.10VM.NoLimitDiskPolicy Violation: Your project is lacking VM.NoLimitDisk or VM.NoLimit tag to provision VM with disk over 10GB.10VM.NoLimitRAMPolicy Violation: Your project is lacking VM.NoLimitRAM or VM.NoLimit tag to provision VM with more than 10GB of RAM.http://www.w3.org/TR/1999/REC-xpath-19991116sliver21010VM.NoLimithttp://www.w3.org/TR/1999/REC-xpath-19991116sliverhttp://www.w3.org/TR/1999/REC-xpath-19991116sliverGPUComponent.GPUPolicy Violation: Your project is lacking Component.GPU tag to provision a VM with GPU.SmartNICComponent.SmartNICPolicy Violation: Your project is lacking Component.SmartNIC tag to provision a VM with SmartNIC.StorageComponent.StoragePolicy Violation: Your project is lacking Component.Storage tag to provision a VM with attached storage.FPGAComponent.FPGAPolicy Violation: Your project is lacking Component.FPGA tag to provision a VM with FPGA.NVMEComponent.NVMEPolicy Violation: Your project is lacking Component.NVME tag to provision a VM with NVME.http://www.w3.org/TR/1999/REC-xpath-19991116switch-p4http://www.w3.org/TR/1999/REC-xpath-19991116switch-p4Switch.P4Policy Violation: Your project is lacking Switch.P4 tag to provision a P4 switch.http://www.w3.org/TR/1999/REC-xpath-19991116sliverhttp://www.w3.org/TR/1999/REC-xpath-19991116sliver0Net.FABNetv4ExtPolicy Violation: Your project is lacking Net.FABNetv4Ext tag to request a slice with external IPv4 connectivity.0Net.FABNetv6ExtPolicy Violation: Your project is lacking Net.FABNetv6Ext tag to request a slice with external IPv6 connectivity.0Net.PortMirroringPolicy Violation: Your project is lacking Net.PortMirroring tag to request a slice that uses port mirroring.1Slice.MultisitePolicy Violation: Your project is lacking Slice.Multisite tag to request a slice spanning multiple sites.EDUKYSlice.OnlyEDUKYPolicy Violation: Your project is tagged as OnlyEDUKY and cannot use resources on sites other than EDUKY.10Net.NoLimitBWPolicy Violation: Your project is lacking Net.NoLimitBW tag to request links with bandwidth over 10Gbps.0Net.FacilityPort.Policy Violation: Your project is lacking Net.FacilityPort.<facility-port-name> tag to request a connection to one or more of the facilities.http://www.w3.org/TR/1999/REC-xpath-19991116sliverhttp://www.w3.org/TR/1999/REC-xpath-19991116slivertrueSlice.MeasurementsPolicy Violation: Your project is lacking Slice.Measurements tag to request measurement resources.P14DT5MSlice.NoLimitLifetimePolicy Violation: Your project is lacking Slice.NoLimitLifetime tag so you cannot request resource lifetime longer than two weeks.http://www.w3.org/TR/1999/REC-xpath-19991116modifyhttp://www.w3.org/TR/1999/REC-xpath-19991116sliverPolicy Violation: You are not the creator of this resource and not the member of the same project so you cannot modify 
it.http://www.w3.org/TR/1999/REC-xpath-19991116deletehttp://www.w3.org/TR/1999/REC-xpath-19991116sliverPolicy Violation: You are not the creator of this resource and cannot delete it.http://www.w3.org/TR/1999/REC-xpath-19991116renewhttp://www.w3.org/TR/1999/REC-xpath-19991116sliverPolicy Violation: You are not the creator of this resource and not the member of the same project so you cannot renew it.http://www.w3.org/TR/1999/REC-xpath-19991116sliverP14DT5MSlice.NoLimitLifetimePolicy Violation: Your project is lacking Slice.NoLimitLifetime tag so you cannot renew resource lifetime by longer than two weeks.http://www.w3.org/TR/1999/REC-xpath-19991116querystatusredeemPOAdemandupdatecloseclaimreclaimticketextendrelinquishhttp://www.w3.org/TR/1999/REC-xpath-19991116querystatusredeemPOAdemandupdatecloseclaimreclaimticketextendrelinquish
\ No newline at end of file
diff --git a/fabric_cf/orchestrator/openapi.json b/fabric_cf/orchestrator/openapi.json
index 0e2908a9..86dd6a5e 100644
--- a/fabric_cf/orchestrator/openapi.json
+++ b/fabric_cf/orchestrator/openapi.json
@@ -914,6 +914,18 @@
           "type": "string"
         }
       },
+      {
+        "name": "lifetime",
+        "in": "query",
+        "description": "Lifetime of the slice requested in hours.",
+        "required": false,
+        "style": "form",
+        "explode": true,
+        "schema": {
+          "type": "integer",
+          "default": 24
+        }
+      },
       {
         "name": "lease_start_time",
         "in": "query",
diff --git a/fabric_cf/orchestrator/setup.sh b/fabric_cf/orchestrator/setup.sh
index de41d7a4..39c7f2e6 100755
--- a/fabric_cf/orchestrator/setup.sh
+++ b/fabric_cf/orchestrator/setup.sh
@@ -45,6 +45,7 @@ config=$3
 mkdir -p $name/logs $name/pg_data/data $name/pg_data/logs $name/neo4j/data $name/neo4j/imports $name/neo4j/logs $name/pdp/conf $name/pdp/policies
 echo $neo4jpwd > $name/neo4j/password
 cp fabricTags.OrchestratorTags.xml $name/pdp/policies
+cp slice_expiration_template.txt $name/
 cp env.template $name/.env
 cp $config $name/config.yaml
 cp -R nginx $name/
diff --git a/fabric_cf/orchestrator/slice_expiration_template.txt b/fabric_cf/orchestrator/slice_expiration_template.txt
new file mode 100644
index 00000000..3e666611
--- /dev/null
+++ b/fabric_cf/orchestrator/slice_expiration_template.txt
@@ -0,0 +1,21 @@
+Subject: FABRIC Reminders | Urgent: Your Slice Will Expire Soon
+
+Dear ,
+
+We hope your experience with FABRIC has been productive.
+
+This is a reminder that your slice is scheduled to expire in hours. Once expired, the resources associated with your slice will be automatically released, which may result in the loss of any unsaved data or unfinished experiments.
+
+If you need additional time for your work, we encourage you to extend the slice before it expires. Alternatively, please ensure that you capture all necessary results from your experiments to avoid data loss.
+
+Here are a few steps you can take:
+- **Extend the Slice**: If your experiment requires more time, you can extend your slice duration from the FABRIC portal.
+- **Save Results**: If you're finished with your experiment, please download and store any critical results before the slice expires.
+
+If you have any questions or need assistance, feel free to reach out to us. We're here to support you!
+
+Thank you for using FABRIC, and we look forward to your continued success.
+
+Best regards,
+The FABRIC Team
+https://portal.fabric-testbed.net/
\ No newline at end of file
diff --git a/fabric_cf/orchestrator/swagger_server/controllers/slices_controller.py b/fabric_cf/orchestrator/swagger_server/controllers/slices_controller.py
index ec643207..36268b0c 100644
--- a/fabric_cf/orchestrator/swagger_server/controllers/slices_controller.py
+++ b/fabric_cf/orchestrator/swagger_server/controllers/slices_controller.py
@@ -32,23 +32,25 @@ def slices_create_post(body, name, ssh_key, lease_end_time=None):  # noqa: E501
     post_body = SlicesPost()
     post_body.graph_model = body.decode("utf-8")
     post_body.ssh_keys = [ssh_key]
-    return rc.slices_create_post(body=post_body, name=name, lease_end_time=lease_end_time)
+    return rc.slices_creates_post(body=post_body, name=name, lease_end_time=lease_end_time)
 
 
-def slices_creates_post(body, name, lease_start_time=None, lease_end_time=None):  # noqa: E501
+def slices_creates_post(body, name, lifetime=None, lease_start_time=None, lease_end_time=None):  # noqa: E501
     """Create slice
 
     Request to create slice as described in the request. Request would be a graph ML describing the requested resources.
     Resources may be requested to be created now or in future. On success, one or more slivers are allocated,
-    containing resources satisfying the request, and assigned to the given slice. This API returns list and
-    description of the resources reserved for the slice in the form of Graph ML. Orchestrator would also trigger
-    provisioning of these resources asynchronously on the appropriate sites either now or in the future as requested.
+    containing resources satisfying the request, and assigned to the given slice. This API returns a list and description
+    of the resources reserved for the slice in the form of Graph ML. Orchestrator would also trigger provisioning of
+    these resources asynchronously on the appropriate sites either now or in the future as requested.
     Experimenter can invoke get slice API to get the latest state of the requested resources.  # noqa: E501
 
     :param body: Create new Slice
     :type body: dict | bytes
     :param name: Slice Name
     :type name: str
+    :param lifetime: Lifetime of the slice requested in hours.
+    :type lifetime: int
     :param lease_start_time: Lease Start Time for the Slice
     :type lease_start_time: str
     :param lease_end_time: Lease End Time for the Slice
@@ -58,7 +60,8 @@ def slices_creates_post(body, name, lease_start_time=None, lease_end_time=None):
     """
     if connexion.request.is_json:
         body = SlicesPost.from_dict(connexion.request.get_json())  # noqa: E501
-    return rc.slices_create_post(body=body, name=name, lease_start_time=lease_start_time, lease_end_time=lease_end_time)
+    return rc.slices_creates_post(body=body, name=name, lifetime=lifetime,
+                                  lease_start_time=lease_start_time, lease_end_time=lease_end_time)
 
 
 def slices_delete_delete():  # noqa: E501
@@ -90,7 +93,8 @@ def slices_delete_slice_id_delete(slice_id):  # noqa: E501
 def slices_get(name=None, search=None, exact_match=None, as_self=None, states=None, limit=None, offset=None):  # noqa: E501
     """Retrieve a listing of user slices
 
-    Retrieve a listing of user slices. It returns list of all slices belonging to all members in a project when 'as_self' is False otherwise returns only the all user's slices in a project.  # noqa: E501
+    Retrieve a listing of user slices. It returns a list of all slices belonging to all members in a project when
+    'as_self' is False otherwise returns only the user's own slices in a project.  # noqa: E501
 
     :param name: Search for Slices with the name
     :type name: str
diff --git a/fabric_cf/orchestrator/swagger_server/response/slices_controller.py b/fabric_cf/orchestrator/swagger_server/response/slices_controller.py
index b8ebd37b..084e1bd2 100644
--- a/fabric_cf/orchestrator/swagger_server/response/slices_controller.py
+++ b/fabric_cf/orchestrator/swagger_server/response/slices_controller.py
@@ -23,10 +23,12 @@
 #
 #
 # Author: Komal Thareja (kthare10@renci.org)
-from datetime import timedelta
+from datetime import timedelta, datetime, timezone
 from http.client import BAD_REQUEST
 from typing import List
 
+from fabric_cf.actor.core.common.constants import Constants
+
 from fabric_cf.orchestrator.core.exceptions import OrchestratorException
 from fabric_cf.orchestrator.core.orchestrator_handler import OrchestratorHandler
 from fabric_cf.orchestrator.swagger_server.models import Status200OkNoContentData, Slice, Sliver, SlicesPost
@@ -41,24 +43,24 @@
 from fabric_cf.orchestrator.swagger_server.response.utils import get_token, cors_error_response, cors_success_response
 
 
-def slices_create_post(body: SlicesPost, name: str, lease_start_time: str = None,
-                       lease_end_time: str = None) -> Slivers:  # noqa: E501
+def slices_creates_post(body: SlicesPost, name, lifetime=None, lease_start_time=None, lease_end_time=None):  # noqa: E501
     """Create slice
 
-    Request to create slice as described in the request. Request would be a graph ML describing the requested resources.
-    Resources may be requested to be created now or in future. On success, one or more slivers are allocated, containing
-    resources satisfying the request, and assigned to the given slice. This API returns list and description of the
-    resources reserved for the slice in the form of Graph ML. Orchestrator would also trigger provisioning of these
-    resources asynchronously on the appropriate sites either now or in the future as requested. Experimenter can
-    invoke get slice API to get the latest state of the requested resources.  # noqa: E501
+    Request to create a slice as described in the request, represented as a Graph ML specifying requested resources.
+    Resources may be scheduled for immediate or future provisioning. On success, the allocated resources (slivers)
+    are returned in Graph ML form and assigned to the requested slice. The orchestrator triggers asynchronous
+    provisioning on appropriate sites based on the request timing. Experimenters can invoke the 'get slice' API to
+    obtain the latest state of the requested resources.  # noqa: E501
 
-    :param body: Create new Slice
+    :param body: Contains the slice details, including requested resources.
     :type body: dict | bytes
     :param name: Slice Name
     :type name: str
-    :param lease_start_time: Lease End Time for the Slice
+    :param lifetime: Optional. The requested slice duration in hours.
+    :type lifetime: int
+    :param lease_start_time: Requested lease start time for the slice.
     :type lease_start_time: str
-    :param lease_end_time: Lease End Time for the Slice
+    :param lease_end_time: Requested lease end time for the slice.
     :type lease_end_time: str
 
     :rtype: Slivers
@@ -72,22 +74,56 @@ def slices_create_post(body: SlicesPost, name: str, lease_start_time: str = None
         ssh_key = ','.join(body.ssh_keys)
         start = handler.validate_lease_time(lease_time=lease_start_time)
         end = handler.validate_lease_time(lease_time=lease_end_time)
-        if start and end and (end - start) < timedelta(minutes=60):
-            raise OrchestratorException(http_error_code=BAD_REQUEST,
-                                        message="Requested Lease should be at least 60 minutes long!")
-
+        now = datetime.now(timezone.utc)
+
+        # Check that start time is at least 60 minutes in the future
+        if start and (start - now) < timedelta(minutes=60):
+            raise OrchestratorException(
+                http_error_code=BAD_REQUEST,
+                message="Requested Start Time should be at least 60 minutes from the current time!"
+            )
+
+        # Check for valid lease duration between start and end times
+        if start and end:
+            diff = end - start
+            if diff < timedelta(minutes=60):
+                raise OrchestratorException(
+                    http_error_code=BAD_REQUEST,
+                    message="The requested lease duration must be at least 60 minutes."
+                )
+
+            max_duration_hours = Constants.DEFAULT_MAX_DURATION_IN_WEEKS.total_seconds() / 3600  # timedelta to hours
+            if diff > timedelta(hours=max_duration_hours):
+                raise OrchestratorException(
+                    http_error_code=BAD_REQUEST,
+                    message=f"Requested lease duration exceeds the maximum allowed duration of "
+                            f"{max_duration_hours} hours."
+                )
+
+        # Derive the requested window length in hours, defaulting the lifetime to 24 hours if unspecified
+        if start and end:
+            hours = int((end - start).total_seconds() / 3600)
+            if not lifetime:
+                lifetime = Constants.DEFAULT_LEASE_IN_HOURS  # Default to 24 hours if unspecified
+
+            # Ensure lifetime does not exceed the specified lease duration
+            if lifetime > hours:
+                raise OrchestratorException(
+                    http_error_code=BAD_REQUEST,
+                    message="The specified lifetime exceeds the allowable duration between the start and end times."
+                )
+
+        # Create the slice and assemble the response
         slivers_dict = handler.create_slice(token=token, slice_name=name, slice_graph=body.graph_model,
                                             lease_start_time=start, lease_end_time=end,
-                                            ssh_key=ssh_key)
+                                            ssh_key=ssh_key, lifetime=lifetime)
         response = Slivers()
-        response.data = []
-        for s in slivers_dict:
-            sliver = Sliver().from_dict(s)
-            response.data.append(sliver)
+        response.data = [Sliver().from_dict(s) for s in slivers_dict]
         response.size = len(response.data)
         response.type = "slivers"
         success_counter.labels(POST_METHOD, SLICES_CREATE_PATH).inc()
         return cors_success_response(response_body=response)
+
     except OrchestratorException as e:
         logger.exception(e)
         failure_counter.labels(POST_METHOD, SLICES_CREATE_PATH).inc()
diff --git a/fabric_cf/orchestrator/swagger_server/swagger/swagger.yaml b/fabric_cf/orchestrator/swagger_server/swagger/swagger.yaml
index 0bb28f86..c95fd97c 100644
--- a/fabric_cf/orchestrator/swagger_server/swagger/swagger.yaml
+++ b/fabric_cf/orchestrator/swagger_server/swagger/swagger.yaml
@@ -643,6 +643,15 @@ paths:
         schema:
           minLength: 3
           type: string
+      - name: lifetime
+        in: query
+        description: Lifetime of the slice requested in hours.
+        required: false
+        style: form
+        explode: true
+        schema:
+          type: integer
+          default: 24
       - name: lease_start_time
         in: query
         description: Lease Start Time for the Slice
diff --git a/fabric_cf/orchestrator/test/test.yaml b/fabric_cf/orchestrator/test/test.yaml
index f8641d73..4560906d 100644
--- a/fabric_cf/orchestrator/test/test.yaml
+++ b/fabric_cf/orchestrator/test/test.yaml
@@ -85,7 +85,7 @@ database:
 
 pdp:
   url: http://localhost:8080/services/pdp
-  enable: False
+  enable: True
 
 neo4j:
   url: bolt://localhost:9687
diff --git a/pyproject.toml b/pyproject.toml
index 0603574a..9f1cb336 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -18,7 +18,7 @@ keywords = ["Swagger", "Fabric Control Framework"]
 requires-python = '>=3.9'
 dependencies = [
     "requests >= 2.28.1",
-    "cryptography==40.0.2",
+    "cryptography==43.0.3",
     "psycopg2-binary",
     "sqlalchemy",
     "waitress",
@@ -26,9 +26,9 @@ dependencies = [
     "connexion==2.14.2",
     "swagger-ui-bundle==0.0.9",
     "PyYAML",
-    "fabric_fss_utils==1.5.1",
+    "fabric_fss_utils==1.6.0",
     "fabric-message-bus==1.7.0",
-    "fabric-fim==1.7.0",
+    "fabric-fim==1.8.0",
     "fabric-credmgr-client==1.6.1",
     "ansible"
 ]
diff --git a/tools/audit.py b/tools/audit.py
index 5b78508e..511a6c85 100644
--- a/tools/audit.py
+++ b/tools/audit.py
@@ -27,11 +27,15 @@
 import logging
 import os
 import re
+import smtplib
 import traceback
 from datetime import datetime, timezone, timedelta
 from logging.handlers import RotatingFileHandler
 
 import yaml
+from fabric_mb.message_bus.messages.slice_avro import SliceAvro
+
+from fabric_cf.actor.core.kernel.slice import SliceTypes
 from fabric_mb.message_bus.messages.auth_avro import AuthAvro
 from fim.slivers.network_service import ServiceType
@@ -43,6 +47,7 @@
 from fabric_cf.actor.core.manage.kafka.kafka_mgmt_message_processor import KafkaMgmtMessageProcessor
 from fabric_cf.actor.core.plugins.db.actor_database import ActorDatabase
 from fabric_cf.actor.core.util.id import ID
+from fabric_cf.actor.core.util.smtp import send_email, load_and_update_template
 from fabric_cf.actor.fim.fim_helper import FimHelper
@@ -82,6 +87,8 @@ def __init__(self, config_file: str, am_config_file: str):
         # Actor Config
         self.actor_config = config_dict[Constants.CONFIG_SECTION_ACTOR]
 
+        self.smtp_config = config_dict.get(Constants.CONFIG_SECTION_SMTP)
+
         # Load config in the GlobalsSingleton
         from fabric_cf.actor.core.container.globals import GlobalsSingleton
         GlobalsSingleton.get().config_file = config_file
@@ -241,6 +248,10 @@ def execute_ansible(self, *, inventory_path: str, playbook_path: str, extra_vars
         return ansible_helper.get_result_callback()
 
     def clean_sliver_close_fail(self):
+        """
+        Clean the slivers in Close Fail state
+        @return: None
+        """
         try:
             actor_type = self.actor_config[Constants.TYPE]
             if actor_type.lower() != ActorType.Broker.name.lower():
@@ -257,14 +268,83 @@ def clean_sliver_close_fail(self):
                 term = s.get_term()
                 end = term.get_end_time() if term else None
                 now = datetime.now(timezone.utc)
-                if end and end <= now:
+                if end and end < now:
                     actor_db.remove_reservation(rid=s.get_reservation_id())
         except Exception as e:
             self.logger.error(f"Failed to cleanup inconsistencies: {e}")
             self.logger.error(traceback.format_exc())
 
+    def send_slice_expiry_email_warnings(self):
+        """
+        Sends warning emails to users whose slices are about to expire within 12 hours or 6 hours.
+
+        This function checks the expiration times of active slices and sends warning emails to the
+        slice owners if the slice is set to expire in less than 12 hours or 6 hours. The function is
+        intended to run periodically (e.g., once an hour) and uses a template for email content.
+
+        @return: None. Sends the warning emails and logs the outcome; nothing is returned.
+        """
+        actor_type = self.actor_config[Constants.TYPE]
+        if actor_type.lower() != ActorType.Orchestrator.name.lower():
+            return
+
+        actor_db = ActorDatabase(user=self.database_config[Constants.PROPERTY_CONF_DB_USER],
+                                 password=self.database_config[Constants.PROPERTY_CONF_DB_PASSWORD],
+                                 database=self.database_config[Constants.PROPERTY_CONF_DB_NAME],
+                                 db_host=self.database_config[Constants.PROPERTY_CONF_DB_HOST],
+                                 logger=self.logger)
+
+        # Get the currently active slices
+        slices = actor_db.get_slices(states=SliceState.list_values_ex_closing_dead(),
+                                     slc_type=[SliceTypes.ClientSlice])
+
+        now = datetime.now(timezone.utc)
+
+        for s in slices:
+            email = s.get_owner().get_email()
+            if s.get_lease_end():
+                diff = s.get_lease_end() - now
+                hours_left = diff.total_seconds() // 3600
+
+                # Send one warning at the 12-hour mark and another at the 6-hour mark before expiration
+                for threshold in (12, 6):
+                    if threshold - 1 < hours_left <= threshold:
+                        try:
+                            subject, body = load_and_update_template(
+                                template_path=self.smtp_config.get("template_path"),
+                                user=email,
+                                slice_name=f"{s.get_name()}/{s.get_slice_id()}",
+                                hours_left=threshold
+                            )
+                            send_email(smtp_config=self.smtp_config, to_email=email, subject=subject, body=body)
+                            self.logger.info(f"Sent {threshold}-hour prior warning to {email} for slice {s.get_name()}")
+                        except smtplib.SMTPAuthenticationError as e:
+                            self.logger.error(f"Failed to send email: Error: {e.smtp_code} Message: {e.smtp_error}")
+                        except Exception as e:
+                            self.logger.error(f"Failed to send email: Error: {e}")
+                        break
+
     def clean_sliver_inconsistencies(self):
+        """
+        Clean up any sliver inconsistencies between CF, Libvirt and Openstack
+        @return: None
+        """
         try:
             actor_type = self.actor_config[Constants.TYPE]
             if actor_type.lower() != ActorType.Authority.name.lower() or self.am_config_dict is None:
@@ -397,6 +477,7 @@ def handle_command(self, args):
             self.delete_dead_closing_slice(days=args.days)
             self.clean_sliver_close_fail()
             self.clean_sliver_inconsistencies()
+            self.send_slice_expiry_email_warnings()
         else:
             print(f"Unsupported operation: {args.operation}")
     else:
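
A minimal usage sketch for the new ResourceTracker/TimeSlot pair in resource_tracker.py. The names cbm_node, reservations, and requested_sliver are assumptions standing in for objects obtained elsewhere (e.g., a worker's NodeSliver from the broker's CBM and reservations fetched via get_reservations(..., start=..., end=...)); only the tracker calls themselves come from this patch:

    from datetime import datetime, timedelta, timezone

    tracker = ResourceTracker(cbm_node=cbm_node)

    window_start = datetime.now(timezone.utc)
    window_end = window_start + timedelta(days=7)

    # Fold every overlapping reservation into hour-granularity TimeSlot buckets
    for r in reservations:
        tracker.update(reservation=r, start=window_start, end=window_end)

    # Hour-aligned start times at which the request fits for 24 consecutive hours
    candidates = tracker.find_next_available(requested_sliver=requested_sliver,
                                             start=window_start, end=window_end,
                                             duration=24)
    proposed_start = candidates[0] if candidates else None

Note that update() rounds every boundary up to the next full hour, so a reservation ending at 10:05 still counts as occupying resources until 11:00, while find_next_available() treats hours with no recorded slot as fully free.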
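A hedged client-side sketch of the new lifetime query parameter wired through openapi.json, swagger.yaml, and the slices controllers above. The orchestrator base URL, the token handling, and the timestamp format are assumptions for illustration (the server parses lease times via handler.validate_lease_time); the behavior in the comments is what the new checks in response/slices_controller.py enforce:

    import requests

    payload = {"graph_model": graph_ml, "ssh_keys": [ssh_key]}  # SlicesPost body
    params = {
        "name": "my-slice",
        "lifetime": 48,                                 # requested duration in hours
        "lease_start_time": "2025-01-10 00:00:00 +0000",
        "lease_end_time": "2025-01-15 00:00:00 +0000",
    }
    # The server returns 400 if start is less than 60 minutes away, if end - start
    # is under 60 minutes or over the configured maximum, or if lifetime exceeds
    # end - start; lifetime defaults to 24 hours when omitted.
    r = requests.post(f"{orchestrator_url}/slices/creates",
                      params=params, json=payload,
                      headers={"Authorization": f"Bearer {token}"})
    r.raise_for_status()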
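The audit tool imports send_email and load_and_update_template from fabric_cf.actor.core.util.smtp, a module whose body is not part of this diff. Below is a rough sketch of the shapes the calling code relies on, assuming the template's first line carries the subject (as in slice_expiration_template.txt) and inventing placeholder tokens and smtp_config keys that the real module may name differently:

    import smtplib
    from email.mime.text import MIMEText


    def load_and_update_template(*, template_path: str, user: str, slice_name: str,
                                 hours_left: int) -> tuple:
        # First template line is "Subject: ..."; the remainder is the body.
        with open(template_path) as f:
            content = f.read()
        # Hypothetical placeholder names; the shipped template elides them.
        content = (content.replace("<User>", user)
                   .replace("<Slice>", slice_name)
                   .replace("<Hours>", str(hours_left)))
        subject, body = content.split("\n", 1)
        return subject.replace("Subject:", "").strip(), body.strip()


    def send_email(*, smtp_config: dict, to_email: str, subject: str, body: str):
        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = smtp_config["from_email"]         # assumed config keys
        msg["To"] = to_email
        with smtplib.SMTP(smtp_config["host"], int(smtp_config.get("port", 587))) as server:
            server.starttls()
            server.login(smtp_config["user"], smtp_config["password"])
            server.send_message(msg)

The smtplib.SMTPAuthenticationError handling in send_slice_expiry_email_warnings() lines up with the login() call here failing on bad credentials.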