diff --git a/edb/common/lru.py b/edb/common/lru.py
index 89963fbd9d8..4b12f9d0133 100644
--- a/edb/common/lru.py
+++ b/edb/common/lru.py
@@ -22,7 +22,7 @@
 import collections.abc
 
 
-class LRUMapping(collections.abc.MutableMapping):
+class ManualLRUMapping(collections.abc.MutableMapping):
 
     # We use an OrderedDict for LRU implementation.  Operations:
     #
@@ -61,8 +61,10 @@ def __setitem__(self, key, o):
             self._dict.move_to_end(key, last=True)
         else:
             self._dict[key] = o
-            if len(self._dict) > self._maxsize:
-                self._dict.popitem(last=False)
+
+    def gc(self):
+        while len(self._dict) > self._maxsize:
+            yield self._dict.popitem(last=False)
 
     def __delitem__(self, key):
         del self._dict[key]
@@ -75,3 +77,14 @@ def __len__(self):
 
     def __iter__(self):
         return iter(self._dict)
+
+
+class LRUMapping(ManualLRUMapping):
+    def __setitem__(self, key, o):
+        if key in self._dict:
+            self._dict[key] = o
+            self._dict.move_to_end(key, last=True)
+        else:
+            self._dict[key] = o
+            if len(self._dict) > self._maxsize:
+                self._dict.popitem(last=False)
diff --git a/edb/pgsql/metaschema.py b/edb/pgsql/metaschema.py
index 2d1af9ad889..a422fa851aa 100644
--- a/edb/pgsql/metaschema.py
+++ b/edb/pgsql/metaschema.py
@@ -169,16 +169,43 @@ def __init__(self) -> None:
             dbops.Column(name='schema_version', type='uuid', required=True),
             dbops.Column(name='input', type='bytea', required=True),
             dbops.Column(name='output', type='bytea', required=True),
+            dbops.Column(name='evict', type='text', required=True),
         ])
         self.add_constraint(
             dbops.PrimaryKey(
                 table_name=('edgedb', '_query_cache'),
-                columns=['key', 'schema_version'],
+                columns=['key'],
             ),
         )
 
 
+class EvictQueryCacheFunction(dbops.Function):
+
+    text = f'''
+        DECLARE
+            evict_sql text;
+        BEGIN
+            DELETE FROM "edgedb"."_query_cache"
+            WHERE "key" = cache_key
+            RETURNING "evict" INTO evict_sql;
+            IF evict_sql IS NOT NULL THEN
+                EXECUTE evict_sql;
+            END IF;
+        END;
+    '''
+
+    def __init__(self) -> None:
+        super().__init__(
+            name=('edgedb', '_evict_query_cache'),
+            args=[("cache_key", ("uuid",))],
+            returns=("void",),
+            language='plpgsql',
+            volatility='volatile',
+            text=self.text,
+        )
+
+
 class BigintDomain(dbops.Domain):
     """Bigint: a variant of numeric that enforces zero digits after the dot.
@@ -4409,6 +4436,7 @@ async def bootstrap(
         dbops.CreateTable(DMLDummyTable()),
         dbops.CreateTable(QueryCacheTable()),
         dbops.Query(DMLDummyTable.SETUP_QUERY),
+        dbops.CreateFunction(EvictQueryCacheFunction()),
         dbops.CreateFunction(UuidGenerateV1mcFunction('edgedbext')),
         dbops.CreateFunction(UuidGenerateV4Function('edgedbext')),
         dbops.CreateFunction(UuidGenerateV5Function('edgedbext')),
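
Note on the `lru.py` split above: `ManualLRUMapping` no longer evicts on insert; the caller is expected to drain `gc()` and handle the evicted pairs itself, while the `LRUMapping` subclass restores the old auto-evicting behavior for callers that don't care. A minimal usage sketch of that contract (keys, values, and `maxsize` here are made up):

```python
# Minimal sketch of the ManualLRUMapping contract: inserting past capacity
# no longer drops entries implicitly; the caller drains gc(), which yields
# the evicted (key, value) pairs oldest-first.
from edb.common import lru

cache = lru.ManualLRUMapping(maxsize=2)
cache['a'] = 1
cache['b'] = 2
cache['c'] = 3                 # over capacity, but nothing is dropped yet

evicted = list(cache.gc())     # [('a', 1)] -- least recently used first
assert len(cache) == 2
# The caller can now mirror the eviction elsewhere, e.g. schedule the
# matching deletes against the persistent _query_cache table.
```
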
diff --git a/edb/server/dbview/dbview.pxd b/edb/server/dbview/dbview.pxd
index 066aa40813d..b3f2590e9bb 100644
--- a/edb/server/dbview/dbview.pxd
+++ b/edb/server/dbview/dbview.pxd
@@ -70,7 +70,7 @@ cdef class DatabaseIndex:
 
 cdef class Database:
     cdef:
-        object _eql_to_compiled
+        object _query_cache
         object _sql_to_compiled
         DatabaseIndex _index
         object _views
diff --git a/edb/server/dbview/dbview.pyx b/edb/server/dbview/dbview.pyx
index fcca9895dd5..309c727b274 100644
--- a/edb/server/dbview/dbview.pyx
+++ b/edb/server/dbview/dbview.pyx
@@ -28,6 +28,7 @@ import pickle
 import struct
 import time
 import typing
+import uuid
 import weakref
 
 import immutables
@@ -100,8 +101,8 @@ cdef class CompiledQuery:
 
 cdef class Database:
 
-    # Global LRU cache of compiled anonymous queries
-    _eql_to_compiled: typing.Mapping[str, dbstate.QueryUnitGroup]
+    # Global LRU cache of compiled queries
+    _query_cache: lru.ManualLRUMapping[uuid.UUID, dbstate.QueryUnitGroup]
 
     def __init__(
         self,
@@ -126,8 +127,8 @@ cdef class Database:
 
         self._introspection_lock = asyncio.Lock()
 
-        self._eql_to_compiled = lru.LRUMapping(
-            maxsize=defines._MAX_QUERIES_CACHE)
+        self._query_cache = lru.ManualLRUMapping(
+            maxsize=defines._MAX_QUERIES_CACHE_DB)
         self._sql_to_compiled = lru.LRUMapping(
             maxsize=defines._MAX_QUERIES_CACHE)
 
@@ -181,7 +182,6 @@ cdef class Database:
         self.backend_ids.update(new_types)
 
     cdef _invalidate_caches(self):
-        self._eql_to_compiled.clear()
         self._sql_to_compiled.clear()
         self._index.invalidate_caches()
 
@@ -190,12 +190,18 @@
     ):
         assert compiled.cacheable
 
-        existing, existing_ver = self._eql_to_compiled.get(key, DICTDEFAULT)
+        existing, existing_ver = self._query_cache.get(key, DICTDEFAULT)
         if existing is not None and existing_ver == self.schema_version:
             # We already have a cached query for a more recent DB version.
             return
 
-        self._eql_to_compiled[key] = compiled, schema_version
+        self._query_cache[key] = compiled, schema_version
+        keys = [k for k, _ in self._query_cache.gc()]
+        if keys:
+            self.tenant.create_task(
+                self.tenant.evict_query_cache(self.name, keys),
+                interruptable=False,
+            )
 
     def cache_compiled_sql(self, key, compiled: list[str]):
         existing, ver = self._sql_to_compiled.get(key, DICTDEFAULT)
@@ -237,21 +243,26 @@
         return old_serializer
 
     def hydrate_cache(self, query_cache):
+        new = set()
         for cache_key, schema_version, out_data in query_cache:
+            cache_key = uuidgen.from_bytes(cache_key)
             schema_version = uuidgen.from_bytes(schema_version)
-            _, cached_ver = self._sql_to_compiled.get(cache_key, DICTDEFAULT)
+            new.add(cache_key)
+
+            _, cached_ver = self._query_cache.get(cache_key, DICTDEFAULT)
             if cached_ver != schema_version:
-                self._cache_compiled_query(
-                    uuidgen.from_bytes(cache_key),
-                    pickle.loads(out_data),
-                    schema_version,
+                self._query_cache[cache_key] = (
+                    pickle.loads(out_data), schema_version
                 )
+        for cache_key in list(self._query_cache.keys()):
+            if cache_key not in new:
+                self._query_cache.pop(cache_key)
 
     def iter_views(self):
         yield from self._views
 
     def get_query_cache_size(self):
-        return len(self._eql_to_compiled) + len(self._sql_to_compiled)
+        return len(self._query_cache) + len(self._sql_to_compiled)
@@ -705,7 +716,7 @@ cdef class DatabaseConnectionView:
                 self._in_tx_with_ddl):
             return None
 
-        query_unit_group, qu_ver = self._db._eql_to_compiled.get(
+        query_unit_group, qu_ver = self._db._query_cache.get(
            key, DICTDEFAULT)
         if query_unit_group is not None and qu_ver != self._db.schema_version:
             query_unit_group = None
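
For clarity, the reshaped `hydrate_cache` treats the persisted rows as the source of truth: entries are upserted per schema version, and any in-memory key absent from the snapshot is dropped. A plain-dict analogue of that logic (hypothetical helper, not the actual method; `cache` stands in for `Database._query_cache`):

```python
# Plain-dict analogue of the hydrate_cache() logic above; `rows` stands in
# for the decoded (key, schema_version, unit_group) tuples from the backend.
import uuid

def hydrate(cache: dict, rows: list[tuple[uuid.UUID, uuid.UUID, object]]):
    seen = set()
    for key, schema_version, unit_group in rows:
        seen.add(key)
        cached = cache.get(key)
        # Reuse the in-memory entry only if it matches this schema version.
        if cached is None or cached[1] != schema_version:
            cache[key] = (unit_group, schema_version)
    # The snapshot is authoritative: drop anything it does not contain.
    for key in list(cache):
        if key not in seen:
            del cache[key]
```
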
diff --git a/edb/server/defines.py b/edb/server/defines.py
index b1b232fbf63..00461c9666d 100644
--- a/edb/server/defines.py
+++ b/edb/server/defines.py
@@ -53,6 +53,7 @@
 BACKEND_COMPILER_TEMPLATE_PROC_RESTART_INTERVAL = 1
 
 _MAX_QUERIES_CACHE = 1000
+_MAX_QUERIES_CACHE_DB = 10_000
 
 _QUERY_ROLLING_AVG_LEN = 10
 _QUERIES_ROLLING_AVG_LEN = 300
diff --git a/edb/server/protocol/execute.pyx b/edb/server/protocol/execute.pyx
index a14e5471021..ab4ec9a2877 100644
--- a/edb/server/protocol/execute.pyx
+++ b/edb/server/protocol/execute.pyx
@@ -68,15 +68,16 @@ async def persist_cache(
     await be_conn.sql_execute((evict, persist))
     await be_conn.sql_fetch(
         b'INSERT INTO "edgedb"."_query_cache" '
-        b'(key, schema_version, input, output) '
-        b'VALUES ($1, $2, $3, $4) '
-        b'ON CONFLICT (key, schema_version) DO UPDATE SET '
-        b'input=$3, output=$4',
+        b'("key", "schema_version", "input", "output", "evict") '
+        b'VALUES ($1, $2, $3, $4, $5) '
+        b'ON CONFLICT (key) DO UPDATE SET '
+        b'"schema_version"=$2, "input"=$3, "output"=$4, "evict"=$5',
         args=(
             compiled.request.get_cache_key().bytes,
             dbv.schema_version.bytes,
             compiled.request.serialize(),
             compiled.serialized,
+            evict,
         ),
         use_prep_stmt=True,
     )
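
A hypothetical standalone reproduction of the persist/evict round-trip, using asyncpg in place of EdgeDB's internal pgcon. Only the table name, column list, and function name come from the migration above; the helper names and everything else are assumptions for illustration:

```python
# Sketch only: asyncpg stands in for the server's backend connection.
import uuid
import asyncpg

async def persist(conn: asyncpg.Connection, key: uuid.UUID,
                  schema_version: uuid.UUID, input_: bytes,
                  output: bytes, evict_sql: str) -> None:
    # The cache is now keyed on "key" alone, so recompiling under a newer
    # schema version overwrites the old row instead of accumulating one
    # row per (key, schema_version) pair.
    await conn.execute(
        'INSERT INTO "edgedb"."_query_cache" '
        '("key", "schema_version", "input", "output", "evict") '
        'VALUES ($1, $2, $3, $4, $5) '
        'ON CONFLICT ("key") DO UPDATE SET '
        '"schema_version"=$2, "input"=$3, "output"=$4, "evict"=$5',
        key, schema_version, input_, output, evict_sql)

async def evict(conn: asyncpg.Connection, key: uuid.UUID) -> None:
    # Deletes the row and EXECUTEs its stored "evict" SQL (e.g. dropping
    # the query's compiled server-side artifacts) in one PL/pgSQL call.
    await conn.execute('SELECT "edgedb"."_evict_query_cache"($1)', key)
```
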
diff --git a/edb/server/tenant.py b/edb/server/tenant.py
index 8e711b3c70d..18d3429c0b1 100644
--- a/edb/server/tenant.py
+++ b/edb/server/tenant.py
@@ -29,6 +29,7 @@
 import struct
 import sys
 import time
+import uuid
 
 import immutables
@@ -1402,6 +1403,33 @@ async def _load_query_cache(
             use_prep_stmt=True,
         )
 
+    async def evict_query_cache(
+        self, dbname: str,
+        keys: Iterable[uuid.UUID],
+    ) -> None:
+        try:
+            conn = await self._acquire_intro_pgcon(dbname)
+            if not conn:
+                return
+
+            try:
+                for key in keys:
+                    await conn.sql_fetch(
+                        b'SELECT "edgedb"."_evict_query_cache"($1)',
+                        args=(key.bytes,),
+                        use_prep_stmt=True,
+                    )
+            finally:
+                self.release_pgcon(dbname, conn)
+
+            await self.signal_sysevent("query-cache-changes", dbname=dbname)
+
+        except Exception:
+            logger.exception("error in evict_query_cache():")
+            metrics.background_errors.inc(
+                1.0, self._instance_name, "evict_query_cache"
+            )
+
     def on_remote_query_cache_change(self, dbname: str) -> None:
         if not self._accept_new_tasks:
             return
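
Taken together, the eviction path detects in-memory overflow synchronously on insert but pushes the backend cleanup into a best-effort background task, so the compile hot path never waits on Postgres. A condensed, framework-free rendition of that flow (all names here are stand-ins mirroring the diff, not the actual `Database`/`Tenant` API):

```python
# Sketch of the end-to-end eviction flow wired up across lru.py,
# dbview.pyx, and tenant.py above.  Connection handling is assumed.
import asyncio
import logging
import uuid

logger = logging.getLogger(__name__)

def cache_compiled_query(db, key: uuid.UUID, compiled, schema_version):
    db.query_cache[key] = (compiled, schema_version)
    # Overflow is drained synchronously on insert...
    evicted = [k for k, _ in db.query_cache.gc()]
    if evicted:
        # ...but the backend deletes run as a fire-and-forget task.
        asyncio.create_task(evict_from_backend(db, evicted))

async def evict_from_backend(db, keys: list[uuid.UUID]) -> None:
    try:
        async with db.acquire_pgcon() as conn:   # stand-in for pgcon
            for key in keys:
                await conn.execute(
                    'SELECT "edgedb"."_evict_query_cache"($1)', key)
    except Exception:
        # Best-effort background work: log and count, never propagate.
        logger.exception("error evicting persisted query cache")
```
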