Skip to content
This repository has been archived by the owner on May 14, 2024. It is now read-only.

Commit

Permalink
Rework Update Status to use all sources of surplus SUs to cover usage (
Browse files Browse the repository at this point in the history
…#333)

* make sure the current date and the correct format are being passed into sreport

* Increase logging output of update status, WIP investment withdrawal for usage exceeding limits

* Use function for adding investments to account in test setup

* rework update status to iterate over surplus SU sources, cleaning up some of the conditional logic

* Include tests for managing multiple disbursements of investments and multiple investments

---------

Co-authored-by: Nickolas Comeau <[email protected]>
  • Loading branch information
Comeani and Comeani committed Sep 27, 2023
1 parent 0736da3 commit e0c1d36
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 78 deletions.
165 changes: 99 additions & 66 deletions bank/account_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ def _build_usage_table(self) -> PrettyTable:

cluster_name = str.upper(allocation.cluster_name)

# Determine whether or not there is a floating allocation
# Determine whether there is a floating allocation
if allocation.cluster_name == 'all_clusters':
floating_su_usage = allocation.service_units_used
floating_su_total = allocation.service_units_total
Expand Down Expand Up @@ -940,32 +940,34 @@ def update_status(self) -> None:
"""

# Update status runs daily
end_date = date.today()
start_date = end_date - relativedelta(days=1)

slurm_acct = SlurmAccount(self._account_name)
total_usage = slurm_acct.get_cluster_usage_total(start=start_date, end=end_date, in_hours=True)

# Initialize usage to SUs used over the last day
total_usage_exceeding_limits = slurm_acct.get_cluster_usage_total(start=start_date, end=end_date, in_hours=True)

with DBConnection.session() as session:

proposal = None
investment = None
investments = None
floating_alloc = None
lock_clusters = []

# Default to locking on all clusters
lock_clusters = Slurm.cluster_names()

# Gather the account's active proposal and investments if they exist
proposal = session.execute(self._active_proposal_query).scalars().first()
# TODO: utilize all investments (scalars().all())
investment = session.execute(self._active_investment_query).scalars().first()

# Update proposal or investment usage
investments = session.execute(self._active_investment_query).scalars().all()

# Attempt to cover usage with active proposal
# Update proposal usage to reflect sreport output
if proposal:

total_usage_exceeding_limits = 0
lock_clusters = []

# Update within cluster usage
# Update within-cluster usage, building a list of clusters to potentially lock on
for alloc in proposal.allocations:
if alloc.cluster_name == 'all_clusters':
floating_alloc = alloc
Expand All @@ -983,68 +985,76 @@ def update_status(self) -> None:
alloc.service_units_used = alloc.service_units_total
lock_clusters.append(alloc.cluster_name)

# Handle usage exceeding limits
if total_usage_exceeding_limits > 0:

# Attempt to cover with floating SUs
if floating_alloc:

floating_sus = floating_alloc.service_units_total - floating_alloc.service_units_used
if total_usage_exceeding_limits > 0:
# Gather sources of surplus SUs
sources = []
if floating_alloc:
sources.append(floating_alloc)
if investments:
for investment in investments:
sources.append(investment)

for source in sources:
# Apply Floating Allocation
if type(source) == Allocation:
floating_sus = source.service_units_total - source.service_units_used
floating_sus_remaining = floating_sus - total_usage_exceeding_limits

# Floating SUs alone cannot cover, try using investment SUs
if floating_sus_remaining < 0 and investment:
investment.current_sus += floating_sus_remaining
floating_alloc.service_units_used = floating_alloc.service_units_total
if investment.current_sus < 0:
# TODO: Implement check for other investments + withdraw and cover
investment.current_sus = 0
LOG.info(f"Locked {self._account_name} on all clusters due to insufficient floating "
f"or investment SUs to cover usage")
self.lock(clusters=lock_clusters)
else:
LOG.debug(f"Using investment SUs to cover usage for {self._account_name} on "
f" {lock_clusters}")

elif floating_sus_remaining < 0:
floating_alloc.service_units_used = floating_alloc.service_units_total
self.lock(clusters=lock_clusters)

else:
floating_alloc.service_units_used += total_usage_exceeding_limits
# Floating SUs can cover usage
if floating_sus_remaining >= 0:
# Do not lock on any cluster, update floating SUs used
lock_clusters = []
source.service_units_used += total_usage_exceeding_limits
total_usage_exceeding_limits = 0
LOG.debug(f"Using floating SUs to cover usage for {self._account_name} on {lock_clusters}")
break

# No floating alloc, attempt to cover with investment SUs
elif investment:
investment.current_sus -= total_usage_exceeding_limits
if investment.current_sus < 0:
# TODO: Implement check for other investments + withdraw and cover
investment.current_sus = 0
LOG.info(f"Locked {self._account_name} on {lock_clusters} due to insufficient investment")
self.lock(clusters=lock_clusters)
# Floating SUs can not cover usage
else:
LOG.debug(f"Using investment SUs to cover usage for {self._account_name} on "
f" {lock_clusters}")
# Exhaust floating SUs and continue to next source
total_usage_exceeding_limits -= floating_sus
floating_alloc.service_units_used = floating_alloc.service_units_total
continue

# Apply Investment SUs
else:
self.lock(clusters=lock_clusters)
LOG.info(f"Locked {self._account_name} on {lock_clusters}, usage could not be covered by "
f"floating or investment SUs")

# Investment covers all usage
elif not proposal and investment:
investment.current_sus -= total_usage
if investment.current_sus > 0:
LOG.debug(f"Using investment service units to cover usage with no active proposal "
f" for {self._account_name}")
else:
investment.current_sus = 0
self.lock(all_clusters=True)
LOG.info(f"Locked {self._account_name} on all clusters, no proposal and investment exhausted")

# No proposal or investment to cover usage
else:
self.lock(all_clusters=True)
LOG.info(f"Locked {self._account_name} on all clusters, no active proposal or investment")
# Check if the investment is expired, continue on to the next one if so
if source.is_expired:
continue

investment_sus_remaining = source.current_sus - total_usage_exceeding_limits
# Investment can not cover, attempt to withdraw remaining SUs in the investment
if investment_sus_remaining < 0:
total_usage_exceeding_limits -= source.current_sus

# TODO: This is the full amount, should it be less?
# Could use total divided by 5 to represent 5 year investment,
# while disbursement available and total_usage_exceeding_limits > 0,
# determine if usage covered like below.
source.current_sus = source.service_units - source.withdrawn_sus
source.withdrawn_sus = source.service_units

investment_sus_remaining = source.current_sus - total_usage_exceeding_limits

# Still can not cover after withdrawal
if investment_sus_remaining < 0:
total_usage_exceeding_limits -= source.current_sus
source.current_sus = 0
continue

if investment_sus_remaining >= 0:
source.current_sus -= total_usage_exceeding_limits
lock_clusters = []
total_usage_exceeding_limits = 0
LOG.debug(f"Using investment SUs to cover usage for {self._account_name} on {lock_clusters}")
break

# Lock if surplus SU sources could not cover usage exceeding proposal limits
if total_usage_exceeding_limits > 0:
LOG.info(f"Locked {self._account_name} on {lock_clusters} due to insufficient floating "
f"or investment SUs to cover usage")
self.lock(clusters=lock_clusters)

session.commit()

Expand Down Expand Up @@ -1161,31 +1171,54 @@ def find_unlocked_account_names(cls) -> dict:
"""

unlocked_accounts_by_cluster = {}
numclusters = len(Slurm.cluster_names())
cluster_progress = 0
for cluster in Slurm.cluster_names():
cluster_progress += 1
LOG.info(f"Gathering unlocked accounts on {cluster}")
unlocked_accounts_by_cluster[cluster] = set(cls._iter_accounts_by_lock_state(False, cluster))
LOG.info(f"Gathered unlocked accounts progress {cluster_progress}/{numclusters}")

return unlocked_accounts_by_cluster

@classmethod
def update_account_status(cls) -> None:
    """Run ``update_status`` for every account that is unlocked on some cluster.

    Account names are collected from ``find_unlocked_account_names`` and merged
    into a single set. Each account is then updated via
    ``AccountServices.update_status``, which is where usage bookkeeping and any
    locking of expired or overdrawn accounts takes place. Start, finish, and
    per-account progress are written to the log.
    """

    timestamp_format = '%Y-%m-%d %H:%M:%S'

    # Accounts that are never passed through update_status
    # TODO: maintain this whitelist in settings?
    skipped_accounts = ("root", "clcgenomics")

    # Mark the beginning of the run in the log
    LOG.info(f"STARTING Update_status {datetime.now().strftime(timestamp_format)}")

    # Look up which accounts are currently unlocked, cluster by cluster
    LOG.info("Gathering unlocked accounts...")
    unlocked_by_cluster = cls.find_unlocked_account_names()

    # Collapse the per-cluster results into one set of unique account names
    names_to_update = set().union(*unlocked_by_cluster.values())
    total_accounts = len(names_to_update)

    # The counter advances for every name, including those skipped below
    for position, account_name in enumerate(names_to_update, start=1):
        if account_name in skipped_accounts:
            continue

        try:
            LOG.info(f"Updating status for {account_name}...")
            AccountServices(account_name).update_status()
        except AccountNotFoundError:
            LOG.info(f"SLURM Account does not exist for {account_name}")
        else:
            # Progress is only reported for accounts that were actually updated
            LOG.info(f"Update status: {position}/{total_accounts} updated")

    # Mark the end of the run in the log
    LOG.info(f"FINISHED Update_status {datetime.now().strftime(timestamp_format)}")

2 changes: 1 addition & 1 deletion bank/system/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def get_cluster_usage_per_user(self, cluster: str, start: date, end: date, in_ho
time = 'Seconds'

cmd = ShellCmd(f"sreport cluster AccountUtilizationByUser -Pn -T Billing -t {time} cluster={cluster} "
f"Account={self.account_name} start={start} end={end} format=Proper,Used")
f"Account={self.account_name} start={start.strftime('%Y-%m-%d')} end={end.strftime('%Y-%m-%d')} format=Proper,Used")

try:
account_total, *data = cmd.out.split('\n')
Expand Down
8 changes: 2 additions & 6 deletions tests/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def setUp(self) -> None:
"""Ensure there exists a user investment for the test user account"""

super().setUp()

investments = []
for i in range(3):
start = TODAY + ((i - 1) * timedelta(days=365))
Expand All @@ -137,10 +138,5 @@ def setUp(self) -> None:
withdrawn_sus=0,
rollover_sus=0
)
investments.append(inv)

with DBConnection.session() as session:
result = session.execute(select(Account).where(Account.name == settings.test_accounts[0]))
account = result.scalars().first()
account.investments.extend(investments)
session.commit()
add_investment_to_test_account(inv)
Loading

0 comments on commit e0c1d36

Please sign in to comment.