Skip to content

Commit

Permalink
model work
Browse files Browse the repository at this point in the history
  • Loading branch information
joshbwlng committed Nov 10, 2023
1 parent b410338 commit 6b3bcec
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 52 deletions.
69 changes: 51 additions & 18 deletions src/sbvr-api/tasks.sbvr
Original file line number Diff line number Diff line change
@@ -1,41 +1,74 @@
Vocabulary: tasks

Term: key
Term: actor
Concept Type: Integer (Type)
Term: cron expression
Concept Type: Short Text (Type)
Term: handler
Term: end time
Concept Type: Date Time (Type)
Term: error
Concept Type: Short Text (Type)
Term: actor
Term: error count
Concept Type: Integer (Type)
Term: handler
Concept Type: Short Text (Type)
Term: key
Concept Type: Short Text (Type)
Term: last error
Concept Type: Short Text (Type)
Term: last run time
Concept Type: Date Time (Type)
Term: model name
Concept Type: Short Text (Type)
Term: parameter set
Concept Type: JSON (Type)
Term: start time
Concept Type: Date Time (Type)
Term: retry limit
Concept Type: Integer (Type)
Term: error count
Term: run count
Concept Type: Integer (Type)
Term: last error
Concept Type: Short Text (Type)
Term: start time
Concept Type: Date Time (Type)
Term: status
Concept Type: Short Text (Type)

Term: task
Fact Type: task has key
Necessity: each task has exactly one key
Fact Type: task is executed by actor
Necessity: each task is executed by exactly one actor
Fact Type: task has error count
Necessity: each task has exactly one error count
Fact Type: task is executed by handler
Necessity: each task is executed by exactly one handler
Fact Type: task is executed with parameter set
Necessity: each task is executed with exactly one parameter set
Fact Type: task has start time
Necessity: each task has at most one start time
Fact Type: task has retry limit
Necessity: each task has exactly one retry limit
Fact Type: task has error count
Necessity: each task has at most one error count
Fact Type: task is for model name
Necessity: each task is for exactly one model name
Fact Type: task is scheduled by actor
Necessity: each task is scheduled by exactly one actor
Fact Type: task is scheduled with cron expression
Necessity: each task is scheduled with at most one cron expression
Fact Type: task has key
Necessity: each task has exactly one key
Fact Type: task has last error
Necessity: each task has at most one last error
Fact Type: task has retry limit
Necessity: each task has exactly one retry limit
Fact Type: task has run count
Necessity: each task has exactly one run count
Fact Type: task has start time
Necessity: each task has exactly one start time
Fact Type: task has status
Necessity: each task has exactly one status
Definition: "pending" or "success" or "failed"

Term: task run
Fact Type: task run has error
Necessity: each task run has at most one error
Fact Type: task run is for task
Synonymous Form: task was executed by task run
Necessity: each task run is for exactly one task
Reference Type: informative
Fact Type: task run has start time
Necessity: each task run has exactly one start time
Fact Type: task run has end time
Necessity: each task run has at most one end time
Fact Type: task run has status
Necessity: each task run has exactly one status
Definition: "running" or "success" or "failed"
118 changes: 84 additions & 34 deletions src/sbvr-api/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,43 @@ const DEFAULT_RETRY_LIMIT = 10;
// eslint-disable-next-line @typescript-eslint/no-var-requires
const modelText: string = require(`./${apiRoot}.sbvr`);

// possible queue solution (wip)
// keep a list of next 10 upcoming jobs in memory
// - list is an in-memory array
// - list is ordered by order of execution (scheduled date)
// - list is updated every minute by rechecking tasks in db that are scheduled for execution within the next 10 minutes
// - execute within a tx
// - when executing a task, select task record with a lock
// - only execute one task at a time per instance (allow for concurrency option in the future?)
// - could always use `graphile-worker` instead of creating a custom solution (but that would only work for postgres)

interface Task {
id: number;
key: string;
error_count: number;
is_executed_by__handler: string;
is_executed_with__parameter_set: any;
is_for__model_name: string;
is_scheduled_by__actor: string;
is_scheduled_with__cron_expression?: string;
start_time: Date;
retry_limit: number;
error_count: number;
key: string;
last_error?: string;
retry_limit: number;
run_count: number;
status: string;
}

