Skip to content

Commit

Permalink
HARMONY-2013: Check if a job is in a terminal state prior to attempti…
Browse files Browse the repository at this point in the history
…ng to process the STAC catalog for a work item update.
  • Loading branch information
chris-durbin committed Feb 12, 2025
1 parent 106f59a commit e3a1938
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -911,18 +911,22 @@ export async function processWorkItems(
'HWIUWJI.Job.byJobID',
logger))(tx, jobID, false, false, true);

const thisStep: WorkflowStep = await (await logAsyncExecutionTime(
getWorkflowStepByJobIdStepIndex,
'HWIUWJI.getWorkflowStepByJobIdStepIndex',
logger))(tx, jobID, workflowStepIndex);

const lastIndex = items.length - 1;
for (let index = 0; index < items.length; index++) {
const { preprocessResult, update } = items[index];
if (index < lastIndex) {
await processWorkItem(tx, preprocessResult, job, update, logger, false, thisStep);
} else {
await processWorkItem(tx, preprocessResult, job, update, logger, true, thisStep);
if (job.hasTerminalStatus()) {
logger.warn(`Ignoring work item updates for job ${jobID} in terminal state ${job.status}.`);
} else {
const thisStep: WorkflowStep = await (await logAsyncExecutionTime(
getWorkflowStepByJobIdStepIndex,
'HWIUWJI.getWorkflowStepByJobIdStepIndex',
logger))(tx, jobID, workflowStepIndex);

const lastIndex = items.length - 1;
for (let index = 0; index < items.length; index++) {
const { preprocessResult, update } = items[index];
if (index < lastIndex) {
await processWorkItem(tx, preprocessResult, job, update, logger, false, thisStep);
} else {
await processWorkItem(tx, preprocessResult, job, update, logger, true, thisStep);
}
}
}
});
Expand Down
17 changes: 16 additions & 1 deletion services/harmony/app/models/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { createPublicPermalink } from '../frontends/service-results';
import {
CmrPermission, CmrPermissionsMap, CmrTagKeys, getCollectionsByIds, getPermissions,
} from '../util/cmr';
import { Transaction } from '../util/db';
import db, { Transaction } from '../util/db';
import env from '../util/env';
import { ConflictError } from '../util/errors';
import { removeEmptyProperties } from '../util/object';
Expand Down Expand Up @@ -337,6 +337,21 @@ async function getUniqueProviderIds(tx: Transaction): Promise<string[]> {
return results.map((job) => job.provider_id);
}

/**
* Get the job status for the given job ID
*
* @param jobID - the job ID
* @returns the job status for the given job ID
*/
export async function getJobStatusForJobID(jobID: string): Promise<JobStatus> {
return (
await db('jobs')
.select('status')
.where({ jobID })
.first()
)?.status;
}

/**
* Sets the fields on the where clauses (see JobQuery) to be prefixed with a table name to avoid
* ambiguities when joining with other tables
Expand Down
29 changes: 17 additions & 12 deletions services/work-updater/app/workers/updater.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { Logger } from 'winston';

import {
WorkItemUpdateQueueItem,
handleWorkItemUpdate,
preprocessWorkItem,
processWorkItems } from '../../../harmony/app/backends/workflow-orchestration/work-item-updates';
handleWorkItemUpdate, preprocessWorkItem, processWorkItems, WorkItemUpdateQueueItem,
} from '../../../harmony/app/backends/workflow-orchestration/work-item-updates';
import { getJobStatusForJobID, terminalStates } from '../../../harmony/app/models/job';
import { getJobIdForWorkItem } from '../../../harmony/app/models/work-item';
import { getWorkflowStepByJobIdStepIndex } from '../../../harmony/app/models/workflow-steps';
import db from '../../../harmony/app/util/db';
import { default as defaultLogger } from '../../../harmony/app/util/log';
import { logAsyncExecutionTime } from '../../../harmony/app/util/log-execution';
import { WorkItemQueueType } from '../../../harmony/app/util/queue/queue';
import { getQueueForType } from '../../../harmony/app/util/queue/queue-factory';
import sleep from '../../../harmony/app/util/sleep';
import { Worker } from '../../../harmony/app/workers/worker';
import env from '../util/env';
import { logAsyncExecutionTime } from '../../../harmony/app/util/log-execution';
import { getWorkflowStepByJobIdStepIndex } from '../../../harmony/app/models/workflow-steps';
import db from '../../../harmony/app/util/db';

/**
* Group work item updates by its workflow step and return the grouped work item updates
Expand Down Expand Up @@ -105,11 +105,16 @@ export async function handleBatchWorkItemUpdates(
}, {});
// process each job's updates
for (const jobID in jobUpdates) {
const startTime = Date.now();
logger.debug(`Processing ${jobUpdates[jobID].length} work item updates for job ${jobID}`);
await handleBatchWorkItemUpdatesWithJobId(jobID, jobUpdates[jobID], logger);
const endTime = Date.now();
logger.debug(`Processing ${jobUpdates[jobID].length} work item updates for job ${jobID} took ${endTime - startTime} ms`);
const jobStatus = await getJobStatusForJobID(jobID);
if (terminalStates.includes(jobStatus)) {
logger.warn(`Ignoring work item updates for job ${jobID} in terminal state ${jobStatus}.`);
} else {
const startTime = Date.now();
logger.debug(`Processing ${jobUpdates[jobID].length} work item updates for job ${jobID}`);
await handleBatchWorkItemUpdatesWithJobId(jobID, jobUpdates[jobID], logger);
const endTime = Date.now();
logger.debug(`Processing ${jobUpdates[jobID].length} work item updates for job ${jobID} took ${endTime - startTime} ms`);
}
}
}

Expand Down

0 comments on commit e3a1938

Please sign in to comment.