diff --git a/include/dqlite.h b/include/dqlite.h
index f064f4836..d0dbe153f 100644
--- a/include/dqlite.h
+++ b/include/dqlite.h
@@ -586,6 +586,34 @@ DQLITE_API int dqlite_node_recover_ext(dqlite_node *n,
 				       dqlite_node_info_ext infos[],
 				       int n_info);
 
+/**
+ * Retrieve information about the last persisted raft log entry.
+ *
+ * This is intended to be used in combination with dqlite_node_recover_ext, to
+ * determine which of the surviving nodes in a cluster is most up-to-date. The
+ * raft rules for this are:
+ *
+ * - If the two logs have last entries with different terms, the log with the
+ *   higher term is more up-to-date.
+ * - Otherwise, the longer log is more up-to-date.
+ *
+ * Note that this function may result in physically modifying the raft-related
+ * files in the data directory. These modifications do not affect the logical
+ * state of the node. Deletion of invalid segment files can be disabled with
+ * dqlite_node_set_auto_recovery.
+ *
+ * This should be called after dqlite_node_init, but the node must not be
+ * running.
+ *
+ * On success, 0 is returned and the index and term of the last entry are
+ * stored in last_entry_index and last_entry_term. On failure, DQLITE_ERROR
+ * is returned and the output parameters are left untouched. This function
+ * always fails when dqlite was built with USE_SYSTEM_RAFT.
+ */
+DQLITE_API int dqlite_node_describe_last_entry(dqlite_node *n,
+					       uint64_t *last_entry_index,
+					       uint64_t *last_entry_term);
+
 /**
  * Return a human-readable description of the last error occurred.
  */
diff --git a/src/raft.h b/src/raft.h
index fce3f5518..76bea3500 100644
--- a/src/raft.h
+++ b/src/raft.h
@@ -1061,6 +1061,31 @@ RAFT_API int raft_bootstrap(struct raft *r,
 RAFT_API int raft_recover(struct raft *r,
 			  const struct raft_configuration *conf);
 
+/**
+ * Read information about the last raft log entry that's stored on disk.
+ *
+ * "Last log entry" here should be understood as including snapshots,
+ * so if there is one snapshot on disk and no individual entries, the
+ * values returned in `index` and `term` are the index and term of the
+ * last entry included in the snapshot. If there is no snapshot and no
+ * entries, then `index` and `term` are both set to 0.
+ *
+ * This function is just a wrapper around the `load` method of raft_io.
+ * Note that the `load` method of the uv raft_io implementation is not
+ * read-only: as it walks the segment files on disk, it closes open
+ * segments that contain valid entries and deletes other open segments.
+ *
+ * This should be called after the raft_io instance is initialized (e.g.
+ * after calling raft_uv_init), but no active raft node should be using
+ * the instance.
+ *
+ * Returns 0 on success. If `load` fails, its error code is returned and
+ * `index` and `term` are left untouched.
+ */
+int raft_io_describe_last_entry(struct raft_io *io,
+				raft_index *index,
+				raft_term *term);
+
 RAFT_API int raft_start(struct raft *r);
 
 /**
diff --git a/src/raft/raft.c b/src/raft/raft.c
index ce3996b90..e98927b4e 100644
--- a/src/raft/raft.c
+++ b/src/raft/raft.c
@@ -10,11 +10,13 @@
 #include "configuration.h"
 #include "convert.h"
 #include "election.h"
+#include "entry.h"
 #include "err.h"
 #include "flags.h"
 #include "heap.h"
 #include "log.h"
 #include "membership.h"
+#include "snapshot.h"
 
 #define DEFAULT_ELECTION_TIMEOUT 1000 /* One second */
 #define DEFAULT_HEARTBEAT_TIMEOUT 100 /* One tenth of a second */
@@ -303,3 +305,35 @@ static int ioFsmVersionCheck(struct raft *r,
 
 	return 0;
 }
+
+int raft_io_describe_last_entry(struct raft_io *io,
+				raft_index *index,
+				raft_term *term)
+{
+	raft_term current_term;
+	raft_id voted_for;
+	struct raft_snapshot *snapshot;
+	raft_index start_index;
+	struct raft_entry *entries;
+	size_t n_entries;
+	int rv;
+
+	rv = io->load(io, &current_term, &voted_for, &snapshot,
+		      &start_index, &entries, &n_entries);
+	if (rv != 0) {
+		return rv;
+	}
+	/* Entries are consecutive starting at start_index; with no entries
+	 * this yields start_index - 1. */
+	*index = start_index + n_entries - 1;
+	/* Prefer the term of the last entry, fall back to the snapshot's
+	 * term, and report 0 when neither exists. */
+	*term = n_entries > 0 ? entries[n_entries - 1].term :
+		snapshot != NULL ? snapshot->term :
+				   0;
+	if (snapshot != NULL) {
+		snapshotDestroy(snapshot);
+	}
+	entryBatchesDestroy(entries, n_entries);
+	return 0;
+}
diff --git a/src/server.c b/src/server.c
index 7e44b1c49..6a29e80c3 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1065,6 +1065,40 @@ int dqlite_node_recover_ext(dqlite_node *n,
 	return rv;
 }
 
+int dqlite_node_describe_last_entry(dqlite_node *n,
+				    uint64_t *index,
+				    uint64_t *term)
+{
+	PRE(n->initialized && !n->running);
+	/* Results are copied out by value below rather than written through
+	 * cast pointers: equal size does not make raft_index/raft_term
+	 * compatible with uint64_t for aliasing purposes. */
+	static_assert(sizeof(*index) == sizeof(raft_index),
+		      "unexpected index type size");
+	static_assert(sizeof(*term) == sizeof(raft_term),
+		      "unexpected term type size");
+
+#ifdef USE_SYSTEM_RAFT
+	/* Unsupported in USE_SYSTEM_RAFT builds. */
+	(void)index;
+	(void)term;
+	return DQLITE_ERROR;
+#else
+	raft_index last_index;
+	raft_term last_term;
+	int rv;
+
+	rv = raft_io_describe_last_entry(&n->raft_io, &last_index,
+					 &last_term);
+	if (rv != 0) {
+		return DQLITE_ERROR;
+	}
+	*index = last_index;
+	*term = last_term;
+	return 0;
+#endif
+}
+
 dqlite_node_id dqlite_generate_node_id(const char *address)
 {
 	tracef("generate node id");
diff --git a/test/integration/test_cluster.c b/test/integration/test_cluster.c
index 57f605cff..131be5132 100644
--- a/test/integration/test_cluster.c
+++ b/test/integration/test_cluster.c
@@ -97,17 +97,16 @@ TEST(cluster, restart, setUp, tearDown, 0, cluster_params)
 	struct rows rows;
 	long n_records =
 	    strtol(munit_parameters_get(params, "num_records"), NULL, 0);
-	char sql[128];
 
 	HANDSHAKE;
 	OPEN;
 	PREPARE("CREATE TABLE test (n INT)", &stmt_id);
 	EXEC(stmt_id, &last_insert_id, &rows_affected);
 
+	PREPARE("INSERT INTO TEST(n) VALUES(?)", &stmt_id);
 	for (int i = 0; i < n_records; ++i) {
-		sprintf(sql, "INSERT INTO test(n) VALUES(%d)", i + 1);
-		PREPARE(sql, &stmt_id);
-		EXEC(stmt_id, &last_insert_id, &rows_affected);
+		EXEC_PARAMS(stmt_id, &last_insert_id, &rows_affected,
+			    {.type = SQLITE_INTEGER, .integer = i});
 	}
 
 	struct test_server *server = &f->servers[0];
@@ -133,7 +132,6 @@ TEST(cluster, dataOnNewNode, setUp, tearDown, 0, cluster_params)
 	struct rows rows;
 	long n_records =
 	    strtol(munit_parameters_get(params, "num_records"), NULL, 0);
-	char sql[128];
 	unsigned id = 2;
 	const char *address = "@2";
 
@@ -142,10 +140,10 @@ TEST(cluster, dataOnNewNode, setUp, tearDown, 0, cluster_params)
 	PREPARE("CREATE TABLE test (n INT)", &stmt_id);
 	EXEC(stmt_id, &last_insert_id, &rows_affected);
 
+	PREPARE("INSERT INTO test(n) VALUES(?)", &stmt_id);
 	for (int i = 0; i < n_records; ++i) {
-		sprintf(sql, "INSERT INTO test(n) VALUES(%d)", i + 1);
-		PREPARE(sql, &stmt_id);
-		EXEC(stmt_id, &last_insert_id, &rows_affected);
+		EXEC_PARAMS(stmt_id, &last_insert_id, &rows_affected,
+			    {.type = SQLITE_INTEGER, .integer = i});
 	}
 
 	/* Add a second voting server, this one will receive all data from the
@@ -158,6 +156,32 @@ TEST(cluster, dataOnNewNode, setUp, tearDown, 0, cluster_params)
 	REMOVE(1);
 	sleep(1);
 
+	struct test_server *first = &f->servers[0];
+	test_server_stop(first);
+	test_server_prepare(first, params);
+#ifndef USE_SYSTEM_RAFT
+	int rv;
+	/* One entry per INSERT, plus one for the initial configuration, plus
+	 * one for the CREATE TABLE, plus one legacy checkpoint command entry
+	 * after 993 records or two after 2200 records. */
+	uint64_t expected_entries = n_records + (n_records >= 2200 ? 4 :
+						 n_records >= 993 ? 3 :
+						 2);
+	/* We also expect a variable number of barrier entries. Just specify an
+	 * upper bound since we don't know the exact count. */
+	uint64_t max_barriers = 10;
+	uint64_t last_entry_index;
+	uint64_t last_entry_term;
+	rv = dqlite_node_describe_last_entry(first->dqlite,
+					     &last_entry_index,
+					     &last_entry_term);
+	munit_assert_int(rv, ==, 0);
+	munit_assert_uint64(expected_entries, <=, last_entry_index);
+	munit_assert_uint64(last_entry_index, <, expected_entries + max_barriers);
+	munit_assert_uint64(last_entry_term, ==, 1);
+#endif
+	test_server_run(first);
+
 	/* The full table is visible from the new node */
 	SELECT(2);
 	HANDSHAKE;
@@ -166,6 +190,25 @@ TEST(cluster, dataOnNewNode, setUp, tearDown, 0, cluster_params)
 	QUERY(stmt_id, &rows);
 	munit_assert_long(rows.next->values->integer, ==, n_records);
 	clientCloseRows(&rows);
+
+	/* One more entry on the new node. */
+	PREPARE("INSERT INTO test(n) VALUES(?)", &stmt_id);
+	EXEC_PARAMS(stmt_id, &last_insert_id, &rows_affected,
+		    {.type = SQLITE_INTEGER, .integer = 5000});
+
+	struct test_server *second = &f->servers[1];
+	test_server_stop(second);
+	test_server_prepare(second, params);
+#ifndef USE_SYSTEM_RAFT
+	rv = dqlite_node_describe_last_entry(second->dqlite,
+					     &last_entry_index,
+					     &last_entry_term);
+	munit_assert_int(rv, ==, 0);
+	munit_assert_uint64(expected_entries + 1, <=, last_entry_index);
+	munit_assert_uint64(last_entry_index, <, expected_entries + max_barriers + 1);
+	munit_assert_uint64(last_entry_term, ==, 1);
+#endif
+	test_server_run(second);
 	return MUNIT_OK;
 }
 
@@ -288,3 +331,42 @@ TEST(cluster, modifyingQuerySql, setUp, tearDown, 0, cluster_params)
 	clientCloseRows(&rows);
 	return MUNIT_OK;
 }
