Skip to content

Commit

Permalink
fix(bootstrap): handle HTTP 429 (#940)
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Bodin authored Apr 18, 2022
1 parent f623e99 commit 968046b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 28 deletions.
52 changes: 34 additions & 18 deletions src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@ import { saveDoc } from './saveDocs';
import { datadog } from './utils/datadog';
import { log } from './utils/log';
import * as sentry from './utils/sentry';
import { backoff } from './utils/wait';

type PkgJob = {
pkg: PrefetchedPkg;
retry: number;
};

export class Bootstrap extends EventEmitter {
stateManager: StateManager;
algoliaClient: SearchClient;
mainIndex: SearchIndex;
bootstrapIndex: SearchIndex;
prefetcher: Prefetcher | undefined;
consumer: QueueObject<PrefetchedPkg> | undefined;
consumer: QueueObject<PkgJob> | undefined;
interval: NodeJS.Timer | undefined;

constructor(
Expand All @@ -51,17 +57,18 @@ export class Bootstrap extends EventEmitter {
}

if (this.consumer) {
if (this.consumer.length() > 0) {
await this.consumer.drain();
}
this.consumer.kill();
await this.consumer.drain();
}

if (this.prefetcher) {
this.prefetcher.stop();
}

log.info('Stopped Bootstrap gracefully');
log.info('Stopped Bootstrap gracefully', {
queued: this.consumer?.length(),
processing: this.consumer?.running(),
});
}

/**
Expand Down Expand Up @@ -109,7 +116,7 @@ export class Bootstrap extends EventEmitter {
const consumer = createPkgConsumer(this.stateManager, this.bootstrapIndex);
consumer.unsaturated(async () => {
const next = await prefetcher.getNext();
consumer.push(next);
consumer.push({ pkg: next, retry: 0 });
done += 1;
});
consumer.buffer = 0;
Expand Down Expand Up @@ -152,12 +159,9 @@ export class Bootstrap extends EventEmitter {
* Last step after everything has been processed.
*/
private async afterProcessing(): Promise<void> {
if (this.consumer!.length() > 0) {
// While we no longer are in "processing" mode
// it can be possible that there's a last iteration in the queue
await this.consumer!.drain();
}

// While we no longer are in "processing" mode
// it can be possible that there's a last iteration in the queue
await this.consumer!.drain();
this.consumer!.kill();

await this.stateManager.save({
Expand Down Expand Up @@ -203,12 +207,13 @@ export class Bootstrap extends EventEmitter {

log.info(
chalk.dim.italic
.white`[progress] %d/%d docs (%s%) (%s prefetched) (%s processing)`,
.white`[progress] %d/%d docs (%s%) (%s prefetched) (%s processing; %s buffer)`,
offset + nbDocs,
totalDocs,
((Math.max(offset + nbDocs, 1) / totalDocs) * 100).toFixed(2),
this.prefetcher!.idleCount,
this.consumer!.running()
this.consumer!.running(),
this.consumer!.length()
);
}
}
Expand All @@ -219,8 +224,8 @@ export class Bootstrap extends EventEmitter {
function createPkgConsumer(
stateManager: StateManager,
index: SearchIndex
): QueueObject<PrefetchedPkg> {
return queue<PrefetchedPkg>(async (pkg) => {
): QueueObject<PkgJob> {
const consumer = queue<PkgJob>(async ({ pkg, retry }) => {
if (!pkg) {
return;
}
Expand Down Expand Up @@ -252,11 +257,22 @@ function createPkgConsumer(
bootstrapLastId: pkg.id,
});
}
log.info(`Done:`, pkg.id);
} catch (err) {
sentry.report(err);
log.info(`Failed:`, pkg.id);

if (err instanceof Error && err.message.includes('Access denied')) {
await backoff(retry + 1, config.retryBackoffPow);
consumer.push({ pkg, retry: retry + 1 });
sentry.report(new Error('Throttling job'), { err });
return;
}

sentry.report(new Error('Error during job'), { err });
} finally {
log.info(`Done:`, pkg.id);
datadog.timing('loop', Date.now() - start);
}
}, config.bootstrapConcurrency);

return consumer;
}
2 changes: 1 addition & 1 deletion src/formatPkg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ export function getMains(pkg: Pick<NicePackageType, 'main'>): string[] {
export function getExportKeys(
exp: NicePackageType['exports'] | string
): string[] {
if (typeof exp !== 'object') {
if (typeof exp !== 'object' || exp === null) {
return [];
}
const keys = Object.keys(exp);
Expand Down
6 changes: 3 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ class Main {

const main = new Main();

process.on('unhandledRejection', async (reason) => {
sentry.report(reason);
process.on('unhandledRejection', async (err) => {
sentry.report(new Error('unhandledRejection'), { err });
await close();
});
process.on('uncaughtException', (err) => {
sentry.report(err, { context: 'uncaughtException' });
sentry.report(new Error('uncauthexception'), { err });
});

(async (): Promise<void> => {
Expand Down
9 changes: 9 additions & 0 deletions src/utils/wait.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import { log } from './log';

// Coming in nodejs 16
export function wait(waitTime: number): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, waitTime);
});
}

export async function backoff(retry: number, pow: number): Promise<void> {
// retry backoff
const bo = Math.pow(retry + 1, pow) * 1000;
log.info('Retrying (', retry, '), waiting for', bo);
await wait(bo);
}
9 changes: 3 additions & 6 deletions src/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { saveDoc } from './saveDocs';
import { datadog } from './utils/datadog';
import { log } from './utils/log';
import * as sentry from './utils/sentry';
import { wait } from './utils/wait';
import { backoff } from './utils/wait';

type ChangeJob = {
change: DatabaseChangesResultItem;
Expand Down Expand Up @@ -102,6 +102,7 @@ export class Watch {

log.info('Stopped Watch gracefully', {
queued: this.changesConsumer?.length(),
processing: this.changesConsumer?.running(),
});
}

Expand Down Expand Up @@ -255,10 +256,7 @@ export class Watch {
}

if (job.retry > 0) {
// retry backoff
const backoff = Math.pow(job.retry + 1, config.retryBackoffPow) * 1000;
log.info('Retrying (', job.retry, '), waiting for', backoff);
await wait(backoff);
await backoff(job.retry, config.retryBackoffPow);
}

if (change.deleted) {
Expand All @@ -273,7 +271,6 @@ export class Watch {
const res = await npm.getDoc(change.id, change.changes[0]!.rev);

if (isFailure(res)) {
log.error('Got an error', res.error);
throw new Error(res.error);
}

Expand Down

0 comments on commit 968046b

Please sign in to comment.