-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Start untangling orchestrator #1739
base: main
Are you sure you want to change the base?
Changes from all commits
18acb53
f5b0f6d
be8a3cb
3b2f14f
391a733
9400db2
8bb979a
5c10474
f3b26ae
3af763d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,7 +78,7 @@ private val log = LoggerFactory.getLogger(Orchestrator::class.java) | |
* It creates jobs for the single processing steps and passes them to the corresponding workers. It collects the results | ||
* produced by the workers until the complete ORT result is available or the run has failed. | ||
*/ | ||
@Suppress("LongParameterList", "TooManyFunctions") | ||
@Suppress("TooManyFunctions") | ||
class Orchestrator( | ||
private val db: Database, | ||
private val workerJobRepositories: WorkerJobRepositories, | ||
|
@@ -99,9 +99,8 @@ class Orchestrator( | |
"Repository '${ortRun.repositoryId}' not found." | ||
} | ||
|
||
val context = WorkerScheduleContext(ortRun, workerJobRepositories, publisher, header, emptyMap()) | ||
context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = true) } | ||
}.scheduleNextJobs { | ||
scheduleConfigWorkerJob(ortRun, header, updateRun = true) | ||
}.onFailure { | ||
log.warn("Failed to handle 'CreateOrtRun' message.", it) | ||
} | ||
} | ||
|
@@ -111,10 +110,12 @@ class Orchestrator( | |
*/ | ||
fun handleConfigWorkerResult(header: MessageHeader, configWorkerResult: ConfigWorkerResult) { | ||
db.blockingQueryCatching(transactionIsolation = isolationLevel) { | ||
val ortRun = getCurrentOrtRun(configWorkerResult.ortRunId) | ||
val ortRun = getOrtRun(configWorkerResult.ortRunId) | ||
|
||
nextJobsToSchedule(ConfigEndpoint, ortRun.id, header, jobs = emptyMap()) | ||
}.scheduleNextJobs { | ||
createWorkerScheduleContext(ortRun, header) | ||
}.onSuccess { context -> | ||
scheduleNextJobs(context) | ||
}.onFailure { | ||
log.warn("Failed to handle 'ConfigWorkerResult' message.", it) | ||
} | ||
} | ||
|
@@ -248,11 +249,12 @@ class Orchestrator( | |
"ORT run '$ortRunId' not found." | ||
} | ||
|
||
repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED)?.let { | ||
nextJobsToSchedule(Endpoint.fromConfigPrefix(workerError.endpointName), job.ortRunId, header) | ||
} | ||
} ?: (createWorkerSchedulerContext(getCurrentOrtRun(ortRunId), header, failed = true) to emptyList()) | ||
}.scheduleNextJobs { | ||
repository.tryComplete(job.id, Clock.System.now(), JobStatus.FAILED) | ||
createWorkerScheduleContext(getOrtRun(ortRunId), header) | ||
} ?: createWorkerScheduleContext(getOrtRun(ortRunId), header, failed = true) | ||
}.onSuccess { context -> | ||
scheduleNextJobs(context) | ||
}.onFailure { | ||
log.warn("Failed to handle 'WorkerError' message.", it) | ||
} | ||
} | ||
|
@@ -265,23 +267,26 @@ class Orchestrator( | |
log.info("Handling a lost schedule for ORT run {}.", lostSchedule.ortRunId) | ||
|
||
db.blockingQueryCatching(transactionIsolation = isolationLevel) { | ||
val ortRun = getCurrentOrtRun(lostSchedule.ortRunId) | ||
val context = createWorkerSchedulerContext(ortRun, header) | ||
val ortRun = getOrtRun(lostSchedule.ortRunId) | ||
val context = createWorkerScheduleContext(ortRun, header) | ||
|
||
if (context.jobs.isNotEmpty()) { | ||
fetchNextJobs(context) | ||
if (context.jobs.isEmpty()) { | ||
scheduleConfigWorkerJob(ortRun, header, updateRun = false) | ||
null | ||
} else { | ||
context to listOf { scheduleConfigWorkerJob(ortRun, header, updateRun = false) } | ||
context | ||
} | ||
}.scheduleNextJobs { | ||
}.onSuccess { context -> | ||
context?.let { scheduleNextJobs(context) } | ||
}.onFailure { | ||
log.warn("Failed to handle 'LostSchedule' message.", it) | ||
} | ||
} | ||
|
||
/** | ||
* Obtain the [OrtRun] with the given [ortRunId] of fail with an exception if it does not exist. | ||
*/ | ||
private fun getCurrentOrtRun(ortRunId: Long): OrtRun = | ||
private fun getOrtRun(ortRunId: Long): OrtRun = | ||
requireNotNull(ortRunRepository.get(ortRunId)) { | ||
"ORT run '$ortRunId' not found." | ||
} | ||
|
@@ -332,52 +337,20 @@ class Orchestrator( | |
val job = workerJobRepositories.updateJobStatus(endpoint, message.jobId, status) | ||
if (issues.isNotEmpty()) ortRunRepository.update(job.ortRunId, issues = issues.asPresent()) | ||
|
||
nextJobsToSchedule(endpoint, job.ortRunId, header) | ||
}.scheduleNextJobs { | ||
createWorkerScheduleContext(getOrtRun(job.ortRunId), header) | ||
}.onSuccess { context -> | ||
scheduleNextJobs(context) | ||
}.onFailure { | ||
log.warn("Failed to handle '{}' message.", message::class.java.simpleName, it) | ||
} | ||
} | ||
|
||
/** | ||
* Determine the next jobs that can be scheduled after a job for the given [endpoint] for the run with the given | ||
* [ortRunId] has completed. Use the given [header] to send messages to the worker endpoints. Optionally, | ||
* accept a map with the [jobs] that have been run. Return a list with the new jobs to schedule and the current | ||
* [WorkerScheduleContext]. | ||
*/ | ||
private fun nextJobsToSchedule( | ||
endpoint: Endpoint<*>, | ||
ortRunId: Long, | ||
header: MessageHeader, | ||
jobs: Map<String, WorkerJob>? = null | ||
): Pair<WorkerScheduleContext, List<JobScheduleFunc>> { | ||
log.info("Handling a completed job for endpoint '{}' and ORT run {}.", endpoint.configPrefix, ortRunId) | ||
|
||
val ortRun = getCurrentOrtRun(ortRunId) | ||
val scheduleContext = createWorkerSchedulerContext(ortRun, header, workerJobs = jobs) | ||
|
||
return fetchNextJobs(scheduleContext) | ||
} | ||
|
||
/** | ||
* Convenience function to evaluate and process this [Result] with information about the next jobs to be scheduled. | ||
* If the result is successful, actually trigger the jobs. Otherwise, call the given [onFailure] function with the | ||
* exception that occurred. | ||
*/ | ||
private fun Result<Pair<WorkerScheduleContext, List<JobScheduleFunc>>>.scheduleNextJobs( | ||
onFailure: (Throwable) -> Unit | ||
) { | ||
onSuccess { (context, schedules) -> | ||
scheduleCreatedJobs(context, schedules) | ||
} | ||
[email protected] { onFailure(it) } | ||
} | ||
|
||
/** | ||
* Create a [WorkerScheduleContext] for the given [ortRun] and message [header] with the given [failed] flag. | ||
* The context is initialized with the status of all jobs for this run, either from the given [workerJobs] | ||
* parameter or by loading the job status from the database. | ||
*/ | ||
private fun createWorkerSchedulerContext( | ||
private fun createWorkerScheduleContext( | ||
ortRun: OrtRun, | ||
header: MessageHeader, | ||
failed: Boolean = false, | ||
|
@@ -390,23 +363,41 @@ class Orchestrator( | |
return WorkerScheduleContext(ortRun, workerJobRepositories, publisher, header, jobs, failed) | ||
} | ||
|
||
/** | ||
* Trigger the scheduling of the given new [createdJobs] for the ORT run contained in the given [context]. This | ||
* also includes sending corresponding messages to the worker endpoints. | ||
*/ | ||
private fun scheduleCreatedJobs(context: WorkerScheduleContext, createdJobs: CreatedJobs) { | ||
// TODO: Handle errors during job scheduling. | ||
/** Schedule the next jobs for the current ORT run based on the current state of the run. */ | ||
private fun scheduleNextJobs(context: WorkerScheduleContext) { | ||
val configuredJobs = WorkerScheduleInfo.entries.filterTo(mutableSetOf()) { | ||
it.isConfigured(context.jobConfigs()) | ||
} | ||
|
||
val jobInfos = configuredJobs.mapNotNull { | ||
context.jobs[it.endpoint.configPrefix]?.let { job -> | ||
it to WorkerJobInfo(job.id, job.status) | ||
} | ||
}.toMap() | ||
|
||
val ortRunInfo = OrtRunInfo(context.ortRun.id, context.failed, configuredJobs, jobInfos) | ||
|
||
createdJobs.forEach { it() } | ||
val nextJobs = ortRunInfo.getNextJobs() | ||
|
||
if (createdJobs.isEmpty() && !context.hasRunningJobs()) { | ||
nextJobs.forEach { info -> | ||
info.createJob(context)?.let { job -> | ||
// TODO: Handle errors during job scheduling. | ||
info.publishJob(context, job) | ||
context.workerJobRepositories.updateJobStatus( | ||
info.endpoint, | ||
job.id, | ||
JobStatus.SCHEDULED, | ||
finished = false | ||
) | ||
} | ||
} | ||
|
||
if (nextJobs.isEmpty() && !context.hasRunningJobs()) { | ||
cleanupJobs(context.ortRun.id) | ||
|
||
val ortRunStatus = when { | ||
context.isFailed() -> OrtRunStatus.FAILED | ||
|
||
context.isFinishedWithIssues() -> OrtRunStatus.FINISHED_WITH_ISSUES | ||
|
||
else -> OrtRunStatus.FINISHED | ||
} | ||
|
||
|
@@ -466,11 +457,6 @@ class Orchestrator( | |
) | ||
} | ||
|
||
/** | ||
* Type definition to represent a list of jobs that have been created and must be scheduled. | ||
*/ | ||
typealias CreatedJobs = List<JobScheduleFunc> | ||
|
||
/** | ||
* Create an [Issue] object representing an error that occurred in any [Endpoint]. | ||
*/ | ||
|
@@ -480,12 +466,3 @@ fun <T : Any> Endpoint<T>.createErrorIssue(): Issue = Issue( | |
message = "The $configPrefix worker failed due to an unexpected error.", | ||
severity = Severity.ERROR | ||
) | ||
|
||
/** | ||
* Return a [Pair] with the given [scheduleContext] and the list of jobs that can be scheduled in the current phase | ||
* of the affected ORT run. | ||
*/ | ||
private fun fetchNextJobs( | ||
scheduleContext: WorkerScheduleContext | ||
): Pair<WorkerScheduleContext, List<JobScheduleFunc>> = | ||
scheduleContext to WorkerScheduleInfo.entries.mapNotNull { it.createAndScheduleJobIfPossible(scheduleContext) } |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>) | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* License-Filename: LICENSE | ||
*/ | ||
|
||
package org.eclipse.apoapsis.ortserver.orchestrator | ||
|
||
import org.eclipse.apoapsis.ortserver.model.JobStatus | ||
|
||
/** A class to store the required information to determine which jobs can be run. */ | ||
internal class OrtRunInfo( | ||
/** The ORT run ID. */ | ||
val id: Long, | ||
|
||
/** Whether the config worker has failed. */ | ||
val configWorkerFailed: Boolean, | ||
|
||
/** The jobs configured to run in this ORT run. */ | ||
val configuredJobs: Set<WorkerScheduleInfo>, | ||
|
||
/** Status information for already created jobs. */ | ||
val jobInfos: Map<WorkerScheduleInfo, WorkerJobInfo> | ||
) { | ||
/** Get the next jobs that can be run. */ | ||
fun getNextJobs(): Set<WorkerScheduleInfo> = WorkerScheduleInfo.entries.filterTo(mutableSetOf()) { canRun(it) } | ||
|
||
/** Return true if the job can be run. */ | ||
private fun canRun(info: WorkerScheduleInfo): Boolean = | ||
isConfigured(info) && | ||
!wasScheduled(info) && | ||
canRunIfPreviousJobFailed(info) && | ||
info.dependsOn.all { isCompleted(it) } && | ||
info.runsAfterTransitively.none { isPending(it) } | ||
|
||
/** Return true if no previous job has failed or if the job is configured to run after a failure. */ | ||
private fun canRunIfPreviousJobFailed(info: WorkerScheduleInfo): Boolean = info.runAfterFailure || !isFailed() | ||
|
||
/** Return true if the job has been completed. */ | ||
private fun isCompleted(info: WorkerScheduleInfo): Boolean = jobInfos[info]?.status?.final == true | ||
|
||
/** Return true if the job is configured to run. */ | ||
private fun isConfigured(info: WorkerScheduleInfo): Boolean = info in configuredJobs | ||
|
||
/** Return true if any job has failed. */ | ||
private fun isFailed(): Boolean = configWorkerFailed || jobInfos.any { it.value.status == JobStatus.FAILED } | ||
|
||
/** Return true if the job is pending execution. */ | ||
private fun isPending(info: WorkerScheduleInfo): Boolean = | ||
isConfigured(info) && | ||
!isCompleted(info) && | ||
canRunIfPreviousJobFailed(info) && | ||
info.dependsOn.all { wasScheduled(it) || isPending(it) } | ||
|
||
/** Return true if the job has been scheduled. */ | ||
private fun wasScheduled(info: WorkerScheduleInfo): Boolean = jobInfos.containsKey(info) | ||
} | ||
|
||
/** A class to store information of a worker job required by [OrtRunInfo]. */ | ||
internal class WorkerJobInfo( | ||
/** The job ID. */ | ||
val id: Long, | ||
|
||
/** The job status. */ | ||
val status: JobStatus | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,6 @@ | |
package org.eclipse.apoapsis.ortserver.orchestrator | ||
|
||
import org.eclipse.apoapsis.ortserver.model.JobConfigurations | ||
import org.eclipse.apoapsis.ortserver.model.JobStatus | ||
import org.eclipse.apoapsis.ortserver.model.WorkerJob | ||
import org.eclipse.apoapsis.ortserver.model.orchestrator.AdvisorRequest | ||
import org.eclipse.apoapsis.ortserver.model.orchestrator.AnalyzerRequest | ||
|
@@ -36,11 +35,6 @@ import org.eclipse.apoapsis.ortserver.transport.NotifierEndpoint | |
import org.eclipse.apoapsis.ortserver.transport.ReporterEndpoint | ||
import org.eclipse.apoapsis.ortserver.transport.ScannerEndpoint | ||
|
||
/** | ||
* Type definition for a function that schedules another worker job. | ||
*/ | ||
typealias JobScheduleFunc = () -> Unit | ||
|
||
/** | ||
* An enumeration class with constants that describe if and when a job for a specific worker should be scheduled. | ||
* | ||
|
@@ -50,24 +44,24 @@ typealias JobScheduleFunc = () -> Unit | |
*/ | ||
internal enum class WorkerScheduleInfo( | ||
/** The endpoint of the worker represented by this schedule info. */ | ||
private val endpoint: Endpoint<*>, | ||
val endpoint: Endpoint<*>, | ||
|
||
/** | ||
* A list defining the worker jobs that this job depends on. This job will only be executed after all the | ||
* dependencies have been successfully completed. | ||
*/ | ||
private val dependsOn: List<Endpoint<*>> = emptyList(), | ||
val dependsOn: List<WorkerScheduleInfo> = emptyList(), | ||
|
||
/** | ||
* A list defining the worker jobs that must run before this job. The difference to [dependsOn] is that this job | ||
* can also run if these other jobs will not be executed. It is only guaranteed that it runs after all of them. | ||
*/ | ||
private val runsAfter: List<Endpoint<*>> = emptyList(), | ||
val runsAfter: List<WorkerScheduleInfo> = emptyList(), | ||
|
||
/** | ||
* A flag determining whether the represented worker should be run even if previous workers have already failed. | ||
*/ | ||
private val runAfterFailure: Boolean = false | ||
val runAfterFailure: Boolean = false | ||
) { | ||
ANALYZER(AnalyzerEndpoint) { | ||
override fun createJob(context: WorkerScheduleContext): WorkerJob = | ||
|
@@ -83,7 +77,7 @@ internal enum class WorkerScheduleInfo( | |
override fun isConfigured(configs: JobConfigurations): Boolean = true | ||
}, | ||
|
||
ADVISOR(AdvisorEndpoint, dependsOn = listOf(AnalyzerEndpoint)) { | ||
ADVISOR(AdvisorEndpoint, dependsOn = listOf(ANALYZER)) { | ||
override fun createJob(context: WorkerScheduleContext): WorkerJob? = | ||
context.jobConfigs().advisor?.let { config -> | ||
context.workerJobRepositories.advisorJobRepository.create(context.ortRun.id, config) | ||
|
@@ -97,7 +91,7 @@ internal enum class WorkerScheduleInfo( | |
configs.advisor != null | ||
}, | ||
|
||
SCANNER(ScannerEndpoint, dependsOn = listOf(AnalyzerEndpoint)) { | ||
SCANNER(ScannerEndpoint, dependsOn = listOf(ANALYZER)) { | ||
override fun createJob(context: WorkerScheduleContext): WorkerJob? = | ||
context.jobConfigs().scanner?.let { config -> | ||
context.workerJobRepositories.scannerJobRepository.create(context.ortRun.id, config) | ||
|
@@ -111,7 +105,7 @@ internal enum class WorkerScheduleInfo( | |
configs.scanner != null | ||
}, | ||
|
||
EVALUATOR(EvaluatorEndpoint, runsAfter = listOf(AdvisorEndpoint, ScannerEndpoint)) { | ||
EVALUATOR(EvaluatorEndpoint, runsAfter = listOf(ADVISOR, SCANNER)) { | ||
override fun createJob(context: WorkerScheduleContext): WorkerJob? = | ||
context.jobConfigs().evaluator?.let { config -> | ||
context.workerJobRepositories.evaluatorJobRepository.create(context.ortRun.id, config) | ||
|
@@ -125,7 +119,7 @@ internal enum class WorkerScheduleInfo( | |
configs.evaluator != null | ||
}, | ||
|
||
REPORTER(ReporterEndpoint, runsAfter = listOf(EvaluatorEndpoint), runAfterFailure = true) { | ||
REPORTER(ReporterEndpoint, dependsOn = listOf(ANALYZER), runsAfter = listOf(EVALUATOR), runAfterFailure = true) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was a deliberate decision that the Reporter step should always be executed to enable use cases like an "ORT run report" containing information about the whole run with its successful and failing steps. So, I would be reluctant to say that it generally makes no sense to run the reporter after a failed analyzer step. In every case, the type "fix" is not correct for this commit because this is no bug, but the behavior was by design. |
||
override fun createJob(context: WorkerScheduleContext): WorkerJob? = | ||
context.jobConfigs().reporter?.let { config -> | ||
context.workerJobRepositories.reporterJobRepository.create(context.ortRun.id, config) | ||
|
@@ -139,7 +133,7 @@ internal enum class WorkerScheduleInfo( | |
configs.reporter != null | ||
}, | ||
|
||
NOTIFIER(NotifierEndpoint, dependsOn = listOf(ReporterEndpoint), runAfterFailure = true) { | ||
NOTIFIER(NotifierEndpoint, dependsOn = listOf(REPORTER), runAfterFailure = true) { | ||
override fun createJob(context: WorkerScheduleContext): WorkerJob? = | ||
context.jobConfigs().notifier?.let { config -> | ||
context.workerJobRepositories.notifierJobRepository.create(context.ortRun.id, config) | ||
|
@@ -153,82 +147,28 @@ internal enum class WorkerScheduleInfo( | |
configs.notifier != null | ||
}; | ||
|
||
companion object { | ||
private val entriesByPrefix = entries.associateBy { it.endpoint.configPrefix } | ||
|
||
operator fun get(endpoint: Endpoint<*>): WorkerScheduleInfo = entriesByPrefix.getValue(endpoint.configPrefix) | ||
} | ||
|
||
/** | ||
* Return the transitive set of the workers that must complete before this one can run. This is necessary to | ||
* determine whether this worker can be started in the current phase of an ORT run. Note that it is assumed that | ||
* no cycles exist in the dependency graph of workers; otherwise, the scheduler algorithm would have a severe | ||
* problem. | ||
*/ | ||
private val runsAfterTransitively: Set<Endpoint<*>> | ||
get() = (runsAfter + dependsOn).flatMapTo(mutableSetOf()) { WorkerScheduleInfo[it].runsAfterTransitively + it } | ||
|
||
/** | ||
* Check whether a job for the represented worker can be scheduled now based on the given [context]. If so, create | ||
* the job in the database and return a function that schedules the job. | ||
*/ | ||
fun createAndScheduleJobIfPossible(context: WorkerScheduleContext): JobScheduleFunc? { | ||
if (!canRun(context)) return null | ||
|
||
return createJob(context)?.let { job -> | ||
{ | ||
publishJob(context, job) | ||
|
||
context.workerJobRepositories.updateJobStatus(endpoint, job.id, JobStatus.SCHEDULED, finished = false) | ||
} | ||
} | ||
} | ||
val runsAfterTransitively: Set<WorkerScheduleInfo> | ||
get() = (runsAfter + dependsOn).flatMapTo(mutableSetOf()) { it.runsAfterTransitively + it } | ||
|
||
/** | ||
* Create a new job for this worker based on the information in the given [context]. | ||
*/ | ||
protected abstract fun createJob(context: WorkerScheduleContext): WorkerJob? | ||
abstract fun createJob(context: WorkerScheduleContext): WorkerJob? | ||
|
||
/** | ||
* Publish a message to the worker endpoint to schedule the given [job] based on the information in the given | ||
* [context]. | ||
*/ | ||
protected abstract fun publishJob(context: WorkerScheduleContext, job: WorkerJob) | ||
abstract fun publishJob(context: WorkerScheduleContext, job: WorkerJob) | ||
|
||
/** | ||
* Return a flag whether this worker is configured to run for the current ORT run based on the given [configs]. | ||
*/ | ||
protected abstract fun isConfigured(configs: JobConfigurations): Boolean | ||
|
||
/** | ||
* Return a flag whether a job for the represented worker can be started now based on the given [context]. | ||
* This function checks whether this worker is configured to run and whether the jobs it depends on have been | ||
* completed. | ||
*/ | ||
private fun canRun(context: WorkerScheduleContext): Boolean = | ||
isConfigured(context.jobConfigs()) && | ||
!context.wasScheduled(endpoint) && | ||
canRunWithFailureState(context) && | ||
dependsOn.all { context.isJobCompleted(it) } && | ||
runsAfterTransitively.none { WorkerScheduleInfo[it].isPending(context) } | ||
|
||
/** | ||
* Check whether the represented worker is pending for the current ORT run based on the given [context]. This | ||
* means that the worker has not yet run, but - given the current state - is supposed to run later. | ||
*/ | ||
private fun isPending(context: WorkerScheduleContext): Boolean = | ||
isConfigured(context.jobConfigs()) && | ||
!context.isJobCompleted(endpoint) && | ||
canRunWithFailureState(context) && | ||
dependsOn.all { | ||
context.wasScheduled(it) || | ||
WorkerScheduleInfo[it].isPending(context) | ||
} | ||
|
||
/** | ||
* Check whether the represented worker can be executed for the failure state stored in the given [context]. Here | ||
* a worker can decide whether it can always run or only if all previous workers were successful. | ||
*/ | ||
private fun canRunWithFailureState(context: WorkerScheduleContext) = | ||
runAfterFailure || !context.isFailed() | ||
abstract fun isConfigured(configs: JobConfigurations): Boolean | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,346 @@ | ||
/* | ||
* Copyright (C) 2024 The ORT Server Authors (See <https://github.com/eclipse-apoapsis/ort-server/blob/main/NOTICE>) | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* License-Filename: LICENSE | ||
*/ | ||
|
||
package org.eclipse.apoapsis.ortserver.orchestrator | ||
|
||
import io.kotest.core.spec.style.WordSpec | ||
import io.kotest.matchers.collections.beEmpty | ||
import io.kotest.matchers.collections.containExactly | ||
import io.kotest.matchers.should | ||
|
||
import org.eclipse.apoapsis.ortserver.model.JobStatus | ||
|
||
class OrtRunInfoTest : WordSpec({ | ||
"getNextJobs() with all jobs configured" should { | ||
val configuredJobs = WorkerScheduleInfo.entries.toSet() | ||
|
||
"return ANALYZER if no job was created yet" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = emptyMap() | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.ANALYZER) | ||
} | ||
|
||
"return nothing if ANALYZER is still running" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
|
||
"return ADVISOR and SCANNER if all previous jobs finished" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.ADVISOR, WorkerScheduleInfo.SCANNER) | ||
} | ||
|
||
"return nothing if ADVISOR is still running" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.RUNNING), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
|
||
"return nothing if SCANNER is still running" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
|
||
"return EVALUATOR if all previous jobs finished" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.EVALUATOR) | ||
} | ||
|
||
"return nothing if EVALUATOR is still running" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
|
||
"return REPORTER if all previous jobs finished" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) | ||
} | ||
|
||
"return nothing if config worker failed" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = true, | ||
configuredJobs = configuredJobs, | ||
jobInfos = emptyMap() | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
|
||
"return REPORTER if ANALYZER failed" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) | ||
} | ||
|
||
"return REPORTER if ADVISOR failed" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FAILED), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) | ||
} | ||
|
||
"return REPORTER if SCANNER failed" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) | ||
} | ||
|
||
"return REPORTER if EVALUATOR failed" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FAILED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.REPORTER) | ||
} | ||
|
||
"return NOTIFIER if REPORTER finished" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.REPORTER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.NOTIFIER) | ||
} | ||
|
||
"return NOTIFIER if REPORTER failed" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.ADVISOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.SCANNER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.REPORTER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.NOTIFIER) | ||
} | ||
|
||
"return nothing if all jobs finished" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = WorkerScheduleInfo.entries.associateWith { | ||
WorkerJobInfo(id = 1, status = JobStatus.FINISHED) | ||
} | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
} | ||
|
||
"getNextJobs() with only ANALYZER and EVALUATOR configured" should { | ||
val configuredJobs = setOf(WorkerScheduleInfo.ANALYZER, WorkerScheduleInfo.EVALUATOR) | ||
|
||
"return ANALYZER if no job was created yet" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = emptyMap() | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.ANALYZER) | ||
} | ||
|
||
"return nothing if ANALYZER is still running" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
|
||
"return nothing if ANALYZER failed" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FAILED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
|
||
"return EVALUATOR if ANALYZER finished" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should containExactly(WorkerScheduleInfo.EVALUATOR) | ||
} | ||
|
||
"return nothing if EVALUATOR is still running" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.RUNNING) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
|
||
"return nothing if all jobs finished" { | ||
val ortRunInfo = OrtRunInfo( | ||
id = 1, | ||
configWorkerFailed = false, | ||
configuredJobs = configuredJobs, | ||
jobInfos = mapOf( | ||
WorkerScheduleInfo.ANALYZER to WorkerJobInfo(id = 1, status = JobStatus.FINISHED), | ||
WorkerScheduleInfo.EVALUATOR to WorkerJobInfo(id = 1, status = JobStatus.FINISHED) | ||
) | ||
) | ||
|
||
ortRunInfo.getNextJobs() should beEmpty() | ||
} | ||
} | ||
}) |
Unchanged files with check annotations Beta
&& corepack enable | ||
FROM scratch AS node | ||
COPY --from=nodebuild $NVM_DIR $NVM_DIR | ||
Check warning on line 211 in workers/analyzer/docker/Analyzer.Dockerfile
|
||
# Required for Corepack to dynamically modify binaries of supported package managers. | ||
RUN sudo chgrp -R 0 /opt/nvm && chmod -R g+rwX /opt/nvm | ||
&& gem install bundler cocoapods:$COCOAPODS_VERSION | ||
FROM scratch AS ruby | ||
COPY --from=rubybuild $RBENV_ROOT $RBENV_ROOT | ||
Check warning on line 247 in workers/analyzer/docker/Analyzer.Dockerfile
|
||
#------------------------------------------------------------------------ | ||
# RUST - Build as a separate component | ||
RUN curl -sSL https://get.haskellstack.org/ | bash -s -- -d $HASKELL_HOME/bin | ||
FROM scratch AS haskell | ||
COPY --from=haskellbuild $HASKELL_HOME $HASKELL_HOME | ||
Check warning on line 296 in workers/analyzer/docker/Analyzer.Dockerfile
|
||
#------------------------------------------------------------------------ | ||
# REPO / ANDROID SDK | ||
&& sudo chmod a+x $ANDROID_HOME/cmdline-tools/bin/repo | ||
FROM scratch AS android | ||
COPY --from=androidbuild $ANDROID_HOME $ANDROID_HOME | ||
Check warning on line 328 in workers/analyzer/docker/Analyzer.Dockerfile
|
||
#------------------------------------------------------------------------ | ||
# Dart | ||
&& unzip /dart/dart.zip | ||
FROM scratch AS dart | ||
COPY --from=dartbuild $DART_SDK $DART_SDK | ||
Check warning on line 348 in workers/analyzer/docker/Analyzer.Dockerfile
|
||
#------------------------------------------------------------------------ | ||
# SBT | ||
RUN curl -L https://github.com/sbt/sbt/releases/download/v$SBT_VERSION/sbt-$SBT_VERSION.tgz | tar -C /opt -xz | ||
FROM scratch AS sbt | ||
COPY --from=sbtbuild $DART_SDK $DART_SDK | ||
Check warning on line 362 in workers/analyzer/docker/Analyzer.Dockerfile
|
||
#------------------------------------------------------------------------ | ||
# SWIFT | ||
| tar -xz -C $SWIFT_HOME --strip-components=2 | ||
FROM scratch AS swift | ||
COPY --from=swiftbuild $SWIFT_HOME $SWIFT_HOME | ||
Check warning on line 384 in workers/analyzer/docker/Analyzer.Dockerfile
|
||
#------------------------------------------------------------------------ | ||
# DOTNET |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I like the idea to extract the scheduling logic to a dedicated class, I have some problems with the current implementation:
OrtRunInfo
is meaningless in this context and rather reminds of a data model class.WorkerScheduleContext
is unclear.Orchestrator
now creates aWorkerScheduleContext
, and with the help of this context, anOrtRunInfo
. This is because the latter has its own state derived from the context (this is not really untangling). It would be better ifOrtRunInfo
was stateless and only implemented the scheduling strategy. ThegetNextJobs()
function could be passed aWorkerScheduleContext
info object and obtain all required information from there.