
Optimize Parcel Resumption and Reduce Blocking in ChainPorter #1068

Closed · wants to merge 3 commits

Conversation

ffranr (Contributor) commented Aug 8, 2024

This PR addresses a concern raised by @jharveyb: #1055 (comment)

Changes

  • Improved Goroutine Management: The WaitGroup.Done() call has been repositioned to sit next to the corresponding WaitGroup.Add(1) call, making the code easier to read and reducing the risk of an unbalanced counter.

  • Buffered Channel for Outbound Parcels: A buffer has been added to the outboundParcels channel so that new parcels can be enqueued without blocking behind resumed pending ones.

  • Concurrent Resumption of Pending Parcels: The ChainPorter.resumePendingParcels method is now executed in a separate goroutine, so resuming pending parcels no longer delays startup.
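The three changes above can be sketched together in a small, self-contained program. Porter, Parcel, eventLoop and the channel capacity of 10 are hypothetical stand-ins for the real ChainPorter types, not the actual API:

```go
package main

import (
	"fmt"
	"sync"
)

// Parcel and Porter are illustrative stand-ins for the real types.
type Parcel struct{ ID int }

type Porter struct {
	Wg              sync.WaitGroup
	outboundParcels chan Parcel
	processed       chan int
}

func NewPorter() *Porter {
	return &Porter{
		// Buffered so resumed parcels can be queued without
		// blocking new submissions.
		outboundParcels: make(chan Parcel, 10),
		processed:       make(chan int, 10),
	}
}

// Start launches the event loop and resumes pending parcels without
// blocking the caller. Note that each Wg.Add(1) sits right next to its
// deferred Wg.Done(), so the counter is visibly balanced.
func (p *Porter) Start() {
	p.Wg.Add(1)
	go func() {
		defer p.Wg.Done()
		p.eventLoop(3) // process three parcels, then exit
	}()

	p.Wg.Add(1)
	go func() {
		defer p.Wg.Done()
		p.resumePendingParcels()
	}()
}

func (p *Porter) resumePendingParcels() {
	for i := 1; i <= 3; i++ {
		p.outboundParcels <- Parcel{ID: i}
	}
}

func (p *Porter) eventLoop(n int) {
	for i := 0; i < n; i++ {
		parcel := <-p.outboundParcels
		p.processed <- parcel.ID
	}
}

func main() {
	p := NewPorter()
	p.Start() // returns immediately; resumption runs concurrently
	p.Wg.Wait()
	close(p.processed)
	for id := range p.processed {
		fmt.Println("processed parcel", id)
	}
}
```

Because a single goroutine feeds the FIFO channel, the parcels come out in submission order even though Start returns before any of them are processed.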

This commit moves the WaitGroup.Done() call closer to the corresponding
WaitGroup.Add(1) call. The purpose of this change is to group the
goroutine management code together, making it easier to read and
reducing the risk of forgetting to decrement the WaitGroup counter.

This commit introduces a buffer to the ChainPorter.outboundParcels
channel. By adding this buffer, the system can handle new parcels
without being blocked by resumed pending parcels, improving overall
efficiency and reducing potential delays.

Resume any pending parcels in a new goroutine so that we don't delay
returning from the `ChainPorter.Start` method.
@ffranr ffranr requested a review from jharveyb August 8, 2024 12:03
@ffranr ffranr self-assigned this Aug 8, 2024
@ffranr ffranr requested a review from GeorgeTsagk August 8, 2024 12:22
@@ -132,7 +132,7 @@ func NewChainPorter(cfg *ChainPorterConfig) *ChainPorter {
)
return &ChainPorter{
cfg: cfg,
outboundParcels: make(chan Parcel),
outboundParcels: make(chan Parcel, 10),
jharveyb (Collaborator):
I think we want to leave this as unbuffered? Otherwise we would end up trying to start the state machine with multiple packets at once:

case outboundParcel := <-p.outboundParcels:

p.Wg.Add(1)
go func() {
defer p.Wg.Done()
startErr = p.resumePendingParcels()
jharveyb (Collaborator):

I think resumePendingParcels() needs to be modified to receive Quit signals?

Like this case:

But here:

p.outboundParcels <- pendingParcel
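The quit-aware send asked for above could look like the following sketch. resumeParcels, the channel types, and ErrShuttingDown are hypothetical stand-ins for the real ChainPorter members:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrShuttingDown = errors.New("porter shutting down")

// resumeParcels replaces a bare `p.outboundParcels <- pendingParcel`
// with a select on the quit channel, so resumption cannot hang forever
// if the porter shuts down while the outbound channel is full.
func resumeParcels(parcels []int, outbound chan<- int, quit <-chan struct{}) error {
	for _, parcel := range parcels {
		select {
		case outbound <- parcel:
		case <-quit:
			return ErrShuttingDown
		}
	}
	return nil
}

func main() {
	outbound := make(chan int, 2)
	quit := make(chan struct{})

	// Enough buffer: both sends succeed.
	fmt.Println(resumeParcels([]int{1, 2}, outbound, quit))

	// Channel full and quit closed: the send falls through to the
	// quit case instead of blocking forever.
	close(quit)
	fmt.Println(resumeParcels([]int{3}, outbound, quit))
}
```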

Member:

Why is it even a problem if we delay startup by waiting for pending parcels? Anything that takes a while (e.g. proof transfer) will be done in a goroutine anyway. So I don't see a pressing reason to do things async here.

Also, if we do this in another goroutine, we don't need the buffered channel as already mentioned by @jharveyb. I think it just makes the behavior less deterministic (e.g. with 9 parcels the goroutine finishes almost immediately but with 11 it blocks until complete)...
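The 9-vs-11 determinism point comes straight from Go's buffered-channel semantics: sends complete without a receiver only up to the channel's capacity. A minimal demonstration (the capacity of 10 mirrors the diff; the helper is illustrative):

```go
package main

import "fmt"

// queuedWithoutReceiver reports how many sends complete before one
// would block, i.e. before the buffer is full. With no receiver, a
// buffered channel absorbs exactly `capacity` sends.
func queuedWithoutReceiver(capacity, parcels int) int {
	ch := make(chan int, capacity)
	queued := 0
	for i := 0; i < parcels; i++ {
		select {
		case ch <- i:
			queued++
		default:
			// Buffer full: this send would block.
			return queued
		}
	}
	return queued
}

func main() {
	fmt.Println(queuedWithoutReceiver(10, 9))  // 9: all queued without blocking
	fmt.Println(queuedWithoutReceiver(10, 11)) // 10: the 11th send would block
	fmt.Println(queuedWithoutReceiver(0, 1))   // 0: unbuffered, blocks immediately
}
```

So with 9 resumed parcels the resuming goroutine finishes immediately, while with 11 it blocks until the event loop drains the buffer, exactly the behavioral split described above.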

jharveyb (Collaborator):

IIUC the start of each 'subsystem' is sequential and blocking on any one Start() call would block the rest?

if err := s.cfg.ChainPorter.Start(); err != nil {

Even with proof transfer in a goroutine, if we're resuming more than one parcel I think the transfer process for the first one would block resumption of the second? And these wouldn't happen in parallel.

The concrete case I was thinking of was: "If I have a wide transfer (with many recipients), what happens on restart?"

I think they would attempt to be transferred, sequentially, and any issues with proof upload at that point would block the caretaker startup. Maybe I'm wrong about which errors would cause which functions to block though.

Member:

We already spin up a goroutine for each individual parcel:

go p.advanceState(sendPkg, outboundParcel.kit())

So at startup, because the main event loop already is a goroutine and just starts another one for each parcel, we can feed in the parcels to resume synchronously and block on that.
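The per-parcel goroutine pattern described above can be sketched as follows; processParcels and advanceState are illustrative stand-ins, not the real ChainPorter methods:

```go
package main

import (
	"fmt"
	"sync"
)

// processParcels dispatches each parcel to its own goroutine, so a
// slow parcel (e.g. one stuck on proof transfer) never holds up the
// next one. The dispatcher itself returns only once all are done.
func processParcels(parcels []int) []int {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		done []int
	)
	for _, parcel := range parcels {
		wg.Add(1)
		// Each parcel advances through its state machine concurrently.
		go func(id int) {
			defer wg.Done()
			advanceState(id)
			mu.Lock()
			done = append(done, id)
			mu.Unlock()
		}(parcel)
	}
	wg.Wait()
	return done
}

func advanceState(id int) {
	// Placeholder for broadcast, confirmation and proof transfer.
}

func main() {
	fmt.Println(len(processParcels([]int{1, 2, 3})))
}
```

Note the completion order is not deterministic, which is why the results are collected under a mutex rather than by index.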

jharveyb (Collaborator):

Ah, so we can handle parcels in parallel then?

In that case, IIUC, the only startup delay would be calling advanceState() for each resumed parcel, which should be fast.

And then we actually don't need to modify the current behavior, and the problem I was thinking of doesn't exist.

I'm not sure if we have an existing test that handles multiple parcels at once, that would be good to have to validate this.

@@ -311,8 +314,6 @@ func (p *ChainPorter) QueryParcels(ctx context.Context,
// requests, and attempt to complete a transfer. A response is sent back to the
// caller if a transfer can be completed. Otherwise, an error is returned.
func (p *ChainPorter) mainEventLoop() {
defer p.Wg.Done()
Member:

We should add a NOTE comment to this method that it MUST be run as a goroutine.
Normally, the defer p.Wg.Done() at the start of a method indicates this to someone reading the code.
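The suggested convention might look like the sketch below; Porter and run are hypothetical stand-ins for the real types:

```go
package main

import (
	"fmt"
	"sync"
)

// Porter is an illustrative stand-in for the real ChainPorter.
type Porter struct {
	Wg   sync.WaitGroup
	Quit chan struct{}
}

// mainEventLoop waits for shutdown.
//
// NOTE: This method MUST be run as a goroutine, paired with a
// p.Wg.Add(1) and a deferred p.Wg.Done() at the call site.
func (p *Porter) mainEventLoop() {
	<-p.Quit
}

// run starts the loop following the documented contract, then shuts it
// down cleanly.
func run() string {
	p := &Porter{Quit: make(chan struct{})}

	p.Wg.Add(1)
	go func() {
		defer p.Wg.Done()
		p.mainEventLoop()
	}()

	close(p.Quit)
	p.Wg.Wait()
	return "clean shutdown"
}

func main() {
	fmt.Println(run())
}
```

With Done moved to the call site, the NOTE is the only remaining signal to a reader that the method is not safe to call synchronously.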


@GeorgeTsagk GeorgeTsagk removed their request for review August 9, 2024 10:01
ffranr (Contributor, author) commented Aug 13, 2024

Closing, see #1068 (comment)

@ffranr ffranr closed this Aug 13, 2024
jharveyb (Collaborator) commented Aug 13, 2024

@ffranr

It would still be good to have an itest that exercises multiple transfers running concurrently. A simple way to get that would be to delay mining a block: submit one transfer, then a second; both should end up waiting for the confirmation, then proceed as normal after a block is mined.

ffranr (Contributor, author) commented Aug 13, 2024

@jharveyb good idea! I'll spin that into an issue.

ffranr (Contributor, author) commented Aug 13, 2024

issue: #1081
