Skip to content

Commit

Permalink
Merge pull request #808 from zapier/pde-5125/handle-lambda-payloads
Browse files Browse the repository at this point in the history
PDE-5125 - feat(core) Handle large response payloads
  • Loading branch information
standielpls authored Jun 26, 2024
2 parents 0eb608b + 238c2b7 commit 151dadf
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 79 deletions.
33 changes: 21 additions & 12 deletions packages/core/src/app-middlewares/after/large-response-cacher.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,28 @@

const constants = require('../../constants');
const cleaner = require('../../tools/cleaner');
const responseStasher = require('../../tools/create-response-stasher');

/*
TODO: Some services _do not_ enjoy having 6mb+ responses returned, so
we may need to user/request pre-signed S3 URLs (or somewhere else?)
to stash the large response, and return a pointer.
*/
const largeResponseCachePointer = (output) => {
const size = JSON.stringify(cleaner.maskOutput(output)).length;
if (size > constants.RESPONSE_SIZE_LIMIT) {
console.log(
`Oh no! Payload is ${size}, which is larger than ${constants.RESPONSE_SIZE_LIMIT}.`
);
// TODO: use envelope feature and to build RPC to get signed S3 upload URL.
const largeResponseCachePointer = async (output) => {
const response = cleaner.maskOutput(output);

const autostashLimit = output.input._zapier.event.autostashPayloadOutputLimit;

const payload = JSON.stringify(response.results);
const size = payload.length;

// If autostash limit is defined, and is within the range, stash the response
// If it is -1, stash the response regardless of size
// If the limit is defined and is out of range, let lambda deal with it
if (
(autostashLimit &&
size >= constants.RESPONSE_SIZE_LIMIT &&
size <= autostashLimit) ||
autostashLimit === -1
) {
const url = await responseStasher(output.input, payload, size);
output.resultsUrl = url;
output.results = Array.isArray(output.results) ? [] : {};
}
return output;
};
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/tools/cleaner.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ const createBundleBank = (appRaw, event = {}, serializeFunc = (x) => x) => {
}, {});
};

const maskOutput = (output) => _.pick(output, 'results', 'status');
const maskOutput = (output) =>
_.pick(output, 'results', 'status', 'resultsUrl');

// These normalize functions are called after the initial before middleware that
// cleans the request. The reason is that we need to know why a value is empty
Expand Down
70 changes: 5 additions & 65 deletions packages/core/src/tools/create-file-stasher.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,11 @@ const { randomBytes } = require('crypto');

const _ = require('lodash');
const contentDisposition = require('content-disposition');
const FormData = require('form-data');

const mime = require('mime-types');

const request = require('./request-client-internal');
const { UPLOAD_MAX_SIZE, NON_STREAM_UPLOAD_MAX_SIZE } = require('../constants');

const LENGTH_ERR_MESSAGE =
'We could not calculate the length of your file - please ' +
'pass a knownLength like z.stashFile(f, knownLength)';
const uploader = require('./uploader');

const DEFAULT_FILE_NAME = 'unnamedfile';
const DEFAULT_CONTENT_TYPE = 'application/octet-stream';
Expand Down Expand Up @@ -175,64 +171,6 @@ const resolveToBufferStringStream = async (responseOrData) => {
);
};

const uploader = async (
signedPostData,
bufferStringStream,
knownLength,
filename,
contentType
) => {
filename = path.basename(filename).replace('"', '');

const fields = {
...signedPostData.fields,
'Content-Disposition': contentDisposition(filename),
'Content-Type': contentType,
};

const form = new FormData();

Object.entries(fields).forEach(([key, value]) => {
form.append(key, value);
});

form.append('file', bufferStringStream, {
knownLength,
contentType,
filename,
});

// Try to catch the missing length early, before upload to S3 fails.
try {
form.getLengthSync();
} catch (err) {
throw new Error(LENGTH_ERR_MESSAGE);
}

// Send to S3 with presigned request.
const response = await request({
url: signedPostData.url,
method: 'POST',
body: form,
});

if (response.status === 204) {
return new URL(signedPostData.fields.key, signedPostData.url).href;
}

if (
response.content &&
response.content.includes &&
response.content.includes(
'You must provide the Content-Length HTTP header.'
)
) {
throw new Error(LENGTH_ERR_MESSAGE);
}

throw new Error(`Got ${response.status} - ${response.content}`);
};

const ensureUploadMaxSizeNotExceeded = (streamOrData, length) => {
let uploadMaxSize = NON_STREAM_UPLOAD_MAX_SIZE;
let uploadMethod = 'non-streaming';
Expand All @@ -242,7 +180,9 @@ const ensureUploadMaxSizeNotExceeded = (streamOrData, length) => {
}

if (length && length > uploadMaxSize) {
throw new Error(`${length} bytes is too big, ${uploadMaxSize} is the max for ${uploadMethod} data.`);
throw new Error(
`${length} bytes is too big, ${uploadMaxSize} is the max for ${uploadMethod} data.`
);
}
};

Expand Down
40 changes: 40 additions & 0 deletions packages/core/src/tools/create-response-stasher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const _ = require('lodash');
const uploader = require('./uploader');
const crypto = require('crypto');

const withRetry = async (fn, retries = 3, delay = 100, attempt = 0) => {
try {
return await fn();
} catch (error) {
if (attempt >= retries) {
throw error;
}

await new Promise((resolve) => setTimeout(resolve, delay));
return withRetry(fn, retries, delay, attempt + 1);
}
};

