From 882464f941da1df59c5fec44970427318ed56bc4 Mon Sep 17 00:00:00 2001 From: Yao Yue Date: Tue, 4 Apr 2017 00:11:03 -0700 Subject: [PATCH 1/9] add data structures for topics, subscribers --- src/protocol/data/redis/cmd_pubsub.h | 7 + src/protocol/data/redis/request.h | 2 + src/storage/CMakeLists.txt | 1 + src/storage/pubsub/CMakeLists.txt | 5 + src/storage/pubsub/README | 5 + src/storage/pubsub/hashtable.h | 4 + src/storage/pubsub/index.h | 10 ++ src/storage/pubsub/listener.c | 218 +++++++++++++++++++++++++++ src/storage/pubsub/listener.h | 41 +++++ src/storage/pubsub/topic.c | 218 +++++++++++++++++++++++++++ src/storage/pubsub/topic.h | 41 +++++ 11 files changed, 552 insertions(+) create mode 100644 src/protocol/data/redis/cmd_pubsub.h create mode 100644 src/storage/pubsub/CMakeLists.txt create mode 100644 src/storage/pubsub/README create mode 100644 src/storage/pubsub/hashtable.h create mode 100644 src/storage/pubsub/index.h create mode 100644 src/storage/pubsub/listener.c create mode 100644 src/storage/pubsub/listener.h create mode 100644 src/storage/pubsub/topic.c create mode 100644 src/storage/pubsub/topic.h diff --git a/src/protocol/data/redis/cmd_pubsub.h b/src/protocol/data/redis/cmd_pubsub.h new file mode 100644 index 000000000..446cf4ec7 --- /dev/null +++ b/src/protocol/data/redis/cmd_pubsub.h @@ -0,0 +1,7 @@ +#pragma once + +/* type string # of args */ +#define REQ_PUBSUB(ACTION) \ + ACTION( REQ_PUBLISH, "publish", 3 )\ + ACTION( REQ_SUBSCRIBE, "subscribe", -2 )\ + ACTION( REQ_UNSUBSCRIBE, "unsubscribe", -2 ) diff --git a/src/protocol/data/redis/request.h b/src/protocol/data/redis/request.h index d5fb5f938..4289bd344 100644 --- a/src/protocol/data/redis/request.h +++ b/src/protocol/data/redis/request.h @@ -2,6 +2,7 @@ #include "cmd_hash.h" #include "cmd_misc.h" +#include "cmd_pubsub.h" #include "cmd_zset.h" #include @@ -44,6 +45,7 @@ typedef enum cmd_type { REQ_HASH(GET_TYPE) REQ_ZSET(GET_TYPE) REQ_MISC(GET_TYPE) + REQ_PUBSUB(GET_TYPE) REQ_SENTINEL } cmd_type_e; #undef GET_TYPE diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index c06eede35..0756902af 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -1,2 +1,3 @@ add_subdirectory(cuckoo) +add_subdirectory(pubsub) add_subdirectory(slab) diff --git a/src/storage/pubsub/CMakeLists.txt b/src/storage/pubsub/CMakeLists.txt new file mode 100644 index 000000000..4578d1360 --- /dev/null +++ b/src/storage/pubsub/CMakeLists.txt @@ -0,0 +1,5 @@ +set(SOURCE + listener.c + topic.c) + +add_library(pubsub ${SOURCE}) diff --git a/src/storage/pubsub/README b/src/storage/pubsub/README new file mode 100644 index 000000000..7def09c6a --- /dev/null +++ b/src/storage/pubsub/README @@ -0,0 +1,5 @@ +Mapping #1: client -> channel, to clean-up when client quits +Mapping #2: channel -> client, to handle subscription, fanout + +a channel with no subscriber should be deleted. +a client with no channel subscription is kept. diff --git a/src/storage/pubsub/hashtable.h b/src/storage/pubsub/hashtable.h new file mode 100644 index 000000000..8de665265 --- /dev/null +++ b/src/storage/pubsub/hashtable.h @@ -0,0 +1,4 @@ +#pragma once + +#define HASHSIZE(_n) (1ULL << (_n)) +#define HASHMASK(_n) (HASHSIZE(_n) - 1) diff --git a/src/storage/pubsub/index.h b/src/storage/pubsub/index.h new file mode 100644 index 000000000..f9c8bc71a --- /dev/null +++ b/src/storage/pubsub/index.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +struct index_node { + TAILQ_ENTRY(index_node) i_tqe; + void *obj; +}; + +TAILQ_HEAD(index_tqh, index_node); diff --git a/src/storage/pubsub/listener.c b/src/storage/pubsub/listener.c new file mode 100644 index 000000000..ed103872c --- /dev/null +++ b/src/storage/pubsub/listener.c @@ -0,0 +1,218 @@ +#include "hashtable.h" +#include "listener.h" + +#include +#include +#include + +static struct listener_slh * +_ht_alloc(uint32_t nentry) +{ + struct listener_slh *table; + uint32_t i; + + table = cc_alloc(sizeof(*table) * nentry); + + if (table != NULL) { + for (i = 0; i < nentry; ++i) { + SLIST_INIT(&table[i]); + } + } + + return table; +} + +struct listener_ht * +listener_ht_create(uint32_t hash_power) +{ + struct listener_ht *ht; + uint32_t nentry; + + ASSERT(hash_power > 0); + + ht = cc_alloc(sizeof(struct listener_ht)); + if (ht == NULL) { + return NULL; + } + + ht->hash_power = hash_power; + ht->nlistener = 0; + nentry = HASHSIZE(ht->hash_power); + ht->table = _ht_alloc(nentry); + if (ht->table == NULL) { + cc_free(ht); + return NULL; + } + + return ht; +} + +void listener_ht_destroy(struct listener_ht **ht) +{ + ASSERT(ht != NULL); + + if (*ht != NULL && (*ht)->table != NULL) { + cc_free((*ht)->table); + } + + cc_free(*ht); + *ht = NULL; +} + + +static struct listener_slh * +_get_bucket(const channel_p ch, struct listener_ht *ht) +{ + /* use the _address_ of the channel to hash */ + uint32_t hval = hash_lookup3((char *)&ch, sizeof(channel_p), 0); + return &(ht->table[hval & HASHMASK(ht->hash_power)]); +} + +struct listener * +listener_ht_get(const channel_p ch, struct listener_ht *ht) +{ + struct listener_slh *bucket; + struct listener *l; + + bucket = _get_bucket(ch, ht); + for (l = SLIST_FIRST(bucket); l != NULL; l = SLIST_NEXT(l, l_sle)) { + if (l->ch == ch) { + return l; + } + } + + return NULL; +} + +void +listener_ht_put(const struct listener *l, struct listener_ht *ht) +{ + struct listener_slh *bucket; + + ASSERT(listener_ht_get(l->ch, ht) == NULL); + + bucket = _get_bucket(l->ch, ht); + SLIST_INSERT_HEAD(bucket, (struct listener *)l, l_sle); + + ht->nlistener++; +} + +void +listener_ht_delete(const channel_p ch, struct listener_ht *ht) +{ + struct listener_slh *bucket; + struct listener *l, *prev; + + bucket = _get_bucket(ch, ht); + for (prev = NULL, l = SLIST_FIRST(bucket); l != NULL; + prev = l, l = SLIST_NEXT(l, l_sle)) { + if (l->ch == ch) { + break; + } + } + + if (prev == NULL) { + SLIST_REMOVE_HEAD(bucket, l_sle); + } else { + SLIST_REMOVE_AFTER(prev, l_sle); + } + + --(ht->nlistener); +} + + +struct listener * +listener_create(channel_p ch, channel_handler_st *handler) +{ + struct listener *l; + + l = cc_alloc(sizeof(struct listener)); + if (l == NULL) { + return NULL; + } + + l->idx = cc_alloc(sizeof(struct index_tqh)); + if (l->idx == NULL) { + cc_free(l); + return NULL; + } + + listener_reset(l); + l->ch = ch; + l->handler = handler; + + return l; +} + +void +listener_destroy(struct listener **l) +{ + ASSERT(l != NULL && *l != NULL); + + struct index_node *curr, *next; + struct index_tqh *idx = (*l)->idx; + + /* delete all elements of the index */ + TAILQ_FOREACH_SAFE(curr, idx, i_tqe, next) { + TAILQ_REMOVE(idx, curr, i_tqe); + cc_free(curr); + } + cc_free(idx); + + cc_free(*l); + *l = NULL; +} + +void +listener_reset(struct listener *l) +{ + l->ch = NULL; + l->handler = NULL; + l->ntopic = 0; + TAILQ_INIT(l->idx); +} + +bool +listener_add_topic(struct listener *l, const struct topic *t) +{ + struct index_node *node; + + ASSERT(l != NULL && t != NULL); + + /* do nothing if already subscribed */ + TAILQ_FOREACH(node, l->idx, i_tqe) { + if (node->obj == t) { + return false; + } + } + + node = cc_alloc(sizeof(struct index_node)); + if (node == NULL) { + return false; + } + node->obj = (struct topic *)t; + + TAILQ_INSERT_TAIL(l->idx, node, i_tqe); + l->ntopic++; + + return true; +} + +void +listener_del_topic(struct listener *l, const struct topic *t) +{ + struct index_node *node; + + /* do nothing if not found */ + TAILQ_FOREACH(node, l->idx, i_tqe) { + if (node->obj == t) { + break; + } + } + if (node == NULL) { + return; + } + + TAILQ_REMOVE(l->idx, node, i_tqe); + l->ntopic--; +} diff --git a/src/storage/pubsub/listener.h b/src/storage/pubsub/listener.h new file mode 100644 index 000000000..872ade1c2 --- /dev/null +++ b/src/storage/pubsub/listener.h @@ -0,0 +1,41 @@ +#pragma once + +#include "index.h" + +#include + +/* + * a listener is a client that has subscribed to at least one channel + */ +struct listener { + SLIST_ENTRY(listener) l_sle; + + channel_p ch; + channel_handler_st *handler; + uint32_t ntopic; + struct index_tqh *idx; /* index of all topics */ +}; + +SLIST_HEAD(listener_slh, listener); + +struct listener_ht { + struct listener_slh *table; + uint32_t nlistener; + uint32_t hash_power; +}; + +struct topic; + +struct listener_ht *listener_ht_create(uint32_t hash_power); +void listener_ht_destroy(struct listener_ht **ht); + +struct listener *listener_ht_get(const channel_p ch, struct listener_ht *ht); +void listener_ht_put(const struct listener *l, struct listener_ht *ht); +void listener_ht_delete(const channel_p ch, struct listener_ht *ht); + +struct listener *listener_create(channel_p ch, channel_handler_st *handler); +void listener_destroy(struct listener **l); +void listener_reset(struct listener *l); + +bool listener_add_topic(struct listener *l, const struct topic *t); +void listener_del_topic(struct listener *l, const struct topic *t); diff --git a/src/storage/pubsub/topic.c b/src/storage/pubsub/topic.c new file mode 100644 index 000000000..77b40ff15 --- /dev/null +++ b/src/storage/pubsub/topic.c @@ -0,0 +1,218 @@ +#include "hashtable.h" +#include "topic.h" + +#include +#include +#include + +static struct topic_slh * +_ht_alloc(uint32_t nentry) +{ + struct topic_slh *table; + uint32_t i; + + table = cc_alloc(sizeof(*table) * nentry); + + if (table != NULL) { + for (i = 0; i < nentry; ++i) { + SLIST_INIT(&table[i]); + } + } + + return table; +} + +struct topic_ht * +topic_ht_create(uint32_t hash_power) +{ + struct topic_ht *ht; + uint32_t nentry; + + ASSERT(hash_power > 0); + + ht = cc_alloc(sizeof(struct topic_ht)); + if (ht == NULL) { + return NULL; + } + + ht->hash_power = hash_power; + ht->ntopic = 0; + nentry = HASHSIZE(ht->hash_power); + ht->table = _ht_alloc(nentry); + if (ht->table == NULL) { + cc_free(ht); + return NULL; + } + + return ht; +} + +void +topic_ht_destroy(struct topic_ht **ht) +{ + ASSERT(ht != NULL); + + if (*ht != NULL && (*ht)->table != NULL) { + cc_free((*ht)->table); + } + + cc_free(*ht); + *ht = NULL; +} + + +static struct topic_slh * +_get_bucket(const struct bstring *name, struct topic_ht *ht) +{ + /* use the _address_ of the channel to hash */ + uint32_t hval = hash_lookup3(name->data, name->len, 0); + return &(ht->table[hval & HASHMASK(ht->hash_power)]); +} + +struct topic * +topic_ht_get(const struct bstring *name, struct topic_ht *ht) +{ + struct topic_slh *bucket; + struct topic *t; + + bucket = _get_bucket(name, ht); + for (t = SLIST_FIRST(bucket); t != NULL; t = SLIST_NEXT(t, t_sle)) { + if (bstring_compare(&t->name, name) == 0) { + return t; + } + } + + return NULL; +} + +void +topic_ht_put(const struct topic *t, struct topic_ht *ht) +{ + struct topic_slh *bucket; + + ASSERT(topic_ht_get(&t->name, ht) == NULL); + + bucket = _get_bucket(&t->name, ht); + SLIST_INSERT_HEAD(bucket, (struct topic *)t, t_sle); + + ht->ntopic++; +} + +void +topic_ht_delete(const struct bstring *name, struct topic_ht *ht) +{ + struct topic_slh *bucket; + struct topic *t, *prev; + + bucket = _get_bucket(name, ht); + for (prev = NULL, t = SLIST_FIRST(bucket); t != NULL; + prev = t, t = SLIST_NEXT(t, t_sle)) { + if (bstring_compare(&t->name, name) == 0) { + break; + } + } + + if (prev == NULL) { + SLIST_REMOVE_HEAD(bucket, t_sle); + } else { + SLIST_REMOVE_AFTER(prev, t_sle); + } + + --(ht->ntopic); +} + + +struct topic * +topic_create(const struct bstring *name) +{ + struct topic *t; + + t = cc_alloc(sizeof(struct topic)); + if (t == NULL) { + return NULL; + } + + t->idx = cc_alloc(sizeof(struct index_tqh)); + if (t->idx == NULL) { + cc_free(t); + return NULL; + } + + topic_reset(t); + t->name = *name; + + return t; +} + +void +topic_destroy(struct topic **t) +{ + ASSERT(t != NULL && *t != NULL); + + struct index_node *curr, *next; + struct index_tqh *idx = (*t)->idx; + + /* delete all elements of the index */ + TAILQ_FOREACH_SAFE(curr, idx, i_tqe, next) { + TAILQ_REMOVE(idx, curr, i_tqe); + cc_free(curr); + } + cc_free(idx); + + cc_free(*t); + *t = NULL; +} + +void +topic_reset(struct topic *t) +{ + t->name = null_bstring; + t->nsub = 0; + TAILQ_INIT(t->idx); +} + +bool +topic_add_listener(struct topic *t, const struct listener *l) +{ + struct index_node *node; + + ASSERT(t != NULL && l != NULL); + + /* do nothing if already subscribed */ + TAILQ_FOREACH(node, t->idx, i_tqe) { + if (node->obj == l) { + return false; + } + } + + node = cc_alloc(sizeof(struct index_node)); + if (node == NULL) { + return false; + } + node->obj = (struct listener *)l; + + TAILQ_INSERT_TAIL(t->idx, node, i_tqe); + t->nsub++; + + return true; +} + +void +topic_del_listener(struct topic *t, const struct listener *l) +{ + struct index_node *node; + + /* do nothing if not found */ + TAILQ_FOREACH(node, t->idx, i_tqe) { + if (node->obj == l) { + break; + } + } + if (node == NULL) { + return; + } + + TAILQ_REMOVE(t->idx, node, i_tqe); + t->nsub--; + cc_free(node); +} diff --git a/src/storage/pubsub/topic.h b/src/storage/pubsub/topic.h new file mode 100644 index 000000000..f9ad7d9f2 --- /dev/null +++ b/src/storage/pubsub/topic.h @@ -0,0 +1,41 @@ +#pragma once + +#include "index.h" + +#include + +/* + * a topic is an endpoint that clients can subscribe to, equivalent to + * "channel" in the original redis protocol. + */ +struct topic { + SLIST_ENTRY(topic) t_sle; + + struct bstring name; + uint32_t nsub; + struct index_tqh *idx; /* index of all listeners */ +}; + +SLIST_HEAD(topic_slh, topic); + +struct topic_ht { + struct topic_slh *table; + uint32_t ntopic; + uint32_t hash_power; +}; + +struct listener; + +struct topic_ht *topic_ht_create(uint32_t hash_power); +void topic_ht_destroy(struct topic_ht **ht); + +struct topic *topic_ht_get(const struct bstring *name, struct topic_ht *ht); +void topic_ht_put(const struct topic *t, struct topic_ht *ht); +void topic_ht_delete(const struct bstring *name, struct topic_ht *ht); + +struct topic *topic_create(const struct bstring *name); +void topic_destroy(struct topic **t); +void topic_reset(struct topic *t); + +bool topic_add_listener(struct topic *t, const struct listener *l); +void topic_del_listener(struct topic *t, const struct listener *l); From 8fce409978860b970e41ba411ca13401d1cca454 Mon Sep 17 00:00:00 2001 From: Yao Yue Date: Thu, 6 Apr 2017 12:52:52 -0700 Subject: [PATCH 2/9] add pubsub server --- CMakeLists.txt | 7 +- config/pubsub.conf | 7 + src/core/data/worker.c | 2 +- src/protocol/data/redis/request.c | 1 + src/server/CMakeLists.txt | 4 + src/server/pubsub/CMakeLists.txt | 29 ++++ src/server/pubsub/admin/CMakeLists.txt | 4 + src/server/pubsub/admin/process.c | 68 +++++++++ src/server/pubsub/admin/process.h | 4 + src/server/pubsub/data/CMakeLists.txt | 5 + src/server/pubsub/data/process.c | 150 +++++++++++++++++++ src/server/pubsub/data/process.h | 30 ++++ src/server/pubsub/data/pubsub.c | 59 ++++++++ src/server/pubsub/data/pubsub.h | 12 ++ src/server/pubsub/main.c | 197 +++++++++++++++++++++++++ src/server/pubsub/setting.c | 18 +++ src/server/pubsub/setting.h | 50 +++++++ src/server/pubsub/stats.c | 21 +++ src/server/pubsub/stats.h | 38 +++++ 19 files changed, 702 insertions(+), 4 deletions(-) create mode 100644 config/pubsub.conf create mode 100644 src/server/pubsub/CMakeLists.txt create mode 100644 src/server/pubsub/admin/CMakeLists.txt create mode 100644 src/server/pubsub/admin/process.c create mode 100644 src/server/pubsub/admin/process.h create mode 100644 src/server/pubsub/data/CMakeLists.txt create mode 100644 src/server/pubsub/data/process.c create mode 100644 src/server/pubsub/data/process.h create mode 100644 src/server/pubsub/data/pubsub.c create mode 100644 src/server/pubsub/data/pubsub.h create mode 100644 src/server/pubsub/main.c create mode 100644 src/server/pubsub/setting.c create mode 100644 src/server/pubsub/setting.h create mode 100644 src/server/pubsub/stats.c create mode 100644 src/server/pubsub/stats.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 10dc97edc..888128019 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,9 +33,10 @@ option(HAVE_ASSERT_LOG "assert_log enabled by default" ON) option(HAVE_ASSERT_PANIC "assert_panic disabled by default" OFF) option(HAVE_LOGGING "logging enabled by default" ON) option(HAVE_STATS "stats enabled by default" ON) -option(TARGET_PINGSERVER "build pingserver binary" ON) -option(TARGET_SLIMCACHE "build slimcache binary" ON) -option(TARGET_TWEMCACHE "build twemcache binary" ON) +option(TARGET_PINGSERVER "build pingserver binary" OFF) +option(TARGET_PUBSUB "build pubsub binary" ON) +option(TARGET_SLIMCACHE "build slimcache binary" OFF) +option(TARGET_TWEMCACHE "build twemcache binary" OFF) option(COVERAGE "code coverage" OFF) # Note: duplicate custom targets only works with Makefile generators, will break XCode & VS diff --git a/config/pubsub.conf b/config/pubsub.conf new file mode 100644 index 000000000..90d333165 --- /dev/null +++ b/config/pubsub.conf @@ -0,0 +1,7 @@ +# if slab profile is specified, then the profile wil be explicitly set +# otherwise, slab profile will automatically be generated by using growth factor, slab size, and chunk size +#slab_profile: 1024 2048 4096 8192 16384 32768 65536 131072 262144 524288 + +debug_log_level: 6 +# debug_log_file: pubsub.log +# debug_log_nbuf: 16384 diff --git a/src/core/data/worker.c b/src/core/data/worker.c index 473922c31..336fbd058 100644 --- a/src/core/data/worker.c +++ b/src/core/data/worker.c @@ -291,7 +291,7 @@ _worker_evwait(void) void * core_worker_evloop(void *arg) { - processor = arg; + post_processor = arg; for(;;) { if (_worker_evwait() != CC_OK) { diff --git a/src/protocol/data/redis/request.c b/src/protocol/data/redis/request.c index 2a38e5c46..2eec939df 100644 --- a/src/protocol/data/redis/request.c +++ b/src/protocol/data/redis/request.c @@ -19,6 +19,7 @@ struct command command_table[REQ_SENTINEL] = { REQ_HASH(CMD_INIT) REQ_ZSET(CMD_INIT) REQ_MISC(CMD_INIT) + REQ_PUBSUB(CMD_INIT) }; #undef CMD_INIT diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 9dff0246e..f6043c56e 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -2,6 +2,10 @@ if(TARGET_PINGSERVER) add_subdirectory(pingserver) endif() +if(TARGET_PUBSUB) + add_subdirectory(pubsub) +endif() + if(TARGET_SLIMCACHE) add_subdirectory(slimcache) endif() diff --git a/src/server/pubsub/CMakeLists.txt b/src/server/pubsub/CMakeLists.txt new file mode 100644 index 000000000..f30ed9f30 --- /dev/null +++ b/src/server/pubsub/CMakeLists.txt @@ -0,0 +1,29 @@ +add_subdirectory(admin) +add_subdirectory(data) + +set(SOURCE + ${SOURCE} + main.c + setting.c + stats.c) + +set(MODULES + core + protocol_admin + protocol_redis + pubsub + time + util) + +set(LIBS + ccommon-static + ${CMAKE_THREAD_LIBS_INIT}) + +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/_bin) +set(TARGET_NAME ${PROJECT_NAME}_pubsub) + +add_executable(${TARGET_NAME} ${SOURCE}) +target_link_libraries(${TARGET_NAME} ${MODULES} ${LIBS}) + +install(TARGETS ${TARGET_NAME} RUNTIME DESTINATION bin) +add_dependencies(service ${TARGET_NAME}) diff --git a/src/server/pubsub/admin/CMakeLists.txt b/src/server/pubsub/admin/CMakeLists.txt new file mode 100644 index 000000000..31a7e65f3 --- /dev/null +++ b/src/server/pubsub/admin/CMakeLists.txt @@ -0,0 +1,4 @@ +set(SOURCE + ${SOURCE} + ${CMAKE_CURRENT_SOURCE_DIR}/process.c + PARENT_SCOPE) diff --git a/src/server/pubsub/admin/process.c b/src/server/pubsub/admin/process.c new file mode 100644 index 000000000..00f36d060 --- /dev/null +++ b/src/server/pubsub/admin/process.c @@ -0,0 +1,68 @@ +#include "process.h" + +#include "protocol/admin/admin_include.h" +#include "util/procinfo.h" + +#include + +#define PUBSUB_ADMIN_MODULE_NAME "pubsub::admin" + +extern struct stats stats; +extern unsigned int nmetric; + +static bool admin_init = false; +static char *buf = NULL; +static size_t cap; + +void +admin_process_setup(void) +{ + log_info("set up the %s module", PUBSUB_ADMIN_MODULE_NAME); + if (admin_init) { + log_warn("%s has already been setup, overwrite", + PUBSUB_ADMIN_MODULE_NAME); + } + + cap = METRIC_PRINT_LEN * nmetric + METRIC_END_LEN; + buf = cc_alloc(cap); + /* TODO: check return status of cc_alloc */ + + admin_init = true; +} + +void +admin_process_teardown(void) +{ + log_info("tear down the %s module", PUBSUB_ADMIN_MODULE_NAME); + if (!admin_init) { + log_warn("%s has never been setup", PUBSUB_ADMIN_MODULE_NAME); + } + + admin_init = false; +} + +static void +_admin_stats(struct response *rsp, struct request *req) +{ + procinfo_update(); + rsp->data.data = buf; + rsp->data.len = print_stats(buf, cap, (struct metric *)&stats, nmetric); +} + +void +admin_process_request(struct response *rsp, struct request *req) +{ + rsp->type = RSP_GENERIC; + + switch (req->type) { + case REQ_STATS: + _admin_stats(rsp, req); + break; + case REQ_VERSION: + rsp->data = str2bstr(VERSION_PRINTED); + break; + default: + rsp->type = RSP_INVALID; + break; + } +} diff --git a/src/server/pubsub/admin/process.h b/src/server/pubsub/admin/process.h new file mode 100644 index 000000000..361230004 --- /dev/null +++ b/src/server/pubsub/admin/process.h @@ -0,0 +1,4 @@ +#pragma once + +void admin_process_setup(void); +void admin_process_teardown(void); diff --git a/src/server/pubsub/data/CMakeLists.txt b/src/server/pubsub/data/CMakeLists.txt new file mode 100644 index 000000000..ff285f978 --- /dev/null +++ b/src/server/pubsub/data/CMakeLists.txt @@ -0,0 +1,5 @@ +set(SOURCE + ${SOURCE} + ${CMAKE_CURRENT_SOURCE_DIR}/process.c + ${CMAKE_CURRENT_SOURCE_DIR}/pubsub.c + PARENT_SCOPE) diff --git a/src/server/pubsub/data/process.c b/src/server/pubsub/data/process.c new file mode 100644 index 000000000..2ac1a8ad6 --- /dev/null +++ b/src/server/pubsub/data/process.c @@ -0,0 +1,150 @@ +#include "process.h" + +#include "protocol/data/redis_include.h" +#include "pubsub.h" + +#include +#include + +#define PUBSUB_PROCESS_MODULE_NAME "pubsub::process" + +command_fn command_registry[REQ_SENTINEL]; + +static bool process_init = false; +static process_metrics_st *process_metrics = NULL; + + +void +process_setup(process_metrics_st *metrics) +{ + log_info("set up the %s module", PUBSUB_PROCESS_MODULE_NAME); + + if (process_init) { + log_warn("%s has already been setup, overwrite", + PUBSUB_PROCESS_MODULE_NAME); + } + + pubsub_setup(); + + command_registry[REQ_SUBSCRIBE] = command_subscribe; + + process_metrics = metrics; + process_init = true; +} + +void +process_teardown(void) +{ + log_info("tear down the %s module", PUBSUB_PROCESS_MODULE_NAME); + if (!process_init) { + log_warn("%s has never been setup", PUBSUB_PROCESS_MODULE_NAME); + } + + pubsub_teardown(); + + command_registry[REQ_SUBSCRIBE] = NULL; + + process_metrics = NULL; + process_init = false; +} + +static void +process_request_sock(struct response *rsp, struct request *req, struct buf_sock *s) +{ + log_verb("processing req %p, write rsp to %p", req, rsp); + INCR(process_metrics, process_req); + + if (command_registry[req->type] == NULL) { + /* return error */ + } + + command_registry[req->type](rsp, req, s); +} + +int +pubsub_process_read(struct buf_sock *s) +{ + int status; + struct request *req; + struct response *rsp; + + log_verb("post-read processing"); + + req = request_borrow(); + rsp = response_borrow(); + + /* keep parse-process-compose until running out of data in rbuf */ + while (buf_rsize(s->rbuf) > 0) { + /* stage 1: parsing */ + log_verb("%"PRIu32" bytes left", buf_rsize(s->rbuf)); + + status = parse_req(req, s->rbuf); + if (status == PARSE_EUNFIN) { + buf_lshift(s->rbuf); + return 0; + } + if (status != PARSE_OK) { + /* parsing errors are all client errors, since we don't know + * how to recover from client errors in this condition (we do not + * have a valid request so we don't know where the invalid request + * ends), we should close the connection + */ + log_warn("illegal request received, status: %d", status); + return -1; + } + + /* stage 2: processing- check for quit, allocate response(s), process */ + + /* quit is special, no processing/resposne expected */ + if (req->type == REQ_QUIT) { + log_info("peer called quit"); + return -1; + } + + /* actual processing */ + process_request_sock(rsp, req, s); + + /* stage 3: write response(s) if necessary */ + + /* noreply means no need to write to buffers */ + + /* logging, clean-up */ + } + + request_return(&req); + response_return(&rsp); + + return 0; +} + + +int +pubsub_process_write(struct buf_sock *s) +{ + log_verb("post-write processing"); + + buf_lshift(s->rbuf); + buf_lshift(s->wbuf); + dbuf_shrink(&s->rbuf); + dbuf_shrink(&s->wbuf); + + return 0; +} + + +int +pubsub_process_error(struct buf_sock *s) +{ + struct request *req; + struct response *rsp; + + log_verb("post-error processing"); + + /* normalize buffer size */ + buf_reset(s->rbuf); + dbuf_shrink(&s->rbuf); + buf_reset(s->wbuf); + dbuf_shrink(&s->wbuf); + + return 0; +} diff --git a/src/server/pubsub/data/process.h b/src/server/pubsub/data/process.h new file mode 100644 index 000000000..63ec79ddc --- /dev/null +++ b/src/server/pubsub/data/process.h @@ -0,0 +1,30 @@ +#pragma once + +#include "protocol/data/redis_include.h" + +#include +#include +#include +#include + +/* name type description */ +#define PROCESS_METRIC(ACTION) \ + ACTION( process_req, METRIC_COUNTER, "# requests processed" )\ + ACTION( process_ex, METRIC_COUNTER, "# processing error" )\ + ACTION( publish, METRIC_COUNTER, "# publish requests" )\ + ACTION( subscribe, METRIC_COUNTER, "# subscribe requests" )\ + ACTION( unsubscribe, METRIC_COUNTER, "# unsubscribe requests") + +typedef struct { + PROCESS_METRIC(METRIC_DECLARE) +} process_metrics_st; + +typedef void (* command_fn)(struct response *, struct request *, struct buf_sock *); +extern command_fn command_registry[REQ_SENTINEL]; + +void process_setup(process_metrics_st *metrics); +void process_teardown(void); + +int pubsub_process_read(struct buf_sock *s); +int pubsub_process_write(struct buf_sock *s); +int pubsub_process_error(struct buf_sock *s); diff --git a/src/server/pubsub/data/pubsub.c b/src/server/pubsub/data/pubsub.c new file mode 100644 index 000000000..cd35ed595 --- /dev/null +++ b/src/server/pubsub/data/pubsub.c @@ -0,0 +1,59 @@ +#include "pubsub.h" + +#include "protocol/data/redis_include.h" +#include "storage/pubsub/listener.h" +#include "storage/pubsub/topic.h" + +#include +#include + +static struct listener_ht *lht; +static struct topic_ht *tht; + +void +pubsub_setup(void) +{ + lht = listener_ht_create(16); + tht = topic_ht_create(16); +} + +void +pubsub_teardown(void) +{ + listener_ht_destroy(&lht); + topic_ht_destroy(&tht); +} + +/* "subscribe topic [topic ...]" */ +void +command_subscribe(struct response *rsp, struct request *req, struct buf_sock *s) +{ + struct element *el; + struct listener *l; + struct topic *t; + uint32_t ntopic = req->token->nelem - 1; + + l = listener_ht_get(s->ch, lht); + if (l == NULL) { + l = listener_create(s->ch, s->hdl); + listener_ht_put(l, lht); + } + + for (int i = 1; i < ntopic; i++) { + el = array_get(req->token, i); + if (el->type != ELEM_BULK) { + /* handle client error */ + }; + + t = topic_ht_get(&el->bstr, tht); + if (t == NULL) { + t = topic_create(&el->bstr); + } + listener_add_topic(l, t); + } + + rsp->type = ELEM_STR; + el = array_push(rsp->token); + el->type = ELEM_STR; + el->bstr = str2bstr(RSP_STR_OK); +} diff --git a/src/server/pubsub/data/pubsub.h b/src/server/pubsub/data/pubsub.h new file mode 100644 index 000000000..72df15661 --- /dev/null +++ b/src/server/pubsub/data/pubsub.h @@ -0,0 +1,12 @@ +#pragma once + +struct request; +struct response; +struct buf_sock; + +void pubsub_setup(void); +void pubsub_teardown(void); + +void command_subscribe(struct response *rsp, struct request *req, struct buf_sock *s); +void command_unsubscribe(struct response *rsp, struct request *req, struct buf_sock *s); +void command_publish(struct response *rsp, struct request *req, struct buf_sock *s); diff --git a/src/server/pubsub/main.c b/src/server/pubsub/main.c new file mode 100644 index 000000000..411f5fc9b --- /dev/null +++ b/src/server/pubsub/main.c @@ -0,0 +1,197 @@ +#include "setting.h" +#include "stats.h" + +#include "time/time.h" +#include "util/util.h" + +#include + +#include +#include +#include +#include +#include +#include + +struct processor worker_processor = { + pubsub_process_read, + pubsub_process_write, + pubsub_process_error, +}; + +static void +show_usage(void) +{ + log_stdout( + "Usage:" CRLF + " pelikan_pubsub [option|config]" CRLF + ); + log_stdout( + "Description:" CRLF + " pelikan_pubsub is an in-memory pub/sub server." CRLF + CRLF + " It supports basic Redis pub/sub commands:" CRLF + " subscribe, unsubscribe, publish,... " CRLF + ); + log_stdout( + "Command-line options:" CRLF + " -h, --help show this message" CRLF + " -v, --version show version number" CRLF + " -c, --config list & describe all options in config" CRLF + " -s, --stats list & describe all metrics in stats" CRLF + ); + log_stdout( + "Example:" CRLF + " pelikan_pubsub pubsub.conf" CRLF CRLF + "Sample config files can be found under the config dir." CRLF + ); +} + +static void +teardown(void) +{ + core_teardown(); + admin_process_teardown(); + compose_teardown(); + parse_teardown(); + procinfo_teardown(); + time_teardown(); + + timing_wheel_teardown(); + tcp_teardown(); + sockio_teardown(); + event_teardown(); + dbuf_teardown(); + buf_teardown(); + + debug_teardown(); + log_teardown(); +} + +static void +setup(void) +{ + char *fname = NULL; + uint64_t intvl; + + if (atexit(teardown) != 0) { + log_stderr("cannot register teardown procedure with atexit()"); + exit(EX_OSERR); /* only failure comes from NOMEM */ + } + + /* Setup logging first */ + log_setup(&stats.log); + if (debug_setup(&setting.debug) < 0) { + log_stderr("debug log setup failed"); + goto error; + } + + /* setup top-level application options */ + if (option_bool(&setting.pubsub.daemonize)) { + daemonize(); + } + fname = option_str(&setting.pubsub.pid_filename); + if (fname != NULL) { + /* to get the correct pid, call create_pidfile after daemonize */ + create_pidfile(fname); + } + + /* setup library modules */ + buf_setup(&setting.buf, &stats.buf); + dbuf_setup(&setting.dbuf, &stats.dbuf); + event_setup(&stats.event); + sockio_setup(&setting.sockio, &stats.sockio); + tcp_setup(&setting.tcp, &stats.tcp); + timing_wheel_setup(&stats.timing_wheel); + + /* setup pelikan modules */ + time_setup(); + procinfo_setup(&stats.procinfo); + request_setup(&setting.request, &stats.request); + response_setup(&setting.response, &stats.response); + parse_setup(&stats.parse_req, NULL); + compose_setup(NULL, &stats.compose_rsp); + process_setup(&stats.process); + admin_process_setup(); + core_setup(&setting.admin, &setting.server, &setting.worker, + &stats.server, &stats.worker); + + /* adding recurring events to maintenance/admin thread */ + intvl = option_uint(&setting.pubsub.dlog_intvl); + if (core_admin_register(intvl, debug_log_flush, NULL) == NULL) { + log_stderr("Could not register timed event to flush debug log"); + goto error; + } + + return; + +error: + if (fname != NULL) { + remove_pidfile(fname); + } + + /* since we registered teardown with atexit, it'll be called upon exit */ + exit(EX_CONFIG); +} + +int +main(int argc, char **argv) +{ + rstatus_i status = CC_OK; + FILE *fp = NULL; + + if (argc > 2) { + show_usage(); + exit(EX_USAGE); + } + + if (argc == 1) { + log_stderr("launching server with default values."); + } + + if (argc == 2) { + if (strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0) { + show_usage(); + exit(EX_OK); + } + if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--version") == 0) { + show_version(); + exit(EX_OK); + } + if (strcmp(argv[1], "-c") == 0 || strcmp(argv[1], "--config") == 0) { + option_describe_all((struct option *)&setting, nopt); + exit(EX_OK); + } + if (strcmp(argv[1], "-s") == 0 || strcmp(argv[1], "--stats") == 0) { + metric_describe_all((struct metric *)&stats, nmetric); + exit(EX_OK); + } + fp = fopen(argv[1], "r"); + if (fp == NULL) { + log_stderr("cannot open config: incorrect path or doesn't exist"); + exit(EX_DATAERR); + } + } + + if (option_load_default((struct option *)&setting, nopt) != CC_OK) { + log_stderr("failed to load default option values"); + exit(EX_CONFIG); + } + + if (fp != NULL) { + log_stderr("load config from %s", argv[1]); + status = option_load_file(fp, (struct option *)&setting, nopt); + fclose(fp); + } + if (status != CC_OK) { + log_stderr("failed to load config"); + exit(EX_DATAERR); + } + + setup(); + option_print_all((struct option *)&setting, nopt); + + core_run(NULL, &worker_processor); + + exit(EX_OK); +} diff --git a/src/server/pubsub/setting.c b/src/server/pubsub/setting.c new file mode 100644 index 000000000..6854a0e53 --- /dev/null +++ b/src/server/pubsub/setting.c @@ -0,0 +1,18 @@ +#include "setting.h" + +struct setting setting = { + { PUBSUB_OPTION(OPTION_INIT) }, + { ADMIN_OPTION(OPTION_INIT) }, + { SERVER_OPTION(OPTION_INIT) }, + { WORKER_OPTION(OPTION_INIT) }, + { REQUEST_OPTION(OPTION_INIT) }, + { RESPONSE_OPTION(OPTION_INIT) }, + { ARRAY_OPTION(OPTION_INIT) }, + { BUF_OPTION(OPTION_INIT) }, + { DBUF_OPTION(OPTION_INIT) }, + { DEBUG_OPTION(OPTION_INIT) }, + { SOCKIO_OPTION(OPTION_INIT) }, + { TCP_OPTION(OPTION_INIT) }, +}; + +unsigned int nopt = OPTION_CARDINALITY(setting); diff --git a/src/server/pubsub/setting.h b/src/server/pubsub/setting.h new file mode 100644 index 000000000..bb60374b6 --- /dev/null +++ b/src/server/pubsub/setting.h @@ -0,0 +1,50 @@ +#pragma once + +#include "admin/process.h" +#include "data/process.h" + +#include "core/core.h" +#include "protocol/data/redis_include.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* option related */ +/* name type default description */ +#define PUBSUB_OPTION(ACTION) \ + ACTION( daemonize, OPTION_TYPE_BOOL, false, "daemonize the process" )\ + ACTION( pid_filename, OPTION_TYPE_STR, NULL, "file storing the pid" )\ + ACTION( dlog_intvl, OPTION_TYPE_UINT, 500, "debug log flush interval(ms)" ) + +typedef struct { + PUBSUB_OPTION(OPTION_DECLARE) +} pubsub_options_st; + +struct setting { + /* top-level */ + pubsub_options_st pubsub; + /* application modules */ + admin_options_st admin; + server_options_st server; + worker_options_st worker; + request_options_st request; + response_options_st response; + /* ccommon libraries */ + array_options_st array; + buf_options_st buf; + dbuf_options_st dbuf; + debug_options_st debug; + sockio_options_st sockio; + tcp_options_st tcp; +}; + +extern struct setting setting; +extern unsigned int nopt; diff --git a/src/server/pubsub/stats.c b/src/server/pubsub/stats.c new file mode 100644 index 000000000..1c86a710a --- /dev/null +++ b/src/server/pubsub/stats.c @@ -0,0 +1,21 @@ +#include "stats.h" + +struct stats stats = { + { PROCINFO_METRIC(METRIC_INIT) }, + { PROCESS_METRIC(METRIC_INIT) }, + { PARSE_REQ_METRIC(METRIC_INIT) }, + { COMPOSE_RSP_METRIC(METRIC_INIT) }, + { REQUEST_METRIC(METRIC_INIT) }, + { RESPONSE_METRIC(METRIC_INIT) }, + { CORE_SERVER_METRIC(METRIC_INIT) }, + { CORE_WORKER_METRIC(METRIC_INIT) }, + { BUF_METRIC(METRIC_INIT) }, + { DBUF_METRIC(METRIC_INIT) }, + { EVENT_METRIC(METRIC_INIT) }, + { LOG_METRIC(METRIC_INIT) }, + { SOCKIO_METRIC(METRIC_INIT) }, + { TCP_METRIC(METRIC_INIT) }, + { TIMING_WHEEL_METRIC(METRIC_INIT) }, +}; + +unsigned int nmetric = METRIC_CARDINALITY(stats); diff --git a/src/server/pubsub/stats.h b/src/server/pubsub/stats.h new file mode 100644 index 000000000..5c458fd4c --- /dev/null +++ b/src/server/pubsub/stats.h @@ -0,0 +1,38 @@ +#pragma once + +#include "data/process.h" + +#include "core/core.h" +#include "protocol/data/redis_include.h" +#include "util/procinfo.h" + +#include +#include +#include +#include +#include +#include