Skip to content

Commit

Permalink
Merge pull request #31 from crypto-chassis/execution_management
Browse files Browse the repository at this point in the history
feat: execution management
  • Loading branch information
cryptochassis authored Jan 5, 2021
2 parents 9cd437c + 9deb345 commit da9f722
Show file tree
Hide file tree
Showing 71 changed files with 5,179 additions and 912 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cpplint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ jobs:
- uses: actions/checkout@v1
- uses: actions/setup-python@v1
- run: pip install cpplint
- run: cpplint --recursive --exclude=include/ccapi_cpp/websocketpp_decompress_workaround.h --filter -legal/copyright,-whitespace/line_length,-build/c++11,-runtime/references,-build/include_what_you_use,-runtime/int include/**/* example/src/*
- run: cpplint --recursive --exclude=include/ccapi_cpp/ccapi_hmac.h --exclude=include/ccapi_cpp/websocketpp_decompress_workaround.h --exclude=*/build --filter -legal/copyright,-whitespace/line_length,-build/c++11,-runtime/references,-build/include_what_you_use,-runtime/int include/**/* example/**/* test/**/*
6 changes: 5 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ jobs:
{"type": "style", "release": "patch"},
{"type": "refactor", "release": "patch"},
{"type": "perf", "release": "patch"},
{"type": "test", "release": "patch"}
{"type": "test", "release": "patch"},
{"type": "minor", "release": "minor"},
{"type": "major", "release": "major"}
]
}], ["@semantic-release/release-notes-generator", {
"preset": "conventionalcommits"
}], "@semantic-release/github"]
}
}'' > "package.json"'
Expand Down
20 changes: 16 additions & 4 deletions .github/workflows/build_cmake.yml → .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ jobs:
execute_process(
COMMAND ${{ steps.cmake_and_ninja.outputs.cmake_dir }}/cmake
-DBUILD_BUILD_TEST=ON
-DBUILD_TEST_BUILD=ON
-DBUILD_TEST_UNIT=ON
-S test
-B build
-B test/build
-D CMAKE_BUILD_TYPE=$ENV{BUILD_TYPE}
-G Ninja
-D CMAKE_MAKE_PROGRAM=ninja
Expand All @@ -167,7 +168,6 @@ jobs:
message(FATAL_ERROR "Bad exit status")
endif()
- name: Build
shell: cmake -P {0}
run: |
Expand Down Expand Up @@ -202,11 +202,23 @@ jobs:
execute_process(COMMAND ccache -z)
execute_process(
COMMAND ${{ steps.cmake_and_ninja.outputs.cmake_dir }}/cmake --build build -j
COMMAND ${{ steps.cmake_and_ninja.outputs.cmake_dir }}/cmake --build test/build -j
RESULT_VARIABLE result
)
if (NOT result EQUAL 0)
message(FATAL_ERROR "Bad exit status")
endif()
execute_process(COMMAND ccache -s)
- name: Test
shell: cmake -P {0}
run: |
execute_process(
COMMAND ctest -j
WORKING_DIRECTORY test/build
RESULT_VARIABLE result
)
if (NOT result EQUAL 0)
message(FATAL_ERROR "Bad exit status")
endif()
222 changes: 190 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,46 @@
- [Build](#build)
- [Constants](#constants)
- [Examples](#examples)
- [Simple](#simple)
- [Advanced](#advanced)
- [Simple Market Data](#simple-market-data)
- [Advanced Market Data](#advanced-market-data)
- [Specify market depth](#specify-market-depth)
- [Specify correlation id](#specify-correlation-id)
- [Normalize instrument name](#normalize-instrument-name)
- [Multiple exchanges and/or instruments](#multiple-exchanges-andor-instruments)
- [Receive events at periodic intervals](#receive-events-at-periodic-intervals)
- [Receive events at periodic intervals including when the market depth snapshot hasn't changed](#receive-events-at-periodic-intervals-including-when-the-market-depth-snapshot-hasnt-changed)
- [Dispatch events to multiple threads](#dispatch-events-to-multiple-threads)
- [Handle Events Synchronously](#handle-events-synchronously)
- [Simple Execution Management](#simple-execution-management)
- [Advanced Execution Management](#advanced-execution-management)
- [Specify correlation id](#specify-correlation-id-1)
- [Normalize instrument name](#normalize-instrument-name-1)
- [Multiple exchanges and/or instruments](#multiple-exchanges-andor-instruments-1)
- [Make Session::sendRequest blocking](#make-sessionsendrequest-blocking)
- [Multiple sets of API credentials for the same exchange](#multiple-sets-of-api-credentials-for-the-same-exchange)
- [Override exchange urls](#override-exchange-urls)
- [More Advanced Topics](#more-advanced-topics)
- [Handle events in "immediate" vs. "batching" mode](#handle-events-in-immediate-vs-batching-mode)
- [Thread safety](#thread-safety)
- [Enable library logging](#enable-library-logging)
- [Contributing](#contributing)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

**NEW**: Version 2.0.0 has been released. The APIs remain to be largely the same to version 1.x.x except for a few small breaking changes with the most prominent one being renaming of some of the macros needed at build time.
**NEW**: Version 2.2.x released execution management for the binance family: binance, binance-futures, and binance-us.

**BREAKING CHANGE**: Version 2.2.x introduced a few breaking changes:
* Added `CCAPI_CPP_` prefix to enablement macros.
* Changed `eventQueue` visibility in `Session` from public to private.
* Changed `logMessage` parameters in `Logger` from mixed types to all std::string.

# ccapi_cpp
* A header-only C++ library for streaming public market data directly from cryptocurrency exchanges (i.e. the connections are between your server and the exchange server without anything in-between).
* A header-only C++ library for streaming market data and executing trades directly from cryptocurrency exchanges (i.e. the connections are between your server and the exchange server without anything in-between).
* Code closely follows Bloomberg's API: https://www.bloomberg.com/professional/support/api-library/.
* It is ultra fast thanks to very careful optimizations: move semantics, regex optimization, locality of reference, lock contention minimization, etc.
* Supported exchanges: coinbase, gemini, kraken, bitstamp, bitfinex, bitmex, binance-us, binance, binance-futures, huobi, okex.
* Supported exchanges:
* Market data: coinbase, gemini, kraken, bitstamp, bitfinex, bitmex, binance-us, binance, binance-futures, huobi, okex.
* Execution Management: binance-us, binance, binance-futures.
* To spur innovation and industry collaboration, this library is open for use by the public without cost. Follow us on https://medium.com/@cryptochassis and our publication on https://medium.com/open-crypto-market-data-initiative.
* For historical data, see https://github.com/crypto-chassis/cryptochassis-api-docs.
* For historical market data, see https://github.com/crypto-chassis/cryptochassis-api-docs.
* Since symbol normalization is a tedious task, you can choose to use a reference file at https://marketdata-e0323a9039add2978bf5b49550572c7c-public.s3.amazonaws.com/supported_exchange_instrument_subscription_data.csv.gz which we frequently update.
* Please contact us for general questions, issue reporting, consultative services, and/or custom engineering work. To subscribe to our mailing list, simply send us an email with subject "subscribe".

Expand All @@ -42,8 +59,7 @@
* Example CMake: example/CMakeLists.txt.
* Require C++14 and OpenSSL.
* Definitions in the compiler command line:
* Define service enablement macro `ENABLE_SERVICE_MARKET_DATA` and exchange enablement macros such as `ENABLE_EXCHANGE_COINBASE`, etc. These macros can be found at the top of `include/ccapi_cpp/ccapi_session.h`.
* If your OpenSSL version is older than 1.1, define macro `OPENSSL_VERSION_MAJOR` and `OPENSSL_VERSION_MINOR` (e.g. for OpenSSL 1.0.2s, define `OPENSSL_VERSION_MAJOR` to be 1 and `OPENSSL_VERSION_MINOR` to be 0).
* Define service enablement macro such as `CCAPI_ENABLE_SERVICE_MARKET_DATA`, `CCAPI_ENABLE_SERVICE_EXECUTION_MANAGEMENT`, etc. and exchange enablement macros such as `CCAPI_ENABLE_EXCHANGE_COINBASE`, etc. These macros can be found at the top of `include/ccapi_cpp/ccapi_session.h`.
* Include directories:
* include
* dependency/websocketpp
Expand All @@ -63,7 +79,7 @@

## Examples
[Source](example)
### Simple
### Simple Market Data
**Objective:**

For a specific exchange and instrument, whenever the top 10 bids' or asks' price or size change, print the market depth snapshot at that moment.
Expand Down Expand Up @@ -115,7 +131,7 @@ Best bid and ask at 2020-07-27T23:56:51.935993000Z are:
...
```

### Advanced
### Advanced Market Data
#### Specify market depth

Instantiate `Subscription` with option `MARKET_DEPTH_MAX` set to be the desired market depth.
Expand All @@ -139,17 +155,16 @@ std::string coolName = "btc_usd";
exchangeInstrumentSymbolMap["coinbase"][coolName] = "BTC-USD";
SessionConfigs sessionConfigs(exchangeInstrumentSymbolMap);
Session session(sessionOptions, sessionConfigs, &eventHandler);
Subscription subscription("coinbase", coolName, "MARKET_DEPTH");
```

#### Multiple exchanges and/or instruments

Subscribe a `std::vector<Subscription>`.
```
std::vector<Subscription> subscriptionList;
Subscription subscription_1("coinbase", "BTC-USD", "MARKET_DEPTH", "", "coinbase|btc_usd");
Subscription subscription_1("coinbase", "BTC-USD", "MARKET_DEPTH");
subscriptionList.push_back(subscription_1);
Subscription subscription_2("binance-us", "ethusd", "MARKET_DEPTH", "", "binance-us|eth_usd");
Subscription subscription_2("binance-us", "ethusd", "MARKET_DEPTH");
subscriptionList.push_back(subscription_2);
session.subscribe(subscriptionList);
```
Expand All @@ -168,36 +183,178 @@ Instantiate `Subscription` with option `CCAPI_EXCHANGE_NAME_CONFLATE_INTERVAL_MI
Subscription subscription("coinbase", "BTC-USD", "MARKET_DEPTH", "CONFLATE_INTERVAL_MILLISECONDS=1000&CONFLATE_GRACE_PERIOD_MILLISECONDS=0");
```

#### Dispatch events to multiple threads
### Simple Execution Management
**Objective:**

Instantiate `EventDispatcher` with
`numDispatcherThreads` set to be the desired number.
For a specific exchange and instrument, submit a simple limit order.

**Code:**
```
EventDispatcher eventDispatcher(2);
Session session(sessionOptions, sessionConfigs, &eventHandler, &eventDispatcher);
#include "ccapi_cpp/ccapi_session.h"
namespace ccapi {
Logger* Logger::logger = nullptr; // This line is needed.
class MyEventHandler : public EventHandler {
public:
bool processEvent(const Event& event, Session *session) override {
std::cout << "Received an event: " + toString(event) << std::endl;
return true;
}
};
} /* namespace ccapi */
int main(int argc, char** argv) {
using namespace ccapi; // NOLINT(build/namespaces)
std::string key = UtilSystem::getEnvAsString("BINANCE_US_API_KEY");
if (key.empty()) {
std::cerr << "Please set environment variable BINANCE_US_API_KEY" << std::endl;
return EXIT_FAILURE;
}
std::string secret = UtilSystem::getEnvAsString("BINANCE_US_API_SECRET");
if (secret.empty()) {
std::cerr << "Please set environment variable BINANCE_US_API_SECRET" << std::endl;
return EXIT_FAILURE;
}
SessionOptions sessionOptions;
SessionConfigs sessionConfigs;
MyEventHandler eventHandler;
Session session(sessionOptions, sessionConfigs, &eventHandler);
Request request(Request::Operation::CREATE_ORDER, "binance-us", "BTCUSD");
request.appendParam({
{"SIDE", "BUY"},
{"QUANTITY", "0.0005"},
{"LIMIT_PRICE", "20000"}
});
session.sendRequest(request);
std::this_thread::sleep_for(std::chrono::seconds(10));
session.stop();
std::cout << "Bye" << std::endl;
return EXIT_SUCCESS;
}
```
**Output:**
```console
Received an event:
Event [
type = RESPONSE,
messageList = [
Message [
type = CREATE_ORDER,
recapType = UNKNOWN,
time = 1970-01-01T00:00:00.000000000Z,
timeReceived = 2021-01-04T04:15:04.710133000Z,
elementList = [
Element [
nameValueMap = {
CLIENT_ORDER_ID = MbdTQCHc0EQgLKry0Ryrhr,
CUMULATIVE_FILLED_PRICE_TIMES_QUANTITY = 0.0000,
CUMULATIVE_FILLED_QUANTITY = 0.00000000,
INSTRUMENT = BTCUSD,
LIMIT_PRICE = 20000.0000,
ORDER_ID = 187143156,
QUANTITY = 0.00050000,
SIDE = BUY,
STATUS = OPEN
}
]
],
correlationIdList = [ 5PN2qmWqBlQ9wQj99nsQzldVI5ZuGXbE ]
]
]
]
Bye
```
### Advanced Execution Management

#### Specify correlation id

Instantiate `Request` with the desired correlationId.
```
Request request(Request::Operation::CREATE_ORDER, "binance-us", "BTCUSD", "cool correlation id");
```

#### Normalize instrument name

Instantiate `SessionConfigs` with a map mapping the exchange name and the normalized instrument name to the instrument's symbol on the exchange.
```
std::map<std::string, std::map<std::string, std::string> > exchangeInstrumentSymbolMap;
std::string coolName = "btc_usd";
exchangeInstrumentSymbolMap["coinbase"][coolName] = "BTC-USD";
SessionConfigs sessionConfigs(exchangeInstrumentSymbolMap);
Session session(sessionOptions, sessionConfigs, &eventHandler);
```

#### Handle Events Synchronously
#### Multiple exchanges and/or instruments

Instantiate `Session` without `EventHandler`, then obtain the events to be processed by calling `session.eventQueue.purge()`.
Send a `std::vector<Request>`.
```
std::vector<Request> requestList;
Request request_1(Request::Operation::CREATE_ORDER, "binance-us", "BTCUSD");
request_1.appendParam(...);
requestList.push_back(request_1);
Request request_2(Request::Operation::CREATE_ORDER, "binance-us", "ETHUSD");
request_2.appendParam(...);
requestList.push_back(request_2);
session.sendRequest(requestList);
```

#### Make Session::sendRequest blocking
Instantiate `Session` without `EventHandler` argument, and pass a pointer to `Queue<Event>` as an additional argument.
```
Session session(sessionOptions, sessionConfigs);
Subscription subscription("coinbase", "BTC-USD", "MARKET_DEPTH");
session.subscribe(subscription);
std::this_thread::sleep_for(std::chrono::seconds(5));
std::vector<Event> eventList = session.eventQueue.purge();
...
Queue<Event> eventQueue;
session.sendRequest(request, &eventQueue); // block until a response is received
std::vector<Event> eventList = eventQueue.purge();
```

#### Multiple sets of API credentials for the same exchange
There are 3 ways to provide API credentials (listed with increasing priority).
* Set the relevent environment variables (see section "exchange API credentials" in `include/ccapi_cpp/ccapi_macro.h`).
* Provide credentials to `SessionConfigs`.
```
sessionConfigs.setCredential({
{"BINANCE_US_API_KEY", ...},
{"BINANCE_US_API_SECRET", ...}
});
```
* Provide credentials to `Request`.
```
Request request(Request::Operation::CREATE_ORDER, "binance-us", "BTCUSD", "cool correlation id", {
{"BINANCE_US_API_KEY", ...},
{"BINANCE_US_API_SECRET", ...}
});
```

#### Override exchange urls
See section "exchange REST urls" in `include/ccapi_cpp/ccapi_macro.h`.
### More Advanced Topics

#### Handle events in "immediate" vs. "batching" mode

In general there are 2 ways to handle events.
* When a `Session` is instantiated with an `eventHandler` argument, it will handle events in immediate mode. The `processEvent` method in the `eventHandler` will be executed on one of the internal threads in the `eventDispatcher`. A default `EventDispatcher` with 1 internal thread will be created if no `eventDispatcher` argument is provided in `Session` instantiation. To dispatch events to multiple threads, instantiate `EventDispatcher` with `numDispatcherThreads` set to be the desired number. `EventHandler`s and/or `EventDispatcher`s can be shared among different sessions. Otherwise, different sessions are independent from each other.
```
EventDispatcher eventDispatcher(2);
Session session(sessionOptions, sessionConfigs, &eventHandler, &eventDispatcher);
```
* When a `Session` is instantiated without an `eventHandler` argument, it will handle events in batching mode. The evetns will be batched into an internal `Queue<Event>` and can be retrieved by
```
std::vector<Event> eventList = session.getEventQueue().purge();
```

#### Thread safety
The following methods are implemented to be thread-safe: `Session::subscribe`, `Session::sendRequest`, all public methods in `Queue`.

#### Enable library logging

Extend a subclass, e.g. `MyLogger`, from class `Logger` and override method `logMessage`. Assign a `MyLogger` pointer to `Logger::logger`. Add one of the following macros in the compiler command line: `ENABLE_LOG_TRACE`, `ENABLE_LOG_DEBUG`, `ENABLE_LOG_INFO`, `ENABLE_LOG_WARN`, `ENABLE_LOG_ERROR`, `ENABLE_LOG_FATAL`.
Extend a subclass, e.g. `MyLogger`, from class `Logger` and override method `logMessage`. Assign a `MyLogger` pointer to `Logger::logger`. Add one of the following macros in the compiler command line: `CCAPI_ENABLE_LOG_TRACE`, `CCAPI_ENABLE_LOG_DEBUG`, `CCAPI_ENABLE_LOG_INFO`, `CCAPI_ENABLE_LOG_WARN`, `CCAPI_ENABLE_LOG_ERROR`, `CCAPI_ENABLE_LOG_FATAL`.
```
namespace ccapi {
class MyLogger final: public Logger {
public:
virtual void logMessage(Logger::Severity severity, std::thread::id threadId,
std::chrono::system_clock::time_point time,
std::string fileName, int lineNumber,
virtual void logMessage(std::string severity,
std::string threadId,
std::string timeISO,
std::string fileName,
std::string lineNumber,
std::string message) override {
...
}
Expand All @@ -206,7 +363,8 @@ MyLogger myLogger;
Logger* Logger::logger = &myLogger;
}
```
Add one of the following macros in the compiler command line: `ENABLE_LOG_TRACE`, `ENABLE_LOG_DEBUG`, `ENABLE_LOG_INFO`, `ENABLE_LOG_WARN`, `ENABLE_LOG_ERROR`, `ENABLE_LOG_FATAL`.


### Contributing
* (Required) Submit a pull request to the master branch.
* (Required) Pass Github checks: https://docs.github.com/en/rest/reference/checks.
Expand Down
4 changes: 2 additions & 2 deletions example/src/execution_management_simple/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
set(NAME execution_management_simple)
project(${NAME})
add_definitions(-DENABLE_SERVICE_EXECUTION_MANAGEMENT)
add_definitions(-DENABLE_EXCHANGE_BINANCE_US)
add_compile_definitions(CCAPI_ENABLE_SERVICE_EXECUTION_MANAGEMENT)
add_compile_definitions(CCAPI_ENABLE_EXCHANGE_BINANCE_US)
add_executable(${NAME} main.cpp)
2 changes: 1 addition & 1 deletion example/src/execution_management_simple/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Logger* Logger::logger = nullptr; // This line is needed.
class MyEventHandler : public EventHandler {
public:
bool processEvent(const Event& event, Session *session) override {
std::cout << "Received an event: " + toString(event) << std::endl;
std::cout << "Received an event:\n" + event.toStringPretty(2, 2) << std::endl;
return true;
}
};
Expand Down
Loading

0 comments on commit da9f722

Please sign in to comment.