Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share): Shwap Bitwap composition #3421

Merged
merged 40 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
6fad566
composition redesign
Wondertan May 27, 2024
7d14e2b
remove generics and type assert
walldiss May 27, 2024
3041174
cleaning progress
Wondertan May 28, 2024
b3b64ba
duplicates test
Wondertan May 30, 2024
7cf5c63
populate missing containers
walldiss May 30, 2024
4943855
support for multiple populators
Wondertan May 30, 2024
108f450
remove populatorsMap and fix cid mapping for blocks
walldiss May 30, 2024
60b3a25
beautify
Wondertan May 30, 2024
a44b8b3
fix context
Wondertan May 30, 2024
94b8519
abort irrelevant change
Wondertan May 30, 2024
4ad1ac7
comment and API fixes
Wondertan Jun 4, 2024
e14c010
cid over the wire
Wondertan Jun 4, 2024
21bd564
simplify and deflakify(some cases) duplicate test
Wondertan Jun 5, 2024
3ac32ab
simplify note
Wondertan Jun 5, 2024
f1b3c49
extract readCID func
Wondertan Jun 11, 2024
9f8b0ec
generalize protobuf message and respective refactorings
Wondertan Jun 12, 2024
01439ef
migrate to Accessor
Wondertan Jun 12, 2024
5af5300
extract blockstore
Wondertan Jun 12, 2024
11ee5c7
split BlockFromEDS into several methods
Wondertan Jun 14, 2024
b25d590
populate -> marshal confusion
Wondertan Jun 14, 2024
9bd193f
improve bitswap tests and ensure there are multiple servers in the setup
Wondertan Jun 15, 2024
963b3da
notifyblock, cleanup on defer and revert duplicates and atomics
Wondertan Jun 15, 2024
e84e349
improve test for samples
Wondertan Jun 15, 2024
7732ce2
synchronize access to marshallers
Wondertan Jun 15, 2024
a6ec650
remove redundant allowlist
Wondertan Jun 15, 2024
ef45389
sessions and fetch limits
Wondertan Jun 20, 2024
491163d
pb comment
Wondertan Jun 20, 2024
502dbae
improve docs, logging and errors
Wondertan Jun 20, 2024
3e8ac7b
micro fix
Wondertan Jun 20, 2024
297e726
improve comments
Wondertan Jun 20, 2024
2b13d15
review comments
Wondertan Jun 24, 2024
9459c89
avoid nolint directive
Wondertan Jun 24, 2024
c947ce8
verify blocks aren't empty
Wondertan Jun 24, 2024
ff00d94
WithFetcher and WithStore options
Wondertan Jun 24, 2024
77fc979
improve comment
Wondertan Jun 24, 2024
b1d7285
untangle Block unmarshalling and extract proto funcs in a separate file
Wondertan Jun 25, 2024
99821a9
report number of empty blocks
Wondertan Jun 25, 2024
5ad3d76
introduce test block and avoid testing Fetch functions over real blocks
Wondertan Jun 25, 2024
4842fcb
improve comment and simplify test construction
Wondertan Jun 26, 2024
f88f5ed
simplify and remove IsEmpty
Wondertan Jun 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions share/shwap/p2p/bitswap/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package bitswap

import (
"context"

"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-node/share"
eds "github.com/celestiaorg/celestia-node/share/new_eds"
)

// log is the package-wide logger, created under the "shwap/bitswap" name.
var log = logger.Logger("shwap/bitswap")

// Block represents a Bitswap compatible generalization over Shwap containers.
// All Shwap containers must have a registered wrapper
// implementing the interface in order to be compatible with Bitswap.
// NOTE: This is not a Blockchain block, but an IPFS/Bitswap block.
type Block interface {
// CID returns the Shwap ID of the Block formatted as a CID.
CID() cid.Cid
// Height reports the height of the Shwap container behind the Block.
Height() uint64

// Populate fills the Block with its Shwap container by getting it out of the
// given EDS Accessor.
Populate(context.Context, eds.Accessor) error
// Marshal serializes the Shwap container the Block holds into bytes.
// The output MUST exclude the Shwap ID.
Marshal() ([]byte, error)
// UnmarshalFn returns a closure that unmarshals serialized Shwap container
// bytes into the Block.
// Unmarshalling involves data validation against the given Root.
UnmarshalFn(*share.Root) UnmarshalFn
}

