Skip to content

Commit

Permalink
Add async tasks
Browse files Browse the repository at this point in the history
Change-type: minor
  • Loading branch information
joshbwlng committed Feb 6, 2024
1 parent 434f0a7 commit 25f731f
Show file tree
Hide file tree
Showing 34 changed files with 537 additions and 27 deletions.
7 changes: 6 additions & 1 deletion src/config-loader/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export const cache = {
apiKeyActorId: false as CacheOpts,
};

import { boolVar } from '@balena/env-parsing';
import { boolVar, intVar } from '@balena/env-parsing';
import memoize from 'memoizee';
import memoizeWeak = require('memoizee/weak');
export const createCache = <T extends (...args: any[]) => any>(
Expand Down Expand Up @@ -146,3 +146,8 @@ export const migrator = {
*/
asyncMigrationIsEnabled: boolVar('PINEJS_ASYNC_MIGRATION_ENABLED', true),
};

export const tasks = {
queueConcurrency: intVar('PINEJS_QUEUE_CONCURRENCY', 0),
queueIntervalMS: intVar('PINEJS_QUEUE_INTERVAL_MS', 1000),
};
4 changes: 4 additions & 0 deletions src/sbvr-api/sbvr-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import { generateODataMetadata } from '../odata-metadata/odata-metadata-generato

// eslint-disable-next-line @typescript-eslint/no-var-requires
const devModel = require('./dev.sbvr');
import * as tasks from './tasks';
import * as permissions from './permissions';
import {
BadRequestError,
Expand Down Expand Up @@ -77,6 +78,7 @@ export {
addPureHook,
addSideEffectHook,
} from './hooks';
export { addTaskHandler } from './tasks';

import memoizeWeak = require('memoizee/weak');
import * as controlFlow from './control-flow';
Expand Down Expand Up @@ -1953,6 +1955,7 @@ export const executeStandardModels = async (tx: Db.Tx): Promise<void> => {
},
});
await executeModels(tx, permissions.config.models);
await executeModels(tx, tasks.config.models);
console.info('Successfully executed standard models.');
} catch (err: any) {
console.error('Failed to execute standard models.', err);
Expand All @@ -1969,6 +1972,7 @@ export const setup = async (
await db.transaction(async (tx) => {
await executeStandardModels(tx);
await permissions.setup();
await tasks.setup();
});
} catch (err: any) {
console.error('Could not execute standard models', err);
Expand Down
47 changes: 47 additions & 0 deletions src/sbvr-api/tasks.sbvr
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
Vocabulary: tasks

Term: actor
Concept Type: Integer (Type)
Term: cron expression
Concept Type: Short Text (Type)
Term: error message
Concept Type: Short Text (Type)
Term: handler
Concept Type: Short Text (Type)
Term: key
Concept Type: Short Text (Type)
Term: parameter set
Concept Type: JSON (Type)
Term: priority
Concept Type: Integer (Type)
Term: status
Concept Type: Short Text (Type)
Term: time
Concept Type: Date Time (Type)

Term: task
Fact type: task has key
Necessity: each task has at most one key
Fact type: task is created by actor
Necessity: each task is created by exactly one actor
Fact type: task is executed by handler
Necessity: each task is executed by exactly one handler
Fact type: task is executed with parameter set
Necessity: each task is executed with at most one parameter set
Fact type: task has priority
Necessity: each task has exactly one priority
Necessity: each task has a priority that is greater than or equal to 0
Fact type: task is scheduled with cron expression
Necessity: each task is scheduled with at most one cron expression
Fact type: task is scheduled to execute on time
Necessity: each task is scheduled to execute on at most one time
Fact type: task has status
Necessity: each task has exactly one status
Definition: "pending" or "cancelled" or "success" or "failed"
Fact type: task started on time
Necessity: each task started on at most one time
Fact type: task ended on time
Necessity: each task ended on at most one time
Fact type: task has error message
Necessity: each task has at most one error message

220 changes: 220 additions & 0 deletions src/sbvr-api/tasks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import type { Tx } from '../database-layer/db';
import { BadRequestError } from './errors';
import type { HookReq } from './hooks';
import { PinejsClient } from './sbvr-utils';
import type { ExecutableModel, LoggingClient } from './sbvr-utils';
import { tasks as tasksEnv } from '../config-loader/env';
import type { AnyObject } from 'pinejs-client-core';
import { addPureHook } from './hooks';
import { sbvrUtils } from '../server-glue/module';

const apiRoot = 'tasks';

// eslint-disable-next-line @typescript-eslint/no-var-requires
const modelText: string = require(`./${apiRoot}.sbvr`);

const taskStatuses = ['pending', 'cancelled', 'success', 'failed'];
export interface Task {
id: number;
is_created_by__actor: number;
is_executed_by__handler: string;
is_executed_with__parameter_set: object | null;
is_scheduled_with__cron_expression: string | null;
is_scheduled_to_execute_on__time: Date | null;
priority: number;
status: (typeof taskStatuses)[number];
started_on__time: Date | null;
ended_on__time: Date | null;
error_message: string | null;
}

interface TaskArgs {
tx: Tx;
params?: AnyObject;
}

type TaskResponse = Promise<{
status: (typeof taskStatuses)[number];
error?: string;
}>;

export interface TaskHandler {
name: string;
fn: (options: TaskArgs) => TaskResponse;
}

const taskHandlers: {
[name: string]: TaskHandler;
} = {};

// Get and return actor from hook request object.
function getActor(req: HookReq): number {
const actor = req.user?.actor ?? req.apiKey?.actor;
if (actor == null) {
throw new BadRequestError(
'Scheduling task with missing actor on req is not allowed',
);
}
return actor;
}

// Create and return client for internal use.
function getClient(tx: Tx): PinejsClient {
return new PinejsClient({
apiPrefix: `/${apiRoot}/`,
passthrough: {
tx,
},
}) as LoggingClient;
}

// Validate a task.
function validate(values: AnyObject) {
// Assert that the provided start time is at least a minute in the future.
if (values.start_time == null) {
throw new BadRequestError('Must specify a start time for the task');
}
const now = new Date(new Date().getTime() + tasksEnv.queueIntervalMS);
const startTime = new Date(values.start_time);
if (startTime < now) {
throw new BadRequestError(
`Task start time must be greater than ${tasksEnv.queueIntervalMS} milliseconds in the future`,
);
}

// Assert that the requested handler exists.
if (values.is_executed_by__handler == null) {
throw new BadRequestError(`Must specify a task handler to execute`);
}
if (taskHandlers[values.is_executed_by__handler] == null) {
throw new BadRequestError(
`No task handler with name ${values.is_executed_by__handler} registered`,
);
}
}

export const config = {
models: [
{
apiRoot,
modelText,
customServerCode: exports,
migrations: {},
},
] as ExecutableModel[],
};

export const setup = async () => {
addPureHook('POST', apiRoot, 'task', {
POSTPARSE: async ({ req, request }) => {
// Set the actor.
request.values.is_created_by__actor = getActor(req);

// Set defaults.
request.values.status = 'pending';
request.values.priority ??= 1;

// Validate the task.
validate(request.values);
},
});
};

// Register a new task handler
export async function addTaskHandler(
name: string,
fn: TaskHandler['fn'],
): Promise<void> {
if (taskHandlers[name] != null) {
throw new Error(`Task handler with name "${name}" already registered`);
}
taskHandlers[name] = {
name,
fn,
};
await runQueue(name);
}

// Execute a task.
async function execute(task: Task, tx: Tx): Promise<void> {
try {
const client = getClient(tx);

await client.patch({
resource: 'task',
id: task.id,
body: {
started_on__time: new Date(),
},
});

// TODO: Pass client as `api` to task handler function with client-defined root (eg 'resin' or 'example', not 'tasks')
const result = await taskHandlers[task.is_executed_by__handler].fn({
tx,
params: task.is_executed_with__parameter_set ?? undefined,
});

await client.patch({
resource: 'task',
id: task.id,
body: {
ended_on__time: new Date(),
status:
result.status != null && taskStatuses.includes(result.status)
? result.status
: 'failed',
...(result.error != null && { error_message: result.error }),
},
});
} catch (err: any) {
// This shouldn't normally happen, but if it does, we want to log it and kill the process.
console.error('Task execution failed:', err);
process.exit(1);
}
}

// Create a queue to process tasks for a task handler
async function runQueue(name: string): Promise<void> {
if (tasksEnv.queueConcurrency < 1) {
return;
}

// TODO: Add NOTIFY/LISTEN support
let inFlight = 0;
setInterval(async () => {
if (inFlight >= tasksEnv.queueConcurrency) {
return;
}

await sbvrUtils.db.transaction(async (tx) => {
const result = await tx.executeSql(
`
SELECT
t."id",
t."is executed by-handler" AS is_executed_by__handler,
t."is executed with-parameter set" AS is_executed_with__parameter_set,
t."is scheduled with-cron expression" AS is_scheduled_with__cron_expression
FROM
task AS t
WHERE
t."is executed by-handler" = $1 AND
t."status" = 'pending' AND
(t."is scheduled to execute on-time" IS NULL OR t."is scheduled to execute on-time" <= NOW())
ORDER BY
t."is scheduled to execute on-time" ASC,
t."priority" DESC,
t."id" ASC
LIMIT 1
FOR UPDATE
SKIP LOCKED
`,
[name],
);
if (result.rows.length > 0) {
inFlight++;
await execute(result.rows[0] as Task, tx);
inFlight--;
}
});
}, tasksEnv.queueIntervalMS);
}
1 change: 1 addition & 0 deletions src/server-glue/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * as errors from '../sbvr-api/errors';
export * as env from '../config-loader/env';
export * as types from '../sbvr-api/common-types';
export * as hooks from '../sbvr-api/hooks';
export * as tasks from '../sbvr-api/tasks';
export * as webResourceHandler from '../webresource-handler';
export type { configLoader as ConfigLoader };
export type { migratorUtils as Migrator };
Expand Down
2 changes: 1 addition & 1 deletion test/02-sync-migrator.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import supertest from 'supertest';
import { ChildProcess } from 'child_process';
import type { ChildProcess } from 'child_process';
import { expect } from 'chai';
import { testInit, testDeInit, testLocalServer } from './lib/test-init';

Expand Down
2 changes: 1 addition & 1 deletion test/03-async-migrator.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import supertest from 'supertest';
import { ChildProcess } from 'child_process';
import type { ChildProcess } from 'child_process';
import { assert, expect } from 'chai';
import { setTimeout } from 'timers';
import { dbModule } from '../src/server-glue/module';
Expand Down
Loading

0 comments on commit 25f731f

Please sign in to comment.