+
+#ifndef USE_SYSTEM_RAFT
+
+/* Edge cases for dqlite_node_describe_last_entry. */
+TEST(cluster, last_entry_edge_cases, setUp, tearDown, 0, NULL)
+{
+	struct fixture *f = data;
+	uint64_t index;
+	uint64_t term;
+	int rv;
+
+	sleep(1);
+
+	struct test_server *first = &f->servers[0];
+	test_server_stop(first);
+	test_server_prepare(first, params);
+	rv = dqlite_node_describe_last_entry(first->dqlite, &index, &term);
+	munit_assert_int(rv, ==, 0);
+	/* The log contains only the bootstrap configuration. */
+	munit_assert_uint64(index, ==, 1);
+	/* The bootstrap configuration is always tagged with term 1. */
+	munit_assert_uint64(term, ==, 1);
+	test_server_run(first);
+
+	struct test_server *second = &f->servers[1];
+	test_server_stop(second);
+	test_server_prepare(second, params);
+	rv = dqlite_node_describe_last_entry(second->dqlite, &index, &term);
+	munit_assert_int(rv, ==, 0);
+	/* We didn't bootstrap and haven't joined the leader, so our log is
+	 * empty. */
+	munit_assert_uint64(index, ==, 0);
+	munit_assert_uint64(term, ==, 0);
+	test_server_run(second);
+
+	return MUNIT_OK;
+}
+
+#endif
diff --git a/test/lib/client.h b/test/lib/client.h
index daab3affa..a42bd8ed6 100644
--- a/test/lib/client.h
+++ b/test/lib/client.h
@@ -140,6 +140,18 @@
 		munit_assert_int(rv_, ==, 0);                                  \
 	}
 
