Cache by schema version (#6847)
* Update dbview with user schema version
* Use schema_version for compile cache
* pg_ext: cache with the matching schema version
fantix authored Feb 15, 2024
1 parent 7812f2e commit d258eb2
Showing 6 changed files with 88 additions and 21 deletions.
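
The core idea of the change: the per-database compile cache is now keyed by the user-schema version (a UUID read from the `__schema_version__` schema global) rather than by the `dbver` counter. The following is a minimal, hypothetical sketch of that pattern for orientation only — `CompileCache`, `store`, and `lookup` are illustrative names, not server APIs; the real logic lives in `Database._cache_compiled_query()` / `lookup_compiled_query()` in the diff below.

import uuid
from typing import Any, Optional


class CompileCache:
    """Hypothetical illustration of a schema-version-keyed compile cache."""

    def __init__(self) -> None:
        # query key -> (compiled unit, schema version it was compiled against)
        self._cache: dict[Any, tuple[Any, uuid.UUID]] = {}
        # Reassigned whenever the user schema changes (e.g. after DDL).
        self.schema_version: uuid.UUID = uuid.uuid4()

    def store(self, key: Any, compiled: Any, compiled_at: uuid.UUID) -> None:
        existing = self._cache.get(key)
        if existing is not None and existing[1] == self.schema_version:
            # An entry compiled against the current schema already exists;
            # don't overwrite it with one compiled against an older schema.
            return
        self._cache[key] = (compiled, compiled_at)

    def lookup(self, key: Any) -> Optional[Any]:
        entry = self._cache.get(key)
        if entry is None:
            return None
        compiled, version = entry
        # A version mismatch means the schema changed after compilation.
        return compiled if version == self.schema_version else None

Keying by a schema-version UUID rather than `dbver` suggests that a cache entry stays valid as long as the user schema content itself is unchanged, even if `dbver` is bumped for other reasons.
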
16 changes: 16 additions & 0 deletions edb/server/compiler/compiler.py
@@ -65,6 +65,7 @@
from edb.schema import roles as s_role
from edb.schema import schema as s_schema
from edb.schema import types as s_types
from edb.schema import version as s_ver

from edb.pgsql import ast as pgast
from edb.pgsql import compiler as pg_compiler
@@ -1002,6 +1003,7 @@ def parse_user_schema_db_config(
)
return dbstate.ParsedDatabase(
user_schema_pickle=pickle.dumps(user_schema, -1),
schema_version=_get_schema_version(user_schema),
database_config=db_config,
ext_config_settings=ext_config_settings,
protocol_version=defines.CURRENT_PROTOCOL,
@@ -1498,6 +1500,11 @@ def _compile_schema_storage_stmt(
ctx.state.current_tx().update_schema(schema)


def _get_schema_version(user_schema: s_schema.Schema) -> uuid.UUID:
ver = user_schema.get_global(s_ver.SchemaVersion, "__schema_version__")
return ver.get_version(user_schema)


def _compile_ql_script(
ctx: CompileContext,
eql: str,
@@ -2459,6 +2466,9 @@ def _try_compile(
if comp.user_schema is not None:
final_user_schema = comp.user_schema
unit.user_schema = pickle.dumps(comp.user_schema, -1)
unit.user_schema_version = (
_get_schema_version(comp.user_schema)
)
unit.extensions, unit.ext_config_settings = (
_extract_extensions(ctx, comp.user_schema)
)
@@ -2484,6 +2494,9 @@
if comp.user_schema is not None:
final_user_schema = comp.user_schema
unit.user_schema = pickle.dumps(comp.user_schema, -1)
unit.user_schema_version = (
_get_schema_version(comp.user_schema)
)
unit.extensions, unit.ext_config_settings = (
_extract_extensions(ctx, comp.user_schema)
)
@@ -2522,6 +2535,9 @@
if comp.user_schema is not None:
final_user_schema = comp.user_schema
unit.user_schema = pickle.dumps(comp.user_schema, -1)
unit.user_schema_version = (
_get_schema_version(comp.user_schema)
)
unit.extensions, unit.ext_config_settings = (
_extract_extensions(ctx, comp.user_schema)
)
2 changes: 2 additions & 0 deletions edb/server/compiler/dbstate.py
@@ -315,6 +315,7 @@ class QueryUnit:
# If present, represents the future schema state after
# the command is run. The schema is pickled.
user_schema: Optional[bytes] = None
user_schema_version: Optional[uuid.UUID] = None
cached_reflection: Optional[bytes] = None
extensions: Optional[set[str]] = None
ext_config_settings: Optional[list[config.Setting]] = None
@@ -475,6 +476,7 @@ class SQLQueryUnit:
@dataclasses.dataclass
class ParsedDatabase:
user_schema_pickle: bytes
schema_version: uuid.UUID
database_config: immutables.Map[str, config.SettingValue]
ext_config_settings: list[config.Setting]

7 changes: 5 additions & 2 deletions edb/server/dbview/dbview.pxd
@@ -84,6 +84,7 @@ cdef class Database:
readonly object user_config_spec

readonly str name
readonly object schema_version
readonly object dbver
readonly object db_config
readonly bytes user_schema_pickle
@@ -94,13 +95,14 @@
cdef schedule_config_update(self)

cdef _invalidate_caches(self)
cdef _cache_compiled_query(self, key, compiled, int dbver)
cdef _cache_compiled_query(self, key, compiled, schema_version)
cdef _new_view(self, query_cache, protocol_version)
cdef _remove_view(self, view)
cdef _update_backend_ids(self, new_types)
cdef _set_and_signal_new_user_schema(
self,
new_schema_pickle,
schema_version,
extensions,
ext_config_settings,
reflection_cache=?,
@@ -142,6 +144,7 @@ cdef class DatabaseConnectionView:
object _in_tx_db_config
object _in_tx_savepoints
object _in_tx_user_schema_pickle
object _in_tx_user_schema_version
object _in_tx_user_config_spec
object _in_tx_global_schema_pickle
object _in_tx_new_types
@@ -172,7 +175,7 @@ cdef class DatabaseConnectionView:
cpdef in_tx_error(self)

cdef cache_compiled_query(
self, object key, object query_unit_group, int dbver
self, object key, object query_unit_group, schema_version
)
cdef lookup_compiled_query(self, object key)

75 changes: 57 additions & 18 deletions edb/server/dbview/dbview.pyx
@@ -156,6 +156,7 @@ cdef class Database:
str name,
*,
bytes user_schema_pickle,
object schema_version,
object db_config,
object reflection_cache,
object backend_ids,
@@ -164,6 +165,7 @@
):
self.name = name

self.schema_version = schema_version
self.dbver = next_dbver()

self._index = index
@@ -199,6 +201,7 @@
cdef _set_and_signal_new_user_schema(
self,
new_schema_pickle,
schema_version,
extensions,
ext_config_settings,
reflection_cache=None,
@@ -208,6 +211,7 @@
if new_schema_pickle is None:
raise AssertionError('new_schema is not supposed to be None')

self.schema_version = schema_version
self.dbver = next_dbver()

self.user_schema_pickle = new_schema_pickle
@@ -231,29 +235,31 @@
self._index.invalidate_caches()

cdef _cache_compiled_query(
self, key, compiled: dbstate.QueryUnitGroup, int dbver
self, key, compiled: dbstate.QueryUnitGroup, schema_version
):
# `schema_version` must be the version of the schema `compiled` was compiled against
assert compiled.cacheable

existing, existing_dbver = self._eql_to_compiled.get(key, DICTDEFAULT)
if existing is not None and existing_dbver == self.dbver:
existing, existing_ver = self._eql_to_compiled.get(key, DICTDEFAULT)
if existing is not None and existing_ver == self.schema_version:
# The cache already holds an entry compiled against the current schema version; keep it.
return

self._eql_to_compiled[key] = compiled, dbver
# Store the schema version this query was compiled against; see also the comment where schema_version is captured before compilation.
self._eql_to_compiled[key] = compiled, schema_version

def cache_compiled_sql(self, key, compiled: list[str]):
existing, dbver = self._sql_to_compiled.get(key, DICTDEFAULT)
if existing is not None and dbver == self.dbver:
def cache_compiled_sql(self, key, compiled: list[str], schema_version):
existing, ver = self._sql_to_compiled.get(key, DICTDEFAULT)
if existing is not None and ver == self.schema_version:
# The cache already holds an entry compiled against the current schema version; keep it.
return

self._sql_to_compiled[key] = compiled, self.dbver
# Store the schema version this query was compiled against; see also the comment where schema_version is captured before compilation.
self._sql_to_compiled[key] = compiled, schema_version

def lookup_compiled_sql(self, key):
rv, cached_dbver = self._sql_to_compiled.get(key, DICTDEFAULT)
if rv is not None and cached_dbver != self.dbver:
rv, cached_ver = self._sql_to_compiled.get(key, DICTDEFAULT)
if rv is not None and cached_ver != self.schema_version:
rv = None
return rv

@@ -341,6 +347,7 @@ cdef class DatabaseConnectionView:
self._in_tx_with_dbconfig = False
self._in_tx_with_set = False
self._in_tx_user_schema_pickle = None
self._in_tx_user_schema_version = None
self._in_tx_global_schema_pickle = None
self._in_tx_new_types = {}
self._in_tx_user_config_spec = None
@@ -703,6 +710,12 @@ cdef class DatabaseConnectionView:
return self._in_tx_dbver
return self._db.dbver

property schema_version:
def __get__(self):
if self._in_tx and self._in_tx_user_schema_version:
return self._in_tx_user_schema_version
return self._db.schema_version

@property
def server(self):
return self._db._index._server
@@ -718,24 +731,36 @@
return self._tx_error

cdef cache_compiled_query(
self, object key, object query_unit_group, int dbver
self, object key, object query_unit_group, schema_version
):
assert query_unit_group.cacheable

if not self._in_tx_with_ddl:
key = (key, self.get_modaliases(), self.get_session_config())
self._db._cache_compiled_query(key, query_unit_group, dbver)
key = (
key,
self.get_modaliases(),
self.get_session_config(),
self.get_compilation_system_config(),
)
self._db._cache_compiled_query(
key, query_unit_group, schema_version
)

cdef lookup_compiled_query(self, object key):
if (self._tx_error or
not self._query_cache_enabled or
self._in_tx_with_ddl):
return None

key = (key, self.get_modaliases(), self.get_session_config())
query_unit_group, qu_dbver = self._db._eql_to_compiled.get(
key = (
key,
self.get_modaliases(),
self.get_session_config(),
self.get_compilation_system_config(),
)
query_unit_group, qu_ver = self._db._eql_to_compiled.get(
key, DICTDEFAULT)
if query_unit_group is not None and qu_dbver != self._db.dbver:
if query_unit_group is not None and qu_ver != self._db.schema_version:
query_unit_group = None

return query_unit_group
@@ -763,6 +788,7 @@ cdef class DatabaseConnectionView:
self._in_tx_db_config = self._db.db_config
self._in_tx_modaliases = self._modaliases
self._in_tx_user_schema_pickle = self._db.user_schema_pickle
self._in_tx_user_schema_version = self._db.schema_version
self._in_tx_global_schema_pickle = \
self._db._index._global_schema_pickle
self._in_tx_user_config_spec = self._db.user_config_spec
@@ -781,6 +807,7 @@
if query_unit.user_schema is not None:
self._in_tx_dbver = next_dbver()
self._in_tx_user_schema_pickle = query_unit.user_schema
self._in_tx_user_schema_version = query_unit.user_schema_version
self._in_tx_user_config_spec = config.FlatSpec(
*query_unit.ext_config_settings
)
@@ -808,6 +835,7 @@
if query_unit.user_schema is not None:
self._db._set_and_signal_new_user_schema(
query_unit.user_schema,
query_unit.user_schema_version,
query_unit.extensions,
query_unit.ext_config_settings,
pickle.loads(query_unit.cached_reflection)
@@ -850,6 +878,7 @@
if query_unit.user_schema is not None:
self._db._set_and_signal_new_user_schema(
query_unit.user_schema,
query_unit.user_schema_version,
query_unit.extensions,
query_unit.ext_config_settings,
pickle.loads(query_unit.cached_reflection)
@@ -902,6 +931,7 @@
if user_schema is not None:
self._db._set_and_signal_new_user_schema(
user_schema,
self._in_tx_user_schema_version,
extensions,
ext_config_settings,
pickle.loads(cached_reflection)
@@ -978,7 +1008,11 @@
if query_unit_group is None:
# Cache miss; need to compile this query.
cached = False
dbver = self.dbver
# Remember the schema version we are compiling against, so that the
# result can be cached under the matching version. If a concurrent
# schema update lands in the meantime, we merely store an outdated
# cache entry, and the next identical query is recompiled against the
# new schema.
schema_version = self.schema_version

try:
query_unit_group = await self._compile(query_req)
@@ -1020,7 +1054,9 @@
if cached_globally:
self.server.system_compile_cache[query_req] = query_unit_group
else:
self.cache_compiled_query(query_req, query_unit_group, dbver)
self.cache_compiled_query(
query_req, query_unit_group, schema_version
)

if use_metrics:
metrics.edgeql_query_compilations.inc(
@@ -1214,6 +1250,7 @@ cdef class DatabaseIndex:
dbname,
*,
user_schema_pickle,
schema_version,
db_config,
reflection_cache,
backend_ids,
@@ -1225,6 +1262,7 @@
if db is not None:
db._set_and_signal_new_user_schema(
user_schema_pickle,
schema_version,
extensions,
ext_config_settings,
reflection_cache,
@@ -1236,6 +1274,7 @@
self,
dbname,
user_schema_pickle=user_schema_pickle,
schema_version=schema_version,
db_config=db_config,
reflection_cache=reflection_cache,
backend_ids=backend_ids,
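
Note the ordering used in both the EdgeQL path (dbview.pyx above) and the SQL path (pg_ext.pyx below): the schema version is captured before the query is handed to the compiler pool, and is then passed to the cache together with the compiled result. Below is a hedged sketch of why that ordering matters under concurrent DDL, reusing the hypothetical `CompileCache` from the sketch near the top; `compile_in_pool` is likewise an assumed stand-in, not an actual server API.

async def compile_and_cache(cache: "CompileCache", key, compile_in_pool):
    # Capture the schema version *before* the potentially slow compilation.
    compiled_at = cache.schema_version

    compiled = await compile_in_pool(key)  # a concurrent DDL may land here

    # If the schema changed while we were compiling, the entry is stored
    # under the old version; lookup() then treats it as a miss and the next
    # identical query is simply recompiled against the new schema.
    cache.store(key, compiled, compiled_at)
    return compiled

In the actual change, the captured value is `self.schema_version` (dbview.pyx) or `self.database.schema_version` (pg_ext.pyx), and the store/lookup counterparts are `cache_compiled_query()` / `lookup_compiled_query()` and `cache_compiled_sql()` / `lookup_compiled_sql()`.
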
7 changes: 6 additions & 1 deletion edb/server/protocol/pg_ext.pyx
@@ -1427,6 +1427,11 @@ cdef class PgConnection(frontend.FrontendConnection):
result = self.database.lookup_compiled_sql(key)
if result is not None:
return result
# Remember the schema version we are compiling against, so that the
# result can be cached under the matching version. If a concurrent
# schema update lands in the meantime, we merely store an outdated
# cache entry, and the next identical query is recompiled against the
# new schema.
schema_version = self.database.schema_version
compiler_pool = self.server.get_compiler_pool()
result = await compiler_pool.compile_sql(
self.dbname,
@@ -1442,7 +1447,7 @@
self.username,
client_id=self.tenant.client_id,
)
self.database.cache_compiled_sql(key, result)
self.database.cache_compiled_sql(key, result, schema_version)
if self.debug:
self.debug_print("Compile result", result)
return result
2 changes: 2 additions & 0 deletions edb/server/tenant.py
@@ -889,6 +889,7 @@ async def introspect_db(self, dbname: str) -> None:
db = self._dbindex.register_db(
dbname,
user_schema_pickle=parsed_db.user_schema_pickle,
schema_version=parsed_db.schema_version,
db_config=parsed_db.database_config,
reflection_cache=reflection_cache,
backend_ids=backend_ids,
@@ -921,6 +922,7 @@ async def _early_introspect_db(self, dbname: str) -> None:
self._dbindex.register_db(
dbname,
user_schema_pickle=None,
schema_version=None,
db_config=None,
reflection_cache=None,
backend_ids=None,
