Skip to content

Renew slice with network services when VMs connected to network services cannot be extended #410

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions fabric_cf/actor/core/kernel/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ def modify_lease(self, *, reservation: ABCReservationMixin):
finally:
reservation.unlock()

def extend_reservation(self, *, rid: ID, resources: ResourceSet, term: Term, dependencies: List[ABCReservationMixin]) -> int:
def extend_reservation(self, *, rid: ID, resources: ResourceSet, term: Term,
dependencies: List[ABCReservationMixin]) -> int:
"""
Extends the reservation with the given resources and term.
@param rid reservation identifier of reservation to extend
Expand All @@ -365,7 +366,9 @@ def extend_reservation(self, *, rid: ID, resources: ResourceSet, term: Term, dep
raise KernelException(Constants.PENDING_OPERATION_ERROR)

if isinstance(real, ReservationClient) and dependencies is not None:
real.redeem_predecessors.clear()
# Modify case - clear old dependencies
if resources.get_sliver() is not None:
real.redeem_predecessors.clear()
for d in dependencies:
real.add_redeem_predecessor(reservation=d)

Expand Down
122 changes: 94 additions & 28 deletions fabric_cf/actor/core/kernel/reservation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ def prepare_ticket(self, extend: bool = False):
parent_res = pred_state.get_reservation()

if Constants.PEERED in value1:
self.logger.debug(f"KOMAL --- Node MAP:{ifs.get_node_map()} Result: {result}")
self.logger.debug(f"Node MAP:{ifs.get_node_map()} Result: {result}")
if parent_res is not None:
ns_sliver = parent_res.get_resources().get_sliver()
# component_name contains => Peered:<peered ns id>:<peer ifs name>
Expand Down Expand Up @@ -553,6 +553,56 @@ def prepare_ticket(self, extend: bool = False):

self.logger.trace(f"Updated Network Res# {self.get_reservation_id()} {sliver}")

def approve_extend_ticket(self) -> Tuple[bool, bool]:
"""
ExtendTicket predicate: invoked internally to determine if the reservation
should be extended. This gives subclasses an opportunity sequence actions at the orchestrator side.

If false, the reservation enters a "BlockedTicket" sub-state until a subsequent approve_ticket returns true.
When true, the reservation can manipulate the current reservation's attributes to
facilitate ticketing from the broker. Note that approve_ticket may be polled multiple
times, and should be idempotent.

@return tuple (true if approved; false otherwise, list of failed predecessors in case of false)
"""
approved = True
rollback = False
failed_preds = []
for pred_state in self.redeem_predecessors.values():
pred_reservation = pred_state.get_reservation()
if pred_reservation is None:
self.logger.error(f"redeem predecessor reservation is null, ignoring it: {pred_reservation}")
continue

pred_reservation_term = pred_reservation.get_term()
if pred_reservation_term is None:
self.logger.error(f"redeem predecessor reservation term is null, ignoring it: {pred_reservation}")
continue

if pred_reservation.is_failed() or pred_reservation.is_closed() or pred_reservation.is_closing():
msg = f"redeem predecessor reservation# {pred_reservation.get_reservation_id()}"\
f" is in a terminal state, failing the reservation# {self.get_reservation_id()}"
self.logger.error(msg)

# In case of modify, roll back to the previous state and do not fail the reservation
if self.is_active() or self.is_active_joined() or self.is_active_ticketed():
failed_preds.append(str(pred_reservation.get_reservation_id()))
approved = False
break
else:
self.fail(message=msg)

if not (pred_reservation.is_ticketed() or pred_reservation.is_active()) or \
pred_reservation.is_extending_ticket():
approved = False
break

if self.get_approved_term().ends_after(date=pred_reservation_term.get_end_time()):
rollback = True
break

return approved, rollback

def approve_ticket(self, extend: bool = False) -> Tuple[bool, List[str]]:
"""
Ticket predicate: invoked internally to determine if the reservation
Expand Down Expand Up @@ -594,6 +644,17 @@ def approve_ticket(self, extend: bool = False) -> Tuple[bool, List[str]]:

return approved, failed_preds

def can_extend(self) -> bool:
ret_val = False
if self.get_type() is not None:
resource_type_str = str(self.get_type())
if resource_type_str in Constants.SUPPORTED_SERVICES_STR:
ret_val, rollback = self.approve_extend_ticket()
else:
ret_val = True

return ret_val

def can_ticket(self, extend: bool = False) -> bool:
ret_val = False
if self.get_type() is not None:
Expand Down Expand Up @@ -754,12 +815,19 @@ def extend_ticket(self, *, actor: ABCActorMixin):
self.error(err="Wrong state to initiate extend ticket: {}".format(ReservationStates(self.state).name))

