Skip to content

Commit

Permalink
[Upgrades] Support preparing upgrade while still up
Browse files Browse the repository at this point in the history
This requires splitting up "preparing" the upgrade (which populates
the new namespaces) and "finalizing" the upgrade (which flips all the
trampolines and performs schema fixups).

Progress on #6697.
  • Loading branch information
msullivan committed Sep 6, 2024
1 parent a1cd3a6 commit 1ba26c6
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 52 deletions.
12 changes: 9 additions & 3 deletions edb/server/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ class ServerConfig(NamedTuple):
log_level: str
log_to: str
bootstrap_only: bool
inplace_upgrade: Optional[pathlib.Path]
inplace_upgrade_prepare: Optional[pathlib.Path]
inplace_upgrade_finalize: bool
bootstrap_command: str
bootstrap_command_file: pathlib.Path
default_branch: Optional[str]
Expand Down Expand Up @@ -676,10 +677,15 @@ def resolve_envvar_value(self, ctx: click.Context):
envvar="EDGEDB_SERVER_BOOTSTRAP_ONLY", cls=EnvvarResolver,
help='bootstrap the database cluster and exit'),
click.option(
'--inplace-upgrade', type=PathPath(),
envvar="EDGEDB_SERVER_INPLACE_UPGRADE",
'--inplace-upgrade-prepare', type=PathPath(),
envvar="EDGEDB_SERVER_INPLACE_UPGRADE_PREPARE",
cls=EnvvarResolver, # XXX?
help='try to do an in-place upgrade with the specified dump file'),
click.option(
'--inplace-upgrade-finalize', type=bool, is_flag=True,
envvar="EDGEDB_SERVER_INPLACE_UPGRADE_FINALIZE",
cls=EnvvarResolver, # XXX?
help='finalize an in-place upgrade'),
click.option(
'--default-branch', type=str,
help='the name of the default branch to create'),
Expand Down
139 changes: 92 additions & 47 deletions edb/server/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
from edb.server import config
from edb.server import compiler as edbcompiler
from edb.server import defines as edbdef
from edb.server import instdata
from edb.server import pgcluster
from edb.server import pgcon

Expand Down Expand Up @@ -216,7 +217,7 @@ def terminate(self) -> None:
class BootstrapContext:

cluster: pgcluster.BaseCluster
conn: PGConnectionProxy
conn: PGConnectionProxy | pgcon.PGConnection
args: edbargs.ServerConfig
mode: Optional[ClusterMode] = None

Expand Down Expand Up @@ -1533,7 +1534,6 @@ async def _init_stdlib(
StdlibBits,
config.Spec,
edbcompiler.Compiler,
Optional[dbops.SQLBlock], # trampoline block
]:
in_dev_mode = devmode.is_in_dev_mode()
conn = ctx.conn
Expand All @@ -1556,7 +1556,7 @@ async def _init_stdlib(
src_hash=src_hash,
cache_dir=cache_dir,
)
if args.inplace_upgrade:
if args.inplace_upgrade_prepare:
tpldbdump = None

tpldbdump, tpldbdump_inplace = None, None
Expand Down Expand Up @@ -1585,20 +1585,21 @@ async def _init_stdlib(
)

backend_params = cluster.get_runtime_params()
if not args.inplace_upgrade:
if not args.inplace_upgrade_prepare:
logger.info('Creating the necessary PostgreSQL extensions...')
await metaschema.create_pg_extensions(conn, backend_params)

trampolines = []
trampolines.extend(stdlib.trampolines)

eff_tpldbdump = tpldbdump_inplace if args.inplace_upgrade else tpldbdump
eff_tpldbdump = (
tpldbdump_inplace if args.inplace_upgrade_prepare else tpldbdump)
if eff_tpldbdump is None:
logger.info('Populating internal SQL structures...')
assert bootstrap_commands is not None
block = dbops.PLTopBlock()

if not args.inplace_upgrade:
if not args.inplace_upgrade_prepare:
fixed_bootstrap_commands = metaschema.get_fixed_bootstrap_commands()
fixed_bootstrap_commands.generate(block)

