Skip to content

Commit

Permalink
[Upgrades] Support preparing upgrade while still up (#7718)
Browse files Browse the repository at this point in the history
This requires splitting up "preparing" the upgrade (which populates
the new namespaces) and "finalizing" the upgrade (which flips all the
trampolines and performs schema fixups).

This also does some more work towards making the upgrade recoverable
(by running the finalizations in transactions, and not committing any
of them until all are ready), but there are still important TODOs on
that front.

Progress on #6697.
  • Loading branch information
msullivan authored Sep 6, 2024
1 parent 5dfcb35 commit bed0b65
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 58 deletions.
12 changes: 9 additions & 3 deletions edb/server/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ class ServerConfig(NamedTuple):
log_level: str
log_to: str
bootstrap_only: bool
inplace_upgrade: Optional[pathlib.Path]
inplace_upgrade_prepare: Optional[pathlib.Path]
inplace_upgrade_finalize: bool
bootstrap_command: str
bootstrap_command_file: pathlib.Path
default_branch: Optional[str]
Expand Down Expand Up @@ -676,10 +677,15 @@ def resolve_envvar_value(self, ctx: click.Context):
envvar="EDGEDB_SERVER_BOOTSTRAP_ONLY", cls=EnvvarResolver,
help='bootstrap the database cluster and exit'),
click.option(
'--inplace-upgrade', type=PathPath(),
envvar="EDGEDB_SERVER_INPLACE_UPGRADE",
'--inplace-upgrade-prepare', type=PathPath(),
envvar="EDGEDB_SERVER_INPLACE_UPGRADE_PREPARE",
cls=EnvvarResolver, # XXX?
help='try to do an in-place upgrade with the specified dump file'),
click.option(
'--inplace-upgrade-finalize', type=bool, is_flag=True,
envvar="EDGEDB_SERVER_INPLACE_UPGRADE_FINALIZE",
cls=EnvvarResolver, # XXX?
help='finalize an in-place upgrade'),
click.option(
'--default-branch', type=str,
help='the name of the default branch to create'),
Expand Down
151 changes: 98 additions & 53 deletions edb/server/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
from edb.server import config
from edb.server import compiler as edbcompiler
from edb.server import defines as edbdef
from edb.server import instdata
from edb.server import pgcluster
from edb.server import pgcon

Expand Down Expand Up @@ -216,7 +217,7 @@ def terminate(self) -> None:
class BootstrapContext:

cluster: pgcluster.BaseCluster
conn: PGConnectionProxy
conn: PGConnectionProxy | pgcon.PGConnection
args: edbargs.ServerConfig
mode: Optional[ClusterMode] = None

Expand Down Expand Up @@ -1533,7 +1534,6 @@ async def _init_stdlib(
StdlibBits,
config.Spec,
edbcompiler.Compiler,
Optional[dbops.SQLBlock], # trampoline block
]:
in_dev_mode = devmode.is_in_dev_mode()
conn = ctx.conn
Expand All @@ -1556,7 +1556,7 @@ async def _init_stdlib(
src_hash=src_hash,
cache_dir=cache_dir,
)
if args.inplace_upgrade:
if args.inplace_upgrade_prepare:
tpldbdump = None

tpldbdump, tpldbdump_inplace = None, None
Expand Down Expand Up @@ -1585,20 +1585,21 @@ async def _init_stdlib(
)

backend_params = cluster.get_runtime_params()
if not args.inplace_upgrade:
if not args.inplace_upgrade_prepare:
logger.info('Creating the necessary PostgreSQL extensions...')
await metaschema.create_pg_extensions(conn, backend_params)

trampolines = []
trampolines.extend(stdlib.trampolines)

eff_tpldbdump = tpldbdump_inplace if args.inplace_upgrade else tpldbdump
eff_tpldbdump = (
tpldbdump_inplace if args.inplace_upgrade_prepare else tpldbdump)
if eff_tpldbdump is None:
logger.info('Populating internal SQL structures...')
assert bootstrap_commands is not None
block = dbops.PLTopBlock()

if not args.inplace_upgrade:
if not args.inplace_upgrade_prepare:
fixed_bootstrap_commands = metaschema.get_fixed_bootstrap_commands()
fixed_bootstrap_commands.generate(block)

