Skip to content

400 broker vm allocation policy #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fabric_cf/actor/core/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ class Constants:

USER_SSH_KEY = "user.ssh.key"
ALGORITHM = 'algorithm'
CORE_CAPACITY_THRESHOLD = "core_capacity_threshold"

# Orchestrator Lease params
TWO_WEEKS = timedelta(days=15)
Expand Down
3 changes: 2 additions & 1 deletion fabric_cf/actor/core/kernel/reservation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,8 @@ def probe_join_state(self):

# This is a regular request for modifying network resources to an upstream broker.
self.sequence_ticket_out += 1
print(f"Issuing an extend ticket {sliver_to_str(sliver=self.get_requested_resources().get_sliver())}")
self.logger.debug(f"Issuing an extend ticket "
f"{sliver_to_str(sliver=self.get_requested_resources().get_sliver())}")
RPCManagerSingleton.get().extend_ticket(reservation=self)

# Update ASM with Reservation Info
Expand Down
95 changes: 93 additions & 2 deletions fabric_cf/actor/core/policy/broker_simpler_units_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from fim.pluggable import PluggableRegistry, PluggableType
from fim.slivers.attached_components import ComponentSliver, ComponentType
from fim.slivers.base_sliver import BaseSliver
from fim.slivers.capacities_labels import Labels
from fim.slivers.capacities_labels import Labels, Capacities
from fim.slivers.interface_info import InterfaceSliver, InterfaceType
from fim.slivers.network_node import NodeSliver, NodeType
from fim.slivers.network_service import NetworkServiceSliver, ServiceType, NSLayer
Expand Down Expand Up @@ -516,7 +516,6 @@ def __candidate_nodes(self, *, sliver: NodeSliver) -> List[str]:
props=node_props,
comps=sliver.attached_components_info)

