diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 3d129c2625..ceb21621f4 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -4,7 +4,9 @@ import ( "context" _ "embed" "encoding/json" - "errors" + "github.com/ipfs/boxo/bitswap/client" + "github.com/libp2p/go-libp2p/core/host" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "maps" "slices" "sync" @@ -15,7 +17,6 @@ import ( "github.com/golang/mock/gomock" "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/exchange" - "github.com/ipfs/boxo/exchange/offline" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -35,7 +36,6 @@ import ( "github.com/celestiaorg/celestia-node/share/shwap/getters/mock" "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex" - "github.com/celestiaorg/celestia-node/store" ) func TestSharesAvailableSuccess(t *testing.T) { @@ -315,19 +315,11 @@ func TestPruneAll(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) t.Cleanup(cancel) - dir := t.TempDir() - store, err := store.NewStore(store.DefaultParameters(), dir) - require.NoError(t, err) - defer require.NoError(t, store.Stop(ctx)) eds, h := randEdsAndHeader(t, size) - err = store.PutODSQ4(ctx, h.DAH, h.Height(), eds) - require.NoError(t, err) - - // Create a new bitswap getter ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) clientBs := blockstore.NewBlockstore(ds) - serverBS := &bitswap.Blockstore{Getter: store} - ex := newFakeExchange(serverBS) + + ex := newExchangeOverEDS(ctx, t, eds) getter := bitswap.NewGetter(ex, clientBs, 0) getter.Start() defer getter.Stop() @@ -335,7 +327,7 @@ func TestPruneAll(t *testing.T) { // Create a new ShareAvailability instance and sample the shares sampleAmount := uint(20) avail := NewShareAvailability(getter, ds, clientBs, WithSampleAmount(sampleAmount)) - err = avail.SharesAvailable(ctx, h) + err := avail.SharesAvailable(ctx, h) require.NoError(t, err) // close ShareAvailability to force flush of batched writes avail.Close(ctx) @@ -359,22 +351,14 @@ func TestPruneAll(t *testing.T) { func TestPrunePartialFailed(t *testing.T) { const size = 8 - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*200) t.Cleanup(cancel) - dir := t.TempDir() - store, err := store.NewStore(store.DefaultParameters(), dir) - require.NoError(t, err) - defer require.NoError(t, store.Stop(ctx)) eds, h := randEdsAndHeader(t, size) - err = store.PutODSQ4(ctx, h.DAH, h.Height(), eds) - require.NoError(t, err) - - // Create a new bitswap getter ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) clientBs := blockstore.NewBlockstore(ds) - serverBS := newHalfFailBlockstore(&bitswap.Blockstore{Getter: store}) - ex := newFakeExchange(serverBS) + + ex := newHalfSessionExchange(newExchangeOverEDS(ctx, t, eds)) getter := bitswap.NewGetter(ex, clientBs, 0) getter.Start() defer getter.Stop() @@ -382,8 +366,8 @@ func TestPrunePartialFailed(t *testing.T) { // Create a new ShareAvailability instance and sample the shares sampleAmount := uint(20) avail := NewShareAvailability(getter, ds, clientBs, WithSampleAmount(sampleAmount)) - err = avail.SharesAvailable(ctx, h) - require.NoError(t, err) + err := avail.SharesAvailable(ctx, h) + require.Error(t, err) // close ShareAvailability to force flush of batched writes avail.Close(ctx) @@ -405,38 +389,37 @@ func TestPrunePartialFailed(t *testing.T) { require.False(t, exist) } -var _ exchange.SessionExchange = (*fakeSessionExchange)(nil) - -func newFakeExchange(bs blockstore.Blockstore) *fakeSessionExchange { - return &fakeSessionExchange{ - Interface: offline.Exchange(bs), - session: offline.Exchange(bs), - } +type halfSessionExchange struct { + exchange.SessionExchange + attempt atomic.Int32 } -type fakeSessionExchange struct { - exchange.Interface - session exchange.Fetcher +func newHalfSessionExchange(ex exchange.SessionExchange) *halfSessionExchange { + return &halfSessionExchange{SessionExchange: ex} } -func (fe *fakeSessionExchange) NewSession(context.Context) exchange.Fetcher { - return fe.session +func (hse *halfSessionExchange) NewSession(context.Context) exchange.Fetcher { + return hse } -type halfFailBlockstore struct { - blockstore.Blockstore - attempt atomic.Int32 -} +func (hse *halfSessionExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + out := make(chan blocks.Block, len(cids)) + defer close(out) -func newHalfFailBlockstore(bs blockstore.Blockstore) *halfFailBlockstore { - return &halfFailBlockstore{Blockstore: bs} -} + for _, cid := range cids { + if hse.attempt.Add(1)%2 == 0 { + continue + } -func (hfb *halfFailBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { - if hfb.attempt.Add(1)%2 == 0 { - return nil, errors.New("fail") + blk, err := hse.SessionExchange.GetBlock(ctx, cid) + if err != nil { + return nil, err + } + + out <- blk } - return hfb.Blockstore.Get(ctx, c) + + return out, nil } func randEdsAndHeader(t *testing.T, size int) (*rsmt2d.ExtendedDataSquare, *header.ExtendedHeader) { @@ -463,3 +446,55 @@ func countKeys(ctx context.Context, t *testing.T, bs blockstore.Blockstore) int } return count } + +func newExchangeOverEDS(ctx context.Context, t *testing.T, rsmt2d *rsmt2d.ExtendedDataSquare) exchange.SessionExchange { + bstore := &bitswap.Blockstore{ + Getter: testAccessorGetter{ + AccessorStreamer: &eds.Rsmt2D{ExtendedDataSquare: rsmt2d}, + }, + } + return newExchange(ctx, t, bstore) +} + +func newExchange(ctx context.Context, t *testing.T, bstore blockstore.Blockstore) exchange.SessionExchange { + net, err := mocknet.FullMeshLinked(3) + require.NoError(t, err) + + newServer(ctx, net.Hosts()[0], bstore) + newServer(ctx, net.Hosts()[1], bstore) + + client := newClient(ctx, net.Hosts()[2], bstore) + + err = net.ConnectAllButSelf() + require.NoError(t, err) + return client +} + +func newServer(ctx context.Context, host host.Host, store blockstore.Blockstore) { + net := bitswap.NewNetwork(host, "test") + server := bitswap.NewServer( + ctx, + net, + store, + ) + net.Start(server) +} + +func newClient(ctx context.Context, host host.Host, store blockstore.Blockstore) *client.Client { + net := bitswap.NewNetwork(host, "test") + client := bitswap.NewClient(ctx, net, store) + net.Start(client) + return client +} + +type testAccessorGetter struct { + eds.AccessorStreamer +} + +func (t testAccessorGetter) GetByHeight(context.Context, uint64) (eds.AccessorStreamer, error) { + return t.AccessorStreamer, nil +} + +func (t testAccessorGetter) HasByHeight(context.Context, uint64) (bool, error) { + return true, nil +} diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index 11b3d2846a..e46f0d24c6 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -15,7 +15,6 @@ import ( "github.com/celestiaorg/celestia-app/v3/pkg/wrapper" libshare "github.com/celestiaorg/go-square/v2/share" - "github.com/celestiaorg/nmt" "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/header" @@ -133,7 +132,6 @@ func (g *Getter) GetSamples( smpls := make([]shwap.Sample, len(blks)) for i, blk := range blks { c := blk.(*SampleBlock).Container - c.Proof = &nmt.Proof{} // TODO smpls[i] = c }