Skip to content

Commit

Permalink
Merge pull request #94 from flotiq/feature/25873-add-internal-throttl…
Browse files Browse the repository at this point in the history
…e-to-cli-importer

#25873 add internal throttle to importer
  • Loading branch information
KarolNet authored Jan 27, 2025
2 parents 31ec6d1 + 7cfe778 commit 5ad5e9f
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 39 deletions.
5 changes: 3 additions & 2 deletions commands/importer.js
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ async function featuredImagesImport(flotiqApi, contentTypeDefinitions, featuredI
if (contentTypeDefinition.name === featuredImage.ctdName) {
contentTypeDefinition.featuredImage = featuredImage.featuredImage;
if (replacements.length) {
await shouldUpdate(contentTypeDefinition, replacements)
await shouldUpdate(contentTypeDefinition, replacements)
}
let response = await flotiqApi.updateContentTypeDefinition(contentTypeDefinition.name, contentTypeDefinition)
.catch((e)=>{return e.response});
Expand Down Expand Up @@ -429,6 +429,7 @@ async function handler(argv) {

const flotiqApi = new FlotiqApi(`${config.apiUrl}/api/v1`, argv.flotiqApiKey, {
batchSize: 100,
writePerSecondLimit: 10
});

let [featuredImages, CTDs] = await importer(
Expand All @@ -450,7 +451,7 @@ async function handler(argv) {
directory,
flotiqApi,
mediaApi,
featuredImages
writePerSecondLimit = 10
);

await featuredImagesImport(
Expand Down
66 changes: 39 additions & 27 deletions src/flotiq-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ module.exports = class FlotiqApi {
this.flotiqApiKey = flotiqApiKey;
this.batchSizeRead = options.batchSizeRead || options.batchSize || 1000;
this.batchSize = options.batchSize || 100;
this.interval = 1000 / (options.writePerSecondLimit || 10);

this.headers = {
"Content-type": "application/json;charset=utf-8",
"X-Auth-Token": this.flotiqApiKey,
};

this.tooManyRequestsMessage = `\nReceived status 429 (Too Many Requests), retrying in 1 second...`;

this.middleware = axios.create({
baseURL: this.flotiqApiUrl,
timeout: this.timeout,
Expand Down Expand Up @@ -119,46 +122,27 @@ module.exports = class FlotiqApi {
const bar = new ProgressBar(`Persisting ${ctd} [:bar] :percent ETA :etas`, { total: obj.length });
const uri = `/content/${ctd}/batch?updateExisting=true`;

for (let i = 0; i < obj.length; i += this.batchSize) {
const batch = obj.slice(i, i + this.batchSize);
await this.middleware.post(uri, batch).catch(e =>{
console.dir(e.response.data.errors, { depth: undefined })
throw new Error(e.message);
});
bar.tick(this.batchSize)
}
await this._sendRequest(uri, obj, 'POST', bar);
}


async patchContentObjectBatch(ctd, obj) {
assert(typeof ctd, 'string');
assert(typeof ctd === 'string');
assert(Array.isArray(obj));

const bar = new ProgressBar(`Updating ${ctd} [:bar] :percent ETA :etas`, { total: obj.length });
const uri = `/content/${ctd}/batch`;

for (let i = 0; i < obj.length; i += this.batchSize) {
const batch = obj.slice(i, i + this.batchSize);
await this.middleware.patch(uri, batch).catch(e =>{
console.log(e.response.data.errors)
throw new Error(e.message);
});
bar.tick(this.batchSize);
}
await this._sendRequest(uri, obj, 'PATCH', bar);
}

async deleteContentObjectBatch(ctd, obj) {
assert(typeof ctd, 'string');
assert(typeof ctd === 'string');
assert(Array.isArray(obj));

const uri = `/content/${ctd}/batch-delete`;

const uri = `/content/${ctd}/batch-delete`

for (let i = 0; i < obj.length; i += this.batchSize) {
const batch = obj
.slice(i, i + this.batchSize)
.map(item => item.id);
await this.middleware.post(uri, batch)
}
await this._sendRequest(uri, obj, 'DELETE');
}


Expand Down Expand Up @@ -286,4 +270,32 @@ module.exports = class FlotiqApi {
})
}

async _sendRequest(uri, obj, method, bar) {
for (let i = 0; i < obj.length; i += this.batchSize) {
const batch = obj.slice(i, i + this.batchSize);
const actions = {
POST: async () => await this.middleware.post(uri, batch),
PATCH: async () => await this.middleware.patch(uri, batch),
DELETE: async () => await this.middleware.post(uri, batch),
};

try {
await actions[method]();
} catch (e) {
if (e.response && e.response.status === 429) {
logger.info(this.tooManyRequestsMessage);
await new Promise(resolve => setTimeout(resolve, 1000)); // Retry after 1 second
return this._sendRequest(uri, batch, method); // Retry request
} else {
console.dir(e.response.data.errors, { depth: undefined });
throw new Error(e.message);
}
}
if (bar) {
bar.tick(this.batchSize);
}

await new Promise(resolve => setTimeout(resolve, this.interval));
}
}
};
34 changes: 25 additions & 9 deletions src/media.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const logger = require('./logger')
const { Blob } = require('buffer');
const {readCTDs, shouldUpdate } = require("./util");

async function mediaImporter (directory, flotiqApi, mediaApi) {
async function mediaImporter (directory, flotiqApi, mediaApi, writePerSecondLimit = 10) {
const checkIfMediaUsed = true;

const flotiqDefinitions = await flotiqApi.fetchContentTypeDefs();
Expand Down Expand Up @@ -64,15 +64,9 @@ async function mediaImporter (directory, flotiqApi, mediaApi) {
form.append('type', file.type);
form.append('file', blob, file.fileName);

const mediaEntity = await mediaApi
.post('', form, {
headers: {
'Content-Type': 'multipart/form-data',
},
})
.then(res => res.data)
const mediaEntity = await postMedia('', form, mediaApi, writePerSecondLimit);

replacements.push([file, mediaEntity])
replacements.push([file, mediaEntity]);
}

logger.info('Will replace media in content objects')
Expand Down Expand Up @@ -101,6 +95,28 @@ async function mediaImporter (directory, flotiqApi, mediaApi) {
return replacements;
}

const postMedia = async (url, form, mediaApi, writePerSecondLimit) => {
const interval = 1000 / writePerSecondLimit;

try {
await new Promise(resolve => setTimeout(resolve, interval));
const response = await mediaApi.post(url, form, {
headers: { 'Content-Type': 'multipart/form-data' }
});

return response.data;
} catch (e) {
if (e.response && e.response.status === 429) {
logger.info(`Received 429 on media upload, retrying after 1 second...`);
// Wait for 1 second before retrying
await new Promise(resolve => setTimeout(resolve, 1000));
return postMedia(url, form, mediaApi, writePerSecondLimit);
} else {
throw new Error(e.message);
}
}
}

async function checkIsUsedIn(fileId, objects) {
let isUsed = false;
for (const relatedContentObject of objects) {
Expand Down
2 changes: 1 addition & 1 deletion tests/commands/importer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ describe('importer', () => {

expect(FlotiqApi).toHaveBeenCalledWith('https://dummy-api.flotiq.com/api/v1', mockApiKey, expect.any(Object));
});
});
});
55 changes: 55 additions & 0 deletions tests/flotiq-api.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
const axios = require('axios');
const FlotiqApi = require('./../src/flotiq-api');

jest.mock('axios');

describe('FlotiqApi', () => {
const mockApiUrl = 'https://dummy-api.flotiq.com';
const mockApiKey = 'dummyApiKey';

it('method persistContentObjectBatch should retry when receiving a 429 status', async () => {
// Mock first response from Axios as 429, seconds as 200
const postMock = jest.fn()
.mockRejectedValueOnce({ response: { status: 429 } })
.mockResolvedValueOnce({ ok: true });

axios.create.mockReturnValue({
post: postMock,
});

const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, {
batchSize: 100,
writePerSecondLimit: 5,
});

const obj = new Array(100).fill({});
await flotiqApi.persistContentObjectBatch('mockContentType', obj);

// Expect first call to be 429, then after retry: success
expect(postMock).toHaveBeenCalledTimes(2);
expect(postMock).toHaveBeenCalledWith(expect.anything(), expect.arrayContaining([{}]));
});

it('method patchContentObjectBatch should retry when receiving a 429 status', async () => {
// Mock first response from Axios as 429, seconds as 200
const patchMock = jest.fn()
.mockRejectedValueOnce({ response: { status: 429 } })
.mockResolvedValueOnce({ ok: true });

axios.create.mockReturnValue({
patch: patchMock,
});

const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, {
batchSize: 100,
writePerSecondLimit: 5,
});

const obj = new Array(100).fill({});
await flotiqApi.patchContentObjectBatch('mockContentType', obj);

// Expect first call to be 429, then after retry: success
expect(patchMock).toHaveBeenCalledTimes(2);
expect(patchMock).toHaveBeenCalledWith(expect.anything(), expect.arrayContaining([{}]));
});
});
109 changes: 109 additions & 0 deletions tests/media.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
const fs = require('fs/promises');
const FlotiqApi = require('./../src/flotiq-api');
const { mediaImporter } = require('./../src/media');

jest.mock('axios');
jest.mock('fs/promises');
jest.mock('node-fetch');
jest.mock('./../src/flotiq-api');
jest.mock('./../src/logger', () => ({
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
}));

describe('mediaImporter', () => {
const mockDirectory = '/mock/directory';
const mockApiUrl = 'https://dummy-api.flotiq.com';
const mockApiKey = 'dummyApiKey';

beforeEach(() => {
jest.clearAllMocks();

FlotiqApi.mockImplementation(() => ({
fetchContentTypeDefinition: jest.fn().mockResolvedValue([]),
fetchContentTypeDefs: jest.fn().mockResolvedValue([]),
updateContentTypeDefinition: jest.fn(),
fetchContentObjects: jest.fn().mockResolvedValue([]),
patchContentObjectBatch: jest.fn(),
persistContentObjectBatch: jest.fn(),
createOrUpdate: jest.fn().mockResolvedValue({
ok: true,
json: jest.fn().mockResolvedValue({success:true})
}),
middleware: {
put: jest.fn(),
delete: jest.fn().mockResolvedValue(undefined)
}
}));

global.fetch = jest.fn(() =>
Promise.resolve({
status: 404, // fetch should return 404 for importer to send postMedia request
})
);

// Mock fs.readFile
fs.readFile.mockResolvedValue(
JSON.stringify([
{
id: 'file1',
url: '/image/0x0/dummy_media_id.jpg',
mimeType: 'image/png',
extension: 'png',
fileName: 'file1.png'
}
])
);
});



it('should retry on 429 error during media upload', async () => {
const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, {
batchSize: 100,
});
flotiqApi.flotiqApiUrl = mockApiUrl;

const mockMediaApi = {
post: jest.fn()
.mockRejectedValueOnce({
response: {
status: 429,
message: 'Too Many Requests'
}
})
.mockResolvedValueOnce({
data: { id: 'new-media-id' }
})
};

await mediaImporter(mockDirectory, flotiqApi, mockMediaApi, 1);

expect(mockMediaApi.post).toHaveBeenCalledTimes(2);
});

it('should respect writePerSecondLimit and throttle uploads', async () => {
const flotiqApi = new FlotiqApi(`${mockApiUrl}/api/v1`, mockApiKey, {
batchSize: 100,
});
flotiqApi.flotiqApiUrl = mockApiUrl;

const mockMediaApi = {
post: jest.fn()
.mockResolvedValueOnce({
data: { id: 'new-media-id' }
})
};

const start = Date.now();
await mediaImporter(mockDirectory, flotiqApi, mockMediaApi, 1); // writePerSecondLimit = 1

const end = Date.now();
const elapsed = end - start;

expect(mockMediaApi.post).toHaveBeenCalledTimes(1);
// Check that importer respected throttle limit
expect(elapsed).toBeGreaterThanOrEqual(1000); // at least 1 second
});
});

0 comments on commit 5ad5e9f

Please sign in to comment.