Skip to content

Commit

Permalink
fix prunning tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Nov 20, 2024
1 parent 560087a commit e13f2c2
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 52 deletions.
135 changes: 85 additions & 50 deletions share/availability/light/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
_ "embed"
"encoding/json"
"errors"
"github.com/ipfs/boxo/bitswap/client"
"github.com/libp2p/go-libp2p/core/host"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"maps"
"slices"
"sync"
Expand All @@ -15,7 +17,6 @@ import (
"github.com/golang/mock/gomock"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
"github.com/ipfs/boxo/exchange/offline"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand All @@ -35,7 +36,6 @@ import (
"github.com/celestiaorg/celestia-node/share/shwap/getters/mock"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex"
"github.com/celestiaorg/celestia-node/store"
)

func TestSharesAvailableSuccess(t *testing.T) {
Expand Down Expand Up @@ -315,27 +315,19 @@ func TestPruneAll(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
t.Cleanup(cancel)

dir := t.TempDir()
store, err := store.NewStore(store.DefaultParameters(), dir)
require.NoError(t, err)
defer require.NoError(t, store.Stop(ctx))
eds, h := randEdsAndHeader(t, size)
err = store.PutODSQ4(ctx, h.DAH, h.Height(), eds)
require.NoError(t, err)

// Create a new bitswap getter
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
clientBs := blockstore.NewBlockstore(ds)
serverBS := &bitswap.Blockstore{Getter: store}
ex := newFakeExchange(serverBS)

ex := newExchangeOverEDS(ctx, t, eds)
getter := bitswap.NewGetter(ex, clientBs, 0)
getter.Start()
defer getter.Stop()

// Create a new ShareAvailability instance and sample the shares
sampleAmount := uint(20)
avail := NewShareAvailability(getter, ds, clientBs, WithSampleAmount(sampleAmount))
err = avail.SharesAvailable(ctx, h)
err := avail.SharesAvailable(ctx, h)
require.NoError(t, err)
// close ShareAvailability to force flush of batched writes
avail.Close(ctx)
Expand All @@ -359,31 +351,23 @@ func TestPruneAll(t *testing.T) {

func TestPrunePartialFailed(t *testing.T) {
const size = 8
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*200)
t.Cleanup(cancel)

dir := t.TempDir()
store, err := store.NewStore(store.DefaultParameters(), dir)
require.NoError(t, err)
defer require.NoError(t, store.Stop(ctx))
eds, h := randEdsAndHeader(t, size)
err = store.PutODSQ4(ctx, h.DAH, h.Height(), eds)
require.NoError(t, err)

// Create a new bitswap getter
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
clientBs := blockstore.NewBlockstore(ds)
serverBS := newHalfFailBlockstore(&bitswap.Blockstore{Getter: store})
ex := newFakeExchange(serverBS)

ex := newHalfSessionExchange(newExchangeOverEDS(ctx, t, eds))
getter := bitswap.NewGetter(ex, clientBs, 0)
getter.Start()
defer getter.Stop()

// Create a new ShareAvailability instance and sample the shares
sampleAmount := uint(20)
avail := NewShareAvailability(getter, ds, clientBs, WithSampleAmount(sampleAmount))
err = avail.SharesAvailable(ctx, h)
require.NoError(t, err)
err := avail.SharesAvailable(ctx, h)
require.Error(t, err)
// close ShareAvailability to force flush of batched writes
avail.Close(ctx)

Expand All @@ -405,38 +389,37 @@ func TestPrunePartialFailed(t *testing.T) {
require.False(t, exist)
}

var _ exchange.SessionExchange = (*fakeSessionExchange)(nil)

func newFakeExchange(bs blockstore.Blockstore) *fakeSessionExchange {
return &fakeSessionExchange{
Interface: offline.Exchange(bs),
session: offline.Exchange(bs),
}
type halfSessionExchange struct {
exchange.SessionExchange
attempt atomic.Int32
}

type fakeSessionExchange struct {
exchange.Interface
session exchange.Fetcher
func newHalfSessionExchange(ex exchange.SessionExchange) *halfSessionExchange {
return &halfSessionExchange{SessionExchange: ex}
}

func (fe *fakeSessionExchange) NewSession(context.Context) exchange.Fetcher {
return fe.session
func (hse *halfSessionExchange) NewSession(context.Context) exchange.Fetcher {
return hse
}

type halfFailBlockstore struct {
blockstore.Blockstore
attempt atomic.Int32
}
func (hse *halfSessionExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
out := make(chan blocks.Block, len(cids))
defer close(out)

func newHalfFailBlockstore(bs blockstore.Blockstore) *halfFailBlockstore {
return &halfFailBlockstore{Blockstore: bs}
}
for _, cid := range cids {
if hse.attempt.Add(1)%2 == 0 {
continue
}

func (hfb *halfFailBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
if hfb.attempt.Add(1)%2 == 0 {
return nil, errors.New("fail")
blk, err := hse.SessionExchange.GetBlock(ctx, cid)
if err != nil {
return nil, err
}

out <- blk
}
return hfb.Blockstore.Get(ctx, c)

return out, nil
}

func randEdsAndHeader(t *testing.T, size int) (*rsmt2d.ExtendedDataSquare, *header.ExtendedHeader) {
Expand All @@ -463,3 +446,55 @@ func countKeys(ctx context.Context, t *testing.T, bs blockstore.Blockstore) int
}
return count
}

func newExchangeOverEDS(ctx context.Context, t *testing.T, rsmt2d *rsmt2d.ExtendedDataSquare) exchange.SessionExchange {
bstore := &bitswap.Blockstore{
Getter: testAccessorGetter{
AccessorStreamer: &eds.Rsmt2D{ExtendedDataSquare: rsmt2d},
},
}
return newExchange(ctx, t, bstore)
}

func newExchange(ctx context.Context, t *testing.T, bstore blockstore.Blockstore) exchange.SessionExchange {
net, err := mocknet.FullMeshLinked(3)
require.NoError(t, err)

newServer(ctx, net.Hosts()[0], bstore)
newServer(ctx, net.Hosts()[1], bstore)

client := newClient(ctx, net.Hosts()[2], bstore)

err = net.ConnectAllButSelf()
require.NoError(t, err)
return client
}

func newServer(ctx context.Context, host host.Host, store blockstore.Blockstore) {
net := bitswap.NewNetwork(host, "test")
server := bitswap.NewServer(
ctx,
net,
store,
)
net.Start(server)
}

func newClient(ctx context.Context, host host.Host, store blockstore.Blockstore) *client.Client {
net := bitswap.NewNetwork(host, "test")
client := bitswap.NewClient(ctx, net, store)
net.Start(client)
return client
}

type testAccessorGetter struct {
eds.AccessorStreamer
}

func (t testAccessorGetter) GetByHeight(context.Context, uint64) (eds.AccessorStreamer, error) {
return t.AccessorStreamer, nil
}

func (t testAccessorGetter) HasByHeight(context.Context, uint64) (bool, error) {
return true, nil
}
2 changes: 0 additions & 2 deletions share/shwap/p2p/bitswap/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/celestiaorg/celestia-app/v3/pkg/wrapper"
libshare "github.com/celestiaorg/go-square/v2/share"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
Expand Down Expand Up @@ -133,7 +132,6 @@ func (g *Getter) GetSamples(
smpls := make([]shwap.Sample, len(blks))
for i, blk := range blks {
c := blk.(*SampleBlock).Container
c.Proof = &nmt.Proof{} // TODO
smpls[i] = c
}

Expand Down

0 comments on commit e13f2c2

Please sign in to comment.