From e0c1d3612116157cdba15dcd87dc99be1e6177ce Mon Sep 17 00:00:00 2001 From: Nickolas Comeau Date: Wed, 27 Sep 2023 16:29:27 -0400 Subject: [PATCH] Rework Update Status to use all sources of surplus SUs to cover usage (#333) * make sure the current date and the correct format are being passed into sreport * Increase logging output of update status, WIP investment withdrawl for usage exceeding limits * Use function for adding investments to account in test setup * rework update status to iterate over surplus SU sources, cleaning up some of the conditional logic * Include tests for managing multiple disbursements of investments and multiple investments --------- Co-authored-by: Nickolas Comeau --- bank/account_logic.py | 165 ++++++++++++-------- bank/system/slurm.py | 2 +- tests/_utils.py | 8 +- tests/account_logic/test_AccountServices.py | 117 +++++++++++++- 4 files changed, 214 insertions(+), 78 deletions(-) diff --git a/bank/account_logic.py b/bank/account_logic.py index 4dfb969a..855fa02e 100644 --- a/bank/account_logic.py +++ b/bank/account_logic.py @@ -742,7 +742,7 @@ def _build_usage_table(self) -> PrettyTable: cluster_name = str.upper(allocation.cluster_name) - # Determine whether or not there is a floating allocation + # Determine whether there is a floating allocation if allocation.cluster_name == 'all_clusters': floating_su_usage = allocation.service_units_used floating_su_total = allocation.service_units_total @@ -940,32 +940,34 @@ def update_status(self) -> None: """ + # Update status runs daily end_date = date.today() start_date = end_date - relativedelta(days=1) slurm_acct = SlurmAccount(self._account_name) - total_usage = slurm_acct.get_cluster_usage_total(start=start_date, end=end_date, in_hours=True) + + # Initialize usage to SUs used over the last day + total_usage_exceeding_limits = slurm_acct.get_cluster_usage_total(start=start_date, end=end_date, in_hours=True) with DBConnection.session() as session: proposal = None - investment = None + 
investments = None floating_alloc = None - lock_clusters = [] + + # Default to locking on all clusters + lock_clusters = Slurm.cluster_names() # Gather the account's active proposal and investments if they exist proposal = session.execute(self._active_proposal_query).scalars().first() - # TODO: utilize all investments (scalars().all()) - investment = session.execute(self._active_investment_query).scalars().first() - - # Update proposal or investment usage + investments = session.execute(self._active_investment_query).scalars().all() - # Attempt to cover usage with active proposal + # Update proposal usage to reflect sreport output if proposal: - total_usage_exceeding_limits = 0 + lock_clusters = [] - # Update within cluster usage + # Update within-cluster usage, building a list of clusters to potentially lock on for alloc in proposal.allocations: if alloc.cluster_name == 'all_clusters': floating_alloc = alloc @@ -983,68 +985,76 @@ def update_status(self) -> None: alloc.service_units_used = alloc.service_units_total lock_clusters.append(alloc.cluster_name) - # Handle usage exceeding limits - if total_usage_exceeding_limits > 0: - - # Attempt to cover with floating SUs - if floating_alloc: - - floating_sus = floating_alloc.service_units_total - floating_alloc.service_units_used + if total_usage_exceeding_limits > 0: + # Gather sources of surplus SUs + sources = [] + if floating_alloc: + sources.append(floating_alloc) + if investments: + for investment in investments: + sources.append(investment) + + for source in sources: + # Apply Floating Allocation + if type(source) == Allocation: + floating_sus = source.service_units_total - source.service_units_used floating_sus_remaining = floating_sus - total_usage_exceeding_limits - # Floating SUs alone cannot cover, try using investment SUs - if floating_sus_remaining < 0 and investment: - investment.current_sus += floating_sus_remaining - floating_alloc.service_units_used = floating_alloc.service_units_total - if 
investment.current_sus < 0: - # TODO: Implement check for other investments + withdraw and cover - investment.current_sus = 0 - LOG.info(f"Locked {self._account_name} on all clusters due to insufficient floating " - f"or investment SUs to cover usage") - self.lock(clusters=lock_clusters) - else: - LOG.debug(f"Using investment SUs to cover usage for {self._account_name} on " - f" {lock_clusters}") - - elif floating_sus_remaining < 0: - floating_alloc.service_units_used = floating_alloc.service_units_total - self.lock(clusters=lock_clusters) - - else: - floating_alloc.service_units_used += total_usage_exceeding_limits + # Floating SUs can cover usage + if floating_sus_remaining >= 0: + # Do not lock on any cluster, update floating SUs used + lock_clusters = [] + source.service_units_used += total_usage_exceeding_limits + total_usage_exceeding_limits = 0 LOG.debug(f"Using floating SUs to cover usage for {self._account_name} on {lock_clusters}") + break - # No floating alloc, attempt to cover with investment SUs - elif investment: - investment.current_sus -= total_usage_exceeding_limits - if investment.current_sus < 0: - # TODO: Implement check for other investments + withdraw and cover - investment.current_sus = 0 - LOG.info(f"Locked {self._account_name} on {lock_clusters} due to insufficient investment") - self.lock(clusters=lock_clusters) + # Floating SUs can not cover usage else: - LOG.debug(f"Using investment SUs to cover usage for {self._account_name} on " - f" {lock_clusters}") + # Exhaust floating SUs and continue to next source + total_usage_exceeding_limits -= floating_sus + floating_alloc.service_units_used = floating_alloc.service_units_total + continue + + # Apply Investment SUs else: - self.lock(clusters=lock_clusters) - LOG.info(f"Locked {self._account_name} on {lock_clusters}, usage could not be covered by " - f"floating or investment SUs") - - # Investment covers all usage - elif not proposal and investment: - investment.current_sus -= total_usage - if 
investment.current_sus > 0: - LOG.debug(f"Using investment service units to cover usage with no active proposal " - f" for {self._account_name}") - else: - investment.current_sus = 0 - self.lock(all_clusters=True) - LOG.info(f"Locked {self._account_name} on all clusters, no proposal and investment exhausted") - # No proposal or investment to cover usage - else: - self.lock(all_clusters=True) - LOG.info(f"Locked {self._account_name} on all clusters, no active proposal or investment") + # Check if the investment is expired, continue on to the next one if so + if source.is_expired: + continue + + investment_sus_remaining = source.current_sus - total_usage_exceeding_limits + # Investment can not cover, attempt to withdraw remaining SUs in the investment + if investment_sus_remaining < 0: + total_usage_exceeding_limits -= source.current_sus + + # TODO: This is the full amount, should it be less? + # Could use total divided by 5 to represent 5 year investment, + # while disbursement available and total_usage_exceeding_limits > 0, + # determine if usage covered like below. 
+ source.current_sus = source.service_units - source.withdrawn_sus + source.withdrawn_sus = source.service_units + + investment_sus_remaining = source.current_sus - total_usage_exceeding_limits + + # Still can not cover after withdrawal + if investment_sus_remaining < 0: + total_usage_exceeding_limits -= source.current_sus + source.current_sus = 0 + continue + + if investment_sus_remaining >= 0: + source.current_sus -= total_usage_exceeding_limits + lock_clusters = [] + total_usage_exceeding_limits = 0 + LOG.debug(f"Using investment SUs to cover usage for {self._account_name} on {lock_clusters}") + break + + # Lock if surplus SU sources could not cover usage exceeding proposal limits + if total_usage_exceeding_limits > 0: + LOG.info(f"Locked {self._account_name} on {lock_clusters} due to insufficient floating " + f"or investment SUs to cover usage") + self.lock(clusters=lock_clusters) session.commit() @@ -1161,8 +1171,13 @@ def find_unlocked_account_names(cls) -> dict: """ unlocked_accounts_by_cluster = {} + numclusters = len(Slurm.cluster_names()) + cluster_progress = 0 for cluster in Slurm.cluster_names(): + cluster_progress += 1 + LOG.info(f"Gathering unlocked accounts on {cluster}") unlocked_accounts_by_cluster[cluster] = set(cls._iter_accounts_by_lock_state(False, cluster)) + LOG.info(f"Gathered unlocked accounts progress {cluster_progress}/{numclusters}") return unlocked_accounts_by_cluster @@ -1170,7 +1185,12 @@ def find_unlocked_account_names(cls) -> dict: def update_account_status(cls) -> None: """Update account usage information and lock any expired or overdrawn accounts""" + # Log start of update status + now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + LOG.info(f"STARTING Update_status {now}") + # Gather all account names that are currently unlocked on some cluster + LOG.info(f"Gathering unlocked accounts...") unlocked_accounts_by_cluster = cls.find_unlocked_account_names() # Build set of account names that are unlocked on any cluster @@ -1178,14 
+1198,27 @@ def update_account_status(cls) -> None: for name_set in unlocked_accounts_by_cluster.values(): account_names = account_names.union(name_set) + # Set up progress indicator for log + num_accounts = len(account_names) + progress = 0 + # Update the status of any unlocked account for name in account_names: + progress += 1 #TODO: maintain this whitelist in settings? if name in ["root", "clcgenomics"]: continue try: + LOG.info(f"Updating status for {name}...") account = AccountServices(name) account.update_status() except AccountNotFoundError: LOG.info(f"SLURM Account does not exist for {name}") continue + + LOG.info(f"Update status: {progress}/{num_accounts} updated") + + # Log end of update status + now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + LOG.info(f"FINISHED Update_status {now}") + diff --git a/bank/system/slurm.py b/bank/system/slurm.py index 60fbf991..cbaefed0 100644 --- a/bank/system/slurm.py +++ b/bank/system/slurm.py @@ -175,7 +175,7 @@ def get_cluster_usage_per_user(self, cluster: str, start: date, end: date, in_ho time = 'Seconds' cmd = ShellCmd(f"sreport cluster AccountUtilizationByUser -Pn -T Billing -t {time} cluster={cluster} " - f"Account={self.account_name} start={start} end={end} format=Proper,Used") + f"Account={self.account_name} start={start.strftime('%Y-%m-%d')} end={end.strftime('%Y-%m-%d')} format=Proper,Used") try: account_total, *data = cmd.out.split('\n') diff --git a/tests/_utils.py b/tests/_utils.py index 2e58360f..4905f9d2 100644 --- a/tests/_utils.py +++ b/tests/_utils.py @@ -124,6 +124,7 @@ def setUp(self) -> None: """Ensure there exists a user investment for the test user account""" super().setUp() + investments = [] for i in range(3): start = TODAY + ((i - 1) * timedelta(days=365)) @@ -137,10 +138,5 @@ def setUp(self) -> None: withdrawn_sus=0, rollover_sus=0 ) - investments.append(inv) - with DBConnection.session() as session: - result = session.execute(select(Account).where(Account.name == 
settings.test_accounts[0])) - account = result.scalars().first() - account.investments.extend(investments) - session.commit() + add_investment_to_test_account(inv) diff --git a/tests/account_logic/test_AccountServices.py b/tests/account_logic/test_AccountServices.py index d9354d02..b0b9b02e 100644 --- a/tests/account_logic/test_AccountServices.py +++ b/tests/account_logic/test_AccountServices.py @@ -7,9 +7,10 @@ from bank import settings from bank.account_logic import AccountServices -from bank.orm import Account, Allocation, DBConnection, Proposal +from bank.orm import Account, Allocation, DBConnection, Investment, Proposal from bank.system.slurm import SlurmAccount, Slurm -from tests._utils import active_proposal_query, active_investment_query, InvestmentSetup, ProposalSetup +from tests._utils import active_proposal_query, active_investment_query, add_investment_to_test_account, \ + InvestmentSetup, ProposalSetup class CalculatePercentage(TestCase): @@ -454,10 +455,12 @@ def test_status_unlocked_with_investment_sus_applied(self) -> None: self.assertEqual(900, investment.current_sus) - @skip('TODO: The withdraw/forward functionality has not been fully implemented in update_status yet.') - def test_status_unlocked_with_multiple_investments_sus_applied(self) -> None: + @patch.object(SlurmAccount, + "get_cluster_usage_per_user", + lambda self, cluster, start, end, in_hours: {'account1': 550, 'account2': 550}) + def test_status_unlocked_with_multiple_investments_disbursements_applied(self) -> None: """Test that update_status uses investment SUs to cover usage over limits, exhausting the first investment - and withdrawing from a future investment""" + and withdrawing from a future disbursement of the investment""" self.slurm_account.set_locked_state(False, cluster=settings.test_cluster) @@ -470,6 +473,8 @@ def test_status_unlocked_with_multiple_investments_sus_applied(self) -> None: # Investment is expired investment = 
session.execute(active_investment_query).scalars().first() investment.current_sus = 1000 + investment.withdrawn_sus = 1000 + investment.service_units = 2000 session.commit() @@ -483,3 +488,105 @@ def test_status_unlocked_with_multiple_investments_sus_applied(self) -> None: investment = session.execute(active_investment_query).scalars().first() self.assertEqual(900, investment.current_sus) + self.assertEqual(investment.withdrawn_sus, investment.service_units) + + @patch.object(SlurmAccount, + "get_cluster_usage_per_user", + lambda self, cluster, start, end, in_hours: {'account1': 550, 'account2': 550}) + def test_status_unlocked_with_multiple_investments_applied(self) -> None: + """Test that update_status uses investment SUs to cover usage over limits, exhausting the first investment + and utilizing another investment""" + + self.slurm_account.set_locked_state(False, cluster=settings.test_cluster) + + start = date.today() + end = start + (1 * timedelta(days=365)) + + inv = Investment( + start_date=start, + end_date=end, + service_units=1000, + current_sus=1000, + withdrawn_sus=1000, + rollover_sus=0 + ) + + add_investment_to_test_account(inv) + + with DBConnection.session() as session: + # Proposal is expired + proposal = session.execute(active_proposal_query).scalars().first() + proposal.allocations[0].service_units_total = 10_000 + proposal.allocations[0].service_units_used = 35_000 + + # Investment is expired + investments = session.execute(active_investment_query).scalars().all() + investments[0].current_sus = 1000 + investments[0].withdrawn_sus = 2000 + investments[0].service_units = 2000 + + session.commit() + + self.account.update_status() + + with DBConnection.session() as session: + # check that investment SUs were used to cover usage + investments = session.execute(active_investment_query).scalars().all() + + self.assertEqual(0, investments[0].current_sus) + self.assertEqual(investments[0].withdrawn_sus, investments[0].service_units) + 
self.assertEqual(investments[1].withdrawn_sus, investments[1].service_units) + self.assertEqual(900, investments[1].current_sus) + + # cluster should be unlocked due to exceeding usage being covered by investment + self.assertFalse(self.slurm_account.get_locked_state(cluster=settings.test_cluster)) + + @patch.object(SlurmAccount, + "get_cluster_usage_per_user", + lambda self, cluster, start, end, in_hours: {'account1': 550, 'account2': 550}) + def test_status_locked_with_multiple_sources_exhausted(self) -> None: + """Test that update_status attempts to use floating and investment SUs to cover usage over limits, + exhausting the floating SUs and investments""" + + self.slurm_account.set_locked_state(False, cluster=settings.test_cluster) + + start = date.today() + end = start + (1 * timedelta(days=365)) + + inv = Investment( + start_date=start, + end_date=end, + service_units=1000, + current_sus=1000, + withdrawn_sus=1000, + rollover_sus=0 + ) + + add_investment_to_test_account(inv) + + with DBConnection.session() as session: + # Proposal is expired + proposal = session.execute(active_proposal_query).scalars().first() + proposal.allocations[0].service_units_total = 10_000 + proposal.allocations[0].service_units_used = 36_000 + + # Investment is expired + investments = session.execute(active_investment_query).scalars().all() + investments[0].current_sus = 1000 + investments[0].withdrawn_sus = 2000 + investments[0].service_units = 2000 + + session.commit() + + self.account.update_status() + + with DBConnection.session() as session: + # check that investment SUs were used to cover usage + investments = session.execute(active_investment_query).scalars().all() + + self.assertEqual(0, investments[1].current_sus) + self.assertEqual(investments[1].withdrawn_sus, investments[1].service_units) + self.assertEqual(0, investments[0].current_sus) + + # cluster should be locked because the floating and investment SUs could not cover the exceeding usage + 
self.assertTrue(self.slurm_account.get_locked_state(cluster=settings.test_cluster))