Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finishes all futures when one throws an exception #464

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/api/common/include/exchangeprivateapi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ class ExchangePrivate : public ExchangeBase {
/// Returns the amounts actually traded with the final amount balance on this currency
TradedAmountsVectorWithFinalAmount queryDustSweeper(CurrencyCode currencyCode);

/// Builds en ExchangeName wrapping the exchange and the key name
ExchangeName exchangeName() const { return ExchangeName(_exchangePublic.name(), _apiKey.name()); }
/// Builds an ExchangeName wrapping the exchange and the key name
ExchangeName exchangeName() const { return {_exchangePublic.name(), _apiKey.name()}; }

const ExchangeInfo &exchangeInfo() const { return _exchangePublic.exchangeInfo(); }

Expand Down
15 changes: 8 additions & 7 deletions src/api/exchanges/src/binanceprivateapi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "recentdeposit.hpp"
#include "ssl_sha.hpp"
#include "stringhelpers.hpp"
#include "timedef.hpp"
#include "timestring.hpp"
#include "tradeinfo.hpp"

Expand Down Expand Up @@ -61,7 +62,7 @@ void SetNonceAndSignature(const APIKey& apiKey, CurlPostData& postData, Duration

bool CheckErrorDoRetry(int statusCode, const json& ret, QueryDelayDir& queryDelayDir, Duration& sleepingTime,
Duration& queryDelay) {
static constexpr Duration kInitialDurationQueryDelay = std::chrono::milliseconds(200);
static constexpr Duration kInitialDurationQueryDelay = TimeInMs(200);
switch (statusCode) {
case kInvalidTimestamp: {
auto msgIt = ret.find("msg");
Expand All @@ -80,7 +81,7 @@ bool CheckErrorDoRetry(int statusCode, const json& ret, QueryDelayDir& queryDela
}
queryDelay -= sleepingTime;
log::warn("Our local time is ahead of Binance server's time. Query delay modified to {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(queryDelay).count());
std::chrono::duration_cast<TimeInMs>(queryDelay).count());
// Ensure Nonce is increasing while modifying the query delay
std::this_thread::sleep_for(sleepingTime);
return true;
Expand All @@ -96,7 +97,7 @@ bool CheckErrorDoRetry(int statusCode, const json& ret, QueryDelayDir& queryDela
}
queryDelay += sleepingTime;
log::warn("Our local time is behind of Binance server's time. Query delay modified to {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(queryDelay).count());
std::chrono::duration_cast<TimeInMs>(queryDelay).count());
return true;
}
}
Expand Down Expand Up @@ -131,7 +132,7 @@ json PrivateQuery(CurlHandle& curlHandle, const APIKey& apiKey, HttpRequestType
json ret;
for (int retryPos = 0; retryPos < kNbOrderRequestsRetries; ++retryPos) {
if (retryPos != 0) {
log::trace("Wait {} ms...", std::chrono::duration_cast<std::chrono::milliseconds>(sleepingTime).count());
log::trace("Wait {} ms...", std::chrono::duration_cast<TimeInMs>(sleepingTime).count());
std::this_thread::sleep_for(sleepingTime);
sleepingTime = (3 * sleepingTime) / 2;
}
Expand All @@ -153,7 +154,7 @@ json PrivateQuery(CurlHandle& curlHandle, const APIKey& apiKey, HttpRequestType
break;
}
if (throwIfError) {
log::error("Full Binance json error: '{}'", ret.dump());
log::error("Full Binance json error for {}: '{}'", apiKey.name(), ret.dump());
throw exception("Error: {}, msg: {}", MonetaryAmount(statusCode), ret["msg"].get<std::string_view>());
}
return ret;
Expand Down Expand Up @@ -267,7 +268,7 @@ Orders BinancePrivate::queryOpenedOrders(const OrdersConstraints& openedOrdersCo
}
int64_t millisecondsSinceEpoch = orderDetails["time"].get<int64_t>();

TimePoint placedTime{std::chrono::milliseconds(millisecondsSinceEpoch)};
TimePoint placedTime{TimeInMs(millisecondsSinceEpoch)};
if (!openedOrdersConstraints.validatePlacedTime(placedTime)) {
continue;
}
Expand Down Expand Up @@ -697,7 +698,7 @@ MonetaryAmount BinancePrivate::queryWithdrawDelivery(const InitiatedWithdrawInfo
MonetaryAmount amountReceived(depositDetail["amount"].get<double>(), currencyCode);
int64_t millisecondsSinceEpoch = depositDetail["insertTime"].get<int64_t>();

TimePoint timestamp{std::chrono::milliseconds(millisecondsSinceEpoch)};
TimePoint timestamp{TimeInMs(millisecondsSinceEpoch)};

closestRecentDepositPicker.addDeposit(RecentDeposit(amountReceived, timestamp));
}
Expand Down
71 changes: 35 additions & 36 deletions src/engine/src/exchangesorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ ExchangeHealthCheckStatus ExchangesOrchestrator::healthCheck(ExchangeNameSpan ex

ExchangeHealthCheckStatus ret(selectedExchanges.size());

_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), ret.begin(),
[](Exchange *exchange) { return std::make_pair(exchange, exchange->healthCheck()); });
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), ret.begin(),
[](Exchange *exchange) { return std::make_pair(exchange, exchange->healthCheck()); });

return ret;
}
Expand All @@ -109,7 +109,7 @@ ExchangeTickerMaps ExchangesOrchestrator::getTickerInformation(ExchangeNameSpan
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);

ExchangeTickerMaps ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(),
[](Exchange *exchange) { return std::make_pair(exchange, exchange->queryAllApproximatedOrderBooks(1)); });

