Skip to content

Commit

Permalink
Use sys_pgcon for long-term advisory locks
Browse files Browse the repository at this point in the history
  • Loading branch information
fantix committed Feb 8, 2025
1 parent 1c5353b commit 6f50201
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions edb/server/protocol/ai_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ async def _ext_ai_index_builder_controller_loop(
models = await _ext_ai_fetch_active_models(pgconn)
if models:
if not holding_lock:
holding_lock = await _ext_ai_lock(pgconn)
holding_lock = await _ext_ai_lock(tenant, pgconn)
if holding_lock:
provider_contexts = _prepare_provider_contexts(
db,
Expand All @@ -319,7 +319,7 @@ async def _ext_ai_index_builder_controller_loop(
finally:
if not sleep_timer.is_ready_and_urgent():
await asyncutil.deferred_shield(
_ext_ai_unlock(pgconn))
_ext_ai_unlock(tenant))
holding_lock = False
except Exception:
logger.exception(f"caught error in {task_name}")
Expand Down Expand Up @@ -363,22 +363,51 @@ async def _ext_ai_fetch_active_models(
return result


_EXT_AI_ADVISORY_LOCK = b"3987734540"
_EXT_AI_ADVISORY_LOCK_LOCK = b"3987734540"
_EXT_AI_ADVISORY_LOCK = b"3987734541"


async def _ext_ai_lock(
tenant: srv_tenant.Tenant,
pgconn: pgcon.PGConnection,
) -> bool:
b = await pgconn.sql_fetch_val(
b"SELECT pg_try_advisory_lock(" + _EXT_AI_ADVISORY_LOCK + b")")
return b == b'\x01'
async def _try_xact_lock(lock: bytes) -> bool:
b = await pgconn.sql_fetch_val(
b"SELECT pg_try_advisory_xact_lock(" + lock + b")")
return b == b'\x01'

# We use transaction-level advisory locks to ensure releasing
await pgconn.sql_execute(b"START TRANSACTION")
try:
# Acquire the outer lock to operate on the real lock
if await _try_xact_lock(_EXT_AI_ADVISORY_LOCK_LOCK):
# If success, acquire the real lock in the current session
# temporarily to make sure it's not locked by system session
sp = b"ext_ai_lock_sp"
await pgconn.sql_execute(b"SAVEPOINT " + sp)
if await _try_xact_lock(_EXT_AI_ADVISORY_LOCK):
# If success, move the lock to the system session
await pgconn.sql_execute(b"ROLLBACK TO SAVEPOINT " + sp)
async with tenant.use_sys_pgcon() as syscon:
# The long-term holding lock must be on session-level
await syscon.sql_execute(
b"SELECT pg_advisory_lock("
+ _EXT_AI_ADVISORY_LOCK
+ b")"
)
return True
finally:
await pgconn.sql_execute(b"ROLLBACK")

return False


async def _ext_ai_unlock(
pgconn: pgcon.PGConnection,
tenant: srv_tenant.Tenant,
) -> None:
await pgconn.sql_fetch_val(
b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")")
async with tenant.use_sys_pgcon() as syscon:
await syscon.sql_fetch_val(
b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")")


def _prepare_provider_contexts(
Expand Down

0 comments on commit 6f50201

Please sign in to comment.