diff --git a/src/dyn_client.c b/src/dyn_client.c index 328bcd796..46bfd90e8 100644 --- a/src/dyn_client.c +++ b/src/dyn_client.c @@ -50,6 +50,8 @@ #include "dyn_util.h" static rstatus_t msg_quorum_rsp_handler(struct context *ctx, struct msg *req, struct msg *rsp); +static rstatus_t msg_each_quorum_rsp_handler(struct context *ctx, struct msg *req, + struct msg *rsp); static msg_response_handler_t msg_get_rsp_handler(struct context *ctx, struct msg *req); static rstatus_t rewrite_query_if_necessary(struct msg **req, @@ -653,8 +655,10 @@ rstatus_t req_forward_to_peer(struct context *ctx, struct conn *c_conn, } if (!(same_dc && same_rack) || force_swallow) { - // Swallow responses from remote racks or DCs. - rack_msg->swallow = true; + if (req->consistency != DC_EACH_SAFE_QUORUM || force_swallow) { + // Swallow responses from remote racks or DCs. + rack_msg->swallow = true; + } } // Get a connection to the node. @@ -701,13 +705,20 @@ rstatus_t req_forward_to_peer(struct context *ctx, struct conn *c_conn, return status; } -void req_forward_all_local_racks(struct context *ctx, struct conn *c_conn, +void req_forward_all_racks_for_dc(struct context *ctx, struct conn *c_conn, struct msg *req, struct mbuf *orig_mbuf, uint8_t *key, uint32_t keylen, struct datacenter *dc) { uint8_t rack_cnt = (uint8_t)array_n(&dc->racks); uint8_t rack_index; - init_response_mgr(&req->rspmgr, req, req->is_read, rack_cnt, c_conn); + + if (req->rspmgrs_inited == false) { + if (req->consistency == DC_EACH_SAFE_QUORUM) { + init_response_mgr_all_dcs(ctx, req, c_conn, dc); + } else { + init_response_mgr(req, &req->rspmgr, rack_cnt, c_conn); + } + } log_info("%s %s same DC racks:%d expect replies %d", print_obj(c_conn), print_obj(req), rack_cnt, req->rspmgr.max_responses); @@ -732,6 +743,10 @@ static bool request_send_to_all_dcs(struct msg *req) { // There is a routing override if (req->msg_routing != ROUTING_NORMAL) return false; + // Under DC_EACH_SAFE_QUORUM, we need to send reads and writes to all 
+ // DCs. + if (req->consistency == DC_EACH_SAFE_QUORUM) return true; + // Reads are not propagated if (req->is_read) return false; @@ -753,8 +768,11 @@ static bool request_send_to_all_local_racks(struct msg *req) { // A write should go to all racks if (!req->is_read) return true; - if ((req->consistency == DC_QUORUM) || (req->consistency == DC_SAFE_QUORUM)) + if ((req->consistency == DC_QUORUM) + || (req->consistency == DC_SAFE_QUORUM) + || (req->consistency == DC_EACH_SAFE_QUORUM)) { return true; + } return false; } @@ -794,6 +812,17 @@ static void req_forward_remote_dc(struct context *ctx, struct conn *c_conn, const uint32_t rack_cnt = array_n(&dc->racks); if (rack_cnt == 0) return; + if (req->consistency == DC_EACH_SAFE_QUORUM) { + // Under 'DC_EACH_SAFE_QUORUM', we want to hear back from at least + // quorum racks in each DC, so send it to all racks in remote DCs. + req_forward_all_racks_for_dc(ctx, c_conn, req, orig_mbuf, key, keylen, dc); + return; + } + + // If we're not expecting a consistency level of 'DC_EACH_SAFE_QUORUM', then + // we send it to only to the preselected rack in the remote DCs. If that's not + // reachable, we failover to another in the remote DC. + // Pick the preferred pre-selected rack for this DC. struct rack *rack = dc->preselected_rack_for_replication; if (rack == NULL) rack = array_get(&dc->racks, 0); @@ -846,7 +875,7 @@ static void req_forward_local_dc(struct context *ctx, struct conn *c_conn, req->rsp_handler = msg_get_rsp_handler(ctx, req); if (request_send_to_all_local_racks(req)) { // send request to all local racks - req_forward_all_local_racks(ctx, c_conn, req, orig_mbuf, key, keylen, dc); + req_forward_all_racks_for_dc(ctx, c_conn, req, orig_mbuf, key, keylen, dc); } else { // send request to only local token owner struct rack *rack = @@ -1002,7 +1031,7 @@ rstatus_t rewrite_query_if_necessary(struct msg **req, struct context *ctx) { * the new query and free up the original msg. 
* */ -rstatus_t rewrite_query_with_timestamp_md(struct msg **req, struct context *ctx) { +static rstatus_t rewrite_query_with_timestamp_md(struct msg **req, struct context *ctx) { if (is_read_repairs_enabled() == false) return DN_OK; @@ -1099,6 +1128,8 @@ static msg_response_handler_t msg_get_rsp_handler(struct context *ctx, struct ms // Check if its quorum if ((req->consistency == DC_QUORUM) || (req->consistency == DC_SAFE_QUORUM)) { return msg_quorum_rsp_handler; + } else if (req->consistency == DC_EACH_SAFE_QUORUM) { + return msg_each_quorum_rsp_handler; } }
@@ -1149,6 +1180,149 @@ static rstatus_t msg_quorum_rsp_handler(struct context *ctx, struct msg *req,
   return DN_OK;
 }
 
+/*
+ * Finds the index of the response manager whose 'dc_name' equals
+ * 'target_dc_name'. 'rspmgrs' is expected to hold one entry per datacenter in
+ * 'ctx->pool.datacenters'.
+ *
+ * Returns the matching index, or -1 if no response manager matches.
+ */
+static int find_rspmgr_idx(struct context *ctx, struct response_mgr **rspmgrs,
+                           struct string *target_dc_name) {
+  int num_dcs = (int)array_n(&ctx->pool.datacenters);
+  int i;
+
+  for (i = 0; i < num_dcs; ++i) {
+    if (string_compare(&rspmgrs[i]->dc_name, target_dc_name) == 0) {
+      return i;
+    }
+  }
+  return -1;
+}
+
+/*
+ * Returns true only when every per-DC response manager in 'rspmgrs' has its
+ * 'done' flag set.
+ */
+static bool all_rspmgrs_done(struct context *ctx,
+                             struct response_mgr **rspmgrs) {
+  int num_dcs = (int)array_n(&ctx->pool.datacenters);
+  int i;
+
+  for (i = 0; i < num_dcs; ++i) {
+    if (!rspmgrs[i]->done) return false;
+  }
+  return true;
+}
+
+/*
+ * Picks the single response to return for 'req' from the per-DC response
+ * managers and releases the others.
+ *
+ * DCs are visited in index order. The first DC's response is selected to
+ * start with; an error response from a later DC replaces a non-error
+ * selection, and once the selection is an error the remaining DCs' responses
+ * are only freed. Each time a response is selected, its error state is
+ * copied onto 'req' and 'req->selected_rsp' is pointed at it.
+ *
+ * Returns the selected response.
+ */
+static struct msg *all_rspmgrs_get_response(struct context *ctx,
+                                            struct msg *req) {
+  int num_dcs = (int)array_n(&ctx->pool.datacenters);
+  struct msg *rsp = NULL;
+  int i;
+
+  for (i = 0; i < num_dcs; ++i) {
+    struct response_mgr *rspmgr = req->additional_each_rspmgrs[i];
+
+    if (!rsp) {
+      rsp = rspmgr_get_response(ctx, rspmgr);
+      ASSERT(rsp);
+    } else if (rsp->is_error) {
+      // If any of the DCs errored out, we just clean up responses from the
+      // remaining DCs.
+      rspmgr_free_other_responses(rspmgr, NULL);
+      continue;
+    } else {
+      // If the DCs we've processed so far have not seen errors, we need to
+      // make sure that the remaining DCs don't have errors too.
+      struct msg *dc_rsp = rspmgr_get_response(ctx, rspmgr);
+      ASSERT(dc_rsp);
+      if (!dc_rsp->is_error) {
+        // If it's not an error, clear all responses from this DC.
+        rspmgr_free_other_responses(rspmgr, NULL);
+        continue;
+      }
+      // This DC failed: its error replaces the response selected so far.
+      rsp_put(rsp);
+      rsp = dc_rsp;
+    }
+
+    // 'rsp' was (re)selected from this DC: keep it, free this DC's other
+    // responses, and mirror its status onto the request.
+    rspmgr_free_other_responses(rspmgr, rsp);
+    rsp->peer = req;
+    req->selected_rsp = rsp;
+    req->error_code = rsp->error_code;
+    req->is_error = rsp->is_error;
+    req->dyn_error_code = rsp->dyn_error_code;
+  }
+
+  return rsp;
+}
+
+/*
+ * Response handler used when the request's consistency is
+ * DC_EACH_SAFE_QUORUM.
+ *
+ * Routes 'rsp' to the response manager of the DC it came from (index 0 is
+ * the local DC) and, once every DC's response manager is done, selects the
+ * final response for 'req'.
+ *
+ * Returns DN_EAGAIN while more responses are needed and DN_OK once the final
+ * response has been selected. A response that is surplus, or that cannot be
+ * attributed to a DC, is handed to swallow_extra_rsp() and its result is
+ * returned.
+ */
+static rstatus_t msg_each_quorum_rsp_handler(struct context *ctx,
+                                             struct msg *req,
+                                             struct msg *rsp) {
+  if (all_rspmgrs_done(ctx, req->additional_each_rspmgrs)) {
+    return swallow_extra_rsp(req, rsp);
+  }
+
+  int rspmgr_idx = -1;
+  struct conn *rsp_conn = rsp->owner;
+  if (rsp_conn == NULL) {
+    rspmgr_idx = 0;
+  } else if (rsp_conn->type == CONN_DNODE_PEER_SERVER) {
+    struct node *peer_instance = (struct node *)rsp_conn->owner;
+    rspmgr_idx = find_rspmgr_idx(ctx, req->additional_each_rspmgrs,
+                                 &peer_instance->dc);
+  } else if (rsp_conn->type == CONN_SERVER) {
+    // If this is a 'CONN_SERVER' connection, then it is from the same DC.
+    rspmgr_idx = 0;
+  }
+
+  if (rspmgr_idx < 0) {
+    // Either the peer's DC has no response manager or the connection type is
+    // not one handled above. Indexing the array with -1 would read out of
+    // bounds, so treat the response like a surplus one instead.
+    log_error("Could not find which DC response was from");
+    return swallow_extra_rsp(req, rsp);
+  }
+
+  struct response_mgr *rspmgr = req->additional_each_rspmgrs[rspmgr_idx];
+  rspmgr_submit_response(rspmgr, rsp);
+  if (!rspmgr_check_is_done(rspmgr)) return DN_EAGAIN;
+  if (!all_rspmgrs_done(ctx, req->additional_each_rspmgrs)) return DN_EAGAIN;
+
+  // Every DC is done; this records the chosen response on 'req'.
+  all_rspmgrs_get_response(ctx, req);
+  return DN_OK;
+}
+
 static void req_client_enqueue_omsgq(struct context *ctx, struct conn *conn,
                                      struct msg *req) {
   ASSERT(req->is_request);
diff --git a/src/dyn_conf.c b/src/dyn_conf.c index 10b130ed1..674baa02a 100644 --- a/src/dyn_conf.c +++ b/src/dyn_conf.c @@ -2010,10 +2010,12 @@ static rstatus_t conf_validate_pool(struct conf *cf, struct conf_pool *cp) { g_read_consistency = DC_SAFE_QUORUM; else if (!dn_strcasecmp(cp->read_consistency.data, CONF_STR_DC_QUORUM)) g_read_consistency = DC_QUORUM; + else if (!dn_strcasecmp(cp->read_consistency.data, CONF_STR_DC_EACH_SAFE_QUORUM)) + g_read_consistency = DC_EACH_SAFE_QUORUM; else { log_error( "conf: directive \"read_consistency:\"must be one of 'DC_ONE' " - "'DC_QUORUM' 'DC_SAFE_QUORUM'"); + "'DC_QUORUM' 'DC_SAFE_QUORUM' 'DC_EACH_SAFE_QUORUM'"); return DN_ERROR; } @@ -2023,10 +2025,12 @@ static rstatus_t conf_validate_pool(struct conf *cf, struct conf_pool *cp) { g_write_consistency = DC_SAFE_QUORUM; else if (!dn_strcasecmp(cp->write_consistency.data, CONF_STR_DC_QUORUM)) g_write_consistency = DC_QUORUM; + else if (!dn_strcasecmp(cp->write_consistency.data, CONF_STR_DC_EACH_SAFE_QUORUM)) + g_write_consistency = DC_EACH_SAFE_QUORUM; else { log_error( "conf: directive \"write_consistency:\"must be one of 'DC_ONE' " - "'DC_QUORUM' 'DC_SAFE_QUORUM'"); + "'DC_QUORUM' 'DC_SAFE_QUORUM' 'DC_EACH_SAFE_QUORUM'"); return DN_ERROR; } diff --git a/src/dyn_conf.h b/src/dyn_conf.h index ee3941296..335427411 100644 --- a/src/dyn_conf.h +++ b/src/dyn_conf.h @@ -47,6 +47,7 @@ #define CONF_STR_DC_ONE "dc_one" #define CONF_STR_DC_QUORUM "dc_quorum" #define CONF_STR_DC_SAFE_QUORUM "dc_safe_quorum" +#define
CONF_STR_DC_EACH_SAFE_QUORUM "dc_each_safe_quorum" #define UNSET_NUM 0 diff --git a/src/dyn_dnode_client.c b/src/dyn_dnode_client.c index a3a069f98..f7de2206d 100644 --- a/src/dyn_dnode_client.c +++ b/src/dyn_dnode_client.c @@ -306,7 +306,8 @@ static void dnode_req_forward(struct context *ctx, struct conn *conn, // racks struct mbuf *orig_mbuf = STAILQ_FIRST(&req->mhdr); struct datacenter *dc = server_get_dc(pool, &pool->dc); - req_forward_all_local_racks(ctx, conn, req, orig_mbuf, key, keylen, dc); + + req_forward_all_racks_for_dc(ctx, conn, req, orig_mbuf, key, keylen, dc); } } diff --git a/src/dyn_message.c b/src/dyn_message.c index ad30e9a03..af7d97fe0 100644 --- a/src/dyn_message.c +++ b/src/dyn_message.c @@ -415,6 +415,8 @@ static struct msg *_msg_get(struct conn *conn, bool request, msg->orig_msg = NULL; msg->needs_repair = false; msg->rewrite_with_ts_possible = true; + msg->additional_each_rspmgrs = NULL; + msg->rspmgrs_inited = false; return msg; } @@ -641,6 +643,23 @@ void msg_put(struct msg *msg) { msg_put(msg->orig_msg); msg->orig_msg = NULL; } + + if (msg->additional_each_rspmgrs) { + ASSERT(msg->consistency == DC_EACH_SAFE_QUORUM); + // Only requests have their connection's owner as the 'struct server_pool' object, + // and only requests would have 'additional_each_rspmgrs', so it's safe to cast to + // 'struct server_pool'. + struct server_pool *sp = msg->owner->owner; + uint8_t num_dcs = array_n(&sp->datacenters); + + int i; + // Skip the 0th index as that points back to the statically allocated 'rspmgr' struct + // in 'msg'. 
+ for (i = 1; i < num_dcs; ++i) { + dn_free(msg->additional_each_rspmgrs[i]); + } + dn_free(msg->additional_each_rspmgrs); + } TAILQ_INSERT_HEAD(&free_msgq, msg, m_tqe); } diff --git a/src/dyn_message.h b/src/dyn_message.h index 7ca9f0ad4..220dbaa6c 100644 --- a/src/dyn_message.h +++ b/src/dyn_message.h @@ -333,6 +333,7 @@ typedef enum consistency { DC_ONE = 0, DC_QUORUM, DC_SAFE_QUORUM, + DC_EACH_SAFE_QUORUM, } consistency_t; static inline char *get_consistency_string(consistency_t cons) { @@ -343,6 +344,8 @@ static inline char *get_consistency_string(consistency_t cons) { return "DC_QUORUM"; case DC_SAFE_QUORUM: return "DC_SAFE_QUORUM"; + case DC_EACH_SAFE_QUORUM: + return "DC_EACH_SAFE_QUORUM"; } return "INVALID CONSISTENCY"; } @@ -430,7 +433,7 @@ struct msg { or remote region or cross rack */ usec_t request_send_time; /* when message was sent: either to the data store or remote region or cross rack */ - uint8_t awaiting_rsps; + uint32_t awaiting_rsps; struct msg *selected_rsp; struct rbnode tmo_rbe; /* entry in rbtree */ @@ -506,7 +509,16 @@ struct msg { msg_response_handler_t rsp_handler; consistency_t consistency; msgid_t parent_id; /* parent message id */ + + // Primary response_mgr for this instance's DC. struct response_mgr rspmgr; + + // Additional response_mgrs if we choose to use DC_EACH_SAFE_QUORUM + struct response_mgr **additional_each_rspmgrs; + + // Indicates whether the rspmgr and additional_each_rspmgrs(if applicable) + // are init-ed. 
+ bool rspmgrs_inited; }; TAILQ_HEAD(msg_tqh, msg); @@ -588,7 +600,7 @@ void dnode_rsp_gos_syn(struct context *ctx, struct conn *p_conn, void req_forward_error(struct context *ctx, struct conn *conn, struct msg *req, err_t error_code, err_t dyn_error_code); -void req_forward_all_local_racks(struct context *ctx, struct conn *c_conn, +void req_forward_all_racks_for_dc(struct context *ctx, struct conn *c_conn, struct msg *req, struct mbuf *orig_mbuf, uint8_t *key, uint32_t keylen, struct datacenter *dc);
diff --git a/src/dyn_response_mgr.c b/src/dyn_response_mgr.c
index 83acf6335..f4796d857 100644
--- a/src/dyn_response_mgr.c
+++ b/src/dyn_response_mgr.c
@@ -5,15 +5,147 @@
 #include "dyn_message.h"
 #include "dyn_server.h"
 
-void init_response_mgr(struct response_mgr *rspmgr, struct msg *req,
-                       bool is_read, uint8_t max_responses, struct conn *conn) {
+static int init_response_mgr_each_quorum_helper(struct msg *req,
+                                                struct response_mgr *rspmgr,
+                                                struct conn *c_conn,
+                                                struct datacenter *dc);
+
+/*
+ * Sets up one response manager per datacenter for a DC_EACH_SAFE_QUORUM
+ * request.
+ *
+ * Slot 0 of 'req->additional_each_rspmgrs' points at the response manager
+ * embedded in 'req' and tracks 'local_dc'; slots 1.. are heap allocated, one
+ * per remote DC. On success 'req->awaiting_rsps' holds the sum of the maximum
+ * responses of every DC.
+ *
+ * Returns DN_OK on success. On failure everything allocated here is released,
+ * 'req->additional_each_rspmgrs' is reset to NULL, and DN_ENOMEM (allocation
+ * failure) or DN_ERROR (not enough distinct remote DCs) is returned.
+ */
+rstatus_t init_response_mgr_all_dcs(struct context *ctx, struct msg *req,
+                                    struct conn *c_conn,
+                                    struct datacenter *local_dc) {
+  ASSERT(req->consistency == DC_EACH_SAFE_QUORUM);
+
+  rstatus_t status = DN_ENOMEM;
+  uint32_t total_responses_to_await = 0;
+  uint32_t num_dcs = array_n(&ctx->pool.datacenters);
+  uint32_t dc_idx = 0;
+  uint32_t i;
+  int max_rsps_for_dc;
+
+  if (num_dcs == 0) return DN_ERROR;
+
+  req->additional_each_rspmgrs =
+      dn_alloc(num_dcs * sizeof(*req->additional_each_rspmgrs));
+  if (req->additional_each_rspmgrs == NULL) return DN_ENOMEM;
+  // Clear every slot so the failure path can tell which ones were allocated.
+  memset(req->additional_each_rspmgrs, 0,
+         num_dcs * sizeof(*req->additional_each_rspmgrs));
+
+  // Initialize the response managers for all remote DCs. (The 0th idx in the
+  // 'additional_each_rspmgrs' array is reserved for the local DC).
+  for (i = 1; i < num_dcs; ++i) {
+    // 'dc_idx' is kept across iterations so that every slot gets the next
+    // DC that is not the local one, instead of the first remote DC each time.
+    struct datacenter *remote_dc = NULL;
+    while (dc_idx < num_dcs) {
+      struct datacenter *candidate =
+          (struct datacenter *)array_get(&ctx->pool.datacenters, dc_idx);
+      ++dc_idx;
+      if (string_compare(candidate->name, local_dc->name) != 0) {
+        remote_dc = candidate;
+        break;
+      }
+    }
+    if (remote_dc == NULL) {
+      // Ran out of DCs before filling every remote slot.
+      status = DN_ERROR;
+      goto fail;
+    }
+
+    req->additional_each_rspmgrs[i] = dn_alloc(sizeof(struct response_mgr));
+    if (req->additional_each_rspmgrs[i] == NULL) goto fail;
+
+    max_rsps_for_dc = init_response_mgr_each_quorum_helper(
+        req, req->additional_each_rspmgrs[i], c_conn, remote_dc);
+    if (max_rsps_for_dc == -1) goto fail;
+
+    total_responses_to_await += (uint32_t)max_rsps_for_dc;
+  }
+
+  // Now do the same as the above for the local DC.
+  // Point the 0th index to the statically allocated response_mgr.
+  req->additional_each_rspmgrs[0] = &req->rspmgr;
+  max_rsps_for_dc = init_response_mgr_each_quorum_helper(
+      req, req->additional_each_rspmgrs[0], c_conn, local_dc);
+  if (max_rsps_for_dc == -1) goto fail;
+  total_responses_to_await += (uint32_t)max_rsps_for_dc;
+
+  // Update the total number of responses to wait for.
+  req->awaiting_rsps = total_responses_to_await;
+  return DN_OK;
+
+fail:
+  // Slot 0 is not heap allocated, so only slots 1.. are released.
+  for (i = 1; i < num_dcs; ++i) {
+    struct response_mgr *rspmgr = req->additional_each_rspmgrs[i];
+    if (rspmgr == NULL) continue;
+    if (rspmgr->dc_name.data != NULL) string_deinit(&rspmgr->dc_name);
+    dn_free(rspmgr);
+  }
+  dn_free(req->additional_each_rspmgrs);
+  // Leave no stale state behind for later users of 'req'.
+  req->additional_each_rspmgrs = NULL;
+  req->rspmgrs_inited = false;
+  return status;
+}
+
+/*
+ * Helper function that initializes a 'struct response_mgr' and copies the
+ * appropriate DC name to it. Only used under DC_EACH_SAFE_QUORUM.
+ *
+ * Returns maximum number of responses possible for 'dc'.
+ * Returns -1 on an error.
+ */
+static int init_response_mgr_each_quorum_helper(struct msg *req,
+                                                struct response_mgr *rspmgr,
+                                                struct conn *c_conn,
+                                                struct datacenter *dc) {
+  ASSERT(req->consistency == DC_EACH_SAFE_QUORUM);
+
+  uint8_t rack_cnt = (uint8_t)array_n(&dc->racks);
+
+  // Initialize the response mgr.
+  init_response_mgr(req, rspmgr, rack_cnt, c_conn);
+
+  // Copy the name of the DC to the response manager.
+  if (string_duplicate(&rspmgr->dc_name, dc->name) != DN_OK) {
+    return -1;
+  }
+
+  return rspmgr->max_responses;
+}
+
+/*
+ * Resets 'rspmgr' and configures it to collect up to 'max_responses_for_dc'
+ * responses for 'req', with a quorum of more than half of them.
+ *
+ * Also sets 'req->awaiting_rsps' to 'max_responses_for_dc' and marks the
+ * request's response managers as initialized.
+ */
+void init_response_mgr(struct msg *req, struct response_mgr *rspmgr,
+                       uint8_t max_responses_for_dc, struct conn *c_conn) {
   memset(rspmgr, 0, sizeof(struct response_mgr));
-  rspmgr->is_read = is_read;
-  rspmgr->max_responses = max_responses;
-  rspmgr->quorum_responses = (uint8_t)(max_responses / 2 + 1);
-  rspmgr->conn = conn;
+  rspmgr->is_read = req->is_read;
+  rspmgr->max_responses = max_responses_for_dc;
+  rspmgr->quorum_responses = (uint8_t)(max_responses_for_dc / 2 + 1);
+  rspmgr->conn = c_conn;
   rspmgr->msg = req;
-  req->awaiting_rsps = max_responses;
+  req->awaiting_rsps = max_responses_for_dc;
+
+  req->rspmgrs_inited = true;
 }
 
 static bool rspmgr_is_quorum_achieved(struct response_mgr *rspmgr) {
diff --git a/src/dyn_response_mgr.h b/src/dyn_response_mgr.h index f73e6878a..d66cea931 100644 --- a/src/dyn_response_mgr.h +++ b/src/dyn_response_mgr.h @@ -1,6 +1,8 @@ #ifndef _DYN_RESPONSE_MGR_H_ #define _DYN_RESPONSE_MGR_H_ +#include "dyn_string.h" + #define MAX_REPLICAS_PER_DC 3 struct response_mgr { bool is_read; @@ -9,18 +11,29 @@ struct response_mgr { here. But we have only 3 ASGs */ struct msg *responses[MAX_REPLICAS_PER_DC]; uint32_t checksums[MAX_REPLICAS_PER_DC]; - uint8_t - good_responses; // non-error responses received. (nil) is not an error - uint8_t max_responses; // max responses expected.
- uint8_t quorum_responses; // responses expected to form a quorum - uint8_t error_responses; // error responses received - struct msg *err_rsp; // first error response + // Number of non-error responses received. (nil) is not an error. + uint8_t good_responses; + // Maximum number of responses possible. + uint8_t max_responses; + // Number of responses required to form a quorum. + uint8_t quorum_responses; + // Number of error responses received. + uint8_t error_responses; + // First error response + struct msg *err_rsp; struct conn *conn; - struct msg *msg; // corresponding request + // Corresponding request + struct msg *msg; + // The DC that this response manager is responsible for. + struct string dc_name; }; -void init_response_mgr(struct response_mgr *rspmgr, struct msg *, bool is_read, - uint8_t max_responses, struct conn *conn); +rstatus_t init_response_mgr_all_dcs(struct context *ctx, struct msg *req, + struct conn *c_conn, struct datacenter *local_dc); + +void init_response_mgr(struct msg *req, struct response_mgr *rspmgr, + uint8_t max_responses_for_dc, struct conn *c_conn); + // DN_OK if response was accepted rstatus_t rspmgr_submit_response(struct response_mgr *rspmgr, struct msg *rsp); bool rspmgr_check_is_done(struct response_mgr *rspmgr);