# Extend Ticket is invoked by Probe; Check dependencies only in case of modify
if self.requested_resources.sliver is not None:
if not self.can_ticket(extend=True):
self.transition_with_join(prefix="Extend ticket blocked", state=self.state,
pending=self.pending_state, join_state=JoinState.BlockedExtendTicket)
self.logger.info("Reservation has to wait for the dependencies to be extended!")
return
# No new sliver is passed for renew and does not require dependency check
if self.requested_resources.sliver is not None and not self.can_ticket(extend=True):
self.transition_with_join(prefix="Extend ticket blocked", state=self.state,
pending=self.pending_state, join_state=JoinState.BlockedExtendTicket)
self.logger.info("Reservation has to wait for the dependencies to be extended!")
return
else:
if not self.can_extend():
self.transition_with_join(prefix="Extend ticket blocked", state=self.state,
pending=self.pending_state, join_state=JoinState.BlockedExtendTicket)
self.logger.info("Reservation has to wait for the dependencies to be extended!")
return

self.sequence_ticket_out += 1
RPCManagerSingleton.get().extend_ticket(reservation=self)
Expand Down Expand Up @@ -951,18 +1019,6 @@ def prepare_probe(self):
def prepare_redeem(self):
assert self.resources is not None
assert self.resources.sliver is not None
sliver = self.resources.sliver

'''
self.logger.info(f"Redeem prepared for Sliver: {sliver}")
if isinstance(sliver, NetworkServiceSliver) and sliver.interface_info is not None:
for ifs in sliver.interface_info.interfaces.values():
self.logger.info(f"Interface Sliver: {ifs}")

if isinstance(sliver, NodeSliver) and sliver.attached_components_info is not None:
for c in sliver.attached_components_info.devices.values():
self.logger.info(f"Component: {c}")
'''

