Remove dependency on goprocess
The dependency on goprocess is not needed by boxo; removing it also frees code that depends on boxo from having to support it.

Closes #709
gammazero committed Oct 30, 2024
Parent: 29598b2 · Commit: 1d5e869
Showing 11 changed files with 271 additions and 279 deletions.
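Throughout the diff, goprocess lifecycle management is replaced with a standard-library pattern: a cancellable context, a closing channel, and sync.Once. A minimal, self-contained sketch of that pattern (the client type and its methods are illustrative, not code from this commit):

```go
package main

import (
	"context"
	"errors"
	"sync"
)

// client is a toy stand-in for a component whose lifecycle used to be
// managed by a goprocess.Process.
type client struct {
	cancel    context.CancelFunc // stops background goroutines
	closing   chan struct{}      // observable "shutting down" signal
	closeOnce sync.Once          // makes Close idempotent
}

func newClient() *client {
	ctx, cancel := context.WithCancel(context.Background())
	c := &client{cancel: cancel, closing: make(chan struct{})}
	go func() { <-ctx.Done() }() // stand-in for real background work
	return c
}

// do fails fast once shutdown has begun, the way the reworked Client
// guards NotifyNewBlocks and receiveBlocksFrom.
func (c *client) do() error {
	select {
	case <-c.closing:
		return errors.New("client is closed")
	default:
		return nil
	}
}

// Close tears down in a fixed order, exactly once.
func (c *client) Close() {
	c.closeOnce.Do(func() {
		close(c.closing)
		c.cancel()
	})
}

func main() {
	c := newClient()
	_ = c.do()
	c.Close()
	c.Close() // safe: sync.Once guards the teardown
}
```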
CHANGELOG.md (4 changes: 1 addition & 3 deletions)
@@ -18,14 +18,12 @@ The following emojis are used to highlight certain changes:

### Changed

- updated to go-libp2p to [v0.37.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.37.0)
- No longer using `github.com/jbenet/goprocess` to avoid requiring it in dependents.

### Removed

### Fixed

- Fix panic if current live count is greater than broadcast limit [#702](https://github.com/ipfs/boxo/pull/702)

### Security

## [v0.24.2]
bitswap/bitswap.go (7 changes: 3 additions & 4 deletions)
@@ -144,10 +144,9 @@ func (bs *Bitswap) Stat() (*Stat, error) {

func (bs *Bitswap) Close() error {
	bs.net.Stop()
-	return multierr.Combine(
-		bs.Client.Close(),
-		bs.Server.Close(),
-	)
+	bs.Client.Close()
+	bs.Server.Close()
+	return nil
}

func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
bitswap/client/client.go (38 changes: 15 additions & 23 deletions)
@@ -31,8 +31,6 @@ import (
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -117,10 +115,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
	// exclusively. We should probably find another way to share logging data
	ctx, cancelFunc := context.WithCancel(parent)

-	px := process.WithTeardown(func() error {
-		return nil
-	})
-
	// onDontHaveTimeout is called when a want-block is sent to a peer that
	// has an old version of Bitswap that doesn't support DONT_HAVE messages,
	// or when no response is received within a timeout.
@@ -165,7 +159,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
bs = &Client{
blockstore: bstore,
network: network,
process: px,
cancel: cancelFunc,
closing: make(chan struct{}),
pm: pm,
sm: sm,
sim: sim,
@@ -185,16 +180,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore

	pqm.Startup()

-	// bind the context and process.
-	// do it over here to avoid closing before all setup is done.
-	go func() {
-		<-px.Closing() // process closes first
-		sm.Shutdown()
-		cancelFunc()
-		notif.Shutdown()
-	}()
-	procctx.CloseAfterContext(px, ctx) // parent cancelled first
-
	return bs
}

@@ -212,7 +197,9 @@ type Client struct {
	// manages channels of outgoing blocks for sessions
	notif notifications.PubSub

-	process process.Process
+	cancel    context.CancelFunc
+	closing   chan struct{}
+	closeOnce sync.Once

	// Counters for various statistics
	counterLk sync.Mutex
@@ -287,7 +274,7 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
	defer span.End()

	select {
-	case <-bs.process.Closing():
+	case <-bs.closing:
		return errors.New("bitswap is closed")
	default:
	}
@@ -310,10 +297,10 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
	return nil
}

-// receiveBlocksFrom process blocks received from the network
+// receiveBlocksFrom processes blocks received from the network
func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
	select {
-	case <-bs.process.Closing():
+	case <-bs.closing:
		return errors.New("bitswap is closed")
	default:
	}
@@ -465,8 +452,13 @@ func (bs *Client) ReceiveError(err error) {
}

// Close is called to shutdown the Client
-func (bs *Client) Close() error {
-	return bs.process.Close()
+func (bs *Client) Close() {
+	bs.closeOnce.Do(func() {
+		close(bs.closing)
+		bs.sm.Shutdown()
+		bs.cancel()
+		bs.notif.Shutdown()
+	})
}

// GetWantlist returns the current local wantlist (both want-blocks and
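A side effect visible in this file: Client.Close no longer returns an error and is safe to call more than once. A hypothetical before/after call site (oldClient and newClient are illustrative stand-ins, not boxo types):

```go
package main

import "fmt"

// oldClient mimics the previous API, where Close returned an error that
// shutdown paths had to collect and propagate (e.g. via multierr).
type oldClient struct{}

func (oldClient) Close() error { return nil }

// newClient mimics the new API: Close returns nothing and is idempotent.
type newClient struct{ closed bool }

func (c *newClient) Close() {
	if !c.closed {
		c.closed = true
	}
}

func main() {
	// Before: every caller had to handle a possible close error.
	if err := (oldClient{}).Close(); err != nil {
		fmt.Println("close failed:", err)
	}

	// After: shutdown is fire-and-forget and safe to repeat.
	c := &newClient{}
	c.Close()
	c.Close()
}
```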
bitswap/server/internal/decision/engine.go (48 changes: 27 additions & 21 deletions)
@@ -25,7 +25,6 @@ import (
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertracker"
process "github.com/jbenet/goprocess"
"github.com/libp2p/go-libp2p/core/peer"
mh "github.com/multiformats/go-multihash"
)
@@ -195,6 +194,9 @@ type Engine struct {

	taskWorkerLock  sync.Mutex
	taskWorkerCount int
+	waitWorkers     sync.WaitGroup
+	cancel          context.CancelFunc
+	closeOnce       sync.Once

	targetMessageSize int

@@ -376,12 +378,13 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer
// more tasks if it has some maximum work already outstanding.
func NewEngine(
-	ctx context.Context,
	bs bstore.Blockstore,
	peerTagger PeerTagger,
	self peer.ID,
	opts ...Option,
) *Engine {
+	ctx, cancel := context.WithCancel(context.Background())
+
	e := &Engine{
		scoreLedger:       NewDefaultScoreLedger(),
		bstoreWorkerCount: defaults.BitswapEngineBlockstoreWorkerCount,
@@ -401,6 +404,7 @@ func NewEngine(
		tagUseful:                       fmt.Sprintf(tagFormat, "useful", uuid.New().String()),
		maxQueuedWantlistEntriesPerPeer: defaults.MaxQueuedWantlistEntiresPerPeer,
		maxCidSize:                      defaults.MaximumAllowedCid,
+		cancel:                          cancel,
	}

	for _, opt := range opts {
@@ -437,6 +441,8 @@ func NewEngine(
		log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize)
	}

+	e.startWorkers(ctx)
+
	return e
}

@@ -462,43 +468,42 @@ func (e *Engine) SetSendDontHaves(send bool) {
// Starts the score ledger. Before start the function checks and,
// if it is unset, initializes the scoreLedger with the default
// implementation.
-func (e *Engine) startScoreLedger(px process.Process) {
+func (e *Engine) startScoreLedger() {
	e.scoreLedger.Start(func(p peer.ID, score int) {
		if score == 0 {
			e.peerTagger.UntagPeer(p, e.tagUseful)
		} else {
			e.peerTagger.TagPeer(p, e.tagUseful, score)
		}
	})
-	px.Go(func(ppx process.Process) {
-		<-ppx.Closing()
-		e.scoreLedger.Stop()
-	})
}

-func (e *Engine) startBlockstoreManager(px process.Process) {
+// startWorkers starts workers to handle requests from other nodes for the data
+// on this node.
+func (e *Engine) startWorkers(ctx context.Context) {
	e.bsm.start()
-	px.Go(func(ppx process.Process) {
-		<-ppx.Closing()
-		e.bsm.stop()
-	})
-}
-
-// Start up workers to handle requests from other nodes for the data on this node
-func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
-	e.startBlockstoreManager(px)
-	e.startScoreLedger(px)
+	e.startScoreLedger()

	e.taskWorkerLock.Lock()
	defer e.taskWorkerLock.Unlock()

+	e.waitWorkers.Add(e.taskWorkerCount)
	for i := 0; i < e.taskWorkerCount; i++ {
-		px.Go(func(_ process.Process) {
-			e.taskWorker(ctx)
-		})
+		go e.taskWorker(ctx)
	}
}

+// Close shuts down the decision engine and returns after all workers have
+// finished. Safe to call multiple times/concurrently.
+func (e *Engine) Close() {
+	e.closeOnce.Do(func() {
+		e.cancel()
+		e.bsm.stop()
+		e.scoreLedger.Stop()
+	})
+	e.waitWorkers.Wait()
+}
+
func (e *Engine) onPeerAdded(p peer.ID) {
	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}
@@ -524,6 +529,7 @@ func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
// and adds them to an envelope that is passed off to the bitswap workers,
// which send the message to the network.
func (e *Engine) taskWorker(ctx context.Context) {
+	defer e.waitWorkers.Done()
	defer e.taskWorkerExit()
	for {
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
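The engine diff above adds a third piece of the pattern: worker accounting. NewEngine owns its context, startWorkers registers each task worker with a WaitGroup, and Close cancels and then waits for the workers to drain. A toy reconstruction of that shape (names are illustrative, not the engine's actual API):

```go
package main

import (
	"context"
	"sync"
)

// engine is a toy version of the decision engine's new lifecycle: the
// constructor owns its context, workers are counted into a WaitGroup,
// and Close cancels the context and then waits for them to drain.
type engine struct {
	cancel      context.CancelFunc
	closeOnce   sync.Once
	waitWorkers sync.WaitGroup
}

func newEngine(workers int) *engine {
	ctx, cancel := context.WithCancel(context.Background())
	e := &engine{cancel: cancel}
	e.waitWorkers.Add(workers)
	for i := 0; i < workers; i++ {
		go e.taskWorker(ctx)
	}
	return e
}

func (e *engine) taskWorker(ctx context.Context) {
	defer e.waitWorkers.Done()
	<-ctx.Done() // a real worker would pull tasks until the context ends
}

// Close is safe to call multiple times and returns only after every
// worker goroutine has exited.
func (e *engine) Close() {
	e.closeOnce.Do(e.cancel)
	e.waitWorkers.Wait()
}

func main() {
	e := newEngine(4)
	e.Close()
}
```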
(Diffs for the remaining 7 changed files are not shown.)
