Skip to content

Commit

Permalink
Improve e2e tests - Fix continue-as-new (#14)
Browse files Browse the repository at this point in the history
* improve e2e tests - fix continue-as-new

* increase test time out

* add config for testing single test

* fix more issues
  • Loading branch information
kaibocai authored Dec 12, 2023
1 parent 09db47b commit b9a8fc3
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 93 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"scripts": {
"test": "jest --runInBand --detectOpenHandles",
"test:unit": "jest test/unit --runInBand --detectOpenHandles",
"test:one": "jest test/unit --runInBand --detectOpenHandles --testNamePattern",
"test:e2e": "jest test/e2e --runInBand --detectOpenHandles",
"start": "ts-node --swc ./src/index.ts",
"dev": "nodemon --watch './src/**/*.ts' --exec 'ts-node --swc' ./src/index.ts"
Expand Down
2 changes: 2 additions & 0 deletions src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export function getMethodNameForAction(action: pb.OrchestratorAction): string {
return "createTimer";
case pb.OrchestratorAction.OrchestratoractiontypeCase.CREATESUBORCHESTRATION:
return "callSubOrchestrator";
case pb.OrchestratorAction.OrchestratoractiontypeCase.COMPLETEORCHESTRATION:
return "completeOrchestration"
default:
throw new Error(`Unknown action type: ${actionType}`);
}
Expand Down
13 changes: 10 additions & 3 deletions src/worker/runtime-orchestration-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
// start the generator
const { value, done } = await this._generator.next();

// if the generator finished, complete the orchestration.
if (done) {
this.setComplete(value, pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
return;
}

// TODO: check if the task is null?
this._previousTask = value;
}
Expand Down Expand Up @@ -144,9 +150,10 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
}

setFailed(e: Error) {
if (this._isComplete) {
return;
}
// should allow orchestration to fail, even it's completed.
// if (this._isComplete) {
// return;
// }

this._isComplete = true;
this._completionStatus = pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED;
Expand Down
250 changes: 165 additions & 85 deletions test/e2e/orchestration.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { spawn } from "child_process";
import { TaskHubGrpcClient } from "../../src/client";
import { OrchestrationStatus } from "../../src/proto/orchestrator_service_pb";
import { getName, whenAll, whenAny } from "../../src/task";
Expand Down Expand Up @@ -45,9 +44,6 @@ describe("Durable Functions", () => {
expect(state?.instanceId).toEqual(id);
expect(state?.failureDetails).toBeUndefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toBeUndefined();
expect(state?.serializedOutput).toBeUndefined();
expect(state?.serializedCustomStatus).toBeUndefined();
});

it("should be able to run an activity sequence", async () => {
Expand Down Expand Up @@ -81,7 +77,69 @@ describe("Durable Functions", () => {
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toEqual(JSON.stringify(1));
expect(state?.serializedOutput).toEqual(JSON.stringify([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]));
expect(state?.serializedCustomStatus).toBeUndefined();
}, 31000);

it("should be able to run fan-out/fan-in", async () => {
let activityCounter = 0;

const increment = (ctx: ActivityContext, _: any) => {
activityCounter++;
};

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, count: number): any {
// Fan out to multiple sub-orchestrations
const tasks = [];

for (let i = 0; i < count; i++) {
tasks.push(ctx.callActivity(increment));
}

// Wait for all the sub-orchestrations to complete
yield whenAll(tasks);
};

taskHubWorker.addActivity(increment);
taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 10);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 10);

expect(state);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.failureDetails).toBeUndefined();
expect(activityCounter).toEqual(10);
}, 31000);

it("should be able to use the sub-orchestration", async () => {
let activityCounter = 0;

const increment = (ctx: ActivityContext, _: any) => {
activityCounter++;
};

const orchestratorChild: TOrchestrator = async function* (ctx: OrchestrationContext, activityCount: number): any {
yield ctx.callActivity(increment);
};

const orchestratorParent: TOrchestrator = async function* (ctx: OrchestrationContext, count: number): any {
// Call sub-orchestration
yield ctx.callSubOrchestrator(orchestratorChild)

};

taskHubWorker.addActivity(increment);
taskHubWorker.addOrchestrator(orchestratorChild);
taskHubWorker.addOrchestrator(orchestratorParent);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(orchestratorParent, 10);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.failureDetails).toBeUndefined();
expect(activityCounter).toEqual(1);
}, 31000);

