Skip to content

Commit

Permalink
feat: add compensation feature to deno context and utils
Browse files Browse the repository at this point in the history
  • Loading branch information
j03-dev committed Dec 19, 2024
1 parent ea8711f commit 4a06166
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 6 deletions.
7 changes: 7 additions & 0 deletions src/substantial/protocol/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ message Save {
}
}

message Compensation {
uint32 save_id = 1;
string error = 2;
bytes compensation_result = 3;
}

message Sleep {
uint32 id = 1;
google.protobuf.Timestamp start = 2;
Expand Down Expand Up @@ -58,6 +64,7 @@ message Event {
Send send = 13;
Stop stop = 14;
}
Compensation compensation = 15;
};

message Records {
Expand Down
7 changes: 6 additions & 1 deletion src/typegate/engine/runtime.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,12 @@ export type OperationEvent =
| { type: "Send"; event_name: string; value: unknown }
| { type: "Stop"; result: unknown }
| { type: "Start"; kwargs: Record<string, unknown> }
| { type: "Compensate" };
| {
type: "Compensate";
save_id: number;
error: string;
compensation_result: any;
};

export type Operation = { at: string; event: OperationEvent };

Expand Down
61 changes: 56 additions & 5 deletions src/typegate/src/runtimes/substantial/deno_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { make_internal } from "../../worker_utils.ts";
import { TaskContext } from "../deno/shared_types.ts";
import { appendIfOngoing, Interrupt, OperationEvent, Run } from "./types.ts";
import { randomUUID } from "../../crypto.ts";

// const isTest = Deno.env.get("DENO_TESTING") === "true";
const testBaseUrl = Deno.env.get("TEST_OVERRIDE_GQL_ORIGIN");
Expand All @@ -15,11 +16,14 @@ export class Context {
public kwargs = {};
gql: ReturnType<typeof createGQLClient>;
logger: SubLogger;
utils: Utils;
compensationStack: (() => any | Promise<any>)[] = [];

constructor(private run: Run, private internal: TaskContext) {
this.gql = createGQLClient(internal);
this.kwargs = getKwargsCopy(run);
this.logger = new SubLogger(this);
this.utils = new Utils(this);
}

#nextId() {
Expand All @@ -33,7 +37,14 @@ export class Context {
appendIfOngoing(this.run, { at: new Date().toJSON(), event: op });
}

async save<T>(fn: () => T | Promise<T>, option?: SaveOption) {
async save<T>(
fn: () => T | Promise<T>,
option?: SaveOption,
compensateWith?: () => T | Promise<T>,
) {
if (compensateWith) {
this.compensationStack.push(compensateWith!);
}
const id = this.#nextId();

let currRetryCount = 1;
Expand Down Expand Up @@ -76,6 +87,10 @@ export class Context {

return clonedResult;
} catch (err: any) {
if (option?.retry?.compensationOnfristFail) {
await this.#triggerCompensation(id, err);
throw err;
}
if (
option?.retry?.maxRetries &&
currRetryCount < option.retry.maxRetries
Expand Down Expand Up @@ -103,6 +118,7 @@ export class Context {

throw Interrupt.Variant("SAVE_RETRY");
} else {
await this.#triggerCompensation(id, err);
this.#appendOp({
type: "Save",
id,
Expand All @@ -120,6 +136,23 @@ export class Context {
}
}

async #triggerCompensation(save_id: number, error: string) {
const compensationStack = this.compensationStack;
if (compensationStack && compensationStack.length) {
compensationStack.reverse();
for (const compensationFn of compensationStack) {
const result = await Promise.resolve(compensationFn());
const clonedResult = deepClone(result ?? null);
this.#appendOp({
type: "Compensate",
save_id,
error,
compensation_result: clonedResult,
});
}
}
}

sleep(durationMs: number) {
const id = this.#nextId();
for (const { event } of this.run.operations) {
Expand Down Expand Up @@ -350,6 +383,7 @@ interface SaveOption {
minBackoffMs: number;
maxBackoffMs: number;
maxRetries: number;
compensationOnfristFail: boolean;
};
}

Expand Down Expand Up @@ -416,20 +450,19 @@ class RetryStrategy {
}
}


class SubLogger {
constructor(private ctx: Context) {}

async #log(kind: "warn" | "error" | "info", ...args: unknown[]) {
await this.ctx.save(() => {
const prefix = `[${kind.toUpperCase()}: ${this.ctx.getRun().run_id}]`;
switch(kind) {
switch (kind) {
case "warn": {
console.warn(prefix, ...args);
break;
}
case "error": {
console.error(prefix,...args);
console.error(prefix, ...args);
break;
}
default: {
Expand All @@ -444,7 +477,7 @@ class SubLogger {
// Functions are omitted,
// For example, JSON.stringify(() => 1234) => undefined (no throw)
return json === undefined ? String(arg) : json;
} catch(_) {
} catch (_) {
return String(arg);
}
}).join(" ");
Expand All @@ -466,6 +499,24 @@ class SubLogger {
}
}

class Utils {
constructor(private ctx: Context) {}

async now() {
return await this.ctx.save(() => Date.now());
}

async random(a: number, b: number) {
return await this.ctx.save(() =>
Math.floor(Math.random() * (b - a + 1)) + a
);
}

async uuid4() {
return await this.ctx.save(() => randomUUID());
}
}

function createGQLClient(internal: TaskContext) {
const tgLocal = new URL(internal.meta.url);
if (testBaseUrl) {
Expand Down

0 comments on commit 4a06166

Please sign in to comment.