Skip to content

Commit 5df6bc8

Browse files
authored
Merge pull request #401 from fabric-testbed/400-broker-vm-allocation-policy
400 broker vm allocation policy
2 parents 72466ab + d8546e7 commit 5df6bc8

File tree

5 files changed

+99
-4
lines changed

5 files changed

+99
-4
lines changed

fabric_cf/actor/core/common/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ class Constants:
304304

305305
USER_SSH_KEY = "user.ssh.key"
306306
ALGORITHM = 'algorithm'
307+
CORE_CAPACITY_THRESHOLD = "core_capacity_threshold"
307308

308309
# Orchestrator Lease params
309310
TWO_WEEKS = timedelta(days=15)

fabric_cf/actor/core/kernel/reservation_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1004,7 +1004,8 @@ def probe_join_state(self):
10041004

10051005
# This is a regular request for modifying network resources to an upstream broker.
10061006
self.sequence_ticket_out += 1
1007-
print(f"Issuing an extend ticket {sliver_to_str(sliver=self.get_requested_resources().get_sliver())}")
1007+
self.logger.debug(f"Issuing an extend ticket "
1008+
f"{sliver_to_str(sliver=self.get_requested_resources().get_sliver())}")
10081009
RPCManagerSingleton.get().extend_ticket(reservation=self)
10091010

10101011
# Update ASM with Reservation Info

fabric_cf/actor/core/policy/broker_simpler_units_policy.py

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from fim.pluggable import PluggableRegistry, PluggableType
3939
from fim.slivers.attached_components import ComponentSliver, ComponentType
4040
from fim.slivers.base_sliver import BaseSliver
41-
from fim.slivers.capacities_labels import Labels
41+
from fim.slivers.capacities_labels import Labels, Capacities
4242
from fim.slivers.interface_info import InterfaceSliver, InterfaceType
4343
from fim.slivers.network_node import NodeSliver, NodeType
4444
from fim.slivers.network_service import NetworkServiceSliver, ServiceType, NSLayer
@@ -516,7 +516,6 @@ def __candidate_nodes(self, *, sliver: NodeSliver) -> List[str]:
516516
props=node_props,
517517
comps=sliver.attached_components_info)
518518

