Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery+graph: track job set dependencies in vb #9241

Merged
merged 4 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,9 @@ type AuthenticatedGossiper struct {
// AuthenticatedGossiper lock.
chanUpdateRateLimiter map[uint64][2]*rate.Limiter

// vb is used to enforce job dependency ordering of gossip messages.
vb *ValidationBarrier

sync.Mutex
}

Expand All @@ -542,6 +545,8 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
banman: newBanman(),
}

gossiper.vb = NewValidationBarrier(1000, gossiper.quit)

gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
Expand Down Expand Up @@ -1409,10 +1414,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
}

// We'll use this validation to ensure that we process jobs in their
// dependency order during parallel validation.
validationBarrier := graph.NewValidationBarrier(1000, d.quit)

for {
select {
// A new policy update has arrived. We'll commit it to the
Expand Down Expand Up @@ -1481,11 +1482,17 @@ func (d *AuthenticatedGossiper) networkHandler() {
// We'll set up any dependent, and wait until a free
// slot for this job opens up, this allow us to not
// have thousands of goroutines active.
validationBarrier.InitJobDependencies(announcement.msg)
annJobID, err := d.vb.InitJobDependencies(
announcement.msg,
)
if err != nil {
announcement.err <- err
continue
}

d.wg.Add(1)
go d.handleNetworkMessages(
announcement, &announcements, validationBarrier,
announcement, &announcements, annJobID,
)

// The trickle timer has ticked, which indicates we should
Expand Down Expand Up @@ -1536,28 +1543,23 @@ func (d *AuthenticatedGossiper) networkHandler() {
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {
deDuped *deDupedAnnouncements, jobID JobID) {

defer d.wg.Done()
defer vb.CompleteJob()
defer d.vb.CompleteJob()

// We should only broadcast this message forward if it originated from
// us or it wasn't received as part of our initial historical sync.
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(nMsg.msg)
err := d.vb.WaitForParents(jobID, nMsg.msg)
if err != nil {
log.Debugf("Validating network message %s got err: %v",
nMsg.msg.MsgType(), err)

if !graph.IsError(
err,
graph.ErrVBarrierShuttingDown,
graph.ErrParentValidationFailed,
) {

if errors.Is(err, ErrVBarrierShuttingDown) {
log.Warnf("unexpected error during validation "+
"barrier shutdown: %v", err)
}
Expand All @@ -1577,7 +1579,16 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,

// If this message had any dependencies, then we can now signal them to
// continue.
vb.SignalDependants(nMsg.msg, allow)
err = d.vb.SignalDependents(nMsg.msg, jobID)
if err != nil {
// Something is wrong if SignalDependents returns an error.
log.Errorf("SignalDependents returned error for msg=%v with "+
"JobID=%v", spew.Sdump(nMsg.msg), jobID)

nMsg.err <- err

return
}

// If the announcement was accepted, then add the emitted announcements
// to our announce batch to be broadcast once the trickle timer ticks
Expand Down Expand Up @@ -2407,7 +2418,6 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
err,
graph.ErrOutdated,
graph.ErrIgnored,
graph.ErrVBarrierShuttingDown,
) {

log.Error(err)
Expand Down Expand Up @@ -3148,7 +3158,6 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
if graph.IsError(
err, graph.ErrOutdated,
graph.ErrIgnored,
graph.ErrVBarrierShuttingDown,
) {

log.Debugf("Update edge for short_chan_id(%v) got: %v",
Expand Down
Loading
Loading