From 18acb5318344db3d4bf8584300c644db12d38666 Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Fri, 27 Dec 2024 14:01:55 +0100 Subject: [PATCH 01/10] chore(orchestrator): Fix a function name Signed-off-by: Martin Nonnenmacher --- orchestrator/src/main/kotlin/Orchestrator.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/orchestrator/src/main/kotlin/Orchestrator.kt b/orchestrator/src/main/kotlin/Orchestrator.kt index da34339d8b..a2b95023a8 100644 --- a/orchestrator/src/main/kotlin/Orchestrator.kt +++ b/orchestrator/src/main/kotlin/Orchestrator.kt @@ -251,7 +251,7 @@ class Orchestrator( repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED)?.let { nextJobsToSchedule(Endpoint.fromConfigPrefix(workerError.endpointName), job.ortRunId, header) } - } ?: (createWorkerSchedulerContext(getCurrentOrtRun(ortRunId), header, failed = true) to emptyList()) + } ?: (createWorkerScheduleContext(getCurrentOrtRun(ortRunId), header, failed = true) to emptyList()) }.scheduleNextJobs { log.warn("Failed to handle 'WorkerError' message.", it) } @@ -266,7 +266,7 @@ class Orchestrator( db.blockingQueryCatching(transactionIsolation = isolationLevel) { val ortRun = getCurrentOrtRun(lostSchedule.ortRunId) - val context = createWorkerSchedulerContext(ortRun, header) + val context = createWorkerScheduleContext(ortRun, header) if (context.jobs.isNotEmpty()) { fetchNextJobs(context) @@ -353,7 +353,7 @@ class Orchestrator( log.info("Handling a completed job for endpoint '{}' and ORT run {}.", endpoint.configPrefix, ortRunId) val ortRun = getCurrentOrtRun(ortRunId) - val scheduleContext = createWorkerSchedulerContext(ortRun, header, workerJobs = jobs) + val scheduleContext = createWorkerScheduleContext(ortRun, header, workerJobs = jobs) return fetchNextJobs(scheduleContext) } @@ -377,7 +377,7 @@ class Orchestrator( * The context is initialized with the status of all jobs for this run, either from the given [workerJobs] * parameter or by loading the job status from the database. */ - private fun createWorkerSchedulerContext( + private fun createWorkerScheduleContext( ortRun: OrtRun, header: MessageHeader, failed: Boolean = false, From f5b0f6d8ae964dfb76e225d741e0c9d34e339fe9 Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Fri, 27 Dec 2024 14:02:18 +0100 Subject: [PATCH 02/10] chore(orchestrator): Remove unnecessary whitespace Signed-off-by: Martin Nonnenmacher --- orchestrator/src/main/kotlin/Orchestrator.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/orchestrator/src/main/kotlin/Orchestrator.kt b/orchestrator/src/main/kotlin/Orchestrator.kt index a2b95023a8..02033740d3 100644 --- a/orchestrator/src/main/kotlin/Orchestrator.kt +++ b/orchestrator/src/main/kotlin/Orchestrator.kt @@ -404,9 +404,7 @@ class Orchestrator( val ortRunStatus = when { context.isFailed() -> OrtRunStatus.FAILED - context.isFinishedWithIssues() -> OrtRunStatus.FINISHED_WITH_ISSUES - else -> OrtRunStatus.FINISHED } From be8a3cb5bdd2f1cc0e260878e187720be7027e8c Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Fri, 27 Dec 2024 17:18:52 +0100 Subject: [PATCH 03/10] chore(orchestrator): Remove an unnecessary suppression Signed-off-by: Martin Nonnenmacher --- orchestrator/src/main/kotlin/Orchestrator.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrator/src/main/kotlin/Orchestrator.kt b/orchestrator/src/main/kotlin/Orchestrator.kt index 02033740d3..bf76708b4d 100644 --- a/orchestrator/src/main/kotlin/Orchestrator.kt +++ b/orchestrator/src/main/kotlin/Orchestrator.kt @@ -78,7 +78,7 @@ private val log = LoggerFactory.getLogger(Orchestrator::class.java) * It creates jobs for the single processing steps and passes them to the corresponding workers. It collects the results * produced by the workers until the complete ORT result is available or the run has failed. */ -@Suppress("LongParameterList", "TooManyFunctions") +@Suppress("TooManyFunctions") class Orchestrator( private val db: Database, private val workerJobRepositories: WorkerJobRepositories, From 3b2f14f169ca92c8d39964d566890eb8ea256987 Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Fri, 27 Dec 2024 18:06:27 +0100 Subject: [PATCH 04/10] refactor(orchestrator): Inline the `scheduleNextJobs` function This makes the code more explicit and helps with upcoming refactorings. Also, the function name was confusing because it was not only responsible for scheduling jobs but also for executing the `onFailure` handler from the caller. Signed-off-by: Martin Nonnenmacher --- orchestrator/src/main/kotlin/Orchestrator.kt | 34 +++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/orchestrator/src/main/kotlin/Orchestrator.kt b/orchestrator/src/main/kotlin/Orchestrator.kt index bf76708b4d..3f1cc181b4 100644 --- a/orchestrator/src/main/kotlin/Orchestrator.kt +++ b/orchestrator/src/main/kotlin/Orchestrator.kt @@ -101,7 +101,9 @@ class Orchestrator( val context = WorkerScheduleContext(ortRun, workerJobRepositories, publisher, header, emptyMap()) context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = true) } - }.scheduleNextJobs { + }.onSuccess { (context, schedules) -> + scheduleCreatedJobs(context, schedules) + }.onFailure { log.warn("Failed to handle 'CreateOrtRun' message.", it) } } @@ -114,7 +116,9 @@ class Orchestrator( val ortRun = getCurrentOrtRun(configWorkerResult.ortRunId) nextJobsToSchedule(ConfigEndpoint, ortRun.id, header, jobs = emptyMap()) - }.scheduleNextJobs { + }.onSuccess { (context, schedules) -> + scheduleCreatedJobs(context, schedules) + }.onFailure { log.warn("Failed to handle 'ConfigWorkerResult' message.", it) } } @@ -252,7 +256,9 @@ class Orchestrator( nextJobsToSchedule(Endpoint.fromConfigPrefix(workerError.endpointName), job.ortRunId, header) } } ?: (createWorkerScheduleContext(getCurrentOrtRun(ortRunId), header, failed = true) to emptyList()) - }.scheduleNextJobs { + }.onSuccess { (context, schedules) -> + scheduleCreatedJobs(context, schedules) + }.onFailure { log.warn("Failed to handle 'WorkerError' message.", it) } } @@ -273,7 +279,9 @@ class Orchestrator( } else { context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = false) } } - }.scheduleNextJobs { + }.onSuccess { (context, schedules) -> + scheduleCreatedJobs(context, schedules) + }.onFailure { log.warn("Failed to handle 'LostSchedule' message.", it) } } @@ -333,7 +341,9 @@ class Orchestrator( if (issues.isNotEmpty()) ortRunRepository.update(job.ortRunId, issues = issues.asPresent()) nextJobsToSchedule(endpoint, job.ortRunId, header) - }.scheduleNextJobs { + }.onSuccess { (context, schedules) -> + scheduleCreatedJobs(context, schedules) + }.onFailure { log.warn("Failed to handle '{}' message.", message::class.java.simpleName, it) } } @@ -358,20 +368,6 @@ class Orchestrator( return fetchNextJobs(scheduleContext) } - /** - * Convenience function to evaluate and process this [Result] with information about the next jobs to be scheduled. - * If the result is successful, actually trigger the jobs. Otherwise, call the given [onFailure] function with the - * exception that occurred. - */ - private fun Result>>.scheduleNextJobs( - onFailure: (Throwable) -> Unit - ) { - onSuccess { (context, schedules) -> - scheduleCreatedJobs(context, schedules) - } - this@scheduleNextJobs.onFailure { onFailure(it) } - } - /** * Create a [WorkerScheduleContext] for the given [ortRun] and message [header] with the given [failed] flag. * The context is initialized with the status of all jobs for this run, either from the given [workerJobs] From 391a7333b956d3c15923e255c42546089110b219 Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Fri, 27 Dec 2024 18:17:06 +0100 Subject: [PATCH 05/10] refactor(orchestrator): Rename `getCurrentOrtRun` to `getOrtRun` The word "current" does not carry any meaning in the context of the function. Signed-off-by: Martin Nonnenmacher --- orchestrator/src/main/kotlin/Orchestrator.kt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/orchestrator/src/main/kotlin/Orchestrator.kt b/orchestrator/src/main/kotlin/Orchestrator.kt index 3f1cc181b4..e5db684642 100644 --- a/orchestrator/src/main/kotlin/Orchestrator.kt +++ b/orchestrator/src/main/kotlin/Orchestrator.kt @@ -113,7 +113,7 @@ class Orchestrator( */ fun handleConfigWorkerResult(header: MessageHeader, configWorkerResult: ConfigWorkerResult) { db.blockingQueryCatching(transactionIsolation = isolationLevel) { - val ortRun = getCurrentOrtRun(configWorkerResult.ortRunId) + val ortRun = getOrtRun(configWorkerResult.ortRunId) nextJobsToSchedule(ConfigEndpoint, ortRun.id, header, jobs = emptyMap()) }.onSuccess { (context, schedules) -> @@ -255,7 +255,7 @@ class Orchestrator( repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED)?.let { nextJobsToSchedule(Endpoint.fromConfigPrefix(workerError.endpointName), job.ortRunId, header) } - } ?: (createWorkerScheduleContext(getCurrentOrtRun(ortRunId), header, failed = true) to emptyList()) + } ?: (createWorkerScheduleContext(getOrtRun(ortRunId), header, failed = true) to emptyList()) }.onSuccess { (context, schedules) -> scheduleCreatedJobs(context, schedules) }.onFailure { @@ -271,7 +271,7 @@ class Orchestrator( log.info("Handling a lost schedule for ORT run {}.", lostSchedule.ortRunId) db.blockingQueryCatching(transactionIsolation = isolationLevel) { - val ortRun = getCurrentOrtRun(lostSchedule.ortRunId) + val ortRun = getOrtRun(lostSchedule.ortRunId) val context = createWorkerScheduleContext(ortRun, header) if (context.jobs.isNotEmpty()) { @@ -289,7 +289,7 @@ class Orchestrator( /** * Obtain the [OrtRun] with the given [ortRunId] of fail with an exception if it does not exist. */ - private fun getCurrentOrtRun(ortRunId: Long): OrtRun = + private fun getOrtRun(ortRunId: Long): OrtRun = requireNotNull(ortRunRepository.get(ortRunId)) { "ORT run '$ortRunId' not found." } @@ -362,7 +362,7 @@ class Orchestrator( ): Pair> { log.info("Handling a completed job for endpoint '{}' and ORT run {}.", endpoint.configPrefix, ortRunId) - val ortRun = getCurrentOrtRun(ortRunId) + val ortRun = getOrtRun(ortRunId) val scheduleContext = createWorkerScheduleContext(ortRun, header, workerJobs = jobs) return fetchNextJobs(scheduleContext) From 9400db2c90a966aa78d25fdfa1205e8b987b6783 Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Fri, 27 Dec 2024 19:04:12 +0100 Subject: [PATCH 06/10] refactor(orchestrator): Simplify defining job dependencies Use the `WorkerScheduleInfo` enum instead of the `Endpoint` class to define the dependencies between jobs. This slightly simplifies the code and improves type safety. It also makes the companion object of `WorkerScheduleInfo` obsolete. Signed-off-by: Martin Nonnenmacher --- .../src/main/kotlin/WorkerScheduleInfo.kt | 33 +++++++------------ 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt index 5c3b544ec3..a3354a02e2 100644 --- a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt +++ b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt @@ -56,13 +56,13 @@ internal enum class WorkerScheduleInfo( * A list defining the worker jobs that this job depends on. This job will only be executed after all the * dependencies have been successfully completed. */ - private val dependsOn: List> = emptyList(), + private val dependsOn: List = emptyList(), /** * A list defining the worker jobs that must run before this job. The difference to [dependsOn] is that this job * can also run if these other jobs will not be executed. It is only guaranteed that it runs after all of them. */ - private val runsAfter: List> = emptyList(), + private val runsAfter: List = emptyList(), /** * A flag determining whether the represented worker should be run even if previous workers have already failed. @@ -83,7 +83,7 @@ internal enum class WorkerScheduleInfo( override fun isConfigured(configs: JobConfigurations): Boolean = true }, - ADVISOR(AdvisorEndpoint, dependsOn = listOf(AnalyzerEndpoint)) { + ADVISOR(AdvisorEndpoint, dependsOn = listOf(ANALYZER)) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().advisor?.let { config -> context.workerJobRepositories.advisorJobRepository.create(context.ortRun.id, config) @@ -97,7 +97,7 @@ internal enum class WorkerScheduleInfo( configs.advisor != null }, - SCANNER(ScannerEndpoint, dependsOn = listOf(AnalyzerEndpoint)) { + SCANNER(ScannerEndpoint, dependsOn = listOf(ANALYZER)) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().scanner?.let { config -> context.workerJobRepositories.scannerJobRepository.create(context.ortRun.id, config) @@ -111,7 +111,7 @@ internal enum class WorkerScheduleInfo( configs.scanner != null }, - EVALUATOR(EvaluatorEndpoint, runsAfter = listOf(AdvisorEndpoint, ScannerEndpoint)) { + EVALUATOR(EvaluatorEndpoint, runsAfter = listOf(ADVISOR, SCANNER)) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().evaluator?.let { config -> context.workerJobRepositories.evaluatorJobRepository.create(context.ortRun.id, config) @@ -125,7 +125,7 @@ internal enum class WorkerScheduleInfo( configs.evaluator != null }, - REPORTER(ReporterEndpoint, runsAfter = listOf(EvaluatorEndpoint), runAfterFailure = true) { + REPORTER(ReporterEndpoint, runsAfter = listOf(EVALUATOR), runAfterFailure = true) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().reporter?.let { config -> context.workerJobRepositories.reporterJobRepository.create(context.ortRun.id, config) @@ -139,7 +139,7 @@ internal enum class WorkerScheduleInfo( configs.reporter != null }, - NOTIFIER(NotifierEndpoint, dependsOn = listOf(ReporterEndpoint), runAfterFailure = true) { + NOTIFIER(NotifierEndpoint, dependsOn = listOf(REPORTER), runAfterFailure = true) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().notifier?.let { config -> context.workerJobRepositories.notifierJobRepository.create(context.ortRun.id, config) @@ -153,20 +153,14 @@ internal enum class WorkerScheduleInfo( configs.notifier != null }; - companion object { - private val entriesByPrefix = entries.associateBy { it.endpoint.configPrefix } - - operator fun get(endpoint: Endpoint<*>): WorkerScheduleInfo = entriesByPrefix.getValue(endpoint.configPrefix) - } - /** * Return the transitive set of the workers that must complete before this one can run. This is necessary to * determine whether this worker can be started in the current phase of an ORT run. Note that it is assumed that * no cycles exist in the dependency graph of workers; otherwise, the scheduler algorithm would have a severe * problem. */ - private val runsAfterTransitively: Set> - get() = (runsAfter + dependsOn).flatMapTo(mutableSetOf()) { WorkerScheduleInfo[it].runsAfterTransitively + it } + private val runsAfterTransitively: Set + get() = (runsAfter + dependsOn).flatMapTo(mutableSetOf()) { it.runsAfterTransitively + it } /** * Check whether a job for the represented worker can be scheduled now based on the given [context]. If so, create @@ -209,8 +203,8 @@ internal enum class WorkerScheduleInfo( isConfigured(context.jobConfigs()) && !context.wasScheduled(endpoint) && canRunWithFailureState(context) && - dependsOn.all { context.isJobCompleted(it) } && - runsAfterTransitively.none { WorkerScheduleInfo[it].isPending(context) } + dependsOn.all { context.isJobCompleted(it.endpoint) } && + runsAfterTransitively.none { it.isPending(context) } /** * Check whether the represented worker is pending for the current ORT run based on the given [context]. This @@ -220,10 +214,7 @@ internal enum class WorkerScheduleInfo( isConfigured(context.jobConfigs()) && !context.isJobCompleted(endpoint) && canRunWithFailureState(context) && - dependsOn.all { - context.wasScheduled(it) || - WorkerScheduleInfo[it].isPending(context) - } + dependsOn.all { context.wasScheduled(it.endpoint) || it.isPending(context) } /** * Check whether the represented worker can be executed for the failure state stored in the given [context]. Here From 8bb979a98ad07339dca8c827f16ef312cc8adda3 Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Sat, 28 Dec 2024 20:28:07 +0100 Subject: [PATCH 07/10] refactor(orchestrator): Extract scheduling logic Extract the logic to determine which jobs should run next into a new `OrtRunInfo` class to be able to test it independently. The class will be taken into use in a follow-up commit. Signed-off-by: Martin Nonnenmacher --- orchestrator/src/main/kotlin/OrtRunInfo.kt | 79 ++++ .../src/main/kotlin/WorkerScheduleInfo.kt | 8 +- .../src/test/kotlin/OrtRunInfoTest.kt | 346 ++++++++++++++++++ 3 files changed, 429 insertions(+), 4 deletions(-) create mode 100644 orchestrator/src/main/kotlin/OrtRunInfo.kt create mode 100644 orchestrator/src/test/kotlin/OrtRunInfoTest.kt diff --git a/orchestrator/src/main/kotlin/OrtRunInfo.kt b/orchestrator/src/main/kotlin/OrtRunInfo.kt new file mode 100644 index 0000000000..7c3fbb2687 --- /dev/null +++ b/orchestrator/src/main/kotlin/OrtRunInfo.kt @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2024 The ORT Server Authors (See ) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ + +package org.eclipse.apoapsis.ortserver.orchestrator + +import org.eclipse.apoapsis.ortserver.model.JobStatus + +/** A class to store the required information to determine which jobs can be run. */ +internal class OrtRunInfo( + /** The ORT run ID. */ + val id: Long, + + /** Whether the config worker has failed. */ + val configWorkerFailed: Boolean, + + /** The jobs configured to run in this ORT run. */ + val configuredJobs: Set, + + /** Status information for already created jobs. */ + val jobInfos: Map +) { + /** Get the next jobs that can be run. */ + fun getNextJobs(): Set = WorkerScheduleInfo.entries.filterTo(mutableSetOf()) { canRun(it) } + + /** Return true if the job can be run. */ + private fun canRun(info: WorkerScheduleInfo): Boolean = + isConfigured(info) && + !wasScheduled(info) && + canRunIfPreviousJobFailed(info) && + info.dependsOn.all { isCompleted(it) } && + info.runsAfterTransitively.none { isPending(it) } + + /** Return true if no previous job has failed or if the job is configured to run after a failure. */ + private fun canRunIfPreviousJobFailed(info: WorkerScheduleInfo): Boolean = info.runAfterFailure || !isFailed() + + /** Return true if the job has been completed. */ + private fun isCompleted(info: WorkerScheduleInfo): Boolean = jobInfos[info]?.status?.final == true + + /** Return true if the job is configured to run. */ + private fun isConfigured(info: WorkerScheduleInfo): Boolean = info in configuredJobs + + /** Return true if any job has failed. */ + private fun isFailed(): Boolean = configWorkerFailed || jobInfos.any { it.value.status == JobStatus.FAILED } + + /** Return true if the job is pending execution. */ + private fun isPending(info: WorkerScheduleInfo): Boolean = + isConfigured(info) && + !isCompleted(info) && + canRunIfPreviousJobFailed(info) && + info.dependsOn.all { wasScheduled(it) || isPending(it) } + + /** Return true if the job has been scheduled. */ + private fun wasScheduled(info: WorkerScheduleInfo): Boolean = jobInfos.containsKey(info) +} + +/** A class to store information of a worker job required by [OrtRunInfo]. */ +internal class WorkerJobInfo( + /** The job ID. */ + val id: Long, + + /** The job status. */ + val status: JobStatus +) diff --git a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt index a3354a02e2..ebaf30edfc 100644 --- a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt +++ b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt @@ -56,18 +56,18 @@ internal enum class WorkerScheduleInfo( * A list defining the worker jobs that this job depends on. This job will only be executed after all the * dependencies have been successfully completed. */ - private val dependsOn: List = emptyList(), + val dependsOn: List = emptyList(), /** * A list defining the worker jobs that must run before this job. The difference to [dependsOn] is that this job * can also run if these other jobs will not be executed. It is only guaranteed that it runs after all of them. */ - private val runsAfter: List = emptyList(), + val runsAfter: List = emptyList(), /** * A flag determining whether the represented worker should be run even if previous workers have already failed. */ - private val runAfterFailure: Boolean = false + val runAfterFailure: Boolean = false ) { ANALYZER(AnalyzerEndpoint) { override fun createJob(context: WorkerScheduleContext): WorkerJob = @@ -159,7 +159,7 @@ internal enum class WorkerScheduleInfo( * no cycles exist in the dependency graph of workers; otherwise, the scheduler algorithm would have a severe * problem. */ - private val runsAfterTransitively: Set + val runsAfterTransitively: Set get() = (runsAfter + dependsOn).flatMapTo(mutableSetOf()) { it.runsAfterTransitively + it } /** diff --git a/orchestrator/src/test/kotlin/OrtRunInfoTest.kt b/orchestrator/src/test/kotlin/OrtRunInfoTest.kt new file mode 100644 index 0000000000..4bbd9c64e7 --- /dev/null +++ b/orchestrator/src/test/kotlin/OrtRunInfoTest.kt @@ -0,0 +1,346 @@ +/* + * Copyright (C) 2024 The ORT Server Authors (See ) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ + +package org.eclipse.apoapsis.ortserver.orchestrator + +import io.kotest.core.spec.style.WordSpec +import io.kotest.matchers.collections.beEmpty +import io.kotest.matchers.collections.containExactly +import io.kotest.matchers.should + +import org.eclipse.apoapsis.ortserver.model.JobStatus + +class OrtRunInfoTest : WordSpec({ + "getNextJobs() with all jobs configured" should { + val configuredJobs = WorkerScheduleInfo.entries.toSet() + + "return ANALYZER if no job was created yet" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = emptyMap() + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.ANALYZER) + } + + "return nothing if ANALYZER is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return ADVISOR and SCANNER if all previous jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.ADVISOR, WorkerScheduleInfo.SCANNER) + } + + "return nothing if ADVISOR is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.RUNNING), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return nothing if SCANNER is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return EVALUATOR if all previous jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.EVALUATOR) + } + + "return nothing if EVALUATOR is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return REPORTER if all previous jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return REPORTER if config worker failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = true, + configuredJobs = configuredJobs, + jobInfos = emptyMap() + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return REPORTER if ANALYZER failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return REPORTER if ADVISOR failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FAILED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return REPORTER if SCANNER failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return REPORTER if EVALUATOR failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + } + + "return NOTIFIER if REPORTER finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.REPORTER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.NOTIFIER) + } + + "return NOTIFIER if REPORTER failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.REPORTER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.NOTIFIER) + } + + "return nothing if all jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = WorkerScheduleInfo.entries.associateWith { + WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + } + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + } + + "getNextJobs() with only ANALYZER and EVALUATOR configured" should { + val configuredJobs = setOf(WorkerScheduleInfo.ANALYZER, WorkerScheduleInfo.EVALUATOR) + + "return ANALYZER if no job was created yet" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = emptyMap() + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.ANALYZER) + } + + "return nothing if ANALYZER is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return nothing if ANALYZER failed" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return EVALUATOR if ANALYZER finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.EVALUATOR) + } + + "return nothing if EVALUATOR is still running" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + + "return nothing if all jobs finished" { + val ortRunInfo = OrtRunInfo( + id = 1, + configWorkerFailed = false, + configuredJobs = configuredJobs, + jobInfos = mapOf( + WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), + WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) + ) + ) + + ortRunInfo.getNextJobs() should beEmpty() + } + } +}) From 5c104740276d8924f06959f0df53bb07df5a4e7d Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Mon, 6 Jan 2025 20:06:23 +0100 Subject: [PATCH 08/10] fix(orchestrator): Make the reporter depend on the analyzer If the analyzer did not run, it makes no sense to run the reporter worker as there is no data to report. Signed-off-by: Martin Nonnenmacher --- orchestrator/src/main/kotlin/WorkerScheduleInfo.kt | 2 +- orchestrator/src/test/kotlin/OrtRunInfoTest.kt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt index ebaf30edfc..d6d198d8f8 100644 --- a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt +++ b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt @@ -125,7 +125,7 @@ internal enum class WorkerScheduleInfo( configs.evaluator != null }, - REPORTER(ReporterEndpoint, runsAfter = listOf(EVALUATOR), runAfterFailure = true) { + REPORTER(ReporterEndpoint, dependsOn = listOf(ANALYZER), runsAfter = listOf(EVALUATOR), runAfterFailure = true) { override fun createJob(context: WorkerScheduleContext): WorkerJob? = context.jobConfigs().reporter?.let { config -> context.workerJobRepositories.reporterJobRepository.create(context.ortRun.id, config) diff --git a/orchestrator/src/test/kotlin/OrtRunInfoTest.kt b/orchestrator/src/test/kotlin/OrtRunInfoTest.kt index 4bbd9c64e7..d8f1f96596 100644 --- a/orchestrator/src/test/kotlin/OrtRunInfoTest.kt +++ b/orchestrator/src/test/kotlin/OrtRunInfoTest.kt @@ -144,7 +144,7 @@ class OrtRunInfoTest : WordSpec({ ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) } - "return REPORTER if config worker failed" { + "return nothing if config worker failed" { val ortRunInfo = OrtRunInfo( id = 1, configWorkerFailed = true, @@ -152,7 +152,7 @@ class OrtRunInfoTest : WordSpec({ jobInfos = emptyMap() ) - ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) + ortRunInfo.getNextJobs() should beEmpty() } "return REPORTER if ANALYZER failed" { From f3b26aec247edf85b7cb649471689a017de225cb Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Mon, 6 Jan 2025 18:58:07 +0100 Subject: [PATCH 09/10] refactor(orchestrator): Take `OrtRunInfo` into use Add a new function `scheduleNextJobs` that uses the previously introduced `OrtRunInfo` to determine which jobs need to be scheduled and delete all now unused functions from the previous implementation. Signed-off-by: Martin Nonnenmacher --- orchestrator/src/main/kotlin/Orchestrator.kt | 109 ++++++++---------- .../src/main/kotlin/WorkerScheduleContext.kt | 15 +-- .../src/main/kotlin/WorkerScheduleInfo.kt | 59 +--------- 3 files changed, 51 insertions(+), 132 deletions(-) diff --git a/orchestrator/src/main/kotlin/Orchestrator.kt b/orchestrator/src/main/kotlin/Orchestrator.kt index e5db684642..ac47719a61 100644 --- a/orchestrator/src/main/kotlin/Orchestrator.kt +++ b/orchestrator/src/main/kotlin/Orchestrator.kt @@ -99,10 +99,7 @@ class Orchestrator( "Repository '${ortRun.repositoryId}' not found." } - val context = WorkerScheduleContext(ortRun, workerJobRepositories, publisher, header, emptyMap()) - context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = true) } - }.onSuccess { (context, schedules) -> - scheduleCreatedJobs(context, schedules) + scheduleConfigWorkerJob(ortRun, header, updateRun = true) }.onFailure { log.warn("Failed to handle 'CreateOrtRun' message.", it) } @@ -115,9 +112,9 @@ class Orchestrator( db.blockingQueryCatching(transactionIsolation = isolationLevel) { val ortRun = getOrtRun(configWorkerResult.ortRunId) - nextJobsToSchedule(ConfigEndpoint, ortRun.id, header, jobs = emptyMap()) - }.onSuccess { (context, schedules) -> - scheduleCreatedJobs(context, schedules) + createWorkerScheduleContext(ortRun, header) + }.onSuccess { context -> + scheduleNextJobs(context) }.onFailure { log.warn("Failed to handle 'ConfigWorkerResult' message.", it) } @@ -252,12 +249,11 @@ class Orchestrator( "ORT run '$ortRunId' not found." } - repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED)?.let { - nextJobsToSchedule(Endpoint.fromConfigPrefix(workerError.endpointName), job.ortRunId, header) - } - } ?: (createWorkerScheduleContext(getOrtRun(ortRunId), header, failed = true) to emptyList()) - }.onSuccess { (context, schedules) -> - scheduleCreatedJobs(context, schedules) + repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED) + createWorkerScheduleContext(getOrtRun(ortRunId), header) + } ?: createWorkerScheduleContext(getOrtRun(ortRunId), header, failed = true) + }.onSuccess { context -> + scheduleNextJobs(context) }.onFailure { log.warn("Failed to handle 'WorkerError' message.", it) } @@ -274,13 +270,14 @@ class Orchestrator( val ortRun = getOrtRun(lostSchedule.ortRunId) val context = createWorkerScheduleContext(ortRun, header) - if (context.jobs.isNotEmpty()) { - fetchNextJobs(context) + if (context.jobs.isEmpty()) { + scheduleConfigWorkerJob(ortRun, header, updateRun = false) + null } else { - context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = false) } + context } - }.onSuccess { (context, schedules) -> - scheduleCreatedJobs(context, schedules) + }.onSuccess { context -> + context?.let { scheduleNextJobs(context) } }.onFailure { log.warn("Failed to handle 'LostSchedule' message.", it) } @@ -340,34 +337,14 @@ class Orchestrator( val job = workerJobRepositories.updateJobStatus(endpoint, message.jobId, status) if (issues.isNotEmpty()) ortRunRepository.update(job.ortRunId, issues = issues.asPresent()) - nextJobsToSchedule(endpoint, job.ortRunId, header) - }.onSuccess { (context, schedules) -> - scheduleCreatedJobs(context, schedules) + createWorkerScheduleContext(getOrtRun(job.ortRunId), header) + }.onSuccess { context -> + scheduleNextJobs(context) }.onFailure { log.warn("Failed to handle '{}' message.", message::class.java.simpleName, it) } } - /** - * Determine the next jobs that can be scheduled after a job for the given [endpoint] for the run with the given - * [ortRunId] has completed. Use the given [header] to send messages to the worker endpoints. Optionally, - * accept a map with the [jobs] that have been run. Return a list with the new jobs to schedule and the current - * [WorkerScheduleContext]. - */ - private fun nextJobsToSchedule( - endpoint: Endpoint<*>, - ortRunId: Long, - header: MessageHeader, - jobs: Map? = null - ): Pair> { - log.info("Handling a completed job for endpoint '{}' and ORT run {}.", endpoint.configPrefix, ortRunId) - - val ortRun = getOrtRun(ortRunId) - val scheduleContext = createWorkerScheduleContext(ortRun, header, workerJobs = jobs) - - return fetchNextJobs(scheduleContext) - } - /** * Create a [WorkerScheduleContext] for the given [ortRun] and message [header] with the given [failed] flag. * The context is initialized with the status of all jobs for this run, either from the given [workerJobs] @@ -386,16 +363,36 @@ class Orchestrator( return WorkerScheduleContext(ortRun, workerJobRepositories, publisher, header, jobs, failed) } - /** - * Trigger the scheduling of the given new [createdJobs] for the ORT run contained in the given [context]. This - * also includes sending corresponding messages to the worker endpoints. - */ - private fun scheduleCreatedJobs(context: WorkerScheduleContext, createdJobs: CreatedJobs) { - // TODO: Handle errors during job scheduling. + /** Schedule the next jobs for the current ORT run based on the current state of the run. */ + private fun scheduleNextJobs(context: WorkerScheduleContext) { + val configuredJobs = WorkerScheduleInfo.entries.filterTo(mutableSetOf()) { + it.isConfigured(context.jobConfigs()) + } - createdJobs.forEach { it() } + val jobInfos = configuredJobs.mapNotNull { + context.jobs[it.endpoint.configPrefix]?.let { job -> + it to WorkerJobInfo(job.id, job.status) + } + }.toMap() + + val ortRunInfo = OrtRunInfo(context.ortRun.id, context.failed, configuredJobs, jobInfos) + + val nextJobs = ortRunInfo.getNextJobs() + + nextJobs.forEach { info -> + info.createJob(context)?.let { job -> + // TODO: Handle errors during job scheduling. + info.publishJob(context, job) + context.workerJobRepositories.updateJobStatus( + info.endpoint, + job.id, + JobStatus.SCHEDULED, + finished = false + ) + } + } - if (createdJobs.isEmpty() && !context.hasRunningJobs()) { + if (nextJobs.isEmpty() && !context.hasRunningJobs()) { cleanupJobs(context.ortRun.id) val ortRunStatus = when { @@ -460,11 +457,6 @@ class Orchestrator( ) } -/** - * Type definition to represent a list of jobs that have been created and must be scheduled. - */ -typealias CreatedJobs = List - /** * Create an [Issue] object representing an error that occurred in any [Endpoint]. */ @@ -474,12 +466,3 @@ fun Endpoint.createErrorIssue(): Issue = Issue( message = "The $configPrefix worker failed due to an unexpected error.", severity = Severity.ERROR ) - -/** - * Return a [Pair] with the given [scheduleContext] and the list of jobs that can be scheduled in the current phase - * of the affected ORT run. - */ -private fun fetchNextJobs( - scheduleContext: WorkerScheduleContext -): Pair> = - scheduleContext to WorkerScheduleInfo.entries.mapNotNull { it.createAndScheduleJobIfPossible(scheduleContext) } diff --git a/orchestrator/src/main/kotlin/WorkerScheduleContext.kt b/orchestrator/src/main/kotlin/WorkerScheduleContext.kt index 1c8ec57d05..2c69256635 100644 --- a/orchestrator/src/main/kotlin/WorkerScheduleContext.kt +++ b/orchestrator/src/main/kotlin/WorkerScheduleContext.kt @@ -60,7 +60,7 @@ internal class WorkerScheduleContext( * jobs that have been run. With this flag, this mechanism can be overridden, which is necessary for workers that * do not spawn jobs like the Config worker. */ - private val failed: Boolean = false + val failed: Boolean = false ) { /** * Return the [JobConfigurations] object for the current run. Prefer the resolved configurations if available; @@ -89,19 +89,6 @@ internal class WorkerScheduleContext( fun hasRunningJobs(): Boolean = jobs.values.any { !it.isCompleted() } - /** - * Return a flag whether the worker job for the given [endpoint] was scheduled for the current ORT run. It may - * still be running or have finished already. - */ - fun wasScheduled(endpoint: Endpoint<*>): Boolean = - endpoint.configPrefix in jobs - - /** - * Return a flag whether the worker job for the given [endpoint] has already completed. - */ - fun isJobCompleted(endpoint: Endpoint<*>): Boolean = - jobs[endpoint.configPrefix]?.isCompleted() ?: false - /** * Return a flag whether this [OrtRun] has failed, i.e. it has at least one job in failed state. */ diff --git a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt index d6d198d8f8..a930a7a8fc 100644 --- a/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt +++ b/orchestrator/src/main/kotlin/WorkerScheduleInfo.kt @@ -20,7 +20,6 @@ package org.eclipse.apoapsis.ortserver.orchestrator import org.eclipse.apoapsis.ortserver.model.JobConfigurations -import org.eclipse.apoapsis.ortserver.model.JobStatus import org.eclipse.apoapsis.ortserver.model.WorkerJob import org.eclipse.apoapsis.ortserver.model.orchestrator.AdvisorRequest import org.eclipse.apoapsis.ortserver.model.orchestrator.AnalyzerRequest @@ -36,11 +35,6 @@ import org.eclipse.apoapsis.ortserver.transport.NotifierEndpoint import org.eclipse.apoapsis.ortserver.transport.ReporterEndpoint import org.eclipse.apoapsis.ortserver.transport.ScannerEndpoint -/** - * Type definition for a function that schedules another worker job. - */ -typealias JobScheduleFunc = () -> Unit - /** * An enumeration class with constants that describe if and when a job for a specific worker should be scheduled. * @@ -50,7 +44,7 @@ typealias JobScheduleFunc = () -> Unit */ internal enum class WorkerScheduleInfo( /** The endpoint of the worker represented by this schedule info. */ - private val endpoint: Endpoint<*>, + val endpoint: Endpoint<*>, /** * A list defining the worker jobs that this job depends on. This job will only be executed after all the @@ -162,64 +156,19 @@ internal enum class WorkerScheduleInfo( val runsAfterTransitively: Set get() = (runsAfter + dependsOn).flatMapTo(mutableSetOf()) { it.runsAfterTransitively + it } - /** - * Check whether a job for the represented worker can be scheduled now based on the given [context]. If so, create - * the job in the database and return a function that schedules the job. - */ - fun createAndScheduleJobIfPossible(context: WorkerScheduleContext): JobScheduleFunc? { - if (!canRun(context)) return null - - return createJob(context)?.let { job -> - { - publishJob(context, job) - - context.workerJobRepositories.updateJobStatus(endpoint, job.id, JobStatus.SCHEDULED, finished = false) - } - } - } - /** * Create a new job for this worker based on the information in the given [context]. */ - protected abstract fun createJob(context: WorkerScheduleContext): WorkerJob? + abstract fun createJob(context: WorkerScheduleContext): WorkerJob? /** * Publish a message to the worker endpoint to schedule the given [job] based on the information in the given * [context]. */ - protected abstract fun publishJob(context: WorkerScheduleContext, job: WorkerJob) + abstract fun publishJob(context: WorkerScheduleContext, job: WorkerJob) /** * Return a flag whether this worker is configured to run for the current ORT run based on the given [configs]. */ - protected abstract fun isConfigured(configs: JobConfigurations): Boolean - - /** - * Return a flag whether a job for the represented worker can be started now based on the given [context]. - * This function checks whether this worker is configured to run and whether the jobs it depends on have been - * completed. - */ - private fun canRun(context: WorkerScheduleContext): Boolean = - isConfigured(context.jobConfigs()) && - !context.wasScheduled(endpoint) && - canRunWithFailureState(context) && - dependsOn.all { context.isJobCompleted(it.endpoint) } && - runsAfterTransitively.none { it.isPending(context) } - - /** - * Check whether the represented worker is pending for the current ORT run based on the given [context]. This - * means that the worker has not yet run, but - given the current state - is supposed to run later. - */ - private fun isPending(context: WorkerScheduleContext): Boolean = - isConfigured(context.jobConfigs()) && - !context.isJobCompleted(endpoint) && - canRunWithFailureState(context) && - dependsOn.all { context.wasScheduled(it.endpoint) || it.isPending(context) } - - /** - * Check whether the represented worker can be executed for the failure state stored in the given [context]. Here - * a worker can decide whether it can always run or only if all previous workers were successful. - */ - private fun canRunWithFailureState(context: WorkerScheduleContext) = - runAfterFailure || !context.isFailed() + abstract fun isConfigured(configs: JobConfigurations): Boolean } From 3af763d56b92007d5eeb43b58e5638f916ba5af9 Mon Sep 17 00:00:00 2001 From: Martin Nonnenmacher Date: Tue, 7 Jan 2025 08:18:18 +0100 Subject: [PATCH 10/10] chore(orchestrator): Remove an unnecessary helper function With the introduction of the `JobStatus.final` property in 4d23673 the `WorkerJob.isCompleted()` helper function is not required anymore. Signed-off-by: Martin Nonnenmacher --- orchestrator/src/main/kotlin/WorkerScheduleContext.kt | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/orchestrator/src/main/kotlin/WorkerScheduleContext.kt b/orchestrator/src/main/kotlin/WorkerScheduleContext.kt index 2c69256635..6aaaa140ed 100644 --- a/orchestrator/src/main/kotlin/WorkerScheduleContext.kt +++ b/orchestrator/src/main/kotlin/WorkerScheduleContext.kt @@ -87,7 +87,7 @@ internal class WorkerScheduleContext( * Return a flag whether the current [OrtRun] has at least one running job. */ fun hasRunningJobs(): Boolean = - jobs.values.any { !it.isCompleted() } + jobs.values.any { !it.status.final } /** * Return a flag whether this [OrtRun] has failed, i.e. it has at least one job in failed state. @@ -101,9 +101,3 @@ internal class WorkerScheduleContext( fun isFinishedWithIssues(): Boolean = !isFailed() && jobs.values.any { it.status == JobStatus.FINISHED_WITH_ISSUES } } - -/** - * Return a flag whether this [WorkerJob] is already completed. - */ -private fun WorkerJob.isCompleted(): Boolean = - status == JobStatus.FINISHED || status == JobStatus.FAILED || status == JobStatus.FINISHED_WITH_ISSUES