From 85c1eaa5f3ade6c3ff6a59cdc2dee5228c96623c Mon Sep 17 00:00:00 2001 From: Aiwe Date: Sat, 4 May 2024 12:48:06 -0500 Subject: [PATCH] Stability & wallet sync enhancements (#223) - Wallet synchronization enhancements - Daemon stability enhancements - Correct application closing --- src/Platform/Linux/System/Dispatcher.cpp | 40 ++-- src/Platform/Linux/System/Dispatcher.h | 5 +- src/Platform/Linux/System/TcpConnection.cpp | 22 +- src/Platform/Linux/System/TcpConnection.h | 3 +- src/Platform/Linux/System/TcpConnector.cpp | 5 +- src/Platform/Linux/System/TcpConnector.h | 3 +- src/Platform/Linux/System/TcpListener.cpp | 12 +- src/Platform/Linux/System/TcpListener.h | 3 +- src/Platform/Linux/System/Timer.cpp | 12 +- src/Platform/Linux/System/Timer.h | 3 +- src/Platform/OSX/System/Dispatcher.cpp | 62 +++--- src/Platform/OSX/System/Dispatcher.h | 11 +- src/Platform/OSX/System/TcpConnection.cpp | 4 +- src/Platform/OSX/System/TcpConnection.h | 3 +- src/Platform/OSX/System/TcpConnector.cpp | 4 +- src/Platform/OSX/System/TcpConnector.h | 3 +- src/Platform/OSX/System/TcpListener.cpp | 4 +- src/Platform/OSX/System/TcpListener.h | 3 +- src/Platform/Windows/System/Dispatcher.cpp | 44 ++-- src/Platform/Windows/System/Dispatcher.h | 3 +- src/Platform/Windows/System/ErrorMessage.cpp | 7 +- src/Platform/Windows/System/ErrorMessage.h | 3 +- src/Platform/Windows/System/TcpConnection.cpp | 11 +- src/Platform/Windows/System/TcpConnection.h | 3 +- src/Platform/Windows/System/TcpConnector.cpp | 11 +- src/Platform/Windows/System/TcpConnector.h | 3 +- src/Platform/Windows/System/TcpListener.cpp | 12 +- src/Platform/Windows/System/TcpListener.h | 3 +- src/Platform/Windows/System/Timer.cpp | 6 +- src/Platform/Windows/System/Timer.h | 3 +- src/System/ContextGroup.cpp | 7 +- src/Transfers/BlockchainSynchronizer.cpp | 49 ++++- src/Transfers/BlockchainSynchronizer.h | 7 +- src/Transfers/IBlockchainSynchronizer.h | 6 +- src/Transfers/TransfersConsumer.cpp | 192 ++++++++++++------ src/Transfers/TransfersConsumer.h | 9 +- src/Transfers/TransfersContainer.cpp | 135 +++++++----- 37 files changed, 489 insertions(+), 227 deletions(-) diff --git a/src/Platform/Linux/System/Dispatcher.cpp b/src/Platform/Linux/System/Dispatcher.cpp index eb3c842d40..3baa58831f 100644 --- a/src/Platform/Linux/System/Dispatcher.cpp +++ b/src/Platform/Linux/System/Dispatcher.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -17,6 +18,7 @@ #include "Dispatcher.h" #include + #include #include #include @@ -56,8 +58,7 @@ class MutextGuard { static_assert(Dispatcher::SIZEOF_PTHREAD_MUTEX_T == sizeof(pthread_mutex_t), "invalid pthread mutex size"); -//const size_t STACK_SIZE = 64 * 1024; -const size_t STACK_SIZE = 512 * 1024; +const size_t STACK_SIZE = 64 * 1024; }; @@ -104,11 +105,13 @@ Dispatcher::Dispatcher() { } auto result = close(remoteSpawnEvent); + if (result) {} assert(result == 0); } } auto result = close(epoll); + if (result) {} assert(result == 0); } @@ -135,11 +138,13 @@ Dispatcher::~Dispatcher() { while (!timers.empty()) { int result = ::close(timers.top()); + if (result) {} assert(result == 0); timers.pop(); } auto result = close(epoll); + if (result) {} assert(result == 0); result = close(remoteSpawnEvent); assert(result == 0); @@ -172,8 +177,10 @@ void Dispatcher::dispatch() { if (firstResumingContext != nullptr) { context = firstResumingContext; firstResumingContext = context->next; - //assert(context->inExecutionQueue); + + assert(context->inExecutionQueue); context->inExecutionQueue = false; + break; } @@ -256,10 +263,13 @@ bool Dispatcher::interrupted() { void Dispatcher::pushContext(NativeContext* context) { assert(context != nullptr); + if (context->inExecutionQueue) return; + context->next = nullptr; context->inExecutionQueue = true; + if(firstResumingContext != nullptr) { assert(lastResumingContext != nullptr); lastResumingContext->next = context; @@ -330,23 +340,21 @@ void Dispatcher::yield() { } if ((events[i].events & EPOLLOUT) != 0) { - if(contextPair->writeContext != nullptr) { - if(contextPair->writeContext->context != nullptr) { + if (contextPair->writeContext != nullptr) { + if (contextPair->writeContext->context != nullptr) { contextPair->writeContext->context->interruptProcedure = nullptr; } + pushContext(contextPair->writeContext->context); + contextPair->writeContext->events = events[i].events; } - pushContext(contextPair->writeContext->context); - contextPair->writeContext->events = events[i].events; } else if ((events[i].events & EPOLLIN) != 0) { - if(contextPair->readContext != nullptr) { - if(contextPair->readContext->context != nullptr) { + if (contextPair->readContext != nullptr) { + if (contextPair->readContext->context != nullptr) { contextPair->readContext->context->interruptProcedure = nullptr; } + pushContext(contextPair->readContext->context); + contextPair->readContext->events = events[i].events; } - pushContext(contextPair->readContext->context); - contextPair->readContext->events = events[i].events; - } else if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0) { - throw std::runtime_error("Dispatcher::dispatch, events & (EPOLLERR | EPOLLHUP) != 0"); } else { continue; } @@ -408,7 +416,7 @@ int Dispatcher::getTimer() { if (timers.empty()) { timer = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); epoll_event timerEvent; - timerEvent.events = 0; + timerEvent.events = EPOLLONESHOT; timerEvent.data.ptr = nullptr; if (epoll_ctl(getEpoll(), EPOLL_CTL_ADD, timer, &timerEvent) == -1) { @@ -443,7 +451,7 @@ void Dispatcher::contextProcedure(void* ucontext) { ++runningContextCount; try { context.procedure(); - } catch(std::exception&) { + } catch(...) { } if (context.group != nullptr) { diff --git a/src/Platform/Linux/System/Dispatcher.h b/src/Platform/Linux/System/Dispatcher.h index 8ce5d12d64..cc163c676d 100644 --- a/src/Platform/Linux/System/Dispatcher.h +++ b/src/Platform/Linux/System/Dispatcher.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -19,9 +20,11 @@ #include #include +#include #include #include #include +#include #ifndef __GLIBC__ #include #endif diff --git a/src/Platform/Linux/System/TcpConnection.cpp b/src/Platform/Linux/System/TcpConnection.cpp index 3d091c75a9..4c04e129aa 100644 --- a/src/Platform/Linux/System/TcpConnection.cpp +++ b/src/Platform/Linux/System/TcpConnection.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -48,6 +49,7 @@ TcpConnection::~TcpConnection() { assert(contextPair.readContext == nullptr); assert(contextPair.writeContext == nullptr); int result = close(connection); + if (result) {} assert(result != -1); } } @@ -83,7 +85,10 @@ size_t TcpConnection::read(uint8_t* data, size_t size) { std::string message; ssize_t transferred = ::recv(connection, (void *)data, size, 0); if (transferred == -1) { - if (errno != EAGAIN) { +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wlogical-op" + if (errno != EAGAIN && errno != EWOULDBLOCK) { +#pragma GCC diagnostic pop message = "recv failed, " + lastErrorMessage(); } else { epoll_event connectionEvent; @@ -106,11 +111,11 @@ size_t TcpConnection::read(uint8_t* data, size_t size) { assert(dispatcher != nullptr); assert(contextPair.readContext != nullptr); epoll_event connectionEvent; - connectionEvent.events = 0; + connectionEvent.events = EPOLLONESHOT; connectionEvent.data.ptr = nullptr; if (epoll_ctl(dispatcher->getEpoll(), EPOLL_CTL_MOD, connection, &connectionEvent) == -1) { - throw std::runtime_error("TcpConnection::stop, epoll_ctl failed, " + lastErrorMessage()); + throw std::runtime_error("TcpConnection::read, interrupt procedure, epoll_ctl failed, " + lastErrorMessage()); } contextPair.readContext->interrupted = true; @@ -179,7 +184,10 @@ std::size_t TcpConnection::write(const uint8_t* data, size_t size) { ssize_t transferred = ::send(connection, (void *)data, size, MSG_NOSIGNAL); if (transferred == -1) { - if (errno != EAGAIN) { +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wlogical-op" + if (errno != EAGAIN && errno != EWOULDBLOCK) { +#pragma GCC diagnostic pop message = "send failed, " + lastErrorMessage(); } else { epoll_event connectionEvent; @@ -202,11 +210,11 @@ std::size_t TcpConnection::write(const uint8_t* data, size_t size) { assert(dispatcher != nullptr); assert(contextPair.writeContext != nullptr); epoll_event connectionEvent; - connectionEvent.events = 0; + connectionEvent.events = EPOLLONESHOT; connectionEvent.data.ptr = nullptr; if (epoll_ctl(dispatcher->getEpoll(), EPOLL_CTL_MOD, connection, &connectionEvent) == -1) { - throw std::runtime_error("TcpConnection::stop, epoll_ctl failed, " + lastErrorMessage()); + throw std::runtime_error("TcpConnection::write, interrupt procedure, epoll_ctl failed, " + lastErrorMessage()); } contextPair.writeContext->interrupted = true; diff --git a/src/Platform/Linux/System/TcpConnection.h b/src/Platform/Linux/System/TcpConnection.h index fba62e8878..fb9aa7d925 100644 --- a/src/Platform/Linux/System/TcpConnection.h +++ b/src/Platform/Linux/System/TcpConnection.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/Linux/System/TcpConnector.cpp b/src/Platform/Linux/System/TcpConnector.cpp index 10aed04693..fc3fbf3868 100644 --- a/src/Platform/Linux/System/TcpConnector.cpp +++ b/src/Platform/Linux/System/TcpConnector.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -144,6 +145,7 @@ TcpConnection TcpConnector::connect(const Ipv4Address& address, uint16_t port) { } else { if((connectorContext.events & (EPOLLERR | EPOLLHUP)) != 0) { int result = close(connection); + if (result) {} assert(result != -1); throw std::runtime_error("TcpConnector::connect, connection failed"); @@ -171,6 +173,7 @@ TcpConnection TcpConnector::connect(const Ipv4Address& address, uint16_t port) { } int result = close(connection); + if (result) {} assert(result != -1); } diff --git a/src/Platform/Linux/System/TcpConnector.h b/src/Platform/Linux/System/TcpConnector.h index 02f312914c..a18ceac651 100644 --- a/src/Platform/Linux/System/TcpConnector.h +++ b/src/Platform/Linux/System/TcpConnector.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/Linux/System/TcpListener.cpp b/src/Platform/Linux/System/TcpListener.cpp index 2c442b7028..2ee84e5e0d 100644 --- a/src/Platform/Linux/System/TcpListener.cpp +++ b/src/Platform/Linux/System/TcpListener.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -60,7 +61,7 @@ TcpListener::TcpListener(Dispatcher& dispatcher, const Ipv4Address& addr, uint16 message = "listen failed, " + lastErrorMessage(); } else { epoll_event listenEvent; - listenEvent.events = 0; + listenEvent.events = EPOLLONESHOT; listenEvent.data.ptr = nullptr; if (epoll_ctl(dispatcher.getEpoll(), EPOLL_CTL_ADD, listener, &listenEvent) == -1) { @@ -74,6 +75,7 @@ TcpListener::TcpListener(Dispatcher& dispatcher, const Ipv4Address& addr, uint16 } int result = close(listener); + if (result) {} assert(result != -1); } @@ -93,6 +95,7 @@ TcpListener::~TcpListener() { if (dispatcher != nullptr) { assert(context == nullptr); int result = close(listener); + if (result) {} assert(result != -1); } } @@ -145,11 +148,11 @@ TcpConnection TcpListener::accept() { OperationContext* listenerContext = static_cast(context); if (!listenerContext->interrupted) { epoll_event listenEvent; - listenEvent.events = 0; + listenEvent.events = EPOLLONESHOT; listenEvent.data.ptr = nullptr; if (epoll_ctl(dispatcher->getEpoll(), EPOLL_CTL_MOD, listener, &listenEvent) == -1) { - throw std::runtime_error("TcpListener::stop, epoll_ctl failed, " + lastErrorMessage() ); + throw std::runtime_error("TcpListener::accept, interrupt procedure, epoll_ctl failed, " + lastErrorMessage() ); } listenerContext->interrupted = true; @@ -187,6 +190,7 @@ TcpConnection TcpListener::accept() { } int result = close(connection); + if (result) {} assert(result != -1); } } diff --git a/src/Platform/Linux/System/TcpListener.h b/src/Platform/Linux/System/TcpListener.h index fda8e09aac..aba31713d1 100644 --- a/src/Platform/Linux/System/TcpListener.h +++ b/src/Platform/Linux/System/TcpListener.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/Linux/System/Timer.cpp b/src/Platform/Linux/System/Timer.cpp index ee5641c3e1..a0dba5c13c 100644 --- a/src/Platform/Linux/System/Timer.cpp +++ b/src/Platform/Linux/System/Timer.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -102,11 +103,14 @@ void Timer::sleep(std::chrono::nanoseconds duration) { if (!timerContext->interrupted) { uint64_t value = 0; if(::read(timer, &value, sizeof value) == -1 ){ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wlogical-op" if(errno == EAGAIN || errno == EWOULDBLOCK) { +#pragma GCC diagnostic pop timerContext->interrupted = true; dispatcher->pushContext(timerContext->context); } else { - throw std::runtime_error("Timer::interrupt, read failed, " + lastErrorMessage()); + throw std::runtime_error("Timer::sleep, interrupt procedure, read failed, " + lastErrorMessage()); } } else { assert(value>0); @@ -114,11 +118,11 @@ void Timer::sleep(std::chrono::nanoseconds duration) { } epoll_event timerEvent; - timerEvent.events = 0; + timerEvent.events = EPOLLONESHOT; timerEvent.data.ptr = nullptr; if (epoll_ctl(dispatcher->getEpoll(), EPOLL_CTL_MOD, timer, &timerEvent) == -1) { - throw std::runtime_error("Timer::interrupt, epoll_ctl failed, " + lastErrorMessage()); + throw std::runtime_error("Timer::sleep, interrupt procedure, epoll_ctl failed, " + lastErrorMessage()); } } }; diff --git a/src/Platform/Linux/System/Timer.h b/src/Platform/Linux/System/Timer.h index b156f8b695..ad394a49f7 100644 --- a/src/Platform/Linux/System/Timer.h +++ b/src/Platform/Linux/System/Timer.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/OSX/System/Dispatcher.cpp b/src/Platform/OSX/System/Dispatcher.cpp index 1d29687b12..8b5c72d297 100644 --- a/src/Platform/OSX/System/Dispatcher.cpp +++ b/src/Platform/OSX/System/Dispatcher.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -17,6 +18,7 @@ #include "Dispatcher.h" #include +#include #include #include #include @@ -55,8 +57,8 @@ class MutextGuard { pthread_mutex_t& mutex; }; -//const size_t STACK_SIZE = 64 * 1024; -const size_t STACK_SIZE = 512 * 1024; +const size_t STACK_SIZE = 64 * 1024; + } static_assert(Dispatcher::SIZEOF_PTHREAD_MUTEX_T == sizeof(pthread_mutex_t), "invalid pthread mutex size"); @@ -80,16 +82,16 @@ Dispatcher::Dispatcher() : lastCreatedTimer(0) { message = "pthread_mutex_init failed, " + lastErrorMessage(); } else { remoteSpawned = false; - + mainContext.interrupted = false; mainContext.group = &contextGroup; mainContext.groupPrev = nullptr; mainContext.groupNext = nullptr; + mainContext.inExecutionQueue = false; contextGroup.firstContext = nullptr; contextGroup.lastContext = nullptr; contextGroup.firstWaiter = nullptr; contextGroup.lastWaiter = nullptr; - mainContext.inExecutionQueue = false; currentContext = &mainContext; firstResumingContext = nullptr; firstReusableContext = nullptr; @@ -100,6 +102,7 @@ Dispatcher::Dispatcher() : lastCreatedTimer(0) { } auto result = close(kqueue); + if (result) {} assert(result == 0); } @@ -123,8 +126,9 @@ Dispatcher::~Dispatcher() { delete[] stackPtr; delete ucontext; } - + auto result = close(kqueue); + if (result) {} assert(result != -1); result = pthread_mutex_destroy(reinterpret_cast(this->mutex)); assert(result != -1); @@ -146,11 +150,13 @@ void Dispatcher::dispatch() { if (firstResumingContext != nullptr) { context = firstResumingContext; firstResumingContext = context->next; - //assert(context->inExecutionQueue); + + assert(context->inExecutionQueue); context->inExecutionQueue = false; + break; } - + if(remoteSpawned.load() == true) { MutextGuard guard(*reinterpret_cast(this->mutex)); while (!remoteSpawningProcedures.empty()) { @@ -240,8 +246,10 @@ bool Dispatcher::interrupted() { void Dispatcher::pushContext(NativeContext* context) { assert(context!=nullptr); + if (context->inExecutionQueue) return; + context->next = nullptr; context->inExecutionQueue = true; if (firstResumingContext != nullptr) { @@ -307,7 +315,7 @@ void Dispatcher::yield() { if (events[i].filter == EVFILT_USER && events[i].ident == 0) { EV_SET(&updates[updatesCounter++], 0, EVFILT_USER, EV_ADD | EV_DISABLE, NOTE_FFNOP, 0, NULL); - + MutextGuard guard(*reinterpret_cast(this->mutex)); while (!remoteSpawningProcedures.empty()) { spawn(std::move(remoteSpawningProcedures.front())); @@ -343,24 +351,24 @@ int Dispatcher::getKqueue() const { NativeContext& Dispatcher::getReusableContext() { if(firstReusableContext == nullptr) { - uctx* newlyCreatedContext = new uctx; - uint8_t* stackPointer = new uint8_t[STACK_SIZE]; - static_cast(newlyCreatedContext)->uc_stack.ss_sp = stackPointer; - static_cast(newlyCreatedContext)->uc_stack.ss_size = STACK_SIZE; - - ContextMakingData makingData{ newlyCreatedContext, this}; - makecontext(static_cast(newlyCreatedContext), reinterpret_cast(contextProcedureStatic), reinterpret_cast(&makingData)); - - uctx* oldContext = static_cast(currentContext->uctx); - if (swapcontext(oldContext, newlyCreatedContext) == -1) { - throw std::runtime_error("Dispatcher::getReusableContext, swapcontext failed, " + lastErrorMessage()); - } - - assert(firstReusableContext != nullptr); - assert(firstReusableContext->uctx == newlyCreatedContext); - firstReusableContext->stackPtr = stackPointer; + uctx* newlyCreatedContext = new uctx; + uint8_t* stackPointer = new uint8_t[STACK_SIZE]; + static_cast(newlyCreatedContext)->uc_stack.ss_sp = stackPointer; + static_cast(newlyCreatedContext)->uc_stack.ss_size = STACK_SIZE; + + ContextMakingData makingData{ newlyCreatedContext, this}; + makecontext(static_cast(newlyCreatedContext), reinterpret_cast(contextProcedureStatic), reinterpret_cast(&makingData)); + + uctx* oldContext = static_cast(currentContext->uctx); + if (swapcontext(oldContext, newlyCreatedContext) == -1) { + throw std::runtime_error("Dispatcher::getReusableContext, swapcontext failed, " + lastErrorMessage()); + } + + assert(firstReusableContext != nullptr); + assert(firstReusableContext->uctx == newlyCreatedContext); + firstReusableContext->stackPtr = stackPointer; } - + NativeContext* context = firstReusableContext; firstReusableContext = firstReusableContext->next; return *context; @@ -405,7 +413,7 @@ void Dispatcher::contextProcedure(void* ucontext) { ++runningContextCount; try { context.procedure(); - } catch(std::exception&) { + } catch(...) { } if (context.group != nullptr) { diff --git a/src/Platform/OSX/System/Dispatcher.h b/src/Platform/OSX/System/Dispatcher.h index 6c7ccb8b47..7324528f30 100644 --- a/src/Platform/OSX/System/Dispatcher.h +++ b/src/Platform/OSX/System/Dispatcher.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -19,9 +20,11 @@ #include #include +#include #include #include #include +#include namespace System { @@ -74,7 +77,11 @@ class Dispatcher { int getTimer(); void pushTimer(int timer); -static const int SIZEOF_PTHREAD_MUTEX_T = sizeof(pthread_mutex_t); +#ifdef __LP64__ + static const int SIZEOF_PTHREAD_MUTEX_T = 56 + sizeof(long); +#else + static const int SIZEOF_PTHREAD_MUTEX_T = 40 + sizeof(long); +#endif private: void spawn(std::function&& procedure); diff --git a/src/Platform/OSX/System/TcpConnection.cpp b/src/Platform/OSX/System/TcpConnection.cpp index c7cd406d47..9ce6d5dd5a 100644 --- a/src/Platform/OSX/System/TcpConnection.cpp +++ b/src/Platform/OSX/System/TcpConnection.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -50,6 +51,7 @@ TcpConnection::~TcpConnection() { assert(readContext == nullptr); assert(writeContext == nullptr); int result = close(connection); + if (result) {} assert(result != -1); } } diff --git a/src/Platform/OSX/System/TcpConnection.h b/src/Platform/OSX/System/TcpConnection.h index 5ed4d8dd09..1ed2de5b1e 100644 --- a/src/Platform/OSX/System/TcpConnection.h +++ b/src/Platform/OSX/System/TcpConnection.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/OSX/System/TcpConnector.cpp b/src/Platform/OSX/System/TcpConnector.cpp index 767fbc7c4d..e737950604 100644 --- a/src/Platform/OSX/System/TcpConnector.cpp +++ b/src/Platform/OSX/System/TcpConnector.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -165,6 +166,7 @@ TcpConnection TcpConnector::connect(const Ipv4Address& address, uint16_t port) { } int result = close(connection); + if (result) {} assert(result != -1);; } diff --git a/src/Platform/OSX/System/TcpConnector.h b/src/Platform/OSX/System/TcpConnector.h index 02f312914c..a18ceac651 100644 --- a/src/Platform/OSX/System/TcpConnector.h +++ b/src/Platform/OSX/System/TcpConnector.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/OSX/System/TcpListener.cpp b/src/Platform/OSX/System/TcpListener.cpp index efa9ae7c8d..6ea5539116 100644 --- a/src/Platform/OSX/System/TcpListener.cpp +++ b/src/Platform/OSX/System/TcpListener.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -95,6 +96,7 @@ TcpListener::~TcpListener() { if (dispatcher != nullptr) { assert(context == nullptr); int result = close(listener); + if (result) {} assert(result != -1); } } diff --git a/src/Platform/OSX/System/TcpListener.h b/src/Platform/OSX/System/TcpListener.h index 9cbb843d52..70f652654a 100644 --- a/src/Platform/OSX/System/TcpListener.h +++ b/src/Platform/OSX/System/TcpListener.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/Windows/System/Dispatcher.cpp b/src/Platform/Windows/System/Dispatcher.cpp index 1e0ff4c501..0dd736534e 100644 --- a/src/Platform/Windows/System/Dispatcher.cpp +++ b/src/Platform/Windows/System/Dispatcher.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -93,7 +94,7 @@ Dispatcher::Dispatcher() { } Dispatcher::~Dispatcher() { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); for (NativeContext* context = contextGroup.firstContext; context != nullptr; context = context->groupNext) { interrupt(context); } @@ -120,7 +121,7 @@ Dispatcher::~Dispatcher() { } void Dispatcher::clear() { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); while (firstReusableContext != nullptr) { void* fiber = firstReusableContext->fiber; firstReusableContext = firstReusableContext->next; @@ -129,9 +130,18 @@ void Dispatcher::clear() { } void Dispatcher::dispatch() { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); NativeContext* context; for (;;) { + if (firstResumingContext != nullptr) { + context = firstResumingContext; + firstResumingContext = context->next; + + assert(context->inExecutionQueue); + context->inExecutionQueue = false; + + break; + } LARGE_INTEGER frequency; LARGE_INTEGER ticks; @@ -148,8 +158,10 @@ void Dispatcher::dispatch() { if (firstResumingContext != nullptr) { context = firstResumingContext; firstResumingContext = context->next; - //assert(context->inExecutionQueue); + + assert(context->inExecutionQueue); context->inExecutionQueue = false; + break; } @@ -192,7 +204,7 @@ void Dispatcher::dispatch() { } NativeContext* Dispatcher::getCurrentContext() const { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); return currentContext; } @@ -201,7 +213,7 @@ void Dispatcher::interrupt() { } void Dispatcher::interrupt(NativeContext* context) { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); assert(context != nullptr); if (!context->interrupted) { if (context->interruptProcedure != nullptr) { @@ -223,11 +235,13 @@ bool Dispatcher::interrupted() { } void Dispatcher::pushContext(NativeContext* context) { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); assert(context != nullptr); + if (context->inExecutionQueue) { return; } + context->next = nullptr; context->inExecutionQueue = true; if (firstResumingContext != nullptr) { @@ -255,7 +269,7 @@ void Dispatcher::remoteSpawn(std::function&& procedure) { } void Dispatcher::spawn(std::function&& procedure) { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); NativeContext* context = &getReusableContext(); if (contextGroup.firstContext != nullptr) { context->groupPrev = contextGroup.lastContext; @@ -276,7 +290,7 @@ void Dispatcher::spawn(std::function&& procedure) { } void Dispatcher::yield() { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); for (;;) { LARGE_INTEGER frequency; LARGE_INTEGER ticks; @@ -331,7 +345,7 @@ void Dispatcher::yield() { } void Dispatcher::addTimer(uint64_t time, NativeContext* context) { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); timers.insert(std::make_pair(time, context)); } @@ -363,10 +377,12 @@ void Dispatcher::pushReusableContext(NativeContext& context) { } void Dispatcher::interruptTimer(uint64_t time, NativeContext* context) { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); + if (context->inExecutionQueue) { return; } + auto range = timers.equal_range(time); for (auto it = range.first; ; ++it) { assert(it != range.second); @@ -379,7 +395,7 @@ void Dispatcher::interruptTimer(uint64_t time, NativeContext* context) { } void Dispatcher::contextProcedure() { - //assert(GetCurrentThreadId() == threadId); + assert(GetCurrentThreadId() == threadId); assert(firstReusableContext == nullptr); NativeContext context; context.interrupted = false; @@ -391,7 +407,7 @@ void Dispatcher::contextProcedure() { ++runningContextCount; try { context.procedure(); - } catch (std::exception&) { + } catch (...) { } if (context.group != nullptr) { diff --git a/src/Platform/Windows/System/Dispatcher.h b/src/Platform/Windows/System/Dispatcher.h index 6b45a59750..338c684012 100644 --- a/src/Platform/Windows/System/Dispatcher.h +++ b/src/Platform/Windows/System/Dispatcher.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/Windows/System/ErrorMessage.cpp b/src/Platform/Windows/System/ErrorMessage.cpp index 9108525248..281216ba08 100755 --- a/src/Platform/Windows/System/ErrorMessage.cpp +++ b/src/Platform/Windows/System/ErrorMessage.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -17,6 +18,10 @@ #include "ErrorMessage.h" +#ifndef NOMINMAX +#define NOMINMAX +#endif + #include #include diff --git a/src/Platform/Windows/System/ErrorMessage.h b/src/Platform/Windows/System/ErrorMessage.h index 6babf659dc..49c57916e9 100644 --- a/src/Platform/Windows/System/ErrorMessage.h +++ b/src/Platform/Windows/System/ErrorMessage.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/Windows/System/TcpConnection.cpp b/src/Platform/Windows/System/TcpConnection.cpp index 3a661549ec..89613c2d84 100644 --- a/src/Platform/Windows/System/TcpConnection.cpp +++ b/src/Platform/Windows/System/TcpConnection.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -141,6 +142,10 @@ size_t TcpConnection::read(uint8_t* data, size_t size) { throw InterruptedException(); } + if (context.interrupted) { + throw InterruptedException(); + } + assert(transferred <= size); assert(flags == 0); return transferred; @@ -210,6 +215,10 @@ size_t TcpConnection::write(const uint8_t* data, size_t size) { throw InterruptedException(); } + if (context.interrupted) { + throw InterruptedException(); + } + assert(transferred == size); assert(flags == 0); return transferred; diff --git a/src/Platform/Windows/System/TcpConnection.h b/src/Platform/Windows/System/TcpConnection.h index 9cc873170b..4360ebfe9f 100644 --- a/src/Platform/Windows/System/TcpConnection.h +++ b/src/Platform/Windows/System/TcpConnection.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/Windows/System/TcpConnector.cpp b/src/Platform/Windows/System/TcpConnector.cpp index fff0ec1706..9842f78a4a 100644 --- a/src/Platform/Windows/System/TcpConnector.cpp +++ b/src/Platform/Windows/System/TcpConnector.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -158,6 +159,14 @@ TcpConnection TcpConnector::connect(const Ipv4Address& address, uint16_t port) { } } } else { + if (context2.interrupted) { + if (closesocket(connection) != 0) { + throw std::runtime_error("TcpConnector::connect, closesocket failed, " + errorMessage(WSAGetLastError())); + } else { + throw InterruptedException(); + } + } + assert(transferred == 0); assert(flags == 0); DWORD value = 1; diff --git a/src/Platform/Windows/System/TcpConnector.h b/src/Platform/Windows/System/TcpConnector.h index c00b0b93b4..cdb232dde8 100644 --- a/src/Platform/Windows/System/TcpConnector.h +++ b/src/Platform/Windows/System/TcpConnector.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/Windows/System/TcpListener.cpp b/src/Platform/Windows/System/TcpListener.cpp index 76c9748a2f..4650e6abbb 100644 --- a/src/Platform/Windows/System/TcpListener.cpp +++ b/src/Platform/Windows/System/TcpListener.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -183,6 +184,15 @@ TcpConnection TcpListener::accept() { } } } else { + if (context2.interrupted) { + if (closesocket(connection) != 0) { + throw std::runtime_error("TcpConnector::connect, closesocket failed, " + errorMessage(WSAGetLastError())); + } + else { + throw InterruptedException(); + } + } + assert(transferred == 0); assert(flags == 0); if (setsockopt(connection, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, reinterpret_cast(&listener), sizeof(listener)) != 0) { diff --git a/src/Platform/Windows/System/TcpListener.h b/src/Platform/Windows/System/TcpListener.h index 4945af7e94..47921f9875 100644 --- a/src/Platform/Windows/System/TcpListener.h +++ b/src/Platform/Windows/System/TcpListener.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/Platform/Windows/System/Timer.cpp b/src/Platform/Windows/System/Timer.cpp index f41c870180..e9a6be71db 100644 --- a/src/Platform/Windows/System/Timer.cpp +++ b/src/Platform/Windows/System/Timer.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -21,6 +22,9 @@ #ifndef WIN32_LEAN_AND_MEAN #define WIN32_LEAN_AND_MEAN #endif +#ifndef NOMINMAX +#define NOMINMAX +#endif #include #include #include "Dispatcher.h" diff --git a/src/Platform/Windows/System/Timer.h b/src/Platform/Windows/System/Timer.h index 5b260113b0..819aa7c9bf 100644 --- a/src/Platform/Windows/System/Timer.h +++ b/src/Platform/Windows/System/Timer.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // diff --git a/src/System/ContextGroup.cpp b/src/System/ContextGroup.cpp index 4546dc27cf..a71c2aabf1 100755 --- a/src/System/ContextGroup.cpp +++ b/src/System/ContextGroup.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -83,6 +84,10 @@ void ContextGroup::wait() { if (contextGroup.firstContext != nullptr) { NativeContext* context = dispatcher->getCurrentContext(); context->next = nullptr; + + assert(!context->inExecutionQueue); + context->inExecutionQueue = true; + if (contextGroup.firstWaiter != nullptr) { assert(contextGroup.lastWaiter->next == nullptr); contextGroup.lastWaiter->next = context; diff --git a/src/Transfers/BlockchainSynchronizer.cpp b/src/Transfers/BlockchainSynchronizer.cpp index f9735d4d89..571a353084 100644 --- a/src/Transfers/BlockchainSynchronizer.cpp +++ b/src/Transfers/BlockchainSynchronizer.cpp @@ -250,7 +250,7 @@ bool BlockchainSynchronizer::setFutureStateIf(State s, std::function void BlockchainSynchronizer::actualizeFutureState() { std::unique_lock lk(m_stateMutex); - if (m_currentState == State::stopped && m_futureState == State::deleteOldTxs) { // start(), immideately attach observer + if (m_currentState == State::stopped && (m_futureState == State::deleteOldTxs || m_futureState == State::blockchainSync)) { // start(), immideately attach observer m_node.addObserver(this); } @@ -352,7 +352,15 @@ void BlockchainSynchronizer::start() { throw std::runtime_error(message); } - if (!setFutureStateIf(State::deleteOldTxs, [this] { return m_currentState == State::stopped && m_futureState == State::stopped; })) { + State nextState; + if (!wasStarted) { + nextState = State::deleteOldTxs; + wasStarted = true; + } else { + nextState = State::blockchainSync; + } + + if (!setFutureStateIf(nextState, [this] { return m_currentState == State::stopped && m_futureState == State::stopped; })) { auto message = "Failed to start: already started"; m_logger(ERROR, BRIGHT_RED) << message; throw std::runtime_error(message); @@ -499,7 +507,6 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) { CompleteBlock completeBlock; completeBlock.blockHash = block.blockHash; - interval.blocks.push_back(completeBlock.blockHash); if (block.hasBlock) { completeBlock.block = std::move(block.block); completeBlock.transactions.push_back(createTransactionPrefix(completeBlock.block->baseTransaction)); @@ -516,6 +523,7 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) { } } + interval.blocks.push_back(completeBlock.blockHash); blocks.push_back(std::move(completeBlock)); } @@ -534,7 +542,7 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) { break; case UpdateConsumersResult::nothingChanged: - if (m_node.getLastKnownBlockHeight() != m_node.getLastLocalBlockHeight()) { + if (m_node.getKnownBlockCount() != m_node.getLocalBlockCount()) { m_logger(DEBUGGING) << "Blockchain updated, resume blockchain synchronization"; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } else { @@ -564,8 +572,12 @@ void BlockchainSynchronizer::processBlocks(GetBlocksResponse& response) { /// \pre m_consumersMutex is locked BlockchainSynchronizer::UpdateConsumersResult BlockchainSynchronizer::updateConsumers(const BlockchainInterval& interval, const std::vector& blocks) { + assert(interval.blocks.size() == blocks.size()); + bool smthChanged = false; + bool hasErrors = false; + uint32_t lastBlockIndex = std::numeric_limits::max(); for (auto& kv : m_consumers) { auto result = kv.second->checkInterval(interval); @@ -577,21 +589,40 @@ BlockchainSynchronizer::UpdateConsumersResult BlockchainSynchronizer::updateCons if (result.hasNewBlocks) { uint32_t startOffset = result.newBlockHeight - interval.startHeight; - // update consumer uint32_t blockCount = static_cast(blocks.size()) - startOffset; + // update consumer m_logger(DEBUGGING) << "Adding blocks to consumer, consumer " << kv.first << ", start index " << result.newBlockHeight << ", count " << blockCount; - if (kv.first->onNewBlocks(blocks.data() + startOffset, result.newBlockHeight, blockCount)) { + uint32_t addedCount = kv.first->onNewBlocks(blocks.data() + startOffset, result.newBlockHeight, blockCount); + if (addedCount > 0) { + if (addedCount < blockCount) { + m_logger(ERROR, BRIGHT_RED) << "Failed to add " << (blockCount - addedCount) << " blocks of " << blockCount << " to consumer, consumer " << kv.first; + hasErrors = true; + } + // update state if consumer succeeded - kv.second->addBlocks(interval.blocks.data() + startOffset, result.newBlockHeight, static_cast(interval.blocks.size()) - startOffset); + kv.second->addBlocks(interval.blocks.data() + startOffset, result.newBlockHeight, addedCount); smthChanged = true; } else { m_logger(ERROR, BRIGHT_RED) << "Failed to add blocks to consumer, consumer " << kv.first; - return UpdateConsumersResult::errorOccurred; + hasErrors = true; + } + + if (addedCount > 0) { + lastBlockIndex = std::min(lastBlockIndex, startOffset + addedCount - 1); } } } - if (smthChanged) { + if (lastBlockIndex != std::numeric_limits::max()) { + assert(lastBlockIndex < blocks.size()); + lastBlockId = blocks[lastBlockIndex].blockHash; + m_logger(DEBUGGING) << "Last block hash " << lastBlockId << ", index " << (interval.startHeight + lastBlockIndex); + } + + if (hasErrors) { + m_logger(DEBUGGING) << "Not all blocks were added to consumers, there were errors"; + return UpdateConsumersResult::errorOccurred; + } else if (smthChanged) { m_logger(DEBUGGING) << "Blocks added to consumers"; return UpdateConsumersResult::addedNewBlocks; } else { diff --git a/src/Transfers/BlockchainSynchronizer.h b/src/Transfers/BlockchainSynchronizer.h index 1056f4ba37..353478936c 100644 --- a/src/Transfers/BlockchainSynchronizer.h +++ b/src/Transfers/BlockchainSynchronizer.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -38,7 +39,7 @@ class BlockchainSynchronizer : public: BlockchainSynchronizer(INode& node, Logging::ILogger& logger, const Crypto::Hash& genesisBlockHash); - ~BlockchainSynchronizer(); + virtual ~BlockchainSynchronizer() override; // IBlockchainSynchronizer virtual void addConsumer(IBlockchainConsumer* consumer) override; @@ -146,6 +147,8 @@ class BlockchainSynchronizer : mutable std::mutex m_consumersMutex; mutable std::mutex m_stateMutex; std::condition_variable m_hasWork; + + bool wasStarted = false; }; } diff --git a/src/Transfers/IBlockchainSynchronizer.h b/src/Transfers/IBlockchainSynchronizer.h index 235f5786e8..7023c64df0 100644 --- a/src/Transfers/IBlockchainSynchronizer.h +++ b/src/Transfers/IBlockchainSynchronizer.h @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2019, The Karbo developers // // This file is part of Karbo. // @@ -37,6 +38,7 @@ class IBlockchainSynchronizerObserver { public: virtual void synchronizationProgressUpdated(uint32_t processedBlockCount, uint32_t totalBlockCount) {} virtual void synchronizationCompleted(std::error_code result) {} + virtual ~IBlockchainSynchronizerObserver() {} }; class IBlockchainConsumerObserver; @@ -47,7 +49,7 @@ class IBlockchainConsumer : public IObservable { virtual SynchronizationStart getSyncStart() = 0; virtual const std::unordered_set& getKnownPoolTxIds() const = 0; virtual void onBlockchainDetach(uint32_t height) = 0; - virtual bool onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) = 0; + virtual uint32_t onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) = 0; virtual std::error_code onPoolUpdated(const std::vector>& addedTransactions, const std::vector& deletedTransactions) = 0; virtual std::error_code addUnconfirmedTransaction(const ITransactionReader& transaction) = 0; diff --git a/src/Transfers/TransfersConsumer.cpp b/src/Transfers/TransfersConsumer.cpp index b276e1d7f7..35e186f474 100644 --- a/src/Transfers/TransfersConsumer.cpp +++ b/src/Transfers/TransfersConsumer.cpp @@ -1,7 +1,7 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers -// Copyright (c) 2018, The BBSCoin Developers -// Copyright (c) 2018, The Karbo Developers -// +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers. +// Copyright (c) 2018 BBSCoin developers +// Copyright (c) 2018-2019, The Karbo Developers +// // This file is part of Karbo. // // Karbo is free software: you can redistribute it and/or modify @@ -23,8 +23,8 @@ #include #include "CommonTypes.h" -#include "Common/StringTools.h" #include "Common/BlockingQueue.h" +#include "CryptoNoteCore/CryptoNoteBasicImpl.h" #include "CryptoNoteCore/CryptoNoteFormatUtils.h" #include "CryptoNoteCore/TransactionApi.h" @@ -35,6 +35,7 @@ using namespace Crypto; using namespace Logging; using namespace Common; + std::unordered_set transactions_hash_seen; std::unordered_set public_keys_seen; std::mutex seen_mutex; @@ -43,6 +44,19 @@ namespace { using namespace CryptoNote; +class MarkTransactionConfirmedException : public std::exception { +public: + MarkTransactionConfirmedException(const Crypto::Hash& txHash) { + } + + const Hash& getTxHash() const { + return m_txHash; + } + +private: + Crypto::Hash m_txHash; +}; + void checkOutputKey( const KeyDerivation& derivation, const PublicKey& key, @@ -85,7 +99,6 @@ void findMyOutputs( uint64_t amount; KeyOutput out; tx.getOutput(idx, out, amount); - checkOutputKey(derivation, out.key, keyIndex, idx, spendKeys, outputs); ++keyIndex; @@ -94,10 +107,8 @@ void findMyOutputs( uint64_t amount; MultisignatureOutput out; tx.getOutput(idx, out, amount); - for (const auto& key : out.keys) { checkOutputKey(derivation, key, idx, idx, spendKeys, outputs); - ++keyIndex; } } @@ -134,9 +145,11 @@ ITransfersSubscription& TransfersConsumer::addSubscription(const AccountSubscrip if (res.get() == nullptr) { res.reset(new TransfersSubscription(m_currency, m_logger.getLogger(), subscription)); m_spendKeys.insert(subscription.keys.address.spendPublicKey); + if (m_subscriptions.size() == 1) { m_syncStart = res->getSyncStart(); - } else { + } + else { auto subStart = res->getSyncStart(); m_syncStart.height = std::min(m_syncStart.height, subStart.height); m_syncStart.timestamp = std::min(m_syncStart.timestamp, subStart.timestamp); @@ -180,7 +193,7 @@ void TransfersConsumer::initTransactionPool(const std::unordered_set::max(); + start.height = std::numeric_limits::max(); start.timestamp = std::numeric_limits::max(); for (const auto& kv : m_subscriptions) { @@ -204,13 +217,14 @@ void TransfersConsumer::onBlockchainDetach(uint32_t height) { } } -bool TransfersConsumer::onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) { +uint32_t TransfersConsumer::onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) { assert(blocks); assert(count > 0); struct Tx { TransactionBlockInfo blockInfo; const ITransactionReader* tx; + bool isLastTransactionInBlock; }; struct PreprocessedTx : Tx, PreprocessInfo {}; @@ -226,17 +240,20 @@ bool TransfersConsumer::onNewBlocks(const CompleteBlock* blocks, uint32_t startH BlockingQueue inputQueue(workers * 2); std::atomic stopProcessing(false); + std::atomic emptyBlockCount(0); auto pushingThread = std::async(std::launch::async, [&] { - for( uint32_t i = 0; i < count && !stopProcessing; ++i) { + for (uint32_t i = 0; i < count && !stopProcessing; ++i) { const auto& block = blocks[i].block; if (!block.is_initialized()) { + ++emptyBlockCount; continue; } // filter by syncStartTimestamp if (m_syncStart.timestamp && block->timestamp < m_syncStart.timestamp) { + ++emptyBlockCount; continue; } @@ -252,7 +269,8 @@ bool TransfersConsumer::onNewBlocks(const CompleteBlock* blocks, uint32_t startH continue; } - Tx item = { blockInfo, tx.get() }; + bool isLastTransactionInBlock = blockInfo.transactionIndex + 1 == blocks[i].transactions.size(); + Tx item = { blockInfo, tx.get(), isLastTransactionInBlock }; inputQueue.push(item); ++blockInfo.transactionIndex; } @@ -299,37 +317,68 @@ bool TransfersConsumer::onNewBlocks(const CompleteBlock* blocks, uint32_t startH } } + if (processingError) { + forEachSubscription([&](TransfersSubscription& sub) { + sub.onError(processingError, startHeight); + }); + + return 0; + } + std::vector blockHashes = getBlockHashes(blocks, count); - if (!processingError) { - m_observerManager.notify(&IBlockchainConsumerObserver::onBlocksAdded, this, blockHashes); + m_observerManager.notify(&IBlockchainConsumerObserver::onBlocksAdded, this, blockHashes); - // sort by block height and transaction index in block - std::sort(preprocessedTransactions.begin(), preprocessedTransactions.end(), [](const PreprocessedTx& a, const PreprocessedTx& b) { - return std::tie(a.blockInfo.height, a.blockInfo.transactionIndex) < std::tie(b.blockInfo.height, b.blockInfo.transactionIndex); - }); + // sort by block height and transaction index in block + std::sort(preprocessedTransactions.begin(), preprocessedTransactions.end(), [](const PreprocessedTx& a, const PreprocessedTx& b) { + return std::tie(a.blockInfo.height, a.blockInfo.transactionIndex) < std::tie(b.blockInfo.height, b.blockInfo.transactionIndex); + }); + uint32_t processedBlockCount = static_cast(emptyBlockCount); + try { for (const auto& tx : preprocessedTransactions) { processTransaction(tx.blockInfo, *tx.tx, tx); + + if (tx.isLastTransactionInBlock) { + ++processedBlockCount; + m_logger(TRACE) << "Processed block " << processedBlockCount << " of " << count << ", last processed block index " << tx.blockInfo.height << + ", hash " << blocks[processedBlockCount - 1].blockHash; + + auto newHeight = startHeight + processedBlockCount - 1; + forEachSubscription([newHeight](TransfersSubscription& sub) { + sub.advanceHeight(newHeight); + }); + } } - } else { - forEachSubscription([&](TransfersSubscription& sub) { - sub.onError(processingError, startHeight); + } catch (const MarkTransactionConfirmedException& e) { + m_logger(ERROR, BRIGHT_RED) << "Failed to process block transactions: failed to confirm transaction " << e.getTxHash() << + ", remove this transaction from all containers and transaction pool"; + forEachSubscription([&e](TransfersSubscription& sub) { + sub.deleteUnconfirmedTransaction(e.getTxHash()); }); - return false; + m_poolTxs.erase(e.getTxHash()); + } catch (std::exception& e) { + m_logger(ERROR, BRIGHT_RED) << "Failed to process block transactions, exception: " << e.what(); + } catch (...) { + m_logger(ERROR, BRIGHT_RED) << "Failed to process block transactions, unknown exception"; } - auto newHeight = startHeight + count - 1; - forEachSubscription([newHeight](TransfersSubscription& sub) { - sub.advanceHeight(newHeight); - }); + if (processedBlockCount < count) { + uint32_t detachIndex = startHeight + processedBlockCount; + m_logger(ERROR, BRIGHT_RED) << "Not all block transactions are processed, fully processed block count: " << processedBlockCount << " of " << count << + ", last processed block hash " << (processedBlockCount > 0 ? blocks[processedBlockCount - 1].blockHash : NULL_HASH) << + ", detach block index " << detachIndex << " to remove partially processed block"; + forEachSubscription([detachIndex](TransfersSubscription& sub) { + sub.onBlockchainDetach(detachIndex); + }); + } - return true; + return processedBlockCount; } std::error_code TransfersConsumer::onPoolUpdated(const std::vector>& addedTransactions, const std::vector& deletedTransactions) { TransactionBlockInfo unconfirmedBlockInfo; - unconfirmedBlockInfo.timestamp = 0; + unconfirmedBlockInfo.timestamp = 0; unconfirmedBlockInfo.height = WALLET_UNCONFIRMED_TRANSACTION_HEIGHT; std::error_code processingError; @@ -344,7 +393,7 @@ std::error_code TransfersConsumer::onPoolUpdated(const std::vector lk(seen_mutex); - transactions_hash_seen.insert(transactionHash); - public_keys_seen.insert(outputKey); + std::lock_guard lk(seen_mutex); + transactions_hash_seen.insert(transactionHash); + public_keys_seen.insert(outputKey); } -std::error_code TransfersConsumer::createTransfers( +std::error_code createTransfers( const AccountKeys& account, const TransactionBlockInfo& blockInfo, const ITransactionReader& tx, const std::vector& outputs, const std::vector& globalIdxs, - std::vector& transfers) { + std::vector& transfers, + Logging::LoggerRef& m_logger) { auto txPubKey = tx.getTransactionPublicKey(); auto txHash = tx.getTransactionHash(); std::vector temp_keys; std::lock_guard lk(seen_mutex); - if (account.spendSecretKey == NULL_SECRET_KEY) { - KeyPair deterministic_tx_keys; - bool spending = generateDeterministicTransactionKeys(tx.getTransactionInputsHash(), account.viewSecretKey, deterministic_tx_keys) - && deterministic_tx_keys.publicKey == txPubKey; - - if (spending) { - m_logger(WARNING, BRIGHT_YELLOW) << "Spending in tx " << Common::podToHex(tx.getTransactionHash()); - } - } - for (auto idx : outputs) { if (idx >= tx.getOutputCount()) { @@ -447,18 +487,18 @@ std::error_code TransfersConsumer::createTransfers( assert(out.key == reinterpret_cast(in_ephemeral.publicKey)); std::unordered_set::iterator it = transactions_hash_seen.find(txHash); - if (it == transactions_hash_seen.end()) { + if (it == transactions_hash_seen.end()) { std::unordered_set::iterator key_it = public_keys_seen.find(out.key); if (key_it != public_keys_seen.end()) { - m_logger(ERROR, BRIGHT_RED) << "Failed to process transaction " << Common::podToHex(txHash) << ": duplicate output key is found!"; + throw std::runtime_error("duplicate transaction output key is found"); return std::error_code(); } if (std::find(temp_keys.begin(), temp_keys.end(), out.key) != temp_keys.end()) { - m_logger(ERROR, BRIGHT_RED) << "Failed to process transaction " << Common::podToHex(txHash) << ": the same output key is present more than once"; + throw std::runtime_error("the same output key is present more than once"); return std::error_code(); } temp_keys.push_back(out.key); - } + } info.amount = amount; info.outputKey = out.key; @@ -467,16 +507,16 @@ std::error_code TransfersConsumer::createTransfers( MultisignatureOutput out; tx.getOutput(idx, out, amount); - for (const auto& key : out.keys) { + for (const auto& key : out.keys) { std::unordered_set::iterator it = transactions_hash_seen.find(txHash); if (it == transactions_hash_seen.end()) { std::unordered_set::iterator key_it = public_keys_seen.find(key); if (key_it != public_keys_seen.end()) { - m_logger(ERROR, BRIGHT_RED) << "Failed to process transaction " << Common::podToHex(txHash) << ": duplicate multisignature output key is found"; + throw std::runtime_error("duplicate multisignature output key is found"); return std::error_code(); } if (std::find(temp_keys.begin(), temp_keys.end(), key) != temp_keys.end()) { - m_logger(ERROR, BRIGHT_RED) << "Failed to process transaction " << Common::podToHex(txHash) << ": the same multisignature output key is present more than once"; + throw std::runtime_error("the same multisignature output key is present more than once"); return std::error_code(); } temp_keys.push_back(key); @@ -485,19 +525,27 @@ std::error_code TransfersConsumer::createTransfers( info.amount = amount; info.requiredSignatures = out.requiredSignatureCount; } - transfers.push_back(info); } - transactions_hash_seen.emplace(txHash); - std::copy(temp_keys.begin(), temp_keys.end(), std::inserter(public_keys_seen, public_keys_seen.end())); + transactions_hash_seen.insert(tx.getTransactionHash()); + for (std::vector::iterator it = temp_keys.begin(); it != temp_keys.end(); it++) { + public_keys_seen.insert(*it); + } return std::error_code(); } std::error_code TransfersConsumer::preprocessOutputs(const TransactionBlockInfo& blockInfo, const ITransactionReader& tx, PreprocessInfo& info) { std::unordered_map> outputs; - findMyOutputs(tx, m_viewSecret, m_spendKeys, outputs); + try { + findMyOutputs(tx, m_viewSecret, m_spendKeys, outputs); + } + catch (const std::exception& e) { + m_logger(ERROR, BRIGHT_RED) << "Failed to process transaction: " << e.what() << ", transaction hash " << Common::podToHex(tx.getTransactionHash()); + return std::error_code(); + } + if (outputs.empty()) { return std::error_code(); } @@ -515,9 +563,15 @@ std::error_code TransfersConsumer::preprocessOutputs(const TransactionBlockInfo& auto it = m_subscriptions.find(kv.first); if (it != m_subscriptions.end()) { auto& transfers = info.outputs[kv.first]; - errorCode = createTransfers(it->second->getKeys(), blockInfo, tx, kv.second, info.globalIdxs, transfers); - if (errorCode) { - return errorCode; + try { + errorCode = createTransfers(it->second->getKeys(), blockInfo, tx, kv.second, info.globalIdxs, transfers, m_logger); + if (errorCode) { + return errorCode; + } + } + catch (const std::exception& e) { + m_logger(ERROR, BRIGHT_RED) << "Failed to process transaction: " << e.what() << ", transaction hash " << Common::podToHex(tx.getTransactionHash()); + return std::error_code(); } } } @@ -539,6 +593,8 @@ std::error_code TransfersConsumer::processTransaction(const TransactionBlockInfo void TransfersConsumer::processTransaction(const TransactionBlockInfo& blockInfo, const ITransactionReader& tx, const PreprocessInfo& info) { std::vector emptyOutputs; std::vector transactionContainers; + + m_logger(TRACE) << "Process transaction, block " << blockInfo.height << ", transaction index " << blockInfo.transactionIndex << ", hash " << tx.getTransactionHash(); bool someContainerUpdated = false; for (auto& kv : m_subscriptions) { auto it = info.outputs.find(kv.first); @@ -554,7 +610,10 @@ void TransfersConsumer::processTransaction(const TransactionBlockInfo& blockInfo } if (someContainerUpdated) { + m_logger(TRACE) << "Transaction updated some containers, hash " << tx.getTransactionHash(); m_observerManager.notify(&IBlockchainConsumerObserver::onTransactionUpdated, this, tx.getTransactionHash(), transactionContainers); + } else { + m_logger(TRACE) << "Transaction doesn't updated any container, hash " << tx.getTransactionHash(); } } @@ -567,9 +626,14 @@ void TransfersConsumer::processOutputs(const TransactionBlockInfo& blockInfo, Tr if (contains) { if (subscriptionTxInfo.blockHeight == WALLET_UNCONFIRMED_TRANSACTION_HEIGHT && blockInfo.height != WALLET_UNCONFIRMED_TRANSACTION_HEIGHT) { - // pool->blockchain - sub.markTransactionConfirmed(blockInfo, tx.getTransactionHash(), globalIdxs); - updated = true; + try { + // pool->blockchain + sub.markTransactionConfirmed(blockInfo, tx.getTransactionHash(), globalIdxs); + updated = true; + } catch (...) { + m_logger(ERROR, BRIGHT_RED) << "markTransactionConfirmed failed, throw MarkTransactionConfirmedException"; + throw MarkTransactionConfirmedException(tx.getTransactionHash()); + } } else { assert(subscriptionTxInfo.blockHeight == blockInfo.height); } @@ -579,11 +643,11 @@ void TransfersConsumer::processOutputs(const TransactionBlockInfo& blockInfo, Tr } } -std::error_code TransfersConsumer::getGlobalIndices(const Hash& transactionHash, std::vector& outsGlobalIndices) { +std::error_code TransfersConsumer::getGlobalIndices(const Hash& transactionHash, std::vector& outsGlobalIndices) { std::promise prom; std::future f = prom.get_future(); - INode::Callback cb = [&prom](std::error_code ec) { + INode::Callback cb = [&prom](std::error_code ec) { std::promise p(std::move(prom)); p.set_value(ec); }; diff --git a/src/Transfers/TransfersConsumer.h b/src/Transfers/TransfersConsumer.h index 20f8b021fa..21733f6664 100755 --- a/src/Transfers/TransfersConsumer.h +++ b/src/Transfers/TransfersConsumer.h @@ -35,7 +35,7 @@ namespace CryptoNote { class INode; -class TransfersConsumer: public IObservableImpl { +class TransfersConsumer : public IObservableImpl { public: TransfersConsumer(const CryptoNote::Currency& currency, INode& node, Logging::ILogger& logger, const Crypto::SecretKey& viewSecret); @@ -48,11 +48,11 @@ class TransfersConsumer: public IObservableImpl& uncommitedTransactions); void addPublicKeysSeen(const Crypto::Hash& transactionHash, const Crypto::PublicKey& outputKey); - + // IBlockchainConsumer virtual SynchronizationStart getSyncStart() override; virtual void onBlockchainDetach(uint32_t height) override; - virtual bool onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) override; + virtual uint32_t onNewBlocks(const CompleteBlock* blocks, uint32_t startHeight, uint32_t count) override; virtual std::error_code onPoolUpdated(const std::vector>& addedTransactions, const std::vector& deletedTransactions) override; virtual const std::unordered_set& getKnownPoolTxIds() const override; @@ -78,8 +78,7 @@ class TransfersConsumer: public IObservableImpl& outputs, const std::vector& globalIdxs, bool& contains, bool& updated); - std::error_code createTransfers(const AccountKeys& account, const TransactionBlockInfo& blockInfo, const ITransactionReader& tx, - const std::vector& outputs, const std::vector& globalIdxs, std::vector& transfers); + std::error_code getGlobalIndices(const Crypto::Hash& transactionHash, std::vector& outsGlobalIndices); void updateSyncStart(); diff --git a/src/Transfers/TransfersContainer.cpp b/src/Transfers/TransfersContainer.cpp index e82403242b..3366a58372 100644 --- a/src/Transfers/TransfersContainer.cpp +++ b/src/Transfers/TransfersContainer.cpp @@ -1,4 +1,5 @@ -// Copyright (c) 2012-2016, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2012-2017, The CryptoNote developers, The Bytecoin developers +// Copyright (c) 2016-2020, The Karbo developers // // This file is part of Karbo. // @@ -181,8 +182,6 @@ bool TransfersContainer::addTransaction(const TransactionBlockInfo& block, const try { std::unique_lock lock(m_mutex); - m_logger(TRACE) << "Adding transaction, block " << block.height << ", transaction index " << block.transactionIndex << ", hash " << tx.getTransactionHash(); - if (block.height < m_currentHeight) { auto message = "Failed to add transaction: block index < m_currentHeight"; m_logger(ERROR, BRIGHT_RED) << message << ", block " << block.height << ", m_currentHeight " << m_currentHeight; @@ -200,8 +199,6 @@ bool TransfersContainer::addTransaction(const TransactionBlockInfo& block, const if (added) { addTransaction(block, tx); - } else { - m_logger(TRACE) << "Transaction not added"; } if (block.height != WALLET_LEGACY_UNCONFIRMED_TRANSACTION_HEIGHT) { @@ -457,56 +454,99 @@ bool TransfersContainer::markTransactionConfirmed(const TransactionBlockInfo& bl return false; } - auto txInfo = *transactionIt; - txInfo.blockHeight = block.height; - txInfo.timestamp = block.timestamp; - m_transactions.replace(transactionIt, txInfo); - - auto availableRange = m_unconfirmedTransfers.get().equal_range(transactionHash); - for (auto transferIt = availableRange.first; transferIt != availableRange.second; ) { - auto transfer = *transferIt; - assert(transfer.blockHeight == WALLET_LEGACY_UNCONFIRMED_TRANSACTION_HEIGHT); - assert(transfer.globalOutputIndex == UNCONFIRMED_TRANSACTION_GLOBAL_OUTPUT_INDEX); - if (transfer.outputInTransaction >= globalIndices.size()) { - auto message = "Failed to confirm transaction: not enough elements in globalIndices"; - m_logger(ERROR, BRIGHT_RED) << message << ", globalIndices.size() " << globalIndices.size() << ", output index " << transfer.outputInTransaction; - throw std::invalid_argument(message); - } + try { + auto txInfo = *transactionIt; + txInfo.blockHeight = block.height; + txInfo.timestamp = block.timestamp; + m_transactions.replace(transactionIt, txInfo); + + auto availableRange = m_unconfirmedTransfers.get().equal_range(transactionHash); + for (auto transferIt = availableRange.first; transferIt != availableRange.second; ) { + auto transfer = *transferIt; + assert(transfer.blockHeight == WALLET_LEGACY_UNCONFIRMED_TRANSACTION_HEIGHT); + assert(transfer.globalOutputIndex == UNCONFIRMED_TRANSACTION_GLOBAL_OUTPUT_INDEX); + if (transfer.outputInTransaction >= globalIndices.size()) { + auto message = "Failed to confirm transaction: not enough elements in globalIndices"; + m_logger(ERROR, BRIGHT_RED) << message << ", globalIndices.size() " << globalIndices.size() << ", output index " << transfer.outputInTransaction; + throw std::invalid_argument(message); + } - transfer.blockHeight = block.height; - transfer.transactionIndex = block.transactionIndex; - transfer.globalOutputIndex = globalIndices[transfer.outputInTransaction]; - - if (transfer.type == TransactionTypes::OutputType::Multisignature) { - SpentOutputDescriptor descriptor(transfer); - if (m_availableTransfers.get().count(descriptor) > 0 || - m_spentTransfers.get().count(descriptor) > 0) { - // This exception breaks TransfersContainer consistency - auto message = "Failed to confirm transaction: multisignature output already exists"; - m_logger(ERROR, BRIGHT_RED) << message << ", amount " << m_currency.formatAmount(transfer.amount) << ", global index " << transfer.globalOutputIndex; - throw std::runtime_error(message); + transfer.blockHeight = block.height; + transfer.transactionIndex = block.transactionIndex; + transfer.globalOutputIndex = globalIndices[transfer.outputInTransaction]; + + if (transfer.type == TransactionTypes::OutputType::Multisignature) { + SpentOutputDescriptor descriptor(transfer); + if (m_availableTransfers.get().count(descriptor) > 0 || + m_spentTransfers.get().count(descriptor) > 0) { + // This exception breaks TransfersContainer consistency + auto message = "Failed to confirm transaction: multisignature output already exists"; + m_logger(ERROR, BRIGHT_RED) << message << ", amount " << m_currency.formatAmount(transfer.amount) << ", global index " << transfer.globalOutputIndex; + throw std::runtime_error(message); + } + } + + auto result = m_availableTransfers.emplace(std::move(transfer)); + (void)result; // Disable unused warning + assert(result.second); + + transferIt = m_unconfirmedTransfers.get().erase(transferIt); + + if (transfer.type == TransactionTypes::OutputType::Key) { + updateTransfersVisibility(transfer.keyImage); } } - auto result = m_availableTransfers.emplace(std::move(transfer)); - (void)result; // Disable unused warning - assert(result.second); + auto& spendingTransactionIndex = m_spentTransfers.get(); + auto spentRange = spendingTransactionIndex.equal_range(transactionHash); + for (auto transferIt = spentRange.first; transferIt != spentRange.second; ++transferIt) { + auto transfer = *transferIt; + assert(transfer.spendingBlock.height == WALLET_LEGACY_UNCONFIRMED_TRANSACTION_HEIGHT); + + transfer.spendingBlock = block; + spendingTransactionIndex.replace(transferIt, transfer); + } + } catch (std::exception& e) { + m_logger(ERROR, BRIGHT_RED) << "markTransactionConfirmed failed: " << e.what() << ", rollback changes, block index " << block.height << + ", tx " << transactionHash; + + auto txInfo = *transactionIt; + txInfo.blockHeight = WALLET_LEGACY_UNCONFIRMED_TRANSACTION_HEIGHT; + txInfo.timestamp = 0; + m_transactions.replace(transactionIt, txInfo); + + auto availableRange = m_availableTransfers.get().equal_range(transactionHash); + for (auto transferIt = availableRange.first; transferIt != availableRange.second; ) { + TransactionOutputInformationEx unconfirmedTransfer = *transferIt; + assert(unconfirmedTransfer.blockHeight != WALLET_LEGACY_UNCONFIRMED_TRANSACTION_HEIGHT); + assert(unconfirmedTransfer.globalOutputIndex != UNCONFIRMED_TRANSACTION_GLOBAL_OUTPUT_INDEX); + unconfirmedTransfer.blockHeight = WALLET_LEGACY_UNCONFIRMED_TRANSACTION_HEIGHT; + unconfirmedTransfer.transactionIndex = 0; + unconfirmedTransfer.globalOutputIndex = UNCONFIRMED_TRANSACTION_GLOBAL_OUTPUT_INDEX; + + auto result = m_unconfirmedTransfers.emplace(std::move(unconfirmedTransfer)); + (void)result; // Disable unused warning + assert(result.second); - transferIt = m_unconfirmedTransfers.get().erase(transferIt); + transferIt = m_availableTransfers.get().erase(transferIt); - if (transfer.type == TransactionTypes::OutputType::Key) { - updateTransfersVisibility(transfer.keyImage); + if (unconfirmedTransfer.type == TransactionTypes::OutputType::Key) { + updateTransfersVisibility(unconfirmedTransfer.keyImage); + } } - } - auto& spendingTransactionIndex = m_spentTransfers.get(); - auto spentRange = spendingTransactionIndex.equal_range(transactionHash); - for (auto transferIt = spentRange.first; transferIt != spentRange.second; ++transferIt) { - auto transfer = *transferIt; - assert(transfer.spendingBlock.height == WALLET_LEGACY_UNCONFIRMED_TRANSACTION_HEIGHT); + auto& spendingTransactionIndex = m_spentTransfers.get(); + auto spentRange = spendingTransactionIndex.equal_range(transactionHash); + for (auto transferIt = spentRange.first; transferIt != spentRange.second; ++transferIt) { + auto spentTransfer = *transferIt; + spentTransfer.spendingBlock.height = WALLET_LEGACY_UNCONFIRMED_TRANSACTION_HEIGHT; + spentTransfer.spendingBlock.timestamp = 0; + spentTransfer.spendingBlock.transactionIndex = 0; + + spendingTransactionIndex.replace(transferIt, spentTransfer); + } - transfer.spendingBlock = block; - spendingTransactionIndex.replace(transferIt, transfer); + throw; } return true; @@ -896,7 +936,8 @@ void TransfersContainer::load(std::istream& in) { m_spentTransfers = std::move(spentTransfers); // Repair the container if it was broken while handling addTransaction() in previous version of the code - repair(); + // Hope it isn't necessary anymore + //repair(); } void TransfersContainer::repair() {