Skip to content

Commit

Permalink
chore: add onfail to concurrent queries
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberalien committed Nov 27, 2023
1 parent 76cdeac commit 9216631
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 80 deletions.
140 changes: 113 additions & 27 deletions @iconify/tools/src/download/api/queue.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,81 @@
/**
* Concurrent queries limit
* Default parameters
*/
let queriesLimit = 5;
export const defaultQueueParams: ConcurrentQueriesCommonParams<unknown> = {
limit: 5,
retries: 3,
};

/**
* Concurrent queries retries count
* Callback to get query
*/
let maxRetries = 3;
export type GetConcurrentQueryCallback<T> = (index: number) => Promise<T>;

/**
* Set concurrent queries default limit
* Parameters
*/
export function setConcurrentQueriesDefaultLimit(value: number) {
queriesLimit = value;
export interface ConcurrentQueriesCommonParams<T> {
// Number of queries to run at the same time
limit?: number;

// Number of retries to attempt
retries?: number;

// Callback to run when a Promise fails, contains error message and index of item that caused error
//
// If callback is not present, runConcurrentQueries() throws an error.
//
// If callback is present, runConcurrentQueries() will not throw an error, it will call
// callback instead, which should throw an error if runConcurrentQueries() should be aborted
onfail?: (
index: number,
error: unknown,
params: ConcurrentQueriesParams<T>
) => void | Promise<void>;
}

/**
* Set concurrent queries default retries count
*/
export function setConcurrentQueriesDefaultRetries(value: number) {
maxRetries = value;
export interface ConcurrentQueriesParamsWithCount<T>
extends ConcurrentQueriesCommonParams<T> {
total: number;
callback: (index: number) => Promise<T>;
}

/**
* Callback to get query
*/
export type GetConcurrentQueryCallback<T> = (index: number) => Promise<T>;
export interface ConcurrentQueriesParamsWithPromises<T>
extends ConcurrentQueriesCommonParams<T> {
promises: Promise<T>[];
}

export type ConcurrentQueriesParams<T> =
| ConcurrentQueriesParamsWithCount<T>
| ConcurrentQueriesParamsWithPromises<T>;

/**
* Runs concurrent async operations
*/
export function runConcurrentQueries<T>(
count: number,
callback: GetConcurrentQueryCallback<T>,
limit = 0,
retries = 0
params: ConcurrentQueriesParams<T>
): Promise<T[]> {
// Set limit and retries count
limit = Math.max(1, Math.min(limit || queriesLimit, count));
retries = Math.max(1, retries || maxRetries);
// Merge with defaults
const allParams = {
...defaultQueueParams,
...params,
} as ConcurrentQueriesParams<T>;

// Get type of params
const paramsWithCount = allParams as ConcurrentQueriesParamsWithCount<T>;
const paramsWithPromises =
allParams as ConcurrentQueriesParamsWithPromises<T>;
const isCallback = typeof paramsWithCount.total === 'number';

// Get options
const count = isCallback
? paramsWithCount.total
: paramsWithPromises.promises.length;
const limit = Math.max(1, Math.min(allParams.limit || 1, count));
const retries = Math.max(1, allParams.retries || 1);

// Results
const results: T[] = Array<T>(count).fill(null as unknown as T);
const results: T[] = Array<T>(count).fill(undefined as unknown as T);

// Queue
let nextIndex = 0;
Expand All @@ -53,27 +87,80 @@ export function runConcurrentQueries<T>(
// Function to call after item is resolved
function resolvedItem() {
if (rejected || resolved) {
// Already resolved/rejected promise
return;
}

if (!resolving.size && nextIndex > count) {
// All items done
resolved = true;
resolve(results);
return;
}

if (resolving.size < limit && nextIndex <= count) {
// More items in queue
startNext();
}
}

// Item failed all retries
function fail(index: number, err: unknown) {
function done(failed: boolean) {
// Done: reject or continue to next item
resolving.delete(index);
if (failed) {
rejected = true;
reject(err);
} else {
resolvedItem();
}
}

// check for callback
if (allParams.onfail) {
let retry: void | Promise<void>;
try {
retry = allParams.onfail(index, err, params);
} catch (err2) {
// Callback threw error: use that error
err = err2;
done(true);
return;
}

if (retry instanceof Promise) {
// Callback returned a Promise: wait for it
retry
.then(() => {
done(false);
})
.catch((err2) => {
// Callback threw error: use that error
err = err2;
done(true);
});
return;
}

// Nothing happened: continue
done(false);
} else {
// No callback
done(true);
}
}

// Run item
function run(index: number, retry: number) {
// Mark as resolving
resolving.add(index);

// Get promise and run it
const p = callback(index);
const p = isCallback
? paramsWithCount.callback(index)
: paramsWithPromises.promises[index];

p.then((value) => {
resolving.delete(index);
results[index] = value;
Expand All @@ -85,8 +172,7 @@ export function runConcurrentQueries<T>(
run(index, retry + 1);
});
} else if (!rejected) {
rejected = true;
reject(err);
fail(index, err);
}
});
}
Expand Down
119 changes: 72 additions & 47 deletions @iconify/tools/src/import/figma/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import {
clearAPICache,
getAPICache,
} from '../../download/api/cache';
import { runConcurrentQueries } from '../../download/api/queue';
import {
ConcurrentQueriesParamsWithCount,
runConcurrentQueries,
} from '../../download/api/queue';
import type { APICacheOptions, APIQueryParams } from '../../download/api/types';
import type { DocumentNotModified } from '../../download/types/modified';
import type {
Expand All @@ -19,6 +22,26 @@ import type {
} from './types/options';
import type { FigmaIconNode, FigmaNodesImportResult } from './types/result';

/**
* Extra parameters added to runConcurrentQueries()
*
* Can be used to identify failed items in onfail callback
*/
interface FigmaIconNodeWithURL extends FigmaIconNode {
url: string;
}

export type FigmaConcurrentQueriesParamsFunction =
| 'figmaImagesQuery'
| 'figmaDownloadImages';

export interface FigmaConcurrentQueriesParams<
T extends FigmaConcurrentQueriesParamsFunction
> {
function: T;
payload: T extends 'figmaImagesQuery' ? string[][] : FigmaIconNodeWithURL[];
}

/**
* Compare last modified dates
*/
Expand Down Expand Up @@ -181,9 +204,6 @@ export async function figmaImagesQuery(
const maxLength = 2048 - uri.length;
const svgOptions = options.svgOptions || {};

let lastError: number | undefined;
let found = 0;

// Send query
const query = (ids: string[]): Promise<FigmaAPIImagesResponse> => {
return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -254,12 +274,24 @@ export async function figmaImagesQuery(
}

// Get data
const results = await runConcurrentQueries(queue.length, (index) =>
query(queue[index])
);
const queryParams: ConcurrentQueriesParamsWithCount<FigmaAPIImagesResponse> &
FigmaConcurrentQueriesParams<'figmaImagesQuery'> = {
// Params
total: queue.length,
callback: (index) => query(queue[index]),
// Payload to identify failed items in onfail callback
function: 'figmaImagesQuery',
payload: queue,
};
const results = await runConcurrentQueries(queryParams);

// Parse data
let found = 0;
results.forEach((data) => {
if (!data) {
// skip
return;
}
const images = data.images;
for (const id in images) {
const node = nodes.icons[id];
Expand All @@ -273,15 +305,7 @@ export async function figmaImagesQuery(

// Validate results
if (!found) {
if (lastError) {
throw new Error(
`Error retrieving image data from API${
lastError ? ': ' + lastError.toString() : ''
}`
);
} else {
throw new Error('No valid icon layers were found');
}
throw new Error('No valid icon layers were found');
}
nodes.generatedIconsCount = found;
return nodes;
Expand All @@ -299,45 +323,46 @@ export async function figmaDownloadImages(
let count = 0;

// Filter data
interface FigmaIconNodeWithURL extends FigmaIconNode {
url: string;
}
const filtered = Object.create(null) as Record<
string,
FigmaIconNodeWithURL
>;
const filtered: FigmaIconNodeWithURL[] = [];
for (let i = 0; i < ids.length; i++) {
const id = ids[i];
const item = icons[id];
if (item.url) {
filtered[id] = item as FigmaIconNodeWithURL;
filtered.push(item as FigmaIconNodeWithURL);
}
}
const keys = Object.keys(filtered);

// Download everything
await runConcurrentQueries(keys.length, (index) => {
return new Promise((resolve, reject) => {
const id = keys[index];
const item = filtered[id];
sendAPIQuery(
{
uri: item.url,
},
cache
)
.then((data) => {
if (typeof data === 'string') {
count++;
item.content = data;
resolve(true);
} else {
reject(data);
}
})
.catch(reject);
});
});
const params: ConcurrentQueriesParamsWithCount<undefined> &
FigmaConcurrentQueriesParams<'figmaDownloadImages'> = {
// Params
total: filtered.length,
callback: (index) => {
return new Promise((resolve, reject) => {
const item = filtered[index];
sendAPIQuery(
{
uri: item.url,
},
cache
)
.then((data) => {
if (typeof data === 'string') {
count++;
item.content = data;
resolve(undefined);
} else {
reject(data);
}
})
.catch(reject);
});
},
// Payload to identify failed items in onfail callback
function: 'figmaDownloadImages',
payload: filtered,
};
await runConcurrentQueries(params);

// Make sure something was downloaded
if (!count) {
Expand Down
Loading

0 comments on commit 9216631

Please sign in to comment.