Skip to content
This repository was archived by the owner on Apr 13, 2022. It is now read-only.

Minor updates for networking #369

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/main/scala/scorex/core/network/NetworkController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ class NetworkController(settings: NetworkSettings,
log.info(s"Declared address: ${scorexContext.externalNodeAddress}")

//bind to listen incoming connections
tcpManager ! Bind(self, bindAddress, options = Nil, pullMode = false)
tcpManager ! Bind(self, bindAddress, options = Nil, pullMode = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I am misunderstanding, I thought pull mode was purposely disabled here since to enable it would cause the network controller to respond to the TCP manager each time a new connection is attempted. But since each attempt to connect to peers is independent, there is no need to block subsequent connection requests.

Relevant documentation on pullMode in bind command
https://doc.akka.io/docs/akka/2.6.9/io-tcp.html?language=java#pull-mode-reading-for-inbound-connections

pullMode on connect makes sense to me as we want the PeerConnectionHandlers between remote peers to fully process and acknowledge messages to maintain a stable connection.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late reply. Pull mode is supposed to throttle reading, not to throttle the number of connection requests. However, in Akka, both are throttled, so pull mode was disabled on purpose to not throttle the number of incoming connections.

In this pull request, pull mode is enabled to throttle only reading, and at the same time, sending a message to TCP manager to always accept new connections.


override def receive: Receive =
bindingLogic orElse
override def receive: Receive = bindingLogic

def mainLogic(tcpListener: ActorRef): Receive =
businessLogic orElse
peerCommands orElse
connectionEvents orElse
Expand All @@ -80,6 +81,7 @@ class NetworkController(settings: NetworkSettings,
case Bound(_) =>
log.info("Successfully bound to the port " + settings.bindAddress.getPort)
scheduleConnectionToPeer()
context become mainLogic(sender())

case CommandFailed(_: Bind) =>
log.error("Network port " + settings.bindAddress.getPort + " already in use!")
Expand Down Expand Up @@ -128,10 +130,12 @@ class NetworkController(settings: NetworkSettings,
log.info(s"Unconfirmed connection: ($remoteAddress, $localAddress) => $connectionId")
if (connectionDirection.isOutgoing) createPeerConnectionHandler(connectionId, sender())
else peerManagerRef ! ConfirmConnection(connectionId, sender())
tcpManager ! Tcp.ResumeAccepting(1)

case Connected(remoteAddress, _) =>
log.warn(s"Connection to peer $remoteAddress is already established")
sender() ! Close
tcpManager ! Tcp.ResumeAccepting(1)

case ConnectionConfirmed(connectionId, handlerRef) =>
log.info(s"Connection confirmed to $connectionId")
Expand Down Expand Up @@ -192,7 +196,7 @@ class NetworkController(settings: NetworkSettings,
*/
private def scheduleConnectionToPeer(): Unit = {
context.system.scheduler.schedule(5.seconds, 5.seconds) {
if (connections.size < settings.maxConnections) {
if (connections.size + unconfirmedConnections.size < settings.maxConnections) {
val randomPeerF = peerManagerRef ? RandomPeerExcluding(connections.values.flatMap(_.peerInfo).toSeq)
randomPeerF.mapTo[Option[PeerInfo]].foreach { peerInfoOpt =>
peerInfoOpt.foreach(peerInfo => self ! ConnectTo(peerInfo))
Expand Down Expand Up @@ -282,7 +286,9 @@ class NetworkController(settings: NetworkSettings,
} else {
peerManagerRef ! AddOrUpdatePeer(peerInfo)

val updatedConnectedPeer = connectedPeer.copy(peerInfo = Some(peerInfo))
val updatedPeerSpec = peerInfo.peerSpec.copy(declaredAddress = Some(peerInfo.peerSpec.address.getOrElse(remoteAddress)))
val updatedPeerInfo = peerInfo.copy(peerSpec = updatedPeerSpec)
val updatedConnectedPeer = connectedPeer.copy(peerInfo = Some(updatedPeerInfo))
connections += remoteAddress -> updatedConnectedPeer
context.system.eventStream.publish(HandshakedPeer(updatedConnectedPeer))
}
Expand Down