Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix db init in tests and header tests #44

Merged
merged 8 commits into from
May 10, 2024
16 changes: 7 additions & 9 deletions example/processing_dapp/processing_dapp_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <boost/program_options.hpp>

#include <iostream>
#include <thread>
#include <utility>

using namespace sgns::processing;

Expand All @@ -32,14 +34,10 @@ namespace
class ProcessingCoreImpl : public ProcessingCore
{
public:
ProcessingCoreImpl(
std::shared_ptr<sgns::crdt::GlobalDB> db,
size_t subTaskProcessingTime,
size_t maximalProcessingSubTaskCount)
: m_db(db)
, m_subTaskProcessingTime(subTaskProcessingTime)
, m_maximalProcessingSubTaskCount(maximalProcessingSubTaskCount)
, m_processingSubTaskCount(0)
ProcessingCoreImpl( std::shared_ptr<sgns::crdt::GlobalDB> db, size_t subTaskProcessingTime,
size_t maximalProcessingSubTaskCount ) :
m_db( std::move( db ) ), m_subTaskProcessingTime( subTaskProcessingTime ),
m_maximalProcessingSubTaskCount( maximalProcessingSubTaskCount ), m_processingSubTaskCount( 0 )
{
}
bool SetProcessingTypeFromJson(std::string jsondata) override
Expand Down Expand Up @@ -276,7 +274,7 @@ int main(int argc, char* argv[])
std::vector<std::string> pubsubBootstrapPeers;
if (options->remote)
{
pubsubBootstrapPeers = std::move(std::vector({ *options->remote }));
pubsubBootstrapPeers = std::vector( { *options->remote } );
}
pubs->Start(40001, pubsubBootstrapPeers);

Expand Down
18 changes: 8 additions & 10 deletions src/blockchain/impl/key_value_block_header_repository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include "blockchain/impl/common.hpp"
#include "blockchain/impl/storage_util.hpp"
#include "scale/scale.hpp"
#include "blockchain/impl/proto/SGBlocks.pb.h"

using sgns::base::Hash256;
Expand Down Expand Up @@ -34,9 +33,9 @@ namespace sgns::blockchain
const primitives::BlockNumber &number ) const
{
OUTCOME_TRY( ( auto &&, header ), getBlockHeader( number ) );
OUTCOME_TRY( ( auto &&, enc_header ), scale::encode( header ) );
auto serializedHeader = GetHeaderSerializedData( header );

return hasher_->blake2b_256( enc_header );
return hasher_->blake2b_256( serializedHeader );
}

outcome::result<primitives::BlockHeader> KeyValueBlockHeaderRepository::getBlockHeader( const BlockId &id ) const
Expand All @@ -49,14 +48,12 @@ namespace sgns::blockchain
return ( isNotFoundError( header_res.error() ) ) ? Error::BLOCK_NOT_FOUND : header_res.error();
}

//return scale::decode<primitives::BlockHeader>(header_res.value());
return GetBlockHeaderFromSerialized( header_res.value().toVector() );
}

outcome::result<primitives::BlockHash> KeyValueBlockHeaderRepository::putBlockHeader(
const primitives::BlockHeader &header )
{
//OUTCOME_TRY((auto &&, encoded_header), scale::encode(header));
auto encoded_header = GetHeaderSerializedData( header );
auto header_hash = hasher_->blake2b_256( encoded_header );

Expand Down Expand Up @@ -92,25 +89,26 @@ namespace sgns::blockchain
header_proto.set_block_number( header.number );
header_proto.set_state_root( header.state_root.toReadableString() );
header_proto.set_extrinsics_root( header.extrinsics_root.toReadableString() );
//header_proto.set_digest(header.digest.toReadableString());

size_t size = header_proto.ByteSizeLong();
std::vector<uint8_t> serialized_proto( size );

header_proto.SerializeToArray( serialized_proto.data(), serialized_proto.size() );
header_proto.SerializeToArray( serialized_proto.data(), static_cast<int>( serialized_proto.size() ) );

return serialized_proto;
}

primitives::BlockHeader KeyValueBlockHeaderRepository::GetBlockHeaderFromSerialized(
const std::vector<uint8_t> &serialized_data ) const
const std::vector<uint8_t> &serialized_data )
{
primitives::BlockHeader block_header;
SGBlocks::BlockHeaderData header_proto;
if ( !header_proto.ParseFromArray( serialized_data.data(), serialized_data.size() ) )

if ( !header_proto.ParseFromArray( serialized_data.data(), static_cast<int>( serialized_data.size() ) ) )
{
std::cerr << "Failed to parse BlockHeaderData from array." << std::endl;
}

primitives::BlockHeader block_header;
block_header.parent_hash = ( Hash256::fromReadableString( header_proto.parent_hash() ) ).value();
block_header.number = header_proto.block_number();
block_header.state_root = ( Hash256::fromReadableString( header_proto.state_root() ) ).value();
Expand Down
5 changes: 3 additions & 2 deletions src/blockchain/impl/key_value_block_header_repository.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ namespace sgns::blockchain
}

std::string GetHeaderPath() const;
std::vector<uint8_t> GetHeaderSerializedData(const primitives::BlockHeader &header);
primitives::BlockHeader GetBlockHeaderFromSerialized(const std::vector<uint8_t> &serialized_data) const ;

static std::vector<uint8_t> GetHeaderSerializedData( const primitives::BlockHeader &header );

