From bed0b65cfdcdf3d9a2d0f9d963491b92e1dcaa1e Mon Sep 17 00:00:00 2001 From: "Michael J. Sullivan" Date: Fri, 6 Sep 2024 14:18:06 -0700 Subject: [PATCH] [Upgrades] Support preparing upgrade while still up (#7718) This requires splitting up "preparing" the upgrade (which populates the new namespaces) and "finalizing" the upgrade (which flips all the trampolines and performs schema fixups). This also does some more work towards making the upgrade recoverable (by running the finalizations in transactions, and not committing any until all are ready), but there are still important TODOs on that front. Progress on #6697. --- edb/server/args.py | 12 ++- edb/server/bootstrap.py | 151 ++++++++++++++++++++++------------ tests/inplace-testing/test.sh | 35 +++++++- 3 files changed, 140 insertions(+), 58 deletions(-) diff --git a/edb/server/args.py b/edb/server/args.py index b5c3431d650..dea268be435 100644 --- a/edb/server/args.py +++ b/edb/server/args.py @@ -227,7 +227,8 @@ class ServerConfig(NamedTuple): log_level: str log_to: str bootstrap_only: bool - inplace_upgrade: Optional[pathlib.Path] + inplace_upgrade_prepare: Optional[pathlib.Path] + inplace_upgrade_finalize: bool bootstrap_command: str bootstrap_command_file: pathlib.Path default_branch: Optional[str] @@ -676,10 +677,15 @@ def resolve_envvar_value(self, ctx: click.Context): envvar="EDGEDB_SERVER_BOOTSTRAP_ONLY", cls=EnvvarResolver, help='bootstrap the database cluster and exit'), click.option( - '--inplace-upgrade', type=PathPath(), - envvar="EDGEDB_SERVER_INPLACE_UPGRADE", + '--inplace-upgrade-prepare', type=PathPath(), + envvar="EDGEDB_SERVER_INPLACE_UPGRADE_PREPARE", cls=EnvvarResolver, # XXX? help='try to do an in-place upgrade with the specified dump file'), + click.option( + '--inplace-upgrade-finalize', type=bool, is_flag=True, + envvar="EDGEDB_SERVER_INPLACE_UPGRADE_FINALIZE", + cls=EnvvarResolver, # XXX? + help='finalize an in-place upgrade'), click.option( '--default-branch', type=str, help='the name of the default branch to create'), diff --git a/edb/server/bootstrap.py b/edb/server/bootstrap.py index a2fb0f22a00..593f8e99dc1 100644 --- a/edb/server/bootstrap.py +++ b/edb/server/bootstrap.py @@ -78,6 +78,7 @@ from edb.server import config from edb.server import compiler as edbcompiler from edb.server import defines as edbdef +from edb.server import instdata from edb.server import pgcluster from edb.server import pgcon @@ -216,7 +217,7 @@ def terminate(self) -> None: class BootstrapContext: cluster: pgcluster.BaseCluster - conn: PGConnectionProxy + conn: PGConnectionProxy | pgcon.PGConnection args: edbargs.ServerConfig mode: Optional[ClusterMode] = None @@ -1533,7 +1534,6 @@ async def _init_stdlib( StdlibBits, config.Spec, edbcompiler.Compiler, - Optional[dbops.SQLBlock], # trampoline block ]: in_dev_mode = devmode.is_in_dev_mode() conn = ctx.conn @@ -1556,7 +1556,7 @@ async def _init_stdlib( src_hash=src_hash, cache_dir=cache_dir, ) - if args.inplace_upgrade: + if args.inplace_upgrade_prepare: tpldbdump = None tpldbdump, tpldbdump_inplace = None, None @@ -1585,20 +1585,21 @@ async def _init_stdlib( ) backend_params = cluster.get_runtime_params() - if not args.inplace_upgrade: + if not args.inplace_upgrade_prepare: logger.info('Creating the necessary PostgreSQL extensions...') await metaschema.create_pg_extensions(conn, backend_params) trampolines = [] trampolines.extend(stdlib.trampolines) - eff_tpldbdump = tpldbdump_inplace if args.inplace_upgrade else tpldbdump + eff_tpldbdump = ( + tpldbdump_inplace if args.inplace_upgrade_prepare else tpldbdump) if eff_tpldbdump is None: logger.info('Populating internal SQL structures...') assert bootstrap_commands is not None block = dbops.PLTopBlock() - if not args.inplace_upgrade: + if not args.inplace_upgrade_prepare: fixed_bootstrap_commands = metaschema.get_fixed_bootstrap_commands() fixed_bootstrap_commands.generate(block) @@ -1671,7 +1672,7 @@ async def _init_stdlib( tpldbdump_inplace = cleanup_tpldbdump(tpldbdump_inplace) # XXX: BE SMARTER ABOUT THIS, DON'T DO ALL THAT WORK - if args.inplace_upgrade: + if args.inplace_upgrade_prepare: tpldbdump = None buildmeta.write_data_cache( @@ -1820,13 +1821,17 @@ async def _init_stdlib( block = dbops.PLTopBlock() tramps.generate(block) - if args.inplace_upgrade: - trampoline_block = block + if args.inplace_upgrade_prepare: + trampoline_text = block.to_string() + await _store_static_text_cache( + ctx, + f'trampoline_pivot_query', + trampoline_text, + ) else: await _execute_block(conn, block) - trampoline_block = None - return stdlib, config_spec, compiler, trampoline_block + return stdlib, config_spec, compiler async def _init_defaults(schema, compiler, conn): @@ -2438,7 +2443,7 @@ async def _bootstrap_edgedb_super_roles(ctx: BootstrapContext) -> uuid.UUID: async def _bootstrap( ctx: BootstrapContext, no_template: bool=False, -) -> tuple[edbcompiler.CompilerState, Optional[dbops.SQLBlock]]: +) -> edbcompiler.CompilerState: args = ctx.args cluster = ctx.cluster backend_params = cluster.get_runtime_params() @@ -2464,7 +2469,7 @@ async def _bootstrap( using_template = backend_params.has_create_database and not no_template if using_template: - if not args.inplace_upgrade: + if not args.inplace_upgrade_prepare: new_template_db_id = await _create_edgedb_template_database(ctx) # XXX: THIS IS WRONG, RIGHT? else: @@ -2492,7 +2497,7 @@ async def _bootstrap( await _populate_misc_instance_data(tpl_ctx) - stdlib, config_spec, compiler, trampoline_block = await _init_stdlib( + stdlib, config_spec, compiler = await _init_stdlib( tpl_ctx, testmode=args.testmode, global_ids={ @@ -2581,7 +2586,7 @@ async def _bootstrap( async with iteration: await _pg_ensure_database_not_connected(ctx.conn, tpl_db) - if args.inplace_upgrade: + if args.inplace_upgrade_prepare: pass elif backend_params.has_create_database: await _create_edgedb_database( @@ -2613,7 +2618,7 @@ async def _bootstrap( compiler=compiler, ) - if args.inplace_upgrade: + if args.inplace_upgrade_prepare: pass elif backend_params.has_create_database: await _create_edgedb_database( @@ -2652,7 +2657,7 @@ async def _bootstrap( args.default_database_user or edbdef.EDGEDB_SUPERUSER, ) - return compiler.state, trampoline_block + return compiler.state async def _load_schema( @@ -2693,11 +2698,12 @@ def _is_stdlib_target( return t.get_name(schema).get_module_name() in s_schema.STD_MODULES -async def _fixup_schema( +def _compile_schema_fixup( ctx: BootstrapContext, schema: s_schema.ChainedSchema, keys: dict[str, Any], -) -> None: +) -> str: + """Compile any schema-specific fixes that need to be applied.""" current_block = dbops.PLTopBlock() backend_params = ctx.cluster.get_runtime_params() @@ -2783,7 +2789,7 @@ async def _fixup_schema( ) plan.generate(current_block) - await _execute_block(ctx.conn, current_block) + return current_block.to_string() async def _upgrade_one( @@ -2806,8 +2812,6 @@ async def _upgrade_one( for name, qltype, objid in upgrade_data['ids'] } - logger.info('Populating schema tables...') - # Load the schemas schema = await _load_schema(ctx, state) @@ -2881,7 +2885,13 @@ async def _upgrade_one( key = 'configspec_ext'; ''').encode('utf-8')) - await _fixup_schema(ctx, schema, keys) + # Compile the fixup script for the schema and stash it away + schema_fixup = _compile_schema_fixup(ctx, schema, keys) + await _store_static_text_cache( + ctx, + f'schema_fixup_query', + schema_fixup, + ) DEP_CHECK_QUERY = r''' @@ -2935,12 +2945,18 @@ async def _upgrade_one( async def _cleanup_one( ctx: BootstrapContext, - state: edbcompiler.CompilerState, - trampoline_block: dbops.SQLBlock, ) -> None: + # TODO: Handle it already being committed conn = ctx.conn - await _execute_block(conn, trampoline_block) + trampoline_query = await instdata.get_instdata( + conn, 'trampoline_pivot_query', 'text') + fixup_query = await instdata.get_instdata( + conn, 'schema_fixup_query', 'text') + + await conn.sql_execute(trampoline_query) + if fixup_query: + await conn.sql_execute(fixup_query) namespaces = json.loads(await conn.sql_fetch_val(""" select json_agg(nspname) from pg_namespace @@ -2983,15 +2999,13 @@ async def _cleanup_one( """.encode('utf-8')) -async def _upgrade_all( +async def _get_databases( ctx: BootstrapContext, - state: edbcompiler.CompilerState, - trampoline_block: dbops.SQLBlock, -) -> None: +) -> list[str]: cluster = ctx.cluster tpl_db = cluster.get_db_name(edbdef.EDGEDB_TEMPLATE_DB) - conn = PGConnectionProxy(cluster, tpl_db) + conn = await cluster.connect(database=tpl_db) # FIXME: Use the sys query instead? try: @@ -3003,20 +3017,31 @@ async def _upgrade_all( finally: conn.terminate() - assert ctx.args.inplace_upgrade - with open(ctx.args.inplace_upgrade) as f: - upgrade_data = json.load(f) - # DEBUG VELOCITY HACK: You can add a failing database to EARLY # when trying to upgrade the whole suite. - EARLY: tuple[str, ...] = ('dump01',) + EARLY: tuple[str, ...] = () databases.sort(key=lambda k: (k not in EARLY, k)) + return databases + + +async def _upgrade_all( + ctx: BootstrapContext, + state: edbcompiler.CompilerState, +) -> None: + cluster = ctx.cluster + + databases = await _get_databases(ctx) + + assert ctx.args.inplace_upgrade_prepare + with open(ctx.args.inplace_upgrade_prepare) as f: + upgrade_data = json.load(f) + for database in databases: if database == edbdef.EDGEDB_TEMPLATE_DB: continue - conn = PGConnectionProxy(cluster, ctx.cluster.get_db_name(database)) + conn = PGConnectionProxy(cluster, cluster.get_db_name(database)) try: subctx = dataclasses.replace(ctx, conn=conn) @@ -3029,20 +3054,31 @@ async def _upgrade_all( state=state, upgrade_data=upgrade_data.get(database), ) - - # XXX: This is not the right place to do this. We only - # want to do this if everything succeeds. - await _cleanup_one(subctx, state, trampoline_block) finally: conn.terminate() - conn = PGConnectionProxy( - cluster, ctx.cluster.get_db_name(edbdef.EDGEDB_TEMPLATE_DB)) - try: - subctx = dataclasses.replace(ctx, conn=conn) - await _cleanup_one(subctx, state, trampoline_block) - finally: - conn.terminate() + +async def _finish_all( + ctx: BootstrapContext, +) -> None: + cluster = ctx.cluster + + databases = await _get_databases(ctx) + + for database in databases: + conn = await cluster.connect(database=cluster.get_db_name(database)) + try: + subctx = dataclasses.replace(ctx, conn=conn) + + logger.info(f"Pivoting database '{database}'") + # TODO: Try running each cleanup in a transaction to test + # that they all work, before applying them for real. (We + # would *like* to just run them all in open transactions + # and then commit them all, but we run into memory + # concerns.) + await _cleanup_one(subctx) + finally: + conn.terminate() async def ensure_bootstrapped( @@ -3060,13 +3096,22 @@ async def ensure_bootstrapped( try: mode = await _get_cluster_mode(ctx) ctx = dataclasses.replace(ctx, mode=mode) - if mode == ClusterMode.pristine or args.inplace_upgrade: - state, trampoline_block = await _bootstrap(ctx) + if args.inplace_upgrade_prepare or args.inplace_upgrade_finalize: + assert args.bootstrap_only or args.inplace_upgrade_finalize + + if args.inplace_upgrade_prepare: + state = await _bootstrap(ctx) + await _upgrade_all(ctx, state) - if args.inplace_upgrade: - assert trampoline_block - await _upgrade_all(ctx, state, trampoline_block) + if args.inplace_upgrade_finalize: + await _finish_all(ctx) + if not args.inplace_upgrade_prepare: + state = await _start(ctx) + + return True, state + elif mode == ClusterMode.pristine: + state = await _bootstrap(ctx) return True, state else: state = await _start(ctx) diff --git a/tests/inplace-testing/test.sh b/tests/inplace-testing/test.sh index 703a376416f..eadfc948ce1 100755 --- a/tests/inplace-testing/test.sh +++ b/tests/inplace-testing/test.sh @@ -16,12 +16,43 @@ make parsers tar cf "$DIR".tar "$DIR" +PORT=12346 +edb server -D "$DIR" -P $PORT & +SPID=$! +cleanup() { + if [ -n "$SPID" ]; then + kill $SPID + fi +} +trap cleanup EXIT + +# Wait for the server to come up and see it is working +edgedb -H localhost -P $PORT --tls-security insecure -b select query 'select count(User)' | grep 2 + +# Upgrade to the new version patch -f -p1 < tests/inplace-testing/upgrade.patch - make parsers -edb server --bootstrap-only --inplace-upgrade "$DIR"/upgrade.json --data-dir "$DIR" +# Get the DSN from the debug endpoint +DSN=$(curl -s http://localhost:$PORT/server-info | jq -r '.pg_addr | "postgres:///?user=\(.user)&port=\(.port)&host=\(.host)"') + +# Prepare the upgrade, operating against the postgres that the old +# version server is managing +edb server --bootstrap-only --inplace-upgrade-prepare "$DIR"/upgrade.json --backend-dsn="$DSN" + +# Check the server is still working +edgedb -H localhost -P $PORT --tls-security insecure -b select query 'select count(User)' | grep 2 + +# Kill the old version so we can finalize the upgrade +kill $SPID +wait $SPID +SPID= + +tar cf "$DIR"-prepped.tar "$DIR" +# Finalize the upgrade +edb server --bootstrap-only --inplace-upgrade-finalize --data-dir "$DIR" tar cf "$DIR"-cooked.tar "$DIR" +# Test! edb test --data-dir "$DIR" --use-data-dir-dbs -v "$@"