Skip to content

Commit

Permalink
Merge pull request #977 from JupiterOne/INT-9336-dont-flush-after-steps
Browse files Browse the repository at this point in the history
Don't flush after step completes
  • Loading branch information
Gonzalo-Avalos-Ribas authored Jun 24, 2024
2 parents aa9dfab + 8c0b69e commit 1885d30
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 173 deletions.
11 changes: 10 additions & 1 deletion packages/integration-sdk-core/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,16 @@ export class IntegrationLocalConfigFieldMissingError extends IntegrationError {
});
}
}

export class UploadError extends IntegrationError {
readonly typesInvolved: string[] | undefined;
constructor(message: string, typesInvolved?: string[]) {
super({
code: 'UPLOAD_ERROR',
message,
});
this.typesInvolved = typesInvolved;
}
}
export class IntegrationLocalConfigFieldTypeMismatchError extends IntegrationError {
constructor(message: string) {
super({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ describe('executeStepDependencyGraph', () => {
expect(spyB).toHaveBeenCalledBefore(spyC);
});

test('should mark steps with failed executionHandlers with status FAILURE and dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => {
test('should throw if upload fails', async () => {
const spyA = jest.fn();
const spyB = jest.fn();
const spyC = jest.fn();
Expand Down Expand Up @@ -715,18 +715,19 @@ describe('executeStepDependencyGraph', () => {

function createFailingUploader(
stepId: string,
collector: FlushedGraphObjectData[],
): StepGraphObjectDataUploader {
return {
stepId,
async enqueue() {
async enqueue(graphObjectData) {
collector.push(graphObjectData);
return Promise.resolve();
},
waitUntilUploadsComplete() {
return Promise.reject(new Error('expected upload wait failure'));
return Promise.reject(new Error('Expected error'));
},
};
}

const passingUploaderCollector: FlushedGraphObjectData[] = [];

/**
Expand All @@ -737,73 +738,23 @@ describe('executeStepDependencyGraph', () => {
* 'b' depends on 'a',
* 'c' depends on 'b'
*/
const result = await executeSteps(
steps,
stepStartStates,
graphObjectStore,
(stepId) => {
if (stepId === 'b') {
return createFailingUploader(stepId);
} else {
return createPassingUploader(stepId, passingUploaderCollector);
await expect(
executeSteps(steps, stepStartStates, graphObjectStore, (stepId) => {
if (stepId == 'c') {
return createFailingUploader(stepId, passingUploaderCollector);
}
},
);
return createPassingUploader(stepId, passingUploaderCollector);
}),
).rejects.toThrow();

const expectedCollected: FlushedGraphObjectData[] = [
{
entities: [eA],
relationships: [],
},
{
entities: [eC],
entities: [eA, eB, eC],
relationships: [],
},
];

expect(passingUploaderCollector).toEqual(expectedCollected);

expect(result).toEqual([
{
id: 'a',
name: 'a',
declaredTypes: [],
partialTypes: [],
encounteredTypes: [eA._type],
encounteredTypeCounts: expect.any(Object),
status: StepResultStatus.SUCCESS,
startTime: expect.any(Number),
endTime: expect.any(Number),
duration: expect.any(Number),
},
{
id: 'b',
name: 'b',
declaredTypes: [],
partialTypes: [],
encounteredTypes: [eB._type],
encounteredTypeCounts: expect.any(Object),
dependsOn: ['a'],
status: StepResultStatus.FAILURE,
startTime: expect.any(Number),
endTime: expect.any(Number),
duration: expect.any(Number),
},
{
id: 'c',
name: 'c',
declaredTypes: [],
partialTypes: [],
encounteredTypes: [eC._type],
encounteredTypeCounts: expect.any(Object),
dependsOn: ['b'],
status: StepResultStatus.PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE,
startTime: expect.any(Number),
endTime: expect.any(Number),
duration: expect.any(Number),
},
]);

expect(spyA).toHaveBeenCalledTimes(1);
expect(spyB).toHaveBeenCalledTimes(1);
expect(spyC).toHaveBeenCalledTimes(1);
Expand Down
75 changes: 70 additions & 5 deletions packages/integration-sdk-runtime/src/execution/dependencyGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
StepResultStatus,
StepStartStates,
StepExecutionHandlerWrapperFunction,
UploadError,
} from '@jupiterone/integration-sdk-core';

import { timeOperation } from '../metrics';
Expand Down Expand Up @@ -145,9 +146,17 @@ export function executeStepDependencyGraph<
startTime?: number;
endTime?: number;
duration?: number;
partialTypes?: string[];
}) {
const { stepId, status, typeTracker, startTime, endTime, duration } =
params;
const {
stepId,
status,
typeTracker,
startTime,
endTime,
duration,
partialTypes,
} = params;
const existingResult = stepResultsMap.get(stepId);
if (existingResult) {
stepResultsMap.set(stepId, {
Expand All @@ -162,6 +171,9 @@ export function executeStepDependencyGraph<
startTime,
endTime,
duration,
partialTypes: Array.from(
new Set(existingResult.partialTypes.concat(partialTypes ?? [])),
),
});
}
}
Expand Down Expand Up @@ -417,9 +429,7 @@ export function executeStepDependencyGraph<

status = StepResultStatus.FAILURE;
}

await context.jobState.flush();

let possibleAdditionalPartialTypes: string[] | undefined = undefined;
if (context.jobState.waitUntilUploadsComplete) {
try {
// Failing to upload all integration data should not be considered a
Expand All @@ -429,6 +439,9 @@ export function executeStepDependencyGraph<
} catch (err) {
context.logger.stepFailure(step, err);
status = StepResultStatus.FAILURE;
if (err instanceof UploadError) {
possibleAdditionalPartialTypes = err.typesInvolved;
}
}
}

Expand All @@ -439,6 +452,7 @@ export function executeStepDependencyGraph<
startTime,
endTime: Date.now(),
duration: Date.now() - startTime,
partialTypes: possibleAdditionalPartialTypes,
});
enqueueLeafSteps();
}
Expand Down Expand Up @@ -486,11 +500,62 @@ export function executeStepDependencyGraph<

return status;
}

async function forceFlushEverything() {
/** Instead of flushing after each step, flush only when we finish all steps OR when we reach the threshold limit
* Because the 'createStepGraphObjectDataUploader' needs a step I'm using the last step as it
*/
let uploader: StepGraphObjectDataUploader | undefined;
const lastStep = Array.from(stepResultsMap.keys()).pop() as string;
if (createStepGraphObjectDataUploader) {
uploader = createStepGraphObjectDataUploader(lastStep);
}
await graphObjectStore.flush(
async (entities) =>
entities.length
? uploader?.enqueue({
entities,
relationships: [],
})
: undefined,
async (relationships) =>
relationships.length
? uploader?.enqueue({
entities: [],
relationships,
})
: undefined,
);
try {
await uploader?.waitUntilUploadsComplete();
} catch (err) {
executionContext.logger.stepFailure(
workingGraph.getNodeData(lastStep),
err,
);
if (err instanceof UploadError) {
updateStepResultStatus({
stepId: lastStep,
status: StepResultStatus.FAILURE,
typeTracker,
partialTypes: err.typesInvolved, //We mark as partial all types related to the failed uploads
});
} else {
updateStepResultStatus({
stepId: lastStep,
status: StepResultStatus.FAILURE,
typeTracker,
});
}
}
}

// kick off work for all leaf nodes
enqueueLeafSteps();

void promiseQueue
.onIdle()
.then(forceFlushEverything)
.then(() => resolve([...stepResultsMap.values()]))
.catch(reject);
});
Expand Down
47 changes: 47 additions & 0 deletions packages/integration-sdk-runtime/src/execution/uploader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { createApiClient, getApiBaseUrl } from '../api';
import { generateSynchronizationJob } from '../synchronization/__tests__/util/generateSynchronizationJob';
import { createMockIntegrationLogger } from '../../test/util/fixtures';
import { getExpectedRequestHeaders } from '../../test/util/request';
import { UploadError } from '@jupiterone/integration-sdk-core';

function createFlushedGraphObjectData(): FlushedGraphObjectData {
return {
Expand Down Expand Up @@ -135,6 +136,52 @@ describe('#createQueuedStepGraphObjectDataUploader', () => {

expect(uploaded).toEqual([flushed[0], flushed[2], flushedAfterFailure]);
});

test('should throw UploadError with types involved', async () => {
const uploaded: FlushedGraphObjectData[] = [];
const stepId = uuid();

let numQueued = 0;

const uploader = createQueuedStepGraphObjectDataUploader({
stepId,
uploadConcurrency: 2,
async upload(d) {
numQueued++;

if (numQueued === 2) {
await sleep(100);
throw new Error('expected upload error');
} else {
await sleep(200);
uploaded.push(d);
}
},
});

const flushed = await createAndEnqueueUploads(uploader, 3);

// Ensure that the next enqueue happens _after_ a failure has occurred.
await sleep(300);
const flushedAfterFailure = createFlushedGraphObjectData();
await uploader.enqueue(flushedAfterFailure);

try {
await uploader.waitUntilUploadsComplete();
} catch (error) {
expect(error).toBeInstanceOf(UploadError);
flushed[1].entities.forEach((entity) =>
expect((error as UploadError).typesInvolved as string[]).toInclude(
entity._type,
),
);
flushed[1].relationships.forEach((relationship) =>
expect((error as UploadError).typesInvolved as string[]).toInclude(
relationship._type,
),
);
}
});
});

describe('#createPersisterApiStepGraphObjectDataUploader', () => {
Expand Down
25 changes: 16 additions & 9 deletions packages/integration-sdk-runtime/src/execution/uploader.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IntegrationError } from '@jupiterone/integration-sdk-core';
import { UploadError } from '@jupiterone/integration-sdk-core';
import PQueue from 'p-queue/dist';
import { FlushedGraphObjectData } from '../storage/types';
import {
Expand Down Expand Up @@ -37,7 +37,7 @@ export function createQueuedStepGraphObjectDataUploader({

let completed = false;
const uploadErrors: Error[] = [];

const typesInvolvedInFailures = new Set<string>();
return {
stepId,
async enqueue(graphObjectData) {
Expand Down Expand Up @@ -76,6 +76,16 @@ export function createQueuedStepGraphObjectDataUploader({
// cases where this could cause an issue (e.g. a relationship getting
// uploaded that references an entity that failed to upload).
uploadErrors.push(err);
if (graphObjectData) {
graphObjectData.entities.forEach(
(entity) => typesInvolvedInFailures.add(entity._type),
typesInvolvedInFailures,
);
graphObjectData.relationships.forEach(
(relationship) => typesInvolvedInFailures.add(relationship._type),
typesInvolvedInFailures,
);
}
});
},

Expand All @@ -93,15 +103,12 @@ export function createQueuedStepGraphObjectDataUploader({
}

if (uploadErrors.length) {
throw new IntegrationError({
code: 'UPLOAD_ERROR',
message: `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join(
throw new UploadError(
`Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join(
',',
)})`,
// Just include the first error cause. We should be able to gather
// additional information from the joined error messages.
cause: uploadErrors[0],
});
Array.from(typesInvolvedInFailures.values()),
);
}
},
};
Expand Down
Loading

0 comments on commit 1885d30

Please sign in to comment.