diff --git a/amt.go b/amt.go index 586a088..3f98cda 100644 --- a/amt.go +++ b/amt.go @@ -321,6 +321,14 @@ func (r *Root) ForEachAt(ctx context.Context, start uint64, cb func(uint64, *cbg return r.node.forEachAt(ctx, r.store, r.bitWidth, r.height, start, 0, cb) } +func (r *Root) ForEachParallel(ctx context.Context, concurrency int, cb func(uint64, *cbg.Deferred) error) error { + return r.node.forEachAtParallel(ctx, r.store, r.bitWidth, r.height, 0, 0, cb, concurrency) +} + +func (r *Root) ForEachAtParallel(ctx context.Context, concurrency int, start uint64, cb func(uint64, *cbg.Deferred) error) error { + return r.node.forEachAtParallel(ctx, r.store, r.bitWidth, r.height, start, 0, cb, concurrency) +} + // FirstSetIndex finds the lowest index in this AMT that has a value set for // it. If this operation is called on an empty AMT, an ErrNoValues will be // returned. diff --git a/amt_test.go b/amt_test.go index a610dba..41e100a 100644 --- a/amt_test.go +++ b/amt_test.go @@ -34,6 +34,13 @@ var ( bitWidths2to3 = []uint{2, 3} ) +func runTestWithBitWidthsOnly(t *testing.T, bitwidths []uint, fn func(*testing.T, ...Option)) { + t.Helper() + for _, bw := range bitwidths { + t.Run(fmt.Sprintf("bitwidth=%d", bw), func(t *testing.T) { fn(t, UseTreeBitWidth(bw)) }) + } +} + func runTestWithBitWidths(t *testing.T, bitwidths []uint, fn func(*testing.T, ...Option)) { t.Helper() if testing.Short() { @@ -62,7 +69,7 @@ func newMockBlocks() *mockBlocks { return &mockBlocks{make(map[cid.Cid]block.Block), sync.Mutex{}, 0, 0} } -func (mb *mockBlocks) Get(c cid.Cid) (block.Block, error) { +func (mb *mockBlocks) Get(ctx context.Context, c cid.Cid) (block.Block, error) { mb.dataMu.Lock() defer mb.dataMu.Unlock() d, ok := mb.data[c] @@ -73,7 +80,24 @@ func (mb *mockBlocks) Get(c cid.Cid) (block.Block, error) { return nil, fmt.Errorf("Not Found") } -func (mb *mockBlocks) Put(b block.Block) error { +func (mb *mockBlocks) GetMany(ctx context.Context, cs []cid.Cid) ([]block.Block, []cid.Cid, error) { + mb.dataMu.Lock() + defer mb.dataMu.Unlock() + blocks := make([]block.Block, 0, len(cs)) + missingCIDs := make([]cid.Cid, 0, len(cs)) + for _, c := range cs { + mb.getCount++ + d, ok := mb.data[c] + if !ok { + missingCIDs = append(missingCIDs, c) + } else { + blocks = append(blocks, d) + } + } + return blocks, missingCIDs, nil +} + +func (mb *mockBlocks) Put(ctx context.Context, b block.Block) error { mb.dataMu.Lock() defer mb.dataMu.Unlock() mb.putCount++ @@ -81,6 +105,16 @@ func (mb *mockBlocks) Put(b block.Block) error { return nil } +func (mb *mockBlocks) PutMany(ctx context.Context, bs []block.Block) error { + mb.dataMu.Lock() + defer mb.dataMu.Unlock() + for _, b := range bs { + mb.putCount++ + mb.data[b.Cid()] = b + } + return nil +} + func (mb *mockBlocks) report(b *testing.B) { mb.dataMu.Lock() defer mb.dataMu.Unlock() @@ -342,6 +376,7 @@ func TestForEachWithoutFlush(t *testing.T) { require.NoError(t, err) set1 := make(map[uint64]struct{}) set2 := make(map[uint64]struct{}) + set3 := make(map[uint64]struct{}) for _, val := range vals { err := amt.Set(ctx, val, cborstr("")) require.NoError(t, err) @@ -357,7 +392,7 @@ func TestForEachWithoutFlush(t *testing.T) { assert.Equal(t, make(map[uint64]struct{}), set1) // ensure it still works after flush - _, err = amt.Flush(ctx) + c, err := amt.Flush(ctx) require.NoError(t, err) amt.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error { @@ -365,6 +400,15 @@ func TestForEachWithoutFlush(t *testing.T) { return nil }) assert.Equal(t, make(map[uint64]struct{}), set2) + + // ensure that it works with a loaded AMT + loadedAMT, err := LoadAMT(ctx, bs, c, opts...) + err = loadedAMT.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error { + delete(set3, u) + return nil + }) + require.NoError(t, err) + assert.Equal(t, make(map[uint64]struct{}), set3) } }) } @@ -794,6 +838,94 @@ func TestForEach(t *testing.T) { }) } +func TestForEachParallel(t *testing.T) { + bs := cbor.NewGetManyCborStore(newMockBlocks()) + ctx := context.Background() + a, err := NewAMT(bs) + require.NoError(t, err) + + r := rand.New(rand.NewSource(101)) + + indexes := make(map[uint64]struct{}) + for i := 0; i < 10000; i++ { + if r.Intn(2) == 0 { + indexes[uint64(i)] = struct{}{} + } + } + + for i := range indexes { + if err := a.Set(ctx, i, cborstr("value")); err != nil { + t.Fatal(err) + } + } + + for i := range indexes { + assertGet(ctx, t, a, i, "value") + } + + assertCount(t, a, uint64(len(indexes))) + + // test before flush + m := sync.Mutex{} + foundVals := make(map[uint64]struct{}) + err = a.ForEachParallel(ctx, 16, func(i uint64, v *cbg.Deferred) error { + m.Lock() + foundVals[i] = struct{}{} + m.Unlock() + return nil + }) + if err != nil { + t.Fatal(err) + } + if len(foundVals) != len(indexes) { + t.Fatal("didnt see enough values") + } + + c, err := a.Flush(ctx) + if err != nil { + t.Fatal(err) + } + + assertCount(t, a, uint64(len(indexes))) + + // test after flush + foundVals = make(map[uint64]struct{}) + err = a.ForEachParallel(ctx, 16, func(i uint64, v *cbg.Deferred) error { + m.Lock() + foundVals[i] = struct{}{} + m.Unlock() + return nil + }) + if err != nil { + t.Fatal(err) + } + if len(foundVals) != len(indexes) { + t.Fatal("didnt see enough values") + } + + na, err := LoadAMT(ctx, bs, c) + if err != nil { + t.Fatal(err) + } + + assertCount(t, na, uint64(len(indexes))) + + // test from loaded AMT + foundVals = make(map[uint64]struct{}) + err = na.ForEachParallel(ctx, 16, func(i uint64, v *cbg.Deferred) error { + m.Lock() + foundVals[i] = struct{}{} + m.Unlock() + return nil + }) + if err != nil { + t.Fatal(err) + } + if len(foundVals) != len(indexes) { + t.Fatal("didnt see enough values") + } +} + func TestForEachAt(t *testing.T) { runTestWithBitWidths(t, bitWidths2to18, func(t *testing.T, opts ...Option) { bs := cbor.NewCborStore(newMockBlocks()) @@ -858,6 +990,65 @@ func TestForEachAt(t *testing.T) { }) } +func TestForEachAtParallel(t *testing.T) { + runTestWithBitWidths(t, bitWidths2to18, func(t *testing.T, opts ...Option) { + bs := cbor.NewGetManyCborStore(newMockBlocks()) + ctx := context.Background() + a, err := NewAMT(bs, opts...) + require.NoError(t, err) + + r := rand.New(rand.NewSource(101)) + + var indexes []uint64 + for i := 0; i < cbg.MaxLength; i++ { // above bitwidth 13, inserting more than cbg.MaxLength causes node.Values to exceed the cbg.MaxLength + indexes = append(indexes, uint64(i)) + if err := a.Set(ctx, uint64(i), cborstr(fmt.Sprint(i))); err != nil { + t.Fatal(err) + } + } + + for _, i := range indexes { + assertGet(ctx, t, a, i, fmt.Sprint(i)) + } + + assertCount(t, a, uint64(len(indexes))) + + c, err := a.Flush(ctx) + if err != nil { + t.Fatal(err) + } + + na, err := LoadAMT(ctx, bs, c, opts...) + if err != nil { + t.Fatal(err) + } + + assertCount(t, na, uint64(len(indexes))) + m := sync.Mutex{} + for try := 0; try < 10; try++ { + start := uint64(r.Intn(cbg.MaxLength)) + + expectedIndexes := make(map[uint64]struct{}) + for i := start; i < cbg.MaxLength; i++ { + expectedIndexes[i] = struct{}{} + } + + err = na.ForEachAtParallel(ctx, 16, start, func(i uint64, v *cbg.Deferred) error { + m.Lock() + delete(expectedIndexes, i) + m.Unlock() + return nil + }) + if err != nil { + t.Fatal(err) + } + if len(expectedIndexes) != 0 { + t.Fatal("didnt see enough values") + } + } + }) +} + func TestFirstSetIndex(t *testing.T) { runTestWithBitWidths(t, bitWidths2to18, func(t *testing.T, opts ...Option) { bs := cbor.NewCborStore(newMockBlocks()) diff --git a/go.mod b/go.mod index 6443b3c..308ec9d 100644 --- a/go.mod +++ b/go.mod @@ -3,33 +3,37 @@ module github.com/filecoin-project/go-amt-ipld/v4 go 1.20 require ( - github.com/ipfs/go-block-format v0.0.2 - github.com/ipfs/go-cid v0.0.7 - github.com/ipfs/go-ipld-cbor v0.0.4 + github.com/ipfs/go-block-format v0.1.2 + github.com/ipfs/go-cid v0.4.1 + github.com/ipfs/go-ipld-cbor v0.1.0 github.com/stretchr/testify v1.7.0 - github.com/whyrusleeping/cbor-gen v0.0.0-20220323183124-98fa8256a799 + github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 + golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/ipfs/go-ipfs-util v0.0.1 // indirect - github.com/ipfs/go-ipld-format v0.0.1 // indirect + github.com/gopherjs/gopherjs v1.17.2 // indirect + github.com/ipfs/go-ipfs-util v0.0.2 // indirect + github.com/ipfs/go-ipld-format v0.5.0 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/kr/pretty v0.1.0 // indirect - github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect - github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771 // indirect - github.com/mr-tron/base58 v1.1.3 // indirect - github.com/multiformats/go-base32 v0.0.3 // indirect - github.com/multiformats/go-base36 v0.1.0 // indirect - github.com/multiformats/go-multibase v0.0.3 // indirect - github.com/multiformats/go-multihash v0.0.13 // indirect - github.com/multiformats/go-varint v0.0.5 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect + github.com/mr-tron/base58 v1.2.0 // indirect + github.com/multiformats/go-base32 v0.1.0 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/multiformats/go-multibase v0.2.0 // indirect + github.com/multiformats/go-multihash v0.2.3 // indirect + github.com/multiformats/go-varint v0.0.7 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992 // indirect + github.com/polydawn/refmt v0.89.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - golang.org/x/crypto v0.1.0 // indirect - golang.org/x/sys v0.1.0 // indirect + golang.org/x/crypto v0.12.0 // indirect + golang.org/x/sys v0.12.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/yaml.v3 v3.0.0 // indirect + lukechampine.com/blake3 v1.2.1 // indirect ) + +replace github.com/ipfs/go-ipld-cbor => github.com/vulcanize/go-ipld-cbor v0.1.1-internal-0.0.1 diff --git a/go.sum b/go.sum index c523cd7..ecb7190 100644 --- a/go.sum +++ b/go.sum @@ -1,91 +1,112 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= -github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= -github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= -github.com/ipfs/go-block-format v0.0.2 h1:qPDvcP19izTjU8rgo6p7gTXZlkMkF5bz5G3fqIsSCPE= -github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= -github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= -github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= +github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= +github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= +github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= +github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= +github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI= +github.com/ipfs/go-block-format v0.1.2 h1:GAjkfhVx1f4YTODS6Esrj1wt2HhrtwTnhEr+DyPUaJo= +github.com/ipfs/go-block-format v0.1.2/go.mod h1:mACVcrxarQKstUU3Yf/RdwbC4DzPV6++rO2a3d+a/KE= github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= -github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY= -github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= -github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50= -github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= -github.com/ipfs/go-ipld-cbor v0.0.4 h1:Aw3KPOKXjvrm6VjwJvFf1F1ekR/BH3jdof3Bk7OTiSA= -github.com/ipfs/go-ipld-cbor v0.0.4/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= -github.com/ipfs/go-ipld-format v0.0.1 h1:HCu4eB/Gh+KD/Q0M8u888RFkorTWNIL3da4oc5dwc80= -github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dCDnkOJhcZkms= -github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE= -github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= +github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= +github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk= +github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= +github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= +github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= +github.com/ipfs/go-ipld-format v0.5.0 h1:WyEle9K96MSrvr47zZHKKcDxJ/vlpET6PSiQsAFO+Ds= +github.com/ipfs/go-ipld-format v0.5.0/go.mod h1:ImdZqJQaEouMjCvqCe0ORUS+uoBmf7Hf+EO/jh+nk3M= +github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= +github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= +github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= -github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= -github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771 h1:MHkK1uRtFbVqvAgvWxafZe54+5uBxLluGylDiKgdhwo= github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= +github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= -github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= -github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc= github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= -github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= +github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= +github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= -github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4= +github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE= +github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI= github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM= -github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= -github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk= +github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= +github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= -github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= -github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= -github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc= +github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= +github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= -github.com/multiformats/go-varint v0.0.5 h1:XVZwSo04Cs3j/jS0uAEPpT3JY6DzMcVLLoWOSnCxOjg= +github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= +github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= +github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= +github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992 h1:bzMe+2coZJYHnhGgVlcQKuRy4FSny4ds8dLQjw5P1XE= -github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o= -github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= -github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa h1:E+gaaifzi2xF65PbDmuKI3PhLWY6G5opMLniFq8vmXA= -github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa/go.mod h1:2RVY1rIf+2J2o/IM9+vPq9RzmHDSseB7FoXiSNIUsoU= +github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4= +github.com/polydawn/refmt v0.89.0/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= +github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= +github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436 h1:qOpVTI+BrstcjTZLm2Yz/3sOnqkzj3FQoh0g+E5s3Gc= -github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= -github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= -github.com/whyrusleeping/cbor-gen v0.0.0-20220323183124-98fa8256a799 h1:DOOT2B85S0tHoLGTzV+FakaSSihgRCVwZkjqKQP5L/w= -github.com/whyrusleeping/cbor-gen v0.0.0-20220323183124-98fa8256a799/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= -golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/vulcanize/go-ipld-cbor v0.1.1-internal-0.0.1 h1:Y9NZaJQ1rBrhNz95W1HcJJbrqs6ZlFbYMwls8q5qLlU= +github.com/vulcanize/go-ipld-cbor v0.1.1-internal-0.0.1/go.mod h1:ikVQT4Jo3JB0Nks0Mr/Pyy7fz25ypxbdEoZHeBrsOo4= +github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ= +github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= +github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25 h1:yVYDLoN2gmB3OdBXFW8e1UwgVbmCvNlnAKhvHPaNARI= +github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= -golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= +golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= -golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI= +lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= diff --git a/node.go b/node.go index 2d85fdf..d06e9c1 100644 --- a/node.go +++ b/node.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/sync/errgroup" "github.com/filecoin-project/go-amt-ipld/v4/internal" ) @@ -348,6 +349,203 @@ func (n *node) forEachAt(ctx context.Context, bs cbor.IpldStore, bitWidth uint, return nil } +type descentContext struct { + height int + offset uint64 +} + +type child struct { + link *link + descentContext +} + +type listChildren struct { + children []child +} + +func (n *node) forEachAtParallel(ctx context.Context, bs cbor.IpldStore, bitWidth uint, height int, start, offset uint64, cb func(uint64, *cbg.Deferred) error, concurrency int) error { + // Setup synchronization + grp, errGrpCtx := errgroup.WithContext(ctx) + // Input and output queues for workers. + feed := make(chan *listChildren) + out := make(chan *listChildren) + done := make(chan struct{}) + + for i := 0; i < concurrency; i++ { + grp.Go(func() error { + for childrenList := range feed { + linksToVisit := make([]cid.Cid, 0, len(childrenList.children)) + linksToVisitContext := make([]descentContext, 0, len(childrenList.children)) + cachedNodes := make([]*node, 0, len(childrenList.children)) + cachedNodesContext := make([]descentContext, 0, len(childrenList.children)) + for _, child := range childrenList.children { + if child.link.cached != nil { + cachedNodes = append(cachedNodes, child.link.cached) + cachedNodesContext = append(cachedNodesContext, child.descentContext) + } else if child.link.cid != cid.Undef { + linksToVisit = append(linksToVisit, child.link.cid) + linksToVisitContext = append(linksToVisitContext, child.descentContext) + } else { + return fmt.Errorf("invalid child") + } + } + + dserv := bs.(cbor.IpldBatchOpStore) + nodes := make([]interface{}, len(linksToVisit)) + for j := 0; j < len(linksToVisit); j++ { + nodes[j] = new(internal.Node) + } + cursorChan, missingCIDs, err := dserv.GetMany(errGrpCtx, linksToVisit, nodes) + if err != nil { + return err + } + if len(missingCIDs) != 0 { + return fmt.Errorf("GetMany returned an incomplete result set. The set is missing these CIDs: %+v", missingCIDs) + } + for cursor := range cursorChan { + if cursor.Err != nil { + return cursor.Err + } + internalNextNode, ok := nodes[cursor.Index].(*internal.Node) + if !ok { + return fmt.Errorf("expected node, got %T", nodes[cursor.Index]) + } + nextNode, err := newNode(*internalNextNode, bitWidth, false, linksToVisitContext[cursor.Index].height == 0) + if err != nil { + return err + } + nextChildren, err := nextNode.walkChildren(ctx, bitWidth, linksToVisitContext[cursor.Index].height, start, linksToVisitContext[cursor.Index].offset, cb) + if err != nil { + return err + } + select { + case <-errGrpCtx.Done(): + return nil + default: + if nextChildren != nil { + out <- nextChildren + } + } + } + for j, cachedNode := range cachedNodes { + nextChildren, err := cachedNode.walkChildren(ctx, bitWidth, cachedNodesContext[j].height, start, cachedNodesContext[j].offset, cb) + if err != nil { + return err + } + select { + case <-errGrpCtx.Done(): + return nil + default: + if nextChildren != nil { + out <- nextChildren + } + } + } + + select { + case done <- struct{}{}: + case <-errGrpCtx.Done(): + } + } + return nil + }) + } + + send := feed + var todoQueue []*listChildren + var inProgress int + + // start the walk + children, err := n.walkChildren(ctx, bitWidth, height, start, offset, cb) + // if we hit an error or there are no children, then we're done + if err != nil || children == nil { + close(feed) + grp.Wait() + return err + } + next := children + +dispatcherLoop: + for { + select { + case send <- next: + inProgress++ + if len(todoQueue) > 0 { + next = todoQueue[0] + todoQueue = todoQueue[1:] + } else { + next = nil + send = nil + } + case <-done: + inProgress-- + if inProgress == 0 && next == nil { + break dispatcherLoop + } + case nextNodes := <-out: + if next == nil { + next = nextNodes + send = feed + } else { + todoQueue = append(todoQueue, nextNodes) + } + case <-errGrpCtx.Done(): + break dispatcherLoop + } + } + close(feed) + return grp.Wait() +} + +func (n *node) walkChildren(ctx context.Context, bitWidth uint, height int, start, offset uint64, cb func(uint64, *cbg.Deferred) error) (*listChildren, error) { + if height == 0 { + // height=0 means we're at leaf nodes and get to use our callback + for i, v := range n.values { + if v != nil { + ix := offset + uint64(i) + if ix < start { + // if we're here, 'start' is probably somewhere in the + // middle of this node's elements + continue + } + + // use 'offset' to determine the actual index for this element, it + // tells us how distant we are from the left-most leaf node + if err := cb(offset+uint64(i), v); err != nil { + return nil, err + } + } + } + + return nil, nil + } + children := make([]child, 0, len(n.links)) + + subCount := nodesForHeight(bitWidth, height) + for i, ln := range n.links { + if ln == nil { + continue + } + + // 'offs' tells us the index of the left-most element of the subtree defined + // by 'sub' + offs := offset + (uint64(i) * subCount) + nextOffs := offs + subCount + // nextOffs > offs checks for overflow at MaxIndex (where the next offset wraps back + // to 0). + if nextOffs >= offs && start >= nextOffs { + // if we're here, 'start' lets us skip this entire sub-tree + continue + } + children = append(children, child{ln, descentContext{ + height: height - 1, + offset: offs, + }}) + } + + return &listChildren{children: children}, nil +} + var errNoVals = fmt.Errorf("no values") // Recursive implementation of FirstSetIndex that's performed on the left-most @@ -494,6 +692,76 @@ func (n *node) flush(ctx context.Context, bs cbor.IpldStore, bitWidth uint, heig return nd, nil } +// compact converts a node into its internal.Node representation +func (n *node) compact(ctx context.Context, bitWidth uint, height int) (*internal.Node, error) { + nd := new(internal.Node) + nd.Bmap = make([]byte, bmapBytes(bitWidth)) + + if height == 0 { + // leaf node, we're storing values in this node + for i, val := range n.values { + if val == nil { + continue + } + nd.Values = append(nd.Values, val) + // set the bit in the bitmap for this position to indicate its presence + nd.Bmap[i/8] |= 1 << (uint(i) % 8) + } + return nd, nil + } + + // non-leaf node, we're only storing Links in this node + for i, ln := range n.links { + if ln == nil { + continue + } + if ln.dirty { + if ln.cached == nil { + return nil, fmt.Errorf("expected dirty node to be cached") + } + subn, err := ln.cached.compact(ctx, bitWidth, height-1) + if err != nil { + return nil, err + } + c, err := calcCID(subn) + if err != nil { + return nil, err + } + + ln.cid = c + ln.dirty = false + } + nd.Links = append(nd.Links, ln.cid) + // set the bit in the bitmap for this position to indicate its presence + nd.Bmap[i/8] |= 1 << (uint(i) % 8) + } + + return nd, nil +} + +func calcCID(node cbg.CBORMarshaler) (cid.Cid, error) { + mhType := cbor.DefaultMultihash + mhLen := -1 + codec := uint64(cid.DagCBOR) + + buf := new(bytes.Buffer) + if err := node.MarshalCBOR(buf); err != nil { + return cid.Undef, err + } + + pref := cid.Prefix{ + Codec: codec, + MhType: mhType, + MhLength: mhLen, + Version: 1, + } + c, err := pref.Sum(buf.Bytes()) + if err != nil { + return cid.Undef, err + } + return c, nil +} + func (n *node) setLink(bitWidth uint, i uint64, l *link) { if n.links == nil { if l == nil {