Skip to content

Commit

Permalink
FEATURE: Add a mop upsert command
Browse files Browse the repository at this point in the history
  • Loading branch information
ing-eoking committed Jul 11, 2024
1 parent 3151b64 commit e37e85f
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 2 deletions.
23 changes: 21 additions & 2 deletions libmemcached/collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ typedef enum {
SOP_DELETE_OP,
SOP_EXIST_OP,
MOP_INSERT_OP,
MOP_UPSERT_OP,
MOP_UPDATE_OP,
MOP_GET_OP,
MOP_DELETE_OP,
Expand Down Expand Up @@ -65,6 +66,7 @@ static inline const char *coll_op_string(memcached_coll_action_t verb)
case SOP_DELETE_OP: return "sop delete ";
case SOP_EXIST_OP: return "sop exist ";
case MOP_INSERT_OP: return "mop insert ";
case MOP_UPSERT_OP: return "mop upsert ";
case MOP_UPDATE_OP: return "mop update ";
case MOP_GET_OP: return "mop get ";
case MOP_DELETE_OP: return "mop delete ";
Expand Down Expand Up @@ -105,6 +107,7 @@ static inline int coll_op_length(memcached_coll_action_t verb)
case SOP_DELETE_OP: return 11;
case SOP_EXIST_OP: return 10;
case MOP_INSERT_OP: return 11;
case MOP_UPSERT_OP: return 11;
case MOP_UPDATE_OP: return 11;
case MOP_GET_OP: return 8;
case MOP_DELETE_OP: return 11;
Expand Down Expand Up @@ -1291,7 +1294,7 @@ static memcached_return_t do_coll_insert(memcached_st *ptr,
{
/* no sub key */
}
else if (verb == MOP_INSERT_OP)
else if (verb == MOP_INSERT_OP || verb == MOP_UPSERT_OP)
{
size_t mkey_length = query->sub_key.mkey.length;
if (mkey_length > MEMCACHED_COLL_MAX_MOP_MKEY_LENG)
Expand Down Expand Up @@ -1334,7 +1337,8 @@ static memcached_return_t do_coll_insert(memcached_st *ptr,
if (attributes)
{
bool set_overflowaction= verb != SOP_INSERT_OP &&
verb != MOP_INSERT_OP
verb != MOP_INSERT_OP &&
verb != MOP_UPSERT_OP
&& attributes->overflowaction
&& attributes->overflowaction != OVERFLOWACTION_NONE;

Expand Down Expand Up @@ -3793,6 +3797,21 @@ memcached_return_t memcached_mop_insert(memcached_st *ptr,
&query, NULL, attributes, MOP_INSERT_OP);
}

memcached_return_t memcached_mop_upsert(memcached_st *ptr,
const char *key, size_t key_length,
const char *mkey, size_t mkey_length,
const char *value, size_t value_length,
memcached_coll_create_attrs_st *attributes)
{
memcached_coll_query_st query;
memcached_mop_query_init(&query, mkey, mkey_length);
query.value = value;
query.value_length = value_length;

return do_coll_insert(ptr, key, key_length,
&query, NULL, attributes, MOP_UPSERT_OP);
}

memcached_return_t memcached_mop_update(memcached_st *ptr,
const char *key, size_t key_length,
const char *mkey, size_t mkey_length,
Expand Down
19 changes: 19 additions & 0 deletions libmemcached/collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,25 @@ memcached_return_t memcached_mop_insert(memcached_st *ptr,
const char *value, size_t value_length,
memcached_coll_create_attrs_st *attributes);

/**
* Upsert (update the element if it exists, insert otherwise) an element into the map item.
* Optionally create the item if it does not exist.
* @param ptr memcached handle.
* @param key map item's key.
* @param key_length key length (number of bytes).
* @param mkey map element's key.
* @param mkey_length mkey length (number of bytes).
* @param value buffer holding the element value.
* @param value_length length of the element value (number of bytes).
* @param attrs if not NULL, create the item using the attributes if the item does not exist.
*/
LIBMEMCACHED_API
memcached_return_t memcached_mop_upsert(memcached_st *ptr,
const char *key, size_t key_length,
const char *mkey, size_t mkey_length,
const char *value, size_t value_length,
memcached_coll_create_attrs_st *attributes);

/**
* Update the existing element in the map item.
* @param ptr memcached handle.
Expand Down
63 changes: 63 additions & 0 deletions tests/mem_functions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11402,6 +11402,68 @@ static test_return_t arcus_1_10_map_insert(memcached_st *memc)
return TEST_SUCCESS;
}

static test_return_t arcus_1_10_map_upsert(memcached_st *memc)
{
uint32_t flags= 10;
int32_t exptime= 600;
uint32_t maxcount= 1000;

memcached_coll_create_attrs_st attributes;
memcached_coll_create_attrs_init(&attributes, flags, exptime, maxcount);

// 1. CREATED_STORED
memcached_return_t rc= memcached_mop_upsert(memc, test_literal_param("map:a_map"), test_literal_param("mkey"), test_literal_param("value"), &attributes);
test_true_got(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED, memcached_strerror(NULL, rc));
test_true_got(memcached_get_last_response_code(memc) == MEMCACHED_CREATED_STORED, memcached_strerror(NULL, memcached_get_last_response_code(memc)));

// 2. NOT_FOUND
rc= memcached_mop_upsert(memc, test_literal_param("map:no_map"), test_literal_param("mkey"), test_literal_param("value"), NULL);
test_true_got(rc == MEMCACHED_NOTFOUND, memcached_strerror(NULL, rc));

// 3. TYPE_MISMATCH
rc= memcached_set(memc, test_literal_param("kv:item"), test_literal_param("value"), 600, 0);
test_true_got(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED, memcached_strerror(NULL, rc));
rc= memcached_mop_upsert(memc, test_literal_param("kv:item"), test_literal_param("mkey"), test_literal_param("value"), NULL);
test_true_got(rc == MEMCACHED_TYPE_MISMATCH, memcached_strerror(NULL, rc));

// 4. REPLACED
rc= memcached_mop_upsert(memc, test_literal_param("map:a_map"), test_literal_param("mkey"), test_literal_param("value"), NULL);
test_true_got(rc == MEMCACHED_REPLACED, memcached_strerror(NULL, rc));

// 5. CLIENT_ERROR too large value
char too_large[MEMCACHED_COLL_MAX_ELEMENT_SIZE + 1];
rc= memcached_mop_upsert(memc, test_literal_param("map:a_map"), test_literal_param("mkey1"), too_large, MEMCACHED_COLL_MAX_ELEMENT_SIZE+1, &attributes);
test_true_got(rc == MEMCACHED_CLIENT_ERROR, memcached_strerror(NULL, rc));

sleep(MEMCACHED_SERVER_FAILURE_RETRY_TIMEOUT + 1);

// 6. OVERFLOWED
for (uint32_t i=1; i<maxcount; i++)
{
char buffer[15];
size_t buffer_len= snprintf(buffer, 15, "mkey%d", i);
rc= memcached_mop_upsert(memc, test_literal_param("map:a_map"), buffer, buffer_len, test_literal_param("value"), &attributes);
test_true_got(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED, memcached_strerror(NULL, rc));
test_true_got(memcached_get_last_response_code(memc) == MEMCACHED_STORED, memcached_strerror(NULL, memcached_get_last_response_code(memc)));
}

memcached_coll_attrs_st attrs;
memcached_coll_attrs_init(&attrs);
memcached_coll_attrs_set_overflowaction(&attrs, OVERFLOWACTION_ERROR);
memcached_coll_attrs_set_expiretime(&attrs, 1000);

rc= memcached_set_attrs(memc, test_literal_param("map:a_map"), &attrs);
test_true_got(rc == MEMCACHED_SUCCESS, memcached_strerror(NULL, rc));

rc= memcached_mop_upsert(memc, test_literal_param("map:a_map"), test_literal_param("mkey1"), test_literal_param("value"), &attributes);
test_true_got(rc == MEMCACHED_REPLACED, memcached_strerror(NULL, rc));

rc= memcached_mop_upsert(memc, test_literal_param("map:a_map"), test_literal_param("last_mkey"), test_literal_param("value"), &attributes);
test_true_got(rc == MEMCACHED_OVERFLOWED, memcached_strerror(NULL, rc));

return TEST_SUCCESS;
}

static test_return_t arcus_1_10_map_update(memcached_st *memc)
{
uint32_t flags= 10;
Expand Down Expand Up @@ -11910,6 +11972,7 @@ test_st arcus_1_9_tests[] ={
test_st arcus_1_10_tests[] ={
{"arcus_1_10_map_create", true, (test_callback_fn*)arcus_1_10_map_create},
{"arcus_1_10_map_insert", true, (test_callback_fn*)arcus_1_10_map_insert},
{"arcus_1_10_map_upsert", true, (test_callback_fn*)arcus_1_10_map_upsert},
{"arcus_1_10_map_update", true, (test_callback_fn*)arcus_1_10_map_update},
{"arcus_1_10_map_delete", true, (test_callback_fn*)arcus_1_10_map_delete},
{"arcus_1_10_map_delete_all", true, (test_callback_fn*)arcus_1_10_map_delete_all},
Expand Down

0 comments on commit e37e85f

Please sign in to comment.