Optimize Parcel Resumption and Reduce Blocking in ChainPorter #1068

Closed · wants to merge 3 commits
19 changes: 14 additions & 5 deletions tapfreighter/chain_porter.go
@@ -132,7 +132,7 @@ func NewChainPorter(cfg *ChainPorterConfig) *ChainPorter {
 	)
 	return &ChainPorter{
 		cfg:             cfg,
-		outboundParcels: make(chan Parcel),
+		outboundParcels: make(chan Parcel, 10),
Collaborator (@jharveyb)
I think we want to leave this as unbuffered? Otherwise we would end up trying to start the state machine with multiple packets at once:

case outboundParcel := <-p.outboundParcels:
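
(For context on the channel semantics at issue: with make(chan Parcel, 10), up to ten sends complete even if the event loop hasn't received anything yet; with an unbuffered channel, every send rendezvouses with a receive, so parcels enter the loop strictly one at a time. A standalone, runnable sketch of that difference, not code from the PR:)

package main

import "fmt"

func main() {
	// Buffered: the first ten sends return immediately even though
	// nobody is receiving yet; an eleventh send would block.
	buffered := make(chan int, 10)
	for i := 0; i < 10; i++ {
		buffered <- i
	}
	fmt.Println(len(buffered)) // Prints 10: ten items queued unreceived.

	// Unbuffered: a send blocks until a receiver is ready, so the
	// producer and the event loop proceed in lock step.
	unbuffered := make(chan int)
	go func() { unbuffered <- 42 }()
	fmt.Println(<-unbuffered) // Prints 42.
}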

 		subscribers:     subscribers,
 		ContextGuard: &fn.ContextGuard{
 			DefaultTimeout: tapgarden.DefaultTimeout,
@@ -150,9 +150,20 @@ func (p *ChainPorter) Start() error {

 		// Start the main chain porter goroutine.
 		p.Wg.Add(1)
-		go p.mainEventLoop()
+		go func() {
+			defer p.Wg.Done()
+			p.mainEventLoop()
+		}()

-		startErr = p.resumePendingParcels()
+		// Resume any pending parcels in a new goroutine so that we
+		// don't delay returning from the Start method. Without this
+		// goroutine the resumePendingParcels method would block on the
+		// buffering constraint of the outboundParcels channel.
+		p.Wg.Add(1)
+		go func() {
+			defer p.Wg.Done()
+			startErr = p.resumePendingParcels()
Collaborator

I think resumePendingParcels() needs to be modified to receive Quit signals?

Like this case:

But here:

p.outboundParcels <- pendingParcel
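
(A quit-aware send could follow the same select pattern as the event loop. A minimal sketch of the fragment inside resumePendingParcels, assuming a hypothetical pendingParcels slice loaded from the database and the Quit channel supplied by the embedded fn.ContextGuard:)

for idx := range pendingParcels {
	select {
	// Hand the parcel to the main event loop as before.
	case p.outboundParcels <- pendingParcels[idx]:

	// Abort resumption cleanly on shutdown instead of blocking
	// forever on the channel send.
	case <-p.Quit:
		return fmt.Errorf("chain porter shutting down")
	}
}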

Member

Why is it even a problem if we delay startup by waiting for pending parcels? Anything that takes a while (e.g. proof transfer) will be done in a goroutine anyway. So I don't see a pressing reason to do things async here.

Also, if we do this in another goroutine, we don't need the buffered channel as already mentioned by @jharveyb. I think it just makes the behavior less deterministic (e.g. with 9 parcels the goroutine finishes almost immediately but with 11 it blocks until complete)...

Collaborator

IIUC the start of each 'subsystem' is sequential and blocking on any one Start() call would block the rest?

if err := s.cfg.ChainPorter.Start(); err != nil {

Even with proof transfer in a goroutine, if we're resuming more than one parcel I think the transfer process for the first one would block resumption of the second? And these wouldn't happen in parallel.

The concrete case I was thinking of was: "If I have a wide transfer (with many recipients), what happens on restart?"

I think they would attempt to be transferred, sequentially, and any issues with proof upload at that point would block the caretaker startup. Maybe I'm wrong about which errors would cause which functions to block though.

Member

We already spin up a goroutine for each individual parcel:

go p.advanceState(sendPkg, outboundParcel.kit())

So at startup, because the main event loop already is a goroutine and just starts another one for each parcel, we can feed in the parcels to resume synchronously and block on that.
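
(The structure being suggested would look roughly like this; a sketch only, assuming Start wraps its body in the sync.Once guard implied by the closing }) in the hunk:)

func (p *ChainPorter) Start() error {
	var startErr error
	p.startOnce.Do(func() {
		// The main event loop runs as a goroutine and spawns a
		// further goroutine per parcel, so feeding pending parcels
		// in synchronously only blocks until each one is accepted,
		// not until its transfer completes.
		p.Wg.Add(1)
		go func() {
			defer p.Wg.Done()
			p.mainEventLoop()
		}()

		startErr = p.resumePendingParcels()
	})

	return startErr
}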

Collaborator

Ah, so we can handle parcels in parallel then?

In that case, IIUC, the only startup delay would be calling advanceState() for each resumed parcel, which should be fast.

And then we actually don't need to modify the current behavior, and the problem I was thinking of doesn't exist.

I'm not sure whether we have an existing test that handles multiple parcels at once; it would be good to have one to validate this. A sketch of the property such a test would check follows below.
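
(The dispatch property itself can be checked without the repo's test harness. A standalone sketch, all names illustrative, that fails if items are processed serially rather than in parallel:)

package tapfreighter_test

import (
	"sync"
	"testing"
	"time"
)

// TestParallelDispatch models an event loop that, like the ChainPorter,
// receives items from a channel and spawns one goroutine per item.
func TestParallelDispatch(t *testing.T) {
	items := make(chan int)
	var wg sync.WaitGroup

	go func() {
		for range items {
			go func() {
				defer wg.Done()

				// Simulated per-parcel transfer work.
				time.Sleep(50 * time.Millisecond)
			}()
		}
	}()

	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		items <- i
	}
	close(items)
	wg.Wait()

	// Ten serial items would take at least 500ms; parallel dispatch
	// should finish in roughly one item's duration.
	if elapsed := time.Since(start); elapsed > 250*time.Millisecond {
		t.Fatalf("items appear to be processed serially: %v", elapsed)
	}
}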

+		}()
 	})

 	return startErr
@@ -311,8 +322,6 @@ func (p *ChainPorter) QueryParcels(ctx context.Context,

 // requests, and attempt to complete a transfer. A response is sent back to the
 // caller if a transfer can be completed. Otherwise, an error is returned.
 func (p *ChainPorter) mainEventLoop() {
-	defer p.Wg.Done()
Member

We should add a NOTE comment to this method that it MUST be run as a goroutine.
Normally, the defer p.Wg.Done() at the start of a method indicates this to someone reading the code.
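
(Concretely, the suggested annotation might read as follows; the comment wording is a sketch, not text from the PR:)

// mainEventLoop is the main event loop of the ChainPorter. It receives
// new outbound parcels and drives each one through the sending state
// machine.
//
// NOTE: This method MUST be run as a goroutine.
func (p *ChainPorter) mainEventLoop() {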


 	for {
 		select {
 		case outboundParcel := <-p.outboundParcels: