Skip to content

Commit

Permalink
Merge branch 'main' into move-providing-to-blockservice
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Nov 18, 2024
2 parents 6af1b50 + 625aadd commit 14bfdc0
Show file tree
Hide file tree
Showing 37 changed files with 1,048 additions and 785 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/gateway-sharness.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ jobs:
shell: bash
steps:
- name: Setup Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.23.x
- name: Checkout boxo
uses: actions/checkout@v3
with:
Expand All @@ -34,10 +34,11 @@ jobs:
run: |
go mod edit -replace=github.com/ipfs/boxo=../boxo
make mod_tidy
cat go.mod
working-directory: kubo
- name: Install sharness dependencies
run: make test_sharness_deps
run: |
find . -name go.mod -execdir go mod tidy \;
make test_sharness_deps
working-directory: kubo
- name: Run Kubo Sharness Tests
run: find . -maxdepth 1 -name "*gateway*.sh" -print0 | xargs -0 -I {} bash -c "echo {}; {}"
Expand Down
58 changes: 55 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,69 @@ The following emojis are used to highlight certain changes:

### Added

### Changed

- No longer using `github.com/jbenet/goprocess` to avoid requiring in dependents.

### Removed

### Fixed

### Security

## [v0.24.3]

### Changed

- `go.mod` updates

### Fixed