it("should be able to use the sub-orchestration for fan-out", async () => {
Expand Down Expand Up @@ -146,85 +204,102 @@ describe("Durable Functions", () => {
expect(state?.serializedOutput).toEqual(JSON.stringify(["a", "b", "c"]));
});

it("should wait for external events with a timeout", async () => {
for (const shouldRaiseEvent of [true, false]) {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
const approval = ctx.waitForExternalEvent("Approval");
const timeout = ctx.createTimer(3 * 1000);
const winner = yield whenAny([approval, timeout]);
it("should be able to run an single timer", async () => {
const delay = 3
const singleTimer: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any {
yield ctx.createTimer(delay)
};

if (winner == approval) {
return "approved";
} else {
return "timed out";
}
};
taskHubWorker.addOrchestrator(singleTimer);
await taskHubWorker.start();

taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();
const id = await taskHubClient.scheduleNewOrchestration(singleTimer);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

// Send events to the client immediately
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
const expectedCompletionSecond = state?.createdAt?.getTime()! + delay * 1000;
const actualCompletionSecond = state?.lastUpdatedAt?.getTime();

expect(state);
expect(state?.name).toEqual(getName(singleTimer));
expect(state?.instanceId).toEqual(id);
expect(state?.failureDetails).toBeUndefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.createdAt).toBeDefined();
expect(state?.lastUpdatedAt).toBeDefined();
expect(expectedCompletionSecond).toBeLessThanOrEqual(actualCompletionSecond!);
}, 31000);

it("should wait for external events with a timeout - true", async () => {
const shouldRaiseEvent = true
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
const approval = ctx.waitForExternalEvent("Approval");
const timeout = ctx.createTimer(3);
const winner = yield whenAny([approval, timeout]);

if (shouldRaiseEvent) {
taskHubClient.raiseOrchestrationEvent(id, "Approval");
if (winner == approval) {
return "approved";
} else {
return "timed out";
}
};

const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();

expect(state);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
// Send events to the client immediately
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);

if (shouldRaiseEvent) {
expect(state?.serializedOutput).toEqual(JSON.stringify("approved"));
} else {
expect(state?.serializedOutput).toEqual(JSON.stringify("timed out"));
}
if (shouldRaiseEvent) {
taskHubClient.raiseOrchestrationEvent(id, "Approval");
}

const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);

if (shouldRaiseEvent) {
expect(state?.serializedOutput).toEqual(JSON.stringify("approved"));
} else {
expect(state?.serializedOutput).toEqual(JSON.stringify("timed out"));
}
}, 31000);

it("should be able to use suspend and resume", async () => {
it("should wait for external events with a timeout - false", async () => {
const shouldRaiseEvent = false
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
const res = yield ctx.waitForExternalEvent("my_event");
return res;
const approval = ctx.waitForExternalEvent("Approval");
const timeout = ctx.createTimer(3);
const winner = yield whenAny([approval, timeout]);

if (winner == approval) {
return "approved";
} else {
return "timed out";
}
};

taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();

// Send events to the client immediately
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
let state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
expect(state);

// Suspend the orchestration and wait for it to go into the SUSPENDED state
await taskHubClient.suspendOrchestration(id);

// TODO: is this needed in JS? We use a promise above
// while (state?.runtimeStatus == OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING) {
// await new Promise((resolve) => setTimeout(resolve, 100));
// state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
// expect(state);
// }

// Raise an event to the orchestration and confirm that it does NOT complete
taskHubClient.raiseOrchestrationEvent(id, "my_event", 42);

try {
state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 3);
// TODO
// assert False, "Orchestration should not have been completed"
} catch (e) {
// pass
if (shouldRaiseEvent) {
taskHubClient.raiseOrchestrationEvent(id, "Approval");
}

// Resume the orchestration and wait for it to complete
taskHubClient.resumeOrchestration(id);
state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedOutput).toEqual(JSON.stringify(42));
});

if (shouldRaiseEvent) {
expect(state?.serializedOutput).toEqual(JSON.stringify("approved"));
} else {
expect(state?.serializedOutput).toEqual(JSON.stringify("timed out"));
}
}, 31000);

it("should be able to terminate an orchestration", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
Expand All @@ -241,44 +316,49 @@ describe("Durable Functions", () => {
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING);

taskHubClient.terminateOrchestration(id, "some reason for termination");
state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30);
state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
expect(state);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED);
expect(state?.serializedOutput).toEqual(JSON.stringify("some reason for termination"));
});
}, 31000);

it("should allow to continue as new", async () => {
const allResults: any[] = [];

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any {
const res = yield ctx.waitForExternalEvent("my_event");

if (!ctx.isReplaying) {
allResults.push(res);
}

if (allResults.length <= 4) {
ctx.continueAsNew(Math.max(...allResults), true);
if (input < 10) {
ctx.continueAsNew(input + 1, true);
} else {
return allResults;
return input;
}
};

taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
taskHubClient.raiseOrchestrationEvent(id, "my_event", 1);
taskHubClient.raiseOrchestrationEvent(id, "my_event", 2);
taskHubClient.raiseOrchestrationEvent(id, "my_event", 3);
taskHubClient.raiseOrchestrationEvent(id, "my_event", 4);
taskHubClient.raiseOrchestrationEvent(id, "my_event", 5);
const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 1);

const state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
expect(state);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toEqual(JSON.stringify(4));
expect(state?.serializedOutput).toEqual(JSON.stringify(allResults));
expect(allResults).toEqual([1, 2, 3, 4, 5]);
});
});
expect(state?.serializedOutput).toEqual(JSON.stringify(10));
}, 31000);

it("should be able to run an single orchestration without activity", async () => {
const sequence: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any {
return startVal + 1;
};

taskHubWorker.addOrchestrator(sequence);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(sequence, 15);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state);
expect(state?.name).toEqual(getName(sequence));
expect(state?.instanceId).toEqual(id);
expect(state?.failureDetails).toBeUndefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toEqual(JSON.stringify(15));
expect(state?.serializedOutput).toEqual(JSON.stringify(16));
}, 31000);
});
Loading

0 comments on commit b9a8fc3

Please sign in to comment.