Skip to content

Commit

Permalink
fix(core): Add 2 retries when loading CIDs from IPFS (#1334)
Browse files Browse the repository at this point in the history
  • Loading branch information
Spencer T Brody authored Apr 29, 2021
1 parent 5abeba1 commit 279d729
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { Subscription } from 'rxjs';
import { MessageBus } from './pubsub/message-bus';

const IPFS_GET_TIMEOUT = 60000 // 1 minute
const IPFS_MAX_RECORD_SIZE = 256000 // 256 KB
const IPFS_MAX_COMMIT_SIZE = 256000 // 256 KB
const IPFS_RESUBSCRIBE_INTERVAL_DELAY = 1000 * 15 // 15 sec

function messageTypeToString(type: MsgType): string {
Expand All @@ -32,6 +32,8 @@ function messageTypeToString(type: MsgType): string {
}
}

const IPFS_RETRIES = 2

/**
* Ceramic core Dispatcher used for handling messages from pub/sub topic.
*/
Expand All @@ -57,12 +59,12 @@ export class Dispatcher {
const cid = await this._ipfs.dag.put(jws, { format: 'dag-jose', hashAlg: 'sha2-256' })
// put the payload into the ipfs dag
await this._ipfs.block.put(linkedBlock, { cid: jws.link.toString() })
await this._restrictRecordSize(jws.link.toString())
await this._restrictRecordSize(cid)
await this._restrictCommitSize(jws.link.toString())
await this._restrictCommitSize(cid)
return cid
}
const cid = await this._ipfs.dag.put(data)
await this._restrictRecordSize(cid)
await this._restrictCommitSize(cid)
return cid
}

Expand All @@ -74,14 +76,9 @@ export class Dispatcher {
* @param cid - Commit CID
*/
async retrieveCommit (cid: CID | string): Promise<any> {
try {
const record = await this._ipfs.dag.get(cid, {timeout: IPFS_GET_TIMEOUT})
await this._restrictRecordSize(cid)
return cloneDeep(record.value)
} catch (e) {
this._logger.err(`Error while loading commit CID ${cid.toString()} from IPFS: ${e}`)
throw e
}
const commit = await this.retrieveFromIPFS(cid)
await this._restrictCommitSize(cid)
return commit
}

/**
Expand All @@ -90,24 +87,29 @@ export class Dispatcher {
* @param path - optional IPLD path to load, starting from the object represented by `cid`
*/
async retrieveFromIPFS (cid: CID | string, path?: string): Promise<any> {
try {
const record = await this._ipfs.dag.get(cid, {timeout: IPFS_GET_TIMEOUT, path})
return cloneDeep(record.value)
} catch (e) {
this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`)
throw e
for (let i = 0; i <= IPFS_RETRIES; i++) {
try {
const record = await this._ipfs.dag.get(cid, {timeout: IPFS_GET_TIMEOUT, path})
return cloneDeep(record.value)
} catch (e) {
this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}. Retries remaining: ${IPFS_RETRIES - i}`)
if (i < IPFS_RETRIES) {
continue
}
throw e
}
}
}

/**
* Restricts record size to IPFS_MAX_RECORD_SIZE
* @param cid - Record CID
* Restricts commit size to IPFS_MAX_COMMIT_SIZE
* @param cid - Commit CID
* @private
*/
async _restrictRecordSize(cid: CID | string): Promise<void> {
async _restrictCommitSize(cid: CID | string): Promise<void> {
const stat = await this._ipfs.block.stat(cid, { timeout: IPFS_GET_TIMEOUT })
if (stat.size > IPFS_MAX_RECORD_SIZE) {
throw new Error(`${cid.toString()} record size ${stat.size} exceeds the maximum block size of ${IPFS_MAX_RECORD_SIZE}`)
if (stat.size > IPFS_MAX_COMMIT_SIZE) {
throw new Error(`${cid.toString()} record size ${stat.size} exceeds the maximum block size of ${IPFS_MAX_COMMIT_SIZE}`)
}
}

Expand Down

0 comments on commit 279d729

Please sign in to comment.