Skip to content

Commit

Permalink
[WX-1835] Scheduled logging for list of groups experiencing quota exh…
Browse files Browse the repository at this point in the history
…austion (#7539)
  • Loading branch information
salonishah11 committed Sep 13, 2024
1 parent 6f1f9e5 commit 081318d
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,63 @@ import cromwell.database.sql.SqlConverters.OffsetDateTimeToSystemTimestamp
import cromwell.database.sql.tables.GroupMetricsEntry

import java.time.OffsetDateTime
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

class GroupMetricsActor(engineDbInterface: EngineSqlDatabase, quotaExhaustionThresholdInMins: Long)
extends Actor
class GroupMetricsActor(engineDbInterface: EngineSqlDatabase,
quotaExhaustionThresholdInMins: Long,
loggingInterval: FiniteDuration
) extends Actor
with ActorLogging {

implicit val ec: MessageDispatcher = context.system.dispatchers.lookup(Dispatcher.EngineDispatcher)

// On construction, record the configured cadence so operators can confirm the setting from the logs.
log.info(
s"${this.getClass.getSimpleName} configured to log groups experiencing quota exhaustion at interval of ${loggingInterval.toString()}."
)
// Initial schedule for logging exhausted groups: after `loggingInterval` the actor sends itself a
// LogQuotaExhaustedGroups message. This is a one-shot timer; the handler for that message is what
// schedules each subsequent tick.
context.system.scheduler.scheduleOnce(loggingInterval)(self ! LogQuotaExhaustedGroups)

override def receive: Receive = {
case RecordGroupQuotaExhaustion(group) =>
val groupMetricsEntry = GroupMetricsEntry(group, OffsetDateTime.now.toSystemTimestamp)
engineDbInterface.recordGroupMetricsEntry(groupMetricsEntry)
()
case GetQuotaExhaustedGroups =>
val respondTo: ActorRef = sender()

// for a group in the GROUP_METRICS_ENTRY table, if the 'quota_exhaustion_detected' timestamp hasn't
// been updated in last X minutes it is no longer experiencing cloud quota exhaustion
val currentTimestampMinusDelay = OffsetDateTime.now().minusMinutes(quotaExhaustionThresholdInMins)
engineDbInterface.getQuotaExhaustedGroups(currentTimestampMinusDelay.toSystemTimestamp) onComplete {
getQuotaExhaustedGroups() onComplete {
case Success(quotaExhaustedGroups) => respondTo ! GetQuotaExhaustedGroupsSuccess(quotaExhaustedGroups.toList)
case Failure(exception) => respondTo ! GetQuotaExhaustedGroupsFailure(exception.getMessage)
}
case LogQuotaExhaustedGroups =>
  // Periodic tick: look up the groups currently in quota exhaustion and write them to the log.
  getQuotaExhaustedGroups() onComplete {
    case Success(quotaExhaustedGroups) =>
      log.info(
        s"Hog groups currently experiencing quota exhaustion: ${quotaExhaustedGroups.length}. Group IDs: [${quotaExhaustedGroups
            .mkString(", ")}]."
      )
    case Failure(exception) =>
      // A failed lookup is an error, not routine information: log it at error level and pass the
      // cause so the stack trace is kept (getMessage alone can be null or uninformative).
      log.error(
        exception,
        s"Something went wrong when fetching quota exhausted groups for logging. Will retry in ${loggingInterval
            .toString()}. Exception: ${exception.getMessage}"
      )
  }
  // Schedule the next tick from the actor's own thread (not from the Future callback, where touching
  // `context` would be unsafe). The tick is scheduled whether or not the lookup succeeds.
  context.system.scheduler.scheduleOnce(loggingInterval)(self ! LogQuotaExhaustedGroups)
  ()
case other =>
log.error(
s"Programmer Error: Unexpected message ${other.toPrettyElidedString(1000)} received by ${this.self.path.name}."
)
}

/**
  * Queries the database for the groups that are currently experiencing cloud quota exhaustion.
  *
  * A group in the GROUP_METRICS_ENTRY table whose 'quota_exhaustion_detected' timestamp has not been
  * updated within the last `quotaExhaustionThresholdInMins` minutes is no longer considered to be in
  * quota exhaustion, so the cutoff passed to the database is "now minus threshold".
  *
  * @return a Future of the sequence returned by the database lookup for that cutoff
  */
private def getQuotaExhaustedGroups(): Future[Seq[String]] = {
  val exhaustionCutoff =
    OffsetDateTime.now().minusMinutes(quotaExhaustionThresholdInMins).toSystemTimestamp
  engineDbInterface.getQuotaExhaustedGroups(exhaustionCutoff)
}
}

