diff --git a/share/shwap/p2p/bitswap/block_fetch_test.go b/share/shwap/p2p/bitswap/block_fetch_test.go index 53b6ae1033..9b4c4f59cd 100644 --- a/share/shwap/p2p/bitswap/block_fetch_test.go +++ b/share/shwap/p2p/bitswap/block_fetch_test.go @@ -25,30 +25,29 @@ import ( "github.com/celestiaorg/rsmt2d" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds/edstest" eds "github.com/celestiaorg/celestia-node/share/new_eds" ) -func TestFetchOptions(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) +func TestFetch_Options(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - eds := edstest.RandEDS(t, 4) - root, err := share.NewRoot(eds) - require.NoError(t, err) - exchange := newExchange(ctx, t, eds) - - blks := make([]Block, eds.Width()) - for i := range blks { - blk, err := NewEmptyRowBlock(1, i, root) // create the same Block ID - require.NoError(t, err) - blks[i] = blk - } + const items = 128 + bstore, cids := testBlockstore(ctx, t, items) t.Run("WithBlockstore", func(t *testing.T) { + exchange := newExchange(ctx, t, bstore) + + blks := make([]Block, 0, cids.Len()) + _ = cids.ForEach(func(c cid.Cid) error { + blk, err := newEmptyTestBlock(c) + require.NoError(t, err) + blks = append(blks, blk) + return nil + }) + bstore := blockstore.NewBlockstore(ds.NewMapDatastore()) - err := Fetch(ctx, exchange, root, blks, WithStore(bstore)) + err := Fetch(ctx, exchange, nil, blks, WithStore(bstore)) require.NoError(t, err) for _, blk := range blks { @@ -59,31 +58,41 @@ func TestFetchOptions(t *testing.T) { }) t.Run("WithFetcher", func(t *testing.T) { + exchange := newExchange(ctx, t, bstore) + + blks := make([]Block, 0, cids.Len()) + _ = cids.ForEach(func(c cid.Cid) error { + blk, err := newEmptyTestBlock(c) + require.NoError(t, err) + blks = append(blks, blk) + return nil + }) + session := exchange.NewSession(ctx) fetcher := &testFetcher{Embedded: session} - err := Fetch(ctx, exchange, root, blks, WithFetcher(fetcher)) + err := Fetch(ctx, exchange, nil, blks, WithFetcher(fetcher)) require.NoError(t, err) require.Equal(t, len(blks), fetcher.Fetched) }) } -func TestFetchDuplicates(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) +func TestFetch_Duplicates(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - eds := edstest.RandEDS(t, 4) - root, err := share.NewRoot(eds) - require.NoError(t, err) - exchange := newExchange(ctx, t, eds) + const items = 128 + bstore, cids := testBlockstore(ctx, t, items) + exchange := newExchange(ctx, t, bstore) var wg sync.WaitGroup - for i := range 100 { - blks := make([]Block, eds.Width()) - for i := range blks { - blk, err := NewEmptyRowBlock(1, i, root) // create the same Block ID + for i := range items { + blks := make([]Block, 0, cids.Len()) + _ = cids.ForEach(func(c cid.Cid) error { + blk, err := newEmptyTestBlock(c) require.NoError(t, err) - blks[i] = blk - } + blks = append(blks, blk) + return nil + }) wg.Add(1) go func(i int) { @@ -91,11 +100,8 @@ func TestFetchDuplicates(t *testing.T) { // this sleep ensures fetches aren't started simultaneously, allowing to check for edge-cases time.Sleep(time.Millisecond * time.Duration(rint)) - err := Fetch(ctx, exchange, root, blks) + err := Fetch(ctx, exchange, nil, blks) assert.NoError(t, err) - for _, blk := range blks { - assert.False(t, blk.IsEmpty()) - } wg.Done() }(i) } @@ -110,13 +116,16 @@ func TestFetchDuplicates(t *testing.T) { require.Zero(t, entries) } -func newExchange(ctx context.Context, t *testing.T, rsmt2dEds *rsmt2d.ExtendedDataSquare) exchange.SessionExchange { +func newExchangeOverEDS(ctx context.Context, t *testing.T, rsmt2d *rsmt2d.ExtendedDataSquare) exchange.SessionExchange { bstore := &Blockstore{ Getter: testAccessorGetter{ - Accessor: eds.Rsmt2D{ExtendedDataSquare: rsmt2dEds}, + Accessor: eds.Rsmt2D{ExtendedDataSquare: rsmt2d}, }, } + return newExchange(ctx, t, bstore) +} +func newExchange(ctx context.Context, t *testing.T, bstore blockstore.Blockstore) exchange.SessionExchange { net, err := mocknet.FullMeshLinked(3) require.NoError(t, err) diff --git a/share/shwap/p2p/bitswap/block_registry.go b/share/shwap/p2p/bitswap/block_registry.go index b547e37859..495167aea7 100644 --- a/share/shwap/p2p/bitswap/block_registry.go +++ b/share/shwap/p2p/bitswap/block_registry.go @@ -9,12 +9,12 @@ import ( ) // registerBlock registers the new Block type and multihash for it. -func registerBlock(mhcode, codec uint64, size int, bldrFn func(cid.Cid) (Block, error)) { +func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block, error)) { mh.Register(mhcode, func() hash.Hash { - return &hasher{IDSize: size} + return &hasher{IDSize: idSize} }) specRegistry[mhcode] = blockSpec{ - size: size, + idSize: idSize, codec: codec, builder: bldrFn, } @@ -22,13 +22,13 @@ func registerBlock(mhcode, codec uint64, size int, bldrFn func(cid.Cid) (Block, // blockSpec holds constant metadata about particular Block types. type blockSpec struct { - size int + idSize int codec uint64 builder func(cid.Cid) (Block, error) } func (spec *blockSpec) String() string { - return fmt.Sprintf("BlockSpec{size: %d, codec: %d}", spec.size, spec.codec) + return fmt.Sprintf("BlockSpec{IDSize: %d, Codec: %d}", spec.idSize, spec.codec) } var specRegistry = make(map[uint64]blockSpec) diff --git a/share/shwap/p2p/bitswap/block_store.go b/share/shwap/p2p/bitswap/block_store.go index 700d8798c7..ffc6b0ede7 100644 --- a/share/shwap/p2p/bitswap/block_store.go +++ b/share/shwap/p2p/bitswap/block_store.go @@ -42,17 +42,7 @@ func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, e return nil, fmt.Errorf("failed to populate Shwap Block on height %v for %s: %w", blk.Height(), spec.String(), err) } - protoData, err := marshalProto(blk) - if err != nil { - return nil, fmt.Errorf("failed to wrap Block with proto: %w", err) - } - - bitswapBlk, err := blocks.NewBlockWithCid(protoData, cid) - if err != nil { - return nil, fmt.Errorf("assembling Bitswap block: %w", err) - } - - return bitswapBlk, nil + return convertBitswap(blk) } func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { @@ -100,3 +90,18 @@ func (b *Blockstore) DeleteBlock(context.Context, cid.Cid) error { func (b *Blockstore) AllKeysChan(context.Context) (<-chan cid.Cid, error) { panic("not implemented") } func (b *Blockstore) HashOnRead(bool) { panic("not implemented") } + +// convertBitswap converts and marshals Block to Bitswap Block. +func convertBitswap(blk Block) (blocks.Block, error) { + protoData, err := marshalProto(blk) + if err != nil { + return nil, fmt.Errorf("failed to wrap Block with proto: %w", err) + } + + bitswapBlk, err := blocks.NewBlockWithCid(protoData, blk.CID()) + if err != nil { + return nil, fmt.Errorf("assembling Bitswap block: %w", err) + } + + return bitswapBlk, nil +} diff --git a/share/shwap/p2p/bitswap/block_test.go b/share/shwap/p2p/bitswap/block_test.go new file mode 100644 index 0000000000..8fc046b56c --- /dev/null +++ b/share/shwap/p2p/bitswap/block_test.go @@ -0,0 +1,116 @@ +package bitswap + +import ( + "context" + crand "crypto/rand" + "encoding/binary" + "testing" + "time" + + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" +) + +const ( + testCodec = 0x9999 + testMultihashCode = 0x9999 + testIDSize = 2 +) + +func init() { + registerBlock( + testMultihashCode, + testCodec, + testIDSize, + func(cid cid.Cid) (Block, error) { + return newEmptyTestBlock(cid) + }, + ) +} + +func testBlockstore(ctx context.Context, t *testing.T, items int) (blockstore.Blockstore, *cid.Set) { + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + + cids := cid.NewSet() + for i := range items { + blk := newTestBlock(i) + bitswapBlk, err := convertBitswap(blk) + require.NoError(t, err) + err = bstore.Put(ctx, bitswapBlk) + require.NoError(t, err) + cids.Add(blk.CID()) + } + return bstore, cids +} + +type testID uint16 + +func (t testID) MarshalBinary() (data []byte, err error) { + data = binary.BigEndian.AppendUint16(data, uint16(t)) + return data, nil +} + +func (t *testID) UnmarshalBinary(data []byte) error { + *t = testID(binary.BigEndian.Uint16(data)) + return nil +} + +type testBlock struct { + id testID + data []byte +} + +func newTestBlock(id int) *testBlock { + bytes := make([]byte, 256) + _, _ = crand.Read(bytes) + return &testBlock{id: testID(id), data: bytes} +} + +func newEmptyTestBlock(cid cid.Cid) (*testBlock, error) { + idData, err := extractCID(cid) + if err != nil { + return nil, err + } + + var id testID + err = id.UnmarshalBinary(idData) + if err != nil { + return nil, err + } + + return &testBlock{id: id}, nil +} + +func (t *testBlock) CID() cid.Cid { + return encodeCID(t.id, testMultihashCode, testCodec) +} + +func (t *testBlock) Height() uint64 { + return 1 +} + +func (t *testBlock) IsEmpty() bool { + return t.data == nil +} + +func (t *testBlock) Populate(context.Context, eds.Accessor) error { + return nil // noop +} + +func (t *testBlock) Marshal() ([]byte, error) { + return t.data, nil +} + +func (t *testBlock) UnmarshalFn(*share.Root) UnmarshalFn { + return func(bytes []byte) error { + t.data = bytes + time.Sleep(time.Millisecond * 1) + return nil + } +} diff --git a/share/shwap/p2p/bitswap/cid.go b/share/shwap/p2p/bitswap/cid.go index ea65715d2e..8e93fed548 100644 --- a/share/shwap/p2p/bitswap/cid.go +++ b/share/shwap/p2p/bitswap/cid.go @@ -45,7 +45,7 @@ func validateCID(cid cid.Cid) error { return fmt.Errorf("invalid CID codec %d", prefix.Codec) } - if prefix.MhLength != spec.size { + if prefix.MhLength != spec.idSize { return fmt.Errorf("invalid multihash length %d", prefix.MhLength) } diff --git a/share/shwap/p2p/bitswap/row_block_test.go b/share/shwap/p2p/bitswap/row_block_test.go index 85d71717a5..36902b0e76 100644 --- a/share/shwap/p2p/bitswap/row_block_test.go +++ b/share/shwap/p2p/bitswap/row_block_test.go @@ -11,14 +11,14 @@ import ( "github.com/celestiaorg/celestia-node/share/eds/edstest" ) -func TestRowRoundtrip_GetContainers(t *testing.T) { +func TestRow_FetchRoundtrip(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() eds := edstest.RandEDS(t, 4) root, err := share.NewRoot(eds) require.NoError(t, err) - exchange := newExchange(ctx, t, eds) + exchange := newExchangeOverEDS(ctx, t, eds) blks := make([]Block, eds.Width()) for i := range blks { diff --git a/share/shwap/p2p/bitswap/row_namespace_data_block_test.go b/share/shwap/p2p/bitswap/row_namespace_data_block_test.go index a4a878cf7b..391ee7a825 100644 --- a/share/shwap/p2p/bitswap/row_namespace_data_block_test.go +++ b/share/shwap/p2p/bitswap/row_namespace_data_block_test.go @@ -12,13 +12,13 @@ import ( "github.com/celestiaorg/celestia-node/share/sharetest" ) -func TestRowNamespaceDataRoundtrip_GetContainers(t *testing.T) { +func TestRowNamespaceData_FetchRoundtrip(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() namespace := sharetest.RandV0Namespace() eds, root := edstest.RandEDSWithNamespace(t, namespace, 64, 16) - exchange := newExchange(ctx, t, eds) + exchange := newExchangeOverEDS(ctx, t, eds) rowIdxs := share.RowsWithNamespace(root, namespace) blks := make([]Block, len(rowIdxs)) diff --git a/share/shwap/p2p/bitswap/sample_block_test.go b/share/shwap/p2p/bitswap/sample_block_test.go index ccea1eb4cc..0733acebdf 100644 --- a/share/shwap/p2p/bitswap/sample_block_test.go +++ b/share/shwap/p2p/bitswap/sample_block_test.go @@ -11,14 +11,14 @@ import ( "github.com/celestiaorg/celestia-node/share/eds/edstest" ) -func TestSampleRoundtrip_GetContainers(t *testing.T) { +func TestSample_FetchRoundtrip(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() eds := edstest.RandEDS(t, 32) root, err := share.NewRoot(eds) require.NoError(t, err) - exchange := newExchange(ctx, t, eds) + exchange := newExchangeOverEDS(ctx, t, eds) width := int(eds.Width()) blks := make([]Block, 0, width*width)