Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove DQLITE_NEXT guard and unify with disk mode #671

Closed
wants to merge 9 commits into from
5 changes: 1 addition & 4 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ jobs:
compiler:
- gcc
- clang
dqlite-next:
- yes
- no
runs-on: ${{ matrix.os }}

steps:
Expand All @@ -35,7 +32,7 @@ jobs:
run: |
autoreconf -i
./configure --enable-debug --enable-code-coverage --enable-sanitize \
--enable-build-raft --enable-dqlite-next=${{ matrix.dqlite-next }}
--enable-build-raft
make -j4 unit-test integration-test \
raft-core-fuzzy-test \
raft-core-integration-test \
Expand Down
4 changes: 0 additions & 4 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ AM_CFLAGS += $(CODE_COVERAGE_CFLAGS)
AM_CFLAGS += $(SQLITE_CFLAGS) $(UV_CFLAGS) $(PTHREAD_CFLAGS)
AM_LDFLAGS = $(UV_LIBS) $(PTHREAD_LIBS)

if DQLITE_NEXT_ENABLED
AM_CFLAGS += -DDQLITE_NEXT
endif

if !BUILD_RAFT_ENABLED
AM_CFLAGS += $(RAFT_CFLAGS) -DUSE_SYSTEM_RAFT
AM_LDFLAGS += $(RAFT_LIBS)
Expand Down
4 changes: 0 additions & 4 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ AM_CONDITIONAL(BUILD_SQLITE_ENABLED, test "x$enable_build_sqlite" = "xyes")
AC_ARG_ENABLE(build-raft, AS_HELP_STRING([--enable-build-raft[=ARG]], [use the bundled raft sources instead of linking to libraft [default=no]]))
AM_CONDITIONAL(BUILD_RAFT_ENABLED, test "x$enable_build_raft" = "xyes")

AC_ARG_ENABLE(dqlite-next, AS_HELP_STRING([--enable-dqlite-next[=ARG]], [build with the experimental dqlite backend [default=no]]))
AM_CONDITIONAL(DQLITE_NEXT_ENABLED, test "x$enable_dqlite_next" = "xyes")
AS_IF([test "x$enable_build_raft" != "xyes" -a "x$enable_dqlite_next" = "xyes"], [AC_MSG_ERROR([dqlite-next requires bundled raft])], [])

# Whether to enable code coverage.
AX_CODE_COVERAGE

Expand Down
35 changes: 29 additions & 6 deletions include/dqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,33 @@ DQLITE_API int dqlite_node_create(dqlite_node_id id,
const char *data_dir,
dqlite_node **n);

/**
* Flags for dqlite_node_create_v2.
*/
enum {
/* Request that the node store its SQLite database and WAL files on
* disk instead of in memory.
*
* This is an incomplete, experimental feature. A node that uses this
* mode cannot use an existing data directory that was created by a
* node using the default, in-memory mode, and vice versa. A node that
* uses this mode is not guaranteed to be able to interoperate with
* nodes using the in-memory mode. */
DQLITE_NODE_CREATE_DISKMODE = 1 << 0,
};

/**
* Create a new dqlite node object, accepting additional options.
*
* This is an experimental API.
*/
DQLITE_API DQLITE_EXPERIMENTAL
int dqlite_node_create_v2(dqlite_node_id id,
const char *addr,
const char *data_dir,
int flags,
dqlite_node **t);

/**
* Destroy a dqlite node object.
*
Expand Down Expand Up @@ -400,12 +427,8 @@ DQLITE_API int dqlite_node_set_snapshot_params(dqlite_node *n,
DQLITE_API int dqlite_node_set_block_size(dqlite_node *n, size_t size);

/**
* WARNING: This is an experimental API.
*
* By default dqlite holds the SQLite database file and WAL in memory. By
* enabling disk-mode, dqlite will hold the SQLite database file on-disk while
* keeping the WAL in memory. Has to be called after `dqlite_node_create` and
* before `dqlite_node_start`.
* This function was formerly used to request that database and WAL files be
* stored on disk, but is now a no-op. It is deprecated.
*/
DQLITE_API int dqlite_node_enable_disk_mode(dqlite_node *n);
cole-miller marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
24 changes: 0 additions & 24 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -554,39 +554,15 @@ static void query_batch_async(struct handle *req, enum pool_half half)
}
}

#ifdef DQLITE_NEXT

static void qb_top(pool_work_t *w)
{
struct handle *req = CONTAINER_OF(w, struct handle, work);
query_batch_async(req, POOL_TOP_HALF);
}

