Skip to content

Commit

Permalink
TSP-412, TSP-428 statuses fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
trolley813 committed Nov 23, 2021
1 parent 5b1c677 commit 6b65243
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import ru.itclover.tsp.core.io.{AnyDecodersInstances, BasicDecoders}
import ru.itclover.tsp.dsl.PatternFieldExtractor
import ru.itclover.tsp.http.domain.input.{FindPatternsRequest, QueueableRequest}
import ru.itclover.tsp.http.routes.JobReporting
import ru.itclover.tsp.http.services.streaming.StatusReporter
import ru.itclover.tsp.http.services.streaming.{ConsoleStatusReporter, StatusReporter}
import ru.itclover.tsp.io.input.{InfluxDBInputConf, InputConf, JDBCInputConf, KafkaInputConf}
import ru.itclover.tsp.io.output.{JDBCOutputConf, KafkaOutputConf, OutputConf}
import ru.itclover.tsp.mappers.PatternsToRowMapper
Expand Down Expand Up @@ -57,6 +57,11 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx

val jobQueue: mutable.PriorityQueue[TypedRequest] = mutable.PriorityQueue.empty

// True when the configured JobManager URI points at the local machine
// (hostname "localhost" or a loopback address, IPv4 or IPv6).
val isLocalhost: Boolean =
  Set("localhost", "127.0.0.1", "::1").contains(uri.authority.host.toString)

val ex = new ScheduledThreadPoolExecutor(1)
val task: Runnable = new Runnable {
def run(): Unit = onTimer()
Expand Down Expand Up @@ -86,6 +91,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
result <- runStream(uuid)
_ = log.info("JDBC-to-JDBC: stream started")
} yield result
resultOrErr match {
case Left(error) => log.error(s"Cannot run request. Reason: $error")
case Right(_) => log.info(s"Stream successfully started!")
}
}

def runJdbcToKafka(request: FindPatternsRequest[JDBCInputConf, KafkaOutputConf]): Unit = {
Expand All @@ -97,6 +106,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
_ <- createStream(patterns, inputConf, outConf, source)
result <- runStream(uuid)
} yield result
resultOrErr match {
case Left(error) => log.error(s"Cannot run request. Reason: $error")
case Right(_) => log.info(s"Stream successfully started!")
}
}

def runKafkaToJdbc(request: FindPatternsRequest[KafkaInputConf, JDBCOutputConf]): Unit = {
Expand All @@ -111,6 +124,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
result <- runStream(uuid)
_ = log.info("Kafka runStream done")
} yield result
resultOrErr match {
case Left(error) => log.error(s"Cannot run request. Reason: $error")
case Right(_) => log.info(s"Stream successfully started!")
}
}

def runKafkaToKafka(request: FindPatternsRequest[KafkaInputConf, KafkaOutputConf]): Unit = {
Expand All @@ -125,6 +142,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
result <- runStream(uuid)
_ = log.info("Kafka runStream done")
} yield result
resultOrErr match {
case Left(error) => log.error(s"Cannot run request. Reason: $error")
case Right(_) => log.info(s"Stream successfully started!")
}
}

def runInfluxToJdbc(request: FindPatternsRequest[InfluxDBInputConf, JDBCOutputConf]): Unit = {
Expand All @@ -136,6 +157,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
_ <- createStream(patterns, inputConf, outConf, source)
result <- runStream(uuid)
} yield result
resultOrErr match {
case Left(error) => log.error(s"Cannot run request. Reason: $error")
case Right(_) => log.info(s"Stream successfully started!")
}
}

def runInfluxToKafka(request: FindPatternsRequest[InfluxDBInputConf, KafkaOutputConf]): Unit = {
Expand All @@ -152,7 +177,7 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
def dequeueAndRun(slots: Int): Unit = {
// TODO: Functional style
var slotsRemaining = slots
while (jobQueue.nonEmpty && slotsRemaining > jobQueue.head._1.requiredSlots) {
while (jobQueue.nonEmpty && slotsRemaining >= jobQueue.head._1.requiredSlots) {
val request = jobQueue.dequeue()
slotsRemaining -= request._1.requiredSlots
run(request)
Expand Down Expand Up @@ -221,6 +246,9 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
StatusReporter(uuid, value.brokers, value.topic)
)
case None =>
streamEnv.registerJobListener(
ConsoleStatusReporter(uuid)
)
}
streamEnv.execute(uuid)
}(blockingExecutionContext)
Expand All @@ -229,7 +257,7 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
Right(None)
}

