diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index d79b6a27f..6da783fc9 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -16,9 +16,6 @@ jobs: compiler: - gcc - clang - dqlite-next: - - yes - - no runs-on: ${{ matrix.os }} steps: @@ -35,7 +32,7 @@ jobs: run: | autoreconf -i ./configure --enable-debug --enable-code-coverage --enable-sanitize \ - --enable-build-raft --enable-dqlite-next=${{ matrix.dqlite-next }} + --enable-build-raft make -j4 unit-test integration-test \ raft-core-fuzzy-test \ raft-core-integration-test \ diff --git a/Makefile.am b/Makefile.am index fb601c71b..2da3b66d2 100644 --- a/Makefile.am +++ b/Makefile.am @@ -4,10 +4,6 @@ AM_CFLAGS += $(CODE_COVERAGE_CFLAGS) AM_CFLAGS += $(SQLITE_CFLAGS) $(UV_CFLAGS) $(PTHREAD_CFLAGS) AM_LDFLAGS = $(UV_LIBS) $(PTHREAD_LIBS) -if DQLITE_NEXT_ENABLED -AM_CFLAGS += -DDQLITE_NEXT -endif - if !BUILD_RAFT_ENABLED AM_CFLAGS += $(RAFT_CFLAGS) -DUSE_SYSTEM_RAFT AM_LDFLAGS += $(RAFT_LIBS) diff --git a/configure.ac b/configure.ac index d673a9b6d..07043922a 100644 --- a/configure.ac +++ b/configure.ac @@ -39,10 +39,6 @@ AM_CONDITIONAL(BUILD_SQLITE_ENABLED, test "x$enable_build_sqlite" = "xyes") AC_ARG_ENABLE(build-raft, AS_HELP_STRING([--enable-build-raft[=ARG]], [use the bundled raft sources instead of linking to libraft [default=no]])) AM_CONDITIONAL(BUILD_RAFT_ENABLED, test "x$enable_build_raft" = "xyes") -AC_ARG_ENABLE(dqlite-next, AS_HELP_STRING([--enable-dqlite-next[=ARG]], [build with the experimental dqlite backend [default=no]])) -AM_CONDITIONAL(DQLITE_NEXT_ENABLED, test "x$enable_dqlite_next" = "xyes") -AS_IF([test "x$enable_build_raft" != "xyes" -a "x$enable_dqlite_next" = "xyes"], [AC_MSG_ERROR([dqlite-next requires bundled raft])], []) - # Whether to enable code coverage. AX_CODE_COVERAGE diff --git a/include/dqlite.h b/include/dqlite.h index b558a85a3..a906f0a05 100644 --- a/include/dqlite.h +++ b/include/dqlite.h @@ -268,6 +268,18 @@ DQLITE_API int dqlite_node_create(dqlite_node_id id, const char *data_dir, dqlite_node **n); +/** + * Create a new dqlite node object using experimental features. + * + * The created node will store the databases and WAL files on disk, instead of + * in memory. This feature is incomplete and not suitable for general use. + */ +DQLITE_API DQLITE_EXPERIMENTAL +int dqlite_node_create_v2(dqlite_node_id id, + const char *addr, + const char *data_dir, + dqlite_node **t); + /** * Destroy a dqlite node object. * @@ -400,12 +412,9 @@ DQLITE_API int dqlite_node_set_snapshot_params(dqlite_node *n, DQLITE_API int dqlite_node_set_block_size(dqlite_node *n, size_t size); /** - * WARNING: This is an experimental API. - * - * By default dqlite holds the SQLite database file and WAL in memory. By - * enabling disk-mode, dqlite will hold the SQLite database file on-disk while - * keeping the WAL in memory. Has to be called after `dqlite_node_create` and - * before `dqlite_node_start`. + * This function was formerly used to request that database and WAL files be + * stored on disk, but is now a no-op. It is deprecated, and + * dqlite_node_create_v2 should be used instead. */ DQLITE_API int dqlite_node_enable_disk_mode(dqlite_node *n); diff --git a/src/gateway.c b/src/gateway.c index 0ba1f60f4..225c470e0 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -554,22 +554,6 @@ static void query_batch_async(struct handle *req, enum pool_half half) } } -#ifdef DQLITE_NEXT - -static void qb_top(pool_work_t *w) -{ - struct handle *req = CONTAINER_OF(w, struct handle, work); - query_batch_async(req, POOL_TOP_HALF); -} - -static void qb_bottom(pool_work_t *w) -{ - struct handle *req = CONTAINER_OF(w, struct handle, work); - query_batch_async(req, POOL_BOTTOM_HALF); -} - -#endif - static void query_batch(struct gateway *g) { struct handle *req = g->req; @@ -577,16 +561,8 @@ static void query_batch(struct gateway *g) g->req = NULL; req->gw = g; -#ifdef DQLITE_NEXT - struct dqlite_node *node = g->raft->data; - pool_t *pool = !!(pool_ut_fallback()->flags & POOL_FOR_UT) - ? pool_ut_fallback() : &node->pool; - pool_queue_work(pool, &req->work, g->leader->db->cookie, - WT_UNORD, qb_top, qb_bottom); -#else query_batch_async(req, POOL_TOP_HALF); query_batch_async(req, POOL_BOTTOM_HALF); -#endif } static void query_barrier_cb(struct barrier *barrier, int status) diff --git a/src/leader.c b/src/leader.c index 61ef91639..d0d8043ef 100644 --- a/src/leader.c +++ b/src/leader.c @@ -417,22 +417,6 @@ static void leaderExecV2(struct exec *req, enum pool_half half) leaderExecDone(l->exec); } -#ifdef DQLITE_NEXT - -static void exec_top(pool_work_t *w) -{ - struct exec *req = CONTAINER_OF(w, struct exec, work); - leaderExecV2(req, POOL_TOP_HALF); -} - -static void exec_bottom(pool_work_t *w) -{ - struct exec *req = CONTAINER_OF(w, struct exec, work); - leaderExecV2(req, POOL_BOTTOM_HALF); -} - -#endif - static void execBarrierCb(struct barrier *barrier, int status) { tracef("exec barrier cb status %d", status); @@ -445,16 +429,8 @@ static void execBarrierCb(struct barrier *barrier, int status) return; } -#ifdef DQLITE_NEXT - struct dqlite_node *node = l->raft->data; - pool_t *pool = !!(pool_ut_fallback()->flags & POOL_FOR_UT) - ? pool_ut_fallback() : &node->pool; - pool_queue_work(pool, &req->work, l->db->cookie, - WT_UNORD, exec_top, exec_bottom); -#else leaderExecV2(req, POOL_TOP_HALF); leaderExecV2(req, POOL_BOTTOM_HALF); -#endif } int leader__exec(struct leader *l, diff --git a/src/raft.h b/src/raft.h index b9f4d2a45..e7ff2c42b 100644 --- a/src/raft.h +++ b/src/raft.h @@ -1499,6 +1499,29 @@ RAFT_API void raft_uv_set_tracer(struct raft_io *io, */ RAFT_API void raft_uv_set_auto_recovery(struct raft_io *io, bool flag); +/** + * Values for the format version number that describes the segment and snapshot + * files. + */ +enum raft_uv_format_version { + RAFT_UV_FORMAT_V1 = 1, /* Original */ + RAFT_UV_FORMAT_V2, /* Added local_data fields */ + RAFT_UV_FORMAT_NR +}; + +/** + * Select which disk format version should be used. + * + * If this is called after the `init` callback from the raft_io vtable, + * and some metadata files were found on disk, the @version is compared + * to the format version found in the metadata files, and an error is + * returned if they don't match. + * + * RAFT_UV_FORMAT_V1 is used by default. + */ +RAFT_API void raft_uv_set_format_version(struct raft_io *io, + enum raft_uv_format_version version); + /** * Callback invoked by the transport implementation when a new incoming * connection has been established. diff --git a/src/raft/uv.c b/src/raft/uv.c index f1450e742..949f5a4c1 100644 --- a/src/raft/uv.c +++ b/src/raft/uv.c @@ -112,6 +112,12 @@ static int uvInit(struct raft_io *io, raft_id id, const char *address) if (rv != 0) { return rv; } + if (metadata.format_version != 0 && + metadata.format_version != uv->format_version) { + ErrMsgPrintf(io->errmsg, "decode metadata: bad format version %u", + metadata.format_version); + return RAFT_MALFORMED; + } uv->metadata = metadata; rv = uv->transport->init(uv->transport, id, address); @@ -512,6 +518,7 @@ static int uvSetTerm(struct raft_io *io, const raft_term term) struct uv *uv; int rv; uv = io->impl; + uv->metadata.format_version = uv->format_version; uv->metadata.version++; uv->metadata.term = term; uv->metadata.voted_for = 0; @@ -528,6 +535,7 @@ static int uvSetVote(struct raft_io *io, const raft_id server_id) struct uv *uv; int rv; uv = io->impl; + uv->metadata.format_version = uv->format_version; uv->metadata.version++; uv->metadata.voted_for = server_id; rv = uvMetadataStore(uv, &uv->metadata); @@ -723,6 +731,7 @@ int raft_uv_init(struct raft_io *io, uv->closing = false; uv->close_cb = NULL; uv->auto_recovery = true; + uv->format_version = RAFT_UV_FORMAT_V1; uvSeedRand(uv); @@ -812,4 +821,12 @@ void raft_uv_set_auto_recovery(struct raft_io *io, bool flag) uv->auto_recovery = flag; } +void raft_uv_set_format_version(struct raft_io *io, + enum raft_uv_format_version version) +{ + PRE(RAFT_UV_FORMAT_V1 <= version && version < RAFT_UV_FORMAT_NR); + struct uv *uv = io->impl; + uv->format_version = version; +} + #undef tracef diff --git a/src/raft/uv.h b/src/raft/uv.h index 3d0fd342b..b9b0189cc 100644 --- a/src/raft/uv.h +++ b/src/raft/uv.h @@ -47,6 +47,7 @@ typedef unsigned long long uvCounter; /* Information persisted in a single metadata file. */ struct uvMetadata { + enum raft_uv_format_version format_version; unsigned long long version; /* Monotonically increasing version */ raft_term term; /* Current term */ raft_id voted_for; /* Server ID of last vote, or 0 */ @@ -95,6 +96,7 @@ struct uv bool closing; /* True if we are closing */ raft_io_close_cb close_cb; /* Invoked when finishing closing */ bool auto_recovery; /* Try to recover from corrupt segments */ + enum raft_uv_format_version format_version; }; /* Implementation of raft_io->truncate. */ @@ -175,13 +177,16 @@ int uvSegmentLoadAll(struct uv *uv, * The memory is aligned at disk block boundary, to allow for direct I/O. */ struct uvSegmentBuffer { + enum raft_uv_format_version format_version; size_t block_size; /* Disk block size for direct I/O */ uv_buf_t arena; /* Previously allocated memory that can be re-used */ size_t n; /* Write offset */ }; /* Initialize an empty buffer. */ -void uvSegmentBufferInit(struct uvSegmentBuffer *b, size_t block_size); +void uvSegmentBufferInit(struct uvSegmentBuffer *b, + enum raft_uv_format_version format_version, + size_t block_size); /* Release all memory used by the buffer. */ void uvSegmentBufferClose(struct uvSegmentBuffer *b); @@ -196,7 +201,8 @@ int uvSegmentBufferFormat(struct uvSegmentBuffer *b); * will be appended. */ int uvSegmentBufferAppend(struct uvSegmentBuffer *b, const struct raft_entry entries[], - unsigned n_entries); + unsigned n_entries, + enum raft_uv_format_version format_version); /* After all entries to write have been encoded, finalize the buffer by zeroing * the unused memory of the last block. The out parameter will point to the diff --git a/src/raft/uv_append.c b/src/raft/uv_append.c index 1544c274e..fd7ea20ae 100644 --- a/src/raft/uv_append.c +++ b/src/raft/uv_append.c @@ -176,7 +176,7 @@ static int uvAliveSegmentEncodeEntriesToWriteBuf(struct uvAliveSegment *segment, } rv = uvSegmentBufferAppend(&segment->pending, append->entries, - append->n); + append->n, segment->uv->format_version); if (rv != 0) { return rv; } @@ -512,7 +512,7 @@ static void uvAliveSegmentInit(struct uvAliveSegment *s, struct uv *uv) s->last_index = 0; s->size = sizeof(uint64_t) /* Format version */; s->next_block = 0; - uvSegmentBufferInit(&s->pending, uv->block_size); + uvSegmentBufferInit(&s->pending, uv->format_version, uv->block_size); s->written = 0; s->barrier = NULL; s->finalize = false; @@ -593,11 +593,12 @@ static void uvAliveSegmentReserveSegmentCapacity(struct uvAliveSegment *s, /* Return the number of bytes needed to store the batch of entries of this * append request on disk. */ -static size_t uvAppendSize(struct uvAppend *a) +static size_t uvAppendSize(struct uvAppend *a, + enum raft_uv_format_version format_version) { size_t size = sizeof(uint32_t) * 2; /* CRC checksums */ unsigned i; - size += uvSizeofBatchHeader(a->n, true); /* Batch header */ + size += uvSizeofBatchHeader(a->n, format_version); /* Batch header */ for (i = 0; i < a->n; i++) { /* Entries data */ size += bytePad64(a->entries[i].buf.len); } @@ -618,7 +619,7 @@ static int uvAppendEnqueueRequest(struct uv *uv, struct uvAppend *append) assert(uv->append_next_index > 0); tracef("enqueue %u entries", append->n); - size = uvAppendSize(append); + size = uvAppendSize(append, uv->format_version); /* If we have no segments yet, it means this is the very first append, * and we need to add a new segment. Otherwise we check if the last diff --git a/src/raft/uv_encoding.c b/src/raft/uv_encoding.c index 2714f643a..48c6586d8 100644 --- a/src/raft/uv_encoding.c +++ b/src/raft/uv_encoding.c @@ -4,6 +4,7 @@ #include #include "../raft.h" +#include "../utils.h" #include "assert.h" #include "byte.h" #include "configuration.h" @@ -86,14 +87,12 @@ static size_t sizeofTimeoutNow(void) sizeof(uint64_t) /* Last log term. */; } -size_t uvSizeofBatchHeader(size_t n, bool with_local_data) +size_t uvSizeofBatchHeader(size_t n, enum raft_uv_format_version format_version) { size_t res = 8 + /* Number of entries in the batch, little endian */ 16 * n; /* One header per entry */; - if (with_local_data) { -#ifdef DQLITE_NEXT + if (format_version == RAFT_UV_FORMAT_V2) { res += 8; /* Local data length, applies to all entries */ -#endif } return res; } @@ -143,7 +142,9 @@ static void encodeAppendEntries(const struct raft_append_entries *p, void *buf) bytePut64(&cursor, p->prev_log_term); /* Previous term. */ bytePut64(&cursor, p->leader_commit); /* Commit index. */ - uvEncodeBatchHeader(p->entries, p->n_entries, cursor, false /* no local data */); + /* Note: use RAFT_UV_FORMAT_V1 unconditionally because local data doesn't + * appear in the AppendEntries message. */ + uvEncodeBatchHeader(p->entries, p->n_entries, cursor, RAFT_UV_FORMAT_V1); } static void encodeAppendEntriesResult( @@ -302,7 +303,7 @@ int uvEncodeMessage(const struct raft_message *message, void uvEncodeBatchHeader(const struct raft_entry *entries, unsigned n, void *buf, - bool with_local_data) + enum raft_uv_format_version format_version) { unsigned i; void *cursor = buf; @@ -310,11 +311,9 @@ void uvEncodeBatchHeader(const struct raft_entry *entries, /* Number of entries in the batch, little endian */ bytePut64(&cursor, n); - if (with_local_data) { -#ifdef DQLITE_NEXT + if (format_version == RAFT_UV_FORMAT_V2) { /* Local data size per entry, little endian */ bytePut64(&cursor, (uint64_t)sizeof(struct raft_entry_local_data)); -#endif } for (i = 0; i < n; i++) { @@ -378,7 +377,8 @@ static void decodeRequestVoteResult(const uv_buf_t *buf, int uvDecodeBatchHeader(const void *batch, struct raft_entry **entries, unsigned *n, - uint64_t *local_data_size) + uint64_t *local_data_size, + enum raft_uv_format_version format_version) { const void *cursor = batch; size_t i; @@ -391,15 +391,13 @@ int uvDecodeBatchHeader(const void *batch, return 0; } - if (local_data_size != NULL) { -#ifdef DQLITE_NEXT + if (format_version == RAFT_UV_FORMAT_V2) { uint64_t z = byteGet64(&cursor); if (z == 0 || z > sizeof(struct raft_entry_local_data) || z % sizeof(uint64_t) != 0) { rv = RAFT_MALFORMED; goto err; } *local_data_size = z; -#endif } *entries = raft_malloc(*n * sizeof **entries); @@ -456,7 +454,10 @@ static int decodeAppendEntries(const uv_buf_t *buf, args->prev_log_term = byteGet64(&cursor); args->leader_commit = byteGet64(&cursor); - rv = uvDecodeBatchHeader(cursor, &args->entries, &args->n_entries, false); + /* Note: use RAFT_UV_FORMAT_V1 unconditionally because local data doesn't + * appear in the AppendEntries message. */ + rv = uvDecodeBatchHeader(cursor, &args->entries, &args->n_entries, + NULL, RAFT_UV_FORMAT_V1); if (rv != 0) { return rv; } @@ -579,10 +580,13 @@ int uvDecodeEntriesBatch(uint8_t *batch, size_t offset, struct raft_entry *entries, unsigned n, - uint64_t local_data_size) + uint64_t local_data_size, + enum raft_uv_format_version format_version) { uint8_t *cursor; + PRE(ERGO(format_version == RAFT_UV_FORMAT_V1, local_data_size == 0)); + assert(batch != NULL); cursor = batch + offset; @@ -603,10 +607,10 @@ int uvDecodeEntriesBatch(uint8_t *batch, entry->local_data = (struct raft_entry_local_data){}; assert(local_data_size <= sizeof(entry->local_data.buf)); assert(local_data_size % 8 == 0); -#ifdef DQLITE_NEXT - memcpy(entry->local_data.buf, cursor, local_data_size); - cursor += local_data_size; -#endif + if (format_version == RAFT_UV_FORMAT_V2) { + memcpy(entry->local_data.buf, cursor, local_data_size); + cursor += local_data_size; + } } return 0; } diff --git a/src/raft/uv_encoding.h b/src/raft/uv_encoding.h index 851966fb7..a4e2418df 100644 --- a/src/raft/uv_encoding.h +++ b/src/raft/uv_encoding.h @@ -7,13 +7,6 @@ #include "../raft.h" -/* Current disk format version. */ -#ifdef DQLITE_NEXT -#define UV__DISK_FORMAT 2 -#else -#define UV__DISK_FORMAT 1 -#endif - int uvEncodeMessage(const struct raft_message *message, uv_buf_t **bufs, unsigned *n_bufs); @@ -26,13 +19,15 @@ int uvDecodeMessage(uint16_t type, int uvDecodeBatchHeader(const void *batch, struct raft_entry **entries, unsigned *n, - uint64_t *local_data_size); + uint64_t *local_data_size, + enum raft_uv_format_version format_version); int uvDecodeEntriesBatch(uint8_t *batch, size_t offset, struct raft_entry *entries, unsigned n, - uint64_t local_data_size); + uint64_t local_data_size, + enum raft_uv_format_version format_version); /** * The layout of the memory pointed at by a @batch pointer is the following: @@ -57,11 +52,11 @@ int uvDecodeEntriesBatch(uint8_t *batch, * arbitrary lengths, possibly padded with extra bytes to reach 8-byte boundary * (which means that all entry data pointers are 8-byte aligned). */ -size_t uvSizeofBatchHeader(size_t n, bool with_local_data); +size_t uvSizeofBatchHeader(size_t n, enum raft_uv_format_version format_version); void uvEncodeBatchHeader(const struct raft_entry *entries, unsigned n, void *buf, - bool with_local_data); + enum raft_uv_format_version format_version); #endif /* UV_ENCODING_H_ */ diff --git a/src/raft/uv_metadata.c b/src/raft/uv_metadata.c index aee87323f..956cf8f1b 100644 --- a/src/raft/uv_metadata.c +++ b/src/raft/uv_metadata.c @@ -14,7 +14,7 @@ static void uvMetadataEncode(const struct uvMetadata *metadata, void *buf) { void *cursor = buf; - bytePut64(&cursor, UV__DISK_FORMAT); + bytePut64(&cursor, metadata->format_version); bytePut64(&cursor, metadata->version); bytePut64(&cursor, metadata->term); bytePut64(&cursor, metadata->voted_for); @@ -26,21 +26,22 @@ static int uvMetadataDecode(const void *buf, char *errmsg) { const void *cursor = buf; - uint64_t format; - format = byteGet64(&cursor); - if (format != UV__DISK_FORMAT) { - ErrMsgPrintf(errmsg, "bad format version %ju", format); + + uint64_t format_version = byteGet64(&cursor); + if (!(RAFT_UV_FORMAT_V1 <= format_version && + format_version < RAFT_UV_FORMAT_NR)) { + ErrMsgPrintf(errmsg, "bad format version %ju", format_version); return RAFT_MALFORMED; } - metadata->version = byteGet64(&cursor); - metadata->term = byteGet64(&cursor); - metadata->voted_for = byteGet64(&cursor); - - /* Coherence checks that values make sense */ - if (metadata->version == 0) { + metadata->format_version = (enum raft_uv_format_version)format_version; + uint64_t version = byteGet64(&cursor); + if (version == 0) { ErrMsgPrintf(errmsg, "version is set to zero"); return RAFT_CORRUPT; } + metadata->version = version; + metadata->term = byteGet64(&cursor); + metadata->voted_for = byteGet64(&cursor); return 0; } @@ -127,7 +128,9 @@ static int uvMetadataLoadN(const char *dir, return 0; } -int uvMetadataLoad(const char *dir, struct uvMetadata *metadata, char *errmsg) +int uvMetadataLoad(const char *dir, + struct uvMetadata *metadata, + char *errmsg) { struct uvMetadata metadata1; struct uvMetadata metadata2; @@ -146,9 +149,7 @@ int uvMetadataLoad(const char *dir, struct uvMetadata *metadata, char *errmsg) /* Check the versions. */ if (metadata1.version == 0 && metadata2.version == 0) { /* Neither metadata file exists: have a brand new server. */ - metadata->version = 0; - metadata->term = 0; - metadata->voted_for = 0; + *metadata = (struct uvMetadata){}; } else if (metadata1.version == metadata2.version) { /* The two metadata files can't have the same version. */ ErrMsgPrintf(errmsg, @@ -182,7 +183,8 @@ int uvMetadataStore(struct uv *uv, const struct uvMetadata *metadata) unsigned short n; int rv; - assert(metadata->version > 0); + PRE(RAFT_UV_FORMAT_V1 <= metadata->format_version && + metadata->format_version < RAFT_UV_FORMAT_NR); /* Encode the given metadata. */ uvMetadataEncode(metadata, content); diff --git a/src/raft/uv_recv.c b/src/raft/uv_recv.c index 01e4ffc73..ccd53fbeb 100644 --- a/src/raft/uv_recv.c +++ b/src/raft/uv_recv.c @@ -290,12 +290,15 @@ static void uvServerReadCb(uv_stream_t *stream, case RAFT_IO_APPEND_ENTRIES: payload.base = s->payload.base; payload.len = s->payload.len; + /* Note: use RAFT_UV_FORMAT_V1 unconditionally + * because local data doesn't appear in the + * AppendEntries message. */ (void)uvDecodeEntriesBatch( payload.base, 0, s->message.append_entries.entries, s->message.append_entries .n_entries, - false); + 0, RAFT_UV_FORMAT_V1); break; case RAFT_IO_INSTALL_SNAPSHOT: s->message.install_snapshot.data.base = diff --git a/src/raft/uv_segment.c b/src/raft/uv_segment.c index d54841bb2..5c0c9e245 100644 --- a/src/raft/uv_segment.c +++ b/src/raft/uv_segment.c @@ -266,11 +266,11 @@ static int uvLoadEntriesBatch(struct uv *uv, /* Consume the batch header, excluding the first 8 bytes containing the * number of entries, which we have already read. */ - header.len = uvSizeofBatchHeader(n, true); + header.len = uvSizeofBatchHeader(n, uv->format_version); header.base = batch; rv = uvConsumeContent(content, offset, - uvSizeofBatchHeader(n, true) - sizeof(uint64_t), NULL, + uvSizeofBatchHeader(n, uv->format_version) - sizeof(uint64_t), NULL, errmsg); if (rv != 0) { ErrMsgTransfer(errmsg, uv->io->errmsg, "read header"); @@ -289,7 +289,7 @@ static int uvLoadEntriesBatch(struct uv *uv, /* Decode the batch header, allocating the entries array. */ uint64_t local_data_size = 0; - rv = uvDecodeBatchHeader(header.base, entries, n_entries, &local_data_size); + rv = uvDecodeBatchHeader(header.base, entries, n_entries, &local_data_size, uv->format_version); if (rv != 0) { goto err; } @@ -299,9 +299,9 @@ static int uvLoadEntriesBatch(struct uv *uv, data.len = 0; for (i = 0; i < n; i++) { data.len += (*entries)[i].buf.len; -#ifdef DQLITE_NEXT - data.len += sizeof((*entries)[i].local_data); -#endif + if (uv->format_version == RAFT_UV_FORMAT_V2) { + data.len += sizeof((*entries)[i].local_data); + } } data.base = (uint8_t *)content->base + *offset; @@ -324,7 +324,7 @@ static int uvLoadEntriesBatch(struct uv *uv, } rv = uvDecodeEntriesBatch(content->base, *offset - data.len, *entries, - *n_entries, local_data_size); + *n_entries, local_data_size, uv->format_version); if (rv != 0) { goto err_after_header_decode; } @@ -405,7 +405,7 @@ int uvSegmentLoadClosed(struct uv *uv, if (rv != 0) { goto err; } - if (format != UV__DISK_FORMAT) { + if (format != uv->format_version) { ErrMsgPrintf(uv->io->errmsg, "unexpected format version %ju", format); rv = RAFT_CORRUPT; @@ -527,7 +527,7 @@ static int uvSegmentLoadOpen(struct uv *uv, /* Check that the format is the expected one, or perhaps 0, indicating * that the segment was allocated but never written. */ offset = sizeof format; - if (format != UV__DISK_FORMAT) { + if (format != uv->format_version) { if (format == 0) { all_zeros = uvContentHasOnlyTrailingZeros(&buf, offset); if (all_zeros) { @@ -701,8 +701,11 @@ static int uvEnsureSegmentBufferIsLargeEnough(struct uvSegmentBuffer *b, return 0; } -void uvSegmentBufferInit(struct uvSegmentBuffer *b, size_t block_size) +void uvSegmentBufferInit(struct uvSegmentBuffer *b, + enum raft_uv_format_version format_version, + size_t block_size) { + b->format_version = format_version; b->block_size = block_size; b->arena.base = NULL; b->arena.len = 0; @@ -729,13 +732,14 @@ int uvSegmentBufferFormat(struct uvSegmentBuffer *b) } b->n = n; cursor = b->arena.base; - bytePut64(&cursor, UV__DISK_FORMAT); + bytePut64(&cursor, b->format_version); return 0; } int uvSegmentBufferAppend(struct uvSegmentBuffer *b, const struct raft_entry entries[], - unsigned n_entries) + unsigned n_entries, + enum raft_uv_format_version format_version) { size_t size; /* Total size of the batch */ uint32_t crc1; /* Header checksum */ @@ -748,12 +752,12 @@ int uvSegmentBufferAppend(struct uvSegmentBuffer *b, int rv; size = sizeof(uint32_t) * 2; /* CRC checksums */ - size += uvSizeofBatchHeader(n_entries, true); /* Batch header */ + size += uvSizeofBatchHeader(n_entries, format_version); /* Batch header */ for (i = 0; i < n_entries; i++) { /* Entries data */ size += bytePad64(entries[i].buf.len); -#ifdef DQLITE_NEXT - size += sizeof(struct raft_entry_local_data); -#endif + if (format_version == RAFT_UV_FORMAT_V2) { + size += sizeof(struct raft_entry_local_data); + } } rv = uvEnsureSegmentBufferIsLargeEnough(b, b->n + size); @@ -770,9 +774,9 @@ int uvSegmentBufferAppend(struct uvSegmentBuffer *b, /* Batch header */ header = cursor; - uvEncodeBatchHeader(entries, n_entries, cursor, true /* encode local data */); - crc1 = byteCrc32(header, uvSizeofBatchHeader(n_entries, true), 0); - cursor = (uint8_t *)cursor + uvSizeofBatchHeader(n_entries, true); + uvEncodeBatchHeader(entries, n_entries, cursor, format_version); + crc1 = byteCrc32(header, uvSizeofBatchHeader(n_entries, format_version), 0); + cursor = (uint8_t *)cursor + uvSizeofBatchHeader(n_entries, format_version); /* Batch data */ crc2 = 0; @@ -784,12 +788,12 @@ int uvSegmentBufferAppend(struct uvSegmentBuffer *b, cursor = (uint8_t *)cursor + entry->buf.len; static_assert(sizeof(entry->local_data.buf) % sizeof(uint64_t) == 0, "bad size for entry local data"); -#ifdef DQLITE_NEXT - size_t local_data_size = sizeof(entry->local_data.buf); - memcpy(cursor, entry->local_data.buf, local_data_size); - crc2 = byteCrc32(cursor, local_data_size, crc2); - cursor = (uint8_t *)cursor + local_data_size; -#endif + if (format_version == RAFT_UV_FORMAT_V2) { + size_t local_data_size = sizeof(entry->local_data.buf); + memcpy(cursor, entry->local_data.buf, local_data_size); + crc2 = byteCrc32(cursor, local_data_size, crc2); + cursor = (uint8_t *)cursor + local_data_size; + } } bytePut32(&crc1_p, crc1); @@ -1030,12 +1034,12 @@ static int uvWriteClosedSegment(struct uv *uv, * block */ cap = uv->block_size - (sizeof(uint64_t) /* Format version */ + - sizeof(uint64_t) /* Checksums */ + uvSizeofBatchHeader(1, true /* include local bufs */)); + sizeof(uint64_t) /* Checksums */ + uvSizeofBatchHeader(1, uv->format_version)); if (conf->len > cap) { return RAFT_TOOBIG; } - uvSegmentBufferInit(&buf, uv->block_size); + uvSegmentBufferInit(&buf, uv->format_version, uv->block_size); rv = uvSegmentBufferFormat(&buf); if (rv != 0) { @@ -1046,7 +1050,7 @@ static int uvWriteClosedSegment(struct uv *uv, entry.type = RAFT_CHANGE; entry.buf = *conf; - rv = uvSegmentBufferAppend(&buf, &entry, 1); + rv = uvSegmentBufferAppend(&buf, &entry, 1, uv->format_version); if (rv != 0) { uvSegmentBufferClose(&buf); return rv; @@ -1139,14 +1143,14 @@ int uvSegmentTruncate(struct uv *uv, assert(index - segment->first_index < n); m = (unsigned)(index - segment->first_index); - uvSegmentBufferInit(&buf, uv->block_size); + uvSegmentBufferInit(&buf, uv->format_version, uv->block_size); rv = uvSegmentBufferFormat(&buf); if (rv != 0) { goto out_after_buffer_init; } - rv = uvSegmentBufferAppend(&buf, entries, m); + rv = uvSegmentBufferAppend(&buf, entries, m, uv->format_version); if (rv != 0) { goto out_after_buffer_init; } diff --git a/src/raft/uv_snapshot.c b/src/raft/uv_snapshot.c index 343d1ce07..629f647e8 100644 --- a/src/raft/uv_snapshot.c +++ b/src/raft/uv_snapshot.c @@ -249,7 +249,7 @@ static int uvSnapshotLoadMeta(struct uv *uv, } format = byteFlip64(header[0]); - if (format != UV__DISK_FORMAT) { + if (format != uv->format_version) { tracef("load %s: unsupported format %ju", info->filename, format); rv = RAFT_MALFORMED; @@ -676,7 +676,7 @@ int UvSnapshotPut(struct raft_io *io, } cursor = put->meta.header; - bytePut64(&cursor, UV__DISK_FORMAT); + bytePut64(&cursor, uv->format_version); bytePut64(&cursor, 0); bytePut64(&cursor, snapshot->configuration_index); bytePut64(&cursor, put->meta.bufs[1].len); diff --git a/src/server.c b/src/server.c index 7e44b1c49..ace9629ad 100644 --- a/src/server.c +++ b/src/server.c @@ -56,12 +56,14 @@ static void state_cb(struct raft *r, int dqlite__init(struct dqlite_node *d, dqlite_node_id id, const char *address, - const char *dir) + const char *dir, + bool disk_mode) { - int rv; char db_dir_path[1024]; int urandom; ssize_t count; + int rv; + int rv2; d->initialized = false; d->lock_fd = -1; @@ -80,8 +82,14 @@ int dqlite__init(struct dqlite_node *d, "config__init(rv:%d)", rv); goto err; } + d->config.disk = disk_mode; rv = VfsInit(&d->vfs, d->config.name); - sqlite3_vfs_register(&d->vfs, 0); + if (disk_mode) { + rv2 = dqlite_vfs_enable_disk(&d->vfs); + assert(rv2 == 0); + } + rv2 = sqlite3_vfs_register(&d->vfs, 0 /* not default */); + assert(rv2 == SQLITE_OK); if (rv != 0) { goto err_after_config_init; } @@ -94,16 +102,17 @@ int dqlite__init(struct dqlite_node *d, rv = DQLITE_ERROR; goto err_after_vfs_init; } -#ifdef DQLITE_NEXT - rv = pool_init(&d->pool, &d->loop, d->config.pool_thread_count, - POOL_QOS_PRIO_FAIR); - if (rv != 0) { - snprintf(d->errmsg, DQLITE_ERRMSG_BUF_SIZE, "pool_init(): %s", - uv_strerror(rv)); - rv = DQLITE_ERROR; - goto err_after_loop_init; + if (disk_mode) { + rv = pool_init(&d->pool, &d->loop, d->config.pool_thread_count, + POOL_QOS_PRIO_FAIR); + if (rv != 0) { + snprintf(d->errmsg, DQLITE_ERRMSG_BUF_SIZE, "pool_init(): %s", + uv_strerror(rv)); + rv = DQLITE_ERROR; + goto err_after_loop_init; + } } -#endif + rv = raftProxyInit(&d->raft_transport, &d->loop); if (rv != 0) { goto err_after_pool_init; @@ -115,7 +124,16 @@ int dqlite__init(struct dqlite_node *d, rv = DQLITE_ERROR; goto err_after_raft_transport_init; } - rv = fsm__init(&d->raft_fsm, &d->config, &d->registry); +#ifndef USE_SYSTEM_RAFT + raft_uv_set_format_version(&d->raft_io, + disk_mode ? RAFT_UV_FORMAT_V2 : RAFT_UV_FORMAT_V1); +#endif + + if (disk_mode) { + rv = fsm__init_disk(&d->raft_fsm, &d->config, &d->registry); + } else { + rv = fsm__init(&d->raft_fsm, &d->config, &d->registry); + } if (rv != 0) { goto err_after_raft_io_init; } @@ -190,11 +208,11 @@ int dqlite__init(struct dqlite_node *d, err_after_raft_transport_init: raftProxyClose(&d->raft_transport); err_after_pool_init: -#ifdef DQLITE_NEXT - pool_close(&d->pool); - pool_fini(&d->pool); + if (disk_mode) { + pool_close(&d->pool); + pool_fini(&d->pool); + } err_after_loop_init: -#endif uv_loop_close(&d->loop); err_after_vfs_init: VfsClose(&d->vfs); @@ -222,9 +240,9 @@ void dqlite__close(struct dqlite_node *d) // the TODO above referencing the cleanup logic without running the // node. See https://github.com/canonical/dqlite/issues/504. -#ifdef DQLITE_NEXT - pool_fini(&d->pool); -#endif + if (d->config.disk) { + pool_fini(&d->pool); + } uv_loop_close(&d->loop); raftProxyClose(&d->raft_transport); registry__close(&d->registry); @@ -246,7 +264,20 @@ int dqlite_node_create(dqlite_node_id id, return DQLITE_NOMEM; } - return dqlite__init(*t, id, address, data_dir); + return dqlite__init(*t, id, address, data_dir, false); +} + +int dqlite_node_create_v2(dqlite_node_id id, + const char *address, + const char *data_dir, + dqlite_node **t) +{ + *t = sqlite3_malloc(sizeof **t); + if (*t == NULL) { + return DQLITE_NOMEM; + } + + return dqlite__init(*t, id, address, data_dir, true); } int dqlite_node_set_bind_address(dqlite_node *t, const char *address) @@ -443,28 +474,10 @@ int dqlite_node_set_block_size(dqlite_node *n, size_t size) raft_uv_set_block_size(&n->raft_io, size); return 0; } + int dqlite_node_enable_disk_mode(dqlite_node *n) { - int rv; - - if (n->running) { - return DQLITE_MISUSE; - } - - rv = dqlite_vfs_enable_disk(&n->vfs); - if (rv != 0) { - return rv; - } - - n->registry.config->disk = true; - - /* Close the default fsm and initialize the disk one. */ - fsm__close(&n->raft_fsm); - rv = fsm__init_disk(&n->raft_fsm, &n->config, &n->registry); - if (rv != 0) { - return rv; - } - + (void)n; return 0; } @@ -559,9 +572,9 @@ static void stopCb(uv_async_t *stop) tracef("not running or already stopped"); return; } -#ifdef DQLITE_NEXT - pool_close(&d->pool); -#endif + if (d->config.disk) { + pool_close(&d->pool); + } if (d->role_management) { rv = uv_timer_stop(&d->timer); assert(rv == 0); diff --git a/src/server.h b/src/server.h index 443da461b..bae14a667 100644 --- a/src/server.h +++ b/src/server.h @@ -98,7 +98,8 @@ struct dqlite_server { int dqlite__init(struct dqlite_node *d, dqlite_node_id id, const char *address, - const char *dir); + const char *dir, + bool disk_mode); void dqlite__close(struct dqlite_node *d); diff --git a/test/integration/test_fsm.c b/test/integration/test_fsm.c index d735d1c9b..d4e97ff00 100644 --- a/test/integration/test_fsm.c +++ b/test/integration/test_fsm.c @@ -6,6 +6,7 @@ #include "../lib/runner.h" #include "../lib/server.h" #include "../lib/sqlite.h" +#include "../lib/util.h" /****************************************************************************** * @@ -97,11 +98,7 @@ TEST(fsm, snapshotFreshDb, setUp, tearDown, 0, snapshot_params) unsigned n_bufs = 0; int rv; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); rv = fsm->snapshot(fsm, &bufs, &n_bufs); munit_assert_int(rv, ==, 0); @@ -131,11 +128,7 @@ TEST(fsm, snapshotWrittenDb, setUp, tearDown, 0, snapshot_params) uint64_t last_insert_id; uint64_t rows_affected; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); /* Add some data to database */ HANDSHAKE; @@ -173,11 +166,7 @@ TEST(fsm, snapshotHeapFaultSingleDB, setUp, tearDown, 0, snapshot_params) uint64_t last_insert_id; uint64_t rows_affected; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); /* Add some data to database */ HANDSHAKE; @@ -229,11 +218,7 @@ TEST(fsm, uint64_t last_insert_id; uint64_t rows_affected; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); if (!disk_mode) { return MUNIT_SKIP; @@ -276,11 +261,7 @@ TEST(fsm, snapshotHeapFaultTwoDB, setUp, tearDown, 0, snapshot_params) uint64_t last_insert_id; uint64_t rows_affected; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); /* Open 2 databases and add data to them */ HANDSHAKE; @@ -344,12 +325,7 @@ TEST(fsm, snapshotHeapFaultTwoDBAsync, setUp, tearDown, 0, snapshot_params) uint64_t last_insert_id; uint64_t rows_affected; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } - + bool disk_mode = param_get_bool(params, "disk_mode"); if (!disk_mode) { return MUNIT_SKIP; } @@ -414,11 +390,7 @@ TEST(fsm, snapshotNewDbAddedBeforeFinalize, setUp, tearDown, 0, snapshot_params) uint64_t last_insert_id; uint64_t rows_affected; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); /* Add some data to database */ HANDSHAKE; @@ -468,11 +440,7 @@ TEST(fsm, snapshotWritesBeforeFinalize, setUp, tearDown, 0, snapshot_params) char sql[128]; int rv; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); /* Add some data to database */ HANDSHAKE; @@ -523,11 +491,7 @@ TEST(fsm, concurrentSnapshots, setUp, tearDown, 0, snapshot_params) uint64_t rows_affected; int rv; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); /* Add some data to database */ HANDSHAKE; @@ -619,11 +583,7 @@ TEST(fsm, snapshotRestore, setUp, tearDown, 0, restore_params) int rv; char sql[128]; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); /* Add some data to database */ HANDSHAKE; @@ -689,11 +649,7 @@ TEST(fsm, snapshotRestoreMultipleDBs, setUp, tearDown, 0, snapshot_params) char *msg; int rv; - bool disk_mode = false; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + bool disk_mode = param_get_bool(params, "disk_mode"); /* Create 2 databases and add data to them. */ HANDSHAKE; diff --git a/test/integration/test_node.c b/test/integration/test_node.c index 0620fd0f4..bf89ed55a 100644 --- a/test/integration/test_node.c +++ b/test/integration/test_node.c @@ -3,6 +3,7 @@ #include "../lib/runner.h" #include "../lib/server.h" #include "../lib/sqlite.h" +#include "../lib/util.h" #include "../../include/dqlite.h" #include "../../src/protocol.h" @@ -37,20 +38,15 @@ static void *setUp(const MunitParameter params[], void *user_data) f->dir = test_dir_setup(); - rv = dqlite_node_create(1, "1", f->dir, &f->node); + bool disk_mode = param_get_bool(params, "disk_mode"); + + rv = disk_mode ? dqlite_node_create_v2(1, "1", f->dir, &f->node) + : dqlite_node_create(1, "1", f->dir, &f->node); munit_assert_int(rv, ==, 0); rv = dqlite_node_set_bind_address(f->node, "@123"); munit_assert_int(rv, ==, 0); - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - bool disk_mode = (bool)atoi(disk_mode_param); - if (disk_mode) { - rv = dqlite_node_enable_disk_mode(f->node); - munit_assert_int(rv, ==, 0); - } - } return f; } @@ -64,21 +60,15 @@ static void *setUpInet(const MunitParameter params[], void *user_data) f->dir = test_dir_setup(); - rv = dqlite_node_create(1, "1", f->dir, &f->node); + bool disk_mode = param_get_bool(params, "disk_mode"); + + rv = disk_mode ? dqlite_node_create_v2(1, "1", f->dir, &f->node) + : dqlite_node_create(1, "1", f->dir, &f->node); munit_assert_int(rv, ==, 0); rv = dqlite_node_set_bind_address(f->node, "127.0.0.1:9001"); munit_assert_int(rv, ==, 0); - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - bool disk_mode = (bool)atoi(disk_mode_param); - if (disk_mode) { - rv = dqlite_node_enable_disk_mode(f->node); - munit_assert_int(rv, ==, 0); - } - } - return f; } @@ -98,21 +88,15 @@ static void *setUpForRecovery(const MunitParameter params[], void *user_data) startStopNode(f); dqlite_node_destroy(f->node); - rv = dqlite_node_create(1, "1", f->dir, &f->node); + bool disk_mode = param_get_bool(params, "disk_mode"); + + rv = disk_mode ? dqlite_node_create_v2(1, "1", f->dir, &f->node) + : dqlite_node_create(1, "1", f->dir, &f->node); munit_assert_int(rv, ==, 0); rv = dqlite_node_set_bind_address(f->node, "@123"); munit_assert_int(rv, ==, 0); - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - bool disk_mode = (bool)atoi(disk_mode_param); - if (disk_mode) { - rv = dqlite_node_enable_disk_mode(f->node); - munit_assert_int(rv, ==, 0); - } - } - return f; } diff --git a/test/integration/test_vfs.c b/test/integration/test_vfs.c index 3335c228c..c4c41a818 100644 --- a/test/integration/test_vfs.c +++ b/test/integration/test_vfs.c @@ -4,6 +4,7 @@ #include "../lib/heap.h" #include "../lib/runner.h" #include "../lib/sqlite.h" +#include "../lib/util.h" #include "../../include/dqlite.h" #include "../../src/raft.h" @@ -44,15 +45,11 @@ static void *setUp(const MunitParameter params[], void *user_data) sprintf(f->names[i], "%u", i + 1); rv = dqlite_vfs_init(&f->vfs[i], f->names[i]); munit_assert_int(rv, ==, 0); - const char *disk_mode_param = - munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - bool disk_mode = (bool)atoi(disk_mode_param); - if (disk_mode) { - f->dirs[i] = test_dir_setup(); - rv = dqlite_vfs_enable_disk(&f->vfs[i]); - munit_assert_int(rv, ==, 0); - } + bool disk_mode = param_get_bool(params, "disk_mode"); + if (disk_mode) { + f->dirs[i] = test_dir_setup(); + rv = dqlite_vfs_enable_disk(&f->vfs[i]); + munit_assert_int(rv, ==, 0); } rv = sqlite3_vfs_register(&f->vfs[i], 0); munit_assert_int(rv, ==, 0); diff --git a/test/lib/server.c b/test/lib/server.c index 89d4060cb..5abe09289 100644 --- a/test/lib/server.c +++ b/test/lib/server.c @@ -2,7 +2,7 @@ #include "fs.h" #include "server.h" - +#include "util.h" static int endpointConnect(void *data, const char *address, int *fd) { @@ -62,7 +62,10 @@ void test_server_start(struct test_server *s, const MunitParameter params[]) { int rv; - rv = dqlite_node_create(s->id, s->address, s->dir, &s->dqlite); + bool disk_mode = param_get_bool(params, "disk_mode"); + + rv = disk_mode ? dqlite_node_create_v2(s->id, s->address, s->dir, &s->dqlite) + : dqlite_node_create(s->id, s->address, s->dir, &s->dqlite); munit_assert_int(rv, ==, 0); rv = dqlite_node_set_bind_address(s->dqlite, s->address); @@ -93,15 +96,6 @@ void test_server_start(struct test_server *s, const MunitParameter params[]) munit_assert_int(rv, ==, 0); } - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - bool disk_mode = (bool)atoi(disk_mode_param); - if (disk_mode) { - rv = dqlite_node_enable_disk_mode(s->dqlite); - munit_assert_int(rv, ==, 0); - } - } - const char *target_voters_param = munit_parameters_get(params, "target_voters"); if (target_voters_param != NULL) { diff --git a/test/lib/util.h b/test/lib/util.h index f92b957b0..018547378 100644 --- a/test/lib/util.h +++ b/test/lib/util.h @@ -24,4 +24,15 @@ } \ } while (0) +/** + * Parse a numeric test parameter as a boolean. The result is false + * if the parameter is unset or set to "0", and true otherwise. + */ +static inline bool param_get_bool(const MunitParameter *params, + const char *name) +{ + const char *param = munit_parameters_get(params, name); + return param != NULL && (bool)atoi(param); +} + #endif /* TEST_UTIL_H */ diff --git a/test/raft/integration/test_uv_init.c b/test/raft/integration/test_uv_init.c index eae17fb11..1eb92be1f 100644 --- a/test/raft/integration/test_uv_init.c +++ b/test/raft/integration/test_uv_init.c @@ -16,6 +16,13 @@ * *****************************************************************************/ +static char *format_versions[] = {"1", "2", NULL}; + +static MunitParameterEnum format_params[] = { + {"format_version", format_versions}, + {NULL, NULL}, +}; + struct fixture { FIXTURE_UV_DEPS; @@ -41,6 +48,7 @@ static void closeCb(struct raft_io *io) int _rv; \ _rv = raft_uv_init(&f->io, &f->loop, DIR, &f->transport); \ munit_assert_int(_rv, ==, 0); \ + raft_uv_set_format_version(&f->io, f->format_version); \ _rv = f->io.init(&f->io, 1, "1"); \ munit_assert_int(_rv, ==, 0); \ } while (0) @@ -60,6 +68,7 @@ static void closeCb(struct raft_io *io) int _rv; \ _rv = raft_uv_init(&f->io, &f->loop, DIR, &f->transport); \ munit_assert_int(_rv, ==, 0); \ + raft_uv_set_format_version(&f->io, f->format_version); \ _rv = f->io.init(&f->io, 1, "1"); \ munit_assert_int(_rv, ==, RV); \ munit_assert_string_equal(f->io.errmsg, ERRMSG); \ @@ -102,6 +111,8 @@ static void *setUp(const MunitParameter params[], void *user_data) SETUP_UV_DEPS; f->io.data = f; f->closed = false; + const char *format_version = munit_parameters_get(params, "format_version"); + f->format_version = format_version != NULL ? atoi(format_version) : 1; return f; } @@ -228,21 +239,22 @@ TEST(init, metadataOneBadFormat, setUp, tearDown, 0, NULL) { struct fixture *f = data; WRITE_METADATA_FILE(1, /* Metadata file index */ - BAD_FORMAT, /* Format */ + BAD_FORMAT, 1, /* Version */ 1, /* Term */ 0 /* Voted for */); INIT_ERROR(f->dir, RAFT_MALFORMED, - "decode content of metadata1: bad format version " BAD_FORMAT_STR); + "decode content of metadata1: bad format version " + BAD_FORMAT_STR); return MUNIT_OK; } /* The metadata1 file has not a valid version. */ -TEST(init, metadataOneBadVersion, setUp, tearDown, 0, NULL) +TEST(init, metadataOneBadVersion, setUp, tearDown, 0, format_params) { struct fixture *f = data; WRITE_METADATA_FILE(1, /* Metadata file index */ - UV__DISK_FORMAT, /* Format */ + f->format_version, 0, /* Version */ 1, /* Term */ 0 /* Voted for */); @@ -253,16 +265,16 @@ TEST(init, metadataOneBadVersion, setUp, tearDown, 0, NULL) /* The data directory has both metadata files, but they have the same * version. */ -TEST(init, metadataOneAndTwoSameVersion, setUp, tearDown, 0, NULL) +TEST(init, metadataOneAndTwoSameVersion, setUp, tearDown, 0, format_params) { struct fixture *f = data; WRITE_METADATA_FILE(1, /* Metadata file index */ - UV__DISK_FORMAT, /* Format */ + f->format_version, 2, /* Version */ 3, /* Term */ 0 /* Voted for */); WRITE_METADATA_FILE(2, /* Metadata file index */ - UV__DISK_FORMAT, /* Format */ + f->format_version, 2, /* Version */ 2, /* Term */ 0 /* Voted for */); diff --git a/test/raft/integration/test_uv_load.c b/test/raft/integration/test_uv_load.c index 4427e34ac..3be700dd0 100644 --- a/test/raft/integration/test_uv_load.c +++ b/test/raft/integration/test_uv_load.c @@ -2,7 +2,6 @@ #include "../../../src/raft/byte.h" #include "../../../src/raft/uv.h" -#include "../../../src/raft/uv_encoding.h" #include "../lib/runner.h" #include "../lib/uv.h" @@ -12,6 +11,13 @@ * *****************************************************************************/ +static char *format_versions[] = {"1", "2", NULL}; + +static MunitParameterEnum format_params[] = { + {"format_version", format_versions}, + {NULL, NULL}, +}; + struct fixture { FIXTURE_UV_DEPS; @@ -108,6 +114,7 @@ struct snapshot munit_assert_int(_rv, ==, 0); \ raft_uv_set_block_size(&_io, SEGMENT_BLOCK_SIZE); \ raft_uv_set_segment_size(&_io, SEGMENT_SIZE); \ + raft_uv_set_format_version(&_io, f->format_version); \ _rv = _io.load(&_io, &_term, &_voted_for, &_snapshot, &_start_index, \ &_entries, &_n); \ munit_assert_int(_rv, ==, 0); \ @@ -190,6 +197,7 @@ struct snapshot munit_assert_int(_rv, ==, 0); \ raft_uv_set_block_size(&_io, SEGMENT_BLOCK_SIZE); \ raft_uv_set_segment_size(&_io, SEGMENT_SIZE); \ + raft_uv_set_format_version(&_io, f->format_version); \ _rv = _io.load(&_io, &_term, &_voted_for, &_snapshot, &_start_index, \ &_entries, &_n); \ munit_assert_int(_rv, ==, 0); \ @@ -378,6 +386,8 @@ static void *setUp(const MunitParameter params[], void *user_data) { struct fixture *f = munit_malloc(sizeof *f); SETUP_UV_DEPS; + const char *format_version = munit_parameters_get(params, "format_version"); + f->format_version = format_version != NULL ? atoi(format_version) : 1; return f; } @@ -575,27 +585,41 @@ TEST(load, openSegmentWithIncompleteBatch, setUp, tearDown, 0, NULL) /* The data directory has an open segment whose first batch is only * partially written. In that case the segment gets removed. */ -TEST(load, openSegmentWithIncompleteFirstBatch, setUp, tearDown, 0, NULL) +TEST(load, openSegmentWithIncompleteFirstBatch, setUp, tearDown, 0, format_params) { struct fixture *f = data; + + uint8_t v = (uint8_t)f->format_version; uint8_t buf[5 * WORD_SIZE] = { - UV__DISK_FORMAT, 0, 0, 0, 0, 0, 0, 0, /* Format version */ + v, 0, 0, 0, 0, 0, 0, 0, /* Format version */ 0, 0, 0, 0, 0, 0, 0, 0, /* CRC32 checksums */ 0, 0, 0, 0, 0, 0, 0, 0, /* Number of entries */ - 0, 0, 0, 0, 0, 0, 0, 0, /* Local data size */ - 0, 0, 0, 0, 0, 0, 0, 0 /* Batch data */ + 0, 0, 0, 0, 0, 0, 0, 0, /* Batch data (v1) or local data size (v2) */ + 0, 0, 0, 0, 0, 0, 0, 0 /* Batch data (v2) */ }; + size_t buf_len; + switch (v) { + case RAFT_UV_FORMAT_V1: + buf_len = 4 * WORD_SIZE; + break; + case RAFT_UV_FORMAT_V2: + buf_len = 5 * WORD_SIZE; + break; + default: + munit_error("impossible"); + } + APPEND(1, 1); UNFINALIZE(1, 1, 1); - DirOverwriteFile(f->dir, "open-1", buf, sizeof buf, 0); + DirOverwriteFile(f->dir, "open-1", buf, buf_len, 0); - LOAD(0, /* term */ - 0, /* voted for */ - NULL, /* snapshot */ - 1, /* start index */ - 0, /* data for first loaded entry */ - 0 /* n entries */ + LOAD(0, /* term */ + 0, /* voted for */ + NULL, /* snapshot */ + 1, /* start index */ + 0, /* data for first loaded entry */ + 0 /* n entries */ ); return MUNIT_OK; @@ -1621,7 +1645,7 @@ TEST(load, openSegmentWithIncompletePreamble, setUp, tearDown, 0, NULL) } /* The data directory has an open segment which has incomplete batch header. */ -TEST(load, openSegmentWithIncompleteBatchHeader, setUp, tearDown, 0, NULL) +TEST(load, openSegmentWithIncompleteBatchHeader, setUp, tearDown, 0, format_params) { struct fixture *f = data; size_t offset = WORD_SIZE + /* Format version */ @@ -1632,21 +1656,25 @@ TEST(load, openSegmentWithIncompleteBatchHeader, setUp, tearDown, 0, NULL) APPEND(1, 1); UNFINALIZE(1, 1, 1); DirTruncateFile(f->dir, "open-1", offset); -#ifdef DQLITE_NEXT - const char *msg = - "load open segment open-1: entries batch 1 starting at byte 8: " - "read header: short read: 8 bytes instead of 24"; -#else - const char *msg = - "load open segment open-1: entries batch 1 starting at byte 8: " - "read header: short read: 8 bytes instead of 16"; -#endif + const char *msg; + switch (f->format_version) { + case RAFT_UV_FORMAT_V1: + msg = "load open segment open-1: entries batch 1 starting at byte 8: " + "read header: short read: 8 bytes instead of 16"; + break; + case RAFT_UV_FORMAT_V2: + msg = "load open segment open-1: entries batch 1 starting at byte 8: " + "read header: short read: 8 bytes instead of 24"; + break; + default: + munit_error("impossible"); + } LOAD_ERROR(RAFT_IOERR, msg); return MUNIT_OK; } /* The data directory has an open segment which has incomplete batch data. */ -TEST(load, openSegmentWithIncompleteBatchData, setUp, tearDown, 0, NULL) +TEST(load, openSegmentWithIncompleteBatchData, setUp, tearDown, 0, format_params) { struct fixture *f = data; size_t offset = WORD_SIZE + /* Format version */ @@ -1656,23 +1684,27 @@ TEST(load, openSegmentWithIncompleteBatchData, setUp, tearDown, 0, NULL) WORD_SIZE + /* Entry type and data size */ WORD_SIZE / 2 /* Partial entry data */; -#ifdef DQLITE_NEXT - offset += WORD_SIZE; /* Local data size */ -#endif + if (f->format_version == RAFT_UV_FORMAT_V2) { + offset += WORD_SIZE; + } APPEND(1, 1); UNFINALIZE(1, 1, 1); DirTruncateFile(f->dir, "open-1", offset); -#ifdef DQLITE_NEXT - const char *msg = - "load open segment open-1: entries batch 1 starting at byte 8: " - "read data: short read: 4 bytes instead of 24"; -#else - const char *msg = - "load open segment open-1: entries batch 1 starting at byte 8: " - "read data: short read: 4 bytes instead of 8"; -#endif + const char *msg; + switch (f->format_version) { + case RAFT_UV_FORMAT_V1: + msg = "load open segment open-1: entries batch 1 starting at byte 8: " + "read data: short read: 4 bytes instead of 8"; + break; + case RAFT_UV_FORMAT_V2: + msg = "load open segment open-1: entries batch 1 starting at byte 8: " + "read data: short read: 4 bytes instead of 24"; + break; + default: + munit_error("impossible"); + } LOAD_ERROR(RAFT_IOERR, msg); return MUNIT_OK; } diff --git a/test/raft/integration/test_uv_set_term.c b/test/raft/integration/test_uv_set_term.c index 63381a011..910c1f89c 100644 --- a/test/raft/integration/test_uv_set_term.c +++ b/test/raft/integration/test_uv_set_term.c @@ -1,6 +1,5 @@ #include "../../../src/raft.h" #include "../../../src/raft/byte.h" -#include "../../../src/raft/uv_encoding.h" #include "../lib/runner.h" #include "../lib/uv.h" @@ -10,6 +9,13 @@ * *****************************************************************************/ +static char *format_versions[] = {"1", "2", NULL}; + +static MunitParameterEnum format_params[] = { + {"format_version", format_versions}, + {NULL, NULL}, +}; + struct fixture { FIXTURE_UV_DEPS; @@ -35,6 +41,7 @@ static void closeCb(struct raft_io *io) int _rv; \ _rv = raft_uv_init(&f->io, &f->loop, f->dir, &f->transport); \ munit_assert_int(_rv, ==, 0); \ + raft_uv_set_format_version(&f->io, f->format_version); \ _rv = f->io.init(&f->io, 1, "1"); \ munit_assert_int(_rv, ==, 0); \ } while (0) @@ -89,7 +96,7 @@ static void closeCb(struct raft_io *io) char filename[strlen("metadataN") + 1]; \ sprintf(filename, "metadata%d", N); \ DirReadFile(f->dir, filename, buf2, sizeof buf2); \ - munit_assert_int(byteGet64(&cursor), ==, UV__DISK_FORMAT); \ + munit_assert_int(byteGet64(&cursor), ==, f->format_version); \ munit_assert_int(byteGet64(&cursor), ==, VERSION); \ munit_assert_int(byteGet64(&cursor), ==, TERM); \ munit_assert_int(byteGet64(&cursor), ==, VOTED_FOR); \ @@ -107,6 +114,8 @@ static void *setUpDeps(const MunitParameter params[], void *user_data) SETUP_UV_DEPS; f->io.data = f; f->closed = false; + const char *format_version = munit_parameters_get(params, "format_version"); + f->format_version = format_version != NULL ? atoi(format_version) : 1; return f; } @@ -181,11 +190,11 @@ TEST(set_term, fourth, setUp, tearDown, 0, NULL) /* If the data directory has a single metadata1 file, the first time set_data() * is called, the second metadata file gets created. */ -TEST(set_term, metadataOneExists, setUpDeps, tearDown, 0, NULL) +TEST(set_term, metadataOneExists, setUpDeps, tearDown, 0, format_params) { struct fixture *f = data; WRITE_METADATA_FILE(1, /* Metadata file index */ - UV__DISK_FORMAT, /* Format */ + f->format_version, 1, /* Version */ 1, /* Term */ 0 /* Voted for */); @@ -197,16 +206,16 @@ TEST(set_term, metadataOneExists, setUpDeps, tearDown, 0, NULL) } /* The data directory has both metadata files, but metadata1 is greater. */ -TEST(set_term, metadataOneIsGreater, setUpDeps, tearDown, 0, NULL) +TEST(set_term, metadataOneIsGreater, setUpDeps, tearDown, 0, format_params) { struct fixture *f = data; WRITE_METADATA_FILE(1, /* Metadata file index */ - UV__DISK_FORMAT, /* Format */ + f->format_version, 3, /* Version */ 3, /* Term */ 0 /* Voted for */); WRITE_METADATA_FILE(2, /* Metadata file index */ - UV__DISK_FORMAT, /* Format */ + f->format_version, 2, /* Version */ 2, /* Term */ 0 /* Voted for */); @@ -220,16 +229,16 @@ TEST(set_term, metadataOneIsGreater, setUpDeps, tearDown, 0, NULL) } /* The data directory has both metadata files, but metadata2 is greater. */ -TEST(set_term, metadataTwoIsGreater, setUpDeps, tearDown, 0, NULL) +TEST(set_term, metadataTwoIsGreater, setUpDeps, tearDown, 0, format_params) { struct fixture *f = data; WRITE_METADATA_FILE(1, /* Metadata file index */ - UV__DISK_FORMAT, /* Format */ + f->format_version, 1, /* Version */ 1, /* Term */ 0 /* Voted for */); WRITE_METADATA_FILE(2, /* Metadata file index */ - UV__DISK_FORMAT, /* Format */ + f->format_version, 2, /* Version */ 2, /* Term */ 0 /* Voted for */); diff --git a/test/raft/lib/uv.h b/test/raft/lib/uv.h index 7fdcdd08b..6f0a640f5 100644 --- a/test/raft/lib/uv.h +++ b/test/raft/lib/uv.h @@ -34,13 +34,18 @@ TEAR_DOWN_HEAP; \ TEAR_DOWN_DIR -#define FIXTURE_UV struct raft_io io +#define FIXTURE_UV \ + struct raft_io io; \ + int format_version #define SETUP_UV \ do { \ int rv_; \ rv_ = raft_uv_init(&f->io, &f->loop, f->dir, &f->transport); \ munit_assert_int(rv_, ==, 0); \ + const char *format_version_ = munit_parameters_get(params, "format_version"); \ + f->format_version = format_version_ != NULL ? atoi(format_version_) : 1; \ + raft_uv_set_format_version(&f->io, f->format_version); \ raft_uv_set_auto_recovery(&f->io, false); \ rv_ = f->io.init(&f->io, 1, "127.0.0.1:9001"); \ munit_assert_int(rv_, ==, 0); \ diff --git a/test/unit/test_vfs.c b/test/unit/test_vfs.c index cfd103d6d..fdfacf322 100644 --- a/test/unit/test_vfs.c +++ b/test/unit/test_vfs.c @@ -5,11 +5,11 @@ #include "../../include/dqlite.h" -#include "../lib/config.h" #include "../lib/fs.h" #include "../lib/heap.h" #include "../lib/runner.h" #include "../lib/sqlite.h" +#include "../lib/util.h" #include "../../src/format.h" #include "../../src/raft.h" @@ -56,7 +56,7 @@ static void setPageSizeDisk(const MunitParameter params[], int rv) { int rc; - bool disk_mode = false; + bool disk_mode; char page_sz[32]; rc = snprintf(page_sz, sizeof(page_sz), "%u", page_size); munit_assert_int(rc, >, 0); @@ -69,10 +69,7 @@ static void setPageSizeDisk(const MunitParameter params[], "", }; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + disk_mode = param_get_bool(params, "disk_mode"); if (disk_mode) { rc = f->pMethods->xFileControl(f, SQLITE_FCNTL_PRAGMA, fnctl); munit_assert_int(rc, ==, rv); @@ -83,17 +80,14 @@ static void *setUp(const MunitParameter params[], void *user_data) { struct fixture *f = munit_malloc(sizeof *f); int rv; - bool disk_mode = false; + bool disk_mode; SETUP_HEAP; SETUP_SQLITE; rv = VfsInit(&f->vfs, "dqlite"); munit_assert_int(rv, ==, 0); f->dir = NULL; - const char *disk_mode_param = munit_parameters_get(params, "disk_mode"); - if (disk_mode_param != NULL) { - disk_mode = (bool)atoi(disk_mode_param); - } + disk_mode = param_get_bool(params, "disk_mode"); if (disk_mode) { rv = VfsEnableDisk(&f->vfs); munit_assert_int(rv, ==, 0);