diff --git a/packages/block-brokers/.aegir.js b/packages/block-brokers/.aegir.js new file mode 100644 index 000000000..c3b498b03 --- /dev/null +++ b/packages/block-brokers/.aegir.js @@ -0,0 +1,36 @@ +import cors from 'cors' +import polka from 'polka' + +/** @type {import('aegir').PartialOptions} */ +const options = { + test: { + async before (options) { + const server = polka({ + port: 0, + host: '127.0.0.1' + }) + server.use(cors()) + server.all('/ipfs/bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aq', (req, res) => { + res.writeHead(200, { + 'content-type': 'application/octet-stream' + }) + res.end(Uint8Array.from([0, 1, 2, 0])) + }) + + await server.listen() + const { port } = server.server.address() + + return { + server, + env: { + TRUSTLESS_GATEWAY: `http://127.0.0.1:${port}` + } + } + }, + async after (options, before) { + await before.server.stop() + } + } +} + +export default options diff --git a/packages/block-brokers/package.json b/packages/block-brokers/package.json index aaaa89bca..f454f00be 100644 --- a/packages/block-brokers/package.json +++ b/packages/block-brokers/package.json @@ -55,15 +55,23 @@ "dependencies": { "@helia/interface": "^4.0.1", "@libp2p/interface": "^1.1.4", + "@libp2p/utils": "^5.2.6", + "@multiformats/multiaddr-to-uri": "^10.0.1", "interface-blockstore": "^5.2.10", "ipfs-bitswap": "^20.0.2", "multiformats": "^13.1.0", + "p-defer": "^4.0.0", "progress-events": "^1.0.0" }, "devDependencies": { "@libp2p/logger": "^4.0.7", + "@libp2p/peer-id-factory": "^4.0.7", + "@multiformats/multiaddr": "^12.1.14", + "@multiformats/uri-to-multiaddr": "^8.0.0", "@types/sinon": "^17.0.3", "aegir": "^42.2.5", + "cors": "^2.8.5", + "polka": "^0.5.2", "sinon": "^17.0.1", "sinon-ts": "^2.0.0" } diff --git a/packages/block-brokers/src/trustless-gateway/broker.ts b/packages/block-brokers/src/trustless-gateway/broker.ts index d19548167..c0d9c8aca 100644 --- a/packages/block-brokers/src/trustless-gateway/broker.ts +++ b/packages/block-brokers/src/trustless-gateway/broker.ts @@ -1,26 +1,48 @@ +import { DEFAULT_SESSION_MIN_PROVIDERS, DEFAULT_SESSION_MAX_PROVIDERS, DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY, DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT } from '@helia/interface' +import { PeerQueue } from '@libp2p/utils/peer-queue' +import { multiaddrToUri } from '@multiformats/multiaddr-to-uri' +import pDefer from 'p-defer' import { TrustlessGateway } from './trustless-gateway.js' import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js' import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js' -import type { BlockRetrievalOptions, BlockBroker } from '@helia/interface/blocks' +import type { Routing, BlockRetrievalOptions, BlockBroker, CreateSessionOptions } from '@helia/interface' import type { Logger } from '@libp2p/interface' import type { CID } from 'multiformats/cid' +export interface CreateTrustlessGatewaySessionOptions extends CreateSessionOptions { + /** + * Specify the cache control header to send to the remote. 'only-if-cached' + * will prevent the gateway from fetching the content if they don't have it. + * + * @default only-if-cached + */ + cacheControl?: string +} + /** * A class that accepts a list of trustless gateways that are queried * for blocks. */ export class TrustlessGatewayBlockBroker implements BlockBroker { + private readonly components: TrustlessGatewayComponents private readonly gateways: TrustlessGateway[] + private readonly routing: Routing private readonly log: Logger constructor (components: TrustlessGatewayComponents, init: TrustlessGatewayBlockBrokerInit = {}) { + this.components = components this.log = components.logger.forComponent('helia:trustless-gateway-block-broker') + this.routing = components.routing this.gateways = (init.gateways ?? DEFAULT_TRUSTLESS_GATEWAYS) .map((gatewayOrUrl) => { - return new TrustlessGateway(gatewayOrUrl) + return new TrustlessGateway(gatewayOrUrl, components.logger) }) } + addGateway (gatewayOrUrl: string): void { + this.gateways.push(new TrustlessGateway(gatewayOrUrl, this.components.logger)) + } + async retrieve (cid: CID, options: BlockRetrievalOptions = {}): Promise { // Loop through the gateways until we get a block or run out of gateways // TODO: switch to toSorted when support is better @@ -38,7 +60,7 @@ export class TrustlessGatewayBlockBroker implements BlockBroker 0) { + throw new AggregateError(aggregateErrors, `Unable to fetch raw block for CID ${cid} from any gateway`) + } else { + throw new Error(`Unable to fetch raw block for CID ${cid} from any gateway`) + } + } + + async createSession (root: CID, options: CreateTrustlessGatewaySessionOptions = {}): Promise> { + const gateways: string[] = [] + const minProviders = options.minProviders ?? DEFAULT_SESSION_MIN_PROVIDERS + const maxProviders = options.minProviders ?? DEFAULT_SESSION_MAX_PROVIDERS + const deferred = pDefer>() + const broker = new TrustlessGatewayBlockBroker(this.components, { + gateways + }) + + this.log('finding transport-ipfs-gateway-http providers for cid %c', root) + + const queue = new PeerQueue({ + concurrency: options.providerQueryConcurrency ?? DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY + }) + + Promise.resolve().then(async () => { + for await (const provider of this.routing.findProviders(root, options)) { + if (provider.protocols == null || !provider.protocols.includes('transport-ipfs-gateway-http')) { + continue + } + + this.log('found transport-ipfs-gateway-http provider %p for cid %c', provider.id, root) + + void queue.add(async () => { + for (const ma of provider.multiaddrs) { + let uri: string | undefined + + try { + // /ip4/x.x.x.x/tcp/31337/http + // /ip4/x.x.x.x/tcp/31337/https + // etc + uri = multiaddrToUri(ma) + + const resource = `${uri}/ipfs/${root.toString()}?format=raw` + + // make sure the peer is available - HEAD support doesn't seem to + // be very widely implemented so as long as the remote responds + // we are happy they are valid + // https://specs.ipfs.tech/http-gateways/trustless-gateway/#head-ipfs-cid-path-params + const response = await fetch(resource, { + method: 'HEAD', + headers: { + Accept: 'application/vnd.ipld.raw', + 'Cache-Control': options.cacheControl ?? 'only-if-cached' + }, + signal: AbortSignal.timeout(options.providerQueryTimeout ?? DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT) + }) + + this.log('HEAD %s %d', resource, response.status) + gateways.push(uri) + broker.addGateway(uri) + + this.log('found %d transport-ipfs-gateway-http providers for cid %c', gateways.length, root) + + if (gateways.length === minProviders) { + deferred.resolve(broker) + } + + if (gateways.length === maxProviders) { + queue.clear() + } + } catch (err: any) { + this.log.error('could not fetch %c from %a', root, uri ?? ma, err) + } + } + }) + } + }) + .catch(err => { + this.log.error('error creating session for %c', root, err) + }) + + return deferred.promise } } diff --git a/packages/block-brokers/src/trustless-gateway/index.ts b/packages/block-brokers/src/trustless-gateway/index.ts index 93489ecbd..16f0ac07d 100644 --- a/packages/block-brokers/src/trustless-gateway/index.ts +++ b/packages/block-brokers/src/trustless-gateway/index.ts @@ -1,5 +1,5 @@ import { TrustlessGatewayBlockBroker } from './broker.js' -import type { BlockBroker } from '@helia/interface/src/blocks.js' +import type { Routing, BlockBroker } from '@helia/interface' import type { ComponentLogger } from '@libp2p/interface' import type { ProgressEvent } from 'progress-events' @@ -22,6 +22,7 @@ export interface TrustlessGatewayBlockBrokerInit { } export interface TrustlessGatewayComponents { + routing: Routing logger: ComponentLogger } diff --git a/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts b/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts index 352d6ea38..1da76d2bb 100644 --- a/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts +++ b/packages/block-brokers/src/trustless-gateway/trustless-gateway.ts @@ -1,3 +1,4 @@ +import type { ComponentLogger, Logger } from '@libp2p/interface' import type { CID } from 'multiformats/cid' /** @@ -36,8 +37,11 @@ export class TrustlessGateway { */ #successes = 0 - constructor (url: URL | string) { + private readonly log: Logger + + constructor (url: URL | string, logger: ComponentLogger) { this.url = url instanceof URL ? url : new URL(url) + this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.hostname}`) } /** @@ -67,6 +71,9 @@ export class TrustlessGateway { }, cache: 'force-cache' }) + + this.log('GET %s %d', gwUrl, res.status) + if (!res.ok) { this.#errors++ throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`) diff --git a/packages/block-brokers/test/trustless-gateway.spec.ts b/packages/block-brokers/test/trustless-gateway.spec.ts index 9851e4121..684328081 100644 --- a/packages/block-brokers/test/trustless-gateway.spec.ts +++ b/packages/block-brokers/test/trustless-gateway.spec.ts @@ -1,13 +1,17 @@ /* eslint-env mocha */ import { defaultLogger } from '@libp2p/logger' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { multiaddr } from '@multiformats/multiaddr' +import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr' import { expect } from 'aegir/chai' import * as raw from 'multiformats/codecs/raw' import Sinon from 'sinon' -import { type StubbedInstance, stubConstructor } from 'sinon-ts' +import { type StubbedInstance, stubConstructor, stubInterface } from 'sinon-ts' import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js' import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js' import { createBlock } from './fixtures/create-block.js' +import type { Routing } from '@helia/interface' import type { BlockBroker } from '@helia/interface/blocks' import type { CID } from 'multiformats/cid' @@ -15,6 +19,7 @@ describe('trustless-gateway-block-broker', () => { let blocks: Array<{ cid: CID, block: Uint8Array }> let gatewayBlockBroker: BlockBroker let gateways: Array> + let routing: StubbedInstance // take a Record) => void> and stub the gateways // Record.default is the default handler @@ -29,6 +34,7 @@ describe('trustless-gateway-block-broker', () => { } beforeEach(async () => { + routing = stubInterface() blocks = [] for (let i = 0; i < 10; i++) { @@ -36,12 +42,13 @@ describe('trustless-gateway-block-broker', () => { } gateways = [ - stubConstructor(TrustlessGateway, 'http://localhost:8080'), - stubConstructor(TrustlessGateway, 'http://localhost:8081'), - stubConstructor(TrustlessGateway, 'http://localhost:8082'), - stubConstructor(TrustlessGateway, 'http://localhost:8083') + stubConstructor(TrustlessGateway, 'http://localhost:8080', defaultLogger()), + stubConstructor(TrustlessGateway, 'http://localhost:8081', defaultLogger()), + stubConstructor(TrustlessGateway, 'http://localhost:8082', defaultLogger()), + stubConstructor(TrustlessGateway, 'http://localhost:8083', defaultLogger()) ] gatewayBlockBroker = new TrustlessGatewayBlockBroker({ + routing, logger: defaultLogger() }) // must copy the array because the broker calls .sort which mutates in-place @@ -150,4 +157,43 @@ describe('trustless-gateway-block-broker', () => { expect(gateways[1].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false() expect(gateways[2].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false() }) + + it('creates a session', async () => { + routing.findProviders.returns(async function * () { + // non-http provider + yield { + id: await createEd25519PeerId(), + multiaddrs: [ + multiaddr('/ip4/132.32.25.6/tcp/1234') + ], + protocols: [ + 'transport-bitswap' + ] + } + // expired peer info + yield { + id: await createEd25519PeerId(), + multiaddrs: [] + } + // http gateway + yield { + id: await createEd25519PeerId(), + multiaddrs: [ + uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '') + ], + protocols: [ + 'transport-ipfs-gateway-http' + ] + } + }()) + + const sessionBlockstore = await gatewayBlockBroker.createSession?.(blocks[0].cid, { + minProviders: 1, + providerQueryConcurrency: 1 + }) + + expect(sessionBlockstore).to.be.ok() + + await expect(sessionBlockstore?.retrieve?.(blocks[0].cid)).to.eventually.deep.equal(blocks[0].block) + }) })