Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext: fix issues in net gc task #7904

Merged
merged 4 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions edb/server/net_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,11 @@ def _warn(e):
)
async for iteration in rloop:
async with iteration:
result = await execute.parse_execute_json(
if not db.tenant.is_database_connectable(db.name):
# Don't run the net_worker if the database is not
# connectable, e.g. being dropped
continue
result_json = await execute.parse_execute_json(
db,
"""
with requests := (
Expand All @@ -346,14 +350,15 @@ def _warn(e):
and (datetime_of_statement() - .updated_at) >
<duration>$expires_in
)
delete requests;
select count((delete requests));
""",
variables={"expires_in": expires_in.to_backend_str()},
cached_globally=True,
tx_isolation=defines.TxIsolationLevel.RepeatableRead,
)
if len(result) > 0:
logger.info(f"Deleted requests: {result!r}")
result: list[int] = json.loads(result_json)
if result[0] > 0:
logger.info(f"Deleted {result[0]} requests")
else:
logger.info(f"No requests to delete")

Expand Down Expand Up @@ -383,7 +388,8 @@ async def gc(server: edbserver.BaseServer) -> None:
if tenant.accept_new_tasks
]
try:
await asyncio.wait(tasks)
if tasks:
await asyncio.wait(tasks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why aren't we using a TaskGroup here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess we can; this was carried over from an old commit. Let me try again so that we know for sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it's just unnecessary: the Task we're in here is safe without cancellation — the server doesn't need to wait for gc() itself to clean up anything, and all child Tasks are already managed by the TaskGroup in each Tenant (via tenant.create_task()).

We would need an extra coroutine wrapper to use TaskGroup here:

async def _await_task(task: asyncio.Task):
    return await task


async def gc(server: edbserver.BaseServer) -> None:
    while True:
        try:
            async with asyncio.TaskGroup() as g:
                for tenant in server.iter_tenants():
                    if tenant.accept_new_tasks:
                        task = tenant.create_task(
                            _gc(tenant, NET_HTTP_REQUEST_TTL),
                            interruptable=False,
                        )
                        g.create_task(_await_task(task))
...

except Exception as ex:
logger.debug(
"GC of std::net::http::ScheduledRequest failed", exc_info=ex
Expand Down
30 changes: 18 additions & 12 deletions edb/server/protocol/auth_ext/pkce.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,23 +158,29 @@ async def delete(db: edbtenant.dbview.Database, id: str) -> None:
assert len(result_json) == 1


async def _delete_challenge(db: edbtenant.dbview.Database) -> None:
    """Delete expired PKCE challenges from a single database.

    Does nothing when the tenant reports the database as not connectable
    (e.g. while it is being dropped). Otherwise issues one delete, with
    ``cached_globally=True``, for every ``ext::auth::PKCEChallenge`` whose
    age (statement time minus ``created_at``) exceeds ``VALIDITY``.
    """
    if db.tenant.is_database_connectable(db.name):
        # Query text is reproduced verbatim.
        expiry_query = """
delete ext::auth::PKCEChallenge filter
(datetime_of_statement() - .created_at) >
<duration>$validity
"""
        await execute.parse_execute_json(
            db,
            expiry_query,
            variables={"validity": VALIDITY.to_backend_str()},
            cached_globally=True,
        )


async def _gc(tenant: edbtenant.Tenant) -> None:
try:
async with asyncio.TaskGroup() as g:
for db in tenant.iter_dbs():
if "auth" in db.extensions:
g.create_task(
execute.parse_execute_json(
db,
"""
delete ext::auth::PKCEChallenge filter
(datetime_of_statement() - .created_at) >
<duration>$validity
""",
variables={"validity": VALIDITY.to_backend_str()},
cached_globally=True,
),
)
g.create_task(_delete_challenge(db))
except Exception as ex:
logger.debug(
"GC of ext::auth::PKCEChallenge failed (instance: %s)",
Expand Down