Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an API for querying the last raft log entry #683

Merged
merged 10 commits into from
Aug 26, 2024
23 changes: 23 additions & 0 deletions include/dqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,29 @@ DQLITE_API int dqlite_node_recover_ext(dqlite_node *n,
dqlite_node_info_ext infos[],
int n_info);

/**
 * Retrieve information about the last persisted raft log entry.
 *
 * This is intended to be used in combination with dqlite_node_recover_ext, to
 * determine which of the surviving nodes in a cluster is most up-to-date. The
 * raft rules for this are:
 *
 * - If the two logs have last entries with different terms, the log with the
 *   higher term is more up-to-date.
 * - Otherwise, the longer log is more up-to-date.
 *
 * Note that this function may result in physically modifying the raft-related
 * files in the data directory. These modifications do not affect the logical
 * state of the node. Deletion of invalid segment files can be disabled with
 * dqlite_node_set_auto_recovery.
 *
 * This should be called after dqlite_node_init, but the node must not be
 * running.
 *
 * @param n                 The initialized, non-running node to inspect.
 * @param last_entry_index  Output: index of the last persisted entry.
 * @param last_entry_term   Output: term of the last persisted entry.
 *
 * @return 0 on success, or DQLITE_ERROR if the persisted state could not be
 * loaded.
 */
DQLITE_API int dqlite_node_describe_last_entry(dqlite_node *n,
uint64_t *last_entry_index,
uint64_t *last_entry_term);

/**
* Return a human-readable description of the last error that occurred.
*/
Expand Down
22 changes: 22 additions & 0 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,28 @@ RAFT_API int raft_bootstrap(struct raft *r,
RAFT_API int raft_recover(struct raft *r,
const struct raft_configuration *conf);

/**
 * Read information about the last raft log entry that's stored on disk.
 *
 * "Last log entry" here should be understood as including snapshots,
 * so if there is one snapshot on disk and no individual entries, the
 * values returned in `index` and `term` are the index and term of the
 * last entry included in the snapshot. If there are no snapshot and no
 * entries, then `index` and `term` are both set to 0.
 *
 * This function is just a wrapper around the `load` method of raft_io.
 * Note that the `load` method of the uv raft_io implementation is not
 * read-only: as it walks the segment files on disk, it closes open
 * segments that contain valid entries and deletes other open segments.
 *
 * This should be called after the raft_io instance is initialized (e.g.
 * after calling raft_uv_init), but no active raft node should be using
 * the instance.
 *
 * Returns 0 on success. If the `load` method fails, its non-zero return
 * code is passed through and `index` and `term` are left untouched.
 *
 * NOTE(review): unlike the neighbouring declarations, this one is not
 * marked RAFT_API -- confirm that this is intentional.
 */
int raft_io_describe_last_entry(struct raft_io *io,
raft_index *index,
raft_term *term);

RAFT_API int raft_start(struct raft *r);

/**
Expand Down
30 changes: 30 additions & 0 deletions src/raft/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
#include "configuration.h"
#include "convert.h"
#include "election.h"
#include "entry.h"
#include "err.h"
#include "flags.h"
#include "heap.h"
#include "log.h"
#include "membership.h"
#include "snapshot.h"

#define DEFAULT_ELECTION_TIMEOUT 1000 /* One second */
#define DEFAULT_HEARTBEAT_TIMEOUT 100 /* One tenth of a second */
Expand Down Expand Up @@ -303,3 +305,31 @@

return 0;
}

/**
 * Report the index and term of the last raft log entry stored on disk,
 * counting a snapshot as part of the log.
 *
 * The persisted state is read through the `load` method of @io. On success
 * the loaded snapshot (if any) and entries are released again before
 * returning, so the caller owns nothing afterwards.
 *
 * @param io     The raft_io instance to load from.
 * @param index  Output: index of the last entry (only written on success).
 * @param term   Output: term of the last entry (only written on success).
 *
 * @return 0 on success, otherwise the non-zero code returned by `load`.
 */
int raft_io_describe_last_entry(struct raft_io *io,
				raft_index *index,
				raft_term *term)
{
	/* current_term and voted_for are required out-parameters of `load`;
	 * their values are not needed here. */
	raft_term current_term;
	raft_id voted_for;
	struct raft_snapshot *snapshot;
	raft_index start_index;
	struct raft_entry *entries;
	size_t n_entries;
	int rv;

	rv = io->load(io, &current_term, &voted_for, &snapshot, &start_index,
		      &entries, &n_entries);
	if (rv != 0) {
		return rv;
	}

	/* NOTE(review): this presumes `load` reports start_index as the index
	 * of the first loaded entry (one past the snapshot's last index when
	 * there are no entries, 1 when the store is empty) -- confirm against
	 * the raft_io load contract. */
	*index = start_index + n_entries - 1;

	/* Prefer the term of the newest individual entry, fall back to the
	 * snapshot's term, and use 0 when nothing at all is stored. */
	if (n_entries > 0) {
		*term = entries[n_entries - 1].term;
	} else if (snapshot != NULL) {
		*term = snapshot->term;
	} else {
		*term = 0;
	}

	if (snapshot != NULL) {
		snapshotDestroy(snapshot);
	}
	entryBatchesDestroy(entries, n_entries);
	return 0;
}
17 changes: 17 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,23 @@ int dqlite_node_recover_ext(dqlite_node *n,
return rv;
}