- `bitswap/client` no longer logs `"Received provider X for cid Y not requested` to ERROR level, moved to DEBUG [#771](https://github.com/ipfs/boxo/pull/711)

## [v0.24.2]

### Changed

- updated to go-libp2p to [v0.37.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.37.0)
- `ipns/pb`: removed use of deprecated `Exporter` (SA1019, [golang/protobuf#1640](https://github.com/golang/protobuf/issues/1640), [9a7055](https://github.com/ipfs/boxo/pull/699/commits/9a7055e444527d5aad3187503a1b84bcae44f7b9))

### Fixed

- `bitswap/client`: fix panic if current live count is greater than broadcast limit [#702](https://github.com/ipfs/boxo/pull/702)

## [v0.24.1]

### Changed

- `routing/http/client`: creating delegated routing client with `New` now defaults to querying delegated routing server with `DefaultProtocolFilter` ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#689](https://github.com/ipfs/boxo/pull/689)
- updated go-libp2p to [v0.36.5](https://github.com/libp2p/go-libp2p/releases/tag/v0.36.5)
- updated dependencies [#693](https://github.com/ipfs/boxo/pull/693)
- update `go-libp2p-kad-dht` to [v0.27.0](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.27.0)

### Fixed

- `routing/http/client`: optional address and protocol filter parameters from [IPIP-484](https://github.com/ipfs/specs/pull/484) use human-readable `,` instead of `%2C`. [#688](https://github.com/ipfs/boxo/pull/688)
- `bitswap/client` Cleanup live wants when wants are canceled. This prevents live wants from continuing to get rebroadcasted even after the wants are canceled. [#690](https://github.com/ipfs/boxo/pull/690)
- Fix problem adding invalid CID to exhausted wants list resulting in possible performance issue. [#692](https://github.com/ipfs/boxo/pull/692)

## [v0.24.0]

### Added

* `boxo/bitswap/server`:
* A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672)
- `routing/http`: added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671)
- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do.
* `routing/http`:
* added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671) [#678](https://github.com/ipfs/boxo/pull/678)
* added support for address and protocol filtering to the delegated routing client ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#678](https://github.com/ipfs/boxo/pull/678). To add filtering to the client, use the [`WithFilterAddrs`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterAddrs) and [`WithFilterProtocols`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterProtocols) options when creating the client.Client-side filtering for servers that don't support filtering is enabled by default. To disable it, use the [`disableLocalFiltering`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#disableLocalFiltering) option when creating the client.

### Changed

### Removed

### Fixed
= `unixfs/hamt` Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393)

- `unixfs/hamt` Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393)

### Security

Expand Down
7 changes: 3 additions & 4 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,9 @@ func (bs *Bitswap) Stat() (*Stat, error) {

func (bs *Bitswap) Close() error {
bs.net.Stop()
return multierr.Combine(
bs.Client.Close(),
bs.Server.Close(),
)
bs.Client.Close()
bs.Server.Close()
return nil
}

func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
Expand Down
43 changes: 16 additions & 27 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -117,10 +115,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
// exclusively. We should probably find another way to share logging data
ctx, cancelFunc := context.WithCancel(parent)

px := process.WithTeardown(func() error {
return nil
})

// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
Expand Down Expand Up @@ -165,9 +159,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
bs = &Client{
blockstore: bstore,
network: network,
process: px,
cancel: cancelFunc,
closing: make(chan struct{}),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
Expand All @@ -184,17 +178,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
option(bs)
}

bs.pqm.Startup()

// bind the context and process.
// do it over here to avoid closing before all setup is done.
go func() {
<-px.Closing() // process closes first
sm.Shutdown()
cancelFunc()
notif.Shutdown()
}()
procctx.CloseAfterContext(px, ctx) // parent cancelled first
pqm.Startup()

return bs
}
Expand All @@ -203,9 +187,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
type Client struct {
pm *bspm.PeerManager

// the provider query manager manages requests to find providers
pqm *bspqm.ProviderQueryManager

// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork

Expand All @@ -216,7 +197,9 @@ type Client struct {
// manages channels of outgoing blocks for sessions
notif notifications.PubSub

process process.Process
cancel context.CancelFunc
closing chan struct{}
closeOnce sync.Once

// Counters for various statistics
counterLk sync.Mutex
Expand Down Expand Up @@ -291,7 +274,7 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
defer span.End()

select {
case <-bs.process.Closing():
case <-bs.closing:
return errors.New("bitswap is closed")
default:
}
Expand All @@ -314,10 +297,10 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
return nil
}

// receiveBlocksFrom process blocks received from the network
// receiveBlocksFrom processes blocks received from the network
func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
case <-bs.closing:
return errors.New("bitswap is closed")
default:
}
Expand Down Expand Up @@ -470,7 +453,13 @@ func (bs *Client) ReceiveError(err error) {

// Close is called to shutdown the Client
func (bs *Client) Close() error {
return bs.process.Close()
bs.closeOnce.Do(func() {
close(bs.closing)
bs.sm.Shutdown()
bs.cancel()
bs.notif.Shutdown()
})
return nil
}

// GetWantlist returns the current local wantlist (both want-blocks and
Expand Down
12 changes: 6 additions & 6 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type MessageQueue struct {

// Dont touch any of these variables outside of run loop
sender bsnet.MessageSender
rebroadcastIntervalLk sync.RWMutex
rebroadcastIntervalLk sync.Mutex
rebroadcastInterval time.Duration
rebroadcastTimer *clock.Timer
// For performance reasons we just clear out the fields of the message
Expand Down Expand Up @@ -389,9 +389,9 @@ func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {

// Startup starts the processing of messages and rebroadcasting.
func (mq *MessageQueue) Startup() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastIntervalLk.Lock()
mq.rebroadcastTimer = mq.clock.Timer(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()
mq.rebroadcastIntervalLk.Unlock()
go mq.runQueue()
}

Expand Down Expand Up @@ -422,7 +422,7 @@ func (mq *MessageQueue) runQueue() {
}

var workScheduled time.Time
for mq.ctx.Err() == nil {
for {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
Expand Down Expand Up @@ -471,9 +471,9 @@ func (mq *MessageQueue) runQueue() {

// Periodically resend the list of wants to the peer
func (mq *MessageQueue) rebroadcastWantlist() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastIntervalLk.Lock()
mq.rebroadcastTimer.Reset(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()
mq.rebroadcastIntervalLk.Unlock()

// If some wants were transferred from the rebroadcast list
if mq.transferRebroadcastWants() {
Expand Down
3 changes: 2 additions & 1 deletion bitswap/client/internal/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ func (ps *impl) Shutdown() {
// corresponding to |keys|.
func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block {
blocksCh := make(chan blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 {
close(blocksCh)
return blocksCh
}

valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking

// prevent shutdown
ps.lk.RLock()
defer ps.lk.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type PeerManager struct {
createPeerQueue PeerQueueFactory
ctx context.Context

psLk sync.RWMutex
psLk sync.Mutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

Expand Down
10 changes: 6 additions & 4 deletions bitswap/client/internal/peermanager/peerwantmanager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package peermanager

import (
"bytes"
"fmt"
"strings"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -158,8 +158,6 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
fltWantHvs := make([]cid.Cid, 0, len(wantHaves))

// Get the existing want-blocks and want-haves for the peer
pws, ok := pwm.peerWants[p]
Expand All @@ -169,6 +167,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
return
}

fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))

// Iterate over the requested want-blocks
for _, c := range wantBlocks {
// If the want-block hasn't been sent to the peer
Expand Down Expand Up @@ -198,6 +198,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
pwm.reverseIndexAdd(c, p)
}

fltWantHvs := make([]cid.Cid, 0, len(wantHaves))

// Iterate over the requested want-haves
for _, c := range wantHaves {
// If we've already broadcasted this want, don't bother with a
Expand Down Expand Up @@ -450,7 +452,7 @@ func (pwm *peerWantManager) getWants() []cid.Cid {
}

func (pwm *peerWantManager) String() string {
var b bytes.Buffer
var b strings.Builder
for p, ws := range pwm.peerWants {
b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
for _, c := range ws.wantHaves.Keys() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (rpm *receivedProviderMessage) debugMessage() {
func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) {
requestStatus, ok := pqm.inProgressRequestStatuses[rpm.k]
if !ok {
log.Errorf("Received provider (%s) for cid (%s) not requested", rpm.p.String(), rpm.k.String())
log.Debugf("Received provider (%s) for cid (%s) not requested", rpm.p.String(), rpm.k.String())
return
}
requestStatus.providersSoFar = append(requestStatus.providersSoFar, rpm.p)
Expand Down
6 changes: 2 additions & 4 deletions bitswap/client/internal/session/peerresponsetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {
return ""
}

rnd := rand.Float64()

// Find the total received blocks for all candidate peers
total := 0
for _, p := range peers {
Expand All @@ -41,6 +39,7 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {

// Choose one of the peers with a chance proportional to the number
// of blocks received from that peer
rnd := rand.Float64()
counted := 0.0
for _, p := range peers {
counted += float64(prt.getPeerCount(p)) / float64(total)
Expand All @@ -52,8 +51,7 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {
// We shouldn't get here unless there is some weirdness with floating point
// math that doesn't quite cover the whole range of peers in the for loop
// so just choose the last peer.
index := len(peers) - 1
return peers[index]
return peers[len(peers)-1]
}

// getPeerCount returns the number of times the peer was first to send us a
Expand Down
Loading

0 comments on commit 14bfdc0

Please sign in to comment.