// UnmarshalFn is a closure produced by a Block that unmarshals and validates
// the given serialized bytes of a Shwap container and, on success, populates
// the Block with it. A non-nil error means the bytes were rejected.
type UnmarshalFn func([]byte) error
259 changes: 259 additions & 0 deletions share/shwap/p2p/bitswap/block_fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package bitswap

import (
"context"
"crypto/sha256"
"fmt"
"sync"

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"

"github.com/celestiaorg/celestia-node/share"
)

// WithFetcher returns a FetchOption that makes [Fetch] request blocks through
// the given Fetcher instead of the exchange passed to it.
// Useful for reusing a Fetcher session across calls.
func WithFetcher(session exchange.Fetcher) FetchOption {
	useSession := func(opts *fetchOptions) {
		opts.Session = session
	}
	return useSession
}

// WithStore returns a FetchOption that makes [Fetch] put the fetched Blocks
// into the given Blockstore.
func WithStore(store blockstore.Blockstore) FetchOption {
	useStore := func(opts *fetchOptions) {
		opts.Store = store
	}
	return useStore
}

// Fetch fetches and populates the given Blocks using a Fetcher wrapping Bitswap.
//
// The Blocks are requested in consecutive batches of at most maxPerFetch items;
// the first batch that fails aborts the call and its error is returned.
// Each Block is validated against the given Root.
// Identical Blocks requested simultaneously are synchronized gracefully.
// Blocks until either the context is canceled or all Blocks are fetched and populated;
// the context error, if any, is returned once every batch has been processed.
//
// NOTE(review): the previous doc also stated that already populated Blocks are
// skipped; that is not visible in this function — confirm against Block implementations.
func Fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks []Block, opts ...FetchOption) error {
	for start := 0; start < len(blks); start += maxPerFetch {
		// clamp the last batch to whatever is left
		end := start + maxPerFetch
		if end > len(blks) {
			end = len(blks)
		}

		if err := fetch(ctx, exchg, root, blks[start:end], opts...); err != nil {
			return err
		}
	}

	return ctx.Err()
}

// maxPerFetch sets the limit for the maximum number of items in a single fetch.
// Fetch splits larger requests into batches of at most this many Blocks.
// This limit comes from the server side default limit on the max possible simultaneous CID WANTs from a peer.
// https://github.com/ipfs/boxo/blob/dfd4a53ba828a368cec8d61c3fe12969ac6aa94c/bitswap/internal/defaults/defaults.go#L29-L30
const maxPerFetch = 1024

