Skip to content

Commit

Permalink
Add scheduled jobs
Browse files Browse the repository at this point in the history
Change-type: minor
  • Loading branch information
joshbwlng committed Apr 18, 2023
1 parent 5102a07 commit 153866d
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 0 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"express-session": "^1.17.3",
"lodash": "^4.17.21",
"memoizee": "^0.4.15",
"node-schedule": "^2.1.1",
"pinejs-client-core": "^6.12.3",
"randomstring": "^1.2.3",
"typed-error": "^3.2.1"
Expand All @@ -66,6 +67,7 @@
"@types/chai-as-promised": "^7.1.5",
"@types/grunt": "^0.4.27",
"@types/mocha": "^10.0.1",
"@types/node-schedule": "^2.1.0",
"@types/supertest": "^2.0.12",
"@types/terser-webpack-plugin": "^5.2.0",
"@types/webpack": "^5.28.0",
Expand Down
37 changes: 37 additions & 0 deletions src/sbvr-api/jobs.sbvr
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Vocabulary: jobs

Term: job key
Concept Type: Short Text (Type)
Term: handler
Concept Type: Short Text (Type)
Term: parameter set
Concept Type: JSON (Type)
Term: start time
Concept Type: Date Time (Type)
Term: retry limit
Concept Type: Integer (Type)
Term: error count
Concept Type: Integer (Type)
Term: last error
Concept Type: Short Text (Type)
Term: status
Concept Type: Short Text (Type)

Term: scheduled job
Fact Type: scheduled job has job key
Necessity: each scheduled job has exactly one job key
Fact Type: scheduled job is executed by handler
Necessity: each scheduled job is executed by exactly one handler
Fact Type: scheduled job is executed with parameter set
Necessity: each scheduled job is executed with exactly one parameter set
Fact Type: scheduled job has start time
Necessity: each scheduled job has at most one start time
Fact Type: scheduled job has retry limit
Necessity: each scheduled job has exactly one retry limit
Fact Type: scheduled job has error count
Necessity: each scheduled job has at most one error count
Fact Type: scheduled job has last error
Necessity: each scheduled job has at most one last error
Fact Type: scheduled job has status
Necessity: each scheduled job has exactly one status
Definition: "pending" or "success" or "failed"
254 changes: 254 additions & 0 deletions src/sbvr-api/jobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
import * as nodeSchedule from 'node-schedule';
import { AnyObject } from 'pinejs-client-core';

import * as errors from './errors';
import { addPureHook } from './hooks';
import * as sbvrUtils from './sbvr-utils';
import type * as Db from '../database-layer/db';
import { permissions } from '../server-glue/module';

const apiRoot = 'jobs';
const DEFAULT_RETRY_LIMIT = 10;

// tslint:disable-next-line:no-var-requires
const modelText: string = require(`./${apiRoot}.sbvr`);

interface ScheduledJob {
id: number;
job_key: string;
is_executed_by__handler: string;
is_executed_with__parameter_set: any;
start_time: Date;
retry_limit: number;
error_count: number;
last_error?: string;
status: string;
}
interface JobHandler {
parameters: {
[key: string]: string;
};
callback: (parameterSet: any) => Promise<void>;
}

// TODO: Need to scope job handlers to model?
const jobHandlers: {
[name: string]: JobHandler;
} = {};

/**
* Create and return a /jobs client for internal use.
* @returns A /jobs pine client
*/
function getClient(): sbvrUtils.PinejsClient {
return new sbvrUtils.PinejsClient(`/${apiRoot}/`) as sbvrUtils.LoggingClient;
}

/**
* Validates a scheduled job.
* @param values - Request values to validate
*/
function validateScheduledJob(values: AnyObject) {
// Assert that the provided date is in the future.
if (values.start_time == null) {
throw new errors.BadRequestError(
'Must specify a start time for the scheduled job',
);
}
if (new Date(values.start_time).getTime() <= new Date().getTime()) {
throw new errors.BadRequestError(
'Scheduled job start time must be in the future',
);
}

// Assert that the requested handler exists.
if (values.is_executed_by__handler == null) {
throw new errors.BadRequestError(`Must specify a job handler to execute`);
}
if (jobHandlers[values.is_executed_by__handler] == null) {
throw new errors.BadRequestError(
`No job handler with name ${values.is_executed_by__handler} registered`,
);
}

// Assert that the requested parameters match the handler.
const handler = jobHandlers[values.is_executed_by__handler];
const parameterSet = values.is_executed_with__parameter_set;
if (handler.parameters != null && parameterSet == null) {
throw new errors.BadRequestError(
`Must specify parameters to execute job handler "${values.is_executed_by__handler}"`,
);
}
if (parameterSet != null) {
for (const parameterName of Object.keys(parameterSet)) {
if (handler.parameters[parameterName] == null) {
throw new errors.BadRequestError(
`Job handler "${values.is_executed_by__handler}" does not accept parameter "${parameterName}"`,
);
}
if (
typeof parameterSet[parameterName] !== handler.parameters[parameterName]
) {
throw new errors.BadRequestError(
`Job handler "${values.is_executed_by__handler}" parameter "${parameterName}" must be of type "${handler.parameters[parameterName]}"`,
);
}
}
}
}

