From 9216631704ee9471932b79c48543c04de1458c07 Mon Sep 17 00:00:00 2001 From: Vjacheslav Trushkin Date: Mon, 27 Nov 2023 11:19:17 +0200 Subject: [PATCH] chore: add onfail to concurrent queries --- @iconify/tools/src/download/api/queue.ts | 140 ++++++++++++++---- @iconify/tools/src/import/figma/query.ts | 119 +++++++++------ @iconify/tools/tests/misc/concurrency-test.ts | 13 +- 3 files changed, 192 insertions(+), 80 deletions(-) diff --git a/@iconify/tools/src/download/api/queue.ts b/@iconify/tools/src/download/api/queue.ts index a92b551..da08eea 100644 --- a/@iconify/tools/src/download/api/queue.ts +++ b/@iconify/tools/src/download/api/queue.ts @@ -1,47 +1,81 @@ /** - * Concurrent queries limit + * Default parameters */ -let queriesLimit = 5; +export const defaultQueueParams: ConcurrentQueriesCommonParams = { + limit: 5, + retries: 3, +}; /** - * Concurrent queries retries count + * Callback to get query */ -let maxRetries = 3; +export type GetConcurrentQueryCallback = (index: number) => Promise; /** - * Set concurrent queries default limit + * Parameters */ -export function setConcurrentQueriesDefaultLimit(value: number) { - queriesLimit = value; +export interface ConcurrentQueriesCommonParams { + // Number of queries to run at the same time + limit?: number; + + // Number of retries to attempt + retries?: number; + + // Callback to run when a Promise fails, contains error message and index of item that caused error + // + // If callback is not present, runConcurrentQueries() throws an error. + // + // If callback is present, runConcurrentQueries() will not throw an error, it will call + // callback instead, which should throw an error if runConcurrentQueries() should be aborted + onfail?: ( + index: number, + error: unknown, + params: ConcurrentQueriesParams + ) => void | Promise; } -/** - * Set concurrent queries default retries count - */ -export function setConcurrentQueriesDefaultRetries(value: number) { - maxRetries = value; +export interface ConcurrentQueriesParamsWithCount + extends ConcurrentQueriesCommonParams { + total: number; + callback: (index: number) => Promise; } -/** - * Callback to get query - */ -export type GetConcurrentQueryCallback = (index: number) => Promise; +export interface ConcurrentQueriesParamsWithPromises + extends ConcurrentQueriesCommonParams { + promises: Promise[]; +} + +export type ConcurrentQueriesParams = + | ConcurrentQueriesParamsWithCount + | ConcurrentQueriesParamsWithPromises; /** * Runs concurrent async operations */ export function runConcurrentQueries( - count: number, - callback: GetConcurrentQueryCallback, - limit = 0, - retries = 0 + params: ConcurrentQueriesParams ): Promise { - // Set limit and retries count - limit = Math.max(1, Math.min(limit || queriesLimit, count)); - retries = Math.max(1, retries || maxRetries); + // Merge with defaults + const allParams = { + ...defaultQueueParams, + ...params, + } as ConcurrentQueriesParams; + + // Get type of params + const paramsWithCount = allParams as ConcurrentQueriesParamsWithCount; + const paramsWithPromises = + allParams as ConcurrentQueriesParamsWithPromises; + const isCallback = typeof paramsWithCount.total === 'number'; + + // Get options + const count = isCallback + ? paramsWithCount.total + : paramsWithPromises.promises.length; + const limit = Math.max(1, Math.min(allParams.limit || 1, count)); + const retries = Math.max(1, allParams.retries || 1); // Results - const results: T[] = Array(count).fill(null as unknown as T); + const results: T[] = Array(count).fill(undefined as unknown as T); // Queue let nextIndex = 0; @@ -53,27 +87,80 @@ export function runConcurrentQueries( // Function to call after item is resolved function resolvedItem() { if (rejected || resolved) { + // Already resolved/rejected promise return; } if (!resolving.size && nextIndex > count) { + // All items done resolved = true; resolve(results); return; } if (resolving.size < limit && nextIndex <= count) { + // More items in queue startNext(); } } + // Item failed all retries + function fail(index: number, err: unknown) { + function done(failed: boolean) { + // Done: reject or continue to next item + resolving.delete(index); + if (failed) { + rejected = true; + reject(err); + } else { + resolvedItem(); + } + } + + // check for callback + if (allParams.onfail) { + let retry: void | Promise; + try { + retry = allParams.onfail(index, err, params); + } catch (err2) { + // Callback threw error: use that error + err = err2; + done(true); + return; + } + + if (retry instanceof Promise) { + // Callback returned a Promise: wait for it + retry + .then(() => { + done(false); + }) + .catch((err2) => { + // Callback threw error: use that error + err = err2; + done(true); + }); + return; + } + + // Nothing happened: continue + done(false); + } else { + // No callback + done(true); + } + } + // Run item function run(index: number, retry: number) { // Mark as resolving resolving.add(index); // Get promise and run it - const p = callback(index); + const p = isCallback + ? paramsWithCount.callback(index) + : paramsWithPromises.promises[index]; + p.then((value) => { resolving.delete(index); results[index] = value; @@ -85,8 +172,7 @@ export function runConcurrentQueries( run(index, retry + 1); }); } else if (!rejected) { - rejected = true; - reject(err); + fail(index, err); } }); } diff --git a/@iconify/tools/src/import/figma/query.ts b/@iconify/tools/src/import/figma/query.ts index 75d9060..8dd1e37 100644 --- a/@iconify/tools/src/import/figma/query.ts +++ b/@iconify/tools/src/import/figma/query.ts @@ -4,7 +4,10 @@ import { clearAPICache, getAPICache, } from '../../download/api/cache'; -import { runConcurrentQueries } from '../../download/api/queue'; +import { + ConcurrentQueriesParamsWithCount, + runConcurrentQueries, +} from '../../download/api/queue'; import type { APICacheOptions, APIQueryParams } from '../../download/api/types'; import type { DocumentNotModified } from '../../download/types/modified'; import type { @@ -19,6 +22,26 @@ import type { } from './types/options'; import type { FigmaIconNode, FigmaNodesImportResult } from './types/result'; +/** + * Extra parameters added to runConcurrentQueries() + * + * Can be used to identify failed items in onfail callback + */ +interface FigmaIconNodeWithURL extends FigmaIconNode { + url: string; +} + +export type FigmaConcurrentQueriesParamsFunction = + | 'figmaImagesQuery' + | 'figmaDownloadImages'; + +export interface FigmaConcurrentQueriesParams< + T extends FigmaConcurrentQueriesParamsFunction +> { + function: T; + payload: T extends 'figmaImagesQuery' ? string[][] : FigmaIconNodeWithURL[]; +} + /** * Compare last modified dates */ @@ -181,9 +204,6 @@ export async function figmaImagesQuery( const maxLength = 2048 - uri.length; const svgOptions = options.svgOptions || {}; - let lastError: number | undefined; - let found = 0; - // Send query const query = (ids: string[]): Promise => { return new Promise((resolve, reject) => { @@ -254,12 +274,24 @@ export async function figmaImagesQuery( } // Get data - const results = await runConcurrentQueries(queue.length, (index) => - query(queue[index]) - ); + const queryParams: ConcurrentQueriesParamsWithCount & + FigmaConcurrentQueriesParams<'figmaImagesQuery'> = { + // Params + total: queue.length, + callback: (index) => query(queue[index]), + // Payload to identify failed items in onfail callback + function: 'figmaImagesQuery', + payload: queue, + }; + const results = await runConcurrentQueries(queryParams); // Parse data + let found = 0; results.forEach((data) => { + if (!data) { + // skip + return; + } const images = data.images; for (const id in images) { const node = nodes.icons[id]; @@ -273,15 +305,7 @@ export async function figmaImagesQuery( // Validate results if (!found) { - if (lastError) { - throw new Error( - `Error retrieving image data from API${ - lastError ? ': ' + lastError.toString() : '' - }` - ); - } else { - throw new Error('No valid icon layers were found'); - } + throw new Error('No valid icon layers were found'); } nodes.generatedIconsCount = found; return nodes; @@ -299,45 +323,46 @@ export async function figmaDownloadImages( let count = 0; // Filter data - interface FigmaIconNodeWithURL extends FigmaIconNode { - url: string; - } - const filtered = Object.create(null) as Record< - string, - FigmaIconNodeWithURL - >; + const filtered: FigmaIconNodeWithURL[] = []; for (let i = 0; i < ids.length; i++) { const id = ids[i]; const item = icons[id]; if (item.url) { - filtered[id] = item as FigmaIconNodeWithURL; + filtered.push(item as FigmaIconNodeWithURL); } } - const keys = Object.keys(filtered); // Download everything - await runConcurrentQueries(keys.length, (index) => { - return new Promise((resolve, reject) => { - const id = keys[index]; - const item = filtered[id]; - sendAPIQuery( - { - uri: item.url, - }, - cache - ) - .then((data) => { - if (typeof data === 'string') { - count++; - item.content = data; - resolve(true); - } else { - reject(data); - } - }) - .catch(reject); - }); - }); + const params: ConcurrentQueriesParamsWithCount & + FigmaConcurrentQueriesParams<'figmaDownloadImages'> = { + // Params + total: filtered.length, + callback: (index) => { + return new Promise((resolve, reject) => { + const item = filtered[index]; + sendAPIQuery( + { + uri: item.url, + }, + cache + ) + .then((data) => { + if (typeof data === 'string') { + count++; + item.content = data; + resolve(undefined); + } else { + reject(data); + } + }) + .catch(reject); + }); + }, + // Payload to identify failed items in onfail callback + function: 'figmaDownloadImages', + payload: filtered, + }; + await runConcurrentQueries(params); // Make sure something was downloaded if (!count) { diff --git a/@iconify/tools/tests/misc/concurrency-test.ts b/@iconify/tools/tests/misc/concurrency-test.ts index 71a9038..74c0b83 100644 --- a/@iconify/tools/tests/misc/concurrency-test.ts +++ b/@iconify/tools/tests/misc/concurrency-test.ts @@ -13,9 +13,10 @@ describe('Testing concurrency', () => { const callbacks: Set = new Set(); const resolved: Set = new Set(); - const result = await runConcurrentQueries( - keys.length, - (index) => { + const result = await runConcurrentQueries({ + total: keys.length, + + callback: (index) => { expect(callbacks.has(index)).toBeFalsy(); expect(resolved.has(index)).toBeFalsy(); @@ -28,7 +29,7 @@ describe('Testing concurrency', () => { expect(Array.from(resolved)).toEqual([0]); } - const key = keys[index] as keyof typeof tests; + const key = keys[index]; const delay = tests[key]; return new Promise((resolve) => { setTimeout(() => { @@ -37,8 +38,8 @@ describe('Testing concurrency', () => { }, delay); }); }, - 3 - ); + limit: 3, + }); expect(result).toEqual(Object.values(tests)); });