
[batch] Compact And Drop Records from job_group_inst_coll_cancellable_resources #14645

Open · ehigham wants to merge 15 commits into main
126 changes: 126 additions & 0 deletions batch/batch/driver/main.py
@@ -1492,6 +1492,130 @@
)


async def compact_job_group_cancellable_resources_records(db: Database):
    keyfields = [
        'batch_id',
        'update_id',
        'job_group_id',
        'inst_coll',
    ]

    rowfields = [
        *keyfields,
        'token',
        'n_creating_cancellable_jobs',
        'n_ready_cancellable_jobs',
        'n_running_cancellable_jobs',
        'ready_cancellable_cores_mcpu',
        'running_cancellable_cores_mcpu',
    ]

    @transaction(db)
    async def compact(tx: Transaction, record: dict):
        # Codacy Static Code Analysis (warning on batch/batch/driver/main.py#L1516):
        # possible SQL injection vector through string-based query construction.
        await tx.just_execute(
            f"""\
DELETE FROM job_group_inst_coll_cancellable_resources
WHERE {' AND '.join([f'{k} = %s' for k in keyfields])};
""",
            [record[k] for k in keyfields],
        )

        await tx.execute_insertone(
            f"""\
INSERT INTO job_group_inst_coll_cancellable_resources ({','.join(rowfields)})
VALUES ({','.join(['%s' for _ in rowfields])});
""",
            [record[k] for k in rowfields],
        )
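    # On the Codacy warning above: keyfields is a fixed list of column names, so
    # the f-string splices only constant fragments into the statement; e.g. the
    # DELETE's WHERE clause always renders as
    #   batch_id = %s AND update_id = %s AND job_group_id = %s AND inst_coll = %s
    # while the record's values are bound through the %s placeholders, never
    # interpolated into the query string.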

    keys = ','.join([f'R.{k}' for k in keyfields])
    targets = db.execute_and_fetchall(
        f"""\
SELECT R.*
FROM job_groups AS G
LEFT JOIN LATERAL (
  SELECT C.id FROM job_groups_cancelled AS C
  INNER JOIN job_group_self_and_ancestors AS D
    ON C.id = D.batch_id
    AND C.job_group_id = D.ancestor_id
  WHERE D.batch_id = G.batch_id
    AND D.job_group_id = G.job_group_id
) AS C ON TRUE
INNER JOIN LATERAL (
  SELECT {keys}
  , SUM(R.n_creating_cancellable_jobs) AS n_creating_cancellable_jobs
  , SUM(R.n_ready_cancellable_jobs) AS n_ready_cancellable_jobs
  , SUM(R.n_running_cancellable_jobs) AS n_running_cancellable_jobs
  , SUM(R.ready_cancellable_cores_mcpu) AS ready_cancellable_cores_mcpu
  , SUM(R.running_cancellable_cores_mcpu) AS running_cancellable_cores_mcpu
  FROM job_group_inst_coll_cancellable_resources AS R
  WHERE R.batch_id = G.batch_id
    AND R.job_group_id = G.job_group_id
  GROUP BY {keys}
  HAVING COUNT(*) > 1
) AS R ON TRUE
WHERE G.time_completed IS NOT NULL
  AND C.id IS NULL
LIMIT 1000;
""",
        query_name='find_finished_cancellable_resources_records_to_compact',
    )

    async for target in targets:
        await compact({**target, 'token': 0})

Review comments from ehigham (Member, Author), anchored at the LIMIT 1000:

Calling out the limit on both queries. Other queries also seem to limit to 1000, but I'm not sure where this number comes from. Without compacting, the query to find rows to compact takes forever, as it scans through a large chunk of the db. On the other hand, there are millions of rows, so reducing this number would make the background task take longer to churn through records. Suggestions?

ehigham, Aug 1, 2024: FWIW, my 2-week old prod snapshot has 173561655 rows in job_group_inst_coll_cancellable_resources and 8567769 job_groups. Assuming (incorrectly) instant execution, it'll take 100 days to churn through the db.
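A back-of-the-envelope check of that estimate, assuming one pass per 60 seconds (the periodically_call interval registered below) and up to 1000 targets per pass; the true rate depends on how many token rows each target key actually holds:

rows = 173_561_655      # job_group_inst_coll_cancellable_resources, prod snapshot
job_groups = 8_567_769  # job_groups, prod snapshot
limit, period_s = 1_000, 60

# optimistic: every pass retires 1000 job groups
print(job_groups / limit * period_s / 86_400)  # ~5.9 days
# pessimistic: throughput bounded by total rows, 1000 per pass
print(rows / limit * period_s / 86_400)        # ~120 days, the order of the author's figure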


async def delete_dead_job_group_cancellable_resources_records(db: Database):
    keyfields = [
        'batch_id',
        'update_id',
        'job_group_id',
        'inst_coll',
    ]

    keys = ','.join([f'R.{k}' for k in keyfields])
    targets = db.execute_and_fetchall(
        f"""\
SELECT {keys}
FROM job_groups AS G
LEFT JOIN LATERAL (
  SELECT C.id FROM job_groups_cancelled AS C
  INNER JOIN job_group_self_and_ancestors AS D
    ON C.id = D.batch_id
    AND C.job_group_id = D.ancestor_id
  WHERE D.batch_id = G.batch_id
    AND D.job_group_id = G.job_group_id
) AS C ON TRUE
INNER JOIN LATERAL (
  SELECT {keys}
  FROM job_group_inst_coll_cancellable_resources AS R
  WHERE R.batch_id = G.batch_id
    AND R.job_group_id = G.job_group_id
  GROUP BY {keys}
  HAVING COUNT(*) = 1
    AND MAX(R.n_creating_cancellable_jobs) = 0
    AND MAX(R.n_ready_cancellable_jobs) = 0
    AND MAX(R.n_running_cancellable_jobs) = 0
    AND MAX(R.ready_cancellable_cores_mcpu) = 0
    AND MAX(R.running_cancellable_cores_mcpu) = 0
) AS R ON TRUE
WHERE G.time_completed IS NOT NULL
  AND C.id IS NULL
LIMIT 1000;
""",
        query_name='find_dead_cancellable_resources_records_to_delete',
    )

    async for target in targets:
        await db.just_execute(
            f"""\
DELETE FROM job_group_inst_coll_cancellable_resources
WHERE {' AND '.join([f'{k} = %s' for k in keyfields])};
""",
            [target[k] for k in keyfields],
        )
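Taken together, the two tasks form a pipeline: compaction collapses a key's many token rows into one summed row, and once that single row's counters are all zero this task drops the key entirely. A toy trace of one (batch_id, update_id, job_group_id, inst_coll) key with the columns abbreviated (an illustration only, not code from the PR):

# two token rows for one key after the job group completes (counters drained)
rows = [
    {'token': 3, 'n_ready_cancellable_jobs': 0, 'ready_cancellable_cores_mcpu': 0},
    {'token': 7, 'n_ready_cancellable_jobs': 0, 'ready_cancellable_cores_mcpu': 0},
]

# pass 1 (compact): COUNT(*) > 1, so the rows are summed and reinserted under token 0
rows = [{
    'token': 0,
    'n_ready_cancellable_jobs': sum(r['n_ready_cancellable_jobs'] for r in rows),
    'ready_cancellable_cores_mcpu': sum(r['ready_cancellable_cores_mcpu'] for r in rows),
}]

# pass 2 (delete): COUNT(*) = 1 and every MAX(...) = 0, so the key is removed
assert rows[0]['n_ready_cancellable_jobs'] == 0
rows = []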


async def compact_agg_billing_project_users_table(app, db: Database):
    if not app['feature_flags']['compact_billing_tables']:
        return
@@ -1754,6 +1878,8 @@
    task_manager.ensure_future(periodically_call(60, compact_agg_billing_project_users_by_date_table, app, db))
    task_manager.ensure_future(periodically_call(60, delete_committed_job_groups_inst_coll_staging_records, db))
    task_manager.ensure_future(periodically_call(60, delete_prev_cancelled_job_group_cancellable_resources_records, db))
    task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, db))
    task_manager.ensure_future(periodically_call(60, delete_dead_job_group_cancellable_resources_records, db))
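For context, the call sites above imply a helper with signature periodically_call(period_s, f, *args) that runs f roughly once per period. A minimal sketch of that idea, assuming only the semantics implied here (the real helper in hailtop.utils may differ, e.g. adding retries and error logging):

import asyncio

async def periodically_call(period_s, f, *args):
    # run one pass of the background task, then sleep before the next pass
    while True:
        await f(*args)
        await asyncio.sleep(period_s)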


async def on_cleanup(app):