diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index ac6f99d756e..c2b8198bf9d 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1492,6 +1492,130 @@ async def delete_prev_cancelled_job_group_cancellable_resources_records(db: Data ) +async def compact_job_group_cancellable_resources_records(db: Database): + keyfields = [ + 'batch_id', + 'update_id', + 'job_group_id', + 'inst_coll', + ] + + rowfields = [ + *keyfields, + 'token', + 'n_creating_cancellable_jobs', + 'n_ready_cancellable_jobs', + 'n_running_cancellable_jobs', + 'ready_cancellable_cores_mcpu', + 'running_cancellable_cores_mcpu', + ] + + @transaction(db) + async def compact(tx: Transaction, record: dict): + await tx.just_execute( + f"""\ +DELETE FROM job_group_inst_coll_cancellable_resources +WHERE {' AND '.join([f'{k} = %s' for k in keyfields])}; +""", + [record[k] for k in keyfields], + ) + + await tx.execute_insertone( + f"""\ +INSERT INTO job_group_inst_coll_cancellable_resources ({','.join(rowfields)}) +VALUES ({','.join(['%s' for _ in rowfields])}); +""", + [record[k] for k in rowfields], + ) + + keys = ','.join([f'R.{k}' for k in keyfields]) + targets = db.execute_and_fetchall( + f"""\ +SELECT R.* +FROM job_groups AS G +LEFT JOIN LATERAL ( + SELECT C.id FROM job_groups_cancelled AS C + INNER JOIN job_group_self_and_ancestors AS D + ON C.id = D.batch_id + AND C.job_group_id = D.ancestor_id + WHERE D.batch_id = G.batch_id + AND D.job_group_id = G.job_group_id +) AS C ON TRUE +INNER JOIN LATERAL ( + SELECT {keys} + , SUM(R.n_creating_cancellable_jobs) AS n_creating_cancellable_jobs + , SUM(R.n_ready_cancellable_jobs) AS n_ready_cancellable_jobs + , SUM(R.n_running_cancellable_jobs) AS n_running_cancellable_jobs + , SUM(R.ready_cancellable_cores_mcpu) AS ready_cancellable_cores_mcpu + , SUM(R.running_cancellable_cores_mcpu) AS running_cancellable_cores_mcpu + FROM job_group_inst_coll_cancellable_resources AS R + WHERE R.batch_id = G.batch_id + AND R.job_group_id = G.job_group_id + GROUP BY {keys} + HAVING COUNT(*) > 1 +) AS R ON TRUE +WHERE G.time_completed IS NOT NULL + AND C.id IS NULL +LIMIT 1000; +""", + query_name='find_finished_cancellable_resources_records_to_compact', + ) + + async for target in targets: + await compact({**target, 'token': 0}) + + +async def delete_dead_job_group_cancellable_resources_records(db: Database): + keyfields = [ + 'batch_id', + 'update_id', + 'job_group_id', + 'inst_coll', + ] + + keys = ','.join([f'R.{k}' for k in keyfields]) + targets = db.execute_and_fetchall( + f"""\ +SELECT {keys} +FROM job_groups AS G +LEFT JOIN LATERAL ( + SELECT C.id FROM job_groups_cancelled AS C + INNER JOIN job_group_self_and_ancestors AS D + ON C.id = D.batch_id + AND C.job_group_id = D.ancestor_id + WHERE D.batch_id = G.batch_id + AND D.job_group_id = G.job_group_id +) AS C ON TRUE +INNER JOIN LATERAL ( + SELECT {keys} + FROM job_group_inst_coll_cancellable_resources AS R + WHERE R.batch_id = G.batch_id + AND R.job_group_id = G.job_group_id + GROUP BY {keys} + HAVING COUNT(*) = 1 + AND MAX(R.n_creating_cancellable_jobs) = 0 + AND MAX(R.n_ready_cancellable_jobs) = 0 + AND MAX(R.n_running_cancellable_jobs) = 0 + AND MAX(R.ready_cancellable_cores_mcpu) = 0 + AND MAX(R.running_cancellable_cores_mcpu) = 0 +) AS R ON TRUE +WHERE G.time_completed IS NOT NULL + AND C.id IS NULL +LIMIT 1000; +""", + query_name='find_dead_cancellable_resources_records_to_delete', + ) + + async for target in targets: + await db.just_execute( + f"""\ +DELETE FROM job_group_inst_coll_cancellable_resources +WHERE {' AND '.join([f'{k} = %s' for k in keyfields])}; +""", + [target[k] for k in keyfields], + ) + + async def compact_agg_billing_project_users_table(app, db: Database): if not app['feature_flags']['compact_billing_tables']: return @@ -1754,6 +1878,8 @@ async def close_and_wait(): task_manager.ensure_future(periodically_call(60, compact_agg_billing_project_users_by_date_table, app, db)) task_manager.ensure_future(periodically_call(60, delete_committed_job_groups_inst_coll_staging_records, db)) task_manager.ensure_future(periodically_call(60, delete_prev_cancelled_job_group_cancellable_resources_records, db)) + task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, db)) + task_manager.ensure_future(periodically_call(60, delete_dead_job_group_cancellable_resources_records, db)) async def on_cleanup(app):