Skip to content

Commit

Permalink
Merge pull request #16 from crypto-chassis/improvement
Browse files Browse the repository at this point in the history
Update: refactor event queue
  • Loading branch information
cryptochassis authored Oct 6, 2020
2 parents 45eb825 + 54f9dba commit f7f7d05
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 68 deletions.
2 changes: 1 addition & 1 deletion include/ccapi_cpp/ccapi_event_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace ccapi {
class EventDispatcher final {
public:
explicit EventDispatcher(size_t numDispatcherThreads = 1)
explicit EventDispatcher(const size_t numDispatcherThreads = 1)
: numDispatcherThreads(numDispatcherThreads) {
CCAPI_LOGGER_FUNCTION_ENTER;
CCAPI_LOGGER_TRACE("numDispatcherThreads = "+size_tToString(numDispatcherThreads));
Expand Down
51 changes: 0 additions & 51 deletions include/ccapi_cpp/ccapi_event_queue.h

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef INCLUDE_CCAPI_CPP_CCAPI_EXCHANGE_H_
#define INCLUDE_CCAPI_CPP_CCAPI_EXCHANGE_H_
#ifndef INCLUDE_CCAPI_CPP_CCAPI_MACRO_H_
#define INCLUDE_CCAPI_CPP_CCAPI_MACRO_H_
#define CCAPI_EXCHANGE_NAME_MARKET_DEPTH "MARKET_DEPTH"
#define CCAPI_EXCHANGE_NAME_TRADE "TRADE"
#ifndef CCAPI_EXCHANGE_NAME_MARKET_DEPTH_MAX
Expand Down Expand Up @@ -118,4 +118,4 @@
#define CCAPI_EXCHANGE_NAME_CONNECTION "connection"
#define CCAPI_EXCHANGE_NAME_REASON "reason"
#define CCAPI_EXCHANGE_NAME_ERROR_MESSAGE "message"
#endif // INCLUDE_CCAPI_CPP_CCAPI_EXCHANGE_H_
#endif // INCLUDE_CCAPI_CPP_CCAPI_MACRO_H_
2 changes: 1 addition & 1 deletion include/ccapi_cpp/ccapi_market_data_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "websocketpp/common/connection_hdl.hpp"
#include "ccapi_cpp/ccapi_event.h"
#include "ccapi_cpp/ccapi_subscription_list.h"
#include "ccapi_cpp/ccapi_exchange.h"
#include "ccapi_cpp/ccapi_macro.h"
#include "ccapi_cpp/ccapi_market_data_message.h"
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
Expand Down
55 changes: 55 additions & 0 deletions include/ccapi_cpp/ccapi_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#ifndef INCLUDE_CCAPI_CPP_CCAPI_QUEUE_H_
#define INCLUDE_CCAPI_CPP_CCAPI_QUEUE_H_
#include <queue>
#include <mutex>
#include <vector>
#include "ccapi_cpp/ccapi_logger.h"
namespace ccapi {
template <class T>
class Queue {
public:
std::string EXCEPTION_QUEUE_FULL = "queue is full";
std::string EXCEPTION_QUEUE_EMPTY = "queue is empty";
explicit Queue(const size_t maxSize = 0)
: maxSize(maxSize) {}
void pushBack(T&& t) {
std::lock_guard<std::mutex> lock(this->m);
if (this->maxSize <= 0 || this->queue.size() < this->maxSize) {
CCAPI_LOGGER_TRACE("this->queue.size() = "+size_tToString(this->queue.size()));
this->queue.push_back(t);
} else {
CCAPI_LOGGER_FATAL(EXCEPTION_QUEUE_FULL);
}
}
T popBack() {
std::lock_guard<std::mutex> lock(this->m);
if (this->queue.empty()) {
CCAPI_LOGGER_FATAL(EXCEPTION_QUEUE_EMPTY);
} else {
T t = std::move(this->queue.back());
this->queue.pop_back();
return t;
}
}
std::vector<T> purge() {
std::lock_guard<std::mutex> lock(this->m);
std::vector<T> p;
std::swap(p, this->queue);
return p;
}
size_t size() const {
std::lock_guard<std::mutex> lock(this->m);
return this->queue.size();
}
bool empty() const {
std::lock_guard<std::mutex> lock(this->m);
return this->queue.empty();
}

private:
std::vector<T> queue;
mutable std::mutex m;
size_t maxSize{};
};
} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_QUEUE_H_
22 changes: 13 additions & 9 deletions include/ccapi_cpp/ccapi_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
#include <vector>
#include <map>
#include <unordered_set>
#include "ccapi_cpp/ccapi_event_queue.h"
#include "ccapi_cpp/ccapi_exchange.h"
#include "ccapi_cpp/ccapi_queue.h"
#include "ccapi_cpp/ccapi_macro.h"
#include "ccapi_cpp/ccapi_event_dispatcher.h"
#include "ccapi_cpp/ccapi_event_handler.h"
#include "ccapi_cpp/ccapi_event.h"
Expand All @@ -56,7 +56,8 @@ class Session final {
: sessionOptions(options),
sessionConfigs(configs),
eventHandler(eventHandler),
eventDispatcher(eventDispatcher) {
eventDispatcher(eventDispatcher),
eventQueue(options.maxEventQueueSize) {
CCAPI_LOGGER_FUNCTION_ENTER;
if (this->eventHandler) {
if (!this->eventDispatcher) {
Expand All @@ -67,7 +68,6 @@ class Session final {
throw std::runtime_error("undefined behavior");
}
}
this->eventQueue.setMaxSize(options.maxEventQueueSize);
CCAPI_LOGGER_FUNCTION_EXIT;
}
// bool openService(std::string serviceName = "") {
Expand Down Expand Up @@ -151,7 +151,7 @@ class Session final {
if (this->eventDispatcher) {
this->eventDispatcher->start();
}
std::function<void(Event& event)> wsEventHandler = std::bind(&Session::onEvent, this, std::placeholders::_1);
std::function<void(Event& event)> wsEventHandler = std::bind(&Session::onEvent, this, std::placeholders::_1, nullptr);
auto sessionOptions = this->sessionOptions;
auto sessionConfigs = this->sessionConfigs;
CCAPI_LOGGER_TRACE("sessionOptions.enableOneIoContextPerExchange = "+toString(sessionOptions.enableOneIoContextPerExchange));
Expand Down Expand Up @@ -327,10 +327,10 @@ class Session final {
}
CCAPI_LOGGER_FUNCTION_EXIT;
}
void onEvent(Event& event) {
void onEvent(Event& event, Queue<Event> *eventQueue) {
CCAPI_LOGGER_FUNCTION_ENTER;
CCAPI_LOGGER_TRACE("event = "+toString(event));
if (this->eventHandler) {
if (this->eventHandler && !eventQueue) {
CCAPI_LOGGER_TRACE("handle event asynchronously");
this->eventDispatcher->dispatch([&, event] {
bool shouldContinue = true;
Expand All @@ -346,11 +346,15 @@ class Session final {
});
} else {
CCAPI_LOGGER_TRACE("handle event synchronously");
this->eventQueue.push(std::move(event));
if (eventQueue) {
eventQueue->pushBack(std::move(event));
} else {
this->eventQueue.pushBack(std::move(event));
}
}
CCAPI_LOGGER_FUNCTION_EXIT;
}
EventQueue eventQueue;
Queue<Event> eventQueue;

private:
// std::string serviceName;
Expand Down
2 changes: 1 addition & 1 deletion include/ccapi_cpp/ccapi_session_configs.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#ifndef INCLUDE_CCAPI_CPP_CCAPI_SESSION_CONFIGS_H_
#define INCLUDE_CCAPI_CPP_CCAPI_SESSION_CONFIGS_H_
#include "ccapi_cpp/ccapi_macro.h"
#include <string>
#include <map>
#include <vector>
#include <set>
#include "ccapi_cpp/ccapi_exchange.h"
#include "ccapi_cpp/ccapi_logger.h"
namespace ccapi {
class SessionConfigs final {
Expand Down
2 changes: 1 addition & 1 deletion include/ccapi_cpp/ccapi_session_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class SessionOptions final {
bool enableOneIoContextPerExchange{};
long pingIntervalMilliSeconds{10000};
long pongTimeoutMilliSeconds{5000};
long maxEventQueueSize{-1};
long maxEventQueueSize{0};
};
} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_SESSION_OPTIONS_H_
2 changes: 1 addition & 1 deletion include/ccapi_cpp/ccapi_subscription.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#ifndef INCLUDE_CCAPI_CPP_CCAPI_SUBSCRIPTION_H_
#define INCLUDE_CCAPI_CPP_CCAPI_SUBSCRIPTION_H_
#include "ccapi_cpp/ccapi_macro.h"
#include <string>
#include <set>
#include "ccapi_cpp/ccapi_correlationId.h"
#include "ccapi_cpp/ccapi_exchange.h"
#include "ccapi_cpp/ccapi_util.h"
namespace ccapi {
class Subscription final {
Expand Down

0 comments on commit f7f7d05

Please sign in to comment.