// fetch fetches the given Blocks with a single GetBlocks request.
// See [Fetch] for detailed description.
//
// For every Block it registers the Block's UnmarshalFn in the global unmarshalFns
// registry under the Block's CID, so that the hasher can validate and populate
// the Block when the response arrives. If an entry for the CID already exists,
// the Block is tracked as a duplicate and is unmarshalled here instead, once the
// data is received. Entries registered by this call are removed when it returns.
//
// Returns an error if requesting the blocks fails, otherwise the context error (if any)
// once the response channel is drained. Failures to notify the exchange or to store
// a block are only logged.
func fetch(ctx context.Context, exchg exchange.Interface, root *share.Root, blks []Block, opts ...FetchOption) error {
	var options fetchOptions
	for _, opt := range opts {
		opt(&options)
	}

	fetcher := options.getFetcher(exchg)
	cids := make([]cid.Cid, 0, len(blks))
	duplicates := make(map[cid.Cid]Block)
	for _, blk := range blks {
		// memoize CID for reuse as it ain't free
		// (named blkCID to avoid shadowing the cid package)
		blkCID := blk.CID()
		cids = append(cids, blkCID)

		// store the UnmarshalFn s.t. hasher can access it
		// and fill in the Block
		unmarshalFn := blk.UnmarshalFn(root)
		_, exists := unmarshalFns.LoadOrStore(blkCID, &unmarshalEntry{UnmarshalFn: unmarshalFn})
		if exists {
			// the unmarshalFn has already been stored for the cid,
			// which means there is an ongoing fetch happening for the same cid,
			// so mark the Block as duplicate
			duplicates[blkCID] = blk
		} else {
			// cleanup is done by the original requester and
			// only after we are sure we got the block;
			// the defer inside the loop is intentional: it must run on function return
			defer unmarshalFns.Delete(blkCID)
		}
	}

	blkCh, err := fetcher.GetBlocks(ctx, cids)
	if err != nil {
		return fmt.Errorf("requesting Bitswap blocks: %w", err)
	}

	for bitswapBlk := range blkCh { // GetBlocks closes blkCh on ctx cancellation
		// NOTE: notification for duplicates is on purpose and to cover a flaky case
		// It's harmless in practice to do additional notifications in case of duplicates
		if err := exchg.NotifyNewBlocks(ctx, bitswapBlk); err != nil {
			// Errorf, not Error: the message carries a format verb
			log.Errorf("failed to notify the new Bitswap block: %s", err)
		}

		blk, ok := duplicates[bitswapBlk.Cid()]
		if ok {
			// uncommon duplicate case: concurrent fetching of the same block.
			// The block hasn't been populated inside hasher verification,
			// so we have to unmarshal it ourselves.
			unmarshalFn := blk.UnmarshalFn(root)
			err := unmarshal(unmarshalFn, bitswapBlk.RawData())
			if err != nil {
				// this means verification succeeded in the hasher but failed here
				// this case should never happen in practice
				// and if so something is really wrong
				panic(fmt.Sprintf("unmarshaling duplicate block: %s", err))
			}
			// NOTE: This approach has a downside that we redo deserialization and computationally
			// expensive computation for as many duplicates. We tried solutions that don't have this
			// problem, but they are *much* more complex. Considering this a rare edge-case the tradeoff
			// towards simplicity has been made.
			continue
		}
		// common case: the block was populated by the hasher
		// so store it if requested
		err := options.store(ctx, bitswapBlk)
		if err != nil {
			// Errorf, not Error: the message carries a format verb
			log.Errorf("failed to store the new Bitswap block: %s", err)
		}
	}

	return ctx.Err()
}

// unmarshal decodes the serialized Bitswap block data and feeds the contained
// Shwap container bytes to the given UnmarshalFn, which validates them and
// populates its Block.
//
// The error of the proto decoding step is returned as is; a failure of the
// UnmarshalFn is wrapped with additional context.
func unmarshal(unmarshalFn UnmarshalFn, data []byte) error {
	// the first value returned by unmarshalProto is not needed here
	_, payload, err := unmarshalProto(data)
	if err != nil {
		return err
	}

	if err := unmarshalFn(payload); err != nil {
		return fmt.Errorf("verifying and unmarshalling container data: %w", err)
	}
	return nil
}

// unmarshalFns exists to communicate between Fetch and hasher, and it's global as a necessity.
//
// Fetch registers UnmarshalFns, keyed by the CID of the requested Block and wrapped
// in *unmarshalEntry, that hasher then uses to validate and unmarshal Block responses
// coming through Bitswap.
//
// Bitswap does not provide *stateful* verification out of the box and by default
// messages are verified by their respective MultiHashes that are registered globally.
// For every Block type there is a global hasher registered that accesses the stored UnmarshalFn once a
// message is received. It then uses the UnmarshalFn to validate and fill in the respective Block.
//
// sync.Map is used to minimize contention for disjoint keys.
var unmarshalFns sync.Map

// unmarshalEntry wraps UnmarshalFn with a mutex to protect it from concurrent access.
// Entries are stored by pointer in unmarshalFns, so the embedded mutex is never copied.
type unmarshalEntry struct {
// Mutex must be held while calling the embedded UnmarshalFn.
sync.Mutex
// UnmarshalFn validates container bytes and populates the Block that produced it.
UnmarshalFn
}

// hasher implements hash.Hash to be registered as a custom multihash.
// hasher is the *hack* to inject custom verification logic into Bitswap:
// instead of hashing, Write validates the received data through the UnmarshalFn
// registered for its CID and Sum reports the ID taken from that CID.
type hasher struct {
// IDSize of the respective Shwap container; reported by Size.
IDSize int // to be set during hasher registration

// sum holds the ID set by the last successful write; nil before that and after Reset.
sum []byte
}