Expand All @@ -124,8 +124,8 @@ MarketOrderBookConversionRates ExchangesOrchestrator::getMarketOrderBooks(Market
equiCurrencyCode.isNeutral() ? "" : equiCurrencyCode);
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);
std::array<bool, kNbSupportedExchanges> isMarketTradable;
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), isMarketTradable.begin(),
[mk](Exchange *exchange) { return exchange->queryTradableMarkets().contains(mk); });
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), isMarketTradable.begin(),
[mk](Exchange *exchange) { return exchange->queryTradableMarkets().contains(mk); });

FilterVector(selectedExchanges, isMarketTradable);

Expand All @@ -141,7 +141,7 @@ MarketOrderBookConversionRates ExchangesOrchestrator::getMarketOrderBooks(Market
}
return std::make_tuple(exchange->name(), std::move(marketOrderBook), optConversionRate);
};
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), marketOrderBooksFunc);
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), marketOrderBooksFunc);
return ret;
}

Expand All @@ -157,9 +157,9 @@ BalancePerExchange ExchangesOrchestrator::getBalance(std::span<const ExchangeNam
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

SmallVector<BalancePortfolio, kTypicalNbPrivateAccounts> balancePortfolios(balanceExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
balanceExchanges.begin(), balanceExchanges.end(), balancePortfolios.begin(),
[&](Exchange *exchange) { return exchange->apiPrivate().getAccountBalance(balanceOptions); });
[&balanceOptions](Exchange *exchange) { return exchange->apiPrivate().getAccountBalance(balanceOptions); });

