From d106e8792fdce52e518cab03e1fb381a7583ebaa Mon Sep 17 00:00:00 2001
From: Fantix King
Date: Sat, 8 Feb 2025 15:59:35 -0500
Subject: [PATCH] Use sys_pgcon for long-term advisory locks

The long-term advisory lock is now held at session level on the tenant's
system connection instead of on the connection passed to _ext_ai_lock().

A second, transaction-level advisory lock guards the hand-over of the
real lock; it is released by the ROLLBACK that ends _ext_ai_lock().

---
 edb/server/protocol/ai_ext.py | 55 +++++++++++++++++++++++++++++------
 1 file changed, 46 insertions(+), 9 deletions(-)

diff --git a/edb/server/protocol/ai_ext.py b/edb/server/protocol/ai_ext.py
index fcc200eb906a..bcc4a4be788e 100644
--- a/edb/server/protocol/ai_ext.py
+++ b/edb/server/protocol/ai_ext.py
@@ -299,7 +299,7 @@ async def _ext_ai_index_builder_controller_loop(
                     models = await _ext_ai_fetch_active_models(pgconn)
                     if models:
                         if not holding_lock:
-                            holding_lock = await _ext_ai_lock(pgconn)
+                            holding_lock = await _ext_ai_lock(tenant, pgconn)
                         if holding_lock:
                             provider_contexts = _prepare_provider_contexts(
                                 db,
@@ -319,7 +319,7 @@ async def _ext_ai_index_builder_controller_loop(
             finally:
                 if not sleep_timer.is_ready_and_urgent():
                     await asyncutil.deferred_shield(
-                        _ext_ai_unlock(pgconn))
+                        _ext_ai_unlock(tenant))
                     holding_lock = False
         except Exception:
             logger.exception(f"caught error in {task_name}")
@@ -363,22 +363,59 @@ async def _ext_ai_fetch_active_models(
     return result
 
 
-_EXT_AI_ADVISORY_LOCK = b"3987734540"
+# Outer lock: only held while the real lock is being handed over
+_EXT_AI_ADVISORY_LOCK_LOCK = b"3987734540"
+# Real lock: held long-term by the system connection
+_EXT_AI_ADVISORY_LOCK = b"3987734541"
+
+
+def _try_lock(lock: bytes) -> bytes:
+    return b"SELECT pg_try_advisory_lock(" + lock + b")"
+
+
+def _try_xact_lock(lock: bytes) -> bytes:
+    return b"SELECT pg_try_advisory_xact_lock(" + lock + b")"
+
+
+def _unlock(lock: bytes) -> bytes:
+    return b"SELECT pg_advisory_unlock(" + lock + b")"
 
 
 async def _ext_ai_lock(
+    tenant: srv_tenant.Tenant,
     pgconn: pgcon.PGConnection,
 ) -> bool:
-    b = await pgconn.sql_fetch_val(
-        b"SELECT pg_try_advisory_lock(" + _EXT_AI_ADVISORY_LOCK + b")")
-    return b == b'\x01'
+    # The outer lock is transaction-level, so the ROLLBACK below always
+    # releases it, whichever way this function exits
+    await pgconn.sql_execute(b"START TRANSACTION")
+    try:
+        # Acquire the outer lock to operate on the real lock
+        b = await pgconn.sql_fetch_val(
+            _try_xact_lock(_EXT_AI_ADVISORY_LOCK_LOCK))
+
+        # If success, acquire the real lock in the current session; this one
+        # is session-level because it is explicitly released again below
+        if b == b'\x01':
+            b = await pgconn.sql_fetch_val(_try_lock(_EXT_AI_ADVISORY_LOCK))
+
+        # If success, move the lock to the system session
+        if b == b'\x01':
+            await pgconn.sql_fetch_val(_unlock(_EXT_AI_ADVISORY_LOCK))
+            async with tenant.use_sys_pgcon() as syscon:
+                # The long-term holding lock must be on session-level
+                await syscon.sql_execute(_try_lock(_EXT_AI_ADVISORY_LOCK))
+            return True
+    finally:
+        await pgconn.sql_execute(b"ROLLBACK")
+
+    return False
 
 
 async def _ext_ai_unlock(
-    pgconn: pgcon.PGConnection,
+    tenant: srv_tenant.Tenant,
 ) -> None:
-    await pgconn.sql_fetch_val(
-        b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")")
+    async with tenant.use_sys_pgcon() as syscon:
+        await syscon.sql_fetch_val(_unlock(_EXT_AI_ADVISORY_LOCK))
 
 
 def _prepare_provider_contexts(