diff --git a/edb/server/dbview/dbview.pxd b/edb/server/dbview/dbview.pxd
index 8186dab8060..96f2050e410 100644
--- a/edb/server/dbview/dbview.pxd
+++ b/edb/server/dbview/dbview.pxd
@@ -63,6 +63,7 @@ cdef class Database:

     cdef:
         stmt_cache.StatementsCache _eql_to_compiled
+        object _cache_locks
         object _sql_to_compiled
         DatabaseIndex _index
         object _views
diff --git a/edb/server/dbview/dbview.pyx b/edb/server/dbview/dbview.pyx
index bc7c6661e41..d971406ffd6 100644
--- a/edb/server/dbview/dbview.pyx
+++ b/edb/server/dbview/dbview.pyx
@@ -124,6 +124,7 @@ cdef class Database:

         self._eql_to_compiled = stmt_cache.StatementsCache(
             maxsize=defines._MAX_QUERIES_CACHE_DB)
+        self._cache_locks = {}
         self._sql_to_compiled = lru.LRUMapping(
             maxsize=defines._MAX_QUERIES_CACHE)

@@ -1049,59 +1050,93 @@ cdef class DatabaseConnectionView:
                 )
             elif query_unit_group is None:
                 query_unit_group = self.lookup_compiled_query(query_req)
+
+        lock = None
         cached = True
+
         if query_unit_group is None:
-            # Cache miss; need to compile this query.
-            cached = False
-            # Remember the schema version we are compiling on, so that we can
-            # cache the result with the matching version. In case of concurrent
-            # schema update, we're only storing an outdated cache entry, and
-            # the next identical query could get recompiled on the new schema.
-            schema_version = self.schema_version
+            # Lock on the query compilation to avoid other coroutines running
+            # the same compile and waste computational resources
+            if cached_globally:
+                lock_table = self.server.system_compile_cache_locks
+            else:
+                lock_table = self._db._cache_locks
+            lock = lock_table.get(query_req)
+            if lock is None:
+                lock = asyncio.Lock()
+                lock_table[query_req] = lock
+            await lock.acquire()

-            try:
-                query_unit_group = await self._compile(query_req)
-            except (errors.EdgeQLSyntaxError, errors.InternalServerError):
-                raise
-            except errors.EdgeDBError:
-                if self.in_tx_error():
-                    # Because we are in an error state it is more reasonable
-                    # to fail with TransactionError("commands ignored")
-                    # rather than with a potentially more cryptic error.
-                    # An exception from this rule are syntax errors and
-                    # ISEs, because these could arise while the user is
-                    # trying to properly rollback this failed transaction.
-                    self.raise_in_tx_error()
+        try:
+            # Check the cache again with the lock acquired
+            if query_unit_group is None and self._query_cache_enabled:
+                if cached_globally:
+                    query_unit_group = (
+                        self.server.system_compile_cache.get(query_req)
+                    )
                 else:
-                    raise
+                    query_unit_group = self.lookup_compiled_query(query_req)
+
+            if query_unit_group is None:
+                # Cache miss; need to compile this query.
+                cached = False
+                # Remember the schema version we are compiling on, so that we
+                # can cache the result with the matching version. In case of
+                # concurrent schema update, we're only storing an outdated
+                # cache entry, and the next identical query could get
+                # recompiled on the new schema.
+                schema_version = self.schema_version

-        self.check_capabilities(
-            query_unit_group.capabilities,
-            allow_capabilities,
-            errors.DisabledCapabilityError,
-            "disabled by the client",
-        )
+                try:
+                    query_unit_group = await self._compile(query_req)
+                except (errors.EdgeQLSyntaxError, errors.InternalServerError):
+                    raise
+                except errors.EdgeDBError:
+                    if self.in_tx_error():
+                        # Because we are in an error state it's more reasonable
+                        # to fail with TransactionError("commands ignored")
+                        # rather than with a potentially more cryptic error.
+                        # An exception from this rule are syntax errors and
+                        # ISEs, because these could arise while the user is
+                        # trying to properly rollback this failed transaction.
+                        self.raise_in_tx_error()
+                    else:
+                        raise
+
+            self.check_capabilities(
+                query_unit_group.capabilities,
+                allow_capabilities,
+                errors.DisabledCapabilityError,
+                "disabled by the client",
+            )

-        if self.in_tx_error():
-            # The current transaction is aborted, so we must fail
-            # all commands except ROLLBACK or ROLLBACK TO SAVEPOINT.
-            first = query_unit_group[0]
-            if (
-                not (
-                    first.tx_rollback
-                    or first.tx_savepoint_rollback
-                    or first.tx_abort_migration
-                ) or len(query_unit_group) > 1
-            ):
-                self.raise_in_tx_error()
+            if self.in_tx_error():
+                # The current transaction is aborted, so we must fail
+                # all commands except ROLLBACK or ROLLBACK TO SAVEPOINT.
+                first = query_unit_group[0]
+                if (
+                    not (
+                        first.tx_rollback
+                        or first.tx_savepoint_rollback
+                        or first.tx_abort_migration
+                    ) or len(query_unit_group) > 1
+                ):
+                    self.raise_in_tx_error()

-        if not cached and query_unit_group.cacheable:
-            if cached_globally:
-                self.server.system_compile_cache[query_req] = query_unit_group
-            else:
-                self.cache_compiled_query(
-                    query_req, query_unit_group, schema_version
-                )
+            if not cached and query_unit_group.cacheable:
+                if cached_globally:
+                    self.server.system_compile_cache[query_req] = (
+                        query_unit_group
+                    )
+                else:
+                    self.cache_compiled_query(
+                        query_req, query_unit_group, schema_version
+                    )
+        finally:
+            if lock is not None:
+                lock.release()
+                if not lock._waiters:
+                    del lock_table[query_req]

         if use_metrics:
             metrics.edgeql_query_compilations.inc(
diff --git a/edb/server/server.py b/edb/server/server.py
index 2b45cbc163a..452c8df5b82 100644
--- a/edb/server/server.py
+++ b/edb/server/server.py
@@ -187,6 +187,7 @@ def __init__(
         self._system_compile_cache = lru.LRUMapping(
             maxsize=defines._MAX_QUERIES_CACHE
         )
+        self._system_compile_cache_locks: dict[Any, Any] = {}

         self._listen_sockets = listen_sockets
         if listen_sockets:
@@ -444,6 +445,10 @@ def request_stop_fe_conns(self, dbname: str) -> None:
             if conn.dbname == dbname:
                 conn.request_stop()

+    @property
+    def system_compile_cache_locks(self):
+        return self._system_compile_cache_locks
+
     def _idle_gc_collector(self):
         try:
             self._idle_gc_handler = None
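Note: the pattern in the dbview.pyx hunk is double-checked caching keyed by the compilation request: take (or lazily create) a per-key asyncio.Lock, re-check the cache under the lock, compile only on a genuine miss, and drop the lock entry once no other coroutine is waiting on it. The sketch below illustrates that pattern in isolation; CompileCache, compile_fn, and the plain dicts are illustrative stand-ins, not the server's actual classes, and the _waiters check mirrors the private asyncio.Lock attribute the diff itself relies on.

import asyncio


class CompileCache:
    # Minimal sketch of the per-key locking used in the diff above.
    # CompileCache, compile_fn, _cache, and _locks are illustrative
    # names only, not the server's actual API.

    def __init__(self, compile_fn):
        self._compile_fn = compile_fn  # async callable: key -> compiled result
        self._cache = {}               # key -> compiled result
        self._locks = {}               # key -> asyncio.Lock, created lazily

    async def get(self, key):
        # Fast path: a cache hit needs no lock at all.
        result = self._cache.get(key)
        if result is not None:
            return result

        # One lock per key: identical concurrent requests serialize,
        # while different keys still compile in parallel.
        lock = self._locks.get(key)
        if lock is None:
            lock = asyncio.Lock()
            self._locks[key] = lock
        await lock.acquire()
        try:
            # Double-check under the lock: another coroutine may have
            # populated the cache while we were waiting.
            result = self._cache.get(key)
            if result is None:
                result = await self._compile_fn(key)
                self._cache[key] = result
            return result
        finally:
            lock.release()
            # Drop the lock entry once nobody is waiting on it, so the lock
            # table does not grow without bound (same _waiters check as in
            # the diff; it reads an asyncio.Lock internal attribute).
            if not lock._waiters:
                del self._locks[key]


async def main():
    async def slow_compile(key):
        await asyncio.sleep(0.1)  # stand-in for the real compiler call
        return f"compiled:{key}"

    cache = CompileCache(slow_compile)
    # Ten identical concurrent requests trigger exactly one compilation.
    results = await asyncio.gather(*(cache.get("SELECT 1") for _ in range(10)))
    assert len(set(results)) == 1


asyncio.run(main())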