Skip to content

Commit

Permalink
This reverts commit 39083b6.
Browse files Browse the repository at this point in the history
  • Loading branch information
fantix committed Feb 12, 2025
1 parent 4c072ae commit dd9491b
Showing 1 changed file with 8 additions and 59 deletions.
67 changes: 8 additions & 59 deletions edb/server/protocol/ai_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ async def _ext_ai_index_builder_controller_loop(
models = await _ext_ai_fetch_active_models(pgconn)
if models:
if not holding_lock:
holding_lock = await _ext_ai_lock(tenant, pgconn)
holding_lock = await _ext_ai_lock(pgconn)
if holding_lock:
provider_contexts = _prepare_provider_contexts(
db,
Expand All @@ -320,7 +320,7 @@ async def _ext_ai_index_builder_controller_loop(
finally:
if not sleep_timer.is_ready_and_urgent():
await asyncutil.deferred_shield(
_ext_ai_unlock(tenant))
_ext_ai_unlock(pgconn))
holding_lock = False
except Exception:
logger.exception(f"caught error in {task_name}")
Expand Down Expand Up @@ -364,73 +364,22 @@ async def _ext_ai_fetch_active_models(
return result


# The _ext_ai_lock() is a long-term lock held in the system pgcon. It is used
# in the index builder job above guarding multiple alternating database pgcons
# and outgoing HTTP requests (free up pgcons while waiting for a response from
# external services), so that different Gel tenants on the same backend
# run this job exclusively.
#
# The following implementation is also safe to be used by multiple tasks within
# the same tenant (though at the time of writing, there is only one such task
# per tenant). To achieve this, we added an extra query on pg_locks to check if
# it's already held by another task, because advisory locks allow reentrancy
# from the same session (the same sys_pgcon). And to avoid racing conditions,
# we use another advisory lock over the 2 queries of check-and-lock in the
# local session. This also means, one must use _ext_ai_lock() instead of an
# individual lock of the 2 locks here to avoid misuse.
#
# If you are editing the magic numbers here: make sure it fits in a Postgres
# Oid type (uint32), or you'll need to change the `classid` query below.
_EXT_AI_ADVISORY_LOCK = b"3987734540"
_EXT_AI_ADVISORY_LOCK_LOCK = b"3987734541"


async def _ext_ai_lock(
tenant: srv_tenant.Tenant,
pgconn: pgcon.PGConnection,
) -> bool:
# We use transaction-level advisory locks to ensure releasing
await pgconn.sql_execute(b"START TRANSACTION")
try:
b = await pgconn.sql_fetch_val(
b"SELECT pg_try_advisory_xact_lock("
+ _EXT_AI_ADVISORY_LOCK_LOCK
+ b")"
)
if b == b'\x01':
lock_free = await pgconn.sql_fetch_val(
b'''
SELECT NOT EXISTS (
SELECT
1
FROM
pg_locks
WHERE
locktype = 'advisory'
AND classid = 0
AND objid = \
''' + _EXT_AI_ADVISORY_LOCK + b')')
if lock_free == b'\x01':
async with tenant.use_sys_pgcon() as syscon:
# The long-term holding lock must be on session-level
b = await syscon.sql_fetch_val(
b"SELECT pg_try_advisory_lock("
+ _EXT_AI_ADVISORY_LOCK
+ b")"
)
return b == b'\x01'
finally:
await pgconn.sql_execute(b"ROLLBACK")

return False
b = await pgconn.sql_fetch_val(
b"SELECT pg_try_advisory_lock(" + _EXT_AI_ADVISORY_LOCK + b")")
return b == b'\x01'


async def _ext_ai_unlock(
tenant: srv_tenant.Tenant,
pgconn: pgcon.PGConnection,
) -> None:
async with tenant.use_sys_pgcon() as syscon:
await syscon.sql_fetch_val(
b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")")
await pgconn.sql_fetch_val(
b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")")


def _prepare_provider_contexts(
Expand Down

0 comments on commit dd9491b

Please sign in to comment.