Skip to content

Commit

Permalink
add purge instance by ID support (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaibocai authored Dec 12, 2023
1 parent b9a8fc3 commit 355a611
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 6 deletions.
14 changes: 12 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import * as pb from "./proto/orchestrator_service_pb";
import * as stubs from "./proto/orchestrator_service_grpc_pb";
import { TOrchestrator } from "./types/orchestrator.type";
import { TInput } from "./types/input.type";
import { TOutput } from "./types/output.type";
import { getName } from "./task";
import { randomUUID } from "crypto";
import { promisify } from "util";
import { newOrchestrationState } from "./orchestration";
import { newOrchestrationState, newPurgeResult } from "./orchestration";
import { OrchestrationState } from "./orchestration/orchestration-state";
import { GrpcClient } from "./client-grpc";
import { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum";
import { TimeoutError } from "./exception/timeout-error";
import { PurgeResult } from "./orchestration/orchestration-purge-result";

export class TaskHubGrpcClient {
private _stub: stubs.TaskHubSidecarServiceClient;
Expand Down Expand Up @@ -191,4 +191,14 @@ export class TaskHubGrpcClient {
const prom = promisify(this._stub.resumeInstance.bind(this._stub));
await prom(req);
}

async purgeInstanceById(instanceId: string,): Promise<PurgeResult | undefined> {
const req = new pb.PurgeInstancesRequest;
req.setInstanceid(instanceId);
console.log(`Purging Instance '${instanceId}'`);
const prom = promisify(this._stub.purgeInstances.bind(this._stub));
// Execute the request and wait for the first response or timeout
const res = (await prom(req)) as pb.PurgeInstancesResponse;
return newPurgeResult(res);
}
}
13 changes: 13 additions & 0 deletions src/orchestration/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as pb from "../proto/orchestrator_service_pb";
import { FailureDetails } from "../task/failure-details";
import { OrchestrationStatus, parseGrpcValue } from "./enum/orchestration-status.enum";
import { PurgeResult } from "./orchestration-purge-result";
import { OrchestrationState } from "./orchestration-state";

export function newOrchestrationState(
Expand Down Expand Up @@ -54,3 +55,15 @@ export function newOrchestrationState(
failureDetails,
);
}

export function newPurgeResult(
res: pb.PurgeInstancesResponse
): PurgeResult | undefined {
if (!res || !res.getDeletedinstancecount()) {
return;
}

return new PurgeResult(
res.getDeletedinstancecount(),
);
}
9 changes: 9 additions & 0 deletions src/orchestration/orchestration-purge-result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export class PurgeResult {
deletedInstanceCount: number;

constructor(
deletedInstanceCount: number,
) {
this.deletedInstanceCount = deletedInstanceCount;
}
}
37 changes: 33 additions & 4 deletions test/e2e/orchestration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,22 +343,51 @@ describe("Durable Functions", () => {
}, 31000);

it("should be able to run an single orchestration without activity", async () => {
const sequence: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any {
return startVal + 1;
};

taskHubWorker.addOrchestrator(sequence);
taskHubWorker.addOrchestrator(orchestrator);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(sequence, 15);
const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 15);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state);
expect(state?.name).toEqual(getName(sequence));
expect(state?.name).toEqual(getName(orchestrator));
expect(state?.instanceId).toEqual(id);
expect(state?.failureDetails).toBeUndefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toEqual(JSON.stringify(15));
expect(state?.serializedOutput).toEqual(JSON.stringify(16));
}, 31000);

it("should be able purge orchestration", async () => {
const plusOne = async (_: ActivityContext, input: number) => {
return input + 1;
};

const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any {
return yield ctx.callActivity(plusOne, startVal);
};

taskHubWorker.addOrchestrator(orchestrator);
taskHubWorker.addActivity(plusOne);
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 1);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);

expect(state);
expect(state?.name).toEqual(getName(orchestrator));
expect(state?.instanceId).toEqual(id);
expect(state?.failureDetails).toBeUndefined();
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toEqual(JSON.stringify(1));
expect(state?.serializedOutput).toEqual(JSON.stringify(2));

const purgeResult = await taskHubClient.purgeInstanceById(id);
expect(purgeResult);
expect(purgeResult?.deletedInstanceCount).toEqual(1);
}, 31000);
});

0 comments on commit 355a611

Please sign in to comment.