Skip to content

Commit

Permalink
#25873 add internal throttle to importer
Browse files Browse the repository at this point in the history
  • Loading branch information
WHLukasz committed Jan 17, 2025
1 parent 31ec6d1 commit c7f99b1
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 6 deletions.
1 change: 1 addition & 0 deletions commands/importer.js
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ async function handler(argv) {

const flotiqApi = new FlotiqApi(`${config.apiUrl}/api/v1`, argv.flotiqApiKey, {
batchSize: 100,
internalWPSLimit: 10
});

let [featuredImages, CTDs] = await importer(
Expand Down
29 changes: 24 additions & 5 deletions src/flotiq-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module.exports = class FlotiqApi {
this.flotiqApiKey = flotiqApiKey;
this.batchSizeRead = options.batchSizeRead || options.batchSize || 1000;
this.batchSize = options.batchSize || 100;
this.internalWPSLimit = options.internalWPSLimit || 10;

this.headers = {
"Content-type": "application/json;charset=utf-8",
Expand Down Expand Up @@ -115,17 +116,35 @@ module.exports = class FlotiqApi {
async persistContentObjectBatch(ctd, obj) {
assert(typeof ctd, 'string');
assert(Array.isArray(obj));
const interval = 1000 / this.internalWPSLimit;

const bar = new ProgressBar(`Persisting ${ctd} [:bar] :percent ETA :etas`, { total: obj.length });
const uri = `/content/${ctd}/batch?updateExisting=true`;

for (let i = 0; i < obj.length; i += this.batchSize) {
const batch = obj.slice(i, i + this.batchSize);
await this.middleware.post(uri, batch).catch(e =>{
console.dir(e.response.data.errors, { depth: undefined })
throw new Error(e.message);
});
bar.tick(this.batchSize)

const sendRequest = async () => {
try {
await this.middleware.post(uri, batch);
} catch (e) {
if (e.response && e.response.status === 429) {
logger.info(`\nReceived status 429 (Too Many Requests), retrying in 1 second...`);
await new Promise(resolve => setTimeout(resolve, 1000));
return sendRequest();
} else {
console.dir(e.response.data.errors, { depth: undefined });
throw new Error(e.message);
}
}
};

await sendRequest();
bar.tick(this.batchSize);

if (i + this.batchSize < obj.length) {
await new Promise(resolve => setTimeout(resolve, interval));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/commands/importer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ describe('importer', () => {

expect(FlotiqApi).toHaveBeenCalledWith('https://dummy-api.flotiq.com/api/v1', mockApiKey, expect.any(Object));
});
});
});
33 changes: 33 additions & 0 deletions tests/flotiq-api.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const axios = require('axios');
const FlotiqApi = require('./../src/flotiq-api');

jest.mock('axios');

describe('FlotiqApi', () => {
const mockApiUrl = 'https://dummy-api.flotiq.com';
const mockApiKey = 'dummyApiKey';

it('method persistContentObjectBatch should retry when receiving a 429 status', async () => {
// Mock first response from Axios as 429, seconds as 200
const postMock = jest.fn()
.mockRejectedValueOnce({ response: { status: 429 } })
.mockResolvedValueOnce({ ok: true });

axios.create.mockReturnValue({
post: postMock,
});

const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, {
batchSize: 100,
internalWPSLimit: 5,
});

const obj = new Array(100).fill({});
await flotiqApi.persistContentObjectBatch('mockContentType', obj);

// Expect first call to be 429, then after retry: success
expect(postMock).toHaveBeenCalledTimes(2);
expect(postMock).toHaveBeenCalledWith(expect.anything(), expect.arrayContaining([{}]));
});
});

0 comments on commit c7f99b1

Please sign in to comment.