Skip to content

Commit

Permalink
Make Artery connect failure logging less verbose, akka#26865 (akka#27073
Browse files Browse the repository at this point in the history
)

* silence "Restarting graph due to failure" logging by RestartFlow, akka#26865
* only log first error in connectionFlowWithRestart as warning and then debug
* avoid double logging of connection refused
* mima
  • Loading branch information
patriknw authored Jun 14, 2019
1 parent 31def70 commit af525fd
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 12 deletions.
15 changes: 13 additions & 2 deletions akka-remote/src/main/scala/akka/remote/artery/Association.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.remote.artery

import java.net.ConnectException

import akka.util.PrettyDuration._
import java.util.Queue
import java.util.concurrent.CountDownLatch
Expand All @@ -16,6 +18,7 @@ import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._

import akka.{ Done, NotUsed }
import akka.actor.ActorRef
import akka.actor.ActorSelectionMessage
Expand All @@ -42,8 +45,8 @@ import akka.stream.scaladsl.Source
import akka.util.{ OptionVal, Unsafe, WildcardIndex }
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
import akka.stream.SharedKillSwitch

import scala.util.control.NoStackTrace

import akka.actor.Cancellable
import akka.stream.StreamTcpException
import akka.util.ccompat._
Expand Down Expand Up @@ -939,6 +942,10 @@ private[remote] class Association(
}
}

def isConnectException: Boolean =
cause.isInstanceOf[StreamTcpException] && cause.getCause != null && cause.getCause
.isInstanceOf[ConnectException]

if (stoppedIdle) {
log.debug("{} to [{}] was idle and stopped. It will be restarted if used again.", streamName, remoteAddress)
lazyRestart()
Expand All @@ -949,7 +956,11 @@ private[remote] class Association(
remoteAddress)
lazyRestart()
} else if (bypassRestartCounter || restartCounter.restart()) {
log.error(cause, "{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
// ConnectException may happen repeatedly and are already logged in connectionFlowWithRestart
if (isConnectException)
log.debug("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
else
log.warning("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
lazyRestart()
} else {
log.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.remote.artery
package tcp

import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.immutable
import scala.concurrent.Await
Expand Down Expand Up @@ -134,7 +135,10 @@ private[remote] class ArteryTcpTransport(
}

def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = {
val restartCount = new AtomicInteger(0)

val flowFactory = () => {
val onFailureLogLevel = if (restartCount.incrementAndGet() == 1) Logging.WarningLevel else Logging.DebugLevel

def flow(controlIdleKillSwitch: OptionVal[SharedKillSwitch]) =
Flow[ByteString]
Expand All @@ -153,7 +157,7 @@ private[remote] class ArteryTcpTransport(
}))
.recoverWithRetries(1, { case ArteryTransport.ShutdownSignal => Source.empty })
.log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream")
.addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = Logging.WarningLevel))
.addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = onFailureLogLevel))

if (streamId == ControlStreamId) {
// must replace the KillSwitch when restarted
Expand All @@ -168,11 +172,15 @@ private[remote] class ArteryTcpTransport(
// Restart of inner connection part important in control stream, since system messages
// are buffered and resent from the outer SystemMessageDelivery stage. No maxRestarts limit for control
// stream. For message stream it's best effort retry a few times.
RestartFlow.withBackoff[ByteString, ByteString](
settings.Advanced.OutboundRestartBackoff,
settings.Advanced.OutboundRestartBackoff * 5,
0.1,
maxRestarts)(flowFactory)
RestartFlow
.withBackoff[ByteString, ByteString](
settings.Advanced.OutboundRestartBackoff,
settings.Advanced.OutboundRestartBackoff * 5,
0.1,
maxRestarts)(flowFactory)
// silence "Restarting graph due to failure" logging by RestartFlow
.addAttributes(Attributes.logLevels(onFailure = LogLevels.Off))

}

Flow[EnvelopeBuffer]
Expand Down
3 changes: 3 additions & 0 deletions akka-stream/src/main/mima-filters/2.5.x.backwards.excludes
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SourceQueueAdapter
# Remove deprecated features since 2.5.0 https://github.com/akka/akka/issues/26492
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorMaterializerSettings.withAutoFusing")

# Add attributes to constructor of internal class
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.RestartWithBackoffLogic.this")

# #26910 scheduleWithFixedDelay vs scheduleAtFixedRate
# Adding methods to Materializer is not compatible but we don't support other Materializer implementations
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.scheduleAtFixedRate")
Expand Down
22 changes: 19 additions & 3 deletions akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import akka.stream._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
import akka.stream.stage._

import scala.concurrent.duration._

import akka.stream.Attributes.LogLevels

/**
* A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
*
Expand Down Expand Up @@ -151,7 +152,15 @@ private final class RestartWithBackoffFlow[In, Out](
override def shape = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes) =
new RestartWithBackoffLogic("Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) {
new RestartWithBackoffLogic(
"Flow",
shape,
inheritedAttributes,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures,
maxRestarts) {
val delay = inheritedAttributes.get[Delay](Delay(50.millis)).duration

var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None
Expand Down Expand Up @@ -207,6 +216,7 @@ private final class RestartWithBackoffFlow[In, Out](
private abstract class RestartWithBackoffLogic[S <: Shape](
name: String,
shape: S,
inheritedAttributes: Attributes,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
Expand All @@ -223,6 +233,11 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
protected def startGraph(): Unit
protected def backoff(): Unit

private def loggingEnabled = inheritedAttributes.get[LogLevels] match {
case Some(levels) => levels.onFailure != LogLevels.Off
case None => true
}

/**
* @param out The permanent outlet
* @return A sub sink inlet that's sink is attached to the wrapped operator
Expand All @@ -248,7 +263,8 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
if (finishing || maxRestartsReached()) {
fail(out, ex)
} else {
log.warning("Restarting graph due to failure. stack_trace: {}", Logging.stackTraceFor(ex))
if (loggingEnabled)
log.warning("Restarting graph due to failure. stack_trace: {}", Logging.stackTraceFor(ex))
scheduleRestartTimer()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private final class RestartWithBackoffSink[T](
new RestartWithBackoffLogic(
"Sink",
shape,
inheritedAttributes,
minBackoff,
maxBackoff,
randomFactor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,15 @@ private final class RestartWithBackoffSource[T](

override def shape = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes) =
new RestartWithBackoffLogic("Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) {
new RestartWithBackoffLogic(
"Source",
shape,
inheritedAttributes,
minBackoff,
maxBackoff,
randomFactor,
onlyOnFailures,
maxRestarts) {

override protected def logSource = self.getClass

Expand Down

0 comments on commit af525fd

Please sign in to comment.