diff --git a/CMakeLists.txt b/CMakeLists.txt index 03bd116..79b57e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,7 +16,7 @@ endif() include (${CUSTOM_CMAKE_MODULES}/common.mk) option(BUILD_DEBUG "debug or release" OFF) -option(BUILD_TEST "build unit-tests" OFF) +option(BUILD_TEST "build tests" OFF) function(parseLogLevel varName) if (NOT ${varName}) @@ -184,37 +184,20 @@ install(FILES ${flora_headers} DESTINATION include) # unit-tests if (BUILD_TEST) -file(GLOB flora_unit_test_SOURCES - unit-tests/*.h - unit-tests/*.cc +findPackage(gtest REQUIRED + HINTS ${gtestPrefix} + HEADERS gtest/gtest.h + STATIC_LIBS gtest ) -add_executable(flora-test - ${flora_unit_test_SOURCES} -) -target_include_directories(flora-test - PRIVATE include ${mutils_INCLUDE_DIRS} -) -target_link_libraries(flora-test - flora-cli - flora-svc - ${mutils_LIBRARIES} -) -set_target_properties(flora-test PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_PREFIX}/lib) -install(TARGETS flora-test RUNTIME DESTINATION bin) - -file(GLOB flora_demo_SOURCES - examples/*.h - examples/*.cc -) -add_executable(flora-demo - ${flora_demo_SOURCES} -) -target_include_directories(flora-demo - PRIVATE include ${mutils_INCLUDE_DIRS} +add_executable(test test/main.cc test/simple.cc test/svc.h) +target_include_directories(test PRIVATE + include + ${gtest_INCLUDE_DIRS} + ${mutils_INCLUDE_DIRS} ) -target_link_libraries(flora-demo +target_link_libraries(test flora-cli flora-svc - ${mutils_LIBRARIES} + ${gtest_LIBRARIES} ) endif(BUILD_TEST) diff --git a/config b/config index 56b79e5..2a4585f 100755 --- a/config +++ b/config @@ -20,6 +20,7 @@ Configuration: Dependencies: --mutils=DIR specify mutils install dir --ncurses=DIR specify ncurses install dir + --gtest=DIR specify gtest install dir Cross Compile: --toolchain=DIR toolchain install dir @@ -73,6 +74,9 @@ do --ncurses=*) CMAKE_ARGS=(${CMAKE_ARGS[@]} -DncursesPrefix=$conf_optarg) ;; + --gtest=*) + CMAKE_ARGS=(${CMAKE_ARGS[@]} -DgtestPrefix=$conf_optarg) + ;; --toolchain=*) CMAKE_ARGS=(${CMAKE_ARGS[@]} -DTOOLCHAIN_HOME=$conf_optarg) CROSS_COMPILE=yes diff --git a/include/flora-cli.h b/include/flora-cli.h index 47082e8..0b77830 100644 --- a/include/flora-cli.h +++ b/include/flora-cli.h @@ -135,9 +135,9 @@ class ClientCallback { class MonitorListItem { public: uint32_t id; - int32_t pid; - std::string name; uint32_t flags; + uint64_t tag; + std::string name; }; class MonitorSubscriptionItem { @@ -194,6 +194,31 @@ class MonitorCallback { virtual void disconnected() {} }; +class MsgSender { +public: + // return: 0 unix domain socket connection + // 1 tcp socket connection + static uint32_t connection_type(); + + // pid of msg sender, if connection type is unix domain socket connection + static pid_t pid(); + + // identify of msg sender + static uint64_t tag(); + + // ipv4 addr string, if connection type is tcp socket connection + static const char* ipaddr(); + + // ipv4 port, if connection type is tcp socket connection + static uint16_t port(); + + static const char* name(); + + // pid string, if connection type is unix domain socket connection + // ipaddr:port, if connection type is tcp socket connection + static void to_string(std::string& str); +}; + } // namespace flora extern "C" { @@ -229,9 +254,9 @@ typedef void (*flora_call_callback_t)(int32_t rescode, flora_call_result *result, void *arg); typedef struct { - uint32_t id; - int32_t pid; + uint64_t tag; const char *name; + uint32_t id; } flora_cli_monitor_list_item; typedef struct { const char *name; diff --git a/monitor/main.cc b/monitor/main.cc index 09cbd05..7e89f12 100644 --- a/monitor/main.cc +++ b/monitor/main.cc @@ -33,7 +33,6 @@ static bool parseCmdline(int argc, char **argv, CmdlineArgs &res) { class FloraClientInfo { public: uint32_t id = 0; - int32_t pid = 0; string name; uint32_t flags = 0; }; @@ -45,7 +44,6 @@ class MonitorView : public flora::MonitorCallback { while (it != items.end()) { auto fcit = floraClients.emplace(floraClients.end()); fcit->id = it->id; - fcit->pid = it->pid; fcit->name = it->name; fcit->flags = it->flags; ++it; @@ -56,7 +54,6 @@ class MonitorView : public flora::MonitorCallback { void list_add(MonitorListItem &item) { auto fcit = floraClients.emplace(floraClients.end()); fcit->id = item.id; - fcit->pid = item.pid; fcit->name = item.name; fcit->flags = item.flags; updateMonitorScreen(); @@ -98,7 +95,7 @@ class MonitorView : public flora::MonitorCallback { void printTabLine(FloraClientInfo &info) { char buf[16]; - snprintf(buf, sizeof(buf), "%d", info.pid); + // snprintf(buf, sizeof(buf), "%d", info.pid); printTabCell(0, buf); printTabCell(1, info.name.c_str()); printw("\n"); diff --git a/src/adap.h b/src/adap.h index f2ee348..75edc13 100644 --- a/src/adap.h +++ b/src/adap.h @@ -14,7 +14,6 @@ class AdapterInfo { AdapterInfo() { id = ++idseq; } uint32_t id; - int32_t pid = 0; std::string name; std::set declared_methods; uint32_t flags = 0; @@ -53,6 +52,11 @@ class Adapter { public: uint32_t serialize_flags; AdapterInfo *info = nullptr; + // unix socket adapter: pid + // tcp socket adapter: + // high 32 bits: 0x80000000 | ipv4port + // low 32 bits: ipv4addr + uint64_t tag = 0; #ifdef FLORA_DEBUG uint32_t recv_times = 0; uint32_t recv_bytes = 0; diff --git a/src/cli.cc b/src/cli.cc index d7a7e26..95226da 100644 --- a/src/cli.cc +++ b/src/cli.cc @@ -13,6 +13,9 @@ using namespace std; using namespace std::chrono; using rokid::Uri; +thread_local uint64_t flora::internal::Client::tag = 0; +thread_local string flora::internal::Client::sender_name; + static bool ignore_sigpipe = false; int32_t flora::Client::connect(const char *uri, flora::ClientCallback *ccb, flora::MonitorCallback *mcb, @@ -229,7 +232,8 @@ bool Client::handle_cmd_after_auth(int32_t cmd, shared_ptr &resp) { string name; shared_ptr args; - if (ResponseParser::parse_post(resp, name, msgtype, args) != 0) { + if (ResponseParser::parse_post(resp, name, msgtype, args, tag, + sender_name) != 0) { return false; } if (cli_callback) { @@ -241,7 +245,8 @@ bool Client::handle_cmd_after_auth(int32_t cmd, shared_ptr &resp) { string name; int32_t msgid; shared_ptr args; - if (ResponseParser::parse_call(resp, name, args, msgid) != 0) { + if (ResponseParser::parse_call(resp, name, args, msgid, tag, + sender_name) != 0) { return false; } if (cli_callback) { @@ -258,7 +263,7 @@ bool Client::handle_cmd_after_auth(int32_t cmd, shared_ptr &resp) { PendingRequestList::iterator it; Response response; - if (ResponseParser::parse_reply(resp, msgid, rescode, response) != 0) { + if (ResponseParser::parse_reply(resp, msgid, rescode, response, tag) != 0) { KLOGW(TAG, "parse reply failed"); return false; } @@ -733,6 +738,47 @@ void ReplyImpl::end(int32_t code, std::shared_ptr &data) { } // namespace internal } // namespace flora +namespace flora { +uint32_t MsgSender::connection_type() { + return flora::internal::TagHelper::type(flora::internal::Client::tag); +} + +pid_t MsgSender::pid() { + return flora::internal::TagHelper::pid(flora::internal::Client::tag); +} + +uint64_t MsgSender::tag() { + return flora::internal::Client::tag; +} + +const char* MsgSender::ipaddr() { + return flora::internal::TagHelper::ipaddr(flora::internal::Client::tag); +} + +uint16_t MsgSender::port() { + return flora::internal::TagHelper::port(flora::internal::Client::tag); +} + +const char* MsgSender::name() { + return flora::internal::Client::sender_name.c_str(); +} + +void MsgSender::to_string(string& str) { + str = "["; + if (MsgSender::connection_type() == 0) { + char buf[16]; + snprintf(buf, sizeof(buf), "%d", MsgSender::pid()); + str += buf; + } else { + string tmp; + flora::internal::TagHelper::to_addr_string(flora::internal::Client::tag, tmp); + str += tmp; + } + str += "]"; + str += MsgSender::name(); +} +} // namespace flora + using flora::Reply; using flora::Response; using flora::internal::Client; diff --git a/src/cli.h b/src/cli.h index 469669c..78773c4 100644 --- a/src/cli.h +++ b/src/cli.h @@ -125,6 +125,8 @@ class Client : public flora::Client { public: std::string auth_extra; ClientCallback *cli_callback = nullptr; + static thread_local uint64_t tag; + static thread_local std::string sender_name; #ifdef FLORA_DEBUG uint32_t post_times = 0; uint32_t post_bytes = 0; diff --git a/src/defs.h b/src/defs.h index 19e1eb1..8b82670 100644 --- a/src/defs.h +++ b/src/defs.h @@ -1,6 +1,6 @@ #pragma once -#define FLORA_VERSION 3 +#define FLORA_VERSION 4 // client --> server #define CMD_AUTH_REQ 0 diff --git a/src/disp.cc b/src/disp.cc index 1de8999..8b03566 100644 --- a/src/disp.cc +++ b/src/disp.cc @@ -128,7 +128,7 @@ void Dispatcher::handle_cmd(shared_ptr &msg_caps, void Dispatcher::pending_call_timeout(PendingCall &pc) { int32_t c = ResponseSerializer::serialize_reply(pc.cliid, FLORA_CLI_ETIMEOUT, - nullptr, buffer, buf_size, + nullptr, 0, buffer, buf_size, pc.sender->serialize_flags); pc.sender->write(buffer, c); } @@ -181,15 +181,17 @@ bool Dispatcher::handle_auth_req(shared_ptr &msg_caps, KLOGE(TAG, "<<< %s: auth failed. version not supported, excepted %u", extra.c_str(), FLORA_VERSION); } else { + if (sender->tag == 0) + sender->tag = TagHelper::create(pid); if (flags & FLORA_CLI_FLAG_MONITOR) { if ((this->flags & FLORA_DISP_FLAG_MONITOR) == 0) { result = FLORA_CLI_EAUTH; KLOGE(TAG, "<<< %s: auth failed. service not support monitor mode", extra.c_str()); } else { - add_monitor(extra, pid, flags, sender); + add_monitor(extra, flags, sender); } - } else if (!add_adapter(extra, pid, flags, sender)) { + } else if (!add_adapter(extra, flags, sender)) { result = FLORA_CLI_EDUPID; KLOGE(TAG, "<<< %s: auth failed. client id already used", extra.c_str()); } @@ -218,7 +220,9 @@ void Dispatcher::do_erase_adapter(shared_ptr &sender) { if (sender->info->flags & FLORA_CLI_FLAG_MONITOR) monitors.erase(reinterpret_cast(sender.get())); else { - KLOGI(TAG, "erase adapter <%d>:%s", sender->info->pid, + string str; + TagHelper::to_addr_string(sender->tag, str); + KLOGI(TAG, "erase adapter <%s>:%s", str.c_str(), sender->info->name.c_str()); write_monitor_list_remove(sender->info->id); adapter_infos.erase(reinterpret_cast(sender.get())); @@ -247,8 +251,8 @@ bool Dispatcher::handle_subscribe_req(shared_ptr &msg_caps, PersistMsgMap::iterator pit = persist_msgs.find(name); if (pit != persist_msgs.end()) { int32_t c = ResponseSerializer::serialize_post( - name.c_str(), FLORA_MSGTYPE_PERSIST, pit->second.data, buffer, buf_size, - sender->serialize_flags); + name.c_str(), FLORA_MSGTYPE_PERSIST, pit->second.data, 0, "", buffer, + buf_size, sender->serialize_flags); if (c > 0) { KLOGI(TAG, ">>> %s: dispatch persist msg %s", sender->info->name.c_str(), name.c_str()); @@ -352,9 +356,13 @@ bool Dispatcher::post_msg(const string &name, uint32_t type, } ++ait; } - write_post_msg_to_adapters(name, type, args, 0, nobo_adapters, cli_name); - write_post_msg_to_adapters(name, type, args, CAPS_FLAG_NET_BYTEORDER, - bo_adapters, cli_name); + if (!nobo_adapters.empty()) + write_post_msg_to_adapters(name, type, args, sender->tag, 0, + nobo_adapters, cli_name); + if (!bo_adapters.empty()) + write_post_msg_to_adapters(name, type, args, sender->tag, + CAPS_FLAG_NET_BYTEORDER, bo_adapters, + cli_name); } if (type == FLORA_MSGTYPE_PERSIST) { @@ -364,13 +372,11 @@ bool Dispatcher::post_msg(const string &name, uint32_t type, return true; } -void Dispatcher::write_post_msg_to_adapters(const string &name, uint32_t type, - shared_ptr &args, - uint32_t flags, - AdapterList &adapters, - const char *sender_name) { - int32_t c = ResponseSerializer::serialize_post(name.c_str(), type, args, - buffer, buf_size, flags); +void Dispatcher::write_post_msg_to_adapters( + const string &name, uint32_t type, shared_ptr &args, uint64_t tag, + uint32_t flags, AdapterList &adapters, const char *sender_name) { + int32_t c = ResponseSerializer::serialize_post( + name.c_str(), type, args, tag, sender_name, buffer, buf_size, flags); if (c < 0) return; AdapterList::iterator ait; @@ -423,7 +429,7 @@ bool Dispatcher::handle_call_req(shared_ptr &msg_caps, int32_t c; if (it == named_adapters.end() || !it->second->info->has_method(name)) { c = ResponseSerializer::serialize_reply(cliid, FLORA_CLI_ENEXISTS, nullptr, - buffer, buf_size, + 0, buffer, buf_size, sender->serialize_flags); if (c < 0) return false; @@ -434,8 +440,9 @@ bool Dispatcher::handle_call_req(shared_ptr &msg_caps, } int32_t svrid = ++reqseq; add_pending_call(svrid, cliid, sender, it->second, timeout); - c = ResponseSerializer::serialize_call(name.c_str(), args, svrid, buffer, - buf_size, it->second->serialize_flags); + c = ResponseSerializer::serialize_call( + name.c_str(), args, svrid, sender->tag, sender->info->name.c_str(), + buffer, buf_size, it->second->serialize_flags); if (c < 0) return false; KLOGI(TAG, "%s >>> %s: call %d/%s", sender->info->name.c_str(), @@ -472,7 +479,7 @@ bool Dispatcher::handle_reply_req(shared_ptr &msg_caps, resp.data = data; resp.extra = sender->info->name; int32_t c = ResponseSerializer::serialize_reply( - (*it).cliid, FLORA_CLI_SUCCESS, &resp, buffer, buf_size, + (*it).cliid, FLORA_CLI_SUCCESS, &resp, sender->tag, buffer, buf_size, (*it).sender->serialize_flags); if (c < 0) return false; @@ -497,7 +504,7 @@ bool Dispatcher::handle_ping_req(shared_ptr &msg_caps, return true; } -bool Dispatcher::add_adapter(const string &name, int32_t pid, uint32_t flags, +bool Dispatcher::add_adapter(const string &name, uint32_t flags, shared_ptr &adapter) { if (name.length() > 0) { auto r2 = named_adapters.insert(make_pair(name, adapter)); @@ -509,7 +516,6 @@ bool Dispatcher::add_adapter(const string &name, int32_t pid, uint32_t flags, auto r1 = adapter_infos.insert( make_pair(reinterpret_cast(adapter.get()), info)); if (r1.second) { - r1.first->second.pid = pid; r1.first->second.name = name; r1.first->second.flags = flags; adapter->info = &r1.first->second; @@ -517,13 +523,12 @@ bool Dispatcher::add_adapter(const string &name, int32_t pid, uint32_t flags, return true; } -void Dispatcher::add_monitor(const string &name, int32_t pid, uint32_t flags, +void Dispatcher::add_monitor(const string &name, uint32_t flags, shared_ptr &adapter) { AdapterInfo info; auto r = monitors.insert( make_pair(reinterpret_cast(adapter.get()), info)); if (r.second) { - r.first->second.pid = pid; r.first->second.name = name; r.first->second.flags = flags; adapter->info = &r.first->second; diff --git a/src/disp.h b/src/disp.h index 9ea84c1..2c9d12b 100644 --- a/src/disp.h +++ b/src/disp.h @@ -79,10 +79,10 @@ class Dispatcher : public flora::Dispatcher { bool handle_ping_req(std::shared_ptr &msg_caps, std::shared_ptr &sender); - bool add_adapter(const std::string &name, int32_t pid, uint32_t flags, + bool add_adapter(const std::string &name, uint32_t flags, std::shared_ptr &adapter); - void add_monitor(const std::string &name, int32_t pid, uint32_t flags, + void add_monitor(const std::string &name, uint32_t flags, std::shared_ptr &adapter); void handle_cmds(); @@ -101,10 +101,10 @@ class Dispatcher : public flora::Dispatcher { bool post_msg(const std::string &name, uint32_t type, std::shared_ptr &args, Adapter *sender); - void write_post_msg_to_adapters(const std::string &name, uint32_t type, - std::shared_ptr &args, uint32_t flags, - AdapterList &adapters, - const char *sender_name); + void write_post_msg_to_adapters( + const std::string &name, uint32_t type, std::shared_ptr &args, + uint64_t tag, uint32_t flags, AdapterList &adapters, + const char *sender_name); void do_erase_adapter(std::shared_ptr &sender); diff --git a/src/ser-helper.cc b/src/ser-helper.cc index df19424..13d38ef 100644 --- a/src/ser-helper.cc +++ b/src/ser-helper.cc @@ -155,13 +155,16 @@ int32_t ResponseSerializer::serialize_auth(int32_t result, void *data, } int32_t ResponseSerializer::serialize_post(const char *name, uint32_t msgtype, - shared_ptr &args, void *data, + shared_ptr &args, uint64_t tag, + const char* cliname, void *data, uint32_t size, uint32_t flags) { shared_ptr caps = Caps::new_instance(); caps->write(CMD_POST_RESP); caps->write(msgtype); caps->write(name); caps->write(args); + caps->write(tag); + caps->write(cliname); int32_t r = caps->serialize(data, size, flags); if (r < 0) return -1; @@ -172,6 +175,7 @@ int32_t ResponseSerializer::serialize_post(const char *name, uint32_t msgtype, int32_t ResponseSerializer::serialize_call(const char *name, shared_ptr &args, int32_t id, + uint64_t tag, const char *cliname, void *data, uint32_t size, uint32_t flags) { shared_ptr caps = Caps::new_instance(); @@ -179,6 +183,8 @@ int32_t ResponseSerializer::serialize_call(const char *name, caps->write(id); caps->write(name); caps->write(args); + caps->write(tag); + caps->write(cliname); int32_t r = caps->serialize(data, size, flags); if (r < 0) return -1; @@ -188,8 +194,9 @@ int32_t ResponseSerializer::serialize_call(const char *name, } int32_t ResponseSerializer::serialize_reply(int32_t id, int32_t rescode, - Response *reply, void *data, - uint32_t size, uint32_t flags) { + Response *reply, uint64_t tag, + void *data, uint32_t size, + uint32_t flags) { shared_ptr caps = Caps::new_instance(); caps->write(CMD_REPLY_RESP); caps->write(id); @@ -199,6 +206,7 @@ int32_t ResponseSerializer::serialize_reply(int32_t id, int32_t rescode, caps->write(reply->data); caps->write(reply->extra.c_str()); } + caps->write(tag); int32_t r = caps->serialize(data, size, flags); if (r < 0) return -1; @@ -210,7 +218,6 @@ int32_t ResponseSerializer::serialize_reply(int32_t id, int32_t rescode, static shared_ptr serialize_monitor_list_item(AdapterInfo &info) { shared_ptr r = Caps::new_instance(); r->write(info.id); - r->write(info.pid); r->write(info.name); r->write(info.flags); return r; @@ -427,29 +434,40 @@ int32_t ResponseParser::parse_auth(shared_ptr &caps, int32_t &result) { } int32_t ResponseParser::parse_post(shared_ptr &caps, string &name, - uint32_t &msgtype, shared_ptr &args) { + uint32_t &msgtype, shared_ptr &args, + uint64_t &tag, string &cliname) { if (caps->read(msgtype) != CAPS_SUCCESS) return -1; if (caps->read(name) != CAPS_SUCCESS) return -1; if (caps->read(args) != CAPS_SUCCESS) return -1; + if (caps->read(tag) != CAPS_SUCCESS) + return -1; + if (caps->read(cliname) != CAPS_SUCCESS) + return -1; return 0; } int32_t ResponseParser::parse_call(shared_ptr &caps, string &name, - shared_ptr &args, int32_t &id) { + shared_ptr &args, int32_t &id, + uint64_t &tag, string &cliname) { if (caps->read(id) != CAPS_SUCCESS) return -1; if (caps->read(name) != CAPS_SUCCESS) return -1; if (caps->read(args) != CAPS_SUCCESS) return -1; + if (caps->read(tag) != CAPS_SUCCESS) + return -1; + if (caps->read(cliname) != CAPS_SUCCESS) + return -1; return 0; } int32_t ResponseParser::parse_reply(shared_ptr &caps, int32_t &id, - int32_t &rescode, Response &reply) { + int32_t &rescode, Response &reply, + uint64_t &tag) { if (caps->read(id) != CAPS_SUCCESS) return -1; if (caps->read(rescode) != CAPS_SUCCESS) @@ -462,6 +480,8 @@ int32_t ResponseParser::parse_reply(shared_ptr &caps, int32_t &id, if (caps->read(reply.extra) != CAPS_SUCCESS) return -1; } + if (caps->read(tag) != CAPS_SUCCESS) + return -1; return 0; } @@ -482,12 +502,11 @@ int32_t ResponseParser::parse_monitor_list_all(shared_ptr &caps, MonitorListItem &i = infos.back(); if (sub->read(i.id) != CAPS_SUCCESS) return -1; - if (sub->read(i.pid) != CAPS_SUCCESS) - return -1; if (sub->read(i.name) != CAPS_SUCCESS) return -1; if (sub->read(i.flags) != CAPS_SUCCESS) return -1; + // TODO: set i.tag } return infos.size() == size ? 0 : -1; } @@ -499,12 +518,11 @@ int32_t ResponseParser::parse_monitor_list_add(shared_ptr &caps, return -1; if (sub->read(info.id) != CAPS_SUCCESS) return -1; - if (sub->read(info.pid) != CAPS_SUCCESS) - return -1; if (sub->read(info.name) != CAPS_SUCCESS) return -1; if (sub->read(info.flags) != CAPS_SUCCESS) return -1; + // TODO: set i.tag return 0; } diff --git a/src/ser-helper.h b/src/ser-helper.h index 0548667..b87918d 100644 --- a/src/ser-helper.h +++ b/src/ser-helper.h @@ -4,6 +4,7 @@ #include "defs.h" #include "disp.h" #include "flora-cli.h" +#include #include #include @@ -50,15 +51,17 @@ class ResponseSerializer { uint32_t flags); static int32_t serialize_post(const char *name, uint32_t msgtype, - std::shared_ptr &args, void *data, - uint32_t size, uint32_t flags); + std::shared_ptr &args, uint64_t tag, + const char *cliname, void *data, uint32_t size, + uint32_t flags); static int32_t serialize_call(const char *name, std::shared_ptr &args, - int32_t id, void *data, uint32_t size, - uint32_t flags); + int32_t id, uint64_t tag, const char *cliname, + void *data, uint32_t size, uint32_t flags); static int32_t serialize_reply(int32_t id, int32_t rescode, Response *reply, - void *data, uint32_t size, uint32_t flags); + uint64_t tag, void *data, uint32_t size, + uint32_t flags); static int32_t serialize_monitor_list_all(AdapterInfoMap &infos, void *data, uint32_t size, uint32_t flags); @@ -138,13 +141,15 @@ class ResponseParser { static int32_t parse_auth(std::shared_ptr &caps, int32_t &result); static int32_t parse_post(std::shared_ptr &caps, std::string &name, - uint32_t &msgtype, std::shared_ptr &args); + uint32_t &msgtype, std::shared_ptr &args, + uint64_t &tag, std::string &cliname); static int32_t parse_call(std::shared_ptr &caps, std::string &name, - std::shared_ptr &args, int32_t &id); + std::shared_ptr &args, int32_t &id, + uint64_t &tag, std::string &cliname); static int32_t parse_reply(std::shared_ptr &caps, int32_t &id, - int32_t &rescode, Response &reply); + int32_t &rescode, Response &reply, uint64_t &tag); static int32_t parse_monitor_list_all(std::shared_ptr &caps, std::vector &infos); @@ -184,5 +189,42 @@ class ResponseParser { bool is_valid_msgtype(uint32_t msgtype); +class TagHelper { +public: + static uint64_t create(struct sockaddr_in& addr) { + return (((uint64_t)(0x80000000 | addr.sin_port)) << 32) | addr.sin_addr.s_addr; + } + + static uint64_t create(uint32_t pid) { return pid; } + + // return: 0 unix + // 1 tcp + static uint32_t type(uint64_t tag) { + return (tag >> 32) & 0x80000000; + } + + static pid_t pid(uint64_t tag) { + return tag & 0xffffffff; + } + + static const char* ipaddr(uint64_t tag) { + struct in_addr iaddr; + iaddr.s_addr = (tag & 0xffffffff); + return inet_ntoa(iaddr); + } + + static uint16_t port(uint64_t tag) { + return (tag >> 32) & 0xffff; + } + + static void to_addr_string(uint64_t tag, std::string& str) { + str = ipaddr(tag); + str += ':'; + char buf[16]; + snprintf(buf, sizeof(buf), "%d", port(tag)); + str += buf; + } +}; + } // namespace internal } // namespace flora diff --git a/src/sock-poll.cc b/src/sock-poll.cc index ff06828..f92aed8 100644 --- a/src/sock-poll.cc +++ b/src/sock-poll.cc @@ -1,4 +1,5 @@ #include "sock-poll.h" +#include "ser-helper.h" #include "flora-svc.h" #include "rlog.h" #include @@ -96,10 +97,14 @@ static int unix_accept(int lfd) { return accept(lfd, (sockaddr *)&addr, &addr_len); } -static int tcp_accept(int lfd) { +static int tcp_accept(int lfd, uint64_t& tag) { sockaddr_in addr; socklen_t addr_len = sizeof(addr); - return accept(lfd, (sockaddr *)&addr, &addr_len); + auto fd = accept(lfd, (sockaddr *)&addr, &addr_len); + if (fd < 0) + return fd; + tag = TagHelper::create(addr); + return fd; } void SocketPoll::run() { @@ -162,14 +167,16 @@ void SocketPoll::run() { shared_ptr SocketPoll::do_accept(int lfd) { int new_fd = -1; + uint64_t tag = 0; if (type == POLL_TYPE_TCP) - new_fd = tcp_accept(lfd); + new_fd = tcp_accept(lfd, tag); else new_fd = unix_accept(lfd); if (new_fd < 0) return nullptr; auto adap = new_adapter(new_fd); + adap->tag = tag; adapters.insert(make_pair(new_fd, adap)); return adap; } diff --git a/test/main.cc b/test/main.cc new file mode 100644 index 0000000..d512cb4 --- /dev/null +++ b/test/main.cc @@ -0,0 +1,25 @@ +#include "gtest/gtest.h" +#include "svc.h" + +using namespace std; +using namespace flora; + +string Service::floraUri; +ThreadPool Service::thrPool{4}; + +int main(int argc, char** argv) { + if (argc < 2) { + printf("USAGE: %s ${floraUri}\n", argv[0]); + return 1; + } + Service svc{argv[1]}; + svc.start(); + + --argc; + ++argv; + testing::InitGoogleTest(&argc, argv); + auto r = RUN_ALL_TESTS(); + svc.stop(); + Service::thrPool.close(); + return r; +} diff --git a/test/simple.cc b/test/simple.cc new file mode 100644 index 0000000..2bbc5bc --- /dev/null +++ b/test/simple.cc @@ -0,0 +1,76 @@ +#include +#include "gtest/gtest.h" +#include "flora-agent.h" +#include "svc.h" + +using namespace std; +using namespace flora; + +TEST(SimpleTest, postInstant) { + Agent rcv; + Agent snd1; + Agent snd2; + string uri = Service::floraUri; + rcv.config(FLORA_AGENT_CONFIG_URI, uri.c_str()); + snd1.config(FLORA_AGENT_CONFIG_URI, (uri + "#test1").c_str()); + snd2.config(FLORA_AGENT_CONFIG_URI, (uri + "#test2").c_str()); + string msgName = "foo"; + struct { + uint32_t num[2]{0, 0}; + string tags[2]; + uint32_t postCompleted{0}; + + int32_t tag2Idx(const string& tag) { + int32_t i; + for (i = 0; i < 2; ++i) { + if (tags[i].empty()) { + tags[i] = tag; + return i; + } + if (tags[i] == tag) + return i; + } + return -1; + } + } inst; + rcv.subscribe(msgName.c_str(), + [&inst](const char* name, shared_ptr& msg, uint32_t type) { + string tag; + MsgSender::to_string(tag); + auto idx = inst.tag2Idx(tag); + ASSERT_GE(idx, 0); + ASSERT_LT(idx, 2); + int32_t v{-1}; + EXPECT_EQ(msg->read(v), CAPS_SUCCESS); + EXPECT_EQ(v, inst.num[idx]); + ++inst.num[idx]; + }); + rcv.start(); + snd1.start(); + snd2.start(); + + Service::thrPool.push([&snd1, &inst, &msgName]() { + uint32_t i; + for (i = 0; i < 78; ++i) { + auto msg = Caps::new_instance(); + msg->write(i); + snd1.post(msgName.c_str(), msg); + } + ++inst.postCompleted; + }); + + Service::thrPool.push([&snd2, &inst, &msgName]() { + uint32_t i; + for (i = 0; i < 99; ++i) { + auto msg = Caps::new_instance(); + msg->write(i); + snd2.post(msgName.c_str(), msg); + } + ++inst.postCompleted; + }); + + sleep(5); + EXPECT_EQ(inst.postCompleted, 2); + EXPECT_EQ(inst.num[0] + inst.num[1], 78 + 99); + EXPECT_EQ(inst.num[0] == 78 || inst.num[0] == 99, true); +} diff --git a/test/svc.h b/test/svc.h new file mode 100644 index 0000000..882b9c7 --- /dev/null +++ b/test/svc.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include "thr-pool.h" +#include "flora-svc.h" + +class Service { +public: + Service(const char* uri) { + floraUri = uri; + } + + void start() { + disp = flora::Dispatcher::new_instance(FLORA_DISP_FLAG_MONITOR); + poll = flora::Poll::new_instance(floraUri.c_str()); + poll->start(disp); + disp->run(false); + } + + void stop() { + disp->close(); + poll->stop(); + disp.reset(); + poll.reset(); + } + +public: + static std::string floraUri; + static ThreadPool thrPool; + +private: + std::shared_ptr disp; + std::shared_ptr poll; +}; +