static void qb_bottom(pool_work_t *w)
{
struct handle *req = CONTAINER_OF(w, struct handle, work);
query_batch_async(req, POOL_BOTTOM_HALF);
}

#endif

static void query_batch(struct gateway *g)
{
struct handle *req = g->req;
assert(req != NULL);
g->req = NULL;
req->gw = g;

#ifdef DQLITE_NEXT
struct dqlite_node *node = g->raft->data;
pool_t *pool = !!(pool_ut_fallback()->flags & POOL_FOR_UT)
? pool_ut_fallback() : &node->pool;
pool_queue_work(pool, &req->work, g->leader->db->cookie,
WT_UNORD, qb_top, qb_bottom);
#else
query_batch_async(req, POOL_TOP_HALF);
query_batch_async(req, POOL_BOTTOM_HALF);
#endif
}

static void query_barrier_cb(struct barrier *barrier, int status)
Expand Down
24 changes: 0 additions & 24 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,22 +417,6 @@ static void leaderExecV2(struct exec *req, enum pool_half half)
leaderExecDone(l->exec);
}

#ifdef DQLITE_NEXT

static void exec_top(pool_work_t *w)
{
struct exec *req = CONTAINER_OF(w, struct exec, work);
leaderExecV2(req, POOL_TOP_HALF);
}

static void exec_bottom(pool_work_t *w)
{
struct exec *req = CONTAINER_OF(w, struct exec, work);
leaderExecV2(req, POOL_BOTTOM_HALF);
}

#endif

static void execBarrierCb(struct barrier *barrier, int status)
{
tracef("exec barrier cb status %d", status);
Expand All @@ -445,16 +429,8 @@ static void execBarrierCb(struct barrier *barrier, int status)
return;
}

#ifdef DQLITE_NEXT
struct dqlite_node *node = l->raft->data;
pool_t *pool = !!(pool_ut_fallback()->flags & POOL_FOR_UT)
? pool_ut_fallback() : &node->pool;
pool_queue_work(pool, &req->work, l->db->cookie,
WT_UNORD, exec_top, exec_bottom);
#else
leaderExecV2(req, POOL_TOP_HALF);
leaderExecV2(req, POOL_BOTTOM_HALF);
#endif
}

