Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Purge instance by ID support #15

Merged
merged 1 commit into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import * as pb from "./proto/orchestrator_service_pb";
import * as stubs from "./proto/orchestrator_service_grpc_pb";
import { TOrchestrator } from "./types/orchestrator.type";
import { TInput } from "./types/input.type";
import { TOutput } from "./types/output.type";
import { getName } from "./task";
import { randomUUID } from "crypto";
import { promisify } from "util";
import { newOrchestrationState } from "./orchestration";
import { newOrchestrationState, newPurgeResult } from "./orchestration";
import { OrchestrationState } from "./orchestration/orchestration-state";
import { GrpcClient } from "./client-grpc";
import { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum";
import { TimeoutError } from "./exception/timeout-error";
import { PurgeResult } from "./orchestration/orchestration-purge-result";

export class TaskHubGrpcClient {
private _stub: stubs.TaskHubSidecarServiceClient;
Expand Down Expand Up @@ -191,4 +191,14 @@ export class TaskHubGrpcClient {
const prom = promisify(this._stub.resumeInstance.bind(this._stub));
await prom(req);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add doc for purge method

async purgeInstanceById(instanceId: string,): Promise<PurgeResult | undefined> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be purgeOrchestration? in sync with other methods

const req = new pb.PurgeInstancesRequest;
req.setInstanceid(instanceId);
console.log(`Purging Instance '${instanceId}'`);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, new line before/after console logs. New line after const prom = promisify... line.

const prom = promisify(this._stub.purgeInstances.bind(this._stub));
// Execute the request and wait for the first response or timeout
const res = (await prom(req)) as pb.PurgeInstancesResponse;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not create PurgeResult here directly?

return newPurgeResult(res);
}
}
13 changes: 13 additions & 0 deletions src/orchestration/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as pb from "../proto/orchestrator_service_pb";
import { FailureDetails } from "../task/failure-details";
import { OrchestrationStatus, parseGrpcValue } from "./enum/orchestration-status.enum";
import { PurgeResult } from "./orchestration-purge-result";
import { OrchestrationState } from "./orchestration-state";

export function newOrchestrationState(
Expand Down Expand Up @@ -54,3 +55,15 @@ export function newOrchestrationState(
failureDetails,
);
}

export function newPurgeResult(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this if we directly create PurgeResult above.

res: pb.PurgeInstancesResponse
): PurgeResult | undefined {
if (!res || !res.getDeletedinstancecount()) {
return;
}

return new PurgeResult(
res.getDeletedinstancecount(),
);
}
9 changes: 9 additions & 0 deletions src/orchestration/orchestration-purge-result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export class PurgeResult {
deletedInstanceCount: number;

constructor(
deletedInstanceCount: number,
) {
this.deletedInstanceCount = deletedInstanceCount;
}
}
37 changes: 33 additions & 4 deletions test/e2e/orchestration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,22 +343,51 @@ describe("Durable Functions", () => {
}, 31000);

it("should be able to run an single orchestration without activity", async () => {
const sequence: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any {
return startVal + 1;
};

taskHubWorker.addOrchestrator(sequence);
taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(sequence, 15);
const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 15);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state);
expect(state?.name).toEqual(getName(sequence));
expect(state?.name).toEqual(getName(orchestrator));
expect(state?.instanceId).toEqual(id);
expect(state?.failureDetails).toBeUndefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toEqual(JSON.stringify(15));
expect(state?.serializedOutput).toEqual(JSON.stringify(16));
}, 31000);

it("should be able purge orchestration", async () => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit typo:

it("should be able to purge orchestration", async () => {

const plusOne = async (_: ActivityContext, input: number) => {
return input + 1;
};

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any {
return yield ctx.callActivity(plusOne, startVal);
};

taskHubWorker.addOrchestrator(orchestrator);
taskHubWorker.addActivity(plusOne);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 1);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state);
expect(state?.name).toEqual(getName(orchestrator));
expect(state?.instanceId).toEqual(id);
expect(state?.failureDetails).toBeUndefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toEqual(JSON.stringify(1));
expect(state?.serializedOutput).toEqual(JSON.stringify(2));

const purgeResult = await taskHubClient.purgeInstanceById(id);
expect(purgeResult);
expect(purgeResult?.deletedInstanceCount).toEqual(1);
}, 31000);
});
Loading