Skip to content

Commit

Permalink
add compensation test
Browse files Browse the repository at this point in the history
  • Loading branch information
j03-dev committed Dec 29, 2024
1 parent 881b486 commit 04147ee
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 51 deletions.
12 changes: 6 additions & 6 deletions src/typegate/src/runtimes/substantial/deno_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ export class Context {
async save<T>(
fn: () => T | Promise<T>,
option?: SaveOption,
compensateWith?: () => T | Promise<T>,
) {
if (compensateWith) {
this.compensationStack.push(compensateWith!);
if (option?.compensateWith) {
this.compensationStack.push(option?.compensateWith);
}
const id = this.#nextId();

Expand Down Expand Up @@ -259,7 +258,7 @@ export class ChildWorkflowHandle {
constructor(
private ctx: Context,
public handleDef: SerializableWorkflowHandle,
) {}
) { }

async start(): Promise<string> {
const { data } = await this.ctx.gql /**/`
Expand Down Expand Up @@ -385,6 +384,7 @@ interface SaveOption {
maxRetries: number;
compensationOnfristFail: boolean;
};
compensateWith?: () => any | Promise<any>
}

function failAfter(ms: number): Promise<never> {
Expand Down Expand Up @@ -451,7 +451,7 @@ class RetryStrategy {
}

class SubLogger {
constructor(private ctx: Context) {}
constructor(private ctx: Context) { }

async #log(kind: "warn" | "error" | "info", ...args: unknown[]) {
await this.ctx.save(() => {
Expand Down Expand Up @@ -500,7 +500,7 @@ class SubLogger {
}

class Utils {
constructor(private ctx: Context) {}
constructor(private ctx: Context) { }

async now() {
return await this.ctx.save(() => Date.now());
Expand Down
100 changes: 61 additions & 39 deletions tests/runtimes/substantial/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,22 @@ export function basicTestTemplate(
.on(e);
},
);

const account_balance = 1000;

await t.should("compensate account balance", async () => {
await gql`
mutation {
start_compensation(kwargs: { account: $account_balance })
}
`
.withVars({ account_balance })
.expectBody((body) => {
const account = body.data?.start_compensation! as number;
assertEquals(account, account_balance);
})
.on(e);
});
},
);
}
Expand Down Expand Up @@ -299,8 +315,8 @@ export function concurrentWorkflowTestTemplate(
const localSorter = (a: any, b: any) =>
a.run_id.localeCompare(b.run_id);

const received = body?.data?.results?.completed?.runs ??
([] as Array<any>);
const received =
body?.data?.results?.completed?.runs ?? ([] as Array<any>);
const expected = [
{
result: {
Expand Down Expand Up @@ -461,8 +477,8 @@ export function retrySaveTestTemplate(
const localSorter = (a: any, b: any) =>
a.run_id.localeCompare(b.run_id);

const received = body?.data?.results?.completed?.runs ??
([] as Array<any>);
const received =
body?.data?.results?.completed?.runs ?? ([] as Array<any>);
const expected = [
{
result: {
Expand Down Expand Up @@ -624,42 +640,48 @@ export function childWorkflowTestTemplate(
.on(e);
});


await t.should(`filter the runs given a nested expr (${backendName})`, async () => {
await gql`
query {
search(name: "bumpPackage", filter: $filter) {
# started_at
# ended_at
status
value
await t.should(
`filter the runs given a nested expr (${backendName})`,
async () => {
await gql`
query {
search(name: "bumpPackage", filter: $filter) {
# started_at
# ended_at
status
value
}
}
}
`
.withVars({
filter: {
or: [
{
and: [
{ status: { contains: JSON.stringify("COMPL") }},
{ contains: JSON.stringify("substantial") }
]
},
{ not: { not: { eq: JSON.stringify("Bump typegraph v3 => v4") } } }
]
} satisfies Expr
})
.expectBody((body) => {
const sorted = body.data.search.sort((a: any, b: any) => a.value.localeCompare(b.value));
assertEquals(sorted,
[
{ status: "COMPLETED", value: '"Bump substantial v2 => v3"' },
{ status: "COMPLETED", value: '"Bump typegraph v3 => v4"' }
]
);
})
.on(e);
});
`
.withVars({
filter: {
or: [
{
and: [
{ status: { contains: JSON.stringify("COMPL") } },
{ contains: JSON.stringify("substantial") },
],
},
{
not: {
not: { eq: JSON.stringify("Bump typegraph v3 => v4") },
},
},
],
} satisfies Expr,
})
.expectBody((body) => {
const sorted = body.data.search.sort((a: any, b: any) =>
a.value.localeCompare(b.value),
);
assertEquals(sorted, [
{ status: "COMPLETED", value: '"Bump substantial v2 => v3"' },
{ status: "COMPLETED", value: '"Bump typegraph v3 => v4"' },
]);
})
.on(e);
},
);
},
);
}
Expand Down
3 changes: 2 additions & 1 deletion tests/runtimes/substantial/imports/common_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export interface Context {
) => Promise<Record<string, unknown>>;
};
sleep: (ms: number) => void;
save<T>(fn: () => T | Promise<T>, option?: SaveOption, compensateWith?: () => T | Promise<T>): Promise<T>;
save<T>(fn: () => T | Promise<T>, option?: SaveOption): Promise<T>;
receive<O>(eventName: string): O;
handle<I, O>(
eventName: string,
Expand Down Expand Up @@ -80,6 +80,7 @@ export interface SaveOption {
maxBackoffMs: number;
maxRetries: number;
};
compensateWith?: () => any | Promise<any>
}

export interface Utils {
Expand Down
9 changes: 9 additions & 0 deletions tests/runtimes/substantial/substantial.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def substantial(g: Graph):
"retryExample",
"secretsExample",
"accidentalInputMutation",
"compensation",
]
)
.build()
Expand Down Expand Up @@ -84,4 +85,12 @@ def substantial(g: Graph):
)
).reduce({"name": "accidentalInputMutation"}),
**sub.internals(),
# compensation
start_compensation=sub.start(
t.struct(
{
"account": t.integer(),
}
)
).reduce({"name": "compensation"}),
)
40 changes: 35 additions & 5 deletions tests/runtimes/substantial/workflows/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,9 @@ export async function accidentalInputMutation(ctx: Context) {
if (front.innerField == mutValue) {
// Should throw on shallow clones
throw new Error(
`actual kwargs was mutated after interrupts: copy ${
JSON.stringify(
copy,
)
`actual kwargs was mutated after interrupts: copy ${JSON.stringify(
copy,
)
}, ${mutValue}`,
);
}
Expand All @@ -149,5 +148,36 @@ export async function accidentalInputMutation(ctx: Context) {
}

export async function compensation(ctx: Context) {
// ctx.utils.now()
const { account } = ctx.kwargs;

const debitAccount = (value: number) => {
return account - value;
};

const creditAccount = (value: number) => {
return account + value;
};

const risky_transaction = () => {
throw Error("Transaction Failed");
};

await ctx.save(() => debitAccount(4), {
compensateWith: () => creditAccount(4),
});
await ctx.save(() => debitAccount(10), {
compensateWith: () => creditAccount(10),
});
await ctx.save(() => {
debitAccount(2);
risky_transaction();
}, {
compensateWith: () => creditAccount(4),
});

await ctx.save(() => debitAccount(100), {
compensateWith: () => creditAccount(100),
});

return account;
}

0 comments on commit 04147ee

Please sign in to comment.