object GroupMetricsActor {
Expand All @@ -47,12 +76,17 @@ object GroupMetricsActor {
sealed trait GroupMetricsActorMessage
case class RecordGroupQuotaExhaustion(group: String) extends GroupMetricsActorMessage
case object GetQuotaExhaustedGroups extends GroupMetricsActorMessage
case object LogQuotaExhaustedGroups extends GroupMetricsActorMessage

// Responses
sealed trait GetQuotaExhaustedGroupsResponse
case class GetQuotaExhaustedGroupsSuccess(quotaExhaustedGroups: List[String]) extends GetQuotaExhaustedGroupsResponse
case class GetQuotaExhaustedGroupsFailure(errorMsg: String) extends GetQuotaExhaustedGroupsResponse

def props(engineDbInterface: EngineSqlDatabase, quotaExhaustionThresholdInMins: Long): Props =
Props(new GroupMetricsActor(engineDbInterface, quotaExhaustionThresholdInMins)).withDispatcher(EngineDispatcher)
/**
  * Builds the Props for a [[GroupMetricsActor]] that runs on the engine dispatcher.
  *
  * @param engineDbInterface              database interface used to record and query group metrics
  * @param quotaExhaustionThresholdInMins minutes without a fresh quota-exhaustion record after which
  *                                       a group is no longer treated as quota exhausted
  * @param loggingInterval                delay between successive log entries listing the groups in
  *                                       quota exhaustion
  */
def props(engineDbInterface: EngineSqlDatabase,
          quotaExhaustionThresholdInMins: Long,
          loggingInterval: FiniteDuration
): Props = {
  val actorProps =
    Props(new GroupMetricsActor(engineDbInterface, quotaExhaustionThresholdInMins, loggingInterval))
  actorProps.withDispatcher(EngineDispatcher)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GroupMetricsActorSpec extends AnyFlatSpec with Matchers {

it should "receive new quota exhaustion message and call database function" in {
val db = databaseInterface()
val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db, 15))
val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db, 15, 5.minutes))

mockGroupMetricsActor.tell(RecordGroupQuotaExhaustion(testHogGroup), TestProbe().ref)

Expand All @@ -58,7 +58,7 @@ class GroupMetricsActorSpec extends AnyFlatSpec with Matchers {

it should "respond with groups in quota exhaustion" in {
val db = databaseInterface()
val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db, 15))
val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db, 15, 5.minutes))
val requestActor = TestProbe()

mockGroupMetricsActor.tell(GetQuotaExhaustedGroups, requestActor.ref)
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ system {
# threshold (in minutes) after which a group in GROUP_METRICS_ENTRY table is no longer considered to be
# actively experiencing quota exhaustion
threshold-minutes = 15
# interval at which the list of groups actively experiencing quota exhaustion is logged
logging-interval = 5 minutes
}

workflow-heartbeats {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,14 @@ abstract class CromwellRootActor(terminator: CromwellTerminator,
systemConfig.as[Option[Boolean]]("quota-exhaustion-job-start-control.enabled").getOrElse(false)
private lazy val quotaExhaustionThresholdInMins: Long =
systemConfig.as[Option[Long]]("quota-exhaustion-job-start-control.threshold-minutes").getOrElse(15)
private lazy val quotaExhaustionLoggingInterval: FiniteDuration =
systemConfig.as[Option[FiniteDuration]]("quota-exhaustion-job-start-control.logging-interval").getOrElse(5.minutes)
private lazy val groupMetricsActor: ActorRef =
context.actorOf(
GroupMetricsActor.props(EngineServicesStore.engineDatabaseInterface, quotaExhaustionThresholdInMins)
GroupMetricsActor.props(EngineServicesStore.engineDatabaseInterface,
quotaExhaustionThresholdInMins,
quotaExhaustionLoggingInterval
)
)
private lazy val groupMetricsActorForJTDA: Option[ActorRef] =
if (quotaExhaustionJobControlEnabled) Option(groupMetricsActor) else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package cromwell.engine.workflow.tokens
import akka.actor.{ActorRef, PoisonPill, Props}
import akka.testkit.{ImplicitSender, TestActorRef, TestProbe}
import cromwell.backend.standard.GroupMetricsActor
import cromwell.backend.standard.GroupMetricsActor.{GetQuotaExhaustedGroups, GetQuotaExhaustedGroupsSuccess}
import cromwell.backend.standard.GroupMetricsActor.{
GetQuotaExhaustedGroups,
GetQuotaExhaustedGroupsSuccess,
LogQuotaExhaustedGroups
}
import cromwell.core.JobToken.JobTokenType
import cromwell.core.{HogGroup, TestKitSuite}
import cromwell.engine.workflow.tokens.DynamicRateLimiter.{Rate, TokensAvailable}
Expand Down Expand Up @@ -572,8 +576,9 @@ object JobTokenDispenserActorSpec {
val LimitedTo5Tokens: JobTokenType = limitedTokenType(5)
}

class TestGroupMetricsActorForJTDA extends GroupMetricsActor(engineDatabaseInterface, 15) {
override def receive: Receive = { case GetQuotaExhaustedGroups =>
sender() ! GetQuotaExhaustedGroupsSuccess(List(quotaExhaustedHogGroup.value))
// Test double for GroupMetricsActor used by the JobTokenDispenserActor specs: it overrides `receive`
// so quota-exhaustion queries are answered with a fixed group rather than going through the parent's handlers.
class TestGroupMetricsActorForJTDA extends GroupMetricsActor(engineDatabaseInterface, 15, 10.minutes) {
override def receive: Receive = {
// always report `quotaExhaustedHogGroup` as the only group in quota exhaustion
case GetQuotaExhaustedGroups => sender() ! GetQuotaExhaustedGroupsSuccess(List(quotaExhaustedHogGroup.value))
// the parent constructor still schedules a LogQuotaExhaustedGroups tick; accept it and do nothing
case LogQuotaExhaustedGroups => ()
}
}

0 comments on commit 081318d

Please sign in to comment.