BalancePerExchange ret;
ret.reserve(balanceExchanges.size());
Expand Down Expand Up @@ -202,7 +202,7 @@ WalletPerExchange ExchangesOrchestrator::getDepositInfo(std::span<const Exchange
FilterVector(depositInfoExchanges, canDepositCurrency);

SmallVector<Wallet, kTypicalNbPrivateAccounts> walletPerExchange(depositInfoExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
depositInfoExchanges.begin(), depositInfoExchanges.end(), walletPerExchange.begin(),
[depositCurrency](Exchange *exchange) { return exchange->apiPrivate().queryDepositWallet(depositCurrency); });
WalletPerExchange ret;
Expand All @@ -221,7 +221,7 @@ OpenedOrdersPerExchange ExchangesOrchestrator::getOpenedOrders(std::span<const E
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

OpenedOrdersPerExchange ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), [&](Exchange *exchange) {
return std::make_pair(exchange, OrdersSet(exchange->apiPrivate().queryOpenedOrders(openedOrdersConstraints)));
});
Expand All @@ -236,7 +236,7 @@ NbCancelledOrdersPerExchange ExchangesOrchestrator::cancelOrders(std::span<const
ExchangeRetriever::SelectedExchanges selectedExchanges =
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);
NbCancelledOrdersPerExchange nbOrdersCancelled(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), nbOrdersCancelled.begin(), [&](Exchange *exchange) {
return std::make_pair(exchange, exchange->apiPrivate().cancelOpenedOrders(ordersConstraints));
});
Expand All @@ -252,7 +252,7 @@ DepositsPerExchange ExchangesOrchestrator::getRecentDeposits(std::span<const Exc
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

DepositsPerExchange ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), [&](Exchange *exchange) {
return std::make_pair(exchange, exchange->apiPrivate().queryRecentDeposits(depositsConstraints));
});
Expand All @@ -268,7 +268,7 @@ WithdrawsPerExchange ExchangesOrchestrator::getRecentWithdraws(std::span<const E
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

WithdrawsPerExchange ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), [&](Exchange *exchange) {
return std::make_pair(exchange, exchange->apiPrivate().queryRecentWithdraws(withdrawsConstraints));
});
Expand All @@ -280,7 +280,7 @@ ConversionPathPerExchange ExchangesOrchestrator::getConversionPaths(Market mk, E
log::info("Query {} conversion path from {}", mk, ConstructAccumulatedExchangeNames(exchangeNames));
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);
ConversionPathPerExchange conversionPathPerExchange(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), conversionPathPerExchange.begin(), [mk](Exchange *exchange) {
return std::make_pair(exchange, exchange->apiPublic().findMarketsPath(mk.base(), mk.quote()));
});
Expand All @@ -305,8 +305,8 @@ MarketsPerExchange ExchangesOrchestrator::getMarketsPerExchange(CurrencyCode cur
[cur1, cur2](Market mk) { return mk.canTrade(cur1) && (cur2.isNeutral() || mk.canTrade(cur2)); });
return std::make_pair(exchange, std::move(ret));
};
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), marketsPerExchange.begin(),
marketsWithCur);
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), marketsPerExchange.begin(),
marketsWithCur);
return marketsPerExchange;
}

Expand All @@ -315,7 +315,7 @@ UniquePublicSelectedExchanges ExchangesOrchestrator::getExchangesTradingCurrency
bool shouldBeWithdrawable) {
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);
std::array<bool, kNbSupportedExchanges> isCurrencyTradablePerExchange;
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), isCurrencyTradablePerExchange.begin(),
[currencyCode, shouldBeWithdrawable](Exchange *exchange) {
CurrencyExchangeFlatSet currencies = exchange->queryTradableCurrencies();
Expand All @@ -332,9 +332,8 @@ UniquePublicSelectedExchanges ExchangesOrchestrator::getExchangesTradingMarket(M
ExchangeNameSpan exchangeNames) {
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);
std::array<bool, kNbSupportedExchanges> isMarketTradablePerExchange;
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(),
isMarketTradablePerExchange.begin(),
[mk](Exchange *exchange) { return exchange->queryTradableMarkets().contains(mk); });
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), isMarketTradablePerExchange.begin(),
[mk](Exchange *exchange) { return exchange->queryTradableMarkets().contains(mk); });

