diff --git a/include/ccapi_cpp/ccapi_event_dispatcher.h b/include/ccapi_cpp/ccapi_event_dispatcher.h index 09a7da8a..edff1dc0 100644 --- a/include/ccapi_cpp/ccapi_event_dispatcher.h +++ b/include/ccapi_cpp/ccapi_event_dispatcher.h @@ -13,7 +13,7 @@ namespace ccapi { class EventDispatcher final { public: - explicit EventDispatcher(size_t numDispatcherThreads = 1) + explicit EventDispatcher(const size_t numDispatcherThreads = 1) : numDispatcherThreads(numDispatcherThreads) { CCAPI_LOGGER_FUNCTION_ENTER; CCAPI_LOGGER_TRACE("numDispatcherThreads = "+size_tToString(numDispatcherThreads)); diff --git a/include/ccapi_cpp/ccapi_event_queue.h b/include/ccapi_cpp/ccapi_event_queue.h deleted file mode 100644 index dd5519e8..00000000 --- a/include/ccapi_cpp/ccapi_event_queue.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef INCLUDE_CCAPI_CPP_CCAPI_EVENT_QUEUE_H_ -#define INCLUDE_CCAPI_CPP_CCAPI_EVENT_QUEUE_H_ -#include -#include -#include -#include -#include "ccapi_cpp/ccapi_event.h" -namespace ccapi { -class EventQueue final { - public: - void push(Event&& event) { - { - std::lock_guard lock(this->lock); - if (this->maxSize == -1 || this->queue.size() < this->maxSize) { - CCAPI_LOGGER_TRACE("this->queue.size() = "+size_tToString(this->queue.size())); - this->queue.push_back(event); - } else { - CCAPI_LOGGER_WARN("event queue is full!"); - } - } - this->cv.notify_all(); - } - Event& front() { - std::lock_guard lock(this->lock); - return this->queue.front(); - } - std::vector purge() { - std::lock_guard lock(this->lock); - std::vector p; - std::swap(p, this->queue); - return p; - } - size_t size() const { - std::lock_guard lock(this->lock); - return this->queue.size(); - } - size_t getMaxSize() const { - return maxSize; - } - void setMaxSize(size_t maxSize) { - this->maxSize = maxSize; - } - std::condition_variable cv; - - private: - std::vector queue; - mutable std::mutex lock; - size_t maxSize{}; -}; -} /* namespace ccapi */ -#endif // INCLUDE_CCAPI_CPP_CCAPI_EVENT_QUEUE_H_ diff --git a/include/ccapi_cpp/ccapi_exchange.h b/include/ccapi_cpp/ccapi_macro.h similarity index 97% rename from include/ccapi_cpp/ccapi_exchange.h rename to include/ccapi_cpp/ccapi_macro.h index 8c766412..3026466b 100644 --- a/include/ccapi_cpp/ccapi_exchange.h +++ b/include/ccapi_cpp/ccapi_macro.h @@ -1,5 +1,5 @@ -#ifndef INCLUDE_CCAPI_CPP_CCAPI_EXCHANGE_H_ -#define INCLUDE_CCAPI_CPP_CCAPI_EXCHANGE_H_ +#ifndef INCLUDE_CCAPI_CPP_CCAPI_MACRO_H_ +#define INCLUDE_CCAPI_CPP_CCAPI_MACRO_H_ #define CCAPI_EXCHANGE_NAME_MARKET_DEPTH "MARKET_DEPTH" #define CCAPI_EXCHANGE_NAME_TRADE "TRADE" #ifndef CCAPI_EXCHANGE_NAME_MARKET_DEPTH_MAX @@ -118,4 +118,4 @@ #define CCAPI_EXCHANGE_NAME_CONNECTION "connection" #define CCAPI_EXCHANGE_NAME_REASON "reason" #define CCAPI_EXCHANGE_NAME_ERROR_MESSAGE "message" -#endif // INCLUDE_CCAPI_CPP_CCAPI_EXCHANGE_H_ +#endif // INCLUDE_CCAPI_CPP_CCAPI_MACRO_H_ diff --git a/include/ccapi_cpp/ccapi_market_data_service.h b/include/ccapi_cpp/ccapi_market_data_service.h index 446ee2da..6704db1b 100644 --- a/include/ccapi_cpp/ccapi_market_data_service.h +++ b/include/ccapi_cpp/ccapi_market_data_service.h @@ -14,7 +14,7 @@ #include "websocketpp/common/connection_hdl.hpp" #include "ccapi_cpp/ccapi_event.h" #include "ccapi_cpp/ccapi_subscription_list.h" -#include "ccapi_cpp/ccapi_exchange.h" +#include "ccapi_cpp/ccapi_macro.h" #include "ccapi_cpp/ccapi_market_data_message.h" #include "rapidjson/document.h" #include "rapidjson/writer.h" diff --git a/include/ccapi_cpp/ccapi_queue.h b/include/ccapi_cpp/ccapi_queue.h new file mode 100644 index 00000000..085648c9 --- /dev/null +++ b/include/ccapi_cpp/ccapi_queue.h @@ -0,0 +1,55 @@ +#ifndef INCLUDE_CCAPI_CPP_CCAPI_QUEUE_H_ +#define INCLUDE_CCAPI_CPP_CCAPI_QUEUE_H_ +#include +#include +#include +#include "ccapi_cpp/ccapi_logger.h" +namespace ccapi { +template +class Queue { + public: + std::string EXCEPTION_QUEUE_FULL = "queue is full"; + std::string EXCEPTION_QUEUE_EMPTY = "queue is empty"; + explicit Queue(const size_t maxSize = 0) + : maxSize(maxSize) {} + void pushBack(T&& t) { + std::lock_guard lock(this->m); + if (this->maxSize <= 0 || this->queue.size() < this->maxSize) { + CCAPI_LOGGER_TRACE("this->queue.size() = "+size_tToString(this->queue.size())); + this->queue.push_back(t); + } else { + CCAPI_LOGGER_FATAL(EXCEPTION_QUEUE_FULL); + } + } + T popBack() { + std::lock_guard lock(this->m); + if (this->queue.empty()) { + CCAPI_LOGGER_FATAL(EXCEPTION_QUEUE_EMPTY); + } else { + T t = std::move(this->queue.back()); + this->queue.pop_back(); + return t; + } + } + std::vector purge() { + std::lock_guard lock(this->m); + std::vector p; + std::swap(p, this->queue); + return p; + } + size_t size() const { + std::lock_guard lock(this->m); + return this->queue.size(); + } + bool empty() const { + std::lock_guard lock(this->m); + return this->queue.empty(); + } + + private: + std::vector queue; + mutable std::mutex m; + size_t maxSize{}; +}; +} /* namespace ccapi */ +#endif // INCLUDE_CCAPI_CPP_CCAPI_QUEUE_H_ diff --git a/include/ccapi_cpp/ccapi_session.h b/include/ccapi_cpp/ccapi_session.h index 9abec731..5fd76ece 100644 --- a/include/ccapi_cpp/ccapi_session.h +++ b/include/ccapi_cpp/ccapi_session.h @@ -42,8 +42,8 @@ #include #include #include -#include "ccapi_cpp/ccapi_event_queue.h" -#include "ccapi_cpp/ccapi_exchange.h" +#include "ccapi_cpp/ccapi_queue.h" +#include "ccapi_cpp/ccapi_macro.h" #include "ccapi_cpp/ccapi_event_dispatcher.h" #include "ccapi_cpp/ccapi_event_handler.h" #include "ccapi_cpp/ccapi_event.h" @@ -56,7 +56,8 @@ class Session final { : sessionOptions(options), sessionConfigs(configs), eventHandler(eventHandler), - eventDispatcher(eventDispatcher) { + eventDispatcher(eventDispatcher), + eventQueue(options.maxEventQueueSize) { CCAPI_LOGGER_FUNCTION_ENTER; if (this->eventHandler) { if (!this->eventDispatcher) { @@ -67,7 +68,6 @@ class Session final { throw std::runtime_error("undefined behavior"); } } - this->eventQueue.setMaxSize(options.maxEventQueueSize); CCAPI_LOGGER_FUNCTION_EXIT; } // bool openService(std::string serviceName = "") { @@ -151,7 +151,7 @@ class Session final { if (this->eventDispatcher) { this->eventDispatcher->start(); } - std::function wsEventHandler = std::bind(&Session::onEvent, this, std::placeholders::_1); + std::function wsEventHandler = std::bind(&Session::onEvent, this, std::placeholders::_1, nullptr); auto sessionOptions = this->sessionOptions; auto sessionConfigs = this->sessionConfigs; CCAPI_LOGGER_TRACE("sessionOptions.enableOneIoContextPerExchange = "+toString(sessionOptions.enableOneIoContextPerExchange)); @@ -327,10 +327,10 @@ class Session final { } CCAPI_LOGGER_FUNCTION_EXIT; } - void onEvent(Event& event) { + void onEvent(Event& event, Queue *eventQueue) { CCAPI_LOGGER_FUNCTION_ENTER; CCAPI_LOGGER_TRACE("event = "+toString(event)); - if (this->eventHandler) { + if (this->eventHandler && !eventQueue) { CCAPI_LOGGER_TRACE("handle event asynchronously"); this->eventDispatcher->dispatch([&, event] { bool shouldContinue = true; @@ -346,11 +346,15 @@ class Session final { }); } else { CCAPI_LOGGER_TRACE("handle event synchronously"); - this->eventQueue.push(std::move(event)); + if (eventQueue) { + eventQueue->pushBack(std::move(event)); + } else { + this->eventQueue.pushBack(std::move(event)); + } } CCAPI_LOGGER_FUNCTION_EXIT; } - EventQueue eventQueue; + Queue eventQueue; private: // std::string serviceName; diff --git a/include/ccapi_cpp/ccapi_session_configs.h b/include/ccapi_cpp/ccapi_session_configs.h index 01b2a0e3..3f31d8d5 100644 --- a/include/ccapi_cpp/ccapi_session_configs.h +++ b/include/ccapi_cpp/ccapi_session_configs.h @@ -1,10 +1,10 @@ #ifndef INCLUDE_CCAPI_CPP_CCAPI_SESSION_CONFIGS_H_ #define INCLUDE_CCAPI_CPP_CCAPI_SESSION_CONFIGS_H_ +#include "ccapi_cpp/ccapi_macro.h" #include #include #include #include -#include "ccapi_cpp/ccapi_exchange.h" #include "ccapi_cpp/ccapi_logger.h" namespace ccapi { class SessionConfigs final { diff --git a/include/ccapi_cpp/ccapi_session_options.h b/include/ccapi_cpp/ccapi_session_options.h index c7dfd8bc..b509cb4a 100644 --- a/include/ccapi_cpp/ccapi_session_options.h +++ b/include/ccapi_cpp/ccapi_session_options.h @@ -26,7 +26,7 @@ class SessionOptions final { bool enableOneIoContextPerExchange{}; long pingIntervalMilliSeconds{10000}; long pongTimeoutMilliSeconds{5000}; - long maxEventQueueSize{-1}; + long maxEventQueueSize{0}; }; } /* namespace ccapi */ #endif // INCLUDE_CCAPI_CPP_CCAPI_SESSION_OPTIONS_H_ diff --git a/include/ccapi_cpp/ccapi_subscription.h b/include/ccapi_cpp/ccapi_subscription.h index 9ea59444..925e60d7 100644 --- a/include/ccapi_cpp/ccapi_subscription.h +++ b/include/ccapi_cpp/ccapi_subscription.h @@ -1,9 +1,9 @@ #ifndef INCLUDE_CCAPI_CPP_CCAPI_SUBSCRIPTION_H_ #define INCLUDE_CCAPI_CPP_CCAPI_SUBSCRIPTION_H_ +#include "ccapi_cpp/ccapi_macro.h" #include #include #include "ccapi_cpp/ccapi_correlationId.h" -#include "ccapi_cpp/ccapi_exchange.h" #include "ccapi_cpp/ccapi_util.h" namespace ccapi { class Subscription final {