+#define EXEC_PARAMS(STMT_ID, LAST_INSERT_ID, ROWS_AFFECTED, ...)               \
+	{                                                                      \
+		int rv_;                                                       \
+		struct value vals_[] = {__VA_ARGS__};                          \
+		size_t len_ = sizeof(vals_) / sizeof(vals_[0]);                \
+		rv_ = clientSendExec(f->client, STMT_ID, vals_, len_, NULL);   \
+		munit_assert_int(rv_, ==, 0);                                  \
+		rv_ = clientRecvResult(f->client, LAST_INSERT_ID,              \
+				       ROWS_AFFECTED, NULL);                   \
+		munit_assert_int(rv_, ==, 0);                                  \
+	}
+
 #define EXEC_SQL(SQL, LAST_INSERT_ID, ROWS_AFFECTED)                           \
 	{                                                                      \
 		int rv_;                                                       \
diff --git a/test/lib/server.c b/test/lib/server.c
index 89d4060cb..1bfdb6b98 100644
--- a/test/lib/server.c
+++ b/test/lib/server.c
@@ -58,7 +58,7 @@ void test_server_tear_down(struct test_server *s)
 	test_dir_tear_down(s->dir);
 }
 
-void test_server_start(struct test_server *s, const MunitParameter params[])
+void test_server_prepare(struct test_server *s, const MunitParameter params[])
 {
 	int rv;
 
@@ -128,6 +128,11 @@ void test_server_start(struct test_server *s, const MunitParameter params[])
 			munit_assert_int(rv, ==, 0);
 		}
 	}
+}
+
+void test_server_run(struct test_server *s)
+{
+	int rv;
 
 	rv = dqlite_node_start(s->dqlite);
 	munit_assert_int(rv, ==, 0);
@@ -135,6 +140,12 @@ void test_server_start(struct test_server *s, const MunitParameter params[])
 	test_server_client_connect(s, &s->client);
 }
 
+void test_server_start(struct test_server *s, const MunitParameter params[])
+{
+	test_server_prepare(s, params);
+	test_server_run(s);
+}
+
 struct client_proto *test_server_client(struct test_server *s)
 {
 	return &s->client;
diff --git a/test/lib/server.h b/test/lib/server.h
index 7e0dc111b..8b559108e 100644
--- a/test/lib/server.h
+++ b/test/lib/server.h
@@ -35,7 +35,13 @@ void test_server_setup(struct test_server *s,
 /* Cleanup the test server. */
 void test_server_tear_down(struct test_server *s);
 
-/* Start the test server. */
+/* Set up the test server without running it. */
+void test_server_prepare(struct test_server *s, const MunitParameter params[]);
+
+/* Run the test server after setting it up. */
+void test_server_run(struct test_server *s);
+
+/* Start the test server. Equivalent to test_server_prepare + test_server_run. */
 void test_server_start(struct test_server *s, const MunitParameter params[]);
 
 /* Stop the test server. */