Skip to content

Commit

Permalink
Create and use Worker class
Browse files Browse the repository at this point in the history
Change-type: patch
  • Loading branch information
joshbwlng committed Feb 7, 2024
1 parent d1cf54b commit 806b6d7
Show file tree
Hide file tree
Showing 25 changed files with 320 additions and 257 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"@types/websql": "^0.0.30",
"busboy": "^1.6.0",
"commander": "^11.1.0",
"cron-parser": "^4.9.0",
"deep-freeze": "^0.0.1",
"eventemitter3": "^5.0.1",
"express-session": "^1.17.3",
Expand Down
220 changes: 0 additions & 220 deletions src/sbvr-api/tasks.ts

This file was deleted.

141 changes: 141 additions & 0 deletions src/sbvr-api/tasks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { tasks as tasksEnv } from '../../config-loader/env';
import type { Tx } from '../../database-layer/db';
import { BadRequestError } from '../errors';
import { addPureHook } from '../hooks';
import type { ExecutableModel } from '../sbvr-utils';
import type { PinejsClient } from '../sbvr-utils';
import { Worker } from './worker';

import * as cronParser from 'cron-parser';
import type { AnyObject } from 'pinejs-client-core';

export const apiRoot = 'tasks';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const modelText: string = require(`./${apiRoot}.sbvr`);

let worker: Worker | null = null;
const taskHandlers: {
[name: string]: TaskHandler;
} = {};

export const taskStatuses = ['pending', 'cancelled', 'success', 'failed'];
export interface Task {
id: number;
is_created_by__actor: number;
is_executed_by__handler: string;
is_executed_with__parameter_set: object | null;
is_executed_with__api_prefix: string;
is_scheduled_with__cron_expression: string | null;
is_scheduled_to_execute_on__time: Date | null;
priority: number;
status: (typeof taskStatuses)[number];
started_on__time: Date | null;
ended_on__time: Date | null;
error_message: string | null;
}

interface TaskArgs {
api: PinejsClient;
params: AnyObject;
tx: Tx;
}

type TaskResponse = Promise<{
status: (typeof taskStatuses)[number];
error?: string;
}>;

export interface TaskHandler {
name: string;
fn: (options: TaskArgs) => TaskResponse;
}

export const config = {
models: [
{
apiRoot,
modelText,
customServerCode: exports,
migrations: {},
},
] as ExecutableModel[],
};

export const setup = async () => {
addPureHook('POST', apiRoot, 'task', {
POSTPARSE: async ({ req, request }) => {
// Set the actor.
request.values.is_created_by__actor =
req.user?.actor ?? req.apiKey?.actor;
if (request.values.is_created_by__actor == null) {
throw new BadRequestError(
'Creating tasks with missing actor on req is not allowed',
);
}

// Set defaults.
request.values.status = 'pending';
request.values.priority ??= 1;

// Set scheduled start time using cron expression if provided.
if (request.values.is_scheduled_with__cron_expression != null) {
try {
const interval = cronParser.parseExpression(
request.values.is_scheduled_with__cron_expression,
);
request.values.is_scheduled_to_execute_on__time = interval
.next()
.toDate();
} catch (_) {
throw new BadRequestError(
`Invalid cron expression: ${request.values.is_scheduled_with__cron_expression}`,
);
}
}

// Assert that the provided start time is far enough in the future.
if (request.values.is_scheduled_to_execute_on__time != null) {
const now = new Date(new Date().getTime() + tasksEnv.queueIntervalMS);
const startTime = new Date(
request.values.is_scheduled_to_execute_on__time,
);
if (startTime < now) {
throw new BadRequestError(
`Task scheduled start time must be greater than ${tasksEnv.queueIntervalMS} milliseconds in the future`,
);
}
}

// Assert that the requested handler exists.
if (request.values.is_executed_by__handler == null) {
throw new BadRequestError(`Must specify a task handler to execute`);
}
if (taskHandlers[request.values.is_executed_by__handler] == null) {
throw new BadRequestError(
`No task handler with name ${request.values.is_executed_by__handler} registered`,
);
}
},
});

// Start the worker if possible
if (tasksEnv.queueConcurrency > 0 && tasksEnv.queueIntervalMS >= 1000) {
worker = new Worker(tasksEnv.queueConcurrency, tasksEnv.queueIntervalMS);
worker.start();
}
};

// Register a new task handler
export function addTaskHandler(name: string, fn: TaskHandler['fn']): void {
if (taskHandlers[name] != null) {
throw new Error(`Task handler with name "${name}" already registered`);
}
taskHandlers[name] = {
name,
fn,
};
if (worker != null) {
worker.registerTaskHandler(taskHandlers[name]);
}
}
4 changes: 4 additions & 0 deletions src/sbvr-api/tasks.sbvr → src/sbvr-api/tasks/tasks.sbvr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ Vocabulary: tasks

Term: actor
Concept Type: Integer (Type)
Term: api prefix
Concept Type: Short Text (Type)
Term: cron expression
Concept Type: Short Text (Type)
Term: error message
Expand All @@ -28,6 +30,8 @@ Fact type: task is executed by handler
Necessity: each task is executed by exactly one handler
Fact type: task is executed with parameter set
Necessity: each task is executed with at most one parameter set
Fact type: task is executed with api prefix
Necessity: each task is executed with exactly one api prefix
Fact type: task has priority
Necessity: each task has exactly one priority
Necessity: each task has a priority that is greater than or equal to 0
Expand Down
Loading

0 comments on commit 806b6d7

Please sign in to comment.