Skip to content

Commit

Permalink
[6.x] Remove insertion of extra repairs in patch queue (#8278)
Browse files Browse the repository at this point in the history
Currently, if there is *any* repair in a patch queue, we insert a
repair after *every* patch that modifies the std schema.

This was in combination with deferring all repairs until the last one
in the queue, which allowed us to avoid needing to load the current
standard schema in order to do a repair. But we implemented support
for doing that as part of patching user extensions, so merge all of
the schema repair code into that.

I tested this on 5.x's substantial collection of complex patches (see
#8277), and
then forward ported to master, and then backported this to 6.x.
  • Loading branch information
msullivan authored Jan 31, 2025
1 parent 49e45fc commit 34923bd
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 76 deletions.
22 changes: 2 additions & 20 deletions edb/pgsql/patches.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,6 @@ def get_version_key(num_patches: int):
return f'_v{num_major}'


def _setup_patches(patches: list[tuple[str, str]]) -> list[tuple[str, str]]:
"""Do postprocessing on the patches list
For technical reasons, we can't run a user schema repair if there
is a pending standard schema change, so when applying repairs we
always defer them to the *last* repair patch, and we ensure that
edgeql+schema is followed by a repair if necessary.
"""
seen_repair = False
npatches = []
for kind, patch in patches:
npatches.append((kind, patch))
if kind.startswith('edgeql+schema') and seen_repair:
npatches.append(('repair', ''))
seen_repair |= kind == 'repair'
return npatches


"""
The actual list of patches. The patches are (kind, script) pairs.
Expand All @@ -76,7 +58,7 @@ def _setup_patches(patches: list[tuple[str, str]]) -> list[tuple[str, str]]:
* repair - fix up inconsistencies in *user* schemas
* sql-introspection - refresh all sql introspection views
"""
PATCHES: list[tuple[str, str]] = _setup_patches([
PATCHES: list[tuple[str, str]] = [
# 6.0b2
# One of the sql-introspection's adds a param with a default to
# uuid_to_oid, so we need to drop the original to avoid ambiguity.
Expand All @@ -85,4 +67,4 @@ def _setup_patches(patches: list[tuple[str, str]]) -> list[tuple[str, str]]:
'''),
('sql-introspection', ''),
('metaschema-sql', 'SysConfigFullFunction'),
])
]
51 changes: 39 additions & 12 deletions edb/server/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,8 +686,7 @@ def prepare_repair_patch(
globalschema: s_schema.Schema,
schema_class_layout: s_refl.SchemaClassLayout,
backend_params: params.BackendRuntimeParams,
config: Any,
) -> bytes:
) -> str:
compiler = edbcompiler.new_compiler(
std_schema=stdschema,
reflection_schema=reflschema,
Expand All @@ -701,13 +700,13 @@ def prepare_repair_patch(
)
res = edbcompiler.repair_schema(compilerctx)
if not res:
return b""
return ""
sql, _, _ = res

return sql
return sql.decode('utf-8')


PatchEntry = tuple[tuple[str, ...], tuple[str, ...], dict[str, Any], bool]
PatchEntry = tuple[tuple[str, ...], tuple[str, ...], dict[str, Any]]


async def gather_patch_info(
Expand Down Expand Up @@ -786,6 +785,8 @@ def prepare_patch(
patch_info: Optional[dict[str, list[str]]],
user_schema: Optional[s_schema.Schema]=None,
global_schema: Optional[s_schema.Schema]=None,
*,
dbname: Optional[str]=None,
) -> PatchEntry:
val = f'{pg_common.quote_literal(json.dumps(num + 1))}::jsonb'
# TODO: This is an INSERT because 2.0 shipped without num_patches.
Expand All @@ -801,19 +802,45 @@ def prepare_patch(

# Pure SQL patches are simple
if kind == 'sql':
return (patch, update), (), {}, False
return (patch, update), (), {}

# metaschema-sql: just recreate a function from metaschema
if kind == 'metaschema-sql':
func = getattr(metaschema, patch)
create = dbops.CreateFunction(func(), or_replace=True)
block = dbops.PLTopBlock()
create.generate(block)
return (block.to_string(), update), (), {}, False
return (block.to_string(), update), (), {}

if kind == 'repair':
assert not patch
return (update,), (), {}, True
if not user_schema:
return (update,), (), dict(is_user_update=True)
assert global_schema

# TODO: Implement the last-repair-only optimization?
try:
logger.info("repairing database '%s'", dbname)
sql = prepare_repair_patch(
schema,
reflschema,
user_schema,
global_schema,
schema_class_layout,
backend_params
)
except errors.EdgeDBError as e:
if isinstance(e, errors.InternalServerError):
raise
raise errors.SchemaError(
f'Could not repair schema inconsistencies in '
f'database branch "{dbname}". Probably the schema is '
f'no longer valid due to a bug fix.\n'
f'Downgrade to the last working version, fix '
f'the schema issue, and try again.'
) from e

return (update, sql), (), {}

# EdgeQL and reflection schema patches need to be compiled.
current_block = dbops.PLTopBlock()
Expand Down Expand Up @@ -874,7 +901,7 @@ def prepare_patch(
# There isn't anything to do on the system database for
# userext updates.
if user_schema is None:
return (update,), (), dict(is_user_ext_update=True), False
return (update,), (), dict(is_user_update=True)

# Only run a userext update if the extension we are trying to
# update is installed.
Expand All @@ -883,7 +910,7 @@ def prepare_patch(
s_exts.Extension, extension_name, default=None)

if not extension:
return (update,), (), {}, False
return (update,), (), {}

assert global_schema
cschema = s_schema.ChainedSchema(
Expand Down Expand Up @@ -1097,13 +1124,13 @@ def prepare_patch(
sys_updates = (patch,) + sys_updates
else:
regular_updates = spatches + (update,)
# FIXME: This is a hack to make the is_user_ext_update cases
# FIXME: This is a hack to make the is_user_update cases
# work (by ensuring we can always read their current state),
# but this is actually a pretty dumb approach and we can do
# better.
regular_updates += sys_updates

return regular_updates, sys_updates, updates, False
return regular_updates, sys_updates, updates


async def create_branch(
Expand Down
52 changes: 8 additions & 44 deletions edb/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,8 @@ async def _prepare_patches(
conn, f'patch_log_{idx}', pickle.dumps(entry))

patches[num] = entry
_, _, updates, _ = entry
# FIXME: *_ for 6.x compat; 7.x can drop it
_, _, updates, *_ = entry
if 'std_and_reflection_schema' in updates:
self._std_schema, self._refl_schema = updates[
'std_and_reflection_schema']
Expand Down Expand Up @@ -1378,16 +1379,17 @@ async def _maybe_apply_patches(
) -> None:
"""Apply any un-applied patches to the database."""
num_patches = await self.get_patch_count(conn)
for num, (sql_b, syssql, keys, repair) in patches.items():
# FIXME: *_ for 6.x compat; 7.x can drop it
for num, (sql_b, syssql, keys, *_) in patches.items():
if num_patches <= num:
if sys:
sql_b += syssql
logger.info("applying patch %d to database '%s'", num, dbname)
sql = tuple(x.encode('utf-8') for x in sql_b)

# If we are doing a user_ext update, we need to
# actually run that against each user database.
if keys.get('is_user_ext_update'):
# For certain things, we need to actually run it
# against each user database.
if keys.get('is_user_update'):
from . import bootstrap

kind, patch = pg_patches.PATCHES[num]
Expand Down Expand Up @@ -1421,49 +1423,11 @@ async def _maybe_apply_patches(
patch_info=patch_info,
user_schema=user_schema,
global_schema=global_schema,
dbname=dbname,
)

sql += tuple(x.encode('utf-8') for x in entry[0])

# Only do repairs when they are the *last* pending
# repair in the patch queue. We make sure that every
# patch that changes the user schema is followed by a
# repair, so this allows us to only ever have to do
# repairs on up-to-date std schemas.
last_repair = repair and not any(
patches[i][3] for i in range(num + 1, len(patches))
)
if last_repair:
from . import bootstrap

global_schema = await self.introspect_global_schema(conn)
user_schema = await self._introspect_user_schema(
conn, global_schema)
config_json = await self.introspect_db_config(conn)
db_config = self._parse_db_config(config_json, user_schema)
try:
logger.info("repairing database '%s'", dbname)
rep_sql = bootstrap.prepare_repair_patch(
self._std_schema,
self._refl_schema,
user_schema,
global_schema,
self._schema_class_layout,
self._tenant.get_backend_runtime_params(),
db_config,
)
sql += (rep_sql,)
except errors.EdgeDBError as e:
if isinstance(e, errors.InternalServerError):
raise
raise errors.SchemaError(
f'Could not repair schema inconsistencies in '
f'database "{dbname}". Probably the schema is '
f'no longer valid due to a bug fix.\n'
f'Downgrade to the last working version, fix '
f'the schema issue, and try again.'
) from e

if sql:
await conn.sql_execute(sql)
logger.info(
Expand Down

0 comments on commit 34923bd

Please sign in to comment.