static primitives::BlockHeader GetBlockHeaderFromSerialized( const std::vector<uint8_t> &serialized_data );

private:
static constexpr std::string_view BLOCKCHAIN_PATH = "blockchain/";
Expand Down
9 changes: 5 additions & 4 deletions src/blockchain/impl/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace sgns::blockchain {
}
return key;
}

outcome::result<std::string> idToStringKey(crdt::GlobalDB &db,
const primitives::BlockId &id) {
auto key = visit_in_place(
Expand All @@ -41,14 +42,14 @@ namespace sgns::blockchain {
},
[&db](const base::Hash256 &hash) {
auto key = db.Get({hash.toReadableString()});

if (key)
{
return std::to_string(BufferToNumber(key.value()).value());
}
else
{
return std::string{};
}

return std::string{};

});
if (key.empty())
{
Expand Down
7 changes: 5 additions & 2 deletions src/crdt/crdt_datastore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@
#include <crdt/dagsyncer.hpp>
#include <crdt/crdt_options.hpp>
#include <storage/rocksdb/rocksdb.hpp>
#include <primitives/cid/cid.hpp>
#include <ipfs_lite/ipld/ipld_node.hpp>
#include <shared_mutex>
#include <future>
#include <chrono>
#include <queue>
#include <thread>

namespace sgns::crdt
{
Expand Down Expand Up @@ -209,6 +207,11 @@ namespace sgns::crdt
*/
outcome::result<std::shared_ptr<Delta>> CreateDeltaToRemove(const std::string& key);

auto GetDB()
{
return dataStore_->getDB();
}

protected:

/** DAG jobs structure used by DAG worker threads to send new jobs
Expand Down
5 changes: 5 additions & 0 deletions src/crdt/globaldb/globaldb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ class GlobalDB
*/
std::shared_ptr<CrdtDataStoreTransaction> BeginTransaction();

auto GetDB()
{
return m_crdtDatastore->GetDB();
}

private:
std::shared_ptr<boost::asio::io_context> m_context;
std::string m_databasePath;
Expand Down
1 change: 1 addition & 0 deletions src/crdt/impl/crdt_datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <crdt/proto/bcast.pb.h>
#include <google/protobuf/unknown_field_set.h>
#include <ipfs_lite/ipld/impl/ipld_node_impl.hpp>
#include <thread>

namespace sgns::crdt
{
Expand Down
16 changes: 7 additions & 9 deletions src/processing/impl/processing_subtask_result_storage_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#include "processing_subtask_result_storage_impl.hpp"
#include <boost/format.hpp>
#include <utility>

namespace sgns::processing
{
SubTaskResultStorageImpl::SubTaskResultStorageImpl(std::shared_ptr<sgns::crdt::GlobalDB> db)
: m_db(db)
SubTaskResultStorageImpl::SubTaskResultStorageImpl( std::shared_ptr<sgns::crdt::GlobalDB> db ) :
m_db( std::move( db ) )
{
}

Expand All @@ -13,16 +14,13 @@ namespace sgns::processing
sgns::crdt::GlobalDB::Buffer data;
data.put(result.SerializeAsString());

auto taskId =
m_db->Put(
sgns::crdt::HierarchicalKey((boost::format("results/%s") % result.subtaskid()).str().c_str()),
data);
auto taskId = m_db->Put(
sgns::crdt::HierarchicalKey( ( boost::format( "results/%s" ) % result.subtaskid() ).str() ), data );
}

void SubTaskResultStorageImpl::RemoveSubTaskResult(const std::string& subTaskId)
{
m_db->Remove(
sgns::crdt::HierarchicalKey((boost::format("results/%s") % subTaskId).str().c_str()));
m_db->Remove( sgns::crdt::HierarchicalKey( ( boost::format( "results/%s" ) % subTaskId ).str() ) );
}

void SubTaskResultStorageImpl::GetSubTaskResults(
Expand All @@ -31,7 +29,7 @@ namespace sgns::processing
{
for (const auto& subTaskId : subTaskIds)
{
auto data = m_db->Get(sgns::crdt::HierarchicalKey((boost::format("results/%s") % subTaskId).str().c_str()));
auto data = m_db->Get( sgns::crdt::HierarchicalKey( ( boost::format( "results/%s" ) % subTaskId ).str() ) );
if (data)
{
SGProcessing::SubTaskResult result;
Expand Down
5 changes: 5 additions & 0 deletions src/storage/rocksdb/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ namespace sgns::storage
{
using BlockBasedTableOptions = ::ROCKSDB_NAMESPACE::BlockBasedTableOptions;

rocksdb::~rocksdb()
{
db_->Close();
}

outcome::result<std::shared_ptr<rocksdb>> rocksdb::create(
std::string_view path, Options options)
{
Expand Down
7 changes: 5 additions & 2 deletions src/storage/rocksdb/rocksdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace sgns::storage
using Slice = ::ROCKSDB_NAMESPACE::Slice;
using QueryResult = std::map<Buffer, Buffer>;

~rocksdb() override = default;
~rocksdb() override;

/**
* @brief Factory method to create an instance of rocksdb class.
Expand Down Expand Up @@ -86,7 +86,10 @@ namespace sgns::storage
return "rocksdb";
}

inline std::shared_ptr<DB> getDB() const { return db_; }
[[nodiscard]] std::shared_ptr<DB> getDB() const
{
return db_;
}

private:
std::shared_ptr<DB> db_;
Expand Down
Loading
Loading