Skip to content

Commit

Permalink
Merge pull request #592 from rsksmart/update_explorer
Browse files Browse the repository at this point in the history
Update explorer clean up period
  • Loading branch information
aeidelman authored Jun 13, 2018
2 parents e3e6676 + d3261cf commit 0be5275
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 38 deletions.
14 changes: 12 additions & 2 deletions rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@
public class RskSystemProperties extends SystemProperties {
private static final Logger logger = LoggerFactory.getLogger("config");

public static final int PD_DEFAULT_REFRESH_PERIOD = 60000;
/** while timeout period is lower than clean period it doesn't affect much since
requests will be checked after a clean period.
**/
private static final int PD_DEFAULT_CLEAN_PERIOD = 15000; //miliseconds
private static final int PD_DEFAULT_TIMEOUT_MESSAGE = PD_DEFAULT_CLEAN_PERIOD - 1; //miliseconds
private static final int PD_DEFAULT_REFRESH_PERIOD = 60000; //miliseconds

public static final int BLOCKS_FOR_PEERS_DEFAULT = 100;
private static final String MINER_REWARD_ADDRESS_CONFIG = "miner.reward.address";
private static final String MINER_COINBASE_SECRET_CONFIG = "miner.coinbase.secret";
Expand Down Expand Up @@ -261,7 +267,7 @@ public void enableRemasc() {

public long peerDiscoveryMessageTimeOut() {
return configFromFiles.hasPath("peer.discovery.msg.timeout") ?
configFromFiles.getLong("peer.discovery.msg.timeout") : 30000;
configFromFiles.getLong("peer.discovery.msg.timeout") : PD_DEFAULT_TIMEOUT_MESSAGE;
}

public long peerDiscoveryRefreshPeriod() {
Expand Down Expand Up @@ -429,4 +435,8 @@ public int getPruneNoBlocksToAvoidForks() {
public PruneConfiguration getPruneConfiguration() {
return new PruneConfiguration(this.getPruneNoBlocksToCopy(), this.getPruneNoBlocksToAvoidForks(), this.getPruneNoBlocksToWait());
}

public long peerDiscoveryCleanPeriod() {
return PD_DEFAULT_CLEAN_PERIOD;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package co.rsk.net.discovery;

import co.rsk.net.discovery.message.PingPeerMessage;
import com.google.common.annotations.VisibleForTesting;
import org.ethereum.net.rlpx.Node;

import java.util.Map;
Expand All @@ -41,4 +42,9 @@ public NodeChallenge startChallenge(Node challengedNode, Node challenger, PeerEx
public NodeChallenge removeChallenge(String challengeId) {
return activeChallenges.remove(challengeId);
}

@VisibleForTesting
public int activeChallengesCount() {
return activeChallenges.size();
}
}
34 changes: 24 additions & 10 deletions rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import co.rsk.net.discovery.table.OperationResult;
import co.rsk.net.discovery.table.PeerDiscoveryRequestBuilder;
import co.rsk.util.IpUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.ethereum.crypto.ECKey;
Expand All @@ -47,6 +48,7 @@ public class PeerExplorer {
private static final int MAX_NODES_PER_MSG = 20;
private static final int MAX_NODES_TO_ASK = 24;
private static final int MAX_NODES_TO_CHECK = 16;
private static final int RETRIES_COUNT = 3;

private final Set<InetSocketAddress> bootNodes = ConcurrentHashMap.newKeySet();
private final Map<String, PeerDiscoveryRequest> pendingPingRequests = new ConcurrentHashMap<>();
Expand All @@ -70,15 +72,15 @@ public class PeerExplorer {

private long requestTimeout;

public PeerExplorer(List<String> initialBootNodes, Node localNode, NodeDistanceTable distanceTable, ECKey key, long reqTimeOut, long refreshPeriod) {
public PeerExplorer(List<String> initialBootNodes, Node localNode, NodeDistanceTable distanceTable, ECKey key, long reqTimeOut, long updatePeriod, long cleanPeriod) {
this.localNode = localNode;
this.key = key;
this.distanceTable = distanceTable;
this.updateEntryLock = new ReentrantLock();

loadInitialBootNodes(initialBootNodes);

this.cleaner = new PeerExplorerCleaner(this, refreshPeriod);
this.cleaner = new PeerExplorerCleaner(this, updatePeriod, cleanPeriod);
this.challengeManager = new NodeChallengeManager();
this.requestTimeout = reqTimeOut;
}
Expand Down Expand Up @@ -259,20 +261,23 @@ public NeighborsPeerMessage sendNeighbors(InetSocketAddress nodeAddress, List<No
}

public void purgeRequests() {
List<PeerDiscoveryRequest> oldPingRequests = this.removeExpiredRequests(this.pendingPingRequests);
this.resendExpiredPing(oldPingRequests);
this.removeConnections(oldPingRequests.stream().
filter(r -> r.getAttemptNumber() >= 3).collect(Collectors.toList()));
List<PeerDiscoveryRequest> oldPingRequests = removeExpiredRequests(this.pendingPingRequests);
removeExpiredChallenges(oldPingRequests);
resendExpiredPing(oldPingRequests);
removeConnections(oldPingRequests.stream().
filter(r -> r.getAttemptNumber() >= RETRIES_COUNT).collect(Collectors.toList()));

removeExpiredRequests(this.pendingFindNodeRequests);
}

public void cleanAndUpdate() {
public void clean() {
this.purgeRequests();
}

public void update() {
List<Node> closestNodes = this.distanceTable.getClosestNodes(this.localNode.getId());
this.askForMoreNodes(closestNodes);
this.checkPeersPulse(closestNodes);

this.purgeRequests();
}

private void checkPeersPulse(List<Node> closestNodes) {
Expand All @@ -293,8 +298,12 @@ private List<PeerDiscoveryRequest> removeExpiredRequests(Map<String, PeerDiscove
return requests;
}

private void removeExpiredChallenges(List<PeerDiscoveryRequest> peerDiscoveryRequests) {
peerDiscoveryRequests.stream().forEach(r -> challengeManager.removeChallenge(r.getMessageId()));
}

private void resendExpiredPing(List<PeerDiscoveryRequest> peerDiscoveryRequests) {
peerDiscoveryRequests.stream().filter(r -> r.getAttemptNumber() < 3)
peerDiscoveryRequests.stream().filter(r -> r.getAttemptNumber() < RETRIES_COUNT)
.forEach(r -> sendPing(r.getAddress(), r.getAttemptNumber() + 1));
}

Expand Down Expand Up @@ -356,4 +365,9 @@ private Set<Node> collectRandomNodes(List<Node> originalList, int elementsNbr) {

return ret;
}

@VisibleForTesting
public NodeChallengeManager getChallengeManager() {
return challengeManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ public class PeerExplorerCleaner {
private PeerExplorer peerExplorer;
private ScheduledExecutorService updateTask;
private long updatePeriod;
private long cleanPeriod;
private boolean running = false;

public PeerExplorerCleaner(PeerExplorer peerExplorer, long updatePeriod) {
public PeerExplorerCleaner(PeerExplorer peerExplorer, long updatePeriod, long cleanPeriod) {
this.peerExplorer = peerExplorer;
this.updatePeriod = updatePeriod;

this.cleanPeriod = cleanPeriod;
// it should stay on a single thread since there are two tasks that could interfere with each other running here
this.updateTask = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "PeerExplorerCleaner"));
}

Expand All @@ -47,7 +49,8 @@ public void run() {
}

private void startUpdateTask() {
updateTask.scheduleAtFixedRate(() -> peerExplorer.cleanAndUpdate(), updatePeriod, updatePeriod, TimeUnit.MILLISECONDS);
updateTask.scheduleAtFixedRate(() -> peerExplorer.clean(), cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
updateTask.scheduleAtFixedRate(() -> peerExplorer.update(), updatePeriod, updatePeriod, TimeUnit.MILLISECONDS);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class NodeDistanceTable {

public NodeDistanceTable(int numberOfBuckets, int entriesPerBucket, Node localNode) {
this.localNode = localNode;
this.distanceCalculator = new DistanceCalculator(KademliaOptions.BINS);
this.distanceCalculator = new DistanceCalculator(numberOfBuckets);

for (int i = 0; i < numberOfBuckets; i++) {
buckets.put(i, new Bucket(entriesPerBucket, i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ public PeerExplorer peerExplorer(RskSystemProperties rskConfig) {
NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, localNode);
long msgTimeOut = rskConfig.peerDiscoveryMessageTimeOut();
long refreshPeriod = rskConfig.peerDiscoveryRefreshPeriod();
long cleanPeriod = rskConfig.peerDiscoveryCleanPeriod();
List<String> initialBootNodes = rskConfig.peerDiscoveryIPList();
List<Node> activePeers = rskConfig.peerActive();
if(CollectionUtils.isNotEmpty(activePeers)) {
Expand All @@ -218,7 +219,7 @@ public PeerExplorer peerExplorer(RskSystemProperties rskConfig) {
initialBootNodes.add(address.getHostName() + ":" + address.getPort());
}
}
return new PeerExplorer(initialBootNodes, localNode, distanceTable, key, msgTimeOut, refreshPeriod);
return new PeerExplorer(initialBootNodes, localNode, distanceTable, key, msgTimeOut, refreshPeriod, cleanPeriod);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class NodeChallengeManagerTest {
private static final int PORT_3 = 44037;

private static final long TIMEOUT = 30000;
private static final long REFRESH = 60000;
private static final long UPDATE = 60000;
private static final long CLEAN = 60000;


@Test
Expand All @@ -62,7 +63,7 @@ public void startChallenge() {
Node node3 = new Node(key3.getNodeId(), HOST_3, PORT_3);

NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node1);
PeerExplorer peerExplorer = new PeerExplorer(new ArrayList<>(), node1, distanceTable, new ECKey(), TIMEOUT, REFRESH);
PeerExplorer peerExplorer = new PeerExplorer(new ArrayList<>(), node1, distanceTable, new ECKey(), TIMEOUT, UPDATE, CLEAN);
peerExplorer.setUDPChannel(Mockito.mock(UDPChannel.class));

NodeChallengeManager manager = new NodeChallengeManager();
Expand Down
78 changes: 69 additions & 9 deletions rskj-core/src/test/java/co/rsk/net/discovery/PeerExplorerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,22 @@ public class PeerExplorerTest {
private static final int PORT_3 = 44037;

private static final long TIMEOUT = 30000;
private static final long REFRESH = 60000;
private static final long UPDATE = 60000;
private static final long CLEAN = 60000;

@Test
public void sendInitialMessageToNodesNoNodes() {
Node node = new Node(new ECKey().getNodeId(), HOST_2, PORT_2);
NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node);
PeerExplorer peerExplorer = new PeerExplorer(new ArrayList<>(), node, distanceTable, new ECKey(), TIMEOUT, REFRESH);
PeerExplorer peerExplorer = new PeerExplorer(new ArrayList<>(), node, distanceTable, new ECKey(), TIMEOUT, UPDATE, CLEAN);

peerExplorer.setUDPChannel(Mockito.mock(UDPChannel.class));

Set<String> nodesWithMessage = peerExplorer.startConversationWithNewNodes();

Assert.assertTrue(CollectionUtils.isEmpty(nodesWithMessage));

peerExplorer = new PeerExplorer(null, node, distanceTable, new ECKey(), TIMEOUT, REFRESH);
peerExplorer = new PeerExplorer(null, node, distanceTable, new ECKey(), TIMEOUT, UPDATE, CLEAN);
peerExplorer.setUDPChannel(Mockito.mock(UDPChannel.class));

nodesWithMessage = peerExplorer.startConversationWithNewNodes();
Expand All @@ -93,7 +94,7 @@ public void sendInitialMessageToNodes() {

Node node = new Node(new ECKey().getNodeId(), HOST_1, PORT_1);
NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, new ECKey(), TIMEOUT, REFRESH);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, new ECKey(), TIMEOUT, UPDATE, CLEAN);

UDPChannel channel = new UDPChannel(Mockito.mock(Channel.class), peerExplorer);
peerExplorer.setUDPChannel(channel);
Expand All @@ -113,7 +114,7 @@ public void handlePingMessage() throws Exception {

Node node = new Node(key2.getNodeId(), HOST_2, PORT_2);
NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, REFRESH);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, UPDATE, CLEAN);

Channel internalChannel = Mockito.mock(Channel.class);
UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer);
Expand Down Expand Up @@ -168,7 +169,7 @@ public void handlePongMessage() throws Exception {

Node node = new Node(key2.getNodeId(), HOST_2, PORT_2);
NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, REFRESH);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, UPDATE, CLEAN);

Channel internalChannel = Mockito.mock(Channel.class);
UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer);
Expand Down Expand Up @@ -212,7 +213,7 @@ public void handleFindNodeMessage() throws Exception {

Node node = new Node(key2.getNodeId(), HOST_2, PORT_2);
NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, REFRESH);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, UPDATE, CLEAN);

Channel internalChannel = Mockito.mock(Channel.class);
UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer);
Expand Down Expand Up @@ -259,7 +260,7 @@ public void handleFindNodeMessageWithExtraNodes() throws Exception {

Node node = new Node(key2.getNodeId(), HOST_2, PORT_2);
NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, REFRESH);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, UPDATE, CLEAN);

Channel internalChannel = Mockito.mock(Channel.class);
UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer);
Expand Down Expand Up @@ -315,7 +316,7 @@ public void handleNeighbors() throws Exception {
Node node1 = new Node(key1.getNodeId(), HOST_1, PORT_1);
Node node2 = new Node(key2.getNodeId(), HOST_2, PORT_2);
NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node2);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node2, distanceTable, key2, TIMEOUT, REFRESH);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node2, distanceTable, key2, TIMEOUT, UPDATE, CLEAN);

Channel internalChannel = Mockito.mock(Channel.class);
UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer);
Expand Down Expand Up @@ -360,4 +361,63 @@ public void handleNeighbors() throws Exception {
Assert.assertEquals(DiscoveryMessageType.PING, discoveryEvent.getMessage().getMessageType());
}

@Test
public void testCleanPeriod() throws InterruptedException, Exception{
List<String> nodes = new ArrayList<>();
nodes.add(HOST_1 + ":" + PORT_1);
nodes.add(HOST_3 + ":" + PORT_3);

ECKey key1 = ECKey.fromPrivate(Hex.decode(KEY_1)).decompress();
ECKey key2 = ECKey.fromPrivate(Hex.decode(KEY_2)).decompress();
ECKey key3 = ECKey.fromPrivate(Hex.decode(KEY_3)).decompress();

Node node = new Node(key2.getNodeId(), HOST_2, PORT_2);
NodeDistanceTable distanceTable = new NodeDistanceTable(1, 1, node);
PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, 199, UPDATE, 200);

Channel internalChannel = Mockito.mock(Channel.class);
UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer);
ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
peerExplorer.setUDPChannel(channel);
Assert.assertTrue(CollectionUtils.isEmpty(peerExplorer.getNodes()));

//A incoming pong for a Ping we did not sent.
String check = UUID.randomUUID().toString();
PongPeerMessage incomingPongMessage = PongPeerMessage.create(HOST_1, PORT_1, check, key1);
DiscoveryEvent incomingPongEvent = new DiscoveryEvent(incomingPongMessage, new InetSocketAddress(HOST_1, PORT_1));
channel.clearEvents();
channel.channelRead0(ctx, incomingPongEvent);
List<DiscoveryEvent> sentEvents = channel.getEventsWritten();
Assert.assertEquals(0, sentEvents.size());
Assert.assertEquals(0, peerExplorer.getNodes().size());

//Now we send the ping first
peerExplorer.startConversationWithNewNodes();
sentEvents = channel.getEventsWritten();
Assert.assertEquals(2, sentEvents.size());
incomingPongMessage = PongPeerMessage.create(HOST_1, PORT_1, ((PingPeerMessage) sentEvents.get(0).getMessage()).getMessageId(), key1);
incomingPongEvent = new DiscoveryEvent(incomingPongMessage, new InetSocketAddress(HOST_1, PORT_1));
PongPeerMessage incomingPongMessage3 = PongPeerMessage.create(HOST_3, PORT_3, ((PingPeerMessage) sentEvents.get(1).getMessage()).getMessageId(), key3);
DiscoveryEvent incomingPongEvent3 = new DiscoveryEvent(incomingPongMessage3, new InetSocketAddress(HOST_3, PORT_3));
channel.clearEvents();
List<Node> addedNodes = peerExplorer.getNodes();
Assert.assertEquals(0, addedNodes.size());
channel.channelRead0(ctx, incomingPongEvent);
Assert.assertEquals(1, peerExplorer.getNodes().size());
addedNodes = peerExplorer.getNodes();
Assert.assertEquals(1, addedNodes.size());

channel.channelRead0(ctx, incomingPongEvent3);
Assert.assertEquals(1, peerExplorer.getNodes().size());
addedNodes = peerExplorer.getNodes();
Assert.assertEquals(1, addedNodes.size());

Assert.assertEquals(1, peerExplorer.getChallengeManager().activeChallengesCount());
Thread.sleep(200L);
peerExplorer.clean();
peerExplorer.clean();
peerExplorer.clean();
Assert.assertEquals(0, peerExplorer.getChallengeManager().activeChallengesCount());
}

}
Loading

0 comments on commit 0be5275

Please sign in to comment.