From 41dc9c4cfa04119cce4f30862b5a13d9e6a91835 Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Wed, 16 Oct 2019 09:34:38 -0700 Subject: [PATCH 1/5] Make the number_generator() return an int always --- test/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/utils.py b/test/utils.py index 32611c1d1..3dc7b1b7c 100755 --- a/test/utils.py +++ b/test/utils.py @@ -38,7 +38,7 @@ def string_generator(size=6, chars=string.ascii_letters + string.digits): return ''.join(random.choice(chars) for _ in range(size)) def number_generator(size=4, chars=string.digits): - return ''.join(random.choice(chars) for _ in range(size)) + return int(''.join(random.choice(chars) for _ in range(size))) def dict_request(request, key1, key2): """Converts the request into an easy to consume dict format.""" From 84875e9e94325044ee6ee2bdab7675a15c90c6eb Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Thu, 17 Oct 2019 07:44:17 -0700 Subject: [PATCH 2/5] Capture pthread_create() return value with appropriate data type Even though rstatus_t is an int typedef, it's not appropriate to capture integers that do not denote 'rstatus_t' values into an rstatus_t variable, as it decreases readability of code. --- src/dyn_gossip.c | 8 ++++---- src/dyn_stats.c | 9 ++++----- src/entropy/dyn_entropy_util.c | 9 ++++----- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/dyn_gossip.c b/src/dyn_gossip.c index 1e324c89d..1bbd7bd69 100644 --- a/src/dyn_gossip.c +++ b/src/dyn_gossip.c @@ -840,12 +840,12 @@ static void *gossip_loop(void *arg) { } rstatus_t gossip_start(struct server_pool *sp) { - rstatus_t status; pthread_t tid; - status = pthread_create(&tid, NULL, gossip_loop, sp); - if (status < 0) { - log_error("gossip service create failed: %s", strerror(status)); + int pthread_status; + pthread_status = pthread_create(&tid, NULL, gossip_loop, sp); + if (pthread_status < 0) { + log_error("gossip service create failed: %s", strerror(pthread_status)); return DN_ERROR; } diff --git a/src/dyn_stats.c b/src/dyn_stats.c index f8337f20b..a749d60ea 100644 --- a/src/dyn_stats.c +++ b/src/dyn_stats.c @@ -1346,17 +1346,16 @@ static rstatus_t stats_listen(struct stats *st) { } static rstatus_t stats_start_aggregator(struct stats *st) { - rstatus_t status; - if (!stats_enabled) { return DN_OK; } THROW_STATUS(stats_listen(st)); - status = pthread_create(&st->tid, NULL, stats_loop, st); - if (status < 0) { - log_error("stats aggregator create failed: %s", strerror(status)); + int pthread_status; + pthread_status = pthread_create(&st->tid, NULL, stats_loop, st); + if (pthread_status < 0) { + log_error("stats aggregator create failed: %s", strerror(pthread_status)); return DN_ERROR; } diff --git a/src/entropy/dyn_entropy_util.c b/src/entropy/dyn_entropy_util.c index ea5879bb0..82f4a3f1d 100644 --- a/src/entropy/dyn_entropy_util.c +++ b/src/entropy/dyn_entropy_util.c @@ -627,14 +627,13 @@ void *entropy_loop(void *arg) { */ rstatus_t entropy_conn_start(struct entropy *cn) { - rstatus_t status; - THROW_STATUS(entropy_listen(cn)); - status = pthread_create(&cn->tid, NULL, entropy_loop, cn); - if (status < 0) { + int pthread_status; + pthread_status = pthread_create(&cn->tid, NULL, entropy_loop, cn); + if (pthread_status < 0) { log_error("reconciliation thread for socket create failed: %s", - strerror(status)); + strerror(pthread_status)); return DN_ERROR; } From 89cd8f5bee870734130fef4b54eae674c7d8a6f4 Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Thu, 17 Oct 2019 07:55:28 -0700 Subject: [PATCH 3/5] Disable initializing anti-entropy as it's not used The initialization always fails in any case, so let's stop doing it and mark the anti-entropy part of the code as deprecated. --- src/dyn_core.c | 8 +++----- src/entropy/dyn_entropy_util.c | 3 +++ 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/dyn_core.c b/src/dyn_core.c index 9fc874882..bbfe95115 100644 --- a/src/dyn_core.c +++ b/src/dyn_core.c @@ -132,6 +132,9 @@ static rstatus_t core_event_base_create(struct context *ctx) { } /** + * + * NOTE: DEPRECATED and not currently used in the codebase. + * * Initialize anti-entropy. * @param[in,out] ctx Context. * @return rstatus_t Return status code. @@ -269,11 +272,6 @@ rstatus_t core_start(struct instance *nci) { goto error; } - status = core_entropy_init(ctx); - if (status != DN_OK) { - goto error; - } - status = core_event_base_create(ctx); if (status != DN_OK) { goto error; diff --git a/src/entropy/dyn_entropy_util.c b/src/entropy/dyn_entropy_util.c index 82f4a3f1d..755135731 100644 --- a/src/entropy/dyn_entropy_util.c +++ b/src/entropy/dyn_entropy_util.c @@ -405,6 +405,9 @@ rstatus_t entropy_key_iv_load(struct context *ctx) { } /* + * + * NOTE: DEPRECATED and not currently used in the codebase. + * * Function: entropy_snd_init * -------------------- * Initiates the data for the connection towards another cluster for From 2bacb2ba218b57e4c46cf3e126f9dd7215d05f71 Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Sun, 20 Oct 2019 15:31:18 -0700 Subject: [PATCH 4/5] [Unsupported] Allow setting per-connection consistency through fake Redis cmd Ideally we shouldn't be doing this and should have a connection handshake stage to set connection level configuration. However, this is quite urgent for a specific use-case at the moment and will be changed in the future. Production code relying on this is not a good idea. In this we introduce a new fake Redis command that's parsed by the Redis parser called: 'DYNO_CONFIG:CONN_CONSISTENCY ' This will change the consistency level only for that connection. A new Dynomtie parsing result called MSG_PARSE_DYNO_CONFIG signifies a connection level configuration was received. We simulate a response to the client by replying with "+OK\r\n". Testing: Made sure that a key with a quorum failure is accessible after calling 'DYNO_CONFIG:CONN_CONSISTENCY DC_ONE' in the same client session. --- src/dyn_client.c | 6 +++ src/dyn_connection.c | 6 +-- src/dyn_message.c | 102 +++++++++++++++++++++++++++++++++++++++-- src/dyn_message.h | 38 +++++++++++++-- src/dyn_response_mgr.c | 2 +- src/dyn_server.c | 5 +- src/proto/dyn_redis.c | 31 +++++++++++++ 7 files changed, 173 insertions(+), 17 deletions(-) diff --git a/src/dyn_client.c b/src/dyn_client.c index 726abfcf0..5332287db 100644 --- a/src/dyn_client.c +++ b/src/dyn_client.c @@ -343,6 +343,7 @@ struct msg *req_recv_next(struct context *ctx, struct conn *conn, bool alloc) { if (is_read_repairs_enabled()) { req->timestamp = current_timestamp_in_millis(); } + return req; } @@ -370,6 +371,11 @@ static bool req_filter(struct context *ctx, struct conn *conn, return true; } + // If this is a Dynomite configuration message, don't forward it. + if (is_msg_type_dyno_config(req->type)) { + return true; + } + return false; } diff --git a/src/dyn_connection.c b/src/dyn_connection.c index 2639f3e09..9f41b4511 100644 --- a/src/dyn_connection.c +++ b/src/dyn_connection.c @@ -127,8 +127,7 @@ inline void conn_set_read_consistency(struct conn *conn, consistency_t cons) { } inline consistency_t conn_get_read_consistency(struct conn *conn) { - // return conn->read_consistency; - return g_read_consistency; + return conn->read_consistency; } inline void conn_set_write_consistency(struct conn *conn, consistency_t cons) { @@ -136,8 +135,7 @@ inline void conn_set_write_consistency(struct conn *conn, consistency_t cons) { } inline consistency_t conn_get_write_consistency(struct conn *conn) { - // return conn->write_consistency; - return g_write_consistency; + return conn->write_consistency; } rstatus_t conn_event_del_conn(struct conn *conn) { diff --git a/src/dyn_message.c b/src/dyn_message.c index af7d97fe0..141413f93 100644 --- a/src/dyn_message.c +++ b/src/dyn_message.c @@ -915,6 +915,87 @@ static rstatus_t msg_repair(struct context *ctx, struct conn *conn, return DN_OK; } +/* + * Crafts a success response message for the respective datastore. + * + * TODO: This currently does only Redis. The Redis specific code should + * be moved out of this file. + * + * Returns a 'msg' with the expected success response. + */ +static struct msg *simulate_ok_rsp(struct context *ctx, struct conn *conn, + struct msg *req) { + + ASSERT(req->is_request); + + rstatus_t ret_status = DN_OK; + const char *QUIT_FMT_STRING = "+OK\r\n"; + + struct msg *rsp = msg_get(conn, false, __FUNCTION__); + if (rsp == NULL) { + conn->err = errno; + return NULL; + } + + rstatus_t append_status = msg_append(rsp, QUIT_FMT_STRING, strlen(QUIT_FMT_STRING)); + if (append_status != DN_OK) { + rsp_put(rsp); + return NULL; + } + + rsp->peer = req; + rsp->is_request = 0; + + req->done = 1; + + return rsp; +} + + +/* + * If the command sent to Dynomite was a special Dynomite configuration + * command, we process and apply the configuration here. + * + * Returns: DN_OK on successful application, DN_ERROR otherwise. + */ +static rstatus_t msg_apply_config(struct context *ctx, struct conn *conn, + struct msg *msg) { + + // We only support one type of configuration now. + // TODO: If we support more, convert this to a switch case. + ASSERT(msg->type == MSG_HACK_SETTING_CONN_CONSISTENCY); + + struct argpos *consistency_string = (struct argpos*) array_get(msg->args, 0); + + // We must have a consistency string, else we wouldn't have reached here. + ASSERT(consistency_string != NULL); + + consistency_t cons = get_consistency_enum_from_string(consistency_string->start); + if (cons == -1) return DN_ERROR; + + conn_set_read_consistency(conn, cons); + conn_set_write_consistency(conn, cons); + + // Set the consistency to DC_ONE, since this is just a configuration setting. + msg->consistency = DC_ONE; + + // Create an OK response. + struct msg *ok_rsp = simulate_ok_rsp(ctx, conn, msg); + + // Add it to the outstanding messages dictionary, so that 'conn_handle_response' + // can process it appropriately. + dictAdd(conn->outstanding_msgs_dict, &msg->id, msg); + + // Enqueue the message in the outbound queue so that the code on the response + // path can find it. + conn_enqueue_outq(ctx, conn, msg); + + THROW_STATUS(conn_handle_response(ctx, conn, + msg->parent_id ? msg->parent_id : msg->id, ok_rsp)); + + return DN_OK; +} + static rstatus_t msg_parse(struct context *ctx, struct conn *conn, struct msg *msg) { rstatus_t status; @@ -929,19 +1010,24 @@ static rstatus_t msg_parse(struct context *ctx, struct conn *conn, switch (msg->result) { case MSG_PARSE_OK: - // log_debug(LOG_VVERB, "MSG_PARSE_OK"); status = msg_parsed(ctx, conn, msg); break; - case MSG_PARSE_REPAIR: - // log_debug(LOG_VVERB, "MSG_PARSE_REPAIR"); status = msg_repair(ctx, conn, msg); break; - case MSG_PARSE_AGAIN: - // log_debug(LOG_VVERB, "MSG_PARSE_AGAIN"); status = DN_OK; break; + case MSG_PARSE_DYNO_CONFIG: + status = msg_apply_config(ctx, conn, msg); + + // No more data to parse. + conn_recv_done(ctx, conn, msg, NULL); + break; + + case MSG_PARSE_NOOP: + status = DN_NOOPS; + break; default: /* @@ -1611,3 +1697,9 @@ rstatus_t msg_append_format(struct msg *msg, const char *fmt, int num_args, ...) return DN_OK; } + +bool is_msg_type_dyno_config(msg_type_t msg_type) { + // TODO: Convert to a switch case if we support more. + if (msg_type == MSG_HACK_SETTING_CONN_CONSISTENCY) return true; + return false; +} diff --git a/src/dyn_message.h b/src/dyn_message.h index 220dbaa6c..bf09826ff 100644 --- a/src/dyn_message.h +++ b/src/dyn_message.h @@ -218,6 +218,7 @@ ACTION(RSP_REDIS_ERROR_EXECABORT) \ ACTION(RSP_REDIS_ERROR_MASTERDOWN) \ ACTION(RSP_REDIS_ERROR_NOREPLICAS) \ + ACTION(HACK_SETTING_CONN_CONSISTENCY) \ ACTION(SENTINEL) \ ACTION(END_IDX) \ /* ACTION( REQ_REDIS_AUTH) */ \ @@ -260,11 +261,21 @@ extern func_msg_repair_t g_make_repair_query; /* Create a repair msg. */ void set_datastore_ops(void); typedef enum msg_parse_result { - MSG_PARSE_OK, /* parsing ok */ - MSG_PARSE_ERROR, /* parsing error */ - MSG_PARSE_REPAIR, /* more to parse -> repair parsed & unparsed data */ - MSG_PARSE_FRAGMENT, /* multi-vector request -> fragment */ - MSG_PARSE_AGAIN, /* incomplete -> parse again */ + // Parsing OK + MSG_PARSE_OK, + // Parsing error + MSG_PARSE_ERROR, + // More to parse -> Repair parsed & unparsed data + MSG_PARSE_REPAIR, + // Multi-vector request -> fragment + MSG_PARSE_FRAGMENT, + // Incomplete, parse again. + MSG_PARSE_AGAIN, + // Parsing done, but do nothing after + MSG_PARSE_NOOP, + // Parsing done, command was a dynomite configuration + MSG_PARSE_DYNO_CONFIG, + // OOM error during parsing (TODO: consider removing) MSG_OOM_ERROR } msg_parse_result_t; @@ -350,6 +361,19 @@ static inline char *get_consistency_string(consistency_t cons) { return "INVALID CONSISTENCY"; } +static inline consistency_t get_consistency_enum_from_string(char *cons) { + if (dn_strcasecmp(cons, "DC_ONE") == 0) { + return DC_ONE; + } else if (dn_strcasecmp(cons, "DC_QUORUM") == 0) { + return DC_QUORUM; + } else if (dn_strcasecmp(cons, "DC_SAFE_QUORUM") == 0) { + return DC_SAFE_QUORUM; + } else if (dn_strcasecmp(cons, "DC_EACH_SAFE_QUORUM") == 0) { + return DC_EACH_SAFE_QUORUM; + } + return -1; +} + #define DEFAULT_READ_CONSISTENCY DC_ONE #define DEFAULT_WRITE_CONSISTENCY DC_ONE extern consistency_t g_write_consistency; @@ -616,4 +640,8 @@ rstatus_t dnode_peer_req_forward(struct context *ctx, struct conn *c_conn, // string *data); void dnode_peer_gossip_forward(struct context *ctx, struct conn *conn, struct mbuf *data); + +// Returns 'true' if 'msg_type' is a Dynomite configuration command. +bool is_msg_type_dyno_config(msg_type_t msg_type); + #endif diff --git a/src/dyn_response_mgr.c b/src/dyn_response_mgr.c index f4796d857..9a9394423 100644 --- a/src/dyn_response_mgr.c +++ b/src/dyn_response_mgr.c @@ -182,7 +182,7 @@ static void rspmgr_incr_non_quorum_responses_stats( */ bool perform_repairs_if_necessary(struct context *ctx, struct response_mgr *rspmgr) { - struct msg* repair_msg; + struct msg* repair_msg = NULL; rstatus_t repair_create_status = g_make_repair_query(ctx, rspmgr, &repair_msg); if (repair_create_status == DN_OK && repair_msg != NULL) { diff --git a/src/dyn_server.c b/src/dyn_server.c index 26a1b2869..2d54c692f 100644 --- a/src/dyn_server.c +++ b/src/dyn_server.c @@ -886,10 +886,11 @@ void req_send_done(struct context *ctx, struct conn *conn, struct msg *req) { * enqueue message (request) in server outq, if response is expected. * Otherwise, free the request */ - if (req->expect_datastore_reply || (conn->type == CONN_SERVER)) + if (req->expect_datastore_reply || (conn->type == CONN_SERVER)) { conn_enqueue_outq(ctx, conn, req); - else + } else { req_put(req); + } } static void req_server_enqueue_imsgq(struct context *ctx, struct conn *conn, diff --git a/src/proto/dyn_redis.c b/src/proto/dyn_redis.c index 1696a898e..14511b82a 100644 --- a/src/proto/dyn_redis.c +++ b/src/proto/dyn_redis.c @@ -1562,6 +1562,18 @@ void redis_parse_req(struct msg *r, struct context *ctx) { } break; + + case 28: + // Note: This is not a Redis command, but a dynomite configuration + // command. + if (dn_strcasecmp(m, "dyno_config:conn_consistency") == 0) { + r->type = MSG_HACK_SETTING_CONN_CONSISTENCY; + r->is_read = 0; + break; + } + + break; + default: r->is_read = 1; break; @@ -1590,6 +1602,12 @@ void redis_parse_req(struct msg *r, struct context *ctx) { "remainaing part of command", p - m, m); break; } + + if (is_msg_type_dyno_config(r->type)) { + // If this is a Dynomite config message, we parse it a bit differently. + state = SW_ARG1_LEN; + break; + } switch (ch) { case LF: if (redis_argz(r) && (r->rntokens == 0)) { @@ -1812,8 +1830,21 @@ void redis_parse_req(struct msg *r, struct context *ctx) { log_error("Redis CONFIG command not supported '%.*s'", p - m, m); goto error; } + m = p + r->rlen; + if (is_msg_type_dyno_config(r->type)) { + rstatus_t argstatus = record_arg(p, m, r->args); + if (argstatus == DN_ERROR) { + goto error; + } else if (argstatus == DN_ENOMEM) { + goto enomem; + } + + r->result = MSG_PARSE_DYNO_CONFIG; + return; + } + if (read_repairs_enabled) { bool arg1_across_mbufs = false; while (m >= b->last) { From 24f32547490981eaabfb0d10e40ae1b0fe494314 Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Sun, 20 Oct 2019 16:09:47 -0700 Subject: [PATCH 5/5] Support the Redis "QUIT" command properly The QUIT command so far was handled by just closing the connection on the server side and did not respect the contract of the QUIT command. The Redis documentation states that the QUIT command must return "+OK\r\n" back to the client so that the client can close the connection on its side. This patch takes advantage of simulating a datastore response from the previous patch to achieve this. --- src/dyn_client.c | 6 +++++- src/dyn_message.c | 34 ++++++++++++++++++++-------------- src/dyn_message.h | 10 ++++++++++ 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/src/dyn_client.c b/src/dyn_client.c index 5332287db..d8e6a590e 100644 --- a/src/dyn_client.c +++ b/src/dyn_client.c @@ -365,9 +365,13 @@ static bool req_filter(struct context *ctx, struct conn *conn, if (req->quit) { ASSERT(conn->rmsg == NULL); log_debug(LOG_VERB, "%s filter quit %s", print_obj(conn), print_obj(req)); + + // The client expects to receive an "+OK\r\n" response, so make sure + // to do that. + IGNORE_RET_VAL(simulate_ok_rsp(ctx, conn, req)); + conn->eof = 1; conn->recv_ready = 0; - req_put(req); return true; } diff --git a/src/dyn_message.c b/src/dyn_message.c index 141413f93..2eb5e17af 100644 --- a/src/dyn_message.c +++ b/src/dyn_message.c @@ -923,7 +923,7 @@ static rstatus_t msg_repair(struct context *ctx, struct conn *conn, * * Returns a 'msg' with the expected success response. */ -static struct msg *simulate_ok_rsp(struct context *ctx, struct conn *conn, +static struct msg *craft_ok_rsp(struct context *ctx, struct conn *conn, struct msg *req) { ASSERT(req->is_request); @@ -951,6 +951,24 @@ static struct msg *simulate_ok_rsp(struct context *ctx, struct conn *conn, return rsp; } +rstatus_t simulate_ok_rsp(struct context *ctx, struct conn *conn, + struct msg *msg) { + // Create an OK response. + struct msg *ok_rsp = craft_ok_rsp(ctx, conn, msg); + + // Add it to the outstanding messages dictionary, so that 'conn_handle_response' + // can process it appropriately. + dictAdd(conn->outstanding_msgs_dict, &msg->id, msg); + + // Enqueue the message in the outbound queue so that the code on the response + // path can find it. + conn_enqueue_outq(ctx, conn, msg); + + THROW_STATUS(conn_handle_response(ctx, conn, + msg->parent_id ? msg->parent_id : msg->id, ok_rsp)); + + return DN_OK; +} /* * If the command sent to Dynomite was a special Dynomite configuration @@ -979,19 +997,7 @@ static rstatus_t msg_apply_config(struct context *ctx, struct conn *conn, // Set the consistency to DC_ONE, since this is just a configuration setting. msg->consistency = DC_ONE; - // Create an OK response. - struct msg *ok_rsp = simulate_ok_rsp(ctx, conn, msg); - - // Add it to the outstanding messages dictionary, so that 'conn_handle_response' - // can process it appropriately. - dictAdd(conn->outstanding_msgs_dict, &msg->id, msg); - - // Enqueue the message in the outbound queue so that the code on the response - // path can find it. - conn_enqueue_outq(ctx, conn, msg); - - THROW_STATUS(conn_handle_response(ctx, conn, - msg->parent_id ? msg->parent_id : msg->id, ok_rsp)); + THROW_STATUS(simulate_ok_rsp(ctx, conn, msg)); return DN_OK; } diff --git a/src/dyn_message.h b/src/dyn_message.h index bf09826ff..c6e240a29 100644 --- a/src/dyn_message.h +++ b/src/dyn_message.h @@ -641,6 +641,16 @@ rstatus_t dnode_peer_req_forward(struct context *ctx, struct conn *c_conn, void dnode_peer_gossip_forward(struct context *ctx, struct conn *conn, struct mbuf *data); +/* + * Simulates a successful response as though the datastore sent it. + * Also, does the necessary to make sure that the response path is + * able to send this response back to the client. + * + * Returns DN_OK on success and an appropriate error otherwise. + */ +rstatus_t simulate_ok_rsp(struct context *ctx, struct conn *conn, + struct msg *msg); + // Returns 'true' if 'msg_type' is a Dynomite configuration command. bool is_msg_type_dyno_config(msg_type_t msg_type);