Skip to content

Commit

Permalink
Merge pull request #168 from DanteNiewenhuis/greenifier-demo
Browse files Browse the repository at this point in the history
Updated the simulation to let servers run until they are finished.
  • Loading branch information
DanteNiewenhuis authored Nov 15, 2023
2 parents 2fc71b8 + 1513efe commit 05141e9
Show file tree
Hide file tree
Showing 15 changed files with 1,376 additions and 615 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void enqueue() {

// We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// We calculate here the delay until the next scheduling slot.
long timeUntilNextSlot = quantumMs - (now % quantumMs);
long timeUntilNextSlot = (quantumMs - (now % quantumMs)) % quantumMs;

handle = dispatcher.scheduleCancellable(timeUntilNextSlot, () -> {
process.accept(now + timeUntilNextSlot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.opendc.compute.simulator.SimWorkloadMapper
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.kernel.SimVirtualMachine
import org.opendc.simulator.compute.workload.SimWorkload
import java.time.Duration
import java.time.Instant
import java.time.InstantSource
Expand Down Expand Up @@ -169,7 +170,8 @@ internal class Guest(

onStart()

val workload = mapper.createWorkload(server)
val workload: SimWorkload = mapper.createWorkload(server)
workload.setOffset(clock.millis())
val meta = mapOf("driver" to host, "server" to server) + server.meta
ctx = machine.startWorkload(workload, meta) { cause ->
onStop(if (cause != null) ServerState.ERROR else ServerState.TERMINATED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import java.io.File
import java.lang.ref.SoftReference
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.time.Duration
import java.time.Instant
import kotlin.math.max
import kotlin.math.roundToLong

Expand Down Expand Up @@ -82,14 +84,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
while (reader.nextRow()) {
val id = reader.getString(idCol)!!
val time = reader.getInstant(timestampCol)!!
val duration = reader.getDuration(durationCol)!!
val durationMs = reader.getDuration(durationCol)!!
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)

val deadlineMs = time.toEpochMilli()
val timeMs = (time - duration).toEpochMilli()
val builder = fragments.computeIfAbsent(id) { Builder() }
builder.add(timeMs, deadlineMs, cpuUsage, cores)
builder.add(time, durationMs, cpuUsage, cores)
}

fragments
Expand Down Expand Up @@ -246,17 +246,17 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
* @param usage CPU usage of this fragment.
* @param cores Number of cores used.
*/
fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
val duration = max(0, deadline - timestamp)
totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs
fun add(deadline: Instant, duration: Duration, usage: Double, cores: Int) {
val startTimeMs = (deadline - duration).toEpochMilli()
totalLoad += (usage * duration.toMillis()) / 1000.0 // avg MHz * duration = MFLOPs

if (timestamp != previousDeadline) {
if ((startTimeMs != previousDeadline) && (previousDeadline != Long.MIN_VALUE)) {
// There is a gap between the previous and current fragment; fill the gap
builder.add(timestamp, 0.0, cores)
builder.add(startTimeMs, 0.0, cores)
}

builder.add(deadline, usage, cores)
previousDeadline = deadline
builder.add(deadline.toEpochMilli(), usage, cores)
previousDeadline = deadline.toEpochMilli()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,47 @@ package org.opendc.experiments.compute
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.yield
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.api.ServerWatcher
import org.opendc.compute.service.ComputeService
import java.time.InstantSource
import java.util.Random
import kotlin.coroutines.coroutineContext
import kotlin.math.max

public class RunningServerWatcher: ServerWatcher {

private val _mutex: Mutex = Mutex();

public suspend fun lock () {
_mutex.lock()
}

public suspend fun wait () {
// TODO: look at the better way to wait for an unlock
this.lock();
}

override fun onStateChanged(server: Server, newState: ServerState) {
when (newState) {
ServerState.TERMINATED -> {
_mutex.unlock()
}
ServerState.ERROR -> {
_mutex.unlock()
}
ServerState.DELETED -> {
_mutex.unlock()
}
else -> {}
}
}

}

/**
* Helper method to replay the specified list of [VirtualMachine] and suspend execution util all VMs have finished.
*
Expand Down Expand Up @@ -63,25 +97,26 @@ public suspend fun ComputeService.replay(
// Start the fault injector
injector?.start()

var offset = Long.MIN_VALUE
var simulationOffset = Long.MIN_VALUE

for (entry in trace.sortedBy { it.startTime }) {
val now = clock.millis()
val start = entry.startTime.toEpochMilli()

if (offset < 0) {
offset = start - now
// Set the simulationOffset based on the starting time of the first server
if (simulationOffset == Long.MIN_VALUE) {
simulationOffset = start - now
}

// Make sure the trace entries are ordered by submission time
assert(start - offset >= 0) { "Invalid trace order" }
// assert(start - simulationOffset >= 0) { "Invalid trace order" }

// Delay the server based on the startTime given by the trace.
if (!submitImmediately) {
delay(max(0, (start - offset) - now))
delay(max(0, (start - now - simulationOffset)));
}

val workloadOffset = -offset + 300001
val workload = entry.trace.createWorkload(workloadOffset)
val workload = entry.trace.createWorkload(start)
val meta = mutableMapOf<String, Any>("workload" to workload)

val interferenceProfile = entry.interferenceProfile
Expand All @@ -102,16 +137,18 @@ public suspend fun ComputeService.replay(
meta = meta
)

// Wait for the server reach its end time
val endTime = entry.stopTime.toEpochMilli()
delay(endTime + workloadOffset - clock.millis() + (5 * 60 * 10000))
val serverWatcher = RunningServerWatcher()
serverWatcher.lock()
server.watch(serverWatcher)

// Wait until the server is terminated
serverWatcher.wait()

// Stop the server after reaching the end-time of the virtual machine
server.stop()
server.delete()
}
}
}

yield()
} finally {
injector?.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,46 +79,47 @@ public class ComputeMetricReader(
*/
private val job = scope.launch {
val intervalMs = exportInterval.toMillis()
val service = service
val monitor = monitor
val hostTableReaders = hostTableReaders
val serverTableReaders = serverTableReaders
val serviceTableReader = serviceTableReader

try {
while (isActive) {
delay(intervalMs)

try {
val now = clock.instant()

for (host in service.hosts) {
val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
reader.record(now)
monitor.record(reader.copy())
reader.reset()
}

for (server in service.servers) {
val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
reader.record(now)
monitor.record(reader)
reader.reset()
}

serviceTableReader.record(now)
monitor.record(serviceTableReader)
} catch (cause: Throwable) {
logger.warn(cause) { "Exporter threw an Exception" }
}
loggState()
}

} finally {
// loggState()

if (monitor is AutoCloseable) {
monitor.close()
}
}
}

private fun loggState() {
try {
val now = this.clock.instant()

for (host in this.service.hosts) {
val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) }
reader.record(now)
this.monitor.record(reader.copy())
reader.reset()
}

for (server in this.service.servers) {
val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) }
reader.record(now)
this.monitor.record(reader.copy())
reader.reset()
}

this.serviceTableReader.record(now)
monitor.record(this.serviceTableReader.copy())
} catch (cause: Throwable) {
this.logger.warn(cause) { "Exporter threw an Exception" }
}
}

override fun close() {
job.cancel()
}
Expand All @@ -127,6 +128,27 @@ public class ComputeMetricReader(
* An aggregator for service metrics before they are reported.
*/
private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader {

override fun copy(): ServiceTableReader {
val newServiceTable = ServiceTableReaderImpl(service)
newServiceTable.setValues(this)

return newServiceTable
}

override fun setValues(table: ServiceTableReader) {
_timestamp = table.timestamp

_hostsUp = table.hostsUp
_hostsDown = table.hostsDown
_serversTotal = table.serversTotal
_serversPending = table.serversPending
_serversActive = table.serversActive
_attemptsSuccess = table.attemptsSuccess
_attemptsFailure = table.attemptsFailure
_attemptsError = table.attemptsError
}

private var _timestamp: Instant = Instant.MIN
override val timestamp: Instant
get() = _timestamp
Expand Down Expand Up @@ -362,18 +384,18 @@ public class ComputeMetricReader(
}

override fun setValues(table: ServerTableReader) {
host = table.host

_timestamp = table.timestamp
_uptime = table.uptime
_downtime = table.downtime
_provisionTime = table.provisionTime
_bootTime = table.bootTime
_cpuLimit = table.cpuLimit
_cpuActiveTime = table.cpuActiveTime
_cpuIdleTime = table.cpuIdleTime
_cpuStealTime = table.cpuStealTime
_cpuLostTime = table.cpuLostTime

host = table.host
_uptime = table.uptime
_downtime = table.downtime
_provisionTime = table.provisionTime
_bootTime = table.bootTime
}

private val _server = server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import java.time.Instant
* An interface that is used to read a row of a service trace entry.
*/
public interface ServiceTableReader {

public fun copy(): ServiceTableReader

public fun setValues(table: ServiceTableReader)

/**
* The timestamp of the current entry of the reader.
*/
Expand Down
Loading

0 comments on commit 05141e9

Please sign in to comment.