From 6f502019f639fa1998bc2ec9b5485370b3c3483b Mon Sep 17 00:00:00 2001 From: Fantix King Date: Sat, 8 Feb 2025 15:59:35 -0500 Subject: [PATCH] Use sys_pgcon for long-term advisory locks --- edb/server/protocol/ai_ext.py | 47 ++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/edb/server/protocol/ai_ext.py b/edb/server/protocol/ai_ext.py index fcc200eb906a..51787849165d 100644 --- a/edb/server/protocol/ai_ext.py +++ b/edb/server/protocol/ai_ext.py @@ -299,7 +299,7 @@ async def _ext_ai_index_builder_controller_loop( models = await _ext_ai_fetch_active_models(pgconn) if models: if not holding_lock: - holding_lock = await _ext_ai_lock(pgconn) + holding_lock = await _ext_ai_lock(tenant, pgconn) if holding_lock: provider_contexts = _prepare_provider_contexts( db, @@ -319,7 +319,7 @@ async def _ext_ai_index_builder_controller_loop( finally: if not sleep_timer.is_ready_and_urgent(): await asyncutil.deferred_shield( - _ext_ai_unlock(pgconn)) + _ext_ai_unlock(tenant)) holding_lock = False except Exception: logger.exception(f"caught error in {task_name}") @@ -363,22 +363,51 @@ async def _ext_ai_fetch_active_models( return result -_EXT_AI_ADVISORY_LOCK = b"3987734540" +_EXT_AI_ADVISORY_LOCK_LOCK = b"3987734540" +_EXT_AI_ADVISORY_LOCK = b"3987734541" async def _ext_ai_lock( + tenant: srv_tenant.Tenant, pgconn: pgcon.PGConnection, ) -> bool: - b = await pgconn.sql_fetch_val( - b"SELECT pg_try_advisory_lock(" + _EXT_AI_ADVISORY_LOCK + b")") - return b == b'\x01' + async def _try_xact_lock(lock: bytes) -> bool: + b = await pgconn.sql_fetch_val( + b"SELECT pg_try_advisory_xact_lock(" + lock + b")") + return b == b'\x01' + + # We use transaction-level advisory locks to ensure releasing + await pgconn.sql_execute(b"START TRANSACTION") + try: + # Acquire the outer lock to operate on the real lock + if await _try_xact_lock(_EXT_AI_ADVISORY_LOCK_LOCK): + # If success, acquire the real lock in the current session + # temporarily to make sure it's not locked by system session + sp = b"ext_ai_lock_sp" + await pgconn.sql_execute(b"SAVEPOINT " + sp) + if await _try_xact_lock(_EXT_AI_ADVISORY_LOCK): + # If success, move the lock to the system session + await pgconn.sql_execute(b"ROLLBACK TO SAVEPOINT " + sp) + async with tenant.use_sys_pgcon() as syscon: + # The long-term holding lock must be on session-level + await syscon.sql_execute( + b"SELECT pg_advisory_lock(" + + _EXT_AI_ADVISORY_LOCK + + b")" + ) + return True + finally: + await pgconn.sql_execute(b"ROLLBACK") + + return False async def _ext_ai_unlock( - pgconn: pgcon.PGConnection, + tenant: srv_tenant.Tenant, ) -> None: - await pgconn.sql_fetch_val( - b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")") + async with tenant.use_sys_pgcon() as syscon: + await syscon.sql_fetch_val( + b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")") def _prepare_provider_contexts(