Expand Down Expand Up @@ -1671,7 +1672,7 @@ async def _init_stdlib(
tpldbdump_inplace = cleanup_tpldbdump(tpldbdump_inplace)

# XXX: BE SMARTER ABOUT THIS, DON'T DO ALL THAT WORK
if args.inplace_upgrade:
if args.inplace_upgrade_prepare:
tpldbdump = None

buildmeta.write_data_cache(
Expand Down Expand Up @@ -1820,13 +1821,17 @@ async def _init_stdlib(
block = dbops.PLTopBlock()
tramps.generate(block)

if args.inplace_upgrade:
trampoline_block = block
if args.inplace_upgrade_prepare:
trampoline_text = block.to_string()
await _store_static_text_cache(
ctx,
f'trampoline_pivot_query',
trampoline_text,
)
else:
await _execute_block(conn, block)
trampoline_block = None

return stdlib, config_spec, compiler, trampoline_block
return stdlib, config_spec, compiler


async def _init_defaults(schema, compiler, conn):
Expand Down Expand Up @@ -2438,7 +2443,7 @@ async def _bootstrap_edgedb_super_roles(ctx: BootstrapContext) -> uuid.UUID:
async def _bootstrap(
ctx: BootstrapContext,
no_template: bool=False,
) -> tuple[edbcompiler.CompilerState, Optional[dbops.SQLBlock]]:
) -> edbcompiler.CompilerState:
args = ctx.args
cluster = ctx.cluster
backend_params = cluster.get_runtime_params()
Expand All @@ -2464,7 +2469,7 @@ async def _bootstrap(
using_template = backend_params.has_create_database and not no_template

if using_template:
if not args.inplace_upgrade:
if not args.inplace_upgrade_prepare:
new_template_db_id = await _create_edgedb_template_database(ctx)
# XXX: THIS IS WRONG, RIGHT?
else:
Expand Down Expand Up @@ -2492,7 +2497,7 @@ async def _bootstrap(

await _populate_misc_instance_data(tpl_ctx)

stdlib, config_spec, compiler, trampoline_block = await _init_stdlib(
stdlib, config_spec, compiler = await _init_stdlib(
tpl_ctx,
testmode=args.testmode,
global_ids={
Expand Down Expand Up @@ -2581,7 +2586,7 @@ async def _bootstrap(
async with iteration:
await _pg_ensure_database_not_connected(ctx.conn, tpl_db)

if args.inplace_upgrade:
if args.inplace_upgrade_prepare:
pass
elif backend_params.has_create_database:
await _create_edgedb_database(
Expand Down Expand Up @@ -2613,7 +2618,7 @@ async def _bootstrap(
compiler=compiler,
)

if args.inplace_upgrade:
if args.inplace_upgrade_prepare:
pass
elif backend_params.has_create_database:
await _create_edgedb_database(
Expand Down Expand Up @@ -2652,7 +2657,7 @@ async def _bootstrap(
args.default_database_user or edbdef.EDGEDB_SUPERUSER,
)

return compiler.state, trampoline_block
return compiler.state


async def _load_schema(
Expand Down Expand Up @@ -2693,11 +2698,12 @@ def _is_stdlib_target(
return t.get_name(schema).get_module_name() in s_schema.STD_MODULES


async def _fixup_schema(
def _compile_schema_fixup(
ctx: BootstrapContext,
schema: s_schema.ChainedSchema,
keys: dict[str, Any],
) -> None:
) -> str:
"""Compile any schema-specific fixes that need to be applied."""
current_block = dbops.PLTopBlock()
backend_params = ctx.cluster.get_runtime_params()

Expand Down Expand Up @@ -2783,7 +2789,7 @@ async def _fixup_schema(
)
plan.generate(current_block)

await _execute_block(ctx.conn, current_block)
return current_block.to_string()


async def _upgrade_one(
Expand All @@ -2806,8 +2812,6 @@ async def _upgrade_one(
for name, qltype, objid in upgrade_data['ids']
}

logger.info('Populating schema tables...')

# Load the schemas
schema = await _load_schema(ctx, state)

Expand Down Expand Up @@ -2881,7 +2885,13 @@ async def _upgrade_one(
key = 'configspec_ext';
''').encode('utf-8'))

await _fixup_schema(ctx, schema, keys)
# Compile the fixup script for the schema and stash it away
schema_fixup = _compile_schema_fixup(ctx, schema, keys)
await _store_static_text_cache(
ctx,
f'schema_fixup_query',
schema_fixup,
)


DEP_CHECK_QUERY = r'''
Expand Down Expand Up @@ -2935,12 +2945,18 @@ async def _upgrade_one(

async def _cleanup_one(
ctx: BootstrapContext,
state: edbcompiler.CompilerState,
trampoline_block: dbops.SQLBlock,
) -> None:
# TODO: Handle it already being committed
conn = ctx.conn

await _execute_block(conn, trampoline_block)
trampoline_query = await instdata.get_instdata(
conn, 'trampoline_pivot_query', 'text')
fixup_query = await instdata.get_instdata(
conn, 'schema_fixup_query', 'text')

await conn.sql_execute(trampoline_query)
if fixup_query:
await conn.sql_execute(fixup_query)

namespaces = json.loads(await conn.sql_fetch_val("""
select json_agg(nspname) from pg_namespace
Expand Down Expand Up @@ -2983,15 +2999,13 @@ async def _cleanup_one(
""".encode('utf-8'))


async def _upgrade_all(
async def _get_databases(
ctx: BootstrapContext,
state: edbcompiler.CompilerState,
trampoline_block: dbops.SQLBlock,
) -> None:
) -> list[str]:
cluster = ctx.cluster

tpl_db = cluster.get_db_name(edbdef.EDGEDB_TEMPLATE_DB)
conn = PGConnectionProxy(cluster, tpl_db)
conn = await cluster.connect(database=tpl_db)

# FIXME: Use the sys query instead?
try:
Expand All @@ -3003,20 +3017,31 @@ async def _upgrade_all(
finally:
conn.terminate()

assert ctx.args.inplace_upgrade
with open(ctx.args.inplace_upgrade) as f:
upgrade_data = json.load(f)

# DEBUG VELOCITY HACK: You can add a failing database to EARLY
# when trying to upgrade the whole suite.
EARLY: tuple[str, ...] = ('dump01',)
EARLY: tuple[str, ...] = ()
databases.sort(key=lambda k: (k not in EARLY, k))

return databases


async def _upgrade_all(
ctx: BootstrapContext,
state: edbcompiler.CompilerState,
) -> None:
cluster = ctx.cluster

databases = await _get_databases(ctx)

assert ctx.args.inplace_upgrade_prepare
with open(ctx.args.inplace_upgrade_prepare) as f:
upgrade_data = json.load(f)

for database in databases:
if database == edbdef.EDGEDB_TEMPLATE_DB:
continue

conn = PGConnectionProxy(cluster, ctx.cluster.get_db_name(database))
conn = PGConnectionProxy(cluster, cluster.get_db_name(database))
try:
subctx = dataclasses.replace(ctx, conn=conn)

Expand All @@ -3029,20 +3054,31 @@ async def _upgrade_all(
state=state,
upgrade_data=upgrade_data.get(database),
)

# XXX: This is not the right place to do this. We only
# want to do this if everything succeeds.
await _cleanup_one(subctx, state, trampoline_block)
finally:
conn.terminate()

conn = PGConnectionProxy(
cluster, ctx.cluster.get_db_name(edbdef.EDGEDB_TEMPLATE_DB))
try:
subctx = dataclasses.replace(ctx, conn=conn)
await _cleanup_one(subctx, state, trampoline_block)
finally:
conn.terminate()

async def _finish_all(
    ctx: BootstrapContext,
) -> None:
    """Run the stashed finalization step on every database in the cluster."""
    cluster = ctx.cluster

    # Finalize each database in turn, always tearing the connection down.
    for database in await _get_databases(ctx):
        conn = await cluster.connect(database=cluster.get_db_name(database))
        try:
            logger.info(f"Pivoting database '{database}'")
            # TODO: Try running each cleanup in a transaction to test
            # that they all work, before applying them for real. (We
            # would *like* to just run them all in open transactions
            # and then commit them all, but we run into memory
            # concerns.)
            await _cleanup_one(dataclasses.replace(ctx, conn=conn))
        finally:
            conn.terminate()


async def ensure_bootstrapped(
Expand All @@ -3060,13 +3096,22 @@ async def ensure_bootstrapped(
try:
mode = await _get_cluster_mode(ctx)
ctx = dataclasses.replace(ctx, mode=mode)
if mode == ClusterMode.pristine or args.inplace_upgrade:
state, trampoline_block = await _bootstrap(ctx)
if args.inplace_upgrade_prepare or args.inplace_upgrade_finalize:
assert args.bootstrap_only or args.inplace_upgrade_finalize

if args.inplace_upgrade_prepare:
state = await _bootstrap(ctx)
await _upgrade_all(ctx, state)

if args.inplace_upgrade:
assert trampoline_block
await _upgrade_all(ctx, state, trampoline_block)
if args.inplace_upgrade_finalize:
await _finish_all(ctx)
if not args.inplace_upgrade_prepare:
state = await _start(ctx)

return True, state

elif mode == ClusterMode.pristine:
state = await _bootstrap(ctx)
return True, state
else:
state = await _start(ctx)
Expand Down
Loading

0 comments on commit bed0b65

Please sign in to comment.