Optimize Parcel Resumption and Reduce Blocking in ChainPorter #1068
tapfreighter/chain_porter.go

@@ -132,7 +132,7 @@ func NewChainPorter(cfg *ChainPorterConfig) *ChainPorter {
 	)
 	return &ChainPorter{
 		cfg: cfg,
-		outboundParcels: make(chan Parcel),
+		outboundParcels: make(chan Parcel, 10),
 		subscribers: subscribers,
 		ContextGuard: &fn.ContextGuard{
 			DefaultTimeout: tapgarden.DefaultTimeout,
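For context on what this one-line change does at the language level, here is a minimal, standalone Go sketch (not code from this repository) contrasting a send on an unbuffered channel with sends on a channel of capacity 10:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A send on an unbuffered channel blocks until a receiver is ready.
	unbuffered := make(chan int)
	go func() {
		time.Sleep(100 * time.Millisecond)
		<-unbuffered
	}()
	start := time.Now()
	unbuffered <- 1 // Blocks roughly 100ms, until the receive above runs.
	fmt.Println("unbuffered send waited", time.Since(start))

	// With make(chan int, 10), the first ten sends return immediately;
	// only the eleventh would block until a receive frees a slot.
	buffered := make(chan int, 10)
	start = time.Now()
	for i := 0; i < 10; i++ {
		buffered <- i // None of these block.
	}
	fmt.Println("ten buffered sends waited", time.Since(start))
}
```

This is also the behavior behind the "less deterministic" objection raised below: with fewer pending parcels than the buffer size the resumption goroutine finishes almost immediately, while with more it blocks partway through.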
@@ -150,9 +150,20 @@ func (p *ChainPorter) Start() error {

 		// Start the main chain porter goroutine.
 		p.Wg.Add(1)
-		go p.mainEventLoop()
+		go func() {
+			defer p.Wg.Done()
+			p.mainEventLoop()
+		}()

-		startErr = p.resumePendingParcels()
+		// Resume any pending parcels in a new goroutine so that we
+		// don't delay returning from the Start method. Without this
+		// goroutine the resumePendingParcels method would block on the
+		// buffering constraint of the outboundParcels channel.
+		p.Wg.Add(1)
+		go func() {
+			defer p.Wg.Done()
+			startErr = p.resumePendingParcels()
I think … Like this case: taproot-assets/tapfreighter/chain_porter.go, line 330 in 1b5a4ef. But here: taproot-assets/tapfreighter/chain_porter.go, line 194 in 1b5a4ef.

Why is it even a problem if we delay startup by waiting for pending parcels? Anything that takes a while (e.g. proof transfer) will be done in a goroutine anyway, so I don't see a pressing reason to do things async here. Also, if we do this in another goroutine, we don't need the buffered channel, as already mentioned by @jharveyb. I think it just makes the behavior less deterministic (e.g. with 9 parcels the goroutine finishes almost immediately, but with 11 it blocks until complete)...

IIUC the start of each 'subsystem' is sequential and blocking on any one (line 185 in a8d8b5a). Even with proof transfer in a goroutine, if we're resuming more than one parcel, I think the transfer process for the first one would block resumption of the second, and these wouldn't happen in parallel. The concrete case I was thinking of was: "If I have a wide transfer (with many recipients), what happens on restart?" I think they would attempt to be transferred, sequentially, and any issues with proof upload at that point would block the caretaker startup. Maybe I'm wrong about which errors would cause which functions to block, though.

We already spin up a goroutine for each individual parcel: taproot-assets/tapfreighter/chain_porter.go, line 337 in 9ac1baa. So at startup, because the main event loop is already a goroutine and just starts another one for each parcel, we can feed in the parcels to resume synchronously and block on that.

Ah, so we can handle parcels in parallel then? In that case, IIUC, the only startup delay would be calling … And then we actually don't need to modify the current behavior, and the problem I was thinking of doesn't exist. I'm not sure if we have an existing test that handles multiple parcels at once; that would be good to have to validate this.
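To make the conclusion of this thread concrete, here is a simplified, self-contained sketch of the pattern being described: an event loop that spawns one goroutine per parcel, fed synchronously over an unbuffered channel. All names here (porter, parcel, the resumePendingParcels signature) are illustrative stand-ins, not the repository's actual types:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// parcel stands in for a pending transfer to resume.
type parcel struct{ id int }

type porter struct {
	wg              sync.WaitGroup
	outboundParcels chan parcel
}

// mainEventLoop receives one parcel at a time and hands each one to its
// own goroutine, so a slow transfer never blocks the loop from accepting
// the next parcel.
func (p *porter) mainEventLoop() {
	for prcl := range p.outboundParcels {
		p.wg.Add(1)
		go func(pr parcel) {
			defer p.wg.Done()
			// Stand-in for the per-parcel transfer state machine.
			time.Sleep(50 * time.Millisecond)
			fmt.Println("finished parcel", pr.id)
		}(prcl)
	}
}

// resumePendingParcels feeds pending parcels synchronously. Because the
// event loop only holds each parcel long enough to spawn its goroutine,
// this returns quickly even with an unbuffered channel.
func (p *porter) resumePendingParcels(pending []parcel) {
	for _, pr := range pending {
		p.outboundParcels <- pr
	}
}

func main() {
	p := &porter{outboundParcels: make(chan parcel)} // Unbuffered on purpose.

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.mainEventLoop()
	}()

	p.resumePendingParcels([]parcel{{1}, {2}, {3}})
	close(p.outboundParcels)
	p.wg.Wait()
}
```

Because the loop only holds each parcel long enough to launch its goroutine, the synchronous feed returns quickly and the transfers themselves still run in parallel, which is why no buffer and no extra startup goroutine would be needed under this reading.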
+		}()
 	})

 	return startErr
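The hunk above (and the matching removal in the next hunk) moves the WaitGroup bookkeeping out of mainEventLoop itself and into the closure that launches it. A minimal sketch of that pattern in isolation, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

// eventLoop stands in for ChainPorter.mainEventLoop; note that it no
// longer touches the WaitGroup itself.
func eventLoop() {
	fmt.Println("event loop running")
}

func main() {
	var wg sync.WaitGroup

	// Add(1) before launching, defer Done() inside the wrapping closure:
	// the goroutine's lifetime is accounted for in one place, and the
	// worker function stays free of WaitGroup bookkeeping.
	wg.Add(1)
	go func() {
		defer wg.Done()
		eventLoop()
	}()

	wg.Wait()
}
```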
@@ -311,8 +322,6 @@ func (p *ChainPorter) QueryParcels(ctx context.Context,
 // requests, and attempt to complete a transfer. A response is sent back to the
 // caller if a transfer can be completed. Otherwise, an error is returned.
 func (p *ChainPorter) mainEventLoop() {
-	defer p.Wg.Done()
We should add a …
 	for {
 		select {
 		case outboundParcel := <-p.outboundParcels:
I think we want to leave this as unbuffered? Otherwise we would end up trying to start the state machine with multiple packets at once: taproot-assets/tapfreighter/chain_porter.go, line 318 in 1b5a4ef.
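A small sketch of the property this comment is after, again with purely illustrative names: with a buffer, a producer can park several items in the channel before the consumer has looked at any of them, whereas an unbuffered channel never holds more than the single item currently being handed off:

```go
package main

import "fmt"

// feed sends n items on ch and reports, after each send, how many items
// the channel has accepted that the consumer has not yet received.
func feed(ch chan int, n int) {
	for i := 0; i < n; i++ {
		ch <- i
		fmt.Printf("sent %d, queued but unprocessed: %d\n", i, len(ch))
	}
	close(ch)
}

func main() {
	// Capacity 10: all five sends complete before any consumer runs,
	// so up to five items pile up unprocessed.
	buffered := make(chan int, 10)
	feed(buffered, 5)
	for range buffered {
	}

	// Unbuffered: each send blocks until the consumer takes the item,
	// so len(ch) stays at zero and at most one handoff is in flight.
	unbuffered := make(chan int)
	go feed(unbuffered, 5)
	for item := range unbuffered {
		fmt.Println("processing", item)
	}
}
```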