diff --git a/http/src/main/scala/ru/itclover/tsp/http/services/queuing/QueueManagerService.scala b/http/src/main/scala/ru/itclover/tsp/http/services/queuing/QueueManagerService.scala
index 938bb209..7dbe39f8 100644
--- a/http/src/main/scala/ru/itclover/tsp/http/services/queuing/QueueManagerService.scala
+++ b/http/src/main/scala/ru/itclover/tsp/http/services/queuing/QueueManagerService.scala
@@ -19,7 +19,7 @@ import ru.itclover.tsp.core.io.{AnyDecodersInstances, BasicDecoders}
 import ru.itclover.tsp.dsl.PatternFieldExtractor
 import ru.itclover.tsp.http.domain.input.{FindPatternsRequest, QueueableRequest}
 import ru.itclover.tsp.http.routes.JobReporting
-import ru.itclover.tsp.http.services.streaming.StatusReporter
+import ru.itclover.tsp.http.services.streaming.{ConsoleStatusReporter, StatusReporter}
 import ru.itclover.tsp.io.input.{InfluxDBInputConf, InputConf, JDBCInputConf, KafkaInputConf}
 import ru.itclover.tsp.io.output.{JDBCOutputConf, KafkaOutputConf, OutputConf}
 import ru.itclover.tsp.mappers.PatternsToRowMapper
@@ -57,6 +57,11 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
 
   val jobQueue: mutable.PriorityQueue[TypedRequest] = mutable.PriorityQueue.empty
 
+  val isLocalhost: Boolean = uri.authority.host.toString match {
+    case "localhost" | "127.0.0.1" | "::1" => true
+    case _ => false
+  }
+
   val ex = new ScheduledThreadPoolExecutor(1)
   val task: Runnable = new Runnable {
     def run(): Unit = onTimer()
@@ -86,6 +91,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
       result <- runStream(uuid)
       _ = log.info("JDBC-to-JDBC: stream started")
     } yield result
+    resultOrErr match {
+      case Left(error) => log.error(s"Cannot run request. Reason: $error")
+      case Right(_) => log.info(s"Stream successfully started!")
+    }
   }
 
   def runJdbcToKafka(request: FindPatternsRequest[JDBCInputConf, KafkaOutputConf]): Unit = {
@@ -97,6 +106,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
       _ <- createStream(patterns, inputConf, outConf, source)
       result <- runStream(uuid)
     } yield result
+    resultOrErr match {
+      case Left(error) => log.error(s"Cannot run request. Reason: $error")
+      case Right(_) => log.info(s"Stream successfully started!")
+    }
   }
 
   def runKafkaToJdbc(request: FindPatternsRequest[KafkaInputConf, JDBCOutputConf]): Unit = {
@@ -111,6 +124,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
       result <- runStream(uuid)
       _ = log.info("Kafka runStream done")
     } yield result
+    resultOrErr match {
+      case Left(error) => log.error(s"Cannot run request. Reason: $error")
+      case Right(_) => log.info(s"Stream successfully started!")
+    }
   }
 
   def runKafkaToKafka(request: FindPatternsRequest[KafkaInputConf, KafkaOutputConf]): Unit = {
@@ -125,6 +142,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
       result <- runStream(uuid)
       _ = log.info("Kafka runStream done")
     } yield result
+    resultOrErr match {
+      case Left(error) => log.error(s"Cannot run request. Reason: $error")
+      case Right(_) => log.info(s"Stream successfully started!")
+    }
  }
 
   def runInfluxToJdbc(request: FindPatternsRequest[InfluxDBInputConf, JDBCOutputConf]): Unit = {
@@ -136,6 +157,10 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
       _ <- createStream(patterns, inputConf, outConf, source)
       result <- runStream(uuid)
     } yield result
+    resultOrErr match {
+      case Left(error) => log.error(s"Cannot run request. Reason: $error")
+      case Right(_) => log.info(s"Stream successfully started!")
+    }
   }
 
   def runInfluxToKafka(request: FindPatternsRequest[InfluxDBInputConf, KafkaOutputConf]): Unit = {
@@ -152,7 +177,7 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
   def dequeueAndRun(slots: Int): Unit = {
     // TODO: Functional style
     var slotsRemaining = slots
-    while (jobQueue.nonEmpty && slotsRemaining > jobQueue.head._1.requiredSlots) {
+    while (jobQueue.nonEmpty && slotsRemaining >= jobQueue.head._1.requiredSlots) {
      val request = jobQueue.dequeue()
       slotsRemaining -= request._1.requiredSlots
       run(request)
@@ -221,6 +246,9 @@ class QueueManagerService(uri: Uri, blockingExecutionContext: ExecutionContextEx
             StatusReporter(uuid, value.brokers, value.topic)
           )
         case None =>
+          streamEnv.registerJobListener(
+            ConsoleStatusReporter(uuid)
+          )
       }
       streamEnv.execute(uuid)
     }(blockingExecutionContext)
@@ -229,7 +257,7 @@
     Right(None)
   }
 
-  def availableSlots: Future[Int] =
+  def availableSlots: Future[Int] = if (isLocalhost) Future(32) else
     Http()
       .singleRequest(HttpRequest(uri = uri.toString + "/jobmanager/metrics?get=taskSlotsAvailable"))
       .flatMap(resp => Unmarshal(resp).to[Seq[Metric]])
@@ -241,10 +269,8 @@
         log.info(s"$slots slots available")
         dequeueAndRun(slots)
       } else {
+        if (jobQueue.nonEmpty)
         log.info(
-          if (jobQueue.isEmpty)
-            s"$slots slots available, but the queue is empty"
-          else
           s"Waiting for free slot ($slots available), cannot run jobs right now"
         )
       }
diff --git a/http/src/main/scala/ru/itclover/tsp/http/services/streaming/ConsoleStatusReporter.scala b/http/src/main/scala/ru/itclover/tsp/http/services/streaming/ConsoleStatusReporter.scala
new file mode 100644
index 00000000..efe2f9ca
--- /dev/null
+++ b/http/src/main/scala/ru/itclover/tsp/http/services/streaming/ConsoleStatusReporter.scala
@@ -0,0 +1,61 @@
+package ru.itclover.tsp.http.services.streaming
+
+import com.typesafe.scalalogging.Logger
+import org.apache.flink.api.common.{JobExecutionResult, JobID}
+import org.apache.flink.core.execution.{JobClient, JobListener}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
+
+
+import scala.util.Try
+
+case class ConsoleStatusReporter(jobName: String)
+  (implicit executionEnvironment: StreamExecutionEnvironment)
+  extends JobListener {
+
+
+  var client: Option[JobClient] = None
+
+  val log = Logger[ConsoleStatusReporter]
+
+  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
+    if (jobClient != null) client = Some(jobClient)
+    val msg = StatusMessage(
+      jobName,
+      Try(jobClient.getJobStatus.get().name).toOption.getOrElse("no status"),
+      client match {
+        case Some(value) => s"Job submitted with id ${value.getJobID}"
+        case None => s"Job submission failed"
+      }
+    )
+    log.info(f"Job ${msg.uuid}: status=${msg.status}, message=${msg.text}")
+  }
+
+  def unregisterSelf(): Unit = {
+    executionEnvironment.getJavaEnv.getJobListeners.remove(this)
+  }
+
+  override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
+    client.foreach { c =>
+      val status = Try(c.getJobStatus.get().name).getOrElse("status unknown")
+      val msg = StatusMessage(
+        jobName,
+        status,
+        throwable match {
+          case null =>
+            // Unregister
+            unregisterSelf()
+            s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
+          case _ => s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
+        }
+      )
+      log.info(f"Job ${msg.uuid}: status=${msg.status}, message=${msg.text}")
+      status match {
+        case "FINISHED" | "CANCELED" =>
+          // Unregister
+          unregisterSelf()
+        case _ => () // no-op: avoid a MatchError for other job statuses
+      }
+    }
+  }
+}
diff --git a/http/src/main/scala/ru/itclover/tsp/http/services/streaming/StatusReporter.scala b/http/src/main/scala/ru/itclover/tsp/http/services/streaming/StatusReporter.scala
index 9b8fb8da..aa0981e3 100644
--- a/http/src/main/scala/ru/itclover/tsp/http/services/streaming/StatusReporter.scala
+++ b/http/src/main/scala/ru/itclover/tsp/http/services/streaming/StatusReporter.scala
@@ -3,6 +3,7 @@ package ru.itclover.tsp.http.services.streaming
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.flink.api.common.{JobExecutionResult, JobID}
 import org.apache.flink.core.execution.{JobClient, JobListener}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.serialization.Serializer
 
@@ -25,7 +26,9 @@ class StatusMessageSerializer extends Serializer[StatusMessage] {
     objectMapper.writeValueAsBytes(data)
 }
 
-case class StatusReporter(jobName: String, brokers: String, topic: String) extends JobListener {
+case class StatusReporter(jobName: String, brokers: String, topic: String)
+  (implicit executionEnvironment: StreamExecutionEnvironment)
+  extends JobListener {
 
   val config: Map[String, Object] = Map(
     "bootstrap.servers" -> brokers,
@@ -55,9 +58,13 @@
     messageProducer.flush()
   }
 
+  def unregisterSelf(): Unit = {
+    executionEnvironment.getJavaEnv.getJobListeners.remove(this)
+  }
+
   override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
     client.foreach { c =>
-      val status = c.getJobStatus.get().name
+      val status = Try(c.getJobStatus.get().name).getOrElse("status unknown")
       val record = new ProducerRecord[String, StatusMessage](
         topic,
         LocalDateTime.now.toString,
@@ -65,15 +72,19 @@ case class StatusReporter(jobName: String, brokers: String, topic: String) exten
           jobName,
           status,
           throwable match {
-            case null => s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
+            case null =>
+              // Unregister
+              unregisterSelf()
+              s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
             case _ => s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
           }
         )
       )
       status match {
         case "FINISHED" | "CANCELED" =>
-          // Clear client value
-          client = None
+          // Unregister
+          unregisterSelf()
+        case _ => () // no-op: avoid a MatchError for other job statuses
       }
       messageProducer.send(record)
       messageProducer.flush()
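Review note: the `resultOrErr` logging block added to runJdbcToJdbc, runJdbcToKafka, runKafkaToJdbc, runKafkaToKafka and runInfluxToJdbc is identical in all five methods, while runInfluxToKafka appears not to receive it in this patch. A follow-up could factor it into a private helper; a minimal sketch, assuming access to the same `log` instance inside QueueManagerService (the helper name `logRunResult` is hypothetical, not part of this patch):

    // Hypothetical helper: factors out the identical Either-logging
    // block repeated at the end of the five run* methods above.
    private def logRunResult[E, A](resultOrErr: Either[E, A]): Unit =
      resultOrErr match {
        case Left(error) => log.error(s"Cannot run request. Reason: $error")
        case Right(_)    => log.info("Stream successfully started!")
      }

Each run* method would then end with `logRunResult(resultOrErr)` instead of the inlined match.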
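The `>` to `>=` change in dequeueAndRun fixes an off-by-one: with, say, 4 slots remaining and a head job requiring exactly 4, the old condition skipped the job even though it fit, whereas the new condition dequeues it and runs with 0 slots remaining.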
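The new ConsoleStatusReporter mirrors StatusReporter but logs status messages instead of producing them to Kafka, and both now unregister themselves through the live listener list returned by `getJavaEnv.getJobListeners`. A minimal standalone usage sketch (the job name and the toy pipeline are hypothetical, not part of this patch):

    import org.apache.flink.streaming.api.scala._
    import ru.itclover.tsp.http.services.streaming.ConsoleStatusReporter

    object ConsoleStatusReporterDemo extends App {
      implicit val env: StreamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment

      // With no Kafka reporting configured, QueueManagerService now registers
      // this listener; it logs onJobSubmitted/onJobExecuted and removes itself
      // once the job reaches FINISHED or CANCELED.
      env.registerJobListener(ConsoleStatusReporter("demo-job"))

      env.fromElements(1, 2, 3).map(_ * 2).print()
      env.execute("demo-job")
    }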