diff --git a/src/sbvr-api/tasks.ts b/src/sbvr-api/tasks.ts index 8989816c6..839931e91 100644 --- a/src/sbvr-api/tasks.ts +++ b/src/sbvr-api/tasks.ts @@ -21,6 +21,8 @@ const handlers: { export const taskStatuses = ['pending', 'cancelled', 'success', 'failed']; export interface Task { id: number; + created_at: Date; + modified_at: Date; is_created_by__actor: number; is_executed_by__handler: string; is_executed_with__parameter_set: object | null; @@ -124,7 +126,7 @@ export const setup = async () => { } }; -// Register a new task handler +// Register a task handler export function addTaskHandler(name: string, fn: TaskHandler['fn']): void { handlers[name] = { name, @@ -166,7 +168,10 @@ function watch(): void { WHERE t."is executed by-handler" IN (${binds}) AND t."status" = 'pending' AND - (t."is scheduled to execute on-time" IS NULL OR t."is scheduled to execute on-time" <= NOW()) + ( + t."is scheduled to execute on-time" IS NULL OR + t."is scheduled to execute on-time" <= CURRENT_TIMESTAMP + INTERVAL '${Math.ceil(tasksEnv.queueIntervalMS / 1000)} second' + ) ORDER BY t."is scheduled to execute on-time" ASC, t."priority" DESC, @@ -194,32 +199,17 @@ async function execute( ): Promise { try { const handler = handlers[task.is_executed_by__handler]; + const startedOnTime = new Date(); if (handler == null) { - await client.patch({ - resource: 'task', - passthrough: { - tx, - }, - id: task.id, - body: { - started_on__time: new Date(), - ended_on__time: new Date(), - status: 'failed', - error_message: 'Matching task handler not found', - }, - }); + await finalizeTask( + client, + tx, + task.id, + startedOnTime, + 'failed', + 'Matching task handler not found', + ); return; - } else { - await client.patch({ - resource: 'task', - passthrough: { - tx, - }, - id: task.id, - body: { - started_on__time: new Date(), - }, - }); } const result = await handler.fn({ @@ -230,24 +220,42 @@ async function execute( tx, }); - await client.patch({ - resource: 'task', - passthrough: { - tx, - }, - id: task.id, - body: { - ended_on__time: new Date(), - status: - result.status != null && taskStatuses.includes(result.status) - ? result.status - : 'failed', - ...(result.error != null && { error_message: result.error }), - }, - }); + // Update the task with the result of the execution. + await finalizeTask( + client, + tx, + task.id, + startedOnTime, + result.status, + result.error, + ); } catch (err: any) { // This shouldn't normally happen, but if it does, we want to log it and kill the process. console.error('Task execution failed:', err); process.exit(1); } } + +// Finalize a task +async function finalizeTask( + client: PinejsClient, + tx: Tx, + id: number, + startedOnTime: Date, + status: string, + errorMessage?: string, +): Promise { + await client.patch({ + resource: 'task', + passthrough: { + tx, + }, + id, + body: { + started_on__time: startedOnTime, + ended_on__time: new Date(), + status, + ...(errorMessage != null && { error_message: errorMessage }), + }, + }); +} diff --git a/test/08-tasks.test.ts b/test/08-tasks.test.ts index 26c1c723a..cfec8d575 100644 --- a/test/08-tasks.test.ts +++ b/test/08-tasks.test.ts @@ -1,13 +1,18 @@ import { expect } from 'chai'; +import { strict } from 'node:assert'; import { randomUUID } from 'node:crypto'; import { setTimeout } from 'node:timers/promises'; import { PineTest } from 'pinejs-client-supertest'; import { testInit, testDeInit, testLocalServer } from './lib/test-init'; import { tasks as tasksEnv } from '../src/config-loader/env'; +import type { Task } from '../src/sbvr-api/tasks'; +import * as cronParser from 'cron-parser'; const configPath = __dirname + '/fixtures/08-tasks/config.js'; const taskHandlersPath = __dirname + '/fixtures/08-tasks/task-handlers.js'; +const actorId = 1; + // Wait for a condition to be true, or throw an error if it doesn't happen in time. async function waitFor(checkFn: () => Promise): Promise { const maxCount = 10; @@ -21,6 +26,45 @@ async function waitFor(checkFn: () => Promise): Promise { throw new Error('waitFor timed out'); } +// Create a task and return it +async function createTask( + pineTest: PineTest, + apikey: string, + task: Partial, +): Promise { + const { body: createdTask } = await pineTest + .post({ + resource: 'task', + body: { + key: randomUUID(), + is_executed_with__api_prefix: '/example/', + apikey, + ...task, + }, + }) + .expect(201); + return createdTask as Task; +} + +// Get a task, assert it has expected properties, and return it +async function expectTask( + pineTest: PineTest, + id: number, + expected: Partial, +): Promise { + const { body: updatedTask } = await pineTest + .get({ + resource: 'task', + id, + }) + .expect(200); + expect(updatedTask.is_created_by__actor).to.equal(actorId); + Object.entries(expected).forEach(([key, value]) => { + expect(updatedTask).to.have.property(key, value); + }); + return updatedTask; +} + describe('08 task tests', function () { let pineServer: Awaited>; let pineTest: PineTest; @@ -48,7 +92,7 @@ describe('08 task tests', function () { resource: 'api_key', body: { key: apikey, - is_of__actor: 1, + is_of__actor: actorId, permissions: [], }, }) @@ -60,26 +104,138 @@ describe('08 task tests', function () { }); describe('tasks', () => { - it('should execute with specified handler and parameters', async () => { - // Create a task to create a new device record in 500ms. - const name = randomUUID(); - const { body: task } = await pineTest - .post({ - resource: 'task', - body: { - key: randomUUID(), - is_executed_by__handler: 'create_device', - is_executed_with__parameter_set: { - name, - type: randomUUID(), + it('should execute tasks FIFO by default', async () => { + // This task should be executed first + const name1 = randomUUID(); + const task1 = await createTask(pineTest, apikey, { + is_executed_by__handler: 'create_device', + is_executed_with__parameter_set: { + name: name1, + type: randomUUID(), + }, + }); + + // This task should be executed second + const name2 = randomUUID(); + const task2 = await createTask(pineTest, apikey, { + is_executed_by__handler: 'create_device', + is_executed_with__parameter_set: { + name: name2, + type: randomUUID(), + }, + }); + + // Assert the device records were created in the expected order + await waitFor(async () => { + const { body } = await pineTest + .get({ + apiPrefix: 'example/', + resource: 'device', + options: { + $select: ['created_at', 'name'], + $filter: { + name: { + $in: [name1, name2], + }, + }, }, - is_executed_with__api_prefix: '/example/', - apikey, + }) + .expect(200); + const devices: Array<{ created_at: string; name: string }> = body; + const sorted = devices.sort((a, b) => + a.created_at.localeCompare(b.created_at), + ); + return ( + sorted.length === 2 && + sorted[0].name === name1 && + sorted[1].name === name2 + ); + }); + + // Assert the task records were updated as expected + for (const id of [task1.id, task2.id]) { + const task = await expectTask(pineTest, id, { + status: 'success', + error_message: null, + }); + + // Assert that timestamps are not null + strict(task.started_on__time, 'started_on__time is null'); + strict(task.ended_on__time, 'ended_on__time is null'); + + // Parse and check dates + expect(new Date(task.started_on__time)).to.be.instanceOf(Date); + expect(new Date(task.ended_on__time)).to.be.instanceOf(Date); + expect(new Date(task.ended_on__time).getTime()).to.be.greaterThan( + new Date(task.started_on__time).getTime(), + ); + } + }); + + it('should execute tasks with higher priority first', async () => { + // This task should be executed first + const name1 = randomUUID(); + const name2 = randomUUID(); + await Promise.all([ + createTask(pineTest, apikey, { + is_executed_by__handler: 'create_device', + is_executed_with__parameter_set: { + name: name1, + type: randomUUID(), }, - }) - .expect(201); + priority: 1, + }), + createTask(pineTest, apikey, { + is_executed_by__handler: 'create_device', + is_executed_with__parameter_set: { + name: name2, + type: randomUUID(), + }, + priority: 2, + }), + ]); - // Assert the expected record was created. + // Assert the device records were created in the expected order + await waitFor(async () => { + const { body } = await pineTest + .get({ + apiPrefix: 'example/', + resource: 'device', + options: { + $select: ['created_at', 'name'], + $filter: { + name: { + $in: [name1, name2], + }, + }, + }, + }) + .expect(200); + const devices: Array<{ created_at: string; name: string }> = body; + const sorted = devices.sort((a, b) => + a.created_at.localeCompare(b.created_at), + ); + return ( + sorted.length === 2 && + sorted[0].name === name2 && + sorted[1].name === name1 + ); + }); + }); + + it('should execute on specified future date', async () => { + // Create a task to create a new device record in 3s + const name = randomUUID(); + let task = await createTask(pineTest, apikey, { + is_executed_by__handler: 'create_device', + is_executed_with__parameter_set: { + name, + type: randomUUID(), + }, + is_scheduled_to_execute_on__time: new Date(Date.now() + 3000), + }); + + // Assert the task handler created the expected device record await waitFor(async () => { const { body: devices } = await pineTest .get({ @@ -95,30 +251,109 @@ describe('08 task tests', function () { return devices.length === 1; }); - // Assert the task record was updated as expected. - const { body: updatedTask } = await pineTest + // Assert the task record was updated as expected + task = await expectTask(pineTest, task.id, { + status: 'success', + error_message: null, + }); + + // Assert that timestamps are not null + strict(task.started_on__time, 'started_on__time is null'); + strict(task.ended_on__time, 'ended_on__time is null'); + strict( + task.is_scheduled_to_execute_on__time, + 'is_scheduled_to_execute_on__time is null', + ); + + // Parse dates once and reuse them + const started = new Date(task.started_on__time).getTime(); + const ended = new Date(task.ended_on__time).getTime(); + const scheduled = new Date( + task.is_scheduled_to_execute_on__time, + ).getTime(); + const created = new Date(task.created_at).getTime(); + + // Check if end time is greater than start time + expect(ended).to.be.greaterThan(started); + + // Check if the task was created before it was started + expect(created).to.be.lessThan(started); + + // Calculate the earliest and latest start times based on queue interval + const earliest = scheduled - tasksEnv.queueIntervalMS; + const latest = scheduled + tasksEnv.queueIntervalMS; + + // Check if the start time was within the expected range + expect(started) + .to.be.greaterThanOrEqual(earliest) + .and.lessThanOrEqual(latest); + }); + + it('should not immediately execute tasks scheduled to execute in the future', async () => { + // Create a task to create a new device record in 3s + const name = randomUUID(); + const task = await createTask(pineTest, apikey, { + is_executed_by__handler: 'create_device', + is_executed_with__parameter_set: { + name, + type: randomUUID(), + }, + is_scheduled_to_execute_on__time: new Date(Date.now() + 30000), + }); + + // Wait a few seconds to ensure the task is not executed immediately + await setTimeout(3000); + + // Assert the task record was not updated + await expectTask(pineTest, task.id, { + status: 'pending', + error_message: null, + started_on__time: null, + ended_on__time: null, + }); + + // Assert the scheduled device creation has not yet occurred + const { body: devices } = await pineTest .get({ - resource: 'task', - id: task.id, + apiPrefix: 'example/', + resource: 'device', options: { - $select: [ - 'started_on__time', - 'status', - 'ended_on__time', - 'error_message', - 'is_created_by__actor', - ], + $filter: { + name, + }, }, }) .expect(200); - expect(updatedTask.status).to.equal('success'); - expect(updatedTask.error_message).to.be.null; - expect(updatedTask.is_created_by__actor).to.equal(1); - expect(new Date(updatedTask.started_on__time)).to.be.instanceOf(Date); - expect(new Date(updatedTask.ended_on__time)).to.be.instanceOf(Date); - expect(new Date(updatedTask.ended_on__time).getTime()).to.be.greaterThan( - new Date(updatedTask.started_on__time).getTime(), + expect(devices.length).to.equal(0); + }); + + it('should set scheduled execution time when cron expression is provided', async () => { + // Create a task to create a new device record in 3s + const cron = '0 0 2,8,12,14 * * *'; + const expectedSchedule = cronParser.parseExpression(cron).next().toDate(); + const task = await createTask(pineTest, apikey, { + is_executed_by__handler: 'create_device', + is_executed_with__parameter_set: { + name: randomUUID(), + type: randomUUID(), + }, + is_scheduled_with__cron_expression: cron, + }); + + // Assert schedule properties are not null + strict( + task.is_scheduled_with__cron_expression, + 'is_scheduled_with__cron_expression is null', ); + strict( + task.is_scheduled_to_execute_on__time, + 'is_scheduled_to_execute_on__time is null', + ); + + // Check the calculated scheduled time matches the expected time + expect( + new Date(task.is_scheduled_to_execute_on__time).getTime(), + ).to.equal(expectedSchedule.getTime()); }); }); });