From 4b29d90062c0e630574904a0dda756eac2f9aaae Mon Sep 17 00:00:00 2001 From: Lisandro Date: Mon, 11 Jun 2018 12:46:41 -0300 Subject: [PATCH 1/3] Remove expired challenges --- .../main/java/co/rsk/net/discovery/PeerExplorer.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java index a990d97ad3f..50b3b26897b 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java @@ -259,9 +259,10 @@ public NeighborsPeerMessage sendNeighbors(InetSocketAddress nodeAddress, List oldPingRequests = this.removeExpiredRequests(this.pendingPingRequests); - this.resendExpiredPing(oldPingRequests); - this.removeConnections(oldPingRequests.stream(). + List oldPingRequests = removeExpiredRequests(this.pendingPingRequests); + removeExpiredChallenges(oldPingRequests); + resendExpiredPing(oldPingRequests); + removeConnections(oldPingRequests.stream(). filter(r -> r.getAttemptNumber() >= 3).collect(Collectors.toList())); removeExpiredRequests(this.pendingFindNodeRequests); @@ -293,6 +294,10 @@ private List removeExpiredRequests(Map peerDiscoveryRequests) { + peerDiscoveryRequests.stream().forEach(r -> challengeManager.removeChallenge(r.getMessageId())); + } + private void resendExpiredPing(List peerDiscoveryRequests) { peerDiscoveryRequests.stream().filter(r -> r.getAttemptNumber() < 3) .forEach(r -> sendPing(r.getAddress(), r.getAttemptNumber() + 1)); From bb08c52b8c5216e24a505188671057fc910225b3 Mon Sep 17 00:00:00 2001 From: Lisandro Date: Mon, 11 Jun 2018 13:11:07 -0300 Subject: [PATCH 2/3] Reduced default refresh period --- .../co/rsk/config/RskSystemProperties.java | 14 ++++++++++-- .../co/rsk/net/discovery/PeerExplorer.java | 22 ++++++++++++++----- .../net/discovery/PeerExplorerCleaner.java | 7 ++++-- .../discovery/table/NodeDistanceTable.java | 2 +- .../org/ethereum/config/DefaultConfig.java | 3 ++- .../discovery/NodeChallengeManagerTest.java | 5 +++-- .../rsk/net/discovery/PeerExplorerTest.java | 19 ++++++++-------- .../co/rsk/net/discovery/UDPServerTest.java | 9 ++++---- 8 files changed, 54 insertions(+), 27 deletions(-) diff --git a/rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java b/rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java index b27067d68bb..08162bcdace 100644 --- a/rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java +++ b/rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java @@ -52,7 +52,13 @@ public class RskSystemProperties extends SystemProperties { private static final Logger logger = LoggerFactory.getLogger("config"); - public static final int PD_DEFAULT_REFRESH_PERIOD = 60000; + /** while timeout period is lower than clean period it doesn't affect much since + requests will be checked after a clean period. + **/ + private static final int PD_DEFAULT_CLEAN_PERIOD = 15000; //miliseconds + private static final int PD_DEFAULT_TIMEOUT_MESSAGE = PD_DEFAULT_CLEAN_PERIOD - 1; //miliseconds + private static final int PD_DEFAULT_REFRESH_PERIOD = 60000; //miliseconds + public static final int BLOCKS_FOR_PEERS_DEFAULT = 100; private static final String MINER_REWARD_ADDRESS_CONFIG = "miner.reward.address"; private static final String MINER_COINBASE_SECRET_CONFIG = "miner.coinbase.secret"; @@ -261,7 +267,7 @@ public void enableRemasc() { public long peerDiscoveryMessageTimeOut() { return configFromFiles.hasPath("peer.discovery.msg.timeout") ? - configFromFiles.getLong("peer.discovery.msg.timeout") : 30000; + configFromFiles.getLong("peer.discovery.msg.timeout") : PD_DEFAULT_TIMEOUT_MESSAGE; } public long peerDiscoveryRefreshPeriod() { @@ -429,4 +435,8 @@ public int getPruneNoBlocksToAvoidForks() { public PruneConfiguration getPruneConfiguration() { return new PruneConfiguration(this.getPruneNoBlocksToCopy(), this.getPruneNoBlocksToAvoidForks(), this.getPruneNoBlocksToWait()); } + + public long peerDiscoveryCleanPeriod() { + return PD_DEFAULT_CLEAN_PERIOD; + } } diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java index 50b3b26897b..42855b12ade 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java @@ -24,6 +24,7 @@ import co.rsk.net.discovery.table.OperationResult; import co.rsk.net.discovery.table.PeerDiscoveryRequestBuilder; import co.rsk.util.IpUtils; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.ethereum.crypto.ECKey; @@ -47,6 +48,7 @@ public class PeerExplorer { private static final int MAX_NODES_PER_MSG = 20; private static final int MAX_NODES_TO_ASK = 24; private static final int MAX_NODES_TO_CHECK = 16; + private static final int RETRIES_COUNT = 3; private final Set bootNodes = ConcurrentHashMap.newKeySet(); private final Map pendingPingRequests = new ConcurrentHashMap<>(); @@ -70,7 +72,7 @@ public class PeerExplorer { private long requestTimeout; - public PeerExplorer(List initialBootNodes, Node localNode, NodeDistanceTable distanceTable, ECKey key, long reqTimeOut, long refreshPeriod) { + public PeerExplorer(List initialBootNodes, Node localNode, NodeDistanceTable distanceTable, ECKey key, long reqTimeOut, long updatePeriod, long cleanPeriod) { this.localNode = localNode; this.key = key; this.distanceTable = distanceTable; @@ -78,7 +80,7 @@ public PeerExplorer(List initialBootNodes, Node localNode, NodeDistanceT loadInitialBootNodes(initialBootNodes); - this.cleaner = new PeerExplorerCleaner(this, refreshPeriod); + this.cleaner = new PeerExplorerCleaner(this, updatePeriod, cleanPeriod); this.challengeManager = new NodeChallengeManager(); this.requestTimeout = reqTimeOut; } @@ -263,17 +265,25 @@ public void purgeRequests() { removeExpiredChallenges(oldPingRequests); resendExpiredPing(oldPingRequests); removeConnections(oldPingRequests.stream(). - filter(r -> r.getAttemptNumber() >= 3).collect(Collectors.toList())); + filter(r -> r.getAttemptNumber() >= RETRIES_COUNT).collect(Collectors.toList())); removeExpiredRequests(this.pendingFindNodeRequests); } + @VisibleForTesting public void cleanAndUpdate() { + this.update(); + this.clean(); + } + + public void clean() { + this.purgeRequests(); + } + + public void update() { List closestNodes = this.distanceTable.getClosestNodes(this.localNode.getId()); this.askForMoreNodes(closestNodes); this.checkPeersPulse(closestNodes); - - this.purgeRequests(); } private void checkPeersPulse(List closestNodes) { @@ -299,7 +309,7 @@ private void removeExpiredChallenges(List peerDiscoveryReq } private void resendExpiredPing(List peerDiscoveryRequests) { - peerDiscoveryRequests.stream().filter(r -> r.getAttemptNumber() < 3) + peerDiscoveryRequests.stream().filter(r -> r.getAttemptNumber() < RETRIES_COUNT) .forEach(r -> sendPing(r.getAddress(), r.getAttemptNumber() + 1)); } diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorerCleaner.java b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorerCleaner.java index 42e1376bd51..eedb0e47ef5 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorerCleaner.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorerCleaner.java @@ -30,11 +30,13 @@ public class PeerExplorerCleaner { private PeerExplorer peerExplorer; private ScheduledExecutorService updateTask; private long updatePeriod; + private long cleanPeriod; private boolean running = false; - public PeerExplorerCleaner(PeerExplorer peerExplorer, long updatePeriod) { + public PeerExplorerCleaner(PeerExplorer peerExplorer, long updatePeriod, long cleanPeriod) { this.peerExplorer = peerExplorer; this.updatePeriod = updatePeriod; + this.cleanPeriod = cleanPeriod; this.updateTask = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "PeerExplorerCleaner")); } @@ -47,7 +49,8 @@ public void run() { } private void startUpdateTask() { - updateTask.scheduleAtFixedRate(() -> peerExplorer.cleanAndUpdate(), updatePeriod, updatePeriod, TimeUnit.MILLISECONDS); + updateTask.scheduleAtFixedRate(() -> peerExplorer.clean(), cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS); + updateTask.scheduleAtFixedRate(() -> peerExplorer.update(), updatePeriod, updatePeriod, TimeUnit.MILLISECONDS); } diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/table/NodeDistanceTable.java b/rskj-core/src/main/java/co/rsk/net/discovery/table/NodeDistanceTable.java index 68dd038333e..2d717f1a3b4 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/table/NodeDistanceTable.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/table/NodeDistanceTable.java @@ -35,7 +35,7 @@ public class NodeDistanceTable { public NodeDistanceTable(int numberOfBuckets, int entriesPerBucket, Node localNode) { this.localNode = localNode; - this.distanceCalculator = new DistanceCalculator(KademliaOptions.BINS); + this.distanceCalculator = new DistanceCalculator(numberOfBuckets); for (int i = 0; i < numberOfBuckets; i++) { buckets.put(i, new Bucket(entriesPerBucket, i)); diff --git a/rskj-core/src/main/java/org/ethereum/config/DefaultConfig.java b/rskj-core/src/main/java/org/ethereum/config/DefaultConfig.java index 90ea51bd63d..e57eb01278e 100644 --- a/rskj-core/src/main/java/org/ethereum/config/DefaultConfig.java +++ b/rskj-core/src/main/java/org/ethereum/config/DefaultConfig.java @@ -210,6 +210,7 @@ public PeerExplorer peerExplorer(RskSystemProperties rskConfig) { NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, localNode); long msgTimeOut = rskConfig.peerDiscoveryMessageTimeOut(); long refreshPeriod = rskConfig.peerDiscoveryRefreshPeriod(); + long cleanPeriod = rskConfig.peerDiscoveryCleanPeriod(); List initialBootNodes = rskConfig.peerDiscoveryIPList(); List activePeers = rskConfig.peerActive(); if(CollectionUtils.isNotEmpty(activePeers)) { @@ -218,7 +219,7 @@ public PeerExplorer peerExplorer(RskSystemProperties rskConfig) { initialBootNodes.add(address.getHostName() + ":" + address.getPort()); } } - return new PeerExplorer(initialBootNodes, localNode, distanceTable, key, msgTimeOut, refreshPeriod); + return new PeerExplorer(initialBootNodes, localNode, distanceTable, key, msgTimeOut, refreshPeriod, cleanPeriod); } @Bean diff --git a/rskj-core/src/test/java/co/rsk/net/discovery/NodeChallengeManagerTest.java b/rskj-core/src/test/java/co/rsk/net/discovery/NodeChallengeManagerTest.java index 818a9b5d937..8a6588c118c 100644 --- a/rskj-core/src/test/java/co/rsk/net/discovery/NodeChallengeManagerTest.java +++ b/rskj-core/src/test/java/co/rsk/net/discovery/NodeChallengeManagerTest.java @@ -48,7 +48,8 @@ public class NodeChallengeManagerTest { private static final int PORT_3 = 44037; private static final long TIMEOUT = 30000; - private static final long REFRESH = 60000; + private static final long UPDATE = 60000; + private static final long CLEAN = 60000; @Test @@ -62,7 +63,7 @@ public void startChallenge() { Node node3 = new Node(key3.getNodeId(), HOST_3, PORT_3); NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node1); - PeerExplorer peerExplorer = new PeerExplorer(new ArrayList<>(), node1, distanceTable, new ECKey(), TIMEOUT, REFRESH); + PeerExplorer peerExplorer = new PeerExplorer(new ArrayList<>(), node1, distanceTable, new ECKey(), TIMEOUT, UPDATE, CLEAN); peerExplorer.setUDPChannel(Mockito.mock(UDPChannel.class)); NodeChallengeManager manager = new NodeChallengeManager(); diff --git a/rskj-core/src/test/java/co/rsk/net/discovery/PeerExplorerTest.java b/rskj-core/src/test/java/co/rsk/net/discovery/PeerExplorerTest.java index 89ab48fb6d5..73e7b16a3cf 100644 --- a/rskj-core/src/test/java/co/rsk/net/discovery/PeerExplorerTest.java +++ b/rskj-core/src/test/java/co/rsk/net/discovery/PeerExplorerTest.java @@ -59,13 +59,14 @@ public class PeerExplorerTest { private static final int PORT_3 = 44037; private static final long TIMEOUT = 30000; - private static final long REFRESH = 60000; + private static final long UPDATE = 60000; + private static final long CLEAN = 60000; @Test public void sendInitialMessageToNodesNoNodes() { Node node = new Node(new ECKey().getNodeId(), HOST_2, PORT_2); NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node); - PeerExplorer peerExplorer = new PeerExplorer(new ArrayList<>(), node, distanceTable, new ECKey(), TIMEOUT, REFRESH); + PeerExplorer peerExplorer = new PeerExplorer(new ArrayList<>(), node, distanceTable, new ECKey(), TIMEOUT, UPDATE, CLEAN); peerExplorer.setUDPChannel(Mockito.mock(UDPChannel.class)); @@ -73,7 +74,7 @@ public void sendInitialMessageToNodesNoNodes() { Assert.assertTrue(CollectionUtils.isEmpty(nodesWithMessage)); - peerExplorer = new PeerExplorer(null, node, distanceTable, new ECKey(), TIMEOUT, REFRESH); + peerExplorer = new PeerExplorer(null, node, distanceTable, new ECKey(), TIMEOUT, UPDATE, CLEAN); peerExplorer.setUDPChannel(Mockito.mock(UDPChannel.class)); nodesWithMessage = peerExplorer.startConversationWithNewNodes(); @@ -93,7 +94,7 @@ public void sendInitialMessageToNodes() { Node node = new Node(new ECKey().getNodeId(), HOST_1, PORT_1); NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node); - PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, new ECKey(), TIMEOUT, REFRESH); + PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, new ECKey(), TIMEOUT, UPDATE, CLEAN); UDPChannel channel = new UDPChannel(Mockito.mock(Channel.class), peerExplorer); peerExplorer.setUDPChannel(channel); @@ -113,7 +114,7 @@ public void handlePingMessage() throws Exception { Node node = new Node(key2.getNodeId(), HOST_2, PORT_2); NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node); - PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, REFRESH); + PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, UPDATE, CLEAN); Channel internalChannel = Mockito.mock(Channel.class); UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer); @@ -168,7 +169,7 @@ public void handlePongMessage() throws Exception { Node node = new Node(key2.getNodeId(), HOST_2, PORT_2); NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node); - PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, REFRESH); + PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, UPDATE, CLEAN); Channel internalChannel = Mockito.mock(Channel.class); UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer); @@ -212,7 +213,7 @@ public void handleFindNodeMessage() throws Exception { Node node = new Node(key2.getNodeId(), HOST_2, PORT_2); NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node); - PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, REFRESH); + PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, UPDATE, CLEAN); Channel internalChannel = Mockito.mock(Channel.class); UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer); @@ -259,7 +260,7 @@ public void handleFindNodeMessageWithExtraNodes() throws Exception { Node node = new Node(key2.getNodeId(), HOST_2, PORT_2); NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node); - PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, REFRESH); + PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, TIMEOUT, UPDATE, CLEAN); Channel internalChannel = Mockito.mock(Channel.class); UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer); @@ -315,7 +316,7 @@ public void handleNeighbors() throws Exception { Node node1 = new Node(key1.getNodeId(), HOST_1, PORT_1); Node node2 = new Node(key2.getNodeId(), HOST_2, PORT_2); NodeDistanceTable distanceTable = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node2); - PeerExplorer peerExplorer = new PeerExplorer(nodes, node2, distanceTable, key2, TIMEOUT, REFRESH); + PeerExplorer peerExplorer = new PeerExplorer(nodes, node2, distanceTable, key2, TIMEOUT, UPDATE, CLEAN); Channel internalChannel = Mockito.mock(Channel.class); UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer); diff --git a/rskj-core/src/test/java/co/rsk/net/discovery/UDPServerTest.java b/rskj-core/src/test/java/co/rsk/net/discovery/UDPServerTest.java index 756c925ea24..1b12c6840a1 100644 --- a/rskj-core/src/test/java/co/rsk/net/discovery/UDPServerTest.java +++ b/rskj-core/src/test/java/co/rsk/net/discovery/UDPServerTest.java @@ -51,7 +51,8 @@ public class UDPServerTest { private static final int PORT_3 = 40307; private static final long TIMEOUT = 30000; - private static final long REFRESH = 60000; + private static final long UPDATE = 60000; + private static final long CLEAN = 60000; @Test public void run3NodesFullTest() throws InterruptedException { @@ -76,9 +77,9 @@ public void run3NodesFullTest() throws InterruptedException { NodeDistanceTable distanceTable2 = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node2); NodeDistanceTable distanceTable3 = new NodeDistanceTable(KademliaOptions.BINS, KademliaOptions.BUCKET_SIZE, node3); - PeerExplorer peerExplorer1 = new PeerExplorer(node1BootNode, node1, distanceTable1, key1, TIMEOUT, REFRESH); - PeerExplorer peerExplorer2 = new PeerExplorer(node2BootNode, node2, distanceTable2, key2, TIMEOUT, REFRESH); - PeerExplorer peerExplorer3 = new PeerExplorer(node3BootNode, node3, distanceTable3, key3, TIMEOUT, REFRESH); + PeerExplorer peerExplorer1 = new PeerExplorer(node1BootNode, node1, distanceTable1, key1, TIMEOUT, UPDATE, CLEAN); + PeerExplorer peerExplorer2 = new PeerExplorer(node2BootNode, node2, distanceTable2, key2, TIMEOUT, UPDATE, CLEAN); + PeerExplorer peerExplorer3 = new PeerExplorer(node3BootNode, node3, distanceTable3, key3, TIMEOUT, UPDATE, CLEAN); Assert.assertEquals(0, peerExplorer1.getNodes().size()); Assert.assertEquals(0, peerExplorer2.getNodes().size()); From d3261cf796050506d9711e4664324ce2985159d9 Mon Sep 17 00:00:00 2001 From: Lisandro Date: Wed, 13 Jun 2018 14:25:02 -0300 Subject: [PATCH 3/3] Test to check after cleanup active challenges are removed --- .../net/discovery/NodeChallengeManager.java | 6 ++ .../co/rsk/net/discovery/PeerExplorer.java | 11 ++-- .../net/discovery/PeerExplorerCleaner.java | 2 +- .../rsk/net/discovery/PeerExplorerTest.java | 59 +++++++++++++++++++ .../co/rsk/net/discovery/UDPServerTest.java | 18 ++++-- 5 files changed, 83 insertions(+), 13 deletions(-) diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallengeManager.java b/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallengeManager.java index d67bf465878..ef2878b5b14 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallengeManager.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallengeManager.java @@ -19,6 +19,7 @@ package co.rsk.net.discovery; import co.rsk.net.discovery.message.PingPeerMessage; +import com.google.common.annotations.VisibleForTesting; import org.ethereum.net.rlpx.Node; import java.util.Map; @@ -41,4 +42,9 @@ public NodeChallenge startChallenge(Node challengedNode, Node challenger, PeerEx public NodeChallenge removeChallenge(String challengeId) { return activeChallenges.remove(challengeId); } + + @VisibleForTesting + public int activeChallengesCount() { + return activeChallenges.size(); + } } diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java index 42855b12ade..09f98f5e8ae 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java @@ -270,12 +270,6 @@ public void purgeRequests() { removeExpiredRequests(this.pendingFindNodeRequests); } - @VisibleForTesting - public void cleanAndUpdate() { - this.update(); - this.clean(); - } - public void clean() { this.purgeRequests(); } @@ -371,4 +365,9 @@ private Set collectRandomNodes(List originalList, int elementsNbr) { return ret; } + + @VisibleForTesting + public NodeChallengeManager getChallengeManager() { + return challengeManager; + } } diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorerCleaner.java b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorerCleaner.java index eedb0e47ef5..85ec4f454cc 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorerCleaner.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorerCleaner.java @@ -37,7 +37,7 @@ public PeerExplorerCleaner(PeerExplorer peerExplorer, long updatePeriod, long cl this.peerExplorer = peerExplorer; this.updatePeriod = updatePeriod; this.cleanPeriod = cleanPeriod; - + // it should stay on a single thread since there are two tasks that could interfere with each other running here this.updateTask = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "PeerExplorerCleaner")); } diff --git a/rskj-core/src/test/java/co/rsk/net/discovery/PeerExplorerTest.java b/rskj-core/src/test/java/co/rsk/net/discovery/PeerExplorerTest.java index 73e7b16a3cf..cbfa0b78b60 100644 --- a/rskj-core/src/test/java/co/rsk/net/discovery/PeerExplorerTest.java +++ b/rskj-core/src/test/java/co/rsk/net/discovery/PeerExplorerTest.java @@ -361,4 +361,63 @@ public void handleNeighbors() throws Exception { Assert.assertEquals(DiscoveryMessageType.PING, discoveryEvent.getMessage().getMessageType()); } + @Test + public void testCleanPeriod() throws InterruptedException, Exception{ + List nodes = new ArrayList<>(); + nodes.add(HOST_1 + ":" + PORT_1); + nodes.add(HOST_3 + ":" + PORT_3); + + ECKey key1 = ECKey.fromPrivate(Hex.decode(KEY_1)).decompress(); + ECKey key2 = ECKey.fromPrivate(Hex.decode(KEY_2)).decompress(); + ECKey key3 = ECKey.fromPrivate(Hex.decode(KEY_3)).decompress(); + + Node node = new Node(key2.getNodeId(), HOST_2, PORT_2); + NodeDistanceTable distanceTable = new NodeDistanceTable(1, 1, node); + PeerExplorer peerExplorer = new PeerExplorer(nodes, node, distanceTable, key2, 199, UPDATE, 200); + + Channel internalChannel = Mockito.mock(Channel.class); + UDPTestChannel channel = new UDPTestChannel(internalChannel, peerExplorer); + ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); + peerExplorer.setUDPChannel(channel); + Assert.assertTrue(CollectionUtils.isEmpty(peerExplorer.getNodes())); + + //A incoming pong for a Ping we did not sent. + String check = UUID.randomUUID().toString(); + PongPeerMessage incomingPongMessage = PongPeerMessage.create(HOST_1, PORT_1, check, key1); + DiscoveryEvent incomingPongEvent = new DiscoveryEvent(incomingPongMessage, new InetSocketAddress(HOST_1, PORT_1)); + channel.clearEvents(); + channel.channelRead0(ctx, incomingPongEvent); + List sentEvents = channel.getEventsWritten(); + Assert.assertEquals(0, sentEvents.size()); + Assert.assertEquals(0, peerExplorer.getNodes().size()); + + //Now we send the ping first + peerExplorer.startConversationWithNewNodes(); + sentEvents = channel.getEventsWritten(); + Assert.assertEquals(2, sentEvents.size()); + incomingPongMessage = PongPeerMessage.create(HOST_1, PORT_1, ((PingPeerMessage) sentEvents.get(0).getMessage()).getMessageId(), key1); + incomingPongEvent = new DiscoveryEvent(incomingPongMessage, new InetSocketAddress(HOST_1, PORT_1)); + PongPeerMessage incomingPongMessage3 = PongPeerMessage.create(HOST_3, PORT_3, ((PingPeerMessage) sentEvents.get(1).getMessage()).getMessageId(), key3); + DiscoveryEvent incomingPongEvent3 = new DiscoveryEvent(incomingPongMessage3, new InetSocketAddress(HOST_3, PORT_3)); + channel.clearEvents(); + List addedNodes = peerExplorer.getNodes(); + Assert.assertEquals(0, addedNodes.size()); + channel.channelRead0(ctx, incomingPongEvent); + Assert.assertEquals(1, peerExplorer.getNodes().size()); + addedNodes = peerExplorer.getNodes(); + Assert.assertEquals(1, addedNodes.size()); + + channel.channelRead0(ctx, incomingPongEvent3); + Assert.assertEquals(1, peerExplorer.getNodes().size()); + addedNodes = peerExplorer.getNodes(); + Assert.assertEquals(1, addedNodes.size()); + + Assert.assertEquals(1, peerExplorer.getChallengeManager().activeChallengesCount()); + Thread.sleep(200L); + peerExplorer.clean(); + peerExplorer.clean(); + peerExplorer.clean(); + Assert.assertEquals(0, peerExplorer.getChallengeManager().activeChallengesCount()); + } + } diff --git a/rskj-core/src/test/java/co/rsk/net/discovery/UDPServerTest.java b/rskj-core/src/test/java/co/rsk/net/discovery/UDPServerTest.java index 1b12c6840a1..e08b3d6f12e 100644 --- a/rskj-core/src/test/java/co/rsk/net/discovery/UDPServerTest.java +++ b/rskj-core/src/test/java/co/rsk/net/discovery/UDPServerTest.java @@ -91,18 +91,24 @@ public void run3NodesFullTest() throws InterruptedException { udpServer3.start(); TimeUnit.SECONDS.sleep(2); - peerExplorer3.cleanAndUpdate(); + peerExplorer3.update(); + peerExplorer3.clean(); udpServer2.start(); TimeUnit.SECONDS.sleep(2); - peerExplorer2.cleanAndUpdate(); - peerExplorer3.cleanAndUpdate(); + peerExplorer2.update(); + peerExplorer2.clean(); + peerExplorer3.update(); + peerExplorer3.clean(); udpServer1.start(); TimeUnit.SECONDS.sleep(2); - peerExplorer1.cleanAndUpdate(); - peerExplorer2.cleanAndUpdate(); - peerExplorer3.cleanAndUpdate(); + peerExplorer1.update(); + peerExplorer1.clean(); + peerExplorer2.update(); + peerExplorer2.clean(); + peerExplorer3.update(); + peerExplorer3.clean(); TimeUnit.SECONDS.sleep(2);