Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
Change-type: patch
  • Loading branch information
joshbwlng committed Feb 8, 2024
1 parent d4c18b7 commit 40033b4
Show file tree
Hide file tree
Showing 2 changed files with 320 additions and 77 deletions.
90 changes: 49 additions & 41 deletions src/sbvr-api/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const handlers: {
export const taskStatuses = ['pending', 'cancelled', 'success', 'failed'];
export interface Task {
id: number;
created_at: Date;
modified_at: Date;
is_created_by__actor: number;
is_executed_by__handler: string;
is_executed_with__parameter_set: object | null;
Expand Down Expand Up @@ -124,7 +126,7 @@ export const setup = async () => {
}
};

// Register a new task handler
// Register a task handler
export function addTaskHandler(name: string, fn: TaskHandler['fn']): void {
handlers[name] = {
name,
Expand Down Expand Up @@ -166,7 +168,10 @@ function watch(): void {
WHERE
t."is executed by-handler" IN (${binds}) AND
t."status" = 'pending' AND
(t."is scheduled to execute on-time" IS NULL OR t."is scheduled to execute on-time" <= NOW())
(
t."is scheduled to execute on-time" IS NULL OR
t."is scheduled to execute on-time" <= CURRENT_TIMESTAMP + INTERVAL '${Math.ceil(tasksEnv.queueIntervalMS / 1000)} second'
)
ORDER BY
t."is scheduled to execute on-time" ASC,
t."priority" DESC,
Expand Down Expand Up @@ -194,32 +199,17 @@ async function execute(
): Promise<void> {
try {
const handler = handlers[task.is_executed_by__handler];
const startedOnTime = new Date();
if (handler == null) {
await client.patch({
resource: 'task',
passthrough: {
tx,
},
id: task.id,
body: {
started_on__time: new Date(),
ended_on__time: new Date(),
status: 'failed',
error_message: 'Matching task handler not found',
},
});
await finalizeTask(
client,
tx,
task.id,
startedOnTime,
'failed',
'Matching task handler not found',
);
return;
} else {
await client.patch({
resource: 'task',
passthrough: {
tx,
},
id: task.id,
body: {
started_on__time: new Date(),
},
});
}

const result = await handler.fn({
Expand All @@ -230,24 +220,42 @@ async function execute(
tx,
});

await client.patch({
resource: 'task',
passthrough: {
tx,
},
id: task.id,
body: {
ended_on__time: new Date(),
status:
result.status != null && taskStatuses.includes(result.status)
? result.status
: 'failed',
...(result.error != null && { error_message: result.error }),
},
});
// Update the task with the result of the execution.
await finalizeTask(
client,
tx,
task.id,
startedOnTime,
result.status,
result.error,
);
} catch (err: any) {
// This shouldn't normally happen, but if it does, we want to log it and kill the process.
console.error('Task execution failed:', err);
process.exit(1);
}
}

// Finalize a task
async function finalizeTask(
client: PinejsClient,
tx: Tx,
id: number,
startedOnTime: Date,
status: string,
errorMessage?: string,
): Promise<void> {
await client.patch({
resource: 'task',
passthrough: {
tx,
},
id,
body: {
started_on__time: startedOnTime,
ended_on__time: new Date(),
status,
...(errorMessage != null && { error_message: errorMessage }),
},
});
}
Loading

0 comments on commit 40033b4

Please sign in to comment.