From 0e2d24d81d53b6b55c25b02395f9de17bfdd6190 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Wed, 9 Oct 2019 16:48:44 +0200 Subject: [PATCH] feat: gc uses multihashes instead CIDs --- src/cli/commands/repo/gc.js | 2 +- src/core/components/pin/gc.js | 62 +++++++++++------------------ src/http/api/resources/repo.js | 2 +- test/core/gc.spec.js | 71 +++++++++++++++++++--------------- 4 files changed, 63 insertions(+), 74 deletions(-) diff --git a/src/cli/commands/repo/gc.js b/src/cli/commands/repo/gc.js index b805ac9934..5eb6d7a672 100644 --- a/src/cli/commands/repo/gc.js +++ b/src/cli/commands/repo/gc.js @@ -27,7 +27,7 @@ module.exports = { if (r.err) { streamErrors && print(r.err.message, true, true) } else { - print((quiet ? '' : 'removed ') + r.cid) + print((quiet ? '' : 'removed ') + r.multihash) } } })()) diff --git a/src/core/components/pin/gc.js b/src/core/components/pin/gc.js index a974a85de5..c03f0abe66 100644 --- a/src/core/components/pin/gc.js +++ b/src/core/components/pin/gc.js @@ -3,7 +3,6 @@ const CID = require('cids') const base32 = require('base32.js') const callbackify = require('callbackify') -const { cidToString } = require('../../../utils/cid') const log = require('debug')('ipfs:gc') const { default: Queue } = require('p-queue') // TODO: Use exported key from root when upgraded to ipfs-mfs@>=13 @@ -47,7 +46,7 @@ module.exports = function gc (self) { }) } -// Get Set of CIDs of blocks to keep +// Get Set of multihashes of blocks to keep async function createMarkedSet (ipfs) { const output = new Set() @@ -55,7 +54,8 @@ async function createMarkedSet (ipfs) { log(`Found ${pins.length} pinned blocks`) pins.forEach(pin => { - output.add(cidToString(new CID(pin), { base: 'base32' })) + const cid = new CID(pin) + output.add(base32.encode(cid.multihash)) }) } @@ -91,7 +91,6 @@ async function getDescendants (ipfs, cid) { const refs = await ipfs.refs(cid, { recursive: true }) const cids = [cid, ...refs.map(r => new CID(r.ref))] log(`Found ${cids.length} MFS blocks`) - // log(' ' + cids.join('\n ')) return cids } @@ -100,54 +99,37 @@ async function getDescendants (ipfs, cid) { async function deleteUnmarkedBlocks (ipfs, markedSet, blockKeys) { // Iterate through all blocks and find those that are not in the marked set // The blockKeys variable has the form [ { key: Key() }, { key: Key() }, ... ] - const unreferenced = [] const result = [] + let blockCounter = 0 const queue = new Queue({ concurrency: BLOCK_RM_CONCURRENCY }) for await (const { key: k } of blockKeys) { - try { - const cid = dsKeyToCid(k) - const b32 = cid.toV1().toString('base32') - if (!markedSet.has(b32)) { - unreferenced.push(cid) - - queue.add(async () => { - const res = { - cid - } - - try { - await ipfs._repo.blocks.delete(cid) - } catch (err) { - res.err = new Error(`Could not delete block with CID ${cid}: ${err.message}`) - } - - result.push(res) - }) - } - } catch (err) { - const msg = `Could not convert block with key '${k}' to CID` - log(msg, err) - result.push({ err: new Error(msg + `: ${err.message}`) }) + blockCounter++ + const multihashString = k.toString().substr(1) + if (!markedSet.has(multihashString)) { + queue.add(async () => { + const res = { + multihash: multihashString + } + + try { + await ipfs._repo.blocks.delete(Buffer.from(multihashString)) + } catch (err) { + res.err = new Error(`Could not delete block with multihash ${multihashString}: ${err.message}`) + } + + result.push(res) + }) } } await queue.onIdle() - log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockKeys.length} blocks. ` + - `Deleted ${unreferenced.length} blocks.`) + log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockCounter} blocks. ` + + `Deleted ${result.filter(res => res.err === undefined).length} blocks.`) return result } - -// TODO: Use exported utility when upgrade to ipfs-repo@>=0.27.1 -// https://github.com/ipfs/js-ipfs-repo/pull/206 -function dsKeyToCid (key) { - // Block key is of the form / - const decoder = new base32.Decoder() - const buff = decoder.write(key.toString().slice(1)).finalize() - return new CID(Buffer.from(buff)) -} diff --git a/src/http/api/resources/repo.js b/src/http/api/resources/repo.js index 108ce6abda..39768223e8 100644 --- a/src/http/api/resources/repo.js +++ b/src/http/api/resources/repo.js @@ -18,7 +18,7 @@ exports.gc = { const response = filtered.map(r => { return { Err: r.err && r.err.message, - Key: !r.err && { '/': r.cid.toString() } + Key: !r.err && r.multihash } }) return h.response(response) diff --git a/test/core/gc.spec.js b/test/core/gc.spec.js index 42022f3fed..059ffb3bbc 100644 --- a/test/core/gc.spec.js +++ b/test/core/gc.spec.js @@ -7,6 +7,8 @@ const IPFSFactory = require('ipfsd-ctl') const pEvent = require('p-event') const env = require('ipfs-utils/src/env') const IPFS = require('../../src/core') +const CID = require('cids') +const base32 = require('base32.js') const { Errors } = require('interface-datastore') // We need to detect when a readLock or writeLock is requested for the tests @@ -90,17 +92,17 @@ describe('gc', function () { name: 'add', add1: () => ipfs.add(fixtures[0], { pin: false }), add2: () => ipfs.add(fixtures[1], { pin: false }), - resToCid: (res) => res[0].hash + resToMultihash: (res) => base32.encode(new CID(res[0].hash).multihash) }, { name: 'object put', add1: () => ipfs.object.put({ Data: 'obj put 1', Links: [] }), add2: () => ipfs.object.put({ Data: 'obj put 2', Links: [] }), - resToCid: (res) => res.toString() + resToMultihash: (res) => base32.encode(res.multihash) }, { name: 'block put', add1: () => ipfs.block.put(Buffer.from('block put 1'), null), add2: () => ipfs.block.put(Buffer.from('block put 2'), null), - resToCid: (res) => res.cid.toString() + resToMultihash: (res) => base32.encode(res.cid.multihash) }] describe('locks', function () { @@ -122,9 +124,9 @@ describe('gc', function () { await gcStarted const add2 = test.add2() - const deleted = (await gc).map(i => i.cid.toString()) - const add1Res = test.resToCid(await add1) - const add2Res = test.resToCid(await add2) + const deleted = (await gc).map(i => i.multihash) + const add1Res = test.resToMultihash(await add1) + const add2Res = test.resToMultihash(await add2) // Should have garbage collected blocks from first add, because GC should // have waited for first add to finish @@ -152,9 +154,9 @@ describe('gc', function () { await gcStarted const add2 = ipfs.add(fixtures[3], { pin: true }) - const deleted = (await gc).map(i => i.cid.toString()) - const add1Res = (await add1)[0].hash - const add2Res = (await add2)[0].hash + const deleted = (await gc).map(i => i.multihash) + const add1Res = base32.encode(new CID((await add1)[0].hash).multihash) + const add2Res = base32.encode(new CID((await add2)[0].hash).multihash) // Should not have garbage collected blocks from first add, because GC should // have waited for first add + pin to finish (protected by pin) @@ -168,7 +170,9 @@ describe('gc', function () { it('garbage collection should wait for pending block rm to finish', async () => { // Add two blocks so that we can remove them const cid1 = (await ipfs.block.put(Buffer.from('block to rm 1'), null)).cid + const cid1Multihash = base32.encode(cid1.multihash) const cid2 = (await ipfs.block.put(Buffer.from('block to rm 2'), null)).cid + const cid2Multihash = base32.encode(cid2.multihash) // Remove first block from IPFS // Note: block rm will take a write lock @@ -185,33 +189,34 @@ describe('gc', function () { await gcStarted const rm2 = ipfs.block.rm(cid2) - const deleted = (await gc).map(i => i.cid.toString()) - await rm1 - - // Second rm should fail because GC has already removed that block - try { - await rm2 - } catch (err) { - expect(err.code).eql(Errors.dbDeleteFailedError().code) - } + const deleted = (await gc).map(i => i.multihash) + const rm1Out = await rm1 + expect(rm1Out[0]).to.not.have.property('error') // Confirm second block has been removed - const localRefs = (await ipfs.refs.local()).map(r => r.ref) - expect(localRefs).not.includes(cid2.toString()) + const localMultihashes = (await ipfs.refs.local()).map(r => base32.encode(new CID(r.ref).multihash)) + expect(localMultihashes).not.includes(cid2Multihash) + + // Second rm should fail because GC has already removed that block + expect((await rm2)[0]) + .to.have.property('error') + .that.has.property('code').that.equal(Errors.dbDeleteFailedError().code) // Should not have garbage collected block from first block put, because // GC should have waited for first rm (removing first block put) to finish - expect(deleted).not.includes(cid1.toString()) + expect(deleted).not.includes(cid1Multihash) // Should have garbage collected block from second block put, because GC // should have completed before second rm (removing second block put) - expect(deleted).includes(cid2.toString()) + expect(deleted).includes(cid2Multihash) }) it('garbage collection should wait for pending pin add to finish', async () => { // Add two blocks so that we can pin them - const cid1 = (await ipfs.block.put(Buffer.from('block to pin add 1'), null)).cid - const cid2 = (await ipfs.block.put(Buffer.from('block to pin add 2'), null)).cid + const cid1 = (await ipfs.block.put(Buffer.from('block to test pin add 1'), null)).cid + const cid2 = (await ipfs.block.put(Buffer.from('block to test pin add 2'), null)).cid + const cid1Multihash = base32.encode(cid1.multihash) + const cid2Multihash = base32.encode(cid2.multihash) // Pin first block // Note: pin add will take a read lock @@ -221,7 +226,7 @@ describe('gc', function () { // Once pin lock has been requested, start GC await pinLockRequested const gc = ipfs.repo.gc() - const deleted = (await gc).map(i => i.cid.toString()) + const deleted = (await gc).map(i => i.multihash) await pin1 // TODO: Adding pin for removed block never returns, which means the lock @@ -229,22 +234,24 @@ describe('gc', function () { // const pin2 = ipfs.pin.add(cid2) // Confirm second second block has been removed - const localRefs = (await ipfs.refs.local()).map(r => r.ref) - expect(localRefs).not.includes(cid2.toString()) + const localMultihashes = (await ipfs.refs.local()).map(r => base32.encode(new CID(r.ref).multihash)) + expect(localMultihashes).not.includes(cid2Multihash) // Should not have garbage collected block from first block put, because // GC should have waited for pin (protecting first block put) to finish - expect(deleted).not.includes(cid1.toString()) + expect(deleted).not.includes(cid1Multihash) // Should have garbage collected block from second block put, because GC // should have completed before second pin - expect(deleted).includes(cid2.toString()) + expect(deleted).includes(cid2Multihash) }) it('garbage collection should wait for pending pin rm to finish', async () => { // Add two blocks so that we can pin them const cid1 = (await ipfs.block.put(Buffer.from('block to pin rm 1'), null)).cid const cid2 = (await ipfs.block.put(Buffer.from('block to pin rm 2'), null)).cid + const cid1Multihash = base32.encode(cid1.multihash) + const cid2Multihash = base32.encode(cid2.multihash) // Pin blocks await ipfs.pin.add(cid1) @@ -265,17 +272,17 @@ describe('gc', function () { await gcStarted const pinRm2 = ipfs.pin.rm(cid2) - const deleted = (await gc).map(i => i.cid.toString()) + const deleted = (await gc).map(i => i.multihash) await pinRm1 await pinRm2 // Should have garbage collected block from first block put, because // GC should have waited for pin rm (unpinning first block put) to finish - expect(deleted).includes(cid1.toString()) + expect(deleted).includes(cid1Multihash) // Should not have garbage collected block from second block put, because // GC should have completed before second block was unpinned - expect(deleted).not.includes(cid2.toString()) + expect(deleted).not.includes(cid2Multihash) }) }) })