From b9a8fc36378dd0b45683258f0c24474773d3b178 Mon Sep 17 00:00:00 2001 From: kaibocai <89094811+kaibocai@users.noreply.github.com> Date: Tue, 12 Dec 2023 11:00:17 -0600 Subject: [PATCH] Improve e2e tests - Fix continue-as-new (#14) * improve e2e tests - fix continue-as-new * increase test time out * add config for testing single test * fix more issues --- package.json | 1 + src/worker/index.ts | 2 + src/worker/runtime-orchestration-context.ts | 13 +- test/e2e/orchestration.spec.ts | 250 +++++++++++++------- test/unit/orchestration_executor.spec.ts | 10 +- 5 files changed, 183 insertions(+), 93 deletions(-) diff --git a/package.json b/package.json index 5141c69..2ddc1ca 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "scripts": { "test": "jest --runInBand --detectOpenHandles", "test:unit": "jest test/unit --runInBand --detectOpenHandles", + "test:one": "jest test/unit --runInBand --detectOpenHandles --testNamePattern", "test:e2e": "jest test/e2e --runInBand --detectOpenHandles", "start": "ts-node --swc ./src/index.ts", "dev": "nodemon --watch './src/**/*.ts' --exec 'ts-node --swc' ./src/index.ts" diff --git a/src/worker/index.ts b/src/worker/index.ts index 5401523..af610cb 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -47,6 +47,8 @@ export function getMethodNameForAction(action: pb.OrchestratorAction): string { return "createTimer"; case pb.OrchestratorAction.OrchestratoractiontypeCase.CREATESUBORCHESTRATION: return "callSubOrchestrator"; + case pb.OrchestratorAction.OrchestratoractiontypeCase.COMPLETEORCHESTRATION: + return "completeOrchestration" default: throw new Error(`Unknown action type: ${actionType}`); } diff --git a/src/worker/runtime-orchestration-context.ts b/src/worker/runtime-orchestration-context.ts index f432985..c953470 100644 --- a/src/worker/runtime-orchestration-context.ts +++ b/src/worker/runtime-orchestration-context.ts @@ -70,6 +70,12 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { // start the generator const { value, done } = await this._generator.next(); + // if the generator finished, complete the orchestration. + if (done) { + this.setComplete(value, pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + return; + } + // TODO: check if the task is null? this._previousTask = value; } @@ -144,9 +150,10 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { } setFailed(e: Error) { - if (this._isComplete) { - return; - } + // should allow orchestration to fail, even it's completed. + // if (this._isComplete) { + // return; + // } this._isComplete = true; this._completionStatus = pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED; diff --git a/test/e2e/orchestration.spec.ts b/test/e2e/orchestration.spec.ts index 854104c..ddfbbce 100644 --- a/test/e2e/orchestration.spec.ts +++ b/test/e2e/orchestration.spec.ts @@ -1,4 +1,3 @@ -import { spawn } from "child_process"; import { TaskHubGrpcClient } from "../../src/client"; import { OrchestrationStatus } from "../../src/proto/orchestrator_service_pb"; import { getName, whenAll, whenAny } from "../../src/task"; @@ -45,9 +44,6 @@ describe("Durable Functions", () => { expect(state?.instanceId).toEqual(id); expect(state?.failureDetails).toBeUndefined(); expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - expect(state?.serializedInput).toBeUndefined(); - expect(state?.serializedOutput).toBeUndefined(); - expect(state?.serializedCustomStatus).toBeUndefined(); }); it("should be able to run an activity sequence", async () => { @@ -81,7 +77,69 @@ describe("Durable Functions", () => { expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(state?.serializedInput).toEqual(JSON.stringify(1)); expect(state?.serializedOutput).toEqual(JSON.stringify([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])); - expect(state?.serializedCustomStatus).toBeUndefined(); + }, 31000); + + it("should be able to run fan-out/fan-in", async () => { + let activityCounter = 0; + + const increment = (ctx: ActivityContext, _: any) => { + activityCounter++; + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, count: number): any { + // Fan out to multiple sub-orchestrations + const tasks = []; + + for (let i = 0; i < count; i++) { + tasks.push(ctx.callActivity(increment)); + } + + // Wait for all the sub-orchestrations to complete + yield whenAll(tasks); + }; + + taskHubWorker.addActivity(increment); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 10); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 10); + + expect(state); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(activityCounter).toEqual(10); + }, 31000); + + it("should be able to use the sub-orchestration", async () => { + let activityCounter = 0; + + const increment = (ctx: ActivityContext, _: any) => { + activityCounter++; + }; + + const orchestratorChild: TOrchestrator = async function* (ctx: OrchestrationContext, activityCount: number): any { + yield ctx.callActivity(increment); + }; + + const orchestratorParent: TOrchestrator = async function* (ctx: OrchestrationContext, count: number): any { + // Call sub-orchestration + yield ctx.callSubOrchestrator(orchestratorChild) + + }; + + taskHubWorker.addActivity(increment); + taskHubWorker.addOrchestrator(orchestratorChild); + taskHubWorker.addOrchestrator(orchestratorParent); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestratorParent, 10); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(activityCounter).toEqual(1); }, 31000); it("should be able to use the sub-orchestration for fan-out", async () => { @@ -146,47 +204,79 @@ describe("Durable Functions", () => { expect(state?.serializedOutput).toEqual(JSON.stringify(["a", "b", "c"])); }); - it("should wait for external events with a timeout", async () => { - for (const shouldRaiseEvent of [true, false]) { - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { - const approval = ctx.waitForExternalEvent("Approval"); - const timeout = ctx.createTimer(3 * 1000); - const winner = yield whenAny([approval, timeout]); + it("should be able to run an single timer", async () => { + const delay = 3 + const singleTimer: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any { + yield ctx.createTimer(delay) + }; - if (winner == approval) { - return "approved"; - } else { - return "timed out"; - } - }; + taskHubWorker.addOrchestrator(singleTimer); + await taskHubWorker.start(); - taskHubWorker.addOrchestrator(orchestrator); - await taskHubWorker.start(); + const id = await taskHubClient.scheduleNewOrchestration(singleTimer); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - // Send events to the client immediately - const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const expectedCompletionSecond = state?.createdAt?.getTime()! + delay * 1000; + const actualCompletionSecond = state?.lastUpdatedAt?.getTime(); + + expect(state); + expect(state?.name).toEqual(getName(singleTimer)); + expect(state?.instanceId).toEqual(id); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.createdAt).toBeDefined(); + expect(state?.lastUpdatedAt).toBeDefined(); + expect(expectedCompletionSecond).toBeLessThanOrEqual(actualCompletionSecond!); + }, 31000); + + it("should wait for external events with a timeout - true", async () => { + const shouldRaiseEvent = true + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { + const approval = ctx.waitForExternalEvent("Approval"); + const timeout = ctx.createTimer(3); + const winner = yield whenAny([approval, timeout]); - if (shouldRaiseEvent) { - taskHubClient.raiseOrchestrationEvent(id, "Approval"); + if (winner == approval) { + return "approved"; + } else { + return "timed out"; } + }; - const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); - expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + // Send events to the client immediately + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); - if (shouldRaiseEvent) { - expect(state?.serializedOutput).toEqual(JSON.stringify("approved")); - } else { - expect(state?.serializedOutput).toEqual(JSON.stringify("timed out")); - } + if (shouldRaiseEvent) { + taskHubClient.raiseOrchestrationEvent(id, "Approval"); + } + + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + if (shouldRaiseEvent) { + expect(state?.serializedOutput).toEqual(JSON.stringify("approved")); + } else { + expect(state?.serializedOutput).toEqual(JSON.stringify("timed out")); } }, 31000); - it("should be able to use suspend and resume", async () => { + it("should wait for external events with a timeout - false", async () => { + const shouldRaiseEvent = false const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { - const res = yield ctx.waitForExternalEvent("my_event"); - return res; + const approval = ctx.waitForExternalEvent("Approval"); + const timeout = ctx.createTimer(3); + const winner = yield whenAny([approval, timeout]); + + if (winner == approval) { + return "approved"; + } else { + return "timed out"; + } }; taskHubWorker.addOrchestrator(orchestrator); @@ -194,37 +284,22 @@ describe("Durable Functions", () => { // Send events to the client immediately const id = await taskHubClient.scheduleNewOrchestration(orchestrator); - let state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - expect(state); - // Suspend the orchestration and wait for it to go into the SUSPENDED state - await taskHubClient.suspendOrchestration(id); - - // TODO: is this needed in JS? We use a promise above - // while (state?.runtimeStatus == OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING) { - // await new Promise((resolve) => setTimeout(resolve, 100)); - // state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - // expect(state); - // } - - // Raise an event to the orchestration and confirm that it does NOT complete - taskHubClient.raiseOrchestrationEvent(id, "my_event", 42); - - try { - state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 3); - // TODO - // assert False, "Orchestration should not have been completed" - } catch (e) { - // pass + if (shouldRaiseEvent) { + taskHubClient.raiseOrchestrationEvent(id, "Approval"); } - // Resume the orchestration and wait for it to complete - taskHubClient.resumeOrchestration(id); - state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + expect(state); expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - expect(state?.serializedOutput).toEqual(JSON.stringify(42)); - }); + + if (shouldRaiseEvent) { + expect(state?.serializedOutput).toEqual(JSON.stringify("approved")); + } else { + expect(state?.serializedOutput).toEqual(JSON.stringify("timed out")); + } + }, 31000); it("should be able to terminate an orchestration", async () => { const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { @@ -241,44 +316,49 @@ describe("Durable Functions", () => { expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING); taskHubClient.terminateOrchestration(id, "some reason for termination"); - state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30); + state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); expect(state); expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); expect(state?.serializedOutput).toEqual(JSON.stringify("some reason for termination")); - }); + }, 31000); it("should allow to continue as new", async () => { - const allResults: any[] = []; - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { - const res = yield ctx.waitForExternalEvent("my_event"); - - if (!ctx.isReplaying) { - allResults.push(res); - } - - if (allResults.length <= 4) { - ctx.continueAsNew(Math.max(...allResults), true); + if (input < 10) { + ctx.continueAsNew(input + 1, true); } else { - return allResults; + return input; } }; taskHubWorker.addOrchestrator(orchestrator); await taskHubWorker.start(); - const id = await taskHubClient.scheduleNewOrchestration(orchestrator); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 1); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 2); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 3); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 4); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 5); + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 1); - const state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); expect(state); expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - expect(state?.serializedInput).toEqual(JSON.stringify(4)); - expect(state?.serializedOutput).toEqual(JSON.stringify(allResults)); - expect(allResults).toEqual([1, 2, 3, 4, 5]); - }); -}); + expect(state?.serializedOutput).toEqual(JSON.stringify(10)); + }, 31000); + + it("should be able to run an single orchestration without activity", async () => { + const sequence: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any { + return startVal + 1; + }; + + taskHubWorker.addOrchestrator(sequence); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(sequence, 15); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state); + expect(state?.name).toEqual(getName(sequence)); + expect(state?.instanceId).toEqual(id); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.serializedInput).toEqual(JSON.stringify(15)); + expect(state?.serializedOutput).toEqual(JSON.stringify(16)); + }, 31000); +}); \ No newline at end of file diff --git a/test/unit/orchestration_executor.spec.ts b/test/unit/orchestration_executor.spec.ts index 60447dc..b124a17 100644 --- a/test/unit/orchestration_executor.spec.ts +++ b/test/unit/orchestration_executor.spec.ts @@ -99,9 +99,9 @@ describe("Orchestration Executor", () => { expect(actions[0]?.getCreatetimer()?.getFireat()?.toDate()).toEqual(expectedFireAt); }); it("should test the resumption of a task using a timerFired event", async () => { - const delayOrchestrator: TOrchestrator = async (ctx: OrchestrationContext, _: any) => { + const delayOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any) :any { const dueTime = new Date(ctx.currentUtcDateTime.getTime() + 1000); - await ctx.createTimer(dueTime); + yield ctx.createTimer(dueTime); return "done"; }; const registry = new Registry(); @@ -369,7 +369,7 @@ describe("Orchestration Executor", () => { // user_code_statement = "ctx.call_sub_orchestrator(suborchestrator)" // assert user_code_statement in complete_action.failureDetails.stackTrace.value }); - it("should test the non-determinism detection when a sub-orchestration action is encountered when it shouldn't be", async () => { + it("should test the non-determinism detection when a sub-orchestration action is encountered when it shouldn't be-subOrchestrator", async () => { const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { const res = new CompletableTask(); // dummy task return res; @@ -399,7 +399,7 @@ describe("Orchestration Executor", () => { * This variation tests the case where the expected task type is wrong (e.g. the code schedules a timer task * but the history contains a sub-orchestration completed task) */ - it("should test the non-determinism detection when a sub-orchestration action is encountered when it shouldn't be", async () => { + it("should test the non-determinism detection when a sub-orchestration action is encountered when it shouldn't be-timer", async () => { const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { const res = yield ctx.createTimer(new Date()); // Created timer but history expects sub-orchestration return res; @@ -607,7 +607,7 @@ describe("Orchestration Executor", () => { tasks.push(ctx.callActivity(hello, i.toString())); } - const results = whenAll(tasks); + const results = yield whenAll(tasks); return results; };