Skip to content

Commit

Permalink
Merge pull request #720 from Netflix/dev
Browse files Browse the repository at this point in the history
[Beta] Add DC_EACH_SAFE_QUORUM consistency level
  • Loading branch information
smukil authored Oct 11, 2019
2 parents ae86260 + d5c1032 commit 46a9b33
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 28 deletions.
143 changes: 136 additions & 7 deletions src/dyn_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
#include "dyn_util.h"

static rstatus_t msg_quorum_rsp_handler(struct context *ctx, struct msg *req, struct msg *rsp);
static rstatus_t msg_each_quorum_rsp_handler(struct context *ctx, struct msg *req,
struct msg *rsp);
static msg_response_handler_t msg_get_rsp_handler(struct context *ctx, struct msg *req);

static rstatus_t rewrite_query_if_necessary(struct msg **req,
Expand Down Expand Up @@ -653,8 +655,10 @@ rstatus_t req_forward_to_peer(struct context *ctx, struct conn *c_conn,
}

if (!(same_dc && same_rack) || force_swallow) {
// Swallow responses from remote racks or DCs.
rack_msg->swallow = true;
if (req->consistency != DC_EACH_SAFE_QUORUM || force_swallow) {
// Swallow responses from remote racks or DCs.
rack_msg->swallow = true;
}
}

// Get a connection to the node.
Expand Down Expand Up @@ -701,13 +705,20 @@ rstatus_t req_forward_to_peer(struct context *ctx, struct conn *c_conn,
return status;
}

void req_forward_all_local_racks(struct context *ctx, struct conn *c_conn,
void req_forward_all_racks_for_dc(struct context *ctx, struct conn *c_conn,
struct msg *req, struct mbuf *orig_mbuf,
uint8_t *key, uint32_t keylen,
struct datacenter *dc) {
uint8_t rack_cnt = (uint8_t)array_n(&dc->racks);
uint8_t rack_index;
init_response_mgr(&req->rspmgr, req, req->is_read, rack_cnt, c_conn);

if (req->rspmgrs_inited == false) {
if (req->consistency == DC_EACH_SAFE_QUORUM) {
init_response_mgr_all_dcs(ctx, req, c_conn, dc);
} else {
init_response_mgr(req, &req->rspmgr, rack_cnt, c_conn);
}
}
log_info("%s %s same DC racks:%d expect replies %d", print_obj(c_conn),
print_obj(req), rack_cnt, req->rspmgr.max_responses);

Expand All @@ -732,6 +743,10 @@ static bool request_send_to_all_dcs(struct msg *req) {
// There is a routing override
if (req->msg_routing != ROUTING_NORMAL) return false;

// Under DC_EACH_SAFE_QUORUM, we need to send reads and writes to all
// DCs.
if (req->consistency == DC_EACH_SAFE_QUORUM) return true;

// Reads are not propagated
if (req->is_read) return false;

Expand All @@ -753,8 +768,11 @@ static bool request_send_to_all_local_racks(struct msg *req) {
// A write should go to all racks
if (!req->is_read) return true;

if ((req->consistency == DC_QUORUM) || (req->consistency == DC_SAFE_QUORUM))
if ((req->consistency == DC_QUORUM)
|| (req->consistency == DC_SAFE_QUORUM)
|| (req->consistency == DC_EACH_SAFE_QUORUM)) {
return true;
}
return false;
}

Expand Down Expand Up @@ -794,6 +812,17 @@ static void req_forward_remote_dc(struct context *ctx, struct conn *c_conn,
const uint32_t rack_cnt = array_n(&dc->racks);
if (rack_cnt == 0) return;

if (req->consistency == DC_EACH_SAFE_QUORUM) {
// Under 'DC_EACH_SAFE_QUORUM', we want to hear back from at least
// quorum racks in each DC, so send it to all racks in remote DCs.
req_forward_all_racks_for_dc(ctx, c_conn, req, orig_mbuf, key, keylen, dc);
return;
}

// If we're not expecting a consistency level of 'DC_EACH_SAFE_QUORUM', then
// we send it to only to the preselected rack in the remote DCs. If that's not
// reachable, we failover to another in the remote DC.

// Pick the preferred pre-selected rack for this DC.
struct rack *rack = dc->preselected_rack_for_replication;
if (rack == NULL) rack = array_get(&dc->racks, 0);
Expand Down Expand Up @@ -846,7 +875,7 @@ static void req_forward_local_dc(struct context *ctx, struct conn *c_conn,
req->rsp_handler = msg_get_rsp_handler(ctx, req);
if (request_send_to_all_local_racks(req)) {
// send request to all local racks
req_forward_all_local_racks(ctx, c_conn, req, orig_mbuf, key, keylen, dc);
req_forward_all_racks_for_dc(ctx, c_conn, req, orig_mbuf, key, keylen, dc);
} else {
// send request to only local token owner
struct rack *rack =
Expand Down Expand Up @@ -1002,7 +1031,7 @@ rstatus_t rewrite_query_if_necessary(struct msg **req, struct context *ctx) {
* the new query and free up the original msg.
*
*/
rstatus_t rewrite_query_with_timestamp_md(struct msg **req, struct context *ctx) {
static rstatus_t rewrite_query_with_timestamp_md(struct msg **req, struct context *ctx) {

if (is_read_repairs_enabled() == false) return DN_OK;

Expand Down Expand Up @@ -1099,6 +1128,8 @@ static msg_response_handler_t msg_get_rsp_handler(struct context *ctx, struct ms
// Check if its quorum
if ((req->consistency == DC_QUORUM) || (req->consistency == DC_SAFE_QUORUM)) {
return msg_quorum_rsp_handler;
} else if (req->consistency == DC_EACH_SAFE_QUORUM) {
return msg_each_quorum_rsp_handler;
}
}

Expand Down Expand Up @@ -1149,6 +1180,104 @@ static rstatus_t msg_quorum_rsp_handler(struct context *ctx, struct msg *req,
return DN_OK;
}

static int find_rspmgr_idx(struct context *ctx, struct response_mgr **rspmgrs,
struct string *target_dc_name) {
int num_dcs = (int) array_n(&ctx->pool.datacenters);

int i = 0;
for (i = 0; i < num_dcs; ++i) {
struct response_mgr *rspmgr = rspmgrs[i];
if (string_compare(&rspmgr->dc_name, target_dc_name) == 0) {
return i;
}
}
return -1;
}

static bool all_rspmgrs_done(struct context *ctx, struct response_mgr **rspmgrs) {
int num_dcs = (int) array_n(&ctx->pool.datacenters);
int i = 0;
for (i = 0; i < num_dcs; ++i) {
struct response_mgr *rspmgr = rspmgrs[i];
if (!rspmgr->done) return false;
}

return true;
}

static struct msg *all_rspmgrs_get_response(struct context *ctx, struct msg *req) {
int num_dcs = (int) array_n(&ctx->pool.datacenters);
struct msg *rsp = NULL;
int i;
for (i = 0; i < num_dcs; ++i) {
struct response_mgr *rspmgr = req->additional_each_rspmgrs[i];
struct msg *dc_rsp = NULL;
if (!rsp) {
rsp = rspmgr_get_response(ctx, rspmgr);
ASSERT(rsp);
} else if (rsp->is_error) {
// If any of the DCs errored out, we just clean up responses from the
// remaining DCs.
rspmgr_free_other_responses(rspmgr, NULL);
continue;
} else {
ASSERT(rsp->is_error == false);
// If the DCs we've processed so far have not seen errors, we need to
// make sure that the remaining DCs don't have errors too.
dc_rsp = rspmgr_get_response(ctx, rspmgr);
ASSERT(dc_rsp);
if (dc_rsp->is_error) {
rsp_put(rsp);
rsp = dc_rsp;
} else {
// If it's not an error, clear all responses from this DC.
rspmgr_free_other_responses(rspmgr, NULL);
continue;
}
}

rspmgr_free_other_responses(rspmgr, rsp);
rsp->peer = req;
req->selected_rsp = rsp;
req->error_code = rsp->error_code;
req->is_error = rsp->is_error;
req->dyn_error_code = rsp->dyn_error_code;

}

return rsp;
}

static rstatus_t msg_each_quorum_rsp_handler(struct context *ctx, struct msg *req,
struct msg *rsp) {

if (all_rspmgrs_done(ctx, req->additional_each_rspmgrs)) return swallow_extra_rsp(req, rsp);

int rspmgr_idx = -1;
struct conn *rsp_conn = rsp->owner;
if (rsp_conn == NULL) {
rspmgr_idx = 0;
} else if (rsp_conn->type == CONN_DNODE_PEER_SERVER) {
struct node *peer_instance = (struct node*) rsp_conn->owner;
struct string *peer_dc_name = &peer_instance->dc;
rspmgr_idx = find_rspmgr_idx(ctx, req->additional_each_rspmgrs, peer_dc_name);
if (rspmgr_idx == -1) {
log_error("Could not find which DC response was from");
}
} else if (rsp_conn->type == CONN_SERVER) {
// If this is a 'CONN_SERVER' connection, then it is from the same DC.
rspmgr_idx = 0;
}

struct response_mgr *rspmgr = req->additional_each_rspmgrs[rspmgr_idx];
rspmgr_submit_response(rspmgr, rsp);
if (!rspmgr_check_is_done(rspmgr)) return DN_EAGAIN;
if (!all_rspmgrs_done(ctx, req->additional_each_rspmgrs)) return DN_EAGAIN;

rsp = all_rspmgrs_get_response(ctx, req);
return DN_OK;
}

static void req_client_enqueue_omsgq(struct context *ctx, struct conn *conn,
struct msg *req) {
ASSERT(req->is_request);
Expand Down
8 changes: 6 additions & 2 deletions src/dyn_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -2010,10 +2010,12 @@ static rstatus_t conf_validate_pool(struct conf *cf, struct conf_pool *cp) {
g_read_consistency = DC_SAFE_QUORUM;
else if (!dn_strcasecmp(cp->read_consistency.data, CONF_STR_DC_QUORUM))
g_read_consistency = DC_QUORUM;
else if (!dn_strcasecmp(cp->read_consistency.data, CONF_STR_DC_EACH_SAFE_QUORUM))
g_read_consistency = DC_EACH_SAFE_QUORUM;
else {
log_error(
"conf: directive \"read_consistency:\"must be one of 'DC_ONE' "
"'DC_QUORUM' 'DC_SAFE_QUORUM'");
"'DC_QUORUM' 'DC_SAFE_QUORUM' 'DC_EACH_SAFE_QUORUM'");
return DN_ERROR;
}

Expand All @@ -2023,10 +2025,12 @@ static rstatus_t conf_validate_pool(struct conf *cf, struct conf_pool *cp) {
g_write_consistency = DC_SAFE_QUORUM;
else if (!dn_strcasecmp(cp->write_consistency.data, CONF_STR_DC_QUORUM))
g_write_consistency = DC_QUORUM;
else if (!dn_strcasecmp(cp->write_consistency.data, CONF_STR_DC_EACH_SAFE_QUORUM))
g_write_consistency = DC_EACH_SAFE_QUORUM;
else {
log_error(
"conf: directive \"write_consistency:\"must be one of 'DC_ONE' "
"'DC_QUORUM' 'DC_SAFE_QUORUM'");
"'DC_QUORUM' 'DC_SAFE_QUORUM' 'DC_EACH_SAFE_QUORUM'");
return DN_ERROR;
}

Expand Down
1 change: 1 addition & 0 deletions src/dyn_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#define CONF_STR_DC_ONE "dc_one"
#define CONF_STR_DC_QUORUM "dc_quorum"
#define CONF_STR_DC_SAFE_QUORUM "dc_safe_quorum"
#define CONF_STR_DC_EACH_SAFE_QUORUM "dc_each_safe_quorum"

#define UNSET_NUM 0

Expand Down
3 changes: 2 additions & 1 deletion src/dyn_dnode_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ static void dnode_req_forward(struct context *ctx, struct conn *conn,
// racks
struct mbuf *orig_mbuf = STAILQ_FIRST(&req->mhdr);
struct datacenter *dc = server_get_dc(pool, &pool->dc);
req_forward_all_local_racks(ctx, conn, req, orig_mbuf, key, keylen, dc);

req_forward_all_racks_for_dc(ctx, conn, req, orig_mbuf, key, keylen, dc);
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/dyn_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ static struct msg *_msg_get(struct conn *conn, bool request,
msg->orig_msg = NULL;
msg->needs_repair = false;
msg->rewrite_with_ts_possible = true;
msg->additional_each_rspmgrs = NULL;
msg->rspmgrs_inited = false;

return msg;
}
Expand Down Expand Up @@ -641,6 +643,23 @@ void msg_put(struct msg *msg) {
msg_put(msg->orig_msg);
msg->orig_msg = NULL;
}

if (msg->additional_each_rspmgrs) {
ASSERT(msg->consistency == DC_EACH_SAFE_QUORUM);
// Only requests have their connection's owner as the 'struct server_pool' object,
// and only requests would have 'additional_each_rspmgrs', so it's safe to cast to
// 'struct server_pool'.
struct server_pool *sp = msg->owner->owner;
uint8_t num_dcs = array_n(&sp->datacenters);

int i;
// Skip the 0th index as that points back to the statically allocated 'rspmgr' struct
// in 'msg'.
for (i = 1; i < num_dcs; ++i) {
dn_free(msg->additional_each_rspmgrs[i]);
}
dn_free(msg->additional_each_rspmgrs);
}
TAILQ_INSERT_HEAD(&free_msgq, msg, m_tqe);
}

Expand Down
16 changes: 14 additions & 2 deletions src/dyn_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ typedef enum consistency {
DC_ONE = 0,
DC_QUORUM,
DC_SAFE_QUORUM,
DC_EACH_SAFE_QUORUM,
} consistency_t;

static inline char *get_consistency_string(consistency_t cons) {
Expand All @@ -343,6 +344,8 @@ static inline char *get_consistency_string(consistency_t cons) {
return "DC_QUORUM";
case DC_SAFE_QUORUM:
return "DC_SAFE_QUORUM";
case DC_EACH_SAFE_QUORUM:
return "DC_EACH_SAFE_QUORUM";
}
return "INVALID CONSISTENCY";
}
Expand Down Expand Up @@ -430,7 +433,7 @@ struct msg {
or remote region or cross rack */
usec_t request_send_time; /* when message was sent: either to the data store
or remote region or cross rack */
uint8_t awaiting_rsps;
uint32_t awaiting_rsps;
struct msg *selected_rsp;

struct rbnode tmo_rbe; /* entry in rbtree */
Expand Down Expand Up @@ -506,7 +509,16 @@ struct msg {
msg_response_handler_t rsp_handler;
consistency_t consistency;
msgid_t parent_id; /* parent message id */

// Primary response_mgr for this instance's DC.
struct response_mgr rspmgr;

// Additional response_mgrs if we choose to use DC_EACH_SAFE_QUORUM
struct response_mgr **additional_each_rspmgrs;

// Indicates whether the rspmgr and additional_each_rspmgrs(if applicable)
// are init-ed.
bool rspmgrs_inited;
};

TAILQ_HEAD(msg_tqh, msg);
Expand Down Expand Up @@ -588,7 +600,7 @@ void dnode_rsp_gos_syn(struct context *ctx, struct conn *p_conn,

void req_forward_error(struct context *ctx, struct conn *conn, struct msg *req,
err_t error_code, err_t dyn_error_code);
void req_forward_all_local_racks(struct context *ctx, struct conn *c_conn,
void req_forward_all_racks_for_dc(struct context *ctx, struct conn *c_conn,
struct msg *req, struct mbuf *orig_mbuf,
uint8_t *key, uint32_t keylen,
struct datacenter *dc);
Expand Down
Loading

0 comments on commit 46a9b33

Please sign in to comment.