Added max number of failures (#254)
* Added a maximum number of failures for tasks. If a task fails more than this many times, it is cancelled

* Added maxNumFailures to the frontend

* Updated tests
DanteNiewenhuis authored Sep 12, 2024
1 parent ad8051f commit 5047e4a
Showing 17 changed files with 111 additions and 52 deletions.
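
For orientation, here is a minimal sketch (not part of the diff) of how the new limit can be set through the updated ComputeService.Builder; the dispatcher and scheduler values are assumed to come from the surrounding provisioning code and are placeholders, not names from this commit.

    import java.time.Duration
    import org.opendc.compute.service.ComputeService

    // Sketch: configure the failure limit via the builder extended in this commit.
    // `dispatcher` and `scheduler` stand in for an existing Dispatcher and
    // ComputeScheduler instance.
    val service =
        ComputeService.builder(dispatcher, scheduler)
            .withQuantum(Duration.ofMinutes(5))
            // A task whose failure count exceeds this value is TERMINATED
            // instead of being rescheduled; the builder defaults to 10.
            .withMaxNumFailures(3)
            .build()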
@@ -43,6 +43,11 @@ public interface Task : Resource {
*/
public val state: TaskState

/**
* The number of times a Task has been stopped due to failures
*/
public val numFailures: Int

/**
* The most recent moment in time when the task was launched.
*/
@@ -23,7 +23,6 @@
package org.opendc.compute.failure.models

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.opendc.compute.service.ComputeService
import org.opendc.trace.Trace
import org.opendc.trace.conv.FAILURE_DURATION
@@ -79,12 +78,11 @@ public class TraceBasedFailureModel(
override suspend fun runInjector() {
do {
for (failure in failureList) {
delay(failure.failureInterval - clock.millis())
delay(failure.failureInterval)

val victims = victimSelector.select(hosts, failure.failureIntensity)
scope.launch {
fault.apply(victims, failure.failureDuration)
}

fault.apply(victims, failure.failureDuration)
}
} while (repeat)
}
@@ -82,6 +82,8 @@ public final class ComputeService implements AutoCloseable {
*/
private final SplittableRandom random = new SplittableRandom(0);

private final int maxNumFailures;

/**
* A flag to indicate that the service is closed.
*/
@@ -162,6 +164,7 @@ public void onStateChanged(@NotNull Host host, @NotNull Task task, @NotNull Task

serviceTask.setState(newState);

// TODO: move the removal of tasks when max Failures are reached to here
if (newState == TaskState.TERMINATED || newState == TaskState.DELETED || newState == TaskState.ERROR) {
LOGGER.info("task {} {} {} finished", task.getUid(), task.getName(), task.getFlavor());

@@ -196,10 +199,11 @@ public void onStateChanged(@NotNull Host host, @NotNull Task task, @NotNull Task
/**
* Construct a {@link ComputeService} instance.
*/
ComputeService(Dispatcher dispatcher, ComputeScheduler scheduler, Duration quantum) {
ComputeService(Dispatcher dispatcher, ComputeScheduler scheduler, Duration quantum, int maxNumFailures) {
this.clock = dispatcher.getTimeSource();
this.scheduler = scheduler;
this.pacer = new Pacer(dispatcher, quantum.toMillis(), (time) -> doSchedule());
this.maxNumFailures = maxNumFailures;
}

/**
@@ -365,8 +369,16 @@ private void doSchedule() {
}

final ServiceTask task = request.task;
// Check if all dependencies are met
// otherwise continue

// Remove task from scheduling if it has failed too many times
if (task.getNumFailures() > maxNumFailures) {
LOGGER.warn("Failed to spawn {}: Task has failed more than the allowed {} times", task, maxNumFailures);

taskQueue.poll();
tasksPending--;
task.setState(TaskState.TERMINATED);
continue;
}

final ServiceFlavor flavor = task.getFlavor();
final HostView hv = scheduler.select(request.task);
@@ -425,6 +437,7 @@ public static class Builder {
private final Dispatcher dispatcher;
private final ComputeScheduler computeScheduler;
private Duration quantum = Duration.ofMinutes(5);
private int maxNumFailures = 10;

Builder(Dispatcher dispatcher, ComputeScheduler computeScheduler) {
this.dispatcher = dispatcher;
@@ -439,11 +452,16 @@ public Builder withQuantum(Duration quantum) {
return this;
}

public Builder withMaxNumFailures(int maxNumFailures) {
this.maxNumFailures = maxNumFailures;
return this;
}

/**
* Build a {@link ComputeService}.
*/
public ComputeService build() {
return new ComputeService(dispatcher, computeScheduler, quantum);
return new ComputeService(dispatcher, computeScheduler, quantum, maxNumFailures);
}
}

@@ -60,6 +60,8 @@ public final class ServiceTask implements Task {
Host host = null;
private ComputeService.SchedulingRequest request = null;

private int numFailures = 0;

ServiceTask(
ComputeService service,
UUID uid,
@@ -232,14 +234,19 @@ public String toString() {
return "Task[uid=" + uid + ",name=" + name + ",state=" + state + "]";
}

void setState(TaskState state) {
if (this.state != state) {
for (TaskWatcher watcher : watchers) {
watcher.onStateChanged(this, state);
}
void setState(TaskState newState) {
if (this.state == newState) {
return;
}

for (TaskWatcher watcher : watchers) {
watcher.onStateChanged(this, newState);
}
if (newState == TaskState.ERROR) {
this.numFailures++;
}

this.state = state;
this.state = newState;
}

/**
@@ -252,4 +259,9 @@ private void cancelProvisioningRequest() {
request.isCancelled = true;
}
}

@Override
public int getNumFailures() {
return this.numFailures;
}
}
@@ -68,7 +68,7 @@ internal class ComputeServiceTest {
filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
weighers = listOf(RamWeigher()),
)
service = ComputeService(scope.dispatcher, computeScheduler, Duration.ofMinutes(5))
service = ComputeService(scope.dispatcher, computeScheduler, Duration.ofMinutes(5), 10)
}

@Test
@@ -37,11 +37,13 @@ public class ComputeServiceProvisioningStep internal constructor(
private val serviceDomain: String,
private val scheduler: (ProvisioningContext) -> ComputeScheduler,
private val schedulingQuantum: Duration,
private val maxNumFailures: Int,
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
val service =
ComputeService.builder(ctx.dispatcher, scheduler(ctx))
.withQuantum(schedulingQuantum)
.withMaxNumFailures(maxNumFailures)
.build()
ctx.registry.register(serviceDomain, ComputeService::class.java, service)

@@ -42,8 +42,9 @@ public fun setupComputeService(
serviceDomain: String,
scheduler: (ProvisioningContext) -> ComputeScheduler,
schedulingQuantum: Duration = Duration.ofMinutes(5),
maxNumFailures: Int = 10,
): ProvisioningStep {
return ComputeServiceProvisioningStep(serviceDomain, scheduler, schedulingQuantum)
return ComputeServiceProvisioningStep(serviceDomain, scheduler, schedulingQuantum, maxNumFailures)
}

/**
@@ -365,6 +365,7 @@ internal class SimHostTest {
override val name: String,
override val flavor: Flavor,
override val image: Image,
override val numFailures: Int = 10,
) : Task {
override val labels: Map<String, String> = emptyMap()

@@ -30,6 +30,7 @@ import mu.KotlinLogging
import org.opendc.common.Dispatcher
import org.opendc.common.asCoroutineDispatcher
import org.opendc.compute.api.Task
import org.opendc.compute.api.TaskState
import org.opendc.compute.carbon.CarbonTrace
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
@@ -422,11 +423,11 @@ public class ComputeMetricReader(
*/
private class TaskTableReaderImpl(
private val service: ComputeService,
task: Task,
private val task: Task,
private val startTime: Duration = Duration.ofMillis(0),
) : TaskTableReader {
override fun copy(): TaskTableReader {
val newTaskTable = TaskTableReaderImpl(service, _task)
val newTaskTable = TaskTableReaderImpl(service, task)
newTaskTable.setValues(this)

return newTaskTable
@@ -448,14 +449,14 @@
_provisionTime = table.provisionTime
_bootTime = table.bootTime
_bootTimeAbsolute = table.bootTimeAbsolute
}

private val _task = task
_taskState = table.taskState
}

/**
* The static information about this task.
*/
override val task =
override val taskInfo =
TaskInfo(
task.uid.toString(),
task.name,
@@ -527,18 +528,22 @@
get() = _bootTimeAbsolute
private var _bootTimeAbsolute: Instant? = null

override val taskState: TaskState?
get() = _taskState
private var _taskState: TaskState? = null

/**
* Record the next cycle.
*/
fun record(now: Instant) {
val newHost = service.lookupHost(_task)
val newHost = service.lookupHost(task)
if (newHost != null && newHost.uid != _host?.uid) {
_host = newHost
host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity)
}

val cpuStats = _host?.getCpuStats(_task)
val sysStats = _host?.getSystemStats(_task)
val cpuStats = _host?.getCpuStats(task)
val sysStats = _host?.getSystemStats(task)

_timestamp = now
_timestampAbsolute = now + startTime
@@ -550,9 +555,11 @@
_cpuLostTime = cpuStats?.lostTime ?: 0
_uptime = sysStats?.uptime?.toMillis() ?: 0
_downtime = sysStats?.downtime?.toMillis() ?: 0
_provisionTime = _task.launchedAt
_provisionTime = task.launchedAt
_bootTime = sysStats?.bootTime

_taskState = task.state

if (sysStats != null) {
_bootTimeAbsolute = sysStats.bootTime + startTime
} else {
@@ -64,7 +64,7 @@ public object DfltTaskExportColumns {
Types.required(BINARY)
.`as`(LogicalTypeAnnotation.stringType())
.named("task_id"),
) { Binary.fromString(it.task.id) }
) { Binary.fromString(it.taskInfo.id) }

public val HOST_ID: ExportColumn<TaskTableReader> =
ExportColumn(
@@ -80,17 +80,17 @@
Types.required(BINARY)
.`as`(LogicalTypeAnnotation.stringType())
.named("task_name"),
) { Binary.fromString(it.task.name) }
) { Binary.fromString(it.taskInfo.name) }

public val CPU_COUNT: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(INT32).named("cpu_count"),
) { it.task.cpuCount }
) { it.taskInfo.cpuCount }

public val MEM_CAPACITY: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(INT64).named("mem_capacity"),
) { it.task.memCapacity }
) { it.taskInfo.memCapacity }

public val CPU_LIMIT: ExportColumn<TaskTableReader> =
ExportColumn(
@@ -142,6 +142,14 @@
field = Types.optional(INT64).named("boot_time_absolute"),
) { it.bootTimeAbsolute?.toEpochMilli() }

public val TASK_STATE: ExportColumn<TaskTableReader> =
ExportColumn(
field =
Types.optional(BINARY)
.`as`(LogicalTypeAnnotation.stringType())
.named("task_state"),
) { Binary.fromString(it.taskState?.name) }

/**
* The columns that are always included in the output file.
*/
@@ -22,6 +22,7 @@

package org.opendc.compute.telemetry.table

import org.opendc.compute.api.TaskState
import org.opendc.compute.telemetry.export.parquet.DfltTaskExportColumns
import org.opendc.trace.util.parquet.exporter.Exportable
import java.time.Instant
@@ -47,7 +48,7 @@ public interface TaskTableReader : Exportable {
/**
* The [TaskInfo] of the task to which the row belongs to.
*/
public val task: TaskInfo
public val taskInfo: TaskInfo

/**
* The [HostInfo] of the host on which the task is hosted or `null` if it has no host.
@@ -103,6 +104,11 @@
* The duration (in seconds) of CPU time that was lost due to interference.
*/
public val cpuLostTime: Long

/**
* The state of the task
*/
public val taskState: TaskState?
}

// Loads the default export fields for deserialization whenever this file is loaded.
@@ -125,13 +125,6 @@ public suspend fun ComputeService.replay(
val checkpointDuration = checkpointModelSpec?.checkpointDuration ?: 0L
val checkpointIntervalScaling = checkpointModelSpec?.checkpointIntervalScaling ?: 1.0

// val workload = SimRuntimeWorkload(
// entry.duration,
// 1.0,
// checkpointTime,
// checkpointWait
// )

val workload = entry.trace.createWorkload(start, checkpointInterval, checkpointDuration, checkpointIntervalScaling)
val meta = mutableMapOf<String, Any>("workload" to workload)

@@ -87,6 +87,7 @@ public fun runScenario(
setupComputeService(
serviceDomain,
{ createComputeScheduler(scenario.allocationPolicySpec.policyType, Random(it.seeder.nextLong())) },
maxNumFailures = scenario.maxNumFailures,
),
setupHosts(serviceDomain, topology, optimize = true),
)
@@ -71,18 +71,19 @@ public fun getExperiment(experimentSpec: ExperimentSpec): List<Scenario> {
val scenario =
Scenario(
id = scenarioID,
name = scenarioID.toString(),
outputFolder = outputFolder,
runs = experimentSpec.runs,
initialSeed = experimentSpec.initialSeed,
computeExportConfig = scenarioSpec.computeExportConfig,
topologySpec = scenarioSpec.topology,
workloadSpec = scenarioSpec.workload,
allocationPolicySpec = scenarioSpec.allocationPolicy,
exportModelSpec = scenarioSpec.exportModel,
failureModelSpec = scenarioSpec.failureModel,
checkpointModelSpec = scenarioSpec.checkpointModel,
carbonTracePath = scenarioSpec.carbonTracePath,
exportModelSpec = scenarioSpec.exportModel,
outputFolder = outputFolder,
name = scenarioID.toString(),
runs = experimentSpec.runs,
initialSeed = experimentSpec.initialSeed,
computeExportConfig = scenarioSpec.computeExportConfig,
maxNumFailures = scenarioSpec.maxNumFailures,
)
trackScenario(scenarioSpec, outputFolder)
scenarios.add(scenario)