diff --git a/example/processing_dapp/CMakeLists.txt b/example/processing_dapp/CMakeLists.txt index c01fe783..ceb4442d 100644 --- a/example/processing_dapp/CMakeLists.txt +++ b/example/processing_dapp/CMakeLists.txt @@ -1,6 +1,5 @@ add_executable(processing_dapp processing_dapp.cpp - processing_task_queue_impl.hpp ) @@ -15,7 +14,6 @@ add_executable(processing_dapp_processor processing_dapp_processor.cpp processing_subtask_result_storage.cpp processing_subtask_result_storage_impl.hpp - processing_task_queue_impl.hpp ) diff --git a/example/processing_dapp/processing_dapp.cpp b/example/processing_dapp/processing_dapp.cpp index 3d78141c..a55500d0 100644 --- a/example/processing_dapp/processing_dapp.cpp +++ b/example/processing_dapp/processing_dapp.cpp @@ -1,4 +1,4 @@ -#include "processing_task_queue_impl.hpp" +#include "processing/impl/processing_task_queue_impl.hpp" #include #include diff --git a/example/processing_dapp/processing_dapp_processor.cpp b/example/processing_dapp/processing_dapp_processor.cpp index f3dfbd7f..6c7f205b 100644 --- a/example/processing_dapp/processing_dapp_processor.cpp +++ b/example/processing_dapp/processing_dapp_processor.cpp @@ -1,19 +1,17 @@ -#include "processing_task_queue_impl.hpp" -#include "processing_subtask_result_storage_impl.hpp" - -#include -#include - -#include -#include - +#include +#include #include #include #include +#include "processing/impl/processing_task_queue_impl.hpp" +#include "processing_subtask_result_storage_impl.hpp" +#include "processing/processing_service.hpp" +#include "processing/processing_subtask_enqueuer_impl.hpp" +#include "crdt/globaldb/keypair_file_storage.hpp" +#include "crdt/globaldb/globaldb.hpp" + -#include -#include using namespace sgns::processing; diff --git a/example/processing_dapp/processing_subtask_result_storage.cpp b/example/processing_dapp/processing_subtask_result_storage.cpp index 0bea44ec..3dd38d58 100644 --- a/example/processing_dapp/processing_subtask_result_storage.cpp +++ b/example/processing_dapp/processing_subtask_result_storage.cpp @@ -25,10 +25,10 @@ void SubTaskResultStorageImpl::RemoveSubTaskResult(const std::string& subTaskId) sgns::crdt::HierarchicalKey((boost::format("results/%s") % subTaskId).str().c_str())); } -void SubTaskResultStorageImpl::GetSubTaskResults( - const std::set& subTaskIds, - std::vector& results) +std::vector SubTaskResultStorageImpl::GetSubTaskResults( + const std::set& subTaskIds) { + std::vector results; for (const auto& subTaskId : subTaskIds) { auto data = m_db->Get(sgns::crdt::HierarchicalKey((boost::format("results/%s") % subTaskId).str().c_str())); @@ -41,6 +41,7 @@ void SubTaskResultStorageImpl::GetSubTaskResults( } } } + return results; } } \ No newline at end of file diff --git a/example/processing_dapp/processing_subtask_result_storage_impl.hpp b/example/processing_dapp/processing_subtask_result_storage_impl.hpp index a40e1586..bfb7a554 100644 --- a/example/processing_dapp/processing_subtask_result_storage_impl.hpp +++ b/example/processing_dapp/processing_subtask_result_storage_impl.hpp @@ -22,9 +22,8 @@ namespace sgns::processing */ void AddSubTaskResult(const SGProcessing::SubTaskResult& result) override; void RemoveSubTaskResult(const std::string& subTaskId) override; - void GetSubTaskResults( - const std::set& subTaskIds, - std::vector& results) override; + std::vector GetSubTaskResults( + const std::set& subTaskIds) override; private: std::shared_ptr m_db; diff --git a/example/processing_dapp/processing_task_queue_impl.hpp b/example/processing_dapp/processing_task_queue_impl.hpp deleted file mode 100644 index 478e7b8d..00000000 --- a/example/processing_dapp/processing_task_queue_impl.hpp +++ /dev/null @@ -1,256 +0,0 @@ -#ifndef GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP -#define GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP - -#include -#include - -#include - -#include - -namespace sgns::processing -{ - class ProcessingTaskQueueImpl : public ProcessingTaskQueue - { - public: - ProcessingTaskQueueImpl( - std::shared_ptr db) - : m_db(db) - , m_processingTimeout(std::chrono::seconds(10)) - { - } - - void EnqueueTask( - const SGProcessing::Task& task, - const std::list& subTasks) override - { - auto taskKey = (boost::format("tasks/TASK_%d") % task.ipfs_block_id()).str(); - sgns::base::Buffer valueBuffer; - valueBuffer.put(task.SerializeAsString()); - auto setKeyResult = m_db->Put(sgns::crdt::HierarchicalKey(taskKey), valueBuffer); - if (setKeyResult.has_failure()) - { - m_logger->debug("Unable to put key-value to CRDT datastore."); - } - - // Check if data put - auto getKeyResult = m_db->Get(sgns::crdt::HierarchicalKey(taskKey)); - if (getKeyResult.has_failure()) - { - m_logger->debug("Unable to find key in CRDT datastore"); - } - else - { - m_logger->debug("[{}] placed to GlobalDB ", taskKey); - // getKeyResult.value().toString() - } - for (auto& subTask: subTasks) - { - auto subTaskKey = (boost::format("subtasks/TASK_%s/%s") % task.ipfs_block_id() % subTask.subtaskid()).str(); - valueBuffer.put(subTask.SerializeAsString()); - auto setKeyResult = m_db->Put(sgns::crdt::HierarchicalKey(subTaskKey), valueBuffer); - if (setKeyResult.has_failure()) - { - m_logger->debug("Unable to put key-value to CRDT datastore."); - } - - // Check if data put - auto getKeyResult = m_db->Get(sgns::crdt::HierarchicalKey(taskKey)); - if (getKeyResult.has_failure()) - { - m_logger->debug("Unable to find key in CRDT datastore"); - } - else - { - m_logger->debug("[{}] placed to GlobalDB ", taskKey); - // getKeyResult.value().toString() - } - } - } - - bool GetSubTasks( - const std::string& taskId, - std::list& subTasks) override - { - m_logger->debug("SUBTASKS_REQUESTED. TaskId: {}", taskId); - auto key = (boost::format("subtasks/TASK_%s") % taskId).str(); - auto querySubTasks = m_db->QueryKeyValues(key); - - if (querySubTasks.has_failure()) - { - m_logger->info("Unable list subtasks from CRDT datastore"); - return false; - } - - if (querySubTasks.has_value()) - { - m_logger->debug("SUBTASKS_FOUND {}", querySubTasks.value().size()); - - for (auto element : querySubTasks.value()) - { - SGProcessing::SubTask subTask; - if (subTask.ParseFromArray(element.second.data(), element.second.size())) - { - subTasks.push_back(std::move(subTask)); - } - else - { - m_logger->debug("Undable to parse a subtask"); - } - } - - return true; - } - else - { - m_logger->debug("NO_SUBTASKS_FOUND. TaskId {}", taskId); - return false; - } - } - - bool GrabTask(std::string& grabbedTaskKey, SGProcessing::Task& task) override - { - m_logger->info("GRAB_TASK"); - - auto queryTasks = m_db->QueryKeyValues("tasks"); - if (queryTasks.has_failure()) - { - m_logger->info("Unable list tasks from CRDT datastore"); - return false; - } - - std::set lockedTasks; - if (queryTasks.has_value()) - { - m_logger->info("TASK_QUEUE_SIZE: {}", queryTasks.value().size()); - bool isTaskGrabbed = false; - for (auto element : queryTasks.value()) - { - auto taskKey = m_db->KeyToString(element.first); - if (taskKey.has_value()) - { - bool isTaskLocked = IsTaskLocked(taskKey.value()); - m_logger->debug("TASK_QUEUE_ITEM: {}, LOCKED: {}", taskKey.value(), isTaskLocked); - - if (!isTaskLocked) - { - if (task.ParseFromArray(element.second.data(), element.second.size())) - { - if (LockTask(taskKey.value())) - { - m_logger->debug("TASK_LOCKED {}", taskKey.value()); - grabbedTaskKey = task.ipfs_block_id(); - return true; - } - } - } - else - { - m_logger->debug("TASK_PREVIOUSLY_LOCKED {}", taskKey.value()); - lockedTasks.insert(taskKey.value()); - } - } - else - { - m_logger->debug("Undable to convert a key to string"); - } - } - - // No task was grabbed so far - for (auto lockedTask : lockedTasks) - { - if (MoveExpiredTaskLock(lockedTask, task)) - { - grabbedTaskKey = task.ipfs_block_id(); - return true; - } - } - - } - return false; - } - - bool CompleteTask(const std::string& taskKey, const SGProcessing::TaskResult& taskResult) override - { - sgns::base::Buffer data; - data.put(taskResult.SerializeAsString()); - - auto transaction = m_db->BeginTransaction(); - transaction->AddToDelta(sgns::crdt::HierarchicalKey("task_results/" + taskKey), data); - transaction->RemoveFromDelta(sgns::crdt::HierarchicalKey("lock_" + taskKey)); - transaction->RemoveFromDelta(sgns::crdt::HierarchicalKey(taskKey)); - - auto res = transaction->PublishDelta(); - m_logger->debug("TASK_COMPLETED: {}", taskKey); - return !res.has_failure(); - } - - bool IsTaskLocked(const std::string& taskKey) - { - auto lockData = m_db->Get(sgns::crdt::HierarchicalKey("lock_" + taskKey)); - return (!lockData.has_failure() && lockData.has_value()); - } - - bool LockTask(const std::string& taskKey) - { - auto timestamp = std::chrono::system_clock::now(); - - SGProcessing::TaskLock lock; - lock.set_task_id(taskKey); - lock.set_lock_timestamp(timestamp.time_since_epoch().count()); - - sgns::base::Buffer lockData; - lockData.put(lock.SerializeAsString()); - - auto res = m_db->Put(sgns::crdt::HierarchicalKey("lock_" + taskKey), lockData); - return !res.has_failure(); - } - - bool MoveExpiredTaskLock(const std::string& taskKey, SGProcessing::Task& task) - { - auto timestamp = std::chrono::system_clock::now(); - - auto lockData = m_db->Get(sgns::crdt::HierarchicalKey("lock_" + taskKey)); - if (!lockData.has_failure() && lockData.has_value()) - { - // Check task expiration - SGProcessing::TaskLock lock; - if (lock.ParseFromArray(lockData.value().data(), lockData.value().size())) - { - auto expirationTime = - std::chrono::system_clock::time_point( - std::chrono::system_clock::duration(lock.lock_timestamp())) + m_processingTimeout; - if (timestamp > expirationTime) - { - auto taskData = m_db->Get(taskKey); - - if (!taskData.has_failure()) - { - if (task.ParseFromArray(taskData.value().data(), taskData.value().size())) - { - if (LockTask(taskKey)) - { - m_logger->debug("TASK_LOCK_MOVED {}", taskKey); - return true; - } - } - } - else - { - m_logger->debug("Unable to find a task {}", taskKey); - } - } - } - } - return false; - } - - private: - std::shared_ptr m_db; - std::chrono::system_clock::duration m_processingTimeout; - sgns::base::Logger m_logger = sgns::base::createLogger("ProcessingTaskQueueImpl"); - }; - -} - -#endif // GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_HPP diff --git a/example/processing_room/processing_app.cpp b/example/processing_room/processing_app.cpp index 149ca4ca..4674e9a4 100644 --- a/example/processing_room/processing_app.cpp +++ b/example/processing_room/processing_app.cpp @@ -28,9 +28,8 @@ namespace public: void AddSubTaskResult(const SGProcessing::SubTaskResult& subTaskResult) override {} void RemoveSubTaskResult(const std::string& subTaskId) override {} - void GetSubTaskResults( - const std::set& subTaskIds, - std::vector& results) override {} + std::vector GetSubTaskResults( + const std::set& subTaskIds) override { return {};} }; class TaskSplitter @@ -121,19 +120,19 @@ namespace return false; } - bool GrabTask(std::string& taskKey, SGProcessing::Task& task) override + outcome::result> GrabTask() override { if (m_tasks.empty()) { - return false; + return outcome::failure(boost::system::error_code{}); } - + SGProcessing::Task task; task = std::move(m_tasks.back()); m_tasks.pop_back(); - taskKey = (boost::format("TASK_%d") % m_tasks.size()).str(); + std::string taskKey = (boost::format("TASK_%d") % m_tasks.size()).str(); - return true; + return std::make_pair(taskKey, task); }; bool CompleteTask(const std::string& taskKey, const SGProcessing::TaskResult& task) override diff --git a/src/account/TransactionManager.cpp b/src/account/TransactionManager.cpp index bad9d8b1..021d8639 100644 --- a/src/account/TransactionManager.cpp +++ b/src/account/TransactionManager.cpp @@ -354,14 +354,17 @@ namespace sgns { auto dest_infos = tx.GetUTXOParameters(); - if ( dest_infos.outputs_.size() == 2 ) + if ( dest_infos.outputs_.size() ) { - //has to be 1 for me and 1 for escrow + //The first is the escrow, second is the change (might not happen) auto hash = ( base::Hash256::fromReadableString( tx.dag_st.data_hash() ) ).value(); - if ( dest_infos.outputs_[1].dest_address == account_m->GetAddress() ) + if ( dest_infos.outputs_.size() > 1 ) { - GeniusUTXO new_utxo( hash, 1, uint64_t{ dest_infos.outputs_[1].encrypted_amount } ); - account_m->PutUTXO( new_utxo ); + if ( dest_infos.outputs_[1].dest_address == account_m->GetAddress() ) + { + GeniusUTXO new_utxo( hash, 1, uint64_t{ dest_infos.outputs_[1].encrypted_amount } ); + account_m->PutUTXO( new_utxo ); + } } auto dest_infos = tx.GetUTXOParameters(); InputUTXOInfo escrow_utxo; diff --git a/src/account/UTXOTxParameters.hpp b/src/account/UTXOTxParameters.hpp index 71c4f540..4b185032 100644 --- a/src/account/UTXOTxParameters.hpp +++ b/src/account/UTXOTxParameters.hpp @@ -9,7 +9,8 @@ #include "account/GeniusUTXO.hpp" #include -#include +#include "outcome/outcome.hpp" + namespace sgns { @@ -43,8 +44,6 @@ namespace sgns { UTXOTxParameters instance( utxo_pool, src_address, amount, dest_address, signature ); - std::cout << "Destination addr" << std::hex << dest_address << std::endl; - if ( instance.inputs_.size() ) { return instance; diff --git a/src/processing/impl/processing_subtask_result_storage_impl.cpp b/src/processing/impl/processing_subtask_result_storage_impl.cpp index bbe78ad0..c828ee7c 100644 --- a/src/processing/impl/processing_subtask_result_storage_impl.cpp +++ b/src/processing/impl/processing_subtask_result_storage_impl.cpp @@ -3,43 +3,41 @@ namespace sgns::processing { - SubTaskResultStorageImpl::SubTaskResultStorageImpl(std::shared_ptr db) - : m_db(db) + SubTaskResultStorageImpl::SubTaskResultStorageImpl( std::shared_ptr db ) : m_db( db ) { } - void SubTaskResultStorageImpl::AddSubTaskResult(const SGProcessing::SubTaskResult& result) + void SubTaskResultStorageImpl::AddSubTaskResult( const SGProcessing::SubTaskResult &result ) { sgns::crdt::GlobalDB::Buffer data; - data.put(result.SerializeAsString()); + data.put( result.SerializeAsString() ); - auto taskId = - m_db->Put( - sgns::crdt::HierarchicalKey((boost::format("results/%s") % result.subtaskid()).str().c_str()), - data); + auto taskId = m_db->Put( + sgns::crdt::HierarchicalKey( ( boost::format( "results/%s" ) % result.subtaskid() ).str().c_str() ), data ); } - void SubTaskResultStorageImpl::RemoveSubTaskResult(const std::string& subTaskId) + void SubTaskResultStorageImpl::RemoveSubTaskResult( const std::string &subTaskId ) { - m_db->Remove( - sgns::crdt::HierarchicalKey((boost::format("results/%s") % subTaskId).str().c_str())); + m_db->Remove( sgns::crdt::HierarchicalKey( ( boost::format( "results/%s" ) % subTaskId ).str().c_str() ) ); } - void SubTaskResultStorageImpl::GetSubTaskResults( - const std::set& subTaskIds, - std::vector& results) + std::vector SubTaskResultStorageImpl::GetSubTaskResults( + const std::set &subTaskIds ) { - for (const auto& subTaskId : subTaskIds) + std::vector results; + for ( const auto &subTaskId : subTaskIds ) { - auto data = m_db->Get(sgns::crdt::HierarchicalKey((boost::format("results/%s") % subTaskId).str().c_str())); - if (data) + auto data = + m_db->Get( sgns::crdt::HierarchicalKey( ( boost::format( "results/%s" ) % subTaskId ).str().c_str() ) ); + if ( data ) { SGProcessing::SubTaskResult result; - if (result.ParseFromArray(data.value().data(), data.value().size())) + if ( result.ParseFromArray( data.value().data(), data.value().size() ) ) { - results.push_back(std::move(result)); + results.push_back( std::move( result ) ); } } } + return results; } -} \ No newline at end of file +} diff --git a/src/processing/impl/processing_subtask_result_storage_impl.hpp b/src/processing/impl/processing_subtask_result_storage_impl.hpp index 65253980..5609f5b2 100644 --- a/src/processing/impl/processing_subtask_result_storage_impl.hpp +++ b/src/processing/impl/processing_subtask_result_storage_impl.hpp @@ -11,7 +11,7 @@ namespace sgns::processing { -/** Handles subtask states storage + /** Handles subtask states storage */ class SubTaskResultStorageImpl : public SubTaskResultStorage { @@ -19,25 +19,23 @@ namespace sgns::processing /** Create a subtask storage * @param db - CRDT globaldb to use */ - SubTaskResultStorageImpl(std::shared_ptr db); + SubTaskResultStorageImpl( std::shared_ptr db ); /** Add a subtask result * @param result - Result to add */ - void AddSubTaskResult(const SGProcessing::SubTaskResult& result) override; + void AddSubTaskResult( const SGProcessing::SubTaskResult &result ) override; /** Remove a subtask result * @param subTaskId - Result ID to remove */ - void RemoveSubTaskResult(const std::string& subTaskId) override; + void RemoveSubTaskResult( const std::string &subTaskId ) override; /** Get results for subtasks * @param subTaskIds - List of subtasks IDs to get results for * @param results - List of results reference. */ - void GetSubTaskResults( - const std::set& subTaskIds, - std::vector& results) override; + std::vector GetSubTaskResults( const std::set &subTaskIds ) override; private: std::shared_ptr m_db; diff --git a/src/processing/impl/processing_task_queue_impl.cpp b/src/processing/impl/processing_task_queue_impl.cpp index eb9c5257..c8ce3096 100644 --- a/src/processing/impl/processing_task_queue_impl.cpp +++ b/src/processing/impl/processing_task_queue_impl.cpp @@ -2,80 +2,78 @@ namespace sgns::processing { - void ProcessingTaskQueueImpl::EnqueueTask( - const SGProcessing::Task& task, - const std::list& subTasks) + void ProcessingTaskQueueImpl::EnqueueTask( const SGProcessing::Task &task, + const std::list &subTasks ) { - auto taskKey = (boost::format("tasks/TASK_%d") % task.ipfs_block_id()).str(); + auto taskKey = ( boost::format( "tasks/TASK_%d" ) % task.ipfs_block_id() ).str(); sgns::base::Buffer valueBuffer; - valueBuffer.put(task.SerializeAsString()); - auto setKeyResult = m_db->Put(sgns::crdt::HierarchicalKey(taskKey), valueBuffer); - if (setKeyResult.has_failure()) + valueBuffer.put( task.SerializeAsString() ); + auto setKeyResult = m_db->Put( sgns::crdt::HierarchicalKey( taskKey ), valueBuffer ); + if ( setKeyResult.has_failure() ) { - m_logger->debug("Unable to put key-value to CRDT datastore."); + m_logger->debug( "Unable to put key-value to CRDT datastore." ); } // Check if data put - auto getKeyResult = m_db->Get(sgns::crdt::HierarchicalKey(taskKey)); - if (getKeyResult.has_failure()) + auto getKeyResult = m_db->Get( sgns::crdt::HierarchicalKey( taskKey ) ); + if ( getKeyResult.has_failure() ) { - m_logger->debug("Unable to find key in CRDT datastore"); + m_logger->debug( "Unable to find key in CRDT datastore" ); } else { - m_logger->debug("[{}] placed to GlobalDB ", taskKey); + m_logger->debug( "[{}] placed to GlobalDB ", taskKey ); } - for (auto& subTask : subTasks) + for ( auto &subTask : subTasks ) { - auto subTaskKey = (boost::format("subtasks/TASK_%s/%s") % task.ipfs_block_id() % subTask.subtaskid()).str(); - valueBuffer.put(subTask.SerializeAsString()); - auto setKeyResult = m_db->Put(sgns::crdt::HierarchicalKey(subTaskKey), valueBuffer); - if (setKeyResult.has_failure()) + auto subTaskKey = + ( boost::format( "subtasks/TASK_%s/%s" ) % task.ipfs_block_id() % subTask.subtaskid() ).str(); + valueBuffer.put( subTask.SerializeAsString() ); + auto setKeyResult = m_db->Put( sgns::crdt::HierarchicalKey( subTaskKey ), valueBuffer ); + if ( setKeyResult.has_failure() ) { - m_logger->debug("Unable to put key-value to CRDT datastore."); + m_logger->debug( "Unable to put key-value to CRDT datastore." ); } // Check if data put - auto getKeyResult = m_db->Get(sgns::crdt::HierarchicalKey(taskKey)); - if (getKeyResult.has_failure()) + auto getKeyResult = m_db->Get( sgns::crdt::HierarchicalKey( taskKey ) ); + if ( getKeyResult.has_failure() ) { - m_logger->debug("Unable to find key in CRDT datastore"); + m_logger->debug( "Unable to find key in CRDT datastore" ); } else { - m_logger->debug("[{}] placed to GlobalDB ", taskKey); + m_logger->debug( "[{}] placed to GlobalDB ", taskKey ); } } } - bool ProcessingTaskQueueImpl::GetSubTasks( - const std::string& taskId, - std::list& subTasks) + bool ProcessingTaskQueueImpl::GetSubTasks( const std::string &taskId, std::list &subTasks ) { - m_logger->debug("SUBTASKS_REQUESTED. TaskId: {}", taskId); - auto key = (boost::format("subtasks/TASK_%s") % taskId).str(); - auto querySubTasks = m_db->QueryKeyValues(key); + m_logger->debug( "SUBTASKS_REQUESTED. TaskId: {}", taskId ); + auto key = ( boost::format( "subtasks/TASK_%s" ) % taskId ).str(); + auto querySubTasks = m_db->QueryKeyValues( key ); - if (querySubTasks.has_failure()) + if ( querySubTasks.has_failure() ) { - m_logger->info("Unable list subtasks from CRDT datastore"); + m_logger->info( "Unable list subtasks from CRDT datastore" ); return false; } - if (querySubTasks.has_value()) + if ( querySubTasks.has_value() ) { - m_logger->debug("SUBTASKS_FOUND {}", querySubTasks.value().size()); + m_logger->debug( "SUBTASKS_FOUND {}", querySubTasks.value().size() ); - for (auto element : querySubTasks.value()) + for ( auto element : querySubTasks.value() ) { SGProcessing::SubTask subTask; - if (subTask.ParseFromArray(element.second.data(), element.second.size())) + if ( subTask.ParseFromArray( element.second.data(), element.second.size() ) ) { - subTasks.push_back(std::move(subTask)); + subTasks.push_back( std::move( subTask ) ); } else { - m_logger->debug("Undable to parse a subtask"); + m_logger->debug( "Undable to parse a subtask" ); } } @@ -83,140 +81,152 @@ namespace sgns::processing } else { - m_logger->debug("NO_SUBTASKS_FOUND. TaskId {}", taskId); + m_logger->debug( "NO_SUBTASKS_FOUND. TaskId {}", taskId ); return false; } } - bool ProcessingTaskQueueImpl::GrabTask(std::string& grabbedTaskKey, SGProcessing::Task& task) + outcome::result> ProcessingTaskQueueImpl::GrabTask() { - m_logger->info("GRAB_TASK"); - auto queryTasks = m_db->QueryKeyValues("tasks"); - if (queryTasks.has_failure()) - { - m_logger->info("Unable list tasks from CRDT datastore"); - return false; - } + m_logger->info( "GRAB_TASK" ); + OUTCOME_TRY( ( auto &&, queryTasks ), m_db->QueryKeyValues( "tasks" ) ); + m_logger->info( "Task list grabbed from CRDT datastore" ); + + bool task_grabbed = false; std::set lockedTasks; - if (queryTasks.has_value()) + SGProcessing::Task task; + m_logger->info( "Number of tasks in Queue: {}", queryTasks.size() ); + for ( auto element : queryTasks ) { - m_logger->info("TASK_QUEUE_SIZE: {}", queryTasks.value().size()); - bool isTaskGrabbed = false; - for (auto element : queryTasks.value()) + auto taskKey = m_db->KeyToString( element.first ); + if ( !taskKey.has_value() ) { - auto taskKey = m_db->KeyToString(element.first); - if (taskKey.has_value()) - { - bool isTaskLocked = IsTaskLocked(taskKey.value()); - m_logger->debug("TASK_QUEUE_ITEM: {}, LOCKED: {}", taskKey.value(), isTaskLocked); - - if (!isTaskLocked) - { - if (task.ParseFromArray(element.second.data(), element.second.size())) - { - if (LockTask(taskKey.value())) - { - m_logger->debug("TASK_LOCKED {}", taskKey.value()); - grabbedTaskKey = task.ipfs_block_id(); - return true; - } - } - } - else - { - m_logger->debug("TASK_PREVIOUSLY_LOCKED {}", taskKey.value()); - lockedTasks.insert(taskKey.value()); - } - } - else - { - m_logger->debug("Undable to convert a key to string"); - } + m_logger->debug( "Unable to convert a key to string" ); + continue; + } + std::cout << "Trying to get results from " << "task_results/" + taskKey.value() << std::endl; + auto maybe_previous_result = m_db->Get( { "task_results/" + taskKey.value() } ); + if ( maybe_previous_result ) + { + m_logger->debug( "Task already processed" ); + continue; } - // No task was grabbed so far - for (auto lockedTask : lockedTasks) + if ( IsTaskLocked( taskKey.value() ) ) { - if (MoveExpiredTaskLock(lockedTask, task)) - { - grabbedTaskKey = task.ipfs_block_id(); - return true; - } + m_logger->debug( "TASK_PREVIOUSLY_LOCKED {}", taskKey.value() ); + lockedTasks.insert( taskKey.value() ); + continue; + } + m_logger->debug( "TASK_QUEUE_ITEM: {}, LOCKED: true", taskKey.value() ); + if ( !task.ParseFromArray( element.second.data(), element.second.size() ) ) + { + m_logger->debug( "Couldn't parse the task from Protobuf" ); + //TODO - Decide what to do with an invalid task - Maybe error? + continue; } + if ( !LockTask( taskKey.value() ) ) + { + m_logger->debug( "Failed to lock task" ); + continue; + } + m_logger->debug( "TASK_LOCKED {}", taskKey.value() ); + task_grabbed = true; + break; + } + // No task was grabbed so far + for ( auto lockedTask : lockedTasks ) + { + if ( MoveExpiredTaskLock( lockedTask, task ) ) + { + task_grabbed = true; + break; + } + } + + if ( task_grabbed ) + { + return std::make_pair( task.ipfs_block_id(), task ); + } + else + { + return outcome::failure( boost::system::error_code{} ); } - return false; } - bool ProcessingTaskQueueImpl::CompleteTask(const std::string& taskKey, const SGProcessing::TaskResult& taskResult) + bool ProcessingTaskQueueImpl::CompleteTask( const std::string &taskKey, const SGProcessing::TaskResult &taskResult ) { sgns::base::Buffer data; - data.put(taskResult.SerializeAsString()); - - auto transaction = m_db->BeginTransaction(); - transaction->AddToDelta(sgns::crdt::HierarchicalKey("task_results/" + taskKey), data); - transaction->RemoveFromDelta(sgns::crdt::HierarchicalKey("lock_" + taskKey)); - transaction->RemoveFromDelta(sgns::crdt::HierarchicalKey(taskKey)); - - auto res = transaction->PublishDelta(); - m_logger->debug("TASK_COMPLETED: {}", taskKey); - return !res.has_failure(); + data.put( taskResult.SerializeAsString() ); + + std::cout << "Completing the task and storing results on " << "task_results/" + taskKey << std::endl; + m_db->Put({"task_results/tasks/TASK_" + taskKey }, data); + m_db->Remove({"lock_tasks/TASK_" + taskKey}); + //auto transaction = m_db->BeginTransaction(); + //transaction->AddToDelta( sgns::crdt::HierarchicalKey( "task_results/" + taskKey ), data ); + //transaction->RemoveFromDelta( sgns::crdt::HierarchicalKey( "lock_" + taskKey ) ); + //transaction->RemoveFromDelta( sgns::crdt::HierarchicalKey( taskKey ) ); +// + //auto res = transaction->PublishDelta(); + m_logger->debug( "TASK_COMPLETED: {}", taskKey ); + return true; } - bool ProcessingTaskQueueImpl::IsTaskLocked(const std::string& taskKey) + bool ProcessingTaskQueueImpl::IsTaskLocked( const std::string &taskKey ) { - auto lockData = m_db->Get(sgns::crdt::HierarchicalKey("lock_" + taskKey)); - return (!lockData.has_failure() && lockData.has_value()); + auto lockData = m_db->Get( sgns::crdt::HierarchicalKey( "lock_" + taskKey ) ); + return !lockData.has_failure() && lockData.has_value(); } - bool ProcessingTaskQueueImpl::LockTask(const std::string& taskKey) + bool ProcessingTaskQueueImpl::LockTask( const std::string &taskKey ) { auto timestamp = std::chrono::system_clock::now(); SGProcessing::TaskLock lock; - lock.set_task_id(taskKey); - lock.set_lock_timestamp(timestamp.time_since_epoch().count()); + lock.set_task_id( taskKey ); + lock.set_lock_timestamp( timestamp.time_since_epoch().count() ); sgns::base::Buffer lockData; - lockData.put(lock.SerializeAsString()); + lockData.put( lock.SerializeAsString() ); - auto res = m_db->Put(sgns::crdt::HierarchicalKey("lock_" + taskKey), lockData); + auto res = m_db->Put( sgns::crdt::HierarchicalKey( "lock_" + taskKey ), lockData ); return !res.has_failure(); } - bool ProcessingTaskQueueImpl::MoveExpiredTaskLock(const std::string& taskKey, SGProcessing::Task& task) + bool ProcessingTaskQueueImpl::MoveExpiredTaskLock( const std::string &taskKey, SGProcessing::Task &task ) { auto timestamp = std::chrono::system_clock::now(); - auto lockData = m_db->Get(sgns::crdt::HierarchicalKey("lock_" + taskKey)); - if (!lockData.has_failure() && lockData.has_value()) + auto lockData = m_db->Get( sgns::crdt::HierarchicalKey( "lock_" + taskKey ) ); + if ( !lockData.has_failure() && lockData.has_value() ) { // Check task expiration SGProcessing::TaskLock lock; - if (lock.ParseFromArray(lockData.value().data(), lockData.value().size())) + if ( lock.ParseFromArray( lockData.value().data(), lockData.value().size() ) ) { - auto expirationTime = - std::chrono::system_clock::time_point( - std::chrono::system_clock::duration(lock.lock_timestamp())) + m_processingTimeout; - if (timestamp > expirationTime) + auto expirationTime = std::chrono::system_clock::time_point( + std::chrono::system_clock::duration( lock.lock_timestamp() ) ) + + m_processingTimeout; + if ( timestamp > expirationTime ) { - auto taskData = m_db->Get(taskKey); + auto taskData = m_db->Get( taskKey ); - if (!taskData.has_failure()) + if ( !taskData.has_failure() ) { - if (task.ParseFromArray(taskData.value().data(), taskData.value().size())) + if ( task.ParseFromArray( taskData.value().data(), taskData.value().size() ) ) { - if (LockTask(taskKey)) + if ( LockTask( taskKey ) ) { - m_logger->debug("TASK_LOCK_MOVED {}", taskKey); + m_logger->debug( "TASK_LOCK_MOVED {}", taskKey ); return true; } } } else { - m_logger->debug("Unable to find a task {}", taskKey); + m_logger->debug( "Unable to find a task {}", taskKey ); } } } diff --git a/src/processing/impl/processing_task_queue_impl.hpp b/src/processing/impl/processing_task_queue_impl.hpp index becff785..d8a0e196 100644 --- a/src/processing/impl/processing_task_queue_impl.hpp +++ b/src/processing/impl/processing_task_queue_impl.hpp @@ -6,12 +6,13 @@ #ifndef GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP #define GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP -#include -#include - +#include +#include #include +#include "outcome/outcome.hpp" -#include +#include "processing/processing_task_queue.hpp" +#include "crdt/globaldb/globaldb.hpp" namespace sgns::processing @@ -22,10 +23,8 @@ namespace sgns::processing /** Create a task queue * @param db - CRDT globaldb to use */ - ProcessingTaskQueueImpl( - std::shared_ptr db) - : m_db(db) - , m_processingTimeout(std::chrono::seconds(10)) + ProcessingTaskQueueImpl( std::shared_ptr db ) : + m_db( db ), m_processingTimeout( std::chrono::seconds( 10 ) ) { } @@ -33,50 +32,46 @@ namespace sgns::processing * @param task - Task to add * @param subTasks - List of subtasks */ - void EnqueueTask( - const SGProcessing::Task& task, - const std::list& subTasks) override; + void EnqueueTask( const SGProcessing::Task &task, const std::list &subTasks ) override; /** Get subtasks by task id, returns true if we got subtasks * @param taskId - id to look for subtasks of * @param subTasks - Reference of subtask list */ - bool GetSubTasks( - const std::string& taskId, - std::list& subTasks) override; + bool GetSubTasks( const std::string &taskId, std::list &subTasks ) override; /** Get task by task key, returns true if we got a task * @param grabbedTaskKey - id to look for task * @param task - Reference of task */ - bool GrabTask(std::string& grabbedTaskKey, SGProcessing::Task& task) override; + outcome::result> GrabTask() override; /** Complete task by task key, returns true if successful * @param taskKey - id to look for task * @param taskResult - Reference of a task result */ - bool CompleteTask(const std::string& taskKey, const SGProcessing::TaskResult& taskResult) override; + bool CompleteTask( const std::string &taskKey, const SGProcessing::TaskResult &taskResult ) override; /** Find if a task is locked * @param taskKey - id to look for task */ - bool IsTaskLocked(const std::string& taskKey); + bool IsTaskLocked( const std::string &taskKey ); /** Lock a task by key * @param taskKey - id to look for task */ - bool LockTask(const std::string& taskKey); + bool LockTask( const std::string &taskKey ); /** Move lock if expired, true if successful * @param taskKey - id to look for task * @param task - task reference */ - bool MoveExpiredTaskLock(const std::string& taskKey, SGProcessing::Task& task); + bool MoveExpiredTaskLock( const std::string &taskKey, SGProcessing::Task &task ); private: std::shared_ptr m_db; - std::chrono::system_clock::duration m_processingTimeout; - sgns::base::Logger m_logger = sgns::base::createLogger("ProcessingTaskQueueImpl"); + std::chrono::system_clock::duration m_processingTimeout; + sgns::base::Logger m_logger = sgns::base::createLogger( "ProcessingTaskQueueImpl" ); }; } diff --git a/src/processing/processing_subtask_enqueuer_impl.cpp b/src/processing/processing_subtask_enqueuer_impl.cpp index e203cdc1..3654e685 100644 --- a/src/processing/processing_subtask_enqueuer_impl.cpp +++ b/src/processing/processing_subtask_enqueuer_impl.cpp @@ -2,28 +2,26 @@ namespace sgns::processing { -SubTaskEnqueuerImpl::SubTaskEnqueuerImpl( - std::shared_ptr taskQueue) - : m_taskQueue(taskQueue) -{ -} + SubTaskEnqueuerImpl::SubTaskEnqueuerImpl( std::shared_ptr taskQueue ) : + m_taskQueue( taskQueue ) + { + } -bool SubTaskEnqueuerImpl::EnqueueSubTasks( - std::string& subTaskQueueId, - std::list& subTasks) -{ - SGProcessing::Task task; - std::string taskKey; - if (m_taskQueue->GrabTask(taskKey, task)) + bool SubTaskEnqueuerImpl::EnqueueSubTasks( std::string &subTaskQueueId, std::list &subTasks ) { - subTaskQueueId = taskKey; + if ( auto maybe_grabbed = m_taskQueue->GrabTask() ) + { + std::string taskKey = maybe_grabbed.value().first; + //SGProcessing::Task task = maybe_grabbed.value().second; //TODO - Not used for anything. Rewrite this code + + subTaskQueueId = taskKey; - m_taskQueue->GetSubTasks(taskKey, subTasks); + m_taskQueue->GetSubTasks( taskKey, subTasks ); - m_logger->debug("ENQUEUE_SUBTASKS: {}", subTasks.size()); - return true; + m_logger->debug( "ENQUEUE_SUBTASKS: {}", subTasks.size() ); + return true; + } + return false; } - return false; -} -} \ No newline at end of file +} diff --git a/src/processing/processing_subtask_enqueuer_impl.hpp b/src/processing/processing_subtask_enqueuer_impl.hpp index 5566458b..b46a7500 100644 --- a/src/processing/processing_subtask_enqueuer_impl.hpp +++ b/src/processing/processing_subtask_enqueuer_impl.hpp @@ -9,22 +9,18 @@ namespace sgns::processing { -// Encapsulates subtask queue construction algorithm -class SubTaskEnqueuerImpl : public SubTaskEnqueuer -{ -public: - SubTaskEnqueuerImpl( - std::shared_ptr taskQueue); - - bool EnqueueSubTasks( - std::string& subTaskQueueId, - std::list& subTasks) override; + // Encapsulates subtask queue construction algorithm + class SubTaskEnqueuerImpl : public SubTaskEnqueuer + { + public: + SubTaskEnqueuerImpl( std::shared_ptr taskQueue ); -private: - std::shared_ptr m_taskQueue; - sgns::base::Logger m_logger = sgns::base::createLogger("SubTaskEnqueuerImpl"); + bool EnqueueSubTasks( std::string &subTaskQueueId, std::list &subTasks ) override; -}; + private: + std::shared_ptr m_taskQueue; + sgns::base::Logger m_logger = sgns::base::createLogger( "SubTaskEnqueuerImpl" ); + }; } #endif // SUPERGENIUS_PROCESSING_SUBTASK_ENQUEUER_IMPL_HPP diff --git a/src/processing/processing_subtask_queue_accessor_impl.cpp b/src/processing/processing_subtask_queue_accessor_impl.cpp index eed0175e..d2c6050b 100644 --- a/src/processing/processing_subtask_queue_accessor_impl.cpp +++ b/src/processing/processing_subtask_queue_accessor_impl.cpp @@ -52,8 +52,7 @@ void SubTaskQueueAccessorImpl::AssignSubTasks(std::list& void SubTaskQueueAccessorImpl::UpdateResultsFromStorage(const std::set& subTaskIds) { - std::vector results; - m_subTaskResultStorage->GetSubTaskResults(subTaskIds, results); + auto results = m_subTaskResultStorage->GetSubTaskResults(subTaskIds); m_logger->debug("[RESULTS_LOADED] {} results loaded from results storage", results.size()); diff --git a/src/processing/processing_subtask_result_storage.hpp b/src/processing/processing_subtask_result_storage.hpp index 41a6006c..39d8ffc7 100644 --- a/src/processing/processing_subtask_result_storage.hpp +++ b/src/processing/processing_subtask_result_storage.hpp @@ -11,33 +11,30 @@ namespace sgns::processing { -/** Handles subtask results storage + /** Handles subtask results storage */ -class SubTaskResultStorage -{ -public: - - virtual ~SubTaskResultStorage() = default; - - /** Adds a result to the storage - * @param subTaskResult - processing result - */ - virtual void AddSubTaskResult(const SGProcessing::SubTaskResult& subTaskResult) = 0; - - /** Removes result from the storage - * @param subTaskId subtask id that the result was generated for - */ - virtual void RemoveSubTaskResult(const std::string& subTaskId) = 0; - - /** Returns results for specified subtask ids - * @param subTaskIds - list of subtask ids - * @return results - */ - virtual void GetSubTaskResults( - const std::set& subTaskIds, - std::vector& results) = 0; - -}; + class SubTaskResultStorage + { + public: + virtual ~SubTaskResultStorage() = default; + + /** Adds a result to the storage + * @param subTaskResult - processing result + */ + virtual void AddSubTaskResult( const SGProcessing::SubTaskResult &subTaskResult ) = 0; + + /** Removes result from the storage + * @param subTaskId subtask id that the result was generated for + */ + virtual void RemoveSubTaskResult( const std::string &subTaskId ) = 0; + + /** Returns results for specified subtask ids + * @param subTaskIds - list of subtask ids + * @return results + */ + virtual std::vector GetSubTaskResults( + const std::set &subTaskIds ) = 0; + }; } #endif // GRPC_FOR_SUPERGENIUS_PROCESSING_SUTASK_RESULT_STORAGE_HPP diff --git a/src/processing/processing_task_queue.hpp b/src/processing/processing_task_queue.hpp index 3a6613c5..d20c055e 100644 --- a/src/processing/processing_task_queue.hpp +++ b/src/processing/processing_task_queue.hpp @@ -7,9 +7,12 @@ #define GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_HPP #include - +#include "outcome/outcome.hpp" #include #include + + + class ProcessingTaskQueue { /** Distributed task queue interface @@ -38,7 +41,7 @@ class ProcessingTaskQueue * @return taskId - task id * @return task */ - virtual bool GrabTask(std::string& taskId, SGProcessing::Task& task) = 0; + virtual outcome::result> GrabTask() = 0; /** Handles task completion * @param taskId - task id