Skip to content

Commit

Permalink
feat(sim/compute): Add support for snapshotting workloads
Browse files Browse the repository at this point in the history
This change updates the interface of `SimWorkload` to support
snapshotting workloads. We introduce a new method `snapshot()` to this
interface which returns a new `SimWorkload` that can be started at a
later point in time and on another `SimMachine`, which continues
progress from the moment the workload was snapshotted.
  • Loading branch information
fabianishere committed Oct 31, 2022
1 parent 587a5af commit b28c536
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public class SimHost(

val bootWorkload = bootModel.get()
val hypervisor = hypervisor
val hypervisorWorkload = object : SimWorkload {
val hypervisorWorkload = object : SimWorkload by hypervisor {
override fun onStart(ctx: SimMachineContext) {
try {
_bootTime = clock.instant()
Expand All @@ -296,10 +296,6 @@ public class SimHost(
throw cause
}
}

override fun onStop(ctx: SimMachineContext) {
hypervisor.onStop(ctx)
}
}

val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public class SimTFDevice(
output = null
}

override fun snapshot(): SimWorkload = throw UnsupportedOperationException()

override fun onUpdate(ctx: FlowStage, now: Long): Long {
val output = output ?: return Long.MAX_VALUE
val lastPull = lastPull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public final Map<String, Object> getMeta() {
return meta;
}

@Override
public SimWorkload snapshot() {
return workload.snapshot();
}

@Override
public void reset() {
final FlowGraph graph = getMemory().getInput().getGraph();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public interface SimMachineContext {
*/
List<? extends SimStorageInterface> getStorageInterfaces();

/**
* Create a snapshot of the {@link SimWorkload} running on this machine.
*
* @throws UnsupportedOperationException if the workload does not support snapshotting.
*/
SimWorkload snapshot();

/**
* Reset all resources of the machine.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ public void onStop(SimMachineContext ctx) {
}
}

@Override
public SimWorkload snapshot() {
throw new UnsupportedOperationException("Unable to snapshot hypervisor");
}

/**
* The context which carries the state when the hypervisor is running on a machine.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,20 @@ final class SimChainWorkload implements SimWorkload {
* Construct a {@link SimChainWorkload} instance.
*
* @param workloads The workloads to chain.
* @param activeWorkloadIndex The index of the active workload.
*/
SimChainWorkload(SimWorkload... workloads) {
SimChainWorkload(SimWorkload[] workloads, int activeWorkloadIndex) {
this.workloads = workloads;
this.activeWorkloadIndex = activeWorkloadIndex;
}

/**
* Construct a {@link SimChainWorkload} instance.
*
* @param workloads The workloads to chain.
*/
SimChainWorkload(SimWorkload... workloads) {
this(workloads, 0);
}

@Override
Expand Down Expand Up @@ -79,6 +90,19 @@ public void onStop(SimMachineContext ctx) {
tryThrow(context.doStop(workloads[activeWorkloadIndex]));
}

@Override
public SimChainWorkload snapshot() {
final int activeWorkloadIndex = this.activeWorkloadIndex;
final SimWorkload[] workloads = this.workloads;
final SimWorkload[] newWorkloads = new SimWorkload[workloads.length - activeWorkloadIndex];

for (int i = 0; i < newWorkloads.length; i++) {
newWorkloads[i] = workloads[activeWorkloadIndex + i].snapshot();
}

return new SimChainWorkload(newWorkloads, 0);
}

/**
* A {@link SimMachineContext} that intercepts the shutdown calls.
*/
Expand Down Expand Up @@ -119,6 +143,12 @@ public List<? extends SimStorageInterface> getStorageInterfaces() {
return ctx.getStorageInterfaces();
}

@Override
public SimWorkload snapshot() {
final SimWorkload workload = workloads[activeWorkloadIndex];
return workload.snapshot();
}

@Override
public void reset() {
ctx.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic {

this.flops = flops;
this.utilization = utilization;
this.remainingAmount = flops;
}

@Override
Expand Down Expand Up @@ -98,8 +99,13 @@ public void onStop(SimMachineContext ctx) {
}

@Override
public String toString() {
return "SimFlopsWorkload[FLOPs=" + flops + ",utilization=" + utilization + "]";
public SimFlopsWorkload snapshot() {
final FlowStage stage = this.stage;
if (stage != null) {
stage.sync();
}

return new SimFlopsWorkload((long) remainingAmount, utilization);
}

@Override
Expand All @@ -125,7 +131,7 @@ public long onUpdate(FlowStage ctx, long now) {
float remainingAmount = this.remainingAmount - consumed;
this.remainingAmount = remainingAmount;

long duration = (long) Math.ceil(remainingAmount / limit * 1000);
long duration = (long) Math.ceil(remainingAmount / limit * 0.001);

if (duration <= 0) {
final SimMachineContext machineContext = this.ctx;
Expand All @@ -138,4 +144,9 @@ public long onUpdate(FlowStage ctx, long now) {

return now + duration;
}

@Override
public String toString() {
return "SimFlopsWorkload[FLOPs=" + flops + ",utilization=" + utilization + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic {

this.duration = duration;
this.utilization = utilization;
this.remainingDuration = duration;
}

@Override
Expand Down Expand Up @@ -97,6 +98,16 @@ public void onStop(SimMachineContext ctx) {
}
}

@Override
public SimRuntimeWorkload snapshot() {
final FlowStage stage = this.stage;
if (stage != null) {
stage.sync();
}

return new SimRuntimeWorkload(remainingDuration, utilization);
}

@Override
public long onUpdate(FlowStage ctx, long now) {
long lastUpdate = this.lastUpdate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private SimTrace(double[] usageCol, long[] deadlineCol, int[] coresCol, int size
* @param offset The offset for the timestamps.
*/
public SimWorkload createWorkload(long offset) {
return new Workload(offset, usageCol, deadlineCol, coresCol, size);
return new Workload(offset, usageCol, deadlineCol, coresCol, size, 0);
}

/**
Expand Down Expand Up @@ -211,22 +211,24 @@ private static class Workload implements SimWorkload {
private final long[] deadlineCol;
private final int[] coresCol;
private final int size;
private final int index;

private Workload(long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size) {
private Workload(long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size, int index) {
this.offset = offset;
this.usageCol = usageCol;
this.deadlineCol = deadlineCol;
this.coresCol = coresCol;
this.size = size;
this.index = index;
}

@Override
public void onStart(SimMachineContext ctx) {
final WorkloadStageLogic logic;
if (ctx.getCpus().size() == 1) {
logic = new SingleWorkloadLogic(ctx, offset, usageCol, deadlineCol, size);
logic = new SingleWorkloadLogic(ctx, offset, usageCol, deadlineCol, size, index);
} else {
logic = new MultiWorkloadLogic(ctx, offset, usageCol, deadlineCol, coresCol, size);
logic = new MultiWorkloadLogic(ctx, offset, usageCol, deadlineCol, coresCol, size, index);
}
this.logic = logic;
}
Expand All @@ -240,6 +242,18 @@ public void onStop(SimMachineContext ctx) {
logic.getStage().close();
}
}

@Override
public SimWorkload snapshot() {
final WorkloadStageLogic logic = this.logic;
int index = this.index;

if (logic != null) {
index = logic.getIndex();
}

return new Workload(offset, usageCol, deadlineCol, coresCol, size, index);
}
}

/**
Expand All @@ -250,6 +264,11 @@ private interface WorkloadStageLogic extends FlowStageLogic {
* Return the {@link FlowStage} belonging to this instance.
*/
FlowStage getStage();

/**
* Return the current index of the workload.
*/
int getIndex();
}

/**
Expand All @@ -268,12 +287,13 @@ private static class SingleWorkloadLogic implements WorkloadStageLogic {
private final SimMachineContext ctx;

private SingleWorkloadLogic(
SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int size) {
SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int size, int index) {
this.ctx = ctx;
this.offset = offset;
this.usageCol = usageCol;
this.deadlineCol = deadlineCol;
this.size = size;
this.index = index;

final FlowGraph graph = ctx.getGraph();
final List<? extends SimProcessingUnit> cpus = ctx.getCpus();
Expand Down Expand Up @@ -315,6 +335,11 @@ public FlowStage getStage() {
return stage;
}

@Override
public int getIndex() {
return index;
}

/**
* Helper method to stop the execution of the workload.
*/
Expand Down Expand Up @@ -346,13 +371,20 @@ private static class MultiWorkloadLogic implements WorkloadStageLogic {
private final SimMachineContext ctx;

private MultiWorkloadLogic(
SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size) {
SimMachineContext ctx,
long offset,
double[] usageCol,
long[] deadlineCol,
int[] coresCol,
int size,
int index) {
this.ctx = ctx;
this.offset = offset;
this.usageCol = usageCol;
this.deadlineCol = deadlineCol;
this.coresCol = coresCol;
this.size = size;
this.index = index;

final FlowGraph graph = ctx.getGraph();
final List<? extends SimProcessingUnit> cpus = ctx.getCpus();
Expand Down Expand Up @@ -418,5 +450,10 @@ public long onUpdate(FlowStage ctx, long now) {
public FlowStage getStage() {
return stage;
}

@Override
public int getIndex() {
return index;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,9 @@ public interface SimWorkload {
* @param ctx The execution context in which the machine runs.
*/
void onStop(SimMachineContext ctx);

/**
* Create a snapshot of this workload.
*/
SimWorkload snapshot();
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class SimMachineTest {
}

override fun onStop(ctx: SimMachineContext) {}

override fun snapshot(): SimWorkload = TODO()
})
}

Expand All @@ -197,6 +199,8 @@ class SimMachineTest {
}

override fun onStop(ctx: SimMachineContext) {}

override fun snapshot(): SimWorkload = TODO()
})
}

Expand All @@ -217,6 +221,8 @@ class SimMachineTest {
}

override fun onStop(ctx: SimMachineContext) {}

override fun snapshot(): SimWorkload = TODO()
})

assertEquals(1000, clock.millis())
Expand All @@ -243,6 +249,8 @@ class SimMachineTest {
}

override fun onStop(ctx: SimMachineContext) {}

override fun snapshot(): SimWorkload = TODO()
})

assertEquals(40, clock.millis())
Expand All @@ -266,6 +274,8 @@ class SimMachineTest {
}

override fun onStop(ctx: SimMachineContext) {}

override fun snapshot(): SimWorkload = TODO()
})

assertEquals(4000, clock.millis())
Expand All @@ -289,6 +299,8 @@ class SimMachineTest {
}

override fun onStop(ctx: SimMachineContext) {}

override fun snapshot(): SimWorkload = TODO()
})

assertEquals(4000, clock.millis())
Expand Down
Loading

0 comments on commit b28c536

Please sign in to comment.