Skip to content

Commit

Permalink
Harmony 1999 (#696)
Browse files Browse the repository at this point in the history
* HARMONY-1999: Add test for job status with warning

* HARMONY-1999: Remove commented out code
  • Loading branch information
indiejames authored Feb 11, 2025
1 parent 563ea3d commit 106f59a
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -1,25 +1,40 @@
import env from '../../util/env';
import { logAsyncExecutionTime } from '../../util/log-execution';
import _, { ceil, range, sum } from 'lodash';
import { v4 as uuid } from 'uuid';
import WorkItemUpdate from '../../models/work-item-update';
import WorkflowStep, { getWorkflowStepByJobIdStepIndex, updateIsComplete } from '../../models/workflow-steps';
import { Logger } from 'winston';
import _, { ceil, range, sum } from 'lodash';
import { JobStatus, Job } from '../../models/job';
import JobMessage, { getMessageCountForJob, getMessagesForJob, JobMessageLevel } from '../../models/job-message';

import {
calculateQueryCmrLimit, QUERY_CMR_SERVICE_REGEX,
} from '../../backends/workflow-orchestration/util';
import { makeWorkScheduleRequest } from '../../backends/workflow-orchestration/work-item-polling';
import { Job, JobStatus } from '../../models/job';
import JobLink, { getJobDataLinkCount } from '../../models/job-link';
import { incrementReadyCount, deleteUserWorkForJob, incrementReadyAndDecrementRunningCounts, decrementRunningCount } from '../../models/user-work';
import WorkItem, { maxSortIndexForJobService, workItemCountForStep, getWorkItemsByJobIdAndStepIndex, getWorkItemById, updateWorkItemStatus, getJobIdForWorkItem } from '../../models/work-item';
import { WorkItemStatus, COMPLETED_WORK_ITEM_STATUSES } from '../../models/work-item-interface';
import { outputStacItemUrls, handleBatching, resultItemSizes } from '../../util/aggregation-batch';
import db, { Transaction, batchSize } from '../../util/db';
import JobMessage, {
getMessageCountForJob, getMessagesForJob, JobMessageLevel,
} from '../../models/job-message';
import {
decrementRunningCount, deleteUserWorkForJob, incrementReadyAndDecrementRunningCounts,
incrementReadyCount,
} from '../../models/user-work';
import WorkItem, {
getJobIdForWorkItem, getWorkItemById, getWorkItemsByJobIdAndStepIndex, maxSortIndexForJobService,
updateWorkItemStatus, workItemCountForStep,
} from '../../models/work-item';
import { COMPLETED_WORK_ITEM_STATUSES, WorkItemStatus } from '../../models/work-item-interface';
import WorkItemUpdate from '../../models/work-item-update';
import WorkflowStep, {
getWorkflowStepByJobIdStepIndex, updateIsComplete,
} from '../../models/workflow-steps';
import { handleBatching, outputStacItemUrls, resultItemSizes } from '../../util/aggregation-batch';
import db, { batchSize, Transaction } from '../../util/db';
import env from '../../util/env';
import { ServiceError } from '../../util/errors';
import { completeJob } from '../../util/job';
import { logAsyncExecutionTime } from '../../util/log-execution';
import { objectStoreForProtocol } from '../../util/object-store';
import { StacItem, readCatalogItems, StacItemLink, StacCatalog, readCatalogsItems } from '../../util/stac';
import {
readCatalogItems, readCatalogsItems, StacCatalog, StacItem, StacItemLink,
} from '../../util/stac';
import { resolve } from '../../util/url';
import { QUERY_CMR_SERVICE_REGEX, calculateQueryCmrLimit } from '../../backends/workflow-orchestration/util';
import { makeWorkScheduleRequest } from '../../backends/workflow-orchestration/work-item-polling';

/**
* A structure holding the preprocess info of a work item
Expand Down Expand Up @@ -85,18 +100,20 @@ async function getFinalStatusAndMessageForJob(tx: Transaction, job: Job):
Promise<{ finalStatus: JobStatus, finalMessage: string; }> {
let finalStatus = JobStatus.SUCCESSFUL;
const errorCount = await getMessageCountForJob(tx, job.jobID, JobMessageLevel.ERROR);
const warningCount = await getMessageCountForJob(tx, job.jobID, JobMessageLevel.WARNING);
const dataLinkCount = await getJobDataLinkCount(tx, job.jobID);
if (errorCount > 0) {
if (errorCount + warningCount > 0) {
if (dataLinkCount > 0) {
finalStatus = JobStatus.COMPLETE_WITH_ERRORS;
} else {
finalStatus = JobStatus.FAILED;
}
}
let finalMessage = '';
if ((errorCount > 1) && (finalStatus == JobStatus.FAILED)) {
finalMessage = `The job failed with ${errorCount} errors. See the errors field for more details`;
} else if ((errorCount == 1) && (finalStatus == JobStatus.FAILED)) {
// TODO stop treating warnings as errors in HARMONY-1995
if ((errorCount + warningCount > 1) && (finalStatus == JobStatus.FAILED)) {
finalMessage = `The job failed with ${errorCount} errors and ${warningCount} warnings. See the errors and warnings fields for more details`;
} else if ((errorCount == 1 || warningCount == 1) && (finalStatus == JobStatus.FAILED)) {
const jobError = (await getMessagesForJob(tx, job.jobID, 1))[0];
finalMessage = jobError.message;
}
Expand Down Expand Up @@ -184,7 +201,11 @@ async function handleFailedWorkItems(
let jobMessage;

if (errorMessage) {
jobMessage = `WorkItem failed: ${errorMessage}`;
let failedOrWarned = 'failed';
if (status === WorkItemStatus.WARNING) {
failedOrWarned = 'warned';
}
jobMessage = `WorkItem ${failedOrWarned}: ${errorMessage}`;
}

if (QUERY_CMR_SERVICE_REGEX.test(workItem.serviceID)) {
Expand All @@ -199,14 +220,24 @@ async function handleFailedWorkItems(
jobMessage = 'WorkItem failed with an unknown error';
}

// TODO HARMONY-1995 set the level in this call to error or warning - just use error for now
await addJobMessageForWorkItem(tx, job, url, jobMessage);
let level: JobMessageLevel;
switch (status) {
case WorkItemStatus.FAILED:
level = JobMessageLevel.ERROR;
break;
case WorkItemStatus.WARNING:
level = JobMessageLevel.WARNING;
break;
}
await addJobMessageForWorkItem(tx, job, url, jobMessage, level, workItem.message_category);
}

if (continueProcessing) {
const errorCount = await getMessageCountForJob(tx, job.jobID);
if (errorCount > env.maxErrorsForJob) {
jobMessage = `Maximum allowed errors ${env.maxErrorsForJob} exceeded. See the errors field for more details`;
// TODO handle this properly in HARMONY-1995 - for now just treat warnings like errors
const warningCount = await getMessageCountForJob(tx, job.jobID, JobMessageLevel.WARNING);
if (errorCount + warningCount > env.maxErrorsForJob) {
jobMessage = `Maximum allowed errors and warnings ${env.maxErrorsForJob} exceeded. See the errors and warnings fields for more details`;
logger.warn(jobMessage);
continueProcessing = false;
}
Expand Down Expand Up @@ -762,7 +793,8 @@ export async function processWorkItem(
'HWIUWJI.getWorkflowStepByJobIdStepIndex',
logger))(tx, workItem.jobID, workItem.workflowStepIndex + 1);

if (nextWorkflowStep && (status !== WorkItemStatus.FAILED || nextWorkflowStep?.hasAggregatedOutput)) {
if (nextWorkflowStep && (![WorkItemStatus.FAILED, WorkItemStatus.WARNING].includes(status)
|| nextWorkflowStep?.hasAggregatedOutput)) {
didCreateWorkItem = await (await logAsyncExecutionTime(
createNextWorkItems,
'HWIUWJI.createNextWorkItems',
Expand Down
56 changes: 38 additions & 18 deletions services/harmony/app/frontends/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
import { Response, NextFunction } from 'express';
import { NextFunction, Response } from 'express';
import _ from 'lodash';
import { Logger } from 'winston';
import { Job, JobStatus, JobQuery, JobForDisplay, getRelatedLinks } from '../models/job';
import { keysToLowerCase } from '../util/object';
import { cancelAndSaveJob, pauseAndSaveJob, resumeAndSaveJob, skipPreviewAndSaveJob, validateJobId } from '../util/job';
import JobLink from '../models/job-link';
import { needsStacLink } from '../util/stac';
import { getRequestRoot } from '../util/url';
import { getCloudAccessJsonLink, getCloudAccessShLink, getJobStateChangeLinks, getStacCatalogLink, getStatusLink, Link } from '../util/links';
import { RequestValidationError, NotFoundError, ServerError } from '../util/errors';
import { getPagingParams, getPagingLinks, setPagingHeaders } from '../util/pagination';

import HarmonyRequest from '../models/harmony-request';
import { getRelatedLinks, Job, JobForDisplay, JobQuery, JobStatus } from '../models/job';
import JobLink from '../models/job-link';
import JobMessage, { getMessagesForJob, JobMessageLevel } from '../models/job-message';
import db from '../util/db';
import env from '../util/env';
import JobMessage, { getMessagesForJob } from '../models/job-message';
import _ from 'lodash';
import { isAdminUser } from '../util/edl-api';
import env from '../util/env';
import { NotFoundError, RequestValidationError, ServerError } from '../util/errors';
import {
cancelAndSaveJob, pauseAndSaveJob, resumeAndSaveJob, skipPreviewAndSaveJob, validateJobId,
} from '../util/job';
import {
getCloudAccessJsonLink, getCloudAccessShLink, getJobStateChangeLinks, getStacCatalogLink,
getStatusLink, Link,
} from '../util/links';
import { keysToLowerCase } from '../util/object';
import { getPagingLinks, getPagingParams, setPagingHeaders } from '../util/pagination';
import { needsStacLink } from '../util/stac';
import { getRequestRoot } from '../util/url';

/**
* Returns true if the job contains S3 direct access links
Expand Down Expand Up @@ -88,21 +94,35 @@ function getMessageForDisplay(job: JobForDisplay, urlRoot: string): string {
* @param job - the serialized job
* @param urlRoot - the root URL to be used when constructing links
* @param linkType - the type to use for data links (http|https|s3|none)
* @param errors - a list of errors for the job
* @param messages - a list of messages for the job
* @returns the job for display
*/
function getJobForDisplay(job: Job, urlRoot: string, linkType?: string, errors?: JobMessage[]): JobForDisplay {
function getJobForDisplay(job: Job, urlRoot: string, linkType?: string, messages?: JobMessage[]): JobForDisplay {
  const serializedJob = job.serialize(urlRoot, linkType);
  // When data links are suppressed (linkType 'none') the status link uses rel 'item' rather than 'self'.
  const statusLinkRel = linkType === 'none' ? 'item' : 'self';
  serializedJob.links = getLinksForDisplay(serializedJob, urlRoot, statusLinkRel, job.destination_url);
  if (!job.destination_url) {
    serializedJob.message = getMessageForDisplay(serializedJob, urlRoot);
  }

  // `messages` is an optional parameter, so fall back to an empty list rather than
  // throwing a TypeError when a caller omits it. Messages at the ERROR level are
  // reported as errors; every other level is reported as a warning.
  const [errors, warnings] = _.partition(
    messages ?? [],
    (message) => message.level === JobMessageLevel.ERROR,
  );

  // Only the `url` and `message` fields are exposed, and the `errors` / `warnings`
  // fields are left off the response entirely when there is nothing to report.
  if (errors.length > 0) {
    serializedJob.errors = errors.map((e) => _.pick(e, ['url', 'message'])) as JobMessage[];
  }

  if (warnings.length > 0) {
    serializedJob.warnings = warnings.map((w) => _.pick(w, ['url', 'message'])) as JobMessage[];
  }

  return serializedJob;
}

Expand Down Expand Up @@ -171,11 +191,11 @@ export async function getJobStatus(
const { page, limit } = getPagingParams(req, env.defaultResultPageSize);
let job: Job;
let pagination;
let errors: JobMessage[];
let messages: JobMessage[];

await db.transaction(async (tx) => {
({ job, pagination } = await Job.byJobID(tx, jobID, true, true, false, page, limit));
errors = await getMessagesForJob(tx, jobID);
messages = await getMessagesForJob(tx, jobID);
});
if (!job) {
throw new NotFoundError(`Unable to find job ${jobID}`);
Expand All @@ -189,7 +209,7 @@ export async function getJobStatus(
const urlRoot = getRequestRoot(req);
const pagingLinks = getPagingLinks(req, pagination).map((link) => new JobLink(link));
job.links = job.links.concat(pagingLinks);
const jobForDisplay = getJobForDisplay(job, urlRoot, linkType, errors);
const jobForDisplay = getJobForDisplay(job, urlRoot, linkType, messages);
res.send(jobForDisplay);
} catch (e) {
req.context.logger.error(e);
Expand Down
46 changes: 32 additions & 14 deletions services/harmony/app/models/job.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
import _, { pick } from 'lodash';
import { Knex } from 'knex';
import { ILengthAwarePagination } from 'knex-paginate'; // For types only
import _, { pick } from 'lodash';
import { Logger } from 'winston';
import { createMachine } from 'xstate';
import { CmrPermission, CmrPermissionsMap, getCollectionsByIds, getPermissions, CmrTagKeys } from '../util/cmr';
import { removeEmptyProperties } from '../util/object';
import { ConflictError } from '../util/errors';
import { createPublicPermalink } from '../frontends/service-results';

import { truncateString } from '@harmony/util/string';
import DBRecord from './record';

import { createPublicPermalink } from '../frontends/service-results';
import {
CmrPermission, CmrPermissionsMap, CmrTagKeys, getCollectionsByIds, getPermissions,
} from '../util/cmr';
import { Transaction } from '../util/db';
import env from '../util/env';
import { ConflictError } from '../util/errors';
import { removeEmptyProperties } from '../util/object';
import JobLink, { getLinksForJob, JobLinkOrRecord } from './job-link';
import JobMessage from './job-message';
import { getLabelsForJob, JOBS_LABELS_TABLE, LABELS_TABLE, setLabelsForJob } from './label';
import DBRecord from './record';
import { setReadyCountToZero } from './user-work';
import WorkflowStep, { getWorkflowStepsByJobId } from './workflow-steps';

// how long data generated by this job will be available
export const EXPIRATION_DAYS = 30;

export const TEXT_LIMIT = 4096; // this.request and this.message need to be under the 4,096 char limit

import env from '../util/env';
import JobMessage from './job-message';
import { setReadyCountToZero } from './user-work';
import { Knex } from 'knex';
import { Logger } from 'winston';
import { LABELS_TABLE, JOBS_LABELS_TABLE, getLabelsForJob, setLabelsForJob } from './label';
const { awsDefaultRegion } = env;

// Lazily load the list of unique provider ids, once, when requested
Expand Down Expand Up @@ -69,7 +73,7 @@ export interface JobRecord {
progress?: number;
batchesCompleted?: number;
links?: JobLinkOrRecord[];
errors?: JobMessage[];
messages?: JobMessage[];
request: string;
isAsync?: boolean;
ignoreErrors?: boolean;
Expand Down Expand Up @@ -112,6 +116,8 @@ export class JobForDisplay {

errors?: JobMessage[];

warnings?: JobMessage[];

}

export interface JobQuery {
Expand Down Expand Up @@ -406,7 +412,7 @@ export class Job extends DBRecord implements JobRecord {

links: JobLink[];

errors: JobMessage[];
messages: JobMessage[];

private statesToMessages: { [key in JobStatus]?: string };

Expand Down Expand Up @@ -1093,6 +1099,18 @@ export class Job extends DBRecord implements JobRecord {
}
await Promise.all(promises);
await setLabelsForJob(tx, this.jobID, this.username, this.labels);
if (this.messages) {
const messagePromises = [];
for (const message of this.messages) {
// Note we will not update existing messages in the database - only add new ones;
if (!message.id) {
message.jobID = this.jobID;
messagePromises.push(message.save(tx));
}
}
await Promise.all(messagePromises);
}

}

/**
Expand Down
17 changes: 10 additions & 7 deletions services/harmony/test/helpers/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import request, { Test } from 'supertest';
import { it } from 'mocha';
import { expect } from 'chai';
import { v4 as uuid } from 'uuid';
import _ from 'lodash';
import JobLink from '../../app/models/job-link';
import { Job, JobStatus, JobRecord, jobRecordFields, JobForDisplay, getRelatedLinks, JobQuery } from '../../app/models/job';
import { it } from 'mocha';
import request, { Test } from 'supertest';
import { v4 as uuid } from 'uuid';

import { JobListing } from '../../app/frontends/jobs';
import {
getRelatedLinks, Job, JobForDisplay, JobQuery, JobRecord, jobRecordFields, JobStatus,
} from '../../app/models/job';
import JobLink from '../../app/models/job-link';
import { RecordConstructor } from '../../app/models/record';
import db, { Transaction } from '../../app/util/db';
import { hookRequest } from './hooks';
import { truncateAll } from './db';
import { RecordConstructor } from '../../app/models/record';
import { hookRequest } from './hooks';

export const adminUsername = 'adam';

Expand Down
Loading

0 comments on commit 106f59a

Please sign in to comment.