Skip to content

Commit

Permalink
Add test restarting compute at WAL page boundary
Browse files Browse the repository at this point in the history
  • Loading branch information
arssher committed Dec 12, 2024
1 parent 0bd8eca commit 4bce31d
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions test_runner/regress/test_wal_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,62 @@ def test_restart_endpoint_after_switch_wal(neon_env_builder: NeonEnvBuilder):
endpoint.safe_psql("SELECT 'works'")


def test_restart_endpoint_wal_page_boundary(neon_env_builder: NeonEnvBuilder):
    """
    Test restarting compute at WAL page boundary.

    The test first measures how many WAL bytes a transactional logical message
    costs on top of its payload, then emits one more message sized so that the
    WAL position should land exactly on a page boundary. If that works out, the
    endpoint is stopped immediately, a new one is started on the same branch
    and a simple insert must succeed.

    The sizing arithmetic is empirical; when the boundary is missed the test is
    skipped rather than failed.
    """
    # WAL page size assumed by the arithmetic below
    # (presumably PostgreSQL's default XLOG_BLCKSZ).
    wal_page_size = 8192

    env = neon_env_builder.init_start()

    ep = env.endpoints.create_start("main")
    ep.safe_psql("create table t (i int)")

    with ep.cursor() as cur:
        # Measure how much space a logical message takes. Sometimes the first
        # attempt creates a huge message and then it stabilizes (reason
        # unknown), so repeat a few times and keep the last measurement.
        for _ in range(3):
            lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
            log.info(f"current_lsn={lsn_before}")
            # Non-transactional logical message doesn't write WAL, only
            # XLogInsert's it, so use transactional. Which is a bit problematic
            # as transactional necessitates a commit record. Alternatively we
            # could do something like
            #   select neon_xlogflush(pg_current_wal_insert_lsn());
            # but it isn't much better + that particular call complains on
            # 'xlog flush request 0/282C018 is not satisfied' as
            # pg_current_wal_insert_lsn skips page headers.
            payload = "blahblah"
            cur.execute(f"select pg_logical_emit_message(true, 'pref', '{payload}')")
            lsn_after = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
            lsn_diff = lsn_after - lsn_before
            # WAL bytes spent on everything except the payload itself.
            logical_message_base = lsn_diff - len(payload)
            log.info(
                f"before {lsn_before}, after {lsn_after}, "
                f"lsn diff is {lsn_diff}, base {logical_message_base}"
            )

        # And write a logical message spanning exactly as much as we want.
        # The LSN is queried once so that the logged value and the value used
        # in the arithmetic are the same.
        curr_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
        offs = int(curr_lsn) % wal_page_size
        till_page = wal_page_size - offs
        target_lsn = curr_lsn + till_page
        # Not sure why 8 is here, it is deduced from experiments.
        payload_len = till_page - logical_message_base - 8
        log.info(
            f"current_lsn={curr_lsn}, offs {offs}, till_page {till_page}, target_lsn {target_lsn}"
        )

        # NOTE(review): the payload starts with a literal 'f', so it is
        # payload_len + 1 characters long. The constant above was tuned with
        # this in place, so it is kept as is. If payload_len is negative the
        # repetition yields an empty string and the check below skips the test.
        filler = "f" + "a" * payload_len
        cur.execute(f"select pg_logical_emit_message(true, 'pref', '{filler}')")
        supposedly_contrecord_end = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
        log.info(f"supposedly_page_boundary={supposedly_contrecord_end}")
        # The calculations to hit the page boundary are very fuzzy, so just
        # skip the test if we fail to reach it.
        if int(supposedly_contrecord_end) % wal_page_size != 0:
            pytest.skip(f"missed page boundary, bad luck: lsn is {supposedly_contrecord_end}")

    ep.stop(mode="immediate")
    ep = env.endpoints.create_start("main")
    ep.safe_psql("insert into t values (42)")  # should be ok


# Context manager which logs passed time on exit.
class DurationLogger:
def __init__(self, desc):
Expand Down

0 comments on commit 4bce31d

Please sign in to comment.