Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/tezos synchronizer #742

Open
wants to merge 5 commits into
base: nrt_tezos
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 75 additions & 25 deletions core/src/async/algorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,41 +34,91 @@
#include "Future.hpp"
#include <utils/ImmediateExecutionContext.hpp>

#include <tuple>

namespace ledger {
namespace core {
namespace async {

namespace internals {

template <class T>
Future<Unit> sequence_go(const std::shared_ptr<api::ExecutionContext>& context, int index, const std::vector< Future<T> >& futures, std::vector<T>* buffer) {
if (index >= futures.size())
return Future<Unit>::successful(unit);
else {
Future<T> fut = futures[index];
return fut.template flatMap<Unit>(context, [context, buffer, index, futures](const T &r) -> Future<Unit> {
buffer->push_back(r);
return sequence_go(context, index + 1, futures, buffer);
});
}
namespace core {
namespace async {

namespace internals {

template <class T>
Future<Unit> sequence_go(const std::shared_ptr<api::ExecutionContext>& context, int index, const std::vector<Future<T>>& futures, std::vector<T>* buffer)
{
if (index >= futures.size()) {
return Future<Unit>::successful(unit);
} else {
Future<T> fut = futures[index];
return fut.template flatMap<Unit>(context, [context, buffer, index, futures](const T &r) -> Future<Unit> {
buffer->push_back(r);
return sequence_go(context, index + 1, futures, buffer);
});
}
}

} // namespace internals

template <class T>
Future<std::vector<T>> sequence(const std::shared_ptr<api::ExecutionContext>& context, const std::vector<Future<T>>& futures)
{
auto buffer = new std::vector<T>();
return internals::sequence_go<T>(context, 0, futures, buffer).template map<std::vector<T>>(context, [buffer] (const Unit&) -> std::vector<T> {
auto res = *buffer;
delete buffer;
return res;
});
}

template <class T>
Future< std::vector<T> > sequence(const std::shared_ptr<api::ExecutionContext>& context, const std::vector< Future<T> >& futures) {
auto buffer = new std::vector<T>();
return internals::sequence_go<T>(context, 0, futures, buffer).template map< std::vector<T> >(context, [buffer] (const Unit&) -> std::vector<T> {
auto res = *buffer;
delete buffer;
return res;
namespace internals {

template<std::size_t i, std::size_t size, typename... T>
struct sequence_t
{
static auto go(
const std::shared_ptr<api::ExecutionContext>& context,
std::tuple<Future<T>...>& tuple_futs,
std::tuple<T...>* tuple) -> Future<Unit>
{
return std::get<i>(tuple_futs)
.template flatMap<Unit>(context, [context, tuple_futs, tuple](const auto& r) mutable {
std::get<i>(*tuple) = r;
return sequence_t<i + 1, size, T...>::go(context, tuple_futs, tuple);
});
}
}
};

}
template<std::size_t size, typename... T>
struct sequence_t<size, size, T...>
{
static auto go(
const std::shared_ptr<api::ExecutionContext>& context,
std::tuple<Future<T>...>& tuple_futs,
std::tuple<T...>* tuple) -> Future<Unit>
{
return Future<Unit>::successful(unit);
}
};

} // namespace internals

template<typename... T>
auto sequence(
const std::shared_ptr<api::ExecutionContext>& context,
std::tuple<Future<T>...>& tuple_futs) -> Future<std::tuple<T...>>
{
auto tuple = new std::tuple<T...>();
return internals::sequence_t<0, sizeof...(T), T...>::go(context, tuple_futs, tuple)
.template map<std::tuple<T...>>(context, [tuple](const Unit&) {
const auto res = *tuple;
delete tuple;
return res;
});
}

} // namespace async
} // namespace core
} // namespace ledger

#define BEGIN_ASYNC_WHILE()

#define ASYNC_FOREACH()
Expand Down
15 changes: 7 additions & 8 deletions core/src/wallet/tezos/TezosLikeAccount.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,15 @@ namespace ledger {
if (transaction.type == api::TezosOperationTag::OPERATION_TAG_ORIGINATION && transaction.status == 1) {
updateOriginatedAccounts(sql, operation);
}
out.push_back(operation);
out.push_back(operation);
result = static_cast<int>(transaction.type);
}

if (_accountAddress == transaction.receiver) {
operation.amount = transaction.value;
operation.type = api::OperationType::RECEIVE;
operation.refreshUid();
out.push_back(operation);
out.push_back(operation);
result = static_cast<int>(transaction.type);
}
}
Expand All @@ -164,13 +164,13 @@ namespace ledger {
auto originatedAccountUid = TezosLikeAccountDatabaseHelper::createOriginatedAccountUid(getAccountUid(), origAccount.address);

const auto found = std::find_if (
_originatedAccounts.begin(),
_originatedAccounts.end(),
[&originatedAccountUid](const std::shared_ptr<api::TezosLikeOriginatedAccount>& element) {
_originatedAccounts.begin(),
_originatedAccounts.end(),
[&originatedAccountUid](const std::shared_ptr<api::TezosLikeOriginatedAccount>& element) {
return std::dynamic_pointer_cast<TezosLikeOriginatedAccount>(element)->getAccountUid() == originatedAccountUid;
});

if (found == _originatedAccounts.end()) {
if (found == _originatedAccounts.end()) {
_originatedAccounts.emplace_back(
std::make_shared<TezosLikeOriginatedAccount>(originatedAccountUid,
origAccount.address,
Expand Down Expand Up @@ -205,10 +205,9 @@ namespace ledger {
if (cachedBalance.hasValue()) {
return FuturePtr<Amount>::successful(std::make_shared<Amount>(cachedBalance.getValue()));
}
std::vector<TezosLikeKeychain::Address> listAddresses{_keychain->getAddress()};
auto currency = getWallet()->getCurrency();
auto self = getSelf();
return _explorer->getBalance(listAddresses).mapPtr<Amount>(getMainExecutionContext(), [self, currency](
return _explorer->getBalance(_keychain->getAddress()).mapPtr<Amount>(getMainExecutionContext(), [self, currency](
const std::shared_ptr<BigInt> &balance) -> std::shared_ptr<Amount> {
Amount b(currency, 0, BigInt(balance->toString()));
self->getWallet()->updateBalanceCache(self->getIndex(), b);
Expand Down
4 changes: 2 additions & 2 deletions core/src/wallet/tezos/TezosLikeAccount.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
#include <wallet/common/AbstractAccount.hpp>
#include <wallet/common/Amount.h>
#include <wallet/tezos/explorers/TezosLikeBlockchainExplorer.h>
#include <wallet/tezos/synchronizers/TezosLikeAccountSynchronizer.h>
#include <wallet/tezos/synchronizers/TezosLikeAccountSynchronizer.hpp>
#include <wallet/tezos/keychains/TezosLikeKeychain.h>
#include <wallet/tezos/database/TezosLikeAccountDatabaseEntry.h>

Expand Down Expand Up @@ -87,7 +87,7 @@ namespace ledger {

void interpretTransaction(const TezosLikeBlockchainExplorerTransaction& transaction,
std::vector<Operation>& out);

Try<int> bulkInsert(const std::vector<Operation>& operations);

void updateOriginatedAccounts(soci::session &sql, const Operation &operation);
Expand Down
81 changes: 10 additions & 71 deletions core/src/wallet/tezos/TezosLikeAccount2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,68 +110,7 @@ namespace ledger {

auto startTime = DateUtils::now();
eventPublisher->postSticky(std::make_shared<Event>(api::EventCode::SYNCHRONIZATION_STARTED, api::DynamicObject::newInstance()), 0);
future.flatMap<BlockchainExplorerAccountSynchronizationResult>(getContext(), [self] (const Try<BlockchainExplorerAccountSynchronizationResult> &result) {
// Synchronize originated accounts ...
// Notes: We should rid of this part by implementing support for fetching
// txs for multiple addresses
// Hint: we could add originated accounts to keychain as
// managedAccounts and getAllObservableAddresses will return them as well
if (self->_originatedAccounts.empty()) {
return Future<BlockchainExplorerAccountSynchronizationResult>::successful(result.getValue());
}
if (result.isFailure()) {
return Future<BlockchainExplorerAccountSynchronizationResult>::successful(BlockchainExplorerAccountSynchronizationResult{});
}
using TxsBulk = TezosLikeBlockchainExplorer::TransactionsBulk;

static std::function<Future<BlockchainExplorerAccountSynchronizationResult> (std::shared_ptr<TezosLikeAccount>, size_t, void*, BlockchainExplorerAccountSynchronizationResult)> getTxs =
[] (const std::shared_ptr<TezosLikeAccount> &account, size_t id, void *session, BlockchainExplorerAccountSynchronizationResult result) {
std::vector<std::string> addresses{account->_originatedAccounts[id]->getAddress()};

// Get offset to not start sync from beginning
auto offset = session ? Future<std::vector<std::shared_ptr<api::Operation>>>::successful(std::vector<std::shared_ptr<api::Operation>>()) :
std::dynamic_pointer_cast<OperationQuery>(
account->_originatedAccounts[id]->queryOperations()->partial()
)->execute();

return offset.flatMap<BlockchainExplorerAccountSynchronizationResult>(account->getContext(), [=] (const std::vector<std::shared_ptr<api::Operation>> &ops) mutable {
// For the moment we start synchro from the beginning
auto getSession = session ? Future<void *>::successful(session) :
account->_explorer->startSession();
return getSession.flatMap<BlockchainExplorerAccountSynchronizationResult>(account->getContext(), [=] (void *s) mutable {
return account->_explorer->getTransactions(addresses, std::to_string(ops.size()), s)
.flatMap<BlockchainExplorerAccountSynchronizationResult>(account->getContext(), [=] (const std::shared_ptr<TxsBulk> &bulk) mutable {
auto uid = TezosLikeAccountDatabaseHelper::createOriginatedAccountUid(account->getAccountUid(), addresses[0]);
{
std::vector<Operation> operations;
for (auto &tx : bulk->transactions) {
tx.originatedAccountUid = uid;
tx.originatedAccountAddress = addresses[0];
account->interpretTransaction(tx, operations);
}
auto f = account->bulkInsert(operations);
if (f.isSuccess()) {
result.newOperations += operations.size();
}
}

if (bulk->hasNext) {
return getTxs(account, id, s, result);
}

if (id == account->_originatedAccounts.size() - 1) {
return Future<BlockchainExplorerAccountSynchronizationResult>::successful(result);
}
return getTxs(account, id + 1, nullptr, result);
}).recover(account->getContext(), [] (const Exception& ex) -> BlockchainExplorerAccountSynchronizationResult {
throw ex;
});
});
});
};
return getTxs(self, 0, nullptr, result.getValue());
}).onComplete(getContext(), [eventPublisher, self, startTime](const auto &result) {

future.onComplete(getContext(), [eventPublisher, self, startTime](const auto &result) {
api::EventCode code;
auto payload = std::make_shared<DynamicObject>();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
Expand Down Expand Up @@ -214,7 +153,7 @@ namespace ledger {
std::cout << "broadcastTransaction: "<< counter->toString() << " / " << txHash << std::endl;
//auto waitingTxs = self->getInternalPreferences()->getStringArray("waiting_counter_txs", {});
//waitingTxs.push_back(txHash);
//self->getInternalPreferences()->editor()->putStringArray("waiting_counter_txs", waitingTxs)->commit();
//self->getInternalPreferences()->editor()->putStringArray("waiting_counter_txs", waitingTxs)->commit();
}

void TezosLikeAccount::_broadcastRawTransaction(const std::vector<uint8_t> &transaction,
Expand Down Expand Up @@ -276,7 +215,7 @@ namespace ledger {
// Check if balance is sufficient
auto currency = self->getWallet()->getCurrency();
auto accountAddress = TezosLikeAddress::fromBase58(senderAddress, currency);
return explorer->getBalance(std::vector<std::shared_ptr<TezosLikeAddress>>{accountAddress}).flatMapPtr<api::TezosLikeTransaction>(
return explorer->getBalance(accountAddress).flatMapPtr<api::TezosLikeTransaction>(
self->getMainExecutionContext(),
[self, request, explorer, accountAddress, currency, senderAddress](const std::shared_ptr<BigInt> &balance) {
// Check if all needed values are set
Expand Down Expand Up @@ -329,8 +268,8 @@ namespace ledger {
};

return setRevealStatus().flatMapPtr<api::TezosLikeTransaction>(self->getMainExecutionContext(), [=] (const Unit &result) {
//initialize the value

//initialize the value
//note that the value will be recalculated for the wipe mode after calculating fees
if (request.type != api::TezosOperationTag::OPERATION_TAG_DELEGATION) {
tx->setValue(request.wipe ? std::make_shared<BigInt>(BigInt::ZERO) : request.value);
Expand Down Expand Up @@ -419,15 +358,15 @@ namespace ledger {
return gasPriceFut.flatMapPtr<TezosLikeTransactionApi>(self->getMainExecutionContext(), [self, filledTx] (const std::shared_ptr<BigInt>&gasPrice) -> FuturePtr<TezosLikeTransactionApi> {
return self->estimateGasLimit(filledTx).flatMapPtr<TezosLikeTransactionApi>(self->getMainExecutionContext(), [filledTx, gasPrice] (const std::shared_ptr<GasLimit> &gas) -> FuturePtr<TezosLikeTransactionApi> {
// 0.000001 comes from the gasPrice->toInt64 being in picoTez

filledTx->setRevealGasLimit(std::make_shared<BigInt>(gas->reveal));
const auto revealFees = std::make_shared<BigInt>(static_cast<int64_t>(1 + static_cast<double>(gas->reveal.toInt64()) * static_cast<double>(gasPrice->toInt64()) * 0.000001));
filledTx->setRevealFees(revealFees);

filledTx->setTransactionGasLimit(std::make_shared<BigInt>(gas->transaction));
const auto transactionFees = std::make_shared<BigInt>(static_cast<int64_t>(1 + static_cast<double>(gas->transaction.toInt64()) * static_cast<double>(gasPrice->toInt64()) * 0.000001));
const auto transactionFees = std::make_shared<BigInt>(static_cast<int64_t>(1 + static_cast<double>(gas->transaction.toInt64()) * static_cast<double>(gasPrice->toInt64()) * 0.000001));
filledTx->setTransactionFees(transactionFees);

return FuturePtr<TezosLikeTransactionApi>::successful(filledTx);
});
});
Expand Down Expand Up @@ -495,14 +434,14 @@ namespace ledger {
//else {
// //keep only not validated counters:
// auto waitingTxs = self->getInternalPreferences()->getStringArray("waiting_counter_txs", {});
// std::cout << "get waiting_counter_txs =";
// std::cout << "get waiting_counter_txs =";
// for (auto& s: waitingTxs) std::cout << s << "-";
// std::cout << std::endl;
// auto waitingTxsSize = waitingCounter - explorerCounter->toInt64();
// if(waitingTxsSize < waitingTxs.size()) {
// waitingTxs = std::vector<std::string>(waitingTxs.end() - waitingTxsSize, waitingTxs.end());
// self->getInternalPreferences()->editor()->putStringArray("waiting_counter_txs", waitingTxs)->commit();
// std::cout << "set waiting_counter_txs =";
// std::cout << "set waiting_counter_txs =";
// for (auto& s: waitingTxs) std::cout << s << "-";
// std::cout << std::endl;
// }
Expand Down
2 changes: 1 addition & 1 deletion core/src/wallet/tezos/TezosLikeWallet.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include <api/TezosLikeWallet.hpp>
#include <wallet/common/AbstractWallet.hpp>
#include <wallet/tezos/explorers/TezosLikeBlockchainExplorer.h>
#include <wallet/tezos/synchronizers/TezosLikeAccountSynchronizer.h>
#include <wallet/tezos/synchronizers/TezosLikeAccountSynchronizer.hpp>
#include <wallet/tezos/factories/TezosLikeWalletFactory.h>
#include <wallet/tezos/factories/TezosLikeKeychainFactory.h>

Expand Down
Loading