Skip to content

Commit

Permalink
Evict query cache
Browse files Browse the repository at this point in the history
  • Loading branch information
fantix committed Jan 31, 2024
1 parent f63df4d commit 064d99f
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 23 deletions.
19 changes: 16 additions & 3 deletions edb/common/lru.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import collections.abc


class LRUMapping(collections.abc.MutableMapping):
class ManualLRUMapping(collections.abc.MutableMapping):

# We use an OrderedDict for LRU implementation. Operations:
#
Expand Down Expand Up @@ -61,8 +61,10 @@ def __setitem__(self, key, o):
self._dict.move_to_end(key, last=True)
else:
self._dict[key] = o
if len(self._dict) > self._maxsize:
self._dict.popitem(last=False)

def gc(self):
while len(self._dict) > self._maxsize:
yield self._dict.popitem(last=False)

def __delitem__(self, key):
del self._dict[key]
Expand All @@ -75,3 +77,14 @@ def __len__(self):

def __iter__(self):
return iter(self._dict)


class LRUMapping(ManualLRUMapping):
    """LRU mapping that evicts the oldest entry automatically on insert.

    Unlike ManualLRUMapping (where the caller drains ``gc()`` to evict),
    this variant keeps itself within ``_maxsize`` on every ``__setitem__``.
    """

    def __setitem__(self, key, o):
        # Note: membership must be checked before the store so we can tell
        # a refresh (move to most-recently-used) from a fresh insertion
        # (which may push us over the size cap).
        refreshed = key in self._dict
        self._dict[key] = o
        if refreshed:
            self._dict.move_to_end(key, last=True)
        elif len(self._dict) > self._maxsize:
            # Drop the least-recently-used entry (front of the OrderedDict).
            self._dict.popitem(last=False)
30 changes: 29 additions & 1 deletion edb/pgsql/metaschema.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,43 @@ def __init__(self) -> None:
dbops.Column(name='schema_version', type='uuid', required=True),
dbops.Column(name='input', type='bytea', required=True),
dbops.Column(name='output', type='bytea', required=True),
dbops.Column(name='evict', type='text', required=True),
])

self.add_constraint(
dbops.PrimaryKey(
table_name=('edgedb', '_query_cache'),
columns=['key', 'schema_version'],
columns=['key'],
),
)


class EvictQueryCacheFunction(dbops.Function):
    """PL/pgSQL function that removes one query-cache entry by key.

    Deletes the row from ``edgedb._query_cache`` and, if the row carried an
    ``evict`` SQL snippet (e.g. dropping a prepared statement/function that
    materialized the cached query), executes that snippet dynamically.
    """

    # NOTE: was an f-string with no placeholders (ruff F541); a plain
    # literal is identical byte-for-byte and avoids accidental brace
    # interpolation if the SQL is ever edited.
    text = '''
        DECLARE
            evict_sql text;
        BEGIN
            DELETE FROM "edgedb"."_query_cache"
                WHERE "key" = cache_key
                RETURNING "evict" INTO evict_sql;
            IF evict_sql IS NOT NULL THEN
                EXECUTE evict_sql;
            END IF;
        END;
    '''

    def __init__(self) -> None:
        super().__init__(
            name=('edgedb', '_evict_query_cache'),
            args=[("cache_key", ("uuid",))],
            returns=("void",),
            # volatile: the function deletes rows and runs dynamic SQL.
            language='plpgsql',
            volatility='volatile',
            text=self.text,
        )