interface TaskRun {
id: number;
end_time: Date;
error?: string;
is_for__task: {
__id: number;
};
start_time: Date;
status: string;
}

interface TaskHandler {
parameters: {
[key: string]: string;
Expand Down Expand Up @@ -115,45 +141,67 @@ function validate(values: AnyObject) {
*/
async function execute(task: Task): Promise<void> {
const client = getClient();

// Create task run record.
const taskRun = (await client.post({
resource: 'task_run',
body: {
is_for__task: task.id,
start_time: new Date(),
status: 'running',
},
})) as TaskRun;

try {
await taskHandlers[task.is_executed_by__handler].callback(
task.is_executed_with__parameter_set,
);

// Execution was a success so update the task as such.
await client.patch({
resource: 'task',
id: task.id,
body: {
status: 'success',
},
});
} catch (err: any) {
// Re-schedule if the retry limit has not been reached.
task.error_count++;
if (task.error_count < task.retry_limit) {
await client.patch({
resource: 'task',
id: task.id,
// Execution was a success so update the task and task_run as such.
await Promise.all([
client.patch({
resource: 'task_run',
id: taskRun.id,
body: {
error_count: task.error_count,
last_error: err.message,
// TODO: Improve backoff time logic.
start_time: new Date(Date.now() + 10000 * task.error_count),
end_time: new Date(),
status: 'success',
},
});
} else {
// Execution failed so update the task as such.
await client.patch({
}),
client.patch({
resource: 'task',
id: task.id,
body: {
status: 'failed',
error_count: task.error_count,
last_error: err.message,
status: 'success',
},
});
}),
]);
} catch (err: any) {
await client.patch({
resource: 'task_run',
id: taskRun.id,
body: {
end_time: new Date(),
status: 'failed',
error: err.message,
},
});

// Re-schedule if the retry limit has not been reached.
const body: AnyObject = {
error_count: task.error_count + 1,
last_error: err.message,
};
if (body.error_count < task.retry_limit) {
// TODO: Improve backoff time logic.
body.start_time = new Date(Date.now() + 10000 * task.error_count);
} else {
body.status = 'failed';
}
await client.patch({
resource: 'task',
id: task.id,
body,
});
}
}

Expand All @@ -173,13 +221,14 @@ export const setup = async (tx: Db.Tx) => {
addPureHook('POST', apiRoot, 'task', {
POSTPARSE: async ({ req, request }) => {
// Set the actor.
request.values.is_executed_by__actor = getActor(req);
request.values.is_scheduled_by__actor = getActor(req);

// Set defaults.
request.values.status = 'pending';
request.values.retry_count = 0;
request.values.retry_limit ??= DEFAULT_RETRY_LIMIT;
request.values.error_count = 0;
request.values.run_count = 0;

// Validate the task.
validate(request.values);
Expand Down Expand Up @@ -219,12 +268,13 @@ export const setup = async (tx: Db.Tx) => {
},
},
})) as Task[];
const now = new Date();
const now = new Date().getTime();
for (const task of tasks) {
if (task.start_time.getTime() < now.getTime()) {
const startTime = task.start_time.getTime();
if (startTime < now) {
// Execute pending tasks that should have already been executed.
await execute(task);
} else if (task.start_time.getTime() > now.getTime()) {
} else if (startTime > now) {
// Re-schedule pending tasks that have not yet been executed.
nodeSchedule.scheduleJob(task.key, task.start_time, async () => {
await execute(task);
Expand Down
2 changes: 2 additions & 0 deletions test/07-tasks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe('07 task tests', function () {
before(async () => {
pineServer = await testInit({
configPath,
deleteDb: true,
taskHandlersPath,
});
pineTest = new PineTest({}, { app: testLocalServer });
Expand Down Expand Up @@ -59,6 +60,7 @@ describe('07 task tests', function () {
apiPrefix: 'tasks/',
resource: 'task',
body: {
is_for__model_name: 'example',
key: 'foobar-1',
is_executed_by__handler: 'foobar',
is_executed_with__parameter_set: {
Expand Down

0 comments on commit 6b3bcec

Please sign in to comment.