/* Public entry point for querying the last persisted raft log entry of a
 * node. The node must be initialized and not running (checked with PRE).
 *
 * The caller's uint64_t outputs are handed straight to the raft layer; the
 * static assertions make sure they have the same size as the raft types they
 * are reinterpreted as.
 *
 * Returns 0 on success and DQLITE_ERROR on any failure reported by
 * raft_io_describe_last_entry. */
int dqlite_node_describe_last_entry(dqlite_node *n,
				    uint64_t *index,
				    uint64_t *term)
{
	PRE(n->initialized && !n->running);
	static_assert(sizeof(*index) == sizeof(raft_index),
		      "unexpected index type size");
	static_assert(sizeof(*term) == sizeof(raft_term),
		      "unexpected term type size");

	if (raft_io_describe_last_entry(&n->raft_io, (raft_index *)index,
					(raft_term *)term) != 0) {
		return DQLITE_ERROR;
	}
	return 0;
}

dqlite_node_id dqlite_generate_node_id(const char *address)
{
tracef("generate node id");
Expand Down
87 changes: 79 additions & 8 deletions test/integration/test_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,16 @@ TEST(cluster, restart, setUp, tearDown, 0, cluster_params)
struct rows rows;
long n_records =
strtol(munit_parameters_get(params, "num_records"), NULL, 0);
char sql[128];

HANDSHAKE;
OPEN;
PREPARE("CREATE TABLE test (n INT)", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

PREPARE("INSERT INTO TEST(n) VALUES(?)", &stmt_id);
for (int i = 0; i < n_records; ++i) {
sprintf(sql, "INSERT INTO test(n) VALUES(%d)", i + 1);
PREPARE(sql, &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);
EXEC_PARAMS(stmt_id, &last_insert_id, &rows_affected,
{.type = SQLITE_INTEGER, .integer = i});
cole-miller marked this conversation as resolved.
Show resolved Hide resolved
}

struct test_server *server = &f->servers[0];
Expand All @@ -133,19 +132,19 @@ TEST(cluster, dataOnNewNode, setUp, tearDown, 0, cluster_params)
struct rows rows;
long n_records =
strtol(munit_parameters_get(params, "num_records"), NULL, 0);
char sql[128];
unsigned id = 2;
const char *address = "@2";
int rv;

HANDSHAKE;
OPEN;
PREPARE("CREATE TABLE test (n INT)", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

PREPARE("INSERT INTO test(n) VALUES(?)", &stmt_id);
for (int i = 0; i < n_records; ++i) {
sprintf(sql, "INSERT INTO test(n) VALUES(%d)", i + 1);
PREPARE(sql, &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);
EXEC_PARAMS(stmt_id, &last_insert_id, &rows_affected,
{.type = SQLITE_INTEGER, .integer = i});
}

/* Add a second voting server, this one will receive all data from the
Expand All @@ -158,6 +157,27 @@ TEST(cluster, dataOnNewNode, setUp, tearDown, 0, cluster_params)
REMOVE(1);
sleep(1);

struct test_server *first = &f->servers[0];
test_server_stop(first);
test_server_prepare(first, params);
/* One entry per INSERT, plus one for the initial configuration, plus
* one for the CREATE TABLE, plus one legacy checkpoint command entry
* after 993 records or two after 2200 records. */
size_t extra = n_records >= 2200 ? 4 :
n_records >= 993 ? 3 :
2;
uint64_t last_entry_index;
uint64_t last_entry_term;
rv = dqlite_node_describe_last_entry(first->dqlite,
&last_entry_index,
&last_entry_term);
munit_assert_int(rv, ==, 0);
/* This assertion is not tight because the leader also generates
* a nondeterministic number of barrier entries. */
munit_assert_ullong(last_entry_index, >=, n_records + extra);
munit_assert_ullong(last_entry_term, ==, 1);
test_server_run(first);

/* The full table is visible from the new node */
SELECT(2);
HANDSHAKE;
Expand All @@ -166,6 +186,22 @@ TEST(cluster, dataOnNewNode, setUp, tearDown, 0, cluster_params)
QUERY(stmt_id, &rows);
munit_assert_long(rows.next->values->integer, ==, n_records);
clientCloseRows(&rows);

/* One more entry on the new node. */
PREPARE("INSERT INTO test(n) VALUES(?)", &stmt_id);
EXEC_PARAMS(stmt_id, &last_insert_id, &rows_affected,
{.type = SQLITE_INTEGER, .integer = 5000});

struct test_server *second = &f->servers[1];
test_server_stop(second);
test_server_prepare(second, params);
rv = dqlite_node_describe_last_entry(second->dqlite,
&last_entry_index,
&last_entry_term);
munit_assert_int(rv, ==, 0);
munit_assert_ullong(last_entry_index, >=, n_records + extra + 1);
munit_assert_ullong(last_entry_term, ==, 1);
test_server_run(second);
return MUNIT_OK;
}

Expand Down Expand Up @@ -288,3 +324,38 @@ TEST(cluster, modifyingQuerySql, setUp, tearDown, 0, cluster_params)
clientCloseRows(&rows);
return MUNIT_OK;
}

/* Edge cases for dqlite_node_describe_last_entry: a node whose log holds only
 * the bootstrap configuration, and a node whose log is completely empty. */
TEST(cluster, last_entry_edge_cases, setUp, tearDown, 0, NULL)
{
	struct fixture *f = data;
	struct test_server *bootstrapped = &f->servers[0];
	struct test_server *unjoined = &f->servers[1];
	uint64_t index;
	uint64_t term;
	int rv;

	/* NOTE(review): presumably gives the freshly started servers time to
	 * settle before they are stopped -- confirm. */
	sleep(1);

	/* The API requires a node that is initialized but not running, so
	 * each server is stopped and prepared again before the query, and
	 * only set running afterwards. */
	test_server_stop(bootstrapped);
	test_server_prepare(bootstrapped, params);
	rv = dqlite_node_describe_last_entry(bootstrapped->dqlite, &index,
					     &term);
	munit_assert_int(rv, ==, 0);
	/* The log contains only the bootstrap configuration. */
	munit_assert_ullong(index, ==, 1);
	/* The bootstrap configuration is always tagged with term 1. */
	munit_assert_ullong(term, ==, 1);
	test_server_run(bootstrapped);

	test_server_stop(unjoined);
	test_server_prepare(unjoined, params);
	rv = dqlite_node_describe_last_entry(unjoined->dqlite, &index, &term);
	munit_assert_int(rv, ==, 0);
	/* This server didn't bootstrap and hasn't joined the leader, so its
	 * log is empty. */
	munit_assert_ullong(index, ==, 0);
	munit_assert_ullong(term, ==, 0);
	test_server_run(unjoined);

	return MUNIT_OK;
}
12 changes: 12 additions & 0 deletions test/lib/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@
munit_assert_int(rv_, ==, 0); \
}

