diff --git a/memcached.c b/memcached.c index c7b65f4d7..b5e005085 100644 --- a/memcached.c +++ b/memcached.c @@ -528,6 +528,17 @@ static bool conn_reset_buffersize(conn *c) } } + if (c->rlsize != RITEM_LIST_INITIAL) { + void *ptr = malloc(sizeof(struct iovec) * RITEM_LIST_INITIAL); + if (ptr != NULL) { + free(c->rlist); + c->rlist = ptr; + c->rlsize = RITEM_LIST_INITIAL; + } else { + ret = false; + } + } + return ret; } @@ -555,6 +566,7 @@ static int conn_constructor(void *buffer, void *unused1, int unused2) free(c->suffixlist); free(c->iov); free(c->msglist); + free(c->rlist); mc_logger->log(EXTENSION_LOG_WARNING, NULL, "Failed to allocate buffers for connection\n"); return 1; @@ -583,6 +595,7 @@ static void conn_destructor(void *buffer, void *unused) free(c->suffixlist); free(c->iov); free(c->msglist); + free(c->rlist); LOCK_STATS(); mc_stats.conn_structs--; @@ -655,11 +668,8 @@ conn *conn_new(const int sfd, STATE_FUNC init_state, c->rbytes = c->wbytes = 0; c->wcurr = c->wbuf; c->rcurr = c->rbuf; - c->rtype = CONN_RTYPE_NONE; - c->rindex = 0; /* used when rtype is HINFO or EINFO */ - c->ritem = 0; - c->rlbytes = 0; - c->rltotal = 0; /* used when read with multiple mem blocks */ + c->rlcurr = 0; + c->rlused = 0; #ifdef SCAN_COMMAND c->pcurr = c->ilist; #endif @@ -1010,85 +1020,89 @@ static void conn_shrink(conn *c) c->iov = newbuf; c->iovsize = IOV_LIST_INITIAL; } + /* TODO check error condition? */ + } + + if (c->rlsize > RITEM_LIST_HIGHWAT) { + struct iovec *newbuf = (struct iovec *) realloc((void *)c->rlist, RITEM_LIST_INITIAL * sizeof(c->rlist[0])); + if (newbuf) { + c->rlist = newbuf; + c->rlsize = IOV_LIST_INITIAL; + } /* TODO check return value */ } } -static void ritem_set_first(conn *c, int rtype, int vleng) -{ - c->rtype = rtype; - if (c->rtype == CONN_RTYPE_MBLCK) { - c->membk = MBLCK_GET_HEADBLK(&c->memblist); - c->ritem = MBLCK_GET_BODYPTR(c->membk); - c->rlbytes = vleng < MBLCK_GET_BODYLEN(&c->memblist) - ? vleng : MBLCK_GET_BODYLEN(&c->memblist); - c->rltotal = vleng; - } - else if (c->rtype == CONN_RTYPE_HINFO) { - if (c->hinfo.naddnl == 0) { - c->ritem = (char*)c->hinfo.value; - c->rlbytes = vleng; - c->rltotal = 0; - } else { - if (c->hinfo.nvalue > 0) { - c->ritem = (char*)c->hinfo.value; - c->rlbytes = vleng < c->hinfo.nvalue - ? vleng : c->hinfo.nvalue; - c->rindex = 0; - } else { - c->ritem = c->hinfo.addnl[0]->ptr; - c->rlbytes = vleng < c->hinfo.addnl[0]->len - ? vleng : c->hinfo.addnl[0]->len; - c->rindex = 1; - } - c->rltotal = vleng; - } +static int add_ritem(conn *c, void *buf, int len) { + if (c->rlused >= c->rlsize) { + struct iovec *new_rlist = (struct iovec *)realloc(c->rlist, + (c->rlsize * 2) * sizeof(struct iovec)); + if (! new_rlist) + return -1; + c->rlist = new_rlist; + c->rlsize *= 2; } - else if (c->rtype == CONN_RTYPE_EINFO) { - if (c->einfo.naddnl == 0) { - c->ritem = (char*)c->einfo.value; - c->rlbytes = vleng; - c->rltotal = 0; - } else { - if (c->einfo.nvalue > 0) { - c->ritem = (char*)c->einfo.value; - c->rlbytes = vleng < c->einfo.nvalue - ? vleng : c->einfo.nvalue; - c->rindex = 0; - } else { - c->ritem = c->einfo.addnl[0]->ptr; - c->rlbytes = vleng < c->einfo.addnl[0]->len - ? vleng : c->einfo.addnl[0]->len; - c->rindex = 1; - } - c->rltotal = vleng; - } + if (len != 0) { + c->rlist[c->rlused].iov_base = buf; + c->rlist[c->rlused].iov_len = len; + c->rlused++; } + return 0; } -static void ritem_set_next(conn *c) -{ - assert(c->rltotal > 0); - if (c->rtype == CONN_RTYPE_MBLCK) { - c->membk = MBLCK_GET_NEXTBLK(c->membk); - c->ritem = MBLCK_GET_BODYPTR(c->membk); - c->rlbytes = c->rltotal < MBLCK_GET_BODYLEN(&c->memblist) - ? c->rltotal : MBLCK_GET_BODYLEN(&c->memblist); +static int add_ritem_mblck(conn *c, mblck_list_t *memblist) { + uint32_t rltotal = MBLCK_GET_ITEMCNT(memblist); + uint32_t rlbytes; + for (mblck_node_t *membk = MBLCK_GET_HEADBLK(memblist); + membk != NULL && rltotal > 0; + membk = MBLCK_GET_NEXTBLK(membk)) { + rlbytes = rltotal < MBLCK_GET_BODYLEN(memblist) + ? rltotal : MBLCK_GET_BODYLEN(memblist); + if (add_ritem(c, MBLCK_GET_BODYPTR(membk), rlbytes) != 0) + return -1; + rltotal -= rlbytes; } - else if (c->rtype == CONN_RTYPE_HINFO) { - c->ritem = c->hinfo.addnl[c->rindex]->ptr; - c->rlbytes = c->rltotal < c->hinfo.addnl[c->rindex]->len - ? c->rltotal : c->hinfo.addnl[c->rindex]->len; - c->rindex += 1; + return 0; +} + +static int add_ritem_hinfo(conn *c, item_info *hinfo) { + uint32_t rltotal = hinfo->nbytes; + uint32_t rlbytes; + if (c->protocol == binary_prot) rltotal -= 2; + rlbytes = rltotal < hinfo->nvalue + ? rltotal : hinfo->nvalue; + if (add_ritem(c, (void *)hinfo->value, rlbytes) != 0) + return -1; + rltotal -= rlbytes; + for (int i = 0; i < hinfo->naddnl && rltotal > 0; i++) { + rlbytes = rltotal < hinfo->addnl[i]->len + ? rltotal : hinfo->addnl[i]->len; + if (add_ritem(c, hinfo->addnl[i]->ptr, rlbytes) != 0) + return -1; + rltotal -= rlbytes; } - else if (c->rtype == CONN_RTYPE_EINFO) { - c->ritem = c->einfo.addnl[c->rindex]->ptr; - c->rlbytes = c->rltotal < c->einfo.addnl[c->rindex]->len - ? c->rltotal : c->einfo.addnl[c->rindex]->len; - c->rindex += 1; + return 0; +} + +static int add_ritem_einfo(conn *c, eitem_info *einfo) { + uint32_t vleng = einfo->nbytes; + uint32_t rlbytes; + if (c->protocol == binary_prot) vleng -= 2; + rlbytes = vleng < einfo->nvalue + ? vleng : einfo->nvalue; + if (add_ritem(c, (void *)einfo->value, rlbytes) != 0) + return -1; + vleng -= rlbytes; + for (int i = 0; i < einfo->naddnl && vleng > 0; i++) { + rlbytes = vleng < einfo->addnl[i]->len + ? vleng : einfo->addnl[i]->len; + if (add_ritem(c, einfo->addnl[i]->ptr, rlbytes) != 0) + return -1; + vleng -= rlbytes; } + return 0; } /** @@ -4125,9 +4139,15 @@ static void bin_read_chunk(conn *c, enum bin_substates next_substate, uint32_t c } /* preserve the header in the buffer.. */ - c->ritem = c->rcurr + sizeof(protocol_binary_request_header); - c->rlbytes = chunk; - c->rltotal = 0; + if (add_ritem(c, c->rcurr + sizeof(protocol_binary_request_header), chunk) != 0) + { + if (settings.verbose) { + mc_logger->log(EXTENSION_LOG_WARNING, c, + "%d: Failed to grow rlist. closing connection\n", c->sfd); + } + conn_set_state(c, conn_closing); + return; + } conn_set_state(c, conn_nread); } @@ -4242,9 +4262,10 @@ static void process_bin_sasl_auth(conn *c) memcpy(data->data, key, nkey); c->item = data; - c->ritem = data->data + nkey; - c->rlbytes = vlen; - c->rltotal = 0; + if (add_ritem(c, data->data + nkey, vlen) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + return; + } conn_set_state(c, conn_nread); c->substate = bin_reading_sasl_auth_data; } @@ -4303,7 +4324,6 @@ static void process_bin_complete_sasl_auth(conn *c) free(c->item); c->item = NULL; - c->ritem = NULL; if (settings.verbose) { mc_logger->log(EXTENSION_LOG_INFO, c, @@ -4476,27 +4496,27 @@ static void process_bin_lop_prepare_nread(conn *c) } if (ret == ENGINE_SUCCESS) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_LIST, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - c->coll_eitem = (void *)elem; - c->coll_ecount = 1; - c->coll_op = OPERATION_LOP_INSERT; - c->coll_index = req->message.body.index; - if (req->message.body.create) { - c->coll_attrp = &c->coll_attr_space; /* create if not exist */ - c->coll_attrp->flags = req->message.body.flags; - c->coll_attrp->exptime = realtime(req->message.body.exptime); - c->coll_attrp->maxcount = req->message.body.maxcount; - c->coll_attrp->readable = 1; + if (add_ritem_einfo(c, &c->einfo) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; } else { - c->coll_attrp = NULL; + c->coll_eitem = (void *)elem; + c->coll_ecount = 1; + c->coll_op = OPERATION_LOP_INSERT; + c->coll_index = req->message.body.index; + if (req->message.body.create) { + c->coll_attrp = &c->coll_attr_space; /* create if not exist */ + c->coll_attrp->flags = req->message.body.flags; + c->coll_attrp->exptime = realtime(req->message.body.exptime); + c->coll_attrp->maxcount = req->message.body.maxcount; + c->coll_attrp->readable = 1; + } else { + c->coll_attrp = NULL; + } + conn_set_state(c, conn_nread); + c->substate = bin_reading_lop_nread_complete; } - conn_set_state(c, conn_nread); - c->substate = bin_reading_lop_nread_complete; } else { - if (settings.detail_enabled) { - stats_prefix_record_lop_insert(key, nkey, false); - } - STATS_CMD_NOKEY(c, lop_insert); if (ret == ENGINE_E2BIG) write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); else if (ret == ENGINE_ENOMEM) @@ -4504,6 +4524,12 @@ static void process_bin_lop_prepare_nread(conn *c) else handle_unexpected_errorcode_bin(c, __func__, ret, vlen); } + if (ret != ENGINE_SUCCESS) { + if (settings.detail_enabled) { + stats_prefix_record_lop_insert(key, nkey, false); + } + STATS_CMD_NOKEY(c, lop_insert); + } } static void process_bin_lop_insert_complete(conn *c) @@ -4864,40 +4890,52 @@ static void process_bin_sop_prepare_nread(conn *c) if (ret == ENGINE_SUCCESS) { if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_SET, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - } else { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; - } - c->coll_eitem = (void *)elem; - c->coll_ecount = 1; - if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { - protocol_binary_request_sop_insert* req = binary_get_request(c); - if (req->message.body.create) { - req->message.body.exptime = ntohl(req->message.body.exptime); - req->message.body.maxcount = ntohl(req->message.body.maxcount); + if (add_ritem_einfo(c, &c->einfo) != 0){ + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0); + ret = ENGINE_ENOMEM; } - c->coll_op = OPERATION_SOP_INSERT; - if (req->message.body.create) { - c->coll_attrp = &c->coll_attr_space; /* create if not exist */ - c->coll_attrp->flags = req->message.body.flags; - c->coll_attrp->exptime = realtime(req->message.body.exptime); - c->coll_attrp->maxcount = req->message.body.maxcount; - c->coll_attrp->readable = 1; - } else { - c->coll_attrp = NULL; + } else if (add_ritem(c, ((value_item *)elem)->ptr, vlen) != 0){ + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0); + ret = ENGINE_ENOMEM; + } + if (ret == ENGINE_SUCCESS) { + c->coll_eitem = (void *)elem; + c->coll_ecount = 1; + if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { + protocol_binary_request_sop_insert* req = binary_get_request(c); + if (req->message.body.create) { + req->message.body.exptime = ntohl(req->message.body.exptime); + req->message.body.maxcount = ntohl(req->message.body.maxcount); + } + c->coll_op = OPERATION_SOP_INSERT; + if (req->message.body.create) { + c->coll_attrp = &c->coll_attr_space; /* create if not exist */ + c->coll_attrp->flags = req->message.body.flags; + c->coll_attrp->exptime = realtime(req->message.body.exptime); + c->coll_attrp->maxcount = req->message.body.maxcount; + c->coll_attrp->readable = 1; + } else { + c->coll_attrp = NULL; + } + } else if (c->cmd == PROTOCOL_BINARY_CMD_SOP_DELETE) { + protocol_binary_request_sop_delete* req = binary_get_request(c); + c->coll_op = OPERATION_SOP_DELETE; + c->coll_drop = (req->message.body.drop ? true : false); + } else { /* PROTOCOL_BINARY_CMD_SOP_EXIST */ + c->coll_op = OPERATION_SOP_EXIST; } - } else if (c->cmd == PROTOCOL_BINARY_CMD_SOP_DELETE) { - protocol_binary_request_sop_delete* req = binary_get_request(c); - c->coll_op = OPERATION_SOP_DELETE; - c->coll_drop = (req->message.body.drop ? true : false); - } else { /* PROTOCOL_BINARY_CMD_SOP_EXIST */ - c->coll_op = OPERATION_SOP_EXIST; + conn_set_state(c, conn_nread); + c->substate = bin_reading_sop_nread_complete; } - conn_set_state(c, conn_nread); - c->substate = bin_reading_sop_nread_complete; } else { + if (ret == ENGINE_E2BIG) + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); + else if (ret == ENGINE_ENOMEM) + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + else + handle_unexpected_errorcode_bin(c, __func__, ret, vlen); + } + if (ret != ENGINE_SUCCESS){ if (c->cmd == PROTOCOL_BINARY_CMD_SOP_INSERT) { if (settings.detail_enabled) stats_prefix_record_sop_insert(key, nkey, false); @@ -4911,12 +4949,6 @@ static void process_bin_sop_prepare_nread(conn *c) stats_prefix_record_sop_exist(key, nkey, false); STATS_CMD_NOKEY(c, sop_exist); } - if (ret == ENGINE_E2BIG) - write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); - else if (ret == ENGINE_ENOMEM) - write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); - else - handle_unexpected_errorcode_bin(c, __func__, ret, vlen); } } @@ -5334,36 +5366,36 @@ static void process_bin_bop_prepare_nread(conn *c) } if (ret == ENGINE_SUCCESS) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_BTREE, elem, &c->einfo); - if (c->einfo.nscore == 0) { - memcpy((void*)c->einfo.score, req->message.body.bkey, sizeof(uint64_t)); + if (add_ritem_einfo(c, &c->einfo) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; } else { - memcpy((void*)c->einfo.score, req->message.body.bkey, c->einfo.nscore); - } - if (c->einfo.neflag > 0) { - memcpy((void*)c->einfo.eflag, req->message.body.eflag, c->einfo.neflag); - } + if (c->einfo.nscore == 0) { + memcpy((void*)c->einfo.score, req->message.body.bkey, sizeof(uint64_t)); + } else { + memcpy((void*)c->einfo.score, req->message.body.bkey, c->einfo.nscore); + } + if (c->einfo.neflag > 0) { + memcpy((void*)c->einfo.eflag, req->message.body.eflag, c->einfo.neflag); + } - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - c->coll_eitem = (void *)elem; - c->coll_ecount = 1; - c->coll_op = (c->cmd == PROTOCOL_BINARY_CMD_BOP_INSERT ? OPERATION_BOP_INSERT - : OPERATION_BOP_UPSERT); - if (req->message.body.create) { - c->coll_attrp = &c->coll_attr_space; /* create if not exist */ - c->coll_attrp->flags = req->message.body.flags; - c->coll_attrp->exptime = realtime(req->message.body.exptime); - c->coll_attrp->maxcount = req->message.body.maxcount; - c->coll_attrp->readable = 1; - } else { - c->coll_attrp = NULL; + c->coll_eitem = (void *)elem; + c->coll_ecount = 1; + c->coll_op = (c->cmd == PROTOCOL_BINARY_CMD_BOP_INSERT ? OPERATION_BOP_INSERT + : OPERATION_BOP_UPSERT); + if (req->message.body.create) { + c->coll_attrp = &c->coll_attr_space; /* create if not exist */ + c->coll_attrp->flags = req->message.body.flags; + c->coll_attrp->exptime = realtime(req->message.body.exptime); + c->coll_attrp->maxcount = req->message.body.maxcount; + c->coll_attrp->readable = 1; + } else { + c->coll_attrp = NULL; + } + conn_set_state(c, conn_nread); + c->substate = bin_reading_bop_nread_complete; } - conn_set_state(c, conn_nread); - c->substate = bin_reading_bop_nread_complete; } else { - if (settings.detail_enabled) { - stats_prefix_record_bop_insert(key, nkey, false); - } - STATS_CMD_NOKEY(c, bop_insert); if (ret == ENGINE_E2BIG) write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); else if (ret == ENGINE_ENOMEM) @@ -5371,6 +5403,12 @@ static void process_bin_bop_prepare_nread(conn *c) else handle_unexpected_errorcode_bin(c, __func__, ret, vlen); } + if (ret != ENGINE_SUCCESS) { + if (settings.detail_enabled) { + stats_prefix_record_bop_insert(key, nkey, false); + } + STATS_CMD_NOKEY(c, bop_insert); + } } static void process_bin_bop_insert_complete(conn *c) @@ -5572,19 +5610,17 @@ static void process_bin_bop_update_prepare_nread(conn *c) ((value_item*)elem)->len = vlen + 2; } if (ret == ENGINE_SUCCESS) { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; - c->coll_eitem = (void *)elem; - c->coll_ecount = 1; - c->coll_op = OPERATION_BOP_UPDATE; - conn_set_state(c, conn_nread); - c->substate = bin_reading_bop_update_nread_complete; - } else { - if (settings.detail_enabled) { - stats_prefix_record_bop_update(key, nkey, false); + if (add_ritem(c, ((value_item *)elem)->ptr, vlen) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; + } else { + c->coll_eitem = (void *)elem; + c->coll_ecount = 1; + c->coll_op = OPERATION_BOP_UPDATE; + conn_set_state(c, conn_nread); + c->substate = bin_reading_bop_update_nread_complete; } - STATS_CMD_NOKEY(c, bop_update); + } else { /* ret == ENGINE_E2BIG || ret == ENGINE_ENOMEM */ if (ret == ENGINE_E2BIG) write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen); @@ -5593,6 +5629,12 @@ static void process_bin_bop_update_prepare_nread(conn *c) else handle_unexpected_errorcode_bin(c, __func__, ret, vlen); } + if (ret != ENGINE_ENOMEM) { + if (settings.detail_enabled) { + stats_prefix_record_bop_update(key, nkey, false); + } + STATS_CMD_NOKEY(c, bop_update); + } } static void process_bin_bop_delete(conn *c) @@ -6043,13 +6085,17 @@ static void process_bin_bop_prepare_nread_keys(conn *c) } while(0); if (ret == ENGINE_SUCCESS) { - c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, vlen); - c->coll_eitem = (void *)elem; - c->coll_ecount = 0; - c->coll_op = (c->cmd==PROTOCOL_BINARY_CMD_BOP_MGET ? OPERATION_BOP_MGET : OPERATION_BOP_SMGET); - conn_set_state(c, conn_nread); - c->substate = bin_reading_bop_nread_keys_complete; + if (add_ritem_mblck(c, &c->memblist) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + return; + } else { + c->coll_strkeys = (void*)&c->memblist; + c->coll_eitem = (void *)elem; + c->coll_ecount = 0; + c->coll_op = (c->cmd==PROTOCOL_BINARY_CMD_BOP_MGET ? OPERATION_BOP_MGET : OPERATION_BOP_SMGET); + conn_set_state(c, conn_nread); + c->substate = bin_reading_bop_nread_keys_complete; + } } else { /* ret == ENGINE_EBADVALUE || ret == ENGINE_ENOMEM */ if (ret == ENGINE_EBADVALUE) @@ -7121,6 +7167,9 @@ static void process_bin_update(conn *c) mc_engine.v1->release(mc_engine.v0, c, it); write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); ret = ENGINE_ENOMEM; + } else if (add_ritem_hinfo(c, &c->hinfo) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + ret = ENGINE_ENOMEM; } else { if (c->cmd == PROTOCOL_BINARY_CMD_ADD) c->store_op = OPERATION_ADD; @@ -7136,7 +7185,6 @@ static void process_bin_update(conn *c) } c->item = it; - ritem_set_first(c, CONN_RTYPE_HINFO, vlen); conn_set_state(c, conn_nread); c->substate = bin_read_set_value; } @@ -7201,6 +7249,8 @@ static void process_bin_append_prepend(conn *c) if (!mc_engine.v1->get_item_info(mc_engine.v0, c, it, &c->hinfo)) { mc_engine.v1->release(mc_engine.v0, c, it); write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); + } else if (add_ritem_hinfo(c, &c->hinfo) != 0) { + write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen); } else { if (c->cmd == PROTOCOL_BINARY_CMD_APPEND) c->store_op = OPERATION_APPEND; @@ -7210,7 +7260,6 @@ static void process_bin_append_prepend(conn *c) assert(0); c->item = it; - ritem_set_first(c, CONN_RTYPE_HINFO, vlen); conn_set_state(c, conn_nread); c->substate = bin_read_set_value; } @@ -7529,6 +7578,8 @@ static int try_read_command_binary(conn *c) c->msgcurr = 0; c->msgused = 0; c->iovused = 0; + c->rlcurr = 0; + c->rlused = 0; int ret = add_msghdr(c); assert(ret == 0); @@ -8392,12 +8443,12 @@ static void process_prepare_nread_keys(conn *c, uint32_t vlen, uint32_t kcnt, bo ENGINE_ERROR_CODE ret = ENGINE_SUCCESS; /* allocate memory blocks needed */ - if (mblck_list_alloc(&c->thread->mblck_pool, 1, vlen, &c->memblist) < 0) { + if (mblck_list_alloc(&c->thread->mblck_pool, 1, vlen, &c->memblist) < 0 || + add_ritem_mblck(c, &c->memblist) != 0) { ret = ENGINE_ENOMEM; } if (ret == ENGINE_SUCCESS) { c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, vlen); c->coll_op = (return_cas ? OPERATION_MGETS : OPERATION_MGET); conn_set_state(c, conn_nread); } else { @@ -8489,9 +8540,11 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken mc_engine.v1->release(mc_engine.v0, c, it); out_string(c, "SERVER_ERROR error getting item data"); ret = ENGINE_ENOMEM; + } else if (add_ritem_hinfo(c, &c->hinfo) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; } else { c->item = it; - ritem_set_first(c, CONN_RTYPE_HINFO, vlen); c->store_op = store_op; conn_set_state(c, conn_nread); } @@ -9543,10 +9596,9 @@ static void process_extension_command(conn *c, token_t *tokens, size_t ntokens) conn_set_state(c, conn_new_cmd); } } + } else if (add_ritem(c, ptr, nbytes) != 0) { + out_string(c, "SERVER ERROR out of memory"); } else { - c->ritem = ptr; - c->rlbytes = nbytes; - c->rltotal = 0; c->ascii_cmd = cmd; /* NOT SUPPORTED YET! */ conn_set_state(c, conn_nread); @@ -10260,20 +10312,26 @@ static void process_lop_prepare_nread(conn *c, int cmd, size_t vlen, } if (ret == ENGINE_SUCCESS) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_LIST, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - c->coll_eitem = (void *)elem; - c->coll_ecount = 1; - c->coll_op = OPERATION_LOP_INSERT; - c->coll_index = index; - conn_set_state(c, conn_nread); + if (add_ritem_einfo(c, &c->einfo) != 0) { + out_string(c, "SERVER ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + c->coll_eitem = (void *)elem; + c->coll_ecount = 1; + c->coll_op = OPERATION_LOP_INSERT; + c->coll_index = index; + conn_set_state(c, conn_nread); + } } else { + if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); + else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); + else handle_unexpected_errorcode_ascii(c, __func__, ret); + } + if (ret != ENGINE_SUCCESS) { if (settings.detail_enabled) { stats_prefix_record_lop_insert(key, nkey, false); } STATS_CMD_NOKEY(c, lop_insert); - if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); - else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); - else handle_unexpected_errorcode_ascii(c, __func__, ret); if (ret != ENGINE_DISCONNECT) { /* swallow the data line */ @@ -10650,23 +10708,28 @@ static void process_sop_prepare_nread(conn *c, int cmd, size_t vlen, char *key, } else { /* OPERATION_SOP_DELETE or OPERATION_SOP_EXIST */ if ((elem = (eitem *)malloc(sizeof(value_item) + vlen)) == NULL) ret = ENGINE_ENOMEM; - else + else { ((value_item*)elem)->len = vlen; + } } } if (ret == ENGINE_SUCCESS) { if (cmd == (int)OPERATION_SOP_INSERT) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_SET, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - } else { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; + if (add_ritem_einfo(c, &c->einfo) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } + } else if (add_ritem(c, ((value_item *)elem)->ptr, vlen) != 0){ + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } + if (ret == ENGINE_SUCCESS) { + c->coll_eitem = (void *)elem; + c->coll_ecount = 1; + c->coll_op = cmd; + conn_set_state(c, conn_nread); } - c->coll_eitem = (void *)elem; - c->coll_ecount = 1; - c->coll_op = cmd; - conn_set_state(c, conn_nread); } else { if (cmd == (int)OPERATION_SOP_INSERT) { if (settings.detail_enabled) @@ -11333,21 +11396,24 @@ static void process_bop_update_prepare_nread(conn *c, int cmd, ((value_item*)elem)->len = vlen; } if (ret == ENGINE_SUCCESS) { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; - c->coll_eitem = (void *)elem; - c->coll_ecount = 1; - c->coll_op = cmd; - conn_set_state(c, conn_nread); + if (add_ritem(c, ((value_item *)elem)->ptr, vlen) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + c->coll_eitem = (void *)elem; + c->coll_ecount = 1; + c->coll_op = cmd; + conn_set_state(c, conn_nread); + } } else { + if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); + else out_string(c, "SERVER_ERROR out of memory"); + } + if (ret != ENGINE_SUCCESS) { if (settings.detail_enabled) { stats_prefix_record_bop_update(key, nkey, false); } STATS_CMD_NOKEY(c, bop_update); - if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); - else out_string(c, "SERVER_ERROR out of memory"); - /* swallow the data line */ c->sbytes = vlen; if (c->state == conn_write) { @@ -11374,23 +11440,28 @@ static void process_bop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, } if (ret == ENGINE_SUCCESS) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_BTREE, elem, &c->einfo); - memcpy((void*)c->einfo.score, bkey, (c->einfo.nscore==0 ? sizeof(uint64_t) : c->einfo.nscore)); - if (c->einfo.neflag > 0) - memcpy((void*)c->einfo.eflag, eflag, c->einfo.neflag); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - c->coll_eitem = (void *)elem; - c->coll_ecount = 1; - c->coll_op = cmd; /* OPERATION_BOP_INSERT | OPERATION_BOP_UPSERT */ - conn_set_state(c, conn_nread); + if (add_ritem_einfo(c, &c->einfo) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + memcpy((void*)c->einfo.score, bkey, (c->einfo.nscore==0 ? sizeof(uint64_t) : c->einfo.nscore)); + if (c->einfo.neflag > 0) + memcpy((void*)c->einfo.eflag, eflag, c->einfo.neflag); + c->coll_eitem = (void *)elem; + c->coll_ecount = 1; + c->coll_op = cmd; /* OPERATION_BOP_INSERT | OPERATION_BOP_UPSERT */ + conn_set_state(c, conn_nread); + } } else { + if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); + else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); + else handle_unexpected_errorcode_ascii(c, __func__, ret); + } + if (ret != ENGINE_SUCCESS) { if (settings.detail_enabled) { stats_prefix_record_bop_insert(key, nkey, false); } STATS_CMD_NOKEY(c, bop_insert); - if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); - else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); - else handle_unexpected_errorcode_ascii(c, __func__, ret); - if (ret != ENGINE_DISCONNECT) { /* swallow the data line */ c->sbytes = vlen; @@ -11468,13 +11539,21 @@ static void process_bop_prepare_nread_keys(conn *c, int cmd, uint32_t vlen, uint } } if (ret == ENGINE_SUCCESS) { - c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, vlen); - c->coll_eitem = (void *)elem; - c->coll_ecount = 0; - c->coll_op = cmd; - conn_set_state(c, conn_nread); + if (add_ritem_mblck(c, &c->memblist) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + c->coll_strkeys = (void*)&c->memblist; + c->coll_eitem = (void *)elem; + c->coll_ecount = 0; + c->coll_op = cmd; + conn_set_state(c, conn_nread); + } } else { + /* ret == ENGINE_ENOMEM */ + out_string(c, "SERVER_ERROR out of memory"); + } + if (ret != ENGINE_SUCCESS) { #ifdef SUPPORT_BOP_MGET if (cmd == OPERATION_BOP_MGET) STATS_CMD_NOKEY(c, bop_mget); @@ -11483,9 +11562,6 @@ static void process_bop_prepare_nread_keys(conn *c, int cmd, uint32_t vlen, uint if (cmd == OPERATION_BOP_SMGET) STATS_CMD_NOKEY(c, bop_smget); #endif - /* ret == ENGINE_ENOMEM */ - out_string(c, "SERVER_ERROR out of memory"); - /* swallow the data line */ assert(c->state == conn_write); c->sbytes = vlen; @@ -11833,18 +11909,27 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, if (ret == ENGINE_SUCCESS) { if (cmd == OPERATION_MOP_INSERT) { mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_MAP, elem, &c->einfo); - ritem_set_first(c, CONN_RTYPE_EINFO, vlen); - } else { - c->ritem = ((value_item *)elem)->ptr; - c->rlbytes = vlen; - c->rltotal = 0; + if (add_ritem_einfo(c, &c->einfo) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } + } else if (add_ritem(c, ((value_item *)elem)->ptr, vlen)){ + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } + if (ret == ENGINE_SUCCESS) { + c->coll_eitem = (void *)elem; + c->coll_ecount = 1; + c->coll_op = cmd; + c->coll_field = *field; + conn_set_state(c, conn_nread); } - c->coll_eitem = (void *)elem; - c->coll_ecount = 1; - c->coll_op = cmd; - c->coll_field = *field; - conn_set_state(c, conn_nread); } else { + if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); + else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); + else handle_unexpected_errorcode_ascii(c, __func__, ret); + } + if (ret != ENGINE_SUCCESS) { if (settings.detail_enabled) { stats_prefix_record_mop_insert(key, nkey, false); } @@ -11853,10 +11938,6 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, } else if (cmd == OPERATION_MOP_UPDATE) { STATS_CMD_NOKEY(c, mop_update); } - if (ret == ENGINE_E2BIG) out_string(c, "CLIENT_ERROR too large value"); - else if (ret == ENGINE_ENOMEM) out_string(c, "SERVER_ERROR out of memory"); - else handle_unexpected_errorcode_ascii(c, __func__, ret); - if (ret != ENGINE_DISCONNECT) { /* swallow the data line */ c->sbytes = vlen; @@ -11878,21 +11959,26 @@ static void process_mop_prepare_nread_fields(conn *c, int cmd, char *key, size_t ret = ENGINE_ENOMEM; } if (ret == ENGINE_SUCCESS) { - c->coll_strkeys = (void*)&c->memblist; - ritem_set_first(c, CONN_RTYPE_MBLCK, flen); - c->coll_ecount = 1; - c->coll_op = cmd; - c->coll_lenkeys = flen; - conn_set_state(c, conn_nread); + if (add_ritem_mblck(c, &c->memblist) != 0) { + out_string(c, "SERVER_ERROR out of memory"); + ret = ENGINE_ENOMEM; + } else { + c->coll_strkeys = (void*)&c->memblist; + c->coll_ecount = 1; + c->coll_op = cmd; + c->coll_lenkeys = flen; + conn_set_state(c, conn_nread); + } } else { + /* ret == ENGINE_ENOMEM */ + out_string(c, "SERVER_ERROR out of memory"); + } + if (ret != ENGINE_SUCCESS) { if (cmd == OPERATION_MOP_DELETE) { STATS_CMD_NOKEY(c, mop_delete); } else if (cmd == OPERATION_MOP_GET) { STATS_CMD_NOKEY(c, mop_get); } - /* ret == ENGINE_ENOMEM */ - out_string(c, "SERVER_ERROR out of memory"); - /* swallow the data line */ c->sbytes = flen; if (c->state == conn_write) { @@ -13079,6 +13165,8 @@ static void process_command_ascii(conn *c, char *command, int cmdlen) c->msgcurr = 0; c->msgused = 0; c->iovused = 0; + c->rlcurr = 0; + c->rlused = 0; if (add_msghdr(c) != 0) { out_string(c, "SERVER_ERROR out of memory preparing response"); return; @@ -13767,7 +13855,7 @@ bool conn_nread(conn *c) { ssize_t res; - if (c->rlbytes == 0) { + if (c->rlcurr == c->rlused) { complete_nread(c); /* complete_nread eventually calls write functions @@ -13785,6 +13873,11 @@ bool conn_nread(conn *c) return true; } + if (c->rlbytes == 0) { + c->ritem = c->rlist[c->rlcurr].iov_base; + c->rlbytes = c->rlist[c->rlcurr].iov_len; + } + /* first check if we have leftovers in the conn_read buffer */ while (c->rbytes > 0) { int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; @@ -13795,14 +13888,14 @@ bool conn_nread(conn *c) c->rlbytes -= tocopy; c->rcurr += tocopy; c->rbytes -= tocopy; - if (c->rltotal > 0) { /* string block read */ - c->rltotal -= tocopy; - if (c->rlbytes == 0 && c->rltotal > 0) { - ritem_set_next(c); - continue; + if (c->rlbytes == 0) { /* string block read */ + c->rlcurr++ ; + if (c->rlcurr < c->rlused) { + c->ritem = c->rlist[c->rlcurr].iov_base; + c->rlbytes = c->rlist[c->rlcurr].iov_len; } } - if (c->rlbytes == 0) { + if (c->rlcurr == c->rlused) { return true; } } @@ -13816,11 +13909,8 @@ bool conn_nread(conn *c) } c->ritem += res; c->rlbytes -= res; - if (c->rltotal > 0) { - c->rltotal -= res; - if (c->rlbytes == 0 && c->rltotal > 0) { - ritem_set_next(c); - } + if (c->rlbytes == 0) { + c->rlcurr++; } return true; } diff --git a/memcached.h b/memcached.h index b2a49df1d..bd72a5aef 100644 --- a/memcached.h +++ b/memcached.h @@ -76,11 +76,15 @@ /** Initial number of sendmsg() argument structures to allocate. */ #define MSG_LIST_INITIAL 10 +/** Initial size of list of items being read. */ +#define RITEM_LIST_INITIAL 1 + /** High water marks for buffer shrinking */ #define READ_BUFFER_HIGHWAT 8192 #define ITEM_LIST_HIGHWAT 400 #define IOV_LIST_HIGHWAT 600 #define MSG_LIST_HIGHWAT 100 +#define RITEM_LIST_HIGHWAT 400 /* Binary protocol stuff */ #define MIN_BIN_PKT_LENGTH 16 @@ -261,12 +265,6 @@ typedef bool (*STATE_FUNC)(conn *); /** * The structure representing a connection into memcached. */ -/* rtype in connection */ -#define CONN_RTYPE_NONE 0 -#define CONN_RTYPE_MBLCK 1 -#define CONN_RTYPE_HINFO 2 -#define CONN_RTYPE_EINFO 3 - struct conn { int sfd; short nevents; @@ -290,13 +288,16 @@ struct conn { STATE_FUNC write_and_go; void *write_and_free; /** free this memory after finishing writing */ - int rtype; /* CONN_RTYPE_XXXXX */ - int rindex; /* used when rtype is HINFO or EINFO */ - char *ritem; /** when we read in an item's value, it goes here */ - uint32_t rlbytes; + /** data for nread state */ + struct iovec *rlist; /** list to read data except commands if needed */ + uint32_t rlcurr; /** element in rlist[] being read now */ + uint32_t rlsize; /** total allocated size of rlist */ + uint32_t rlused; /** number of elements used in rlist[] */ + + char *ritem; /** data pointer being read currently */ + uint32_t rlbytes; /** data length being read currently */ + /* use memory blocks */ - uint32_t rltotal; /* Used when read data with memory block */ - mblck_node_t *membk; /* current memory block pointer */ mblck_list_t memblist; /* (key or field) string memory block list */ /* hash item and elem item info */