diff --git a/packages/core/src/app-middlewares/after/large-response-cacher.js b/packages/core/src/app-middlewares/after/large-response-cacher.js index 63d40930c..db4cf60c7 100644 --- a/packages/core/src/app-middlewares/after/large-response-cacher.js +++ b/packages/core/src/app-middlewares/after/large-response-cacher.js @@ -2,19 +2,28 @@ const constants = require('../../constants'); const cleaner = require('../../tools/cleaner'); +const responseStasher = require('../../tools/create-response-stasher'); -/* - TODO: Some services _do not_ enjoy having 6mb+ responses returned, so - we may need to user/request pre-signed S3 URLs (or somewhere else?) - to stash the large response, and return a pointer. -*/ -const largeResponseCachePointer = (output) => { - const size = JSON.stringify(cleaner.maskOutput(output)).length; - if (size > constants.RESPONSE_SIZE_LIMIT) { - console.log( - `Oh no! Payload is ${size}, which is larger than ${constants.RESPONSE_SIZE_LIMIT}.` - ); - // TODO: use envelope feature and to build RPC to get signed S3 upload URL. +const largeResponseCachePointer = async (output) => { + const response = cleaner.maskOutput(output); + + const autostashLimit = output.input._zapier.event.autostashPayloadOutputLimit; + + const payload = JSON.stringify(response.results); + const size = payload.length; + + // If autostash limit is defined, and is within the range, stash the response + // If it is -1, stash the response regardless of size + // If the limit is defined and is out of range, let lambda deal with it + if ( + (autostashLimit && + size >= constants.RESPONSE_SIZE_LIMIT && + size <= autostashLimit) || + autostashLimit === -1 + ) { + const url = await responseStasher(output.input, payload, size); + output.resultsUrl = url; + output.results = Array.isArray(output.results) ? [] : {}; } return output; }; diff --git a/packages/core/src/tools/cleaner.js b/packages/core/src/tools/cleaner.js index 67a5e9c8d..b2b79c7ed 100644 --- a/packages/core/src/tools/cleaner.js +++ b/packages/core/src/tools/cleaner.js @@ -130,7 +130,8 @@ const createBundleBank = (appRaw, event = {}, serializeFunc = (x) => x) => { }, {}); }; -const maskOutput = (output) => _.pick(output, 'results', 'status'); +const maskOutput = (output) => + _.pick(output, 'results', 'status', 'resultsUrl'); // These normalize functions are called after the initial before middleware that // cleans the request. The reason is that we need to know why a value is empty diff --git a/packages/core/src/tools/create-file-stasher.js b/packages/core/src/tools/create-file-stasher.js index 4db31100d..f67b9d8a5 100644 --- a/packages/core/src/tools/create-file-stasher.js +++ b/packages/core/src/tools/create-file-stasher.js @@ -9,15 +9,11 @@ const { randomBytes } = require('crypto'); const _ = require('lodash'); const contentDisposition = require('content-disposition'); -const FormData = require('form-data'); + const mime = require('mime-types'); -const request = require('./request-client-internal'); const { UPLOAD_MAX_SIZE, NON_STREAM_UPLOAD_MAX_SIZE } = require('../constants'); - -const LENGTH_ERR_MESSAGE = - 'We could not calculate the length of your file - please ' + - 'pass a knownLength like z.stashFile(f, knownLength)'; +const uploader = require('./uploader'); const DEFAULT_FILE_NAME = 'unnamedfile'; const DEFAULT_CONTENT_TYPE = 'application/octet-stream'; @@ -175,64 +171,6 @@ const resolveToBufferStringStream = async (responseOrData) => { ); }; -const uploader = async ( - signedPostData, - bufferStringStream, - knownLength, - filename, - contentType -) => { - filename = path.basename(filename).replace('"', ''); - - const fields = { - ...signedPostData.fields, - 'Content-Disposition': contentDisposition(filename), - 'Content-Type': contentType, - }; - - const form = new FormData(); - - Object.entries(fields).forEach(([key, value]) => { - form.append(key, value); - }); - - form.append('file', bufferStringStream, { - knownLength, - contentType, - filename, - }); - - // Try to catch the missing length early, before upload to S3 fails. - try { - form.getLengthSync(); - } catch (err) { - throw new Error(LENGTH_ERR_MESSAGE); - } - - // Send to S3 with presigned request. - const response = await request({ - url: signedPostData.url, - method: 'POST', - body: form, - }); - - if (response.status === 204) { - return new URL(signedPostData.fields.key, signedPostData.url).href; - } - - if ( - response.content && - response.content.includes && - response.content.includes( - 'You must provide the Content-Length HTTP header.' - ) - ) { - throw new Error(LENGTH_ERR_MESSAGE); - } - - throw new Error(`Got ${response.status} - ${response.content}`); -}; - const ensureUploadMaxSizeNotExceeded = (streamOrData, length) => { let uploadMaxSize = NON_STREAM_UPLOAD_MAX_SIZE; let uploadMethod = 'non-streaming'; @@ -242,7 +180,9 @@ const ensureUploadMaxSizeNotExceeded = (streamOrData, length) => { } if (length && length > uploadMaxSize) { - throw new Error(`${length} bytes is too big, ${uploadMaxSize} is the max for ${uploadMethod} data.`); + throw new Error( + `${length} bytes is too big, ${uploadMaxSize} is the max for ${uploadMethod} data.` + ); } }; diff --git a/packages/core/src/tools/create-response-stasher.js b/packages/core/src/tools/create-response-stasher.js new file mode 100644 index 000000000..93782126d --- /dev/null +++ b/packages/core/src/tools/create-response-stasher.js @@ -0,0 +1,40 @@ +'use strict'; + +const _ = require('lodash'); +const uploader = require('./uploader'); +const crypto = require('crypto'); + +const withRetry = async (fn, retries = 3, delay = 100, attempt = 0) => { + try { + return await fn(); + } catch (error) { + if (attempt >= retries) { + throw error; + } + + await new Promise((resolve) => setTimeout(resolve, delay)); + return withRetry(fn, retries, delay, attempt + 1); + } +}; + +// responseStasher uploads the data and returns the URL that points to that data. +const stashResponse = async (input, response, size) => { + const rpc = _.get(input, '_zapier.rpc'); + + if (!rpc) { + throw new Error('rpc is not available'); + } + const signedPostData = await rpc('get_presigned_upload_post_data'); + return withRetry( + _.partial( + uploader, + signedPostData, + response.toString(), // accept JSON string to send to uploader. + size, + crypto.randomUUID() + '.txt', + 'text/plain' + ) + ); +}; + +module.exports = stashResponse; diff --git a/packages/core/src/tools/uploader.js b/packages/core/src/tools/uploader.js new file mode 100644 index 000000000..32d7d9117 --- /dev/null +++ b/packages/core/src/tools/uploader.js @@ -0,0 +1,69 @@ +const path = require('path'); + +const FormData = require('form-data'); +const contentDisposition = require('content-disposition'); + +const request = require('./request-client-internal'); +const LENGTH_ERR_MESSAGE = + 'We could not calculate the length of your file - please ' + + 'pass a knownLength like z.stashFile(f, knownLength)'; + +const uploader = async ( + signedPostData, + bufferStringStream, + knownLength, + filename, + contentType +) => { + filename = path.basename(filename).replace('"', ''); + + const fields = { + ...signedPostData.fields, + 'Content-Disposition': contentDisposition(filename), + 'Content-Type': contentType, + }; + + const form = new FormData(); + + Object.entries(fields).forEach(([key, value]) => { + form.append(key, value); + }); + + form.append('file', bufferStringStream, { + knownLength, + contentType, + filename, + }); + + // Try to catch the missing length early, before upload to S3 fails. + try { + form.getLengthSync(); + } catch (err) { + throw new Error(LENGTH_ERR_MESSAGE); + } + + // Send to S3 with presigned request. + const response = await request({ + url: signedPostData.url, + method: 'POST', + body: form, + }); + + if (response.status === 204) { + return new URL(signedPostData.fields.key, signedPostData.url).href; + } + + if ( + response.content && + response.content.includes && + response.content.includes( + 'You must provide the Content-Length HTTP header.' + ) + ) { + throw new Error(LENGTH_ERR_MESSAGE); + } + + throw new Error(`Got ${response.status} - ${response.content}`); +}; + +module.exports = uploader; diff --git a/packages/core/test/app-middleware.js b/packages/core/test/app-middleware.js index 82a81e231..5f7071076 100644 --- a/packages/core/test/app-middleware.js +++ b/packages/core/test/app-middleware.js @@ -1,8 +1,12 @@ 'use strict'; - const createApp = require('../src/create-app'); const createInput = require('../src/tools/create-input'); const dataTools = require('../src/tools/data'); +const { + makeRpc, + mockRpcGetPresignedPostCall, + mockUpload, +} = require('./tools/mocky'); const exampleAppDefinition = require('./userapp'); describe('app middleware', () => { @@ -65,4 +69,109 @@ describe('app middleware', () => { }) .catch(done); }); + + describe('large-response-cacher', async () => { + it('after middleware should stash large payloads', async () => { + const rpc = makeRpc(); + mockRpcGetPresignedPostCall('1234/foo.json'); + mockUpload(); + + const appDefinition = dataTools.deepCopy(exampleAppDefinition); + + const app = createApp(appDefinition); + + // We are gonna invoke this method, but the after middleware is gonna + // change the result returned to something else + const input = createTestInput( + 'resources.really_big_response.list.operation.perform', + appDefinition + ); + input._zapier.rpc = rpc; + + // set the payload autostash limit + input._zapier.event.autostashPayloadOutputLimit = 11 * 1024 * 1024; + + const output = await app(input); + output.resultsUrl.should.eql('https://s3-fake.zapier.com/1234/foo.json'); + }); + it('should not stash if payload is bigger than autostash limit', async () => { + const rpc = makeRpc(); + mockRpcGetPresignedPostCall('1234/foo.json'); + mockUpload(); + + const appDefinition = dataTools.deepCopy(exampleAppDefinition); + + const app = createApp(appDefinition); + + // returns 10mb of response + const input = createTestInput( + 'resources.really_big_response.list.operation.perform', + appDefinition + ); + input._zapier.rpc = rpc; + + // set the payload autostash limit + // this limit is lower than res, so do not stash, let it fail + input._zapier.event.autostashPayloadOutputLimit = 8 * 1024 * 1024; + + const output = app(input); + output.should.not.have.property('resultsUrl'); + }); + it('should always stash if autostash limit is -1', async () => { + const rpc = makeRpc(); + mockRpcGetPresignedPostCall('1234/foo.json'); + mockUpload(); + + const appDefinition = dataTools.deepCopy(exampleAppDefinition); + + const app = createApp(appDefinition); + + // returns regular response + const input = createTestInput( + 'resources.list.list.operation.perform', + appDefinition + ); + input._zapier.rpc = rpc; + + // set the payload autostash limit + // this limit is lower than res, so do not stash, let it fail + input._zapier.event.autostashPayloadOutputLimit = -1; + + const output = await app(input); + output.resultsUrl.should.eql('https://s3-fake.zapier.com/1234/foo.json'); + }); + it('should not stash if limit is not defined', async () => { + const rpc = makeRpc(); + mockRpcGetPresignedPostCall('1234/foo.json'); + mockUpload(); + + const appDefinition = dataTools.deepCopy(exampleAppDefinition); + + const app = createApp(appDefinition); + + // returns regular response + const input = createTestInput( + 'resources.list.list.operation.perform', + appDefinition + ); + input._zapier.rpc = rpc; + + // omit setting the payload autostash limit + + const output = app(input); + output.should.not.have.property('resultsUrl'); + + // returns 10mb regular response + const bigInputCall = createTestInput( + 'resources.really_big_response.list.operation.perform', + appDefinition + ); + input._zapier.rpc = rpc; + + // omit setting the payload autostash limit + + const bigOutput = app(bigInputCall); + bigOutput.should.not.have.property('resultsUrl'); + }); + }); }); diff --git a/packages/core/test/tools/response-stasher.js b/packages/core/test/tools/response-stasher.js new file mode 100644 index 000000000..e79eff81f --- /dev/null +++ b/packages/core/test/tools/response-stasher.js @@ -0,0 +1,77 @@ +'use strict'; + +const should = require('should'); + +const { HTTPBIN_URL } = require('../constants'); +const { + FAKE_S3_URL, + makeRpc, + mockRpcGetPresignedPostCall, + mockUpload, +} = require('./mocky'); + +const stashResponse = require('../../src/tools/create-response-stasher'); +const createAppRequestClient = require('../../src/tools/create-app-request-client'); +const createInput = require('../../src/tools/create-input'); +const nock = require('nock'); + +describe('stash response', () => { + const testLogger = () => Promise.resolve({}); + const rpc = makeRpc(); + + const input = createInput({}, {}, testLogger, [], rpc); + + const request = createAppRequestClient(input); + + beforeEach(() => { + nock.cleanAll(); + }); + + it('should upload json response', async () => { + mockRpcGetPresignedPostCall('1234/foo.json'); + mockUpload(); + + const response = await request(`${HTTPBIN_URL}/json`); + const data = JSON.stringify(response.json); + const url = await stashResponse(input, data); + should(url).eql(`${FAKE_S3_URL}/1234/foo.json`); + + const s3Response = await request({ url }); + should(s3Response.getHeader('content-type')).startWith('text/plain'); + + should(s3Response.content).eql(data); + }); + it('retries on failure', async () => { + mockRpcGetPresignedPostCall('1234/foo.json'); + + // Mock fail the first upload, then succeed + nock(FAKE_S3_URL).post('/').reply(500, 'uh oh'); + mockUpload(); + + const response = await request(`${HTTPBIN_URL}/json`); + const data = JSON.stringify(response.json); + const url = await stashResponse(input, data); + should(url).eql(`${FAKE_S3_URL}/1234/foo.json`); + + const s3Response = await request({ url }); + should(s3Response.content).eql(data); + }); + it('throws on persistent failures', async () => { + mockRpcGetPresignedPostCall('1234/foo.json'); + + // Mock fail the first upload, then succeed + nock(FAKE_S3_URL) + .post('/') + .reply(500, 'uh oh') + .post('/') + .reply(500, 'uh oh') + .post('/') + .reply(500, 'uh oh') + .post('/') + .reply(500, 'uh oh'); + + const response = await request(`${HTTPBIN_URL}/json`); + const data = JSON.stringify(response.json); + await stashResponse(input, data).should.be.rejectedWith(/uh oh/); + }); +}); diff --git a/packages/core/test/userapp/index.js b/packages/core/test/userapp/index.js index cee86c212..063e5b151 100644 --- a/packages/core/test/userapp/index.js +++ b/packages/core/test/userapp/index.js @@ -582,6 +582,32 @@ const BadCallback = { }, }; +const createLargeResponse = (targetSizeInMB) => { + const targetSize = targetSizeInMB * 1024 * 1024; // Convert MB to bytes + const sampleData = { + id: 1, + data: 'a'.repeat(targetSize), + }; + return sampleData; +}; + +// 10mb of data +const ReallyBigResponse = { + key: 'really_big_response', + noun: 'Really Big Response', + list: { + display: { + label: 'Really Big Response', + description: 'This is a really big response.', + }, + operation: { + perform: (z, bundle) => { + return createLargeResponse(10); + }, + }, + }, +}; + // custom HTTP middlewares ///// /* @@ -623,6 +649,7 @@ const App = { [ExecuteCallbackRequest.key]: ExecuteCallbackRequest, [EnvironmentVariable.key]: EnvironmentVariable, [BadCallback.key]: BadCallback, + [ReallyBigResponse.key]: ReallyBigResponse, }, hydrators: { getBigStuff: () => {},