From c7f99b1b78b5f267703d31c89fc8e9ddb9792508 Mon Sep 17 00:00:00 2001 From: "lukasz.wach" Date: Fri, 17 Jan 2025 19:07:57 +0100 Subject: [PATCH 1/8] #25873 add internal throttle to importer --- commands/importer.js | 1 + src/flotiq-api.js | 29 ++++++++++++++++++++++++----- tests/commands/importer.test.js | 2 +- tests/flotiq-api.test.js | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 6 deletions(-) create mode 100644 tests/flotiq-api.test.js diff --git a/commands/importer.js b/commands/importer.js index 6ec205a..ff69443 100644 --- a/commands/importer.js +++ b/commands/importer.js @@ -429,6 +429,7 @@ async function handler(argv) { const flotiqApi = new FlotiqApi(`${config.apiUrl}/api/v1`, argv.flotiqApiKey, { batchSize: 100, + internalWPSLimit: 10 }); let [featuredImages, CTDs] = await importer( diff --git a/src/flotiq-api.js b/src/flotiq-api.js index ee2ee48..8d8ad3e 100644 --- a/src/flotiq-api.js +++ b/src/flotiq-api.js @@ -20,6 +20,7 @@ module.exports = class FlotiqApi { this.flotiqApiKey = flotiqApiKey; this.batchSizeRead = options.batchSizeRead || options.batchSize || 1000; this.batchSize = options.batchSize || 100; + this.internalWPSLimit = options.internalWPSLimit || 10; this.headers = { "Content-type": "application/json;charset=utf-8", @@ -115,17 +116,35 @@ module.exports = class FlotiqApi { async persistContentObjectBatch(ctd, obj) { assert(typeof ctd, 'string'); assert(Array.isArray(obj)); + const interval = 1000 / this.internalWPSLimit; const bar = new ProgressBar(`Persisting ${ctd} [:bar] :percent ETA :etas`, { total: obj.length }); const uri = `/content/${ctd}/batch?updateExisting=true`; for (let i = 0; i < obj.length; i += this.batchSize) { const batch = obj.slice(i, i + this.batchSize); - await this.middleware.post(uri, batch).catch(e =>{ - console.dir(e.response.data.errors, { depth: undefined }) - throw new Error(e.message); - }); - bar.tick(this.batchSize) + + const sendRequest = async () => { + try { + await this.middleware.post(uri, batch); + } catch (e) { + if (e.response && e.response.status === 429) { + logger.info(`\nReceived status 429 (Too Many Requests), retrying in 1 second...`); + await new Promise(resolve => setTimeout(resolve, 1000)); + return sendRequest(); + } else { + console.dir(e.response.data.errors, { depth: undefined }); + throw new Error(e.message); + } + } + }; + + await sendRequest(); + bar.tick(this.batchSize); + + if (i + this.batchSize < obj.length) { + await new Promise(resolve => setTimeout(resolve, interval)); + } } } diff --git a/tests/commands/importer.test.js b/tests/commands/importer.test.js index bb27407..3362a25 100644 --- a/tests/commands/importer.test.js +++ b/tests/commands/importer.test.js @@ -78,4 +78,4 @@ describe('importer', () => { expect(FlotiqApi).toHaveBeenCalledWith('https://dummy-api.flotiq.com/api/v1', mockApiKey, expect.any(Object)); }); -}); \ No newline at end of file +}); diff --git a/tests/flotiq-api.test.js b/tests/flotiq-api.test.js new file mode 100644 index 0000000..b15c242 --- /dev/null +++ b/tests/flotiq-api.test.js @@ -0,0 +1,33 @@ +const axios = require('axios'); +const FlotiqApi = require('./../src/flotiq-api'); + +jest.mock('axios'); + +describe('FlotiqApi', () => { + const mockApiUrl = 'https://dummy-api.flotiq.com'; + const mockApiKey = 'dummyApiKey'; + + it('method persistContentObjectBatch should retry when receiving a 429 status', async () => { + // Mock first response from Axios as 429, seconds as 200 + const postMock = jest.fn() + .mockRejectedValueOnce({ response: { status: 429 } }) + .mockResolvedValueOnce({ ok: true }); + + axios.create.mockReturnValue({ + post: postMock, + }); + + const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { + batchSize: 100, + internalWPSLimit: 5, + }); + + const obj = new Array(100).fill({}); + await flotiqApi.persistContentObjectBatch('mockContentType', obj); + + // Expect first call to be 429, then after retry: success + expect(postMock).toHaveBeenCalledTimes(2); + expect(postMock).toHaveBeenCalledWith(expect.anything(), expect.arrayContaining([{}])); + }); +}); + \ No newline at end of file From dc073314a7b6f25434a8090a198e0442eb1cb2e1 Mon Sep 17 00:00:00 2001 From: "lukasz.wach" Date: Sat, 18 Jan 2025 00:05:17 +0100 Subject: [PATCH 2/8] #25873 add internal throttle to mediaImporter --- commands/importer.js | 6 +++--- src/media.js | 34 +++++++++++++++++++++++++--------- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/commands/importer.js b/commands/importer.js index ff69443..575aa3e 100644 --- a/commands/importer.js +++ b/commands/importer.js @@ -399,7 +399,7 @@ async function featuredImagesImport(flotiqApi, contentTypeDefinitions, featuredI if (contentTypeDefinition.name === featuredImage.ctdName) { contentTypeDefinition.featuredImage = featuredImage.featuredImage; if (replacements.length) { - await shouldUpdate(contentTypeDefinition, replacements) + await shouldUpdate(contentTypeDefinition, replacements) } let response = await flotiqApi.updateContentTypeDefinition(contentTypeDefinition.name, contentTypeDefinition) .catch((e)=>{return e.response}); @@ -429,7 +429,7 @@ async function handler(argv) { const flotiqApi = new FlotiqApi(`${config.apiUrl}/api/v1`, argv.flotiqApiKey, { batchSize: 100, - internalWPSLimit: 10 + internalWpsLimit: 10 }); let [featuredImages, CTDs] = await importer( @@ -451,7 +451,7 @@ async function handler(argv) { directory, flotiqApi, mediaApi, - featuredImages + internalWpsLimit = 10 ); await featuredImagesImport( diff --git a/src/media.js b/src/media.js index d236b33..02e7530 100644 --- a/src/media.js +++ b/src/media.js @@ -4,7 +4,7 @@ const logger = require('./logger') const { Blob } = require('buffer'); const {readCTDs, shouldUpdate } = require("./util"); -async function mediaImporter (directory, flotiqApi, mediaApi) { +async function mediaImporter (directory, flotiqApi, mediaApi, internalWpsLimit = 10) { const checkIfMediaUsed = true; const flotiqDefinitions = await flotiqApi.fetchContentTypeDefs(); @@ -64,15 +64,9 @@ async function mediaImporter (directory, flotiqApi, mediaApi) { form.append('type', file.type); form.append('file', blob, file.fileName); - const mediaEntity = await mediaApi - .post('', form, { - headers: { - 'Content-Type': 'multipart/form-data', - }, - }) - .then(res => res.data) + const mediaEntity = await postMedia('', form, mediaApi, internalWpsLimit); - replacements.push([file, mediaEntity]) + replacements.push([file, mediaEntity]); } logger.info('Will replace media in content objects') @@ -101,6 +95,28 @@ async function mediaImporter (directory, flotiqApi, mediaApi) { return replacements; } +const postMedia = async (url, form, mediaApi, internalWpsLimit) => { + const interval = 1000 / internalWpsLimit; + + try { + await new Promise(resolve => setTimeout(resolve, interval)); + const response = await mediaApi.post(url, form, { + headers: { 'Content-Type': 'multipart/form-data' } + }); + + return response.data; + } catch (e) { + if (e.response && e.response.status === 429) { + logger.info(`Received 429 on media upload, retrying after 1 second...`); + // Wait for 1 second before retrying + await new Promise(resolve => setTimeout(resolve, 1000)); + return postMedia(url, form, mediaApi, internalWpsLimit); + } else { + throw new Error(e.message); + } + } +} + async function checkIsUsedIn(fileId, objects) { let isUsed = false; for (const relatedContentObject of objects) { From 1a9a80fa1b3b8cc5d08665fd6f832ed7af550fee Mon Sep 17 00:00:00 2001 From: "lukasz.wach" Date: Sat, 18 Jan 2025 00:05:35 +0100 Subject: [PATCH 3/8] #25873 add tests for mediaImporter --- tests/media.test.js | 112 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 tests/media.test.js diff --git a/tests/media.test.js b/tests/media.test.js new file mode 100644 index 0000000..d0e5f4a --- /dev/null +++ b/tests/media.test.js @@ -0,0 +1,112 @@ +const fs = require('fs/promises'); +const fetch = require('node-fetch'); +const axios = require('axios'); +const logger = require('./../src/logger'); +const FlotiqApi = require('./../src/flotiq-api'); +const { mediaImporter } = require('./../src/media'); + +jest.mock('axios'); +jest.mock('fs/promises'); +jest.mock('node-fetch'); +jest.mock('./../src/flotiq-api'); +jest.mock('./../src/logger', () => ({ + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), +})); + +describe('mediaImporter', () => { + const mockDirectory = '/mock/directory'; + const mockApiUrl = 'https://dummy-api.flotiq.com'; + const mockApiKey = 'dummyApiKey'; + + beforeEach(() => { + jest.clearAllMocks(); + + FlotiqApi.mockImplementation(() => ({ + fetchContentTypeDefinition: jest.fn().mockResolvedValue([]), + fetchContentTypeDefs: jest.fn().mockResolvedValue([]), + updateContentTypeDefinition: jest.fn(), + fetchContentObjects: jest.fn().mockResolvedValue([]), + patchContentObjectBatch: jest.fn(), + persistContentObjectBatch: jest.fn(), + createOrUpdate: jest.fn().mockResolvedValue({ + ok: true, + json: jest.fn().mockResolvedValue({success:true}) + }), + middleware: { + put: jest.fn(), + delete: jest.fn().mockResolvedValue(undefined) + } + })); + + global.fetch = jest.fn(() => + Promise.resolve({ + status: 404, // fetch should return 404 for importer to send postMedia request + }) + ); + + // Mock fs.readFile + fs.readFile.mockResolvedValue( + JSON.stringify([ + { + id: 'file1', + url: '/image/0x0/dummy_media_id.jpg', + mimeType: 'image/png', + extension: 'png', + fileName: 'file1.png' + } + ]) + ); + }); + + + + it('should retry on 429 error during media upload', async () => { + const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { + batchSize: 100, + }); + flotiqApi.flotiqApiUrl = mockApiUrl; + + const mockMediaApi = { + post: jest.fn() + .mockRejectedValueOnce({ + response: { + status: 429, + message: 'Too Many Requests' + } + }) + .mockResolvedValueOnce({ + data: { id: 'new-media-id' } + }) + }; + + await mediaImporter(mockDirectory, flotiqApi, mockMediaApi, 1); + + expect(mockMediaApi.post).toHaveBeenCalledTimes(2); + }); + + it('should respect internalWpsLimit and throttle uploads', async () => { + const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { + batchSize: 100, + }); + flotiqApi.flotiqApiUrl = mockApiUrl; + + const mockMediaApi = { + post: jest.fn() + .mockResolvedValueOnce({ + data: { id: 'new-media-id' } + }) + }; + + const start = Date.now(); + await mediaImporter(mockDirectory, flotiqApi, mockMediaApi, 1); // internalWpsLimit = 1 + + const end = Date.now(); + const elapsed = end - start; + + expect(mockMediaApi.post).toHaveBeenCalledTimes(1); + // Check that importer respected throttle limit + expect(elapsed).toBeGreaterThanOrEqual(1000); // at least 1 second + }); +}); From 78549bf86572361bd1a19c998043c2e98baf566e Mon Sep 17 00:00:00 2001 From: "lukasz.wach" Date: Sat, 18 Jan 2025 00:06:07 +0100 Subject: [PATCH 4/8] #25873 add interal throttle to batchpatch --- src/flotiq-api.js | 80 ++++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/src/flotiq-api.js b/src/flotiq-api.js index 8d8ad3e..b33eb41 100644 --- a/src/flotiq-api.js +++ b/src/flotiq-api.js @@ -20,13 +20,15 @@ module.exports = class FlotiqApi { this.flotiqApiKey = flotiqApiKey; this.batchSizeRead = options.batchSizeRead || options.batchSize || 1000; this.batchSize = options.batchSize || 100; - this.internalWPSLimit = options.internalWPSLimit || 10; + this.internalWpsLimit = options.internalWpsLimit || 10; this.headers = { "Content-type": "application/json;charset=utf-8", "X-Auth-Token": this.flotiqApiKey, }; + this.tooManyRequestsMessage = `\nReceived status 429 (Too Many Requests), retrying in 1 second...`; + this.middleware = axios.create({ baseURL: this.flotiqApiUrl, timeout: this.timeout, @@ -116,67 +118,52 @@ module.exports = class FlotiqApi { async persistContentObjectBatch(ctd, obj) { assert(typeof ctd, 'string'); assert(Array.isArray(obj)); - const interval = 1000 / this.internalWPSLimit; + const interval = 1000 / this.internalWpsLimit; const bar = new ProgressBar(`Persisting ${ctd} [:bar] :percent ETA :etas`, { total: obj.length }); const uri = `/content/${ctd}/batch?updateExisting=true`; for (let i = 0; i < obj.length; i += this.batchSize) { const batch = obj.slice(i, i + this.batchSize); + await this._sendRequest(uri, batch, 'POST'); - const sendRequest = async () => { - try { - await this.middleware.post(uri, batch); - } catch (e) { - if (e.response && e.response.status === 429) { - logger.info(`\nReceived status 429 (Too Many Requests), retrying in 1 second...`); - await new Promise(resolve => setTimeout(resolve, 1000)); - return sendRequest(); - } else { - console.dir(e.response.data.errors, { depth: undefined }); - throw new Error(e.message); - } - } - }; - - await sendRequest(); bar.tick(this.batchSize); - if (i + this.batchSize < obj.length) { - await new Promise(resolve => setTimeout(resolve, interval)); - } + await new Promise(resolve => setTimeout(resolve, interval)); } } async patchContentObjectBatch(ctd, obj) { - assert(typeof ctd, 'string'); + assert(typeof ctd === 'string'); assert(Array.isArray(obj)); - + const interval = 1000 / this.internalWpsLimit; + const bar = new ProgressBar(`Updating ${ctd} [:bar] :percent ETA :etas`, { total: obj.length }); const uri = `/content/${ctd}/batch`; - + for (let i = 0; i < obj.length; i += this.batchSize) { const batch = obj.slice(i, i + this.batchSize); - await this.middleware.patch(uri, batch).catch(e =>{ - console.log(e.response.data.errors) - throw new Error(e.message); - }); + await this._sendRequest(uri, batch, 'PATCH'); + bar.tick(this.batchSize); + + await new Promise(resolve => setTimeout(resolve, interval)); } } async deleteContentObjectBatch(ctd, obj) { - assert(typeof ctd, 'string'); + assert(typeof ctd === 'string'); assert(Array.isArray(obj)); - - const uri = `/content/${ctd}/batch-delete` - + const interval = 1000 / this.internalWpsLimit; + + const uri = `/content/${ctd}/batch-delete`; + for (let i = 0; i < obj.length; i += this.batchSize) { - const batch = obj - .slice(i, i + this.batchSize) - .map(item => item.id); - await this.middleware.post(uri, batch) + const batch = obj.slice(i, i + this.batchSize).map(item => item.id); + await this._sendRequest(uri, batch, 'DELETE'); + + await new Promise(resolve => setTimeout(resolve, interval)); } } @@ -305,4 +292,25 @@ module.exports = class FlotiqApi { }) } + async _sendRequest(uri, batch, method) { + try { + switch (method) { + case 'POST': + return await this.middleware.post(uri, batch); + case 'PATCH': + return await this.middleware.patch(uri, batch); + case 'DELETE': + return await this.middleware.post(uri, batch); + } + } catch (e) { + if (e.response && e.response.status === 429) { + logger.info(this.tooManyRequestsMessage); + await new Promise(resolve => setTimeout(resolve, 1000)); // Retry after 1 second + return this._sendRequest(uri, batch, method); // Retry request + } else { + console.dir(e.response.data.errors, { depth: undefined }); + throw new Error(e.message); + } + } + } }; From 5aa882a87a2a1945f66f0cbcc70c9c2e218eaf45 Mon Sep 17 00:00:00 2001 From: "lukasz.wach" Date: Sat, 18 Jan 2025 00:06:17 +0100 Subject: [PATCH 5/8] #25873 add tests for batchpatch throttle --- tests/flotiq-api.test.js | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/tests/flotiq-api.test.js b/tests/flotiq-api.test.js index b15c242..680fcda 100644 --- a/tests/flotiq-api.test.js +++ b/tests/flotiq-api.test.js @@ -19,7 +19,7 @@ describe('FlotiqApi', () => { const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { batchSize: 100, - internalWPSLimit: 5, + internalWpsLimit: 5, }); const obj = new Array(100).fill({}); @@ -29,5 +29,28 @@ describe('FlotiqApi', () => { expect(postMock).toHaveBeenCalledTimes(2); expect(postMock).toHaveBeenCalledWith(expect.anything(), expect.arrayContaining([{}])); }); + + it('method patchContentObjectBatch should retry when receiving a 429 status', async () => { + // Mock first response from Axios as 429, seconds as 200 + const patchMock = jest.fn() + .mockRejectedValueOnce({ response: { status: 429 } }) + .mockResolvedValueOnce({ ok: true }); + + axios.create.mockReturnValue({ + patch: patchMock, + }); + + const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { + batchSize: 100, + internalWpsLimit: 5, + }); + + const obj = new Array(100).fill({}); + await flotiqApi.patchContentObjectBatch('mockContentType', obj); + + // Expect first call to be 429, then after retry: success + expect(patchMock).toHaveBeenCalledTimes(2); + expect(patchMock).toHaveBeenCalledWith(expect.anything(), expect.arrayContaining([{}])); + }); }); \ No newline at end of file From 7806db4749a3d1dcb1b5f8f92e6dba344e6ae5ab Mon Sep 17 00:00:00 2001 From: "lukasz.wach" Date: Sat, 18 Jan 2025 00:10:35 +0100 Subject: [PATCH 6/8] #25873 remove some code smells --- tests/flotiq-api.test.js | 1 - tests/media.test.js | 3 --- 2 files changed, 4 deletions(-) diff --git a/tests/flotiq-api.test.js b/tests/flotiq-api.test.js index 680fcda..3cc9503 100644 --- a/tests/flotiq-api.test.js +++ b/tests/flotiq-api.test.js @@ -53,4 +53,3 @@ describe('FlotiqApi', () => { expect(patchMock).toHaveBeenCalledWith(expect.anything(), expect.arrayContaining([{}])); }); }); - \ No newline at end of file diff --git a/tests/media.test.js b/tests/media.test.js index d0e5f4a..1de356a 100644 --- a/tests/media.test.js +++ b/tests/media.test.js @@ -1,7 +1,4 @@ const fs = require('fs/promises'); -const fetch = require('node-fetch'); -const axios = require('axios'); -const logger = require('./../src/logger'); const FlotiqApi = require('./../src/flotiq-api'); const { mediaImporter } = require('./../src/media'); From ecdccdaafa03cd164ee2be99a826b5a99e517784 Mon Sep 17 00:00:00 2001 From: "lukasz.wach" Date: Sat, 18 Jan 2025 00:33:58 +0100 Subject: [PATCH 7/8] remove trailing space --- src/flotiq-api.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flotiq-api.js b/src/flotiq-api.js index b33eb41..591732a 100644 --- a/src/flotiq-api.js +++ b/src/flotiq-api.js @@ -141,7 +141,7 @@ module.exports = class FlotiqApi { const bar = new ProgressBar(`Updating ${ctd} [:bar] :percent ETA :etas`, { total: obj.length }); const uri = `/content/${ctd}/batch`; - + for (let i = 0; i < obj.length; i += this.batchSize) { const batch = obj.slice(i, i + this.batchSize); await this._sendRequest(uri, batch, 'PATCH'); From 7cfe778f21f7d86cf243faa3e72fd9cb626ed6b6 Mon Sep 17 00:00:00 2001 From: "lukasz.wach" Date: Mon, 27 Jan 2025 10:54:35 +0100 Subject: [PATCH 8/8] #25873 fix cr issues --- commands/importer.js | 4 +-- src/flotiq-api.js | 71 ++++++++++++++++------------------------ src/media.js | 10 +++--- tests/flotiq-api.test.js | 4 +-- tests/media.test.js | 4 +-- 5 files changed, 39 insertions(+), 54 deletions(-) diff --git a/commands/importer.js b/commands/importer.js index 575aa3e..fb4a02e 100644 --- a/commands/importer.js +++ b/commands/importer.js @@ -429,7 +429,7 @@ async function handler(argv) { const flotiqApi = new FlotiqApi(`${config.apiUrl}/api/v1`, argv.flotiqApiKey, { batchSize: 100, - internalWpsLimit: 10 + writePerSecondLimit: 10 }); let [featuredImages, CTDs] = await importer( @@ -451,7 +451,7 @@ async function handler(argv) { directory, flotiqApi, mediaApi, - internalWpsLimit = 10 + writePerSecondLimit = 10 ); await featuredImagesImport( diff --git a/src/flotiq-api.js b/src/flotiq-api.js index 591732a..887e4af 100644 --- a/src/flotiq-api.js +++ b/src/flotiq-api.js @@ -20,7 +20,7 @@ module.exports = class FlotiqApi { this.flotiqApiKey = flotiqApiKey; this.batchSizeRead = options.batchSizeRead || options.batchSize || 1000; this.batchSize = options.batchSize || 100; - this.internalWpsLimit = options.internalWpsLimit || 10; + this.interval = 1000 / (options.writePerSecondLimit || 10); this.headers = { "Content-type": "application/json;charset=utf-8", @@ -118,53 +118,31 @@ module.exports = class FlotiqApi { async persistContentObjectBatch(ctd, obj) { assert(typeof ctd, 'string'); assert(Array.isArray(obj)); - const interval = 1000 / this.internalWpsLimit; const bar = new ProgressBar(`Persisting ${ctd} [:bar] :percent ETA :etas`, { total: obj.length }); const uri = `/content/${ctd}/batch?updateExisting=true`; - for (let i = 0; i < obj.length; i += this.batchSize) { - const batch = obj.slice(i, i + this.batchSize); - await this._sendRequest(uri, batch, 'POST'); - - bar.tick(this.batchSize); - - await new Promise(resolve => setTimeout(resolve, interval)); - } + await this._sendRequest(uri, obj, 'POST', bar); } async patchContentObjectBatch(ctd, obj) { assert(typeof ctd === 'string'); assert(Array.isArray(obj)); - const interval = 1000 / this.internalWpsLimit; const bar = new ProgressBar(`Updating ${ctd} [:bar] :percent ETA :etas`, { total: obj.length }); const uri = `/content/${ctd}/batch`; - for (let i = 0; i < obj.length; i += this.batchSize) { - const batch = obj.slice(i, i + this.batchSize); - await this._sendRequest(uri, batch, 'PATCH'); - - bar.tick(this.batchSize); - - await new Promise(resolve => setTimeout(resolve, interval)); - } + await this._sendRequest(uri, obj, 'PATCH', bar); } async deleteContentObjectBatch(ctd, obj) { assert(typeof ctd === 'string'); assert(Array.isArray(obj)); - const interval = 1000 / this.internalWpsLimit; const uri = `/content/${ctd}/batch-delete`; - - for (let i = 0; i < obj.length; i += this.batchSize) { - const batch = obj.slice(i, i + this.batchSize).map(item => item.id); - await this._sendRequest(uri, batch, 'DELETE'); - await new Promise(resolve => setTimeout(resolve, interval)); - } + await this._sendRequest(uri, obj, 'DELETE'); } @@ -292,25 +270,32 @@ module.exports = class FlotiqApi { }) } - async _sendRequest(uri, batch, method) { - try { - switch (method) { - case 'POST': - return await this.middleware.post(uri, batch); - case 'PATCH': - return await this.middleware.patch(uri, batch); - case 'DELETE': - return await this.middleware.post(uri, batch); + async _sendRequest(uri, obj, method, bar) { + for (let i = 0; i < obj.length; i += this.batchSize) { + const batch = obj.slice(i, i + this.batchSize); + const actions = { + POST: async () => await this.middleware.post(uri, batch), + PATCH: async () => await this.middleware.patch(uri, batch), + DELETE: async () => await this.middleware.post(uri, batch), + }; + + try { + await actions[method](); + } catch (e) { + if (e.response && e.response.status === 429) { + logger.info(this.tooManyRequestsMessage); + await new Promise(resolve => setTimeout(resolve, 1000)); // Retry after 1 second + return this._sendRequest(uri, batch, method); // Retry request + } else { + console.dir(e.response.data.errors, { depth: undefined }); + throw new Error(e.message); + } } - } catch (e) { - if (e.response && e.response.status === 429) { - logger.info(this.tooManyRequestsMessage); - await new Promise(resolve => setTimeout(resolve, 1000)); // Retry after 1 second - return this._sendRequest(uri, batch, method); // Retry request - } else { - console.dir(e.response.data.errors, { depth: undefined }); - throw new Error(e.message); + if (bar) { + bar.tick(this.batchSize); } + + await new Promise(resolve => setTimeout(resolve, this.interval)); } } }; diff --git a/src/media.js b/src/media.js index 02e7530..ba80925 100644 --- a/src/media.js +++ b/src/media.js @@ -4,7 +4,7 @@ const logger = require('./logger') const { Blob } = require('buffer'); const {readCTDs, shouldUpdate } = require("./util"); -async function mediaImporter (directory, flotiqApi, mediaApi, internalWpsLimit = 10) { +async function mediaImporter (directory, flotiqApi, mediaApi, writePerSecondLimit = 10) { const checkIfMediaUsed = true; const flotiqDefinitions = await flotiqApi.fetchContentTypeDefs(); @@ -64,7 +64,7 @@ async function mediaImporter (directory, flotiqApi, mediaApi, internalWpsLimit = form.append('type', file.type); form.append('file', blob, file.fileName); - const mediaEntity = await postMedia('', form, mediaApi, internalWpsLimit); + const mediaEntity = await postMedia('', form, mediaApi, writePerSecondLimit); replacements.push([file, mediaEntity]); } @@ -95,8 +95,8 @@ async function mediaImporter (directory, flotiqApi, mediaApi, internalWpsLimit = return replacements; } -const postMedia = async (url, form, mediaApi, internalWpsLimit) => { - const interval = 1000 / internalWpsLimit; +const postMedia = async (url, form, mediaApi, writePerSecondLimit) => { + const interval = 1000 / writePerSecondLimit; try { await new Promise(resolve => setTimeout(resolve, interval)); @@ -110,7 +110,7 @@ const postMedia = async (url, form, mediaApi, internalWpsLimit) => { logger.info(`Received 429 on media upload, retrying after 1 second...`); // Wait for 1 second before retrying await new Promise(resolve => setTimeout(resolve, 1000)); - return postMedia(url, form, mediaApi, internalWpsLimit); + return postMedia(url, form, mediaApi, writePerSecondLimit); } else { throw new Error(e.message); } diff --git a/tests/flotiq-api.test.js b/tests/flotiq-api.test.js index 3cc9503..d9ff271 100644 --- a/tests/flotiq-api.test.js +++ b/tests/flotiq-api.test.js @@ -19,7 +19,7 @@ describe('FlotiqApi', () => { const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { batchSize: 100, - internalWpsLimit: 5, + writePerSecondLimit: 5, }); const obj = new Array(100).fill({}); @@ -42,7 +42,7 @@ describe('FlotiqApi', () => { const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { batchSize: 100, - internalWpsLimit: 5, + writePerSecondLimit: 5, }); const obj = new Array(100).fill({}); diff --git a/tests/media.test.js b/tests/media.test.js index 1de356a..082b2fa 100644 --- a/tests/media.test.js +++ b/tests/media.test.js @@ -83,7 +83,7 @@ describe('mediaImporter', () => { expect(mockMediaApi.post).toHaveBeenCalledTimes(2); }); - it('should respect internalWpsLimit and throttle uploads', async () => { + it('should respect writePerSecondLimit and throttle uploads', async () => { const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { batchSize: 100, }); @@ -97,7 +97,7 @@ describe('mediaImporter', () => { }; const start = Date.now(); - await mediaImporter(mockDirectory, flotiqApi, mockMediaApi, 1); // internalWpsLimit = 1 + await mediaImporter(mockDirectory, flotiqApi, mockMediaApi, 1); // writePerSecondLimit = 1 const end = Date.now(); const elapsed = end - start;