Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start untangling orchestrator #1739

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
refactor(orchestrator): Rename getCurrentOrtRun to getOrtRun
The word "current" does not carry any meaning in the context of the
function.

Signed-off-by: Martin Nonnenmacher <[email protected]>
mnonnenmacher committed Jan 6, 2025
commit 391a7333b956d3c15923e255c42546089110b219
10 changes: 5 additions & 5 deletions orchestrator/src/main/kotlin/Orchestrator.kt
Original file line number Diff line number Diff line change
@@ -113,7 +113,7 @@ class Orchestrator(
*/
fun handleConfigWorkerResult(header: MessageHeader, configWorkerResult: ConfigWorkerResult) {
db.blockingQueryCatching(transactionIsolation = isolationLevel) {
val ortRun = getCurrentOrtRun(configWorkerResult.ortRunId)
val ortRun = getOrtRun(configWorkerResult.ortRunId)

nextJobsToSchedule(ConfigEndpoint, ortRun.id, header, jobs = emptyMap())
}.onSuccess { (context, schedules) ->
@@ -255,7 +255,7 @@ class Orchestrator(
repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED)?.let {
nextJobsToSchedule(Endpoint.fromConfigPrefix(workerError.endpointName), job.ortRunId, header)
}
} ?: (createWorkerScheduleContext(getCurrentOrtRun(ortRunId), header, failed = true) to emptyList())
} ?: (createWorkerScheduleContext(getOrtRun(ortRunId), header, failed = true) to emptyList())
}.onSuccess { (context, schedules) ->
scheduleCreatedJobs(context, schedules)
}.onFailure {
@@ -271,7 +271,7 @@ class Orchestrator(
log.info("Handling a lost schedule for ORT run {}.", lostSchedule.ortRunId)

db.blockingQueryCatching(transactionIsolation = isolationLevel) {
val ortRun = getCurrentOrtRun(lostSchedule.ortRunId)
val ortRun = getOrtRun(lostSchedule.ortRunId)
val context = createWorkerScheduleContext(ortRun, header)

if (context.jobs.isNotEmpty()) {
@@ -289,7 +289,7 @@ class Orchestrator(
/**
* Obtain the [OrtRun] with the given [ortRunId] of fail with an exception if it does not exist.
*/
private fun getCurrentOrtRun(ortRunId: Long): OrtRun =
private fun getOrtRun(ortRunId: Long): OrtRun =
requireNotNull(ortRunRepository.get(ortRunId)) {
"ORT run '$ortRunId' not found."
}
@@ -362,7 +362,7 @@ class Orchestrator(
): Pair<WorkerScheduleContext, List<JobScheduleFunc>> {
log.info("Handling a completed job for endpoint '{}' and ORT run {}.", endpoint.configPrefix, ortRunId)

val ortRun = getCurrentOrtRun(ortRunId)
val ortRun = getOrtRun(ortRunId)
val scheduleContext = createWorkerScheduleContext(ortRun, header, workerJobs = jobs)

return fetchNextJobs(scheduleContext)