From 5aa0549129d1194385c9c2fd043d33d1a9f5376e Mon Sep 17 00:00:00 2001 From: uhm0311 Date: Thu, 8 Aug 2024 19:04:23 +0900 Subject: [PATCH] FEATURE: Add mset API. --- .gitignore | 1 + libmemcached/fetch.cc | 25 ++++ libmemcached/libmemcached_probes.h | 6 +- libmemcached/storage.cc | 163 ++++++++++++++++++++++++- libmemcached/storage.h | 30 ++++- libmemcached/types.h | 3 +- tests/include.am | 12 +- tests/storage.cc | 187 +++++++++++++++++++++++++++++ tests/storage.h | 15 +++ 9 files changed, 432 insertions(+), 10 deletions(-) create mode 100644 tests/storage.cc create mode 100644 tests/storage.h diff --git a/.gitignore b/.gitignore index 4b19f764..01d3d302 100644 --- a/.gitignore +++ b/.gitignore @@ -102,5 +102,6 @@ tests/memrm tests/memslap tests/memstat tests/sasl +tests/storage unittests/unittests devtools/* diff --git a/libmemcached/fetch.cc b/libmemcached/fetch.cc index aa022ee2..47c13cab 100644 --- a/libmemcached/fetch.cc +++ b/libmemcached/fetch.cc @@ -53,6 +53,31 @@ #include +memcached_return_t memcached_fetch_storage_result(memcached_st *ptr, + const char *key, const size_t key_length) +{ + unlikely (not ptr or not key) + { + return MEMCACHED_INVALID_ARGUMENTS; + } + unlikely (ptr->flags.use_udp) + { + return MEMCACHED_NOT_SUPPORTED; + } + + uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); + memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); + + char result[MEMCACHED_DEFAULT_COMMAND_SIZE]; + memcached_return_t rc= memcached_read_one_response(instance, result, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL); + + if (rc == MEMCACHED_STORED) + { + return MEMCACHED_SUCCESS; + } + return rc; +} + char *memcached_fetch(memcached_st *ptr, char *key, size_t *key_length, size_t *value_length, uint32_t *flags, diff --git a/libmemcached/libmemcached_probes.h b/libmemcached/libmemcached_probes.h index b819f78a..fcca3a44 100644 --- a/libmemcached/libmemcached_probes.h +++ b/libmemcached/libmemcached_probes.h @@ -1,5 +1,5 @@ /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: - * + * * Libmemcached library * * Copyright (C) 2011 Data Differential, http://datadifferential.com/ @@ -111,6 +111,10 @@ #define LIBMEMCACHED_MEMCACHED_SET_END_ENABLED() (0) #define LIBMEMCACHED_MEMCACHED_SET_START() #define LIBMEMCACHED_MEMCACHED_SET_START_ENABLED() (0) +#define LIBMEMCACHED_MEMCACHED_MSET_END() +#define LIBMEMCACHED_MEMCACHED_MSET_END_ENABLED() (0) +#define LIBMEMCACHED_MEMCACHED_MSET_START() +#define LIBMEMCACHED_MEMCACHED_MSET_START_ENABLED() (0) #endif diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index bce3425e..87079159 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -38,6 +38,11 @@ enum memcached_storage_action_t { CAS_OP }; +enum memcached_storage_send_type_t { + SINGLE, + MULTI +}; + /* Inline this */ static inline const char *storage_op_string(memcached_storage_action_t verb) { @@ -121,6 +126,67 @@ static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply return ret; } +static memcached_return_t before_storage_query(memcached_st *ptr, + const char *group_key, size_t group_key_length, + const memcached_storage_query_st* queries, + const size_t number_of_queries) +{ + memcached_return_t rc= initialize_query(ptr); + if (memcached_failed(rc)) + { + return rc; + } + + if (ptr->flags.use_udp) + { + return memcached_set_error(*ptr, MEMCACHED_NOT_SUPPORTED, MEMCACHED_AT); + } + + if (queries == NULL) + { + return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT, + memcached_literal_param("queries were null")); + } + if (number_of_queries == 0) + { + return memcached_set_error(*ptr, MEMCACHED_NOTFOUND, MEMCACHED_AT, + memcached_literal_param("number_of_queries were zero")); + } + + if (group_key and group_key_length) + { + if (memcached_failed(memcached_key_test(*ptr, (const char * const *)&group_key, &group_key_length, 1))) + { + return memcached_set_error(*ptr, MEMCACHED_BAD_KEY_PROVIDED, MEMCACHED_AT, + memcached_literal_param("A bad group key was provided.")); + } + } + + /* + Here is where we pay for the non-block API. We need to remove any data sitting + in the queue before we start our store operations. + + It might be optimum to bounce the connection if count > some number. + */ + for (uint32_t x= 0; x < memcached_server_count(ptr); x++) + { + memcached_server_write_instance_st instance= + memcached_server_instance_fetch(ptr, x); + + if (memcached_server_response_count(instance)) + { + char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; + + if (ptr->flags.no_block) + (void)memcached_io_write(instance, NULL, 0, true); + + while(memcached_server_response_count(instance)) + (void)memcached_response(instance, buffer, sizeof(buffer), &ptr->result); + } + } + return MEMCACHED_SUCCESS; +} + static memcached_return_t memcached_send_binary(memcached_st *ptr, const char *group_key, size_t group_key_length, @@ -131,7 +197,8 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, time_t expiration, uint32_t flags, uint64_t cas, - memcached_storage_action_t verb) + memcached_storage_action_t verb, + memcached_storage_send_type_t send_type) { bool flush; protocol_binary_request_set request= {}; @@ -227,7 +294,7 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, return MEMCACHED_BUFFERED; } - if (noreply) + if (send_type == MULTI || noreply) { return MEMCACHED_SUCCESS; } @@ -257,7 +324,8 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, const time_t expiration, const uint32_t flags, const uint64_t cas, - memcached_storage_action_t verb) + memcached_storage_action_t verb, + memcached_storage_send_type_t send_type) { char flags_buffer[MEMCACHED_MAXIMUM_INTEGER_DISPLAY_LENGTH +1]; int flags_buffer_length= snprintf(flags_buffer, sizeof(flags_buffer), " %u", flags); @@ -349,7 +417,7 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, if (rc == MEMCACHED_SUCCESS) { - if (ptr->flags.no_reply) + if (send_type == MULTI || ptr->flags.no_reply) { rc= (to_write == false) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS; } @@ -415,19 +483,80 @@ static inline memcached_return_t memcached_send(memcached_st *ptr, rc= memcached_send_binary(ptr, group_key, group_key_length, key, key_length, value, value_length, expiration, - flags, cas, verb); + flags, cas, verb, SINGLE); } else { rc= memcached_send_ascii(ptr, group_key, group_key_length, key, key_length, value, value_length, expiration, - flags, cas, verb); + flags, cas, verb, SINGLE); } return rc; } +static memcached_return_t memcached_multi_send(memcached_st *ptr, + const char *group_key, + size_t group_key_length, + const memcached_storage_query_st* queries, + const size_t number_of_queries, + memcached_return_t *results, + memcached_storage_action_t verb) +{ + arcus_server_check_for_update(ptr); + + memcached_return_t rc; + if (memcached_failed(rc= before_storage_query(ptr, group_key, group_key_length, queries, number_of_queries))) + { + return rc; + } + + if (memcached_server_count(ptr) > MAX_SERVERS_FOR_MULTI_KEY_OPERATION) + { + return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, + memcached_literal_param("memcached instances should be <= MAX_SERVERS_FOR_MULTI_KEY_OPERATION")); + } + + bool is_group_key_set= group_key and group_key_length; + + for (size_t i= 0; i < number_of_queries; i++) + { + if (memcached_failed(rc= memcached_validate_key_length(queries[i].key_length, ptr->flags.binary_protocol))) + { + results[i]= rc; + continue; + } + if (memcached_failed(rc= memcached_key_test(*ptr, &(queries[i].key), &(queries[i].key_length), 1))) + { + results[i]= rc; + continue; + } + + char *safe_group_key= (is_group_key_set ? (char *)group_key : queries[i].key); + size_t safe_group_key_length= (is_group_key_set ? group_key_length : queries[i].key_length); + uint64_t safe_cas = verb == CAS_OP ? queries[i].cas : 0; + + if (ptr->flags.binary_protocol) + { + results[i]= memcached_send_binary(ptr, safe_group_key, safe_group_key_length, + queries[i].key, queries[i].key_length, + queries[i].value, queries[i].value_length, + queries[i].expiration, queries[i].flags, + safe_cas, verb, MULTI); + } + else + { + results[i]= memcached_send_ascii(ptr, safe_group_key, safe_group_key_length, + queries[i].key, queries[i].key_length, + queries[i].value, queries[i].value_length, + queries[i].expiration, queries[i].flags, + safe_cas, verb, MULTI); + } + } + + return MEMCACHED_SUCCESS; +} memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length, const char *value, size_t value_length, @@ -605,3 +734,25 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr, return rc; } +memcached_return_t memcached_mset(memcached_st *ptr, + const memcached_storage_query_st* queries, + const size_t number_of_queries, + memcached_return_t *results) +{ + return memcached_mset_by_key(ptr, NULL, 0, queries, number_of_queries, results); +} + +memcached_return_t memcached_mset_by_key(memcached_st *ptr, + const char *group_key, + size_t group_key_length, + const memcached_storage_query_st* queries, + const size_t number_of_queries, + memcached_return_t *results) +{ + memcached_return_t rc; + LIBMEMCACHED_MEMCACHED_MSET_START(); + rc= memcached_multi_send(ptr, group_key, group_key_length, + queries, number_of_queries, results, SET_OP); + LIBMEMCACHED_MEMCACHED_MSET_END(); + return rc; +} diff --git a/libmemcached/storage.h b/libmemcached/storage.h index 215f4ebb..f6a09bff 100644 --- a/libmemcached/storage.h +++ b/libmemcached/storage.h @@ -1,5 +1,5 @@ /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: - * + * * Libmemcached library * * Copyright (C) 2011 Data Differential, http://datadifferential.com/ @@ -40,6 +40,16 @@ #include "libmemcached/memcached.h" +struct memcached_storage_query_st { + char *key; + size_t key_length; + char *value; + size_t value_length; + time_t expiration; + uint32_t flags; + uint64_t cas; +}; + #ifdef __cplusplus extern "C" { #endif @@ -129,6 +139,24 @@ memcached_return_t memcached_cas_by_key(memcached_st *ptr, uint32_t flags, uint64_t cas); +LIBMEMCACHED_API +memcached_return_t memcached_mset(memcached_st *ptr, + const memcached_storage_query_st* queries, + const size_t number_of_queries, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_mset_by_key(memcached_st *ptr, + const char *group_key, + size_t group_key_length, + const memcached_storage_query_st* queries, + const size_t number_of_queries, + memcached_return_t *results); + +LIBMEMCACHED_API +memcached_return_t memcached_fetch_storage_result(memcached_st *ptr, + const char *key, const size_t key_length); + #ifdef __cplusplus } #endif diff --git a/libmemcached/types.h b/libmemcached/types.h index cc8cf828..33d8a848 100644 --- a/libmemcached/types.h +++ b/libmemcached/types.h @@ -15,7 +15,7 @@ * limitations under the License. */ /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: - * + * * Libmemcached library * * Copyright (C) 2011 Data Differential, http://datadifferential.com/ @@ -60,6 +60,7 @@ typedef struct memcached_stat_st memcached_stat_st; typedef struct memcached_analysis_st memcached_analysis_st; typedef struct memcached_result_st memcached_result_st; typedef struct memcached_array_st memcached_array_st; +typedef struct memcached_storage_query_st memcached_storage_query_st; typedef struct memcached_error_t memcached_error_t; // All of the flavors of memcache_server_st diff --git a/tests/include.am b/tests/include.am index 29b21027..a53df3da 100644 --- a/tests/include.am +++ b/tests/include.am @@ -65,6 +65,16 @@ tests_internals_LDADD+= libtest/libtest.la check_PROGRAMS+= tests/internals noinst_PROGRAMS+= tests/internals +# Test storage +tests_storage_SOURCES= tests/storage.cc +tests_storage_CXXFLAGS = $(AM_CXXFLAGS) ${PTHREAD_CFLAGS} +tests_storage_DEPENDENCIES= libmemcachedinternal/libmemcachedinternal.la libtest/libtest.la libmemcachedinternal/libmemcachedutilinternal.la +tests_storage_LDADD= libmemcachedinternal/libmemcachedinternal.la +tests_storage_LDADD+= ${PTHREAD_LIBS} +tests_storage_LDADD+= libmemcachedinternal/libmemcachedutilinternal.la +tests_storage_LDADD+= libtest/libtest.la +check_PROGRAMS+= tests/storage +noinst_PROGRAMS+= tests/storage tests_testapp_CXXFLAGS = $(AM_CXXFLAGS) ${PTHREAD_CFLAGS} tests_testapp_CFLAGS= $(AM_CFLAGS) $(NO_CONVERSION) $(NO_STRICT_ALIASING) @@ -239,7 +249,7 @@ tests_memdump_DEPENDENCIES= libtest/libtest.la $(TESTS_LDADDS) tests_memdump_LDADD= $(tests_memdump_DEPENDENCIES) check_PROGRAMS+= tests/memdump noinst_PROGRAMS+= tests/memdump - + # Test linking with C application tests_c_test_SOURCES= tests/c_test.c tests_c_test_CFLAGS= ${PTHREAD_CFLAGS} diff --git a/tests/storage.cc b/tests/storage.cc new file mode 100644 index 00000000..9d318312 --- /dev/null +++ b/tests/storage.cc @@ -0,0 +1,187 @@ +#include +#include + +using namespace libtest; + +#include "tests/storage.h" + +#define MSET_COUNT 5 +#define BUFFER_SIZE 64 +#define EXPIRE_TIME 60 + +static void safe_free(char *c) +{ + if (c != NULL) + { + free(c); + } +} + +static void safe_free_queries(memcached_storage_query_st queries[MSET_COUNT]) +{ + for (int i= 0; i < MSET_COUNT; i++) + { + safe_free(queries[i].key); + safe_free(queries[i].value); + } +} + +test_return_t mset_and_get_test(memcached_st *mc) +{ + memcached_storage_query_st queries[MSET_COUNT]; + srand(time(NULL)); + + for (int i= 0; i < MSET_COUNT; i++) + { + char *key= (char *)malloc(BUFFER_SIZE); + if (not key) + { + printf("key cannot be allocated...\n"); + return TEST_FAILURE; + } + + char *value= (char *)malloc(BUFFER_SIZE); + if (not value) + { + printf("value cannot be allocated...\n"); + safe_free(key); + return TEST_FAILURE; + } + + memset(key, 0, BUFFER_SIZE); + sprintf(key, "MSET:mset-key-%d", i); + + memset(value, 0, BUFFER_SIZE); + sprintf(value, "mset-value-%d", i); + + queries[i].key= key; + queries[i].key_length= strlen(key); + + queries[i].value= value; + queries[i].value_length= strlen(value); + + queries[i].expiration= EXPIRE_TIME; + queries[i].flags= (uint32_t) rand(); + } + + memcached_return_t results[MSET_COUNT]; + memcached_return_t rc= memcached_mset(mc, queries, MSET_COUNT, results); + printf("memcached_mset: rc is %s\n", memcached_strerror(mc, rc)); + + if (memcached_failed(rc)) + { + safe_free_queries(queries); + return TEST_FAILURE; + } + + for (int i= 0; i < MSET_COUNT; i++) + { + printf("memcached_mset: rc[%d] is %s\n", i, memcached_strerror(mc, results[i])); + + if (memcached_failed(results[i])) + { + safe_free_queries(queries); + return TEST_FAILURE; + } + } + + for (int i= 0; i < MSET_COUNT; i++) + { + rc= memcached_fetch_storage_result(mc, queries[i].key, queries[i].key_length); + printf("memcached_storage_fetch: rc[%d] is %s\n", i, memcached_strerror(mc, rc)); + + if (memcached_failed(rc)) + { + safe_free_queries(queries); + return TEST_FAILURE; + } + } + + for (int i= 0; i < MSET_COUNT; i++) + { + size_t value_length= -1; + uint32_t flags= -1; + + char *value= memcached_get(mc, queries[i].key, queries[i].key_length, &value_length, &flags, &rc); + printf("memcached_get: rc[%d] is %s\n", i, memcached_strerror(mc, rc)); + + if (value == NULL || memcached_failed(rc)) + { + safe_free(value); + safe_free_queries(queries); + return TEST_FAILURE; + } + + printf("memcached_get: flags[%d] is %u\n", i, flags); + printf("memcached_get: value[%d] is %s\n", i, value); + + if (queries[i].value_length != value_length) + { + printf("memcached_get: value_length[%d] is not equal... stored %ld but got %ld\n", i, queries[i].value_length, value_length); + + safe_free(value); + safe_free_queries(queries); + return TEST_FAILURE; + } + if (strcmp(queries[i].value, value)) + { + printf("memcached_get: value[%d] is not equal... stored %s but got %s\n", i, queries[i].value, value); + + safe_free(value); + safe_free_queries(queries); + return TEST_FAILURE; + } + if (queries[i].flags != flags) + { + printf("memcached_get: flags[%d] is not equal... stored %u but got %u\n", i, queries[i].flags, flags); + + safe_free(value); + safe_free_queries(queries); + return TEST_FAILURE; + } + + safe_free(value); + } + + safe_free_queries(queries); + + return TEST_SUCCESS; +} + +/* + Test cases +*/ +test_st mset_tests[] ={ + {"mset_and_get_test", true, (test_callback_fn*)mset_and_get_test }, + {0, 0, 0} +}; + +collection_st collection[] ={ + {"mset_tests", 0, 0, mset_tests}, + {0, 0, 0, 0} +}; + +#define TEST_PORT_BASE MEMCACHED_DEFAULT_PORT +10 + +#include "tests/libmemcached_world.h" + +void get_world(Framework *world) +{ + world->collections= collection; + + world->_create= (test_callback_create_fn*)world_create; + world->_destroy= (test_callback_destroy_fn*)world_destroy; + + world->item._startup= (test_callback_fn*)world_test_startup; + world->item.set_pre((test_callback_fn*)world_pre_run); + world->item.set_flush((test_callback_fn*)world_flush); + world->item.set_post((test_callback_fn*)world_post_run); + world->_on_error= (test_callback_error_fn*)world_on_error; + + world->collection_startup= (test_callback_fn*)world_container_startup; + world->collection_shutdown= (test_callback_fn*)world_container_shutdown; + + world->set_runner(&default_libmemcached_runner); + + world->set_socket(); +} diff --git a/tests/storage.h b/tests/storage.h new file mode 100644 index 00000000..bdf7d74a --- /dev/null +++ b/tests/storage.h @@ -0,0 +1,15 @@ +#ifndef __TESTS_STORAGE_H__ +#define __TESTS_STORAGE_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +LIBTEST_LOCAL +test_return_t mset_and_get_test(memcached_st *mc); + +#ifdef __cplusplus +} +#endif + +#endif /* __TESTS_STORAGE_H__ */