def probe_join_state(self):
"""
Expand Down Expand Up @@ -996,17 +1052,27 @@ def probe_join_state(self):
# this is existing reservation, and the extend ticket is
# blocked for a predecessor: see if we can get it going now.
assert self.state == ReservationStates.Active
rollback = False
failed_preds = []

if self.requested_resources.sliver is not None:
status, failed_preds = self.approve_ticket(extend=True)
else:
status, rollback = self.approve_extend_ticket()

status, failed_preds = self.approve_ticket(extend=True)
if status:
self.transition_with_join(prefix="unblock ticket", state=self.state,
pending=self.pending_state, join_state=JoinState.NoJoin)
if not rollback:
self.transition_with_join(prefix="unblock ticket", state=self.state,
pending=self.pending_state, join_state=JoinState.NoJoin)

# This is a regular request for modifying network resources to an upstream broker.
self.sequence_ticket_out += 1
self.logger.debug(f"Issuing an extend ticket "
f"{sliver_to_str(sliver=self.get_requested_resources().get_sliver())}")
RPCManagerSingleton.get().extend_ticket(reservation=self)
# This is a regular request for modifying network resources to an upstream broker.
self.sequence_ticket_out += 1
self.logger.debug(f"Issuing an extend ticket "
f"{sliver_to_str(sliver=self.get_requested_resources().get_sliver())}")
RPCManagerSingleton.get().extend_ticket(reservation=self)
else:
self.transition_with_join(prefix="failed ticket update", state=self.state,
pending=ReservationPendingStates.None_, join_state=JoinState.NoJoin)

# Update ASM with Reservation Info
self.update_slice_graph(sliver=self.resources.sliver)
Expand Down Expand Up @@ -1555,7 +1621,7 @@ def validate_redeem(self):
if self.term is None:
self.error(err=Constants.NOT_SPECIFIED_PREFIX.format("term"))

def add_redeem_predecessor(self, *, reservation: ABCReservationMixin, filters: dict = None):
def add_redeem_predecessor(self, *, reservation: ABCReservationMixin, filters: dict = None, extend: bool = False):
if reservation.get_reservation_id() not in self.redeem_predecessors:
state = PredecessorState(reservation=reservation, filters=filters)
self.redeem_predecessors[reservation.get_reservation_id()] = state
Expand All @@ -1565,7 +1631,7 @@ def remove_redeem_predecessor(self, *, rid: ID):
self.redeem_predecessors.pop(rid)

def add_join_predecessor(self, *, predecessor: ABCReservationMixin, filters: dict = None):
if predecessor.get_reservation_id() not in self.redeem_predecessors:
if predecessor.get_reservation_id() not in self.join_predecessors:
state = PredecessorState(reservation=predecessor, filters=filters)
self.join_predecessors[predecessor.get_reservation_id()] = state

Expand Down
13 changes: 7 additions & 6 deletions fabric_cf/actor/core/policy/broker_simpler_units_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1221,12 +1221,13 @@ def issue_ticket(self, *, reservation: ABCBrokerReservation, units: int, rtype:
return reservation

def release(self, *, reservation):
duration = reservation.get_term().get_remaining_length()
if duration > 0:
from fabric_cf.actor.core.container.globals import GlobalsSingleton
if GlobalsSingleton.get().get_quota_mgr():
GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation,
duration=duration)
if reservation.get_term():
duration = reservation.get_term().get_remaining_length()
if duration > 0:
from fabric_cf.actor.core.container.globals import GlobalsSingleton
if GlobalsSingleton.get().get_quota_mgr():
GlobalsSingleton.get().get_quota_mgr().update_quota(reservation=reservation,
duration=duration)

if isinstance(reservation, ABCBrokerReservation):
self.logger.debug("Broker reservation")
Expand Down
40 changes: 29 additions & 11 deletions fabric_cf/actor/core/util/quota_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
#
# Author: Komal Thareja ([email protected])
import logging
import re
import threading
from typing import Any

from fabrictestbed.external_api.core_api import CoreApi
from fabrictestbed.slice_editor import InstanceCatalog
from fim.slivers.network_node import NodeSliver
from fim.user import NodeType

from fabric_cf.actor.core.apis.abc_reservation_mixin import ABCReservationMixin
from fabric_cf.actor.core.policy.inventory_for_type import InventoryForType
Expand Down Expand Up @@ -63,7 +65,7 @@ def list_quotas(self, project_uuid: str, offset: int = 0, limit: int = 200) -> d
quota_list = self.core_api.list_quotas(project_uuid=project_uuid, offset=offset, limit=limit)
quotas = {}
for q in quota_list:
quotas[(q.get("resource_type").lower(), q.get("resource_unit").lower())] = q
quotas[(q.get("resource_type").get("name").lower(), q.get("resource_unit").lower())] = q
return quotas

def update_quota(self, reservation: ABCReservationMixin, duration: float):
Expand Down Expand Up @@ -116,6 +118,15 @@ def update_quota(self, reservation: ABCReservationMixin, duration: float):
finally:
self.logger.debug("Released lock for quota update.")

@staticmethod
def __massage_name(name: str) -> str:
"""
Massage to make it python friendly
:param name:
:return:
"""
return re.sub(r'[ -]', '_', name)

@staticmethod
def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str, str], float]:
"""
Expand All @@ -142,20 +153,26 @@ def extract_quota_usage(sliver: NodeSliver, duration: float) -> dict[tuple[str,
else:
allocations = sliver.get_capacities()

# Extract Core, Ram, Disk Hours
requested_resources[("core", unit)] = requested_resources.get(("core", unit), 0) + \
(duration * allocations.core)
requested_resources[("ram", unit)] = requested_resources.get(("ram", unit), 0) +\
(duration * allocations.ram)
requested_resources[("disk", unit)] = requested_resources.get(("disk", unit), 0) + \
(duration * allocations.disk)
if allocations:
# Extract Core, Ram, Disk Hours
requested_resources[("core", unit)] = requested_resources.get(("core", unit), 0) + \
(duration * allocations.core)
requested_resources[("ram", unit)] = requested_resources.get(("ram", unit), 0) +\
(duration * allocations.ram)
requested_resources[("disk", unit)] = requested_resources.get(("disk", unit), 0) + \
(duration * allocations.disk)

if sliver.get_type() == NodeType.Switch and allocations:
requested_resources["p4", unit] = requested_resources.get(("p4", unit), 0) + \
(duration * allocations.unit)

# Extract component hours (e.g., GPU, FPGA, SmartNIC)
if sliver.attached_components_info:
for c in sliver.attached_components_info.devices.values():
component_type = str(c.get_type()).lower()
requested_resources[(component_type, unit)] = (
requested_resources.get((component_type, unit), 0) + duration
type_model_name = '_'.join([QuotaMgr.__massage_name(str(c.get_type())),
QuotaMgr.__massage_name(str(c.get_model()))])
requested_resources[(type_model_name.lower(), unit)] = (
requested_resources.get((type_model_name.lower(), unit), 0) + duration
)

return requested_resources
Expand Down Expand Up @@ -207,3 +224,4 @@ def enforce_quota_limits(self, reservation: ABCReservationMixin, duration: float
return True, None
except Exception as e:
self.logger.error(f"Error while checking reservation: {str(e)}")
return False, None
7 changes: 6 additions & 1 deletion tools/db_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from logging.handlers import RotatingFileHandler

from fim.user import ServiceType
from fim.slivers.network_service import NetworkServiceSliver

from fabric_cf.actor.core.kernel.reservation_states import ReservationStates
from fim.graph.neo4j_property_graph import Neo4jGraphImporter, Neo4jPropertyGraph
Expand Down Expand Up @@ -160,8 +161,12 @@ def get_reservations(self, slice_id: str = None, res_id: str = None, email: str
print(f"REQ RES Sliver: {r.get_requested_resources()} {r.get_requested_resources().get_sliver()}")
print(f"APPR RES Sliver: {r.get_approved_resources()} {r.get_approved_resources().get_sliver()}")
from fabric_cf.actor.core.kernel.reservation_client import ReservationClient
if isinstance(r, ReservationClient):
if isinstance(r, ReservationClient) and r.get_leased_resources():
print(r.get_leased_resources().get_sliver())
sliver = r.get_resources().get_sliver()
if isinstance(sliver, NetworkServiceSliver):
from fabric_cf.actor.core.util.utils import sliver_to_str
print(sliver_to_str(sliver=sliver))
print()
else:
print(f"No reservations found: {res_list}")
Expand Down