Skip to content

Commit

Permalink
Use sys_pgcon for long-term advisory locks
Browse files Browse the repository at this point in the history
  • Loading branch information
fantix committed Feb 8, 2025
1 parent 1c5353b commit d106e87
Showing 1 changed file with 41 additions and 9 deletions.
50 changes: 41 additions & 9 deletions edb/server/protocol/ai_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ async def _ext_ai_index_builder_controller_loop(
models = await _ext_ai_fetch_active_models(pgconn)
if models:
if not holding_lock:
holding_lock = await _ext_ai_lock(pgconn)
holding_lock = await _ext_ai_lock(tenant, pgconn)
if holding_lock:
provider_contexts = _prepare_provider_contexts(
db,
Expand All @@ -319,7 +319,7 @@ async def _ext_ai_index_builder_controller_loop(
finally:
if not sleep_timer.is_ready_and_urgent():
await asyncutil.deferred_shield(
_ext_ai_unlock(pgconn))
_ext_ai_unlock(tenant))
holding_lock = False
except Exception:
logger.exception(f"caught error in {task_name}")
Expand Down Expand Up @@ -363,22 +363,54 @@ async def _ext_ai_fetch_active_models(
return result


_EXT_AI_ADVISORY_LOCK = b"3987734540"
_EXT_AI_ADVISORY_LOCK_LOCK = b"3987734540"
_EXT_AI_ADVISORY_LOCK = b"3987734541"


def _try_lock(lock: bytes) -> bytes:
return b"SELECT pg_try_advisory_lock(" + lock + b")"


def _try_xact_lock(lock: bytes) -> bytes:
return b"SELECT pg_try_advisory_xact_lock(" + lock + b")"


def _unlock(lock: bytes) -> bytes:
return b"SELECT pg_advisory_unlock(" + lock + b")"


async def _ext_ai_lock(
tenant: srv_tenant.Tenant,
pgconn: pgcon.PGConnection,
) -> bool:
b = await pgconn.sql_fetch_val(
b"SELECT pg_try_advisory_lock(" + _EXT_AI_ADVISORY_LOCK + b")")
return b == b'\x01'
# We use transaction-level advisory locks to ensure releasing
await pgconn.sql_execute(b"START TRANSACTION")
try:
# Acquire the outer lock to operate on the real lock
b = await pgconn.sql_fetch_val(_try_lock(_EXT_AI_ADVISORY_LOCK_LOCK))

# If success, acquire the real lock in the current session
if b == b'\x01':
b = await pgconn.sql_fetch_val(_try_lock(_EXT_AI_ADVISORY_LOCK))

# If success, move the lock to the system session
if b == b'\x01':
await pgconn.sql_fetch_val(_unlock(_EXT_AI_ADVISORY_LOCK))
async with tenant.use_sys_pgcon() as syscon:
# The long-term holding lock must be on session-level
await syscon.sql_execute(_try_lock(_EXT_AI_ADVISORY_LOCK))
return True
finally:
await pgconn.sql_execute(b"ROLLBACK")

return False


async def _ext_ai_unlock(
pgconn: pgcon.PGConnection,
tenant: srv_tenant.Tenant,
) -> None:
await pgconn.sql_fetch_val(
b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")")
async with tenant.use_sys_pgcon() as syscon:
await syscon.sql_fetch_val(_unlock(_EXT_AI_ADVISORY_LOCK))


def _prepare_provider_contexts(
Expand Down

0 comments on commit d106e87

Please sign in to comment.