Expand Down Expand Up @@ -1671,7 +1672,7 @@ async def _init_stdlib(
tpldbdump_inplace = cleanup_tpldbdump(tpldbdump_inplace)

# XXX: BE SMARTER ABOUT THIS, DON'T DO ALL THAT WORK
if args.inplace_upgrade:
if args.inplace_upgrade_prepare:
tpldbdump = None

buildmeta.write_data_cache(
Expand Down Expand Up @@ -1820,13 +1821,17 @@ async def _init_stdlib(
block = dbops.PLTopBlock()
tramps.generate(block)

if args.inplace_upgrade:
trampoline_block = block
if args.inplace_upgrade_prepare:
trampoline_text = block.to_string()
await _store_static_text_cache(
ctx,
f'trampoline_pivot_query',
trampoline_text,
)
else:
await _execute_block(conn, block)
trampoline_block = None

return stdlib, config_spec, compiler, trampoline_block
return stdlib, config_spec, compiler


async def _init_defaults(schema, compiler, conn):
Expand Down Expand Up @@ -2438,7 +2443,7 @@ async def _bootstrap_edgedb_super_roles(ctx: BootstrapContext) -> uuid.UUID:
async def _bootstrap(
ctx: BootstrapContext,
no_template: bool=False,
) -> tuple[edbcompiler.CompilerState, Optional[dbops.SQLBlock]]:
) -> edbcompiler.CompilerState:
args = ctx.args
cluster = ctx.cluster
backend_params = cluster.get_runtime_params()
Expand All @@ -2464,7 +2469,7 @@ async def _bootstrap(
using_template = backend_params.has_create_database and not no_template

if using_template:
if not args.inplace_upgrade:
if not args.inplace_upgrade_prepare:
new_template_db_id = await _create_edgedb_template_database(ctx)
# XXX: THIS IS WRONG, RIGHT?
else:
Expand Down Expand Up @@ -2492,7 +2497,7 @@ async def _bootstrap(

await _populate_misc_instance_data(tpl_ctx)

stdlib, config_spec, compiler, trampoline_block = await _init_stdlib(
stdlib, config_spec, compiler = await _init_stdlib(
tpl_ctx,
testmode=args.testmode,
global_ids={
Expand Down Expand Up @@ -2581,7 +2586,7 @@ async def _bootstrap(
async with iteration:
await _pg_ensure_database_not_connected(ctx.conn, tpl_db)

if args.inplace_upgrade:
if args.inplace_upgrade_prepare:
pass
elif backend_params.has_create_database:
await _create_edgedb_database(
Expand Down Expand Up @@ -2613,7 +2618,7 @@ async def _bootstrap(
compiler=compiler,
)

if args.inplace_upgrade:
if args.inplace_upgrade_prepare:
pass
elif backend_params.has_create_database:
await _create_edgedb_database(
Expand Down Expand Up @@ -2652,7 +2657,7 @@ async def _bootstrap(
args.default_database_user or edbdef.EDGEDB_SUPERUSER,
)

return compiler.state, trampoline_block
return compiler.state


async def _load_schema(
Expand Down Expand Up @@ -2806,8 +2811,6 @@ async def _upgrade_one(
for name, qltype, objid in upgrade_data['ids']
}

logger.info('Populating schema tables...')

# Load the schemas
schema = await _load_schema(ctx, state)

Expand Down Expand Up @@ -2886,12 +2889,13 @@ async def _upgrade_one(

async def _cleanup_one(
ctx: BootstrapContext,
state: edbcompiler.CompilerState,
trampoline_block: dbops.SQLBlock,
) -> None:
# TODO: Handle it already being committed
conn = ctx.conn

await _execute_block(conn, trampoline_block)
trampoline_query = await instdata.get_instdata(
conn, 'trampoline_pivot_query', 'text')
await conn.sql_execute(trampoline_query)

namespaces = json.loads(await conn.sql_fetch_val("""
select json_agg(nspname) from pg_namespace
Expand All @@ -2910,15 +2914,13 @@ async def _cleanup_one(
""".encode('utf-8'))


async def _upgrade_all(
async def _get_databases(
ctx: BootstrapContext,
state: edbcompiler.CompilerState,
trampoline_block: dbops.SQLBlock,
) -> None:
) -> list[str]:
cluster = ctx.cluster

tpl_db = cluster.get_db_name(edbdef.EDGEDB_TEMPLATE_DB)
conn = PGConnectionProxy(cluster, tpl_db)
conn = await cluster.connect(database=tpl_db)

# FIXME: Use the sys query instead?
try:
Expand All @@ -2930,20 +2932,31 @@ async def _upgrade_all(
finally:
conn.terminate()

assert ctx.args.inplace_upgrade
with open(ctx.args.inplace_upgrade) as f:
upgrade_data = json.load(f)

# DEBUG VELOCITY HACK: You can add a failing database to EARLY
# when trying to upgrade the whole suite.
EARLY: tuple[str, ...] = ('dump01',)
EARLY: tuple[str, ...] = ()
databases.sort(key=lambda k: (k not in EARLY, k))

return databases


async def _upgrade_all(
ctx: BootstrapContext,
state: edbcompiler.CompilerState,
) -> None:
cluster = ctx.cluster

databases = await _get_databases(ctx)

assert ctx.args.inplace_upgrade_prepare
with open(ctx.args.inplace_upgrade_prepare) as f:
upgrade_data = json.load(f)

for database in databases:
if database == edbdef.EDGEDB_TEMPLATE_DB:
continue

conn = PGConnectionProxy(cluster, ctx.cluster.get_db_name(database))
conn = PGConnectionProxy(cluster, cluster.get_db_name(database))
try:
subctx = dataclasses.replace(ctx, conn=conn)

Expand All @@ -2956,20 +2969,43 @@ async def _upgrade_all(
state=state,
upgrade_data=upgrade_data.get(database),
)

# XXX: This is not the right place to do this. We only
# want to do this if everything succeeds.
await _cleanup_one(subctx, state, trampoline_block)
finally:
conn.terminate()

conn = PGConnectionProxy(
cluster, ctx.cluster.get_db_name(edbdef.EDGEDB_TEMPLATE_DB))

async def _finish_all(
ctx: BootstrapContext,
) -> None:
cluster = ctx.cluster

databases = await _get_databases(ctx)

conns = []
try:
subctx = dataclasses.replace(ctx, conn=conn)
await _cleanup_one(subctx, state, trampoline_block)
for database in databases:
conn = await cluster.connect(database=cluster.get_db_name(database))
conns.append(conn)

subctx = dataclasses.replace(ctx, conn=conn)

await conn.sql_execute(b'START TRANSACTION')
await conn.sql_execute(
b' SET LOCAL idle_in_transaction_session_timeout = 0'
)
logger.info(f"Pivoting database '{database}'")
await _cleanup_one(subctx)

# Commit all of the pivots together, once they have succeeded.
# This could still fail part way through (due to a crash or
# some such), but then we should be able to try again and have
# it succeed, since we already know that the DDL works.
        logger.info(f"Committing database pivots")
for conn in conns:
await conn.sql_execute(b'COMMIT')

finally:
conn.terminate()
for conn in conns:
conn.terminate()


async def ensure_bootstrapped(
Expand All @@ -2987,13 +3023,22 @@ async def ensure_bootstrapped(
try:
mode = await _get_cluster_mode(ctx)
ctx = dataclasses.replace(ctx, mode=mode)
if mode == ClusterMode.pristine or args.inplace_upgrade:
state, trampoline_block = await _bootstrap(ctx)
if args.inplace_upgrade_prepare or args.inplace_upgrade_finalize:
assert args.bootstrap_only or args.inplace_upgrade_finalize

if args.inplace_upgrade_prepare:
state = await _bootstrap(ctx)
await _upgrade_all(ctx, state)

if args.inplace_upgrade:
assert trampoline_block
await _upgrade_all(ctx, state, trampoline_block)
if args.inplace_upgrade_finalize:
await _finish_all(ctx)
if not args.inplace_upgrade_prepare:
state = await _start(ctx)

return True, state

elif mode == ClusterMode.pristine:
state = await _bootstrap(ctx)
return True, state
else:
state = await _start(ctx)
Expand Down
33 changes: 31 additions & 2 deletions tests/inplace-testing/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,41 @@ make parsers

tar cf "$DIR".tar "$DIR"

PORT=12346
edb server -D "$DIR" -P $PORT &
SPID=$!
cleanup() {
kill $SPID
}
trap cleanup EXIT

# Wait for the server to come up and see it is working
edgedb -H localhost -P $PORT --tls-security insecure -b select query 'select count(User)' | grep 2

# Upgrade to the new version
patch -f -p1 < tests/inplace-testing/upgrade.patch

make parsers

edb server --bootstrap-only --inplace-upgrade "$DIR"/upgrade.json --data-dir "$DIR"
# Get the DSN from the debug endpoint
DSN=$(curl -s http://localhost:$PORT/server-info | jq -r '.pg_addr | "postgres:///?user=\(.user)&port=\(.port)&host=\(.host)"')

# Prepare the upgrade, operating against the postgres that the old
# version server is managing
edb server --bootstrap-only --inplace-upgrade-prepare "$DIR"/upgrade.json --backend-dsn="$DSN"

# Check the server is still working
edgedb -H localhost -P $PORT --tls-security insecure -b select query 'select count(User)' | grep 2

# Kill the old version so we can finalize the upgrade
kill $SPID
wait $SPID
SPID=

tar cf "$DIR"-prepped.tar "$DIR"

# Finalize the upgrade
edb server --bootstrap-only --inplace-upgrade-finalize --data-dir "$DIR"
tar cf "$DIR"-cooked.tar "$DIR"

# Test!
edb test --data-dir "$DIR" --use-data-dir-dbs -v "$@"

0 comments on commit 1ba26c6

Please sign in to comment.