Skip to content

Commit

Permalink
PMM-347 coordinator and monitoring support
Browse files Browse the repository at this point in the history
  • Loading branch information
trolley813 committed Jul 25, 2022
1 parent cf2eaac commit 4b90591
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 248 deletions.
4 changes: 3 additions & 1 deletion http/src/main/scala/ru/itclover/tsp/http/HttpService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ trait HttpService extends RoutesProtocols {
implicit val system: ActorSystem
implicit val materializer: ActorMaterializer
implicit val executionContext: ExecutionContextExecutor
implicit val queueManagerService: QueueManagerService

val blockingExecutorContext: ExecutionContextExecutor

Expand All @@ -39,8 +40,9 @@ trait HttpService extends RoutesProtocols {

val res = for {
jobs <- JobsRoutes.fromExecutionContext(blockingExecutorContext)
monitoring <- MonitoringRoutes.fromExecutionContext(queueManagerService)
validation <- ValidationRoutes.fromExecutionContext()
} yield jobs ~ validation
} yield jobs ~ monitoring ~ validation

log.debug("composeRoutes finished")
res
Expand Down
2 changes: 1 addition & 1 deletion http/src/main/scala/ru/itclover/tsp/http/Launcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,5 @@ object Launcher extends App with HttpService {
}


val queueManager = QueueManagerService.getOrCreate("mgr", blockingExecutorContext)
implicit val queueManagerService = QueueManagerService.getOrCreate("mgr", blockingExecutorContext)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package ru.itclover.tsp.http.routes

import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes.{BadRequest, InternalServerError}
import akka.http.scaladsl.model.{HttpResponse, StatusCodes, Uri}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import cats.data.Reader
import com.typesafe.scalalogging.Logger
import ru.itclover.tsp.BuildInfo
import ru.itclover.tsp.http.domain.output.{FailureResponse, SuccessfulResponse}
import ru.itclover.tsp.http.protocols.RoutesProtocols
import ru.itclover.tsp.http.services.queuing.QueueManagerService
import ru.itclover.tsp.streaming.checkpointing.CheckpointingService

import scala.concurrent.ExecutionContextExecutor
import scala.util.Success

object MonitoringRoutes {

private val log = Logger[MonitoringRoutes]

def fromExecutionContext(
queueManagerService: QueueManagerService
)(implicit as: ActorSystem, am: ActorMaterializer): Reader[ExecutionContextExecutor, Route] = {

log.debug("fromExecutionContext started")

Reader { execContext =>
new MonitoringRoutes {
implicit override val executionContext = execContext
implicit override val actors = as
implicit override val materializer = am
implicit override val qm = queueManagerService
}.route
}

}
log.debug("fromExecutionContext finished")
}

trait MonitoringRoutes extends RoutesProtocols {
implicit val qm: QueueManagerService

implicit val executionContext: ExecutionContextExecutor
implicit val actors: ActorSystem
implicit val materializer: ActorMaterializer

val route: Route = path("job" / Segment / "status") { uuid =>
CheckpointingService.getCheckpoint(uuid) match {
case Some(details) => complete(Map("rowsRead" -> details.readRows, "rowsWritten" -> details.writtenRows))
case None => complete((BadRequest, FailureResponse(4006, "No such job.", Seq.empty)))
//case Failure(err) => complete((InternalServerError, FailureResponse(5005, err)))
}
} ~ path("jobs" / "overview") {
complete(qm.getRunningJobsIds)
} ~
path("metainfo" / "getVersion") {
complete(
SuccessfulResponse(
Map(
"tsp" -> BuildInfo.version,
"scala" -> BuildInfo.scalaVersion,
)
)
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ case class CoordinatorService (coordUri: String)
method = HttpMethods.POST,
uri = uri,
entity = HttpEntity(ContentTypes.`application/json`,
s"""{"jobId": "$jobId", "success": $success, "error": "$error"}""")
s"""{"jobId": "$jobId", "success": $success, "error": "Exception occurred"}""")
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import cats.effect.IO
import cats.effect.{Deferred, IO}
import cats.effect.syntax.async
import cats.effect.unsafe.implicits.global
import cats.implicits.toTraverseOps
import com.typesafe.scalalogging.Logger
import fs2.concurrent.SignallingRef
import ru.itclover.tsp.StreamSource.Row
import ru.itclover.tsp.core.{Incident, RawPattern}
import ru.itclover.tsp.{JdbcSource, KafkaSource, RowWithIdx, StreamSource}
Expand All @@ -17,7 +20,6 @@ import ru.itclover.tsp.dsl.PatternFieldExtractor
import ru.itclover.tsp.http.domain.input.{FindPatternsRequest, QueueableRequest}
import ru.itclover.tsp.http.protocols.RoutesProtocols
import ru.itclover.tsp.http.services.coordinator.CoordinatorService
import ru.itclover.tsp.http.services.streaming.MonitoringServiceModel.{JobDetails, Vertex, VertexMetrics}
import ru.itclover.tsp.streaming.io.{InputConf, JDBCInputConf, KafkaInputConf}
import ru.itclover.tsp.streaming.io.{JDBCOutputConf, KafkaOutputConf, OutputConf}
import ru.itclover.tsp.streaming.mappers.PatternsToRowMapper
Expand All @@ -35,7 +37,7 @@ import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
import collection.JavaConverters._

import scala.collection.mutable.ListBuffer

class QueueManagerService(id: String, blockingExecutionContext: ExecutionContextExecutor)(
implicit executionContext: ExecutionContextExecutor,
Expand All @@ -49,7 +51,6 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
type TypedRequest = (QueueableRequest, String)
type Request = FindPatternsRequest[RowWithIdx, Symbol, Any, Row]


case class Metric(id: String, value: String)

implicit val metricFmt = jsonFormat2(Metric.apply)
Expand All @@ -60,6 +61,8 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
//log.warn(s"Recovering job queue: ${jobQueue.count} entries found")
val jobQueue = mutable.Queue[TypedRequest]()

val runningStreams = mutable.Map[String, SignallingRef[IO, Boolean]]()

val isLocalhost: Boolean = true

val ex = new ScheduledThreadPoolExecutor(1)
Expand All @@ -72,17 +75,15 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext

def enqueue(r: Request): Unit = {
jobQueue.enqueue(
(r,
confClassTagToString(ClassTag(r.inputConf.getClass))
)
)
(r, confClassTagToString(ClassTag(r.inputConf.getClass)))
)
log.info(s"Job ${r.uuid} enqueued.")
}

def confClassTagToString(ct: ClassTag[_]): String = ct.runtimeClass match {
case c if c.isAssignableFrom(classOf[JDBCInputConf]) => "from-jdbc"
case c if c.isAssignableFrom(classOf[JDBCInputConf]) => "from-jdbc"
case c if c.isAssignableFrom(classOf[KafkaInputConf]) => "from-kafka"
case _ => "unknown"
case _ => "unknown"
}

def getQueuedJobs: Seq[QueueableRequest] = jobQueue.map(_._1).toSeq
Expand All @@ -101,7 +102,9 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
_ = log.info("JDBC-to-JDBC: stream started")
} yield result
resultOrErr match {
case Left(error) => log.error(s"Cannot run request. Reason: $error")
case Left(error) =>
log.error(s"Cannot run request. Reason: $error")
CoordinatorService.notifyJobCompleted(uuid, Some(new Exception(error.toString)))
case Right(_) => log.info(s"Stream successfully started!")
}
}
Expand All @@ -124,7 +127,6 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
}
}