def availableSlots: Future[Int] =
def availableSlots: Future[Int] = if (isLocalhost) Future(32) else
Http()
.singleRequest(HttpRequest(uri = uri.toString + "/jobmanager/metrics?get=taskSlotsAvailable"))
.flatMap(resp => Unmarshal(resp).to[Seq[Metric]])
Expand All @@ -241,10 +269,8 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
log.info(s"$slots slots available")
dequeueAndRun(slots)
} else {
if (jobQueue.nonEmpty)
log.info(
if (jobQueue.isEmpty)
s"$slots slots available, but the queue is empty"
else
s"Waiting for free slot ($slots available), cannot run jobs right now"
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package ru.itclover.tsp.http.services.streaming

import com.typesafe.scalalogging.Logger
import org.apache.flink.api.common.{JobExecutionResult, JobID}
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}


import scala.util.Try

/** [[JobListener]] that reports Flink job status transitions to the local log,
  * used when no Kafka status-reporting topic is configured (see the `case None`
  * registration branch in QueueManagerService).
  *
  * @param jobName              job identifier (a UUID string) echoed in every status message
  * @param executionEnvironment environment this listener is registered on; needed so
  *                             the listener can remove itself once the job terminates
  */
case class ConsoleStatusReporter(jobName: String)
                                (implicit executionEnvironment: StreamExecutionEnvironment)
    extends JobListener {

  // Client of the most recently submitted job; None until a successful submission.
  var client: Option[JobClient] = None

  val log = Logger[ConsoleStatusReporter]

  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
    if (jobClient != null) client = Some(jobClient)
    val msg = StatusMessage(
      jobName,
      // getJobStatus may throw (and jobClient may be null on failed submission);
      // fall back to a placeholder rather than propagate from a listener callback.
      Try(jobClient.getJobStatus.get().name).toOption.getOrElse("no status"),
      client match {
        case Some(value) => s"Job submitted with id ${value.getJobID}"
        case None        => s"Job submission failed"
      }
    )
    log.info(f"Job ${msg.uuid}: status=${msg.status}, message=${msg.text}")
  }

  /** Detach this listener from the environment so it is not invoked for later jobs. */
  def unregisterSelf(): Unit = {
    executionEnvironment.getJavaEnv.getJobListeners.remove(this)
  }

  override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
    client.foreach { c =>
      val status = Try(c.getJobStatus.get().name).getOrElse("status unknown")
      // Build the message text without side effects. (Previously unregisterSelf()
      // was called inside this match, and the status match below was
      // non-exhaustive — it threw scala.MatchError for any status other than
      // FINISHED/CANCELED, e.g. FAILED or "status unknown".)
      val text = throwable match {
        case null => s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
        case _    => s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
      }
      val msg = StatusMessage(jobName, status, text)
      log.info(f"Job ${msg.uuid}: status=${msg.status}, message=${msg.text}")
      // Unregister exactly once: on a clean execution, or when the job reached
      // a terminal status. Any other status keeps the listener registered.
      if (throwable == null || status == "FINISHED" || status == "CANCELED") {
        unregisterSelf()
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ru.itclover.tsp.http.services.streaming
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.{JobExecutionResult, JobID}
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.Serializer

Expand All @@ -25,7 +26,9 @@ class StatusMessageSerializer extends Serializer[StatusMessage] {
objectMapper.writeValueAsBytes(data)
}

case class StatusReporter(jobName: String, brokers: String, topic: String) extends JobListener {
case class StatusReporter(jobName: String, brokers: String, topic: String)
(implicit executionEnvironment: StreamExecutionEnvironment)
extends JobListener {

val config: Map[String, Object] = Map(
"bootstrap.servers" -> brokers,
Expand Down Expand Up @@ -55,25 +58,32 @@ case class StatusReporter(jobName: String, brokers: String, topic: String) exten
messageProducer.flush()
}

/** Detach this reporter from the execution environment's job-listener list. */
def unregisterSelf(): Unit =
  executionEnvironment.getJavaEnv.getJobListeners.remove(this)

override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
client.foreach { c =>
val status = c.getJobStatus.get().name
val status = Try(c.getJobStatus.get().name).getOrElse("status unknown")
val record = new ProducerRecord[String, StatusMessage](
topic,
LocalDateTime.now.toString,
StatusMessage(
jobName,
status,
throwable match {
case null => s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
case null =>
// Unregister
unregisterSelf()
s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
case _ => s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
}
)
)
status match {
case "FINISHED" | "CANCELED" =>
// Clear client value
client = None
// Unregister
unregisterSelf()
}
messageProducer.send(record)
messageProducer.flush()
Expand Down

0 comments on commit 6b65243

Please sign in to comment.