diff --git a/share/shwap/p2p/bitswap/block.go b/share/shwap/p2p/bitswap/block.go new file mode 100644 index 0000000000..6825fc43ce --- /dev/null +++ b/share/shwap/p2p/bitswap/block.go @@ -0,0 +1,38 @@ +package bitswap + +import ( + "context" + + "github.com/ipfs/go-cid" + logger "github.com/ipfs/go-log/v2" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" +) + +var log = logger.Logger("shwap/bitswap") + +// Block represents Bitswap compatible generalization over Shwap containers. +// All Shwap containers must have a registered wrapper +// implementing the interface in order to be compatible with Bitswap. +// NOTE: This is not a Blockchain block, but an IPFS/Bitswap block. +type Block interface { + // CID returns Shwap ID of the Block formatted as CID. + CID() cid.Cid + // Height reports the Height of the Shwap container behind the Block. + Height() uint64 + + // Populate fills up the Block with the Shwap container getting it out of the EDS + // Accessor. + Populate(context.Context, eds.Accessor) error + // Marshal serializes bytes of the Shwap Container the Block holds. + // MUST exclude the Shwap ID. + Marshal() ([]byte, error) + // UnmarshalFn returns closure that unmarshal the Block with the Shwap container. + // Unmarshalling involves data validation against the given Root. + UnmarshalFn(*share.Root) UnmarshalFn +} + +// UnmarshalFn is a closure produced by a Block that unmarshalls and validates +// the given serialized bytes of a Shwap container and populates the Block with it on success. +type UnmarshalFn func([]byte) error diff --git a/share/shwap/p2p/bitswap/block_fetch.go b/share/shwap/p2p/bitswap/block_fetch.go new file mode 100644 index 0000000000..4a66eb72ef --- /dev/null +++ b/share/shwap/p2p/bitswap/block_fetch.go @@ -0,0 +1,259 @@ +package bitswap + +import ( + "context" + "crypto/sha256" + "fmt" + "sync" + + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/exchange" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + + "github.com/celestiaorg/celestia-node/share" +) + +// WithFetcher instructs [Fetch] to use the given Fetcher. +// Useful for reusable Fetcher sessions. +func WithFetcher(session exchange.Fetcher) FetchOption { + return func(options *fetchOptions) { + options.Session = session + } +} + +// WithStore instructs [Fetch] to store all the fetched Blocks into the given Blockstore. +func WithStore(store blockstore.Blockstore) FetchOption { + return func(options *fetchOptions) { + options.Store = store + } +} + +// Fetch fetches and populates given Blocks using Fetcher wrapping Bitswap. +// +// Validates Block against the given Root and skips Blocks that are already populated. +// Gracefully synchronize identical Blocks requested simultaneously. +// Blocks until either context is canceled or all Blocks are fetched and populated. +func Fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks []Block, opts ...FetchOption) error { + var from, to int + for to < len(blks) { + from, to = to, to+maxPerFetch + if to >= len(blks) { + to = len(blks) + } + + err := fetch(ctx, exchg, root, blks[from:to], opts...) + if err != nil { + return err + } + } + + return ctx.Err() +} + +// maxPerFetch sets the limit for maximum items in a single fetch. +// This limit comes from server side default limit size on max possible simultaneous CID WANTs from a peer. +// https://github.com/ipfs/boxo/blob/dfd4a53ba828a368cec8d61c3fe12969ac6aa94c/bitswap/internal/defaults/defaults.go#L29-L30 +const maxPerFetch = 1024 + +// fetch fetches given Blocks. +// See [Fetch] for detailed description. +func fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks []Block, opts ...FetchOption) error { + var options fetchOptions + for _, opt := range opts { + opt(&options) + } + + fetcher := options.getFetcher(exchg) + cids := make([]cid.Cid, 0, len(blks)) + duplicates := make(map[cid.Cid]Block) + for _, blk := range blks { + cid := blk.CID() // memoize CID for reuse as it ain't free + cids = append(cids, cid) + + // store the UnmarshalFn s.t. hasher can access it + // and fill in the Block + unmarshalFn := blk.UnmarshalFn(root) + _, exists := unmarshalFns.LoadOrStore(cid, &unmarshalEntry{UnmarshalFn: unmarshalFn}) + if exists { + // the unmarshalFn has already been stored for the cid + // means there is ongoing fetch happening for the same cid + duplicates[cid] = blk // so mark the Block as duplicate + } else { + // cleanup are by the original requester and + // only after we are sure we got the block + defer unmarshalFns.Delete(cid) + } + } + + blkCh, err := fetcher.GetBlocks(ctx, cids) + if err != nil { + return fmt.Errorf("requesting Bitswap blocks: %w", err) + } + + for bitswapBlk := range blkCh { // GetBlocks closes blkCh on ctx cancellation + // NOTE: notification for duplicates is on purpose and to cover a flaky case + // It's harmless in practice to do additional notifications in case of duplicates + if err := exchg.NotifyNewBlocks(ctx, bitswapBlk); err != nil { + log.Error("failed to notify the new Bitswap block: %s", err) + } + + blk, ok := duplicates[bitswapBlk.Cid()] + if ok { + // uncommon duplicate case: concurrent fetching of the same block. + // The block hasn't been invoked inside hasher verification, + // so we have to unmarshal it ourselves. + unmarshalFn := blk.UnmarshalFn(root) + err := unmarshal(unmarshalFn, bitswapBlk.RawData()) + if err != nil { + // this means verification succeeded in the hasher but failed here + // this case should never happen in practice + // and if so something is really wrong + panic(fmt.Sprintf("unmarshaling duplicate block: %s", err)) + } + // NOTE: This approach has a downside that we redo deserialization and computationally + // expensive computation for as many duplicates. We tried solutions that doesn't have this + // problem, but they are *much* more complex. Considering this a rare edge-case the tradeoff + // towards simplicity has been made. + continue + } + // common case: the block was populated by the hasher + // so store it if requested + err := options.store(ctx, bitswapBlk) + if err != nil { + log.Error("failed to store the new Bitswap block: %s", err) + } + } + + return ctx.Err() +} + +// unmarshal unmarshalls the Shwap Container data into a Block with the given UnmarshalFn +func unmarshal(unmarshalFn UnmarshalFn, data []byte) error { + _, containerData, err := unmarshalProto(data) + if err != nil { + return err + } + + err = unmarshalFn(containerData) + if err != nil { + return fmt.Errorf("verifying and unmarshalling container data: %w", err) + } + + return nil +} + +// unmarshalFns exist to communicate between Fetch and hasher, and it's global as a necessity +// +// Fetch registers UnmarshalFNs that hasher then uses to validate and unmarshal Block responses coming +// through Bitswap +// +// Bitswap does not provide *stateful* verification out of the box and by default +// messages are verified by their respective MultiHashes that are registered globally. +// For every Block type there is a global hasher registered that accesses stored UnmarshalFn once a +// message is received. It then uses UnmarshalFn to validate and fill in the respective Block +// +// sync.Map is used to minimize contention for disjoint keys +var unmarshalFns sync.Map + +// unmarshalEntry wraps UnmarshalFn with a mutex to protect it from concurrent access. +type unmarshalEntry struct { + sync.Mutex + UnmarshalFn +} + +// hasher implements hash.Hash to be registered as custom multihash +// hasher is the *hack* to inject custom verification logic into Bitswap +type hasher struct { + // IDSize of the respective Shwap container + IDSize int // to be set during hasher registration + + sum []byte +} + +func (h *hasher) Write(data []byte) (int, error) { + err := h.write(data) + if err != nil { + err = fmt.Errorf("hasher: %w", err) + log.Error(err) + return 0, fmt.Errorf("shwap/bitswap: %w", err) + } + + return len(data), nil +} + +func (h *hasher) write(data []byte) error { + cid, container, err := unmarshalProto(data) + if err != nil { + return fmt.Errorf("unmarshalling proto: %w", err) + } + + // getBlock ID out of CID validating it + id, err := extractCID(cid) + if err != nil { + return fmt.Errorf("validating cid: %w", err) + } + + // getBlock registered UnmarshalFn and use it to check data validity and + // pass it to Fetch caller + val, ok := unmarshalFns.Load(cid) + if !ok { + return fmt.Errorf("no unmarshallers registered for %s", cid.String()) + } + entry := val.(*unmarshalEntry) + + // ensure UnmarshalFn is synchronized + // NOTE: Bitswap may call hasher.Write concurrently, which may call unmarshall concurrently + // this we need this synchronization. + entry.Lock() + err = entry.UnmarshalFn(container) + if err != nil { + return fmt.Errorf("verifying and unmarshalling container data: %w", err) + } + entry.Unlock() + + // set the id as resulting sum + // it's required for the sum to match the requested ID + // to satisfy hash contract and signal to Bitswap that data is correct + h.sum = id + return nil +} + +func (h *hasher) Sum([]byte) []byte { + return h.sum +} + +func (h *hasher) Reset() { + h.sum = nil +} + +func (h *hasher) Size() int { + return h.IDSize +} + +func (h *hasher) BlockSize() int { + return sha256.BlockSize +} + +type FetchOption func(*fetchOptions) + +type fetchOptions struct { + Session exchange.Fetcher + Store blockstore.Blockstore +} + +func (options *fetchOptions) getFetcher(exhng exchange.Interface) exchange.Fetcher { + if options.Session != nil { + return options.Session + } + + return exhng +} + +func (options *fetchOptions) store(ctx context.Context, blk blocks.Block) error { + if options.Store == nil { + return nil + } + + return options.Store.Put(ctx, blk) +} diff --git a/share/shwap/p2p/bitswap/block_fetch_test.go b/share/shwap/p2p/bitswap/block_fetch_test.go new file mode 100644 index 0000000000..1c8aa367e1 --- /dev/null +++ b/share/shwap/p2p/bitswap/block_fetch_test.go @@ -0,0 +1,186 @@ +package bitswap + +import ( + "context" + "math/rand/v2" + "sync" + "testing" + "time" + + "github.com/ipfs/boxo/bitswap/client" + "github.com/ipfs/boxo/bitswap/network" + "github.com/ipfs/boxo/bitswap/server" + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/exchange" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" + "github.com/libp2p/go-libp2p/core/host" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/rsmt2d" + + eds "github.com/celestiaorg/celestia-node/share/new_eds" +) + +func TestFetch_Options(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + const items = 128 + bstore, cids := testBlockstore(ctx, t, items) + + t.Run("WithBlockstore", func(t *testing.T) { + exchange := newExchange(ctx, t, bstore) + + blks := make([]Block, 0, cids.Len()) + _ = cids.ForEach(func(c cid.Cid) error { + blk, err := newEmptyTestBlock(c) + require.NoError(t, err) + blks = append(blks, blk) + return nil + }) + + bstore := blockstore.NewBlockstore(ds.NewMapDatastore()) + err := Fetch(ctx, exchange, nil, blks, WithStore(bstore)) + require.NoError(t, err) + + for _, blk := range blks { + ok, err := bstore.Has(ctx, blk.CID()) + require.NoError(t, err) + require.True(t, ok) + } + }) + + t.Run("WithFetcher", func(t *testing.T) { + exchange := newExchange(ctx, t, bstore) + + blks := make([]Block, 0, cids.Len()) + _ = cids.ForEach(func(c cid.Cid) error { + blk, err := newEmptyTestBlock(c) + require.NoError(t, err) + blks = append(blks, blk) + return nil + }) + + session := exchange.NewSession(ctx) + fetcher := &testFetcher{Embedded: session} + err := Fetch(ctx, exchange, nil, blks, WithFetcher(fetcher)) + require.NoError(t, err) + require.Equal(t, len(blks), fetcher.Fetched) + }) +} + +func TestFetch_Duplicates(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + const items = 128 + bstore, cids := testBlockstore(ctx, t, items) + exchange := newExchange(ctx, t, bstore) + + var wg sync.WaitGroup + for i := range items { + blks := make([]Block, 0, cids.Len()) + _ = cids.ForEach(func(c cid.Cid) error { + blk, err := newEmptyTestBlock(c) + require.NoError(t, err) + blks = append(blks, blk) + return nil + }) + + wg.Add(1) + go func(i int) { + rint := rand.IntN(10) + // this sleep ensures fetches aren't started simultaneously, allowing to check for edge-cases + time.Sleep(time.Millisecond * time.Duration(rint)) + + err := Fetch(ctx, exchange, nil, blks) + assert.NoError(t, err) + wg.Done() + }(i) + } + wg.Wait() + + var entries int + unmarshalFns.Range(func(key, _ any) bool { + unmarshalFns.Delete(key) + entries++ + return true + }) + require.Zero(t, entries) +} + +func newExchangeOverEDS(ctx context.Context, t *testing.T, rsmt2d *rsmt2d.ExtendedDataSquare) exchange.SessionExchange { + bstore := &Blockstore{ + Getter: testAccessorGetter{ + Accessor: eds.Rsmt2D{ExtendedDataSquare: rsmt2d}, + }, + } + return newExchange(ctx, t, bstore) +} + +func newExchange(ctx context.Context, t *testing.T, bstore blockstore.Blockstore) exchange.SessionExchange { + net, err := mocknet.FullMeshLinked(3) + require.NoError(t, err) + + newServer(ctx, net.Hosts()[0], bstore) + newServer(ctx, net.Hosts()[1], bstore) + + client := newClient(ctx, net.Hosts()[2], bstore) + + err = net.ConnectAllButSelf() + require.NoError(t, err) + return client +} + +func newServer(ctx context.Context, host host.Host, store blockstore.Blockstore) { + net := network.NewFromIpfsHost(host, routinghelpers.Null{}) + server := server.New( + ctx, + net, + store, + server.TaskWorkerCount(2), + server.EngineTaskWorkerCount(2), + server.ProvideEnabled(false), + server.SetSendDontHaves(false), + ) + net.Start(server) +} + +func newClient(ctx context.Context, host host.Host, store blockstore.Blockstore) *client.Client { + net := network.NewFromIpfsHost(host, routinghelpers.Null{}) + client := client.New( + ctx, + net, + store, + ) + net.Start(client) + return client +} + +type testAccessorGetter struct { + eds.Accessor +} + +func (t testAccessorGetter) GetByHeight(context.Context, uint64) (eds.Accessor, error) { + return t.Accessor, nil +} + +type testFetcher struct { + Fetched int + + Embedded exchange.Fetcher +} + +func (t *testFetcher) GetBlock(context.Context, cid.Cid) (blocks.Block, error) { + panic("not implemented") +} + +func (t *testFetcher) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + t.Fetched += len(cids) + return t.Embedded.GetBlocks(ctx, cids) +} diff --git a/share/shwap/p2p/bitswap/block_proto.go b/share/shwap/p2p/bitswap/block_proto.go new file mode 100644 index 0000000000..8cab645705 --- /dev/null +++ b/share/shwap/p2p/bitswap/block_proto.go @@ -0,0 +1,46 @@ +package bitswap + +import ( + "fmt" + + "github.com/ipfs/go-cid" + + bitswappb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb" +) + +// marshalProto wraps the given Block in composition protobuf and marshals it. +func marshalProto(blk Block) ([]byte, error) { + containerData, err := blk.Marshal() + if err != nil { + return nil, fmt.Errorf("marshaling Shwap container: %w", err) + } + + blkProto := bitswappb.Block{ + Cid: blk.CID().Bytes(), + Container: containerData, + } + + blkData, err := blkProto.Marshal() + if err != nil { + return nil, fmt.Errorf("marshaling Bitswap Block protobuf: %w", err) + } + + return blkData, nil +} + +// unmarshalProto unwraps given data from composition protobuf and provides +// inner CID and serialized container data. +func unmarshalProto(data []byte) (cid.Cid, []byte, error) { + var blk bitswappb.Block + err := blk.Unmarshal(data) + if err != nil { + return cid.Undef, nil, fmt.Errorf("unmarshalling protobuf block: %w", err) + } + + cid, err := cid.Cast(blk.Cid) + if err != nil { + return cid, nil, fmt.Errorf("casting cid: %w", err) + } + + return cid, blk.Container, nil +} diff --git a/share/shwap/p2p/bitswap/block_registry.go b/share/shwap/p2p/bitswap/block_registry.go new file mode 100644 index 0000000000..495167aea7 --- /dev/null +++ b/share/shwap/p2p/bitswap/block_registry.go @@ -0,0 +1,34 @@ +package bitswap + +import ( + "fmt" + "hash" + + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" +) + +// registerBlock registers the new Block type and multihash for it. +func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block, error)) { + mh.Register(mhcode, func() hash.Hash { + return &hasher{IDSize: idSize} + }) + specRegistry[mhcode] = blockSpec{ + idSize: idSize, + codec: codec, + builder: bldrFn, + } +} + +// blockSpec holds constant metadata about particular Block types. +type blockSpec struct { + idSize int + codec uint64 + builder func(cid.Cid) (Block, error) +} + +func (spec *blockSpec) String() string { + return fmt.Sprintf("BlockSpec{IDSize: %d, Codec: %d}", spec.idSize, spec.codec) +} + +var specRegistry = make(map[uint64]blockSpec) diff --git a/share/shwap/p2p/bitswap/block_store.go b/share/shwap/p2p/bitswap/block_store.go new file mode 100644 index 0000000000..ffc6b0ede7 --- /dev/null +++ b/share/shwap/p2p/bitswap/block_store.go @@ -0,0 +1,107 @@ +package bitswap + +import ( + "context" + "fmt" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + + eds "github.com/celestiaorg/celestia-node/share/new_eds" +) + +// AccessorGetter abstracts storage system that indexes and manages multiple eds.AccessorGetter by network height. +type AccessorGetter interface { + // GetByHeight returns an Accessor by its height. + GetByHeight(ctx context.Context, height uint64) (eds.Accessor, error) +} + +// Blockstore implements generalized Bitswap compatible storage over Shwap containers +// that operates with Block and accesses data through AccessorGetter. +type Blockstore struct { + Getter AccessorGetter +} + +func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, error) { + spec, ok := specRegistry[cid.Prefix().MhType] + if !ok { + return nil, fmt.Errorf("unsupported Block type: %v", cid.Prefix().MhType) + } + + blk, err := spec.builder(cid) + if err != nil { + return nil, fmt.Errorf("failed to build a Block for %s: %w", spec.String(), err) + } + + eds, err := b.Getter.GetByHeight(ctx, blk.Height()) + if err != nil { + return nil, fmt.Errorf("getting EDS Accessor for height %v: %w", blk.Height(), err) + } + + if err = blk.Populate(ctx, eds); err != nil { + return nil, fmt.Errorf("failed to populate Shwap Block on height %v for %s: %w", blk.Height(), spec.String(), err) + } + + return convertBitswap(blk) +} + +func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { + blk, err := b.getBlock(ctx, cid) + if err != nil { + log.Errorf("blockstore: getting local block(%s): %s", cid, err) + return nil, err + } + + return blk, nil +} + +func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { + // TODO(@Wondertan): There must be a way to derive size without reading, proving, serializing and + // allocating Sample's block.Block or we could do hashing + // NOTE:Bitswap uses GetSize also to determine if we have content stored or not + // so simply returning constant size is not an option + blk, err := b.Get(ctx, cid) + if err != nil { + return 0, err + } + return len(blk.RawData()), nil +} + +func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { + _, err := b.Get(ctx, cid) + if err != nil { + return false, err + } + return true, nil +} + +func (b *Blockstore) Put(context.Context, blocks.Block) error { + panic("not implemented") +} + +func (b *Blockstore) PutMany(context.Context, []blocks.Block) error { + panic("not implemented") +} + +func (b *Blockstore) DeleteBlock(context.Context, cid.Cid) error { + panic("not implemented") +} + +func (b *Blockstore) AllKeysChan(context.Context) (<-chan cid.Cid, error) { panic("not implemented") } + +func (b *Blockstore) HashOnRead(bool) { panic("not implemented") } + +// convertBitswap converts and marshals Block to Bitswap Block. +func convertBitswap(blk Block) (blocks.Block, error) { + protoData, err := marshalProto(blk) + if err != nil { + return nil, fmt.Errorf("failed to wrap Block with proto: %w", err) + } + + bitswapBlk, err := blocks.NewBlockWithCid(protoData, blk.CID()) + if err != nil { + return nil, fmt.Errorf("assembling Bitswap block: %w", err) + } + + return bitswapBlk, nil +} diff --git a/share/shwap/p2p/bitswap/block_test.go b/share/shwap/p2p/bitswap/block_test.go new file mode 100644 index 0000000000..2f01229dfc --- /dev/null +++ b/share/shwap/p2p/bitswap/block_test.go @@ -0,0 +1,112 @@ +package bitswap + +import ( + "context" + crand "crypto/rand" + "encoding/binary" + "testing" + "time" + + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" +) + +const ( + testCodec = 0x9999 + testMultihashCode = 0x9999 + testIDSize = 2 +) + +func init() { + registerBlock( + testMultihashCode, + testCodec, + testIDSize, + func(cid cid.Cid) (Block, error) { + return newEmptyTestBlock(cid) + }, + ) +} + +func testBlockstore(ctx context.Context, t *testing.T, items int) (blockstore.Blockstore, *cid.Set) { + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + + cids := cid.NewSet() + for i := range items { + blk := newTestBlock(i) + bitswapBlk, err := convertBitswap(blk) + require.NoError(t, err) + err = bstore.Put(ctx, bitswapBlk) + require.NoError(t, err) + cids.Add(blk.CID()) + } + return bstore, cids +} + +type testID uint16 + +func (t testID) MarshalBinary() (data []byte, err error) { + data = binary.BigEndian.AppendUint16(data, uint16(t)) + return data, nil +} + +func (t *testID) UnmarshalBinary(data []byte) error { + *t = testID(binary.BigEndian.Uint16(data)) + return nil +} + +type testBlock struct { + id testID + data []byte +} + +func newTestBlock(id int) *testBlock { + bytes := make([]byte, 256) + _, _ = crand.Read(bytes) + return &testBlock{id: testID(id), data: bytes} +} + +func newEmptyTestBlock(cid cid.Cid) (*testBlock, error) { + idData, err := extractCID(cid) + if err != nil { + return nil, err + } + + var id testID + err = id.UnmarshalBinary(idData) + if err != nil { + return nil, err + } + + return &testBlock{id: id}, nil +} + +func (t *testBlock) CID() cid.Cid { + return encodeCID(t.id, testMultihashCode, testCodec) +} + +func (t *testBlock) Height() uint64 { + return 1 +} + +func (t *testBlock) Populate(context.Context, eds.Accessor) error { + return nil // noop +} + +func (t *testBlock) Marshal() ([]byte, error) { + return t.data, nil +} + +func (t *testBlock) UnmarshalFn(*share.Root) UnmarshalFn { + return func(bytes []byte) error { + t.data = bytes + time.Sleep(time.Millisecond * 1) + return nil + } +} diff --git a/share/shwap/p2p/bitswap/cid.go b/share/shwap/p2p/bitswap/cid.go new file mode 100644 index 0000000000..8e93fed548 --- /dev/null +++ b/share/shwap/p2p/bitswap/cid.go @@ -0,0 +1,53 @@ +package bitswap + +import ( + "encoding" + "fmt" + + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" +) + +// extractCID retrieves Shwap ID out of the CID. +func extractCID(cid cid.Cid) ([]byte, error) { + if err := validateCID(cid); err != nil { + return nil, err + } + // mhPrefixSize is the size of the multihash prefix that used to cut it off. + const mhPrefixSize = 4 + return cid.Hash()[mhPrefixSize:], nil +} + +// encodeCID encodes Shwap ID into the CID. +func encodeCID(bm encoding.BinaryMarshaler, mhcode, codec uint64) cid.Cid { + data, err := bm.MarshalBinary() + if err != nil { + panic(fmt.Errorf("marshaling for CID: %w", err)) + } + + buf, err := mh.Encode(data, mhcode) + if err != nil { + panic(fmt.Errorf("encoding to CID: %w", err)) + } + + return cid.NewCidV1(codec, buf) +} + +// validateCID checks correctness of the CID. +func validateCID(cid cid.Cid) error { + prefix := cid.Prefix() + spec, ok := specRegistry[prefix.MhType] + if !ok { + return fmt.Errorf("unsupported multihash type %d", prefix.MhType) + } + + if prefix.Codec != spec.codec { + return fmt.Errorf("invalid CID codec %d", prefix.Codec) + } + + if prefix.MhLength != spec.idSize { + return fmt.Errorf("invalid multihash length %d", prefix.MhLength) + } + + return nil +} diff --git a/share/shwap/p2p/bitswap/pb/bitswap.pb.go b/share/shwap/p2p/bitswap/pb/bitswap.pb.go new file mode 100644 index 0000000000..a84077e9ef --- /dev/null +++ b/share/shwap/p2p/bitswap/pb/bitswap.pb.go @@ -0,0 +1,372 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: share/shwap/p2p/bitswap/pb/bitswap.proto + +package pb + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type Block struct { + Cid []byte `protobuf:"bytes,1,opt,name=cid,proto3" json:"cid,omitempty"` + Container []byte `protobuf:"bytes,2,opt,name=container,proto3" json:"container,omitempty"` +} + +func (m *Block) Reset() { *m = Block{} } +func (m *Block) String() string { return proto.CompactTextString(m) } +func (*Block) ProtoMessage() {} +func (*Block) Descriptor() ([]byte, []int) { + return fileDescriptor_09fd4e2ff1d5ce94, []int{0} +} +func (m *Block) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Block) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Block.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Block) XXX_Merge(src proto.Message) { + xxx_messageInfo_Block.Merge(m, src) +} +func (m *Block) XXX_Size() int { + return m.Size() +} +func (m *Block) XXX_DiscardUnknown() { + xxx_messageInfo_Block.DiscardUnknown(m) +} + +var xxx_messageInfo_Block proto.InternalMessageInfo + +func (m *Block) GetCid() []byte { + if m != nil { + return m.Cid + } + return nil +} + +func (m *Block) GetContainer() []byte { + if m != nil { + return m.Container + } + return nil +} + +func init() { + proto.RegisterType((*Block)(nil), "bitswap.Block") +} + +func init() { + proto.RegisterFile("share/shwap/p2p/bitswap/pb/bitswap.proto", fileDescriptor_09fd4e2ff1d5ce94) +} + +var fileDescriptor_09fd4e2ff1d5ce94 = []byte{ + // 171 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x28, 0xce, 0x48, 0x2c, + 0x4a, 0xd5, 0x2f, 0xce, 0x28, 0x4f, 0x2c, 0xd0, 0x2f, 0x30, 0x2a, 0xd0, 0x4f, 0xca, 0x2c, 0x29, + 0x06, 0xb3, 0x93, 0x60, 0x4c, 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c, 0x21, 0x76, 0x28, 0x57, 0xc9, + 0x9c, 0x8b, 0xd5, 0x29, 0x27, 0x3f, 0x39, 0x5b, 0x48, 0x80, 0x8b, 0x39, 0x39, 0x33, 0x45, 0x82, + 0x51, 0x81, 0x51, 0x83, 0x27, 0x08, 0xc4, 0x14, 0x92, 0xe1, 0xe2, 0x4c, 0xce, 0xcf, 0x2b, 0x49, + 0xcc, 0xcc, 0x4b, 0x2d, 0x92, 0x60, 0x02, 0x8b, 0x23, 0x04, 0x9c, 0x22, 0x4f, 0x3c, 0x92, 0x63, + 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, + 0x63, 0xb8, 0xf1, 0x58, 0x8e, 0x21, 0xca, 0x3e, 0x3d, 0xb3, 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, + 0x3f, 0x57, 0x3f, 0x39, 0x35, 0x27, 0xb5, 0xb8, 0x24, 0x33, 0x31, 0xbf, 0x28, 0x1d, 0xce, 0xd6, + 0xcd, 0xcb, 0x4f, 0x01, 0x39, 0x12, 0x97, 0x53, 0x93, 0xd8, 0xc0, 0x6e, 0x34, 0x06, 0x04, 0x00, + 0x00, 0xff, 0xff, 0xe7, 0x9c, 0x32, 0xc5, 0xcf, 0x00, 0x00, 0x00, +} + +func (m *Block) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Block) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Block) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Container) > 0 { + i -= len(m.Container) + copy(dAtA[i:], m.Container) + i = encodeVarintBitswap(dAtA, i, uint64(len(m.Container))) + i-- + dAtA[i] = 0x12 + } + if len(m.Cid) > 0 { + i -= len(m.Cid) + copy(dAtA[i:], m.Cid) + i = encodeVarintBitswap(dAtA, i, uint64(len(m.Cid))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintBitswap(dAtA []byte, offset int, v uint64) int { + offset -= sovBitswap(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *Block) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Cid) + if l > 0 { + n += 1 + l + sovBitswap(uint64(l)) + } + l = len(m.Container) + if l > 0 { + n += 1 + l + sovBitswap(uint64(l)) + } + return n +} + +func sovBitswap(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozBitswap(x uint64) (n int) { + return sovBitswap(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Block) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBitswap + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Block: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Block: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Cid", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBitswap + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthBitswap + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthBitswap + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Cid = append(m.Cid[:0], dAtA[iNdEx:postIndex]...) + if m.Cid == nil { + m.Cid = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Container", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBitswap + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthBitswap + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthBitswap + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Container = append(m.Container[:0], dAtA[iNdEx:postIndex]...) + if m.Container == nil { + m.Container = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBitswap(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthBitswap + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipBitswap(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBitswap + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBitswap + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBitswap + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthBitswap + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupBitswap + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthBitswap + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthBitswap = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowBitswap = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupBitswap = fmt.Errorf("proto: unexpected end of group") +) diff --git a/share/shwap/p2p/bitswap/pb/bitswap.proto b/share/shwap/p2p/bitswap/pb/bitswap.proto new file mode 100644 index 0000000000..3ba19aa49c --- /dev/null +++ b/share/shwap/p2p/bitswap/pb/bitswap.proto @@ -0,0 +1,9 @@ +// Defined in CIP-19 https://github.com/celestiaorg/CIPs/blob/82aeb7dfc472105a11babffd548c730c899a3d24/cips/cip-19.md +syntax = "proto3"; +package bitswap; +option go_package = "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb"; + +message Block { + bytes cid = 1; + bytes container = 2; +} diff --git a/share/shwap/p2p/bitswap/row_block.go b/share/shwap/p2p/bitswap/row_block.go new file mode 100644 index 0000000000..927b7f320b --- /dev/null +++ b/share/shwap/p2p/bitswap/row_block.go @@ -0,0 +1,118 @@ +package bitswap + +import ( + "context" + "fmt" + + "github.com/ipfs/go-cid" + + "github.com/celestiaorg/rsmt2d" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/share/shwap" + shwappb "github.com/celestiaorg/celestia-node/share/shwap/pb" +) + +const ( + // rowCodec is a CID codec used for row Bitswap requests over Namespaced Merkle Tree. + rowCodec = 0x7800 + + // rowMultihashCode is the multihash code for custom axis sampling multihash function. + rowMultihashCode = 0x7801 +) + +func init() { + registerBlock( + rowMultihashCode, + rowCodec, + shwap.RowIDSize, + func(cid cid.Cid) (Block, error) { + return EmptyRowBlockFromCID(cid) + }, + ) +} + +// RowBlock is a Bitswap compatible block for Shwap's Row container. +type RowBlock struct { + ID shwap.RowID + + Container shwap.Row +} + +// NewEmptyRowBlock constructs a new empty RowBlock. +func NewEmptyRowBlock(height uint64, rowIdx int, root *share.Root) (*RowBlock, error) { + id, err := shwap.NewRowID(height, rowIdx, root) + if err != nil { + return nil, err + } + + return &RowBlock{ID: id}, nil +} + +// EmptyRowBlockFromCID constructs an empty RowBlock out of the CID. +func EmptyRowBlockFromCID(cid cid.Cid) (*RowBlock, error) { + ridData, err := extractCID(cid) + if err != nil { + return nil, err + } + + rid, err := shwap.RowIDFromBinary(ridData) + if err != nil { + return nil, fmt.Errorf("while unmarhaling RowBlock: %w", err) + } + return &RowBlock{ID: rid}, nil +} + +func (rb *RowBlock) CID() cid.Cid { + return encodeCID(rb.ID, rowMultihashCode, rowCodec) +} + +func (rb *RowBlock) Height() uint64 { + return rb.ID.Height +} + +func (rb *RowBlock) Marshal() ([]byte, error) { + if rb.Container.IsEmpty() { + return nil, fmt.Errorf("cannot marshal empty RowBlock") + } + + container := rb.Container.ToProto() + containerData, err := container.Marshal() + if err != nil { + return nil, fmt.Errorf("marshaling RowBlock container: %w", err) + } + + return containerData, nil +} + +func (rb *RowBlock) Populate(ctx context.Context, eds eds.Accessor) error { + half, err := eds.AxisHalf(ctx, rsmt2d.Row, rb.ID.RowIndex) + if err != nil { + return fmt.Errorf("accessing Row AxisHalf: %w", err) + } + + rb.Container = half.ToRow() + return nil +} + +func (rb *RowBlock) UnmarshalFn(root *share.Root) UnmarshalFn { + return func(data []byte) error { + if !rb.Container.IsEmpty() { + return nil + } + + var row shwappb.Row + if err := row.Unmarshal(data); err != nil { + return fmt.Errorf("unmarshaling Row: %w", err) + } + + cntr := shwap.RowFromProto(&row) + if err := cntr.Validate(root, rb.ID.RowIndex); err != nil { + return fmt.Errorf("validating Row: %w", err) + } + + rb.Container = cntr + return nil + } +} diff --git a/share/shwap/p2p/bitswap/row_block_test.go b/share/shwap/p2p/bitswap/row_block_test.go new file mode 100644 index 0000000000..36902b0e76 --- /dev/null +++ b/share/shwap/p2p/bitswap/row_block_test.go @@ -0,0 +1,38 @@ +package bitswap + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/edstest" +) + +func TestRow_FetchRoundtrip(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + eds := edstest.RandEDS(t, 4) + root, err := share.NewRoot(eds) + require.NoError(t, err) + exchange := newExchangeOverEDS(ctx, t, eds) + + blks := make([]Block, eds.Width()) + for i := range blks { + blk, err := NewEmptyRowBlock(1, i, root) + require.NoError(t, err) + blks[i] = blk + } + + err = Fetch(ctx, exchange, root, blks) + require.NoError(t, err) + + for _, blk := range blks { + row := blk.(*RowBlock) + err = row.Container.Validate(root, row.ID.RowIndex) + require.NoError(t, err) + } +} diff --git a/share/shwap/p2p/bitswap/row_namespace_data_block.go b/share/shwap/p2p/bitswap/row_namespace_data_block.go new file mode 100644 index 0000000000..6b7f905fe2 --- /dev/null +++ b/share/shwap/p2p/bitswap/row_namespace_data_block.go @@ -0,0 +1,122 @@ +package bitswap + +import ( + "context" + "fmt" + + "github.com/ipfs/go-cid" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/share/shwap" + shwappb "github.com/celestiaorg/celestia-node/share/shwap/pb" +) + +const ( + // rowNamespaceDataCodec is a CID codec used for data Bitswap requests over Namespaced Merkle Tree. + rowNamespaceDataCodec = 0x7820 + + // rowNamespaceDataMultihashCode is the multihash code for data multihash function. + rowNamespaceDataMultihashCode = 0x7821 +) + +func init() { + registerBlock( + rowNamespaceDataMultihashCode, + rowNamespaceDataCodec, + shwap.RowNamespaceDataIDSize, + func(cid cid.Cid) (Block, error) { + return EmptyRowNamespaceDataBlockFromCID(cid) + }, + ) +} + +// RowNamespaceDataBlock is a Bitswap compatible block for Shwap's RowNamespaceData container. +type RowNamespaceDataBlock struct { + ID shwap.RowNamespaceDataID + + Container shwap.RowNamespaceData +} + +// NewEmptyRowNamespaceDataBlock constructs a new empty RowNamespaceDataBlock. +func NewEmptyRowNamespaceDataBlock( + height uint64, + rowIdx int, + namespace share.Namespace, + root *share.Root, +) (*RowNamespaceDataBlock, error) { + id, err := shwap.NewRowNamespaceDataID(height, rowIdx, namespace, root) + if err != nil { + return nil, err + } + + return &RowNamespaceDataBlock{ID: id}, nil +} + +// EmptyRowNamespaceDataBlockFromCID constructs an empty RowNamespaceDataBlock out of the CID. +func EmptyRowNamespaceDataBlockFromCID(cid cid.Cid) (*RowNamespaceDataBlock, error) { + rndidData, err := extractCID(cid) + if err != nil { + return nil, err + } + + rndid, err := shwap.RowNamespaceDataIDFromBinary(rndidData) + if err != nil { + return nil, fmt.Errorf("unmarhalling RowNamespaceDataBlock: %w", err) + } + + return &RowNamespaceDataBlock{ID: rndid}, nil +} + +func (rndb *RowNamespaceDataBlock) CID() cid.Cid { + return encodeCID(rndb.ID, rowNamespaceDataMultihashCode, rowNamespaceDataCodec) +} + +func (rndb *RowNamespaceDataBlock) Height() uint64 { + return rndb.ID.Height +} + +func (rndb *RowNamespaceDataBlock) Marshal() ([]byte, error) { + if rndb.Container.IsEmpty() { + return nil, fmt.Errorf("cannot marshal empty RowNamespaceDataBlock") + } + + container := rndb.Container.ToProto() + containerData, err := container.Marshal() + if err != nil { + return nil, fmt.Errorf("marshaling RowNamespaceDataBlock container: %w", err) + } + + return containerData, nil +} + +func (rndb *RowNamespaceDataBlock) Populate(ctx context.Context, eds eds.Accessor) error { + rnd, err := eds.RowNamespaceData(ctx, rndb.ID.DataNamespace, rndb.ID.RowIndex) + if err != nil { + return fmt.Errorf("accessing RowNamespaceData: %w", err) + } + + rndb.Container = rnd + return nil +} + +func (rndb *RowNamespaceDataBlock) UnmarshalFn(root *share.Root) UnmarshalFn { + return func(data []byte) error { + if !rndb.Container.IsEmpty() { + return nil + } + + var rnd shwappb.RowNamespaceData + if err := rnd.Unmarshal(data); err != nil { + return fmt.Errorf("unmarshaling RowNamespaceData: %w", err) + } + + cntr := shwap.RowNamespaceDataFromProto(&rnd) + if err := cntr.Validate(root, rndb.ID.DataNamespace, rndb.ID.RowIndex); err != nil { + return fmt.Errorf("validating RowNamespaceData: %w", err) + } + + rndb.Container = cntr + return nil + } +} diff --git a/share/shwap/p2p/bitswap/row_namespace_data_block_test.go b/share/shwap/p2p/bitswap/row_namespace_data_block_test.go new file mode 100644 index 0000000000..391ee7a825 --- /dev/null +++ b/share/shwap/p2p/bitswap/row_namespace_data_block_test.go @@ -0,0 +1,39 @@ +package bitswap + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/edstest" + "github.com/celestiaorg/celestia-node/share/sharetest" +) + +func TestRowNamespaceData_FetchRoundtrip(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + namespace := sharetest.RandV0Namespace() + eds, root := edstest.RandEDSWithNamespace(t, namespace, 64, 16) + exchange := newExchangeOverEDS(ctx, t, eds) + + rowIdxs := share.RowsWithNamespace(root, namespace) + blks := make([]Block, len(rowIdxs)) + for i, rowIdx := range rowIdxs { + blk, err := NewEmptyRowNamespaceDataBlock(1, rowIdx, namespace, root) + require.NoError(t, err) + blks[i] = blk + } + + err := Fetch(ctx, exchange, root, blks) + require.NoError(t, err) + + for _, blk := range blks { + rnd := blk.(*RowNamespaceDataBlock) + err = rnd.Container.Validate(root, rnd.ID.DataNamespace, rnd.ID.RowIndex) + require.NoError(t, err) + } +} diff --git a/share/shwap/p2p/bitswap/sample_block.go b/share/shwap/p2p/bitswap/sample_block.go new file mode 100644 index 0000000000..625ca2bf08 --- /dev/null +++ b/share/shwap/p2p/bitswap/sample_block.go @@ -0,0 +1,117 @@ +package bitswap + +import ( + "context" + "fmt" + + "github.com/ipfs/go-cid" + + "github.com/celestiaorg/celestia-node/share" + eds "github.com/celestiaorg/celestia-node/share/new_eds" + "github.com/celestiaorg/celestia-node/share/shwap" + shwappb "github.com/celestiaorg/celestia-node/share/shwap/pb" +) + +const ( + // sampleCodec is a CID codec used for share sampling Bitswap requests over Namespaced + // Merkle Tree. + sampleCodec = 0x7810 + + // sampleMultihashCode is the multihash code for share sampling multihash function. + sampleMultihashCode = 0x7811 +) + +func init() { + registerBlock( + sampleMultihashCode, + sampleCodec, + shwap.SampleIDSize, + func(cid cid.Cid) (Block, error) { + return EmptySampleBlockFromCID(cid) + }, + ) +} + +// SampleBlock is a Bitswap compatible block for Shwap's Sample container. +type SampleBlock struct { + ID shwap.SampleID + Container shwap.Sample +} + +// NewEmptySampleBlock constructs a new empty SampleBlock. +func NewEmptySampleBlock(height uint64, rowIdx, colIdx int, root *share.Root) (*SampleBlock, error) { + id, err := shwap.NewSampleID(height, rowIdx, colIdx, root) + if err != nil { + return nil, err + } + + return &SampleBlock{ID: id}, nil +} + +// EmptySampleBlockFromCID constructs an empty SampleBlock out of the CID. +func EmptySampleBlockFromCID(cid cid.Cid) (*SampleBlock, error) { + sidData, err := extractCID(cid) + if err != nil { + return nil, err + } + + sid, err := shwap.SampleIDFromBinary(sidData) + if err != nil { + return nil, fmt.Errorf("while unmarhaling SampleBlock: %w", err) + } + + return &SampleBlock{ID: sid}, nil +} + +func (sb *SampleBlock) CID() cid.Cid { + return encodeCID(sb.ID, sampleMultihashCode, sampleCodec) +} + +func (sb *SampleBlock) Height() uint64 { + return sb.ID.Height +} + +func (sb *SampleBlock) Marshal() ([]byte, error) { + if sb.Container.IsEmpty() { + return nil, fmt.Errorf("cannot marshal empty SampleBlock") + } + + container := sb.Container.ToProto() + containerData, err := container.Marshal() + if err != nil { + return nil, fmt.Errorf("marshaling SampleBlock container: %w", err) + } + + return containerData, nil +} + +func (sb *SampleBlock) Populate(ctx context.Context, eds eds.Accessor) error { + smpl, err := eds.Sample(ctx, sb.ID.RowIndex, sb.ID.ShareIndex) + if err != nil { + return fmt.Errorf("accessing Sample: %w", err) + } + + sb.Container = smpl + return nil +} + +func (sb *SampleBlock) UnmarshalFn(root *share.Root) UnmarshalFn { + return func(data []byte) error { + if !sb.Container.IsEmpty() { + return nil + } + + var sample shwappb.Sample + if err := sample.Unmarshal(data); err != nil { + return fmt.Errorf("unmarshaling Sample: %w", err) + } + + cntr := shwap.SampleFromProto(&sample) + if err := cntr.Validate(root, sb.ID.RowIndex, sb.ID.ShareIndex); err != nil { + return fmt.Errorf("validating Sample: %w", err) + } + + sb.Container = cntr + return nil + } +} diff --git a/share/shwap/p2p/bitswap/sample_block_test.go b/share/shwap/p2p/bitswap/sample_block_test.go new file mode 100644 index 0000000000..0733acebdf --- /dev/null +++ b/share/shwap/p2p/bitswap/sample_block_test.go @@ -0,0 +1,41 @@ +package bitswap + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds/edstest" +) + +func TestSample_FetchRoundtrip(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + eds := edstest.RandEDS(t, 32) + root, err := share.NewRoot(eds) + require.NoError(t, err) + exchange := newExchangeOverEDS(ctx, t, eds) + + width := int(eds.Width()) + blks := make([]Block, 0, width*width) + for x := 0; x < width; x++ { + for y := 0; y < width; y++ { + blk, err := NewEmptySampleBlock(1, x, y, root) + require.NoError(t, err) + blks = append(blks, blk) + } + } + + err = Fetch(ctx, exchange, root, blks) + require.NoError(t, err) + + for _, sample := range blks { + blk := sample.(*SampleBlock) + err = blk.Container.Validate(root, blk.ID.RowIndex, blk.ID.ShareIndex) + require.NoError(t, err) + } +} diff --git a/share/shwap/row.go b/share/shwap/row.go index f03c7366f5..c1126cd8a7 100644 --- a/share/shwap/row.go +++ b/share/shwap/row.go @@ -75,6 +75,11 @@ func (r Row) ToProto() *pb.Row { } } +// IsEmpty reports whether the Row is empty, i.e. doesn't contain any shares. +func (r Row) IsEmpty() bool { + return r.halfShares == nil +} + // Validate checks if the row's shares match the expected number from the root data and validates // the side of the row. func (r Row) Validate(dah *share.Root, idx int) error { diff --git a/share/shwap/row_namespace_data.go b/share/shwap/row_namespace_data.go index 5d424ee0f3..07ffd8f967 100644 --- a/share/shwap/row_namespace_data.go +++ b/share/shwap/row_namespace_data.go @@ -133,6 +133,11 @@ func (rnd RowNamespaceData) ToProto() *pb.RowNamespaceData { } } +// IsEmpty reports whether the RowNamespaceData is empty, i.e. doesn't contain a proof. +func (rnd RowNamespaceData) IsEmpty() bool { + return rnd.Proof == nil +} + // Validate checks validity of the RowNamespaceData against the Root, Namespace and Row index. func (rnd RowNamespaceData) Validate(dah *share.Root, namespace share.Namespace, rowIdx int) error { if rnd.Proof == nil || rnd.Proof.IsEmptyProof() { diff --git a/share/shwap/sample.go b/share/shwap/sample.go index cb263415ad..25c6da609b 100644 --- a/share/shwap/sample.go +++ b/share/shwap/sample.go @@ -78,6 +78,11 @@ func (s Sample) ToProto() *pb.Sample { } } +// IsEmpty reports whether the Sample is empty, i.e. doesn't contain a proof. +func (s Sample) IsEmpty() bool { + return s.Proof == nil +} + // Validate checks the inclusion of the share using its Merkle proof under the specified root. // Returns an error if the proof is invalid or does not correspond to the indicated proof type. func (s Sample) Validate(dah *share.Root, rowIdx, colIdx int) error {