From 279d7291b8234075a1776532c122f3ce0070f172 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Thu, 29 Apr 2021 13:30:45 -0400 Subject: [PATCH] fix(core): Add 2 retries when loading CIDs from IPFS (#1334) --- packages/core/src/dispatcher.ts | 48 +++++++++++++++++---------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/packages/core/src/dispatcher.ts b/packages/core/src/dispatcher.ts index 415154b038..8fbacdcbc8 100644 --- a/packages/core/src/dispatcher.ts +++ b/packages/core/src/dispatcher.ts @@ -16,7 +16,7 @@ import { Subscription } from 'rxjs'; import { MessageBus } from './pubsub/message-bus'; const IPFS_GET_TIMEOUT = 60000 // 1 minute -const IPFS_MAX_RECORD_SIZE = 256000 // 256 KB +const IPFS_MAX_COMMIT_SIZE = 256000 // 256 KB const IPFS_RESUBSCRIBE_INTERVAL_DELAY = 1000 * 15 // 15 sec function messageTypeToString(type: MsgType): string { @@ -32,6 +32,8 @@ function messageTypeToString(type: MsgType): string { } } +const IPFS_RETRIES = 2 + /** * Ceramic core Dispatcher used for handling messages from pub/sub topic. */ @@ -57,12 +59,12 @@ export class Dispatcher { const cid = await this._ipfs.dag.put(jws, { format: 'dag-jose', hashAlg: 'sha2-256' }) // put the payload into the ipfs dag await this._ipfs.block.put(linkedBlock, { cid: jws.link.toString() }) - await this._restrictRecordSize(jws.link.toString()) - await this._restrictRecordSize(cid) + await this._restrictCommitSize(jws.link.toString()) + await this._restrictCommitSize(cid) return cid } const cid = await this._ipfs.dag.put(data) - await this._restrictRecordSize(cid) + await this._restrictCommitSize(cid) return cid } @@ -74,14 +76,9 @@ export class Dispatcher { * @param cid - Commit CID */ async retrieveCommit (cid: CID | string): Promise { - try { - const record = await this._ipfs.dag.get(cid, {timeout: IPFS_GET_TIMEOUT}) - await this._restrictRecordSize(cid) - return cloneDeep(record.value) - } catch (e) { - this._logger.err(`Error while loading commit CID ${cid.toString()} from IPFS: ${e}`) - throw e - } + const commit = await this.retrieveFromIPFS(cid) + await this._restrictCommitSize(cid) + return commit } /** @@ -90,24 +87,29 @@ export class Dispatcher { * @param path - optional IPLD path to load, starting from the object represented by `cid` */ async retrieveFromIPFS (cid: CID | string, path?: string): Promise { - try { - const record = await this._ipfs.dag.get(cid, {timeout: IPFS_GET_TIMEOUT, path}) - return cloneDeep(record.value) - } catch (e) { - this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`) - throw e + for (let i = 0; i <= IPFS_RETRIES; i++) { + try { + const record = await this._ipfs.dag.get(cid, {timeout: IPFS_GET_TIMEOUT, path}) + return cloneDeep(record.value) + } catch (e) { + this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}. Retries remaining: ${IPFS_RETRIES - i}`) + if (i < IPFS_RETRIES) { + continue + } + throw e + } } } /** - * Restricts record size to IPFS_MAX_RECORD_SIZE - * @param cid - Record CID + * Restricts commit size to IPFS_MAX_COMMIT_SIZE + * @param cid - Commit CID * @private */ - async _restrictRecordSize(cid: CID | string): Promise { + async _restrictCommitSize(cid: CID | string): Promise { const stat = await this._ipfs.block.stat(cid, { timeout: IPFS_GET_TIMEOUT }) - if (stat.size > IPFS_MAX_RECORD_SIZE) { - throw new Error(`${cid.toString()} record size ${stat.size} exceeds the maximum block size of ${IPFS_MAX_RECORD_SIZE}`) + if (stat.size > IPFS_MAX_COMMIT_SIZE) { + throw new Error(`${cid.toString()} record size ${stat.size} exceeds the maximum block size of ${IPFS_MAX_COMMIT_SIZE}`) } }