Skip to content

Commit

Permalink
introduce test block and avoid testing Fetch functions over real blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Jun 25, 2024
1 parent 99821a9 commit 5ad3d76
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 58 deletions.
79 changes: 44 additions & 35 deletions share/shwap/p2p/bitswap/block_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,29 @@ import (

"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
eds "github.com/celestiaorg/celestia-node/share/new_eds"
)

func TestFetchOptions(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
func TestFetch_Options(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

eds := edstest.RandEDS(t, 4)
root, err := share.NewRoot(eds)
require.NoError(t, err)
exchange := newExchange(ctx, t, eds)

blks := make([]Block, eds.Width())
for i := range blks {
blk, err := NewEmptyRowBlock(1, i, root) // create the same Block ID
require.NoError(t, err)
blks[i] = blk
}
const items = 128
bstore, cids := testBlockstore(ctx, t, items)

t.Run("WithBlockstore", func(t *testing.T) {
exchange := newExchange(ctx, t, bstore)

blks := make([]Block, 0, cids.Len())
_ = cids.ForEach(func(c cid.Cid) error {
blk, err := newEmptyTestBlock(c)
require.NoError(t, err)
blks = append(blks, blk)
return nil
})

bstore := blockstore.NewBlockstore(ds.NewMapDatastore())
err := Fetch(ctx, exchange, root, blks, WithStore(bstore))
err := Fetch(ctx, exchange, nil, blks, WithStore(bstore))
require.NoError(t, err)

for _, blk := range blks {
Expand All @@ -59,43 +58,50 @@ func TestFetchOptions(t *testing.T) {
})

t.Run("WithFetcher", func(t *testing.T) {
exchange := newExchange(ctx, t, bstore)

blks := make([]Block, 0, cids.Len())
_ = cids.ForEach(func(c cid.Cid) error {
blk, err := newEmptyTestBlock(c)
require.NoError(t, err)
blks = append(blks, blk)
return nil
})

session := exchange.NewSession(ctx)
fetcher := &testFetcher{Embedded: session}
err := Fetch(ctx, exchange, root, blks, WithFetcher(fetcher))
err := Fetch(ctx, exchange, nil, blks, WithFetcher(fetcher))
require.NoError(t, err)
require.Equal(t, len(blks), fetcher.Fetched)
})
}

func TestFetchDuplicates(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
func TestFetch_Duplicates(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

eds := edstest.RandEDS(t, 4)
root, err := share.NewRoot(eds)
require.NoError(t, err)
exchange := newExchange(ctx, t, eds)
const items = 128
bstore, cids := testBlockstore(ctx, t, items)
exchange := newExchange(ctx, t, bstore)

var wg sync.WaitGroup
for i := range 100 {
blks := make([]Block, eds.Width())
for i := range blks {
blk, err := NewEmptyRowBlock(1, i, root) // create the same Block ID
for i := range items {
blks := make([]Block, 0, cids.Len())
_ = cids.ForEach(func(c cid.Cid) error {
blk, err := newEmptyTestBlock(c)
require.NoError(t, err)
blks[i] = blk
}
blks = append(blks, blk)
return nil
})

wg.Add(1)
go func(i int) {
rint := rand.IntN(10)
// this sleep ensures fetches aren't started simultaneously, allowing to check for edge-cases
time.Sleep(time.Millisecond * time.Duration(rint))

err := Fetch(ctx, exchange, root, blks)
err := Fetch(ctx, exchange, nil, blks)
assert.NoError(t, err)
for _, blk := range blks {
assert.False(t, blk.IsEmpty())
}
wg.Done()
}(i)
}
Expand All @@ -110,13 +116,16 @@ func TestFetchDuplicates(t *testing.T) {
require.Zero(t, entries)
}

func newExchange(ctx context.Context, t *testing.T, rsmt2dEds *rsmt2d.ExtendedDataSquare) exchange.SessionExchange {
func newExchangeOverEDS(ctx context.Context, t *testing.T, rsmt2d *rsmt2d.ExtendedDataSquare) exchange.SessionExchange {
bstore := &Blockstore{
Getter: testAccessorGetter{
Accessor: eds.Rsmt2D{ExtendedDataSquare: rsmt2dEds},
Accessor: eds.Rsmt2D{ExtendedDataSquare: rsmt2d},
},
}
return newExchange(ctx, t, bstore)
}

func newExchange(ctx context.Context, t *testing.T, bstore blockstore.Blockstore) exchange.SessionExchange {
net, err := mocknet.FullMeshLinked(3)
require.NoError(t, err)

Expand Down
10 changes: 5 additions & 5 deletions share/shwap/p2p/bitswap/block_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@ import (
)

// registerBlock registers the new Block type and multihash for it.
func registerBlock(mhcode, codec uint64, size int, bldrFn func(cid.Cid) (Block, error)) {
func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block, error)) {
mh.Register(mhcode, func() hash.Hash {
return &hasher{IDSize: size}
return &hasher{IDSize: idSize}
})
specRegistry[mhcode] = blockSpec{
size: size,
idSize: idSize,
codec: codec,
builder: bldrFn,
}
}

// blockSpec holds constant metadata about particular Block types.
type blockSpec struct {
size int
idSize int
codec uint64
builder func(cid.Cid) (Block, error)
}

func (spec *blockSpec) String() string {
return fmt.Sprintf("BlockSpec{size: %d, codec: %d}", spec.size, spec.codec)
return fmt.Sprintf("BlockSpec{IDSize: %d, Codec: %d}", spec.idSize, spec.codec)
}

var specRegistry = make(map[uint64]blockSpec)
27 changes: 16 additions & 11 deletions share/shwap/p2p/bitswap/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,7 @@ func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, e
return nil, fmt.Errorf("failed to populate Shwap Block on height %v for %s: %w", blk.Height(), spec.String(), err)
}

protoData, err := marshalProto(blk)
if err != nil {
return nil, fmt.Errorf("failed to wrap Block with proto: %w", err)
}

bitswapBlk, err := blocks.NewBlockWithCid(protoData, cid)
if err != nil {
return nil, fmt.Errorf("assembling Bitswap block: %w", err)
}

return bitswapBlk, nil
return convertBitswap(blk)
}

func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
Expand Down Expand Up @@ -100,3 +90,18 @@ func (b *Blockstore) DeleteBlock(context.Context, cid.Cid) error {
func (b *Blockstore) AllKeysChan(context.Context) (<-chan cid.Cid, error) { panic("not implemented") }

func (b *Blockstore) HashOnRead(bool) { panic("not implemented") }

// convertBitswap converts and marshals Block to Bitswap Block.
func convertBitswap(blk Block) (blocks.Block, error) {
protoData, err := marshalProto(blk)
if err != nil {
return nil, fmt.Errorf("failed to wrap Block with proto: %w", err)
}

bitswapBlk, err := blocks.NewBlockWithCid(protoData, blk.CID())
if err != nil {
return nil, fmt.Errorf("assembling Bitswap block: %w", err)
}

return bitswapBlk, nil
}
116 changes: 116 additions & 0 deletions share/shwap/p2p/bitswap/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package bitswap

import (
"context"
crand "crypto/rand"
"encoding/binary"
"testing"
"time"

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share"
eds "github.com/celestiaorg/celestia-node/share/new_eds"
)

const (
testCodec = 0x9999
testMultihashCode = 0x9999
testIDSize = 2
)

func init() {
registerBlock(
testMultihashCode,
testCodec,
testIDSize,
func(cid cid.Cid) (Block, error) {
return newEmptyTestBlock(cid)
},
)
}

func testBlockstore(ctx context.Context, t *testing.T, items int) (blockstore.Blockstore, *cid.Set) {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))

cids := cid.NewSet()
for i := range items {
blk := newTestBlock(i)
bitswapBlk, err := convertBitswap(blk)
require.NoError(t, err)
err = bstore.Put(ctx, bitswapBlk)
require.NoError(t, err)
cids.Add(blk.CID())
}
return bstore, cids
}

type testID uint16

func (t testID) MarshalBinary() (data []byte, err error) {
data = binary.BigEndian.AppendUint16(data, uint16(t))
return data, nil
}

func (t *testID) UnmarshalBinary(data []byte) error {
*t = testID(binary.BigEndian.Uint16(data))
return nil
}

type testBlock struct {
id testID
data []byte
}

func newTestBlock(id int) *testBlock {
bytes := make([]byte, 256)
_, _ = crand.Read(bytes)
return &testBlock{id: testID(id), data: bytes}
}

func newEmptyTestBlock(cid cid.Cid) (*testBlock, error) {
idData, err := extractCID(cid)
if err != nil {
return nil, err
}

var id testID
err = id.UnmarshalBinary(idData)
if err != nil {
return nil, err
}

return &testBlock{id: id}, nil
}

func (t *testBlock) CID() cid.Cid {
return encodeCID(t.id, testMultihashCode, testCodec)
}

func (t *testBlock) Height() uint64 {
return 1
}

func (t *testBlock) IsEmpty() bool {
return t.data == nil
}

func (t *testBlock) Populate(context.Context, eds.Accessor) error {
return nil // noop
}

func (t *testBlock) Marshal() ([]byte, error) {
return t.data, nil
}

func (t *testBlock) UnmarshalFn(*share.Root) UnmarshalFn {
return func(bytes []byte) error {
t.data = bytes
time.Sleep(time.Millisecond * 1)
return nil
}
}
2 changes: 1 addition & 1 deletion share/shwap/p2p/bitswap/cid.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func validateCID(cid cid.Cid) error {
return fmt.Errorf("invalid CID codec %d", prefix.Codec)
}

if prefix.MhLength != spec.size {
if prefix.MhLength != spec.idSize {
return fmt.Errorf("invalid multihash length %d", prefix.MhLength)
}

Expand Down
4 changes: 2 additions & 2 deletions share/shwap/p2p/bitswap/row_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)

func TestRowRoundtrip_GetContainers(t *testing.T) {
func TestRow_FetchRoundtrip(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

eds := edstest.RandEDS(t, 4)
root, err := share.NewRoot(eds)
require.NoError(t, err)
exchange := newExchange(ctx, t, eds)
exchange := newExchangeOverEDS(ctx, t, eds)

blks := make([]Block, eds.Width())
for i := range blks {
Expand Down
4 changes: 2 additions & 2 deletions share/shwap/p2p/bitswap/row_namespace_data_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"github.com/celestiaorg/celestia-node/share/sharetest"
)

func TestRowNamespaceDataRoundtrip_GetContainers(t *testing.T) {
func TestRowNamespaceData_FetchRoundtrip(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

namespace := sharetest.RandV0Namespace()
eds, root := edstest.RandEDSWithNamespace(t, namespace, 64, 16)
exchange := newExchange(ctx, t, eds)
exchange := newExchangeOverEDS(ctx, t, eds)

rowIdxs := share.RowsWithNamespace(root, namespace)
blks := make([]Block, len(rowIdxs))
Expand Down
4 changes: 2 additions & 2 deletions share/shwap/p2p/bitswap/sample_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)

func TestSampleRoundtrip_GetContainers(t *testing.T) {
func TestSample_FetchRoundtrip(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

eds := edstest.RandEDS(t, 32)
root, err := share.NewRoot(eds)
require.NoError(t, err)
exchange := newExchange(ctx, t, eds)
exchange := newExchangeOverEDS(ctx, t, eds)

width := int(eds.Width())
blks := make([]Block, 0, width*width)
Expand Down

0 comments on commit 5ad3d76

Please sign in to comment.