From 61e3df99ab407e5e4531674cdf00856040484c8e Mon Sep 17 00:00:00 2001 From: Josh Bowling Date: Fri, 7 Apr 2023 09:54:43 +0900 Subject: [PATCH] Add scheduled jobs Change-type: minor --- package.json | 2 + src/sbvr-api/jobs.sbvr | 37 ++++++ src/sbvr-api/jobs.ts | 254 +++++++++++++++++++++++++++++++++++++ src/sbvr-api/sbvr-utils.ts | 3 + src/server-glue/module.ts | 1 + 5 files changed, 297 insertions(+) create mode 100644 src/sbvr-api/jobs.sbvr create mode 100644 src/sbvr-api/jobs.ts diff --git a/package.json b/package.json index 197d6488a..102991867 100644 --- a/package.json +++ b/package.json @@ -56,6 +56,7 @@ "express-session": "^1.17.3", "lodash": "^4.17.21", "memoizee": "^0.4.15", + "node-schedule": "^2.1.1", "pinejs-client-core": "^6.12.3", "randomstring": "^1.2.3", "typed-error": "^3.2.1" @@ -66,6 +67,7 @@ "@types/chai-as-promised": "^7.1.5", "@types/grunt": "^0.4.27", "@types/mocha": "^10.0.1", + "@types/node-schedule": "^2.1.0", "@types/supertest": "^2.0.12", "@types/terser-webpack-plugin": "^5.2.0", "@types/webpack": "^5.28.0", diff --git a/src/sbvr-api/jobs.sbvr b/src/sbvr-api/jobs.sbvr new file mode 100644 index 000000000..0641a9fef --- /dev/null +++ b/src/sbvr-api/jobs.sbvr @@ -0,0 +1,37 @@ +Vocabulary: jobs + +Term: job key + Concept Type: Short Text (Type) +Term: handler + Concept Type: Short Text (Type) +Term: parameter set + Concept Type: JSON (Type) +Term: start time + Concept Type: Date Time (Type) +Term: retry limit + Concept Type: Integer (Type) +Term: error count + Concept Type: Integer (Type) +Term: last error + Concept Type: Short Text (Type) +Term: status + Concept Type: Short Text (Type) + +Term: scheduled job +Fact Type: scheduled job has job key + Necessity: each scheduled job has exactly one job key +Fact Type: scheduled job is executed by handler + Necessity: each scheduled job is executed by exactly one handler +Fact Type: scheduled job is executed with parameter set + Necessity: each scheduled job is executed with exactly one parameter set +Fact Type: scheduled job has start time + Necessity: each scheduled job has at most one start time +Fact Type: scheduled job has retry limit + Necessity: each scheduled job has exactly one retry limit +Fact Type: scheduled job has error count + Necessity: each scheduled job has at most one error count +Fact Type: scheduled job has last error + Necessity: each scheduled job has at most one last error +Fact Type: scheduled job has status + Necessity: each scheduled job has exactly one status + Definition: "pending" or "success" or "failed" diff --git a/src/sbvr-api/jobs.ts b/src/sbvr-api/jobs.ts new file mode 100644 index 000000000..ff6faa220 --- /dev/null +++ b/src/sbvr-api/jobs.ts @@ -0,0 +1,254 @@ +import * as nodeSchedule from 'node-schedule'; +import { AnyObject } from 'pinejs-client-core'; + +import * as errors from './errors'; +import { addPureHook } from './hooks'; +import * as sbvrUtils from './sbvr-utils'; +import type * as Db from '../database-layer/db'; +import { permissions } from '../server-glue/module'; + +const apiRoot = 'jobs'; +const DEFAULT_RETRY_LIMIT = 10; + +// tslint:disable-next-line:no-var-requires +const modelText: string = require(`./${apiRoot}.sbvr`); + +interface ScheduledJob { + id: number; + job_key: string; + is_executed_by__handler: string; + is_executed_with__parameter_set: any; + start_time: Date; + retry_limit: number; + error_count: number; + last_error?: string; + status: string; +} +interface JobHandler { + parameters: { + [key: string]: string; + }; + callback: (parameterSet: any) => Promise; +} + +// TODO: Need to scope job handlers to model? +const jobHandlers: { + [name: string]: JobHandler; +} = {}; + +/** + * Create and return a /jobs client for internal use. + * @returns A /jobs pine client + */ +function getClient(): sbvrUtils.PinejsClient { + return new sbvrUtils.PinejsClient(`/${apiRoot}/`) as sbvrUtils.LoggingClient; +} + +/** + * Validates a scheduled job. + * @param values - Request values to validate + */ +function validateScheduledJob(values: AnyObject) { + // Assert that the provided date is in the future. + if (values.start_time == null) { + throw new errors.BadRequestError( + 'Must specify a start time for the scheduled job', + ); + } + if (new Date(values.start_time).getTime() <= new Date().getTime()) { + throw new errors.BadRequestError( + 'Scheduled job start time must be in the future', + ); + } + + // Assert that the requested handler exists. + if (values.is_executed_by__handler == null) { + throw new errors.BadRequestError(`Must specify a job handler to execute`); + } + if (jobHandlers[values.is_executed_by__handler] == null) { + throw new errors.BadRequestError( + `No job handler with name ${values.is_executed_by__handler} registered`, + ); + } + + // Assert that the requested parameters match the handler. + const handler = jobHandlers[values.is_executed_by__handler]; + const parameterSet = values.is_executed_with__parameter_set; + if (handler.parameters != null && parameterSet == null) { + throw new errors.BadRequestError( + `Must specify parameters to execute job handler "${values.is_executed_by__handler}"`, + ); + } + if (parameterSet != null) { + for (const parameterName of Object.keys(parameterSet)) { + if (handler.parameters[parameterName] == null) { + throw new errors.BadRequestError( + `Job handler "${values.is_executed_by__handler}" does not accept parameter "${parameterName}"`, + ); + } + if ( + typeof parameterSet[parameterName] !== handler.parameters[parameterName] + ) { + throw new errors.BadRequestError( + `Job handler "${values.is_executed_by__handler}" parameter "${parameterName}" must be of type "${handler.parameters[parameterName]}"`, + ); + } + } + } +} + +/** + * Execute a scheduled job, retrying and updating as necessary. + * @param scheduledJob - Scheduled job to execute + */ +async function executeScheduledJob(scheduledJob: ScheduledJob): Promise { + const client = getClient(); + try { + await jobHandlers[scheduledJob.is_executed_by__handler].callback( + scheduledJob.is_executed_with__parameter_set, + ); + + // Execution was a success so update the scheduled job as such. + await client.patch({ + resource: 'scheduled_job', + id: scheduledJob.id, + body: { + status: 'success', + }, + }); + } catch (err: any) { + // Re-schedule if the retry limit has not been reached. + scheduledJob.error_count++; + if (scheduledJob.error_count < scheduledJob.retry_limit) { + await client.patch({ + resource: 'scheduled_job', + id: scheduledJob.id, + body: { + error_count: scheduledJob.error_count, + last_error: err.message, + // TODO: Improve backoff time logic. + start_time: new Date(Date.now() + 10000 * scheduledJob.error_count), + }, + }); + } else { + // Execution failed so update the scheduled job as such. + await client.patch({ + resource: 'scheduled_job', + id: scheduledJob.id, + body: { + status: 'failed', + error_count: scheduledJob.error_count, + last_error: err.message, + }, + }); + } + } +} + +export const config = { + models: [ + { + apiRoot, + modelText, + customServerCode: exports, + migrations: {}, + }, + ] as sbvrUtils.ExecutableModel[], +}; + +export const setup = async (tx: Db.Tx) => { + // Validate and schedule new scheduled jobs for future execution. + addPureHook('POST', apiRoot, 'scheduled_job', { + POSTPARSE: async ({ request }) => { + // Set defaults. + request.values.status = 'pending'; + request.values.retry_count = 0; + request.values.retry_limit ??= DEFAULT_RETRY_LIMIT; + request.values.error_count = 0; + + // Validate the scheduled job. + validateScheduledJob(request.values); + }, + POSTRUN: async ({ api, result }) => { + const scheduledJob = (await api.get({ + resource: 'scheduled_job', + id: result, + })) as ScheduledJob; + nodeSchedule.scheduleJob( + `${scheduledJob.id}`, + scheduledJob.start_time, + async () => { + await executeScheduledJob(scheduledJob); + }, + ); + }, + }); + + // Cancel scheduled jobs when they are deleted. + addPureHook('DELETE', apiRoot, 'scheduled_job', { + POSTRUN: async (args) => { + const affectedIds = await sbvrUtils.getAffectedIds(args); + for (const id of affectedIds) { + nodeSchedule.cancelJob(`${id}`); + } + }, + }); + + // Find and re-schedule/execute pending scheduled jobs on startup. + const client = getClient(); + const scheduledJobs = (await client.get({ + resource: 'scheduled_job', + passthrough: { + req: permissions.root, + tx, + }, + options: { + $filter: { + status: 'pending', + }, + }, + })) as ScheduledJob[]; + const now = new Date(); + for (const scheduledJob of scheduledJobs) { + if (scheduledJob.start_time.getTime() < now.getTime()) { + // Execute pending jobs that should have already been executed. + await executeScheduledJob(scheduledJob); + } else if (scheduledJob.start_time.getTime() > now.getTime()) { + // Re-schedule pending scheduled jobs that have not yet been executed. + nodeSchedule.scheduleJob( + scheduledJob.job_key, + scheduledJob.start_time, + async () => { + await executeScheduledJob(scheduledJob); + }, + ); + } + } +}; + +/** + * Register a new job handler. + * @param name - job handler unique name + * @param parameters - job handler parameters definition + * @param callback - job handler callback to execute + * + * @example + * addJobHandler('myJobHandler', { + * message: 'string', + * }, async ({ message }) => { + * console.log(message); + * }); + */ +export const addJobHandler = ( + name: string, + parameters: JobHandler['parameters'], + callback: JobHandler['callback'], +): void => { + if (jobHandlers[name] != null) { + throw new Error(`Job handler with name "${name}" already registered`); + } + jobHandlers[name] = { + parameters, + callback, + }; +}; diff --git a/src/sbvr-api/sbvr-utils.ts b/src/sbvr-api/sbvr-utils.ts index 7793b1c7d..91972b555 100644 --- a/src/sbvr-api/sbvr-utils.ts +++ b/src/sbvr-api/sbvr-utils.ts @@ -40,6 +40,7 @@ import { generateODataMetadata } from '../odata-metadata/odata-metadata-generato // tslint:disable-next-line:no-var-requires const devModel = require('./dev.sbvr'); +import * as jobs from './jobs'; import * as permissions from './permissions'; import { BadRequestError, @@ -1873,6 +1874,7 @@ export const executeStandardModels = async (tx: Db.Tx): Promise => { }, }); await executeModels(tx, permissions.config.models); + await executeModels(tx, jobs.config.models); console.info('Successfully executed standard models.'); } catch (err: any) { console.error('Failed to execute standard models.', err); @@ -1889,6 +1891,7 @@ export const setup = async ( await db.transaction(async (tx) => { await executeStandardModels(tx); await permissions.setup(); + await jobs.setup(tx); }); } catch (err: any) { console.error('Could not execute standard models', err); diff --git a/src/server-glue/module.ts b/src/server-glue/module.ts index 8125744f6..8364e1002 100644 --- a/src/server-glue/module.ts +++ b/src/server-glue/module.ts @@ -18,6 +18,7 @@ export * as errors from '../sbvr-api/errors'; export * as env from '../config-loader/env'; export * as types from '../sbvr-api/common-types'; export * as hooks from '../sbvr-api/hooks'; +export * as jobs from '../sbvr-api/jobs'; export type { configLoader as ConfigLoader }; export type { migratorUtils as Migrator };