diff --git a/src/vfs2.c b/src/vfs2.c index b143be78c..5eb03c19f 100644 --- a/src/vfs2.c +++ b/src/vfs2.c @@ -18,6 +18,7 @@ #include #include #include +#include /* MAX */ #include #define VFS2_WAL_FIXED_SUFFIX1 "-xwal1" @@ -39,119 +40,245 @@ #define READ_MARK_UNUSED 0xffffffff -static const uint32_t invalid_magic = 0x17171717; +#define DB_FILE_HEADER_SIZE 100 +#define DB_FILE_HEADER_NPAGES_OFFSET 28 + +/* Fixed version number used by the WAL and WAL-index. */ +#define WAL_MAX_VERSION 3007000 + +/* clang-format off */ enum { /* Entry is not yet open. */ WTX_CLOSED, - /* Next WAL write will be a header write, causing a WAL swap (WAL-cur is empty or fully checkpointed). */ + /* Next WAL write will be a header write, causing a WAL swap (WAL-cur is + empty or fully checkpointed). */ WTX_EMPTY, /* Non-leader, at least one transaction in WAL-cur is not committed. */ WTX_FOLLOWING, - /* Non-leader, all transactions in WAL-cur are committed (but at least one is not checkpointed). */ - WTX_FLUSH, - /* Leader, all transactions in WAL-cur are committed (but at least one is not checkpointed). */ + /* All transactions in WAL-cur are committed, but at least one is not + * checkpointed. (Both leaders and non-leaders can be in this state). + */ WTX_BASE, /* Leader, transaction in progress. */ WTX_ACTIVE, /* Leader, transaction committed by SQLite and hidden. */ WTX_HIDDEN, /* Leader, transation committed by SQLite, hidden, and polled. 
*/ - WTX_POLLED + WTX_POLLED, + WTX_NR }; -/* - -Diagram of the state machine (some transitions omitted when they would crowd the diagram even more): - -+----------------------+ sqlite3_open +------------------------------------------------------------------------------+ -| CLOSED | ------------------------> | FOLLOWING | <+ -+----------------------+ +------------------------------------------------------------------------------+ | - | ^ ^ | | - | sqlite3_open | vfs2_apply_uncommitted | vfs2_apply_uncommitted | vfs2_{commit,unapply} | vfs2_apply_uncommitted - v | | v | -+----------------------------------------------------------------------------+ | +------------------------+ | -| | +----------------------- | | | -| EMPTY | | FLUSH | | -| | <------------------------ | | | -+----------------------------------------------------------------------------+ sqlite3_wal_checkpoint +------------------------+ | - | ^ | | - | vfs2_commit_barrier | sqlite3_wal_checkpoint | | - v | | | -+----------------------------------------------------------------------------+ vfs2_commit_barrier | | -| BASE | <---------------------------+ | -+----------------------------------------------------------------------------+ | - | ^ | | - | sqlite3_step | vfs2_unhide +-------------------------------------------------------------------------------+ - v | -+----------------------+ | -| ACTIVE | | -+----------------------+ | - | | - | COMMIT_PHASETWO | - v | -+----------------------+ | -| HIDDEN | | -+----------------------+ | - | | - | vfs2_poll | - v | -+----------------------+ | -| POLLED | -+ -+----------------------+ - -*/ - -static const struct sm_conf wtx_states[SM_STATES_MAX] = { +/** + * Major transitions for this state machine: + * + * +-----------CLOSED----------+ + * | | + * xOpen && no tx in WAL-cur xOpen && tx in WAL-cur + * | | + * | (FB) | + * v (BE) <------ v v-+ vfs2_add_uncommitted + * EMPTY<------BASE------>FOLLOWING-+ + * ^ | (BF) + * | | + * | | sqlite3_step && xWrite(WAL) + * 
| | + * | v v-+ xWrite(WAL) + * |--ACTIVE-+ + * | | + * vfs2_abort, | | COMMIT_PHASETWO + * vfs2_unhide | | + * | v + * |--HIDDEN + * | | + * | | vfs2_poll + * | | + * | v + * +--POLLED + * + * Abbreviations and omissions: + * - BE occurs when we run a full checkpoint. + * - FB occurs when we call vfs2_apply or vfs2_unadd and no uncommitted + * transactions remain afterward. + * - BF occurs via vfs2_add_uncommitted. + * - EMPTY may move to ACTIVE via sqlite3_step. + * - All states may move back to CLOSED. + */ +static const struct sm_conf wtx_states[WTX_NR] = { [WTX_CLOSED] = { .flags = SM_INITIAL|SM_FINAL, .name = "closed", - .allowed = BITS(WTX_EMPTY)|BITS(WTX_FOLLOWING)|BITS(WTX_FLUSH), + .allowed = BITS(WTX_EMPTY) + |BITS(WTX_BASE) + |BITS(WTX_FOLLOWING), }, - [WTX_EMPTY] = { - .flags = 0, - .name = "empty", - .allowed = BITS(WTX_FOLLOWING)|BITS(WTX_FLUSH)|BITS(WTX_ACTIVE)|BITS(WTX_CLOSED), + [WTX_EMPTY] = { + .name = "empty", + .allowed = BITS(WTX_FOLLOWING) + |BITS(WTX_ACTIVE) + |BITS(WTX_CLOSED), + }, + [WTX_FOLLOWING] = { + .name = "following", + .allowed = BITS(WTX_BASE) + |BITS(WTX_FOLLOWING) + |BITS(WTX_CLOSED), + }, + [WTX_BASE] = { + .name = "base", + .allowed = BITS(WTX_ACTIVE) + |BITS(WTX_FOLLOWING) + |BITS(WTX_EMPTY) + |BITS(WTX_CLOSED), + }, + [WTX_ACTIVE] = { + .name = "active", + .allowed = BITS(WTX_BASE) + |BITS(WTX_ACTIVE) + |BITS(WTX_HIDDEN) + |BITS(WTX_CLOSED), + }, + [WTX_HIDDEN] = { + .name = "hidden", + .allowed = BITS(WTX_BASE) + |BITS(WTX_POLLED) + |BITS(WTX_CLOSED), + }, + [WTX_POLLED] = { + .name = "polled", + .allowed = BITS(WTX_BASE) + |BITS(WTX_CLOSED), + }, +}; + +/** + * State machine that just tracks which WAL is WAL-cur, for observability. 
+ */ +enum { + WAL_1, + WAL_2, + WAL_NR +}; + +static const struct sm_conf wal_states[WAL_NR] = { + [WAL_1] = { + .name = "wal1", + .allowed = BITS(WAL_2), + .flags = SM_INITIAL|SM_FINAL }, - [WTX_FOLLOWING] = { - .flags = 0, - .name = "following", - .allowed = BITS(WTX_FOLLOWING)|BITS(WTX_FLUSH)|BITS(WTX_CLOSED), + [WAL_2] = { + .name = "wal2", + .allowed = BITS(WAL_1), + .flags = SM_INITIAL|SM_FINAL + } + +}; + +/** + * State machine that just tracks the state of the WAL write lock, for + * observability. + */ +enum { + WLK_UNLOCKED, + WLK_LOCKED, + WLK_NR +}; + +static const struct sm_conf wlk_states[WLK_NR] = { + [WLK_UNLOCKED] = { + .name = "unlocked", + .allowed = BITS(WLK_LOCKED), + .flags = SM_INITIAL|SM_FINAL }, - [WTX_FLUSH] = { - .flags = 0, - .name = "flush", - .allowed = BITS(WTX_FOLLOWING)|BITS(WTX_FLUSH)|BITS(WTX_ACTIVE)|BITS(WTX_CLOSED), + [WLK_LOCKED] = { + .name = "locked", + .allowed = BITS(WLK_UNLOCKED), + .flags = SM_INITIAL|SM_FINAL + } +}; + +/** + * State machine that tracks who has been working on the shm, for + * observability. + * + * Other than observability, the point of this state machine is to check that + * SQLite does not run recovery on the shm after vfs2 has modified it. + */ +enum { + /* Nothing in the shm yet. */ + SHM_EMPTY, + /* SQLite is holding the recovery lock, building up the shm. */ + SHM_RECOVERING, + /* SQLite has finished building up the shm, vfs2 has not touched it. */ + SHM_RECOVERED, + /* vfs2 made the last modification to the shm. 
*/ + SHM_MANAGED, + SHM_NR +}; + +static const struct sm_conf shm_states[SHM_NR] = { + [SHM_EMPTY] = { + .name = "empty", + .allowed = BITS(SHM_RECOVERING)|BITS(SHM_MANAGED), + .flags = SM_INITIAL }, - [WTX_BASE] = { - .flags = 0, - .name = "base", - .allowed = BITS(WTX_FOLLOWING)|BITS(WTX_BASE)|BITS(WTX_ACTIVE)|BITS(WTX_EMPTY)|BITS(WTX_CLOSED), + [SHM_RECOVERING] = { + .name = "recovering", + .allowed = BITS(SHM_RECOVERED), }, - [WTX_ACTIVE] = { - .flags = 0, - .name = "active", - .allowed = BITS(WTX_BASE)|BITS(WTX_ACTIVE)|BITS(WTX_HIDDEN)|BITS(WTX_CLOSED), + [SHM_RECOVERED] = { + .name = "recovered", + .allowed = BITS(SHM_MANAGED), + .flags = SM_FINAL }, - [WTX_HIDDEN] = { - .flags = 0, - .name = "hidden", - .allowed = BITS(WTX_BASE)|BITS(WTX_POLLED)|BITS(WTX_CLOSED), + [SHM_MANAGED] = { + .name = "managed", + .allowed = BITS(SHM_MANAGED), + .flags = SM_FINAL }, - [WTX_POLLED] = { - .flags = 0, - .name = "polled", - .allowed = BITS(WTX_BASE)|BITS(WTX_CLOSED), +}; + +/** + * State machine that just tracks whether a checkpoint is in progress or not, + * for observability. + */ +enum { + CKPT_QUIESCENT, + CKPT_CHECKPOINTING, + CKPT_NR +}; + +static const struct sm_conf ckpt_states[CKPT_NR] = { + [CKPT_QUIESCENT] = { + .name = "quiescent", + .allowed = BITS(CKPT_CHECKPOINTING), + .flags = SM_INITIAL|SM_FINAL }, + [CKPT_CHECKPOINTING] = { + .name = "checkpointing", + .allowed = BITS(CKPT_QUIESCENT) + } }; +/* clang-format on */ + +/** + * A dummy invariant, for when you just don't care. + */ +static bool no_invariant(const struct sm *sm, int prev) +{ + (void)sm; + (void)prev; + return true; +} + /** * Userdata owned by the VFS. 
*/ struct common { sqlite3_vfs *orig; /* underlying VFS */ pthread_rwlock_t rwlock; /* protects the queue */ - queue queue; /* queue of entry */ + queue queue; /* queue of entry */ }; struct cksums { @@ -165,24 +292,18 @@ static bool is_bigendian(void) return *(char *)(&x) == 0; } -static uint32_t native_magic(void) +static void update_cksums(const uint8_t *p, size_t len, struct cksums *sums) { - return is_bigendian() ? BE_MAGIC : LE_MAGIC; -} - -static void update_cksums(uint32_t magic, const uint8_t *p, size_t len, struct cksums *sums) -{ - PRE(magic == BE_MAGIC || magic == LE_MAGIC); PRE(len % 8 == 0); const uint8_t *end = p + len; - for (; p != end; p += 8) { - if (magic == BE_MAGIC) { - sums->cksum1 += ByteGetBe32(p) + sums->cksum2; - sums->cksum2 += ByteGetBe32(p + 4) + sums->cksum1; - } else { - sums->cksum1 += ByteGetLe32(p) + sums->cksum2; - sums->cksum2 += ByteGetLe32(p + 4) + sums->cksum1; - } + uint32_t n; + while (p != end) { + memcpy(&n, p, 4); + sums->cksum1 += n + sums->cksum2; + p += 4; + memcpy(&n, p, 4); + sums->cksum2 += n + sums->cksum1; + p += 4; } } @@ -238,14 +359,179 @@ struct wal_index_full_hdr { uint8_t unused[4]; }; +#define SHM_SHORT_PGNOS_LEN 4062 +#define SHM_LONG_PGNOS_LEN 4096 +#define SHM_HT_LEN 8192 + +/** + * View of a shm region. + * + * The zeroth region looks like this (not to scale): + * + * | header | page numbers | hash table | + * + * The first and later regions look like this (also not to scale): + * + * | page numbers | hash table | + */ +struct shm_region { + union { + /* region 0 */ + struct { + struct wal_index_full_hdr hdr; + uint32_t pgnos_short[SHM_SHORT_PGNOS_LEN]; + }; + /* region 1 and later */ + uint32_t pgnos_long[SHM_LONG_PGNOS_LEN]; + }; + uint16_t ht[SHM_HT_LEN]; +}; + /** - * View of the zeroth shm region, which contains the WAL index header. + * "Shared" memory implementation for storing the WAL-index. + * + * Each region is stored in its own heap allocation of size + * VFS2_WAL_INDEX_REGION_SIZE. 
Using separate allocations ensures that existing + * region pointers held by SQLite remain valid when new regions are mapped, as + * SQLite expects. */ -union vfs2_shm_region0 { - struct wal_index_full_hdr hdr; - char bytes[VFS2_WAL_INDEX_REGION_SIZE]; +struct shm { + struct shm_region **regions; + int num_regions; + /* Counts the net number of times that SQLite has mapped the zeroth + * region. As a sanity check, we assert that this value is zero before + * we free the shm. */ + unsigned refcount; + struct sm sm; }; +static_assert(sizeof(struct shm_region) == VFS2_WAL_INDEX_REGION_SIZE, + "shm regions have the expected size"); + +/** + * Allocate a new shm region at the next-highest index. + * + * Returns a pointer to the new region, or NULL if allocation failed. + * In the latter case the shm is unchanged. + */ +static struct shm_region *shm_grow(struct shm *shm) +{ + int index = shm->num_regions; + struct shm_region *r = sqlite3_malloc64(VFS2_WAL_INDEX_REGION_SIZE); + if (r == NULL) { + goto err; + } + *r = (struct shm_region){}; + sqlite3_uint64 size = + (sqlite3_uint64)(index + 1) * (sqlite3_uint64)sizeof(*shm->regions); + struct shm_region **p = sqlite3_realloc64(shm->regions, size); + if (p == NULL) { + goto err_after_alloc_region; + } + p[index] = r; + shm->regions = p; + shm->num_regions = index + 1; + return r; + +err_after_alloc_region: + sqlite3_free(r); +err: + return NULL; +} + +static void write_basic_hdr_cksums(struct wal_index_basic_hdr *bhdr) +{ + struct cksums sums = {}; + const uint8_t *p = (const uint8_t *)bhdr; + size_t len = offsetof(struct wal_index_basic_hdr, cksums); + update_cksums(p, len, &sums); + bhdr->cksums = sums; +} + +/** + * Clear out all data in the WAL-index after a WAL swap, and re-initialize the + * header. 
+ */ +static void shm_restart(struct shm *shm, struct wal_hdr whdr) +{ + for (int i = 0; i < shm->num_regions; i++) { + *shm->regions[i] = (struct shm_region){}; + } + + /* TODO(cole) eliminate redundancy with shm_init */ + struct shm_region *r0 = shm->regions[0]; + struct wal_index_full_hdr *ihdr = &r0->hdr; + ihdr->basic[0].iVersion = WAL_MAX_VERSION; + ihdr->basic[0].isInit = 1; + ihdr->basic[0].bigEndCksum = is_bigendian(); + ihdr->basic[0].szPage = (uint16_t)ByteGetBe32(whdr.page_size); + write_basic_hdr_cksums(&ihdr->basic[0]); + ihdr->basic[1] = ihdr->basic[0]; + ihdr->marks[0] = 0; + ihdr->marks[1] = READ_MARK_UNUSED; + ihdr->marks[2] = READ_MARK_UNUSED; + ihdr->marks[3] = READ_MARK_UNUSED; + ihdr->marks[4] = READ_MARK_UNUSED; + sm_move(&shm->sm, SHM_MANAGED); +} + +/** + * Perform initial setup of the WAL-index. + * + * This allocates the zeroth region and fills in the WAL-index header + * based on the provided header of WAL-cur. + */ +static int shm_init(struct shm *shm, struct wal_hdr whdr) +{ + struct shm_region *r0 = shm_grow(shm); + if (r0 == NULL) { + return SQLITE_NOMEM; + } + shm_restart(shm, whdr); + return SQLITE_OK; +} + +/** + * Add the page number for a frame to the appropriate page number array + * in the WAL-index. + * + * This allocates a new shm region if necessary, and hence can fail with + * SQLITE_NOMEM. 
+ */ +static int shm_add_pgno(struct shm *shm, uint32_t frame, uint32_t pgno) +{ + PRE(shm->num_regions > 0); + if (frame < SHM_SHORT_PGNOS_LEN) { + struct shm_region *r0 = shm->regions[0]; + r0->pgnos_short[frame] = pgno; + return SQLITE_OK; + } + + uint32_t regno = 1 + (frame - SHM_SHORT_PGNOS_LEN) / SHM_LONG_PGNOS_LEN; + uint32_t index = (frame - SHM_SHORT_PGNOS_LEN) % SHM_LONG_PGNOS_LEN; + PRE(regno <= (uint32_t)shm->num_regions); /* regions are 0-indexed */ + struct shm_region *region; + if (regno == (uint32_t)shm->num_regions) { + region = shm_grow(shm); /* new index == regno */ + if (region == NULL) { + return SQLITE_NOMEM; + } + } else { + region = shm->regions[regno]; + } + region->pgnos_long[index] = pgno; + sm_move(&shm->sm, SHM_MANAGED); + return SQLITE_OK; +} + +static void shm_free(struct shm *shm) +{ + for (int i = 0; i < shm->num_regions; i++) { + sqlite3_free(shm->regions[i]); + } + sqlite3_free(shm->regions); +} + struct entry { /* Next/prev entries for this VFS. */ queue link; @@ -270,14 +556,16 @@ struct entry { /* Base VFS file object for WAL-prev */ sqlite3_file *wal_prev; - /* Number of `struct file` with SQLITE_OPEN_MAIN_DB that point to this entry */ + /* Number of `struct file` with SQLITE_OPEN_MAIN_DB that point to this + * entry */ unsigned refcount_main_db; - /* Number of `struct file` with SQLITE_OPEN_WAL that point to this entry */ + /* Number of `struct file` with SQLITE_OPEN_WAL that point to this entry + */ unsigned refcount_wal; - /* if WAL-cur is nonempty at startup, we read its header, verify the checkum, - * and use it to initialize the page size. otherwise, we wait until the first - * write to the WAL, which should be the header */ + /* if WAL-cur is nonempty at startup, we read its header, verify the + * checksum, and use it to initialize the page size. 
otherwise, we wait + * until the first write to the WAL, which should be the header */ uint32_t page_size; /* For ACTIVE, HIDDEN, POLLED: the header that hides the pending txn */ @@ -285,16 +573,15 @@ struct entry { /* For ACTIVE, HIDDEN, POLLED: the header that shows the pending txn */ struct wal_index_basic_hdr pending_txn_hdr; - /* shm implementation; holds the WAL index */ - void **shm_regions; - int shm_regions_len; - unsigned shm_refcount; - /* Zero for unlocked, positive for read-locked, UINT_MAX for write-locked */ + /* Shared memory implementation: holds the WAL-index. */ + struct shm shm; + /* Zero for unlocked, positive for read-locked, UINT_MAX for + * write-locked */ unsigned shm_locks[SQLITE_SHM_NLOCK]; /* For ACTIVE, HIDDEN: the pending txn. start and len * are in units of frames. */ - struct vfs2_wal_frame *pending_txn_frames; + dqlite_vfs_frame *pending_txn_frames; uint32_t pending_txn_start; uint32_t pending_txn_len; uint32_t pending_txn_last_frame_commit; @@ -307,6 +594,9 @@ struct entry { struct wal_hdr wal_prev_hdr; struct sm wtx_sm; + struct sm wal_sm; + struct sm wlk_sm; + struct sm ckpt_sm; /* VFS-wide data (immutable) */ struct common *common; }; @@ -331,7 +621,7 @@ static void free_pending_txn(struct entry *e) { if (e->pending_txn_frames != NULL) { for (uint32_t i = 0; i < e->pending_txn_len; i++) { - sqlite3_free(e->pending_txn_frames[i].page); + sqlite3_free(e->pending_txn_frames[i].data); } sqlite3_free(e->pending_txn_frames); } @@ -355,21 +645,25 @@ static bool salts_equal(struct vfs2_salts a, struct vfs2_salts b) return get_salt1(a) == get_salt1(b) && get_salt2(a) == get_salt2(b); } -static struct wal_index_full_hdr *get_full_hdr(struct entry *e) +static struct vfs2_salts make_salts(uint32_t salt1, uint32_t salt2) { - PRE(e->shm_regions_len > 0); - PRE(e->shm_regions != NULL); - return e->shm_regions[0]; + struct vfs2_salts ret; + BytePutBe32(salt1, ret.salt1); + BytePutBe32(salt2, ret.salt2); + return ret; } -static bool 
no_pending_txn(const struct entry *e) +static struct wal_index_full_hdr *get_full_hdr(struct entry *e) { - return e->pending_txn_len == 0 && e->pending_txn_frames == NULL && e->pending_txn_last_frame_commit == 0; + PRE(e->shm.num_regions > 0); + PRE(e->shm.regions != NULL); + return &e->shm.regions[0]->hdr; } -static bool write_lock_held(const struct entry *e) +static bool no_pending_txn(const struct entry *e) { - return e->shm_locks[WAL_WRITE_LOCK] == VFS2_EXCLUSIVE; + return e->pending_txn_len == 0 && e->pending_txn_frames == NULL && + e->pending_txn_last_frame_commit == 0; } static bool wal_index_basic_hdr_equal(struct wal_index_basic_hdr a, @@ -378,170 +672,32 @@ static bool wal_index_basic_hdr_equal(struct wal_index_basic_hdr a, return memcmp(&a, &b, sizeof(struct wal_index_basic_hdr)) == 0; } -static bool wal_index_basic_hdr_zeroed(struct wal_index_basic_hdr h) -{ - return wal_index_basic_hdr_equal(h, (struct wal_index_basic_hdr){}); -} - -static bool wal_index_basic_hdr_advanced(struct wal_index_basic_hdr new, - struct wal_index_basic_hdr old) -{ - return new.iChange == old.iChange + 1 && - new.nPage >= old.nPage /* no vacuums here */ - && ((get_salt1(new.salts) == get_salt1(old.salts) && - get_salt2(new.salts) == get_salt2(old.salts)) || - /* note the weirdness with zero salts */ - (get_salt1(old.salts) == 0 && - get_salt2(old.salts) == 0)) - && new.mxFrame > old.mxFrame; -} - -/* Check that the hash tables in the WAL index have been initialized - * by looking for nonzero bytes after the WAL index header. (TODO: - * actually parse the hash tables?) 
*/ -static bool wal_index_recovered(const struct entry *e) -{ - PRE(e->shm_regions_len > 0); - char *p = e->shm_regions[0]; - for (size_t i = sizeof(struct wal_index_full_hdr); i < VFS2_WAL_INDEX_REGION_SIZE; i++) { - if (p[i] != 0) { - return true; - } - } - return false; -} - static bool is_valid_page_size(unsigned long n) { return n >= 1 << 9 && n <= 1 << 16 && is_po2(n); } -static bool is_open(const struct entry *e) -{ - return e->main_db_name != NULL - && e->wal_moving_name != NULL - && e->wal_cur_fixed_name != NULL - && e->wal_cur != NULL - && e->wal_prev_fixed_name != NULL - && e->wal_prev != NULL - && (e->refcount_main_db > 0 || e->refcount_wal > 0) - && e->shm_regions != NULL - && e->shm_regions_len > 0 - && e->shm_regions[0] != NULL - && e->common != NULL; -} - -static bool basic_hdr_valid(struct wal_index_basic_hdr bhdr) +static bool basic_hdr_valid(const struct wal_index_basic_hdr *bhdr) { struct cksums sums = {}; - update_cksums(bhdr.bigEndCksum ? BE_MAGIC : LE_MAGIC, (uint8_t *)&bhdr, offsetof(struct wal_index_basic_hdr, cksums), &sums); - return bhdr.iVersion == 3007000 - && bhdr.isInit == 1 - && cksums_equal(sums, bhdr.cksums); + const uint8_t *p = (const uint8_t *)bhdr; + size_t len = offsetof(struct wal_index_basic_hdr, cksums); + update_cksums(p, len, &sums); + return bhdr->iVersion == WAL_MAX_VERSION && bhdr->isInit == 1 && + cksums_equal(sums, bhdr->cksums); } static bool full_hdr_valid(const struct wal_index_full_hdr *ihdr) { - return basic_hdr_valid(ihdr->basic[0]) && wal_index_basic_hdr_equal(ihdr->basic[0], ihdr->basic[1]); + return basic_hdr_valid(&ihdr->basic[0]) && + wal_index_basic_hdr_equal(ihdr->basic[0], ihdr->basic[1]); } static bool wtx_invariant(const struct sm *sm, int prev) { - /* TODO make use of this */ + (void)sm; (void)prev; - - struct entry *e = CONTAINER_OF(sm, struct entry, wtx_sm); - - if (sm_state(sm) == WTX_CLOSED) { - char *region = (char *)e; - char zeroed[offsetof(struct entry, wtx_sm)] = {}; - - return 
CHECK(memcmp(region, zeroed, sizeof(zeroed)) == 0) && - CHECK(e->common != NULL); - } - - if (!CHECK(is_open(e))) { - return false; - } - struct wal_index_full_hdr *ihdr = get_full_hdr(e); - if (!CHECK(full_hdr_valid(ihdr))) { - return false; - } - uint32_t mx = ihdr->basic[0].mxFrame; - uint32_t backfill = ihdr->nBackfill; - uint32_t cursor = e->wal_cursor; - if (!CHECK(backfill <= mx) || !CHECK(mx <= cursor)) { - return false; - } - - /* TODO any checks applicable to the read marks and read locks? */ - - if (sm_state(sm) == WTX_EMPTY) { - return CHECK(mx == backfill) && - CHECK(mx == cursor) && - CHECK(no_pending_txn(e)) && - CHECK(!write_lock_held(e)); - } - - if (!CHECK(is_valid_page_size(e->page_size))) { - return false; - } - - if (sm_state(sm) == WTX_FOLLOWING) { - return CHECK(no_pending_txn(e)) && - CHECK(write_lock_held(e)) && - CHECK(mx < cursor); - } - - if (sm_state(sm) == WTX_FLUSH) { - return CHECK(no_pending_txn(e)) && - CHECK(!write_lock_held(e)) && - CHECK(ERGO(mx > 0, backfill < mx)) && - CHECK(mx == cursor); - } - - if (sm_state(sm) == WTX_BASE) { - return CHECK(no_pending_txn(e)) && - CHECK(!write_lock_held(e)) && - CHECK(ERGO(mx > 0, backfill < mx)) && - CHECK(mx == cursor) && - CHECK(ERGO(mx > 0, wal_index_recovered(e))); - } - - if (sm_state(sm) == WTX_ACTIVE) { - return CHECK(wal_index_basic_hdr_equal(get_full_hdr(e)->basic[0], e->prev_txn_hdr)) && - CHECK(wal_index_basic_hdr_zeroed(e->pending_txn_hdr)) && - CHECK(write_lock_held(e)); - } - - if (!CHECK(mx < cursor) || - !CHECK(e->pending_txn_len > 0) || - !CHECK(e->pending_txn_start + e->pending_txn_len == e->wal_cursor)) { - return false; - } - - if (sm_state(sm) == WTX_HIDDEN) { - bool res = CHECK(wal_index_basic_hdr_equal(get_full_hdr(e)->basic[0], e->prev_txn_hdr)) && - CHECK(wal_index_basic_hdr_advanced(e->pending_txn_hdr, e->prev_txn_hdr)) && - CHECK(!write_lock_held(e)) && - CHECK(e->pending_txn_frames != NULL); - if (!res) { - return false; - } - for (uint32_t i = 0; i < 
e->pending_txn_len; i++) { - res &= CHECK(e->pending_txn_frames[i].page != NULL); - } - return res; - } - - if (sm_state(sm) == WTX_POLLED) { - return CHECK(wal_index_basic_hdr_equal(get_full_hdr(e)->basic[0], e->prev_txn_hdr)) && - CHECK(wal_index_basic_hdr_advanced(e->pending_txn_hdr, e->prev_txn_hdr)) && - CHECK(write_lock_held(e)) && - CHECK(e->pending_txn_frames == NULL); - } - - assert(0); + return true; } static int check_wal_integrity(sqlite3_file *f) @@ -579,11 +735,13 @@ static void maybe_close_entry(struct entry *e) free_pending_txn(e); + assert(e->shm.refcount == 0); + shm_free(&e->shm); + pthread_rwlock_wrlock(&e->common->rwlock); queue_remove(&e->link); pthread_rwlock_unlock(&e->common->rwlock); sqlite3_free(e); - } static int vfs2_close(sqlite3_file *file) @@ -616,73 +774,58 @@ static int vfs2_read(sqlite3_file *file, void *buf, int amt, sqlite3_int64 ofst) return orig->pMethods->xRead(orig, buf, amt, ofst); } -static int wal_swap(struct entry *e, - const struct wal_hdr *wal_hdr) +/** + * Update the volatile state so that WAL-cur becomes WAL-prev and vice versa. + * + * This is implemented by exchanging the wal_cur and wal_prev file handles and + * fixed names, and flipping the moving name hard link to point to the opposite + * file. + */ +static void wal_swap(struct entry *e, const struct wal_hdr *hdr) { - PRE(e->pending_txn_len == 0); - PRE(e->pending_txn_frames == NULL); int rv; - e->page_size = ByteGetBe32(wal_hdr->page_size); - - /* Terminology: the outgoing WAL is the one that's moving - * from cur to prev. The incoming WAL is the one that's moving - * from prev to cur. */ + /* Terminology: the outgoing WAL is the one that's moving from cur to + * prev. The incoming WAL is the one that's moving from prev to cur. 
*/ sqlite3_file *phys_outgoing = e->wal_cur; char *name_outgoing = e->wal_cur_fixed_name; sqlite3_file *phys_incoming = e->wal_prev; char *name_incoming = e->wal_prev_fixed_name; - tracef("wal swap outgoing=%s incoming=%s", name_outgoing, name_incoming); - - /* Write the new header of the incoming WAL. */ - rv = phys_incoming->pMethods->xWrite(phys_incoming, wal_hdr, - sizeof(struct wal_hdr), 0); - if (rv != SQLITE_OK) { - return rv; - } - - /* In-memory WAL swap. */ e->wal_cur = phys_incoming; e->wal_cur_fixed_name = name_incoming; e->wal_prev = phys_outgoing; e->wal_prev_fixed_name = name_outgoing; e->wal_cursor = 0; e->wal_prev_hdr = e->wal_cur_hdr; - e->wal_cur_hdr = *wal_hdr; + e->wal_cur_hdr = *hdr; + + sm_move(&e->wal_sm, !sm_state(&e->wal_sm)); - /* Move the moving name. */ + /* Best-effort: flip the moving name. + * + * If these syscalls fail, we can end up with no moving name, or a + * moving name that points to the wrong WAL. We don't use the moving + * name as the source of truth, so this can't lead to dqlite operating + * incorrectly. At worst, it's inconvenient for users who want to + * inspect their database with SQLite (readonly! when dqlite is not + * running!). */ rv = unlink(e->wal_moving_name); - if (rv != 0 && errno != ENOENT) { - tracef("unlink = IOERR"); - return SQLITE_IOERR; - } + (void)rv; rv = link(name_incoming, e->wal_moving_name); - if (rv != 0) { - tracef("link = IOERR"); - return SQLITE_IOERR; - } - - /* TODO do we need an fsync here? */ - - /* Best-effort: invalidate the outgoing physical WAL so that nobody gets - * confused. 
*/ - (void)phys_outgoing->pMethods->xWrite(phys_outgoing, &invalid_magic, - sizeof(invalid_magic), 0); - return SQLITE_OK; + (void)rv; } static int vfs2_wal_write_frame_hdr(struct entry *e, const struct wal_frame_hdr *fhdr, uint32_t x) { - struct vfs2_wal_frame *frames = e->pending_txn_frames; + dqlite_vfs_frame *frames = e->pending_txn_frames; if (no_pending_txn(e)) { assert(x == e->wal_cursor); e->pending_txn_start = x; } uint32_t n = e->pending_txn_len; - tracef("orig=%u start=%u n=%u", x, e->pending_txn_start, n); x -= e->pending_txn_start; assert(x <= n); if (e->pending_txn_len == 0 && x == 0) { @@ -694,26 +837,25 @@ static int vfs2_wal_write_frame_hdr(struct entry *e, } if (x == n) { /* FIXME reallocating every time seems bad */ - sqlite3_uint64 z = (sqlite3_uint64)sizeof(*frames) * (sqlite3_uint64)(n + 1); + sqlite3_uint64 z = + (sqlite3_uint64)sizeof(*frames) * (sqlite3_uint64)(n + 1); e->pending_txn_frames = sqlite3_realloc64(frames, z); if (e->pending_txn_frames == NULL) { return SQLITE_NOMEM; } - struct vfs2_wal_frame *frame = &e->pending_txn_frames[n]; + dqlite_vfs_frame *frame = &e->pending_txn_frames[n]; uint32_t commit = ByteGetBe32(fhdr->commit); frame->page_number = ByteGetBe32(fhdr->page_number); - frame->commit = commit; - frame->page = NULL; + frame->data = NULL; e->pending_txn_last_frame_commit = commit; e->pending_txn_len++; } else { /* Overwriting a previously-written frame in the current * transaction. 
*/ - struct vfs2_wal_frame *frame = &e->pending_txn_frames[x]; + dqlite_vfs_frame *frame = &e->pending_txn_frames[x]; frame->page_number = ByteGetBe32(fhdr->page_number); - frame->commit = ByteGetBe32(fhdr->commit); - sqlite3_free(frame->page); - frame->page = NULL; + sqlite3_free(frame->data); + frame->data = NULL; } sm_move(&e->wtx_sm, WTX_ACTIVE); return SQLITE_OK; @@ -725,7 +867,9 @@ static int vfs2_wal_post_write(struct entry *e, sqlite3_int64 ofst) { uint32_t frame_size = VFS2_WAL_FRAME_HDR_SIZE + e->page_size; - if (amt == VFS2_WAL_FRAME_HDR_SIZE) { + if (amt == (int)sizeof(struct wal_hdr)) { + return SQLITE_OK; + } else if (amt == VFS2_WAL_FRAME_HDR_SIZE) { ofst -= (sqlite3_int64)sizeof(struct wal_hdr); assert(ofst % frame_size == 0); sqlite3_int64 frame_ofst = ofst / (sqlite3_int64)frame_size; @@ -737,13 +881,13 @@ static int vfs2_wal_post_write(struct entry *e, x /= frame_size; x -= e->pending_txn_start; assert(0 <= x && x < e->pending_txn_len); - struct vfs2_wal_frame *frame = &e->pending_txn_frames[x]; - assert(frame->page == NULL); - frame->page = sqlite3_malloc(amt); - if (frame->page == NULL) { + dqlite_vfs_frame *frame = &e->pending_txn_frames[x]; + assert(frame->data == NULL); + frame->data = sqlite3_malloc(amt); + if (frame->data == NULL) { return SQLITE_NOMEM; } - memcpy(frame->page, buf, (size_t)amt); + memcpy(frame->data, buf, (size_t)amt); sm_move(&e->wtx_sm, WTX_ACTIVE); return SQLITE_OK; } else { @@ -759,22 +903,22 @@ static int vfs2_write(sqlite3_file *file, struct file *xfile = (struct file *)file; int rv; + /* A write to the WAL at offset 0 must be a header write, and indicates + * that SQLite has reset the WAL. We react by doing a WAL swap. 
+ */ if ((xfile->flags & SQLITE_OPEN_WAL) && ofst == 0) { assert(amt == sizeof(struct wal_hdr)); const struct wal_hdr *hdr = buf; struct entry *e = xfile->entry; - tracef("about to wal swap"); - rv = wal_swap(e, hdr); - if (rv != SQLITE_OK) { - return rv; - } - /* check that the WAL-index hdr makes sense and save it */ + wal_swap(e, hdr); + /* Save the WAL-index header so that we can roll back to it in + * the future. The assertions check that the header we're + * saving has been updated to match the new, empty WAL. */ struct wal_index_basic_hdr ihdr = get_full_hdr(e)->basic[0]; assert(ihdr.isInit == 1); assert(ihdr.mxFrame == 0); e->prev_txn_hdr = ihdr; sm_move(&e->wtx_sm, WTX_ACTIVE); - return SQLITE_OK; } sqlite3_file *orig = get_orig(xfile); @@ -785,7 +929,6 @@ static int vfs2_write(sqlite3_file *file, if (xfile->flags & SQLITE_OPEN_WAL) { struct entry *e = xfile->entry; - tracef("wrote to WAL name=%s amt=%d ofst=%lld", e->wal_cur_fixed_name, amt, ofst); return vfs2_wal_post_write(e, buf, amt, ofst); } @@ -834,16 +977,28 @@ static int vfs2_check_reserved_lock(sqlite3_file *file, int *out) return orig->pMethods->xCheckReservedLock(orig, out); } -static int interpret_pragma(char **args) +static int interpret_pragma(struct entry *e, char **args) { - char **e = &args[0]; + char **err = &args[0]; char *left = args[1]; PRE(left != NULL); char *right = args[2]; - if (strcmp(left, "journal_mode") == 0 && right != NULL && strcasecmp(right, "wal") != 0) { - *e = sqlite3_mprintf("dqlite requires WAL mode"); + if (strcmp(left, "journal_mode") == 0 && right != NULL && + strcasecmp(right, "wal") != 0) { + *err = sqlite3_mprintf("dqlite requires WAL mode"); return SQLITE_ERROR; + } else if (strcmp(left, "page_size") == 0 && right != NULL) { + char *end = right + strlen(right); + unsigned long val = strtoul(right, &end, 10); + if (right != end && *end == '\0' && is_valid_page_size(val)) { + if (e->page_size != 0 && val != e->page_size) { + *err = sqlite3_mprintf( + "page size 
cannot be changed"); + return SQLITE_ERROR; + } + e->page_size = (uint32_t)val; + } } return SQLITE_NOTFOUND; @@ -866,7 +1021,7 @@ static int vfs2_file_control(sqlite3_file *file, int op, void *arg) e->wal_cursor += e->pending_txn_len; sm_move(&xfile->entry->wtx_sm, WTX_HIDDEN); } else if (op == SQLITE_FCNTL_PRAGMA) { - rv = interpret_pragma(arg); + rv = interpret_pragma(e, arg); if (rv != SQLITE_NOTFOUND) { return rv; } @@ -919,48 +1074,30 @@ static int vfs2_shm_map(sqlite3_file *file, int extend, void volatile **out) { + PRE(regsz == VFS2_WAL_INDEX_REGION_SIZE); struct file *xfile = (struct file *)file; + PRE(xfile->flags & SQLITE_OPEN_MAIN_DB); struct entry *e = xfile->entry; - void *region; - int rv; + struct shm *shm = &e->shm; + struct shm_region *region; - if (e->shm_regions != NULL && regno < e->shm_regions_len) { - region = e->shm_regions[regno]; - assert(region != NULL); + if (shm->regions != NULL && regno < shm->num_regions) { + region = shm->regions[regno]; } else if (extend != 0) { - assert(regno == e->shm_regions_len); - region = sqlite3_malloc(regsz); + assert(regno == shm->num_regions); + region = shm_grow(shm); if (region == NULL) { - rv = SQLITE_NOMEM; - goto err; - } - memset(region, 0, (size_t)regsz); - /* FIXME reallocating every time seems bad */ - sqlite3_uint64 z = (sqlite3_uint64)sizeof(*e->shm_regions) * (sqlite3_uint64)(e->shm_regions_len + 1); - void **regions = sqlite3_realloc64(e->shm_regions, z); - if (regions == NULL) { - rv = SQLITE_NOMEM; - goto err_after_region_malloc; + return SQLITE_NOMEM; } - e->shm_regions = regions; - e->shm_regions[regno] = region; - e->shm_regions_len++; } else { region = NULL; } *out = region; if (regno == 0 && region != NULL) { - e->shm_refcount++; + e->shm.refcount++; } return SQLITE_OK; - -err_after_region_malloc: - sqlite3_free(region); -err: - assert(rv != SQLITE_OK); - *out = NULL; - return rv; } static __attribute__((noinline)) int busy(void) @@ -1004,12 +1141,20 @@ static int 
vfs2_shm_lock(sqlite3_file *file, int ofst, int n, int flags) for (int i = ofst; i < ofst + n; i++) { e->shm_locks[i] = VFS2_EXCLUSIVE; + if (i == WAL_RECOVER_LOCK) { + sm_move(&e->shm.sm, SHM_RECOVERING); + } } /* XXX maybe this shouldn't be an assertion */ if (ofst == WAL_WRITE_LOCK) { assert(n == 1); assert(e->pending_txn_len == 0); + sm_move(&e->wlk_sm, WLK_LOCKED); + } + + if (ofst == WAL_CKPT_LOCK && n == 1) { + sm_move(&e->ckpt_sm, CKPT_CHECKPOINTING); } } else if (flags == (SQLITE_SHM_UNLOCK | SQLITE_SHM_SHARED)) { for (int i = ofst; i < ofst + n; i++) { @@ -1023,30 +1168,31 @@ static int vfs2_shm_lock(sqlite3_file *file, int ofst, int n, int flags) } if (ofst <= WAL_RECOVER_LOCK && WAL_RECOVER_LOCK < ofst + n) { - tracef("unlocking the recovery lock!"); + sm_move(&e->shm.sm, SHM_RECOVERED); } if (ofst == WAL_WRITE_LOCK) { - /* Unlocking the write lock: roll back any uncommitted - * transaction. */ + sm_move(&e->wlk_sm, WLK_UNLOCKED); + /* If the last frame of the pending transaction has no + * commit marker when SQLite releases the write lock, it + * means that the transaction rolled back before it + * committed. We respond by throwing away our stored + * frames and resetting the state machine. */ assert(n == 1); - tracef("unlocking write lock"); - /* TODO make sure this is correct */ - if (e->pending_txn_last_frame_commit == 0) { + if (sm_state(&e->wtx_sm) > WTX_BASE && + e->pending_txn_last_frame_commit == 0) { free_pending_txn(e); sm_move(&e->wtx_sm, WTX_BASE); } } else if (ofst == WAL_CKPT_LOCK && n == 1) { - /* End of a checkpoint: if all frames have been backfilled, - * move to EMPTY. */ - assert(n == 1); + /* End of a checkpoint: if all frames have been + * backfilled, move to EMPTY. 
*/ struct wal_index_full_hdr *ihdr = get_full_hdr(e); if (ihdr->nBackfill == ihdr->basic[0].mxFrame) { sm_move(&e->wtx_sm, WTX_EMPTY); } - } /* else if (ofst <= WAL_RECOVER_LOCK && WAL_RECOVER_LOCK < ofst + n) { - sm_move(&e->wtx_sm, WTX_BASE); - } */ + sm_move(&e->ckpt_sm, CKPT_QUIESCENT); + } } else { assert(0); } @@ -1059,24 +1205,18 @@ static void vfs2_shm_barrier(sqlite3_file *file) (void)file; } -static int vfs2_shm_unmap(sqlite3_file *file, int delete) -{ - (void)delete; +static int vfs2_shm_unmap(sqlite3_file *file, int delete) { + /* SQLite sends this flag with the unmap method when the last + * connection to a database runs a complete checkpoint in sqlite3_close + * and then deletes the WAL file. The interpretation is that the + * WAL-index should be deleted as well. dqlite both disables + * checkpoint-on-close and instructs SQLite not to delete the WAL, so + * we don't expect to ever receive the flag. */ + PRE(delete == 0); struct file *xfile = (struct file *)file; struct entry *e = xfile->entry; - e->shm_refcount--; - if (e->shm_refcount == 0) { - for (int i = 0; i < e->shm_regions_len; i++) { - void *region = e->shm_regions[i]; - assert(region != NULL); - sqlite3_free(region); - } - sqlite3_free(e->shm_regions); - - e->shm_regions = NULL; - e->shm_regions_len = 0; - memset(e->shm_locks, 0, sizeof(e->shm_locks)); - } + PRE(e->shm.refcount > 0); + e->shm.refcount--; return SQLITE_OK; } @@ -1116,7 +1256,29 @@ static int compare_wal_headers(struct wal_hdr a, return SQLITE_OK; } -static int read_wal_hdr(sqlite3_file *wal, sqlite3_int64 *size, struct wal_hdr *hdr) +static bool wal_hdr_is_valid(const struct wal_hdr *hdr) +{ + /* TODO(cole) Add other validity constraints. 
*/ + struct cksums sums_found = {}; + const uint8_t *p = (const uint8_t *)hdr; + size_t len = offsetof(struct wal_hdr, cksum1); + update_cksums(p, len, &sums_found); + struct cksums sums_expected = { ByteGetBe32(hdr->cksum1), + ByteGetBe32(hdr->cksum2) }; + return cksums_equal(sums_expected, sums_found); +} + +/** + * Read the header of the given WAL file and detect corruption. + * + * If the file contains a valid header, returns this header in `hdr` and the + * size of the file in `size`. If the file is empty, returns 0 in `size` only. + * If the file can't be read, or it is nonempty but doesn't contain a valid + * header, returns an error. + */ +static int try_read_wal_hdr(sqlite3_file *wal, + sqlite3_int64 *size, + struct wal_hdr *hdr) { int rv; @@ -1124,78 +1286,204 @@ static int read_wal_hdr(sqlite3_file *wal, sqlite3_int64 *size, struct wal_hdr * if (rv != SQLITE_OK) { return rv; } - if (*size >= (sqlite3_int64)sizeof(struct wal_hdr)) { - rv = wal->pMethods->xRead(wal, hdr, sizeof(*hdr), 0); - if (rv != SQLITE_OK) { - return rv; - } - } else { - *hdr = (struct wal_hdr){}; + if (*size == 0) { + return SQLITE_OK; + } + if (*size < (sqlite3_int64)sizeof(*hdr)) { + return SQLITE_CORRUPT; + } + rv = wal->pMethods->xRead(wal, hdr, sizeof(*hdr), 0); + if (rv != SQLITE_OK) { + return rv; + } + if (!wal_hdr_is_valid(hdr)) { + return SQLITE_CORRUPT; } return SQLITE_OK; } -static struct wal_index_full_hdr initial_full_hdr(struct wal_hdr whdr) +static void pgno_ht_insert(uint16_t *ht, uint16_t fx, uint32_t pgno) { - struct wal_index_full_hdr ihdr = {}; - ihdr.basic[0].iVersion = 3007000; - ihdr.basic[0].isInit = 1; - ihdr.basic[0].bigEndCksum = is_bigendian(); - ihdr.basic[0].szPage = (uint16_t)ByteGetBe32(whdr.page_size); - struct cksums sums = {}; - update_cksums(native_magic(), (const void *)&ihdr.basic[0], offsetof(struct wal_index_basic_hdr, cksums), &sums); - ihdr.basic[0].cksums = sums; - ihdr.basic[1] = ihdr.basic[0]; - ihdr.marks[0] = 0; - ihdr.marks[1] = 0; - 
ihdr.marks[2] = READ_MARK_UNUSED; - ihdr.marks[3] = READ_MARK_UNUSED; - ihdr.marks[4] = READ_MARK_UNUSED; - return ihdr; + uint32_t hash = pgno * 383; + while (ht[hash % SHM_HT_LEN] != 0) { + hash++; + } + /* SQLite uses 1-based frame indices in this context, reserving + * 0 for a sentinel value. */ + fx++; + ht[hash % SHM_HT_LEN] = fx; +} + +/** + * Grab the page number for the given frame from the appropriate array + * in the WAL-index and add it to the corresponding hash table. + */ +static void shm_update_ht(struct shm *shm, uint32_t frame) +{ + PRE(shm->num_regions > 0); + sm_move(&shm->sm, SHM_MANAGED); + if (frame < SHM_SHORT_PGNOS_LEN) { + struct shm_region *r0 = shm->regions[0]; + uint32_t pgno = r0->pgnos_short[frame]; + pgno_ht_insert(r0->ht, (uint16_t)frame, pgno); + return; + } + + uint32_t regno = 1 + (frame - SHM_SHORT_PGNOS_LEN) / SHM_LONG_PGNOS_LEN; + uint32_t index = (frame - SHM_SHORT_PGNOS_LEN) % SHM_LONG_PGNOS_LEN; + PRE(regno <= (uint32_t)shm->num_regions); + struct shm_region *region = shm->regions[regno]; + uint32_t pgno = region->pgnos_long[index]; + pgno_ht_insert(region->ht, (uint16_t)index, pgno); } -static void set_mx_frame(struct wal_index_full_hdr *ihdr, uint32_t mx, struct wal_frame_hdr fhdr) +static void set_mx_frame(struct entry *e, + uint32_t mx, + struct wal_frame_hdr fhdr) { + struct wal_index_full_hdr *ihdr = get_full_hdr(e); uint32_t num_pages = ByteGetBe32(fhdr.commit); PRE(num_pages > 0); + uint32_t old_mx = ihdr->basic[0].mxFrame; ihdr->basic[0].iChange += 1; ihdr->basic[0].mxFrame = mx; ihdr->basic[0].nPage = num_pages; - /* XXX byte order */ ihdr->basic[0].frame_cksums.cksum1 = ByteGetBe32(fhdr.cksum1); ihdr->basic[0].frame_cksums.cksum2 = ByteGetBe32(fhdr.cksum2); - struct cksums sums = {}; - update_cksums(native_magic(), (const void *)&ihdr->basic[0], 40, &sums); - ihdr->basic[0].cksums = sums; + write_basic_hdr_cksums(&ihdr->basic[0]); ihdr->basic[1] = ihdr->basic[0]; + POST(full_hdr_valid(ihdr)); + + struct shm *shm = 
&e->shm; + for (uint32_t i = old_mx; i < mx; i++) { + /* The page numbers array was already updated during the call + * to add_uncommitted, so we just need to update the hash array. + */ + shm_update_ht(shm, i); + } } -static void restart_full_hdr(struct wal_index_full_hdr *ihdr, struct wal_hdr new_whdr) +static sqlite3_int64 wal_offset_from_cursor(uint32_t page_size, uint32_t cursor) { - /* cf. walRestartHdr */ - ihdr->basic[0].mxFrame = 0; - ihdr->basic[0].salts = new_whdr.salts; - struct cksums sums = {}; - update_cksums(native_magic(), (const void *)&ihdr->basic[0], 40, &sums); - ihdr->basic[0].cksums = sums; - ihdr->basic[1] = ihdr->basic[0]; - ihdr->nBackfill = 0; - ihdr->nBackfillAttempted = 0; + return (sqlite3_int64)sizeof(struct wal_hdr) + + (sqlite3_int64)cursor * + ((sqlite3_int64)sizeof(struct wal_frame_hdr) + + (sqlite3_int64)page_size); } -static uint32_t wal_cursor_from_size(uint32_t page_size, sqlite3_int64 size) -{ - sqlite3_int64 whdr_size = (sqlite3_int64)sizeof(struct wal_hdr); - if (size < whdr_size) { - return 0; +/** + * Read the given WAL file from beginning to end, initializing the WAL-index in + * the process. + * + * The process stops when we reach a frame whose checksums are invalid, or + * after the last valid commit frame, or on the first unsuccessful read, or at + * the end of the transaction designated by `stop`, if that argument is + * non-NULL. + * + * On return, the WAL-index header is initialized with mxFrame = 0 and other + * data matching the given WAL, and the page numbers for all frames up to the + * stopping point are recorded in the WAL-index; the hash tables are not + * initialized. + * + * Returns the stopping point in units of frames, which becomes the wal_cursor. 
+ */ +static int walk_wal(sqlite3_file *wal, + sqlite3_int64 size, + struct wal_hdr hdr, + const struct vfs2_wal_slice *stop, + uint32_t *wal_cursor, + struct shm *shm) +{ + uint32_t page_size = ByteGetBe32(hdr.page_size); + struct cksums sums = { ByteGetBe32(hdr.cksum1), + ByteGetBe32(hdr.cksum2) }; + int rv; + + /* TODO(cole): support WALs that use a non-native byte order for the + * checksums (because our data directory was transferred from another + * machine). */ + if (ByteGetBe32(hdr.magic) != (is_bigendian() ? BE_MAGIC : LE_MAGIC)) { + return SQLITE_ERROR; } - sqlite3_int64 x = (size - whdr_size) / ((sqlite3_int64)sizeof(struct wal_frame_hdr) + (sqlite3_int64)page_size); - return (uint32_t)x; -} -static sqlite3_int64 wal_offset_from_cursor(uint32_t page_size, uint32_t cursor) -{ - return (sqlite3_int64)sizeof(struct wal_hdr) + (sqlite3_int64)cursor * ((sqlite3_int64)sizeof(struct wal_frame_hdr) + (sqlite3_int64)page_size); + /* Check whether we have been provided a stopping point that corresponds + * to a transaction in the current WAL. 
+ * + * TODO(cole) we shouldn't just ignore the stopping point if the salts + * don't match WAL-cur---there are a few possibilities that need to be + * handled differently: + * + * - the stopping point could be at the end of WAL-prev; we should interpret + * this as an instruction to ignore all the frames in WAL-cur + * - the stopping point could be in the interior of WAL-prev: this means + * something has gone badly wrong, since everything in WAL-prev should + * be committed + * - the salts for the stopping point match neither WAL-cur nor WAL-prev: + * this indicates that something has gone wrong in a bizarre way */ + bool have_stop = stop != NULL && salts_equal(stop->salts, hdr.salts); + + uint8_t *page_buf = sqlite3_malloc64(page_size); + if (page_buf == NULL) { + return SQLITE_NOMEM; + } + + sqlite3_int64 off = sizeof(struct wal_hdr); + while (off < size) { + if (have_stop && *wal_cursor == stop->start + stop->len) { + break; + } + + struct wal_frame_hdr fhdr; + rv = wal->pMethods->xRead(wal, &fhdr, sizeof(fhdr), off); + if (rv != SQLITE_OK) { + goto err; + } + /* If the salts for this frame don't match those in the WAL header, + * the frame is (probably) left over from a previous generation of + * the WAL, and so we've found the end of the current generation. 
*/ + if (!salts_equal(fhdr.salts, hdr.salts)) { + break; + } + off += (sqlite3_int64)sizeof(fhdr); + const uint8_t *p = (const uint8_t *)&fhdr; + size_t len = offsetof(struct wal_frame_hdr, salts); + update_cksums(p, len, &sums); + + rv = wal->pMethods->xRead(wal, page_buf, (int)page_size, off); + if (rv != SQLITE_OK) { + goto err; + } + off += page_size; + update_cksums(page_buf, page_size, &sums); + struct cksums frame_sums = { ByteGetBe32(fhdr.cksum1), + ByteGetBe32(fhdr.cksum2) }; + if (!cksums_equal(frame_sums, sums)) { + break; + } + + rv = shm_add_pgno(shm, *wal_cursor, + ByteGetBe32(fhdr.page_number)); + if (rv != SQLITE_OK) { + goto err; + } + *wal_cursor += 1; + + /* We expect the last valid frame to have the commit marker. + * That's because if the last transaction wasn't fully written + * to the WAL, we should have been passed a `stop` argument + * corresponding to some preceding transaction. */ + if (off >= size) { + assert(ByteGetBe32(fhdr.commit) > 0); + } + } + + sqlite3_free(page_buf); + return SQLITE_OK; + +err: + sqlite3_free(page_buf); + POST(rv != SQLITE_OK); + return rv; } static int open_entry(struct common *common, const char *name, struct entry *e) @@ -1215,10 +1503,8 @@ static int open_entry(struct common *common, const char *name, struct entry *e) e->wal_moving_name = sqlite3_malloc(path_cap); e->wal_cur_fixed_name = sqlite3_malloc(path_cap); e->wal_prev_fixed_name = sqlite3_malloc(path_cap); - if (e->main_db_name == NULL || - e->wal_moving_name == NULL || - e->wal_cur_fixed_name == NULL || - e->wal_prev_fixed_name == NULL) { + if (e->main_db_name == NULL || e->wal_moving_name == NULL || + e->wal_cur_fixed_name == NULL || e->wal_prev_fixed_name == NULL) { return SQLITE_NOMEM; } @@ -1234,13 +1520,15 @@ static int open_entry(struct common *common, const char *name, struct entry *e) strcat(e->wal_prev_fixed_name, "-xwal2"); /* TODO EXRESCODE? 
*/ - int phys_wal_flags = SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE|SQLITE_OPEN_WAL|SQLITE_OPEN_NOFOLLOW; + int phys_wal_flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | + SQLITE_OPEN_WAL | SQLITE_OPEN_NOFOLLOW; e->wal_cur = sqlite3_malloc(file_cap); if (e->wal_cur == NULL) { return SQLITE_NOMEM; } - rv = v->xOpen(v, e->wal_cur_fixed_name, e->wal_cur, phys_wal_flags, NULL); + rv = v->xOpen(v, e->wal_cur_fixed_name, e->wal_cur, phys_wal_flags, + NULL); if (rv != SQLITE_OK) { return rv; } @@ -1249,20 +1537,21 @@ static int open_entry(struct common *common, const char *name, struct entry *e) if (e->wal_prev == NULL) { return SQLITE_NOMEM; } - rv = v->xOpen(v, e->wal_prev_fixed_name, e->wal_prev, phys_wal_flags, NULL); + rv = v->xOpen(v, e->wal_prev_fixed_name, e->wal_prev, phys_wal_flags, + NULL); if (rv != SQLITE_OK) { return rv; } sqlite3_int64 size1; struct wal_hdr hdr1; - rv = read_wal_hdr(e->wal_cur, &size1, &hdr1); + rv = try_read_wal_hdr(e->wal_cur, &size1, &hdr1); if (rv != SQLITE_OK) { return rv; } sqlite3_int64 size2; struct wal_hdr hdr2; - rv = read_wal_hdr(e->wal_prev, &size2, &hdr2); + rv = try_read_wal_hdr(e->wal_prev, &size2, &hdr2); if (rv != SQLITE_OK) { return rv; } @@ -1297,6 +1586,9 @@ static int open_entry(struct common *common, const char *name, struct entry *e) size_cur = size2; hdr_prev = hdr1; } + sm_init(&e->wal_sm, no_invariant, NULL, wal_states, "wal", + wal1_is_fresh ? 
WAL_1 : WAL_2); + sm_relate(&e->wtx_sm, &e->wal_sm); e->wal_cur_hdr = hdr_cur; e->wal_prev_hdr = hdr_prev; @@ -1306,37 +1598,55 @@ static int open_entry(struct common *common, const char *name, struct entry *e) rv = link(e->wal_cur_fixed_name, e->wal_moving_name); (void)rv; - e->shm_regions = sqlite3_malloc(sizeof(void *[1])); - if (e->shm_regions == NULL) { - return SQLITE_NOMEM; - } - e->shm_regions[0] = sqlite3_malloc(VFS2_WAL_INDEX_REGION_SIZE); - if (e->shm_regions[0] == NULL) { - return SQLITE_NOMEM; - } - memset(e->shm_regions[0], 0, VFS2_WAL_INDEX_REGION_SIZE); - e->shm_regions_len = 1; - - *get_full_hdr(e) = initial_full_hdr(hdr_cur); - - e->wal_cursor = wal_cursor_from_size(e->page_size, size_cur); + sm_init(&e->shm.sm, no_invariant, NULL, shm_states, "shm", SHM_EMPTY); + sm_relate(&e->wtx_sm, &e->shm.sm); - int next = WTX_EMPTY; - if (size_cur >= wal_offset_from_cursor(0 /* this doesn't matter */, 0)) { - /* TODO verify the header here */ + /* If WAL-cur contains a valid header, walk it to initialize wal_cursor + * and the WAL-index. Also, set the page size. + * + * If WAL-cur is empty, then we are starting up for the first time, and + * we don't initialize the WAL-index. It will be initialized either by + * SQLite running recovery or by add_uncommitted, whichever happens + * first. (In general, we want to avoid letting SQLite run recovery, but + * in this case it's harmless, since if the WAL is empty then it can't + * read any uncommitted data.) + */ + if (size_cur > 0) { e->page_size = ByteGetBe32(hdr_cur.page_size); - next = WTX_FLUSH; + rv = shm_init(&e->shm, hdr_cur); + if (rv != SQLITE_OK) { + return rv; + } + rv = walk_wal(e->wal_cur, size_cur, hdr_cur, NULL, + &e->wal_cursor, &e->shm); + if (rv != SQLITE_OK) { + return rv; + } } - if (size_cur >= wal_offset_from_cursor(e->page_size, 1)) { + /* If we found at least one valid transaction in the WAL, take the write + * lock. 
*/ + if (e->wal_cursor > 0) { e->shm_locks[WAL_WRITE_LOCK] = VFS2_EXCLUSIVE; - next = WTX_FOLLOWING; } - sm_move(&e->wtx_sm, next); + sm_init(&e->wlk_sm, no_invariant, NULL, wlk_states, "wlk", + e->wal_cursor > 0 ? WLK_LOCKED : WLK_UNLOCKED); + sm_relate(&e->wtx_sm, &e->wlk_sm); + + sm_init(&e->ckpt_sm, no_invariant, NULL, ckpt_states, "ckpt", + CKPT_QUIESCENT); + sm_relate(&e->wtx_sm, &e->ckpt_sm); + + sm_move(&e->wtx_sm, e->wal_cursor > 0 ? WTX_FOLLOWING + : size_cur > 0 ? WTX_BASE + : WTX_EMPTY); return SQLITE_OK; } -static int set_up_entry(struct common *common, const char *name, int flags, struct entry **e) +static int set_up_entry(struct common *common, + const char *name, + int flags, + struct entry **e) { bool name_is_db = (flags & SQLITE_OPEN_MAIN_DB) != 0; bool name_is_wal = (flags & SQLITE_OPEN_WAL) != 0; @@ -1359,16 +1669,18 @@ static int set_up_entry(struct common *common, const char *name, int flags, stru if (res != NULL) { sqlite3_free(*e); *e = res; - unsigned *refcount = name_is_db ? &res->refcount_main_db : &res->refcount_wal; + unsigned *refcount = + name_is_db ? &res->refcount_main_db : &res->refcount_wal; *refcount += 1; return SQLITE_OK; } assert(name_is_db); res = *e; - /* If open_entry fails we still want to link in the entry. Since we unconditionally - * set pMethods in our file vtable, SQLite will xClose the file and vfs2_close - * will run to clean up the partial work of open_entry. */ + /* If open_entry fails we still want to link in the entry. Since we + * unconditionally set pMethods in our file vtable, SQLite will xClose + * the file and vfs2_close will run to clean up the partial work of + * open_entry. 
*/ rv = open_entry(common, name, res); pthread_rwlock_wrlock(&common->rwlock); queue_insert_tail(&common->queue, &res->link); @@ -1402,7 +1714,7 @@ static int vfs2_open(sqlite3_vfs *vfs, } } - if (flags & (SQLITE_OPEN_MAIN_DB|SQLITE_OPEN_WAL)) { + if (flags & (SQLITE_OPEN_MAIN_DB | SQLITE_OPEN_WAL)) { xout->entry = sqlite3_malloc(sizeof(*xout->entry)); if (xout->entry == NULL) { return SQLITE_NOMEM; @@ -1420,7 +1732,8 @@ static int vfs2_open(sqlite3_vfs *vfs, return SQLITE_OK; } -/* TODO does this need to be customized? should it ever be called on one of our files? */ +/* TODO does this need to be customized? should it ever be called on one of our + * files? */ static int vfs2_delete(sqlite3_vfs *vfs, const char *name, int sync_dir) { struct common *data = vfs->pAppData; @@ -1537,22 +1850,23 @@ sqlite3_vfs *vfs2_make(sqlite3_vfs *orig, const char *name) return vfs; } -int vfs2_unapply(sqlite3_file *file, struct vfs2_wal_slice first_to_unapply) +int vfs2_unadd(sqlite3_file *file, struct vfs2_wal_slice first_to_unadd) { struct file *xfile = (struct file *)file; PRE(xfile->flags & SQLITE_OPEN_MAIN_DB); struct entry *e = xfile->entry; - PRE(salts_equal(first_to_unapply.salts, e->wal_cur_hdr.salts)); - PRE(first_to_unapply.start + first_to_unapply.len <= e->wal_cursor); + PRE(salts_equal(first_to_unadd.salts, e->wal_cur_hdr.salts)); + PRE(first_to_unadd.start + first_to_unadd.len <= e->wal_cursor); struct wal_index_full_hdr *ihdr = get_full_hdr(e); - PRE(first_to_unapply.start >= ihdr->basic[0].mxFrame); + PRE(first_to_unadd.start >= ihdr->basic[0].mxFrame); PRE(e->shm_locks[WAL_WRITE_LOCK] == VFS2_EXCLUSIVE); - e->wal_cursor = first_to_unapply.start; + e->wal_cursor = first_to_unadd.start; if (e->wal_cursor == ihdr->basic[0].mxFrame) { e->shm_locks[WAL_WRITE_LOCK] = 0; - sm_move(&e->wtx_sm, WTX_FLUSH); + sm_move(&e->wtx_sm, WTX_BASE); + sm_move(&e->wlk_sm, WLK_UNLOCKED); } else { sm_move(&e->wtx_sm, WTX_FOLLOWING); } @@ -1565,6 +1879,7 @@ int vfs2_unhide(sqlite3_file 
*file) PRE(xfile->flags & SQLITE_OPEN_MAIN_DB); struct entry *e = xfile->entry; PRE(e->shm_locks[WAL_WRITE_LOCK] == VFS2_EXCLUSIVE); + sm_move(&e->wlk_sm, WLK_UNLOCKED); e->shm_locks[WAL_WRITE_LOCK] = 0; struct wal_index_full_hdr *hdr = get_full_hdr(e); @@ -1580,7 +1895,7 @@ int vfs2_unhide(sqlite3_file *file) return 0; } -int vfs2_commit(sqlite3_file *file, struct vfs2_wal_slice stop) +int vfs2_apply(sqlite3_file *file, struct vfs2_wal_slice stop) { struct file *xfile = (struct file *)file; PRE(xfile->flags & SQLITE_OPEN_MAIN_DB); @@ -1591,80 +1906,61 @@ int vfs2_commit(sqlite3_file *file, struct vfs2_wal_slice stop) PRE(e->shm_locks[WAL_WRITE_LOCK] == VFS2_EXCLUSIVE); sqlite3_file *wal_cur = e->wal_cur; struct wal_frame_hdr fhdr; - int rv = wal_cur->pMethods->xRead(wal_cur, &fhdr, sizeof(fhdr), wal_offset_from_cursor(e->page_size, stop.start + stop.len - 1)); - if (rv == SQLITE_OK) { + int rv = wal_cur->pMethods->xRead( + wal_cur, &fhdr, sizeof(fhdr), + wal_offset_from_cursor(e->page_size, stop.start + stop.len - 1)); + if (rv != SQLITE_OK) { return rv; } - set_mx_frame(get_full_hdr(e), commit, fhdr); + set_mx_frame(e, commit, fhdr); if (commit == e->wal_cursor) { e->shm_locks[WAL_WRITE_LOCK] = 0; - sm_move(&e->wtx_sm, WTX_FLUSH); + sm_move(&e->wlk_sm, WLK_UNLOCKED); + sm_move(&e->wtx_sm, WTX_BASE); } else { sm_move(&e->wtx_sm, WTX_FOLLOWING); } return 0; } -int vfs2_commit_barrier(sqlite3_file *file) -{ - struct file *xfile = (struct file *)file; - PRE(xfile->flags & SQLITE_OPEN_MAIN_DB); - struct entry *e = xfile->entry; - if (e->wal_cursor > 0) { - sqlite3_file *wal_cur = e->wal_cur; - struct wal_frame_hdr fhdr; - int rv = wal_cur->pMethods->xRead(wal_cur, &fhdr, sizeof(fhdr), wal_offset_from_cursor(e->page_size, e->wal_cursor - 1)); - if (rv == SQLITE_OK) { - return rv; - } - set_mx_frame(get_full_hdr(e), e->wal_cursor, fhdr); - /* It's okay if the write lock isn't held */ - e->shm_locks[WAL_WRITE_LOCK] = 0; - get_full_hdr(e)->basic[0].isInit = 0; - /* The 
next transaction will cause SQLite to run recovery which will complete the transition to BASE */ - sm_move(&e->wtx_sm, WTX_FLUSH); - } - return 0; -} - -int vfs2_poll(sqlite3_file *file, struct vfs2_wal_frame **frames, unsigned *n, struct vfs2_wal_slice *sl) +int vfs2_poll(sqlite3_file *file, + dqlite_vfs_frame **frames_out, + struct vfs2_wal_slice *sl_out) { struct file *xfile = (struct file *)file; PRE(xfile->flags & SQLITE_OPEN_MAIN_DB); struct entry *e = xfile->entry; + PRE(e->shm_locks[WAL_WRITE_LOCK] == 0); uint32_t len = e->pending_txn_len; + dqlite_vfs_frame *frames = NULL; + struct vfs2_wal_slice sl = {}; + /* If some frames were produced, take the write lock. */ if (len > 0) { - /* Don't go through vfs2_shm_lock here since that has additional - * checks that assume the context of being called from inside - * SQLite. */ - if (e->shm_locks[WAL_WRITE_LOCK] > 0) { - return 1; - } e->shm_locks[WAL_WRITE_LOCK] = VFS2_EXCLUSIVE; - } - - /* Note, not resetting pending_txn_{start,len} because they are used by later states */ - if (n != NULL && frames != NULL) { - *n = len; - *frames = e->pending_txn_frames; + sm_move(&e->wlk_sm, WLK_LOCKED); + frames = e->pending_txn_frames; + e->pending_txn_frames = NULL; + sl = (struct vfs2_wal_slice){ .salts = e->pending_txn_hdr.salts, + .start = e->prev_txn_hdr.mxFrame, + .len = len }; + /* We don't clear e->pending_txn_hdr here because it's used by + * vfs2_unhide. (By contrast, pending_txn_frames only exists + * to be returned by this function if requested.) 
*/ + sm_move(&e->wtx_sm, WTX_POLLED); + } + + if (frames_out != NULL) { + *frames_out = frames; } else { - for (uint32_t i = 0; i < e->pending_txn_len; i++) { - sqlite3_free(e->pending_txn_frames[i].page); + for (uint32_t i = 0; i < len; i++) { + sqlite3_free(frames[i].data); } - sqlite3_free(e->pending_txn_frames); + sqlite3_free(frames); } - e->pending_txn_frames = NULL; - - if (sl != NULL) { - sl->len = len; - sl->salts = e->pending_txn_hdr.salts; - sl->start = e->prev_txn_hdr.mxFrame; - sl->len = len; + if (sl_out != NULL) { + *sl_out = sl; } - - sm_move(&xfile->entry->wtx_sm, WTX_POLLED); - return 0; } @@ -1678,7 +1974,7 @@ void vfs2_destroy(sqlite3_vfs *vfs) int vfs2_abort(sqlite3_file *file) { - /* TODO maybe can "followerize" this and get rid of vfs2_unapply_after? */ + /* TODO maybe can "followerize" this and get rid of vfs2_unadd_after? */ struct file *xfile = (struct file *)file; PRE(xfile->flags & SQLITE_OPEN_MAIN_DB); struct entry *e = xfile->entry; @@ -1711,7 +2007,8 @@ int vfs2_read_wal(sqlite3_file *file, int page_size = (int)e->page_size; for (size_t i = 0; i < txns_len; i++) { - struct vfs2_wal_frame *f = sqlite3_malloc64(txns[i].meta.len * sizeof(*f)); + dqlite_vfs_frame *f = + sqlite3_malloc64(txns[i].meta.len * sizeof(*f)); if (f == NULL) { goto oom; } @@ -1721,18 +2018,21 @@ int vfs2_read_wal(sqlite3_file *file, if (p == NULL) { goto oom; } - txns[i].frames[j].page = p; + txns[i].frames[j].data = p; } } for (size_t i = 0; i < txns_len; i++) { sqlite3_file *wal; unsigned read_lock; - bool from_wal_cur = salts_equal(txns[i].meta.salts, e->wal_cur_hdr.salts); - bool from_wal_prev = salts_equal(txns[i].meta.salts, e->wal_prev_hdr.salts); + bool from_wal_cur = + salts_equal(txns[i].meta.salts, e->wal_cur_hdr.salts); + bool from_wal_prev = + salts_equal(txns[i].meta.salts, e->wal_prev_hdr.salts); assert(from_wal_cur ^ from_wal_prev); if (from_wal_cur) { - rv = vfs2_pseudo_read_begin(file, e->wal_cursor, &read_lock); + rv = 
vfs2_pseudo_read_begin(file, e->wal_cursor, + &read_lock); if (rv != SQLITE_OK) { return 1; } @@ -1744,19 +2044,22 @@ int vfs2_read_wal(sqlite3_file *file, uint32_t start = txns[i].meta.start; uint32_t len = txns[i].meta.len; for (uint32_t j = 0; j < len; j++) { - sqlite3_int64 off = wal_offset_from_cursor(e->page_size, start + j); + sqlite3_int64 off = + wal_offset_from_cursor(e->page_size, start + j); struct wal_frame_hdr fhdr; - rv = wal->pMethods->xRead(wal, &fhdr, sizeof(fhdr), off); + rv = + wal->pMethods->xRead(wal, &fhdr, sizeof(fhdr), off); if (rv != SQLITE_OK) { return 1; } off += (sqlite3_int64)sizeof(fhdr); - rv = wal->pMethods->xRead(wal, txns[i].frames[j].page, page_size, off); + rv = wal->pMethods->xRead(wal, txns[i].frames[j].data, + page_size, off); if (rv != SQLITE_OK) { return 1; } - txns[i].frames[j].page_number = ByteGetBe32(fhdr.page_number); - txns[i].frames[j].commit = ByteGetBe32(fhdr.commit); + txns[i].frames[j].page_number = + ByteGetBe32(fhdr.page_number); } if (from_wal_cur) { vfs2_pseudo_read_end(file, read_lock); @@ -1768,7 +2071,7 @@ int vfs2_read_wal(sqlite3_file *file, oom: for (uint32_t i = 0; i < txns_len; i++) { for (uint32_t j = 0; j < txns[i].meta.len; j++) { - sqlite3_free(txns[i].frames[j].page); + sqlite3_free(txns[i].frames[j].data); } sqlite3_free(txns[i].frames); txns[i].frames = NULL; @@ -1776,7 +2079,9 @@ int vfs2_read_wal(sqlite3_file *file, return 1; } -static int write_one_frame(struct entry *e, struct wal_frame_hdr hdr, void *data) +static int write_one_frame(struct entry *e, + struct wal_frame_hdr hdr, + void *data) { int rv; @@ -1786,7 +2091,8 @@ static int write_one_frame(struct entry *e, struct wal_frame_hdr hdr, void *data return rv; } off += (sqlite3_int64)sizeof(hdr); - rv = e->wal_cur->pMethods->xWrite(e->wal_cur, data, (int)e->page_size, off); + rv = e->wal_cur->pMethods->xWrite(e->wal_cur, data, (int)e->page_size, + off); if (rv != SQLITE_OK) { return rv; } @@ -1794,54 +2100,107 @@ static int 
write_one_frame(struct entry *e, struct wal_frame_hdr hdr, void *data return SQLITE_OK; } -static struct wal_hdr next_wal_hdr(const struct entry *e) +/** + * Create a valid WAL header from the specified fields. + */ +static struct wal_hdr make_wal_hdr(uint32_t magic, + uint32_t page_size, + uint32_t ckpoint_seqno, + struct vfs2_salts salts) { struct wal_hdr ret; + BytePutBe32(magic, ret.magic); + BytePutBe32(WAL_MAX_VERSION, ret.version); + BytePutBe32(page_size, ret.page_size); + BytePutBe32(ckpoint_seqno, ret.ckpoint_seqno); + ret.salts = salts; + struct cksums sums = {}; + const uint8_t *p = (const uint8_t *)&ret; + size_t len = offsetof(struct wal_hdr, cksum1); + update_cksums(p, len, &sums); + BytePutBe32(sums.cksum1, ret.cksum1); + BytePutBe32(sums.cksum2, ret.cksum2); + POST(wal_hdr_is_valid(&ret)); + return ret; +} + +static struct wal_hdr initial_wal_hdr(uint32_t page_size) +{ + struct vfs2_salts salts; + sqlite3_randomness(sizeof(salts.salt1), (void *)&salts.salt1); + sqlite3_randomness(sizeof(salts.salt2), (void *)&salts.salt2); + return make_wal_hdr(is_bigendian() ? BE_MAGIC : LE_MAGIC, page_size, 0, + salts); +} + +/** + * Derive the next header that should be written to start a new WAL. + * + * To get the next header, we start with the header of WAL-cur, increment + * salt1 and ckpoint_seqno, and randomize salt2. + */ +static struct wal_hdr next_wal_hdr(const struct entry *e) +{ struct wal_hdr old = e->wal_cur_hdr; - BytePutBe32(native_magic(), ret.magic); - BytePutBe32(3007000, ret.version); - BytePutBe32(e->page_size, ret.page_size); - uint32_t ckpoint_seqno = ByteGetBe32(old.ckpoint_seqno); - BytePutBe32(ckpoint_seqno + 1, ret.ckpoint_seqno); - uint32_t salt1; - if (ckpoint_seqno == 0) { - salt1 = get_salt1(old.salts) + 1; + uint32_t magic = is_bigendian() ? BE_MAGIC : LE_MAGIC; + uint32_t ckpoint_seqno = ByteGetBe32(old.ckpoint_seqno) + 1; + /* salt2 is randomized every time we generate a new WAL header. 
+ * We don't use the xRandomness method of the base VFS to do this, + * because it always translates to a syscall (getrandom), and + * SQLite intends that this should only be used for seeding the + * internal PRNG. Instead, we call sqlite3_randomness, which gives + * us access to this PRNG, seeded from the default (unix) VFS. */ + struct vfs2_salts salts; + if (ckpoint_seqno == 1) { + sqlite3_randomness(sizeof(salts.salt1), (void *)&salts.salt1); } else { - e->common->orig->xRandomness(e->common->orig, sizeof(salt1), (void *)&salt1); + BytePutBe32(get_salt1(old.salts) + 1, salts.salt1); } - BytePutBe32(salt1, ret.salts.salt1); - e->common->orig->xRandomness(e->common->orig, sizeof(ret.salts.salt2), (void *)&ret.salts.salt2); - return ret; + sqlite3_randomness(sizeof(salts.salt2), (void *)&salts.salt2); + return make_wal_hdr(magic, e->page_size, ckpoint_seqno, salts); } -static struct wal_frame_hdr txn_frame_hdr(struct entry *e, struct cksums sums, struct vfs2_wal_frame frame) +static struct wal_frame_hdr txn_frame_hdr(struct entry *e, + struct cksums sums, + const dqlite_vfs_frame *frame, + uint32_t commit) { struct wal_frame_hdr fhdr; - BytePutBe32(frame.page_number, fhdr.page_number); - BytePutBe32(frame.commit, fhdr.commit); - update_cksums(ByteGetBe32(e->wal_cur_hdr.magic), (const void *)(&fhdr), 8, &sums); - update_cksums(ByteGetBe32(e->wal_cur_hdr.magic), frame.page, e->page_size, &sums); + BytePutBe32((uint32_t)frame->page_number, fhdr.page_number); + BytePutBe32(commit, fhdr.commit); + update_cksums((const void *)(&fhdr), 8, &sums); + update_cksums(frame->data, e->page_size, &sums); fhdr.salts = e->wal_cur_hdr.salts; BytePutBe32(sums.cksum1, fhdr.cksum1); BytePutBe32(sums.cksum2, fhdr.cksum2); return fhdr; } -int vfs2_apply_uncommitted(sqlite3_file *file, uint32_t page_size, const struct vfs2_wal_frame *frames, unsigned len, struct vfs2_wal_slice *out) +int vfs2_add_uncommitted(sqlite3_file *file, + uint32_t page_size, + const dqlite_vfs_frame *frames, + 
unsigned len, + struct vfs2_wal_slice *out) { - PRE(len > 0); - PRE(is_valid_page_size(page_size)); - for (unsigned i = 0; i < len - 1; i++) { - PRE(frames[i].commit == 0); - } - PRE(frames[len - 1].commit > 0); struct file *xfile = (struct file *)file; PRE(xfile->flags & SQLITE_OPEN_MAIN_DB); struct entry *e = xfile->entry; - PRE(page_size == e->page_size); int rv; + /* TODO(cole) roll back wal_cursor and release the write lock if one of + * the writes fails. */ + + PRE(len > 0); + + /* We require the page size to have been set by the pragma before this + * point. */ + PRE(is_valid_page_size(page_size)); + + /* Sanity check that the leader isn't sending us pages of the wrong + * size. */ + PRE(page_size == e->page_size); + /* The write lock is always held if there is at least one * uncommitted frame in WAL-cur. In FOLLOWING state, we allow * adding more frames to WAL-cur even if there are already @@ -1849,57 +2208,142 @@ int vfs2_apply_uncommitted(sqlite3_file *file, uint32_t page_size, const struct * lock here before "acquiring" it, we just make sure that * it's held before returning. * - * The write lock will be released when a call to vfs2_commit - * or vfs2_unapply causes the number of committed frames in + * The write lock will be released when a call to vfs2_apply + * or vfs2_unadd causes the number of committed frames in * WAL-cur (mxFrame) to equal the number of applies frames * (wal_cursor). 
*/ + if (e->shm_locks[WAL_WRITE_LOCK] == 0) { + sm_move(&e->wlk_sm, WLK_LOCKED); + } e->shm_locks[WAL_WRITE_LOCK] = VFS2_EXCLUSIVE; - - struct wal_index_full_hdr *ihdr = get_full_hdr(e); - uint32_t mx = ihdr->basic[0].mxFrame; - if (mx > 0 && ihdr->nBackfill == mx) { - struct wal_hdr new_whdr = next_wal_hdr(e); - restart_full_hdr(ihdr, new_whdr); - rv = wal_swap(e, &new_whdr); + + /* This paragraph accomplishes a few related things: initializing the + * shm if necessary, figuring out where to place the frames of the new + * transaction in the WAL, and, if necessary, writing a new WAL header + * and doing work related to the WAL swap. + * + * The shm will need to be initialized if this wasn't already done by + * either open_entry or SQLite itself. This happens when we open the + * database for the first time (so WAL-cur is empty) and then run + * add_uncommitted before anything else. + * + * The new transaction will be placed at the offset given by + * `e->wal_cursor`, after possibly resetting this to zero (when the WAL + * has been fully backfilled). + * + * If the new transaction is to be placed at offset zero, we also write + * a new WAL header, reset the shared memory (unless it was just + * initialized for the first time!), and swap the WALs. + */ + PRE(ERGO(e->shm.num_regions == 0, e->wal_cursor == 0)); + struct wal_index_full_hdr *ihdr = + e->shm.num_regions > 0 ? &e->shm.regions[0]->hdr : NULL; + uint32_t mx = ihdr != NULL ? ihdr->basic[0].mxFrame : 0; + uint32_t backfill = ihdr != NULL ? 
ihdr->nBackfill : 0; + if (e->wal_cursor == mx && mx == backfill) { + struct wal_hdr new_wal_hdr; + if (ihdr != NULL) { + new_wal_hdr = next_wal_hdr(e); + shm_restart(&e->shm, new_wal_hdr); + } else { + new_wal_hdr = initial_wal_hdr(e->page_size); + rv = shm_init(&e->shm, new_wal_hdr); + if (rv != SQLITE_OK) { + return 1; + } + } + wal_swap(e, &new_wal_hdr); + rv = e->wal_cur->pMethods->xWrite(e->wal_cur, &new_wal_hdr, + sizeof(new_wal_hdr), 0); if (rv != SQLITE_OK) { return 1; } - /* sm_move(&e->wtx_sm, WTX_FLUSH); */ } uint32_t start = e->wal_cursor; - struct cksums sums; + uint32_t db_size; if (start > 0) { + /* There's already a transaction in the WAL. In this case + * we initialize the rolling checksum and database size + * calculation from the header of the last (commit) frame in + * this transaction. */ /* TODO cache this in the entry? */ struct wal_frame_hdr prev_fhdr; - sqlite3_int64 off = wal_offset_from_cursor(e->page_size, e->wal_cursor - 1); - rv = e->wal_cur->pMethods->xRead(e->wal_cur, &prev_fhdr, sizeof(prev_fhdr), off); + sqlite3_int64 off = + wal_offset_from_cursor(e->page_size, e->wal_cursor - 1); + rv = e->wal_cur->pMethods->xRead(e->wal_cur, &prev_fhdr, + sizeof(prev_fhdr), off); if (rv != SQLITE_OK) { return 1; } sums.cksum1 = ByteGetBe32(prev_fhdr.cksum1); sums.cksum2 = ByteGetBe32(prev_fhdr.cksum2); + db_size = ByteGetBe32(prev_fhdr.commit); } else { + /* This is the first transaction in this WAL. In this case + * we initialize the rolling checksum from the checksum in + * the WAL header, and read the actual database file to + * initialize the running database size. */ sums.cksum1 = ByteGetBe32(e->wal_cur_hdr.cksum1); sums.cksum2 = ByteGetBe32(e->wal_cur_hdr.cksum2); - } + /* The database size in pages is kept in a field of the database + * header. 
*/ + uint8_t b[DB_FILE_HEADER_SIZE]; + rv = + xfile->orig->pMethods->xRead(xfile->orig, &b, sizeof(b), 0); + /* TODO(cole) this can't fail provided that the main file + * has been created; ensure that this is the case even if + * we haven't run a checkpoint yet. */ + assert(rv == SQLITE_OK); + db_size = ByteGetBe32(b + DB_FILE_HEADER_NPAGES_OFFSET); + } + POST(db_size > 0); - struct wal_frame_hdr fhdr = txn_frame_hdr(e, sums, frames[0]); - rv = write_one_frame(e, fhdr, frames[0].page); - if (rv != SQLITE_OK) { - return 1; - } + struct wal_frame_hdr fhdr; + for (unsigned i = 0; i < len; i++) { + /* Record the new frame in the appropriate page number array in + * the WAL index. Note that this doesn't make the frame visible + * to readers: that only happens once it is also recorded in + * the WAL-index hash array and mxFrame exceeds the frame + * index. The hash array is updated only when we increase + * mxFrame, so that we don't have to deal with the added + * complexity of removing things from the hash table when + * frames are overwritten before being committed. Updating the + * page number array "early" like this is harmless and saves us + * from having to stash the page numbers somewhere else in + * memory in between add_uncommitted and apply, or (worse) read + * them back from WAL-cur. + * + * TODO(cole) we can simplify the error handling by requesting + * the shm to grow as much as necessary up front, before we + * have written the frames. */ + uint32_t pgno = (uint32_t)frames[i].page_number; + rv = shm_add_pgno(&e->shm, e->wal_cursor, pgno); + if (rv != SQLITE_OK) { + return 1; + } + + /* With every frame, we make this update. The interpretation is + * that db_size would be the size of the main file in pages if + * we checkpointed up to and including the current frame. Then + * we use the final value to write the commit marker of the + * last frame. */ + db_size = MAX(db_size, pgno); + uint32_t commit = i == len - 1 ?
db_size : 0; + + /* Keep the checksums rolling. */ + if (i > 0) { + sums.cksum1 = ByteGetBe32(fhdr.cksum1); + sums.cksum2 = ByteGetBe32(fhdr.cksum2); + } - for (unsigned i = 1; i < len; i++) { - sums.cksum1 = ByteGetBe32(fhdr.cksum1); - sums.cksum2 = ByteGetBe32(fhdr.cksum2); - fhdr = txn_frame_hdr(e, sums, frames[i]); - rv = write_one_frame(e, fhdr, frames[i].page); + fhdr = txn_frame_hdr(e, sums, &frames[i], commit); + rv = write_one_frame(e, fhdr, frames[i].data); if (rv != SQLITE_OK) { return 1; } - } + } sm_move(&e->wtx_sm, WTX_FOLLOWING); out->salts = e->wal_cur_hdr.salts; @@ -1920,6 +2364,8 @@ static unsigned read_lock(unsigned i) int vfs2_pseudo_read_begin(sqlite3_file *file, uint32_t target, unsigned *out) { + /* FIXME(cole) this implementation is incorrect and needs to be + * replaced. */ struct file *xfile = (struct file *)file; PRE(xfile->flags & SQLITE_OPEN_MAIN_DB); struct entry *e = xfile->entry; @@ -1928,7 +2374,7 @@ int vfs2_pseudo_read_begin(sqlite3_file *file, uint32_t target, unsigned *out) /* adapted from walTryBeginRead */ uint32_t max_mark = 0; unsigned max_index = 0; - for (unsigned i = 1; i < WAL_NREADER; i++){ + for (unsigned i = 1; i < WAL_NREADER; i++) { uint32_t cur = ihdr->marks[i]; if (max_mark <= cur && cur <= target) { assert(cur != READ_MARK_UNUSED); @@ -1963,3 +2409,24 @@ int vfs2_pseudo_read_end(sqlite3_file *file, unsigned i) e->shm_locks[i] -= 1; return 0; } + +void vfs2_ut_sm_relate(sqlite3_file *orig, sqlite3_file *targ) +{ + struct file *forig = (struct file *)orig; + PRE(forig->flags & SQLITE_OPEN_MAIN_DB); + struct file *ftarg = (struct file *)targ; + PRE(ftarg->flags & SQLITE_OPEN_MAIN_DB); + sm_relate(&forig->entry->wtx_sm, &ftarg->entry->wtx_sm); +} + +void vfs2_ut_make_wal_hdr(uint8_t *buf, + uint32_t page_size, + uint32_t ckpoint_seqno, + uint32_t salt1, + uint32_t salt2) +{ + struct wal_hdr hdr = + make_wal_hdr(is_bigendian() ? 
BE_MAGIC : LE_MAGIC, page_size, + ckpoint_seqno, make_salts(salt1, salt2)); + memcpy(buf, &hdr, sizeof(hdr)); +} diff --git a/src/vfs2.h b/src/vfs2.h index f2181e760..790fd5436 100644 --- a/src/vfs2.h +++ b/src/vfs2.h @@ -1,6 +1,8 @@ #ifndef DQLITE_VFS2_H #define DQLITE_VFS2_H +#include "../include/dqlite.h" /* dqlite_vfs_frame */ + #include #include @@ -10,15 +12,20 @@ * Create a new VFS object that wraps the given VFS object. * * The returned VFS is allocated on the heap and lives until vfs2_destroy is - * called. Its methods are thread-safe if those of the wrapped VFS are, but - * the methods of the sqlite3_file objects it creates are not thread-safe. - * Therefore, a database connection that's created using this VFS should only - * be used on the thread that opened it. The functions below that operate on - * sqlite3_file objects created by this VFS should also only be used on that - * thread. + * called. The provided name is not copied or freed, and must outlive the VFS. + * + * The methods of the resulting sqlite3_vfs object are thread-safe, but the + * xOpen method creates sqlite3_file objects whose methods are not thread-safe. + * Therefore, when a database connection is opened using the returned VFS, it + * must not subsequently be used on other threads without additional + * synchronization. This also goes for the functions below that operate directly + * on sqlite3_file. */ sqlite3_vfs *vfs2_make(sqlite3_vfs *orig, const char *name); +/** + * A pair of salt values from the header of a WAL file. + */ struct vfs2_salts { uint8_t salt1[4]; uint8_t salt2[4]; @@ -26,6 +33,10 @@ struct vfs2_salts { /** * Identifying information about a write transaction. + * + * The salts identify a WAL file. The `start` and `len` fields have units of + * frames and identify a range of frames within that WAL file that represent a + * transaction. 
*/ struct vfs2_wal_slice { struct vfs2_salts salts; @@ -33,35 +44,91 @@ struct vfs2_wal_slice { uint32_t len; }; -struct vfs2_wal_frame { - uint32_t page_number; - uint32_t commit; - void *page; -}; - /** - * Retrieve frames that were appended to the WAL by the last write transaction, - * and reacquire the write lock. + * Retrieve a description of a write transaction that was just written to the + * WAL. * - * Call this on the database main file object (SQLITE_FCNTL_FILE_POINTER). + * The first argument is the main file handle for the database. This function + * also acquires the WAL write lock, preventing another write transaction from + * overwriting the first one until vfs2_unhide is called. + * + * The `frames` out argument is populated with an array containing + * the frames of the transaction, and the `sl` out argument is populated with a + * WAL slice describing the transaction. The length of the frames array is + * `sl.len`. * * Polling the same transaction more than once is an error. */ -int vfs2_poll(sqlite3_file *file, struct vfs2_wal_frame **frames, unsigned *n, struct vfs2_wal_slice *sl); +int vfs2_poll(sqlite3_file *file, + dqlite_vfs_frame **frames, + struct vfs2_wal_slice *sl); +/** + * Mark a write transaction that was previously polled as committed, making it + * visible to future transactions. + * + * This function also releases the WAL write lock, allowing another write + * transaction to execute. It should be called on the main file handle + * (SQLITE_FCNTL_FILE_POINTER). + * + * It's an error to call this function if no write transaction is currently + * pending due to vfs2_poll. + */ int vfs2_unhide(sqlite3_file *file); -int vfs2_commit(sqlite3_file *file, struct vfs2_wal_slice stop); - -int vfs2_commit_barrier(sqlite3_file *file); +/** + * Make some transactions in the WAL visible to readers. + * + * The first argument is the main file for the database + * (SQLITE_FCNTL_FILE_POINTER). 
All transactions up to and including the one + * described by the second argument will be marked as committed and made + * visible. The WAL write lock is also released if there are no uncommitted + * transactions left in the WAL. + * + * The affected transactions must have been added to the WAL by + * vfs2_add_uncommitted. + */ +int vfs2_apply(sqlite3_file *file, struct vfs2_wal_slice stop); -int vfs2_apply_uncommitted(sqlite3_file *file, uint32_t page_size, const struct vfs2_wal_frame *frames, unsigned n, struct vfs2_wal_slice *out); +/** + * Add the frames of a write transaction directly to the end of the WAL. + * + * The first argument is the main file handle for the database. On success, the + * WAL write lock is acquired if it was not held already. The added frames are + * initially invisible to readers, and must be made visible by calling + * vfs2_apply or removed from the WAL by calling vfs2_unadd. + * + * A WAL slice describing the new transaction is written to the last argument. + * + * The `page_size` for the new frames must match the page size already set for + * this database. + */ +int vfs2_add_uncommitted(sqlite3_file *file, + uint32_t page_size, + const dqlite_vfs_frame *frames, + unsigned n, + struct vfs2_wal_slice *out); -int vfs2_unapply(sqlite3_file *file, struct vfs2_wal_slice stop); +/** + * Remove some transactions from the WAL. + * + * The first argument is the main file handle for the database + * (SQLITE_FCNTL_FILE_POINTER). The second argument is a WAL slice. The + * transaction described by this slice, and all following transactions, will be + * removed from the WAL. The WAL write lock will be released if there are no + * uncommitted transactions in the WAL afterward. + * + * All removed transactions must have been added to the WAL by + * vfs2_add_uncommitted, and must not have been made visible using vfs2_apply.
+ */ +int vfs2_unadd(sqlite3_file *file, struct vfs2_wal_slice stop); +/** + * Request to read a specific transaction from a WAL file. + */ struct vfs2_wal_txn { struct vfs2_wal_slice meta; - struct vfs2_wal_frame *frames; + dqlite_vfs_frame *frames; }; /** @@ -87,8 +154,23 @@ int vfs2_read_wal(sqlite3_file *file, */ int vfs2_abort(sqlite3_file *file); +/** + * Try to set a read lock at a fixed place in the WAL-index. + * + * The first argument is the main file handle for the database + * (SQLITE_FCNTL_FILE_POINTER). The second argument is the desired value for the + * read mark, in units of frames. On success, the index of the read mark is + * written to the last argument, and the corresponding WAL read lock is held. + * + * This function may fail if all read marks are in use when it is called. + */ int vfs2_pseudo_read_begin(sqlite3_file *file, uint32_t target, unsigned *out); +/** + * Unset a read mark that was set by vfs2_pseudo_read_begin. + * + * This also releases the corresponding read lock. + */ int vfs2_pseudo_read_end(sqlite3_file *file, unsigned i); /** @@ -99,7 +181,26 @@ int vfs2_pseudo_read_end(sqlite3_file *file, unsigned i); */ void vfs2_destroy(sqlite3_vfs *vfs); -// TODO access read marks and shm_locks -// TODO access information about checkpoints +/** + * Declare a relationship between two databases opened by (possibly distinct) + * vfs2 instances. + * + * Intended for use by unit tests. The arguments are main file handles + * (SQLITE_FCNTL_FILE_POINTER). + */ +void vfs2_ut_sm_relate(sqlite3_file *orig, sqlite3_file *targ); + +#define VFS2_WAL_HDR_SIZE 32 + +/** + * Write a WAL header with the provided fields to the given buffer. + * + * The size of the buffer must be at least VFS2_WAL_HDR_SIZE bytes.
+ */ +void vfs2_ut_make_wal_hdr(uint8_t *buf, + uint32_t page_size, + uint32_t ckpoint_seqno, + uint32_t salt1, + uint32_t salt2); #endif diff --git a/test/unit/test_vfs2.c b/test/unit/test_vfs2.c index 635d7ce7f..d8ad1b9e1 100644 --- a/test/unit/test_vfs2.c +++ b/test/unit/test_vfs2.c @@ -1,7 +1,5 @@ -#pragma GCC diagnostic ignored "-Wformat-truncation" // XXX - -#include "../../src/vfs2.h" #include "../../src/lib/byte.h" +#include "../../src/vfs2.h" #include "../lib/fs.h" #include "../lib/runner.h" @@ -14,38 +12,86 @@ #include #include +#define NUM_NODES 3 #define PAGE_SIZE 512 #define PAGE_SIZE_STR "512" +#define OK(rv) munit_assert_int((rv), ==, 0) + SUITE(vfs2); -struct fixture { +struct node { sqlite3_vfs *vfs; + char *vfs_name; char *dir; }; +struct fixture { + struct node nodes[NUM_NODES]; +}; + static void *set_up(const MunitParameter params[], void *user_data) { (void)params; (void)user_data; struct fixture *f = munit_malloc(sizeof(*f)); - f->dir = test_dir_setup(); - f->vfs = vfs2_make(sqlite3_vfs_find("unix"), "dqlite-vfs2"); - munit_assert_ptr_not_null(f->vfs); - sqlite3_vfs_register(f->vfs, 1 /* make default */); + struct node *node; + for (unsigned i = 0; i < NUM_NODES; i++) { + node = &f->nodes[i]; + node->dir = test_dir_setup(); + node->vfs_name = sqlite3_mprintf("vfs2-%u", i); + munit_assert_ptr_not_null(node->vfs_name); + node->vfs = vfs2_make(sqlite3_vfs_find("unix"), node->vfs_name); + munit_assert_ptr_not_null(node->vfs); + sqlite3_vfs_register(node->vfs, 0); + } return f; } static void tear_down(void *data) { struct fixture *f = data; - sqlite3_vfs_unregister(f->vfs); - vfs2_destroy(f->vfs); - test_dir_tear_down(f->dir); + const struct node *node; + for (unsigned i = 0; i < NUM_NODES; i++) { + node = &f->nodes[i]; + sqlite3_vfs_unregister(node->vfs); + vfs2_destroy(node->vfs); + sqlite3_free(node->vfs_name); + test_dir_tear_down(node->dir); + } free(f); } -static void prepare_wals(const char *dbname, +/** + * Open a connection to a database 
for this node. + */ +static sqlite3 *node_open_db(const struct node *node, const char *name) +{ + char buf[PATH_MAX]; + snprintf(buf, sizeof(buf), "%s/%s", node->dir, name); + sqlite3 *db; + int rv = sqlite3_open_v2(buf, &db, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, + node->vfs_name); + munit_assert_int(rv, ==, SQLITE_OK); + munit_assert_ptr_not_null(db); + rv = sqlite3_exec(db, + "PRAGMA page_size=" PAGE_SIZE_STR + ";" + "PRAGMA journal_mode=WAL;" + "PRAGMA wal_autocheckpoint=0", + NULL, NULL, NULL); + munit_assert_int(rv, ==, SQLITE_OK); + rv = sqlite3_db_config(db, SQLITE_DBCONFIG_NO_CKPT_ON_CLOSE, 1, NULL); + munit_assert_int(rv, ==, SQLITE_OK); + return db; +} + +/** + * Write two WALs to disk with the given contents. + */ +static void prepare_wals(const struct node *node, + const char *dbname, const unsigned char *wal1, size_t wal1_len, const unsigned char *wal2, @@ -53,102 +99,91 @@ static void prepare_wals(const char *dbname, { char buf[PATH_MAX]; ssize_t n; - int rv; if (wal1 != NULL) { - snprintf(buf, sizeof(buf), "%s-xwal1", dbname); + snprintf(buf, sizeof(buf), "%s/%s-xwal1", node->dir, dbname); int fd1 = open(buf, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); munit_assert_int(fd1, !=, -1); - rv = ftruncate(fd1, 0); - munit_assert_int(rv, ==, 0); + OK(ftruncate(fd1, 0)); n = write(fd1, wal1, wal1_len); - munit_assert_llong(n, ==, wal1_len); + munit_assert_llong(n, ==, (ssize_t)wal1_len); close(fd1); } if (wal2 != NULL) { - snprintf(buf, sizeof(buf), "%s-xwal2", dbname); + snprintf(buf, sizeof(buf), "%s/%s-xwal2", node->dir, dbname); int fd2 = open(buf, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); munit_assert_int(fd2, !=, -1); - rv = ftruncate(fd2, 0); - munit_assert_int(rv, ==, 0); + OK(ftruncate(fd2, 0)); n = write(fd2, wal2, wal2_len); - munit_assert_llong(n, ==, wal2_len); + munit_assert_llong(n, ==, (ssize_t)wal2_len); close(fd2); } } -static void check_wals(const char *dbname, off_t wal1_len, off_t 
wal2_len) +/** + * Assert the lengths of WAL1 and WAL2 on disk. + */ +static void assert_wal_sizes(const struct node *node, + const char *dbname, + off_t wal1_len, + off_t wal2_len) { char buf[PATH_MAX]; struct stat st; int rv; - snprintf(buf, sizeof(buf), "%s-xwal1", dbname); + snprintf(buf, sizeof(buf), "%s/%s-xwal1", node->dir, dbname); rv = stat(buf, &st); - munit_assert_true((rv == 0 && st.st_size == wal1_len) || (rv < 0 && errno == ENOENT && wal1_len == 0)); + munit_assert_true((rv == 0 && st.st_size == wal1_len) || + (rv < 0 && errno == ENOENT && wal1_len == 0)); - snprintf(buf, sizeof(buf), "%s-xwal2", dbname); + snprintf(buf, sizeof(buf), "%s/%s-xwal2", node->dir, dbname); rv = stat(buf, &st); - munit_assert_true((rv == 0 && st.st_size == wal2_len) || (rv < 0 && errno == ENOENT && wal2_len == 0)); + munit_assert_true((rv == 0 && st.st_size == wal2_len) || + (rv < 0 && errno == ENOENT && wal2_len == 0)); } +static sqlite3_file *main_file(sqlite3 *db) +{ + sqlite3_file *fp; + sqlite3_file_control(db, "main", SQLITE_FCNTL_FILE_POINTER, &fp); + return fp; +} + +/** + * Single-node test with several transactions and a checkpoint. 
+ */ TEST(vfs2, basic, set_up, tear_down, 0, NULL) { struct fixture *f = data; + struct node *node = &f->nodes[0]; int rv; - char buf[PATH_MAX]; - snprintf(buf, PATH_MAX, "%s/%s", f->dir, "test.db"); - sqlite3 *db; - rv = sqlite3_open(buf, &db); - munit_assert_int(rv, ==, SQLITE_OK); - - rv = sqlite3_exec(db, - "PRAGMA page_size=" PAGE_SIZE_STR ";" - "PRAGMA journal_mode=WAL;" - "PRAGMA wal_autocheckpoint=0", - NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); - - char *args[] = {NULL, "page_size", NULL}; - rv = sqlite3_file_control(db, "main", SQLITE_FCNTL_PRAGMA, args); - sqlite3_file *fp; - sqlite3_file_control(db, "main", SQLITE_FCNTL_FILE_POINTER, &fp); - rv = vfs2_commit_barrier(fp); - munit_assert_int(rv, ==, 0); - rv = sqlite3_exec(db, "CREATE TABLE foo (bar INTEGER)", NULL, NULL, - NULL); - munit_assert_int(rv, ==, SQLITE_OK); + sqlite3 *db = node_open_db(node, "test.db"); + sqlite3_file *fp = main_file(db); + OK(sqlite3_exec(db, "CREATE TABLE foo (bar INTEGER)", NULL, NULL, + NULL)); struct vfs2_wal_slice sl; - rv = vfs2_poll(fp, NULL, NULL, &sl); - munit_assert_int(rv, ==, 0); - rv = vfs2_unhide(fp); - munit_assert_int(rv, ==, 0); + OK(vfs2_poll(fp, NULL, &sl)); + OK(vfs2_unhide(fp)); munit_assert_uint32(sl.start, ==, 0); munit_assert_uint32(sl.len, ==, 2); - rv = sqlite3_exec(db, "INSERT INTO foo (bar) VALUES (17)", NULL, NULL, - NULL); - munit_assert_int(rv, ==, SQLITE_OK); - tracef("aborting..."); - rv = vfs2_abort(fp); - munit_assert_int(rv, ==, 0); - - rv = sqlite3_exec(db, "INSERT INTO foo (bar) values (22)", NULL, NULL, - NULL); - munit_assert_int(rv, ==, 0); - rv = vfs2_poll(fp, NULL, NULL, &sl); - munit_assert_int(rv, ==, 0); + OK(sqlite3_exec(db, "INSERT INTO foo (bar) VALUES (17)", NULL, NULL, + NULL)); + OK(vfs2_abort(fp)); + + OK(sqlite3_exec(db, "INSERT INTO foo (bar) values (22)", NULL, NULL, + NULL)); + OK(vfs2_poll(fp, NULL, &sl)); munit_assert_uint32(sl.start, ==, 2); munit_assert_uint32(sl.len, ==, 1); - rv = vfs2_unhide(fp); - 
munit_assert_int(rv, ==, 0); + OK(vfs2_unhide(fp)); sqlite3_stmt *stmt; - rv = sqlite3_prepare_v2(db, "SELECT * FROM foo", -1, &stmt, NULL); - munit_assert_int(rv, ==, SQLITE_OK); + OK(sqlite3_prepare_v2(db, "SELECT * FROM foo", -1, &stmt, NULL)); rv = sqlite3_step(stmt); munit_assert_int(rv, ==, SQLITE_ROW); munit_assert_int(sqlite3_column_count(stmt), ==, 1); @@ -158,18 +193,15 @@ TEST(vfs2, basic, set_up, tear_down, 0, NULL) int nlog; int nckpt; - rv = sqlite3_wal_checkpoint_v2(db, "main", SQLITE_CHECKPOINT_PASSIVE, - &nlog, &nckpt); - munit_assert_int(rv, ==, SQLITE_OK); + OK(sqlite3_wal_checkpoint_v2(db, "main", SQLITE_CHECKPOINT_PASSIVE, + &nlog, &nckpt)); munit_assert_int(nlog, ==, 3); munit_assert_int(nckpt, ==, 3); - rv = sqlite3_exec(db, "INSERT INTO foo (bar) VALUES (101)", NULL, NULL, - NULL); - munit_assert_int(rv, ==, SQLITE_OK); + OK(sqlite3_exec(db, "INSERT INTO foo (bar) VALUES (101)", NULL, NULL, + NULL)); - rv = sqlite3_reset(stmt); - munit_assert_int(rv, ==, SQLITE_OK); + OK(sqlite3_reset(stmt)); rv = sqlite3_step(stmt); munit_assert_int(rv, ==, SQLITE_ROW); munit_assert_int(sqlite3_column_count(stmt), ==, 1); @@ -178,21 +210,17 @@ TEST(vfs2, basic, set_up, tear_down, 0, NULL) rv = sqlite3_step(stmt); munit_assert_int(rv, ==, SQLITE_DONE); - struct vfs2_wal_frame *frames; - unsigned n; - rv = vfs2_poll(fp, &frames, &n, &sl); - munit_assert_int(rv, ==, 0); - munit_assert_uint(n, ==, 1); + dqlite_vfs_frame *frames; + OK(vfs2_poll(fp, &frames, &sl)); + munit_assert_uint(sl.len, ==, 1); munit_assert_not_null(frames); - munit_assert_not_null(frames[0].page); - sqlite3_free(frames[0].page); + munit_assert_not_null(frames[0].data); + sqlite3_free(frames[0].data); sqlite3_free(frames); - rv = vfs2_unhide(fp); - munit_assert_int(rv, ==, 0); + OK(vfs2_unhide(fp)); - rv = sqlite3_reset(stmt); - munit_assert_int(rv, ==, SQLITE_OK); + OK(sqlite3_reset(stmt)); rv = sqlite3_step(stmt); munit_assert_int(rv, ==, SQLITE_ROW); 
munit_assert_int(sqlite3_column_count(stmt), ==, 1); @@ -203,163 +231,193 @@ TEST(vfs2, basic, set_up, tear_down, 0, NULL) rv = sqlite3_step(stmt); munit_assert_int(rv, ==, SQLITE_DONE); - rv = sqlite3_finalize(stmt); - munit_assert_int(rv, ==, SQLITE_OK); + OK(sqlite3_finalize(stmt)); sqlite3_close(db); return MUNIT_OK; } -#define WAL_SIZE_FROM_FRAMES(n) (32 + (24 + PAGE_SIZE) * (n)) - -static void make_wal_hdr(uint8_t *buf, uint32_t ckpoint_seqno, uint32_t salt1, uint32_t salt2) -{ - uint8_t *p = buf; - - /* checksum */ - BytePutBe32(0x377f0683, p); - p += 4; - BytePutBe32(3007000, p); - p += 4; - BytePutBe32(PAGE_SIZE, p); - p += 4; - BytePutBe32(ckpoint_seqno, p); - p += 4; - BytePutBe32(salt1, p); - p += 4; - BytePutBe32(salt2, p); - p += 4; - - uint32_t s0 = 0; - uint32_t s1 = 0; - size_t off = 0; - - s0 += ByteGetBe32(buf + off) + s1; - s1 += ByteGetBe32(buf + off + 4) + s0; - off += 8; - - s0 += ByteGetBe32(buf + off) + s1; - s1 += ByteGetBe32(buf + off + 4) + s0; - off += 8; - - s0 += ByteGetBe32(buf + off) + s1; - s1 += ByteGetBe32(buf + off + 4) + s0; - off += 8; - - BytePutBe32(s0, p); - p += 4; - BytePutBe32(s1, p); - p += 4; -} +#define WAL_SIZE_FROM_FRAMES(n) (VFS2_WAL_HDR_SIZE + (24 + PAGE_SIZE) * (n)) +/** + * When one WAL has a valid header and the other is empty, + * the nonempty one becomes WAL-cur. Then, the first write triggers a WAL + * swap, so the frames go to the *other* WAL. + */ TEST(vfs2, startup_one_nonempty, set_up, tear_down, 0, NULL) { struct fixture *f = data; - char buf[PATH_MAX]; + struct node *node = &f->nodes[0]; - snprintf(buf, PATH_MAX, "%s/%s", f->dir, "test.db"); + /* WAL2 has a header. 
*/ + uint8_t wal2_hdronly[WAL_SIZE_FROM_FRAMES(0)] = { 0 }; + vfs2_ut_make_wal_hdr(wal2_hdronly, PAGE_SIZE, 0, 17, 103); + prepare_wals(node, "test.db", NULL, 0, wal2_hdronly, + sizeof(wal2_hdronly)); + sqlite3 *db = node_open_db(node, "test.db"); + OK(sqlite3_exec(db, "CREATE TABLE foo (n INTEGER)", NULL, NULL, NULL)); + OK(sqlite3_close(db)); - check_wals(buf, 0, 0); - - uint8_t wal2_hdronly[WAL_SIZE_FROM_FRAMES(0)] = {0}; - make_wal_hdr(wal2_hdronly, 0, 17, 103); - prepare_wals(buf, NULL, 0, wal2_hdronly, sizeof(wal2_hdronly)); - sqlite3 *db; - tracef("opening..."); - int rv = sqlite3_open(buf, &db); - munit_assert_int(rv, ==, SQLITE_OK); - tracef("setup..."); - rv = sqlite3_exec(db, - "PRAGMA page_size=" PAGE_SIZE_STR ";" - "PRAGMA journal_mode=WAL;" - "PRAGMA wal_autocheckpoint=0", - NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); - sqlite3_file *fp; - sqlite3_file_control(db, "main", SQLITE_FCNTL_FILE_POINTER, &fp); - tracef("barrier..."); - rv = vfs2_commit_barrier(fp); - munit_assert_int(rv, ==, 0); - tracef("create table..."); - rv = sqlite3_exec(db, "CREATE TABLE foo (n INTEGER)", NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); - tracef("closing..."); - rv = sqlite3_close(db); - munit_assert_int(rv, ==, SQLITE_OK); - - check_wals(buf, WAL_SIZE_FROM_FRAMES(2), WAL_SIZE_FROM_FRAMES(0)); + /* WAL1 ends up with the frames. */ + assert_wal_sizes(node, "test.db", WAL_SIZE_FROM_FRAMES(2), + WAL_SIZE_FROM_FRAMES(0)); return MUNIT_OK; } -TEST(vfs2, startup_both_nonempty, set_up, tear_down, 0, NULL) +/** + * When one WAL has a valid transaction and the other is empty, + * the WAL with the transaction becomes WAL-cur. The first write does not + * trigger a WAL swap, but rather goes to that same WAL. 
+ */ +TEST(vfs2, startup_frames_in_one, set_up, tear_down, 0, NULL) { struct fixture *f = data; - char buf[PATH_MAX]; + struct node *node = &f->nodes[0]; + int rv; - snprintf(buf, PATH_MAX, "%s/%s", f->dir, "test.db"); + /* Set up a transaction in WAL2. */ + sqlite3 *db = node_open_db(node, "test.db"); + sqlite3_file *fp = main_file(db); + OK(sqlite3_exec(db, "CREATE TABLE foo (n INTEGER)", NULL, NULL, NULL)); - check_wals(buf, 0, 0); + struct vfs2_wal_slice sl; + OK(vfs2_poll(fp, NULL, &sl)); + OK(sqlite3_close(db)); + /* WAL2 has the frames. */ + assert_wal_sizes(node, "test.db", 0, WAL_SIZE_FROM_FRAMES(2)); + + db = node_open_db(node, "test.db"); + fp = main_file(db); + /* The transaction is not visible. */ + rv = sqlite3_exec(db, "SELECT * FROM foo", NULL, NULL, NULL); + munit_assert_int(rv, ==, SQLITE_ERROR); + /* The write lock is held. */ + rv = sqlite3_exec(db, "CREATE TABLE bar (k INTEGER)", NULL, NULL, NULL); + munit_assert_int(rv, ==, SQLITE_BUSY); + /* The transaction can be committed. */ + OK(vfs2_apply(fp, sl)); + /* The transaction is visible. */ + OK(sqlite3_exec(db, "SELECT * FROM foo", NULL, NULL, NULL)); + /* The write lock is not held. */ + OK(sqlite3_exec(db, "CREATE TABLE bar (k, INTEGER)", NULL, NULL, NULL)); + /* The write lock is released. 
*/ + sqlite3_close(db); - uint8_t wal1_hdronly[WAL_SIZE_FROM_FRAMES(0)] = {0}; - make_wal_hdr(wal1_hdronly, 0, 18, 103); - uint8_t wal2_hdronly[WAL_SIZE_FROM_FRAMES(0)] = {0}; - make_wal_hdr(wal2_hdronly, 0, 17, 103); - prepare_wals(buf, wal1_hdronly, sizeof(wal1_hdronly), wal2_hdronly, sizeof(wal2_hdronly)); - sqlite3 *db; - int rv = sqlite3_open(buf, &db); - munit_assert_int(rv, ==, SQLITE_OK); - rv = sqlite3_exec(db, - "PRAGMA page_size=" PAGE_SIZE_STR ";" - "PRAGMA journal_mode=WAL;" - "PRAGMA wal_autocheckpoint=0", - NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); - rv = sqlite3_exec(db, "CREATE TABLE foo (n INTEGER)", NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); - rv = sqlite3_close(db); - munit_assert_int(rv, ==, SQLITE_OK); + return MUNIT_OK; +} - check_wals(buf, WAL_SIZE_FROM_FRAMES(0), WAL_SIZE_FROM_FRAMES(2)); +/** + * When both WALs are nonempty at startup, the one with the higher salt1 + * value becomes WAL-cur. Then, the first write triggers a WAL swap, so + * the frames go to the *other* WAL. + */ +TEST(vfs2, startup_both_nonempty, set_up, tear_down, 0, NULL) +{ + struct fixture *f = data; + struct node *node = &f->nodes[0]; + + /* WAL1 has the higher salt1. */ + uint8_t wal1_hdronly[WAL_SIZE_FROM_FRAMES(0)] = { 0 }; + vfs2_ut_make_wal_hdr(wal1_hdronly, PAGE_SIZE, 0, 18, 103); + uint8_t wal2_hdronly[WAL_SIZE_FROM_FRAMES(0)] = { 0 }; + vfs2_ut_make_wal_hdr(wal2_hdronly, PAGE_SIZE, 0, 17, 103); + prepare_wals(node, "test.db", wal1_hdronly, sizeof(wal1_hdronly), + wal2_hdronly, sizeof(wal2_hdronly)); + sqlite3 *db = node_open_db(node, "test.db"); + OK(sqlite3_exec(db, "CREATE TABLE foo (n INTEGER)", NULL, NULL, NULL)); + OK(sqlite3_close(db)); + + /* WAL2 ends up with the frames. */ + assert_wal_sizes(node, "test.db", WAL_SIZE_FROM_FRAMES(0), + WAL_SIZE_FROM_FRAMES(2)); return MUNIT_OK; } +/** + * Single-node test of rolling back a transaction. 
+ */ TEST(vfs2, rollback, set_up, tear_down, 0, NULL) { struct fixture *f = data; - char buf[PATH_MAX]; - - snprintf(buf, PATH_MAX, "%s/%s", f->dir, "test.db"); - - sqlite3 *db; - int rv = sqlite3_open(buf, &db); - munit_assert_int(rv, ==, SQLITE_OK); - - rv = sqlite3_exec(db, - "PRAGMA journal_mode=WAL;" - "PRAGMA wal_autocheckpoint=0", - NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); - rv = sqlite3_exec(db, "CREATE TABLE foo (n INTEGER)", NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); - sqlite3_file *fp; - sqlite3_file_control(db, "main", SQLITE_FCNTL_FILE_POINTER, &fp); + struct node *node = &f->nodes[0]; struct vfs2_wal_slice sl; - rv = vfs2_poll(fp, NULL, NULL, &sl); - munit_assert_int(rv, ==, 0); - rv = vfs2_unhide(fp); - rv = sqlite3_exec(db, "BEGIN", NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); - char sql[100]; + int rv; + + sqlite3 *db = node_open_db(node, "test.db"); + OK(sqlite3_exec(db, "CREATE TABLE foo (n INTEGER)", NULL, NULL, NULL)); + sqlite3_file *fp = main_file(db); + OK(vfs2_poll(fp, NULL, &sl)); + OK(vfs2_unhide(fp)); + sqlite3_stmt *stmt; + OK(sqlite3_prepare_v2(db, "INSERT INTO foo (n) VALUES (?)", -1, &stmt, NULL)); + OK(sqlite3_exec(db, "BEGIN", NULL, NULL, NULL)); for (unsigned i = 0; i < 500; i++) { - snprintf(sql, sizeof(sql), "INSERT INTO foo (n) VALUES (%d)", i); - rv = sqlite3_exec(db, sql, NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); + OK(sqlite3_bind_int(stmt, 1, i)); + rv = sqlite3_step(stmt); + munit_assert_int(rv, ==, SQLITE_DONE); + OK(sqlite3_reset(stmt)); } - rv = sqlite3_exec(db, "ROLLBACK", NULL, NULL, NULL); - munit_assert_int(rv, ==, SQLITE_OK); - rv = sqlite3_close(db); - munit_assert_int(rv, ==, SQLITE_OK); + OK(sqlite3_finalize(stmt)); + OK(sqlite3_exec(db, "ROLLBACK", NULL, NULL, NULL)); + OK(sqlite3_close(db)); + + return MUNIT_OK; +} +/** + * Two-node test covering the full replication cycle. 
+ */ +TEST(vfs2, leader_and_follower, set_up, tear_down, 0, NULL) +{ + struct fixture *f = data; + struct node *leader = &f->nodes[0]; + struct node *follower = &f->nodes[1]; + int rv; + + /* The leader executes and polls a transaction. */ + sqlite3 *leader_db = node_open_db(leader, "test.db"); + OK(sqlite3_exec(leader_db, "CREATE TABLE foo (n INTEGER)", NULL, NULL, + NULL)); + /* WAL2 gets the frames after a WAL swap. */ + assert_wal_sizes(leader, "test.db", 0, WAL_SIZE_FROM_FRAMES(2)); + sqlite3_file *leader_fp = main_file(leader_db); + dqlite_vfs_frame *frames; + struct vfs2_wal_slice leader_sl; + OK(vfs2_poll(leader_fp, &frames, &leader_sl)); + munit_assert_uint(leader_sl.len, ==, 2); + + /* The follower opens its database. */ + sqlite3 *follower_db = node_open_db(follower, "test.db"); + sqlite3_file *follower_fp = main_file(follower_db); + vfs2_ut_sm_relate(leader_fp, follower_fp); + + /* The follower receives the transaction. */ + struct vfs2_wal_slice follower_sl; + OK(vfs2_add_uncommitted(follower_fp, PAGE_SIZE, frames, leader_sl.len, + &follower_sl)); + /* WAL2 gets the frames after a WAL swap. */ + assert_wal_sizes(follower, "test.db", 0, WAL_SIZE_FROM_FRAMES(2)); + sqlite3_free(frames[0].data); + sqlite3_free(frames[1].data); + sqlite3_free(frames); + /* The transaction is not visible, and the write lock is held. */ + rv = sqlite3_exec(follower_db, "SELECT * FROM foo", NULL, NULL, NULL); + munit_assert_int(rv, ==, SQLITE_ERROR); + rv = sqlite3_exec(follower_db, "CREATE TABLE bar (k INTEGER)", NULL, NULL, NULL); + munit_assert_int(rv, ==, SQLITE_BUSY); + + /* The leader receives the follower's acknowledgement + * and applies the transaction locally. */ + OK(vfs2_unhide(leader_fp)); + + /* The follower learns the new commit index and applies + * the transaction locally. */ + OK(vfs2_apply(follower_fp, follower_sl)); + /* The transaction is visible and the write lock is released. 
*/ + OK(sqlite3_exec(follower_db, "INSERT INTO foo (n) VALUES (17)", NULL, NULL, NULL)); + + sqlite3_close(follower_db); + sqlite3_close(leader_db); return MUNIT_OK; }