diff --git a/edb/graphql/extension.pyx b/edb/graphql/extension.pyx index e19b1eb4a9c..f33963c8891 100644 --- a/edb/graphql/extension.pyx +++ b/edb/graphql/extension.pyx @@ -251,7 +251,7 @@ async def compile( async def _execute(db, tenant, query, operation_name, variables, globals): - dbver = db.dbver + ver = db.schema_version query_cache = tenant.server._http_query_cache if variables: @@ -299,7 +299,7 @@ async def _execute(db, tenant, query, operation_name, variables, globals): print(f'key_vars: {key_var_names}') print(f'variables: {vars}') - cache_key = ('graphql', prepared_query, key_vars, operation_name, dbver) + cache_key = ('graphql', prepared_query, key_vars, operation_name, ver) use_prep_stmt = False entry: CacheEntry = None @@ -308,7 +308,7 @@ async def _execute(db, tenant, query, operation_name, variables, globals): if isinstance(entry, CacheRedirect): key_vars2 = tuple(vars[k] for k in entry.key_vars) - cache_key2 = (prepared_query, key_vars2, operation_name, dbver) + cache_key2 = (prepared_query, key_vars2, operation_name, ver) entry = query_cache.get(cache_key2, None) await db.introspection() @@ -343,7 +343,7 @@ async def _execute(db, tenant, query, operation_name, variables, globals): query_cache[cache_key] = redir key_vars2 = tuple(vars[k] for k in key_var_names) cache_key2 = ( - 'graphql', prepared_query, key_vars2, operation_name, dbver + 'graphql', prepared_query, key_vars2, operation_name, ver ) query_cache[cache_key2] = qug, gql_op else: diff --git a/edb/server/dbview/dbview.pxd b/edb/server/dbview/dbview.pxd index 16004cb3e67..9027459a10b 100644 --- a/edb/server/dbview/dbview.pxd +++ b/edb/server/dbview/dbview.pxd @@ -85,7 +85,6 @@ cdef class Database: readonly str name readonly object schema_version - readonly object dbver readonly object db_config readonly bytes user_schema_pickle readonly object reflection_cache @@ -95,7 +94,7 @@ cdef class Database: cdef schedule_config_update(self) cdef _invalidate_caches(self) - cdef _cache_compiled_query(self, key, compiled, int dbver) + cdef _cache_compiled_query(self, key, compiled, schema_version) cdef _new_view(self, query_cache, protocol_version) cdef _remove_view(self, view) cdef _update_backend_ids(self, new_types) @@ -121,7 +120,7 @@ cdef class DatabaseConnectionView: object _protocol_version object _db_config_temp - object _db_config_dbver + object _db_config_schema_version # State properties object _config @@ -147,7 +146,6 @@ cdef class DatabaseConnectionView: object _in_tx_user_config_spec object _in_tx_global_schema_pickle object _in_tx_new_types - int _in_tx_dbver bint _in_tx bint _in_tx_with_ddl bint _in_tx_with_sysconfig @@ -174,7 +172,7 @@ cdef class DatabaseConnectionView: cpdef in_tx_error(self) cdef cache_compiled_query( - self, object key, object query_unit_group, int dbver + self, object key, object query_unit_group, schema_version ) cdef lookup_compiled_query(self, object key) diff --git a/edb/server/dbview/dbview.pyx b/edb/server/dbview/dbview.pyx index f29825d244b..2cc5427d075 100644 --- a/edb/server/dbview/dbview.pyx +++ b/edb/server/dbview/dbview.pyx @@ -62,16 +62,9 @@ cdef DEFAULT_STATE = json.dumps([]).encode('utf-8') cdef INT32_PACKER = struct.Struct('!l').pack -cdef int VER_COUNTER = 0 cdef DICTDEFAULT = (None, None) -cdef next_dbver(): - global VER_COUNTER - VER_COUNTER += 1 - return VER_COUNTER - - @cython.final cdef class QueryRequestInfo: @@ -166,7 +159,6 @@ cdef class Database: self.name = name self.schema_version = schema_version - self.dbver = next_dbver() self._index = index self._views = weakref.WeakSet() @@ -212,7 +204,6 @@ cdef class Database: raise AssertionError('new_schema is not supposed to be None') self.schema_version = schema_version - self.dbver = next_dbver() self.user_schema_pickle = new_schema_pickle self.extensions = extensions @@ -235,28 +226,28 @@ cdef class Database: self._index.invalidate_caches() cdef _cache_compiled_query( - self, key, compiled: dbstate.QueryUnitGroup, int dbver + self, key, compiled: dbstate.QueryUnitGroup, schema_version ): assert compiled.cacheable - existing, existing_dbver = self._eql_to_compiled.get(key, DICTDEFAULT) - if existing is not None and existing_dbver == self.dbver: + existing, existing_ver = self._eql_to_compiled.get(key, DICTDEFAULT) + if existing is not None and existing_ver == self.schema_version: # We already have a cached query for a more recent DB version. return - self._eql_to_compiled[key] = compiled, dbver + self._eql_to_compiled[key] = compiled, schema_version def cache_compiled_sql(self, key, compiled: list[str]): - existing, dbver = self._sql_to_compiled.get(key, DICTDEFAULT) - if existing is not None and dbver == self.dbver: + existing, ver = self._sql_to_compiled.get(key, DICTDEFAULT) + if existing is not None and ver == self.schema_version: # We already have a cached query for a more recent DB version. return - self._sql_to_compiled[key] = compiled, self.dbver + self._sql_to_compiled[key] = compiled, self.schema_version def lookup_compiled_sql(self, key): - rv, cached_dbver = self._sql_to_compiled.get(key, DICTDEFAULT) - if rv is not None and cached_dbver != self.dbver: + rv, cached_ver = self._sql_to_compiled.get(key, DICTDEFAULT) + if rv is not None and cached_ver != self.schema_version: rv = None return rv @@ -324,7 +315,7 @@ cdef class DatabaseConnectionView: self._capability_mask = compiler.Capability.ALL self._db_config_temp = None - self._db_config_dbver = None + self._db_config_schema_version = None self._last_comp_state = None self._last_comp_state_id = 0 @@ -349,7 +340,6 @@ cdef class DatabaseConnectionView: self._in_tx_user_config_spec = None self._in_tx_state_serializer = None self._tx_error = False - self._in_tx_dbver = 0 cdef clear_tx_error(self): self._tx_error = False @@ -461,12 +451,12 @@ cdef class DatabaseConnectionView: if self._db_config_temp is not None: # See `set_database_config()` for an explanation on # *why* we do this. - if self._db_config_dbver is self._db.dbver: - assert self._db_config_dbver is not None + if self._db_config_schema_version == self._db.schema_version: + assert self._db_config_schema_version is not None return self._db_config_temp else: self._db_config_temp = None - self._db_config_dbver = None + self._db_config_schema_version = None return self._db.db_config @@ -492,7 +482,7 @@ cdef class DatabaseConnectionView: # critical and resolve in time.) # Check out `get_database_config()` to see how this is used. self._db_config_temp = new_conf - self._db_config_dbver = self._db.dbver + self._db_config_schema_version = self._db.schema_version cdef get_system_config(self): return self._db._index.get_sys_config() @@ -545,9 +535,9 @@ cdef class DatabaseConnectionView: raise errors.InternalServerError( 'no need to serialize state while in transaction') - dbver = self._db.dbver + ver = self._db.schema_version if self._session_state_db_cache is not None: - if self._session_state_db_cache[0] == (self._config, dbver): + if self._session_state_db_cache[0] == (self._config, ver): return self._session_state_db_cache[1] state = [] @@ -560,12 +550,12 @@ cdef class DatabaseConnectionView: state.append({"name": sval.name, "value": jval, "type": kind}) # Include the database version in the state so that we are forced - # to clear the config cache on dbver changes. + # to clear the config cache on ver changes. state.append( - {"name": '__dbver__', "value": dbver, "type": 'C'}) + {"name": '__ver__', "value": str(ver), "type": 'C'}) spec = json.dumps(state).encode('utf-8') - self._session_state_db_cache = ((self._config, dbver), spec) + self._session_state_db_cache = ((self._config, ver), spec) return spec cdef bint is_state_desc_changed(self): @@ -700,11 +690,11 @@ cdef class DatabaseConnectionView: def __get__(self): return self._db.reflection_cache - property dbver: + property schema_version: def __get__(self): - if self._in_tx and self._in_tx_dbver: - return self._in_tx_dbver - return self._db.dbver + if self._in_tx and self._in_tx_user_schema_pv: + return self._in_tx_user_schema_pv[1] + return self._db.schema_version @property def server(self): @@ -721,13 +711,20 @@ cdef class DatabaseConnectionView: return self._tx_error cdef cache_compiled_query( - self, object key, object query_unit_group, int dbver + self, object key, object query_unit_group, schema_version ): assert query_unit_group.cacheable if not self._in_tx_with_ddl: - key = (key, self.get_modaliases(), self.get_session_config()) - self._db._cache_compiled_query(key, query_unit_group, dbver) + key = ( + key, + self.get_modaliases(), + self.get_session_config(), + self.get_compilation_system_config(), + ) + self._db._cache_compiled_query( + key, query_unit_group, schema_version + ) cdef lookup_compiled_query(self, object key): if (self._tx_error or @@ -735,10 +732,15 @@ cdef class DatabaseConnectionView: self._in_tx_with_ddl): return None - key = (key, self.get_modaliases(), self.get_session_config()) - query_unit_group, qu_dbver = self._db._eql_to_compiled.get( + key = ( + key, + self.get_modaliases(), + self.get_session_config(), + self.get_compilation_system_config(), + ) + query_unit_group, qu_ver = self._db._eql_to_compiled.get( key, DICTDEFAULT) - if query_unit_group is not None and qu_dbver != self._db.dbver: + if query_unit_group is not None and qu_ver != self._db.schema_version: query_unit_group = None return query_unit_group @@ -812,7 +814,6 @@ cdef class DatabaseConnectionView: if new_types: self._db._update_backend_ids(new_types) if query_unit.user_schema_pv is not None: - self._in_tx_dbver = next_dbver() self._db._set_and_signal_new_user_schema( query_unit.user_schema_pv[0], query_unit.user_schema_pv[1], @@ -870,7 +871,7 @@ cdef class DatabaseConnectionView: side_effects |= SideEffects.InstanceConfigChanges if self._in_tx_with_dbconfig: self._db_config_temp = self._in_tx_db_config - self._db_config_dbver = self._db.dbver + self._db_config_schema_version = self._db.schema_version self.update_database_config() side_effects |= SideEffects.DatabaseConfigChanges @@ -923,7 +924,7 @@ cdef class DatabaseConnectionView: side_effects |= SideEffects.InstanceConfigChanges if self._in_tx_with_dbconfig: self._db_config_temp = self._in_tx_db_config - self._db_config_dbver = self._db.dbver + self._db_config_schema_version = self._db.schema_version self.update_database_config() side_effects |= SideEffects.DatabaseConfigChanges @@ -985,7 +986,7 @@ cdef class DatabaseConnectionView: if query_unit_group is None: # Cache miss; need to compile this query. cached = False - dbver = self._db.dbver + schema_version = self._db.schema_version try: query_unit_group = await self._compile(query_req) @@ -1027,7 +1028,9 @@ cdef class DatabaseConnectionView: if cached_globally: self.server.system_compile_cache[query_req] = query_unit_group else: - self.cache_compiled_query(query_req, query_unit_group, dbver) + self.cache_compiled_query( + query_req, query_unit_group, schema_version + ) if use_metrics: metrics.edgeql_query_compilations.inc( @@ -1184,10 +1187,6 @@ cdef class DatabaseIndex: return self._comp_sys_config def update_sys_config(self, sys_config): - cdef Database db - for db in self._dbs.values(): - db.dbver = next_dbver() - with self._default_sysconfig.mutate() as mm: mm.update(sys_config) sys_config = mm.finish() diff --git a/edb/server/pgcon/pgcon.pxd b/edb/server/pgcon/pgcon.pxd index 434121c257e..215c5394320 100644 --- a/edb/server/pgcon/pgcon.pxd +++ b/edb/server/pgcon/pgcon.pxd @@ -153,7 +153,7 @@ cdef class PGConnection: cdef fallthrough_idle(self) cdef bint before_prepare( - self, bytes stmt_name, int dbver, WriteBuffer outbuf) + self, bytes stmt_name, schema_version, WriteBuffer outbuf) cdef write_sync(self, WriteBuffer outbuf) cdef make_clean_stmt_message(self, bytes stmt_name) @@ -161,7 +161,7 @@ cdef class PGConnection: cdef send_query_unit_group( self, object query_unit_group, bint sync, object bind_datas, bytes state, - ssize_t start, ssize_t end, int dbver, object parse_array + ssize_t start, ssize_t end, schema_version, object parse_array ) cdef _rewrite_copy_data( diff --git a/edb/server/pgcon/pgcon.pyx b/edb/server/pgcon/pgcon.pyx index 0ebaa7d646e..75bea46561c 100644 --- a/edb/server/pgcon/pgcon.pyx +++ b/edb/server/pgcon/pgcon.pyx @@ -764,7 +764,7 @@ cdef class PGConnection: cdef bint before_prepare( self, bytes stmt_name, - int dbver, + schema_version, WriteBuffer outbuf, ): cdef bint parse = 1 @@ -777,7 +777,7 @@ cdef class PGConnection: self.make_clean_stmt_message(stmt_name_to_clean)) if stmt_name in self.prep_stmts: - if self.prep_stmts[stmt_name] == dbver: + if self.prep_stmts[stmt_name] == schema_version: parse = 0 else: if self.debug: @@ -922,7 +922,7 @@ cdef class PGConnection: cdef send_query_unit_group( self, object query_unit_group, bint sync, object bind_datas, bytes state, - ssize_t start, ssize_t end, int dbver, object parse_array + ssize_t start, ssize_t end, schema_version, object parse_array ): # parse_array is an array of booleans for output with the same size as # the query_unit_group, indicating if each unit is freshly parsed @@ -952,9 +952,9 @@ cdef class PGConnection: # We just need to know and skip if we've already parsed the # same query within current send batch, because self.prep_stmts # will be updated before the next batch, with maybe a different - # dbver after DDL. + # schema_version after DDL. if stmt_name not in parsed and self.before_prepare( - stmt_name, dbver, out + stmt_name, schema_version, out ): buf = WriteBuffer.new_message(b'P') buf.write_bytestring(stmt_name) @@ -1042,7 +1042,7 @@ cdef class PGConnection: self, object query_unit, bint parse, - int dbver, + schema_version, *, bint ignore_data, frontend.AbstractFrontendConnection fe_conn = None, @@ -1093,7 +1093,7 @@ cdef class PGConnection: # ParseComplete self.buffer.discard_message() if parse: - self.prep_stmts[query_unit.sql_hash] = dbver + self.prep_stmts[query_unit.sql_hash] = schema_version elif mtype == b'E': ## result # ErrorResponse @@ -1135,7 +1135,7 @@ cdef class PGConnection: WriteBuffer bind_data, bint use_prep_stmt, bytes state, - int dbver, + schema_version, ): cdef: WriteBuffer out @@ -1206,7 +1206,7 @@ cdef class PGConnection: if use_prep_stmt: stmt_name = query.sql_hash parse = self.before_prepare( - stmt_name, dbver, out) + stmt_name, schema_version, out) else: stmt_name = b'' @@ -1355,7 +1355,7 @@ cdef class PGConnection: elif mtype == b'1' and parse: # ParseComplete self.buffer.discard_message() - self.prep_stmts[stmt_name] = dbver + self.prep_stmts[stmt_name] = schema_version elif mtype == b'E': ## result # ErrorResponse @@ -1402,7 +1402,7 @@ cdef class PGConnection: frontend.AbstractFrontendConnection fe_conn = None, bint use_prep_stmt = False, bytes state = None, - int dbver = 0, + schema_version = None, ): self.before_command() started_at = time.monotonic() @@ -1413,7 +1413,7 @@ cdef class PGConnection: bind_data, use_prep_stmt, state, - dbver, + schema_version, ) finally: metrics.backend_query_duration.observe( @@ -1623,13 +1623,14 @@ cdef class PGConnection: self, actions: list[PGMessage], fe_conn: frontend.AbstractFrontendConnection, - dbver: int, + schema_version, dbv: pg_ext.ConnectionView, send_sync_on_error: bool = False, ) -> tuple[bool, bool]: self.before_command() try: - state = self._write_sql_extended_query(actions, dbver, dbv) + state = self._write_sql_extended_query( + actions, schema_version, dbv) if state is not None: await self._parse_apply_state_resp( 2 if state != EMPTY_SQL_STATE else 1 @@ -1640,7 +1641,7 @@ cdef class PGConnection: return await self._parse_sql_extended_query( actions, fe_conn, - dbver, + schema_version, dbv, send_sync_on_error=send_sync_on_error, ) @@ -1653,7 +1654,7 @@ cdef class PGConnection: def _write_sql_extended_query( self, actions: list[PGMessage], - dbver: int, + schema_version, dbv: pg_ext.ConnectionView, ) -> bytes: cdef: @@ -1679,7 +1680,7 @@ cdef class PGConnection: action.frontend_only = True else: be_parse = self.before_prepare( - action.stmt_name, dbver, buf + action.stmt_name, schema_version, buf ) if not be_parse: if self.debug: @@ -1705,7 +1706,7 @@ cdef class PGConnection: action.frontend_only = True else: be_parse = self.before_prepare( - be_stmt_name, dbver, buf + be_stmt_name, schema_version, buf ) if not be_parse: if self.debug: @@ -1744,7 +1745,7 @@ cdef class PGConnection: action.frontend_only = True else: be_parse = self.before_prepare( - be_stmt_name, dbver, buf + be_stmt_name, schema_version, buf ) if not be_parse: if self.debug: @@ -1757,7 +1758,9 @@ cdef class PGConnection: action.query_unit is not None and action.query_unit.deallocate is not None and self.before_prepare( - action.query_unit.deallocate.be_stmt_name, dbver, buf + action.query_unit.deallocate.be_stmt_name, + schema_version, + buf, ) ): # This prepared statement does not actually exist @@ -1815,7 +1818,7 @@ cdef class PGConnection: self, actions: list[PGMessage], fe_conn: frontend.AbstractFrontendConnection, - dbver: int, + schema_version, dbv: pg_ext.ConnectionView, send_sync_on_error: bool, ) -> tuple[bool, bool]: @@ -1992,7 +1995,7 @@ cdef class PGConnection: self.buffer.finish_message() if self.debug: self.debug_print('PARSE COMPLETE MSG') - self.prep_stmts[action.stmt_name] = dbver + self.prep_stmts[action.stmt_name] = schema_version if not action.is_injected(): msg_buf = WriteBuffer.new_message(mtype) buf.write_buffer(msg_buf.end_message()) @@ -2052,9 +2055,9 @@ cdef class PGConnection: if self.debug: self.debug_print( f"remembering ps {be_stmt_name}, " - f"dbver {dbver}" + f"schema_version {schema_version}" ) - self.prep_stmts[be_stmt_name] = dbver + self.prep_stmts[be_stmt_name] = schema_version if not action.is_injected(): msg_buf = WriteBuffer.new_message(mtype) @@ -2267,12 +2270,12 @@ cdef class PGConnection: ) async def handle_ddl_in_script( - self, object query_unit, bint parse, int dbver + self, object query_unit, bint parse, schema_version ): data = None for sql in query_unit.sql: data = await self.wait_for_command( - query_unit, parse, dbver, ignore_data=bool(data) + query_unit, parse, schema_version, ignore_data=bool(data) ) or data return self.load_ddl_return(query_unit, data) diff --git a/edb/server/protocol/execute.pyx b/edb/server/protocol/execute.pyx index 1cfdf00ad1d..e72ee9b5e9d 100644 --- a/edb/server/protocol/execute.pyx +++ b/edb/server/protocol/execute.pyx @@ -125,7 +125,7 @@ async def execute( bind_data=bound_args_buf, use_prep_stmt=use_prep_stmt, state=state, - dbver=dbv.dbver, + schema_version=dbv.schema_version, ) if query_unit.needs_readback and data: @@ -221,9 +221,9 @@ async def execute_script( object extensions, ext_config_settings, cached_reflection object global_schema, roles WriteBuffer bind_data - int dbver = dbv.dbver bint parse + schema_version = dbv.schema_version user_schema_pv = None extensions = ext_config_settings = cached_reflection = None global_schema = roles = None @@ -275,7 +275,7 @@ async def execute_script( sync = sent == len(unit_group) and not no_sync bind_array = args_ser.recode_bind_args_for_script( dbv, compiled, bind_args, idx, sent) - dbver = dbv.dbver + schema_version = dbv.schema_version conn.send_query_unit_group( unit_group, sync, @@ -283,7 +283,7 @@ async def execute_script( state, idx, sent, - dbver, + schema_version, parse_array, ) @@ -311,7 +311,7 @@ async def execute_script( parse = parse_array[idx] if query_unit.ddl_stmt_id: ddl_ret = await conn.handle_ddl_in_script( - query_unit, parse, dbver + query_unit, parse, schema_version ) if ddl_ret and ddl_ret['new_types']: new_types = ddl_ret['new_types'] @@ -319,7 +319,10 @@ async def execute_script( config_data = [] for sql in query_unit.sql: config_data = await conn.wait_for_command( - query_unit, parse, dbver, ignore_data=False + query_unit, + parse, + schema_version, + ignore_data=False, ) if config_data: config_ops = [ @@ -329,12 +332,17 @@ async def execute_script( elif query_unit.output_format == FMT_NONE: for sql in query_unit.sql: await conn.wait_for_command( - query_unit, parse, dbver, ignore_data=True + query_unit, + parse, + schema_version, + ignore_data=True, ) else: for sql in query_unit.sql: data = await conn.wait_for_command( - query_unit, parse, dbver, + query_unit, + parse, + schema_version, ignore_data=False, fe_conn=fe_conn, ) diff --git a/edb/server/protocol/pg_ext.pyx b/edb/server/protocol/pg_ext.pyx index f0e910e3220..c0b5467823c 100644 --- a/edb/server/protocol/pg_ext.pyx +++ b/edb/server/protocol/pg_ext.pyx @@ -785,7 +785,7 @@ cdef class PgConnection(frontend.FrontendConnection): conn = await self.get_pgcon() try: success, _ = await conn.sql_extended_query( - actions, self, self.database.dbver, dbv) + actions, self, self.database.schema_version, dbv) self.ignore_till_sync = not success finally: self.maybe_release_pgcon(conn) @@ -822,7 +822,7 @@ cdef class PgConnection(frontend.FrontendConnection): _, rq_sent = await conn.sql_extended_query( actions, self, - self.database.dbver, + self.database.schema_version, dbv, send_sync_on_error=True, ) @@ -852,7 +852,7 @@ cdef class PgConnection(frontend.FrontendConnection): conn = await self.get_pgcon() try: success, _ = await conn.sql_extended_query( - actions, self, self.database.dbver, dbv) + actions, self, self.database.schema_version, dbv) self.ignore_till_sync = not success except Exception as ex: self.write_error(ex) diff --git a/edb/server/protocol/server_info.py b/edb/server/protocol/server_info.py index 346dd87e56b..4a9ea00dcbc 100644 --- a/edb/server/protocol/server_info.py +++ b/edb/server/protocol/server_info.py @@ -20,6 +20,7 @@ import dataclasses import http import json +import uuid import immutables @@ -46,6 +47,8 @@ def default(self, obj): return obj.to_str() if isinstance(obj, statypes.CompositeType): return obj.to_json_value() + if isinstance(obj, uuid.UUID): + return str(obj) return super().default(obj) diff --git a/edb/server/tenant.py b/edb/server/tenant.py index 3b04dfd5e7d..79dd91343af 100644 --- a/edb/server/tenant.py +++ b/edb/server/tenant.py @@ -1403,7 +1403,7 @@ def get_debug_info(self) -> dict[str, Any]: dbs[db.name] = dict( name=db.name, - dbver=db.dbver, + schema_version=db.schema_version, config=( None if db.db_config is None