From 546bf6685a414e0cf470894704ec7ef7a5654e9c Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 19 Jan 2024 11:59:08 +0100 Subject: [PATCH 01/15] feat: add block session support to @helia/interface There are no implementations yet but the usage pattern will be something like: ```javascript // unixfs cat command export async function * cat (cid: CID, blockstore: Blocks, options: Partial = {}): AsyncIterable { // create a session for the CID if support is available const blocks = await (blockstore.createSession != null ? blockstore.createSession(cid) : blockstore) const opts: CatOptions = mergeOptions(defaultOptions, options) // resolve and export using the session, if created, otherwise fall back to regular blockstore access const resolved = await resolve(cid, opts.path, blocks, opts) const result = await exporter(resolved.cid, blocks, opts) if (result.type !== 'file' && result.type !== 'raw') { throw new NotAFileError() } if (result.content == null) { throw new NoContentError() } yield * result.content(opts) } ``` --- packages/block-brokers/src/bitswap.ts | 8 +- .../src/trustless-gateway/broker.ts | 4 +- .../src/trustless-gateway/index.ts | 6 +- .../test/trustless-gateway.spec.ts | 12 +- packages/interface/src/blocks.ts | 32 +++-- packages/utils/src/storage.ts | 22 +++- packages/utils/src/utils/networked-storage.ts | 122 +++++++++++------- packages/utils/test/block-broker.spec.ts | 4 +- packages/utils/test/storage.spec.ts | 4 +- .../test/utils/networked-storage.spec.ts | 4 +- 10 files changed, 136 insertions(+), 82 deletions(-) diff --git a/packages/block-brokers/src/bitswap.ts b/packages/block-brokers/src/bitswap.ts index 072594fce..f764e233f 100644 --- a/packages/block-brokers/src/bitswap.ts +++ b/packages/block-brokers/src/bitswap.ts @@ -1,5 +1,5 @@ import { createBitswap } from 'ipfs-bitswap' -import type { BlockAnnouncer, BlockBroker, BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks' +import type { BlockBroker, BlockRetrievalOptions } from '@helia/interface/blocks' import type { Libp2p, Startable } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { Bitswap, BitswapNotifyProgressEvents, BitswapOptions, BitswapWantBlockProgressEvents } from 'ipfs-bitswap' @@ -17,9 +17,7 @@ export interface BitswapInit extends BitswapOptions { } -class BitswapBlockBroker implements BlockAnnouncer>, BlockRetriever< -ProgressOptions ->, Startable { +class BitswapBlockBroker implements BlockBroker, ProgressOptions>, Startable { private readonly bitswap: Bitswap private started: boolean @@ -69,7 +67,7 @@ ProgressOptions this.bitswap.notify(cid, block, options) } - async retrieve (cid: CID, { validateFn, ...options }: BlockRetrievalOptions> = {}): Promise { + async retrieve (cid: CID, options: BlockRetrievalOptions> = {}): Promise { return this.bitswap.want(cid, options) } } diff --git a/packages/block-brokers/src/trustless-gateway/broker.ts b/packages/block-brokers/src/trustless-gateway/broker.ts index 433a690d0..a080fde98 100644 --- a/packages/block-brokers/src/trustless-gateway/broker.ts +++ b/packages/block-brokers/src/trustless-gateway/broker.ts @@ -1,7 +1,7 @@ import { TrustlessGateway } from './trustless-gateway.js' import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js' import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js' -import type { BlockRetrievalOptions, BlockRetriever } from '@helia/interface/blocks' +import type { BlockRetrievalOptions, BlockBroker } from '@helia/interface/blocks' import type { Logger } from '@libp2p/interface' import type { CID } from 'multiformats/cid' import type { ProgressOptions } from 'progress-events' @@ -10,7 +10,7 @@ import type { ProgressOptions } from 'progress-events' * A class that accepts a list of trustless gateways that are queried * for blocks. */ -export class TrustlessGatewayBlockBroker implements BlockRetriever< +export class TrustlessGatewayBlockBroker implements BlockBroker< ProgressOptions > { private readonly gateways: TrustlessGateway[] diff --git a/packages/block-brokers/src/trustless-gateway/index.ts b/packages/block-brokers/src/trustless-gateway/index.ts index 91dcab15c..5908816c5 100644 --- a/packages/block-brokers/src/trustless-gateway/index.ts +++ b/packages/block-brokers/src/trustless-gateway/index.ts @@ -1,7 +1,7 @@ import { TrustlessGatewayBlockBroker } from './broker.js' -import type { BlockRetriever } from '@helia/interface/src/blocks.js' +import type { BlockBroker } from '@helia/interface/src/blocks.js' import type { ComponentLogger } from '@libp2p/interface' -import type { ProgressEvent } from 'progress-events' +import type { ProgressEvent, ProgressOptions } from 'progress-events' export const DEFAULT_TRUSTLESS_GATEWAYS = [ // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ @@ -25,6 +25,6 @@ export interface TrustlessGatewayComponents { logger: ComponentLogger } -export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): (components: TrustlessGatewayComponents) => BlockRetriever { +export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): (components: TrustlessGatewayComponents) => BlockBroker> { return (components) => new TrustlessGatewayBlockBroker(components, init) } diff --git a/packages/block-brokers/test/trustless-gateway.spec.ts b/packages/block-brokers/test/trustless-gateway.spec.ts index 481ac6df8..9851e4121 100644 --- a/packages/block-brokers/test/trustless-gateway.spec.ts +++ b/packages/block-brokers/test/trustless-gateway.spec.ts @@ -8,12 +8,12 @@ import { type StubbedInstance, stubConstructor } from 'sinon-ts' import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js' import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js' import { createBlock } from './fixtures/create-block.js' -import type { BlockRetriever } from '@helia/interface/blocks' +import type { BlockBroker } from '@helia/interface/blocks' import type { CID } from 'multiformats/cid' describe('trustless-gateway-block-broker', () => { let blocks: Array<{ cid: CID, block: Uint8Array }> - let gatewayBlockBroker: BlockRetriever + let gatewayBlockBroker: BlockBroker let gateways: Array> // take a Record) => void> and stub the gateways @@ -54,7 +54,7 @@ describe('trustless-gateway-block-broker', () => { gateway.getRawBlock.rejects(new Error('failed')) } - await expect(gatewayBlockBroker.retrieve(blocks[0].cid)) + await expect(gatewayBlockBroker.retrieve?.(blocks[0].cid)) .to.eventually.be.rejected() .with.property('errors') .with.lengthOf(gateways.length) @@ -78,7 +78,7 @@ describe('trustless-gateway-block-broker', () => { } }) - await expect(gatewayBlockBroker.retrieve(blocks[1].cid)).to.eventually.be.rejected() + await expect(gatewayBlockBroker.retrieve?.(blocks[1].cid)).to.eventually.be.rejected() // all gateways were called expect(gateways[0].getRawBlock.calledWith(blocks[1].cid)).to.be.true() @@ -105,7 +105,7 @@ describe('trustless-gateway-block-broker', () => { } }) - const block = await gatewayBlockBroker.retrieve(cid1, { + const block = await gatewayBlockBroker.retrieve?.(cid1, { validateFn: async (block) => { if (block !== block1) { throw new Error('invalid block') @@ -136,7 +136,7 @@ describe('trustless-gateway-block-broker', () => { gateway.reliability.returns(0) // make sure other gateways are called last } }) - const block = await gatewayBlockBroker.retrieve(cid1, { + const block = await gatewayBlockBroker.retrieve?.(cid1, { validateFn: async (block) => { if (block !== block1) { throw new Error('invalid block') diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index ecd230126..635b03460 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -44,7 +44,9 @@ export type DeleteManyBlocksProgressEvents = export interface GetOfflineOptions { /** - * If true, do not attempt to fetch any missing blocks from the network (default: false) + * If true, do not attempt to fetch any missing blocks from the network + * + * @default false */ offline?: boolean } @@ -54,7 +56,18 @@ ProgressOptions, ProgressOptions, GetOfflineOptions & ProgressOptions, ProgressOptions, ProgressOptions, ProgressOptions > { - + /** + * A session blockstore is a special blockstore that only pulls content from a + * subset of network peers which respond as having the block for the initial + * root CID. + * + * Any blocks written to the blockstore as part of the session will propagate + * to the blockstore the session was created from. + * + * This method is optional to maintain compatibility with existing + * blockstores that do not support sessions. + */ + createSession?(root: CID, options?: AbortOptions & ProgressOptions): Promise } export type BlockRetrievalOptions = AbortOptions & GetProgressOptions & { @@ -67,18 +80,19 @@ export type BlockRetrievalOptions } -export interface BlockRetriever { +export interface BlockBroker { /** * Retrieve a block from a source */ - retrieve(cid: CID, options?: BlockRetrievalOptions): Promise -} + retrieve?(cid: CID, options?: BlockRetrievalOptions): Promise -export interface BlockAnnouncer { /** * Make a new block available to peers */ - announce(cid: CID, block: Uint8Array, options?: NotifyProgressOptions): void -} + announce?(cid: CID, block: Uint8Array, options?: NotifyProgressOptions): void -export type BlockBroker = BlockRetriever | BlockAnnouncer + /** + * Create a new session + */ + createSession?(root: CID, options?: BlockRetrievalOptions): Promise> +} diff --git a/packages/utils/src/storage.ts b/packages/utils/src/storage.ts index 909c2e89c..999251082 100644 --- a/packages/utils/src/storage.ts +++ b/packages/utils/src/storage.ts @@ -1,4 +1,4 @@ -import { start, stop } from '@libp2p/interface' +import { CodeError, start, stop } from '@libp2p/interface' import createMortice from 'mortice' import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks' import type { Pins } from '@helia/interface/pins' @@ -24,14 +24,14 @@ export interface GetOptions extends AbortOptions { */ export class BlockStorage implements Blocks, Startable { public lock: Mortice - private readonly child: Blockstore + private readonly child: Blocks private readonly pins: Pins private started: boolean /** * Create a new BlockStorage */ - constructor (blockstore: Blockstore, pins: Pins, options: BlockStorageInit = {}) { + constructor (blockstore: Blocks, pins: Pins, options: BlockStorageInit = {}) { this.child = blockstore this.pins = pins this.lock = createMortice({ @@ -169,4 +169,20 @@ export class BlockStorage implements Blocks, Startable { releaseLock() } } + + async createSession (root: CID, options?: AbortOptions): Promise { + const releaseLock = await this.lock.readLock() + + try { + const blocks = await this.child.createSession?.(root, options) + + if (blocks == null) { + throw new CodeError('Sessions not supported', 'ERR_UNSUPPORTED') + } + + return blocks + } finally { + releaseLock() + } + } } diff --git a/packages/utils/src/utils/networked-storage.ts b/packages/utils/src/utils/networked-storage.ts index bf98500f2..644256204 100644 --- a/packages/utils/src/utils/networked-storage.ts +++ b/packages/utils/src/utils/networked-storage.ts @@ -4,23 +4,19 @@ import filter from 'it-filter' import forEach from 'it-foreach' import { CustomProgressEvent, type ProgressOptions } from 'progress-events' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' -import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetriever, BlockAnnouncer, BlockRetrievalOptions } from '@helia/interface/blocks' +import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetrievalOptions } from '@helia/interface/blocks' import type { AbortOptions, ComponentLogger, Logger, LoggerOptions, Startable } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { AwaitIterable } from 'interface-store' import type { CID } from 'multiformats/cid' import type { MultihashHasher } from 'multiformats/hashes/interface' -export interface GetOptions extends AbortOptions { - progress?(evt: Event): void +export interface NetworkedStorageStorageInit { + root?: CID } -function isBlockRetriever (b: any): b is BlockRetriever { - return typeof b.retrieve === 'function' -} - -function isBlockAnnouncer (b: any): b is BlockAnnouncer { - return typeof b.announce === 'function' +export interface GetOptions extends AbortOptions { + progress?(evt: Event): void } export interface NetworkedStorageComponents { @@ -36,20 +32,20 @@ export interface NetworkedStorageComponents { */ export class NetworkedStorage implements Blocks, Startable { private readonly child: Blockstore - private readonly blockRetrievers: BlockRetriever[] - private readonly blockAnnouncers: BlockAnnouncer[] + private readonly blockBrokers: BlockBroker[] private readonly hashers: Record private started: boolean private readonly log: Logger + private readonly logger: ComponentLogger /** * Create a new BlockStorage */ - constructor (components: NetworkedStorageComponents) { - this.log = components.logger.forComponent('helia:networked-storage') + constructor (components: NetworkedStorageComponents, init: NetworkedStorageStorageInit = {}) { + this.log = components.logger.forComponent(`helia:networked-storage${init.root == null ? '' : `:${init.root}`}`) + this.logger = components.logger this.child = components.blockstore - this.blockRetrievers = (components.blockBrokers ?? []).filter(isBlockRetriever) - this.blockAnnouncers = (components.blockBrokers ?? []).filter(isBlockAnnouncer) + this.blockBrokers = components.blockBrokers ?? [] this.hashers = components.hashers ?? {} this.started = false } @@ -59,12 +55,12 @@ export class NetworkedStorage implements Blocks, Startable { } async start (): Promise { - await start(this.child, ...new Set([...this.blockRetrievers, ...this.blockAnnouncers])) + await start(this.child, ...this.blockBrokers) this.started = true } async stop (): Promise { - await stop(this.child, ...new Set([...this.blockRetrievers, ...this.blockAnnouncers])) + await stop(this.child, ...this.blockBrokers) this.started = false } @@ -83,8 +79,8 @@ export class NetworkedStorage implements Blocks, Startable { options.onProgress?.(new CustomProgressEvent('blocks:put:providers:notify', cid)) - this.blockAnnouncers.forEach(provider => { - provider.announce(cid, block, options) + this.blockBrokers.forEach(broker => { + broker.announce?.(cid, block, options) }) options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) @@ -108,8 +104,8 @@ export class NetworkedStorage implements Blocks, Startable { const notifyEach = forEach(missingBlocks, ({ cid, block }): void => { options.onProgress?.(new CustomProgressEvent('blocks:put-many:providers:notify', cid)) - this.blockAnnouncers.forEach(provider => { - provider.announce(cid, block, options) + this.blockBrokers.forEach(broker => { + broker.announce?.(cid, block, options) }) }) @@ -124,7 +120,7 @@ export class NetworkedStorage implements Blocks, Startable { if (options.offline !== true && !(await this.child.has(cid))) { // we do not have the block locally, get it from a block provider options.onProgress?.(new CustomProgressEvent('blocks:get:providers:get', cid)) - const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers[cid.multihash.code], { + const block = await raceBlockRetrievers(cid, this.blockBrokers, this.hashers[cid.multihash.code], { ...options, log: this.log }) @@ -133,8 +129,8 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get:providers:notify', cid)) - this.blockAnnouncers.forEach(provider => { - provider.announce(cid, block, options) + this.blockBrokers.forEach(broker => { + broker.announce?.(cid, block, options) }) return block @@ -155,7 +151,7 @@ export class NetworkedStorage implements Blocks, Startable { if (options.offline !== true && !(await this.child.has(cid))) { // we do not have the block locally, get it from a block provider options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:get', cid)) - const block = await raceBlockRetrievers(cid, this.blockRetrievers, this.hashers[cid.multihash.code], { + const block = await raceBlockRetrievers(cid, this.blockBrokers, this.hashers[cid.multihash.code], { ...options, log: this.log }) @@ -164,8 +160,8 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:notify', cid)) - this.blockAnnouncers.forEach(provider => { - provider.announce(cid, block, options) + this.blockBrokers.forEach(broker => { + broker.announce?.(cid, block, options) }) } })) @@ -200,6 +196,25 @@ export class NetworkedStorage implements Blocks, Startable { options.onProgress?.(new CustomProgressEvent('blocks:get-all:blockstore:get-many')) yield * this.child.getAll(options) } + + async createSession (root: CID, options?: AbortOptions & ProgressOptions): Promise { + const blockBrokers = await Promise.all(this.blockBrokers.map(async broker => { + if (broker.createSession == null) { + return broker + } + + return broker.createSession(root, options) + })) + + return new NetworkedStorage({ + blockstore: this.child, + blockBrokers, + hashers: this.hashers, + logger: this.logger + }, { + root + }) + } } export const getCidBlockVerifierFunction = (cid: CID, hasher: MultihashHasher): Required['validateFn'] => { @@ -222,38 +237,49 @@ export const getCidBlockVerifierFunction = (cid: CID, hasher: MultihashHasher): * Race block providers cancelling any pending requests once the block has been * found. */ -async function raceBlockRetrievers (cid: CID, providers: BlockRetriever[], hasher: MultihashHasher, options: AbortOptions & LoggerOptions): Promise { +async function raceBlockRetrievers (cid: CID, blockBrokers: BlockBroker[], hasher: MultihashHasher, options: AbortOptions & LoggerOptions): Promise { const validateFn = getCidBlockVerifierFunction(cid, hasher) const controller = new AbortController() const signal = anySignal([controller.signal, options.signal]) + const retrievers: Array>> = [] + + for (const broker of blockBrokers) { + if (broker.retrieve != null) { + // @ts-expect-error retrieve may be undefined even though we've just + // checked that it isn't + retrievers.push(broker) + } + } + try { return await Promise.any( - providers.map(async provider => { - try { - let blocksWereValidated = false - const block = await provider.retrieve(cid, { - ...options, - signal, - validateFn: async (block: Uint8Array): Promise => { + retrievers + .map(async retriever => { + try { + let blocksWereValidated = false + const block = await retriever.retrieve(cid, { + ...options, + signal, + validateFn: async (block: Uint8Array): Promise => { + await validateFn(block) + blocksWereValidated = true + } + }) + + if (!blocksWereValidated) { + // the blockBroker either did not throw an error when attempting to validate the block + // or did not call the validateFn at all. We should validate the block ourselves await validateFn(block) - blocksWereValidated = true } - }) - if (!blocksWereValidated) { - // the blockBroker either did not throw an error when attempting to validate the block - // or did not call the validateFn at all. We should validate the block ourselves - await validateFn(block) + return block + } catch (err) { + options.log.error('could not retrieve verified block for %c', cid, err) + throw err } - - return block - } catch (err) { - options.log.error('could not retrieve verified block for %c', cid, err) - throw err - } - }) + }) ) } finally { signal.clear() diff --git a/packages/utils/test/block-broker.spec.ts b/packages/utils/test/block-broker.spec.ts index 1cef6c475..140b7a51a 100644 --- a/packages/utils/test/block-broker.spec.ts +++ b/packages/utils/test/block-broker.spec.ts @@ -12,7 +12,7 @@ import { type StubbedInstance, stubInterface } from 'sinon-ts' import { defaultHashers } from '../src/utils/default-hashers.js' import { NetworkedStorage } from '../src/utils/networked-storage.js' import { createBlock } from './fixtures/create-block.js' -import type { BlockBroker, BlockRetriever } from '@helia/interface/blocks' +import type { BlockBroker } from '@helia/interface/blocks' import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' @@ -21,7 +21,7 @@ describe('block-broker', () => { let blockstore: Blockstore let bitswapBlockBroker: StubbedInstance> let blocks: Array<{ cid: CID, block: Uint8Array }> - let gatewayBlockBroker: StubbedInstance> + let gatewayBlockBroker: StubbedInstance> beforeEach(async () => { blocks = [] diff --git a/packages/utils/test/storage.spec.ts b/packages/utils/test/storage.spec.ts index 92d7614a2..db93c844d 100644 --- a/packages/utils/test/storage.spec.ts +++ b/packages/utils/test/storage.spec.ts @@ -10,13 +10,13 @@ import * as raw from 'multiformats/codecs/raw' import { PinsImpl } from '../src/pins.js' import { BlockStorage } from '../src/storage.js' import { createBlock } from './fixtures/create-block.js' +import type { Blocks } from '@helia/interface' import type { Pins } from '@helia/interface/pins' -import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' describe('storage', () => { let storage: BlockStorage - let blockstore: Blockstore + let blockstore: Blocks let pins: Pins let blocks: Array<{ cid: CID, block: Uint8Array }> diff --git a/packages/utils/test/utils/networked-storage.spec.ts b/packages/utils/test/utils/networked-storage.spec.ts index 46a7d4ad1..46c98fe60 100644 --- a/packages/utils/test/utils/networked-storage.spec.ts +++ b/packages/utils/test/utils/networked-storage.spec.ts @@ -12,14 +12,14 @@ import { type StubbedInstance, stubInterface } from 'sinon-ts' import { defaultHashers } from '../../src/utils/default-hashers.js' import { NetworkedStorage } from '../../src/utils/networked-storage.js' import { createBlock } from '../fixtures/create-block.js' -import type { BlockAnnouncer, BlockRetriever } from '@helia/interface/blocks' +import type { BlockBroker } from '@helia/interface/blocks' import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' describe('networked-storage', () => { let storage: NetworkedStorage let blockstore: Blockstore - let bitswap: StubbedInstance> + let bitswap: StubbedInstance> let blocks: Array<{ cid: CID, block: Uint8Array }> beforeEach(async () => { From 5836dcdec0c26fd62a64d8c4776df2dbd53a33c9 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Jan 2024 16:40:37 +0100 Subject: [PATCH 02/15] chore: pr comments --- packages/block-brokers/src/bitswap.ts | 2 +- packages/interface/src/blocks.ts | 2 +- packages/utils/src/utils/networked-storage.ts | 34 +++++++++++-------- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/packages/block-brokers/src/bitswap.ts b/packages/block-brokers/src/bitswap.ts index f764e233f..aae1c7b7e 100644 --- a/packages/block-brokers/src/bitswap.ts +++ b/packages/block-brokers/src/bitswap.ts @@ -63,7 +63,7 @@ class BitswapBlockBroker implements BlockBroker): void { + async announce (cid: CID, block: Uint8Array, options?: ProgressOptions): Promise { this.bitswap.notify(cid, block, options) } diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index 635b03460..f55f6df75 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -89,7 +89,7 @@ export interface BlockBroker /** * Create a new session diff --git a/packages/utils/src/utils/networked-storage.ts b/packages/utils/src/utils/networked-storage.ts index 644256204..692f77266 100644 --- a/packages/utils/src/utils/networked-storage.ts +++ b/packages/utils/src/utils/networked-storage.ts @@ -11,7 +11,7 @@ import type { AwaitIterable } from 'interface-store' import type { CID } from 'multiformats/cid' import type { MultihashHasher } from 'multiformats/hashes/interface' -export interface NetworkedStorageStorageInit { +export interface NetworkedStorageInit { root?: CID } @@ -41,7 +41,7 @@ export class NetworkedStorage implements Blocks, Startable { /** * Create a new BlockStorage */ - constructor (components: NetworkedStorageComponents, init: NetworkedStorageStorageInit = {}) { + constructor (components: NetworkedStorageComponents, init: NetworkedStorageInit = {}) { this.log = components.logger.forComponent(`helia:networked-storage${init.root == null ? '' : `:${init.root}`}`) this.logger = components.logger this.child = components.blockstore @@ -79,9 +79,13 @@ export class NetworkedStorage implements Blocks, Startable { options.onProgress?.(new CustomProgressEvent('blocks:put:providers:notify', cid)) - this.blockBrokers.forEach(broker => { - broker.announce?.(cid, block, options) - }) + await Promise.all( + this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + ) + + await Promise.all( + this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + ) options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) @@ -102,11 +106,11 @@ export class NetworkedStorage implements Blocks, Startable { return !has }) - const notifyEach = forEach(missingBlocks, ({ cid, block }): void => { + const notifyEach = forEach(missingBlocks, async ({ cid, block }): Promise => { options.onProgress?.(new CustomProgressEvent('blocks:put-many:providers:notify', cid)) - this.blockBrokers.forEach(broker => { - broker.announce?.(cid, block, options) - }) + await Promise.all( + this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + ) }) options.onProgress?.(new CustomProgressEvent('blocks:put-many:blockstore:put-many')) @@ -129,9 +133,9 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get:providers:notify', cid)) - this.blockBrokers.forEach(broker => { - broker.announce?.(cid, block, options) - }) + await Promise.all( + this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + ) return block } @@ -160,9 +164,9 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:notify', cid)) - this.blockBrokers.forEach(broker => { - broker.announce?.(cid, block, options) - }) + await Promise.all( + this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + ) } })) } From 3d9cb49f4153acd74cc48225fe9c699295a210f3 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Jan 2024 17:08:10 +0100 Subject: [PATCH 03/15] chore: linting --- packages/utils/src/utils/networked-storage.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/utils/src/utils/networked-storage.ts b/packages/utils/src/utils/networked-storage.ts index 692f77266..3f012174e 100644 --- a/packages/utils/src/utils/networked-storage.ts +++ b/packages/utils/src/utils/networked-storage.ts @@ -80,11 +80,11 @@ export class NetworkedStorage implements Blocks, Startable { options.onProgress?.(new CustomProgressEvent('blocks:put:providers:notify', cid)) await Promise.all( - this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) await Promise.all( - this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) @@ -109,7 +109,7 @@ export class NetworkedStorage implements Blocks, Startable { const notifyEach = forEach(missingBlocks, async ({ cid, block }): Promise => { options.onProgress?.(new CustomProgressEvent('blocks:put-many:providers:notify', cid)) await Promise.all( - this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) }) @@ -134,7 +134,7 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get:providers:notify', cid)) await Promise.all( - this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) return block @@ -165,7 +165,7 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:notify', cid)) await Promise.all( - this.blockBrokers.map(broker => broker.announce?.(cid, block, options)) + this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) } })) From 39b6f7a68ba55b1b3571e61d89277fed07f58e73 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 5 Feb 2024 18:11:18 +0100 Subject: [PATCH 04/15] chore: simplify options --- packages/block-brokers/src/bitswap.ts | 9 +++--- .../src/trustless-gateway/broker.ts | 7 ++-- .../src/trustless-gateway/index.ts | 4 +-- packages/interface/src/blocks.ts | 32 +++++++++++++++---- 4 files changed, 34 insertions(+), 18 deletions(-) diff --git a/packages/block-brokers/src/bitswap.ts b/packages/block-brokers/src/bitswap.ts index aae1c7b7e..dfe79ff18 100644 --- a/packages/block-brokers/src/bitswap.ts +++ b/packages/block-brokers/src/bitswap.ts @@ -1,11 +1,10 @@ import { createBitswap } from 'ipfs-bitswap' -import type { BlockBroker, BlockRetrievalOptions } from '@helia/interface/blocks' +import type { BlockAnnounceOptions, BlockBroker, BlockRetrievalOptions } from '@helia/interface/blocks' import type { Libp2p, Startable } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { Bitswap, BitswapNotifyProgressEvents, BitswapOptions, BitswapWantBlockProgressEvents } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' import type { MultihashHasher } from 'multiformats/hashes/interface' -import type { ProgressOptions } from 'progress-events' interface BitswapComponents { libp2p: Libp2p @@ -17,7 +16,7 @@ export interface BitswapInit extends BitswapOptions { } -class BitswapBlockBroker implements BlockBroker, ProgressOptions>, Startable { +class BitswapBlockBroker implements BlockBroker, Startable { private readonly bitswap: Bitswap private started: boolean @@ -63,11 +62,11 @@ class BitswapBlockBroker implements BlockBroker): Promise { + async announce (cid: CID, block: Uint8Array, options?: BlockAnnounceOptions): Promise { this.bitswap.notify(cid, block, options) } - async retrieve (cid: CID, options: BlockRetrievalOptions> = {}): Promise { + async retrieve (cid: CID, options: BlockRetrievalOptions = {}): Promise { return this.bitswap.want(cid, options) } } diff --git a/packages/block-brokers/src/trustless-gateway/broker.ts b/packages/block-brokers/src/trustless-gateway/broker.ts index a080fde98..d19548167 100644 --- a/packages/block-brokers/src/trustless-gateway/broker.ts +++ b/packages/block-brokers/src/trustless-gateway/broker.ts @@ -4,15 +4,12 @@ import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, Trust import type { BlockRetrievalOptions, BlockBroker } from '@helia/interface/blocks' import type { Logger } from '@libp2p/interface' import type { CID } from 'multiformats/cid' -import type { ProgressOptions } from 'progress-events' /** * A class that accepts a list of trustless gateways that are queried * for blocks. */ -export class TrustlessGatewayBlockBroker implements BlockBroker< -ProgressOptions -> { +export class TrustlessGatewayBlockBroker implements BlockBroker { private readonly gateways: TrustlessGateway[] private readonly log: Logger @@ -24,7 +21,7 @@ ProgressOptions }) } - async retrieve (cid: CID, options: BlockRetrievalOptions> = {}): Promise { + async retrieve (cid: CID, options: BlockRetrievalOptions = {}): Promise { // Loop through the gateways until we get a block or run out of gateways // TODO: switch to toSorted when support is better const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability()) diff --git a/packages/block-brokers/src/trustless-gateway/index.ts b/packages/block-brokers/src/trustless-gateway/index.ts index 5908816c5..93489ecbd 100644 --- a/packages/block-brokers/src/trustless-gateway/index.ts +++ b/packages/block-brokers/src/trustless-gateway/index.ts @@ -1,7 +1,7 @@ import { TrustlessGatewayBlockBroker } from './broker.js' import type { BlockBroker } from '@helia/interface/src/blocks.js' import type { ComponentLogger } from '@libp2p/interface' -import type { ProgressEvent, ProgressOptions } from 'progress-events' +import type { ProgressEvent } from 'progress-events' export const DEFAULT_TRUSTLESS_GATEWAYS = [ // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ @@ -25,6 +25,6 @@ export interface TrustlessGatewayComponents { logger: ComponentLogger } -export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): (components: TrustlessGatewayComponents) => BlockBroker> { +export function trustlessGateway (init: TrustlessGatewayBlockBrokerInit = {}): (components: TrustlessGatewayComponents) => BlockBroker { return (components) => new TrustlessGatewayBlockBroker(components, init) } diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index f55f6df75..377fc8043 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -67,10 +67,10 @@ ProgressOptions, ProgressOptions): Promise + createSession?(root: CID, options?: CreateSessionOptions): Promise } -export type BlockRetrievalOptions = AbortOptions & GetProgressOptions & { +export interface BlockRetrievalOptions = ProgressEvent> extends AbortOptions, ProgressOptions { /** * A function that blockBrokers should call prior to returning a block to ensure it can maintain control * of the block request flow. e.g. TrustedGatewayBlockBroker will use this to ensure that the block @@ -80,19 +80,39 @@ export type BlockRetrievalOptions } -export interface BlockBroker { +export interface BlockAnnounceOptions = ProgressEvent> extends AbortOptions, ProgressOptions { + +} + +export interface CreateSessionOptions = ProgressEvent> extends AbortOptions, ProgressOptions { + /** + * The minimum number of providers for the root CID that are required for + * successful session creation. + */ + providers?: number + + /** + * How long each queried provider has to respond either that they have the + * root block or to send it to us. + * + * @default 5000 + */ + timeout?: number +} + +export interface BlockBroker = ProgressEvent, AnnounceProgressEvents extends ProgressEvent = ProgressEvent> { /** * Retrieve a block from a source */ - retrieve?(cid: CID, options?: BlockRetrievalOptions): Promise + retrieve?(cid: CID, options?: BlockRetrievalOptions): Promise /** * Make a new block available to peers */ - announce?(cid: CID, block: Uint8Array, options?: NotifyProgressOptions): Promise + announce?(cid: CID, block: Uint8Array, options?: BlockAnnounceOptions): Promise /** * Create a new session */ - createSession?(root: CID, options?: BlockRetrievalOptions): Promise> + createSession?(root: CID, options?: CreateSessionOptions): Promise> } From d1228b9d69818141c6ec38f0131449ff0a1301ce Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 5 Feb 2024 18:38:00 +0100 Subject: [PATCH 05/15] chore: createSession is not optional on blocks --- packages/interface/src/blocks.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index 377fc8043..fd283dd6c 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -67,7 +67,7 @@ ProgressOptions, ProgressOptions): Promise + createSession(root: CID, options?: CreateSessionOptions): Promise } export interface BlockRetrievalOptions = ProgressEvent> extends AbortOptions, ProgressOptions { From 32713ab2e0ce25187a601f4a5907ef74f1a1c79d Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 6 Feb 2024 08:59:03 +0100 Subject: [PATCH 06/15] chore: update utils --- packages/utils/src/storage.ts | 4 ++-- packages/utils/src/utils/networked-storage.ts | 4 ---- packages/utils/test/storage.spec.ts | 9 ++++++++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/packages/utils/src/storage.ts b/packages/utils/src/storage.ts index 999251082..f184ae22c 100644 --- a/packages/utils/src/storage.ts +++ b/packages/utils/src/storage.ts @@ -170,11 +170,11 @@ export class BlockStorage implements Blocks, Startable { } } - async createSession (root: CID, options?: AbortOptions): Promise { + async createSession (root: CID, options?: AbortOptions): Promise { const releaseLock = await this.lock.readLock() try { - const blocks = await this.child.createSession?.(root, options) + const blocks = await this.child.createSession(root, options) if (blocks == null) { throw new CodeError('Sessions not supported', 'ERR_UNSUPPORTED') diff --git a/packages/utils/src/utils/networked-storage.ts b/packages/utils/src/utils/networked-storage.ts index 3f012174e..57b58c78e 100644 --- a/packages/utils/src/utils/networked-storage.ts +++ b/packages/utils/src/utils/networked-storage.ts @@ -83,10 +83,6 @@ export class NetworkedStorage implements Blocks, Startable { this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) - await Promise.all( - this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) - ) - options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) return this.child.put(cid, block, options) diff --git a/packages/utils/test/storage.spec.ts b/packages/utils/test/storage.spec.ts index db93c844d..42c5a5b49 100644 --- a/packages/utils/test/storage.spec.ts +++ b/packages/utils/test/storage.spec.ts @@ -12,8 +12,15 @@ import { BlockStorage } from '../src/storage.js' import { createBlock } from './fixtures/create-block.js' import type { Blocks } from '@helia/interface' import type { Pins } from '@helia/interface/pins' +import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' +class MemoryBlocks extends MemoryBlockstore implements Blocks { + async createSession (): Promise { + throw new Error('Not implemented') + } +} + describe('storage', () => { let storage: BlockStorage let blockstore: Blocks @@ -29,7 +36,7 @@ describe('storage', () => { const datastore = new MemoryDatastore() - blockstore = new MemoryBlockstore() + blockstore = new MemoryBlocks() pins = new PinsImpl(datastore, blockstore, []) storage = new BlockStorage(blockstore, pins, { holdGcLock: true From 0c28d660e6373c5f603985c42de33a0f1e03f50b Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 8 Feb 2024 09:55:31 +0100 Subject: [PATCH 07/15] chore: create block brokers last to access components --- packages/utils/src/index.ts | 27 ++++++++++--------- packages/utils/src/utils/networked-storage.ts | 26 +++++++++--------- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index 7890f8795..65f294543 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -112,6 +112,7 @@ interface Components { dagWalkers: Record logger: ComponentLogger blockBrokers: BlockBroker[] + routing: Routing } export class Helia implements HeliaInterface { @@ -130,6 +131,7 @@ export class Helia implements HeliaInterface { this.hashers = defaultHashers(init.hashers) this.dagWalkers = defaultDagWalkers(init.dagWalkers) + // @ts-expect-error routing is not set const components: Components = { blockstore: init.blockstore, datastore: init.datastore, @@ -140,19 +142,7 @@ export class Helia implements HeliaInterface { ...(init.components ?? {}) } - components.blockBrokers = init.blockBrokers.map((fn) => { - return fn(components) - }) - - const networkedStorage = new NetworkedStorage(components) - - this.pins = new PinsImpl(init.datastore, networkedStorage, this.dagWalkers) - - this.blockstore = new BlockStorage(networkedStorage, this.pins, { - holdGcLock: init.holdGcLock ?? true - }) - this.datastore = init.datastore - this.routing = new RoutingClass(components, { + this.routing = components.routing = new RoutingClass(components, { routers: (init.routers ?? []).flatMap((router: any) => { // if the router itself is a router const routers = [ @@ -172,6 +162,17 @@ export class Helia implements HeliaInterface { return routers }) }) + + const networkedStorage = new NetworkedStorage(components) + this.pins = new PinsImpl(init.datastore, networkedStorage, this.dagWalkers) + this.blockstore = new BlockStorage(networkedStorage, this.pins, { + holdGcLock: init.holdGcLock ?? true + }) + this.datastore = init.datastore + + components.blockBrokers = init.blockBrokers.map((fn) => { + return fn(components) + }) } async start (): Promise { diff --git a/packages/utils/src/utils/networked-storage.ts b/packages/utils/src/utils/networked-storage.ts index 57b58c78e..0a59cf01b 100644 --- a/packages/utils/src/utils/networked-storage.ts +++ b/packages/utils/src/utils/networked-storage.ts @@ -22,8 +22,8 @@ export interface GetOptions extends AbortOptions { export interface NetworkedStorageComponents { blockstore: Blockstore logger: ComponentLogger - blockBrokers?: BlockBroker[] - hashers?: Record + blockBrokers: BlockBroker[] + hashers: Record } /** @@ -32,11 +32,11 @@ export interface NetworkedStorageComponents { */ export class NetworkedStorage implements Blocks, Startable { private readonly child: Blockstore - private readonly blockBrokers: BlockBroker[] private readonly hashers: Record private started: boolean private readonly log: Logger private readonly logger: ComponentLogger + private readonly components: NetworkedStorageComponents /** * Create a new BlockStorage @@ -45,7 +45,7 @@ export class NetworkedStorage implements Blocks, Startable { this.log = components.logger.forComponent(`helia:networked-storage${init.root == null ? '' : `:${init.root}`}`) this.logger = components.logger this.child = components.blockstore - this.blockBrokers = components.blockBrokers ?? [] + this.components = components this.hashers = components.hashers ?? {} this.started = false } @@ -55,12 +55,12 @@ export class NetworkedStorage implements Blocks, Startable { } async start (): Promise { - await start(this.child, ...this.blockBrokers) + await start(this.child, ...this.components.blockBrokers) this.started = true } async stop (): Promise { - await stop(this.child, ...this.blockBrokers) + await stop(this.child, ...this.components.blockBrokers) this.started = false } @@ -80,7 +80,7 @@ export class NetworkedStorage implements Blocks, Startable { options.onProgress?.(new CustomProgressEvent('blocks:put:providers:notify', cid)) await Promise.all( - this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) + this.components.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) options.onProgress?.(new CustomProgressEvent('blocks:put:blockstore:put', cid)) @@ -105,7 +105,7 @@ export class NetworkedStorage implements Blocks, Startable { const notifyEach = forEach(missingBlocks, async ({ cid, block }): Promise => { options.onProgress?.(new CustomProgressEvent('blocks:put-many:providers:notify', cid)) await Promise.all( - this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) + this.components.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) }) @@ -120,7 +120,7 @@ export class NetworkedStorage implements Blocks, Startable { if (options.offline !== true && !(await this.child.has(cid))) { // we do not have the block locally, get it from a block provider options.onProgress?.(new CustomProgressEvent('blocks:get:providers:get', cid)) - const block = await raceBlockRetrievers(cid, this.blockBrokers, this.hashers[cid.multihash.code], { + const block = await raceBlockRetrievers(cid, this.components.blockBrokers, this.hashers[cid.multihash.code], { ...options, log: this.log }) @@ -130,7 +130,7 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get:providers:notify', cid)) await Promise.all( - this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) + this.components.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) return block @@ -151,7 +151,7 @@ export class NetworkedStorage implements Blocks, Startable { if (options.offline !== true && !(await this.child.has(cid))) { // we do not have the block locally, get it from a block provider options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:get', cid)) - const block = await raceBlockRetrievers(cid, this.blockBrokers, this.hashers[cid.multihash.code], { + const block = await raceBlockRetrievers(cid, this.components.blockBrokers, this.hashers[cid.multihash.code], { ...options, log: this.log }) @@ -161,7 +161,7 @@ export class NetworkedStorage implements Blocks, Startable { // notify other block providers of the new block options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:notify', cid)) await Promise.all( - this.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) + this.components.blockBrokers.map(async broker => broker.announce?.(cid, block, options)) ) } })) @@ -198,7 +198,7 @@ export class NetworkedStorage implements Blocks, Startable { } async createSession (root: CID, options?: AbortOptions & ProgressOptions): Promise { - const blockBrokers = await Promise.all(this.blockBrokers.map(async broker => { + const blockBrokers = await Promise.all(this.components.blockBrokers.map(async broker => { if (broker.createSession == null) { return broker } From d0610c9c665c7e87733792fa26f87045587c5e95 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 9 Feb 2024 09:39:26 +0100 Subject: [PATCH 08/15] chore: do not dedupe at the routing level as different impls return different metadata --- packages/interface/src/blocks.ts | 23 ++++++++++++++++++++++- packages/utils/src/routing.ts | 19 ------------------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index fd283dd6c..078bad501 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -88,8 +88,29 @@ export interface CreateSessionOptions router.findProviders(key, options)) @@ -50,13 +47,6 @@ export class Routing implements RoutingInterface, Startable { continue } - // deduplicate peers - if (seen.has(peer.id)) { - continue - } - - seen.add(peer.id) - yield peer } } @@ -142,8 +132,6 @@ export class Routing implements RoutingInterface, Startable { throw new CodeError('No peer routers available', 'ERR_NO_ROUTERS_AVAILABLE') } - const seen = new PeerSet() - for await (const peer of merge( ...supports(this.routers, 'getClosestPeers') .map(router => router.getClosestPeers(key, options)) @@ -152,13 +140,6 @@ export class Routing implements RoutingInterface, Startable { continue } - // deduplicate peers - if (seen.has(peer.id)) { - continue - } - - seen.add(peer.id) - yield peer } } From 9aeded1ae2c3ada8a19148031f493bd42e0db981 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 9 Feb 2024 10:33:39 +0100 Subject: [PATCH 09/15] chore: add defaults to interface --- packages/interface/src/blocks.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index 078bad501..572007745 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -137,3 +137,7 @@ export interface BlockBroker): Promise> } + +export const DEFAULT_MIN_SESSION_PROVIDERS = 1 +export const DEFAULT_MAX_SESSION_PROVIDERS = 5 +export const DEFAULT_SESSION_QUERY_CONCURRENCY = 5 From 8e6051a4cfe15b2e606cd5816a055d8284209143 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 9 Feb 2024 10:34:33 +0100 Subject: [PATCH 10/15] chore: remove unused dep --- packages/utils/package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/utils/package.json b/packages/utils/package.json index 937c4469f..5b9adec27 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -59,7 +59,6 @@ "@ipld/dag-pb": "^4.0.8", "@libp2p/interface": "^1.1.2", "@libp2p/logger": "^4.0.5", - "@libp2p/peer-collections": "^5.1.5", "@libp2p/utils": "^5.2.3", "any-signal": "^4.1.1", "cborg": "^4.0.8", From ca1c45827b118101dd2fb2e6fce3cc12017a8bd2 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 9 Feb 2024 12:25:54 +0100 Subject: [PATCH 11/15] chore: update constants --- packages/interface/src/blocks.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index 572007745..5511b7b0f 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -118,7 +118,7 @@ export interface CreateSessionOptions = ProgressEvent, AnnounceProgressEvents extends ProgressEvent = ProgressEvent> { @@ -138,6 +138,7 @@ export interface BlockBroker): Promise> } -export const DEFAULT_MIN_SESSION_PROVIDERS = 1 -export const DEFAULT_MAX_SESSION_PROVIDERS = 5 +export const DEFAULT_SESSION_MIN_PROVIDERS = 1 +export const DEFAULT_SESSION_MAX_PROVIDERS = 5 export const DEFAULT_SESSION_QUERY_CONCURRENCY = 5 +export const DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT = 5000 From d7d6334036a61e246522885eb27afa9f6cfd3d4d Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 1 Mar 2024 08:32:25 +0000 Subject: [PATCH 12/15] chore: apply suggestions from code review Co-authored-by: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> --- packages/interface/src/blocks.ts | 6 ++---- packages/utils/src/utils/networked-storage.ts | 7 ++++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index 5511b7b0f..a69d5d6db 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -64,8 +64,6 @@ ProgressOptions, ProgressOptions): Promise } @@ -110,7 +108,7 @@ export interface CreateSessionOptions > { + return typeof broker.retrieve === 'function' +} export const getCidBlockVerifierFunction = (cid: CID, hasher: MultihashHasher): Required['validateFn'] => { if (hasher == null) { throw new CodeError(`No hasher configured for multihash code 0x${cid.multihash.code.toString(16)}, please configure one. You can look up which hash this is at https://github.com/multiformats/multicodec/blob/master/table.csv`, 'ERR_UNKNOWN_HASH_ALG') @@ -246,9 +249,7 @@ async function raceBlockRetrievers (cid: CID, blockBrokers: BlockBroker[], hashe const retrievers: Array>> = [] for (const broker of blockBrokers) { - if (broker.retrieve != null) { - // @ts-expect-error retrieve may be undefined even though we've just - // checked that it isn't + if (isRetrievingBlockBroker(broker)) { retrievers.push(broker) } } From 8997d41537660151c85cd86f7bac1da202691374 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 1 Mar 2024 14:55:46 +0000 Subject: [PATCH 13/15] feat: add sessions to trustless gateways Implements blockstore sessions for trustless gateways. - Queries the Helia routing for block providers - Creates a set of trustless gateways from routing results - Uses only these gateways to fetch session blocks --- packages/block-brokers/.aegir.js | 36 ++++++ packages/block-brokers/package.json | 8 ++ .../src/trustless-gateway/broker.ts | 111 +++++++++++++++++- .../src/trustless-gateway/index.ts | 3 +- .../trustless-gateway/trustless-gateway.ts | 9 +- .../test/trustless-gateway.spec.ts | 56 ++++++++- 6 files changed, 211 insertions(+), 12 deletions(-) create mode 100644 packages/block-brokers/.aegir.js diff --git a/packages/block-brokers/.aegir.js b/packages/block-brokers/.aegir.js new file mode 100644 index 000000000..c3b498b03 --- /dev/null +++ b/packages/block-brokers/.aegir.js @@ -0,0 +1,36 @@ +import cors from 'cors' +import polka from 'polka' + +/** @type {import('aegir').PartialOptions} */ +const options = { + test: { + async before (options) { + const server = polka({ + port: 0, + host: '127.0.0.1' + }) + server.use(cors()) + server.all('/ipfs/bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aq', (req, res) => { + res.writeHead(200, { + 'content-type': 'application/octet-stream' + }) + res.end(Uint8Array.from([0, 1, 2, 0])) + }) + + await server.listen() + const { port } = server.server.address() + + return { + server, + env: { + TRUSTLESS_GATEWAY: `http://127.0.0.1:${port}` + } + } + }, + async after (options, before) { + await before.server.stop() + } + } +} + +export default options diff --git a/packages/block-brokers/package.json b/packages/block-brokers/package.json index aaaa89bca..f454f00be 100644 --- a/packages/block-brokers/package.json +++ b/packages/block-brokers/package.json @@ -55,15 +55,23 @@ "dependencies": { "@helia/interface": "^4.0.1", "@libp2p/interface": "^1.1.4", + "@libp2p/utils": "^5.2.6", + "@multiformats/multiaddr-to-uri": "^10.0.1", "interface-blockstore": "^5.2.10", "ipfs-bitswap": "^20.0.2", "multiformats": "^13.1.0", + "p-defer": "^4.0.0", "progress-events": "^1.0.0" }, "devDependencies": { "@libp2p/logger": "^4.0.7", + "@libp2p/peer-id-factory": "^4.0.7", + "@multiformats/multiaddr": "^12.1.14", + "@multiformats/uri-to-multiaddr": "^8.0.0", "@types/sinon": "^17.0.3", "aegir": "^42.2.5", + "cors": "^2.8.5", + "polka": "^0.5.2", "sinon": "^17.0.1", "sinon-ts": "^2.0.0" } diff --git a/packages/block-brokers/src/trustless-gateway/broker.ts b/packages/block-brokers/src/trustless-gateway/broker.ts index d19548167..c0d9c8aca 100644 --- a/packages/block-brokers/src/trustless-gateway/broker.ts +++ b/packages/block-brokers/src/trustless-gateway/broker.ts @@ -1,26 +1,48 @@ +import { DEFAULT_SESSION_MIN_PROVIDERS, DEFAULT_SESSION_MAX_PROVIDERS, DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY, DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT } from '@helia/interface' +import { PeerQueue } from '@libp2p/utils/peer-queue' +import { multiaddrToUri } from '@multiformats/multiaddr-to-uri' +import pDefer from 'p-defer' import { TrustlessGateway } from './trustless-gateway.js' import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js' import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js' -import type { BlockRetrievalOptions, BlockBroker } from '@helia/interface/blocks' +import type { Routing, BlockRetrievalOptions, BlockBroker, CreateSessionOptions } from '@helia/interface' import type { Logger } from '@libp2p/interface' import type { CID } from 'multiformats/cid' +export interface CreateTrustlessGatewaySessionOptions extends CreateSessionOptions { + /** + * Specify the cache control header to send to the remote. 'only-if-cached' + * will prevent the gateway from fetching the content if they don't have it. + * + * @default only-if-cached + */ + cacheControl?: string +} + /** * A class that accepts a list of trustless gateways that are queried * for blocks. */ export class TrustlessGatewayBlockBroker implements BlockBroker { + private readonly components: TrustlessGatewayComponents private readonly gateways: TrustlessGateway[] + private readonly routing: Routing private readonly log: Logger constructor (components: TrustlessGatewayComponents, init: TrustlessGatewayBlockBrokerInit = {}) { + this.components = components this.log = components.logger.forComponent('helia:trustless-gateway-block-broker') + this.routing = components.routing this.gateways = (init.gateways ?? DEFAULT_TRUSTLESS_GATEWAYS) .map((gatewayOrUrl) => { - return new TrustlessGateway(gatewayOrUrl) + return new TrustlessGateway(gatewayOrUrl, components.logger) }) } + addGateway (gatewayOrUrl: string): void { + this.gateways.push(new TrustlessGateway(gatewayOrUrl, this.components.logger)) + } + async retrieve (cid: CID, options: BlockRetrievalOptions = {}): Promise { // Loop through the gateways until we get a block or run out of gateways // TODO: switch to toSorted when support is better @@ -38,7 +60,7 @@ export class TrustlessGatewayBlockBroker implements BlockBroker 0) { + throw new AggregateError(aggregateErrors, `Unable to fetch raw block for CID ${cid} from any gateway`) + } else { + throw new Error(`Unable to fetch raw block for CID ${cid} from any gateway`) + } + } + + async createSession (root: CID, options: CreateTrustlessGatewaySessionOptions = {}): Promise> { + const gateways: string[] = [] + const minProviders = options.minProviders ?? DEFAULT_SESSION_MIN_PROVIDERS + const maxProviders = options.minProviders ?? DEFAULT_SESSION_MAX_PROVIDERS + const deferred = pDefer>() + const broker = new TrustlessGatewayBlockBroker(this.components, { + gateways + }) + + this.log('finding transport-ipfs-gateway-http providers for cid %c', root) + + const queue = new PeerQueue({ + concurrency: options.providerQueryConcurrency ?? DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY + }) + + Promise.resolve().then(async () => { + for await (const provider of this.routing.findProviders(root, options)) { + if (provider.protocols == null || !provider.protocols.includes('transport-ipfs-gateway-http')) { + continue + } + + this.log('found transport-ipfs-gateway-http provider %p for cid %c', provider.id, root) + + void queue.add(async () => { + for (const ma of provider.multiaddrs) { + let uri: string | undefined + + try { + // /ip4/x.x.x.x/tcp/31337/http + // /ip4/x.x.x.x/tcp/31337/https + // etc + uri = multiaddrToUri(ma) + + const resource = `${uri}/ipfs/${root.toString()}?format=raw` + + // make sure the peer is available - HEAD support doesn't seem to + // be very widely implemented so as long as the remote responds + // we are happy they are valid + // https://specs.ipfs.tech/http-gateways/trustless-gateway/#head-ipfs-cid-path-params + const response = await fetch(resource, { + method: 'HEAD', + headers: { + Accept: 'application/vnd.ipld.raw', + 'Cache-Control': options.cacheControl ?? 'only-if-cached' + }, + signal: AbortSignal.timeout(options.providerQueryTimeout ?? DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT) + }) + + this.log('HEAD %s %d', resource, response.status) + gateways.push(uri) + broker.addGateway(uri) + + this.log('found %d transport-ipfs-gateway-http providers for cid %c', gateways.length, root) + + if (gateways.length === minProviders) { + deferred.resolve(broker) + } + + if (gateways.length === maxProviders) { + queue.clear() + } + } catch (err: any) { + this.log.error('could not fetch %c from %a', root, uri ?? ma, err) + } + } + }) + } + }) + .catch(err => { + this.log.error('error creating session for %c', root, err) + }) + + return deferred.promise } } diff --git a/packages/block-brokers/src/trustless-gateway/index.ts b/packages/block-brokers/src/trustless-gateway/index.ts index 93489ecbd..16f0ac07d 100644 --- a/packages/block-brokers/src/trustless-gateway/index.ts +++ b/packages/block-brokers/src/trustless-gateway/index.ts @@ -1,5 +1,5 @@ import { TrustlessGatewayBlockBroker } from './broker.js' -import type { BlockBroker } from '@helia/interface/src/blocks.js' +import type { Routing, BlockBroker } from '@helia/interface' import type { ComponentLogger } from '@libp2p/interface' import type { ProgressEvent } from 'progress-events' @@ -22,6 +22,7 @@ export interface TrustlessGatewayBlockBrokerInit { } export interface TrustlessGatewayComponents { + routing: Routing logger: ComponentLogger } diff --git a/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts b/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts index 352d6ea38..1da76d2bb 100644 --- a/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts +++ b/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts @@ -1,3 +1,4 @@ +import type { ComponentLogger, Logger } from '@libp2p/interface' import type { CID } from 'multiformats/cid' /** @@ -36,8 +37,11 @@ export class TrustlessGateway { */ #successes = 0 - constructor (url: URL | string) { + private readonly log: Logger + + constructor (url: URL | string, logger: ComponentLogger) { this.url = url instanceof URL ? url : new URL(url) + this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.hostname}`) } /** @@ -67,6 +71,9 @@ export class TrustlessGateway { }, cache: 'force-cache' }) + + this.log('GET %s %d', gwUrl, res.status) + if (!res.ok) { this.#errors++ throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`) diff --git a/packages/block-brokers/test/trustless-gateway.spec.ts b/packages/block-brokers/test/trustless-gateway.spec.ts index 9851e4121..684328081 100644 --- a/packages/block-brokers/test/trustless-gateway.spec.ts +++ b/packages/block-brokers/test/trustless-gateway.spec.ts @@ -1,13 +1,17 @@ /* eslint-env mocha */ import { defaultLogger } from '@libp2p/logger' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { multiaddr } from '@multiformats/multiaddr' +import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr' import { expect } from 'aegir/chai' import * as raw from 'multiformats/codecs/raw' import Sinon from 'sinon' -import { type StubbedInstance, stubConstructor } from 'sinon-ts' +import { type StubbedInstance, stubConstructor, stubInterface } from 'sinon-ts' import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js' import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js' import { createBlock } from './fixtures/create-block.js' +import type { Routing } from '@helia/interface' import type { BlockBroker } from '@helia/interface/blocks' import type { CID } from 'multiformats/cid' @@ -15,6 +19,7 @@ describe('trustless-gateway-block-broker', () => { let blocks: Array<{ cid: CID, block: Uint8Array }> let gatewayBlockBroker: BlockBroker let gateways: Array> + let routing: StubbedInstance // take a Record) => void> and stub the gateways // Record.default is the default handler @@ -29,6 +34,7 @@ describe('trustless-gateway-block-broker', () => { } beforeEach(async () => { + routing = stubInterface() blocks = [] for (let i = 0; i < 10; i++) { @@ -36,12 +42,13 @@ describe('trustless-gateway-block-broker', () => { } gateways = [ - stubConstructor(TrustlessGateway, 'http://localhost:8080'), - stubConstructor(TrustlessGateway, 'http://localhost:8081'), - stubConstructor(TrustlessGateway, 'http://localhost:8082'), - stubConstructor(TrustlessGateway, 'http://localhost:8083') + stubConstructor(TrustlessGateway, 'http://localhost:8080', defaultLogger()), + stubConstructor(TrustlessGateway, 'http://localhost:8081', defaultLogger()), + stubConstructor(TrustlessGateway, 'http://localhost:8082', defaultLogger()), + stubConstructor(TrustlessGateway, 'http://localhost:8083', defaultLogger()) ] gatewayBlockBroker = new TrustlessGatewayBlockBroker({ + routing, logger: defaultLogger() }) // must copy the array because the broker calls .sort which mutates in-place @@ -150,4 +157,43 @@ describe('trustless-gateway-block-broker', () => { expect(gateways[1].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false() expect(gateways[2].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false() }) + + it('creates a session', async () => { + routing.findProviders.returns(async function * () { + // non-http provider + yield { + id: await createEd25519PeerId(), + multiaddrs: [ + multiaddr('/ip4/132.32.25.6/tcp/1234') + ], + protocols: [ + 'transport-bitswap' + ] + } + // expired peer info + yield { + id: await createEd25519PeerId(), + multiaddrs: [] + } + // http gateway + yield { + id: await createEd25519PeerId(), + multiaddrs: [ + uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '') + ], + protocols: [ + 'transport-ipfs-gateway-http' + ] + } + }()) + + const sessionBlockstore = await gatewayBlockBroker.createSession?.(blocks[0].cid, { + minProviders: 1, + providerQueryConcurrency: 1 + }) + + expect(sessionBlockstore).to.be.ok() + + await expect(sessionBlockstore?.retrieve?.(blocks[0].cid)).to.eventually.deep.equal(blocks[0].block) + }) }) From 60365d2cef853938cba07855b86ae7dab5f51f75 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 1 Mar 2024 17:50:57 +0000 Subject: [PATCH 14/15] chore: close server --- packages/block-brokers/.aegir.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/block-brokers/.aegir.js b/packages/block-brokers/.aegir.js index c3b498b03..f6afadd65 100644 --- a/packages/block-brokers/.aegir.js +++ b/packages/block-brokers/.aegir.js @@ -28,7 +28,7 @@ const options = { } }, async after (options, before) { - await before.server.stop() + await before.server.server.close() } } } From ac06ff6781ac70bac9442fdb0102cd92f27b6bb8 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 14 Mar 2024 15:09:36 +0100 Subject: [PATCH 15/15] chore: filter by http address --- packages/block-brokers/package.json | 1 + .../src/trustless-gateway/broker.ts | 43 ++++++++++++++++++- .../test/trustless-gateway.spec.ts | 13 ++---- 3 files changed, 46 insertions(+), 11 deletions(-) diff --git a/packages/block-brokers/package.json b/packages/block-brokers/package.json index f454f00be..5f9b5f7b1 100644 --- a/packages/block-brokers/package.json +++ b/packages/block-brokers/package.json @@ -56,6 +56,7 @@ "@helia/interface": "^4.0.1", "@libp2p/interface": "^1.1.4", "@libp2p/utils": "^5.2.6", + "@multiformats/multiaddr-matcher": "^1.2.0", "@multiformats/multiaddr-to-uri": "^10.0.1", "interface-blockstore": "^5.2.10", "ipfs-bitswap": "^20.0.2", diff --git a/packages/block-brokers/src/trustless-gateway/broker.ts b/packages/block-brokers/src/trustless-gateway/broker.ts index c0d9c8aca..7d3a42bde 100644 --- a/packages/block-brokers/src/trustless-gateway/broker.ts +++ b/packages/block-brokers/src/trustless-gateway/broker.ts @@ -1,5 +1,7 @@ import { DEFAULT_SESSION_MIN_PROVIDERS, DEFAULT_SESSION_MAX_PROVIDERS, DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY, DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT } from '@helia/interface' import { PeerQueue } from '@libp2p/utils/peer-queue' +import { isPrivateIp } from '@libp2p/utils/private-ip' +import { DNS, HTTP, HTTPS } from '@multiformats/multiaddr-matcher' import { multiaddrToUri } from '@multiformats/multiaddr-to-uri' import pDefer from 'p-defer' import { TrustlessGateway } from './trustless-gateway.js' @@ -17,6 +19,22 @@ export interface CreateTrustlessGatewaySessionOptions extends CreateSessionOptio * @default only-if-cached */ cacheControl?: string + + /** + * By default we will only connect to peers with HTTPS addresses, pass true + * to also connect to HTTP addresses. + * + * @default false + */ + allowInsecure?: boolean + + /** + * By default we will only connect to peers with public or DNS addresses, pass + * true to also connect to private addresses. + * + * @default false + */ + allowLocal?: boolean } /** @@ -103,14 +121,30 @@ export class TrustlessGatewayBlockBroker implements BlockBroker { for await (const provider of this.routing.findProviders(root, options)) { - if (provider.protocols == null || !provider.protocols.includes('transport-ipfs-gateway-http')) { + const httpAddresses = provider.multiaddrs.filter(ma => { + if (HTTPS.matches(ma) || (options.allowInsecure === true && HTTP.matches(ma))) { + if (options.allowLocal === true) { + return true + } + + if (DNS.matches(ma)) { + return true + } + + return isPrivateIp(ma.toOptions().host) === false + } + + return false + }) + + if (httpAddresses.length === 0) { continue } this.log('found transport-ipfs-gateway-http provider %p for cid %c', provider.id, root) void queue.add(async () => { - for (const ma of provider.multiaddrs) { + for (const ma of httpAddresses) { let uri: string | undefined try { @@ -125,6 +159,11 @@ export class TrustlessGatewayBlockBroker implements BlockBroker { let blocks: Array<{ cid: CID, block: Uint8Array }> - let gatewayBlockBroker: BlockBroker + let gatewayBlockBroker: TrustlessGatewayBlockBroker let gateways: Array> let routing: StubbedInstance @@ -165,9 +164,6 @@ describe('trustless-gateway-block-broker', () => { id: await createEd25519PeerId(), multiaddrs: [ multiaddr('/ip4/132.32.25.6/tcp/1234') - ], - protocols: [ - 'transport-bitswap' ] } // expired peer info @@ -180,16 +176,15 @@ describe('trustless-gateway-block-broker', () => { id: await createEd25519PeerId(), multiaddrs: [ uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '') - ], - protocols: [ - 'transport-ipfs-gateway-http' ] } }()) const sessionBlockstore = await gatewayBlockBroker.createSession?.(blocks[0].cid, { minProviders: 1, - providerQueryConcurrency: 1 + providerQueryConcurrency: 1, + allowInsecure: true, + allowLocal: true }) expect(sessionBlockstore).to.be.ok()