From cb5ce60fd2b70db3d33f9a1a620f83427847469f Mon Sep 17 00:00:00 2001 From: yeoncheol-kim Date: Tue, 26 Mar 2024 11:08:49 +0900 Subject: [PATCH] CLEANUP: Refactor methods of setting ritems --- mc_util.h | 1 + memcached.c | 430 +++++++++++++++++++++++++++++++--------------------- memcached.h | 25 +-- 3 files changed, 269 insertions(+), 187 deletions(-) diff --git a/mc_util.h b/mc_util.h index 2c09b1c54..cd745607c 100644 --- a/mc_util.h +++ b/mc_util.h @@ -68,6 +68,7 @@ typedef struct _mblck_pool { #define MBLCK_GET_BODYLEN(l) ((l)->body_len) #define MBLCK_GET_ITEMCNT(l) ((l)->item_cnt) #define MBLCK_GET_ITEMLEN(l) ((l)->item_len) +#define MBLCK_GET_TOTALSZ(l) ((l)->item_cnt * (l)->item_len) #define MBLCK_GET_NEXTBLK(b) ((b)->next) #define MBLCK_GET_BODYPTR(b) ((b)->data) diff --git a/memcached.c b/memcached.c index c7b65f4d7..314084431 100644 --- a/memcached.c +++ b/memcached.c @@ -528,6 +528,17 @@ static bool conn_reset_buffersize(conn *c) } } + if (c->rlsize != RITEM_LIST_INITIAL) { + void *ptr = malloc(sizeof(struct iovec) * RITEM_LIST_INITIAL); + if (ptr != NULL) { + free(c->rlist); + c->rlist = ptr; + c->rlsize = RITEM_LIST_INITIAL; + } else { + ret = false; + } + } + return ret; } @@ -555,6 +566,7 @@ static int conn_constructor(void *buffer, void *unused1, int unused2) free(c->suffixlist); free(c->iov); free(c->msglist); + free(c->rlist); mc_logger->log(EXTENSION_LOG_WARNING, NULL, "Failed to allocate buffers for connection\n"); return 1; @@ -583,6 +595,7 @@ static void conn_destructor(void *buffer, void *unused) free(c->suffixlist); free(c->iov); free(c->msglist); + free(c->rlist); LOCK_STATS(); mc_stats.conn_structs--; @@ -655,11 +668,10 @@ conn *conn_new(const int sfd, STATE_FUNC init_state, c->rbytes = c->wbytes = 0; c->wcurr = c->wbuf; c->rcurr = c->rbuf; - c->rtype = CONN_RTYPE_NONE; - c->rindex = 0; /* used when rtype is HINFO or EINFO */ + c->rlcurr = 0; + c->rlused = 0; c->ritem = 0; c->rlbytes = 0; - c->rltotal = 0; /* used when read with multiple mem blocks */ #ifdef SCAN_COMMAND c->pcurr = c->ilist; #endif @@ -1010,85 +1022,123 @@ static void conn_shrink(conn *c) c->iov = newbuf; c->iovsize = IOV_LIST_INITIAL; } + /* TODO check error condition? */ + } + + if (c->rlsize > RITEM_LIST_HIGHWAT) { + struct iovec *newbuf = (struct iovec *) realloc((void *)c->rlist, RITEM_LIST_INITIAL * sizeof(c->rlist[0])); + if (newbuf) { + c->rlist = newbuf; + c->rlsize = RITEM_LIST_INITIAL; + } /* TODO check return value */ } } -static void ritem_set_first(conn *c, int rtype, int vleng) +static int add_ritem(conn *c, const void *buf, int len) { - c->rtype = rtype; - - if (c->rtype == CONN_RTYPE_MBLCK) { - c->membk = MBLCK_GET_HEADBLK(&c->memblist); - c->ritem = MBLCK_GET_BODYPTR(c->membk); - c->rlbytes = vleng < MBLCK_GET_BODYLEN(&c->memblist) - ? vleng : MBLCK_GET_BODYLEN(&c->memblist); - c->rltotal = vleng; + if (c->rlused >= c->rlsize) { + struct iovec *new_rlist = (struct iovec *)realloc(c->rlist, + (c->rlsize * 2) * sizeof(struct iovec)); + if (! new_rlist) + return -1; + c->rlist = new_rlist; + c->rlsize *= 2; } - else if (c->rtype == CONN_RTYPE_HINFO) { - if (c->hinfo.naddnl == 0) { - c->ritem = (char*)c->hinfo.value; - c->rlbytes = vleng; - c->rltotal = 0; - } else { - if (c->hinfo.nvalue > 0) { - c->ritem = (char*)c->hinfo.value; - c->rlbytes = vleng < c->hinfo.nvalue - ? vleng : c->hinfo.nvalue; - c->rindex = 0; - } else { - c->ritem = c->hinfo.addnl[0]->ptr; - c->rlbytes = vleng < c->hinfo.addnl[0]->len - ? vleng : c->hinfo.addnl[0]->len; - c->rindex = 1; - } - c->rltotal = vleng; - } + if (len > 0) { + c->rlist[c->rlused].iov_base = (void *)buf; + c->rlist[c->rlused].iov_len = len; + c->rlused++; } - else if (c->rtype == CONN_RTYPE_EINFO) { - if (c->einfo.naddnl == 0) { - c->ritem = (char*)c->einfo.value; - c->rlbytes = vleng; - c->rltotal = 0; - } else { - if (c->einfo.nvalue > 0) { - c->ritem = (char*)c->einfo.value; - c->rlbytes = vleng < c->einfo.nvalue - ? vleng : c->einfo.nvalue; - c->rindex = 0; - } else { - c->ritem = c->einfo.addnl[0]->ptr; - c->rlbytes = vleng < c->einfo.addnl[0]->len - ? vleng : c->einfo.addnl[0]->len; - c->rindex = 1; - } - c->rltotal = vleng; + return 0; +} + +static int add_ritem_mblck(conn *c, mblck_list_t *memblist) +{ + uint32_t rltotal = MBLCK_GET_TOTALSZ(memblist); + uint32_t rlbytes; + + mblck_node_t *membk = MBLCK_GET_HEADBLK(memblist); + c->ritem = MBLCK_GET_BODYPTR(membk); + c->rlbytes = rltotal < MBLCK_GET_BODYLEN(memblist) + ? rltotal : MBLCK_GET_BODYLEN(memblist); + rltotal -= c->rlbytes; + while ((membk = MBLCK_GET_NEXTBLK(membk)) != NULL) { + rlbytes = rltotal < MBLCK_GET_BODYLEN(memblist) + ? rltotal : MBLCK_GET_BODYLEN(memblist); + assert(rlbytes > 0); + if (add_ritem(c, MBLCK_GET_BODYPTR(membk), rlbytes) != 0) { + return -1; } + rltotal -= rlbytes; } + return 0; } -static void ritem_set_next(conn *c) +static int add_ritem_hinfo_ascii(conn *c, item_info *hinfo) { - assert(c->rltotal > 0); + assert(c->protocol == ascii_prot); + c->ritem = (void*)hinfo->value; + c->rlbytes = hinfo->nvalue; + for (int i = 0; i < hinfo->naddnl; i++) { + if (add_ritem(c, hinfo->addnl[i]->ptr, hinfo->addnl[i]->len) != 0) + return -1; + } + return 0; +} + +static int add_ritem_hinfo_bin(conn *c, item_info *hinfo) +{ + assert(c->protocol == binary_prot); + uint32_t rltotal = hinfo->nbytes - 2; + uint32_t rlbytes; + c->ritem = (void*)hinfo->value; + c->rlbytes = rltotal < hinfo->nvalue + ? rltotal : hinfo->nvalue; + rltotal -= c->rlbytes; + for (int i = 0; i < hinfo->naddnl; i++) { + rlbytes = rltotal < hinfo->addnl[i]->len + ? rltotal : hinfo->addnl[i]->len; + if (add_ritem(c, hinfo->addnl[i]->ptr, rlbytes) != 0) + return -1; + rltotal -= rlbytes; + } + return 0; +} - if (c->rtype == CONN_RTYPE_MBLCK) { - c->membk = MBLCK_GET_NEXTBLK(c->membk); - c->ritem = MBLCK_GET_BODYPTR(c->membk); - c->rlbytes = c->rltotal < MBLCK_GET_BODYLEN(&c->memblist) - ? c->rltotal : MBLCK_GET_BODYLEN(&c->memblist); +static int add_ritem_einfo_ascii(conn *c, eitem_info *einfo) +{ + assert(c->protocol == ascii_prot); + c->ritem = (void*)einfo->value; + c->rlbytes = einfo->nvalue; + for (int i = 0; i < einfo->naddnl; i++) { + if (add_ritem(c, einfo->addnl[i]->ptr, einfo->addnl[i]->len) != 0) + return -1; } - else if (c->rtype == CONN_RTYPE_HINFO) { - c->ritem = c->hinfo.addnl[c->rindex]->ptr; - c->rlbytes = c->rltotal < c->hinfo.addnl[c->rindex]->len - ? c->rltotal : c->hinfo.addnl[c->rindex]->len; - c->rindex += 1; + return 0; +} + +static int add_ritem_einfo_bin(conn *c, eitem_info *einfo) +{ + assert(c->protocol == binary_prot); + uint32_t rltotal = einfo->nbytes - 2; + uint32_t rlbytes; + c->ritem = (void*)einfo->value; + c->rlbytes = rltotal < einfo->nvalue + ? rltotal : einfo->nvalue; + rltotal -= c->rlbytes; + for (int i = 0; i < einfo->naddnl; i++) { + rlbytes = rltotal < einfo->addnl[i]->len + ? rltotal : einfo->addnl[i]->len; + if (add_ritem(c, einfo->addnl[i]->ptr, rlbytes) != 0) + return -1; + rltotal -= rlbytes; } - else if (c->rtype == CONN_RTYPE_EINFO) { - c->ritem = c->einfo.addnl[c->rindex]->ptr; - c->rlbytes = c->rltotal < c->einfo.addnl[c->rindex]->len - ? c->rltotal : c->einfo.addnl[c->rindex]->len; - c->rindex += 1; + if (c->rlcurr < c->rlused) { + c->ritem = c->rlist[c->rlcurr].iov_base; + c->rlbytes = c->rlist[c->rlcurr].iov_len; } + return 0; } /** @@ -3015,7 +3065,7 @@ process_get_single(conn *c, char *key, size_t nkey, bool return_cas) } if (it) { - if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { + if (mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo) != true) { mc_engine.v1->release(mc_engine.v0, c, it); return ENGINE_ENOMEM; } @@ -3219,7 +3269,7 @@ static void complete_update_ascii(conn *c) item *it = c->item; ENGINE_ERROR_CODE ret; - if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { + if (mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo) != true) { mc_logger->log(EXTENSION_LOG_WARNING, c, "%d: Failed to get item info\n", c->sfd); out_string(c, "SERVER_ERROR out of memory for getting item info"); @@ -3709,7 +3759,7 @@ static void complete_update_bin(conn *c) item *it = c->item; ENGINE_ERROR_CODE ret; - if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { + if (mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo) != true) { mc_logger->log(EXTENSION_LOG_WARNING, c, "%d: Failed to get item info\n", c->sfd); write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0); @@ -3827,7 +3877,7 @@ static void process_bin_get(conn *c) switch (ret) { case ENGINE_SUCCESS: - if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { + if (mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo) != true) { mc_engine.v1->release(mc_engine.v0, c, it); mc_logger->log(EXTENSION_LOG_WARNING, c, "%d: Failed to get item info\n", c->sfd); @@ -4127,7 +4177,6 @@ static void bin_read_chunk(conn *c, enum bin_substates next_substate, uint32_t c /* preserve the header in the buffer.. */ c->ritem = c->rcurr + sizeof(protocol_binary_request_header); c->rlbytes = chunk; - c->rltotal = 0; conn_set_state(c, conn_nread); } @@ -4244,7 +4293,6 @@ static void process_bin_sasl_auth(conn *c) c->item = data; c->ritem = data->data + nkey; c->rlbytes = vlen; - c->rltotal = 0; conn_set_state(c, conn_nread); c->substate = bin_reading_sasl_auth_data; } @@ -4476,7 +4524,12 @@ static void process_bin_lop_prepare_nread(conn *c) } if (ret == ENGINE_SUCCESS) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_LIST, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); + if (add_ritem_einfo_bin(c, &c->einfo) != 0) { + mc_engine.v1->list_elem_free(mc_engine.v0, c, elem); + ret = ENGINE_ENOMEM; + } + } + if (ret == ENGINE_SUCCESS) { c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = OPERATION_LOP_INSERT; @@ -4864,12 +4917,16 @@ static void process_bin_sop_prepare_nread(conn *c) if (ret == ENGINE_SUCCESS) { if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_SET, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - } else { + if (add_ritem_einfo_bin(c, &c->einfo) != 0) { + mc_engine.v1->set_elem_free(mc_engine.v0, c, elem); + ret = ENGINE_ENOMEM; + } + } else { c->ritem = ((value_item *)elem)->ptr; c->rlbytes = vlen; - c->rltotal = 0; - } + } + } + if (ret == ENGINE_SUCCESS) { c->coll_eitem = (void *)elem; c->coll_ecount = 1; if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { @@ -5334,6 +5391,12 @@ static void process_bin_bop_prepare_nread(conn *c) } if (ret == ENGINE_SUCCESS) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_BTREE, elem, &c->einfo); + if (add_ritem_einfo_bin(c, &c->einfo) != 0) { + mc_engine.v1->btree_elem_free(mc_engine.v0, c, elem); + ret = ENGINE_ENOMEM; + } + } + if (ret == ENGINE_SUCCESS) { if (c->einfo.nscore == 0) { memcpy((void*)c->einfo.score, req->message.body.bkey, sizeof(uint64_t)); } else { @@ -5342,8 +5405,6 @@ static void process_bin_bop_prepare_nread(conn *c) if (c->einfo.neflag > 0) { memcpy((void*)c->einfo.eflag, req->message.body.eflag, c->einfo.neflag); } - - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = (c->cmd == PROTOCOL_BINARY_CMD_BOP_INSERT ? OPERATION_BOP_INSERT @@ -5574,7 +5635,6 @@ static void process_bin_bop_update_prepare_nread(conn *c) if (ret == ENGINE_SUCCESS) { c->ritem = ((value_item *)elem)->ptr; c->rlbytes = vlen; - c->rltotal = 0; c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = OPERATION_BOP_UPDATE; @@ -6042,9 +6102,15 @@ static void process_bin_bop_prepare_nread_keys(conn *c) } } while(0); + if (ret == ENGINE_SUCCESS) { + if (add_ritem_mblck(c, &c->memblist) != 0) { + free((void*)elem); + mblck_list_free(&c->thread->mblck_pool, &c->memblist); + ret = ENGINE_ENOMEM; + } + } if (ret == ENGINE_SUCCESS) { c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 0; c->coll_op = (c->cmd==PROTOCOL_BINARY_CMD_BOP_MGET ? OPERATION_BOP_MGET : OPERATION_BOP_SMGET); @@ -7059,7 +7125,6 @@ static void dispatch_bin_command(conn *c) bin_read_chunk(c, bin_reading_packet, c->binary_header.request.bodylen); } } - if (protocol_error) handle_binary_protocol_error(c); } @@ -7117,7 +7182,8 @@ static void process_bin_update(conn *c) realtime(req->message.body.expiration), c->binary_header.request.cas); if (ret == ENGINE_SUCCESS) { - if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { + if (mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo) != true || + add_ritem_hinfo_bin(c, &c->hinfo) != 0) { mc_engine.v1->release(mc_engine.v0, c, it); write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); ret = ENGINE_ENOMEM; @@ -7136,7 +7202,6 @@ static void process_bin_update(conn *c) } c->item = it; - ritem_set_first(c, CONN_RTYPE_HINFO, vlen); conn_set_state(c, conn_nread); c->substate = bin_read_set_value; } @@ -7198,7 +7263,8 @@ static void process_bin_append_prepend(conn *c) ret = mc_engine.v1->allocate(mc_engine.v0, c, &it, key, nkey, vlen+2, 0, 0, c->binary_header.request.cas); if (ret == ENGINE_SUCCESS) { - if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { + if (mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo) != true || + add_ritem_hinfo_bin(c, &c->hinfo) != 0) { mc_engine.v1->release(mc_engine.v0, c, it); write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); } else { @@ -7210,7 +7276,6 @@ static void process_bin_append_prepend(conn *c) assert(0); c->item = it; - ritem_set_first(c, CONN_RTYPE_HINFO, vlen); conn_set_state(c, conn_nread); c->substate = bin_read_set_value; } @@ -8395,9 +8460,14 @@ static void process_prepare_nread_keys(conn *c, uint32_t vlen, uint32_t kcnt, bo if (mblck_list_alloc(&c->thread->mblck_pool, 1, vlen, &c->memblist) < 0) { ret = ENGINE_ENOMEM; } + if (ret == ENGINE_SUCCESS) { + if (add_ritem_mblck(c, &c->memblist) != 0) { + mblck_list_free(&c->thread->mblck_pool, &c->memblist); + ret = ENGINE_ENOMEM; + } + } if (ret == ENGINE_SUCCESS) { c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, vlen); c->coll_op = (return_cas ? OPERATION_MGETS : OPERATION_MGET); conn_set_state(c, conn_nread); } else { @@ -8485,13 +8555,13 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken ret = mc_engine.v1->allocate(mc_engine.v0, c, &it, key, nkey, vlen, htonl(flags), realtime(exptime), req_cas_id); if (ret == ENGINE_SUCCESS) { - if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { + if (mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo) != true || + add_ritem_hinfo_ascii(c, &c->hinfo) != 0) { mc_engine.v1->release(mc_engine.v0, c, it); - out_string(c, "SERVER_ERROR error getting item data"); + out_string(c, "SERVER_ERROR out of memory"); ret = ENGINE_ENOMEM; } else { c->item = it; - ritem_set_first(c, CONN_RTYPE_HINFO, vlen); c->store_op = store_op; conn_set_state(c, conn_nread); } @@ -9546,7 +9616,6 @@ static void process_extension_command(conn *c, token_t *tokens, size_t ntokens) } else { c->ritem = ptr; c->rlbytes = nbytes; - c->rltotal = 0; c->ascii_cmd = cmd; /* NOT SUPPORTED YET! */ conn_set_state(c, conn_nread); @@ -9856,7 +9925,7 @@ static void process_keyscan_command(conn *c, token_t *tokens, const size_t ntoke char *attrptr = response + KEYSCAN_RESPONSE_HEADER_MAX_LENGTH; for (i = 0; i < item_count; i++) { item *it = (item *)c->ilist[i]; - if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { + if (mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo) != true) { ret = ENGINE_ENOMEM; break; } sprintf(attrptr, " %c %d\r\n", @@ -10260,7 +10329,12 @@ static void process_lop_prepare_nread(conn *c, int cmd, size_t vlen, } if (ret == ENGINE_SUCCESS) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_LIST, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); + if (add_ritem_einfo_ascii(c, &c->einfo) != 0) { + mc_engine.v1->list_elem_free(mc_engine.v0, c, elem); + ret = ENGINE_ENOMEM; + } + } + if (ret == ENGINE_SUCCESS) { c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = OPERATION_LOP_INSERT; @@ -10657,12 +10731,16 @@ static void process_sop_prepare_nread(conn *c, int cmd, size_t vlen, char *key, if (ret == ENGINE_SUCCESS) { if (cmd == (int)OPERATION_SOP_INSERT) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_SET, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); + if (add_ritem_einfo_ascii(c, &c->einfo) != 0) { + mc_engine.v1->set_elem_free(mc_engine.v0, c, elem); + ret = ENGINE_ENOMEM; + } } else { c->ritem = ((value_item *)elem)->ptr; c->rlbytes = vlen; - c->rltotal = 0; } + } + if (ret == ENGINE_SUCCESS) { c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = cmd; @@ -11335,7 +11413,6 @@ static void process_bop_update_prepare_nread(conn *c, int cmd, if (ret == ENGINE_SUCCESS) { c->ritem = ((value_item *)elem)->ptr; c->rlbytes = vlen; - c->rltotal = 0; c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = cmd; @@ -11347,7 +11424,6 @@ static void process_bop_update_prepare_nread(conn *c, int cmd, STATS_CMD_NOKEY(c, bop_update); if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); else out_string(c, "SERVER_ERROR out of memory"); - /* swallow the data line */ c->sbytes = vlen; if (c->state == conn_write) { @@ -11374,10 +11450,15 @@ static void process_bop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, } if (ret == ENGINE_SUCCESS) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_BTREE, elem, &c->einfo); + if (add_ritem_einfo_ascii(c, &c->einfo) != 0) { + mc_engine.v1->btree_elem_free(mc_engine.v0, c, elem); + ret = ENGINE_ENOMEM; + } + } + if (ret == ENGINE_SUCCESS) { memcpy((void*)c->einfo.score, bkey, (c->einfo.nscore==0 ? sizeof(uint64_t) : c->einfo.nscore)); if (c->einfo.neflag > 0) memcpy((void*)c->einfo.eflag, eflag, c->einfo.neflag); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = cmd; /* OPERATION_BOP_INSERT | OPERATION_BOP_UPSERT */ @@ -11467,9 +11548,15 @@ static void process_bop_prepare_nread_keys(conn *c, int cmd, uint32_t vlen, uint ret = ENGINE_ENOMEM; } } + if (ret == ENGINE_SUCCESS) { + if (add_ritem_mblck(c, &c->memblist) != 0) { + free((void*)elem); + mblck_list_free(&c->thread->mblck_pool, &c->memblist); + ret = ENGINE_ENOMEM; + } + } if (ret == ENGINE_SUCCESS) { c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, vlen); c->coll_eitem = (void *)elem; c->coll_ecount = 0; c->coll_op = cmd; @@ -11833,12 +11920,16 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, if (ret == ENGINE_SUCCESS) { if (cmd == OPERATION_MOP_INSERT) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_MAP, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); + if (add_ritem_einfo_ascii(c, &c->einfo) != 0) { + mc_engine.v1->map_elem_free(mc_engine.v0, c, elem); + ret = ENGINE_ENOMEM; + } } else { c->ritem = ((value_item *)elem)->ptr; c->rlbytes = vlen; - c->rltotal = 0; } + } + if (ret == ENGINE_SUCCESS) { c->coll_eitem = (void *)elem; c->coll_ecount = 1; c->coll_op = cmd; @@ -11877,9 +11968,14 @@ static void process_mop_prepare_nread_fields(conn *c, int cmd, char *key, size_t if (mblck_list_alloc(&c->thread->mblck_pool, 1, flen, &c->memblist) < 0) { ret = ENGINE_ENOMEM; } + if (ret == ENGINE_SUCCESS) { + if (add_ritem_mblck(c, &c->memblist) != 0) { + mblck_list_free(&c->thread->mblck_pool, &c->memblist); + ret = ENGINE_ENOMEM; + } + } if (ret == ENGINE_SUCCESS) { c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, flen); c->coll_ecount = 1; c->coll_op = cmd; c->coll_lenkeys = flen; @@ -13765,9 +13861,39 @@ bool conn_swallow(conn *c) bool conn_nread(conn *c) { - ssize_t res; + ssize_t len; + bool error = false; + do { + if (c->rlbytes == 0) { + c->ritem = c->rlist[c->rlcurr].iov_base; + c->rlbytes = c->rlist[c->rlcurr].iov_len; + c->rlcurr++; + } + while (c->rlbytes > 0) { + if (c->rbytes > 0) { + len = c->rbytes < c->rlbytes ? c->rbytes : c->rlbytes; + if (c->ritem != c->rcurr) + memmove(c->ritem, c->rcurr, len); + c->rcurr += len; + c->rbytes -= len; + } else { + len = read(c->sfd, c->ritem, c->rlbytes); + if (len > 0) { + STATS_ADD(c, bytes_read, len); + if (c->rcurr == c->ritem) + c->rcurr += len; + } else { + error = true; + break; + } + } + c->ritem += len; + c->rlbytes -= len; + } + } while (!error && c->rlcurr < c->rlused); - if (c->rlbytes == 0) { + if (!error) { + c->rlcurr = c->rlused = 0; complete_nread(c); /* complete_nread eventually calls write functions @@ -13782,83 +13908,37 @@ bool conn_nread(conn *c) return false; /* blocked */ } } - return true; - } - - /* first check if we have leftovers in the conn_read buffer */ - while (c->rbytes > 0) { - int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; - if (c->ritem != c->rcurr) { - memmove(c->ritem, c->rcurr, tocopy); - } - c->ritem += tocopy; - c->rlbytes -= tocopy; - c->rcurr += tocopy; - c->rbytes -= tocopy; - if (c->rltotal > 0) { /* string block read */ - c->rltotal -= tocopy; - if (c->rlbytes == 0 && c->rltotal > 0) { - ritem_set_next(c); - continue; + } else { + if (len == 0) { /* end of stream */ + if (settings.verbose > 0) { + mc_logger->log(EXTENSION_LOG_INFO, c, + "Couldn't read in conn_nread: end of stream.\n"); } - } - if (c->rlbytes == 0) { - return true; - } - } - - /* now try reading from the socket */ - res = read(c->sfd, c->ritem, c->rlbytes); - if (res > 0) { - STATS_ADD(c, bytes_read, res); - if (c->rcurr == c->ritem) { - c->rcurr += res; - } - c->ritem += res; - c->rlbytes -= res; - if (c->rltotal > 0) { - c->rltotal -= res; - if (c->rlbytes == 0 && c->rltotal > 0) { - ritem_set_next(c); + } else if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (!update_event(c, EV_READ | EV_PERSIST)) { + mc_logger->log(EXTENSION_LOG_WARNING, c, + "Couldn't update event in conn_nread.\n"); + } else { + return false; } - } - return true; - } - if (res == 0) { /* end of stream */ - if (settings.verbose > 0) { + } else if (errno == ENOTCONN || errno == ECONNRESET){ mc_logger->log(EXTENSION_LOG_INFO, c, - "Couldn't read in conn_nread: end of stream.\n"); - } - conn_set_state(c, conn_closing); - return true; - } - if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - if (!update_event(c, EV_READ | EV_PERSIST)) { + "Failed to read in conn_nread, and not due to blocking: err=(%d:%s), client_ip: %s\n" + "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n", + errno, strerror(errno), c->client_ip, + (long)c->rcurr, (long)c->ritem, (long)c->rbuf, + (int)c->rlbytes, (int)c->rsize); + } else { + /* otherwise we have a real error, on which we close the connection */ mc_logger->log(EXTENSION_LOG_WARNING, c, - "Couldn't update event in conn_nread.\n"); - conn_set_state(c, conn_closing); - return true; + "Failed to read in conn_nread, and not due to blocking: err=(%d:%s), client_ip: %s\n" + "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n", + errno, strerror(errno), c->client_ip, + (long)c->rcurr, (long)c->ritem, (long)c->rbuf, + (int)c->rlbytes, (int)c->rsize); } - return false; - } - - if (errno != ENOTCONN && errno != ECONNRESET) { - /* otherwise we have a real error, on which we close the connection */ - mc_logger->log(EXTENSION_LOG_WARNING, c, - "Failed to read in conn_nread, and not due to blocking: err=(%d:%s), client_ip: %s\n" - "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n", - errno, strerror(errno), c->client_ip, - (long)c->rcurr, (long)c->ritem, (long)c->rbuf, - (int)c->rlbytes, (int)c->rsize); - } else { - mc_logger->log(EXTENSION_LOG_INFO, c, - "Failed to read in conn_nread, and not due to blocking: err=(%d:%s), client_ip: %s\n" - "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n", - errno, strerror(errno), c->client_ip, - (long)c->rcurr, (long)c->ritem, (long)c->rbuf, - (int)c->rlbytes, (int)c->rsize); + conn_set_state(c, conn_closing); } - conn_set_state(c, conn_closing); return true; } diff --git a/memcached.h b/memcached.h index b2a49df1d..153e8c6ee 100644 --- a/memcached.h +++ b/memcached.h @@ -76,11 +76,15 @@ /** Initial number of sendmsg() argument structures to allocate. */ #define MSG_LIST_INITIAL 10 +/** Initial size of list of items being read. */ +#define RITEM_LIST_INITIAL 10 + /** High water marks for buffer shrinking */ #define READ_BUFFER_HIGHWAT 8192 #define ITEM_LIST_HIGHWAT 400 #define IOV_LIST_HIGHWAT 600 #define MSG_LIST_HIGHWAT 100 +#define RITEM_LIST_HIGHWAT 100 /* Binary protocol stuff */ #define MIN_BIN_PKT_LENGTH 16 @@ -261,12 +265,6 @@ typedef bool (*STATE_FUNC)(conn *); /** * The structure representing a connection into memcached. */ -/* rtype in connection */ -#define CONN_RTYPE_NONE 0 -#define CONN_RTYPE_MBLCK 1 -#define CONN_RTYPE_HINFO 2 -#define CONN_RTYPE_EINFO 3 - struct conn { int sfd; short nevents; @@ -290,13 +288,16 @@ struct conn { STATE_FUNC write_and_go; void *write_and_free; /** free this memory after finishing writing */ - int rtype; /* CONN_RTYPE_XXXXX */ - int rindex; /* used when rtype is HINFO or EINFO */ - char *ritem; /** when we read in an item's value, it goes here */ - uint32_t rlbytes; + /** data for nread state */ + struct iovec *rlist; /** list to read data except commands if needed */ + uint32_t rlsize; /** total allocated size of rlist */ + uint32_t rlcurr; /** element in rlist[] being read now */ + uint32_t rlused; /** number of elements used in rlist[] */ + + char *ritem; /** data pointer being read currently */ + uint32_t rlbytes; /** data length being read currently */ + /* use memory blocks */ - uint32_t rltotal; /* Used when read data with memory block */ - mblck_node_t *membk; /* current memory block pointer */ mblck_list_t memblist; /* (key or field) string memory block list */ /* hash item and elem item info */