Skip to content

Commit

Permalink
Validate task instance during task claiming
Browse files Browse the repository at this point in the history
  • Loading branch information
JiaweiWu committed Jan 20, 2025
1 parent 1d90b39 commit ef97e4e
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 3 deletions.
94 changes: 94 additions & 0 deletions x-pack/platform/plugins/shared/task_manager/server/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import { ObjectType, schema, TypeOf } from '@kbn/config-schema';
import { isNumber } from 'lodash';
import moment from 'moment';
import { isErr, tryAsResult } from './lib/result_type';
import { Interval, isInterval, parseIntervalAsMillisecond } from './lib/intervals';
import { DecoratedError } from './task_running';
Expand Down Expand Up @@ -239,6 +240,99 @@ export enum TaskLifecycleResult {
}

export type TaskLifecycle = TaskStatus | TaskLifecycleResult;

/**
 * Custom validator for date-like schema fields.
 *
 * @param date - the value to check; it is handed to `moment()` as-is
 * @param field - name of the field being validated, used in the error message
 * @returns an error message when moment reports the value as invalid,
 *          otherwise `undefined`
 */
const validateDate = (date: Date, field: string): string | undefined =>
  moment(date).isValid() ? undefined : `Invalid ${field} date "${date}".`;

/**
 * Runtime schema describing a task instance.
 *
 * Date-like fields (`scheduledAt`, `startedAt`, `retryAt`, `runAt`) are declared
 * with `schema.any` plus a custom validator, so any value that `validateDate`
 * accepts passes; `startedAt` and `retryAt` may additionally be `null`.
 *
 * `params` and `state` are required string-keyed records; every other field
 * except `taskType` is optional.
 */
export const taskInstanceSchema = schema.object({
  id: schema.maybe(schema.string()),
  taskType: schema.string(),
  scheduledAt: schema.maybe(
    schema.any({
      validate: (scheduledAt) => validateDate(scheduledAt, 'scheduledAt'),
    })
  ),
  startedAt: schema.maybe(
    schema.nullable(
      schema.any({
        validate: (startedAt) => validateDate(startedAt, 'startedAt'),
      })
    )
  ),
  retryAt: schema.maybe(
    schema.nullable(
      schema.any({
        validate: (retryAt) => validateDate(retryAt, 'retryAt'),
      })
    )
  ),
  runAt: schema.maybe(
    schema.any({
      validate: (runAt) => validateDate(runAt, 'runAt'),
    })
  ),
  schedule: schema.maybe(
    schema.object({
      // The interval string must satisfy `isInterval`; otherwise the message
      // below is returned as the validation error.
      interval: schema.string({
        validate: (interval) => {
          if (!isInterval(interval)) {
            return `Invalid schedule interval "${interval}": interval must be of the form "{number}{cadance}" where number is an integer. Example: 5m.`;
          }
        },
      }),
    })
  ),
  params: schema.recordOf(schema.string(), schema.any()),
  state: schema.recordOf(schema.string(), schema.any()),
  stateVersion: schema.maybe(schema.number()),
  traceparent: schema.maybe(schema.string()),
  user: schema.maybe(schema.string()),
  scope: schema.maybe(schema.arrayOf(schema.string())),
  // Key was previously misspelled as `ownderId`, so the real `ownerId` field
  // was not declared on this schema at all.
  ownerId: schema.maybe(schema.nullable(schema.string())),
  enabled: schema.maybe(schema.boolean()),
  timeoutOverride: schema.maybe(schema.string()),
  partition: schema.maybe(schema.number()),
});

/**
 * Builds a required date-like field: any value is accepted as long as
 * `validateDate` does not return an error message for it.
 */
const validatedDateField = (field: string) =>
  schema.any({
    validate: (value) => validateDate(value, field),
  });

/**
 * Runtime schema for a concrete task instance.
 *
 * Extends `taskInstanceSchema`, turning `id`, `scheduledAt`, `runAt`,
 * `startedAt`, `retryAt` and `ownerId` into required keys (the last three may
 * be `null`) and adding `attempts`, `status` and a few optional bookkeeping
 * fields.
 */
export const concreteTaskInstanceSchema = taskInstanceSchema.extends({
  id: schema.string(),
  interval: schema.maybe(schema.string()),
  numSkippedRuns: schema.maybe(schema.number()),
  version: schema.maybe(schema.string()),
  scheduledAt: validatedDateField('scheduledAt'),
  attempts: schema.number(),
  // Only the task statuses listed here are accepted.
  status: schema.oneOf([
    schema.literal(TaskStatus.Idle),
    schema.literal(TaskStatus.Claiming),
    schema.literal(TaskStatus.Running),
    schema.literal(TaskStatus.Failed),
    schema.literal(TaskStatus.ShouldDelete),
    schema.literal(TaskStatus.Unrecognized),
    schema.literal(TaskStatus.DeadLetter),
  ]),
  runAt: validatedDateField('runAt'),
  startedAt: schema.nullable(validatedDateField('startedAt')),
  retryAt: schema.nullable(validatedDateField('retryAt')),
  state: schema.recordOf(schema.string(), schema.any()),
  ownerId: schema.nullable(schema.string()),
  partition: schema.maybe(schema.number()),
});