class BigintDomain(dbops.Domain):
"""Bigint: a variant of numeric that enforces zero digits after the dot.
Expand Down Expand Up @@ -4409,6 +4436,7 @@ async def bootstrap(
dbops.CreateTable(DMLDummyTable()),
dbops.CreateTable(QueryCacheTable()),
dbops.Query(DMLDummyTable.SETUP_QUERY),
dbops.CreateFunction(EvictQueryCacheFunction()),
dbops.CreateFunction(UuidGenerateV1mcFunction('edgedbext')),
dbops.CreateFunction(UuidGenerateV4Function('edgedbext')),
dbops.CreateFunction(UuidGenerateV5Function('edgedbext')),
Expand Down
2 changes: 1 addition & 1 deletion edb/server/dbview/dbview.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ cdef class DatabaseIndex:
cdef class Database:

cdef:
object _eql_to_compiled
object _query_cache
object _sql_to_compiled
DatabaseIndex _index
object _views
Expand Down
39 changes: 25 additions & 14 deletions edb/server/dbview/dbview.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import pickle
import struct
import time
import typing
import uuid
import weakref

import immutables
Expand Down Expand Up @@ -100,8 +101,8 @@ cdef class CompiledQuery:

cdef class Database:

# Global LRU cache of compiled anonymous queries
_eql_to_compiled: typing.Mapping[str, dbstate.QueryUnitGroup]
# Global LRU cache of compiled queries
_query_cache: lru.ManualLRUMapping[uuid.UUID, dbstate.QueryUnitGroup]

def __init__(
self,
Expand All @@ -126,8 +127,8 @@ cdef class Database:

self._introspection_lock = asyncio.Lock()

self._eql_to_compiled = lru.LRUMapping(
maxsize=defines._MAX_QUERIES_CACHE)
self._query_cache = lru.ManualLRUMapping(
maxsize=defines._MAX_QUERIES_CACHE_DB)
self._sql_to_compiled = lru.LRUMapping(
maxsize=defines._MAX_QUERIES_CACHE)

Expand Down Expand Up @@ -181,7 +182,6 @@ cdef class Database:
self.backend_ids.update(new_types)

cdef _invalidate_caches(self):
self._eql_to_compiled.clear()
self._sql_to_compiled.clear()
self._index.invalidate_caches()

Expand All @@ -190,12 +190,18 @@ cdef class Database:
):
assert compiled.cacheable

existing, existing_ver = self._eql_to_compiled.get(key, DICTDEFAULT)
existing, existing_ver = self._query_cache.get(key, DICTDEFAULT)
if existing is not None and existing_ver == self.schema_version:
# We already have a cached query for a more recent DB version.
return

self._eql_to_compiled[key] = compiled, schema_version
self._query_cache[key] = compiled, schema_version
keys = [k for k, _ in self._query_cache.gc()]
if keys:
self.tenant.create_task(
self.tenant.evict_query_cache(self.name, keys),
interruptable=False,
)

def cache_compiled_sql(self, key, compiled: list[str]):
existing, ver = self._sql_to_compiled.get(key, DICTDEFAULT)
Expand Down Expand Up @@ -237,21 +243,26 @@ cdef class Database:
return old_serializer

def hydrate_cache(self, query_cache):
new = set()
for cache_key, schema_version, out_data in query_cache:
cache_key = uuidgen.from_bytes(cache_key)
schema_version = uuidgen.from_bytes(schema_version)
_, cached_ver = self._sql_to_compiled.get(cache_key, DICTDEFAULT)
new.add(cache_key)

_, cached_ver = self._query_cache.get(cache_key, DICTDEFAULT)
if cached_ver != schema_version:
self._cache_compiled_query(
uuidgen.from_bytes(cache_key),
pickle.loads(out_data),
schema_version,
self._query_cache[cache_key] = (
pickle.loads(out_data), schema_version
)
for cache_key in list(self._query_cache.keys()):
if cache_key not in new:
self._query_cache.pop(cache_key)

def iter_views(self):
yield from self._views

def get_query_cache_size(self):
return len(self._eql_to_compiled) + len(self._sql_to_compiled)
return len(self._query_cache) + len(self._sql_to_compiled)

async def introspection(self):
if self.user_schema_pickle is None:
Expand Down Expand Up @@ -705,7 +716,7 @@ cdef class DatabaseConnectionView:
self._in_tx_with_ddl):
return None

query_unit_group, qu_ver = self._db._eql_to_compiled.get(
query_unit_group, qu_ver = self._db._query_cache.get(
key, DICTDEFAULT)
if query_unit_group is not None and qu_ver != self._db.schema_version:
query_unit_group = None
Expand Down
1 change: 1 addition & 0 deletions edb/server/defines.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
BACKEND_COMPILER_TEMPLATE_PROC_RESTART_INTERVAL = 1

_MAX_QUERIES_CACHE = 1000
_MAX_QUERIES_CACHE_DB = 10_000

_QUERY_ROLLING_AVG_LEN = 10
_QUERIES_ROLLING_AVG_LEN = 300
Expand Down
9 changes: 5 additions & 4 deletions edb/server/protocol/execute.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,16 @@ async def persist_cache(
await be_conn.sql_execute((evict, persist))
await be_conn.sql_fetch(
b'INSERT INTO "edgedb"."_query_cache" '
b'(key, schema_version, input, output) '
b'VALUES ($1, $2, $3, $4) '
b'ON CONFLICT (key, schema_version) DO UPDATE SET '
b'input=$3, output=$4',
b'("key", "schema_version", "input", "output", "evict") '
b'VALUES ($1, $2, $3, $4, $5) '
b'ON CONFLICT (key) DO UPDATE SET '
b'"schema_version"=$2, "input"=$3, "output"=$4, "evict"=$5',
args=(
compiled.request.get_cache_key().bytes,
dbv.schema_version.bytes,
compiled.request.serialize(),
compiled.serialized,
evict,
),
use_prep_stmt=True,
)
Expand Down
28 changes: 28 additions & 0 deletions edb/server/tenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import struct
import sys
import time
import uuid

import immutables

Expand Down Expand Up @@ -1402,6 +1403,33 @@ async def _load_query_cache(
use_prep_stmt=True,
)

async def evict_query_cache(
    self, dbname: str,
    keys: Iterable[uuid.UUID],
) -> None:
    """Best-effort eviction of persisted query-cache entries.

    For each key, calls the backend ``edgedb._evict_query_cache`` function
    in *dbname*, then signals other nodes that the cache changed.  Any
    failure is logged and counted, never raised (background task).
    """
    evict_sql = b'SELECT "edgedb"."_evict_query_cache"($1)'
    try:
        conn = await self._acquire_intro_pgcon(dbname)
        if not conn:
            # Could not get a backend connection (e.g. tenant shutting
            # down); silently give up — eviction is best-effort.
            return

        try:
            for key in keys:
                await conn.sql_fetch(
                    evict_sql,
                    args=(key.bytes,),
                    use_prep_stmt=True,
                )
        finally:
            # Always return the connection, even if a statement failed.
            self.release_pgcon(dbname, conn)

        # Tell peer servers to refresh their in-memory caches.
        await self.signal_sysevent("query-cache-changes", dbname=dbname)

    except Exception:
        logger.exception("error in evict_query_cache():")
        metrics.background_errors.inc(
            1.0, self._instance_name, "evict_query_cache"
        )

def on_remote_query_cache_change(self, dbname: str) -> None:
if not self._accept_new_tasks:
return
Expand Down

0 comments on commit 064d99f

Please sign in to comment.