Skip to content

Commit

Permalink
Add lock on compile (#6349)
Browse files Browse the repository at this point in the history
We cannot use a Future in the LRU mapping, because we don't want to
invalidate an old cache item if the new compile result turns out to be
not cacheable; and we can only know that after the compilation.
  • Loading branch information
fantix authored Mar 4, 2024
1 parent d02d143 commit f2ee48d
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 46 deletions.
1 change: 1 addition & 0 deletions edb/server/dbview/dbview.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ cdef class Database:

cdef:
stmt_cache.StatementsCache _eql_to_compiled
object _cache_locks
object _sql_to_compiled
DatabaseIndex _index
object _views
Expand Down
127 changes: 81 additions & 46 deletions edb/server/dbview/dbview.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ cdef class Database:

self._eql_to_compiled = stmt_cache.StatementsCache(
maxsize=defines._MAX_QUERIES_CACHE_DB)
self._cache_locks = {}
self._sql_to_compiled = lru.LRUMapping(
maxsize=defines._MAX_QUERIES_CACHE)

Expand Down Expand Up @@ -1049,59 +1050,93 @@ cdef class DatabaseConnectionView:
)
elif query_unit_group is None:
query_unit_group = self.lookup_compiled_query(query_req)

lock = None
cached = True

if query_unit_group is None:
# Cache miss; need to compile this query.
cached = False
# Remember the schema version we are compiling on, so that we can
# cache the result with the matching version. In case of concurrent
# schema update, we're only storing an outdated cache entry, and
# the next identical query could get recompiled on the new schema.
schema_version = self.schema_version
# Lock on the query compilation to avoid other coroutines running
# the same compile and waste computational resources
if cached_globally:
lock_table = self.server.system_compile_cache_locks
else:
lock_table = self._db._cache_locks
lock = lock_table.get(query_req)
if lock is None:
lock = asyncio.Lock()
lock_table[query_req] = lock
await lock.acquire()

try:
query_unit_group = await self._compile(query_req)
except (errors.EdgeQLSyntaxError, errors.InternalServerError):
raise
except errors.EdgeDBError:
if self.in_tx_error():
# Because we are in an error state it is more reasonable
# to fail with TransactionError("commands ignored")
# rather than with a potentially more cryptic error.
# An exception from this rule are syntax errors and
# ISEs, because these could arise while the user is
# trying to properly rollback this failed transaction.
self.raise_in_tx_error()
try:
# Check the cache again with the lock acquired
if query_unit_group is None and self._query_cache_enabled:
if cached_globally:
query_unit_group = (
self.server.system_compile_cache.get(query_req)
)
else:
raise
query_unit_group = self.lookup_compiled_query(query_req)

if query_unit_group is None:
# Cache miss; need to compile this query.
cached = False
# Remember the schema version we are compiling on, so that we
# can cache the result with the matching version. In case of
# concurrent schema update, we're only storing an outdated
# cache entry, and the next identical query could get
# recompiled on the new schema.
schema_version = self.schema_version

self.check_capabilities(
query_unit_group.capabilities,
allow_capabilities,
errors.DisabledCapabilityError,
"disabled by the client",
)
try:
query_unit_group = await self._compile(query_req)
except (errors.EdgeQLSyntaxError, errors.InternalServerError):
raise
except errors.EdgeDBError:
if self.in_tx_error():
# Because we are in an error state it's more reasonable
# to fail with TransactionError("commands ignored")
# rather than with a potentially more cryptic error.
# An exception from this rule are syntax errors and
# ISEs, because these could arise while the user is
# trying to properly rollback this failed transaction.
self.raise_in_tx_error()
else:
raise

self.check_capabilities(
query_unit_group.capabilities,
allow_capabilities,
errors.DisabledCapabilityError,
"disabled by the client",
)

if self.in_tx_error():
# The current transaction is aborted, so we must fail
# all commands except ROLLBACK or ROLLBACK TO SAVEPOINT.
first = query_unit_group[0]
if (
not (
first.tx_rollback
or first.tx_savepoint_rollback
or first.tx_abort_migration
) or len(query_unit_group) > 1
):
self.raise_in_tx_error()
if self.in_tx_error():
# The current transaction is aborted, so we must fail
# all commands except ROLLBACK or ROLLBACK TO SAVEPOINT.
first = query_unit_group[0]
if (
not (
first.tx_rollback
or first.tx_savepoint_rollback
or first.tx_abort_migration
) or len(query_unit_group) > 1
):
self.raise_in_tx_error()

if not cached and query_unit_group.cacheable:
if cached_globally:
self.server.system_compile_cache[query_req] = query_unit_group
else:
self.cache_compiled_query(
query_req, query_unit_group, schema_version
)
if not cached and query_unit_group.cacheable:
if cached_globally:
self.server.system_compile_cache[query_req] = (
query_unit_group
)
else:
self.cache_compiled_query(
query_req, query_unit_group, schema_version
)
finally:
if lock is not None:
lock.release()
if not lock._waiters:
del lock_table[query_req]

if use_metrics:
metrics.edgeql_query_compilations.inc(
Expand Down
5 changes: 5 additions & 0 deletions edb/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def __init__(
self._system_compile_cache = lru.LRUMapping(
maxsize=defines._MAX_QUERIES_CACHE
)
self._system_compile_cache_locks: dict[Any, Any] = {}

self._listen_sockets = listen_sockets
if listen_sockets:
Expand Down Expand Up @@ -444,6 +445,10 @@ def request_stop_fe_conns(self, dbname: str) -> None:
if conn.dbname == dbname:
conn.request_stop()

@property
def system_compile_cache_locks(self):
    """Mapping of query request -> asyncio.Lock.

    Used to serialize compilation of queries that are stored in the
    system-wide compile cache, so concurrent coroutines don't compile
    the same query twice.
    """
    locks = self._system_compile_cache_locks
    return locks

def _idle_gc_collector(self):
try:
self._idle_gc_handler = None
Expand Down

0 comments on commit f2ee48d

Please sign in to comment.