diff --git a/commands/importer.js b/commands/importer.js index 6ec205a..fb4a02e 100644 --- a/commands/importer.js +++ b/commands/importer.js @@ -399,7 +399,7 @@ async function featuredImagesImport(flotiqApi, contentTypeDefinitions, featuredI if (contentTypeDefinition.name === featuredImage.ctdName) { contentTypeDefinition.featuredImage = featuredImage.featuredImage; if (replacements.length) { - await shouldUpdate(contentTypeDefinition, replacements) + await shouldUpdate(contentTypeDefinition, replacements) } let response = await flotiqApi.updateContentTypeDefinition(contentTypeDefinition.name, contentTypeDefinition) .catch((e)=>{return e.response}); @@ -429,6 +429,7 @@ async function handler(argv) { const flotiqApi = new FlotiqApi(`${config.apiUrl}/api/v1`, argv.flotiqApiKey, { batchSize: 100, + writePerSecondLimit: 10 }); let [featuredImages, CTDs] = await importer( @@ -450,7 +451,7 @@ async function handler(argv) { directory, flotiqApi, mediaApi, - featuredImages + writePerSecondLimit = 10 ); await featuredImagesImport( diff --git a/src/flotiq-api.js b/src/flotiq-api.js index ee2ee48..887e4af 100644 --- a/src/flotiq-api.js +++ b/src/flotiq-api.js @@ -20,12 +20,15 @@ module.exports = class FlotiqApi { this.flotiqApiKey = flotiqApiKey; this.batchSizeRead = options.batchSizeRead || options.batchSize || 1000; this.batchSize = options.batchSize || 100; + this.interval = 1000 / (options.writePerSecondLimit || 10); this.headers = { "Content-type": "application/json;charset=utf-8", "X-Auth-Token": this.flotiqApiKey, }; + this.tooManyRequestsMessage = `\nReceived status 429 (Too Many Requests), retrying in 1 second...`; + this.middleware = axios.create({ baseURL: this.flotiqApiUrl, timeout: this.timeout, @@ -119,46 +122,27 @@ module.exports = class FlotiqApi { const bar = new ProgressBar(`Persisting ${ctd} [:bar] :percent ETA :etas`, { total: obj.length }); const uri = `/content/${ctd}/batch?updateExisting=true`; - for (let i = 0; i < obj.length; i += this.batchSize) { - const batch = obj.slice(i, i + this.batchSize); - await this.middleware.post(uri, batch).catch(e =>{ - console.dir(e.response.data.errors, { depth: undefined }) - throw new Error(e.message); - }); - bar.tick(this.batchSize) - } + await this._sendRequest(uri, obj, 'POST', bar); } async patchContentObjectBatch(ctd, obj) { - assert(typeof ctd, 'string'); + assert(typeof ctd === 'string'); assert(Array.isArray(obj)); - + const bar = new ProgressBar(`Updating ${ctd} [:bar] :percent ETA :etas`, { total: obj.length }); const uri = `/content/${ctd}/batch`; - for (let i = 0; i < obj.length; i += this.batchSize) { - const batch = obj.slice(i, i + this.batchSize); - await this.middleware.patch(uri, batch).catch(e =>{ - console.log(e.response.data.errors) - throw new Error(e.message); - }); - bar.tick(this.batchSize); - } + await this._sendRequest(uri, obj, 'PATCH', bar); } async deleteContentObjectBatch(ctd, obj) { - assert(typeof ctd, 'string'); + assert(typeof ctd === 'string'); assert(Array.isArray(obj)); + + const uri = `/content/${ctd}/batch-delete`; - const uri = `/content/${ctd}/batch-delete` - - for (let i = 0; i < obj.length; i += this.batchSize) { - const batch = obj - .slice(i, i + this.batchSize) - .map(item => item.id); - await this.middleware.post(uri, batch) - } + await this._sendRequest(uri, obj, 'DELETE'); } @@ -286,4 +270,32 @@ module.exports = class FlotiqApi { }) } + async _sendRequest(uri, obj, method, bar) { + for (let i = 0; i < obj.length; i += this.batchSize) { + const batch = obj.slice(i, i + this.batchSize); + const actions = { + POST: async () => await this.middleware.post(uri, batch), + PATCH: async () => await this.middleware.patch(uri, batch), + DELETE: async () => await this.middleware.post(uri, batch), + }; + + try { + await actions[method](); + } catch (e) { + if (e.response && e.response.status === 429) { + logger.info(this.tooManyRequestsMessage); + await new Promise(resolve => setTimeout(resolve, 1000)); // Retry after 1 second + return this._sendRequest(uri, batch, method); // Retry request + } else { + console.dir(e.response.data.errors, { depth: undefined }); + throw new Error(e.message); + } + } + if (bar) { + bar.tick(this.batchSize); + } + + await new Promise(resolve => setTimeout(resolve, this.interval)); + } + } }; diff --git a/src/media.js b/src/media.js index d236b33..ba80925 100644 --- a/src/media.js +++ b/src/media.js @@ -4,7 +4,7 @@ const logger = require('./logger') const { Blob } = require('buffer'); const {readCTDs, shouldUpdate } = require("./util"); -async function mediaImporter (directory, flotiqApi, mediaApi) { +async function mediaImporter (directory, flotiqApi, mediaApi, writePerSecondLimit = 10) { const checkIfMediaUsed = true; const flotiqDefinitions = await flotiqApi.fetchContentTypeDefs(); @@ -64,15 +64,9 @@ async function mediaImporter (directory, flotiqApi, mediaApi) { form.append('type', file.type); form.append('file', blob, file.fileName); - const mediaEntity = await mediaApi - .post('', form, { - headers: { - 'Content-Type': 'multipart/form-data', - }, - }) - .then(res => res.data) + const mediaEntity = await postMedia('', form, mediaApi, writePerSecondLimit); - replacements.push([file, mediaEntity]) + replacements.push([file, mediaEntity]); } logger.info('Will replace media in content objects') @@ -101,6 +95,28 @@ async function mediaImporter (directory, flotiqApi, mediaApi) { return replacements; } +const postMedia = async (url, form, mediaApi, writePerSecondLimit) => { + const interval = 1000 / writePerSecondLimit; + + try { + await new Promise(resolve => setTimeout(resolve, interval)); + const response = await mediaApi.post(url, form, { + headers: { 'Content-Type': 'multipart/form-data' } + }); + + return response.data; + } catch (e) { + if (e.response && e.response.status === 429) { + logger.info(`Received 429 on media upload, retrying after 1 second...`); + // Wait for 1 second before retrying + await new Promise(resolve => setTimeout(resolve, 1000)); + return postMedia(url, form, mediaApi, writePerSecondLimit); + } else { + throw new Error(e.message); + } + } +} + async function checkIsUsedIn(fileId, objects) { let isUsed = false; for (const relatedContentObject of objects) { diff --git a/tests/commands/importer.test.js b/tests/commands/importer.test.js index bb27407..3362a25 100644 --- a/tests/commands/importer.test.js +++ b/tests/commands/importer.test.js @@ -78,4 +78,4 @@ describe('importer', () => { expect(FlotiqApi).toHaveBeenCalledWith('https://dummy-api.flotiq.com/api/v1', mockApiKey, expect.any(Object)); }); -}); \ No newline at end of file +}); diff --git a/tests/flotiq-api.test.js b/tests/flotiq-api.test.js new file mode 100644 index 0000000..d9ff271 --- /dev/null +++ b/tests/flotiq-api.test.js @@ -0,0 +1,55 @@ +const axios = require('axios'); +const FlotiqApi = require('./../src/flotiq-api'); + +jest.mock('axios'); + +describe('FlotiqApi', () => { + const mockApiUrl = 'https://dummy-api.flotiq.com'; + const mockApiKey = 'dummyApiKey'; + + it('method persistContentObjectBatch should retry when receiving a 429 status', async () => { + // Mock first response from Axios as 429, seconds as 200 + const postMock = jest.fn() + .mockRejectedValueOnce({ response: { status: 429 } }) + .mockResolvedValueOnce({ ok: true }); + + axios.create.mockReturnValue({ + post: postMock, + }); + + const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { + batchSize: 100, + writePerSecondLimit: 5, + }); + + const obj = new Array(100).fill({}); + await flotiqApi.persistContentObjectBatch('mockContentType', obj); + + // Expect first call to be 429, then after retry: success + expect(postMock).toHaveBeenCalledTimes(2); + expect(postMock).toHaveBeenCalledWith(expect.anything(), expect.arrayContaining([{}])); + }); + + it('method patchContentObjectBatch should retry when receiving a 429 status', async () => { + // Mock first response from Axios as 429, seconds as 200 + const patchMock = jest.fn() + .mockRejectedValueOnce({ response: { status: 429 } }) + .mockResolvedValueOnce({ ok: true }); + + axios.create.mockReturnValue({ + patch: patchMock, + }); + + const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { + batchSize: 100, + writePerSecondLimit: 5, + }); + + const obj = new Array(100).fill({}); + await flotiqApi.patchContentObjectBatch('mockContentType', obj); + + // Expect first call to be 429, then after retry: success + expect(patchMock).toHaveBeenCalledTimes(2); + expect(patchMock).toHaveBeenCalledWith(expect.anything(), expect.arrayContaining([{}])); + }); +}); diff --git a/tests/media.test.js b/tests/media.test.js new file mode 100644 index 0000000..082b2fa --- /dev/null +++ b/tests/media.test.js @@ -0,0 +1,109 @@ +const fs = require('fs/promises'); +const FlotiqApi = require('./../src/flotiq-api'); +const { mediaImporter } = require('./../src/media'); + +jest.mock('axios'); +jest.mock('fs/promises'); +jest.mock('node-fetch'); +jest.mock('./../src/flotiq-api'); +jest.mock('./../src/logger', () => ({ + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), +})); + +describe('mediaImporter', () => { + const mockDirectory = '/mock/directory'; + const mockApiUrl = 'https://dummy-api.flotiq.com'; + const mockApiKey = 'dummyApiKey'; + + beforeEach(() => { + jest.clearAllMocks(); + + FlotiqApi.mockImplementation(() => ({ + fetchContentTypeDefinition: jest.fn().mockResolvedValue([]), + fetchContentTypeDefs: jest.fn().mockResolvedValue([]), + updateContentTypeDefinition: jest.fn(), + fetchContentObjects: jest.fn().mockResolvedValue([]), + patchContentObjectBatch: jest.fn(), + persistContentObjectBatch: jest.fn(), + createOrUpdate: jest.fn().mockResolvedValue({ + ok: true, + json: jest.fn().mockResolvedValue({success:true}) + }), + middleware: { + put: jest.fn(), + delete: jest.fn().mockResolvedValue(undefined) + } + })); + + global.fetch = jest.fn(() => + Promise.resolve({ + status: 404, // fetch should return 404 for importer to send postMedia request + }) + ); + + // Mock fs.readFile + fs.readFile.mockResolvedValue( + JSON.stringify([ + { + id: 'file1', + url: '/image/0x0/dummy_media_id.jpg', + mimeType: 'image/png', + extension: 'png', + fileName: 'file1.png' + } + ]) + ); + }); + + + + it('should retry on 429 error during media upload', async () => { + const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { + batchSize: 100, + }); + flotiqApi.flotiqApiUrl = mockApiUrl; + + const mockMediaApi = { + post: jest.fn() + .mockRejectedValueOnce({ + response: { + status: 429, + message: 'Too Many Requests' + } + }) + .mockResolvedValueOnce({ + data: { id: 'new-media-id' } + }) + }; + + await mediaImporter(mockDirectory, flotiqApi, mockMediaApi, 1); + + expect(mockMediaApi.post).toHaveBeenCalledTimes(2); + }); + + it('should respect writePerSecondLimit and throttle uploads', async () => { + const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, { + batchSize: 100, + }); + flotiqApi.flotiqApiUrl = mockApiUrl; + + const mockMediaApi = { + post: jest.fn() + .mockResolvedValueOnce({ + data: { id: 'new-media-id' } + }) + }; + + const start = Date.now(); + await mediaImporter(mockDirectory, flotiqApi, mockMediaApi, 1); // writePerSecondLimit = 1 + + const end = Date.now(); + const elapsed = end - start; + + expect(mockMediaApi.post).toHaveBeenCalledTimes(1); + // Check that importer respected throttle limit + expect(elapsed).toBeGreaterThanOrEqual(1000); // at least 1 second + }); +});