/*def dequeueAndRun(slots: Int): Unit = {
// TODO: Functional style
var slotsRemaining = slots
Expand Down Expand Up @@ -204,24 +206,39 @@ class QueueManagerService(id: String, blockingExecutionContext: ExecutionContext
CoordinatorService.notifyJobStarted(uuid)

// Run the streams (multiple sinks)
streams.foreach { stream =>
stream.compile.drain.unsafeRunAsync {
case Left(throwable) =>
log.error(s"Job $uuid failed: $throwable")
CoordinatorService.notifyJobCompleted(uuid, Some(throwable))
case Right(_) =>
// success
log.info(s"Job $uuid finished")
CoordinatorService.notifyJobCompleted(uuid, None)
}
}
SignallingRef[IO, Boolean](false)
.flatMap { signal =>
runningStreams(uuid) = signal
streams
.sequence
.interruptWhen(signal)
.compile
.drain
}
.unsafeRunAsync {
case Left(throwable) =>
log.error(s"Job $uuid failed: $throwable")
CoordinatorService.notifyJobCompleted(uuid, Some(throwable))
runningStreams.remove(uuid)
case Right(_) =>
// success
log.info(s"Job $uuid finished")
CoordinatorService.notifyJobCompleted(uuid, None)
runningStreams.remove(uuid)
}

log.debug("runStream finished")
Right(None)
}

def availableSlots: Future[Int] = Future(32)
def stopStream(uuid: String): Unit = runningStreams.get(uuid).map { signal =>
log.info(s"Job $uuid stopped")
signal.set(true)
}

def getRunningJobsIds: Seq[String] = runningStreams.keys.toSeq

def availableSlots: Future[Int] = Future(32)

def onTimer(): Unit = {
availableSlots.onComplete {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cats.effect.IO
import cats.effect.unsafe.implicits.global
import com.dimafeng.testcontainers._
import fs2.kafka.{Acks, KafkaProducer, ProducerRecord, ProducerRecords, ProducerSettings, Serializer}
import ru.itclover.tsp.http.services.queuing.QueueManagerService
import ru.itclover.tsp.streaming.io.{IntESValue, StringESValue}

import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -48,6 +49,8 @@ class SimpleCasesTest
with RoutesProtocols {
implicit override val executionContext: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global

override val queueManagerService: QueueManagerService = QueueManagerService.getOrCreate("mgr", executionContext)

// to run blocking tasks.
val blockingExecutorContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(
Expand Down
Loading

0 comments on commit 4b90591

Please sign in to comment.