Skip to content

Commit

Permalink
Merge pull request #374 from crypto-chassis/develop
Browse files Browse the repository at this point in the history
Release
  • Loading branch information
cryptochassis authored Mar 28, 2023
2 parents d0cfac7 + 88e8d4f commit 0415e56
Show file tree
Hide file tree
Showing 80 changed files with 2,378 additions and 512 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
Notification to Maintainers and Developers: we are aiming at an effort to transition our websocket related code from using websocketpp to boost beast websocket. In release v5.43.x, everything remains fully backward compatible. The codebase introduced a macro `CCAPI_USE_BOOST_BEAST_WEBSOCKET` to faciliate the future transition. If you have any questions, feel free to contact us at any time. Thank you.

<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*
Expand Down
1 change: 1 addition & 0 deletions include/ccapi_cpp/ccapi_http_retry.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#ifndef INCLUDE_CCAPI_CPP_CCAPI_HTTP_RETRY_H_
#define INCLUDE_CCAPI_CPP_CCAPI_HTTP_RETRY_H_
#include <future>
#include <string>
namespace ccapi {
/**
Expand Down
95 changes: 95 additions & 0 deletions include/ccapi_cpp/ccapi_inflate_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#ifndef INCLUDE_CCAPI_CPP_CCAPI_INFLATE_STREAM_H_
#define INCLUDE_CCAPI_CPP_CCAPI_INFLATE_STREAM_H_
#ifndef CCAPI_DECOMPRESS_BUFFER_SIZE
#define CCAPI_DECOMPRESS_BUFFER_SIZE 1 << 20
#endif
#include "zlib.h"
namespace ccapi {
/**
* Due to Huobi using gzip instead of zip in data compression, we cannot use beast::zboost::system::inflate_stream. Therefore we have to create our own.
*/
class InflateStream CCAPI_FINAL {
public:
explicit InflateStream() {
this->windowBits = 15;
this->windowBitsOverride = 0;
this->istate.zalloc = Z_NULL;
this->istate.zfree = Z_NULL;
this->istate.opaque = Z_NULL;
this->istate.avail_in = 0;
this->istate.next_in = Z_NULL;
this->decompressBufferSize = CCAPI_DECOMPRESS_BUFFER_SIZE;
}
std::string toString() const {
std::string output = "InflateStream [windowBits = " + ccapi::toString(windowBits) + "]";
return output;
}
virtual ~InflateStream() {
if (!this->initialized) {
return;
}
int ret = inflateEnd(&this->istate);
if (ret != Z_OK) {
CCAPI_LOGGER_ERROR("error cleaning up zlib decompression state");
}
}
void setWindowBitsOverride(int windowBitsOverride) { this->windowBitsOverride = windowBitsOverride; }
boost::system::error_code init() {
int ret;
if (this->windowBitsOverride == 0) {
ret = inflateInit2(&this->istate, -1 * this->windowBits);
} else {
ret = inflateInit2(&this->istate, this->windowBitsOverride
// 31
);
}
if (ret != Z_OK) {
CCAPI_LOGGER_ERROR("decompress error");
return boost::system::error_code();
}
this->buffer.reset(new unsigned char[this->decompressBufferSize]);
this->initialized = true;
return boost::system::error_code();
}
boost::system::error_code decompress(uint8_t const *buf, size_t len, std::string &out) {
if (!this->initialized) {
CCAPI_LOGGER_ERROR("decompress error");
return boost::system::error_code();
}
int ret;
this->istate.avail_in = len;
this->istate.next_in = const_cast<unsigned char *>(buf);
do {
this->istate.avail_out = this->decompressBufferSize;
this->istate.next_out = this->buffer.get();
int ret = inflate(&this->istate, Z_SYNC_FLUSH);
if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) {
CCAPI_LOGGER_ERROR("decompress error");
return boost::system::error_code();
}
out.append(reinterpret_cast<char *>(this->buffer.get()), this->decompressBufferSize - this->istate.avail_out);
} while (this->istate.avail_out == 0);
return boost::system::error_code();
}
boost::system::error_code inflate_reset() {
int ret = inflateReset(&this->istate);
if (ret != Z_OK) {
CCAPI_LOGGER_ERROR("decompress error");
return boost::system::error_code();
}
return boost::system::error_code();
}
#ifndef CCAPI_EXPOSE_INTERNAL

private:
#endif
int windowBits;
int windowBitsOverride;
bool initialized;
std::unique_ptr<unsigned char[]> buffer;
z_stream istate;
size_t decompressBufferSize;
};

} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_INFLATE_STREAM_H_
20 changes: 10 additions & 10 deletions include/ccapi_cpp/ccapi_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ class Session {
for (auto& subscriptionListByExchange : subscriptionListByExchangeMap) {
auto exchange = subscriptionListByExchange.first;
auto subscriptionList = subscriptionListByExchange.second;
std::map<std::string, wspp::lib::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
std::map<std::string, std::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
if (serviceByExchangeMap.find(exchange) == serviceByExchangeMap.end()) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, "please enable exchange: " + exchange);
return;
Expand All @@ -706,7 +706,7 @@ class Session {
for (auto& subscriptionListByExchange : subscriptionListByExchangeMap) {
auto exchange = subscriptionListByExchange.first;
auto subscriptionList = subscriptionListByExchange.second;
std::map<std::string, wspp::lib::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
std::map<std::string, std::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
if (serviceByExchangeMap.find(exchange) == serviceByExchangeMap.end()) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, "please enable exchange: " + exchange);
return;
Expand All @@ -725,7 +725,7 @@ class Session {
return;
}
auto exchange = subscription.getExchange();
std::map<std::string, wspp::lib::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
std::map<std::string, std::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
if (serviceByExchangeMap.find(exchange) == serviceByExchangeMap.end()) {
this->onError(Event::Type::FIX_STATUS, Message::Type::FIX_FAILURE, "please enable exchange: " + exchange);
return;
Expand Down Expand Up @@ -781,7 +781,7 @@ class Session {
this->onError(Event::Type::FIX_STATUS, Message::Type::FIX_FAILURE, "please enable service: " + serviceName + ", and the exchanges that you want");
return;
}
std::map<std::string, wspp::lib::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
std::map<std::string, std::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
auto exchange = request.getExchange();
if (serviceByExchangeMap.find(exchange) == serviceByExchangeMap.end()) {
this->onError(Event::Type::FIX_STATUS, Message::Type::FIX_FAILURE, "please enable exchange: " + exchange);
Expand All @@ -805,7 +805,7 @@ class Session {
this->onError(Event::Type::REQUEST_STATUS, Message::Type::REQUEST_FAILURE, "please enable service: " + serviceName + ", and the exchanges that you want");
return;
}
std::map<std::string, wspp::lib::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
std::map<std::string, std::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
auto exchange = request.getExchange();
if (serviceByExchangeMap.find(exchange) == serviceByExchangeMap.end()) {
this->onError(Event::Type::REQUEST_STATUS, Message::Type::REQUEST_FAILURE, "please enable exchange: " + exchange);
Expand Down Expand Up @@ -841,7 +841,7 @@ class Session {
"please enable service: " + serviceName + ", and the exchanges that you want", eventQueuePtr);
return;
}
std::map<std::string, wspp::lib::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
std::map<std::string, std::shared_ptr<Service> >& serviceByExchangeMap = this->serviceByServiceNameExchangeMap.at(serviceName);
auto exchange = request.getExchange();
if (serviceByExchangeMap.find(exchange) == serviceByExchangeMap.end()) {
this->onError(Event::Type::REQUEST_STATUS, Message::Type::REQUEST_FAILURE, "please enable exchange: " + exchange, eventQueuePtr);
Expand Down Expand Up @@ -888,7 +888,7 @@ class Session {
#ifndef SWIG
virtual void setTimer(const std::string& id, long delayMilliSeconds, std::function<void(const boost::system::error_code&)> errorHandler,
std::function<void()> successHandler) {
wspp::lib::asio::post(this->serviceContextPtr->tlsClientPtr->get_io_service(), [this, id, delayMilliSeconds, errorHandler, successHandler]() {
boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, id, delayMilliSeconds, errorHandler, successHandler]() {
std::shared_ptr<steady_timer> timerPtr(new steady_timer(*this->serviceContextPtr->ioContextPtr, boost::asio::chrono::milliseconds(delayMilliSeconds)));
timerPtr->async_wait([this, id, errorHandler, successHandler](const boost::system::error_code& ec) {
if (this->eventHandler) {
Expand All @@ -914,7 +914,7 @@ class Session {
});
}
virtual void cancelTimer(const std::string& id) {
wspp::lib::asio::post(this->serviceContextPtr->tlsClientPtr->get_io_service(), [this, id]() {
boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, id]() {
if (this->delayTimerByIdMap.find(id) != this->delayTimerByIdMap.end()) {
this->delayTimerByIdMap[id]->cancel();
this->delayTimerByIdMap.erase(id);
Expand Down Expand Up @@ -944,8 +944,8 @@ class Session {
EventDispatcher* eventDispatcher;
bool useInternalEventDispatcher{};
#endif
wspp::lib::shared_ptr<ServiceContext> serviceContextPtr;
std::map<std::string, std::map<std::string, wspp::lib::shared_ptr<Service> > > serviceByServiceNameExchangeMap;
std::shared_ptr<ServiceContext> serviceContextPtr;
std::map<std::string, std::map<std::string, std::shared_ptr<Service> > > serviceByServiceNameExchangeMap;
std::thread t;
Queue<Event> eventQueue;
std::function<void(Event& event, Queue<Event>* eventQueue)> internalEventHandler;
Expand Down
4 changes: 4 additions & 0 deletions include/ccapi_cpp/ccapi_session_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class SessionOptions CCAPI_FINAL {
long httpConnectionPoolIdleTimeoutMilliSeconds{0}; // used to purge the http connection pool if all connections in the
// pool have stayed idle for at least this amount of time
bool enableOneHttpConnectionPerRequest{}; // create a new http connection for each request
#ifndef CCAPI_USE_BOOST_BEAST_WEBSOCKET
#else
long websocketConnectTimeoutMilliSeconds{10000};
#endif
};
} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_SESSION_OPTIONS_H_
95 changes: 95 additions & 0 deletions include/ccapi_cpp/ccapi_ws_connection.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#ifndef INCLUDE_CCAPI_CPP_CCAPI_WS_CONNECTION_H_
#define INCLUDE_CCAPI_CPP_CCAPI_WS_CONNECTION_H_
#ifndef CCAPI_USE_BOOST_BEAST_WEBSOCKET
#include <string>

#include "ccapi_cpp/ccapi_logger.h"
Expand Down Expand Up @@ -74,4 +75,98 @@ class WsConnection CCAPI_FINAL {
std::map<std::string, std::string> credential;
};
} /* namespace ccapi */
#else
#include <string>

#include "ccapi_cpp/ccapi_logger.h"
#include "ccapi_cpp/ccapi_subscription.h"
namespace ccapi {
/**
* This class represents a TCP socket connection for the websocket API.
*/
class WsConnection CCAPI_FINAL {
public:
WsConnection(std::string url, std::string group, std::vector<Subscription> subscriptionList, std::map<std::string, std::string> credential,
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream> > > streamPtr)
: url(url), group(group), subscriptionList(subscriptionList), credential(credential), streamPtr(streamPtr) {
this->id = this->url + "||" + this->group + "||" + ccapi::toString(this->subscriptionList) + "||" + ccapi::toString(this->credential);
this->correlationIdList.reserve(subscriptionList.size());
std::transform(subscriptionList.cbegin(), subscriptionList.cend(), this->correlationIdList.begin(),
[](Subscription subscription) { return subscription.getCorrelationId(); });
auto splitted1 = UtilString::split(url, "://");
auto foundSlash = splitted1[1].find_first_of('/');
auto foundQuestionMark = splitted1[1].find_first_of('?');
if (foundSlash == std::string::npos && foundQuestionMark == std::string::npos) {
this->path = "/";
} else if (foundSlash == std::string::npos && foundQuestionMark != std::string::npos) {
this->path = "/" + splitted1[1].substr(foundQuestionMark);
} else if (foundSlash != std::string::npos && foundQuestionMark == std::string::npos) {
this->path = splitted1[1].substr(foundSlash);
} else {
this->path = splitted1[1].substr(foundSlash);
}
}
WsConnection() {}
std::string toString() const {
std::map<std::string, std::string> shortCredential;
for (const auto& x : credential) {
shortCredential.insert(std::make_pair(x.first, UtilString::firstNCharacter(x.second, CCAPI_CREDENTIAL_DISPLAY_LENGTH)));
}
std::ostringstream oss;
oss << streamPtr;
std::string output = "WsConnection [id = " + id + ", url = " + url + ", group = " + group + ", subscriptionList = " + ccapi::toString(subscriptionList) +
", credential = " + ccapi::toString(shortCredential) + ", status = " + statusToString(status) +
", headers = " + ccapi::toString(headers) + ", streamPtr = " + oss.str() + "]";
return output;
}
enum class Status {
UNKNOWN,
CONNECTING,
OPEN,
FAILED,
CLOSING,
CLOSED,
};
static std::string statusToString(Status status) {
std::string output;
switch (status) {
case Status::UNKNOWN:
output = "UNKNOWN";
break;
case Status::CONNECTING:
output = "CONNECTING";
break;
case Status::OPEN:
output = "OPEN";
break;
case Status::FAILED:
output = "FAILED";
break;
case Status::CLOSING:
output = "CLOSING";
break;
case Status::CLOSED:
output = "CLOSED";
break;
default:
CCAPI_LOGGER_FATAL(CCAPI_UNSUPPORTED_VALUE);
}
return output;
}
std::string id;
std::string url;
std::string group;
std::vector<Subscription> subscriptionList;
std::vector<std::string> correlationIdList;
Status status{Status::UNKNOWN};
std::map<std::string, std::string> headers;
std::map<std::string, std::string> credential;
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream> > > streamPtr;
beast::websocket::close_code remoteCloseCode;
beast::websocket::close_reason remoteCloseReason;
std::string hostHttpHeaderValue;
std::string path;
};
} /* namespace ccapi */
#endif
#endif // INCLUDE_CCAPI_CPP_CCAPI_WS_CONNECTION_H_
Loading

0 comments on commit 0415e56

Please sign in to comment.