From cc96a79ddbcd33a53fa53c1869ae869abd75ea22 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Mon, 9 Dec 2024 16:53:27 -0500 Subject: [PATCH 01/29] initial changes to implement quotas --- fabric_cf/actor/core/apis/abc_database.py | 42 ++++++- .../actor/core/apis/abc_mgmt_client_actor.py | 9 ++ fabric_cf/actor/core/kernel/kernel.py | 6 + .../core/manage/actor_management_object.py | 19 +++- .../core/manage/local/local_controller.py | 7 ++ .../actor/core/plugins/db/actor_database.py | 92 +++++++++++++++ fabric_cf/actor/core/time/term.py | 12 ++ fabric_cf/actor/core/util/utils.py | 84 +++++++++++++- fabric_cf/actor/db/__init__.py | 24 +++- fabric_cf/actor/db/psql_database.py | 106 +++++++++++++++++- .../orchestrator/core/orchestrator_handler.py | 27 ++--- 11 files changed, 405 insertions(+), 23 deletions(-) diff --git a/fabric_cf/actor/core/apis/abc_database.py b/fabric_cf/actor/core/apis/abc_database.py index fd6a0245..466d8319 100644 --- a/fabric_cf/actor/core/apis/abc_database.py +++ b/fabric_cf/actor/core/apis/abc_database.py @@ -477,4 +477,44 @@ def get_poas(self, *, poa_id: str = None, email: str = None, sliver_id: ID = Non @param offset offset @param states states @param last_update_time last update time - """ \ No newline at end of file + """ + + def create_quota(self, project_id: str, resource_type: str, resource_unit: str, quota_limit: int): + """ + Create a new quota record in the database. + + @param project_id: UUID of the project the quota is associated with. + @param resource_type: Type of resource (e.g., SLICE, COMPONENT). + @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). + @param quota_limit: Maximum allowed usage for this resource. + @return: The created `Quotas` object. + @throws: Exception if there is an error during the creation. + """ + + def get_quota_lookup(self, project_id: str): + """ + Fetches all quotas for a given project and creates a lookup dictionary. + + @param project_id: UUID of the project whose quotas are to be fetched. + @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details. + @throws: Exception if there is an error during the database interaction. + """ + + def update_quota(self, reservation: ABCReservationMixin): + """ + Update an existing quota record. + + @param reservation: reservation. + @throws: Exception if there is an error during the update. + """ + + def delete_quota(self, project_id: str, resource_type: str, resource_unit: str): + """ + Delete a specific quota record. + + @param project_id: UUID of the project the quota is associated with. + @param resource_type: Type of resource (e.g., SLICE, COMPONENT). + @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). + @return: True if the quota was successfully deleted, False if not found. + @throws: Exception if there is an error during the deletion. + """ diff --git a/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py b/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py index 01a3473d..60ddd026 100644 --- a/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py +++ b/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py @@ -181,3 +181,12 @@ def reclaim_delegations(self, *, broker: ID, did: ID) -> DelegationAvro: @return reservation """ raise ManageException(Constants.NOT_IMPLEMENTED) + + def get_quota_lookup(self, project_id: str) -> dict: + """ + Fetches all quotas for a given project and creates a lookup dictionary. + + @param project_id: UUID of the project whose quotas are to be fetched. + @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details. + @throws: Exception if there is an error during the database interaction. + """ \ No newline at end of file diff --git a/fabric_cf/actor/core/kernel/kernel.py b/fabric_cf/actor/core/kernel/kernel.py index 4d15883e..00bcd7ec 100644 --- a/fabric_cf/actor/core/kernel/kernel.py +++ b/fabric_cf/actor/core/kernel/kernel.py @@ -26,6 +26,7 @@ import threading import time import traceback +from datetime import datetime, timezone from typing import List, Dict @@ -219,6 +220,10 @@ def close(self, *, reservation: ABCReservationMixin, force: bool = False): self.policy.close(reservation=reservation) reservation.close(force=force) self.plugin.get_database().update_reservation(reservation=reservation) + ## TODO release resources back if deleted before expiry + if reservation.get_term().get_remaining_length() > 0: + self.plugin.get_database().update_quota(reservation=reservation) + reservation.service_close() except Exception as e: err = f"An error occurred during close for reservation #{reservation.get_reservation_id()}" @@ -1453,6 +1458,7 @@ def update_ticket(self, *, reservation: ABCReservationMixin, update: Reservation self.plugin.get_database().update_reservation(reservation=reservation) if not reservation.is_failed(): reservation.service_update_ticket() + self.plugin.get_database().update_quota(reservation=reservation) except Exception as e: self.logger.error(traceback.format_exc()) self.error(err=f"An error occurred during update ticket for " diff --git a/fabric_cf/actor/core/manage/actor_management_object.py b/fabric_cf/actor/core/manage/actor_management_object.py index efc4f09c..1839e296 100644 --- a/fabric_cf/actor/core/manage/actor_management_object.py +++ b/fabric_cf/actor/core/manage/actor_management_object.py @@ -29,6 +29,8 @@ from datetime import datetime, timezone from typing import TYPE_CHECKING, List, Dict, Tuple +from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro + from fabric_cf.actor.fim.fim_helper import FimHelper from fabric_mb.message_bus.messages.reservation_mng import ReservationMng from fabric_mb.message_bus.messages.result_delegation_avro import ResultDelegationAvro @@ -902,4 +904,19 @@ def build_broker_query_model(self, level_0_broker_query_model: str, level: int, end=end, includes=includes, excludes=excludes) except Exception as e: self.logger.error(f"Exception occurred build_broker_query_model e: {e}") - self.logger.error(traceback.format_exc()) \ No newline at end of file + self.logger.error(traceback.format_exc()) + + def get_quota_lookup(self, project_id: str, caller: AuthToken) -> dict: + """ + Fetches all quotas for a given project and creates a lookup dictionary. + + @param project_id: UUID of the project whose quotas are to be fetched. + @param caller: caller + @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details. + @throws: Exception if there is an error during the database interaction. + """ + try: + return self.db.get_quota_lookup(project_id=project_id) + except Exception as e: + self.logger.error(f"Exception occurred build_broker_query_model e: {e}") + self.logger.error(traceback.format_exc()) diff --git a/fabric_cf/actor/core/manage/local/local_controller.py b/fabric_cf/actor/core/manage/local/local_controller.py index 4baf8b50..bd511c05 100644 --- a/fabric_cf/actor/core/manage/local/local_controller.py +++ b/fabric_cf/actor/core/manage/local/local_controller.py @@ -244,3 +244,10 @@ def poa(self, *, poa: PoaAvro) -> bool: self.on_exception(e=e, traceback_str=traceback.format_exc()) return False + + def get_quota_lookup(self, project_id: str) -> dict: + self.clear_last() + try: + return self.manager.get_quota_lookup(project_id=project_id, caller=self.auth) + except Exception as e: + self.on_exception(e=e, traceback_str=traceback.format_exc()) diff --git a/fabric_cf/actor/core/plugins/db/actor_database.py b/fabric_cf/actor/core/plugins/db/actor_database.py index 934eb264..9d9a7a3b 100644 --- a/fabric_cf/actor/core/plugins/db/actor_database.py +++ b/fabric_cf/actor/core/plugins/db/actor_database.py @@ -30,6 +30,8 @@ from datetime import datetime from typing import List, Union, Tuple, Dict +from fim.user import ComponentType + from fabric_cf.actor.core.apis.abc_actor_mixin import ABCActorMixin, ActorType from fabric_cf.actor.core.apis.abc_broker_proxy import ABCBrokerProxy from fabric_cf.actor.core.apis.abc_controller_reservation import ABCControllerReservation @@ -44,6 +46,7 @@ from fabric_cf.actor.core.plugins.handlers.configuration_mapping import ConfigurationMapping from fabric_cf.actor.core.container.maintenance import Site from fabric_cf.actor.core.util.id import ID +from fabric_cf.actor.core.util.utils import extract_quota_usage from fabric_cf.actor.db.psql_database import PsqlDatabase @@ -994,3 +997,92 @@ def remove_poa(self, *, poa_id: str): finally: if self.lock.locked(): self.lock.release() + + def create_quota(self, project_id: str, resource_type: str, resource_unit: str, quota_limit: int): + try: + self.db.create_quota(project_id=project_id, + resource_type=resource_type, + resource_unit=resource_unit, + quota_limit=quota_limit) + finally: + if self.lock.locked(): + self.lock.release() + + def get_quota_lookup(self, project_id: str): + try: + return self.db.get_quota_lookup(project_id=project_id) + finally: + if self.lock.locked(): + self.lock.release() + + def update_quota(self, reservation: ABCReservationMixin): + print("Update Quota") + try: + slice_object = reservation.get_slice() + if not slice_object: + return + project_id = slice_object.get_project_id() + if not project_id: + return + + sliver = None + from fabric_cf.actor.core.kernel.reservation_client import ReservationClient + if isinstance(reservation, ReservationClient) and reservation.get_leased_resources() and \ + reservation.get_leased_resources().get_sliver(): + sliver = reservation.get_leased_resources().get_sliver() + if not sliver and reservation.get_resources() and reservation.get_resources().get_sliver(): + sliver = reservation.get_resources().get_sliver() + + if not sliver: + return + + if reservation.is_closed() or reservation.is_closing(): + duration = reservation.get_term().get_remaining_length() + else: + duration = reservation.get_term().get_length() + + if duration < 60: + return + + duration /= 3600000 + existing_quota = self.db.get_quota_lookup(project_id=project_id) + + sliver_quota_usage = extract_quota_usage(sliver=sliver, duration=duration) + + print(f"Existing: {existing_quota}") + print(f"Updated by: {sliver_quota_usage}") + + # Check each accumulated resource usage against its quota + for quota_key, total_duration in sliver_quota_usage.items(): + print(f"Iteration: {quota_key}") + current_duration = 0 + if quota_key in existing_quota: + current_duration = existing_quota.get(quota_key) + (resource_type, resource_unit) = quota_key + + # Return resource hours for a sliver deleted before expiry + if reservation.is_closing() or reservation.is_closed(): + usage = current_duration["quota_used"] - total_duration + if usage < 0: + usage = 0 + self.db.update_quota(project_id=project_id, + resource_type=resource_type, + resource_unit=resource_unit, quota_used=usage) + # Account for resource hours used for a new or extended sliver + else: + usage = total_duration + current_duration["quota_used"] + self.db.update_quota(project_id=project_id, + resource_type=resource_type, + resource_unit=resource_unit, quota_used=usage) + finally: + if self.lock.locked(): + self.lock.release() + + def delete_quota(self, project_id: str, resource_type: str, resource_unit: str): + try: + self.db.delete_quota(project_id=project_id, + resource_type=resource_type, + resource_unit=resource_unit) + finally: + if self.lock.locked(): + self.lock.release() diff --git a/fabric_cf/actor/core/time/term.py b/fabric_cf/actor/core/time/term.py index 3b18ecf9..dd97f859 100644 --- a/fabric_cf/actor/core/time/term.py +++ b/fabric_cf/actor/core/time/term.py @@ -331,6 +331,18 @@ def get_full_length(self) -> int: return end_ms - start_ms + 1 + def get_remaining_length(self) -> int: + """ + Returns the length of remaining term in milliseconds. The length of a term is the + number of milliseconds in the closed interval [now, end] + @returns term length + """ + now = datetime.now(timezone.utc) + current_ms = ActorClock.to_milliseconds(when=now) + end_ms = ActorClock.to_milliseconds(when=self.end_time) + + return end_ms - current_ms + 1 + def get_length(self) -> int: """ Returns the length of a term in milliseconds. The length of a term is the diff --git a/fabric_cf/actor/core/util/utils.py b/fabric_cf/actor/core/util/utils.py index 90da469b..c21b0c93 100644 --- a/fabric_cf/actor/core/util/utils.py +++ b/fabric_cf/actor/core/util/utils.py @@ -24,13 +24,14 @@ # Author Komal Thareja (kthare10@renci.org) import hashlib from bisect import bisect_left +from typing import Tuple, Dict from fabric_mb.message_bus.messages.abc_message_avro import AbcMessageAvro +from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro from fim.slivers.base_sliver import BaseSliver from fim.slivers.network_node import NodeSliver from fim.slivers.network_service import NetworkServiceSliver -from fim.user import ComponentType -from fim.user.topology import TopologyDiff, TopologyDiffTuple +from fim.user import ComponentType, InstanceCatalog from fabric_cf.actor.security.pdp_auth import ActionId @@ -118,3 +119,82 @@ def generate_sha256(*, token: str): sha256_hex = sha256_hash.hexdigest() return sha256_hex + + +def extract_quota_usage(sliver, duration: float) -> Dict[Tuple[str, str], float]: + """ + Extract quota usage from a sliver + + @param sliver: The sliver object from which resources are extracted. + @param duration: Number of hours the resources are requested for. + @return: A dictionary of resource type/unit tuples to requested amounts. + """ + unit = "HOURS" + requested_resources = {} + + # Check if the sliver is a NodeSliver + if not isinstance(sliver, NodeSliver): + return requested_resources + + allocations = sliver.get_capacity_allocations() + if not allocations and sliver.get_capacity_hints(): + catalog = InstanceCatalog() + allocations = catalog.get_instance_capacities(instance_type=sliver.get_capacity_hints().instance_type) + else: + allocations = sliver.get_capacities() + + # Extract Core, Ram, Disk Hours + requested_resources[("Core", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.core) + requested_resources[("RAM", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.ram) + requested_resources[("Disk", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.disk) + + # Extract component hours (e.g., GPU, FPGA, SmartNIC) + if sliver.attached_components_info: + for c in sliver.attached_components_info.devices.values(): + component_type = str(c.get_type()) + requested_resources[(component_type, unit)] = ( + requested_resources.get((component_type, unit), 0) + duration + ) + + return requested_resources + + +def enforce_quota_limits(quota_lookup: dict, computed_reservations: list[LeaseReservationAvro], + duration: float) -> Tuple[bool, str]: + """ + Check if the requested resources for multiple reservations are within the project's quota limits. + + @param quota_lookup: Quota Limits for various resource types. + @param computed_reservations: List of slivers requested. + @param duration: Number of hours the reservations are requested for. + @return: Tuple (True, None) if resources are within quota, or (False, message) if denied. + @throws: Exception if there is an error during the database interaction. + """ + try: + requested_resources = {} + + # Accumulate resource usage for all reservations + for r in computed_reservations: + sliver = r.get_sliver() + sliver_resources = extract_quota_usage(sliver, duration) + for key, value in sliver_resources.items(): + requested_resources[key] = requested_resources.get(key, 0) + value + + # Check each accumulated resource usage against its quota + for quota_key, total_requested_duration in requested_resources.items(): + if quota_key not in quota_lookup: + return False, f"Quota not defined for resource: {quota_key[0]} ({quota_key[1]})." + + quota_info = quota_lookup[quota_key] + available_quota = quota_info["quota_limit"] - quota_info["quota_used"] + + if total_requested_duration > available_quota: + return False, ( + f"Requested {total_requested_duration} {quota_key[1]} of {quota_key[0]}, " + f"but only {available_quota} is available." + ) + + # If all checks pass + return True, None + except Exception as e: + raise Exception(f"Error while checking reservation: {str(e)}") diff --git a/fabric_cf/actor/db/__init__.py b/fabric_cf/actor/db/__init__.py index 11f85f9b..695f8f42 100644 --- a/fabric_cf/actor/db/__init__.py +++ b/fabric_cf/actor/db/__init__.py @@ -24,12 +24,12 @@ # # Author: Komal Thareja (kthare10@renci.org) -from sqlalchemy import JSON, ForeignKey, LargeBinary, Index, TIMESTAMP -from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy import JSON, ForeignKey, LargeBinary, Index, TIMESTAMP, UUID, func, text, Float from sqlalchemy.orm import declarative_base from sqlalchemy import Column, String, Integer, Sequence from sqlalchemy.orm import relationship + Base = declarative_base() FOREIGN_KEY_ACTOR_ID = 'Actors.act_id' @@ -241,3 +241,23 @@ class Components(Base): bdf = Column(String, primary_key=True, index=True) reservation = relationship('Reservations', back_populates='components') + +class Quotas(Base): + __tablename__ = "quotas" + + resource_type = Column(String(50), primary_key=True, index=True, nullable=False) + project_id = Column(UUID(as_uuid=True), primary_key=True, index=True, nullable=False) + resource_unit = Column(String(20), primary_key=True, index=True, nullable=False, default="HOURS") + quota_limit = Column(Float, nullable=False) + quota_used = Column(Float, default=0) + created_at = Column( + TIMESTAMP(timezone=True), + nullable=False, + server_default=text("timezone('utc', now())") # Explicitly ensure UTC in PostgreSQL + ) + updated_at = Column( + TIMESTAMP(timezone=True), + nullable=False, + server_default=text("timezone('utc', now())"), + onupdate=func.now() + ) diff --git a/fabric_cf/actor/db/psql_database.py b/fabric_cf/actor/db/psql_database.py index fc4cfb0b..8015d292 100644 --- a/fabric_cf/actor/db/psql_database.py +++ b/fabric_cf/actor/db/psql_database.py @@ -36,7 +36,7 @@ from fabric_cf.actor.core.common.constants import Constants from fabric_cf.actor.core.common.exceptions import DatabaseException from fabric_cf.actor.db import Base, Clients, ConfigMappings, Proxies, Units, Reservations, Slices, ManagerObjects, \ - Miscellaneous, Actors, Delegations, Sites, Poas, Components, Metrics + Miscellaneous, Actors, Delegations, Sites, Poas, Components, Metrics, Quotas @contextmanager @@ -1715,6 +1715,110 @@ def get_metrics(self, *, project_id: str = None, user_id: str = None, excluded_p self.logger.error(Constants.EXCEPTION_OCCURRED.format(e)) raise e + def create_quota(self, project_id: str, resource_type: str, resource_unit: str, quota_limit: int): + """ + Create a new quota record in the database. + + @param project_id: UUID of the project the quota is associated with. + @param resource_type: Type of resource (e.g., SLICE, COMPONENT). + @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). + @param quota_limit: Maximum allowed usage for this resource. + @return: The created `Quotas` object. + @throws: Exception if there is an error during the creation. + """ + session = self.get_session() + try: + quota = Quotas( + project_id=project_id, + resource_type=resource_type, + resource_unit=resource_unit, + quota_limit=quota_limit, + quota_used=0 + ) + session.add(quota) + session.commit() + return quota + except Exception as e: + self.logger.error(Constants.EXCEPTION_OCCURRED.format(e)) + raise e + + def get_quota_lookup(self, project_id: str): + """ + Fetches all quotas for a given project and creates a lookup dictionary. + + @param project_id: UUID of the project whose quotas are to be fetched. + @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details. + @throws: Exception if there is an error during the database interaction. + """ + session = self.get_session() + try: + # Fetch all quotas for the project + project_quotas = session.query(Quotas).filter(Quotas.project_id == project_id).all() + + # Create a lookup dictionary for quick quota access + quota_lookup = {} + for quota in project_quotas: + key = (quota.resource_type, quota.resource_unit) + quota_lookup[key] = { + "quota_limit": quota.quota_limit, + "quota_used": quota.quota_used, + } + return quota_lookup + except Exception as e: + raise Exception(f"Error while fetching quotas: {str(e)}") + + def update_quota(self, project_id: str, resource_type: str, resource_unit: str, **kwargs): + """ + Update an existing quota record. + + @param project_id: UUID of the project the quota is associated with. + @param resource_type: Type of resource (e.g., SLICE, COMPONENT). + @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). + @param kwargs: Dictionary of fields to update and their new values. + @return: The updated `Quotas` object, or None if the quota does not exist. + @throws: Exception if there is an error during the update. + """ + session = self.get_session() + try: + filter_dict = {"project_id": project_id, "resource_type": resource_type, "resource_unit": resource_unit} + quota = session.query(Quotas).filter_by(**filter_dict).one_or_none() + if not quota: + return None + + for key, value in kwargs.items(): + if hasattr(quota, key): + setattr(quota, key, value) + print(f"Updating: {quota.project_id} {quota.resource_type} {quota.resource_unit} {quota.quota_limit} {quota.quota_used}") + session.commit() + return quota + except Exception as e: + self.logger.error(Constants.EXCEPTION_OCCURRED.format(e)) + raise e + + def delete_quota(self, project_id: str, resource_type: str, resource_unit: str): + """ + Delete a specific quota record. + + @param project_id: UUID of the project the quota is associated with. + @param resource_type: Type of resource (e.g., SLICE, COMPONENT). + @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). + @return: True if the quota was successfully deleted, False if not found. + @throws: Exception if there is an error during the deletion. + """ + session = self.get_session() + try: + quota = session.query(Quotas).filter(Quotas.project_id == project_id and + Quotas.resource_type == resource_type and + Quotas.resource_unit == resource_unit).first() + if quota: + session.delete(quota) + session.commit() + return True + return False + except Exception as e: + self.logger.error(Constants.EXCEPTION_OCCURRED.format(e)) + raise e + def test(): logger = logging.getLogger('PsqlDatabase') diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index ef50ac66..1aa60e9c 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -30,11 +30,13 @@ from typing import List, Tuple, Union from fabric_mb.message_bus.messages.auth_avro import AuthAvro +from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro from fabric_mb.message_bus.messages.poa_avro import PoaAvro from fabric_mb.message_bus.messages.reservation_mng import ReservationMng from fabric_mb.message_bus.messages.slice_avro import SliceAvro from fim.graph.networkx_property_graph_disjoint import NetworkXGraphImporterDisjoint from fim.slivers.base_sliver import BaseSliver +from fim.slivers.network_node import NodeSliver from fim.slivers.network_service import NetworkServiceSliver from fim.user import GraphFormat from fim.user.topology import ExperimentTopology @@ -43,6 +45,7 @@ from fabric_cf.actor.core.kernel.poa import PoaStates from fabric_cf.actor.core.kernel.reservation_states import ReservationStates from fabric_cf.actor.core.time.actor_clock import ActorClock +from fabric_cf.actor.core.util.utils import enforce_quota_limits from fabric_cf.actor.fim.fim_helper import FimHelper from fabric_cf.actor.core.apis.abc_mgmt_controller_mixin import ABCMgmtControllerMixin from fabric_cf.actor.core.common.constants import Constants, ErrorCodes @@ -313,7 +316,6 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key new_slice_object = OrchestratorSliceWrapper(controller=controller, broker=broker, slice_obj=slice_obj, logger=self.logger) - create_ts = time.time() new_slice_object.lock() # Create Slivers from Slice Graph; Compute Reservations from Slivers; @@ -326,21 +328,14 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key # Check if Testbed in Maintenance or Site in Maintenance self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations) - # TODO Future Slice - ''' - if lease_start_time and lease_end_time and lifetime: - future_start, future_end = self.controller_state.determine_future_lease_time(computed_reservations=computed_reservations, - start=lease_start_time, end=lease_end_time, - duration=lifetime) - self.logger.debug(f"Advanced Scheduling: Slice: {slice_name}({slice_id}) lifetime: {future_start} to {future_end}") - slice_obj.set_lease_start(lease_start=future_start) - slice_obj.set_lease_end(lease_end=future_end) - self.logger.debug(f"Update Slice {slice_name}") - slice_id = controller.update_slice(slice_obj=slice_obj) - for r in computed_reservations: - r.set_start(value=ActorClock.to_milliseconds(when=future_start)) - r.set_end(value=ActorClock.to_milliseconds(when=future_end)) - ''' + quota_lookup = controller.get_quota_lookup(project_id=project) + status, error_message = enforce_quota_limits(quota_lookup=quota_lookup, + computed_reservations=computed_reservations, + duration=(end_time-start_time).total_seconds()/3600) + + if not status: + raise OrchestratorException(http_error_code=BAD_REQUEST, + message=error_message) create_ts = time.time() if lease_start_time and lease_end_time and lifetime: From 1b61d4326955496c96445e495815290dcba96f7f Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Sat, 28 Dec 2024 10:21:34 -0500 Subject: [PATCH 02/29] updated config --- fabric_cf/actor/core/policy/network_service_inventory.py | 3 +++ fabric_cf/authority/switch_handler_config.yml | 2 ++ fabric_cf/authority/vm_handler_config.yml | 4 ++++ fabric_cf/orchestrator/core/orchestrator_handler.py | 4 +--- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/fabric_cf/actor/core/policy/network_service_inventory.py b/fabric_cf/actor/core/policy/network_service_inventory.py index a53e2aa7..86612fe3 100644 --- a/fabric_cf/actor/core/policy/network_service_inventory.py +++ b/fabric_cf/actor/core/policy/network_service_inventory.py @@ -471,6 +471,9 @@ def _assign_gateway_labels(self, *, ip_network: Union[IPv4Network, IPv6Network], :param requested_ns: Network Service sliver. :return: Gateway labels populated with the appropriate subnet and IP address. """ + if len(subnet_list) == 0: + raise BrokerException(error_code=ExceptionErrorCode.INSUFFICIENT_RESOURCES, + msg=f"No subnets available for {requested_ns.get_site()}") gateway_labels = Labels() if requested_ns.get_type() == ServiceType.FABNetv4: # Allocate the requested network if available else allocate new network diff --git a/fabric_cf/authority/switch_handler_config.yml b/fabric_cf/authority/switch_handler_config.yml index c5a3fbe4..9f67dd2b 100644 --- a/fabric_cf/authority/switch_handler_config.yml +++ b/fabric_cf/authority/switch_handler_config.yml @@ -22,6 +22,8 @@ # # # Author: Komal Thareja (kthare10@renci.org) +runtime: + P4: /etc/fabric/actor/playbooks/p4/bf-sde-9.7.1.tgz playbooks: location: /etc/fabric/actor/playbooks inventory_location: /etc/fabric/actor/playbooks/inventory diff --git a/fabric_cf/authority/vm_handler_config.yml b/fabric_cf/authority/vm_handler_config.yml index c77b5995..262e20dc 100644 --- a/fabric_cf/authority/vm_handler_config.yml +++ b/fabric_cf/authority/vm_handler_config.yml @@ -35,12 +35,14 @@ runtime: images: default_centos8_stream: centos default_centos9_stream: cloud-user + default_centos10_stream: cloud-user default_debian_11: debian default_debian_12: debian default_fedora_39: fedora default_fedora_40: fedora default_freebsd_13_zfs: freebsd default_freebsd_14_zfs: freebsd + default_fedora_41: fedora default_kali: kali default_openbsd_7: openbsd default_rocky_8: rocky @@ -52,6 +54,8 @@ runtime: docker_rocky_9: rocky docker_ubuntu_20: ubuntu docker_ubuntu_22: ubuntu + docker_ubuntu_24: ubuntu + attestable_bmv2_v1_ubuntu_20: ubuntu attestable_bmv2_v2_ubuntu_20: ubuntu playbooks: location: /etc/fabric/actor/playbooks diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index 1aa60e9c..091dea5c 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -27,16 +27,14 @@ import traceback from datetime import datetime, timedelta, timezone from http.client import NOT_FOUND, BAD_REQUEST, UNAUTHORIZED -from typing import List, Tuple, Union +from typing import List, Union from fabric_mb.message_bus.messages.auth_avro import AuthAvro -from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro from fabric_mb.message_bus.messages.poa_avro import PoaAvro from fabric_mb.message_bus.messages.reservation_mng import ReservationMng from fabric_mb.message_bus.messages.slice_avro import SliceAvro from fim.graph.networkx_property_graph_disjoint import NetworkXGraphImporterDisjoint from fim.slivers.base_sliver import BaseSliver -from fim.slivers.network_node import NodeSliver from fim.slivers.network_service import NetworkServiceSliver from fim.user import GraphFormat from fim.user.topology import ExperimentTopology From c134969996d415b44045df9429dc129991fcc0b7 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Mon, 10 Feb 2025 15:24:33 -0500 Subject: [PATCH 03/29] removed quota schema and relevant apis --- fabric_cf/actor/core/apis/abc_database.py | 40 ---- .../actor/core/apis/abc_mgmt_client_actor.py | 9 - .../actor/core/kernel/reservation_client.py | 6 +- .../core/manage/actor_management_object.py | 15 -- .../client_actor_management_object_helper.py | 5 +- .../core/manage/local/local_controller.py | 7 - .../actor/core/plugins/db/actor_database.py | 92 +--------- fabric_cf/actor/core/util/qutoa_mgr.py | 173 ++++++++++++++++++ fabric_cf/actor/core/util/utils.py | 78 -------- fabric_cf/actor/db/__init__.py | 21 --- fabric_cf/actor/db/psql_database.py | 106 ----------- fabric_cf/authority/switch_handler_config.yml | 2 +- pyproject.toml | 2 +- 13 files changed, 181 insertions(+), 375 deletions(-) create mode 100644 fabric_cf/actor/core/util/qutoa_mgr.py diff --git a/fabric_cf/actor/core/apis/abc_database.py b/fabric_cf/actor/core/apis/abc_database.py index 466d8319..54bc9c4b 100644 --- a/fabric_cf/actor/core/apis/abc_database.py +++ b/fabric_cf/actor/core/apis/abc_database.py @@ -478,43 +478,3 @@ def get_poas(self, *, poa_id: str = None, email: str = None, sliver_id: ID = Non @param states states @param last_update_time last update time """ - - def create_quota(self, project_id: str, resource_type: str, resource_unit: str, quota_limit: int): - """ - Create a new quota record in the database. - - @param project_id: UUID of the project the quota is associated with. - @param resource_type: Type of resource (e.g., SLICE, COMPONENT). - @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). - @param quota_limit: Maximum allowed usage for this resource. - @return: The created `Quotas` object. - @throws: Exception if there is an error during the creation. - """ - - def get_quota_lookup(self, project_id: str): - """ - Fetches all quotas for a given project and creates a lookup dictionary. - - @param project_id: UUID of the project whose quotas are to be fetched. - @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details. - @throws: Exception if there is an error during the database interaction. - """ - - def update_quota(self, reservation: ABCReservationMixin): - """ - Update an existing quota record. - - @param reservation: reservation. - @throws: Exception if there is an error during the update. - """ - - def delete_quota(self, project_id: str, resource_type: str, resource_unit: str): - """ - Delete a specific quota record. - - @param project_id: UUID of the project the quota is associated with. - @param resource_type: Type of resource (e.g., SLICE, COMPONENT). - @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). - @return: True if the quota was successfully deleted, False if not found. - @throws: Exception if there is an error during the deletion. - """ diff --git a/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py b/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py index 60ddd026..01a3473d 100644 --- a/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py +++ b/fabric_cf/actor/core/apis/abc_mgmt_client_actor.py @@ -181,12 +181,3 @@ def reclaim_delegations(self, *, broker: ID, did: ID) -> DelegationAvro: @return reservation """ raise ManageException(Constants.NOT_IMPLEMENTED) - - def get_quota_lookup(self, project_id: str) -> dict: - """ - Fetches all quotas for a given project and creates a lookup dictionary. - - @param project_id: UUID of the project whose quotas are to be fetched. - @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details. - @throws: Exception if there is an error during the database interaction. - """ \ No newline at end of file diff --git a/fabric_cf/actor/core/kernel/reservation_client.py b/fabric_cf/actor/core/kernel/reservation_client.py index 8be75693..01360a32 100644 --- a/fabric_cf/actor/core/kernel/reservation_client.py +++ b/fabric_cf/actor/core/kernel/reservation_client.py @@ -1557,16 +1557,16 @@ def validate_redeem(self): def add_redeem_predecessor(self, *, reservation: ABCReservationMixin, filters: dict = None): if reservation.get_reservation_id() not in self.redeem_predecessors: - state = PredecessorState(reservation=reservation) + state = PredecessorState(reservation=reservation, filters=filters) self.redeem_predecessors[reservation.get_reservation_id()] = state def remove_redeem_predecessor(self, *, rid: ID): if rid in self.redeem_predecessors: self.redeem_predecessors.pop(rid) - def add_join_predecessor(self, *, predecessor): + def add_join_predecessor(self, *, predecessor: ABCReservationMixin, filters: dict = None): if predecessor.get_reservation_id() not in self.redeem_predecessors: - state = PredecessorState(reservation=predecessor) + state = PredecessorState(reservation=predecessor, filters=filters) self.join_predecessors[predecessor.get_reservation_id()] = state def get_redeem_predecessors(self) -> List[PredecessorState]: diff --git a/fabric_cf/actor/core/manage/actor_management_object.py b/fabric_cf/actor/core/manage/actor_management_object.py index 1839e296..fc06305c 100644 --- a/fabric_cf/actor/core/manage/actor_management_object.py +++ b/fabric_cf/actor/core/manage/actor_management_object.py @@ -905,18 +905,3 @@ def build_broker_query_model(self, level_0_broker_query_model: str, level: int, except Exception as e: self.logger.error(f"Exception occurred build_broker_query_model e: {e}") self.logger.error(traceback.format_exc()) - - def get_quota_lookup(self, project_id: str, caller: AuthToken) -> dict: - """ - Fetches all quotas for a given project and creates a lookup dictionary. - - @param project_id: UUID of the project whose quotas are to be fetched. - @param caller: caller - @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details. - @throws: Exception if there is an error during the database interaction. - """ - try: - return self.db.get_quota_lookup(project_id=project_id) - except Exception as e: - self.logger.error(f"Exception occurred build_broker_query_model e: {e}") - self.logger.error(traceback.format_exc()) diff --git a/fabric_cf/actor/core/manage/client_actor_management_object_helper.py b/fabric_cf/actor/core/manage/client_actor_management_object_helper.py index 662b5608..67c94be0 100644 --- a/fabric_cf/actor/core/manage/client_actor_management_object_helper.py +++ b/fabric_cf/actor/core/manage/client_actor_management_object_helper.py @@ -43,8 +43,7 @@ from fabric_cf.actor.core.apis.abc_actor_runnable import ABCActorRunnable from fabric_cf.actor.core.apis.abc_controller_reservation import ABCControllerReservation -from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin -from fabric_cf.actor.core.common.constants import Constants, ErrorCodes +from fabric_cf.actor.core.common.constants import ErrorCodes from fabric_cf.actor.core.common.exceptions import ManageException from fabric_cf.actor.core.kernel.reservation_client import ClientReservationFactory from fabric_cf.actor.core.kernel.reservation_states import ReservationStates, ReservationPendingStates @@ -635,4 +634,4 @@ def run(self): result.set_message(ErrorCodes.ErrorInternalError.interpret(exception=e)) result = ManagementObject.set_exception_details(result=result, e=e) - return result \ No newline at end of file + return result diff --git a/fabric_cf/actor/core/manage/local/local_controller.py b/fabric_cf/actor/core/manage/local/local_controller.py index bd511c05..4baf8b50 100644 --- a/fabric_cf/actor/core/manage/local/local_controller.py +++ b/fabric_cf/actor/core/manage/local/local_controller.py @@ -244,10 +244,3 @@ def poa(self, *, poa: PoaAvro) -> bool: self.on_exception(e=e, traceback_str=traceback.format_exc()) return False - - def get_quota_lookup(self, project_id: str) -> dict: - self.clear_last() - try: - return self.manager.get_quota_lookup(project_id=project_id, caller=self.auth) - except Exception as e: - self.on_exception(e=e, traceback_str=traceback.format_exc()) diff --git a/fabric_cf/actor/core/plugins/db/actor_database.py b/fabric_cf/actor/core/plugins/db/actor_database.py index 9d9a7a3b..8bb1e87d 100644 --- a/fabric_cf/actor/core/plugins/db/actor_database.py +++ b/fabric_cf/actor/core/plugins/db/actor_database.py @@ -28,9 +28,8 @@ import time import traceback from datetime import datetime -from typing import List, Union, Tuple, Dict +from typing import List, Union, Dict -from fim.user import ComponentType from fabric_cf.actor.core.apis.abc_actor_mixin import ABCActorMixin, ActorType from fabric_cf.actor.core.apis.abc_broker_proxy import ABCBrokerProxy @@ -997,92 +996,3 @@ def remove_poa(self, *, poa_id: str): finally: if self.lock.locked(): self.lock.release() - - def create_quota(self, project_id: str, resource_type: str, resource_unit: str, quota_limit: int): - try: - self.db.create_quota(project_id=project_id, - resource_type=resource_type, - resource_unit=resource_unit, - quota_limit=quota_limit) - finally: - if self.lock.locked(): - self.lock.release() - - def get_quota_lookup(self, project_id: str): - try: - return self.db.get_quota_lookup(project_id=project_id) - finally: - if self.lock.locked(): - self.lock.release() - - def update_quota(self, reservation: ABCReservationMixin): - print("Update Quota") - try: - slice_object = reservation.get_slice() - if not slice_object: - return - project_id = slice_object.get_project_id() - if not project_id: - return - - sliver = None - from fabric_cf.actor.core.kernel.reservation_client import ReservationClient - if isinstance(reservation, ReservationClient) and reservation.get_leased_resources() and \ - reservation.get_leased_resources().get_sliver(): - sliver = reservation.get_leased_resources().get_sliver() - if not sliver and reservation.get_resources() and reservation.get_resources().get_sliver(): - sliver = reservation.get_resources().get_sliver() - - if not sliver: - return - - if reservation.is_closed() or reservation.is_closing(): - duration = reservation.get_term().get_remaining_length() - else: - duration = reservation.get_term().get_length() - - if duration < 60: - return - - duration /= 3600000 - existing_quota = self.db.get_quota_lookup(project_id=project_id) - - sliver_quota_usage = extract_quota_usage(sliver=sliver, duration=duration) - - print(f"Existing: {existing_quota}") - print(f"Updated by: {sliver_quota_usage}") - - # Check each accumulated resource usage against its quota - for quota_key, total_duration in sliver_quota_usage.items(): - print(f"Iteration: {quota_key}") - current_duration = 0 - if quota_key in existing_quota: - current_duration = existing_quota.get(quota_key) - (resource_type, resource_unit) = quota_key - - # Return resource hours for a sliver deleted before expiry - if reservation.is_closing() or reservation.is_closed(): - usage = current_duration["quota_used"] - total_duration - if usage < 0: - usage = 0 - self.db.update_quota(project_id=project_id, - resource_type=resource_type, - resource_unit=resource_unit, quota_used=usage) - # Account for resource hours used for a new or extended sliver - else: - usage = total_duration + current_duration["quota_used"] - self.db.update_quota(project_id=project_id, - resource_type=resource_type, - resource_unit=resource_unit, quota_used=usage) - finally: - if self.lock.locked(): - self.lock.release() - - def delete_quota(self, project_id: str, resource_type: str, resource_unit: str): - try: - self.db.delete_quota(project_id=project_id, - resource_type=resource_type, - resource_unit=resource_unit) - finally: - if self.lock.locked(): - self.lock.release() diff --git a/fabric_cf/actor/core/util/qutoa_mgr.py b/fabric_cf/actor/core/util/qutoa_mgr.py new file mode 100644 index 00000000..b5d72533 --- /dev/null +++ b/fabric_cf/actor/core/util/qutoa_mgr.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +# MIT License +# +# Copyright (c) 2020 FABRIC Testbed +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# +# Author: Komal Thareja (kthare10@renci.org) +from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro + +from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin + + +class QuotaMgr: + def __init__(self, *, core_api_host: str): + self.core_api_host = core_api_host + + def update_quota(self, reservation: ABCReservationMixin): + print("Update Quota") + try: + slice_object = reservation.get_slice() + if not slice_object: + return + project_id = slice_object.get_project_id() + if not project_id: + return + + sliver = None + from fabric_cf.actor.core.kernel.reservation_client import ReservationClient + if isinstance(reservation, ReservationClient) and reservation.get_leased_resources() and \ + reservation.get_leased_resources().get_sliver(): + sliver = reservation.get_leased_resources().get_sliver() + if not sliver and reservation.get_resources() and reservation.get_resources().get_sliver(): + sliver = reservation.get_resources().get_sliver() + + if not sliver: + return + + if reservation.is_closed() or reservation.is_closing(): + duration = reservation.get_term().get_remaining_length() + else: + duration = reservation.get_term().get_length() + + if duration < 60: + return + + duration /= 3600000 + existing_quota = self.db.get_quota_lookup(project_id=project_id) + + sliver_quota_usage = self.extract_quota_usage(sliver=sliver, duration=duration) + + print(f"Existing: {existing_quota}") + print(f"Updated by: {sliver_quota_usage}") + + # Check each accumulated resource usage against its quota + for quota_key, total_duration in sliver_quota_usage.items(): + print(f"Iteration: {quota_key}") + current_duration = 0 + if quota_key in existing_quota: + current_duration = existing_quota.get(quota_key) + (resource_type, resource_unit) = quota_key + + # Return resource hours for a sliver deleted before expiry + if reservation.is_closing() or reservation.is_closed(): + usage = current_duration["quota_used"] - total_duration + if usage < 0: + usage = 0 + self.db.update_quota(project_id=project_id, + resource_type=resource_type, + resource_unit=resource_unit, quota_used=usage) + # Account for resource hours used for a new or extended sliver + else: + usage = total_duration + current_duration["quota_used"] + self.db.update_quota(project_id=project_id, + resource_type=resource_type, + resource_unit=resource_unit, quota_used=usage) + finally: + if self.lock.locked(): + self.lock.release() + + def extract_quota_usage(self, sliver, duration: float) -> dict[tuple[str, str], float]: + """ + Extract quota usage from a sliver + + @param sliver: The sliver object from which resources are extracted. + @param duration: Number of hours the resources are requested for. + @return: A dictionary of resource type/unit tuples to requested amounts. + """ + unit = "HOURS" + requested_resources = {} + + # Check if the sliver is a NodeSliver + if not isinstance(sliver, NodeSliver): + return requested_resources + + allocations = sliver.get_capacity_allocations() + if not allocations and sliver.get_capacity_hints(): + catalog = InstanceCatalog() + allocations = catalog.get_instance_capacities(instance_type=sliver.get_capacity_hints().instance_type) + else: + allocations = sliver.get_capacities() + + # Extract Core, Ram, Disk Hours + requested_resources[("Core", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.core) + requested_resources[("RAM", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.ram) + requested_resources[("Disk", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.disk) + + # Extract component hours (e.g., GPU, FPGA, SmartNIC) + if sliver.attached_components_info: + for c in sliver.attached_components_info.devices.values(): + component_type = str(c.get_type()) + requested_resources[(component_type, unit)] = ( + requested_resources.get((component_type, unit), 0) + duration + ) + + return requested_resources + + def enforce_quota_limits(self, quota_lookup: dict, computed_reservations: list[LeaseReservationAvro], + duration: float) -> tuple[bool, str]: + """ + Check if the requested resources for multiple reservations are within the project's quota limits. + + @param quota_lookup: Quota Limits for various resource types. + @param computed_reservations: List of slivers requested. + @param duration: Number of hours the reservations are requested for. + @return: Tuple (True, None) if resources are within quota, or (False, message) if denied. + @throws: Exception if there is an error during the database interaction. + """ + try: + requested_resources = {} + + # Accumulate resource usage for all reservations + for r in computed_reservations: + sliver = r.get_sliver() + sliver_resources = self.extract_quota_usage(sliver, duration) + for key, value in sliver_resources.items(): + requested_resources[key] = requested_resources.get(key, 0) + value + + # Check each accumulated resource usage against its quota + for quota_key, total_requested_duration in requested_resources.items(): + if quota_key not in quota_lookup: + return False, f"Quota not defined for resource: {quota_key[0]} ({quota_key[1]})." + + quota_info = quota_lookup[quota_key] + available_quota = quota_info["quota_limit"] - quota_info["quota_used"] + + if total_requested_duration > available_quota: + return False, ( + f"Requested {total_requested_duration} {quota_key[1]} of {quota_key[0]}, " + f"but only {available_quota} is available." + ) + + # If all checks pass + return True, None + except Exception as e: + raise Exception(f"Error while checking reservation: {str(e)}") diff --git a/fabric_cf/actor/core/util/utils.py b/fabric_cf/actor/core/util/utils.py index c21b0c93..54cfb1d2 100644 --- a/fabric_cf/actor/core/util/utils.py +++ b/fabric_cf/actor/core/util/utils.py @@ -120,81 +120,3 @@ def generate_sha256(*, token: str): return sha256_hex - -def extract_quota_usage(sliver, duration: float) -> Dict[Tuple[str, str], float]: - """ - Extract quota usage from a sliver - - @param sliver: The sliver object from which resources are extracted. - @param duration: Number of hours the resources are requested for. - @return: A dictionary of resource type/unit tuples to requested amounts. - """ - unit = "HOURS" - requested_resources = {} - - # Check if the sliver is a NodeSliver - if not isinstance(sliver, NodeSliver): - return requested_resources - - allocations = sliver.get_capacity_allocations() - if not allocations and sliver.get_capacity_hints(): - catalog = InstanceCatalog() - allocations = catalog.get_instance_capacities(instance_type=sliver.get_capacity_hints().instance_type) - else: - allocations = sliver.get_capacities() - - # Extract Core, Ram, Disk Hours - requested_resources[("Core", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.core) - requested_resources[("RAM", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.ram) - requested_resources[("Disk", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.disk) - - # Extract component hours (e.g., GPU, FPGA, SmartNIC) - if sliver.attached_components_info: - for c in sliver.attached_components_info.devices.values(): - component_type = str(c.get_type()) - requested_resources[(component_type, unit)] = ( - requested_resources.get((component_type, unit), 0) + duration - ) - - return requested_resources - - -def enforce_quota_limits(quota_lookup: dict, computed_reservations: list[LeaseReservationAvro], - duration: float) -> Tuple[bool, str]: - """ - Check if the requested resources for multiple reservations are within the project's quota limits. - - @param quota_lookup: Quota Limits for various resource types. - @param computed_reservations: List of slivers requested. - @param duration: Number of hours the reservations are requested for. - @return: Tuple (True, None) if resources are within quota, or (False, message) if denied. - @throws: Exception if there is an error during the database interaction. - """ - try: - requested_resources = {} - - # Accumulate resource usage for all reservations - for r in computed_reservations: - sliver = r.get_sliver() - sliver_resources = extract_quota_usage(sliver, duration) - for key, value in sliver_resources.items(): - requested_resources[key] = requested_resources.get(key, 0) + value - - # Check each accumulated resource usage against its quota - for quota_key, total_requested_duration in requested_resources.items(): - if quota_key not in quota_lookup: - return False, f"Quota not defined for resource: {quota_key[0]} ({quota_key[1]})." - - quota_info = quota_lookup[quota_key] - available_quota = quota_info["quota_limit"] - quota_info["quota_used"] - - if total_requested_duration > available_quota: - return False, ( - f"Requested {total_requested_duration} {quota_key[1]} of {quota_key[0]}, " - f"but only {available_quota} is available." - ) - - # If all checks pass - return True, None - except Exception as e: - raise Exception(f"Error while checking reservation: {str(e)}") diff --git a/fabric_cf/actor/db/__init__.py b/fabric_cf/actor/db/__init__.py index 695f8f42..cbd8071a 100644 --- a/fabric_cf/actor/db/__init__.py +++ b/fabric_cf/actor/db/__init__.py @@ -240,24 +240,3 @@ class Components(Base): component = Column(String, primary_key=True, index=True) bdf = Column(String, primary_key=True, index=True) reservation = relationship('Reservations', back_populates='components') - - -class Quotas(Base): - __tablename__ = "quotas" - - resource_type = Column(String(50), primary_key=True, index=True, nullable=False) - project_id = Column(UUID(as_uuid=True), primary_key=True, index=True, nullable=False) - resource_unit = Column(String(20), primary_key=True, index=True, nullable=False, default="HOURS") - quota_limit = Column(Float, nullable=False) - quota_used = Column(Float, default=0) - created_at = Column( - TIMESTAMP(timezone=True), - nullable=False, - server_default=text("timezone('utc', now())") # Explicitly ensure UTC in PostgreSQL - ) - updated_at = Column( - TIMESTAMP(timezone=True), - nullable=False, - server_default=text("timezone('utc', now())"), - onupdate=func.now() - ) diff --git a/fabric_cf/actor/db/psql_database.py b/fabric_cf/actor/db/psql_database.py index 8015d292..3fcae909 100644 --- a/fabric_cf/actor/db/psql_database.py +++ b/fabric_cf/actor/db/psql_database.py @@ -1715,110 +1715,6 @@ def get_metrics(self, *, project_id: str = None, user_id: str = None, excluded_p self.logger.error(Constants.EXCEPTION_OCCURRED.format(e)) raise e - def create_quota(self, project_id: str, resource_type: str, resource_unit: str, quota_limit: int): - """ - Create a new quota record in the database. - - @param project_id: UUID of the project the quota is associated with. - @param resource_type: Type of resource (e.g., SLICE, COMPONENT). - @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). - @param quota_limit: Maximum allowed usage for this resource. - @return: The created `Quotas` object. - @throws: Exception if there is an error during the creation. - """ - session = self.get_session() - try: - quota = Quotas( - project_id=project_id, - resource_type=resource_type, - resource_unit=resource_unit, - quota_limit=quota_limit, - quota_used=0 - ) - session.add(quota) - session.commit() - return quota - except Exception as e: - self.logger.error(Constants.EXCEPTION_OCCURRED.format(e)) - raise e - - def get_quota_lookup(self, project_id: str): - """ - Fetches all quotas for a given project and creates a lookup dictionary. - - @param project_id: UUID of the project whose quotas are to be fetched. - @return: Dictionary with keys as (resource_type, resource_unit) and values as quota details. - @throws: Exception if there is an error during the database interaction. - """ - session = self.get_session() - try: - # Fetch all quotas for the project - project_quotas = session.query(Quotas).filter(Quotas.project_id == project_id).all() - - # Create a lookup dictionary for quick quota access - quota_lookup = {} - for quota in project_quotas: - key = (quota.resource_type, quota.resource_unit) - quota_lookup[key] = { - "quota_limit": quota.quota_limit, - "quota_used": quota.quota_used, - } - return quota_lookup - except Exception as e: - raise Exception(f"Error while fetching quotas: {str(e)}") - - def update_quota(self, project_id: str, resource_type: str, resource_unit: str, **kwargs): - """ - Update an existing quota record. - - @param project_id: UUID of the project the quota is associated with. - @param resource_type: Type of resource (e.g., SLICE, COMPONENT). - @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). - @param kwargs: Dictionary of fields to update and their new values. - @return: The updated `Quotas` object, or None if the quota does not exist. - @throws: Exception if there is an error during the update. - """ - session = self.get_session() - try: - filter_dict = {"project_id": project_id, "resource_type": resource_type, "resource_unit": resource_unit} - quota = session.query(Quotas).filter_by(**filter_dict).one_or_none() - if not quota: - return None - - for key, value in kwargs.items(): - if hasattr(quota, key): - setattr(quota, key, value) - print(f"Updating: {quota.project_id} {quota.resource_type} {quota.resource_unit} {quota.quota_limit} {quota.quota_used}") - session.commit() - return quota - except Exception as e: - self.logger.error(Constants.EXCEPTION_OCCURRED.format(e)) - raise e - - def delete_quota(self, project_id: str, resource_type: str, resource_unit: str): - """ - Delete a specific quota record. - - @param project_id: UUID of the project the quota is associated with. - @param resource_type: Type of resource (e.g., SLICE, COMPONENT). - @param resource_unit: Unit of the resource (e.g., HOURS, COUNT, GB). - @return: True if the quota was successfully deleted, False if not found. - @throws: Exception if there is an error during the deletion. - """ - session = self.get_session() - try: - quota = session.query(Quotas).filter(Quotas.project_id == project_id and - Quotas.resource_type == resource_type and - Quotas.resource_unit == resource_unit).first() - if quota: - session.delete(quota) - session.commit() - return True - return False - except Exception as e: - self.logger.error(Constants.EXCEPTION_OCCURRED.format(e)) - raise e - def test(): logger = logging.getLogger('PsqlDatabase') @@ -2002,8 +1898,6 @@ def test3(): if __name__ == '__main__': test2() - #test() - #test3() logger = logging.getLogger('PsqlDatabase') db = PsqlDatabase(user='fabric', password='fabric', database='orchestrator', db_host='127.0.0.1:5432', diff --git a/fabric_cf/authority/switch_handler_config.yml b/fabric_cf/authority/switch_handler_config.yml index b43dec81..fb5a907a 100644 --- a/fabric_cf/authority/switch_handler_config.yml +++ b/fabric_cf/authority/switch_handler_config.yml @@ -25,7 +25,7 @@ runtime: ssh_retries: 10 playbooks: - admin_ssh_key: /root/.ssh/id_rsa_nova + admin_ssh_key: /root/.ssh/id_rsa_p4_fabric location: /etc/fabric/actor/playbooks inventory_location: /etc/fabric/actor/playbooks/inventory Switch: head_switch_provisioning.yml diff --git a/pyproject.toml b/pyproject.toml index 9f1cb336..9d3201e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ dependencies = [ "fabric_fss_utils==1.6.0", "fabric-message-bus==1.7.0", "fabric-fim==1.8.0", - "fabric-credmgr-client==1.6.1", + "fabrictestbed==1.8.2b0", "ansible" ] From 3c6ff437ce70cef8f1a53c2844493386d9653062 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Mon, 10 Feb 2025 16:39:32 -0500 Subject: [PATCH 04/29] initial quota changes with core-api --- fabric_cf/actor/boot/configuration.py | 19 ++++- fabric_cf/actor/core/common/constants.py | 4 ++ fabric_cf/actor/core/container/globals.py | 9 +++ fabric_cf/actor/core/kernel/kernel.py | 10 +-- fabric_cf/actor/core/util/qutoa_mgr.py | 69 +++++++++++-------- fabric_cf/authority/config.al2s.am.yaml | 4 ++ fabric_cf/authority/config.net.am.yaml | 4 ++ fabric_cf/authority/config.site.am.geni.yaml | 4 ++ fabric_cf/authority/config.site.am.yaml | 4 ++ fabric_cf/broker/config.broker.yaml | 4 ++ .../orchestrator/config.orchestrator.yaml | 4 ++ .../orchestrator/core/orchestrator_handler.py | 14 ++-- pyproject.toml | 2 +- 13 files changed, 108 insertions(+), 43 deletions(-) diff --git a/fabric_cf/actor/boot/configuration.py b/fabric_cf/actor/boot/configuration.py index 5171e928..d2d235bd 100644 --- a/fabric_cf/actor/boot/configuration.py +++ b/fabric_cf/actor/boot/configuration.py @@ -45,6 +45,10 @@ def __init__(self, *, config: dict): if Constants.CONFIG_SECTION_O_AUTH in config: self.oauth = config.get(Constants.CONFIG_SECTION_O_AUTH) + self.core_api = {} + if Constants.CONFIG_SECTION_CORE_API in config: + self.core_api = config.get(Constants.CONFIG_SECTION_CORE_API) + self.smtp = {} if Constants.CONFIG_SECTION_SMTP in config: self.smtp = config.get(Constants.CONFIG_SECTION_SMTP) @@ -91,6 +95,12 @@ def get_oauth(self) -> dict: """ return self.oauth + def get_core_api(self) -> dict: + """ + Return core api + """ + return self.core_api + def get_smtp(self) -> dict: """ Return smtp config @@ -425,7 +435,6 @@ def get_runtime_config(self) -> dict: """ if self.global_config is not None: return self.global_config.get_runtime() - return None def get_oauth_config(self) -> dict: """ @@ -433,7 +442,13 @@ def get_oauth_config(self) -> dict: """ if self.global_config is not None: return self.global_config.get_oauth() - return None + + def get_core_api_config(self) -> dict: + """ + Return Core API Config + """ + if self.global_config is not None: + return self.global_config.get_core_api() def get_smtp_config(self) -> dict: if self.global_config: diff --git a/fabric_cf/actor/core/common/constants.py b/fabric_cf/actor/core/common/constants.py index c4b4cf3b..21aaafe1 100644 --- a/fabric_cf/actor/core/common/constants.py +++ b/fabric_cf/actor/core/common/constants.py @@ -173,6 +173,9 @@ class Constants: PROPERTY_CONF_O_AUTH_TRL_REFRESH = "trl-refresh" PROPERTY_CONF_O_AUTH_VERIFY_EXP = "verify-exp" + CONFIG_SECTION_CORE_API = "core_api" + PROPERTY_CONF_HOST = "host" + CONFIG_SECTION_SMTP = "smtp" CONFIG_SECTION_DATABASE = "database" @@ -275,6 +278,7 @@ class Constants: CLAIMS_PROJECTS = "projects" UUID = "uuid" TAGS = "tags" + TOKEN = "token" TOKEN_HASH = "token_hash" PROJECT_ID = "project_id" USERS = "users" diff --git a/fabric_cf/actor/core/container/globals.py b/fabric_cf/actor/core/container/globals.py index b0b141c7..e73a0f90 100644 --- a/fabric_cf/actor/core/container/globals.py +++ b/fabric_cf/actor/core/container/globals.py @@ -35,6 +35,7 @@ import logging import os +from fabric_cf.actor.core.util.qutoa_mgr import QuotaMgr from fim.graph.neo4j_property_graph import Neo4jGraphImporter from fim.graph.resources.abc_arm import ABCARMPropertyGraph from fss_utils.jwt_validate import JWTValidator @@ -74,6 +75,7 @@ def __init__(self): self.lock = threading.Lock() self.jwt_validator = None self.token_validator = None + self.quota_mgr = None def make_logger(self): """ @@ -193,6 +195,10 @@ def load_validators(self): refresh_period=timedelta(hours=t.hour, minutes=t.minute, seconds=t.second), jwt_validator=self.jwt_validator) + core_api = self.config.get_core_api_config() + self.quota_mgr = QuotaMgr(core_api_host=core_api.get(Constants.PROPERTY_CONF_HOST), + token=core_api.get(Constants.TOKEN, "")) + def load_config(self): """ Load the configuration @@ -211,6 +217,9 @@ def get_jwt_validator(self) -> JWTValidator: def get_token_validator(self) -> TokenValidator: return self.token_validator + def get_quota_mgr(self) -> QuotaMgr: + return self.quota_mgr + def get_container(self) -> ABCActorContainer: """ Get the container diff --git a/fabric_cf/actor/core/kernel/kernel.py b/fabric_cf/actor/core/kernel/kernel.py index 00bcd7ec..4cbd85aa 100644 --- a/fabric_cf/actor/core/kernel/kernel.py +++ b/fabric_cf/actor/core/kernel/kernel.py @@ -35,7 +35,7 @@ from fabric_cf.actor.core.apis.abc_delegation import ABCDelegation from fabric_cf.actor.core.apis.abc_policy import ABCPolicy from fabric_cf.actor.core.common.constants import Constants -from fabric_cf.actor.core.common.event_logger import EventLogger, EventLoggerSingleton +from fabric_cf.actor.core.common.event_logger import EventLoggerSingleton from fabric_cf.actor.core.common.exceptions import ReservationNotFoundException, DelegationNotFoundException, \ KernelException from fabric_cf.actor.core.kernel.authority_reservation import AuthorityReservation @@ -220,10 +220,9 @@ def close(self, *, reservation: ABCReservationMixin, force: bool = False): self.policy.close(reservation=reservation) reservation.close(force=force) self.plugin.get_database().update_reservation(reservation=reservation) - ## TODO release resources back if deleted before expiry if reservation.get_term().get_remaining_length() > 0: - self.plugin.get_database().update_quota(reservation=reservation) - + from fabric_cf.actor.core.container.globals import GlobalsSingleton + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) reservation.service_close() except Exception as e: err = f"An error occurred during close for reservation #{reservation.get_reservation_id()}" @@ -1458,7 +1457,8 @@ def update_ticket(self, *, reservation: ABCReservationMixin, update: Reservation self.plugin.get_database().update_reservation(reservation=reservation) if not reservation.is_failed(): reservation.service_update_ticket() - self.plugin.get_database().update_quota(reservation=reservation) + from fabric_cf.actor.core.container.globals import GlobalsSingleton + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) except Exception as e: self.logger.error(traceback.format_exc()) self.error(err=f"An error occurred during update ticket for " diff --git a/fabric_cf/actor/core/util/qutoa_mgr.py b/fabric_cf/actor/core/util/qutoa_mgr.py index b5d72533..4c15851c 100644 --- a/fabric_cf/actor/core/util/qutoa_mgr.py +++ b/fabric_cf/actor/core/util/qutoa_mgr.py @@ -24,16 +24,25 @@ # # Author: Komal Thareja (kthare10@renci.org) from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro +from fabrictestbed.external_api.core_api import CoreApi +from fabrictestbed.slice_editor import InstanceCatalog +from fim.slivers.network_node import NodeSliver from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin class QuotaMgr: - def __init__(self, *, core_api_host: str): - self.core_api_host = core_api_host + def __init__(self, *, core_api_host: str, token: str): + self.core_api = CoreApi(core_api_host=core_api_host, token=token) + + def list_quotas(self, project_uuid: str, offset: int = 0, limit: int = 200) -> dict[tuple[str, str], dict]: + quota_list = self.core_api.list_quotas(project_uuid=project_uuid, offset=offset, limit=limit) + quotas = {} + for q in quota_list: + quotas[(q.get("resource_type"), q.get("resource_unit"))] = q + return quotas def update_quota(self, reservation: ABCReservationMixin): - print("Update Quota") try: slice_object = reservation.get_slice() if not slice_object: @@ -62,40 +71,44 @@ def update_quota(self, reservation: ABCReservationMixin): return duration /= 3600000 - existing_quota = self.db.get_quota_lookup(project_id=project_id) + existing_quotas = self.list_quotas(project_uuid=project_id) sliver_quota_usage = self.extract_quota_usage(sliver=sliver, duration=duration) - print(f"Existing: {existing_quota}") + print(f"Existing: {existing_quotas}") print(f"Updated by: {sliver_quota_usage}") # Check each accumulated resource usage against its quota for quota_key, total_duration in sliver_quota_usage.items(): - print(f"Iteration: {quota_key}") - current_duration = 0 - if quota_key in existing_quota: - current_duration = existing_quota.get(quota_key) - (resource_type, resource_unit) = quota_key + existing = existing_quotas.get(quota_key) + print(f"No quota available for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") + if not existing: + continue # Return resource hours for a sliver deleted before expiry if reservation.is_closing() or reservation.is_closed(): - usage = current_duration["quota_used"] - total_duration + usage = existing.get("quota_used") - total_duration if usage < 0: usage = 0 - self.db.update_quota(project_id=project_id, - resource_type=resource_type, - resource_unit=resource_unit, quota_used=usage) + self.core_api.update_quota(uuid=existing.get("uuid"), project_id=project_id, + resource_type=existing.get("resource_type"), + resource_unit=existing.get("resource_unit"), + quota_used=usage) + # Account for resource hours used for a new or extended sliver else: - usage = total_duration + current_duration["quota_used"] - self.db.update_quota(project_id=project_id, - resource_type=resource_type, - resource_unit=resource_unit, quota_used=usage) + usage = total_duration + existing.get("quota_used") + self.core_api.update_quota(uuid=existing.get("uuid"), project_id=project_id, + resource_type=existing.get("resource_type"), + resource_unit=existing.get("resource_unit"), + quota_used=usage) + except Exception as e: + print(f"Failed to update Quota: {e}") finally: - if self.lock.locked(): - self.lock.release() + print("done") - def extract_quota_usage(self, sliver, duration: float) -> dict[tuple[str, str], float]: + @staticmethod + def extract_quota_usage(sliver, duration: float) -> dict[tuple[str, str], float]: """ Extract quota usage from a sliver @@ -118,9 +131,9 @@ def extract_quota_usage(self, sliver, duration: float) -> dict[tuple[str, str], allocations = sliver.get_capacities() # Extract Core, Ram, Disk Hours - requested_resources[("Core", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.core) - requested_resources[("RAM", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.ram) - requested_resources[("Disk", unit)] = requested_resources.get(("Core", unit), 0) + (duration * allocations.disk) + requested_resources[("core", unit)] = requested_resources.get(("core", unit), 0) + (duration * allocations.core) + requested_resources[("ram", unit)] = requested_resources.get(("ram", unit), 0) + (duration * allocations.ram) + requested_resources[("disk", unit)] = requested_resources.get(("disk", unit), 0) + (duration * allocations.disk) # Extract component hours (e.g., GPU, FPGA, SmartNIC) if sliver.attached_components_info: @@ -132,12 +145,12 @@ def extract_quota_usage(self, sliver, duration: float) -> dict[tuple[str, str], return requested_resources - def enforce_quota_limits(self, quota_lookup: dict, computed_reservations: list[LeaseReservationAvro], + def enforce_quota_limits(self, quotas: dict, computed_reservations: list[LeaseReservationAvro], duration: float) -> tuple[bool, str]: """ Check if the requested resources for multiple reservations are within the project's quota limits. - @param quota_lookup: Quota Limits for various resource types. + @param quotas: Quota Limits for various resource types. @param computed_reservations: List of slivers requested. @param duration: Number of hours the reservations are requested for. @return: Tuple (True, None) if resources are within quota, or (False, message) if denied. @@ -155,10 +168,10 @@ def enforce_quota_limits(self, quota_lookup: dict, computed_reservations: list[L # Check each accumulated resource usage against its quota for quota_key, total_requested_duration in requested_resources.items(): - if quota_key not in quota_lookup: + if quota_key not in quotas: return False, f"Quota not defined for resource: {quota_key[0]} ({quota_key[1]})." - quota_info = quota_lookup[quota_key] + quota_info = quotas[quota_key] available_quota = quota_info["quota_limit"] - quota_info["quota_used"] if total_requested_duration > available_quota: diff --git a/fabric_cf/authority/config.al2s.am.yaml b/fabric_cf/authority/config.al2s.am.yaml index 2bd34e83..b0c95b66 100644 --- a/fabric_cf/authority/config.al2s.am.yaml +++ b/fabric_cf/authority/config.al2s.am.yaml @@ -81,6 +81,10 @@ oauth: trl-refresh: 00:10:00 verify-exp: True +core_api: + host: https://uis.fabric-testbed.net + token: + database: db-user: fabric db-password: fabric diff --git a/fabric_cf/authority/config.net.am.yaml b/fabric_cf/authority/config.net.am.yaml index 7cd92d5d..ae575edb 100644 --- a/fabric_cf/authority/config.net.am.yaml +++ b/fabric_cf/authority/config.net.am.yaml @@ -81,6 +81,10 @@ oauth: trl-refresh: 00:10:00 verify-exp: True +core_api: + host: https://uis.fabric-testbed.net + token: + database: db-user: fabric db-password: fabric diff --git a/fabric_cf/authority/config.site.am.geni.yaml b/fabric_cf/authority/config.site.am.geni.yaml index e6cbcce1..f9bf1387 100644 --- a/fabric_cf/authority/config.site.am.geni.yaml +++ b/fabric_cf/authority/config.site.am.geni.yaml @@ -84,6 +84,10 @@ oauth: trl-refresh: 00:10:00 verify-exp: True +core_api: + host: https://uis.fabric-testbed.net + token: + database: db-user: fabric db-password: fabric diff --git a/fabric_cf/authority/config.site.am.yaml b/fabric_cf/authority/config.site.am.yaml index 1d754062..1870b3e9 100644 --- a/fabric_cf/authority/config.site.am.yaml +++ b/fabric_cf/authority/config.site.am.yaml @@ -84,6 +84,10 @@ oauth: trl-refresh: 00:10:00 verify-exp: True +core_api: + host: https://uis.fabric-testbed.net + token: + database: db-user: fabric db-password: fabric diff --git a/fabric_cf/broker/config.broker.yaml b/fabric_cf/broker/config.broker.yaml index 468169ee..eeaa0bfd 100644 --- a/fabric_cf/broker/config.broker.yaml +++ b/fabric_cf/broker/config.broker.yaml @@ -82,6 +82,10 @@ oauth: trl-refresh: 00:10:00 verify-exp: True +core_api: + host: https://uis.fabric-testbed.net + token: + database: db-user: fabric db-password: fabric diff --git a/fabric_cf/orchestrator/config.orchestrator.yaml b/fabric_cf/orchestrator/config.orchestrator.yaml index 4d1f6a10..0396ebb5 100644 --- a/fabric_cf/orchestrator/config.orchestrator.yaml +++ b/fabric_cf/orchestrator/config.orchestrator.yaml @@ -86,6 +86,10 @@ oauth: key-refresh: 00:10:00 verify-exp: True +core_api: + host: https://uis.fabric-testbed.net + token: + smtp: smtp_server: mail.fabric-testbed.net smtp_port: 587 diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index 6688015a..45f4ca51 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -43,7 +43,6 @@ from fabric_cf.actor.core.kernel.poa import PoaStates from fabric_cf.actor.core.kernel.reservation_states import ReservationStates from fabric_cf.actor.core.time.actor_clock import ActorClock -from fabric_cf.actor.core.util.utils import enforce_quota_limits from fabric_cf.actor.fim.fim_helper import FimHelper from fabric_cf.actor.core.apis.abc_mgmt_controller_mixin import ABCMgmtControllerMixin from fabric_cf.actor.core.common.constants import Constants, ErrorCodes @@ -62,6 +61,7 @@ def __init__(self): self.controller_state = OrchestratorKernelSingleton.get() from fabric_cf.actor.core.container.globals import GlobalsSingleton self.globals = GlobalsSingleton.get() + self.quota_mgr = self.globals.get_quota_mgr() self.logger = self.globals.get_logger() self.jwks_url = self.globals.get_config().get_oauth_config().get(Constants.PROPERTY_CONF_O_AUTH_JWKS_URL, None) self.pdp_config = self.globals.get_config().get_global_config().get_pdp_config() @@ -326,10 +326,10 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key # Check if Testbed in Maintenance or Site in Maintenance self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations) - quota_lookup = controller.get_quota_lookup(project_id=project) - status, error_message = enforce_quota_limits(quota_lookup=quota_lookup, - computed_reservations=computed_reservations, - duration=(end_time-start_time).total_seconds()/3600) + quotas = self.quota_mgr.list_quotas(project_id=project) + status, error_message = self.quota_mgr.enforce_quota_limits(quotas=quotas, + computed_reservations=computed_reservations, + duration=(end_time-start_time).total_seconds()/3600) if not status: raise OrchestratorException(http_error_code=BAD_REQUEST, @@ -848,7 +848,7 @@ def validate_lease_time(lease_time: str) -> Union[datetime, None]: def __compute_lease_end_time(self, lease_end_time: datetime = None, allow_long_lived: bool = False, project_id: str = None, - lifetime: int = Constants.DEFAULT_LEASE_IN_HOURS) -> Tuple[datetime, datetime]: + lifetime: int = Constants.DEFAULT_LEASE_IN_HOURS) -> tuple[datetime, datetime]: """ Validate and compute Lease End Time. @@ -924,7 +924,7 @@ def check_maintenance_mode(self, *, token: FabricToken, reservations: List[Reser raise OrchestratorException(message=message, http_error_code=Constants.INTERNAL_SERVER_ERROR_MAINT_MODE) - def poa(self, *, token: str, sliver_id: str, poa: PoaAvro) -> Tuple[str, str]: + def poa(self, *, token: str, sliver_id: str, poa: PoaAvro) -> tuple[str, str]: try: controller = self.controller_state.get_management_actor() self.logger.debug(f"poa invoked for Controller: {controller}") diff --git a/pyproject.toml b/pyproject.toml index 9d3201e7..4d83f953 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ dependencies = [ "PyYAML", "fabric_fss_utils==1.6.0", "fabric-message-bus==1.7.0", - "fabric-fim==1.8.0", + "fabric-fim==1.8.1", "fabrictestbed==1.8.2b0", "ansible" ] From 2ddcf324aa360a9cc600095a88ef5d2614d57346 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Mon, 10 Feb 2025 16:52:00 -0500 Subject: [PATCH 05/29] enable quota mgmt using flag --- fabric_cf/actor/core/container/globals.py | 5 +++-- fabric_cf/actor/core/kernel/kernel.py | 6 ++++-- fabric_cf/authority/config.al2s.am.yaml | 1 + fabric_cf/authority/config.net.am.yaml | 1 + fabric_cf/authority/config.site.am.geni.yaml | 1 + fabric_cf/authority/config.site.am.yaml | 1 + fabric_cf/broker/config.broker.yaml | 1 + fabric_cf/orchestrator/config.orchestrator.yaml | 1 + .../orchestrator/core/orchestrator_handler.py | 17 +++++++++-------- 9 files changed, 22 insertions(+), 12 deletions(-) diff --git a/fabric_cf/actor/core/container/globals.py b/fabric_cf/actor/core/container/globals.py index e73a0f90..b5594131 100644 --- a/fabric_cf/actor/core/container/globals.py +++ b/fabric_cf/actor/core/container/globals.py @@ -196,8 +196,9 @@ def load_validators(self): jwt_validator=self.jwt_validator) core_api = self.config.get_core_api_config() - self.quota_mgr = QuotaMgr(core_api_host=core_api.get(Constants.PROPERTY_CONF_HOST), - token=core_api.get(Constants.TOKEN, "")) + if core_api.get("enable", False): + self.quota_mgr = QuotaMgr(core_api_host=core_api.get(Constants.PROPERTY_CONF_HOST), + token=core_api.get(Constants.TOKEN, "")) def load_config(self): """ diff --git a/fabric_cf/actor/core/kernel/kernel.py b/fabric_cf/actor/core/kernel/kernel.py index 4cbd85aa..82ca0969 100644 --- a/fabric_cf/actor/core/kernel/kernel.py +++ b/fabric_cf/actor/core/kernel/kernel.py @@ -222,7 +222,8 @@ def close(self, *, reservation: ABCReservationMixin, force: bool = False): self.plugin.get_database().update_reservation(reservation=reservation) if reservation.get_term().get_remaining_length() > 0: from fabric_cf.actor.core.container.globals import GlobalsSingleton - GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) + if GlobalsSingleton.get().get_quota_mgr(): + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) reservation.service_close() except Exception as e: err = f"An error occurred during close for reservation #{reservation.get_reservation_id()}" @@ -1458,7 +1459,8 @@ def update_ticket(self, *, reservation: ABCReservationMixin, update: Reservation if not reservation.is_failed(): reservation.service_update_ticket() from fabric_cf.actor.core.container.globals import GlobalsSingleton - GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) + if GlobalsSingleton.get().get_quota_mgr(): + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) except Exception as e: self.logger.error(traceback.format_exc()) self.error(err=f"An error occurred during update ticket for " diff --git a/fabric_cf/authority/config.al2s.am.yaml b/fabric_cf/authority/config.al2s.am.yaml index b0c95b66..e4b98609 100644 --- a/fabric_cf/authority/config.al2s.am.yaml +++ b/fabric_cf/authority/config.al2s.am.yaml @@ -84,6 +84,7 @@ oauth: core_api: host: https://uis.fabric-testbed.net token: + enable: False database: db-user: fabric diff --git a/fabric_cf/authority/config.net.am.yaml b/fabric_cf/authority/config.net.am.yaml index ae575edb..39992fbd 100644 --- a/fabric_cf/authority/config.net.am.yaml +++ b/fabric_cf/authority/config.net.am.yaml @@ -84,6 +84,7 @@ oauth: core_api: host: https://uis.fabric-testbed.net token: + enable: False database: db-user: fabric diff --git a/fabric_cf/authority/config.site.am.geni.yaml b/fabric_cf/authority/config.site.am.geni.yaml index f9bf1387..2dc278cf 100644 --- a/fabric_cf/authority/config.site.am.geni.yaml +++ b/fabric_cf/authority/config.site.am.geni.yaml @@ -87,6 +87,7 @@ oauth: core_api: host: https://uis.fabric-testbed.net token: + enable: False database: db-user: fabric diff --git a/fabric_cf/authority/config.site.am.yaml b/fabric_cf/authority/config.site.am.yaml index 1870b3e9..c0c6e552 100644 --- a/fabric_cf/authority/config.site.am.yaml +++ b/fabric_cf/authority/config.site.am.yaml @@ -87,6 +87,7 @@ oauth: core_api: host: https://uis.fabric-testbed.net token: + enable: False database: db-user: fabric diff --git a/fabric_cf/broker/config.broker.yaml b/fabric_cf/broker/config.broker.yaml index eeaa0bfd..af6b1ad0 100644 --- a/fabric_cf/broker/config.broker.yaml +++ b/fabric_cf/broker/config.broker.yaml @@ -83,6 +83,7 @@ oauth: verify-exp: True core_api: + enable: False host: https://uis.fabric-testbed.net token: diff --git a/fabric_cf/orchestrator/config.orchestrator.yaml b/fabric_cf/orchestrator/config.orchestrator.yaml index 0396ebb5..c8382a45 100644 --- a/fabric_cf/orchestrator/config.orchestrator.yaml +++ b/fabric_cf/orchestrator/config.orchestrator.yaml @@ -87,6 +87,7 @@ oauth: verify-exp: True core_api: + enable: True host: https://uis.fabric-testbed.net token: diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index 45f4ca51..7bc7d8a7 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -326,14 +326,15 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key # Check if Testbed in Maintenance or Site in Maintenance self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations) - quotas = self.quota_mgr.list_quotas(project_id=project) - status, error_message = self.quota_mgr.enforce_quota_limits(quotas=quotas, - computed_reservations=computed_reservations, - duration=(end_time-start_time).total_seconds()/3600) - - if not status: - raise OrchestratorException(http_error_code=BAD_REQUEST, - message=error_message) + if self.quota_mgr: + quotas = self.quota_mgr.list_quotas(project_id=project) + status, error_message = self.quota_mgr.enforce_quota_limits(quotas=quotas, + computed_reservations=computed_reservations, + duration=(end_time-start_time).total_seconds()/3600) + + if not status: + raise OrchestratorException(http_error_code=BAD_REQUEST, + message=error_message) create_ts = time.time() if lease_start_time and lease_end_time and lifetime: From 61091f4972a978f22b2f0db8386947a5448a98ac Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Mon, 10 Feb 2025 16:54:11 -0500 Subject: [PATCH 06/29] enable quota mgmt using flag --- fabric_cf/orchestrator/core/orchestrator_handler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index 7bc7d8a7..87ea9bcd 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -332,9 +332,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key computed_reservations=computed_reservations, duration=(end_time-start_time).total_seconds()/3600) - if not status: - raise OrchestratorException(http_error_code=BAD_REQUEST, - message=error_message) + self.logger.info(f"Quota enforcement status: {status}, error: {error_message}") create_ts = time.time() if lease_start_time and lease_end_time and lifetime: From 1154ea7e421575a93cab6003befaa756f2c8233c Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Mon, 10 Feb 2025 17:03:56 -0500 Subject: [PATCH 07/29] setup logger for quotamgr --- fabric_cf/actor/core/container/globals.py | 3 ++- fabric_cf/actor/core/util/qutoa_mgr.py | 17 ++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/fabric_cf/actor/core/container/globals.py b/fabric_cf/actor/core/container/globals.py index b5594131..86328c19 100644 --- a/fabric_cf/actor/core/container/globals.py +++ b/fabric_cf/actor/core/container/globals.py @@ -198,7 +198,8 @@ def load_validators(self): core_api = self.config.get_core_api_config() if core_api.get("enable", False): self.quota_mgr = QuotaMgr(core_api_host=core_api.get(Constants.PROPERTY_CONF_HOST), - token=core_api.get(Constants.TOKEN, "")) + token=core_api.get(Constants.TOKEN, ""), + logger=self.log) def load_config(self): """ diff --git a/fabric_cf/actor/core/util/qutoa_mgr.py b/fabric_cf/actor/core/util/qutoa_mgr.py index 4c15851c..feb492e8 100644 --- a/fabric_cf/actor/core/util/qutoa_mgr.py +++ b/fabric_cf/actor/core/util/qutoa_mgr.py @@ -23,6 +23,8 @@ # # # Author: Komal Thareja (kthare10@renci.org) +import logging + from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro from fabrictestbed.external_api.core_api import CoreApi from fabrictestbed.slice_editor import InstanceCatalog @@ -32,8 +34,9 @@ class QuotaMgr: - def __init__(self, *, core_api_host: str, token: str): + def __init__(self, *, core_api_host: str, token: str, logger: logging.Logger): self.core_api = CoreApi(core_api_host=core_api_host, token=token) + self.logger = logger def list_quotas(self, project_uuid: str, offset: int = 0, limit: int = 200) -> dict[tuple[str, str], dict]: quota_list = self.core_api.list_quotas(project_uuid=project_uuid, offset=offset, limit=limit) @@ -75,13 +78,13 @@ def update_quota(self, reservation: ABCReservationMixin): sliver_quota_usage = self.extract_quota_usage(sliver=sliver, duration=duration) - print(f"Existing: {existing_quotas}") - print(f"Updated by: {sliver_quota_usage}") + self.logger.debug(f"Existing: {existing_quotas}") + self.logger.debug(f"Updated by: {sliver_quota_usage}") # Check each accumulated resource usage against its quota for quota_key, total_duration in sliver_quota_usage.items(): existing = existing_quotas.get(quota_key) - print(f"No quota available for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") + self.logger.debug(f"No quota available for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") if not existing: continue @@ -103,9 +106,9 @@ def update_quota(self, reservation: ABCReservationMixin): resource_unit=existing.get("resource_unit"), quota_used=usage) except Exception as e: - print(f"Failed to update Quota: {e}") + self.logger.error(f"Failed to update Quota: {e}") finally: - print("done") + self.logger.debug("done") @staticmethod def extract_quota_usage(sliver, duration: float) -> dict[tuple[str, str], float]: @@ -183,4 +186,4 @@ def enforce_quota_limits(self, quotas: dict, computed_reservations: list[LeaseRe # If all checks pass return True, None except Exception as e: - raise Exception(f"Error while checking reservation: {str(e)}") + self.logger.error(f"Error while checking reservation: {str(e)}") From 98852b2ec2d5bd3ca5ab019c0a851ba78dbe3fd5 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Mon, 10 Feb 2025 17:13:45 -0500 Subject: [PATCH 08/29] fix import --- fabric_cf/actor/db/psql_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fabric_cf/actor/db/psql_database.py b/fabric_cf/actor/db/psql_database.py index 3fcae909..860ca43d 100644 --- a/fabric_cf/actor/db/psql_database.py +++ b/fabric_cf/actor/db/psql_database.py @@ -36,7 +36,7 @@ from fabric_cf.actor.core.common.constants import Constants from fabric_cf.actor.core.common.exceptions import DatabaseException from fabric_cf.actor.db import Base, Clients, ConfigMappings, Proxies, Units, Reservations, Slices, ManagerObjects, \ - Miscellaneous, Actors, Delegations, Sites, Poas, Components, Metrics, Quotas + Miscellaneous, Actors, Delegations, Sites, Poas, Components, Metrics @contextmanager From 19835f14b118edd90992b96cf886dba10ab0769d Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Mon, 10 Feb 2025 17:15:31 -0500 Subject: [PATCH 09/29] fix import --- fabric_cf/actor/core/plugins/db/actor_database.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fabric_cf/actor/core/plugins/db/actor_database.py b/fabric_cf/actor/core/plugins/db/actor_database.py index 8bb1e87d..1c58b010 100644 --- a/fabric_cf/actor/core/plugins/db/actor_database.py +++ b/fabric_cf/actor/core/plugins/db/actor_database.py @@ -45,7 +45,6 @@ from fabric_cf.actor.core.plugins.handlers.configuration_mapping import ConfigurationMapping from fabric_cf.actor.core.container.maintenance import Site from fabric_cf.actor.core.util.id import ID -from fabric_cf.actor.core.util.utils import extract_quota_usage from fabric_cf.actor.db.psql_database import PsqlDatabase From 283bcd8077522d2ed35f1e58fe05ab35d4bba28c Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Mon, 10 Feb 2025 17:29:26 -0500 Subject: [PATCH 10/29] fix import --- fabric_cf/orchestrator/core/orchestrator_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index 87ea9bcd..45ad3aa1 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -327,7 +327,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations) if self.quota_mgr: - quotas = self.quota_mgr.list_quotas(project_id=project) + quotas = self.quota_mgr.list_quotas(project_uuid=project) status, error_message = self.quota_mgr.enforce_quota_limits(quotas=quotas, computed_reservations=computed_reservations, duration=(end_time-start_time).total_seconds()/3600) From 764b681da2832c617d5967af245dbe9b733bc726 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Wed, 12 Feb 2025 11:23:56 -0500 Subject: [PATCH 11/29] lower str for key name --- fabric_cf/actor/core/util/qutoa_mgr.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fabric_cf/actor/core/util/qutoa_mgr.py b/fabric_cf/actor/core/util/qutoa_mgr.py index feb492e8..472799cd 100644 --- a/fabric_cf/actor/core/util/qutoa_mgr.py +++ b/fabric_cf/actor/core/util/qutoa_mgr.py @@ -42,7 +42,7 @@ def list_quotas(self, project_uuid: str, offset: int = 0, limit: int = 200) -> d quota_list = self.core_api.list_quotas(project_uuid=project_uuid, offset=offset, limit=limit) quotas = {} for q in quota_list: - quotas[(q.get("resource_type"), q.get("resource_unit"))] = q + quotas[(q.get("resource_type").lower(), q.get("resource_unit").lower())] = q return quotas def update_quota(self, reservation: ABCReservationMixin): @@ -141,7 +141,7 @@ def extract_quota_usage(sliver, duration: float) -> dict[tuple[str, str], float] # Extract component hours (e.g., GPU, FPGA, SmartNIC) if sliver.attached_components_info: for c in sliver.attached_components_info.devices.values(): - component_type = str(c.get_type()) + component_type = str(c.get_type()).lower() requested_resources[(component_type, unit)] = ( requested_resources.get((component_type, unit), 0) + duration ) From 85fa2f11fc1454b5ca4f55591f87df0bbcabba87 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Wed, 12 Feb 2025 11:37:46 -0500 Subject: [PATCH 12/29] lower str for key name --- fabric_cf/actor/core/util/qutoa_mgr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fabric_cf/actor/core/util/qutoa_mgr.py b/fabric_cf/actor/core/util/qutoa_mgr.py index 472799cd..79cb192f 100644 --- a/fabric_cf/actor/core/util/qutoa_mgr.py +++ b/fabric_cf/actor/core/util/qutoa_mgr.py @@ -119,7 +119,7 @@ def extract_quota_usage(sliver, duration: float) -> dict[tuple[str, str], float] @param duration: Number of hours the resources are requested for. @return: A dictionary of resource type/unit tuples to requested amounts. """ - unit = "HOURS" + unit = "HOURS".lower() requested_resources = {} # Check if the sliver is a NodeSliver From 6c86098fcded7c94d4877cb1204a66c08c5ce0ff Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Wed, 12 Feb 2025 11:44:55 -0500 Subject: [PATCH 13/29] lower str for key name --- fabric_cf/actor/core/util/qutoa_mgr.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fabric_cf/actor/core/util/qutoa_mgr.py b/fabric_cf/actor/core/util/qutoa_mgr.py index 79cb192f..ae73948f 100644 --- a/fabric_cf/actor/core/util/qutoa_mgr.py +++ b/fabric_cf/actor/core/util/qutoa_mgr.py @@ -96,7 +96,7 @@ def update_quota(self, reservation: ABCReservationMixin): self.core_api.update_quota(uuid=existing.get("uuid"), project_id=project_id, resource_type=existing.get("resource_type"), resource_unit=existing.get("resource_unit"), - quota_used=usage) + quota_used=usage, quota_limit=existing.get("quota_limit")) # Account for resource hours used for a new or extended sliver else: @@ -104,7 +104,7 @@ def update_quota(self, reservation: ABCReservationMixin): self.core_api.update_quota(uuid=existing.get("uuid"), project_id=project_id, resource_type=existing.get("resource_type"), resource_unit=existing.get("resource_unit"), - quota_used=usage) + quota_used=usage, quota_limit=existing.get("quota_limit")) except Exception as e: self.logger.error(f"Failed to update Quota: {e}") finally: From f695e7c9e3ee323d0bb8105f154d9de0cf914465 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Wed, 12 Feb 2025 11:50:12 -0500 Subject: [PATCH 14/29] lower str for key name --- fabric_cf/actor/core/util/qutoa_mgr.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fabric_cf/actor/core/util/qutoa_mgr.py b/fabric_cf/actor/core/util/qutoa_mgr.py index ae73948f..bc5b7f0f 100644 --- a/fabric_cf/actor/core/util/qutoa_mgr.py +++ b/fabric_cf/actor/core/util/qutoa_mgr.py @@ -84,8 +84,9 @@ def update_quota(self, reservation: ABCReservationMixin): # Check each accumulated resource usage against its quota for quota_key, total_duration in sliver_quota_usage.items(): existing = existing_quotas.get(quota_key) - self.logger.debug(f"No quota available for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") + self.logger.debug(f"Quota update requested for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") if not existing: + self.logger.debug("Existing not found so skipping!") continue # Return resource hours for a sliver deleted before expiry From 34b4de1afa14f5eceac863de5e4147d990e67b8b Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Wed, 12 Feb 2025 19:51:42 -0500 Subject: [PATCH 15/29] reformatted the file --- fabric_cf/actor/core/util/qutoa_mgr.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/fabric_cf/actor/core/util/qutoa_mgr.py b/fabric_cf/actor/core/util/qutoa_mgr.py index bc5b7f0f..721a5f63 100644 --- a/fabric_cf/actor/core/util/qutoa_mgr.py +++ b/fabric_cf/actor/core/util/qutoa_mgr.py @@ -130,14 +130,18 @@ def extract_quota_usage(sliver, duration: float) -> dict[tuple[str, str], float] allocations = sliver.get_capacity_allocations() if not allocations and sliver.get_capacity_hints(): catalog = InstanceCatalog() - allocations = catalog.get_instance_capacities(instance_type=sliver.get_capacity_hints().instance_type) + allocations = catalog.get_instance_capacities(instance_type= + sliver.get_capacity_hints().instance_type) else: allocations = sliver.get_capacities() # Extract Core, Ram, Disk Hours - requested_resources[("core", unit)] = requested_resources.get(("core", unit), 0) + (duration * allocations.core) - requested_resources[("ram", unit)] = requested_resources.get(("ram", unit), 0) + (duration * allocations.ram) - requested_resources[("disk", unit)] = requested_resources.get(("disk", unit), 0) + (duration * allocations.disk) + requested_resources[("core", unit)] = requested_resources.get(("core", unit), 0) + \ + (duration * allocations.core) + requested_resources[("ram", unit)] = requested_resources.get(("ram", unit), 0) +\ + (duration * allocations.ram) + requested_resources[("disk", unit)] = requested_resources.get(("disk", unit), 0) + \ + (duration * allocations.disk) # Extract component hours (e.g., GPU, FPGA, SmartNIC) if sliver.attached_components_info: From 980d6ceb1d8de4e1b172cbd7dfdb0bd8a5332e51 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 08:20:23 -0500 Subject: [PATCH 16/29] rename project_id to project_uuid to match with core api --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4d83f953..c699538f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ dependencies = [ "fabric_fss_utils==1.6.0", "fabric-message-bus==1.7.0", "fabric-fim==1.8.1", - "fabrictestbed==1.8.2b0", + "fabrictestbed==1.8.2b1", "ansible" ] From d1c5461c227fa7cb5f1eec0665417d5b47c96ba0 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 08:21:32 -0500 Subject: [PATCH 17/29] fix spell mistake in file name --- fabric_cf/actor/core/container/globals.py | 2 +- fabric_cf/actor/core/util/{qutoa_mgr.py => quota_mgr.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename fabric_cf/actor/core/util/{qutoa_mgr.py => quota_mgr.py} (100%) diff --git a/fabric_cf/actor/core/container/globals.py b/fabric_cf/actor/core/container/globals.py index 86328c19..c8e5c961 100644 --- a/fabric_cf/actor/core/container/globals.py +++ b/fabric_cf/actor/core/container/globals.py @@ -35,7 +35,7 @@ import logging import os -from fabric_cf.actor.core.util.qutoa_mgr import QuotaMgr +from fabric_cf.actor.core.util.quota_mgr import QuotaMgr from fim.graph.neo4j_property_graph import Neo4jGraphImporter from fim.graph.resources.abc_arm import ABCARMPropertyGraph from fss_utils.jwt_validate import JWTValidator diff --git a/fabric_cf/actor/core/util/qutoa_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py similarity index 100% rename from fabric_cf/actor/core/util/qutoa_mgr.py rename to fabric_cf/actor/core/util/quota_mgr.py From e6fa0a965e7440c9e0e52374e2ea9ea39da4d7a2 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 09:55:06 -0500 Subject: [PATCH 18/29] fix spell mistake in file name --- fabric_cf/actor/core/util/quota_mgr.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index 721a5f63..b9e5f4ac 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -94,7 +94,7 @@ def update_quota(self, reservation: ABCReservationMixin): usage = existing.get("quota_used") - total_duration if usage < 0: usage = 0 - self.core_api.update_quota(uuid=existing.get("uuid"), project_id=project_id, + self.core_api.update_quota(uuid=existing.get("uuid"), project_uuid=project_id, resource_type=existing.get("resource_type"), resource_unit=existing.get("resource_unit"), quota_used=usage, quota_limit=existing.get("quota_limit")) @@ -102,7 +102,7 @@ def update_quota(self, reservation: ABCReservationMixin): # Account for resource hours used for a new or extended sliver else: usage = total_duration + existing.get("quota_used") - self.core_api.update_quota(uuid=existing.get("uuid"), project_id=project_id, + self.core_api.update_quota(uuid=existing.get("uuid"), project_uuid=project_id, resource_type=existing.get("resource_type"), resource_unit=existing.get("resource_unit"), quota_used=usage, quota_limit=existing.get("quota_limit")) From 3b492f7278a00c4bd58049ac1e0545fd9c494c1e Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 10:31:06 -0500 Subject: [PATCH 19/29] update quota from broker to ensure updates by multiple slivers from multiple slices are handled --- fabric_cf/actor/core/kernel/kernel.py | 7 ------- .../actor/core/policy/broker_simpler_units_policy.py | 10 ++++++++++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/fabric_cf/actor/core/kernel/kernel.py b/fabric_cf/actor/core/kernel/kernel.py index 82ca0969..d853cafe 100644 --- a/fabric_cf/actor/core/kernel/kernel.py +++ b/fabric_cf/actor/core/kernel/kernel.py @@ -220,10 +220,6 @@ def close(self, *, reservation: ABCReservationMixin, force: bool = False): self.policy.close(reservation=reservation) reservation.close(force=force) self.plugin.get_database().update_reservation(reservation=reservation) - if reservation.get_term().get_remaining_length() > 0: - from fabric_cf.actor.core.container.globals import GlobalsSingleton - if GlobalsSingleton.get().get_quota_mgr(): - GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) reservation.service_close() except Exception as e: err = f"An error occurred during close for reservation #{reservation.get_reservation_id()}" @@ -1458,9 +1454,6 @@ def update_ticket(self, *, reservation: ABCReservationMixin, update: Reservation self.plugin.get_database().update_reservation(reservation=reservation) if not reservation.is_failed(): reservation.service_update_ticket() - from fabric_cf.actor.core.container.globals import GlobalsSingleton - if GlobalsSingleton.get().get_quota_mgr(): - GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) except Exception as e: self.logger.error(traceback.format_exc()) self.error(err=f"An error occurred during update ticket for " diff --git a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py index 8ad4e5b8..d0686409 100644 --- a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py +++ b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py @@ -1115,6 +1115,11 @@ def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryF if node_id_to_reservations.get(node_id, None) is None: node_id_to_reservations[node_id] = ReservationSet() node_id_to_reservations[node_id].add(reservation=reservation) + + from fabric_cf.actor.core.container.globals import GlobalsSingleton + if GlobalsSingleton.get().get_quota_mgr(): + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) + self.logger.debug(f"Ticket Inventory returning: True {error_msg}") return True, node_id_to_reservations, error_msg except Exception as e: @@ -1204,6 +1209,11 @@ def issue_ticket(self, *, reservation: ABCBrokerReservation, units: int, rtype: return reservation def release(self, *, reservation): + if reservation.get_term().get_remaining_length() > 0: + from fabric_cf.actor.core.container.globals import GlobalsSingleton + if GlobalsSingleton.get().get_quota_mgr(): + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) + if isinstance(reservation, ABCBrokerReservation): self.logger.debug("Broker reservation") super().release(reservation=reservation) From 8b49e818fe1e5c85cadd8e936646c0752d0ea9cb Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 10:45:26 -0500 Subject: [PATCH 20/29] extract sliver from reservation at broker --- fabric_cf/actor/core/policy/inventory_for_type.py | 2 +- fabric_cf/actor/core/policy/network_node_inventory.py | 4 ++-- fabric_cf/actor/core/policy/network_service_inventory.py | 6 +++--- fabric_cf/actor/core/util/quota_mgr.py | 5 ++++- fabric_cf/broker/config.broker.yaml | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/fabric_cf/actor/core/policy/inventory_for_type.py b/fabric_cf/actor/core/policy/inventory_for_type.py index a8ec6b5c..6564d5b3 100644 --- a/fabric_cf/actor/core/policy/inventory_for_type.py +++ b/fabric_cf/actor/core/policy/inventory_for_type.py @@ -59,7 +59,7 @@ def free(self, *, count: int, request: dict = None, resource: dict = None) -> di """ @staticmethod - def _get_allocated_sliver(reservation: ABCReservationMixin) -> BaseSliver: + def get_allocated_sliver(reservation: ABCReservationMixin) -> BaseSliver: """ Retrieve the allocated sliver from the reservation. diff --git a/fabric_cf/actor/core/policy/network_node_inventory.py b/fabric_cf/actor/core/policy/network_node_inventory.py index df3e4895..055f1845 100644 --- a/fabric_cf/actor/core/policy/network_node_inventory.py +++ b/fabric_cf/actor/core/policy/network_node_inventory.py @@ -69,7 +69,7 @@ def check_capacities(*, rid: ID, requested_capacities: Capacities, delegated: De if rid == reservation.get_reservation_id(): continue # For Active or Ticketed or Ticketing reservations; reduce the counts from available - resource_sliver = InventoryForType._get_allocated_sliver(reservation=reservation) + resource_sliver = InventoryForType.get_allocated_sliver(reservation=reservation) if resource_sliver is not None and isinstance(resource_sliver, NodeSliver): logger.debug( @@ -380,7 +380,7 @@ def __exclude_components_for_existing_reservations(*, rid: ID, graph_node: NodeS (operation == ReservationOperation.Extend or not reservation.is_ticketed()): continue # For Active or Ticketed or Ticketing reservations; reduce the counts from available - allocated_sliver = InventoryForType._get_allocated_sliver(reservation=reservation) + allocated_sliver = InventoryForType.get_allocated_sliver(reservation=reservation) if reservation.is_extending_ticket() and reservation.get_requested_resources() is not None and \ reservation.get_requested_resources().get_sliver() is not None: diff --git a/fabric_cf/actor/core/policy/network_service_inventory.py b/fabric_cf/actor/core/policy/network_service_inventory.py index 86612fe3..91f44613 100644 --- a/fabric_cf/actor/core/policy/network_service_inventory.py +++ b/fabric_cf/actor/core/policy/network_service_inventory.py @@ -76,7 +76,7 @@ def __exclude_allocated_vlans(self, *, rid: ID, available_vlan_range: List[int], continue # For Active or Ticketed or Ticketing reservations; reduce the counts from available - allocated_sliver = self._get_allocated_sliver(reservation=reservation) + allocated_sliver = self.get_allocated_sliver(reservation=reservation) self.logger.debug( f"Existing res# {reservation.get_reservation_id()} state:{reservation.get_state()} " @@ -278,7 +278,7 @@ def allocate_vnic(self, *, rid: ID, requested_ns: NetworkServiceSliver, owner_ns continue # For Active or Ticketed or Ticketing reservations; reduce the counts from available - allocated_sliver = self._get_allocated_sliver(reservation=reservation) + allocated_sliver = self.get_allocated_sliver(reservation=reservation) self.logger.debug(f"Existing res# {reservation.get_reservation_id()} " f"allocated: {allocated_sliver}") @@ -413,7 +413,7 @@ def _exclude_allocated_subnets(self, *, subnet_list: List, requested_ns_type: st if rid == reservation.get_reservation_id(): continue - allocated_sliver = self._get_allocated_sliver(reservation) + allocated_sliver = self.get_allocated_sliver(reservation) if allocated_sliver is None: continue diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index b9e5f4ac..c02fbc40 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -31,6 +31,7 @@ from fim.slivers.network_node import NodeSliver from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin +from fabric_cf.actor.core.policy.inventory_for_type import InventoryForType class QuotaMgr: @@ -54,13 +55,15 @@ def update_quota(self, reservation: ABCReservationMixin): if not project_id: return - sliver = None + sliver = InventoryForType.get_allocated_sliver(reservation=reservation) + ''' from fabric_cf.actor.core.kernel.reservation_client import ReservationClient if isinstance(reservation, ReservationClient) and reservation.get_leased_resources() and \ reservation.get_leased_resources().get_sliver(): sliver = reservation.get_leased_resources().get_sliver() if not sliver and reservation.get_resources() and reservation.get_resources().get_sliver(): sliver = reservation.get_resources().get_sliver() + ''' if not sliver: return diff --git a/fabric_cf/broker/config.broker.yaml b/fabric_cf/broker/config.broker.yaml index af6b1ad0..acea9a27 100644 --- a/fabric_cf/broker/config.broker.yaml +++ b/fabric_cf/broker/config.broker.yaml @@ -83,7 +83,7 @@ oauth: verify-exp: True core_api: - enable: False + enable: True host: https://uis.fabric-testbed.net token: From 1474e97f40efd37d1cff0c593b54f07a6cc39880 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 10:46:40 -0500 Subject: [PATCH 21/29] extract sliver from reservation at broker --- fabric_cf/actor/core/policy/inventory_for_type.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fabric_cf/actor/core/policy/inventory_for_type.py b/fabric_cf/actor/core/policy/inventory_for_type.py index 6564d5b3..5875ac5c 100644 --- a/fabric_cf/actor/core/policy/inventory_for_type.py +++ b/fabric_cf/actor/core/policy/inventory_for_type.py @@ -70,3 +70,5 @@ def get_allocated_sliver(reservation: ABCReservationMixin) -> BaseSliver: return reservation.get_approved_resources().get_sliver() if (reservation.is_active() or reservation.is_ticketed()) and reservation.get_resources() is not None: return reservation.get_resources().get_sliver() + if (reservation.is_closed()) and reservation.get_resources() is not None: + return reservation.get_resources().get_sliver() From 2543f24dc342d500eb32c91780a612a36ccb2080 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 11:03:56 -0500 Subject: [PATCH 22/29] explicitly pass term --- fabric_cf/actor/core/policy/broker_simpler_units_policy.py | 5 +++-- fabric_cf/actor/core/util/quota_mgr.py | 7 ++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py index d0686409..39e13c71 100644 --- a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py +++ b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py @@ -1118,7 +1118,7 @@ def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryF from fabric_cf.actor.core.container.globals import GlobalsSingleton if GlobalsSingleton.get().get_quota_mgr(): - GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation, term=term) self.logger.debug(f"Ticket Inventory returning: True {error_msg}") return True, node_id_to_reservations, error_msg @@ -1212,7 +1212,8 @@ def release(self, *, reservation): if reservation.get_term().get_remaining_length() > 0: from fabric_cf.actor.core.container.globals import GlobalsSingleton if GlobalsSingleton.get().get_quota_mgr(): - GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation) + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation, + term=reservation.get_term()) if isinstance(reservation, ABCBrokerReservation): self.logger.debug("Broker reservation") diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index c02fbc40..2527291b 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -25,6 +25,7 @@ # Author: Komal Thareja (kthare10@renci.org) import logging +from fabric_cf.actor.core.time.term import Term from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro from fabrictestbed.external_api.core_api import CoreApi from fabrictestbed.slice_editor import InstanceCatalog @@ -46,7 +47,7 @@ def list_quotas(self, project_uuid: str, offset: int = 0, limit: int = 200) -> d quotas[(q.get("resource_type").lower(), q.get("resource_unit").lower())] = q return quotas - def update_quota(self, reservation: ABCReservationMixin): + def update_quota(self, reservation: ABCReservationMixin, term: Term): try: slice_object = reservation.get_slice() if not slice_object: @@ -69,9 +70,9 @@ def update_quota(self, reservation: ABCReservationMixin): return if reservation.is_closed() or reservation.is_closing(): - duration = reservation.get_term().get_remaining_length() + duration = term.get_remaining_length() else: - duration = reservation.get_term().get_length() + duration = term.get_length() if duration < 60: return From ead7521dcdff4fc35181b6d7fbf6c4d7cd51e368 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 11:16:36 -0500 Subject: [PATCH 23/29] explicitly pass term --- fabric_cf/actor/core/util/quota_mgr.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index 2527291b..99dd40bf 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -98,18 +98,14 @@ def update_quota(self, reservation: ABCReservationMixin, term: Term): usage = existing.get("quota_used") - total_duration if usage < 0: usage = 0 - self.core_api.update_quota(uuid=existing.get("uuid"), project_uuid=project_id, - resource_type=existing.get("resource_type"), - resource_unit=existing.get("resource_unit"), - quota_used=usage, quota_limit=existing.get("quota_limit")) - # Account for resource hours used for a new or extended sliver else: usage = total_duration + existing.get("quota_used") - self.core_api.update_quota(uuid=existing.get("uuid"), project_uuid=project_id, - resource_type=existing.get("resource_type"), - resource_unit=existing.get("resource_unit"), - quota_used=usage, quota_limit=existing.get("quota_limit")) + + self.core_api.update_quota(uuid=existing.get("uuid"), project_uuid=project_id, + resource_type=existing.get("resource_type"), + resource_unit=existing.get("resource_unit"), + quota_used=usage, quota_limit=existing.get("quota_limit")) except Exception as e: self.logger.error(f"Failed to update Quota: {e}") finally: From e42c6b3f4a0b2be581ae5da7804bfadd03c22612 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 13:53:45 -0500 Subject: [PATCH 24/29] quota updates and enforcement at broker --- .../policy/broker_simpler_units_policy.py | 12 +++++ fabric_cf/actor/core/util/quota_mgr.py | 46 ++++++++++--------- fabric_cf/authority/test/test.yaml | 5 ++ fabric_cf/broker/test/test.yaml | 5 ++ .../orchestrator/core/orchestrator_handler.py | 9 ---- fabric_cf/orchestrator/test/test.yaml | 5 ++ 6 files changed, 52 insertions(+), 30 deletions(-) diff --git a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py index 39e13c71..eca413d7 100644 --- a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py +++ b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py @@ -1075,8 +1075,20 @@ def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryF try: if operation == ReservationOperation.Extend: rset = reservation.get_resources() + duration = term.get_length() else: rset = reservation.get_requested_resources() + duration = term.get_full_length() + + from fabric_cf.actor.core.container.globals import GlobalsSingleton + if GlobalsSingleton.get().get_quota_mgr(): + status, error_msg = GlobalsSingleton.get().get_quota_mgr().enforce_quota_limits(reservation=reservation, + duration=duration) + self.logger.info(f"Quota enforcement status: {status}, error: {error_msg}") + # TODO: enable enforcement action later + #if not status: + # return status, node_id_to_reservations, error_msg + needed = rset.get_units() # for network node slivers diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index 99dd40bf..87d2713d 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -24,6 +24,7 @@ # # Author: Komal Thareja (kthare10@renci.org) import logging +from typing import Any from fabric_cf.actor.core.time.term import Term from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro @@ -88,31 +89,30 @@ def update_quota(self, reservation: ABCReservationMixin, term: Term): # Check each accumulated resource usage against its quota for quota_key, total_duration in sliver_quota_usage.items(): existing = existing_quotas.get(quota_key) + usage = 0 self.logger.debug(f"Quota update requested for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") + print( + f"Quota update requested for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") if not existing: self.logger.debug("Existing not found so skipping!") + print("Existing not found so skipping!") continue # Return resource hours for a sliver deleted before expiry if reservation.is_closing() or reservation.is_closed(): - usage = existing.get("quota_used") - total_duration - if usage < 0: - usage = 0 + usage -= total_duration # Account for resource hours used for a new or extended sliver else: - usage = total_duration + existing.get("quota_used") + usage += total_duration - self.core_api.update_quota(uuid=existing.get("uuid"), project_uuid=project_id, - resource_type=existing.get("resource_type"), - resource_unit=existing.get("resource_unit"), - quota_used=usage, quota_limit=existing.get("quota_limit")) + self.core_api.update_quota_usage(uuid=existing.get("uuid"), project_uuid=project_id, quota_used=usage) except Exception as e: self.logger.error(f"Failed to update Quota: {e}") finally: self.logger.debug("done") @staticmethod - def extract_quota_usage(sliver, duration: float) -> dict[tuple[str, str], float]: + def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str, str], float]: """ Extract quota usage from a sliver @@ -153,29 +153,33 @@ def extract_quota_usage(sliver, duration: float) -> dict[tuple[str, str], float] return requested_resources - def enforce_quota_limits(self, quotas: dict, computed_reservations: list[LeaseReservationAvro], - duration: float) -> tuple[bool, str]: + def enforce_quota_limits(self, reservation: ABCReservationMixin, duration: float) -> tuple[bool, Any]: """ Check if the requested resources for multiple reservations are within the project's quota limits. - @param quotas: Quota Limits for various resource types. - @param computed_reservations: List of slivers requested. + @param reservation: Reservation. @param duration: Number of hours the reservations are requested for. @return: Tuple (True, None) if resources are within quota, or (False, message) if denied. @throws: Exception if there is an error during the database interaction. """ try: - requested_resources = {} + slice_object = reservation.get_slice() + if not slice_object: + return False, None + project_uuid = slice_object.get_project_id() + if not project_uuid: + return False, None + + sliver = InventoryForType.get_allocated_sliver(reservation=reservation) + if not sliver: + self.logger.info("No sliver found!") + return False, None - # Accumulate resource usage for all reservations - for r in computed_reservations: - sliver = r.get_sliver() - sliver_resources = self.extract_quota_usage(sliver, duration) - for key, value in sliver_resources.items(): - requested_resources[key] = requested_resources.get(key, 0) + value + sliver_resources = self.extract_quota_usage(sliver, duration) + quotas = self.list_quotas(project_uuid=project_uuid) # Check each accumulated resource usage against its quota - for quota_key, total_requested_duration in requested_resources.items(): + for quota_key, total_requested_duration in sliver_resources.items(): if quota_key not in quotas: return False, f"Quota not defined for resource: {quota_key[0]} ({quota_key[1]})." diff --git a/fabric_cf/authority/test/test.yaml b/fabric_cf/authority/test/test.yaml index 41a638d6..533405ba 100644 --- a/fabric_cf/authority/test/test.yaml +++ b/fabric_cf/authority/test/test.yaml @@ -89,6 +89,11 @@ database: container: container.guid: site1-am-conainer +core_api: + host: https://alpha-6.fabric-testbed.net + token: + enable: False + time: # This section controls settings, which are generally useful # when running under emulation. These settings allow you to diff --git a/fabric_cf/broker/test/test.yaml b/fabric_cf/broker/test/test.yaml index de894fa3..6de9624c 100644 --- a/fabric_cf/broker/test/test.yaml +++ b/fabric_cf/broker/test/test.yaml @@ -72,6 +72,11 @@ logging: logger: broker +core_api: + host: https://alpha-6.fabric-testbed.net + token: + enable: True + oauth: jwks-url: https://alpha-2.fabric-testbed.net/credmgr/certs # Uses HH:MM:SS (less than 24 hours) diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py index 45ad3aa1..427d8c86 100644 --- a/fabric_cf/orchestrator/core/orchestrator_handler.py +++ b/fabric_cf/orchestrator/core/orchestrator_handler.py @@ -61,7 +61,6 @@ def __init__(self): self.controller_state = OrchestratorKernelSingleton.get() from fabric_cf.actor.core.container.globals import GlobalsSingleton self.globals = GlobalsSingleton.get() - self.quota_mgr = self.globals.get_quota_mgr() self.logger = self.globals.get_logger() self.jwks_url = self.globals.get_config().get_oauth_config().get(Constants.PROPERTY_CONF_O_AUTH_JWKS_URL, None) self.pdp_config = self.globals.get_config().get_global_config().get_pdp_config() @@ -326,14 +325,6 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key # Check if Testbed in Maintenance or Site in Maintenance self.check_maintenance_mode(token=fabric_token, reservations=computed_reservations) - if self.quota_mgr: - quotas = self.quota_mgr.list_quotas(project_uuid=project) - status, error_message = self.quota_mgr.enforce_quota_limits(quotas=quotas, - computed_reservations=computed_reservations, - duration=(end_time-start_time).total_seconds()/3600) - - self.logger.info(f"Quota enforcement status: {status}, error: {error_message}") - create_ts = time.time() if lease_start_time and lease_end_time and lifetime: # Enqueue future slices on Advanced Scheduling Thread to determine possible start time diff --git a/fabric_cf/orchestrator/test/test.yaml b/fabric_cf/orchestrator/test/test.yaml index 4560906d..834fba24 100644 --- a/fabric_cf/orchestrator/test/test.yaml +++ b/fabric_cf/orchestrator/test/test.yaml @@ -70,6 +70,11 @@ logging: logger: orchestrator +core_api: + host: https://alpha-6.fabric-testbed.net + token: + enable: True + oauth: jwks-url: https://alpha-2.fabric-testbed.net/credmgr/certs # Uses HH:MM:SS (less than 24 hours) From 5dd87177d040146fc501b4bb9904069153233aaf Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 13:54:47 -0500 Subject: [PATCH 25/29] quota updates and enforcement at broker --- fabric_cf/orchestrator/config.orchestrator.yaml | 2 +- fabric_cf/orchestrator/test/test.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fabric_cf/orchestrator/config.orchestrator.yaml b/fabric_cf/orchestrator/config.orchestrator.yaml index c8382a45..6f18e525 100644 --- a/fabric_cf/orchestrator/config.orchestrator.yaml +++ b/fabric_cf/orchestrator/config.orchestrator.yaml @@ -87,7 +87,7 @@ oauth: verify-exp: True core_api: - enable: True + enable: False host: https://uis.fabric-testbed.net token: diff --git a/fabric_cf/orchestrator/test/test.yaml b/fabric_cf/orchestrator/test/test.yaml index 834fba24..0280c048 100644 --- a/fabric_cf/orchestrator/test/test.yaml +++ b/fabric_cf/orchestrator/test/test.yaml @@ -72,8 +72,8 @@ logging: core_api: host: https://alpha-6.fabric-testbed.net - token: - enable: True + token: + enable: False oauth: jwks-url: https://alpha-2.fabric-testbed.net/credmgr/certs From a0d003248948783b47f7c851ebaeb52fa591b754 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 14:16:44 -0500 Subject: [PATCH 26/29] add comments --- .../policy/broker_simpler_units_policy.py | 7 +-- fabric_cf/actor/core/util/quota_mgr.py | 51 ++++++++++++------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py index eca413d7..9ffd0cb5 100644 --- a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py +++ b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py @@ -1130,7 +1130,7 @@ def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryF from fabric_cf.actor.core.container.globals import GlobalsSingleton if GlobalsSingleton.get().get_quota_mgr(): - GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation, term=term) + GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation, duration=duration) self.logger.debug(f"Ticket Inventory returning: True {error_msg}") return True, node_id_to_reservations, error_msg @@ -1221,11 +1221,12 @@ def issue_ticket(self, *, reservation: ABCBrokerReservation, units: int, rtype: return reservation def release(self, *, reservation): - if reservation.get_term().get_remaining_length() > 0: + duration = reservation.get_term().get_remaining_length() + if duration > 0: from fabric_cf.actor.core.container.globals import GlobalsSingleton if GlobalsSingleton.get().get_quota_mgr(): GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation, - term=reservation.get_term()) + duration=duration) if isinstance(reservation, ABCBrokerReservation): self.logger.debug("Broker reservation") diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index 87d2713d..ee524de0 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -26,8 +26,6 @@ import logging from typing import Any -from fabric_cf.actor.core.time.term import Term -from fabric_mb.message_bus.messages.lease_reservation_avro import LeaseReservationAvro from fabrictestbed.external_api.core_api import CoreApi from fabrictestbed.slice_editor import InstanceCatalog from fim.slivers.network_node import NodeSliver @@ -37,18 +35,42 @@ class QuotaMgr: + """ + Manages resource quotas for projects, including listing, updating, and enforcing limits. + """ def __init__(self, *, core_api_host: str, token: str, logger: logging.Logger): + """ + Initialize the Quota Manager. + + @param core_api_host: The API host for core services. + @param token: Authentication token for API access. + @param logger: Logger instance for logging messages. + """ self.core_api = CoreApi(core_api_host=core_api_host, token=token) self.logger = logger def list_quotas(self, project_uuid: str, offset: int = 0, limit: int = 200) -> dict[tuple[str, str], dict]: + """ + Retrieve the list of quotas for a given project. + + @param project_uuid: Unique identifier for the project. + @param offset: Pagination offset for results (default: 0). + @param limit: Maximum number of quotas to fetch (default: 200). + @return: A dictionary mapping resource type/unit pairs to their quota details. + """ quota_list = self.core_api.list_quotas(project_uuid=project_uuid, offset=offset, limit=limit) quotas = {} for q in quota_list: quotas[(q.get("resource_type").lower(), q.get("resource_unit").lower())] = q return quotas - def update_quota(self, reservation: ABCReservationMixin, term: Term): + def update_quota(self, reservation: ABCReservationMixin, duration: float): + """ + Update the quota usage based on a reservation. + + @param reservation: Reservation object containing resource usage details. + @param duration: Duration in seconds for which the reservation was held. + """ try: slice_object = reservation.get_slice() if not slice_object: @@ -70,11 +92,6 @@ def update_quota(self, reservation: ABCReservationMixin, term: Term): if not sliver: return - if reservation.is_closed() or reservation.is_closing(): - duration = term.get_remaining_length() - else: - duration = term.get_length() - if duration < 60: return @@ -114,11 +131,11 @@ def update_quota(self, reservation: ABCReservationMixin, term: Term): @staticmethod def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str, str], float]: """ - Extract quota usage from a sliver + Extract resource usage details from a given sliver. - @param sliver: The sliver object from which resources are extracted. - @param duration: Number of hours the resources are requested for. - @return: A dictionary of resource type/unit tuples to requested amounts. + @param sliver: The sliver object representing allocated resources. + @param duration: Duration in hours for which resources are requested. + @return: A dictionary mapping resource type/unit pairs to usage amounts. """ unit = "HOURS".lower() requested_resources = {} @@ -155,12 +172,12 @@ def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str, def enforce_quota_limits(self, reservation: ABCReservationMixin, duration: float) -> tuple[bool, Any]: """ - Check if the requested resources for multiple reservations are within the project's quota limits. + Verify whether a reservation's requested resources fit within the project's quota limits. - @param reservation: Reservation. - @param duration: Number of hours the reservations are requested for. - @return: Tuple (True, None) if resources are within quota, or (False, message) if denied. - @throws: Exception if there is an error during the database interaction. + @param reservation: The reservation to check against available quotas. + @param duration: Duration in hours for the reservation request. + @return: Tuple (True, None) if within limits, (False, message) if quota is exceeded. + @throws: Exception if an error occurs during database interaction. """ try: slice_object = reservation.get_slice() From 68b17940804de17c0f2ad17d88ca7a1a47d92e2b Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 14:35:29 -0500 Subject: [PATCH 27/29] enforce quota to pick sliver from requested resources for new requests --- fabric_cf/actor/core/util/quota_mgr.py | 4 ++++ fabric_cf/broker/test/test.yaml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index ee524de0..db5d04db 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -188,6 +188,10 @@ def enforce_quota_limits(self, reservation: ABCReservationMixin, duration: float return False, None sliver = InventoryForType.get_allocated_sliver(reservation=reservation) + + if not sliver and reservation.is_ticketing() and reservation.get_requested_resources(): + sliver = reservation.get_requested_resources().get_sliver() + if not sliver: self.logger.info("No sliver found!") return False, None diff --git a/fabric_cf/broker/test/test.yaml b/fabric_cf/broker/test/test.yaml index 6de9624c..db331edb 100644 --- a/fabric_cf/broker/test/test.yaml +++ b/fabric_cf/broker/test/test.yaml @@ -74,7 +74,7 @@ logging: core_api: host: https://alpha-6.fabric-testbed.net - token: + token: enable: True oauth: From 41b5bc9ac559ff497fcfd018e96959109b18e8b7 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 15:13:22 -0500 Subject: [PATCH 28/29] handle renew with quotas --- .../actor/core/policy/broker_simpler_units_policy.py | 2 +- fabric_cf/actor/core/util/quota_mgr.py | 11 ++++------- fabric_cf/broker/test/test.yaml | 2 +- pyproject.toml | 2 +- 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py index 9ffd0cb5..e157a055 100644 --- a/fabric_cf/actor/core/policy/broker_simpler_units_policy.py +++ b/fabric_cf/actor/core/policy/broker_simpler_units_policy.py @@ -1075,7 +1075,7 @@ def ticket_inventory(self, *, reservation: ABCBrokerReservation, inv: InventoryF try: if operation == ReservationOperation.Extend: rset = reservation.get_resources() - duration = term.get_length() + duration = Term.delta(reservation.get_term().get_end_time(), term.get_end_time()) else: rset = reservation.get_requested_resources() duration = term.get_full_length() diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index db5d04db..3e49a247 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -95,9 +95,7 @@ def update_quota(self, reservation: ABCReservationMixin, duration: float): if duration < 60: return - duration /= 3600000 existing_quotas = self.list_quotas(project_uuid=project_id) - sliver_quota_usage = self.extract_quota_usage(sliver=sliver, duration=duration) self.logger.debug(f"Existing: {existing_quotas}") @@ -108,11 +106,8 @@ def update_quota(self, reservation: ABCReservationMixin, duration: float): existing = existing_quotas.get(quota_key) usage = 0 self.logger.debug(f"Quota update requested for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") - print( - f"Quota update requested for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") if not existing: self.logger.debug("Existing not found so skipping!") - print("Existing not found so skipping!") continue # Return resource hours for a sliver deleted before expiry @@ -134,12 +129,14 @@ def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str, Extract resource usage details from a given sliver. @param sliver: The sliver object representing allocated resources. - @param duration: Duration in hours for which resources are requested. + @param duration: Duration in ms for which resources are requested. @return: A dictionary mapping resource type/unit pairs to usage amounts. """ unit = "HOURS".lower() requested_resources = {} + duration /= 3600000 + # Check if the sliver is a NodeSliver if not isinstance(sliver, NodeSliver): return requested_resources @@ -175,7 +172,7 @@ def enforce_quota_limits(self, reservation: ABCReservationMixin, duration: float Verify whether a reservation's requested resources fit within the project's quota limits. @param reservation: The reservation to check against available quotas. - @param duration: Duration in hours for the reservation request. + @param duration: Duration in ms for the reservation request. @return: Tuple (True, None) if within limits, (False, message) if quota is exceeded. @throws: Exception if an error occurs during database interaction. """ diff --git a/fabric_cf/broker/test/test.yaml b/fabric_cf/broker/test/test.yaml index db331edb..6de9624c 100644 --- a/fabric_cf/broker/test/test.yaml +++ b/fabric_cf/broker/test/test.yaml @@ -74,7 +74,7 @@ logging: core_api: host: https://alpha-6.fabric-testbed.net - token: + token: enable: True oauth: diff --git a/pyproject.toml b/pyproject.toml index c699538f..9ad62b1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ dependencies = [ "fabric_fss_utils==1.6.0", "fabric-message-bus==1.7.0", "fabric-fim==1.8.1", - "fabrictestbed==1.8.2b1", + "fabrictestbed==1.8.2", "ansible" ] From 375bcb417d93e1d812b69ab18574347b58384f81 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Thu, 13 Feb 2025 15:21:28 -0500 Subject: [PATCH 29/29] change type for limit and used to float --- fabric_cf/actor/core/util/quota_mgr.py | 83 ++++++++++++-------------- 1 file changed, 38 insertions(+), 45 deletions(-) diff --git a/fabric_cf/actor/core/util/quota_mgr.py b/fabric_cf/actor/core/util/quota_mgr.py index 3e49a247..da2982d4 100644 --- a/fabric_cf/actor/core/util/quota_mgr.py +++ b/fabric_cf/actor/core/util/quota_mgr.py @@ -24,6 +24,7 @@ # # Author: Komal Thareja (kthare10@renci.org) import logging +import threading from typing import Any from fabrictestbed.external_api.core_api import CoreApi @@ -48,6 +49,7 @@ def __init__(self, *, core_api_host: str, token: str, logger: logging.Logger): """ self.core_api = CoreApi(core_api_host=core_api_host, token=token) self.logger = logger + self.lock = threading.Lock() def list_quotas(self, project_uuid: str, offset: int = 0, limit: int = 200) -> dict[tuple[str, str], dict]: """ @@ -66,62 +68,53 @@ def list_quotas(self, project_uuid: str, offset: int = 0, limit: int = 200) -> d def update_quota(self, reservation: ABCReservationMixin, duration: float): """ - Update the quota usage based on a reservation. + Update the quota usage for a given reservation. @param reservation: Reservation object containing resource usage details. - @param duration: Duration in seconds for which the reservation was held. + @param duration: Duration in milliseconds for which the reservation was held. """ try: - slice_object = reservation.get_slice() - if not slice_object: - return - project_id = slice_object.get_project_id() - if not project_id: - return + with self.lock: # Locking critical section + self.logger.debug("Acquired lock for quota update.") - sliver = InventoryForType.get_allocated_sliver(reservation=reservation) - ''' - from fabric_cf.actor.core.kernel.reservation_client import ReservationClient - if isinstance(reservation, ReservationClient) and reservation.get_leased_resources() and \ - reservation.get_leased_resources().get_sliver(): - sliver = reservation.get_leased_resources().get_sliver() - if not sliver and reservation.get_resources() and reservation.get_resources().get_sliver(): - sliver = reservation.get_resources().get_sliver() - ''' + slice_object = reservation.get_slice() + if not slice_object: + return + project_id = slice_object.get_project_id() + if not project_id: + return - if not sliver: - return + sliver = InventoryForType.get_allocated_sliver(reservation=reservation) + if not sliver: + return - if duration < 60: - return + if duration < 60: + return - existing_quotas = self.list_quotas(project_uuid=project_id) - sliver_quota_usage = self.extract_quota_usage(sliver=sliver, duration=duration) + existing_quotas = self.list_quotas(project_uuid=project_id) - self.logger.debug(f"Existing: {existing_quotas}") - self.logger.debug(f"Updated by: {sliver_quota_usage}") + sliver_quota_usage = self.extract_quota_usage(sliver=sliver, duration=duration) + + self.logger.debug(f"Existing: {existing_quotas}") + self.logger.debug(f"Updated by: {sliver_quota_usage}") + + # Check each accumulated resource usage against its quota + for quota_key, total_duration in sliver_quota_usage.items(): + existing = existing_quotas.get(quota_key) + if not existing: + self.logger.debug("Existing not found, skipping!") + continue + + # Return resource hours for a sliver deleted before expiry + usage = -total_duration if reservation.is_closing() or reservation.is_closed() else total_duration + + self.core_api.update_quota_usage(uuid=existing.get("uuid"), project_uuid=project_id, + quota_used=usage) - # Check each accumulated resource usage against its quota - for quota_key, total_duration in sliver_quota_usage.items(): - existing = existing_quotas.get(quota_key) - usage = 0 - self.logger.debug(f"Quota update requested for: prj:{project_id} quota_key:{quota_key}: quota: {existing}") - if not existing: - self.logger.debug("Existing not found so skipping!") - continue - - # Return resource hours for a sliver deleted before expiry - if reservation.is_closing() or reservation.is_closed(): - usage -= total_duration - # Account for resource hours used for a new or extended sliver - else: - usage += total_duration - - self.core_api.update_quota_usage(uuid=existing.get("uuid"), project_uuid=project_id, quota_used=usage) except Exception as e: self.logger.error(f"Failed to update Quota: {e}") finally: - self.logger.debug("done") + self.logger.debug("Released lock for quota update.") @staticmethod def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str, str], float]: @@ -129,7 +122,7 @@ def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str, Extract resource usage details from a given sliver. @param sliver: The sliver object representing allocated resources. - @param duration: Duration in ms for which resources are requested. + @param duration: Duration in milliseconds for which resources are requested. @return: A dictionary mapping resource type/unit pairs to usage amounts. """ unit = "HOURS".lower() @@ -172,7 +165,7 @@ def enforce_quota_limits(self, reservation: ABCReservationMixin, duration: float Verify whether a reservation's requested resources fit within the project's quota limits. @param reservation: The reservation to check against available quotas. - @param duration: Duration in ms for the reservation request. + @param duration: Duration in milliseconds for the reservation request. @return: Tuple (True, None) if within limits, (False, message) if quota is exceeded. @throws: Exception if an error occurs during database interaction. """