int leader__exec(struct leader *l,
Expand Down
23 changes: 23 additions & 0 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,29 @@ RAFT_API void raft_uv_set_tracer(struct raft_io *io,
*/
RAFT_API void raft_uv_set_auto_recovery(struct raft_io *io, bool flag);

/**
* Values for the format version number that describes the segment and snapshot
* files.
*/
enum raft_uv_format_version {
RAFT_UV_FORMAT_V1 = 1, /* Original */
RAFT_UV_FORMAT_V2, /* Added local_data fields */
RAFT_UV_FORMAT_NR
};

/**
* Select which disk format version should be used.
*
* If this is called after the `init` callback from the raft_io vtable,
* and some metadata files were found on disk, the @version is compared
* to the format version found in the metadata files, and an error is
* returned if they don't match.
*
* RAFT_UV_FORMAT_V1 is used by default.
cole-miller marked this conversation as resolved.
Show resolved Hide resolved
*/
RAFT_API void raft_uv_set_format_version(struct raft_io *io,
enum raft_uv_format_version version);

/**
* Callback invoked by the transport implementation when a new incoming
* connection has been established.
Expand Down
17 changes: 17 additions & 0 deletions src/raft/uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@
if (rv != 0) {
return rv;
}
if (metadata.format_version != 0 &&
metadata.format_version != uv->format_version) {
ErrMsgPrintf(io->errmsg, "decode metadata: bad format version %u",

Check warning on line 117 in src/raft/uv.c

View check run for this annotation

Codecov / codecov/patch

src/raft/uv.c#L117

Added line #L117 was not covered by tests
metadata.format_version);
return RAFT_MALFORMED;

Check warning on line 119 in src/raft/uv.c

View check run for this annotation

Codecov / codecov/patch

src/raft/uv.c#L119

Added line #L119 was not covered by tests
}
uv->metadata = metadata;

rv = uv->transport->init(uv->transport, id, address);
Expand Down Expand Up @@ -512,6 +518,7 @@
struct uv *uv;
int rv;
uv = io->impl;
uv->metadata.format_version = uv->format_version;
uv->metadata.version++;
uv->metadata.term = term;
uv->metadata.voted_for = 0;
Expand All @@ -528,6 +535,7 @@
struct uv *uv;
int rv;
uv = io->impl;
uv->metadata.format_version = uv->format_version;
uv->metadata.version++;
uv->metadata.voted_for = server_id;
rv = uvMetadataStore(uv, &uv->metadata);
Expand Down Expand Up @@ -723,6 +731,7 @@
uv->closing = false;
uv->close_cb = NULL;
uv->auto_recovery = true;
uv->format_version = RAFT_UV_FORMAT_V1;

uvSeedRand(uv);

Expand Down Expand Up @@ -812,4 +821,12 @@
uv->auto_recovery = flag;
}

void raft_uv_set_format_version(struct raft_io *io,
enum raft_uv_format_version version)
{
PRE(RAFT_UV_FORMAT_V1 <= version && version < RAFT_UV_FORMAT_NR);
struct uv *uv = io->impl;
uv->format_version = version;
}

#undef tracef
10 changes: 8 additions & 2 deletions src/raft/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ typedef unsigned long long uvCounter;
/* Information persisted in a single metadata file. */
struct uvMetadata
{
enum raft_uv_format_version format_version;
unsigned long long version; /* Monotonically increasing version */
raft_term term; /* Current term */
raft_id voted_for; /* Server ID of last vote, or 0 */
Expand Down Expand Up @@ -95,6 +96,7 @@ struct uv
bool closing; /* True if we are closing */
raft_io_close_cb close_cb; /* Invoked when finishing closing */
bool auto_recovery; /* Try to recover from corrupt segments */
enum raft_uv_format_version format_version;
};

/* Implementation of raft_io->truncate. */
Expand Down Expand Up @@ -175,13 +177,16 @@ int uvSegmentLoadAll(struct uv *uv,
* The memory is aligned at disk block boundary, to allow for direct I/O. */
struct uvSegmentBuffer
{
enum raft_uv_format_version format_version;
size_t block_size; /* Disk block size for direct I/O */
uv_buf_t arena; /* Previously allocated memory that can be re-used */
size_t n; /* Write offset */
};

/* Initialize an empty buffer. */
void uvSegmentBufferInit(struct uvSegmentBuffer *b, size_t block_size);
void uvSegmentBufferInit(struct uvSegmentBuffer *b,
enum raft_uv_format_version format_version,
size_t block_size);

/* Release all memory used by the buffer. */
void uvSegmentBufferClose(struct uvSegmentBuffer *b);
Expand All @@ -196,7 +201,8 @@ int uvSegmentBufferFormat(struct uvSegmentBuffer *b);
* will be appended. */
int uvSegmentBufferAppend(struct uvSegmentBuffer *b,
const struct raft_entry entries[],
unsigned n_entries);
unsigned n_entries,
enum raft_uv_format_version format_version);

/* After all entries to write have been encoded, finalize the buffer by zeroing
* the unused memory of the last block. The out parameter will point to the
Expand Down
11 changes: 6 additions & 5 deletions src/raft/uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ static int uvAliveSegmentEncodeEntriesToWriteBuf(struct uvAliveSegment *segment,
}

rv = uvSegmentBufferAppend(&segment->pending, append->entries,
append->n);
append->n, segment->uv->format_version);
if (rv != 0) {
return rv;
}
Expand Down Expand Up @@ -512,7 +512,7 @@ static void uvAliveSegmentInit(struct uvAliveSegment *s, struct uv *uv)
s->last_index = 0;
s->size = sizeof(uint64_t) /* Format version */;
s->next_block = 0;
uvSegmentBufferInit(&s->pending, uv->block_size);
uvSegmentBufferInit(&s->pending, uv->format_version, uv->block_size);
s->written = 0;
s->barrier = NULL;
s->finalize = false;
Expand Down Expand Up @@ -593,11 +593,12 @@ static void uvAliveSegmentReserveSegmentCapacity(struct uvAliveSegment *s,

/* Return the number of bytes needed to store the batch of entries of this
* append request on disk. */
static size_t uvAppendSize(struct uvAppend *a)
static size_t uvAppendSize(struct uvAppend *a,
enum raft_uv_format_version format_version)
{
size_t size = sizeof(uint32_t) * 2; /* CRC checksums */
unsigned i;
size += uvSizeofBatchHeader(a->n, true); /* Batch header */
size += uvSizeofBatchHeader(a->n, format_version); /* Batch header */
for (i = 0; i < a->n; i++) { /* Entries data */
size += bytePad64(a->entries[i].buf.len);
}
Expand All @@ -618,7 +619,7 @@ static int uvAppendEnqueueRequest(struct uv *uv, struct uvAppend *append)
assert(uv->append_next_index > 0);
tracef("enqueue %u entries", append->n);

size = uvAppendSize(append);
size = uvAppendSize(append, uv->format_version);

/* If we have no segments yet, it means this is the very first append,
* and we need to add a new segment. Otherwise we check if the last
Expand Down
Loading
Loading