Skip to content

Commit

Permalink
generalize protobuf message and respective refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Jun 12, 2024
1 parent 6786c24 commit 115882c
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 671 deletions.
31 changes: 31 additions & 0 deletions share/shwap/p2p/bitswap/block.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package bitswap

import (
"fmt"

"github.com/gogo/protobuf/proto"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"

bitswappb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb"

"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
Expand All @@ -23,6 +28,7 @@ type Block interface {
// CID returns Shwap ID of the Block formatted as CID.
CID() cid.Cid
// BlockFromEDS extract Bitswap Block out of the EDS.
// TODO: Split into MarshalBinary and Populate
BlockFromEDS(*rsmt2d.ExtendedDataSquare) (blocks.Block, error)

// IsEmpty reports whether the Block been populated with Shwap container.
Expand All @@ -32,3 +38,28 @@ type Block interface {
// Population involves data validation against the Root.
PopulateFn(*share.Root) PopulateFn
}

// toBlock converts given protobuf container into Bitswap Block.
func toBlock(cid cid.Cid, container proto.Marshaler) (blocks.Block, error) {
containerData, err := container.Marshal()
if err != nil {
return nil, fmt.Errorf("marshaling container: %w", err)
}

blkProto := bitswappb.Block{
Cid: cid.Bytes(),
Container: containerData,
}

blkData, err := blkProto.Marshal()
if err != nil {
return nil, fmt.Errorf("marshaling Block protobuf: %w", err)
}

blk, err := blocks.NewBlockWithCid(blkData, cid)
if err != nil {
return nil, fmt.Errorf("assembling Bitswap block: %w", err)
}

return blk, nil
}
77 changes: 42 additions & 35 deletions share/shwap/p2p/bitswap/block_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/ipfs/boxo/exchange"
"github.com/ipfs/go-cid"

bitswappb "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap/pb"

"github.com/celestiaorg/celestia-node/share"
)

Expand Down Expand Up @@ -47,7 +49,8 @@ func Fetch(ctx context.Context, fetcher exchange.Fetcher, root *share.Root, blks
}
// uncommon duplicate case: concurrent fetching of the same block,
// so we have to populate it ourselves instead of the hasher,
err := blk.PopulateFn(root)(bitswapBlk.RawData())
populateFn := blk.PopulateFn(root)
_, err := populate(populateFn, bitswapBlk.RawData())
if err != nil {
// this means verification succeeded in the hasher but failed here
// this case should never happen in practice
Expand All @@ -63,6 +66,42 @@ func Fetch(ctx context.Context, fetcher exchange.Fetcher, root *share.Root, blks
return ctx.Err()
}

// populate populates the data into a Block via PopulateFn
// If populateFn is nil -- gets it from the global populatorFns.
func populate(populate PopulateFn, data []byte) ([]byte, error) {
var blk bitswappb.Block
err := blk.Unmarshal(data)
if err != nil {
return nil, fmt.Errorf("unmarshalling block: %w", err)
}
cid, err := cid.Cast(blk.Cid)
if err != nil {
return nil, fmt.Errorf("casting cid: %w", err)
}
// get ID out of CID validating it
id, err := extractCID(cid)
if err != nil {
return nil, fmt.Errorf("validating cid: %w", err)
}

if populate == nil {
// get registered PopulateFn and use it to check data validity and
// pass it to Fetch caller
val, ok := populatorFns.LoadAndDelete(cid)
if !ok {
return nil, fmt.Errorf("no populator registered")
}
populate = val.(PopulateFn)
}

err = populate(blk.Container)
if err != nil {
return nil, fmt.Errorf("verifying data: %w", err)
}

return id, nil
}

// populatorFns exist to communicate between Fetch and hasher.
//
// Fetch registers PopulateFNs that hasher then uses to validate and populate Block responses coming
Expand All @@ -86,41 +125,9 @@ type hasher struct {
}

func (h *hasher) Write(data []byte) (int, error) {
if len(data) == 0 {
errMsg := "hasher: empty message"
log.Error(errMsg)
return 0, fmt.Errorf("shwap/bitswap: %s", errMsg)
}

// cut off the first tag type byte out of protobuf data
const pbTypeOffset = 1
cidData := data[pbTypeOffset:]

cid, err := readCID(cidData)
if err != nil {
err = fmt.Errorf("hasher: reading cid: %w", err)
log.Error(err)
return 0, fmt.Errorf("shwap/bitswap: %w", err)
}
// get ID out of CID and validate it
id, err := extractCID(cid)
if err != nil {
err = fmt.Errorf("hasher: validating cid: %w", err)
log.Error(err)
return 0, fmt.Errorf("shwap/bitswap: %w", err)
}
// get registered PopulateFn and use it to check data validity and
// pass it to Fetch caller
val, ok := populatorFns.LoadAndDelete(cid)
if !ok {
errMsg := "hasher: no verifier registered"
log.Error(errMsg)
return 0, fmt.Errorf("shwap/bitswap: %s", errMsg)
}
populate := val.(PopulateFn)
err = populate(data)
id, err := populate(nil, data)
if err != nil {
err = fmt.Errorf("hasher: verifying data: %w", err)
err = fmt.Errorf("hasher: %w", err)
log.Error(err)
return 0, fmt.Errorf("shwap/bitswap: %w", err)
}
Expand Down
20 changes: 0 additions & 20 deletions share/shwap/p2p/bitswap/cid.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bitswap

import (
"encoding"
"encoding/binary"
"fmt"

"github.com/ipfs/go-cid"
Expand All @@ -23,25 +22,6 @@ func (a allowlist) IsAllowed(code uint64) bool {
return ok
}

// readCID reads out cid out of bytes
func readCID(data []byte) (cid.Cid, error) {
cidLen, ln := binary.Uvarint(data)
if ln <= 0 || len(data) < ln+int(cidLen) {
return cid.Undef, fmt.Errorf("invalid data length")
}
// extract CID out of data
// we do this on the raw data to:
// * Avoid complicating hasher with generalized bytes -> type unmarshalling
// * Avoid type allocations
cidRaw := data[ln : ln+int(cidLen)]
castCid, err := cid.Cast(cidRaw)
if err != nil {
return cid.Undef, fmt.Errorf("casting cid: %w", err)
}

return castCid, nil
}

// extractCID retrieves Shwap ID out of the CID.
func extractCID(cid cid.Cid) ([]byte, error) {
if err := validateCID(cid); err != nil {
Expand Down
Loading

0 comments on commit 115882c

Please sign in to comment.