Skip to content

Commit

Permalink
TSP-435 some batch fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
trolley813 committed Dec 1, 2021
1 parent 2aa9aec commit 092d6f3
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,23 @@ case class ConsoleStatusReporter(jobName: String)

override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
client.foreach { c =>
if (jobExecutionResult != null && c.getJobID.toHexString != jobExecutionResult.getJobID.toHexString) {
return
}
val status = Try(c.getJobStatus.get().name).getOrElse("status unknown")
val msg = StatusMessage(
jobName,
status,
throwable match {
case null =>
// Unregister
unregisterSelf()
s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
case _ => s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
case _ =>
s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
}
)
// Unregister
unregisterSelf()
log.info(f"Job ${msg.uuid}: status=${msg.status}, message=${msg.text}")
status match {
case "FINISHED" | "CANCELED" =>
// Unregister
unregisterSelf()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,23 @@ case class StatusReporter(jobName: String, brokers: String, topic: String)
var client: Option[JobClient] = None

override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
if (jobClient != null) client = Some(jobClient)
val record = new ProducerRecord[String, StatusMessage](
topic,
LocalDateTime.now.toString,
StatusMessage(
jobName,
Try(jobClient.getJobStatus.get().name).toOption.getOrElse("no status"),
client match {
case Some(value) => s"Job submitted with id ${value.getJobID}"
case None => s"Job submission failed"
}
if (jobClient != null && client.isEmpty) {
client = Some(jobClient)
val record = new ProducerRecord[String, StatusMessage](
topic,
LocalDateTime.now.toString,
StatusMessage(
jobName,
Try(jobClient.getJobStatus.get().name).toOption.getOrElse("no status"),
client match {
case Some(value) => s"Job submitted with id ${value.getJobID}"
case None => s"Job submission failed"
}
)
)
)
messageProducer.send(record)
messageProducer.flush()
messageProducer.send(record)
messageProducer.flush()
}
}

def unregisterSelf(): Unit = {
Expand All @@ -64,6 +66,9 @@ case class StatusReporter(jobName: String, brokers: String, topic: String)

override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
client.foreach { c =>
if (jobExecutionResult != null && c.getJobID.toHexString != jobExecutionResult.getJobID.toHexString) {
return
}
val status = Try(c.getJobStatus.get().name).getOrElse("status unknown")
val record = new ProducerRecord[String, StatusMessage](
topic,
Expand All @@ -73,18 +78,13 @@ case class StatusReporter(jobName: String, brokers: String, topic: String)
status,
throwable match {
case null =>
// Unregister
unregisterSelf()
s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
case _ => s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
}
)
)
status match {
case "FINISHED" | "CANCELED" =>
// Unregister
unregisterSelf()
}
// Unregister
unregisterSelf()
messageProducer.send(record)
messageProducer.flush()
}
Expand Down

0 comments on commit 092d6f3

Please sign in to comment.