/* Execute the prepared statement STMT_ID with bound parameters and receive
 * its result.
 *
 * The variadic arguments are the initializers of a `struct value` array
 * (e.g. {.type = SQLITE_INTEGER, .integer = 1}); the array and its length are
 * passed to clientSendExec, and the result is then read with clientRecvResult
 * into LAST_INSERT_ID and ROWS_AFFECTED. Both calls are asserted to return 0.
 *
 * Requires a variable `f` with a `client` member to be in scope. */
#define EXEC_PARAMS(STMT_ID, LAST_INSERT_ID, ROWS_AFFECTED, ...) \
{ \
int rv_; \
struct value vals_[] = {__VA_ARGS__}; \
size_t len_ = sizeof(vals_) / sizeof(vals_[0]); \
rv_ = clientSendExec(f->client, STMT_ID, vals_, len_, NULL); \
munit_assert_int(rv_, ==, 0); \
rv_ = clientRecvResult(f->client, LAST_INSERT_ID, \
ROWS_AFFECTED, NULL); \
munit_assert_int(rv_, ==, 0); \
}

#define EXEC_SQL(SQL, LAST_INSERT_ID, ROWS_AFFECTED) \
{ \
int rv_; \
Expand Down
13 changes: 12 additions & 1 deletion test/lib/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void test_server_tear_down(struct test_server *s)
test_dir_tear_down(s->dir);
}

void test_server_start(struct test_server *s, const MunitParameter params[])
void test_server_prepare(struct test_server *s, const MunitParameter params[])
{
int rv;

Expand Down Expand Up @@ -128,13 +128,24 @@ void test_server_start(struct test_server *s, const MunitParameter params[])
munit_assert_int(rv, ==, 0);
}
}
}

/* Start the dqlite node of a test server that has already been set up, then
 * call test_server_client_connect for the server's own client. The test is
 * failed if the node does not start. */
void test_server_run(struct test_server *s)
{
	int rv = dqlite_node_start(s->dqlite);

	munit_assert_int(rv, ==, 0);
	test_server_client_connect(s, &s->client);
}

/* Set up the test server and immediately run it: test_server_prepare followed
 * by test_server_run. Kept as a single call for tests that have nothing to do
 * between the two steps. */
void test_server_start(struct test_server *s, const MunitParameter params[])
{
test_server_prepare(s, params);
test_server_run(s);
}

struct client_proto *test_server_client(struct test_server *s)
{
return &s->client;
Expand Down
8 changes: 7 additions & 1 deletion test/lib/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ void test_server_setup(struct test_server *s,
/* Cleanup the test server. */
void test_server_tear_down(struct test_server *s);

/* Start the test server. */
/* Set up the test server without running it. */
void test_server_prepare(struct test_server *s, const MunitParameter params[]);

/* Run the test server after setting it up. */
void test_server_run(struct test_server *s);

/* Start the test server. Equivalent to test_server_prepare + test_server_run. */
void test_server_start(struct test_server *s, const MunitParameter params[]);

/* Stop the test server. */
Expand Down
Loading