# Skip nodes without any delegations which would be data-switch in this case
if sliver.get_type() == NodeType.Switch:
exclude = []
for n in result:
Expand Down Expand Up @@ -558,6 +557,45 @@ def __prune_nodes_in_maintenance(self, node_id_list: List[str], site: str, reser

return node_id_list

def __reshuffle_nodes(self, node_id_list: List[str], node_id_to_reservations: dict,
term: Term) -> List[str]:
"""
Reshuffles nodes based on their usage compared to a given threshold.

@param: node_id_list (list): List of node_ids

@return: list: Reshuffled list of nodes, with nodes exceeding the threshold shuffled separately.
"""
if len(node_id_list) == 1:
return node_id_list

enabled, threshold = self.get_core_capacity_threshold()
if not enabled:
return node_id_list

# Separate nodes based on whether their usage exceeds the threshold
above_threshold = []
below_threshold = []

for node_id in node_id_list:
node, total, allocated = self.get_node_capacities(node_id=node_id,
node_id_to_reservations=node_id_to_reservations,
term=term)
if total and allocated:
self.logger.debug(f"Allocated: {allocated} Total: {total}")
cpu_usage_percent = int(((allocated.core * 100)/ total.core))
self.logger.debug(f"CPU Usage for {node.get_name()}: {cpu_usage_percent}; "
f"threshold: {threshold}")
if cpu_usage_percent < threshold:
below_threshold.append(node_id)
else:
above_threshold.append(node_id)

# Combine both shuffled lists (you can choose the order of combining)
reshuffled_nodes = below_threshold + above_threshold

return reshuffled_nodes

def __find_first_fit(self, node_id_list: List[str], node_id_to_reservations: dict, inv: NetworkNodeInventory,
reservation: ABCBrokerReservation, term: Term, sliver: NodeSliver,
operation: ReservationOperation = ReservationOperation.Create) -> Tuple[str, BaseSliver, Any]:
Expand Down Expand Up @@ -634,6 +672,13 @@ def __allocate_nodes(self, *, reservation: ABCBrokerReservation, inv: NetworkNod
node_id_list = self.__candidate_nodes(sliver=sliver)
if self.get_algorithm_type(site=sliver.site) == BrokerAllocationAlgorithm.Random:
random.shuffle(node_id_list)
else:
# Reshuffle Nodes based on CPU Threshold only for VMs when no specific host is specified
if sliver.get_type() == NodeType.VM and (sliver.labels is None or
(sliver.labels and sliver.labels.instance_parent is None)):
node_id_list = self.__reshuffle_nodes(node_id_list=node_id_list,
node_id_to_reservations=node_id_to_reservations,
term=term)

if len(node_id_list) == 0 and sliver.site not in self.combined_broker_model.get_sites():
error_msg = f'Unknown site {sliver.site} requested for {reservation}'
Expand Down Expand Up @@ -1685,6 +1730,52 @@ def get_algorithm_type(self, site: str) -> BrokerAllocationAlgorithm:
return BrokerAllocationAlgorithm.FirstFit
return BrokerAllocationAlgorithm.FirstFit

def get_core_capacity_threshold(self) -> Tuple[bool, int]:
if self.properties is not None:
core_capacity_threshold = self.properties.get(Constants.CORE_CAPACITY_THRESHOLD, None)
if core_capacity_threshold and core_capacity_threshold.get('enabled'):
core_usage_threshold_percent = core_capacity_threshold.get('core_usage_threshold_percent', 75)
return True, core_usage_threshold_percent
return False, 0

def get_node_capacities(self, node_id: str, node_id_to_reservations: dict,
term: Term) -> Tuple[NodeSliver, Capacities, Capacities]:
"""
Get Node capacities - total as well as allocated capacities
@param node_id: Node Id
@param node_id_to_reservations: Reservations assigned as part of this bid
@param term: Term
@return: Tuple containing node, total and allocated capacity
"""
try:
graph_node = self.get_network_node_from_graph(node_id=node_id)
existing_reservations = self.get_existing_reservations(node_id=node_id,
node_id_to_reservations=node_id_to_reservations,
start=term.get_start_time(),
end=term.get_end_time())

delegation_id, delegated_capacity = NetworkNodeInventory.get_delegations(
lab_cap_delegations=graph_node.get_capacity_delegations())

allocated_capacity = Capacities()

if existing_reservations:
for reservation in existing_reservations:
# For Active or Ticketed or Ticketing reservations; reduce the counts from available
resource_sliver = None
if reservation.is_ticketing() and reservation.get_approved_resources() is not None:
resource_sliver = reservation.get_approved_resources().get_sliver()

if (reservation.is_active() or reservation.is_ticketed()) and \
reservation.get_resources() is not None:
resource_sliver = reservation.get_resources().get_sliver()

if resource_sliver is not None and isinstance(resource_sliver, NodeSliver):
allocated_capacity += resource_sliver.get_capacity_allocations()

return graph_node, delegated_capacity, allocated_capacity
except Exception as e:
self.logger.error(f"Failed to determine node capacities: {node_id}, error: {e}")

if __name__ == '__main__':
policy = BrokerSimplerUnitsPolicy()
1 change: 0 additions & 1 deletion fabric_cf/actor/core/policy/network_node_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent

for c in comps_to_remove:
self.logger.debug(f"Excluding component: {c.get_name()}")
print(f"Excluding component: {c.get_name()}")
graph_node.attached_components_info.remove_device(name=c.get_name())

self.logger.debug(f"requested_components: {requested_components.devices.values()} for reservation# {rid}")
Expand Down
3 changes: 3 additions & 0 deletions fabric_cf/broker/config.broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ actor:
module: fabric_cf.actor.core.policy.broker_simpler_units_policy
class: BrokerSimplerUnitsPolicy
properties:
core_capacity_threshold:
enabled: true
core_usage_threshold_percent: 75
algorithm:
FirstFit: # Default policy for all sites
enabled: true
Expand Down