Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P message test #2196

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ tests/chain_bench
tests/chain_test
tests/intense_test
tests/performance_test
tests/cli_test
tests/es_test

doxygen

Expand Down
98 changes: 7 additions & 91 deletions libraries/net/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
#include <fc/io/json.hpp>
#include <fc/io/enum_type.hpp>
#include <fc/crypto/rand.hpp>
#include <fc/network/rate_limiting.hpp>
#include <fc/network/ip.hpp>
#include <fc/smart_ref_impl.hpp>

Expand Down Expand Up @@ -119,66 +118,14 @@
#define testnetlog(...) do {} while (0)
#endif

namespace graphene { namespace net {
#include "node_impl.hxx"

namespace detail
{
namespace bmi = boost::multi_index;
class blockchain_tied_message_cache
{
private:
static const uint32_t cache_duration_in_blocks = GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS;

struct message_hash_index{};
struct message_contents_hash_index{};
struct block_clock_index{};
struct message_info
{
message_hash_type message_hash;
message message_body;
uint32_t block_clock_when_received;

// for network performance stats
message_propagation_data propagation_data;
fc::uint160_t message_contents_hash; // hash of whatever the message contains (if it's a transaction, this is the transaction id, if it's a block, it's the block_id)

message_info( const message_hash_type& message_hash,
const message& message_body,
uint32_t block_clock_when_received,
const message_propagation_data& propagation_data,
fc::uint160_t message_contents_hash ) :
message_hash( message_hash ),
message_body( message_body ),
block_clock_when_received( block_clock_when_received ),
propagation_data( propagation_data ),
message_contents_hash( message_contents_hash )
{}
};
typedef boost::multi_index_container
< message_info,
bmi::indexed_by< bmi::ordered_unique< bmi::tag<message_hash_index>,
bmi::member<message_info, message_hash_type, &message_info::message_hash> >,
bmi::ordered_non_unique< bmi::tag<message_contents_hash_index>,
bmi::member<message_info, fc::uint160_t, &message_info::message_contents_hash> >,
bmi::ordered_non_unique< bmi::tag<block_clock_index>,
bmi::member<message_info, uint32_t, &message_info::block_clock_when_received> > >
> message_cache_container;

message_cache_container _message_cache;

uint32_t block_clock;

public:
blockchain_tied_message_cache() :
block_clock( 0 )
{}
void block_accepted();
void cache_message( const message& message_to_cache, const message_hash_type& hash_of_message_to_cache,
const message_propagation_data& propagation_data, const fc::uint160_t& message_content_hash );
message get_message( const message_hash_type& hash_of_message_to_lookup );
message_propagation_data get_message_propagation_data( const fc::uint160_t& hash_of_message_contents_to_lookup ) const;
size_t size() const { return _message_cache.size(); }
};
FC_REFLECT(graphene::net::detail::node_configuration, (listen_endpoint)
(accept_incoming_connections)
(wait_if_endpoint_is_busy)
(private_key));

namespace graphene { namespace net { namespace detail {

void blockchain_tied_message_cache::block_accepted()
{
Expand Down Expand Up @@ -221,37 +168,6 @@ namespace graphene { namespace net {
FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" );
}

/////////////////////////////////////////////////////////////////////////////////////////////////////////

// This specifies configuration info for the local node. It's stored as JSON
// in the configuration directory (application data directory)
struct node_configuration
{
node_configuration() : accept_incoming_connections(true), wait_if_endpoint_is_busy(true) {}

fc::ip::endpoint listen_endpoint;
bool accept_incoming_connections;
bool wait_if_endpoint_is_busy;
/**
* Originally, our p2p code just had a 'node-id' that was a random number identifying this node
* on the network. This is now a private key/public key pair, where the public key is used
* in place of the old random node-id. The private part is unused, but might be used in
* the future to support some notion of trusted peers.
*/
fc::ecc::private_key private_key;
};


} } } // end namespace graphene::net::detail
FC_REFLECT(graphene::net::detail::node_configuration, (listen_endpoint)
(accept_incoming_connections)
(wait_if_endpoint_is_busy)
(private_key));

#include "node_impl.hxx"

namespace graphene { namespace net { namespace detail {

void node_impl_deleter::operator()(node_impl* impl_to_delete)
{
#ifdef P2P_IN_DEDICATED_THREAD
Expand Down
88 changes: 88 additions & 0 deletions libraries/net/node_impl.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,22 @@
#include <fc/thread/thread.hpp>
#include <fc/log/logger.hpp>
#include <fc/network/tcp_socket.hpp>
#include <fc/network/rate_limiting.hpp>

#include <graphene/chain/config.hpp>
#include <graphene/chain/protocol/types.hpp>
#include <graphene/net/node.hpp>
#include <graphene/net/core_messages.hpp>
#include <graphene/net/peer_connection.hpp>

#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics/stats.hpp>
#include <boost/accumulators/statistics/rolling_mean.hpp>
#include <boost/accumulators/statistics/min.hpp>
#include <boost/accumulators/statistics/max.hpp>
#include <boost/accumulators/statistics/sum.hpp>
#include <boost/accumulators/statistics/count.hpp>

namespace graphene { namespace net { namespace detail {

// when requesting items from peers, we want to prioritize any blocks before
Expand Down Expand Up @@ -163,6 +173,84 @@ private:
uint8_t get_current_block_interval_in_seconds() const override;
};

namespace bmi = boost::multi_index;
class blockchain_tied_message_cache
{
private:
static const uint32_t cache_duration_in_blocks = GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS;

struct message_hash_index{};
struct message_contents_hash_index{};
struct block_clock_index{};
struct message_info
{
message_hash_type message_hash;
message message_body;
uint32_t block_clock_when_received;

// for network performance stats
message_propagation_data propagation_data;
fc::uint160_t message_contents_hash; // hash of whatever the message contains (if it's a transaction, this is the transaction id, if it's a block, it's the block_id)

message_info( const message_hash_type& message_hash,
const message& message_body,
uint32_t block_clock_when_received,
const message_propagation_data& propagation_data,
fc::uint160_t message_contents_hash ) :
message_hash( message_hash ),
message_body( message_body ),
block_clock_when_received( block_clock_when_received ),
propagation_data( propagation_data ),
message_contents_hash( message_contents_hash )
{}
};
typedef boost::multi_index_container
< message_info,
bmi::indexed_by< bmi::ordered_unique< bmi::tag<message_hash_index>,
bmi::member<message_info, message_hash_type, &message_info::message_hash> >,
bmi::ordered_non_unique< bmi::tag<message_contents_hash_index>,
bmi::member<message_info, fc::uint160_t, &message_info::message_contents_hash> >,
bmi::ordered_non_unique< bmi::tag<block_clock_index>,
bmi::member<message_info, uint32_t, &message_info::block_clock_when_received> > >
> message_cache_container;

message_cache_container _message_cache;

uint32_t block_clock;

public:
blockchain_tied_message_cache() :
block_clock( 0 )
{}
void block_accepted();
void cache_message( const message& message_to_cache, const message_hash_type& hash_of_message_to_cache,
const message_propagation_data& propagation_data, const fc::uint160_t& message_content_hash );
message get_message( const message_hash_type& hash_of_message_to_lookup );
message_propagation_data get_message_propagation_data( const fc::uint160_t& hash_of_message_contents_to_lookup ) const;
size_t size() const { return _message_cache.size(); }
};

/////////////////////////////////////////////////////////////////////////////////////////////////////////

// This specifies configuration info for the local node. It's stored as JSON
// in the configuration directory (application data directory)
struct node_configuration
{
node_configuration() : accept_incoming_connections(true), wait_if_endpoint_is_busy(true) {}

fc::ip::endpoint listen_endpoint;
bool accept_incoming_connections;
bool wait_if_endpoint_is_busy;
/**
* Originally, our p2p code just had a 'node-id' that was a random number identifying this node
* on the network. This is now a private key/public key pair, where the public key is used
* in place of the old random node-id. The private part is unused, but might be used in
* the future to support some notion of trusted peers.
*/
fc::ecc::private_key private_key;
};


class node_impl : public peer_connection_delegate
{
public:
Expand Down
6 changes: 5 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ file(GLOB BENCH_MARKS "benchmarks/*.cpp")
add_executable( chain_bench ${BENCH_MARKS} ${COMMON_SOURCES} )
target_link_libraries( chain_bench graphene_chain graphene_app graphene_account_history graphene_elasticsearch graphene_es_objects graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} )

file(GLOB APP_SOURCES "app/*.cpp")
file(GLOB APP_SOURCES "app/main.cpp")
add_executable( app_test ${APP_SOURCES} )
target_link_libraries( app_test graphene_app graphene_account_history graphene_net graphene_witness graphene_chain graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} )

file(GLOB SYS_SOURCES "app/system_test.cpp")
add_executable( system_test ${SYS_SOURCES} )
target_link_libraries( system_test graphene_app graphene_wallet graphene_account_history graphene_net graphene_witness graphene_chain graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} )

file(GLOB CLI_SOURCES "cli/*.cpp")
add_executable( cli_test ${CLI_SOURCES} )
target_link_libraries( cli_test graphene_app graphene_wallet graphene_witness graphene_account_history graphene_net graphene_chain graphene_egenesis_none fc ${PLATFORM_SPECIFIC_LIBS} )
Expand Down
124 changes: 124 additions & 0 deletions tests/app/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <graphene/app/plugin.hpp>
#include <graphene/app/config_util.hpp>

#include <graphene/net/peer_connection.hpp>

#include <graphene/chain/balance_object.hpp>

#include <graphene/utilities/tempdir.hpp>
Expand Down Expand Up @@ -325,6 +327,8 @@ BOOST_AUTO_TEST_CASE( two_node_network )
BOOST_CHECK_EQUAL(app1.chain_database()->head_block_num(), 1);

BOOST_TEST_MESSAGE( "Checking GRAPHENE_NULL_ACCOUNT has balance" );
BOOST_CHECK_EQUAL( db1->get_balance( GRAPHENE_NULL_ACCOUNT, asset_id_type() ).amount.value, 1000000 );
BOOST_CHECK_EQUAL( db2->get_balance( GRAPHENE_NULL_ACCOUNT, asset_id_type() ).amount.value, 1000000 );
} catch( fc::exception& e ) {
edump((e.to_detail_string()));
throw;
Expand All @@ -349,3 +353,123 @@ BOOST_AUTO_TEST_CASE(application_impl_breakout) {
graphene::net::item_id id;
BOOST_CHECK(impl.has_item(id));
}

#include "../../libraries/net/node_impl.hxx"

/******
* @brief create a malformed message and make sure the application can handle it
*/
BOOST_AUTO_TEST_CASE( bad_message )
{
// TODO: Implement a message_oriented_connection to override read_loop()
/*
class my_peer_connection : public graphene::net::peer_connection
{
my_peer_connection(graphene::net::peer_connection_delegate* delegate) :
graphene::net::peer_connection(delegate)
{
}
};
*/

// override application_impl to override get_item
class my_app_impl : public graphene::app::detail::application_impl {
private:
graphene::chain::chain_id_type _chain_id;
public:
my_app_impl() : application_impl(nullptr) {}
bool has_item(const net::item_id& id) override {
return true;
}
graphene::chain::chain_id_type get_chain_id() const { return _chain_id; }
graphene::net::message get_item(const graphene::net::item_id& id) {
graphene::net::message result;
try {
FC_ASSERT( false );
} FC_CAPTURE_AND_RETHROW( (id))
return result;
}
};

// override node_impl (actually no longer necessary)
class my_node_impl : public graphene::net::detail::node_impl {
public:
my_node_impl() : node_impl("user agent") { }
};

my_node_impl _node_impl;
my_app_impl _app_impl;
_node_impl.set_node_delegate(&_app_impl, &fc::thread::current());
graphene::net::peer_connection* peer = nullptr;

graphene::net::fetch_items_message msg;
graphene::chain::block_id_type block_id;
msg.items_to_fetch.push_back(block_id);

try
{
_node_impl.on_message(peer, msg);
std::cout << "on_message was successful.\n";
}
catch( fc::out_of_range_exception& e )
{
std::cerr << "Out of range exception thrown.\n";
}
catch( std::exception& e)
{
std::cerr << "Uh oh... Exception thrown. " << e.what() << '\n';
}
catch (...)
{
std::cerr << "Unknown exception thrown.\n";
}
}

bool testit(size_t size)
{
const int BUFFER_SIZE = 16;
const int LEFTOVER = BUFFER_SIZE - sizeof(graphene::net::message_header);

char buffer[BUFFER_SIZE];
{
// a cheat to get bytes into buffer
graphene::net::message_header h;
h.size = size;
h.msg_type = graphene::net::block_message_type;
memcpy((char*)&buffer[0], (char*)&h, sizeof(graphene::net::message_header));
}
graphene::net::message m;
memcpy((char*)&m, buffer, sizeof(graphene::net::message_header));
size_t _bytes_received = BUFFER_SIZE;

// max size is 2,097,152
FC_ASSERT( m.size <= MAX_MESSAGE_SIZE, "", ("m.size",m.size)("MAX_MESSAGE_SIZE",MAX_MESSAGE_SIZE) );

size_t remaining_bytes_with_padding = 16 * ((m.size - LEFTOVER + 15) / 16);
m.data.resize(LEFTOVER + remaining_bytes_with_padding);
std::copy(buffer + sizeof(graphene::net::message_header), buffer + sizeof(buffer), m.data.begin());
if (remaining_bytes_with_padding)
{
char bytes[remaining_bytes_with_padding];
memset(&bytes[0], 0, remaining_bytes_with_padding);
// make some bytes
for(size_t i = 0; i < remaining_bytes_with_padding; ++i)
{
bytes[i] = (char)(i % 10);
}
memcpy(&m.data[LEFTOVER], &bytes[0], remaining_bytes_with_padding);
_bytes_received += remaining_bytes_with_padding;
}
m.data.resize(m.size); // truncate off the padding bytes
// now see what we've got
return true;
}

BOOST_AUTO_TEST_CASE( malformed_message )
{
for(size_t i = MAX_MESSAGE_SIZE - 10; i <= MAX_MESSAGE_SIZE; ++i)
{
std::cout << "Tesing " << std::to_string(i) << '\n';
testit(i);
}
}
Loading