/**
* Execute a scheduled job, retrying and updating as necessary.
* @param scheduledJob - Scheduled job to execute
*/
async function executeScheduledJob(scheduledJob: ScheduledJob): Promise<void> {
const client = getClient();
try {
await jobHandlers[scheduledJob.is_executed_by__handler].callback(
scheduledJob.is_executed_with__parameter_set,
);

// Execution was a success so update the scheduled job as such.
await client.patch({
resource: 'scheduled_job',
id: scheduledJob.id,
body: {
status: 'success',
},
});
} catch (err: any) {
// Re-schedule if the retry limit has not been reached.
scheduledJob.error_count++;
if (scheduledJob.error_count < scheduledJob.retry_limit) {
await client.patch({
resource: 'scheduled_job',
id: scheduledJob.id,
body: {
error_count: scheduledJob.error_count,
last_error: err.message,
// TODO: Improve backoff time logic.
start_time: new Date(Date.now() + 10000 * scheduledJob.error_count),
},
});
} else {
// Execution failed so update the scheduled job as such.
await client.patch({
resource: 'scheduled_job',
id: scheduledJob.id,
body: {
status: 'failed',
error_count: scheduledJob.error_count,
last_error: err.message,
},
});
}
}
}

export const config = {
models: [
{
apiRoot,
modelText,
customServerCode: exports,
migrations: {},
},
] as sbvrUtils.ExecutableModel[],
};

export const setup = async (tx: Db.Tx) => {
// Validate and schedule new scheduled jobs for future execution.
addPureHook('POST', apiRoot, 'scheduled_job', {
POSTPARSE: async ({ request }) => {
// Set defaults.
request.values.status = 'pending';
request.values.retry_count = 0;
request.values.retry_limit ??= DEFAULT_RETRY_LIMIT;
request.values.error_count = 0;

// Validate the scheduled job.
validateScheduledJob(request.values);
},
POSTRUN: async ({ api, result }) => {
const scheduledJob = (await api.get({
resource: 'scheduled_job',
id: result,
})) as ScheduledJob;
nodeSchedule.scheduleJob(
`${scheduledJob.id}`,
scheduledJob.start_time,
async () => {
await executeScheduledJob(scheduledJob);
},
);
},
});

// Cancel scheduled jobs when they are deleted.
addPureHook('DELETE', apiRoot, 'scheduled_job', {
POSTRUN: async (args) => {
const affectedIds = await sbvrUtils.getAffectedIds(args);
for (const id of affectedIds) {
nodeSchedule.cancelJob(`${id}`);
}
},
});

// Find and re-schedule/execute pending scheduled jobs on startup.
const client = getClient();
const scheduledJobs = (await client.get({
resource: 'scheduled_job',
passthrough: {
req: permissions.root,
tx,
},
options: {
$filter: {
status: 'pending',
},
},
})) as ScheduledJob[];
const now = new Date();
for (const scheduledJob of scheduledJobs) {
if (scheduledJob.start_time.getTime() < now.getTime()) {
// Execute pending jobs that should have already been executed.
await executeScheduledJob(scheduledJob);
} else if (scheduledJob.start_time.getTime() > now.getTime()) {
// Re-schedule pending scheduled jobs that have not yet been executed.
nodeSchedule.scheduleJob(
scheduledJob.job_key,
scheduledJob.start_time,
async () => {
await executeScheduledJob(scheduledJob);
},
);
}
}
};

/**
* Register a new job handler.
* @param name - job handler unique name
* @param parameters - job handler parameters definition
* @param callback - job handler callback to execute
*
* @example
* addJobHandler('myJobHandler', {
* message: 'string',
* }, async ({ message }) => {
* console.log(message);
* });
*/
export const addJobHandler = (
name: string,
parameters: JobHandler['parameters'],
callback: JobHandler['callback'],
): void => {
if (jobHandlers[name] != null) {
throw new Error(`Job handler with name "${name}" already registered`);
}
jobHandlers[name] = {
parameters,
callback,
};
};
3 changes: 3 additions & 0 deletions src/sbvr-api/sbvr-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { generateODataMetadata } from '../odata-metadata/odata-metadata-generato

// tslint:disable-next-line:no-var-requires
const devModel = require('./dev.sbvr');
import * as jobs from './jobs';
import * as permissions from './permissions';
import {
BadRequestError,
Expand Down Expand Up @@ -1873,6 +1874,7 @@ export const executeStandardModels = async (tx: Db.Tx): Promise<void> => {
},
});
await executeModels(tx, permissions.config.models);
await executeModels(tx, jobs.config.models);
console.info('Successfully executed standard models.');
} catch (err: any) {
console.error('Failed to execute standard models.', err);
Expand All @@ -1889,6 +1891,7 @@ export const setup = async (
await db.transaction(async (tx) => {
await executeStandardModels(tx);
await permissions.setup();
await jobs.setup(tx);
});
} catch (err: any) {
console.error('Could not execute standard models', err);
Expand Down
1 change: 1 addition & 0 deletions src/server-glue/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export * as errors from '../sbvr-api/errors';
export * as env from '../config-loader/env';
export * as types from '../sbvr-api/common-types';
export * as hooks from '../sbvr-api/hooks';
export * as jobs from '../sbvr-api/jobs';
export type { configLoader as ConfigLoader };
export type { migratorUtils as Migrator };

Expand Down

0 comments on commit 153866d

Please sign in to comment.