// Erases Exchanges which do not propose asked market
FilterVector(selectedExchanges, isMarketTradablePerExchange);
Expand Down Expand Up @@ -426,7 +425,7 @@ TradedAmountsPerExchange LaunchAndCollectTrades(ThreadPool &threadPool, Exchange
ExchangeAmountMarketsPathVector::iterator last, CurrencyCode toCurrency,
const TradeOptions &tradeOptions) {
TradedAmountsPerExchange tradeAmountsPerExchange(std::distance(first, last));
threadPool.parallel_transform(first, last, tradeAmountsPerExchange.begin(), [toCurrency, &tradeOptions](auto &tuple) {
threadPool.parallelTransform(first, last, tradeAmountsPerExchange.begin(), [toCurrency, &tradeOptions](auto &tuple) {
Exchange *exchange = std::get<0>(tuple);
return std::make_pair(
exchange, exchange->apiPrivate().trade(std::get<1>(tuple), toCurrency, tradeOptions, std::get<2>(tuple)));
Expand All @@ -438,7 +437,7 @@ template <class Iterator>
TradedAmountsPerExchange LaunchAndCollectTrades(ThreadPool &threadPool, Iterator first, Iterator last,
const TradeOptions &tradeOptions) {
TradedAmountsPerExchange tradeAmountsPerExchange(std::distance(first, last));
threadPool.parallel_transform(first, last, tradeAmountsPerExchange.begin(), [&tradeOptions](auto &tuple) {
threadPool.parallelTransform(first, last, tradeAmountsPerExchange.begin(), [&tradeOptions](auto &tuple) {
Exchange *exchange = std::get<0>(tuple);
return std::make_pair(exchange, exchange->apiPrivate().trade(std::get<1>(tuple), std::get<2>(tuple), tradeOptions,
std::get<3>(tuple)));
Expand Down Expand Up @@ -718,11 +717,11 @@ TradedAmountsVectorWithFinalAmountPerExchange ExchangesOrchestrator::dustSweeper
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

TradedAmountsVectorWithFinalAmountPerExchange ret(selExchanges.size());
_threadPool.parallel_transform(selExchanges.begin(), selExchanges.end(), ret.begin(),
[currencyCode](Exchange *exchange) {
return std::make_pair(static_cast<const Exchange *>(exchange),
exchange->apiPrivate().queryDustSweeper(currencyCode));
});
_threadPool.parallelTransform(selExchanges.begin(), selExchanges.end(), ret.begin(),
[currencyCode](Exchange *exchange) {
return std::make_pair(static_cast<const Exchange *>(exchange),
exchange->apiPrivate().queryDustSweeper(currencyCode));
});

return ret;
}
Expand All @@ -747,8 +746,8 @@ DeliveredWithdrawInfoWithExchanges ExchangesOrchestrator::withdraw(MonetaryAmoun
throw exception("Cannot withdraw to the same account");
}
std::array<CurrencyExchangeFlatSet, 2> currencyExchangeSets;
_threadPool.parallel_transform(exchangePair.begin(), exchangePair.end(), currencyExchangeSets.begin(),
[](Exchange *exchange) { return exchange->queryTradableCurrencies(); });
_threadPool.parallelTransform(exchangePair.begin(), exchangePair.end(), currencyExchangeSets.begin(),
[](Exchange *exchange) { return exchange->queryTradableCurrencies(); });

DeliveredWithdrawInfoWithExchanges ret{{&fromExchange, &toExchange}, DeliveredWithdrawInfo{}};

Expand Down Expand Up @@ -783,10 +782,10 @@ MonetaryAmountPerExchange ExchangesOrchestrator::getWithdrawFees(CurrencyCode cu
UniquePublicSelectedExchanges selectedExchanges = getExchangesTradingCurrency(currencyCode, exchangeNames, true);

MonetaryAmountPerExchange withdrawFeePerExchange(selectedExchanges.size());
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), withdrawFeePerExchange.begin(),
[currencyCode](Exchange *exchange) {
return std::make_pair(exchange, exchange->queryWithdrawalFee(currencyCode));
});
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), withdrawFeePerExchange.begin(),
[currencyCode](Exchange *exchange) {
return std::make_pair(exchange, exchange->queryWithdrawalFee(currencyCode));
});
return withdrawFeePerExchange;
}

Expand All @@ -796,7 +795,7 @@ MonetaryAmountPerExchange ExchangesOrchestrator::getLast24hTradedVolumePerExchan
UniquePublicSelectedExchanges selectedExchanges = getExchangesTradingMarket(mk, exchangeNames);

MonetaryAmountPerExchange tradedVolumePerExchange(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), tradedVolumePerExchange.begin(),
[mk](Exchange *exchange) { return std::make_pair(exchange, exchange->queryLast24hVolume(mk)); });
return tradedVolumePerExchange;
Expand All @@ -809,7 +808,7 @@ LastTradesPerExchange ExchangesOrchestrator::getLastTradesPerExchange(Market mk,
UniquePublicSelectedExchanges selectedExchanges = getExchangesTradingMarket(mk, exchangeNames);

LastTradesPerExchange ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), [mk, nbLastTrades](Exchange *exchange) {
return std::make_pair(static_cast<const Exchange *>(exchange), exchange->queryLastTrades(mk, nbLastTrades));
});
Expand All @@ -822,7 +821,7 @@ MonetaryAmountPerExchange ExchangesOrchestrator::getLastPricePerExchange(Market
UniquePublicSelectedExchanges selectedExchanges = getExchangesTradingMarket(mk, exchangeNames);

MonetaryAmountPerExchange lastPricePerExchange(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), lastPricePerExchange.begin(),
[mk](Exchange *exchange) { return std::make_pair(exchange, exchange->queryLastPrice(mk)); });
return lastPricePerExchange;
Expand Down
2 changes: 2 additions & 0 deletions src/tech/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ add_unit_test(
add_unit_test(
threadpool_test
test/threadpool_test.cpp
DEFINITIONS
CCT_DISABLE_SPDLOG
)

add_unit_test(
Expand Down
Loading