-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Change-type: minor
- Loading branch information
Showing
5 changed files
with
297 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
Vocabulary: jobs | ||
|
||
Term: job key | ||
Concept Type: Short Text (Type) | ||
Term: handler | ||
Concept Type: Short Text (Type) | ||
Term: parameter set | ||
Concept Type: JSON (Type) | ||
Term: start time | ||
Concept Type: Date Time (Type) | ||
Term: retry limit | ||
Concept Type: Integer (Type) | ||
Term: error count | ||
Concept Type: Integer (Type) | ||
Term: last error | ||
Concept Type: Short Text (Type) | ||
Term: status | ||
Concept Type: Short Text (Type) | ||
|
||
Term: scheduled job | ||
Fact Type: scheduled job has job key | ||
Necessity: each scheduled job has exactly one job key | ||
Fact Type: scheduled job is executed by handler | ||
Necessity: each scheduled job is executed by exactly one handler | ||
Fact Type: scheduled job is executed with parameter set | ||
Necessity: each scheduled job is executed with exactly one parameter set | ||
Fact Type: scheduled job has start time | ||
Necessity: each scheduled job has at most one start time | ||
Fact Type: scheduled job has retry limit | ||
Necessity: each scheduled job has exactly one retry limit | ||
Fact Type: scheduled job has error count | ||
Necessity: each scheduled job has at most one error count | ||
Fact Type: scheduled job has last error | ||
Necessity: each scheduled job has at most one last error | ||
Fact Type: scheduled job has status | ||
Necessity: each scheduled job has exactly one status | ||
Definition: "pending" or "success" or "failed" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,254 @@ | ||
import * as nodeSchedule from 'node-schedule'; | ||
import { AnyObject } from 'pinejs-client-core'; | ||
|
||
import * as errors from './errors'; | ||
import { addPureHook } from './hooks'; | ||
import * as sbvrUtils from './sbvr-utils'; | ||
import type * as Db from '../database-layer/db'; | ||
import { permissions } from '../server-glue/module'; | ||
|
||
const apiRoot = 'jobs'; | ||
const DEFAULT_RETRY_LIMIT = 10; | ||
|
||
// tslint:disable-next-line:no-var-requires | ||
const modelText: string = require(`./${apiRoot}.sbvr`); | ||
|
||
interface ScheduledJob { | ||
id: number; | ||
job_key: string; | ||
is_executed_by__handler: string; | ||
is_executed_with__parameter_set: any; | ||
start_time: Date; | ||
retry_limit: number; | ||
error_count: number; | ||
last_error?: string; | ||
status: string; | ||
} | ||
interface JobHandler { | ||
parameters: { | ||
[key: string]: string; | ||
}; | ||
callback: (parameterSet: any) => Promise<void>; | ||
} | ||
|
||
// TODO: Need to scope job handlers to model? | ||
const jobHandlers: { | ||
[name: string]: JobHandler; | ||
} = {}; | ||
|
||
/** | ||
* Create and return a /jobs client for internal use. | ||
* @returns A /jobs pine client | ||
*/ | ||
function getClient(): sbvrUtils.PinejsClient { | ||
return new sbvrUtils.PinejsClient(`/${apiRoot}/`) as sbvrUtils.LoggingClient; | ||
} | ||
|
||
/** | ||
* Validates a scheduled job. | ||
* @param values - Request values to validate | ||
*/ | ||
function validateScheduledJob(values: AnyObject) { | ||
// Assert that the provided date is in the future. | ||
if (values.start_time == null) { | ||
throw new errors.BadRequestError( | ||
'Must specify a start time for the scheduled job', | ||
); | ||
} | ||
if (new Date(values.start_time).getTime() <= new Date().getTime()) { | ||
throw new errors.BadRequestError( | ||
'Scheduled job start time must be in the future', | ||
); | ||
} | ||
|
||
// Assert that the requested handler exists. | ||
if (values.is_executed_by__handler == null) { | ||
throw new errors.BadRequestError(`Must specify a job handler to execute`); | ||
} | ||
if (jobHandlers[values.is_executed_by__handler] == null) { | ||
throw new errors.BadRequestError( | ||
`No job handler with name ${values.is_executed_by__handler} registered`, | ||
); | ||
} | ||
|
||
// Assert that the requested parameters match the handler. | ||
const handler = jobHandlers[values.is_executed_by__handler]; | ||
const parameterSet = values.is_executed_with__parameter_set; | ||
if (handler.parameters != null && parameterSet == null) { | ||
throw new errors.BadRequestError( | ||
`Must specify parameters to execute job handler "${values.is_executed_by__handler}"`, | ||
); | ||
} | ||
if (parameterSet != null) { | ||
for (const parameterName of Object.keys(parameterSet)) { | ||
if (handler.parameters[parameterName] == null) { | ||
throw new errors.BadRequestError( | ||
`Job handler "${values.is_executed_by__handler}" does not accept parameter "${parameterName}"`, | ||
); | ||
} | ||
if ( | ||
typeof parameterSet[parameterName] !== handler.parameters[parameterName] | ||
) { | ||
throw new errors.BadRequestError( | ||
`Job handler "${values.is_executed_by__handler}" parameter "${parameterName}" must be of type "${handler.parameters[parameterName]}"`, | ||
); | ||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Execute a scheduled job, retrying and updating as necessary. | ||
* @param scheduledJob - Scheduled job to execute | ||
*/ | ||
async function executeScheduledJob(scheduledJob: ScheduledJob): Promise<void> { | ||
const client = getClient(); | ||
try { | ||
await jobHandlers[scheduledJob.is_executed_by__handler].callback( | ||
scheduledJob.is_executed_with__parameter_set, | ||
); | ||
|
||
// Execution was a success so update the scheduled job as such. | ||
await client.patch({ | ||
resource: 'scheduled_job', | ||
id: scheduledJob.id, | ||
body: { | ||
status: 'success', | ||
}, | ||
}); | ||
} catch (err: any) { | ||
// Re-schedule if the retry limit has not been reached. | ||
scheduledJob.error_count++; | ||
if (scheduledJob.error_count < scheduledJob.retry_limit) { | ||
await client.patch({ | ||
resource: 'scheduled_job', | ||
id: scheduledJob.id, | ||
body: { | ||
error_count: scheduledJob.error_count, | ||
last_error: err.message, | ||
// TODO: Improve backoff time logic. | ||
start_time: new Date(Date.now() + 10000 * scheduledJob.error_count), | ||
}, | ||
}); | ||
} else { | ||
// Execution failed so update the scheduled job as such. | ||
await client.patch({ | ||
resource: 'scheduled_job', | ||
id: scheduledJob.id, | ||
body: { | ||
status: 'failed', | ||
error_count: scheduledJob.error_count, | ||
last_error: err.message, | ||
}, | ||
}); | ||
} | ||
} | ||
} | ||
|
||
export const config = { | ||
models: [ | ||
{ | ||
apiRoot, | ||
modelText, | ||
customServerCode: exports, | ||
migrations: {}, | ||
}, | ||
] as sbvrUtils.ExecutableModel[], | ||
}; | ||
|
||
export const setup = async (tx: Db.Tx) => { | ||
// Validate and schedule new scheduled jobs for future execution. | ||
addPureHook('POST', apiRoot, 'scheduled_job', { | ||
POSTPARSE: async ({ request }) => { | ||
// Set defaults. | ||
request.values.status = 'pending'; | ||
request.values.retry_count = 0; | ||
request.values.retry_limit ??= DEFAULT_RETRY_LIMIT; | ||
request.values.error_count = 0; | ||
|
||
// Validate the scheduled job. | ||
validateScheduledJob(request.values); | ||
}, | ||
POSTRUN: async ({ api, result }) => { | ||
const scheduledJob = (await api.get({ | ||
resource: 'scheduled_job', | ||
id: result, | ||
})) as ScheduledJob; | ||
nodeSchedule.scheduleJob( | ||
`${scheduledJob.id}`, | ||
scheduledJob.start_time, | ||
async () => { | ||
await executeScheduledJob(scheduledJob); | ||
}, | ||
); | ||
}, | ||
}); | ||
|
||
// Cancel scheduled jobs when they are deleted. | ||
addPureHook('DELETE', apiRoot, 'scheduled_job', { | ||
POSTRUN: async (args) => { | ||
const affectedIds = await sbvrUtils.getAffectedIds(args); | ||
for (const id of affectedIds) { | ||
nodeSchedule.cancelJob(`${id}`); | ||
} | ||
}, | ||
}); | ||
|
||
// Find and re-schedule/execute pending scheduled jobs on startup. | ||
const client = getClient(); | ||
const scheduledJobs = (await client.get({ | ||
resource: 'scheduled_job', | ||
passthrough: { | ||
req: permissions.root, | ||
tx, | ||
}, | ||
options: { | ||
$filter: { | ||
status: 'pending', | ||
}, | ||
}, | ||
})) as ScheduledJob[]; | ||
const now = new Date(); | ||
for (const scheduledJob of scheduledJobs) { | ||
if (scheduledJob.start_time.getTime() < now.getTime()) { | ||
// Execute pending jobs that should have already been executed. | ||
await executeScheduledJob(scheduledJob); | ||
} else if (scheduledJob.start_time.getTime() > now.getTime()) { | ||
// Re-schedule pending scheduled jobs that have not yet been executed. | ||
nodeSchedule.scheduleJob( | ||
scheduledJob.job_key, | ||
scheduledJob.start_time, | ||
async () => { | ||
await executeScheduledJob(scheduledJob); | ||
}, | ||
); | ||
} | ||
} | ||
}; | ||
|
||
/** | ||
* Register a new job handler. | ||
* @param name - job handler unique name | ||
* @param parameters - job handler parameters definition | ||
* @param callback - job handler callback to execute | ||
* | ||
* @example | ||
* addJobHandler('myJobHandler', { | ||
* message: 'string', | ||
* }, async ({ message }) => { | ||
* console.log(message); | ||
* }); | ||
*/ | ||
export const addJobHandler = ( | ||
name: string, | ||
parameters: JobHandler['parameters'], | ||
callback: JobHandler['callback'], | ||
): void => { | ||
if (jobHandlers[name] != null) { | ||
throw new Error(`Job handler with name "${name}" already registered`); | ||
} | ||
jobHandlers[name] = { | ||
parameters, | ||
callback, | ||
}; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters