Skip to content

Commit

Permalink
TSP-435 some batch fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
trolley813 committed Dec 1, 2021
1 parent 2aa9aec commit d0d3282
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,23 @@ case class ConsoleStatusReporter(jobName: String)

override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
client.foreach { c =>
if (jobExecutionResult != null && c.getJobID.toHexString != jobExecutionResult.getJobID.toHexString) {
return
}
val status = Try(c.getJobStatus.get().name).getOrElse("status unknown")
val msg = StatusMessage(
jobName,
status,
throwable match {
case null =>
// Unregister
unregisterSelf()
s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
case _ => s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
case _ =>
s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
}
)
// Unregister
unregisterSelf()
log.info(f"Job ${msg.uuid}: status=${msg.status}, message=${msg.text}")
status match {
case "FINISHED" | "CANCELED" =>
// Unregister
unregisterSelf()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ case class StatusReporter(jobName: String, brokers: String, topic: String)

override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
client.foreach { c =>
if (jobExecutionResult != null && c.getJobID.toHexString != jobExecutionResult.getJobID.toHexString) {
return
}
val status = Try(c.getJobStatus.get().name).getOrElse("status unknown")
val record = new ProducerRecord[String, StatusMessage](
topic,
Expand All @@ -73,18 +76,13 @@ case class StatusReporter(jobName: String, brokers: String, topic: String)
status,
throwable match {
case null =>
// Unregister
unregisterSelf()
s"Job executed with no exceptions in ${jobExecutionResult.getNetRuntime} ms"
case _ => s"Job executed with exception: ${throwable.getStackTrace.mkString("\n")}"
}
)
)
status match {
case "FINISHED" | "CANCELED" =>
// Unregister
unregisterSelf()
}
// Unregister
unregisterSelf()
messageProducer.send(record)
messageProducer.flush()
}
Expand Down

0 comments on commit d0d3282

Please sign in to comment.