From 04147ee2475cb58688873512e4d166b5067ff1e0 Mon Sep 17 00:00:00 2001 From: j03-dev <24nomeniavo@gmail.com> Date: Sun, 29 Dec 2024 18:40:09 +0300 Subject: [PATCH] add compensation test --- .../src/runtimes/substantial/deno_context.ts | 12 +-- tests/runtimes/substantial/common.ts | 100 +++++++++++------- .../substantial/imports/common_types.ts | 3 +- tests/runtimes/substantial/substantial.py | 9 ++ .../substantial/workflows/workflow.ts | 40 ++++++- 5 files changed, 113 insertions(+), 51 deletions(-) diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index eaa84d639..373a14a2b 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -40,10 +40,9 @@ export class Context { async save( fn: () => T | Promise, option?: SaveOption, - compensateWith?: () => T | Promise, ) { - if (compensateWith) { - this.compensationStack.push(compensateWith!); + if (option?.compensateWith) { + this.compensationStack.push(option?.compensateWith); } const id = this.#nextId(); @@ -259,7 +258,7 @@ export class ChildWorkflowHandle { constructor( private ctx: Context, public handleDef: SerializableWorkflowHandle, - ) {} + ) { } async start(): Promise { const { data } = await this.ctx.gql /**/` @@ -385,6 +384,7 @@ interface SaveOption { maxRetries: number; compensationOnfristFail: boolean; }; + compensateWith?: () => any | Promise } function failAfter(ms: number): Promise { @@ -451,7 +451,7 @@ class RetryStrategy { } class SubLogger { - constructor(private ctx: Context) {} + constructor(private ctx: Context) { } async #log(kind: "warn" | "error" | "info", ...args: unknown[]) { await this.ctx.save(() => { @@ -500,7 +500,7 @@ class SubLogger { } class Utils { - constructor(private ctx: Context) {} + constructor(private ctx: Context) { } async now() { return await this.ctx.save(() => Date.now()); diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index ffd012a41..922888f3b 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -154,6 +154,22 @@ export function basicTestTemplate( .on(e); }, ); + + const account_balance = 1000; + + await t.should("compensate account balance", async () => { + await gql` + mutation { + start_compensation(kwargs: { account: $account_balance }) + } + ` + .withVars({ account_balance }) + .expectBody((body) => { + const account = body.data?.start_compensation! as number; + assertEquals(account, account_balance); + }) + .on(e); + }); }, ); } @@ -299,8 +315,8 @@ export function concurrentWorkflowTestTemplate( const localSorter = (a: any, b: any) => a.run_id.localeCompare(b.run_id); - const received = body?.data?.results?.completed?.runs ?? - ([] as Array); + const received = + body?.data?.results?.completed?.runs ?? ([] as Array); const expected = [ { result: { @@ -461,8 +477,8 @@ export function retrySaveTestTemplate( const localSorter = (a: any, b: any) => a.run_id.localeCompare(b.run_id); - const received = body?.data?.results?.completed?.runs ?? - ([] as Array); + const received = + body?.data?.results?.completed?.runs ?? ([] as Array); const expected = [ { result: { @@ -624,42 +640,48 @@ export function childWorkflowTestTemplate( .on(e); }); - - await t.should(`filter the runs given a nested expr (${backendName})`, async () => { - await gql` - query { - search(name: "bumpPackage", filter: $filter) { - # started_at - # ended_at - status - value + await t.should( + `filter the runs given a nested expr (${backendName})`, + async () => { + await gql` + query { + search(name: "bumpPackage", filter: $filter) { + # started_at + # ended_at + status + value + } } - } - ` - .withVars({ - filter: { - or: [ - { - and: [ - { status: { contains: JSON.stringify("COMPL") }}, - { contains: JSON.stringify("substantial") } - ] - }, - { not: { not: { eq: JSON.stringify("Bump typegraph v3 => v4") } } } - ] - } satisfies Expr - }) - .expectBody((body) => { - const sorted = body.data.search.sort((a: any, b: any) => a.value.localeCompare(b.value)); - assertEquals(sorted, - [ - { status: "COMPLETED", value: '"Bump substantial v2 => v3"' }, - { status: "COMPLETED", value: '"Bump typegraph v3 => v4"' } - ] - ); - }) - .on(e); - }); + ` + .withVars({ + filter: { + or: [ + { + and: [ + { status: { contains: JSON.stringify("COMPL") } }, + { contains: JSON.stringify("substantial") }, + ], + }, + { + not: { + not: { eq: JSON.stringify("Bump typegraph v3 => v4") }, + }, + }, + ], + } satisfies Expr, + }) + .expectBody((body) => { + const sorted = body.data.search.sort((a: any, b: any) => + a.value.localeCompare(b.value), + ); + assertEquals(sorted, [ + { status: "COMPLETED", value: '"Bump substantial v2 => v3"' }, + { status: "COMPLETED", value: '"Bump typegraph v3 => v4"' }, + ]); + }) + .on(e); + }, + ); }, ); } diff --git a/tests/runtimes/substantial/imports/common_types.ts b/tests/runtimes/substantial/imports/common_types.ts index f68b7a197..e800fb2d3 100644 --- a/tests/runtimes/substantial/imports/common_types.ts +++ b/tests/runtimes/substantial/imports/common_types.ts @@ -31,7 +31,7 @@ export interface Context { ) => Promise>; }; sleep: (ms: number) => void; - save(fn: () => T | Promise, option?: SaveOption, compensateWith?: () => T | Promise): Promise; + save(fn: () => T | Promise, option?: SaveOption): Promise; receive(eventName: string): O; handle( eventName: string, @@ -80,6 +80,7 @@ export interface SaveOption { maxBackoffMs: number; maxRetries: number; }; + compensateWith?: () => any | Promise } export interface Utils { diff --git a/tests/runtimes/substantial/substantial.py b/tests/runtimes/substantial/substantial.py index 3eb55ae68..2f16cab75 100644 --- a/tests/runtimes/substantial/substantial.py +++ b/tests/runtimes/substantial/substantial.py @@ -31,6 +31,7 @@ def substantial(g: Graph): "retryExample", "secretsExample", "accidentalInputMutation", + "compensation", ] ) .build() @@ -84,4 +85,12 @@ def substantial(g: Graph): ) ).reduce({"name": "accidentalInputMutation"}), **sub.internals(), + # compensation + start_compensation=sub.start( + t.struct( + { + "account": t.integer(), + } + ) + ).reduce({"name": "compensation"}), ) diff --git a/tests/runtimes/substantial/workflows/workflow.ts b/tests/runtimes/substantial/workflows/workflow.ts index 4c548e38f..51dd9c12a 100644 --- a/tests/runtimes/substantial/workflows/workflow.ts +++ b/tests/runtimes/substantial/workflows/workflow.ts @@ -128,10 +128,9 @@ export async function accidentalInputMutation(ctx: Context) { if (front.innerField == mutValue) { // Should throw on shallow clones throw new Error( - `actual kwargs was mutated after interrupts: copy ${ - JSON.stringify( - copy, - ) + `actual kwargs was mutated after interrupts: copy ${JSON.stringify( + copy, + ) }, ${mutValue}`, ); } @@ -149,5 +148,36 @@ export async function accidentalInputMutation(ctx: Context) { } export async function compensation(ctx: Context) { - // ctx.utils.now() + const { account } = ctx.kwargs; + + const debitAccount = (value: number) => { + return account - value; + }; + + const creditAccount = (value: number) => { + return account + value; + }; + + const risky_transaction = () => { + throw Error("Transaction Failed"); + }; + + await ctx.save(() => debitAccount(4), { + compensateWith: () => creditAccount(4), + }); + await ctx.save(() => debitAccount(10), { + compensateWith: () => creditAccount(10), + }); + await ctx.save(() => { + debitAccount(2); + risky_transaction(); + }, { + compensateWith: () => creditAccount(4), + }); + + await ctx.save(() => debitAccount(100), { + compensateWith: () => creditAccount(100), + }); + + return account; }