export interface IntervalSchedule {
/**
* An interval in minutes (e.g. '5m'). If specified, this is a recurring task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,125 @@ describe('TaskClaiming', () => {
expect(store.bulkGet).not.toHaveBeenCalled();
});

// Verifies that a candidate task which fails schema validation is logged and
// counted as an error, while the remaining candidates are still claimed.
test('should handle malformed errors when claiming tasks', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));

// Four candidates; id-3 carries the schedule interval 'PT1M', which the
// assertions below expect to be reported as a schedule.interval validation error.
const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({
id: `id-3`,
taskType: 'yawn',
schedule: {
interval: 'PT1M',
},
}),
mockInstance({ id: `id-4`, taskType: 'report' }),
];

// The search returns all four tasks, but the update/get mocks only return
// results for the three tasks other than id-3.
const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap });
store.getDocVersions.mockResolvedValueOnce(docLatestVersions);
store.bulkPartialUpdate.mockResolvedValueOnce(
[fetchedTasks[0], fetchedTasks[1], fetchedTasks[3]].map(getPartialUpdateResult)
);
store.bulkGet.mockResolvedValueOnce([
asOk(fetchedTasks[0]),
asOk(fetchedTasks[1]),
asOk(fetchedTasks[3]),
]);

const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
strategy: CLAIM_STRATEGY_MGET,
definitions: taskDefinitions,
taskStore: store,
excludedTaskTypes: [],
maxAttempts: 2,
getAvailableCapacity: () => 10,
taskPartitioner,
});

const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: new Date(),
});

// If claiming did not produce an Ok result, this expectation fails and shows
// the returned value in the test output.
if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
expect(resultOrErr).toBe(undefined);
}

const result = unwrap(resultOrErr) as ClaimOwnershipResult;

expect(apm.startTransaction).toHaveBeenCalledWith(
TASK_MANAGER_MARK_AS_CLAIMED,
TASK_MANAGER_TRANSACTION_TYPE
);
expect(mockApmTrans.end).toHaveBeenCalledWith('success');

// Only three of the four candidates are reported as claimed.
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

// The validation failure for id-3 is logged with the offending field path.
expect(taskManagerLogger.error).toHaveBeenCalledWith(
'Error validating task schema id-3:yawn during claim: {"cause":{"path":["schedule","interval"]}}',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
size: 40,
seq_no_primary_term: true,
});

// Document versions are still looked up for all four candidates, id-3 included.
expect(store.getDocVersions).toHaveBeenCalledWith([
'task:id-1',
'task:id-2',
'task:id-3',
'task:id-4',
]);

// The partial update must contain exactly the three other tasks; id-3 is absent.
expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1);
expect(store.bulkPartialUpdate).toHaveBeenCalledWith([
{
id: fetchedTasks[0].id,
version: fetchedTasks[0].version,
scheduledAt: fetchedTasks[0].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
id: fetchedTasks[1].id,
version: fetchedTasks[1].version,
scheduledAt: fetchedTasks[1].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
{
id: fetchedTasks[3].id,
version: fetchedTasks[3].version,
scheduledAt: fetchedTasks[3].runAt,
attempts: 1,
ownerId: 'test-test',
retryAt: new Date('1970-01-01T00:05:30.000Z'),
status: 'running',
startedAt: new Date('1970-01-01T00:00:00.000Z'),
},
]);

expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-4']);

// Three tasks updated and claimed; the rejected task shows up in tasksErrors.
expect(result.stats.tasksUpdated).toEqual(3);
expect(result.stats.tasksClaimed).toEqual(3);
expect(result.stats.tasksErrors).toEqual(1);
});

test('it should filter for specific partitions and tasks without partitions', async () => {
const taskManagerId = uuidv4();
const definitions = new TaskTypeDictionary(mockLogger());
Expand Down Expand Up @@ -2253,8 +2372,6 @@ function mockInstance(instance: Partial<ConcreteTaskInstance> = {}) {
{
id: uuidv4(),
taskType: 'bar',
sequenceNumber: 32,
primaryTerm: 32,
runAt: new Date(),
scheduledAt: new Date(),
startedAt: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
ConcreteTaskInstanceVersion,
TaskCost,
PartialConcreteTaskInstance,
concreteTaskInstanceSchema,
} from '../task';
import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running';
import { TASK_MANAGER_MARK_AS_CLAIMED } from '../queries/task_claiming';
Expand Down Expand Up @@ -152,6 +153,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
// apply capacity constraint to candidate tasks
const tasksToRun: ConcreteTaskInstance[] = [];
const leftOverTasks: ConcreteTaskInstance[] = [];
const tasksWithMalformedData: ConcreteTaskInstance[] = [];

let capacityAccumulator = 0;
for (const task of candidateTasks) {
Expand All @@ -169,6 +171,17 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
const now = new Date();
const taskUpdates: PartialConcreteTaskInstance[] = [];
for (const task of tasksToRun) {
try {
concreteTaskInstanceSchema.validate(task);
} catch (error) {
logger.error(
`Error validating task schema ${task.id}:${task.taskType} during claim: ${JSON.stringify(
error
)}`
);
tasksWithMalformedData.push(task);
continue;
}
taskUpdates.push({
id: task.id,
version: task.version,
Expand Down Expand Up @@ -237,7 +250,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
tasksConflicted: conflicts,
tasksClaimed: fullTasksToRun.length,
tasksLeftUnclaimed: leftOverTasks.length,
tasksErrors: bulkUpdateErrors + bulkGetErrors,
tasksErrors: bulkUpdateErrors + bulkGetErrors + tasksWithMalformedData.length,
staleTasks: staleTasks.length,
},
docs: fullTasksToRun,
Expand Down

0 comments on commit ef97e4e

Please sign in to comment.