Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock concurrent compiles of the same query #6349

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions edb/server/dbview/dbview.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ cdef class Database:

cdef:
stmt_cache.StatementsCache _eql_to_compiled
object _cache_locks
object _sql_to_compiled
DatabaseIndex _index
object _views
Expand Down
127 changes: 81 additions & 46 deletions edb/server/dbview/dbview.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ cdef class Database:

self._eql_to_compiled = stmt_cache.StatementsCache(
maxsize=defines._MAX_QUERIES_CACHE_DB)
self._cache_locks = {}
self._sql_to_compiled = lru.LRUMapping(
maxsize=defines._MAX_QUERIES_CACHE)

Expand Down Expand Up @@ -1049,59 +1050,93 @@ cdef class DatabaseConnectionView:
)
elif query_unit_group is None:
query_unit_group = self.lookup_compiled_query(query_req)

lock = None
cached = True

if query_unit_group is None:
# Cache miss; need to compile this query.
cached = False
# Remember the schema version we are compiling on, so that we can
# cache the result with the matching version. In case of concurrent
# schema update, we're only storing an outdated cache entry, and
# the next identical query could get recompiled on the new schema.
schema_version = self.schema_version
# Lock on the query compilation to avoid other coroutines running
# the same compile and waste computational resources
if cached_globally:
lock_table = self.server.system_compile_cache_locks
else:
lock_table = self._db._cache_locks
lock = lock_table.get(query_req)
if lock is None:
lock = asyncio.Lock()
lock_table[query_req] = lock
await lock.acquire()

try:
query_unit_group = await self._compile(query_req)
except (errors.EdgeQLSyntaxError, errors.InternalServerError):
raise
except errors.EdgeDBError:
if self.in_tx_error():
# Because we are in an error state it is more reasonable
# to fail with TransactionError("commands ignored")
# rather than with a potentially more cryptic error.
# An exception from this rule are syntax errors and
# ISEs, because these could arise while the user is
# trying to properly rollback this failed transaction.
self.raise_in_tx_error()
try:
# Check the cache again with the lock acquired
if query_unit_group is None and self._query_cache_enabled:
if cached_globally:
query_unit_group = (
self.server.system_compile_cache.get(query_req)
)
else:
raise
query_unit_group = self.lookup_compiled_query(query_req)

if query_unit_group is None:
# Cache miss; need to compile this query.
cached = False
# Remember the schema version we are compiling on, so that we
# can cache the result with the matching version. In case of
# concurrent schema update, we're only storing an outdated
# cache entry, and the next identical query could get
# recompiled on the new schema.
schema_version = self.schema_version

self.check_capabilities(
query_unit_group.capabilities,
allow_capabilities,
errors.DisabledCapabilityError,
"disabled by the client",
)
try:
query_unit_group = await self._compile(query_req)
except (errors.EdgeQLSyntaxError, errors.InternalServerError):
raise
except errors.EdgeDBError:
if self.in_tx_error():
# Because we are in an error state it's more reasonable
# to fail with TransactionError("commands ignored")
# rather than with a potentially more cryptic error.
# An exception from this rule are syntax errors and
# ISEs, because these could arise while the user is
# trying to properly rollback this failed transaction.
self.raise_in_tx_error()
else:
raise

self.check_capabilities(
query_unit_group.capabilities,
allow_capabilities,
errors.DisabledCapabilityError,
"disabled by the client",
)

if self.in_tx_error():
# The current transaction is aborted, so we must fail
# all commands except ROLLBACK or ROLLBACK TO SAVEPOINT.
first = query_unit_group[0]
if (
not (
first.tx_rollback
or first.tx_savepoint_rollback
or first.tx_abort_migration
) or len(query_unit_group) > 1
):
self.raise_in_tx_error()
if self.in_tx_error():
# The current transaction is aborted, so we must fail
# all commands except ROLLBACK or ROLLBACK TO SAVEPOINT.
first = query_unit_group[0]
if (
not (
first.tx_rollback
or first.tx_savepoint_rollback
or first.tx_abort_migration
) or len(query_unit_group) > 1
):
self.raise_in_tx_error()

if not cached and query_unit_group.cacheable:
if cached_globally:
self.server.system_compile_cache[query_req] = query_unit_group
else:
self.cache_compiled_query(
query_req, query_unit_group, schema_version
)
if not cached and query_unit_group.cacheable:
if cached_globally:
self.server.system_compile_cache[query_req] = (
query_unit_group
)
else:
self.cache_compiled_query(
query_req, query_unit_group, schema_version
)
finally:
if lock is not None:
lock.release()
if not lock._waiters:
del lock_table[query_req]

if use_metrics:
metrics.edgeql_query_compilations.inc(
Expand Down
5 changes: 5 additions & 0 deletions edb/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def __init__(
self._system_compile_cache = lru.LRUMapping(
maxsize=defines._MAX_QUERIES_CACHE
)
self._system_compile_cache_locks: dict[Any, Any] = {}

self._listen_sockets = listen_sockets
if listen_sockets:
Expand Down Expand Up @@ -444,6 +445,10 @@ def request_stop_fe_conns(self, dbname: str) -> None:
if conn.dbname == dbname:
conn.request_stop()

@property
def system_compile_cache_locks(self):
return self._system_compile_cache_locks

def _idle_gc_collector(self):
try:
self._idle_gc_handler = None
Expand Down