519-
# Skip nodes without any delegations which would be data-switch in this case
520519
if sliver.get_type() == NodeType.Switch:
521520
exclude = []
522521
for n in result:
@@ -558,6 +557,45 @@ def __prune_nodes_in_maintenance(self, node_id_list: List[str], site: str, reser
558557

559558
return node_id_list
560559

560+
def __reshuffle_nodes(self, node_id_list: List[str], node_id_to_reservations: dict,
561+
term: Term) -> List[str]:
562+
"""
563+
Reshuffles nodes based on their usage compared to a given threshold.
564+
565+
@param: node_id_list (list): List of node_ids
566+
567+
@return: list: Reshuffled list of nodes, with nodes exceeding the threshold shuffled separately.
568+
"""
569+
if len(node_id_list) == 1:
570+
return node_id_list
571+
572+
enabled, threshold = self.get_core_capacity_threshold()
573+
if not enabled:
574+
return node_id_list
575+
576+
# Separate nodes based on whether their usage exceeds the threshold
577+
above_threshold = []
578+
below_threshold = []
579+
580+
for node_id in node_id_list:
581+
node, total, allocated = self.get_node_capacities(node_id=node_id,
582+
node_id_to_reservations=node_id_to_reservations,
583+
term=term)
584+
if total and allocated:
585+
self.logger.debug(f"Allocated: {allocated} Total: {total}")
586+
cpu_usage_percent = int(((allocated.core * 100)/ total.core))
587+
self.logger.debug(f"CPU Usage for {node.get_name()}: {cpu_usage_percent}; "
588+
f"threshold: {threshold}")
589+
if cpu_usage_percent < threshold:
590+
below_threshold.append(node_id)
591+
else:
592+
above_threshold.append(node_id)
593+
594+
# Combine both shuffled lists (you can choose the order of combining)
595+
reshuffled_nodes = below_threshold + above_threshold
596+
597+
return reshuffled_nodes
598+
561599
def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dict, inv: NetworkNodeInventory,
562600
reservation: ABCBrokerReservation, term: Term, sliver: NodeSliver,
563601
operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver, Any]:
@@ -634,6 +672,13 @@ def __allocate_nodes(self, *, reservation: ABCBrokerReservation, inv: NetworkNod
634672
node_id_list = self.__candidate_nodes(sliver=sliver)
635673
if self.get_algorithm_type(site=sliver.site) == BrokerAllocationAlgorithm.Random:
636674
random.shuffle(node_id_list)
675+
else:
676+
# Reshuffle Nodes based on CPU Threshold only for VMs when no specific host is specified
677+
if sliver.get_type() == NodeType.VM and (sliver.labels is None or
678+
(sliver.labels and sliver.labels.instance_parent is None)):
679+
node_id_list = self.__reshuffle_nodes(node_id_list=node_id_list,
680+
node_id_to_reservations=node_id_to_reservations,
681+
term=term)
637682

638683
if len(node_id_list) == 0 and sliver.site not in self.combined_broker_model.get_sites():
639684
error_msg = f'Unknown site {sliver.site} requested for {reservation}'
@@ -1685,6 +1730,52 @@ def get_algorithm_type(self, site: str) -> BrokerAllocationAlgorithm:
16851730
return BrokerAllocationAlgorithm.FirstFit
16861731
return BrokerAllocationAlgorithm.FirstFit
16871732

1733+
def get_core_capacity_threshold(self) -> Tuple[bool, int]:
1734+
if self.properties is not None:
1735+
core_capacity_threshold = self.properties.get(Constants.CORE_CAPACITY_THRESHOLD, None)
1736+
if core_capacity_threshold and core_capacity_threshold.get('enabled'):
1737+
core_usage_threshold_percent = core_capacity_threshold.get('core_usage_threshold_percent', 75)
1738+
return True, core_usage_threshold_percent
1739+
return False, 0
1740+
1741+
def get_node_capacities(self, node_id: str, node_id_to_reservations: dict,
1742+
term: Term) -> Tuple[NodeSliver, Capacities, Capacities]:
1743+
"""
1744+
Get Node capacities - total as well as allocated capacities
1745+
@param node_id: Node Id
1746+
@param node_id_to_reservations: Reservations assigned as part of this bid
1747+
@param term: Term
1748+
@return: Tuple containing node, total and allocated capacity
1749+
"""
1750+
try:
1751+
graph_node = self.get_network_node_from_graph(node_id=node_id)
1752+
existing_reservations = self.get_existing_reservations(node_id=node_id,
1753+
node_id_to_reservations=node_id_to_reservations,
1754+
start=term.get_start_time(),
1755+
end=term.get_end_time())
1756+
1757+
delegation_id, delegated_capacity = NetworkNodeInventory.get_delegations(
1758+
lab_cap_delegations=graph_node.get_capacity_delegations())
1759+
1760+
allocated_capacity = Capacities()
1761+
1762+
if existing_reservations:
1763+
for reservation in existing_reservations:
1764+
# For Active or Ticketed or Ticketing reservations; reduce the counts from available
1765+
resource_sliver = None
1766+
if reservation.is_ticketing() and reservation.get_approved_resources() is not None:
1767+
resource_sliver = reservation.get_approved_resources().get_sliver()
1768+
1769+
if (reservation.is_active() or reservation.is_ticketed()) and \
1770+
reservation.get_resources() is not None:
1771+
resource_sliver = reservation.get_resources().get_sliver()
1772+
1773+
if resource_sliver is not None and isinstance(resource_sliver, NodeSliver):
1774+
allocated_capacity += resource_sliver.get_capacity_allocations()
1775+
1776+
return graph_node, delegated_capacity, allocated_capacity
1777+
except Exception as e:
1778+
self.logger.error(f"Failed to determine node capacities: {node_id}, error: {e}")
16881779

16891780
if __name__ == '__main__':
16901781
policy = BrokerSimplerUnitsPolicy()

fabric_cf/actor/core/policy/network_node_inventory.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,6 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent
445445

446446
for c in comps_to_remove:
447447
self.logger.debug(f"Excluding component: {c.get_name()}")
448-
print(f"Excluding component: {c.get_name()}")
449448
graph_node.attached_components_info.remove_device(name=c.get_name())
450449

451450
self.logger.debug(f"requested_components: {requested_components.devices.values()} for reservation# {rid}")

fabric_cf/broker/config.broker.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ actor:
139139
module: fabric_cf.actor.core.policy.broker_simpler_units_policy
140140
class: BrokerSimplerUnitsPolicy
141141
properties:
142+
core_capacity_threshold:
143+
enabled: true
144+
core_usage_threshold_percent: 75
142145
algorithm:
143146
FirstFit: # Default policy for all sites
144147
enabled: true

0 commit comments

Comments
 (0)