-
Notifications
You must be signed in to change notification settings - Fork 160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP - Cluster bootstrap remoting probe method #546
Open
jroper
wants to merge
1
commit into
akka:main
Choose a base branch
from
jroper:cluster-bootstrap-remoting
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
122 changes: 122 additions & 0 deletions
122
...main/scala/akka/management/cluster/bootstrap/internal/AbstractContactPointBootstrap.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com> | ||
*/ | ||
|
||
package akka.management.cluster.bootstrap.internal | ||
|
||
import java.time.LocalDateTime | ||
import java.util.concurrent.ThreadLocalRandom | ||
|
||
import akka.actor.{Actor, ActorLogging, DeadLetterSuppression, Status, Timers} | ||
import akka.annotation.InternalApi | ||
import akka.discovery.ServiceDiscovery.ResolvedTarget | ||
import akka.management.cluster.bootstrap.ClusterBootstrapSettings | ||
import akka.util.Timeout | ||
import akka.pattern.pipe | ||
|
||
import scala.concurrent.Future | ||
import scala.concurrent.duration._ | ||
|
||
@InternalApi | ||
private[bootstrap] object AbstractContactPointBootstrap { | ||
|
||
private case object ProbeTick extends DeadLetterSuppression | ||
private val ProbingTimerKey = "probing-key" | ||
} | ||
|
||
|
||
/** | ||
* Intended to be spawned as child actor by a higher-level Bootstrap coordinator that manages obtaining of the URIs. | ||
* | ||
* This additional step may at-first seem superficial -- after all, we already have some addresses of the nodes | ||
* that we'll want to join -- however it is not optional. By communicating with the actual nodes before joining their | ||
* cluster we're able to inquire about their status, double-check if perhaps they are part of an existing cluster already | ||
* that we should join, or even coordinate rolling upgrades or more advanced patterns. | ||
*/ | ||
@InternalApi | ||
private[bootstrap] abstract class AbstractContactPointBootstrap( | ||
settings: ClusterBootstrapSettings, | ||
contactPoint: ResolvedTarget | ||
) extends Actor | ||
with ActorLogging | ||
with Timers { | ||
|
||
import AbstractContactPointBootstrap.ProbeTick | ||
import AbstractContactPointBootstrap.ProbingTimerKey | ||
import akka.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol._ | ||
import context.dispatcher | ||
|
||
private val probeInterval = settings.contactPoint.probeInterval | ||
private implicit val probingFailureTimeout: Timeout = Timeout(settings.contactPoint.probingFailureTimeout) | ||
|
||
/** | ||
* If probing keeps failing until the deadline triggers, we notify the parent, | ||
* such that it rediscover again. | ||
*/ | ||
private var probingKeepFailingDeadline: Deadline = settings.contactPoint.probingFailureTimeout.fromNow | ||
|
||
private def resetProbingKeepFailingWithinDeadline(): Unit = | ||
probingKeepFailingDeadline = settings.contactPoint.probingFailureTimeout.fromNow | ||
|
||
override final def preStart(): Unit = | ||
self ! ProbeTick | ||
|
||
override final def receive: Receive = { | ||
case ProbeTick ⇒ | ||
log.debug("Probing [{}] for seed nodes...", uri) | ||
probe() pipeTo self | ||
|
||
case Status.Failure(cause) => | ||
log.warning("Probing [{}] failed due to: {}", uri, cause.getMessage) | ||
if (probingKeepFailingDeadline.isOverdue()) { | ||
log.error("Overdue of probing-failure-timeout, stop probing, signaling that it's failed") | ||
context.parent ! BootstrapCoordinator.Protocol.ProbingFailed(contactPoint, cause) | ||
context.stop(self) | ||
} else { | ||
// keep probing, hoping the request will eventually succeed | ||
scheduleNextContactPointProbing() | ||
} | ||
|
||
case response: SeedNodes ⇒ | ||
notifyParentAboutSeedNodes(response) | ||
resetProbingKeepFailingWithinDeadline() | ||
// we keep probing and looking if maybe a cluster does form after all | ||
// (technically could be long polling or web-sockets, but that would need reconnect logic, so this is simpler) | ||
scheduleNextContactPointProbing() | ||
} | ||
|
||
/** | ||
* Probe the contact point. | ||
* | ||
* @param probingFailureTimeout A timeout, if not replied within this timeout, the returned Future should fail. | ||
* @return A future of the seed nodes. | ||
*/ | ||
protected def probe()(implicit probingFailureTimeout: Timeout): Future[SeedNodes] | ||
|
||
/** | ||
* Render the URI of the contact point as a string. | ||
* | ||
* This is used for logging purposes. | ||
*/ | ||
protected def uri: String | ||
|
||
private def notifyParentAboutSeedNodes(members: SeedNodes): Unit = { | ||
val seedAddresses = members.seedNodes.map(_.node) | ||
context.parent ! BootstrapCoordinator.Protocol.ObtainedHttpSeedNodesObservation(timeNow(), contactPoint, | ||
members.selfNode, seedAddresses) | ||
} | ||
|
||
private def scheduleNextContactPointProbing(): Unit = | ||
timers.startSingleTimer(ProbingTimerKey, ProbeTick, effectiveProbeInterval()) | ||
|
||
/** Duration with configured jitter applied */ | ||
private def effectiveProbeInterval(): FiniteDuration = | ||
probeInterval + jitter(probeInterval) | ||
|
||
def jitter(d: FiniteDuration): FiniteDuration = | ||
(d.toMillis * settings.contactPoint.probeIntervalJitter * ThreadLocalRandom.current().nextDouble()).millis | ||
|
||
protected def timeNow(): LocalDateTime = | ||
LocalDateTime.now() | ||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we want to support both methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You won't be able to do a rolling upgrade to the new method if you don't support both concurrently.