// responseStasher uploads the data and returns the URL that points to that data.
const stashResponse = async (input, response, size) => {
const rpc = _.get(input, '_zapier.rpc');

if (!rpc) {
throw new Error('rpc is not available');
}
const signedPostData = await rpc('get_presigned_upload_post_data');
return withRetry(
_.partial(
uploader,
signedPostData,
response.toString(), // accept JSON string to send to uploader.
size,
crypto.randomUUID() + '.txt',
'text/plain'
)
);
};

module.exports = stashResponse;
69 changes: 69 additions & 0 deletions packages/core/src/tools/uploader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
const path = require('path');

const FormData = require('form-data');
const contentDisposition = require('content-disposition');

const request = require('./request-client-internal');
const LENGTH_ERR_MESSAGE =
'We could not calculate the length of your file - please ' +
'pass a knownLength like z.stashFile(f, knownLength)';

const uploader = async (
signedPostData,
bufferStringStream,
knownLength,
filename,
contentType
) => {
filename = path.basename(filename).replace('"', '');

const fields = {
...signedPostData.fields,
'Content-Disposition': contentDisposition(filename),
'Content-Type': contentType,
};

const form = new FormData();

Object.entries(fields).forEach(([key, value]) => {
form.append(key, value);
});

form.append('file', bufferStringStream, {
knownLength,
contentType,
filename,
});

// Try to catch the missing length early, before upload to S3 fails.
try {
form.getLengthSync();
} catch (err) {
throw new Error(LENGTH_ERR_MESSAGE);
}

// Send to S3 with presigned request.
const response = await request({
url: signedPostData.url,
method: 'POST',
body: form,
});

if (response.status === 204) {
return new URL(signedPostData.fields.key, signedPostData.url).href;
}

if (
response.content &&
response.content.includes &&
response.content.includes(
'You must provide the Content-Length HTTP header.'
)
) {
throw new Error(LENGTH_ERR_MESSAGE);
}

throw new Error(`Got ${response.status} - ${response.content}`);
};

module.exports = uploader;
111 changes: 110 additions & 1 deletion packages/core/test/app-middleware.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
'use strict';

const createApp = require('../src/create-app');
const createInput = require('../src/tools/create-input');
const dataTools = require('../src/tools/data');
const {
makeRpc,
mockRpcGetPresignedPostCall,
mockUpload,
} = require('./tools/mocky');
const exampleAppDefinition = require('./userapp');

describe('app middleware', () => {
Expand Down Expand Up @@ -65,4 +69,109 @@ describe('app middleware', () => {
})
.catch(done);
});

describe('large-response-cacher', async () => {
it('after middleware should stash large payloads', async () => {
const rpc = makeRpc();
mockRpcGetPresignedPostCall('1234/foo.json');
mockUpload();

const appDefinition = dataTools.deepCopy(exampleAppDefinition);

const app = createApp(appDefinition);

// We are gonna invoke this method, but the after middleware is gonna
// change the result returned to something else
const input = createTestInput(
'resources.really_big_response.list.operation.perform',
appDefinition
);
input._zapier.rpc = rpc;

// set the payload autostash limit
input._zapier.event.autostashPayloadOutputLimit = 11 * 1024 * 1024;

const output = await app(input);
output.resultsUrl.should.eql('https://s3-fake.zapier.com/1234/foo.json');
});
it('should not stash if payload is bigger than autostash limit', async () => {
const rpc = makeRpc();
mockRpcGetPresignedPostCall('1234/foo.json');
mockUpload();

const appDefinition = dataTools.deepCopy(exampleAppDefinition);

const app = createApp(appDefinition);

// returns 10mb of response
const input = createTestInput(
'resources.really_big_response.list.operation.perform',
appDefinition
);
input._zapier.rpc = rpc;

// set the payload autostash limit
// this limit is lower than res, so do not stash, let it fail
input._zapier.event.autostashPayloadOutputLimit = 8 * 1024 * 1024;

const output = app(input);
output.should.not.have.property('resultsUrl');
});
it('should always stash if autostash limit is -1', async () => {
const rpc = makeRpc();
mockRpcGetPresignedPostCall('1234/foo.json');
mockUpload();

const appDefinition = dataTools.deepCopy(exampleAppDefinition);

const app = createApp(appDefinition);

// returns regular response
const input = createTestInput(
'resources.list.list.operation.perform',
appDefinition
);
input._zapier.rpc = rpc;

// set the payload autostash limit
// this limit is lower than res, so do not stash, let it fail
input._zapier.event.autostashPayloadOutputLimit = -1;

const output = await app(input);
output.resultsUrl.should.eql('https://s3-fake.zapier.com/1234/foo.json');
});
it('should not stash if limit is not defined', async () => {
const rpc = makeRpc();
mockRpcGetPresignedPostCall('1234/foo.json');
mockUpload();

const appDefinition = dataTools.deepCopy(exampleAppDefinition);

const app = createApp(appDefinition);

// returns regular response
const input = createTestInput(
'resources.list.list.operation.perform',
appDefinition
);
input._zapier.rpc = rpc;

// omit setting the payload autostash limit

const output = app(input);
output.should.not.have.property('resultsUrl');

// returns 10mb regular response
const bigInputCall = createTestInput(
'resources.really_big_response.list.operation.perform',
appDefinition
);
input._zapier.rpc = rpc;

// omit setting the payload autostash limit

const bigOutput = app(bigInputCall);
bigOutput.should.not.have.property('resultsUrl');
});
});
});
Loading

0 comments on commit 151dadf

Please sign in to comment.