// Write validates the given serialized Bitswap block data and, on success,
// records its ID as the hasher's sum. The whole block is expected in one call.
//
// It returns len(data) on success. On failure it returns 0 together with the
// wrapped error, which is also logged here.
// NOTE(review): the error is presumably logged because callers of hash.Hash
// tend to ignore Write errors — confirm.
func (h *hasher) Write(data []byte) (int, error) {
	if cause := h.write(data); cause != nil {
		wrapped := fmt.Errorf("hasher: %w", cause)
		log.Error(wrapped)
		return 0, fmt.Errorf("shwap/bitswap: %w", wrapped)
	}

	return len(data), nil
}

// write verifies the given serialized Bitswap block data and records the
// resulting sum.
//
// It decodes the data into the CID and the Shwap container bytes, extracts and
// validates the ID from the CID, looks up the UnmarshalFn registered for the CID
// by Fetch and runs it on the container bytes under the entry's lock.
// On success the ID becomes the hasher's sum; on any failure an error is
// returned and the sum is left untouched.
func (h *hasher) write(data []byte) error {
	// named blkCID to avoid shadowing the cid package
	blkCID, container, err := unmarshalProto(data)
	if err != nil {
		return fmt.Errorf("unmarshalling proto: %w", err)
	}

	// get the Block ID out of the CID, validating it
	id, err := extractCID(blkCID)
	if err != nil {
		return fmt.Errorf("validating cid: %w", err)
	}

	// get the registered UnmarshalFn and use it to check data validity and
	// pass the data to the Fetch caller
	val, ok := unmarshalFns.Load(blkCID)
	if !ok {
		return fmt.Errorf("no unmarshallers registered for %s", blkCID.String())
	}
	entry := val.(*unmarshalEntry)

	// ensure UnmarshalFn is synchronized
	// NOTE: Bitswap may call hasher.Write concurrently, which may call unmarshal concurrently,
	// thus we need this synchronization.
	// The unlock is deferred so the entry is released on the error path as well;
	// otherwise a single invalid response would leave the entry locked forever
	// and block every subsequent write for the same CID.
	entry.Lock()
	defer entry.Unlock()
	if err := entry.UnmarshalFn(container); err != nil {
		return fmt.Errorf("verifying and unmarshalling container data: %w", err)
	}

	// set the id as resulting sum
	// it's required for the sum to match the requested ID
	// to satisfy hash contract and signal to Bitswap that data is correct
	h.sum = id
	return nil
}

// Sum appends the current sum to b and returns the resulting slice,
// as the hash.Hash contract requires. It does not change the hasher state.
//
// The sum is the ID recorded by the last successful write; when there is none,
// b is returned unchanged (nil for the usual Sum(nil) call).
// The sum is copied, so callers cannot mutate the hasher's internal slice.
func (h *hasher) Sum(b []byte) []byte {
	return append(b, h.sum...)
}

// Reset discards the recorded sum, returning the hasher to its initial state.
func (h *hasher) Reset() {
	var cleared []byte // nil: no sum recorded
	h.sum = cleared
}

// Size reports the IDSize the hasher was configured with.
func (h *hasher) Size() int {
	idSize := h.IDSize
	return idSize
}

// BlockSize reports sha256.BlockSize.
// NOTE(review): the hasher does no real hashing, so the value looks like a
// placeholder to satisfy hash.Hash — confirm.
func (h *hasher) BlockSize() int {
	const blockSize = sha256.BlockSize
	return blockSize
}

// FetchOption is a functional option that customizes a [Fetch] call by
// mutating its fetchOptions. See WithFetcher and WithStore.
type FetchOption func(*fetchOptions)

// fetchOptions collects the optional settings of a [Fetch] call.
// The zero value is valid: no custom Fetcher and no Blockstore.
type fetchOptions struct {
// Session, if set, is used to request blocks instead of the exchange given to Fetch.
Session exchange.Fetcher
// Store, if set, receives the fetched blocks.
Store blockstore.Blockstore
}

// getFetcher picks the Fetcher to request blocks with: the configured Session
// when there is one, otherwise the given exchange itself.
func (options *fetchOptions) getFetcher(exhng exchange.Interface) exchange.Fetcher {
	if options.Session == nil {
		return exhng
	}

	return options.Session
}

// store puts the given block into the configured Blockstore and returns the
// result of that Put. It is a no-op returning nil when no Store is configured.
func (options *fetchOptions) store(ctx context.Context, blk blocks.Block) error {
	if bstore := options.Store; bstore != nil {
		return bstore.Put(ctx, blk)
	}

	return nil
}
Loading
Loading