From 081318d2fdb9f25ad5b50e7df4a8f21785f01a63 Mon Sep 17 00:00:00 2001 From: Saloni Shah Date: Fri, 13 Sep 2024 11:46:49 -0400 Subject: [PATCH] [WX-1835] Scheduled logging for list of groups experiencing quota exhaustion (#7539) --- .../backend/standard/GroupMetricsActor.scala | 52 +++++++++++++++---- .../standard/GroupMetricsActorSpec.scala | 4 +- core/src/main/resources/reference.conf | 2 + .../cromwell/server/CromwellRootActor.scala | 7 ++- .../tokens/JobTokenDispenserActorSpec.scala | 13 +++-- 5 files changed, 62 insertions(+), 16 deletions(-) diff --git a/backend/src/main/scala/cromwell/backend/standard/GroupMetricsActor.scala b/backend/src/main/scala/cromwell/backend/standard/GroupMetricsActor.scala index 2e71a3e36a0..6a0626c2817 100644 --- a/backend/src/main/scala/cromwell/backend/standard/GroupMetricsActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/GroupMetricsActor.scala @@ -11,14 +11,24 @@ import cromwell.database.sql.SqlConverters.OffsetDateTimeToSystemTimestamp import cromwell.database.sql.tables.GroupMetricsEntry import java.time.OffsetDateTime +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration import scala.util.{Failure, Success} -class GroupMetricsActor(engineDbInterface: EngineSqlDatabase, quotaExhaustionThresholdInMins: Long) - extends Actor +class GroupMetricsActor(engineDbInterface: EngineSqlDatabase, + quotaExhaustionThresholdInMins: Long, + loggingInterval: FiniteDuration +) extends Actor with ActorLogging { implicit val ec: MessageDispatcher = context.system.dispatchers.lookup(Dispatcher.EngineDispatcher) + log.info( + s"${this.getClass.getSimpleName} configured to log groups experiencing quota exhaustion at interval of ${loggingInterval.toString()}." + ) + // initial schedule for logging exhausted groups + context.system.scheduler.scheduleOnce(loggingInterval)(self ! LogQuotaExhaustedGroups) + override def receive: Receive = { case RecordGroupQuotaExhaustion(group) => val groupMetricsEntry = GroupMetricsEntry(group, OffsetDateTime.now.toSystemTimestamp) @@ -26,19 +36,38 @@ class GroupMetricsActor(engineDbInterface: EngineSqlDatabase, quotaExhaustionThr () case GetQuotaExhaustedGroups => val respondTo: ActorRef = sender() - - // for a group in the GROUP_METRICS_ENTRY table, if the 'quota_exhaustion_detected' timestamp hasn't - // been updated in last X minutes it is no longer experiencing cloud quota exhaustion - val currentTimestampMinusDelay = OffsetDateTime.now().minusMinutes(quotaExhaustionThresholdInMins) - engineDbInterface.getQuotaExhaustedGroups(currentTimestampMinusDelay.toSystemTimestamp) onComplete { + getQuotaExhaustedGroups() onComplete { case Success(quotaExhaustedGroups) => respondTo ! GetQuotaExhaustedGroupsSuccess(quotaExhaustedGroups.toList) case Failure(exception) => respondTo ! GetQuotaExhaustedGroupsFailure(exception.getMessage) } + case LogQuotaExhaustedGroups => + getQuotaExhaustedGroups() onComplete { + case Success(quotaExhaustedGroups) => + log.info( + s"Hog groups currently experiencing quota exhaustion: ${quotaExhaustedGroups.length}. Group IDs: [${quotaExhaustedGroups.toList + .mkString(", ")}]." + ) + case Failure(exception) => + log.info( + s"Something went wrong when fetching quota exhausted groups for logging. Will retry in ${loggingInterval + .toString()}. Exception: ${exception.getMessage}" + ) + } + // schedule next logging + context.system.scheduler.scheduleOnce(loggingInterval)(self ! LogQuotaExhaustedGroups) + () case other => log.error( s"Programmer Error: Unexpected message ${other.toPrettyElidedString(1000)} received by ${this.self.path.name}." ) } + + private def getQuotaExhaustedGroups(): Future[Seq[String]] = { + // for a group in the GROUP_METRICS_ENTRY table, if the 'quota_exhaustion_detected' timestamp hasn't + // been updated in last X minutes it is no longer experiencing cloud quota exhaustion + val currentTimestampMinusDelay = OffsetDateTime.now().minusMinutes(quotaExhaustionThresholdInMins) + engineDbInterface.getQuotaExhaustedGroups(currentTimestampMinusDelay.toSystemTimestamp) + } } object GroupMetricsActor { @@ -47,12 +76,17 @@ object GroupMetricsActor { sealed trait GroupMetricsActorMessage case class RecordGroupQuotaExhaustion(group: String) extends GroupMetricsActorMessage case object GetQuotaExhaustedGroups extends GroupMetricsActorMessage + case object LogQuotaExhaustedGroups extends GroupMetricsActorMessage // Responses sealed trait GetQuotaExhaustedGroupsResponse case class GetQuotaExhaustedGroupsSuccess(quotaExhaustedGroups: List[String]) extends GetQuotaExhaustedGroupsResponse case class GetQuotaExhaustedGroupsFailure(errorMsg: String) extends GetQuotaExhaustedGroupsResponse - def props(engineDbInterface: EngineSqlDatabase, quotaExhaustionThresholdInMins: Long): Props = - Props(new GroupMetricsActor(engineDbInterface, quotaExhaustionThresholdInMins)).withDispatcher(EngineDispatcher) + def props(engineDbInterface: EngineSqlDatabase, + quotaExhaustionThresholdInMins: Long, + loggingInterval: FiniteDuration + ): Props = + Props(new GroupMetricsActor(engineDbInterface, quotaExhaustionThresholdInMins, loggingInterval)) + .withDispatcher(EngineDispatcher) } diff --git a/backend/src/test/scala/cromwell/backend/standard/GroupMetricsActorSpec.scala b/backend/src/test/scala/cromwell/backend/standard/GroupMetricsActorSpec.scala index 39002c7ff6b..fb0007fbd27 100644 --- a/backend/src/test/scala/cromwell/backend/standard/GroupMetricsActorSpec.scala +++ b/backend/src/test/scala/cromwell/backend/standard/GroupMetricsActorSpec.scala @@ -47,7 +47,7 @@ class GroupMetricsActorSpec extends AnyFlatSpec with Matchers { it should "receive new quota exhaustion message and call database function" in { val db = databaseInterface() - val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db, 15)) + val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db, 15, 5.minutes)) mockGroupMetricsActor.tell(RecordGroupQuotaExhaustion(testHogGroup), TestProbe().ref) @@ -58,7 +58,7 @@ class GroupMetricsActorSpec extends AnyFlatSpec with Matchers { it should "respond with groups in quota exhaustion" in { val db = databaseInterface() - val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db, 15)) + val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db, 15, 5.minutes)) val requestActor = TestProbe() mockGroupMetricsActor.tell(GetQuotaExhaustedGroups, requestActor.ref) diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 90ac2d0906f..465f43ad7a1 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -277,6 +277,8 @@ system { # threshold (in minutes) after which a group in GROUP_METRICS_ENTRY table is no longer considered to be # actively experiencing quota exhaustion threshold-minutes = 15 + # logging interval for which groups are in active quota exhaustion state + logging-interval = 5 minutes } workflow-heartbeats { diff --git a/engine/src/main/scala/cromwell/server/CromwellRootActor.scala b/engine/src/main/scala/cromwell/server/CromwellRootActor.scala index 8775ca37565..073990d699e 100644 --- a/engine/src/main/scala/cromwell/server/CromwellRootActor.scala +++ b/engine/src/main/scala/cromwell/server/CromwellRootActor.scala @@ -210,9 +210,14 @@ abstract class CromwellRootActor(terminator: CromwellTerminator, systemConfig.as[Option[Boolean]]("quota-exhaustion-job-start-control.enabled").getOrElse(false) private lazy val quotaExhaustionThresholdInMins: Long = systemConfig.as[Option[Long]]("quota-exhaustion-job-start-control.threshold-minutes").getOrElse(15) + private lazy val quotaExhaustionLoggingInterval: FiniteDuration = + systemConfig.as[Option[FiniteDuration]]("quota-exhaustion-job-start-control.logging-interval").getOrElse(5.minutes) private lazy val groupMetricsActor: ActorRef = context.actorOf( - GroupMetricsActor.props(EngineServicesStore.engineDatabaseInterface, quotaExhaustionThresholdInMins) + GroupMetricsActor.props(EngineServicesStore.engineDatabaseInterface, + quotaExhaustionThresholdInMins, + quotaExhaustionLoggingInterval + ) ) private lazy val groupMetricsActorForJTDA: Option[ActorRef] = if (quotaExhaustionJobControlEnabled) Option(groupMetricsActor) else None diff --git a/engine/src/test/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActorSpec.scala b/engine/src/test/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActorSpec.scala index f434682c440..4ef5c1d1ba1 100644 --- a/engine/src/test/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActorSpec.scala +++ b/engine/src/test/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActorSpec.scala @@ -3,7 +3,11 @@ package cromwell.engine.workflow.tokens import akka.actor.{ActorRef, PoisonPill, Props} import akka.testkit.{ImplicitSender, TestActorRef, TestProbe} import cromwell.backend.standard.GroupMetricsActor -import cromwell.backend.standard.GroupMetricsActor.{GetQuotaExhaustedGroups, GetQuotaExhaustedGroupsSuccess} +import cromwell.backend.standard.GroupMetricsActor.{ + GetQuotaExhaustedGroups, + GetQuotaExhaustedGroupsSuccess, + LogQuotaExhaustedGroups +} import cromwell.core.JobToken.JobTokenType import cromwell.core.{HogGroup, TestKitSuite} import cromwell.engine.workflow.tokens.DynamicRateLimiter.{Rate, TokensAvailable} @@ -572,8 +576,9 @@ object JobTokenDispenserActorSpec { val LimitedTo5Tokens: JobTokenType = limitedTokenType(5) } -class TestGroupMetricsActorForJTDA extends GroupMetricsActor(engineDatabaseInterface, 15) { - override def receive: Receive = { case GetQuotaExhaustedGroups => - sender() ! GetQuotaExhaustedGroupsSuccess(List(quotaExhaustedHogGroup.value)) +class TestGroupMetricsActorForJTDA extends GroupMetricsActor(engineDatabaseInterface, 15, 10.minutes) { + override def receive: Receive = { + case GetQuotaExhaustedGroups => sender() ! GetQuotaExhaustedGroupsSuccess(List(quotaExhaustedHogGroup.value)) + case LogQuotaExhaustedGroups => () } }